rdev/internal/port/work_queue.go
jordan cfba724f8a feat: add work task error classification and user-facing error codes
- Add WorkErrorCode type with RATE_LIMITED, AUTH_FAILED, TIMEOUT, STALE_WORKER, AGENT_ERROR, INVALID_SPEC
- Add ClassifyAgentError function to detect error patterns from stderr
- Add error_code column to work_queue table (migration 016)
- Add FailWithCode method to WorkQueue interface and implementations
- Update RequeueStaleWithIDs to mark permanently failed tasks with STALE_WORKER
- Add ErrorCode to BuildResult for API responses
- Update work executor to classify errors before failing tasks

This enables users to see actual failure reasons (e.g., "RATE_LIMITED") instead of
builds stuck in "running" state forever when Claude hits rate limits.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 00:07:34 -07:00

60 lines
2.6 KiB
Go

// Package port defines interfaces (ports) for external dependencies.
package port
import (
"context"
"time"
"github.com/orchard9/rdev/internal/domain"
)
// WorkQueue is the port for the worker pool's task queue.
//
// CommandQueue carries project-specific claudebox commands; WorkQueue, by
// contrast, holds generic tasks that any worker in the pool may claim and
// execute.
type WorkQueue interface {
	// Enqueue places task on the queue and returns the ID of the queued task.
	Enqueue(ctx context.Context, task *domain.WorkTask) (string, error)

	// Dequeue claims the next available task on behalf of workerID.
	// The claim is atomic: FOR UPDATE SKIP LOCKED is used so that concurrent
	// workers can dequeue safely. A nil task is returned when no tasks are
	// available.
	Dequeue(ctx context.Context, workerID string) (*domain.WorkTask, error)

	// Complete records a successful finish for taskID and stores result.
	Complete(ctx context.Context, taskID string, result *domain.WorkResult) error

	// Fail records a failure for taskID together with errMsg.
	// While retry_count < max_retries the task goes back to pending instead
	// of staying failed.
	Fail(ctx context.Context, taskID, errMsg string) error

	// FailWithCode behaves like a failure report that also carries a
	// categorized error code, letting clients tell failure types apart
	// (rate limit, auth, timeout).
	// While retry_count < max_retries the task goes back to pending and its
	// error_code is cleared.
	FailWithCode(ctx context.Context, taskID, errMsg string, code domain.WorkErrorCode) error

	// Cancel moves a pending task to cancelled.
	// An error is returned when the task is not in pending status.
	Cancel(ctx context.Context, taskID string) error

	// GetTask looks up a single task by its ID.
	GetTask(ctx context.Context, taskID string) (*domain.WorkTask, error)

	// ListByProject lists the tasks belonging to projectID. status is an
	// optional filter and opts carries the pagination settings.
	// NOTE(review): presumably a nil status means "all statuses" — confirm
	// against the implementations.
	ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error)

	// GetStats reports statistics about the queue.
	GetStats(ctx context.Context) (*domain.WorkQueueStats, error)

	// CleanupOld deletes completed, failed, and cancelled tasks that are
	// older than olderThan.
	// NOTE(review): the int64 result is presumably the number of tasks
	// removed — confirm against the implementations.
	CleanupOld(ctx context.Context, olderThan time.Duration) (int64, error)

	// RequeueStale puts tasks back on the queue when they have been running
	// for longer than timeout. This covers workers that crashed without
	// reporting completion.
	// NOTE(review): the int64 result is presumably the number of tasks
	// re-queued — confirm against the implementations.
	RequeueStale(ctx context.Context, timeout time.Duration) (int64, error)

	// RequeueStaleWithIDs re-queues stale tasks and additionally returns
	// their IDs, for callers that must sync external state (e.g., build
	// audit).
	// NOTE(review): the change notes accompanying this file mention that
	// permanently failed tasks are marked STALE_WORKER by this operation —
	// confirm, and document whether those task IDs are part of the result.
	RequeueStaleWithIDs(ctx context.Context, timeout time.Duration) ([]string, error)
}