# Technical Design: WebSocket Chat with Redis Pub/Sub

## Architecture

The implementation leverages the existing `pkg/realtime` package which provides:
- `LocalHub`: In-memory connection and room management
- `RedisBroadcaster`: Cross-pod message distribution via Redis pub/sub
- `Handler`: HTTP handler for WebSocket upgrade and lifecycle

```
┌─────────────────────────────────────────────────────────────────┐
│                        chat-api Service                         │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────────┐     ┌──────────────┐     ┌─────────────────┐  │
│  │  WebSocket   │────▶│   LocalHub   │────▶│ RedisBroadcaster│  │
│  │   Handler    │     │              │     │                 │  │
│  └──────────────┘     └──────────────┘     └────────┬────────┘  │
│         │                    ▲                      │           │
│         │                    │                      ▼           │
│         ▼                    │               ┌──────────────┐   │
│  ┌──────────────┐            │               │    Redis     │   │
│  │   Clients    │            └───────────────│   Pub/Sub    │   │
│  │  (WebSocket) │                            └──────────────┘   │
│  └──────────────┘                                               │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```

## Components

### 1. Configuration (`internal/config/config.go`)

Add Redis URL configuration:

```go
type Config struct {
	// ... existing fields
	RedisURL string
}

func Load() *Config {
	return &Config{
		// ... existing
		RedisURL: os.Getenv("REDIS_URL"),
	}
}
```

### 2. Main Entry Point (`cmd/server/main.go`)

Initialize realtime components with context for graceful shutdown:

```go
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Create hub and start event loop
	hub := realtime.NewHub(logger)
	go hub.Run(ctx)

	// Create Redis broadcaster (if configured)
	var broadcaster realtime.Broadcaster
	if cfg.RedisURL != "" {
		redisClient := redis.NewClient(&redis.Options{...})
		broadcaster = realtime.NewRedisBroadcaster(redisClient, hub, logger)
		go broadcaster.Run(ctx)
	}

	// Pass to route registration
	api.RegisterRoutes(application, exampleService, hub, broadcaster)
}
```

### 3. Route Registration (`internal/api/routes.go`)

Mount WebSocket handler:

```go
// logger and cfg are assumed to be in scope; they are not parameters here.
// The first parameter is named application so it does not shadow package app,
// which the app.Router type below still needs.
func RegisterRoutes(application *app.App, exampleService *service.ExampleService,
	hub realtime.Hub, broadcaster realtime.Broadcaster) {

	wsHandler := realtime.NewHandler(hub, logger, realtime.HandlerConfig{
		Broadcaster:  broadcaster,
		AuthRequired: cfg.AuthEnabled,
	})

	application.Route("/api/chat-api", func(r app.Router) {
		// ... existing routes

		// WebSocket routes
		r.Mount("/ws", wsHandler.Routes())
	})
}
```

### 4. WebSocket Handler (`pkg/realtime/handler.go`)

The existing handler already provides:
- `Routes()` returning Chi router with `GET /` and `GET /{room}`
- `HandleWebSocket()` for upgrade and lifecycle
- `GetStats()` for connection statistics

Add stats endpoint in routes.go:

```go
r.Get("/ws/stats", func(w http.ResponseWriter, r *http.Request) {
	stats := wsHandler.GetStats()
	httpresponse.OK(w, r, stats)
})
```

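
The stats route can be exercised without opening a WebSocket. The following is a minimal sketch under stated assumptions: `Stats`, `statsSource`, `fixedStats`, `statsHandler`, and `probe` are hypothetical names, the field layout of the stats value is invented for illustration, and the body is written with `encoding/json` rather than the `httpresponse.OK` helper, whose behavior is not described in this document.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Stats is a hypothetical shape for what GetStats() returns.
type Stats struct {
	Connections int `json:"connections"`
	Rooms       int `json:"rooms"`
}

// statsSource is the only behavior the endpoint needs from the handler.
type statsSource interface {
	GetStats() Stats
}

// fixedStats is a test double that returns a canned snapshot.
type fixedStats Stats

func (f fixedStats) GetStats() Stats { return Stats(f) }

// statsHandler serializes the current snapshot as JSON.
func statsHandler(src statsSource) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		if err := json.NewEncoder(w).Encode(src.GetStats()); err != nil {
			http.Error(w, "encode stats", http.StatusInternalServerError)
		}
	}
}

// probe issues one in-memory GET and returns the status and decoded body.
func probe(h http.Handler, path string) (int, Stats, error) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	var got Stats
	err := json.NewDecoder(rec.Body).Decode(&got)
	return rec.Code, got, err
}

func main() {
	code, got, err := probe(statsHandler(fixedStats{Connections: 3, Rooms: 2}), "/ws/stats")
	fmt.Println(code, got, err)
}
```

Depending on a one-method interface instead of the concrete handler keeps the test free of Redis and of real connections, which may suit the planned `ws_test.go`.
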
## Message Flow

### Outbound (Client → Server → Redis → All Pods)

1. Client sends JSON message via WebSocket
2. `WSClient.readPump()` decodes message
3. `Handler.makeMessageHandler()` processes message
4. If broadcaster configured: `Broadcaster.Publish()` to Redis
5. Redis distributes to all subscribed pods
6. Each pod's `RedisBroadcaster.Run()` receives message
7. `Hub.Broadcast()` delivers to local connections

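
The branch in step 4 can be sketched as follows. This is one possible reading, not the behavior of `pkg/realtime`: it assumes the originating pod always delivers to its own connections and publishes to Redis only when a broadcaster is configured. `Message`, `dispatch`, `recordingHub`, and `fakeBroadcaster` are hypothetical, and the `Hub` and `Broadcaster` interfaces are narrowed stand-ins; a real `Publish` would likely also take a `context.Context`.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a hypothetical envelope; the real type lives in pkg/realtime.
type Message struct {
	Room string
	Body string
}

// Hub and Broadcaster are narrowed, assumed versions of the realtime interfaces.
type Hub interface {
	Broadcast(msg Message)
}

type Broadcaster interface {
	Publish(msg Message) error
}

// dispatch delivers to local connections first, then fans out through Redis
// when a broadcaster is configured. A nil broadcaster means local-only mode.
func dispatch(hub Hub, b Broadcaster, msg Message) error {
	hub.Broadcast(msg)
	if b == nil {
		return nil
	}
	if err := b.Publish(msg); err != nil {
		return fmt.Errorf("publish to redis: %w", err)
	}
	return nil
}

// recordingHub remembers every message it was asked to broadcast.
type recordingHub struct{ got []Message }

func (h *recordingHub) Broadcast(msg Message) { h.got = append(h.got, msg) }

// fakeBroadcaster records published messages, or fails when fail is set.
type fakeBroadcaster struct {
	published []Message
	fail      bool
}

func (f *fakeBroadcaster) Publish(msg Message) error {
	if f.fail {
		return errors.New("redis unavailable")
	}
	f.published = append(f.published, msg)
	return nil
}

func main() {
	hub := &recordingHub{}
	fmt.Println(dispatch(hub, nil, Message{Room: "lobby", Body: "hi"}))
	fmt.Println(dispatch(hub, &fakeBroadcaster{fail: true}, Message{Body: "all"}))
	fmt.Println(len(hub.got))
}
```

Passing the untyped `nil` keeps the interface value nil. A nil `*RedisBroadcaster` stored in a `Broadcaster` variable would not compare equal to `nil`, so the check above relies on the variable being left unassigned when Redis is not configured.
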
### Inbound (Redis → Server → Clients)

1. `RedisBroadcaster.Run()` subscribes to Redis channels
2. Receives message, skips if from same pod (echo prevention)
3. Calls `Hub.Broadcast()` with message
4. `LocalHub.doBroadcast()` delivers to room or all connections
5. Each `Connection.Send()` queues to client's send buffer
6. `WSClient.writePump()` writes to WebSocket

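
Steps 2 and 5 above can be illustrated with a short sketch. The mechanism is an assumption: this document does not say how a pod recognizes its own messages, so the example tags each payload with a hypothetical origin pod ID. `envelope`, `shouldDeliver`, and `trySend` are illustrative names, and the non-blocking send is one common way to keep a slow client from stalling a broadcast, not a description of `Connection.Send()`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope is a hypothetical wire format: the payload plus the ID of the
// pod that published it.
type envelope struct {
	Origin  string          `json:"origin"`
	Room    string          `json:"room,omitempty"`
	Payload json.RawMessage `json:"payload"`
}

// shouldDeliver decodes a message received from Redis and reports whether it
// should be handed to the local hub. Messages from this pod are skipped.
func shouldDeliver(raw []byte, selfID string) (envelope, bool, error) {
	var env envelope
	if err := json.Unmarshal(raw, &env); err != nil {
		return envelope{}, false, fmt.Errorf("decode envelope: %w", err)
	}
	return env, env.Origin != selfID, nil
}

// trySend queues payload without blocking. It returns false when the buffer
// is full, leaving the caller to decide whether to drop or disconnect.
func trySend(buf chan<- []byte, payload []byte) bool {
	select {
	case buf <- payload:
		return true
	default:
		return false
	}
}

func main() {
	raw, err := json.Marshal(envelope{
		Origin:  "pod-a",
		Room:    "lobby",
		Payload: json.RawMessage(`{"text":"hi"}`),
	})
	if err != nil {
		panic(err)
	}
	for _, self := range []string{"pod-a", "pod-b"} {
		_, deliver, err := shouldDeliver(raw, self)
		fmt.Println(self, deliver, err)
	}

	buf := make(chan []byte, 1)
	fmt.Println(trySend(buf, raw), trySend(buf, raw))
}
```
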
## Redis Channel Structure

- `realtime:global` - Messages without room targeting
- `realtime:room:{room}` - Messages for specific room

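
The naming scheme above maps onto a pair of small helpers. This is a sketch only: `channelFor` and `roomFromChannel` are hypothetical names, and treating an empty room as "global" is an assumption drawn from the phrase "messages without room targeting".

```go
package main

import (
	"fmt"
	"strings"
)

const (
	globalChannel = "realtime:global"
	roomPrefix    = "realtime:room:"
)

// channelFor returns the Redis channel for a message. An empty room is
// assumed to mean the message targets every connection.
func channelFor(room string) string {
	if room == "" {
		return globalChannel
	}
	return roomPrefix + room
}

// roomFromChannel inverts channelFor. The boolean is false when the channel
// does not follow the realtime naming scheme.
func roomFromChannel(channel string) (string, bool) {
	if channel == globalChannel {
		return "", true
	}
	if !strings.HasPrefix(channel, roomPrefix) {
		return "", false
	}
	room := strings.TrimPrefix(channel, roomPrefix)
	if room == "" {
		return "", false
	}
	return room, true
}

func main() {
	for _, room := range []string{"", "lobby"} {
		ch := channelFor(room)
		back, ok := roomFromChannel(ch)
		fmt.Printf("%q -> %q -> %q (%t)\n", room, ch, back, ok)
	}
}
```

Keeping both directions in one place means the publisher and the subscriber cannot drift apart on the prefix.
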
## Configuration

Environment variables:

| Variable | Description | Default |
|----------|-------------|---------|
| `REDIS_URL` | Redis connection URL | (empty = local-only mode) |
| `AUTH_ENABLED` | Require authentication for WebSocket | `false` |

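
A minimal sketch of reading both variables with the defaults from the table. Assumptions: `loadFrom` is a hypothetical helper, the `AuthEnabled` field is inferred from the `cfg.AuthEnabled` reference in route registration, and rejecting an unparseable `AUTH_ENABLED` (rather than silently treating it as `false`) is a design choice of this example.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// Config mirrors only the fields this design touches; the real struct has more.
type Config struct {
	RedisURL    string // empty means local-only mode
	AuthEnabled bool   // defaults to false
}

// loadFrom builds a Config from a lookup function, so tests can supply
// values without mutating the process environment.
func loadFrom(getenv func(string) string) (*Config, error) {
	cfg := &Config{RedisURL: getenv("REDIS_URL")}
	if raw := getenv("AUTH_ENABLED"); raw != "" {
		enabled, err := strconv.ParseBool(raw)
		if err != nil {
			return nil, fmt.Errorf("AUTH_ENABLED: %w", err)
		}
		cfg.AuthEnabled = enabled
	}
	return cfg, nil
}

func main() {
	cfg, err := loadFrom(os.Getenv)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Printf("local-only=%t auth=%t\n", cfg.RedisURL == "", cfg.AuthEnabled)
}
```
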
## Graceful Shutdown

1. Server receives SIGTERM/SIGINT
2. Context cancelled
3. `Hub.Run()` exits, closes all connections
4. `RedisBroadcaster.Run()` closes Redis subscription
5. Server shutdown completes

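
The sequence above could be wired with `signal.NotifyContext` from the standard library, which cancels a context when one of the listed signals arrives. The sketch below is an assumption about how the pieces might fit, not the service's actual `main`: `runner`, `wellBehaved`, `stuck`, `runUntilDone`, and `simulate` are hypothetical, the grace period is an addition of this example, and the demo cancels on a timer so that it terminates without a signal.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

// runner is the assumed shape of Hub.Run and RedisBroadcaster.Run:
// block until ctx is cancelled, clean up, then return.
type runner func(ctx context.Context)

// wellBehaved returns as soon as the context is cancelled.
func wellBehaved(ctx context.Context) { <-ctx.Done() }

// stuck ignores cancellation, standing in for a component that hangs.
func stuck(context.Context) { time.Sleep(time.Minute) }

// runUntilDone starts every runner, blocks until ctx is cancelled, and then
// waits up to grace for them to return. It reports whether all of them did.
func runUntilDone(ctx context.Context, grace time.Duration, runners ...runner) bool {
	var wg sync.WaitGroup
	for _, run := range runners {
		wg.Add(1)
		go func(run runner) {
			defer wg.Done()
			run(ctx)
		}(run)
	}
	<-ctx.Done()

	finished := make(chan struct{})
	go func() {
		wg.Wait()
		close(finished)
	}()
	select {
	case <-finished:
		return true
	case <-time.After(grace):
		return false
	}
}

// simulate replaces the signal with a timer so the flow can be tested.
func simulate(cancelAfterMs, graceMs int, runners ...runner) bool {
	ctx, cancel := context.WithTimeout(context.Background(),
		time.Duration(cancelAfterMs)*time.Millisecond)
	defer cancel()
	return runUntilDone(ctx, time.Duration(graceMs)*time.Millisecond, runners...)
}

func main() {
	// Steps 1 and 2: SIGINT or SIGTERM cancels the root context.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// Demo only: also cancel after 200ms so the example exits on its own.
	ctx, cancel := context.WithTimeout(ctx, 200*time.Millisecond)
	defer cancel()

	// Steps 3 to 5: wait for the hub and broadcaster stand-ins to return.
	fmt.Println("clean shutdown:", runUntilDone(ctx, time.Second, wellBehaved, wellBehaved))
	fmt.Println("hang detected:", !simulate(10, 50, stuck))
}
```

Waiting on the goroutines before `main` returns is what lets step 5 follow steps 3 and 4; with bare `go hub.Run(ctx)` calls and no wait, the process may exit before connections are closed.
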
## Files to Modify

1. `services/chat-api/internal/config/config.go` - Add RedisURL
2. `services/chat-api/cmd/server/main.go` - Initialize hub/broadcaster
3. `services/chat-api/internal/api/routes.go` - Mount WebSocket handler
4. `services/chat-api/.env.example` - Add REDIS_URL

## Files to Create

1. `services/chat-api/internal/api/handlers/ws.go` - Stats handler wrapper
2. `services/chat-api/internal/api/handlers/ws_test.go` - WebSocket tests

## Dependencies

Already available in `pkg/go.mod`:
- `github.com/gorilla/websocket v1.5.3`
- `github.com/redis/go-redis/v9 v9.7.0`