Compare commits

No commits in common. "83b5d1ebb409497e80c82abab8ba25e007b68152" and "d7a6f3759344c09bf3a733abbac7ab8a0a8e9036" have entirely different histories.

24 changed files with 169 additions and 2403 deletions

@@ -14,7 +14,7 @@ Quick reference for rdev concepts and facts.
| Webhooks | [services/webhooks.md](./services/webhooks.md) | High | 2025-01 | Event subscriptions and delivery |
| **Worker Infrastructure** |
| Work Queue | [services/work-queue.md](./services/work-queue.md) | High | 2025-01 | Task queue for worker pool |
| Worker Pool | [services/worker-pool.md](./services/worker-pool.md) | High | 2026-02 | Standalone worker pods with claudebox sidecar, HTTP polling |
| Worker Pool | [services/worker-pool.md](./services/worker-pool.md) | High | 2026-01 | Embedded work executor with queue maintenance and metrics |
| External Health | [services/external-health.md](./services/external-health.md) | High | 2026-02 | Background health monitoring of registry, CI, git |
| CI Provider | [services/ci-provider.md](./services/ci-provider.md) | High | 2025-01 | Woodpecker auto-activation |
| DNS / Cloudflare | [services/dns-cloudflare.md](./services/dns-cloudflare.md) | High | 2026-01 | Domain management for threesix.ai |

@@ -1,193 +1,79 @@
# Worker Pool
**Last Updated:** 2026-02-06
**Last Updated:** 2026-01-31
**Confidence:** High
## Summary
Distributed task execution system where standalone worker pods poll rdev-api for tasks and execute them via a claudebox sidecar. Supports horizontal scaling by adding more worker pods.
Shared worker pool that executes build tasks for any project. Currently runs as an embedded WorkExecutor daemon inside rdev-api. Workers register with the worker registry, poll the work queue for tasks, and execute Claude Code in pods via kubectl exec. Post-build git operations (commit/push) are programmatic via PodGitOperations, not LLM-driven.
**Key Facts:**
- **Architecture:** Pull-based polling (not push/websocket)
- **Sidecar pattern:** Worker + claudebox in same pod, communicate via localhost HTTP
- **Atomic dequeue:** PostgreSQL `FOR UPDATE SKIP LOCKED` prevents duplicate claims
- **Task types:** `build` (Claude Code prompts), `sdlc` (SDLC commands)
- **Scaling:** Add replicas to handle more concurrent tasks
- **Resilience:** Stale workers marked offline, stuck tasks re-queued automatically
- **LLM vs rdev boundary:** Claude writes code; rdev handles git ops programmatically (no LLM for runbook tasks)
- Embedded WorkExecutor daemon runs inside rdev-api process
- Workers poll work queue every 5 seconds, heartbeat every 30 seconds
- Stale workers (no heartbeat for 2 minutes) automatically marked offline by QueueMaintenance
- Stale tasks (running >30 min without completion) automatically requeued
- Old tasks (>7 days) automatically cleaned up
- Queue depth and worker counts exported as Prometheus metrics
- Future: external worker binary for separate pod deployment
## File Pointers
**File Pointers:**
- Domain: `internal/domain/worker.go` (Worker, WorkerStatus)
- Domain: `internal/domain/build.go` (BuildSpec, BuildResult)
- Port: `internal/port/worker_registry.go` (WorkerRegistry interface)
- Port: `internal/port/build_audit.go` (BuildAudit interface)
- Adapter: `internal/adapter/postgres/worker_registry.go`
- Adapter: `internal/adapter/postgres/build_audit.go`
- Service: `internal/service/worker_service.go`
- Service: `internal/service/build_service.go`
- Executor: `internal/worker/work_executor.go` (poll loop, heartbeat, task routing)
- Executor: `internal/worker/build_executor.go` (BuildSpec→AgentRequest)
- Git: `internal/worker/pod_git_operations.go` (post-build commit/push via kubectl exec)
- Maintenance: `internal/worker/queue_maintenance.go` (stale recovery, cleanup, metrics)
- Handler: `internal/handlers/workers.go` (REST API for workers)
- Handler: `internal/handlers/builds.go` (REST API for builds)
- Handler: `internal/handlers/create_and_build.go` (combined create+build)
- Migration: `internal/db/migrations/012_worker_registry.sql`
### Standalone Worker Binary
- **Entry:** `cmd/rdev-worker/main.go` - Main binary, registration, heartbeat, poll loop
- **API Client:** `internal/worker/api_client.go` - HTTP client to rdev-api
- **Build Executor:** `internal/worker/http_build_executor.go` - Execute builds via claudebox
- **SDLC Executor:** `internal/worker/http_sdlc_executor.go` - Execute SDLC tasks via claudebox
## Worker Lifecycle (Embedded)
### Claudebox Sidecar Client
- **Client:** `internal/adapter/claudebox/client.go` - HTTP client to claudebox sidecar
- **Endpoints:** `/health`, `/execute`, `/git/clone`, `/git/commit-and-push`, `/sdlc`
### rdev-api Server-Side
- **Handlers:** `internal/handlers/workers.go` - `/workers/*` endpoints
- **Service:** `internal/service/worker_service.go` - Claim, complete, fail logic
- **Registry:** `internal/adapter/postgres/worker_registry.go` - Worker state persistence
- **Queue:** `internal/adapter/postgres/work_queue.go` - Task queue with atomic dequeue
### Domain
- **Worker:** `internal/domain/worker.go` - Worker, WorkerStatus
- **Task:** `internal/domain/work.go` - WorkTask, WorkTaskType, WorkTaskStatus
- **Build:** `internal/domain/build.go` - BuildSpec, BuildResult
### Kubernetes
- **Deployment:** `deployments/k8s/base/rdev-worker.yaml` - Worker + claudebox pod spec
## Architecture
```
┌─────────────────────────────────────┐    HTTP Polling (5s)    ┌───────────────────────────┐
│ rdev-api                            │◄───────────────────────►│ Worker Pod                │
│                                     │                         │ ┌────────┐  ┌───────────┐ │
│ PostgreSQL                          │                         │ │ worker │─►│ claudebox │ │
│ ├─ workers      (worker registry)   │                         │ └────────┘  └───────────┘ │
│ ├─ work_queue   (task queue)        │                         │ (HTTP localhost)  ↓       │
│ └─ build_audit  (execution history) │                         │     Claude Code execution │
└─────────────────────────────────────┘                         └───────────────────────────┘

Worker → rdev-api calls:
  POST /workers/register                 ← Register at startup
  POST /workers/{id}/heartbeat           ← Every 30s
  POST /workers/{id}/claim               ← Poll for tasks
  POST /workers/{id}/complete/{taskId}   ← Success
  POST /workers/{id}/fail/{taskId}       ← Failure
```
## Worker Lifecycle
1. **Register:** Worker pod starts → `POST /workers/register` with ID, hostname, capabilities
2. **Heartbeat:** Every 30s → `POST /workers/{id}/heartbeat` to stay alive
3. **Poll:** Every 5s → `POST /workers/{id}/claim` to get next task
4. **Execute:** Call claudebox sidecar HTTP API to run Claude Code / SDLC commands
5. **Report:** `POST /workers/{id}/complete/{taskId}` or `/fail/{taskId}` with results
6. **Shutdown:** Graceful wait for in-flight tasks via `sync.WaitGroup`
1. rdev-api starts → WorkExecutor registers as worker in registry
2. Heartbeat loop: every 30s sends heartbeat via WorkerService
3. Poll loop: every 5s dequeues next task from work queue
4. BuildExecutor: executes CodeAgent in pod, then programmatically commits/pushes if auto_commit
5. Reports completion with BuildResult via WorkerService
6. Graceful shutdown: deregisters worker on rdev-api stop
## Worker Statuses
| Status | Meaning |
|--------|---------|
| `idle` | Ready to claim new tasks |
| `busy` | Currently executing a task |
| `draining` | Not accepting new tasks (pre-shutdown) |
| `offline` | Missed heartbeat threshold (>90s) |
## Task Types
### Build Tasks (`WorkTaskTypeBuild`)
Execute Claude Code prompts with optional git operations.
**Spec:**
```json
{
  "prompt": "Build a React app with...",
  "auto_commit": true,
  "auto_push": false,
  "git_clone_url": "https://gitea.../repo.git"
}
```
**Execution Flow:**
1. Clone repo via `claudebox /git/clone`
2. Execute prompt via `claudebox /execute` (streaming)
3. Commit/push via `claudebox /git/commit-and-push`
### SDLC Tasks (`WorkTaskTypeSDLC`)
Execute SDLC CLI commands.
**Spec:**
```json
{
  "command": "feature",
  "args": ["init", "feature-name"],
  "git_clone_url": "https://gitea.../repo.git"
}
```
**Execution Flow:**
1. Clone repo via `claudebox /git/clone`
2. Run SDLC command via `claudebox /sdlc`
3. Commit/push changes
- `idle` - available for new tasks
- `busy` - currently executing a task
- `draining` - not accepting new tasks (pre-shutdown)
- `offline` - missed heartbeat threshold
## API Endpoints
| Method | Path | Description |
|--------|------|-------------|
| POST | `/workers/register` | Register new worker |
| POST | `/workers/{id}/heartbeat` | Keep worker alive |
| POST | `/workers/{id}/claim` | Claim next available task (204 if none) |
| POST | `/workers/{id}/complete/{taskId}` | Report successful completion |
| POST | `/workers/{id}/fail/{taskId}` | Report failure |
| GET | `/workers` | List all workers |
| GET | `/workers/{id}` | Get worker details |
| POST | `/workers/{id}/drain` | Set worker to draining |
## Kubernetes Deployment
```yaml
# deployments/k8s/base/rdev-worker.yaml
spec:
  replicas: 1 # Scale by increasing
  strategy:
    type: RollingUpdate # RWX PVC enables multi-pod mounts
    rollingUpdate:
      maxSurge: 2
      maxUnavailable: 0
  containers:
    - name: worker
      image: registry.threesix.ai/rdev/worker:latest
      env:
        - RDEV_API_URL: http://rdev-api.rdev.svc.cluster.local:8080
        - CLAUDEBOX_URL: http://localhost:8080
        - WORKER_POLL_INTERVAL: 5s
        - WORKER_HEARTBEAT_INTERVAL: 30s
        - WORKER_TASK_TIMEOUT: 15m
    - name: claudebox
      image: registry.threesix.ai/rdev/claudebox:latest
      volumeMounts:
        - /workspace (EmptyDir)
        - /root/.claude (RWX PVC - shared Claude auth)
```
**Storage:** The `claudebox-claude-config` PVC uses `ReadWriteMany` (RWX) access mode with Longhorn NFS, allowing multiple worker pods to share Claude OAuth credentials.
## Error Classification
Failed tasks are classified for smart retry logic:
| Code | Trigger | Retryable |
|------|---------|-----------|
| `RATE_LIMITED` | "rate limit", "quota exceeded" | Yes (with backoff) |
| `AUTH_FAILED` | "unauthorized", "invalid api key" | No |
| `TIMEOUT` | "context deadline exceeded" | Yes |
| `AGENT_ERROR` | Generic error | Yes (limited retries) |
| GET | `/workers` | List all workers with status summary |
| GET | `/workers/{workerId}` | Get worker details |
| POST | `/workers/{workerId}/drain` | Set worker to draining |
| POST | `/projects/{id}/builds` | Start build for project |
| GET | `/projects/{id}/builds` | List builds for project |
| GET | `/builds/{taskId}` | Get build status |
| POST | `/project/create-and-build` | Create project + start build |
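The Error Classification table in this document maps message substrings to a code and a retry decision. A minimal sketch of that mapping follows; the `classify` function, the `errorClass` type, and the case-insensitive matching are assumptions, and backoff and retry limits are left out.

```go
package main

import (
	"fmt"
	"strings"
)

// errorClass (hypothetical type) pairs a classification code with its retry decision.
type errorClass struct {
	Code      string
	Retryable bool
}

// classify matches known substrings in order and falls back to AGENT_ERROR.
// Lowercasing the message first is an assumption, not documented behaviour.
func classify(msg string) errorClass {
	m := strings.ToLower(msg)
	switch {
	case strings.Contains(m, "rate limit"), strings.Contains(m, "quota exceeded"):
		return errorClass{"RATE_LIMITED", true}
	case strings.Contains(m, "unauthorized"), strings.Contains(m, "invalid api key"):
		return errorClass{"AUTH_FAILED", false}
	case strings.Contains(m, "context deadline exceeded"):
		return errorClass{"TIMEOUT", true}
	default:
		return errorClass{"AGENT_ERROR", true}
	}
}

func main() {
	for _, msg := range []string{
		"429: rate limit reached",
		"Invalid API key",
		"context deadline exceeded",
		"something else broke",
	} {
		fmt.Printf("%q -> %+v\n", msg, classify(msg))
	}
}
```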
## Queue Maintenance
Background goroutine in rdev-api:
- **Stale worker marking:** Workers without heartbeat >90s → `offline`
- **Stale task recovery:** Tasks running >30m without completion → re-queued
- **Old task cleanup:** Completed/failed tasks >7 days → deleted
- **Metrics refresh:** Queue depth and worker counts → Prometheus
The QueueMaintenance worker runs inside rdev-api alongside the WorkExecutor:
- **Stale task recovery** (every 1m): Requeues tasks running >30m without completion. Also syncs build_audit status to "pending" so API correctly reflects requeued state.
- **Stale worker marking** (every 1m): Marks workers offline after 2m without heartbeat
- **Old task cleanup** (every 1m): Removes completed/failed/cancelled tasks >7 days old
- **Metrics refresh** (every 15s): Updates Prometheus gauges for queue depth and worker counts
## Graceful Shutdown
Worker uses `sync.WaitGroup` to track in-flight tasks:
1. Receive SIGTERM/SIGINT
2. Cancel context (stops polling)
3. Wait for WaitGroup with timeout (`WORKER_TASK_TIMEOUT`)
4. Log success or timeout warning
**Build Audit Sync:** When stale tasks are requeued, both `work_queue` and `build_audit` tables are updated atomically. This prevents builds from appearing stuck in "running" when the underlying task has been requeued for retry due to worker death.
## Related Topics
- [Work Queue](./work-queue.md) - Task queue implementation
- [Build Orchestration](../features/build-orchestration.md) - Build API and specs
- [SDLC Orchestration](./sdlc.md) - SDLC task integration
- [Work Queue](./work-queue.md)
- [Build Orchestration](../features/build-orchestration.md)

@@ -423,7 +423,7 @@ func main() {
agentsHandler := handlers.NewAgentsHandler(agentRegistry)
// Initialize worker pool handlers
workersHandler := handlers.NewWorkersHandler(workerService).WithWorkService(workService)
workersHandler := handlers.NewWorkersHandler(workerService)
buildsHandler := handlers.NewBuildsHandler(buildService)
createAndBuildHandler := handlers.NewCreateAndBuildHandler(projectInfraService, buildService)

@@ -1,536 +0,0 @@
name: full-lifecycle
description: "Slack Path 5: The Full Lifecycle. Tests all 10 SDLC phases with explicit artifact approvals."
version: 1
vars:
project_name: ""
feature_slug: "user-preferences"
feature_title: "User Preferences API"
steps:
# ============================================================
# INFRASTRUCTURE
# ============================================================
create-project:
action: api
method: POST
endpoint: /project
body:
name: "{{ .vars.project_name }}"
description: "Slack Path 5: Full SDLC Lifecycle"
outputs:
- project_id: .data.name
- domain: .data.domain
add-db:
description: Add database for preferences storage
depends_on: [create-project]
on_error: continue
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/components"
body:
type: postgres
name: "main-db"
add-service:
description: Add API service
depends_on: [add-db]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/components"
body:
type: service
name: "preferences-api"
wait-init:
depends_on: [add-service]
action: wait_pipeline
project_id: "{{ .outputs.create-project.project_id }}"
# ============================================================
# PHASE 1: DRAFT
# Create feature (starts in draft phase)
# ============================================================
create-feature:
description: "Create feature in draft phase"
depends_on: [wait-init]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features"
body:
slug: "{{ .vars.feature_slug }}"
title: "{{ .vars.feature_title }}"
outputs:
- feature_phase: .data.phase
verify-draft:
description: "Verify feature is in draft phase"
depends_on: [create-feature]
action: shell
command: |
PHASE="{{ .outputs.create-feature.feature_phase }}"
if [ "$PHASE" == "draft" ]; then
echo "Feature created in draft phase"
exit 0
else
echo "Expected draft, got $PHASE"
exit 1
fi
# ============================================================
# PHASE 2: DRAFT → SPECIFIED
# Agent writes spec, API approves, transition
# ============================================================
write-spec:
description: "Agent writes the spec artifact"
depends_on: [verify-draft]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/builds"
body:
prompt: "/spec-feature {{ .vars.feature_slug }} --requirements 'CRUD API for user preferences. GET/PUT /preferences/{user_id}. Preferences are key-value pairs stored in DB. Support theme, language, notifications settings.'"
auto_commit: true
auto_push: true
git_clone_url: "https://git.threesix.ai/jordan/{{ .outputs.create-project.project_id }}.git"
outputs:
- build_id: .data.task_id
wait-spec:
depends_on: [write-spec]
action: wait_build
build_id: "{{ .outputs.write-spec.build_id }}"
max_attempts: 60
poll_interval: 5
approve-spec:
description: "API approves the spec artifact"
depends_on: [wait-spec]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/artifacts/spec/approve"
body:
comment: "Spec approved by automation"
transition-to-specified:
description: "Transition from draft to specified"
depends_on: [approve-spec]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/transition"
body:
phase: "specified"
outputs:
- new_phase: .data.phase
# ============================================================
# PHASE 3: SPECIFIED → PLANNED
# Agent writes design, tasks, qa_plan. API approves each.
# ============================================================
write-design:
description: "Agent writes the design artifact"
depends_on: [transition-to-specified]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/builds"
body:
prompt: "/design-feature {{ .vars.feature_slug }}"
auto_commit: true
auto_push: true
git_clone_url: "https://git.threesix.ai/jordan/{{ .outputs.create-project.project_id }}.git"
outputs:
- build_id: .data.task_id
wait-design:
depends_on: [write-design]
action: wait_build
build_id: "{{ .outputs.write-design.build_id }}"
max_attempts: 60
poll_interval: 5
approve-design:
description: "API approves the design artifact"
depends_on: [wait-design]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/artifacts/design/approve"
body:
comment: "Design approved by automation"
write-tasks:
description: "Agent breaks down into tasks"
depends_on: [approve-design]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/builds"
body:
prompt: "/breakdown-feature {{ .vars.feature_slug }}"
auto_commit: true
auto_push: true
git_clone_url: "https://git.threesix.ai/jordan/{{ .outputs.create-project.project_id }}.git"
outputs:
- build_id: .data.task_id
wait-tasks:
depends_on: [write-tasks]
action: wait_build
build_id: "{{ .outputs.write-tasks.build_id }}"
max_attempts: 60
poll_interval: 5
approve-tasks:
description: "API approves the tasks artifact"
depends_on: [wait-tasks]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/artifacts/tasks/approve"
body:
comment: "Tasks approved by automation"
write-qa-plan:
description: "Agent writes QA plan"
depends_on: [approve-tasks]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/builds"
body:
prompt: "/create-qa-plan {{ .vars.feature_slug }}"
auto_commit: true
auto_push: true
git_clone_url: "https://git.threesix.ai/jordan/{{ .outputs.create-project.project_id }}.git"
outputs:
- build_id: .data.task_id
wait-qa-plan:
depends_on: [write-qa-plan]
action: wait_build
build_id: "{{ .outputs.write-qa-plan.build_id }}"
max_attempts: 60
poll_interval: 5
approve-qa-plan:
description: "API approves the QA plan artifact"
depends_on: [wait-qa-plan]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/artifacts/qa_plan/approve"
body:
comment: "QA plan approved by automation"
transition-to-planned:
description: "Transition from specified to planned"
depends_on: [approve-qa-plan]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/transition"
body:
phase: "planned"
outputs:
- new_phase: .data.phase
# ============================================================
# PHASE 4: PLANNED → READY
# No new artifacts needed, just transition
# ============================================================
transition-to-ready:
description: "Transition from planned to ready"
depends_on: [transition-to-planned]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/transition"
body:
phase: "ready"
outputs:
- new_phase: .data.phase
# ============================================================
# PHASE 5: READY → IMPLEMENTATION
# Agent implements all tasks
# ============================================================
implement-feature:
description: "Agent implements all tasks for the feature"
depends_on: [transition-to-ready]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/builds"
body:
prompt: "/implement-feature {{ .vars.feature_slug }}"
auto_commit: true
auto_push: true
git_clone_url: "https://git.threesix.ai/jordan/{{ .outputs.create-project.project_id }}.git"
outputs:
- build_id: .data.task_id
wait-implement:
depends_on: [implement-feature]
action: wait_build
build_id: "{{ .outputs.implement-feature.build_id }}"
max_attempts: 120
poll_interval: 5
wait-deploy-impl:
description: "Wait for implementation to deploy"
depends_on: [wait-implement]
action: wait_pipeline
project_id: "{{ .outputs.create-project.project_id }}"
transition-to-implementation:
description: "Transition to implementation phase (marks code complete)"
depends_on: [wait-deploy-impl]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/transition"
body:
phase: "implementation"
outputs:
- new_phase: .data.phase
# ============================================================
# PHASE 6: IMPLEMENTATION → REVIEW
# Agent writes code review
# ============================================================
write-review:
description: "Agent writes code review"
depends_on: [transition-to-implementation]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/builds"
body:
prompt: "/review-feature {{ .vars.feature_slug }}"
auto_commit: true
auto_push: true
git_clone_url: "https://git.threesix.ai/jordan/{{ .outputs.create-project.project_id }}.git"
outputs:
- build_id: .data.task_id
wait-review:
depends_on: [write-review]
action: wait_build
build_id: "{{ .outputs.write-review.build_id }}"
max_attempts: 60
poll_interval: 5
approve-review:
description: "API approves the review"
depends_on: [wait-review]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/artifacts/review/approve"
body:
comment: "Review approved by automation"
transition-to-review:
description: "Transition to review phase"
depends_on: [approve-review]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/transition"
body:
phase: "review"
outputs:
- new_phase: .data.phase
# ============================================================
# PHASE 7: REVIEW → AUDIT
# Agent writes security/architecture audit
# ============================================================
write-audit:
description: "Agent writes security audit"
depends_on: [transition-to-review]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/builds"
body:
prompt: "/audit-feature {{ .vars.feature_slug }}"
auto_commit: true
auto_push: true
git_clone_url: "https://git.threesix.ai/jordan/{{ .outputs.create-project.project_id }}.git"
outputs:
- build_id: .data.task_id
wait-audit:
depends_on: [write-audit]
action: wait_build
build_id: "{{ .outputs.write-audit.build_id }}"
max_attempts: 60
poll_interval: 5
approve-audit:
description: "API approves the audit"
depends_on: [wait-audit]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/artifacts/audit/approve"
body:
comment: "Audit approved by automation"
transition-to-audit:
description: "Transition to audit phase"
depends_on: [approve-audit]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/transition"
body:
phase: "audit"
outputs:
- new_phase: .data.phase
# ============================================================
# PHASE 8: AUDIT → QA
# Agent runs QA tests
# ============================================================
run-qa:
description: "Agent runs QA plan"
depends_on: [transition-to-audit]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/builds"
body:
prompt: "/run-qa {{ .vars.feature_slug }}"
auto_commit: true
auto_push: true
git_clone_url: "https://git.threesix.ai/jordan/{{ .outputs.create-project.project_id }}.git"
outputs:
- build_id: .data.task_id
wait-qa:
depends_on: [run-qa]
action: wait_build
build_id: "{{ .outputs.run-qa.build_id }}"
max_attempts: 60
poll_interval: 5
transition-to-qa:
description: "Transition to QA phase"
depends_on: [wait-qa]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/transition"
body:
phase: "qa"
outputs:
- new_phase: .data.phase
# ============================================================
# PHASE 9: QA → MERGE
# Merge feature branch to main
# ============================================================
merge-feature:
description: "Merge feature branch to main"
depends_on: [transition-to-qa]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/merge"
body:
strategy: "squash"
outputs:
- merge_commit: .data.commit_sha
transition-to-merge:
description: "Transition to merge phase"
depends_on: [merge-feature]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/transition"
body:
phase: "merge"
outputs:
- new_phase: .data.phase
# ============================================================
# PHASE 10: MERGE → RELEASED
# Archive the feature
# ============================================================
wait-final-deploy:
description: "Wait for merged code to deploy"
depends_on: [transition-to-merge]
action: wait_pipeline
project_id: "{{ .outputs.create-project.project_id }}"
archive-feature:
description: "Archive the completed feature"
depends_on: [wait-final-deploy]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/archive"
transition-to-released:
description: "Transition to released phase"
depends_on: [archive-feature]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/transition"
body:
phase: "released"
outputs:
- final_phase: .data.phase
# ============================================================
# VERIFICATION
# ============================================================
verify-service-running:
description: "Verify the preferences API is running"
depends_on: [transition-to-released]
action: shell
command: |
DOMAIN="{{ .outputs.create-project.domain }}"
HEALTH=$(curl -s "https://$DOMAIN/api/preferences-api/health" | jq -r '.data.status // empty')
if [ "$HEALTH" == "healthy" ]; then
echo "Service healthy"
exit 0
else
echo "Service not healthy: $HEALTH"
exit 1
fi
verify-preferences-api:
description: "Test CRUD operations on preferences"
depends_on: [verify-service-running]
on_error: continue
action: shell
command: |
DOMAIN="{{ .outputs.create-project.domain }}"
BASE_URL="https://$DOMAIN/api/preferences-api"
USER_ID="test-user-123"
# PUT preferences
echo "Setting preferences..."
PUT_RESP=$(curl -s -X PUT "$BASE_URL/preferences/$USER_ID" \
-H "Content-Type: application/json" \
-d '{"theme":"dark","language":"en","notifications":true}')
echo "PUT response: $PUT_RESP"
# GET preferences
echo "Getting preferences..."
GET_RESP=$(curl -s "$BASE_URL/preferences/$USER_ID")
echo "GET response: $GET_RESP"
# Verify theme is dark
THEME=$(echo "$GET_RESP" | jq -r '.theme // .data.theme // empty')
if [ "$THEME" == "dark" ]; then
echo "Preferences API working correctly"
exit 0
else
echo "Expected theme=dark, got: $THEME"
exit 1
fi
verify-lifecycle-complete:
description: "Verify feature reached released phase"
depends_on: [verify-preferences-api]
action: shell
command: |
FINAL_PHASE="{{ .outputs.transition-to-released.final_phase }}"
if [ "$FINAL_PHASE" == "released" ]; then
echo "SUCCESS: Feature completed full lifecycle (draft → released)"
echo "All 10 phases traversed with explicit approvals"
exit 0
else
echo "FAIL: Expected released, got $FINAL_PHASE"
exit 1
fi
teardown:
- action: api
method: DELETE
endpoint: "/project/{{ .outputs.create-project.project_id }}"

@@ -25,25 +25,6 @@ spec:
image: registry.threesix.ai/rdev/claudebox:latest
imagePullPolicy: Always
env:
# Claude Code Telemetry - exports to OTEL collector
- name: CLAUDE_CODE_ENABLE_TELEMETRY
value: "1"
- name: OTEL_METRICS_EXPORTER
value: "otlp"
- name: OTEL_LOGS_EXPORTER
value: "otlp"
- name: OTEL_EXPORTER_OTLP_PROTOCOL
value: "grpc"
- name: OTEL_EXPORTER_OTLP_ENDPOINT
value: "otel-collector.observability.svc.cluster.local:4317"
- name: OTEL_SERVICE_NAME
value: "claudebox-standalone"
- name: OTEL_METRIC_EXPORT_INTERVAL
value: "10000"
- name: OTEL_LOGS_EXPORT_INTERVAL
value: "5000"
resources:
requests:
cpu: "500m"

@@ -6,11 +6,9 @@ namespace: rdev
resources:
- namespace.yaml
# Storage classes (must be applied before PVCs)
- storageclass-rwx.yaml
# Shared worker claudebox (runs all project builds)
- pvc.yaml
- pvc-shared-claude.yaml
- claudebox.yaml
- configmaps.yaml

@@ -0,0 +1,29 @@
# Shared Claude credentials PVC
# v0.6 - All claudebox pods share this for auth
# Commands/skills/agents live in /workspace/.claude (per-project, in git)
#
# IMPORTANT: ReadWriteMany (RWX) requires Longhorn with NFS enabled.
# Verify with: kubectl get settings -n longhorn-system rwx-volume-fast-failover
# If RWX is not available, either:
# 1. Enable Longhorn NFS: kubectl apply -f longhorn-nfs-provisioner.yaml
# 2. Or use separate PVCs per pod (revert to per-project claude-config PVCs)
#
# RWX is needed because multiple claudebox pods mount this simultaneously
# to share Claude authentication credentials.
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: claudebox-shared-claude-config
namespace: rdev
labels:
app.kubernetes.io/name: claudebox
app.kubernetes.io/part-of: rdev
rdev.orchard9.ai/type: shared-config
spec:
accessModes:
- ReadWriteMany # Multiple pods can mount simultaneously
storageClassName: longhorn
resources:
requests:
storage: 1Gi

@@ -14,12 +14,6 @@ spec:
requests:
storage: 20Gi
---
# Claude config PVC - shared across claudebox and worker pods
# RWX (ReadWriteMany) allows multiple pods to mount simultaneously
# Contains Claude subscription OAuth credentials (~/.claude)
#
# IMPORTANT: Requires longhorn-rwx StorageClass (see storageclass-rwx.yaml)
# After recreating this PVC, re-authenticate with: claude login
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
@@ -28,11 +22,10 @@ metadata:
labels:
app.kubernetes.io/name: claudebox
app.kubernetes.io/part-of: rdev
rdev.orchard9.ai/type: shared-config
spec:
accessModes:
- ReadWriteMany
storageClassName: longhorn-rwx
- ReadWriteOnce
storageClassName: longhorn
resources:
requests:
storage: 1Gi

@@ -10,13 +10,10 @@ metadata:
app.kubernetes.io/part-of: rdev
spec:
replicas: 1
# RollingUpdate enabled by RWX (ReadWriteMany) PVC for claude-config
# See: deployments/k8s/base/pvc.yaml and storageclass-rwx.yaml
# Recreate strategy required: claudebox-claude-config PVC is RWO (ReadWriteOnce)
# and cannot be attached to multiple pods simultaneously
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 2
maxUnavailable: 0
type: Recreate
selector:
matchLabels:
app: rdev-worker
@@ -94,23 +91,6 @@ spec:
value: "rdev-worker"
- name: GIT_EMAIL
value: "worker@threesix.ai"
# Claude Code Telemetry - exports to OTEL collector
- name: CLAUDE_CODE_ENABLE_TELEMETRY
value: "1"
- name: OTEL_METRICS_EXPORTER
value: "otlp"
- name: OTEL_LOGS_EXPORTER
value: "otlp"
- name: OTEL_EXPORTER_OTLP_PROTOCOL
value: "grpc"
- name: OTEL_EXPORTER_OTLP_ENDPOINT
value: "otel-collector.observability.svc.cluster.local:4317"
- name: OTEL_SERVICE_NAME
value: "claudebox-worker"
- name: OTEL_METRIC_EXPORT_INTERVAL
value: "10000"
- name: OTEL_LOGS_EXPORT_INTERVAL
value: "5000"
ports:
- name: http

@@ -1,24 +0,0 @@
# RWX (ReadWriteMany) StorageClass for shared volumes
# Enables multiple pods to mount the same PVC simultaneously
# Used for: claudebox-claude-config (shared Claude auth credentials)
#
# Prerequisites:
# - Longhorn 1.4.0+ with NFS support
# - Verify: kubectl get settings -n longhorn-system | grep -i rwx
#
# If RWX is not available, enable it:
# kubectl patch -n longhorn-system settings rwx-volume-fast-failover --type merge -p '{"value":"true"}'
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: longhorn-rwx
labels:
app.kubernetes.io/part-of: rdev
provisioner: driver.longhorn.io
allowVolumeExpansion: true
reclaimPolicy: Retain
parameters:
numberOfReplicas: "2"
staleReplicaTimeout: "30"
nfsOptions: "vers=4.1,noresvport"

View File

@ -1,774 +0,0 @@
package claudebox
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"
)
func TestNewClient_DefaultTimeout(t *testing.T) {
client := NewClient(ClientConfig{
BaseURL: "http://localhost:8080",
})
if client.httpClient.Timeout != 10*time.Minute {
t.Errorf("expected default timeout 10m, got %v", client.httpClient.Timeout)
}
}
func TestNewClient_CustomTimeout(t *testing.T) {
client := NewClient(ClientConfig{
BaseURL: "http://localhost:8080",
Timeout: 5 * time.Minute,
})
if client.httpClient.Timeout != 5*time.Minute {
t.Errorf("expected timeout 5m, got %v", client.httpClient.Timeout)
}
}
func TestNewClient_TrimsTrailingSlash(t *testing.T) {
client := NewClient(ClientConfig{
BaseURL: "http://localhost:8080/",
})
if client.baseURL != "http://localhost:8080" {
t.Errorf("expected trailing slash trimmed, got %s", client.baseURL)
}
}
func TestHealth_Success(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
t.Errorf("expected GET, got %s", r.Method)
}
if r.URL.Path != "/health" {
t.Errorf("expected /health, got %s", r.URL.Path)
}
resp := HealthResponse{
Status: "healthy",
Timestamp: "2024-01-15T10:30:00Z",
WorkDir: "/workspace",
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp)
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
health, err := client.Health(context.Background())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if health.Status != "healthy" {
t.Errorf("expected status 'healthy', got %s", health.Status)
}
if health.WorkDir != "/workspace" {
t.Errorf("expected work_dir '/workspace', got %s", health.WorkDir)
}
}
func TestHealth_Unhealthy(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusServiceUnavailable)
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
_, err := client.Health(context.Background())
if err == nil {
t.Fatal("expected error, got nil")
}
if !strings.Contains(err.Error(), "health check returned status 503") {
t.Errorf("expected 503 error, got %v", err)
}
}
func TestHealth_NetworkError(t *testing.T) {
client := NewClient(ClientConfig{
BaseURL: "http://localhost:1",
Timeout: 100 * time.Millisecond,
})
_, err := client.Health(context.Background())
if err == nil {
t.Fatal("expected error, got nil")
}
if !strings.Contains(err.Error(), "health check:") {
t.Errorf("expected error wrapped with 'health check:', got %v", err)
}
}
func TestExecute_Success(t *testing.T) {
var receivedReq ExecuteRequest
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("expected POST, got %s", r.Method)
}
if r.URL.Path != "/execute" {
t.Errorf("expected /execute, got %s", r.URL.Path)
}
if r.Header.Get("Content-Type") != "application/json" {
t.Errorf("expected Content-Type application/json, got %s", r.Header.Get("Content-Type"))
}
body, _ := io.ReadAll(r.Body)
_ = json.Unmarshal(body, &receivedReq)
resp := ExecuteResponse{
Success: true,
Output: "Task completed",
ExitCode: 0,
DurationMs: 5000,
SessionID: "session-123",
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp)
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
req := &ExecuteRequest{
Prompt: "Build the project",
AllowedTools: []string{"Bash", "Read", "Write"},
WorkingDir: "/workspace/project",
Timeout: 300,
Metadata: map[string]string{"task_id": "task-1"},
}
result, err := client.Execute(context.Background(), req)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !result.Success {
t.Error("expected success=true")
}
if result.Output != "Task completed" {
t.Errorf("expected output 'Task completed', got %s", result.Output)
}
if result.ExitCode != 0 {
t.Errorf("expected exit code 0, got %d", result.ExitCode)
}
// Verify request was serialized correctly
if receivedReq.Prompt != "Build the project" {
t.Errorf("expected prompt 'Build the project', got %s", receivedReq.Prompt)
}
if len(receivedReq.AllowedTools) != 3 {
t.Errorf("expected 3 allowed tools, got %d", len(receivedReq.AllowedTools))
}
}
func TestExecute_ErrorStatus(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(`{"error":"invalid prompt"}`))
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
_, err := client.Execute(context.Background(), &ExecuteRequest{Prompt: ""})
if err == nil {
t.Fatal("expected error, got nil")
}
if !strings.Contains(err.Error(), "execute returned status 400") {
t.Errorf("expected 400 error, got %v", err)
}
if !strings.Contains(err.Error(), "invalid prompt") {
t.Errorf("expected error body in message, got %v", err)
}
}
func TestExecute_MalformedResponse(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{invalid json`))
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
_, err := client.Execute(context.Background(), &ExecuteRequest{Prompt: "test"})
if err == nil {
t.Fatal("expected error, got nil")
}
if !strings.Contains(err.Error(), "decode response") {
t.Errorf("expected decode error, got %v", err)
}
}
func TestExecuteStream_Success(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("expected POST, got %s", r.Method)
}
if r.URL.Path != "/execute/stream" {
t.Errorf("expected /execute/stream, got %s", r.URL.Path)
}
if r.Header.Get("Accept") != "text/event-stream" {
t.Errorf("expected Accept text/event-stream, got %s", r.Header.Get("Accept"))
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
flusher, ok := w.(http.Flusher)
if !ok {
t.Fatal("expected http.Flusher")
}
events := []StreamEvent{
{Type: "start", Timestamp: "2024-01-15T10:30:00Z"},
{Type: "output", Content: "Building...", Timestamp: "2024-01-15T10:30:01Z"},
{Type: "tool_call", ToolName: "Bash", Timestamp: "2024-01-15T10:30:02Z"},
{Type: "complete", Content: "Done", Timestamp: "2024-01-15T10:30:05Z"},
}
for _, event := range events {
data, _ := json.Marshal(event)
fmt.Fprintf(w, "data: %s\n\n", data)
flusher.Flush()
}
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
var receivedEvents []StreamEvent
var mu sync.Mutex
err := client.ExecuteStream(context.Background(), &ExecuteRequest{Prompt: "build"}, func(event StreamEvent) {
mu.Lock()
receivedEvents = append(receivedEvents, event)
mu.Unlock()
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(receivedEvents) != 4 {
t.Fatalf("expected 4 events, got %d", len(receivedEvents))
}
if receivedEvents[0].Type != "start" {
t.Errorf("expected first event type 'start', got %s", receivedEvents[0].Type)
}
if receivedEvents[1].Content != "Building..." {
t.Errorf("expected second event content 'Building...', got %s", receivedEvents[1].Content)
}
if receivedEvents[2].ToolName != "Bash" {
t.Errorf("expected third event tool name 'Bash', got %s", receivedEvents[2].ToolName)
}
}
func TestExecuteStream_SkipsMalformedEvents(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
flusher, _ := w.(http.Flusher)
// Valid event
event1, _ := json.Marshal(StreamEvent{Type: "start"})
fmt.Fprintf(w, "data: %s\n\n", event1)
flusher.Flush()
// Malformed JSON - should be skipped
fmt.Fprintf(w, "data: {invalid json}\n\n")
flusher.Flush()
// Empty data - should be skipped
fmt.Fprintf(w, "data: \n\n")
flusher.Flush()
// Non-data line - should be skipped
fmt.Fprintf(w, "event: ping\n\n")
flusher.Flush()
// Valid event
event2, _ := json.Marshal(StreamEvent{Type: "complete"})
fmt.Fprintf(w, "data: %s\n\n", event2)
flusher.Flush()
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
var receivedEvents []StreamEvent
var mu sync.Mutex
err := client.ExecuteStream(context.Background(), &ExecuteRequest{Prompt: "test"}, func(event StreamEvent) {
mu.Lock()
receivedEvents = append(receivedEvents, event)
mu.Unlock()
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// Should only receive the 2 valid events
if len(receivedEvents) != 2 {
t.Fatalf("expected 2 events (malformed skipped), got %d", len(receivedEvents))
}
if receivedEvents[0].Type != "start" {
t.Errorf("expected first event 'start', got %s", receivedEvents[0].Type)
}
if receivedEvents[1].Type != "complete" {
t.Errorf("expected second event 'complete', got %s", receivedEvents[1].Type)
}
}
func TestExecuteStream_ErrorStatus(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(`{"error":"agent unavailable"}`))
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
err := client.ExecuteStream(context.Background(), &ExecuteRequest{Prompt: "test"}, func(event StreamEvent) {
t.Error("handler should not be called on error")
})
if err == nil {
t.Fatal("expected error, got nil")
}
if !strings.Contains(err.Error(), "execute stream returned status 500") {
t.Errorf("expected 500 error, got %v", err)
}
}
func TestExecuteStream_ContextCanceledBeforeRequest(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
t.Error("handler should not be called when context is already canceled")
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
// Cancel context before making request
ctx, cancel := context.WithCancel(context.Background())
cancel()
err := client.ExecuteStream(ctx, &ExecuteRequest{Prompt: "test"}, func(event StreamEvent) {})
if err == nil {
t.Fatal("expected error, got nil")
}
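// The returned error is expected to stem from context.Canceled; this test
// only asserts that it is non-nil and that the server handler never ran.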
}
func TestGitClone_Success(t *testing.T) {
var receivedReq GitCloneRequest
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("expected POST, got %s", r.Method)
}
if r.URL.Path != "/git/clone" {
t.Errorf("expected /git/clone, got %s", r.URL.Path)
}
body, _ := io.ReadAll(r.Body)
_ = json.Unmarshal(body, &receivedReq)
resp := GitCloneResponse{
Success: true,
Cloned: true,
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp)
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
result, err := client.GitClone(context.Background(), "https://github.com/example/repo.git", "/workspace")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !result.Success {
t.Error("expected success=true")
}
if !result.Cloned {
t.Error("expected cloned=true")
}
if receivedReq.CloneURL != "https://github.com/example/repo.git" {
t.Errorf("expected clone URL, got %s", receivedReq.CloneURL)
}
if receivedReq.WorkDir != "/workspace" {
t.Errorf("expected work dir '/workspace', got %s", receivedReq.WorkDir)
}
}
func TestGitClone_AlreadyExists(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
resp := GitCloneResponse{
Success: true,
Cloned: false, // Already existed, just updated
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp)
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
result, err := client.GitClone(context.Background(), "https://github.com/example/repo.git", "/workspace")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !result.Success {
t.Error("expected success=true")
}
if result.Cloned {
t.Error("expected cloned=false for existing repo")
}
}
func TestGitClone_ErrorStatus(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(`{"error":"invalid clone URL"}`))
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
_, err := client.GitClone(context.Background(), "invalid", "/workspace")
if err == nil {
t.Fatal("expected error, got nil")
}
if !strings.Contains(err.Error(), "git clone returned status 400") {
t.Errorf("expected 400 error, got %v", err)
}
}
func TestGitCommitAndPush_Success(t *testing.T) {
var receivedReq GitCommitAndPushRequest
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("expected POST, got %s", r.Method)
}
if r.URL.Path != "/git/commit-and-push" {
t.Errorf("expected /git/commit-and-push, got %s", r.URL.Path)
}
body, _ := io.ReadAll(r.Body)
_ = json.Unmarshal(body, &receivedReq)
resp := GitCommitAndPushResponse{
Success: true,
HasChanges: true,
CommitSHA: "abc123def456",
FilesChanged: []string{"main.go", "go.mod"},
Pushed: true,
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp)
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
result, err := client.GitCommitAndPush(context.Background(), "feat: add feature", true, "/workspace")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !result.Success {
t.Error("expected success=true")
}
if !result.HasChanges {
t.Error("expected has_changes=true")
}
if result.CommitSHA != "abc123def456" {
t.Errorf("expected commit SHA abc123def456, got %s", result.CommitSHA)
}
if !result.Pushed {
t.Error("expected pushed=true")
}
if len(result.FilesChanged) != 2 {
t.Errorf("expected 2 files changed, got %d", len(result.FilesChanged))
}
// Verify request
if receivedReq.Message != "feat: add feature" {
t.Errorf("expected message 'feat: add feature', got %s", receivedReq.Message)
}
if !receivedReq.Push {
t.Error("expected push=true in request")
}
}
func TestGitCommitAndPush_NoChanges(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
resp := GitCommitAndPushResponse{
Success: true,
HasChanges: false,
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp)
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
result, err := client.GitCommitAndPush(context.Background(), "test", false, "/workspace")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !result.Success {
t.Error("expected success=true")
}
if result.HasChanges {
t.Error("expected has_changes=false")
}
if result.CommitSHA != "" {
t.Errorf("expected empty commit SHA, got %s", result.CommitSHA)
}
}
func TestGitCommitAndPush_ErrorStatus(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(`{"error":"git push failed"}`))
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
_, err := client.GitCommitAndPush(context.Background(), "test", true, "/workspace")
if err == nil {
t.Fatal("expected error, got nil")
}
if !strings.Contains(err.Error(), "git commit returned status 500") {
t.Errorf("expected 500 error, got %v", err)
}
}
func TestGitStatus_Success(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
t.Errorf("expected GET, got %s", r.Method)
}
if r.URL.Path != "/git/status" {
t.Errorf("expected /git/status, got %s", r.URL.Path)
}
if r.URL.Query().Get("work_dir") != "/workspace/project" {
t.Errorf("expected work_dir query param, got %s", r.URL.Query().Get("work_dir"))
}
resp := GitStatusResponse{
IsRepo: true,
HasChanges: true,
ChangedFiles: []string{"main.go", "README.md"},
Branch: "feature/test",
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp)
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
result, err := client.GitStatus(context.Background(), "/workspace/project")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !result.IsRepo {
t.Error("expected is_repo=true")
}
if !result.HasChanges {
t.Error("expected has_changes=true")
}
if result.Branch != "feature/test" {
t.Errorf("expected branch 'feature/test', got %s", result.Branch)
}
if len(result.ChangedFiles) != 2 {
t.Errorf("expected 2 changed files, got %d", len(result.ChangedFiles))
}
}
func TestGitStatus_EmptyWorkDir(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// work_dir should not be in query when empty
if r.URL.Query().Get("work_dir") != "" {
t.Errorf("expected empty work_dir, got %s", r.URL.Query().Get("work_dir"))
}
resp := GitStatusResponse{IsRepo: true}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp)
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
_, err := client.GitStatus(context.Background(), "")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
func TestGitStatus_ErrorStatus(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
_, _ = w.Write([]byte(`{"error":"not a git repository"}`))
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
_, err := client.GitStatus(context.Background(), "/workspace")
if err == nil {
t.Fatal("expected error, got nil")
}
if !strings.Contains(err.Error(), "git status returned status 404") {
t.Errorf("expected 404 error, got %v", err)
}
}
func TestRunSDLC_Success(t *testing.T) {
var receivedReq SDLCRequest
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("expected POST, got %s", r.Method)
}
if r.URL.Path != "/sdlc" {
t.Errorf("expected /sdlc, got %s", r.URL.Path)
}
body, _ := io.ReadAll(r.Body)
_ = json.Unmarshal(body, &receivedReq)
resp := SDLCResponse{
Success: true,
Output: "Feature started successfully",
Data: json.RawMessage(`{"feature_id":"feat-123"}`),
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp)
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
result, err := client.RunSDLC(context.Background(), "start", []string{"--name", "test-feature"}, "/workspace")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !result.Success {
t.Error("expected success=true")
}
if result.Output != "Feature started successfully" {
t.Errorf("expected output message, got %s", result.Output)
}
if result.Data == nil {
t.Error("expected data to be set")
}
// Verify request
if receivedReq.Command != "start" {
t.Errorf("expected command 'start', got %s", receivedReq.Command)
}
if len(receivedReq.Args) != 2 {
t.Errorf("expected 2 args, got %d", len(receivedReq.Args))
}
if receivedReq.WorkDir != "/workspace" {
t.Errorf("expected work dir '/workspace', got %s", receivedReq.WorkDir)
}
}
func TestRunSDLC_CommandFailed(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
resp := SDLCResponse{
Success: false,
Output: "Command output before failure",
Error: "validation failed: missing required field",
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp)
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
result, err := client.RunSDLC(context.Background(), "validate", nil, "/workspace")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if result.Success {
t.Error("expected success=false")
}
if result.Error != "validation failed: missing required field" {
t.Errorf("expected error message, got %s", result.Error)
}
}
func TestRunSDLC_ErrorStatus(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(`{"error":"sdlc binary not found"}`))
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
_, err := client.RunSDLC(context.Background(), "status", nil, "/workspace")
if err == nil {
t.Fatal("expected error, got nil")
}
if !strings.Contains(err.Error(), "sdlc returned status 500") {
t.Errorf("expected 500 error, got %v", err)
}
}
func TestRunSDLC_MalformedResponse(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{invalid`))
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
_, err := client.RunSDLC(context.Background(), "status", nil, "/workspace")
if err == nil {
t.Fatal("expected error, got nil")
}
if !strings.Contains(err.Error(), "decode response") {
t.Errorf("expected decode error, got %v", err)
}
}

View File

@ -97,63 +97,6 @@ func (r *WorkQueueRepository) GetTask(ctx context.Context, taskID string) (*doma
return &task, nil
}
// List returns all tasks with optional status filter and pagination.
func (r *WorkQueueRepository) List(ctx context.Context, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
opts.Normalize()
// Build optional WHERE clause
whereClause := ""
var args []any
argNum := 1
if status != nil {
whereClause = fmt.Sprintf("WHERE status = $%d", argNum)
args = append(args, string(*status))
argNum++
}
// Get total count
countQuery := "SELECT COUNT(*) FROM work_queue " + whereClause
var total int64
if err := r.db.QueryRowContext(ctx, countQuery, args...).Scan(&total); err != nil {
return nil, fmt.Errorf("count work tasks: %w", err)
}
// Build paginated query
query := fmt.Sprintf(`
SELECT id, project_id, task_type, task_spec, status, priority, worker_id,
callback_url, created_at, started_at, completed_at, result, error,
retry_count, max_retries, error_code
FROM work_queue
%s
ORDER BY created_at DESC
LIMIT $%d OFFSET $%d
`, whereClause, argNum, argNum+1)
args = append(args, opts.Limit, opts.Offset)
rows, err := r.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, fmt.Errorf("list work tasks: %w", err)
}
defer func() { _ = rows.Close() }()
var tasks []*domain.WorkTask
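for rows.Next() {
task, err := r.scanTask(rows)
if err != nil {
return nil, err
}
tasks = append(tasks, task)
}
// rows.Next returns false on both exhaustion and failure; check which one it was.
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("iterate work tasks: %w", err)
}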
return &domain.WorkListResult{
Tasks: tasks,
Total: total,
Limit: opts.Limit,
Offset: opts.Offset,
}, nil
}
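The `List` method above builds its SQL with numbered placeholders whose indices shift depending on whether a status filter is present. The sketch below isolates that technique in a small pure function so the numbering is easy to follow. The helper name `buildListQuery` and the shortened column list are illustrative assumptions, not code from the repository.

```go
package main

import "fmt"

// buildListQuery returns a paginated query and its arguments. When status is
// empty the WHERE clause is omitted and LIMIT/OFFSET become $1/$2; otherwise
// the status takes $1 and LIMIT/OFFSET become $2/$3.
func buildListQuery(status string, limit, offset int) (string, []any) {
	where := ""
	var args []any
	argNum := 1

	if status != "" {
		where = fmt.Sprintf("WHERE status = $%d ", argNum)
		args = append(args, status)
		argNum++
	}

	query := fmt.Sprintf(
		"SELECT id FROM work_queue %sORDER BY created_at DESC LIMIT $%d OFFSET $%d",
		where, argNum, argNum+1,
	)
	args = append(args, limit, offset)
	return query, args
}

func main() {
	q, args := buildListQuery("running", 50, 0)
	fmt.Println(q)
	// prints: SELECT id FROM work_queue WHERE status = $1 ORDER BY created_at DESC LIMIT $2 OFFSET $3
	fmt.Println(args) // prints: [running 50 0]
}
```

Only placeholder indices are formatted into the string; user-supplied values always travel in the argument slice, which keeps the query safe from injection.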
// ListByProject returns tasks for a project with optional status filter and pagination.
func (r *WorkQueueRepository) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
// Normalize pagination options

View File

@ -80,10 +80,6 @@ func (m *mockWorkQueue) GetTask(ctx context.Context, taskID string) (*domain.Wor
return task, nil
}
func (m *mockWorkQueue) List(ctx context.Context, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
return &domain.WorkListResult{}, nil
}
func (m *mockWorkQueue) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
return &domain.WorkListResult{}, nil
}

View File

@ -14,10 +14,6 @@ WORKDIR /app
COPY pkg/ ./pkg/
COPY services/{{COMPONENT_NAME}}/ ./services/{{COMPONENT_NAME}}/
# Download dependencies (populates go.sum if empty)
RUN cd pkg && go mod download
RUN cd services/{{COMPONENT_NAME}} && go mod download
# Build from the service directory (uses replace directive for ../pkg)
RUN cd services/{{COMPONENT_NAME}} && CGO_ENABLED=0 go build -o /{{COMPONENT_NAME}} ./cmd/server

View File

@ -14,10 +14,6 @@ WORKDIR /app
COPY pkg/ ./pkg/
COPY workers/{{COMPONENT_NAME}}/ ./workers/{{COMPONENT_NAME}}/
# Download dependencies (populates go.sum if empty)
RUN cd pkg && go mod download
RUN cd workers/{{COMPONENT_NAME}} && go mod download
# Build from the worker directory (uses replace directive for ../pkg)
RUN cd workers/{{COMPONENT_NAME}} && CGO_ENABLED=0 go build -o /{{COMPONENT_NAME}} ./cmd/worker

View File

@ -38,11 +38,10 @@ func (h *WorkHandler) Mount(r api.Router) {
r.With(auth.RequireScope(auth.ScopeQueueWrite, auth.ScopeAdmin)).Post("/{taskId}/cancel", h.Cancel)
// Read operations
r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/tasks", h.ListTasks)
r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/stats", h.Stats)
r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/projects/{projectId}", h.ListByProject)
r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/{taskId}", h.GetTask)
r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/{taskId}/status", h.GetStatus)
r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/projects/{projectId}", h.ListByProject)
r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/stats", h.Stats)
})
}
@ -122,6 +121,65 @@ type DequeueWorkResponse struct {
Task *WorkTaskDTO `json:"task,omitempty"`
}
// WorkTaskDTO is the data transfer object for work tasks.
type WorkTaskDTO struct {
ID string `json:"id"`
ProjectID string `json:"project_id"`
Type string `json:"type"`
Spec map[string]any `json:"spec"`
Status string `json:"status"`
Priority int `json:"priority"`
WorkerID string `json:"worker_id,omitempty"`
CallbackURL string `json:"callback_url,omitempty"`
CreatedAt string `json:"created_at"`
StartedAt string `json:"started_at,omitempty"`
CompletedAt string `json:"completed_at,omitempty"`
Result *WorkResultDTO `json:"result,omitempty"`
Error string `json:"error,omitempty"`
RetryCount int `json:"retry_count"`
MaxRetries int `json:"max_retries"`
}
// WorkResultDTO is the data transfer object for work results.
type WorkResultDTO struct {
Output string `json:"output,omitempty"`
Artifacts map[string]string `json:"artifacts,omitempty"`
}
// toWorkTaskDTO converts a domain.WorkTask to a WorkTaskDTO.
func toWorkTaskDTO(t *domain.WorkTask) *WorkTaskDTO {
if t == nil {
return nil
}
dto := &WorkTaskDTO{
ID: t.ID,
ProjectID: t.ProjectID,
Type: string(t.Type),
Spec: t.Spec,
Status: string(t.Status),
Priority: t.Priority,
WorkerID: t.WorkerID,
CallbackURL: t.CallbackURL,
CreatedAt: t.CreatedAt.Format("2006-01-02T15:04:05Z07:00"),
Error: t.Error,
RetryCount: t.RetryCount,
MaxRetries: t.MaxRetries,
}
if t.StartedAt != nil {
dto.StartedAt = t.StartedAt.Format("2006-01-02T15:04:05Z07:00")
}
if t.CompletedAt != nil {
dto.CompletedAt = t.CompletedAt.Format("2006-01-02T15:04:05Z07:00")
}
if t.Result != nil {
dto.Result = &WorkResultDTO{
Output: t.Result.Output,
Artifacts: t.Result.Artifacts,
}
}
return dto
}
// Dequeue claims the next available task for a worker.
// POST /work/dequeue
func (h *WorkHandler) Dequeue(w http.ResponseWriter, r *http.Request) {
@ -283,58 +341,6 @@ func (h *WorkHandler) Cancel(w http.ResponseWriter, r *http.Request) {
})
}
// ListTasks returns all tasks with optional status filter and pagination.
// GET /work/tasks?status=running&limit=50&offset=0
func (h *WorkHandler) ListTasks(w http.ResponseWriter, r *http.Request) {
// Parse and validate optional status filter
var status *domain.WorkTaskStatus
if s := r.URL.Query().Get("status"); s != "" {
st := domain.WorkTaskStatus(s)
if !st.IsValid() {
api.WriteBadRequest(w, r, "invalid status filter: must be pending, running, completed, failed, or cancelled")
return
}
status = &st
}
// Parse pagination options
opts := domain.DefaultWorkListOptions()
if limitStr := r.URL.Query().Get("limit"); limitStr != "" {
limit, err := strconv.Atoi(limitStr)
if err != nil {
api.WriteBadRequest(w, r, "limit must be a valid integer")
return
}
opts.Limit = limit
}
if offsetStr := r.URL.Query().Get("offset"); offsetStr != "" {
offset, err := strconv.Atoi(offsetStr)
if err != nil {
api.WriteBadRequest(w, r, "offset must be a valid integer")
return
}
opts.Offset = offset
}
result, err := h.workService.List(r.Context(), status, opts)
if err != nil {
api.WriteInternalError(w, r, "failed to list tasks")
return
}
dtos := make([]*WorkTaskDTO, len(result.Tasks))
for i, t := range result.Tasks {
dtos[i] = toWorkTaskDTO(t)
}
api.WriteSuccess(w, r, map[string]any{
"tasks": dtos,
"total": result.Total,
"limit": result.Limit,
"offset": result.Offset,
})
}
// ListByProject returns tasks for a project with pagination.
// GET /work/projects/{projectId}?status=pending&limit=50&offset=0
func (h *WorkHandler) ListByProject(w http.ResponseWriter, r *http.Request) {
@ -422,3 +428,21 @@ func (h *WorkHandler) Stats(w http.ResponseWriter, r *http.Request) {
api.WriteSuccess(w, r, resp)
}
// formatDuration formats a duration in a human-readable way.
func formatDuration(d interface{ Seconds() float64 }) string {
secs := d.Seconds()
if secs < 60 {
return fmt.Sprintf("%.0fs", secs)
}
mins := secs / 60
if mins < 60 {
return fmt.Sprintf("%.1fm", mins)
}
hours := mins / 60
if hours < 24 {
return fmt.Sprintf("%.1fh", hours)
}
days := hours / 24
return fmt.Sprintf("%.1fd", days)
}

View File

@ -1,85 +0,0 @@
// Package handlers provides HTTP handlers for the rdev API.
package handlers
import (
"fmt"
"github.com/orchard9/rdev/internal/domain"
)
// WorkTaskDTO is the data transfer object for work tasks.
type WorkTaskDTO struct {
ID string `json:"id"`
ProjectID string `json:"project_id"`
Type string `json:"type"`
Spec map[string]any `json:"spec"`
Status string `json:"status"`
Priority int `json:"priority"`
WorkerID string `json:"worker_id,omitempty"`
CallbackURL string `json:"callback_url,omitempty"`
CreatedAt string `json:"created_at"`
StartedAt string `json:"started_at,omitempty"`
CompletedAt string `json:"completed_at,omitempty"`
Result *WorkResultDTO `json:"result,omitempty"`
Error string `json:"error,omitempty"`
RetryCount int `json:"retry_count"`
MaxRetries int `json:"max_retries"`
}
// WorkResultDTO is the data transfer object for work results.
type WorkResultDTO struct {
Output string `json:"output,omitempty"`
Artifacts map[string]string `json:"artifacts,omitempty"`
}
// toWorkTaskDTO converts a domain.WorkTask to a WorkTaskDTO.
func toWorkTaskDTO(t *domain.WorkTask) *WorkTaskDTO {
if t == nil {
return nil
}
dto := &WorkTaskDTO{
ID: t.ID,
ProjectID: t.ProjectID,
Type: string(t.Type),
Spec: t.Spec,
Status: string(t.Status),
Priority: t.Priority,
WorkerID: t.WorkerID,
CallbackURL: t.CallbackURL,
CreatedAt: t.CreatedAt.Format("2006-01-02T15:04:05Z07:00"),
Error: t.Error,
RetryCount: t.RetryCount,
MaxRetries: t.MaxRetries,
}
if t.StartedAt != nil {
dto.StartedAt = t.StartedAt.Format("2006-01-02T15:04:05Z07:00")
}
if t.CompletedAt != nil {
dto.CompletedAt = t.CompletedAt.Format("2006-01-02T15:04:05Z07:00")
}
if t.Result != nil {
dto.Result = &WorkResultDTO{
Output: t.Result.Output,
Artifacts: t.Result.Artifacts,
}
}
return dto
}
// formatDuration formats a duration in a human-readable way.
func formatDuration(d interface{ Seconds() float64 }) string {
secs := d.Seconds()
if secs < 60 {
return fmt.Sprintf("%.0fs", secs)
}
mins := secs / 60
if mins < 60 {
return fmt.Sprintf("%.1fm", mins)
}
hours := mins / 60
if hours < 24 {
return fmt.Sprintf("%.1fh", hours)
}
days := hours / 24
return fmt.Sprintf("%.1fd", days)
}

View File

@ -123,39 +123,6 @@ func (m *mockWorkQueue) GetTask(ctx context.Context, taskID string) (*domain.Wor
return task, nil
}
func (m *mockWorkQueue) List(ctx context.Context, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
if m.err != nil {
return nil, m.err
}
opts.Normalize()
var tasks []*domain.WorkTask
for _, task := range m.tasks {
if status == nil || task.Status == *status {
tasks = append(tasks, task)
}
}
// Apply pagination
total := int64(len(tasks))
if opts.Offset >= len(tasks) {
tasks = nil
} else {
end := opts.Offset + opts.Limit
if end > len(tasks) {
end = len(tasks)
}
tasks = tasks[opts.Offset:end]
}
return &domain.WorkListResult{
Tasks: tasks,
Total: total,
Limit: opts.Limit,
Offset: opts.Offset,
}, nil
}
func (m *mockWorkQueue) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
if m.err != nil {
return nil, m.err

View File

@@ -40,10 +40,6 @@ type WorkQueue interface {
// GetTask retrieves a task by ID.
GetTask(ctx context.Context, taskID string) (*domain.WorkTask, error)
// List returns all tasks with optional status filter and pagination.
// Use for admin/debugging views across all projects.
List(ctx context.Context, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error)
// ListByProject returns tasks for a project with optional status filter and pagination.
ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error)

View File

@@ -91,10 +91,6 @@ func (m *mockWorkQueue) GetTask(ctx context.Context, taskID string) (*domain.Wor
return task, nil
}
func (m *mockWorkQueue) List(ctx context.Context, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
return &domain.WorkListResult{}, nil
}
func (m *mockWorkQueue) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
return &domain.WorkListResult{}, nil
}

View File

@@ -205,11 +205,6 @@ func (s *WorkService) GetTask(ctx context.Context, taskID string) (*domain.WorkT
return s.queue.GetTask(ctx, taskID)
}
// List returns all tasks with optional status filter and pagination.
func (s *WorkService) List(ctx context.Context, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
return s.queue.List(ctx, status, opts)
}
// ListByProject returns tasks for a project with pagination.
func (s *WorkService) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
return s.queue.ListByProject(ctx, projectID, status, opts)

View File

@@ -1,584 +0,0 @@
package worker
import (
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/orchard9/rdev/internal/domain"
)
func TestNewAPIClient_DefaultTimeout(t *testing.T) {
client := NewAPIClient(APIClientConfig{
BaseURL: "http://localhost:8080",
APIKey: "test-key",
})
if client.httpClient.Timeout != 30*time.Second {
t.Errorf("expected default timeout 30s, got %v", client.httpClient.Timeout)
}
}
func TestNewAPIClient_CustomTimeout(t *testing.T) {
client := NewAPIClient(APIClientConfig{
BaseURL: "http://localhost:8080",
APIKey: "test-key",
Timeout: 60 * time.Second,
})
if client.httpClient.Timeout != 60*time.Second {
t.Errorf("expected timeout 60s, got %v", client.httpClient.Timeout)
}
}
func TestRegister_Success(t *testing.T) {
var receivedReq RegisterRequest
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("expected POST, got %s", r.Method)
}
if r.URL.Path != "/workers/register" {
t.Errorf("expected /workers/register, got %s", r.URL.Path)
}
if r.Header.Get("Content-Type") != "application/json" {
t.Errorf("expected Content-Type application/json, got %s", r.Header.Get("Content-Type"))
}
if r.Header.Get("X-API-Key") != "test-key" {
t.Errorf("expected X-API-Key test-key, got %s", r.Header.Get("X-API-Key"))
}
body, _ := io.ReadAll(r.Body)
_ = json.Unmarshal(body, &receivedReq)
w.WriteHeader(http.StatusCreated)
}))
defer server.Close()
client := NewAPIClient(APIClientConfig{
BaseURL: server.URL,
APIKey: "test-key",
})
err := client.Register(context.Background(), &RegisterRequest{
ID: "worker-1",
Hostname: "localhost",
Version: "v1.0.0",
Capabilities: []string{"build", "test"},
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if receivedReq.ID != "worker-1" {
t.Errorf("expected ID worker-1, got %s", receivedReq.ID)
}
if receivedReq.Hostname != "localhost" {
t.Errorf("expected hostname localhost, got %s", receivedReq.Hostname)
}
}
func TestRegister_ErrorStatus(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(`{"error":"invalid worker ID"}`))
}))
defer server.Close()
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
err := client.Register(context.Background(), &RegisterRequest{ID: "bad"})
if err == nil {
t.Fatal("expected error, got nil")
}
if want := "register returned status 400"; !containsSubstring(err.Error(), want) {
t.Errorf("expected error containing %q, got %v", want, err)
}
if !containsSubstring(err.Error(), "invalid worker ID") {
t.Errorf("expected error to contain response body, got %v", err)
}
}
func TestRegister_NetworkError(t *testing.T) {
client := NewAPIClient(APIClientConfig{
BaseURL: "http://localhost:1", // port 1 is valid but almost never listening, so the connection should be refused
Timeout: 100 * time.Millisecond,
})
err := client.Register(context.Background(), &RegisterRequest{ID: "test"})
if err == nil {
t.Fatal("expected error, got nil")
}
if !containsSubstring(err.Error(), "register:") {
t.Errorf("expected error wrapped with 'register:', got %v", err)
}
}
func TestRegister_ContextCanceled(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(1 * time.Second)
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
ctx, cancel := context.WithCancel(context.Background())
cancel() // cancel immediately
err := client.Register(ctx, &RegisterRequest{ID: "test"})
if err == nil {
t.Fatal("expected error, got nil")
}
}
func TestHeartbeat_Success(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("expected POST, got %s", r.Method)
}
if r.URL.Path != "/workers/worker-123/heartbeat" {
t.Errorf("expected /workers/worker-123/heartbeat, got %s", r.URL.Path)
}
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
client := NewAPIClient(APIClientConfig{BaseURL: server.URL, APIKey: "key"})
err := client.Heartbeat(context.Background(), "worker-123")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
func TestHeartbeat_WorkerNotFound(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
_, _ = w.Write([]byte(`{"error":"worker not found"}`))
}))
defer server.Close()
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
err := client.Heartbeat(context.Background(), "unknown-worker")
if err == nil {
t.Fatal("expected error, got nil")
}
if !containsSubstring(err.Error(), "heartbeat returned status 404") {
t.Errorf("expected 404 error, got %v", err)
}
}
func TestClaimTask_Success(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("expected POST, got %s", r.Method)
}
if r.URL.Path != "/workers/worker-1/claim" {
t.Errorf("expected /workers/worker-1/claim, got %s", r.URL.Path)
}
resp := ClaimTaskResponse{
Success: true,
Data: struct {
Task *WorkTaskData `json:"task"`
WorkerID string `json:"worker_id"`
}{
WorkerID: "worker-1",
Task: &WorkTaskData{
ID: "task-123",
ProjectID: "proj-1",
Type: "build",
Status: "running",
Priority: 5,
CreatedAt: "2024-01-15T10:30:00Z",
StartedAt: "2024-01-15T10:31:00Z",
RetryCount: 1,
MaxRetries: 3,
Spec: map[string]any{"prompt": "build it"},
},
},
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp)
}))
defer server.Close()
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
task, err := client.ClaimTask(context.Background(), "worker-1")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if task == nil {
t.Fatal("expected task, got nil")
}
if task.ID != "task-123" {
t.Errorf("expected task ID task-123, got %s", task.ID)
}
if task.ProjectID != "proj-1" {
t.Errorf("expected project ID proj-1, got %s", task.ProjectID)
}
if task.Type != domain.WorkTaskType("build") {
t.Errorf("expected type build, got %s", task.Type)
}
if task.Priority != 5 {
t.Errorf("expected priority 5, got %d", task.Priority)
}
if task.RetryCount != 1 {
t.Errorf("expected retry count 1, got %d", task.RetryCount)
}
}
func TestClaimTask_NoTasksAvailable(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}))
defer server.Close()
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
task, err := client.ClaimTask(context.Background(), "worker-1")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if task != nil {
t.Errorf("expected nil task, got %+v", task)
}
}
func TestClaimTask_NilTaskData(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
resp := ClaimTaskResponse{
Success: true,
Data: struct {
Task *WorkTaskData `json:"task"`
WorkerID string `json:"worker_id"`
}{
WorkerID: "worker-1",
Task: nil,
},
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp)
}))
defer server.Close()
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
_, err := client.ClaimTask(context.Background(), "worker-1")
if err == nil {
t.Fatal("expected error, got nil")
}
if !containsSubstring(err.Error(), "no task data") {
t.Errorf("expected 'no task data' error, got %v", err)
}
}
func TestClaimTask_MalformedJSON(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{invalid json}`))
}))
defer server.Close()
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
_, err := client.ClaimTask(context.Background(), "worker-1")
if err == nil {
t.Fatal("expected error, got nil")
}
if !containsSubstring(err.Error(), "decode response") {
t.Errorf("expected decode error, got %v", err)
}
}
func TestClaimTask_ErrorStatus(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(`{"error":"database error"}`))
}))
defer server.Close()
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
_, err := client.ClaimTask(context.Background(), "worker-1")
if err == nil {
t.Fatal("expected error, got nil")
}
if !containsSubstring(err.Error(), "claim task returned status 500") {
t.Errorf("expected 500 error, got %v", err)
}
}
func TestCompleteTask_Success(t *testing.T) {
var receivedReq CompleteTaskRequest
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("expected POST, got %s", r.Method)
}
if r.URL.Path != "/workers/worker-1/complete/task-123" {
t.Errorf("expected /workers/worker-1/complete/task-123, got %s", r.URL.Path)
}
body, _ := io.ReadAll(r.Body)
_ = json.Unmarshal(body, &receivedReq)
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
client := NewAPIClient(APIClientConfig{BaseURL: server.URL, APIKey: "key"})
result := &domain.BuildResult{
Success: true,
Output: "Build successful",
CommitSHA: "abc123",
FilesChanged: []string{"main.go", "go.mod"},
DurationMs: 5000,
Artifacts: map[string]string{"deploy_url": "https://example.com"},
}
err := client.CompleteTask(context.Background(), "worker-1", "task-123", result)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !receivedReq.Success {
t.Error("expected success=true")
}
if receivedReq.Output != "Build successful" {
t.Errorf("expected output 'Build successful', got %s", receivedReq.Output)
}
if receivedReq.CommitSHA != "abc123" {
t.Errorf("expected commit SHA abc123, got %s", receivedReq.CommitSHA)
}
if receivedReq.DurationMs != 5000 {
t.Errorf("expected duration 5000, got %d", receivedReq.DurationMs)
}
}
func TestCompleteTask_TaskNotFound(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
_, _ = w.Write([]byte(`{"error":"task not found"}`))
}))
defer server.Close()
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
err := client.CompleteTask(context.Background(), "worker-1", "unknown", &domain.BuildResult{})
if err == nil {
t.Fatal("expected error, got nil")
}
if !containsSubstring(err.Error(), "complete task returned status 404") {
t.Errorf("expected 404 error, got %v", err)
}
}
func TestFailTask_Success(t *testing.T) {
var receivedReq FailTaskRequest
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("expected POST, got %s", r.Method)
}
if r.URL.Path != "/workers/worker-1/fail/task-123" {
t.Errorf("expected /workers/worker-1/fail/task-123, got %s", r.URL.Path)
}
body, _ := io.ReadAll(r.Body)
_ = json.Unmarshal(body, &receivedReq)
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
client := NewAPIClient(APIClientConfig{BaseURL: server.URL, APIKey: "key"})
err := client.FailTask(context.Background(), "worker-1", "task-123", "build failed", "error output", 3000)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if receivedReq.Error != "build failed" {
t.Errorf("expected error 'build failed', got %s", receivedReq.Error)
}
if receivedReq.Output != "error output" {
t.Errorf("expected output 'error output', got %s", receivedReq.Output)
}
if receivedReq.DurationMs != 3000 {
t.Errorf("expected duration 3000, got %d", receivedReq.DurationMs)
}
}
func TestFailTask_ErrorStatus(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(`{"error":"database unavailable"}`))
}))
defer server.Close()
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
err := client.FailTask(context.Background(), "worker-1", "task-123", "failed", "", 0)
if err == nil {
t.Fatal("expected error, got nil")
}
if !containsSubstring(err.Error(), "fail task returned status 500") {
t.Errorf("expected 500 error, got %v", err)
}
}
func TestToWorkTask_ParsesTimestamps(t *testing.T) {
data := &WorkTaskData{
ID: "task-1",
ProjectID: "proj-1",
Type: "build",
Status: "running",
CreatedAt: "2024-01-15T10:30:00Z",
StartedAt: "2024-01-15T10:31:00Z",
}
task := data.ToWorkTask()
if task.CreatedAt.IsZero() {
t.Error("expected CreatedAt to be set")
}
if task.CreatedAt.Year() != 2024 || task.CreatedAt.Month() != 1 || task.CreatedAt.Day() != 15 {
t.Errorf("unexpected CreatedAt: %v", task.CreatedAt)
}
if task.StartedAt == nil {
t.Fatal("expected StartedAt to be set")
}
if task.StartedAt.Hour() != 10 || task.StartedAt.Minute() != 31 {
t.Errorf("unexpected StartedAt: %v", task.StartedAt)
}
}
func TestToWorkTask_InvalidTimestamp(t *testing.T) {
data := &WorkTaskData{
ID: "task-1",
ProjectID: "proj-1",
Type: "build",
Status: "running",
CreatedAt: "not-a-timestamp",
StartedAt: "also-not-valid",
}
task := data.ToWorkTask()
// Invalid timestamps currently yield zero values rather than an error.
if !task.CreatedAt.IsZero() {
t.Errorf("expected zero CreatedAt for invalid timestamp, got %v", task.CreatedAt)
}
// Note: the StartedAt parse error is ignored, so the pointer is non-nil but holds a zero time.
}
func TestToWorkTask_EmptyTimestamps(t *testing.T) {
data := &WorkTaskData{
ID: "task-1",
ProjectID: "proj-1",
Type: "build",
Status: "pending",
CreatedAt: "",
StartedAt: "",
}
task := data.ToWorkTask()
if !task.CreatedAt.IsZero() {
t.Errorf("expected zero CreatedAt for empty string, got %v", task.CreatedAt)
}
if task.StartedAt != nil {
t.Errorf("expected nil StartedAt for empty string, got %v", task.StartedAt)
}
}
func TestToWorkTask_NilData(t *testing.T) {
var data *WorkTaskData
task := data.ToWorkTask()
if task != nil {
t.Errorf("expected nil task from nil data, got %+v", task)
}
}
func TestToWorkTask_PreservesAllFields(t *testing.T) {
data := &WorkTaskData{
ID: "task-1",
ProjectID: "proj-1",
Type: "deploy",
Status: "completed",
Priority: 10,
WorkerID: "worker-99",
CallbackURL: "https://callback.example.com",
RetryCount: 2,
MaxRetries: 5,
Spec: map[string]any{"key": "value"},
}
task := data.ToWorkTask()
if task.ID != "task-1" {
t.Errorf("expected ID task-1, got %s", task.ID)
}
if task.ProjectID != "proj-1" {
t.Errorf("expected ProjectID proj-1, got %s", task.ProjectID)
}
if task.Type != domain.WorkTaskType("deploy") {
t.Errorf("expected Type deploy, got %s", task.Type)
}
if task.Status != domain.WorkTaskStatus("completed") {
t.Errorf("expected Status completed, got %s", task.Status)
}
if task.Priority != 10 {
t.Errorf("expected Priority 10, got %d", task.Priority)
}
if task.WorkerID != "worker-99" {
t.Errorf("expected WorkerID worker-99, got %s", task.WorkerID)
}
if task.CallbackURL != "https://callback.example.com" {
t.Errorf("expected CallbackURL, got %s", task.CallbackURL)
}
if task.RetryCount != 2 {
t.Errorf("expected RetryCount 2, got %d", task.RetryCount)
}
if task.MaxRetries != 5 {
t.Errorf("expected MaxRetries 5, got %d", task.MaxRetries)
}
if task.Spec["key"] != "value" {
t.Errorf("expected Spec[key]=value, got %v", task.Spec)
}
}
// containsSubstring reports whether substr occurs anywhere in s.
// It behaves like strings.Contains and avoids adding an import to this test file.
func containsSubstring(s, substr string) bool {
// When substr is longer than s the bound is negative and the loop never runs.
for i := 0; i <= len(s)-len(substr); i++ {
if s[i:i+len(substr)] == substr {
return true
}
}
return false
}

View File

@@ -104,9 +104,6 @@ func (m *mockWorkQueue) GetTask(_ context.Context, taskID string) (*domain.WorkT
}
return task, nil
}
func (m *mockWorkQueue) List(_ context.Context, _ *domain.WorkTaskStatus, _ domain.WorkListOptions) (*domain.WorkListResult, error) {
return &domain.WorkListResult{}, nil
}
func (m *mockWorkQueue) ListByProject(_ context.Context, _ string, _ *domain.WorkTaskStatus, _ domain.WorkListOptions) (*domain.WorkListResult, error) {
return &domain.WorkListResult{}, nil
}

View File

@@ -63,10 +63,6 @@ func (m *mockMaintenanceQueue) GetTask(_ context.Context, _ string) (*domain.Wor
return nil, nil
}
func (m *mockMaintenanceQueue) List(_ context.Context, _ *domain.WorkTaskStatus, _ domain.WorkListOptions) (*domain.WorkListResult, error) {
return nil, nil
}
func (m *mockMaintenanceQueue) ListByProject(_ context.Context, _ string, _ *domain.WorkTaskStatus, _ domain.WorkListOptions) (*domain.WorkListResult, error) {
return nil, nil
}