sp4-debug-1770477266/workers/worker-svc/internal/handlers/handler.go
jordan 927537046a
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
Add components: service/auth-svc, service/chat-svc, worker/worker-svc
2026-02-07 15:14:33 +00:00

148 lines
3.8 KiB
Go

// Package handlers provides the worker's job processing logic.
package handlers
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/google/uuid"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/logging"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/queue"
)
// Config carries the tunable durations for a Handler.
//
// New replaces any zero-valued field with a built-in default, so the zero
// Config is valid.
type Config struct {
	// PollInterval is the pause between dequeue attempts once the queue has
	// reported that it is empty.
	PollInterval time.Duration

	// StaleJobTimeout is how long a job may run before it is considered stale.
	// NOTE(review): no reader of this field is visible alongside this
	// definition — confirm where it is consumed.
	StaleJobTimeout time.Duration

	// JobTimeout is the upper bound on a single job handler invocation; it is
	// handed to queue.TimeoutMiddleware when a job is processed.
	JobTimeout time.Duration
}
// Handler pulls background jobs from a queue and dispatches each one to the
// handler registered for its job type.
//
// It contains a mutex and must therefore be used through a pointer.
type Handler struct {
	// Collaborators supplied by the caller of New.
	logger *logging.Logger
	queue  queue.Consumer

	// handlers maps a job type to the function that processes it.
	// Access is guarded by mu.
	handlers map[string]queue.Handler

	// config holds the durations after defaults have been applied.
	config Config

	// workerID identifies this worker to the queue when dequeuing.
	workerID string

	// mu guards handlers.
	mu sync.RWMutex
}
// New creates a Handler that consumes jobs from q and logs through logger.
//
// Any duration in cfg that is zero or negative is treated as unset and
// replaced by a default (10s poll interval, 5m stale-job timeout, 5m job
// timeout). Rejecting negative values matters: a negative PollInterval would
// make the empty-queue wait return immediately and turn the worker loop into
// a busy poll.
//
// Each Handler gets a freshly generated UUID as its worker ID. logger must be
// non-nil.
func New(logger *logging.Logger, q queue.Consumer, cfg Config) *Handler {
	const (
		defaultPollInterval    = 10 * time.Second
		defaultStaleJobTimeout = 5 * time.Minute
		defaultJobTimeout      = 5 * time.Minute
	)

	// Non-positive durations are meaningless here, so fall back to defaults.
	if cfg.PollInterval <= 0 {
		cfg.PollInterval = defaultPollInterval
	}
	if cfg.StaleJobTimeout <= 0 {
		cfg.StaleJobTimeout = defaultStaleJobTimeout
	}
	if cfg.JobTimeout <= 0 {
		cfg.JobTimeout = defaultJobTimeout
	}

	return &Handler{
		logger:   logger.WithComponent("handler"),
		queue:    q,
		handlers: make(map[string]queue.Handler),
		config:   cfg,
		workerID: uuid.New().String(),
	}
}
// RegisterHandler associates handler with jobType, replacing any handler
// previously registered for the same type.
//
// Registration takes the write lock, so it is safe to call concurrently with
// job processing; the usual pattern is still to register everything before
// calling Run.
func (h *Handler) RegisterHandler(jobType string, handler queue.Handler) {
	h.mu.Lock()
	defer h.mu.Unlock()

	h.handlers[jobType] = handler
	h.logger.Info("registered job handler", "type", jobType)
}
// Run processes jobs one at a time until ctx is cancelled, then returns.
//
// After each attempt:
//   - on success the next job is attempted immediately;
//   - if the queue reported queue.ErrNoJob, Run waits config.PollInterval;
//   - on any other error the error is logged and Run waits one second.
//
// Both waits are interrupted by ctx cancellation so shutdown is never delayed
// by a pending back-off, and every exit path logs that the loop is stopping.
func (h *Handler) Run(ctx context.Context) {
	h.logger.Info("worker loop started", "worker_id", h.workerID)

	for {
		// Check for cancellation before picking up more work.
		select {
		case <-ctx.Done():
			h.logger.Info("worker loop stopping", "worker_id", h.workerID)
			return
		default:
		}

		err := h.processNextJob(ctx)
		if err == nil {
			continue
		}

		// Empty queue: wait the poll interval. Anything else: log it and
		// back off briefly before trying again.
		wait := h.config.PollInterval
		if !errors.Is(err, queue.ErrNoJob) {
			h.logger.Error("error processing job", "error", err)
			wait = time.Second
		}

		// Cancellable pause; a plain time.Sleep would hold up shutdown.
		timer := time.NewTimer(wait)
		select {
		case <-ctx.Done():
			timer.Stop()
			h.logger.Info("worker loop stopping", "worker_id", h.workerID)
			return
		case <-timer.C:
		}
	}
}
// processNextJob dequeues one job and runs it through its registered handler.
//
// Outcomes:
//   - A dequeue error (including queue.ErrNoJob) is returned unwrapped so the
//     caller can match it with errors.Is.
//   - A job whose type has no registered handler is marked failed on the queue.
//   - A handler error marks the job failed with a truncated error message.
//   - Otherwise the job is acknowledged.
//
// A handler failure is not itself returned as an error. Only a failure to
// dequeue, or to record the outcome via Fail/Ack, is returned; the latter is
// wrapped with the job ID so the caller's log identifies the affected job.
func (h *Handler) processNextJob(ctx context.Context) error {
	job, err := h.queue.Dequeue(ctx, h.workerID)
	if err != nil {
		return err
	}

	// Look up the handler under the read lock; the lock is released before the
	// job runs so registrations are not blocked for the job's duration.
	h.mu.RLock()
	handler, ok := h.handlers[job.Type]
	h.mu.RUnlock()

	if !ok {
		h.logger.Error("no handler for job type", "job_id", job.ID, "type", job.Type)
		reason := fmt.Sprintf("unknown job type: %s", job.Type)
		if failErr := h.queue.Fail(ctx, job.ID, reason); failErr != nil {
			return fmt.Errorf("marking job %v as failed: %w", job.ID, failErr)
		}
		return nil
	}

	// Wrap the handler with recovery, logging and timeout middleware. The
	// parent context is passed straight through: TimeoutMiddleware is what
	// applies the per-job deadline.
	wrappedHandler := queue.Chain(
		queue.RecoveryMiddleware(h.logger),
		queue.LoggingMiddleware(h.logger),
		queue.TimeoutMiddleware(h.config.JobTimeout),
	)(handler)

	if runErr := wrappedHandler(ctx, job); runErr != nil {
		// Truncate error message to prevent log bloat and potential data leakage
		errMsg := truncateErrorMessage(runErr.Error(), 1000)
		h.logger.Debug("job handler failed", "job_id", job.ID, "error", errMsg)
		if failErr := h.queue.Fail(ctx, job.ID, errMsg); failErr != nil {
			return fmt.Errorf("marking job %v as failed: %w", job.ID, failErr)
		}
		return nil
	}

	if ackErr := h.queue.Ack(ctx, job.ID); ackErr != nil {
		return fmt.Errorf("acknowledging job %v: %w", job.ID, ackErr)
	}
	return nil
}
// WorkerID reports the identifier assigned to this Handler at construction.
// The same value is passed to the queue on every dequeue.
func (h *Handler) WorkerID() string {
	return h.workerID
}
// truncateErrorMessage returns msg limited to at most maxLen bytes.
//
// Messages that already fit are returned unchanged. Longer messages are cut
// and suffixed with "..." so the result, ellipsis included, never exceeds
// maxLen. The cut is moved back to the nearest UTF-8 rune boundary so a
// multi-byte character is never split in half.
//
// Edge cases: a maxLen too small to hold the ellipsis (< 3) yields a plain
// prefix of msg without the ellipsis, and a non-positive maxLen yields "".
func truncateErrorMessage(msg string, maxLen int) string {
	if len(msg) <= maxLen {
		return msg
	}
	if maxLen <= 0 {
		return ""
	}

	const ellipsis = "..."
	cut, suffix := maxLen-len(ellipsis), ellipsis
	if cut < 0 {
		// No room for the ellipsis; fall back to a bare prefix.
		cut, suffix = maxLen, ""
	}

	// msg[cut] is the first byte that would be dropped. If it is a UTF-8
	// continuation byte (10xxxxxx) the cut lands inside a rune, so back up to
	// that rune's first byte. cut < len(msg) always holds here.
	for cut > 0 && msg[cut]&0xC0 == 0x80 {
		cut--
	}
	return msg[:cut] + suffix
}