Add worker component: background-processor
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
This commit is contained in:
parent
cf8cc3c0a4
commit
682b4198df
@ -36,6 +36,33 @@ steps:
|
|||||||
|
|
||||||
# COMPONENT_STEPS_BELOW
|
# COMPONENT_STEPS_BELOW
|
||||||
|
|
||||||
|
# Woodpecker CI step for background-processor worker
|
||||||
|
# Add this step to your .woodpecker.yml
|
||||||
|
|
||||||
|
build-background-processor:
|
||||||
|
image: woodpeckerci/plugin-kaniko
|
||||||
|
settings:
|
||||||
|
registry: registry.threesix.ai
|
||||||
|
repo: slack-q-1770281596/background-processor
|
||||||
|
tags:
|
||||||
|
- latest
|
||||||
|
- ${CI_COMMIT_SHA:0:8}
|
||||||
|
context: .
|
||||||
|
dockerfile: workers/background-processor/Dockerfile
|
||||||
|
cache: true
|
||||||
|
skip-tls-verify: true
|
||||||
|
when:
|
||||||
|
branch: main
|
||||||
|
event: push
|
||||||
|
|
||||||
|
deploy-background-processor:
|
||||||
|
image: bitnami/kubectl:latest
|
||||||
|
commands:
|
||||||
|
- kubectl set image deployment/slack-q-1770281596-background-processor background-processor=registry.threesix.ai/slack-q-1770281596/background-processor:${CI_COMMIT_SHA:0:8} -n projects || echo "Deployment not found, skipping"
|
||||||
|
when:
|
||||||
|
branch: main
|
||||||
|
event: push
|
||||||
|
|
||||||
# Woodpecker CI step for api service
|
# Woodpecker CI step for api service
|
||||||
# Add this step to your .woodpecker.yml
|
# Add this step to your .woodpecker.yml
|
||||||
|
|
||||||
|
|||||||
@ -79,4 +79,5 @@ slack-q-1770281596/
|
|||||||
| Component | Type | Path |
|
| Component | Type | Path |
|
||||||
|-----------|------|------|
|
|-----------|------|------|
|
||||||
| **api** | API service | `services/api/` |
|
| **api** | API service | `services/api/` |
|
||||||
|
| **background-processor** | Background worker | `workers/background-processor/` |
|
||||||
|
|
||||||
|
|||||||
1
Procfile
1
Procfile
@ -1,3 +1,4 @@
|
|||||||
# Local development processes
|
# Local development processes
|
||||||
# Components will be added below as they're created
|
# Components will be added below as they're created
|
||||||
api: cd services/api && make run
|
api: cd services/api && make run
|
||||||
|
background-processor: cd workers/background-processor && make run
|
||||||
|
|||||||
1
go.work
1
go.work
@ -2,4 +2,5 @@ go 1.23
|
|||||||
|
|
||||||
use ./pkg
|
use ./pkg
|
||||||
use ./services/api
|
use ./services/api
|
||||||
|
use ./workers/background-processor
|
||||||
// Component modules will be added below
|
// Component modules will be added below
|
||||||
|
|||||||
23
workers/background-processor/.env.example
Normal file
23
workers/background-processor/.env.example
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
# background-processor Worker Configuration
|
||||||
|
|
||||||
|
# App
|
||||||
|
APP_NAME=background-processor
|
||||||
|
APP_ENVIRONMENT=development
|
||||||
|
APP_DEBUG=true
|
||||||
|
|
||||||
|
# Logging
|
||||||
|
LOG_LEVEL=debug
|
||||||
|
LOG_FORMAT=text
|
||||||
|
|
||||||
|
# Database (required for job queue)
|
||||||
|
DATABASE_URL=postgres://dev:dev@localhost:5432/slack-q-1770281596?sslmode=disable
|
||||||
|
|
||||||
|
# Worker
|
||||||
|
WORKER_POLL_INTERVAL=10s
|
||||||
|
WORKER_BATCH_SIZE=10
|
||||||
|
WORKER_MAX_RETRIES=3
|
||||||
|
WORKER_STALE_JOB_TIMEOUT=5m
|
||||||
|
WORKER_JOB_TIMEOUT=5m
|
||||||
|
|
||||||
|
# Redis (optional, for cache)
|
||||||
|
# REDIS_URL=redis://localhost:6379/0
|
||||||
31
workers/background-processor/Dockerfile
Normal file
31
workers/background-processor/Dockerfile
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
# Build stage
|
||||||
|
FROM golang:1.23-alpine AS builder
|
||||||
|
|
||||||
|
RUN apk add --no-cache git
|
||||||
|
|
||||||
|
# Configure Go workspace and private modules
|
||||||
|
ENV GOPRIVATE=git.threesix.ai/*
|
||||||
|
ENV GOWORK=/app/go.work
|
||||||
|
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
# Copy go workspace and all source (workspace deps are local)
|
||||||
|
# Note: go.work.sum may not exist if no external dependencies have been synced yet
|
||||||
|
COPY go.work ./
|
||||||
|
COPY go.work.su[m] ./
|
||||||
|
COPY pkg/ ./pkg/
|
||||||
|
COPY workers/background-processor/ ./workers/background-processor/
|
||||||
|
|
||||||
|
# Build from workspace root
|
||||||
|
RUN CGO_ENABLED=0 go build -o /background-processor ./workers/background-processor/cmd/worker
|
||||||
|
|
||||||
|
# Production stage
|
||||||
|
FROM alpine:3.19
|
||||||
|
|
||||||
|
RUN apk add --no-cache ca-certificates tzdata
|
||||||
|
|
||||||
|
WORKDIR /
|
||||||
|
|
||||||
|
COPY --from=builder /background-processor /background-processor
|
||||||
|
|
||||||
|
ENTRYPOINT ["/background-processor"]
|
||||||
34
workers/background-processor/Makefile
Normal file
34
workers/background-processor/Makefile
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
.PHONY: build run test lint fmt docker-build clean
|
||||||
|
|
||||||
|
WORKER := background-processor
|
||||||
|
BINARY := bin/$(WORKER)
|
||||||
|
GO_MODULE := git.threesix.ai/jordan/slack-q-1770281596
|
||||||
|
|
||||||
|
# Build the worker binary
|
||||||
|
build:
|
||||||
|
go build -o $(BINARY) ./cmd/worker
|
||||||
|
|
||||||
|
# Run the worker locally
|
||||||
|
run:
|
||||||
|
go run ./cmd/worker
|
||||||
|
|
||||||
|
# Run tests
|
||||||
|
test:
|
||||||
|
go test -v ./...
|
||||||
|
|
||||||
|
# Run linter
|
||||||
|
lint:
|
||||||
|
golangci-lint run ./...
|
||||||
|
|
||||||
|
# Format code
|
||||||
|
fmt:
|
||||||
|
gofmt -w .
|
||||||
|
goimports -w -local $(GO_MODULE) .
|
||||||
|
|
||||||
|
# Build Docker image (run from monorepo root)
|
||||||
|
docker-build:
|
||||||
|
docker build -t $(WORKER):latest -f Dockerfile ../..
|
||||||
|
|
||||||
|
# Clean build artifacts
|
||||||
|
clean:
|
||||||
|
rm -rf bin/
|
||||||
128
workers/background-processor/cmd/worker/main.go
Normal file
128
workers/background-processor/cmd/worker/main.go
Normal file
@ -0,0 +1,128 @@
|
|||||||
|
// Package main is the entry point for the background-processor worker.
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"embed"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.threesix.ai/jordan/slack-q-1770281596/pkg/config"
|
||||||
|
"git.threesix.ai/jordan/slack-q-1770281596/pkg/database"
|
||||||
|
"git.threesix.ai/jordan/slack-q-1770281596/pkg/logging"
|
||||||
|
"git.threesix.ai/jordan/slack-q-1770281596/pkg/queue"
|
||||||
|
"git.threesix.ai/jordan/slack-q-1770281596/workers/background-processor/internal/handlers"
|
||||||
|
workerconfig "git.threesix.ai/jordan/slack-q-1770281596/workers/background-processor/internal/config"
|
||||||
|
)
|
||||||
|
|
||||||
|
//go:embed migrations/*.sql
|
||||||
|
var migrationsFS embed.FS
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
// Initialize logger first (with defaults) so we can log config errors
|
||||||
|
logger := logging.New(logging.Config{
|
||||||
|
Level: logging.LevelInfo,
|
||||||
|
Format: logging.FormatJSON,
|
||||||
|
}).WithService("background-processor")
|
||||||
|
|
||||||
|
// Initialize configuration
|
||||||
|
cfg, err := workerconfig.Load()
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("failed to load config", "error", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reconfigure logger with loaded config
|
||||||
|
logger = logging.New(logging.Config{
|
||||||
|
Level: logging.ParseLevel(cfg.Logging.Level),
|
||||||
|
Format: logging.ParseFormat(cfg.Logging.Format),
|
||||||
|
Environment: cfg.AppConfig.Environment,
|
||||||
|
AddSource: cfg.AppConfig.IsDevelopment(),
|
||||||
|
}).WithService("background-processor")
|
||||||
|
|
||||||
|
logger.Info("starting background-processor worker")
|
||||||
|
|
||||||
|
// Setup graceful shutdown
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Connect to database
|
||||||
|
pool, err := database.Connect(ctx, cfg.Database.URL, database.Options{
|
||||||
|
MaxOpenConns: cfg.Database.MaxOpenConns,
|
||||||
|
MaxIdleConns: cfg.Database.MaxIdleConns,
|
||||||
|
ConnMaxLifetime: cfg.Database.ConnMaxLifetime,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("failed to connect to database", "error", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
defer pool.Close()
|
||||||
|
logger.Info("connected to database", "url", pool.URL)
|
||||||
|
|
||||||
|
// Run migrations
|
||||||
|
database.MustRunMigrations(ctx, pool, migrationsFS, "migrations")
|
||||||
|
logger.Info("migrations complete")
|
||||||
|
|
||||||
|
// Initialize queue
|
||||||
|
jobQueue := queue.NewPostgresQueue(pool.DB, logger)
|
||||||
|
|
||||||
|
// Initialize and start handler
|
||||||
|
handler := handlers.New(logger, jobQueue, handlers.Config{
|
||||||
|
PollInterval: cfg.Worker.PollInterval,
|
||||||
|
StaleJobTimeout: cfg.Worker.StaleJobTimeout,
|
||||||
|
JobTimeout: cfg.Worker.JobTimeout,
|
||||||
|
})
|
||||||
|
|
||||||
|
// Register job handlers
|
||||||
|
// TODO: Register your job handlers here
|
||||||
|
// handler.RegisterHandler("send_email", emailHandler)
|
||||||
|
// handler.RegisterHandler("process_image", imageHandler)
|
||||||
|
|
||||||
|
// Setup signal handling
|
||||||
|
sigCh := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
|
||||||
|
|
||||||
|
// Start worker in goroutine
|
||||||
|
go handler.Run(ctx)
|
||||||
|
|
||||||
|
// Start stale job recovery in goroutine
|
||||||
|
go runStaleJobRecovery(ctx, jobQueue, cfg.Worker.StaleJobTimeout, logger)
|
||||||
|
|
||||||
|
// Wait for shutdown signal
|
||||||
|
sig := <-sigCh
|
||||||
|
logger.Info("received shutdown signal", "signal", sig.String())
|
||||||
|
|
||||||
|
// Trigger graceful shutdown with grace period
|
||||||
|
logger.Info("initiating graceful shutdown")
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
// Give in-flight jobs time to complete (grace period)
|
||||||
|
// This allows handlers to notice context cancellation and finish cleanly.
|
||||||
|
const shutdownGracePeriod = 5 * time.Second
|
||||||
|
time.Sleep(shutdownGracePeriod)
|
||||||
|
|
||||||
|
logger.Info("background-processor worker stopped")
|
||||||
|
}
|
||||||
|
|
||||||
|
// runStaleJobRecovery periodically requeues jobs that have been running too long.
|
||||||
|
func runStaleJobRecovery(ctx context.Context, q *queue.PostgresQueue, timeout time.Duration, logger *logging.Logger) {
|
||||||
|
const staleCheckInterval = time.Minute
|
||||||
|
ticker := time.NewTicker(staleCheckInterval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
count, err := q.RequeueStale(ctx, timeout)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("failed to requeue stale jobs", "error", err)
|
||||||
|
} else if count > 0 {
|
||||||
|
logger.Info("requeued stale jobs", "count", count)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,32 @@
|
|||||||
|
-- Jobs queue table for async job processing.
|
||||||
|
-- Used by pkg/queue for producer/consumer patterns.
|
||||||
|
CREATE TABLE IF NOT EXISTS jobs (
|
||||||
|
id UUID PRIMARY KEY,
|
||||||
|
job_type VARCHAR(255) NOT NULL,
|
||||||
|
payload JSONB NOT NULL DEFAULT '{}',
|
||||||
|
status VARCHAR(50) NOT NULL DEFAULT 'pending',
|
||||||
|
priority INT NOT NULL DEFAULT 0,
|
||||||
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||||
|
started_at TIMESTAMPTZ,
|
||||||
|
completed_at TIMESTAMPTZ,
|
||||||
|
retry_count INT NOT NULL DEFAULT 0,
|
||||||
|
max_retries INT NOT NULL DEFAULT 3,
|
||||||
|
error TEXT,
|
||||||
|
worker_id VARCHAR(255)
|
||||||
|
);
|
||||||
|
|
||||||
|
-- Index for efficient dequeue: pending jobs ordered by priority (desc) and age (asc).
|
||||||
|
-- Partial index only includes pending jobs for efficiency.
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_jobs_dequeue ON jobs (priority DESC, created_at ASC)
|
||||||
|
WHERE status = 'pending';
|
||||||
|
|
||||||
|
-- Index for finding stale running jobs that need requeue.
|
||||||
|
-- Used by RequeueStale to recover from crashed workers.
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_jobs_stale ON jobs (started_at)
|
||||||
|
WHERE status = 'running';
|
||||||
|
|
||||||
|
-- Index for listing/filtering jobs by type.
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_jobs_type ON jobs (job_type, created_at DESC);
|
||||||
|
|
||||||
|
-- Index for listing jobs by status (useful for monitoring dashboards).
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs (status, created_at DESC);
|
||||||
8
workers/background-processor/component.yaml
Normal file
8
workers/background-processor/component.yaml
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
name: background-processor
|
||||||
|
type: worker
|
||||||
|
path: workers/background-processor
|
||||||
|
dependencies: []
|
||||||
|
# Add dependencies as needed:
|
||||||
|
# - postgres
|
||||||
|
# - redis
|
||||||
|
# - rabbitmq
|
||||||
11
workers/background-processor/go.mod
Normal file
11
workers/background-processor/go.mod
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
module git.threesix.ai/jordan/slack-q-1770281596/workers/background-processor
|
||||||
|
|
||||||
|
go 1.23
|
||||||
|
|
||||||
|
require (
|
||||||
|
git.threesix.ai/jordan/slack-q-1770281596/pkg v0.0.0
|
||||||
|
github.com/google/uuid v1.6.0
|
||||||
|
)
|
||||||
|
|
||||||
|
// Use local workspace modules (for Docker builds without go.work)
|
||||||
|
replace git.threesix.ai/jordan/slack-q-1770281596/pkg => ../../pkg
|
||||||
0
workers/background-processor/go.sum
Normal file
0
workers/background-processor/go.sum
Normal file
66
workers/background-processor/internal/config/config.go
Normal file
66
workers/background-processor/internal/config/config.go
Normal file
@ -0,0 +1,66 @@
|
|||||||
|
// Package config provides worker-specific configuration.
|
||||||
|
package config
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/spf13/viper"
|
||||||
|
|
||||||
|
"git.threesix.ai/jordan/slack-q-1770281596/pkg/config"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Config holds background-processor worker configuration.
|
||||||
|
type Config struct {
|
||||||
|
config.AppConfig
|
||||||
|
Database config.DatabaseConfig
|
||||||
|
Logging config.LoggingConfig
|
||||||
|
Worker WorkerConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
// WorkerConfig holds worker-specific settings.
|
||||||
|
type WorkerConfig struct {
|
||||||
|
// PollInterval is how often to check for new jobs when queue is empty.
|
||||||
|
PollInterval time.Duration
|
||||||
|
|
||||||
|
// BatchSize is the max number of jobs to process per poll (for batch workers).
|
||||||
|
BatchSize int
|
||||||
|
|
||||||
|
// MaxRetries is the default maximum retry attempts for failed jobs.
|
||||||
|
MaxRetries int
|
||||||
|
|
||||||
|
// StaleJobTimeout is how long a job can run before being considered stale.
|
||||||
|
// Jobs running longer than this without heartbeat will be requeued.
|
||||||
|
StaleJobTimeout time.Duration
|
||||||
|
|
||||||
|
// JobTimeout is the maximum time a single job handler can run.
|
||||||
|
JobTimeout time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// Load reads configuration from environment variables.
|
||||||
|
func Load() (*Config, error) {
|
||||||
|
if err := config.Init(config.Options{
|
||||||
|
AppName: "background-processor",
|
||||||
|
SetDefaults: func() {
|
||||||
|
viper.SetDefault("WORKER_POLL_INTERVAL", "10s")
|
||||||
|
viper.SetDefault("WORKER_BATCH_SIZE", 10)
|
||||||
|
viper.SetDefault("WORKER_MAX_RETRIES", 3)
|
||||||
|
viper.SetDefault("WORKER_STALE_JOB_TIMEOUT", "5m")
|
||||||
|
viper.SetDefault("WORKER_JOB_TIMEOUT", "5m")
|
||||||
|
},
|
||||||
|
}); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Config{
|
||||||
|
AppConfig: config.ReadAppConfig(),
|
||||||
|
Database: config.ReadDatabaseConfig(),
|
||||||
|
Logging: config.ReadLoggingConfig(),
|
||||||
|
Worker: WorkerConfig{
|
||||||
|
PollInterval: viper.GetDuration("WORKER_POLL_INTERVAL"),
|
||||||
|
BatchSize: viper.GetInt("WORKER_BATCH_SIZE"),
|
||||||
|
MaxRetries: viper.GetInt("WORKER_MAX_RETRIES"),
|
||||||
|
StaleJobTimeout: viper.GetDuration("WORKER_STALE_JOB_TIMEOUT"),
|
||||||
|
JobTimeout: viper.GetDuration("WORKER_JOB_TIMEOUT"),
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
147
workers/background-processor/internal/handlers/handler.go
Normal file
147
workers/background-processor/internal/handlers/handler.go
Normal file
@ -0,0 +1,147 @@
|
|||||||
|
// Package handlers provides the worker's job processing logic.
|
||||||
|
package handlers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
|
||||||
|
"git.threesix.ai/jordan/slack-q-1770281596/pkg/logging"
|
||||||
|
"git.threesix.ai/jordan/slack-q-1770281596/pkg/queue"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Config holds handler configuration.
|
||||||
|
type Config struct {
|
||||||
|
// PollInterval is how often to check for new jobs when queue is empty.
|
||||||
|
PollInterval time.Duration
|
||||||
|
|
||||||
|
// StaleJobTimeout is how long a job can run before being considered stale.
|
||||||
|
StaleJobTimeout time.Duration
|
||||||
|
|
||||||
|
// JobTimeout is the maximum time a job handler can run.
|
||||||
|
JobTimeout time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handler processes background jobs from the queue.
|
||||||
|
type Handler struct {
|
||||||
|
logger *logging.Logger
|
||||||
|
queue queue.Consumer
|
||||||
|
handlers map[string]queue.Handler
|
||||||
|
config Config
|
||||||
|
workerID string
|
||||||
|
mu sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// New creates a new Handler.
|
||||||
|
func New(logger *logging.Logger, q queue.Consumer, cfg Config) *Handler {
|
||||||
|
// Apply defaults
|
||||||
|
if cfg.PollInterval == 0 {
|
||||||
|
cfg.PollInterval = 10 * time.Second
|
||||||
|
}
|
||||||
|
if cfg.StaleJobTimeout == 0 {
|
||||||
|
cfg.StaleJobTimeout = 5 * time.Minute
|
||||||
|
}
|
||||||
|
if cfg.JobTimeout == 0 {
|
||||||
|
cfg.JobTimeout = 5 * time.Minute
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Handler{
|
||||||
|
logger: logger.WithComponent("handler"),
|
||||||
|
queue: q,
|
||||||
|
handlers: make(map[string]queue.Handler),
|
||||||
|
config: cfg,
|
||||||
|
workerID: uuid.New().String(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterHandler registers a handler for a specific job type.
|
||||||
|
// Call this before Run() to set up job processing.
|
||||||
|
func (h *Handler) RegisterHandler(jobType string, handler queue.Handler) {
|
||||||
|
h.mu.Lock()
|
||||||
|
defer h.mu.Unlock()
|
||||||
|
h.handlers[jobType] = handler
|
||||||
|
h.logger.Info("registered job handler", "type", jobType)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run starts the worker loop and processes jobs until context is cancelled.
|
||||||
|
func (h *Handler) Run(ctx context.Context) {
|
||||||
|
h.logger.Info("worker loop started", "worker_id", h.workerID)
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
h.logger.Info("worker loop stopping", "worker_id", h.workerID)
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
if err := h.processNextJob(ctx); err != nil {
|
||||||
|
if errors.Is(err, queue.ErrNoJob) {
|
||||||
|
// Queue is empty, wait before polling again
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-time.After(h.config.PollInterval):
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Log error and continue
|
||||||
|
h.logger.Error("error processing job", "error", err)
|
||||||
|
time.Sleep(time.Second) // Brief pause on error
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// processNextJob dequeues and processes a single job.
|
||||||
|
func (h *Handler) processNextJob(ctx context.Context) error {
|
||||||
|
job, err := h.queue.Dequeue(ctx, h.workerID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get handler for job type
|
||||||
|
h.mu.RLock()
|
||||||
|
handler, ok := h.handlers[job.Type]
|
||||||
|
h.mu.RUnlock()
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
h.logger.Error("no handler for job type", "job_id", job.ID, "type", job.Type)
|
||||||
|
return h.queue.Fail(ctx, job.ID, fmt.Sprintf("unknown job type: %s", job.Type))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply middleware and process (TimeoutMiddleware handles the deadline)
|
||||||
|
wrappedHandler := queue.Chain(
|
||||||
|
queue.RecoveryMiddleware(h.logger),
|
||||||
|
queue.LoggingMiddleware(h.logger),
|
||||||
|
queue.TimeoutMiddleware(h.config.JobTimeout),
|
||||||
|
)(handler)
|
||||||
|
|
||||||
|
// Use parent context - TimeoutMiddleware applies the job timeout
|
||||||
|
jobCtx := ctx
|
||||||
|
_ = jobCtx // jobCtx used below
|
||||||
|
|
||||||
|
if err := wrappedHandler(jobCtx, job); err != nil {
|
||||||
|
// Truncate error message to prevent log bloat and potential data leakage
|
||||||
|
errMsg := truncateErrorMessage(err.Error(), 1000)
|
||||||
|
h.logger.Debug("job handler failed", "job_id", job.ID, "error", errMsg)
|
||||||
|
return h.queue.Fail(ctx, job.ID, errMsg)
|
||||||
|
}
|
||||||
|
|
||||||
|
return h.queue.Ack(ctx, job.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// WorkerID returns this handler's unique worker identifier.
|
||||||
|
func (h *Handler) WorkerID() string {
|
||||||
|
return h.workerID
|
||||||
|
}
|
||||||
|
|
||||||
|
// truncateErrorMessage limits error message length to prevent log bloat.
|
||||||
|
func truncateErrorMessage(msg string, maxLen int) string {
|
||||||
|
if len(msg) <= maxLen {
|
||||||
|
return msg
|
||||||
|
}
|
||||||
|
return msg[:maxLen-3] + "..."
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue
Block a user