rdev/internal/worker/queue_maintenance.go
jordan 53862c773b fix: resolve systemic debt in worker and skeleton templates
Worker template fixes:
- Replace panic() with logger.Error() + os.Exit(1) for config errors
- Remove double-timeout application (context + middleware)
- Add error message truncation to prevent log bloat
- Use named constants for shutdown grace period and stale check interval

Skeleton pkg/auth fixes:
- Fix error wrapping to use %w consistently in jwt.go
- Add GetUserOrError() as safe alternative to MustGetUser() panic

Skeleton pkg/queue fixes:
- Check RowsAffected() errors instead of ignoring them
- Add input validation to EnqueueWithOptions (require job type, cap retries)
- Add log truncation for error messages
- Fix inaccurate doc comment claiming exponential backoff

Worker timeout consolidation:
- Add internal/worker/timeouts.go with named constants
- Migrate all workers to use timeout constants

Cleanup:
- Remove obsolete slack-preparation-thoughts.md files

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 23:44:55 -07:00

271 lines
7.5 KiB
Go

package worker
import (
"context"
"sync"
"time"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/logging"
"github.com/orchard9/rdev/internal/metrics"
"github.com/orchard9/rdev/internal/port"
)
// QueueMaintenance is a background worker that looks after the work queue
// and the worker registry. It drives two loops: a maintenance loop (stale
// task requeue, stale worker marking, old task cleanup) and a faster
// metrics loop that refreshes queue and worker gauges.
type QueueMaintenance struct {
	// Collaborators.
	queue      port.WorkQueue
	registry   port.WorkerRegistry
	buildAudit port.BuildAudit // May be nil; when set, requeued tasks are synced to the build audit.

	// Thresholds and cadences, copied from QueueMaintenanceConfig.
	staleTaskTimeout   time.Duration // Passed to the queue when requeuing stale tasks.
	staleWorkerTimeout time.Duration // Passed to the registry when marking workers offline.
	cleanupAge         time.Duration // Passed to the queue when cleaning up old tasks.
	maintenancePeriod  time.Duration // Tick interval of the maintenance loop.
	metricsPeriod      time.Duration // Tick interval of the metrics loop.

	// Lifecycle: ctx is created by NewQueueMaintenance and cancelled by Stop;
	// wg tracks the two loop goroutines launched by Start.
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}
// QueueMaintenanceConfig carries the tunables for a QueueMaintenance worker.
// DefaultQueueMaintenanceConfig returns the values quoted as defaults below.
type QueueMaintenanceConfig struct {
	// StaleTaskTimeout is the silence threshold after which a running task
	// is requeued. DefaultQueueMaintenanceConfig sets it to 30 minutes.
	StaleTaskTimeout time.Duration

	// StaleWorkerTimeout is how long a worker may go without a heartbeat
	// before it is marked offline. DefaultQueueMaintenanceConfig sets it to
	// 2 minutes.
	StaleWorkerTimeout time.Duration

	// CleanupAge is the minimum age at which completed/failed/cancelled
	// tasks become eligible for cleanup. DefaultQueueMaintenanceConfig sets
	// it to 7 days.
	CleanupAge time.Duration

	// MaintenancePeriod is the interval between maintenance passes.
	// DefaultQueueMaintenanceConfig sets it to 1 minute.
	MaintenancePeriod time.Duration

	// MetricsPeriod is the interval between queue depth metric refreshes.
	// DefaultQueueMaintenanceConfig sets it to 15 seconds.
	MetricsPeriod time.Duration

	// BuildAudit, when non-nil, is updated for every stale task that gets
	// requeued. Leaving it nil skips build audit updates (legacy behavior).
	BuildAudit port.BuildAudit
}
// DefaultQueueMaintenanceConfig builds a fresh configuration populated with
// the stock thresholds and cadences. BuildAudit is left nil.
func DefaultQueueMaintenanceConfig() *QueueMaintenanceConfig {
	const day = 24 * time.Hour

	cfg := new(QueueMaintenanceConfig)
	cfg.StaleTaskTimeout = 30 * time.Minute
	cfg.StaleWorkerTimeout = 2 * time.Minute
	cfg.CleanupAge = 7 * day
	cfg.MaintenancePeriod = time.Minute
	cfg.MetricsPeriod = 15 * time.Second
	return cfg
}
// NewQueueMaintenance creates a new queue maintenance worker.
//
// queue and registry are the backends the maintenance passes operate on.
// cfg may be nil, in which case DefaultQueueMaintenanceConfig is used.
//
// Any duration in cfg that is zero or negative is replaced by the matching
// value from DefaultQueueMaintenanceConfig, so a partially filled config is
// safe to pass. Without this fallback a zero period would make
// time.NewTicker panic inside the loops, a zero StaleTaskTimeout would treat
// every running task as stale, and a zero CleanupAge would make every
// finished task eligible for cleanup. cfg itself is never modified.
//
// The returned worker is idle until Start is called.
func NewQueueMaintenance(
	queue port.WorkQueue,
	registry port.WorkerRegistry,
	cfg *QueueMaintenanceConfig,
) *QueueMaintenance {
	defaults := DefaultQueueMaintenanceConfig()
	if cfg == nil {
		cfg = defaults
	}

	// orDefault keeps a caller-supplied positive duration and otherwise
	// falls back to the documented default.
	orDefault := func(value, fallback time.Duration) time.Duration {
		if value <= 0 {
			return fallback
		}
		return value
	}

	ctx, cancel := context.WithCancel(context.Background())
	return &QueueMaintenance{
		queue:              queue,
		registry:           registry,
		buildAudit:         cfg.BuildAudit,
		staleTaskTimeout:   orDefault(cfg.StaleTaskTimeout, defaults.StaleTaskTimeout),
		staleWorkerTimeout: orDefault(cfg.StaleWorkerTimeout, defaults.StaleWorkerTimeout),
		cleanupAge:         orDefault(cfg.CleanupAge, defaults.CleanupAge),
		maintenancePeriod:  orDefault(cfg.MaintenancePeriod, defaults.MaintenancePeriod),
		metricsPeriod:      orDefault(cfg.MetricsPeriod, defaults.MetricsPeriod),
		ctx:                ctx,
		cancel:             cancel,
	}
}
// Start logs the effective settings and launches the maintenance and
// metrics loops in their own goroutines. It returns immediately; use Stop
// to shut the loops down. Start does not guard against being called more
// than once.
func (m *QueueMaintenance) Start() {
	logging.FromContext(m.ctx).
		WithWorker("queue-maintenance").
		Info("queue maintenance started",
			"maintenance_period", m.maintenancePeriod,
			"metrics_period", m.metricsPeriod,
			"stale_task_timeout", m.staleTaskTimeout,
			"stale_worker_timeout", m.staleWorkerTimeout,
			"cleanup_age", m.cleanupAge,
		)

	// Register both goroutines up front so Stop's Wait sees them.
	loops := []func(){m.maintenanceLoop, m.metricsLoop}
	m.wg.Add(len(loops))
	for _, loop := range loops {
		go loop()
	}
}
// Stop cancels the worker's context and blocks until both loops have
// returned. The context is created once in NewQueueMaintenance, so a
// stopped worker cannot be started again.
func (m *QueueMaintenance) Stop() {
	logger := logging.FromContext(m.ctx).WithWorker("queue-maintenance")

	logger.Info("queue maintenance stopping")

	// Signal the loops, then wait for them to drain.
	m.cancel()
	m.wg.Wait()

	logger.Info("queue maintenance stopped")
}
// maintenanceLoop performs one maintenance pass right away and then one per
// maintenancePeriod tick until the worker's context is cancelled. It marks
// itself done on the wait group when it exits.
func (m *QueueMaintenance) maintenanceLoop() {
	defer m.wg.Done()

	// First pass happens before the first tick so startup is not delayed.
	m.runMaintenance()

	tick := time.NewTicker(m.maintenancePeriod)
	defer tick.Stop()

	done := m.ctx.Done()
	for {
		select {
		case <-tick.C:
			m.runMaintenance()
		case <-done:
			return
		}
	}
}
// metricsLoop refreshes metrics once right away and then once per
// metricsPeriod tick until the worker's context is cancelled. It marks
// itself done on the wait group when it exits.
func (m *QueueMaintenance) metricsLoop() {
	defer m.wg.Done()

	// Publish an initial reading before the first tick.
	m.refreshMetrics()

	tick := time.NewTicker(m.metricsPeriod)
	defer tick.Stop()

	done := m.ctx.Done()
	for {
		select {
		case <-tick.C:
			m.refreshMetrics()
		case <-done:
			return
		}
	}
}
// runMaintenance performs one maintenance pass: stale task requeue, stale
// worker marking, then old task cleanup, in that order. All three steps
// share a single TimeoutMaintenance deadline derived from the worker's
// context. Each step logs its own failures and returns nothing, so one
// failing step does not prevent the next from running.
func (m *QueueMaintenance) runMaintenance() {
	passCtx, release := context.WithTimeout(m.ctx, TimeoutMaintenance)
	defer release()

	steps := [...]func(context.Context){
		m.requeueStaleTasks,
		m.markStaleWorkers,
		m.cleanupOldTasks,
	}
	for _, step := range steps {
		step(passCtx)
	}
}
// requeueStaleTasks asks the queue to requeue tasks that have exceeded
// staleTaskTimeout (presumably because their worker died without
// reporting). When a build audit is configured, each requeued task is also
// set back to pending there. Failures are logged as warnings and never
// returned; a failed audit update does not stop the remaining updates.
func (m *QueueMaintenance) requeueStaleTasks(ctx context.Context) {
	logger := logging.FromContext(ctx).WithWorker("queue-maintenance")

	// The ID-returning variant is used so the audit sync below knows which
	// tasks were touched.
	requeued, err := m.queue.RequeueStaleWithIDs(ctx, m.staleTaskTimeout)
	switch {
	case err != nil:
		logger.Warn("failed to requeue stale tasks", logging.FieldError, err)
		return
	case len(requeued) == 0:
		return
	}

	logger.Info("requeued stale tasks", "count", len(requeued), "task_ids", requeued)

	if m.buildAudit == nil {
		return
	}

	for _, id := range requeued {
		// Pending status with an empty final argument; per the original
		// note this clears the worker assignment — confirm against
		// port.BuildAudit.
		syncErr := m.buildAudit.UpdateStatus(ctx, id, domain.BuildStatusPending, "")
		if syncErr == nil {
			continue
		}
		logger.Warn("failed to sync build audit for requeued task",
			"task_id", id,
			logging.FieldError, syncErr,
		)
	}
}
// markStaleWorkers asks the registry to mark workers offline using
// staleWorkerTimeout as the heartbeat threshold. It logs a warning on
// failure and an info line only when at least one worker was marked.
func (m *QueueMaintenance) markStaleWorkers(ctx context.Context) {
	logger := logging.FromContext(ctx).WithWorker("queue-maintenance")

	marked, err := m.registry.MarkStaleOffline(ctx, m.staleWorkerTimeout)
	switch {
	case err != nil:
		logger.Warn("failed to mark stale workers offline", logging.FieldError, err)
	case marked > 0:
		logger.Info("marked stale workers offline", "count", marked)
	}
}
// cleanupOldTasks asks the queue to clean up completed/failed/cancelled
// tasks using cleanupAge as the age threshold. It logs a warning on failure
// and an info line only when at least one task was removed.
func (m *QueueMaintenance) cleanupOldTasks(ctx context.Context) {
	logger := logging.FromContext(ctx).WithWorker("queue-maintenance")

	removed, err := m.queue.CleanupOld(ctx, m.cleanupAge)
	switch {
	case err != nil:
		logger.Warn("failed to cleanup old tasks", logging.FieldError, err)
	case removed > 0:
		logger.Info("cleaned up old tasks", "count", removed)
	}
}
// refreshMetrics publishes queue depth and worker gauges. The whole refresh
// is bounded by a TimeoutQuickOp deadline derived from the worker's context.
//
// Queue depths are published first; if fetching queue stats fails the
// function logs a warning and returns without touching worker metrics. The
// same applies when listing workers fails after depths were published.
func (m *QueueMaintenance) refreshMetrics() {
	opCtx, release := context.WithTimeout(m.ctx, TimeoutQuickOp)
	defer release()

	logger := logging.FromContext(opCtx).WithWorker("queue-maintenance")

	// Queue depth per task state.
	queueStats, err := m.queue.GetStats(opCtx)
	if err != nil {
		logger.Warn("failed to get queue stats for metrics", logging.FieldError, err)
		return
	}
	metrics.SetWorkQueueDepth("pending", queueStats.Pending)
	metrics.SetWorkQueueDepth("running", queueStats.Running)
	metrics.SetWorkQueueDepth("completed", queueStats.Completed)
	metrics.SetWorkQueueDepth("failed", queueStats.Failed)
	metrics.SetWorkQueueDepth("cancelled", queueStats.Cancelled)

	// Worker counts per status, plus heartbeat age per worker.
	registered, err := m.registry.List(opCtx, port.WorkerFilter{})
	if err != nil {
		logger.Warn("failed to list workers for metrics", logging.FieldError, err)
		return
	}

	// Seed the known statuses with zero so their gauges are reset even when
	// no worker currently has that status. Any other status encountered is
	// still counted under its own key.
	perStatus := make(map[string]int, 4)
	for _, known := range [...]string{"idle", "busy", "draining", "offline"} {
		perStatus[known] = 0
	}

	for _, worker := range registered {
		perStatus[string(worker.Status)]++
		metrics.RecordWorkerHeartbeat(worker.ID, time.Since(worker.LastHeartbeat).Seconds())
	}

	for status, n := range perStatus {
		metrics.SetWorkerCount(status, n)
	}
}