persona-community-2/pkg/realtime/sse_redis.go
jordan cb3d4d5786
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
ci/woodpecker/manual/woodpecker Pipeline was successful
Initialize project from skeleton template
2026-02-23 10:53:55 +00:00

98 lines
2.7 KiB
Go

package realtime
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"strings"
"github.com/redis/go-redis/v9"
)
// sseChannelPrefix is prepended to a logical channel name to form the Redis
// pub/sub channel name. The subscriber pattern-matches on this prefix and
// strips it again before handing events to the local hub.
const sseChannelPrefix = "sse:"
// SSEPublisher publishes SSE events to Redis for cross-process delivery.
// Used by the worker to send events that reach SSE-connected clients via the service.
type SSEPublisher struct {
// client is the Redis client that events are published through.
client *redis.Client
// logger records publish failures at error level and successes at debug level.
logger *slog.Logger
}
// NewSSEPublisher builds an SSEPublisher that publishes SSE events through
// the given Redis client using pub/sub.
// When logger is nil, slog.Default() is used in its place.
func NewSSEPublisher(client *redis.Client, logger *slog.Logger) *SSEPublisher {
	pub := &SSEPublisher{client: client, logger: logger}
	if pub.logger == nil {
		pub.logger = slog.Default()
	}
	return pub
}
// SendToUser publishes event on the channel dedicated to a single user.
// The channel name is "user:" followed by userID; the actual publish is
// delegated to SendToChannel, whose error (if any) is returned unchanged.
func (p *SSEPublisher) SendToUser(userID string, event *SSEEvent) error {
	userChannel := "user:" + userID
	return p.SendToChannel(userChannel, event)
}
// SendToChannel publishes an SSE event to a named channel via Redis.
//
// The event is JSON-encoded and published on the Redis channel formed by
// prepending sseChannelPrefix to channel. A nil event is rejected with an
// error before anything is published. Marshal and publish failures are
// returned wrapped with %w; publish failures are additionally logged at
// error level, and successful publishes are logged at debug level.
func (p *SSEPublisher) SendToChannel(channel string, event *SSEEvent) error {
	// Guard up front: the log calls below read event.Type, which would panic
	// on a nil pointer after a JSON "null" payload had already been published.
	if event == nil {
		return fmt.Errorf("publish SSE event: nil event")
	}

	data, err := json.Marshal(event)
	if err != nil {
		return fmt.Errorf("marshal SSE event: %w", err)
	}

	redisChannel := sseChannelPrefix + channel

	// context.Background() is used because the method takes no context, so the
	// publish is not tied to any caller-side cancellation.
	if err := p.client.Publish(context.Background(), redisChannel, data).Err(); err != nil {
		p.logger.Error("failed to publish SSE event to Redis",
			"channel", channel, "event_type", event.Type, "error", err)
		return fmt.Errorf("publish SSE event: %w", err)
	}

	p.logger.Debug("published SSE event to Redis",
		"channel", channel, "event_type", event.Type)
	return nil
}
// RunSSESubscriber pattern-subscribes to every Redis channel that starts with
// sseChannelPrefix and forwards each decoded event to the local SSEHub.
//
// It blocks until ctx is cancelled (returning ctx.Err()) or until the
// subscription's message channel is closed (returning an error). Messages
// whose payload cannot be decoded as an SSEEvent are logged and skipped.
// When logger is nil, slog.Default() is used.
func RunSSESubscriber(ctx context.Context, client *redis.Client, hub *SSEHub, logger *slog.Logger) error {
	if logger == nil {
		logger = slog.Default()
	}

	// One pattern subscription covers all SSE channels.
	pattern := sseChannelPrefix + "*"
	sub := client.PSubscribe(ctx, pattern)
	defer sub.Close()

	logger.Info("SSE Redis subscriber started", "pattern", pattern)

	messages := sub.Channel()
	for {
		select {
		case <-ctx.Done():
			logger.Info("SSE Redis subscriber stopping")
			return ctx.Err()

		case m, open := <-messages:
			if !open {
				return fmt.Errorf("SSE Redis subscription closed")
			}

			// The hub works with logical channel names, so drop the Redis prefix.
			name := strings.TrimPrefix(m.Channel, sseChannelPrefix)

			evt := new(SSEEvent)
			if err := json.Unmarshal([]byte(m.Payload), evt); err != nil {
				logger.Error("failed to parse SSE event from Redis",
					"channel", name, "error", err)
				continue
			}

			// Hand the event to the local hub for delivery.
			hub.SendToChannel(name, evt)
		}
	}
}