# SSE Streaming Architecture

rdev uses Server-Sent Events (SSE) to stream command output to clients in real time.

## Overview

```
┌────────────┐                  ┌────────────┐                  ┌────────────┐
│   Client   │                  │  rdev API  │                  │  K8s Pod   │
│            │                  │            │                  │            │
│ 1. POST    │─────────────────▶│ Start      │─────────────────▶│ Execute    │
│ /claude    │                  │ Command    │                  │ Command    │
│            │◀─────────────────│            │                  │            │
│ response:  │ {id, stream_url} │            │                  │            │
│            │                  │            │                  │            │
│ 2. GET     │─────────────────▶│ SSE        │◀─────────────────│ Output     │
│ /events    │◀─────────────────│ Stream     │◀─────────────────│ Lines      │
│            │  event: output   │            │                  │            │
│            │  event: output   │            │                  │            │
│            │  event: complete │            │◀─────────────────│ Exit       │
└────────────┘                  └────────────┘                  └────────────┘
```

## SSE Protocol

### Event Format

Each event is a group of `field: value` lines terminated by a blank line:

```
id: evt-001
event: output
data: {"line": "Hello, world!", "stream": "stdout"}

id: evt-002
event: output
data: {"line": "Processing...", "stream": "stdout"}

id: evt-003
event: complete
data: {"exit_code": 0, "duration_ms": 1234}
```

### Event Types

| Event | Description | Data |
|-------|-------------|------|
| `connected` | Stream established | `{project, stream_id, reconnecting}` |
| `output` | One line of command output | `{line, stream}` |
| `complete` | Command finished | `{exit_code, duration_ms}` |
| `heartbeat` | Keep-alive signal | `{timestamp}` |
| `error` | Error occurred | `{message}` |

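In Go, the payloads in the Data column could be modeled as structs with JSON tags. This is a sketch: the type names are hypothetical, and the handler shown later in this document builds payloads as `map[string]any` instead.

```go
package main

import "encoding/json"

// Hypothetical typed payloads mirroring the Event Types table.
type ConnectedData struct {
	Project      string `json:"project"`
	StreamID     string `json:"stream_id"`
	Reconnecting bool   `json:"reconnecting"`
}

type OutputData struct {
	Line   string `json:"line"`
	Stream string `json:"stream"` // "stdout" or "stderr"
}

type CompleteData struct {
	ExitCode   int   `json:"exit_code"`
	DurationMS int64 `json:"duration_ms"`
}

type HeartbeatData struct {
	Timestamp string `json:"timestamp"` // RFC 3339, UTC
}

type ErrorData struct {
	Message string `json:"message"`
}

// encodeData renders a payload as the single-line JSON carried by "data:".
func encodeData(v any) (string, error) {
	b, err := json.Marshal(v)
	return string(b), err
}
```

Typed payloads let the compiler catch a misspelled field name that a `map[string]any` key would silently accept.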
### Output Streams

The `stream` field of an `output` event identifies where the line came from:

- `stdout` - Standard output
- `stderr` - Standard error
## Reconnection Support

### Last-Event-ID

Clients can reconnect and resume where they left off by sending the ID of the last event they received in the `Last-Event-ID` header:

```
GET /projects/test/events?stream_id=cmd-001
Last-Event-ID: evt-002
```

The server replays every buffered event recorded after `evt-002`, then continues with live events.
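
The replay rule reduces to a slice operation over the buffered event IDs. This hypothetical helper isolates it and also reports whether the ID was found: an ID that is no longer buffered means the client cannot resume without a gap.

```go
package main

// eventsAfter returns the IDs recorded after lastEventID, i.e. what the
// server replays on reconnect. found is false when lastEventID is no
// longer (or never was) in the buffer.
func eventsAfter(ids []string, lastEventID string) (replay []string, found bool) {
	for i, id := range ids {
		if id == lastEventID {
			return ids[i+1:], true
		}
	}
	return nil, false
}
```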
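### Implementation

```go
import "sync"

// StreamEvent carries the fields used throughout this document.
type StreamEvent struct {
	ID   string
	Type string
	Data map[string]any
}

type StreamManager struct {
	streams map[string]*Stream
	mu      sync.RWMutex
}

type Stream struct {
	events    []StreamEvent // kept for replay until the stream is cleaned up
	listeners []chan StreamEvent
	mu        sync.RWMutex
}

// SubscribeFromID replays the events recorded after lastEventID, then
// delivers live events. It returns a nil channel if the stream does not
// exist (expired or never created), so callers can report "stream not found".
func (sm *StreamManager) SubscribeFromID(streamID, lastEventID string) (<-chan StreamEvent, func()) {
	sm.mu.RLock()
	stream, ok := sm.streams[streamID]
	sm.mu.RUnlock()
	if !ok {
		return nil, func() {}
	}

	// Hold the write lock across replay and registration so that no event
	// sent concurrently is missed.
	stream.mu.Lock()
	defer stream.mu.Unlock()

	// Collect events after lastEventID. If the ID is no longer buffered,
	// nothing is replayed and the client only sees live events.
	var backlog []StreamEvent
	for i, event := range stream.events {
		if event.ID == lastEventID {
			backlog = stream.events[i+1:]
			break
		}
	}

	// Nothing reads ch until we return, so it must hold the whole backlog;
	// otherwise the send would block forever while holding the lock.
	// The extra 100 slots absorb bursts of live events.
	ch := make(chan StreamEvent, len(backlog)+100)
	for _, event := range backlog {
		ch <- event
	}

	// Register directly: a helper that takes stream.mu would deadlock here.
	stream.listeners = append(stream.listeners, ch)

	// removeListener (not shown) takes stream.mu itself.
	return ch, func() { stream.removeListener(ch) }
}
```
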
### Client Handling

```javascript
// JavaScript SSE client with reconnection
function connectSSE(url) {
  const eventSource = new EventSource(url);

  eventSource.onopen = () => {
    console.log('Connected');
  };

  eventSource.addEventListener('output', (e) => {
    const data = JSON.parse(e.data);
    console.log(data.stream + ':', data.line);
  });

  eventSource.addEventListener('complete', (e) => {
    const data = JSON.parse(e.data);
    console.log('Exit code:', data.exit_code);
    // Close explicitly; otherwise the browser reconnects when the server ends the response.
    eventSource.close();
  });

  eventSource.onerror = (e) => {
    // A server-sent `event: error` also fires onerror, but carries data.
    if (e.data) {
      console.error('Server error:', JSON.parse(e.data).message);
      eventSource.close();
      return;
    }
    if (eventSource.readyState === EventSource.CLOSED) {
      console.log('Connection failed permanently');
    } else {
      // Network error: the browser reconnects, sending Last-Event-ID if it has one
      console.log('Connection error, will auto-reconnect');
    }
  };

  return eventSource;
}
```
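
Browsers send `Last-Event-ID` automatically; other clients have to do it themselves. A minimal Go sketch of building the request, assuming the URL layout of the example request under Last-Event-ID (`newEventsRequest` is a hypothetical helper):

```go
package main

import (
	"net/http"
	"net/url"
)

// newEventsRequest builds the GET a non-browser client sends to attach, or
// re-attach, to a stream. net/http does not track event IDs, so the caller
// passes the last id: it processed ("" on the first connection).
// The path layout is an assumption taken from the example request.
func newEventsRequest(baseURL, project, streamID, lastEventID string) (*http.Request, error) {
	u := baseURL + "/projects/" + url.PathEscape(project) +
		"/events?stream_id=" + url.QueryEscape(streamID)
	req, err := http.NewRequest(http.MethodGet, u, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Accept", "text/event-stream")
	if lastEventID != "" {
		req.Header.Set("Last-Event-ID", lastEventID)
	}
	return req, nil
}
```

The client should remember the `id:` of every event it handles and pass the most recent one on each reconnect; events without an `id:` (such as heartbeats) leave it unchanged.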

## Stream Lifecycle

```
┌─────────────────────────────────────────────────────────┐
│                     Command Started                     │
└────────────────────────┬────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────┐
│                     Stream Created                      │
│                     (StreamManager)                     │
└────────────────────────┬────────────────────────────────┘
                         │
          ┌──────────────┴──────────────┐
          │                             │
          ▼                             ▼
┌───────────────────┐         ┌───────────────────┐
│ Client Subscribe  │         │   Output Events   │
│ (SSE Connection)  │◀────────│  (from executor)  │
└─────────┬─────────┘         └─────────┬─────────┘
          │                             │
          └──────────────┬──────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────┐
│                     Complete Event                      │
│                       (exit_code)                       │
└────────────────────────┬────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────┐
│                     Stream Cleanup                      │
│                (after 30s grace period)                 │
└─────────────────────────────────────────────────────────┘
```

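## Handler Implementation

```go
import (
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

func (h *ProjectsHandler) Events(w http.ResponseWriter, r *http.Request) {
	streamID := r.URL.Query().Get("stream_id")
	lastEventID := r.Header.Get("Last-Event-ID")

	// Streaming needs a ResponseWriter that can flush partial output.
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}

	// Set SSE headers (before the first write)
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Access-Control-Allow-Origin", "*")

	// Subscribe with reconnection support. A nil channel is assumed to
	// mean the stream does not exist (expired or never created).
	var events <-chan StreamEvent
	var cleanup func()
	if lastEventID != "" {
		events, cleanup = h.streams.SubscribeFromID(streamID, lastEventID)
	} else {
		events, cleanup = h.streams.Subscribe(streamID)
	}
	if events == nil {
		writeSSE(w, flusher, "error", map[string]any{"message": "stream not found"})
		return
	}
	defer cleanup()

	// Send connected event
	writeSSE(w, flusher, "connected", map[string]any{
		"stream_id":    streamID,
		"reconnecting": lastEventID != "",
	})

	// Heartbeat ticker
	heartbeat := time.NewTicker(30 * time.Second)
	defer heartbeat.Stop()

	// Event loop
	for {
		select {
		case <-r.Context().Done():
			return // client disconnected
		case event, ok := <-events:
			if !ok {
				return // channel closed by the stream manager
			}
			writeSSEWithID(w, flusher, event.ID, event.Type, event.Data)
			if event.Type == "complete" {
				return
			}
		case <-heartbeat.C:
			writeSSE(w, flusher, "heartbeat", map[string]any{
				"timestamp": time.Now().UTC().Format(time.RFC3339),
			})
		}
	}
}

// writeSSE writes an event without an id: field. Browsers keep their
// previous Last-Event-ID in that case, so heartbeats do not affect replay.
func writeSSE(w http.ResponseWriter, flusher http.Flusher, event string, data map[string]any) {
	writeSSEWithID(w, flusher, "", event, data)
}

// writeSSEWithID writes one event and flushes it to the client immediately.
func writeSSEWithID(w http.ResponseWriter, flusher http.Flusher,
	id, event string, data map[string]any) {

	// json.Marshal escapes newlines inside strings, so the payload always
	// fits on the single data: line written below.
	dataBytes, err := json.Marshal(data)
	if err != nil {
		// Only unsupported values (channels, funcs, NaN) fail to encode;
		// report that instead of writing an empty data: line.
		event = "error"
		dataBytes = []byte(`{"message": "failed to encode event data"}`)
	}
	if id != "" {
		fmt.Fprintf(w, "id: %s\n", id)
	}
	fmt.Fprintf(w, "event: %s\n", event)
	fmt.Fprintf(w, "data: %s\n\n", dataBytes)
	flusher.Flush()
}
```
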
## Performance Considerations

### Buffer Sizing

```go
// Each subscriber channel buffers up to 100 events to absorb output bursts
ch := make(chan StreamEvent, 100)
```

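### Heartbeats

A `heartbeat` event every 30 seconds keeps the connection from looking idle. It prevents:

- Proxies and load balancers from closing the connection on an idle timeout
- Intermediaries such as NAT gateways and firewalls from silently dropping inactive connections
- Client uncertainty about connection state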

### Cleanup

Streams are removed 30 seconds after the command completes. Removing them prevents memory leaks; waiting 30 seconds first:

- Gives disconnected clients time to reconnect and replay missed events
- Lets late-arriving clients see the final output and exit code
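
### Fanout

Multiple clients can subscribe to the same stream. `Send` stores each event for replay and delivers it to every current listener:

```go
func (sm *StreamManager) Send(streamID, eventType string, data map[string]any) {
	sm.mu.RLock()
	stream, ok := sm.streams[streamID]
	sm.mu.RUnlock()
	if !ok {
		return // stream expired or never created
	}

	// Store and fan out under one write lock: a subscriber that replays and
	// registers under stream.mu gets this event exactly once, either in its
	// replay or on its channel. Generating the ID under the lock also keeps
	// buffer order consistent with ID order.
	stream.mu.Lock()
	defer stream.mu.Unlock()

	event := StreamEvent{
		ID:   generateEventID(),
		Type: eventType,
		Data: data,
	}

	// Store for replay
	stream.events = append(stream.events, event)

	// Fanout to all listeners
	for _, ch := range stream.listeners {
		select {
		case ch <- event:
		default:
			// Channel full (client too slow): drop the event for this listener
		}
	}
}
```
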
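## Error Handling

### Connection Errors

`EventSource` reconnects automatically after a network error or when the server ends the response. The browser:

1. Closes the failed connection
2. Waits for the reconnection delay (a browser-defined default of a few seconds, which the server can override with a `retry:` field)
3. Reconnects, sending `Last-Event-ID` if it has received an event ID

If the server responds with a status other than 200 or a `Content-Type` other than `text/event-stream`, the browser does not reconnect and the `EventSource` closes permanently.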
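
### Slow Clients

If a client can't keep up and its channel buffer fills:

1. Further events for that client are dropped (non-blocking send), so other subscribers are never blocked
2. The client recovers dropped events only if it later reconnects and replays from `Last-Event-ID`; a client that stays connected never sees them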
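
If silent gaps are unacceptable, one option (not what `Send` above does) is to disconnect a slow client instead of skipping events for it. Closing its channel makes the handler return once it has drained the buffered events; the browser then reconnects with `Last-Event-ID`, and replay delivers what was skipped. A sketch with trimmed-down, hypothetical types:

```go
package main

import "sync"

// Trimmed-down, hypothetical versions of the types used above.
type StreamEvent struct {
	ID, Type string
}

type Stream struct {
	mu        sync.Mutex
	events    []StreamEvent
	listeners []chan StreamEvent
}

// publish stores ev and fans it out. A listener whose buffer is full is
// removed and its channel closed, forcing a reconnect instead of a gap.
func (s *Stream) publish(ev StreamEvent) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.events = append(s.events, ev)
	kept := s.listeners[:0] // filter in place
	for _, ch := range s.listeners {
		select {
		case ch <- ev:
			kept = append(kept, ch)
		default:
			close(ch) // buffered events are still received before the close
		}
	}
	s.listeners = kept
}
```

With this design, the cleanup function returned to the handler must tolerate a listener that has already been removed and must not close its channel again, since closing a closed channel panics.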

### Stream Not Found

If the stream doesn't exist (expired or never created), the server sends:

```
event: error
data: {"message": "stream not found"}
```