rdev/internal/worker/queue_maintenance.go
jordan d69da6d627 feat: add structured logging infrastructure and SDLC extensions
Major changes:
- Add internal/logging package with field constants, context propagation,
  sensitive data auto-redaction, and per-component log levels
- Add worker timeout constants (TimeoutQuickOp, TimeoutHealthCheck, etc.)
- Extend SDLC with callback handlers, generate endpoints, and executor
- Add new cookbook trees for aeries and slackpath progression
- Add skeleton templates for queue, realtime, and microservices
- Add worker component template with async job processing
- Refactor services and handlers to use new logging infrastructure
- Split component.go into component_infra.go and component_listing.go

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 22:56:04 -07:00

266 lines
7.2 KiB
Go

package worker
import (
"context"
"log/slog"
"sync"
"time"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/metrics"
"github.com/orchard9/rdev/internal/port"
)
// QueueMaintenance runs periodic maintenance tasks on the work queue
// and worker registry: stale task recovery, stale worker marking,
// old task cleanup, and queue metrics refresh.
type QueueMaintenance struct {
queue port.WorkQueue
registry port.WorkerRegistry
buildAudit port.BuildAudit // Optional: syncs build audit when requeuing stale tasks
logger *slog.Logger
// Intervals
staleTaskTimeout time.Duration
staleWorkerTimeout time.Duration
cleanupAge time.Duration
maintenancePeriod time.Duration
metricsPeriod time.Duration
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
// QueueMaintenanceConfig holds configuration for queue maintenance.
type QueueMaintenanceConfig struct {
// StaleTaskTimeout is how long a running task can be silent before requeue.
// Default: 30 minutes.
StaleTaskTimeout time.Duration
// StaleWorkerTimeout is how long without heartbeat before marking offline.
// Default: 2 minutes.
StaleWorkerTimeout time.Duration
// CleanupAge is the minimum age for completed/failed/cancelled tasks to be cleaned up.
// Default: 7 days.
CleanupAge time.Duration
// MaintenancePeriod is how often to run maintenance tasks.
// Default: 1 minute.
MaintenancePeriod time.Duration
// MetricsPeriod is how often to refresh queue depth metrics.
// Default: 15 seconds.
MetricsPeriod time.Duration
// BuildAudit syncs build audit status when requeuing stale tasks.
// If nil, build audit is not updated (legacy behavior).
BuildAudit port.BuildAudit
Logger *slog.Logger
}
// DefaultQueueMaintenanceConfig returns sensible defaults.
func DefaultQueueMaintenanceConfig() *QueueMaintenanceConfig {
return &QueueMaintenanceConfig{
StaleTaskTimeout: 30 * time.Minute,
StaleWorkerTimeout: 2 * time.Minute,
CleanupAge: 7 * 24 * time.Hour,
MaintenancePeriod: 1 * time.Minute,
MetricsPeriod: 15 * time.Second,
Logger: slog.Default(),
}
}
// NewQueueMaintenance creates a new queue maintenance worker.
func NewQueueMaintenance(
queue port.WorkQueue,
registry port.WorkerRegistry,
cfg *QueueMaintenanceConfig,
) *QueueMaintenance {
if cfg == nil {
cfg = DefaultQueueMaintenanceConfig()
}
ctx, cancel := context.WithCancel(context.Background())
return &QueueMaintenance{
queue: queue,
registry: registry,
buildAudit: cfg.BuildAudit,
logger: cfg.Logger.With("component", "queue-maintenance"),
staleTaskTimeout: cfg.StaleTaskTimeout,
staleWorkerTimeout: cfg.StaleWorkerTimeout,
cleanupAge: cfg.CleanupAge,
maintenancePeriod: cfg.MaintenancePeriod,
metricsPeriod: cfg.MetricsPeriod,
ctx: ctx,
cancel: cancel,
}
}
// Start begins the maintenance and metrics loops.
func (m *QueueMaintenance) Start() {
m.logger.Info("queue maintenance started",
"maintenance_period", m.maintenancePeriod,
"metrics_period", m.metricsPeriod,
"stale_task_timeout", m.staleTaskTimeout,
"stale_worker_timeout", m.staleWorkerTimeout,
"cleanup_age", m.cleanupAge,
)
m.wg.Add(2)
go m.maintenanceLoop()
go m.metricsLoop()
}
// Stop gracefully shuts down the maintenance worker.
func (m *QueueMaintenance) Stop() {
m.logger.Info("queue maintenance stopping")
m.cancel()
m.wg.Wait()
m.logger.Info("queue maintenance stopped")
}
// maintenanceLoop runs periodic maintenance: stale recovery, worker health, cleanup.
func (m *QueueMaintenance) maintenanceLoop() {
defer m.wg.Done()
// Run immediately on start
m.runMaintenance()
ticker := time.NewTicker(m.maintenancePeriod)
defer ticker.Stop()
for {
select {
case <-m.ctx.Done():
return
case <-ticker.C:
m.runMaintenance()
}
}
}
// metricsLoop refreshes queue depth metrics on a faster cadence.
func (m *QueueMaintenance) metricsLoop() {
defer m.wg.Done()
// Run immediately on start
m.refreshMetrics()
ticker := time.NewTicker(m.metricsPeriod)
defer ticker.Stop()
for {
select {
case <-m.ctx.Done():
return
case <-ticker.C:
m.refreshMetrics()
}
}
}
// runMaintenance executes all maintenance tasks.
func (m *QueueMaintenance) runMaintenance() {
ctx, cancel := context.WithTimeout(m.ctx, TimeoutMaintenance)
defer cancel()
m.requeueStaleTasks(ctx)
m.markStaleWorkers(ctx)
m.cleanupOldTasks(ctx)
}
// requeueStaleTasks requeues tasks that have been running too long
// (the worker likely crashed without reporting).
// Also syncs build audit to pending status if configured.
func (m *QueueMaintenance) requeueStaleTasks(ctx context.Context) {
// Use RequeueStaleWithIDs to get task IDs for build audit sync
taskIDs, err := m.queue.RequeueStaleWithIDs(ctx, m.staleTaskTimeout)
if err != nil {
m.logger.Warn("failed to requeue stale tasks", "error", err)
return
}
if len(taskIDs) == 0 {
return
}
m.logger.Info("requeued stale tasks", "count", len(taskIDs), "task_ids", taskIDs)
// Sync build audit status if configured
if m.buildAudit != nil {
for _, taskID := range taskIDs {
// Update build audit to pending (worker assignment cleared)
if err := m.buildAudit.UpdateStatus(ctx, taskID, domain.BuildStatusPending, ""); err != nil {
m.logger.Warn("failed to sync build audit for requeued task",
"task_id", taskID,
"error", err,
)
}
}
}
}
// markStaleWorkers marks workers without recent heartbeats as offline.
func (m *QueueMaintenance) markStaleWorkers(ctx context.Context) {
count, err := m.registry.MarkStaleOffline(ctx, m.staleWorkerTimeout)
if err != nil {
m.logger.Warn("failed to mark stale workers offline", "error", err)
return
}
if count > 0 {
m.logger.Info("marked stale workers offline", "count", count)
}
}
// cleanupOldTasks removes completed/failed/cancelled tasks older than cleanup age.
func (m *QueueMaintenance) cleanupOldTasks(ctx context.Context) {
count, err := m.queue.CleanupOld(ctx, m.cleanupAge)
if err != nil {
m.logger.Warn("failed to cleanup old tasks", "error", err)
return
}
if count > 0 {
m.logger.Info("cleaned up old tasks", "count", count)
}
}
// refreshMetrics fetches queue stats and updates Prometheus gauges.
func (m *QueueMaintenance) refreshMetrics() {
ctx, cancel := context.WithTimeout(m.ctx, TimeoutQuickOp)
defer cancel()
stats, err := m.queue.GetStats(ctx)
if err != nil {
m.logger.Warn("failed to get queue stats for metrics", "error", err)
return
}
metrics.SetWorkQueueDepth("pending", stats.Pending)
metrics.SetWorkQueueDepth("running", stats.Running)
metrics.SetWorkQueueDepth("completed", stats.Completed)
metrics.SetWorkQueueDepth("failed", stats.Failed)
metrics.SetWorkQueueDepth("cancelled", stats.Cancelled)
// Worker counts
workers, err := m.registry.List(ctx, port.WorkerFilter{})
if err != nil {
m.logger.Warn("failed to list workers for metrics", "error", err)
return
}
counts := map[string]int{
"idle": 0, "busy": 0, "draining": 0, "offline": 0,
}
for _, w := range workers {
counts[string(w.Status)]++
age := time.Since(w.LastHeartbeat).Seconds()
metrics.RecordWorkerHeartbeat(w.ID, age)
}
for status, count := range counts {
metrics.SetWorkerCount(status, count)
}
}