167 lines
3.9 KiB
Go
167 lines
3.9 KiB
Go
package realtime
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/redis/go-redis/v9"
|
|
|
|
"git.threesix.ai/jordan/slack-final-1770280107/pkg/logging"
|
|
)
|
|
|
|
// Redis Pub/Sub channel names used by the broadcaster.
const (
	// channelPrefix namespaces every Pub/Sub channel owned by this package.
	channelPrefix = "realtime:"

	// globalChannel carries messages that are not addressed to a specific room.
	globalChannel = channelPrefix + "global"
)
|
|
|
|
// RedisBroadcaster distributes messages across multiple pods via Redis Pub/Sub.
|
|
type RedisBroadcaster struct {
|
|
client *redis.Client
|
|
hub Hub
|
|
logger *logging.Logger
|
|
podID string // Unique identifier for this pod (to prevent echo)
|
|
}
|
|
|
|
// Ensure RedisBroadcaster implements Broadcaster at compile time.
|
|
var _ Broadcaster = (*RedisBroadcaster)(nil)
|
|
|
|
// NewRedisBroadcaster creates a broadcaster backed by Redis Pub/Sub.
|
|
func NewRedisBroadcaster(client *redis.Client, hub Hub, logger *logging.Logger) *RedisBroadcaster {
|
|
return &RedisBroadcaster{
|
|
client: client,
|
|
hub: hub,
|
|
logger: logger.WithComponent("redis-broadcaster"),
|
|
podID: uuid.New().String(),
|
|
}
|
|
}
|
|
|
|
// redisMessage wraps a Message with origin metadata.
|
|
type redisMessage struct {
|
|
PodID string `json:"pod_id"`
|
|
Message *Message `json:"message"`
|
|
}
|
|
|
|
// Publish sends a message to all pods via Redis.
|
|
func (b *RedisBroadcaster) Publish(ctx context.Context, msg *Message) error {
|
|
// Set message ID if not set
|
|
if msg.ID == "" {
|
|
msg.ID = uuid.New().String()
|
|
}
|
|
|
|
// Wrap with pod ID to prevent echo
|
|
wrapped := redisMessage{
|
|
PodID: b.podID,
|
|
Message: msg,
|
|
}
|
|
|
|
data, err := json.Marshal(wrapped)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal message: %w", err)
|
|
}
|
|
|
|
// Determine channel
|
|
channel := globalChannel
|
|
if msg.Room != "" {
|
|
channel = channelPrefix + "room:" + msg.Room
|
|
}
|
|
|
|
if err := b.client.Publish(ctx, channel, data).Err(); err != nil {
|
|
return fmt.Errorf("publish to redis: %w", err)
|
|
}
|
|
|
|
b.logger.Debug("published message to redis",
|
|
"channel", channel,
|
|
"message_id", msg.ID,
|
|
"type", msg.Type,
|
|
)
|
|
return nil
|
|
}
|
|
|
|
// Run starts the Redis subscriber loop.
|
|
// Subscribes to all room channels and fans out to local hub.
|
|
func (b *RedisBroadcaster) Run(ctx context.Context) error {
|
|
b.logger.Info("redis broadcaster started", "pod_id", b.podID)
|
|
defer b.logger.Info("redis broadcaster stopped")
|
|
|
|
// Subscribe to global channel and room pattern
|
|
pubsub := b.client.PSubscribe(ctx, globalChannel, channelPrefix+"room:*")
|
|
defer func() { _ = pubsub.Close() }()
|
|
|
|
// Wait for subscription confirmation
|
|
_, err := pubsub.Receive(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("subscribe to redis: %w", err)
|
|
}
|
|
|
|
ch := pubsub.Channel()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
|
|
case redisMsg, ok := <-ch:
|
|
if !ok {
|
|
return nil // Channel closed
|
|
}
|
|
|
|
b.handleRedisMessage(redisMsg)
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleRedisMessage processes a message from Redis.
|
|
func (b *RedisBroadcaster) handleRedisMessage(redisMsg *redis.Message) {
|
|
var wrapped redisMessage
|
|
if err := json.Unmarshal([]byte(redisMsg.Payload), &wrapped); err != nil {
|
|
b.logger.Warn("invalid redis message", "error", err)
|
|
return
|
|
}
|
|
|
|
// Skip messages from this pod (prevent echo)
|
|
if wrapped.PodID == b.podID {
|
|
return
|
|
}
|
|
|
|
msg := wrapped.Message
|
|
if msg == nil {
|
|
return
|
|
}
|
|
|
|
b.logger.Debug("received message from redis",
|
|
"channel", redisMsg.Channel,
|
|
"message_id", msg.ID,
|
|
"type", msg.Type,
|
|
"from_pod", wrapped.PodID,
|
|
)
|
|
|
|
// Broadcast to local hub
|
|
b.hub.Broadcast(msg)
|
|
}
|
|
|
|
// RoomChannel returns the Redis channel name for a room.
|
|
func RoomChannel(room string) string {
|
|
if room == "" {
|
|
return globalChannel
|
|
}
|
|
return channelPrefix + "room:" + room
|
|
}
|
|
|
|
// HealthCheck verifies Redis connectivity.
|
|
func (b *RedisBroadcaster) HealthCheck(ctx context.Context) error {
|
|
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
defer cancel()
|
|
return b.client.Ping(ctx).Err()
|
|
}
|
|
|
|
// Close closes the Redis client.
|
|
func (b *RedisBroadcaster) Close() error {
|
|
return b.client.Close()
|
|
}
|