package worker

import (
	"context"
	"fmt"
	"log/slog"
	"sync"
	"testing"
	"time"

	"github.com/orchard9/rdev/internal/domain"
	"github.com/orchard9/rdev/internal/port"
)

// mockMaintenanceQueue implements port.WorkQueue for maintenance tests.
//
// Only GetStats, CleanupOld, RequeueStale and RequeueStaleWithIDs have
// behavior: each increments its call counter under mu and then returns either
// err (when set) or the configured canned result. All other methods are no-op
// stubs. Note that RequeueStale and RequeueStaleWithIDs share requeueCalls.
type mockMaintenanceQueue struct {
	mu           sync.Mutex
	requeueCalls int
	cleanupCalls int
	statsCalls   int
	requeueCount int64
	cleanupCount int64
	stats        *domain.WorkQueueStats
	err          error
}

// newMockMaintenanceQueue returns a mock queue with fixed, non-zero stats and
// no injected error.
func newMockMaintenanceQueue() *mockMaintenanceQueue {
	return &mockMaintenanceQueue{
		stats: &domain.WorkQueueStats{
			Pending:   5,
			Running:   2,
			Completed: 100,
			Failed:    3,
			Cancelled: 1,
		},
	}
}

// Enqueue is a no-op stub; it returns an empty ID and no error.
func (m *mockMaintenanceQueue) Enqueue(_ context.Context, _ *domain.WorkTask) (string, error) {
	return "", nil
}

// Dequeue is a no-op stub; it returns no task and no error.
func (m *mockMaintenanceQueue) Dequeue(_ context.Context, _ string) (*domain.WorkTask, error) {
	return nil, nil
}

// Complete is a no-op stub.
func (m *mockMaintenanceQueue) Complete(_ context.Context, _ string, _ *domain.WorkResult) error {
	return nil
}

// Fail is a no-op stub.
func (m *mockMaintenanceQueue) Fail(_ context.Context, _ string, _ string) error {
	return nil
}

// Cancel is a no-op stub.
func (m *mockMaintenanceQueue) Cancel(_ context.Context, _ string) error {
	return nil
}

// GetTask is a no-op stub; it returns no task and no error.
func (m *mockMaintenanceQueue) GetTask(_ context.Context, _ string) (*domain.WorkTask, error) {
	return nil, nil
}

// ListByProject is a no-op stub; it returns no result and no error.
func (m *mockMaintenanceQueue) ListByProject(_ context.Context, _ string, _ *domain.WorkTaskStatus, _ domain.WorkListOptions) (*domain.WorkListResult, error) {
	return nil, nil
}

// GetStats counts the call and returns the configured stats, or err if set.
func (m *mockMaintenanceQueue) GetStats(_ context.Context) (*domain.WorkQueueStats, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.statsCalls++
	if m.err != nil {
		return nil, m.err
	}
	return m.stats, nil
}

// CleanupOld counts the call and returns cleanupCount, or err if set.
func (m *mockMaintenanceQueue) CleanupOld(_ context.Context, _ time.Duration) (int64, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.cleanupCalls++
	if m.err != nil {
		return 0, m.err
	}
	return m.cleanupCount, nil
}

// RequeueStale counts the call and returns requeueCount, or err if set.
func (m *mockMaintenanceQueue) RequeueStale(_ context.Context, _ time.Duration) (int64, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.requeueCalls++
	if m.err != nil {
		return 0, m.err
	}
	return m.requeueCount, nil
}

// RequeueStaleWithIDs counts the call (in the same counter as RequeueStale)
// and returns requeueCount synthetic IDs "task-1".."task-N", or err if set.
// The returned slice is nil when requeueCount is zero.
func (m *mockMaintenanceQueue) RequeueStaleWithIDs(_ context.Context, _ time.Duration) ([]string, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.requeueCalls++
	if m.err != nil {
		return nil, m.err
	}

	// Generate mock IDs based on requeueCount.
	var ids []string
	for i := int64(0); i < m.requeueCount; i++ {
		ids = append(ids, fmt.Sprintf("task-%d", i+1))
	}
	return ids, nil
}

// mockMaintenanceRegistry implements port.WorkerRegistry for maintenance tests.
//
// Only List and MarkStaleOffline have behavior; both honor err, and
// MarkStaleOffline additionally counts its calls. All other methods are no-op
// stubs.
type mockMaintenanceRegistry struct {
	mu             sync.Mutex
	markStaleCalls int
	markStaleCount int
	workers        []*domain.Worker
	err            error
}

// newMockMaintenanceRegistry returns a mock registry holding two workers: an
// idle one with a current heartbeat and a busy one whose heartbeat is five
// minutes old.
func newMockMaintenanceRegistry() *mockMaintenanceRegistry {
	return &mockMaintenanceRegistry{
		workers: []*domain.Worker{
			{
				ID:            "worker-1",
				Status:        domain.WorkerStatusIdle,
				LastHeartbeat: time.Now(),
			},
			{
				ID:            "worker-2",
				Status:        domain.WorkerStatusBusy,
				LastHeartbeat: time.Now().Add(-5 * time.Minute),
			},
		},
	}
}

// Register is a no-op stub.
func (m *mockMaintenanceRegistry) Register(_ context.Context, _ *domain.Worker) error {
	return nil
}

// Heartbeat is a no-op stub.
func (m *mockMaintenanceRegistry) Heartbeat(_ context.Context, _ string) error {
	return nil
}

// UpdateStatus is a no-op stub.
func (m *mockMaintenanceRegistry) UpdateStatus(_ context.Context, _ string, _ domain.WorkerStatus, _ string) error {
	return nil
}

// Deregister is a no-op stub.
func (m *mockMaintenanceRegistry) Deregister(_ context.Context, _ string) error {
	return nil
}

// Get is a no-op stub; it returns no worker and no error.
func (m *mockMaintenanceRegistry) Get(_ context.Context, _ string) (*domain.Worker, error) {
	return nil, nil
}

// List returns the configured workers regardless of filter, or err if set.
func (m *mockMaintenanceRegistry) List(_ context.Context, _ port.WorkerFilter) ([]*domain.Worker, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.err != nil {
		return nil, m.err
	}
	return m.workers, nil
}

// MarkStaleOffline counts the call and returns markStaleCount, or err if set.
func (m *mockMaintenanceRegistry) MarkStaleOffline(_ context.Context, _ time.Duration) (int, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.markStaleCalls++
	if m.err != nil {
		return 0, m.err
	}
	return m.markStaleCount, nil
}

// TestQueueMaintenance_DefaultConfig pins the values returned by
// DefaultQueueMaintenanceConfig.
func TestQueueMaintenance_DefaultConfig(t *testing.T) {
	cfg := DefaultQueueMaintenanceConfig()

	if cfg.StaleTaskTimeout != 30*time.Minute {
		t.Errorf("got StaleTaskTimeout=%v, want 30m", cfg.StaleTaskTimeout)
	}
	if cfg.StaleWorkerTimeout != 2*time.Minute {
		t.Errorf("got StaleWorkerTimeout=%v, want 2m", cfg.StaleWorkerTimeout)
	}
	if cfg.CleanupAge != 7*24*time.Hour {
		t.Errorf("got CleanupAge=%v, want 7d", cfg.CleanupAge)
	}
	if cfg.MaintenancePeriod != 1*time.Minute {
		t.Errorf("got MaintenancePeriod=%v, want 1m", cfg.MaintenancePeriod)
	}
	if cfg.MetricsPeriod != 15*time.Second {
		t.Errorf("got MetricsPeriod=%v, want 15s", cfg.MetricsPeriod)
	}
}

// TestQueueMaintenance_RunMaintenance checks that a single runMaintenance call
// performs exactly one requeue, one cleanup and one stale-worker sweep.
func TestQueueMaintenance_RunMaintenance(t *testing.T) {
	queue := newMockMaintenanceQueue()
	queue.requeueCount = 2
	queue.cleanupCount = 5

	registry := newMockMaintenanceRegistry()
	registry.markStaleCount = 1

	cfg := &QueueMaintenanceConfig{
		StaleTaskTimeout:   30 * time.Minute,
		StaleWorkerTimeout: 2 * time.Minute,
		CleanupAge:         7 * 24 * time.Hour,
		MaintenancePeriod:  1 * time.Hour, // won't fire in test
		MetricsPeriod:      1 * time.Hour, // won't fire in test
		Logger:             slog.Default(),
	}

	m := NewQueueMaintenance(queue, registry, cfg)

	// Run maintenance directly.
	m.runMaintenance()

	// Hold both locks for the remainder of the test while reading counters.
	queue.mu.Lock()
	defer queue.mu.Unlock()
	registry.mu.Lock()
	defer registry.mu.Unlock()

	if queue.requeueCalls != 1 {
		t.Errorf("got requeueCalls=%d, want 1", queue.requeueCalls)
	}
	if queue.cleanupCalls != 1 {
		t.Errorf("got cleanupCalls=%d, want 1", queue.cleanupCalls)
	}
	if registry.markStaleCalls != 1 {
		t.Errorf("got markStaleCalls=%d, want 1", registry.markStaleCalls)
	}
}

// TestQueueMaintenance_RefreshMetrics checks that a single refreshMetrics call
// queries the queue stats exactly once.
func TestQueueMaintenance_RefreshMetrics(t *testing.T) {
	queue := newMockMaintenanceQueue()
	registry := newMockMaintenanceRegistry()

	cfg := &QueueMaintenanceConfig{
		StaleTaskTimeout:   30 * time.Minute,
		StaleWorkerTimeout: 2 * time.Minute,
		CleanupAge:         7 * 24 * time.Hour,
		MaintenancePeriod:  1 * time.Hour,
		MetricsPeriod:      1 * time.Hour,
		Logger:             slog.Default(),
	}

	m := NewQueueMaintenance(queue, registry, cfg)

	// Run metrics refresh directly.
	m.refreshMetrics()

	queue.mu.Lock()
	if queue.statsCalls != 1 {
		t.Errorf("got statsCalls=%d, want 1", queue.statsCalls)
	}
	queue.mu.Unlock()
}

// TestQueueMaintenance_StartStop starts the background loops with short
// periods and waits (up to two seconds) until requeue, stats and
// stale-worker calls have each been observed at least once.
func TestQueueMaintenance_StartStop(t *testing.T) {
	queue := newMockMaintenanceQueue()
	registry := newMockMaintenanceRegistry()

	cfg := &QueueMaintenanceConfig{
		StaleTaskTimeout:   30 * time.Minute,
		StaleWorkerTimeout: 2 * time.Minute,
		CleanupAge:         7 * 24 * time.Hour,
		MaintenancePeriod:  50 * time.Millisecond,
		MetricsPeriod:      50 * time.Millisecond,
		Logger:             slog.Default(),
	}

	m := NewQueueMaintenance(queue, registry, cfg)

	m.Start()
	// Deferred so the background work is stopped even when the timeout below
	// aborts the test via t.Fatalf; otherwise it would leak into later tests.
	defer m.Stop()

	// Poll until maintenance has run at least once (runs immediately on start).
	deadline := time.After(2 * time.Second)
	for {
		queue.mu.Lock()
		rCalls := queue.requeueCalls
		sCalls := queue.statsCalls
		queue.mu.Unlock()

		registry.mu.Lock()
		mCalls := registry.markStaleCalls
		registry.mu.Unlock()

		if rCalls >= 1 && sCalls >= 1 && mCalls >= 1 {
			return
		}

		select {
		case <-deadline:
			t.Fatalf("timed out waiting for maintenance to run: requeue=%d stats=%d markStale=%d", rCalls, sCalls, mCalls)
		default:
			time.Sleep(10 * time.Millisecond)
		}
	}
}

// TestQueueMaintenance_NilConfig checks that passing a nil config to
// NewQueueMaintenance yields the default stale-task timeout and metrics period.
func TestQueueMaintenance_NilConfig(t *testing.T) {
	queue := newMockMaintenanceQueue()
	registry := newMockMaintenanceRegistry()

	m := NewQueueMaintenance(queue, registry, nil)

	if m.staleTaskTimeout != 30*time.Minute {
		t.Errorf("expected default stale task timeout, got %v", m.staleTaskTimeout)
	}
	if m.metricsPeriod != 15*time.Second {
		t.Errorf("expected default metrics period, got %v", m.metricsPeriod)
	}
}