sp4-verify-1770325799/workers/worker-svc/cmd/worker/main.go
rdev-worker 36d73dd23d
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
build: /implement-feature mesh-interop --requirements 'Chat Service must cal...
2026-02-05 21:40:58 +00:00

169 lines
4.9 KiB
Go

// Package main is the entry point for the worker-svc worker.
package main
import (
"context"
"embed"
"os"
"os/signal"
"syscall"
"time"
"github.com/redis/go-redis/v9"
"git.threesix.ai/jordan/sp4-verify-1770325799/pkg/database"
"git.threesix.ai/jordan/sp4-verify-1770325799/pkg/logging"
"git.threesix.ai/jordan/sp4-verify-1770325799/pkg/queue"
"git.threesix.ai/jordan/sp4-verify-1770325799/workers/worker-svc/internal/config"
"git.threesix.ai/jordan/sp4-verify-1770325799/workers/worker-svc/internal/handlers"
)
// migrationsFS holds the SQL files matching migrations/*.sql, embedded into the
// binary at build time. main applies them only when the PostgreSQL queue
// backend is selected (i.e. when no Redis URL is configured).
//
//go:embed migrations/*.sql
var migrationsFS embed.FS
// main wires up configuration, logging, the job queue backend (Redis when
// REDIS_URL is set, otherwise PostgreSQL), and the job handlers. It then runs
// the worker until SIGINT/SIGTERM arrives and shuts down gracefully.
func main() {
	// Initialize logger first (with defaults) so we can log config errors
	logger := logging.New(logging.Config{
		Level:  logging.LevelInfo,
		Format: logging.FormatJSON,
	}).WithService("worker-svc")

	// Initialize configuration
	cfg, err := config.Load()
	if err != nil {
		logger.Error("failed to load config", "error", err)
		os.Exit(1)
	}

	// Reconfigure logger with loaded config
	logger = logging.New(logging.Config{
		Level:       logging.ParseLevel(cfg.Logging.Level),
		Format:      logging.ParseFormat(cfg.Logging.Format),
		Environment: cfg.AppConfig.Environment,
		AddSource:   cfg.AppConfig.IsDevelopment(),
	}).WithService("worker-svc")
	logger.Info("starting worker-svc worker")

	// ctx is canceled on shutdown; the worker and stale-job recovery observe it.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Determine queue backend based on configuration
	var jobQueue queue.Queue
	var redisClient *redis.Client
	if cfg.Redis.URL != "" {
		// Use Redis queue
		opts, err := redis.ParseURL(cfg.Redis.URL)
		if err != nil {
			logger.Error("failed to parse REDIS_URL", "error", err)
			os.Exit(1)
		}
		redisClient = redis.NewClient(opts)
		if err := redisClient.Ping(ctx).Err(); err != nil {
			logger.Error("failed to connect to Redis", "error", err)
			os.Exit(1)
		}
		defer redisClient.Close()
		jobQueue = queue.NewRedisQueue(redisClient, logger)
		// Log only the address: REDIS_URL may embed credentials
		// (redis://user:password@host), which must never reach the logs.
		logger.Info("using Redis queue", "addr", opts.Addr)
	} else {
		// Fall back to PostgreSQL queue
		pool, err := database.Connect(ctx, cfg.Database.URL, database.Options{
			MaxOpenConns:    cfg.Database.MaxOpenConns,
			MaxIdleConns:    cfg.Database.MaxIdleConns,
			ConnMaxLifetime: cfg.Database.ConnMaxLifetime,
		})
		if err != nil {
			logger.Error("failed to connect to database", "error", err)
			os.Exit(1)
		}
		defer pool.Close()
		// NOTE(review): confirm pool.URL is redacted; a raw DSN would leak
		// database credentials into the logs.
		logger.Info("connected to database", "url", pool.URL)

		// Run migrations
		database.MustRunMigrations(ctx, pool, migrationsFS, "migrations")
		logger.Info("migrations complete")

		jobQueue = queue.NewPostgresQueue(pool.DB, logger)
		logger.Info("using PostgreSQL queue")
	}

	// Initialize and start handler
	handler := handlers.New(logger, jobQueue, handlers.Config{
		PollInterval:    cfg.Worker.PollInterval,
		StaleJobTimeout: cfg.Worker.StaleJobTimeout,
		JobTimeout:      cfg.Worker.JobTimeout,
	})

	// Initialize task handlers
	taskHandlers := handlers.NewTaskHandlers(logger)

	// Register job handlers for tasks pushed by chat-svc and other services
	handler.RegisterHandler("process_chat_message", taskHandlers.ProcessChatMessage)
	handler.RegisterHandler("send_notification", taskHandlers.SendNotification)
	handler.RegisterHandler("sync_data", taskHandlers.SyncData)
	handler.RegisterHandler("process_webhook", taskHandlers.ProcessWebhook)

	// Setup signal handling
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigCh)

	// Each background goroutine closes its done channel on return, so shutdown
	// can wait for them instead of sleeping for a fixed time.
	// NOTE(review): this assumes handler.Run blocks until ctx is canceled.
	workerDone := make(chan struct{})
	go func() {
		defer close(workerDone)
		handler.Run(ctx)
	}()

	// Start stale job recovery in goroutine
	recoveryDone := make(chan struct{})
	go func() {
		defer close(recoveryDone)
		runStaleJobRecovery(ctx, jobQueue, cfg.Worker.StaleJobTimeout, logger)
	}()

	// Wait for shutdown signal
	sig := <-sigCh
	logger.Info("received shutdown signal", "signal", sig.String())

	// Trigger graceful shutdown with grace period
	logger.Info("initiating graceful shutdown")
	cancel()

	// Give in-flight jobs up to the grace period to notice cancellation and
	// finish. Return as soon as both goroutines stop; deferred Close calls then
	// run only after the workers are done with the connections.
	const shutdownGracePeriod = 5 * time.Second
	if !waitForWorkers(shutdownGracePeriod, workerDone, recoveryDone) {
		logger.Warn("shutdown grace period elapsed before workers stopped", "grace_period", shutdownGracePeriod)
	}
	logger.Info("worker-svc worker stopped")
}

// waitForWorkers blocks until every channel in done is closed or grace elapses,
// whichever happens first. It reports whether all channels closed in time.
func waitForWorkers(grace time.Duration, done ...<-chan struct{}) bool {
	timer := time.NewTimer(grace)
	defer timer.Stop()
	for _, ch := range done {
		select {
		case <-ch:
		case <-timer.C:
			return false
		}
	}
	return true
}
// StaleJobRequeuer is the optional capability a queue backend exposes when it
// can put long-running jobs back on the queue. runStaleJobRecovery detects it
// with a type assertion, so a backend opts in just by defining the method.
type StaleJobRequeuer interface {
	// RequeueStale requeues jobs that have been running longer than olderThan
	// and returns how many were requeued.
	RequeueStale(ctx context.Context, olderThan time.Duration) (int64, error)
}
// runStaleJobRecovery periodically requeues jobs that have been running longer
// than timeout. It returns immediately when q does not implement
// StaleJobRequeuer or when timeout is not positive; otherwise it checks once per
// minute (the first check fires one minute after start) and returns when ctx is
// canceled.
func runStaleJobRecovery(ctx context.Context, q queue.Queue, timeout time.Duration, logger *logging.Logger) {
	const staleCheckInterval = time.Minute

	// Check if queue supports stale job recovery before doing any setup.
	requeuer, ok := q.(StaleJobRequeuer)
	if !ok {
		logger.Warn("queue does not support stale job recovery")
		return
	}

	// With a non-positive threshold, every running job would presumably count
	// as stale and be requeued on each tick, causing duplicate processing.
	// Treat it as "recovery disabled" (e.g. an unset config value).
	if timeout <= 0 {
		logger.Warn("stale job recovery disabled: non-positive timeout", "timeout", timeout)
		return
	}

	ticker := time.NewTicker(staleCheckInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			count, err := requeuer.RequeueStale(ctx, timeout)
			switch {
			case err != nil && ctx.Err() != nil:
				// Shutdown interrupted an in-flight requeue; not a real failure.
				return
			case err != nil:
				logger.Error("failed to requeue stale jobs", "error", err)
			case count > 0:
				logger.Info("requeued stale jobs", "count", count)
			}
		}
	}
}