package realtime

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/google/uuid"
	"github.com/redis/go-redis/v9"

	"git.threesix.ai/jordan/sp4-fresh/pkg/logging"
)

const (
	// channelPrefix is the prefix shared by every Redis pub/sub channel used
	// by this package.
	channelPrefix = "realtime:"

	// globalChannel is the channel for messages without a specific room.
	globalChannel = channelPrefix + "global"
)

// RedisBroadcaster distributes messages across multiple pods via Redis Pub/Sub.
type RedisBroadcaster struct {
	client *redis.Client
	hub    Hub
	logger *logging.Logger
	podID  string // Unique identifier for this pod (to prevent echo)
}

// Ensure RedisBroadcaster implements Broadcaster at compile time.
var _ Broadcaster = (*RedisBroadcaster)(nil)

// NewRedisBroadcaster creates a broadcaster backed by Redis Pub/Sub.
//
// The returned broadcaster uses client for all Redis operations, forwards
// received messages to hub, logs through logger scoped to the
// "redis-broadcaster" component, and is assigned a freshly generated UUID as
// its pod ID.
func NewRedisBroadcaster(client *redis.Client, hub Hub, logger *logging.Logger) *RedisBroadcaster {
	return &RedisBroadcaster{
		client: client,
		hub:    hub,
		logger: logger.WithComponent("redis-broadcaster"),
		podID:  uuid.New().String(),
	}
}

// redisMessage wraps a Message with origin metadata.
type redisMessage struct {
	PodID   string   `json:"pod_id"`
	Message *Message `json:"message"`
}

// Publish sends a message to all pods via Redis.
//
// If msg.ID is empty, a new UUID is assigned to it in place before sending.
// The message is wrapped together with this broadcaster's pod ID, encoded as
// JSON, and published to the channel returned by RoomChannel(msg.Room).
//
// It returns an error if msg is nil, if the message cannot be marshalled, or
// if the Redis publish fails.
func (b *RedisBroadcaster) Publish(ctx context.Context, msg *Message) error {
	// Guard against a nil message: the field accesses below would panic.
	if msg == nil {
		return fmt.Errorf("publish to redis: nil message")
	}

	// Set message ID if not set
	if msg.ID == "" {
		msg.ID = uuid.New().String()
	}

	// Wrap with pod ID to prevent echo
	data, err := json.Marshal(redisMessage{
		PodID:   b.podID,
		Message: msg,
	})
	if err != nil {
		return fmt.Errorf("marshal message: %w", err)
	}

	// Use the shared helper so publisher-side channel names cannot drift from
	// the exported naming scheme.
	channel := RoomChannel(msg.Room)

	if err := b.client.Publish(ctx, channel, data).Err(); err != nil {
		return fmt.Errorf("publish to redis: %w", err)
	}

	b.logger.Debug("published message to redis",
		"channel", channel,
		"message_id", msg.ID,
		"type", msg.Type,
	)

	return nil
}

// Run starts the Redis subscriber loop.
// Subscribes to all room channels and fans out to local hub. func (b *RedisBroadcaster) Run(ctx context.Context) error { b.logger.Info("redis broadcaster started", "pod_id", b.podID) defer b.logger.Info("redis broadcaster stopped") // Subscribe to global channel and room pattern pubsub := b.client.PSubscribe(ctx, globalChannel, channelPrefix+"room:*") defer func() { _ = pubsub.Close() }() // Wait for subscription confirmation _, err := pubsub.Receive(ctx) if err != nil { return fmt.Errorf("subscribe to redis: %w", err) } ch := pubsub.Channel() for { select { case <-ctx.Done(): return ctx.Err() case redisMsg, ok := <-ch: if !ok { return nil // Channel closed } b.handleRedisMessage(redisMsg) } } } // handleRedisMessage processes a message from Redis. func (b *RedisBroadcaster) handleRedisMessage(redisMsg *redis.Message) { var wrapped redisMessage if err := json.Unmarshal([]byte(redisMsg.Payload), &wrapped); err != nil { b.logger.Warn("invalid redis message", "error", err) return } // Skip messages from this pod (prevent echo) if wrapped.PodID == b.podID { return } msg := wrapped.Message if msg == nil { return } b.logger.Debug("received message from redis", "channel", redisMsg.Channel, "message_id", msg.ID, "type", msg.Type, "from_pod", wrapped.PodID, ) // Broadcast to local hub b.hub.Broadcast(msg) } // RoomChannel returns the Redis channel name for a room. func RoomChannel(room string) string { if room == "" { return globalChannel } return channelPrefix + "room:" + room } // HealthCheck verifies Redis connectivity. func (b *RedisBroadcaster) HealthCheck(ctx context.Context) error { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() return b.client.Ping(ctx).Err() } // Close closes the Redis client. func (b *RedisBroadcaster) Close() error { return b.client.Close() }