// Package queue provides a SQL-compatible job queue for async processing. // Works with PostgreSQL and CockroachDB. // // This package implements a reliable producer/consumer pattern using: // - Atomic dequeue with FOR UPDATE SKIP LOCKED // - Automatic retry (immediate requeue up to max_retries) // - Job priority and ordering // - Stale job recovery via RequeueStale // // Usage: // // // Run migrations (idempotent, call from service and worker) // queue.RunMigrations(ctx, pool) // // // Producer: enqueue a job // producer := queue.NewQueue(pool.DB, logger) // jobID, err := producer.Enqueue(ctx, "send_email", map[string]any{ // "to": "user@example.com", // "subject": "Welcome!", // }) // // // Consumer: process jobs // consumer := queue.NewQueue(pool.DB, logger) // job, err := consumer.Dequeue(ctx, "worker-1") // if err == queue.ErrNoJob { // // Queue is empty // } // // ... process job ... // consumer.Ack(ctx, job.ID) package queue import ( "context" "errors" "time" ) // Job represents an async job in the queue. type Job struct { ID string `json:"id" db:"id"` Type string `json:"type" db:"job_type"` Payload map[string]any `json:"payload" db:"payload"` Status JobStatus `json:"status" db:"status"` Priority int `json:"priority" db:"priority"` CreatedAt time.Time `json:"created_at" db:"created_at"` StartedAt *time.Time `json:"started_at,omitempty" db:"started_at"` CompletedAt *time.Time `json:"completed_at,omitempty" db:"completed_at"` RetryCount int `json:"retry_count" db:"retry_count"` MaxRetries int `json:"max_retries" db:"max_retries"` Error string `json:"error,omitempty" db:"error"` WorkerID string `json:"worker_id,omitempty" db:"worker_id"` } // JobStatus represents the current state of a job. type JobStatus string const ( StatusPending JobStatus = "pending" StatusRunning JobStatus = "running" StatusCompleted JobStatus = "completed" StatusFailed JobStatus = "failed" ) // String returns the string representation of the status. func (s JobStatus) String() string { return string(s) } // Producer enqueues jobs for async processing. type Producer interface { // Enqueue adds a job to the queue with default options. // Returns the job ID on success. Enqueue(ctx context.Context, jobType string, payload map[string]any) (jobID string, err error) // EnqueueWithOptions adds a job with custom priority, retries, etc. // The job's ID, Status, and CreatedAt are set automatically. EnqueueWithOptions(ctx context.Context, job Job) (jobID string, err error) } // Consumer dequeues and processes jobs. type Consumer interface { // Dequeue atomically claims the next pending job. // Returns ErrNoJob if the queue is empty. Dequeue(ctx context.Context, workerID string) (*Job, error) // Ack marks a job as successfully completed. Ack(ctx context.Context, jobID string) error // Fail marks a job as failed. If retries remain, requeues automatically. Fail(ctx context.Context, jobID string, errMsg string) error // Heartbeat extends the job's visibility timeout. // Call periodically for long-running jobs to prevent requeue. Heartbeat(ctx context.Context, jobID string) error } // Queue combines Producer and Consumer for convenience. type Queue interface { Producer Consumer } // JobReader provides read-only access to job status. // Used by handlers to expose job status via API without requiring full queue access. type JobReader interface { // GetJob returns a job by ID. Returns ErrJobNotFound if the job doesn't exist. GetJob(ctx context.Context, jobID string) (*Job, error) } // Handler processes a single job. // Return nil for success, error for failure (triggers retry if attempts remain). type Handler func(ctx context.Context, job *Job) error // Sentinel errors. var ( // ErrNoJob is returned when the queue has no pending jobs. ErrNoJob = errors.New("no job available") // ErrJobNotFound is returned when a job ID doesn't exist. ErrJobNotFound = errors.New("job not found") )