feat: add work task error classification and user-facing error codes
- Add WorkErrorCode type with RATE_LIMITED, AUTH_FAILED, TIMEOUT, STALE_WORKER, AGENT_ERROR, INVALID_SPEC - Add ClassifyAgentError function to detect error patterns from stderr - Add error_code column to work_queue table (migration 016) - Add FailWithCode method to WorkQueue interface and implementations - Update RequeueStaleWithIDs to mark permanently failed tasks with STALE_WORKER - Add ErrorCode to BuildResult for API responses - Update work executor to classify errors before failing tasks This enables users to see actual failure reasons (e.g., "RATE_LIMITED") instead of builds stuck in "running" state forever when Claude hits rate limits. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
cce4314a39
commit
cfba724f8a
@ -167,7 +167,22 @@ func (r *WorkQueueRepository) Complete(ctx context.Context, taskID string, resul
|
||||
// Fail marks a task as failed with an error message.
|
||||
// Uses a single atomic UPDATE to avoid race conditions between SELECT and UPDATE.
|
||||
func (r *WorkQueueRepository) Fail(ctx context.Context, taskID string, errMsg string) error {
|
||||
// Use a single atomic query that handles both retry and permanent failure cases
|
||||
return r.FailWithCode(ctx, taskID, errMsg, domain.WorkErrorCodeNone)
|
||||
}
|
||||
|
||||
// FailWithCode marks a task as failed with an error message and categorized error code.
|
||||
// The error code enables clients to distinguish failure types (rate limit, auth, timeout).
|
||||
// If retry_count < max_retries, the task will be re-queued as pending (error_code cleared).
|
||||
func (r *WorkQueueRepository) FailWithCode(ctx context.Context, taskID string, errMsg string, code domain.WorkErrorCode) error {
|
||||
// Use a single atomic query that handles both retry and permanent failure cases.
|
||||
// When retrying, clear error_code so the task gets a fresh start.
|
||||
// Only set error_code on permanent failure.
|
||||
var errorCode *string
|
||||
if code != domain.WorkErrorCodeNone {
|
||||
codeStr := string(code)
|
||||
errorCode = &codeStr
|
||||
}
|
||||
|
||||
result, err := r.db.ExecContext(ctx, `
|
||||
UPDATE work_queue
|
||||
SET
|
||||
@ -191,9 +206,13 @@ func (r *WorkQueueRepository) Fail(ctx context.Context, taskID string, errMsg st
|
||||
WHEN retry_count < max_retries THEN retry_count + 1
|
||||
ELSE retry_count
|
||||
END,
|
||||
error = $1
|
||||
error = $1,
|
||||
error_code = CASE
|
||||
WHEN retry_count >= max_retries THEN $3
|
||||
ELSE NULL
|
||||
END
|
||||
WHERE id = $2
|
||||
`, errMsg, taskID)
|
||||
`, errMsg, taskID, errorCode)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("fail work task: %w", err)
|
||||
|
||||
@ -23,11 +23,12 @@ func (r *WorkQueueRepository) GetTask(ctx context.Context, taskID string) (*doma
|
||||
var completedAt sql.NullTime
|
||||
var resultJSON []byte
|
||||
var errorMsg sql.NullString
|
||||
var errorCode sql.NullString
|
||||
|
||||
err := r.db.QueryRowContext(ctx, `
|
||||
SELECT id, project_id, task_type, task_spec, status, priority, worker_id,
|
||||
callback_url, created_at, started_at, completed_at, result, error,
|
||||
retry_count, max_retries
|
||||
retry_count, max_retries, error_code
|
||||
FROM work_queue
|
||||
WHERE id = $1
|
||||
`, taskID).Scan(
|
||||
@ -46,6 +47,7 @@ func (r *WorkQueueRepository) GetTask(ctx context.Context, taskID string) (*doma
|
||||
&errorMsg,
|
||||
&task.RetryCount,
|
||||
&task.MaxRetries,
|
||||
&errorCode,
|
||||
)
|
||||
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
@ -73,6 +75,9 @@ func (r *WorkQueueRepository) GetTask(ctx context.Context, taskID string) (*doma
|
||||
if errorMsg.Valid {
|
||||
task.Error = errorMsg.String
|
||||
}
|
||||
if errorCode.Valid {
|
||||
task.ErrorCode = domain.WorkErrorCode(errorCode.String)
|
||||
}
|
||||
|
||||
// Parse task spec
|
||||
if len(specJSON) > 0 {
|
||||
@ -119,7 +124,7 @@ func (r *WorkQueueRepository) ListByProject(ctx context.Context, projectID strin
|
||||
query := fmt.Sprintf(`
|
||||
SELECT id, project_id, task_type, task_spec, status, priority, worker_id,
|
||||
callback_url, created_at, started_at, completed_at, result, error,
|
||||
retry_count, max_retries
|
||||
retry_count, max_retries, error_code
|
||||
FROM work_queue
|
||||
%s
|
||||
ORDER BY created_at DESC
|
||||
@ -214,12 +219,30 @@ func (r *WorkQueueRepository) RequeueStale(ctx context.Context, timeout time.Dur
|
||||
}
|
||||
|
||||
// RequeueStaleWithIDs re-queues stale tasks and returns their IDs.
|
||||
// Tasks that have exceeded max_retries are marked as failed with STALE_WORKER error code.
|
||||
func (r *WorkQueueRepository) RequeueStaleWithIDs(ctx context.Context, timeout time.Duration) ([]string, error) {
|
||||
cutoff := time.Now().Add(-timeout)
|
||||
|
||||
// First, mark tasks that have exceeded max_retries as permanently failed
|
||||
_, err := r.db.ExecContext(ctx, `
|
||||
UPDATE work_queue
|
||||
SET status = 'failed', completed_at = NOW(),
|
||||
error = 'Worker timeout - max retries exceeded',
|
||||
error_code = 'STALE_WORKER'
|
||||
WHERE status = 'running'
|
||||
AND started_at < $1
|
||||
AND retry_count >= max_retries
|
||||
`, cutoff)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("fail stale tasks: %w", err)
|
||||
}
|
||||
|
||||
// Then, requeue tasks that can still be retried
|
||||
rows, err := r.db.QueryContext(ctx, `
|
||||
UPDATE work_queue
|
||||
SET status = 'pending', worker_id = NULL, started_at = NULL,
|
||||
retry_count = retry_count + 1, error = 'Worker timeout - task requeued'
|
||||
retry_count = retry_count + 1, error = 'Worker timeout - task requeued',
|
||||
error_code = NULL
|
||||
WHERE status = 'running'
|
||||
AND started_at < $1
|
||||
AND retry_count < max_retries
|
||||
@ -253,6 +276,7 @@ func (r *WorkQueueRepository) scanTask(rows *sql.Rows) (*domain.WorkTask, error)
|
||||
var completedAt sql.NullTime
|
||||
var resultJSON []byte
|
||||
var errorMsg sql.NullString
|
||||
var errorCode sql.NullString
|
||||
|
||||
err := rows.Scan(
|
||||
&task.ID,
|
||||
@ -270,6 +294,7 @@ func (r *WorkQueueRepository) scanTask(rows *sql.Rows) (*domain.WorkTask, error)
|
||||
&errorMsg,
|
||||
&task.RetryCount,
|
||||
&task.MaxRetries,
|
||||
&errorCode,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("scan task: %w", err)
|
||||
@ -293,6 +318,9 @@ func (r *WorkQueueRepository) scanTask(rows *sql.Rows) (*domain.WorkTask, error)
|
||||
if errorMsg.Valid {
|
||||
task.Error = errorMsg.String
|
||||
}
|
||||
if errorCode.Valid {
|
||||
task.ErrorCode = domain.WorkErrorCode(errorCode.String)
|
||||
}
|
||||
|
||||
// Parse task spec
|
||||
if len(specJSON) > 0 {
|
||||
|
||||
10
internal/db/migrations/016_work_queue_error_code.sql
Normal file
10
internal/db/migrations/016_work_queue_error_code.sql
Normal file
@ -0,0 +1,10 @@
|
||||
-- Add error_code column to work_queue for categorized failure handling.
|
||||
-- This enables clients to distinguish between different failure modes
|
||||
-- (rate limit, auth failure, timeout, stale worker) and respond appropriately.
|
||||
|
||||
ALTER TABLE work_queue ADD COLUMN IF NOT EXISTS error_code VARCHAR(50);
|
||||
|
||||
-- Index for querying tasks by error code (useful for metrics and debugging)
|
||||
CREATE INDEX IF NOT EXISTS idx_work_queue_error_code ON work_queue(error_code) WHERE error_code IS NOT NULL;
|
||||
|
||||
COMMENT ON COLUMN work_queue.error_code IS 'Categorized error type: RATE_LIMITED, AUTH_FAILED, TIMEOUT, STALE_WORKER, AGENT_ERROR, INVALID_SPEC';
|
||||
@ -101,6 +101,10 @@ type BuildResult struct {
|
||||
// Error contains the error message if the build failed.
|
||||
Error string `json:"error,omitempty"`
|
||||
|
||||
// ErrorCode categorizes the failure type for programmatic handling.
|
||||
// Values: RATE_LIMITED, AUTH_FAILED, TIMEOUT, STALE_WORKER, AGENT_ERROR, INVALID_SPEC
|
||||
ErrorCode WorkErrorCode `json:"error_code,omitempty"`
|
||||
|
||||
// CommitSHA is the git commit hash if auto-commit was enabled.
|
||||
CommitSHA string `json:"commit_sha,omitempty"`
|
||||
|
||||
|
||||
@ -13,6 +13,138 @@ const (
|
||||
WorkTaskStatusCancelled WorkTaskStatus = "cancelled"
|
||||
)
|
||||
|
||||
// WorkErrorCode represents a categorized error type for failed tasks.
|
||||
// This enables clients to distinguish between different failure modes
|
||||
// and take appropriate action (e.g., retry vs wait vs report).
|
||||
type WorkErrorCode string
|
||||
|
||||
const (
|
||||
// WorkErrorCodeNone indicates no error (task succeeded or still running).
|
||||
WorkErrorCodeNone WorkErrorCode = ""
|
||||
|
||||
// WorkErrorCodeRateLimited indicates the agent hit its rate limit.
|
||||
// Client should wait for the limit to reset before retrying.
|
||||
WorkErrorCodeRateLimited WorkErrorCode = "RATE_LIMITED"
|
||||
|
||||
// WorkErrorCodeAuthFailed indicates authentication/authorization failure.
|
||||
// Requires manual intervention to re-authenticate the agent.
|
||||
WorkErrorCodeAuthFailed WorkErrorCode = "AUTH_FAILED"
|
||||
|
||||
// WorkErrorCodeTimeout indicates the task exceeded its time limit.
|
||||
// May be retried, possibly with a longer timeout or simpler prompt.
|
||||
WorkErrorCodeTimeout WorkErrorCode = "TIMEOUT"
|
||||
|
||||
// WorkErrorCodeStaleWorker indicates the worker stopped responding.
|
||||
// The task was recovered by maintenance and can be retried.
|
||||
WorkErrorCodeStaleWorker WorkErrorCode = "STALE_WORKER"
|
||||
|
||||
// WorkErrorCodeAgentError indicates a generic agent execution error.
|
||||
// The error message contains details.
|
||||
WorkErrorCodeAgentError WorkErrorCode = "AGENT_ERROR"
|
||||
|
||||
// WorkErrorCodeInvalidSpec indicates the task specification was invalid.
|
||||
// Should not be retried without fixing the spec.
|
||||
WorkErrorCodeInvalidSpec WorkErrorCode = "INVALID_SPEC"
|
||||
)
|
||||
|
||||
// ClassifyAgentError examines an error message and stderr output to determine
|
||||
// the appropriate error code. This enables automated handling of known failure modes.
|
||||
func ClassifyAgentError(errMsg, stderr string) WorkErrorCode {
|
||||
combined := errMsg + "\n" + stderr
|
||||
|
||||
// Rate limit detection - Claude Code specific messages
|
||||
rateLimitPatterns := []string{
|
||||
"You've hit your limit",
|
||||
"rate limit",
|
||||
"Rate limit",
|
||||
"too many requests",
|
||||
"Too many requests",
|
||||
"quota exceeded",
|
||||
"Quota exceeded",
|
||||
}
|
||||
for _, pattern := range rateLimitPatterns {
|
||||
if containsIgnoreCase(combined, pattern) {
|
||||
return WorkErrorCodeRateLimited
|
||||
}
|
||||
}
|
||||
|
||||
// Authentication failure detection
|
||||
authPatterns := []string{
|
||||
"not authenticated",
|
||||
"authentication failed",
|
||||
"unauthorized",
|
||||
"Unauthorized",
|
||||
"invalid api key",
|
||||
"Invalid API key",
|
||||
"please log in",
|
||||
"Please log in",
|
||||
"claude login",
|
||||
}
|
||||
for _, pattern := range authPatterns {
|
||||
if containsIgnoreCase(combined, pattern) {
|
||||
return WorkErrorCodeAuthFailed
|
||||
}
|
||||
}
|
||||
|
||||
// Timeout detection
|
||||
timeoutPatterns := []string{
|
||||
"context deadline exceeded",
|
||||
"context canceled",
|
||||
"timeout",
|
||||
"Timeout",
|
||||
"timed out",
|
||||
}
|
||||
for _, pattern := range timeoutPatterns {
|
||||
if containsIgnoreCase(combined, pattern) {
|
||||
return WorkErrorCodeTimeout
|
||||
}
|
||||
}
|
||||
|
||||
// Default to generic agent error
|
||||
return WorkErrorCodeAgentError
|
||||
}
|
||||
|
||||
// containsIgnoreCase checks if s contains substr (case-insensitive).
|
||||
func containsIgnoreCase(s, substr string) bool {
|
||||
return len(s) >= len(substr) &&
|
||||
(s == substr ||
|
||||
len(substr) == 0 ||
|
||||
findIgnoreCase(s, substr) >= 0)
|
||||
}
|
||||
|
||||
// findIgnoreCase finds substr in s (case-insensitive), returns -1 if not found.
|
||||
func findIgnoreCase(s, substr string) int {
|
||||
if len(substr) == 0 {
|
||||
return 0
|
||||
}
|
||||
if len(s) < len(substr) {
|
||||
return -1
|
||||
}
|
||||
// Simple linear search with case-insensitive comparison
|
||||
for i := 0; i <= len(s)-len(substr); i++ {
|
||||
match := true
|
||||
for j := 0; j < len(substr); j++ {
|
||||
sc := s[i+j]
|
||||
pc := substr[j]
|
||||
// ASCII lowercase conversion
|
||||
if sc >= 'A' && sc <= 'Z' {
|
||||
sc += 'a' - 'A'
|
||||
}
|
||||
if pc >= 'A' && pc <= 'Z' {
|
||||
pc += 'a' - 'A'
|
||||
}
|
||||
if sc != pc {
|
||||
match = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if match {
|
||||
return i
|
||||
}
|
||||
}
|
||||
return -1
|
||||
}
|
||||
|
||||
// IsValid returns true if the status is a known valid status.
|
||||
func (s WorkTaskStatus) IsValid() bool {
|
||||
switch s {
|
||||
@ -31,12 +163,13 @@ const (
|
||||
WorkTaskTypeTest WorkTaskType = "test"
|
||||
WorkTaskTypeDeploy WorkTaskType = "deploy"
|
||||
WorkTaskTypeCustom WorkTaskType = "custom"
|
||||
WorkTaskTypeVerify WorkTaskType = "verify"
|
||||
)
|
||||
|
||||
// IsValid returns true if the task type is a known valid type.
|
||||
func (t WorkTaskType) IsValid() bool {
|
||||
switch t {
|
||||
case WorkTaskTypeBuild, WorkTaskTypeTest, WorkTaskTypeDeploy, WorkTaskTypeCustom:
|
||||
case WorkTaskTypeBuild, WorkTaskTypeTest, WorkTaskTypeDeploy, WorkTaskTypeCustom, WorkTaskTypeVerify:
|
||||
return true
|
||||
}
|
||||
return false
|
||||
@ -86,6 +219,10 @@ type WorkTask struct {
|
||||
// Error contains the error message (if failed).
|
||||
Error string
|
||||
|
||||
// ErrorCode categorizes the failure type for programmatic handling.
|
||||
// Only set when Status is WorkTaskStatusFailed.
|
||||
ErrorCode WorkErrorCode
|
||||
|
||||
// RetryCount is the number of retry attempts.
|
||||
RetryCount int
|
||||
|
||||
|
||||
103
internal/domain/work_test.go
Normal file
103
internal/domain/work_test.go
Normal file
@ -0,0 +1,103 @@
|
||||
package domain
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestClassifyAgentError(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
errMsg string
|
||||
stderr string
|
||||
expected WorkErrorCode
|
||||
}{
|
||||
{
|
||||
name: "rate limit in stderr",
|
||||
errMsg: "command failed",
|
||||
stderr: "You've hit your limit · resets 7am (UTC)",
|
||||
expected: WorkErrorCodeRateLimited,
|
||||
},
|
||||
{
|
||||
name: "rate limit in error message",
|
||||
errMsg: "rate limit exceeded, try again later",
|
||||
stderr: "",
|
||||
expected: WorkErrorCodeRateLimited,
|
||||
},
|
||||
{
|
||||
name: "quota exceeded",
|
||||
errMsg: "Quota exceeded for today",
|
||||
stderr: "",
|
||||
expected: WorkErrorCodeRateLimited,
|
||||
},
|
||||
{
|
||||
name: "auth failed - not authenticated",
|
||||
errMsg: "not authenticated, please log in",
|
||||
stderr: "",
|
||||
expected: WorkErrorCodeAuthFailed,
|
||||
},
|
||||
{
|
||||
name: "auth failed - invalid api key",
|
||||
errMsg: "Invalid API key provided",
|
||||
stderr: "",
|
||||
expected: WorkErrorCodeAuthFailed,
|
||||
},
|
||||
{
|
||||
name: "auth failed - claude login hint",
|
||||
errMsg: "",
|
||||
stderr: "Run claude login to authenticate",
|
||||
expected: WorkErrorCodeAuthFailed,
|
||||
},
|
||||
{
|
||||
name: "context timeout",
|
||||
errMsg: "context deadline exceeded",
|
||||
stderr: "",
|
||||
expected: WorkErrorCodeTimeout,
|
||||
},
|
||||
{
|
||||
name: "operation timed out",
|
||||
errMsg: "operation timed out after 10 minutes",
|
||||
stderr: "",
|
||||
expected: WorkErrorCodeTimeout,
|
||||
},
|
||||
{
|
||||
name: "generic error",
|
||||
errMsg: "something went wrong",
|
||||
stderr: "error: file not found",
|
||||
expected: WorkErrorCodeAgentError,
|
||||
},
|
||||
{
|
||||
name: "empty error",
|
||||
errMsg: "",
|
||||
stderr: "",
|
||||
expected: WorkErrorCodeAgentError,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got := ClassifyAgentError(tt.errMsg, tt.stderr)
|
||||
if got != tt.expected {
|
||||
t.Errorf("ClassifyAgentError(%q, %q) = %q, want %q",
|
||||
tt.errMsg, tt.stderr, got, tt.expected)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestWorkErrorCode_Constants(t *testing.T) {
|
||||
// Ensure constants are defined with expected values
|
||||
codes := map[WorkErrorCode]string{
|
||||
WorkErrorCodeNone: "",
|
||||
WorkErrorCodeRateLimited: "RATE_LIMITED",
|
||||
WorkErrorCodeAuthFailed: "AUTH_FAILED",
|
||||
WorkErrorCodeTimeout: "TIMEOUT",
|
||||
WorkErrorCodeStaleWorker: "STALE_WORKER",
|
||||
WorkErrorCodeAgentError: "AGENT_ERROR",
|
||||
WorkErrorCodeInvalidSpec: "INVALID_SPEC",
|
||||
}
|
||||
|
||||
for code, expected := range codes {
|
||||
if string(code) != expected {
|
||||
t.Errorf("WorkErrorCode constant %q has value %q, want %q",
|
||||
expected, string(code), expected)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -82,6 +82,7 @@ type BuildResultDTO struct {
|
||||
Success bool `json:"success"`
|
||||
Output string `json:"output,omitempty"`
|
||||
Error string `json:"error,omitempty"`
|
||||
ErrorCode string `json:"error_code,omitempty"` // Categorized error type for programmatic handling
|
||||
CommitSHA string `json:"commit_sha,omitempty"`
|
||||
FilesChanged []string `json:"files_changed,omitempty"`
|
||||
DurationMs int64 `json:"duration_ms"`
|
||||
@ -112,6 +113,7 @@ func toBuildAuditDTO(e *domain.BuildAuditEntry) *BuildAuditDTO {
|
||||
Success: e.Result.Success,
|
||||
Output: e.Result.Output,
|
||||
Error: e.Result.Error,
|
||||
ErrorCode: string(e.Result.ErrorCode),
|
||||
CommitSHA: e.Result.CommitSHA,
|
||||
FilesChanged: e.Result.FilesChanged,
|
||||
DurationMs: e.Result.DurationMs,
|
||||
|
||||
@ -70,6 +70,10 @@ func (m *mockWorkQueue) Complete(ctx context.Context, taskID string, result *dom
|
||||
}
|
||||
|
||||
func (m *mockWorkQueue) Fail(ctx context.Context, taskID string, errMsg string) error {
|
||||
return m.FailWithCode(ctx, taskID, errMsg, domain.WorkErrorCodeNone)
|
||||
}
|
||||
|
||||
func (m *mockWorkQueue) FailWithCode(ctx context.Context, taskID string, errMsg string, code domain.WorkErrorCode) error {
|
||||
if m.err != nil {
|
||||
return m.err
|
||||
}
|
||||
@ -84,6 +88,7 @@ func (m *mockWorkQueue) Fail(ctx context.Context, taskID string, errMsg string)
|
||||
} else {
|
||||
task.Status = domain.WorkTaskStatusFailed
|
||||
task.Error = errMsg
|
||||
task.ErrorCode = code
|
||||
now := time.Now()
|
||||
task.CompletedAt = &now
|
||||
}
|
||||
|
||||
@ -28,6 +28,11 @@ type WorkQueue interface {
|
||||
// If retry_count < max_retries, the task will be re-queued as pending.
|
||||
Fail(ctx context.Context, taskID string, errMsg string) error
|
||||
|
||||
// FailWithCode marks a task as failed with an error message and categorized error code.
|
||||
// The error code enables clients to distinguish failure types (rate limit, auth, timeout).
|
||||
// If retry_count < max_retries, the task will be re-queued as pending (error_code cleared).
|
||||
FailWithCode(ctx context.Context, taskID string, errMsg string, code domain.WorkErrorCode) error
|
||||
|
||||
// Cancel marks a pending task as cancelled.
|
||||
// Returns an error if the task is not in pending status.
|
||||
Cancel(ctx context.Context, taskID string) error
|
||||
|
||||
@ -62,6 +62,20 @@ func (m *mockWorkQueue) Complete(ctx context.Context, taskID string, result *dom
|
||||
}
|
||||
|
||||
func (m *mockWorkQueue) Fail(ctx context.Context, taskID string, errMsg string) error {
|
||||
return m.FailWithCode(ctx, taskID, errMsg, domain.WorkErrorCodeNone)
|
||||
}
|
||||
|
||||
func (m *mockWorkQueue) FailWithCode(ctx context.Context, taskID string, errMsg string, code domain.WorkErrorCode) error {
|
||||
if m.err != nil {
|
||||
return m.err
|
||||
}
|
||||
task, ok := m.tasks[taskID]
|
||||
if !ok {
|
||||
return domain.ErrWorkTaskNotFound
|
||||
}
|
||||
task.Status = domain.WorkTaskStatusFailed
|
||||
task.Error = errMsg
|
||||
task.ErrorCode = code
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -136,13 +136,19 @@ func (s *WorkService) CompleteTask(ctx context.Context, taskID string, result *d
|
||||
|
||||
// FailTask marks a task as failed.
|
||||
func (s *WorkService) FailTask(ctx context.Context, taskID string, errMsg string) error {
|
||||
return s.FailTaskWithCode(ctx, taskID, errMsg, domain.WorkErrorCodeNone)
|
||||
}
|
||||
|
||||
// FailTaskWithCode marks a task as failed with a categorized error code.
|
||||
// The error code enables clients to distinguish failure types and respond appropriately.
|
||||
func (s *WorkService) FailTaskWithCode(ctx context.Context, taskID string, errMsg string, code domain.WorkErrorCode) error {
|
||||
// Get task for callback URL before failing
|
||||
task, err := s.queue.GetTask(ctx, taskID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get task: %w", err)
|
||||
}
|
||||
|
||||
if err := s.queue.Fail(ctx, taskID, errMsg); err != nil {
|
||||
if err := s.queue.FailWithCode(ctx, taskID, errMsg, code); err != nil {
|
||||
return fmt.Errorf("fail task: %w", err)
|
||||
}
|
||||
|
||||
@ -154,6 +160,7 @@ func (s *WorkService) FailTask(ctx context.Context, taskID string, errMsg string
|
||||
"project", task.ProjectID,
|
||||
"type", task.Type,
|
||||
"error", errMsg,
|
||||
"error_code", code,
|
||||
"retry_count", task.RetryCount,
|
||||
)
|
||||
|
||||
|
||||
@ -72,6 +72,10 @@ func (m *mockWorkQueue) Complete(_ context.Context, taskID string, result *domai
|
||||
}
|
||||
|
||||
func (m *mockWorkQueue) Fail(_ context.Context, taskID string, errMsg string) error {
|
||||
return m.FailWithCode(context.Background(), taskID, errMsg, domain.WorkErrorCodeNone)
|
||||
}
|
||||
|
||||
func (m *mockWorkQueue) FailWithCode(_ context.Context, taskID string, errMsg string, code domain.WorkErrorCode) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
task, ok := m.tasks[taskID]
|
||||
@ -82,6 +86,7 @@ func (m *mockWorkQueue) Fail(_ context.Context, taskID string, errMsg string) er
|
||||
if task.RetryCount >= task.MaxRetries {
|
||||
task.Status = domain.WorkTaskStatusFailed
|
||||
task.Error = errMsg
|
||||
task.ErrorCode = code
|
||||
} else {
|
||||
task.Status = domain.WorkTaskStatusPending
|
||||
task.WorkerID = ""
|
||||
@ -293,6 +298,51 @@ func (m *mockCodeAgentRegistry) AvailableAgents(_ context.Context) []port.CodeAg
|
||||
}
|
||||
func (m *mockCodeAgentRegistry) Count() int { return 1 }
|
||||
|
||||
// =============================================================================
|
||||
// Mock CommandExecutor for verify tests
|
||||
// =============================================================================
|
||||
|
||||
type mockCommandExecutor struct {
|
||||
result *domain.CommandResult
|
||||
err error
|
||||
output []domain.OutputLine
|
||||
podExists bool
|
||||
podExistErr error
|
||||
}
|
||||
|
||||
func newMockCommandExecutor() *mockCommandExecutor {
|
||||
return &mockCommandExecutor{
|
||||
result: &domain.CommandResult{
|
||||
ExitCode: 0,
|
||||
DurationMs: 100,
|
||||
},
|
||||
podExists: true,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockCommandExecutor) Execute(_ context.Context, _ *domain.Command, _ string, handler domain.OutputHandler) (*domain.CommandResult, error) {
|
||||
if m.err != nil {
|
||||
return nil, m.err
|
||||
}
|
||||
// Deliver output lines to handler
|
||||
for _, line := range m.output {
|
||||
handler(line)
|
||||
}
|
||||
return m.result, nil
|
||||
}
|
||||
|
||||
func (m *mockCommandExecutor) Cancel(_ context.Context, _ domain.CommandID) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockCommandExecutor) PodExists(_ context.Context, _ string) (bool, error) {
|
||||
return m.podExists, m.podExistErr
|
||||
}
|
||||
|
||||
func (m *mockCommandExecutor) CheckConnection(_ context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Helper to build test dependencies
|
||||
// =============================================================================
|
||||
|
||||
@ -52,6 +52,10 @@ func (m *mockMaintenanceQueue) Fail(_ context.Context, _ string, _ string) error
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockMaintenanceQueue) FailWithCode(_ context.Context, _ string, _ string, _ domain.WorkErrorCode) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockMaintenanceQueue) Cancel(_ context.Context, _ string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -19,10 +19,11 @@ import (
|
||||
// and executes them via task-type-specific handlers. It self-registers as
|
||||
// a worker, sends heartbeats, and reports results.
|
||||
type WorkExecutor struct {
|
||||
workerSvc *service.WorkerService
|
||||
workSvc *service.WorkService
|
||||
buildExec *BuildExecutor
|
||||
logger *slog.Logger
|
||||
workerSvc *service.WorkerService
|
||||
workSvc *service.WorkService
|
||||
buildExec *BuildExecutor
|
||||
verifyExec *VerifyExecutor
|
||||
logger *slog.Logger
|
||||
|
||||
workerID string
|
||||
hostname string
|
||||
@ -84,6 +85,7 @@ func NewWorkExecutor(
|
||||
workerSvc *service.WorkerService,
|
||||
workSvc *service.WorkService,
|
||||
buildExec *BuildExecutor,
|
||||
verifyExec *VerifyExecutor,
|
||||
cfg *WorkExecutorConfig,
|
||||
) *WorkExecutor {
|
||||
if cfg == nil {
|
||||
@ -111,6 +113,7 @@ func NewWorkExecutor(
|
||||
workerSvc: workerSvc,
|
||||
workSvc: workSvc,
|
||||
buildExec: buildExec,
|
||||
verifyExec: verifyExec,
|
||||
logger: cfg.Logger.With("component", "work-executor"),
|
||||
workerID: cfg.WorkerID,
|
||||
hostname: hostname,
|
||||
@ -262,7 +265,11 @@ func (e *WorkExecutor) tryClaimAndExecute() {
|
||||
if errMsg == "" {
|
||||
errMsg = "execution failed"
|
||||
}
|
||||
if err := e.workSvc.FailTask(e.ctx, task.ID, errMsg); err != nil {
|
||||
|
||||
// Classify the error to enable appropriate client handling
|
||||
errorCode := domain.ClassifyAgentError(errMsg, result.Output)
|
||||
|
||||
if err := e.workSvc.FailTaskWithCode(e.ctx, task.ID, errMsg, errorCode); err != nil {
|
||||
e.logger.Error("failed to record task failure",
|
||||
"task_id", task.ID,
|
||||
"error", err,
|
||||
@ -280,6 +287,14 @@ func (e *WorkExecutor) executeTask(ctx context.Context, task *domain.WorkTask) *
|
||||
switch task.Type {
|
||||
case domain.WorkTaskTypeBuild:
|
||||
return e.buildExec.Execute(ctx, task)
|
||||
case domain.WorkTaskTypeVerify:
|
||||
if e.verifyExec == nil {
|
||||
return &domain.BuildResult{
|
||||
Success: false,
|
||||
Error: "verify executor not configured",
|
||||
}
|
||||
}
|
||||
return e.verifyExec.Execute(ctx, task)
|
||||
default:
|
||||
return &domain.BuildResult{
|
||||
Success: false,
|
||||
|
||||
@ -21,7 +21,7 @@ func testLogger() *slog.Logger {
|
||||
func TestWorkExecutor_StartAndStop(t *testing.T) {
|
||||
deps := newTestDeps()
|
||||
|
||||
executor := NewWorkExecutor(deps.workerSvc, deps.workSvc, deps.buildExec, &WorkExecutorConfig{
|
||||
executor := NewWorkExecutor(deps.workerSvc, deps.workSvc, deps.buildExec, nil, &WorkExecutorConfig{
|
||||
WorkerID: "test-worker-1",
|
||||
PollPeriod: 100 * time.Millisecond,
|
||||
HeartbeatPeriod: 100 * time.Millisecond,
|
||||
@ -75,7 +75,7 @@ func TestWorkExecutor_ClaimsAndExecutesTask(t *testing.T) {
|
||||
}
|
||||
deps.queue.mu.Unlock()
|
||||
|
||||
executor := NewWorkExecutor(deps.workerSvc, deps.workSvc, deps.buildExec, &WorkExecutorConfig{
|
||||
executor := NewWorkExecutor(deps.workerSvc, deps.workSvc, deps.buildExec, nil, &WorkExecutorConfig{
|
||||
WorkerID: "test-worker-2",
|
||||
PollPeriod: 50 * time.Millisecond,
|
||||
HeartbeatPeriod: 5 * time.Second,
|
||||
@ -118,7 +118,7 @@ func TestWorkExecutor_FailsTaskOnAgentError(t *testing.T) {
|
||||
}
|
||||
deps.queue.mu.Unlock()
|
||||
|
||||
executor := NewWorkExecutor(deps.workerSvc, deps.workSvc, deps.buildExec, &WorkExecutorConfig{
|
||||
executor := NewWorkExecutor(deps.workerSvc, deps.workSvc, deps.buildExec, nil, &WorkExecutorConfig{
|
||||
WorkerID: "test-worker-3",
|
||||
PollPeriod: 50 * time.Millisecond,
|
||||
HeartbeatPeriod: 5 * time.Second,
|
||||
@ -164,7 +164,7 @@ func TestWorkExecutor_UnsupportedTaskType(t *testing.T) {
|
||||
}
|
||||
deps.queue.mu.Unlock()
|
||||
|
||||
executor := NewWorkExecutor(deps.workerSvc, deps.workSvc, deps.buildExec, &WorkExecutorConfig{
|
||||
executor := NewWorkExecutor(deps.workerSvc, deps.workSvc, deps.buildExec, nil, &WorkExecutorConfig{
|
||||
WorkerID: "test-worker-4",
|
||||
PollPeriod: 50 * time.Millisecond,
|
||||
HeartbeatPeriod: 5 * time.Second,
|
||||
|
||||
Loading…
Reference in New Issue
Block a user