persona-community-2/pkg/realtime/redis.go
jordan cb3d4d5786
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
ci/woodpecker/manual/woodpecker Pipeline was successful
Initialize project from skeleton template
2026-02-23 10:53:55 +00:00

167 lines
3.9 KiB
Go

package realtime
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
"git.threesix.ai/jordan/persona-community-2/pkg/logging"
)
const (
// channelPrefix is the prefix for Redis pub/sub channels.
channelPrefix = "realtime:"
// globalChannel is the channel for messages without a specific room.
globalChannel = channelPrefix + "global"
)
// RedisBroadcaster distributes messages across multiple pods via Redis Pub/Sub.
type RedisBroadcaster struct {
client *redis.Client
hub Hub
logger *logging.Logger
podID string // Unique identifier for this pod (to prevent echo)
}
// Ensure RedisBroadcaster implements Broadcaster at compile time.
var _ Broadcaster = (*RedisBroadcaster)(nil)
// NewRedisBroadcaster creates a broadcaster backed by Redis Pub/Sub.
func NewRedisBroadcaster(client *redis.Client, hub Hub, logger *logging.Logger) *RedisBroadcaster {
return &RedisBroadcaster{
client: client,
hub: hub,
logger: logger.WithComponent("redis-broadcaster"),
podID: uuid.New().String(),
}
}
// redisMessage wraps a Message with origin metadata.
type redisMessage struct {
PodID string `json:"pod_id"`
Message *Message `json:"message"`
}
// Publish sends a message to all pods via Redis.
func (b *RedisBroadcaster) Publish(ctx context.Context, msg *Message) error {
// Set message ID if not set
if msg.ID == "" {
msg.ID = uuid.New().String()
}
// Wrap with pod ID to prevent echo
wrapped := redisMessage{
PodID: b.podID,
Message: msg,
}
data, err := json.Marshal(wrapped)
if err != nil {
return fmt.Errorf("marshal message: %w", err)
}
// Determine channel
channel := globalChannel
if msg.Room != "" {
channel = channelPrefix + "room:" + msg.Room
}
if err := b.client.Publish(ctx, channel, data).Err(); err != nil {
return fmt.Errorf("publish to redis: %w", err)
}
b.logger.Debug("published message to redis",
"channel", channel,
"message_id", msg.ID,
"type", msg.Type,
)
return nil
}
// Run starts the Redis subscriber loop.
// Subscribes to all room channels and fans out to local hub.
func (b *RedisBroadcaster) Run(ctx context.Context) error {
b.logger.Info("redis broadcaster started", "pod_id", b.podID)
defer b.logger.Info("redis broadcaster stopped")
// Subscribe to global channel and room pattern
pubsub := b.client.PSubscribe(ctx, globalChannel, channelPrefix+"room:*")
defer func() { _ = pubsub.Close() }()
// Wait for subscription confirmation
_, err := pubsub.Receive(ctx)
if err != nil {
return fmt.Errorf("subscribe to redis: %w", err)
}
ch := pubsub.Channel()
for {
select {
case <-ctx.Done():
return ctx.Err()
case redisMsg, ok := <-ch:
if !ok {
return nil // Channel closed
}
b.handleRedisMessage(redisMsg)
}
}
}
// handleRedisMessage processes a message from Redis.
func (b *RedisBroadcaster) handleRedisMessage(redisMsg *redis.Message) {
var wrapped redisMessage
if err := json.Unmarshal([]byte(redisMsg.Payload), &wrapped); err != nil {
b.logger.Warn("invalid redis message", "error", err)
return
}
// Skip messages from this pod (prevent echo)
if wrapped.PodID == b.podID {
return
}
msg := wrapped.Message
if msg == nil {
return
}
b.logger.Debug("received message from redis",
"channel", redisMsg.Channel,
"message_id", msg.ID,
"type", msg.Type,
"from_pod", wrapped.PodID,
)
// Broadcast to local hub
b.hub.Broadcast(msg)
}
// RoomChannel returns the Redis channel name for a room.
func RoomChannel(room string) string {
if room == "" {
return globalChannel
}
return channelPrefix + "room:" + room
}
// HealthCheck verifies Redis connectivity.
func (b *RedisBroadcaster) HealthCheck(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
return b.client.Ping(ctx).Err()
}
// Close closes the Redis client.
func (b *RedisBroadcaster) Close() error {
return b.client.Close()
}