// Package realtime provides SSE (Server-Sent Events) with Redis-backed broadcasting. // // IMPORTANT: NO WEBSOCKETS. All real-time communication uses HTTP2 + SSE. // - User → Server: HTTP2 POST/PUT/DELETE (standard REST) // - Server → User: SSE (one-way event stream) // // Event flow: server → redis → redis listeners → SSE hub → user // // This pattern applies to ALL real-time features: chat, notifications, progress, presence. // // Usage: // // // Create SSE hub // sseHub := realtime.NewSSEHub(logger) // // // Optional: Add Redis for multi-pod scaling // redisBroadcaster := realtime.NewRedisBroadcaster(redisClient, sseHub, logger) // go redisBroadcaster.Run(ctx) // // // Mount SSE handler // sseHandler := realtime.NewSSEHandler(sseHub, logger) // r.Mount("/api/events", sseHandler.Routes()) // // // In your API handlers, publish events: // sseHub.SendToUser(userID, &realtime.SSEEvent{ // Type: "generation_complete", // JobID: jobID, // Result: result, // }) package realtime import ( "context" "encoding/json" "time" ) // Message represents a real-time message sent between clients. type Message struct { // ID is a unique identifier for the message (optional, set by broadcaster). ID string `json:"id,omitempty"` // Type identifies the message kind (e.g., "chat", "notification", "presence"). Type string `json:"type"` // Room is the target room for the message (empty for direct messages). Room string `json:"room,omitempty"` // From is the sender's client ID (set by server). From string `json:"from,omitempty"` // Data contains the message payload. Data json.RawMessage `json:"data,omitempty"` // Timestamp is when the message was created. Timestamp time.Time `json:"timestamp"` } // SystemMessage creates a system-generated message. func SystemMessage(msgType string, data any) (*Message, error) { dataBytes, err := json.Marshal(data) if err != nil { return nil, err } return &Message{ Type: msgType, From: "system", Data: dataBytes, Timestamp: time.Now().UTC(), }, nil } // Connection represents a connected WebSocket client. type Connection interface { // ID returns the unique connection identifier. ID() string // UserID returns the authenticated user ID (empty if anonymous). UserID() string // UserName returns the display name for the user. UserName() string // Send queues a message for delivery to this connection. // Returns false if the connection is closed or send buffer is full. Send(msg *Message) bool // Close gracefully closes the connection. Close() } // Hub manages active connections and room memberships. type Hub interface { // Register adds a connection to the hub. Register(conn Connection) // Unregister removes a connection from the hub. Unregister(conn Connection) // JoinRoom adds a connection to a room. JoinRoom(conn Connection, room string) // LeaveRoom removes a connection from a room. LeaveRoom(conn Connection, room string) // Broadcast sends a message to all connections in a room. // If room is empty, broadcasts to all connections. Broadcast(msg *Message) // SendTo sends a message to a specific connection by ID. SendTo(connID string, msg *Message) bool // ConnectionCount returns the total number of active connections. ConnectionCount() int // RoomCount returns the number of connections in a specific room. RoomCount(room string) int } // Broadcaster handles cross-pod message distribution. // Implementations typically use Redis Pub/Sub or similar. type Broadcaster interface { // Publish sends a message to all pods. Publish(ctx context.Context, msg *Message) error // Run starts the subscriber loop (blocks until context is cancelled). Run(ctx context.Context) error } // MessageType constants for common message types. const ( MessageTypeChat = "chat" MessageTypeAIChat = "ai_chat" MessageTypeAIChatChunk = "ai_chat_chunk" MessageTypePresence = "presence" MessageTypeNotification = "notification" MessageTypeSystem = "system" MessageTypeError = "error" MessageTypePing = "ping" MessageTypePong = "pong" ) // PresenceStatus constants for presence messages. const ( PresenceOnline = "online" PresenceOffline = "offline" PresenceAway = "away" ) // PresenceData represents presence change data. type PresenceData struct { Status string `json:"status"` UserID string `json:"userId"` UserName string `json:"userName,omitempty"` } // JoinLeaveData represents room join/leave data. type JoinLeaveData struct { Room string `json:"room"` UserID string `json:"userId"` } // ChatData represents user chat message data. type ChatData struct { Content string `json:"content"` UserID string `json:"userId"` UserName string `json:"userName,omitempty"` } // AIResponseData represents AI-generated response data. type AIResponseData struct { Content string `json:"content"` Provider string `json:"provider"` } // AIChunkData represents a streaming chunk of AI response. type AIChunkData struct { // StreamID identifies this stream (for correlating chunks). StreamID string `json:"streamId"` // Text is the chunk content. Text string `json:"text"` // Done indicates this is the final chunk. Done bool `json:"done"` // Provider name (only set on final chunk). Provider string `json:"provider,omitempty"` } // --------------------------------------------------------------------------- // Generation/Upload Events (sent via SSE, not WebSocket) // --------------------------------------------------------------------------- // GenerationEvent types for async job progress. const ( EventGenerationStarted = "generation_started" EventGenerationProgress = "generation_progress" EventGenerationComplete = "generation_complete" EventGenerationFailed = "generation_failed" EventUploadStarted = "upload_started" EventUploadProgress = "upload_progress" EventUploadComplete = "upload_complete" EventUploadFailed = "upload_failed" ) // GenerationResult is the result payload for generation_complete events. type GenerationResult struct { URL string `json:"url"` Provider string `json:"provider"` LatencyMs int64 `json:"latencyMs"` } // UploadResult is the result payload for upload_complete events. type UploadResult struct { ID string `json:"id"` Original string `json:"original"` Optimized string `json:"optimized"` Thumbnail string `json:"thumbnail"` }