sp4-fresh/workers/worker-svc/cmd/worker/main.go
jordan 2d62a0683b
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
Add components: service/auth-svc, service/chat-svc, worker/worker-svc
2026-02-06 23:25:39 +00:00

128 lines
3.6 KiB
Go

// Package main is the entry point for the worker-svc worker.
package main
import (
"context"
"embed"
"os"
"os/signal"
"syscall"
"time"
"git.threesix.ai/jordan/sp4-fresh/pkg/database"
"git.threesix.ai/jordan/sp4-fresh/pkg/logging"
"git.threesix.ai/jordan/sp4-fresh/pkg/queue"
"git.threesix.ai/jordan/sp4-fresh/workers/worker-svc/internal/config"
"git.threesix.ai/jordan/sp4-fresh/workers/worker-svc/internal/handlers"
)
//go:embed migrations/*.sql
var migrationsFS embed.FS
func main() {
// Initialize logger first (with defaults) so we can log config errors
logger := logging.New(logging.Config{
Level: logging.LevelInfo,
Format: logging.FormatJSON,
}).WithService("worker-svc")
// Initialize configuration
cfg, err := config.Load()
if err != nil {
logger.Error("failed to load config", "error", err)
os.Exit(1)
}
// Reconfigure logger with loaded config
logger = logging.New(logging.Config{
Level: logging.ParseLevel(cfg.Logging.Level),
Format: logging.ParseFormat(cfg.Logging.Format),
Environment: cfg.AppConfig.Environment,
AddSource: cfg.AppConfig.IsDevelopment(),
}).WithService("worker-svc")
logger.Info("starting worker-svc worker")
// Setup graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Connect to database
pool, err := database.Connect(ctx, cfg.Database.URL, database.Options{
MaxOpenConns: cfg.Database.MaxOpenConns,
MaxIdleConns: cfg.Database.MaxIdleConns,
ConnMaxLifetime: cfg.Database.ConnMaxLifetime,
})
if err != nil {
logger.Error("failed to connect to database", "error", err)
os.Exit(1)
}
defer pool.Close()
logger.Info("connected to database", "url", pool.URL)
// Run migrations
database.MustRunMigrations(ctx, pool, migrationsFS, "migrations")
logger.Info("migrations complete")
// Initialize queue
jobQueue := queue.NewPostgresQueue(pool.DB, logger)
// Initialize and start handler
handler := handlers.New(logger, jobQueue, handlers.Config{
PollInterval: cfg.Worker.PollInterval,
StaleJobTimeout: cfg.Worker.StaleJobTimeout,
JobTimeout: cfg.Worker.JobTimeout,
})
// Register job handlers
// TODO: Register your job handlers here
// handler.RegisterHandler("send_email", emailHandler)
// handler.RegisterHandler("process_image", imageHandler)
// Setup signal handling
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
// Start worker in goroutine
go handler.Run(ctx)
// Start stale job recovery in goroutine
go runStaleJobRecovery(ctx, jobQueue, cfg.Worker.StaleJobTimeout, logger)
// Wait for shutdown signal
sig := <-sigCh
logger.Info("received shutdown signal", "signal", sig.String())
// Trigger graceful shutdown with grace period
logger.Info("initiating graceful shutdown")
cancel()
// Give in-flight jobs time to complete (grace period)
// This allows handlers to notice context cancellation and finish cleanly.
const shutdownGracePeriod = 5 * time.Second
time.Sleep(shutdownGracePeriod)
logger.Info("worker-svc worker stopped")
}
// runStaleJobRecovery periodically requeues jobs that have been running too long.
func runStaleJobRecovery(ctx context.Context, q *queue.PostgresQueue, timeout time.Duration, logger *logging.Logger) {
const staleCheckInterval = time.Minute
ticker := time.NewTicker(staleCheckInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
count, err := q.RequeueStale(ctx, timeout)
if err != nil {
logger.Error("failed to requeue stale jobs", "error", err)
} else if count > 0 {
logger.Info("requeued stale jobs", "count", count)
}
}
}
}