- Add ListPipelines/GetPipeline to CIProvider port with Woodpecker adapter
- Add DNS alias endpoints: GET/POST/DELETE /projects/{id}/domains
- Implement worker executor daemon, build executor, and git operations
- Add build service, worker service, and build audit tracking
- Add worker registry with PostgreSQL adapter and migration
- Add multi-provider code agent interface (Claude Code + OpenCode)
- Add create-and-build combo endpoint
- Update landing-page cookbook to reflect all gaps closed
- Fix tech debt: unified validation, auth scopes, error wrapping, slog patterns
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
242 lines
6.3 KiB
Go
242 lines
6.3 KiB
Go
package worker
|
|
|
|
import (
|
|
"context"
|
|
"log/slog"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/orchard9/rdev/internal/metrics"
|
|
"github.com/orchard9/rdev/internal/port"
|
|
)
|
|
|
|
// QueueMaintenance runs periodic maintenance tasks on the work queue
|
|
// and worker registry: stale task recovery, stale worker marking,
|
|
// old task cleanup, and queue metrics refresh.
|
|
type QueueMaintenance struct {
|
|
queue port.WorkQueue
|
|
registry port.WorkerRegistry
|
|
logger *slog.Logger
|
|
|
|
// Intervals
|
|
staleTaskTimeout time.Duration
|
|
staleWorkerTimeout time.Duration
|
|
cleanupAge time.Duration
|
|
maintenancePeriod time.Duration
|
|
metricsPeriod time.Duration
|
|
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
// QueueMaintenanceConfig carries the tunables consumed by NewQueueMaintenance.
// DefaultQueueMaintenanceConfig returns a fully populated instance.
type QueueMaintenanceConfig struct {
	// StaleTaskTimeout bounds how long a running task may stay silent
	// before it is put back on the queue.
	// DefaultQueueMaintenanceConfig uses 30 minutes.
	StaleTaskTimeout time.Duration

	// StaleWorkerTimeout bounds how long a worker may go without a
	// heartbeat before it is marked offline.
	// DefaultQueueMaintenanceConfig uses 2 minutes.
	StaleWorkerTimeout time.Duration

	// CleanupAge is the minimum age a completed, failed or cancelled task
	// must reach before it is cleaned up.
	// DefaultQueueMaintenanceConfig uses 7 days.
	CleanupAge time.Duration

	// MaintenancePeriod is the interval between maintenance passes.
	// DefaultQueueMaintenanceConfig uses 1 minute.
	MaintenancePeriod time.Duration

	// MetricsPeriod is the interval between queue depth metric refreshes.
	// DefaultQueueMaintenanceConfig uses 15 seconds.
	MetricsPeriod time.Duration

	// Logger is the base logger the maintenance worker derives its own
	// component-tagged logger from.
	Logger *slog.Logger
}
|
|
|
|
// DefaultQueueMaintenanceConfig returns sensible defaults.
|
|
func DefaultQueueMaintenanceConfig() *QueueMaintenanceConfig {
|
|
return &QueueMaintenanceConfig{
|
|
StaleTaskTimeout: 30 * time.Minute,
|
|
StaleWorkerTimeout: 2 * time.Minute,
|
|
CleanupAge: 7 * 24 * time.Hour,
|
|
MaintenancePeriod: 1 * time.Minute,
|
|
MetricsPeriod: 15 * time.Second,
|
|
Logger: slog.Default(),
|
|
}
|
|
}
|
|
|
|
// NewQueueMaintenance creates a new queue maintenance worker.
|
|
func NewQueueMaintenance(
|
|
queue port.WorkQueue,
|
|
registry port.WorkerRegistry,
|
|
cfg *QueueMaintenanceConfig,
|
|
) *QueueMaintenance {
|
|
if cfg == nil {
|
|
cfg = DefaultQueueMaintenanceConfig()
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
return &QueueMaintenance{
|
|
queue: queue,
|
|
registry: registry,
|
|
logger: cfg.Logger.With("component", "queue-maintenance"),
|
|
staleTaskTimeout: cfg.StaleTaskTimeout,
|
|
staleWorkerTimeout: cfg.StaleWorkerTimeout,
|
|
cleanupAge: cfg.CleanupAge,
|
|
maintenancePeriod: cfg.MaintenancePeriod,
|
|
metricsPeriod: cfg.MetricsPeriod,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
}
|
|
}
|
|
|
|
// Start begins the maintenance and metrics loops.
|
|
func (m *QueueMaintenance) Start() {
|
|
m.logger.Info("queue maintenance started",
|
|
"maintenance_period", m.maintenancePeriod,
|
|
"metrics_period", m.metricsPeriod,
|
|
"stale_task_timeout", m.staleTaskTimeout,
|
|
"stale_worker_timeout", m.staleWorkerTimeout,
|
|
"cleanup_age", m.cleanupAge,
|
|
)
|
|
|
|
m.wg.Add(2)
|
|
go m.maintenanceLoop()
|
|
go m.metricsLoop()
|
|
}
|
|
|
|
// Stop gracefully shuts down the maintenance worker.
|
|
func (m *QueueMaintenance) Stop() {
|
|
m.logger.Info("queue maintenance stopping")
|
|
m.cancel()
|
|
m.wg.Wait()
|
|
m.logger.Info("queue maintenance stopped")
|
|
}
|
|
|
|
// maintenanceLoop runs periodic maintenance: stale recovery, worker health, cleanup.
|
|
func (m *QueueMaintenance) maintenanceLoop() {
|
|
defer m.wg.Done()
|
|
|
|
// Run immediately on start
|
|
m.runMaintenance()
|
|
|
|
ticker := time.NewTicker(m.maintenancePeriod)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-m.ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
m.runMaintenance()
|
|
}
|
|
}
|
|
}
|
|
|
|
// metricsLoop refreshes queue depth metrics on a faster cadence.
|
|
func (m *QueueMaintenance) metricsLoop() {
|
|
defer m.wg.Done()
|
|
|
|
// Run immediately on start
|
|
m.refreshMetrics()
|
|
|
|
ticker := time.NewTicker(m.metricsPeriod)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-m.ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
m.refreshMetrics()
|
|
}
|
|
}
|
|
}
|
|
|
|
// runMaintenance executes all maintenance tasks.
|
|
func (m *QueueMaintenance) runMaintenance() {
|
|
ctx, cancel := context.WithTimeout(m.ctx, 30*time.Second)
|
|
defer cancel()
|
|
|
|
m.requeueStaleTasks(ctx)
|
|
m.markStaleWorkers(ctx)
|
|
m.cleanupOldTasks(ctx)
|
|
}
|
|
|
|
// requeueStaleTasks requeues tasks that have been running too long
|
|
// (the worker likely crashed without reporting).
|
|
func (m *QueueMaintenance) requeueStaleTasks(ctx context.Context) {
|
|
count, err := m.queue.RequeueStale(ctx, m.staleTaskTimeout)
|
|
if err != nil {
|
|
m.logger.Warn("failed to requeue stale tasks", "error", err)
|
|
return
|
|
}
|
|
if count > 0 {
|
|
m.logger.Info("requeued stale tasks", "count", count)
|
|
}
|
|
}
|
|
|
|
// markStaleWorkers marks workers without recent heartbeats as offline.
|
|
func (m *QueueMaintenance) markStaleWorkers(ctx context.Context) {
|
|
count, err := m.registry.MarkStaleOffline(ctx, m.staleWorkerTimeout)
|
|
if err != nil {
|
|
m.logger.Warn("failed to mark stale workers offline", "error", err)
|
|
return
|
|
}
|
|
if count > 0 {
|
|
m.logger.Info("marked stale workers offline", "count", count)
|
|
}
|
|
}
|
|
|
|
// cleanupOldTasks removes completed/failed/cancelled tasks older than cleanup age.
|
|
func (m *QueueMaintenance) cleanupOldTasks(ctx context.Context) {
|
|
count, err := m.queue.CleanupOld(ctx, m.cleanupAge)
|
|
if err != nil {
|
|
m.logger.Warn("failed to cleanup old tasks", "error", err)
|
|
return
|
|
}
|
|
if count > 0 {
|
|
m.logger.Info("cleaned up old tasks", "count", count)
|
|
}
|
|
}
|
|
|
|
// refreshMetrics fetches queue stats and updates Prometheus gauges.
|
|
func (m *QueueMaintenance) refreshMetrics() {
|
|
ctx, cancel := context.WithTimeout(m.ctx, 5*time.Second)
|
|
defer cancel()
|
|
|
|
stats, err := m.queue.GetStats(ctx)
|
|
if err != nil {
|
|
m.logger.Warn("failed to get queue stats for metrics", "error", err)
|
|
return
|
|
}
|
|
|
|
metrics.SetWorkQueueDepth("pending", stats.Pending)
|
|
metrics.SetWorkQueueDepth("running", stats.Running)
|
|
metrics.SetWorkQueueDepth("completed", stats.Completed)
|
|
metrics.SetWorkQueueDepth("failed", stats.Failed)
|
|
metrics.SetWorkQueueDepth("cancelled", stats.Cancelled)
|
|
|
|
// Worker counts
|
|
workers, err := m.registry.List(ctx, port.WorkerFilter{})
|
|
if err != nil {
|
|
m.logger.Warn("failed to list workers for metrics", "error", err)
|
|
return
|
|
}
|
|
|
|
counts := map[string]int{
|
|
"idle": 0, "busy": 0, "draining": 0, "offline": 0,
|
|
}
|
|
for _, w := range workers {
|
|
counts[string(w.Status)]++
|
|
age := time.Since(w.LastHeartbeat).Seconds()
|
|
metrics.RecordWorkerHeartbeat(w.ID, age)
|
|
}
|
|
for status, count := range counts {
|
|
metrics.SetWorkerCount(status, count)
|
|
}
|
|
}
|