package realtime import ( "context" "encoding/json" "fmt" "log/slog" "strings" "github.com/redis/go-redis/v9" ) const sseChannelPrefix = "sse:" // SSEPublisher publishes SSE events to Redis for cross-process delivery. // Used by the worker to send events that reach SSE-connected clients via the service. type SSEPublisher struct { client *redis.Client logger *slog.Logger } // NewSSEPublisher creates a publisher that sends SSE events via Redis pub/sub. func NewSSEPublisher(client *redis.Client, logger *slog.Logger) *SSEPublisher { if logger == nil { logger = slog.Default() } return &SSEPublisher{ client: client, logger: logger, } } // SendToUser publishes an SSE event to a user-specific channel. // The service's SSE subscriber picks this up and delivers to the connected client. func (p *SSEPublisher) SendToUser(userID string, event *SSEEvent) error { return p.SendToChannel("user:"+userID, event) } // SendToChannel publishes an SSE event to a named channel via Redis. func (p *SSEPublisher) SendToChannel(channel string, event *SSEEvent) error { data, err := json.Marshal(event) if err != nil { return fmt.Errorf("marshal SSE event: %w", err) } redisChannel := sseChannelPrefix + channel if err := p.client.Publish(context.Background(), redisChannel, data).Err(); err != nil { p.logger.Error("failed to publish SSE event to Redis", "channel", channel, "event_type", event.Type, "error", err) return fmt.Errorf("publish SSE event: %w", err) } p.logger.Debug("published SSE event to Redis", "channel", channel, "event_type", event.Type) return nil } // RunSSESubscriber subscribes to all SSE Redis channels and forwards events // to the local SSEHub for delivery to connected clients. // Blocks until ctx is cancelled. func RunSSESubscriber(ctx context.Context, client *redis.Client, hub *SSEHub, logger *slog.Logger) error { if logger == nil { logger = slog.Default() } // PSubscribe to all SSE channels pubsub := client.PSubscribe(ctx, sseChannelPrefix+"*") defer pubsub.Close() logger.Info("SSE Redis subscriber started", "pattern", sseChannelPrefix+"*") ch := pubsub.Channel() for { select { case <-ctx.Done(): logger.Info("SSE Redis subscriber stopping") return ctx.Err() case msg, ok := <-ch: if !ok { return fmt.Errorf("SSE Redis subscription closed") } // Extract channel name by stripping the "sse:" prefix channel := strings.TrimPrefix(msg.Channel, sseChannelPrefix) var event SSEEvent if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil { logger.Error("failed to parse SSE event from Redis", "channel", channel, "error", err) continue } // Deliver to local SSE hub hub.SendToChannel(channel, &event) } } }