150 lines
4.2 KiB
Go
150 lines
4.2 KiB
Go
// Package realtime provides WebSocket communication with Redis-backed broadcasting.
|
|
//
|
|
// This package enables real-time bidirectional communication with:
|
|
// - WebSocket connections with automatic ping/pong heartbeat
|
|
// - Room-based message grouping
|
|
// - Cross-pod broadcasting via Redis Pub/Sub
|
|
// - Graceful connection lifecycle management
|
|
//
|
|
// Usage:
|
|
//
|
|
// // Create a hub (local connection registry)
|
|
// hub := realtime.NewHub(logger)
|
|
// go hub.Run(ctx)
|
|
//
|
|
// // Optional: Add Redis for multi-pod scaling
|
|
// redisBroadcaster := realtime.NewRedisBroadcaster(redisClient, hub, logger)
|
|
// go redisBroadcaster.Run(ctx)
|
|
//
|
|
// // Mount the WebSocket handler
|
|
// handler := realtime.NewHandler(hub, logger, realtime.HandlerConfig{
|
|
// Broadcaster: redisBroadcaster, // nil for single-pod
|
|
// })
|
|
// r.Mount("/ws", handler.Routes())
|
|
package realtime
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"time"
|
|
)
|
|
|
|
// Message represents a real-time message sent between clients.
|
|
type Message struct {
|
|
// ID is a unique identifier for the message (optional, set by broadcaster).
|
|
ID string `json:"id,omitempty"`
|
|
|
|
// Type identifies the message kind (e.g., "chat", "notification", "presence").
|
|
Type string `json:"type"`
|
|
|
|
// Room is the target room for the message (empty for direct messages).
|
|
Room string `json:"room,omitempty"`
|
|
|
|
// From is the sender's client ID (set by server).
|
|
From string `json:"from,omitempty"`
|
|
|
|
// Data contains the message payload.
|
|
Data json.RawMessage `json:"data,omitempty"`
|
|
|
|
// Timestamp is when the message was created.
|
|
Timestamp time.Time `json:"timestamp"`
|
|
}
|
|
|
|
// SystemMessage creates a system-generated message.
|
|
func SystemMessage(msgType string, data any) (*Message, error) {
|
|
dataBytes, err := json.Marshal(data)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &Message{
|
|
Type: msgType,
|
|
From: "system",
|
|
Data: dataBytes,
|
|
Timestamp: time.Now().UTC(),
|
|
}, nil
|
|
}
|
|
|
|
// Connection represents a connected WebSocket client.
|
|
type Connection interface {
|
|
// ID returns the unique connection identifier.
|
|
ID() string
|
|
|
|
// UserID returns the authenticated user ID (empty if anonymous).
|
|
UserID() string
|
|
|
|
// Send queues a message for delivery to this connection.
|
|
// Returns false if the connection is closed or send buffer is full.
|
|
Send(msg *Message) bool
|
|
|
|
// Close gracefully closes the connection.
|
|
Close()
|
|
}
|
|
|
|
// Hub manages active connections and room memberships.
|
|
type Hub interface {
|
|
// Register adds a connection to the hub.
|
|
Register(conn Connection)
|
|
|
|
// Unregister removes a connection from the hub.
|
|
Unregister(conn Connection)
|
|
|
|
// JoinRoom adds a connection to a room.
|
|
JoinRoom(conn Connection, room string)
|
|
|
|
// LeaveRoom removes a connection from a room.
|
|
LeaveRoom(conn Connection, room string)
|
|
|
|
// Broadcast sends a message to all connections in a room.
|
|
// If room is empty, broadcasts to all connections.
|
|
Broadcast(msg *Message)
|
|
|
|
// SendTo sends a message to a specific connection by ID.
|
|
SendTo(connID string, msg *Message) bool
|
|
|
|
// ConnectionCount returns the total number of active connections.
|
|
ConnectionCount() int
|
|
|
|
// RoomCount returns the number of connections in a specific room.
|
|
RoomCount(room string) int
|
|
}
|
|
|
|
// Broadcaster handles cross-pod message distribution.
|
|
// Implementations typically use Redis Pub/Sub or similar.
|
|
type Broadcaster interface {
|
|
// Publish sends a message to all pods.
|
|
Publish(ctx context.Context, msg *Message) error
|
|
|
|
// Run starts the subscriber loop (blocks until context is cancelled).
|
|
Run(ctx context.Context) error
|
|
}
|
|
|
|
// MessageType constants for common message types.
|
|
const (
|
|
MessageTypeChat = "chat"
|
|
MessageTypePresence = "presence"
|
|
MessageTypeNotification = "notification"
|
|
MessageTypeSystem = "system"
|
|
MessageTypeError = "error"
|
|
MessageTypePing = "ping"
|
|
MessageTypePong = "pong"
|
|
)
|
|
|
|
// PresenceStatus constants for presence messages.
|
|
const (
|
|
PresenceOnline = "online"
|
|
PresenceOffline = "offline"
|
|
PresenceAway = "away"
|
|
)
|
|
|
|
// PresenceData represents presence change data.
|
|
type PresenceData struct {
|
|
Status string `json:"status"`
|
|
UserID string `json:"user_id"`
|
|
}
|
|
|
|
// JoinLeaveData represents room join/leave data.
|
|
type JoinLeaveData struct {
|
|
Room string `json:"room"`
|
|
UserID string `json:"user_id"`
|
|
}
|