// Package realtime provides WebSocket communication with Redis-backed broadcasting. // // This package enables real-time bidirectional communication with: // - WebSocket connections with automatic ping/pong heartbeat // - Room-based message grouping // - Cross-pod broadcasting via Redis Pub/Sub // - Graceful connection lifecycle management // // Usage: // // // Create a hub (local connection registry) // hub := realtime.NewHub(logger) // go hub.Run(ctx) // // // Optional: Add Redis for multi-pod scaling // redisBroadcaster := realtime.NewRedisBroadcaster(redisClient, hub, logger) // go redisBroadcaster.Run(ctx) // // // Mount the WebSocket handler // handler := realtime.NewHandler(hub, logger, realtime.HandlerConfig{ // Broadcaster: redisBroadcaster, // nil for single-pod // }) // r.Mount("/ws", handler.Routes()) package realtime import ( "context" "encoding/json" "time" ) // Message represents a real-time message sent between clients. type Message struct { // ID is a unique identifier for the message (optional, set by broadcaster). ID string `json:"id,omitempty"` // Type identifies the message kind (e.g., "chat", "notification", "presence"). Type string `json:"type"` // Room is the target room for the message (empty for direct messages). Room string `json:"room,omitempty"` // From is the sender's client ID (set by server). From string `json:"from,omitempty"` // Data contains the message payload. Data json.RawMessage `json:"data,omitempty"` // Timestamp is when the message was created. Timestamp time.Time `json:"timestamp"` } // SystemMessage creates a system-generated message. func SystemMessage(msgType string, data any) (*Message, error) { dataBytes, err := json.Marshal(data) if err != nil { return nil, err } return &Message{ Type: msgType, From: "system", Data: dataBytes, Timestamp: time.Now().UTC(), }, nil } // Connection represents a connected WebSocket client. type Connection interface { // ID returns the unique connection identifier. 
ID() string // UserID returns the authenticated user ID (empty if anonymous). UserID() string // Send queues a message for delivery to this connection. // Returns false if the connection is closed or send buffer is full. Send(msg *Message) bool // Close gracefully closes the connection. Close() } // Hub manages active connections and room memberships. type Hub interface { // Register adds a connection to the hub. Register(conn Connection) // Unregister removes a connection from the hub. Unregister(conn Connection) // JoinRoom adds a connection to a room. JoinRoom(conn Connection, room string) // LeaveRoom removes a connection from a room. LeaveRoom(conn Connection, room string) // Broadcast sends a message to all connections in a room. // If room is empty, broadcasts to all connections. Broadcast(msg *Message) // SendTo sends a message to a specific connection by ID. SendTo(connID string, msg *Message) bool // ConnectionCount returns the total number of active connections. ConnectionCount() int // RoomCount returns the number of connections in a specific room. RoomCount(room string) int } // Broadcaster handles cross-pod message distribution. // Implementations typically use Redis Pub/Sub or similar. type Broadcaster interface { // Publish sends a message to all pods. Publish(ctx context.Context, msg *Message) error // Run starts the subscriber loop (blocks until context is cancelled). Run(ctx context.Context) error } // MessageType constants for common message types. const ( MessageTypeChat = "chat" MessageTypePresence = "presence" MessageTypeNotification = "notification" MessageTypeSystem = "system" MessageTypeError = "error" MessageTypePing = "ping" MessageTypePong = "pong" ) // PresenceStatus constants for presence messages. const ( PresenceOnline = "online" PresenceOffline = "offline" PresenceAway = "away" ) // PresenceData represents presence change data. 
// PresenceData represents presence change data.
type PresenceData struct {
	// Status is the presence state, encoded under the "status" JSON key.
	// NOTE(review): presumably one of the Presence* constants
	// (PresenceOnline, PresenceOffline, PresenceAway) — confirm with callers.
	Status string `json:"status"`

	// UserID is encoded under the "user_id" JSON key.
	UserID string `json:"user_id"`
}

// JoinLeaveData represents room join/leave data.
type JoinLeaveData struct {
	// Room is the room name, encoded under the "room" JSON key.
	Room string `json:"room"`

	// UserID is encoded under the "user_id" JSON key.
	UserID string `json:"user_id"`
}