Compare commits

...

5 Commits

Author SHA1 Message Date
jordan
83b5d1ebb4 ci: trigger rebuild for workService fix
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-07 00:38:12 -07:00
jordan
e58d679e67 fix: add go mod download to component Dockerfiles
Empty go.sum files were causing Docker builds to fail because
Go couldn't verify dependencies. Added go mod download steps
for both pkg and component directories before building.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-06 23:35:02 -07:00
jordan
5d86bb7c57 feat: enable Claude Code OTEL telemetry in claudebox containers
Add OpenTelemetry environment variables to export Claude Code logs
and metrics to the existing OTEL collector. Provides visibility into
long-running builds.

- claudebox-worker: sidecar in rdev-worker deployment
- claudebox-standalone: StatefulSet for direct access

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-06 19:43:47 -07:00
jordan
bc010c4746 feat: add RWX storage class and full SDLC lifecycle cookbook
- Add longhorn-rwx StorageClass for RWX volume support
- Add slackpath-5-full-lifecycle.yaml cookbook tree (all 10 SDLC phases)
- Update worker-pool.md documentation
- Consolidate PVC configuration, remove separate pvc-shared-claude.yaml
- Update rdev-worker and kustomization for new PVC structure

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-06 11:37:57 -07:00
jordan
d74efb75ff fix: wire workService to WorkersHandler and add /work/tasks endpoint
Critical fix: WorkersHandler was missing workService dependency, causing
500 errors when workers tried to fail tasks. This caused tasks to get
stuck in "running" state permanently.

Also adds:
- /work/tasks endpoint for debugging all tasks across projects
- List method to WorkQueue interface for admin views
- HTTP client tests for api_client.go and claudebox/client.go (48 tests)
- Split work.go DTOs into work_dto.go to stay under 500 lines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-06 10:35:39 -07:00
24 changed files with 2403 additions and 169 deletions

View File

@ -14,7 +14,7 @@ Quick reference for rdev concepts and facts.
| Webhooks | [services/webhooks.md](./services/webhooks.md) | High | 2025-01 | Event subscriptions and delivery | | Webhooks | [services/webhooks.md](./services/webhooks.md) | High | 2025-01 | Event subscriptions and delivery |
| **Worker Infrastructure** | | **Worker Infrastructure** |
| Work Queue | [services/work-queue.md](./services/work-queue.md) | High | 2025-01 | Task queue for worker pool | | Work Queue | [services/work-queue.md](./services/work-queue.md) | High | 2025-01 | Task queue for worker pool |
| Worker Pool | [services/worker-pool.md](./services/worker-pool.md) | High | 2026-01 | Embedded work executor with queue maintenance and metrics | | Worker Pool | [services/worker-pool.md](./services/worker-pool.md) | High | 2026-02 | Standalone worker pods with claudebox sidecar, HTTP polling |
| External Health | [services/external-health.md](./services/external-health.md) | High | 2026-02 | Background health monitoring of registry, CI, git | | External Health | [services/external-health.md](./services/external-health.md) | High | 2026-02 | Background health monitoring of registry, CI, git |
| CI Provider | [services/ci-provider.md](./services/ci-provider.md) | High | 2025-01 | Woodpecker auto-activation | | CI Provider | [services/ci-provider.md](./services/ci-provider.md) | High | 2025-01 | Woodpecker auto-activation |
| DNS / Cloudflare | [services/dns-cloudflare.md](./services/dns-cloudflare.md) | High | 2026-01 | Domain management for threesix.ai | | DNS / Cloudflare | [services/dns-cloudflare.md](./services/dns-cloudflare.md) | High | 2026-01 | Domain management for threesix.ai |

View File

@ -1,79 +1,193 @@
# Worker Pool # Worker Pool
**Last Updated:** 2026-01-31 **Last Updated:** 2026-02-06
**Confidence:** High **Confidence:** High
## Summary ## Summary
Shared worker pool that executes build tasks for any project. Currently runs as an embedded WorkExecutor daemon inside rdev-api. Workers register with the worker registry, poll the work queue for tasks, execute Claude Code in pods via kubectl exec. Post-build git operations (commit/push) are programmatic via PodGitOperations, not LLM-driven. Distributed task execution system where standalone worker pods poll rdev-api for tasks and execute them via a claudebox sidecar. Supports horizontal scaling by adding more worker pods.
**Key Facts:** **Key Facts:**
- **LLM vs rdev boundary:** Claude writes code; rdev handles git ops programmatically (no LLM for runbook tasks) - **Architecture:** Pull-based polling (not push/websocket)
- Embedded WorkExecutor daemon runs inside rdev-api process - **Sidecar pattern:** Worker + claudebox in same pod, communicate via localhost HTTP
- Workers poll work queue every 5 seconds, heartbeat every 30 seconds - **Atomic dequeue:** PostgreSQL `FOR UPDATE SKIP LOCKED` prevents duplicate claims
- Stale workers (no heartbeat for 2 minutes) automatically marked offline by QueueMaintenance - **Task types:** `build` (Claude Code prompts), `sdlc` (SDLC commands)
- Stale tasks (running >30 min without completion) automatically requeued - **Scaling:** Add replicas to handle more concurrent tasks
- Old tasks (>7 days) automatically cleaned up - **Resilience:** Stale workers marked offline, stuck tasks re-queued automatically
- Queue depth and worker counts exported as Prometheus metrics
- Future: external worker binary for separate pod deployment
**File Pointers:** ## File Pointers
- Domain: `internal/domain/worker.go` (Worker, WorkerStatus)
- Domain: `internal/domain/build.go` (BuildSpec, BuildResult)
- Port: `internal/port/worker_registry.go` (WorkerRegistry interface)
- Port: `internal/port/build_audit.go` (BuildAudit interface)
- Adapter: `internal/adapter/postgres/worker_registry.go`
- Adapter: `internal/adapter/postgres/build_audit.go`
- Service: `internal/service/worker_service.go`
- Service: `internal/service/build_service.go`
- Executor: `internal/worker/work_executor.go` (poll loop, heartbeat, task routing)
- Executor: `internal/worker/build_executor.go` (BuildSpec→AgentRequest)
- Git: `internal/worker/pod_git_operations.go` (post-build commit/push via kubectl exec)
- Maintenance: `internal/worker/queue_maintenance.go` (stale recovery, cleanup, metrics)
- Handler: `internal/handlers/workers.go` (REST API for workers)
- Handler: `internal/handlers/builds.go` (REST API for builds)
- Handler: `internal/handlers/create_and_build.go` (combined create+build)
- Migration: `internal/db/migrations/012_worker_registry.sql`
## Worker Lifecycle (Embedded) ### Standalone Worker Binary
- **Entry:** `cmd/rdev-worker/main.go` - Main binary, registration, heartbeat, poll loop
- **API Client:** `internal/worker/api_client.go` - HTTP client to rdev-api
- **Build Executor:** `internal/worker/http_build_executor.go` - Execute builds via claudebox
- **SDLC Executor:** `internal/worker/http_sdlc_executor.go` - Execute SDLC tasks via claudebox
1. rdev-api starts → WorkExecutor registers as worker in registry ### Claudebox Sidecar Client
2. Heartbeat loop: every 30s sends heartbeat via WorkerService - **Client:** `internal/adapter/claudebox/client.go` - HTTP client to claudebox sidecar
3. Poll loop: every 5s dequeues next task from work queue - **Endpoints:** `/health`, `/execute`, `/git/clone`, `/git/commit-and-push`, `/sdlc`
4. BuildExecutor: executes CodeAgent in pod, then programmatically commits/pushes if auto_commit
5. Reports completion with BuildResult via WorkerService ### rdev-api Server-Side
6. Graceful shutdown: deregisters worker on rdev-api stop - **Handlers:** `internal/handlers/workers.go` - `/workers/*` endpoints
- **Service:** `internal/service/worker_service.go` - Claim, complete, fail logic
- **Registry:** `internal/adapter/postgres/worker_registry.go` - Worker state persistence
- **Queue:** `internal/adapter/postgres/work_queue.go` - Task queue with atomic dequeue
### Domain
- **Worker:** `internal/domain/worker.go` - Worker, WorkerStatus
- **Task:** `internal/domain/work.go` - WorkTask, WorkTaskType, WorkTaskStatus
- **Build:** `internal/domain/build.go` - BuildSpec, BuildResult
### Kubernetes
- **Deployment:** `deployments/k8s/base/rdev-worker.yaml` - Worker + claudebox pod spec
## Architecture
```
┌─────────────────────┐ HTTP Polling (5s) ┌──────────────────────────┐
│ rdev-api │◄────────────────────────────────►│ Worker Pod │
│ │ │ ┌─────────┐ ┌─────────┐ │
│ POST /workers/register ← Register at startup │ │ worker │→│claudebox│ │
│ POST /workers/{id}/heartbeat ← Every 30s │ └─────────┘ └─────────┘ │
│ POST /workers/{id}/claim ← Poll for tasks │ ↓ HTTP localhost │
│ POST /workers/{id}/complete/{taskId} ← Success │ Claude Code execution │
│ POST /workers/{id}/fail/{taskId} ← Failure └──────────────────────────┘
│ │
│ PostgreSQL │
│ ├─ workers │ (worker registry)
│ ├─ work_queue │ (task queue)
│ └─ build_audit │ (execution history)
└─────────────────────┘
```
## Worker Lifecycle
1. **Register:** Worker pod starts → `POST /workers/register` with ID, hostname, capabilities
2. **Heartbeat:** Every 30s → `POST /workers/{id}/heartbeat` to stay alive
3. **Poll:** Every 5s → `POST /workers/{id}/claim` to get next task
4. **Execute:** Call claudebox sidecar HTTP API to run Claude Code / SDLC commands
5. **Report:** `POST /workers/{id}/complete/{taskId}` or `/fail/{taskId}` with results
6. **Shutdown:** Graceful wait for in-flight tasks via `sync.WaitGroup`
## Worker Statuses ## Worker Statuses
- `idle` - available for new tasks | Status | Meaning |
- `busy` - currently executing a task |--------|---------|
- `draining` - not accepting new tasks (pre-shutdown) | `idle` | Ready to claim new tasks |
- `offline` - missed heartbeat threshold | `busy` | Currently executing a task |
| `draining` | Not accepting new tasks (pre-shutdown) |
| `offline` | Missed heartbeat threshold (>90s) |
## Task Types
### Build Tasks (`WorkTaskTypeBuild`)
Execute Claude Code prompts with optional git operations.
**Spec:**
```json
{
"prompt": "Build a React app with...",
"auto_commit": true,
"auto_push": false,
"git_clone_url": "https://gitea.../repo.git"
}
```
**Execution Flow:**
1. Clone repo via `claudebox /git/clone`
2. Execute prompt via `claudebox /execute` (streaming)
3. Commit/push via `claudebox /git/commit-and-push`
### SDLC Tasks (`WorkTaskTypeSDLC`)
Execute SDLC CLI commands.
**Spec:**
```json
{
"command": "feature",
"args": ["init", "feature-name"],
"git_clone_url": "https://gitea.../repo.git"
}
```
**Execution Flow:**
1. Clone repo via `claudebox /git/clone`
2. Run SDLC command via `claudebox /sdlc`
3. Commit/push changes
## API Endpoints ## API Endpoints
| Method | Path | Description | | Method | Path | Description |
|--------|------|-------------| |--------|------|-------------|
| GET | `/workers` | List all workers with status summary | | POST | `/workers/register` | Register new worker |
| GET | `/workers/{workerId}` | Get worker details | | POST | `/workers/{id}/heartbeat` | Keep worker alive |
| POST | `/workers/{workerId}/drain` | Set worker to draining | | POST | `/workers/{id}/claim` | Claim next available task (204 if none) |
| POST | `/projects/{id}/builds` | Start build for project | | POST | `/workers/{id}/complete/{taskId}` | Report successful completion |
| GET | `/projects/{id}/builds` | List builds for project | | POST | `/workers/{id}/fail/{taskId}` | Report failure |
| GET | `/builds/{taskId}` | Get build status | | GET | `/workers` | List all workers |
| POST | `/project/create-and-build` | Create project + start build | | GET | `/workers/{id}` | Get worker details |
| POST | `/workers/{id}/drain` | Set worker to draining |
## Kubernetes Deployment
```yaml
# deployments/k8s/base/rdev-worker.yaml
spec:
replicas: 1 # Scale by increasing
strategy:
type: RollingUpdate # RWX PVC enables multi-pod mounts
rollingUpdate:
maxSurge: 2
maxUnavailable: 0
containers:
- name: worker
image: registry.threesix.ai/rdev/worker:latest
env:
- RDEV_API_URL: http://rdev-api.rdev.svc.cluster.local:8080
- CLAUDEBOX_URL: http://localhost:8080
- WORKER_POLL_INTERVAL: 5s
- WORKER_HEARTBEAT_INTERVAL: 30s
- WORKER_TASK_TIMEOUT: 15m
- name: claudebox
image: registry.threesix.ai/rdev/claudebox:latest
volumeMounts:
- /workspace (EmptyDir)
- /root/.claude (RWX PVC - shared Claude auth)
```
**Storage:** The `claudebox-claude-config` PVC uses `ReadWriteMany` (RWX) access mode with Longhorn NFS, allowing multiple worker pods to share Claude OAuth credentials.
## Error Classification
Failed tasks are classified for smart retry logic:
| Code | Trigger | Retryable |
|------|---------|-----------|
| `RATE_LIMITED` | "rate limit", "quota exceeded" | Yes (with backoff) |
| `AUTH_FAILED` | "unauthorized", "invalid api key" | No |
| `TIMEOUT` | "context deadline exceeded" | Yes |
| `AGENT_ERROR` | Generic error | Yes (limited retries) |
## Queue Maintenance ## Queue Maintenance
The QueueMaintenance worker runs inside rdev-api alongside the WorkExecutor: Background goroutine in rdev-api:
- **Stale task recovery** (every 1m): Requeues tasks running >30m without completion. Also syncs build_audit status to "pending" so API correctly reflects requeued state. - **Stale worker marking:** Workers without heartbeat >90s → `offline`
- **Stale worker marking** (every 1m): Marks workers offline after 2m without heartbeat - **Stale task recovery:** Tasks running >30m without completion → re-queued
- **Old task cleanup** (every 1m): Removes completed/failed/cancelled tasks >7 days old - **Old task cleanup:** Completed/failed tasks >7 days → deleted
- **Metrics refresh** (every 15s): Updates Prometheus gauges for queue depth and worker counts - **Metrics refresh:** Queue depth and worker counts → Prometheus
**Build Audit Sync:** When stale tasks are requeued, both `work_queue` and `build_audit` tables are updated atomically. This prevents builds from appearing stuck in "running" when the underlying task has been requeued for retry due to worker death. ## Graceful Shutdown
Worker uses `sync.WaitGroup` to track in-flight tasks:
1. Receive SIGTERM/SIGINT
2. Cancel context (stops polling)
3. Wait for WaitGroup with timeout (`WORKER_TASK_TIMEOUT`)
4. Log success or timeout warning
## Related Topics ## Related Topics
- [Work Queue](./work-queue.md) - [Work Queue](./work-queue.md) - Task queue implementation
- [Build Orchestration](../features/build-orchestration.md) - [Build Orchestration](../features/build-orchestration.md) - Build API and specs
- [SDLC Orchestration](./sdlc.md) - SDLC task integration

View File

@ -423,7 +423,7 @@ func main() {
agentsHandler := handlers.NewAgentsHandler(agentRegistry) agentsHandler := handlers.NewAgentsHandler(agentRegistry)
// Initialize worker pool handlers // Initialize worker pool handlers
workersHandler := handlers.NewWorkersHandler(workerService) workersHandler := handlers.NewWorkersHandler(workerService).WithWorkService(workService)
buildsHandler := handlers.NewBuildsHandler(buildService) buildsHandler := handlers.NewBuildsHandler(buildService)
createAndBuildHandler := handlers.NewCreateAndBuildHandler(projectInfraService, buildService) createAndBuildHandler := handlers.NewCreateAndBuildHandler(projectInfraService, buildService)

View File

@ -0,0 +1,536 @@
name: full-lifecycle
description: "Slack Path 5: The Full Lifecycle. Tests all 10 SDLC phases with explicit artifact approvals."
version: 1
vars:
project_name: ""
feature_slug: "user-preferences"
feature_title: "User Preferences API"
steps:
# ============================================================
# INFRASTRUCTURE
# ============================================================
create-project:
action: api
method: POST
endpoint: /project
body:
name: "{{ .vars.project_name }}"
description: "Slack Path 5: Full SDLC Lifecycle"
outputs:
- project_id: .data.name
- domain: .data.domain
add-db:
description: Add database for preferences storage
depends_on: [create-project]
on_error: continue
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/components"
body:
type: postgres
name: "main-db"
add-service:
description: Add API service
depends_on: [add-db]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/components"
body:
type: service
name: "preferences-api"
wait-init:
depends_on: [add-service]
action: wait_pipeline
project_id: "{{ .outputs.create-project.project_id }}"
# ============================================================
# PHASE 1: DRAFT
# Create feature (starts in draft phase)
# ============================================================
create-feature:
description: "Create feature in draft phase"
depends_on: [wait-init]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features"
body:
slug: "{{ .vars.feature_slug }}"
title: "{{ .vars.feature_title }}"
outputs:
- feature_phase: .data.phase
verify-draft:
description: "Verify feature is in draft phase"
depends_on: [create-feature]
action: shell
command: |
PHASE="{{ .outputs.create-feature.feature_phase }}"
if [ "$PHASE" == "draft" ]; then
echo "Feature created in draft phase"
exit 0
else
echo "Expected draft, got $PHASE"
exit 1
fi
# ============================================================
# PHASE 2: DRAFT → SPECIFIED
# Agent writes spec, API approves, transition
# ============================================================
write-spec:
description: "Agent writes the spec artifact"
depends_on: [verify-draft]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/builds"
body:
prompt: "/spec-feature {{ .vars.feature_slug }} --requirements 'CRUD API for user preferences. GET/PUT /preferences/{user_id}. Preferences are key-value pairs stored in DB. Support theme, language, notifications settings.'"
auto_commit: true
auto_push: true
git_clone_url: "https://git.threesix.ai/jordan/{{ .outputs.create-project.project_id }}.git"
outputs:
- build_id: .data.task_id
wait-spec:
depends_on: [write-spec]
action: wait_build
build_id: "{{ .outputs.write-spec.build_id }}"
max_attempts: 60
poll_interval: 5
approve-spec:
description: "API approves the spec artifact"
depends_on: [wait-spec]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/artifacts/spec/approve"
body:
comment: "Spec approved by automation"
transition-to-specified:
description: "Transition from draft to specified"
depends_on: [approve-spec]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/transition"
body:
phase: "specified"
outputs:
- new_phase: .data.phase
# ============================================================
# PHASE 3: SPECIFIED → PLANNED
# Agent writes design, tasks, qa_plan. API approves each.
# ============================================================
write-design:
description: "Agent writes the design artifact"
depends_on: [transition-to-specified]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/builds"
body:
prompt: "/design-feature {{ .vars.feature_slug }}"
auto_commit: true
auto_push: true
git_clone_url: "https://git.threesix.ai/jordan/{{ .outputs.create-project.project_id }}.git"
outputs:
- build_id: .data.task_id
wait-design:
depends_on: [write-design]
action: wait_build
build_id: "{{ .outputs.write-design.build_id }}"
max_attempts: 60
poll_interval: 5
approve-design:
description: "API approves the design artifact"
depends_on: [wait-design]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/artifacts/design/approve"
body:
comment: "Design approved by automation"
write-tasks:
description: "Agent breaks down into tasks"
depends_on: [approve-design]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/builds"
body:
prompt: "/breakdown-feature {{ .vars.feature_slug }}"
auto_commit: true
auto_push: true
git_clone_url: "https://git.threesix.ai/jordan/{{ .outputs.create-project.project_id }}.git"
outputs:
- build_id: .data.task_id
wait-tasks:
depends_on: [write-tasks]
action: wait_build
build_id: "{{ .outputs.write-tasks.build_id }}"
max_attempts: 60
poll_interval: 5
approve-tasks:
description: "API approves the tasks artifact"
depends_on: [wait-tasks]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/artifacts/tasks/approve"
body:
comment: "Tasks approved by automation"
write-qa-plan:
description: "Agent writes QA plan"
depends_on: [approve-tasks]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/builds"
body:
prompt: "/create-qa-plan {{ .vars.feature_slug }}"
auto_commit: true
auto_push: true
git_clone_url: "https://git.threesix.ai/jordan/{{ .outputs.create-project.project_id }}.git"
outputs:
- build_id: .data.task_id
wait-qa-plan:
depends_on: [write-qa-plan]
action: wait_build
build_id: "{{ .outputs.write-qa-plan.build_id }}"
max_attempts: 60
poll_interval: 5
approve-qa-plan:
description: "API approves the QA plan artifact"
depends_on: [wait-qa-plan]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/artifacts/qa_plan/approve"
body:
comment: "QA plan approved by automation"
transition-to-planned:
description: "Transition from specified to planned"
depends_on: [approve-qa-plan]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/transition"
body:
phase: "planned"
outputs:
- new_phase: .data.phase
# ============================================================
# PHASE 4: PLANNED → READY
# No new artifacts needed, just transition
# ============================================================
transition-to-ready:
description: "Transition from planned to ready"
depends_on: [transition-to-planned]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/transition"
body:
phase: "ready"
outputs:
- new_phase: .data.phase
# ============================================================
# PHASE 5: READY → IMPLEMENTATION
# Agent implements all tasks
# ============================================================
implement-feature:
description: "Agent implements all tasks for the feature"
depends_on: [transition-to-ready]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/builds"
body:
prompt: "/implement-feature {{ .vars.feature_slug }}"
auto_commit: true
auto_push: true
git_clone_url: "https://git.threesix.ai/jordan/{{ .outputs.create-project.project_id }}.git"
outputs:
- build_id: .data.task_id
wait-implement:
depends_on: [implement-feature]
action: wait_build
build_id: "{{ .outputs.implement-feature.build_id }}"
max_attempts: 120
poll_interval: 5
wait-deploy-impl:
description: "Wait for implementation to deploy"
depends_on: [wait-implement]
action: wait_pipeline
project_id: "{{ .outputs.create-project.project_id }}"
transition-to-implementation:
description: "Transition to implementation phase (marks code complete)"
depends_on: [wait-deploy-impl]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/transition"
body:
phase: "implementation"
outputs:
- new_phase: .data.phase
# ============================================================
# PHASE 6: IMPLEMENTATION → REVIEW
# Agent writes code review
# ============================================================
write-review:
description: "Agent writes code review"
depends_on: [transition-to-implementation]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/builds"
body:
prompt: "/review-feature {{ .vars.feature_slug }}"
auto_commit: true
auto_push: true
git_clone_url: "https://git.threesix.ai/jordan/{{ .outputs.create-project.project_id }}.git"
outputs:
- build_id: .data.task_id
wait-review:
depends_on: [write-review]
action: wait_build
build_id: "{{ .outputs.write-review.build_id }}"
max_attempts: 60
poll_interval: 5
approve-review:
description: "API approves the review"
depends_on: [wait-review]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/artifacts/review/approve"
body:
comment: "Review approved by automation"
transition-to-review:
description: "Transition to review phase"
depends_on: [approve-review]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/transition"
body:
phase: "review"
outputs:
- new_phase: .data.phase
# ============================================================
# PHASE 7: REVIEW → AUDIT
# Agent writes security/architecture audit
# ============================================================
write-audit:
description: "Agent writes security audit"
depends_on: [transition-to-review]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/builds"
body:
prompt: "/audit-feature {{ .vars.feature_slug }}"
auto_commit: true
auto_push: true
git_clone_url: "https://git.threesix.ai/jordan/{{ .outputs.create-project.project_id }}.git"
outputs:
- build_id: .data.task_id
wait-audit:
depends_on: [write-audit]
action: wait_build
build_id: "{{ .outputs.write-audit.build_id }}"
max_attempts: 60
poll_interval: 5
approve-audit:
description: "API approves the audit"
depends_on: [wait-audit]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/artifacts/audit/approve"
body:
comment: "Audit approved by automation"
transition-to-audit:
description: "Transition to audit phase"
depends_on: [approve-audit]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/transition"
body:
phase: "audit"
outputs:
- new_phase: .data.phase
# ============================================================
# PHASE 8: AUDIT → QA
# Agent runs QA tests
# ============================================================
run-qa:
description: "Agent runs QA plan"
depends_on: [transition-to-audit]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/builds"
body:
prompt: "/run-qa {{ .vars.feature_slug }}"
auto_commit: true
auto_push: true
git_clone_url: "https://git.threesix.ai/jordan/{{ .outputs.create-project.project_id }}.git"
outputs:
- build_id: .data.task_id
wait-qa:
depends_on: [run-qa]
action: wait_build
build_id: "{{ .outputs.run-qa.build_id }}"
max_attempts: 60
poll_interval: 5
transition-to-qa:
description: "Transition to QA phase"
depends_on: [wait-qa]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/transition"
body:
phase: "qa"
outputs:
- new_phase: .data.phase
# ============================================================
# PHASE 9: QA → MERGE
# Merge feature branch to main
# ============================================================
merge-feature:
description: "Merge feature branch to main"
depends_on: [transition-to-qa]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/merge"
body:
strategy: "squash"
outputs:
- merge_commit: .data.commit_sha
transition-to-merge:
description: "Transition to merge phase"
depends_on: [merge-feature]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/transition"
body:
phase: "merge"
outputs:
- new_phase: .data.phase
# ============================================================
# PHASE 10: MERGE → RELEASED
# Archive the feature
# ============================================================
wait-final-deploy:
description: "Wait for merged code to deploy"
depends_on: [transition-to-merge]
action: wait_pipeline
project_id: "{{ .outputs.create-project.project_id }}"
archive-feature:
description: "Archive the completed feature"
depends_on: [wait-final-deploy]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/archive"
transition-to-released:
description: "Transition to released phase"
depends_on: [archive-feature]
action: api
method: POST
endpoint: "/projects/{{ .outputs.create-project.project_id }}/sdlc/features/{{ .vars.feature_slug }}/transition"
body:
phase: "released"
outputs:
- final_phase: .data.phase
# ============================================================
# VERIFICATION
# ============================================================
verify-service-running:
description: "Verify the preferences API is running"
depends_on: [transition-to-released]
action: shell
command: |
DOMAIN="{{ .outputs.create-project.domain }}"
HEALTH=$(curl -s "https://$DOMAIN/api/preferences-api/health" | jq -r '.data.status // empty')
if [ "$HEALTH" == "healthy" ]; then
echo "Service healthy"
exit 0
else
echo "Service not healthy: $HEALTH"
exit 1
fi
verify-preferences-api:
description: "Test CRUD operations on preferences"
depends_on: [verify-service-running]
on_error: continue
action: shell
command: |
DOMAIN="{{ .outputs.create-project.domain }}"
BASE_URL="https://$DOMAIN/api/preferences-api"
USER_ID="test-user-123"
# PUT preferences
echo "Setting preferences..."
PUT_RESP=$(curl -s -X PUT "$BASE_URL/preferences/$USER_ID" \
-H "Content-Type: application/json" \
-d '{"theme":"dark","language":"en","notifications":true}')
echo "PUT response: $PUT_RESP"
# GET preferences
echo "Getting preferences..."
GET_RESP=$(curl -s "$BASE_URL/preferences/$USER_ID")
echo "GET response: $GET_RESP"
# Verify theme is dark
THEME=$(echo "$GET_RESP" | jq -r '.theme // .data.theme // empty')
if [ "$THEME" == "dark" ]; then
echo "Preferences API working correctly"
exit 0
else
echo "Expected theme=dark, got: $THEME"
exit 1
fi
verify-lifecycle-complete:
description: "Verify feature reached released phase"
depends_on: [verify-preferences-api]
action: shell
command: |
FINAL_PHASE="{{ .outputs.transition-to-released.final_phase }}"
if [ "$FINAL_PHASE" == "released" ]; then
echo "SUCCESS: Feature completed full lifecycle (draft → released)"
echo "All 10 phases traversed with explicit approvals"
exit 0
else
echo "FAIL: Expected released, got $FINAL_PHASE"
exit 1
fi
teardown:
- action: api
method: DELETE
endpoint: "/project/{{ .outputs.create-project.project_id }}"

View File

@ -25,6 +25,25 @@ spec:
image: registry.threesix.ai/rdev/claudebox:latest image: registry.threesix.ai/rdev/claudebox:latest
imagePullPolicy: Always imagePullPolicy: Always
env:
# Claude Code Telemetry - exports to OTEL collector
- name: CLAUDE_CODE_ENABLE_TELEMETRY
value: "1"
- name: OTEL_METRICS_EXPORTER
value: "otlp"
- name: OTEL_LOGS_EXPORTER
value: "otlp"
- name: OTEL_EXPORTER_OTLP_PROTOCOL
value: "grpc"
- name: OTEL_EXPORTER_OTLP_ENDPOINT
value: "otel-collector.observability.svc.cluster.local:4317"
- name: OTEL_SERVICE_NAME
value: "claudebox-standalone"
- name: OTEL_METRIC_EXPORT_INTERVAL
value: "10000"
- name: OTEL_LOGS_EXPORT_INTERVAL
value: "5000"
resources: resources:
requests: requests:
cpu: "500m" cpu: "500m"

View File

@ -6,9 +6,11 @@ namespace: rdev
resources: resources:
- namespace.yaml - namespace.yaml
# Storage classes (must be applied before PVCs)
- storageclass-rwx.yaml
# Shared worker claudebox (runs all project builds) # Shared worker claudebox (runs all project builds)
- pvc.yaml - pvc.yaml
- pvc-shared-claude.yaml
- claudebox.yaml - claudebox.yaml
- configmaps.yaml - configmaps.yaml

View File

@ -1,29 +0,0 @@
# Shared Claude credentials PVC
# v0.6 - All claudebox pods share this for auth
# Commands/skills/agents live in /workspace/.claude (per-project, in git)
#
# IMPORTANT: ReadWriteMany (RWX) requires Longhorn with NFS enabled.
# Verify with: kubectl get settings -n longhorn-system rwx-volume-fast-failover
# If RWX is not available, either:
# 1. Enable Longhorn NFS: kubectl apply -f longhorn-nfs-provisioner.yaml
# 2. Or use separate PVCs per pod (revert to per-project claude-config PVCs)
#
# RWX is needed because multiple claudebox pods mount this simultaneously
# to share Claude authentication credentials.
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: claudebox-shared-claude-config
namespace: rdev
labels:
app.kubernetes.io/name: claudebox
app.kubernetes.io/part-of: rdev
rdev.orchard9.ai/type: shared-config
spec:
accessModes:
- ReadWriteMany # Multiple pods can mount simultaneously
storageClassName: longhorn
resources:
requests:
storage: 1Gi

View File

@ -14,6 +14,12 @@ spec:
requests: requests:
storage: 20Gi storage: 20Gi
--- ---
# Claude config PVC - shared across claudebox and worker pods
# RWX (ReadWriteMany) allows multiple pods to mount simultaneously
# Contains Claude subscription OAuth credentials (~/.claude)
#
# IMPORTANT: Requires longhorn-rwx StorageClass (see storageclass-rwx.yaml)
# After recreating this PVC, re-authenticate with: claude login
apiVersion: v1 apiVersion: v1
kind: PersistentVolumeClaim kind: PersistentVolumeClaim
metadata: metadata:
@ -22,10 +28,11 @@ metadata:
labels: labels:
app.kubernetes.io/name: claudebox app.kubernetes.io/name: claudebox
app.kubernetes.io/part-of: rdev app.kubernetes.io/part-of: rdev
rdev.orchard9.ai/type: shared-config
spec: spec:
accessModes: accessModes:
- ReadWriteOnce - ReadWriteMany
storageClassName: longhorn storageClassName: longhorn-rwx
resources: resources:
requests: requests:
storage: 1Gi storage: 1Gi

View File

@ -10,10 +10,13 @@ metadata:
app.kubernetes.io/part-of: rdev app.kubernetes.io/part-of: rdev
spec: spec:
replicas: 1 replicas: 1
# Recreate strategy required: claudebox-claude-config PVC is RWO (ReadWriteOnce) # RollingUpdate enabled by RWX (ReadWriteMany) PVC for claude-config
# and cannot be attached to multiple pods simultaneously # See: deployments/k8s/base/pvc.yaml and storageclass-rwx.yaml
strategy: strategy:
type: Recreate type: RollingUpdate
rollingUpdate:
maxSurge: 2
maxUnavailable: 0
selector: selector:
matchLabels: matchLabels:
app: rdev-worker app: rdev-worker
@ -91,6 +94,23 @@ spec:
value: "rdev-worker" value: "rdev-worker"
- name: GIT_EMAIL - name: GIT_EMAIL
value: "worker@threesix.ai" value: "worker@threesix.ai"
# Claude Code Telemetry - exports to OTEL collector
- name: CLAUDE_CODE_ENABLE_TELEMETRY
value: "1"
- name: OTEL_METRICS_EXPORTER
value: "otlp"
- name: OTEL_LOGS_EXPORTER
value: "otlp"
- name: OTEL_EXPORTER_OTLP_PROTOCOL
value: "grpc"
- name: OTEL_EXPORTER_OTLP_ENDPOINT
value: "otel-collector.observability.svc.cluster.local:4317"
- name: OTEL_SERVICE_NAME
value: "claudebox-worker"
- name: OTEL_METRIC_EXPORT_INTERVAL
value: "10000"
- name: OTEL_LOGS_EXPORT_INTERVAL
value: "5000"
ports: ports:
- name: http - name: http

View File

@ -0,0 +1,24 @@
# RWX (ReadWriteMany) StorageClass for shared volumes
# Enables multiple pods to mount the same PVC simultaneously
# Used for: claudebox-claude-config (shared Claude auth credentials)
#
# Prerequisites:
# - Longhorn 1.4.0+ with NFS support
# - Verify: kubectl get settings -n longhorn-system | grep -i rwx
#
# If RWX is not available, enable it:
# kubectl patch -n longhorn-system settings rwx-volume-fast-failover --type merge -p '{"value":"true"}'
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: longhorn-rwx
labels:
app.kubernetes.io/part-of: rdev
provisioner: driver.longhorn.io
allowVolumeExpansion: true
reclaimPolicy: Retain
parameters:
numberOfReplicas: "2"
staleReplicaTimeout: "30"
nfsOptions: "vers=4.1,noresvport"

View File

@ -0,0 +1,774 @@
package claudebox
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"
)
func TestNewClient_DefaultTimeout(t *testing.T) {
client := NewClient(ClientConfig{
BaseURL: "http://localhost:8080",
})
if client.httpClient.Timeout != 10*time.Minute {
t.Errorf("expected default timeout 10m, got %v", client.httpClient.Timeout)
}
}
func TestNewClient_CustomTimeout(t *testing.T) {
client := NewClient(ClientConfig{
BaseURL: "http://localhost:8080",
Timeout: 5 * time.Minute,
})
if client.httpClient.Timeout != 5*time.Minute {
t.Errorf("expected timeout 5m, got %v", client.httpClient.Timeout)
}
}
func TestNewClient_TrimsTrailingSlash(t *testing.T) {
client := NewClient(ClientConfig{
BaseURL: "http://localhost:8080/",
})
if client.baseURL != "http://localhost:8080" {
t.Errorf("expected trailing slash trimmed, got %s", client.baseURL)
}
}
func TestHealth_Success(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
t.Errorf("expected GET, got %s", r.Method)
}
if r.URL.Path != "/health" {
t.Errorf("expected /health, got %s", r.URL.Path)
}
resp := HealthResponse{
Status: "healthy",
Timestamp: "2024-01-15T10:30:00Z",
WorkDir: "/workspace",
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp)
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
health, err := client.Health(context.Background())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if health.Status != "healthy" {
t.Errorf("expected status 'healthy', got %s", health.Status)
}
if health.WorkDir != "/workspace" {
t.Errorf("expected work_dir '/workspace', got %s", health.WorkDir)
}
}
func TestHealth_Unhealthy(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusServiceUnavailable)
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
_, err := client.Health(context.Background())
if err == nil {
t.Fatal("expected error, got nil")
}
if !strings.Contains(err.Error(), "health check returned status 503") {
t.Errorf("expected 503 error, got %v", err)
}
}
func TestHealth_NetworkError(t *testing.T) {
client := NewClient(ClientConfig{
BaseURL: "http://localhost:1",
Timeout: 100 * time.Millisecond,
})
_, err := client.Health(context.Background())
if err == nil {
t.Fatal("expected error, got nil")
}
if !strings.Contains(err.Error(), "health check:") {
t.Errorf("expected error wrapped with 'health check:', got %v", err)
}
}
func TestExecute_Success(t *testing.T) {
var receivedReq ExecuteRequest
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("expected POST, got %s", r.Method)
}
if r.URL.Path != "/execute" {
t.Errorf("expected /execute, got %s", r.URL.Path)
}
if r.Header.Get("Content-Type") != "application/json" {
t.Errorf("expected Content-Type application/json, got %s", r.Header.Get("Content-Type"))
}
body, _ := io.ReadAll(r.Body)
_ = json.Unmarshal(body, &receivedReq)
resp := ExecuteResponse{
Success: true,
Output: "Task completed",
ExitCode: 0,
DurationMs: 5000,
SessionID: "session-123",
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp)
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
req := &ExecuteRequest{
Prompt: "Build the project",
AllowedTools: []string{"Bash", "Read", "Write"},
WorkingDir: "/workspace/project",
Timeout: 300,
Metadata: map[string]string{"task_id": "task-1"},
}
result, err := client.Execute(context.Background(), req)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !result.Success {
t.Error("expected success=true")
}
if result.Output != "Task completed" {
t.Errorf("expected output 'Task completed', got %s", result.Output)
}
if result.ExitCode != 0 {
t.Errorf("expected exit code 0, got %d", result.ExitCode)
}
// Verify request was serialized correctly
if receivedReq.Prompt != "Build the project" {
t.Errorf("expected prompt 'Build the project', got %s", receivedReq.Prompt)
}
if len(receivedReq.AllowedTools) != 3 {
t.Errorf("expected 3 allowed tools, got %d", len(receivedReq.AllowedTools))
}
}
func TestExecute_ErrorStatus(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(`{"error":"invalid prompt"}`))
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
_, err := client.Execute(context.Background(), &ExecuteRequest{Prompt: ""})
if err == nil {
t.Fatal("expected error, got nil")
}
if !strings.Contains(err.Error(), "execute returned status 400") {
t.Errorf("expected 400 error, got %v", err)
}
if !strings.Contains(err.Error(), "invalid prompt") {
t.Errorf("expected error body in message, got %v", err)
}
}
func TestExecute_MalformedResponse(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{invalid json`))
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
_, err := client.Execute(context.Background(), &ExecuteRequest{Prompt: "test"})
if err == nil {
t.Fatal("expected error, got nil")
}
if !strings.Contains(err.Error(), "decode response") {
t.Errorf("expected decode error, got %v", err)
}
}
func TestExecuteStream_Success(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("expected POST, got %s", r.Method)
}
if r.URL.Path != "/execute/stream" {
t.Errorf("expected /execute/stream, got %s", r.URL.Path)
}
if r.Header.Get("Accept") != "text/event-stream" {
t.Errorf("expected Accept text/event-stream, got %s", r.Header.Get("Accept"))
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
flusher, ok := w.(http.Flusher)
if !ok {
t.Fatal("expected http.Flusher")
}
events := []StreamEvent{
{Type: "start", Timestamp: "2024-01-15T10:30:00Z"},
{Type: "output", Content: "Building...", Timestamp: "2024-01-15T10:30:01Z"},
{Type: "tool_call", ToolName: "Bash", Timestamp: "2024-01-15T10:30:02Z"},
{Type: "complete", Content: "Done", Timestamp: "2024-01-15T10:30:05Z"},
}
for _, event := range events {
data, _ := json.Marshal(event)
fmt.Fprintf(w, "data: %s\n\n", data)
flusher.Flush()
}
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
var receivedEvents []StreamEvent
var mu sync.Mutex
err := client.ExecuteStream(context.Background(), &ExecuteRequest{Prompt: "build"}, func(event StreamEvent) {
mu.Lock()
receivedEvents = append(receivedEvents, event)
mu.Unlock()
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(receivedEvents) != 4 {
t.Fatalf("expected 4 events, got %d", len(receivedEvents))
}
if receivedEvents[0].Type != "start" {
t.Errorf("expected first event type 'start', got %s", receivedEvents[0].Type)
}
if receivedEvents[1].Content != "Building..." {
t.Errorf("expected second event content 'Building...', got %s", receivedEvents[1].Content)
}
if receivedEvents[2].ToolName != "Bash" {
t.Errorf("expected third event tool name 'Bash', got %s", receivedEvents[2].ToolName)
}
}
func TestExecuteStream_SkipsMalformedEvents(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
flusher, _ := w.(http.Flusher)
// Valid event
event1, _ := json.Marshal(StreamEvent{Type: "start"})
fmt.Fprintf(w, "data: %s\n\n", event1)
flusher.Flush()
// Malformed JSON - should be skipped
fmt.Fprintf(w, "data: {invalid json}\n\n")
flusher.Flush()
// Empty data - should be skipped
fmt.Fprintf(w, "data: \n\n")
flusher.Flush()
// Non-data line - should be skipped
fmt.Fprintf(w, "event: ping\n\n")
flusher.Flush()
// Valid event
event2, _ := json.Marshal(StreamEvent{Type: "complete"})
fmt.Fprintf(w, "data: %s\n\n", event2)
flusher.Flush()
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
var receivedEvents []StreamEvent
var mu sync.Mutex
err := client.ExecuteStream(context.Background(), &ExecuteRequest{Prompt: "test"}, func(event StreamEvent) {
mu.Lock()
receivedEvents = append(receivedEvents, event)
mu.Unlock()
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// Should only receive the 2 valid events
if len(receivedEvents) != 2 {
t.Fatalf("expected 2 events (malformed skipped), got %d", len(receivedEvents))
}
if receivedEvents[0].Type != "start" {
t.Errorf("expected first event 'start', got %s", receivedEvents[0].Type)
}
if receivedEvents[1].Type != "complete" {
t.Errorf("expected second event 'complete', got %s", receivedEvents[1].Type)
}
}
func TestExecuteStream_ErrorStatus(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(`{"error":"agent unavailable"}`))
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
err := client.ExecuteStream(context.Background(), &ExecuteRequest{Prompt: "test"}, func(event StreamEvent) {
t.Error("handler should not be called on error")
})
if err == nil {
t.Fatal("expected error, got nil")
}
if !strings.Contains(err.Error(), "execute stream returned status 500") {
t.Errorf("expected 500 error, got %v", err)
}
}
func TestExecuteStream_ContextCanceledBeforeRequest(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
t.Error("handler should not be called when context is already canceled")
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
// Cancel context before making request
ctx, cancel := context.WithCancel(context.Background())
cancel()
err := client.ExecuteStream(ctx, &ExecuteRequest{Prompt: "test"}, func(event StreamEvent) {})
if err == nil {
t.Fatal("expected error, got nil")
}
// Should get a context canceled error
}
func TestGitClone_Success(t *testing.T) {
var receivedReq GitCloneRequest
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("expected POST, got %s", r.Method)
}
if r.URL.Path != "/git/clone" {
t.Errorf("expected /git/clone, got %s", r.URL.Path)
}
body, _ := io.ReadAll(r.Body)
_ = json.Unmarshal(body, &receivedReq)
resp := GitCloneResponse{
Success: true,
Cloned: true,
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp)
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
result, err := client.GitClone(context.Background(), "https://github.com/example/repo.git", "/workspace")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !result.Success {
t.Error("expected success=true")
}
if !result.Cloned {
t.Error("expected cloned=true")
}
if receivedReq.CloneURL != "https://github.com/example/repo.git" {
t.Errorf("expected clone URL, got %s", receivedReq.CloneURL)
}
if receivedReq.WorkDir != "/workspace" {
t.Errorf("expected work dir '/workspace', got %s", receivedReq.WorkDir)
}
}
func TestGitClone_AlreadyExists(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
resp := GitCloneResponse{
Success: true,
Cloned: false, // Already existed, just updated
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp)
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
result, err := client.GitClone(context.Background(), "https://github.com/example/repo.git", "/workspace")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !result.Success {
t.Error("expected success=true")
}
if result.Cloned {
t.Error("expected cloned=false for existing repo")
}
}
func TestGitClone_ErrorStatus(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(`{"error":"invalid clone URL"}`))
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
_, err := client.GitClone(context.Background(), "invalid", "/workspace")
if err == nil {
t.Fatal("expected error, got nil")
}
if !strings.Contains(err.Error(), "git clone returned status 400") {
t.Errorf("expected 400 error, got %v", err)
}
}
func TestGitCommitAndPush_Success(t *testing.T) {
var receivedReq GitCommitAndPushRequest
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("expected POST, got %s", r.Method)
}
if r.URL.Path != "/git/commit-and-push" {
t.Errorf("expected /git/commit-and-push, got %s", r.URL.Path)
}
body, _ := io.ReadAll(r.Body)
_ = json.Unmarshal(body, &receivedReq)
resp := GitCommitAndPushResponse{
Success: true,
HasChanges: true,
CommitSHA: "abc123def456",
FilesChanged: []string{"main.go", "go.mod"},
Pushed: true,
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp)
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
result, err := client.GitCommitAndPush(context.Background(), "feat: add feature", true, "/workspace")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !result.Success {
t.Error("expected success=true")
}
if !result.HasChanges {
t.Error("expected has_changes=true")
}
if result.CommitSHA != "abc123def456" {
t.Errorf("expected commit SHA abc123def456, got %s", result.CommitSHA)
}
if !result.Pushed {
t.Error("expected pushed=true")
}
if len(result.FilesChanged) != 2 {
t.Errorf("expected 2 files changed, got %d", len(result.FilesChanged))
}
// Verify request
if receivedReq.Message != "feat: add feature" {
t.Errorf("expected message 'feat: add feature', got %s", receivedReq.Message)
}
if !receivedReq.Push {
t.Error("expected push=true in request")
}
}
func TestGitCommitAndPush_NoChanges(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
resp := GitCommitAndPushResponse{
Success: true,
HasChanges: false,
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp)
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
result, err := client.GitCommitAndPush(context.Background(), "test", false, "/workspace")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !result.Success {
t.Error("expected success=true")
}
if result.HasChanges {
t.Error("expected has_changes=false")
}
if result.CommitSHA != "" {
t.Errorf("expected empty commit SHA, got %s", result.CommitSHA)
}
}
func TestGitCommitAndPush_ErrorStatus(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(`{"error":"git push failed"}`))
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
_, err := client.GitCommitAndPush(context.Background(), "test", true, "/workspace")
if err == nil {
t.Fatal("expected error, got nil")
}
if !strings.Contains(err.Error(), "git commit returned status 500") {
t.Errorf("expected 500 error, got %v", err)
}
}
func TestGitStatus_Success(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
t.Errorf("expected GET, got %s", r.Method)
}
if r.URL.Path != "/git/status" {
t.Errorf("expected /git/status, got %s", r.URL.Path)
}
if r.URL.Query().Get("work_dir") != "/workspace/project" {
t.Errorf("expected work_dir query param, got %s", r.URL.Query().Get("work_dir"))
}
resp := GitStatusResponse{
IsRepo: true,
HasChanges: true,
ChangedFiles: []string{"main.go", "README.md"},
Branch: "feature/test",
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp)
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
result, err := client.GitStatus(context.Background(), "/workspace/project")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !result.IsRepo {
t.Error("expected is_repo=true")
}
if !result.HasChanges {
t.Error("expected has_changes=true")
}
if result.Branch != "feature/test" {
t.Errorf("expected branch 'feature/test', got %s", result.Branch)
}
if len(result.ChangedFiles) != 2 {
t.Errorf("expected 2 changed files, got %d", len(result.ChangedFiles))
}
}
func TestGitStatus_EmptyWorkDir(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// work_dir should not be in query when empty
if r.URL.Query().Get("work_dir") != "" {
t.Errorf("expected empty work_dir, got %s", r.URL.Query().Get("work_dir"))
}
resp := GitStatusResponse{IsRepo: true}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp)
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
_, err := client.GitStatus(context.Background(), "")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
func TestGitStatus_ErrorStatus(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
_, _ = w.Write([]byte(`{"error":"not a git repository"}`))
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
_, err := client.GitStatus(context.Background(), "/workspace")
if err == nil {
t.Fatal("expected error, got nil")
}
if !strings.Contains(err.Error(), "git status returned status 404") {
t.Errorf("expected 404 error, got %v", err)
}
}
func TestRunSDLC_Success(t *testing.T) {
var receivedReq SDLCRequest
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("expected POST, got %s", r.Method)
}
if r.URL.Path != "/sdlc" {
t.Errorf("expected /sdlc, got %s", r.URL.Path)
}
body, _ := io.ReadAll(r.Body)
_ = json.Unmarshal(body, &receivedReq)
resp := SDLCResponse{
Success: true,
Output: "Feature started successfully",
Data: json.RawMessage(`{"feature_id":"feat-123"}`),
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp)
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
result, err := client.RunSDLC(context.Background(), "start", []string{"--name", "test-feature"}, "/workspace")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !result.Success {
t.Error("expected success=true")
}
if result.Output != "Feature started successfully" {
t.Errorf("expected output message, got %s", result.Output)
}
if result.Data == nil {
t.Error("expected data to be set")
}
// Verify request
if receivedReq.Command != "start" {
t.Errorf("expected command 'start', got %s", receivedReq.Command)
}
if len(receivedReq.Args) != 2 {
t.Errorf("expected 2 args, got %d", len(receivedReq.Args))
}
if receivedReq.WorkDir != "/workspace" {
t.Errorf("expected work dir '/workspace', got %s", receivedReq.WorkDir)
}
}
func TestRunSDLC_CommandFailed(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
resp := SDLCResponse{
Success: false,
Output: "Command output before failure",
Error: "validation failed: missing required field",
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp)
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
result, err := client.RunSDLC(context.Background(), "validate", nil, "/workspace")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if result.Success {
t.Error("expected success=false")
}
if result.Error != "validation failed: missing required field" {
t.Errorf("expected error message, got %s", result.Error)
}
}
func TestRunSDLC_ErrorStatus(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(`{"error":"sdlc binary not found"}`))
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
_, err := client.RunSDLC(context.Background(), "status", nil, "/workspace")
if err == nil {
t.Fatal("expected error, got nil")
}
if !strings.Contains(err.Error(), "sdlc returned status 500") {
t.Errorf("expected 500 error, got %v", err)
}
}
func TestRunSDLC_MalformedResponse(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{invalid`))
}))
defer server.Close()
client := NewClient(ClientConfig{BaseURL: server.URL})
_, err := client.RunSDLC(context.Background(), "status", nil, "/workspace")
if err == nil {
t.Fatal("expected error, got nil")
}
if !strings.Contains(err.Error(), "decode response") {
t.Errorf("expected decode error, got %v", err)
}
}

View File

@ -97,6 +97,63 @@ func (r *WorkQueueRepository) GetTask(ctx context.Context, taskID string) (*doma
return &task, nil return &task, nil
} }
// List returns all tasks with optional status filter and pagination.
func (r *WorkQueueRepository) List(ctx context.Context, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
opts.Normalize()
// Build optional WHERE clause
whereClause := ""
var args []any
argNum := 1
if status != nil {
whereClause = fmt.Sprintf("WHERE status = $%d", argNum)
args = append(args, string(*status))
argNum++
}
// Get total count
countQuery := "SELECT COUNT(*) FROM work_queue " + whereClause
var total int64
if err := r.db.QueryRowContext(ctx, countQuery, args...).Scan(&total); err != nil {
return nil, fmt.Errorf("count work tasks: %w", err)
}
// Build paginated query
query := fmt.Sprintf(`
SELECT id, project_id, task_type, task_spec, status, priority, worker_id,
callback_url, created_at, started_at, completed_at, result, error,
retry_count, max_retries, error_code
FROM work_queue
%s
ORDER BY created_at DESC
LIMIT $%d OFFSET $%d
`, whereClause, argNum, argNum+1)
args = append(args, opts.Limit, opts.Offset)
rows, err := r.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, fmt.Errorf("list work tasks: %w", err)
}
defer func() { _ = rows.Close() }()
var tasks []*domain.WorkTask
for rows.Next() {
task, err := r.scanTask(rows)
if err != nil {
return nil, err
}
tasks = append(tasks, task)
}
return &domain.WorkListResult{
Tasks: tasks,
Total: total,
Limit: opts.Limit,
Offset: opts.Offset,
}, nil
}
// ListByProject returns tasks for a project with optional status filter and pagination. // ListByProject returns tasks for a project with optional status filter and pagination.
func (r *WorkQueueRepository) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) { func (r *WorkQueueRepository) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
// Normalize pagination options // Normalize pagination options

View File

@ -80,6 +80,10 @@ func (m *mockWorkQueue) GetTask(ctx context.Context, taskID string) (*domain.Wor
return task, nil return task, nil
} }
func (m *mockWorkQueue) List(ctx context.Context, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
return &domain.WorkListResult{}, nil
}
func (m *mockWorkQueue) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) { func (m *mockWorkQueue) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
return &domain.WorkListResult{}, nil return &domain.WorkListResult{}, nil
} }

View File

@ -14,6 +14,10 @@ WORKDIR /app
COPY pkg/ ./pkg/ COPY pkg/ ./pkg/
COPY services/{{COMPONENT_NAME}}/ ./services/{{COMPONENT_NAME}}/ COPY services/{{COMPONENT_NAME}}/ ./services/{{COMPONENT_NAME}}/
# Download dependencies (populates go.sum if empty)
RUN cd pkg && go mod download
RUN cd services/{{COMPONENT_NAME}} && go mod download
# Build from the service directory (uses replace directive for ../pkg) # Build from the service directory (uses replace directive for ../pkg)
RUN cd services/{{COMPONENT_NAME}} && CGO_ENABLED=0 go build -o /{{COMPONENT_NAME}} ./cmd/server RUN cd services/{{COMPONENT_NAME}} && CGO_ENABLED=0 go build -o /{{COMPONENT_NAME}} ./cmd/server

View File

@ -14,6 +14,10 @@ WORKDIR /app
COPY pkg/ ./pkg/ COPY pkg/ ./pkg/
COPY workers/{{COMPONENT_NAME}}/ ./workers/{{COMPONENT_NAME}}/ COPY workers/{{COMPONENT_NAME}}/ ./workers/{{COMPONENT_NAME}}/
# Download dependencies (populates go.sum if empty)
RUN cd pkg && go mod download
RUN cd workers/{{COMPONENT_NAME}} && go mod download
# Build from the worker directory (uses replace directive for ../pkg) # Build from the worker directory (uses replace directive for ../pkg)
RUN cd workers/{{COMPONENT_NAME}} && CGO_ENABLED=0 go build -o /{{COMPONENT_NAME}} ./cmd/worker RUN cd workers/{{COMPONENT_NAME}} && CGO_ENABLED=0 go build -o /{{COMPONENT_NAME}} ./cmd/worker

View File

@ -38,10 +38,11 @@ func (h *WorkHandler) Mount(r api.Router) {
r.With(auth.RequireScope(auth.ScopeQueueWrite, auth.ScopeAdmin)).Post("/{taskId}/cancel", h.Cancel) r.With(auth.RequireScope(auth.ScopeQueueWrite, auth.ScopeAdmin)).Post("/{taskId}/cancel", h.Cancel)
// Read operations // Read operations
r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/tasks", h.ListTasks)
r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/stats", h.Stats)
r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/projects/{projectId}", h.ListByProject)
r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/{taskId}", h.GetTask) r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/{taskId}", h.GetTask)
r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/{taskId}/status", h.GetStatus) r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/{taskId}/status", h.GetStatus)
r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/projects/{projectId}", h.ListByProject)
r.With(auth.RequireScope(auth.ScopeQueueRead, auth.ScopeAdmin)).Get("/stats", h.Stats)
}) })
} }
@ -121,65 +122,6 @@ type DequeueWorkResponse struct {
Task *WorkTaskDTO `json:"task,omitempty"` Task *WorkTaskDTO `json:"task,omitempty"`
} }
// WorkTaskDTO is the data transfer object for work tasks.
type WorkTaskDTO struct {
ID string `json:"id"`
ProjectID string `json:"project_id"`
Type string `json:"type"`
Spec map[string]any `json:"spec"`
Status string `json:"status"`
Priority int `json:"priority"`
WorkerID string `json:"worker_id,omitempty"`
CallbackURL string `json:"callback_url,omitempty"`
CreatedAt string `json:"created_at"`
StartedAt string `json:"started_at,omitempty"`
CompletedAt string `json:"completed_at,omitempty"`
Result *WorkResultDTO `json:"result,omitempty"`
Error string `json:"error,omitempty"`
RetryCount int `json:"retry_count"`
MaxRetries int `json:"max_retries"`
}
// WorkResultDTO is the data transfer object for work results.
type WorkResultDTO struct {
Output string `json:"output,omitempty"`
Artifacts map[string]string `json:"artifacts,omitempty"`
}
// toWorkTaskDTO converts a domain.WorkTask to a WorkTaskDTO.
func toWorkTaskDTO(t *domain.WorkTask) *WorkTaskDTO {
if t == nil {
return nil
}
dto := &WorkTaskDTO{
ID: t.ID,
ProjectID: t.ProjectID,
Type: string(t.Type),
Spec: t.Spec,
Status: string(t.Status),
Priority: t.Priority,
WorkerID: t.WorkerID,
CallbackURL: t.CallbackURL,
CreatedAt: t.CreatedAt.Format("2006-01-02T15:04:05Z07:00"),
Error: t.Error,
RetryCount: t.RetryCount,
MaxRetries: t.MaxRetries,
}
if t.StartedAt != nil {
dto.StartedAt = t.StartedAt.Format("2006-01-02T15:04:05Z07:00")
}
if t.CompletedAt != nil {
dto.CompletedAt = t.CompletedAt.Format("2006-01-02T15:04:05Z07:00")
}
if t.Result != nil {
dto.Result = &WorkResultDTO{
Output: t.Result.Output,
Artifacts: t.Result.Artifacts,
}
}
return dto
}
// Dequeue claims the next available task for a worker. // Dequeue claims the next available task for a worker.
// POST /work/dequeue // POST /work/dequeue
func (h *WorkHandler) Dequeue(w http.ResponseWriter, r *http.Request) { func (h *WorkHandler) Dequeue(w http.ResponseWriter, r *http.Request) {
@ -341,6 +283,58 @@ func (h *WorkHandler) Cancel(w http.ResponseWriter, r *http.Request) {
}) })
} }
// ListTasks returns all tasks with optional status filter and pagination.
// GET /work/tasks?status=running&limit=50&offset=0
func (h *WorkHandler) ListTasks(w http.ResponseWriter, r *http.Request) {
// Parse and validate optional status filter
var status *domain.WorkTaskStatus
if s := r.URL.Query().Get("status"); s != "" {
st := domain.WorkTaskStatus(s)
if !st.IsValid() {
api.WriteBadRequest(w, r, "invalid status filter: must be pending, running, completed, failed, or cancelled")
return
}
status = &st
}
// Parse pagination options
opts := domain.DefaultWorkListOptions()
if limitStr := r.URL.Query().Get("limit"); limitStr != "" {
limit, err := strconv.Atoi(limitStr)
if err != nil {
api.WriteBadRequest(w, r, "limit must be a valid integer")
return
}
opts.Limit = limit
}
if offsetStr := r.URL.Query().Get("offset"); offsetStr != "" {
offset, err := strconv.Atoi(offsetStr)
if err != nil {
api.WriteBadRequest(w, r, "offset must be a valid integer")
return
}
opts.Offset = offset
}
result, err := h.workService.List(r.Context(), status, opts)
if err != nil {
api.WriteInternalError(w, r, "failed to list tasks")
return
}
dtos := make([]*WorkTaskDTO, len(result.Tasks))
for i, t := range result.Tasks {
dtos[i] = toWorkTaskDTO(t)
}
api.WriteSuccess(w, r, map[string]any{
"tasks": dtos,
"total": result.Total,
"limit": result.Limit,
"offset": result.Offset,
})
}
// ListByProject returns tasks for a project with pagination. // ListByProject returns tasks for a project with pagination.
// GET /work/projects/{projectId}?status=pending&limit=50&offset=0 // GET /work/projects/{projectId}?status=pending&limit=50&offset=0
func (h *WorkHandler) ListByProject(w http.ResponseWriter, r *http.Request) { func (h *WorkHandler) ListByProject(w http.ResponseWriter, r *http.Request) {
@ -428,21 +422,3 @@ func (h *WorkHandler) Stats(w http.ResponseWriter, r *http.Request) {
api.WriteSuccess(w, r, resp) api.WriteSuccess(w, r, resp)
} }
// formatDuration formats a duration in a human-readable way.
func formatDuration(d interface{ Seconds() float64 }) string {
secs := d.Seconds()
if secs < 60 {
return fmt.Sprintf("%.0fs", secs)
}
mins := secs / 60
if mins < 60 {
return fmt.Sprintf("%.1fm", mins)
}
hours := mins / 60
if hours < 24 {
return fmt.Sprintf("%.1fh", hours)
}
days := hours / 24
return fmt.Sprintf("%.1fd", days)
}

View File

@ -0,0 +1,85 @@
// Package handlers provides HTTP handlers for the rdev API.
package handlers
import (
"fmt"
"github.com/orchard9/rdev/internal/domain"
)
// WorkTaskDTO is the data transfer object for work tasks.
type WorkTaskDTO struct {
ID string `json:"id"`
ProjectID string `json:"project_id"`
Type string `json:"type"`
Spec map[string]any `json:"spec"`
Status string `json:"status"`
Priority int `json:"priority"`
WorkerID string `json:"worker_id,omitempty"`
CallbackURL string `json:"callback_url,omitempty"`
CreatedAt string `json:"created_at"`
StartedAt string `json:"started_at,omitempty"`
CompletedAt string `json:"completed_at,omitempty"`
Result *WorkResultDTO `json:"result,omitempty"`
Error string `json:"error,omitempty"`
RetryCount int `json:"retry_count"`
MaxRetries int `json:"max_retries"`
}
// WorkResultDTO is the data transfer object for work results.
type WorkResultDTO struct {
Output string `json:"output,omitempty"`
Artifacts map[string]string `json:"artifacts,omitempty"`
}
// toWorkTaskDTO converts a domain.WorkTask to a WorkTaskDTO.
func toWorkTaskDTO(t *domain.WorkTask) *WorkTaskDTO {
if t == nil {
return nil
}
dto := &WorkTaskDTO{
ID: t.ID,
ProjectID: t.ProjectID,
Type: string(t.Type),
Spec: t.Spec,
Status: string(t.Status),
Priority: t.Priority,
WorkerID: t.WorkerID,
CallbackURL: t.CallbackURL,
CreatedAt: t.CreatedAt.Format("2006-01-02T15:04:05Z07:00"),
Error: t.Error,
RetryCount: t.RetryCount,
MaxRetries: t.MaxRetries,
}
if t.StartedAt != nil {
dto.StartedAt = t.StartedAt.Format("2006-01-02T15:04:05Z07:00")
}
if t.CompletedAt != nil {
dto.CompletedAt = t.CompletedAt.Format("2006-01-02T15:04:05Z07:00")
}
if t.Result != nil {
dto.Result = &WorkResultDTO{
Output: t.Result.Output,
Artifacts: t.Result.Artifacts,
}
}
return dto
}
// formatDuration formats a duration in a human-readable way.
func formatDuration(d interface{ Seconds() float64 }) string {
secs := d.Seconds()
if secs < 60 {
return fmt.Sprintf("%.0fs", secs)
}
mins := secs / 60
if mins < 60 {
return fmt.Sprintf("%.1fm", mins)
}
hours := mins / 60
if hours < 24 {
return fmt.Sprintf("%.1fh", hours)
}
days := hours / 24
return fmt.Sprintf("%.1fd", days)
}

View File

@ -123,6 +123,39 @@ func (m *mockWorkQueue) GetTask(ctx context.Context, taskID string) (*domain.Wor
return task, nil return task, nil
} }
func (m *mockWorkQueue) List(ctx context.Context, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
if m.err != nil {
return nil, m.err
}
opts.Normalize()
var tasks []*domain.WorkTask
for _, task := range m.tasks {
if status == nil || task.Status == *status {
tasks = append(tasks, task)
}
}
// Apply pagination
total := int64(len(tasks))
if opts.Offset >= len(tasks) {
tasks = nil
} else {
end := opts.Offset + opts.Limit
if end > len(tasks) {
end = len(tasks)
}
tasks = tasks[opts.Offset:end]
}
return &domain.WorkListResult{
Tasks: tasks,
Total: total,
Limit: opts.Limit,
Offset: opts.Offset,
}, nil
}
func (m *mockWorkQueue) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) { func (m *mockWorkQueue) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
if m.err != nil { if m.err != nil {
return nil, m.err return nil, m.err

View File

@ -40,6 +40,10 @@ type WorkQueue interface {
// GetTask retrieves a task by ID. // GetTask retrieves a task by ID.
GetTask(ctx context.Context, taskID string) (*domain.WorkTask, error) GetTask(ctx context.Context, taskID string) (*domain.WorkTask, error)
// List returns all tasks with optional status filter and pagination.
// Use for admin/debugging views across all projects.
List(ctx context.Context, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error)
// ListByProject returns tasks for a project with optional status filter and pagination. // ListByProject returns tasks for a project with optional status filter and pagination.
ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error)

View File

@ -91,6 +91,10 @@ func (m *mockWorkQueue) GetTask(ctx context.Context, taskID string) (*domain.Wor
return task, nil return task, nil
} }
func (m *mockWorkQueue) List(ctx context.Context, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
return &domain.WorkListResult{}, nil
}
func (m *mockWorkQueue) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) { func (m *mockWorkQueue) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
return &domain.WorkListResult{}, nil return &domain.WorkListResult{}, nil
} }

View File

@ -205,6 +205,11 @@ func (s *WorkService) GetTask(ctx context.Context, taskID string) (*domain.WorkT
return s.queue.GetTask(ctx, taskID) return s.queue.GetTask(ctx, taskID)
} }
// List returns all tasks with optional status filter and pagination.
func (s *WorkService) List(ctx context.Context, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
return s.queue.List(ctx, status, opts)
}
// ListByProject returns tasks for a project with pagination. // ListByProject returns tasks for a project with pagination.
func (s *WorkService) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) { func (s *WorkService) ListByProject(ctx context.Context, projectID string, status *domain.WorkTaskStatus, opts domain.WorkListOptions) (*domain.WorkListResult, error) {
return s.queue.ListByProject(ctx, projectID, status, opts) return s.queue.ListByProject(ctx, projectID, status, opts)

View File

@ -0,0 +1,584 @@
package worker
import (
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/orchard9/rdev/internal/domain"
)
func TestNewAPIClient_DefaultTimeout(t *testing.T) {
client := NewAPIClient(APIClientConfig{
BaseURL: "http://localhost:8080",
APIKey: "test-key",
})
if client.httpClient.Timeout != 30*time.Second {
t.Errorf("expected default timeout 30s, got %v", client.httpClient.Timeout)
}
}
func TestNewAPIClient_CustomTimeout(t *testing.T) {
client := NewAPIClient(APIClientConfig{
BaseURL: "http://localhost:8080",
APIKey: "test-key",
Timeout: 60 * time.Second,
})
if client.httpClient.Timeout != 60*time.Second {
t.Errorf("expected timeout 60s, got %v", client.httpClient.Timeout)
}
}
func TestRegister_Success(t *testing.T) {
var receivedReq RegisterRequest
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("expected POST, got %s", r.Method)
}
if r.URL.Path != "/workers/register" {
t.Errorf("expected /workers/register, got %s", r.URL.Path)
}
if r.Header.Get("Content-Type") != "application/json" {
t.Errorf("expected Content-Type application/json, got %s", r.Header.Get("Content-Type"))
}
if r.Header.Get("X-API-Key") != "test-key" {
t.Errorf("expected X-API-Key test-key, got %s", r.Header.Get("X-API-Key"))
}
body, _ := io.ReadAll(r.Body)
_ = json.Unmarshal(body, &receivedReq)
w.WriteHeader(http.StatusCreated)
}))
defer server.Close()
client := NewAPIClient(APIClientConfig{
BaseURL: server.URL,
APIKey: "test-key",
})
err := client.Register(context.Background(), &RegisterRequest{
ID: "worker-1",
Hostname: "localhost",
Version: "v1.0.0",
Capabilities: []string{"build", "test"},
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if receivedReq.ID != "worker-1" {
t.Errorf("expected ID worker-1, got %s", receivedReq.ID)
}
if receivedReq.Hostname != "localhost" {
t.Errorf("expected hostname localhost, got %s", receivedReq.Hostname)
}
}
func TestRegister_ErrorStatus(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(`{"error":"invalid worker ID"}`))
}))
defer server.Close()
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
err := client.Register(context.Background(), &RegisterRequest{ID: "bad"})
if err == nil {
t.Fatal("expected error, got nil")
}
if want := "register returned status 400"; !containsSubstring(err.Error(), want) {
t.Errorf("expected error containing %q, got %v", want, err)
}
if !containsSubstring(err.Error(), "invalid worker ID") {
t.Errorf("expected error to contain response body, got %v", err)
}
}
func TestRegister_NetworkError(t *testing.T) {
client := NewAPIClient(APIClientConfig{
BaseURL: "http://localhost:1", // invalid port
Timeout: 100 * time.Millisecond,
})
err := client.Register(context.Background(), &RegisterRequest{ID: "test"})
if err == nil {
t.Fatal("expected error, got nil")
}
if !containsSubstring(err.Error(), "register:") {
t.Errorf("expected error wrapped with 'register:', got %v", err)
}
}
func TestRegister_ContextCanceled(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(1 * time.Second)
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
ctx, cancel := context.WithCancel(context.Background())
cancel() // cancel immediately
err := client.Register(ctx, &RegisterRequest{ID: "test"})
if err == nil {
t.Fatal("expected error, got nil")
}
}
func TestHeartbeat_Success(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("expected POST, got %s", r.Method)
}
if r.URL.Path != "/workers/worker-123/heartbeat" {
t.Errorf("expected /workers/worker-123/heartbeat, got %s", r.URL.Path)
}
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
client := NewAPIClient(APIClientConfig{BaseURL: server.URL, APIKey: "key"})
err := client.Heartbeat(context.Background(), "worker-123")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
func TestHeartbeat_WorkerNotFound(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
_, _ = w.Write([]byte(`{"error":"worker not found"}`))
}))
defer server.Close()
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
err := client.Heartbeat(context.Background(), "unknown-worker")
if err == nil {
t.Fatal("expected error, got nil")
}
if !containsSubstring(err.Error(), "heartbeat returned status 404") {
t.Errorf("expected 404 error, got %v", err)
}
}
func TestClaimTask_Success(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("expected POST, got %s", r.Method)
}
if r.URL.Path != "/workers/worker-1/claim" {
t.Errorf("expected /workers/worker-1/claim, got %s", r.URL.Path)
}
resp := ClaimTaskResponse{
Success: true,
Data: struct {
Task *WorkTaskData `json:"task"`
WorkerID string `json:"worker_id"`
}{
WorkerID: "worker-1",
Task: &WorkTaskData{
ID: "task-123",
ProjectID: "proj-1",
Type: "build",
Status: "running",
Priority: 5,
CreatedAt: "2024-01-15T10:30:00Z",
StartedAt: "2024-01-15T10:31:00Z",
RetryCount: 1,
MaxRetries: 3,
Spec: map[string]any{"prompt": "build it"},
},
},
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp)
}))
defer server.Close()
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
task, err := client.ClaimTask(context.Background(), "worker-1")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if task == nil {
t.Fatal("expected task, got nil")
}
if task.ID != "task-123" {
t.Errorf("expected task ID task-123, got %s", task.ID)
}
if task.ProjectID != "proj-1" {
t.Errorf("expected project ID proj-1, got %s", task.ProjectID)
}
if task.Type != domain.WorkTaskType("build") {
t.Errorf("expected type build, got %s", task.Type)
}
if task.Priority != 5 {
t.Errorf("expected priority 5, got %d", task.Priority)
}
if task.RetryCount != 1 {
t.Errorf("expected retry count 1, got %d", task.RetryCount)
}
}
func TestClaimTask_NoTasksAvailable(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}))
defer server.Close()
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
task, err := client.ClaimTask(context.Background(), "worker-1")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if task != nil {
t.Errorf("expected nil task, got %+v", task)
}
}
func TestClaimTask_NilTaskData(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
resp := ClaimTaskResponse{
Success: true,
Data: struct {
Task *WorkTaskData `json:"task"`
WorkerID string `json:"worker_id"`
}{
WorkerID: "worker-1",
Task: nil,
},
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(resp)
}))
defer server.Close()
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
_, err := client.ClaimTask(context.Background(), "worker-1")
if err == nil {
t.Fatal("expected error, got nil")
}
if !containsSubstring(err.Error(), "no task data") {
t.Errorf("expected 'no task data' error, got %v", err)
}
}
func TestClaimTask_MalformedJSON(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{invalid json}`))
}))
defer server.Close()
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
_, err := client.ClaimTask(context.Background(), "worker-1")
if err == nil {
t.Fatal("expected error, got nil")
}
if !containsSubstring(err.Error(), "decode response") {
t.Errorf("expected decode error, got %v", err)
}
}
func TestClaimTask_ErrorStatus(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(`{"error":"database error"}`))
}))
defer server.Close()
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
_, err := client.ClaimTask(context.Background(), "worker-1")
if err == nil {
t.Fatal("expected error, got nil")
}
if !containsSubstring(err.Error(), "claim task returned status 500") {
t.Errorf("expected 500 error, got %v", err)
}
}
func TestCompleteTask_Success(t *testing.T) {
var receivedReq CompleteTaskRequest
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("expected POST, got %s", r.Method)
}
if r.URL.Path != "/workers/worker-1/complete/task-123" {
t.Errorf("expected /workers/worker-1/complete/task-123, got %s", r.URL.Path)
}
body, _ := io.ReadAll(r.Body)
_ = json.Unmarshal(body, &receivedReq)
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
client := NewAPIClient(APIClientConfig{BaseURL: server.URL, APIKey: "key"})
result := &domain.BuildResult{
Success: true,
Output: "Build successful",
CommitSHA: "abc123",
FilesChanged: []string{"main.go", "go.mod"},
DurationMs: 5000,
Artifacts: map[string]string{"deploy_url": "https://example.com"},
}
err := client.CompleteTask(context.Background(), "worker-1", "task-123", result)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !receivedReq.Success {
t.Error("expected success=true")
}
if receivedReq.Output != "Build successful" {
t.Errorf("expected output 'Build successful', got %s", receivedReq.Output)
}
if receivedReq.CommitSHA != "abc123" {
t.Errorf("expected commit SHA abc123, got %s", receivedReq.CommitSHA)
}
if receivedReq.DurationMs != 5000 {
t.Errorf("expected duration 5000, got %d", receivedReq.DurationMs)
}
}
func TestCompleteTask_TaskNotFound(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
_, _ = w.Write([]byte(`{"error":"task not found"}`))
}))
defer server.Close()
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
err := client.CompleteTask(context.Background(), "worker-1", "unknown", &domain.BuildResult{})
if err == nil {
t.Fatal("expected error, got nil")
}
if !containsSubstring(err.Error(), "complete task returned status 404") {
t.Errorf("expected 404 error, got %v", err)
}
}
func TestFailTask_Success(t *testing.T) {
var receivedReq FailTaskRequest
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("expected POST, got %s", r.Method)
}
if r.URL.Path != "/workers/worker-1/fail/task-123" {
t.Errorf("expected /workers/worker-1/fail/task-123, got %s", r.URL.Path)
}
body, _ := io.ReadAll(r.Body)
_ = json.Unmarshal(body, &receivedReq)
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
client := NewAPIClient(APIClientConfig{BaseURL: server.URL, APIKey: "key"})
err := client.FailTask(context.Background(), "worker-1", "task-123", "build failed", "error output", 3000)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if receivedReq.Error != "build failed" {
t.Errorf("expected error 'build failed', got %s", receivedReq.Error)
}
if receivedReq.Output != "error output" {
t.Errorf("expected output 'error output', got %s", receivedReq.Output)
}
if receivedReq.DurationMs != 3000 {
t.Errorf("expected duration 3000, got %d", receivedReq.DurationMs)
}
}
func TestFailTask_ErrorStatus(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(`{"error":"database unavailable"}`))
}))
defer server.Close()
client := NewAPIClient(APIClientConfig{BaseURL: server.URL})
err := client.FailTask(context.Background(), "worker-1", "task-123", "failed", "", 0)
if err == nil {
t.Fatal("expected error, got nil")
}
if !containsSubstring(err.Error(), "fail task returned status 500") {
t.Errorf("expected 500 error, got %v", err)
}
}
func TestToWorkTask_ParsesTimestamps(t *testing.T) {
data := &WorkTaskData{
ID: "task-1",
ProjectID: "proj-1",
Type: "build",
Status: "running",
CreatedAt: "2024-01-15T10:30:00Z",
StartedAt: "2024-01-15T10:31:00Z",
}
task := data.ToWorkTask()
if task.CreatedAt.IsZero() {
t.Error("expected CreatedAt to be set")
}
if task.CreatedAt.Year() != 2024 || task.CreatedAt.Month() != 1 || task.CreatedAt.Day() != 15 {
t.Errorf("unexpected CreatedAt: %v", task.CreatedAt)
}
if task.StartedAt == nil {
t.Fatal("expected StartedAt to be set")
}
if task.StartedAt.Hour() != 10 || task.StartedAt.Minute() != 31 {
t.Errorf("unexpected StartedAt: %v", task.StartedAt)
}
}
func TestToWorkTask_InvalidTimestamp(t *testing.T) {
data := &WorkTaskData{
ID: "task-1",
ProjectID: "proj-1",
Type: "build",
Status: "running",
CreatedAt: "not-a-timestamp",
StartedAt: "also-not-valid",
}
task := data.ToWorkTask()
// Invalid timestamps should result in zero values (current behavior)
if !task.CreatedAt.IsZero() {
t.Errorf("expected zero CreatedAt for invalid timestamp, got %v", task.CreatedAt)
}
// Note: StartedAt is parsed but ignored on error, pointer becomes non-nil with zero value
}
func TestToWorkTask_EmptyTimestamps(t *testing.T) {
data := &WorkTaskData{
ID: "task-1",
ProjectID: "proj-1",
Type: "build",
Status: "pending",
CreatedAt: "",
StartedAt: "",
}
task := data.ToWorkTask()
if !task.CreatedAt.IsZero() {
t.Errorf("expected zero CreatedAt for empty string, got %v", task.CreatedAt)
}
if task.StartedAt != nil {
t.Errorf("expected nil StartedAt for empty string, got %v", task.StartedAt)
}
}
func TestToWorkTask_NilData(t *testing.T) {
var data *WorkTaskData
task := data.ToWorkTask()
if task != nil {
t.Errorf("expected nil task from nil data, got %+v", task)
}
}
func TestToWorkTask_PreservesAllFields(t *testing.T) {
data := &WorkTaskData{
ID: "task-1",
ProjectID: "proj-1",
Type: "deploy",
Status: "completed",
Priority: 10,
WorkerID: "worker-99",
CallbackURL: "https://callback.example.com",
RetryCount: 2,
MaxRetries: 5,
Spec: map[string]any{"key": "value"},
}
task := data.ToWorkTask()
if task.ID != "task-1" {
t.Errorf("expected ID task-1, got %s", task.ID)
}
if task.ProjectID != "proj-1" {
t.Errorf("expected ProjectID proj-1, got %s", task.ProjectID)
}
if task.Type != domain.WorkTaskType("deploy") {
t.Errorf("expected Type deploy, got %s", task.Type)
}
if task.Status != domain.WorkTaskStatus("completed") {
t.Errorf("expected Status completed, got %s", task.Status)
}
if task.Priority != 10 {
t.Errorf("expected Priority 10, got %d", task.Priority)
}
if task.WorkerID != "worker-99" {
t.Errorf("expected WorkerID worker-99, got %s", task.WorkerID)
}
if task.CallbackURL != "https://callback.example.com" {
t.Errorf("expected CallbackURL, got %s", task.CallbackURL)
}
if task.RetryCount != 2 {
t.Errorf("expected RetryCount 2, got %d", task.RetryCount)
}
if task.MaxRetries != 5 {
t.Errorf("expected MaxRetries 5, got %d", task.MaxRetries)
}
if task.Spec["key"] != "value" {
t.Errorf("expected Spec[key]=value, got %v", task.Spec)
}
}
// containsSubstring is a helper to check if a string contains a substring.
func containsSubstring(s, substr string) bool {
return len(s) >= len(substr) && (s == substr || len(s) > 0 && containsHelper(s, substr))
}
func containsHelper(s, substr string) bool {
for i := 0; i <= len(s)-len(substr); i++ {
if s[i:i+len(substr)] == substr {
return true
}
}
return false
}

View File

@ -104,6 +104,9 @@ func (m *mockWorkQueue) GetTask(_ context.Context, taskID string) (*domain.WorkT
} }
return task, nil return task, nil
} }
func (m *mockWorkQueue) List(_ context.Context, _ *domain.WorkTaskStatus, _ domain.WorkListOptions) (*domain.WorkListResult, error) {
return &domain.WorkListResult{}, nil
}
func (m *mockWorkQueue) ListByProject(_ context.Context, _ string, _ *domain.WorkTaskStatus, _ domain.WorkListOptions) (*domain.WorkListResult, error) { func (m *mockWorkQueue) ListByProject(_ context.Context, _ string, _ *domain.WorkTaskStatus, _ domain.WorkListOptions) (*domain.WorkListResult, error) {
return &domain.WorkListResult{}, nil return &domain.WorkListResult{}, nil
} }

View File

@ -63,6 +63,10 @@ func (m *mockMaintenanceQueue) GetTask(_ context.Context, _ string) (*domain.Wor
return nil, nil return nil, nil
} }
func (m *mockMaintenanceQueue) List(_ context.Context, _ *domain.WorkTaskStatus, _ domain.WorkListOptions) (*domain.WorkListResult, error) {
return nil, nil
}
func (m *mockMaintenanceQueue) ListByProject(_ context.Context, _ string, _ *domain.WorkTaskStatus, _ domain.WorkListOptions) (*domain.WorkListResult, error) { func (m *mockMaintenanceQueue) ListByProject(_ context.Context, _ string, _ *domain.WorkTaskStatus, _ domain.WorkListOptions) (*domain.WorkListResult, error) {
return nil, nil return nil, nil
} }