// Package handlers provides HTTP handlers for the rdev API. package handlers import ( "errors" "net/http" "github.com/go-chi/chi/v5" "github.com/orchard9/rdev/internal/auth" "github.com/orchard9/rdev/internal/domain" "github.com/orchard9/rdev/internal/port" "github.com/orchard9/rdev/internal/service" "github.com/orchard9/rdev/pkg/api" ) // WorkersHandler handles worker pool management endpoints. type WorkersHandler struct { workerService *service.WorkerService } // NewWorkersHandler creates a new workers handler. func NewWorkersHandler(workerService *service.WorkerService) *WorkersHandler { return &WorkersHandler{ workerService: workerService, } } // Mount registers the worker pool routes. func (h *WorkersHandler) Mount(r api.Router) { r.Route("/workers", func(r chi.Router) { // Read operations r.With(auth.RequireScope(auth.ScopeWorkersRead, auth.ScopeAdmin)).Get("/", h.List) r.With(auth.RequireScope(auth.ScopeWorkersRead, auth.ScopeAdmin)).Get("/{workerId}", h.Get) // Write operations r.With(auth.RequireScope(auth.ScopeWorkersWrite, auth.ScopeAdmin)).Post("/register", h.Register) r.With(auth.RequireScope(auth.ScopeWorkersWrite, auth.ScopeAdmin)).Post("/{workerId}/heartbeat", h.Heartbeat) r.With(auth.RequireScope(auth.ScopeWorkersWrite, auth.ScopeAdmin)).Post("/{workerId}/drain", h.Drain) }) } // WorkerDTO is the data transfer object for workers. type WorkerDTO struct { ID string `json:"id"` Hostname string `json:"hostname"` Status string `json:"status"` CurrentTask string `json:"current_task,omitempty"` Capabilities []string `json:"capabilities,omitempty"` RegisteredAt string `json:"registered_at"` LastHeartbeat string `json:"last_heartbeat"` Version string `json:"version,omitempty"` } func toWorkerDTO(w *domain.Worker) *WorkerDTO { if w == nil { return nil } return &WorkerDTO{ ID: w.ID, Hostname: w.Hostname, Status: string(w.Status), CurrentTask: w.CurrentTask, Capabilities: w.Capabilities, RegisteredAt: w.RegisteredAt.Format("2006-01-02T15:04:05Z07:00"), LastHeartbeat: w.LastHeartbeat.Format("2006-01-02T15:04:05Z07:00"), Version: w.Version, } } // List returns all workers with optional status filter. // GET /workers?status=idle func (h *WorkersHandler) List(w http.ResponseWriter, r *http.Request) { filter := port.WorkerFilter{} if s := r.URL.Query().Get("status"); s != "" { st := domain.WorkerStatus(s) if !st.IsValid() { api.WriteBadRequest(w, r, "invalid status: must be idle, busy, draining, or offline") return } filter.Status = &st } workers, err := h.workerService.ListWorkers(r.Context(), filter) if err != nil { api.WriteInternalError(w, r, "failed to list workers") return } dtos := make([]*WorkerDTO, len(workers)) for i, wkr := range workers { dtos[i] = toWorkerDTO(wkr) } // Compute summary counts idle, busy, draining, offline := 0, 0, 0, 0 for _, wkr := range workers { switch wkr.Status { case domain.WorkerStatusIdle: idle++ case domain.WorkerStatusBusy: busy++ case domain.WorkerStatusDraining: draining++ case domain.WorkerStatusOffline: offline++ } } api.WriteSuccess(w, r, map[string]any{ "workers": dtos, "total": len(dtos), "summary": map[string]int{ "idle": idle, "busy": busy, "draining": draining, "offline": offline, }, }) } // Get returns a specific worker by ID. // GET /workers/{workerId} func (h *WorkersHandler) Get(w http.ResponseWriter, r *http.Request) { workerID := chi.URLParam(r, "workerId") if workerID == "" { api.WriteBadRequest(w, r, "worker ID is required") return } worker, err := h.workerService.GetWorker(r.Context(), workerID) if err != nil { if errors.Is(err, domain.ErrWorkerNotFound) { api.WriteNotFound(w, r, "worker not found: "+workerID) return } api.WriteInternalError(w, r, "failed to get worker") return } api.WriteSuccess(w, r, toWorkerDTO(worker)) } // RegisterWorkerRequest is the request body for POST /workers/register. type RegisterWorkerRequest struct { ID string `json:"id"` Hostname string `json:"hostname"` Version string `json:"version,omitempty"` Capabilities []string `json:"capabilities,omitempty"` } // Register handles worker self-registration. // POST /workers/register func (h *WorkersHandler) Register(w http.ResponseWriter, r *http.Request) { var req RegisterWorkerRequest if err := api.DecodeJSON(r, &req); err != nil { api.WriteBadRequest(w, r, "invalid request body") return } if req.ID == "" { api.WriteBadRequest(w, r, "worker id is required") return } if req.Hostname == "" { api.WriteBadRequest(w, r, "hostname is required") return } worker := &domain.Worker{ ID: req.ID, Hostname: req.Hostname, Version: req.Version, Capabilities: req.Capabilities, } if err := h.workerService.Register(r.Context(), worker); err != nil { api.WriteInternalError(w, r, "failed to register worker") return } api.WriteCreated(w, r, toWorkerDTO(worker)) } // Heartbeat handles worker heartbeat. // POST /workers/{workerId}/heartbeat func (h *WorkersHandler) Heartbeat(w http.ResponseWriter, r *http.Request) { workerID := chi.URLParam(r, "workerId") if workerID == "" { api.WriteBadRequest(w, r, "worker ID is required") return } if err := h.workerService.Heartbeat(r.Context(), workerID); err != nil { if errors.Is(err, domain.ErrWorkerNotFound) { api.WriteNotFound(w, r, "worker not found: "+workerID) return } api.WriteInternalError(w, r, "failed to update heartbeat") return } api.WriteSuccess(w, r, map[string]any{ "worker_id": workerID, "status": "ok", }) } // Drain sets a worker to draining status. // POST /workers/{workerId}/drain func (h *WorkersHandler) Drain(w http.ResponseWriter, r *http.Request) { workerID := chi.URLParam(r, "workerId") if workerID == "" { api.WriteBadRequest(w, r, "worker ID is required") return } if err := h.workerService.DrainWorker(r.Context(), workerID); err != nil { if errors.Is(err, domain.ErrWorkerNotFound) { api.WriteNotFound(w, r, "worker not found: "+workerID) return } api.WriteInternalError(w, r, "failed to drain worker") return } api.WriteSuccess(w, r, map[string]any{ "worker_id": workerID, "status": "draining", "message": "worker will finish current task then stop accepting new work", }) }