rdev/IMPLEMENTATION_PLAN_V2.md
jordan 72d16929ca feat: Implement hexagonal architecture with services, webhooks, queue, and telemetry
2026-01-25 19:57:46 -07:00


rdev Implementation Plan v2

Weeks 5-10: From 75% Complete to Pristine Production

Current State (After Week 4)

Completed

| Component | Status | Test Coverage |
|---|---|---|
| Hexagonal Architecture | Done | Domain, Ports, Services |
| Authentication | Done | 394 lines |
| HTTP API + OpenAPI | Done | 1,189 lines |
| Command Execution | Done | 359 lines |
| Command Sanitization | Done | 257 lines |
| SSE Streaming | Done | Last-Event-ID support |
| Rate Limiting | Done | 413 lines |
| Command Limiting | Done | 414 lines |
| Database + Migrations | Done | Auto-migrations |
| Domain Models | Done | 542 lines |
| Port Interfaces | Done | 380 lines |
| Prometheus Metrics | Done | Path normalization |
| Validation Package | Done | 548 lines |

Remaining Gaps

| Gap | Impact | Priority |
|---|---|---|
| Claude config file I/O | Handlers broken | CRITICAL |
| Legacy code mixed in | Technical debt | HIGH |
| Hardcoded projects | Scalability | HIGH |
| No adapter tests | Reliability | HIGH |
| IP allowlisting | Security | HIGH |
| Production manifests | Deployment | MEDIUM |
| Validation not integrated | Consistency | MEDIUM |
| Documentation gaps | Usability | MEDIUM |

Philosophy: Foundation First

Week 5-6: Clean the House
├── Remove all legacy code
├── Fix broken functionality
└── Achieve 100% working state

Week 7-8: Strengthen the Foundation
├── Complete test coverage
├── Add missing security features
└── Production-harden deployment

Week 9-10: Polish and Document
├── Performance optimization
├── Comprehensive documentation
└── Final quality gates

Week 5: Legacy Removal & Core Fixes

Goal: Remove all legacy code, fix Claude config, integrate validation

Task 5.1: Remove Legacy Code (4h)

Files to delete:

  • internal/executor/executor.go → replaced by internal/adapter/kubernetes/executor.go
  • internal/projects/registry.go → replaced by internal/adapter/kubernetes/project_repository.go

Files to update:

  • internal/handlers/claude_config.go → Use service layer, not legacy executor
  • cmd/rdev-api/main.go → Remove legacy imports

Acceptance:

  • go build ./... passes
  • No imports from internal/executor or internal/projects
  • All tests pass

Task 5.2: Implement Claude Config File I/O (6h)

Problem: Handlers exist but don't actually read/write files

Create:

internal/service/claude_config_service.go
internal/adapter/kubernetes/claude_config_repository.go
internal/port/claude_config_repository.go

Operations to implement:

type ClaudeConfigRepository interface {
    // List items in .claude/{type}/ directory
    List(ctx context.Context, podName, itemType string) ([]ConfigItem, error)

    // Get single item content
    Get(ctx context.Context, podName, itemType, name string) (*ConfigItem, error)

    // Create new item (write file)
    Create(ctx context.Context, podName, itemType string, item *ConfigItem) error

    // Update existing item
    Update(ctx context.Context, podName, itemType, name string, content string) error

    // Delete item (remove file)
    Delete(ctx context.Context, podName, itemType, name string) error
}

Implementation via kubectl:

# List:   kubectl exec pod -- ls /workspace/.claude/commands/
# Get:    kubectl exec pod -- cat /workspace/.claude/commands/deploy.md
# Create: kubectl exec -i pod -- sh -c 'cat > /workspace/.claude/commands/new.md' < new.md
# Update: same as Create; the redirect overwrites the existing file
# Delete: kubectl exec pod -- rm /workspace/.claude/commands/old.md

Acceptance:

  • Can list/create/read/update/delete commands, skills, agents via API
  • E2E test proves round-trip works
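
Because item names arrive from API callers and end up in a path inside the pod, the repository should reject names that could escape the `.claude/{type}/` directory. The following is a minimal sketch of that check, not the project's actual implementation. The names `itemPath` and `getArgs`, the `.md` suffix, and the allowed type list are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"path"
	"regexp"
)

// workspaceRoot and allowedTypes are assumptions for this sketch.
const workspaceRoot = "/workspace/.claude"

var allowedTypes = map[string]bool{"commands": true, "skills": true, "agents": true}

// safeName allows letters, digits, '-' and '_' only, so a name can never
// contain "/" or ".." and escape the item directory.
var safeName = regexp.MustCompile(`^[A-Za-z0-9_-]{1,64}$`)

// itemPath returns the in-pod path for one config item, or an error if the
// type or name is not acceptable.
func itemPath(itemType, name string) (string, error) {
	if !allowedTypes[itemType] {
		return "", fmt.Errorf("unknown item type %q", itemType)
	}
	if !safeName.MatchString(name) {
		return "", fmt.Errorf("invalid item name %q", name)
	}
	return path.Join(workspaceRoot, itemType, name+".md"), nil
}

// getArgs builds the kubectl argument list for reading one item. Passing each
// argument separately (no shell) keeps the name from being interpreted by sh.
func getArgs(podName, itemType, name string) ([]string, error) {
	p, err := itemPath(itemType, name)
	if err != nil {
		return nil, err
	}
	return []string{"exec", podName, "--", "cat", p}, nil
}

func main() {
	args, err := getArgs("claudebox-pantheon-0", "commands", "deploy")
	fmt.Println(args, err)

	_, err = getArgs("claudebox-pantheon-0", "commands", "../../etc/passwd")
	fmt.Println(err)
}
```

The same `itemPath` result can feed the Create, Update, and Delete operations, so the validation lives in one place.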

Task 5.3: Integrate Validation Package (3h)

Replace inline checks with validate package:

Before:

if req.Name == "" {
    api.WriteBadRequest(w, r, "name is required")
    return
}

After:

v := validate.New()
v.Required(req.Name, "name")
v.Name(req.Name, "name") // alphanumeric, 1-64 chars
if err := v.Error(); err != nil {
    api.WriteBadRequest(w, r, err.Error())
    return
}

Files to update:

  • internal/handlers/keys.go
  • internal/handlers/projects.go
  • internal/handlers/claude_config.go
  • internal/service/project_service.go

Acceptance:

  • All inline validation replaced with validate package
  • Consistent error messages across all endpoints
  • All handler tests pass
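
For readers who have not seen the validate package, the accumulator pattern used in the "After" snippet can be sketched as follows. This is an illustrative stand-in under stated assumptions, not the package's real code: only the method names `New`, `Required`, `Name`, and `Error` come from the snippet, while the struct layout and message wording are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
	"strings"
)

// namePattern encodes the "alphanumeric, 1-64 chars" rule from the plan.
var namePattern = regexp.MustCompile(`^[A-Za-z0-9]{1,64}$`)

// Validator accumulates problems so a handler can report all of them at once.
type Validator struct {
	problems []string
}

// New returns an empty Validator.
func New() *Validator { return &Validator{} }

// Required records a problem when value is empty or whitespace only.
func (v *Validator) Required(value, field string) {
	if strings.TrimSpace(value) == "" {
		v.problems = append(v.problems, field+" is required")
	}
}

// Name checks the naming rule. Empty values are skipped here because
// Required already reports them.
func (v *Validator) Name(value, field string) {
	if value != "" && !namePattern.MatchString(value) {
		v.problems = append(v.problems, field+" must be alphanumeric, 1-64 chars")
	}
}

// Error returns nil when every check passed, otherwise one combined error.
func (v *Validator) Error() error {
	if len(v.problems) == 0 {
		return nil
	}
	return errors.New(strings.Join(v.problems, "; "))
}

func main() {
	v := New()
	v.Required("", "name")
	v.Name("bad name!", "slug")
	fmt.Println(v.Error())
}
```

Collecting every failure before returning is what gives the consistent, complete error messages listed in the acceptance criteria.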

Task 5.4: Consolidate Docker Images (1h)

Current state: 4 Dockerfiles with unclear purpose

Action:

  • Keep Dockerfile as single canonical image
  • Delete Dockerfile.api, Dockerfile.api.prebuild, Dockerfile.api.simple
  • Update any CI/scripts referencing old files

Acceptance:

  • Single Dockerfile builds and runs correctly
  • No references to deleted Dockerfiles

Week 6: Dynamic Project Discovery

Goal: Remove hardcoded projects, discover from K8s

Task 6.1: Define Project Labels (1h)

K8s label convention:

metadata:
  labels:
    rdev.orchard9.ai/project: "true"
    rdev.orchard9.ai/name: "pantheon"
  annotations:
    rdev.orchard9.ai/description: "Go API backend"
    # Label values cannot contain "/", so the workspace path is an annotation.
    rdev.orchard9.ai/workspace: "/workspace"

Update existing pods:

  • claudebox-pantheon-0
  • claudebox-aeries-0

Task 6.2: Implement Label Discovery (4h)

Update internal/adapter/kubernetes/project_repository.go:

func (r *ProjectRepository) RefreshStatus(ctx context.Context) error {
    // List pods with label rdev.orchard9.ai/project=true
    pods, err := r.client.CoreV1().Pods(r.namespace).List(ctx, metav1.ListOptions{
        LabelSelector: "rdev.orchard9.ai/project=true",
    })
    if err != nil {
        return fmt.Errorf("list project pods: %w", err)
    }

    // For each pod, extract project info from labels and annotations
    for _, pod := range pods.Items {
        project := domain.Project{
            ID:          domain.ProjectID(pod.Labels["rdev.orchard9.ai/name"]),
            Name:        pod.Labels["rdev.orchard9.ai/name"],
            Description: pod.Annotations["rdev.orchard9.ai/description"],
            PodName:     pod.Name,
            Workspace:   pod.Annotations["rdev.orchard9.ai/workspace"],
            Status:      mapPodPhase(pod.Status.Phase),
        }
        r.register(project)
    }
    return nil
}

Acceptance:

  • Projects auto-discovered from labeled pods
  • No hardcoded project list
  • New pods automatically appear
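
The listing calls `mapPodPhase` without defining it. One plausible shape is sketched below. `PodPhase` is a local string type that mirrors the values of the Kubernetes pod phase so the sketch has no dependencies, and the `ProjectStatus` type and its values are hypothetical.

```go
package main

import "fmt"

// PodPhase mirrors the string values of the Kubernetes pod phase.
type PodPhase string

// ProjectStatus and its values are assumptions for this sketch.
type ProjectStatus string

const (
	StatusReady    ProjectStatus = "ready"
	StatusStarting ProjectStatus = "starting"
	StatusStopped  ProjectStatus = "stopped"
	StatusUnknown  ProjectStatus = "unknown"
)

// mapPodPhase translates a pod phase into the status shown for a project.
func mapPodPhase(phase PodPhase) ProjectStatus {
	switch phase {
	case "Running":
		return StatusReady
	case "Pending":
		return StatusStarting
	case "Succeeded", "Failed":
		return StatusStopped
	default: // includes "Unknown" and any phase added later
		return StatusUnknown
	}
}

func main() {
	for _, p := range []PodPhase{"Running", "Pending", "Failed", "Unknown"} {
		fmt.Println(p, "->", mapPodPhase(p))
	}
}
```

A `Running` phase does not guarantee that containers are ready, so a stricter version might also inspect the pod's readiness conditions.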

Task 6.3: Add Project ConfigMap Support (3h)

For complex project configuration:

apiVersion: v1
kind: ConfigMap
metadata:
  name: rdev-projects
data:
  pantheon.yaml: |
    name: pantheon
    description: Go API backend
    pod_selector: claudebox-pantheon-0
    workspace: /workspace
    allowed_commands:
      - claude
      - shell
      - git
    max_concurrent_commands: 5    

Implementation:

  • Read ConfigMap on startup
  • Merge with label-discovered projects
  • ConfigMap takes precedence for settings
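
The merge rule above ("ConfigMap takes precedence for settings") can be sketched as a field-by-field overlay. This is one possible interpretation, assuming that an unset ConfigMap field falls back to the discovered value and that ConfigMap entries without a matching pod are ignored. The struct and function names are hypothetical.

```go
package main

import "fmt"

// ProjectConfig holds the settings from the ConfigMap example; the field
// names are assumptions for this sketch.
type ProjectConfig struct {
	Name                  string
	Description           string
	Workspace             string
	AllowedCommands       []string
	MaxConcurrentCommands int
}

// merge overlays ConfigMap settings on a label-discovered project. Non-zero
// ConfigMap fields win; zero values fall back to the discovered value.
func merge(discovered, fromConfigMap ProjectConfig) ProjectConfig {
	out := discovered
	if fromConfigMap.Description != "" {
		out.Description = fromConfigMap.Description
	}
	if fromConfigMap.Workspace != "" {
		out.Workspace = fromConfigMap.Workspace
	}
	if len(fromConfigMap.AllowedCommands) > 0 {
		out.AllowedCommands = fromConfigMap.AllowedCommands
	}
	if fromConfigMap.MaxConcurrentCommands > 0 {
		out.MaxConcurrentCommands = fromConfigMap.MaxConcurrentCommands
	}
	return out
}

// mergeAll applies merge to every discovered project, keyed by project name.
func mergeAll(discovered, fromConfigMap map[string]ProjectConfig) map[string]ProjectConfig {
	out := make(map[string]ProjectConfig, len(discovered))
	for name, p := range discovered {
		if override, ok := fromConfigMap[name]; ok {
			p = merge(p, override)
		}
		out[name] = p
	}
	return out
}

func main() {
	discovered := map[string]ProjectConfig{
		"pantheon": {Name: "pantheon", Description: "from annotation", Workspace: "/workspace"},
	}
	overrides := map[string]ProjectConfig{
		"pantheon": {Description: "Go API backend", MaxConcurrentCommands: 5},
	}
	fmt.Printf("%+v\n", mergeAll(discovered, overrides)["pantheon"])
}
```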

Task 6.4: Pod Watch for Real-Time Updates (4h)

Instead of polling, watch for changes:

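func (r *ProjectRepository) StartWatching(ctx context.Context) error {
    watcher, err := r.client.CoreV1().Pods(r.namespace).Watch(ctx, metav1.ListOptions{
        LabelSelector: "rdev.orchard9.ai/project=true",
    })
    if err != nil {
        return fmt.Errorf("watch project pods: %w", err)
    }

    go func() {
        defer watcher.Stop()
        // The channel closes when the watch ends (for example on context
        // cancellation); a production version would re-establish the watch.
        for event := range watcher.ResultChan() {
            switch event.Type {
            case watch.Added:
                r.register(podToProject(event.Object))
            case watch.Deleted:
                r.unregister(podToProjectID(event.Object))
            case watch.Modified:
                r.update(podToProject(event.Object))
            }
        }
    }()
    return nil
}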

Acceptance:

  • Projects appear within 1s of pod creation
  • Projects disappear within 1s of pod deletion
  • No polling required
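
The `register`, `update`, and `unregister` calls need a store that is safe to read from request handlers while the watch goroutine writes to it. A minimal sketch of such a store follows. The `Registry` and `Project` types are hypothetical stand-ins, and the event type strings match the values Kubernetes uses for watch events.

```go
package main

import (
	"fmt"
	"sync"
)

// EventType mirrors the watch event type strings.
type EventType string

const (
	Added    EventType = "ADDED"
	Modified EventType = "MODIFIED"
	Deleted  EventType = "DELETED"
)

// Project is a reduced stand-in for domain.Project.
type Project struct {
	ID      string
	PodName string
	Status  string
}

// Registry is an in-memory project index guarded by a read/write mutex.
type Registry struct {
	mu       sync.RWMutex
	projects map[string]Project
}

func NewRegistry() *Registry {
	return &Registry{projects: make(map[string]Project)}
}

// Apply folds one watch event into the registry. Added and Modified are both
// upserts, so a repeated Added after a watch restart is harmless.
func (r *Registry) Apply(t EventType, p Project) {
	r.mu.Lock()
	defer r.mu.Unlock()
	switch t {
	case Added, Modified:
		r.projects[p.ID] = p
	case Deleted:
		delete(r.projects, p.ID)
	}
}

// Get returns the project with the given ID, if present.
func (r *Registry) Get(id string) (Project, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	p, ok := r.projects[id]
	return p, ok
}

// Len reports how many projects are registered.
func (r *Registry) Len() int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return len(r.projects)
}

func main() {
	reg := NewRegistry()
	reg.Apply(Added, Project{ID: "pantheon", PodName: "claudebox-pantheon-0", Status: "starting"})
	reg.Apply(Modified, Project{ID: "pantheon", PodName: "claudebox-pantheon-0", Status: "ready"})
	fmt.Println(reg.Get("pantheon"))
	reg.Apply(Deleted, Project{ID: "pantheon"})
	fmt.Println(reg.Len())
}
```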

Week 7: Security & Test Completion

Goal: IP allowlisting, comprehensive adapter tests

Task 7.1: IP Allowlisting (4h)

Schema update:

ALTER TABLE api_keys ADD COLUMN allowed_ips CIDR[];

Domain update:

type APIKey struct {
    // ... existing fields
    AllowedIPs []net.IPNet `json:"allowed_ips,omitempty"`
}

Middleware update:

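// clientIP must be a bare IP address; strip the port from r.RemoteAddr first.
func (m *AuthMiddleware) checkIPAllowed(key *domain.APIKey, clientIP string) bool {
    if len(key.AllowedIPs) == 0 {
        return true // No restriction
    }
    ip := net.ParseIP(clientIP)
    if ip == nil {
        return false // Unparseable address: deny when restrictions exist
    }
    for _, allowed := range key.AllowedIPs {
        if allowed.Contains(ip) {
            return true
        }
    }
    return false
}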

Acceptance:

  • Keys can have IP restrictions
  • Requests from non-allowed IPs get 403
  • Admin can create unrestricted keys
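
Two details sit around the middleware check: turning stored CIDR strings into `net.IPNet` values, and getting a bare IP out of `r.RemoteAddr`, which normally has the form `ip:port`. The sketch below shows one way to handle both using only the standard library. The function names are hypothetical, and it assumes the API is not behind a proxy that rewrites the client address.

```go
package main

import (
	"fmt"
	"net"
)

// parseAllowlist converts CIDR strings (as stored in allowed_ips) to networks.
func parseAllowlist(cidrs []string) ([]net.IPNet, error) {
	nets := make([]net.IPNet, 0, len(cidrs))
	for _, c := range cidrs {
		_, n, err := net.ParseCIDR(c)
		if err != nil {
			return nil, fmt.Errorf("invalid CIDR %q: %w", c, err)
		}
		nets = append(nets, *n)
	}
	return nets, nil
}

// clientIP extracts the IP from an "ip:port" value such as r.RemoteAddr.
// It returns nil when the address cannot be parsed.
func clientIP(remoteAddr string) net.IP {
	host, _, err := net.SplitHostPort(remoteAddr)
	if err != nil {
		host = remoteAddr // no port present
	}
	return net.ParseIP(host)
}

// ipAllowed reports whether remoteAddr falls inside any allowed network.
// An empty allowlist means the key is unrestricted.
func ipAllowed(allowed []net.IPNet, remoteAddr string) bool {
	if len(allowed) == 0 {
		return true
	}
	ip := clientIP(remoteAddr)
	if ip == nil {
		return false
	}
	for i := range allowed {
		if allowed[i].Contains(ip) {
			return true
		}
	}
	return false
}

func main() {
	nets, err := parseAllowlist([]string{"10.0.0.0/8", "192.168.1.10/32"})
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(ipAllowed(nets, "10.1.2.3:5555"))
	fmt.Println(ipAllowed(nets, "172.16.0.1:80"))
}
```

If the service runs behind an ingress controller, the client address would have to come from a trusted forwarding header instead, which is a deployment decision outside this sketch.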

Task 7.2: Adapter Integration Tests (6h)

Create test infrastructure:

tests/
├── integration/
│   ├── postgres_test.go      # Real postgres via docker
│   ├── kubernetes_test.go    # Mock kubectl
│   └── testdata/
│       └── docker-compose.yml

Postgres adapter tests:

  • CRUD operations for API keys
  • Scope/project array handling
  • Connection pool behavior
  • Migration idempotency

Kubernetes adapter tests:

  • Mock kubectl responses
  • Command execution with output
  • Error handling (pod not found, timeout)
  • Claude config file operations

Memory adapter tests:

  • Stream publisher pub/sub
  • Event replay buffer
  • Concurrent subscriber handling

Acceptance:

  • All adapters have >80% coverage
  • Tests run in CI without real K8s
  • Docker-compose for postgres tests
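
The "event replay buffer" item is the piece behind Last-Event-ID reconnection, so it helps to pin down the behavior the tests should assert. The following sketch is an assumption about how such a buffer could work, not the memory adapter's actual code: it keeps the newest `max` events and returns everything after a given ID. It is not safe for concurrent use; a real adapter would add a mutex.

```go
package main

import "fmt"

// Event is one SSE message with a monotonically increasing ID.
type Event struct {
	ID   uint64
	Data string
}

// ReplayBuffer keeps the most recent events of one stream, oldest first.
type ReplayBuffer struct {
	events []Event
	max    int
	nextID uint64
}

// NewReplayBuffer returns a buffer that retains at most max events.
func NewReplayBuffer(max int) *ReplayBuffer {
	return &ReplayBuffer{max: max, nextID: 1}
}

// Append assigns the next ID, stores the event, and drops the oldest event
// once the buffer is over capacity.
func (b *ReplayBuffer) Append(data string) Event {
	ev := Event{ID: b.nextID, Data: data}
	b.nextID++
	b.events = append(b.events, ev)
	if len(b.events) > b.max {
		b.events = b.events[1:]
	}
	return ev
}

// Since returns the buffered events with an ID greater than lastEventID,
// which is what a client reconnecting with Last-Event-ID should receive.
func (b *ReplayBuffer) Since(lastEventID uint64) []Event {
	var out []Event
	for _, ev := range b.events {
		if ev.ID > lastEventID {
			out = append(out, ev)
		}
	}
	return out
}

func main() {
	buf := NewReplayBuffer(3)
	for _, line := range []string{"a", "b", "c", "d", "e"} {
		buf.Append(line)
	}
	fmt.Println(buf.Since(3)) // events after ID 3
	fmt.Println(buf.Since(0)) // everything still buffered
}
```

A test for the real adapter could follow the same shape: append more events than the capacity, then check both the replay after a known ID and the case where the requested ID has already been evicted.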

Task 7.3: Service Layer Tests (4h)

Create:

internal/service/project_service_test.go
internal/service/apikey_service_test.go
internal/service/claude_config_service_test.go

Test patterns:

  • Happy path for all operations
  • Error propagation from adapters
  • Business rule enforcement
  • Metrics recording

Task 7.4: Improve E2E Test Coverage (4h)

Expand tests/e2e/e2e_test.go:

func TestE2E_FullCommandLifecycle(t *testing.T) {
    // 1. Create API key
    // 2. Execute claude command
    // 3. Stream output via SSE
    // 4. Verify completion event
    // 5. Check metrics incremented
}

func TestE2E_RateLimiting(t *testing.T) {
    // Send 101 requests rapidly
    // Verify 429 on 101st request
    // Wait for bucket refill
    // Verify request succeeds
}

func TestE2E_SSEReconnection(t *testing.T) {
    // Start command
    // Connect to stream
    // Disconnect
    // Reconnect with Last-Event-ID
    // Verify replay
}

func TestE2E_ConcurrentCommands(t *testing.T) {
    // Start 5 commands
    // Verify 6th blocked
    // Complete one
    // Verify 6th now succeeds
}

Week 8: Production Hardening

Goal: Production-ready K8s manifests, reliability features

Task 8.1: K8s Manifest Hardening (4h)

Update deployments/k8s/base/:

# deployment.yaml
spec:
  template:
    spec:
      containers:
        - name: rdev-api
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 5
          securityContext:
            runAsNonRoot: true
            readOnlyRootFilesystem: true
            capabilities:
              drop: ["ALL"]

# pdb.yaml
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: rdev-api-pdb
spec:
  minAvailable: 1
  selector:
    matchLabels:
      app: rdev-api

# network-policy.yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: rdev-api-policy
spec:
  podSelector:
    matchLabels:
      app: rdev-api
  policyTypes:
    - Ingress
    - Egress
  ingress:
    - from:
        - namespaceSelector:
            matchLabels:
              name: ingress
      ports:
        - port: 8080
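  egress:
    - to:
        - namespaceSelector:
            matchLabels:
              name: databases
      ports:
        - port: 5432
    - to:
        - podSelector:
            matchLabels:
              rdev.orchard9.ai/project: "true"
    # DNS lookups are blocked once Egress is restricted, so allow them.
    - to:
        - namespaceSelector:
            matchLabels:
              kubernetes.io/metadata.name: kube-system
      ports:
        - port: 53
          protocol: UDP
        - port: 53
          protocol: TCP
    # pods/exec and watch go through the API server; add a cluster-specific rule for it.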

Task 8.2: RBAC Configuration (2h)

# rbac.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: rdev-api
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: rdev-api-role
rules:
  - apiGroups: [""]
    resources: ["pods"]
    verbs: ["get", "list", "watch"]
  - apiGroups: [""]
    resources: ["pods/exec"]
    verbs: ["create"]
  - apiGroups: [""]
    resources: ["configmaps"]
    verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: rdev-api-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: rdev-api-role
subjects:
  - kind: ServiceAccount
    name: rdev-api

Task 8.3: Graceful Shutdown (3h)

// cmd/rdev-api/main.go
func main() {
    // ... setup ...

    srv := &http.Server{
        Addr:    cfg.Addr,
        Handler: router,
    }

    // Start server
    go func() {
        if err := srv.ListenAndServe(); err != http.ErrServerClosed {
            log.Fatal(err)
        }
    }()

    // Wait for interrupt
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit

    // Graceful shutdown
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    // Stop reusing idle keep-alive connections
    srv.SetKeepAlivesEnabled(false)

    // Stop accepting new connections and wait for active requests
    if err := srv.Shutdown(ctx); err != nil {
        log.Error("forced shutdown", "error", err)
    }

    // Close database connections
    db.Close()

    log.Info("server stopped gracefully")
}

Task 8.4: Circuit Breaker for K8s (3h)

Protect against K8s API failures:

type CircuitBreaker struct {
    failures    int
    threshold   int
    resetAfter  time.Duration
    lastFailure time.Time
    state       State // Closed, Open, HalfOpen
    mu          sync.RWMutex
}

func (cb *CircuitBreaker) Execute(fn func() error) error {
    cb.mu.RLock()
    if cb.state == Open && time.Since(cb.lastFailure) < cb.resetAfter {
        cb.mu.RUnlock()
        return ErrCircuitOpen
    }
    cb.mu.RUnlock()

    err := fn()

    cb.mu.Lock()
    defer cb.mu.Unlock()
    if err != nil {
        cb.failures++
        cb.lastFailure = time.Now()
        if cb.failures >= cb.threshold {
            cb.state = Open
        }
    } else {
        cb.failures = 0
        cb.state = Closed
    }
    return err
}
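
The snippet above references `State`, `ErrCircuitOpen`, and a `HalfOpen` state without defining them or ever assigning `HalfOpen`. One way to fill those gaps is sketched below as a self-contained variant. It is an illustration under assumptions, not the project's breaker: the type is named `Breaker` to avoid confusion with the original, it uses a single mutex for simplicity, and any failure while half-open reopens the circuit immediately.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

type State int

const (
	Closed State = iota
	Open
	HalfOpen
)

var ErrCircuitOpen = errors.New("circuit breaker is open")

// errDown is a sample failure used by the demo in main.
var errDown = errors.New("k8s api unavailable")

type Breaker struct {
	mu          sync.Mutex
	state       State
	failures    int
	threshold   int
	resetAfter  time.Duration
	lastFailure time.Time
}

func NewBreaker(threshold int, resetAfter time.Duration) *Breaker {
	return &Breaker{threshold: threshold, resetAfter: resetAfter}
}

// Execute runs fn unless the circuit is open and still cooling down.
func (b *Breaker) Execute(fn func() error) error {
	b.mu.Lock()
	if b.state == Open {
		if time.Since(b.lastFailure) < b.resetAfter {
			b.mu.Unlock()
			return ErrCircuitOpen
		}
		b.state = HalfOpen // cool-down elapsed: let calls through as probes
	}
	b.mu.Unlock()

	err := fn()

	b.mu.Lock()
	defer b.mu.Unlock()
	if err != nil {
		b.failures++
		b.lastFailure = time.Now()
		if b.state == HalfOpen || b.failures >= b.threshold {
			b.state = Open
		}
		return err
	}
	b.failures = 0
	b.state = Closed
	return nil
}

func main() {
	b := NewBreaker(2, 30*time.Second)
	for i := 0; i < 3; i++ {
		fmt.Println(b.Execute(func() error { return errDown }))
	}
}
```

In the demo the first two calls return the underlying failure and the third is rejected by the breaker without invoking the function, which is the protection the K8s adapter is after.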

Task 8.5: Health Check Enhancements (2h)

// /health - Basic liveness
func (h *HealthHandler) Health(w http.ResponseWriter, r *http.Request) {
    api.WriteSuccess(w, r, map[string]string{"status": "ok"})
}

// /ready - Full readiness
func (h *HealthHandler) Ready(w http.ResponseWriter, r *http.Request) {
    checks := make(map[string]string)

    // Database connectivity
    if err := h.db.PingContext(r.Context()); err != nil {
        checks["database"] = "unhealthy: " + err.Error()
    } else {
        checks["database"] = "healthy"
    }

    // K8s connectivity
    if err := h.k8sClient.Ping(r.Context()); err != nil {
        checks["kubernetes"] = "unhealthy: " + err.Error()
    } else {
        checks["kubernetes"] = "healthy"
    }

    // Check for any unhealthy
    for _, status := range checks {
        if strings.HasPrefix(status, "unhealthy") {
            api.WriteError(w, r, http.StatusServiceUnavailable,
                "NOT_READY", "service not ready", checks)
            return
        }
    }

    api.WriteSuccess(w, r, map[string]any{
        "status": "ready",
        "checks": checks,
    })
}

Week 9: Performance & Observability

Goal: OpenTelemetry, performance optimization

Task 9.1: OpenTelemetry Integration (6h)

Add tracing:

// cmd/rdev-api/main.go
func initTracing() (*sdktrace.TracerProvider, error) {
    exporter, err := otlptracehttp.New(context.Background(),
        otlptracehttp.WithEndpoint(os.Getenv("OTEL_EXPORTER_ENDPOINT")),
    )
    if err != nil {
        return nil, err
    }

    tp := sdktrace.NewTracerProvider(
        sdktrace.WithBatcher(exporter),
        sdktrace.WithResource(resource.NewWithAttributes(
            semconv.SchemaURL,
            semconv.ServiceName("rdev-api"),
            semconv.ServiceVersion(Version),
        )),
    )
    otel.SetTracerProvider(tp)
    return tp, nil
}

Instrument handlers:

func (h *ProjectsHandler) RunClaude(w http.ResponseWriter, r *http.Request) {
    ctx, span := tracer.Start(r.Context(), "RunClaude")
    defer span.End()

    span.SetAttributes(
        attribute.String("project.id", projectID),
        attribute.String("command.type", "claude"),
    )

    // ... handler logic ...

    if err != nil {
        span.RecordError(err)
        span.SetStatus(codes.Error, err.Error())
    }
}

Task 9.2: Connection Pool Tuning (2h)

Database:

db.SetMaxOpenConns(25)
db.SetMaxIdleConns(10)
db.SetConnMaxLifetime(5 * time.Minute)
db.SetConnMaxIdleTime(1 * time.Minute)

HTTP client for K8s:

transport := &http.Transport{
    MaxIdleConns:        100,
    MaxIdleConnsPerHost: 10,
    IdleConnTimeout:     90 * time.Second,
}

Task 9.3: Response Caching (3h)

Cache project list (changes infrequently):

type CachedProjectRepository struct {
    inner     port.ProjectRepository
    cache     *sync.Map
    ttl       time.Duration
    lastFetch time.Time
    mu        sync.RWMutex
}

func (r *CachedProjectRepository) List(ctx context.Context) ([]domain.Project, error) {
    r.mu.RLock()
    if time.Since(r.lastFetch) < r.ttl {
        if cached, ok := r.cache.Load("projects"); ok {
            r.mu.RUnlock()
            return cached.([]domain.Project), nil
        }
    }
    r.mu.RUnlock()

    r.mu.Lock()
    defer r.mu.Unlock()

    // Double-check after acquiring write lock
    if time.Since(r.lastFetch) < r.ttl {
        if cached, ok := r.cache.Load("projects"); ok {
            return cached.([]domain.Project), nil
        }
    }

    projects, err := r.inner.List(ctx)
    if err != nil {
        return nil, err
    }

    r.cache.Store("projects", projects)
    r.lastFetch = time.Now()
    return projects, nil
}

Task 9.4: Benchmark Suite (3h)

// internal/handlers/projects_bench_test.go

func BenchmarkRunClaude(b *testing.B) {
    // Setup
    handler := setupTestHandler()

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        req := httptest.NewRequest("POST", "/projects/test/claude",
            strings.NewReader(`{"prompt":"test"}`))
        rec := httptest.NewRecorder()
        handler.RunClaude(rec, req)
    }
}

func BenchmarkSSEStreaming(b *testing.B) {
    // Measure event throughput
}

func BenchmarkAuthMiddleware(b *testing.B) {
    // Measure auth overhead
}

Week 10: Documentation & Polish

Goal: Comprehensive docs, final quality pass

Task 10.1: Architecture Documentation (4h)

Create docs/architecture/:

docs/architecture/
├── README.md           # Overview + diagrams
├── hexagonal.md        # Port/adapter pattern
├── security.md         # Auth, sanitization, rate limiting
├── streaming.md        # SSE protocol, reconnection
└── diagrams/
    ├── system-context.mmd
    ├── component.mmd
    └── sequence-command.mmd

Include:

  • System context diagram
  • Component diagram
  • Sequence diagrams for key flows
  • ADRs (Architecture Decision Records)

Task 10.2: API Documentation (3h)

Enhance OpenAPI spec:

  • Add examples for all endpoints
  • Document error codes
  • Add authentication examples
  • Include rate limit headers

Create docs/api/:

  • Quick start guide
  • Authentication guide
  • SSE client examples (JS, Python, Go)
  • Error handling guide

Task 10.3: Operations Documentation (3h)

Create docs/operations/:

docs/operations/
├── deployment.md       # K8s deployment guide
├── monitoring.md       # Prometheus/Grafana setup
├── troubleshooting.md  # Common issues
├── runbooks/
│   ├── high-cpu.md
│   ├── high-memory.md
│   ├── pod-not-found.md
│   └── auth-failures.md
└── disaster-recovery.md

Task 10.4: Final Quality Gate (4h)

Run comprehensive checks:

# Static analysis
golangci-lint run ./...

# Security scan
gosec ./...

# Test coverage
go test -coverprofile=coverage.out ./...
go tool cover -html=coverage.out -o coverage.html

# Benchmark baseline
go test -bench=. -benchmem ./... > benchmark.txt

# Dependency audit
go list -m all | nancy sleuth

# Build all targets
go build ./...
GOOS=linux GOARCH=amd64 go build ./...

# Docker build
docker build -t rdev-api:latest .

Coverage targets:

| Package | Target |
|---|---|
| internal/auth | >90% |
| internal/handlers | >85% |
| internal/service | >90% |
| internal/adapter/* | >80% |
| internal/domain | >95% |

Task 10.5: Release Preparation (2h)

Create release checklist:

## v1.0.0 Release Checklist

### Pre-release
- [ ] All tests pass
- [ ] Coverage targets met
- [ ] Security scan clean
- [ ] Benchmarks acceptable
- [ ] Documentation complete
- [ ] CHANGELOG.md updated
- [ ] Version bumped

### Release
- [ ] Tag created
- [ ] Docker image built and pushed
- [ ] K8s manifests updated
- [ ] Release notes published

### Post-release
- [ ] Smoke test in staging
- [ ] Monitor error rates
- [ ] Monitor latency
- [ ] Announce to users

Summary: Week-by-Week

| Week | Focus | Key Deliverables |
|---|---|---|
| 5 | Legacy Removal & Core Fixes | Clean codebase, working Claude config, integrated validation |
| 6 | Dynamic Project Discovery | Label-based discovery, ConfigMap support, pod watching |
| 7 | Security & Tests | IP allowlisting, adapter tests, service tests, E2E |
| 8 | Production Hardening | K8s manifests, RBAC, graceful shutdown, circuit breaker |
| 9 | Performance & Observability | OpenTelemetry, connection tuning, caching, benchmarks |
| 10 | Documentation & Polish | Architecture docs, API docs, ops docs, final QA |

Success Criteria: Pristine Project

Code Quality

  • No legacy code remaining
  • 100% of handlers use service layer
  • All validation via validate package
  • Consistent error handling throughout
  • No TODO/FIXME without ticket

Test Coverage

  • >85% overall coverage
  • All adapters have integration tests
  • E2E tests cover all user journeys
  • Benchmark suite for performance regression

Security

  • Command sanitization (shell injection)
  • IP allowlisting support
  • Rate limiting enforced
  • Secrets never logged
  • RBAC configured

Production Ready

  • Resource limits set
  • Health/readiness probes
  • Graceful shutdown
  • Network policies
  • PodDisruptionBudget
  • Monitoring dashboards

Documentation

  • Architecture documented
  • API fully documented with examples
  • Operations runbooks
  • Troubleshooting guide
  • Deployment guide

Observability

  • Prometheus metrics
  • OpenTelemetry tracing
  • Structured logging
  • Error tracking

Estimated Effort

| Week | Hours |
|---|---|
| 5 | 14h |
| 6 | 12h |
| 7 | 18h |
| 8 | 14h |
| 9 | 14h |
| 10 | 16h |
| Total | 88h |

At ~15h/week: about 6 weeks to pristine (88h / 15 ≈ 5.9). At ~30h/week: about 3 weeks (88h / 30 ≈ 2.9).