From 82c41e819be928adc52e885c43554207fec332ab Mon Sep 17 00:00:00 2001 From: rdev-worker Date: Thu, 5 Feb 2026 21:58:16 +0000 Subject: [PATCH] build: /implement-feature websocket-chat --requirements 'GET /ws upgrades to... --- .sdlc/branches/feature/websocket-chat.yaml | 4 + .sdlc/config.yaml | 36 ++ .sdlc/features/websocket-chat/design.md | 173 ++++++++ .sdlc/features/websocket-chat/manifest.yaml | 50 +++ .sdlc/features/websocket-chat/qa-plan.md | 46 +++ .sdlc/features/websocket-chat/spec.md | 73 ++++ .sdlc/features/websocket-chat/tasks.md | 142 +++++++ .sdlc/state.yaml | 63 +++ services/chat-api/.env.example | 3 + services/chat-api/cmd/server/main.go | 39 +- .../chat-api/internal/api/handlers/ws_test.go | 386 ++++++++++++++++++ services/chat-api/internal/api/routes.go | 29 +- services/chat-api/internal/api/spec.go | 63 ++- services/chat-api/internal/config/config.go | 5 + 14 files changed, 1109 insertions(+), 3 deletions(-) create mode 100644 .sdlc/branches/feature/websocket-chat.yaml create mode 100644 .sdlc/config.yaml create mode 100644 .sdlc/features/websocket-chat/design.md create mode 100644 .sdlc/features/websocket-chat/manifest.yaml create mode 100644 .sdlc/features/websocket-chat/qa-plan.md create mode 100644 .sdlc/features/websocket-chat/spec.md create mode 100644 .sdlc/features/websocket-chat/tasks.md create mode 100644 .sdlc/state.yaml create mode 100644 services/chat-api/internal/api/handlers/ws_test.go diff --git a/.sdlc/branches/feature/websocket-chat.yaml b/.sdlc/branches/feature/websocket-chat.yaml new file mode 100644 index 0000000..534bc16 --- /dev/null +++ b/.sdlc/branches/feature/websocket-chat.yaml @@ -0,0 +1,4 @@ +name: feature/websocket-chat +feature: websocket-chat +base_branch: main +created_at: 2026-02-05T21:54:23.43461145Z diff --git a/.sdlc/config.yaml b/.sdlc/config.yaml new file mode 100644 index 0000000..0d9e993 --- /dev/null +++ b/.sdlc/config.yaml @@ -0,0 +1,36 @@ +version: 1 +project: + name: workspace +branches: + main: main + feature_prefix: feature/ +phases: + enabled: + - draft + - specified + - planned + - ready + - implementation + - review + - audit + - qa + - merge + - released + required_artifacts: + audit: + - audit + planned: + - spec + - design + - tasks + - qa_plan + qa: + - qa_results + review: + - review + specified: + - spec +compliance: + require_approvals: true + require_branch: true + require_qa: true diff --git a/.sdlc/features/websocket-chat/design.md b/.sdlc/features/websocket-chat/design.md new file mode 100644 index 0000000..c9cc060 --- /dev/null +++ b/.sdlc/features/websocket-chat/design.md @@ -0,0 +1,173 @@ +# Technical Design: WebSocket Chat with Redis Pub/Sub + +## Architecture + +The implementation leverages the existing `pkg/realtime` package which provides: +- `LocalHub`: In-memory connection and room management +- `RedisBroadcaster`: Cross-pod message distribution via Redis pub/sub +- `Handler`: HTTP handler for WebSocket upgrade and lifecycle + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ chat-api Service │ +├─────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌──────────────┐ ┌──────────────┐ ┌─────────────────┐ │ +│ │ WebSocket │────▶│ LocalHub │────▶│ Redis Broadcaster│ │ +│ │ Handler │ │ │ │ │ │ +│ └──────────────┘ └──────────────┘ └────────┬────────┘ │ +│ │ ▲ │ │ +│ │ │ ▼ │ +│ ▼ │ ┌──────────────┐ │ +│ ┌──────────────┐ │ │ Redis │ │ +│ │ Clients │ └──────────────│ Pub/Sub │ │ +│ │ (WebSocket) │ └──────────────┘ │ +│ └──────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────┘ +``` + +## Components + +### 1. Configuration (`internal/config/config.go`) + +Add Redis URL configuration: + +```go +type Config struct { + // ... existing fields + RedisURL string +} + +func Load() *Config { + return &Config{ + // ... existing + RedisURL: os.Getenv("REDIS_URL"), + } +} +``` + +### 2. Main Entry Point (`cmd/server/main.go`) + +Initialize realtime components with context for graceful shutdown: + +```go +func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create hub and start event loop + hub := realtime.NewHub(logger) + go hub.Run(ctx) + + // Create Redis broadcaster (if configured) + var broadcaster realtime.Broadcaster + if cfg.RedisURL != "" { + redisClient := redis.NewClient(&redis.Options{...}) + broadcaster = realtime.NewRedisBroadcaster(redisClient, hub, logger) + go broadcaster.Run(ctx) + } + + // Pass to route registration + api.RegisterRoutes(application, exampleService, hub, broadcaster) +} +``` + +### 3. Route Registration (`internal/api/routes.go`) + +Mount WebSocket handler: + +```go +func RegisterRoutes(app *app.App, exampleService *service.ExampleService, + hub realtime.Hub, broadcaster realtime.Broadcaster) { + + wsHandler := realtime.NewHandler(hub, logger, realtime.HandlerConfig{ + Broadcaster: broadcaster, + AuthRequired: cfg.AuthEnabled, + }) + + app.Route("/api/chat-api", func(r app.Router) { + // ... existing routes + + // WebSocket routes + r.Mount("/ws", wsHandler.Routes()) + }) +} +``` + +### 4. WebSocket Handler (`pkg/realtime/handler.go`) + +The existing handler already provides: +- `Routes()` returning Chi router with `GET /` and `GET /{room}` +- `HandleWebSocket()` for upgrade and lifecycle +- `GetStats()` for connection statistics + +Add stats endpoint in routes.go: + +```go +r.Get("/ws/stats", func(w http.ResponseWriter, r *http.Request) { + stats := wsHandler.GetStats() + httpresponse.OK(w, r, stats) +}) +``` + +## Message Flow + +### Outbound (Client → Server → Redis → All Pods) + +1. Client sends JSON message via WebSocket +2. `WSClient.readPump()` decodes message +3. `Handler.makeMessageHandler()` processes message +4. If broadcaster configured: `Broadcaster.Publish()` to Redis +5. Redis distributes to all subscribed pods +6. Each pod's `RedisBroadcaster.Run()` receives message +7. `Hub.Broadcast()` delivers to local connections + +### Inbound (Redis → Server → Clients) + +1. `RedisBroadcaster.Run()` subscribes to Redis channels +2. Receives message, skips if from same pod (echo prevention) +3. Calls `Hub.Broadcast()` with message +4. `LocalHub.doBroadcast()` delivers to room or all connections +5. Each `Connection.Send()` queues to client's send buffer +6. `WSClient.writePump()` writes to WebSocket + +## Redis Channel Structure + +- `realtime:global` - Messages without room targeting +- `realtime:room:{room}` - Messages for specific room + +## Configuration + +Environment variables: + +| Variable | Description | Default | +|----------|-------------|---------| +| `REDIS_URL` | Redis connection URL | (empty = local-only mode) | +| `AUTH_ENABLED` | Require authentication for WebSocket | `false` | + +## Graceful Shutdown + +1. Server receives SIGTERM/SIGINT +2. Context cancelled +3. `Hub.Run()` exits, closes all connections +4. `RedisBroadcaster.Run()` closes Redis subscription +5. Server shutdown completes + +## Files to Modify + +1. `services/chat-api/internal/config/config.go` - Add RedisURL +2. `services/chat-api/cmd/server/main.go` - Initialize hub/broadcaster +3. `services/chat-api/internal/api/routes.go` - Mount WebSocket handler +4. `services/chat-api/.env.example` - Add REDIS_URL + +## Files to Create + +1. `services/chat-api/internal/api/handlers/ws.go` - Stats handler wrapper +2. `services/chat-api/internal/api/handlers/ws_test.go` - WebSocket tests + +## Dependencies + +Already available in `pkg/go.mod`: +- `github.com/gorilla/websocket v1.5.3` +- `github.com/redis/go-redis/v9 v9.7.0` diff --git a/.sdlc/features/websocket-chat/manifest.yaml b/.sdlc/features/websocket-chat/manifest.yaml new file mode 100644 index 0000000..6464605 --- /dev/null +++ b/.sdlc/features/websocket-chat/manifest.yaml @@ -0,0 +1,50 @@ +slug: websocket-chat +title: WebSocket Chat with Redis Pub/Sub +created: 2026-02-05T21:52:07.328706369Z +branch: feature/websocket-chat +phase: implementation +phase_history: + - phase: draft + entered: 2026-02-05T21:52:07.328706369Z + exited: 2026-02-05T21:53:53.812434642Z + - phase: specified + entered: 2026-02-05T21:53:53.812434642Z + exited: 2026-02-05T21:54:20.249171969Z + - phase: planned + entered: 2026-02-05T21:54:20.249171969Z + exited: 2026-02-05T21:54:29.234820907Z + - phase: ready + entered: 2026-02-05T21:54:29.234820907Z + exited: 2026-02-05T21:54:29.239877785Z + - phase: implementation + entered: 2026-02-05T21:54:29.239877785Z +artifacts: + audit: + status: pending + path: audit.md + design: + status: approved + path: design.md + approved_by: user + approved_at: 2026-02-05T21:53:28.832971074Z + qa_plan: + status: approved + path: qa-plan.md + approved_by: user + approved_at: 2026-02-05T21:54:13.064966346Z + qa_results: + status: pending + path: qa-results.md + review: + status: pending + path: review.md + spec: + status: approved + path: spec.md + approved_by: user + approved_at: 2026-02-05T21:53:27.872265485Z + tasks: + status: approved + path: tasks.md + approved_by: user + approved_at: 2026-02-05T21:53:29.394129418Z diff --git a/.sdlc/features/websocket-chat/qa-plan.md b/.sdlc/features/websocket-chat/qa-plan.md new file mode 100644 index 0000000..b750ecd --- /dev/null +++ b/.sdlc/features/websocket-chat/qa-plan.md @@ -0,0 +1,46 @@ +# QA Plan: WebSocket Chat with Redis Pub/Sub + +## Test Categories + +### 1. Unit Tests + +| Test | Description | Location | +|------|-------------|----------| +| Config loading | RedisURL read from environment | `config/config_test.go` | +| Hub registration | Connections register/unregister | `pkg/realtime/hub_test.go` | +| Room management | Join/leave room operations | `pkg/realtime/hub_test.go` | +| Message broadcast | Room and global broadcast | `pkg/realtime/hub_test.go` | + +### 2. Integration Tests + +| Test | Description | Location | +|------|-------------|----------| +| WebSocket upgrade | GET /ws returns 101 Switching Protocols | `handlers/ws_test.go` | +| Room join URL | GET /ws/{room} joins room | `handlers/ws_test.go` | +| Room join query | GET /ws?room=x joins room | `handlers/ws_test.go` | +| Message echo | Client receives own messages | `handlers/ws_test.go` | +| Multi-client broadcast | All clients receive messages | `handlers/ws_test.go` | +| Stats endpoint | GET /ws/stats returns counts | `handlers/ws_test.go` | +| Connection cleanup | Disconnected clients removed | `handlers/ws_test.go` | + +### 3. Manual Tests + +| Test | Steps | Expected | +|------|-------|----------| +| wscat connection | `wscat -c ws://localhost:8001/api/chat-api/ws` | Connection established | +| Send message | Send JSON `{"type":"chat","data":{"text":"hello"}}` | Message echoed back | +| Room broadcast | Two clients join same room, one sends | Both receive message | +| Stats check | `curl localhost:8001/api/chat-api/ws/stats` | Returns connection count | + +## Test Environment + +- Redis running via docker-compose +- Service running on port 8001 +- WebSocket client: wscat or websocat + +## Pass Criteria + +- All unit tests pass +- All integration tests pass +- Manual WebSocket connection test successful +- Message broadcast verified with multiple clients diff --git a/.sdlc/features/websocket-chat/spec.md b/.sdlc/features/websocket-chat/spec.md new file mode 100644 index 0000000..b2613a6 --- /dev/null +++ b/.sdlc/features/websocket-chat/spec.md @@ -0,0 +1,73 @@ +# Feature Specification: WebSocket Chat with Redis Pub/Sub + +## Overview + +Implement a WebSocket endpoint at `GET /ws` that upgrades HTTP connections to WebSocket for real-time chat functionality. Messages received from clients are published to Redis channels, and a Redis subscriber broadcasts incoming messages to all connected clients. + +## Requirements + +### Functional Requirements + +1. **WebSocket Upgrade Endpoint** + - `GET /api/chat-api/ws` upgrades HTTP to WebSocket + - `GET /api/chat-api/ws/{room}` joins a specific room on connect + - Support optional `?room=` query parameter for room selection + +2. **Message Publishing** + - Incoming WebSocket messages are published to Redis pub/sub channels + - Room-specific messages go to `realtime:room:{room}` channel + - Global messages (no room) go to `realtime:global` channel + +3. **Message Broadcasting** + - Redis subscriber receives messages from all channels + - Messages are broadcast to all connected WebSocket clients + - Room-targeted messages only go to clients in that room + - Cross-pod broadcasting ensures messages reach all service instances + +4. **Message Format** + ```json + { + "id": "uuid", + "type": "chat|presence|notification|system", + "room": "room-name", + "from": "client-id", + "data": {}, + "timestamp": "2026-02-05T00:00:00Z" + } + ``` + +### Non-Functional Requirements + +1. **Connection Management** + - Automatic ping/pong heartbeat (60s timeout) + - Graceful connection cleanup on disconnect + - Maximum message size: 64KB + - Send buffer size: 256 messages + +2. **Scalability** + - Multi-pod deployment via Redis pub/sub + - Pod ID tracking to prevent message echo + +3. **Configuration** + - Redis URL via `REDIS_URL` environment variable + - Auth optional via existing `AUTH_ENABLED` flag + +## Acceptance Criteria + +- [ ] WebSocket endpoint accessible at `/api/chat-api/ws` +- [ ] Room-based WebSocket endpoint at `/api/chat-api/ws/{room}` +- [ ] Messages published to Redis channels +- [ ] Redis subscriber broadcasts to connected clients +- [ ] Room-targeted messages only reach room members +- [ ] Global messages reach all connected clients +- [ ] Connection stats endpoint at `/api/chat-api/ws/stats` +- [ ] Graceful shutdown of WebSocket connections +- [ ] Tests cover WebSocket handler and Redis integration + +## Out of Scope + +- Message persistence (database storage) +- Message history retrieval +- User authentication enforcement (auth remains optional) +- Rate limiting +- Message encryption diff --git a/.sdlc/features/websocket-chat/tasks.md b/.sdlc/features/websocket-chat/tasks.md new file mode 100644 index 0000000..89a2594 --- /dev/null +++ b/.sdlc/features/websocket-chat/tasks.md @@ -0,0 +1,142 @@ +# Implementation Tasks: WebSocket Chat with Redis Pub/Sub + +## Task 1: Add Redis Configuration + +**ID:** `redis-config` +**Scope:** Add Redis URL to service configuration + +**Files:** +- `services/chat-api/internal/config/config.go` +- `services/chat-api/.env.example` + +**Changes:** +1. Add `RedisURL string` field to Config struct +2. Read from `REDIS_URL` environment variable +3. Add `REDIS_URL` to `.env.example` with comment + +**Acceptance:** +- Config struct has RedisURL field +- Load() reads REDIS_URL from environment +- .env.example documents the configuration + +--- + +## Task 2: Initialize Hub and Broadcaster + +**ID:** `hub-init` +**Blocked By:** `redis-config` +**Scope:** Initialize realtime components in main.go + +**Files:** +- `services/chat-api/cmd/server/main.go` + +**Changes:** +1. Add context with cancel for graceful shutdown +2. Create LocalHub and start event loop +3. Create RedisBroadcaster if RedisURL configured +4. Update RegisterRoutes call to pass hub and broadcaster +5. Register shutdown hook to cancel context + +**Acceptance:** +- Hub event loop runs in goroutine +- Redis broadcaster runs if configured +- Context cancelled on shutdown +- Service starts without errors + +--- + +## Task 3: Mount WebSocket Handler + +**ID:** `ws-routes` +**Blocked By:** `hub-init` +**Scope:** Register WebSocket routes in routes.go + +**Files:** +- `services/chat-api/internal/api/routes.go` + +**Changes:** +1. Update RegisterRoutes signature to accept Hub and Broadcaster +2. Create realtime.Handler with configuration +3. Mount handler at `/ws` path under `/api/chat-api` +4. Add stats endpoint at `/ws/stats` + +**Acceptance:** +- WebSocket upgrade works at `/api/chat-api/ws` +- Room-based endpoint at `/api/chat-api/ws/{room}` +- Stats endpoint returns connection count +- Auth middleware applied when AUTH_ENABLED=true + +--- + +## Task 4: Add WebSocket OpenAPI Documentation + +**ID:** `ws-openapi` +**Blocked By:** `ws-routes` +**Scope:** Document WebSocket endpoints in OpenAPI spec + +**Files:** +- `services/chat-api/internal/api/spec.go` + +**Changes:** +1. Add `/ws` endpoint documentation (upgrade endpoint) +2. Add `/ws/{room}` endpoint documentation +3. Add `/ws/stats` endpoint with response schema +4. Document WebSocket message format in schemas + +**Acceptance:** +- OpenAPI spec includes WebSocket endpoints +- Stats endpoint has proper response schema +- Message format documented + +--- + +## Task 5: Create WebSocket Integration Tests + +**ID:** `ws-tests` +**Blocked By:** `ws-routes` +**Scope:** Test WebSocket handler functionality + +**Files:** +- `services/chat-api/internal/api/handlers/ws_test.go` + +**Tests:** +1. WebSocket connection upgrade succeeds +2. Room join via URL parameter works +3. Room join via query parameter works +4. Message broadcast to room members +5. Global message broadcast to all clients +6. Stats endpoint returns correct counts +7. Connection cleanup on disconnect + +**Acceptance:** +- All tests pass +- Coverage includes happy path and error cases +- Tests use httptest for server simulation + +--- + +## Task Summary + +| ID | Task | Blocked By | Status | +|----|------|------------|--------| +| `redis-config` | Add Redis Configuration | - | pending | +| `hub-init` | Initialize Hub and Broadcaster | redis-config | pending | +| `ws-routes` | Mount WebSocket Handler | hub-init | pending | +| `ws-openapi` | Add WebSocket OpenAPI Documentation | ws-routes | pending | +| `ws-tests` | Create WebSocket Integration Tests | ws-routes | pending | + +## Dependency Graph + +``` +redis-config + │ + ▼ + hub-init + │ + ▼ + ws-routes + │ + ├─────────┐ + ▼ ▼ +ws-openapi ws-tests +``` diff --git a/.sdlc/state.yaml b/.sdlc/state.yaml new file mode 100644 index 0000000..1e6b55d --- /dev/null +++ b/.sdlc/state.yaml @@ -0,0 +1,63 @@ +version: 1 +project: + name: workspace +active_work: + features: + - slug: websocket-chat + branch: feature/websocket-chat + phase: implementation +blocked: [] +last_updated: 2026-02-05T21:54:29.240473436Z +last_action: TRANSITION +last_actor: cli +history: + - timestamp: 2026-02-05T21:52:07.329080213Z + action: CREATE_FEATURE + feature: websocket-chat + actor: cli + result: success + - timestamp: 2026-02-05T21:53:27.872711534Z + action: APPROVE_ARTIFACT + feature: websocket-chat + actor: user + result: success + - timestamp: 2026-02-05T21:53:28.83345252Z + action: APPROVE_ARTIFACT + feature: websocket-chat + actor: user + result: success + - timestamp: 2026-02-05T21:53:29.394664165Z + action: APPROVE_ARTIFACT + feature: websocket-chat + actor: user + result: success + - timestamp: 2026-02-05T21:53:53.812877094Z + action: TRANSITION + feature: websocket-chat + actor: cli + result: success + - timestamp: 2026-02-05T21:54:13.065523174Z + action: APPROVE_ARTIFACT + feature: websocket-chat + actor: user + result: success + - timestamp: 2026-02-05T21:54:20.24969848Z + action: TRANSITION + feature: websocket-chat + actor: cli + result: success + - timestamp: 2026-02-05T21:54:23.438644211Z + action: CREATE_BRANCH + feature: websocket-chat + actor: cli + result: success + - timestamp: 2026-02-05T21:54:29.235335345Z + action: TRANSITION + feature: websocket-chat + actor: cli + result: success + - timestamp: 2026-02-05T21:54:29.240466754Z + action: TRANSITION + feature: websocket-chat + actor: cli + result: success diff --git a/services/chat-api/.env.example b/services/chat-api/.env.example index 21fbd4c..9000001 100644 --- a/services/chat-api/.env.example +++ b/services/chat-api/.env.example @@ -19,3 +19,6 @@ JWT_SECRET=dev-secret-change-in-production # Database (if needed) DATABASE_URL=postgres://dev:dev@localhost:5432/sp3-solo-1770327084?sslmode=disable + +# Redis (for WebSocket cross-pod broadcasting, empty = local-only mode) +REDIS_URL=redis://localhost:6379/0 diff --git a/services/chat-api/cmd/server/main.go b/services/chat-api/cmd/server/main.go index efca31f..4dc2eef 100644 --- a/services/chat-api/cmd/server/main.go +++ b/services/chat-api/cmd/server/main.go @@ -2,10 +2,16 @@ package main import ( + "context" + + "github.com/redis/go-redis/v9" + "git.threesix.ai/jordan/sp3-solo-1770327084/pkg/app" "git.threesix.ai/jordan/sp3-solo-1770327084/pkg/logging" + "git.threesix.ai/jordan/sp3-solo-1770327084/pkg/realtime" "git.threesix.ai/jordan/sp3-solo-1770327084/services/chat-api/internal/adapter/memory" "git.threesix.ai/jordan/sp3-solo-1770327084/services/chat-api/internal/api" + "git.threesix.ai/jordan/sp3-solo-1770327084/services/chat-api/internal/config" "git.threesix.ai/jordan/sp3-solo-1770327084/services/chat-api/internal/service" ) @@ -13,17 +19,48 @@ func main() { // Create logger logger := logging.Default() + // Load configuration + cfg := config.Load() + + // Create context for graceful shutdown of background goroutines + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Create adapters (repositories) exampleRepo := memory.NewExampleRepository() // Create services (business logic) exampleService := service.NewExampleService(exampleRepo, logger) + // Create WebSocket hub and start event loop + hub := realtime.NewHub(logger) + go hub.Run(ctx) + + // Create Redis broadcaster if configured + var broadcaster realtime.Broadcaster + if cfg.RedisURL != "" { + opt, err := redis.ParseURL(cfg.RedisURL) + if err != nil { + logger.Error("invalid REDIS_URL", "error", err) + } else { + redisClient := redis.NewClient(opt) + broadcaster = realtime.NewRedisBroadcaster(redisClient, hub, logger) + go broadcaster.Run(ctx) + logger.Info("redis broadcaster enabled", "url", cfg.RedisURL) + } + } + // Create application application := app.New("chat-api", app.WithDefaultPort(8001)) + // Register shutdown hook to cancel context + application.OnShutdown(func(_ context.Context) error { + cancel() + return nil + }) + // Register routes with dependency injection - api.RegisterRoutes(application, exampleService) + api.RegisterRoutes(application, exampleService, hub, broadcaster) // Start server application.Run() diff --git a/services/chat-api/internal/api/handlers/ws_test.go b/services/chat-api/internal/api/handlers/ws_test.go new file mode 100644 index 0000000..f2fddcb --- /dev/null +++ b/services/chat-api/internal/api/handlers/ws_test.go @@ -0,0 +1,386 @@ +package handlers + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + "github.com/go-chi/chi/v5" + "github.com/gorilla/websocket" + + "git.threesix.ai/jordan/sp3-solo-1770327084/pkg/httpresponse" + "git.threesix.ai/jordan/sp3-solo-1770327084/pkg/logging" + "git.threesix.ai/jordan/sp3-solo-1770327084/pkg/realtime" +) + +// testHub wraps LocalHub for testing. +func newTestHub(t *testing.T) *realtime.LocalHub { + t.Helper() + hub := realtime.NewHub(logging.Nop()) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + go hub.Run(ctx) + return hub +} + +// testServer creates an HTTP test server with WebSocket handler. +func newTestServer(t *testing.T, hub realtime.Hub) *httptest.Server { + t.Helper() + + wsHandler := realtime.NewHandler(hub, logging.Nop(), realtime.HandlerConfig{ + Broadcaster: nil, // No Redis for unit tests + AuthRequired: false, + }) + + r := chi.NewRouter() + r.Mount("/ws", wsHandler.Routes()) + r.Get("/ws/stats", func(w http.ResponseWriter, r *http.Request) { + stats := wsHandler.GetStats() + httpresponse.OK(w, r, stats) + }) + + return httptest.NewServer(r) +} + +// wsURL converts http:// URL to ws:// URL. +func wsURL(server *httptest.Server, path string) string { + return "ws" + strings.TrimPrefix(server.URL, "http") + path +} + +func TestWebSocket_Upgrade(t *testing.T) { + hub := newTestHub(t) + server := newTestServer(t, hub) + defer server.Close() + + // Connect via WebSocket + conn, resp, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws"), nil) + if err != nil { + t.Fatalf("failed to connect: %v", err) + } + defer conn.Close() + + if resp.StatusCode != http.StatusSwitchingProtocols { + t.Errorf("expected status 101, got %d", resp.StatusCode) + } + + // Give hub time to register + time.Sleep(50 * time.Millisecond) + + if hub.ConnectionCount() != 1 { + t.Errorf("expected 1 connection, got %d", hub.ConnectionCount()) + } +} + +func TestWebSocket_RoomJoinViaURL(t *testing.T) { + hub := newTestHub(t) + server := newTestServer(t, hub) + defer server.Close() + + // Connect to specific room via URL path + conn, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws/test-room"), nil) + if err != nil { + t.Fatalf("failed to connect: %v", err) + } + defer conn.Close() + + // Give hub time to register and join room + time.Sleep(50 * time.Millisecond) + + if hub.RoomCount("test-room") != 1 { + t.Errorf("expected 1 client in room, got %d", hub.RoomCount("test-room")) + } +} + +func TestWebSocket_RoomJoinViaQuery(t *testing.T) { + hub := newTestHub(t) + server := newTestServer(t, hub) + defer server.Close() + + // Connect with room query parameter + conn, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws?room=query-room"), nil) + if err != nil { + t.Fatalf("failed to connect: %v", err) + } + defer conn.Close() + + // Give hub time to register and join room + time.Sleep(50 * time.Millisecond) + + if hub.RoomCount("query-room") != 1 { + t.Errorf("expected 1 client in query-room, got %d", hub.RoomCount("query-room")) + } +} + +func TestWebSocket_MessageBroadcast(t *testing.T) { + hub := newTestHub(t) + server := newTestServer(t, hub) + defer server.Close() + + // Connect two clients to the same room + conn1, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws/broadcast-room"), nil) + if err != nil { + t.Fatalf("client 1 failed to connect: %v", err) + } + defer conn1.Close() + + conn2, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws/broadcast-room"), nil) + if err != nil { + t.Fatalf("client 2 failed to connect: %v", err) + } + defer conn2.Close() + + // Give hub time to register both clients + time.Sleep(50 * time.Millisecond) + + if hub.RoomCount("broadcast-room") != 2 { + t.Fatalf("expected 2 clients in room, got %d", hub.RoomCount("broadcast-room")) + } + + // Send a message from client 1 + msg := realtime.Message{ + Type: realtime.MessageTypeChat, + Room: "broadcast-room", + Data: json.RawMessage(`{"text":"hello"}`), + } + if err := conn1.WriteJSON(msg); err != nil { + t.Fatalf("failed to send message: %v", err) + } + + // Both clients should receive the message + var wg sync.WaitGroup + wg.Add(2) + + readMessage := func(conn *websocket.Conn, name string) { + defer wg.Done() + conn.SetReadDeadline(time.Now().Add(time.Second)) + var received realtime.Message + if err := conn.ReadJSON(&received); err != nil { + t.Errorf("%s failed to read message: %v", name, err) + return + } + if received.Type != realtime.MessageTypeChat { + t.Errorf("%s: expected type 'chat', got %s", name, received.Type) + } + } + + go readMessage(conn1, "client1") + go readMessage(conn2, "client2") + wg.Wait() +} + +func TestWebSocket_GlobalBroadcast(t *testing.T) { + hub := newTestHub(t) + server := newTestServer(t, hub) + defer server.Close() + + // Connect two clients without room (global) + conn1, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws"), nil) + if err != nil { + t.Fatalf("client 1 failed to connect: %v", err) + } + defer conn1.Close() + + conn2, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws"), nil) + if err != nil { + t.Fatalf("client 2 failed to connect: %v", err) + } + defer conn2.Close() + + // Give hub time to register + time.Sleep(50 * time.Millisecond) + + if hub.ConnectionCount() != 2 { + t.Fatalf("expected 2 connections, got %d", hub.ConnectionCount()) + } + + // Send a global message (no room) + msg := realtime.Message{ + Type: realtime.MessageTypeChat, + Data: json.RawMessage(`{"text":"global message"}`), + } + if err := conn1.WriteJSON(msg); err != nil { + t.Fatalf("failed to send message: %v", err) + } + + // Both clients should receive + var wg sync.WaitGroup + wg.Add(2) + + readMessage := func(conn *websocket.Conn, name string) { + defer wg.Done() + conn.SetReadDeadline(time.Now().Add(time.Second)) + var received realtime.Message + if err := conn.ReadJSON(&received); err != nil { + t.Errorf("%s failed to read message: %v", name, err) + return + } + } + + go readMessage(conn1, "client1") + go readMessage(conn2, "client2") + wg.Wait() +} + +func TestWebSocket_Stats(t *testing.T) { + hub := newTestHub(t) + server := newTestServer(t, hub) + defer server.Close() + + // Check stats before any connections + resp, err := http.Get(server.URL + "/ws/stats") + if err != nil { + t.Fatalf("failed to get stats: %v", err) + } + resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + t.Errorf("expected status 200, got %d", resp.StatusCode) + } + + // Connect a client + conn, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws"), nil) + if err != nil { + t.Fatalf("failed to connect: %v", err) + } + defer conn.Close() + + // Give hub time to register + time.Sleep(50 * time.Millisecond) + + // Check stats after connection + resp, err = http.Get(server.URL + "/ws/stats") + if err != nil { + t.Fatalf("failed to get stats: %v", err) + } + defer resp.Body.Close() + + var result map[string]any + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + t.Fatalf("failed to decode stats: %v", err) + } + + data, ok := result["data"].(map[string]any) + if !ok { + t.Fatal("expected 'data' field in response") + } + + count, ok := data["total_connections"].(float64) + if !ok { + t.Fatal("expected 'total_connections' in data") + } + + if int(count) != 1 { + t.Errorf("expected 1 connection in stats, got %d", int(count)) + } +} + +func TestWebSocket_Disconnect(t *testing.T) { + hub := newTestHub(t) + server := newTestServer(t, hub) + defer server.Close() + + // Connect + conn, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws"), nil) + if err != nil { + t.Fatalf("failed to connect: %v", err) + } + + // Give hub time to register + time.Sleep(50 * time.Millisecond) + + if hub.ConnectionCount() != 1 { + t.Fatalf("expected 1 connection, got %d", hub.ConnectionCount()) + } + + // Close connection + conn.Close() + + // Give hub time to unregister + time.Sleep(100 * time.Millisecond) + + if hub.ConnectionCount() != 0 { + t.Errorf("expected 0 connections after disconnect, got %d", hub.ConnectionCount()) + } +} + +func TestWebSocket_PingPong(t *testing.T) { + hub := newTestHub(t) + server := newTestServer(t, hub) + defer server.Close() + + conn, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws"), nil) + if err != nil { + t.Fatalf("failed to connect: %v", err) + } + defer conn.Close() + + // Send ping message + msg := realtime.Message{ + Type: realtime.MessageTypePing, + } + if err := conn.WriteJSON(msg); err != nil { + t.Fatalf("failed to send ping: %v", err) + } + + // Should receive pong + conn.SetReadDeadline(time.Now().Add(time.Second)) + var pong realtime.Message + if err := conn.ReadJSON(&pong); err != nil { + t.Fatalf("failed to read pong: %v", err) + } + + if pong.Type != realtime.MessageTypePong { + t.Errorf("expected pong message, got %s", pong.Type) + } +} + +func TestWebSocket_RoomIsolation(t *testing.T) { + hub := newTestHub(t) + server := newTestServer(t, hub) + defer server.Close() + + // Connect client to room A + connA, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws/room-a"), nil) + if err != nil { + t.Fatalf("client A failed to connect: %v", err) + } + defer connA.Close() + + // Connect client to room B + connB, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws/room-b"), nil) + if err != nil { + t.Fatalf("client B failed to connect: %v", err) + } + defer connB.Close() + + // Give hub time to register + time.Sleep(50 * time.Millisecond) + + // Send message to room A + msg := realtime.Message{ + Type: realtime.MessageTypeChat, + Room: "room-a", + Data: json.RawMessage(`{"text":"room A only"}`), + } + if err := connA.WriteJSON(msg); err != nil { + t.Fatalf("failed to send message: %v", err) + } + + // Client A should receive it + connA.SetReadDeadline(time.Now().Add(500 * time.Millisecond)) + var received realtime.Message + if err := connA.ReadJSON(&received); err != nil { + t.Errorf("client A should receive message: %v", err) + } + + // Client B should NOT receive it (timeout expected) + connB.SetReadDeadline(time.Now().Add(200 * time.Millisecond)) + if err := connB.ReadJSON(&received); err == nil { + t.Error("client B should NOT receive room-a message") + } +} diff --git a/services/chat-api/internal/api/routes.go b/services/chat-api/internal/api/routes.go index 84054d1..1a33eee 100644 --- a/services/chat-api/internal/api/routes.go +++ b/services/chat-api/internal/api/routes.go @@ -2,8 +2,12 @@ package api import ( + "net/http" + "git.threesix.ai/jordan/sp3-solo-1770327084/pkg/app" "git.threesix.ai/jordan/sp3-solo-1770327084/pkg/auth" + "git.threesix.ai/jordan/sp3-solo-1770327084/pkg/httpresponse" + "git.threesix.ai/jordan/sp3-solo-1770327084/pkg/realtime" "git.threesix.ai/jordan/sp3-solo-1770327084/services/chat-api/internal/api/handlers" "git.threesix.ai/jordan/sp3-solo-1770327084/services/chat-api/internal/config" "git.threesix.ai/jordan/sp3-solo-1770327084/services/chat-api/internal/service" @@ -14,7 +18,13 @@ import ( // This allows the monorepo to expose multiple services under a single domain: // - https://domain/api/chat-api/health // - https://domain/api/chat-api/examples -func RegisterRoutes(application *app.App, exampleService *service.ExampleService) { +// - https://domain/api/chat-api/ws (WebSocket) +func RegisterRoutes( + application *app.App, + exampleService *service.ExampleService, + hub realtime.Hub, + broadcaster realtime.Broadcaster, +) { logger := application.Logger() cfg := config.Load() @@ -22,6 +32,12 @@ func RegisterRoutes(application *app.App, exampleService *service.ExampleService healthHandler := handlers.NewHealth(logger) exampleHandler := handlers.NewExample(exampleService, logger) + // Initialize WebSocket handler + wsHandler := realtime.NewHandler(hub, logger, realtime.HandlerConfig{ + Broadcaster: broadcaster, + AuthRequired: cfg.AuthEnabled, + }) + // Build and mount OpenAPI spec spec := NewServiceSpec() application.EnableDocs(spec) @@ -35,6 +51,17 @@ func RegisterRoutes(application *app.App, exampleService *service.ExampleService r.Get("/examples", app.Wrap(exampleHandler.List)) r.Get("/examples/{id}", app.Wrap(exampleHandler.Get)) + // WebSocket routes + // GET /ws - connect to global channel + // GET /ws/{room} - connect and join a room + r.Mount("/ws", wsHandler.Routes()) + + // WebSocket stats endpoint + r.Get("/ws/stats", func(w http.ResponseWriter, r *http.Request) { + stats := wsHandler.GetStats() + httpresponse.OK(w, r, stats) + }) + // Protected routes (auth required when enabled) r.Group(func(r app.Router) { if cfg.AuthEnabled { diff --git a/services/chat-api/internal/api/spec.go b/services/chat-api/internal/api/spec.go index 12cad43..15c19e7 100644 --- a/services/chat-api/internal/api/spec.go +++ b/services/chat-api/internal/api/spec.go @@ -8,7 +8,8 @@ func NewServiceSpec() *openapi.OpenAPISpec { WithDescription("REST API for the chat-api service"). WithBearerSecurity("bearer", "JWT authentication token"). WithTag("Health", "Service health endpoints"). - WithTag("Examples", "Example CRUD endpoints") + WithTag("Examples", "Example CRUD endpoints"). + WithTag("WebSocket", "Real-time WebSocket endpoints") // Define reusable schemas spec.WithSchema("Example", openapi.Object(map[string]openapi.Schema{ @@ -108,5 +109,65 @@ func NewServiceSpec() *openapi.OpenAPISpec { }, }) + // WebSocket schemas + spec.WithSchema("WebSocketMessage", openapi.Object(map[string]openapi.Schema{ + "id": openapi.UUID().WithDescription("Unique message identifier"), + "type": openapi.String().WithDescription("Message type: chat, presence, notification, system, error"), + "room": openapi.String().WithDescription("Target room (empty for global messages)"), + "from": openapi.String().WithDescription("Sender's client ID"), + "data": openapi.Object(nil).WithDescription("Message payload"), + "timestamp": openapi.DateTime().WithDescription("Message timestamp"), + }, "type")) + + spec.WithSchema("WebSocketStats", openapi.Object(map[string]openapi.Schema{ + "total_connections": openapi.Integer().WithDescription("Total active WebSocket connections"), + "room_counts": openapi.Object(nil).WithDescription("Connection count per room"), + }, "total_connections")) + + // WebSocket upgrade endpoint (documented for reference, actual upgrade is protocol switch) + spec.AddPath("/api/chat-api/ws", "get", map[string]any{ + "summary": "WebSocket connection", + "description": "Upgrades HTTP to WebSocket for real-time chat. Connect to global channel.", + "tags": []string{"WebSocket"}, + "responses": map[string]any{ + "101": map[string]any{ + "description": "Switching Protocols - WebSocket upgrade successful", + }, + "401": openapi.OpResponse("Unauthorized (when AUTH_ENABLED=true)", openapi.ErrorResponseSchema()), + }, + }) + + // WebSocket with room + spec.AddPath("/api/chat-api/ws/{room}", "get", map[string]any{ + "summary": "WebSocket connection with room", + "description": "Upgrades HTTP to WebSocket and joins the specified room.", + "tags": []string{"WebSocket"}, + "parameters": []any{ + map[string]any{ + "name": "room", + "in": "path", + "description": "Room name to join on connect", + "required": true, + "schema": openapi.String(), + }, + }, + "responses": map[string]any{ + "101": map[string]any{ + "description": "Switching Protocols - WebSocket upgrade successful", + }, + "401": openapi.OpResponse("Unauthorized (when AUTH_ENABLED=true)", openapi.ErrorResponseSchema()), + }, + }) + + // WebSocket stats endpoint + spec.AddPath("/api/chat-api/ws/stats", "get", map[string]any{ + "summary": "WebSocket statistics", + "description": "Returns current WebSocket connection statistics.", + "tags": []string{"WebSocket"}, + "responses": map[string]any{ + "200": openapi.OpResponse("Success", openapi.ResponseSchema(openapi.Ref("WebSocketStats"))), + }, + }) + return spec } diff --git a/services/chat-api/internal/config/config.go b/services/chat-api/internal/config/config.go index 3749cba..bf54b10 100644 --- a/services/chat-api/internal/config/config.go +++ b/services/chat-api/internal/config/config.go @@ -18,6 +18,9 @@ type Config struct { // Auth AuthEnabled bool JWTSecret string + + // Redis + RedisURL string } // Load reads configuration from environment variables. @@ -30,5 +33,7 @@ func Load() *Config { AuthEnabled: strings.EqualFold(os.Getenv("AUTH_ENABLED"), "true"), JWTSecret: os.Getenv("JWT_SECRET"), + + RedisURL: os.Getenv("REDIS_URL"), } }