Event Channels
Critical Rules
- NO WEBSOCKETS. EVER. All real-time communication uses HTTP/2 + SSE.
- User → Server: HTTP/2 POST/PUT/DELETE. Standard REST endpoints.
- Server → User: SSE only. One-way event stream.
- Event flow: POST → Service (enqueue) → Queue → Worker (generate) → Redis pub/sub → Service SSE subscriber → SSE Hub → User
- This applies to EVERYTHING: Chat, notifications, progress updates, generation — all SSE.
- Channel format is non-negotiable: user:<id> or channel:<id>. No exceptions.
- All events are JSON with type as the first field.
- Document every channel in this file before using it.
- Service is thin: Validates, enqueues, returns 202. No AI work in the service.
- Worker does all AI work: Initializes providers, processes jobs, publishes events via Redis.
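The channel-format and type-first rules above can be checked mechanically. The following is a minimal sketch of one way to do that; the helper names (isValidChannel, serializeEvent) and the set of characters allowed in an ID are assumptions for illustration, not part of the documented API.

```typescript
// Assumption: IDs consist of letters, digits, underscores, and hyphens.
const CHANNEL_PATTERN = /^(user|channel):[A-Za-z0-9_-]+$/;

// Returns true only for names of the form user:<id> or channel:<id>.
function isValidChannel(name: string): boolean {
  return CHANNEL_PATTERN.test(name);
}

// Serializes an event so that `type` is always the first JSON key.
// JSON.stringify emits string keys in insertion order, so building
// a new object with `type` first is sufficient.
function serializeEvent(event: { type: string; [key: string]: unknown }): string {
  const { type, ...rest } = event;
  return JSON.stringify({ type, ...rest });
}
```

Rebuilding the object before serializing means callers do not have to remember the key order themselves.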
Architecture
┌─────────────┐    POST /generate/*    ┌─────────────┐   enqueue    ┌───────────────┐
│   Browser   │ ─────────────────────▶ │   Service   │ ───────────▶ │  CRDB Queue   │
│             │    { jobId } (202)     │   (thin)    │              └───────┬───────┘
│             │ ◀───────────────────── │             │                      │ dequeue
│             │                        │             │                      ▼
│             │                        │             │              ┌───────────────┐
│             │                        │             │              │    Worker     │
│             │                        │             │              │   (AI work)   │
│             │                        │             │              └───────┬───────┘
│             │                        │             │                      │ publish (SSE events)
│             │                        │             │                      ▼
│             │       SSE stream       │   SSE Hub   │  subscribe   ┌───────────────┐
│             │ ◀───────────────────── │             │ ◀─────────── │ Redis Pub/Sub │
└─────────────┘                        └─────────────┘              └───────────────┘
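The SSE stream arrow in the diagram carries text/event-stream frames. As a rough illustration of what one frame could look like on the wire, the sketch below formats an event as a single data field followed by a blank line. The helper name toSSEFrame is hypothetical, and whether the hub also sets the optional event or id fields is not stated here, so only data is used.

```typescript
// Hypothetical helper: formats one event as a text/event-stream frame.
// Assumption: the hub sends the whole JSON payload in a single `data:` field.
function toSSEFrame(event: { type: string; [key: string]: unknown }): string {
  const { type, ...rest } = event;
  // JSON.stringify escapes newlines inside strings, so the payload
  // always fits on one `data:` line.
  const json = JSON.stringify({ type, ...rest });
  // A blank line terminates the frame.
  return "data: " + json + "\n\n";
}
```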
Generation flow:
- User sends POST /api/generate/image with a prompt
- Service validates, enqueues job in CRDB, returns 202 {jobId}
- Worker dequeues job, calls AI provider (LaoZhang/Gemini)
- Worker publishes progress/result SSE events to Redis
- Service's SSE subscriber receives events from Redis
- SSE Hub delivers to the user's connected SSE stream
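The last two steps (subscriber receives an event, hub delivers it to the user's stream) amount to routing by channel name. The sketch below is an in-memory stand-in for that routing, not the real hub: the class name InMemoryHub and its connect/deliver methods are assumptions, and Redis and HTTP are left out entirely.

```typescript
type HubEvent = { type: string; timestamp: string; jobId?: string; progress?: number };
type HubListener = (event: HubEvent) => void;

// In-memory stand-in for the SSE Hub: maps channel names to connected streams.
class InMemoryHub {
  private streams: Record<string, HubListener[] | undefined> = {};

  // Registers a connected stream and returns a function that disconnects it.
  connect(channel: string, listener: HubListener): () => void {
    this.streams[channel] = (this.streams[channel] ?? []).concat([listener]);
    return () => {
      this.streams[channel] = (this.streams[channel] ?? []).filter((l) => l !== listener);
    };
  }

  // Called for each message the subscriber receives from pub/sub.
  // Returns the number of streams the event was delivered to.
  deliver(channel: string, event: HubEvent): number {
    const listeners = this.streams[channel] ?? [];
    listeners.forEach((listener) => listener(event));
    return listeners.length;
  }
}
```

Events published for a channel with no connected stream are simply dropped in this sketch; whether the real hub buffers or replays them is not covered here.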
Chat flow:
- User sends POST /api/chat/messages
- Service broadcasts user message to channel:general via SSE Hub (immediate)
- Service enqueues ai_chat_response job
- Worker dequeues, streams AI response tokens via Redis pub/sub
- Service's SSE subscriber delivers ai_chat_chunk events to the channel
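Because the AI response arrives as a series of ai_chat_chunk events, a client has to stitch the chunks back together. The sketch below shows one way to do that, keyed by streamId and using the text and done fields from the chunk payload. The ChunkAssembler class is hypothetical, and it assumes chunks for a given stream arrive in order.

```typescript
// Shape of the `result` payload carried by ai_chat_chunk events.
interface ChatChunk {
  streamId: string;
  text: string;
  done: boolean;
  provider?: string;
}

// Hypothetical helper that accumulates chunk text per stream.
class ChunkAssembler {
  private buffers: Record<string, string | undefined> = {};

  // Appends a chunk. Returns the full text once the stream is done, else null.
  push(chunk: ChatChunk): string | null {
    const text = (this.buffers[chunk.streamId] ?? "") + chunk.text;
    if (chunk.done) {
      delete this.buffers[chunk.streamId];
      return text;
    }
    this.buffers[chunk.streamId] = text;
    return null;
  }

  // Text received so far for a stream that is still in progress.
  partial(streamId: string): string {
    return this.buffers[streamId] ?? "";
  }
}
```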
Channel Types
| Pattern | Use For | Example |
| --- | --- | --- |
| user:<id> | Private events for one user | user:u_abc123 |
| channel:<id> | Shared events for a room/topic | channel:general |
Event Structure
Every event MUST follow this structure:
interface Event {
  type: string;        // REQUIRED: event type identifier
  timestamp: string;   // REQUIRED: ISO 8601
  jobId?: string;      // Job correlation ID
  progress?: number;   // 0-100 percentage
  message?: string;    // Human-readable status
  result?: any;        // Payload (type-specific)
  error?: string;      // Error message
}
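Since events arrive as untyped JSON, a runtime check against this structure can catch malformed payloads early. The guard below is a sketch only: the names ChannelEvent and isChannelEvent are assumptions (ChannelEvent mirrors the interface above under a name that does not clash with the DOM Event type), and the timestamp check relies on Date.parse, which is more lenient than strict ISO 8601.

```typescript
interface ChannelEvent {
  type: string;
  timestamp: string;
  jobId?: string;
  progress?: number;
  message?: string;
  result?: unknown;
  error?: string;
}

// Checks the required fields and the documented 0-100 range for progress.
function isChannelEvent(value: unknown): value is ChannelEvent {
  if (typeof value !== "object" || value === null) return false;
  const fields = value as Record<string, unknown>;
  const type = fields["type"];
  const timestamp = fields["timestamp"];
  const progress = fields["progress"];
  if (typeof type !== "string" || type.length === 0) return false;
  // Date.parse accepts more than ISO 8601; tighten this if strictness matters.
  if (typeof timestamp !== "string" || isNaN(Date.parse(timestamp))) return false;
  if (progress !== undefined) {
    if (typeof progress !== "number" || progress < 0 || progress > 100) return false;
  }
  return true;
}
```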
Standard Event Types
User Channel Events
Media Generation Events
| Event | Trigger | Payload |
| --- | --- | --- |
| generation_started | Worker picks up job | { jobId, message } |
| generation_progress | Progress update | { jobId, progress, message } |
| generation_complete | Generation done | { jobId, progress: 100, result } |
| generation_failed | Error occurred | { jobId, error } |
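On the client, the four generation events map naturally onto a small state machine. The reducer below is a sketch of that mapping; the state shape, the status names, and the function name reduceGeneration are assumptions and are not taken from the useMediaGeneration hook.

```typescript
type GenStatus = "idle" | "started" | "in_progress" | "complete" | "failed";

interface GenState {
  status: GenStatus;
  progress: number;
  result?: unknown;
  error?: string;
}

interface GenEvent {
  type: string;
  jobId?: string;
  progress?: number;
  result?: unknown;
  error?: string;
}

// Folds one generation_* event into the current state; other events are ignored.
function reduceGeneration(state: GenState, event: GenEvent): GenState {
  switch (event.type) {
    case "generation_started":
      return { status: "started", progress: 0 };
    case "generation_progress":
      return { ...state, status: "in_progress", progress: event.progress ?? state.progress };
    case "generation_complete":
      return { status: "complete", progress: 100, result: event.result };
    case "generation_failed":
      return { ...state, status: "failed", error: event.error };
    default:
      return state;
  }
}
```

A failed job keeps its last known progress in this sketch, so the UI can show how far the job got before the error.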
Text Generation Events (streaming)
| Event | Trigger | Payload |
| --- | --- | --- |
| ai_chat_chunk | Token generated | { jobId, result: { streamId, text, done, provider? } } |
Media Upload Events
| Event | Trigger | Payload |
| --- | --- | --- |
| upload_started | Upload job begins | { jobId } |
| upload_progress | Chunk uploaded | { jobId, progress, bytesUploaded } |
| upload_complete | Processing done | { jobId, result: { original, optimized, thumbnail } } |
| upload_failed | Error occurred | { jobId, error } |
Room Channel Events
| Event | Trigger | Payload |
| --- | --- | --- |
| chat | User sends message | { result: { id, content, userId, userName, timestamp } } |
| ai_chat_chunk | AI streaming chunk | { result: { streamId, text, done, provider? } } |
| ai_chat | AI response complete | { result: { id, content, provider, timestamp } } |
| presence | User joins/leaves | { status, userId, userName } |
| typing | User typing indicator | { userId, isTyping } |
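As a small example of consuming a room event, the sketch below keeps a list of users who are currently typing, driven by the typing payload from the table. The function name applyTyping and the choice of a plain array for state are assumptions.

```typescript
// Payload of the `typing` room event.
interface TypingPayload {
  userId: string;
  isTyping: boolean;
}

// Returns a new list of typing users after applying one typing event.
// A user appears at most once; a repeated isTyping=true moves them to the end.
function applyTyping(typingUsers: readonly string[], event: TypingPayload): string[] {
  const others = typingUsers.filter((id) => id !== event.userId);
  return event.isTyping ? others.concat([event.userId]) : others;
}
```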
Implementation Pattern
Backend (Go) — Service Handler
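// Enqueue generation job (service is thin: no AI work)
jobID, err := h.queue.Enqueue(r.Context(), "generate_image", map[string]any{
	"prompt": req.Prompt,
	"userID": userID,
})
if err != nil {
	// Report the failure instead of returning a job ID that was never queued.
	// http.Error is from net/http; substitute the project's own error helper if one exists.
	http.Error(w, "failed to enqueue job", http.StatusInternalServerError)
	return
}
httpresponse.Accepted(w, r, GenerateAccepted{JobID: jobID})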
Backend (Go) — Worker Job Handler
// Worker publishes events via Redis SSE publisher
pub.SendToUser(userID, &realtime.SSEEvent{
	Type:     realtime.EventGenerationComplete,
	JobID:    jobID,
	Progress: 100,
	Result:   result,
})
Frontend (TypeScript)
// Subscribe to user channel for generation events
const { status, progress, result } = useMediaGeneration<ImageResult>({
  endpoint: '/api/generate/image',
  userId: currentUser.id,
});

// Subscribe to room channel for chat events
const { messages, aiMessages, streamingMessages } = useChat({
  endpoint: '/api/chat/messages',
  channel: 'channel:general',
  userId: currentUser.id,
});
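The internals of these hooks are not shown here. Presumably each incoming SSE message is parsed as JSON and routed by its type field; the sketch below illustrates that step in isolation. The function name dispatchEventData and the handler map are assumptions, and the data argument stands for the string an EventSource message event would carry.

```typescript
type RoutedEvent = { type: string; [key: string]: unknown };
type RoutedHandler = (event: RoutedEvent) => void;

// Parses the data string of one SSE message and calls the handler
// registered for its `type`. Returns false if nothing was dispatched.
function dispatchEventData(data: string, handlers: Record<string, RoutedHandler>): boolean {
  let parsed: unknown;
  try {
    parsed = JSON.parse(data);
  } catch (_err) {
    return false; // payload was not valid JSON
  }
  if (typeof parsed !== "object" || parsed === null) return false;
  const type = (parsed as Record<string, unknown>)["type"];
  if (typeof type !== "string") return false;
  // Own-property check so inherited keys such as "toString" never match.
  if (!Object.prototype.hasOwnProperty.call(handlers, type)) return false;
  handlers[type](parsed as RoutedEvent);
  return true;
}
```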
Active Channels
| Channel | Events | Purpose |
| --- | --- | --- |
| user:<userId> | generation_*, ai_chat_chunk | Async job results, text streaming |
| channel:<room> | chat, ai_chat_chunk, presence | Real-time chat |
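The table above can double as a machine-readable registry, so that publishing an undocumented event fails fast. The sketch below transcribes the two rows and checks an event type against them, treating a trailing * as a prefix wildcard. The names ACTIVE_CHANNELS, matchesEventPattern, and isEventDocumented are hypothetical.

```typescript
interface ChannelSpec {
  prefix: string;    // channel name prefix, e.g. "user:"
  events: string[];  // documented event types; a trailing * matches any suffix
}

// Transcribed from the Active Channels table.
const ACTIVE_CHANNELS: ChannelSpec[] = [
  { prefix: "user:", events: ["generation_*", "ai_chat_chunk"] },
  { prefix: "channel:", events: ["chat", "ai_chat_chunk", "presence"] },
];

// Only a trailing wildcard is supported in this sketch.
function matchesEventPattern(pattern: string, eventType: string): boolean {
  const star = pattern.indexOf("*");
  if (star === -1) return pattern === eventType;
  return eventType.indexOf(pattern.slice(0, star)) === 0;
}

// True if the event type is documented for the channel's kind.
function isEventDocumented(channel: string, eventType: string): boolean {
  return ACTIVE_CHANNELS.some(
    (spec) =>
      channel.indexOf(spec.prefix) === 0 &&
      spec.events.some((pattern) => matchesEventPattern(pattern, eventType)),
  );
}
```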