rdev/internal/adapter/codeagent/claudecode/adapter.go
jordan c59d348040 chore: prepare for composable monorepo template implementation
This commit captures the current state before implementing the composable
monorepo template system. Key changes included:

Infrastructure:
- Add CockroachDB provisioner adapter for database provisioning
- Add Redis provisioner adapter for cache provisioning
- Add build events system with PostgreSQL storage
- Add WebSocket endpoint for real-time build progress

Code agent improvements:
- Fix Claude Code adapter to use default allowed tools instead of dangerously-skip-permissions
- Add context-aware stream closing for cancellation support
- Improve parser tests for edge cases

Build system:
- Add build event constants and metrics
- Remove deprecated git_operations.go (replaced by pod_git_operations.go)
- Add rollback logic for multi-step provisioning operations

Documentation:
- Add composable-monorepo feature documentation
- Add DNS/Cloudflare service documentation
- Update deployment and troubleshooting guides

Cookbooks:
- Add fullstack-app cookbook
- Refactor landing-test with shared library

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-31 11:39:28 -07:00

398 lines
11 KiB
Go

// Package claudecode provides a CodeAgent implementation for Anthropic's Claude Code CLI.
package claudecode
import (
"bufio"
"context"
"fmt"
"io"
"os/exec"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/port"
)
// Adapter implements port.CodeAgent using Anthropic's Claude Code CLI.
// The CLI is run inside a Kubernetes pod through `kubectl exec` (see Execute).
type Adapter struct {
	// namespace is the Kubernetes namespace passed to `kubectl exec -n`.
	namespace string

	// mu guards namespace; Execute reads it under RLock.
	mu sync.RWMutex

	// activeSessions maps the adapter-generated session ID of each in-flight
	// Execute call to the CancelFunc of its context, so Cancel can stop it.
	activeSessions map[string]context.CancelFunc

	// sessionsMu guards activeSessions.
	sessionsMu sync.Mutex
}
// NewAdapter returns an Adapter that runs Claude Code in pods of the given
// Kubernetes namespace, with an empty, ready-to-use session registry.
func NewAdapter(namespace string) *Adapter {
	adapter := &Adapter{namespace: namespace}
	adapter.activeSessions = make(map[string]context.CancelFunc)
	return adapter
}

// Compile-time check that *Adapter satisfies port.CodeAgent.
var _ port.CodeAgent = (*Adapter)(nil)
// Name reports the display name of this agent, "Claude Code".
func (a *Adapter) Name() string {
	const displayName = "Claude Code"
	return displayName
}

// Provider identifies this adapter as domain.AgentProviderClaudeCode.
func (a *Adapter) Provider() domain.AgentProvider {
	provider := domain.AgentProviderClaudeCode
	return provider
}
// Execute runs a Claude Code command inside a Kubernetes pod via
// `kubectl exec` and streams parsed events to handler while it runs.
//
// The target pod is read from req.Metadata["pod_name"]; the namespace is the
// one the Adapter was created with. A nil request or an empty prompt is
// returned as an error. All other failures (missing pod name, pipe or start
// failures, a failing kubectl/claude process, a stream read error, or an
// unsuccessful terminal result message) are reported through
// AgentResult.Error with a non-zero ExitCode, and the returned error is nil.
//
// handler is invoked from two goroutines (stdout parsing and stderr capture),
// so it may be called concurrently.
func (a *Adapter) Execute(ctx context.Context, req *domain.AgentRequest, handler domain.AgentEventHandler) (*domain.AgentResult, error) {
	if req == nil {
		return nil, fmt.Errorf("request is required")
	}
	if req.Prompt == "" {
		return nil, fmt.Errorf("prompt is required")
	}

	a.mu.RLock()
	namespace := a.namespace
	a.mu.RUnlock()

	// Cancellable context so Cancel can stop this run.
	execCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Register the run for Cancel; the entry is removed when Execute returns.
	// NOTE(review): Cancel is keyed by this adapter-generated ID, but the
	// returned SessionID may be replaced by Claude's own session ID (below) and
	// is only visible after the run has ended — confirm how callers obtain it.
	sessionID := generateSessionID()
	a.sessionsMu.Lock()
	a.activeSessions[sessionID] = cancel
	a.sessionsMu.Unlock()
	defer func() {
		a.sessionsMu.Lock()
		delete(a.activeSessions, sessionID)
		a.sessionsMu.Unlock()
	}()

	// fail builds the result for failures that happen before the command runs.
	fail := func(err error) (*domain.AgentResult, error) {
		return &domain.AgentResult{SessionID: sessionID, ExitCode: 1, Error: err}, nil
	}

	// The pod must be supplied by the caller through request metadata.
	var podName string
	if req.Metadata != nil {
		podName = req.Metadata["pod_name"]
	}
	if podName == "" {
		return fail(fmt.Errorf("pod_name is required in request metadata"))
	}

	args := a.buildCommandArgs(namespace, podName, req)

	// Apply timeout if specified.
	if req.Timeout > 0 {
		var timeoutCancel context.CancelFunc
		execCtx, timeoutCancel = context.WithTimeout(execCtx, req.Timeout)
		defer timeoutCancel()
	}

	startTime := time.Now()
	kubectl := exec.CommandContext(execCtx, "kubectl", args...)

	stdout, err := kubectl.StdoutPipe() // stream-json output
	if err != nil {
		return fail(fmt.Errorf("stdout pipe: %w", err))
	}
	stderr, err := kubectl.StderrPipe() // diagnostics, surfaced as error events
	if err != nil {
		return fail(fmt.Errorf("stderr pipe: %w", err))
	}
	if err := kubectl.Start(); err != nil {
		return fail(fmt.Errorf("start: %w", err))
	}

	var (
		wg           sync.WaitGroup
		finalOutput  strings.Builder
		stderrOutput strings.Builder
		parseErr     error
		resultMsg    *StreamMessage
	)
	wg.Add(2)
	go func() {
		defer wg.Done()
		resultMsg, parseErr = a.parseStreamOutput(stdout, handler, &finalOutput)
		// The parser stops early on scanner errors (e.g. an over-long line).
		// Drain what is left so kubectl never blocks on a full pipe, which
		// would make Wait below hang.
		_, _ = io.Copy(io.Discard, stdout)
	}()
	go func() {
		defer wg.Done()
		a.streamStderrCapture(stderr, handler, &stderrOutput)
		_, _ = io.Copy(io.Discard, stderr) // same reasoning as for stdout
	}()
	// exec.Cmd requires all reads from the pipes to finish before Wait.
	wg.Wait()

	cmdErr := kubectl.Wait()
	result := &domain.AgentResult{
		SessionID:   sessionID,
		DurationMs:  time.Since(startTime).Milliseconds(),
		FinalOutput: finalOutput.String(),
	}
	stderrText := stderrOutput.String()

	switch {
	case cmdErr != nil:
		var baseErr error
		if exitErr, ok := cmdErr.(*exec.ExitError); ok {
			result.ExitCode = exitErr.ExitCode()
		} else {
			result.ExitCode = 1
			baseErr = cmdErr
		}
		// When our context ended (timeout, parent cancellation or Cancel),
		// CommandContext killed kubectl; say so instead of a generic failure.
		if ctxErr := execCtx.Err(); ctxErr != nil {
			baseErr = fmt.Errorf("claude command stopped: %w", ctxErr)
		}
		result.Error = a.buildErrorWithHelp(baseErr, stderrText, namespace, podName)
	case parseErr != nil:
		result.ExitCode = 1
		result.Error = a.buildErrorWithHelp(parseErr, stderrText, namespace, podName)
	case resultMsg != nil && !resultMsg.IsSuccess():
		result.ExitCode = 1
		var msgErr error
		if resultMsg.Error != "" {
			msgErr = fmt.Errorf("%s", resultMsg.Error)
		}
		result.Error = a.buildErrorWithHelp(msgErr, stderrText, namespace, podName)
	}

	// Prefer the session ID Claude reported in its terminal message, if any.
	if resultMsg != nil && resultMsg.SessionID != "" {
		result.SessionID = resultMsg.SessionID
	}
	return result, nil
}
// defaultAllowedTools lists the Claude Code tools that are pre-approved (one
// --allowedTools flag each) when a request does not name its own set.
//
// --allowedTools is used instead of --dangerously-skip-permissions because the
// latter is blocked when running as root, which claudebox pods do.
var defaultAllowedTools = []string{
	"Bash",
	"Edit",
	"Write",
	"Read",
	"Glob",
	"Grep",
	"Task",
	"WebFetch",
	"WebSearch",
}
// buildCommandArgs assembles the `kubectl exec` argument vector that runs
// Claude Code non-interactively in podName within namespace:
//
//	exec -n <ns> <pod> -- claude <prompt> -p --verbose --output-format stream-json
//	    [--resume <id>] --allowedTools <tool>... [--add-dir <dir>]
//
// IMPORTANT: the prompt is placed immediately after "claude" because Claude
// Code's CLI parser expects the positional prompt early and does not accept it
// at the end of the argument list.
func (a *Adapter) buildCommandArgs(namespace, podName string, req *domain.AgentRequest) []string {
	// Request-specific tools win; otherwise fall back to the defaults. These
	// flags replace --dangerously-skip-permissions, which is blocked as root.
	tools := defaultAllowedTools
	if len(req.AllowedTools) > 0 {
		tools = req.AllowedTools
	}

	cmd := []string{"exec", "-n", namespace, podName, "--", "claude"}
	// Prompt first, then print mode (-p) and stream-json output; --verbose is
	// required for stream-json.
	cmd = append(cmd, req.Prompt, "-p", "--verbose", "--output-format", "stream-json")

	if req.SessionID != "" {
		cmd = append(cmd, "--resume", req.SessionID)
	}
	for _, tool := range tools {
		cmd = append(cmd, "--allowedTools", tool)
	}
	// "/workspace" is never passed explicitly — presumably it is already the
	// pod's working directory; confirm against the pod image.
	if dir := req.WorkingDir; dir != "" && dir != "/workspace" {
		cmd = append(cmd, "--add-dir", dir)
	}
	return cmd
}
// parseStreamOutput consumes Claude Code's stream-json output (one JSON value
// per line), dispatching each line to handler and accumulating readable text
// into output.
//
// Lines that ParseStreamMessage rejects are forwarded verbatim as stdout
// output events and appended to output followed by a newline. For parsed
// messages, the event from ToAgentEvent is dispatched and the text content of
// assistant messages is appended to output. The last message for which
// IsTerminal reports true is returned.
//
// Lines longer than 1 MiB make the scanner fail. On any read error the rest of
// r is drained and discarded, so the writing process is never left blocked on
// a full pipe. The error is returned together with any terminal message seen
// so far.
func (a *Adapter) parseStreamOutput(r io.Reader, handler domain.AgentEventHandler, output *strings.Builder) (*StreamMessage, error) {
	scanner := bufio.NewScanner(r)
	// Start with 64 KiB and allow lines up to 1 MiB.
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)

	var resultMsg *StreamMessage
	for scanner.Scan() {
		line := scanner.Bytes()
		if len(line) == 0 {
			continue
		}

		msg, err := ParseStreamMessage(line)
		if err != nil {
			// Non-JSON line: treat as plain output.
			text := string(line)
			handler(domain.AgentEvent{
				Type:      domain.AgentEventOutput,
				Timestamp: time.Now(),
				Content:   text,
				Stream:    "stdout",
			})
			output.WriteString(text)
			output.WriteByte('\n')
			continue
		}

		handler(msg.ToAgentEvent())

		if msg.Type == StreamMessageMessage && msg.Role == "assistant" {
			if text := extractTextContent(msg.Content); text != "" {
				output.WriteString(text)
			}
		}
		if msg.IsTerminal() {
			resultMsg = msg
		}
	}

	if err := scanner.Err(); err != nil {
		// The scanner stops at the first error; drain the rest so the
		// producer does not block writing to a pipe nobody reads.
		_, _ = io.Copy(io.Discard, r)
		return resultMsg, fmt.Errorf("scanner error: %w", err)
	}
	return resultMsg, nil
}
// streamStderrCapture reads r line by line and forwards every non-empty line
// to handler as a stderr error event. It also keeps at most 4 KiB of those
// lines (newline-separated) in capture for use in error messages.
//
// Lines beyond the budget are still forwarded but not captured. A line that
// crosses the budget is cut to fit, and any rune split by the cut is dropped.
//
// Lines longer than 1 MiB make the scanner fail. On any read error the failure
// is reported to handler as one more stderr event and the rest of r is
// discarded, so the writing process is never left blocked on a full pipe.
func (a *Adapter) streamStderrCapture(r io.Reader, handler domain.AgentEventHandler, capture *strings.Builder) {
	// Maximum number of bytes of stderr retained in capture.
	const captureLimit = 4096

	scanner := bufio.NewScanner(r)
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)

	for scanner.Scan() {
		line := scanner.Text()
		if line == "" {
			continue
		}

		handler(domain.AgentEvent{
			Type:      domain.AgentEventError,
			Timestamp: time.Now(),
			Content:   line,
			Stream:    "stderr",
		})

		// Enforce the byte budget, including the newline separator.
		sep := 0
		if capture.Len() > 0 {
			sep = 1
		}
		room := captureLimit - capture.Len() - sep
		if room <= 0 {
			continue
		}
		if sep == 1 {
			capture.WriteByte('\n')
		}
		if len(line) > room {
			line = strings.ToValidUTF8(line[:room], "")
		}
		capture.WriteString(line)
	}

	if err := scanner.Err(); err != nil {
		// Surface the read failure instead of silently losing the rest of
		// stderr, then drain so the producer cannot block on a full pipe.
		handler(domain.AgentEvent{
			Type:      domain.AgentEventError,
			Timestamp: time.Now(),
			Content:   fmt.Sprintf("stderr read error: %v", err),
			Stream:    "stderr",
		})
		_, _ = io.Copy(io.Discard, r)
	}
}
// buildErrorWithHelp returns an error whose message has three parts:
//   - err's message, or "claude command failed" when err is nil
//   - the captured stderr text, when it is non-empty
//   - kubectl commands for logging in to and manually testing Claude in
//     podName/namespace
//
// When err is non-nil it is wrapped with %w. Callers can therefore still match
// it with errors.Is/errors.As, for example against context.DeadlineExceeded.
func (a *Adapter) buildErrorWithHelp(err error, stderr, namespace, podName string) error {
	var help strings.Builder

	// Include stderr if available.
	if stderr != "" {
		help.WriteString("\n\nstderr:\n")
		help.WriteString(stderr)
	}

	// Troubleshooting hints.
	help.WriteString("\n\n---\nTroubleshooting:\n")
	help.WriteString("If Claude is not authenticated, run:\n")
	fmt.Fprintf(&help, " kubectl exec -it -n %s %s -- claude login\n", namespace, podName)
	help.WriteString("\nTo test Claude manually:\n")
	fmt.Fprintf(&help, " kubectl exec -it -n %s %s -- claude -p \"hello\"\n", namespace, podName)

	if err == nil {
		return fmt.Errorf("claude command failed%s", help.String())
	}
	return fmt.Errorf("%w%s", err, help.String())
}
// Cancel cancels the context of the in-flight Execute call registered under
// sessionID. An unknown or already-finished session is not an error, so the
// result is always nil. ctx is not used.
func (a *Adapter) Cancel(ctx context.Context, sessionID string) error {
	a.sessionsMu.Lock()
	defer a.sessionsMu.Unlock()

	if stop, found := a.activeSessions[sessionID]; found {
		stop()
	}
	return nil
}
// Capabilities describes what the Claude Code agent supports: streaming,
// session continuation and tool control, but not model selection.
func (a *Adapter) Capabilities() domain.AgentCapabilities {
	const (
		sonnet = "claude-sonnet-4-20250514"
		opus   = "claude-opus-4-20250514"
	)
	return domain.AgentCapabilities{
		Provider:                    domain.AgentProviderClaudeCode,
		DefaultModel:                sonnet,
		SupportedModels:             []string{sonnet, opus},
		SupportsStreaming:           true,
		SupportsSessionContinuation: true,
		SupportsToolControl:         true,
		SupportsModelSelection:      false, // Claude Code only uses Claude
		MaxPromptLength:             0,     // Unlimited
	}
}
// DefaultAvailabilityTimeout is the maximum time to wait when checking agent availability.
// This timeout prevents blocking the caller when kubectl or the cluster is slow or unresponsive.
const DefaultAvailabilityTimeout = 5 * time.Second

// Available reports whether kubectl can reach the cluster. It runs
// `kubectl cluster-info` and bounds it by DefaultAvailabilityTimeout. It does
// not check that any pod or the claude binary exists.
func (a *Adapter) Available(ctx context.Context) bool {
	ctx, cancel := context.WithTimeout(ctx, DefaultAvailabilityTimeout)
	defer cancel()

	// Derive kubectl's per-request timeout from the same constant so the two
	// limits cannot drift apart ("--request-timeout=5s" for the current value).
	timeoutFlag := "--request-timeout=" + DefaultAvailabilityTimeout.String()
	return exec.CommandContext(ctx, "kubectl", "cluster-info", timeoutFlag).Run() == nil
}
// sessionCounter is a process-wide sequence number included in every generated
// session ID, so that IDs minted in the same nanosecond still differ.
var sessionCounter atomic.Uint64

// generateSessionID returns an adapter-local session identifier of the form
// "claude-<unix-nanoseconds>-<sequence>".
func generateSessionID() string {
	seq := sessionCounter.Add(1)
	nanos := time.Now().UnixNano()
	return fmt.Sprintf("claude-%d-%d", nanos, seq)
}