feat: add worker pool infrastructure for composable projects
- Add POST /workers/register and POST /workers/{workerId}/heartbeat endpoints
- Start worker health checker goroutine in main.go
- Fix network policy to allow K8s API server access (includes real endpoint IPs)
- Add rdev.orchard9.ai/role: worker label to claudebox StatefulSet
This enables the embedded WorkExecutor to reach claudebox-0 for executing
builds on composable projects that don't have dedicated pods.
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
fc0f6db184
commit
aaf66764fb
@ -238,6 +238,9 @@ func main() {
|
|||||||
workerService := service.NewWorkerService(workerRegistryRepo, workQueueRepo, logger).
|
workerService := service.NewWorkerService(workerRegistryRepo, workQueueRepo, logger).
|
||||||
WithBuildAudit(buildAuditRepo)
|
WithBuildAudit(buildAuditRepo)
|
||||||
|
|
||||||
|
// Start worker health checker (marks stale workers offline)
|
||||||
|
go workerService.StartHealthChecker(context.Background())
|
||||||
|
|
||||||
// Create build service (orchestrates build submission and tracking)
|
// Create build service (orchestrates build submission and tracking)
|
||||||
buildService := service.NewBuildService(workQueueRepo, buildAuditRepo, logger)
|
buildService := service.NewBuildService(workQueueRepo, buildAuditRepo, logger)
|
||||||
|
|
||||||
|
|||||||
@ -18,6 +18,7 @@ spec:
|
|||||||
app: claudebox
|
app: claudebox
|
||||||
app.kubernetes.io/name: claudebox
|
app.kubernetes.io/name: claudebox
|
||||||
app.kubernetes.io/part-of: rdev
|
app.kubernetes.io/part-of: rdev
|
||||||
|
rdev.orchard9.ai/role: worker
|
||||||
spec:
|
spec:
|
||||||
containers:
|
containers:
|
||||||
- name: claudebox
|
- name: claudebox
|
||||||
|
|||||||
@ -33,6 +33,20 @@ spec:
|
|||||||
- protocol: TCP
|
- protocol: TCP
|
||||||
port: 8080
|
port: 8080
|
||||||
egress:
|
egress:
|
||||||
|
# Allow egress to Kubernetes API server (required for kubectl exec into worker pods)
|
||||||
|
# Must include both the ClusterIP (10.43.0.1) and real endpoints
|
||||||
|
- to:
|
||||||
|
- ipBlock:
|
||||||
|
cidr: 10.43.0.1/32
|
||||||
|
- ipBlock:
|
||||||
|
cidr: 208.122.204.172/32
|
||||||
|
- ipBlock:
|
||||||
|
cidr: 208.122.204.173/32
|
||||||
|
ports:
|
||||||
|
- protocol: TCP
|
||||||
|
port: 443
|
||||||
|
- protocol: TCP
|
||||||
|
port: 6443
|
||||||
# Allow egress to PostgreSQL in databases namespace
|
# Allow egress to PostgreSQL in databases namespace
|
||||||
- to:
|
- to:
|
||||||
- namespaceSelector:
|
- namespaceSelector:
|
||||||
@ -57,7 +71,12 @@ spec:
|
|||||||
ports:
|
ports:
|
||||||
- protocol: TCP
|
- protocol: TCP
|
||||||
port: 6379
|
port: 6379
|
||||||
# Allow egress to claudebox pods within the rdev namespace
|
# Allow egress to worker pods (claudebox) within the rdev namespace
|
||||||
|
- to:
|
||||||
|
- podSelector:
|
||||||
|
matchLabels:
|
||||||
|
rdev.orchard9.ai/role: worker
|
||||||
|
# Allow egress to project pods within the rdev namespace
|
||||||
- to:
|
- to:
|
||||||
- podSelector:
|
- podSelector:
|
||||||
matchLabels:
|
matchLabels:
|
||||||
|
|||||||
@ -28,8 +28,13 @@ func NewWorkersHandler(workerService *service.WorkerService) *WorkersHandler {
|
|||||||
// Mount registers the worker pool routes.
|
// Mount registers the worker pool routes.
|
||||||
func (h *WorkersHandler) Mount(r api.Router) {
|
func (h *WorkersHandler) Mount(r api.Router) {
|
||||||
r.Route("/workers", func(r chi.Router) {
|
r.Route("/workers", func(r chi.Router) {
|
||||||
|
// Read operations
|
||||||
r.With(auth.RequireScope(auth.ScopeWorkersRead, auth.ScopeAdmin)).Get("/", h.List)
|
r.With(auth.RequireScope(auth.ScopeWorkersRead, auth.ScopeAdmin)).Get("/", h.List)
|
||||||
r.With(auth.RequireScope(auth.ScopeWorkersRead, auth.ScopeAdmin)).Get("/{workerId}", h.Get)
|
r.With(auth.RequireScope(auth.ScopeWorkersRead, auth.ScopeAdmin)).Get("/{workerId}", h.Get)
|
||||||
|
|
||||||
|
// Write operations
|
||||||
|
r.With(auth.RequireScope(auth.ScopeWorkersWrite, auth.ScopeAdmin)).Post("/register", h.Register)
|
||||||
|
r.With(auth.RequireScope(auth.ScopeWorkersWrite, auth.ScopeAdmin)).Post("/{workerId}/heartbeat", h.Heartbeat)
|
||||||
r.With(auth.RequireScope(auth.ScopeWorkersWrite, auth.ScopeAdmin)).Post("/{workerId}/drain", h.Drain)
|
r.With(auth.RequireScope(auth.ScopeWorkersWrite, auth.ScopeAdmin)).Post("/{workerId}/drain", h.Drain)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -136,6 +141,71 @@ func (h *WorkersHandler) Get(w http.ResponseWriter, r *http.Request) {
|
|||||||
api.WriteSuccess(w, r, toWorkerDTO(worker))
|
api.WriteSuccess(w, r, toWorkerDTO(worker))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RegisterWorkerRequest is the request body for POST /workers/register.
|
||||||
|
type RegisterWorkerRequest struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
Hostname string `json:"hostname"`
|
||||||
|
Version string `json:"version,omitempty"`
|
||||||
|
Capabilities []string `json:"capabilities,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register handles worker self-registration.
|
||||||
|
// POST /workers/register
|
||||||
|
func (h *WorkersHandler) Register(w http.ResponseWriter, r *http.Request) {
|
||||||
|
var req RegisterWorkerRequest
|
||||||
|
if err := api.DecodeJSON(r, &req); err != nil {
|
||||||
|
api.WriteBadRequest(w, r, "invalid request body")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.ID == "" {
|
||||||
|
api.WriteBadRequest(w, r, "worker id is required")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if req.Hostname == "" {
|
||||||
|
api.WriteBadRequest(w, r, "hostname is required")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
worker := &domain.Worker{
|
||||||
|
ID: req.ID,
|
||||||
|
Hostname: req.Hostname,
|
||||||
|
Version: req.Version,
|
||||||
|
Capabilities: req.Capabilities,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := h.workerService.Register(r.Context(), worker); err != nil {
|
||||||
|
api.WriteInternalError(w, r, "failed to register worker")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
api.WriteCreated(w, r, toWorkerDTO(worker))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Heartbeat handles worker heartbeat.
|
||||||
|
// POST /workers/{workerId}/heartbeat
|
||||||
|
func (h *WorkersHandler) Heartbeat(w http.ResponseWriter, r *http.Request) {
|
||||||
|
workerID := chi.URLParam(r, "workerId")
|
||||||
|
if workerID == "" {
|
||||||
|
api.WriteBadRequest(w, r, "worker ID is required")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := h.workerService.Heartbeat(r.Context(), workerID); err != nil {
|
||||||
|
if errors.Is(err, domain.ErrWorkerNotFound) {
|
||||||
|
api.WriteNotFound(w, r, "worker not found: "+workerID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
api.WriteInternalError(w, r, "failed to update heartbeat")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
api.WriteSuccess(w, r, map[string]any{
|
||||||
|
"worker_id": workerID,
|
||||||
|
"status": "ok",
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// Drain sets a worker to draining status.
|
// Drain sets a worker to draining status.
|
||||||
// POST /workers/{workerId}/drain
|
// POST /workers/{workerId}/drain
|
||||||
func (h *WorkersHandler) Drain(w http.ResponseWriter, r *http.Request) {
|
func (h *WorkersHandler) Drain(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user