// Package handlers provides the worker's job processing logic. package handlers import ( "context" "errors" "fmt" "sync" "time" "github.com/google/uuid" "git.threesix.ai/jordan/sp2-verify-1770324794/pkg/logging" "git.threesix.ai/jordan/sp2-verify-1770324794/pkg/queue" ) // Config holds handler configuration. type Config struct { // PollInterval is how often to check for new jobs when queue is empty. PollInterval time.Duration // StaleJobTimeout is how long a job can run before being considered stale. StaleJobTimeout time.Duration // JobTimeout is the maximum time a job handler can run. JobTimeout time.Duration } // Handler processes background jobs from the queue. type Handler struct { logger *logging.Logger queue queue.Consumer handlers map[string]queue.Handler config Config workerID string mu sync.RWMutex } // New creates a new Handler. func New(logger *logging.Logger, q queue.Consumer, cfg Config) *Handler { // Apply defaults if cfg.PollInterval == 0 { cfg.PollInterval = 10 * time.Second } if cfg.StaleJobTimeout == 0 { cfg.StaleJobTimeout = 5 * time.Minute } if cfg.JobTimeout == 0 { cfg.JobTimeout = 5 * time.Minute } return &Handler{ logger: logger.WithComponent("handler"), queue: q, handlers: make(map[string]queue.Handler), config: cfg, workerID: uuid.New().String(), } } // RegisterHandler registers a handler for a specific job type. // Call this before Run() to set up job processing. func (h *Handler) RegisterHandler(jobType string, handler queue.Handler) { h.mu.Lock() defer h.mu.Unlock() h.handlers[jobType] = handler h.logger.Info("registered job handler", "type", jobType) } // Run starts the worker loop and processes jobs until context is cancelled. func (h *Handler) Run(ctx context.Context) { h.logger.Info("worker loop started", "worker_id", h.workerID) for { select { case <-ctx.Done(): h.logger.Info("worker loop stopping", "worker_id", h.workerID) return default: if err := h.processNextJob(ctx); err != nil { if errors.Is(err, queue.ErrNoJob) { // Queue is empty, wait before polling again select { case <-ctx.Done(): return case <-time.After(h.config.PollInterval): continue } } // Log error and continue h.logger.Error("error processing job", "error", err) time.Sleep(time.Second) // Brief pause on error } } } } // processNextJob dequeues and processes a single job. func (h *Handler) processNextJob(ctx context.Context) error { job, err := h.queue.Dequeue(ctx, h.workerID) if err != nil { return err } // Get handler for job type h.mu.RLock() handler, ok := h.handlers[job.Type] h.mu.RUnlock() if !ok { h.logger.Error("no handler for job type", "job_id", job.ID, "type", job.Type) return h.queue.Fail(ctx, job.ID, fmt.Sprintf("unknown job type: %s", job.Type)) } // Apply middleware and process (TimeoutMiddleware handles the deadline) wrappedHandler := queue.Chain( queue.RecoveryMiddleware(h.logger), queue.LoggingMiddleware(h.logger), queue.TimeoutMiddleware(h.config.JobTimeout), )(handler) // Use parent context - TimeoutMiddleware applies the job timeout jobCtx := ctx _ = jobCtx // jobCtx used below if err := wrappedHandler(jobCtx, job); err != nil { // Truncate error message to prevent log bloat and potential data leakage errMsg := truncateErrorMessage(err.Error(), 1000) h.logger.Debug("job handler failed", "job_id", job.ID, "error", errMsg) return h.queue.Fail(ctx, job.ID, errMsg) } return h.queue.Ack(ctx, job.ID) } // WorkerID returns this handler's unique worker identifier. func (h *Handler) WorkerID() string { return h.workerID } // truncateErrorMessage limits error message length to prevent log bloat. func truncateErrorMessage(msg string, maxLen int) string { if len(msg) <= maxLen { return msg } return msg[:maxLen-3] + "..." }