fix: wire workService to WorkersHandler and add /work/tasks endpoint
Critical fix: WorkersHandler was missing workService dependency, causing 500 errors when workers tried to fail tasks. This caused tasks to get stuck in "running" state permanently. Also adds: - /work/tasks endpoint for debugging all tasks across projects - List method to WorkQueue interface for admin views - HTTP client tests for api_client.go and claudebox/client.go (48 tests) - Split work.go DTOs into work_dto.go to stay under 500 lines Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
d7a6f37593
commit
d74efb75ff
@ -423,7 +423,7 @@ func main() {
|
|||||||
agentsHandler := handlers.NewAgentsHandler(agentRegistry)
|
agentsHandler := handlers.NewAgentsHandler(agentRegistry)
|
||||||
|
|
||||||
// Initialize worker pool handlers
|
// Initialize worker pool handlers
|
||||||
workersHandler := handlers.NewWorkersHandler(workerService)
|
workersHandler := handlers.NewWorkersHandler(workerService).WithWorkService(workService)
|
||||||
buildsHandler := handlers.NewBuildsHandler(buildService)
|
buildsHandler := handlers.NewBuildsHandler(buildService)
|
||||||
createAndBuildHandler := handlers.NewCreateAndBuildHandler(projectInfraService, buildService)
|
createAndBuildHandler := handlers.NewCreateAndBuildHandler(projectInfraService, buildService)
|
||||||
|
|
||||||
|
|||||||
774
internal/adapter/claudebox/client_test.go
Normal file
774
internal/adapter/claudebox/client_test.go
Normal file
@ -0,0 +1,774 @@
|
|||||||
|
package claudebox
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNewClient_DefaultTimeout(t *testing.T) {
|
||||||
|
client := NewClient(ClientConfig{
|
||||||
|
BaseURL: "http://localhost:8080",
|
||||||
|
})
|
||||||
|
|
||||||
|
if client.httpClient.Timeout != 10*time.Minute {
|
||||||
|
t.Errorf("expected default timeout 10m, got %v", client.httpClient.Timeout)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewClient_CustomTimeout(t *testing.T) {
|
||||||
|
client := NewClient(ClientConfig{
|
||||||
|
BaseURL: "http://localhost:8080",
|
||||||
|
Timeout: 5 * time.Minute,
|
||||||
|
})
|
||||||
|
|
||||||
|
if client.httpClient.Timeout != 5*time.Minute {
|
||||||
|
t.Errorf("expected timeout 5m, got %v", client.httpClient.Timeout)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewClient_TrimsTrailingSlash(t *testing.T) {
|
||||||
|
client := NewClient(ClientConfig{
|
||||||
|
BaseURL: "http://localhost:8080/",
|
||||||
|
})
|
||||||
|
|
||||||
|
if client.baseURL != "http://localhost:8080" {
|
||||||
|
t.Errorf("expected trailing slash trimmed, got %s", client.baseURL)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHealth_Success(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.Method != http.MethodGet {
|
||||||
|
t.Errorf("expected GET, got %s", r.Method)
|
||||||
|
}
|
||||||
|
if r.URL.Path != "/health" {
|
||||||
|
t.Errorf("expected /health, got %s", r.URL.Path)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := HealthResponse{
|
||||||
|
Status: "healthy",
|
||||||
|
Timestamp: "2024-01-15T10:30:00Z",
|
||||||
|
WorkDir: "/workspace",
|
||||||
|
}
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
_ = json.NewEncoder(w).Encode(resp)
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewClient(ClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
health, err := client.Health(context.Background())
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if health.Status != "healthy" {
|
||||||
|
t.Errorf("expected status 'healthy', got %s", health.Status)
|
||||||
|
}
|
||||||
|
if health.WorkDir != "/workspace" {
|
||||||
|
t.Errorf("expected work_dir '/workspace', got %s", health.WorkDir)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHealth_Unhealthy(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusServiceUnavailable)
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewClient(ClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
_, err := client.Health(context.Background())
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error, got nil")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "health check returned status 503") {
|
||||||
|
t.Errorf("expected 503 error, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHealth_NetworkError(t *testing.T) {
|
||||||
|
client := NewClient(ClientConfig{
|
||||||
|
BaseURL: "http://localhost:1",
|
||||||
|
Timeout: 100 * time.Millisecond,
|
||||||
|
})
|
||||||
|
|
||||||
|
_, err := client.Health(context.Background())
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error, got nil")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "health check:") {
|
||||||
|
t.Errorf("expected error wrapped with 'health check:', got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExecute_Success(t *testing.T) {
|
||||||
|
var receivedReq ExecuteRequest
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.Method != http.MethodPost {
|
||||||
|
t.Errorf("expected POST, got %s", r.Method)
|
||||||
|
}
|
||||||
|
if r.URL.Path != "/execute" {
|
||||||
|
t.Errorf("expected /execute, got %s", r.URL.Path)
|
||||||
|
}
|
||||||
|
if r.Header.Get("Content-Type") != "application/json" {
|
||||||
|
t.Errorf("expected Content-Type application/json, got %s", r.Header.Get("Content-Type"))
|
||||||
|
}
|
||||||
|
|
||||||
|
body, _ := io.ReadAll(r.Body)
|
||||||
|
_ = json.Unmarshal(body, &receivedReq)
|
||||||
|
|
||||||
|
resp := ExecuteResponse{
|
||||||
|
Success: true,
|
||||||
|
Output: "Task completed",
|
||||||
|
ExitCode: 0,
|
||||||
|
DurationMs: 5000,
|
||||||
|
SessionID: "session-123",
|
||||||
|
}
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
_ = json.NewEncoder(w).Encode(resp)
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewClient(ClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
req := &ExecuteRequest{
|
||||||
|
Prompt: "Build the project",
|
||||||
|
AllowedTools: []string{"Bash", "Read", "Write"},
|
||||||
|
WorkingDir: "/workspace/project",
|
||||||
|
Timeout: 300,
|
||||||
|
Metadata: map[string]string{"task_id": "task-1"},
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := client.Execute(context.Background(), req)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if !result.Success {
|
||||||
|
t.Error("expected success=true")
|
||||||
|
}
|
||||||
|
if result.Output != "Task completed" {
|
||||||
|
t.Errorf("expected output 'Task completed', got %s", result.Output)
|
||||||
|
}
|
||||||
|
if result.ExitCode != 0 {
|
||||||
|
t.Errorf("expected exit code 0, got %d", result.ExitCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify request was serialized correctly
|
||||||
|
if receivedReq.Prompt != "Build the project" {
|
||||||
|
t.Errorf("expected prompt 'Build the project', got %s", receivedReq.Prompt)
|
||||||
|
}
|
||||||
|
if len(receivedReq.AllowedTools) != 3 {
|
||||||
|
t.Errorf("expected 3 allowed tools, got %d", len(receivedReq.AllowedTools))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExecute_ErrorStatus(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
_, _ = w.Write([]byte(`{"error":"invalid prompt"}`))
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewClient(ClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
_, err := client.Execute(context.Background(), &ExecuteRequest{Prompt: ""})
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error, got nil")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "execute returned status 400") {
|
||||||
|
t.Errorf("expected 400 error, got %v", err)
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "invalid prompt") {
|
||||||
|
t.Errorf("expected error body in message, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExecute_MalformedResponse(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
_, _ = w.Write([]byte(`{invalid json`))
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewClient(ClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
_, err := client.Execute(context.Background(), &ExecuteRequest{Prompt: "test"})
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error, got nil")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "decode response") {
|
||||||
|
t.Errorf("expected decode error, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExecuteStream_Success(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.Method != http.MethodPost {
|
||||||
|
t.Errorf("expected POST, got %s", r.Method)
|
||||||
|
}
|
||||||
|
if r.URL.Path != "/execute/stream" {
|
||||||
|
t.Errorf("expected /execute/stream, got %s", r.URL.Path)
|
||||||
|
}
|
||||||
|
if r.Header.Get("Accept") != "text/event-stream" {
|
||||||
|
t.Errorf("expected Accept text/event-stream, got %s", r.Header.Get("Accept"))
|
||||||
|
}
|
||||||
|
|
||||||
|
w.Header().Set("Content-Type", "text/event-stream")
|
||||||
|
w.Header().Set("Cache-Control", "no-cache")
|
||||||
|
w.Header().Set("Connection", "keep-alive")
|
||||||
|
|
||||||
|
flusher, ok := w.(http.Flusher)
|
||||||
|
if !ok {
|
||||||
|
t.Fatal("expected http.Flusher")
|
||||||
|
}
|
||||||
|
|
||||||
|
events := []StreamEvent{
|
||||||
|
{Type: "start", Timestamp: "2024-01-15T10:30:00Z"},
|
||||||
|
{Type: "output", Content: "Building...", Timestamp: "2024-01-15T10:30:01Z"},
|
||||||
|
{Type: "tool_call", ToolName: "Bash", Timestamp: "2024-01-15T10:30:02Z"},
|
||||||
|
{Type: "complete", Content: "Done", Timestamp: "2024-01-15T10:30:05Z"},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, event := range events {
|
||||||
|
data, _ := json.Marshal(event)
|
||||||
|
fmt.Fprintf(w, "data: %s\n\n", data)
|
||||||
|
flusher.Flush()
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewClient(ClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
var receivedEvents []StreamEvent
|
||||||
|
var mu sync.Mutex
|
||||||
|
|
||||||
|
err := client.ExecuteStream(context.Background(), &ExecuteRequest{Prompt: "build"}, func(event StreamEvent) {
|
||||||
|
mu.Lock()
|
||||||
|
receivedEvents = append(receivedEvents, event)
|
||||||
|
mu.Unlock()
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(receivedEvents) != 4 {
|
||||||
|
t.Fatalf("expected 4 events, got %d", len(receivedEvents))
|
||||||
|
}
|
||||||
|
if receivedEvents[0].Type != "start" {
|
||||||
|
t.Errorf("expected first event type 'start', got %s", receivedEvents[0].Type)
|
||||||
|
}
|
||||||
|
if receivedEvents[1].Content != "Building..." {
|
||||||
|
t.Errorf("expected second event content 'Building...', got %s", receivedEvents[1].Content)
|
||||||
|
}
|
||||||
|
if receivedEvents[2].ToolName != "Bash" {
|
||||||
|
t.Errorf("expected third event tool name 'Bash', got %s", receivedEvents[2].ToolName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExecuteStream_SkipsMalformedEvents(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Content-Type", "text/event-stream")
|
||||||
|
|
||||||
|
flusher, _ := w.(http.Flusher)
|
||||||
|
|
||||||
|
// Valid event
|
||||||
|
event1, _ := json.Marshal(StreamEvent{Type: "start"})
|
||||||
|
fmt.Fprintf(w, "data: %s\n\n", event1)
|
||||||
|
flusher.Flush()
|
||||||
|
|
||||||
|
// Malformed JSON - should be skipped
|
||||||
|
fmt.Fprintf(w, "data: {invalid json}\n\n")
|
||||||
|
flusher.Flush()
|
||||||
|
|
||||||
|
// Empty data - should be skipped
|
||||||
|
fmt.Fprintf(w, "data: \n\n")
|
||||||
|
flusher.Flush()
|
||||||
|
|
||||||
|
// Non-data line - should be skipped
|
||||||
|
fmt.Fprintf(w, "event: ping\n\n")
|
||||||
|
flusher.Flush()
|
||||||
|
|
||||||
|
// Valid event
|
||||||
|
event2, _ := json.Marshal(StreamEvent{Type: "complete"})
|
||||||
|
fmt.Fprintf(w, "data: %s\n\n", event2)
|
||||||
|
flusher.Flush()
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewClient(ClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
var receivedEvents []StreamEvent
|
||||||
|
var mu sync.Mutex
|
||||||
|
|
||||||
|
err := client.ExecuteStream(context.Background(), &ExecuteRequest{Prompt: "test"}, func(event StreamEvent) {
|
||||||
|
mu.Lock()
|
||||||
|
receivedEvents = append(receivedEvents, event)
|
||||||
|
mu.Unlock()
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should only receive the 2 valid events
|
||||||
|
if len(receivedEvents) != 2 {
|
||||||
|
t.Fatalf("expected 2 events (malformed skipped), got %d", len(receivedEvents))
|
||||||
|
}
|
||||||
|
if receivedEvents[0].Type != "start" {
|
||||||
|
t.Errorf("expected first event 'start', got %s", receivedEvents[0].Type)
|
||||||
|
}
|
||||||
|
if receivedEvents[1].Type != "complete" {
|
||||||
|
t.Errorf("expected second event 'complete', got %s", receivedEvents[1].Type)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExecuteStream_ErrorStatus(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
_, _ = w.Write([]byte(`{"error":"agent unavailable"}`))
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewClient(ClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
err := client.ExecuteStream(context.Background(), &ExecuteRequest{Prompt: "test"}, func(event StreamEvent) {
|
||||||
|
t.Error("handler should not be called on error")
|
||||||
|
})
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error, got nil")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "execute stream returned status 500") {
|
||||||
|
t.Errorf("expected 500 error, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExecuteStream_ContextCanceledBeforeRequest(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
t.Error("handler should not be called when context is already canceled")
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewClient(ClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
// Cancel context before making request
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
err := client.ExecuteStream(ctx, &ExecuteRequest{Prompt: "test"}, func(event StreamEvent) {})
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error, got nil")
|
||||||
|
}
|
||||||
|
// Should get a context canceled error
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGitClone_Success(t *testing.T) {
|
||||||
|
var receivedReq GitCloneRequest
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.Method != http.MethodPost {
|
||||||
|
t.Errorf("expected POST, got %s", r.Method)
|
||||||
|
}
|
||||||
|
if r.URL.Path != "/git/clone" {
|
||||||
|
t.Errorf("expected /git/clone, got %s", r.URL.Path)
|
||||||
|
}
|
||||||
|
|
||||||
|
body, _ := io.ReadAll(r.Body)
|
||||||
|
_ = json.Unmarshal(body, &receivedReq)
|
||||||
|
|
||||||
|
resp := GitCloneResponse{
|
||||||
|
Success: true,
|
||||||
|
Cloned: true,
|
||||||
|
}
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
_ = json.NewEncoder(w).Encode(resp)
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewClient(ClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
result, err := client.GitClone(context.Background(), "https://github.com/example/repo.git", "/workspace")
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if !result.Success {
|
||||||
|
t.Error("expected success=true")
|
||||||
|
}
|
||||||
|
if !result.Cloned {
|
||||||
|
t.Error("expected cloned=true")
|
||||||
|
}
|
||||||
|
if receivedReq.CloneURL != "https://github.com/example/repo.git" {
|
||||||
|
t.Errorf("expected clone URL, got %s", receivedReq.CloneURL)
|
||||||
|
}
|
||||||
|
if receivedReq.WorkDir != "/workspace" {
|
||||||
|
t.Errorf("expected work dir '/workspace', got %s", receivedReq.WorkDir)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGitClone_AlreadyExists(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
resp := GitCloneResponse{
|
||||||
|
Success: true,
|
||||||
|
Cloned: false, // Already existed, just updated
|
||||||
|
}
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
_ = json.NewEncoder(w).Encode(resp)
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewClient(ClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
result, err := client.GitClone(context.Background(), "https://github.com/example/repo.git", "/workspace")
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if !result.Success {
|
||||||
|
t.Error("expected success=true")
|
||||||
|
}
|
||||||
|
if result.Cloned {
|
||||||
|
t.Error("expected cloned=false for existing repo")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGitClone_ErrorStatus(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
_, _ = w.Write([]byte(`{"error":"invalid clone URL"}`))
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewClient(ClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
_, err := client.GitClone(context.Background(), "invalid", "/workspace")
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error, got nil")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "git clone returned status 400") {
|
||||||
|
t.Errorf("expected 400 error, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGitCommitAndPush_Success(t *testing.T) {
|
||||||
|
var receivedReq GitCommitAndPushRequest
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.Method != http.MethodPost {
|
||||||
|
t.Errorf("expected POST, got %s", r.Method)
|
||||||
|
}
|
||||||
|
if r.URL.Path != "/git/commit-and-push" {
|
||||||
|
t.Errorf("expected /git/commit-and-push, got %s", r.URL.Path)
|
||||||
|
}
|
||||||
|
|
||||||
|
body, _ := io.ReadAll(r.Body)
|
||||||
|
_ = json.Unmarshal(body, &receivedReq)
|
||||||
|
|
||||||
|
resp := GitCommitAndPushResponse{
|
||||||
|
Success: true,
|
||||||
|
HasChanges: true,
|
||||||
|
CommitSHA: "abc123def456",
|
||||||
|
FilesChanged: []string{"main.go", "go.mod"},
|
||||||
|
Pushed: true,
|
||||||
|
}
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
_ = json.NewEncoder(w).Encode(resp)
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewClient(ClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
result, err := client.GitCommitAndPush(context.Background(), "feat: add feature", true, "/workspace")
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if !result.Success {
|
||||||
|
t.Error("expected success=true")
|
||||||
|
}
|
||||||
|
if !result.HasChanges {
|
||||||
|
t.Error("expected has_changes=true")
|
||||||
|
}
|
||||||
|
if result.CommitSHA != "abc123def456" {
|
||||||
|
t.Errorf("expected commit SHA abc123def456, got %s", result.CommitSHA)
|
||||||
|
}
|
||||||
|
if !result.Pushed {
|
||||||
|
t.Error("expected pushed=true")
|
||||||
|
}
|
||||||
|
if len(result.FilesChanged) != 2 {
|
||||||
|
t.Errorf("expected 2 files changed, got %d", len(result.FilesChanged))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify request
|
||||||
|
if receivedReq.Message != "feat: add feature" {
|
||||||
|
t.Errorf("expected message 'feat: add feature', got %s", receivedReq.Message)
|
||||||
|
}
|
||||||
|
if !receivedReq.Push {
|
||||||
|
t.Error("expected push=true in request")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGitCommitAndPush_NoChanges(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
resp := GitCommitAndPushResponse{
|
||||||
|
Success: true,
|
||||||
|
HasChanges: false,
|
||||||
|
}
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
_ = json.NewEncoder(w).Encode(resp)
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewClient(ClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
result, err := client.GitCommitAndPush(context.Background(), "test", false, "/workspace")
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if !result.Success {
|
||||||
|
t.Error("expected success=true")
|
||||||
|
}
|
||||||
|
if result.HasChanges {
|
||||||
|
t.Error("expected has_changes=false")
|
||||||
|
}
|
||||||
|
if result.CommitSHA != "" {
|
||||||
|
t.Errorf("expected empty commit SHA, got %s", result.CommitSHA)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGitCommitAndPush_ErrorStatus(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
_, _ = w.Write([]byte(`{"error":"git push failed"}`))
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewClient(ClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
_, err := client.GitCommitAndPush(context.Background(), "test", true, "/workspace")
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error, got nil")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "git commit returned status 500") {
|
||||||
|
t.Errorf("expected 500 error, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGitStatus_Success(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.Method != http.MethodGet {
|
||||||
|
t.Errorf("expected GET, got %s", r.Method)
|
||||||
|
}
|
||||||
|
if r.URL.Path != "/git/status" {
|
||||||
|
t.Errorf("expected /git/status, got %s", r.URL.Path)
|
||||||
|
}
|
||||||
|
if r.URL.Query().Get("work_dir") != "/workspace/project" {
|
||||||
|
t.Errorf("expected work_dir query param, got %s", r.URL.Query().Get("work_dir"))
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := GitStatusResponse{
|
||||||
|
IsRepo: true,
|
||||||
|
HasChanges: true,
|
||||||
|
ChangedFiles: []string{"main.go", "README.md"},
|
||||||
|
Branch: "feature/test",
|
||||||
|
}
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
_ = json.NewEncoder(w).Encode(resp)
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewClient(ClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
result, err := client.GitStatus(context.Background(), "/workspace/project")
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if !result.IsRepo {
|
||||||
|
t.Error("expected is_repo=true")
|
||||||
|
}
|
||||||
|
if !result.HasChanges {
|
||||||
|
t.Error("expected has_changes=true")
|
||||||
|
}
|
||||||
|
if result.Branch != "feature/test" {
|
||||||
|
t.Errorf("expected branch 'feature/test', got %s", result.Branch)
|
||||||
|
}
|
||||||
|
if len(result.ChangedFiles) != 2 {
|
||||||
|
t.Errorf("expected 2 changed files, got %d", len(result.ChangedFiles))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGitStatus_EmptyWorkDir(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
// work_dir should not be in query when empty
|
||||||
|
if r.URL.Query().Get("work_dir") != "" {
|
||||||
|
t.Errorf("expected empty work_dir, got %s", r.URL.Query().Get("work_dir"))
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := GitStatusResponse{IsRepo: true}
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
_ = json.NewEncoder(w).Encode(resp)
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewClient(ClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
_, err := client.GitStatus(context.Background(), "")
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGitStatus_ErrorStatus(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
_, _ = w.Write([]byte(`{"error":"not a git repository"}`))
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewClient(ClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
_, err := client.GitStatus(context.Background(), "/workspace")
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error, got nil")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "git status returned status 404") {
|
||||||
|
t.Errorf("expected 404 error, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRunSDLC_Success(t *testing.T) {
|
||||||
|
var receivedReq SDLCRequest
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.Method != http.MethodPost {
|
||||||
|
t.Errorf("expected POST, got %s", r.Method)
|
||||||
|
}
|
||||||
|
if r.URL.Path != "/sdlc" {
|
||||||
|
t.Errorf("expected /sdlc, got %s", r.URL.Path)
|
||||||
|
}
|
||||||
|
|
||||||
|
body, _ := io.ReadAll(r.Body)
|
||||||
|
_ = json.Unmarshal(body, &receivedReq)
|
||||||
|
|
||||||
|
resp := SDLCResponse{
|
||||||
|
Success: true,
|
||||||
|
Output: "Feature started successfully",
|
||||||
|
Data: json.RawMessage(`{"feature_id":"feat-123"}`),
|
||||||
|
}
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
_ = json.NewEncoder(w).Encode(resp)
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewClient(ClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
result, err := client.RunSDLC(context.Background(), "start", []string{"--name", "test-feature"}, "/workspace")
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if !result.Success {
|
||||||
|
t.Error("expected success=true")
|
||||||
|
}
|
||||||
|
if result.Output != "Feature started successfully" {
|
||||||
|
t.Errorf("expected output message, got %s", result.Output)
|
||||||
|
}
|
||||||
|
if result.Data == nil {
|
||||||
|
t.Error("expected data to be set")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify request
|
||||||
|
if receivedReq.Command != "start" {
|
||||||
|
t.Errorf("expected command 'start', got %s", receivedReq.Command)
|
||||||
|
}
|
||||||
|
if len(receivedReq.Args) != 2 {
|
||||||
|
t.Errorf("expected 2 args, got %d", len(receivedReq.Args))
|
||||||
|
}
|
||||||
|
if receivedReq.WorkDir != "/workspace" {
|
||||||
|
t.Errorf("expected work dir '/workspace', got %s", receivedReq.WorkDir)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRunSDLC_CommandFailed(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
resp := SDLCResponse{
|
||||||
|
Success: false,
|
||||||
|
Output: "Command output before failure",
|
||||||
|
Error: "validation failed: missing required field",
|
||||||
|
}
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
_ = json.NewEncoder(w).Encode(resp)
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewClient(ClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
result, err := client.RunSDLC(context.Background(), "validate", nil, "/workspace")
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if result.Success {
|
||||||
|
t.Error("expected success=false")
|
||||||
|
}
|
||||||
|
if result.Error != "validation failed: missing required field" {
|
||||||
|
t.Errorf("expected error message, got %s", result.Error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRunSDLC_ErrorStatus(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
_, _ = w.Write([]byte(`{"error":"sdlc binary not found"}`))
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewClient(ClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
_, err := client.RunSDLC(context.Background(), "status", nil, "/workspace")
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error, got nil")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "sdlc returned status 500") {
|
||||||
|
t.Errorf("expected 500 error, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRunSDLC_MalformedResponse(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
_, _ = w.Write([]byte(`{invalid`))
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewClient(ClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
_, err := client.RunSDLC(context.Background(), "status", nil, "/workspace")
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error, got nil")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "decode response") {
|
||||||
|
t.Errorf("expected decode error, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -97,6 +97,63 @@ func (r *WorkQueueRepository) GetTask(ctx context.Context, taskID string) (*doma
|
|||||||
return &task, nil
|
return &task, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// List returns all tasks with optional status filter and pagination.
|
||||||
|
func (r *WorkQueueRepository) List(ctx context.Context, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
|
||||||
|
opts.Normalize()
|
||||||
|
|
||||||
|
// Build optional WHERE clause
|
||||||
|
whereClause := ""
|
||||||
|
var args []any
|
||||||
|
argNum := 1
|
||||||
|
|
||||||
|
if status != nil {
|
||||||
|
whereClause = fmt.Sprintf("WHERE status = $%d", argNum)
|
||||||
|
args = append(args, string(*status))
|
||||||
|
argNum++
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get total count
|
||||||
|
countQuery := "SELECT COUNT(*) FROM work_queue " + whereClause
|
||||||
|
var total int64
|
||||||
|
if err := r.db.QueryRowContext(ctx, countQuery, args...).Scan(&total); err != nil {
|
||||||
|
return nil, fmt.Errorf("count work tasks: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build paginated query
|
||||||
|
query := fmt.Sprintf(`
|
||||||
|
SELECT id, project_id, task_type, task_spec, status, priority, worker_id,
|
||||||
|
callback_url, created_at, started_at, completed_at, result, error,
|
||||||
|
retry_count, max_retries, error_code
|
||||||
|
FROM work_queue
|
||||||
|
%s
|
||||||
|
ORDER BY created_at DESC
|
||||||
|
LIMIT $%d OFFSET $%d
|
||||||
|
`, whereClause, argNum, argNum+1)
|
||||||
|
args = append(args, opts.Limit, opts.Offset)
|
||||||
|
|
||||||
|
rows, err := r.db.QueryContext(ctx, query, args...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("list work tasks: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = rows.Close() }()
|
||||||
|
|
||||||
|
var tasks []*domain.WorkTask
|
||||||
|
for rows.Next() {
|
||||||
|
task, err := r.scanTask(rows)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
tasks = append(tasks, task)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &domain.WorkListResult{
|
||||||
|
Tasks: tasks,
|
||||||
|
Total: total,
|
||||||
|
Limit: opts.Limit,
|
||||||
|
Offset: opts.Offset,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
// ListByProject returns tasks for a project with optional status filter and pagination.
|
// ListByProject returns tasks for a project with optional status filter and pagination.
|
||||||
func (r *WorkQueueRepository) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
|
func (r *WorkQueueRepository) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
|
||||||
// Normalize pagination options
|
// Normalize pagination options
|
||||||
|
|||||||
@ -80,6 +80,10 @@ func (m *mockWorkQueue) GetTask(ctx context.Context, taskID string) (*domain.Wor
|
|||||||
return task, nil
|
return task, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *mockWorkQueue) List(ctx context.Context, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
|
||||||
|
return &domain.WorkListResult{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (m *mockWorkQueue) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
|
func (m *mockWorkQueue) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
|
||||||
return &domain.WorkListResult{}, nil
|
return &domain.WorkListResult{}, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@ -38,10 +38,11 @@ func (h *WorkHandler) Mount(r api.Router) {
|
|||||||
r.With(auth.RequireScope(auth.ScopeQueueWrite, auth.ScopeAdmin)).Post("/{taskId}/cancel", h.Cancel)
|
r.With(auth.RequireScope(auth.ScopeQueueWrite, auth.ScopeAdmin)).Post("/{taskId}/cancel", h.Cancel)
|
||||||
|
|
||||||
// Read operations
|
// Read operations
|
||||||
|
r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/tasks", h.ListTasks)
|
||||||
|
r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/stats", h.Stats)
|
||||||
|
r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/projects/{projectId}", h.ListByProject)
|
||||||
r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/{taskId}", h.GetTask)
|
r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/{taskId}", h.GetTask)
|
||||||
r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/{taskId}/status", h.GetStatus)
|
r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/{taskId}/status", h.GetStatus)
|
||||||
r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/projects/{projectId}", h.ListByProject)
|
|
||||||
r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/stats", h.Stats)
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -121,65 +122,6 @@ type DequeueWorkResponse struct {
|
|||||||
Task *WorkTaskDTO `json:"task,omitempty"`
|
Task *WorkTaskDTO `json:"task,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// WorkTaskDTO is the data transfer object for work tasks.
|
|
||||||
type WorkTaskDTO struct {
|
|
||||||
ID string `json:"id"`
|
|
||||||
ProjectID string `json:"project_id"`
|
|
||||||
Type string `json:"type"`
|
|
||||||
Spec map[string]any `json:"spec"`
|
|
||||||
Status string `json:"status"`
|
|
||||||
Priority int `json:"priority"`
|
|
||||||
WorkerID string `json:"worker_id,omitempty"`
|
|
||||||
CallbackURL string `json:"callback_url,omitempty"`
|
|
||||||
CreatedAt string `json:"created_at"`
|
|
||||||
StartedAt string `json:"started_at,omitempty"`
|
|
||||||
CompletedAt string `json:"completed_at,omitempty"`
|
|
||||||
Result *WorkResultDTO `json:"result,omitempty"`
|
|
||||||
Error string `json:"error,omitempty"`
|
|
||||||
RetryCount int `json:"retry_count"`
|
|
||||||
MaxRetries int `json:"max_retries"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// WorkResultDTO is the data transfer object for work results.
|
|
||||||
type WorkResultDTO struct {
|
|
||||||
Output string `json:"output,omitempty"`
|
|
||||||
Artifacts map[string]string `json:"artifacts,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// toWorkTaskDTO converts a domain.WorkTask to a WorkTaskDTO.
|
|
||||||
func toWorkTaskDTO(t *domain.WorkTask) *WorkTaskDTO {
|
|
||||||
if t == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
dto := &WorkTaskDTO{
|
|
||||||
ID: t.ID,
|
|
||||||
ProjectID: t.ProjectID,
|
|
||||||
Type: string(t.Type),
|
|
||||||
Spec: t.Spec,
|
|
||||||
Status: string(t.Status),
|
|
||||||
Priority: t.Priority,
|
|
||||||
WorkerID: t.WorkerID,
|
|
||||||
CallbackURL: t.CallbackURL,
|
|
||||||
CreatedAt: t.CreatedAt.Format("2006-01-02T15:04:05Z07:00"),
|
|
||||||
Error: t.Error,
|
|
||||||
RetryCount: t.RetryCount,
|
|
||||||
MaxRetries: t.MaxRetries,
|
|
||||||
}
|
|
||||||
if t.StartedAt != nil {
|
|
||||||
dto.StartedAt = t.StartedAt.Format("2006-01-02T15:04:05Z07:00")
|
|
||||||
}
|
|
||||||
if t.CompletedAt != nil {
|
|
||||||
dto.CompletedAt = t.CompletedAt.Format("2006-01-02T15:04:05Z07:00")
|
|
||||||
}
|
|
||||||
if t.Result != nil {
|
|
||||||
dto.Result = &WorkResultDTO{
|
|
||||||
Output: t.Result.Output,
|
|
||||||
Artifacts: t.Result.Artifacts,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return dto
|
|
||||||
}
|
|
||||||
|
|
||||||
// Dequeue claims the next available task for a worker.
|
// Dequeue claims the next available task for a worker.
|
||||||
// POST /work/dequeue
|
// POST /work/dequeue
|
||||||
func (h *WorkHandler) Dequeue(w http.ResponseWriter, r *http.Request) {
|
func (h *WorkHandler) Dequeue(w http.ResponseWriter, r *http.Request) {
|
||||||
@ -341,6 +283,58 @@ func (h *WorkHandler) Cancel(w http.ResponseWriter, r *http.Request) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ListTasks returns all tasks with optional status filter and pagination.
|
||||||
|
// GET /work/tasks?status=running&limit=50&offset=0
|
||||||
|
func (h *WorkHandler) ListTasks(w http.ResponseWriter, r *http.Request) {
|
||||||
|
// Parse and validate optional status filter
|
||||||
|
var status *domain.WorkTaskStatus
|
||||||
|
if s := r.URL.Query().Get("status"); s != "" {
|
||||||
|
st := domain.WorkTaskStatus(s)
|
||||||
|
if !st.IsValid() {
|
||||||
|
api.WriteBadRequest(w, r, "invalid status filter: must be pending, running, completed, failed, or cancelled")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
status = &st
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse pagination options
|
||||||
|
opts := domain.DefaultWorkListOptions()
|
||||||
|
if limitStr := r.URL.Query().Get("limit"); limitStr != "" {
|
||||||
|
limit, err := strconv.Atoi(limitStr)
|
||||||
|
if err != nil {
|
||||||
|
api.WriteBadRequest(w, r, "limit must be a valid integer")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
opts.Limit = limit
|
||||||
|
}
|
||||||
|
if offsetStr := r.URL.Query().Get("offset"); offsetStr != "" {
|
||||||
|
offset, err := strconv.Atoi(offsetStr)
|
||||||
|
if err != nil {
|
||||||
|
api.WriteBadRequest(w, r, "offset must be a valid integer")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
opts.Offset = offset
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := h.workService.List(r.Context(), status, opts)
|
||||||
|
if err != nil {
|
||||||
|
api.WriteInternalError(w, r, "failed to list tasks")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
dtos := make([]*WorkTaskDTO, len(result.Tasks))
|
||||||
|
for i, t := range result.Tasks {
|
||||||
|
dtos[i] = toWorkTaskDTO(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
api.WriteSuccess(w, r, map[string]any{
|
||||||
|
"tasks": dtos,
|
||||||
|
"total": result.Total,
|
||||||
|
"limit": result.Limit,
|
||||||
|
"offset": result.Offset,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// ListByProject returns tasks for a project with pagination.
|
// ListByProject returns tasks for a project with pagination.
|
||||||
// GET /work/projects/{projectId}?status=pending&limit=50&offset=0
|
// GET /work/projects/{projectId}?status=pending&limit=50&offset=0
|
||||||
func (h *WorkHandler) ListByProject(w http.ResponseWriter, r *http.Request) {
|
func (h *WorkHandler) ListByProject(w http.ResponseWriter, r *http.Request) {
|
||||||
@ -428,21 +422,3 @@ func (h *WorkHandler) Stats(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
api.WriteSuccess(w, r, resp)
|
api.WriteSuccess(w, r, resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
// formatDuration formats a duration in a human-readable way.
|
|
||||||
func formatDuration(d interface{ Seconds() float64 }) string {
|
|
||||||
secs := d.Seconds()
|
|
||||||
if secs < 60 {
|
|
||||||
return fmt.Sprintf("%.0fs", secs)
|
|
||||||
}
|
|
||||||
mins := secs / 60
|
|
||||||
if mins < 60 {
|
|
||||||
return fmt.Sprintf("%.1fm", mins)
|
|
||||||
}
|
|
||||||
hours := mins / 60
|
|
||||||
if hours < 24 {
|
|
||||||
return fmt.Sprintf("%.1fh", hours)
|
|
||||||
}
|
|
||||||
days := hours / 24
|
|
||||||
return fmt.Sprintf("%.1fd", days)
|
|
||||||
}
|
|
||||||
|
|||||||
85
internal/handlers/work_dto.go
Normal file
85
internal/handlers/work_dto.go
Normal file
@ -0,0 +1,85 @@
|
|||||||
|
// Package handlers provides HTTP handlers for the rdev API.
|
||||||
|
package handlers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/orchard9/rdev/internal/domain"
|
||||||
|
)
|
||||||
|
|
||||||
|
// WorkTaskDTO is the data transfer object for work tasks.
|
||||||
|
type WorkTaskDTO struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
ProjectID string `json:"project_id"`
|
||||||
|
Type string `json:"type"`
|
||||||
|
Spec map[string]any `json:"spec"`
|
||||||
|
Status string `json:"status"`
|
||||||
|
Priority int `json:"priority"`
|
||||||
|
WorkerID string `json:"worker_id,omitempty"`
|
||||||
|
CallbackURL string `json:"callback_url,omitempty"`
|
||||||
|
CreatedAt string `json:"created_at"`
|
||||||
|
StartedAt string `json:"started_at,omitempty"`
|
||||||
|
CompletedAt string `json:"completed_at,omitempty"`
|
||||||
|
Result *WorkResultDTO `json:"result,omitempty"`
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
RetryCount int `json:"retry_count"`
|
||||||
|
MaxRetries int `json:"max_retries"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// WorkResultDTO is the data transfer object for work results.
|
||||||
|
type WorkResultDTO struct {
|
||||||
|
Output string `json:"output,omitempty"`
|
||||||
|
Artifacts map[string]string `json:"artifacts,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// toWorkTaskDTO converts a domain.WorkTask to a WorkTaskDTO.
|
||||||
|
func toWorkTaskDTO(t *domain.WorkTask) *WorkTaskDTO {
|
||||||
|
if t == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
dto := &WorkTaskDTO{
|
||||||
|
ID: t.ID,
|
||||||
|
ProjectID: t.ProjectID,
|
||||||
|
Type: string(t.Type),
|
||||||
|
Spec: t.Spec,
|
||||||
|
Status: string(t.Status),
|
||||||
|
Priority: t.Priority,
|
||||||
|
WorkerID: t.WorkerID,
|
||||||
|
CallbackURL: t.CallbackURL,
|
||||||
|
CreatedAt: t.CreatedAt.Format("2006-01-02T15:04:05Z07:00"),
|
||||||
|
Error: t.Error,
|
||||||
|
RetryCount: t.RetryCount,
|
||||||
|
MaxRetries: t.MaxRetries,
|
||||||
|
}
|
||||||
|
if t.StartedAt != nil {
|
||||||
|
dto.StartedAt = t.StartedAt.Format("2006-01-02T15:04:05Z07:00")
|
||||||
|
}
|
||||||
|
if t.CompletedAt != nil {
|
||||||
|
dto.CompletedAt = t.CompletedAt.Format("2006-01-02T15:04:05Z07:00")
|
||||||
|
}
|
||||||
|
if t.Result != nil {
|
||||||
|
dto.Result = &WorkResultDTO{
|
||||||
|
Output: t.Result.Output,
|
||||||
|
Artifacts: t.Result.Artifacts,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return dto
|
||||||
|
}
|
||||||
|
|
||||||
|
// formatDuration formats a duration in a human-readable way.
|
||||||
|
func formatDuration(d interface{ Seconds() float64 }) string {
|
||||||
|
secs := d.Seconds()
|
||||||
|
if secs < 60 {
|
||||||
|
return fmt.Sprintf("%.0fs", secs)
|
||||||
|
}
|
||||||
|
mins := secs / 60
|
||||||
|
if mins < 60 {
|
||||||
|
return fmt.Sprintf("%.1fm", mins)
|
||||||
|
}
|
||||||
|
hours := mins / 60
|
||||||
|
if hours < 24 {
|
||||||
|
return fmt.Sprintf("%.1fh", hours)
|
||||||
|
}
|
||||||
|
days := hours / 24
|
||||||
|
return fmt.Sprintf("%.1fd", days)
|
||||||
|
}
|
||||||
@ -123,6 +123,39 @@ func (m *mockWorkQueue) GetTask(ctx context.Context, taskID string) (*domain.Wor
|
|||||||
return task, nil
|
return task, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *mockWorkQueue) List(ctx context.Context, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
|
||||||
|
if m.err != nil {
|
||||||
|
return nil, m.err
|
||||||
|
}
|
||||||
|
opts.Normalize()
|
||||||
|
|
||||||
|
var tasks []*domain.WorkTask
|
||||||
|
for _, task := range m.tasks {
|
||||||
|
if status == nil || task.Status == *status {
|
||||||
|
tasks = append(tasks, task)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply pagination
|
||||||
|
total := int64(len(tasks))
|
||||||
|
if opts.Offset >= len(tasks) {
|
||||||
|
tasks = nil
|
||||||
|
} else {
|
||||||
|
end := opts.Offset + opts.Limit
|
||||||
|
if end > len(tasks) {
|
||||||
|
end = len(tasks)
|
||||||
|
}
|
||||||
|
tasks = tasks[opts.Offset:end]
|
||||||
|
}
|
||||||
|
|
||||||
|
return &domain.WorkListResult{
|
||||||
|
Tasks: tasks,
|
||||||
|
Total: total,
|
||||||
|
Limit: opts.Limit,
|
||||||
|
Offset: opts.Offset,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (m *mockWorkQueue) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
|
func (m *mockWorkQueue) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
|
||||||
if m.err != nil {
|
if m.err != nil {
|
||||||
return nil, m.err
|
return nil, m.err
|
||||||
|
|||||||
@ -40,6 +40,10 @@ type WorkQueue interface {
|
|||||||
// GetTask retrieves a task by ID.
|
// GetTask retrieves a task by ID.
|
||||||
GetTask(ctx context.Context, taskID string) (*domain.WorkTask, error)
|
GetTask(ctx context.Context, taskID string) (*domain.WorkTask, error)
|
||||||
|
|
||||||
|
// List returns all tasks with optional status filter and pagination.
|
||||||
|
// Use for admin/debugging views across all projects.
|
||||||
|
List(ctx context.Context, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error)
|
||||||
|
|
||||||
// ListByProject returns tasks for a project with optional status filter and pagination.
|
// ListByProject returns tasks for a project with optional status filter and pagination.
|
||||||
ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error)
|
ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error)
|
||||||
|
|
||||||
|
|||||||
@ -91,6 +91,10 @@ func (m *mockWorkQueue) GetTask(ctx context.Context, taskID string) (*domain.Wor
|
|||||||
return task, nil
|
return task, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *mockWorkQueue) List(ctx context.Context, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
|
||||||
|
return &domain.WorkListResult{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (m *mockWorkQueue) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
|
func (m *mockWorkQueue) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
|
||||||
return &domain.WorkListResult{}, nil
|
return &domain.WorkListResult{}, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@ -205,6 +205,11 @@ func (s *WorkService) GetTask(ctx context.Context, taskID string) (*domain.WorkT
|
|||||||
return s.queue.GetTask(ctx, taskID)
|
return s.queue.GetTask(ctx, taskID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// List returns all tasks with optional status filter and pagination.
|
||||||
|
func (s *WorkService) List(ctx context.Context, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
|
||||||
|
return s.queue.List(ctx, status, opts)
|
||||||
|
}
|
||||||
|
|
||||||
// ListByProject returns tasks for a project with pagination.
|
// ListByProject returns tasks for a project with pagination.
|
||||||
func (s *WorkService) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
|
func (s *WorkService) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
|
||||||
return s.queue.ListByProject(ctx, projectID, status, opts)
|
return s.queue.ListByProject(ctx, projectID, status, opts)
|
||||||
|
|||||||
584
internal/worker/api_client_test.go
Normal file
584
internal/worker/api_client_test.go
Normal file
@ -0,0 +1,584 @@
|
|||||||
|
package worker
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/orchard9/rdev/internal/domain"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNewAPIClient_DefaultTimeout(t *testing.T) {
|
||||||
|
client := NewAPIClient(APIClientConfig{
|
||||||
|
BaseURL: "http://localhost:8080",
|
||||||
|
APIKey: "test-key",
|
||||||
|
})
|
||||||
|
|
||||||
|
if client.httpClient.Timeout != 30*time.Second {
|
||||||
|
t.Errorf("expected default timeout 30s, got %v", client.httpClient.Timeout)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewAPIClient_CustomTimeout(t *testing.T) {
|
||||||
|
client := NewAPIClient(APIClientConfig{
|
||||||
|
BaseURL: "http://localhost:8080",
|
||||||
|
APIKey: "test-key",
|
||||||
|
Timeout: 60 * time.Second,
|
||||||
|
})
|
||||||
|
|
||||||
|
if client.httpClient.Timeout != 60*time.Second {
|
||||||
|
t.Errorf("expected timeout 60s, got %v", client.httpClient.Timeout)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRegister_Success(t *testing.T) {
|
||||||
|
var receivedReq RegisterRequest
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.Method != http.MethodPost {
|
||||||
|
t.Errorf("expected POST, got %s", r.Method)
|
||||||
|
}
|
||||||
|
if r.URL.Path != "/workers/register" {
|
||||||
|
t.Errorf("expected /workers/register, got %s", r.URL.Path)
|
||||||
|
}
|
||||||
|
if r.Header.Get("Content-Type") != "application/json" {
|
||||||
|
t.Errorf("expected Content-Type application/json, got %s", r.Header.Get("Content-Type"))
|
||||||
|
}
|
||||||
|
if r.Header.Get("X-API-Key") != "test-key" {
|
||||||
|
t.Errorf("expected X-API-Key test-key, got %s", r.Header.Get("X-API-Key"))
|
||||||
|
}
|
||||||
|
|
||||||
|
body, _ := io.ReadAll(r.Body)
|
||||||
|
_ = json.Unmarshal(body, &receivedReq)
|
||||||
|
|
||||||
|
w.WriteHeader(http.StatusCreated)
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewAPIClient(APIClientConfig{
|
||||||
|
BaseURL: server.URL,
|
||||||
|
APIKey: "test-key",
|
||||||
|
})
|
||||||
|
|
||||||
|
err := client.Register(context.Background(), &RegisterRequest{
|
||||||
|
ID: "worker-1",
|
||||||
|
Hostname: "localhost",
|
||||||
|
Version: "v1.0.0",
|
||||||
|
Capabilities: []string{"build", "test"},
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if receivedReq.ID != "worker-1" {
|
||||||
|
t.Errorf("expected ID worker-1, got %s", receivedReq.ID)
|
||||||
|
}
|
||||||
|
if receivedReq.Hostname != "localhost" {
|
||||||
|
t.Errorf("expected hostname localhost, got %s", receivedReq.Hostname)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRegister_ErrorStatus(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
_, _ = w.Write([]byte(`{"error":"invalid worker ID"}`))
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
err := client.Register(context.Background(), &RegisterRequest{ID: "bad"})
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error, got nil")
|
||||||
|
}
|
||||||
|
if want := "register returned status 400"; !containsSubstring(err.Error(), want) {
|
||||||
|
t.Errorf("expected error containing %q, got %v", want, err)
|
||||||
|
}
|
||||||
|
if !containsSubstring(err.Error(), "invalid worker ID") {
|
||||||
|
t.Errorf("expected error to contain response body, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRegister_NetworkError(t *testing.T) {
|
||||||
|
client := NewAPIClient(APIClientConfig{
|
||||||
|
BaseURL: "http://localhost:1", // invalid port
|
||||||
|
Timeout: 100 * time.Millisecond,
|
||||||
|
})
|
||||||
|
|
||||||
|
err := client.Register(context.Background(), &RegisterRequest{ID: "test"})
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error, got nil")
|
||||||
|
}
|
||||||
|
if !containsSubstring(err.Error(), "register:") {
|
||||||
|
t.Errorf("expected error wrapped with 'register:', got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRegister_ContextCanceled(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
cancel() // cancel immediately
|
||||||
|
|
||||||
|
err := client.Register(ctx, &RegisterRequest{ID: "test"})
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error, got nil")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHeartbeat_Success(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.Method != http.MethodPost {
|
||||||
|
t.Errorf("expected POST, got %s", r.Method)
|
||||||
|
}
|
||||||
|
if r.URL.Path != "/workers/worker-123/heartbeat" {
|
||||||
|
t.Errorf("expected /workers/worker-123/heartbeat, got %s", r.URL.Path)
|
||||||
|
}
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewAPIClient(APIClientConfig{BaseURL: server.URL, APIKey: "key"})
|
||||||
|
|
||||||
|
err := client.Heartbeat(context.Background(), "worker-123")
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHeartbeat_WorkerNotFound(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
_, _ = w.Write([]byte(`{"error":"worker not found"}`))
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
err := client.Heartbeat(context.Background(), "unknown-worker")
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error, got nil")
|
||||||
|
}
|
||||||
|
if !containsSubstring(err.Error(), "heartbeat returned status 404") {
|
||||||
|
t.Errorf("expected 404 error, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestClaimTask_Success(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.Method != http.MethodPost {
|
||||||
|
t.Errorf("expected POST, got %s", r.Method)
|
||||||
|
}
|
||||||
|
if r.URL.Path != "/workers/worker-1/claim" {
|
||||||
|
t.Errorf("expected /workers/worker-1/claim, got %s", r.URL.Path)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := ClaimTaskResponse{
|
||||||
|
Success: true,
|
||||||
|
Data: struct {
|
||||||
|
Task *WorkTaskData `json:"task"`
|
||||||
|
WorkerID string `json:"worker_id"`
|
||||||
|
}{
|
||||||
|
WorkerID: "worker-1",
|
||||||
|
Task: &WorkTaskData{
|
||||||
|
ID: "task-123",
|
||||||
|
ProjectID: "proj-1",
|
||||||
|
Type: "build",
|
||||||
|
Status: "running",
|
||||||
|
Priority: 5,
|
||||||
|
CreatedAt: "2024-01-15T10:30:00Z",
|
||||||
|
StartedAt: "2024-01-15T10:31:00Z",
|
||||||
|
RetryCount: 1,
|
||||||
|
MaxRetries: 3,
|
||||||
|
Spec: map[string]any{"prompt": "build it"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
_ = json.NewEncoder(w).Encode(resp)
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
task, err := client.ClaimTask(context.Background(), "worker-1")
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if task == nil {
|
||||||
|
t.Fatal("expected task, got nil")
|
||||||
|
}
|
||||||
|
if task.ID != "task-123" {
|
||||||
|
t.Errorf("expected task ID task-123, got %s", task.ID)
|
||||||
|
}
|
||||||
|
if task.ProjectID != "proj-1" {
|
||||||
|
t.Errorf("expected project ID proj-1, got %s", task.ProjectID)
|
||||||
|
}
|
||||||
|
if task.Type != domain.WorkTaskType("build") {
|
||||||
|
t.Errorf("expected type build, got %s", task.Type)
|
||||||
|
}
|
||||||
|
if task.Priority != 5 {
|
||||||
|
t.Errorf("expected priority 5, got %d", task.Priority)
|
||||||
|
}
|
||||||
|
if task.RetryCount != 1 {
|
||||||
|
t.Errorf("expected retry count 1, got %d", task.RetryCount)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestClaimTask_NoTasksAvailable(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
task, err := client.ClaimTask(context.Background(), "worker-1")
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if task != nil {
|
||||||
|
t.Errorf("expected nil task, got %+v", task)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestClaimTask_NilTaskData(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
resp := ClaimTaskResponse{
|
||||||
|
Success: true,
|
||||||
|
Data: struct {
|
||||||
|
Task *WorkTaskData `json:"task"`
|
||||||
|
WorkerID string `json:"worker_id"`
|
||||||
|
}{
|
||||||
|
WorkerID: "worker-1",
|
||||||
|
Task: nil,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
_ = json.NewEncoder(w).Encode(resp)
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
_, err := client.ClaimTask(context.Background(), "worker-1")
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error, got nil")
|
||||||
|
}
|
||||||
|
if !containsSubstring(err.Error(), "no task data") {
|
||||||
|
t.Errorf("expected 'no task data' error, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestClaimTask_MalformedJSON(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
_, _ = w.Write([]byte(`{invalid json}`))
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
_, err := client.ClaimTask(context.Background(), "worker-1")
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error, got nil")
|
||||||
|
}
|
||||||
|
if !containsSubstring(err.Error(), "decode response") {
|
||||||
|
t.Errorf("expected decode error, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestClaimTask_ErrorStatus(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
_, _ = w.Write([]byte(`{"error":"database error"}`))
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
_, err := client.ClaimTask(context.Background(), "worker-1")
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error, got nil")
|
||||||
|
}
|
||||||
|
if !containsSubstring(err.Error(), "claim task returned status 500") {
|
||||||
|
t.Errorf("expected 500 error, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCompleteTask_Success(t *testing.T) {
|
||||||
|
var receivedReq CompleteTaskRequest
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.Method != http.MethodPost {
|
||||||
|
t.Errorf("expected POST, got %s", r.Method)
|
||||||
|
}
|
||||||
|
if r.URL.Path != "/workers/worker-1/complete/task-123" {
|
||||||
|
t.Errorf("expected /workers/worker-1/complete/task-123, got %s", r.URL.Path)
|
||||||
|
}
|
||||||
|
|
||||||
|
body, _ := io.ReadAll(r.Body)
|
||||||
|
_ = json.Unmarshal(body, &receivedReq)
|
||||||
|
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewAPIClient(APIClientConfig{BaseURL: server.URL, APIKey: "key"})
|
||||||
|
|
||||||
|
result := &domain.BuildResult{
|
||||||
|
Success: true,
|
||||||
|
Output: "Build successful",
|
||||||
|
CommitSHA: "abc123",
|
||||||
|
FilesChanged: []string{"main.go", "go.mod"},
|
||||||
|
DurationMs: 5000,
|
||||||
|
Artifacts: map[string]string{"deploy_url": "https://example.com"},
|
||||||
|
}
|
||||||
|
|
||||||
|
err := client.CompleteTask(context.Background(), "worker-1", "task-123", result)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if !receivedReq.Success {
|
||||||
|
t.Error("expected success=true")
|
||||||
|
}
|
||||||
|
if receivedReq.Output != "Build successful" {
|
||||||
|
t.Errorf("expected output 'Build successful', got %s", receivedReq.Output)
|
||||||
|
}
|
||||||
|
if receivedReq.CommitSHA != "abc123" {
|
||||||
|
t.Errorf("expected commit SHA abc123, got %s", receivedReq.CommitSHA)
|
||||||
|
}
|
||||||
|
if receivedReq.DurationMs != 5000 {
|
||||||
|
t.Errorf("expected duration 5000, got %d", receivedReq.DurationMs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCompleteTask_TaskNotFound(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
_, _ = w.Write([]byte(`{"error":"task not found"}`))
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
err := client.CompleteTask(context.Background(), "worker-1", "unknown", &domain.BuildResult{})
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error, got nil")
|
||||||
|
}
|
||||||
|
if !containsSubstring(err.Error(), "complete task returned status 404") {
|
||||||
|
t.Errorf("expected 404 error, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFailTask_Success(t *testing.T) {
|
||||||
|
var receivedReq FailTaskRequest
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.Method != http.MethodPost {
|
||||||
|
t.Errorf("expected POST, got %s", r.Method)
|
||||||
|
}
|
||||||
|
if r.URL.Path != "/workers/worker-1/fail/task-123" {
|
||||||
|
t.Errorf("expected /workers/worker-1/fail/task-123, got %s", r.URL.Path)
|
||||||
|
}
|
||||||
|
|
||||||
|
body, _ := io.ReadAll(r.Body)
|
||||||
|
_ = json.Unmarshal(body, &receivedReq)
|
||||||
|
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewAPIClient(APIClientConfig{BaseURL: server.URL, APIKey: "key"})
|
||||||
|
|
||||||
|
err := client.FailTask(context.Background(), "worker-1", "task-123", "build failed", "error output", 3000)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if receivedReq.Error != "build failed" {
|
||||||
|
t.Errorf("expected error 'build failed', got %s", receivedReq.Error)
|
||||||
|
}
|
||||||
|
if receivedReq.Output != "error output" {
|
||||||
|
t.Errorf("expected output 'error output', got %s", receivedReq.Output)
|
||||||
|
}
|
||||||
|
if receivedReq.DurationMs != 3000 {
|
||||||
|
t.Errorf("expected duration 3000, got %d", receivedReq.DurationMs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFailTask_ErrorStatus(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
_, _ = w.Write([]byte(`{"error":"database unavailable"}`))
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
|
||||||
|
|
||||||
|
err := client.FailTask(context.Background(), "worker-1", "task-123", "failed", "", 0)
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error, got nil")
|
||||||
|
}
|
||||||
|
if !containsSubstring(err.Error(), "fail task returned status 500") {
|
||||||
|
t.Errorf("expected 500 error, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestToWorkTask_ParsesTimestamps(t *testing.T) {
|
||||||
|
data := &WorkTaskData{
|
||||||
|
ID: "task-1",
|
||||||
|
ProjectID: "proj-1",
|
||||||
|
Type: "build",
|
||||||
|
Status: "running",
|
||||||
|
CreatedAt: "2024-01-15T10:30:00Z",
|
||||||
|
StartedAt: "2024-01-15T10:31:00Z",
|
||||||
|
}
|
||||||
|
|
||||||
|
task := data.ToWorkTask()
|
||||||
|
|
||||||
|
if task.CreatedAt.IsZero() {
|
||||||
|
t.Error("expected CreatedAt to be set")
|
||||||
|
}
|
||||||
|
if task.CreatedAt.Year() != 2024 || task.CreatedAt.Month() != 1 || task.CreatedAt.Day() != 15 {
|
||||||
|
t.Errorf("unexpected CreatedAt: %v", task.CreatedAt)
|
||||||
|
}
|
||||||
|
|
||||||
|
if task.StartedAt == nil {
|
||||||
|
t.Fatal("expected StartedAt to be set")
|
||||||
|
}
|
||||||
|
if task.StartedAt.Hour() != 10 || task.StartedAt.Minute() != 31 {
|
||||||
|
t.Errorf("unexpected StartedAt: %v", task.StartedAt)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestToWorkTask_InvalidTimestamp(t *testing.T) {
|
||||||
|
data := &WorkTaskData{
|
||||||
|
ID: "task-1",
|
||||||
|
ProjectID: "proj-1",
|
||||||
|
Type: "build",
|
||||||
|
Status: "running",
|
||||||
|
CreatedAt: "not-a-timestamp",
|
||||||
|
StartedAt: "also-not-valid",
|
||||||
|
}
|
||||||
|
|
||||||
|
task := data.ToWorkTask()
|
||||||
|
|
||||||
|
// Invalid timestamps should result in zero values (current behavior)
|
||||||
|
if !task.CreatedAt.IsZero() {
|
||||||
|
t.Errorf("expected zero CreatedAt for invalid timestamp, got %v", task.CreatedAt)
|
||||||
|
}
|
||||||
|
// Note: StartedAt is parsed but ignored on error, pointer becomes non-nil with zero value
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestToWorkTask_EmptyTimestamps(t *testing.T) {
|
||||||
|
data := &WorkTaskData{
|
||||||
|
ID: "task-1",
|
||||||
|
ProjectID: "proj-1",
|
||||||
|
Type: "build",
|
||||||
|
Status: "pending",
|
||||||
|
CreatedAt: "",
|
||||||
|
StartedAt: "",
|
||||||
|
}
|
||||||
|
|
||||||
|
task := data.ToWorkTask()
|
||||||
|
|
||||||
|
if !task.CreatedAt.IsZero() {
|
||||||
|
t.Errorf("expected zero CreatedAt for empty string, got %v", task.CreatedAt)
|
||||||
|
}
|
||||||
|
if task.StartedAt != nil {
|
||||||
|
t.Errorf("expected nil StartedAt for empty string, got %v", task.StartedAt)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestToWorkTask_NilData(t *testing.T) {
|
||||||
|
var data *WorkTaskData
|
||||||
|
task := data.ToWorkTask()
|
||||||
|
|
||||||
|
if task != nil {
|
||||||
|
t.Errorf("expected nil task from nil data, got %+v", task)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestToWorkTask_PreservesAllFields(t *testing.T) {
|
||||||
|
data := &WorkTaskData{
|
||||||
|
ID: "task-1",
|
||||||
|
ProjectID: "proj-1",
|
||||||
|
Type: "deploy",
|
||||||
|
Status: "completed",
|
||||||
|
Priority: 10,
|
||||||
|
WorkerID: "worker-99",
|
||||||
|
CallbackURL: "https://callback.example.com",
|
||||||
|
RetryCount: 2,
|
||||||
|
MaxRetries: 5,
|
||||||
|
Spec: map[string]any{"key": "value"},
|
||||||
|
}
|
||||||
|
|
||||||
|
task := data.ToWorkTask()
|
||||||
|
|
||||||
|
if task.ID != "task-1" {
|
||||||
|
t.Errorf("expected ID task-1, got %s", task.ID)
|
||||||
|
}
|
||||||
|
if task.ProjectID != "proj-1" {
|
||||||
|
t.Errorf("expected ProjectID proj-1, got %s", task.ProjectID)
|
||||||
|
}
|
||||||
|
if task.Type != domain.WorkTaskType("deploy") {
|
||||||
|
t.Errorf("expected Type deploy, got %s", task.Type)
|
||||||
|
}
|
||||||
|
if task.Status != domain.WorkTaskStatus("completed") {
|
||||||
|
t.Errorf("expected Status completed, got %s", task.Status)
|
||||||
|
}
|
||||||
|
if task.Priority != 10 {
|
||||||
|
t.Errorf("expected Priority 10, got %d", task.Priority)
|
||||||
|
}
|
||||||
|
if task.WorkerID != "worker-99" {
|
||||||
|
t.Errorf("expected WorkerID worker-99, got %s", task.WorkerID)
|
||||||
|
}
|
||||||
|
if task.CallbackURL != "https://callback.example.com" {
|
||||||
|
t.Errorf("expected CallbackURL, got %s", task.CallbackURL)
|
||||||
|
}
|
||||||
|
if task.RetryCount != 2 {
|
||||||
|
t.Errorf("expected RetryCount 2, got %d", task.RetryCount)
|
||||||
|
}
|
||||||
|
if task.MaxRetries != 5 {
|
||||||
|
t.Errorf("expected MaxRetries 5, got %d", task.MaxRetries)
|
||||||
|
}
|
||||||
|
if task.Spec["key"] != "value" {
|
||||||
|
t.Errorf("expected Spec[key]=value, got %v", task.Spec)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// containsSubstring is a helper to check if a string contains a substring.
|
||||||
|
func containsSubstring(s, substr string) bool {
|
||||||
|
return len(s) >= len(substr) && (s == substr || len(s) > 0 && containsHelper(s, substr))
|
||||||
|
}
|
||||||
|
|
||||||
|
func containsHelper(s, substr string) bool {
|
||||||
|
for i := 0; i <= len(s)-len(substr); i++ {
|
||||||
|
if s[i:i+len(substr)] == substr {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
@ -104,6 +104,9 @@ func (m *mockWorkQueue) GetTask(_ context.Context, taskID string) (*domain.WorkT
|
|||||||
}
|
}
|
||||||
return task, nil
|
return task, nil
|
||||||
}
|
}
|
||||||
|
func (m *mockWorkQueue) List(_ context.Context, _ *domain.WorkTaskStatus, _ domain.WorkListOptions) (*domain.WorkListResult, error) {
|
||||||
|
return &domain.WorkListResult{}, nil
|
||||||
|
}
|
||||||
func (m *mockWorkQueue) ListByProject(_ context.Context, _ string, _ *domain.WorkTaskStatus, _ domain.WorkListOptions) (*domain.WorkListResult, error) {
|
func (m *mockWorkQueue) ListByProject(_ context.Context, _ string, _ *domain.WorkTaskStatus, _ domain.WorkListOptions) (*domain.WorkListResult, error) {
|
||||||
return &domain.WorkListResult{}, nil
|
return &domain.WorkListResult{}, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@ -63,6 +63,10 @@ func (m *mockMaintenanceQueue) GetTask(_ context.Context, _ string) (*domain.Wor
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *mockMaintenanceQueue) List(_ context.Context, _ *domain.WorkTaskStatus, _ domain.WorkListOptions) (*domain.WorkListResult, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (m *mockMaintenanceQueue) ListByProject(_ context.Context, _ string, _ *domain.WorkTaskStatus, _ domain.WorkListOptions) (*domain.WorkListResult, error) {
|
func (m *mockMaintenanceQueue) ListByProject(_ context.Context, _ string, _ *domain.WorkTaskStatus, _ domain.WorkListOptions) (*domain.WorkListResult, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user