build: /implement-feature websocket-chat --requirements 'GET /ws upgrades to...
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful

This commit is contained in:
rdev-worker 2026-02-05 21:50:17 +00:00
parent a78d9b43d0
commit 42c1444274
13 changed files with 1205 additions and 3 deletions

View File

@ -0,0 +1,4 @@
name: feature/websocket-chat
feature: websocket-chat
base_branch: main
created_at: 2026-02-05T21:46:00.799440711Z

36
.sdlc/config.yaml Normal file
View File

@ -0,0 +1,36 @@
version: 1
project:
name: workspace
branches:
main: main
feature_prefix: feature/
phases:
enabled:
- draft
- specified
- planned
- ready
- implementation
- review
- audit
- qa
- merge
- released
required_artifacts:
audit:
- audit
planned:
- spec
- design
- tasks
- qa_plan
qa:
- qa_results
review:
- review
specified:
- spec
compliance:
require_approvals: true
require_branch: true
require_qa: true

View File

@ -0,0 +1,210 @@
# Technical Design: WebSocket Chat
## Architecture Overview
This feature integrates the existing `pkg/realtime` package into the `chat-api` service to provide WebSocket chat functionality with Redis pub/sub for horizontal scaling.
```
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Client 1 │ │ Client 2 │ │ Client N │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ WebSocket │ WebSocket │ WebSocket
▼ ▼ ▼
┌──────────────────────────────────────────────────────┐
│ chat-api Pod 1 │
│ ┌─────────────────────────────────────────────────┐ │
│ │ realtime.Handler │ │
│ │ ┌─────────────────────────────────────────┐ │ │
│ │ │ LocalHub │ │ │
│ │ │ - connections map[id]Connection │ │ │
│ │ │ - rooms map[room]map[id]struct{} │ │ │
│ │ └─────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ RedisBroadcaster │ │
│ │ - Publish() → Redis channel │ │
│ │ - Run() ← Redis PSubscribe │ │
│ └─────────────────────────────────────────────────┘ │
└──────────────────────────┬───────────────────────────┘
┌────────────┴────────────┐
│ Redis Pub/Sub │
│ realtime:global │
│ realtime:room:{name} │
└────────────┬────────────┘
┌──────────────────────────┴───────────────────────────┐
│ chat-api Pod 2 │
│ (same structure) │
└──────────────────────────────────────────────────────┘
```
## Component Design
### 1. Configuration Extension
Extend `internal/config/config.go` to include Redis configuration:
```go
type Config struct {
// ... existing fields ...
// Redis
RedisURL string
}
func Load() *Config {
return &Config{
// ... existing ...
RedisURL: getEnv("REDIS_URL", "redis://localhost:6379"),
}
}
```
### 2. Service Bootstrap (main.go)
Wire up the realtime components in the service entry point:
```go
func main() {
cfg := config.Load()
ctx := context.Background()
// Create hub (local connection registry)
hub := realtime.NewHub(logger)
go hub.Run(ctx)
// Create Redis broadcaster (for multi-pod)
var broadcaster realtime.Broadcaster
if cfg.RedisURL != "" {
redisClient := redis.NewClient(parseRedisURL(cfg.RedisURL))
broadcaster = realtime.NewRedisBroadcaster(redisClient, hub, logger)
go broadcaster.Run(ctx)
}
// Register routes with realtime handler
api.RegisterRoutes(application, exampleService, hub, broadcaster)
}
```
### 3. Route Registration
Add WebSocket routes in `internal/api/routes.go`:
```go
func RegisterRoutes(
application *app.App,
exampleService *service.ExampleService,
hub realtime.Hub,
broadcaster realtime.Broadcaster,
) {
// ... existing routes ...
// WebSocket handler
wsHandler := realtime.NewHandler(hub, logger, realtime.HandlerConfig{
Broadcaster: broadcaster,
OnConnect: func(conn realtime.Connection) {
logger.Info("client connected", "id", conn.ID())
},
OnDisconnect: func(conn realtime.Connection) {
logger.Info("client disconnected", "id", conn.ID())
},
OnMessage: func(conn realtime.Connection, msg *realtime.Message) *realtime.Message {
// Set message type if not specified
if msg.Type == "" {
msg.Type = realtime.MessageTypeChat
}
return msg
},
AuthRequired: cfg.AuthEnabled,
})
// Mount WebSocket routes
application.Mount("/api/chat-api/ws", wsHandler.Routes())
}
```
### 4. Message Flow
1. **Client → Server (Incoming)**
- Client sends JSON message via WebSocket
- `WSClient.readPump()` decodes message
- `OnMessage` callback transforms/filters
- `RedisBroadcaster.Publish()` sends to Redis channel
2. **Redis → All Pods (Distribution)**
- Redis delivers to all subscribed pods
- `RedisBroadcaster.Run()` receives via PSubscribe
- Filters out messages from same pod (echo prevention)
- Forwards to `LocalHub.Broadcast()`
3. **Server → Clients (Outgoing)**
- `LocalHub.doBroadcast()` iterates room connections
- `WSClient.Send()` queues message
- `WSClient.writePump()` writes to WebSocket
### 5. Redis Channel Naming
| Pattern | Usage |
|---------|-------|
| `realtime:global` | Messages without room (broadcast to all) |
| `realtime:room:{name}` | Room-specific messages |
### 6. Graceful Shutdown
The existing `pkg/realtime` package handles:
- Context cancellation triggers hub shutdown
- Hub closes all connections
- WSClient sends close frame
- Redis broadcaster closes subscription
## File Changes
| File | Change |
|------|--------|
| `services/chat-api/internal/config/config.go` | Add `RedisURL` field |
| `services/chat-api/cmd/server/main.go` | Wire hub, broadcaster, Redis client |
| `services/chat-api/internal/api/routes.go` | Mount WebSocket handler |
| `services/chat-api/internal/api/spec.go` | Add WebSocket endpoint documentation |
## Testing Strategy
### Unit Tests
- Config loading with Redis URL
- Message transformation in OnMessage callback
### Integration Tests
- WebSocket connection upgrade
- Message send/receive flow
- Room-based isolation
- Multi-client broadcast
### Manual Testing
```bash
# Terminal 1: Start server
./scripts/dev.sh
# Terminal 2: Connect with wscat
wscat -c ws://localhost:8001/api/chat-api/ws
# Terminal 3: Another client
wscat -c ws://localhost:8001/api/chat-api/ws
# Send message from Terminal 2, verify received in Terminal 3
{"type":"chat","data":{"content":"Hello!"}}
```
## Security Considerations
1. **Authentication**: Optional auth via `AUTH_ENABLED` config
2. **Origin Check**: WebSocket upgrader validates Origin header (configurable)
3. **Message Size**: 64KB limit prevents memory exhaustion
4. **Buffer Limits**: 256-message buffer drops excess (prevents slow-client backpressure)
## Rollout Plan
1. Deploy with `REDIS_URL` set to enable cross-pod messaging
2. Without `REDIS_URL`, operates in single-pod mode (local broadcast only)
3. No database migrations required
4. Feature is additive (no breaking changes)

View File

@ -0,0 +1,50 @@
slug: websocket-chat
title: WebSocket Chat
created: 2026-02-05T21:41:24.394924276Z
branch: feature/websocket-chat
phase: implementation
phase_history:
- phase: draft
entered: 2026-02-05T21:41:24.394924276Z
exited: 2026-02-05T21:45:16.70675913Z
- phase: specified
entered: 2026-02-05T21:45:16.70675913Z
exited: 2026-02-05T21:45:55.781510539Z
- phase: planned
entered: 2026-02-05T21:45:55.781510539Z
exited: 2026-02-05T21:46:05.885128246Z
- phase: ready
entered: 2026-02-05T21:46:05.885128246Z
exited: 2026-02-05T21:46:05.895747946Z
- phase: implementation
entered: 2026-02-05T21:46:05.895747946Z
artifacts:
audit:
status: pending
path: audit.md
design:
status: approved
path: design.md
approved_by: user
approved_at: 2026-02-05T21:44:58.283726232Z
qa_plan:
status: approved
path: qa-plan.md
approved_by: user
approved_at: 2026-02-05T21:45:51.612922454Z
qa_results:
status: pending
path: qa-results.md
review:
status: pending
path: review.md
spec:
status: approved
path: spec.md
approved_by: user
approved_at: 2026-02-05T21:44:57.405079826Z
tasks:
status: approved
path: tasks.md
approved_by: user
approved_at: 2026-02-05T21:44:59.042871875Z

View File

@ -0,0 +1,38 @@
# QA Plan: WebSocket Chat
## Test Scope
### Unit Tests
| Test | File | Description |
|------|------|-------------|
| Config loading | `internal/config/config_test.go` | Verify RedisURL loads from env |
### Integration Tests
| Test | File | Description |
|------|------|-------------|
| WebSocket upgrade | `internal/api/handlers/websocket_test.go` | Verify HTTP→WS upgrade |
| Message broadcast | `internal/api/handlers/websocket_test.go` | Verify messages reach all clients |
| Room isolation | `internal/api/handlers/websocket_test.go` | Verify room messages stay in room |
| Ping/pong | `internal/api/handlers/websocket_test.go` | Verify heartbeat mechanism |
### Manual Tests
| Test | Steps | Expected |
|------|-------|----------|
| Basic chat | 1. Start server<br>2. Connect two wscat clients<br>3. Send message from one | Other client receives message |
| Room chat | 1. Connect clients to `/ws/room1`<br>2. Connect client to `/ws/room2`<br>3. Send to room1 | Only room1 clients receive |
| Disconnect | 1. Connect client<br>2. Kill client<br>3. Check server logs | Clean disconnect logged |
## Acceptance Criteria
- [ ] WebSocket endpoint accessible at `/api/chat-api/ws`
- [ ] Messages broadcast to all connected clients
- [ ] Room-based messaging isolates conversations
- [ ] Connection lifecycle properly managed
## Test Commands
```bash
# Run all Go tests
go test ./services/chat-api/... -v
# Manual WebSocket test
wscat -c ws://localhost:8001/api/chat-api/ws
```

View File

@ -0,0 +1,105 @@
# Feature Specification: WebSocket Chat
## Overview
Add real-time WebSocket chat capability to the chat-api service. This feature enables bidirectional communication between clients and the server, with message distribution via Redis pub/sub for horizontal scaling across multiple pods.
## Requirements
### Core Requirements
1. **WebSocket Endpoint**: `GET /ws` upgrades HTTP connections to WebSocket
2. **Message Publication**: Incoming messages from clients are published to a Redis channel
3. **Message Broadcasting**: A Redis subscriber broadcasts received messages to all connected clients
### Functional Requirements
| ID | Requirement | Priority |
|----|-------------|----------|
| FR-1 | WebSocket endpoint at `GET /api/chat-api/ws` upgrades HTTP to WebSocket | Must |
| FR-2 | WebSocket endpoint at `GET /api/chat-api/ws/{room}` allows joining specific rooms | Must |
| FR-3 | Incoming chat messages are published to Redis pub/sub channel | Must |
| FR-4 | Redis subscriber receives messages and broadcasts to all connected clients in the room | Must |
| FR-5 | Messages without a room are broadcast to all connected clients (global) | Must |
| FR-6 | Automatic ping/pong heartbeat maintains connection health | Must |
| FR-7 | Graceful connection lifecycle with proper cleanup on disconnect | Must |
### Non-Functional Requirements
| ID | Requirement | Priority |
|----|-------------|----------|
| NFR-1 | Connection supports 64KB max message size | Must |
| NFR-2 | 60-second pong timeout for dead connection detection | Must |
| NFR-3 | 256-message send buffer per connection | Must |
| NFR-4 | Multi-pod support via Redis pub/sub | Must |
| NFR-5 | Graceful shutdown closes all connections cleanly | Must |
## Message Format
Messages follow the standard `realtime.Message` structure:
```json
{
"id": "uuid",
"type": "chat",
"room": "room-name",
"from": "client-id",
"data": { "content": "message text" },
"timestamp": "2024-01-15T10:30:00Z"
}
```
### Message Types
- `chat` - User chat messages
- `presence` - Online/offline status changes
- `notification` - Server notifications
- `system` - System messages
- `error` - Error messages
- `ping`/`pong` - Client-initiated heartbeat
## API Endpoints
### WebSocket Connection
```
GET /api/chat-api/ws
GET /api/chat-api/ws/{room}
GET /api/chat-api/ws?room={room}
```
**Upgrade Headers:**
- `Connection: Upgrade`
- `Upgrade: websocket`
**Response:** 101 Switching Protocols (on success)
## Configuration
| Environment Variable | Description | Default |
|---------------------|-------------|---------|
| `REDIS_URL` | Redis connection URL | `redis://localhost:6379` |
| `AUTH_ENABLED` | Require authentication for WebSocket | `false` |
## Out of Scope
- Message persistence/history
- Typing indicators
- Read receipts
- User presence tracking (beyond connection events)
- Rate limiting (future enhancement)
## Success Criteria
1. WebSocket connections can be established at `/api/chat-api/ws`
2. Messages sent by one client are received by all other connected clients
3. Room-based messaging isolates conversations
4. Multi-pod deployment correctly distributes messages via Redis
5. Connection cleanup occurs properly on disconnect
## Dependencies
- Existing `pkg/realtime` package (WebSocket handling, hub, Redis broadcaster)
- Redis server for pub/sub
- `github.com/gorilla/websocket` (already in go.mod)
- `github.com/redis/go-redis/v9` (already in go.mod)

View File

@ -0,0 +1,304 @@
# Implementation Tasks: WebSocket Chat
## Task Overview
| ID | Task | Status | Blocked By |
|----|------|--------|------------|
| WS-1 | Extend config with Redis URL | pending | - |
| WS-2 | Wire realtime components in main.go | pending | WS-1 |
| WS-3 | Mount WebSocket handler in routes.go | pending | WS-2 |
| WS-4 | Add WebSocket documentation to spec.go | pending | WS-3 |
| WS-5 | Add integration tests for WebSocket chat | pending | WS-3 |
---
## WS-1: Extend config with Redis URL
**Status:** pending
### Description
Add Redis URL configuration to the chat-api service config.
### Files to Modify
- `services/chat-api/internal/config/config.go`
### Implementation Details
Add to the `Config` struct:
```go
// Redis configuration for realtime pub/sub
RedisURL string
```
Add to the `Load()` function:
```go
RedisURL: os.Getenv("REDIS_URL"),
```
### Acceptance Criteria
- [ ] `Config` struct has `RedisURL` field
- [ ] `Load()` reads from `REDIS_URL` environment variable
- [ ] Empty string is valid (disables Redis, uses local-only broadcast)
---
## WS-2: Wire realtime components in main.go
**Status:** pending
**Blocked By:** WS-1
### Description
Initialize the realtime hub and Redis broadcaster in the service entry point.
### Files to Modify
- `services/chat-api/cmd/server/main.go`
### Implementation Details
1. Import required packages:
```go
import (
"context"
"github.com/redis/go-redis/v9"
"git.threesix.ai/jordan/sp3-verify-1770325830/pkg/realtime"
"git.threesix.ai/jordan/sp3-verify-1770325830/services/chat-api/internal/config"
)
```
2. Load config and create context:
```go
cfg := config.Load()
ctx := context.Background()
```
3. Create hub:
```go
hub := realtime.NewHub(logger)
go hub.Run(ctx)
```
4. Create Redis broadcaster (optional):
```go
var broadcaster realtime.Broadcaster
if cfg.RedisURL != "" {
opts, err := redis.ParseURL(cfg.RedisURL)
if err != nil {
logger.Error("invalid redis url", "error", err)
} else {
redisClient := redis.NewClient(opts)
broadcaster = realtime.NewRedisBroadcaster(redisClient, hub, logger)
go broadcaster.Run(ctx)
}
}
```
5. Pass hub and broadcaster to route registration:
```go
api.RegisterRoutes(application, exampleService, hub, broadcaster)
```
### Acceptance Criteria
- [ ] Hub is created and running in a goroutine
- [ ] Redis broadcaster is created when `REDIS_URL` is set
- [ ] Redis broadcaster is nil when `REDIS_URL` is empty
- [ ] Hub and broadcaster are passed to route registration
---
## WS-3: Mount WebSocket handler in routes.go
**Status:** pending
**Blocked By:** WS-2
### Description
Update the route registration to accept realtime components and mount the WebSocket handler.
### Files to Modify
- `services/chat-api/internal/api/routes.go`
### Implementation Details
1. Update function signature:
```go
func RegisterRoutes(
application *app.App,
exampleService *service.ExampleService,
hub realtime.Hub,
broadcaster realtime.Broadcaster,
) {
```
2. Import realtime package:
```go
import "git.threesix.ai/jordan/sp3-verify-1770325830/pkg/realtime"
```
3. Create and configure WebSocket handler:
```go
wsHandler := realtime.NewHandler(hub, logger, realtime.HandlerConfig{
Broadcaster: broadcaster,
OnConnect: func(conn realtime.Connection) {
logger.Info("websocket client connected",
"client_id", conn.ID(),
"user_id", conn.UserID(),
)
},
OnDisconnect: func(conn realtime.Connection) {
logger.Info("websocket client disconnected",
"client_id", conn.ID(),
)
},
OnMessage: func(conn realtime.Connection, msg *realtime.Message) *realtime.Message {
if msg.Type == "" {
msg.Type = realtime.MessageTypeChat
}
return msg
},
AuthRequired: cfg.AuthEnabled,
})
```
4. Mount WebSocket routes:
```go
application.Mount("/api/chat-api/ws", wsHandler.Routes())
```
### Acceptance Criteria
- [ ] Function signature accepts `hub` and `broadcaster` parameters
- [ ] WebSocket handler is created with appropriate callbacks
- [ ] Handler is mounted at `/api/chat-api/ws`
- [ ] Auth requirement respects `AUTH_ENABLED` config
---
## WS-4: Add WebSocket documentation to spec.go
**Status:** pending
**Blocked By:** WS-3
### Description
Document the WebSocket endpoint in the OpenAPI specification.
### Files to Modify
- `services/chat-api/internal/api/spec.go`
### Implementation Details
Add WebSocket tag:
```go
spec.WithTag("WebSocket", "Real-time WebSocket endpoints")
```
Add WebSocket path documentation:
```go
spec.AddPath("/api/chat-api/ws", "get", map[string]any{
"summary": "WebSocket connection",
"description": "Upgrades to WebSocket for real-time chat. Messages are broadcast to all connected clients via Redis pub/sub.",
"tags": []string{"WebSocket"},
"responses": map[string]any{
"101": map[string]any{
"description": "Switching Protocols - WebSocket upgrade successful",
},
"401": openapi.OpResponse("Unauthorized - authentication required", nil),
},
})
spec.AddPath("/api/chat-api/ws/{room}", "get", map[string]any{
"summary": "WebSocket connection to room",
"description": "Upgrades to WebSocket and joins the specified room. Messages are broadcast only to clients in the same room.",
"tags": []string{"WebSocket"},
"parameters": []map[string]any{
{
"name": "room",
"in": "path",
"required": true,
"description": "Room identifier to join",
"schema": map[string]any{"type": "string"},
},
},
"responses": map[string]any{
"101": map[string]any{
"description": "Switching Protocols - WebSocket upgrade successful",
},
"401": openapi.OpResponse("Unauthorized - authentication required", nil),
},
})
```
### Acceptance Criteria
- [ ] WebSocket tag added to spec
- [ ] `/api/chat-api/ws` endpoint documented
- [ ] `/api/chat-api/ws/{room}` endpoint documented with room parameter
- [ ] Response codes documented (101, 401)
---
## WS-5: Add integration tests for WebSocket chat
**Status:** pending
**Blocked By:** WS-3
### Description
Add tests to verify WebSocket functionality.
### Files to Create
- `services/chat-api/internal/api/handlers/websocket_test.go`
### Implementation Details
Test cases:
1. **Connection upgrade** - Verify WebSocket upgrade succeeds
2. **Message broadcast** - Verify messages sent by one client reach others
3. **Room isolation** - Verify room messages don't leak to other rooms
4. **Ping/pong** - Verify heartbeat mechanism
Example test structure:
```go
package handlers_test
import (
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/gorilla/websocket"
"git.threesix.ai/jordan/sp3-verify-1770325830/pkg/logging"
"git.threesix.ai/jordan/sp3-verify-1770325830/pkg/realtime"
)
func TestWebSocket_Connection(t *testing.T) {
// Create hub
hub := realtime.NewHub(logging.Nop())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go hub.Run(ctx)
// Create handler
handler := realtime.NewHandler(hub, logging.Nop(), realtime.HandlerConfig{})
// Create test server
server := httptest.NewServer(handler.Routes())
defer server.Close()
// Connect WebSocket
wsURL := "ws" + server.URL[4:] + "/"
conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
if err != nil {
t.Fatalf("dial failed: %v", err)
}
defer conn.Close()
// Verify connection count
time.Sleep(50 * time.Millisecond)
if hub.ConnectionCount() != 1 {
t.Errorf("expected 1 connection, got %d", hub.ConnectionCount())
}
}
```
### Acceptance Criteria
- [ ] Test WebSocket connection upgrade works
- [ ] Test message broadcast to multiple clients
- [ ] Test room-based message isolation
- [ ] All tests pass

63
.sdlc/state.yaml Normal file
View File

@ -0,0 +1,63 @@
version: 1
project:
name: workspace
active_work:
features:
- slug: websocket-chat
branch: feature/websocket-chat
phase: implementation
blocked: []
last_updated: 2026-02-05T21:46:05.89630832Z
last_action: TRANSITION
last_actor: cli
history:
- timestamp: 2026-02-05T21:41:24.395367921Z
action: CREATE_FEATURE
feature: websocket-chat
actor: cli
result: success
- timestamp: 2026-02-05T21:44:57.410152204Z
action: APPROVE_ARTIFACT
feature: websocket-chat
actor: user
result: success
- timestamp: 2026-02-05T21:44:58.284093042Z
action: APPROVE_ARTIFACT
feature: websocket-chat
actor: user
result: success
- timestamp: 2026-02-05T21:44:59.043340487Z
action: APPROVE_ARTIFACT
feature: websocket-chat
actor: user
result: success
- timestamp: 2026-02-05T21:45:16.7071891Z
action: TRANSITION
feature: websocket-chat
actor: cli
result: success
- timestamp: 2026-02-05T21:45:51.613524427Z
action: APPROVE_ARTIFACT
feature: websocket-chat
actor: user
result: success
- timestamp: 2026-02-05T21:45:55.782110288Z
action: TRANSITION
feature: websocket-chat
actor: cli
result: success
- timestamp: 2026-02-05T21:46:00.807673749Z
action: CREATE_BRANCH
feature: websocket-chat
actor: cli
result: success
- timestamp: 2026-02-05T21:46:05.891255118Z
action: TRANSITION
feature: websocket-chat
actor: cli
result: success
- timestamp: 2026-02-05T21:46:05.896302369Z
action: TRANSITION
feature: websocket-chat
actor: cli
result: success

View File

@ -2,17 +2,49 @@
package main
import (
"context"
"github.com/redis/go-redis/v9"
"git.threesix.ai/jordan/sp3-verify-1770325830/pkg/app"
"git.threesix.ai/jordan/sp3-verify-1770325830/pkg/logging"
"git.threesix.ai/jordan/sp3-verify-1770325830/pkg/realtime"
"git.threesix.ai/jordan/sp3-verify-1770325830/services/chat-api/internal/adapter/memory"
"git.threesix.ai/jordan/sp3-verify-1770325830/services/chat-api/internal/api"
"git.threesix.ai/jordan/sp3-verify-1770325830/services/chat-api/internal/config"
"git.threesix.ai/jordan/sp3-verify-1770325830/services/chat-api/internal/service"
)
func main() {
// Load configuration
cfg := config.Load()
// Create logger
logger := logging.Default()
// Create context for background goroutines
ctx, cancel := context.WithCancel(context.Background())
// Create realtime hub (local connection registry)
hub := realtime.NewHub(logger)
go hub.Run(ctx)
// Create Redis broadcaster for multi-pod scaling (optional)
var broadcaster realtime.Broadcaster
if cfg.RedisURL != "" {
opts, err := redis.ParseURL(cfg.RedisURL)
if err != nil {
logger.Error("invalid redis url", "error", err)
} else {
redisClient := redis.NewClient(opts)
broadcaster = realtime.NewRedisBroadcaster(redisClient, hub, logger)
go broadcaster.Run(ctx)
logger.Info("redis broadcaster enabled", "url", cfg.RedisURL)
}
} else {
logger.Info("redis broadcaster disabled, running in single-pod mode")
}
// Create adapters (repositories)
exampleRepo := memory.NewExampleRepository()
@ -22,8 +54,14 @@ func main() {
// Create application
application := app.New("chat-api", app.WithDefaultPort(8001))
// Register shutdown hook to cancel context
application.OnShutdown(func(_ context.Context) error {
cancel()
return nil
})
// Register routes with dependency injection
api.RegisterRoutes(application, exampleService)
api.RegisterRoutes(application, exampleService, hub, broadcaster)
// Start server
application.Run()

View File

@ -0,0 +1,271 @@
package handlers
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/gorilla/websocket"
"git.threesix.ai/jordan/sp3-verify-1770325830/pkg/logging"
"git.threesix.ai/jordan/sp3-verify-1770325830/pkg/realtime"
)
// testSetup creates a hub and handler for WebSocket tests.
func testSetup(t *testing.T) (*realtime.LocalHub, *realtime.Handler, context.CancelFunc) {
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
logger := logging.Nop()
hub := realtime.NewHub(logger)
go hub.Run(ctx)
handler := realtime.NewHandler(hub, logger, realtime.HandlerConfig{
Broadcaster: nil, // No Redis for tests
AuthRequired: false,
OnMessage: func(conn realtime.Connection, msg *realtime.Message) *realtime.Message {
if msg.Type == "" {
msg.Type = realtime.MessageTypeChat
}
return msg
},
})
return hub, handler, cancel
}
// dial connects to a WebSocket test server.
func dial(t *testing.T, server *httptest.Server, path string) *websocket.Conn {
t.Helper()
url := "ws" + strings.TrimPrefix(server.URL, "http") + path
conn, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
t.Fatalf("failed to dial websocket: %v", err)
}
return conn
}
func TestWebSocket_Connection(t *testing.T) {
hub, handler, cancel := testSetup(t)
defer cancel()
server := httptest.NewServer(handler.Routes())
defer server.Close()
// Connect
conn := dial(t, server, "/")
defer conn.Close()
// Wait for registration
time.Sleep(50 * time.Millisecond)
// Verify connection count
if count := hub.ConnectionCount(); count != 1 {
t.Errorf("expected 1 connection, got %d", count)
}
// Close and verify cleanup
conn.Close()
time.Sleep(50 * time.Millisecond)
if count := hub.ConnectionCount(); count != 0 {
t.Errorf("expected 0 connections after close, got %d", count)
}
}
func TestWebSocket_RoomConnection(t *testing.T) {
hub, handler, cancel := testSetup(t)
defer cancel()
server := httptest.NewServer(handler.Routes())
defer server.Close()
// Connect to room
conn := dial(t, server, "/test-room")
defer conn.Close()
// Wait for registration
time.Sleep(50 * time.Millisecond)
// Verify room count
if count := hub.RoomCount("test-room"); count != 1 {
t.Errorf("expected 1 connection in room, got %d", count)
}
}
func TestWebSocket_MessageBroadcast(t *testing.T) {
_, handler, cancel := testSetup(t)
defer cancel()
server := httptest.NewServer(handler.Routes())
defer server.Close()
// Connect two clients
conn1 := dial(t, server, "/")
defer conn1.Close()
conn2 := dial(t, server, "/")
defer conn2.Close()
// Wait for registration
time.Sleep(50 * time.Millisecond)
// Send message from conn1
msg := realtime.Message{
Type: realtime.MessageTypeChat,
Data: json.RawMessage(`{"content":"Hello"}`),
}
if err := conn1.WriteJSON(msg); err != nil {
t.Fatalf("failed to send message: %v", err)
}
// Read from conn2
conn2.SetReadDeadline(time.Now().Add(2 * time.Second))
var received realtime.Message
if err := conn2.ReadJSON(&received); err != nil {
t.Fatalf("failed to receive message: %v", err)
}
if received.Type != realtime.MessageTypeChat {
t.Errorf("expected type %s, got %s", realtime.MessageTypeChat, received.Type)
}
}
func TestWebSocket_RoomIsolation(t *testing.T) {
_, handler, cancel := testSetup(t)
defer cancel()
server := httptest.NewServer(handler.Routes())
defer server.Close()
// Connect to room1
conn1 := dial(t, server, "/room1")
defer conn1.Close()
// Connect to room2
conn2 := dial(t, server, "/room2")
defer conn2.Close()
// Wait for registration
time.Sleep(50 * time.Millisecond)
// Send message to room1
msg := realtime.Message{
Type: realtime.MessageTypeChat,
Room: "room1",
Data: json.RawMessage(`{"content":"Room1 Only"}`),
}
if err := conn1.WriteJSON(msg); err != nil {
t.Fatalf("failed to send message: %v", err)
}
// Set short timeout on conn2 - it should NOT receive the message
conn2.SetReadDeadline(time.Now().Add(200 * time.Millisecond))
var received realtime.Message
err := conn2.ReadJSON(&received)
// We expect a timeout error since room2 should not receive room1 messages
if err == nil {
t.Errorf("room2 client received message meant for room1: %+v", received)
}
}
func TestWebSocket_PingPong(t *testing.T) {
_, handler, cancel := testSetup(t)
defer cancel()
server := httptest.NewServer(handler.Routes())
defer server.Close()
conn := dial(t, server, "/")
defer conn.Close()
// Wait for registration
time.Sleep(50 * time.Millisecond)
// Send ping
msg := realtime.Message{
Type: realtime.MessageTypePing,
}
if err := conn.WriteJSON(msg); err != nil {
t.Fatalf("failed to send ping: %v", err)
}
// Expect pong
conn.SetReadDeadline(time.Now().Add(2 * time.Second))
var received realtime.Message
if err := conn.ReadJSON(&received); err != nil {
t.Fatalf("failed to receive pong: %v", err)
}
if received.Type != realtime.MessageTypePong {
t.Errorf("expected type %s, got %s", realtime.MessageTypePong, received.Type)
}
}
func TestWebSocket_AuthRequired(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := logging.Nop()
hub := realtime.NewHub(logger)
go hub.Run(ctx)
handler := realtime.NewHandler(hub, logger, realtime.HandlerConfig{
AuthRequired: true, // Require auth
})
server := httptest.NewServer(handler.Routes())
defer server.Close()
// Try to connect without auth
url := "ws" + strings.TrimPrefix(server.URL, "http") + "/"
_, resp, err := websocket.DefaultDialer.Dial(url, nil)
// Should fail with 401
if err == nil {
t.Error("expected connection to fail without auth")
}
if resp != nil && resp.StatusCode != http.StatusUnauthorized {
t.Errorf("expected status 401, got %d", resp.StatusCode)
}
}
func TestWebSocket_MultipleClients(t *testing.T) {
hub, handler, cancel := testSetup(t)
defer cancel()
server := httptest.NewServer(handler.Routes())
defer server.Close()
// Connect multiple clients
const numClients = 5
conns := make([]*websocket.Conn, numClients)
for i := 0; i < numClients; i++ {
conns[i] = dial(t, server, "/")
defer conns[i].Close()
}
// Wait for registration
time.Sleep(100 * time.Millisecond)
if count := hub.ConnectionCount(); count != numClients {
t.Errorf("expected %d connections, got %d", numClients, count)
}
// Close half the clients
for i := 0; i < numClients/2; i++ {
conns[i].Close()
}
time.Sleep(100 * time.Millisecond)
expected := numClients - numClients/2
if count := hub.ConnectionCount(); count != expected {
t.Errorf("expected %d connections after partial close, got %d", expected, count)
}
}

View File

@ -4,6 +4,7 @@ package api
import (
"git.threesix.ai/jordan/sp3-verify-1770325830/pkg/app"
"git.threesix.ai/jordan/sp3-verify-1770325830/pkg/auth"
"git.threesix.ai/jordan/sp3-verify-1770325830/pkg/realtime"
"git.threesix.ai/jordan/sp3-verify-1770325830/services/chat-api/internal/api/handlers"
"git.threesix.ai/jordan/sp3-verify-1770325830/services/chat-api/internal/config"
"git.threesix.ai/jordan/sp3-verify-1770325830/services/chat-api/internal/service"
@ -14,7 +15,13 @@ import (
// This allows the monorepo to expose multiple services under a single domain:
// - https://domain/api/chat-api/health
// - https://domain/api/chat-api/examples
func RegisterRoutes(application *app.App, exampleService *service.ExampleService) {
// - wss://domain/api/chat-api/ws
func RegisterRoutes(
application *app.App,
exampleService *service.ExampleService,
hub realtime.Hub,
broadcaster realtime.Broadcaster,
) {
logger := application.Logger()
cfg := config.Load()
@ -22,10 +29,36 @@ func RegisterRoutes(application *app.App, exampleService *service.ExampleService
healthHandler := handlers.NewHealth(logger)
exampleHandler := handlers.NewExample(exampleService, logger)
// Initialize WebSocket handler
wsHandler := realtime.NewHandler(hub, logger, realtime.HandlerConfig{
Broadcaster: broadcaster,
OnConnect: func(conn realtime.Connection) {
logger.Info("websocket client connected",
"client_id", conn.ID(),
"user_id", conn.UserID(),
)
},
OnDisconnect: func(conn realtime.Connection) {
logger.Info("websocket client disconnected",
"client_id", conn.ID(),
)
},
OnMessage: func(conn realtime.Connection, msg *realtime.Message) *realtime.Message {
if msg.Type == "" {
msg.Type = realtime.MessageTypeChat
}
return msg
},
AuthRequired: cfg.AuthEnabled,
})
// Build and mount OpenAPI spec
spec := NewServiceSpec()
application.EnableDocs(spec)
// Mount WebSocket routes
application.Mount("/api/chat-api/ws", wsHandler.Routes())
// Register API routes under /api/{service-name} to match ingress path routing.
// The ingress routes /api/chat-api/* to this service.
application.Route("/api/chat-api", func(r app.Router) {

View File

@ -8,7 +8,8 @@ func NewServiceSpec() *openapi.OpenAPISpec {
WithDescription("REST API for the chat-api service").
WithBearerSecurity("bearer", "JWT authentication token").
WithTag("Health", "Service health endpoints").
WithTag("Examples", "Example CRUD endpoints")
WithTag("Examples", "Example CRUD endpoints").
WithTag("WebSocket", "Real-time WebSocket endpoints")
// Define reusable schemas
spec.WithSchema("Example", openapi.Object(map[string]openapi.Schema{
@ -108,5 +109,50 @@ func NewServiceSpec() *openapi.OpenAPISpec {
},
})
// WebSocket message schema
spec.WithSchema("WebSocketMessage", openapi.Object(map[string]openapi.Schema{
"id": openapi.String().WithDescription("Message ID (set by server)"),
"type": openapi.String().WithDescription("Message type: chat, presence, notification, system, error, ping, pong"),
"room": openapi.String().WithDescription("Target room (optional)"),
"from": openapi.String().WithDescription("Sender client ID (set by server)"),
"data": openapi.Object(nil).WithDescription("Message payload"),
"timestamp": openapi.DateTime().WithDescription("Message timestamp"),
}, "type"))
// WebSocket connection
spec.AddPath("/api/chat-api/ws", "get", map[string]any{
"summary": "WebSocket connection",
"description": "Upgrades to WebSocket for real-time chat. Messages are broadcast to all connected clients via Redis pub/sub. Send JSON messages matching the WebSocketMessage schema.",
"tags": []string{"WebSocket"},
"responses": map[string]any{
"101": map[string]any{
"description": "Switching Protocols - WebSocket upgrade successful",
},
"401": openapi.OpResponse("Unauthorized - authentication required (when AUTH_ENABLED=true)", openapi.ErrorResponseSchema()),
},
})
// WebSocket connection to room
spec.AddPath("/api/chat-api/ws/{room}", "get", map[string]any{
"summary": "WebSocket connection to room",
"description": "Upgrades to WebSocket and joins the specified room. Messages are broadcast only to clients in the same room.",
"tags": []string{"WebSocket"},
"parameters": []any{
map[string]any{
"name": "room",
"in": "path",
"required": true,
"description": "Room identifier to join",
"schema": map[string]any{"type": "string"},
},
},
"responses": map[string]any{
"101": map[string]any{
"description": "Switching Protocols - WebSocket upgrade successful",
},
"401": openapi.OpResponse("Unauthorized - authentication required (when AUTH_ENABLED=true)", openapi.ErrorResponseSchema()),
},
})
return spec
}

View File

@ -18,6 +18,9 @@ type Config struct {
// Auth
AuthEnabled bool
JWTSecret string
// Redis configuration for realtime pub/sub
RedisURL string
}
// Load reads configuration from environment variables.
@ -30,5 +33,6 @@ func Load() *Config {
AuthEnabled: strings.EqualFold(os.Getenv("AUTH_ENABLED"), "true"),
JWTSecret: os.Getenv("JWT_SECRET"),
RedisURL: os.Getenv("REDIS_URL"),
}
}