Major refactoring to hexagonal (ports & adapters) architecture: - Add service layer (apikey_service, project_service) for business logic - Add webhook system with dispatcher and delivery tracking - Add command queue with priority-based processing - Add rate limiting with sliding window algorithm - Add audit logging for command execution - Add OpenTelemetry integration (traces, metrics, spans) - Add circuit breaker for fault tolerance - Add cached repository wrapper for performance - Add comprehensive validation package - Add Kubernetes client integration for pod management - Add database migrations (allowed_ips, audit_log, rate_limiting, queue, webhooks) - Add network policy and PodDisruptionBudget for k8s - Remove legacy executor and projects/registry packages - Untrack secrets.yaml (now managed via envault) - Add coverage.out to .gitignore - Add e2e test infrastructure with docker-compose - Add comprehensive documentation (API, architecture, operations, plans) - Add golangci-lint config and pre-commit hook Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
375 lines
8.9 KiB
Markdown
375 lines
8.9 KiB
Markdown
# SSE Streaming Examples
|
|
|
|
rdev uses Server-Sent Events (SSE) for real-time command output streaming. This guide provides examples in JavaScript, Python, and Go.
|
|
|
|
## Event Types

| Event | Description |
|-------|-------------|
| `connected` | Stream established |
| `output` | Command output line |
| `complete` | Command finished |
| `heartbeat` | Keep-alive signal |
| `error` | Error occurred |

## JavaScript (Browser/Node.js)
|
|
|
|
### Browser with EventSource
|
|
|
|
```javascript
|
|
/**
 * Starts a shell command in a project and follows its output over SSE.
 *
 * Browser example: relies on the global `fetch` and `EventSource`. Output
 * lines are written to the console (`console.log` for `stdout`,
 * `console.error` for any other stream).
 *
 * @param {string} projectId - Project whose `/shell` endpoint receives the command.
 * @param {string} command - Command line to execute.
 * @param {string} apiKey - Sent as `X-API-Key` on the start request.
 * @returns {Promise<{exitCode: number, duration: number}>} Settles when the
 *   `complete` event arrives; `duration` is that event's `duration_ms`.
 * @throws {Error} If the start request is not successful, the server emits an
 *   `error` event, or the stream is closed for good before `complete`.
 */
async function executeCommand(projectId, command, apiKey) {
  const projectPath = `/projects/${encodeURIComponent(projectId)}`;

  // 1. Start the command
  const response = await fetch(`${projectPath}/shell`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'X-API-Key': apiKey,
    },
    body: JSON.stringify({ command }),
  });

  // Fail with a readable message instead of a destructuring TypeError below.
  if (!response.ok) {
    throw new Error(`Failed to start command: HTTP ${response.status}`);
  }

  const { data } = await response.json();
  const streamId = data.id;

  // 2. Connect to SSE stream
  // NOTE: the browser EventSource constructor cannot attach custom headers,
  // so the API key is not sent on this request.
  const eventSource = new EventSource(
    `${projectPath}/events?stream_id=${encodeURIComponent(streamId)}`
  );

  return new Promise((resolve, reject) => {
    eventSource.addEventListener('connected', (e) => {
      console.log('Connected:', JSON.parse(e.data));
    });

    eventSource.addEventListener('output', (e) => {
      const { line, stream } = JSON.parse(e.data);
      // `process.stdout` does not exist in browsers; use the console instead.
      if (stream === 'stdout') {
        console.log(line);
      } else {
        console.error(line);
      }
    });

    eventSource.addEventListener('complete', (e) => {
      const { exit_code, duration_ms } = JSON.parse(e.data);
      eventSource.close();
      resolve({ exitCode: exit_code, duration: duration_ms });
    });

    eventSource.addEventListener('error', (e) => {
      if (e.data) {
        // Server-sent `error` event: stop streaming before reporting it,
        // otherwise the connection would stay open with nobody listening.
        eventSource.close();
        const { message } = JSON.parse(e.data);
        reject(new Error(message));
        return;
      }

      if (eventSource.readyState === EventSource.CLOSED) {
        // Fatal connection failure: no retry will follow, so settle the promise.
        reject(new Error('SSE connection closed before the command completed'));
      }
      // Otherwise: transient connection error - browser will auto-reconnect
    });
  });
}
|
|
|
|
// Usage: run `npm test` in `my-project`, then print the exit code (or the failure)
executeCommand('my-project', 'npm test', 'rdev_xxx')
  .then((outcome) => {
    console.log('Exit code:', outcome.exitCode);
  })
  .catch((failure) => console.error(failure));
|
|
```
|
|
|
|
### Node.js with eventsource package
|
|
|
|
```javascript
|
|
const EventSource = require('eventsource');
|
|
|
|
/**
 * Opens an SSE stream and routes its events to optional callbacks.
 *
 * `connected`, `output` and `complete` payloads are JSON-parsed before being
 * handed to the matching callback. The stream is closed after `complete`.
 *
 * @param {string} url - Full URL of the events endpoint.
 * @param {string} apiKey - Sent as the `X-API-Key` request header.
 * @param {object} [handlers] - Optional callbacks; any of them may be omitted.
 * @param {function(object): void} [handlers.onConnected] - Parsed `connected` payload.
 * @param {function(object): void} [handlers.onOutput] - Parsed `output` payload.
 * @param {function(object): void} [handlers.onComplete] - Parsed `complete` payload.
 * @param {function(*): void} [handlers.onError] - Raw event passed to `onerror`.
 * @returns {EventSource} The underlying event source, so callers can close it early.
 */
function connectSSE(url, apiKey, handlers = {}) {
  // A default parameter does not cover an explicit `null`.
  const callbacks = handlers ?? {};

  const eventSource = new EventSource(url, {
    headers: {
      'X-API-Key': apiKey,
    },
  });

  eventSource.addEventListener('connected', (e) => {
    callbacks.onConnected?.(JSON.parse(e.data));
  });

  eventSource.addEventListener('output', (e) => {
    callbacks.onOutput?.(JSON.parse(e.data));
  });

  eventSource.addEventListener('complete', (e) => {
    try {
      callbacks.onComplete?.(JSON.parse(e.data));
    } finally {
      // Close even when parsing or the callback throws, so the finished
      // stream is never left open.
      eventSource.close();
    }
  });

  eventSource.onerror = (e) => {
    callbacks.onError?.(e);
  };

  return eventSource;
}
|
|
|
|
// Usage: print each output line with its stream name, then the exit code
const streamUrl =
  'http://localhost:8080/projects/my-project/events?stream_id=cmd-001';

const streamHandlers = {
  onOutput(event) {
    console.log(`[${event.stream}] ${event.line}`);
  },
  onComplete(event) {
    console.log('Done:', event.exit_code);
  },
};

connectSSE(streamUrl, 'rdev_xxx', streamHandlers);
|
|
```
|
|
|
|
## Python
|
|
|
|
### Using sseclient-py
|
|
|
|
```python
|
|
import requests
|
|
import sseclient
|
|
import json
|
|
|
|
def execute_command(base_url, project_id, command, api_key):
    """Execute a command and stream its output to stdout.

    Posts the command to ``{base_url}/projects/{project_id}/shell``, then
    follows the SSE stream at ``{base_url}/projects/{project_id}/events`` for
    the returned stream id, printing ``connected`` and ``output`` events.

    Args:
        base_url: Server root, e.g. ``http://localhost:8080``.
        project_id: Project the command runs in.
        command: Command line to execute.
        api_key: Sent as the ``X-API-Key`` header on both requests.

    Returns:
        A dict with ``exit_code`` and ``duration_ms`` taken from the
        ``complete`` event, or ``None`` if the stream ends without one.

    Raises:
        Exception: Whatever ``raise_for_status()`` raises when either request
            returns an error status.
        RuntimeError: If the server emits an ``error`` event.
    """
    headers = {
        'X-API-Key': api_key,
        'Content-Type': 'application/json',
    }

    # 1. Start the command
    start = requests.post(
        f'{base_url}/projects/{project_id}/shell',
        headers=headers,
        json={'command': command},
    )
    # Surface HTTP failures here instead of a KeyError on the body below.
    start.raise_for_status()
    stream_id = start.json()['data']['id']

    # 2. Connect to SSE stream
    # The context manager releases the streamed connection on every exit path.
    with requests.get(
        f'{base_url}/projects/{project_id}/events',
        params={'stream_id': stream_id},
        headers={'X-API-Key': api_key},
        stream=True,
    ) as response:
        response.raise_for_status()
        client = sseclient.SSEClient(response)

        for event in client.events():
            kind = event.event
            if kind not in ('connected', 'output', 'complete', 'error'):
                # Heartbeats (keep-alive) and unknown events carry nothing we
                # need, so their payload is not parsed at all.
                continue

            data = json.loads(event.data)

            if kind == 'connected':
                print(f"Connected: {data}")
            elif kind == 'output':
                stream = data.get('stream', 'stdout')
                line = data.get('line', '')
                print(f"[{stream}] {line}")
            elif kind == 'complete':
                return {
                    'exit_code': data['exit_code'],
                    'duration_ms': data['duration_ms'],
                }
            else:
                # 'error': report it rather than silently waiting for more events.
                raise RuntimeError(data.get('message', 'command failed'))

    return None
|
|
|
|
# Usage: run a pip install in `my-project` and print the exit code
result = execute_command(
    base_url='http://localhost:8080',
    project_id='my-project',
    command='pip install -r requirements.txt',
    api_key='rdev_xxx',
)
print(f"Exit code: {result['exit_code']}")
|
|
```
|
|
|
|
### Using aiohttp (async)
|
|
|
|
```python
|
|
import aiohttp
|
|
import asyncio
|
|
import json
|
|
|
|
async def execute_command_async(base_url, project_id, command, api_key):
    """Execute a command asynchronously and stream its output to stdout.

    Posts the command to ``{base_url}/projects/{project_id}/shell``, then
    reads the SSE stream at ``{base_url}/projects/{project_id}/events`` line
    by line, printing every ``output`` event.

    Args:
        base_url: Server root, e.g. ``http://localhost:8080``.
        project_id: Project the command runs in.
        command: Command line to execute.
        api_key: Sent as the ``X-API-Key`` header on both requests.

    Returns:
        The decoded payload of the ``complete`` event, or ``None`` if the
        stream ends without one.

    Raises:
        Exception: Whatever ``raise_for_status()`` raises when either request
            returns an error status.
        RuntimeError: If the server emits an ``error`` event.
    """
    headers = {'X-API-Key': api_key}

    async with aiohttp.ClientSession(headers=headers) as session:
        # 1. Start the command
        async with session.post(
            f'{base_url}/projects/{project_id}/shell',
            json={'command': command},
        ) as resp:
            resp.raise_for_status()
            started = await resp.json()
            stream_id = started['data']['id']

        # 2. Connect to SSE stream
        async with session.get(
            f'{base_url}/projects/{project_id}/events',
            params={'stream_id': stream_id},
        ) as resp:
            resp.raise_for_status()

            # Initialised up front: a `data:` line may arrive before any
            # `event:` line.
            event_type = None

            async for raw_line in resp.content:
                line = raw_line.decode('utf-8').strip()

                if line.startswith('event:'):
                    # The space after the colon is optional in SSE, so slice
                    # at the prefix length and strip instead of assuming it.
                    event_type = line[len('event:'):].strip()
                elif line.startswith('data:'):
                    if event_type not in ('output', 'complete', 'error'):
                        # Heartbeats and other events: nothing to decode.
                        continue

                    payload = json.loads(line[len('data:'):].strip())

                    if event_type == 'output':
                        print(f"[{payload['stream']}] {payload['line']}")
                    elif event_type == 'complete':
                        return payload
                    else:
                        raise RuntimeError(payload.get('message', 'command failed'))

    return None
|
|
|
|
# Usage: run `make build` in `my-project` on the asyncio event loop
build = execute_command_async(
    base_url='http://localhost:8080',
    project_id='my-project',
    command='make build',
    api_key='rdev_xxx',
)
result = asyncio.run(build)
|
|
```
|
|
|
|
## Go
|
|
|
|
```go
|
|
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"strings"
|
|
)
|
|
|
|
// OutputEvent is the JSON payload decoded from an `output` SSE event.
type OutputEvent struct {
	// Line is the text carried in the "line" key.
	Line string `json:"line"`
	// Stream is the value of the "stream" key; it is printed as the
	// bracketed prefix of each output line.
	Stream string `json:"stream"`
}
|
|
|
|
// CompleteEvent is the JSON payload decoded from a `complete` SSE event.
type CompleteEvent struct {
	// ExitCode is the value of the "exit_code" key.
	ExitCode int `json:"exit_code"`
	// DurationMs is the value of the "duration_ms" key (presumably
	// milliseconds, going by the name).
	DurationMs int `json:"duration_ms"`
}
|
|
|
|
func executeCommand(baseURL, projectID, command, apiKey string) (*CompleteEvent, error) {
|
|
client := &http.Client{}
|
|
|
|
// 1. Start the command
|
|
reqBody := fmt.Sprintf(`{"command": %q}`, command)
|
|
req, _ := http.NewRequest("POST",
|
|
fmt.Sprintf("%s/projects/%s/shell", baseURL, projectID),
|
|
strings.NewReader(reqBody))
|
|
req.Header.Set("X-API-Key", apiKey)
|
|
req.Header.Set("Content-Type", "application/json")
|
|
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
var startResp struct {
|
|
Data struct {
|
|
ID string `json:"id"`
|
|
} `json:"data"`
|
|
}
|
|
json.NewDecoder(resp.Body).Decode(&startResp)
|
|
streamID := startResp.Data.ID
|
|
|
|
// 2. Connect to SSE stream
|
|
req, _ = http.NewRequest("GET",
|
|
fmt.Sprintf("%s/projects/%s/events?stream_id=%s", baseURL, projectID, streamID),
|
|
nil)
|
|
req.Header.Set("X-API-Key", apiKey)
|
|
|
|
resp, err = client.Do(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
scanner := bufio.NewScanner(resp.Body)
|
|
var eventType string
|
|
|
|
for scanner.Scan() {
|
|
line := scanner.Text()
|
|
|
|
if strings.HasPrefix(line, "event:") {
|
|
eventType = strings.TrimSpace(line[6:])
|
|
} else if strings.HasPrefix(line, "data:") {
|
|
data := line[5:]
|
|
|
|
switch eventType {
|
|
case "output":
|
|
var output OutputEvent
|
|
json.Unmarshal([]byte(data), &output)
|
|
fmt.Printf("[%s] %s\n", output.Stream, output.Line)
|
|
|
|
case "complete":
|
|
var complete CompleteEvent
|
|
json.Unmarshal([]byte(data), &complete)
|
|
return &complete, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil, scanner.Err()
|
|
}
|
|
|
|
func main() {
|
|
result, err := executeCommand(
|
|
"http://localhost:8080",
|
|
"my-project",
|
|
"go test ./...",
|
|
"rdev_xxx",
|
|
)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
fmt.Printf("Exit code: %d\n", result.ExitCode)
|
|
}
|
|
```
|
|
|
|
## Reconnection Handling
|
|
|
|
SSE supports resuming after a dropped connection: when reconnecting, the client sends the ID of the last event it received in the `Last-Event-ID` header.
|
|
|
|
### JavaScript
|
|
|
|
The browser's `EventSource` reconnects automatically and sends `Last-Event-ID` itself, provided the server set an `id:` field on the events it delivered earlier.
|
|
|
|
### Python
|
|
|
|
```python
|
|
def connect_with_reconnect(url, api_key, last_event_id=None):
    """Open an SSE stream, optionally resuming after a known event.

    Args:
        url: Full URL of the events endpoint.
        api_key: Sent as the ``X-API-Key`` header.
        last_event_id: ID of the last event already received. When given, it
            is sent as the ``Last-Event-ID`` header. ``None`` or an empty
            string sends no header.

    Returns:
        An ``sseclient.SSEClient`` wrapping the streamed response.
    """
    headers = {'X-API-Key': api_key}

    # Test for "not provided" explicitly: an ID such as 0 is falsy but is
    # still a valid resume point. Header values must be strings.
    if last_event_id is not None and last_event_id != '':
        headers['Last-Event-ID'] = str(last_event_id)

    response = requests.get(url, headers=headers, stream=True)
    return sseclient.SSEClient(response)
|
|
```
|
|
|
|
### Go
|
|
|
|
```go
|
|
// Pass the ID of the last event already received in the Last-Event-ID request
// header when reconnecting (req and lastEventID come from the surrounding code).
req.Header.Set("Last-Event-ID", lastEventID)
|
|
```
|
|
|
|
## Error Handling
|
|
|
|
Always handle SSE errors gracefully:
|
|
|
|
```javascript
|
|
// Tell a permanently closed stream apart from one that is being retried
eventSource.onerror = (e) => {
  const isClosed = eventSource.readyState === EventSource.CLOSED;
  console.log(isClosed ? 'Connection closed' : 'Connection error, reconnecting...');
};
|
|
```
|