Critical fix: WorkersHandler was missing workService dependency, causing 500 errors when workers tried to fail tasks. This caused tasks to get stuck in "running" state permanently. Also adds: - /work/tasks endpoint for debugging all tasks across projects - List method to WorkQueue interface for admin views - HTTP client tests for api_client.go and claudebox/client.go (48 tests) - Split work.go DTOs into work_dto.go to stay under 500 lines Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
458 lines
10 KiB
Go
458 lines
10 KiB
Go
package handlers
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/go-chi/chi/v5"
|
|
"github.com/orchard9/rdev/internal/domain"
|
|
"github.com/orchard9/rdev/internal/service"
|
|
)
|
|
|
|
// mockWorkQueue implements port.WorkQueue for testing.
|
|
type mockWorkQueue struct {
|
|
tasks map[string]*domain.WorkTask
|
|
err error
|
|
}
|
|
|
|
func newMockWorkQueue() *mockWorkQueue {
|
|
return &mockWorkQueue{
|
|
tasks: make(map[string]*domain.WorkTask),
|
|
}
|
|
}
|
|
|
|
func (m *mockWorkQueue) Enqueue(ctx context.Context, task *domain.WorkTask) (string, error) {
|
|
if m.err != nil {
|
|
return "", m.err
|
|
}
|
|
id := "task-123"
|
|
task.ID = id
|
|
task.Status = domain.WorkTaskStatusPending
|
|
task.CreatedAt = time.Now()
|
|
m.tasks[id] = task
|
|
return id, nil
|
|
}
|
|
|
|
func (m *mockWorkQueue) Dequeue(ctx context.Context, workerID string) (*domain.WorkTask, error) {
|
|
if m.err != nil {
|
|
return nil, m.err
|
|
}
|
|
for _, task := range m.tasks {
|
|
if task.Status == domain.WorkTaskStatusPending {
|
|
task.Status = domain.WorkTaskStatusRunning
|
|
task.WorkerID = workerID
|
|
now := time.Now()
|
|
task.StartedAt = &now
|
|
return task, nil
|
|
}
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
func (m *mockWorkQueue) Complete(ctx context.Context, taskID string, result *domain.WorkResult) error {
|
|
if m.err != nil {
|
|
return m.err
|
|
}
|
|
task, ok := m.tasks[taskID]
|
|
if !ok {
|
|
return domain.ErrWorkTaskNotFound
|
|
}
|
|
task.Status = domain.WorkTaskStatusCompleted
|
|
task.Result = result
|
|
now := time.Now()
|
|
task.CompletedAt = &now
|
|
return nil
|
|
}
|
|
|
|
func (m *mockWorkQueue) Fail(ctx context.Context, taskID string, errMsg string) error {
|
|
return m.FailWithCode(ctx, taskID, errMsg, domain.WorkErrorCodeNone)
|
|
}
|
|
|
|
func (m *mockWorkQueue) FailWithCode(ctx context.Context, taskID string, errMsg string, code domain.WorkErrorCode) error {
|
|
if m.err != nil {
|
|
return m.err
|
|
}
|
|
task, ok := m.tasks[taskID]
|
|
if !ok {
|
|
return domain.ErrWorkTaskNotFound
|
|
}
|
|
if task.RetryCount < task.MaxRetries {
|
|
task.Status = domain.WorkTaskStatusPending
|
|
task.RetryCount++
|
|
task.Error = errMsg
|
|
} else {
|
|
task.Status = domain.WorkTaskStatusFailed
|
|
task.Error = errMsg
|
|
task.ErrorCode = code
|
|
now := time.Now()
|
|
task.CompletedAt = &now
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *mockWorkQueue) Cancel(ctx context.Context, taskID string) error {
|
|
if m.err != nil {
|
|
return m.err
|
|
}
|
|
task, ok := m.tasks[taskID]
|
|
if !ok {
|
|
return domain.ErrWorkTaskNotFound
|
|
}
|
|
if task.Status != domain.WorkTaskStatusPending {
|
|
return domain.ErrWorkTaskNotFound
|
|
}
|
|
task.Status = domain.WorkTaskStatusCancelled
|
|
now := time.Now()
|
|
task.CompletedAt = &now
|
|
return nil
|
|
}
|
|
|
|
func (m *mockWorkQueue) GetTask(ctx context.Context, taskID string) (*domain.WorkTask, error) {
|
|
if m.err != nil {
|
|
return nil, m.err
|
|
}
|
|
task, ok := m.tasks[taskID]
|
|
if !ok {
|
|
return nil, domain.ErrWorkTaskNotFound
|
|
}
|
|
return task, nil
|
|
}
|
|
|
|
func (m *mockWorkQueue) List(ctx context.Context, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
|
|
if m.err != nil {
|
|
return nil, m.err
|
|
}
|
|
opts.Normalize()
|
|
|
|
var tasks []*domain.WorkTask
|
|
for _, task := range m.tasks {
|
|
if status == nil || task.Status == *status {
|
|
tasks = append(tasks, task)
|
|
}
|
|
}
|
|
|
|
// Apply pagination
|
|
total := int64(len(tasks))
|
|
if opts.Offset >= len(tasks) {
|
|
tasks = nil
|
|
} else {
|
|
end := opts.Offset + opts.Limit
|
|
if end > len(tasks) {
|
|
end = len(tasks)
|
|
}
|
|
tasks = tasks[opts.Offset:end]
|
|
}
|
|
|
|
return &domain.WorkListResult{
|
|
Tasks: tasks,
|
|
Total: total,
|
|
Limit: opts.Limit,
|
|
Offset: opts.Offset,
|
|
}, nil
|
|
}
|
|
|
|
func (m *mockWorkQueue) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
|
|
if m.err != nil {
|
|
return nil, m.err
|
|
}
|
|
opts.Normalize()
|
|
|
|
var tasks []*domain.WorkTask
|
|
for _, task := range m.tasks {
|
|
if task.ProjectID == projectID {
|
|
if status == nil || task.Status == *status {
|
|
tasks = append(tasks, task)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Apply pagination
|
|
total := int64(len(tasks))
|
|
if opts.Offset >= len(tasks) {
|
|
tasks = nil
|
|
} else {
|
|
end := opts.Offset + opts.Limit
|
|
if end > len(tasks) {
|
|
end = len(tasks)
|
|
}
|
|
tasks = tasks[opts.Offset:end]
|
|
}
|
|
|
|
return &domain.WorkListResult{
|
|
Tasks: tasks,
|
|
Total: total,
|
|
Limit: opts.Limit,
|
|
Offset: opts.Offset,
|
|
}, nil
|
|
}
|
|
|
|
func (m *mockWorkQueue) GetStats(ctx context.Context) (*domain.WorkQueueStats, error) {
|
|
if m.err != nil {
|
|
return nil, m.err
|
|
}
|
|
stats := &domain.WorkQueueStats{}
|
|
for _, task := range m.tasks {
|
|
switch task.Status {
|
|
case domain.WorkTaskStatusPending:
|
|
stats.Pending++
|
|
case domain.WorkTaskStatusRunning:
|
|
stats.Running++
|
|
case domain.WorkTaskStatusCompleted:
|
|
stats.Completed++
|
|
case domain.WorkTaskStatusFailed:
|
|
stats.Failed++
|
|
case domain.WorkTaskStatusCancelled:
|
|
stats.Cancelled++
|
|
}
|
|
}
|
|
return stats, nil
|
|
}
|
|
|
|
func (m *mockWorkQueue) CleanupOld(ctx context.Context, olderThan time.Duration) (int64, error) {
|
|
return 0, m.err
|
|
}
|
|
|
|
func (m *mockWorkQueue) RequeueStale(ctx context.Context, timeout time.Duration) (int64, error) {
|
|
return 0, m.err
|
|
}
|
|
|
|
func (m *mockWorkQueue) RequeueStaleWithIDs(ctx context.Context, timeout time.Duration) ([]string, error) {
|
|
return nil, m.err
|
|
}
|
|
|
|
func TestWorkHandler_Enqueue(t *testing.T) {
|
|
mockQueue := newMockWorkQueue()
|
|
workService := service.NewWorkService(mockQueue)
|
|
handler := NewWorkHandler(workService)
|
|
|
|
router := chi.NewRouter()
|
|
router.Use(testAdminAuth)
|
|
handler.Mount(router)
|
|
|
|
tests := []struct {
|
|
name string
|
|
body EnqueueWorkRequest
|
|
wantStatus int
|
|
}{
|
|
{
|
|
name: "valid_build_task",
|
|
body: EnqueueWorkRequest{
|
|
ProjectID: "test-project",
|
|
TaskType: "build",
|
|
Spec: map[string]any{
|
|
"prompt": "Build a landing page",
|
|
"template": "nextjs-landing",
|
|
},
|
|
},
|
|
wantStatus: http.StatusCreated,
|
|
},
|
|
{
|
|
name: "valid_test_task",
|
|
body: EnqueueWorkRequest{
|
|
ProjectID: "test-project",
|
|
TaskType: "test",
|
|
Spec: map[string]any{
|
|
"test_command": "npm test",
|
|
},
|
|
},
|
|
wantStatus: http.StatusCreated,
|
|
},
|
|
{
|
|
name: "valid_deploy_task",
|
|
body: EnqueueWorkRequest{
|
|
ProjectID: "test-project",
|
|
TaskType: "deploy",
|
|
Spec: map[string]any{
|
|
"image": "registry.example.com/app:latest",
|
|
"replicas": 2,
|
|
},
|
|
},
|
|
wantStatus: http.StatusCreated,
|
|
},
|
|
{
|
|
name: "valid_custom_task",
|
|
body: EnqueueWorkRequest{
|
|
ProjectID: "test-project",
|
|
TaskType: "custom",
|
|
Spec: map[string]any{
|
|
"action": "cleanup",
|
|
},
|
|
},
|
|
wantStatus: http.StatusCreated,
|
|
},
|
|
{
|
|
name: "missing_project_id",
|
|
body: EnqueueWorkRequest{
|
|
TaskType: "build",
|
|
Spec: map[string]any{},
|
|
},
|
|
wantStatus: http.StatusBadRequest,
|
|
},
|
|
{
|
|
name: "missing_task_type",
|
|
body: EnqueueWorkRequest{
|
|
ProjectID: "test-project",
|
|
Spec: map[string]any{},
|
|
},
|
|
wantStatus: http.StatusBadRequest,
|
|
},
|
|
{
|
|
name: "invalid_task_type",
|
|
body: EnqueueWorkRequest{
|
|
ProjectID: "test-project",
|
|
TaskType: "invalid",
|
|
Spec: map[string]any{},
|
|
},
|
|
wantStatus: http.StatusBadRequest,
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
body, _ := json.Marshal(tt.body)
|
|
req := httptest.NewRequest(http.MethodPost, "/work/enqueue", bytes.NewReader(body))
|
|
req.Header.Set("Content-Type", "application/json")
|
|
rec := httptest.NewRecorder()
|
|
|
|
router.ServeHTTP(rec, req)
|
|
|
|
if rec.Code != tt.wantStatus {
|
|
t.Errorf("got status %d, want %d; body: %s", rec.Code, tt.wantStatus, rec.Body.String())
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestWorkHandler_Dequeue(t *testing.T) {
|
|
mockQueue := newMockWorkQueue()
|
|
workService := service.NewWorkService(mockQueue)
|
|
handler := NewWorkHandler(workService)
|
|
|
|
// Pre-populate a pending task
|
|
mockQueue.tasks["task-1"] = &domain.WorkTask{
|
|
ID: "task-1",
|
|
ProjectID: "test-project",
|
|
Type: domain.WorkTaskTypeBuild,
|
|
Status: domain.WorkTaskStatusPending,
|
|
CreatedAt: time.Now(),
|
|
}
|
|
|
|
router := chi.NewRouter()
|
|
router.Use(testAdminAuth)
|
|
handler.Mount(router)
|
|
|
|
tests := []struct {
|
|
name string
|
|
body DequeueWorkRequest
|
|
wantStatus int
|
|
wantTask bool
|
|
}{
|
|
{
|
|
name: "valid_dequeue",
|
|
body: DequeueWorkRequest{WorkerID: "worker-1"},
|
|
wantStatus: http.StatusOK,
|
|
wantTask: true,
|
|
},
|
|
{
|
|
name: "missing_worker_id",
|
|
body: DequeueWorkRequest{},
|
|
wantStatus: http.StatusBadRequest,
|
|
wantTask: false,
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
body, _ := json.Marshal(tt.body)
|
|
req := httptest.NewRequest(http.MethodPost, "/work/dequeue", bytes.NewReader(body))
|
|
req.Header.Set("Content-Type", "application/json")
|
|
rec := httptest.NewRecorder()
|
|
|
|
router.ServeHTTP(rec, req)
|
|
|
|
if rec.Code != tt.wantStatus {
|
|
t.Errorf("got status %d, want %d; body: %s", rec.Code, tt.wantStatus, rec.Body.String())
|
|
}
|
|
|
|
if tt.wantTask && rec.Code == http.StatusOK {
|
|
var resp map[string]any
|
|
if err := json.Unmarshal(rec.Body.Bytes(), &resp); err != nil {
|
|
t.Fatalf("failed to unmarshal response: %v", err)
|
|
}
|
|
data, ok := resp["data"].(map[string]any)
|
|
if !ok {
|
|
t.Fatal("response missing data field")
|
|
}
|
|
if data["task"] == nil {
|
|
t.Error("expected task in response")
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestWorkHandler_Complete(t *testing.T) {
|
|
mockQueue := newMockWorkQueue()
|
|
workService := service.NewWorkService(mockQueue)
|
|
handler := NewWorkHandler(workService)
|
|
|
|
// Pre-populate a running task
|
|
mockQueue.tasks["task-1"] = &domain.WorkTask{
|
|
ID: "task-1",
|
|
ProjectID: "test-project",
|
|
Type: domain.WorkTaskTypeBuild,
|
|
Status: domain.WorkTaskStatusRunning,
|
|
WorkerID: "worker-1",
|
|
CreatedAt: time.Now(),
|
|
}
|
|
|
|
router := chi.NewRouter()
|
|
router.Use(testAdminAuth)
|
|
handler.Mount(router)
|
|
|
|
tests := []struct {
|
|
name string
|
|
taskID string
|
|
body CompleteWorkRequest
|
|
wantStatus int
|
|
}{
|
|
{
|
|
name: "valid_complete",
|
|
taskID: "task-1",
|
|
body: CompleteWorkRequest{
|
|
Output: "Build successful",
|
|
Artifacts: map[string]string{
|
|
"commit_sha": "abc123",
|
|
},
|
|
},
|
|
wantStatus: http.StatusOK,
|
|
},
|
|
{
|
|
name: "task_not_found",
|
|
taskID: "nonexistent",
|
|
body: CompleteWorkRequest{Output: "Done"},
|
|
wantStatus: http.StatusNotFound,
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
body, _ := json.Marshal(tt.body)
|
|
req := httptest.NewRequest(http.MethodPost, "/work/"+tt.taskID+"/complete", bytes.NewReader(body))
|
|
req.Header.Set("Content-Type", "application/json")
|
|
rec := httptest.NewRecorder()
|
|
|
|
router.ServeHTTP(rec, req)
|
|
|
|
if rec.Code != tt.wantStatus {
|
|
t.Errorf("got status %d, want %d; body: %s", rec.Code, tt.wantStatus, rec.Body.String())
|
|
}
|
|
})
|
|
}
|
|
}
|