Critical fix: WorkersHandler was missing workService dependency, causing 500 errors when workers tried to fail tasks. This caused tasks to get stuck in "running" state permanently. Also adds: - /work/tasks endpoint for debugging all tasks across projects - List method to WorkQueue interface for admin views - HTTP client tests for api_client.go and claudebox/client.go (48 tests) - Split work.go DTOs into work_dto.go to stay under 500 lines Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
585 lines
16 KiB
Go
585 lines
16 KiB
Go
package worker
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"io"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/orchard9/rdev/internal/domain"
|
|
)
|
|
|
|
func TestNewAPIClient_DefaultTimeout(t *testing.T) {
|
|
client := NewAPIClient(APIClientConfig{
|
|
BaseURL: "http://localhost:8080",
|
|
APIKey: "test-key",
|
|
})
|
|
|
|
if client.httpClient.Timeout != 30*time.Second {
|
|
t.Errorf("expected default timeout 30s, got %v", client.httpClient.Timeout)
|
|
}
|
|
}
|
|
|
|
func TestNewAPIClient_CustomTimeout(t *testing.T) {
|
|
client := NewAPIClient(APIClientConfig{
|
|
BaseURL: "http://localhost:8080",
|
|
APIKey: "test-key",
|
|
Timeout: 60 * time.Second,
|
|
})
|
|
|
|
if client.httpClient.Timeout != 60*time.Second {
|
|
t.Errorf("expected timeout 60s, got %v", client.httpClient.Timeout)
|
|
}
|
|
}
|
|
|
|
func TestRegister_Success(t *testing.T) {
|
|
var receivedReq RegisterRequest
|
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodPost {
|
|
t.Errorf("expected POST, got %s", r.Method)
|
|
}
|
|
if r.URL.Path != "/workers/register" {
|
|
t.Errorf("expected /workers/register, got %s", r.URL.Path)
|
|
}
|
|
if r.Header.Get("Content-Type") != "application/json" {
|
|
t.Errorf("expected Content-Type application/json, got %s", r.Header.Get("Content-Type"))
|
|
}
|
|
if r.Header.Get("X-API-Key") != "test-key" {
|
|
t.Errorf("expected X-API-Key test-key, got %s", r.Header.Get("X-API-Key"))
|
|
}
|
|
|
|
body, _ := io.ReadAll(r.Body)
|
|
_ = json.Unmarshal(body, &receivedReq)
|
|
|
|
w.WriteHeader(http.StatusCreated)
|
|
}))
|
|
defer server.Close()
|
|
|
|
client := NewAPIClient(APIClientConfig{
|
|
BaseURL: server.URL,
|
|
APIKey: "test-key",
|
|
})
|
|
|
|
err := client.Register(context.Background(), &RegisterRequest{
|
|
ID: "worker-1",
|
|
Hostname: "localhost",
|
|
Version: "v1.0.0",
|
|
Capabilities: []string{"build", "test"},
|
|
})
|
|
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
|
|
if receivedReq.ID != "worker-1" {
|
|
t.Errorf("expected ID worker-1, got %s", receivedReq.ID)
|
|
}
|
|
if receivedReq.Hostname != "localhost" {
|
|
t.Errorf("expected hostname localhost, got %s", receivedReq.Hostname)
|
|
}
|
|
}
|
|
|
|
func TestRegister_ErrorStatus(t *testing.T) {
|
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
_, _ = w.Write([]byte(`{"error":"invalid worker ID"}`))
|
|
}))
|
|
defer server.Close()
|
|
|
|
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
|
|
|
|
err := client.Register(context.Background(), &RegisterRequest{ID: "bad"})
|
|
|
|
if err == nil {
|
|
t.Fatal("expected error, got nil")
|
|
}
|
|
if want := "register returned status 400"; !containsSubstring(err.Error(), want) {
|
|
t.Errorf("expected error containing %q, got %v", want, err)
|
|
}
|
|
if !containsSubstring(err.Error(), "invalid worker ID") {
|
|
t.Errorf("expected error to contain response body, got %v", err)
|
|
}
|
|
}
|
|
|
|
func TestRegister_NetworkError(t *testing.T) {
|
|
client := NewAPIClient(APIClientConfig{
|
|
BaseURL: "http://localhost:1", // invalid port
|
|
Timeout: 100 * time.Millisecond,
|
|
})
|
|
|
|
err := client.Register(context.Background(), &RegisterRequest{ID: "test"})
|
|
|
|
if err == nil {
|
|
t.Fatal("expected error, got nil")
|
|
}
|
|
if !containsSubstring(err.Error(), "register:") {
|
|
t.Errorf("expected error wrapped with 'register:', got %v", err)
|
|
}
|
|
}
|
|
|
|
func TestRegister_ContextCanceled(t *testing.T) {
|
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
time.Sleep(1 * time.Second)
|
|
w.WriteHeader(http.StatusOK)
|
|
}))
|
|
defer server.Close()
|
|
|
|
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
cancel() // cancel immediately
|
|
|
|
err := client.Register(ctx, &RegisterRequest{ID: "test"})
|
|
|
|
if err == nil {
|
|
t.Fatal("expected error, got nil")
|
|
}
|
|
}
|
|
|
|
func TestHeartbeat_Success(t *testing.T) {
|
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodPost {
|
|
t.Errorf("expected POST, got %s", r.Method)
|
|
}
|
|
if r.URL.Path != "/workers/worker-123/heartbeat" {
|
|
t.Errorf("expected /workers/worker-123/heartbeat, got %s", r.URL.Path)
|
|
}
|
|
w.WriteHeader(http.StatusOK)
|
|
}))
|
|
defer server.Close()
|
|
|
|
client := NewAPIClient(APIClientConfig{BaseURL: server.URL, APIKey: "key"})
|
|
|
|
err := client.Heartbeat(context.Background(), "worker-123")
|
|
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestHeartbeat_WorkerNotFound(t *testing.T) {
|
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(http.StatusNotFound)
|
|
_, _ = w.Write([]byte(`{"error":"worker not found"}`))
|
|
}))
|
|
defer server.Close()
|
|
|
|
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
|
|
|
|
err := client.Heartbeat(context.Background(), "unknown-worker")
|
|
|
|
if err == nil {
|
|
t.Fatal("expected error, got nil")
|
|
}
|
|
if !containsSubstring(err.Error(), "heartbeat returned status 404") {
|
|
t.Errorf("expected 404 error, got %v", err)
|
|
}
|
|
}
|
|
|
|
func TestClaimTask_Success(t *testing.T) {
|
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodPost {
|
|
t.Errorf("expected POST, got %s", r.Method)
|
|
}
|
|
if r.URL.Path != "/workers/worker-1/claim" {
|
|
t.Errorf("expected /workers/worker-1/claim, got %s", r.URL.Path)
|
|
}
|
|
|
|
resp := ClaimTaskResponse{
|
|
Success: true,
|
|
Data: struct {
|
|
Task *WorkTaskData `json:"task"`
|
|
WorkerID string `json:"worker_id"`
|
|
}{
|
|
WorkerID: "worker-1",
|
|
Task: &WorkTaskData{
|
|
ID: "task-123",
|
|
ProjectID: "proj-1",
|
|
Type: "build",
|
|
Status: "running",
|
|
Priority: 5,
|
|
CreatedAt: "2024-01-15T10:30:00Z",
|
|
StartedAt: "2024-01-15T10:31:00Z",
|
|
RetryCount: 1,
|
|
MaxRetries: 3,
|
|
Spec: map[string]any{"prompt": "build it"},
|
|
},
|
|
},
|
|
}
|
|
w.Header().Set("Content-Type", "application/json")
|
|
_ = json.NewEncoder(w).Encode(resp)
|
|
}))
|
|
defer server.Close()
|
|
|
|
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
|
|
|
|
task, err := client.ClaimTask(context.Background(), "worker-1")
|
|
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
if task == nil {
|
|
t.Fatal("expected task, got nil")
|
|
}
|
|
if task.ID != "task-123" {
|
|
t.Errorf("expected task ID task-123, got %s", task.ID)
|
|
}
|
|
if task.ProjectID != "proj-1" {
|
|
t.Errorf("expected project ID proj-1, got %s", task.ProjectID)
|
|
}
|
|
if task.Type != domain.WorkTaskType("build") {
|
|
t.Errorf("expected type build, got %s", task.Type)
|
|
}
|
|
if task.Priority != 5 {
|
|
t.Errorf("expected priority 5, got %d", task.Priority)
|
|
}
|
|
if task.RetryCount != 1 {
|
|
t.Errorf("expected retry count 1, got %d", task.RetryCount)
|
|
}
|
|
}
|
|
|
|
func TestClaimTask_NoTasksAvailable(t *testing.T) {
|
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(http.StatusNoContent)
|
|
}))
|
|
defer server.Close()
|
|
|
|
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
|
|
|
|
task, err := client.ClaimTask(context.Background(), "worker-1")
|
|
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
if task != nil {
|
|
t.Errorf("expected nil task, got %+v", task)
|
|
}
|
|
}
|
|
|
|
func TestClaimTask_NilTaskData(t *testing.T) {
|
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
resp := ClaimTaskResponse{
|
|
Success: true,
|
|
Data: struct {
|
|
Task *WorkTaskData `json:"task"`
|
|
WorkerID string `json:"worker_id"`
|
|
}{
|
|
WorkerID: "worker-1",
|
|
Task: nil,
|
|
},
|
|
}
|
|
w.Header().Set("Content-Type", "application/json")
|
|
_ = json.NewEncoder(w).Encode(resp)
|
|
}))
|
|
defer server.Close()
|
|
|
|
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
|
|
|
|
_, err := client.ClaimTask(context.Background(), "worker-1")
|
|
|
|
if err == nil {
|
|
t.Fatal("expected error, got nil")
|
|
}
|
|
if !containsSubstring(err.Error(), "no task data") {
|
|
t.Errorf("expected 'no task data' error, got %v", err)
|
|
}
|
|
}
|
|
|
|
func TestClaimTask_MalformedJSON(t *testing.T) {
|
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
_, _ = w.Write([]byte(`{invalid json}`))
|
|
}))
|
|
defer server.Close()
|
|
|
|
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
|
|
|
|
_, err := client.ClaimTask(context.Background(), "worker-1")
|
|
|
|
if err == nil {
|
|
t.Fatal("expected error, got nil")
|
|
}
|
|
if !containsSubstring(err.Error(), "decode response") {
|
|
t.Errorf("expected decode error, got %v", err)
|
|
}
|
|
}
|
|
|
|
func TestClaimTask_ErrorStatus(t *testing.T) {
|
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
_, _ = w.Write([]byte(`{"error":"database error"}`))
|
|
}))
|
|
defer server.Close()
|
|
|
|
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
|
|
|
|
_, err := client.ClaimTask(context.Background(), "worker-1")
|
|
|
|
if err == nil {
|
|
t.Fatal("expected error, got nil")
|
|
}
|
|
if !containsSubstring(err.Error(), "claim task returned status 500") {
|
|
t.Errorf("expected 500 error, got %v", err)
|
|
}
|
|
}
|
|
|
|
func TestCompleteTask_Success(t *testing.T) {
|
|
var receivedReq CompleteTaskRequest
|
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodPost {
|
|
t.Errorf("expected POST, got %s", r.Method)
|
|
}
|
|
if r.URL.Path != "/workers/worker-1/complete/task-123" {
|
|
t.Errorf("expected /workers/worker-1/complete/task-123, got %s", r.URL.Path)
|
|
}
|
|
|
|
body, _ := io.ReadAll(r.Body)
|
|
_ = json.Unmarshal(body, &receivedReq)
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
}))
|
|
defer server.Close()
|
|
|
|
client := NewAPIClient(APIClientConfig{BaseURL: server.URL, APIKey: "key"})
|
|
|
|
result := &domain.BuildResult{
|
|
Success: true,
|
|
Output: "Build successful",
|
|
CommitSHA: "abc123",
|
|
FilesChanged: []string{"main.go", "go.mod"},
|
|
DurationMs: 5000,
|
|
Artifacts: map[string]string{"deploy_url": "https://example.com"},
|
|
}
|
|
|
|
err := client.CompleteTask(context.Background(), "worker-1", "task-123", result)
|
|
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
if !receivedReq.Success {
|
|
t.Error("expected success=true")
|
|
}
|
|
if receivedReq.Output != "Build successful" {
|
|
t.Errorf("expected output 'Build successful', got %s", receivedReq.Output)
|
|
}
|
|
if receivedReq.CommitSHA != "abc123" {
|
|
t.Errorf("expected commit SHA abc123, got %s", receivedReq.CommitSHA)
|
|
}
|
|
if receivedReq.DurationMs != 5000 {
|
|
t.Errorf("expected duration 5000, got %d", receivedReq.DurationMs)
|
|
}
|
|
}
|
|
|
|
func TestCompleteTask_TaskNotFound(t *testing.T) {
|
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(http.StatusNotFound)
|
|
_, _ = w.Write([]byte(`{"error":"task not found"}`))
|
|
}))
|
|
defer server.Close()
|
|
|
|
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
|
|
|
|
err := client.CompleteTask(context.Background(), "worker-1", "unknown", &domain.BuildResult{})
|
|
|
|
if err == nil {
|
|
t.Fatal("expected error, got nil")
|
|
}
|
|
if !containsSubstring(err.Error(), "complete task returned status 404") {
|
|
t.Errorf("expected 404 error, got %v", err)
|
|
}
|
|
}
|
|
|
|
func TestFailTask_Success(t *testing.T) {
|
|
var receivedReq FailTaskRequest
|
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodPost {
|
|
t.Errorf("expected POST, got %s", r.Method)
|
|
}
|
|
if r.URL.Path != "/workers/worker-1/fail/task-123" {
|
|
t.Errorf("expected /workers/worker-1/fail/task-123, got %s", r.URL.Path)
|
|
}
|
|
|
|
body, _ := io.ReadAll(r.Body)
|
|
_ = json.Unmarshal(body, &receivedReq)
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
}))
|
|
defer server.Close()
|
|
|
|
client := NewAPIClient(APIClientConfig{BaseURL: server.URL, APIKey: "key"})
|
|
|
|
err := client.FailTask(context.Background(), "worker-1", "task-123", "build failed", "error output", 3000)
|
|
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
if receivedReq.Error != "build failed" {
|
|
t.Errorf("expected error 'build failed', got %s", receivedReq.Error)
|
|
}
|
|
if receivedReq.Output != "error output" {
|
|
t.Errorf("expected output 'error output', got %s", receivedReq.Output)
|
|
}
|
|
if receivedReq.DurationMs != 3000 {
|
|
t.Errorf("expected duration 3000, got %d", receivedReq.DurationMs)
|
|
}
|
|
}
|
|
|
|
func TestFailTask_ErrorStatus(t *testing.T) {
|
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
_, _ = w.Write([]byte(`{"error":"database unavailable"}`))
|
|
}))
|
|
defer server.Close()
|
|
|
|
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
|
|
|
|
err := client.FailTask(context.Background(), "worker-1", "task-123", "failed", "", 0)
|
|
|
|
if err == nil {
|
|
t.Fatal("expected error, got nil")
|
|
}
|
|
if !containsSubstring(err.Error(), "fail task returned status 500") {
|
|
t.Errorf("expected 500 error, got %v", err)
|
|
}
|
|
}
|
|
|
|
func TestToWorkTask_ParsesTimestamps(t *testing.T) {
|
|
data := &WorkTaskData{
|
|
ID: "task-1",
|
|
ProjectID: "proj-1",
|
|
Type: "build",
|
|
Status: "running",
|
|
CreatedAt: "2024-01-15T10:30:00Z",
|
|
StartedAt: "2024-01-15T10:31:00Z",
|
|
}
|
|
|
|
task := data.ToWorkTask()
|
|
|
|
if task.CreatedAt.IsZero() {
|
|
t.Error("expected CreatedAt to be set")
|
|
}
|
|
if task.CreatedAt.Year() != 2024 || task.CreatedAt.Month() != 1 || task.CreatedAt.Day() != 15 {
|
|
t.Errorf("unexpected CreatedAt: %v", task.CreatedAt)
|
|
}
|
|
|
|
if task.StartedAt == nil {
|
|
t.Fatal("expected StartedAt to be set")
|
|
}
|
|
if task.StartedAt.Hour() != 10 || task.StartedAt.Minute() != 31 {
|
|
t.Errorf("unexpected StartedAt: %v", task.StartedAt)
|
|
}
|
|
}
|
|
|
|
func TestToWorkTask_InvalidTimestamp(t *testing.T) {
|
|
data := &WorkTaskData{
|
|
ID: "task-1",
|
|
ProjectID: "proj-1",
|
|
Type: "build",
|
|
Status: "running",
|
|
CreatedAt: "not-a-timestamp",
|
|
StartedAt: "also-not-valid",
|
|
}
|
|
|
|
task := data.ToWorkTask()
|
|
|
|
// Invalid timestamps should result in zero values (current behavior)
|
|
if !task.CreatedAt.IsZero() {
|
|
t.Errorf("expected zero CreatedAt for invalid timestamp, got %v", task.CreatedAt)
|
|
}
|
|
// Note: StartedAt is parsed but ignored on error, pointer becomes non-nil with zero value
|
|
}
|
|
|
|
func TestToWorkTask_EmptyTimestamps(t *testing.T) {
|
|
data := &WorkTaskData{
|
|
ID: "task-1",
|
|
ProjectID: "proj-1",
|
|
Type: "build",
|
|
Status: "pending",
|
|
CreatedAt: "",
|
|
StartedAt: "",
|
|
}
|
|
|
|
task := data.ToWorkTask()
|
|
|
|
if !task.CreatedAt.IsZero() {
|
|
t.Errorf("expected zero CreatedAt for empty string, got %v", task.CreatedAt)
|
|
}
|
|
if task.StartedAt != nil {
|
|
t.Errorf("expected nil StartedAt for empty string, got %v", task.StartedAt)
|
|
}
|
|
}
|
|
|
|
func TestToWorkTask_NilData(t *testing.T) {
|
|
var data *WorkTaskData
|
|
task := data.ToWorkTask()
|
|
|
|
if task != nil {
|
|
t.Errorf("expected nil task from nil data, got %+v", task)
|
|
}
|
|
}
|
|
|
|
func TestToWorkTask_PreservesAllFields(t *testing.T) {
|
|
data := &WorkTaskData{
|
|
ID: "task-1",
|
|
ProjectID: "proj-1",
|
|
Type: "deploy",
|
|
Status: "completed",
|
|
Priority: 10,
|
|
WorkerID: "worker-99",
|
|
CallbackURL: "https://callback.example.com",
|
|
RetryCount: 2,
|
|
MaxRetries: 5,
|
|
Spec: map[string]any{"key": "value"},
|
|
}
|
|
|
|
task := data.ToWorkTask()
|
|
|
|
if task.ID != "task-1" {
|
|
t.Errorf("expected ID task-1, got %s", task.ID)
|
|
}
|
|
if task.ProjectID != "proj-1" {
|
|
t.Errorf("expected ProjectID proj-1, got %s", task.ProjectID)
|
|
}
|
|
if task.Type != domain.WorkTaskType("deploy") {
|
|
t.Errorf("expected Type deploy, got %s", task.Type)
|
|
}
|
|
if task.Status != domain.WorkTaskStatus("completed") {
|
|
t.Errorf("expected Status completed, got %s", task.Status)
|
|
}
|
|
if task.Priority != 10 {
|
|
t.Errorf("expected Priority 10, got %d", task.Priority)
|
|
}
|
|
if task.WorkerID != "worker-99" {
|
|
t.Errorf("expected WorkerID worker-99, got %s", task.WorkerID)
|
|
}
|
|
if task.CallbackURL != "https://callback.example.com" {
|
|
t.Errorf("expected CallbackURL, got %s", task.CallbackURL)
|
|
}
|
|
if task.RetryCount != 2 {
|
|
t.Errorf("expected RetryCount 2, got %d", task.RetryCount)
|
|
}
|
|
if task.MaxRetries != 5 {
|
|
t.Errorf("expected MaxRetries 5, got %d", task.MaxRetries)
|
|
}
|
|
if task.Spec["key"] != "value" {
|
|
t.Errorf("expected Spec[key]=value, got %v", task.Spec)
|
|
}
|
|
}
|
|
|
|
// containsSubstring is a helper to check if a string contains a substring.
|
|
func containsSubstring(s, substr string) bool {
|
|
return len(s) >= len(substr) && (s == substr || len(s) > 0 && containsHelper(s, substr))
|
|
}
|
|
|
|
func containsHelper(s, substr string) bool {
|
|
for i := 0; i <= len(s)-len(substr); i++ {
|
|
if s[i:i+len(substr)] == substr {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|