All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
- Add claude_id field to sessions (migration 026) for tracking Claude process IDs across pod restarts - Extend session repository with UpdateClaudeID and session lookup methods - Improve kubernetes executor with better error handling and exec streaming - Add claudebox client/server improvements for session lifecycle - Expand sessions handler with exec streaming endpoint - Add comprehensive tests for sessions and kubernetes executor Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
355 lines
7.9 KiB
Go
355 lines
7.9 KiB
Go
package claudebox
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"os/exec"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Default allowed tools for Claude Code execution.
|
|
var defaultAllowedTools = []string{
|
|
"Bash", "Edit", "Write", "Read", "Glob", "Grep", "Task", "WebFetch", "WebSearch",
|
|
}
|
|
|
|
// Executor runs Claude Code locally in the container.
|
|
type Executor struct {
|
|
workDir string
|
|
}
|
|
|
|
// NewExecutor creates a new local executor.
|
|
func NewExecutor(workDir string) *Executor {
|
|
return &Executor{
|
|
workDir: workDir,
|
|
}
|
|
}
|
|
|
|
// ExecuteResult contains the result of a Claude Code execution.
|
|
type ExecuteResult struct {
|
|
Success bool
|
|
Output string
|
|
ExitCode int
|
|
DurationMs int64
|
|
Error error
|
|
SessionID string
|
|
FinalOutput string
|
|
}
|
|
|
|
// Execute runs Claude Code and returns the complete result.
|
|
func (e *Executor) Execute(ctx context.Context, req *ExecuteRequest) *ExecuteResult {
|
|
var output strings.Builder
|
|
start := time.Now()
|
|
|
|
result := &ExecuteResult{}
|
|
|
|
// Apply timeout if specified
|
|
if req.Timeout > 0 {
|
|
var cancel context.CancelFunc
|
|
ctx, cancel = context.WithTimeout(ctx, time.Duration(req.Timeout)*time.Second)
|
|
defer cancel()
|
|
}
|
|
|
|
// Build command args
|
|
args := e.buildArgs(req)
|
|
|
|
// Execute claude command
|
|
cmd := exec.CommandContext(ctx, "claude", args...)
|
|
|
|
// Get working directory
|
|
workDir := req.WorkingDir
|
|
if workDir == "" {
|
|
workDir = e.workDir
|
|
}
|
|
cmd.Dir = workDir
|
|
|
|
// Capture output
|
|
stdout, err := cmd.StdoutPipe()
|
|
if err != nil {
|
|
result.Error = fmt.Errorf("stdout pipe: %w", err)
|
|
result.DurationMs = time.Since(start).Milliseconds()
|
|
return result
|
|
}
|
|
|
|
stderr, err := cmd.StderrPipe()
|
|
if err != nil {
|
|
result.Error = fmt.Errorf("stderr pipe: %w", err)
|
|
result.DurationMs = time.Since(start).Milliseconds()
|
|
return result
|
|
}
|
|
|
|
if err := cmd.Start(); err != nil {
|
|
result.Error = fmt.Errorf("start: %w", err)
|
|
result.DurationMs = time.Since(start).Milliseconds()
|
|
return result
|
|
}
|
|
|
|
// Read output
|
|
var wg sync.WaitGroup
|
|
wg.Add(2)
|
|
|
|
go func() {
|
|
defer wg.Done()
|
|
scanner := bufio.NewScanner(stdout)
|
|
buf := make([]byte, 0, 64*1024)
|
|
scanner.Buffer(buf, 1024*1024)
|
|
for scanner.Scan() {
|
|
output.WriteString(scanner.Text())
|
|
output.WriteString("\n")
|
|
}
|
|
}()
|
|
|
|
var stderrOutput strings.Builder
|
|
go func() {
|
|
defer wg.Done()
|
|
scanner := bufio.NewScanner(stderr)
|
|
buf := make([]byte, 0, 64*1024)
|
|
scanner.Buffer(buf, 1024*1024)
|
|
for scanner.Scan() {
|
|
stderrOutput.WriteString(scanner.Text())
|
|
stderrOutput.WriteString("\n")
|
|
}
|
|
}()
|
|
|
|
wg.Wait()
|
|
cmdErr := cmd.Wait()
|
|
|
|
result.DurationMs = time.Since(start).Milliseconds()
|
|
result.Output = output.String()
|
|
result.FinalOutput = output.String()
|
|
|
|
if cmdErr != nil {
|
|
if exitErr, ok := cmdErr.(*exec.ExitError); ok {
|
|
result.ExitCode = exitErr.ExitCode()
|
|
} else {
|
|
result.ExitCode = 1
|
|
result.Error = cmdErr
|
|
}
|
|
|
|
// Append stderr to error message
|
|
if stderrOutput.Len() > 0 {
|
|
if result.Error != nil {
|
|
result.Error = fmt.Errorf("%w\nstderr: %s", result.Error, stderrOutput.String())
|
|
} else {
|
|
result.Error = fmt.Errorf("stderr: %s", stderrOutput.String())
|
|
}
|
|
}
|
|
} else {
|
|
result.Success = true
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// StreamEventHandler is called for each event during streaming execution.
|
|
type StreamEventHandler func(StreamEvent)
|
|
|
|
// ExecuteStream runs Claude Code and streams events to the handler.
|
|
func (e *Executor) ExecuteStream(ctx context.Context, req *ExecuteRequest, handler StreamEventHandler) *ExecuteResult {
|
|
start := time.Now()
|
|
result := &ExecuteResult{}
|
|
|
|
// Apply timeout if specified
|
|
if req.Timeout > 0 {
|
|
var cancel context.CancelFunc
|
|
ctx, cancel = context.WithTimeout(ctx, time.Duration(req.Timeout)*time.Second)
|
|
defer cancel()
|
|
}
|
|
|
|
// Build command args with stream-json output
|
|
args := e.buildStreamArgs(req)
|
|
|
|
cmd := exec.CommandContext(ctx, "claude", args...)
|
|
|
|
workDir := req.WorkingDir
|
|
if workDir == "" {
|
|
workDir = e.workDir
|
|
}
|
|
cmd.Dir = workDir
|
|
|
|
stdout, err := cmd.StdoutPipe()
|
|
if err != nil {
|
|
result.Error = fmt.Errorf("stdout pipe: %w", err)
|
|
result.DurationMs = time.Since(start).Milliseconds()
|
|
return result
|
|
}
|
|
|
|
stderr, err := cmd.StderrPipe()
|
|
if err != nil {
|
|
result.Error = fmt.Errorf("stderr pipe: %w", err)
|
|
result.DurationMs = time.Since(start).Milliseconds()
|
|
return result
|
|
}
|
|
|
|
if err := cmd.Start(); err != nil {
|
|
result.Error = fmt.Errorf("start: %w", err)
|
|
result.DurationMs = time.Since(start).Milliseconds()
|
|
return result
|
|
}
|
|
|
|
// Emit started event
|
|
handler(StreamEvent{
|
|
Type: "started",
|
|
Timestamp: time.Now().UTC().Format(time.RFC3339),
|
|
})
|
|
|
|
// Stream output
|
|
var wg sync.WaitGroup
|
|
var output strings.Builder
|
|
var capturedSessionID string
|
|
|
|
wg.Add(2)
|
|
|
|
go func() {
|
|
defer wg.Done()
|
|
e.streamOutput(stdout, "stdout", handler, &output, &capturedSessionID)
|
|
}()
|
|
|
|
go func() {
|
|
defer wg.Done()
|
|
e.streamStderr(stderr, handler)
|
|
}()
|
|
|
|
wg.Wait()
|
|
cmdErr := cmd.Wait()
|
|
|
|
result.DurationMs = time.Since(start).Milliseconds()
|
|
result.Output = output.String()
|
|
result.FinalOutput = output.String()
|
|
result.SessionID = capturedSessionID
|
|
|
|
if cmdErr != nil {
|
|
if exitErr, ok := cmdErr.(*exec.ExitError); ok {
|
|
result.ExitCode = exitErr.ExitCode()
|
|
} else {
|
|
result.ExitCode = 1
|
|
result.Error = cmdErr
|
|
}
|
|
|
|
handler(StreamEvent{
|
|
Type: "failed",
|
|
Content: cmdErr.Error(),
|
|
Timestamp: time.Now().UTC().Format(time.RFC3339),
|
|
})
|
|
} else {
|
|
result.Success = true
|
|
handler(StreamEvent{
|
|
Type: "completed",
|
|
Timestamp: time.Now().UTC().Format(time.RFC3339),
|
|
Data: map[string]any{
|
|
"duration_ms": result.DurationMs,
|
|
"exit_code": result.ExitCode,
|
|
},
|
|
})
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// buildArgs constructs Claude Code command arguments.
|
|
func (e *Executor) buildArgs(req *ExecuteRequest) []string {
|
|
args := []string{
|
|
"-p",
|
|
req.Prompt,
|
|
}
|
|
|
|
// Add allowed tools
|
|
allowedTools := req.AllowedTools
|
|
if len(allowedTools) == 0 {
|
|
allowedTools = defaultAllowedTools
|
|
}
|
|
for _, tool := range allowedTools {
|
|
args = append(args, "--allowedTools", tool)
|
|
}
|
|
|
|
return args
|
|
}
|
|
|
|
// buildStreamArgs constructs Claude Code command arguments with streaming output.
|
|
func (e *Executor) buildStreamArgs(req *ExecuteRequest) []string {
|
|
args := []string{
|
|
"-p",
|
|
req.Prompt,
|
|
"--verbose",
|
|
"--output-format", "stream-json",
|
|
}
|
|
|
|
if req.ResumeSessionID != "" {
|
|
args = append(args, "--resume", req.ResumeSessionID)
|
|
}
|
|
|
|
// Add allowed tools
|
|
allowedTools := req.AllowedTools
|
|
if len(allowedTools) == 0 {
|
|
allowedTools = defaultAllowedTools
|
|
}
|
|
for _, tool := range allowedTools {
|
|
args = append(args, "--allowedTools", tool)
|
|
}
|
|
|
|
return args
|
|
}
|
|
|
|
// streamOutput reads from stdout and sends events.
|
|
// capturedSessionID is optionally set when JSONL output contains a session_id field.
|
|
func (e *Executor) streamOutput(r io.Reader, stream string, handler StreamEventHandler, output *strings.Builder, capturedSessionID *string) {
|
|
scanner := bufio.NewScanner(r)
|
|
buf := make([]byte, 0, 64*1024)
|
|
scanner.Buffer(buf, 1024*1024)
|
|
|
|
for scanner.Scan() {
|
|
line := scanner.Text()
|
|
if line == "" {
|
|
continue
|
|
}
|
|
|
|
output.WriteString(line)
|
|
output.WriteString("\n")
|
|
|
|
// Parse JSONL to capture session_id from Claude stream-json output.
|
|
if capturedSessionID != nil && *capturedSessionID == "" {
|
|
var raw map[string]json.RawMessage
|
|
if err := json.Unmarshal([]byte(line), &raw); err == nil {
|
|
if sidRaw, ok := raw["session_id"]; ok {
|
|
var sid string
|
|
if _ = json.Unmarshal(sidRaw, &sid); sid != "" {
|
|
*capturedSessionID = sid
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
handler(StreamEvent{
|
|
Type: "output",
|
|
Content: line,
|
|
Stream: stream,
|
|
Timestamp: time.Now().UTC().Format(time.RFC3339),
|
|
})
|
|
}
|
|
}
|
|
|
|
// streamStderr reads from stderr and sends error events.
|
|
func (e *Executor) streamStderr(r io.Reader, handler StreamEventHandler) {
|
|
scanner := bufio.NewScanner(r)
|
|
buf := make([]byte, 0, 64*1024)
|
|
scanner.Buffer(buf, 1024*1024)
|
|
|
|
for scanner.Scan() {
|
|
line := scanner.Text()
|
|
if line == "" {
|
|
continue
|
|
}
|
|
|
|
handler(StreamEvent{
|
|
Type: "error",
|
|
Content: line,
|
|
Stream: "stderr",
|
|
Timestamp: time.Now().UTC().Format(time.RFC3339),
|
|
})
|
|
}
|
|
}
|