package worker

import (
	"context"
	"sync"
	"time"

	"github.com/orchard9/rdev/internal/domain"
	"github.com/orchard9/rdev/internal/logging"
	"github.com/orchard9/rdev/internal/metrics"
	"github.com/orchard9/rdev/internal/port"
)

// QueueMaintenance runs periodic maintenance tasks on the work queue
// and worker registry: stale task recovery, stale worker marking,
// old task cleanup, and queue metrics refresh.
//
// Build one with NewQueueMaintenance, launch its two background loops with
// Start, and shut them down with Stop.
type QueueMaintenance struct {
	queue      port.WorkQueue      // Work queue that is requeued, cleaned up, and sampled for stats
	registry   port.WorkerRegistry // Worker registry used for stale marking and worker listing
	buildAudit port.BuildAudit     // Optional: syncs build audit when requeuing stale tasks; nil disables the sync

	// Intervals
	staleTaskTimeout   time.Duration // Passed to the queue when requeuing stale tasks
	staleWorkerTimeout time.Duration // Passed to the registry when marking stale workers offline
	cleanupAge         time.Duration // Passed to the queue when cleaning up old tasks
	maintenancePeriod  time.Duration // Ticker interval of the maintenance loop
	metricsPeriod      time.Duration // Ticker interval of the metrics loop

	ctx    context.Context    // Lifetime context of the loops; created in NewQueueMaintenance
	cancel context.CancelFunc // Cancels ctx; called by Stop
	wg     sync.WaitGroup     // Tracks the loop goroutines so Stop can wait for them
}

// QueueMaintenanceConfig holds configuration for queue maintenance.
type QueueMaintenanceConfig struct {
	// StaleTaskTimeout is how long a running task can be silent before requeue.
	// Default: 30 minutes.
	StaleTaskTimeout time.Duration

	// StaleWorkerTimeout is how long without heartbeat before marking offline.
	// Default: 2 minutes.
	StaleWorkerTimeout time.Duration

	// CleanupAge is the minimum age for completed/failed/cancelled tasks to be cleaned up.
	// Default: 7 days.
	CleanupAge time.Duration

	// MaintenancePeriod is how often to run maintenance tasks.
	// Default: 1 minute.
	MaintenancePeriod time.Duration

	// MetricsPeriod is how often to refresh queue depth metrics.
	// Default: 15 seconds.
	MetricsPeriod time.Duration

	// BuildAudit syncs build audit status when requeuing stale tasks.
	// If nil, build audit is not updated (legacy behavior).
	BuildAudit port.BuildAudit
}

// DefaultQueueMaintenanceConfig returns a config whose durations are set to
// the defaults documented on QueueMaintenanceConfig; BuildAudit is left nil.
func DefaultQueueMaintenanceConfig() *QueueMaintenanceConfig {
	return &QueueMaintenanceConfig{
		StaleTaskTimeout:   30 * time.Minute,
		StaleWorkerTimeout: 2 * time.Minute,
		CleanupAge:         7 * 24 * time.Hour,
		MaintenancePeriod:  1 * time.Minute,
		MetricsPeriod:      15 * time.Second,
	}
}

// NewQueueMaintenance creates a new queue maintenance worker.
//
// A nil cfg selects DefaultQueueMaintenanceConfig. Any duration in cfg that
// is zero or negative is replaced by the corresponding default, so a
// partially populated config (for example one that only sets BuildAudit)
// is safe to pass. cfg itself is never modified.
//
// The returned worker owns a cancellable context derived from
// context.Background; nothing runs until Start is called.
func NewQueueMaintenance(
	queue port.WorkQueue,
	registry port.WorkerRegistry,
	cfg *QueueMaintenanceConfig,
) *QueueMaintenance {
	defaults := DefaultQueueMaintenanceConfig()
	if cfg == nil {
		cfg = defaults
	}

	// Non-positive durations are never usable here: time.NewTicker panics on
	// a non-positive period, and a non-positive timeout/age would treat every
	// running task as stale and every finished task as old enough to delete.
	orDefault := func(value, fallback time.Duration) time.Duration {
		if value <= 0 {
			return fallback
		}
		return value
	}

	ctx, cancel := context.WithCancel(context.Background())

	return &QueueMaintenance{
		queue:              queue,
		registry:           registry,
		buildAudit:         cfg.BuildAudit,
		staleTaskTimeout:   orDefault(cfg.StaleTaskTimeout, defaults.StaleTaskTimeout),
		staleWorkerTimeout: orDefault(cfg.StaleWorkerTimeout, defaults.StaleWorkerTimeout),
		cleanupAge:         orDefault(cfg.CleanupAge, defaults.CleanupAge),
		maintenancePeriod:  orDefault(cfg.MaintenancePeriod, defaults.MaintenancePeriod),
		metricsPeriod:      orDefault(cfg.MetricsPeriod, defaults.MetricsPeriod),
		ctx:                ctx,
		cancel:             cancel,
	}
}

// Start begins the maintenance and metrics loops.
//
// It logs the effective settings and launches one goroutine per loop, then
// returns immediately. There is no guard against repeated calls: each call
// starts another pair of loops.
func (m *QueueMaintenance) Start() {
	log := logging.FromContext(m.ctx).WithWorker("queue-maintenance")
	log.Info("queue maintenance started",
		"maintenance_period", m.maintenancePeriod,
		"metrics_period", m.metricsPeriod,
		"stale_task_timeout", m.staleTaskTimeout,
		"stale_worker_timeout", m.staleWorkerTimeout,
		"cleanup_age", m.cleanupAge,
	)

	// Register both goroutines before launching them so a concurrent Stop
	// cannot observe a zero wait-group counter in between.
	m.wg.Add(2)
	go m.maintenanceLoop()
	go m.metricsLoop()
}

// Stop gracefully shuts down the maintenance worker.
//
// It cancels the worker's context and blocks until both loops have returned.
func (m *QueueMaintenance) Stop() {
	log := logging.FromContext(m.ctx).WithWorker("queue-maintenance")
	log.Info("queue maintenance stopping")

	m.cancel()
	m.wg.Wait()

	log.Info("queue maintenance stopped")
}

// maintenanceLoop runs periodic maintenance: stale recovery, worker health, cleanup.
func (m *QueueMaintenance) maintenanceLoop() { defer m.wg.Done() // Run immediately on start m.runMaintenance() ticker := time.NewTicker(m.maintenancePeriod) defer ticker.Stop() for { select { case <-m.ctx.Done(): return case <-ticker.C: m.runMaintenance() } } } // metricsLoop refreshes queue depth metrics on a faster cadence. func (m *QueueMaintenance) metricsLoop() { defer m.wg.Done() // Run immediately on start m.refreshMetrics() ticker := time.NewTicker(m.metricsPeriod) defer ticker.Stop() for { select { case <-m.ctx.Done(): return case <-ticker.C: m.refreshMetrics() } } } // runMaintenance executes all maintenance tasks. func (m *QueueMaintenance) runMaintenance() { ctx, cancel := context.WithTimeout(m.ctx, TimeoutMaintenance) defer cancel() m.requeueStaleTasks(ctx) m.markStaleWorkers(ctx) m.cleanupOldTasks(ctx) } // requeueStaleTasks requeues tasks that have been running too long // (the worker likely crashed without reporting). // Also syncs build audit to pending status if configured. func (m *QueueMaintenance) requeueStaleTasks(ctx context.Context) { log := logging.FromContext(ctx).WithWorker("queue-maintenance") // Use RequeueStaleWithIDs to get task IDs for build audit sync taskIDs, err := m.queue.RequeueStaleWithIDs(ctx, m.staleTaskTimeout) if err != nil { log.Warn("failed to requeue stale tasks", logging.FieldError, err) return } if len(taskIDs) == 0 { return } log.Info("requeued stale tasks", "count", len(taskIDs), "task_ids", taskIDs) // Sync build audit status if configured if m.buildAudit != nil { for _, taskID := range taskIDs { // Update build audit to pending (worker assignment cleared) if err := m.buildAudit.UpdateStatus(ctx, taskID, domain.BuildStatusPending, ""); err != nil { log.Warn("failed to sync build audit for requeued task", "task_id", taskID, logging.FieldError, err, ) } } } } // markStaleWorkers marks workers without recent heartbeats as offline. 
func (m *QueueMaintenance) markStaleWorkers(ctx context.Context) { log := logging.FromContext(ctx).WithWorker("queue-maintenance") count, err := m.registry.MarkStaleOffline(ctx, m.staleWorkerTimeout) if err != nil { log.Warn("failed to mark stale workers offline", logging.FieldError, err) return } if count > 0 { log.Info("marked stale workers offline", "count", count) } } // cleanupOldTasks removes completed/failed/cancelled tasks older than cleanup age. func (m *QueueMaintenance) cleanupOldTasks(ctx context.Context) { log := logging.FromContext(ctx).WithWorker("queue-maintenance") count, err := m.queue.CleanupOld(ctx, m.cleanupAge) if err != nil { log.Warn("failed to cleanup old tasks", logging.FieldError, err) return } if count > 0 { log.Info("cleaned up old tasks", "count", count) } } // refreshMetrics fetches queue stats and updates Prometheus gauges. func (m *QueueMaintenance) refreshMetrics() { ctx, cancel := context.WithTimeout(m.ctx, TimeoutQuickOp) defer cancel() log := logging.FromContext(ctx).WithWorker("queue-maintenance") stats, err := m.queue.GetStats(ctx) if err != nil { log.Warn("failed to get queue stats for metrics", logging.FieldError, err) return } metrics.SetWorkQueueDepth("pending", stats.Pending) metrics.SetWorkQueueDepth("running", stats.Running) metrics.SetWorkQueueDepth("completed", stats.Completed) metrics.SetWorkQueueDepth("failed", stats.Failed) metrics.SetWorkQueueDepth("cancelled", stats.Cancelled) // Worker counts workers, err := m.registry.List(ctx, port.WorkerFilter{}) if err != nil { log.Warn("failed to list workers for metrics", logging.FieldError, err) return } counts := map[string]int{ "idle": 0, "busy": 0, "draining": 0, "offline": 0, } for _, w := range workers { counts[string(w.Status)]++ age := time.Since(w.LastHeartbeat).Seconds() metrics.RecordWorkerHeartbeat(w.ID, age) } for status, count := range counts { metrics.SetWorkerCount(status, count) } }