// Package main is the entry point for the worker-svc worker. package main import ( "context" "embed" "os" "os/signal" "syscall" "time" "github.com/redis/go-redis/v9" "git.threesix.ai/jordan/sp4-verify-1770325799/pkg/database" "git.threesix.ai/jordan/sp4-verify-1770325799/pkg/logging" "git.threesix.ai/jordan/sp4-verify-1770325799/pkg/queue" "git.threesix.ai/jordan/sp4-verify-1770325799/workers/worker-svc/internal/config" "git.threesix.ai/jordan/sp4-verify-1770325799/workers/worker-svc/internal/handlers" ) //go:embed migrations/*.sql var migrationsFS embed.FS func main() { // Initialize logger first (with defaults) so we can log config errors logger := logging.New(logging.Config{ Level: logging.LevelInfo, Format: logging.FormatJSON, }).WithService("worker-svc") // Initialize configuration cfg, err := config.Load() if err != nil { logger.Error("failed to load config", "error", err) os.Exit(1) } // Reconfigure logger with loaded config logger = logging.New(logging.Config{ Level: logging.ParseLevel(cfg.Logging.Level), Format: logging.ParseFormat(cfg.Logging.Format), Environment: cfg.AppConfig.Environment, AddSource: cfg.AppConfig.IsDevelopment(), }).WithService("worker-svc") logger.Info("starting worker-svc worker") // Setup graceful shutdown ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Determine queue backend based on configuration var jobQueue queue.Queue var redisClient *redis.Client if cfg.Redis.URL != "" { // Use Redis queue opts, err := redis.ParseURL(cfg.Redis.URL) if err != nil { logger.Error("failed to parse REDIS_URL", "error", err) os.Exit(1) } redisClient = redis.NewClient(opts) if err := redisClient.Ping(ctx).Err(); err != nil { logger.Error("failed to connect to Redis", "error", err) os.Exit(1) } defer redisClient.Close() jobQueue = queue.NewRedisQueue(redisClient, logger) logger.Info("using Redis queue", "url", cfg.Redis.URL) } else { // Fall back to PostgreSQL queue pool, err := database.Connect(ctx, cfg.Database.URL, database.Options{ MaxOpenConns: cfg.Database.MaxOpenConns, MaxIdleConns: cfg.Database.MaxIdleConns, ConnMaxLifetime: cfg.Database.ConnMaxLifetime, }) if err != nil { logger.Error("failed to connect to database", "error", err) os.Exit(1) } defer pool.Close() logger.Info("connected to database", "url", pool.URL) // Run migrations database.MustRunMigrations(ctx, pool, migrationsFS, "migrations") logger.Info("migrations complete") jobQueue = queue.NewPostgresQueue(pool.DB, logger) logger.Info("using PostgreSQL queue") } // Initialize and start handler handler := handlers.New(logger, jobQueue, handlers.Config{ PollInterval: cfg.Worker.PollInterval, StaleJobTimeout: cfg.Worker.StaleJobTimeout, JobTimeout: cfg.Worker.JobTimeout, }) // Initialize task handlers taskHandlers := handlers.NewTaskHandlers(logger) // Register job handlers for tasks pushed by chat-svc and other services handler.RegisterHandler("process_chat_message", taskHandlers.ProcessChatMessage) handler.RegisterHandler("send_notification", taskHandlers.SendNotification) handler.RegisterHandler("sync_data", taskHandlers.SyncData) handler.RegisterHandler("process_webhook", taskHandlers.ProcessWebhook) // Setup signal handling sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) // Start worker in goroutine go handler.Run(ctx) // Start stale job recovery in goroutine go runStaleJobRecovery(ctx, jobQueue, cfg.Worker.StaleJobTimeout, logger) // Wait for shutdown signal sig := <-sigCh logger.Info("received shutdown signal", "signal", sig.String()) // Trigger graceful shutdown with grace period logger.Info("initiating graceful shutdown") cancel() // Give in-flight jobs time to complete (grace period) // This allows handlers to notice context cancellation and finish cleanly. const shutdownGracePeriod = 5 * time.Second time.Sleep(shutdownGracePeriod) logger.Info("worker-svc worker stopped") } // StaleJobRequeuer is an interface for queues that support stale job recovery. type StaleJobRequeuer interface { RequeueStale(ctx context.Context, timeout time.Duration) (int64, error) } // runStaleJobRecovery periodically requeues jobs that have been running too long. func runStaleJobRecovery(ctx context.Context, q queue.Queue, timeout time.Duration, logger *logging.Logger) { const staleCheckInterval = time.Minute ticker := time.NewTicker(staleCheckInterval) defer ticker.Stop() // Check if queue supports stale job recovery requeuer, ok := q.(StaleJobRequeuer) if !ok { logger.Warn("queue does not support stale job recovery") return } for { select { case <-ctx.Done(): return case <-ticker.C: count, err := requeuer.RequeueStale(ctx, timeout) if err != nil { logger.Error("failed to requeue stale jobs", "error", err) } else if count > 0 { logger.Info("requeued stale jobs", "count", count) } } } }