slack-auth-1770277926/pkg/realtime/realtime.go
jordan 611b9e7aae
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
ci/woodpecker/manual/woodpecker Pipeline was successful
Initialize project from skeleton template
2026-02-05 07:52:07 +00:00

150 lines
4.2 KiB
Go

// Package realtime provides WebSocket communication with Redis-backed broadcasting.
//
// This package enables real-time bidirectional communication with:
// - WebSocket connections with automatic ping/pong heartbeat
// - Room-based message grouping
// - Cross-pod broadcasting via Redis Pub/Sub
// - Graceful connection lifecycle management
//
// Usage:
//
// // Create a hub (local connection registry)
// hub := realtime.NewHub(logger)
// go hub.Run(ctx)
//
// // Optional: Add Redis for multi-pod scaling
// redisBroadcaster := realtime.NewRedisBroadcaster(redisClient, hub, logger)
// go redisBroadcaster.Run(ctx)
//
// // Mount the WebSocket handler
// handler := realtime.NewHandler(hub, logger, realtime.HandlerConfig{
// Broadcaster: redisBroadcaster, // nil for single-pod
// })
// r.Mount("/ws", handler.Routes())
package realtime
import (
"context"
"encoding/json"
"time"
)
// Message represents a real-time message sent between clients.
type Message struct {
// ID is a unique identifier for the message (optional, set by broadcaster).
ID string `json:"id,omitempty"`
// Type identifies the message kind (e.g., "chat", "notification", "presence").
Type string `json:"type"`
// Room is the target room for the message (empty for direct messages).
Room string `json:"room,omitempty"`
// From is the sender's client ID (set by server).
From string `json:"from,omitempty"`
// Data contains the message payload.
Data json.RawMessage `json:"data,omitempty"`
// Timestamp is when the message was created.
Timestamp time.Time `json:"timestamp"`
}
// SystemMessage creates a system-generated message.
func SystemMessage(msgType string, data any) (*Message, error) {
dataBytes, err := json.Marshal(data)
if err != nil {
return nil, err
}
return &Message{
Type: msgType,
From: "system",
Data: dataBytes,
Timestamp: time.Now().UTC(),
}, nil
}
// Connection represents a connected WebSocket client.
type Connection interface {
// ID returns the unique connection identifier.
ID() string
// UserID returns the authenticated user ID (empty if anonymous).
UserID() string
// Send queues a message for delivery to this connection.
// Returns false if the connection is closed or send buffer is full.
Send(msg *Message) bool
// Close gracefully closes the connection.
Close()
}
// Hub manages active connections and room memberships.
type Hub interface {
// Register adds a connection to the hub.
Register(conn Connection)
// Unregister removes a connection from the hub.
Unregister(conn Connection)
// JoinRoom adds a connection to a room.
JoinRoom(conn Connection, room string)
// LeaveRoom removes a connection from a room.
LeaveRoom(conn Connection, room string)
// Broadcast sends a message to all connections in a room.
// If room is empty, broadcasts to all connections.
Broadcast(msg *Message)
// SendTo sends a message to a specific connection by ID.
SendTo(connID string, msg *Message) bool
// ConnectionCount returns the total number of active connections.
ConnectionCount() int
// RoomCount returns the number of connections in a specific room.
RoomCount(room string) int
}
// Broadcaster handles cross-pod message distribution.
// Implementations typically use Redis Pub/Sub or similar.
type Broadcaster interface {
// Publish sends a message to all pods.
Publish(ctx context.Context, msg *Message) error
// Run starts the subscriber loop (blocks until context is cancelled).
Run(ctx context.Context) error
}
// MessageType constants for common message types.
const (
MessageTypeChat = "chat"
MessageTypePresence = "presence"
MessageTypeNotification = "notification"
MessageTypeSystem = "system"
MessageTypeError = "error"
MessageTypePing = "ping"
MessageTypePong = "pong"
)
// PresenceStatus constants for presence messages.
const (
PresenceOnline = "online"
PresenceOffline = "offline"
PresenceAway = "away"
)
// PresenceData represents presence change data.
type PresenceData struct {
Status string `json:"status"`
UserID string `json:"user_id"`
}
// JoinLeaveData represents room join/leave data.
type JoinLeaveData struct {
Room string `json:"room"`
UserID string `json:"user_id"`
}