persona-community-3/pkg/realtime/realtime.go
jordan f53b908499
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
ci/woodpecker/manual/woodpecker Pipeline was successful
Initialize project from skeleton template
2026-02-23 11:10:35 +00:00

218 lines
6.3 KiB
Go

// Package realtime provides SSE (Server-Sent Events) with Redis-backed broadcasting.
//
// IMPORTANT: NO WEBSOCKETS. All real-time communication uses HTTP2 + SSE.
// - User → Server: HTTP2 POST/PUT/DELETE (standard REST)
// - Server → User: SSE (one-way event stream)
//
// Event flow: server → redis → redis listeners → SSE hub → user
//
// This pattern applies to ALL real-time features: chat, notifications, progress, presence.
//
// Usage:
//
//	// Create SSE hub
//	sseHub := realtime.NewSSEHub(logger)
//
//	// Optional: Add Redis for multi-pod scaling
//	redisBroadcaster := realtime.NewRedisBroadcaster(redisClient, sseHub, logger)
//	go redisBroadcaster.Run(ctx)
//
//	// Mount SSE handler
//	sseHandler := realtime.NewSSEHandler(sseHub, logger)
//	r.Mount("/api/events", sseHandler.Routes())
//
//	// In your API handlers, publish events:
//	sseHub.SendToUser(userID, &realtime.SSEEvent{
//		Type:   "generation_complete",
//		JobID:  jobID,
//		Result: result,
//	})
package realtime
import (
"context"
"encoding/json"
"time"
)
// Message is the envelope for a single real-time message.
//
// Fields tagged omitempty (ID, Room, From, Data) are left out of the JSON
// encoding when empty; Type and Timestamp are always encoded.
type Message struct {
	// ID uniquely identifies the message. It is optional and, when present,
	// is set by the broadcaster.
	ID string `json:"id,omitempty"`

	// Type names the kind of message, e.g. "chat", "notification" or
	// "presence".
	Type string `json:"type"`

	// Room is the room the message is addressed to. It is empty for direct
	// messages.
	Room string `json:"room,omitempty"`

	// From is the client ID of the sender; it is filled in by the server.
	From string `json:"from,omitempty"`

	// Data carries the payload as raw, already-encoded JSON.
	Data json.RawMessage `json:"data,omitempty"`

	// Timestamp records when the message was created.
	Timestamp time.Time `json:"timestamp"`
}
// SystemMessage builds a Message whose sender is "system".
//
// data is encoded with json.Marshal and stored as the message's Data; msgType
// becomes its Type, and Timestamp is set to the current time in UTC. ID and
// Room are left at their zero values.
//
// If data cannot be encoded, the json.Marshal error is returned unchanged
// together with a nil message.
func SystemMessage(msgType string, data any) (*Message, error) {
	payload, err := json.Marshal(data)
	if err != nil {
		return nil, err
	}

	msg := new(Message)
	msg.Type = msgType
	msg.From = "system"
	msg.Data = payload
	msg.Timestamp = time.Now().UTC()
	return msg, nil
}
// Connection is a single connected real-time client.
//
// NOTE(review): the previous comment called this a "WebSocket client", which
// conflicts with the package-level rule that no WebSockets are used — confirm
// which transport implementations of this interface actually wrap.
type Connection interface {
	// ID reports the unique identifier of this connection.
	ID() string

	// UserID reports the authenticated user's ID, or "" for an anonymous
	// connection.
	UserID() string

	// UserName reports the user's display name.
	UserName() string

	// Send queues msg for delivery on this connection. It reports false when
	// the connection is closed or its send buffer is full.
	Send(msg *Message) bool

	// Close shuts the connection down gracefully.
	Close()
}
// Hub tracks the active connections and which rooms each one belongs to.
type Hub interface {
	// Register adds conn to the hub.
	Register(conn Connection)

	// Unregister removes conn from the hub.
	Unregister(conn Connection)

	// JoinRoom makes conn a member of room.
	JoinRoom(conn Connection, room string)

	// LeaveRoom removes conn from room.
	LeaveRoom(conn Connection, room string)

	// Broadcast delivers msg to every connection in a room, or to every
	// connection when the room is empty.
	// NOTE(review): there is no room parameter, so the target room is
	// presumably msg.Room — confirm against the implementations.
	Broadcast(msg *Message)

	// SendTo delivers msg to the connection identified by connID.
	// NOTE(review): the meaning of the bool result is not documented here;
	// presumably it reports whether the message was handed off — confirm.
	SendTo(connID string, msg *Message) bool

	// ConnectionCount reports how many connections are currently active.
	ConnectionCount() int

	// RoomCount reports how many connections are in room.
	RoomCount(room string) int
}
// Broadcaster distributes messages across pods. Implementations are typically
// backed by Redis Pub/Sub or a comparable mechanism.
type Broadcaster interface {
	// Publish sends msg to all pods.
	Publish(ctx context.Context, msg *Message) error

	// Run starts the subscriber loop and blocks until ctx is cancelled.
	Run(ctx context.Context) error
}
// Common message type identifiers. These are untyped string constants
// intended as values for a message's Type field.
const (
	// Chat messages, including AI chat and its chunked variant.
	MessageTypeChat        = "chat"
	MessageTypeAIChat      = "ai_chat"
	MessageTypeAIChatChunk = "ai_chat_chunk"

	// Presence and notification messages.
	MessageTypePresence     = "presence"
	MessageTypeNotification = "notification"

	// System and error messages.
	MessageTypeSystem = "system"
	MessageTypeError  = "error"

	// Ping/pong pair.
	MessageTypePing = "ping"
	MessageTypePong = "pong"
)
// Presence status values used in presence messages.
const (
	// PresenceOnline is the "online" status.
	PresenceOnline = "online"
	// PresenceOffline is the "offline" status.
	PresenceOffline = "offline"
	// PresenceAway is the "away" status.
	PresenceAway = "away"
)
// PresenceData is the payload describing a presence change.
type PresenceData struct {
	// Status is the presence status (presumably one of the Presence*
	// constants — confirm with callers).
	Status string `json:"status"`

	// UserID identifies the user whose presence changed.
	UserID string `json:"userId"`

	// UserName is the user's display name; omitted from JSON when empty.
	UserName string `json:"userName,omitempty"`
}
// JoinLeaveData is the payload for a room join or leave.
type JoinLeaveData struct {
	// Room is the room being joined or left.
	Room string `json:"room"`

	// UserID identifies the user joining or leaving.
	UserID string `json:"userId"`
}
// ChatData is the payload of a chat message written by a user.
type ChatData struct {
	// Content is the chat message content.
	Content string `json:"content"`

	// UserID identifies the author.
	UserID string `json:"userId"`

	// UserName is the author's display name; omitted from JSON when empty.
	UserName string `json:"userName,omitempty"`
}
// AIResponseData is the payload of an AI-generated response.
type AIResponseData struct {
	// Content is the response content.
	Content string `json:"content"`

	// Provider names the provider of the response.
	Provider string `json:"provider"`
}
// AIChunkData is one chunk of a streamed AI response.
type AIChunkData struct {
	// StreamID ties together the chunks that belong to the same stream.
	StreamID string `json:"streamId"`

	// Text is the content of this chunk.
	Text string `json:"text"`

	// Done is true on the final chunk of the stream.
	Done bool `json:"done"`

	// Provider is the provider name. It is set only on the final chunk and
	// omitted from JSON when empty.
	Provider string `json:"provider,omitempty"`
}
// ---------------------------------------------------------------------------
// Generation/Upload Events (sent via SSE, not WebSocket)
// ---------------------------------------------------------------------------
// Event type identifiers for asynchronous job progress, covering both
// generation jobs and uploads.
const (
	// Generation job events.
	EventGenerationStarted  = "generation_started"
	EventGenerationProgress = "generation_progress"
	EventGenerationComplete = "generation_complete"
	EventGenerationFailed   = "generation_failed"

	// Upload events.
	EventUploadStarted  = "upload_started"
	EventUploadProgress = "upload_progress"
	EventUploadComplete = "upload_complete"
	EventUploadFailed   = "upload_failed"
)
// GenerationResult is the result payload carried by generation_complete
// events.
type GenerationResult struct {
	// URL is encoded under the JSON key "url".
	URL string `json:"url"`

	// Provider is encoded under the JSON key "provider".
	Provider string `json:"provider"`

	// LatencyMs is encoded under the JSON key "latencyMs" (milliseconds,
	// going by the field name — confirm what interval is measured).
	LatencyMs int64 `json:"latencyMs"`
}
// UploadResult is the result payload carried by upload_complete events.
type UploadResult struct {
	// ID is encoded under the JSON key "id".
	ID string `json:"id"`

	// Original is encoded under the JSON key "original".
	Original string `json:"original"`

	// Optimized is encoded under the JSON key "optimized".
	Optimized string `json:"optimized"`

	// Thumbnail is encoded under the JSON key "thumbnail".
	Thumbnail string `json:"thumbnail"`
}