package postgres

import (
	"context"
	"database/sql"
	"testing"
	"time"

	"github.com/orchard9/rdev/internal/domain"
	"github.com/orchard9/rdev/internal/port"
	"github.com/orchard9/rdev/internal/testutil"
)

// cleanupTestWorkers deletes every row of the workers table whose id starts
// with "test-". It is best-effort: a failed delete is logged rather than
// failing the test that scheduled the cleanup.
func cleanupTestWorkers(t *testing.T, db *sql.DB) {
	t.Helper()
	if _, err := db.Exec("DELETE FROM workers WHERE id LIKE 'test-%'"); err != nil {
		t.Logf("cleanup test workers: %v", err)
	}
}

// TestWorkerRegistryRepository_Register checks that Register stores a new
// worker and that registering the same ID again overwrites the stored fields
// and puts the worker back into the idle state.
func TestWorkerRegistryRepository_Register(t *testing.T) {
	db := testutil.TestDB(t)
	t.Cleanup(func() { cleanupTestWorkers(t, db) })

	repo := NewWorkerRegistryRepository(db)
	ctx := context.Background()

	t.Run("registers new worker", func(t *testing.T) {
		worker := &domain.Worker{
			ID:           "test-worker-reg-1",
			Hostname:     "host-1",
			Capabilities: []string{"build", "test"},
			Version:      "1.0.0",
		}
		if err := repo.Register(ctx, worker); err != nil {
			t.Fatalf("Register() error = %v", err)
		}

		// Verify worker was stored.
		got, err := repo.Get(ctx, "test-worker-reg-1")
		if err != nil {
			t.Fatalf("Get() error = %v", err)
		}
		if got.Hostname != "host-1" {
			t.Errorf("got hostname %q, want %q", got.Hostname, "host-1")
		}
		if got.Status != domain.WorkerStatusIdle {
			t.Errorf("got status %q, want %q", got.Status, domain.WorkerStatusIdle)
		}
		if got.Version != "1.0.0" {
			t.Errorf("got version %q, want %q", got.Version, "1.0.0")
		}
	})

	t.Run("re-registers existing worker", func(t *testing.T) {
		worker := &domain.Worker{
			ID:       "test-worker-reg-2",
			Hostname: "host-2-old",
			Version:  "1.0.0",
		}
		if err := repo.Register(ctx, worker); err != nil {
			t.Fatalf("Register() error = %v", err)
		}

		// Move the worker out of idle first. Without this the status
		// assertion below would pass even if re-registration never touched
		// the status column.
		if err := repo.UpdateStatus(ctx, "test-worker-reg-2", domain.WorkerStatusBusy, "task-reg-2"); err != nil {
			t.Fatalf("UpdateStatus() error = %v", err)
		}

		// Update hostname and version via re-registration.
		worker.Hostname = "host-2-new"
		worker.Version = "2.0.0"
		if err := repo.Register(ctx, worker); err != nil {
			t.Fatalf("Register() re-register error = %v", err)
		}

		got, err := repo.Get(ctx, "test-worker-reg-2")
		if err != nil {
			t.Fatalf("Get() error = %v", err)
		}
		if got.Hostname != "host-2-new" {
			t.Errorf("got hostname %q, want %q", got.Hostname, "host-2-new")
		}
		if got.Version != "2.0.0" {
			t.Errorf("got version %q, want %q", got.Version, "2.0.0")
		}
		if got.Status != domain.WorkerStatusIdle {
			t.Errorf("got status %q, want %q (should reset on re-register)", got.Status, domain.WorkerStatusIdle)
		}
	})
}

// TestWorkerRegistryRepository_Heartbeat checks that Heartbeat advances the
// stored last_heartbeat of a registered worker and returns an error for an
// unknown worker ID.
func TestWorkerRegistryRepository_Heartbeat(t *testing.T) {
	db := testutil.TestDB(t)
	t.Cleanup(func() { cleanupTestWorkers(t, db) })

	repo := NewWorkerRegistryRepository(db)
	ctx := context.Background()

	t.Run("updates heartbeat", func(t *testing.T) {
		const id = "test-worker-hb-1"

		worker := &domain.Worker{
			ID:       id,
			Hostname: "host-1",
		}
		if err := repo.Register(ctx, worker); err != nil {
			t.Fatalf("Register() error = %v", err)
		}

		// Backdate the heartbeat by a wide margin instead of sleeping, so the
		// comparison below does not depend on timer resolution.
		_, err := db.ExecContext(ctx,
			"UPDATE workers SET last_heartbeat = $1 WHERE id = $2",
			time.Now().Add(-5*time.Minute), id)
		if err != nil {
			t.Fatalf("set heartbeat: %v", err)
		}

		// readHeartbeat fetches the stored value straight from the table so
		// both sides of the comparison go through the same read path.
		readHeartbeat := func() time.Time {
			t.Helper()
			var ts time.Time
			row := db.QueryRowContext(ctx, "SELECT last_heartbeat FROM workers WHERE id = $1", id)
			if err := row.Scan(&ts); err != nil {
				t.Fatalf("read heartbeat: %v", err)
			}
			return ts
		}

		before := readHeartbeat()
		if err := repo.Heartbeat(ctx, id); err != nil {
			t.Fatalf("Heartbeat() error = %v", err)
		}
		after := readHeartbeat()

		if !after.After(before) {
			t.Errorf("last_heartbeat = %v after Heartbeat(), want later than %v", after, before)
		}
	})

	t.Run("returns error for nonexistent worker", func(t *testing.T) {
		if err := repo.Heartbeat(ctx, "test-worker-nonexistent"); err == nil {
			t.Error("expected error for nonexistent worker")
		}
	})
}

// TestWorkerRegistryRepository_UpdateStatus checks status transitions on a
// single registered worker. The subtests run in order and share that worker:
// it is first set to busy with a task, then back to idle with no task.
func TestWorkerRegistryRepository_UpdateStatus(t *testing.T) {
	db := testutil.TestDB(t)
	t.Cleanup(func() { cleanupTestWorkers(t, db) })

	repo := NewWorkerRegistryRepository(db)
	ctx := context.Background()

	// Register a worker.
	worker := &domain.Worker{
		ID:       "test-worker-status-1",
		Hostname: "host-1",
	}
	if err := repo.Register(ctx, worker); err != nil {
		t.Fatalf("Register() error = %v", err)
	}

	t.Run("updates to busy with task", func(t *testing.T) {
		err := repo.UpdateStatus(ctx, "test-worker-status-1", domain.WorkerStatusBusy, "task-123")
		if err != nil {
			t.Fatalf("UpdateStatus() error = %v", err)
		}

		got, err := repo.Get(ctx, "test-worker-status-1")
		if err != nil {
			t.Fatalf("Get() error = %v", err)
		}
		if got.Status != domain.WorkerStatusBusy {
			t.Errorf("got status %q, want %q", got.Status, domain.WorkerStatusBusy)
		}
		if got.CurrentTask != "task-123" {
			t.Errorf("got current_task %q, want %q", got.CurrentTask, "task-123")
		}
	})

	t.Run("updates to idle clearing task", func(t *testing.T) {
		err := repo.UpdateStatus(ctx, "test-worker-status-1", domain.WorkerStatusIdle, "")
		if err != nil {
			t.Fatalf("UpdateStatus() error = %v", err)
		}

		got, err := repo.Get(ctx, "test-worker-status-1")
		if err != nil {
			t.Fatalf("Get() error = %v", err)
		}
		if got.Status != domain.WorkerStatusIdle {
			t.Errorf("got status %q, want %q", got.Status, domain.WorkerStatusIdle)
		}
		if got.CurrentTask != "" {
			t.Errorf("got current_task %q, want empty", got.CurrentTask)
		}
	})

	t.Run("returns error for nonexistent worker", func(t *testing.T) {
		err := repo.UpdateStatus(ctx, "test-worker-nonexistent", domain.WorkerStatusBusy, "")
		if err == nil {
			t.Error("expected error for nonexistent worker")
		}
	})
}

// TestWorkerRegistryRepository_Deregister checks that Deregister removes a
// registered worker (a later Get fails) and returns an error for an unknown
// worker ID.
func TestWorkerRegistryRepository_Deregister(t *testing.T) {
	db := testutil.TestDB(t)
	t.Cleanup(func() { cleanupTestWorkers(t, db) })

	repo := NewWorkerRegistryRepository(db)
	ctx := context.Background()

	t.Run("deregisters existing worker", func(t *testing.T) {
		worker := &domain.Worker{
			ID:       "test-worker-dereg-1",
			Hostname: "host-1",
		}
		if err := repo.Register(ctx, worker); err != nil {
			t.Fatalf("Register() error = %v", err)
		}

		if err := repo.Deregister(ctx, "test-worker-dereg-1"); err != nil {
			t.Fatalf("Deregister() error = %v", err)
		}

		// Verify worker was removed.
		if _, err := repo.Get(ctx, "test-worker-dereg-1"); err == nil {
			t.Error("expected error for deregistered worker")
		}
	})

	t.Run("returns error for nonexistent worker", func(t *testing.T) {
		if err := repo.Deregister(ctx, "test-worker-nonexistent"); err == nil {
			t.Error("expected error for nonexistent worker")
		}
	})
}

// TestWorkerRegistryRepository_List checks unfiltered listing, filtering by
// status, and the Limit field of port.WorkerFilter. Three workers are
// registered up front and the second one is made busy. The table may contain
// rows from elsewhere, so assertions only look at those three IDs or at
// properties that must hold for every returned row.
func TestWorkerRegistryRepository_List(t *testing.T) {
	db := testutil.TestDB(t)
	t.Cleanup(func() { cleanupTestWorkers(t, db) })

	repo := NewWorkerRegistryRepository(db)
	ctx := context.Background()

	// Register workers.
	workers := []*domain.Worker{
		{ID: "test-worker-list-1", Hostname: "host-1"},
		{ID: "test-worker-list-2", Hostname: "host-2"},
		{ID: "test-worker-list-3", Hostname: "host-3"},
	}
	for _, w := range workers {
		if err := repo.Register(ctx, w); err != nil {
			t.Fatalf("Register() error = %v", err)
		}
	}

	// Make one busy.
	if err := repo.UpdateStatus(ctx, "test-worker-list-2", domain.WorkerStatusBusy, "task-1"); err != nil {
		t.Fatalf("UpdateStatus() error = %v", err)
	}

	t.Run("lists all workers", func(t *testing.T) {
		got, err := repo.List(ctx, port.WorkerFilter{})
		if err != nil {
			t.Fatalf("List() error = %v", err)
		}

		// Filter to just our test workers.
		count := 0
		for _, w := range got {
			switch w.ID {
			case "test-worker-list-1", "test-worker-list-2", "test-worker-list-3":
				count++
			}
		}
		if count < 3 {
			t.Errorf("expected at least 3 test workers, got %d", count)
		}
	})

	t.Run("filters by status", func(t *testing.T) {
		idle := domain.WorkerStatusIdle
		got, err := repo.List(ctx, port.WorkerFilter{Status: &idle})
		if err != nil {
			t.Fatalf("List() error = %v", err)
		}

		idleFound := 0
		for _, w := range got {
			if w.Status != domain.WorkerStatusIdle {
				t.Errorf("got worker with status %q, want only idle", w.Status)
			}
			switch w.ID {
			case "test-worker-list-1", "test-worker-list-3":
				idleFound++
			}
		}
		// An empty result would satisfy the loop above, so also require that
		// the two idle test workers actually came back.
		if idleFound != 2 {
			t.Errorf("got %d of the idle test workers, want 2", idleFound)
		}
	})

	t.Run("respects limit", func(t *testing.T) {
		got, err := repo.List(ctx, port.WorkerFilter{Limit: 1})
		if err != nil {
			t.Fatalf("List() error = %v", err)
		}
		if len(got) > 1 {
			t.Errorf("got %d workers, want at most 1", len(got))
		}
	})
}

// TestWorkerRegistryRepository_MarkStaleOffline checks that MarkStaleOffline
// flips a worker whose heartbeat is older than the threshold to offline while
// leaving a freshly registered worker alone.
func TestWorkerRegistryRepository_MarkStaleOffline(t *testing.T) {
	db := testutil.TestDB(t)
	t.Cleanup(func() { cleanupTestWorkers(t, db) })

	repo := NewWorkerRegistryRepository(db)
	ctx := context.Background()

	// Register one worker that will be made stale and one that stays fresh.
	for _, w := range []*domain.Worker{
		{ID: "test-worker-stale-1", Hostname: "host-1"},
		{ID: "test-worker-stale-2", Hostname: "host-2"},
	} {
		if err := repo.Register(ctx, w); err != nil {
			t.Fatalf("Register() error = %v", err)
		}
	}

	t.Run("marks stale worker offline", func(t *testing.T) {
		// Set heartbeat to past.
		_, err := db.ExecContext(ctx,
			"UPDATE workers SET last_heartbeat = $1 WHERE id = $2",
			time.Now().Add(-5*time.Minute), "test-worker-stale-1")
		if err != nil {
			t.Fatalf("set heartbeat: %v", err)
		}

		count, err := repo.MarkStaleOffline(ctx, 90*time.Second)
		if err != nil {
			t.Fatalf("MarkStaleOffline() error = %v", err)
		}
		if count < 1 {
			t.Errorf("expected at least 1 worker marked offline, got %d", count)
		}

		got, err := repo.Get(ctx, "test-worker-stale-1")
		if err != nil {
			t.Fatalf("Get() error = %v", err)
		}
		if got.Status != domain.WorkerStatusOffline {
			t.Errorf("got status %q, want %q", got.Status, domain.WorkerStatusOffline)
		}

		// The worker registered moments ago must not be swept up.
		fresh, err := repo.Get(ctx, "test-worker-stale-2")
		if err != nil {
			t.Fatalf("Get() error = %v", err)
		}
		if fresh.Status == domain.WorkerStatusOffline {
			t.Errorf("fresh worker got status %q, want it left untouched", fresh.Status)
		}
	})
}