sp2-verify-1770321468/pkg/queue/postgres.go
jordan badbc31192
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
ci/woodpecker/manual/woodpecker Pipeline was successful
Initialize project from skeleton template
2026-02-05 19:57:50 +00:00

308 lines
8.5 KiB
Go

package queue
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"git.threesix.ai/jordan/sp2-verify-1770321468/pkg/logging"
)
// PostgresQueue implements Producer and Consumer using PostgreSQL.
// Dequeue claims work with a single UPDATE whose subquery uses
// FOR UPDATE SKIP LOCKED, so concurrent workers skip rows another worker is
// claiming instead of blocking on them.
//
// All state lives in the "jobs" table; the struct only holds its
// dependencies, which are set once by NewPostgresQueue.
type PostgresQueue struct {
	db     *sqlx.DB        // database handle used for every query
	logger *logging.Logger // scoped to the "queue" component by NewPostgresQueue
}

// Compile-time check that *PostgresQueue satisfies the Queue interface;
// it has no runtime cost.
var _ Queue = (*PostgresQueue)(nil)
// NewPostgresQueue returns a queue that stores its jobs in PostgreSQL via db.
//
// Log output is tagged with the "queue" component. logger is used
// immediately (WithComponent is called on it during construction), so a
// usable logger must be supplied.
func NewPostgresQueue(db *sqlx.DB, logger *logging.Logger) *PostgresQueue {
	scoped := logger.WithComponent("queue")
	return &PostgresQueue{logger: scoped, db: db}
}
// Enqueue adds a job of the given type using default options: priority 0
// and MaxRetries 3. It returns the generated job ID.
func (q *PostgresQueue) Enqueue(ctx context.Context, jobType string, payload map[string]any) (string, error) {
	defaults := Job{
		Type:       jobType,
		Payload:    payload,
		MaxRetries: 3,
	}
	return q.EnqueueWithOptions(ctx, defaults)
}
// EnqueueWithOptions adds a job with custom configuration and returns its ID.
//
// Only Type, Payload, Priority and MaxRetries are taken from job. ID, Status
// and CreatedAt are always overwritten with a fresh UUID, StatusPending and
// the current UTC time. A MaxRetries of zero or below falls back to the
// default of 3, and values above 100 are capped at 100. A nil Payload is
// stored as an empty JSON object.
//
// It returns an error if Type is empty, the payload cannot be JSON-encoded,
// or the insert fails.
func (q *PostgresQueue) EnqueueWithOptions(ctx context.Context, job Job) (string, error) {
	// A missing type is a validation failure, not a lookup failure, so it
	// must not wrap ErrJobNotFound (callers using errors.Is would be misled).
	if job.Type == "" {
		return "", errors.New("job type is required")
	}

	job.ID = uuid.New().String()
	job.Status = StatusPending
	job.CreatedAt = time.Now().UTC()

	// Apply defaults and constraints. A negative value is as meaningless as
	// zero (Fail would mark the job permanently failed on its first failure),
	// so both fall back to the default.
	switch {
	case job.MaxRetries <= 0:
		job.MaxRetries = 3
	case job.MaxRetries > 100:
		job.MaxRetries = 100 // Cap at reasonable limit
	}

	if job.Payload == nil {
		job.Payload = make(map[string]any)
	}
	payloadJSON, err := json.Marshal(job.Payload)
	if err != nil {
		return "", fmt.Errorf("marshal payload: %w", err)
	}

	_, err = q.db.ExecContext(ctx, `
INSERT INTO jobs (id, job_type, payload, status, priority, max_retries, created_at)
VALUES ($1, $2, $3, $4, $5, $6, $7)
`, job.ID, job.Type, payloadJSON, job.Status, job.Priority, job.MaxRetries, job.CreatedAt)
	if err != nil {
		return "", fmt.Errorf("insert job: %w", err)
	}

	q.logger.Debug("job enqueued", "job_id", job.ID, "type", job.Type, "priority", job.Priority)
	return job.ID, nil
}
// Dequeue atomically claims the next pending job for workerID.
//
// Candidates are ordered by priority (highest first), then by age (oldest
// first). The claim is a single UPDATE whose subquery selects the row with
// FOR UPDATE SKIP LOCKED, so concurrent workers skip rows another worker is
// claiming instead of blocking, and no explicit transaction is needed. The
// claimed row is moved to StatusRunning with worker_id and started_at set.
//
// It returns ErrNoJob when no pending job is available.
func (q *PostgresQueue) Dequeue(ctx context.Context, workerID string) (*Job, error) {
	claimedAt := time.Now().UTC()

	var row jobRow
	scanErr := q.db.QueryRowxContext(ctx, `
UPDATE jobs
SET status = $1, worker_id = $2, started_at = $3
WHERE id = (
SELECT id FROM jobs
WHERE status = $4
ORDER BY priority DESC, created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
RETURNING id, job_type, payload, status, priority, created_at, started_at,
completed_at, retry_count, max_retries, error, worker_id
`, StatusRunning, workerID, claimedAt, StatusPending).StructScan(&row)
	switch {
	case errors.Is(scanErr, sql.ErrNoRows):
		return nil, ErrNoJob
	case scanErr != nil:
		return nil, fmt.Errorf("dequeue job: %w", scanErr)
	}

	// NOTE(review): the row is already marked running here; if decoding
	// fails it stays running until a stale-job sweep (RequeueStale) resets it.
	claimed, err := row.toJob()
	if err != nil {
		return nil, fmt.Errorf("parse job: %w", err)
	}

	q.logger.Debug("job dequeued", "job_id", claimed.ID, "type", claimed.Type, "worker_id", workerID)
	return claimed, nil
}
// Ack marks a job as successfully completed and records the completion time.
//
// The job's current status is not checked. It returns ErrJobNotFound when
// no row has the given ID.
func (q *PostgresQueue) Ack(ctx context.Context, jobID string) error {
	completedAt := time.Now().UTC()
	res, execErr := q.db.ExecContext(ctx, `
UPDATE jobs SET status = $1, completed_at = $2 WHERE id = $3
`, StatusCompleted, completedAt, jobID)
	if execErr != nil {
		return fmt.Errorf("ack job: %w", execErr)
	}

	affected, countErr := res.RowsAffected()
	switch {
	case countErr != nil:
		return fmt.Errorf("check rows affected: %w", countErr)
	case affected == 0:
		return ErrJobNotFound
	}

	q.logger.Debug("job completed", "job_id", jobID)
	return nil
}
// Fail records a failed attempt for a job and either requeues it or marks it
// permanently failed.
//
// The retry counter is incremented and errMsg is stored on the job. If the
// incremented counter reaches max_retries, the job moves to StatusFailed and
// completed_at is set. Otherwise it returns to StatusPending, with
// worker_id, started_at and completed_at cleared so any worker can claim it
// again. The whole decision is a single atomic UPDATE.
//
// Only pending or running jobs are affected. If the job does not exist, or
// is already in a terminal state, it returns ErrJobNotFound and changes
// nothing. This prevents a late or duplicate Fail from resurrecting a
// completed job.
func (q *PostgresQueue) Fail(ctx context.Context, jobID string, errMsg string) error {
	now := time.Now().UTC()
	// Every SET expression sees the pre-update row, so "retry_count + 1" is
	// the attempt count including this failure.
	result, err := q.db.ExecContext(ctx, `
UPDATE jobs
SET
retry_count = retry_count + 1,
error = $1,
status = CASE
WHEN retry_count + 1 >= max_retries THEN $2
ELSE $3
END,
started_at = CASE
WHEN retry_count + 1 >= max_retries THEN started_at
ELSE NULL
END,
worker_id = CASE
WHEN retry_count + 1 >= max_retries THEN worker_id
ELSE NULL
END,
completed_at = CASE
WHEN retry_count + 1 >= max_retries THEN $4
ELSE NULL
END
WHERE id = $5 AND status IN ($6, $7)
`, errMsg, StatusFailed, StatusPending, now, jobID, StatusRunning, StatusPending)
	if err != nil {
		return fmt.Errorf("fail job: %w", err)
	}
	rows, err := result.RowsAffected()
	if err != nil {
		return fmt.Errorf("check rows affected: %w", err)
	}
	if rows == 0 {
		return ErrJobNotFound
	}

	// Truncate the message to keep log lines bounded (at most 500 bytes
	// including the ellipsis). Cut on a rune boundary so the logged text
	// stays valid UTF-8: ranging over a string yields rune start offsets.
	logErrMsg := errMsg
	if len(logErrMsg) > 500 {
		cut := 0
		for i := range logErrMsg {
			if i > 497 {
				break
			}
			cut = i
		}
		logErrMsg = logErrMsg[:cut] + "..."
	}
	q.logger.Debug("job failed", "job_id", jobID, "error", logErrMsg)
	return nil
}
// Heartbeat extends the job's visibility timeout.
//
// It refreshes started_at on a running job so RequeueStale does not treat
// the job as abandoned. It returns ErrJobNotFound when no job with jobID
// exists or the job is not currently running.
func (q *PostgresQueue) Heartbeat(ctx context.Context, jobID string) error {
	beat := time.Now().UTC()
	res, execErr := q.db.ExecContext(ctx, `
UPDATE jobs SET started_at = $1 WHERE id = $2 AND status = $3
`, beat, jobID, StatusRunning)
	if execErr != nil {
		return fmt.Errorf("heartbeat job: %w", execErr)
	}

	n, countErr := res.RowsAffected()
	switch {
	case countErr != nil:
		return fmt.Errorf("check rows affected: %w", countErr)
	case n == 0:
		return ErrJobNotFound
	}
	return nil
}
// RequeueStale requeues jobs that have been running too long without a
// heartbeat.
//
// Running jobs whose started_at is older than timeout go back to
// StatusPending, with worker_id and started_at cleared so another worker can
// claim them. Call this periodically (e.g., every minute) to recover from
// crashed workers. It returns the number of jobs requeued.
//
// timeout must be positive. A zero or negative value would put the cutoff at
// or after the current time and requeue every running job, including ones
// whose workers are healthy and heartbeating, so it is rejected with an
// error.
//
// NOTE(review): retry_count is not incremented here, so a job that crashes
// its worker on every attempt is requeued indefinitely. Consider counting a
// stale requeue as a failed attempt if that matters for this deployment.
func (q *PostgresQueue) RequeueStale(ctx context.Context, timeout time.Duration) (int64, error) {
	if timeout <= 0 {
		return 0, fmt.Errorf("requeue stale jobs: timeout must be positive, got %v", timeout)
	}
	cutoff := time.Now().UTC().Add(-timeout)

	result, err := q.db.ExecContext(ctx, `
UPDATE jobs
SET status = $1, worker_id = NULL, started_at = NULL
WHERE status = $2 AND started_at < $3
`, StatusPending, StatusRunning, cutoff)
	if err != nil {
		return 0, fmt.Errorf("requeue stale jobs: %w", err)
	}
	count, err := result.RowsAffected()
	if err != nil {
		return 0, fmt.Errorf("check rows affected: %w", err)
	}

	if count > 0 {
		q.logger.Info("requeued stale jobs", "count", count, "timeout", timeout)
	}
	return count, nil
}
// GetJob retrieves a job by ID (for inspection/debugging).
//
// It returns ErrJobNotFound when no row has the given ID. Query failures are
// wrapped with "get job". Payload decoding failures are wrapped with
// "parse job", the same wrapping Dequeue uses.
func (q *PostgresQueue) GetJob(ctx context.Context, jobID string) (*Job, error) {
	var row jobRow
	err := q.db.QueryRowxContext(ctx, `
SELECT id, job_type, payload, status, priority, created_at, started_at,
completed_at, retry_count, max_retries, error, worker_id
FROM jobs WHERE id = $1
`, jobID).StructScan(&row)
	if errors.Is(err, sql.ErrNoRows) {
		return nil, ErrJobNotFound
	}
	if err != nil {
		return nil, fmt.Errorf("get job: %w", err)
	}

	job, err := row.toJob()
	if err != nil {
		return nil, fmt.Errorf("parse job: %w", err)
	}
	return job, nil
}
// jobRow is the database representation of a Job, populated by sqlx
// StructScan via the `db` tags. Columns that may be NULL (started_at,
// completed_at, error, worker_id) use sql.Null* wrappers, and payload is kept
// as raw JSON bytes. toJob converts a row into a Job.
//
// How this file writes the columns:
//   - started_at and worker_id are set when a job is claimed (started_at is
//     also refreshed by Heartbeat) and cleared when the job is requeued.
//   - completed_at is set by Ack, and by Fail once retries are exhausted.
//   - error holds the message from the most recent Fail.
type jobRow struct {
	ID          string         `db:"id"`
	Type        string         `db:"job_type"`
	Payload     []byte         `db:"payload"`
	Status      string         `db:"status"`
	Priority    int            `db:"priority"`
	CreatedAt   time.Time      `db:"created_at"`
	StartedAt   sql.NullTime   `db:"started_at"`
	CompletedAt sql.NullTime   `db:"completed_at"`
	RetryCount  int            `db:"retry_count"`
	MaxRetries  int            `db:"max_retries"`
	Error       sql.NullString `db:"error"`
	WorkerID    sql.NullString `db:"worker_id"`
}
// toJob converts a scanned row into the public Job type.
//
// NULL columns map to nil time pointers or empty strings. The timestamps are
// copied before their addresses are taken, so the returned Job never aliases
// the row: reusing a jobRow for another scan (for example inside a rows loop)
// cannot change a Job that was already returned. An empty payload leaves
// Job.Payload nil; otherwise the payload is decoded from JSON, and a decode
// failure is returned as an error.
func (r *jobRow) toJob() (*Job, error) {
	job := &Job{
		ID:         r.ID,
		Type:       r.Type,
		Status:     JobStatus(r.Status),
		Priority:   r.Priority,
		CreatedAt:  r.CreatedAt,
		RetryCount: r.RetryCount,
		MaxRetries: r.MaxRetries,
	}
	if r.StartedAt.Valid {
		startedAt := r.StartedAt.Time
		job.StartedAt = &startedAt
	}
	if r.CompletedAt.Valid {
		completedAt := r.CompletedAt.Time
		job.CompletedAt = &completedAt
	}
	if r.Error.Valid {
		job.Error = r.Error.String
	}
	if r.WorkerID.Valid {
		job.WorkerID = r.WorkerID.String
	}
	if len(r.Payload) > 0 {
		if err := json.Unmarshal(r.Payload, &job.Payload); err != nil {
			return nil, fmt.Errorf("unmarshal payload: %w", err)
		}
	}
	return job, nil
}