// Package postgres provides PostgreSQL-based implementations of port interfaces. package postgres import ( "context" "database/sql" "encoding/json" "errors" "fmt" "github.com/orchard9/rdev/internal/domain" "github.com/orchard9/rdev/internal/port" ) // WorkQueueRepository implements port.WorkQueue using PostgreSQL. type WorkQueueRepository struct { db *sql.DB } // NewWorkQueueRepository creates a new PostgreSQL work queue repository. func NewWorkQueueRepository(db *sql.DB) *WorkQueueRepository { return &WorkQueueRepository{db: db} } // Ensure WorkQueueRepository implements port.WorkQueue at compile time. var _ port.WorkQueue = (*WorkQueueRepository)(nil) // Enqueue adds a task to the queue. func (r *WorkQueueRepository) Enqueue(ctx context.Context, task *domain.WorkTask) (string, error) { specJSON, err := json.Marshal(task.Spec) if err != nil { return "", fmt.Errorf("marshal task spec: %w", err) } var id string err = r.db.QueryRowContext(ctx, ` INSERT INTO work_queue (project_id, task_type, task_spec, priority, callback_url, max_retries) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id `, task.ProjectID, string(task.Type), specJSON, task.Priority, nullString(task.CallbackURL), task.MaxRetries).Scan(&id) if err != nil { return "", fmt.Errorf("enqueue work task: %w", err) } return id, nil } // Dequeue atomically claims the next available task for a worker. func (r *WorkQueueRepository) Dequeue(ctx context.Context, workerID string) (*domain.WorkTask, error) { // Use a single UPDATE ... RETURNING with subquery for atomic claim // This avoids explicit transaction management while still being safe var task domain.WorkTask var taskType string var specJSON []byte var status string var callbackURL sql.NullString var startedAt sql.NullTime var completedAt sql.NullTime var resultJSON []byte var errorMsg sql.NullString // Clear stale error/completed_at when claiming for execution or retry. // This prevents the API from returning confusing "running" status with stale error messages. err := r.db.QueryRowContext(ctx, ` UPDATE work_queue SET status = 'running', worker_id = $1, started_at = NOW(), error = NULL, completed_at = NULL WHERE id = ( SELECT id FROM work_queue WHERE status = 'pending' ORDER BY priority DESC, created_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED ) RETURNING id, project_id, task_type, task_spec, status, priority, worker_id, callback_url, created_at, started_at, completed_at, result, error, retry_count, max_retries `, workerID).Scan( &task.ID, &task.ProjectID, &taskType, &specJSON, &status, &task.Priority, &task.WorkerID, &callbackURL, &task.CreatedAt, &startedAt, &completedAt, &resultJSON, &errorMsg, &task.RetryCount, &task.MaxRetries, ) if errors.Is(err, sql.ErrNoRows) { return nil, nil // No pending tasks } if err != nil { return nil, fmt.Errorf("dequeue work task: %w", err) } task.Type = domain.WorkTaskType(taskType) task.Status = domain.WorkTaskStatus(status) if callbackURL.Valid { task.CallbackURL = callbackURL.String } if startedAt.Valid { task.StartedAt = &startedAt.Time } if completedAt.Valid { task.CompletedAt = &completedAt.Time } if errorMsg.Valid { task.Error = errorMsg.String } // Parse task spec if len(specJSON) > 0 { if err := json.Unmarshal(specJSON, &task.Spec); err != nil { return nil, fmt.Errorf("unmarshal task spec: %w", err) } } // Parse result if len(resultJSON) > 0 { task.Result = &domain.WorkResult{} if err := json.Unmarshal(resultJSON, task.Result); err != nil { return nil, fmt.Errorf("unmarshal task result: %w", err) } } return &task, nil } // Complete marks a task as successfully completed with results. func (r *WorkQueueRepository) Complete(ctx context.Context, taskID string, result *domain.WorkResult) error { var resultJSON []byte var err error if result != nil { resultJSON, err = json.Marshal(result) if err != nil { return fmt.Errorf("marshal result: %w", err) } } res, err := r.db.ExecContext(ctx, ` UPDATE work_queue SET status = 'completed', completed_at = NOW(), result = $1 WHERE id = $2 AND status = 'running' `, resultJSON, taskID) if err != nil { return fmt.Errorf("complete work task: %w", err) } rows, err := res.RowsAffected() if err != nil { return fmt.Errorf("rows affected: %w", err) } if rows == 0 { return domain.ErrWorkTaskNotFound } return nil } // Fail marks a task as failed with an error message. // Uses a single atomic UPDATE to avoid race conditions between SELECT and UPDATE. func (r *WorkQueueRepository) Fail(ctx context.Context, taskID string, errMsg string) error { return r.FailWithCode(ctx, taskID, errMsg, domain.WorkErrorCodeNone) } // FailWithCode marks a task as failed with an error message and categorized error code. // The error code enables clients to distinguish failure types (rate limit, auth, timeout). // If retry_count < max_retries, the task will be re-queued as pending (error_code cleared). func (r *WorkQueueRepository) FailWithCode(ctx context.Context, taskID string, errMsg string, code domain.WorkErrorCode) error { // Use a single atomic query that handles both retry and permanent failure cases. // When retrying, clear error_code so the task gets a fresh start. // Only set error_code on permanent failure. var errorCode *string if code != domain.WorkErrorCodeNone { codeStr := string(code) errorCode = &codeStr } result, err := r.db.ExecContext(ctx, ` UPDATE work_queue SET status = CASE WHEN retry_count < max_retries THEN 'pending' ELSE 'failed' END, worker_id = CASE WHEN retry_count < max_retries THEN NULL ELSE worker_id END, started_at = CASE WHEN retry_count < max_retries THEN NULL ELSE started_at END, completed_at = CASE WHEN retry_count >= max_retries THEN NOW() ELSE completed_at END, retry_count = CASE WHEN retry_count < max_retries THEN retry_count + 1 ELSE retry_count END, error = $1, error_code = CASE WHEN retry_count >= max_retries THEN $3 ELSE NULL END WHERE id = $2 `, errMsg, taskID, errorCode) if err != nil { return fmt.Errorf("fail work task: %w", err) } rows, err := result.RowsAffected() if err != nil { return fmt.Errorf("rows affected: %w", err) } if rows == 0 { return domain.ErrWorkTaskNotFound } return nil } // Cancel marks a pending task as cancelled. func (r *WorkQueueRepository) Cancel(ctx context.Context, taskID string) error { result, err := r.db.ExecContext(ctx, ` UPDATE work_queue SET status = 'cancelled', completed_at = NOW() WHERE id = $1 AND status = 'pending' `, taskID) if err != nil { return fmt.Errorf("cancel work task: %w", err) } rows, err := result.RowsAffected() if err != nil { return fmt.Errorf("rows affected: %w", err) } if rows == 0 { // Check if task exists var exists bool err := r.db.QueryRowContext(ctx, `SELECT EXISTS(SELECT 1 FROM work_queue WHERE id = $1)`, taskID).Scan(&exists) if err != nil { return fmt.Errorf("check exists: %w", err) } if !exists { return domain.ErrWorkTaskNotFound } return fmt.Errorf("task is not in pending state") } return nil }