Compare commits

..

1 Commits

Author SHA1 Message Date
rdev-worker
154c535204 build: /implement-feature async-jobs --requirements 'API: POST /jobs pushes ...
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2026-02-05 21:04:47 +00:00
23 changed files with 1569 additions and 15 deletions

View File

@ -0,0 +1,4 @@
name: feature/async-jobs
feature: async-jobs
base_branch: main
created_at: 2026-02-05T20:59:15.288435201Z

36
.sdlc/config.yaml Normal file
View File

@ -0,0 +1,36 @@
version: 1
project:
name: workspace
branches:
main: main
feature_prefix: feature/
phases:
enabled:
- draft
- specified
- planned
- ready
- implementation
- review
- audit
- qa
- merge
- released
required_artifacts:
audit:
- audit
planned:
- spec
- design
- tasks
- qa_plan
qa:
- qa_results
review:
- review
specified:
- spec
compliance:
require_approvals: true
require_branch: true
require_qa: true

View File

@ -0,0 +1,151 @@
# Technical Design: Async Jobs
**Feature:** async-jobs
**Status:** approved
**Author:** Claude
**Created:** 2026-02-05
## Architecture Overview
```
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ API Service │────▶│ Redis │◀────│ Background │
│ (services/api) │ │ │ │ Worker │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
│ POST /jobs │ jobs:queue │ BLPOP
│ GET /jobs/{id} │ jobs:data:{id} │ Update status
└────────────────────────┴──────────────────────┘
```
## Component Design
### 1. Redis Job Queue Package (`pkg/redisqueue`)
A new shared package providing Redis-based job queue operations:
```go
// pkg/redisqueue/queue.go
type RedisQueue struct {
client *redis.Client
logger *logging.Logger
}
func NewRedisQueue(client *redis.Client, logger *logging.Logger) *RedisQueue
// Producer operations (for API)
func (q *RedisQueue) Enqueue(ctx context.Context, job *Job) error
func (q *RedisQueue) GetJob(ctx context.Context, jobID string) (*Job, error)
// Consumer operations (for Worker)
func (q *RedisQueue) Dequeue(ctx context.Context, timeout time.Duration) (*Job, error)
func (q *RedisQueue) UpdateStatus(ctx context.Context, jobID string, status JobStatus, err string) error
```
### 2. API Service Modifications
**New Files:**
- `services/api/internal/api/handlers/job.go` - Job HTTP handlers
- `services/api/internal/port/job.go` - JobQueue port interface
- `services/api/internal/service/job.go` - Job business logic
**Modified Files:**
- `services/api/cmd/server/main.go` - Add Redis client, job service initialization
- `services/api/internal/api/routes.go` - Register job routes
- `services/api/internal/config/config.go` - Add Redis URL config
### 3. Worker Modifications
**Modified Files:**
- `workers/background-processor/cmd/worker/main.go` - Add Redis client, job handler
- `workers/background-processor/internal/config/config.go` - Add Redis URL, work simulation config
- `workers/background-processor/internal/handlers/jobs.go` - Async job handler
## Redis Data Structure
### Queue List: `jobs:queue`
- Type: List
- Operations: RPUSH (enqueue), BLPOP (dequeue)
- Contains: Job IDs only (lightweight)
### Job Data: `jobs:data:{id}`
- Type: Hash (stored as JSON string for simplicity)
- Fields: id, type, payload, status, created_at, started_at, completed_at, error
- TTL: 24 hours after completion (configurable)
## Sequence Diagrams
### Create Job Flow
```
Client → API → JobService.Create()
├── Generate UUID
├── Create Job struct
├── SET jobs:data:{id} (JSON)
├── RPUSH jobs:queue (id only)
└── Return job with pending status
```
### Get Job Flow
```
Client → API → JobService.Get(id)
├── GET jobs:data:{id}
└── Return job or 404
```
### Worker Processing Flow
```
Worker → RedisQueue.Dequeue()
├── BLPOP jobs:queue
├── GET jobs:data:{id}
├── Update status to "running"
├── Simulate work (sleep)
└── Update status to "completed"
```
## Configuration
### API Service
```bash
REDIS_URL=redis://localhost:6379
```
### Worker
```bash
REDIS_URL=redis://localhost:6379
JOB_SIMULATION_DURATION=2s # Duration to simulate work
```
## Error Handling
| Scenario | Behavior |
|----------|----------|
| Redis connection failure | Return 503 Service Unavailable |
| Job not found | Return 404 Not Found |
| Invalid job payload | Return 400 Bad Request |
| Worker crash during processing | Job remains in "running" (future: add timeout/recovery) |
## Testing Strategy
1. **Unit Tests**: Mock Redis client, test service logic
2. **Integration Tests**: Real Redis (via testcontainers or local), test full flow
## Files to Create/Modify
### New Files
1. `pkg/redisqueue/queue.go` - Redis queue implementation
2. `pkg/redisqueue/job.go` - Job struct and status constants
3. `services/api/internal/api/handlers/job.go` - Job handlers
4. `services/api/internal/api/handlers/job_test.go` - Handler tests
5. `services/api/internal/port/job.go` - JobQueue interface
6. `services/api/internal/service/job.go` - Job service
7. `workers/background-processor/internal/handlers/jobs.go` - Job processor
### Modified Files
1. `services/api/cmd/server/main.go` - Add Redis setup
2. `services/api/internal/api/routes.go` - Add job routes
3. `services/api/internal/config/config.go` - Add Redis config
4. `workers/background-processor/cmd/worker/main.go` - Add Redis queue processing
5. `workers/background-processor/internal/config/config.go` - Add Redis config

View File

@ -0,0 +1,50 @@
slug: async-jobs
title: Async Jobs
created: 2026-02-05T20:55:41.038321309Z
branch: feature/async-jobs
phase: implementation
phase_history:
- phase: draft
entered: 2026-02-05T20:55:41.038321309Z
exited: 2026-02-05T20:58:45.493500609Z
- phase: specified
entered: 2026-02-05T20:58:45.493500609Z
exited: 2026-02-05T20:59:10.961841048Z
- phase: planned
entered: 2026-02-05T20:59:10.961841048Z
exited: 2026-02-05T20:59:19.471361712Z
- phase: ready
entered: 2026-02-05T20:59:19.471361712Z
exited: 2026-02-05T20:59:19.476689499Z
- phase: implementation
entered: 2026-02-05T20:59:19.476689499Z
artifacts:
audit:
status: pending
path: audit.md
design:
status: approved
path: design.md
approved_by: user
approved_at: 2026-02-05T20:58:34.850861259Z
qa_plan:
status: approved
path: qa-plan.md
approved_by: user
approved_at: 2026-02-05T20:59:03.604991907Z
qa_results:
status: pending
path: qa-results.md
review:
status: pending
path: review.md
spec:
status: approved
path: spec.md
approved_by: user
approved_at: 2026-02-05T20:58:34.846086132Z
tasks:
status: approved
path: tasks.md
approved_by: user
approved_at: 2026-02-05T20:58:34.855261099Z

View File

@ -0,0 +1,81 @@
# QA Plan: Async Jobs
**Feature:** async-jobs
**Status:** approved
**Author:** Claude
**Created:** 2026-02-05
## Test Scenarios
### API Tests
#### TC-001: Create Job - Success
**Endpoint:** POST /api/api/jobs
**Input:**
```json
{
"type": "test_job",
"payload": {"key": "value"}
}
```
**Expected:** 201 Created with job ID and status "pending"
#### TC-002: Create Job - Missing Type
**Endpoint:** POST /api/api/jobs
**Input:**
```json
{
"payload": {"key": "value"}
}
```
**Expected:** 400 Bad Request with validation error
#### TC-003: Get Job - Success
**Endpoint:** GET /api/api/jobs/{id}
**Precondition:** Job exists
**Expected:** 200 OK with job details
#### TC-004: Get Job - Not Found
**Endpoint:** GET /api/api/jobs/{unknown-id}
**Expected:** 404 Not Found
#### TC-005: Get Job - Invalid ID Format
**Endpoint:** GET /api/api/jobs/invalid-uuid
**Expected:** 400 Bad Request
### Worker Tests
#### TC-006: Job Processing - Success
**Steps:**
1. Create job via POST /api/api/jobs
2. Wait for worker to process
3. GET /api/api/jobs/{id}
**Expected:** Status changes: pending -> running -> completed
#### TC-007: Job Processing - Failure Handling
**Steps:**
1. Create job designed to fail
2. Wait for worker to process
3. GET /api/api/jobs/{id}
**Expected:** Status is "failed" with error message
### Integration Tests
#### TC-008: End-to-End Flow
**Steps:**
1. Start API and Worker with Redis
2. POST /api/api/jobs
3. Poll GET /api/api/jobs/{id} until completed
**Expected:** Job completes within expected time
## Test Environment
- Redis running locally or in CI
- API service running on port 8001
- Worker running and connected to same Redis
## Success Criteria
- All test cases pass
- No race conditions in concurrent job processing
- Proper error handling and status codes

View File

@ -0,0 +1,114 @@
# Feature Specification: Async Jobs
**Feature:** async-jobs
**Status:** approved
**Author:** Claude
**Created:** 2026-02-05
## Overview
Implement an async job processing system using Redis as the job queue backend. The API service exposes endpoints to create and query jobs, while the background worker processes jobs from the Redis queue.
## Requirements
### Functional Requirements
1. **POST /api/api/jobs** - Create a new async job
- Accepts JSON payload with job type and data
- Pushes job to Redis queue
- Returns job ID and initial status ("pending")
2. **GET /api/api/jobs/{id}** - Get job status
- Returns current job status and metadata
- Statuses: pending, running, completed, failed
3. **Worker Processing**
- Worker pops jobs from Redis queue (blocking pop)
- Simulates work (configurable delay)
- Updates job status in Redis upon completion/failure
### Non-Functional Requirements
- Jobs must survive API restarts (persisted in Redis)
- Multiple workers can process jobs concurrently
- Job status queries must be fast (O(1) lookup)
## API Design
### Create Job
```
POST /api/api/jobs
Content-Type: application/json
{
"type": "process_data",
"payload": {
"key": "value"
}
}
```
Response (201 Created):
```json
{
"data": {
"id": "550e8400-e29b-41d4-a716-446655440000",
"type": "process_data",
"status": "pending",
"created_at": "2026-02-05T10:00:00Z"
}
}
```
### Get Job Status
```
GET /api/api/jobs/{id}
```
Response (200 OK):
```json
{
"data": {
"id": "550e8400-e29b-41d4-a716-446655440000",
"type": "process_data",
"status": "completed",
"payload": {"key": "value"},
"created_at": "2026-02-05T10:00:00Z",
"started_at": "2026-02-05T10:00:01Z",
"completed_at": "2026-02-05T10:00:03Z"
}
}
```
## Data Model
### Job Structure (stored in Redis)
```go
type Job struct {
ID string `json:"id"`
Type string `json:"type"`
Payload map[string]any `json:"payload"`
Status string `json:"status"` // pending, running, completed, failed
CreatedAt time.Time `json:"created_at"`
StartedAt *time.Time `json:"started_at,omitempty"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
Error string `json:"error,omitempty"`
}
```
### Redis Keys
- `jobs:queue` - List for pending jobs (RPUSH/BLPOP)
- `jobs:data:{id}` - Hash for job metadata
## Acceptance Criteria
1. POST /api/api/jobs creates a job and returns 201 with job ID
2. GET /api/api/jobs/{id} returns job status
3. GET /api/api/jobs/{id} returns 404 for unknown jobs
4. Worker processes jobs from queue
5. Job status updates are visible via GET endpoint
6. Simulated work delay is configurable

View File

@ -0,0 +1,151 @@
# Implementation Tasks: Async Jobs
**Feature:** async-jobs
**Status:** approved
**Author:** Claude
**Created:** 2026-02-05
## Task List
### Task 1: Create Redis Queue Package
**ID:** task-1-redis-queue-pkg
**Status:** pending
**Blocked By:** none
Create the shared `pkg/redisqueue` package with job queue abstractions.
**Files to create:**
- `pkg/redisqueue/job.go` - Job struct and JobStatus constants
- `pkg/redisqueue/queue.go` - RedisQueue implementation
**Acceptance Criteria:**
- Job struct with ID, Type, Payload, Status, timestamps
- JobStatus constants: pending, running, completed, failed
- Enqueue: store job data + push ID to queue
- Dequeue: BLPOP queue + fetch job data + set running
- GetJob: fetch job by ID
- UpdateStatus: update job status fields
---
### Task 2: Add Redis Configuration to API Service
**ID:** task-2-api-redis-config
**Status:** pending
**Blocked By:** task-1-redis-queue-pkg
Add Redis URL configuration and client initialization to the API service.
**Files to modify:**
- `services/api/internal/config/config.go` - Add RedisURL field
- `services/api/cmd/server/main.go` - Initialize Redis client
**Acceptance Criteria:**
- Config loads REDIS_URL from environment
- Redis client created in main.go
- Graceful handling of connection errors
---
### Task 3: Implement Job Port and Service
**ID:** task-3-job-service
**Status:** pending
**Blocked By:** task-2-api-redis-config
Create the job port interface and service layer for job operations.
**Files to create:**
- `services/api/internal/port/job.go` - JobQueue interface
- `services/api/internal/service/job.go` - JobService implementation
**Acceptance Criteria:**
- JobQueue interface defines Create and Get operations
- JobService implements business logic
- Service validates input before queue operations
---
### Task 4: Implement Job HTTP Handlers
**ID:** task-4-job-handlers
**Status:** pending
**Blocked By:** task-3-job-service
Create HTTP handlers for POST /jobs and GET /jobs/{id}.
**Files to create:**
- `services/api/internal/api/handlers/job.go` - Job handlers
- `services/api/internal/api/handlers/job_test.go` - Handler tests
**Files to modify:**
- `services/api/internal/api/routes.go` - Register routes
**Acceptance Criteria:**
- POST /api/api/jobs creates job, returns 201
- GET /api/api/jobs/{id} returns job status
- GET /api/api/jobs/{id} returns 404 for unknown
- Request validation with proper error messages
- Tests cover success and error cases
---
### Task 5: Add Redis Configuration to Worker
**ID:** task-5-worker-redis-config
**Status:** pending
**Blocked By:** task-1-redis-queue-pkg
Add Redis URL and job simulation configuration to the worker.
**Files to modify:**
- `workers/background-processor/internal/config/config.go` - Add Redis and simulation config
**Acceptance Criteria:**
- Config loads REDIS_URL from environment
- Config loads JOB_SIMULATION_DURATION with default (2s)
---
### Task 6: Implement Worker Job Processing
**ID:** task-6-worker-job-processing
**Status:** pending
**Blocked By:** task-5-worker-redis-config
Implement the async job handler in the background worker.
**Files to create:**
- `workers/background-processor/internal/handlers/jobs.go` - Job processor
**Files to modify:**
- `workers/background-processor/cmd/worker/main.go` - Register job handler
**Acceptance Criteria:**
- Worker dequeues jobs from Redis
- Worker simulates work with configurable delay
- Worker updates job status to completed
- Worker handles errors and marks jobs failed
- Graceful shutdown waits for in-flight jobs
---
## Dependency Graph
```
task-1-redis-queue-pkg
├───────────────────┐
▼ ▼
task-2-api-redis-config task-5-worker-redis-config
│ │
▼ ▼
task-3-job-service task-6-worker-job-processing
task-4-job-handlers
```
## Implementation Order
1. task-1-redis-queue-pkg (no dependencies)
2. task-2-api-redis-config (depends on 1)
3. task-5-worker-redis-config (depends on 1, can run in parallel with 2)
4. task-3-job-service (depends on 2)
5. task-4-job-handlers (depends on 3)
6. task-6-worker-job-processing (depends on 5)

63
.sdlc/state.yaml Normal file
View File

@ -0,0 +1,63 @@
version: 1
project:
name: workspace
active_work:
features:
- slug: async-jobs
branch: feature/async-jobs
phase: implementation
blocked: []
last_updated: 2026-02-05T20:59:19.477322049Z
last_action: TRANSITION
last_actor: cli
history:
- timestamp: 2026-02-05T20:55:41.038713347Z
action: CREATE_FEATURE
feature: async-jobs
actor: cli
result: success
- timestamp: 2026-02-05T20:58:34.846487758Z
action: APPROVE_ARTIFACT
feature: async-jobs
actor: user
result: success
- timestamp: 2026-02-05T20:58:34.851336232Z
action: APPROVE_ARTIFACT
feature: async-jobs
actor: user
result: success
- timestamp: 2026-02-05T20:58:34.855826833Z
action: APPROVE_ARTIFACT
feature: async-jobs
actor: user
result: success
- timestamp: 2026-02-05T20:58:45.494013676Z
action: TRANSITION
feature: async-jobs
actor: cli
result: success
- timestamp: 2026-02-05T20:59:03.605701633Z
action: APPROVE_ARTIFACT
feature: async-jobs
actor: user
result: success
- timestamp: 2026-02-05T20:59:10.962347511Z
action: TRANSITION
feature: async-jobs
actor: cli
result: success
- timestamp: 2026-02-05T20:59:15.292480976Z
action: CREATE_BRANCH
feature: async-jobs
actor: cli
result: success
- timestamp: 2026-02-05T20:59:19.47199851Z
action: TRANSITION
feature: async-jobs
actor: cli
result: success
- timestamp: 2026-02-05T20:59:19.477320797Z
action: TRANSITION
feature: async-jobs
actor: cli
result: success

43
pkg/redisqueue/job.go Normal file
View File

@ -0,0 +1,43 @@
// Package redisqueue provides a Redis-backed job queue for async processing.
package redisqueue
import (
"errors"
"time"
)
// Job represents an async job in the queue.
type Job struct {
ID string `json:"id"`
Type string `json:"type"`
Payload map[string]any `json:"payload"`
Status JobStatus `json:"status"`
CreatedAt time.Time `json:"created_at"`
StartedAt *time.Time `json:"started_at,omitempty"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
Error string `json:"error,omitempty"`
}
// JobStatus represents the current state of a job.
type JobStatus string
const (
StatusPending JobStatus = "pending"
StatusRunning JobStatus = "running"
StatusCompleted JobStatus = "completed"
StatusFailed JobStatus = "failed"
)
// String returns the string representation of the status.
func (s JobStatus) String() string {
return string(s)
}
// Sentinel errors.
var (
// ErrNoJob is returned when the queue has no pending jobs.
ErrNoJob = errors.New("no job available")
// ErrJobNotFound is returned when a job ID doesn't exist.
ErrJobNotFound = errors.New("job not found")
)

198
pkg/redisqueue/queue.go Normal file
View File

@ -0,0 +1,198 @@
package redisqueue
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
"git.threesix.ai/jordan/sp2-verify-1770324794/pkg/logging"
)
const (
// queueKey is the Redis list key for pending job IDs.
queueKey = "jobs:queue"
// jobKeyPrefix is the prefix for job data keys.
jobKeyPrefix = "jobs:data:"
// defaultJobTTL is the TTL for completed/failed jobs.
defaultJobTTL = 24 * time.Hour
)
// RedisQueue implements a job queue backed by Redis.
type RedisQueue struct {
client *redis.Client
logger *logging.Logger
}
// NewRedisQueue creates a new Redis-backed job queue.
func NewRedisQueue(client *redis.Client, logger *logging.Logger) *RedisQueue {
return &RedisQueue{
client: client,
logger: logger.WithComponent("redisqueue"),
}
}
// Enqueue adds a job to the queue.
// The job ID is generated automatically if not set.
func (q *RedisQueue) Enqueue(ctx context.Context, jobType string, payload map[string]any) (*Job, error) {
if jobType == "" {
return nil, fmt.Errorf("job type is required")
}
job := &Job{
ID: uuid.New().String(),
Type: jobType,
Payload: payload,
Status: StatusPending,
CreatedAt: time.Now().UTC(),
}
if job.Payload == nil {
job.Payload = make(map[string]any)
}
// Store job data
data, err := json.Marshal(job)
if err != nil {
return nil, fmt.Errorf("marshal job: %w", err)
}
jobKey := jobKeyPrefix + job.ID
// Use transaction to ensure atomicity
pipe := q.client.TxPipeline()
pipe.Set(ctx, jobKey, data, 0) // No TTL for pending jobs
pipe.RPush(ctx, queueKey, job.ID)
if _, err := pipe.Exec(ctx); err != nil {
return nil, fmt.Errorf("enqueue job: %w", err)
}
q.logger.Debug("job enqueued", "job_id", job.ID, "type", job.Type)
return job, nil
}
// GetJob retrieves a job by ID.
func (q *RedisQueue) GetJob(ctx context.Context, jobID string) (*Job, error) {
jobKey := jobKeyPrefix + jobID
data, err := q.client.Get(ctx, jobKey).Bytes()
if err != nil {
if err == redis.Nil {
return nil, ErrJobNotFound
}
return nil, fmt.Errorf("get job: %w", err)
}
var job Job
if err := json.Unmarshal(data, &job); err != nil {
return nil, fmt.Errorf("unmarshal job: %w", err)
}
return &job, nil
}
// Dequeue retrieves the next pending job from the queue.
// Blocks until a job is available or the timeout expires.
// Returns ErrNoJob if timeout expires with no job available.
func (q *RedisQueue) Dequeue(ctx context.Context, timeout time.Duration) (*Job, error) {
// BLPOP blocks until a job is available or timeout
result, err := q.client.BLPop(ctx, timeout, queueKey).Result()
if err != nil {
if err == redis.Nil {
return nil, ErrNoJob
}
return nil, fmt.Errorf("dequeue job: %w", err)
}
// result[0] is the key, result[1] is the value (job ID)
if len(result) < 2 {
return nil, ErrNoJob
}
jobID := result[1]
// Get job data
job, err := q.GetJob(ctx, jobID)
if err != nil {
return nil, err
}
// Update status to running
now := time.Now().UTC()
job.Status = StatusRunning
job.StartedAt = &now
if err := q.saveJob(ctx, job); err != nil {
// Re-queue the job on save failure
q.client.LPush(ctx, queueKey, jobID)
return nil, fmt.Errorf("update job status: %w", err)
}
q.logger.Debug("job dequeued", "job_id", job.ID, "type", job.Type)
return job, nil
}
// Complete marks a job as completed.
func (q *RedisQueue) Complete(ctx context.Context, jobID string) error {
job, err := q.GetJob(ctx, jobID)
if err != nil {
return err
}
now := time.Now().UTC()
job.Status = StatusCompleted
job.CompletedAt = &now
if err := q.saveJobWithTTL(ctx, job, defaultJobTTL); err != nil {
return fmt.Errorf("complete job: %w", err)
}
q.logger.Debug("job completed", "job_id", jobID)
return nil
}
// Fail marks a job as failed with an error message.
func (q *RedisQueue) Fail(ctx context.Context, jobID string, errMsg string) error {
job, err := q.GetJob(ctx, jobID)
if err != nil {
return err
}
now := time.Now().UTC()
job.Status = StatusFailed
job.CompletedAt = &now
job.Error = errMsg
if err := q.saveJobWithTTL(ctx, job, defaultJobTTL); err != nil {
return fmt.Errorf("fail job: %w", err)
}
q.logger.Debug("job failed", "job_id", jobID, "error", errMsg)
return nil
}
// saveJob saves a job to Redis without TTL.
func (q *RedisQueue) saveJob(ctx context.Context, job *Job) error {
return q.saveJobWithTTL(ctx, job, 0)
}
// saveJobWithTTL saves a job to Redis with optional TTL.
func (q *RedisQueue) saveJobWithTTL(ctx context.Context, job *Job, ttl time.Duration) error {
data, err := json.Marshal(job)
if err != nil {
return fmt.Errorf("marshal job: %w", err)
}
jobKey := jobKeyPrefix + job.ID
return q.client.Set(ctx, jobKey, data, ttl).Err()
}
// HealthCheck verifies Redis connectivity.
func (q *RedisQueue) HealthCheck(ctx context.Context) error {
return q.client.Ping(ctx).Err()
}

View File

@ -2,10 +2,16 @@
package main
import (
"os"
"github.com/redis/go-redis/v9"
"git.threesix.ai/jordan/sp2-verify-1770324794/pkg/app"
"git.threesix.ai/jordan/sp2-verify-1770324794/pkg/logging"
"git.threesix.ai/jordan/sp2-verify-1770324794/pkg/redisqueue"
"git.threesix.ai/jordan/sp2-verify-1770324794/services/api/internal/adapter/memory"
"git.threesix.ai/jordan/sp2-verify-1770324794/services/api/internal/api"
"git.threesix.ai/jordan/sp2-verify-1770324794/services/api/internal/config"
"git.threesix.ai/jordan/sp2-verify-1770324794/services/api/internal/service"
)
@ -13,17 +19,37 @@ func main() {
// Create logger
logger := logging.Default()
// Load configuration
cfg := config.Load()
// Create Redis client
redisOpts, err := redis.ParseURL(cfg.RedisURL)
if err != nil {
logger.Error("failed to parse redis url", "error", err)
os.Exit(1)
}
redisClient := redis.NewClient(redisOpts)
// Create adapters (repositories)
exampleRepo := memory.NewExampleRepository()
jobQueue := redisqueue.NewRedisQueue(redisClient, logger)
// Create services (business logic)
exampleService := service.NewExampleService(exampleRepo, logger)
jobService := service.NewJobService(jobQueue, logger)
// Create application
application := app.New("api", app.WithDefaultPort(8001))
// Close Redis on shutdown
application.OnShutdown(func() {
if err := redisClient.Close(); err != nil {
logger.Error("failed to close redis client", "error", err)
}
})
// Register routes with dependency injection
api.RegisterRoutes(application, exampleService)
api.RegisterRoutes(application, exampleService, jobService)
// Start server
application.Run()

View File

@ -2,7 +2,10 @@ module git.threesix.ai/jordan/sp2-verify-1770324794/services/api
go 1.23
require git.threesix.ai/jordan/sp2-verify-1770324794/pkg v0.0.0
require (
git.threesix.ai/jordan/sp2-verify-1770324794/pkg v0.0.0
github.com/redis/go-redis/v9 v9.7.0
)
// Use local workspace modules (for Docker builds without go.work)
replace git.threesix.ai/jordan/sp2-verify-1770324794/pkg => ../../pkg

View File

@ -0,0 +1,111 @@
package handlers
import (
"errors"
"net/http"
"github.com/go-chi/chi/v5"
"github.com/google/uuid"
"git.threesix.ai/jordan/sp2-verify-1770324794/pkg/app"
"git.threesix.ai/jordan/sp2-verify-1770324794/pkg/httperror"
"git.threesix.ai/jordan/sp2-verify-1770324794/pkg/httpresponse"
"git.threesix.ai/jordan/sp2-verify-1770324794/pkg/logging"
"git.threesix.ai/jordan/sp2-verify-1770324794/pkg/redisqueue"
"git.threesix.ai/jordan/sp2-verify-1770324794/services/api/internal/service"
)
// Job handles HTTP requests for job resources.
type Job struct {
svc *service.JobService
logger *logging.Logger
}
// NewJob creates a new Job handler with injected dependencies.
func NewJob(svc *service.JobService, logger *logging.Logger) *Job {
return &Job{
svc: svc,
logger: logger.WithComponent("JobHandler"),
}
}
// CreateJobRequest is the request body for creating a job.
type CreateJobRequest struct {
Type string `json:"type" validate:"required,min=1,max=100"`
Payload map[string]any `json:"payload"`
}
// JobResponse is the response for a job resource.
type JobResponse struct {
ID string `json:"id"`
Type string `json:"type"`
Status string `json:"status"`
Payload map[string]any `json:"payload,omitempty"`
CreatedAt string `json:"created_at"`
StartedAt *string `json:"started_at,omitempty"`
CompletedAt *string `json:"completed_at,omitempty"`
Error string `json:"error,omitempty"`
}
// toJobResponse converts a redisqueue.Job to a JobResponse.
func toJobResponse(j *redisqueue.Job) JobResponse {
resp := JobResponse{
ID: j.ID,
Type: j.Type,
Status: j.Status.String(),
Payload: j.Payload,
CreatedAt: j.CreatedAt.Format("2006-01-02T15:04:05Z"),
Error: j.Error,
}
if j.StartedAt != nil {
s := j.StartedAt.Format("2006-01-02T15:04:05Z")
resp.StartedAt = &s
}
if j.CompletedAt != nil {
s := j.CompletedAt.Format("2006-01-02T15:04:05Z")
resp.CompletedAt = &s
}
return resp
}
// Create creates a new job.
func (h *Job) Create(w http.ResponseWriter, r *http.Request) error {
var req CreateJobRequest
if err := app.BindAndValidate(r, &req); err != nil {
return err
}
job, err := h.svc.Create(r.Context(), service.CreateJobInput{
Type: req.Type,
Payload: req.Payload,
})
if err != nil {
return err
}
httpresponse.Created(w, r, toJobResponse(job))
return nil
}
// Get returns a job by ID.
func (h *Job) Get(w http.ResponseWriter, r *http.Request) error {
id := chi.URLParam(r, "id")
// Validate UUID format
if _, err := uuid.Parse(id); err != nil {
return httperror.BadRequest("invalid id format")
}
job, err := h.svc.Get(r.Context(), id)
if err != nil {
if errors.Is(err, redisqueue.ErrJobNotFound) {
return httperror.NotFound("job not found")
}
return err
}
httpresponse.OK(w, r, toJobResponse(job))
return nil
}

View File

@ -0,0 +1,238 @@
package handlers
import (
"bytes"
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
"github.com/go-chi/chi/v5"
"git.threesix.ai/jordan/sp2-verify-1770324794/pkg/logging"
"git.threesix.ai/jordan/sp2-verify-1770324794/pkg/redisqueue"
"git.threesix.ai/jordan/sp2-verify-1770324794/services/api/internal/port"
"git.threesix.ai/jordan/sp2-verify-1770324794/services/api/internal/service"
)
// mockJobQueue implements port.JobQueue for testing.
type mockJobQueue struct {
mu sync.RWMutex
jobs map[string]*redisqueue.Job
}
var _ port.JobQueue = (*mockJobQueue)(nil)
func newMockJobQueue() *mockJobQueue {
return &mockJobQueue{
jobs: make(map[string]*redisqueue.Job),
}
}
func (m *mockJobQueue) Enqueue(ctx context.Context, jobType string, payload map[string]any) (*redisqueue.Job, error) {
m.mu.Lock()
defer m.mu.Unlock()
job := &redisqueue.Job{
ID: "550e8400-e29b-41d4-a716-446655440000",
Type: jobType,
Payload: payload,
Status: redisqueue.StatusPending,
CreatedAt: time.Now().UTC(),
}
m.jobs[job.ID] = job
return job, nil
}
func (m *mockJobQueue) GetJob(ctx context.Context, jobID string) (*redisqueue.Job, error) {
m.mu.RLock()
defer m.mu.RUnlock()
job, ok := m.jobs[jobID]
if !ok {
return nil, redisqueue.ErrJobNotFound
}
copy := *job
return &copy, nil
}
func newTestJobHandler() (*Job, *mockJobQueue) {
queue := newMockJobQueue()
svc := service.NewJobService(queue, logging.Nop())
handler := NewJob(svc, logging.Nop())
return handler, queue
}
func TestJob_Create(t *testing.T) {
tests := []struct {
name string
body any
wantStatus int
}{
{
name: "valid request",
body: CreateJobRequest{
Type: "test_job",
Payload: map[string]any{"key": "value"},
},
wantStatus: http.StatusCreated,
},
{
name: "valid request without payload",
body: CreateJobRequest{
Type: "simple_job",
},
wantStatus: http.StatusCreated,
},
{
name: "empty body",
body: nil,
wantStatus: http.StatusBadRequest,
},
{
name: "missing type",
body: map[string]any{
"payload": map[string]any{"key": "value"},
},
wantStatus: http.StatusBadRequest,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
handler, _ := newTestJobHandler()
r := chi.NewRouter()
r.Post("/api/api/jobs", func(w http.ResponseWriter, r *http.Request) {
if err := handler.Create(w, r); err != nil {
if tt.wantStatus == http.StatusBadRequest {
w.WriteHeader(http.StatusBadRequest)
} else {
w.WriteHeader(http.StatusInternalServerError)
}
return
}
})
var body []byte
if tt.body != nil {
var err error
body, err = json.Marshal(tt.body)
if err != nil {
t.Fatalf("failed to marshal body: %v", err)
}
}
req := httptest.NewRequest(http.MethodPost, "/api/api/jobs", bytes.NewReader(body))
req.Header.Set("Content-Type", "application/json")
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != tt.wantStatus {
t.Errorf("expected status %d, got %d", tt.wantStatus, w.Code)
}
if tt.wantStatus == http.StatusCreated {
var resp map[string]any
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
t.Fatalf("failed to decode response: %v", err)
}
data, ok := resp["data"].(map[string]any)
if !ok {
t.Fatal("expected 'data' field in response")
}
if data["status"] != "pending" {
t.Errorf("expected status 'pending', got %v", data["status"])
}
if data["id"] == "" {
t.Error("expected non-empty job ID")
}
}
})
}
}
func TestJob_Get(t *testing.T) {
handler, queue := newTestJobHandler()
// Seed data
job := &redisqueue.Job{
ID: "550e8400-e29b-41d4-a716-446655440000",
Type: "test_job",
Payload: map[string]any{"key": "value"},
Status: redisqueue.StatusCompleted,
CreatedAt: time.Now().UTC(),
}
queue.jobs[job.ID] = job
tests := []struct {
name string
id string
wantStatus int
}{
{
name: "valid uuid - found",
id: "550e8400-e29b-41d4-a716-446655440000",
wantStatus: http.StatusOK,
},
{
name: "valid uuid - not found",
id: "550e8400-e29b-41d4-a716-446655440001",
wantStatus: http.StatusNotFound,
},
{
name: "invalid uuid",
id: "not-a-uuid",
wantStatus: http.StatusBadRequest,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := chi.NewRouter()
r.Get("/api/api/jobs/{id}", func(w http.ResponseWriter, r *http.Request) {
if err := handler.Get(w, r); err != nil {
switch tt.wantStatus {
case http.StatusNotFound:
w.WriteHeader(http.StatusNotFound)
case http.StatusBadRequest:
w.WriteHeader(http.StatusBadRequest)
default:
w.WriteHeader(http.StatusInternalServerError)
}
return
}
})
req := httptest.NewRequest(http.MethodGet, "/api/api/jobs/"+tt.id, nil)
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != tt.wantStatus {
t.Errorf("expected status %d, got %d", tt.wantStatus, w.Code)
}
if tt.wantStatus == http.StatusOK {
var resp map[string]any
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
t.Fatalf("failed to decode response: %v", err)
}
data, ok := resp["data"].(map[string]any)
if !ok {
t.Fatal("expected 'data' field in response")
}
if data["id"] != tt.id {
t.Errorf("expected id %s, got %v", tt.id, data["id"])
}
}
})
}
}

View File

@ -14,13 +14,15 @@ import (
// This allows the monorepo to expose multiple services under a single domain:
// - https://domain/api/api/health
// - https://domain/api/api/examples
func RegisterRoutes(application *app.App, exampleService *service.ExampleService) {
// - https://domain/api/api/jobs
func RegisterRoutes(application *app.App, exampleService *service.ExampleService, jobService *service.JobService) {
logger := application.Logger()
cfg := config.Load()
// Initialize handlers with injected services
healthHandler := handlers.NewHealth(logger)
exampleHandler := handlers.NewExample(exampleService, logger)
jobHandler := handlers.NewJob(jobService, logger)
// Build and mount OpenAPI spec
spec := NewServiceSpec()
@ -35,6 +37,10 @@ func RegisterRoutes(application *app.App, exampleService *service.ExampleService
r.Get("/examples", app.Wrap(exampleHandler.List))
r.Get("/examples/{id}", app.Wrap(exampleHandler.Get))
// Job routes (public for this implementation)
r.Post("/jobs", app.Wrap(jobHandler.Create))
r.Get("/jobs/{id}", app.Wrap(jobHandler.Get))
// Protected routes (auth required when enabled)
r.Group(func(r app.Router) {
if cfg.AuthEnabled {

View File

@ -8,7 +8,8 @@ func NewServiceSpec() *openapi.OpenAPISpec {
WithDescription("REST API for the api service").
WithBearerSecurity("bearer", "JWT authentication token").
WithTag("Health", "Service health endpoints").
WithTag("Examples", "Example CRUD endpoints")
WithTag("Examples", "Example CRUD endpoints").
WithTag("Jobs", "Async job endpoints")
// Define reusable schemas
spec.WithSchema("Example", openapi.Object(map[string]openapi.Schema{
@ -29,6 +30,23 @@ func NewServiceSpec() *openapi.OpenAPISpec {
"description": openapi.StringWithMinMax(0, 500).WithDescription("Updated description"),
}))
// Job schemas
spec.WithSchema("Job", openapi.Object(map[string]openapi.Schema{
"id": openapi.UUID().WithDescription("Unique job identifier"),
"type": openapi.String().WithDescription("Job type").WithExample("process_data"),
"status": openapi.String().WithDescription("Job status: pending, running, completed, failed").WithExample("pending"),
"payload": openapi.Object(nil).WithDescription("Job payload data"),
"created_at": openapi.DateTime().WithDescription("Creation timestamp"),
"started_at": openapi.DateTime().WithDescription("When processing started"),
"completed_at": openapi.DateTime().WithDescription("When processing completed"),
"error": openapi.String().WithDescription("Error message if failed"),
}, "id", "type", "status", "created_at"))
spec.WithSchema("CreateJobRequest", openapi.Object(map[string]openapi.Schema{
"type": openapi.StringWithMinMax(1, 100).WithDescription("Job type"),
"payload": openapi.Object(nil).WithDescription("Job payload data"),
}, "type"))
// Health
spec.AddPath("/api/api/health", "get", map[string]any{
"summary": "Health check",
@ -108,5 +126,30 @@ func NewServiceSpec() *openapi.OpenAPISpec {
},
})
// Create job
spec.AddPath("/api/api/jobs", "post", map[string]any{
"summary": "Create async job",
"description": "Creates a new async job that will be processed by a background worker.",
"tags": []string{"Jobs"},
"requestBody": openapi.RequestBody(openapi.Ref("CreateJobRequest"), true),
"responses": map[string]any{
"201": openapi.OpResponse("Created", openapi.ResponseSchema(openapi.Ref("Job"))),
"400": openapi.OpResponse("Bad request", openapi.ErrorResponseSchema()),
},
})
// Get job
spec.AddPath("/api/api/jobs/{id}", "get", map[string]any{
"summary": "Get job status",
"description": "Returns the current status and details of an async job.",
"tags": []string{"Jobs"},
"parameters": []any{openapi.IDParam()},
"responses": map[string]any{
"200": openapi.OpResponse("Success", openapi.ResponseSchema(openapi.Ref("Job"))),
"400": openapi.OpResponse("Bad request", openapi.ErrorResponseSchema()),
"404": openapi.OpResponse("Not found", openapi.ErrorResponseSchema()),
},
})
return spec
}

View File

@ -18,10 +18,18 @@ type Config struct {
// Auth
AuthEnabled bool
JWTSecret string
// Redis
RedisURL string
}
// Load reads configuration from environment variables.
func Load() *Config {
redisURL := os.Getenv("REDIS_URL")
if redisURL == "" {
redisURL = "redis://localhost:6379"
}
return &Config{
AppConfig: config.ReadAppConfig(),
Server: config.ReadServerConfig(),
@ -30,5 +38,7 @@ func Load() *Config {
AuthEnabled: strings.EqualFold(os.Getenv("AUTH_ENABLED"), "true"),
JWTSecret: os.Getenv("JWT_SECRET"),
RedisURL: redisURL,
}
}

View File

@ -0,0 +1,19 @@
package port
import (
"context"
"git.threesix.ai/jordan/sp2-verify-1770324794/pkg/redisqueue"
)
// JobQueue defines the interface for job queue operations.
// Implementations may use Redis, PostgreSQL, or other backends.
type JobQueue interface {
// Enqueue adds a job to the queue.
// Returns the created job with ID and pending status.
Enqueue(ctx context.Context, jobType string, payload map[string]any) (*redisqueue.Job, error)
// GetJob retrieves a job by ID.
// Returns redisqueue.ErrJobNotFound if the job doesn't exist.
GetJob(ctx context.Context, jobID string) (*redisqueue.Job, error)
}

View File

@ -0,0 +1,52 @@
package service
import (
"context"
"fmt"
"git.threesix.ai/jordan/sp2-verify-1770324794/pkg/logging"
"git.threesix.ai/jordan/sp2-verify-1770324794/pkg/redisqueue"
"git.threesix.ai/jordan/sp2-verify-1770324794/services/api/internal/port"
)
// JobService handles job-related business logic.
type JobService struct {
queue port.JobQueue
logger *logging.Logger
}
// NewJobService creates a new job service.
func NewJobService(queue port.JobQueue, logger *logging.Logger) *JobService {
return &JobService{
queue: queue,
logger: logger.WithService("JobService"),
}
}
// CreateInput contains the data needed to create a job.
type CreateJobInput struct {
Type string
Payload map[string]any
}
// Create creates a new async job.
// Returns an error if the job type is empty.
func (s *JobService) Create(ctx context.Context, input CreateJobInput) (*redisqueue.Job, error) {
if input.Type == "" {
return nil, fmt.Errorf("job type is required")
}
job, err := s.queue.Enqueue(ctx, input.Type, input.Payload)
if err != nil {
return nil, err
}
s.logger.Info("job created", "job_id", job.ID, "type", job.Type)
return job, nil
}
// Get retrieves a job by ID.
// Returns redisqueue.ErrJobNotFound if the job doesn't exist.
func (s *JobService) Get(ctx context.Context, jobID string) (*redisqueue.Job, error) {
return s.queue.GetJob(ctx, jobID)
}

View File

@ -9,9 +9,12 @@ import (
"syscall"
"time"
"github.com/redis/go-redis/v9"
"git.threesix.ai/jordan/sp2-verify-1770324794/pkg/database"
"git.threesix.ai/jordan/sp2-verify-1770324794/pkg/logging"
"git.threesix.ai/jordan/sp2-verify-1770324794/pkg/queue"
"git.threesix.ai/jordan/sp2-verify-1770324794/pkg/redisqueue"
"git.threesix.ai/jordan/sp2-verify-1770324794/workers/background-processor/internal/config"
"git.threesix.ai/jordan/sp2-verify-1770324794/workers/background-processor/internal/handlers"
)
@ -64,14 +67,14 @@ func main() {
database.MustRunMigrations(ctx, pool, migrationsFS, "migrations")
logger.Info("migrations complete")
// Initialize queue
// Initialize PostgreSQL queue (for existing job handlers)
jobQueue := queue.NewPostgresQueue(pool.DB, logger)
// Initialize and start handler
// Initialize and start handler for PostgreSQL queue
handler := handlers.New(logger, jobQueue, handlers.Config{
PollInterval: cfg.Worker.PollInterval,
StaleJobTimeout: cfg.Worker.StaleJobTimeout,
JobTimeout: cfg.Worker.JobTimeout,
PollInterval: cfg.Worker.PollInterval,
StaleJobTimeout: cfg.Worker.StaleJobTimeout,
JobTimeout: cfg.Worker.JobTimeout,
})
// Register job handlers
@ -79,16 +82,48 @@ func main() {
// handler.RegisterHandler("send_email", emailHandler)
// handler.RegisterHandler("process_image", imageHandler)
// Initialize Redis client for async jobs
redisOpts, err := redis.ParseURL(cfg.Redis.URL)
if err != nil {
logger.Error("failed to parse redis url", "error", err)
os.Exit(1)
}
redisClient := redis.NewClient(redisOpts)
defer func() {
if err := redisClient.Close(); err != nil {
logger.Error("failed to close redis client", "error", err)
}
}()
// Verify Redis connection
if err := redisClient.Ping(ctx).Err(); err != nil {
logger.Error("failed to connect to redis", "error", err)
os.Exit(1)
}
logger.Info("connected to redis", "url", cfg.Redis.URL)
// Initialize Redis queue for async jobs
redisQueue := redisqueue.NewRedisQueue(redisClient, logger)
// Initialize async job processor
asyncProcessor := handlers.NewAsyncJobProcessor(redisQueue, logger, handlers.AsyncJobProcessorConfig{
SimulationDuration: cfg.Worker.SimulationDuration,
PollTimeout: cfg.Worker.PollInterval,
})
// Setup signal handling
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
// Start worker in goroutine
// Start PostgreSQL worker in goroutine
go handler.Run(ctx)
// Start stale job recovery in goroutine
go runStaleJobRecovery(ctx, jobQueue, cfg.Worker.StaleJobTimeout, logger)
// Start Redis async job processor in goroutine
go asyncProcessor.Run(ctx)
// Wait for shutdown signal
sig := <-sigCh
logger.Info("received shutdown signal", "signal", sig.String())

View File

@ -5,6 +5,7 @@ go 1.23
require (
git.threesix.ai/jordan/sp2-verify-1770324794/pkg v0.0.0
github.com/google/uuid v1.6.0
github.com/redis/go-redis/v9 v9.7.0
)
// Use local workspace modules (for Docker builds without go.work)

View File

@ -15,6 +15,12 @@ type Config struct {
Database config.DatabaseConfig
Logging config.LoggingConfig
Worker WorkerConfig
Redis RedisConfig
}
// RedisConfig holds Redis connection settings.
type RedisConfig struct {
URL string
}
// WorkerConfig holds worker-specific settings.
@ -34,6 +40,9 @@ type WorkerConfig struct {
// JobTimeout is the maximum time a single job handler can run.
JobTimeout time.Duration
// SimulationDuration is how long to simulate work for async jobs.
SimulationDuration time.Duration
}
// Load reads configuration from environment variables.
@ -46,6 +55,8 @@ func Load() (*Config, error) {
viper.SetDefault("WORKER_MAX_RETRIES", 3)
viper.SetDefault("WORKER_STALE_JOB_TIMEOUT", "5m")
viper.SetDefault("WORKER_JOB_TIMEOUT", "5m")
viper.SetDefault("WORKER_SIMULATION_DURATION", "2s")
viper.SetDefault("REDIS_URL", "redis://localhost:6379")
},
}); err != nil {
return nil, err
@ -56,11 +67,15 @@ func Load() (*Config, error) {
Database: config.ReadDatabaseConfig(),
Logging: config.ReadLoggingConfig(),
Worker: WorkerConfig{
PollInterval: viper.GetDuration("WORKER_POLL_INTERVAL"),
BatchSize: viper.GetInt("WORKER_BATCH_SIZE"),
MaxRetries: viper.GetInt("WORKER_MAX_RETRIES"),
StaleJobTimeout: viper.GetDuration("WORKER_STALE_JOB_TIMEOUT"),
JobTimeout: viper.GetDuration("WORKER_JOB_TIMEOUT"),
PollInterval: viper.GetDuration("WORKER_POLL_INTERVAL"),
BatchSize: viper.GetInt("WORKER_BATCH_SIZE"),
MaxRetries: viper.GetInt("WORKER_MAX_RETRIES"),
StaleJobTimeout: viper.GetDuration("WORKER_STALE_JOB_TIMEOUT"),
JobTimeout: viper.GetDuration("WORKER_JOB_TIMEOUT"),
SimulationDuration: viper.GetDuration("WORKER_SIMULATION_DURATION"),
},
Redis: RedisConfig{
URL: viper.GetString("REDIS_URL"),
},
}, nil
}

View File

@ -0,0 +1,104 @@
package handlers
import (
"context"
"time"
"git.threesix.ai/jordan/sp2-verify-1770324794/pkg/logging"
"git.threesix.ai/jordan/sp2-verify-1770324794/pkg/redisqueue"
)
// AsyncJobProcessor processes jobs from the Redis queue.
type AsyncJobProcessor struct {
logger *logging.Logger
queue *redisqueue.RedisQueue
simulationDuration time.Duration
pollTimeout time.Duration
}
// AsyncJobProcessorConfig holds configuration for the async job processor.
type AsyncJobProcessorConfig struct {
SimulationDuration time.Duration
PollTimeout time.Duration
}
// NewAsyncJobProcessor creates a new async job processor.
func NewAsyncJobProcessor(queue *redisqueue.RedisQueue, logger *logging.Logger, cfg AsyncJobProcessorConfig) *AsyncJobProcessor {
if cfg.SimulationDuration == 0 {
cfg.SimulationDuration = 2 * time.Second
}
if cfg.PollTimeout == 0 {
cfg.PollTimeout = 5 * time.Second
}
return &AsyncJobProcessor{
logger: logger.WithComponent("async-job-processor"),
queue: queue,
simulationDuration: cfg.SimulationDuration,
pollTimeout: cfg.PollTimeout,
}
}
// Run starts the async job processing loop.
// It continuously polls the Redis queue for jobs and processes them.
func (p *AsyncJobProcessor) Run(ctx context.Context) {
p.logger.Info("async job processor started",
"simulation_duration", p.simulationDuration,
"poll_timeout", p.pollTimeout,
)
for {
select {
case <-ctx.Done():
p.logger.Info("async job processor stopping")
return
default:
if err := p.processNextJob(ctx); err != nil {
if err == redisqueue.ErrNoJob {
// No jobs available, continue polling
continue
}
p.logger.Error("error processing job", "error", err)
// Brief pause on error to avoid tight loop
time.Sleep(time.Second)
}
}
}
}
// processNextJob dequeues and processes a single job from the Redis queue.
func (p *AsyncJobProcessor) processNextJob(ctx context.Context) error {
// Dequeue with timeout (blocks until job available or timeout)
job, err := p.queue.Dequeue(ctx, p.pollTimeout)
if err != nil {
return err
}
p.logger.Info("processing job",
"job_id", job.ID,
"type", job.Type,
"payload", job.Payload,
)
// Simulate work
select {
case <-ctx.Done():
// Context cancelled during work - don't mark as failed, let it be reprocessed
p.logger.Warn("job processing interrupted", "job_id", job.ID)
return ctx.Err()
case <-time.After(p.simulationDuration):
// Work completed
}
// Mark job as completed
if err := p.queue.Complete(ctx, job.ID); err != nil {
p.logger.Error("failed to complete job",
"job_id", job.ID,
"error", err,
)
return err
}
p.logger.Info("job completed", "job_id", job.ID)
return nil
}