From cfba724f8a9ff06ba0ccb13bd0e309200164e6c0 Mon Sep 17 00:00:00 2001 From: jordan Date: Tue, 3 Feb 2026 00:07:34 -0700 Subject: [PATCH] feat: add work task error classification and user-facing error codes - Add WorkErrorCode type with RATE_LIMITED, AUTH_FAILED, TIMEOUT, STALE_WORKER, AGENT_ERROR, INVALID_SPEC - Add ClassifyAgentError function to detect error patterns from stderr - Add error_code column to work_queue table (migration 016) - Add FailWithCode method to WorkQueue interface and implementations - Update RequeueStaleWithIDs to mark permanently failed tasks with STALE_WORKER - Add ErrorCode to BuildResult for API responses - Update work executor to classify errors before failing tasks This enables users to see actual failure reasons (e.g., "RATE_LIMITED") instead of builds stuck in "running" state forever when Claude hits rate limits. Co-Authored-By: Claude Opus 4.5 --- internal/adapter/postgres/work_queue.go | 25 +++- .../adapter/postgres/work_queue_queries.go | 34 ++++- .../migrations/016_work_queue_error_code.sql | 10 ++ internal/domain/build.go | 4 + internal/domain/work.go | 139 +++++++++++++++++- internal/domain/work_test.go | 103 +++++++++++++ internal/handlers/builds.go | 2 + internal/handlers/work_test.go | 5 + internal/port/work_queue.go | 5 + internal/service/mock_test.go | 14 ++ internal/service/work_service.go | 9 +- internal/worker/mock_test.go | 50 +++++++ internal/worker/queue_maintenance_test.go | 4 + internal/worker/work_executor.go | 25 +++- internal/worker/work_executor_test.go | 8 +- 15 files changed, 420 insertions(+), 17 deletions(-) create mode 100644 internal/db/migrations/016_work_queue_error_code.sql create mode 100644 internal/domain/work_test.go diff --git a/internal/adapter/postgres/work_queue.go b/internal/adapter/postgres/work_queue.go index 549be4b..d0cdbff 100644 --- a/internal/adapter/postgres/work_queue.go +++ b/internal/adapter/postgres/work_queue.go @@ -167,7 +167,22 @@ func (r *WorkQueueRepository) Complete(ctx context.Context, taskID string, resul // Fail marks a task as failed with an error message. // Uses a single atomic UPDATE to avoid race conditions between SELECT and UPDATE. func (r *WorkQueueRepository) Fail(ctx context.Context, taskID string, errMsg string) error { - // Use a single atomic query that handles both retry and permanent failure cases + return r.FailWithCode(ctx, taskID, errMsg, domain.WorkErrorCodeNone) +} + +// FailWithCode marks a task as failed with an error message and categorized error code. +// The error code enables clients to distinguish failure types (rate limit, auth, timeout). +// If retry_count < max_retries, the task will be re-queued as pending (error_code cleared). +func (r *WorkQueueRepository) FailWithCode(ctx context.Context, taskID string, errMsg string, code domain.WorkErrorCode) error { + // Use a single atomic query that handles both retry and permanent failure cases. + // When retrying, clear error_code so the task gets a fresh start. + // Only set error_code on permanent failure. + var errorCode *string + if code != domain.WorkErrorCodeNone { + codeStr := string(code) + errorCode = &codeStr + } + result, err := r.db.ExecContext(ctx, ` UPDATE work_queue SET @@ -191,9 +206,13 @@ func (r *WorkQueueRepository) Fail(ctx context.Context, taskID string, errMsg st WHEN retry_count < max_retries THEN retry_count + 1 ELSE retry_count END, - error = $1 + error = $1, + error_code = CASE + WHEN retry_count >= max_retries THEN $3 + ELSE NULL + END WHERE id = $2 - `, errMsg, taskID) + `, errMsg, taskID, errorCode) if err != nil { return fmt.Errorf("fail work task: %w", err) diff --git a/internal/adapter/postgres/work_queue_queries.go b/internal/adapter/postgres/work_queue_queries.go index 72d600e..9cf1582 100644 --- a/internal/adapter/postgres/work_queue_queries.go +++ b/internal/adapter/postgres/work_queue_queries.go @@ -23,11 +23,12 @@ func (r *WorkQueueRepository) GetTask(ctx context.Context, taskID string) (*doma var completedAt sql.NullTime var resultJSON []byte var errorMsg sql.NullString + var errorCode sql.NullString err := r.db.QueryRowContext(ctx, ` SELECT id, project_id, task_type, task_spec, status, priority, worker_id, callback_url, created_at, started_at, completed_at, result, error, - retry_count, max_retries + retry_count, max_retries, error_code FROM work_queue WHERE id = $1 `, taskID).Scan( @@ -46,6 +47,7 @@ func (r *WorkQueueRepository) GetTask(ctx context.Context, taskID string) (*doma &errorMsg, &task.RetryCount, &task.MaxRetries, + &errorCode, ) if errors.Is(err, sql.ErrNoRows) { @@ -73,6 +75,9 @@ func (r *WorkQueueRepository) GetTask(ctx context.Context, taskID string) (*doma if errorMsg.Valid { task.Error = errorMsg.String } + if errorCode.Valid { + task.ErrorCode = domain.WorkErrorCode(errorCode.String) + } // Parse task spec if len(specJSON) > 0 { @@ -119,7 +124,7 @@ func (r *WorkQueueRepository) ListByProject(ctx context.Context, projectID strin query := fmt.Sprintf(` SELECT id, project_id, task_type, task_spec, status, priority, worker_id, callback_url, created_at, started_at, completed_at, result, error, - retry_count, max_retries + retry_count, max_retries, error_code FROM work_queue %s ORDER BY created_at DESC @@ -214,12 +219,30 @@ func (r *WorkQueueRepository) RequeueStale(ctx context.Context, timeout time.Dur } // RequeueStaleWithIDs re-queues stale tasks and returns their IDs. +// Tasks that have exceeded max_retries are marked as failed with STALE_WORKER error code. func (r *WorkQueueRepository) RequeueStaleWithIDs(ctx context.Context, timeout time.Duration) ([]string, error) { cutoff := time.Now().Add(-timeout) + + // First, mark tasks that have exceeded max_retries as permanently failed + _, err := r.db.ExecContext(ctx, ` + UPDATE work_queue + SET status = 'failed', completed_at = NOW(), + error = 'Worker timeout - max retries exceeded', + error_code = 'STALE_WORKER' + WHERE status = 'running' + AND started_at < $1 + AND retry_count >= max_retries + `, cutoff) + if err != nil { + return nil, fmt.Errorf("fail stale tasks: %w", err) + } + + // Then, requeue tasks that can still be retried rows, err := r.db.QueryContext(ctx, ` UPDATE work_queue SET status = 'pending', worker_id = NULL, started_at = NULL, - retry_count = retry_count + 1, error = 'Worker timeout - task requeued' + retry_count = retry_count + 1, error = 'Worker timeout - task requeued', + error_code = NULL WHERE status = 'running' AND started_at < $1 AND retry_count < max_retries @@ -253,6 +276,7 @@ func (r *WorkQueueRepository) scanTask(rows *sql.Rows) (*domain.WorkTask, error) var completedAt sql.NullTime var resultJSON []byte var errorMsg sql.NullString + var errorCode sql.NullString err := rows.Scan( &task.ID, @@ -270,6 +294,7 @@ func (r *WorkQueueRepository) scanTask(rows *sql.Rows) (*domain.WorkTask, error) &errorMsg, &task.RetryCount, &task.MaxRetries, + &errorCode, ) if err != nil { return nil, fmt.Errorf("scan task: %w", err) @@ -293,6 +318,9 @@ func (r *WorkQueueRepository) scanTask(rows *sql.Rows) (*domain.WorkTask, error) if errorMsg.Valid { task.Error = errorMsg.String } + if errorCode.Valid { + task.ErrorCode = domain.WorkErrorCode(errorCode.String) + } // Parse task spec if len(specJSON) > 0 { diff --git a/internal/db/migrations/016_work_queue_error_code.sql b/internal/db/migrations/016_work_queue_error_code.sql new file mode 100644 index 0000000..4b59af8 --- /dev/null +++ b/internal/db/migrations/016_work_queue_error_code.sql @@ -0,0 +1,10 @@ +-- Add error_code column to work_queue for categorized failure handling. +-- This enables clients to distinguish between different failure modes +-- (rate limit, auth failure, timeout, stale worker) and respond appropriately. + +ALTER TABLE work_queue ADD COLUMN IF NOT EXISTS error_code VARCHAR(50); + +-- Index for querying tasks by error code (useful for metrics and debugging) +CREATE INDEX IF NOT EXISTS idx_work_queue_error_code ON work_queue(error_code) WHERE error_code IS NOT NULL; + +COMMENT ON COLUMN work_queue.error_code IS 'Categorized error type: RATE_LIMITED, AUTH_FAILED, TIMEOUT, STALE_WORKER, AGENT_ERROR, INVALID_SPEC'; diff --git a/internal/domain/build.go b/internal/domain/build.go index 86b57b1..3024f8a 100644 --- a/internal/domain/build.go +++ b/internal/domain/build.go @@ -101,6 +101,10 @@ type BuildResult struct { // Error contains the error message if the build failed. Error string `json:"error,omitempty"` + // ErrorCode categorizes the failure type for programmatic handling. + // Values: RATE_LIMITED, AUTH_FAILED, TIMEOUT, STALE_WORKER, AGENT_ERROR, INVALID_SPEC + ErrorCode WorkErrorCode `json:"error_code,omitempty"` + // CommitSHA is the git commit hash if auto-commit was enabled. CommitSHA string `json:"commit_sha,omitempty"` diff --git a/internal/domain/work.go b/internal/domain/work.go index f7157c6..0870823 100644 --- a/internal/domain/work.go +++ b/internal/domain/work.go @@ -13,6 +13,138 @@ const ( WorkTaskStatusCancelled WorkTaskStatus = "cancelled" ) +// WorkErrorCode represents a categorized error type for failed tasks. +// This enables clients to distinguish between different failure modes +// and take appropriate action (e.g., retry vs wait vs report). +type WorkErrorCode string + +const ( + // WorkErrorCodeNone indicates no error (task succeeded or still running). + WorkErrorCodeNone WorkErrorCode = "" + + // WorkErrorCodeRateLimited indicates the agent hit its rate limit. + // Client should wait for the limit to reset before retrying. + WorkErrorCodeRateLimited WorkErrorCode = "RATE_LIMITED" + + // WorkErrorCodeAuthFailed indicates authentication/authorization failure. + // Requires manual intervention to re-authenticate the agent. + WorkErrorCodeAuthFailed WorkErrorCode = "AUTH_FAILED" + + // WorkErrorCodeTimeout indicates the task exceeded its time limit. + // May be retried, possibly with a longer timeout or simpler prompt. + WorkErrorCodeTimeout WorkErrorCode = "TIMEOUT" + + // WorkErrorCodeStaleWorker indicates the worker stopped responding. + // The task was recovered by maintenance and can be retried. + WorkErrorCodeStaleWorker WorkErrorCode = "STALE_WORKER" + + // WorkErrorCodeAgentError indicates a generic agent execution error. + // The error message contains details. + WorkErrorCodeAgentError WorkErrorCode = "AGENT_ERROR" + + // WorkErrorCodeInvalidSpec indicates the task specification was invalid. + // Should not be retried without fixing the spec. + WorkErrorCodeInvalidSpec WorkErrorCode = "INVALID_SPEC" +) + +// ClassifyAgentError examines an error message and stderr output to determine +// the appropriate error code. This enables automated handling of known failure modes. +func ClassifyAgentError(errMsg, stderr string) WorkErrorCode { + combined := errMsg + "\n" + stderr + + // Rate limit detection - Claude Code specific messages + rateLimitPatterns := []string{ + "You've hit your limit", + "rate limit", + "Rate limit", + "too many requests", + "Too many requests", + "quota exceeded", + "Quota exceeded", + } + for _, pattern := range rateLimitPatterns { + if containsIgnoreCase(combined, pattern) { + return WorkErrorCodeRateLimited + } + } + + // Authentication failure detection + authPatterns := []string{ + "not authenticated", + "authentication failed", + "unauthorized", + "Unauthorized", + "invalid api key", + "Invalid API key", + "please log in", + "Please log in", + "claude login", + } + for _, pattern := range authPatterns { + if containsIgnoreCase(combined, pattern) { + return WorkErrorCodeAuthFailed + } + } + + // Timeout detection + timeoutPatterns := []string{ + "context deadline exceeded", + "context canceled", + "timeout", + "Timeout", + "timed out", + } + for _, pattern := range timeoutPatterns { + if containsIgnoreCase(combined, pattern) { + return WorkErrorCodeTimeout + } + } + + // Default to generic agent error + return WorkErrorCodeAgentError +} + +// containsIgnoreCase checks if s contains substr (case-insensitive). +func containsIgnoreCase(s, substr string) bool { + return len(s) >= len(substr) && + (s == substr || + len(substr) == 0 || + findIgnoreCase(s, substr) >= 0) +} + +// findIgnoreCase finds substr in s (case-insensitive), returns -1 if not found. +func findIgnoreCase(s, substr string) int { + if len(substr) == 0 { + return 0 + } + if len(s) < len(substr) { + return -1 + } + // Simple linear search with case-insensitive comparison + for i := 0; i <= len(s)-len(substr); i++ { + match := true + for j := 0; j < len(substr); j++ { + sc := s[i+j] + pc := substr[j] + // ASCII lowercase conversion + if sc >= 'A' && sc <= 'Z' { + sc += 'a' - 'A' + } + if pc >= 'A' && pc <= 'Z' { + pc += 'a' - 'A' + } + if sc != pc { + match = false + break + } + } + if match { + return i + } + } + return -1 +} + // IsValid returns true if the status is a known valid status. func (s WorkTaskStatus) IsValid() bool { switch s { @@ -31,12 +163,13 @@ const ( WorkTaskTypeTest WorkTaskType = "test" WorkTaskTypeDeploy WorkTaskType = "deploy" WorkTaskTypeCustom WorkTaskType = "custom" + WorkTaskTypeVerify WorkTaskType = "verify" ) // IsValid returns true if the task type is a known valid type. func (t WorkTaskType) IsValid() bool { switch t { - case WorkTaskTypeBuild, WorkTaskTypeTest, WorkTaskTypeDeploy, WorkTaskTypeCustom: + case WorkTaskTypeBuild, WorkTaskTypeTest, WorkTaskTypeDeploy, WorkTaskTypeCustom, WorkTaskTypeVerify: return true } return false @@ -86,6 +219,10 @@ type WorkTask struct { // Error contains the error message (if failed). Error string + // ErrorCode categorizes the failure type for programmatic handling. + // Only set when Status is WorkTaskStatusFailed. + ErrorCode WorkErrorCode + // RetryCount is the number of retry attempts. RetryCount int diff --git a/internal/domain/work_test.go b/internal/domain/work_test.go new file mode 100644 index 0000000..cdafad2 --- /dev/null +++ b/internal/domain/work_test.go @@ -0,0 +1,103 @@ +package domain + +import "testing" + +func TestClassifyAgentError(t *testing.T) { + tests := []struct { + name string + errMsg string + stderr string + expected WorkErrorCode + }{ + { + name: "rate limit in stderr", + errMsg: "command failed", + stderr: "You've hit your limit ยท resets 7am (UTC)", + expected: WorkErrorCodeRateLimited, + }, + { + name: "rate limit in error message", + errMsg: "rate limit exceeded, try again later", + stderr: "", + expected: WorkErrorCodeRateLimited, + }, + { + name: "quota exceeded", + errMsg: "Quota exceeded for today", + stderr: "", + expected: WorkErrorCodeRateLimited, + }, + { + name: "auth failed - not authenticated", + errMsg: "not authenticated, please log in", + stderr: "", + expected: WorkErrorCodeAuthFailed, + }, + { + name: "auth failed - invalid api key", + errMsg: "Invalid API key provided", + stderr: "", + expected: WorkErrorCodeAuthFailed, + }, + { + name: "auth failed - claude login hint", + errMsg: "", + stderr: "Run claude login to authenticate", + expected: WorkErrorCodeAuthFailed, + }, + { + name: "context timeout", + errMsg: "context deadline exceeded", + stderr: "", + expected: WorkErrorCodeTimeout, + }, + { + name: "operation timed out", + errMsg: "operation timed out after 10 minutes", + stderr: "", + expected: WorkErrorCodeTimeout, + }, + { + name: "generic error", + errMsg: "something went wrong", + stderr: "error: file not found", + expected: WorkErrorCodeAgentError, + }, + { + name: "empty error", + errMsg: "", + stderr: "", + expected: WorkErrorCodeAgentError, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := ClassifyAgentError(tt.errMsg, tt.stderr) + if got != tt.expected { + t.Errorf("ClassifyAgentError(%q, %q) = %q, want %q", + tt.errMsg, tt.stderr, got, tt.expected) + } + }) + } +} + +func TestWorkErrorCode_Constants(t *testing.T) { + // Ensure constants are defined with expected values + codes := map[WorkErrorCode]string{ + WorkErrorCodeNone: "", + WorkErrorCodeRateLimited: "RATE_LIMITED", + WorkErrorCodeAuthFailed: "AUTH_FAILED", + WorkErrorCodeTimeout: "TIMEOUT", + WorkErrorCodeStaleWorker: "STALE_WORKER", + WorkErrorCodeAgentError: "AGENT_ERROR", + WorkErrorCodeInvalidSpec: "INVALID_SPEC", + } + + for code, expected := range codes { + if string(code) != expected { + t.Errorf("WorkErrorCode constant %q has value %q, want %q", + expected, string(code), expected) + } + } +} diff --git a/internal/handlers/builds.go b/internal/handlers/builds.go index d5e7068..27676ab 100644 --- a/internal/handlers/builds.go +++ b/internal/handlers/builds.go @@ -82,6 +82,7 @@ type BuildResultDTO struct { Success bool `json:"success"` Output string `json:"output,omitempty"` Error string `json:"error,omitempty"` + ErrorCode string `json:"error_code,omitempty"` // Categorized error type for programmatic handling CommitSHA string `json:"commit_sha,omitempty"` FilesChanged []string `json:"files_changed,omitempty"` DurationMs int64 `json:"duration_ms"` @@ -112,6 +113,7 @@ func toBuildAuditDTO(e *domain.BuildAuditEntry) *BuildAuditDTO { Success: e.Result.Success, Output: e.Result.Output, Error: e.Result.Error, + ErrorCode: string(e.Result.ErrorCode), CommitSHA: e.Result.CommitSHA, FilesChanged: e.Result.FilesChanged, DurationMs: e.Result.DurationMs, diff --git a/internal/handlers/work_test.go b/internal/handlers/work_test.go index 7bcb756..af82c20 100644 --- a/internal/handlers/work_test.go +++ b/internal/handlers/work_test.go @@ -70,6 +70,10 @@ func (m *mockWorkQueue) Complete(ctx context.Context, taskID string, result *dom } func (m *mockWorkQueue) Fail(ctx context.Context, taskID string, errMsg string) error { + return m.FailWithCode(ctx, taskID, errMsg, domain.WorkErrorCodeNone) +} + +func (m *mockWorkQueue) FailWithCode(ctx context.Context, taskID string, errMsg string, code domain.WorkErrorCode) error { if m.err != nil { return m.err } @@ -84,6 +88,7 @@ func (m *mockWorkQueue) Fail(ctx context.Context, taskID string, errMsg string) } else { task.Status = domain.WorkTaskStatusFailed task.Error = errMsg + task.ErrorCode = code now := time.Now() task.CompletedAt = &now } diff --git a/internal/port/work_queue.go b/internal/port/work_queue.go index 6fb5f44..e5f656a 100644 --- a/internal/port/work_queue.go +++ b/internal/port/work_queue.go @@ -28,6 +28,11 @@ type WorkQueue interface { // If retry_count < max_retries, the task will be re-queued as pending. Fail(ctx context.Context, taskID string, errMsg string) error + // FailWithCode marks a task as failed with an error message and categorized error code. + // The error code enables clients to distinguish failure types (rate limit, auth, timeout). + // If retry_count < max_retries, the task will be re-queued as pending (error_code cleared). + FailWithCode(ctx context.Context, taskID string, errMsg string, code domain.WorkErrorCode) error + // Cancel marks a pending task as cancelled. // Returns an error if the task is not in pending status. Cancel(ctx context.Context, taskID string) error diff --git a/internal/service/mock_test.go b/internal/service/mock_test.go index 4a394ec..352a3bf 100644 --- a/internal/service/mock_test.go +++ b/internal/service/mock_test.go @@ -62,6 +62,20 @@ func (m *mockWorkQueue) Complete(ctx context.Context, taskID string, result *dom } func (m *mockWorkQueue) Fail(ctx context.Context, taskID string, errMsg string) error { + return m.FailWithCode(ctx, taskID, errMsg, domain.WorkErrorCodeNone) +} + +func (m *mockWorkQueue) FailWithCode(ctx context.Context, taskID string, errMsg string, code domain.WorkErrorCode) error { + if m.err != nil { + return m.err + } + task, ok := m.tasks[taskID] + if !ok { + return domain.ErrWorkTaskNotFound + } + task.Status = domain.WorkTaskStatusFailed + task.Error = errMsg + task.ErrorCode = code return nil } diff --git a/internal/service/work_service.go b/internal/service/work_service.go index e1a31b6..d69f028 100644 --- a/internal/service/work_service.go +++ b/internal/service/work_service.go @@ -136,13 +136,19 @@ func (s *WorkService) CompleteTask(ctx context.Context, taskID string, result *d // FailTask marks a task as failed. func (s *WorkService) FailTask(ctx context.Context, taskID string, errMsg string) error { + return s.FailTaskWithCode(ctx, taskID, errMsg, domain.WorkErrorCodeNone) +} + +// FailTaskWithCode marks a task as failed with a categorized error code. +// The error code enables clients to distinguish failure types and respond appropriately. +func (s *WorkService) FailTaskWithCode(ctx context.Context, taskID string, errMsg string, code domain.WorkErrorCode) error { // Get task for callback URL before failing task, err := s.queue.GetTask(ctx, taskID) if err != nil { return fmt.Errorf("get task: %w", err) } - if err := s.queue.Fail(ctx, taskID, errMsg); err != nil { + if err := s.queue.FailWithCode(ctx, taskID, errMsg, code); err != nil { return fmt.Errorf("fail task: %w", err) } @@ -154,6 +160,7 @@ func (s *WorkService) FailTask(ctx context.Context, taskID string, errMsg string "project", task.ProjectID, "type", task.Type, "error", errMsg, + "error_code", code, "retry_count", task.RetryCount, ) diff --git a/internal/worker/mock_test.go b/internal/worker/mock_test.go index 1b8970c..0a59b93 100644 --- a/internal/worker/mock_test.go +++ b/internal/worker/mock_test.go @@ -72,6 +72,10 @@ func (m *mockWorkQueue) Complete(_ context.Context, taskID string, result *domai } func (m *mockWorkQueue) Fail(_ context.Context, taskID string, errMsg string) error { + return m.FailWithCode(context.Background(), taskID, errMsg, domain.WorkErrorCodeNone) +} + +func (m *mockWorkQueue) FailWithCode(_ context.Context, taskID string, errMsg string, code domain.WorkErrorCode) error { m.mu.Lock() defer m.mu.Unlock() task, ok := m.tasks[taskID] @@ -82,6 +86,7 @@ func (m *mockWorkQueue) Fail(_ context.Context, taskID string, errMsg string) er if task.RetryCount >= task.MaxRetries { task.Status = domain.WorkTaskStatusFailed task.Error = errMsg + task.ErrorCode = code } else { task.Status = domain.WorkTaskStatusPending task.WorkerID = "" @@ -293,6 +298,51 @@ func (m *mockCodeAgentRegistry) AvailableAgents(_ context.Context) []port.CodeAg } func (m *mockCodeAgentRegistry) Count() int { return 1 } +// ============================================================================= +// Mock CommandExecutor for verify tests +// ============================================================================= + +type mockCommandExecutor struct { + result *domain.CommandResult + err error + output []domain.OutputLine + podExists bool + podExistErr error +} + +func newMockCommandExecutor() *mockCommandExecutor { + return &mockCommandExecutor{ + result: &domain.CommandResult{ + ExitCode: 0, + DurationMs: 100, + }, + podExists: true, + } +} + +func (m *mockCommandExecutor) Execute(_ context.Context, _ *domain.Command, _ string, handler domain.OutputHandler) (*domain.CommandResult, error) { + if m.err != nil { + return nil, m.err + } + // Deliver output lines to handler + for _, line := range m.output { + handler(line) + } + return m.result, nil +} + +func (m *mockCommandExecutor) Cancel(_ context.Context, _ domain.CommandID) error { + return nil +} + +func (m *mockCommandExecutor) PodExists(_ context.Context, _ string) (bool, error) { + return m.podExists, m.podExistErr +} + +func (m *mockCommandExecutor) CheckConnection(_ context.Context) error { + return nil +} + // ============================================================================= // Helper to build test dependencies // ============================================================================= diff --git a/internal/worker/queue_maintenance_test.go b/internal/worker/queue_maintenance_test.go index d00871c..774994f 100644 --- a/internal/worker/queue_maintenance_test.go +++ b/internal/worker/queue_maintenance_test.go @@ -52,6 +52,10 @@ func (m *mockMaintenanceQueue) Fail(_ context.Context, _ string, _ string) error return nil } +func (m *mockMaintenanceQueue) FailWithCode(_ context.Context, _ string, _ string, _ domain.WorkErrorCode) error { + return nil +} + func (m *mockMaintenanceQueue) Cancel(_ context.Context, _ string) error { return nil } diff --git a/internal/worker/work_executor.go b/internal/worker/work_executor.go index 5926d9c..fcc65be 100644 --- a/internal/worker/work_executor.go +++ b/internal/worker/work_executor.go @@ -19,10 +19,11 @@ import ( // and executes them via task-type-specific handlers. It self-registers as // a worker, sends heartbeats, and reports results. type WorkExecutor struct { - workerSvc *service.WorkerService - workSvc *service.WorkService - buildExec *BuildExecutor - logger *slog.Logger + workerSvc *service.WorkerService + workSvc *service.WorkService + buildExec *BuildExecutor + verifyExec *VerifyExecutor + logger *slog.Logger workerID string hostname string @@ -84,6 +85,7 @@ func NewWorkExecutor( workerSvc *service.WorkerService, workSvc *service.WorkService, buildExec *BuildExecutor, + verifyExec *VerifyExecutor, cfg *WorkExecutorConfig, ) *WorkExecutor { if cfg == nil { @@ -111,6 +113,7 @@ func NewWorkExecutor( workerSvc: workerSvc, workSvc: workSvc, buildExec: buildExec, + verifyExec: verifyExec, logger: cfg.Logger.With("component", "work-executor"), workerID: cfg.WorkerID, hostname: hostname, @@ -262,7 +265,11 @@ func (e *WorkExecutor) tryClaimAndExecute() { if errMsg == "" { errMsg = "execution failed" } - if err := e.workSvc.FailTask(e.ctx, task.ID, errMsg); err != nil { + + // Classify the error to enable appropriate client handling + errorCode := domain.ClassifyAgentError(errMsg, result.Output) + + if err := e.workSvc.FailTaskWithCode(e.ctx, task.ID, errMsg, errorCode); err != nil { e.logger.Error("failed to record task failure", "task_id", task.ID, "error", err, @@ -280,6 +287,14 @@ func (e *WorkExecutor) executeTask(ctx context.Context, task *domain.WorkTask) * switch task.Type { case domain.WorkTaskTypeBuild: return e.buildExec.Execute(ctx, task) + case domain.WorkTaskTypeVerify: + if e.verifyExec == nil { + return &domain.BuildResult{ + Success: false, + Error: "verify executor not configured", + } + } + return e.verifyExec.Execute(ctx, task) default: return &domain.BuildResult{ Success: false, diff --git a/internal/worker/work_executor_test.go b/internal/worker/work_executor_test.go index c8dc5d7..1fb56bd 100644 --- a/internal/worker/work_executor_test.go +++ b/internal/worker/work_executor_test.go @@ -21,7 +21,7 @@ func testLogger() *slog.Logger { func TestWorkExecutor_StartAndStop(t *testing.T) { deps := newTestDeps() - executor := NewWorkExecutor(deps.workerSvc, deps.workSvc, deps.buildExec, &WorkExecutorConfig{ + executor := NewWorkExecutor(deps.workerSvc, deps.workSvc, deps.buildExec, nil, &WorkExecutorConfig{ WorkerID: "test-worker-1", PollPeriod: 100 * time.Millisecond, HeartbeatPeriod: 100 * time.Millisecond, @@ -75,7 +75,7 @@ func TestWorkExecutor_ClaimsAndExecutesTask(t *testing.T) { } deps.queue.mu.Unlock() - executor := NewWorkExecutor(deps.workerSvc, deps.workSvc, deps.buildExec, &WorkExecutorConfig{ + executor := NewWorkExecutor(deps.workerSvc, deps.workSvc, deps.buildExec, nil, &WorkExecutorConfig{ WorkerID: "test-worker-2", PollPeriod: 50 * time.Millisecond, HeartbeatPeriod: 5 * time.Second, @@ -118,7 +118,7 @@ func TestWorkExecutor_FailsTaskOnAgentError(t *testing.T) { } deps.queue.mu.Unlock() - executor := NewWorkExecutor(deps.workerSvc, deps.workSvc, deps.buildExec, &WorkExecutorConfig{ + executor := NewWorkExecutor(deps.workerSvc, deps.workSvc, deps.buildExec, nil, &WorkExecutorConfig{ WorkerID: "test-worker-3", PollPeriod: 50 * time.Millisecond, HeartbeatPeriod: 5 * time.Second, @@ -164,7 +164,7 @@ func TestWorkExecutor_UnsupportedTaskType(t *testing.T) { } deps.queue.mu.Unlock() - executor := NewWorkExecutor(deps.workerSvc, deps.workSvc, deps.buildExec, &WorkExecutorConfig{ + executor := NewWorkExecutor(deps.workerSvc, deps.workSvc, deps.buildExec, nil, &WorkExecutorConfig{ WorkerID: "test-worker-4", PollPeriod: 50 * time.Millisecond, HeartbeatPeriod: 5 * time.Second,