rdev/internal/handlers/work.go
jordan d74efb75ff fix: wire workService to WorkersHandler and add /work/tasks endpoint
Critical fix: WorkersHandler was missing workService dependency, causing
500 errors when workers tried to fail tasks. This caused tasks to get
stuck in "running" state permanently.

Also adds:
- /work/tasks endpoint for debugging all tasks across projects
- List method to WorkQueue interface for admin views
- HTTP client tests for api_client.go and claudebox/client.go (48 tests)
- Split work.go DTOs into work_dto.go to stay under 500 lines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-06 10:35:39 -07:00

425 lines
12 KiB
Go

// Package handlers provides HTTP handlers for the rdev API.
package handlers
import (
"errors"
"fmt"
"net/http"
"strconv"
"github.com/go-chi/chi/v5"
"github.com/orchard9/rdev/internal/auth"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/service"
"github.com/orchard9/rdev/internal/validate"
"github.com/orchard9/rdev/pkg/api"
)
// WorkHandler handles work queue endpoints.
type WorkHandler struct {
workService *service.WorkService
}
// NewWorkHandler creates a new work handler.
func NewWorkHandler(workService *service.WorkService) *WorkHandler {
return &WorkHandler{
workService: workService,
}
}
// Mount registers the work queue routes.
func (h *WorkHandler) Mount(r api.Router) {
r.Route("/work", func(r chi.Router) {
// Write operations (task submission and management)
r.With(auth.RequireScope(auth.ScopeQueueWrite, auth.ScopeAdmin)).Post("/enqueue", h.Enqueue)
r.With(auth.RequireScope(auth.ScopeQueueWrite, auth.ScopeAdmin)).Post("/dequeue", h.Dequeue)
r.With(auth.RequireScope(auth.ScopeQueueWrite, auth.ScopeAdmin)).Post("/{taskId}/complete", h.Complete)
r.With(auth.RequireScope(auth.ScopeQueueWrite, auth.ScopeAdmin)).Post("/{taskId}/fail", h.Fail)
r.With(auth.RequireScope(auth.ScopeQueueWrite, auth.ScopeAdmin)).Post("/{taskId}/cancel", h.Cancel)
// Read operations
r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/tasks", h.ListTasks)
r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/stats", h.Stats)
r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/projects/{projectId}", h.ListByProject)
r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/{taskId}", h.GetTask)
r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/{taskId}/status", h.GetStatus)
})
}
// EnqueueWorkRequest is the request body for POST /work/enqueue.
type EnqueueWorkRequest struct {
ProjectID string `json:"project_id"`
TaskType string `json:"task_type"`
Spec map[string]any `json:"task_spec"`
Priority int `json:"priority,omitempty"`
CallbackURL string `json:"callback_url,omitempty"`
MaxRetries int `json:"max_retries,omitempty"`
}
// EnqueueWorkResponse is the response for POST /work/enqueue.
type EnqueueWorkResponse struct {
TaskID string `json:"task_id"`
StatusURL string `json:"status_url"`
}
// Enqueue adds a task to the work queue.
// POST /work/enqueue
func (h *WorkHandler) Enqueue(w http.ResponseWriter, r *http.Request) {
var req EnqueueWorkRequest
if err := api.DecodeJSON(r, &req); err != nil {
api.WriteBadRequest(w, r, "invalid request body")
return
}
// Validate required fields
v := validate.New()
v.Required(req.ProjectID, "project_id")
v.Required(req.TaskType, "task_type")
if err := v.Error(); err != nil {
api.WriteBadRequest(w, r, err.Error())
return
}
// Validate task type
taskType := domain.WorkTaskType(req.TaskType)
if !taskType.IsValid() {
api.WriteBadRequest(w, r, "task_type must be one of: build, test, deploy, custom, verify")
return
}
// Validate callback URL if provided
if err := validate.HTTPURL(req.CallbackURL, "callback_url"); err != nil {
api.WriteBadRequest(w, r, "callback_url must be a valid HTTP/HTTPS URL")
return
}
result, err := h.workService.EnqueueTask(r.Context(), service.EnqueueTaskRequest{
ProjectID: req.ProjectID,
Type: taskType,
Spec: req.Spec,
Priority: req.Priority,
CallbackURL: req.CallbackURL,
MaxRetries: req.MaxRetries,
})
if err != nil {
api.WriteInternalError(w, r, "failed to enqueue task")
return
}
api.WriteCreated(w, r, EnqueueWorkResponse{
TaskID: result.TaskID,
StatusURL: result.StatusURL,
})
}
// DequeueWorkRequest is the request body for POST /work/dequeue.
type DequeueWorkRequest struct {
WorkerID string `json:"worker_id"`
}
// DequeueWorkResponse is the response for POST /work/dequeue.
type DequeueWorkResponse struct {
Task *WorkTaskDTO `json:"task,omitempty"`
}
// Dequeue claims the next available task for a worker.
// POST /work/dequeue
func (h *WorkHandler) Dequeue(w http.ResponseWriter, r *http.Request) {
var req DequeueWorkRequest
if err := api.DecodeJSON(r, &req); err != nil {
api.WriteBadRequest(w, r, "invalid request body")
return
}
if req.WorkerID == "" {
api.WriteBadRequest(w, r, "worker_id is required")
return
}
task, err := h.workService.DequeueTask(r.Context(), req.WorkerID)
if err != nil {
api.WriteInternalError(w, r, "failed to dequeue task")
return
}
api.WriteSuccess(w, r, DequeueWorkResponse{
Task: toWorkTaskDTO(task),
})
}
// GetTask retrieves a task by ID.
// GET /work/{taskId}
func (h *WorkHandler) GetTask(w http.ResponseWriter, r *http.Request) {
taskID := chi.URLParam(r, "taskId")
task, err := h.workService.GetTask(r.Context(), taskID)
if err != nil {
if errors.Is(err, domain.ErrWorkTaskNotFound) {
api.WriteNotFound(w, r, fmt.Sprintf("task not found: %s", taskID))
return
}
api.WriteInternalError(w, r, "failed to get task")
return
}
api.WriteSuccess(w, r, toWorkTaskDTO(task))
}
// GetStatus returns the status of a task.
// GET /work/{taskId}/status
func (h *WorkHandler) GetStatus(w http.ResponseWriter, r *http.Request) {
taskID := chi.URLParam(r, "taskId")
task, err := h.workService.GetTask(r.Context(), taskID)
if err != nil {
if errors.Is(err, domain.ErrWorkTaskNotFound) {
api.WriteNotFound(w, r, fmt.Sprintf("task not found: %s", taskID))
return
}
api.WriteInternalError(w, r, "failed to get task")
return
}
api.WriteSuccess(w, r, map[string]any{
"task_id": task.ID,
"status": string(task.Status),
"error": task.Error,
})
}
// CompleteWorkRequest is the request body for POST /work/{taskId}/complete.
type CompleteWorkRequest struct {
Output string `json:"output,omitempty"`
Artifacts map[string]string `json:"artifacts,omitempty"`
}
// Complete marks a task as successfully completed.
// POST /work/{taskId}/complete
func (h *WorkHandler) Complete(w http.ResponseWriter, r *http.Request) {
taskID := chi.URLParam(r, "taskId")
var req CompleteWorkRequest
if err := api.DecodeJSON(r, &req); err != nil {
api.WriteBadRequest(w, r, "invalid request body")
return
}
result := &domain.WorkResult{
Output: req.Output,
Artifacts: req.Artifacts,
}
if err := h.workService.CompleteTask(r.Context(), taskID, result); err != nil {
if errors.Is(err, domain.ErrWorkTaskNotFound) {
api.WriteNotFound(w, r, fmt.Sprintf("task not found: %s", taskID))
return
}
api.WriteInternalError(w, r, "failed to complete task")
return
}
api.WriteSuccess(w, r, map[string]any{
"task_id": taskID,
"status": "completed",
"message": "task completed successfully",
})
}
// FailWorkRequest is the request body for POST /work/{taskId}/fail.
type FailWorkRequest struct {
Error string `json:"error"`
}
// Fail marks a task as failed.
// POST /work/{taskId}/fail
func (h *WorkHandler) Fail(w http.ResponseWriter, r *http.Request) {
taskID := chi.URLParam(r, "taskId")
var req FailWorkRequest
if err := api.DecodeJSON(r, &req); err != nil {
api.WriteBadRequest(w, r, "invalid request body")
return
}
if req.Error == "" {
api.WriteBadRequest(w, r, "error message is required")
return
}
if err := h.workService.FailTask(r.Context(), taskID, req.Error); err != nil {
if errors.Is(err, domain.ErrWorkTaskNotFound) {
api.WriteNotFound(w, r, fmt.Sprintf("task not found: %s", taskID))
return
}
api.WriteInternalError(w, r, "failed to fail task")
return
}
api.WriteSuccess(w, r, map[string]any{
"task_id": taskID,
"status": "failed",
"message": "task marked as failed",
})
}
// Cancel cancels a pending task.
// POST /work/{taskId}/cancel
func (h *WorkHandler) Cancel(w http.ResponseWriter, r *http.Request) {
taskID := chi.URLParam(r, "taskId")
if err := h.workService.CancelTask(r.Context(), taskID); err != nil {
if errors.Is(err, domain.ErrWorkTaskNotFound) {
api.WriteNotFound(w, r, fmt.Sprintf("task not found: %s", taskID))
return
}
api.WriteBadRequest(w, r, err.Error())
return
}
api.WriteSuccess(w, r, map[string]any{
"task_id": taskID,
"status": "cancelled",
"message": "task cancelled successfully",
})
}
// ListTasks returns all tasks with optional status filter and pagination.
// GET /work/tasks?status=running&limit=50&offset=0
func (h *WorkHandler) ListTasks(w http.ResponseWriter, r *http.Request) {
// Parse and validate optional status filter
var status *domain.WorkTaskStatus
if s := r.URL.Query().Get("status"); s != "" {
st := domain.WorkTaskStatus(s)
if !st.IsValid() {
api.WriteBadRequest(w, r, "invalid status filter: must be pending, running, completed, failed, or cancelled")
return
}
status = &st
}
// Parse pagination options
opts := domain.DefaultWorkListOptions()
if limitStr := r.URL.Query().Get("limit"); limitStr != "" {
limit, err := strconv.Atoi(limitStr)
if err != nil {
api.WriteBadRequest(w, r, "limit must be a valid integer")
return
}
opts.Limit = limit
}
if offsetStr := r.URL.Query().Get("offset"); offsetStr != "" {
offset, err := strconv.Atoi(offsetStr)
if err != nil {
api.WriteBadRequest(w, r, "offset must be a valid integer")
return
}
opts.Offset = offset
}
result, err := h.workService.List(r.Context(), status, opts)
if err != nil {
api.WriteInternalError(w, r, "failed to list tasks")
return
}
dtos := make([]*WorkTaskDTO, len(result.Tasks))
for i, t := range result.Tasks {
dtos[i] = toWorkTaskDTO(t)
}
api.WriteSuccess(w, r, map[string]any{
"tasks": dtos,
"total": result.Total,
"limit": result.Limit,
"offset": result.Offset,
})
}
// ListByProject returns tasks for a project with pagination.
// GET /work/projects/{projectId}?status=pending&limit=50&offset=0
func (h *WorkHandler) ListByProject(w http.ResponseWriter, r *http.Request) {
projectID := chi.URLParam(r, "projectId")
// Parse and validate optional status filter
var status *domain.WorkTaskStatus
if s := r.URL.Query().Get("status"); s != "" {
st := domain.WorkTaskStatus(s)
if !st.IsValid() {
api.WriteBadRequest(w, r, "invalid status filter: must be pending, running, completed, failed, or cancelled")
return
}
status = &st
}
// Parse pagination options
opts := domain.DefaultWorkListOptions()
if limitStr := r.URL.Query().Get("limit"); limitStr != "" {
limit, err := strconv.Atoi(limitStr)
if err != nil {
api.WriteBadRequest(w, r, "limit must be a valid integer")
return
}
opts.Limit = limit
}
if offsetStr := r.URL.Query().Get("offset"); offsetStr != "" {
offset, err := strconv.Atoi(offsetStr)
if err != nil {
api.WriteBadRequest(w, r, "offset must be a valid integer")
return
}
opts.Offset = offset
}
result, err := h.workService.ListByProject(r.Context(), projectID, status, opts)
if err != nil {
api.WriteInternalError(w, r, "failed to list tasks")
return
}
dtos := make([]*WorkTaskDTO, len(result.Tasks))
for i, t := range result.Tasks {
dtos[i] = toWorkTaskDTO(t)
}
api.WriteSuccess(w, r, map[string]any{
"tasks": dtos,
"total": result.Total,
"limit": result.Limit,
"offset": result.Offset,
})
}
// WorkStatsResponse is the response for GET /work/stats.
type WorkStatsResponse struct {
Pending int64 `json:"pending"`
Running int64 `json:"running"`
Completed int64 `json:"completed"`
Failed int64 `json:"failed"`
Cancelled int64 `json:"cancelled"`
OldestPending string `json:"oldest_pending,omitempty"`
}
// Stats returns queue statistics.
// GET /work/stats
func (h *WorkHandler) Stats(w http.ResponseWriter, r *http.Request) {
stats, err := h.workService.GetStats(r.Context())
if err != nil {
api.WriteInternalError(w, r, "failed to get queue stats")
return
}
resp := WorkStatsResponse{
Pending: stats.Pending,
Running: stats.Running,
Completed: stats.Completed,
Failed: stats.Failed,
Cancelled: stats.Cancelled,
}
if stats.OldestPending != nil {
// Convert to human-readable duration
resp.OldestPending = formatDuration(*stats.OldestPending)
}
api.WriteSuccess(w, r, resp)
}