308 lines
8.5 KiB
Go
308 lines
8.5 KiB
Go
package queue
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/jmoiron/sqlx"
|
|
|
|
"git.threesix.ai/jordan/sp4-test-1770498663/pkg/logging"
|
|
)
|
|
|
|
// PostgresQueue implements Producer and Consumer using PostgreSQL.
// Uses FOR UPDATE SKIP LOCKED for atomic, non-blocking dequeue.
//
// All state lives in a "jobs" table; the methods in this file read and write
// the columns id, job_type, payload, status, priority, created_at,
// started_at, completed_at, retry_count, max_retries, error and worker_id.
// Construct it with NewPostgresQueue.
type PostgresQueue struct {
	// db is the database handle every queue operation runs its SQL against.
	db *sqlx.DB
	// logger is the component-scoped logger set up by NewPostgresQueue.
	logger *logging.Logger
}
|
|
|
|
// Ensure PostgresQueue implements Queue at compile time.
// The typed-nil blank assignment has no runtime effect; it only makes the
// build fail if a method required by Queue is missing or has the wrong
// signature.
var _ Queue = (*PostgresQueue)(nil)
|
|
|
|
// NewPostgresQueue creates a queue backed by PostgreSQL.
|
|
func NewPostgresQueue(db *sqlx.DB, logger *logging.Logger) *PostgresQueue {
|
|
return &PostgresQueue{
|
|
db: db,
|
|
logger: logger.WithComponent("queue"),
|
|
}
|
|
}
|
|
|
|
// Enqueue adds a job to the queue with default options.
|
|
func (q *PostgresQueue) Enqueue(ctx context.Context, jobType string, payload map[string]any) (string, error) {
|
|
return q.EnqueueWithOptions(ctx, Job{
|
|
Type: jobType,
|
|
Payload: payload,
|
|
Priority: 0,
|
|
MaxRetries: 3,
|
|
})
|
|
}
|
|
|
|
// EnqueueWithOptions adds a job with custom configuration.
|
|
func (q *PostgresQueue) EnqueueWithOptions(ctx context.Context, job Job) (string, error) {
|
|
// Validate required fields
|
|
if job.Type == "" {
|
|
return "", fmt.Errorf("job type is required: %w", ErrJobNotFound)
|
|
}
|
|
|
|
job.ID = uuid.New().String()
|
|
job.Status = StatusPending
|
|
job.CreatedAt = time.Now().UTC()
|
|
|
|
// Apply defaults and constraints
|
|
if job.MaxRetries == 0 {
|
|
job.MaxRetries = 3
|
|
}
|
|
if job.MaxRetries > 100 {
|
|
job.MaxRetries = 100 // Cap at reasonable limit
|
|
}
|
|
if job.Payload == nil {
|
|
job.Payload = make(map[string]any)
|
|
}
|
|
|
|
payloadJSON, err := json.Marshal(job.Payload)
|
|
if err != nil {
|
|
return "", fmt.Errorf("marshal payload: %w", err)
|
|
}
|
|
|
|
_, err = q.db.ExecContext(ctx, `
|
|
INSERT INTO jobs (id, job_type, payload, status, priority, max_retries, created_at)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
|
`, job.ID, job.Type, payloadJSON, job.Status, job.Priority, job.MaxRetries, job.CreatedAt)
|
|
if err != nil {
|
|
return "", fmt.Errorf("insert job: %w", err)
|
|
}
|
|
|
|
q.logger.Debug("job enqueued", "job_id", job.ID, "type", job.Type, "priority", job.Priority)
|
|
return job.ID, nil
|
|
}
|
|
|
|
// Dequeue atomically claims the next pending job.
|
|
// Uses UPDATE with subquery + FOR UPDATE SKIP LOCKED for atomic, non-blocking claim.
|
|
func (q *PostgresQueue) Dequeue(ctx context.Context, workerID string) (*Job, error) {
|
|
now := time.Now().UTC()
|
|
|
|
// Atomic claim: UPDATE with subquery + FOR UPDATE SKIP LOCKED
|
|
// This avoids explicit transaction management while being safe for concurrent workers.
|
|
var job jobRow
|
|
err := q.db.QueryRowxContext(ctx, `
|
|
UPDATE jobs
|
|
SET status = $1, worker_id = $2, started_at = $3
|
|
WHERE id = (
|
|
SELECT id FROM jobs
|
|
WHERE status = $4
|
|
ORDER BY priority DESC, created_at ASC
|
|
FOR UPDATE SKIP LOCKED
|
|
LIMIT 1
|
|
)
|
|
RETURNING id, job_type, payload, status, priority, created_at, started_at,
|
|
completed_at, retry_count, max_retries, error, worker_id
|
|
`, StatusRunning, workerID, now, StatusPending).StructScan(&job)
|
|
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return nil, ErrNoJob
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("dequeue job: %w", err)
|
|
}
|
|
|
|
result, err := job.toJob()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse job: %w", err)
|
|
}
|
|
|
|
q.logger.Debug("job dequeued", "job_id", result.ID, "type", result.Type, "worker_id", workerID)
|
|
return result, nil
|
|
}
|
|
|
|
// Ack marks a job as successfully completed.
|
|
func (q *PostgresQueue) Ack(ctx context.Context, jobID string) error {
|
|
now := time.Now().UTC()
|
|
result, err := q.db.ExecContext(ctx, `
|
|
UPDATE jobs SET status = $1, completed_at = $2 WHERE id = $3
|
|
`, StatusCompleted, now, jobID)
|
|
if err != nil {
|
|
return fmt.Errorf("ack job: %w", err)
|
|
}
|
|
|
|
rows, err := result.RowsAffected()
|
|
if err != nil {
|
|
return fmt.Errorf("check rows affected: %w", err)
|
|
}
|
|
if rows == 0 {
|
|
return ErrJobNotFound
|
|
}
|
|
|
|
q.logger.Debug("job completed", "job_id", jobID)
|
|
return nil
|
|
}
|
|
|
|
// Fail marks a job as failed, requeuing if retries remain.
|
|
// Uses atomic UPDATE to handle retry logic in a single query.
|
|
func (q *PostgresQueue) Fail(ctx context.Context, jobID string, errMsg string) error {
|
|
// Atomic: increment retry_count, check if should requeue or fail permanently.
|
|
// When retrying: clear worker_id and started_at, set status to pending.
|
|
// When exhausted: set status to failed, set completed_at.
|
|
now := time.Now().UTC()
|
|
result, err := q.db.ExecContext(ctx, `
|
|
UPDATE jobs
|
|
SET
|
|
retry_count = retry_count + 1,
|
|
error = $1,
|
|
status = CASE
|
|
WHEN retry_count + 1 >= max_retries THEN $2
|
|
ELSE $3
|
|
END,
|
|
started_at = CASE
|
|
WHEN retry_count + 1 >= max_retries THEN started_at
|
|
ELSE NULL
|
|
END,
|
|
worker_id = CASE
|
|
WHEN retry_count + 1 >= max_retries THEN worker_id
|
|
ELSE NULL
|
|
END,
|
|
completed_at = CASE
|
|
WHEN retry_count + 1 >= max_retries THEN $4
|
|
ELSE NULL
|
|
END
|
|
WHERE id = $5
|
|
`, errMsg, StatusFailed, StatusPending, now, jobID)
|
|
if err != nil {
|
|
return fmt.Errorf("fail job: %w", err)
|
|
}
|
|
|
|
rows, err := result.RowsAffected()
|
|
if err != nil {
|
|
return fmt.Errorf("check rows affected: %w", err)
|
|
}
|
|
if rows == 0 {
|
|
return ErrJobNotFound
|
|
}
|
|
|
|
// Truncate error message to prevent log bloat (limit to 500 chars for logging)
|
|
logErrMsg := errMsg
|
|
if len(logErrMsg) > 500 {
|
|
logErrMsg = logErrMsg[:497] + "..."
|
|
}
|
|
q.logger.Debug("job failed", "job_id", jobID, "error", logErrMsg)
|
|
return nil
|
|
}
|
|
|
|
// Heartbeat extends the job's visibility timeout.
|
|
// Updates started_at to prevent RequeueStale from reclaiming the job.
|
|
func (q *PostgresQueue) Heartbeat(ctx context.Context, jobID string) error {
|
|
result, err := q.db.ExecContext(ctx, `
|
|
UPDATE jobs SET started_at = $1 WHERE id = $2 AND status = $3
|
|
`, time.Now().UTC(), jobID, StatusRunning)
|
|
if err != nil {
|
|
return fmt.Errorf("heartbeat job: %w", err)
|
|
}
|
|
|
|
rows, err := result.RowsAffected()
|
|
if err != nil {
|
|
return fmt.Errorf("check rows affected: %w", err)
|
|
}
|
|
if rows == 0 {
|
|
return ErrJobNotFound
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// RequeueStale requeues jobs that have been running too long without heartbeat.
|
|
// Call this periodically (e.g., every minute) to recover from crashed workers.
|
|
// Returns the number of jobs requeued.
|
|
func (q *PostgresQueue) RequeueStale(ctx context.Context, timeout time.Duration) (int64, error) {
|
|
cutoff := time.Now().UTC().Add(-timeout)
|
|
result, err := q.db.ExecContext(ctx, `
|
|
UPDATE jobs
|
|
SET status = $1, worker_id = NULL, started_at = NULL
|
|
WHERE status = $2 AND started_at < $3
|
|
`, StatusPending, StatusRunning, cutoff)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("requeue stale jobs: %w", err)
|
|
}
|
|
|
|
count, err := result.RowsAffected()
|
|
if err != nil {
|
|
return 0, fmt.Errorf("check rows affected: %w", err)
|
|
}
|
|
if count > 0 {
|
|
q.logger.Info("requeued stale jobs", "count", count, "timeout", timeout)
|
|
}
|
|
return count, nil
|
|
}
|
|
|
|
// GetJob retrieves a job by ID (for inspection/debugging).
|
|
func (q *PostgresQueue) GetJob(ctx context.Context, jobID string) (*Job, error) {
|
|
var job jobRow
|
|
err := q.db.QueryRowxContext(ctx, `
|
|
SELECT id, job_type, payload, status, priority, created_at, started_at,
|
|
completed_at, retry_count, max_retries, error, worker_id
|
|
FROM jobs WHERE id = $1
|
|
`, jobID).StructScan(&job)
|
|
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return nil, ErrJobNotFound
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("get job: %w", err)
|
|
}
|
|
|
|
return job.toJob()
|
|
}
|
|
|
|
// jobRow is the database representation of a Job.
// Handles nullable columns and JSON payload.
//
// Its db tags match the column list that Dequeue returns and GetJob selects.
// The sql.Null* fields cover columns that may be NULL; toJob converts a
// scanned row into a Job.
type jobRow struct {
	ID        string         `db:"id"`
	Type      string         `db:"job_type"`
	Payload   []byte         `db:"payload"` // raw JSON as written by EnqueueWithOptions; decoded in toJob
	Status    string         `db:"status"`
	Priority  int            `db:"priority"`
	CreatedAt time.Time      `db:"created_at"`
	StartedAt sql.NullTime   `db:"started_at"`   // set when claimed/heartbeated; cleared when requeued
	CompletedAt sql.NullTime `db:"completed_at"` // set by Ack, or by Fail once retries are exhausted
	RetryCount int           `db:"retry_count"`
	MaxRetries int           `db:"max_retries"`
	Error     sql.NullString `db:"error"`     // last message passed to Fail
	WorkerID  sql.NullString `db:"worker_id"` // claiming worker; cleared when requeued
}
|
|
|
|
func (r *jobRow) toJob() (*Job, error) {
|
|
job := &Job{
|
|
ID: r.ID,
|
|
Type: r.Type,
|
|
Status: JobStatus(r.Status),
|
|
Priority: r.Priority,
|
|
CreatedAt: r.CreatedAt,
|
|
RetryCount: r.RetryCount,
|
|
MaxRetries: r.MaxRetries,
|
|
}
|
|
|
|
if r.StartedAt.Valid {
|
|
job.StartedAt = &r.StartedAt.Time
|
|
}
|
|
if r.CompletedAt.Valid {
|
|
job.CompletedAt = &r.CompletedAt.Time
|
|
}
|
|
if r.Error.Valid {
|
|
job.Error = r.Error.String
|
|
}
|
|
if r.WorkerID.Valid {
|
|
job.WorkerID = r.WorkerID.String
|
|
}
|
|
|
|
if len(r.Payload) > 0 {
|
|
if err := json.Unmarshal(r.Payload, &job.Payload); err != nil {
|
|
return nil, fmt.Errorf("unmarshal payload: %w", err)
|
|
}
|
|
}
|
|
|
|
return job, nil
|
|
}
|