sp2-verify-1770324794/pkg/redisqueue/queue.go
rdev-worker 154c535204
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
build: /implement-feature async-jobs --requirements 'API: POST /jobs pushes ...
2026-02-05 21:04:47 +00:00

199 lines
4.8 KiB
Go

package redisqueue
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
"git.threesix.ai/jordan/sp2-verify-1770324794/pkg/logging"
)
const (
// queueKey is the Redis list key for pending job IDs.
queueKey = "jobs:queue"
// jobKeyPrefix is the prefix for job data keys.
jobKeyPrefix = "jobs:data:"
// defaultJobTTL is the TTL for completed/failed jobs.
defaultJobTTL = 24 * time.Hour
)
// RedisQueue implements a job queue backed by Redis.
type RedisQueue struct {
client *redis.Client
logger *logging.Logger
}
// NewRedisQueue creates a new Redis-backed job queue.
func NewRedisQueue(client *redis.Client, logger *logging.Logger) *RedisQueue {
return &RedisQueue{
client: client,
logger: logger.WithComponent("redisqueue"),
}
}
// Enqueue adds a job to the queue.
// The job ID is generated automatically if not set.
func (q *RedisQueue) Enqueue(ctx context.Context, jobType string, payload map[string]any) (*Job, error) {
if jobType == "" {
return nil, fmt.Errorf("job type is required")
}
job := &Job{
ID: uuid.New().String(),
Type: jobType,
Payload: payload,
Status: StatusPending,
CreatedAt: time.Now().UTC(),
}
if job.Payload == nil {
job.Payload = make(map[string]any)
}
// Store job data
data, err := json.Marshal(job)
if err != nil {
return nil, fmt.Errorf("marshal job: %w", err)
}
jobKey := jobKeyPrefix + job.ID
// Use transaction to ensure atomicity
pipe := q.client.TxPipeline()
pipe.Set(ctx, jobKey, data, 0) // No TTL for pending jobs
pipe.RPush(ctx, queueKey, job.ID)
if _, err := pipe.Exec(ctx); err != nil {
return nil, fmt.Errorf("enqueue job: %w", err)
}
q.logger.Debug("job enqueued", "job_id", job.ID, "type", job.Type)
return job, nil
}
// GetJob retrieves a job by ID.
func (q *RedisQueue) GetJob(ctx context.Context, jobID string) (*Job, error) {
jobKey := jobKeyPrefix + jobID
data, err := q.client.Get(ctx, jobKey).Bytes()
if err != nil {
if err == redis.Nil {
return nil, ErrJobNotFound
}
return nil, fmt.Errorf("get job: %w", err)
}
var job Job
if err := json.Unmarshal(data, &job); err != nil {
return nil, fmt.Errorf("unmarshal job: %w", err)
}
return &job, nil
}
// Dequeue retrieves the next pending job from the queue.
// Blocks until a job is available or the timeout expires.
// Returns ErrNoJob if timeout expires with no job available.
func (q *RedisQueue) Dequeue(ctx context.Context, timeout time.Duration) (*Job, error) {
// BLPOP blocks until a job is available or timeout
result, err := q.client.BLPop(ctx, timeout, queueKey).Result()
if err != nil {
if err == redis.Nil {
return nil, ErrNoJob
}
return nil, fmt.Errorf("dequeue job: %w", err)
}
// result[0] is the key, result[1] is the value (job ID)
if len(result) < 2 {
return nil, ErrNoJob
}
jobID := result[1]
// Get job data
job, err := q.GetJob(ctx, jobID)
if err != nil {
return nil, err
}
// Update status to running
now := time.Now().UTC()
job.Status = StatusRunning
job.StartedAt = &now
if err := q.saveJob(ctx, job); err != nil {
// Re-queue the job on save failure
q.client.LPush(ctx, queueKey, jobID)
return nil, fmt.Errorf("update job status: %w", err)
}
q.logger.Debug("job dequeued", "job_id", job.ID, "type", job.Type)
return job, nil
}
// Complete marks a job as completed.
func (q *RedisQueue) Complete(ctx context.Context, jobID string) error {
job, err := q.GetJob(ctx, jobID)
if err != nil {
return err
}
now := time.Now().UTC()
job.Status = StatusCompleted
job.CompletedAt = &now
if err := q.saveJobWithTTL(ctx, job, defaultJobTTL); err != nil {
return fmt.Errorf("complete job: %w", err)
}
q.logger.Debug("job completed", "job_id", jobID)
return nil
}
// Fail marks a job as failed with an error message.
func (q *RedisQueue) Fail(ctx context.Context, jobID string, errMsg string) error {
job, err := q.GetJob(ctx, jobID)
if err != nil {
return err
}
now := time.Now().UTC()
job.Status = StatusFailed
job.CompletedAt = &now
job.Error = errMsg
if err := q.saveJobWithTTL(ctx, job, defaultJobTTL); err != nil {
return fmt.Errorf("fail job: %w", err)
}
q.logger.Debug("job failed", "job_id", jobID, "error", errMsg)
return nil
}
// saveJob saves a job to Redis without TTL.
func (q *RedisQueue) saveJob(ctx context.Context, job *Job) error {
return q.saveJobWithTTL(ctx, job, 0)
}
// saveJobWithTTL saves a job to Redis with optional TTL.
func (q *RedisQueue) saveJobWithTTL(ctx context.Context, job *Job, ttl time.Duration) error {
data, err := json.Marshal(job)
if err != nil {
return fmt.Errorf("marshal job: %w", err)
}
jobKey := jobKeyPrefix + job.ID
return q.client.Set(ctx, jobKey, data, ttl).Err()
}
// HealthCheck verifies Redis connectivity.
func (q *RedisQueue) HealthCheck(ctx context.Context) error {
return q.client.Ping(ctx).Err()
}