- Add WorkErrorCode type with RATE_LIMITED, AUTH_FAILED, TIMEOUT, STALE_WORKER, AGENT_ERROR, INVALID_SPEC
- Add ClassifyAgentError function to detect error patterns from stderr
- Add error_code column to work_queue table (migration 016)
- Add FailWithCode method to WorkQueue interface and implementations
- Update RequeueStaleWithIDs to mark permanently failed tasks with STALE_WORKER
- Add ErrorCode to BuildResult for API responses
- Update work executor to classify errors before failing tasks

This enables users to see actual failure reasons (e.g., "RATE_LIMITED") instead of builds stuck in "running" state forever when Claude hits rate limits.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
263 lines
6.9 KiB
Go
263 lines
6.9 KiB
Go
// Package postgres provides PostgreSQL-based implementations of port interfaces.
|
|
package postgres
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
|
|
"github.com/orchard9/rdev/internal/domain"
|
|
"github.com/orchard9/rdev/internal/port"
|
|
)
|
|
|
|
// WorkQueueRepository is the PostgreSQL-backed implementation of
// port.WorkQueue. Its methods read and write the work_queue table through
// the wrapped database handle.
type WorkQueueRepository struct {
	// db is the handle every query issued by this repository runs on.
	db *sql.DB
}
|
|
|
|
// NewWorkQueueRepository creates a new PostgreSQL work queue repository.
|
|
func NewWorkQueueRepository(db *sql.DB) *WorkQueueRepository {
|
|
return &WorkQueueRepository{db: db}
|
|
}
|
|
|
|
// Ensure WorkQueueRepository implements port.WorkQueue at compile time.
|
|
var _ port.WorkQueue = (*WorkQueueRepository)(nil)
|
|
|
|
// Enqueue adds a task to the queue.
|
|
func (r *WorkQueueRepository) Enqueue(ctx context.Context, task *domain.WorkTask) (string, error) {
|
|
specJSON, err := json.Marshal(task.Spec)
|
|
if err != nil {
|
|
return "", fmt.Errorf("marshal task spec: %w", err)
|
|
}
|
|
|
|
var id string
|
|
err = r.db.QueryRowContext(ctx, `
|
|
INSERT INTO work_queue (project_id, task_type, task_spec, priority, callback_url, max_retries)
|
|
VALUES ($1, $2, $3, $4, $5, $6)
|
|
RETURNING id
|
|
`, task.ProjectID, string(task.Type), specJSON, task.Priority, nullString(task.CallbackURL), task.MaxRetries).Scan(&id)
|
|
|
|
if err != nil {
|
|
return "", fmt.Errorf("enqueue work task: %w", err)
|
|
}
|
|
|
|
return id, nil
|
|
}
|
|
|
|
// Dequeue atomically claims the next available task for a worker.
|
|
func (r *WorkQueueRepository) Dequeue(ctx context.Context, workerID string) (*domain.WorkTask, error) {
|
|
// Use a single UPDATE ... RETURNING with subquery for atomic claim
|
|
// This avoids explicit transaction management while still being safe
|
|
var task domain.WorkTask
|
|
var taskType string
|
|
var specJSON []byte
|
|
var status string
|
|
var callbackURL sql.NullString
|
|
var startedAt sql.NullTime
|
|
var completedAt sql.NullTime
|
|
var resultJSON []byte
|
|
var errorMsg sql.NullString
|
|
|
|
err := r.db.QueryRowContext(ctx, `
|
|
UPDATE work_queue
|
|
SET status = 'running', worker_id = $1, started_at = NOW()
|
|
WHERE id = (
|
|
SELECT id FROM work_queue
|
|
WHERE status = 'pending'
|
|
ORDER BY priority DESC, created_at ASC
|
|
LIMIT 1
|
|
FOR UPDATE SKIP LOCKED
|
|
)
|
|
RETURNING id, project_id, task_type, task_spec, status, priority, worker_id,
|
|
callback_url, created_at, started_at, completed_at, result, error,
|
|
retry_count, max_retries
|
|
`, workerID).Scan(
|
|
&task.ID,
|
|
&task.ProjectID,
|
|
&taskType,
|
|
&specJSON,
|
|
&status,
|
|
&task.Priority,
|
|
&task.WorkerID,
|
|
&callbackURL,
|
|
&task.CreatedAt,
|
|
&startedAt,
|
|
&completedAt,
|
|
&resultJSON,
|
|
&errorMsg,
|
|
&task.RetryCount,
|
|
&task.MaxRetries,
|
|
)
|
|
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return nil, nil // No pending tasks
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("dequeue work task: %w", err)
|
|
}
|
|
|
|
task.Type = domain.WorkTaskType(taskType)
|
|
task.Status = domain.WorkTaskStatus(status)
|
|
|
|
if callbackURL.Valid {
|
|
task.CallbackURL = callbackURL.String
|
|
}
|
|
if startedAt.Valid {
|
|
task.StartedAt = &startedAt.Time
|
|
}
|
|
if completedAt.Valid {
|
|
task.CompletedAt = &completedAt.Time
|
|
}
|
|
if errorMsg.Valid {
|
|
task.Error = errorMsg.String
|
|
}
|
|
|
|
// Parse task spec
|
|
if len(specJSON) > 0 {
|
|
if err := json.Unmarshal(specJSON, &task.Spec); err != nil {
|
|
return nil, fmt.Errorf("unmarshal task spec: %w", err)
|
|
}
|
|
}
|
|
|
|
// Parse result
|
|
if len(resultJSON) > 0 {
|
|
task.Result = &domain.WorkResult{}
|
|
if err := json.Unmarshal(resultJSON, task.Result); err != nil {
|
|
return nil, fmt.Errorf("unmarshal task result: %w", err)
|
|
}
|
|
}
|
|
|
|
return &task, nil
|
|
}
|
|
|
|
// Complete marks a task as successfully completed with results.
|
|
func (r *WorkQueueRepository) Complete(ctx context.Context, taskID string, result *domain.WorkResult) error {
|
|
var resultJSON []byte
|
|
var err error
|
|
|
|
if result != nil {
|
|
resultJSON, err = json.Marshal(result)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal result: %w", err)
|
|
}
|
|
}
|
|
|
|
res, err := r.db.ExecContext(ctx, `
|
|
UPDATE work_queue
|
|
SET status = 'completed', completed_at = NOW(), result = $1
|
|
WHERE id = $2 AND status = 'running'
|
|
`, resultJSON, taskID)
|
|
if err != nil {
|
|
return fmt.Errorf("complete work task: %w", err)
|
|
}
|
|
|
|
rows, err := res.RowsAffected()
|
|
if err != nil {
|
|
return fmt.Errorf("rows affected: %w", err)
|
|
}
|
|
if rows == 0 {
|
|
return domain.ErrWorkTaskNotFound
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Fail marks a task as failed with an error message.
|
|
// Uses a single atomic UPDATE to avoid race conditions between SELECT and UPDATE.
|
|
func (r *WorkQueueRepository) Fail(ctx context.Context, taskID string, errMsg string) error {
|
|
return r.FailWithCode(ctx, taskID, errMsg, domain.WorkErrorCodeNone)
|
|
}
|
|
|
|
// FailWithCode marks a task as failed with an error message and categorized error code.
|
|
// The error code enables clients to distinguish failure types (rate limit, auth, timeout).
|
|
// If retry_count < max_retries, the task will be re-queued as pending (error_code cleared).
|
|
func (r *WorkQueueRepository) FailWithCode(ctx context.Context, taskID string, errMsg string, code domain.WorkErrorCode) error {
|
|
// Use a single atomic query that handles both retry and permanent failure cases.
|
|
// When retrying, clear error_code so the task gets a fresh start.
|
|
// Only set error_code on permanent failure.
|
|
var errorCode *string
|
|
if code != domain.WorkErrorCodeNone {
|
|
codeStr := string(code)
|
|
errorCode = &codeStr
|
|
}
|
|
|
|
result, err := r.db.ExecContext(ctx, `
|
|
UPDATE work_queue
|
|
SET
|
|
status = CASE
|
|
WHEN retry_count < max_retries THEN 'pending'
|
|
ELSE 'failed'
|
|
END,
|
|
worker_id = CASE
|
|
WHEN retry_count < max_retries THEN NULL
|
|
ELSE worker_id
|
|
END,
|
|
started_at = CASE
|
|
WHEN retry_count < max_retries THEN NULL
|
|
ELSE started_at
|
|
END,
|
|
completed_at = CASE
|
|
WHEN retry_count >= max_retries THEN NOW()
|
|
ELSE completed_at
|
|
END,
|
|
retry_count = CASE
|
|
WHEN retry_count < max_retries THEN retry_count + 1
|
|
ELSE retry_count
|
|
END,
|
|
error = $1,
|
|
error_code = CASE
|
|
WHEN retry_count >= max_retries THEN $3
|
|
ELSE NULL
|
|
END
|
|
WHERE id = $2
|
|
`, errMsg, taskID, errorCode)
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("fail work task: %w", err)
|
|
}
|
|
|
|
rows, err := result.RowsAffected()
|
|
if err != nil {
|
|
return fmt.Errorf("rows affected: %w", err)
|
|
}
|
|
if rows == 0 {
|
|
return domain.ErrWorkTaskNotFound
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Cancel marks a pending task as cancelled.
|
|
func (r *WorkQueueRepository) Cancel(ctx context.Context, taskID string) error {
|
|
result, err := r.db.ExecContext(ctx, `
|
|
UPDATE work_queue
|
|
SET status = 'cancelled', completed_at = NOW()
|
|
WHERE id = $1 AND status = 'pending'
|
|
`, taskID)
|
|
if err != nil {
|
|
return fmt.Errorf("cancel work task: %w", err)
|
|
}
|
|
|
|
rows, err := result.RowsAffected()
|
|
if err != nil {
|
|
return fmt.Errorf("rows affected: %w", err)
|
|
}
|
|
|
|
if rows == 0 {
|
|
// Check if task exists
|
|
var exists bool
|
|
err := r.db.QueryRowContext(ctx, `SELECT EXISTS(SELECT 1 FROM work_queue WHERE id = $1)`, taskID).Scan(&exists)
|
|
if err != nil {
|
|
return fmt.Errorf("check exists: %w", err)
|
|
}
|
|
if !exists {
|
|
return domain.ErrWorkTaskNotFound
|
|
}
|
|
return fmt.Errorf("task is not in pending state")
|
|
}
|
|
|
|
return nil
|
|
}
|