Add worker component: background-processor
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
This commit is contained in:
parent
974769f86e
commit
8f798029f5
@ -36,6 +36,33 @@ steps:
|
||||
|
||||
# COMPONENT_STEPS_BELOW
|
||||
|
||||
# Woodpecker CI step for background-processor worker
|
||||
# Add this step to your .woodpecker.yml
|
||||
|
||||
build-background-processor:
|
||||
image: woodpeckerci/plugin-kaniko
|
||||
settings:
|
||||
registry: registry.threesix.ai
|
||||
repo: sp2-verify-1770320675/background-processor
|
||||
tags:
|
||||
- latest
|
||||
- ${CI_COMMIT_SHA:0:8}
|
||||
context: .
|
||||
dockerfile: workers/background-processor/Dockerfile
|
||||
cache: true
|
||||
skip-tls-verify: true
|
||||
when:
|
||||
branch: main
|
||||
event: push
|
||||
|
||||
deploy-background-processor:
|
||||
image: bitnami/kubectl:latest
|
||||
commands:
|
||||
- kubectl set image deployment/sp2-verify-1770320675-background-processor background-processor=registry.threesix.ai/sp2-verify-1770320675/background-processor:${CI_COMMIT_SHA:0:8} -n projects || echo "Deployment not found, skipping"
|
||||
when:
|
||||
branch: main
|
||||
event: push
|
||||
|
||||
# Woodpecker CI step for api service
|
||||
# Add this step to your .woodpecker.yml
|
||||
|
||||
|
||||
@ -79,4 +79,5 @@ sp2-verify-1770320675/
|
||||
| Component | Type | Path |
|
||||
|-----------|------|------|
|
||||
| **api** | API service | `services/api/` |
|
||||
| **background-processor** | Background worker | `workers/background-processor/` |
|
||||
|
||||
|
||||
1
Procfile
1
Procfile
@ -1,3 +1,4 @@
|
||||
# Local development processes
|
||||
# Components will be added below as they're created
|
||||
api: cd services/api && make run
|
||||
background-processor: cd workers/background-processor && make run
|
||||
|
||||
1
go.work
1
go.work
@ -2,4 +2,5 @@ go 1.23
|
||||
|
||||
use ./pkg
|
||||
use ./services/api
|
||||
use ./workers/background-processor
|
||||
// Component modules will be added below
|
||||
|
||||
23
workers/background-processor/.env.example
Normal file
23
workers/background-processor/.env.example
Normal file
@ -0,0 +1,23 @@
|
||||
# background-processor Worker Configuration
|
||||
|
||||
# App
|
||||
APP_NAME=background-processor
|
||||
APP_ENVIRONMENT=development
|
||||
APP_DEBUG=true
|
||||
|
||||
# Logging
|
||||
LOG_LEVEL=debug
|
||||
LOG_FORMAT=text
|
||||
|
||||
# Database (required for job queue)
|
||||
DATABASE_URL=postgres://dev:dev@localhost:5432/sp2-verify-1770320675?sslmode=disable
|
||||
|
||||
# Worker
|
||||
WORKER_POLL_INTERVAL=10s
|
||||
WORKER_BATCH_SIZE=10
|
||||
WORKER_MAX_RETRIES=3
|
||||
WORKER_STALE_JOB_TIMEOUT=5m
|
||||
WORKER_JOB_TIMEOUT=5m
|
||||
|
||||
# Redis (optional, for cache)
|
||||
# REDIS_URL=redis://localhost:6379/0
|
||||
31
workers/background-processor/Dockerfile
Normal file
31
workers/background-processor/Dockerfile
Normal file
@ -0,0 +1,31 @@
|
||||
# Build stage
|
||||
FROM golang:1.23-alpine AS builder
|
||||
|
||||
RUN apk add --no-cache git
|
||||
|
||||
# Configure Go workspace and private modules
|
||||
ENV GOPRIVATE=git.threesix.ai/*
|
||||
ENV GOWORK=/app/go.work
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Copy go workspace and all source (workspace deps are local)
|
||||
# Note: go.work.sum may not exist if no external dependencies have been synced yet
|
||||
COPY go.work ./
|
||||
COPY go.work.su[m] ./
|
||||
COPY pkg/ ./pkg/
|
||||
COPY workers/background-processor/ ./workers/background-processor/
|
||||
|
||||
# Build from workspace root
|
||||
RUN CGO_ENABLED=0 go build -o /background-processor ./workers/background-processor/cmd/worker
|
||||
|
||||
# Production stage
|
||||
FROM alpine:3.19
|
||||
|
||||
RUN apk add --no-cache ca-certificates tzdata
|
||||
|
||||
WORKDIR /
|
||||
|
||||
COPY --from=builder /background-processor /background-processor
|
||||
|
||||
ENTRYPOINT ["/background-processor"]
|
||||
34
workers/background-processor/Makefile
Normal file
34
workers/background-processor/Makefile
Normal file
@ -0,0 +1,34 @@
|
||||
.PHONY: build run test lint fmt docker-build clean
|
||||
|
||||
WORKER := background-processor
|
||||
BINARY := bin/$(WORKER)
|
||||
GO_MODULE := git.threesix.ai/jordan/sp2-verify-1770320675
|
||||
|
||||
# Build the worker binary
|
||||
build:
|
||||
go build -o $(BINARY) ./cmd/worker
|
||||
|
||||
# Run the worker locally
|
||||
run:
|
||||
go run ./cmd/worker
|
||||
|
||||
# Run tests
|
||||
test:
|
||||
go test -v ./...
|
||||
|
||||
# Run linter
|
||||
lint:
|
||||
golangci-lint run ./...
|
||||
|
||||
# Format code
|
||||
fmt:
|
||||
gofmt -w .
|
||||
goimports -w -local $(GO_MODULE) .
|
||||
|
||||
# Build Docker image (run from monorepo root)
|
||||
docker-build:
|
||||
docker build -t $(WORKER):latest -f Dockerfile ../..
|
||||
|
||||
# Clean build artifacts
|
||||
clean:
|
||||
rm -rf bin/
|
||||
128
workers/background-processor/cmd/worker/main.go
Normal file
128
workers/background-processor/cmd/worker/main.go
Normal file
@ -0,0 +1,128 @@
|
||||
// Package main is the entry point for the background-processor worker.
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"embed"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"git.threesix.ai/jordan/sp2-verify-1770320675/pkg/config"
|
||||
"git.threesix.ai/jordan/sp2-verify-1770320675/pkg/database"
|
||||
"git.threesix.ai/jordan/sp2-verify-1770320675/pkg/logging"
|
||||
"git.threesix.ai/jordan/sp2-verify-1770320675/pkg/queue"
|
||||
"git.threesix.ai/jordan/sp2-verify-1770320675/workers/background-processor/internal/handlers"
|
||||
workerconfig "git.threesix.ai/jordan/sp2-verify-1770320675/workers/background-processor/internal/config"
|
||||
)
|
||||
|
||||
//go:embed migrations/*.sql
|
||||
var migrationsFS embed.FS
|
||||
|
||||
func main() {
|
||||
// Initialize logger first (with defaults) so we can log config errors
|
||||
logger := logging.New(logging.Config{
|
||||
Level: logging.LevelInfo,
|
||||
Format: logging.FormatJSON,
|
||||
}).WithService("background-processor")
|
||||
|
||||
// Initialize configuration
|
||||
cfg, err := workerconfig.Load()
|
||||
if err != nil {
|
||||
logger.Error("failed to load config", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Reconfigure logger with loaded config
|
||||
logger = logging.New(logging.Config{
|
||||
Level: logging.ParseLevel(cfg.Logging.Level),
|
||||
Format: logging.ParseFormat(cfg.Logging.Format),
|
||||
Environment: cfg.AppConfig.Environment,
|
||||
AddSource: cfg.AppConfig.IsDevelopment(),
|
||||
}).WithService("background-processor")
|
||||
|
||||
logger.Info("starting background-processor worker")
|
||||
|
||||
// Setup graceful shutdown
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Connect to database
|
||||
pool, err := database.Connect(ctx, cfg.Database.URL, database.Options{
|
||||
MaxOpenConns: cfg.Database.MaxOpenConns,
|
||||
MaxIdleConns: cfg.Database.MaxIdleConns,
|
||||
ConnMaxLifetime: cfg.Database.ConnMaxLifetime,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error("failed to connect to database", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
defer pool.Close()
|
||||
logger.Info("connected to database", "url", pool.URL)
|
||||
|
||||
// Run migrations
|
||||
database.MustRunMigrations(ctx, pool, migrationsFS, "migrations")
|
||||
logger.Info("migrations complete")
|
||||
|
||||
// Initialize queue
|
||||
jobQueue := queue.NewPostgresQueue(pool.DB, logger)
|
||||
|
||||
// Initialize and start handler
|
||||
handler := handlers.New(logger, jobQueue, handlers.Config{
|
||||
PollInterval: cfg.Worker.PollInterval,
|
||||
StaleJobTimeout: cfg.Worker.StaleJobTimeout,
|
||||
JobTimeout: cfg.Worker.JobTimeout,
|
||||
})
|
||||
|
||||
// Register job handlers
|
||||
// TODO: Register your job handlers here
|
||||
// handler.RegisterHandler("send_email", emailHandler)
|
||||
// handler.RegisterHandler("process_image", imageHandler)
|
||||
|
||||
// Setup signal handling
|
||||
sigCh := make(chan os.Signal, 1)
|
||||
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
// Start worker in goroutine
|
||||
go handler.Run(ctx)
|
||||
|
||||
// Start stale job recovery in goroutine
|
||||
go runStaleJobRecovery(ctx, jobQueue, cfg.Worker.StaleJobTimeout, logger)
|
||||
|
||||
// Wait for shutdown signal
|
||||
sig := <-sigCh
|
||||
logger.Info("received shutdown signal", "signal", sig.String())
|
||||
|
||||
// Trigger graceful shutdown with grace period
|
||||
logger.Info("initiating graceful shutdown")
|
||||
cancel()
|
||||
|
||||
// Give in-flight jobs time to complete (grace period)
|
||||
// This allows handlers to notice context cancellation and finish cleanly.
|
||||
const shutdownGracePeriod = 5 * time.Second
|
||||
time.Sleep(shutdownGracePeriod)
|
||||
|
||||
logger.Info("background-processor worker stopped")
|
||||
}
|
||||
|
||||
// runStaleJobRecovery periodically requeues jobs that have been running too long.
|
||||
func runStaleJobRecovery(ctx context.Context, q *queue.PostgresQueue, timeout time.Duration, logger *logging.Logger) {
|
||||
const staleCheckInterval = time.Minute
|
||||
ticker := time.NewTicker(staleCheckInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
count, err := q.RequeueStale(ctx, timeout)
|
||||
if err != nil {
|
||||
logger.Error("failed to requeue stale jobs", "error", err)
|
||||
} else if count > 0 {
|
||||
logger.Info("requeued stale jobs", "count", count)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,32 @@
|
||||
-- Jobs queue table for async job processing.
|
||||
-- Used by pkg/queue for producer/consumer patterns.
|
||||
CREATE TABLE IF NOT EXISTS jobs (
|
||||
id UUID PRIMARY KEY,
|
||||
job_type VARCHAR(255) NOT NULL,
|
||||
payload JSONB NOT NULL DEFAULT '{}',
|
||||
status VARCHAR(50) NOT NULL DEFAULT 'pending',
|
||||
priority INT NOT NULL DEFAULT 0,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
started_at TIMESTAMPTZ,
|
||||
completed_at TIMESTAMPTZ,
|
||||
retry_count INT NOT NULL DEFAULT 0,
|
||||
max_retries INT NOT NULL DEFAULT 3,
|
||||
error TEXT,
|
||||
worker_id VARCHAR(255)
|
||||
);
|
||||
|
||||
-- Index for efficient dequeue: pending jobs ordered by priority (desc) and age (asc).
|
||||
-- Partial index only includes pending jobs for efficiency.
|
||||
CREATE INDEX IF NOT EXISTS idx_jobs_dequeue ON jobs (priority DESC, created_at ASC)
|
||||
WHERE status = 'pending';
|
||||
|
||||
-- Index for finding stale running jobs that need requeue.
|
||||
-- Used by RequeueStale to recover from crashed workers.
|
||||
CREATE INDEX IF NOT EXISTS idx_jobs_stale ON jobs (started_at)
|
||||
WHERE status = 'running';
|
||||
|
||||
-- Index for listing/filtering jobs by type.
|
||||
CREATE INDEX IF NOT EXISTS idx_jobs_type ON jobs (job_type, created_at DESC);
|
||||
|
||||
-- Index for listing jobs by status (useful for monitoring dashboards).
|
||||
CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs (status, created_at DESC);
|
||||
8
workers/background-processor/component.yaml
Normal file
8
workers/background-processor/component.yaml
Normal file
@ -0,0 +1,8 @@
|
||||
name: background-processor
|
||||
type: worker
|
||||
path: workers/background-processor
|
||||
dependencies: []
|
||||
# Add dependencies as needed:
|
||||
# - postgres
|
||||
# - redis
|
||||
# - rabbitmq
|
||||
11
workers/background-processor/go.mod
Normal file
11
workers/background-processor/go.mod
Normal file
@ -0,0 +1,11 @@
|
||||
module git.threesix.ai/jordan/sp2-verify-1770320675/workers/background-processor
|
||||
|
||||
go 1.23
|
||||
|
||||
require (
|
||||
git.threesix.ai/jordan/sp2-verify-1770320675/pkg v0.0.0
|
||||
github.com/google/uuid v1.6.0
|
||||
)
|
||||
|
||||
// Use local workspace modules (for Docker builds without go.work)
|
||||
replace git.threesix.ai/jordan/sp2-verify-1770320675/pkg => ../../pkg
|
||||
0
workers/background-processor/go.sum
Normal file
0
workers/background-processor/go.sum
Normal file
66
workers/background-processor/internal/config/config.go
Normal file
66
workers/background-processor/internal/config/config.go
Normal file
@ -0,0 +1,66 @@
|
||||
// Package config provides worker-specific configuration.
|
||||
package config
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/spf13/viper"
|
||||
|
||||
"git.threesix.ai/jordan/sp2-verify-1770320675/pkg/config"
|
||||
)
|
||||
|
||||
// Config holds background-processor worker configuration.
|
||||
type Config struct {
|
||||
config.AppConfig
|
||||
Database config.DatabaseConfig
|
||||
Logging config.LoggingConfig
|
||||
Worker WorkerConfig
|
||||
}
|
||||
|
||||
// WorkerConfig holds worker-specific settings.
|
||||
type WorkerConfig struct {
|
||||
// PollInterval is how often to check for new jobs when queue is empty.
|
||||
PollInterval time.Duration
|
||||
|
||||
// BatchSize is the max number of jobs to process per poll (for batch workers).
|
||||
BatchSize int
|
||||
|
||||
// MaxRetries is the default maximum retry attempts for failed jobs.
|
||||
MaxRetries int
|
||||
|
||||
// StaleJobTimeout is how long a job can run before being considered stale.
|
||||
// Jobs running longer than this without heartbeat will be requeued.
|
||||
StaleJobTimeout time.Duration
|
||||
|
||||
// JobTimeout is the maximum time a single job handler can run.
|
||||
JobTimeout time.Duration
|
||||
}
|
||||
|
||||
// Load reads configuration from environment variables.
|
||||
func Load() (*Config, error) {
|
||||
if err := config.Init(config.Options{
|
||||
AppName: "background-processor",
|
||||
SetDefaults: func() {
|
||||
viper.SetDefault("WORKER_POLL_INTERVAL", "10s")
|
||||
viper.SetDefault("WORKER_BATCH_SIZE", 10)
|
||||
viper.SetDefault("WORKER_MAX_RETRIES", 3)
|
||||
viper.SetDefault("WORKER_STALE_JOB_TIMEOUT", "5m")
|
||||
viper.SetDefault("WORKER_JOB_TIMEOUT", "5m")
|
||||
},
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Config{
|
||||
AppConfig: config.ReadAppConfig(),
|
||||
Database: config.ReadDatabaseConfig(),
|
||||
Logging: config.ReadLoggingConfig(),
|
||||
Worker: WorkerConfig{
|
||||
PollInterval: viper.GetDuration("WORKER_POLL_INTERVAL"),
|
||||
BatchSize: viper.GetInt("WORKER_BATCH_SIZE"),
|
||||
MaxRetries: viper.GetInt("WORKER_MAX_RETRIES"),
|
||||
StaleJobTimeout: viper.GetDuration("WORKER_STALE_JOB_TIMEOUT"),
|
||||
JobTimeout: viper.GetDuration("WORKER_JOB_TIMEOUT"),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
147
workers/background-processor/internal/handlers/handler.go
Normal file
147
workers/background-processor/internal/handlers/handler.go
Normal file
@ -0,0 +1,147 @@
|
||||
// Package handlers provides the worker's job processing logic.
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
|
||||
"git.threesix.ai/jordan/sp2-verify-1770320675/pkg/logging"
|
||||
"git.threesix.ai/jordan/sp2-verify-1770320675/pkg/queue"
|
||||
)
|
||||
|
||||
// Config holds handler configuration.
|
||||
type Config struct {
|
||||
// PollInterval is how often to check for new jobs when queue is empty.
|
||||
PollInterval time.Duration
|
||||
|
||||
// StaleJobTimeout is how long a job can run before being considered stale.
|
||||
StaleJobTimeout time.Duration
|
||||
|
||||
// JobTimeout is the maximum time a job handler can run.
|
||||
JobTimeout time.Duration
|
||||
}
|
||||
|
||||
// Handler processes background jobs from the queue.
|
||||
type Handler struct {
|
||||
logger *logging.Logger
|
||||
queue queue.Consumer
|
||||
handlers map[string]queue.Handler
|
||||
config Config
|
||||
workerID string
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
// New creates a new Handler.
|
||||
func New(logger *logging.Logger, q queue.Consumer, cfg Config) *Handler {
|
||||
// Apply defaults
|
||||
if cfg.PollInterval == 0 {
|
||||
cfg.PollInterval = 10 * time.Second
|
||||
}
|
||||
if cfg.StaleJobTimeout == 0 {
|
||||
cfg.StaleJobTimeout = 5 * time.Minute
|
||||
}
|
||||
if cfg.JobTimeout == 0 {
|
||||
cfg.JobTimeout = 5 * time.Minute
|
||||
}
|
||||
|
||||
return &Handler{
|
||||
logger: logger.WithComponent("handler"),
|
||||
queue: q,
|
||||
handlers: make(map[string]queue.Handler),
|
||||
config: cfg,
|
||||
workerID: uuid.New().String(),
|
||||
}
|
||||
}
|
||||
|
||||
// RegisterHandler registers a handler for a specific job type.
|
||||
// Call this before Run() to set up job processing.
|
||||
func (h *Handler) RegisterHandler(jobType string, handler queue.Handler) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
h.handlers[jobType] = handler
|
||||
h.logger.Info("registered job handler", "type", jobType)
|
||||
}
|
||||
|
||||
// Run starts the worker loop and processes jobs until context is cancelled.
|
||||
func (h *Handler) Run(ctx context.Context) {
|
||||
h.logger.Info("worker loop started", "worker_id", h.workerID)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
h.logger.Info("worker loop stopping", "worker_id", h.workerID)
|
||||
return
|
||||
default:
|
||||
if err := h.processNextJob(ctx); err != nil {
|
||||
if errors.Is(err, queue.ErrNoJob) {
|
||||
// Queue is empty, wait before polling again
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(h.config.PollInterval):
|
||||
continue
|
||||
}
|
||||
}
|
||||
// Log error and continue
|
||||
h.logger.Error("error processing job", "error", err)
|
||||
time.Sleep(time.Second) // Brief pause on error
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// processNextJob dequeues and processes a single job.
|
||||
func (h *Handler) processNextJob(ctx context.Context) error {
|
||||
job, err := h.queue.Dequeue(ctx, h.workerID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Get handler for job type
|
||||
h.mu.RLock()
|
||||
handler, ok := h.handlers[job.Type]
|
||||
h.mu.RUnlock()
|
||||
|
||||
if !ok {
|
||||
h.logger.Error("no handler for job type", "job_id", job.ID, "type", job.Type)
|
||||
return h.queue.Fail(ctx, job.ID, fmt.Sprintf("unknown job type: %s", job.Type))
|
||||
}
|
||||
|
||||
// Apply middleware and process (TimeoutMiddleware handles the deadline)
|
||||
wrappedHandler := queue.Chain(
|
||||
queue.RecoveryMiddleware(h.logger),
|
||||
queue.LoggingMiddleware(h.logger),
|
||||
queue.TimeoutMiddleware(h.config.JobTimeout),
|
||||
)(handler)
|
||||
|
||||
// Use parent context - TimeoutMiddleware applies the job timeout
|
||||
jobCtx := ctx
|
||||
_ = jobCtx // jobCtx used below
|
||||
|
||||
if err := wrappedHandler(jobCtx, job); err != nil {
|
||||
// Truncate error message to prevent log bloat and potential data leakage
|
||||
errMsg := truncateErrorMessage(err.Error(), 1000)
|
||||
h.logger.Debug("job handler failed", "job_id", job.ID, "error", errMsg)
|
||||
return h.queue.Fail(ctx, job.ID, errMsg)
|
||||
}
|
||||
|
||||
return h.queue.Ack(ctx, job.ID)
|
||||
}
|
||||
|
||||
// WorkerID returns this handler's unique worker identifier.
|
||||
func (h *Handler) WorkerID() string {
|
||||
return h.workerID
|
||||
}
|
||||
|
||||
// truncateErrorMessage limits error message length to prevent log bloat.
|
||||
func truncateErrorMessage(msg string, maxLen int) string {
|
||||
if len(msg) <= maxLen {
|
||||
return msg
|
||||
}
|
||||
return msg[:maxLen-3] + "..."
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user