feat: enterprise worker pool with HTTP sidecar pattern

Implements horizontally-scalable worker pool architecture:
- claudebox-sidecar: HTTP server for Claude Code, git, and SDLC ops
- rdev-worker: standalone worker binary polling rdev-api for tasks
- HTTP client adapter for sidecar communication
- HPA with custom Prometheus metrics for autoscaling
- ServiceMonitor for metrics scraping

Code review fixes applied:
- URL-encode query parameters in GitStatus (Critical #1)
- Remove unused shellQuote function (Critical #2)
- Use stdlib strings.Split/TrimSpace (Critical #3)
- Add version injection via ldflags (Warning #4)
- Add debug logging for swallowed git/sdlc errors (Warning #5, #6)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
jordan 2026-02-05 16:21:11 -07:00
parent 3b0779fbe8
commit 3b35900a2d
22 changed files with 6774 additions and 137 deletions


@ -1,5 +1,5 @
 # rdev claudebox - Claude Code in a container
-# v0.4 - Git integration + SDLC CLI
+# v0.5 - HTTP sidecar mode (replaces kubectl exec)
 # Build stage for Go binaries
 FROM golang:1.25-alpine AS builder
@ -8,6 +8,7 @ COPY go.mod go.sum ./
 RUN go mod download
 COPY . .
 RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o sdlc ./cmd/sdlc
+RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o claudebox-sidecar ./cmd/claudebox-sidecar
 # Runtime stage
 FROM ubuntu:22.04
@ -35,8 +36,9 @ RUN curl -fsSL https://deb.nodesource.com/setup_20.x | bash - \
 # Install Claude Code CLI
 RUN npm install -g @anthropic-ai/claude-code
-# Copy sdlc binary from builder stage
+# Copy Go binaries from builder stage
 COPY --from=builder /build/sdlc /usr/local/bin/sdlc
+COPY --from=builder /build/claudebox-sidecar /usr/local/bin/claudebox-sidecar
 # Configure git for rdev-bot identity
 RUN git config --global user.name "rdev-bot" \
@ -57,5 +59,8 @ WORKDIR /workspace
 RUN echo '#!/bin/bash\nclaude --version > /dev/null 2>&1' > /healthcheck.sh \
 && chmod +x /healthcheck.sh
-# Keep container running (will exec into it)
-CMD ["tail", "-f", "/dev/null"]
+# Expose sidecar HTTP port
+EXPOSE 8080
+# Run claudebox-sidecar by default (HTTP server mode)
+CMD ["claudebox-sidecar"]

Dockerfile.worker (new file, 31 lines)

@ -0,0 +1,31 @@
# rdev-worker - Standalone worker for the rdev platform
# Runs as a standalone container with a claudebox sidecar for execution.
# Build stage
FROM golang:1.25-alpine AS builder
WORKDIR /build
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o rdev-worker ./cmd/rdev-worker
# Runtime stage - minimal Alpine image
FROM alpine:3.19
# Install ca-certificates for HTTPS
RUN apk add --no-cache ca-certificates
# Copy worker binary
COPY --from=builder /build/rdev-worker /usr/local/bin/rdev-worker
# Create non-root user
RUN adduser -D -u 1000 worker
USER worker
# Default environment
ENV RDEV_API_URL="http://rdev-api.rdev.svc.cluster.local:8080"
ENV CLAUDEBOX_URL="http://localhost:8080"
ENV WORKER_POLL_INTERVAL="5s"
# Run worker
CMD ["rdev-worker"]


@ -1,43 +1,928 @@
-# App Vision Gaps
-To realize **Orchard Studio** (the "Deploy First, Talk Later" UI), `rdev` needs to evolve from a CLI/Script-driven engine into a **Reactive Platform API**. This document maps the delta between current `rdev` capabilities and what Orchard Studio requires.
-## 1. The Interactivity Gap (State Management)
-* **Current State:** `tree-runner.sh` manages state in a local JSON file (`.checkpoints/`). The process is synchronous and blocking.
-* **Vision Requirement:** The UI needs to query "What is the status of the current build?" asynchronously.
-* **Gap:** We need to move the "Tree Runner" logic **into the `rdev-api`**.
-* *Missing:* `GET /projects/{id}/operations` (List active builds/deploys).
-* *Missing:* Database schema for `operations` (replacing local checkpoints).
-## 2. The Feedback Loop Gap (Streaming)
-* **Current State:** We see logs in the terminal where `tree-runner` is running.
-* **Vision Requirement:** The user sees "Designing Schema..." -> "Running Tests..." in the web UI.
-* **Gap:** We need a **WebSocket / SSE** pipe from the Agent/CI -> `rdev-api` -> `orchard-studio`.
-* *Missing:* `rdev-api` endpoint for agents to push progress updates (`POST /operations/{id}/log`).
-* *Missing:* Frontend subscription endpoint (`GET /operations/{id}/stream`).
-## 3. The "Draft Mode" Gap (Blueprint API)
-* **Current State:** We define features by immediately calling `/sdlc/features` and `POST /builds`. It's "fire and forget".
-* **Vision Requirement:** The user and Architect iterate on a plan *before* commitment.
-* **Gap:** We need a staging area for requirements.
-* *Missing:* `POST /projects/{id}/blueprint/draft`.
-* *Missing:* `POST /projects/{id}/blueprint/commit` (Triggers the build).
-## 4. The Template Gap (Genesis)
-* **Current State:** We have a few raw templates (`go-api`, `astro-landing`).
-* **Vision Requirement:** Rich "Seeds" (SaaS, Social, E-comm).
-* **Gap:** Our templates are too primitive.
-* *Missing:* A "Meta-Template" system that can combine `go-api` + `auth-pkg` + `postgres` + `react-admin` into a single "SaaS Starter" deployable.
-## 5. The "Architect Persona" Gap
-* **Current State:** We prompt Claude with `/implement-feature`.
-* **Vision Requirement:** An agent that *asks clarifying questions* instead of just coding.
-* **Gap:** We lack the "Consultant" system prompt.
-* *Missing:* `.claude/agents/architect.md`.
-* *Missing:* A workflow where the API returns a *Question* to the user instead of a *Result*.
-## Summary of Work
-1. **Port Tree Runner to Go:** Move orchestration logic into `rdev-api`.
-2. **Build Event Bus:** Implement SSE/Websockets for real-time logs.
-3. **Define Blueprint Resource:** Create DB tables for "Draft Features".
-4. **Create Architect Agent:** Define the persona that interviews users.
# Orchard Studio: Gap Analysis
## Current Foundation (What We Have)
| Capability | Status | Location |
|------------|--------|----------|
| SDLC Classifier | ✅ Complete | `internal/sdlc/classifier.go` |
| Feature State Machine | ✅ Complete | `internal/sdlc/` (10 phases, 31 rules) |
| Composable Templates | ✅ Complete | `internal/adapter/templates/` |
| Worker Pod Execution | ✅ Complete | `internal/worker/sdlc_executor.go` |
| Webhook Dispatcher | ✅ Complete | `internal/webhook/dispatcher.go` |
| Project Provisioning | ✅ Complete | K8s namespace, DNS, git repo |
| Database Provisioning | ✅ Complete | CockroachDB adapter |
| Tree Workflows | ✅ Proven | `cookbooks/trees/*.yaml` |
---
## Gap 0: Design Reference Capture & Processing
**Current:** No mechanism for users to provide visual inspiration. Features are described purely in text.
**Required:** Users can provide URLs or screenshots as design references, which inform the Architect's questions and the Blueprint's design system section.
### What's Missing
```
┌─────────────────────────────────────────────────────────────────────────┐
│ CURRENT FLOW │
│ │
│ User: "Build a pricing page" │
│ Architect: *asks about data model, endpoints...* │
│ (No visual context, design decisions are guesswork) │
└─────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────┐
│ REQUIRED FLOW │
│ │
│ User: "Build a pricing page like this" + [URL or screenshot] │
│ System: Captures screenshot, stores with Blueprint │
│ Architect: "I see a dark theme with 3 tiers..." → asks clarifying Qs │
│ Blueprint: Populates designSystem section with extracted tokens │
└─────────────────────────────────────────────────────────────────────────┘
```
### Two Input Types
| Input | Capture Method | Storage |
|-------|----------------|---------|
| **URL** | Playwright screenshots the page automatically | `/references/{blueprintId}/{refId}.png` |
| **Screenshot** | User uploads image (drag/drop, paste, file picker) | Same storage path |
### Implementation Required
1. **Reference Capture Service:**
- For URLs: Reuse `verify_executor.go` pattern (Playwright pod)
- For uploads: Standard file upload handling
- Store thumbnails alongside Blueprint
2. **Chat Endpoint Enhancement:**
- Accept `references[]` array in request body
- Process references before LLM call
- Include reference images in Architect prompt context
3. **Architect Prompt Updates:**
- Describe what it observes in natural language
- Ask clarifying questions about design intent
- Extract structured design tokens into Blueprint
4. **Blueprint Schema:**
- Add `references.items[]` array
- Add `sections.designSystem` section
- Track which references informed which design decisions
5. **Plan Pane Rendering:**
- Show reference thumbnails in UI
- Display extracted design tokens
- Allow user to add annotations
### Complexity: Medium
- URL capture reuses existing Playwright infrastructure
- File upload is standard pattern
- Main work is Architect prompt engineering for visual understanding
- LLM vision capabilities needed (Claude can see images natively)
---
## Gap 1: Blueprint Storage & Chat API
**Current:** Features are created via `POST /sdlc/features` with a complete spec. No iterative refinement.
**Required:** Multi-turn conversation that builds a Blueprint incrementally.
### What's Missing
```
┌─────────────────────────────────────────────────────────────────┐
│ CURRENT FLOW │
│ │
│ User writes spec → POST /sdlc/features → Feature created │
│ (one shot, no iteration) │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ REQUIRED FLOW │
│ │
│ User message → Architect responds + updates Blueprint → │
│ User message → Architect responds + updates Blueprint → │
│ ...repeat until ready... │
│ User: "build it" → Blueprint → SDLC Feature → Build │
└─────────────────────────────────────────────────────────────────┘
```
### Implementation Required
1. **Database Tables:**
- `blueprints` - stores structured Blueprint JSON
- `blueprint_messages` - conversation history with snapshots
2. **API Endpoints:**
- `POST /projects/{id}/blueprint/chat` - send message, get reply + updated blueprint
- `GET /projects/{id}/blueprints` - list blueprints
- `GET /projects/{id}/blueprints/{blueprintId}` - get specific blueprint
- `DELETE /projects/{id}/blueprints/{blueprintId}` - discard draft
3. **Service Layer:**
- `ArchitectService` - manages conversation, calls LLM, updates Blueprint
### Complexity: Medium
- Schema is defined (see app-vision.md)
- Standard CRUD + LLM integration
- Most work is in prompt engineering for Architect
---
## Gap 2: Architect Agent Persona
**Current:** We have coding agents (`/implement-feature`). They write code, not specs.
**Required:** An agent that asks questions, fills in a structured Blueprint, knows when to stop.
### What's Missing
```
┌─────────────────────────────────────────────────────────────────┐
│ CURRENT AGENTS │
│ │
│ User: "Add cat photos" │
│ Agent: *immediately writes code* │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ ARCHITECT AGENT │
│ │
│ User: "Add cat photos" │
│ Architect: "Should photos be public or friends-only?" │
│ User: "Public" │
│ Architect: "Got it. Do you want likes, comments, or neither?" │
│ ...continues until Blueprint is complete... │
└─────────────────────────────────────────────────────────────────┘
```
### Implementation Required
1. **System Prompt:**
- `.claude/agents/architect.md` - detailed persona
- Structured output format (reply + Blueprint JSON)
- Question strategy (when to ask vs assume)
2. **Structured Output Parsing:**
- LLM returns `{reply: string, blueprint: Blueprint}`
- Validate Blueprint against schema
- Handle partial updates (delta vs full replacement)
3. **Completeness Logic:**
- `isReadyToBuild(blueprint)` function
- Clear rules for when questions are resolved
- Override mechanism for user to force build
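The completeness check could look roughly like the sketch below. The `Blueprint` shape here is a trimmed-down assumption made up for illustration (the real schema lives in app-vision.md), and `OpenQuestions` and `ForceBuild` are hypothetical field names.

```go
package main

// Blueprint is a hypothetical, trimmed-down shape used only for this sketch.
// The real schema is defined in app-vision.md.
type Blueprint struct {
	Feature       string
	Summary       string
	OpenQuestions []string // questions the Architect has not resolved yet
	ForceBuild    bool     // assumed flag: user chose to build despite open questions
}

// isReadyToBuild reports whether the Blueprint can be handed to the SDLC
// pipeline. When it cannot, the second value lists the blockers.
func isReadyToBuild(bp Blueprint) (bool, []string) {
	var blockers []string
	if bp.Feature == "" {
		blockers = append(blockers, "feature name is missing")
	}
	if bp.Summary == "" {
		blockers = append(blockers, "summary is missing")
	}
	// The override skips open questions, but never the required fields above.
	if !bp.ForceBuild {
		for _, q := range bp.OpenQuestions {
			blockers = append(blockers, "unresolved question: "+q)
		}
	}
	return len(blockers) == 0, blockers
}
```

Returning the blockers alongside the boolean gives the Architect something concrete to ask about next, and gives the UI a reason to show when "Build It" is disabled.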
### Complexity: Medium-High
- Prompt engineering is iterative
- Structured output from LLMs can be fragile
- Need fallback handling for malformed responses
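One way to handle fragile structured output is to parse defensively and let the caller retry on failure. The sketch below assumes the `{reply, blueprint}` envelope described above; `parseArchitectResponse` and the JSON field names are hypothetical, and schema validation of the Blueprint is left as a separate step.

```go
package main

import (
	"encoding/json"
	"errors"
	"strings"
)

// ArchitectResponse mirrors the {reply, blueprint} envelope. Blueprint stays
// raw so that schema validation can run separately.
type ArchitectResponse struct {
	Reply     string          `json:"reply"`
	Blueprint json.RawMessage `json:"blueprint"`
}

// parseArchitectResponse is a hypothetical helper. Models often wrap JSON in
// prose or code fences, so it keeps only the outermost {...} span before
// decoding, and rejects output that carries no reply.
func parseArchitectResponse(raw string) (ArchitectResponse, error) {
	var resp ArchitectResponse
	start := strings.Index(raw, "{")
	end := strings.LastIndex(raw, "}")
	if start < 0 || end < start {
		return resp, errors.New("no JSON object in model output")
	}
	if err := json.Unmarshal([]byte(raw[start:end+1]), &resp); err != nil {
		return resp, err
	}
	if strings.TrimSpace(resp.Reply) == "" {
		return resp, errors.New("model output has no reply")
	}
	return resp, nil
}
```

On error, the caller can re-prompt with the parse error attached, or fall back to showing the raw text as the reply and leaving the stored Blueprint unchanged.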
---
## Gap 3: Operation Tracking (Tree Runner in DB)
**Current:** Tree workflows run via shell script (`tree-runner.sh`). State in local JSON files.
**Required:** Operations tracked in database, queryable via API, streamable to UI.
### What's Missing
```
┌─────────────────────────────────────────────────────────────────┐
│ CURRENT │
│ │
│ ./tree-runner.sh slackpath-1.yaml │
│ → Runs in terminal │
│ → State in .checkpoints/slackpath-1.json │
│ → No API visibility │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ REQUIRED │
│ │
│ POST /operations/start {tree: "slackpath-1"} │
│ → Returns operation_id │
│ → State in operations table │
│ → GET /operations/{id}/stream returns SSE events │
└─────────────────────────────────────────────────────────────────┘
```
### Implementation Required
1. **Database Tables:**
- `operations` - tracks running/completed operations
- `operation_events` - event log for replay/streaming
2. **Service Layer:**
- `OrchestratorService` - manages operation lifecycle
- Port tree-runner logic from bash to Go
- Event emission during execution
3. **API Endpoints:**
- `POST /projects/{id}/operations` - start operation
- `GET /projects/{id}/operations/{operationId}` - get status
- `GET /projects/{id}/operations/{operationId}/stream` - SSE stream
4. **Worker Integration:**
- SDLC executor emits events as it progresses
- Events written to `operation_events` table
- SSE handler reads from table and streams
### Complexity: High
- Tree runner logic is non-trivial (dependencies, outputs, error handling)
- SSE streaming requires careful connection management
- Need to handle operation cancellation, resumption
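Dependency handling is the part of the port most likely to hide bugs. As a rough sketch, step ordering can be computed with Kahn's algorithm; the `Step` type and its `DependsOn` field are assumptions for illustration, since the actual tree YAML schema under `cookbooks/trees/` is not shown here.

```go
package main

import (
	"fmt"
	"sort"
)

// Step is a hypothetical tree node with uniquely named steps.
type Step struct {
	Name      string
	DependsOn []string
}

// executionOrder returns step names so that every step comes after its
// dependencies (Kahn's algorithm). It fails on unknown dependencies, cycles,
// and duplicate step names.
func executionOrder(steps []Step) ([]string, error) {
	indegree := make(map[string]int, len(steps))
	dependents := make(map[string][]string)
	for _, s := range steps {
		indegree[s.Name] = 0
	}
	for _, s := range steps {
		for _, dep := range s.DependsOn {
			if _, ok := indegree[dep]; !ok {
				return nil, fmt.Errorf("step %q depends on unknown step %q", s.Name, dep)
			}
			indegree[s.Name]++
			dependents[dep] = append(dependents[dep], s.Name)
		}
	}
	var ready []string
	for name, n := range indegree {
		if n == 0 {
			ready = append(ready, name)
		}
	}
	sort.Strings(ready) // map iteration is random; sort for a stable order
	var order []string
	for len(ready) > 0 {
		name := ready[0]
		ready = ready[1:]
		order = append(order, name)
		for _, next := range dependents[name] {
			indegree[next]--
			if indegree[next] == 0 {
				ready = append(ready, next)
			}
		}
	}
	if len(order) != len(steps) {
		return nil, fmt.Errorf("dependency cycle or duplicate step name detected")
	}
	return order, nil
}
```

Computing the order up front also helps resumption: an operation restarted after a crash can skip every step already recorded as complete and continue from the first one that is not.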
---
## Gap 4: Real-Time Progress Streaming
**Current:** Webhooks fire on build complete. No per-step visibility.
**Required:** SSE stream showing "Designing schema... Writing handlers... Running tests..."
### What's Missing
```
┌─────────────────────────────────────────────────────────────────┐
│ CURRENT │
│ │
│ Build starts → ... silence ... → Webhook: "build complete" │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ REQUIRED │
│ │
│ Build starts → │
│ event: {"phase": "spec", "status": "complete"} │
│ event: {"phase": "design", "status": "in_progress"} │
│ event: {"phase": "design", "status": "complete"} │
│ event: {"phase": "implement", "progress": 0.5} │
│ ... │
│ event: {"status": "complete", "url": "..."} │
└─────────────────────────────────────────────────────────────────┘
```
### Implementation Required
1. **SDLC Executor Changes:**
- Emit events at phase transitions
- Emit progress within phases (task completion)
- Write events to `operation_events` table
2. **SSE Handler:**
- `GET /operations/{id}/stream`
- Long-lived connection
- Read events from DB (or Redis pub/sub)
- Handle client disconnection gracefully
3. **Event Types:**
```go
type OperationEvent struct {
Type string // "phase", "progress", "artifact", "error", "complete"
Phase string // "spec", "design", "implement", "test", "deploy"
Status string // "in_progress", "complete", "failed"
Message string // Human-readable
Progress float64 // 0.0 to 1.0 for granular progress
Timestamp time.Time
}
```
### Complexity: Medium
- SSE is straightforward in Go
- Main work is instrumenting SDLC executor
- Need to balance granularity vs noise
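A minimal SSE handler along these lines might look like the sketch below. It reuses the `OperationEvent` fields from above; the JSON tags, `formatSSE`, and `streamEvents` are assumptions for illustration, and the event source is a plain channel standing in for the DB or Redis reader.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// OperationEvent repeats the struct above; the JSON tags are assumed.
type OperationEvent struct {
	Type      string    `json:"type"`
	Phase     string    `json:"phase,omitempty"`
	Status    string    `json:"status,omitempty"`
	Message   string    `json:"message,omitempty"`
	Progress  float64   `json:"progress,omitempty"`
	Timestamp time.Time `json:"timestamp"`
}

// formatSSE encodes one event as a Server-Sent Events frame. The id field
// lets a reconnecting client report the last event it saw.
func formatSSE(id int, ev OperationEvent) (string, error) {
	data, err := json.Marshal(ev)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("id: %d\nevent: %s\ndata: %s\n\n", id, ev.Type, data), nil
}

// streamEvents writes events to the client until the channel closes or the
// client disconnects.
func streamEvents(w http.ResponseWriter, r *http.Request, events <-chan OperationEvent) {
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	id := 0
	for {
		select {
		case <-r.Context().Done(): // client went away
			return
		case ev, open := <-events:
			if !open {
				return
			}
			id++
			frame, err := formatSSE(id, ev)
			if err != nil {
				return
			}
			fmt.Fprint(w, frame)
			flusher.Flush() // push the frame now instead of buffering
		}
	}
}
```

If the ids map to rows in `operation_events`, a reconnecting browser's `Last-Event-ID` header is enough to replay whatever the client missed.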
---
## Gap 5: Blueprint → SDLC Feature Conversion
**Current:** SDLC features are created manually with spec documents.
**Required:** Automated conversion from structured Blueprint to SDLC feature spec.
### What's Missing
```
┌─────────────────────────────────────────────────────────────────┐
│ CURRENT │
│ │
│ Human writes: spec.md with prose description │
│ → POST /sdlc/features │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ REQUIRED │
│ │
│ Blueprint JSON → Template rendering → spec.md │
│ → Automated POST /sdlc/features │
└─────────────────────────────────────────────────────────────────┘
```
### Implementation Required
1. **Spec Template:**
```markdown
# Feature: {{.Feature}}
## Summary
{{.Summary}}
## Data Model
{{range .Sections.DataModel.Entities}}
### {{.Name}}
| Field | Type |
|-------|------|
{{range .Fields}}| {{.Name}} | {{.Type}} |
{{end}}
{{end}}
## API Endpoints
{{range .Sections.APIEndpoints.Endpoints}}
- `{{.Method}} {{.Path}}` - {{.Description}}
{{end}}
## UI Components
{{range .Sections.UIComponents.Components}}
- **{{.Name}}**: {{.Purpose}}
{{end}}
## Assumptions
{{range .Assumptions}}
- {{.Assumption}}
{{end}}
```
2. **Conversion Service:**
- Takes Blueprint, renders spec.md
- Creates SDLC feature via existing API
- Links Blueprint to created feature (`built_feature_slug`)
### Complexity: Low
- Template rendering is straightforward
- SDLC feature creation already exists
- Main work is template design
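The rendering step is small enough to sketch with Go's `text/template`. The `SpecBlueprint` and `Endpoint` types below are simplified stand-ins for the real Blueprint schema, and the template is a shortened version of the one above.

```go
package main

import (
	"strings"
	"text/template"
)

// Endpoint and SpecBlueprint are hypothetical, flattened shapes used only
// for this sketch.
type Endpoint struct {
	Method      string
	Path        string
	Description string
}

type SpecBlueprint struct {
	Feature   string
	Summary   string
	Endpoints []Endpoint
}

// specTemplate is a shortened version of the spec template.
const specTemplate = "# Feature: {{.Feature}}\n" +
	"## Summary\n{{.Summary}}\n" +
	"## API Endpoints\n" +
	"{{range .Endpoints}}- `{{.Method}} {{.Path}}` - {{.Description}}\n{{end}}"

// renderSpec turns a Blueprint into spec.md content.
func renderSpec(bp SpecBlueprint) (string, error) {
	tmpl, err := template.New("spec").Parse(specTemplate)
	if err != nil {
		return "", err
	}
	var out strings.Builder
	if err := tmpl.Execute(&out, bp); err != nil {
		return "", err
	}
	return out.String(), nil
}
```

`text/template` is used rather than `html/template` because the output is Markdown, so HTML escaping would corrupt characters such as `&` and `<` in descriptions.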
---
## Gap 6: Frontend (Next.js Studio)
**Current:** No frontend. All interaction via API/CLI.
**Required:** Three-pane interface (Chat, Plan, Preview).
### What's Missing
Everything. This is a new application.
### Implementation Required
1. **Project Setup:**
- Next.js 14 with App Router
- Tailwind CSS for styling
- Authentication (integrate with rdev auth)
2. **Core Components:**
```
apps/studio/
├── app/
│ ├── page.tsx # Template selection
│ ├── projects/
│ │ └── [id]/
│ │ └── page.tsx # Three-pane workspace
│ └── api/ # Proxy to rdev-api
├── components/
│ ├── ChatPane.tsx
│ ├── PlanPane.tsx
│ ├── PreviewPane.tsx
│ ├── ActivityFeed.tsx
│ └── BuildProgress.tsx
└── lib/
├── api.ts # rdev-api client
└── sse.ts # SSE connection manager
```
3. **State Management:**
- Blueprint state (updated on each chat response)
- Operation state (updated via SSE)
- UI state (which pane is focused, etc.)
4. **Key Interactions:**
- Send chat message → receive reply + blueprint
- Click "Build It" → start operation → show progress
- Operation complete → refresh preview iframe
### Complexity: Medium
- Standard Next.js app
- SSE client requires careful handling
- Most complexity is in polish and UX
---
## Gap 7: Platform Service Infrastructure
**Current:** Projects manage their own integrations. No shared services, no credential management.
**Required:** A service catalog with provisioning, credential injection, and upgrade paths for existing projects.
### The "Upgrade" Problem
```
┌─────────────────────────────────────────────────────────────────┐
│ CURRENT │
│ │
│ Project created 3 months ago │
│ → No centralized logging │
│ → No analytics │
│ → Rolling your own email │
│ → No easy way to add platform services │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ REQUIRED │
│ │
│ POST /projects/{id}/services │
│ { "type": "logging", "provider": "loki" } │
│ │
│ → Provision credentials │
│ → Inject into K8s secrets │
│ → Create integration PR with config changes │
│ → Project now ships logs to centralized system │
└─────────────────────────────────────────────────────────────────┘
```
### Service Rollout Order
Build the infrastructure with the simplest service first, then add complexity:
| Order | Service | Why This Order |
|-------|---------|----------------|
| 1 | **Logging** | Pure infrastructure, no user-facing code changes |
| 2 | **Email** | Simple API calls, clear success/failure |
| 3 | **Stats** | Frontend SDK + backend events |
| 4 | **Auth** | Most complex (middleware, user model, protected routes) |
### Implementation Required
#### 1. Service Catalog
```yaml
# internal/platform/catalog.yaml
services:
logging:
description: "Centralized log aggregation"
providers:
loki:
name: "Grafana Loki"
credentials:
- LOKI_URL
- LOKI_TENANT_ID
integration:
go:
config_template: "loki-logger.go.tmpl"
env_example: ["LOKI_URL", "LOKI_TENANT_ID"]
node:
packages: ["pino", "pino-loki"]
config_template: "pino-loki.ts.tmpl"
email:
description: "Transactional email"
providers:
resend:
name: "Resend"
credentials:
- RESEND_API_KEY
integration:
go:
packages: ["github.com/resendlabs/resend-go"]
service_template: "email-service.go.tmpl"
node:
packages: ["resend"]
service_template: "email-client.ts.tmpl"
stats:
description: "Product analytics"
providers:
posthog:
name: "PostHog"
credentials:
- POSTHOG_API_KEY
- POSTHOG_HOST
integration:
go:
packages: ["github.com/posthog/posthog-go"]
node:
packages: ["posthog-js", "posthog-node"]
provider_template: "analytics-provider.tsx.tmpl"
auth:
description: "User authentication"
providers:
clerk:
name: "Clerk"
credentials:
- CLERK_SECRET_KEY
- NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY
integration:
node:
packages: ["@clerk/nextjs"]
middleware_template: "clerk-middleware.ts.tmpl"
provider_template: "clerk-provider.tsx.tmpl"
```
#### 2. Database Schema
```sql
-- Track which services a project uses
CREATE TABLE project_services (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
project_id UUID NOT NULL REFERENCES projects(id),
service_type TEXT NOT NULL, -- 'logging', 'email', 'stats', 'auth'
provider TEXT NOT NULL, -- 'loki', 'resend', 'posthog', 'clerk'
environment TEXT NOT NULL, -- 'staging', 'production', 'all'
-- Encrypted credentials
credentials_encrypted BYTEA,
-- Non-sensitive config
config JSONB NOT NULL DEFAULT '{}',
-- Status tracking
status TEXT NOT NULL DEFAULT 'provisioning',
-- provisioning → active → needs_update → deprovisioned
-- Integration tracking
integration_status TEXT DEFAULT 'pending',
-- pending → pr_created → integrated → needs_update
integration_pr_url TEXT,
integration_commit TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(project_id, service_type, environment)
);
```
#### 3. Provisioner Interface
```go
// internal/port/platform_provisioner.go
type PlatformProvisioner interface {
// Provision creates credentials for a project
Provision(ctx context.Context, req ProvisionRequest) (*ProvisionResult, error)
// Verify checks if credentials are still valid
Verify(ctx context.Context, projectID string, creds map[string]string) error
// Deprovision cleans up (optional, for account removal)
Deprovision(ctx context.Context, projectID string) error
}
type ProvisionRequest struct {
ProjectID uuid.UUID
ProjectName string
Environment string // "staging", "production"
}
type ProvisionResult struct {
Credentials map[string]string // Encrypted before storage
Config map[string]string // Non-sensitive config
}
```
#### 4. Service Addition API
```
POST /projects/{projectId}/services
{
"serviceType": "logging",
"provider": "loki" // Optional, uses platform default
}
Response:
{
"serviceId": "svc_abc123",
"status": "provisioning",
"integrationMethod": "pr", // or "direct"
"prUrl": null // Populated when PR is created
}
GET /projects/{projectId}/services/{serviceId}
{
"serviceId": "svc_abc123",
"serviceType": "logging",
"provider": "loki",
"status": "active",
"integrationStatus": "integrated",
"integrationCommit": "abc123...",
"credentials": {
"LOKI_URL": "[redacted]",
"LOKI_TENANT_ID": "project-xyz"
}
}
```
#### 5. Integration Flow
```
POST /projects/{id}/services {type: "logging", provider: "loki"}
┌─────────────────────────────────────────────────────────────────┐
│ 1. PROVISION │
│ │
│ LokiProvisioner.Provision() │
│ → Create tenant in Loki (or use shared with project prefix) │
│ → Generate credentials │
│ → Store encrypted in project_services │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 2. INJECT │
│ │
│ K8sSecretInjector.Inject() │
│ → Add LOKI_URL, LOKI_TENANT_ID to project's K8s secret │
│ → Trigger deployment restart to pick up new env vars │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 3. INTEGRATE │
│ │
│ IntegrationService.CreatePR() or .DirectCommit() │
│ → Clone project repo │
│ → Apply integration templates: │
│ • Update logger config to ship to Loki │
│ • Add env vars to .env.example │
│ • Update deployment to mount secrets │
│ → Create PR (or direct commit for new projects) │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 4. VERIFY │
│ │
│ After PR merge / deploy: │
│ → Check logs appearing in Loki │
│ → Update integration_status to "integrated" │
└─────────────────────────────────────────────────────────────────┘
```
### Complexity: High
- Service catalog is straightforward (YAML/DB)
- Each provisioner is unique (Loki vs Resend vs PostHog)
- Credential encryption and management needs care
- Integration templates need to handle Go + Node + various frameworks
- PR creation requires git operations
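For the credential handling, one common approach is AES-GCM with a random nonce stored in front of the ciphertext, which fits a single `BYTEA` column such as `credentials_encrypted`. This is a sketch of that technique, not the project's chosen scheme; `sealCredentials` and `openCredentials` are hypothetical names, and key management (where the key lives, how it rotates) is out of scope.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"encoding/json"
	"errors"
)

// newGCM builds an AES-GCM cipher. The key must be 16, 24, or 32 bytes.
func newGCM(key []byte) (cipher.AEAD, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	return cipher.NewGCM(block)
}

// sealCredentials encrypts the credential map and returns nonce||ciphertext.
func sealCredentials(key []byte, creds map[string]string) ([]byte, error) {
	plaintext, err := json.Marshal(creds)
	if err != nil {
		return nil, err
	}
	gcm, err := newGCM(key)
	if err != nil {
		return nil, err
	}
	nonce := make([]byte, gcm.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return nil, err
	}
	// Seal appends the ciphertext to its first argument, so the nonce ends
	// up as the prefix of the returned slice.
	return gcm.Seal(nonce, nonce, plaintext, nil), nil
}

// openCredentials reverses sealCredentials and fails if the data was
// tampered with or the key is wrong.
func openCredentials(key, sealed []byte) (map[string]string, error) {
	gcm, err := newGCM(key)
	if err != nil {
		return nil, err
	}
	if len(sealed) < gcm.NonceSize() {
		return nil, errors.New("sealed credentials are too short")
	}
	nonce, ciphertext := sealed[:gcm.NonceSize()], sealed[gcm.NonceSize():]
	plaintext, err := gcm.Open(nil, nonce, ciphertext, nil)
	if err != nil {
		return nil, err
	}
	var creds map[string]string
	if err := json.Unmarshal(plaintext, &creds); err != nil {
		return nil, err
	}
	return creds, nil
}
```

Because GCM is authenticated, a modified row fails to decrypt instead of yielding garbage credentials, which also gives the `Verify` step a cheap first check.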
### Starting Point: Logging with Loki
```go
// internal/adapter/loki/provisioner.go
type LokiProvisioner struct {
lokiURL string
adminToken string // For tenant creation if using multi-tenant Loki
}
func (p *LokiProvisioner) Provision(ctx context.Context, req ProvisionRequest) (*ProvisionResult, error) {
// For single-tenant Loki, just create a unique label prefix
tenantID := fmt.Sprintf("project-%s", req.ProjectID)
return &ProvisionResult{
Credentials: map[string]string{
"LOKI_URL": p.lokiURL,
"LOKI_TENANT_ID": tenantID,
},
Config: map[string]string{
"service_name": req.ProjectName,
},
}, nil
}
```
---
## Gap 8: Dual Environment Support
**Current:** Single deployment per project. Main branch = production.
**Required:** Staging + Production environments. Build deploys to staging, "Publish" promotes to production.
### The Environment Model
```
┌─────────────────────────────────────────────────────────────────┐
│ Project: cool-project │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ STAGING │ │
│ │ staging.cool-project.threesix.ai │ │
│ │ │ │
│ │ • Where development happens │ │
│ │ • Preview pane shows this │ │
│ │ • "Build It" deploys here │ │
│ │ • May use test credentials for services │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ [Publish] │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ PRODUCTION │ │
│ │ cool-project.threesix.ai │ │
│ │ │ │
│ │ • User-facing, stable │ │
│ │ • Only updated via explicit "Publish" │ │
│ │ • Production credentials for services │ │
│ │ • Enabled after first publish │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
```
### Implementation Required
#### 1. DNS Changes
```go
// On project creation, create both records (prod may be placeholder)
CreateDNSRecord("staging.cool-project.threesix.ai", stagingIP)
CreateDNSRecord("cool-project.threesix.ai", prodIP) // Or placeholder until first publish
```
#### 2. K8s Deployment Model
```yaml
# Option A: Two deployments in same namespace
apiVersion: apps/v1
kind: Deployment
metadata:
name: cool-project-staging
namespace: cool-project
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: cool-project-production
namespace: cool-project
# Option B: Two namespaces (cleaner isolation)
# cool-project-staging namespace
# cool-project-production namespace
```
**Recommendation:** Same namespace, two deployments. Simpler to manage, secrets can be shared or scoped.
#### 3. Database Model
Two options:
**A. Same database, table-name prefixes:**
```sql
-- Staging tables
staging_users, staging_posts, staging_...
-- Production tables
prod_users, prod_posts, prod_...
```
**B. Separate databases (cleaner):**
```
cool-project-staging (CockroachDB database)
cool-project-production (CockroachDB database)
```
**Recommendation:** Separate databases. Cleaner isolation, no risk of cross-env data access.
#### 4. Project Schema Updates
```sql
ALTER TABLE projects ADD COLUMN environments JSONB NOT NULL DEFAULT '{
"staging": {"enabled": true, "deployed_at": null},
"production": {"enabled": false, "deployed_at": null, "published_at": null}
}';
```
#### 5. Publish API
```
POST /projects/{projectId}/publish
{
"fromEnvironment": "staging", // Usually staging
"toEnvironment": "production"
}
Response:
{
"operationId": "op_xyz789",
"status": "publishing",
"streamUrl": "/operations/{operationId}/stream"
}
```
**Publish Flow:**
1. Validate staging is healthy
2. Provision production credentials for any services (if not exist)
3. Run migrations on production database
4. Deploy staging image to production deployment
5. Health check production
6. Update DNS if needed
7. Update project.environments.production
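The coordination itself can be sketched as an ordered list of steps that stops at the first failure. Everything here is hypothetical (`publishStep`, `runPublish`, `checkStaging`); a real implementation would also pass a `context.Context` and emit operation events per step.

```go
package main

import (
	"errors"
	"fmt"
)

// errStagingUnhealthy is an example sentinel for the first publish step.
var errStagingUnhealthy = errors.New("staging is unhealthy")

// publishStep is one named unit of the publish flow.
type publishStep struct {
	name string
	run  func() error
}

// checkStaging returns a step function that fails when staging is unhealthy.
func checkStaging(healthy bool) func() error {
	return func() error {
		if !healthy {
			return errStagingUnhealthy
		}
		return nil
	}
}

// runPublish executes steps in order and stops at the first failure. It
// returns the names of the steps that completed, so the caller knows how far
// the publish got and what may need to be rolled back.
func runPublish(steps []publishStep) ([]string, error) {
	var done []string
	for _, s := range steps {
		if err := s.run(); err != nil {
			return done, fmt.Errorf("publish step %q failed: %w", s.name, err)
		}
		done = append(done, s.name)
	}
	return done, nil
}
```

Ordering matters: validating staging first means a failed health check aborts before migrations touch the production database.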
### Complexity: Medium
- DNS: Already have CloudflareAdapter, just create two records
- K8s: Straightforward deployment duplication
- Database: CockroachDB adapter supports multiple databases
- Main complexity is the publish flow coordination
### Defer Until After Gap 7
Dual environments build on platform services, so Gap 7 (services) comes first:
- Services provision for a single environment initially
- Then extend to environment-aware provisioning
- Then add the publish flow that syncs services to production
---
## Summary: Work Required
| Gap | Effort | Dependencies | Critical Path |
|-----|--------|--------------|---------------|
| 0. Design References | 2-3 days | Gap 1 (storage) | Yes (for design flows) |
| 1. Blueprint Storage | 2-3 days | None | Yes |
| 2. Architect Agent | 3-5 days | Gap 1 | Yes |
| 3. Operation Tracking | 4-6 days | None | Yes |
| 4. Progress Streaming | 2-3 days | Gap 3 | Yes |
| 5. Blueprint → SDLC | 1-2 days | Gap 1 | Yes |
| 6. Frontend | 5-7 days | Gaps 1-5 | Yes |
| 7. Platform Services | 5-8 days | None (can start now) | Parallel track |
| 8. Dual Environments | 3-5 days | Gap 7 | After services work |
**Total Estimate:** 4-5 weeks of focused work (Gaps 7-8 can run in parallel with Gaps 1-6)
**Service Rollout (within Gap 7):**
1. Logging (Loki) - 2 days
2. Email (Resend) - 2 days
3. Stats (PostHog) - 2 days
4. Auth (Clerk) - 3 days
**Note:** Gap 0 (Design References) can be implemented in parallel with Gap 2 (Architect Agent) since both involve Architect prompt engineering. The reference capture infrastructure (Gap 0) builds on Gap 1's storage layer.
### Critical Path
```
┌──► Gap 0 (References) ──┐
│ │
Gap 1 (Blueprint) ──┼──► Gap 2 (Architect) ───┼──► Gap 5 (Conversion)
│ │
│ └──► Gap 6 (Frontend)
│ ▲
Gap 3 (Operations) ─┴──► Gap 4 (Streaming) ────────┘
Parallel Track:
Gap 7 (Services) ──► Logging ──► Email ──► Stats ──► Auth
└──► Gap 8 (Environments) ──► Publish Flow
```
Gap 7 can start immediately and run parallel to the Studio work.
Gap 8 depends on Gap 7 for service credential handling per environment.
---
## Risk Assessment
| Risk | Likelihood | Impact | Mitigation |
|------|------------|--------|------------|
| Architect outputs malformed JSON | High | Medium | JSON schema validation, retry logic |
| SSE connections drop | Medium | Low | Client-side reconnection, event replay from DB |
| Blueprint schema too restrictive | Medium | Medium | Start minimal, add sections iteratively |
| LLM latency affects chat UX | Low | High | Stream partial responses, show typing indicator |
| Build failures leave broken state | Low | Medium | SDLC already handles partial state |
---
## What's NOT a Gap
These are already solved by the current rdev foundation:
- **Project provisioning** - K8s, DNS, git all work
- **Template seeding** - Composable monorepo templates
- **SDLC execution** - Classifier + worker + artifact tracking
- **CI/CD** - Woodpecker integration
- **Database provisioning** - CockroachDB adapter
- **Webhooks** - Event dispatcher with retry

The foundation is solid. The gaps are about **exposing** existing capabilities through a conversational UI, not rebuilding core functionality.


@ -0,0 +1,111 @@
// Package main provides the claudebox-sidecar HTTP server.
// This sidecar runs alongside Claude Code in worker pods, exposing HTTP endpoints
// for execute, git, and SDLC operations - replacing kubectl exec calls.
package main
import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/orchard9/rdev/internal/claudebox"
"github.com/orchard9/rdev/internal/envutil"
"github.com/orchard9/rdev/internal/logging"
)
func main() {
// Configure logging
logLevel := logging.LevelInfo
if envutil.GetEnvBool("DEBUG", false) {
logLevel = logging.LevelDebug
}
log := logging.New(logging.Config{
Level: logLevel,
Format: logging.FormatJSON,
})
// Configuration from environment
port := envutil.GetEnv("PORT", "8080")
workDir := envutil.GetEnv("WORKSPACE_DIR", "/workspace")
giteaToken := os.Getenv("GITEA_TOKEN") // Used for git push auth; may be empty (the secret key is optional in the Deployment)
gitUser := envutil.GetEnv("GIT_USER", "rdev-worker")
gitEmail := envutil.GetEnv("GIT_EMAIL", "worker@threesix.ai")
// Create server components
executor := claudebox.NewExecutor(workDir)
gitOps := claudebox.NewGitOperations(claudebox.GitOperationsConfig{
WorkDir: workDir,
GiteaToken: giteaToken,
GitUser: gitUser,
GitEmail: gitEmail,
Logger: log.Slog(),
})
sdlcRunner := claudebox.NewSDLCRunner(claudebox.SDLCRunnerConfig{
WorkDir: workDir,
Logger: log.Slog(),
})
// Create the server
server := claudebox.NewServer(claudebox.ServerConfig{
Executor: executor,
GitOps: gitOps,
SDLCRunner: sdlcRunner,
Logger: log.Slog(),
})
// Create router
r := chi.NewRouter()
r.Use(middleware.RequestID)
r.Use(middleware.RealIP)
r.Use(logging.Middleware(logging.MiddlewareConfig{
Logger: log,
}))
r.Use(middleware.Recoverer)
r.Use(middleware.Timeout(10 * time.Minute))
// Mount server routes
server.Mount(r)
// Create HTTP server
addr := fmt.Sprintf(":%s", port)
httpServer := &http.Server{
Addr: addr,
Handler: r,
ReadTimeout: 30 * time.Second,
WriteTimeout: 15 * time.Minute, // Long timeout for streaming responses
IdleTimeout: 60 * time.Second,
}
// Start server in goroutine
go func() {
log.Info("starting claudebox-sidecar", "addr", addr, "workDir", workDir)
if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Error("server error", logging.FieldError, err)
os.Exit(1)
}
}()
// Wait for shutdown signal
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
log.Info("shutting down server")
// Graceful shutdown with timeout
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := httpServer.Shutdown(ctx); err != nil {
log.Error("server shutdown error", logging.FieldError, err)
os.Exit(1)
}
log.Info("server stopped")
}

cmd/rdev-worker/main.go Normal file

@ -0,0 +1,266 @@
// Package main provides the standalone rdev-worker binary.
// This worker runs as a separate container alongside a claudebox sidecar,
// polling the rdev-api for tasks and executing them via HTTP calls to the sidecar.
package main
import (
"context"
"os"
"os/signal"
"strings"
"syscall"
"time"
claudeboxclient "github.com/orchard9/rdev/internal/adapter/claudebox"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/envutil"
"github.com/orchard9/rdev/internal/logging"
"github.com/orchard9/rdev/internal/worker"
)
// version is set via ldflags at build time:
// go build -ldflags "-X main.version=v1.0.0" ./cmd/rdev-worker
var version = "dev"
func main() {
// Configure logging
logLevel := logging.LevelInfo
if envutil.GetEnvBool("DEBUG", false) {
logLevel = logging.LevelDebug
}
log := logging.New(logging.Config{
Level: logLevel,
Format: logging.FormatJSON,
}).WithWorker("rdev-worker")
// Configuration from environment
cfg := loadConfig()
log.Info("starting rdev-worker",
"worker_id", cfg.WorkerID,
"rdev_api_url", cfg.RdevAPIURL,
"claudebox_url", cfg.ClaudeboxURL,
"poll_interval", cfg.PollInterval,
)
// Create API client for rdev-api
apiClient := worker.NewAPIClient(worker.APIClientConfig{
BaseURL: cfg.RdevAPIURL,
APIKey: cfg.APIKey,
Timeout: 30 * time.Second,
})
// Create claudebox client for sidecar
claudeboxClient := claudeboxclient.NewClient(claudeboxclient.ClientConfig{
BaseURL: cfg.ClaudeboxURL,
Timeout: 15 * time.Minute,
})
// Create context with cancellation
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Register worker
hostname, _ := os.Hostname()
if err := apiClient.Register(ctx, &worker.RegisterRequest{
ID: cfg.WorkerID,
Hostname: hostname,
Version: version,
Capabilities: cfg.Capabilities,
}); err != nil {
log.Error("failed to register worker", logging.FieldError, err)
os.Exit(1)
}
log.Info("worker registered", "worker_id", cfg.WorkerID)
// Create executors
buildExecutor := worker.NewHTTPBuildExecutor(worker.HTTPBuildExecutorConfig{
ClaudeboxClient: claudeboxClient,
WorkDir: "/workspace",
})
sdlcExecutor := worker.NewHTTPSDLCTaskExecutor(worker.HTTPSDLCTaskExecutorConfig{
ClaudeboxClient: claudeboxClient,
WorkDir: "/workspace",
})
// Start heartbeat loop
go runHeartbeat(ctx, apiClient, cfg.WorkerID, cfg.HeartbeatInterval, log)
// Start work loop
go runWorkLoop(ctx, apiClient, buildExecutor, sdlcExecutor, cfg, log)
// Wait for shutdown signal
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
log.Info("shutting down worker")
cancel()
// Give ongoing work a chance to complete
time.Sleep(5 * time.Second)
log.Info("worker stopped")
}
// Config holds worker configuration.
type Config struct {
WorkerID string
RdevAPIURL string
ClaudeboxURL string
APIKey string
PollInterval time.Duration
HeartbeatInterval time.Duration
TaskTimeout time.Duration
Capabilities []string
}
// loadConfig loads configuration from environment variables.
func loadConfig() *Config {
hostname, _ := os.Hostname()
workerID := envutil.GetEnv("WORKER_ID", hostname)
return &Config{
WorkerID: workerID,
RdevAPIURL: envutil.GetEnv("RDEV_API_URL", "http://rdev-api.rdev.svc.cluster.local:8080"),
ClaudeboxURL: envutil.GetEnv("CLAUDEBOX_URL", "http://localhost:8080"),
APIKey: os.Getenv("RDEV_API_KEY"),
PollInterval: parseDuration(envutil.GetEnv("WORKER_POLL_INTERVAL", "5s"), 5*time.Second),
HeartbeatInterval: parseDuration(envutil.GetEnv("WORKER_HEARTBEAT_INTERVAL", "30s"), 30*time.Second),
TaskTimeout: parseDuration(envutil.GetEnv("WORKER_TASK_TIMEOUT", "15m"), 15*time.Minute),
Capabilities: parseCapabilities(os.Getenv("WORKER_CAPABILITIES")),
}
}
// parseDuration parses a duration string with a default fallback.
func parseDuration(s string, defaultVal time.Duration) time.Duration {
d, err := time.ParseDuration(s)
if err != nil {
return defaultVal
}
return d
}
// parseCapabilities parses a comma-separated list of capabilities.
func parseCapabilities(s string) []string {
if s == "" {
return []string{"build", "sdlc"}
}
var caps []string
for _, c := range strings.Split(s, ",") {
c = strings.TrimSpace(c)
if c != "" {
caps = append(caps, c)
}
}
return caps
}
// runHeartbeat runs the heartbeat loop.
func runHeartbeat(ctx context.Context, client *worker.APIClient, workerID string, interval time.Duration, log *logging.Logger) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := client.Heartbeat(ctx, workerID); err != nil {
log.Warn("heartbeat failed", logging.FieldError, err)
}
}
}
}
// runWorkLoop runs the main work polling loop.
func runWorkLoop(
ctx context.Context,
client *worker.APIClient,
buildExecutor *worker.HTTPBuildExecutor,
sdlcExecutor *worker.HTTPSDLCTaskExecutor,
cfg *Config,
log *logging.Logger,
) {
ticker := time.NewTicker(cfg.PollInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// Try to claim a task
task, err := client.ClaimTask(ctx, cfg.WorkerID)
if err != nil {
log.Warn("failed to claim task", logging.FieldError, err)
continue
}
if task == nil {
// No tasks available
continue
}
log.Info("task claimed",
"task_id", task.ID,
logging.FieldProjectID, task.ProjectID,
"type", task.Type,
)
// Execute the task
executeTask(ctx, client, buildExecutor, sdlcExecutor, task, cfg, log)
}
}
}
// executeTask executes a single task and reports the outcome to rdev-api.
func executeTask(
	ctx context.Context,
	client *worker.APIClient,
	buildExecutor *worker.HTTPBuildExecutor,
	sdlcExecutor *worker.HTTPSDLCTaskExecutor,
	task *domain.WorkTask,
	cfg *Config,
	log *logging.Logger,
) {
	// Create task context with timeout
	taskCtx, cancel := context.WithTimeout(ctx, cfg.TaskTimeout)
	defer cancel()

	var result *domain.BuildResult
	switch task.Type {
	case domain.WorkTaskTypeBuild:
		result = buildExecutor.Execute(taskCtx, task)
	case domain.WorkTaskTypeSDLC:
		result = sdlcExecutor.Execute(taskCtx, task)
	default:
		result = &domain.BuildResult{
			Success: false,
			Error:   "unsupported task type: " + string(task.Type),
		}
	}

	// Report on a context detached from shutdown cancellation. Otherwise a
	// SIGTERM that cancels ctx would also abort the report call, and the
	// result of the in-flight task would be lost. Bounded to 30 seconds.
	reportCtx, cancelReport := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
	defer cancelReport()

	// Report result back to API
	if result.Success {
		if err := client.CompleteTask(reportCtx, cfg.WorkerID, task.ID, result); err != nil {
			log.Error("failed to complete task", "task_id", task.ID, logging.FieldError, err)
		} else {
			log.Info("task completed",
				"task_id", task.ID,
				"duration_ms", result.DurationMs,
			)
		}
	} else {
		if err := client.FailTask(reportCtx, cfg.WorkerID, task.ID, result.Error, result.Output, result.DurationMs); err != nil {
			log.Error("failed to report task failure", "task_id", task.ID, logging.FieldError, err)
		} else {
			log.Warn("task failed",
				"task_id", task.ID,
				"error", result.Error,
				"duration_ms", result.DurationMs,
			)
		}
	}
}


@ -0,0 +1,51 @@
# HorizontalPodAutoscaler for rdev-worker based on queue depth.
# Scales workers up when pending tasks accumulate, scales down when queue drains.
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: rdev-worker
  namespace: rdev
  labels:
    app.kubernetes.io/name: rdev-worker
    app.kubernetes.io/part-of: rdev
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: rdev-worker
  minReplicas: 1
  maxReplicas: 10
  metrics:
    # Scale based on pending tasks in the work queue
    - type: External
      external:
        metric:
          name: rdev_pending_tasks
        target:
          type: AverageValue
          # Target 2 pending tasks per worker.
          # With 2 workers, 4 pending tasks holds steady; 5 or more triggers a scale-up.
          averageValue: "2"
  behavior:
    # Scale up quickly when work accumulates
    scaleUp:
      stabilizationWindowSeconds: 60  # Consider the last minute of recommendations before scaling up
      policies:
        - type: Pods
          value: 2  # Add up to 2 pods at a time
          periodSeconds: 60
        - type: Percent
          value: 100  # Or double the current count
          periodSeconds: 60
      selectPolicy: Max
    # Scale down slowly to avoid thrashing
    scaleDown:
      stabilizationWindowSeconds: 300  # Wait 5 minutes before scaling down
      policies:
        - type: Pods
          value: 1  # Remove 1 pod at a time
          periodSeconds: 120
      selectPolicy: Min
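
To make the `AverageValue` target concrete, the sketch below approximates how the replica count follows from the queue depth: the external metric total is divided by the per-pod target, rounded up, and clamped to the min/max bounds. It is a simplification that ignores the HPA's tolerance band, stabilization windows, and scaling policies. The function name `desiredReplicas` is made up for this example.

```go
package main

import (
	"fmt"
	"math"
)

// desiredReplicas approximates the HPA result for an External metric with an
// AverageValue target: ceil(metricTotal / targetPerPod), clamped to bounds.
// Tolerance, stabilization, and scaling policies are intentionally ignored.
func desiredReplicas(metricTotal, targetPerPod float64, minReplicas, maxReplicas int) int {
	if targetPerPod <= 0 {
		return minReplicas
	}
	n := int(math.Ceil(metricTotal / targetPerPod))
	if n < minReplicas {
		n = minReplicas
	}
	if n > maxReplicas {
		n = maxReplicas
	}
	return n
}

func main() {
	// Values mirror the manifest: target 2 per worker, 1..10 replicas.
	for _, pending := range []float64{0, 4, 5, 9, 40} {
		fmt.Printf("pending=%v -> replicas=%d\n", pending, desiredReplicas(pending, 2, 1, 10))
	}
}
```

With these inputs, 4 pending tasks map to 2 replicas and 5 pending tasks map to 3, so the third worker arrives only once the queue exceeds two tasks per existing worker.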


@ -0,0 +1,78 @@
# Prometheus Adapter rules for exposing rdev metrics for HPA.
# These rules make rdev_pending_tasks available as an external metric.
apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus-adapter-config
  namespace: monitoring  # Adjust to match your prometheus-adapter namespace
  labels:
    app.kubernetes.io/name: prometheus-adapter
    app.kubernetes.io/part-of: rdev
data:
  config.yaml: |
    # Default rules from prometheus-adapter
    rules:
      - seriesQuery: '{__name__=~"^container_.*",container!="POD",namespace!="",pod!=""}'
        seriesFilters: []
        resources:
          overrides:
            namespace:
              resource: namespace
            pod:
              resource: pod
        name:
          matches: ^container_(.*)_seconds_total$
          as: ""
        metricsQuery: sum(rate(<<.Series>>{<<.LabelMatchers>>,container!="POD"}[5m])) by (<<.GroupBy>>)
      - seriesQuery: '{__name__=~"^container_.*",container!="POD",namespace!="",pod!=""}'
        seriesFilters:
          - isNot: ^container_.*_seconds_total$
        resources:
          overrides:
            namespace:
              resource: namespace
            pod:
              resource: pod
        name:
          matches: ^container_(.*)_total$
          as: ""
        metricsQuery: sum(rate(<<.Series>>{<<.LabelMatchers>>,container!="POD"}[5m])) by (<<.GroupBy>>)
      - seriesQuery: '{__name__=~"^container_.*",container!="POD",namespace!="",pod!=""}'
        seriesFilters:
          - isNot: ^container_.*_total$
        resources:
          overrides:
            namespace:
              resource: namespace
            pod:
              resource: pod
        name:
          matches: ^container_(.*)$
          as: ""
        metricsQuery: sum(<<.Series>>{<<.LabelMatchers>>,container!="POD"}) by (<<.GroupBy>>)
    # rdev external metrics for HPA
    externalRules:
      - seriesQuery: 'rdev_work_queue_pending_tasks'
        resources:
          namespaced: false
        name:
          matches: "rdev_work_queue_pending_tasks"
          as: "rdev_pending_tasks"
        metricsQuery: sum(rdev_work_queue_pending_tasks)
      - seriesQuery: 'rdev_work_queue_running_tasks'
        resources:
          namespaced: false
        name:
          matches: "rdev_work_queue_running_tasks"
          as: "rdev_running_tasks"
        metricsQuery: sum(rdev_work_queue_running_tasks)
      - seriesQuery: 'rdev_workers_idle'
        resources:
          namespaced: false
        name:
          matches: "rdev_workers_idle"
          as: "rdev_idle_workers"
        metricsQuery: sum(rdev_workers_idle)


@ -0,0 +1,169 @@
# Standalone worker deployment with claudebox sidecar.
# Workers poll rdev-api for tasks and execute them via HTTP calls to the local sidecar.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: rdev-worker
  namespace: rdev
  labels:
    app.kubernetes.io/name: rdev-worker
    app.kubernetes.io/part-of: rdev
spec:
  replicas: 2
  selector:
    matchLabels:
      app: rdev-worker
  template:
    metadata:
      labels:
        app: rdev-worker
        app.kubernetes.io/name: rdev-worker
        app.kubernetes.io/part-of: rdev
        rdev.orchard9.ai/role: worker
    spec:
      containers:
        # Main worker container - polls for tasks and orchestrates execution
        - name: worker
          image: ghcr.io/orchard9/rdev-worker:latest
          imagePullPolicy: Always
          env:
            - name: RDEV_API_URL
              value: "http://rdev-api.rdev.svc.cluster.local:8080"
            - name: CLAUDEBOX_URL
              value: "http://localhost:8080"
            - name: RDEV_API_KEY
              valueFrom:
                secretKeyRef:
                  name: rdev-worker-credentials
                  key: api-key
            - name: WORKER_ID
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: WORKER_POLL_INTERVAL
              value: "5s"
            - name: WORKER_HEARTBEAT_INTERVAL
              value: "30s"
            - name: WORKER_TASK_TIMEOUT
              value: "15m"
            - name: WORKER_CAPABILITIES
              value: "build,sdlc"
          resources:
            requests:
              cpu: "100m"
              memory: "128Mi"
            limits:
              cpu: "500m"
              memory: "256Mi"
          livenessProbe:
            exec:
              command:
                - test
                - -f
                - /usr/local/bin/rdev-worker
            initialDelaySeconds: 5
            periodSeconds: 60
        # Claudebox sidecar - provides Claude Code execution via HTTP
        - name: claudebox
          image: ghcr.io/orchard9/rdev-claudebox:latest
          imagePullPolicy: Always
          env:
            - name: PORT
              value: "8080"
            - name: WORKSPACE_DIR
              value: "/workspace"
            - name: GITEA_TOKEN
              valueFrom:
                secretKeyRef:
                  name: rdev-worker-credentials
                  key: gitea-token
                  optional: true
            - name: GIT_USER
              value: "rdev-worker"
            - name: GIT_EMAIL
              value: "worker@threesix.ai"
          ports:
            - name: http
              containerPort: 8080
          resources:
            requests:
              cpu: "500m"
              memory: "1Gi"
            limits:
              cpu: "2"
              memory: "4Gi"
          volumeMounts:
            - name: workspace
              mountPath: /workspace
            - name: claude-config
              mountPath: /root/.claude
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 10
            periodSeconds: 30
          readinessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 10
      volumes:
        # EmptyDir for workspace - ephemeral per-pod
        - name: workspace
          emptyDir:
            sizeLimit: 10Gi
        # Shared Claude config volume for authentication
        # Uses the same PVC as the claudebox statefulset
        - name: claude-config
          persistentVolumeClaim:
            claimName: claudebox-claude-config
      imagePullSecrets:
        - name: ghcr-secret
---
# Secret for worker credentials
apiVersion: v1
kind: Secret
metadata:
  name: rdev-worker-credentials
  namespace: rdev
  labels:
    app.kubernetes.io/name: rdev-worker
    app.kubernetes.io/part-of: rdev
type: Opaque
stringData:
  # API key for workers to authenticate with rdev-api
  # Create with: kubectl create secret generic rdev-worker-credentials --from-literal=api-key=<key> --from-literal=gitea-token=<token>
  api-key: "placeholder-replace-me"
  gitea-token: "placeholder-replace-me"
---
# Service for accessing worker metrics (optional)
apiVersion: v1
kind: Service
metadata:
  name: rdev-worker
  namespace: rdev
  labels:
    app.kubernetes.io/name: rdev-worker
    app.kubernetes.io/part-of: rdev
spec:
  selector:
    app: rdev-worker
  ports:
    - port: 8080
      name: claudebox
      targetPort: 8080


@ -0,0 +1,46 @@
# ServiceMonitor for scraping worker metrics with Prometheus.
# Requires Prometheus Operator to be installed.
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: rdev-worker
  namespace: rdev
  labels:
    app.kubernetes.io/name: rdev-worker
    app.kubernetes.io/part-of: rdev
    release: prometheus  # Matches Prometheus Operator label selector
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: rdev-worker
  namespaceSelector:
    matchNames:
      - rdev
  endpoints:
    - port: claudebox
      path: /metrics
      interval: 30s
      scrapeTimeout: 10s
---
# ServiceMonitor for rdev-api (queue metrics)
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: rdev-api
  namespace: rdev
  labels:
    app.kubernetes.io/name: rdev-api
    app.kubernetes.io/part-of: rdev
    release: prometheus
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: rdev-api
  namespaceSelector:
    matchNames:
      - rdev
  endpoints:
    - port: http
      path: /metrics
      interval: 30s
      scrapeTimeout: 10s


@ -0,0 +1,394 @@
// Package claudebox provides an HTTP client for the claudebox sidecar.
// This client is used by standalone workers to communicate with the local
// claudebox sidecar instead of using kubectl exec.
package claudebox
import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
)
// Client is an HTTP client for the claudebox sidecar.
type Client struct {
baseURL string
httpClient *http.Client
}
// ClientConfig holds configuration for the claudebox client.
type ClientConfig struct {
// BaseURL is the base URL of the claudebox sidecar (e.g., "http://localhost:8080").
BaseURL string
// Timeout is the default request timeout.
Timeout time.Duration
}
// NewClient creates a new claudebox client.
func NewClient(cfg ClientConfig) *Client {
if cfg.Timeout == 0 {
cfg.Timeout = 10 * time.Minute
}
return &Client{
baseURL: strings.TrimSuffix(cfg.BaseURL, "/"),
httpClient: &http.Client{
Timeout: cfg.Timeout,
},
}
}
// HealthResponse is the health check response.
type HealthResponse struct {
Status string `json:"status"`
Timestamp string `json:"timestamp"`
WorkDir string `json:"work_dir"`
}
// Health checks if the claudebox sidecar is healthy.
func (c *Client) Health(ctx context.Context) (*HealthResponse, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+"/health", nil)
if err != nil {
return nil, fmt.Errorf("create request: %w", err)
}
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("health check: %w", err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("health check returned status %d", resp.StatusCode)
}
var result HealthResponse
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("decode response: %w", err)
}
return &result, nil
}
// ExecuteRequest is the request to execute Claude Code.
type ExecuteRequest struct {
Prompt string `json:"prompt"`
AllowedTools []string `json:"allowed_tools,omitempty"`
WorkingDir string `json:"working_dir,omitempty"`
Timeout int `json:"timeout_seconds,omitempty"` // seconds
Metadata map[string]string `json:"metadata,omitempty"`
}
// ExecuteResponse is the response from executing Claude Code.
type ExecuteResponse struct {
Success bool `json:"success"`
Output string `json:"output"`
ExitCode int `json:"exit_code"`
DurationMs int64 `json:"duration_ms"`
Error string `json:"error,omitempty"`
SessionID string `json:"session_id,omitempty"`
FinalOutput string `json:"final_output,omitempty"`
Artifacts map[string]string `json:"artifacts,omitempty"`
}
// Execute runs Claude Code and returns the complete result.
func (c *Client) Execute(ctx context.Context, req *ExecuteRequest) (*ExecuteResponse, error) {
body, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("marshal request: %w", err)
}
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/execute", bytes.NewReader(body))
if err != nil {
return nil, fmt.Errorf("create request: %w", err)
}
httpReq.Header.Set("Content-Type", "application/json")
resp, err := c.httpClient.Do(httpReq)
if err != nil {
return nil, fmt.Errorf("execute: %w", err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
bodyBytes, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("execute returned status %d: %s", resp.StatusCode, string(bodyBytes))
}
var result ExecuteResponse
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("decode response: %w", err)
}
return &result, nil
}
// StreamEvent is an SSE event from streaming execution.
type StreamEvent struct {
Type string `json:"type"`
Content string `json:"content,omitempty"`
Stream string `json:"stream,omitempty"`
ToolName string `json:"tool_name,omitempty"`
Data map[string]any `json:"data,omitempty"`
Timestamp string `json:"timestamp"`
}
// StreamEventHandler is called for each event during streaming execution.
type StreamEventHandler func(StreamEvent)
// ExecuteStream runs Claude Code and streams events to the handler.
func (c *Client) ExecuteStream(ctx context.Context, req *ExecuteRequest, handler StreamEventHandler) error {
	body, err := json.Marshal(req)
	if err != nil {
		return fmt.Errorf("marshal request: %w", err)
	}

	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/execute/stream", bytes.NewReader(body))
	if err != nil {
		return fmt.Errorf("create request: %w", err)
	}
	httpReq.Header.Set("Content-Type", "application/json")
	httpReq.Header.Set("Accept", "text/event-stream")

	resp, err := c.httpClient.Do(httpReq)
	if err != nil {
		return fmt.Errorf("execute stream: %w", err)
	}
	defer func() { _ = resp.Body.Close() }()

	if resp.StatusCode != http.StatusOK {
		bodyBytes, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("execute stream returned status %d: %s", resp.StatusCode, string(bodyBytes))
	}

	// Parse SSE events. The default bufio.Scanner token limit is 64 KiB, so a
	// single large event would abort the stream with bufio.ErrTooLong. Raise
	// the limit to 1 MiB, matching the buffers used by the sidecar executor.
	scanner := bufio.NewScanner(resp.Body)
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
	for scanner.Scan() {
		line := scanner.Text()
		if !strings.HasPrefix(line, "data: ") {
			continue
		}
		data := strings.TrimPrefix(line, "data: ")
		if data == "" {
			continue
		}
		var event StreamEvent
		if err := json.Unmarshal([]byte(data), &event); err != nil {
			continue // Skip malformed events
		}
		handler(event)
	}
	if err := scanner.Err(); err != nil {
		return fmt.Errorf("read stream: %w", err)
	}
	return nil
}
// GitCloneRequest is the request to clone a repository.
type GitCloneRequest struct {
CloneURL string `json:"clone_url"`
WorkDir string `json:"work_dir,omitempty"`
}
// GitCloneResponse is the response from cloning.
type GitCloneResponse struct {
Success bool `json:"success"`
Cloned bool `json:"cloned"`
Error string `json:"error,omitempty"`
}
// GitClone clones or updates a git repository.
func (c *Client) GitClone(ctx context.Context, cloneURL, workDir string) (*GitCloneResponse, error) {
req := GitCloneRequest{
CloneURL: cloneURL,
WorkDir: workDir,
}
body, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("marshal request: %w", err)
}
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/git/clone", bytes.NewReader(body))
if err != nil {
return nil, fmt.Errorf("create request: %w", err)
}
httpReq.Header.Set("Content-Type", "application/json")
resp, err := c.httpClient.Do(httpReq)
if err != nil {
return nil, fmt.Errorf("git clone: %w", err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
bodyBytes, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("git clone returned status %d: %s", resp.StatusCode, string(bodyBytes))
}
var result GitCloneResponse
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("decode response: %w", err)
}
return &result, nil
}
// GitCommitAndPushRequest is the request to commit and push changes.
type GitCommitAndPushRequest struct {
Message string `json:"message"`
Push bool `json:"push"`
WorkDir string `json:"work_dir,omitempty"`
}
// GitCommitAndPushResponse is the response from commit and push.
type GitCommitAndPushResponse struct {
Success bool `json:"success"`
HasChanges bool `json:"has_changes"`
CommitSHA string `json:"commit_sha,omitempty"`
FilesChanged []string `json:"files_changed,omitempty"`
Pushed bool `json:"pushed"`
Error string `json:"error,omitempty"`
}
// GitCommitAndPush commits and optionally pushes changes.
func (c *Client) GitCommitAndPush(ctx context.Context, message string, push bool, workDir string) (*GitCommitAndPushResponse, error) {
req := GitCommitAndPushRequest{
Message: message,
Push: push,
WorkDir: workDir,
}
body, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("marshal request: %w", err)
}
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/git/commit-and-push", bytes.NewReader(body))
if err != nil {
return nil, fmt.Errorf("create request: %w", err)
}
httpReq.Header.Set("Content-Type", "application/json")
resp, err := c.httpClient.Do(httpReq)
if err != nil {
return nil, fmt.Errorf("git commit: %w", err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
bodyBytes, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("git commit returned status %d: %s", resp.StatusCode, string(bodyBytes))
}
var result GitCommitAndPushResponse
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("decode response: %w", err)
}
return &result, nil
}
// GitStatusResponse is the response from git status.
type GitStatusResponse struct {
IsRepo bool `json:"is_repo"`
HasChanges bool `json:"has_changes"`
ChangedFiles []string `json:"changed_files,omitempty"`
Branch string `json:"branch,omitempty"`
Error string `json:"error,omitempty"`
}
// GitStatus returns the git status of the workspace.
func (c *Client) GitStatus(ctx context.Context, workDir string) (*GitStatusResponse, error) {
reqURL := c.baseURL + "/git/status"
if workDir != "" {
reqURL += "?work_dir=" + url.QueryEscape(workDir)
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil)
if err != nil {
return nil, fmt.Errorf("create request: %w", err)
}
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("git status: %w", err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
bodyBytes, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("git status returned status %d: %s", resp.StatusCode, string(bodyBytes))
}
var result GitStatusResponse
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("decode response: %w", err)
}
return &result, nil
}
// SDLCRequest is the request to run an SDLC command.
type SDLCRequest struct {
Command string `json:"command"`
Args []string `json:"args,omitempty"`
WorkDir string `json:"work_dir,omitempty"`
}
// SDLCResponse is the response from running an SDLC command.
type SDLCResponse struct {
Success bool `json:"success"`
Output string `json:"output"`
Data json.RawMessage `json:"data,omitempty"`
Error string `json:"error,omitempty"`
}
// RunSDLC executes an SDLC CLI command.
func (c *Client) RunSDLC(ctx context.Context, command string, args []string, workDir string) (*SDLCResponse, error) {
req := SDLCRequest{
Command: command,
Args: args,
WorkDir: workDir,
}
body, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("marshal request: %w", err)
}
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/sdlc", bytes.NewReader(body))
if err != nil {
return nil, fmt.Errorf("create request: %w", err)
}
httpReq.Header.Set("Content-Type", "application/json")
resp, err := c.httpClient.Do(httpReq)
if err != nil {
return nil, fmt.Errorf("sdlc: %w", err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
bodyBytes, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("sdlc returned status %d: %s", resp.StatusCode, string(bodyBytes))
}
var result SDLCResponse
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("decode response: %w", err)
}
return &result, nil
}


@ -0,0 +1,333 @@
package claudebox
import (
"bufio"
"context"
"fmt"
"io"
"os/exec"
"strings"
"sync"
"time"
)
// Default allowed tools for Claude Code execution.
var defaultAllowedTools = []string{
"Bash", "Edit", "Write", "Read", "Glob", "Grep", "Task", "WebFetch", "WebSearch",
}
// Executor runs Claude Code locally in the container.
type Executor struct {
workDir string
}
// NewExecutor creates a new local executor.
func NewExecutor(workDir string) *Executor {
return &Executor{
workDir: workDir,
}
}
// ExecuteResult contains the result of a Claude Code execution.
type ExecuteResult struct {
Success bool
Output string
ExitCode int
DurationMs int64
Error error
SessionID string
FinalOutput string
}
// Execute runs Claude Code and returns the complete result.
func (e *Executor) Execute(ctx context.Context, req *ExecuteRequest) *ExecuteResult {
var output strings.Builder
start := time.Now()
result := &ExecuteResult{}
// Apply timeout if specified
if req.Timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Duration(req.Timeout)*time.Second)
defer cancel()
}
// Build command args
args := e.buildArgs(req)
// Execute claude command
cmd := exec.CommandContext(ctx, "claude", args...)
// Get working directory
workDir := req.WorkingDir
if workDir == "" {
workDir = e.workDir
}
cmd.Dir = workDir
// Capture output
stdout, err := cmd.StdoutPipe()
if err != nil {
result.Error = fmt.Errorf("stdout pipe: %w", err)
result.DurationMs = time.Since(start).Milliseconds()
return result
}
stderr, err := cmd.StderrPipe()
if err != nil {
result.Error = fmt.Errorf("stderr pipe: %w", err)
result.DurationMs = time.Since(start).Milliseconds()
return result
}
if err := cmd.Start(); err != nil {
result.Error = fmt.Errorf("start: %w", err)
result.DurationMs = time.Since(start).Milliseconds()
return result
}
// Read output
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
scanner := bufio.NewScanner(stdout)
buf := make([]byte, 0, 64*1024)
scanner.Buffer(buf, 1024*1024)
for scanner.Scan() {
output.WriteString(scanner.Text())
output.WriteString("\n")
}
}()
var stderrOutput strings.Builder
go func() {
defer wg.Done()
scanner := bufio.NewScanner(stderr)
buf := make([]byte, 0, 64*1024)
scanner.Buffer(buf, 1024*1024)
for scanner.Scan() {
stderrOutput.WriteString(scanner.Text())
stderrOutput.WriteString("\n")
}
}()
wg.Wait()
cmdErr := cmd.Wait()
result.DurationMs = time.Since(start).Milliseconds()
result.Output = output.String()
result.FinalOutput = output.String()
if cmdErr != nil {
if exitErr, ok := cmdErr.(*exec.ExitError); ok {
result.ExitCode = exitErr.ExitCode()
} else {
result.ExitCode = 1
result.Error = cmdErr
}
// Append stderr to error message
if stderrOutput.Len() > 0 {
if result.Error != nil {
result.Error = fmt.Errorf("%w\nstderr: %s", result.Error, stderrOutput.String())
} else {
result.Error = fmt.Errorf("stderr: %s", stderrOutput.String())
}
}
} else {
result.Success = true
}
return result
}
// StreamEventHandler is called for each event during streaming execution.
type StreamEventHandler func(StreamEvent)
// ExecuteStream runs Claude Code and streams events to the handler.
func (e *Executor) ExecuteStream(ctx context.Context, req *ExecuteRequest, handler StreamEventHandler) *ExecuteResult {
start := time.Now()
result := &ExecuteResult{}
// Apply timeout if specified
if req.Timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Duration(req.Timeout)*time.Second)
defer cancel()
}
// Build command args with stream-json output
args := e.buildStreamArgs(req)
cmd := exec.CommandContext(ctx, "claude", args...)
workDir := req.WorkingDir
if workDir == "" {
workDir = e.workDir
}
cmd.Dir = workDir
stdout, err := cmd.StdoutPipe()
if err != nil {
result.Error = fmt.Errorf("stdout pipe: %w", err)
result.DurationMs = time.Since(start).Milliseconds()
return result
}
stderr, err := cmd.StderrPipe()
if err != nil {
result.Error = fmt.Errorf("stderr pipe: %w", err)
result.DurationMs = time.Since(start).Milliseconds()
return result
}
if err := cmd.Start(); err != nil {
result.Error = fmt.Errorf("start: %w", err)
result.DurationMs = time.Since(start).Milliseconds()
return result
}
// Emit started event
handler(StreamEvent{
Type: "started",
Timestamp: time.Now().UTC().Format(time.RFC3339),
})
// Stream output
var wg sync.WaitGroup
var output strings.Builder
wg.Add(2)
go func() {
defer wg.Done()
e.streamOutput(stdout, "stdout", handler, &output)
}()
go func() {
defer wg.Done()
e.streamStderr(stderr, handler)
}()
wg.Wait()
cmdErr := cmd.Wait()
result.DurationMs = time.Since(start).Milliseconds()
result.Output = output.String()
result.FinalOutput = output.String()
if cmdErr != nil {
if exitErr, ok := cmdErr.(*exec.ExitError); ok {
result.ExitCode = exitErr.ExitCode()
} else {
result.ExitCode = 1
result.Error = cmdErr
}
handler(StreamEvent{
Type: "failed",
Content: cmdErr.Error(),
Timestamp: time.Now().UTC().Format(time.RFC3339),
})
} else {
result.Success = true
handler(StreamEvent{
Type: "completed",
Timestamp: time.Now().UTC().Format(time.RFC3339),
Data: map[string]any{
"duration_ms": result.DurationMs,
"exit_code": result.ExitCode,
},
})
}
return result
}
// buildArgs constructs Claude Code command arguments.
func (e *Executor) buildArgs(req *ExecuteRequest) []string {
args := []string{
req.Prompt,
"-p",
}
// Add allowed tools
allowedTools := req.AllowedTools
if len(allowedTools) == 0 {
allowedTools = defaultAllowedTools
}
for _, tool := range allowedTools {
args = append(args, "--allowedTools", tool)
}
return args
}
// buildStreamArgs constructs Claude Code command arguments with streaming output.
func (e *Executor) buildStreamArgs(req *ExecuteRequest) []string {
args := []string{
req.Prompt,
"-p",
"--verbose",
"--output-format", "stream-json",
}
// Add allowed tools
allowedTools := req.AllowedTools
if len(allowedTools) == 0 {
allowedTools = defaultAllowedTools
}
for _, tool := range allowedTools {
args = append(args, "--allowedTools", tool)
}
return args
}
// streamOutput reads from stdout and sends events.
func (e *Executor) streamOutput(r io.Reader, stream string, handler StreamEventHandler, output *strings.Builder) {
scanner := bufio.NewScanner(r)
buf := make([]byte, 0, 64*1024)
scanner.Buffer(buf, 1024*1024)
for scanner.Scan() {
line := scanner.Text()
if line == "" {
continue
}
output.WriteString(line)
output.WriteString("\n")
handler(StreamEvent{
Type: "output",
Content: line,
Stream: stream,
Timestamp: time.Now().UTC().Format(time.RFC3339),
})
}
}
// streamStderr reads from stderr and sends error events.
func (e *Executor) streamStderr(r io.Reader, handler StreamEventHandler) {
scanner := bufio.NewScanner(r)
buf := make([]byte, 0, 64*1024)
scanner.Buffer(buf, 1024*1024)
for scanner.Scan() {
line := scanner.Text()
if line == "" {
continue
}
handler(StreamEvent{
Type: "error",
Content: line,
Stream: "stderr",
Timestamp: time.Now().UTC().Format(time.RFC3339),
})
}
}
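A note on the scanner loops in this file: `bufio.Scanner` stops at the first line that exceeds its maximum token size, and `Scan` returns false for both EOF and errors. The loops above never inspect `scanner.Err()`, so an over-long line would end reading silently. The sketch below shows one way to surface that condition. `readLines` and `collectLines` are hypothetical helper names used only for illustration; they are not part of this commit.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// readLines drains r line by line, much like the executor's reader goroutines,
// but also returns the scanner error (for example bufio.ErrTooLong).
// Hypothetical helper: maxLine caps the accepted line length in bytes.
func readLines(r io.Reader, maxLine int) (string, error) {
	var out strings.Builder
	scanner := bufio.NewScanner(r)

	// Start with a modest buffer and let the scanner grow it up to maxLine.
	initial := 64 * 1024
	if maxLine < initial {
		initial = maxLine
	}
	scanner.Buffer(make([]byte, 0, initial), maxLine)

	for scanner.Scan() {
		out.WriteString(scanner.Text())
		out.WriteString("\n")
	}
	// Scan returns false on EOF and on error alike; Err tells them apart.
	return out.String(), scanner.Err()
}

// collectLines is a convenience wrapper for string input.
func collectLines(s string, maxLine int) (string, error) {
	return readLines(strings.NewReader(s), maxLine)
}

func main() {
	out, err := collectLines("first\nsecond\n", 1024)
	fmt.Printf("%q %v\n", out, err)

	_, err = collectLines("this line is longer than eight bytes\n", 8)
	fmt.Println("over-long line reported:", err != nil)
}
```

Whether a truncated read should fail the task or only be logged is a design decision for the executor; the sketch only makes the condition visible.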

internal/claudebox/git.go Normal file

@ -0,0 +1,287 @@
package claudebox
import (
"bytes"
"context"
"fmt"
"log/slog"
"os/exec"
"strings"
)
// GitOperations provides local git operations in the container.
type GitOperations struct {
workDir string
giteaToken string
gitUser string
gitEmail string
logger *slog.Logger
}
// GitOperationsConfig holds configuration for git operations.
type GitOperationsConfig struct {
// WorkDir is the default working directory.
WorkDir string
// GiteaToken is the token for HTTPS push authentication.
GiteaToken string
// GitUser is the git commit author name.
GitUser string
// GitEmail is the git commit author email.
GitEmail string
// Logger is an optional logger for debug output.
Logger *slog.Logger
}
// NewGitOperations creates a new git operations helper.
func NewGitOperations(cfg GitOperationsConfig) *GitOperations {
if cfg.GitUser == "" {
cfg.GitUser = "rdev-worker"
}
if cfg.GitEmail == "" {
cfg.GitEmail = "worker@threesix.ai"
}
logger := cfg.Logger
if logger == nil {
logger = slog.Default()
}
return &GitOperations{
workDir: cfg.WorkDir,
giteaToken: cfg.GiteaToken,
gitUser: cfg.GitUser,
gitEmail: cfg.GitEmail,
logger: logger,
}
}
// CloneResult contains the result of a git clone operation.
type CloneResult struct {
Cloned bool // True if repo was cloned, false if already existed
Error error
}
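// CloneRepo clones a git repository into the workspace if it doesn't exist.
// If the workspace already holds a repo whose origin matches cloneURL, it pulls
// the latest changes (fast-forward only). If the origin differs, the workspace
// is cleared and the repository is cloned again.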
func (g *GitOperations) CloneRepo(ctx context.Context, workDir, cloneURL string) *CloneResult {
result := &CloneResult{}
if cloneURL == "" {
result.Error = fmt.Errorf("git clone URL is required")
return result
}
// Check if already a git repo with the correct remote
if g.isGitRepo(ctx, workDir) {
currentRemote, err := g.runGitOutput(ctx, workDir, "config", "--get", "remote.origin.url")
currentRemote = strings.TrimSpace(currentRemote)
if err == nil && currentRemote == cloneURL {
// Pull latest changes
if err := g.runGit(ctx, workDir, "pull", "--ff-only"); err != nil {
// Pull failed but repo exists - continue with existing state
g.logger.Debug("git pull failed, continuing with existing state", "error", err, "work_dir", workDir)
}
return result
}
// Different remote - clear and re-clone
if err := g.clearDir(ctx, workDir); err != nil {
result.Error = fmt.Errorf("clear workspace: %w", err)
return result
}
}
// Inject token for authentication
authCloneURL := cloneURL
if g.giteaToken != "" {
authCloneURL = strings.Replace(cloneURL, "https://", "https://token:"+g.giteaToken+"@", 1)
}
// Clone the repository
cmd := exec.CommandContext(ctx, "git", "clone", authCloneURL, workDir)
var stderr bytes.Buffer
cmd.Stderr = &stderr
if err := cmd.Run(); err != nil {
errMsg := g.redactToken(stderr.String())
result.Error = fmt.Errorf("git clone failed: %w: %s", err, errMsg)
return result
}
result.Cloned = true
return result
}
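CloneRepo attaches the token with a string replacement on the `https://` prefix. As an alternative, `net/url` can attach the credentials and percent-encode any reserved characters in the token. This is a minimal sketch, not the commit's implementation: `withToken` is a hypothetical helper, and the user name `token` simply mirrors the value used in CloneRepo.

```go
package main

import (
	"fmt"
	"net/url"
)

// withToken returns cloneURL with basic-auth credentials attached.
// Hypothetical helper for illustration only.
func withToken(cloneURL, token string) (string, error) {
	if token == "" {
		return cloneURL, nil
	}
	u, err := url.Parse(cloneURL)
	if err != nil {
		return "", fmt.Errorf("parse clone URL: %w", err)
	}
	if u.Scheme != "https" {
		// Same effect as the string replacement: non-HTTPS URLs are left alone.
		return cloneURL, nil
	}
	// UserPassword percent-encodes reserved characters such as '/' and '@'.
	u.User = url.UserPassword("token", token)
	return u.String(), nil
}

func main() {
	authed, err := withToken("https://git.example.com/org/repo.git", "s3cret")
	fmt.Println(authed, err)
}
```

One caveat with this approach: if the token contains reserved characters, it appears in the URL in percent-encoded form, so a redaction step that searches for the raw token (as redactToken does) would also need to look for the encoded form.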
// CommitAndPushResult contains the result of commit and push operations.
type CommitAndPushResult struct {
HasChanges bool
CommitSHA string
FilesChanged []string
Pushed bool
Error error
}
// CommitAndPush commits and optionally pushes changes.
func (g *GitOperations) CommitAndPush(ctx context.Context, workDir, message string, push bool) *CommitAndPushResult {
result := &CommitAndPushResult{}
// Configure git user
if err := g.runGit(ctx, workDir, "config", "user.name", g.gitUser); err != nil {
result.Error = fmt.Errorf("git config user.name: %w", err)
return result
}
if err := g.runGit(ctx, workDir, "config", "user.email", g.gitEmail); err != nil {
result.Error = fmt.Errorf("git config user.email: %w", err)
return result
}
// Check for changes
status, err := g.runGitOutput(ctx, workDir, "status", "--porcelain")
if err != nil {
result.Error = fmt.Errorf("git status: %w", err)
return result
}
if strings.TrimSpace(status) == "" {
return result // No changes
}
result.HasChanges = true
// Stage all changes
if err := g.runGit(ctx, workDir, "add", "-A"); err != nil {
result.Error = fmt.Errorf("git add: %w", err)
return result
}
// Get list of staged files
diffOutput, err := g.runGitOutput(ctx, workDir, "diff", "--cached", "--name-only")
if err != nil {
result.Error = fmt.Errorf("git diff: %w", err)
return result
}
for _, f := range strings.Split(strings.TrimSpace(diffOutput), "\n") {
if f != "" {
result.FilesChanged = append(result.FilesChanged, f)
}
}
// Commit
if err := g.runGit(ctx, workDir, "commit", "-m", message); err != nil {
result.Error = fmt.Errorf("git commit: %w", err)
return result
}
// Get commit SHA
sha, err := g.runGitOutput(ctx, workDir, "rev-parse", "HEAD")
if err != nil {
result.Error = fmt.Errorf("git rev-parse: %w", err)
return result
}
result.CommitSHA = strings.TrimSpace(sha)
// Push if requested
if push {
// Configure credential helper
if g.giteaToken != "" {
credHelper := fmt.Sprintf("!f() { echo username=token; echo password=%s; }; f", g.giteaToken)
if err := g.runGit(ctx, workDir, "config", "credential.helper", credHelper); err != nil {
g.logger.Debug("credential helper config failed, continuing with push", "error", err)
}
}
if err := g.runGit(ctx, workDir, "push", "origin", "HEAD"); err != nil {
result.Error = fmt.Errorf("git push: %w", err)
return result
}
result.Pushed = true
}
return result
}
// GitStatusResult contains git status information.
type GitStatusResult struct {
IsRepo bool `json:"is_repo"`
HasChanges bool `json:"has_changes"`
ChangedFiles []string `json:"changed_files,omitempty"`
Branch string `json:"branch,omitempty"`
}
// Status returns the git status of the workspace.
func (g *GitOperations) Status(ctx context.Context, workDir string) (*GitStatusResult, error) {
result := &GitStatusResult{}
if !g.isGitRepo(ctx, workDir) {
return result, nil
}
result.IsRepo = true
// Get current branch
branch, err := g.runGitOutput(ctx, workDir, "rev-parse", "--abbrev-ref", "HEAD")
if err == nil {
result.Branch = strings.TrimSpace(branch)
}
// Get status
status, err := g.runGitOutput(ctx, workDir, "status", "--porcelain")
if err != nil {
return result, fmt.Errorf("git status: %w", err)
}
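// Trim only trailing newlines: porcelain lines start with a two-character
// status code whose first character may be a space (e.g. " M file"), so
// trimming leading whitespace would shift the first entry by one column.
lines := strings.Split(strings.TrimRight(status, "\n"), "\n")
for _, line := range lines {
if len(line) > 3 {
result.ChangedFiles = append(result.ChangedFiles, strings.TrimSpace(line[3:]))
}
}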
result.HasChanges = len(result.ChangedFiles) > 0
return result, nil
}
// isGitRepo checks if the directory is a git repository.
func (g *GitOperations) isGitRepo(ctx context.Context, workDir string) bool {
cmd := exec.CommandContext(ctx, "test", "-d", workDir+"/.git")
return cmd.Run() == nil
}
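// clearDir clears the contents of a directory, including dotfiles.
// The path is passed to the shell as a positional parameter instead of being
// interpolated into the script, so spaces or shell metacharacters in dir
// cannot change the command that runs.
func (g *GitOperations) clearDir(ctx context.Context, dir string) error {
cmd := exec.CommandContext(ctx, "sh", "-c", `rm -rf -- "$1"/* "$1"/.[!.]*`, "sh", dir)
return cmd.Run()
}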
// runGit executes a git command.
func (g *GitOperations) runGit(ctx context.Context, workDir string, args ...string) error {
cmd := exec.CommandContext(ctx, "git", append([]string{"-C", workDir}, args...)...)
var stderr bytes.Buffer
cmd.Stderr = &stderr
if err := cmd.Run(); err != nil {
errMsg := g.redactToken(stderr.String())
return fmt.Errorf("%w: %s", err, errMsg)
}
return nil
}
// runGitOutput executes a git command and returns stdout.
func (g *GitOperations) runGitOutput(ctx context.Context, workDir string, args ...string) (string, error) {
cmd := exec.CommandContext(ctx, "git", append([]string{"-C", workDir}, args...)...)
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr
if err := cmd.Run(); err != nil {
errMsg := g.redactToken(stderr.String())
return "", fmt.Errorf("%w: %s", err, errMsg)
}
return stdout.String(), nil
}
// redactToken removes the Gitea token from output.
func (g *GitOperations) redactToken(s string) string {
if g.giteaToken == "" {
return s
}
return strings.ReplaceAll(s, g.giteaToken, "[REDACTED]")
}
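For reference, the short format printed by `git status --porcelain` puts a two-character status code in columns one and two, followed by a space and the path. Either status character can itself be a space, which is why leading whitespace matters when slicing off the path. The sketch below parses that layout; `parsePorcelain` is a hypothetical helper name, and rename entries (`old -> new`) are returned unsplit.

```go
package main

import (
	"fmt"
	"strings"
)

// parsePorcelain extracts paths from `git status --porcelain` (v1) output.
// Hypothetical helper for illustration; it does not split rename entries.
func parsePorcelain(status string) []string {
	var files []string
	// Only trailing newlines are trimmed: a leading space is part of the
	// status code of the first entry and must be preserved.
	for _, line := range strings.Split(strings.TrimRight(status, "\n"), "\n") {
		if len(line) > 3 {
			files = append(files, line[3:])
		}
	}
	return files
}

func main() {
	status := " M internal/claudebox/git.go\n?? notes.txt\n"
	for _, f := range parsePorcelain(status) {
		fmt.Println(f)
	}
}
```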

internal/claudebox/sdlc.go Normal file

@ -0,0 +1,100 @@
package claudebox
import (
"bytes"
"context"
"encoding/json"
"fmt"
"log/slog"
"os/exec"
"strings"
)
// SDLCRunner executes SDLC CLI commands locally.
type SDLCRunner struct {
workDir string
logger *slog.Logger
}
// SDLCRunnerConfig holds configuration for the SDLC runner.
type SDLCRunnerConfig struct {
// WorkDir is the default working directory.
WorkDir string
// Logger is an optional logger for debug output.
Logger *slog.Logger
}
// NewSDLCRunner creates a new SDLC runner.
func NewSDLCRunner(cfg SDLCRunnerConfig) *SDLCRunner {
logger := cfg.Logger
if logger == nil {
logger = slog.Default()
}
return &SDLCRunner{
workDir: cfg.WorkDir,
logger: logger,
}
}
// SDLCResult contains the result of an SDLC command.
type SDLCResult struct {
Success bool
Output string
Data json.RawMessage // Parsed JSON from sdlc --json output
Error error
}
// Run executes an SDLC CLI command.
func (s *SDLCRunner) Run(ctx context.Context, workDir, command string, args []string) *SDLCResult {
result := &SDLCResult{}
// Ensure .sdlc/ is initialized
if err := s.ensureInit(ctx, workDir); err != nil {
// Log but continue - command might still work
s.logger.Debug("sdlc init failed, continuing with command", "error", err, "work_dir", workDir)
}
// Build the command
sdlcArgs := []string{command}
sdlcArgs = append(sdlcArgs, args...)
sdlcArgs = append(sdlcArgs, "--json")
cmd := exec.CommandContext(ctx, "sdlc", sdlcArgs...)
cmd.Dir = workDir
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr
if err := cmd.Run(); err != nil {
result.Error = fmt.Errorf("%w: %s", err, stderr.String())
result.Output = stdout.String()
return result
}
result.Success = true
result.Output = stdout.String()
// Try to parse JSON output
output := strings.TrimSpace(stdout.String())
if output != "" && (output[0] == '{' || output[0] == '[') {
result.Data = json.RawMessage(output)
}
return result
}
// ensureInit checks if .sdlc/ exists and runs `sdlc init` if it doesn't.
func (s *SDLCRunner) ensureInit(ctx context.Context, workDir string) error {
// Check if .sdlc/ directory exists
cmd := exec.CommandContext(ctx, "test", "-d", workDir+"/.sdlc")
if cmd.Run() == nil {
return nil // Already initialized
}
// Run sdlc init
initCmd := exec.CommandContext(ctx, "sdlc", "init", "--json")
initCmd.Dir = workDir
return initCmd.Run()
}
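Run treats any output that starts with `{` or `[` as JSON. `json.RawMessage` is validated when it is marshalled, so output that merely looks like JSON could make encoding the response fail later. A stricter check with `json.Valid` might look like the sketch below; `rawJSONOrNil` is a hypothetical helper name, not something this commit defines.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// rawJSONOrNil returns the trimmed output as json.RawMessage only when it is
// a syntactically valid JSON object or array; otherwise it returns nil.
// Hypothetical helper for illustration.
func rawJSONOrNil(output string) json.RawMessage {
	trimmed := strings.TrimSpace(output)
	if trimmed == "" || (trimmed[0] != '{' && trimmed[0] != '[') {
		return nil
	}
	if !json.Valid([]byte(trimmed)) {
		return nil
	}
	return json.RawMessage(trimmed)
}

func main() {
	fmt.Printf("%s\n", rawJSONOrNil(" {\"ok\":true}\n"))
	fmt.Println(rawJSONOrNil("{truncated") == nil)
}
```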


@ -0,0 +1,368 @@
// Package claudebox provides HTTP server and handlers for the claudebox sidecar.
// This package enables HTTP-based execution of Claude Code, git, and SDLC operations
// instead of kubectl exec.
package claudebox
import (
"encoding/json"
"log/slog"
"net/http"
"time"
"github.com/go-chi/chi/v5"
"github.com/orchard9/rdev/internal/logging"
"github.com/orchard9/rdev/pkg/api"
)
// Server handles HTTP requests for claudebox operations.
type Server struct {
executor *Executor
gitOps *GitOperations
sdlcRunner *SDLCRunner
logger *slog.Logger
}
// ServerConfig holds configuration for the claudebox server.
type ServerConfig struct {
Executor *Executor
GitOps *GitOperations
SDLCRunner *SDLCRunner
Logger *slog.Logger
}
// NewServer creates a new claudebox HTTP server.
func NewServer(cfg ServerConfig) *Server {
return &Server{
executor: cfg.Executor,
gitOps: cfg.GitOps,
sdlcRunner: cfg.SDLCRunner,
logger: cfg.Logger,
}
}
// Mount registers server routes on the router.
func (s *Server) Mount(r chi.Router) {
r.Get("/health", s.handleHealth)
r.Post("/execute", s.handleExecute)
r.Post("/execute/stream", s.handleExecuteStream)
r.Post("/git/clone", s.handleGitClone)
r.Post("/git/commit-and-push", s.handleGitCommitAndPush)
r.Get("/git/status", s.handleGitStatus)
r.Post("/sdlc", s.handleSDLC)
}
// HealthResponse is the health check response.
type HealthResponse struct {
Status string `json:"status"`
Timestamp string `json:"timestamp"`
WorkDir string `json:"work_dir"`
}
// handleHealth returns server health status.
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
resp := HealthResponse{
Status: "healthy",
Timestamp: time.Now().UTC().Format(time.RFC3339),
WorkDir: s.executor.workDir,
}
api.WriteJSON(w, r, http.StatusOK, resp)
}
// ExecuteRequest is the request to execute Claude Code.
type ExecuteRequest struct {
Prompt string `json:"prompt"`
AllowedTools []string `json:"allowed_tools,omitempty"`
WorkingDir string `json:"working_dir,omitempty"`
Timeout int `json:"timeout_seconds,omitempty"` // seconds
Metadata map[string]string `json:"metadata,omitempty"`
}
// ExecuteResponse is the response from executing Claude Code.
type ExecuteResponse struct {
Success bool `json:"success"`
Output string `json:"output"`
ExitCode int `json:"exit_code"`
DurationMs int64 `json:"duration_ms"`
Error string `json:"error,omitempty"`
SessionID string `json:"session_id,omitempty"`
FinalOutput string `json:"final_output,omitempty"`
Artifacts map[string]string `json:"artifacts,omitempty"`
}
// handleExecute runs Claude Code and returns the complete result.
func (s *Server) handleExecute(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
log := logging.FromContext(ctx)
var req ExecuteRequest
if err := api.DecodeJSON(r, &req); err != nil {
api.WriteBadRequest(w, r, "invalid request body")
return
}
if req.Prompt == "" {
api.WriteBadRequest(w, r, "prompt is required")
return
}
log.Info("executing Claude Code", "prompt_len", len(req.Prompt))
result := s.executor.Execute(ctx, &req)
resp := ExecuteResponse{
Success: result.Success,
Output: result.Output,
ExitCode: result.ExitCode,
DurationMs: result.DurationMs,
SessionID: result.SessionID,
FinalOutput: result.FinalOutput,
}
if result.Error != nil {
resp.Error = result.Error.Error()
}
api.WriteJSON(w, r, http.StatusOK, resp)
}
// StreamEvent is an SSE event for streaming execution.
type StreamEvent struct {
Type string `json:"type"`
Content string `json:"content,omitempty"`
Stream string `json:"stream,omitempty"`
ToolName string `json:"tool_name,omitempty"`
Data map[string]any `json:"data,omitempty"`
Timestamp string `json:"timestamp"`
}
// handleExecuteStream runs Claude Code and streams events via SSE.
func (s *Server) handleExecuteStream(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
log := logging.FromContext(ctx)
var req ExecuteRequest
if err := api.DecodeJSON(r, &req); err != nil {
api.WriteBadRequest(w, r, "invalid request body")
return
}
if req.Prompt == "" {
api.WriteBadRequest(w, r, "prompt is required")
return
}
// Verify streaming support before committing to SSE headers, so the error
// response is not sent with an event-stream content type.
flusher, ok := w.(http.Flusher)
if !ok {
api.WriteInternalError(w, r, "streaming not supported")
return
}
// Set up SSE headers
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no")
log.Info("starting streaming execution", "prompt_len", len(req.Prompt))
// Stream events via callback
eventCh := make(chan StreamEvent, 100)
go func() {
defer close(eventCh)
s.executor.ExecuteStream(ctx, &req, func(evt StreamEvent) {
select {
case eventCh <- evt:
case <-ctx.Done():
}
})
}()
// Write events to client
for evt := range eventCh {
data, err := json.Marshal(evt)
if err != nil {
log.Warn("failed to marshal event", logging.FieldError, err)
continue
}
_, writeErr := w.Write([]byte("data: " + string(data) + "\n\n"))
if writeErr != nil {
log.Debug("client disconnected during stream")
return
}
flusher.Flush()
}
}
// GitCloneRequest is the request to clone a repository.
type GitCloneRequest struct {
CloneURL string `json:"clone_url"`
WorkDir string `json:"work_dir,omitempty"` // defaults to /workspace
}
// GitCloneResponse is the response from cloning.
type GitCloneResponse struct {
Success bool `json:"success"`
Cloned bool `json:"cloned"` // true if cloned, false if already existed
Error string `json:"error,omitempty"`
}
// handleGitClone clones or updates a git repository.
func (s *Server) handleGitClone(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
var req GitCloneRequest
if err := api.DecodeJSON(r, &req); err != nil {
api.WriteBadRequest(w, r, "invalid request body")
return
}
if req.CloneURL == "" {
api.WriteBadRequest(w, r, "clone_url is required")
return
}
workDir := req.WorkDir
if workDir == "" {
workDir = s.gitOps.workDir
}
result := s.gitOps.CloneRepo(ctx, workDir, req.CloneURL)
resp := GitCloneResponse{
Success: result.Error == nil,
Cloned: result.Cloned,
}
if result.Error != nil {
resp.Error = result.Error.Error()
}
api.WriteJSON(w, r, http.StatusOK, resp)
}
// GitCommitAndPushRequest is the request to commit and push changes.
type GitCommitAndPushRequest struct {
Message string `json:"message"`
Push bool `json:"push"`
WorkDir string `json:"work_dir,omitempty"` // defaults to /workspace
}
// GitCommitAndPushResponse is the response from commit and push.
type GitCommitAndPushResponse struct {
Success bool `json:"success"`
HasChanges bool `json:"has_changes"`
CommitSHA string `json:"commit_sha,omitempty"`
FilesChanged []string `json:"files_changed,omitempty"`
Pushed bool `json:"pushed"`
Error string `json:"error,omitempty"`
}
// handleGitCommitAndPush commits and optionally pushes changes.
func (s *Server) handleGitCommitAndPush(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
var req GitCommitAndPushRequest
if err := api.DecodeJSON(r, &req); err != nil {
api.WriteBadRequest(w, r, "invalid request body")
return
}
if req.Message == "" {
api.WriteBadRequest(w, r, "message is required")
return
}
workDir := req.WorkDir
if workDir == "" {
workDir = s.gitOps.workDir
}
result := s.gitOps.CommitAndPush(ctx, workDir, req.Message, req.Push)
resp := GitCommitAndPushResponse{
Success: result.Error == nil,
HasChanges: result.HasChanges,
CommitSHA: result.CommitSHA,
FilesChanged: result.FilesChanged,
Pushed: result.Pushed,
}
if result.Error != nil {
resp.Error = result.Error.Error()
}
api.WriteJSON(w, r, http.StatusOK, resp)
}
// GitStatusResponse is the response from git status.
type GitStatusResponse struct {
IsRepo bool `json:"is_repo"`
HasChanges bool `json:"has_changes"`
ChangedFiles []string `json:"changed_files,omitempty"`
Branch string `json:"branch,omitempty"`
Error string `json:"error,omitempty"`
}
// handleGitStatus returns the git status of the workspace.
func (s *Server) handleGitStatus(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
workDir := r.URL.Query().Get("work_dir")
if workDir == "" {
workDir = s.gitOps.workDir
}
status, err := s.gitOps.Status(ctx, workDir)
if err != nil {
api.WriteJSON(w, r, http.StatusOK, GitStatusResponse{
IsRepo: false,
Error: err.Error(),
})
return
}
api.WriteJSON(w, r, http.StatusOK, status)
}
// SDLCRequest is the request to run an SDLC command.
type SDLCRequest struct {
Command string `json:"command"`
Args []string `json:"args,omitempty"`
WorkDir string `json:"work_dir,omitempty"` // defaults to /workspace
}
// SDLCResponse is the response from running an SDLC command.
type SDLCResponse struct {
Success bool `json:"success"`
Output string `json:"output"`
Data json.RawMessage `json:"data,omitempty"` // Parsed JSON from sdlc --json output
Error string `json:"error,omitempty"`
}
// handleSDLC runs an SDLC CLI command.
func (s *Server) handleSDLC(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
var req SDLCRequest
if err := api.DecodeJSON(r, &req); err != nil {
api.WriteBadRequest(w, r, "invalid request body")
return
}
if req.Command == "" {
api.WriteBadRequest(w, r, "command is required")
return
}
workDir := req.WorkDir
if workDir == "" {
workDir = s.sdlcRunner.workDir
}
result := s.sdlcRunner.Run(ctx, workDir, req.Command, req.Args)
resp := SDLCResponse{
Success: result.Success,
Output: result.Output,
Data: result.Data,
}
if result.Error != nil {
resp.Error = result.Error.Error()
}
api.WriteJSON(w, r, http.StatusOK, resp)
}
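The streaming handler in this file writes each event as one Server-Sent Events frame: a `data:` line carrying the JSON payload, terminated by a blank line. Because `json.Marshal` escapes newlines inside strings, the payload always fits on a single `data:` line. The sketch below isolates that framing; `sseFrame` and the reduced `event` type are hypothetical stand-ins for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// event is a reduced stand-in for StreamEvent, used only in this sketch.
type event struct {
	Type    string `json:"type"`
	Content string `json:"content,omitempty"`
}

// sseFrame encodes v as a single SSE frame: "data: <json>\n\n".
// Hypothetical helper for illustration.
func sseFrame(v any) ([]byte, error) {
	data, err := json.Marshal(v)
	if err != nil {
		return nil, fmt.Errorf("marshal event: %w", err)
	}
	frame := make([]byte, 0, len(data)+8)
	frame = append(frame, "data: "...)
	frame = append(frame, data...)
	frame = append(frame, "\n\n"...)
	return frame, nil
}

func main() {
	frame, err := sseFrame(event{Type: "output", Content: "line one\nline two"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%q\n", frame)
}
```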


@ -0,0 +1,36 @@
-- Migration 017: Add capability-based task routing.
-- Workers can declare capabilities and tags, and tasks can require specific
-- capabilities/tags for routing to appropriate workers.
-- Add tags column to workers table for arbitrary metadata/labels.
-- Tags are key-value pairs that can be used for worker selection.
-- Example: {"gpu": "true", "region": "us-west"}
ALTER TABLE workers ADD COLUMN IF NOT EXISTS tags JSONB DEFAULT '{}';
COMMENT ON COLUMN workers.tags IS 'Key-value tags for worker selection (e.g., {"gpu": "true"})';
-- Add required_capabilities to work_queue for capability-based routing.
-- Tasks can require specific capabilities (from workers.capabilities array).
-- Example: ["gpu", "high-memory"]
ALTER TABLE work_queue ADD COLUMN IF NOT EXISTS required_capabilities TEXT[] DEFAULT '{}';
COMMENT ON COLUMN work_queue.required_capabilities IS 'Array of required worker capabilities for task routing';
-- Add required_tags to work_queue for tag-based routing.
-- Tasks can require workers to have specific tags.
-- Example: {"region": "us-west"}
ALTER TABLE work_queue ADD COLUMN IF NOT EXISTS required_tags JSONB DEFAULT '{}';
COMMENT ON COLUMN work_queue.required_tags IS 'Required worker tags for task routing (JSON key-value pairs)';
-- Create GIN index for capability-based routing queries.
-- Enables efficient queries like: WHERE required_capabilities <@ worker_capabilities
CREATE INDEX IF NOT EXISTS idx_work_queue_capabilities
ON work_queue USING GIN(required_capabilities)
WHERE required_capabilities != '{}';
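-- Create GIN index for tag-based routing queries.
-- Note: the default jsonb GIN operator class (jsonb_ops) accelerates @>, ?, ?| and ?&
-- but not <@, so a predicate written as required_tags <@ worker_tags would not
-- use this index. Check the routing query's operator against the index.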
CREATE INDEX IF NOT EXISTS idx_work_queue_tags
ON work_queue USING GIN(required_tags)
WHERE required_tags != '{}';
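The routing rule described in the migration comments is a containment check: a worker qualifies when every required capability and every required tag is present on the worker. The Go sketch below restates that rule in application code for clarity. It is an illustration only: the commit presumably performs the match in SQL, `hasCapabilities` and `hasTags` are hypothetical names, and tags are assumed to be flat string-to-string pairs as in the migration's examples.

```go
package main

import "fmt"

// hasCapabilities reports whether every required capability is offered by
// the worker (the array form of "required <@ offered").
func hasCapabilities(required, offered []string) bool {
	set := make(map[string]struct{}, len(offered))
	for _, c := range offered {
		set[c] = struct{}{}
	}
	for _, c := range required {
		if _, ok := set[c]; !ok {
			return false
		}
	}
	return true
}

// hasTags reports whether each required key/value pair is present in the
// worker's tags. Assumes flat string values.
func hasTags(required, tags map[string]string) bool {
	for k, v := range required {
		if got, ok := tags[k]; !ok || got != v {
			return false
		}
	}
	return true
}

func main() {
	offered := []string{"gpu", "high-memory"}
	tags := map[string]string{"gpu": "true", "region": "us-west"}

	fmt.Println(hasCapabilities([]string{"gpu"}, offered))
	fmt.Println(hasTags(map[string]string{"region": "eu-central"}, tags))
}
```

Under this rule an empty requirement set matches every worker, which is consistent with the `'{}'` column defaults in the migration.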


@ -16,6 +16,7 @@ import (
// WorkersHandler handles worker pool management endpoints.
type WorkersHandler struct {
workerService *service.WorkerService
workService service.WorkServiceFailer
}
// NewWorkersHandler creates a new workers handler.
@ -25,6 +26,13 @@ func NewWorkersHandler(workerService *service.WorkerService) *WorkersHandler {
}
}
// WithWorkService adds a work service for task failure handling.
// This is required for standalone worker endpoints.
func (h *WorkersHandler) WithWorkService(ws service.WorkServiceFailer) *WorkersHandler {
h.workService = ws
return h
}
// Mount registers the worker pool routes.
func (h *WorkersHandler) Mount(r api.Router) {
r.Route("/workers", func(r chi.Router) {
@ -36,6 +44,11 @@ func (h *WorkersHandler) Mount(r api.Router) {
r.With(auth.RequireScope(auth.ScopeWorkersWrite, auth.ScopeAdmin)).Post("/register", h.Register)
r.With(auth.RequireScope(auth.ScopeWorkersWrite, auth.ScopeAdmin)).Post("/{workerId}/heartbeat", h.Heartbeat)
r.With(auth.RequireScope(auth.ScopeWorkersWrite, auth.ScopeAdmin)).Post("/{workerId}/drain", h.Drain)
// Standalone worker task operations
r.With(auth.RequireScope(auth.ScopeWorkersWrite, auth.ScopeAdmin)).Post("/{workerId}/claim", h.ClaimTask)
r.With(auth.RequireScope(auth.ScopeWorkersWrite, auth.ScopeAdmin)).Post("/{workerId}/complete/{taskId}", h.CompleteTask)
r.With(auth.RequireScope(auth.ScopeWorkersWrite, auth.ScopeAdmin)).Post("/{workerId}/fail/{taskId}", h.FailTask)
})
}
@ -230,3 +243,146 @@ func (h *WorkersHandler) Drain(w http.ResponseWriter, r *http.Request) {
"message": "worker will finish current task then stop accepting new work",
})
}
// ClaimTask claims the next available task for a worker.
// POST /workers/{workerId}/claim
func (h *WorkersHandler) ClaimTask(w http.ResponseWriter, r *http.Request) {
workerID := chi.URLParam(r, "workerId")
if workerID == "" {
api.WriteBadRequest(w, r, "worker ID is required")
return
}
task, err := h.workerService.ClaimTask(r.Context(), workerID)
if err != nil {
if errors.Is(err, domain.ErrWorkerNotFound) {
api.WriteNotFound(w, r, "worker not found: "+workerID)
return
}
api.WriteInternalError(w, r, "failed to claim task")
return
}
if task == nil {
// No tasks available - return 204 No Content
w.WriteHeader(http.StatusNoContent)
return
}
api.WriteSuccess(w, r, map[string]any{
"task": toWorkTaskDTO(task),
"worker_id": workerID,
})
}
// CompleteTaskRequest is the request body for POST /workers/{workerId}/complete/{taskId}.
type CompleteTaskRequest struct {
Success bool `json:"success"`
Output string `json:"output,omitempty"`
Error string `json:"error,omitempty"`
CommitSHA string `json:"commit_sha,omitempty"`
FilesChanged []string `json:"files_changed,omitempty"`
Artifacts map[string]string `json:"artifacts,omitempty"`
DurationMs int64 `json:"duration_ms,omitempty"`
}
// CompleteTask marks a task as complete.
// POST /workers/{workerId}/complete/{taskId}
func (h *WorkersHandler) CompleteTask(w http.ResponseWriter, r *http.Request) {
workerID := chi.URLParam(r, "workerId")
taskID := chi.URLParam(r, "taskId")
if workerID == "" {
api.WriteBadRequest(w, r, "worker ID is required")
return
}
if taskID == "" {
api.WriteBadRequest(w, r, "task ID is required")
return
}
var req CompleteTaskRequest
if err := api.DecodeJSON(r, &req); err != nil {
api.WriteBadRequest(w, r, "invalid request body")
return
}
result := &domain.BuildResult{
Success: req.Success,
Output: req.Output,
Error: req.Error,
CommitSHA: req.CommitSHA,
FilesChanged: req.FilesChanged,
DurationMs: req.DurationMs,
Artifacts: req.Artifacts,
}
if err := h.workerService.CompleteTask(r.Context(), workerID, taskID, result); err != nil {
if errors.Is(err, domain.ErrWorkerNotFound) {
api.WriteNotFound(w, r, "worker not found: "+workerID)
return
}
api.WriteInternalError(w, r, "failed to complete task")
return
}
api.WriteSuccess(w, r, map[string]any{
"task_id": taskID,
"worker_id": workerID,
"status": "completed",
})
}
// FailTaskRequest is the request body for POST /workers/{workerId}/fail/{taskId}.
type FailTaskRequest struct {
Error string `json:"error"`
Output string `json:"output,omitempty"`
DurationMs int64 `json:"duration_ms,omitempty"`
}
// FailTask marks a task as failed.
// POST /workers/{workerId}/fail/{taskId}
func (h *WorkersHandler) FailTask(w http.ResponseWriter, r *http.Request) {
workerID := chi.URLParam(r, "workerId")
taskID := chi.URLParam(r, "taskId")
if workerID == "" {
api.WriteBadRequest(w, r, "worker ID is required")
return
}
if taskID == "" {
api.WriteBadRequest(w, r, "task ID is required")
return
}
var req FailTaskRequest
if err := api.DecodeJSON(r, &req); err != nil {
api.WriteBadRequest(w, r, "invalid request body")
return
}
if h.workService == nil {
api.WriteInternalError(w, r, "work service not configured")
return
}
result := &domain.BuildResult{
Success: false,
Output: req.Output,
Error: req.Error,
DurationMs: req.DurationMs,
}
if err := h.workerService.FailTask(r.Context(), workerID, taskID, result, h.workService); err != nil {
if errors.Is(err, domain.ErrWorkerNotFound) {
api.WriteNotFound(w, r, "worker not found: "+workerID)
return
}
api.WriteInternalError(w, r, "failed to mark task as failed")
return
}
api.WriteSuccess(w, r, map[string]any{
"task_id": taskID,
"worker_id": workerID,
"status": "failed",
})
}
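From a worker's point of view, the claim endpoint has three outcomes: a task was assigned, the queue was empty (204 No Content), or the call failed. The sketch below shows how a polling client might tell them apart by status code. It assumes that `api.WriteSuccess` responds with 200 OK, which this diff does not show, and `classifyClaim` is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"net/http"
)

// classifyClaim interprets the status code of POST /workers/{workerId}/claim.
// Hypothetical helper; assumes a successful claim is reported as 200 OK.
func classifyClaim(status int) (hasTask bool, err error) {
	switch status {
	case http.StatusOK:
		return true, nil
	case http.StatusNoContent:
		// Queue is empty: not an error, the worker should poll again later.
		return false, nil
	case http.StatusNotFound:
		return false, fmt.Errorf("worker not registered (status %d)", status)
	default:
		return false, fmt.Errorf("claim failed with status %d", status)
	}
}

func main() {
	for _, code := range []int{http.StatusOK, http.StatusNoContent, http.StatusNotFound} {
		hasTask, err := classifyClaim(code)
		fmt.Println(code, hasTask, err)
	}
}
```

Treating 204 as a normal result keeps an idle worker's poll loop free of error handling for the common empty-queue case.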


@ -0,0 +1,308 @@
package worker
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
"github.com/orchard9/rdev/internal/domain"
)
// APIClient is an HTTP client for standalone workers to communicate with rdev-api.
type APIClient struct {
baseURL string
apiKey string
httpClient *http.Client
}
// APIClientConfig holds configuration for the API client.
type APIClientConfig struct {
// BaseURL is the base URL of the rdev-api server.
BaseURL string
// APIKey is the API key for authentication.
APIKey string
// Timeout is the default request timeout.
Timeout time.Duration
}
// NewAPIClient creates a new API client for standalone workers.
func NewAPIClient(cfg APIClientConfig) *APIClient {
if cfg.Timeout == 0 {
cfg.Timeout = 30 * time.Second
}
return &APIClient{
baseURL: cfg.BaseURL,
apiKey: cfg.APIKey,
httpClient: &http.Client{
Timeout: cfg.Timeout,
},
}
}
// RegisterRequest is the request to register a worker.
type RegisterRequest struct {
ID string `json:"id"`
Hostname string `json:"hostname"`
Version string `json:"version,omitempty"`
Capabilities []string `json:"capabilities,omitempty"`
}
// RegisterResponse is the response from registering a worker.
type RegisterResponse struct {
Success bool `json:"success"`
Data struct {
ID string `json:"id"`
Hostname string `json:"hostname"`
Status string `json:"status"`
Capabilities []string `json:"capabilities,omitempty"`
RegisteredAt string `json:"registered_at"`
LastHeartbeat string `json:"last_heartbeat"`
Version string `json:"version,omitempty"`
} `json:"data"`
Error string `json:"error,omitempty"`
}
// Register registers the worker with rdev-api.
func (c *APIClient) Register(ctx context.Context, req *RegisterRequest) error {
body, err := json.Marshal(req)
if err != nil {
return fmt.Errorf("marshal request: %w", err)
}
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/workers/register", bytes.NewReader(body))
if err != nil {
return fmt.Errorf("create request: %w", err)
}
c.setHeaders(httpReq)
resp, err := c.httpClient.Do(httpReq)
if err != nil {
return fmt.Errorf("register: %w", err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
bodyBytes, _ := io.ReadAll(resp.Body)
return fmt.Errorf("register returned status %d: %s", resp.StatusCode, string(bodyBytes))
}
return nil
}
// Heartbeat sends a heartbeat to keep the worker alive.
func (c *APIClient) Heartbeat(ctx context.Context, workerID string) error {
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost,
fmt.Sprintf("%s/workers/%s/heartbeat", c.baseURL, workerID), nil)
if err != nil {
return fmt.Errorf("create request: %w", err)
}
c.setHeaders(httpReq)
resp, err := c.httpClient.Do(httpReq)
if err != nil {
return fmt.Errorf("heartbeat: %w", err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
bodyBytes, _ := io.ReadAll(resp.Body)
return fmt.Errorf("heartbeat returned status %d: %s", resp.StatusCode, string(bodyBytes))
}
return nil
}
// ClaimTaskResponse is the response from claiming a task.
type ClaimTaskResponse struct {
Success bool `json:"success"`
Data struct {
Task *WorkTaskData `json:"task"`
WorkerID string `json:"worker_id"`
} `json:"data"`
Error string `json:"error,omitempty"`
}
// WorkTaskData is the task data returned from the API.
type WorkTaskData struct {
ID string `json:"id"`
ProjectID string `json:"project_id"`
Type string `json:"type"`
Spec map[string]any `json:"spec"`
Status string `json:"status"`
Priority int `json:"priority"`
WorkerID string `json:"worker_id,omitempty"`
CallbackURL string `json:"callback_url,omitempty"`
CreatedAt string `json:"created_at"`
StartedAt string `json:"started_at,omitempty"`
RetryCount int `json:"retry_count"`
MaxRetries int `json:"max_retries"`
}
// ToWorkTask converts the API task data to a domain work task.
func (d *WorkTaskData) ToWorkTask() *domain.WorkTask {
if d == nil {
return nil
}
task := &domain.WorkTask{
ID: d.ID,
ProjectID: d.ProjectID,
Type: domain.WorkTaskType(d.Type),
Spec: d.Spec,
Status: domain.WorkTaskStatus(d.Status),
Priority: d.Priority,
WorkerID: d.WorkerID,
CallbackURL: d.CallbackURL,
RetryCount: d.RetryCount,
MaxRetries: d.MaxRetries,
}
if d.CreatedAt != "" {
task.CreatedAt, _ = time.Parse(time.RFC3339, d.CreatedAt)
}
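if d.StartedAt != "" {
// Only set StartedAt when the timestamp parses; a failed parse would
// otherwise store a pointer to the zero time.
if t, err := time.Parse(time.RFC3339, d.StartedAt); err == nil {
task.StartedAt = &t
}
}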
return task
}
// ClaimTask claims the next available task from the queue.
// It returns (nil, nil) when the API responds with 204 No Content, meaning no tasks are available.
func (c *APIClient) ClaimTask(ctx context.Context, workerID string) (*domain.WorkTask, error) {
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost,
fmt.Sprintf("%s/workers/%s/claim", c.baseURL, workerID), nil)
if err != nil {
return nil, fmt.Errorf("create request: %w", err)
}
c.setHeaders(httpReq)
resp, err := c.httpClient.Do(httpReq)
if err != nil {
return nil, fmt.Errorf("claim task: %w", err)
}
defer func() { _ = resp.Body.Close() }()
// 204 No Content = no tasks available
if resp.StatusCode == http.StatusNoContent {
return nil, nil
}
if resp.StatusCode != http.StatusOK {
bodyBytes, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("claim task returned status %d: %s", resp.StatusCode, string(bodyBytes))
}
var result ClaimTaskResponse
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("decode response: %w", err)
}
return result.Data.Task.ToWorkTask(), nil
}
// CompleteTaskRequest is the request to complete a task.
type CompleteTaskRequest struct {
Success bool `json:"success"`
Output string `json:"output,omitempty"`
Error string `json:"error,omitempty"`
CommitSHA string `json:"commit_sha,omitempty"`
FilesChanged []string `json:"files_changed,omitempty"`
Artifacts map[string]string `json:"artifacts,omitempty"`
DurationMs int64 `json:"duration_ms,omitempty"`
}
// CompleteTask marks a task as complete.
func (c *APIClient) CompleteTask(ctx context.Context, workerID, taskID string, result *domain.BuildResult) error {
// Nil slices and maps are dropped by omitempty, so no nil checks are needed.
req := &CompleteTaskRequest{
Success: result.Success,
Output: result.Output,
Error: result.Error,
CommitSHA: result.CommitSHA,
FilesChanged: result.FilesChanged,
Artifacts: result.Artifacts,
DurationMs: result.DurationMs,
}
body, err := json.Marshal(req)
if err != nil {
return fmt.Errorf("marshal request: %w", err)
}
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost,
fmt.Sprintf("%s/workers/%s/complete/%s", c.baseURL, workerID, taskID), bytes.NewReader(body))
if err != nil {
return fmt.Errorf("create request: %w", err)
}
c.setHeaders(httpReq)
resp, err := c.httpClient.Do(httpReq)
if err != nil {
return fmt.Errorf("complete task: %w", err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
bodyBytes, _ := io.ReadAll(resp.Body)
return fmt.Errorf("complete task returned status %d: %s", resp.StatusCode, string(bodyBytes))
}
return nil
}
// FailTaskRequest is the request to fail a task.
type FailTaskRequest struct {
Error string `json:"error"`
Output string `json:"output,omitempty"`
DurationMs int64 `json:"duration_ms,omitempty"`
}
// FailTask marks a task as failed.
func (c *APIClient) FailTask(ctx context.Context, workerID, taskID string, errMsg, output string, durationMs int64) error {
req := &FailTaskRequest{
Error: errMsg,
Output: output,
DurationMs: durationMs,
}
body, err := json.Marshal(req)
if err != nil {
return fmt.Errorf("marshal request: %w", err)
}
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost,
fmt.Sprintf("%s/workers/%s/fail/%s", c.baseURL, workerID, taskID), bytes.NewReader(body))
if err != nil {
return fmt.Errorf("create request: %w", err)
}
c.setHeaders(httpReq)
resp, err := c.httpClient.Do(httpReq)
if err != nil {
return fmt.Errorf("fail task: %w", err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
bodyBytes, _ := io.ReadAll(resp.Body)
return fmt.Errorf("fail task returned status %d: %s", resp.StatusCode, string(bodyBytes))
}
return nil
}
// setHeaders sets common headers on the request.
func (c *APIClient) setHeaders(req *http.Request) {
req.Header.Set("Content-Type", "application/json")
if c.apiKey != "" {
req.Header.Set("X-API-Key", c.apiKey)
}
}

@ -0,0 +1,315 @@
package worker
import (
"context"
"fmt"
"strings"
"time"
claudeboxclient "github.com/orchard9/rdev/internal/adapter/claudebox"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/logging"
"github.com/orchard9/rdev/internal/port"
)
// HTTPBuildExecutor handles WorkTaskTypeBuild tasks using HTTP calls to the
// local claudebox sidecar instead of kubectl exec.
type HTTPBuildExecutor struct {
client *claudeboxclient.Client
streams port.StreamPublisher
workDir string
}
// HTTPBuildExecutorConfig holds configuration for the HTTP build executor.
type HTTPBuildExecutorConfig struct {
// ClaudeboxClient is the HTTP client for the claudebox sidecar.
ClaudeboxClient *claudeboxclient.Client
// Streams is the SSE stream publisher for real-time events.
Streams port.StreamPublisher
// WorkDir is the default working directory in the container.
WorkDir string
}
// NewHTTPBuildExecutor creates a new HTTP-based build executor.
func NewHTTPBuildExecutor(cfg HTTPBuildExecutorConfig) *HTTPBuildExecutor {
if cfg.WorkDir == "" {
cfg.WorkDir = "/workspace"
}
return &HTTPBuildExecutor{
client: cfg.ClaudeboxClient,
streams: cfg.Streams,
workDir: cfg.WorkDir,
}
}
// Execute runs a build task using the claudebox sidecar HTTP API.
func (e *HTTPBuildExecutor) Execute(ctx context.Context, task *domain.WorkTask) *domain.BuildResult {
log := logging.FromContext(ctx).WithWorker("http-build-executor")
start := time.Now()
streamID := task.ID
// Publish started event
e.publishEvent(streamID, BuildEventStarted, map[string]any{
"task_id": task.ID,
"project_id": task.ProjectID,
"started_at": start.Format(time.RFC3339),
})
// Parse build spec
spec, err := e.parseSpec(task.Spec)
if err != nil {
e.publishEvent(streamID, BuildEventFailed, map[string]any{
"task_id": task.ID,
"error": fmt.Sprintf("invalid build spec: %v", err),
})
// Close the stream on early exit so subscribers are not left waiting.
e.closeStream(ctx, streamID)
return &domain.BuildResult{
Success: false,
Error: fmt.Sprintf("invalid build spec: %v", err),
DurationMs: time.Since(start).Milliseconds(),
}
}
// Clone or update repository if git operations are needed
if (spec.AutoCommit || spec.AutoPush) && e.client != nil {
if spec.GitCloneURL == "" {
e.publishEvent(streamID, BuildEventFailed, map[string]any{
"task_id": task.ID,
"error": "git_clone_url is required when auto_commit or auto_push is enabled",
})
// Close the stream on early exit so subscribers are not left waiting.
e.closeStream(ctx, streamID)
return &domain.BuildResult{
Success: false,
Error: "git_clone_url is required when auto_commit or auto_push is enabled",
DurationMs: time.Since(start).Milliseconds(),
}
}
log.Info("cloning repository via HTTP", "task_id", task.ID)
cloneResp, err := e.client.GitClone(ctx, spec.GitCloneURL, e.workDir)
if err != nil {
e.publishEvent(streamID, BuildEventFailed, map[string]any{
"task_id": task.ID,
"error": fmt.Sprintf("git clone failed: %v", err),
})
// Close the stream on early exit so subscribers are not left waiting.
e.closeStream(ctx, streamID)
return &domain.BuildResult{
Success: false,
Error: fmt.Sprintf("git clone failed: %v", err),
DurationMs: time.Since(start).Milliseconds(),
}
}
if !cloneResp.Success {
e.publishEvent(streamID, BuildEventFailed, map[string]any{
"task_id": task.ID,
"error": fmt.Sprintf("git clone failed: %s", cloneResp.Error),
})
// Close the stream on early exit so subscribers are not left waiting.
e.closeStream(ctx, streamID)
return &domain.BuildResult{
Success: false,
Error: fmt.Sprintf("git clone failed: %s", cloneResp.Error),
DurationMs: time.Since(start).Milliseconds(),
}
}
if cloneResp.Cloned {
e.publishEvent(streamID, BuildEventOutput, map[string]any{
"content": fmt.Sprintf("Cloned repository to %s", e.workDir),
})
}
}
// The sidecar client is required from here on; fail fast instead of
// dereferencing a nil client.
if e.client == nil {
e.publishEvent(streamID, BuildEventFailed, map[string]any{
"task_id": task.ID,
"error": "claudebox client not configured",
})
e.closeStream(ctx, streamID)
return &domain.BuildResult{
Success: false,
Error: "claudebox client not configured",
DurationMs: time.Since(start).Milliseconds(),
}
}
// Execute Claude Code via HTTP
log.Info("executing Claude Code via HTTP", "task_id", task.ID, "prompt_len", len(spec.Prompt))
var output strings.Builder
const maxOutputSize = 1 << 20 // 1MB
// Use streaming execution
execErr := e.client.ExecuteStream(ctx, &claudeboxclient.ExecuteRequest{
Prompt: spec.Prompt,
WorkingDir: e.workDir,
Timeout: 600, // seconds (10 minutes)
}, func(evt claudeboxclient.StreamEvent) {
// Map event types
eventType := BuildEventOutput
switch evt.Type {
case "tool_use":
eventType = BuildEventToolUse
case "tool_result":
eventType = BuildEventToolResult
case "error":
eventType = BuildEventError
}
e.publishEvent(streamID, eventType, map[string]any{
"content": evt.Content,
"stream": evt.Stream,
"tool_name": evt.ToolName,
})
// Buffer output
if evt.Type == "output" || evt.Type == "error" {
if output.Len() >= maxOutputSize {
return
}
if output.Len() > 0 {
output.WriteString("\n")
}
remaining := maxOutputSize - output.Len()
if len(evt.Content) > remaining {
output.WriteString(evt.Content[:remaining])
output.WriteString("\n... [output truncated at 1MB]")
} else {
output.WriteString(evt.Content)
}
}
})
if execErr != nil {
e.publishEvent(streamID, BuildEventFailed, map[string]any{
"task_id": task.ID,
"error": fmt.Sprintf("agent execution failed: %v", execErr),
"duration_ms": time.Since(start).Milliseconds(),
})
e.closeStream(ctx, streamID)
return &domain.BuildResult{
Success: false,
Error: fmt.Sprintf("agent execution failed: %v", execErr),
Output: output.String(),
DurationMs: time.Since(start).Milliseconds(),
}
}
result := &domain.BuildResult{
Success: true,
Output: output.String(),
DurationMs: time.Since(start).Milliseconds(),
Artifacts: make(map[string]string),
}
// Include SDLC context in artifacts for callback routing
if spec.SDLCContext != nil {
if spec.SDLCContext.Feature != "" {
result.Artifacts["sdlc_feature"] = spec.SDLCContext.Feature
}
if spec.SDLCContext.ArtifactType != "" {
result.Artifacts["sdlc_artifact_type"] = spec.SDLCContext.ArtifactType
}
if spec.SDLCContext.TaskID != "" {
result.Artifacts["sdlc_task_id"] = spec.SDLCContext.TaskID
}
}
// Post-build git operations: commit and push changes
if result.Success && spec.AutoCommit && e.client != nil {
commitMsg := fmt.Sprintf("build: %s", truncate(spec.Prompt, 72))
gitResp, err := e.client.GitCommitAndPush(ctx, commitMsg, spec.AutoPush, e.workDir)
if err != nil {
log.Warn("post-build git operations failed", "task_id", task.ID, "error", err)
result.Success = false
result.Error = fmt.Sprintf("build succeeded but git operations failed: %v", err)
} else if !gitResp.Success {
log.Warn("post-build git operations failed", "task_id", task.ID, "error", gitResp.Error)
result.Success = false
result.Error = fmt.Sprintf("build succeeded but git operations failed: %s", gitResp.Error)
} else if gitResp.HasChanges {
result.CommitSHA = gitResp.CommitSHA
result.FilesChanged = gitResp.FilesChanged
log.Info("post-build git operations completed",
"task_id", task.ID,
"commit", gitResp.CommitSHA,
"files", len(gitResp.FilesChanged),
"pushed", gitResp.Pushed,
)
} else {
log.Info("no changes to commit after build", "task_id", task.ID)
}
}
// Refresh the duration so it also covers the post-build git operations.
result.DurationMs = time.Since(start).Milliseconds()
// Publish completion event
if result.Success {
e.publishEvent(streamID, BuildEventCompleted, map[string]any{
"task_id": task.ID,
"success": true,
"commit_sha": result.CommitSHA,
"files_changed": result.FilesChanged,
"duration_ms": result.DurationMs,
})
} else {
e.publishEvent(streamID, BuildEventFailed, map[string]any{
"task_id": task.ID,
"error": result.Error,
"duration_ms": result.DurationMs,
})
}
e.closeStream(ctx, streamID)
return result
}
// publishEvent publishes an event to the SSE stream.
func (e *HTTPBuildExecutor) publishEvent(streamID, eventType string, data map[string]any) {
if e.streams == nil {
return
}
e.streams.Publish(streamID, port.StreamEvent{
Type: eventType,
Data: data,
})
}
// closeStream closes the stream asynchronously: after streamCloseDelay, or as
// soon as ctx is cancelled, whichever comes first.
func (e *HTTPBuildExecutor) closeStream(ctx context.Context, streamID string) {
if e.streams == nil {
return
}
go func() {
select {
case <-ctx.Done():
e.streams.Close(streamID)
case <-time.After(streamCloseDelay):
e.streams.Close(streamID)
}
}()
}
// httpBuildSpec holds typed fields extracted from the task spec map.
type httpBuildSpec struct {
Prompt string
AutoCommit bool
AutoPush bool
GitCloneURL string
SDLCContext *sdlcContext
}
// parseSpec extracts typed BuildSpec fields from the generic map.
func (e *HTTPBuildExecutor) parseSpec(spec map[string]any) (*httpBuildSpec, error) {
prompt, _ := spec["prompt"].(string)
if prompt == "" {
return nil, fmt.Errorf("prompt is required")
}
autoCommit, _ := spec["auto_commit"].(bool)
autoPush, _ := spec["auto_push"].(bool)
gitCloneURL, _ := spec["git_clone_url"].(string)
parsed := &httpBuildSpec{
Prompt: prompt,
AutoCommit: autoCommit,
AutoPush: autoPush,
GitCloneURL: gitCloneURL,
}
// Extract SDLC context if present
if sdlcCtx, ok := spec["sdlc_context"].(map[string]any); ok {
parsed.SDLCContext = &sdlcContext{
Feature: stringFromMap(sdlcCtx, "feature"),
ArtifactType: stringFromMap(sdlcCtx, "artifact_type"),
TaskID: stringFromMap(sdlcCtx, "task_id"),
}
}
return parsed, nil
}
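
Review note: the output buffering inside the Execute callback above caps stored output at 1MB by slicing `evt.Content` at a byte offset, which can split a multi-byte UTF-8 character. The sketch below shows one way to isolate that logic and cut on a rune boundary. It is an illustration under assumptions, not code from this commit: `boundedBuffer`, `Add`, and `truncationNote` are hypothetical names, and the cap is a parameter rather than the fixed 1MB constant.

```go
package main

import (
	"fmt"
	"strings"
	"unicode/utf8"
)

// truncationNote is appended once, when the cap is first exceeded.
const truncationNote = "\n... [output truncated]"

// boundedBuffer is a hypothetical helper that stores newline-separated chunks
// up to max bytes of payload, then ignores further input.
type boundedBuffer struct {
	b         strings.Builder
	max       int
	truncated bool
}

// Add appends chunk on its own line and reports whether it was stored in full.
func (o *boundedBuffer) Add(chunk string) bool {
	if o.truncated {
		return false
	}
	// Separate chunks with a newline, but only if there is room for it.
	if o.b.Len() > 0 && o.b.Len() < o.max {
		o.b.WriteByte('\n')
	}
	remaining := o.max - o.b.Len()
	if len(chunk) <= remaining {
		o.b.WriteString(chunk)
		return true
	}
	// Back up to a rune boundary so the cut never splits a UTF-8 sequence.
	cut := remaining
	for cut > 0 && !utf8.RuneStart(chunk[cut]) {
		cut--
	}
	o.b.WriteString(chunk[:cut])
	o.b.WriteString(truncationNote)
	o.truncated = true
	return false
}

// String returns everything stored so far, including the truncation note.
func (o *boundedBuffer) String() string { return o.b.String() }

func main() {
	buf := &boundedBuffer{max: 9}
	for _, chunk := range []string{"héllo", "wörld", "ignored"} {
		fmt.Printf("%q stored in full: %v\n", chunk, buf.Add(chunk))
	}
	fmt.Printf("%q\n", buf.String())
}
```

Pulling the cap into a small type like this would also make the truncation behaviour unit-testable without a sidecar, since the callback would reduce to a single `Add` call.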

@ -0,0 +1,184 @@
package worker
import (
"context"
"fmt"
"time"
claudeboxclient "github.com/orchard9/rdev/internal/adapter/claudebox"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/logging"
)
// HTTPSDLCTaskExecutor handles WorkTaskTypeSDLC tasks using HTTP calls to the
// local claudebox sidecar instead of kubectl exec.
type HTTPSDLCTaskExecutor struct {
client *claudeboxclient.Client
workDir string
}
// HTTPSDLCTaskExecutorConfig holds configuration for the HTTP SDLC executor.
type HTTPSDLCTaskExecutorConfig struct {
// ClaudeboxClient is the HTTP client for the claudebox sidecar.
ClaudeboxClient *claudeboxclient.Client
// WorkDir is the default working directory in the container.
WorkDir string
}
// NewHTTPSDLCTaskExecutor creates a new HTTP-based SDLC executor.
func NewHTTPSDLCTaskExecutor(cfg HTTPSDLCTaskExecutorConfig) *HTTPSDLCTaskExecutor {
if cfg.WorkDir == "" {
cfg.WorkDir = "/workspace"
}
return &HTTPSDLCTaskExecutor{
client: cfg.ClaudeboxClient,
workDir: cfg.WorkDir,
}
}
// Execute runs an SDLC task using the claudebox sidecar HTTP API.
func (e *HTTPSDLCTaskExecutor) Execute(ctx context.Context, task *domain.WorkTask) *domain.BuildResult {
start := time.Now()
log := logging.FromContext(ctx).WithWorker("http-sdlc-executor")
// Parse SDLC spec
spec, err := e.parseSpec(task.Spec)
if err != nil {
return &domain.BuildResult{
Success: false,
Error: fmt.Sprintf("invalid SDLC spec: %v", err),
DurationMs: time.Since(start).Milliseconds(),
}
}
log.Info("executing SDLC task via HTTP",
"task_id", task.ID,
logging.FieldProjectID, task.ProjectID,
"command", spec.Command,
)
// Clone repo to workspace
cloneResp, err := e.client.GitClone(ctx, spec.GitCloneURL, e.workDir)
if err != nil {
return &domain.BuildResult{
Success: false,
Error: fmt.Sprintf("git clone failed: %v", err),
DurationMs: time.Since(start).Milliseconds(),
}
}
if !cloneResp.Success {
return &domain.BuildResult{
Success: false,
Error: fmt.Sprintf("git clone failed: %s", cloneResp.Error),
DurationMs: time.Since(start).Milliseconds(),
}
}
// Run SDLC command
sdlcResp, err := e.client.RunSDLC(ctx, spec.Command, spec.Args, e.workDir)
if err != nil {
return &domain.BuildResult{
Success: false,
Error: fmt.Sprintf("sdlc command failed: %v", err),
DurationMs: time.Since(start).Milliseconds(),
}
}
if !sdlcResp.Success {
return &domain.BuildResult{
Success: false,
Error: fmt.Sprintf("sdlc command failed: %s", sdlcResp.Error),
Output: sdlcResp.Output,
DurationMs: time.Since(start).Milliseconds(),
}
}
result := &domain.BuildResult{
Success: true,
Output: sdlcResp.Output,
DurationMs: time.Since(start).Milliseconds(),
}
// Commit and push if enabled
if spec.AutoCommit {
commitMsg := fmt.Sprintf("sdlc: %s", spec.Command)
gitResp, err := e.client.GitCommitAndPush(ctx, commitMsg, spec.AutoPush, e.workDir)
if err != nil {
result.Success = false
result.Error = fmt.Sprintf("git operations failed: %v", err)
return result
}
if !gitResp.Success {
result.Success = false
result.Error = fmt.Sprintf("git operations failed: %s", gitResp.Error)
return result
}
if gitResp.HasChanges {
result.CommitSHA = gitResp.CommitSHA
result.FilesChanged = gitResp.FilesChanged
log.Info("SDLC changes committed",
"task_id", task.ID,
"commit", gitResp.CommitSHA,
"files", len(gitResp.FilesChanged),
"pushed", gitResp.Pushed,
)
}
}
// Refresh the duration so it also covers the commit and push.
result.DurationMs = time.Since(start).Milliseconds()
log.Info("SDLC task completed",
"task_id", task.ID,
"command", spec.Command,
logging.FieldDuration, result.DurationMs,
)
return result
}
// httpSDLCSpec holds typed fields extracted from the task spec map.
type httpSDLCSpec struct {
Command string
Args []string
GitCloneURL string
AutoCommit bool
AutoPush bool
}
// parseSpec extracts typed SDLCTaskSpec fields from the generic map.
func (e *HTTPSDLCTaskExecutor) parseSpec(spec map[string]any) (*httpSDLCSpec, error) {
command, _ := spec["command"].(string)
if command == "" {
return nil, fmt.Errorf("command is required")
}
gitCloneURL, _ := spec["git_clone_url"].(string)
if gitCloneURL == "" {
return nil, fmt.Errorf("git_clone_url is required")
}
autoCommit, _ := spec["auto_commit"].(bool)
autoPush, _ := spec["auto_push"].(bool)
// Parse args (can be []string or []any from JSON)
var args []string
if argsRaw, ok := spec["args"]; ok {
switch v := argsRaw.(type) {
case []string:
args = v
case []any:
for _, a := range v {
if s, ok := a.(string); ok {
args = append(args, s)
}
}
}
}
return &httpSDLCSpec{
Command: command,
Args: args,
GitCloneURL: gitCloneURL,
AutoCommit: autoCommit,
AutoPush: autoPush,
}, nil
}
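
Review note: parseSpec above accepts `args` as either `[]string` or `[]any` because `encoding/json` decodes arrays inside a `map[string]any` as `[]any`, and it silently drops non-string elements. The sketch below shows a stricter variant that reports the offending element instead. It is a possible alternative under assumptions, not the behaviour of this commit: `stringSlice` and `parseArgs` are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// stringSlice converts a decoded JSON value into []string.
// Unlike a lenient parser, it rejects non-string elements instead of dropping them.
func stringSlice(v any) ([]string, error) {
	switch vv := v.(type) {
	case nil:
		return nil, nil // field absent or JSON null
	case []string:
		return vv, nil
	case []any:
		out := make([]string, 0, len(vv))
		for i, a := range vv {
			s, ok := a.(string)
			if !ok {
				return nil, fmt.Errorf("args[%d]: expected string, got %T", i, a)
			}
			out = append(out, s)
		}
		return out, nil
	default:
		return nil, fmt.Errorf("args: expected array, got %T", v)
	}
}

// parseArgs decodes a JSON task spec and extracts its "args" field.
func parseArgs(raw string) ([]string, error) {
	var spec map[string]any
	if err := json.Unmarshal([]byte(raw), &spec); err != nil {
		return nil, fmt.Errorf("decode spec: %w", err)
	}
	return stringSlice(spec["args"])
}

func main() {
	fmt.Println(parseArgs(`{"command":"status","args":["--json","feature-x"]}`))
	fmt.Println(parseArgs(`{"command":"status","args":["--json",42]}`))
}
```

Whether to reject or drop bad elements is a design choice: rejecting surfaces malformed task specs early, while dropping keeps the worker tolerant of producers that send mixed arrays.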

@ -2,10 +2,11 @@
# Build and push rdev images to GitHub Container Registry
#
# Usage:
# ./build-push.sh # Build all images with 'latest' tag
# ./build-push.sh v0.4.0 # Build all images with version tag
# ./build-push.sh v0.4.0 claudebox # Build only claudebox image
# ./build-push.sh v0.4.0 api # Build only api image
# ./build-push.sh v0.4.0 worker # Build only worker image
set -e
@ -61,6 +62,27 @@ build_api() {
echo "Pushed: $IMAGE_TAG"
}
build_worker() {
local IMAGE_TAG="$REGISTRY/rdev-worker:$VERSION"
echo "Building rdev-worker image..."
echo "Image: $IMAGE_TAG"
echo ""
# Build the image for linux/amd64
docker build --platform linux/amd64 \
-t "$IMAGE_TAG" \
-t "$REGISTRY/rdev-worker:latest" \
-f Dockerfile.worker \
.
echo ""
echo "Pushing rdev-worker to GitHub Container Registry..."
docker push "$IMAGE_TAG"
docker push "$REGISTRY/rdev-worker:latest"
echo "Pushed: $IMAGE_TAG"
}
case "$TARGET" in
claudebox)
build_claudebox
@ -68,16 +90,23 @@ case "$TARGET" in
api)
build_api
;;
worker)
build_worker
;;
all)
build_claudebox
echo ""
echo "---"
echo ""
build_api
echo ""
echo "---"
echo ""
build_worker
;;
*)
echo "Unknown target: $TARGET"
echo "Usage: $0 [version] [claudebox|api|worker|all]"
exit 1
;;
esac