This commit captures the current state before implementing the composable monorepo template system. Key changes included: Infrastructure: - Add CockroachDB provisioner adapter for database provisioning - Add Redis provisioner adapter for cache provisioning - Add build events system with PostgreSQL storage - Add WebSocket endpoint for real-time build progress Code agent improvements: - Fix Claude Code adapter to use default allowed tools instead of dangerously-skip-permissions - Add context-aware stream closing for cancellation support - Improve parser tests for edge cases Build system: - Add build event constants and metrics - Remove deprecated git_operations.go (replaced by pod_git_operations.go) - Add rollback logic for multi-step provisioning operations Documentation: - Add composable-monorepo feature documentation - Add DNS/Cloudflare service documentation - Update deployment and troubleshooting guides Cookbooks: - Add fullstack-app cookbook - Refactor landing-test with shared library Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
364 lines
11 KiB
Go
364 lines
11 KiB
Go
package worker
|
|
|
|
import (
	"context"
	"fmt"
	"log/slog"
	"strings"
	"time"
	"unicode/utf8"

	"github.com/orchard9/rdev/internal/domain"
	"github.com/orchard9/rdev/internal/port"
)
|
|
|
|
// Event type names for a build's SSE stream. The string values are what
// stream consumers see as the event type.
const (
	// BuildEventStarted is the event type for the start of a build.
	BuildEventStarted = "build.started"

	// BuildEventOutput is the event type for agent output text.
	BuildEventOutput = "build.output"

	// BuildEventCompleted is the event type for a successful build.
	BuildEventCompleted = "build.completed"

	// BuildEventFailed is the event type for a failed build.
	BuildEventFailed = "build.failed"

	// BuildEventToolUse is the event type for an agent tool invocation.
	BuildEventToolUse = "build.tool_use"

	// BuildEventToolResult is the event type for the result of a tool invocation.
	BuildEventToolResult = "build.tool_result"

	// BuildEventError is the event type for an agent-reported error.
	BuildEventError = "build.error"
)
|
|
|
|
// BuildExecutor handles WorkTaskTypeBuild tasks.
|
|
// It translates BuildSpec fields from the work task's Spec map into an
|
|
// AgentRequest, executes via a CodeAgent, and returns a BuildResult.
|
|
type BuildExecutor struct {
|
|
agentRegistry port.CodeAgentRegistry
|
|
podGitOps *PodGitOperations // Post-build git operations (runs in pod)
|
|
streams port.StreamPublisher // SSE stream publisher for real-time events
|
|
logger *slog.Logger
|
|
defaultPodName string // Default claudebox pod for agent execution
|
|
namespace string // Kubernetes namespace for the pod
|
|
}
|
|
|
|
// BuildExecutorConfig carries the settings for a BuildExecutor.
type BuildExecutorConfig struct {
	// DefaultPodName is the pod Claude Code executes in when a task does not
	// name one, e.g. "claudebox-0".
	DefaultPodName string

	// Namespace is the Kubernetes namespace of that pod, e.g. "rdev".
	Namespace string
}
|
|
|
|
// NewBuildExecutor creates a new build executor.
//
// logger may be nil, in which case slog.Default() is used. cfg may be nil;
// in addition, each empty field of a non-nil cfg falls back to its default
// individually (DefaultPodName "claudebox-0", Namespace "rdev"), so a
// partially filled config never leaves the executor targeting an unnamed pod.
// podGitOps and streams may be nil; Execute then skips git operations and
// event publishing respectively.
func NewBuildExecutor(
	agentRegistry port.CodeAgentRegistry,
	podGitOps *PodGitOperations,
	streams port.StreamPublisher,
	logger *slog.Logger,
	cfg *BuildExecutorConfig,
) *BuildExecutor {
	if logger == nil {
		logger = slog.Default()
	}

	podName, namespace := "claudebox-0", "rdev"
	if cfg != nil {
		if cfg.DefaultPodName != "" {
			podName = cfg.DefaultPodName
		}
		if cfg.Namespace != "" {
			namespace = cfg.Namespace
		}
	}

	return &BuildExecutor{
		agentRegistry:  agentRegistry,
		podGitOps:      podGitOps,
		streams:        streams,
		logger:         logger.With("component", "build-executor"),
		defaultPodName: podName,
		namespace:      namespace,
	}
}
|
|
|
|
// Execute runs a build task by translating its spec into an agent call.
//
// The task ID doubles as the SSE stream ID. Execute publishes BuildEventStarted
// first and exactly one terminal event (BuildEventCompleted or BuildEventFailed)
// last, then schedules the stream to close. This holds on every return path,
// including early failures (invalid spec, no code agent, missing clone URL,
// clone failure), so stream subscribers are never left waiting.
//
// Steps:
//  1. Parse the spec. If auto_commit or auto_push is set and git operations
//     are configured, clone/update the repository at /workspace in the pod.
//  2. Run the registry's default code agent in the pod, forwarding each agent
//     event to the stream and buffering output and error text (capped at 1MB)
//     into BuildResult.Output.
//  3. If the agent succeeded and auto_commit is set, commit the changes (and
//     push when auto_push is set) without involving the LLM.
//
// DurationMs covers the whole task, including any git clone/commit/push.
func (b *BuildExecutor) Execute(ctx context.Context, task *domain.WorkTask) *domain.BuildResult {
	start := time.Now()
	streamID := task.ID // Use task ID as stream ID for SSE

	b.publishEvent(streamID, BuildEventStarted, map[string]any{
		"task_id":    task.ID,
		"project_id": task.ProjectID,
		"started_at": start.Format(time.RFC3339),
	})

	// fail publishes the terminal BuildEventFailed event, schedules the stream
	// close, and returns the failed result. Every exit taken before an agent
	// result is available goes through here.
	fail := func(errMsg, output string) *domain.BuildResult {
		durationMs := time.Since(start).Milliseconds()
		b.publishEvent(streamID, BuildEventFailed, map[string]any{
			"task_id":     task.ID,
			"error":       errMsg,
			"duration_ms": durationMs,
		})
		b.closeStream(ctx, streamID)
		return &domain.BuildResult{
			Success:    false,
			Error:      errMsg,
			Output:     output,
			DurationMs: durationMs,
		}
	}

	spec, err := b.parseSpec(task.Spec)
	if err != nil {
		return fail(fmt.Sprintf("invalid build spec: %v", err), "")
	}

	// Working directory in the pod where the project repo is cloned.
	workDir := "/workspace"

	agent := b.agentRegistry.Default()
	if agent == nil {
		return fail("no code agent available", "")
	}

	// A task may pin a specific pod via spec["pod_name"]; otherwise use the default.
	podName, _ := task.Spec["pod_name"].(string)
	if podName == "" {
		podName = b.defaultPodName
	}

	// Clone or update the repository before the agent runs so the workspace
	// is a valid git repo for the post-build commit/push.
	if (spec.AutoCommit || spec.AutoPush) && b.podGitOps != nil {
		if spec.GitCloneURL == "" {
			return fail("git_clone_url is required when auto_commit or auto_push is enabled", "")
		}

		b.logger.Info("ensuring git repository is ready",
			"task_id", task.ID,
			"pod", podName,
			"workDir", workDir,
		)

		cloneResult := b.podGitOps.CloneRepo(ctx, podName, workDir, spec.GitCloneURL)
		if cloneResult.Error != nil {
			return fail(fmt.Sprintf("git clone failed: %v", cloneResult.Error), "")
		}
		if cloneResult.Cloned {
			b.publishEvent(streamID, BuildEventOutput, map[string]any{
				"content": fmt.Sprintf("Cloned repository to %s", workDir),
			})
		}
	}

	// The pod metadata tells the Claude Code adapter where to run.
	agentReq := &domain.AgentRequest{
		Prompt:     spec.Prompt,
		ProjectID:  domain.ProjectID(task.ProjectID),
		WorkingDir: workDir,
		Timeout:    10 * time.Minute,
		Metadata: map[string]string{
			"pod_name":  podName,
			"namespace": b.namespace,
		},
	}

	// Output is buffered with a size cap to prevent OOM on verbose builds.
	const (
		maxOutputSize    = 1 << 20 // 1MB
		truncationNotice = "\n... [output truncated at 1MB]"
	)
	var (
		output    strings.Builder
		truncated bool
	)
	// appendOutput adds one newline-separated chunk to output. Once the cap is
	// reached it writes the truncation notice exactly once and drops every
	// later chunk. A cut inside a chunk is moved back to a rune boundary so a
	// multi-byte character is never split and output stays valid UTF-8.
	appendOutput := func(content string) {
		if truncated {
			return
		}
		if output.Len() >= maxOutputSize {
			// An earlier chunk filled the buffer exactly; mark the loss now.
			output.WriteString(truncationNotice)
			truncated = true
			return
		}
		if output.Len() > 0 {
			output.WriteString("\n")
		}
		remaining := maxOutputSize - output.Len()
		if len(content) <= remaining {
			output.WriteString(content)
			return
		}
		cut := remaining
		for cut > 0 && !utf8.RuneStart(content[cut]) {
			cut--
		}
		output.WriteString(content[:cut])
		output.WriteString(truncationNotice)
		truncated = true
	}

	b.logger.Info("executing build via agent",
		"task_id", task.ID,
		"project_id", task.ProjectID,
		"agent", agent.Name(),
		"work_dir", workDir,
	)

	agentResult, err := agent.Execute(ctx, agentReq, func(event domain.AgentEvent) {
		// Forward every agent event to the SSE stream under its build event type.
		eventType := BuildEventOutput
		switch event.Type {
		case domain.AgentEventToolUse:
			eventType = BuildEventToolUse
		case domain.AgentEventToolResult:
			eventType = BuildEventToolResult
		case domain.AgentEventError:
			eventType = BuildEventError
		}
		b.publishEvent(streamID, eventType, map[string]any{
			"content":   event.Content,
			"stream":    event.Stream,
			"tool_name": event.ToolName,
		})

		// Only output and error text is kept for the final result.
		if event.Type == domain.AgentEventOutput || event.Type == domain.AgentEventError {
			appendOutput(event.Content)
		}
	})
	if err != nil {
		return fail(fmt.Sprintf("agent execution failed: %v", err), output.String())
	}

	result := &domain.BuildResult{
		Success: agentResult.Success(),
		Output:  output.String(),
	}
	if !result.Success {
		result.Error = "agent returned non-zero exit code"
		if agentResult.Error != nil {
			result.Error = agentResult.Error.Error()
		}
	}

	// Post-build git operations: commit and push changes programmatically.
	// This is deterministic - we don't rely on the LLM to run git commands.
	// Note that auto_push only has an effect here together with auto_commit.
	if result.Success && spec.AutoCommit && b.podGitOps != nil {
		// Use only the first line of the prompt so a multi-line prompt does
		// not spill into the commit subject.
		subject, _, _ := strings.Cut(strings.TrimSpace(spec.Prompt), "\n")
		commitMsg := fmt.Sprintf("build: %s", truncate(strings.TrimSpace(subject), 72))
		gitResult := b.podGitOps.CommitAndPush(ctx, podName, workDir, commitMsg, spec.AutoPush)

		switch {
		case gitResult.Error != nil:
			b.logger.Warn("post-build git operations failed",
				"task_id", task.ID,
				"error", gitResult.Error,
			)
			result.Success = false
			result.Error = fmt.Sprintf("build succeeded but git operations failed: %v", gitResult.Error)
		case gitResult.HasChanges:
			result.CommitSHA = gitResult.CommitSHA
			result.FilesChanged = gitResult.FilesChanged
			b.logger.Info("post-build git operations completed",
				"task_id", task.ID,
				"commit", gitResult.CommitSHA,
				"files", len(gitResult.FilesChanged),
				"pushed", gitResult.Pushed,
			)
		default:
			b.logger.Info("no changes to commit after build",
				"task_id", task.ID,
			)
		}
	}

	// Measured after git operations so the reported duration covers the whole
	// task, matching the early-failure paths.
	result.DurationMs = time.Since(start).Milliseconds()

	if result.Success {
		b.publishEvent(streamID, BuildEventCompleted, map[string]any{
			"task_id":       task.ID,
			"success":       true,
			"commit_sha":    result.CommitSHA,
			"files_changed": result.FilesChanged,
			"duration_ms":   result.DurationMs,
		})
	} else {
		b.publishEvent(streamID, BuildEventFailed, map[string]any{
			"task_id":     task.ID,
			"error":       result.Error,
			"duration_ms": result.DurationMs,
		})
	}
	b.closeStream(ctx, streamID)

	return result
}
|
|
|
|
// publishEvent publishes an event to the SSE stream if a stream publisher is configured.
|
|
func (b *BuildExecutor) publishEvent(streamID, eventType string, data map[string]any) {
|
|
if b.streams == nil {
|
|
return
|
|
}
|
|
b.streams.Publish(streamID, port.StreamEvent{
|
|
Type: eventType,
|
|
Data: data,
|
|
})
|
|
}
|
|
|
|
// streamCloseDelay is the time to wait before closing a stream after build completion.
|
|
// This allows SSE clients to receive final events before the stream is closed.
|
|
const streamCloseDelay = 5 * time.Second
|
|
|
|
// closeStream closes the stream after a delay to allow clients to receive final events.
|
|
// The close is context-aware and respects cancellation.
|
|
func (b *BuildExecutor) closeStream(ctx context.Context, streamID string) {
|
|
if b.streams == nil {
|
|
return
|
|
}
|
|
// Close stream after a short delay to ensure final events are delivered.
|
|
// Use a goroutine with context awareness to avoid race conditions.
|
|
go func() {
|
|
select {
|
|
case <-ctx.Done():
|
|
// Context cancelled, close immediately
|
|
b.streams.Close(streamID)
|
|
case <-time.After(streamCloseDelay):
|
|
b.streams.Close(streamID)
|
|
}
|
|
}()
|
|
}
|
|
|
|
// parsedBuildSpec is the typed view of a build task's Spec map, as produced
// by parseSpec.
type parsedBuildSpec struct {
	Prompt      string // instruction handed to the code agent
	AutoCommit  bool   // commit workspace changes after a successful build
	AutoPush    bool   // push the commit as well (only acted on together with AutoCommit)
	GitCloneURL string // repository cloned into the workspace before the build
}
|
|
|
|
// parseSpec extracts typed BuildSpec fields from the generic map[string]any.
//
// Recognised keys:
//   - "prompt": string, required, must contain non-whitespace text
//   - "auto_commit": bool, optional (default false)
//   - "auto_push": bool, optional (default false)
//   - "git_clone_url": string, optional
//
// A missing key or a nil value means "not set". A key present with a value of
// the wrong type is reported as an error rather than silently treated as its
// zero value, so e.g. {"auto_commit": "true"} fails loudly instead of quietly
// skipping the commit.
func (b *BuildExecutor) parseSpec(spec map[string]any) (*parsedBuildSpec, error) {
	stringField := func(key string) (string, error) {
		raw, ok := spec[key]
		if !ok || raw == nil {
			return "", nil
		}
		s, ok := raw.(string)
		if !ok {
			return "", fmt.Errorf("%s must be a string, got %T", key, raw)
		}
		return s, nil
	}
	boolField := func(key string) (bool, error) {
		raw, ok := spec[key]
		if !ok || raw == nil {
			return false, nil
		}
		v, ok := raw.(bool)
		if !ok {
			return false, fmt.Errorf("%s must be a bool, got %T", key, raw)
		}
		return v, nil
	}

	prompt, err := stringField("prompt")
	if err != nil {
		return nil, err
	}
	if strings.TrimSpace(prompt) == "" {
		return nil, fmt.Errorf("prompt is required")
	}

	autoCommit, err := boolField("auto_commit")
	if err != nil {
		return nil, err
	}
	autoPush, err := boolField("auto_push")
	if err != nil {
		return nil, err
	}
	gitCloneURL, err := stringField("git_clone_url")
	if err != nil {
		return nil, err
	}

	return &parsedBuildSpec{
		Prompt:      prompt,
		AutoCommit:  autoCommit,
		AutoPush:    autoPush,
		GitCloneURL: gitCloneURL,
	}, nil
}
|
|
|
|
// truncate shortens s to at most maxLen bytes.
//
// When s must be cut and maxLen > 3, the last three bytes of the budget are
// spent on "..."; for maxLen <= 3 the prefix is returned without an ellipsis.
// The cut point is moved back to a UTF-8 rune boundary so a multi-byte
// character is never split; the result may therefore be a few bytes shorter
// than maxLen. A non-positive maxLen yields "" (previously a negative value
// panicked).
func truncate(s string, maxLen int) string {
	if len(s) <= maxLen {
		return s
	}
	if maxLen <= 0 {
		return ""
	}

	keep, suffix := maxLen, ""
	if maxLen > 3 {
		keep, suffix = maxLen-3, "..."
	}
	// keep < len(s), so s[keep] is in range; step back over continuation bytes.
	for keep > 0 && !utf8.RuneStart(s[keep]) {
		keep--
	}
	return s[:keep] + suffix
}
|