148 lines
3.8 KiB
Go
148 lines
3.8 KiB
Go
// Package handlers provides the worker's job processing logic.
|
|
package handlers
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
|
|
"git.threesix.ai/jordan/persona-community-5/pkg/logging"
|
|
"git.threesix.ai/jordan/persona-community-5/pkg/queue"
|
|
)
|
|
|
|
// Config carries the timing parameters of a Handler.
//
// New substitutes a default for every field left at its zero value.
type Config struct {
	// PollInterval is the pause between dequeue attempts while the queue
	// is empty.
	PollInterval time.Duration

	// StaleJobTimeout is how long a job may run before it is considered
	// stale.
	StaleJobTimeout time.Duration

	// JobTimeout is the upper bound on how long a single job handler may
	// run.
	JobTimeout time.Duration
}
|
|
|
|
// Handler processes background jobs from the queue.
|
|
type Handler struct {
|
|
logger *logging.Logger
|
|
queue queue.Consumer
|
|
handlers map[string]queue.Handler
|
|
config Config
|
|
workerID string
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// New creates a new Handler.
|
|
func New(logger *logging.Logger, q queue.Consumer, cfg Config) *Handler {
|
|
// Apply defaults
|
|
if cfg.PollInterval == 0 {
|
|
cfg.PollInterval = 10 * time.Second
|
|
}
|
|
if cfg.StaleJobTimeout == 0 {
|
|
cfg.StaleJobTimeout = 5 * time.Minute
|
|
}
|
|
if cfg.JobTimeout == 0 {
|
|
cfg.JobTimeout = 5 * time.Minute
|
|
}
|
|
|
|
return &Handler{
|
|
logger: logger.WithComponent("handler"),
|
|
queue: q,
|
|
handlers: make(map[string]queue.Handler),
|
|
config: cfg,
|
|
workerID: uuid.New().String(),
|
|
}
|
|
}
|
|
|
|
// RegisterHandler registers a handler for a specific job type.
|
|
// Call this before Run() to set up job processing.
|
|
func (h *Handler) RegisterHandler(jobType string, handler queue.Handler) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
h.handlers[jobType] = handler
|
|
h.logger.Info("registered job handler", "type", jobType)
|
|
}
|
|
|
|
// Run starts the worker loop and processes jobs until context is cancelled.
|
|
func (h *Handler) Run(ctx context.Context) {
|
|
h.logger.Info("worker loop started", "worker_id", h.workerID)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
h.logger.Info("worker loop stopping", "worker_id", h.workerID)
|
|
return
|
|
default:
|
|
if err := h.processNextJob(ctx); err != nil {
|
|
if errors.Is(err, queue.ErrNoJob) {
|
|
// Queue is empty, wait before polling again
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(h.config.PollInterval):
|
|
continue
|
|
}
|
|
}
|
|
// Log error and continue
|
|
h.logger.Error("error processing job", "error", err)
|
|
time.Sleep(time.Second) // Brief pause on error
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// processNextJob dequeues and processes a single job.
|
|
func (h *Handler) processNextJob(ctx context.Context) error {
|
|
job, err := h.queue.Dequeue(ctx, h.workerID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Get handler for job type
|
|
h.mu.RLock()
|
|
handler, ok := h.handlers[job.Type]
|
|
h.mu.RUnlock()
|
|
|
|
if !ok {
|
|
h.logger.Error("no handler for job type", "job_id", job.ID, "type", job.Type)
|
|
return h.queue.Fail(ctx, job.ID, fmt.Sprintf("unknown job type: %s", job.Type))
|
|
}
|
|
|
|
// Apply middleware and process (TimeoutMiddleware handles the deadline)
|
|
wrappedHandler := queue.Chain(
|
|
queue.RecoveryMiddleware(h.logger),
|
|
queue.LoggingMiddleware(h.logger),
|
|
queue.TimeoutMiddleware(h.config.JobTimeout),
|
|
)(handler)
|
|
|
|
// Use parent context - TimeoutMiddleware applies the job timeout
|
|
jobCtx := ctx
|
|
_ = jobCtx // jobCtx used below
|
|
|
|
if err := wrappedHandler(jobCtx, job); err != nil {
|
|
// Truncate error message to prevent log bloat and potential data leakage
|
|
errMsg := truncateErrorMessage(err.Error(), 1000)
|
|
h.logger.Debug("job handler failed", "job_id", job.ID, "error", errMsg)
|
|
return h.queue.Fail(ctx, job.ID, errMsg)
|
|
}
|
|
|
|
return h.queue.Ack(ctx, job.ID)
|
|
}
|
|
|
|
// WorkerID returns this handler's unique worker identifier.
|
|
func (h *Handler) WorkerID() string {
|
|
return h.workerID
|
|
}
|
|
|
|
// truncateErrorMessage returns msg limited to at most maxLen bytes.
//
// A message that already fits is returned unchanged. A longer message is cut
// and suffixed with "..." so the result still fits in maxLen bytes. When
// maxLen is too small to hold the ellipsis (less than 3) the message is cut
// without one, and a non-positive maxLen yields "". The cut never falls
// inside a multi-byte UTF-8 sequence, so valid input stays valid.
func truncateErrorMessage(msg string, maxLen int) string {
	const ellipsis = "..."

	if len(msg) <= maxLen {
		return msg
	}
	if maxLen <= 0 {
		return ""
	}

	cut, suffix := maxLen-len(ellipsis), ellipsis
	if cut < 0 {
		// No room for the ellipsis: keep as much of the message as fits.
		cut, suffix = maxLen, ""
	}

	// Back up while msg[cut] is a UTF-8 continuation byte (10xxxxxx) so a
	// rune is never split. cut < len(msg) here, so the index is in range.
	for cut > 0 && msg[cut]&0xC0 == 0x80 {
		cut--
	}
	return msg[:cut] + suffix
}
|