165 lines
7.0 KiB
Markdown
165 lines
7.0 KiB
Markdown
# Event Channels
|
|
|
|
## Critical Rules
|
|
|
|
- **NO WEBSOCKETS. EVER.** All real-time communication uses HTTP2 + SSE.
|
|
- **User → Server:** HTTP2 POST/PUT/DELETE. Standard REST endpoints.
|
|
- **Server → User:** SSE only. One-way event stream.
|
|
- **Event flow:** `POST → Service (enqueue) → Queue → Worker (generate) → Redis pub/sub → Service SSE subscriber → SSE Hub → User`
|
|
- **This applies to EVERYTHING:** Chat, notifications, progress updates, generation — all SSE.
|
|
- **Channel format is non-negotiable:** `user:<id>` or `channel:<id>`. No exceptions.
|
|
- **All events are JSON** with `type` as the first field.
|
|
- **Document every channel** in this file before using it.
|
|
- **Service is thin:** Validates, enqueues, returns 202. No AI work in the service.
|
|
- **Worker does all AI work:** Initializes providers, processes jobs, publishes events via Redis.
|
|
|
|
## Architecture
|
|
|
|
```
|
|
┌─────────────┐ POST /generate/* ┌─────────────┐ enqueue ┌─────────────┐
|
|
│ Browser │ ──────────────────────▶│ Service │ ────────────▶│ CRDB │
|
|
│ │ { jobId } (202) │ (thin) │ │ Queue │
|
|
│ │ ◀──────────────────────│ │ └──────┬──────┘
|
|
│ │ │ │ │
|
|
│ │ │ │ dequeue│
|
|
│ │ │ │ ▼
|
|
│ │ │ │ ┌──────────────┐
|
|
│ │ │ │ │ Worker │
|
|
│ │ │ │ │ (AI work) │
|
|
│ │ │ │ └──────┬───────┘
|
|
│ │ │ │ │
|
|
│ │ │ │ publish│(SSE events)
|
|
│ │ │ │ ▼
|
|
│ │ │ │ ┌──────────────┐
|
|
│ │ SSE stream │ SSE Hub │ subscribe │ Redis │
|
|
│ │ ◀──────────────────────│ ◀──────────│──────────────│ Pub/Sub │
|
|
└─────────────┘ └─────────────┘ └──────────────┘
|
|
```
|
|
|
|
**Generation flow:**
|
|
1. User sends `POST /api/generate/image` with prompt
|
|
2. Service validates, enqueues job in CRDB, returns `202 {jobId}`
|
|
3. Worker dequeues job, calls AI provider (LaoZhang/Gemini)
|
|
4. Worker publishes progress/result SSE events to Redis
|
|
5. Service's SSE subscriber receives events from Redis
|
|
6. SSE Hub delivers to the user's connected SSE stream
|
|
|
|
**Chat flow:**
|
|
1. User sends `POST /api/chat/messages`
|
|
2. Service broadcasts user message to `channel:general` via SSE Hub (immediate)
|
|
3. Service enqueues `ai_chat_response` job
|
|
4. Worker dequeues, streams AI response tokens via Redis pub/sub
|
|
5. Service's SSE subscriber delivers `ai_chat_chunk` events to channel
|
|
|
|
## Channel Types
|
|
|
|
| Pattern | Use For | Example |
|
|
|---------|---------|---------|
|
|
| `user:<id>` | Private events for one user | `user:u_abc123` |
|
|
| `channel:<id>` | Shared events for a room/topic | `channel:general` |
|
|
|
|
## Event Structure
|
|
|
|
Every event MUST follow this structure:
|
|
|
|
```typescript
|
|
interface Event {
|
|
type: string; // REQUIRED: event type identifier
|
|
timestamp: string; // REQUIRED: ISO 8601
|
|
jobId?: string; // Job correlation ID
|
|
progress?: number; // 0-100 percentage
|
|
message?: string; // Human-readable status
|
|
result?: any; // Payload (type-specific)
|
|
error?: string; // Error message
|
|
}
|
|
```
|
|
|
|
## Standard Event Types
|
|
|
|
### User Channel Events
|
|
|
|
#### Media Generation Events
|
|
|
|
| Event | Trigger | Payload |
|
|
|-------|---------|---------|
|
|
| `generation_started` | Worker picks up job | `{ jobId, message }` |
|
|
| `generation_progress` | Progress update | `{ jobId, progress, message }` |
|
|
| `generation_complete` | Generation done | `{ jobId, progress: 100, result }` |
|
|
| `generation_failed` | Error occurred | `{ jobId, error }` |
|
|
|
|
#### Text Generation Events (streaming)
|
|
|
|
| Event | Trigger | Payload |
|
|
|-------|---------|---------|
|
|
| `ai_chat_chunk` | Token generated | `{ jobId, result: { streamId, text, done, provider? } }` |
|
|
|
|
#### Media Upload Events
|
|
|
|
| Event | Trigger | Payload |
|
|
|-------|---------|---------|
|
|
| `upload_started` | Upload job begins | `{ jobId }` |
|
|
| `upload_progress` | Chunk uploaded | `{ jobId, progress, bytesUploaded }` |
|
|
| `upload_complete` | Processing done | `{ jobId, result: { original, optimized, thumbnail } }` |
|
|
| `upload_failed` | Error occurred | `{ jobId, error }` |
|
|
|
|
### Room Channel Events
|
|
|
|
| Event | Trigger | Payload |
|
|
|-------|---------|---------|
|
|
| `chat` | User sends message | `{ result: { id, content, userId, userName, timestamp } }` |
|
|
| `ai_chat_chunk` | AI streaming chunk | `{ result: { streamId, text, done, provider? } }` |
|
|
| `ai_chat` | AI response complete | `{ result: { id, content, provider, timestamp } }` |
|
|
| `presence` | User joins/leaves | `{ status, userId, userName }` |
|
|
| `typing` | User typing indicator | `{ userId, isTyping }` |
|
|
|
|
## Implementation Pattern
|
|
|
|
### Backend (Go) — Service Handler
|
|
|
|
```go
|
|
// Enqueue generation job (service is thin — no AI work)
|
|
jobID, err := h.queue.Enqueue(r.Context(), "generate_image", map[string]any{
|
|
"prompt": req.Prompt,
|
|
"userID": userID,
|
|
})
|
|
httpresponse.Accepted(w, r, GenerateAccepted{JobID: jobID})
|
|
```
|
|
|
|
### Backend (Go) — Worker Job Handler
|
|
|
|
```go
|
|
// Worker publishes events via Redis SSE publisher
|
|
pub.SendToUser(userID, &realtime.SSEEvent{
|
|
Type: realtime.EventGenerationComplete,
|
|
JobID: jobID,
|
|
Progress: 100,
|
|
Result: result,
|
|
})
|
|
```
|
|
|
|
### Frontend (TypeScript)
|
|
|
|
```typescript
|
|
// Subscribe to user channel for generation events
|
|
const { status, progress, result } = useMediaGeneration<ImageResult>({
|
|
endpoint: '/api/generate/image',
|
|
userId: currentUser.id,
|
|
});
|
|
|
|
// Subscribe to room channel for chat events
|
|
const { messages, aiMessages, streamingMessages } = useChat({
|
|
endpoint: '/api/chat/messages',
|
|
channel: 'channel:general',
|
|
userId: currentUser.id,
|
|
});
|
|
```
|
|
|
|
## Active Channels
|
|
|
|
<!-- Document all channels used in this project below -->
|
|
|
|
| Channel | Events | Purpose |
|
|
|---------|--------|---------|
|
|
| `user:<userId>` | `generation_*`, `ai_chat_chunk` | Async job results, text streaming |
|
|
| `channel:<room>` | `chat`, `ai_chat_chunk`, `presence` | Real-time chat |
|