# SSE Streaming Examples rdev uses Server-Sent Events (SSE) for real-time command output streaming. This guide provides examples in JavaScript, Python, and Go. ## Event Types | Event | Description | |-------|-------------| | `connected` | Stream established | | `output` | Command output line | | `complete` | Command finished | | `heartbeat` | Keep-alive signal | | `error` | Error occurred | ## JavaScript (Browser/Node.js) ### Browser with EventSource ```javascript async function executeCommand(projectId, command, apiKey) { // 1. Start the command const response = await fetch(`/projects/${projectId}/shell`, { method: 'POST', headers: { 'Content-Type': 'application/json', 'X-API-Key': apiKey, }, body: JSON.stringify({ command }), }); const { data } = await response.json(); const streamId = data.id; // 2. Connect to SSE stream const eventSource = new EventSource( `/projects/${projectId}/events?stream_id=${streamId}` ); return new Promise((resolve, reject) => { eventSource.addEventListener('connected', (e) => { console.log('Connected:', JSON.parse(e.data)); }); eventSource.addEventListener('output', (e) => { const { line, stream } = JSON.parse(e.data); if (stream === 'stdout') { process.stdout.write(line + '\n'); } else { process.stderr.write(line + '\n'); } }); eventSource.addEventListener('complete', (e) => { const { exit_code, duration_ms } = JSON.parse(e.data); eventSource.close(); resolve({ exitCode: exit_code, duration: duration_ms }); }); eventSource.addEventListener('error', (e) => { if (e.data) { const { message } = JSON.parse(e.data); reject(new Error(message)); } // Connection error - browser will auto-reconnect }); }); } // Usage executeCommand('my-project', 'npm test', 'rdev_xxx') .then(({ exitCode }) => console.log('Exit code:', exitCode)) .catch(console.error); ``` ### Node.js with eventsource package ```javascript const EventSource = require('eventsource'); function connectSSE(url, apiKey, handlers) { const eventSource = new EventSource(url, { headers: { 'X-API-Key': apiKey, }, }); eventSource.addEventListener('connected', (e) => { handlers.onConnected?.(JSON.parse(e.data)); }); eventSource.addEventListener('output', (e) => { handlers.onOutput?.(JSON.parse(e.data)); }); eventSource.addEventListener('complete', (e) => { handlers.onComplete?.(JSON.parse(e.data)); eventSource.close(); }); eventSource.onerror = (e) => { handlers.onError?.(e); }; return eventSource; } // Usage connectSSE( 'http://localhost:8080/projects/my-project/events?stream_id=cmd-001', 'rdev_xxx', { onOutput: ({ line, stream }) => console.log(`[${stream}] ${line}`), onComplete: ({ exit_code }) => console.log('Done:', exit_code), } ); ``` ## Python ### Using sseclient-py ```python import requests import sseclient import json def execute_command(base_url, project_id, command, api_key): """Execute a command and stream output.""" headers = { 'X-API-Key': api_key, 'Content-Type': 'application/json', } # 1. Start the command response = requests.post( f'{base_url}/projects/{project_id}/shell', headers=headers, json={'command': command} ) data = response.json()['data'] stream_id = data['id'] # 2. Connect to SSE stream response = requests.get( f'{base_url}/projects/{project_id}/events', params={'stream_id': stream_id}, headers={'X-API-Key': api_key}, stream=True ) client = sseclient.SSEClient(response) result = None for event in client.events(): data = json.loads(event.data) if event.event == 'connected': print(f"Connected: {data}") elif event.event == 'output': stream = data.get('stream', 'stdout') line = data.get('line', '') print(f"[{stream}] {line}") elif event.event == 'complete': result = { 'exit_code': data['exit_code'], 'duration_ms': data['duration_ms'] } break elif event.event == 'heartbeat': pass # Keep-alive return result # Usage result = execute_command( 'http://localhost:8080', 'my-project', 'pip install -r requirements.txt', 'rdev_xxx' ) print(f"Exit code: {result['exit_code']}") ``` ### Using aiohttp (async) ```python import aiohttp import asyncio import json async def execute_command_async(base_url, project_id, command, api_key): """Execute a command asynchronously.""" headers = {'X-API-Key': api_key} async with aiohttp.ClientSession(headers=headers) as session: # 1. Start the command async with session.post( f'{base_url}/projects/{project_id}/shell', json={'command': command} ) as resp: data = await resp.json() stream_id = data['data']['id'] # 2. Connect to SSE stream async with session.get( f'{base_url}/projects/{project_id}/events', params={'stream_id': stream_id} ) as resp: async for line in resp.content: line = line.decode('utf-8').strip() if line.startswith('event:'): event_type = line[7:] elif line.startswith('data:'): data = json.loads(line[6:]) if event_type == 'output': print(f"[{data['stream']}] {data['line']}") elif event_type == 'complete': return data # Usage result = asyncio.run(execute_command_async( 'http://localhost:8080', 'my-project', 'make build', 'rdev_xxx' )) ``` ## Go ```go package main import ( "bufio" "encoding/json" "fmt" "net/http" "strings" ) type OutputEvent struct { Line string `json:"line"` Stream string `json:"stream"` } type CompleteEvent struct { ExitCode int `json:"exit_code"` DurationMs int `json:"duration_ms"` } func executeCommand(baseURL, projectID, command, apiKey string) (*CompleteEvent, error) { client := &http.Client{} // 1. Start the command reqBody := fmt.Sprintf(`{"command": %q}`, command) req, _ := http.NewRequest("POST", fmt.Sprintf("%s/projects/%s/shell", baseURL, projectID), strings.NewReader(reqBody)) req.Header.Set("X-API-Key", apiKey) req.Header.Set("Content-Type", "application/json") resp, err := client.Do(req) if err != nil { return nil, err } defer resp.Body.Close() var startResp struct { Data struct { ID string `json:"id"` } `json:"data"` } json.NewDecoder(resp.Body).Decode(&startResp) streamID := startResp.Data.ID // 2. Connect to SSE stream req, _ = http.NewRequest("GET", fmt.Sprintf("%s/projects/%s/events?stream_id=%s", baseURL, projectID, streamID), nil) req.Header.Set("X-API-Key", apiKey) resp, err = client.Do(req) if err != nil { return nil, err } defer resp.Body.Close() scanner := bufio.NewScanner(resp.Body) var eventType string for scanner.Scan() { line := scanner.Text() if strings.HasPrefix(line, "event:") { eventType = strings.TrimSpace(line[6:]) } else if strings.HasPrefix(line, "data:") { data := line[5:] switch eventType { case "output": var output OutputEvent json.Unmarshal([]byte(data), &output) fmt.Printf("[%s] %s\n", output.Stream, output.Line) case "complete": var complete CompleteEvent json.Unmarshal([]byte(data), &complete) return &complete, nil } } } return nil, scanner.Err() } func main() { result, err := executeCommand( "http://localhost:8080", "my-project", "go test ./...", "rdev_xxx", ) if err != nil { panic(err) } fmt.Printf("Exit code: %d\n", result.ExitCode) } ``` ## Reconnection Handling SSE supports automatic reconnection with the `Last-Event-ID` header. ### JavaScript The browser's EventSource automatically reconnects with `Last-Event-ID`. ### Python ```python def connect_with_reconnect(url, api_key, last_event_id=None): headers = {'X-API-Key': api_key} if last_event_id: headers['Last-Event-ID'] = last_event_id response = requests.get(url, headers=headers, stream=True) return sseclient.SSEClient(response) ``` ### Go ```go req.Header.Set("Last-Event-ID", lastEventID) ``` ## Error Handling Always handle SSE errors gracefully: ```javascript eventSource.onerror = (e) => { if (eventSource.readyState === EventSource.CLOSED) { console.log('Connection closed'); } else { console.log('Connection error, reconnecting...'); } }; ```