From aaf66764fb591efbd4936905105d288589cb2990 Mon Sep 17 00:00:00 2001 From: jordan Date: Mon, 2 Feb 2026 19:55:37 -0700 Subject: [PATCH] feat: add worker pool infrastructure for composable projects - Add POST /workers/register and POST /workers/{workerId}/heartbeat endpoints - Start worker health checker goroutine in main.go - Fix network policy to allow K8s API server access (includes real endpoint IPs) - Add rdev.orchard9.ai/role: worker label to claudebox StatefulSet This enables the embedded WorkExecutor to reach claudebox-0 for executing builds on composable projects that don't have dedicated pods. Co-Authored-By: Claude Opus 4.5 --- cmd/rdev-api/main.go | 3 + deployments/k8s/base/claudebox.yaml | 1 + deployments/k8s/base/network-policy.yaml | 21 ++++++- internal/handlers/workers.go | 70 ++++++++++++++++++++++++ 4 files changed, 94 insertions(+), 1 deletion(-) diff --git a/cmd/rdev-api/main.go b/cmd/rdev-api/main.go index 1ea5db1..de3e0e5 100644 --- a/cmd/rdev-api/main.go +++ b/cmd/rdev-api/main.go @@ -238,6 +238,9 @@ func main() { workerService := service.NewWorkerService(workerRegistryRepo, workQueueRepo, logger). 
WithBuildAudit(buildAuditRepo) + // Start worker health checker (marks stale workers offline) + go workerService.StartHealthChecker(context.Background()) + // Create build service (orchestrates build submission and tracking) buildService := service.NewBuildService(workQueueRepo, buildAuditRepo, logger) diff --git a/deployments/k8s/base/claudebox.yaml b/deployments/k8s/base/claudebox.yaml index 9a0f46c..65a1a49 100644 --- a/deployments/k8s/base/claudebox.yaml +++ b/deployments/k8s/base/claudebox.yaml @@ -18,6 +18,7 @@ spec: app: claudebox app.kubernetes.io/name: claudebox app.kubernetes.io/part-of: rdev + rdev.orchard9.ai/role: worker spec: containers: - name: claudebox diff --git a/deployments/k8s/base/network-policy.yaml b/deployments/k8s/base/network-policy.yaml index 3d1fb04..880ed93 100644 --- a/deployments/k8s/base/network-policy.yaml +++ b/deployments/k8s/base/network-policy.yaml @@ -33,6 +33,20 @@ spec: - protocol: TCP port: 8080 egress: + # Allow egress to Kubernetes API server (required for kubectl exec into worker pods) + # Must include both the ClusterIP (10.43.0.1) and real endpoints + - to: + - ipBlock: + cidr: 10.43.0.1/32 + - ipBlock: + cidr: 208.122.204.172/32 + - ipBlock: + cidr: 208.122.204.173/32 + ports: + - protocol: TCP + port: 443 + - protocol: TCP + port: 6443 # Allow egress to PostgreSQL in databases namespace - to: - namespaceSelector: @@ -57,7 +71,12 @@ spec: ports: - protocol: TCP port: 6379 - # Allow egress to claudebox pods within the rdev namespace + # Allow egress to worker pods (claudebox) within the rdev namespace + - to: + - podSelector: + matchLabels: + rdev.orchard9.ai/role: worker + # Allow egress to project pods within the rdev namespace - to: - podSelector: matchLabels: diff --git a/internal/handlers/workers.go b/internal/handlers/workers.go index 5cc1309..1a8f414 100644 --- a/internal/handlers/workers.go +++ b/internal/handlers/workers.go @@ -28,8 +28,13 @@ func NewWorkersHandler(workerService *service.WorkerService) 
*WorkersHandler {
 // Mount registers the worker pool routes.
 func (h *WorkersHandler) Mount(r api.Router) {
 	r.Route("/workers", func(r chi.Router) {
+		// Read operations: each route is wrapped in auth.RequireScope(ScopeWorkersRead, ScopeAdmin).
 		r.With(auth.RequireScope(auth.ScopeWorkersRead, auth.ScopeAdmin)).Get("/", h.List)
 		r.With(auth.RequireScope(auth.ScopeWorkersRead, auth.ScopeAdmin)).Get("/{workerId}", h.Get)
+
+		// Write operations: each route is wrapped in auth.RequireScope(ScopeWorkersWrite, ScopeAdmin).
+		r.With(auth.RequireScope(auth.ScopeWorkersWrite, auth.ScopeAdmin)).Post("/register", h.Register)
+		r.With(auth.RequireScope(auth.ScopeWorkersWrite, auth.ScopeAdmin)).Post("/{workerId}/heartbeat", h.Heartbeat)
 		r.With(auth.RequireScope(auth.ScopeWorkersWrite, auth.ScopeAdmin)).Post("/{workerId}/drain", h.Drain)
 	})
 }
@@ -136,6 +141,71 @@ func (h *WorkersHandler) Get(w http.ResponseWriter, r *http.Request) {
 	api.WriteSuccess(w, r, toWorkerDTO(worker))
 }
 
+// RegisterWorkerRequest is the JSON request body for POST /workers/register, decoded by Register.
+type RegisterWorkerRequest struct {
+	ID           string   `json:"id"`                     // required: Register rejects the empty string
+	Hostname     string   `json:"hostname"`               // required: Register rejects the empty string
+	Version      string   `json:"version,omitempty"`      // optional: copied to the worker as-is
+	Capabilities []string `json:"capabilities,omitempty"` // optional: copied to the worker as-is
+}
+
+// Register handles worker self-registration: it decodes a RegisterWorkerRequest, requires id and hostname, and hands a domain.Worker to workerService.Register.
+// POST /workers/register
+func (h *WorkersHandler) Register(w http.ResponseWriter, r *http.Request) {
+	var req RegisterWorkerRequest
+	if err := api.DecodeJSON(r, &req); err != nil {
+		api.WriteBadRequest(w, r, "invalid request body")
+		return
+	}
+
+	if req.ID == "" { // only the empty string is rejected; no trimming or format check happens here
+		api.WriteBadRequest(w, r, "worker id is required")
+		return
+	}
+	if req.Hostname == "" { // same rule as for the id
+		api.WriteBadRequest(w, r, "hostname is required")
+		return
+	}
+
+	worker := &domain.Worker{
+		ID:           req.ID,
+		Hostname:     req.Hostname,
+		Version:      req.Version,
+		Capabilities: req.Capabilities,
+	}
+
+	if err := h.workerService.Register(r.Context(), worker); err != nil {
+		api.WriteInternalError(w, r, "failed to register worker") // NOTE(review): every service error lands here and err is dropped; confirm no conflict/validation case needs its own reply
+		return
+	}
+
+	api.WriteCreated(w, r, toWorkerDTO(worker)) // replies with the same worker value that was handed to the service
+}
+
+// Heartbeat handles worker heartbeat: it forwards the {workerId} path parameter to workerService.Heartbeat.
+// POST /workers/{workerId}/heartbeat
+func (h *WorkersHandler) Heartbeat(w http.ResponseWriter, r *http.Request) {
+	workerID := chi.URLParam(r, "workerId")
+	if workerID == "" {
+		api.WriteBadRequest(w, r, "worker ID is required")
+		return
+	}
+
+	if err := h.workerService.Heartbeat(r.Context(), workerID); err != nil {
+		if errors.Is(err, domain.ErrWorkerNotFound) { // an unknown worker gets a not-found reply instead of an internal error
+			api.WriteNotFound(w, r, "worker not found: "+workerID)
+			return
+		}
+		api.WriteInternalError(w, r, "failed to update heartbeat")
+		return
+	}
+
+	api.WriteSuccess(w, r, map[string]any{
+		"worker_id": workerID,
+		"status":    "ok",
+	})
+}
+
 // Drain sets a worker to draining status.
 // POST /workers/{workerId}/drain
 func (h *WorkersHandler) Drain(w http.ResponseWriter, r *http.Request) {