rdev/internal/worker/queue_maintenance.go
jordan bc47e426b0 feat: Add CI pipeline proxy, DNS alias management, and worker executor system
- Add ListPipelines/GetPipeline to CIProvider port with Woodpecker adapter
- Add DNS alias endpoints: GET/POST/DELETE /projects/{id}/domains
- Implement worker executor daemon, build executor, and git operations
- Add build service, worker service, and build audit tracking
- Add worker registry with PostgreSQL adapter and migration
- Add multi-provider code agent interface (Claude Code + OpenCode)
- Add create-and-build combo endpoint
- Update landing-page cookbook to reflect all gaps closed
- Fix tech debt: unified validation, auth scopes, error wrapping, slog patterns

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-27 21:05:28 -07:00

242 lines
6.3 KiB
Go

package worker
import (
"context"
"log/slog"
"sync"
"time"
"github.com/orchard9/rdev/internal/metrics"
"github.com/orchard9/rdev/internal/port"
)
// QueueMaintenance is a background worker that keeps the work queue and the
// worker registry tidy. On one cadence it requeues stale tasks, marks workers
// with stale heartbeats offline, and cleans up old finished tasks; on a second,
// faster cadence it refreshes queue and worker metrics.
type QueueMaintenance struct {
	// Collaborators.
	queue    port.WorkQueue
	registry port.WorkerRegistry
	logger   *slog.Logger

	// Thresholds and cadences, populated by NewQueueMaintenance from the config.
	staleTaskTimeout, staleWorkerTimeout time.Duration
	cleanupAge                           time.Duration
	maintenancePeriod, metricsPeriod     time.Duration

	// Lifecycle: cancel (invoked by Stop) ends both loops, and wg lets Stop
	// wait until they have returned.
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}
// QueueMaintenanceConfig configures a QueueMaintenance worker.
// DefaultQueueMaintenanceConfig returns a fully populated instance; the
// defaults quoted below are the values it uses.
type QueueMaintenanceConfig struct {
	// StaleTaskTimeout is how long a running task may go silent before it is
	// put back on the queue. Default: 30 minutes.
	StaleTaskTimeout time.Duration

	// StaleWorkerTimeout is how long a worker may go without a heartbeat
	// before it is marked offline. Default: 2 minutes.
	StaleWorkerTimeout time.Duration

	// CleanupAge is the minimum age at which completed, failed, and cancelled
	// tasks become eligible for cleanup. Default: 7 days.
	CleanupAge time.Duration

	// MaintenancePeriod is the interval between maintenance passes.
	// Default: 1 minute.
	MaintenancePeriod time.Duration

	// MetricsPeriod is the interval between queue-depth metric refreshes.
	// Default: 15 seconds.
	MetricsPeriod time.Duration

	// Logger receives the worker's log output. Default: slog.Default().
	Logger *slog.Logger
}
// DefaultQueueMaintenanceConfig returns a freshly allocated config holding the
// recommended settings. These are a 30m stale-task timeout, a 2m stale-worker
// timeout, a 7-day cleanup age, a 1m maintenance period, and a 15s metrics
// period, logging through slog.Default().
func DefaultQueueMaintenanceConfig() *QueueMaintenanceConfig {
	const week = 7 * 24 * time.Hour

	cfg := new(QueueMaintenanceConfig)
	cfg.StaleTaskTimeout = 30 * time.Minute
	cfg.StaleWorkerTimeout = 2 * time.Minute
	cfg.CleanupAge = week
	cfg.MaintenancePeriod = time.Minute
	cfg.MetricsPeriod = 15 * time.Second
	cfg.Logger = slog.Default()
	return cfg
}
// NewQueueMaintenance creates a queue maintenance worker for the given queue
// and registry. It starts no goroutines; call Start for that.
//
// cfg may be nil, in which case DefaultQueueMaintenanceConfig is used. In a
// non-nil cfg, any duration field that is zero or negative falls back to the
// corresponding default, and so does a nil Logger. Without this fallback, a
// partially filled config would crash:
//   - time.NewTicker panics on a non-positive period once Start runs the loops;
//   - calling With on a nil *slog.Logger panics right here.
//
// cfg is only read, never modified.
func NewQueueMaintenance(
	queue port.WorkQueue,
	registry port.WorkerRegistry,
	cfg *QueueMaintenanceConfig,
) *QueueMaintenance {
	defaults := DefaultQueueMaintenanceConfig()
	if cfg == nil {
		cfg = defaults
	}

	// orDefault returns v when it is a usable (positive) duration, else def.
	orDefault := func(v, def time.Duration) time.Duration {
		if v > 0 {
			return v
		}
		return def
	}

	logger := cfg.Logger
	if logger == nil {
		logger = defaults.Logger
	}

	ctx, cancel := context.WithCancel(context.Background())
	return &QueueMaintenance{
		queue:              queue,
		registry:           registry,
		logger:             logger.With("component", "queue-maintenance"),
		staleTaskTimeout:   orDefault(cfg.StaleTaskTimeout, defaults.StaleTaskTimeout),
		staleWorkerTimeout: orDefault(cfg.StaleWorkerTimeout, defaults.StaleWorkerTimeout),
		cleanupAge:         orDefault(cfg.CleanupAge, defaults.CleanupAge),
		maintenancePeriod:  orDefault(cfg.MaintenancePeriod, defaults.MaintenancePeriod),
		metricsPeriod:      orDefault(cfg.MetricsPeriod, defaults.MetricsPeriod),
		ctx:                ctx,
		cancel:             cancel,
	}
}
// Start logs the effective settings and launches the maintenance loop and the
// metrics loop, each in its own goroutine, then returns immediately. Use Stop
// to shut them down. The worker cannot be restarted after Stop, because the
// context both loops watch is created once and cancelled permanently by Stop.
func (m *QueueMaintenance) Start() {
	m.logger.Info("queue maintenance started",
		"maintenance_period", m.maintenancePeriod,
		"metrics_period", m.metricsPeriod,
		"stale_task_timeout", m.staleTaskTimeout,
		"stale_worker_timeout", m.staleWorkerTimeout,
		"cleanup_age", m.cleanupAge,
	)

	loops := []func(){m.maintenanceLoop, m.metricsLoop}
	m.wg.Add(len(loops))
	for _, loop := range loops {
		go loop()
	}
}
// Stop cancels the worker's context and blocks until both loops launched by
// Start have returned. In-flight queue and registry calls see the cancellation
// through the contexts derived from it.
func (m *QueueMaintenance) Stop() {
	log := m.logger
	log.Info("queue maintenance stopping")
	defer log.Info("queue maintenance stopped")

	m.cancel()
	m.wg.Wait()
}
// maintenanceLoop performs one maintenance pass right away, then another on
// every maintenancePeriod tick, until the worker's context is cancelled.
// It signals the WaitGroup on exit.
func (m *QueueMaintenance) maintenanceLoop() {
	defer m.wg.Done()

	m.runMaintenance()

	tick := time.NewTicker(m.maintenancePeriod)
	defer tick.Stop()

	done := m.ctx.Done()
	for {
		select {
		case <-tick.C:
			m.runMaintenance()
		case <-done:
			return
		}
	}
}
// metricsLoop publishes metrics once immediately, so gauges are populated
// without waiting a full period. It then republishes on every metricsPeriod
// tick until the worker's context is cancelled, and signals the WaitGroup
// on exit.
func (m *QueueMaintenance) metricsLoop() {
	defer m.wg.Done()

	m.refreshMetrics()

	t := time.NewTicker(m.metricsPeriod)
	defer t.Stop()

	for {
		select {
		case <-m.ctx.Done():
			return
		case <-t.C:
		}
		m.refreshMetrics()
	}
}
// runMaintenance performs one maintenance pass in order: requeue stale tasks,
// mark workers with stale heartbeats offline, then clean up old finished
// tasks. The whole pass shares a 30-second budget derived from the worker's
// context.
//
// If the worker has been stopped (m.ctx cancelled), remaining steps are
// skipped instead of each failing against the cancelled context and logging a
// spurious warning. This also covers the case where the loop's select picked
// a ready tick over the closed Done channel. A pass that merely exhausts its
// 30-second budget still attempts every step, so those timeouts remain
// visible in the logs.
func (m *QueueMaintenance) runMaintenance() {
	ctx, cancel := context.WithTimeout(m.ctx, 30*time.Second)
	defer cancel()

	steps := [...]func(context.Context){
		m.requeueStaleTasks,
		m.markStaleWorkers,
		m.cleanupOldTasks,
	}
	for _, step := range steps {
		if m.ctx.Err() != nil {
			return
		}
		step(ctx)
	}
}
// requeueStaleTasks asks the queue to requeue running tasks that have been
// stale for longer than staleTaskTimeout. This usually means the worker
// holding them crashed without reporting back. Failures are logged as
// warnings, and a non-zero requeue count is logged at info level.
func (m *QueueMaintenance) requeueStaleTasks(ctx context.Context) {
	switch n, err := m.queue.RequeueStale(ctx, m.staleTaskTimeout); {
	case err != nil:
		m.logger.Warn("failed to requeue stale tasks", "error", err)
	case n > 0:
		m.logger.Info("requeued stale tasks", "count", n)
	}
}
// markStaleWorkers asks the registry to mark offline every worker whose last
// heartbeat is older than staleWorkerTimeout. Failures are logged as
// warnings, and a non-zero count of affected workers is logged at info level.
func (m *QueueMaintenance) markStaleWorkers(ctx context.Context) {
	switch n, err := m.registry.MarkStaleOffline(ctx, m.staleWorkerTimeout); {
	case err != nil:
		m.logger.Warn("failed to mark stale workers offline", "error", err)
	case n > 0:
		m.logger.Info("marked stale workers offline", "count", n)
	}
}
// cleanupOldTasks asks the queue to delete completed, failed, and cancelled
// tasks that are older than cleanupAge. Failures are logged as warnings, and
// a non-zero count of removed tasks is logged at info level.
func (m *QueueMaintenance) cleanupOldTasks(ctx context.Context) {
	switch n, err := m.queue.CleanupOld(ctx, m.cleanupAge); {
	case err != nil:
		m.logger.Warn("failed to cleanup old tasks", "error", err)
	case n > 0:
		m.logger.Info("cleaned up old tasks", "count", n)
	}
}
// refreshMetrics publishes queue-depth gauges and worker gauges. Both halves
// share a 5-second budget derived from the worker's context.
//
// The two halves are independent. A failure reading queue stats no longer
// skips the worker metrics, and a failure listing workers does not affect the
// queue-depth gauges; each failure is logged on its own. The pass is skipped
// once the worker has been stopped, so shutdown does not log spurious
// "context canceled" warnings.
func (m *QueueMaintenance) refreshMetrics() {
	if m.ctx.Err() != nil {
		return
	}
	ctx, cancel := context.WithTimeout(m.ctx, 5*time.Second)
	defer cancel()

	m.refreshQueueDepthMetrics(ctx)
	if m.ctx.Err() != nil {
		return
	}
	m.refreshWorkerMetrics(ctx)
}

// refreshQueueDepthMetrics publishes per-status task counts from the queue's
// stats. On error it logs a warning and leaves the queue-depth gauges as they
// were.
func (m *QueueMaintenance) refreshQueueDepthMetrics(ctx context.Context) {
	stats, err := m.queue.GetStats(ctx)
	if err != nil {
		m.logger.Warn("failed to get queue stats for metrics", "error", err)
		return
	}
	metrics.SetWorkQueueDepth("pending", stats.Pending)
	metrics.SetWorkQueueDepth("running", stats.Running)
	metrics.SetWorkQueueDepth("completed", stats.Completed)
	metrics.SetWorkQueueDepth("failed", stats.Failed)
	metrics.SetWorkQueueDepth("cancelled", stats.Cancelled)
}

// refreshWorkerMetrics publishes the number of workers per status and each
// worker's heartbeat age in seconds. On error it logs a warning and leaves
// the worker gauges as they were.
func (m *QueueMaintenance) refreshWorkerMetrics(ctx context.Context) {
	workers, err := m.registry.List(ctx, port.WorkerFilter{})
	if err != nil {
		m.logger.Warn("failed to list workers for metrics", "error", err)
		return
	}

	// Seed the known statuses so a status with no workers is still reported
	// (as 0). Any other status string a worker reports gets its own entry.
	counts := map[string]int{
		"idle": 0, "busy": 0, "draining": 0, "offline": 0,
	}
	for _, w := range workers {
		counts[string(w.Status)]++
		// NOTE(review): series for workers no longer returned by List are never
		// removed here. Confirm whether the metrics package expires them.
		metrics.RecordWorkerHeartbeat(w.ID, time.Since(w.LastHeartbeat).Seconds())
	}
	for status, count := range counts {
		metrics.SetWorkerCount(status, count)
	}
}