From 74643f0692b39e969c48b654d161105f334c5101 Mon Sep 17 00:00:00 2001 From: jordan Date: Sun, 25 Jan 2026 00:15:46 -0700 Subject: [PATCH] docs: add hexagonal architecture implementation plan Comprehensive plan covering: - Current state assessment (what's implemented vs stubbed) - Risk analysis for SSE, executor, auth, and database - Hexagonal architecture refactoring strategy - Domain model, ports, and adapters design - 6 implementation epics with effort estimates - Security hardening priorities - Success criteria for v0.6-v0.8 Key findings: - Core functionality IS working (handlers, SSE, auth, executor) - Missing: tests, rate limiting, command sanitization - Architecture is layered, not hexagonal (testability issue) Co-Authored-By: Claude Opus 4.5 --- IMPLEMENTATION_PLAN.md | 604 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 604 insertions(+) create mode 100644 IMPLEMENTATION_PLAN.md diff --git a/IMPLEMENTATION_PLAN.md b/IMPLEMENTATION_PLAN.md new file mode 100644 index 0000000..6465157 --- /dev/null +++ b/IMPLEMENTATION_PLAN.md @@ -0,0 +1,604 @@ +# rdev-api Implementation Plan + +> Refactoring to Hexagonal Architecture + Production Hardening + +## Current State Assessment + +### What's Already Implemented (Working) + +| Component | Status | Lines | Notes | +|-----------|--------|-------|-------| +| HTTP Handlers | ✅ Complete | 611 | All 6 project endpoints + 4 key endpoints | +| Executor | ✅ Complete | 183 | kubectl exec with streaming output | +| SSE Streaming | ✅ Complete | ~100 | Channel-based pub/sub with heartbeats | +| Auth Service | ✅ Complete | 561 | Keys, scopes, middleware, expiration | +| Database | ✅ Complete | 173 | Auto-migrations, connection pooling | +| API Framework | ✅ Complete | 298 | Response envelopes, OpenAPI, chi router | + +### What's Missing + +| Component | Priority | Impact | +|-----------|----------|--------| +| Tests | HIGH | No unit or integration tests | +| Dynamic Projects | MEDIUM | Hardcoded to 2 projects | +| 
Project Access Control | HIGH | Middleware exists but unused | +| Rate Limiting | HIGH | No request throttling | +| Metrics | MEDIUM | No Prometheus/observability | +| Retry Logic | MEDIUM | No transient failure recovery | +| Command Cancellation | LOW | No explicit kill mechanism | + +--- + +## Architecture Analysis + +### Current: Layered (Not Hexagonal) + +``` +┌─────────────────────────────────────────────┐ +│ HTTP Layer (handlers/) │ +│ - Chi router, request parsing, responses │ +├─────────────────────────────────────────────┤ +│ Service Layer (auth/, executor/) │ +│ - Business logic mixed with infrastructure │ +├─────────────────────────────────────────────┤ +│ Data Layer (db/, projects/) │ +│ - Direct PostgreSQL, kubectl calls │ +└─────────────────────────────────────────────┘ +``` + +**Problems:** +1. Handlers directly depend on concrete implementations +2. No interfaces for testability +3. Executor calls kubectl directly (hard to test) +4. Registry hardcodes project definitions +5. 
No domain model - just request/response structs + +### Target: Hexagonal Architecture + +``` + ┌─────────────────────┐ + │ Domain (Core) │ + │ │ + │ - Project │ + │ - Command │ + │ - APIKey │ + │ - CommandResult │ + │ - Stream │ + └─────────────────────┘ + ▲ + ┌──────────────┼──────────────┐ + │ │ │ + ┌─────┴─────┐ ┌─────┴─────┐ ┌─────┴─────┐ + │ Ports │ │ Ports │ │ Ports │ + │ (Inbound) │ │ (Outbound)│ │ (Outbound)│ + └───────────┘ └───────────┘ └───────────┘ + │ │ │ + ┌─────┴─────┐ ┌─────┴─────┐ ┌─────┴─────┐ + │ Adapters │ │ Adapters │ │ Adapters │ + │ (HTTP) │ │ (kubectl) │ │ (Postgres)│ + └───────────┘ └───────────┘ └───────────┘ +``` + +--- + +## Risk Analysis: What Could Go Wrong + +### SSE Streaming Risks + +| Risk | Severity | Mitigation | +|------|----------|------------| +| **Memory leak from abandoned streams** | HIGH | Stream cleanup after 30s (implemented), but no max stream limit | +| **Buffer overflow** | MEDIUM | 100-event buffer with non-blocking send (drops events) | +| **Client reconnection gaps** | MEDIUM | No event replay - missed events are lost | +| **Proxy buffering** | MEDIUM | Headers set correctly, but nginx/cloudflare may still buffer | +| **Connection hijacking** | LOW | No stream authentication after initial auth | + +**Current Implementation Strengths:** +- Heartbeat keeps connection alive (30s) +- Context cancellation on client disconnect +- Non-blocking sends prevent goroutine leaks +- Completion event triggers cleanup + +**Gaps to Address:** +1. No max concurrent streams per user +2. No event ID for resumability (Last-Event-ID) +3. No backpressure signaling to clients +4. 
Stream state not persisted (restart loses all) + +### Executor Risks + +| Risk | Severity | Mitigation | +|------|----------|------------| +| **kubectl timeout** | HIGH | 10-minute context timeout (implemented) | +| **Pod not found** | HIGH | Registry refresh, but no retry | +| **Large output OOM** | MEDIUM | 1MB line buffer, but no total limit | +| **Command injection** | HIGH | Args passed directly to kubectl (needs validation) | +| **Zombie processes** | MEDIUM | Context cancellation, but no kill -9 | + +**Current Implementation Strengths:** +- Context-based cancellation +- Line-by-line streaming (not buffering full output) +- Separate stdout/stderr handling +- Exit code capture + +**Gaps to Address:** +1. No command sanitization (shell injection risk) +2. No output size limit (could fill memory) +3. No concurrent command limit per project +4. No pod health check before exec + +### Auth Risks + +| Risk | Severity | Mitigation | +|------|----------|------------| +| **Timing attack on key comparison** | MEDIUM | Using crypto/subtle (verify!) | +| **Key enumeration** | LOW | Generic "invalid key" errors | +| **Revocation delay** | MEDIUM | No cache - checks DB each request | +| **Admin key exposure** | HIGH | Env var, but logged if DEBUG | + +**Current Implementation Strengths:** +- SHA-256 hashed keys (never stored plain) +- Scoped permissions +- Expiration enforcement +- last_used_at audit trail + +**Gaps to Address:** +1. Verify constant-time comparison is used +2. Add key rotation support +3. Add IP allowlisting per key +4. 
Rate limit failed auth attempts + +### Database Risks + +| Risk | Severity | Mitigation | +|------|----------|------------| +| **Connection pool exhaustion** | MEDIUM | Fixed at 10 max connections | +| **Migration failure on startup** | HIGH | Fails fast (correct), but no rollback | +| **Query timeout** | MEDIUM | No explicit query timeouts set | + +--- + +## Hexagonal Refactoring Plan + +### Phase 1: Define Domain Model + +Create pure domain types with no external dependencies. + +```go +// internal/domain/project.go +package domain + +type ProjectID string +type CommandID string + +type Project struct { + ID ProjectID + Name string + Description string + PodName string + Status ProjectStatus + Workspace string +} + +type ProjectStatus string + +const ( + ProjectStatusRunning ProjectStatus = "running" + ProjectStatusPending ProjectStatus = "pending" + ProjectStatusFailed ProjectStatus = "failed" + ProjectStatusNotFound ProjectStatus = "not_found" +) +``` + +```go +// internal/domain/command.go +package domain + +import "time" + +type CommandType string + +const ( + CommandTypeClaude CommandType = "claude" + CommandTypeShell CommandType = "shell" + CommandTypeGit CommandType = "git" +) + +type Command struct { + ID CommandID + ProjectID ProjectID + Type CommandType + Args []string + StartedAt time.Time +} + +type CommandResult struct { + CommandID CommandID + ExitCode int + DurationMs int64 + Error error +} + +type OutputLine struct { + Stream string // "stdout" or "stderr" + Line string + Time time.Time +} +``` + +### Phase 2: Define Ports (Interfaces) + +```go +// internal/port/project_repository.go +package port + +type ProjectRepository interface { + List(ctx context.Context) ([]domain.Project, error) + Get(ctx context.Context, id domain.ProjectID) (*domain.Project, error) + RefreshStatus(ctx context.Context) error +} +``` + +```go +// internal/port/command_executor.go +package port + +type CommandExecutor interface { + Execute(ctx context.Context, cmd *domain.Command, onOutput 
func(domain.OutputLine)) (*domain.CommandResult, error) + Cancel(ctx context.Context, cmdID domain.CommandID) error +} +``` + +```go +// internal/port/stream_publisher.go +package port + +type StreamPublisher interface { + Subscribe(streamID string) (<-chan StreamEvent, func()) + Publish(streamID string, event StreamEvent) + Close(streamID string) +} + +type StreamEvent struct { + Type string + Data map[string]any +} +``` + +```go +// internal/port/api_key_repository.go +package port + +type APIKeyRepository interface { + Create(ctx context.Context, key *domain.APIKey) error + GetByHash(ctx context.Context, hash string) (*domain.APIKey, error) + List(ctx context.Context) ([]domain.APIKey, error) + Revoke(ctx context.Context, id uuid.UUID) error + UpdateLastUsed(ctx context.Context, id uuid.UUID) error +} +``` + +### Phase 3: Create Adapters + +``` +internal/ +├── adapter/ +│ ├── http/ # HTTP handlers (inbound adapter) +│ │ ├── project_handler.go +│ │ ├── key_handler.go +│ │ └── middleware.go +│ ├── kubernetes/ # kubectl executor (outbound adapter) +│ │ └── executor.go +│ ├── postgres/ # Database (outbound adapter) +│ │ ├── project_repo.go +│ │ └── key_repo.go +│ └── memory/ # In-memory (for testing) +│ ├── project_repo.go +│ └── stream_publisher.go +``` + +### Phase 4: Wire with Dependency Injection + +```go +// cmd/rdev-api/main.go + +func main() { + // Create adapters + db := postgres.NewDB(cfg) + projectRepo := postgres.NewProjectRepository(db) + keyRepo := postgres.NewAPIKeyRepository(db) + executor := kubernetes.NewExecutor(cfg.Namespace) + streamPub := memory.NewStreamPublisher() + + // Create services (use cases) + projectSvc := service.NewProjectService(projectRepo, executor, streamPub) + keySvc := service.NewAPIKeyService(keyRepo) + + // Create HTTP handlers + projectHandler := http.NewProjectHandler(projectSvc) + keyHandler := http.NewKeyHandler(keySvc) + + // Wire routes + router := chi.NewRouter() + projectHandler.Mount(router) + 
keyHandler.Mount(router) +} +``` + +--- + +## Implementation Tasks + +### Epic 1: Testing Foundation (Priority: CRITICAL) + +Without tests, refactoring is dangerous. + +| Task | Effort | Files | +|------|--------|-------| +| 1.1 Add test utilities (fixtures, mocks) | 2h | `internal/testutil/` | +| 1.2 Unit tests for auth/keys.go | 2h | `internal/auth/keys_test.go` | +| 1.3 Unit tests for auth/service.go | 3h | `internal/auth/service_test.go` | +| 1.4 Unit tests for executor | 2h | `internal/executor/executor_test.go` | +| 1.5 Integration tests for handlers | 4h | `internal/handlers/*_test.go` | +| 1.6 E2E test with docker-compose | 4h | `tests/e2e/` | + +**Test Strategy:** +- Unit tests: Mock all dependencies via interfaces +- Integration tests: Use real postgres (docker), mock kubectl +- E2E tests: Full stack with test k8s cluster or kind + +### Epic 2: Security Hardening (Priority: HIGH) + +| Task | Effort | Risk Mitigated | +|------|--------|----------------| +| 2.1 Command sanitization | 2h | Shell injection | +| 2.2 Rate limiting middleware | 3h | DoS, brute force | +| 2.3 Concurrent command limit | 2h | Resource exhaustion | +| 2.4 Output size limit | 1h | OOM | +| 2.5 Constant-time key comparison audit | 1h | Timing attacks | + +**Command Sanitization Rules:** +```go +// Disallow these in shell commands: +var dangerousPatterns = []string{ + ";", "&&", "||", "|", "`", "$(", // Command chaining + ">", ">>", "<", // Redirects + "rm -rf", "dd if=", // Destructive +} +``` + +### Epic 3: Hexagonal Refactoring (Priority: MEDIUM) + +| Task | Effort | Impact | +|------|--------|--------| +| 3.1 Create domain package | 2h | Foundation | +| 3.2 Define port interfaces | 2h | Testability | +| 3.3 Extract kubernetes adapter | 3h | Mockable executor | +| 3.4 Extract postgres adapter | 3h | Mockable repos | +| 3.5 Create memory adapter (tests) | 2h | Fast tests | +| 3.6 Refactor handlers to use ports | 4h | Decoupling | +| 3.7 Add dependency injection | 2h | Flexibility | + 
+### Epic 4: SSE Improvements (Priority: MEDIUM) + +| Task | Effort | Benefit | +|------|--------|---------| +| 4.1 Add Last-Event-ID support | 3h | Resumable connections | +| 4.2 Max streams per user | 2h | Resource control | +| 4.3 Event replay buffer | 4h | Catch missed events | +| 4.4 Backpressure detection | 2h | Client health | + +**Event ID Format:** +``` +event: output +id: cmd-pantheon-001:42 +data: {"line": "Building..."} +``` + +### Epic 5: Observability (Priority: MEDIUM) + +| Task | Effort | Benefit | +|------|--------|---------| +| 5.1 Prometheus metrics | 3h | Monitoring | +| 5.2 Request tracing (OpenTelemetry) | 4h | Debugging | +| 5.3 Structured logging improvements | 2h | Log analysis | +| 5.4 Health check enhancements | 1h | K8s probes | + +**Key Metrics:** +``` +rdev_commands_total{project, type, status} +rdev_command_duration_seconds{project, type} +rdev_active_streams{project} +rdev_auth_failures_total{reason} +rdev_api_request_duration_seconds{endpoint, method, status} +``` + +### Epic 6: Dynamic Project Discovery (Priority: LOW) + +| Task | Effort | Benefit | +|------|--------|---------| +| 6.1 K8s label-based discovery | 3h | No hardcoding | +| 6.2 Project config from ConfigMap | 2h | External config | +| 6.3 Watch for pod changes | 3h | Real-time updates | + +**Label Convention:** +```yaml +metadata: + labels: + rdev.orchard9.ai/project: "true" + rdev.orchard9.ai/name: "pantheon" + rdev.orchard9.ai/description: "Go API backend" +``` + +--- + +## Implementation Order + +``` +Week 1: Testing Foundation +├── 1.1 Test utilities +├── 1.2 Auth key tests +├── 1.3 Auth service tests +└── 1.4 Executor tests + +Week 2: Security + More Tests +├── 2.1 Command sanitization +├── 2.2 Rate limiting +├── 1.5 Handler integration tests +└── 2.3 Concurrent command limit + +Week 3: Hexagonal Refactoring +├── 3.1 Domain package +├── 3.2 Port interfaces +├── 3.3 Kubernetes adapter +└── 3.4 Postgres adapter + +Week 4: Production Hardening +├── 3.5-3.7 Complete 
hexagonal +├── 4.1 SSE Last-Event-ID +├── 5.1 Prometheus metrics +└── 1.6 E2E tests +``` + +--- + +## File Structure After Refactoring + +``` +internal/ +├── domain/ # Pure domain model (no deps) +│ ├── project.go +│ ├── command.go +│ ├── apikey.go +│ └── errors.go +├── port/ # Interface definitions +│ ├── project_repository.go +│ ├── command_executor.go +│ ├── apikey_repository.go +│ └── stream_publisher.go +├── service/ # Use cases / business logic +│ ├── project_service.go +│ ├── command_service.go +│ └── apikey_service.go +├── adapter/ +│ ├── http/ # Inbound: HTTP handlers +│ │ ├── project_handler.go +│ │ ├── key_handler.go +│ │ ├── middleware/ +│ │ │ ├── auth.go +│ │ │ └── ratelimit.go +│ │ └── response.go +│ ├── kubernetes/ # Outbound: kubectl +│ │ └── executor.go +│ ├── postgres/ # Outbound: database +│ │ ├── project_repo.go +│ │ ├── apikey_repo.go +│ │ └── migrations/ +│ └── memory/ # Outbound: in-memory (testing) +│ ├── project_repo.go +│ ├── apikey_repo.go +│ └── stream_publisher.go +└── config/ # Configuration loading + └── config.go +``` + +--- + +## Success Criteria + +### Must Have (v0.6) +- [ ] 80%+ test coverage on auth and executor +- [ ] Command input sanitization +- [ ] Rate limiting (100 req/min per key) +- [ ] No shell injection vulnerabilities + +### Should Have (v0.7) +- [ ] Hexagonal architecture complete +- [ ] Prometheus metrics endpoint +- [ ] SSE reconnection support + +### Nice to Have (v0.8+) +- [ ] Dynamic project discovery +- [ ] OpenTelemetry tracing +- [ ] Command cancellation API + +--- + +## Appendix: SSE Event Protocol + +### Event Types + +``` +event: connected +data: {"project":"pantheon","stream_id":"cmd-001"} + +event: output +id: cmd-001:1 +data: {"line":"Starting build...","stream":"stdout"} + +event: output +id: cmd-001:2 +data: {"line":"Warning: deprecated API","stream":"stderr"} + +event: heartbeat +data: {"timestamp":"2026-01-25T07:00:00Z"} + +event: complete +id: cmd-001:final +data: {"exit_code":0,"duration_ms":4523} 
+ +event: error +data: {"code":"POD_NOT_FOUND","message":"claudebox-pantheon-0 not running"} +``` + +### Client Reconnection + +```javascript +// Caveat: the browser's native EventSource accepts only `withCredentials` in its +// options and ignores `headers`. Sending the Bearer key as shown requires an +// EventSource polyfill or a fetch-based SSE client that supports custom headers. +const events = new EventSource('/projects/pantheon/events?stream_id=cmd-001', { + headers: { 'Authorization': 'Bearer rdev_sk_...' } +}); + +// On reconnect, the browser sends the Last-Event-ID header automatically. +// The server should replay events since that ID. +``` + +--- + +## Appendix: Command Execution Flow + +``` +┌─────────┐ ┌─────────┐ ┌──────────┐ ┌─────────┐ +│ Client │ │ Handler │ │ Executor │ │ Pod │ +└────┬────┘ └────┬────┘ └────┬─────┘ └────┬────┘ + │ │ │ │ + │ POST /claude │ │ │ + │──────────────>│ │ │ + │ │ │ │ + │ 201 Created │ │ │ + │<──────────────│ │ │ + │ │ │ │ + │ GET /events │ go execute() │ │ + │──────────────>│──────────────>│ │ + │ │ │ kubectl exec │ + │ │ │───────────────>│ + │ │ │ │ + │ │ onOutput() │ stdout/stderr │ + │ SSE output │<──────────────│<───────────────│ + │<──────────────│ │ │ + │ │ │ │ + │ SSE output │ onOutput() │ stdout/stderr │ + │<──────────────│<──────────────│<───────────────│ + │ │ │ │ + │ SSE complete │ result │ exit code │ + │<──────────────│<──────────────│<───────────────│ + │ │ │ │ +``` + +--- + +## Decision Log + +| Decision | Rationale | Date | +|----------|-----------|------| +| Use hexagonal architecture | Testability, flexibility, clean boundaries | 2026-01-25 | +| Keep SSE (not WebSocket) | Simpler, HTTP/2 multiplexing, auto-reconnect | 2026-01-25 | +| Prioritize tests over refactoring | Can't safely refactor without tests | 2026-01-25 | +| Port 5433 for local dev | Avoid conflicts with system postgres | 2026-01-25 |