128 lines
3.7 KiB
Go
128 lines
3.7 KiB
Go
// Package main is the entry point for the worker-svc worker.
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"embed"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
"time"
|
|
|
|
"git.threesix.ai/jordan/sp4-test-1770498663/pkg/database"
|
|
"git.threesix.ai/jordan/sp4-test-1770498663/pkg/logging"
|
|
"git.threesix.ai/jordan/sp4-test-1770498663/pkg/queue"
|
|
"git.threesix.ai/jordan/sp4-test-1770498663/workers/worker-svc/internal/config"
|
|
"git.threesix.ai/jordan/sp4-test-1770498663/workers/worker-svc/internal/handlers"
|
|
)
|
|
|
|
//go:embed migrations/*.sql
|
|
var migrationsFS embed.FS
|
|
|
|
func main() {
|
|
// Initialize logger first (with defaults) so we can log config errors
|
|
logger := logging.New(logging.Config{
|
|
Level: logging.LevelInfo,
|
|
Format: logging.FormatJSON,
|
|
}).WithService("worker-svc")
|
|
|
|
// Initialize configuration
|
|
cfg, err := config.Load()
|
|
if err != nil {
|
|
logger.Error("failed to load config", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Reconfigure logger with loaded config
|
|
logger = logging.New(logging.Config{
|
|
Level: logging.ParseLevel(cfg.Logging.Level),
|
|
Format: logging.ParseFormat(cfg.Logging.Format),
|
|
Environment: cfg.AppConfig.Environment,
|
|
AddSource: cfg.AppConfig.IsDevelopment(),
|
|
}).WithService("worker-svc")
|
|
|
|
logger.Info("starting worker-svc worker")
|
|
|
|
// Setup graceful shutdown
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
// Connect to database
|
|
pool, err := database.Connect(ctx, cfg.Database.URL, database.Options{
|
|
MaxOpenConns: cfg.Database.MaxOpenConns,
|
|
MaxIdleConns: cfg.Database.MaxIdleConns,
|
|
ConnMaxLifetime: cfg.Database.ConnMaxLifetime,
|
|
})
|
|
if err != nil {
|
|
logger.Error("failed to connect to database", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
defer pool.Close()
|
|
logger.Info("connected to database", "url", pool.URL)
|
|
|
|
// Run migrations
|
|
database.MustRunMigrations(ctx, pool, migrationsFS, "migrations")
|
|
logger.Info("migrations complete")
|
|
|
|
// Initialize queue
|
|
jobQueue := queue.NewPostgresQueue(pool.DB, logger)
|
|
|
|
// Initialize and start handler
|
|
handler := handlers.New(logger, jobQueue, handlers.Config{
|
|
PollInterval: cfg.Worker.PollInterval,
|
|
StaleJobTimeout: cfg.Worker.StaleJobTimeout,
|
|
JobTimeout: cfg.Worker.JobTimeout,
|
|
})
|
|
|
|
// Register job handlers
|
|
// TODO: Register your job handlers here
|
|
// handler.RegisterHandler("send_email", emailHandler)
|
|
// handler.RegisterHandler("process_image", imageHandler)
|
|
|
|
// Setup signal handling
|
|
sigCh := make(chan os.Signal, 1)
|
|
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
|
|
|
|
// Start worker in goroutine
|
|
go handler.Run(ctx)
|
|
|
|
// Start stale job recovery in goroutine
|
|
go runStaleJobRecovery(ctx, jobQueue, cfg.Worker.StaleJobTimeout, logger)
|
|
|
|
// Wait for shutdown signal
|
|
sig := <-sigCh
|
|
logger.Info("received shutdown signal", "signal", sig.String())
|
|
|
|
// Trigger graceful shutdown with grace period
|
|
logger.Info("initiating graceful shutdown")
|
|
cancel()
|
|
|
|
// Give in-flight jobs time to complete (grace period)
|
|
// This allows handlers to notice context cancellation and finish cleanly.
|
|
const shutdownGracePeriod = 5 * time.Second
|
|
time.Sleep(shutdownGracePeriod)
|
|
|
|
logger.Info("worker-svc worker stopped")
|
|
}
|
|
|
|
// runStaleJobRecovery periodically requeues jobs that have been running too long.
|
|
func runStaleJobRecovery(ctx context.Context, q *queue.PostgresQueue, timeout time.Duration, logger *logging.Logger) {
|
|
const staleCheckInterval = time.Minute
|
|
ticker := time.NewTicker(staleCheckInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
count, err := q.RequeueStale(ctx, timeout)
|
|
if err != nil {
|
|
logger.Error("failed to requeue stale jobs", "error", err)
|
|
} else if count > 0 {
|
|
logger.Info("requeued stale jobs", "count", count)
|
|
}
|
|
}
|
|
}
|
|
}
|