From d74efb75ff19fca59653dc0a724af1d800e4bd65 Mon Sep 17 00:00:00 2001 From: jordan Date: Fri, 6 Feb 2026 10:35:39 -0700 Subject: [PATCH] fix: wire workService to WorkersHandler and add /work/tasks endpoint Critical fix: WorkersHandler was missing workService dependency, causing 500 errors when workers tried to fail tasks. This caused tasks to get stuck in "running" state permanently. Also adds: - /work/tasks endpoint for debugging all tasks across projects - List method to WorkQueue interface for admin views - HTTP client tests for api_client.go and claudebox/client.go (48 tests) - Split work.go DTOs into work_dto.go to stay under 500 lines Co-Authored-By: Claude Opus 4.5 --- cmd/rdev-api/main.go | 2 +- internal/adapter/claudebox/client_test.go | 774 ++++++++++++++++++ .../adapter/postgres/work_queue_queries.go | 57 ++ internal/adapter/sdlc/worker_executor_test.go | 4 + internal/handlers/work.go | 134 ++- internal/handlers/work_dto.go | 85 ++ internal/handlers/work_test.go | 33 + internal/port/work_queue.go | 4 + internal/service/mock_test.go | 4 + internal/service/work_service.go | 5 + internal/worker/api_client_test.go | 584 +++++++++++++ internal/worker/mock_test.go | 3 + internal/worker/queue_maintenance_test.go | 4 + 13 files changed, 1613 insertions(+), 80 deletions(-) create mode 100644 internal/adapter/claudebox/client_test.go create mode 100644 internal/handlers/work_dto.go create mode 100644 internal/worker/api_client_test.go diff --git a/cmd/rdev-api/main.go b/cmd/rdev-api/main.go index fead6ba..1eb45bd 100644 --- a/cmd/rdev-api/main.go +++ b/cmd/rdev-api/main.go @@ -423,7 +423,7 @@ func main() { agentsHandler := handlers.NewAgentsHandler(agentRegistry) // Initialize worker pool handlers - workersHandler := handlers.NewWorkersHandler(workerService) + workersHandler := handlers.NewWorkersHandler(workerService).WithWorkService(workService) buildsHandler := handlers.NewBuildsHandler(buildService) createAndBuildHandler := handlers.NewCreateAndBuildHandler(projectInfraService, buildService) diff --git a/internal/adapter/claudebox/client_test.go b/internal/adapter/claudebox/client_test.go new file mode 100644 index 0000000..995aa21 --- /dev/null +++ b/internal/adapter/claudebox/client_test.go @@ -0,0 +1,774 @@ +package claudebox + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" +) + +func TestNewClient_DefaultTimeout(t *testing.T) { + client := NewClient(ClientConfig{ + BaseURL: "http://localhost:8080", + }) + + if client.httpClient.Timeout != 10*time.Minute { + t.Errorf("expected default timeout 10m, got %v", client.httpClient.Timeout) + } +} + +func TestNewClient_CustomTimeout(t *testing.T) { + client := NewClient(ClientConfig{ + BaseURL: "http://localhost:8080", + Timeout: 5 * time.Minute, + }) + + if client.httpClient.Timeout != 5*time.Minute { + t.Errorf("expected timeout 5m, got %v", client.httpClient.Timeout) + } +} + +func TestNewClient_TrimsTrailingSlash(t *testing.T) { + client := NewClient(ClientConfig{ + BaseURL: "http://localhost:8080/", + }) + + if client.baseURL != "http://localhost:8080" { + t.Errorf("expected trailing slash trimmed, got %s", client.baseURL) + } +} + +func TestHealth_Success(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + t.Errorf("expected GET, got %s", r.Method) + } + if r.URL.Path != "/health" { + t.Errorf("expected /health, got %s", r.URL.Path) + } + + resp := HealthResponse{ + Status: "healthy", + Timestamp: "2024-01-15T10:30:00Z", + WorkDir: "/workspace", + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(resp) + })) + defer server.Close() + + client := NewClient(ClientConfig{BaseURL: server.URL}) + + health, err := client.Health(context.Background()) + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if health.Status != "healthy" { + t.Errorf("expected status 'healthy', got %s", health.Status) + } + if health.WorkDir != "/workspace" { + t.Errorf("expected work_dir '/workspace', got %s", health.WorkDir) + } +} + +func TestHealth_Unhealthy(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusServiceUnavailable) + })) + defer server.Close() + + client := NewClient(ClientConfig{BaseURL: server.URL}) + + _, err := client.Health(context.Background()) + + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "health check returned status 503") { + t.Errorf("expected 503 error, got %v", err) + } +} + +func TestHealth_NetworkError(t *testing.T) { + client := NewClient(ClientConfig{ + BaseURL: "http://localhost:1", + Timeout: 100 * time.Millisecond, + }) + + _, err := client.Health(context.Background()) + + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "health check:") { + t.Errorf("expected error wrapped with 'health check:', got %v", err) + } +} + +func TestExecute_Success(t *testing.T) { + var receivedReq ExecuteRequest + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + t.Errorf("expected POST, got %s", r.Method) + } + if r.URL.Path != "/execute" { + t.Errorf("expected /execute, got %s", r.URL.Path) + } + if r.Header.Get("Content-Type") != "application/json" { + t.Errorf("expected Content-Type application/json, got %s", r.Header.Get("Content-Type")) + } + + body, _ := io.ReadAll(r.Body) + _ = json.Unmarshal(body, &receivedReq) + + resp := ExecuteResponse{ + Success: true, + Output: "Task completed", + ExitCode: 0, + DurationMs: 5000, + SessionID: "session-123", + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(resp) + })) + defer server.Close() + + client := NewClient(ClientConfig{BaseURL: server.URL}) + + req := &ExecuteRequest{ + Prompt: "Build the project", + AllowedTools: []string{"Bash", "Read", "Write"}, + WorkingDir: "/workspace/project", + Timeout: 300, + Metadata: map[string]string{"task_id": "task-1"}, + } + + result, err := client.Execute(context.Background(), req) + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !result.Success { + t.Error("expected success=true") + } + if result.Output != "Task completed" { + t.Errorf("expected output 'Task completed', got %s", result.Output) + } + if result.ExitCode != 0 { + t.Errorf("expected exit code 0, got %d", result.ExitCode) + } + + // Verify request was serialized correctly + if receivedReq.Prompt != "Build the project" { + t.Errorf("expected prompt 'Build the project', got %s", receivedReq.Prompt) + } + if len(receivedReq.AllowedTools) != 3 { + t.Errorf("expected 3 allowed tools, got %d", len(receivedReq.AllowedTools)) + } +} + +func TestExecute_ErrorStatus(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(`{"error":"invalid prompt"}`)) + })) + defer server.Close() + + client := NewClient(ClientConfig{BaseURL: server.URL}) + + _, err := client.Execute(context.Background(), &ExecuteRequest{Prompt: ""}) + + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "execute returned status 400") { + t.Errorf("expected 400 error, got %v", err) + } + if !strings.Contains(err.Error(), "invalid prompt") { + t.Errorf("expected error body in message, got %v", err) + } +} + +func TestExecute_MalformedResponse(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{invalid json`)) + })) + defer server.Close() + + client := NewClient(ClientConfig{BaseURL: server.URL}) + + _, err := client.Execute(context.Background(), &ExecuteRequest{Prompt: "test"}) + + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "decode response") { + t.Errorf("expected decode error, got %v", err) + } +} + +func TestExecuteStream_Success(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + t.Errorf("expected POST, got %s", r.Method) + } + if r.URL.Path != "/execute/stream" { + t.Errorf("expected /execute/stream, got %s", r.URL.Path) + } + if r.Header.Get("Accept") != "text/event-stream" { + t.Errorf("expected Accept text/event-stream, got %s", r.Header.Get("Accept")) + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + + flusher, ok := w.(http.Flusher) + if !ok { + t.Fatal("expected http.Flusher") + } + + events := []StreamEvent{ + {Type: "start", Timestamp: "2024-01-15T10:30:00Z"}, + {Type: "output", Content: "Building...", Timestamp: "2024-01-15T10:30:01Z"}, + {Type: "tool_call", ToolName: "Bash", Timestamp: "2024-01-15T10:30:02Z"}, + {Type: "complete", Content: "Done", Timestamp: "2024-01-15T10:30:05Z"}, + } + + for _, event := range events { + data, _ := json.Marshal(event) + fmt.Fprintf(w, "data: %s\n\n", data) + flusher.Flush() + } + })) + defer server.Close() + + client := NewClient(ClientConfig{BaseURL: server.URL}) + + var receivedEvents []StreamEvent + var mu sync.Mutex + + err := client.ExecuteStream(context.Background(), &ExecuteRequest{Prompt: "build"}, func(event StreamEvent) { + mu.Lock() + receivedEvents = append(receivedEvents, event) + mu.Unlock() + }) + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(receivedEvents) != 4 { + t.Fatalf("expected 4 events, got %d", len(receivedEvents)) + } + if receivedEvents[0].Type != "start" { + t.Errorf("expected first event type 'start', got %s", receivedEvents[0].Type) + } + if receivedEvents[1].Content != "Building..." { + t.Errorf("expected second event content 'Building...', got %s", receivedEvents[1].Content) + } + if receivedEvents[2].ToolName != "Bash" { + t.Errorf("expected third event tool name 'Bash', got %s", receivedEvents[2].ToolName) + } +} + +func TestExecuteStream_SkipsMalformedEvents(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + + flusher, _ := w.(http.Flusher) + + // Valid event + event1, _ := json.Marshal(StreamEvent{Type: "start"}) + fmt.Fprintf(w, "data: %s\n\n", event1) + flusher.Flush() + + // Malformed JSON - should be skipped + fmt.Fprintf(w, "data: {invalid json}\n\n") + flusher.Flush() + + // Empty data - should be skipped + fmt.Fprintf(w, "data: \n\n") + flusher.Flush() + + // Non-data line - should be skipped + fmt.Fprintf(w, "event: ping\n\n") + flusher.Flush() + + // Valid event + event2, _ := json.Marshal(StreamEvent{Type: "complete"}) + fmt.Fprintf(w, "data: %s\n\n", event2) + flusher.Flush() + })) + defer server.Close() + + client := NewClient(ClientConfig{BaseURL: server.URL}) + + var receivedEvents []StreamEvent + var mu sync.Mutex + + err := client.ExecuteStream(context.Background(), &ExecuteRequest{Prompt: "test"}, func(event StreamEvent) { + mu.Lock() + receivedEvents = append(receivedEvents, event) + mu.Unlock() + }) + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Should only receive the 2 valid events + if len(receivedEvents) != 2 { + t.Fatalf("expected 2 events (malformed skipped), got %d", len(receivedEvents)) + } + if receivedEvents[0].Type != "start" { + t.Errorf("expected first event 'start', got %s", receivedEvents[0].Type) + } + if receivedEvents[1].Type != "complete" { + t.Errorf("expected second event 'complete', got %s", receivedEvents[1].Type) + } +} + +func TestExecuteStream_ErrorStatus(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte(`{"error":"agent unavailable"}`)) + })) + defer server.Close() + + client := NewClient(ClientConfig{BaseURL: server.URL}) + + err := client.ExecuteStream(context.Background(), &ExecuteRequest{Prompt: "test"}, func(event StreamEvent) { + t.Error("handler should not be called on error") + }) + + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "execute stream returned status 500") { + t.Errorf("expected 500 error, got %v", err) + } +} + +func TestExecuteStream_ContextCanceledBeforeRequest(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Error("handler should not be called when context is already canceled") + })) + defer server.Close() + + client := NewClient(ClientConfig{BaseURL: server.URL}) + + // Cancel context before making request + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + err := client.ExecuteStream(ctx, &ExecuteRequest{Prompt: "test"}, func(event StreamEvent) {}) + + if err == nil { + t.Fatal("expected error, got nil") + } + // Should get a context canceled error +} + +func TestGitClone_Success(t *testing.T) { + var receivedReq GitCloneRequest + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + t.Errorf("expected POST, got %s", r.Method) + } + if r.URL.Path != "/git/clone" { + t.Errorf("expected /git/clone, got %s", r.URL.Path) + } + + body, _ := io.ReadAll(r.Body) + _ = json.Unmarshal(body, &receivedReq) + + resp := GitCloneResponse{ + Success: true, + Cloned: true, + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(resp) + })) + defer server.Close() + + client := NewClient(ClientConfig{BaseURL: server.URL}) + + result, err := client.GitClone(context.Background(), "https://github.com/example/repo.git", "/workspace") + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !result.Success { + t.Error("expected success=true") + } + if !result.Cloned { + t.Error("expected cloned=true") + } + if receivedReq.CloneURL != "https://github.com/example/repo.git" { + t.Errorf("expected clone URL, got %s", receivedReq.CloneURL) + } + if receivedReq.WorkDir != "/workspace" { + t.Errorf("expected work dir '/workspace', got %s", receivedReq.WorkDir) + } +} + +func TestGitClone_AlreadyExists(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + resp := GitCloneResponse{ + Success: true, + Cloned: false, // Already existed, just updated + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(resp) + })) + defer server.Close() + + client := NewClient(ClientConfig{BaseURL: server.URL}) + + result, err := client.GitClone(context.Background(), "https://github.com/example/repo.git", "/workspace") + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !result.Success { + t.Error("expected success=true") + } + if result.Cloned { + t.Error("expected cloned=false for existing repo") + } +} + +func TestGitClone_ErrorStatus(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(`{"error":"invalid clone URL"}`)) + })) + defer server.Close() + + client := NewClient(ClientConfig{BaseURL: server.URL}) + + _, err := client.GitClone(context.Background(), "invalid", "/workspace") + + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "git clone returned status 400") { + t.Errorf("expected 400 error, got %v", err) + } +} + +func TestGitCommitAndPush_Success(t *testing.T) { + var receivedReq GitCommitAndPushRequest + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + t.Errorf("expected POST, got %s", r.Method) + } + if r.URL.Path != "/git/commit-and-push" { + t.Errorf("expected /git/commit-and-push, got %s", r.URL.Path) + } + + body, _ := io.ReadAll(r.Body) + _ = json.Unmarshal(body, &receivedReq) + + resp := GitCommitAndPushResponse{ + Success: true, + HasChanges: true, + CommitSHA: "abc123def456", + FilesChanged: []string{"main.go", "go.mod"}, + Pushed: true, + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(resp) + })) + defer server.Close() + + client := NewClient(ClientConfig{BaseURL: server.URL}) + + result, err := client.GitCommitAndPush(context.Background(), "feat: add feature", true, "/workspace") + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !result.Success { + t.Error("expected success=true") + } + if !result.HasChanges { + t.Error("expected has_changes=true") + } + if result.CommitSHA != "abc123def456" { + t.Errorf("expected commit SHA abc123def456, got %s", result.CommitSHA) + } + if !result.Pushed { + t.Error("expected pushed=true") + } + if len(result.FilesChanged) != 2 { + t.Errorf("expected 2 files changed, got %d", len(result.FilesChanged)) + } + + // Verify request + if receivedReq.Message != "feat: add feature" { + t.Errorf("expected message 'feat: add feature', got %s", receivedReq.Message) + } + if !receivedReq.Push { + t.Error("expected push=true in request") + } +} + +func TestGitCommitAndPush_NoChanges(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + resp := GitCommitAndPushResponse{ + Success: true, + HasChanges: false, + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(resp) + })) + defer server.Close() + + client := NewClient(ClientConfig{BaseURL: server.URL}) + + result, err := client.GitCommitAndPush(context.Background(), "test", false, "/workspace") + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !result.Success { + t.Error("expected success=true") + } + if result.HasChanges { + t.Error("expected has_changes=false") + } + if result.CommitSHA != "" { + t.Errorf("expected empty commit SHA, got %s", result.CommitSHA) + } +} + +func TestGitCommitAndPush_ErrorStatus(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte(`{"error":"git push failed"}`)) + })) + defer server.Close() + + client := NewClient(ClientConfig{BaseURL: server.URL}) + + _, err := client.GitCommitAndPush(context.Background(), "test", true, "/workspace") + + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "git commit returned status 500") { + t.Errorf("expected 500 error, got %v", err) + } +} + +func TestGitStatus_Success(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + t.Errorf("expected GET, got %s", r.Method) + } + if r.URL.Path != "/git/status" { + t.Errorf("expected /git/status, got %s", r.URL.Path) + } + if r.URL.Query().Get("work_dir") != "/workspace/project" { + t.Errorf("expected work_dir query param, got %s", r.URL.Query().Get("work_dir")) + } + + resp := GitStatusResponse{ + IsRepo: true, + HasChanges: true, + ChangedFiles: []string{"main.go", "README.md"}, + Branch: "feature/test", + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(resp) + })) + defer server.Close() + + client := NewClient(ClientConfig{BaseURL: server.URL}) + + result, err := client.GitStatus(context.Background(), "/workspace/project") + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !result.IsRepo { + t.Error("expected is_repo=true") + } + if !result.HasChanges { + t.Error("expected has_changes=true") + } + if result.Branch != "feature/test" { + t.Errorf("expected branch 'feature/test', got %s", result.Branch) + } + if len(result.ChangedFiles) != 2 { + t.Errorf("expected 2 changed files, got %d", len(result.ChangedFiles)) + } +} + +func TestGitStatus_EmptyWorkDir(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // work_dir should not be in query when empty + if r.URL.Query().Get("work_dir") != "" { + t.Errorf("expected empty work_dir, got %s", r.URL.Query().Get("work_dir")) + } + + resp := GitStatusResponse{IsRepo: true} + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(resp) + })) + defer server.Close() + + client := NewClient(ClientConfig{BaseURL: server.URL}) + + _, err := client.GitStatus(context.Background(), "") + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestGitStatus_ErrorStatus(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotFound) + _, _ = w.Write([]byte(`{"error":"not a git repository"}`)) + })) + defer server.Close() + + client := NewClient(ClientConfig{BaseURL: server.URL}) + + _, err := client.GitStatus(context.Background(), "/workspace") + + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "git status returned status 404") { + t.Errorf("expected 404 error, got %v", err) + } +} + +func TestRunSDLC_Success(t *testing.T) { + var receivedReq SDLCRequest + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + t.Errorf("expected POST, got %s", r.Method) + } + if r.URL.Path != "/sdlc" { + t.Errorf("expected /sdlc, got %s", r.URL.Path) + } + + body, _ := io.ReadAll(r.Body) + _ = json.Unmarshal(body, &receivedReq) + + resp := SDLCResponse{ + Success: true, + Output: "Feature started successfully", + Data: json.RawMessage(`{"feature_id":"feat-123"}`), + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(resp) + })) + defer server.Close() + + client := NewClient(ClientConfig{BaseURL: server.URL}) + + result, err := client.RunSDLC(context.Background(), "start", []string{"--name", "test-feature"}, "/workspace") + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !result.Success { + t.Error("expected success=true") + } + if result.Output != "Feature started successfully" { + t.Errorf("expected output message, got %s", result.Output) + } + if result.Data == nil { + t.Error("expected data to be set") + } + + // Verify request + if receivedReq.Command != "start" { + t.Errorf("expected command 'start', got %s", receivedReq.Command) + } + if len(receivedReq.Args) != 2 { + t.Errorf("expected 2 args, got %d", len(receivedReq.Args)) + } + if receivedReq.WorkDir != "/workspace" { + t.Errorf("expected work dir '/workspace', got %s", receivedReq.WorkDir) + } +} + +func TestRunSDLC_CommandFailed(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + resp := SDLCResponse{ + Success: false, + Output: "Command output before failure", + Error: "validation failed: missing required field", + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(resp) + })) + defer server.Close() + + client := NewClient(ClientConfig{BaseURL: server.URL}) + + result, err := client.RunSDLC(context.Background(), "validate", nil, "/workspace") + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result.Success { + t.Error("expected success=false") + } + if result.Error != "validation failed: missing required field" { + t.Errorf("expected error message, got %s", result.Error) + } +} + +func TestRunSDLC_ErrorStatus(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte(`{"error":"sdlc binary not found"}`)) + })) + defer server.Close() + + client := NewClient(ClientConfig{BaseURL: server.URL}) + + _, err := client.RunSDLC(context.Background(), "status", nil, "/workspace") + + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "sdlc returned status 500") { + t.Errorf("expected 500 error, got %v", err) + } +} + +func TestRunSDLC_MalformedResponse(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{invalid`)) + })) + defer server.Close() + + client := NewClient(ClientConfig{BaseURL: server.URL}) + + _, err := client.RunSDLC(context.Background(), "status", nil, "/workspace") + + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "decode response") { + t.Errorf("expected decode error, got %v", err) + } +} diff --git a/internal/adapter/postgres/work_queue_queries.go b/internal/adapter/postgres/work_queue_queries.go index 9cf1582..17d8dff 100644 --- a/internal/adapter/postgres/work_queue_queries.go +++ b/internal/adapter/postgres/work_queue_queries.go @@ -97,6 +97,63 @@ func (r *WorkQueueRepository) GetTask(ctx context.Context, taskID string) (*doma return &task, nil } +// List returns all tasks with optional status filter and pagination. +func (r *WorkQueueRepository) List(ctx context.Context, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) { + opts.Normalize() + + // Build optional WHERE clause + whereClause := "" + var args []any + argNum := 1 + + if status != nil { + whereClause = fmt.Sprintf("WHERE status = $%d", argNum) + args = append(args, string(*status)) + argNum++ + } + + // Get total count + countQuery := "SELECT COUNT(*) FROM work_queue " + whereClause + var total int64 + if err := r.db.QueryRowContext(ctx, countQuery, args...).Scan(&total); err != nil { + return nil, fmt.Errorf("count work tasks: %w", err) + } + + // Build paginated query + query := fmt.Sprintf(` + SELECT id, project_id, task_type, task_spec, status, priority, worker_id, + callback_url, created_at, started_at, completed_at, result, error, + retry_count, max_retries, error_code + FROM work_queue + %s + ORDER BY created_at DESC + LIMIT $%d OFFSET $%d + `, whereClause, argNum, argNum+1) + args = append(args, opts.Limit, opts.Offset) + + rows, err := r.db.QueryContext(ctx, query, args...) + if err != nil { + return nil, fmt.Errorf("list work tasks: %w", err) + } + defer func() { _ = rows.Close() }() + + var tasks []*domain.WorkTask + for rows.Next() { + task, err := r.scanTask(rows) + if err != nil { + return nil, err + } + tasks = append(tasks, task) + } + + return &domain.WorkListResult{ + Tasks: tasks, + Total: total, + Limit: opts.Limit, + Offset: opts.Offset, + }, nil +} + // ListByProject returns tasks for a project with optional status filter and pagination. func (r *WorkQueueRepository) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) { // Normalize pagination options diff --git a/internal/adapter/sdlc/worker_executor_test.go b/internal/adapter/sdlc/worker_executor_test.go index 93fd56c..c85cf25 100644 --- a/internal/adapter/sdlc/worker_executor_test.go +++ b/internal/adapter/sdlc/worker_executor_test.go @@ -80,6 +80,10 @@ func (m *mockWorkQueue) GetTask(ctx context.Context, taskID string) (*domain.Wor return task, nil } +func (m *mockWorkQueue) List(ctx context.Context, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) { + return &domain.WorkListResult{}, nil +} + func (m *mockWorkQueue) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) { return &domain.WorkListResult{}, nil } diff --git a/internal/handlers/work.go b/internal/handlers/work.go index d2d6032..265c569 100644 --- a/internal/handlers/work.go +++ b/internal/handlers/work.go @@ -38,10 +38,11 @@ func (h *WorkHandler) Mount(r api.Router) { r.With(auth.RequireScope(auth.ScopeQueueWrite, auth.ScopeAdmin)).Post("/{taskId}/cancel", h.Cancel) // Read operations + r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/tasks", h.ListTasks) + r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/stats", h.Stats) + r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/projects/{projectId}", h.ListByProject) r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/{taskId}", h.GetTask) r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/{taskId}/status", h.GetStatus) - r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/projects/{projectId}", h.ListByProject) - r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/stats", h.Stats) }) } @@ -121,65 +122,6 @@ type DequeueWorkResponse struct { Task *WorkTaskDTO `json:"task,omitempty"` } -// WorkTaskDTO is the data transfer object for work tasks. -type WorkTaskDTO struct { - ID string `json:"id"` - ProjectID string `json:"project_id"` - Type string `json:"type"` - Spec map[string]any `json:"spec"` - Status string `json:"status"` - Priority int `json:"priority"` - WorkerID string `json:"worker_id,omitempty"` - CallbackURL string `json:"callback_url,omitempty"` - CreatedAt string `json:"created_at"` - StartedAt string `json:"started_at,omitempty"` - CompletedAt string `json:"completed_at,omitempty"` - Result *WorkResultDTO `json:"result,omitempty"` - Error string `json:"error,omitempty"` - RetryCount int `json:"retry_count"` - MaxRetries int `json:"max_retries"` -} - -// WorkResultDTO is the data transfer object for work results. -type WorkResultDTO struct { - Output string `json:"output,omitempty"` - Artifacts map[string]string `json:"artifacts,omitempty"` -} - -// toWorkTaskDTO converts a domain.WorkTask to a WorkTaskDTO. -func toWorkTaskDTO(t *domain.WorkTask) *WorkTaskDTO { - if t == nil { - return nil - } - dto := &WorkTaskDTO{ - ID: t.ID, - ProjectID: t.ProjectID, - Type: string(t.Type), - Spec: t.Spec, - Status: string(t.Status), - Priority: t.Priority, - WorkerID: t.WorkerID, - CallbackURL: t.CallbackURL, - CreatedAt: t.CreatedAt.Format("2006-01-02T15:04:05Z07:00"), - Error: t.Error, - RetryCount: t.RetryCount, - MaxRetries: t.MaxRetries, - } - if t.StartedAt != nil { - dto.StartedAt = t.StartedAt.Format("2006-01-02T15:04:05Z07:00") - } - if t.CompletedAt != nil { - dto.CompletedAt = t.CompletedAt.Format("2006-01-02T15:04:05Z07:00") - } - if t.Result != nil { - dto.Result = &WorkResultDTO{ - Output: t.Result.Output, - Artifacts: t.Result.Artifacts, - } - } - return dto -} - // Dequeue claims the next available task for a worker. // POST /work/dequeue func (h *WorkHandler) Dequeue(w http.ResponseWriter, r *http.Request) { @@ -341,6 +283,58 @@ func (h *WorkHandler) Cancel(w http.ResponseWriter, r *http.Request) { }) } +// ListTasks returns all tasks with optional status filter and pagination. +// GET /work/tasks?status=running&limit=50&offset=0 +func (h *WorkHandler) ListTasks(w http.ResponseWriter, r *http.Request) { + // Parse and validate optional status filter + var status *domain.WorkTaskStatus + if s := r.URL.Query().Get("status"); s != "" { + st := domain.WorkTaskStatus(s) + if !st.IsValid() { + api.WriteBadRequest(w, r, "invalid status filter: must be pending, running, completed, failed, or cancelled") + return + } + status = &st + } + + // Parse pagination options + opts := domain.DefaultWorkListOptions() + if limitStr := r.URL.Query().Get("limit"); limitStr != "" { + limit, err := strconv.Atoi(limitStr) + if err != nil { + api.WriteBadRequest(w, r, "limit must be a valid integer") + return + } + opts.Limit = limit + } + if offsetStr := r.URL.Query().Get("offset"); offsetStr != "" { + offset, err := strconv.Atoi(offsetStr) + if err != nil { + api.WriteBadRequest(w, r, "offset must be a valid integer") + return + } + opts.Offset = offset + } + + result, err := h.workService.List(r.Context(), status, opts) + if err != nil { + api.WriteInternalError(w, r, "failed to list tasks") + return + } + + dtos := make([]*WorkTaskDTO, len(result.Tasks)) + for i, t := range result.Tasks { + dtos[i] = toWorkTaskDTO(t) + } + + api.WriteSuccess(w, r, map[string]any{ + "tasks": dtos, + "total": result.Total, + "limit": result.Limit, + "offset": result.Offset, + }) +} + // ListByProject returns tasks for a project with pagination. // GET /work/projects/{projectId}?status=pending&limit=50&offset=0 func (h *WorkHandler) ListByProject(w http.ResponseWriter, r *http.Request) { @@ -428,21 +422,3 @@ func (h *WorkHandler) Stats(w http.ResponseWriter, r *http.Request) { api.WriteSuccess(w, r, resp) } - -// formatDuration formats a duration in a human-readable way. -func formatDuration(d interface{ Seconds() float64 }) string { - secs := d.Seconds() - if secs < 60 { - return fmt.Sprintf("%.0fs", secs) - } - mins := secs / 60 - if mins < 60 { - return fmt.Sprintf("%.1fm", mins) - } - hours := mins / 60 - if hours < 24 { - return fmt.Sprintf("%.1fh", hours) - } - days := hours / 24 - return fmt.Sprintf("%.1fd", days) -} diff --git a/internal/handlers/work_dto.go b/internal/handlers/work_dto.go new file mode 100644 index 0000000..d664334 --- /dev/null +++ b/internal/handlers/work_dto.go @@ -0,0 +1,85 @@ +// Package handlers provides HTTP handlers for the rdev API. +package handlers + +import ( + "fmt" + + "github.com/orchard9/rdev/internal/domain" +) + +// WorkTaskDTO is the data transfer object for work tasks. +type WorkTaskDTO struct { + ID string `json:"id"` + ProjectID string `json:"project_id"` + Type string `json:"type"` + Spec map[string]any `json:"spec"` + Status string `json:"status"` + Priority int `json:"priority"` + WorkerID string `json:"worker_id,omitempty"` + CallbackURL string `json:"callback_url,omitempty"` + CreatedAt string `json:"created_at"` + StartedAt string `json:"started_at,omitempty"` + CompletedAt string `json:"completed_at,omitempty"` + Result *WorkResultDTO `json:"result,omitempty"` + Error string `json:"error,omitempty"` + RetryCount int `json:"retry_count"` + MaxRetries int `json:"max_retries"` +} + +// WorkResultDTO is the data transfer object for work results. +type WorkResultDTO struct { + Output string `json:"output,omitempty"` + Artifacts map[string]string `json:"artifacts,omitempty"` +} + +// toWorkTaskDTO converts a domain.WorkTask to a WorkTaskDTO. +func toWorkTaskDTO(t *domain.WorkTask) *WorkTaskDTO { + if t == nil { + return nil + } + dto := &WorkTaskDTO{ + ID: t.ID, + ProjectID: t.ProjectID, + Type: string(t.Type), + Spec: t.Spec, + Status: string(t.Status), + Priority: t.Priority, + WorkerID: t.WorkerID, + CallbackURL: t.CallbackURL, + CreatedAt: t.CreatedAt.Format("2006-01-02T15:04:05Z07:00"), + Error: t.Error, + RetryCount: t.RetryCount, + MaxRetries: t.MaxRetries, + } + if t.StartedAt != nil { + dto.StartedAt = t.StartedAt.Format("2006-01-02T15:04:05Z07:00") + } + if t.CompletedAt != nil { + dto.CompletedAt = t.CompletedAt.Format("2006-01-02T15:04:05Z07:00") + } + if t.Result != nil { + dto.Result = &WorkResultDTO{ + Output: t.Result.Output, + Artifacts: t.Result.Artifacts, + } + } + return dto +} + +// formatDuration formats a duration in a human-readable way. +func formatDuration(d interface{ Seconds() float64 }) string { + secs := d.Seconds() + if secs < 60 { + return fmt.Sprintf("%.0fs", secs) + } + mins := secs / 60 + if mins < 60 { + return fmt.Sprintf("%.1fm", mins) + } + hours := mins / 60 + if hours < 24 { + return fmt.Sprintf("%.1fh", hours) + } + days := hours / 24 + return fmt.Sprintf("%.1fd", days) +} diff --git a/internal/handlers/work_test.go b/internal/handlers/work_test.go index 9ad721d..71d7332 100644 --- a/internal/handlers/work_test.go +++ b/internal/handlers/work_test.go @@ -123,6 +123,39 @@ func (m *mockWorkQueue) GetTask(ctx context.Context, taskID string) (*domain.Wor return task, nil } +func (m *mockWorkQueue) List(ctx context.Context, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) { + if m.err != nil { + return nil, m.err + } + opts.Normalize() + + var tasks []*domain.WorkTask + for _, task := range m.tasks { + if status == nil || task.Status == *status { + tasks = append(tasks, task) + } + } + + // Apply pagination + total := int64(len(tasks)) + if opts.Offset >= len(tasks) { + tasks = nil + } else { + end := opts.Offset + opts.Limit + if end > len(tasks) { + end = len(tasks) + } + tasks = tasks[opts.Offset:end] + } + + return &domain.WorkListResult{ + Tasks: tasks, + Total: total, + Limit: opts.Limit, + Offset: opts.Offset, + }, nil +} + func (m *mockWorkQueue) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) { if m.err != nil { return nil, m.err diff --git a/internal/port/work_queue.go b/internal/port/work_queue.go index e5f656a..50969ab 100644 --- a/internal/port/work_queue.go +++ b/internal/port/work_queue.go @@ -40,6 +40,10 @@ type WorkQueue interface { // GetTask retrieves a task by ID. GetTask(ctx context.Context, taskID string) (*domain.WorkTask, error) + // List returns all tasks with optional status filter and pagination. + // Use for admin/debugging views across all projects. + List(ctx context.Context, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) + // ListByProject returns tasks for a project with optional status filter and pagination. ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) diff --git a/internal/service/mock_test.go b/internal/service/mock_test.go index 352a3bf..dd81f92 100644 --- a/internal/service/mock_test.go +++ b/internal/service/mock_test.go @@ -91,6 +91,10 @@ func (m *mockWorkQueue) GetTask(ctx context.Context, taskID string) (*domain.Wor return task, nil } +func (m *mockWorkQueue) List(ctx context.Context, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) { + return &domain.WorkListResult{}, nil +} + func (m *mockWorkQueue) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) { return &domain.WorkListResult{}, nil } diff --git a/internal/service/work_service.go b/internal/service/work_service.go index 532d9a0..9c116b6 100644 --- a/internal/service/work_service.go +++ b/internal/service/work_service.go @@ -205,6 +205,11 @@ func (s *WorkService) GetTask(ctx context.Context, taskID string) (*domain.WorkT return s.queue.GetTask(ctx, taskID) } +// List returns all tasks with optional status filter and pagination. +func (s *WorkService) List(ctx context.Context, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) { + return s.queue.List(ctx, status, opts) +} + // ListByProject returns tasks for a project with pagination. func (s *WorkService) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) { return s.queue.ListByProject(ctx, projectID, status, opts) diff --git a/internal/worker/api_client_test.go b/internal/worker/api_client_test.go new file mode 100644 index 0000000..0bfadb0 --- /dev/null +++ b/internal/worker/api_client_test.go @@ -0,0 +1,584 @@ +package worker + +import ( + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/orchard9/rdev/internal/domain" +) + +func TestNewAPIClient_DefaultTimeout(t *testing.T) { + client := NewAPIClient(APIClientConfig{ + BaseURL: "http://localhost:8080", + APIKey: "test-key", + }) + + if client.httpClient.Timeout != 30*time.Second { + t.Errorf("expected default timeout 30s, got %v", client.httpClient.Timeout) + } +} + +func TestNewAPIClient_CustomTimeout(t *testing.T) { + client := NewAPIClient(APIClientConfig{ + BaseURL: "http://localhost:8080", + APIKey: "test-key", + Timeout: 60 * time.Second, + }) + + if client.httpClient.Timeout != 60*time.Second { + t.Errorf("expected timeout 60s, got %v", client.httpClient.Timeout) + } +} + +func TestRegister_Success(t *testing.T) { + var receivedReq RegisterRequest + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + t.Errorf("expected POST, got %s", r.Method) + } + if r.URL.Path != "/workers/register" { + t.Errorf("expected /workers/register, got %s", r.URL.Path) + } + if r.Header.Get("Content-Type") != "application/json" { + t.Errorf("expected Content-Type application/json, got %s", r.Header.Get("Content-Type")) + } + if r.Header.Get("X-API-Key") != "test-key" { + t.Errorf("expected X-API-Key test-key, got %s", r.Header.Get("X-API-Key")) + } + + body, _ := io.ReadAll(r.Body) + _ = json.Unmarshal(body, &receivedReq) + + w.WriteHeader(http.StatusCreated) + })) + defer server.Close() + + client := NewAPIClient(APIClientConfig{ + BaseURL: server.URL, + APIKey: "test-key", + }) + + err := client.Register(context.Background(), &RegisterRequest{ + ID: "worker-1", + Hostname: "localhost", + Version: "v1.0.0", + Capabilities: []string{"build", "test"}, + }) + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if receivedReq.ID != "worker-1" { + t.Errorf("expected ID worker-1, got %s", receivedReq.ID) + } + if receivedReq.Hostname != "localhost" { + t.Errorf("expected hostname localhost, got %s", receivedReq.Hostname) + } +} + +func TestRegister_ErrorStatus(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(`{"error":"invalid worker ID"}`)) + })) + defer server.Close() + + client := NewAPIClient(APIClientConfig{BaseURL: server.URL}) + + err := client.Register(context.Background(), &RegisterRequest{ID: "bad"}) + + if err == nil { + t.Fatal("expected error, got nil") + } + if want := "register returned status 400"; !containsSubstring(err.Error(), want) { + t.Errorf("expected error containing %q, got %v", want, err) + } + if !containsSubstring(err.Error(), "invalid worker ID") { + t.Errorf("expected error to contain response body, got %v", err) + } +} + +func TestRegister_NetworkError(t *testing.T) { + client := NewAPIClient(APIClientConfig{ + BaseURL: "http://localhost:1", // invalid port + Timeout: 100 * time.Millisecond, + }) + + err := client.Register(context.Background(), &RegisterRequest{ID: "test"}) + + if err == nil { + t.Fatal("expected error, got nil") + } + if !containsSubstring(err.Error(), "register:") { + t.Errorf("expected error wrapped with 'register:', got %v", err) + } +} + +func TestRegister_ContextCanceled(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(1 * time.Second) + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + client := NewAPIClient(APIClientConfig{BaseURL: server.URL}) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // cancel immediately + + err := client.Register(ctx, &RegisterRequest{ID: "test"}) + + if err == nil { + t.Fatal("expected error, got nil") + } +} + +func TestHeartbeat_Success(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + t.Errorf("expected POST, got %s", r.Method) + } + if r.URL.Path != "/workers/worker-123/heartbeat" { + t.Errorf("expected /workers/worker-123/heartbeat, got %s", r.URL.Path) + } + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + client := NewAPIClient(APIClientConfig{BaseURL: server.URL, APIKey: "key"}) + + err := client.Heartbeat(context.Background(), "worker-123") + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestHeartbeat_WorkerNotFound(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotFound) + _, _ = w.Write([]byte(`{"error":"worker not found"}`)) + })) + defer server.Close() + + client := NewAPIClient(APIClientConfig{BaseURL: server.URL}) + + err := client.Heartbeat(context.Background(), "unknown-worker") + + if err == nil { + t.Fatal("expected error, got nil") + } + if !containsSubstring(err.Error(), "heartbeat returned status 404") { + t.Errorf("expected 404 error, got %v", err) + } +} + +func TestClaimTask_Success(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + t.Errorf("expected POST, got %s", r.Method) + } + if r.URL.Path != "/workers/worker-1/claim" { + t.Errorf("expected /workers/worker-1/claim, got %s", r.URL.Path) + } + + resp := ClaimTaskResponse{ + Success: true, + Data: struct { + Task *WorkTaskData `json:"task"` + WorkerID string `json:"worker_id"` + }{ + WorkerID: "worker-1", + Task: &WorkTaskData{ + ID: "task-123", + ProjectID: "proj-1", + Type: "build", + Status: "running", + Priority: 5, + CreatedAt: "2024-01-15T10:30:00Z", + StartedAt: "2024-01-15T10:31:00Z", + RetryCount: 1, + MaxRetries: 3, + Spec: map[string]any{"prompt": "build it"}, + }, + }, + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(resp) + })) + defer server.Close() + + client := NewAPIClient(APIClientConfig{BaseURL: server.URL}) + + task, err := client.ClaimTask(context.Background(), "worker-1") + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if task == nil { + t.Fatal("expected task, got nil") + } + if task.ID != "task-123" { + t.Errorf("expected task ID task-123, got %s", task.ID) + } + if task.ProjectID != "proj-1" { + t.Errorf("expected project ID proj-1, got %s", task.ProjectID) + } + if task.Type != domain.WorkTaskType("build") { + t.Errorf("expected type build, got %s", task.Type) + } + if task.Priority != 5 { + t.Errorf("expected priority 5, got %d", task.Priority) + } + if task.RetryCount != 1 { + t.Errorf("expected retry count 1, got %d", task.RetryCount) + } +} + +func TestClaimTask_NoTasksAvailable(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNoContent) + })) + defer server.Close() + + client := NewAPIClient(APIClientConfig{BaseURL: server.URL}) + + task, err := client.ClaimTask(context.Background(), "worker-1") + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if task != nil { + t.Errorf("expected nil task, got %+v", task) + } +} + +func TestClaimTask_NilTaskData(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + resp := ClaimTaskResponse{ + Success: true, + Data: struct { + Task *WorkTaskData `json:"task"` + WorkerID string `json:"worker_id"` + }{ + WorkerID: "worker-1", + Task: nil, + }, + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(resp) + })) + defer server.Close() + + client := NewAPIClient(APIClientConfig{BaseURL: server.URL}) + + _, err := client.ClaimTask(context.Background(), "worker-1") + + if err == nil { + t.Fatal("expected error, got nil") + } + if !containsSubstring(err.Error(), "no task data") { + t.Errorf("expected 'no task data' error, got %v", err) + } +} + +func TestClaimTask_MalformedJSON(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{invalid json}`)) + })) + defer server.Close() + + client := NewAPIClient(APIClientConfig{BaseURL: server.URL}) + + _, err := client.ClaimTask(context.Background(), "worker-1") + + if err == nil { + t.Fatal("expected error, got nil") + } + if !containsSubstring(err.Error(), "decode response") { + t.Errorf("expected decode error, got %v", err) + } +} + +func TestClaimTask_ErrorStatus(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte(`{"error":"database error"}`)) + })) + defer server.Close() + + client := NewAPIClient(APIClientConfig{BaseURL: server.URL}) + + _, err := client.ClaimTask(context.Background(), "worker-1") + + if err == nil { + t.Fatal("expected error, got nil") + } + if !containsSubstring(err.Error(), "claim task returned status 500") { + t.Errorf("expected 500 error, got %v", err) + } +} + +func TestCompleteTask_Success(t *testing.T) { + var receivedReq CompleteTaskRequest + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + t.Errorf("expected POST, got %s", r.Method) + } + if r.URL.Path != "/workers/worker-1/complete/task-123" { + t.Errorf("expected /workers/worker-1/complete/task-123, got %s", r.URL.Path) + } + + body, _ := io.ReadAll(r.Body) + _ = json.Unmarshal(body, &receivedReq) + + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + client := NewAPIClient(APIClientConfig{BaseURL: server.URL, APIKey: "key"}) + + result := &domain.BuildResult{ + Success: true, + Output: "Build successful", + CommitSHA: "abc123", + FilesChanged: []string{"main.go", "go.mod"}, + DurationMs: 5000, + Artifacts: map[string]string{"deploy_url": "https://example.com"}, + } + + err := client.CompleteTask(context.Background(), "worker-1", "task-123", result) + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !receivedReq.Success { + t.Error("expected success=true") + } + if receivedReq.Output != "Build successful" { + t.Errorf("expected output 'Build successful', got %s", receivedReq.Output) + } + if receivedReq.CommitSHA != "abc123" { + t.Errorf("expected commit SHA abc123, got %s", receivedReq.CommitSHA) + } + if receivedReq.DurationMs != 5000 { + t.Errorf("expected duration 5000, got %d", receivedReq.DurationMs) + } +} + +func TestCompleteTask_TaskNotFound(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotFound) + _, _ = w.Write([]byte(`{"error":"task not found"}`)) + })) + defer server.Close() + + client := NewAPIClient(APIClientConfig{BaseURL: server.URL}) + + err := client.CompleteTask(context.Background(), "worker-1", "unknown", &domain.BuildResult{}) + + if err == nil { + t.Fatal("expected error, got nil") + } + if !containsSubstring(err.Error(), "complete task returned status 404") { + t.Errorf("expected 404 error, got %v", err) + } +} + +func TestFailTask_Success(t *testing.T) { + var receivedReq FailTaskRequest + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + t.Errorf("expected POST, got %s", r.Method) + } + if r.URL.Path != "/workers/worker-1/fail/task-123" { + t.Errorf("expected /workers/worker-1/fail/task-123, got %s", r.URL.Path) + } + + body, _ := io.ReadAll(r.Body) + _ = json.Unmarshal(body, &receivedReq) + + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + client := NewAPIClient(APIClientConfig{BaseURL: server.URL, APIKey: "key"}) + + err := client.FailTask(context.Background(), "worker-1", "task-123", "build failed", "error output", 3000) + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if receivedReq.Error != "build failed" { + t.Errorf("expected error 'build failed', got %s", receivedReq.Error) + } + if receivedReq.Output != "error output" { + t.Errorf("expected output 'error output', got %s", receivedReq.Output) + } + if receivedReq.DurationMs != 3000 { + t.Errorf("expected duration 3000, got %d", receivedReq.DurationMs) + } +} + +func TestFailTask_ErrorStatus(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte(`{"error":"database unavailable"}`)) + })) + defer server.Close() + + client := NewAPIClient(APIClientConfig{BaseURL: server.URL}) + + err := client.FailTask(context.Background(), "worker-1", "task-123", "failed", "", 0) + + if err == nil { + t.Fatal("expected error, got nil") + } + if !containsSubstring(err.Error(), "fail task returned status 500") { + t.Errorf("expected 500 error, got %v", err) + } +} + +func TestToWorkTask_ParsesTimestamps(t *testing.T) { + data := &WorkTaskData{ + ID: "task-1", + ProjectID: "proj-1", + Type: "build", + Status: "running", + CreatedAt: "2024-01-15T10:30:00Z", + StartedAt: "2024-01-15T10:31:00Z", + } + + task := data.ToWorkTask() + + if task.CreatedAt.IsZero() { + t.Error("expected CreatedAt to be set") + } + if task.CreatedAt.Year() != 2024 || task.CreatedAt.Month() != 1 || task.CreatedAt.Day() != 15 { + t.Errorf("unexpected CreatedAt: %v", task.CreatedAt) + } + + if task.StartedAt == nil { + t.Fatal("expected StartedAt to be set") + } + if task.StartedAt.Hour() != 10 || task.StartedAt.Minute() != 31 { + t.Errorf("unexpected StartedAt: %v", task.StartedAt) + } +} + +func TestToWorkTask_InvalidTimestamp(t *testing.T) { + data := &WorkTaskData{ + ID: "task-1", + ProjectID: "proj-1", + Type: "build", + Status: "running", + CreatedAt: "not-a-timestamp", + StartedAt: "also-not-valid", + } + + task := data.ToWorkTask() + + // Invalid timestamps should result in zero values (current behavior) + if !task.CreatedAt.IsZero() { + t.Errorf("expected zero CreatedAt for invalid timestamp, got %v", task.CreatedAt) + } + // Note: StartedAt is parsed but ignored on error, pointer becomes non-nil with zero value +} + +func TestToWorkTask_EmptyTimestamps(t *testing.T) { + data := &WorkTaskData{ + ID: "task-1", + ProjectID: "proj-1", + Type: "build", + Status: "pending", + CreatedAt: "", + StartedAt: "", + } + + task := data.ToWorkTask() + + if !task.CreatedAt.IsZero() { + t.Errorf("expected zero CreatedAt for empty string, got %v", task.CreatedAt) + } + if task.StartedAt != nil { + t.Errorf("expected nil StartedAt for empty string, got %v", task.StartedAt) + } +} + +func TestToWorkTask_NilData(t *testing.T) { + var data *WorkTaskData + task := data.ToWorkTask() + + if task != nil { + t.Errorf("expected nil task from nil data, got %+v", task) + } +} + +func TestToWorkTask_PreservesAllFields(t *testing.T) { + data := &WorkTaskData{ + ID: "task-1", + ProjectID: "proj-1", + Type: "deploy", + Status: "completed", + Priority: 10, + WorkerID: "worker-99", + CallbackURL: "https://callback.example.com", + RetryCount: 2, + MaxRetries: 5, + Spec: map[string]any{"key": "value"}, + } + + task := data.ToWorkTask() + + if task.ID != "task-1" { + t.Errorf("expected ID task-1, got %s", task.ID) + } + if task.ProjectID != "proj-1" { + t.Errorf("expected ProjectID proj-1, got %s", task.ProjectID) + } + if task.Type != domain.WorkTaskType("deploy") { + t.Errorf("expected Type deploy, got %s", task.Type) + } + if task.Status != domain.WorkTaskStatus("completed") { + t.Errorf("expected Status completed, got %s", task.Status) + } + if task.Priority != 10 { + t.Errorf("expected Priority 10, got %d", task.Priority) + } + if task.WorkerID != "worker-99" { + t.Errorf("expected WorkerID worker-99, got %s", task.WorkerID) + } + if task.CallbackURL != "https://callback.example.com" { + t.Errorf("expected CallbackURL, got %s", task.CallbackURL) + } + if task.RetryCount != 2 { + t.Errorf("expected RetryCount 2, got %d", task.RetryCount) + } + if task.MaxRetries != 5 { + t.Errorf("expected MaxRetries 5, got %d", task.MaxRetries) + } + if task.Spec["key"] != "value" { + t.Errorf("expected Spec[key]=value, got %v", task.Spec) + } +} + +// containsSubstring is a helper to check if a string contains a substring. +func containsSubstring(s, substr string) bool { + return len(s) >= len(substr) && (s == substr || len(s) > 0 && containsHelper(s, substr)) +} + +func containsHelper(s, substr string) bool { + for i := 0; i <= len(s)-len(substr); i++ { + if s[i:i+len(substr)] == substr { + return true + } + } + return false +} diff --git a/internal/worker/mock_test.go b/internal/worker/mock_test.go index a5269a1..b982c9a 100644 --- a/internal/worker/mock_test.go +++ b/internal/worker/mock_test.go @@ -104,6 +104,9 @@ func (m *mockWorkQueue) GetTask(_ context.Context, taskID string) (*domain.WorkT } return task, nil } +func (m *mockWorkQueue) List(_ context.Context, _ *domain.WorkTaskStatus, _ domain.WorkListOptions) (*domain.WorkListResult, error) { + return &domain.WorkListResult{}, nil +} func (m *mockWorkQueue) ListByProject(_ context.Context, _ string, _ *domain.WorkTaskStatus, _ domain.WorkListOptions) (*domain.WorkListResult, error) { return &domain.WorkListResult{}, nil } diff --git a/internal/worker/queue_maintenance_test.go b/internal/worker/queue_maintenance_test.go index cd6ce3e..95d4074 100644 --- a/internal/worker/queue_maintenance_test.go +++ b/internal/worker/queue_maintenance_test.go @@ -63,6 +63,10 @@ func (m *mockMaintenanceQueue) GetTask(_ context.Context, _ string) (*domain.Wor return nil, nil } +func (m *mockMaintenanceQueue) List(_ context.Context, _ *domain.WorkTaskStatus, _ domain.WorkListOptions) (*domain.WorkListResult, error) { + return nil, nil +} + func (m *mockMaintenanceQueue) ListByProject(_ context.Context, _ string, _ *domain.WorkTaskStatus, _ domain.WorkListOptions) (*domain.WorkListResult, error) { return nil, nil }