slack-worker-1770281299/pkg/queue/queue.go
jordan a02c18810a
All checks were successful
ci/woodpecker/manual/woodpecker Pipeline was successful
ci/woodpecker/push/woodpecker Pipeline was successful
Initialize project from skeleton template
2026-02-05 08:48:20 +00:00

111 lines
3.6 KiB
Go

// Package queue provides a PostgreSQL-backed job queue for async processing.
//
// This package implements a reliable producer/consumer pattern using:
// - Atomic dequeue with FOR UPDATE SKIP LOCKED
// - Automatic retry (immediate requeue up to max_retries)
// - Job priority and ordering
// - Stale job recovery via RequeueStale
//
// Usage:
//
// // Producer: enqueue a job
// producer := queue.NewPostgresQueue(pool.DB, logger)
// jobID, err := producer.Enqueue(ctx, "send_email", map[string]any{
// "to": "user@example.com",
// "subject": "Welcome!",
// })
//
// // Consumer: process jobs
// consumer := queue.NewPostgresQueue(pool.DB, logger)
// job, err := consumer.Dequeue(ctx, "worker-1")
// if err == queue.ErrNoJob {
// // Queue is empty
// }
// // ... process job ...
// consumer.Ack(ctx, job.ID)
package queue
import (
"context"
"errors"
"time"
)
// Job represents an async job in the queue.
type Job struct {
ID string `json:"id" db:"id"`
Type string `json:"type" db:"job_type"`
Payload map[string]any `json:"payload" db:"payload"`
Status JobStatus `json:"status" db:"status"`
Priority int `json:"priority" db:"priority"`
CreatedAt time.Time `json:"created_at" db:"created_at"`
StartedAt *time.Time `json:"started_at,omitempty" db:"started_at"`
CompletedAt *time.Time `json:"completed_at,omitempty" db:"completed_at"`
RetryCount int `json:"retry_count" db:"retry_count"`
MaxRetries int `json:"max_retries" db:"max_retries"`
Error string `json:"error,omitempty" db:"error"`
WorkerID string `json:"worker_id,omitempty" db:"worker_id"`
}
// JobStatus represents the current state of a job.
type JobStatus string
const (
StatusPending JobStatus = "pending"
StatusRunning JobStatus = "running"
StatusCompleted JobStatus = "completed"
StatusFailed JobStatus = "failed"
)
// String returns the string representation of the status.
func (s JobStatus) String() string {
return string(s)
}
// Producer enqueues jobs for async processing.
type Producer interface {
// Enqueue adds a job to the queue with default options.
// Returns the job ID on success.
Enqueue(ctx context.Context, jobType string, payload map[string]any) (jobID string, err error)
// EnqueueWithOptions adds a job with custom priority, retries, etc.
// The job's ID, Status, and CreatedAt are set automatically.
EnqueueWithOptions(ctx context.Context, job Job) (jobID string, err error)
}
// Consumer dequeues and processes jobs.
type Consumer interface {
// Dequeue atomically claims the next pending job.
// Returns ErrNoJob if the queue is empty.
Dequeue(ctx context.Context, workerID string) (*Job, error)
// Ack marks a job as successfully completed.
Ack(ctx context.Context, jobID string) error
// Fail marks a job as failed. If retries remain, requeues automatically.
Fail(ctx context.Context, jobID string, errMsg string) error
// Heartbeat extends the job's visibility timeout.
// Call periodically for long-running jobs to prevent requeue.
Heartbeat(ctx context.Context, jobID string) error
}
// Queue combines Producer and Consumer for convenience.
type Queue interface {
Producer
Consumer
}
// Handler processes a single job.
// Return nil for success, error for failure (triggers retry if attempts remain).
type Handler func(ctx context.Context, job *Job) error
// Sentinel errors.
var (
// ErrNoJob is returned when the queue has no pending jobs.
ErrNoJob = errors.New("no job available")
// ErrJobNotFound is returned when a job ID doesn't exist.
ErrJobNotFound = errors.New("job not found")
)