238 lines
5.3 KiB
Go
238 lines
5.3 KiB
Go
package realtime
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
|
|
"git.threesix.ai/jordan/slate-verify-1770510662/pkg/logging"
|
|
)
|
|
|
|
// LocalHub is an in-memory implementation of Hub.
|
|
// It manages connections and rooms on a single pod.
|
|
// For multi-pod deployments, pair with RedisBroadcaster.
|
|
type LocalHub struct {
|
|
logger *logging.Logger
|
|
|
|
// connections maps client ID to connection
|
|
connections map[string]Connection
|
|
|
|
// rooms maps room name to set of client IDs
|
|
rooms map[string]map[string]struct{}
|
|
|
|
// clientRooms maps client ID to set of rooms (for cleanup on disconnect)
|
|
clientRooms map[string]map[string]struct{}
|
|
|
|
mu sync.RWMutex
|
|
|
|
// Channels for goroutine-safe operations
|
|
registerCh chan Connection
|
|
unregisterCh chan Connection
|
|
broadcastCh chan *Message
|
|
}
|
|
|
|
// Ensure LocalHub implements Hub at compile time.
|
|
// The blank identifier discards the value; converting a nil *LocalHub to Hub
// is enough to make the build fail if LocalHub stops satisfying the interface.
var _ Hub = (*LocalHub)(nil)
|
|
|
|
// NewHub creates a new in-memory hub.
|
|
func NewHub(logger *logging.Logger) *LocalHub {
|
|
return &LocalHub{
|
|
logger: logger.WithComponent("hub"),
|
|
connections: make(map[string]Connection),
|
|
rooms: make(map[string]map[string]struct{}),
|
|
clientRooms: make(map[string]map[string]struct{}),
|
|
registerCh: make(chan Connection, 256),
|
|
unregisterCh: make(chan Connection, 256),
|
|
broadcastCh: make(chan *Message, 256),
|
|
}
|
|
}
|
|
|
|
// Run starts the hub's event loop. Call this in a goroutine.
|
|
func (h *LocalHub) Run(ctx context.Context) {
|
|
h.logger.Info("hub started")
|
|
defer h.logger.Info("hub stopped")
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
h.closeAllConnections()
|
|
return
|
|
|
|
case conn := <-h.registerCh:
|
|
h.doRegister(conn)
|
|
|
|
case conn := <-h.unregisterCh:
|
|
h.doUnregister(conn)
|
|
|
|
case msg := <-h.broadcastCh:
|
|
h.doBroadcast(msg)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Register adds a connection to the hub.
|
|
func (h *LocalHub) Register(conn Connection) {
|
|
h.registerCh <- conn
|
|
}
|
|
|
|
// Unregister removes a connection from the hub.
|
|
func (h *LocalHub) Unregister(conn Connection) {
|
|
h.unregisterCh <- conn
|
|
}
|
|
|
|
// JoinRoom adds a connection to a room.
|
|
func (h *LocalHub) JoinRoom(conn Connection, room string) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
|
|
// Add to room
|
|
if h.rooms[room] == nil {
|
|
h.rooms[room] = make(map[string]struct{})
|
|
}
|
|
h.rooms[room][conn.ID()] = struct{}{}
|
|
|
|
// Track client's rooms for cleanup
|
|
if h.clientRooms[conn.ID()] == nil {
|
|
h.clientRooms[conn.ID()] = make(map[string]struct{})
|
|
}
|
|
h.clientRooms[conn.ID()][room] = struct{}{}
|
|
|
|
h.logger.Debug("client joined room",
|
|
"client_id", conn.ID(),
|
|
"room", room,
|
|
"room_size", len(h.rooms[room]),
|
|
)
|
|
}
|
|
|
|
// LeaveRoom removes a connection from a room.
|
|
func (h *LocalHub) LeaveRoom(conn Connection, room string) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
|
|
if clients, ok := h.rooms[room]; ok {
|
|
delete(clients, conn.ID())
|
|
if len(clients) == 0 {
|
|
delete(h.rooms, room)
|
|
}
|
|
}
|
|
|
|
if rooms, ok := h.clientRooms[conn.ID()]; ok {
|
|
delete(rooms, room)
|
|
}
|
|
|
|
h.logger.Debug("client left room",
|
|
"client_id", conn.ID(),
|
|
"room", room,
|
|
)
|
|
}
|
|
|
|
// Broadcast sends a message to all connections in a room.
|
|
func (h *LocalHub) Broadcast(msg *Message) {
|
|
h.broadcastCh <- msg
|
|
}
|
|
|
|
// SendTo sends a message to a specific connection.
|
|
func (h *LocalHub) SendTo(connID string, msg *Message) bool {
|
|
h.mu.RLock()
|
|
conn, ok := h.connections[connID]
|
|
h.mu.RUnlock()
|
|
|
|
if !ok {
|
|
return false
|
|
}
|
|
return conn.Send(msg)
|
|
}
|
|
|
|
// ConnectionCount returns the total number of active connections.
|
|
func (h *LocalHub) ConnectionCount() int {
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
return len(h.connections)
|
|
}
|
|
|
|
// RoomCount returns the number of connections in a specific room.
|
|
func (h *LocalHub) RoomCount(room string) int {
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
return len(h.rooms[room])
|
|
}
|
|
|
|
// doRegister handles registration (called from event loop).
|
|
func (h *LocalHub) doRegister(conn Connection) {
|
|
h.mu.Lock()
|
|
h.connections[conn.ID()] = conn
|
|
h.mu.Unlock()
|
|
|
|
h.logger.Debug("client registered",
|
|
"client_id", conn.ID(),
|
|
"user_id", conn.UserID(),
|
|
"total_connections", h.ConnectionCount(),
|
|
)
|
|
}
|
|
|
|
// doUnregister handles unregistration (called from event loop).
|
|
func (h *LocalHub) doUnregister(conn Connection) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
|
|
if _, ok := h.connections[conn.ID()]; !ok {
|
|
return // Already unregistered
|
|
}
|
|
|
|
// Remove from all rooms
|
|
if rooms, ok := h.clientRooms[conn.ID()]; ok {
|
|
for room := range rooms {
|
|
if clients, ok := h.rooms[room]; ok {
|
|
delete(clients, conn.ID())
|
|
if len(clients) == 0 {
|
|
delete(h.rooms, room)
|
|
}
|
|
}
|
|
}
|
|
delete(h.clientRooms, conn.ID())
|
|
}
|
|
|
|
delete(h.connections, conn.ID())
|
|
|
|
h.logger.Debug("client unregistered",
|
|
"client_id", conn.ID(),
|
|
"total_connections", len(h.connections),
|
|
)
|
|
}
|
|
|
|
// doBroadcast handles broadcasting (called from event loop).
|
|
func (h *LocalHub) doBroadcast(msg *Message) {
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
|
|
if msg.Room != "" {
|
|
// Room broadcast
|
|
clients, ok := h.rooms[msg.Room]
|
|
if !ok {
|
|
return
|
|
}
|
|
for clientID := range clients {
|
|
if conn, ok := h.connections[clientID]; ok {
|
|
conn.Send(msg)
|
|
}
|
|
}
|
|
} else {
|
|
// Global broadcast
|
|
for _, conn := range h.connections {
|
|
conn.Send(msg)
|
|
}
|
|
}
|
|
}
|
|
|
|
// closeAllConnections closes all connections on shutdown.
|
|
func (h *LocalHub) closeAllConnections() {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
|
|
for _, conn := range h.connections {
|
|
conn.Close()
|
|
}
|
|
h.connections = make(map[string]Connection)
|
|
h.rooms = make(map[string]map[string]struct{})
|
|
h.clientRooms = make(map[string]map[string]struct{})
|
|
}
|