rdev/docs/api/sse-examples.md
jordan 72d16929ca feat: Implement hexagonal architecture with services, webhooks, queue, and telemetry
Major refactoring to hexagonal (ports & adapters) architecture:

- Add service layer (apikey_service, project_service) for business logic
- Add webhook system with dispatcher and delivery tracking
- Add command queue with priority-based processing
- Add rate limiting with sliding window algorithm
- Add audit logging for command execution
- Add OpenTelemetry integration (traces, metrics, spans)
- Add circuit breaker for fault tolerance
- Add cached repository wrapper for performance
- Add comprehensive validation package
- Add Kubernetes client integration for pod management
- Add database migrations (allowed_ips, audit_log, rate_limiting, queue, webhooks)
- Add network policy and PodDisruptionBudget for k8s
- Remove legacy executor and projects/registry packages
- Untrack secrets.yaml (now managed via envault)
- Add coverage.out to .gitignore
- Add e2e test infrastructure with docker-compose
- Add comprehensive documentation (API, architecture, operations, plans)
- Add golangci-lint config and pre-commit hook

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-25 19:57:46 -07:00

8.9 KiB

SSE Streaming Examples

rdev uses Server-Sent Events (SSE) for real-time command output streaming. This guide provides examples in JavaScript, Python, and Go.

Event Types

Event Description
connected Stream established
output Command output line
complete Command finished
heartbeat Keep-alive signal
error Error occurred

JavaScript (Browser/Node.js)

Browser with EventSource

/**
 * Starts a shell command in a project and streams its output over SSE.
 *
 * Works in any runtime that provides `fetch`, `EventSource` and `console`.
 *
 * NOTE(review): the native browser EventSource cannot send custom headers
 * such as X-API-Key, so this assumes the events endpoint accepts the
 * browser's ambient credentials - confirm against the server.
 *
 * @param {string} projectId - Project whose shell runs the command.
 * @param {string} command - Command line to execute.
 * @param {string} apiKey - Value sent in the X-API-Key header of the POST.
 * @returns {Promise<{exitCode: number, duration: number}>} Resolves with the
 *   `exit_code` and `duration_ms` fields of the `complete` event.
 * @throws {Error} If the command cannot be started, the server emits an
 *   `error` event, or the stream closes for good before `complete`.
 */
async function executeCommand(projectId, command, apiKey) {
  // Encode the id so characters like "/" or "?" cannot alter the route.
  const projectPath = `/projects/${encodeURIComponent(projectId)}`;

  // 1. Start the command
  const response = await fetch(`${projectPath}/shell`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'X-API-Key': apiKey,
    },
    body: JSON.stringify({ command }),
  });
  if (!response.ok) {
    throw new Error(`Failed to start command: HTTP ${response.status}`);
  }

  const { data } = await response.json();
  const streamId = data.id;

  // 2. Connect to SSE stream
  const eventSource = new EventSource(
    `${projectPath}/events?stream_id=${encodeURIComponent(streamId)}`
  );

  return new Promise((resolve, reject) => {
    eventSource.addEventListener('connected', (e) => {
      console.log('Connected:', JSON.parse(e.data));
    });

    eventSource.addEventListener('output', (e) => {
      const { line, stream } = JSON.parse(e.data);
      // console.* exists in browsers and Node; `process` is Node-only.
      if (stream === 'stdout') {
        console.log(line);
      } else {
        console.error(line);
      }
    });

    eventSource.addEventListener('complete', (e) => {
      const { exit_code, duration_ms } = JSON.parse(e.data);
      eventSource.close();
      resolve({ exitCode: exit_code, duration: duration_ms });
    });

    eventSource.addEventListener('error', (e) => {
      if (e.data) {
        // Server-sent `error` event: stop streaming so the client does not
        // keep reconnecting after the promise has already been rejected.
        eventSource.close();
        const { message } = JSON.parse(e.data);
        reject(new Error(message));
        return;
      }
      if (eventSource.readyState === EventSource.CLOSED) {
        // The connection was closed for good; without this the promise
        // would never settle.
        reject(new Error('SSE connection closed before the command completed'));
      }
      // Otherwise it is a transient connection error and EventSource
      // reconnects on its own.
    });
  });
}

// Usage: run the project's test suite and report how it exited.
const reportExitCode = ({ exitCode }) => console.log('Exit code:', exitCode);

executeCommand('my-project', 'npm test', 'rdev_xxx')
  .then(reportExitCode)
  .catch(console.error);

Node.js with eventsource package

const EventSource = require('eventsource');

/**
 * Opens an SSE stream and forwards its events to optional callbacks.
 *
 * @param {string} url - Full URL of the events endpoint.
 * @param {string} apiKey - Value sent in the X-API-Key request header.
 * @param {object} [handlers] - Optional callbacks.
 * @param {function(object): void} [handlers.onConnected] - Receives the
 *   parsed payload of the `connected` event.
 * @param {function(object): void} [handlers.onOutput] - Receives the parsed
 *   payload of each `output` event.
 * @param {function(object): void} [handlers.onComplete] - Receives the parsed
 *   payload of the `complete` event; the stream is closed afterwards.
 * @param {function(object): void} [handlers.onError] - Receives the raw
 *   event passed to `onerror`.
 * @returns {EventSource} The open stream, so callers can close it early.
 */
function connectSSE(url, apiKey, handlers = {}) {
  const eventSource = new EventSource(url, {
    headers: {
      'X-API-Key': apiKey,
    },
  });

  eventSource.addEventListener('connected', (e) => {
    handlers.onConnected?.(JSON.parse(e.data));
  });

  eventSource.addEventListener('output', (e) => {
    handlers.onOutput?.(JSON.parse(e.data));
  });

  eventSource.addEventListener('complete', (e) => {
    try {
      handlers.onComplete?.(JSON.parse(e.data));
    } finally {
      // Close even when the callback or the JSON parse throws; otherwise the
      // connection stays open after the command is done.
      eventSource.close();
    }
  });

  eventSource.onerror = (e) => {
    handlers.onError?.(e);
  };

  return eventSource;
}

// Usage: print every output line tagged with its stream, then the exit code.
const streamUrl =
  'http://localhost:8080/projects/my-project/events?stream_id=cmd-001';

const streamHandlers = {
  onOutput({ line, stream }) {
    console.log(`[${stream}] ${line}`);
  },
  onComplete({ exit_code }) {
    console.log('Done:', exit_code);
  },
};

connectSSE(streamUrl, 'rdev_xxx', streamHandlers);

Python

Using sseclient-py

import requests
import sseclient
import json

def execute_command(base_url, project_id, command, api_key):
    """Execute a command and stream its output to stdout.

    Args:
        base_url: Server root, e.g. ``http://localhost:8080``.
        project_id: Project whose shell runs the command.
        command: Command line to execute.
        api_key: Value sent in the ``X-API-Key`` header.

    Returns:
        A dict with ``exit_code`` and ``duration_ms`` taken from the
        ``complete`` event, or ``None`` if the stream ends without one.

    Raises:
        RuntimeError: If the server emits an ``error`` event.
        Exception: Whatever ``raise_for_status()`` raises when either HTTP
            request returns an error status.
    """
    headers = {
        'X-API-Key': api_key,
        'Content-Type': 'application/json',
    }

    # 1. Start the command
    start = requests.post(
        f'{base_url}/projects/{project_id}/shell',
        headers=headers,
        json={'command': command}
    )
    # Fail here with the HTTP error rather than later with a KeyError.
    start.raise_for_status()
    stream_id = start.json()['data']['id']

    # 2. Connect to SSE stream; the context manager releases the connection
    # on every exit path (return, exception, or end of stream).
    with requests.get(
        f'{base_url}/projects/{project_id}/events',
        params={'stream_id': stream_id},
        headers={'X-API-Key': api_key},
        stream=True
    ) as response:
        response.raise_for_status()
        client = sseclient.SSEClient(response)

        for event in client.events():
            if event.event == 'heartbeat':
                continue  # Keep-alive; its payload is not needed

            payload = json.loads(event.data)

            if event.event == 'connected':
                print(f"Connected: {payload}")

            elif event.event == 'output':
                stream = payload.get('stream', 'stdout')
                line = payload.get('line', '')
                print(f"[{stream}] {line}")

            elif event.event == 'complete':
                return {
                    'exit_code': payload['exit_code'],
                    'duration_ms': payload['duration_ms']
                }

            elif event.event == 'error':
                raise RuntimeError(
                    payload.get('message', 'command stream reported an error')
                )

    return None

# Usage: install the project's dependencies and report the exit code.
result = execute_command(
    base_url='http://localhost:8080',
    project_id='my-project',
    command='pip install -r requirements.txt',
    api_key='rdev_xxx',
)
exit_code = result['exit_code']
print(f"Exit code: {exit_code}")

Using aiohttp (async)

import aiohttp
import asyncio
import json

async def execute_command_async(base_url, project_id, command, api_key):
    """Execute a command asynchronously and stream its output to stdout.

    Args:
        base_url: Server root, e.g. ``http://localhost:8080``.
        project_id: Project whose shell runs the command.
        command: Command line to execute.
        api_key: Value sent in the ``X-API-Key`` header of every request.

    Returns:
        The parsed payload of the ``complete`` event, or ``None`` if the
        stream ends without one.

    Raises:
        RuntimeError: If the server emits an ``error`` event.
        Exception: Whatever ``raise_for_status()`` raises when either HTTP
            request returns an error status.
    """
    headers = {'X-API-Key': api_key}

    async with aiohttp.ClientSession(headers=headers) as session:
        # 1. Start the command
        async with session.post(
            f'{base_url}/projects/{project_id}/shell',
            json={'command': command}
        ) as resp:
            resp.raise_for_status()
            started = await resp.json()
            stream_id = started['data']['id']

        # 2. Connect to SSE stream
        async with session.get(
            f'{base_url}/projects/{project_id}/events',
            params={'stream_id': stream_id}
        ) as resp:
            resp.raise_for_status()

            # Set by an `event:` line; None until one has been seen.
            event_type = None

            async for raw_line in resp.content:
                line = raw_line.decode('utf-8').strip()

                if not line:
                    # A blank line terminates the current event.
                    event_type = None
                elif line.startswith('event:'):
                    # The space after the colon is optional, so cut at the
                    # prefix length and strip instead of using a fixed offset.
                    event_type = line[len('event:'):].strip()
                elif line.startswith('data:'):
                    if event_type not in ('output', 'complete', 'error'):
                        continue  # connected / heartbeat: nothing to do

                    payload = json.loads(line[len('data:'):].strip())

                    if event_type == 'output':
                        print(f"[{payload['stream']}] {payload['line']}")
                    elif event_type == 'complete':
                        return payload
                    else:
                        raise RuntimeError(
                            payload.get('message', 'command stream reported an error')
                        )

    return None

# Usage: drive the coroutine to completion from synchronous code.
build = execute_command_async(
    base_url='http://localhost:8080',
    project_id='my-project',
    command='make build',
    api_key='rdev_xxx',
)
result = asyncio.run(build)

Go

package main

import (
    "bufio"
    "encoding/json"
    "fmt"
    "net/http"
    "strings"
)

type (
    // OutputEvent is the JSON payload of an "output" event: one line of
    // command output and the stream it was written to.
    OutputEvent struct {
        Line   string `json:"line"`
        Stream string `json:"stream"`
    }

    // CompleteEvent is the JSON payload of the "complete" event.
    CompleteEvent struct {
        ExitCode   int `json:"exit_code"`
        DurationMs int `json:"duration_ms"`
    }
)

// executeCommand starts command in the given project's shell, prints each
// output line as it is streamed over SSE, and returns the payload of the
// "complete" event.
//
// It returns an error when a request cannot be built or sent, when either
// response has a non-2xx status, when a payload cannot be decoded, when the
// server emits an "error" event, or when the stream ends before "complete".
// On success the returned *CompleteEvent is never nil.
func executeCommand(baseURL, projectID, command, apiKey string) (*CompleteEvent, error) {
    client := &http.Client{}

    // 1. Start the command
    // Marshal the body instead of formatting with %q: %q emits Go string
    // escapes (e.g. \x00, \a), which are not always valid JSON.
    reqBody, err := json.Marshal(map[string]string{"command": command})
    if err != nil {
        return nil, err
    }
    req, err := http.NewRequest("POST",
        fmt.Sprintf("%s/projects/%s/shell", baseURL, projectID),
        strings.NewReader(string(reqBody)))
    if err != nil {
        return nil, err
    }
    req.Header.Set("X-API-Key", apiKey)
    req.Header.Set("Content-Type", "application/json")

    resp, err := client.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    if resp.StatusCode < 200 || resp.StatusCode > 299 {
        return nil, fmt.Errorf("start command: unexpected status %s", resp.Status)
    }

    var startResp struct {
        Data struct {
            ID string `json:"id"`
        } `json:"data"`
    }
    if err := json.NewDecoder(resp.Body).Decode(&startResp); err != nil {
        return nil, fmt.Errorf("decode start response: %w", err)
    }
    streamID := startResp.Data.ID

    // 2. Connect to SSE stream
    req, err = http.NewRequest("GET",
        fmt.Sprintf("%s/projects/%s/events?stream_id=%s", baseURL, projectID, streamID),
        nil)
    if err != nil {
        return nil, err
    }
    req.Header.Set("X-API-Key", apiKey)

    resp, err = client.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    if resp.StatusCode < 200 || resp.StatusCode > 299 {
        return nil, fmt.Errorf("open event stream: unexpected status %s", resp.Status)
    }

    scanner := bufio.NewScanner(resp.Body)
    // The default token limit is 64 KiB; allow output lines up to 1 MiB so a
    // single long line does not abort the stream.
    scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
    var eventType string

    for scanner.Scan() {
        line := scanner.Text()

        if strings.HasPrefix(line, "event:") {
            eventType = strings.TrimSpace(line[len("event:"):])
        } else if strings.HasPrefix(line, "data:") {
            data := strings.TrimSpace(line[len("data:"):])

            switch eventType {
            case "output":
                var output OutputEvent
                if err := json.Unmarshal([]byte(data), &output); err != nil {
                    return nil, fmt.Errorf("decode output event: %w", err)
                }
                fmt.Printf("[%s] %s\n", output.Stream, output.Line)

            case "complete":
                var complete CompleteEvent
                if err := json.Unmarshal([]byte(data), &complete); err != nil {
                    return nil, fmt.Errorf("decode complete event: %w", err)
                }
                return &complete, nil

            case "error":
                var failure struct {
                    Message string `json:"message"`
                }
                if err := json.Unmarshal([]byte(data), &failure); err != nil {
                    return nil, fmt.Errorf("decode error event: %w", err)
                }
                return nil, fmt.Errorf("command stream error: %s", failure.Message)
            }
        }
    }

    if err := scanner.Err(); err != nil {
        return nil, err
    }
    // Never return (nil, nil): callers dereference the result when err is nil.
    return nil, fmt.Errorf("event stream ended before a complete event was received")
}

// main runs the Go test suite of "my-project" on a local server and prints
// the command's exit code; any error from executeCommand aborts with a panic.
func main() {
    const (
        serverURL = "http://localhost:8080"
        project   = "my-project"
        cmdLine   = "go test ./..."
        key       = "rdev_xxx"
    )

    result, err := executeCommand(serverURL, project, cmdLine, key)
    if err != nil {
        panic(err)
    }

    fmt.Printf("Exit code: %d\n", result.ExitCode)
}

Reconnection Handling

SSE supports automatic reconnection with the Last-Event-ID header.

JavaScript

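The browser's EventSource reconnects automatically after a dropped connection and sends the ID of the last event it received in the Last-Event-ID header, so no extra client code is needed.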

Python

def connect_with_reconnect(url, api_key, last_event_id=None):
    """Open an SSE stream, optionally resuming after a known event.

    Args:
        url: Full URL of the events endpoint.
        api_key: Value sent in the ``X-API-Key`` header.
        last_event_id: ID of the last event already processed. When given,
            it is sent as ``Last-Event-ID``; ``None`` or an empty string
            sends no such header.

    Returns:
        An ``sseclient.SSEClient`` wrapping the streaming response.

    Raises:
        Exception: Whatever ``raise_for_status()`` raises when the server
            answers with an error status.
    """
    headers = {'X-API-Key': api_key}
    # Compare explicitly instead of relying on truthiness so that an ID of 0
    # is still sent; header values must be strings.
    if last_event_id is not None and last_event_id != '':
        headers['Last-Event-ID'] = str(last_event_id)

    response = requests.get(url, headers=headers, stream=True)
    response.raise_for_status()
    return sseclient.SSEClient(response)

Go

req.Header.Set("Last-Event-ID", lastEventID)

Error Handling

Always handle SSE errors gracefully:

// Tell a permanently closed stream apart from a dropped connection that is
// still being retried.
eventSource.onerror = (_event) => {
  const isClosed = eventSource.readyState === EventSource.CLOSED;
  if (isClosed) {
    console.log('Connection closed');
    return;
  }
  console.log('Connection error, reconnecting...');
};