rdev/internal/worker/queue_maintenance_test.go
jordan 53862c773b fix: resolve systemic debt in worker and skeleton templates
Worker template fixes:
- Replace panic() with logger.Error() + os.Exit(1) for config errors
- Remove double-timeout application (context + middleware)
- Add error message truncation to prevent log bloat
- Use named constants for shutdown grace period and stale check interval

Skeleton pkg/auth fixes:
- Fix error wrapping to use %w consistently in jwt.go
- Add GetUserOrError() as safe alternative to MustGetUser() panic

Skeleton pkg/queue fixes:
- Check RowsAffected() errors instead of ignoring them
- Add input validation to EnqueueWithOptions (require job type, cap retries)
- Add log truncation for error messages
- Fix inaccurate doc comment claiming exponential backoff

Worker timeout consolidation:
- Add internal/worker/timeouts.go with named constants
- Migrate all workers to use timeout constants

Cleanup:
- Remove obsolete slack-preparation-thoughts.md files

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 23:44:55 -07:00

314 lines
7.8 KiB
Go

package worker
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/port"
)
// mockMaintenanceQueue implements port.WorkQueue for maintenance tests.
//
// Only GetStats, CleanupOld, RequeueStale and RequeueStaleWithIDs record
// calls and consult the canned results below; every other method is an inert
// stub returning zero values. All fields are guarded by mu, and tests must
// hold mu while reading the counters.
type mockMaintenanceQueue struct {
	mu sync.Mutex

	// Call counters. RequeueStale and RequeueStaleWithIDs share requeueCalls.
	requeueCalls int
	cleanupCalls int
	statsCalls   int

	// Canned results.
	requeueCount int64 // count from RequeueStale; number of IDs from RequeueStaleWithIDs
	cleanupCount int64 // count returned by CleanupOld
	stats        *domain.WorkQueueStats

	// err, when non-nil, is returned by every call-recording method.
	err error
}

// Compile-time check that the mock really satisfies the interface it stands in for.
var _ port.WorkQueue = (*mockMaintenanceQueue)(nil)
// newMockMaintenanceQueue builds a queue mock whose GetStats reports a fixed,
// non-zero snapshot; requeue/cleanup counts and err start at their zero values.
func newMockMaintenanceQueue() *mockMaintenanceQueue {
	snapshot := &domain.WorkQueueStats{
		Pending:   5,
		Running:   2,
		Completed: 100,
		Failed:    3,
		Cancelled: 1,
	}
	q := new(mockMaintenanceQueue)
	q.stats = snapshot
	return q
}
// The following methods exist only to satisfy port.WorkQueue. They neither
// record calls nor inspect the mock's state, and always return zero values.

func (*mockMaintenanceQueue) Enqueue(context.Context, *domain.WorkTask) (string, error) {
	return "", nil
}

func (*mockMaintenanceQueue) Dequeue(context.Context, string) (*domain.WorkTask, error) {
	return nil, nil
}

func (*mockMaintenanceQueue) Complete(context.Context, string, *domain.WorkResult) error {
	return nil
}

func (*mockMaintenanceQueue) Fail(context.Context, string, string) error {
	return nil
}

func (*mockMaintenanceQueue) FailWithCode(context.Context, string, string, domain.WorkErrorCode) error {
	return nil
}

func (*mockMaintenanceQueue) Cancel(context.Context, string) error {
	return nil
}

func (*mockMaintenanceQueue) GetTask(context.Context, string) (*domain.WorkTask, error) {
	return nil, nil
}

func (*mockMaintenanceQueue) ListByProject(context.Context, string, *domain.WorkTaskStatus, domain.WorkListOptions) (*domain.WorkListResult, error) {
	return nil, nil
}
// GetStats increments statsCalls and returns the canned stats pointer, or
// (nil, m.err) when an error has been configured.
func (m *mockMaintenanceQueue) GetStats(_ context.Context) (*domain.WorkQueueStats, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.statsCalls++
	var snapshot *domain.WorkQueueStats
	if m.err == nil {
		snapshot = m.stats
	}
	return snapshot, m.err
}
// CleanupOld increments cleanupCalls and reports cleanupCount as the number of
// rows removed, or (0, m.err) when an error has been configured. The age
// argument is ignored.
func (m *mockMaintenanceQueue) CleanupOld(_ context.Context, _ time.Duration) (int64, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.cleanupCalls++
	var removed int64
	if m.err == nil {
		removed = m.cleanupCount
	}
	return removed, m.err
}
// RequeueStale increments requeueCalls and reports requeueCount as the number
// of tasks requeued, or (0, m.err) when an error has been configured. The
// timeout argument is ignored.
func (m *mockMaintenanceQueue) RequeueStale(_ context.Context, _ time.Duration) (int64, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.requeueCalls++
	if m.err == nil {
		return m.requeueCount, nil
	}
	return 0, m.err
}
// RequeueStaleWithIDs increments requeueCalls (the same counter RequeueStale
// uses) and returns synthetic IDs "task-1" through "task-N", where N is
// requeueCount. For N <= 0 the returned slice is nil. Returns (nil, m.err)
// when an error has been configured.
func (m *mockMaintenanceQueue) RequeueStaleWithIDs(_ context.Context, _ time.Duration) ([]string, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.requeueCalls++
	if m.err != nil {
		return nil, m.err
	}

	var requeued []string
	for n := int64(1); n <= m.requeueCount; n++ {
		requeued = append(requeued, fmt.Sprintf("task-%d", n))
	}
	return requeued, nil
}
// mockMaintenanceRegistry implements port.WorkerRegistry for maintenance tests.
//
// List and MarkStaleOffline consult the canned state below; every other
// method is an inert stub returning zero values. All fields are guarded by mu,
// and tests must hold mu while reading markStaleCalls.
type mockMaintenanceRegistry struct {
	mu sync.Mutex

	markStaleCalls int              // number of MarkStaleOffline invocations
	markStaleCount int              // count returned by MarkStaleOffline
	workers        []*domain.Worker // slice returned by List
	err            error            // when non-nil, returned by List and MarkStaleOffline
}

// Compile-time check that the mock really satisfies the interface it stands in for.
var _ port.WorkerRegistry = (*mockMaintenanceRegistry)(nil)
// newMockMaintenanceRegistry builds a registry mock seeded with two workers:
// an idle one with a fresh heartbeat, and a busy one whose last heartbeat was
// five minutes ago.
func newMockMaintenanceRegistry() *mockMaintenanceRegistry {
	idle := &domain.Worker{
		ID:            "worker-1",
		Status:        domain.WorkerStatusIdle,
		LastHeartbeat: time.Now(),
	}
	busy := &domain.Worker{
		ID:            "worker-2",
		Status:        domain.WorkerStatusBusy,
		LastHeartbeat: time.Now().Add(-5 * time.Minute),
	}
	return &mockMaintenanceRegistry{workers: []*domain.Worker{idle, busy}}
}
// The following methods exist only to satisfy port.WorkerRegistry. They
// neither record calls nor inspect the mock's state, and always return zero
// values.

func (*mockMaintenanceRegistry) Register(context.Context, *domain.Worker) error {
	return nil
}

func (*mockMaintenanceRegistry) Heartbeat(context.Context, string) error {
	return nil
}

func (*mockMaintenanceRegistry) UpdateStatus(context.Context, string, domain.WorkerStatus, string) error {
	return nil
}

func (*mockMaintenanceRegistry) Deregister(context.Context, string) error {
	return nil
}

func (*mockMaintenanceRegistry) Get(context.Context, string) (*domain.Worker, error) {
	return nil, nil
}
// List ignores the filter and returns the seeded workers slice itself (not a
// copy), or (nil, m.err) when an error has been configured. It does not count
// calls.
func (m *mockMaintenanceRegistry) List(_ context.Context, _ port.WorkerFilter) ([]*domain.Worker, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.err == nil {
		return m.workers, nil
	}
	return nil, m.err
}
// MarkStaleOffline increments markStaleCalls and reports markStaleCount as the
// number of workers marked offline, or (0, m.err) when an error has been
// configured. The timeout argument is ignored.
func (m *mockMaintenanceRegistry) MarkStaleOffline(_ context.Context, _ time.Duration) (int, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.markStaleCalls++
	var marked int
	if m.err == nil {
		marked = m.markStaleCount
	}
	return marked, m.err
}
// TestQueueMaintenance_DefaultConfig pins every field of the default
// maintenance configuration.
func TestQueueMaintenance_DefaultConfig(t *testing.T) {
	cfg := DefaultQueueMaintenanceConfig()

	expectations := []struct {
		field string
		got   time.Duration
		want  time.Duration
		shown string // human-readable form of want used in the failure message
	}{
		{"StaleTaskTimeout", cfg.StaleTaskTimeout, 30 * time.Minute, "30m"},
		{"StaleWorkerTimeout", cfg.StaleWorkerTimeout, 2 * time.Minute, "2m"},
		{"CleanupAge", cfg.CleanupAge, 7 * 24 * time.Hour, "7d"},
		{"MaintenancePeriod", cfg.MaintenancePeriod, 1 * time.Minute, "1m"},
		{"MetricsPeriod", cfg.MetricsPeriod, 15 * time.Second, "15s"},
	}

	for _, e := range expectations {
		if e.got != e.want {
			t.Errorf("got %s=%v, want %s", e.field, e.got, e.shown)
		}
	}
}
// TestQueueMaintenance_RunMaintenance drives one maintenance pass by hand
// (Start is never called) and checks that it invokes stale-task requeueing,
// old-task cleanup and stale-worker marking exactly once each.
func TestQueueMaintenance_RunMaintenance(t *testing.T) {
	q := newMockMaintenanceQueue()
	q.requeueCount, q.cleanupCount = 2, 5

	reg := newMockMaintenanceRegistry()
	reg.markStaleCount = 1

	// The periods are irrelevant because nothing is started; they are long
	// so that no timer could fire during the test.
	m := NewQueueMaintenance(q, reg, &QueueMaintenanceConfig{
		StaleTaskTimeout:   30 * time.Minute,
		StaleWorkerTimeout: 2 * time.Minute,
		CleanupAge:         7 * 24 * time.Hour,
		MaintenancePeriod:  1 * time.Hour,
		MetricsPeriod:      1 * time.Hour,
	})

	m.runMaintenance()

	q.mu.Lock()
	defer q.mu.Unlock()
	reg.mu.Lock()
	defer reg.mu.Unlock()

	for _, c := range []struct {
		counter string
		calls   int
	}{
		{"requeueCalls", q.requeueCalls},
		{"cleanupCalls", q.cleanupCalls},
		{"markStaleCalls", reg.markStaleCalls},
	} {
		if c.calls != 1 {
			t.Errorf("got %s=%d, want 1", c.counter, c.calls)
		}
	}
}
// TestQueueMaintenance_RefreshMetrics drives one metrics refresh by hand and
// checks that it fetches queue stats exactly once.
func TestQueueMaintenance_RefreshMetrics(t *testing.T) {
	q := newMockMaintenanceQueue()
	m := NewQueueMaintenance(q, newMockMaintenanceRegistry(), &QueueMaintenanceConfig{
		StaleTaskTimeout:   30 * time.Minute,
		StaleWorkerTimeout: 2 * time.Minute,
		CleanupAge:         7 * 24 * time.Hour,
		MaintenancePeriod:  1 * time.Hour,
		MetricsPeriod:      1 * time.Hour,
	})

	m.refreshMetrics()

	q.mu.Lock()
	got := q.statsCalls
	q.mu.Unlock()

	if got != 1 {
		t.Errorf("got statsCalls=%d, want 1", got)
	}
}
// TestQueueMaintenance_StartStop starts the background loops with short
// periods, waits (up to 2s) until a maintenance pass and a metrics refresh
// have each been observed, and then stops the service.
func TestQueueMaintenance_StartStop(t *testing.T) {
	queue := newMockMaintenanceQueue()
	registry := newMockMaintenanceRegistry()
	cfg := &QueueMaintenanceConfig{
		StaleTaskTimeout:   30 * time.Minute,
		StaleWorkerTimeout: 2 * time.Minute,
		CleanupAge:         7 * 24 * time.Hour,
		MaintenancePeriod:  50 * time.Millisecond,
		MetricsPeriod:      50 * time.Millisecond,
	}
	m := NewQueueMaintenance(queue, registry, cfg)
	m.Start()
	// Deferred so the background loops are stopped even when t.Fatalf below
	// aborts the test: FailNow exits via runtime.Goexit, which still runs
	// deferred calls. With a plain trailing Stop, a timeout would leak
	// goroutines that keep hitting the mocks while later tests run.
	defer m.Stop()

	// Poll until every step of a maintenance pass (requeue, cleanup,
	// mark-stale) and a metrics refresh have each run at least once.
	deadline := time.After(2 * time.Second)
	for {
		queue.mu.Lock()
		rCalls := queue.requeueCalls
		cCalls := queue.cleanupCalls
		sCalls := queue.statsCalls
		queue.mu.Unlock()

		registry.mu.Lock()
		mCalls := registry.markStaleCalls
		registry.mu.Unlock()

		if rCalls >= 1 && cCalls >= 1 && sCalls >= 1 && mCalls >= 1 {
			return
		}

		select {
		case <-deadline:
			t.Fatalf("timed out waiting for maintenance to run: requeue=%d cleanup=%d stats=%d markStale=%d",
				rCalls, cCalls, sCalls, mCalls)
		default:
			time.Sleep(10 * time.Millisecond)
		}
	}
}
func TestQueueMaintenance_NilConfig(t *testing.T) {
queue := newMockMaintenanceQueue()
registry := newMockMaintenanceRegistry()
m := NewQueueMaintenance(queue, registry, nil)
if m.staleTaskTimeout != 30*time.Minute {
t.Errorf("expected default stale task timeout, got %v", m.staleTaskTimeout)
}
if m.metricsPeriod != 15*time.Second {
t.Errorf("expected default metrics period, got %v", m.metricsPeriod)
}
}