foundary-test-1770773605/pkg/realtime/hub.go
jordan 75cd7c10d8
Some checks failed
ci/woodpecker/manual/woodpecker Pipeline was successful
ci/woodpecker/push/woodpecker Pipeline failed
Initialize project from skeleton template
2026-02-11 01:34:24 +00:00

238 lines
5.3 KiB
Go

package realtime
import (
"context"
"sync"
"git.threesix.ai/jordan/foundary-test-1770773605/pkg/logging"
)
// LocalHub is an in-memory implementation of Hub.
// It manages connections and rooms on a single pod.
// For multi-pod deployments, pair with RedisBroadcaster.
type LocalHub struct {
logger *logging.Logger
// connections maps client ID to connection
connections map[string]Connection
// rooms maps room name to set of client IDs
rooms map[string]map[string]struct{}
// clientRooms maps client ID to set of rooms (for cleanup on disconnect)
clientRooms map[string]map[string]struct{}
mu sync.RWMutex
// Channels for goroutine-safe operations
registerCh chan Connection
unregisterCh chan Connection
broadcastCh chan *Message
}
// Ensure LocalHub implements Hub at compile time.
var _ Hub = (*LocalHub)(nil)
// NewHub creates a new in-memory hub.
func NewHub(logger *logging.Logger) *LocalHub {
return &LocalHub{
logger: logger.WithComponent("hub"),
connections: make(map[string]Connection),
rooms: make(map[string]map[string]struct{}),
clientRooms: make(map[string]map[string]struct{}),
registerCh: make(chan Connection, 256),
unregisterCh: make(chan Connection, 256),
broadcastCh: make(chan *Message, 256),
}
}
// Run starts the hub's event loop. Call this in a goroutine.
func (h *LocalHub) Run(ctx context.Context) {
h.logger.Info("hub started")
defer h.logger.Info("hub stopped")
for {
select {
case <-ctx.Done():
h.closeAllConnections()
return
case conn := <-h.registerCh:
h.doRegister(conn)
case conn := <-h.unregisterCh:
h.doUnregister(conn)
case msg := <-h.broadcastCh:
h.doBroadcast(msg)
}
}
}
// Register adds a connection to the hub.
func (h *LocalHub) Register(conn Connection) {
h.registerCh <- conn
}
// Unregister removes a connection from the hub.
func (h *LocalHub) Unregister(conn Connection) {
h.unregisterCh <- conn
}
// JoinRoom adds a connection to a room.
func (h *LocalHub) JoinRoom(conn Connection, room string) {
h.mu.Lock()
defer h.mu.Unlock()
// Add to room
if h.rooms[room] == nil {
h.rooms[room] = make(map[string]struct{})
}
h.rooms[room][conn.ID()] = struct{}{}
// Track client's rooms for cleanup
if h.clientRooms[conn.ID()] == nil {
h.clientRooms[conn.ID()] = make(map[string]struct{})
}
h.clientRooms[conn.ID()][room] = struct{}{}
h.logger.Debug("client joined room",
"client_id", conn.ID(),
"room", room,
"room_size", len(h.rooms[room]),
)
}
// LeaveRoom removes a connection from a room.
func (h *LocalHub) LeaveRoom(conn Connection, room string) {
h.mu.Lock()
defer h.mu.Unlock()
if clients, ok := h.rooms[room]; ok {
delete(clients, conn.ID())
if len(clients) == 0 {
delete(h.rooms, room)
}
}
if rooms, ok := h.clientRooms[conn.ID()]; ok {
delete(rooms, room)
}
h.logger.Debug("client left room",
"client_id", conn.ID(),
"room", room,
)
}
// Broadcast sends a message to all connections in a room.
func (h *LocalHub) Broadcast(msg *Message) {
h.broadcastCh <- msg
}
// SendTo sends a message to a specific connection.
func (h *LocalHub) SendTo(connID string, msg *Message) bool {
h.mu.RLock()
conn, ok := h.connections[connID]
h.mu.RUnlock()
if !ok {
return false
}
return conn.Send(msg)
}
// ConnectionCount returns the total number of active connections.
func (h *LocalHub) ConnectionCount() int {
h.mu.RLock()
defer h.mu.RUnlock()
return len(h.connections)
}
// RoomCount returns the number of connections in a specific room.
func (h *LocalHub) RoomCount(room string) int {
h.mu.RLock()
defer h.mu.RUnlock()
return len(h.rooms[room])
}
// doRegister handles registration (called from event loop).
func (h *LocalHub) doRegister(conn Connection) {
h.mu.Lock()
h.connections[conn.ID()] = conn
h.mu.Unlock()
h.logger.Debug("client registered",
"client_id", conn.ID(),
"user_id", conn.UserID(),
"total_connections", h.ConnectionCount(),
)
}
// doUnregister handles unregistration (called from event loop).
func (h *LocalHub) doUnregister(conn Connection) {
h.mu.Lock()
defer h.mu.Unlock()
if _, ok := h.connections[conn.ID()]; !ok {
return // Already unregistered
}
// Remove from all rooms
if rooms, ok := h.clientRooms[conn.ID()]; ok {
for room := range rooms {
if clients, ok := h.rooms[room]; ok {
delete(clients, conn.ID())
if len(clients) == 0 {
delete(h.rooms, room)
}
}
}
delete(h.clientRooms, conn.ID())
}
delete(h.connections, conn.ID())
h.logger.Debug("client unregistered",
"client_id", conn.ID(),
"total_connections", len(h.connections),
)
}
// doBroadcast handles broadcasting (called from event loop).
func (h *LocalHub) doBroadcast(msg *Message) {
h.mu.RLock()
defer h.mu.RUnlock()
if msg.Room != "" {
// Room broadcast
clients, ok := h.rooms[msg.Room]
if !ok {
return
}
for clientID := range clients {
if conn, ok := h.connections[clientID]; ok {
conn.Send(msg)
}
}
} else {
// Global broadcast
for _, conn := range h.connections {
conn.Send(msg)
}
}
}
// closeAllConnections closes all connections on shutdown.
func (h *LocalHub) closeAllConnections() {
h.mu.Lock()
defer h.mu.Unlock()
for _, conn := range h.connections {
conn.Close()
}
h.connections = make(map[string]Connection)
h.rooms = make(map[string]map[string]struct{})
h.clientRooms = make(map[string]map[string]struct{})
}