rdev/internal/worker/build_executor.go
jordan cefc15aa7d
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
fix(worker): include stdout in error messages when Claude command fails
Auth errors like "OAuth token has expired" were lost because Claude writes
them to stdout, not stderr. The error message only showed kubectl's generic
"command terminated with exit code 1". Now includes both stdout and stderr
in the error, making failures immediately diagnosable.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-10 17:55:46 -07:00

407 lines
12 KiB
Go

package worker
import (
"context"
"fmt"
"strings"
"time"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/logging"
"github.com/orchard9/rdev/internal/port"
)
// Event type names published on a build's SSE stream.
const (
	// Terminal and lifecycle markers for a build.
	BuildEventStarted   = "build.started"
	BuildEventCompleted = "build.completed"
	BuildEventFailed    = "build.failed"

	// Progress events emitted while the build is running.
	BuildEventOutput     = "build.output"
	BuildEventToolUse    = "build.tool_use"
	BuildEventToolResult = "build.tool_result"
	BuildEventError      = "build.error"
)
// BuildExecutor handles WorkTaskTypeBuild tasks.
//
// It reads build fields out of the work task's Spec map, turns them into an
// AgentRequest, runs that request through a CodeAgent, and reports the outcome
// as a BuildResult.
type BuildExecutor struct {
	// agentRegistry supplies the code agent used to run the build.
	agentRegistry port.CodeAgentRegistry

	// podGitOps runs git operations inside the pod (clone before the build,
	// commit/push after it). Git steps are skipped when it is nil.
	podGitOps *PodGitOperations

	// streams publishes real-time SSE events. Events are dropped when it is nil.
	streams port.StreamPublisher

	// defaultPodName is the claudebox pod used when the task spec names none.
	defaultPodName string

	// namespace is the Kubernetes namespace of the pod.
	namespace string
}
// BuildExecutorConfig carries the settings NewBuildExecutor copies into a
// BuildExecutor.
type BuildExecutorConfig struct {
	// DefaultPodName is the pod Claude Code executes in when a task does not
	// specify one (e.g., "claudebox-0").
	DefaultPodName string

	// Namespace is the Kubernetes namespace of that pod (e.g., "rdev").
	Namespace string
}
// NewBuildExecutor wires a BuildExecutor from its collaborators.
//
// A nil cfg selects the built-in defaults (pod "claudebox-0", namespace
// "rdev"). A non-nil cfg is used exactly as given; empty fields are not
// replaced with defaults.
func NewBuildExecutor(
	agentRegistry port.CodeAgentRegistry,
	podGitOps *PodGitOperations,
	streams port.StreamPublisher,
	cfg *BuildExecutorConfig,
) *BuildExecutor {
	podName, namespace := "claudebox-0", "rdev"
	if cfg != nil {
		podName, namespace = cfg.DefaultPodName, cfg.Namespace
	}

	executor := new(BuildExecutor)
	executor.agentRegistry = agentRegistry
	executor.podGitOps = podGitOps
	executor.streams = streams
	executor.defaultPodName = podName
	executor.namespace = namespace
	return executor
}
// Execute runs a build task by translating its spec into an agent call.
//
// Steps, in order:
//  1. Publish build.started on the stream keyed by task.ID.
//  2. Parse task.Spec; pick the default code agent and the target pod
//     (spec "pod_name", falling back to the executor's default pod).
//  3. If auto_commit or auto_push is set and git operations are configured,
//     clone/update the repository in the pod (git_clone_url is then required).
//  4. Run the agent, relaying its events to the stream and buffering output
//     and error events (capped at 1MB) for the result.
//  5. If the agent succeeded and auto_commit is set, commit (and optionally
//     push) the changes; a git failure turns the build into a failure.
//  6. Publish build.completed or build.failed and schedule the stream close.
//
// Failures are reported through the returned BuildResult (Success=false with
// Error set); Execute has no error return. Every exit path publishes a
// terminal event and schedules the stream for closing, so a stream that
// received build.started is never left open.
func (b *BuildExecutor) Execute(ctx context.Context, task *domain.WorkTask) *domain.BuildResult {
	log := logging.FromContext(ctx).WithWorker("build-executor")
	start := time.Now()
	streamID := task.ID // Use task ID as stream ID for SSE

	// fail is the single exit for failures that happen before a full result
	// exists: it publishes build.failed, schedules the stream close, and
	// returns the failed result. output carries whatever was buffered so far.
	fail := func(msg, output string) *domain.BuildResult {
		elapsed := time.Since(start).Milliseconds()
		b.publishEvent(streamID, BuildEventFailed, map[string]any{
			"task_id":     task.ID,
			"error":       msg,
			"duration_ms": elapsed,
		})
		b.closeStream(ctx, streamID)
		return &domain.BuildResult{
			Success:    false,
			Error:      msg,
			Output:     output,
			DurationMs: elapsed,
		}
	}

	// Publish BuildEventStarted event
	b.publishEvent(streamID, BuildEventStarted, map[string]any{
		"task_id":    task.ID,
		"project_id": task.ProjectID,
		"started_at": start.Format(time.RFC3339),
	})

	spec, err := b.parseSpec(task.Spec)
	if err != nil {
		return fail(fmt.Sprintf("invalid build spec: %v", err), "")
	}

	// Working directory in the pod where the project repo is cloned
	workDir := "/workspace"

	// Get a code agent
	agent := b.agentRegistry.Default()
	if agent == nil {
		return fail("no code agent available", "")
	}

	// Determine which pod to execute in (from task spec or default)
	podName, _ := task.Spec["pod_name"].(string)
	if podName == "" {
		podName = b.defaultPodName
	}

	// Clone or update the git repository if git operations are needed.
	// This ensures the workspace is a valid git repo before the agent runs.
	if (spec.AutoCommit || spec.AutoPush) && b.podGitOps != nil {
		if spec.GitCloneURL == "" {
			return fail("git_clone_url is required when auto_commit or auto_push is enabled", "")
		}

		log.Info("ensuring git repository is ready",
			"task_id", task.ID,
			"pod", podName,
			"workDir", workDir,
		)

		cloneResult := b.podGitOps.CloneRepo(ctx, podName, workDir, spec.GitCloneURL)
		if cloneResult.Error != nil {
			return fail(fmt.Sprintf("git clone failed: %v", cloneResult.Error), "")
		}
		if cloneResult.Cloned {
			b.publishEvent(streamID, BuildEventOutput, map[string]any{
				"content": fmt.Sprintf("Cloned repository to %s", workDir),
			})
		}
	}

	// Build the agent request with pod metadata for Claude Code adapter
	agentReq := &domain.AgentRequest{
		Prompt:     spec.Prompt,
		ProjectID:  domain.ProjectID(task.ProjectID),
		WorkingDir: workDir,
		Timeout:    10 * time.Minute,
		Metadata: map[string]string{
			"pod_name":  podName,
			"namespace": b.namespace,
		},
	}

	// Collect output with a size cap to prevent OOM on verbose builds.
	// NOTE(review): the buffer is not synchronized; this assumes the agent
	// invokes the callback from one goroutine at a time — confirm.
	const maxOutputSize = 1 << 20 // 1MB
	var (
		outputBuilder   strings.Builder
		outputTruncated bool // set once the truncation marker has been written
	)

	log.Info("executing build via agent",
		"task_id", task.ID,
		"project_id", task.ProjectID,
		"agent", agent.Name(),
		"work_dir", workDir,
	)

	// Execute the agent
	agentResult, err := agent.Execute(ctx, agentReq, func(event domain.AgentEvent) {
		// Publish all agent events to the SSE stream
		eventType := BuildEventOutput
		switch event.Type {
		case domain.AgentEventToolUse:
			eventType = BuildEventToolUse
		case domain.AgentEventToolResult:
			eventType = BuildEventToolResult
		case domain.AgentEventError:
			eventType = BuildEventError
		}
		b.publishEvent(streamID, eventType, map[string]any{
			"content":   event.Content,
			"stream":    event.Stream,
			"tool_name": event.ToolName,
		})

		// Also buffer output for final result
		if event.Type != domain.AgentEventOutput && event.Type != domain.AgentEventError {
			return
		}
		if outputTruncated {
			return // Output cap reached, discard further output
		}
		if n := outputBuilder.Len(); n > 0 && n < maxOutputSize {
			outputBuilder.WriteString("\n")
		}
		remaining := maxOutputSize - outputBuilder.Len()
		if len(event.Content) > remaining {
			// Write what fits, then mark the cut exactly once. This also
			// covers the case where earlier output filled the buffer exactly
			// (remaining == 0), which would otherwise be cut off silently.
			outputBuilder.WriteString(event.Content[:remaining])
			outputBuilder.WriteString("\n... [output truncated at 1MB]")
			outputTruncated = true
			return
		}
		outputBuilder.WriteString(event.Content)
	})
	if err != nil {
		return fail(fmt.Sprintf("agent execution failed: %v", err), outputBuilder.String())
	}

	// Use streamed output, but fall back to agent's captured output if streaming missed it
	output := outputBuilder.String()
	if output == "" && agentResult.FinalOutput != "" {
		output = agentResult.FinalOutput
	}

	result := &domain.BuildResult{
		Success:    agentResult.Success(),
		Output:     output,
		DurationMs: time.Since(start).Milliseconds(),
		Artifacts:  make(map[string]string),
	}

	// Include SDLC context in artifacts for callback routing
	if spec.SDLCContext != nil {
		if spec.SDLCContext.Feature != "" {
			result.Artifacts["sdlc_feature"] = spec.SDLCContext.Feature
		}
		if spec.SDLCContext.ArtifactType != "" {
			result.Artifacts["sdlc_artifact_type"] = spec.SDLCContext.ArtifactType
		}
		if spec.SDLCContext.TaskID != "" {
			result.Artifacts["sdlc_task_id"] = spec.SDLCContext.TaskID
		}
	}

	if !agentResult.Success() {
		errMsg := "agent returned non-zero exit code"
		if agentResult.Error != nil {
			errMsg = agentResult.Error.Error()
		}
		result.Error = errMsg
	}

	// Post-build git operations: commit and push changes programmatically.
	// This is deterministic - we don't rely on the LLM to run git commands.
	if result.Success && spec.AutoCommit && b.podGitOps != nil {
		commitMsg := fmt.Sprintf("build: %s", truncate(spec.Prompt, 72))
		gitResult := b.podGitOps.CommitAndPush(ctx, podName, workDir, commitMsg, spec.AutoPush)
		switch {
		case gitResult.Error != nil:
			log.Warn("post-build git operations failed",
				"task_id", task.ID,
				"error", gitResult.Error,
			)
			result.Success = false
			result.Error = fmt.Sprintf("build succeeded but git operations failed: %v", gitResult.Error)
		case gitResult.HasChanges:
			result.CommitSHA = gitResult.CommitSHA
			result.FilesChanged = gitResult.FilesChanged
			log.Info("post-build git operations completed",
				"task_id", task.ID,
				"commit", gitResult.CommitSHA,
				"files", len(gitResult.FilesChanged),
				"pushed", gitResult.Pushed,
			)
		default:
			log.Info("no changes to commit after build",
				"task_id", task.ID,
			)
		}
	}

	// Publish completion event
	if result.Success {
		b.publishEvent(streamID, BuildEventCompleted, map[string]any{
			"task_id":       task.ID,
			"success":       true,
			"commit_sha":    result.CommitSHA,
			"files_changed": result.FilesChanged,
			"duration_ms":   result.DurationMs,
		})
	} else {
		b.publishEvent(streamID, BuildEventFailed, map[string]any{
			"task_id":     task.ID,
			"error":       result.Error,
			"duration_ms": result.DurationMs,
		})
	}

	b.closeStream(ctx, streamID)
	return result
}
// publishEvent sends one event of the given type, with data as its payload,
// to the stream identified by streamID. It is a no-op when the executor has no
// stream publisher.
func (b *BuildExecutor) publishEvent(streamID, eventType string, data map[string]any) {
	if b.streams != nil {
		evt := port.StreamEvent{Type: eventType, Data: data}
		b.streams.Publish(streamID, evt)
	}
}
// streamCloseDelay is how long a finished build's stream stays open before it
// is closed, giving SSE clients time to receive the final events.
const streamCloseDelay time.Duration = 5 * time.Second
// closeStream schedules the stream identified by streamID to be closed in the
// background and returns immediately. The stream is closed once
// streamCloseDelay has elapsed or ctx is done, whichever happens first. It is
// a no-op when the executor has no stream publisher.
func (b *BuildExecutor) closeStream(ctx context.Context, streamID string) {
	if b.streams == nil {
		return
	}

	go func() {
		// Either the grace period runs out (final events had time to reach
		// clients) or the context is cancelled; both end in the same close.
		select {
		case <-ctx.Done():
		case <-time.After(streamCloseDelay):
		}
		b.streams.Close(streamID)
	}()
}
// parsedBuildSpec is the typed view of a build task's spec map, as produced by
// parseSpec.
type parsedBuildSpec struct {
	// Prompt is the spec's "prompt" value; parseSpec rejects an empty one.
	Prompt string

	// AutoCommit and AutoPush mirror the spec's "auto_commit" and "auto_push"
	// booleans.
	AutoCommit bool
	AutoPush   bool

	// GitCloneURL is the spec's "git_clone_url" value.
	GitCloneURL string

	// SDLCContext carries SDLC-specific context for callback routing. It is
	// nil when the spec has no "sdlc_context" map.
	SDLCContext *sdlcContext
}
// sdlcContext is the SDLC-specific portion of a task spec, read from the
// spec's "sdlc_context" map.
type sdlcContext struct {
	// Feature is the "feature" entry.
	Feature string

	// ArtifactType is the "artifact_type" entry.
	ArtifactType string

	// TaskID is the "task_id" entry.
	TaskID string
}
// parseSpec converts the generic task spec map into a parsedBuildSpec.
//
// "prompt" must be a non-empty string, otherwise an error is returned.
// "auto_commit", "auto_push" and "git_clone_url" are optional; a missing or
// wrongly typed value yields the zero value. SDLCContext is populated only
// when "sdlc_context" is a map[string]any.
func (b *BuildExecutor) parseSpec(spec map[string]any) (*parsedBuildSpec, error) {
	prompt := stringFromMap(spec, "prompt")
	if prompt == "" {
		return nil, fmt.Errorf("prompt is required")
	}

	out := &parsedBuildSpec{
		Prompt:      prompt,
		GitCloneURL: stringFromMap(spec, "git_clone_url"),
	}
	// A failed type assertion leaves the flag false.
	out.AutoCommit, _ = spec["auto_commit"].(bool)
	out.AutoPush, _ = spec["auto_push"].(bool)

	// Extract SDLC context if present
	raw, ok := spec["sdlc_context"].(map[string]any)
	if !ok {
		return out, nil
	}
	out.SDLCContext = &sdlcContext{
		Feature:      stringFromMap(raw, "feature"),
		ArtifactType: stringFromMap(raw, "artifact_type"),
		TaskID:       stringFromMap(raw, "task_id"),
	}
	return out, nil
}
// stringFromMap returns m[key] when it holds a string, and "" when the key is
// absent or its value has any other type. A nil map is treated as empty.
func stringFromMap(m map[string]any, key string) string {
	// The failed-assertion form yields the zero value, which is already "".
	s, _ := m[key].(string)
	return s
}
// truncate shortens s so the result is at most maxLen bytes.
//
// Strings that already fit are returned unchanged. Otherwise s is cut and,
// when maxLen > 3, "..." is appended (the ellipsis counts toward maxLen). The
// cut always lands on a rune boundary, so a multi-byte UTF-8 character is
// dropped whole rather than split; the result may therefore be a few bytes
// shorter than maxLen. A maxLen <= 0 yields "".
func truncate(s string, maxLen int) string {
	if maxLen <= 0 {
		// Guards the slice expression below against a negative bound.
		return ""
	}
	if len(s) <= maxLen {
		return s
	}

	limit, suffix := maxLen, ""
	if maxLen > 3 {
		limit, suffix = maxLen-3, "..."
	}

	// Ranging over a string yields the byte offset at which each rune starts;
	// keep the last such offset that does not exceed the byte budget.
	cut := 0
	for i := range s {
		if i > limit {
			break
		}
		cut = i
	}
	return s[:cut] + suffix
}