rdev/internal/claudebox/executor.go
jordan 3b35900a2d feat: enterprise worker pool with HTTP sidecar pattern
Implements horizontally-scalable worker pool architecture:
- claudebox-sidecar: HTTP server for Claude Code, git, and SDLC ops
- rdev-worker: standalone worker binary polling rdev-api for tasks
- HTTP client adapter for sidecar communication
- HPA with custom Prometheus metrics for autoscaling
- ServiceMonitor for metrics scraping

Code review fixes applied:
- URL-encode query parameters in GitStatus (Critical #1)
- Remove unused shellQuote function (Critical #2)
- Use stdlib strings.Split/TrimSpace (Critical #3)
- Add version injection via ldflags (Warning #4)
- Add debug logging for swallowed git/sdlc errors (Warning #5, #6)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-05 16:21:11 -07:00

334 lines
7.3 KiB
Go

package claudebox
import (
"bufio"
"context"
"fmt"
"io"
"os/exec"
"strings"
"sync"
"time"
)
// Default allowed tools for Claude Code execution.
var defaultAllowedTools = []string{
"Bash", "Edit", "Write", "Read", "Glob", "Grep", "Task", "WebFetch", "WebSearch",
}
// Executor runs Claude Code locally in the container.
type Executor struct {
workDir string
}
// NewExecutor creates a new local executor.
func NewExecutor(workDir string) *Executor {
return &Executor{
workDir: workDir,
}
}
// ExecuteResult contains the result of a Claude Code execution.
type ExecuteResult struct {
Success bool
Output string
ExitCode int
DurationMs int64
Error error
SessionID string
FinalOutput string
}
// Execute runs Claude Code and returns the complete result.
func (e *Executor) Execute(ctx context.Context, req *ExecuteRequest) *ExecuteResult {
var output strings.Builder
start := time.Now()
result := &ExecuteResult{}
// Apply timeout if specified
if req.Timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Duration(req.Timeout)*time.Second)
defer cancel()
}
// Build command args
args := e.buildArgs(req)
// Execute claude command
cmd := exec.CommandContext(ctx, "claude", args...)
// Get working directory
workDir := req.WorkingDir
if workDir == "" {
workDir = e.workDir
}
cmd.Dir = workDir
// Capture output
stdout, err := cmd.StdoutPipe()
if err != nil {
result.Error = fmt.Errorf("stdout pipe: %w", err)
result.DurationMs = time.Since(start).Milliseconds()
return result
}
stderr, err := cmd.StderrPipe()
if err != nil {
result.Error = fmt.Errorf("stderr pipe: %w", err)
result.DurationMs = time.Since(start).Milliseconds()
return result
}
if err := cmd.Start(); err != nil {
result.Error = fmt.Errorf("start: %w", err)
result.DurationMs = time.Since(start).Milliseconds()
return result
}
// Read output
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
scanner := bufio.NewScanner(stdout)
buf := make([]byte, 0, 64*1024)
scanner.Buffer(buf, 1024*1024)
for scanner.Scan() {
output.WriteString(scanner.Text())
output.WriteString("\n")
}
}()
var stderrOutput strings.Builder
go func() {
defer wg.Done()
scanner := bufio.NewScanner(stderr)
buf := make([]byte, 0, 64*1024)
scanner.Buffer(buf, 1024*1024)
for scanner.Scan() {
stderrOutput.WriteString(scanner.Text())
stderrOutput.WriteString("\n")
}
}()
wg.Wait()
cmdErr := cmd.Wait()
result.DurationMs = time.Since(start).Milliseconds()
result.Output = output.String()
result.FinalOutput = output.String()
if cmdErr != nil {
if exitErr, ok := cmdErr.(*exec.ExitError); ok {
result.ExitCode = exitErr.ExitCode()
} else {
result.ExitCode = 1
result.Error = cmdErr
}
// Append stderr to error message
if stderrOutput.Len() > 0 {
if result.Error != nil {
result.Error = fmt.Errorf("%w\nstderr: %s", result.Error, stderrOutput.String())
} else {
result.Error = fmt.Errorf("stderr: %s", stderrOutput.String())
}
}
} else {
result.Success = true
}
return result
}
// StreamEventHandler is called for each event during streaming execution.
type StreamEventHandler func(StreamEvent)
// ExecuteStream runs Claude Code and streams events to the handler.
func (e *Executor) ExecuteStream(ctx context.Context, req *ExecuteRequest, handler StreamEventHandler) *ExecuteResult {
start := time.Now()
result := &ExecuteResult{}
// Apply timeout if specified
if req.Timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Duration(req.Timeout)*time.Second)
defer cancel()
}
// Build command args with stream-json output
args := e.buildStreamArgs(req)
cmd := exec.CommandContext(ctx, "claude", args...)
workDir := req.WorkingDir
if workDir == "" {
workDir = e.workDir
}
cmd.Dir = workDir
stdout, err := cmd.StdoutPipe()
if err != nil {
result.Error = fmt.Errorf("stdout pipe: %w", err)
result.DurationMs = time.Since(start).Milliseconds()
return result
}
stderr, err := cmd.StderrPipe()
if err != nil {
result.Error = fmt.Errorf("stderr pipe: %w", err)
result.DurationMs = time.Since(start).Milliseconds()
return result
}
if err := cmd.Start(); err != nil {
result.Error = fmt.Errorf("start: %w", err)
result.DurationMs = time.Since(start).Milliseconds()
return result
}
// Emit started event
handler(StreamEvent{
Type: "started",
Timestamp: time.Now().UTC().Format(time.RFC3339),
})
// Stream output
var wg sync.WaitGroup
var output strings.Builder
wg.Add(2)
go func() {
defer wg.Done()
e.streamOutput(stdout, "stdout", handler, &output)
}()
go func() {
defer wg.Done()
e.streamStderr(stderr, handler)
}()
wg.Wait()
cmdErr := cmd.Wait()
result.DurationMs = time.Since(start).Milliseconds()
result.Output = output.String()
result.FinalOutput = output.String()
if cmdErr != nil {
if exitErr, ok := cmdErr.(*exec.ExitError); ok {
result.ExitCode = exitErr.ExitCode()
} else {
result.ExitCode = 1
result.Error = cmdErr
}
handler(StreamEvent{
Type: "failed",
Content: cmdErr.Error(),
Timestamp: time.Now().UTC().Format(time.RFC3339),
})
} else {
result.Success = true
handler(StreamEvent{
Type: "completed",
Timestamp: time.Now().UTC().Format(time.RFC3339),
Data: map[string]any{
"duration_ms": result.DurationMs,
"exit_code": result.ExitCode,
},
})
}
return result
}
// buildArgs constructs Claude Code command arguments.
func (e *Executor) buildArgs(req *ExecuteRequest) []string {
args := []string{
req.Prompt,
"-p",
}
// Add allowed tools
allowedTools := req.AllowedTools
if len(allowedTools) == 0 {
allowedTools = defaultAllowedTools
}
for _, tool := range allowedTools {
args = append(args, "--allowedTools", tool)
}
return args
}
// buildStreamArgs constructs Claude Code command arguments with streaming output.
func (e *Executor) buildStreamArgs(req *ExecuteRequest) []string {
args := []string{
req.Prompt,
"-p",
"--verbose",
"--output-format", "stream-json",
}
// Add allowed tools
allowedTools := req.AllowedTools
if len(allowedTools) == 0 {
allowedTools = defaultAllowedTools
}
for _, tool := range allowedTools {
args = append(args, "--allowedTools", tool)
}
return args
}
// streamOutput reads from stdout and sends events.
func (e *Executor) streamOutput(r io.Reader, stream string, handler StreamEventHandler, output *strings.Builder) {
scanner := bufio.NewScanner(r)
buf := make([]byte, 0, 64*1024)
scanner.Buffer(buf, 1024*1024)
for scanner.Scan() {
line := scanner.Text()
if line == "" {
continue
}
output.WriteString(line)
output.WriteString("\n")
handler(StreamEvent{
Type: "output",
Content: line,
Stream: stream,
Timestamp: time.Now().UTC().Format(time.RFC3339),
})
}
}
// streamStderr reads from stderr and sends error events.
func (e *Executor) streamStderr(r io.Reader, handler StreamEventHandler) {
scanner := bufio.NewScanner(r)
buf := make([]byte, 0, 64*1024)
scanner.Buffer(buf, 1024*1024)
for scanner.Scan() {
line := scanner.Text()
if line == "" {
continue
}
handler(StreamEvent{
Type: "error",
Content: line,
Stream: "stderr",
Timestamp: time.Now().UTC().Format(time.RFC3339),
})
}
}