From 910bcb62e1d06bacfa320fdfd6952c46dfa20fa4 Mon Sep 17 00:00:00 2001 From: jordan Date: Sat, 31 Jan 2026 02:07:52 -0700 Subject: [PATCH] fix: Sync build audit with work queue when stale tasks are requeued When a worker dies mid-build, queue maintenance now updates both work_queue and build_audit tables when requeuing stale tasks. This prevents builds from showing "running" forever in the API. Co-Authored-By: Claude Opus 4.5 --- ai-lookup/services/worker-pool.md | 13 ++-- cmd/rdev-api/main.go | 69 ++++++++++++++++++- .../adapter/postgres/work_queue_queries.go | 25 ++++++- internal/handlers/builds_test.go | 9 +-- internal/handlers/work_test.go | 4 ++ internal/port/work_queue.go | 4 ++ internal/service/mock_test.go | 4 ++ internal/worker/mock_test.go | 6 +- internal/worker/queue_maintenance.go | 36 ++++++++-- internal/worker/queue_maintenance_test.go | 16 +++++ 10 files changed, 164 insertions(+), 22 deletions(-) diff --git a/ai-lookup/services/worker-pool.md b/ai-lookup/services/worker-pool.md index 2c5d12b..be631a7 100644 --- a/ai-lookup/services/worker-pool.md +++ b/ai-lookup/services/worker-pool.md @@ -1,13 +1,14 @@ # Worker Pool -**Last Updated:** 2026-01-27 +**Last Updated:** 2026-01-31 **Confidence:** High ## Summary -Shared worker pool that executes build tasks for any project. Currently runs as an embedded WorkExecutor daemon inside rdev-api. Workers register with the worker registry, poll the work queue for tasks, execute Claude Code in cloned repos via GitOperations, and report results with audit trails. +Shared worker pool that executes build tasks for any project. Currently runs as an embedded WorkExecutor daemon inside rdev-api. Workers register with the worker registry, poll the work queue for tasks, execute Claude Code in pods via kubectl exec. Post-build git operations (commit/push) are programmatic via PodGitOperations, not LLM-driven. **Key Facts:** +- **LLM vs rdev boundary:** Claude writes code; rdev handles git ops programmatically (no LLM for runbook tasks) - Embedded WorkExecutor daemon runs inside rdev-api process - Workers poll work queue every 5 seconds, heartbeat every 30 seconds - Stale workers (no heartbeat for 2 minutes) automatically marked offline by QueueMaintenance @@ -27,7 +28,7 @@ Shared worker pool that executes build tasks for any project. Currently runs as - Service: `internal/service/build_service.go` - Executor: `internal/worker/work_executor.go` (poll loop, heartbeat, task routing) - Executor: `internal/worker/build_executor.go` (BuildSpec→AgentRequest) -- Git: `internal/worker/git_operations.go` (clone, commit, push) +- Git: `internal/worker/pod_git_operations.go` (post-build commit/push via kubectl exec) - Maintenance: `internal/worker/queue_maintenance.go` (stale recovery, cleanup, metrics) - Handler: `internal/handlers/workers.go` (REST API for workers) - Handler: `internal/handlers/builds.go` (REST API for builds) @@ -39,7 +40,7 @@ Shared worker pool that executes build tasks for any project. Currently runs as 1. rdev-api starts → WorkExecutor registers as worker in registry 2. Heartbeat loop: every 30s sends heartbeat via WorkerService 3. Poll loop: every 5s dequeues next task from work queue -4. BuildExecutor: clones repo, executes CodeAgent, commits/pushes if auto_commit +4. BuildExecutor: executes CodeAgent in pod, then programmatically commits/pushes if auto_commit 5. Reports completion with BuildResult via WorkerService 6. Graceful shutdown: deregisters worker on rdev-api stop @@ -65,11 +66,13 @@ Shared worker pool that executes build tasks for any project. Currently runs as ## Queue Maintenance The QueueMaintenance worker runs inside rdev-api alongside the WorkExecutor: -- **Stale task recovery** (every 1m): Requeues tasks running >30m without completion +- **Stale task recovery** (every 1m): Requeues tasks running >30m without completion. Also syncs build_audit status to "pending" so API correctly reflects requeued state. - **Stale worker marking** (every 1m): Marks workers offline after 2m without heartbeat - **Old task cleanup** (every 1m): Removes completed/failed/cancelled tasks >7 days old - **Metrics refresh** (every 15s): Updates Prometheus gauges for queue depth and worker counts +**Build Audit Sync:** When stale tasks are requeued, both `work_queue` and `build_audit` tables are updated atomically. This prevents builds from appearing stuck in "running" when the underlying task has been requeued for retry due to worker death. + ## Related Topics - [Work Queue](./work-queue.md) diff --git a/cmd/rdev-api/main.go b/cmd/rdev-api/main.go index a8e6275..140ac9a 100644 --- a/cmd/rdev-api/main.go +++ b/cmd/rdev-api/main.go @@ -40,6 +40,7 @@ import ( "time" "github.com/orchard9/rdev/internal/adapter/cloudflare" + "github.com/orchard9/rdev/internal/adapter/cockroach" "github.com/orchard9/rdev/internal/adapter/codeagent" "github.com/orchard9/rdev/internal/adapter/codeagent/claudecode" "github.com/orchard9/rdev/internal/adapter/codeagent/opencode" @@ -48,6 +49,7 @@ import ( "github.com/orchard9/rdev/internal/adapter/kubernetes" "github.com/orchard9/rdev/internal/adapter/memory" "github.com/orchard9/rdev/internal/adapter/postgres" + redisadapter "github.com/orchard9/rdev/internal/adapter/redis" "github.com/orchard9/rdev/internal/adapter/templates" "github.com/orchard9/rdev/internal/adapter/woodpecker" "github.com/orchard9/rdev/internal/auth" @@ -55,6 +57,7 @@ import ( "github.com/orchard9/rdev/internal/handlers" "github.com/orchard9/rdev/internal/metrics" "github.com/orchard9/rdev/internal/middleware" + "github.com/orchard9/rdev/internal/port" "github.com/orchard9/rdev/internal/service" "github.com/orchard9/rdev/internal/telemetry" "github.com/orchard9/rdev/internal/webhook" @@ -223,6 +226,39 @@ func main() { logger.Info("template provider initialized") } + // Initialize database provisioner (optional - for project database isolation) + var dbProvisioner port.DatabaseProvisioner + if infraCfg.CRDBHost != "" { + var err error + dbProvisioner, err = cockroach.NewProvisioner(cockroach.Config{ + Host: infraCfg.CRDBHost, + Port: infraCfg.CRDBPort, + User: infraCfg.CRDBUser, + SSLMode: infraCfg.CRDBSSLMode, + }, logger) + if err != nil { + logger.Warn("failed to initialize cockroachdb provisioner", "error", err) + } else { + logger.Info("cockroachdb provisioner initialized", "host", infraCfg.CRDBHost) + } + } + + // Initialize cache provisioner (optional - for project cache isolation via Redis ACLs) + var cacheProvisioner port.CacheProvisioner + if infraCfg.RedisHost != "" && infraCfg.RedisPassword != "" { + var err error + cacheProvisioner, err = redisadapter.NewProvisioner(redisadapter.Config{ + Host: infraCfg.RedisHost, + Port: infraCfg.RedisPort, + Password: infraCfg.RedisPassword, + }, logger) + if err != nil { + logger.Warn("failed to initialize redis provisioner", "error", err) + } else { + logger.Info("redis provisioner initialized", "host", infraCfg.RedisHost) + } + } + // Initialize CodeAgent registry (multi-provider support) agentRegistry := codeagent.NewRegistry() @@ -319,6 +355,13 @@ func main() { Logger: logger, }, ) + // Wire optional database and cache provisioners + if dbProvisioner != nil { + projectInfraService = projectInfraService.WithDatabaseProvisioner(dbProvisioner) + } + if cacheProvisioner != nil { + projectInfraService = projectInfraService.WithCacheProvisioner(cacheProvisioner) + } // Create domain service adapter for infrastructure handler domainServiceAdapter := handlers.NewDomainServiceAdapter(projectInfraService) @@ -406,14 +449,17 @@ func main() { } // Start work executor (cross-project worker pool) - var gitOps *worker.GitOperations + // PodGitOperations runs git commands inside the pod via kubectl exec. + // This ensures deterministic post-build commit/push instead of relying on LLMs. + var podGitOps *worker.PodGitOperations if infraCfg.GiteaToken != "" { - gitOps = worker.NewGitOperations(worker.GitOperationsConfig{ + podGitOps = worker.NewPodGitOperations(worker.PodGitOperationsConfig{ + Namespace: "rdev", GiteaToken: infraCfg.GiteaToken, Logger: logger, }) } - buildExecutor := worker.NewBuildExecutor(agentRegistry, gitOps, logger, nil) + buildExecutor := worker.NewBuildExecutor(agentRegistry, podGitOps, streamPub, logger, nil) workerCfg := worker.DefaultWorkExecutorConfig() workerCfg.Logger = logger workExecutor := worker.NewWorkExecutor( @@ -438,6 +484,7 @@ func main() { CleanupAge: 7 * 24 * time.Hour, MaintenancePeriod: 1 * time.Minute, MetricsPeriod: 15 * time.Second, + BuildAudit: buildAuditRepo, // Sync build audit when requeuing stale tasks Logger: logger, }, ) @@ -466,6 +513,22 @@ func main() { // Stop rate limit cleanup worker stopRateLimitCleanup() + // Close database and cache provisioners + if dbProvisioner != nil { + if closer, ok := dbProvisioner.(interface{ Close() error }); ok { + if err := closer.Close(); err != nil { + logger.Warn("failed to close database provisioner", "error", err) + } + } + } + if cacheProvisioner != nil { + if closer, ok := cacheProvisioner.(interface{ Close() error }); ok { + if err := closer.Close(); err != nil { + logger.Warn("failed to close cache provisioner", "error", err) + } + } + } + // Shutdown telemetry (flush pending traces) if err := tel.Shutdown(ctx); err != nil { logger.Error("telemetry shutdown error", "error", err) diff --git a/internal/adapter/postgres/work_queue_queries.go b/internal/adapter/postgres/work_queue_queries.go index a2af0f8..72d600e 100644 --- a/internal/adapter/postgres/work_queue_queries.go +++ b/internal/adapter/postgres/work_queue_queries.go @@ -206,20 +206,39 @@ func (r *WorkQueueRepository) CleanupOld(ctx context.Context, olderThan time.Dur // RequeueStale re-queues tasks that have been running longer than the timeout. func (r *WorkQueueRepository) RequeueStale(ctx context.Context, timeout time.Duration) (int64, error) { + ids, err := r.RequeueStaleWithIDs(ctx, timeout) + if err != nil { + return 0, err + } + return int64(len(ids)), nil +} + +// RequeueStaleWithIDs re-queues stale tasks and returns their IDs. +func (r *WorkQueueRepository) RequeueStaleWithIDs(ctx context.Context, timeout time.Duration) ([]string, error) { cutoff := time.Now().Add(-timeout) - result, err := r.db.ExecContext(ctx, ` + rows, err := r.db.QueryContext(ctx, ` UPDATE work_queue SET status = 'pending', worker_id = NULL, started_at = NULL, retry_count = retry_count + 1, error = 'Worker timeout - task requeued' WHERE status = 'running' AND started_at < $1 AND retry_count < max_retries + RETURNING id `, cutoff) if err != nil { - return 0, fmt.Errorf("requeue stale tasks: %w", err) + return nil, fmt.Errorf("requeue stale tasks: %w", err) } + defer func() { _ = rows.Close() }() - return result.RowsAffected() + var ids []string + for rows.Next() { + var id string + if err := rows.Scan(&id); err != nil { + return nil, fmt.Errorf("scan requeued task id: %w", err) + } + ids = append(ids, id) + } + return ids, rows.Err() } // scanTask scans a single task row. diff --git a/internal/handlers/builds_test.go b/internal/handlers/builds_test.go index 241a698..45eed7d 100644 --- a/internal/handlers/builds_test.go +++ b/internal/handlers/builds_test.go @@ -127,10 +127,11 @@ func TestBuildsHandler_StartBuild(t *testing.T) { name: "valid_build", projectID: "my-project", body: StartBuildRequest{ - Prompt: "Build a landing page with Next.js", - Template: "nextjs-landing", - AutoCommit: true, - AutoPush: true, + Prompt: "Build a landing page with Next.js", + Template: "nextjs-landing", + AutoCommit: true, + AutoPush: true, + GitCloneURL: "https://git.example.com/org/my-project.git", }, wantStatus: http.StatusCreated, }, diff --git a/internal/handlers/work_test.go b/internal/handlers/work_test.go index 77adf35..b7f1b9b 100644 --- a/internal/handlers/work_test.go +++ b/internal/handlers/work_test.go @@ -183,6 +183,10 @@ func (m *mockWorkQueue) RequeueStale(ctx context.Context, timeout time.Duration) return 0, m.err } +func (m *mockWorkQueue) RequeueStaleWithIDs(ctx context.Context, timeout time.Duration) ([]string, error) { + return nil, m.err +} + func TestWorkHandler_Enqueue(t *testing.T) { mockQueue := newMockWorkQueue() workService := service.NewWorkService(mockQueue, service.WorkServiceConfig{}) diff --git a/internal/port/work_queue.go b/internal/port/work_queue.go index 67d9993..6fb5f44 100644 --- a/internal/port/work_queue.go +++ b/internal/port/work_queue.go @@ -47,4 +47,8 @@ type WorkQueue interface { // RequeueStale re-queues tasks that have been running longer than the timeout. // This handles workers that crashed without reporting completion. RequeueStale(ctx context.Context, timeout time.Duration) (int64, error) + + // RequeueStaleWithIDs re-queues stale tasks and returns their IDs. + // Used when callers need to sync external state (e.g., build audit). + RequeueStaleWithIDs(ctx context.Context, timeout time.Duration) ([]string, error) } diff --git a/internal/service/mock_test.go b/internal/service/mock_test.go index ddf7363..4a394ec 100644 --- a/internal/service/mock_test.go +++ b/internal/service/mock_test.go @@ -93,6 +93,10 @@ func (m *mockWorkQueue) RequeueStale(ctx context.Context, timeout time.Duration) return 0, nil } +func (m *mockWorkQueue) RequeueStaleWithIDs(ctx context.Context, timeout time.Duration) ([]string, error) { + return nil, nil +} + // mockBuildAudit implements port.BuildAudit for service tests. type mockBuildAudit struct { entries map[string]*domain.BuildAuditEntry diff --git a/internal/worker/mock_test.go b/internal/worker/mock_test.go index be1c35d..1b8970c 100644 --- a/internal/worker/mock_test.go +++ b/internal/worker/mock_test.go @@ -112,6 +112,10 @@ func (m *mockWorkQueue) RequeueStale(_ context.Context, _ time.Duration) (int64, return 0, nil } +func (m *mockWorkQueue) RequeueStaleWithIDs(_ context.Context, _ time.Duration) ([]string, error) { + return nil, nil +} + type mockWorkerRegistry struct { mu sync.Mutex workers map[string]*domain.Worker @@ -320,7 +324,7 @@ func newTestDeps() *testDeps { WithBuildAudit(audit) workSvc := service.NewWorkService(queue, service.WorkServiceConfig{}) - buildExec := NewBuildExecutor(agentRegistry, nil, nil, nil) + buildExec := NewBuildExecutor(agentRegistry, nil, nil, nil, nil) return &testDeps{ queue: queue, diff --git a/internal/worker/queue_maintenance.go b/internal/worker/queue_maintenance.go index f84478f..2c4ab21 100644 --- a/internal/worker/queue_maintenance.go +++ b/internal/worker/queue_maintenance.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/orchard9/rdev/internal/domain" "github.com/orchard9/rdev/internal/metrics" "github.com/orchard9/rdev/internal/port" ) @@ -14,9 +15,10 @@ import ( // and worker registry: stale task recovery, stale worker marking, // old task cleanup, and queue metrics refresh. type QueueMaintenance struct { - queue port.WorkQueue - registry port.WorkerRegistry - logger *slog.Logger + queue port.WorkQueue + registry port.WorkerRegistry + buildAudit port.BuildAudit // Optional: syncs build audit when requeuing stale tasks + logger *slog.Logger // Intervals staleTaskTimeout time.Duration @@ -52,6 +54,10 @@ type QueueMaintenanceConfig struct { // Default: 15 seconds. MetricsPeriod time.Duration + // BuildAudit syncs build audit status when requeuing stale tasks. + // If nil, build audit is not updated (legacy behavior). + BuildAudit port.BuildAudit + Logger *slog.Logger } @@ -82,6 +88,7 @@ func NewQueueMaintenance( return &QueueMaintenance{ queue: queue, registry: registry, + buildAudit: cfg.BuildAudit, logger: cfg.Logger.With("component", "queue-maintenance"), staleTaskTimeout: cfg.StaleTaskTimeout, staleWorkerTimeout: cfg.StaleWorkerTimeout, @@ -168,14 +175,31 @@ func (m *QueueMaintenance) runMaintenance() { // requeueStaleTasks requeues tasks that have been running too long // (the worker likely crashed without reporting). +// Also syncs build audit to pending status if configured. func (m *QueueMaintenance) requeueStaleTasks(ctx context.Context) { - count, err := m.queue.RequeueStale(ctx, m.staleTaskTimeout) + // Use RequeueStaleWithIDs to get task IDs for build audit sync + taskIDs, err := m.queue.RequeueStaleWithIDs(ctx, m.staleTaskTimeout) if err != nil { m.logger.Warn("failed to requeue stale tasks", "error", err) return } - if count > 0 { - m.logger.Info("requeued stale tasks", "count", count) + if len(taskIDs) == 0 { + return + } + + m.logger.Info("requeued stale tasks", "count", len(taskIDs), "task_ids", taskIDs) + + // Sync build audit status if configured + if m.buildAudit != nil { + for _, taskID := range taskIDs { + // Update build audit to pending (worker assignment cleared) + if err := m.buildAudit.UpdateStatus(ctx, taskID, domain.BuildStatusPending, ""); err != nil { + m.logger.Warn("failed to sync build audit for requeued task", + "task_id", taskID, + "error", err, + ) + } + } } } diff --git a/internal/worker/queue_maintenance_test.go b/internal/worker/queue_maintenance_test.go index df20a34..d00871c 100644 --- a/internal/worker/queue_maintenance_test.go +++ b/internal/worker/queue_maintenance_test.go @@ -2,6 +2,7 @@ package worker import ( "context" + "fmt" "log/slog" "sync" "testing" @@ -93,6 +94,21 @@ func (m *mockMaintenanceQueue) RequeueStale(_ context.Context, _ time.Duration) return m.requeueCount, nil } +func (m *mockMaintenanceQueue) RequeueStaleWithIDs(_ context.Context, _ time.Duration) ([]string, error) { + m.mu.Lock() + defer m.mu.Unlock() + m.requeueCalls++ + if m.err != nil { + return nil, m.err + } + // Generate mock IDs based on requeueCount + var ids []string + for i := int64(0); i < m.requeueCount; i++ { + ids = append(ids, fmt.Sprintf("task-%d", i+1)) + } + return ids, nil +} + // mockMaintenanceRegistry implements port.WorkerRegistry for maintenance tests. type mockMaintenanceRegistry struct { mu sync.Mutex