// Package claudecode provides a CodeAgent implementation for Anthropic's Claude Code CLI. package claudecode import ( "bufio" "context" "fmt" "io" "os/exec" "strings" "sync" "sync/atomic" "time" "github.com/orchard9/rdev/internal/domain" "github.com/orchard9/rdev/internal/port" ) // Adapter implements port.CodeAgent using Anthropic's Claude Code CLI. type Adapter struct { namespace string mu sync.RWMutex // Track active sessions for cancellation activeSessions map[string]context.CancelFunc sessionsMu sync.Mutex } // NewAdapter creates a new Claude Code adapter. func NewAdapter(namespace string) *Adapter { return &Adapter{ namespace: namespace, activeSessions: make(map[string]context.CancelFunc), } } // Ensure Adapter implements port.CodeAgent at compile time. var _ port.CodeAgent = (*Adapter)(nil) // Name returns a human-readable name for this agent. func (a *Adapter) Name() string { return "Claude Code" } // Provider returns the agent provider identifier. func (a *Adapter) Provider() domain.AgentProvider { return domain.AgentProviderClaudeCode } // Execute runs a Claude Code command and streams events to the handler. func (a *Adapter) Execute(ctx context.Context, req *domain.AgentRequest, handler domain.AgentEventHandler) (*domain.AgentResult, error) { if req.Prompt == "" { return nil, fmt.Errorf("prompt is required") } a.mu.RLock() namespace := a.namespace a.mu.RUnlock() // Create cancellable context execCtx, cancel := context.WithCancel(ctx) defer cancel() // Track session for potential cancellation sessionID := generateSessionID() a.sessionsMu.Lock() a.activeSessions[sessionID] = cancel a.sessionsMu.Unlock() defer func() { a.sessionsMu.Lock() delete(a.activeSessions, sessionID) a.sessionsMu.Unlock() }() // Get pod name from project (passed via metadata or lookup) var podName string if req.Metadata != nil { podName = req.Metadata["pod_name"] } if podName == "" { return &domain.AgentResult{ SessionID: sessionID, ExitCode: 1, Error: fmt.Errorf("pod_name is required in request metadata"), }, nil } // Build kubectl exec command for Claude Code args := a.buildCommandArgs(namespace, podName, req) // Apply timeout if specified if req.Timeout > 0 { var timeoutCancel context.CancelFunc execCtx, timeoutCancel = context.WithTimeout(execCtx, req.Timeout) defer timeoutCancel() } startTime := time.Now() kubectl := exec.CommandContext(execCtx, "kubectl", args...) // Get stdout pipe for stream-json output stdout, err := kubectl.StdoutPipe() if err != nil { return &domain.AgentResult{ SessionID: sessionID, ExitCode: 1, Error: fmt.Errorf("stdout pipe: %w", err), }, nil } // Get stderr for error messages stderr, err := kubectl.StderrPipe() if err != nil { return &domain.AgentResult{ SessionID: sessionID, ExitCode: 1, Error: fmt.Errorf("stderr pipe: %w", err), }, nil } // Start the command if err := kubectl.Start(); err != nil { return &domain.AgentResult{ SessionID: sessionID, ExitCode: 1, Error: fmt.Errorf("start: %w", err), }, nil } // Stream and parse output var wg sync.WaitGroup var finalOutput strings.Builder var stderrOutput strings.Builder var parseErr error var resultMsg *StreamMessage wg.Add(2) // Parse stream-json from stdout go func() { defer wg.Done() resultMsg, parseErr = a.parseStreamOutput(stdout, handler, &finalOutput) }() // Stream stderr as error events and capture for error message go func() { defer wg.Done() a.streamStderrCapture(stderr, handler, &stderrOutput) }() wg.Wait() // Wait for command completion cmdErr := kubectl.Wait() duration := time.Since(startTime) result := &domain.AgentResult{ SessionID: sessionID, DurationMs: duration.Milliseconds(), FinalOutput: finalOutput.String(), } // Determine exit code and error if cmdErr != nil { if exitErr, ok := cmdErr.(*exec.ExitError); ok { result.ExitCode = exitErr.ExitCode() } else { result.ExitCode = 1 result.Error = cmdErr } // Include stderr and troubleshooting help in error result.Error = a.buildErrorWithHelp(result.Error, stderrOutput.String(), namespace, podName) } else if parseErr != nil { result.ExitCode = 1 result.Error = a.buildErrorWithHelp(parseErr, stderrOutput.String(), namespace, podName) } else if resultMsg != nil && !resultMsg.IsSuccess() { result.ExitCode = 1 if resultMsg.Error != "" { result.Error = a.buildErrorWithHelp(fmt.Errorf("%s", resultMsg.Error), stderrOutput.String(), namespace, podName) } else { result.Error = a.buildErrorWithHelp(nil, stderrOutput.String(), namespace, podName) } } // Update session ID from result if available if resultMsg != nil && resultMsg.SessionID != "" { result.SessionID = resultMsg.SessionID } return result, nil } // defaultAllowedTools is the list of tools to allow when running Claude Code // in automated mode. Using --allowedTools instead of --dangerously-skip-permissions // because the latter is blocked when running as root (which claudebox pods do). var defaultAllowedTools = []string{ "Bash", "Edit", "Write", "Read", "Glob", "Grep", "Task", "WebFetch", "WebSearch", } // buildCommandArgs constructs the kubectl exec arguments for Claude Code. // IMPORTANT: The prompt MUST come immediately after "claude" (before other flags) // because Claude Code's CLI parser expects the positional prompt argument early. func (a *Adapter) buildCommandArgs(namespace, podName string, req *domain.AgentRequest) []string { // Start with kubectl exec and the prompt right after "claude" // This is required because Claude Code's CLI doesn't accept the prompt at the end args := []string{ "exec", "-n", namespace, podName, "--", "claude", req.Prompt, // Prompt MUST come first after "claude" "-p", // Print mode (non-interactive) "--verbose", // Required for stream-json output "--output-format", "stream-json", } // Add session continuation if resuming if req.SessionID != "" { args = append(args, "--resume", req.SessionID) } // Add allowed tools - use request's tools if specified, otherwise use defaults. // This replaces --dangerously-skip-permissions which is blocked when running as root. allowedTools := req.AllowedTools if len(allowedTools) == 0 { allowedTools = defaultAllowedTools } for _, tool := range allowedTools { args = append(args, "--allowedTools", tool) } // Add working directory if specified if req.WorkingDir != "" && req.WorkingDir != "/workspace" { args = append(args, "--add-dir", req.WorkingDir) } return args } // parseStreamOutput reads and parses NDJSON stream-json output. func (a *Adapter) parseStreamOutput(r io.Reader, handler domain.AgentEventHandler, output *strings.Builder) (*StreamMessage, error) { scanner := bufio.NewScanner(r) // Increase buffer for long lines buf := make([]byte, 0, 64*1024) scanner.Buffer(buf, 1024*1024) var resultMsg *StreamMessage for scanner.Scan() { line := scanner.Bytes() if len(line) == 0 { continue } msg, err := ParseStreamMessage(line) if err != nil { // Non-JSON line, treat as plain output event := domain.AgentEvent{ Type: domain.AgentEventOutput, Timestamp: time.Now(), Content: string(line), Stream: "stdout", } handler(event) output.WriteString(string(line)) output.WriteString("\n") continue } // Convert to agent event and dispatch event := msg.ToAgentEvent() handler(event) // Collect output text if msg.Type == StreamMessageMessage && msg.Role == "assistant" { text := extractTextContent(msg.Content) if text != "" { output.WriteString(text) } } // Track result message if msg.IsTerminal() { resultMsg = msg } } if err := scanner.Err(); err != nil { return resultMsg, fmt.Errorf("scanner error: %w", err) } return resultMsg, nil } // streamStderrCapture reads stderr, emits error events, and captures output. func (a *Adapter) streamStderrCapture(r io.Reader, handler domain.AgentEventHandler, capture *strings.Builder) { scanner := bufio.NewScanner(r) buf := make([]byte, 0, 64*1024) scanner.Buffer(buf, 1024*1024) for scanner.Scan() { line := scanner.Text() if line == "" { continue } handler(domain.AgentEvent{ Type: domain.AgentEventError, Timestamp: time.Now(), Content: line, Stream: "stderr", }) // Capture stderr for error message (limit to 4KB) if capture.Len() < 4096 { if capture.Len() > 0 { capture.WriteString("\n") } capture.WriteString(line) } } } // buildErrorWithHelp creates an error message with stderr output and troubleshooting help. func (a *Adapter) buildErrorWithHelp(err error, stderr, namespace, podName string) error { var msg strings.Builder if err != nil { msg.WriteString(err.Error()) } else { msg.WriteString("claude command failed") } // Include stderr if available if stderr != "" { msg.WriteString("\n\nstderr:\n") msg.WriteString(stderr) } // Add troubleshooting help msg.WriteString("\n\n---\nTroubleshooting:\n") msg.WriteString("If Claude is not authenticated, run:\n") fmt.Fprintf(&msg, " kubectl exec -it -n %s %s -- claude login\n", namespace, podName) msg.WriteString("\nTo test Claude manually:\n") fmt.Fprintf(&msg, " kubectl exec -it -n %s %s -- claude -p \"hello\"\n", namespace, podName) return fmt.Errorf("%s", msg.String()) } // Cancel attempts to cancel a running session. func (a *Adapter) Cancel(ctx context.Context, sessionID string) error { a.sessionsMu.Lock() defer a.sessionsMu.Unlock() cancel, exists := a.activeSessions[sessionID] if !exists { return nil // Session not found is not an error } cancel() return nil } // Capabilities returns what this agent supports. func (a *Adapter) Capabilities() domain.AgentCapabilities { return domain.AgentCapabilities{ Provider: domain.AgentProviderClaudeCode, SupportsSessionContinuation: true, SupportsModelSelection: false, // Claude Code only uses Claude SupportsToolControl: true, SupportedModels: []string{"claude-sonnet-4-20250514", "claude-opus-4-20250514"}, DefaultModel: "claude-sonnet-4-20250514", MaxPromptLength: 0, // Unlimited SupportsStreaming: true, } } // DefaultAvailabilityTimeout is the maximum time to wait when checking agent availability. // This timeout prevents blocking the caller when kubectl or the cluster is slow or unresponsive. const DefaultAvailabilityTimeout = 5 * time.Second // Available checks if kubectl is available and can connect to the cluster. func (a *Adapter) Available(ctx context.Context) bool { ctx, cancel := context.WithTimeout(ctx, DefaultAvailabilityTimeout) defer cancel() cmd := exec.CommandContext(ctx, "kubectl", "cluster-info", "--request-timeout=5s") return cmd.Run() == nil } // sessionCounter is used to ensure unique session IDs. var sessionCounter atomic.Uint64 // generateSessionID creates a unique session identifier. func generateSessionID() string { count := sessionCounter.Add(1) return fmt.Sprintf("claude-%d-%d", time.Now().UnixNano(), count) }