rdev/docs/architecture/streaming.md
jordan 72d16929ca feat: Implement hexagonal architecture with services, webhooks, queue, and telemetry
2026-01-25 19:57:46 -07:00


SSE Streaming Architecture

rdev uses Server-Sent Events (SSE) for real-time command output streaming.

Overview

┌────────────┐                  ┌────────────┐                  ┌────────────┐
│   Client   │                  │  rdev API  │                  │   K8s Pod  │
│            │                  │            │                  │            │
│  1. POST   │─────────────────▶│  Start     │─────────────────▶│  Execute   │
│  /claude   │                  │  Command   │                  │  Command   │
│            │◀─────────────────│            │                  │            │
│  response: │  {id, stream_url}│            │                  │            │
│            │                  │            │                  │            │
│  2. GET    │─────────────────▶│  SSE       │◀─────────────────│  Output    │
│  /events   │◀─────────────────│  Stream    │◀─────────────────│  Lines     │
│            │  event: output   │            │                  │            │
│            │  event: output   │            │                  │            │
│            │  event: complete │            │◀─────────────────│  Exit      │
└────────────┘                  └────────────┘                  └────────────┘

SSE Protocol

Event Format

id: evt-001
event: output
data: {"line": "Hello, world!", "stream": "stdout"}

id: evt-002
event: output
data: {"line": "Processing...", "stream": "stdout"}

id: evt-003
event: complete
data: {"exit_code": 0, "duration_ms": 1234}

Event Types

Event       Description           Data fields
connected   Stream established    {project, stream_id, reconnecting}
output      Command output line   {line, stream}
complete    Command finished      {exit_code, duration_ms}
heartbeat   Keep-alive signal     {timestamp}
error       Error occurred        {message}
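The handlers on this page pass event data as map[string]any. If typed payloads are preferred, the Data fields column maps onto structs like the ones below. The type names and encodeData are illustrative assumptions, not rdev's actual types.

encoding/json emits struct fields in declaration order and escapes newlines inside strings. As a result, every payload fits on a single data: line.

```go
package main

import "encoding/json"

// Hypothetical payload types mirroring the event table above.
type ConnectedData struct {
	Project      string `json:"project"`
	StreamID     string `json:"stream_id"`
	Reconnecting bool   `json:"reconnecting"`
}

type OutputData struct {
	Line   string `json:"line"`
	Stream string `json:"stream"` // "stdout" or "stderr"
}

type CompleteData struct {
	ExitCode   int   `json:"exit_code"`
	DurationMs int64 `json:"duration_ms"`
}

type HeartbeatData struct {
	Timestamp string `json:"timestamp"` // RFC 3339, UTC
}

type ErrorData struct {
	Message string `json:"message"`
}

// encodeData renders a payload as the single-line JSON used after "data:".
func encodeData(v any) (string, error) {
	b, err := json.Marshal(v)
	return string(b), err
}
```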

Output Streams

  • stdout - Standard output
  • stderr - Standard error

Reconnection Support

Last-Event-ID

A client that reconnects can resume where it left off by sending the ID of the last event it received in the Last-Event-ID header:

GET /projects/test/events?stream_id=cmd-001
Last-Event-ID: evt-002

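The server replays every stored event after evt-002, then continues with live events. If the ID is not found (for example, the stream was already cleaned up), nothing is replayed.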
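Browsers send Last-Event-ID automatically, but a Go client (a CLI, for instance) has to set it itself. This is a minimal sketch, assuming the URL layout shown above. newResumeRequest is a hypothetical helper, and localhost:8080 is a placeholder address.

```go
package main

import (
	"net/http"
	"net/url"
)

// newResumeRequest builds a reconnect request for an SSE stream. The path
// layout mirrors the example above; the helper itself is hypothetical.
func newResumeRequest(baseURL, project, streamID, lastEventID string) (*http.Request, error) {
	u := baseURL + "/projects/" + url.PathEscape(project) + "/events?" +
		url.Values{"stream_id": {streamID}}.Encode()
	req, err := http.NewRequest(http.MethodGet, u, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Accept", "text/event-stream")
	if lastEventID != "" {
		// Resume after this event instead of starting from the live tail.
		req.Header.Set("Last-Event-ID", lastEventID)
	}
	return req, nil
}
```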

Implementation

type StreamManager struct {
    streams map[string]*Stream
    mu      sync.RWMutex
}

type Stream struct {
    events    []StreamEvent
    listeners []chan StreamEvent
    mu        sync.RWMutex
}

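func (sm *StreamManager) SubscribeFromID(streamID, lastEventID string) (<-chan StreamEvent, func()) {
    sm.mu.RLock()
    stream, ok := sm.streams[streamID]
    sm.mu.RUnlock()
    if !ok {
        // Expired or never created; the caller reports "stream not found".
        return nil, func() {}
    }

    // Hold the write lock across replay and registration, so no event can
    // be sent in between (it would be neither replayed nor delivered).
    stream.mu.Lock()
    defer stream.mu.Unlock()

    // Collect events after lastEventID (none if the ID is unknown).
    var replay []StreamEvent
    for i, event := range stream.events {
        if event.ID == lastEventID {
            replay = stream.events[i+1:]
            break
        }
    }

    // Size the buffer to fit the whole replay: nothing reads ch until we
    // return, so a smaller buffer would block here while holding the lock.
    ch := make(chan StreamEvent, len(replay)+100)
    for _, event := range replay {
        ch <- event
    }

    // Subscribe for new events (append directly: stream.mu is already held)
    stream.listeners = append(stream.listeners, ch)

    return ch, func() { stream.removeListener(ch) }
}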
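The listener-registry helpers on Stream (addListener, removeListener, addEvent) are not shown in this document. Below is a minimal sketch, assuming StreamEvent carries the ID, Type and Data fields used elsewhere on this page.

removeListener only unregisters the channel. It does not close it, because the handler reading from it is already exiting.

```go
package main

import "sync"

type StreamEvent struct {
	ID   string
	Type string
	Data map[string]any
}

type Stream struct {
	events    []StreamEvent
	listeners []chan StreamEvent
	mu        sync.RWMutex
}

// addEvent records an event for later replay (hypothetical helper).
func (s *Stream) addEvent(e StreamEvent) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.events = append(s.events, e)
}

// addListener registers ch to receive live events (hypothetical helper).
func (s *Stream) addListener(ch chan StreamEvent) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.listeners = append(s.listeners, ch)
}

// removeListener unregisters ch; it is a no-op if ch is not registered.
func (s *Stream) removeListener(ch chan StreamEvent) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for i, l := range s.listeners {
		if l == ch {
			s.listeners = append(s.listeners[:i], s.listeners[i+1:]...)
			return
		}
	}
}
```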

Client Handling

// JavaScript SSE client with reconnection
function connectSSE(url) {
    const eventSource = new EventSource(url);

    eventSource.onopen = () => {
        console.log('Connected');
    };

    eventSource.addEventListener('output', (e) => {
        const data = JSON.parse(e.data);
        console.log(data.stream + ':', data.line);
    });

    eventSource.addEventListener('complete', (e) => {
        const data = JSON.parse(e.data);
        console.log('Exit code:', data.exit_code);
        eventSource.close();
    });

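    eventSource.onerror = (e) => {
        if (e.data) {
            // A server-sent "event: error" (e.g. stream not found) also
            // reaches onerror; close so the browser stops reconnecting.
            console.error('Stream error:', JSON.parse(e.data).message);
            eventSource.close();
            return;
        }
        // Network error: the browser reconnects and sends Last-Event-ID
        console.log('Connection error, will auto-reconnect');
    };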

    return eventSource;
}

Stream Lifecycle

    ┌─────────────────────────────────────────────────────────┐
    │                    Command Started                       │
    └────────────────────────┬────────────────────────────────┘
                             │
                             ▼
    ┌─────────────────────────────────────────────────────────┐
    │                    Stream Created                        │
    │                    (StreamManager)                       │
    └────────────────────────┬────────────────────────────────┘
                             │
              ┌──────────────┴──────────────┐
              │                             │
              ▼                             ▼
    ┌───────────────────┐         ┌───────────────────┐
    │  Client Subscribe │         │  Output Events    │
    │  (SSE Connection) │◀────────│  (from executor)  │
    └───────────────────┘         └───────────────────┘
              │                             │
              │                             │
              └──────────────┬──────────────┘
                             │
                             ▼
    ┌─────────────────────────────────────────────────────────┐
    │                   Complete Event                         │
    │                   (exit_code)                            │
    └────────────────────────┬────────────────────────────────┘
                             │
                             ▼
    ┌─────────────────────────────────────────────────────────┐
    │                   Stream Cleanup                         │
    │              (after 30s grace period)                    │
    └─────────────────────────────────────────────────────────┘

Handler Implementation

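func (h *ProjectsHandler) Events(w http.ResponseWriter, r *http.Request) {
    streamID := r.URL.Query().Get("stream_id")
    lastEventID := r.Header.Get("Last-Event-ID")

    // SSE must flush each event; fail cleanly if the writer can't.
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "streaming unsupported", http.StatusInternalServerError)
        return
    }

    // Set SSE headers
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    w.Header().Set("Access-Control-Allow-Origin", "*")

    // Subscribe with reconnection support
    var (
        events  <-chan StreamEvent
        cleanup func()
    )
    if lastEventID != "" {
        events, cleanup = h.streams.SubscribeFromID(streamID, lastEventID)
    } else {
        events, cleanup = h.streams.Subscribe(streamID)
    }

    // Assumes both Subscribe variants return a nil channel when the
    // stream does not exist (expired or never created).
    if events == nil {
        writeSSE(w, flusher, "error", map[string]any{"message": "stream not found"})
        return
    }
    defer cleanup()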

    // Send connected event
    writeSSE(w, flusher, "connected", map[string]any{
        "stream_id":    streamID,
        "reconnecting": lastEventID != "",
    })

    // Heartbeat ticker
    heartbeat := time.NewTicker(30 * time.Second)
    defer heartbeat.Stop()

    // Event loop
    for {
        select {
        case <-r.Context().Done():
            return
        case event, ok := <-events:
            if !ok {
                return
            }
            writeSSEWithID(w, flusher, event.ID, event.Type, event.Data)
            if event.Type == "complete" {
                return
            }
        case <-heartbeat.C:
            writeSSE(w, flusher, "heartbeat", map[string]any{
                "timestamp": time.Now().UTC().Format(time.RFC3339),
            })
        }
    }
}

func writeSSEWithID(w http.ResponseWriter, flusher http.Flusher,
    id, event string, data map[string]any) {

    dataBytes, _ := json.Marshal(data)
    if id != "" {
        fmt.Fprintf(w, "id: %s\n", id)
    }
    fmt.Fprintf(w, "event: %s\n", event)
    fmt.Fprintf(w, "data: %s\n\n", dataBytes)
    flusher.Flush()
}
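The handler also calls writeSSE, which is not shown. Below is a plausible definition, assuming writeSSE is writeSSEWithID without an id: line. That way, connected and heartbeat events leave the client's Last-Event-ID untouched.

This variant also returns the marshal and write errors that the version above discards. A caller could use them to stop writing once the client is gone. exampleHeartbeat uses net/http/httptest, whose ResponseRecorder implements both http.ResponseWriter and http.Flusher.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// writeSSEWithID writes one SSE event and flushes it, reporting errors.
func writeSSEWithID(w http.ResponseWriter, flusher http.Flusher,
	id, event string, data map[string]any) error {
	dataBytes, err := json.Marshal(data)
	if err != nil {
		return err
	}
	if id != "" {
		if _, err := fmt.Fprintf(w, "id: %s\n", id); err != nil {
			return err
		}
	}
	if _, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", event, dataBytes); err != nil {
		return err
	}
	flusher.Flush()
	return nil
}

// writeSSE writes an event with no id: line (assumed definition).
func writeSSE(w http.ResponseWriter, flusher http.Flusher, event string, data map[string]any) error {
	return writeSSEWithID(w, flusher, "", event, data)
}

// exampleHeartbeat captures the bytes a heartbeat event produces.
func exampleHeartbeat() string {
	rec := httptest.NewRecorder()
	_ = writeSSE(rec, rec, "heartbeat", map[string]any{"timestamp": "2026-01-01T00:00:00Z"})
	return rec.Body.String()
}
```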

Performance Considerations

Buffer Sizing

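Each subscriber receives events through a buffered channel. A burst of output therefore does not block Send while a handler is busy writing to a slow connection. If the buffer fills anyway, further events for that subscriber are dropped (see Slow Clients).

// 100-event buffer to handle bursts
ch := make(chan StreamEvent, 100)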

Heartbeats

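Heartbeats are sent every 30 seconds. The interval must stay below the idle timeout of any proxy between client and server; nginx's proxy_read_timeout, for example, defaults to 60 seconds. Heartbeats prevent:

  • Proxies and load balancers closing the connection as idle
  • Dropped connections going unnoticed during long, quiet commands
  • Client uncertainty about whether a quiet stream is still alive

Heartbeat events carry no id: line, so they do not change the client's Last-Event-ID.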

Cleanup

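Streams are removed from the StreamManager 30 seconds after the complete event. This grace period:

  • Gives disconnected clients time to reconnect and replay missed events
  • Keeps memory bounded, since each stream stores every event for replay
  • Lets late-arriving clients still see the final state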

Fanout

Multiple clients can subscribe to the same stream. Send records each event for replay, then offers it to every listener with a non-blocking send:

func (sm *StreamManager) Send(streamID, eventType string, data map[string]any) {
    sm.mu.RLock()
    stream, ok := sm.streams[streamID]
    sm.mu.RUnlock()
    if !ok {
        return // stream already cleaned up
    }

    event := StreamEvent{
        ID:   generateEventID(),
        Type: eventType,
        Data: data,
    }

    // Store for replay and fan out under one lock, so a subscriber that
    // registers in between cannot get the event twice (replay and live).
    stream.mu.Lock()
    defer stream.mu.Unlock()
    stream.events = append(stream.events, event)
    for _, ch := range stream.listeners {
        select {
        case ch <- event:
        default:
            // Channel full, skip (client too slow)
        }
    }
}
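Send relies on generateEventID, which is not shown. SubscribeFromID matches IDs by equality, so they only need to be unique within a stream.

Below is a minimal sketch that uses a process-wide atomic counter to produce the evt-NNN style seen in the examples. It requires Go 1.19 or later for atomic.Uint64. The counter restarts with the process, which is harmless here because streams live in memory too.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// eventSeq is a process-wide counter shared by all streams (hypothetical).
var eventSeq atomic.Uint64

// generateEventID returns monotonically increasing IDs such as "evt-001".
// Add is atomic, so concurrent Send calls never receive the same ID.
func generateEventID() string {
	return fmt.Sprintf("evt-%03d", eventSeq.Add(1))
}
```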

Error Handling

Connection Errors

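When the connection fails, the browser's EventSource reconnects automatically. It:

  1. Closes the failed connection
  2. Waits for the reconnection delay (a browser-specific default of a few seconds, which the server can override with a retry: field)
  3. Reconnects, sending the ID of the last event it received as Last-Event-ID

It does not reconnect after the client calls close(), or when a response has a status other than 200 or a Content-Type other than text/event-stream.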
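The server can set the reconnection delay by sending a retry: field whose value is in milliseconds; browsers use it for later reconnects. In this sketch, writeRetry is a hypothetical helper. It would be called once before the connected event, and the next flush sends it.

A block that contains only retry: dispatches no event, so client code is unaffected.

```go
package main

import (
	"fmt"
	"io"
	"strings"
	"time"
)

// writeRetry sets the client's reconnection delay via the SSE retry field.
func writeRetry(w io.Writer, d time.Duration) error {
	_, err := fmt.Fprintf(w, "retry: %d\n\n", d.Milliseconds())
	return err
}

// exampleRetryFrame shows the frame written for a 5-second delay.
func exampleRetryFrame() string {
	var sb strings.Builder
	_ = writeRetry(&sb, 5*time.Second)
	return sb.String()
}
```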

Slow Clients

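If a client can't keep up:

  1. Events that don't fit in its buffer are dropped (non-blocking send), so one slow client cannot stall the sender or the other clients
  2. The client is not told about the gap, and replay on reconnect does not fill it: replay resumes after the last event the client received, which is newer than the dropped ones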
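A common alternative to silently dropping events is to disconnect the slow subscriber. This is a different technique from rdev's current behavior, sketched here for comparison. publish is a hypothetical replacement for Send's fanout loop: when a listener's channel is full, it closes and unregisters that listener.

The recovery then proceeds as follows:

  1. The handler drains the events still buffered.
  2. It sees the closed channel and returns.
  3. The browser reconnects with the ID of the last event it actually received.
  4. SubscribeFromID replays everything after that ID, including the event that did not fit.

```go
package main

import "sync"

type StreamEvent struct {
	ID   string
	Type string
}

type Stream struct {
	events    []StreamEvent
	listeners []chan StreamEvent
	mu        sync.Mutex
}

// publish stores e for replay, then delivers it to every listener. A
// listener whose buffer is full is closed and unregistered instead of
// silently missing e; its client recovers e via Last-Event-ID replay.
func (s *Stream) publish(e StreamEvent) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.events = append(s.events, e)
	kept := s.listeners[:0] // filter in place
	for _, ch := range s.listeners {
		select {
		case ch <- e:
			kept = append(kept, ch)
		default:
			close(ch) // reader drains the buffer, then sees !ok and returns
		}
	}
	s.listeners = kept
}
```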

Stream Not Found

If the stream does not exist (it expired or was never created), the server sends:

event: error
data: {"message": "stream not found"}
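Below is a sketch of this branch as a standalone helper; writeStreamNotFound is a hypothetical name. It answers 200 with the error event because EventSource only delivers events from a 200 text/event-stream response. A 404 would stop reconnection, but the client would then get a bare error with no message. The trade-off is that once the response ends, the browser reconnects unless the client calls close() on receiving this event.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// writeStreamNotFound sends the "stream not found" error event and returns,
// which ends the response (hypothetical helper).
func writeStreamNotFound(w http.ResponseWriter) {
	w.Header().Set("Content-Type", "text/event-stream")
	fmt.Fprint(w, "event: error\ndata: {\"message\":\"stream not found\"}\n\n")
	if f, ok := w.(http.Flusher); ok {
		f.Flush()
	}
}

// exampleNotFound records what a client would receive.
func exampleNotFound() *httptest.ResponseRecorder {
	rec := httptest.NewRecorder()
	writeStreamNotFound(rec)
	return rec
}
```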