# rdev-api Implementation Plan

> Refactoring to Hexagonal Architecture + Production Hardening

## Current State Assessment

### What's Already Implemented (Working)

| Component | Status | Lines | Notes |
|-----------|--------|-------|-------|
| HTTP Handlers | ✅ Complete | 611 | All 6 project endpoints + 4 key endpoints |
| Executor | ✅ Complete | 183 | kubectl exec with streaming output |
| SSE Streaming | ✅ Complete | ~100 | Channel-based pub/sub with heartbeats |
| Auth Service | ✅ Complete | 561 | Keys, scopes, middleware, expiration |
| Database | ✅ Complete | 173 | Auto-migrations, connection pooling |
| API Framework | ✅ Complete | 298 | Response envelopes, OpenAPI, chi router |

### What's Missing

| Component | Priority | Impact |
|-----------|----------|--------|
| Tests | HIGH | No unit or integration tests |
| Dynamic Projects | MEDIUM | Hardcoded to 2 projects |
| Project Access Control | HIGH | Middleware exists but unused |
| Rate Limiting | HIGH | No request throttling |
| Metrics | MEDIUM | No Prometheus/observability |
| Retry Logic | MEDIUM | No transient failure recovery |
| Command Cancellation | LOW | No explicit kill mechanism |

---

## Architecture Analysis

### Current: Layered (Not Hexagonal)

```
┌─────────────────────────────────────────────┐
│  HTTP Layer (handlers/)                     │
│  - Chi router, request parsing, responses   │
├─────────────────────────────────────────────┤
│  Service Layer (auth/, executor/)           │
│  - Business logic mixed with infrastructure │
├─────────────────────────────────────────────┤
│  Data Layer (db/, projects/)                │
│  - Direct PostgreSQL, kubectl calls         │
└─────────────────────────────────────────────┘
```

**Problems:**
1. Handlers directly depend on concrete implementations
2. No interfaces for testability
3. Executor calls kubectl directly (hard to test)
4. Registry hardcodes project definitions
5. No domain model - just request/response structs

### Target: Hexagonal Architecture

```
                ┌─────────────────────┐
                │    Domain (Core)    │
                │                     │
                │  - Project          │
                │  - Command          │
                │  - APIKey           │
                │  - CommandResult    │
                │  - Stream           │
                └─────────────────────┘
                           ▲
            ┌──────────────┼──────────────┐
            │              │              │
      ┌─────┴─────┐  ┌─────┴─────┐  ┌─────┴─────┐
      │   Ports   │  │   Ports   │  │   Ports   │
      │ (Inbound) │  │ (Outbound)│  │ (Outbound)│
      └───────────┘  └───────────┘  └───────────┘
            │              │              │
      ┌─────┴─────┐  ┌─────┴─────┐  ┌─────┴─────┐
      │ Adapters  │  │ Adapters  │  │ Adapters  │
      │  (HTTP)   │  │ (kubectl) │  │ (Postgres)│
      └───────────┘  └───────────┘  └───────────┘
```

---

## Risk Analysis: What Could Go Wrong

### SSE Streaming Risks

| Risk | Severity | Mitigation |
|------|----------|------------|
| **Memory leak from abandoned streams** | HIGH | Stream cleanup after 30s (implemented), but no max stream limit |
| **Buffer overflow** | MEDIUM | 100-event buffer with non-blocking send (drops events) |
| **Client reconnection gaps** | MEDIUM | No event replay - missed events are lost |
| **Proxy buffering** | MEDIUM | Headers set correctly, but nginx/cloudflare may still buffer |
| **Connection hijacking** | LOW | No stream authentication after initial auth |

**Current Implementation Strengths:**
- Heartbeat keeps connection alive (30s)
- Context cancellation on client disconnect
- Non-blocking sends prevent goroutine leaks
- Completion event triggers cleanup

**Gaps to Address:**
1. No max concurrent streams per user
2. No event ID for resumability (Last-Event-ID)
3. No backpressure signaling to clients
4. Stream state not persisted (restart loses all)

### Executor Risks

| Risk | Severity | Mitigation |
|------|----------|------------|
| **kubectl timeout** | HIGH | 10-minute context timeout (implemented) |
| **Pod not found** | HIGH | Registry refresh, but no retry |
| **Large output OOM** | MEDIUM | 1MB line buffer, but no total limit |
| **Command injection** | HIGH | Args passed directly to kubectl (needs validation) |
| **Zombie processes** | MEDIUM | Context cancellation, but no kill -9 |

**Current Implementation Strengths:**
- Context-based cancellation
- Line-by-line streaming (not buffering full output)
- Separate stdout/stderr handling
- Exit code capture

**Gaps to Address:**
1. No command sanitization (shell injection risk)
2. No output size limit (could fill memory)
3. No concurrent command limit per project
4. No pod health check before exec

### Auth Risks

| Risk | Severity | Mitigation |
|------|----------|------------|
| **Timing attack on key comparison** | MEDIUM | Using crypto/subtle (verify!) |
| **Key enumeration** | LOW | Generic "invalid key" errors |
| **Revocation delay** | MEDIUM | No cache - checks DB each request |
| **Admin key exposure** | HIGH | Env var, but logged if DEBUG |

**Current Implementation Strengths:**
- SHA-256 hashed keys (never stored plain)
- Scoped permissions
- Expiration enforcement
- last_used_at audit trail

**Gaps to Address:**
1. Verify constant-time comparison is used
2. Add key rotation support
3. Add IP allowlisting per key
4. Rate limit failed auth attempts

### Database Risks

| Risk | Severity | Mitigation |
|------|----------|------------|
| **Connection pool exhaustion** | MEDIUM | Fixed at 10 max connections |
| **Migration failure on startup** | HIGH | Fails fast (correct), but no rollback |
| **Query timeout** | MEDIUM | No explicit query timeouts set |

---

## Hexagonal Refactoring Plan

### Phase 1: Define Domain Model

Create pure domain types with no external dependencies.
```go
// internal/domain/project.go
package domain

type ProjectID string
type CommandID string

type Project struct {
	ID          ProjectID
	Name        string
	Description string
	PodName     string
	Status      ProjectStatus
	Workspace   string
}

type ProjectStatus string

const (
	ProjectStatusRunning  ProjectStatus = "running"
	ProjectStatusPending  ProjectStatus = "pending"
	ProjectStatusFailed   ProjectStatus = "failed"
	ProjectStatusNotFound ProjectStatus = "not_found"
)
```

```go
// internal/domain/command.go
package domain

import "time"

type CommandType string

const (
	CommandTypeClaude CommandType = "claude"
	CommandTypeShell  CommandType = "shell"
	CommandTypeGit    CommandType = "git"
)

type Command struct {
	ID        CommandID
	ProjectID ProjectID
	Type      CommandType
	Args      []string
	StartedAt time.Time
}

type CommandResult struct {
	CommandID  CommandID
	ExitCode   int
	DurationMs int64
	Error      error
}

type OutputLine struct {
	Stream string // "stdout" or "stderr"
	Line   string
	Time   time.Time
}
```

### Phase 2: Define Ports (Interfaces)

Imports (`context`, the `domain` package, and a `uuid` package) are omitted from the listings below for brevity.

```go
// internal/port/project_repository.go
package port

type ProjectRepository interface {
	List(ctx context.Context) ([]domain.Project, error)
	Get(ctx context.Context, id domain.ProjectID) (*domain.Project, error)
	RefreshStatus(ctx context.Context) error
}
```

```go
// internal/port/command_executor.go
package port

type CommandExecutor interface {
	Execute(ctx context.Context, cmd *domain.Command, onOutput func(domain.OutputLine)) (*domain.CommandResult, error)
	Cancel(ctx context.Context, cmdID domain.CommandID) error
}
```

```go
// internal/port/stream_publisher.go
package port

type StreamPublisher interface {
	Subscribe(streamID string) (<-chan StreamEvent, func())
	Publish(streamID string, event StreamEvent)
	Close(streamID string)
}

type StreamEvent struct {
	Type string
	Data map[string]any
}
```

```go
// internal/port/api_key_repository.go
package port

type APIKeyRepository interface {
	Create(ctx context.Context, key *domain.APIKey) error
	GetByHash(ctx context.Context, hash string) (*domain.APIKey, error)
	List(ctx context.Context) ([]domain.APIKey, error)
	Revoke(ctx context.Context, id uuid.UUID) error
	UpdateLastUsed(ctx context.Context, id uuid.UUID) error
}
```

### Phase 3: Create Adapters

```
internal/
├── adapter/
│   ├── http/           # HTTP handlers (inbound adapter)
│   │   ├── project_handler.go
│   │   ├── key_handler.go
│   │   └── middleware.go
│   ├── kubernetes/     # kubectl executor (outbound adapter)
│   │   └── executor.go
│   ├── postgres/       # Database (outbound adapter)
│   │   ├── project_repo.go
│   │   └── key_repo.go
│   └── memory/         # In-memory (for testing)
│       ├── project_repo.go
│       └── stream_publisher.go
```

### Phase 4: Wire with Dependency Injection

```go
// cmd/rdev-api/main.go
func main() {
	// cfg is assumed to have been loaded before this point.

	// Create adapters
	db := postgres.NewDB(cfg)
	projectRepo := postgres.NewProjectRepository(db)
	keyRepo := postgres.NewAPIKeyRepository(db)
	executor := kubernetes.NewExecutor(cfg.Namespace)
	streamPub := memory.NewStreamPublisher()

	// Create services (use cases)
	projectSvc := service.NewProjectService(projectRepo, executor, streamPub)
	keySvc := service.NewAPIKeyService(keyRepo)

	// Create HTTP handlers
	projectHandler := http.NewProjectHandler(projectSvc)
	keyHandler := http.NewKeyHandler(keySvc)

	// Wire routes
	router := chi.NewRouter()
	projectHandler.Mount(router)
	keyHandler.Mount(router)
}
```

---

## Implementation Tasks

### Epic 1: Testing Foundation (Priority: CRITICAL)

Without tests, refactoring is dangerous.
| Task | Effort | Files |
|------|--------|-------|
| 1.1 Add test utilities (fixtures, mocks) | 2h | `internal/testutil/` |
| 1.2 Unit tests for auth/keys.go | 2h | `internal/auth/keys_test.go` |
| 1.3 Unit tests for auth/service.go | 3h | `internal/auth/service_test.go` |
| 1.4 Unit tests for executor | 2h | `internal/executor/executor_test.go` |
| 1.5 Integration tests for handlers | 4h | `internal/handlers/*_test.go` |
| 1.6 E2E test with docker-compose | 4h | `tests/e2e/` |

**Test Strategy:**
- Unit tests: Mock all dependencies via interfaces
- Integration tests: Use real postgres (docker), mock kubectl
- E2E tests: Full stack with test k8s cluster or kind

### Epic 2: Security Hardening (Priority: HIGH)

| Task | Effort | Risk Mitigated |
|------|--------|----------------|
| 2.1 Command sanitization | 2h | Shell injection |
| 2.2 Rate limiting middleware | 3h | DoS, brute force |
| 2.3 Concurrent command limit | 2h | Resource exhaustion |
| 2.4 Output size limit | 1h | OOM |
| 2.5 Constant-time key comparison audit | 1h | Timing attacks |

**Command Sanitization Rules:**

```go
// Disallow these in shell commands:
var dangerousPatterns = []string{
	";", "&&", "||", "|", "`", "$(", // Command chaining
	">", ">>", "<", // Redirects
	"rm -rf", "dd if=", // Destructive
}
```

### Epic 3: Hexagonal Refactoring (Priority: MEDIUM)

| Task | Effort | Impact |
|------|--------|--------|
| 3.1 Create domain package | 2h | Foundation |
| 3.2 Define port interfaces | 2h | Testability |
| 3.3 Extract kubernetes adapter | 3h | Mockable executor |
| 3.4 Extract postgres adapter | 3h | Mockable repos |
| 3.5 Create memory adapter (tests) | 2h | Fast tests |
| 3.6 Refactor handlers to use ports | 4h | Decoupling |
| 3.7 Add dependency injection | 2h | Flexibility |

### Epic 4: SSE Improvements (Priority: MEDIUM)

| Task | Effort | Benefit |
|------|--------|---------|
| 4.1 Add Last-Event-ID support | 3h | Resumable connections |
| 4.2 Max streams per user | 2h | Resource control |
| 4.3 Event replay buffer | 4h | Catch missed events |
| 4.4 Backpressure detection | 2h | Client health |

**Event ID Format:**

```
event: output
id: cmd-pantheon-001:42
data: {"line": "Building..."}
```

### Epic 5: Observability (Priority: MEDIUM)

| Task | Effort | Benefit |
|------|--------|---------|
| 5.1 Prometheus metrics | 3h | Monitoring |
| 5.2 Request tracing (OpenTelemetry) | 4h | Debugging |
| 5.3 Structured logging improvements | 2h | Log analysis |
| 5.4 Health check enhancements | 1h | K8s probes |

**Key Metrics:**

```
rdev_commands_total{project, type, status}
rdev_command_duration_seconds{project, type}
rdev_active_streams{project}
rdev_auth_failures_total{reason}
rdev_api_request_duration_seconds{endpoint, method, status}
```

### Epic 6: Dynamic Project Discovery (Priority: LOW)

| Task | Effort | Benefit |
|------|--------|---------|
| 6.1 K8s label-based discovery | 3h | No hardcoding |
| 6.2 Project config from ConfigMap | 2h | External config |
| 6.3 Watch for pod changes | 3h | Real-time updates |

**Label Convention:**

```yaml
metadata:
  labels:
    rdev.orchard9.ai/project: "true"
    rdev.orchard9.ai/name: "pantheon"
    rdev.orchard9.ai/description: "Go API backend"
```

---

## Implementation Order

```
Week 1: Testing Foundation
├── 1.1 Test utilities
├── 1.2 Auth key tests
├── 1.3 Auth service tests
└── 1.4 Executor tests

Week 2: Security + More Tests
├── 2.1 Command sanitization
├── 2.2 Rate limiting
├── 1.5 Handler integration tests
└── 2.3 Concurrent command limit

Week 3: Hexagonal Refactoring
├── 3.1 Domain package
├── 3.2 Port interfaces
├── 3.3 Kubernetes adapter
└── 3.4 Postgres adapter

Week 4: Production Hardening
├── 3.5-3.7 Complete hexagonal
├── 4.1 SSE Last-Event-ID
├── 5.1 Prometheus metrics
└── 1.6 E2E tests
```

---

## File Structure After Refactoring

```
internal/
├── domain/                 # Pure domain model (no deps)
│   ├── project.go
│   ├── command.go
│   ├── apikey.go
│   └── errors.go
├── port/                   # Interface definitions
│   ├── project_repository.go
│   ├── command_executor.go
│   ├── apikey_repository.go
│   └── stream_publisher.go
├── service/                # Use cases / business logic
│   ├── project_service.go
│   ├── command_service.go
│   └── apikey_service.go
├── adapter/
│   ├── http/               # Inbound: HTTP handlers
│   │   ├── project_handler.go
│   │   ├── key_handler.go
│   │   ├── middleware/
│   │   │   ├── auth.go
│   │   │   └── ratelimit.go
│   │   └── response.go
│   ├── kubernetes/         # Outbound: kubectl
│   │   └── executor.go
│   ├── postgres/           # Outbound: database
│   │   ├── project_repo.go
│   │   ├── apikey_repo.go
│   │   └── migrations/
│   └── memory/             # Outbound: in-memory (testing)
│       ├── project_repo.go
│       ├── apikey_repo.go
│       └── stream_publisher.go
└── config/                 # Configuration loading
    └── config.go
```

---

## Success Criteria

### Must Have (v0.6)
- [ ] 80%+ test coverage on auth and executor
- [ ] Command input sanitization
- [ ] Rate limiting (100 req/min per key)
- [ ] No shell injection vulnerabilities

### Should Have (v0.7)
- [ ] Hexagonal architecture complete
- [ ] Prometheus metrics endpoint
- [ ] SSE reconnection support

### Nice to Have (v0.8+)
- [ ] Dynamic project discovery
- [ ] OpenTelemetry tracing
- [ ] Command cancellation API

---

## Appendix: SSE Event Protocol

### Event Types

```
event: connected
data: {"project":"pantheon","stream_id":"cmd-001"}

event: output
id: cmd-001:1
data: {"line":"Starting build...","stream":"stdout"}

event: output
id: cmd-001:2
data: {"line":"Warning: deprecated API","stream":"stderr"}

event: heartbeat
data: {"timestamp":"2026-01-25T07:00:00Z"}

event: complete
id: cmd-001:final
data: {"exit_code":0,"duration_ms":4523}

event: error
data: {"code":"POD_NOT_FOUND","message":"claudebox-pantheon-0 not running"}
```

### Client Reconnection

```javascript
// Caution: the browser's built-in EventSource accepts only `withCredentials`
// as an option and ignores `headers`. Sending the bearer token this way
// requires a fetch-based SSE client or a polyfill that supports custom headers.
const events = new EventSource('/projects/pantheon/events?stream_id=cmd-001', {
  headers: {
    'Authorization': 'Bearer rdev_sk_...'
  }
});

// On reconnect, the browser sends the Last-Event-ID header automatically.
// The server should replay events since that ID.
```

---

## Appendix: Command Execution Flow

```
┌─────────┐     ┌─────────┐     ┌──────────┐     ┌─────────┐
│ Client  │     │ Handler │     │ Executor │     │   Pod   │
└────┬────┘     └────┬────┘     └────┬─────┘     └────┬────┘
     │               │               │                │
     │ POST /claude  │               │                │
     │──────────────>│               │                │
     │               │               │                │
     │ 201 Created   │               │                │
     │<──────────────│               │                │
     │               │               │                │
     │ GET /events   │ go execute()  │                │
     │──────────────>│──────────────>│                │
     │               │               │ kubectl exec   │
     │               │               │───────────────>│
     │               │               │                │
     │               │ onOutput()    │ stdout/stderr  │
     │ SSE output    │<──────────────│<───────────────│
     │<──────────────│               │                │
     │               │               │                │
     │ SSE output    │ onOutput()    │ stdout/stderr  │
     │<──────────────│<──────────────│<───────────────│
     │               │               │                │
     │ SSE complete  │ result        │ exit code      │
     │<──────────────│<──────────────│<───────────────│
     │               │               │                │
```

---

## Decision Log

| Decision | Rationale | Date |
|----------|-----------|------|
| Use hexagonal architecture | Testability, flexibility, clean boundaries | 2026-01-25 |
| Keep SSE (not WebSocket) | Simpler, HTTP/2 multiplexing, auto-reconnect | 2026-01-25 |
| Prioritize tests over refactoring | Can't safely refactor without tests | 2026-01-25 |
| Port 5433 for local dev | Avoid conflicts with system postgres | 2026-01-25 |
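
The reconnection behavior described in the SSE appendix (and planned as tasks 4.1 and 4.3) could be sketched on the server side roughly as follows. This assumes event IDs follow the `<stream_id>:<sequence>` format shown in the protocol, with `:final` reserved for the `complete` event. `EventID`, `ParseEventID`, `Event`, and `ReplayBuffer` are hypothetical names for illustration, not existing code, and the buffer is deliberately left without locking.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// EventID is the parsed form of an SSE id such as "cmd-001:42" or
// "cmd-001:final". (Hypothetical type for this sketch.)
type EventID struct {
	StreamID string
	Seq      int  // sequence number; 0 when Final is true
	Final    bool // true for the ":final" ID carried by the complete event
}

// ParseEventID splits a Last-Event-ID value at its last colon, so stream IDs
// that contain colons themselves would still parse.
func ParseEventID(raw string) (EventID, error) {
	i := strings.LastIndex(raw, ":")
	if i <= 0 || i == len(raw)-1 {
		return EventID{}, fmt.Errorf("malformed event ID %q", raw)
	}
	streamID, suffix := raw[:i], raw[i+1:]
	if suffix == "final" {
		return EventID{StreamID: streamID, Final: true}, nil
	}
	seq, err := strconv.Atoi(suffix)
	if err != nil || seq < 1 {
		return EventID{}, fmt.Errorf("malformed event ID %q", raw)
	}
	return EventID{StreamID: streamID, Seq: seq}, nil
}

// Event is one buffered output event.
type Event struct {
	Seq  int
	Data string
}

// ReplayBuffer keeps the most recent events of a single stream so that a
// reconnecting client can catch up. It is NOT safe for concurrent use; a real
// implementation would need a mutex or a single owning goroutine.
type ReplayBuffer struct {
	max    int
	next   int
	events []Event
}

func NewReplayBuffer(max int) *ReplayBuffer {
	return &ReplayBuffer{max: max}
}

// Append assigns the next sequence number, stores the event, and evicts the
// oldest entries once the buffer holds more than max events.
func (b *ReplayBuffer) Append(data string) Event {
	b.next++
	ev := Event{Seq: b.next, Data: data}
	b.events = append(b.events, ev)
	if len(b.events) > b.max {
		b.events = b.events[len(b.events)-b.max:]
	}
	return ev
}

// Since returns the buffered events with a sequence number greater than seq.
// Events already evicted cannot be replayed.
func (b *ReplayBuffer) Since(seq int) []Event {
	var missed []Event
	for _, ev := range b.events {
		if ev.Seq > seq {
			missed = append(missed, ev)
		}
	}
	return missed
}
```

On reconnect, a handler would parse the `Last-Event-ID` request header with `ParseEventID`, look up the buffer for that stream, and write the result of `Since(id.Seq)` before resuming live events. If the ID is `:final`, the command has already completed and there is nothing to replay. A bounded buffer trades completeness for memory: a client that falls more than `max` events behind still loses the oldest output.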