package queue import ( "context" "encoding/json" "errors" "fmt" "time" "github.com/google/uuid" "github.com/redis/go-redis/v9" "git.threesix.ai/jordan/sp4-verify-1770325799/pkg/logging" ) const ( // RedisQueueKey is the sorted set key for the job queue RedisQueueKey = "jobs:queue" // RedisJobPrefix is the prefix for job data hash keys RedisJobPrefix = "jobs:data:" // RedisRunningKey is the set of currently running job IDs RedisRunningKey = "jobs:running" ) // RedisQueue implements Producer and Consumer using Redis. // Uses sorted sets for priority ordering and lists for atomic dequeue. type RedisQueue struct { client *redis.Client logger *logging.Logger } // Ensure RedisQueue implements Queue at compile time. var _ Queue = (*RedisQueue)(nil) // NewRedisQueue creates a queue backed by Redis. func NewRedisQueue(client *redis.Client, logger *logging.Logger) *RedisQueue { return &RedisQueue{ client: client, logger: logger.WithComponent("redis-queue"), } } // Enqueue adds a job to the queue with default options. func (q *RedisQueue) Enqueue(ctx context.Context, jobType string, payload map[string]any) (string, error) { return q.EnqueueWithOptions(ctx, Job{ Type: jobType, Payload: payload, Priority: 0, MaxRetries: 3, }) } // EnqueueWithOptions adds a job with custom configuration. func (q *RedisQueue) EnqueueWithOptions(ctx context.Context, job Job) (string, error) { if job.Type == "" { return "", fmt.Errorf("job type is required") } job.ID = uuid.New().String() job.Status = StatusPending job.CreatedAt = time.Now().UTC() if job.MaxRetries == 0 { job.MaxRetries = 3 } if job.MaxRetries > 100 { job.MaxRetries = 100 } if job.Payload == nil { job.Payload = make(map[string]any) } // Serialize job to JSON data, err := json.Marshal(job) if err != nil { return "", fmt.Errorf("marshal job: %w", err) } // Use a pipeline for atomic operations pipe := q.client.Pipeline() // Store job data jobKey := RedisJobPrefix + job.ID pipe.Set(ctx, jobKey, data, 0) // Add to sorted set with score = -priority (higher priority = lower score = first out) // Secondary sort by timestamp for FIFO within same priority score := float64(-job.Priority) + float64(job.CreatedAt.UnixNano())/1e18 pipe.ZAdd(ctx, RedisQueueKey, redis.Z{ Score: score, Member: job.ID, }) _, err = pipe.Exec(ctx) if err != nil { return "", fmt.Errorf("enqueue job: %w", err) } q.logger.Debug("job enqueued", "job_id", job.ID, "type", job.Type, "priority", job.Priority) return job.ID, nil } // Dequeue atomically claims the next pending job. func (q *RedisQueue) Dequeue(ctx context.Context, workerID string) (*Job, error) { // Pop the highest priority job (lowest score) atomically result, err := q.client.ZPopMin(ctx, RedisQueueKey, 1).Result() if err != nil { return nil, fmt.Errorf("dequeue job: %w", err) } if len(result) == 0 { return nil, ErrNoJob } jobID := result[0].Member.(string) jobKey := RedisJobPrefix + jobID // Get job data data, err := q.client.Get(ctx, jobKey).Bytes() if err != nil { if errors.Is(err, redis.Nil) { return nil, ErrJobNotFound } return nil, fmt.Errorf("get job data: %w", err) } var job Job if err := json.Unmarshal(data, &job); err != nil { return nil, fmt.Errorf("unmarshal job: %w", err) } // Update job status now := time.Now().UTC() job.Status = StatusRunning job.StartedAt = &now job.WorkerID = workerID // Save updated job and add to running set updatedData, err := json.Marshal(job) if err != nil { return nil, fmt.Errorf("marshal updated job: %w", err) } pipe := q.client.Pipeline() pipe.Set(ctx, jobKey, updatedData, 0) pipe.SAdd(ctx, RedisRunningKey, jobID) _, err = pipe.Exec(ctx) if err != nil { return nil, fmt.Errorf("update job status: %w", err) } q.logger.Debug("job dequeued", "job_id", job.ID, "type", job.Type, "worker_id", workerID) return &job, nil } // Ack marks a job as successfully completed. func (q *RedisQueue) Ack(ctx context.Context, jobID string) error { jobKey := RedisJobPrefix + jobID data, err := q.client.Get(ctx, jobKey).Bytes() if err != nil { if errors.Is(err, redis.Nil) { return ErrJobNotFound } return fmt.Errorf("get job: %w", err) } var job Job if err := json.Unmarshal(data, &job); err != nil { return fmt.Errorf("unmarshal job: %w", err) } now := time.Now().UTC() job.Status = StatusCompleted job.CompletedAt = &now updatedData, err := json.Marshal(job) if err != nil { return fmt.Errorf("marshal job: %w", err) } pipe := q.client.Pipeline() pipe.Set(ctx, jobKey, updatedData, 24*time.Hour) // Keep completed jobs for 24h pipe.SRem(ctx, RedisRunningKey, jobID) _, err = pipe.Exec(ctx) if err != nil { return fmt.Errorf("ack job: %w", err) } q.logger.Debug("job completed", "job_id", jobID) return nil } // Fail marks a job as failed, requeuing if retries remain. func (q *RedisQueue) Fail(ctx context.Context, jobID string, errMsg string) error { jobKey := RedisJobPrefix + jobID data, err := q.client.Get(ctx, jobKey).Bytes() if err != nil { if errors.Is(err, redis.Nil) { return ErrJobNotFound } return fmt.Errorf("get job: %w", err) } var job Job if err := json.Unmarshal(data, &job); err != nil { return fmt.Errorf("unmarshal job: %w", err) } job.RetryCount++ job.Error = errMsg pipe := q.client.Pipeline() pipe.SRem(ctx, RedisRunningKey, jobID) if job.RetryCount >= job.MaxRetries { // Exhausted retries - mark as failed now := time.Now().UTC() job.Status = StatusFailed job.CompletedAt = &now updatedData, err := json.Marshal(job) if err != nil { return fmt.Errorf("marshal job: %w", err) } pipe.Set(ctx, jobKey, updatedData, 24*time.Hour) // Keep failed jobs for 24h } else { // Requeue for retry job.Status = StatusPending job.StartedAt = nil job.WorkerID = "" updatedData, err := json.Marshal(job) if err != nil { return fmt.Errorf("marshal job: %w", err) } pipe.Set(ctx, jobKey, updatedData, 0) // Re-add to queue with original priority score := float64(-job.Priority) + float64(job.CreatedAt.UnixNano())/1e18 pipe.ZAdd(ctx, RedisQueueKey, redis.Z{ Score: score, Member: jobID, }) } _, err = pipe.Exec(ctx) if err != nil { return fmt.Errorf("fail job: %w", err) } logErrMsg := errMsg if len(logErrMsg) > 500 { logErrMsg = logErrMsg[:497] + "..." } q.logger.Debug("job failed", "job_id", jobID, "retry_count", job.RetryCount, "max_retries", job.MaxRetries, "error", logErrMsg) return nil } // Heartbeat extends the job's visibility timeout (no-op for Redis implementation). func (q *RedisQueue) Heartbeat(ctx context.Context, jobID string) error { // For Redis, we track running jobs in a set but don't have visibility timeout. // This could be extended to use Redis EXPIRE on job keys if needed. return nil } // GetJob retrieves a job by ID (for inspection/debugging). func (q *RedisQueue) GetJob(ctx context.Context, jobID string) (*Job, error) { jobKey := RedisJobPrefix + jobID data, err := q.client.Get(ctx, jobKey).Bytes() if err != nil { if errors.Is(err, redis.Nil) { return nil, ErrJobNotFound } return nil, fmt.Errorf("get job: %w", err) } var job Job if err := json.Unmarshal(data, &job); err != nil { return nil, fmt.Errorf("unmarshal job: %w", err) } return &job, nil } // QueueLength returns the number of pending jobs. func (q *RedisQueue) QueueLength(ctx context.Context) (int64, error) { return q.client.ZCard(ctx, RedisQueueKey).Result() } // RequeueStale requeues jobs that have been running too long. func (q *RedisQueue) RequeueStale(ctx context.Context, timeout time.Duration) (int64, error) { // Get all running job IDs runningIDs, err := q.client.SMembers(ctx, RedisRunningKey).Result() if err != nil { return 0, fmt.Errorf("get running jobs: %w", err) } cutoff := time.Now().UTC().Add(-timeout) var requeued int64 for _, jobID := range runningIDs { job, err := q.GetJob(ctx, jobID) if err != nil { continue // Job may have been deleted } if job.StartedAt != nil && job.StartedAt.Before(cutoff) { // Requeue stale job job.Status = StatusPending job.StartedAt = nil job.WorkerID = "" data, err := json.Marshal(job) if err != nil { continue } pipe := q.client.Pipeline() pipe.Set(ctx, RedisJobPrefix+jobID, data, 0) pipe.SRem(ctx, RedisRunningKey, jobID) score := float64(-job.Priority) + float64(job.CreatedAt.UnixNano())/1e18 pipe.ZAdd(ctx, RedisQueueKey, redis.Z{ Score: score, Member: jobID, }) _, err = pipe.Exec(ctx) if err == nil { requeued++ } } } if requeued > 0 { q.logger.Info("requeued stale jobs", "count", requeued, "timeout", timeout) } return requeued, nil }