package worker import ( "context" "fmt" "strings" "time" "github.com/orchard9/rdev/internal/domain" "github.com/orchard9/rdev/internal/logging" "github.com/orchard9/rdev/internal/port" ) // Build event type constants for SSE streaming. const ( BuildEventStarted = "build.started" BuildEventOutput = "build.output" BuildEventCompleted = "build.completed" BuildEventFailed = "build.failed" BuildEventToolUse = "build.tool_use" BuildEventToolResult = "build.tool_result" BuildEventError = "build.error" ) // BuildExecutor handles WorkTaskTypeBuild tasks. // It translates BuildSpec fields from the work task's Spec map into an // AgentRequest, executes via a CodeAgent, and returns a BuildResult. type BuildExecutor struct { agentRegistry port.CodeAgentRegistry podGitOps *PodGitOperations // Post-build git operations (runs in pod) streams port.StreamPublisher // SSE stream publisher for real-time events defaultPodName string // Default claudebox pod for agent execution namespace string // Kubernetes namespace for the pod } // BuildExecutorConfig holds configuration for the build executor. type BuildExecutorConfig struct { DefaultPodName string // Default pod to execute Claude Code in (e.g., "claudebox-0") Namespace string // Kubernetes namespace (e.g., "rdev") } // NewBuildExecutor creates a new build executor. func NewBuildExecutor( agentRegistry port.CodeAgentRegistry, podGitOps *PodGitOperations, streams port.StreamPublisher, cfg *BuildExecutorConfig, ) *BuildExecutor { if cfg == nil { cfg = &BuildExecutorConfig{ DefaultPodName: "claudebox-0", Namespace: "rdev", } } return &BuildExecutor{ agentRegistry: agentRegistry, podGitOps: podGitOps, streams: streams, defaultPodName: cfg.DefaultPodName, namespace: cfg.Namespace, } } // Execute runs a build task by translating its spec into an agent call. 
func (b *BuildExecutor) Execute(ctx context.Context, task *domain.WorkTask) *domain.BuildResult { log := logging.FromContext(ctx).WithWorker("build-executor") start := time.Now() streamID := task.ID // Use task ID as stream ID for SSE // Publish BuildEventStarted event b.publishEvent(streamID, BuildEventStarted, map[string]any{ "task_id": task.ID, "project_id": task.ProjectID, "started_at": start.Format(time.RFC3339), }) spec, err := b.parseSpec(task.Spec) if err != nil { b.publishEvent(streamID, BuildEventFailed, map[string]any{ "task_id": task.ID, "error": fmt.Sprintf("invalid build spec: %v", err), }) return &domain.BuildResult{ Success: false, Error: fmt.Sprintf("invalid build spec: %v", err), DurationMs: time.Since(start).Milliseconds(), } } // Working directory in the pod where the project repo is cloned workDir := "/workspace" // Get a code agent agent := b.agentRegistry.Default() if agent == nil { return &domain.BuildResult{ Success: false, Error: "no code agent available", DurationMs: time.Since(start).Milliseconds(), } } // Determine which pod to execute in (from task spec or default) podName, _ := task.Spec["pod_name"].(string) if podName == "" { podName = b.defaultPodName } // Clone or update the git repository if git operations are needed. // This ensures the workspace is a valid git repo before the agent runs. 
if (spec.AutoCommit || spec.AutoPush) && b.podGitOps != nil { if spec.GitCloneURL == "" { b.publishEvent(streamID, BuildEventFailed, map[string]any{ "task_id": task.ID, "error": "git_clone_url is required when auto_commit or auto_push is enabled", }) return &domain.BuildResult{ Success: false, Error: "git_clone_url is required when auto_commit or auto_push is enabled", DurationMs: time.Since(start).Milliseconds(), } } log.Info("ensuring git repository is ready", "task_id", task.ID, "pod", podName, "workDir", workDir, ) cloneResult := b.podGitOps.CloneRepo(ctx, podName, workDir, spec.GitCloneURL) if cloneResult.Error != nil { b.publishEvent(streamID, BuildEventFailed, map[string]any{ "task_id": task.ID, "error": fmt.Sprintf("git clone failed: %v", cloneResult.Error), }) return &domain.BuildResult{ Success: false, Error: fmt.Sprintf("git clone failed: %v", cloneResult.Error), DurationMs: time.Since(start).Milliseconds(), } } if cloneResult.Cloned { b.publishEvent(streamID, BuildEventOutput, map[string]any{ "content": fmt.Sprintf("Cloned repository to %s", workDir), }) } // Reset to main to ensure clean workspace state. // Worker pods may be left on a feature branch from a previous task. if err := b.podGitOps.ResetToMain(ctx, podName, workDir); err != nil { log.Warn("failed to reset workspace to main, continuing", "task_id", task.ID, logging.FieldError, err, ) } } // Build the agent request with pod metadata for Claude Code adapter agentReq := &domain.AgentRequest{ Prompt: spec.Prompt, ProjectID: domain.ProjectID(task.ProjectID), WorkingDir: workDir, Timeout: 10 * time.Minute, Metadata: map[string]string{ "pod_name": podName, "namespace": b.namespace, }, } // Collect output with a size cap to prevent OOM on verbose builds. 
const maxOutputSize = 1 << 20 // 1MB var outputBuilder strings.Builder log.Info("executing build via agent", "task_id", task.ID, "project_id", task.ProjectID, "agent", agent.Name(), "work_dir", workDir, ) // Execute the agent agentResult, err := agent.Execute(ctx, agentReq, func(event domain.AgentEvent) { // Publish all agent events to the SSE stream eventType := BuildEventOutput switch event.Type { case domain.AgentEventToolUse: eventType = BuildEventToolUse case domain.AgentEventToolResult: eventType = BuildEventToolResult case domain.AgentEventError: eventType = BuildEventError } b.publishEvent(streamID, eventType, map[string]any{ "content": event.Content, "stream": event.Stream, "tool_name": event.ToolName, }) // Also buffer output for final result if event.Type == domain.AgentEventOutput || event.Type == domain.AgentEventError { if outputBuilder.Len() >= maxOutputSize { return // Output cap reached, discard further output } if outputBuilder.Len() > 0 { outputBuilder.WriteString("\n") } remaining := maxOutputSize - outputBuilder.Len() if len(event.Content) > remaining { outputBuilder.WriteString(event.Content[:remaining]) outputBuilder.WriteString("\n... 
[output truncated at 1MB]") } else { outputBuilder.WriteString(event.Content) } } }) if err != nil { b.publishEvent(streamID, BuildEventFailed, map[string]any{ "task_id": task.ID, "error": fmt.Sprintf("agent execution failed: %v", err), "duration_ms": time.Since(start).Milliseconds(), }) b.closeStream(ctx, streamID) return &domain.BuildResult{ Success: false, Error: fmt.Sprintf("agent execution failed: %v", err), Output: outputBuilder.String(), DurationMs: time.Since(start).Milliseconds(), } } // Use streamed output, but fall back to agent's captured output if streaming missed it output := outputBuilder.String() if output == "" && agentResult.FinalOutput != "" { output = agentResult.FinalOutput } result := &domain.BuildResult{ Success: agentResult.Success(), Output: output, DurationMs: time.Since(start).Milliseconds(), Artifacts: make(map[string]string), } // Include SDLC context in artifacts for callback routing if spec.SDLCContext != nil { if spec.SDLCContext.Feature != "" { result.Artifacts["sdlc_feature"] = spec.SDLCContext.Feature } if spec.SDLCContext.ArtifactType != "" { result.Artifacts["sdlc_artifact_type"] = spec.SDLCContext.ArtifactType } if spec.SDLCContext.TaskID != "" { result.Artifacts["sdlc_task_id"] = spec.SDLCContext.TaskID } } if !agentResult.Success() { errMsg := "agent returned non-zero exit code" if agentResult.Error != nil { errMsg = agentResult.Error.Error() } result.Error = errMsg } // Post-build git operations: commit and push changes programmatically. // This is deterministic - we don't rely on the LLM to run git commands. 
if result.Success && spec.AutoCommit && b.podGitOps != nil { commitMsg := fmt.Sprintf("build: %s", truncate(spec.Prompt, 72)) gitResult := b.podGitOps.CommitAndPush(ctx, podName, workDir, commitMsg, spec.AutoPush) if gitResult.Error != nil { log.Warn("post-build git operations failed", "task_id", task.ID, "error", gitResult.Error, ) result.Success = false result.Error = fmt.Sprintf("build succeeded but git operations failed: %v", gitResult.Error) } else if gitResult.HasChanges { result.CommitSHA = gitResult.CommitSHA result.FilesChanged = gitResult.FilesChanged log.Info("post-build git operations completed", "task_id", task.ID, "commit", gitResult.CommitSHA, "files", len(gitResult.FilesChanged), "pushed", gitResult.Pushed, ) } else { log.Info("no changes to commit after build", "task_id", task.ID, ) } } // Publish completion event if result.Success { b.publishEvent(streamID, BuildEventCompleted, map[string]any{ "task_id": task.ID, "success": true, "commit_sha": result.CommitSHA, "files_changed": result.FilesChanged, "duration_ms": result.DurationMs, }) } else { b.publishEvent(streamID, BuildEventFailed, map[string]any{ "task_id": task.ID, "error": result.Error, "duration_ms": result.DurationMs, }) } b.closeStream(ctx, streamID) return result } // publishEvent publishes an event to the SSE stream if a stream publisher is configured. func (b *BuildExecutor) publishEvent(streamID, eventType string, data map[string]any) { if b.streams == nil { return } b.streams.Publish(streamID, port.StreamEvent{ Type: eventType, Data: data, }) } // streamCloseDelay is the time to wait before closing a stream after build completion. // This allows SSE clients to receive final events before the stream is closed. const streamCloseDelay = 5 * time.Second // closeStream closes the stream after a delay to allow clients to receive final events. // The close is context-aware and respects cancellation. 
func (b *BuildExecutor) closeStream(ctx context.Context, streamID string) { if b.streams == nil { return } // Close stream after a short delay to ensure final events are delivered. // Use a goroutine with context awareness to avoid race conditions. go func() { select { case <-ctx.Done(): // Context cancelled, close immediately b.streams.Close(streamID) case <-time.After(streamCloseDelay): b.streams.Close(streamID) } }() } // parsedBuildSpec holds typed fields extracted from the task spec map. type parsedBuildSpec struct { Prompt string AutoCommit bool AutoPush bool GitCloneURL string // SDLCContext holds SDLC-specific context for callback routing. SDLCContext *sdlcContext } // sdlcContext holds SDLC-specific context extracted from the task spec. type sdlcContext struct { Feature string ArtifactType string TaskID string } // parseSpec extracts typed BuildSpec fields from the generic map[string]any. func (b *BuildExecutor) parseSpec(spec map[string]any) (*parsedBuildSpec, error) { prompt, _ := spec["prompt"].(string) if prompt == "" { return nil, fmt.Errorf("prompt is required") } autoCommit, _ := spec["auto_commit"].(bool) autoPush, _ := spec["auto_push"].(bool) gitCloneURL, _ := spec["git_clone_url"].(string) parsed := &parsedBuildSpec{ Prompt: prompt, AutoCommit: autoCommit, AutoPush: autoPush, GitCloneURL: gitCloneURL, } // Extract SDLC context if present if sdlcCtx, ok := spec["sdlc_context"].(map[string]any); ok { parsed.SDLCContext = &sdlcContext{ Feature: stringFromMap(sdlcCtx, "feature"), ArtifactType: stringFromMap(sdlcCtx, "artifact_type"), TaskID: stringFromMap(sdlcCtx, "task_id"), } } return parsed, nil } // stringFromMap safely extracts a string from a map[string]any. func stringFromMap(m map[string]any, key string) string { if v, ok := m[key].(string); ok { return v } return "" } // truncate shortens a string to maxLen, adding "..." if truncated. 
//
// maxLen is a budget in bytes, and the result never exceeds it. When maxLen is
// greater than 3 the last three bytes of the budget are spent on the "..."
// marker; for smaller budgets the string is cut without a marker. The cut is
// moved back to the nearest rune boundary so a multi-byte UTF-8 character is
// never split, which means the result can be a few bytes shorter than maxLen.
// A maxLen of zero or less yields the empty string.
func truncate(s string, maxLen int) string {
	if len(s) <= maxLen {
		return s
	}
	if maxLen <= 0 {
		// Guards the slice below against a negative bound.
		return ""
	}

	cut, suffix := maxLen, ""
	if maxLen > 3 {
		cut, suffix = maxLen-3, "..."
	}

	// cut <= maxLen < len(s), so s[cut] is in range. UTF-8 continuation bytes
	// have the form 10xxxxxx; while s[cut] is one, cutting here would split a
	// rune, so step back to the byte that starts it.
	for cut > 0 && s[cut]&0xC0 == 0x80 {
		cut--
	}
	return s[:cut] + suffix
}