rdev/docs/api/sse-examples.md
jordan 72d16929ca feat: Implement hexagonal architecture with services, webhooks, queue, and telemetry
Major refactoring to hexagonal (ports & adapters) architecture:

- Add service layer (apikey_service, project_service) for business logic
- Add webhook system with dispatcher and delivery tracking
- Add command queue with priority-based processing
- Add rate limiting with sliding window algorithm
- Add audit logging for command execution
- Add OpenTelemetry integration (traces, metrics, spans)
- Add circuit breaker for fault tolerance
- Add cached repository wrapper for performance
- Add comprehensive validation package
- Add Kubernetes client integration for pod management
- Add database migrations (allowed_ips, audit_log, rate_limiting, queue, webhooks)
- Add network policy and PodDisruptionBudget for k8s
- Remove legacy executor and projects/registry packages
- Untrack secrets.yaml (now managed via envault)
- Add coverage.out to .gitignore
- Add e2e test infrastructure with docker-compose
- Add comprehensive documentation (API, architecture, operations, plans)
- Add golangci-lint config and pre-commit hook

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-25 19:57:46 -07:00

375 lines
8.9 KiB
Markdown

# SSE Streaming Examples
rdev uses Server-Sent Events (SSE) for real-time command output streaming. This guide provides examples in JavaScript, Python, and Go.
## Event Types
| Event | Description |
|-------|-------------|
| `connected` | Stream established |
| `output` | Command output line |
| `complete` | Command finished |
| `heartbeat` | Keep-alive signal |
| `error` | Error occurred |
## JavaScript (Browser/Node.js)
### Browser with EventSource
```javascript
async function executeCommand(projectId, command, apiKey) {
// 1. Start the command
const response = await fetch(`/projects/${projectId}/shell`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-API-Key': apiKey,
},
body: JSON.stringify({ command }),
});
const { data } = await response.json();
const streamId = data.id;
// 2. Connect to SSE stream
const eventSource = new EventSource(
`/projects/${projectId}/events?stream_id=${streamId}`
);
return new Promise((resolve, reject) => {
eventSource.addEventListener('connected', (e) => {
console.log('Connected:', JSON.parse(e.data));
});
eventSource.addEventListener('output', (e) => {
const { line, stream } = JSON.parse(e.data);
if (stream === 'stdout') {
process.stdout.write(line + '\n');
} else {
process.stderr.write(line + '\n');
}
});
eventSource.addEventListener('complete', (e) => {
const { exit_code, duration_ms } = JSON.parse(e.data);
eventSource.close();
resolve({ exitCode: exit_code, duration: duration_ms });
});
eventSource.addEventListener('error', (e) => {
if (e.data) {
const { message } = JSON.parse(e.data);
reject(new Error(message));
}
// Connection error - browser will auto-reconnect
});
});
}
// Usage
executeCommand('my-project', 'npm test', 'rdev_xxx')
.then(({ exitCode }) => console.log('Exit code:', exitCode))
.catch(console.error);
```
### Node.js with eventsource package
```javascript
const EventSource = require('eventsource');
function connectSSE(url, apiKey, handlers) {
const eventSource = new EventSource(url, {
headers: {
'X-API-Key': apiKey,
},
});
eventSource.addEventListener('connected', (e) => {
handlers.onConnected?.(JSON.parse(e.data));
});
eventSource.addEventListener('output', (e) => {
handlers.onOutput?.(JSON.parse(e.data));
});
eventSource.addEventListener('complete', (e) => {
handlers.onComplete?.(JSON.parse(e.data));
eventSource.close();
});
eventSource.onerror = (e) => {
handlers.onError?.(e);
};
return eventSource;
}
// Usage
connectSSE(
'http://localhost:8080/projects/my-project/events?stream_id=cmd-001',
'rdev_xxx',
{
onOutput: ({ line, stream }) => console.log(`[${stream}] ${line}`),
onComplete: ({ exit_code }) => console.log('Done:', exit_code),
}
);
```
## Python
### Using sseclient-py
```python
import requests
import sseclient
import json
def execute_command(base_url, project_id, command, api_key):
"""Execute a command and stream output."""
headers = {
'X-API-Key': api_key,
'Content-Type': 'application/json',
}
# 1. Start the command
response = requests.post(
f'{base_url}/projects/{project_id}/shell',
headers=headers,
json={'command': command}
)
data = response.json()['data']
stream_id = data['id']
# 2. Connect to SSE stream
response = requests.get(
f'{base_url}/projects/{project_id}/events',
params={'stream_id': stream_id},
headers={'X-API-Key': api_key},
stream=True
)
client = sseclient.SSEClient(response)
result = None
for event in client.events():
data = json.loads(event.data)
if event.event == 'connected':
print(f"Connected: {data}")
elif event.event == 'output':
stream = data.get('stream', 'stdout')
line = data.get('line', '')
print(f"[{stream}] {line}")
elif event.event == 'complete':
result = {
'exit_code': data['exit_code'],
'duration_ms': data['duration_ms']
}
break
elif event.event == 'heartbeat':
pass # Keep-alive
return result
# Usage
result = execute_command(
'http://localhost:8080',
'my-project',
'pip install -r requirements.txt',
'rdev_xxx'
)
print(f"Exit code: {result['exit_code']}")
```
### Using aiohttp (async)
```python
import aiohttp
import asyncio
import json
async def execute_command_async(base_url, project_id, command, api_key):
"""Execute a command asynchronously."""
headers = {'X-API-Key': api_key}
async with aiohttp.ClientSession(headers=headers) as session:
# 1. Start the command
async with session.post(
f'{base_url}/projects/{project_id}/shell',
json={'command': command}
) as resp:
data = await resp.json()
stream_id = data['data']['id']
# 2. Connect to SSE stream
async with session.get(
f'{base_url}/projects/{project_id}/events',
params={'stream_id': stream_id}
) as resp:
async for line in resp.content:
line = line.decode('utf-8').strip()
if line.startswith('event:'):
event_type = line[7:]
elif line.startswith('data:'):
data = json.loads(line[6:])
if event_type == 'output':
print(f"[{data['stream']}] {data['line']}")
elif event_type == 'complete':
return data
# Usage
result = asyncio.run(execute_command_async(
'http://localhost:8080',
'my-project',
'make build',
'rdev_xxx'
))
```
## Go
```go
package main
import (
"bufio"
"encoding/json"
"fmt"
"net/http"
"strings"
)
type OutputEvent struct {
Line string `json:"line"`
Stream string `json:"stream"`
}
type CompleteEvent struct {
ExitCode int `json:"exit_code"`
DurationMs int `json:"duration_ms"`
}
func executeCommand(baseURL, projectID, command, apiKey string) (*CompleteEvent, error) {
client := &http.Client{}
// 1. Start the command
reqBody := fmt.Sprintf(`{"command": %q}`, command)
req, _ := http.NewRequest("POST",
fmt.Sprintf("%s/projects/%s/shell", baseURL, projectID),
strings.NewReader(reqBody))
req.Header.Set("X-API-Key", apiKey)
req.Header.Set("Content-Type", "application/json")
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var startResp struct {
Data struct {
ID string `json:"id"`
} `json:"data"`
}
json.NewDecoder(resp.Body).Decode(&startResp)
streamID := startResp.Data.ID
// 2. Connect to SSE stream
req, _ = http.NewRequest("GET",
fmt.Sprintf("%s/projects/%s/events?stream_id=%s", baseURL, projectID, streamID),
nil)
req.Header.Set("X-API-Key", apiKey)
resp, err = client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
scanner := bufio.NewScanner(resp.Body)
var eventType string
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "event:") {
eventType = strings.TrimSpace(line[6:])
} else if strings.HasPrefix(line, "data:") {
data := line[5:]
switch eventType {
case "output":
var output OutputEvent
json.Unmarshal([]byte(data), &output)
fmt.Printf("[%s] %s\n", output.Stream, output.Line)
case "complete":
var complete CompleteEvent
json.Unmarshal([]byte(data), &complete)
return &complete, nil
}
}
}
return nil, scanner.Err()
}
func main() {
result, err := executeCommand(
"http://localhost:8080",
"my-project",
"go test ./...",
"rdev_xxx",
)
if err != nil {
panic(err)
}
fmt.Printf("Exit code: %d\n", result.ExitCode)
}
```
## Reconnection Handling
SSE supports automatic reconnection with the `Last-Event-ID` header.
### JavaScript
The browser's EventSource automatically reconnects with `Last-Event-ID`.
### Python
```python
def connect_with_reconnect(url, api_key, last_event_id=None):
headers = {'X-API-Key': api_key}
if last_event_id:
headers['Last-Event-ID'] = last_event_id
response = requests.get(url, headers=headers, stream=True)
return sseclient.SSEClient(response)
```
### Go
```go
req.Header.Set("Last-Event-ID", lastEventID)
```
## Error Handling
Always handle SSE errors gracefully:
```javascript
eventSource.onerror = (e) => {
if (eventSource.readyState === EventSource.CLOSED) {
console.log('Connection closed');
} else {
console.log('Connection error, reconnecting...');
}
};
```