SSE Streaming Architecture
rdev uses Server-Sent Events (SSE) for real-time command output streaming.
Overview
┌────────────┐ ┌────────────┐ ┌────────────┐
│ Client │ │ rdev API │ │ K8s Pod │
│ │ │ │ │ │
│ 1. POST │─────────────────▶│ Start │─────────────────▶│ Execute │
│ /claude │ │ Command │ │ Command │
│ │◀─────────────────│ │ │ │
│ response: │ {id, stream_url}│ │ │ │
│ │ │ │ │ │
│ 2. GET │─────────────────▶│ SSE │◀─────────────────│ Output │
│ /events │◀─────────────────│ Stream │◀─────────────────│ Lines │
│ │ event: output │ │ │ │
│ │ event: output │ │ │ │
│ │ event: complete │ │◀─────────────────│ Exit │
└────────────┘ └────────────┘ └────────────┘
SSE Protocol
Event Format
id: evt-001
event: output
data: {"line": "Hello, world!", "stream": "stdout"}

id: evt-002
event: output
data: {"line": "Processing...", "stream": "stdout"}

id: evt-003
event: complete
data: {"exit_code": 0, "duration_ms": 1234}

Event Types
| Event | Description | Data |
|---|---|---|
| connected | Stream established | {project, stream_id, reconnecting} |
| output | Command output line | {line, stream} |
| complete | Command finished | {exit_code, duration_ms} |
| heartbeat | Keep-alive signal | {timestamp} |
| error | Error occurred | {message} |
Output Streams
- stdout: Standard output
- stderr: Standard error
Reconnection Support
Last-Event-ID
Clients can reconnect and resume from where they left off:
GET /projects/test/events?stream_id=cmd-001
Last-Event-ID: evt-002
The server replays all events after evt-002.
Implementation
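type StreamManager struct {
	streams map[string]*Stream
	mu      sync.RWMutex
}

type Stream struct {
	events    []StreamEvent // full history, kept for replay
	listeners []chan StreamEvent
	mu        sync.RWMutex
}

// SubscribeFromID replays the events stored after lastEventID, then registers
// the returned channel for live events. The returned func unsubscribes.
func (sm *StreamManager) SubscribeFromID(streamID, lastEventID string) (<-chan StreamEvent, func()) {
	sm.mu.RLock()
	stream, ok := sm.streams[streamID]
	sm.mu.RUnlock()
	if !ok {
		// Unknown or expired stream: return a closed channel and a no-op cleanup
		ch := make(chan StreamEvent)
		close(ch)
		return ch, func() {}
	}

	// Hold the write lock across replay and registration so that no event
	// can be appended in between and missed by this subscriber
	stream.mu.Lock()
	defer stream.mu.Unlock()

	// Size the buffer for the whole replay plus 100 slots for live bursts,
	// so the replay sends below cannot block while the lock is held
	ch := make(chan StreamEvent, len(stream.events)+100)

	// Replay events after lastEventID (nothing is replayed if the ID is unknown)
	foundLast := false
	for _, event := range stream.events {
		if event.ID == lastEventID {
			foundLast = true
			continue
		}
		if foundLast {
			ch <- event
		}
	}

	// Subscribe for new events. Appending directly instead of calling
	// addListener assumes that helper would take stream.mu itself.
	stream.listeners = append(stream.listeners, ch)
	return ch, func() { stream.removeListener(ch) }
}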
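One edge case in the replay loop is worth spelling out: if lastEventID is not found in the stored history, no events are replayed at all. The sketch below isolates the replay selection as a pure function and, as an assumed policy rather than rdev's documented behavior, falls back to the full history when the ID is empty or unknown. StreamEvent is redeclared here with only the fields this document uses, and eventsAfter is a hypothetical name.

```go
package main

// StreamEvent mirrors the fields used in this document; the real struct may
// carry more.
type StreamEvent struct {
	ID   string
	Type string
	Data map[string]any
}

// eventsAfter returns the events that follow lastEventID in history.
// If the ID is empty or not present (assumed policy for this sketch),
// the whole history is returned so the client misses nothing.
func eventsAfter(history []StreamEvent, lastEventID string) []StreamEvent {
	if lastEventID == "" {
		return history
	}
	for i, ev := range history {
		if ev.ID == lastEventID {
			return history[i+1:]
		}
	}
	return history
}
```

Replaying everything on an unknown ID can deliver duplicates to a client that already saw some events, so a client would need to tolerate repeated IDs under this policy.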
Client Handling
// JavaScript SSE client with reconnection
function connectSSE(url) {
  const eventSource = new EventSource(url);

  eventSource.onopen = () => {
    console.log('Connected');
  };

  eventSource.addEventListener('output', (e) => {
    const data = JSON.parse(e.data);
    console.log(data.stream + ':', data.line);
  });

  eventSource.addEventListener('complete', (e) => {
    const data = JSON.parse(e.data);
    console.log('Exit code:', data.exit_code);
    eventSource.close();
  });

  eventSource.onerror = (e) => {
    console.log('Connection error, will auto-reconnect');
    // Browser automatically reconnects with Last-Event-ID
  };

  return eventSource;
}
Stream Lifecycle
┌─────────────────────────────────────────────────────────┐
│ Command Started │
└────────────────────────┬────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Stream Created │
│ (StreamManager) │
└────────────────────────┬────────────────────────────────┘
│
┌──────────────┴──────────────┐
│ │
▼ ▼
┌───────────────────┐ ┌───────────────────┐
│ Client Subscribe │ │ Output Events │
│ (SSE Connection) │◀────────│ (from executor) │
└───────────────────┘ └───────────────────┘
│ │
│ │
└──────────────┬──────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Complete Event │
│ (exit_code) │
└────────────────────────┬────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Stream Cleanup │
│ (after 30s grace period) │
└─────────────────────────────────────────────────────────┘
Handler Implementation
func (h *ProjectsHandler) Events(w http.ResponseWriter, r *http.Request) {
	streamID := r.URL.Query().Get("stream_id")
	lastEventID := r.Header.Get("Last-Event-ID")

	// SSE needs incremental flushing; fail early if the writer cannot do it
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}

	// Set SSE headers
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Access-Control-Allow-Origin", "*")

	// Subscribe with reconnection support
	var (
		events  <-chan StreamEvent
		cleanup func()
	)
	if lastEventID != "" {
		events, cleanup = h.streams.SubscribeFromID(streamID, lastEventID)
	} else {
		events, cleanup = h.streams.Subscribe(streamID)
	}
	defer cleanup()

	// Send connected event
	writeSSE(w, flusher, "connected", map[string]any{
		"stream_id":    streamID,
		"reconnecting": lastEventID != "",
	})

	// Heartbeat ticker
	heartbeat := time.NewTicker(30 * time.Second)
	defer heartbeat.Stop()

	// Event loop: runs until the client disconnects, the stream closes,
	// or the complete event has been written
	for {
		select {
		case <-r.Context().Done():
			return
		case event, ok := <-events:
			if !ok {
				return
			}
			writeSSEWithID(w, flusher, event.ID, event.Type, event.Data)
			if event.Type == "complete" {
				return
			}
		case <-heartbeat.C:
			writeSSE(w, flusher, "heartbeat", map[string]any{
				"timestamp": time.Now().UTC().Format(time.RFC3339),
			})
		}
	}
}
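// writeSSE is assumed here to be the id-less variant used for connected and heartbeat events.
func writeSSE(w http.ResponseWriter, flusher http.Flusher, event string, data map[string]any) {
	writeSSEWithID(w, flusher, "", event, data)
}

func writeSSEWithID(w http.ResponseWriter, flusher http.Flusher,
	id, event string, data map[string]any) {
	dataBytes, err := json.Marshal(data)
	if err != nil {
		return // payload cannot be encoded; skip the event
	}
	if id != "" {
		fmt.Fprintf(w, "id: %s\n", id)
	}
	fmt.Fprintf(w, "event: %s\n", event)
	fmt.Fprintf(w, "data: %s\n\n", dataBytes) // blank line ends the event
	flusher.Flush()
}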
Performance Considerations
Buffer Sizing
// 100-event buffer to handle bursts
ch := make(chan StreamEvent, 100)
Heartbeats
30-second heartbeats prevent:
- Proxy timeouts
- Connection drops from inactive connections
- Client uncertainty about connection state
Cleanup
Streams are cleaned up 30 seconds after completion:
- Allows time for reconnections
- Prevents memory leaks
- Enables late-arriving clients to see final state
Fanout
Multiple clients can subscribe to the same stream:
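func (sm *StreamManager) Send(streamID, eventType string, data map[string]any) {
	sm.mu.RLock()
	stream, ok := sm.streams[streamID]
	sm.mu.RUnlock()
	if !ok {
		return // stream expired or never created; nothing to deliver to
	}

	event := StreamEvent{
		ID:   generateEventID(),
		Type: eventType,
		Data: data,
	}

	// Store and fan out under one lock so a subscriber that is replaying
	// history sees each event exactly once. The sends never block, so the
	// lock is held only briefly.
	stream.mu.Lock()
	defer stream.mu.Unlock()

	// Store for replay
	stream.events = append(stream.events, event)

	// Fanout to all listeners
	for _, ch := range stream.listeners {
		select {
		case ch <- event:
		default:
			// Channel full, skip (client too slow)
		}
	}
}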
Error Handling
Connection Errors
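The browser's EventSource reconnects automatically on error. The browser:
- Closes the failed connection
- Waits for the reconnection delay (browser-defined, typically a few seconds; the server can override it by sending a retry: field)
- Reconnects, sending the ID of the last event it received in the Last-Event-ID header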
Slow Clients
If a client can't keep up:
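- Events are dropped for that client once its channel buffer is full (non-blocking send)
- Dropped events are not re-sent on the same connection. Replay on reconnect resumes after the last event the client received, so it covers events missed while disconnected but not gaps left by earlier drops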
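Because the non-blocking send discards events silently, it can help to count drops so that slow clients show up in metrics or can be disconnected deliberately. The following is a minimal sketch of that idea using plain string events. trySend and fanout are hypothetical helpers, not functions from the rdev codebase.

```go
package main

// trySend attempts a non-blocking send and reports whether the event was
// delivered to the listener's buffer.
func trySend(ch chan<- string, event string) bool {
	select {
	case ch <- event:
		return true
	default:
		return false // buffer full: the listener is not keeping up
	}
}

// fanout delivers event to every listener and returns how many listeners
// had to be skipped because their buffers were full.
func fanout(listeners []chan string, event string) (dropped int) {
	for _, ch := range listeners {
		if !trySend(ch, event) {
			dropped++
		}
	}
	return dropped
}
```

A server could go one step further and close the channel of a listener that drops events, forcing a reconnect and replay, though whether that suits rdev depends on how its clients handle duplicates.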
Stream Not Found
If stream doesn't exist (expired or never created):
event: error
data: {"message": "stream not found"}