diff --git a/.sdlc/branches/feature/websocket-chat.yaml b/.sdlc/branches/feature/websocket-chat.yaml new file mode 100644 index 0000000..c4445f6 --- /dev/null +++ b/.sdlc/branches/feature/websocket-chat.yaml @@ -0,0 +1,4 @@ +name: feature/websocket-chat +feature: websocket-chat +base_branch: main +created_at: 2026-02-05T21:46:00.799440711Z diff --git a/.sdlc/config.yaml b/.sdlc/config.yaml new file mode 100644 index 0000000..0d9e993 --- /dev/null +++ b/.sdlc/config.yaml @@ -0,0 +1,36 @@ +version: 1 +project: + name: workspace +branches: + main: main + feature_prefix: feature/ +phases: + enabled: + - draft + - specified + - planned + - ready + - implementation + - review + - audit + - qa + - merge + - released + required_artifacts: + audit: + - audit + planned: + - spec + - design + - tasks + - qa_plan + qa: + - qa_results + review: + - review + specified: + - spec +compliance: + require_approvals: true + require_branch: true + require_qa: true diff --git a/.sdlc/features/websocket-chat/design.md b/.sdlc/features/websocket-chat/design.md new file mode 100644 index 0000000..0edd2d5 --- /dev/null +++ b/.sdlc/features/websocket-chat/design.md @@ -0,0 +1,210 @@ +# Technical Design: WebSocket Chat + +## Architecture Overview + +This feature integrates the existing `pkg/realtime` package into the `chat-api` service to provide WebSocket chat functionality with Redis pub/sub for horizontal scaling. + +``` +┌─────────────┐ ┌─────────────┐ ┌─────────────┐ +│ Client 1 │ │ Client 2 │ │ Client N │ +└──────┬──────┘ └──────┬──────┘ └──────┬──────┘ + │ WebSocket │ WebSocket │ WebSocket + ▼ ▼ ▼ +┌──────────────────────────────────────────────────────┐ +│ chat-api Pod 1 │ +│ ┌─────────────────────────────────────────────────┐ │ +│ │ realtime.Handler │ │ +│ │ ┌─────────────────────────────────────────┐ │ │ +│ │ │ LocalHub │ │ │ +│ │ │ - connections map[id]Connection │ │ │ +│ │ │ - rooms map[room]map[id]struct{} │ │ │ +│ │ └─────────────────────────────────────────┘ │ │ +│ └─────────────────────────────────────────────────┘ │ +│ │ │ +│ ┌─────────────────────────────────────────────────┐ │ +│ │ RedisBroadcaster │ │ +│ │ - Publish() → Redis channel │ │ +│ │ - Run() ← Redis PSubscribe │ │ +│ └─────────────────────────────────────────────────┘ │ +└──────────────────────────┬───────────────────────────┘ + │ + ┌────────────┴────────────┐ + │ Redis Pub/Sub │ + │ realtime:global │ + │ realtime:room:{name} │ + └────────────┬────────────┘ + │ +┌──────────────────────────┴───────────────────────────┐ +│ chat-api Pod 2 │ +│ (same structure) │ +└──────────────────────────────────────────────────────┘ +``` + +## Component Design + +### 1. Configuration Extension + +Extend `internal/config/config.go` to include Redis configuration: + +```go +type Config struct { + // ... existing fields ... + + // Redis + RedisURL string +} + +func Load() *Config { + return &Config{ + // ... existing ... + RedisURL: getEnv("REDIS_URL", "redis://localhost:6379"), + } +} +``` + +### 2. Service Bootstrap (main.go) + +Wire up the realtime components in the service entry point: + +```go +func main() { + cfg := config.Load() + ctx := context.Background() + + // Create hub (local connection registry) + hub := realtime.NewHub(logger) + go hub.Run(ctx) + + // Create Redis broadcaster (for multi-pod) + var broadcaster realtime.Broadcaster + if cfg.RedisURL != "" { + redisClient := redis.NewClient(parseRedisURL(cfg.RedisURL)) + broadcaster = realtime.NewRedisBroadcaster(redisClient, hub, logger) + go broadcaster.Run(ctx) + } + + // Register routes with realtime handler + api.RegisterRoutes(application, exampleService, hub, broadcaster) +} +``` + +### 3. Route Registration + +Add WebSocket routes in `internal/api/routes.go`: + +```go +func RegisterRoutes( + application *app.App, + exampleService *service.ExampleService, + hub realtime.Hub, + broadcaster realtime.Broadcaster, +) { + // ... existing routes ... + + // WebSocket handler + wsHandler := realtime.NewHandler(hub, logger, realtime.HandlerConfig{ + Broadcaster: broadcaster, + OnConnect: func(conn realtime.Connection) { + logger.Info("client connected", "id", conn.ID()) + }, + OnDisconnect: func(conn realtime.Connection) { + logger.Info("client disconnected", "id", conn.ID()) + }, + OnMessage: func(conn realtime.Connection, msg *realtime.Message) *realtime.Message { + // Set message type if not specified + if msg.Type == "" { + msg.Type = realtime.MessageTypeChat + } + return msg + }, + AuthRequired: cfg.AuthEnabled, + }) + + // Mount WebSocket routes + application.Mount("/api/chat-api/ws", wsHandler.Routes()) +} +``` + +### 4. Message Flow + +1. **Client → Server (Incoming)** + - Client sends JSON message via WebSocket + - `WSClient.readPump()` decodes message + - `OnMessage` callback transforms/filters + - `RedisBroadcaster.Publish()` sends to Redis channel + +2. **Redis → All Pods (Distribution)** + - Redis delivers to all subscribed pods + - `RedisBroadcaster.Run()` receives via PSubscribe + - Filters out messages from same pod (echo prevention) + - Forwards to `LocalHub.Broadcast()` + +3. **Server → Clients (Outgoing)** + - `LocalHub.doBroadcast()` iterates room connections + - `WSClient.Send()` queues message + - `WSClient.writePump()` writes to WebSocket + +### 5. Redis Channel Naming + +| Pattern | Usage | +|---------|-------| +| `realtime:global` | Messages without room (broadcast to all) | +| `realtime:room:{name}` | Room-specific messages | + +### 6. Graceful Shutdown + +The existing `pkg/realtime` package handles: +- Context cancellation triggers hub shutdown +- Hub closes all connections +- WSClient sends close frame +- Redis broadcaster closes subscription + +## File Changes + +| File | Change | +|------|--------| +| `services/chat-api/internal/config/config.go` | Add `RedisURL` field | +| `services/chat-api/cmd/server/main.go` | Wire hub, broadcaster, Redis client | +| `services/chat-api/internal/api/routes.go` | Mount WebSocket handler | +| `services/chat-api/internal/api/spec.go` | Add WebSocket endpoint documentation | + +## Testing Strategy + +### Unit Tests +- Config loading with Redis URL +- Message transformation in OnMessage callback + +### Integration Tests +- WebSocket connection upgrade +- Message send/receive flow +- Room-based isolation +- Multi-client broadcast + +### Manual Testing +```bash +# Terminal 1: Start server +./scripts/dev.sh + +# Terminal 2: Connect with wscat +wscat -c ws://localhost:8001/api/chat-api/ws + +# Terminal 3: Another client +wscat -c ws://localhost:8001/api/chat-api/ws + +# Send message from Terminal 2, verify received in Terminal 3 +{"type":"chat","data":{"content":"Hello!"}} +``` + +## Security Considerations + +1. **Authentication**: Optional auth via `AUTH_ENABLED` config +2. **Origin Check**: WebSocket upgrader validates Origin header (configurable) +3. **Message Size**: 64KB limit prevents memory exhaustion +4. **Buffer Limits**: 256-message buffer drops excess (prevents slow-client backpressure) + +## Rollout Plan + +1. Deploy with `REDIS_URL` set to enable cross-pod messaging +2. Without `REDIS_URL`, operates in single-pod mode (local broadcast only) +3. No database migrations required +4. Feature is additive (no breaking changes) diff --git a/.sdlc/features/websocket-chat/manifest.yaml b/.sdlc/features/websocket-chat/manifest.yaml new file mode 100644 index 0000000..e46fb5d --- /dev/null +++ b/.sdlc/features/websocket-chat/manifest.yaml @@ -0,0 +1,50 @@ +slug: websocket-chat +title: WebSocket Chat +created: 2026-02-05T21:41:24.394924276Z +branch: feature/websocket-chat +phase: implementation +phase_history: + - phase: draft + entered: 2026-02-05T21:41:24.394924276Z + exited: 2026-02-05T21:45:16.70675913Z + - phase: specified + entered: 2026-02-05T21:45:16.70675913Z + exited: 2026-02-05T21:45:55.781510539Z + - phase: planned + entered: 2026-02-05T21:45:55.781510539Z + exited: 2026-02-05T21:46:05.885128246Z + - phase: ready + entered: 2026-02-05T21:46:05.885128246Z + exited: 2026-02-05T21:46:05.895747946Z + - phase: implementation + entered: 2026-02-05T21:46:05.895747946Z +artifacts: + audit: + status: pending + path: audit.md + design: + status: approved + path: design.md + approved_by: user + approved_at: 2026-02-05T21:44:58.283726232Z + qa_plan: + status: approved + path: qa-plan.md + approved_by: user + approved_at: 2026-02-05T21:45:51.612922454Z + qa_results: + status: pending + path: qa-results.md + review: + status: pending + path: review.md + spec: + status: approved + path: spec.md + approved_by: user + approved_at: 2026-02-05T21:44:57.405079826Z + tasks: + status: approved + path: tasks.md + approved_by: user + approved_at: 2026-02-05T21:44:59.042871875Z diff --git a/.sdlc/features/websocket-chat/qa-plan.md b/.sdlc/features/websocket-chat/qa-plan.md new file mode 100644 index 0000000..39a4c24 --- /dev/null +++ b/.sdlc/features/websocket-chat/qa-plan.md @@ -0,0 +1,38 @@ +# QA Plan: WebSocket Chat + +## Test Scope + +### Unit Tests +| Test | File | Description | +|------|------|-------------| +| Config loading | `internal/config/config_test.go` | Verify RedisURL loads from env | + +### Integration Tests +| Test | File | Description | +|------|------|-------------| +| WebSocket upgrade | `internal/api/handlers/websocket_test.go` | Verify HTTP→WS upgrade | +| Message broadcast | `internal/api/handlers/websocket_test.go` | Verify messages reach all clients | +| Room isolation | `internal/api/handlers/websocket_test.go` | Verify room messages stay in room | +| Ping/pong | `internal/api/handlers/websocket_test.go` | Verify heartbeat mechanism | + +### Manual Tests +| Test | Steps | Expected | +|------|-------|----------| +| Basic chat | 1. Start server
2. Connect two wscat clients
3. Send message from one | Other client receives message | +| Room chat | 1. Connect clients to `/ws/room1`
2. Connect client to `/ws/room2`
3. Send to room1 | Only room1 clients receive | +| Disconnect | 1. Connect client
2. Kill client
3. Check server logs | Clean disconnect logged | + +## Acceptance Criteria +- [ ] WebSocket endpoint accessible at `/api/chat-api/ws` +- [ ] Messages broadcast to all connected clients +- [ ] Room-based messaging isolates conversations +- [ ] Connection lifecycle properly managed + +## Test Commands +```bash +# Run all Go tests +go test ./services/chat-api/... -v + +# Manual WebSocket test +wscat -c ws://localhost:8001/api/chat-api/ws +``` diff --git a/.sdlc/features/websocket-chat/spec.md b/.sdlc/features/websocket-chat/spec.md new file mode 100644 index 0000000..bf8f469 --- /dev/null +++ b/.sdlc/features/websocket-chat/spec.md @@ -0,0 +1,105 @@ +# Feature Specification: WebSocket Chat + +## Overview + +Add real-time WebSocket chat capability to the chat-api service. This feature enables bidirectional communication between clients and the server, with message distribution via Redis pub/sub for horizontal scaling across multiple pods. + +## Requirements + +### Core Requirements + +1. **WebSocket Endpoint**: `GET /ws` upgrades HTTP connections to WebSocket +2. **Message Publication**: Incoming messages from clients are published to a Redis channel +3. **Message Broadcasting**: A Redis subscriber broadcasts received messages to all connected clients + +### Functional Requirements + +| ID | Requirement | Priority | +|----|-------------|----------| +| FR-1 | WebSocket endpoint at `GET /api/chat-api/ws` upgrades HTTP to WebSocket | Must | +| FR-2 | WebSocket endpoint at `GET /api/chat-api/ws/{room}` allows joining specific rooms | Must | +| FR-3 | Incoming chat messages are published to Redis pub/sub channel | Must | +| FR-4 | Redis subscriber receives messages and broadcasts to all connected clients in the room | Must | +| FR-5 | Messages without a room are broadcast to all connected clients (global) | Must | +| FR-6 | Automatic ping/pong heartbeat maintains connection health | Must | +| FR-7 | Graceful connection lifecycle with proper cleanup on disconnect | Must | + +### Non-Functional Requirements + +| ID | Requirement | Priority | +|----|-------------|----------| +| NFR-1 | Connection supports 64KB max message size | Must | +| NFR-2 | 60-second pong timeout for dead connection detection | Must | +| NFR-3 | 256-message send buffer per connection | Must | +| NFR-4 | Multi-pod support via Redis pub/sub | Must | +| NFR-5 | Graceful shutdown closes all connections cleanly | Must | + +## Message Format + +Messages follow the standard `realtime.Message` structure: + +```json +{ + "id": "uuid", + "type": "chat", + "room": "room-name", + "from": "client-id", + "data": { "content": "message text" }, + "timestamp": "2024-01-15T10:30:00Z" +} +``` + +### Message Types + +- `chat` - User chat messages +- `presence` - Online/offline status changes +- `notification` - Server notifications +- `system` - System messages +- `error` - Error messages +- `ping`/`pong` - Client-initiated heartbeat + +## API Endpoints + +### WebSocket Connection + +``` +GET /api/chat-api/ws +GET /api/chat-api/ws/{room} +GET /api/chat-api/ws?room={room} +``` + +**Upgrade Headers:** +- `Connection: Upgrade` +- `Upgrade: websocket` + +**Response:** 101 Switching Protocols (on success) + +## Configuration + +| Environment Variable | Description | Default | +|---------------------|-------------|---------| +| `REDIS_URL` | Redis connection URL | `redis://localhost:6379` | +| `AUTH_ENABLED` | Require authentication for WebSocket | `false` | + +## Out of Scope + +- Message persistence/history +- Typing indicators +- Read receipts +- User presence tracking (beyond connection events) +- Rate limiting (future enhancement) + +## Success Criteria + +1. WebSocket connections can be established at `/api/chat-api/ws` +2. Messages sent by one client are received by all other connected clients +3. Room-based messaging isolates conversations +4. Multi-pod deployment correctly distributes messages via Redis +5. Connection cleanup occurs properly on disconnect + +## Dependencies + +- Existing `pkg/realtime` package (WebSocket handling, hub, Redis broadcaster) +- Redis server for pub/sub +- `github.com/gorilla/websocket` (already in go.mod) +- `github.com/redis/go-redis/v9` (already in go.mod) diff --git a/.sdlc/features/websocket-chat/tasks.md b/.sdlc/features/websocket-chat/tasks.md new file mode 100644 index 0000000..6a926a6 --- /dev/null +++ b/.sdlc/features/websocket-chat/tasks.md @@ -0,0 +1,304 @@ +# Implementation Tasks: WebSocket Chat + +## Task Overview + +| ID | Task | Status | Blocked By | +|----|------|--------|------------| +| WS-1 | Extend config with Redis URL | pending | - | +| WS-2 | Wire realtime components in main.go | pending | WS-1 | +| WS-3 | Mount WebSocket handler in routes.go | pending | WS-2 | +| WS-4 | Add WebSocket documentation to spec.go | pending | WS-3 | +| WS-5 | Add integration tests for WebSocket chat | pending | WS-3 | + +--- + +## WS-1: Extend config with Redis URL + +**Status:** pending + +### Description +Add Redis URL configuration to the chat-api service config. + +### Files to Modify +- `services/chat-api/internal/config/config.go` + +### Implementation Details + +Add to the `Config` struct: +```go +// Redis configuration for realtime pub/sub +RedisURL string +``` + +Add to the `Load()` function: +```go +RedisURL: os.Getenv("REDIS_URL"), +``` + +### Acceptance Criteria +- [ ] `Config` struct has `RedisURL` field +- [ ] `Load()` reads from `REDIS_URL` environment variable +- [ ] Empty string is valid (disables Redis, uses local-only broadcast) + +--- + +## WS-2: Wire realtime components in main.go + +**Status:** pending +**Blocked By:** WS-1 + +### Description +Initialize the realtime hub and Redis broadcaster in the service entry point. + +### Files to Modify +- `services/chat-api/cmd/server/main.go` + +### Implementation Details + +1. Import required packages: +```go +import ( + "context" + "github.com/redis/go-redis/v9" + "git.threesix.ai/jordan/sp3-verify-1770325830/pkg/realtime" + "git.threesix.ai/jordan/sp3-verify-1770325830/services/chat-api/internal/config" +) +``` + +2. Load config and create context: +```go +cfg := config.Load() +ctx := context.Background() +``` + +3. Create hub: +```go +hub := realtime.NewHub(logger) +go hub.Run(ctx) +``` + +4. Create Redis broadcaster (optional): +```go +var broadcaster realtime.Broadcaster +if cfg.RedisURL != "" { + opts, err := redis.ParseURL(cfg.RedisURL) + if err != nil { + logger.Error("invalid redis url", "error", err) + } else { + redisClient := redis.NewClient(opts) + broadcaster = realtime.NewRedisBroadcaster(redisClient, hub, logger) + go broadcaster.Run(ctx) + } +} +``` + +5. Pass hub and broadcaster to route registration: +```go +api.RegisterRoutes(application, exampleService, hub, broadcaster) +``` + +### Acceptance Criteria +- [ ] Hub is created and running in a goroutine +- [ ] Redis broadcaster is created when `REDIS_URL` is set +- [ ] Redis broadcaster is nil when `REDIS_URL` is empty +- [ ] Hub and broadcaster are passed to route registration + +--- + +## WS-3: Mount WebSocket handler in routes.go + +**Status:** pending +**Blocked By:** WS-2 + +### Description +Update the route registration to accept realtime components and mount the WebSocket handler. + +### Files to Modify +- `services/chat-api/internal/api/routes.go` + +### Implementation Details + +1. Update function signature: +```go +func RegisterRoutes( + application *app.App, + exampleService *service.ExampleService, + hub realtime.Hub, + broadcaster realtime.Broadcaster, +) { +``` + +2. Import realtime package: +```go +import "git.threesix.ai/jordan/sp3-verify-1770325830/pkg/realtime" +``` + +3. Create and configure WebSocket handler: +```go +wsHandler := realtime.NewHandler(hub, logger, realtime.HandlerConfig{ + Broadcaster: broadcaster, + OnConnect: func(conn realtime.Connection) { + logger.Info("websocket client connected", + "client_id", conn.ID(), + "user_id", conn.UserID(), + ) + }, + OnDisconnect: func(conn realtime.Connection) { + logger.Info("websocket client disconnected", + "client_id", conn.ID(), + ) + }, + OnMessage: func(conn realtime.Connection, msg *realtime.Message) *realtime.Message { + if msg.Type == "" { + msg.Type = realtime.MessageTypeChat + } + return msg + }, + AuthRequired: cfg.AuthEnabled, +}) +``` + +4. Mount WebSocket routes: +```go +application.Mount("/api/chat-api/ws", wsHandler.Routes()) +``` + +### Acceptance Criteria +- [ ] Function signature accepts `hub` and `broadcaster` parameters +- [ ] WebSocket handler is created with appropriate callbacks +- [ ] Handler is mounted at `/api/chat-api/ws` +- [ ] Auth requirement respects `AUTH_ENABLED` config + +--- + +## WS-4: Add WebSocket documentation to spec.go + +**Status:** pending +**Blocked By:** WS-3 + +### Description +Document the WebSocket endpoint in the OpenAPI specification. + +### Files to Modify +- `services/chat-api/internal/api/spec.go` + +### Implementation Details + +Add WebSocket tag: +```go +spec.WithTag("WebSocket", "Real-time WebSocket endpoints") +``` + +Add WebSocket path documentation: +```go +spec.AddPath("/api/chat-api/ws", "get", map[string]any{ + "summary": "WebSocket connection", + "description": "Upgrades to WebSocket for real-time chat. Messages are broadcast to all connected clients via Redis pub/sub.", + "tags": []string{"WebSocket"}, + "responses": map[string]any{ + "101": map[string]any{ + "description": "Switching Protocols - WebSocket upgrade successful", + }, + "401": openapi.OpResponse("Unauthorized - authentication required", nil), + }, +}) + +spec.AddPath("/api/chat-api/ws/{room}", "get", map[string]any{ + "summary": "WebSocket connection to room", + "description": "Upgrades to WebSocket and joins the specified room. Messages are broadcast only to clients in the same room.", + "tags": []string{"WebSocket"}, + "parameters": []map[string]any{ + { + "name": "room", + "in": "path", + "required": true, + "description": "Room identifier to join", + "schema": map[string]any{"type": "string"}, + }, + }, + "responses": map[string]any{ + "101": map[string]any{ + "description": "Switching Protocols - WebSocket upgrade successful", + }, + "401": openapi.OpResponse("Unauthorized - authentication required", nil), + }, +}) +``` + +### Acceptance Criteria +- [ ] WebSocket tag added to spec +- [ ] `/api/chat-api/ws` endpoint documented +- [ ] `/api/chat-api/ws/{room}` endpoint documented with room parameter +- [ ] Response codes documented (101, 401) + +--- + +## WS-5: Add integration tests for WebSocket chat + +**Status:** pending +**Blocked By:** WS-3 + +### Description +Add tests to verify WebSocket functionality. + +### Files to Create +- `services/chat-api/internal/api/handlers/websocket_test.go` + +### Implementation Details + +Test cases: +1. **Connection upgrade** - Verify WebSocket upgrade succeeds +2. **Message broadcast** - Verify messages sent by one client reach others +3. **Room isolation** - Verify room messages don't leak to other rooms +4. **Ping/pong** - Verify heartbeat mechanism + +Example test structure: +```go +package handlers_test + +import ( + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/gorilla/websocket" + "git.threesix.ai/jordan/sp3-verify-1770325830/pkg/logging" + "git.threesix.ai/jordan/sp3-verify-1770325830/pkg/realtime" +) + +func TestWebSocket_Connection(t *testing.T) { + // Create hub + hub := realtime.NewHub(logging.Nop()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go hub.Run(ctx) + + // Create handler + handler := realtime.NewHandler(hub, logging.Nop(), realtime.HandlerConfig{}) + + // Create test server + server := httptest.NewServer(handler.Routes()) + defer server.Close() + + // Connect WebSocket + wsURL := "ws" + server.URL[4:] + "/" + conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil) + if err != nil { + t.Fatalf("dial failed: %v", err) + } + defer conn.Close() + + // Verify connection count + time.Sleep(50 * time.Millisecond) + if hub.ConnectionCount() != 1 { + t.Errorf("expected 1 connection, got %d", hub.ConnectionCount()) + } +} +``` + +### Acceptance Criteria +- [ ] Test WebSocket connection upgrade works +- [ ] Test message broadcast to multiple clients +- [ ] Test room-based message isolation +- [ ] All tests pass diff --git a/.sdlc/state.yaml b/.sdlc/state.yaml new file mode 100644 index 0000000..554cbcc --- /dev/null +++ b/.sdlc/state.yaml @@ -0,0 +1,63 @@ +version: 1 +project: + name: workspace +active_work: + features: + - slug: websocket-chat + branch: feature/websocket-chat + phase: implementation +blocked: [] +last_updated: 2026-02-05T21:46:05.89630832Z +last_action: TRANSITION +last_actor: cli +history: + - timestamp: 2026-02-05T21:41:24.395367921Z + action: CREATE_FEATURE + feature: websocket-chat + actor: cli + result: success + - timestamp: 2026-02-05T21:44:57.410152204Z + action: APPROVE_ARTIFACT + feature: websocket-chat + actor: user + result: success + - timestamp: 2026-02-05T21:44:58.284093042Z + action: APPROVE_ARTIFACT + feature: websocket-chat + actor: user + result: success + - timestamp: 2026-02-05T21:44:59.043340487Z + action: APPROVE_ARTIFACT + feature: websocket-chat + actor: user + result: success + - timestamp: 2026-02-05T21:45:16.7071891Z + action: TRANSITION + feature: websocket-chat + actor: cli + result: success + - timestamp: 2026-02-05T21:45:51.613524427Z + action: APPROVE_ARTIFACT + feature: websocket-chat + actor: user + result: success + - timestamp: 2026-02-05T21:45:55.782110288Z + action: TRANSITION + feature: websocket-chat + actor: cli + result: success + - timestamp: 2026-02-05T21:46:00.807673749Z + action: CREATE_BRANCH + feature: websocket-chat + actor: cli + result: success + - timestamp: 2026-02-05T21:46:05.891255118Z + action: TRANSITION + feature: websocket-chat + actor: cli + result: success + - timestamp: 2026-02-05T21:46:05.896302369Z + action: TRANSITION + feature: websocket-chat + actor: cli + result: success diff --git a/services/chat-api/cmd/server/main.go b/services/chat-api/cmd/server/main.go index 7d0a110..800eb6b 100644 --- a/services/chat-api/cmd/server/main.go +++ b/services/chat-api/cmd/server/main.go @@ -2,17 +2,49 @@ package main import ( + "context" + + "github.com/redis/go-redis/v9" + "git.threesix.ai/jordan/sp3-verify-1770325830/pkg/app" "git.threesix.ai/jordan/sp3-verify-1770325830/pkg/logging" + "git.threesix.ai/jordan/sp3-verify-1770325830/pkg/realtime" "git.threesix.ai/jordan/sp3-verify-1770325830/services/chat-api/internal/adapter/memory" "git.threesix.ai/jordan/sp3-verify-1770325830/services/chat-api/internal/api" + "git.threesix.ai/jordan/sp3-verify-1770325830/services/chat-api/internal/config" "git.threesix.ai/jordan/sp3-verify-1770325830/services/chat-api/internal/service" ) func main() { + // Load configuration + cfg := config.Load() + // Create logger logger := logging.Default() + // Create context for background goroutines + ctx, cancel := context.WithCancel(context.Background()) + + // Create realtime hub (local connection registry) + hub := realtime.NewHub(logger) + go hub.Run(ctx) + + // Create Redis broadcaster for multi-pod scaling (optional) + var broadcaster realtime.Broadcaster + if cfg.RedisURL != "" { + opts, err := redis.ParseURL(cfg.RedisURL) + if err != nil { + logger.Error("invalid redis url", "error", err) + } else { + redisClient := redis.NewClient(opts) + broadcaster = realtime.NewRedisBroadcaster(redisClient, hub, logger) + go broadcaster.Run(ctx) + logger.Info("redis broadcaster enabled", "url", cfg.RedisURL) + } + } else { + logger.Info("redis broadcaster disabled, running in single-pod mode") + } + // Create adapters (repositories) exampleRepo := memory.NewExampleRepository() @@ -22,8 +54,14 @@ func main() { // Create application application := app.New("chat-api", app.WithDefaultPort(8001)) + // Register shutdown hook to cancel context + application.OnShutdown(func(_ context.Context) error { + cancel() + return nil + }) + // Register routes with dependency injection - api.RegisterRoutes(application, exampleService) + api.RegisterRoutes(application, exampleService, hub, broadcaster) // Start server application.Run() diff --git a/services/chat-api/internal/api/handlers/websocket_test.go b/services/chat-api/internal/api/handlers/websocket_test.go new file mode 100644 index 0000000..76e222e --- /dev/null +++ b/services/chat-api/internal/api/handlers/websocket_test.go @@ -0,0 +1,271 @@ +package handlers + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/gorilla/websocket" + + "git.threesix.ai/jordan/sp3-verify-1770325830/pkg/logging" + "git.threesix.ai/jordan/sp3-verify-1770325830/pkg/realtime" +) + +// testSetup creates a hub and handler for WebSocket tests. +func testSetup(t *testing.T) (*realtime.LocalHub, *realtime.Handler, context.CancelFunc) { + t.Helper() + ctx, cancel := context.WithCancel(context.Background()) + logger := logging.Nop() + + hub := realtime.NewHub(logger) + go hub.Run(ctx) + + handler := realtime.NewHandler(hub, logger, realtime.HandlerConfig{ + Broadcaster: nil, // No Redis for tests + AuthRequired: false, + OnMessage: func(conn realtime.Connection, msg *realtime.Message) *realtime.Message { + if msg.Type == "" { + msg.Type = realtime.MessageTypeChat + } + return msg + }, + }) + + return hub, handler, cancel +} + +// dial connects to a WebSocket test server. +func dial(t *testing.T, server *httptest.Server, path string) *websocket.Conn { + t.Helper() + url := "ws" + strings.TrimPrefix(server.URL, "http") + path + conn, _, err := websocket.DefaultDialer.Dial(url, nil) + if err != nil { + t.Fatalf("failed to dial websocket: %v", err) + } + return conn +} + +func TestWebSocket_Connection(t *testing.T) { + hub, handler, cancel := testSetup(t) + defer cancel() + + server := httptest.NewServer(handler.Routes()) + defer server.Close() + + // Connect + conn := dial(t, server, "/") + defer conn.Close() + + // Wait for registration + time.Sleep(50 * time.Millisecond) + + // Verify connection count + if count := hub.ConnectionCount(); count != 1 { + t.Errorf("expected 1 connection, got %d", count) + } + + // Close and verify cleanup + conn.Close() + time.Sleep(50 * time.Millisecond) + + if count := hub.ConnectionCount(); count != 0 { + t.Errorf("expected 0 connections after close, got %d", count) + } +} + +func TestWebSocket_RoomConnection(t *testing.T) { + hub, handler, cancel := testSetup(t) + defer cancel() + + server := httptest.NewServer(handler.Routes()) + defer server.Close() + + // Connect to room + conn := dial(t, server, "/test-room") + defer conn.Close() + + // Wait for registration + time.Sleep(50 * time.Millisecond) + + // Verify room count + if count := hub.RoomCount("test-room"); count != 1 { + t.Errorf("expected 1 connection in room, got %d", count) + } +} + +func TestWebSocket_MessageBroadcast(t *testing.T) { + _, handler, cancel := testSetup(t) + defer cancel() + + server := httptest.NewServer(handler.Routes()) + defer server.Close() + + // Connect two clients + conn1 := dial(t, server, "/") + defer conn1.Close() + + conn2 := dial(t, server, "/") + defer conn2.Close() + + // Wait for registration + time.Sleep(50 * time.Millisecond) + + // Send message from conn1 + msg := realtime.Message{ + Type: realtime.MessageTypeChat, + Data: json.RawMessage(`{"content":"Hello"}`), + } + if err := conn1.WriteJSON(msg); err != nil { + t.Fatalf("failed to send message: %v", err) + } + + // Read from conn2 + conn2.SetReadDeadline(time.Now().Add(2 * time.Second)) + var received realtime.Message + if err := conn2.ReadJSON(&received); err != nil { + t.Fatalf("failed to receive message: %v", err) + } + + if received.Type != realtime.MessageTypeChat { + t.Errorf("expected type %s, got %s", realtime.MessageTypeChat, received.Type) + } +} + +func TestWebSocket_RoomIsolation(t *testing.T) { + _, handler, cancel := testSetup(t) + defer cancel() + + server := httptest.NewServer(handler.Routes()) + defer server.Close() + + // Connect to room1 + conn1 := dial(t, server, "/room1") + defer conn1.Close() + + // Connect to room2 + conn2 := dial(t, server, "/room2") + defer conn2.Close() + + // Wait for registration + time.Sleep(50 * time.Millisecond) + + // Send message to room1 + msg := realtime.Message{ + Type: realtime.MessageTypeChat, + Room: "room1", + Data: json.RawMessage(`{"content":"Room1 Only"}`), + } + if err := conn1.WriteJSON(msg); err != nil { + t.Fatalf("failed to send message: %v", err) + } + + // Set short timeout on conn2 - it should NOT receive the message + conn2.SetReadDeadline(time.Now().Add(200 * time.Millisecond)) + var received realtime.Message + err := conn2.ReadJSON(&received) + + // We expect a timeout error since room2 should not receive room1 messages + if err == nil { + t.Errorf("room2 client received message meant for room1: %+v", received) + } +} + +func TestWebSocket_PingPong(t *testing.T) { + _, handler, cancel := testSetup(t) + defer cancel() + + server := httptest.NewServer(handler.Routes()) + defer server.Close() + + conn := dial(t, server, "/") + defer conn.Close() + + // Wait for registration + time.Sleep(50 * time.Millisecond) + + // Send ping + msg := realtime.Message{ + Type: realtime.MessageTypePing, + } + if err := conn.WriteJSON(msg); err != nil { + t.Fatalf("failed to send ping: %v", err) + } + + // Expect pong + conn.SetReadDeadline(time.Now().Add(2 * time.Second)) + var received realtime.Message + if err := conn.ReadJSON(&received); err != nil { + t.Fatalf("failed to receive pong: %v", err) + } + + if received.Type != realtime.MessageTypePong { + t.Errorf("expected type %s, got %s", realtime.MessageTypePong, received.Type) + } +} + +func TestWebSocket_AuthRequired(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + logger := logging.Nop() + + hub := realtime.NewHub(logger) + go hub.Run(ctx) + + handler := realtime.NewHandler(hub, logger, realtime.HandlerConfig{ + AuthRequired: true, // Require auth + }) + + server := httptest.NewServer(handler.Routes()) + defer server.Close() + + // Try to connect without auth + url := "ws" + strings.TrimPrefix(server.URL, "http") + "/" + _, resp, err := websocket.DefaultDialer.Dial(url, nil) + + // Should fail with 401 + if err == nil { + t.Error("expected connection to fail without auth") + } + if resp != nil && resp.StatusCode != http.StatusUnauthorized { + t.Errorf("expected status 401, got %d", resp.StatusCode) + } +} + +func TestWebSocket_MultipleClients(t *testing.T) { + hub, handler, cancel := testSetup(t) + defer cancel() + + server := httptest.NewServer(handler.Routes()) + defer server.Close() + + // Connect multiple clients + const numClients = 5 + conns := make([]*websocket.Conn, numClients) + for i := 0; i < numClients; i++ { + conns[i] = dial(t, server, "/") + defer conns[i].Close() + } + + // Wait for registration + time.Sleep(100 * time.Millisecond) + + if count := hub.ConnectionCount(); count != numClients { + t.Errorf("expected %d connections, got %d", numClients, count) + } + + // Close half the clients + for i := 0; i < numClients/2; i++ { + conns[i].Close() + } + + time.Sleep(100 * time.Millisecond) + + expected := numClients - numClients/2 + if count := hub.ConnectionCount(); count != expected { + t.Errorf("expected %d connections after partial close, got %d", expected, count) + } +} diff --git a/services/chat-api/internal/api/routes.go b/services/chat-api/internal/api/routes.go index 3e13edb..6e9dace 100644 --- a/services/chat-api/internal/api/routes.go +++ b/services/chat-api/internal/api/routes.go @@ -4,6 +4,7 @@ package api import ( "git.threesix.ai/jordan/sp3-verify-1770325830/pkg/app" "git.threesix.ai/jordan/sp3-verify-1770325830/pkg/auth" + "git.threesix.ai/jordan/sp3-verify-1770325830/pkg/realtime" "git.threesix.ai/jordan/sp3-verify-1770325830/services/chat-api/internal/api/handlers" "git.threesix.ai/jordan/sp3-verify-1770325830/services/chat-api/internal/config" "git.threesix.ai/jordan/sp3-verify-1770325830/services/chat-api/internal/service" @@ -14,7 +15,13 @@ import ( // This allows the monorepo to expose multiple services under a single domain: // - https://domain/api/chat-api/health // - https://domain/api/chat-api/examples -func RegisterRoutes(application *app.App, exampleService *service.ExampleService) { +// - wss://domain/api/chat-api/ws +func RegisterRoutes( + application *app.App, + exampleService *service.ExampleService, + hub realtime.Hub, + broadcaster realtime.Broadcaster, +) { logger := application.Logger() cfg := config.Load() @@ -22,10 +29,36 @@ func RegisterRoutes(application *app.App, exampleService *service.ExampleService healthHandler := handlers.NewHealth(logger) exampleHandler := handlers.NewExample(exampleService, logger) + // Initialize WebSocket handler + wsHandler := realtime.NewHandler(hub, logger, realtime.HandlerConfig{ + Broadcaster: broadcaster, + OnConnect: func(conn realtime.Connection) { + logger.Info("websocket client connected", + "client_id", conn.ID(), + "user_id", conn.UserID(), + ) + }, + OnDisconnect: func(conn realtime.Connection) { + logger.Info("websocket client disconnected", + "client_id", conn.ID(), + ) + }, + OnMessage: func(conn realtime.Connection, msg *realtime.Message) *realtime.Message { + if msg.Type == "" { + msg.Type = realtime.MessageTypeChat + } + return msg + }, + AuthRequired: cfg.AuthEnabled, + }) + // Build and mount OpenAPI spec spec := NewServiceSpec() application.EnableDocs(spec) + // Mount WebSocket routes + application.Mount("/api/chat-api/ws", wsHandler.Routes()) + // Register API routes under /api/{service-name} to match ingress path routing. // The ingress routes /api/chat-api/* to this service. application.Route("/api/chat-api", func(r app.Router) { diff --git a/services/chat-api/internal/api/spec.go b/services/chat-api/internal/api/spec.go index 39d6e04..a904f6f 100644 --- a/services/chat-api/internal/api/spec.go +++ b/services/chat-api/internal/api/spec.go @@ -8,7 +8,8 @@ func NewServiceSpec() *openapi.OpenAPISpec { WithDescription("REST API for the chat-api service"). WithBearerSecurity("bearer", "JWT authentication token"). WithTag("Health", "Service health endpoints"). - WithTag("Examples", "Example CRUD endpoints") + WithTag("Examples", "Example CRUD endpoints"). + WithTag("WebSocket", "Real-time WebSocket endpoints") // Define reusable schemas spec.WithSchema("Example", openapi.Object(map[string]openapi.Schema{ @@ -108,5 +109,50 @@ func NewServiceSpec() *openapi.OpenAPISpec { }, }) + // WebSocket message schema + spec.WithSchema("WebSocketMessage", openapi.Object(map[string]openapi.Schema{ + "id": openapi.String().WithDescription("Message ID (set by server)"), + "type": openapi.String().WithDescription("Message type: chat, presence, notification, system, error, ping, pong"), + "room": openapi.String().WithDescription("Target room (optional)"), + "from": openapi.String().WithDescription("Sender client ID (set by server)"), + "data": openapi.Object(nil).WithDescription("Message payload"), + "timestamp": openapi.DateTime().WithDescription("Message timestamp"), + }, "type")) + + // WebSocket connection + spec.AddPath("/api/chat-api/ws", "get", map[string]any{ + "summary": "WebSocket connection", + "description": "Upgrades to WebSocket for real-time chat. Messages are broadcast to all connected clients via Redis pub/sub. Send JSON messages matching the WebSocketMessage schema.", + "tags": []string{"WebSocket"}, + "responses": map[string]any{ + "101": map[string]any{ + "description": "Switching Protocols - WebSocket upgrade successful", + }, + "401": openapi.OpResponse("Unauthorized - authentication required (when AUTH_ENABLED=true)", openapi.ErrorResponseSchema()), + }, + }) + + // WebSocket connection to room + spec.AddPath("/api/chat-api/ws/{room}", "get", map[string]any{ + "summary": "WebSocket connection to room", + "description": "Upgrades to WebSocket and joins the specified room. Messages are broadcast only to clients in the same room.", + "tags": []string{"WebSocket"}, + "parameters": []any{ + map[string]any{ + "name": "room", + "in": "path", + "required": true, + "description": "Room identifier to join", + "schema": map[string]any{"type": "string"}, + }, + }, + "responses": map[string]any{ + "101": map[string]any{ + "description": "Switching Protocols - WebSocket upgrade successful", + }, + "401": openapi.OpResponse("Unauthorized - authentication required (when AUTH_ENABLED=true)", openapi.ErrorResponseSchema()), + }, + }) + return spec } diff --git a/services/chat-api/internal/config/config.go b/services/chat-api/internal/config/config.go index 69280fa..50f9bbd 100644 --- a/services/chat-api/internal/config/config.go +++ b/services/chat-api/internal/config/config.go @@ -18,6 +18,9 @@ type Config struct { // Auth AuthEnabled bool JWTSecret string + + // Redis configuration for realtime pub/sub + RedisURL string } // Load reads configuration from environment variables. @@ -30,5 +33,6 @@ func Load() *Config { AuthEnabled: strings.EqualFold(os.Getenv("AUTH_ENABLED"), "true"), JWTSecret: os.Getenv("JWT_SECRET"), + RedisURL: os.Getenv("REDIS_URL"), } }