package queue import ( "context" "database/sql" "embed" "encoding/json" "errors" "fmt" "time" "github.com/google/uuid" "github.com/jmoiron/sqlx" "git.threesix.ai/jordan/persona-community-2/pkg/database" "git.threesix.ai/jordan/persona-community-2/pkg/logging" ) //go:embed migrations/*.sql var migrationsFS embed.FS // RunMigrations creates the jobs table if it doesn't exist. // Safe to call from both service and worker (idempotent). func RunMigrations(ctx context.Context, pool *database.Pool) error { return database.RunMigrations(ctx, pool, migrationsFS, "migrations") } // DBQueue implements Producer and Consumer using SQL (PostgreSQL or CockroachDB). // Uses FOR UPDATE SKIP LOCKED for atomic, non-blocking dequeue. type DBQueue struct { db *sqlx.DB logger *logging.Logger } // Ensure DBQueue implements Queue and JobReader at compile time. var ( _ Queue = (*DBQueue)(nil) _ JobReader = (*DBQueue)(nil) ) // NewQueue creates a queue backed by a SQL database (PostgreSQL or CockroachDB). func NewQueue(db *sqlx.DB, logger *logging.Logger) *DBQueue { return &DBQueue{ db: db, logger: logger.WithComponent("queue"), } } // Enqueue adds a job to the queue with default options. func (q *DBQueue) Enqueue(ctx context.Context, jobType string, payload map[string]any) (string, error) { return q.EnqueueWithOptions(ctx, Job{ Type: jobType, Payload: payload, Priority: 0, MaxRetries: 3, }) } // EnqueueWithOptions adds a job with custom configuration. func (q *DBQueue) EnqueueWithOptions(ctx context.Context, job Job) (string, error) { // Validate required fields if job.Type == "" { return "", fmt.Errorf("job type is required: %w", ErrJobNotFound) } job.ID = uuid.New().String() job.Status = StatusPending job.CreatedAt = time.Now().UTC() // Apply defaults and constraints if job.MaxRetries == 0 { job.MaxRetries = 3 } if job.MaxRetries > 100 { job.MaxRetries = 100 // Cap at reasonable limit } if job.Payload == nil { job.Payload = make(map[string]any) } payloadJSON, err := json.Marshal(job.Payload) if err != nil { return "", fmt.Errorf("marshal payload: %w", err) } _, err = q.db.ExecContext(ctx, ` INSERT INTO jobs (id, job_type, payload, status, priority, max_retries, created_at) VALUES ($1, $2, $3, $4, $5, $6, $7) `, job.ID, job.Type, payloadJSON, job.Status, job.Priority, job.MaxRetries, job.CreatedAt) if err != nil { return "", fmt.Errorf("insert job: %w", err) } q.logger.Debug("job enqueued", "job_id", job.ID, "type", job.Type, "priority", job.Priority) return job.ID, nil } // Dequeue atomically claims the next pending job. // Uses UPDATE with subquery + FOR UPDATE SKIP LOCKED for atomic, non-blocking claim. func (q *DBQueue) Dequeue(ctx context.Context, workerID string) (*Job, error) { now := time.Now().UTC() // Atomic claim: UPDATE with subquery + FOR UPDATE SKIP LOCKED // This avoids explicit transaction management while being safe for concurrent workers. var job jobRow err := q.db.QueryRowxContext(ctx, ` UPDATE jobs SET status = $1, worker_id = $2, started_at = $3 WHERE id = ( SELECT id FROM jobs WHERE status = $4 ORDER BY priority DESC, created_at ASC FOR UPDATE SKIP LOCKED LIMIT 1 ) RETURNING id, job_type, payload, status, priority, created_at, started_at, completed_at, retry_count, max_retries, error, worker_id `, StatusRunning, workerID, now, StatusPending).StructScan(&job) if errors.Is(err, sql.ErrNoRows) { return nil, ErrNoJob } if err != nil { return nil, fmt.Errorf("dequeue job: %w", err) } result, err := job.toJob() if err != nil { return nil, fmt.Errorf("parse job: %w", err) } q.logger.Debug("job dequeued", "job_id", result.ID, "type", result.Type, "worker_id", workerID) return result, nil } // Ack marks a job as successfully completed. func (q *DBQueue) Ack(ctx context.Context, jobID string) error { now := time.Now().UTC() result, err := q.db.ExecContext(ctx, ` UPDATE jobs SET status = $1, completed_at = $2 WHERE id = $3 `, StatusCompleted, now, jobID) if err != nil { return fmt.Errorf("ack job: %w", err) } rows, err := result.RowsAffected() if err != nil { return fmt.Errorf("check rows affected: %w", err) } if rows == 0 { return ErrJobNotFound } q.logger.Debug("job completed", "job_id", jobID) return nil } // Fail marks a job as failed, requeuing if retries remain. // Uses atomic UPDATE to handle retry logic in a single query. func (q *DBQueue) Fail(ctx context.Context, jobID string, errMsg string) error { // Atomic: increment retry_count, check if should requeue or fail permanently. // When retrying: clear worker_id and started_at, set status to pending. // When exhausted: set status to failed, set completed_at. now := time.Now().UTC() result, err := q.db.ExecContext(ctx, ` UPDATE jobs SET retry_count = retry_count + 1, error = $1, status = CASE WHEN retry_count + 1 >= max_retries THEN $2 ELSE $3 END, started_at = CASE WHEN retry_count + 1 >= max_retries THEN started_at ELSE NULL END, worker_id = CASE WHEN retry_count + 1 >= max_retries THEN worker_id ELSE NULL END, completed_at = CASE WHEN retry_count + 1 >= max_retries THEN $4 ELSE NULL END WHERE id = $5 `, errMsg, StatusFailed, StatusPending, now, jobID) if err != nil { return fmt.Errorf("fail job: %w", err) } rows, err := result.RowsAffected() if err != nil { return fmt.Errorf("check rows affected: %w", err) } if rows == 0 { return ErrJobNotFound } // Truncate error message to prevent log bloat (limit to 500 chars for logging) logErrMsg := errMsg if len(logErrMsg) > 500 { logErrMsg = logErrMsg[:497] + "..." } q.logger.Debug("job failed", "job_id", jobID, "error", logErrMsg) return nil } // Heartbeat extends the job's visibility timeout. // Updates started_at to prevent RequeueStale from reclaiming the job. func (q *DBQueue) Heartbeat(ctx context.Context, jobID string) error { result, err := q.db.ExecContext(ctx, ` UPDATE jobs SET started_at = $1 WHERE id = $2 AND status = $3 `, time.Now().UTC(), jobID, StatusRunning) if err != nil { return fmt.Errorf("heartbeat job: %w", err) } rows, err := result.RowsAffected() if err != nil { return fmt.Errorf("check rows affected: %w", err) } if rows == 0 { return ErrJobNotFound } return nil } // RequeueStale requeues jobs that have been running too long without heartbeat. // Call this periodically (e.g., every minute) to recover from crashed workers. // Returns the number of jobs requeued. func (q *DBQueue) RequeueStale(ctx context.Context, timeout time.Duration) (int64, error) { cutoff := time.Now().UTC().Add(-timeout) result, err := q.db.ExecContext(ctx, ` UPDATE jobs SET status = $1, worker_id = NULL, started_at = NULL WHERE status = $2 AND started_at < $3 `, StatusPending, StatusRunning, cutoff) if err != nil { return 0, fmt.Errorf("requeue stale jobs: %w", err) } count, err := result.RowsAffected() if err != nil { return 0, fmt.Errorf("check rows affected: %w", err) } if count > 0 { q.logger.Info("requeued stale jobs", "count", count, "timeout", timeout) } return count, nil } // GetJob retrieves a job by ID (for inspection/debugging). func (q *DBQueue) GetJob(ctx context.Context, jobID string) (*Job, error) { var job jobRow err := q.db.QueryRowxContext(ctx, ` SELECT id, job_type, payload, status, priority, created_at, started_at, completed_at, retry_count, max_retries, error, worker_id FROM jobs WHERE id = $1 `, jobID).StructScan(&job) if errors.Is(err, sql.ErrNoRows) { return nil, ErrJobNotFound } if err != nil { return nil, fmt.Errorf("get job: %w", err) } return job.toJob() } // jobRow is the database representation of a Job. // Handles nullable columns and JSON payload. type jobRow struct { ID string `db:"id"` Type string `db:"job_type"` Payload []byte `db:"payload"` Status string `db:"status"` Priority int `db:"priority"` CreatedAt time.Time `db:"created_at"` StartedAt sql.NullTime `db:"started_at"` CompletedAt sql.NullTime `db:"completed_at"` RetryCount int `db:"retry_count"` MaxRetries int `db:"max_retries"` Error sql.NullString `db:"error"` WorkerID sql.NullString `db:"worker_id"` } func (r *jobRow) toJob() (*Job, error) { job := &Job{ ID: r.ID, Type: r.Type, Status: JobStatus(r.Status), Priority: r.Priority, CreatedAt: r.CreatedAt, RetryCount: r.RetryCount, MaxRetries: r.MaxRetries, } if r.StartedAt.Valid { job.StartedAt = &r.StartedAt.Time } if r.CompletedAt.Valid { job.CompletedAt = &r.CompletedAt.Time } if r.Error.Valid { job.Error = r.Error.String } if r.WorkerID.Valid { job.WorkerID = r.WorkerID.String } if len(r.Payload) > 0 { if err := json.Unmarshal(r.Payload, &job.Payload); err != nil { return nil, fmt.Errorf("unmarshal payload: %w", err) } } return job, nil }