From 682b4198dfbfe27652bcbad2cd2b88729c07bab6 Mon Sep 17 00:00:00 2001 From: jordan Date: Thu, 5 Feb 2026 08:53:26 +0000 Subject: [PATCH] Add worker component: background-processor --- .woodpecker.yml | 27 ++++ CLAUDE.md | 1 + Procfile | 1 + go.work | 1 + workers/background-processor/.env.example | 23 +++ workers/background-processor/Dockerfile | 31 ++++ workers/background-processor/Makefile | 34 ++++ .../background-processor/cmd/worker/main.go | 128 +++++++++++++++ .../cmd/worker/migrations/001_create_jobs.sql | 32 ++++ workers/background-processor/component.yaml | 8 + workers/background-processor/go.mod | 11 ++ workers/background-processor/go.sum | 0 .../internal/config/config.go | 66 ++++++++ .../internal/handlers/handler.go | 147 ++++++++++++++++++ 14 files changed, 510 insertions(+) create mode 100644 workers/background-processor/.env.example create mode 100644 workers/background-processor/Dockerfile create mode 100644 workers/background-processor/Makefile create mode 100644 workers/background-processor/cmd/worker/main.go create mode 100644 workers/background-processor/cmd/worker/migrations/001_create_jobs.sql create mode 100644 workers/background-processor/component.yaml create mode 100644 workers/background-processor/go.mod create mode 100644 workers/background-processor/go.sum create mode 100644 workers/background-processor/internal/config/config.go create mode 100644 workers/background-processor/internal/handlers/handler.go diff --git a/.woodpecker.yml b/.woodpecker.yml index a69c46e..4b5010e 100644 --- a/.woodpecker.yml +++ b/.woodpecker.yml @@ -36,6 +36,33 @@ steps: # COMPONENT_STEPS_BELOW + # Woodpecker CI step for background-processor worker + # Add this step to your .woodpecker.yml + + build-background-processor: + image: woodpeckerci/plugin-kaniko + settings: + registry: registry.threesix.ai + repo: slack-q-1770281596/background-processor + tags: + - latest + - ${CI_COMMIT_SHA:0:8} + context: . 
+ dockerfile: workers/background-processor/Dockerfile + cache: true + skip-tls-verify: true + when: + branch: main + event: push + + deploy-background-processor: + image: bitnami/kubectl:latest + commands: + - kubectl set image deployment/slack-q-1770281596-background-processor background-processor=registry.threesix.ai/slack-q-1770281596/background-processor:${CI_COMMIT_SHA:0:8} -n projects || echo "Deployment not found, skipping" + when: + branch: main + event: push + # Woodpecker CI step for api service # Add this step to your .woodpecker.yml diff --git a/CLAUDE.md b/CLAUDE.md index 27b1088..7d3e31b 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -79,4 +79,5 @@ slack-q-1770281596/ | Component | Type | Path | |-----------|------|------| | **api** | API service | `services/api/` | +| **background-processor** | Background worker | `workers/background-processor/` | diff --git a/Procfile b/Procfile index 60fb4f9..50d632e 100644 --- a/Procfile +++ b/Procfile @@ -1,3 +1,4 @@ # Local development processes # Components will be added below as they're created api: cd services/api && make run +background-processor: cd workers/background-processor && make run diff --git a/go.work b/go.work index 9b16450..cc6d431 100644 --- a/go.work +++ b/go.work @@ -2,4 +2,5 @@ go 1.23 use ./pkg use ./services/api +use ./workers/background-processor // Component modules will be added below diff --git a/workers/background-processor/.env.example b/workers/background-processor/.env.example new file mode 100644 index 0000000..75c6bd2 --- /dev/null +++ b/workers/background-processor/.env.example @@ -0,0 +1,23 @@ +# background-processor Worker Configuration + +# App +APP_NAME=background-processor +APP_ENVIRONMENT=development +APP_DEBUG=true + +# Logging +LOG_LEVEL=debug +LOG_FORMAT=text + +# Database (required for job queue) +DATABASE_URL=postgres://dev:dev@localhost:5432/slack-q-1770281596?sslmode=disable + +# Worker +WORKER_POLL_INTERVAL=10s +WORKER_BATCH_SIZE=10 +WORKER_MAX_RETRIES=3 
+WORKER_STALE_JOB_TIMEOUT=5m
+WORKER_JOB_TIMEOUT=5m
+
+# Redis (optional, for cache)
+# REDIS_URL=redis://localhost:6379/0
diff --git a/workers/background-processor/Dockerfile b/workers/background-processor/Dockerfile
new file mode 100644
index 0000000..bbb4102
--- /dev/null
+++ b/workers/background-processor/Dockerfile
@@ -0,0 +1,35 @@
+# Build stage
+FROM golang:1.23-alpine AS builder
+
+RUN apk add --no-cache git
+
+# Configure Go workspace and private modules
+ENV GOPRIVATE=git.threesix.ai/*
+ENV GOWORK=/app/go.work
+
+WORKDIR /app
+
+# Copy only the modules this worker needs (workspace deps are local).
+# Note: go.work.sum may not exist if no external dependencies have been synced yet
+COPY go.work.su[m] ./
+COPY pkg/ ./pkg/
+COPY workers/background-processor/ ./workers/background-processor/
+
+# The monorepo go.work lists every component, but only pkg/ and this worker are
+# present in the image; a `use` entry whose directory is missing makes every go
+# command fail, so generate a workspace containing just the copied modules.
+RUN go work init ./pkg ./workers/background-processor
+
+# Build from workspace root
+RUN CGO_ENABLED=0 go build -o /background-processor ./workers/background-processor/cmd/worker
+
+# Production stage
+FROM alpine:3.19
+
+RUN apk add --no-cache ca-certificates tzdata
+
+WORKDIR /
+
+COPY --from=builder /background-processor /background-processor
+
+ENTRYPOINT ["/background-processor"]
diff --git a/workers/background-processor/Makefile b/workers/background-processor/Makefile
new file mode 100644
index 0000000..b8dd780
--- /dev/null
+++ b/workers/background-processor/Makefile
@@ -0,0 +1,34 @@
+.PHONY: build run test lint fmt docker-build clean
+
+WORKER := background-processor
+BINARY := bin/$(WORKER)
+GO_MODULE := git.threesix.ai/jordan/slack-q-1770281596
+
+# Build the worker binary
+build:
+	go build -o $(BINARY) ./cmd/worker
+
+# Run the worker locally
+run:
+	go run ./cmd/worker
+
+# Run tests
+test:
+	go test -v ./...
+
+# Run linter
+lint:
+	golangci-lint run ./...
+
+# Format code
+fmt:
+	gofmt -w .
+	goimports -w -local $(GO_MODULE) .
+
+# Build Docker image (run from this directory; the build context is the monorepo root)
+docker-build:
+	docker build -t $(WORKER):latest -f Dockerfile ../..
+
+# Clean build artifacts
+clean:
+	rm -rf bin/
diff --git a/workers/background-processor/cmd/worker/main.go b/workers/background-processor/cmd/worker/main.go
new file mode 100644
index 0000000..2a0d647
--- /dev/null
+++ b/workers/background-processor/cmd/worker/main.go
@@ -0,0 +1,155 @@
+// Package main is the entry point for the background-processor worker.
+package main
+
+import (
+	"context"
+	"embed"
+	"os"
+	"os/signal"
+	"sync"
+	"syscall"
+	"time"
+
+	"git.threesix.ai/jordan/slack-q-1770281596/pkg/database"
+	"git.threesix.ai/jordan/slack-q-1770281596/pkg/logging"
+	"git.threesix.ai/jordan/slack-q-1770281596/pkg/queue"
+	workerconfig "git.threesix.ai/jordan/slack-q-1770281596/workers/background-processor/internal/config"
+	"git.threesix.ai/jordan/slack-q-1770281596/workers/background-processor/internal/handlers"
+)
+
+//go:embed migrations/*.sql
+var migrationsFS embed.FS
+
+// main wires the worker together: it loads configuration, connects to the
+// database, applies the embedded migrations, starts the job loop and the
+// stale-job recovery loop, and shuts both down on SIGINT or SIGTERM.
+func main() {
+	// Initialize logger first (with defaults) so we can log config errors
+	logger := logging.New(logging.Config{
+		Level:  logging.LevelInfo,
+		Format: logging.FormatJSON,
+	}).WithService("background-processor")
+
+	// Initialize configuration
+	cfg, err := workerconfig.Load()
+	if err != nil {
+		logger.Error("failed to load config", "error", err)
+		os.Exit(1)
+	}
+
+	// Reconfigure logger with loaded config
+	logger = logging.New(logging.Config{
+		Level:       logging.ParseLevel(cfg.Logging.Level),
+		Format:      logging.ParseFormat(cfg.Logging.Format),
+		Environment: cfg.AppConfig.Environment,
+		AddSource:   cfg.AppConfig.IsDevelopment(),
+	}).WithService("background-processor")
+
+	logger.Info("starting background-processor worker")
+
+	// Root context: cancelling it tells the worker loop and the stale-job
+	// recovery loop to stop.
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// Connect to database
+	pool, err := database.Connect(ctx, cfg.Database.URL, database.Options{
+		MaxOpenConns:    cfg.Database.MaxOpenConns,
+		MaxIdleConns:    cfg.Database.MaxIdleConns,
+		ConnMaxLifetime: cfg.Database.ConnMaxLifetime,
+	})
+	if err != nil {
+		logger.Error("failed to connect to database", "error", err)
+		os.Exit(1)
+	}
+	defer pool.Close()
+	// NOTE(review): DATABASE_URL normally embeds credentials; confirm that
+	// pool.URL is already redacted by pkg/database before keeping it in logs.
+	logger.Info("connected to database", "url", pool.URL)
+
+	// Run migrations
+	database.MustRunMigrations(ctx, pool, migrationsFS, "migrations")
+	logger.Info("migrations complete")
+
+	// Initialize queue
+	jobQueue := queue.NewPostgresQueue(pool.DB, logger)
+
+	// Initialize handler
+	handler := handlers.New(logger, jobQueue, handlers.Config{
+		PollInterval:    cfg.Worker.PollInterval,
+		StaleJobTimeout: cfg.Worker.StaleJobTimeout,
+		JobTimeout:      cfg.Worker.JobTimeout,
+	})
+
+	// Register job handlers
+	// TODO: Register your job handlers here
+	// handler.RegisterHandler("send_email", emailHandler)
+	// handler.RegisterHandler("process_image", imageHandler)
+
+	// Setup signal handling
+	sigCh := make(chan os.Signal, 1)
+	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
+
+	// Start the worker loop and the stale-job recovery loop. Both are tracked
+	// by the WaitGroup so shutdown can tell when they have actually returned.
+	var wg sync.WaitGroup
+	wg.Add(2)
+	go func() {
+		defer wg.Done()
+		handler.Run(ctx)
+	}()
+	go func() {
+		defer wg.Done()
+		runStaleJobRecovery(ctx, jobQueue, cfg.Worker.StaleJobTimeout, logger)
+	}()
+
+	// Wait for shutdown signal
+	sig := <-sigCh
+	logger.Info("received shutdown signal", "signal", sig.String())
+
+	// Trigger graceful shutdown
+	logger.Info("initiating graceful shutdown")
+	cancel()
+
+	// Wait for both loops to return, but never longer than the grace period:
+	// an idle worker exits immediately instead of always sleeping, and a stuck
+	// job cannot block process exit forever.
+	const shutdownGracePeriod = 5 * time.Second
+	stopped := make(chan struct{})
+	go func() {
+		wg.Wait()
+		close(stopped)
+	}()
+	select {
+	case <-stopped:
+	case <-time.After(shutdownGracePeriod):
+		logger.Error("shutdown grace period elapsed with worker goroutines still running",
+			"grace_period", shutdownGracePeriod.String())
+	}
+
+	logger.Info("background-processor worker stopped")
+}
+
+// runStaleJobRecovery periodically requeues jobs that have been running too long.
+// Once per minute it calls q.RequeueStale with the given timeout, logging any
+// error and the number of requeued jobs when that number is positive. It
+// returns when ctx is cancelled.
+func runStaleJobRecovery(ctx context.Context, q *queue.PostgresQueue, timeout time.Duration, logger *logging.Logger) {
+	const staleCheckInterval = time.Minute
+	ticker := time.NewTicker(staleCheckInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			count, err := q.RequeueStale(ctx, timeout)
+			if err != nil {
+				logger.Error("failed to requeue stale jobs", "error", err)
+			} else if count > 0 {
+				logger.Info("requeued stale jobs", "count", count)
+			}
+		}
+	}
+}
diff --git a/workers/background-processor/cmd/worker/migrations/001_create_jobs.sql b/workers/background-processor/cmd/worker/migrations/001_create_jobs.sql
new file mode 100644
index 0000000..5af8ef9
--- /dev/null
+++ b/workers/background-processor/cmd/worker/migrations/001_create_jobs.sql
@@ -0,0 +1,32 @@
+-- Jobs queue table for async job processing.
+-- Used by pkg/queue for producer/consumer patterns.
+CREATE TABLE IF NOT EXISTS jobs (
+    id UUID PRIMARY KEY,
+    job_type VARCHAR(255) NOT NULL,
+    payload JSONB NOT NULL DEFAULT '{}',
+    status VARCHAR(50) NOT NULL DEFAULT 'pending',
+    priority INT NOT NULL DEFAULT 0,
+    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+    started_at TIMESTAMPTZ,
+    completed_at TIMESTAMPTZ,
+    retry_count INT NOT NULL DEFAULT 0,
+    max_retries INT NOT NULL DEFAULT 3,
+    error TEXT,
+    worker_id VARCHAR(255)
+);
+
+-- Index for efficient dequeue: pending jobs ordered by priority (desc) and age (asc).
+-- Partial index only includes pending jobs for efficiency.
+CREATE INDEX IF NOT EXISTS idx_jobs_dequeue ON jobs (priority DESC, created_at ASC)
+    WHERE status = 'pending';
+
+-- Index for finding stale running jobs that need requeue.
+-- Used by RequeueStale to recover from crashed workers.
+CREATE INDEX IF NOT EXISTS idx_jobs_stale ON jobs (started_at)
+    WHERE status = 'running';
+
+-- Index for listing/filtering jobs by type.
+CREATE INDEX IF NOT EXISTS idx_jobs_type ON jobs (job_type, created_at DESC);
+
+-- Index for listing jobs by status (useful for monitoring dashboards).
+CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs (status, created_at DESC);
diff --git a/workers/background-processor/component.yaml b/workers/background-processor/component.yaml
new file mode 100644
index 0000000..d844662
--- /dev/null
+++ b/workers/background-processor/component.yaml
@@ -0,0 +1,9 @@
+name: background-processor
+type: worker
+path: workers/background-processor
+# NOTE(review): the worker needs DATABASE_URL for its job queue; confirm whether postgres must be declared here.
+dependencies: []
+# Add dependencies as needed:
+# - postgres
+# - redis
+# - rabbitmq
diff --git a/workers/background-processor/go.mod b/workers/background-processor/go.mod
new file mode 100644
index 0000000..bf81c27
--- /dev/null
+++ b/workers/background-processor/go.mod
@@ -0,0 +1,11 @@
+module git.threesix.ai/jordan/slack-q-1770281596/workers/background-processor
+
+go 1.23
+
+require (
+	git.threesix.ai/jordan/slack-q-1770281596/pkg v0.0.0
+	github.com/google/uuid v1.6.0
+)
+
+// Use local workspace modules (for Docker builds without go.work)
+replace git.threesix.ai/jordan/slack-q-1770281596/pkg => ../../pkg
diff --git a/workers/background-processor/go.sum b/workers/background-processor/go.sum
new file mode 100644
index 0000000..e69de29
diff --git a/workers/background-processor/internal/config/config.go b/workers/background-processor/internal/config/config.go
new file mode 100644
index 0000000..517d422
--- /dev/null
+++ b/workers/background-processor/internal/config/config.go
@@ -0,0 +1,79 @@
+// Package config provides worker-specific configuration.
+package config
+
+import (
+	"time"
+
+	"github.com/spf13/viper"
+
+	"git.threesix.ai/jordan/slack-q-1770281596/pkg/config"
+)
+
+// Config is the complete configuration of the background-processor worker:
+// the shared application, database and logging sections plus Worker settings.
+type Config struct {
+	config.AppConfig
+	Database config.DatabaseConfig
+	Logging  config.LoggingConfig
+	Worker   WorkerConfig
+}
+
+// WorkerConfig groups the settings read from the WORKER_* keys.
+type WorkerConfig struct {
+	// PollInterval is the pause between polls while the queue is empty.
+	PollInterval time.Duration
+
+	// BatchSize caps how many jobs one poll may process (for batch workers).
+	BatchSize int
+
+	// MaxRetries is the default number of retry attempts for a failed job.
+	MaxRetries int
+
+	// StaleJobTimeout is how long a job may stay running before it is
+	// considered stale and becomes eligible for requeue.
+	StaleJobTimeout time.Duration
+
+	// JobTimeout is the longest a single job handler is allowed to run.
+	JobTimeout time.Duration
+}
+
+// Load initialises the shared config package for this worker and returns the
+// assembled Config. WORKER_* keys that are not set fall back to the defaults
+// registered by setWorkerDefaults. An error from config.Init is returned as is.
+func Load() (*Config, error) {
+	opts := config.Options{
+		AppName:     "background-processor",
+		SetDefaults: setWorkerDefaults,
+	}
+	if err := config.Init(opts); err != nil {
+		return nil, err
+	}
+
+	cfg := &Config{
+		AppConfig: config.ReadAppConfig(),
+		Database:  config.ReadDatabaseConfig(),
+		Logging:   config.ReadLoggingConfig(),
+		Worker:    readWorkerConfig(),
+	}
+	return cfg, nil
+}
+
+// setWorkerDefaults registers the fallback value of every WORKER_* key.
+func setWorkerDefaults() {
+	viper.SetDefault("WORKER_POLL_INTERVAL", "10s")
+	viper.SetDefault("WORKER_BATCH_SIZE", 10)
+	viper.SetDefault("WORKER_MAX_RETRIES", 3)
+	viper.SetDefault("WORKER_STALE_JOB_TIMEOUT", "5m")
+	viper.SetDefault("WORKER_JOB_TIMEOUT", "5m")
+}
+
+// readWorkerConfig collects the current WORKER_* values from viper.
+func readWorkerConfig() WorkerConfig {
+	return WorkerConfig{
+		PollInterval:    viper.GetDuration("WORKER_POLL_INTERVAL"),
+		BatchSize:       viper.GetInt("WORKER_BATCH_SIZE"),
+		MaxRetries:      viper.GetInt("WORKER_MAX_RETRIES"),
+		StaleJobTimeout: viper.GetDuration("WORKER_STALE_JOB_TIMEOUT"),
+		JobTimeout:      viper.GetDuration("WORKER_JOB_TIMEOUT"),
+	}
+}
diff --git a/workers/background-processor/internal/handlers/handler.go b/workers/background-processor/internal/handlers/handler.go
new file mode 100644
index 0000000..0a2bd90
--- /dev/null
+++ b/workers/background-processor/internal/handlers/handler.go
@@ -0,0 +1,147 @@
+// Package handlers provides the worker's job processing logic.
+package handlers
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/google/uuid"
+
+	"git.threesix.ai/jordan/slack-q-1770281596/pkg/logging"
+	"git.threesix.ai/jordan/slack-q-1770281596/pkg/queue"
+)
+
+// Defaults applied by New, and internal timings of the worker loop.
+const (
+	defaultPollInterval    = 10 * time.Second
+	defaultStaleJobTimeout = 5 * time.Minute
+	defaultJobTimeout      = 5 * time.Minute
+
+	// errorBackoff is the pause after an unexpected error from the queue, so a
+	// persistent failure does not turn the loop into a busy spin.
+	errorBackoff = time.Second
+
+	// finalizeTimeout bounds the Ack/Fail call that records a job's outcome.
+	finalizeTimeout = 5 * time.Second
+
+	// maxErrorMessageLen is the longest error message recorded for a failed job.
+	maxErrorMessageLen = 1000
+)
+
+// Config holds handler configuration.
+// Non-positive durations are replaced with defaults by New.
+type Config struct {
+	// PollInterval is how often to check for new jobs when queue is empty.
+	PollInterval time.Duration
+
+	// StaleJobTimeout is how long a job can run before being considered stale.
+	StaleJobTimeout time.Duration
+
+	// JobTimeout is the maximum time a job handler can run.
+	JobTimeout time.Duration
+}
+
+// Handler processes background jobs from the queue.
+type Handler struct {
+	logger   *logging.Logger
+	queue    queue.Consumer
+	handlers map[string]queue.Handler
+	config   Config
+	workerID string
+	mu       sync.RWMutex // guards handlers
+}
+
+// New creates a new Handler that consumes from q and logs through logger.
+// Each Handler gets a freshly generated UUID as its worker ID. Zero or
+// negative durations in cfg are replaced with defaults (10s poll interval,
+// 5m stale-job timeout, 5m job timeout); a negative poll interval would
+// otherwise make the loop poll without pausing.
+func New(logger *logging.Logger, q queue.Consumer, cfg Config) *Handler {
+	// Apply defaults
+	if cfg.PollInterval <= 0 {
+		cfg.PollInterval = defaultPollInterval
+	}
+	if cfg.StaleJobTimeout <= 0 {
+		cfg.StaleJobTimeout = defaultStaleJobTimeout
+	}
+	if cfg.JobTimeout <= 0 {
+		cfg.JobTimeout = defaultJobTimeout
+	}
+
+	return &Handler{
+		logger:   logger.WithComponent("handler"),
+		queue:    q,
+		handlers: make(map[string]queue.Handler),
+		config:   cfg,
+		workerID: uuid.New().String(),
+	}
+}
+
+// RegisterHandler registers a handler for a specific job type.
+// Call this before Run() to set up job processing. Registering the same
+// job type again replaces the previous handler.
+func (h *Handler) RegisterHandler(jobType string, handler queue.Handler) {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	h.handlers[jobType] = handler
+	h.logger.Info("registered job handler", "type", jobType)
+}
+
+// Run starts the worker loop and processes jobs until context is cancelled.
+// It blocks the calling goroutine. Between jobs it polls again immediately;
+// when the queue is empty it waits PollInterval, and after an unexpected
+// error it waits errorBackoff. Both waits end early when ctx is cancelled.
+func (h *Handler) Run(ctx context.Context) {
+	h.logger.Info("worker loop started", "worker_id", h.workerID)
+	defer h.logger.Info("worker loop stopping", "worker_id", h.workerID)
+
+	for ctx.Err() == nil {
+		err := h.processNextJob(ctx)
+		switch {
+		case err == nil:
+			// A job was processed; look for the next one right away.
+		case errors.Is(err, queue.ErrNoJob):
+			// Queue is empty, wait before polling again
+			sleepContext(ctx, h.config.PollInterval)
+		case ctx.Err() != nil:
+			// Shutdown interrupted the queue call; this is not a failure
+			// worth reporting, and the loop condition ends the loop.
+		default:
+			// Log error and continue after a brief pause
+			h.logger.Error("error processing job", "error", err)
+			sleepContext(ctx, errorBackoff)
+		}
+	}
+}
+
+// processNextJob dequeues and processes a single job.
+// It returns the Dequeue error unchanged (queue.ErrNoJob when the queue is
+// empty, judging by how Run treats it), otherwise the error of the Ack or
+// Fail call that records the outcome. A job whose type has no registered
+// handler is failed with an "unknown job type" message.
+func (h *Handler) processNextJob(ctx context.Context) error {
+	job, err := h.queue.Dequeue(ctx, h.workerID)
+	if err != nil {
+		return err
+	}
+
+	// Get handler for job type
+	h.mu.RLock()
+	handler, ok := h.handlers[job.Type]
+	h.mu.RUnlock()
+
+	if !ok {
+		h.logger.Error("no handler for job type", "job_id", job.ID, "type", job.Type)
+		finalizeCtx, cancel := finalizeContext(ctx)
+		defer cancel()
+		return h.queue.Fail(finalizeCtx, job.ID, fmt.Sprintf("unknown job type: %s", job.Type))
+	}
+
+	// Apply middleware and process. The parent context is passed through;
+	// TimeoutMiddleware is expected to apply the per-job deadline.
+	wrappedHandler := queue.Chain(
+		queue.RecoveryMiddleware(h.logger),
+		queue.LoggingMiddleware(h.logger),
+		queue.TimeoutMiddleware(h.config.JobTimeout),
+	)(handler)
+
+	runErr := wrappedHandler(ctx, job)
+
+	// Created only after the handler returns so the finalize timeout is not
+	// consumed by the job's own running time.
+	finalizeCtx, cancel := finalizeContext(ctx)
+	defer cancel()
+
+	if runErr != nil {
+		// Truncate error message to prevent log bloat and potential data leakage
+		errMsg := truncateErrorMessage(runErr.Error(), maxErrorMessageLen)
+		h.logger.Debug("job handler failed", "job_id", job.ID, "error", errMsg)
+		return h.queue.Fail(finalizeCtx, job.ID, errMsg)
+	}
+
+	return h.queue.Ack(finalizeCtx, job.ID)
+}
+
+// finalizeContext derives the context used to record a job's outcome. It
+// keeps the values of ctx but ignores its cancellation, so a job that ends
+// while the worker is shutting down is still acknowledged or failed instead
+// of being left for stale-job recovery; finalizeTimeout bounds the call.
+func finalizeContext(ctx context.Context) (context.Context, context.CancelFunc) {
+	return context.WithTimeout(context.WithoutCancel(ctx), finalizeTimeout)
+}
+
+// sleepContext blocks for d or until ctx is cancelled, whichever comes first.
+// It reports whether the full duration elapsed.
+func sleepContext(ctx context.Context, d time.Duration) bool {
+	timer := time.NewTimer(d)
+	defer timer.Stop()
+
+	select {
+	case <-ctx.Done():
+		return false
+	case <-timer.C:
+		return true
+	}
+}
+
+// WorkerID returns this handler's unique worker identifier.
+func (h *Handler) WorkerID() string {
+	return h.workerID
+}
+
+// truncateErrorMessage limits error message length to prevent log bloat.
+//
+// A msg of at most maxLen bytes is returned unchanged. A longer msg is cut so
+// that the result, including a trailing "..." marker, is at most maxLen bytes.
+// The cut never splits a multi-byte UTF-8 character, so a valid input stays
+// valid. When maxLen is too small to hold any content the result is the part
+// of the marker that fits, and the empty string for maxLen <= 0.
+func truncateErrorMessage(msg string, maxLen int) string {
+	const ellipsis = "..."
+
+	if len(msg) <= maxLen {
+		return msg
+	}
+	if maxLen <= 0 {
+		return ""
+	}
+	if maxLen <= len(ellipsis) {
+		return ellipsis[:maxLen]
+	}
+
+	cut := maxLen - len(ellipsis)
+	// Bytes of the form 10xxxxxx are UTF-8 continuation bytes: while the
+	// first dropped byte is one, the cut is inside a character, so back up
+	// to that character's first byte. cut < len(msg) here, so msg[cut] is
+	// always in range.
+	for cut > 0 && msg[cut]&0xC0 == 0x80 {
+		cut--
+	}
+	return msg[:cut] + ellipsis
+}