fix: Sync build audit with work queue when stale tasks are requeued

When a worker dies mid-build, queue maintenance now updates both
work_queue and build_audit tables when requeuing stale tasks.
This prevents builds from showing "running" forever in the API.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
jordan 2026-01-31 02:07:52 -07:00
parent 137814ae7e
commit 910bcb62e1
10 changed files with 164 additions and 22 deletions

View File

@ -1,13 +1,14 @@
# Worker Pool
**Last Updated:** 2026-01-27
**Last Updated:** 2026-01-31
**Confidence:** High
## Summary
Shared worker pool that executes build tasks for any project. Currently runs as an embedded WorkExecutor daemon inside rdev-api. Workers register with the worker registry, poll the work queue for tasks, execute Claude Code in cloned repos via GitOperations, and report results with audit trails.
Shared worker pool that executes build tasks for any project. Currently runs as an embedded WorkExecutor daemon inside rdev-api. Workers register with the worker registry, poll the work queue for tasks, execute Claude Code in pods via kubectl exec. Post-build git operations (commit/push) are programmatic via PodGitOperations, not LLM-driven.
**Key Facts:**
- **LLM vs rdev boundary:** Claude writes code; rdev handles git ops programmatically (no LLM for runbook tasks)
- Embedded WorkExecutor daemon runs inside rdev-api process
- Workers poll work queue every 5 seconds, heartbeat every 30 seconds
- Stale workers (no heartbeat for 2 minutes) automatically marked offline by QueueMaintenance
@ -27,7 +28,7 @@ Shared worker pool that executes build tasks for any project. Currently runs as
- Service: `internal/service/build_service.go`
- Executor: `internal/worker/work_executor.go` (poll loop, heartbeat, task routing)
- Executor: `internal/worker/build_executor.go` (BuildSpec→AgentRequest)
- Git: `internal/worker/git_operations.go` (clone, commit, push)
- Git: `internal/worker/pod_git_operations.go` (post-build commit/push via kubectl exec)
- Maintenance: `internal/worker/queue_maintenance.go` (stale recovery, cleanup, metrics)
- Handler: `internal/handlers/workers.go` (REST API for workers)
- Handler: `internal/handlers/builds.go` (REST API for builds)
@ -39,7 +40,7 @@ Shared worker pool that executes build tasks for any project. Currently runs as
1. rdev-api starts → WorkExecutor registers as worker in registry
2. Heartbeat loop: every 30s sends heartbeat via WorkerService
3. Poll loop: every 5s dequeues next task from work queue
4. BuildExecutor: clones repo, executes CodeAgent, commits/pushes if auto_commit
4. BuildExecutor: executes CodeAgent in pod, then programmatically commits/pushes if auto_commit
5. Reports completion with BuildResult via WorkerService
6. Graceful shutdown: deregisters worker on rdev-api stop
@ -65,11 +66,13 @@ Shared worker pool that executes build tasks for any project. Currently runs as
## Queue Maintenance
The QueueMaintenance worker runs inside rdev-api alongside the WorkExecutor:
- **Stale task recovery** (every 1m): Requeues tasks running >30m without completion
- **Stale task recovery** (every 1m): Requeues tasks running >30m without completion. Also syncs build_audit status to "pending" so API correctly reflects requeued state.
- **Stale worker marking** (every 1m): Marks workers offline after 2m without heartbeat
- **Old task cleanup** (every 1m): Removes completed/failed/cancelled tasks >7 days old
- **Metrics refresh** (every 15s): Updates Prometheus gauges for queue depth and worker counts
**Build Audit Sync:** When stale tasks are requeued, both `work_queue` and `build_audit` tables are updated atomically. This prevents builds from appearing stuck in "running" when the underlying task has been requeued for retry due to worker death.
## Related Topics
- [Work Queue](./work-queue.md)

View File

@ -40,6 +40,7 @@ import (
"time"
"github.com/orchard9/rdev/internal/adapter/cloudflare"
"github.com/orchard9/rdev/internal/adapter/cockroach"
"github.com/orchard9/rdev/internal/adapter/codeagent"
"github.com/orchard9/rdev/internal/adapter/codeagent/claudecode"
"github.com/orchard9/rdev/internal/adapter/codeagent/opencode"
@ -48,6 +49,7 @@ import (
"github.com/orchard9/rdev/internal/adapter/kubernetes"
"github.com/orchard9/rdev/internal/adapter/memory"
"github.com/orchard9/rdev/internal/adapter/postgres"
redisadapter "github.com/orchard9/rdev/internal/adapter/redis"
"github.com/orchard9/rdev/internal/adapter/templates"
"github.com/orchard9/rdev/internal/adapter/woodpecker"
"github.com/orchard9/rdev/internal/auth"
@ -55,6 +57,7 @@ import (
"github.com/orchard9/rdev/internal/handlers"
"github.com/orchard9/rdev/internal/metrics"
"github.com/orchard9/rdev/internal/middleware"
"github.com/orchard9/rdev/internal/port"
"github.com/orchard9/rdev/internal/service"
"github.com/orchard9/rdev/internal/telemetry"
"github.com/orchard9/rdev/internal/webhook"
@ -223,6 +226,39 @@ func main() {
logger.Info("template provider initialized")
}
// Initialize database provisioner (optional - for project database isolation)
var dbProvisioner port.DatabaseProvisioner
if infraCfg.CRDBHost != "" {
var err error
dbProvisioner, err = cockroach.NewProvisioner(cockroach.Config{
Host: infraCfg.CRDBHost,
Port: infraCfg.CRDBPort,
User: infraCfg.CRDBUser,
SSLMode: infraCfg.CRDBSSLMode,
}, logger)
if err != nil {
logger.Warn("failed to initialize cockroachdb provisioner", "error", err)
} else {
logger.Info("cockroachdb provisioner initialized", "host", infraCfg.CRDBHost)
}
}
// Initialize cache provisioner (optional - for project cache isolation via Redis ACLs)
var cacheProvisioner port.CacheProvisioner
if infraCfg.RedisHost != "" && infraCfg.RedisPassword != "" {
var err error
cacheProvisioner, err = redisadapter.NewProvisioner(redisadapter.Config{
Host: infraCfg.RedisHost,
Port: infraCfg.RedisPort,
Password: infraCfg.RedisPassword,
}, logger)
if err != nil {
logger.Warn("failed to initialize redis provisioner", "error", err)
} else {
logger.Info("redis provisioner initialized", "host", infraCfg.RedisHost)
}
}
// Initialize CodeAgent registry (multi-provider support)
agentRegistry := codeagent.NewRegistry()
@ -319,6 +355,13 @@ func main() {
Logger: logger,
},
)
// Wire optional database and cache provisioners
if dbProvisioner != nil {
projectInfraService = projectInfraService.WithDatabaseProvisioner(dbProvisioner)
}
if cacheProvisioner != nil {
projectInfraService = projectInfraService.WithCacheProvisioner(cacheProvisioner)
}
// Create domain service adapter for infrastructure handler
domainServiceAdapter := handlers.NewDomainServiceAdapter(projectInfraService)
@ -406,14 +449,17 @@ func main() {
}
// Start work executor (cross-project worker pool)
var gitOps *worker.GitOperations
// PodGitOperations runs git commands inside the pod via kubectl exec.
// This ensures deterministic post-build commit/push instead of relying on LLMs.
var podGitOps *worker.PodGitOperations
if infraCfg.GiteaToken != "" {
gitOps = worker.NewGitOperations(worker.GitOperationsConfig{
podGitOps = worker.NewPodGitOperations(worker.PodGitOperationsConfig{
Namespace: "rdev",
GiteaToken: infraCfg.GiteaToken,
Logger: logger,
})
}
buildExecutor := worker.NewBuildExecutor(agentRegistry, gitOps, logger, nil)
buildExecutor := worker.NewBuildExecutor(agentRegistry, podGitOps, streamPub, logger, nil)
workerCfg := worker.DefaultWorkExecutorConfig()
workerCfg.Logger = logger
workExecutor := worker.NewWorkExecutor(
@ -438,6 +484,7 @@ func main() {
CleanupAge: 7 * 24 * time.Hour,
MaintenancePeriod: 1 * time.Minute,
MetricsPeriod: 15 * time.Second,
BuildAudit: buildAuditRepo, // Sync build audit when requeuing stale tasks
Logger: logger,
},
)
@ -466,6 +513,22 @@ func main() {
// Stop rate limit cleanup worker
stopRateLimitCleanup()
// Close database and cache provisioners
if dbProvisioner != nil {
if closer, ok := dbProvisioner.(interface{ Close() error }); ok {
if err := closer.Close(); err != nil {
logger.Warn("failed to close database provisioner", "error", err)
}
}
}
if cacheProvisioner != nil {
if closer, ok := cacheProvisioner.(interface{ Close() error }); ok {
if err := closer.Close(); err != nil {
logger.Warn("failed to close cache provisioner", "error", err)
}
}
}
// Shutdown telemetry (flush pending traces)
if err := tel.Shutdown(ctx); err != nil {
logger.Error("telemetry shutdown error", "error", err)

View File

@ -206,20 +206,39 @@ func (r *WorkQueueRepository) CleanupOld(ctx context.Context, olderThan time.Dur
// RequeueStale re-queues tasks that have been running longer than the timeout.
func (r *WorkQueueRepository) RequeueStale(ctx context.Context, timeout time.Duration) (int64, error) {
ids, err := r.RequeueStaleWithIDs(ctx, timeout)
if err != nil {
return 0, err
}
return int64(len(ids)), nil
}
// RequeueStaleWithIDs re-queues stale tasks and returns their IDs.
func (r *WorkQueueRepository) RequeueStaleWithIDs(ctx context.Context, timeout time.Duration) ([]string, error) {
cutoff := time.Now().Add(-timeout)
result, err := r.db.ExecContext(ctx, `
rows, err := r.db.QueryContext(ctx, `
UPDATE work_queue
SET status = 'pending', worker_id = NULL, started_at = NULL,
retry_count = retry_count + 1, error = 'Worker timeout - task requeued'
WHERE status = 'running'
AND started_at < $1
AND retry_count < max_retries
RETURNING id
`, cutoff)
if err != nil {
return 0, fmt.Errorf("requeue stale tasks: %w", err)
return nil, fmt.Errorf("requeue stale tasks: %w", err)
}
defer func() { _ = rows.Close() }()
return result.RowsAffected()
var ids []string
for rows.Next() {
var id string
if err := rows.Scan(&id); err != nil {
return nil, fmt.Errorf("scan requeued task id: %w", err)
}
ids = append(ids, id)
}
return ids, rows.Err()
}
// scanTask scans a single task row.

View File

@ -131,6 +131,7 @@ func TestBuildsHandler_StartBuild(t *testing.T) {
Template: "nextjs-landing",
AutoCommit: true,
AutoPush: true,
GitCloneURL: "https://git.example.com/org/my-project.git",
},
wantStatus: http.StatusCreated,
},

View File

@ -183,6 +183,10 @@ func (m *mockWorkQueue) RequeueStale(ctx context.Context, timeout time.Duration)
return 0, m.err
}
func (m *mockWorkQueue) RequeueStaleWithIDs(ctx context.Context, timeout time.Duration) ([]string, error) {
return nil, m.err
}
func TestWorkHandler_Enqueue(t *testing.T) {
mockQueue := newMockWorkQueue()
workService := service.NewWorkService(mockQueue, service.WorkServiceConfig{})

View File

@ -47,4 +47,8 @@ type WorkQueue interface {
// RequeueStale re-queues tasks that have been running longer than the timeout.
// This handles workers that crashed without reporting completion.
RequeueStale(ctx context.Context, timeout time.Duration) (int64, error)
// RequeueStaleWithIDs re-queues stale tasks and returns their IDs.
// Used when callers need to sync external state (e.g., build audit).
RequeueStaleWithIDs(ctx context.Context, timeout time.Duration) ([]string, error)
}

View File

@ -93,6 +93,10 @@ func (m *mockWorkQueue) RequeueStale(ctx context.Context, timeout time.Duration)
return 0, nil
}
func (m *mockWorkQueue) RequeueStaleWithIDs(ctx context.Context, timeout time.Duration) ([]string, error) {
return nil, nil
}
// mockBuildAudit implements port.BuildAudit for service tests.
type mockBuildAudit struct {
entries map[string]*domain.BuildAuditEntry

View File

@ -112,6 +112,10 @@ func (m *mockWorkQueue) RequeueStale(_ context.Context, _ time.Duration) (int64,
return 0, nil
}
func (m *mockWorkQueue) RequeueStaleWithIDs(_ context.Context, _ time.Duration) ([]string, error) {
return nil, nil
}
type mockWorkerRegistry struct {
mu sync.Mutex
workers map[string]*domain.Worker
@ -320,7 +324,7 @@ func newTestDeps() *testDeps {
WithBuildAudit(audit)
workSvc := service.NewWorkService(queue, service.WorkServiceConfig{})
buildExec := NewBuildExecutor(agentRegistry, nil, nil, nil)
buildExec := NewBuildExecutor(agentRegistry, nil, nil, nil, nil)
return &testDeps{
queue: queue,

View File

@ -6,6 +6,7 @@ import (
"sync"
"time"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/metrics"
"github.com/orchard9/rdev/internal/port"
)
@ -16,6 +17,7 @@ import (
type QueueMaintenance struct {
queue port.WorkQueue
registry port.WorkerRegistry
buildAudit port.BuildAudit // Optional: syncs build audit when requeuing stale tasks
logger *slog.Logger
// Intervals
@ -52,6 +54,10 @@ type QueueMaintenanceConfig struct {
// Default: 15 seconds.
MetricsPeriod time.Duration
// BuildAudit syncs build audit status when requeuing stale tasks.
// If nil, build audit is not updated (legacy behavior).
BuildAudit port.BuildAudit
Logger *slog.Logger
}
@ -82,6 +88,7 @@ func NewQueueMaintenance(
return &QueueMaintenance{
queue: queue,
registry: registry,
buildAudit: cfg.BuildAudit,
logger: cfg.Logger.With("component", "queue-maintenance"),
staleTaskTimeout: cfg.StaleTaskTimeout,
staleWorkerTimeout: cfg.StaleWorkerTimeout,
@ -168,14 +175,31 @@ func (m *QueueMaintenance) runMaintenance() {
// requeueStaleTasks requeues tasks that have been running too long
// (the worker likely crashed without reporting).
// Also syncs build audit to pending status if configured.
func (m *QueueMaintenance) requeueStaleTasks(ctx context.Context) {
count, err := m.queue.RequeueStale(ctx, m.staleTaskTimeout)
// Use RequeueStaleWithIDs to get task IDs for build audit sync
taskIDs, err := m.queue.RequeueStaleWithIDs(ctx, m.staleTaskTimeout)
if err != nil {
m.logger.Warn("failed to requeue stale tasks", "error", err)
return
}
if count > 0 {
m.logger.Info("requeued stale tasks", "count", count)
if len(taskIDs) == 0 {
return
}
m.logger.Info("requeued stale tasks", "count", len(taskIDs), "task_ids", taskIDs)
// Sync build audit status if configured
if m.buildAudit != nil {
for _, taskID := range taskIDs {
// Update build audit to pending (worker assignment cleared)
if err := m.buildAudit.UpdateStatus(ctx, taskID, domain.BuildStatusPending, ""); err != nil {
m.logger.Warn("failed to sync build audit for requeued task",
"task_id", taskID,
"error", err,
)
}
}
}
}

View File

@ -2,6 +2,7 @@ package worker
import (
"context"
"fmt"
"log/slog"
"sync"
"testing"
@ -93,6 +94,21 @@ func (m *mockMaintenanceQueue) RequeueStale(_ context.Context, _ time.Duration)
return m.requeueCount, nil
}
func (m *mockMaintenanceQueue) RequeueStaleWithIDs(_ context.Context, _ time.Duration) ([]string, error) {
m.mu.Lock()
defer m.mu.Unlock()
m.requeueCalls++
if m.err != nil {
return nil, m.err
}
// Generate mock IDs based on requeueCount
var ids []string
for i := int64(0); i < m.requeueCount; i++ {
ids = append(ids, fmt.Sprintf("task-%d", i+1))
}
return ids, nil
}
// mockMaintenanceRegistry implements port.WorkerRegistry for maintenance tests.
type mockMaintenanceRegistry struct {
mu sync.Mutex