// Package main is the entry point for the background-processor worker. package main import ( "context" "embed" "os" "os/signal" "syscall" "time" "git.threesix.ai/jordan/slack-q-1770281596/pkg/config" "git.threesix.ai/jordan/slack-q-1770281596/pkg/database" "git.threesix.ai/jordan/slack-q-1770281596/pkg/logging" "git.threesix.ai/jordan/slack-q-1770281596/pkg/queue" "git.threesix.ai/jordan/slack-q-1770281596/workers/background-processor/internal/handlers" workerconfig "git.threesix.ai/jordan/slack-q-1770281596/workers/background-processor/internal/config" ) //go:embed migrations/*.sql var migrationsFS embed.FS func main() { // Initialize logger first (with defaults) so we can log config errors logger := logging.New(logging.Config{ Level: logging.LevelInfo, Format: logging.FormatJSON, }).WithService("background-processor") // Initialize configuration cfg, err := workerconfig.Load() if err != nil { logger.Error("failed to load config", "error", err) os.Exit(1) } // Reconfigure logger with loaded config logger = logging.New(logging.Config{ Level: logging.ParseLevel(cfg.Logging.Level), Format: logging.ParseFormat(cfg.Logging.Format), Environment: cfg.AppConfig.Environment, AddSource: cfg.AppConfig.IsDevelopment(), }).WithService("background-processor") logger.Info("starting background-processor worker") // Setup graceful shutdown ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Connect to database pool, err := database.Connect(ctx, cfg.Database.URL, database.Options{ MaxOpenConns: cfg.Database.MaxOpenConns, MaxIdleConns: cfg.Database.MaxIdleConns, ConnMaxLifetime: cfg.Database.ConnMaxLifetime, }) if err != nil { logger.Error("failed to connect to database", "error", err) os.Exit(1) } defer pool.Close() logger.Info("connected to database", "url", pool.URL) // Run migrations database.MustRunMigrations(ctx, pool, migrationsFS, "migrations") logger.Info("migrations complete") // Initialize queue jobQueue := queue.NewPostgresQueue(pool.DB, logger) // Initialize and start handler handler := handlers.New(logger, jobQueue, handlers.Config{ PollInterval: cfg.Worker.PollInterval, StaleJobTimeout: cfg.Worker.StaleJobTimeout, JobTimeout: cfg.Worker.JobTimeout, }) // Register job handlers // TODO: Register your job handlers here // handler.RegisterHandler("send_email", emailHandler) // handler.RegisterHandler("process_image", imageHandler) // Setup signal handling sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) // Start worker in goroutine go handler.Run(ctx) // Start stale job recovery in goroutine go runStaleJobRecovery(ctx, jobQueue, cfg.Worker.StaleJobTimeout, logger) // Wait for shutdown signal sig := <-sigCh logger.Info("received shutdown signal", "signal", sig.String()) // Trigger graceful shutdown with grace period logger.Info("initiating graceful shutdown") cancel() // Give in-flight jobs time to complete (grace period) // This allows handlers to notice context cancellation and finish cleanly. const shutdownGracePeriod = 5 * time.Second time.Sleep(shutdownGracePeriod) logger.Info("background-processor worker stopped") } // runStaleJobRecovery periodically requeues jobs that have been running too long. func runStaleJobRecovery(ctx context.Context, q *queue.PostgresQueue, timeout time.Duration, logger *logging.Logger) { const staleCheckInterval = time.Minute ticker := time.NewTicker(staleCheckInterval) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: count, err := q.RequeueStale(ctx, timeout) if err != nil { logger.Error("failed to requeue stale jobs", "error", err) } else if count > 0 { logger.Info("requeued stale jobs", "count", count) } } } }