218 lines
6.3 KiB
Go
218 lines
6.3 KiB
Go
// Package realtime provides SSE (Server-Sent Events) with Redis-backed broadcasting.
//
// IMPORTANT: NO WEBSOCKETS. All real-time communication uses HTTP/2 + SSE.
//   - User → Server: HTTP/2 POST/PUT/DELETE (standard REST)
//   - Server → User: SSE (one-way event stream)
//
// Event flow: server → redis → redis listeners → SSE hub → user
//
// This pattern applies to ALL real-time features: chat, notifications, progress, presence.
//
// Usage:
//
//	// Create SSE hub
//	sseHub := realtime.NewSSEHub(logger)
//
//	// Optional: Add Redis for multi-pod scaling
//	redisBroadcaster := realtime.NewRedisBroadcaster(redisClient, sseHub, logger)
//	go redisBroadcaster.Run(ctx)
//
//	// Mount SSE handler
//	sseHandler := realtime.NewSSEHandler(sseHub, logger)
//	r.Mount("/api/events", sseHandler.Routes())
//
//	// In your API handlers, publish events:
//	sseHub.SendToUser(userID, &realtime.SSEEvent{
//		Type:   "generation_complete",
//		JobID:  jobID,
//		Result: result,
//	})
package realtime
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"time"
|
|
)
|
|
|
|
// Message is the envelope for a single real-time message.
//
// It is encoded to JSON using the field tags below. ID, Room, From and Data
// are left out of the encoded form when they are empty; Type and Timestamp
// are always present.
type Message struct {
	// ID uniquely identifies the message. It is optional and is filled in by
	// the broadcaster.
	ID string `json:"id,omitempty"`

	// Type names the kind of message, e.g. "chat", "notification" or
	// "presence".
	Type string `json:"type"`

	// Room is the room the message targets; it is empty for direct messages.
	Room string `json:"room,omitempty"`

	// From holds the sender's client ID. The server sets it.
	From string `json:"from,omitempty"`

	// Data carries the message payload as raw, already-encoded JSON.
	Data json.RawMessage `json:"data,omitempty"`

	// Timestamp records when the message was created.
	Timestamp time.Time `json:"timestamp"`
}
|
|
|
|
// SystemMessage creates a system-generated message.
|
|
func SystemMessage(msgType string, data any) (*Message, error) {
|
|
dataBytes, err := json.Marshal(data)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &Message{
|
|
Type: msgType,
|
|
From: "system",
|
|
Data: dataBytes,
|
|
Timestamp: time.Now().UTC(),
|
|
}, nil
|
|
}
|
|
|
|
// Connection represents a connected WebSocket client.
|
|
type Connection interface {
|
|
// ID returns the unique connection identifier.
|
|
ID() string
|
|
|
|
// UserID returns the authenticated user ID (empty if anonymous).
|
|
UserID() string
|
|
|
|
// UserName returns the display name for the user.
|
|
UserName() string
|
|
|
|
// Send queues a message for delivery to this connection.
|
|
// Returns false if the connection is closed or send buffer is full.
|
|
Send(msg *Message) bool
|
|
|
|
// Close gracefully closes the connection.
|
|
Close()
|
|
}
|
|
|
|
// Hub manages active connections and room memberships.
|
|
type Hub interface {
|
|
// Register adds a connection to the hub.
|
|
Register(conn Connection)
|
|
|
|
// Unregister removes a connection from the hub.
|
|
Unregister(conn Connection)
|
|
|
|
// JoinRoom adds a connection to a room.
|
|
JoinRoom(conn Connection, room string)
|
|
|
|
// LeaveRoom removes a connection from a room.
|
|
LeaveRoom(conn Connection, room string)
|
|
|
|
// Broadcast sends a message to all connections in a room.
|
|
// If room is empty, broadcasts to all connections.
|
|
Broadcast(msg *Message)
|
|
|
|
// SendTo sends a message to a specific connection by ID.
|
|
SendTo(connID string, msg *Message) bool
|
|
|
|
// ConnectionCount returns the total number of active connections.
|
|
ConnectionCount() int
|
|
|
|
// RoomCount returns the number of connections in a specific room.
|
|
RoomCount(room string) int
|
|
}
|
|
|
|
// Broadcaster handles cross-pod message distribution.
|
|
// Implementations typically use Redis Pub/Sub or similar.
|
|
type Broadcaster interface {
|
|
// Publish sends a message to all pods.
|
|
Publish(ctx context.Context, msg *Message) error
|
|
|
|
// Run starts the subscriber loop (blocks until context is cancelled).
|
|
Run(ctx context.Context) error
|
|
}
|
|
|
|
// Well-known message type names. These are untyped string constants; the
// Message.Type field documents "chat", "notification" and "presence" as
// example values.
const (
	MessageTypeChat         = "chat"
	MessageTypeAIChat       = "ai_chat"
	MessageTypeAIChatChunk  = "ai_chat_chunk"
	MessageTypePresence     = "presence"
	MessageTypeNotification = "notification"
	MessageTypeSystem       = "system"
	MessageTypeError        = "error"
	MessageTypePing         = "ping"
	MessageTypePong         = "pong"
)
|
|
|
|
// Presence status values carried in presence messages.
const (
	PresenceOnline  = "online"
	PresenceOffline = "offline"
	PresenceAway    = "away"
)
|
|
|
|
// PresenceData is the payload describing a change in a user's presence.
type PresenceData struct {
	// Status is encoded as "status". Presumably one of the Presence*
	// constants — confirm against the producers.
	Status string `json:"status"`

	// UserID is encoded as "userId".
	UserID string `json:"userId"`

	// UserName is encoded as "userName" and omitted when empty.
	UserName string `json:"userName,omitempty"`
}
|
|
|
|
// JoinLeaveData is the payload for a room join or leave.
type JoinLeaveData struct {
	// Room is encoded as "room".
	Room string `json:"room"`

	// UserID is encoded as "userId".
	UserID string `json:"userId"`
}
|
|
|
|
// ChatData is the payload of a chat message written by a user.
type ChatData struct {
	// Content is encoded as "content".
	Content string `json:"content"`

	// UserID is encoded as "userId".
	UserID string `json:"userId"`

	// UserName is encoded as "userName" and omitted when empty.
	UserName string `json:"userName,omitempty"`
}
|
|
|
|
// AIResponseData is the payload of a response generated by an AI provider.
type AIResponseData struct {
	// Content is encoded as "content".
	Content string `json:"content"`

	// Provider is encoded as "provider".
	Provider string `json:"provider"`
}
|
|
|
|
// AIChunkData is one chunk of a streamed AI response.
type AIChunkData struct {
	// StreamID names the stream this chunk belongs to, so that chunks can be
	// correlated with one another.
	StreamID string `json:"streamId"`

	// Text holds the content of this chunk.
	Text string `json:"text"`

	// Done marks the final chunk of the stream.
	Done bool `json:"done"`

	// Provider is the provider name. It is only set on the final chunk and is
	// omitted from the encoded form when empty.
	Provider string `json:"provider,omitempty"`
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Generation/Upload Events (sent via SSE, not WebSocket)
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// Event type names for asynchronous generation and upload jobs: each job kind
// has started, progress, complete and failed events.
const (
	EventGenerationStarted  = "generation_started"
	EventGenerationProgress = "generation_progress"
	EventGenerationComplete = "generation_complete"
	EventGenerationFailed   = "generation_failed"
	EventUploadStarted      = "upload_started"
	EventUploadProgress     = "upload_progress"
	EventUploadComplete     = "upload_complete"
	EventUploadFailed       = "upload_failed"
)
|
|
|
|
// GenerationResult is the result payload attached to generation_complete
// events.
type GenerationResult struct {
	// URL is encoded as "url".
	URL string `json:"url"`

	// Provider is encoded as "provider".
	Provider string `json:"provider"`

	// LatencyMs is encoded as "latencyMs". Judging by the name it is a
	// latency in milliseconds — confirm against the producer.
	LatencyMs int64 `json:"latencyMs"`
}
|
|
|
|
// UploadResult is the result payload attached to upload_complete events.
//
// NOTE(review): Original, Optimized and Thumbnail presumably locate the three
// stored variants of the upload (URLs or paths) — confirm against the
// producer.
type UploadResult struct {
	// ID is encoded as "id".
	ID string `json:"id"`

	// Original is encoded as "original".
	Original string `json:"original"`

	// Optimized is encoded as "optimized".
	Optimized string `json:"optimized"`

	// Thumbnail is encoded as "thumbnail".
	Thumbnail string `json:"thumbnail"`
}
|