Compare commits

..

No commits in common. "feature/websocket-chat" and "main" have entirely different histories.

14 changed files with 3 additions and 1109 deletions

View File

@ -1,4 +0,0 @@
name: feature/websocket-chat
feature: websocket-chat
base_branch: main
created_at: 2026-02-05T21:54:23.43461145Z

View File

@ -1,36 +0,0 @@
version: 1
project:
name: workspace
branches:
main: main
feature_prefix: feature/
phases:
enabled:
- draft
- specified
- planned
- ready
- implementation
- review
- audit
- qa
- merge
- released
required_artifacts:
audit:
- audit
planned:
- spec
- design
- tasks
- qa_plan
qa:
- qa_results
review:
- review
specified:
- spec
compliance:
require_approvals: true
require_branch: true
require_qa: true

View File

@ -1,173 +0,0 @@
# Technical Design: WebSocket Chat with Redis Pub/Sub
## Architecture
The implementation leverages the existing `pkg/realtime` package which provides:
- `LocalHub`: In-memory connection and room management
- `RedisBroadcaster`: Cross-pod message distribution via Redis pub/sub
- `Handler`: HTTP handler for WebSocket upgrade and lifecycle
```
┌─────────────────────────────────────────────────────────────────┐
│ chat-api Service │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌─────────────────┐ │
│ │ WebSocket │────▶│ LocalHub │────▶│ Redis Broadcaster│ │
│ │ Handler │ │ │ │ │ │
│ └──────────────┘ └──────────────┘ └────────┬────────┘ │
│ │ ▲ │ │
│ │ │ ▼ │
│ ▼ │ ┌──────────────┐ │
│ ┌──────────────┐ │ │ Redis │ │
│ │ Clients │ └──────────────│ Pub/Sub │ │
│ │ (WebSocket) │ └──────────────┘ │
│ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
```
## Components
### 1. Configuration (`internal/config/config.go`)
Add Redis URL configuration:
```go
type Config struct {
// ... existing fields
RedisURL string
}
func Load() *Config {
return &Config{
// ... existing
RedisURL: os.Getenv("REDIS_URL"),
}
}
```
### 2. Main Entry Point (`cmd/server/main.go`)
Initialize realtime components with context for graceful shutdown:
```go
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create hub and start event loop
hub := realtime.NewHub(logger)
go hub.Run(ctx)
// Create Redis broadcaster (if configured)
var broadcaster realtime.Broadcaster
if cfg.RedisURL != "" {
redisClient := redis.NewClient(&redis.Options{...})
broadcaster = realtime.NewRedisBroadcaster(redisClient, hub, logger)
go broadcaster.Run(ctx)
}
// Pass to route registration
api.RegisterRoutes(application, exampleService, hub, broadcaster)
}
```
### 3. Route Registration (`internal/api/routes.go`)
Mount WebSocket handler:
```go
func RegisterRoutes(app *app.App, exampleService *service.ExampleService,
hub realtime.Hub, broadcaster realtime.Broadcaster) {
wsHandler := realtime.NewHandler(hub, logger, realtime.HandlerConfig{
Broadcaster: broadcaster,
AuthRequired: cfg.AuthEnabled,
})
app.Route("/api/chat-api", func(r app.Router) {
// ... existing routes
// WebSocket routes
r.Mount("/ws", wsHandler.Routes())
})
}
```
### 4. WebSocket Handler (`pkg/realtime/handler.go`)
The existing handler already provides:
- `Routes()` returning Chi router with `GET /` and `GET /{room}`
- `HandleWebSocket()` for upgrade and lifecycle
- `GetStats()` for connection statistics
Add stats endpoint in routes.go:
```go
r.Get("/ws/stats", func(w http.ResponseWriter, r *http.Request) {
stats := wsHandler.GetStats()
httpresponse.OK(w, r, stats)
})
```
## Message Flow
### Outbound (Client → Server → Redis → All Pods)
1. Client sends JSON message via WebSocket
2. `WSClient.readPump()` decodes message
3. `Handler.makeMessageHandler()` processes message
4. If broadcaster configured: `Broadcaster.Publish()` to Redis
5. Redis distributes to all subscribed pods
6. Each pod's `RedisBroadcaster.Run()` receives message
7. `Hub.Broadcast()` delivers to local connections
### Inbound (Redis → Server → Clients)
1. `RedisBroadcaster.Run()` subscribes to Redis channels
2. Receives message, skips if from same pod (echo prevention)
3. Calls `Hub.Broadcast()` with message
4. `LocalHub.doBroadcast()` delivers to room or all connections
5. Each `Connection.Send()` queues to client's send buffer
6. `WSClient.writePump()` writes to WebSocket
## Redis Channel Structure
- `realtime:global` - Messages without room targeting
- `realtime:room:{room}` - Messages for specific room
## Configuration
Environment variables:
| Variable | Description | Default |
|----------|-------------|---------|
| `REDIS_URL` | Redis connection URL | (empty = local-only mode) |
| `AUTH_ENABLED` | Require authentication for WebSocket | `false` |
## Graceful Shutdown
1. Server receives SIGTERM/SIGINT
2. Context cancelled
3. `Hub.Run()` exits, closes all connections
4. `RedisBroadcaster.Run()` closes Redis subscription
5. Server shutdown completes
## Files to Modify
1. `services/chat-api/internal/config/config.go` - Add RedisURL
2. `services/chat-api/cmd/server/main.go` - Initialize hub/broadcaster
3. `services/chat-api/internal/api/routes.go` - Mount WebSocket handler
4. `services/chat-api/.env.example` - Add REDIS_URL
## Files to Create
1. `services/chat-api/internal/api/handlers/ws.go` - Stats handler wrapper
2. `services/chat-api/internal/api/handlers/ws_test.go` - WebSocket tests
## Dependencies
Already available in `pkg/go.mod`:
- `github.com/gorilla/websocket v1.5.3`
- `github.com/redis/go-redis/v9 v9.7.0`

View File

@ -1,50 +0,0 @@
slug: websocket-chat
title: WebSocket Chat with Redis Pub/Sub
created: 2026-02-05T21:52:07.328706369Z
branch: feature/websocket-chat
phase: implementation
phase_history:
- phase: draft
entered: 2026-02-05T21:52:07.328706369Z
exited: 2026-02-05T21:53:53.812434642Z
- phase: specified
entered: 2026-02-05T21:53:53.812434642Z
exited: 2026-02-05T21:54:20.249171969Z
- phase: planned
entered: 2026-02-05T21:54:20.249171969Z
exited: 2026-02-05T21:54:29.234820907Z
- phase: ready
entered: 2026-02-05T21:54:29.234820907Z
exited: 2026-02-05T21:54:29.239877785Z
- phase: implementation
entered: 2026-02-05T21:54:29.239877785Z
artifacts:
audit:
status: pending
path: audit.md
design:
status: approved
path: design.md
approved_by: user
approved_at: 2026-02-05T21:53:28.832971074Z
qa_plan:
status: approved
path: qa-plan.md
approved_by: user
approved_at: 2026-02-05T21:54:13.064966346Z
qa_results:
status: pending
path: qa-results.md
review:
status: pending
path: review.md
spec:
status: approved
path: spec.md
approved_by: user
approved_at: 2026-02-05T21:53:27.872265485Z
tasks:
status: approved
path: tasks.md
approved_by: user
approved_at: 2026-02-05T21:53:29.394129418Z

View File

@ -1,46 +0,0 @@
# QA Plan: WebSocket Chat with Redis Pub/Sub
## Test Categories
### 1. Unit Tests
| Test | Description | Location |
|------|-------------|----------|
| Config loading | RedisURL read from environment | `config/config_test.go` |
| Hub registration | Connections register/unregister | `pkg/realtime/hub_test.go` |
| Room management | Join/leave room operations | `pkg/realtime/hub_test.go` |
| Message broadcast | Room and global broadcast | `pkg/realtime/hub_test.go` |
### 2. Integration Tests
| Test | Description | Location |
|------|-------------|----------|
| WebSocket upgrade | GET /ws returns 101 Switching Protocols | `handlers/ws_test.go` |
| Room join URL | GET /ws/{room} joins room | `handlers/ws_test.go` |
| Room join query | GET /ws?room=x joins room | `handlers/ws_test.go` |
| Message echo | Client receives own messages | `handlers/ws_test.go` |
| Multi-client broadcast | All clients receive messages | `handlers/ws_test.go` |
| Stats endpoint | GET /ws/stats returns counts | `handlers/ws_test.go` |
| Connection cleanup | Disconnected clients removed | `handlers/ws_test.go` |
### 3. Manual Tests
| Test | Steps | Expected |
|------|-------|----------|
| wscat connection | `wscat -c ws://localhost:8001/api/chat-api/ws` | Connection established |
| Send message | Send JSON `{"type":"chat","data":{"text":"hello"}}` | Message echoed back |
| Room broadcast | Two clients join same room, one sends | Both receive message |
| Stats check | `curl localhost:8001/api/chat-api/ws/stats` | Returns connection count |
## Test Environment
- Redis running via docker-compose
- Service running on port 8001
- WebSocket client: wscat or websocat
## Pass Criteria
- All unit tests pass
- All integration tests pass
- Manual WebSocket connection test successful
- Message broadcast verified with multiple clients

View File

@ -1,73 +0,0 @@
# Feature Specification: WebSocket Chat with Redis Pub/Sub
## Overview
Implement a WebSocket endpoint at `GET /ws` that upgrades HTTP connections to WebSocket for real-time chat functionality. Messages received from clients are published to Redis channels, and a Redis subscriber broadcasts incoming messages to all connected clients.
## Requirements
### Functional Requirements
1. **WebSocket Upgrade Endpoint**
- `GET /api/chat-api/ws` upgrades HTTP to WebSocket
- `GET /api/chat-api/ws/{room}` joins a specific room on connect
- Support optional `?room=` query parameter for room selection
2. **Message Publishing**
- Incoming WebSocket messages are published to Redis pub/sub channels
- Room-specific messages go to `realtime:room:{room}` channel
- Global messages (no room) go to `realtime:global` channel
3. **Message Broadcasting**
- Redis subscriber receives messages from all channels
- Messages are broadcast to all connected WebSocket clients
- Room-targeted messages only go to clients in that room
- Cross-pod broadcasting ensures messages reach all service instances
4. **Message Format**
```json
{
"id": "uuid",
"type": "chat|presence|notification|system",
"room": "room-name",
"from": "client-id",
"data": {},
"timestamp": "2026-02-05T00:00:00Z"
}
```
### Non-Functional Requirements
1. **Connection Management**
- Automatic ping/pong heartbeat (60s timeout)
- Graceful connection cleanup on disconnect
- Maximum message size: 64KB
- Send buffer size: 256 messages
2. **Scalability**
- Multi-pod deployment via Redis pub/sub
- Pod ID tracking to prevent message echo
3. **Configuration**
- Redis URL via `REDIS_URL` environment variable
- Auth optional via existing `AUTH_ENABLED` flag
## Acceptance Criteria
- [ ] WebSocket endpoint accessible at `/api/chat-api/ws`
- [ ] Room-based WebSocket endpoint at `/api/chat-api/ws/{room}`
- [ ] Messages published to Redis channels
- [ ] Redis subscriber broadcasts to connected clients
- [ ] Room-targeted messages only reach room members
- [ ] Global messages reach all connected clients
- [ ] Connection stats endpoint at `/api/chat-api/ws/stats`
- [ ] Graceful shutdown of WebSocket connections
- [ ] Tests cover WebSocket handler and Redis integration
## Out of Scope
- Message persistence (database storage)
- Message history retrieval
- User authentication enforcement (auth remains optional)
- Rate limiting
- Message encryption

View File

@ -1,142 +0,0 @@
# Implementation Tasks: WebSocket Chat with Redis Pub/Sub
## Task 1: Add Redis Configuration
**ID:** `redis-config`
**Scope:** Add Redis URL to service configuration
**Files:**
- `services/chat-api/internal/config/config.go`
- `services/chat-api/.env.example`
**Changes:**
1. Add `RedisURL string` field to Config struct
2. Read from `REDIS_URL` environment variable
3. Add `REDIS_URL` to `.env.example` with comment
**Acceptance:**
- Config struct has RedisURL field
- Load() reads REDIS_URL from environment
- .env.example documents the configuration
---
## Task 2: Initialize Hub and Broadcaster
**ID:** `hub-init`
**Blocked By:** `redis-config`
**Scope:** Initialize realtime components in main.go
**Files:**
- `services/chat-api/cmd/server/main.go`
**Changes:**
1. Add context with cancel for graceful shutdown
2. Create LocalHub and start event loop
3. Create RedisBroadcaster if RedisURL configured
4. Update RegisterRoutes call to pass hub and broadcaster
5. Register shutdown hook to cancel context
**Acceptance:**
- Hub event loop runs in goroutine
- Redis broadcaster runs if configured
- Context cancelled on shutdown
- Service starts without errors
---
## Task 3: Mount WebSocket Handler
**ID:** `ws-routes`
**Blocked By:** `hub-init`
**Scope:** Register WebSocket routes in routes.go
**Files:**
- `services/chat-api/internal/api/routes.go`
**Changes:**
1. Update RegisterRoutes signature to accept Hub and Broadcaster
2. Create realtime.Handler with configuration
3. Mount handler at `/ws` path under `/api/chat-api`
4. Add stats endpoint at `/ws/stats`
**Acceptance:**
- WebSocket upgrade works at `/api/chat-api/ws`
- Room-based endpoint at `/api/chat-api/ws/{room}`
- Stats endpoint returns connection count
- Auth middleware applied when AUTH_ENABLED=true
---
## Task 4: Add WebSocket OpenAPI Documentation
**ID:** `ws-openapi`
**Blocked By:** `ws-routes`
**Scope:** Document WebSocket endpoints in OpenAPI spec
**Files:**
- `services/chat-api/internal/api/spec.go`
**Changes:**
1. Add `/ws` endpoint documentation (upgrade endpoint)
2. Add `/ws/{room}` endpoint documentation
3. Add `/ws/stats` endpoint with response schema
4. Document WebSocket message format in schemas
**Acceptance:**
- OpenAPI spec includes WebSocket endpoints
- Stats endpoint has proper response schema
- Message format documented
---
## Task 5: Create WebSocket Integration Tests
**ID:** `ws-tests`
**Blocked By:** `ws-routes`
**Scope:** Test WebSocket handler functionality
**Files:**
- `services/chat-api/internal/api/handlers/ws_test.go`
**Tests:**
1. WebSocket connection upgrade succeeds
2. Room join via URL parameter works
3. Room join via query parameter works
4. Message broadcast to room members
5. Global message broadcast to all clients
6. Stats endpoint returns correct counts
7. Connection cleanup on disconnect
**Acceptance:**
- All tests pass
- Coverage includes happy path and error cases
- Tests use httptest for server simulation
---
## Task Summary
| ID | Task | Blocked By | Status |
|----|------|------------|--------|
| `redis-config` | Add Redis Configuration | - | pending |
| `hub-init` | Initialize Hub and Broadcaster | redis-config | pending |
| `ws-routes` | Mount WebSocket Handler | hub-init | pending |
| `ws-openapi` | Add WebSocket OpenAPI Documentation | ws-routes | pending |
| `ws-tests` | Create WebSocket Integration Tests | ws-routes | pending |
## Dependency Graph
```
redis-config
hub-init
ws-routes
├─────────┐
▼ ▼
ws-openapi ws-tests
```

View File

@ -1,63 +0,0 @@
version: 1
project:
name: workspace
active_work:
features:
- slug: websocket-chat
branch: feature/websocket-chat
phase: implementation
blocked: []
last_updated: 2026-02-05T21:54:29.240473436Z
last_action: TRANSITION
last_actor: cli
history:
- timestamp: 2026-02-05T21:52:07.329080213Z
action: CREATE_FEATURE
feature: websocket-chat
actor: cli
result: success
- timestamp: 2026-02-05T21:53:27.872711534Z
action: APPROVE_ARTIFACT
feature: websocket-chat
actor: user
result: success
- timestamp: 2026-02-05T21:53:28.83345252Z
action: APPROVE_ARTIFACT
feature: websocket-chat
actor: user
result: success
- timestamp: 2026-02-05T21:53:29.394664165Z
action: APPROVE_ARTIFACT
feature: websocket-chat
actor: user
result: success
- timestamp: 2026-02-05T21:53:53.812877094Z
action: TRANSITION
feature: websocket-chat
actor: cli
result: success
- timestamp: 2026-02-05T21:54:13.065523174Z
action: APPROVE_ARTIFACT
feature: websocket-chat
actor: user
result: success
- timestamp: 2026-02-05T21:54:20.24969848Z
action: TRANSITION
feature: websocket-chat
actor: cli
result: success
- timestamp: 2026-02-05T21:54:23.438644211Z
action: CREATE_BRANCH
feature: websocket-chat
actor: cli
result: success
- timestamp: 2026-02-05T21:54:29.235335345Z
action: TRANSITION
feature: websocket-chat
actor: cli
result: success
- timestamp: 2026-02-05T21:54:29.240466754Z
action: TRANSITION
feature: websocket-chat
actor: cli
result: success

View File

@ -19,6 +19,3 @@ JWT_SECRET=dev-secret-change-in-production
# Database (if needed)
DATABASE_URL=postgres://dev:dev@localhost:5432/sp3-solo-1770327084?sslmode=disable
# Redis (for WebSocket cross-pod broadcasting, empty = local-only mode)
REDIS_URL=redis://localhost:6379/0

View File

@ -2,16 +2,10 @@
package main
import (
"context"
"github.com/redis/go-redis/v9"
"git.threesix.ai/jordan/sp3-solo-1770327084/pkg/app"
"git.threesix.ai/jordan/sp3-solo-1770327084/pkg/logging"
"git.threesix.ai/jordan/sp3-solo-1770327084/pkg/realtime"
"git.threesix.ai/jordan/sp3-solo-1770327084/services/chat-api/internal/adapter/memory"
"git.threesix.ai/jordan/sp3-solo-1770327084/services/chat-api/internal/api"
"git.threesix.ai/jordan/sp3-solo-1770327084/services/chat-api/internal/config"
"git.threesix.ai/jordan/sp3-solo-1770327084/services/chat-api/internal/service"
)
@ -19,48 +13,17 @@ func main() {
// Create logger
logger := logging.Default()
// Load configuration
cfg := config.Load()
// Create context for graceful shutdown of background goroutines
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create adapters (repositories)
exampleRepo := memory.NewExampleRepository()
// Create services (business logic)
exampleService := service.NewExampleService(exampleRepo, logger)
// Create WebSocket hub and start event loop
hub := realtime.NewHub(logger)
go hub.Run(ctx)
// Create Redis broadcaster if configured
var broadcaster realtime.Broadcaster
if cfg.RedisURL != "" {
opt, err := redis.ParseURL(cfg.RedisURL)
if err != nil {
logger.Error("invalid REDIS_URL", "error", err)
} else {
redisClient := redis.NewClient(opt)
broadcaster = realtime.NewRedisBroadcaster(redisClient, hub, logger)
go broadcaster.Run(ctx)
logger.Info("redis broadcaster enabled", "url", cfg.RedisURL)
}
}
// Create application
application := app.New("chat-api", app.WithDefaultPort(8001))
// Register shutdown hook to cancel context
application.OnShutdown(func(_ context.Context) error {
cancel()
return nil
})
// Register routes with dependency injection
api.RegisterRoutes(application, exampleService, hub, broadcaster)
api.RegisterRoutes(application, exampleService)
// Start server
application.Run()

View File

@ -1,386 +0,0 @@
package handlers
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"
"github.com/go-chi/chi/v5"
"github.com/gorilla/websocket"
"git.threesix.ai/jordan/sp3-solo-1770327084/pkg/httpresponse"
"git.threesix.ai/jordan/sp3-solo-1770327084/pkg/logging"
"git.threesix.ai/jordan/sp3-solo-1770327084/pkg/realtime"
)
// testHub wraps LocalHub for testing.
func newTestHub(t *testing.T) *realtime.LocalHub {
t.Helper()
hub := realtime.NewHub(logging.Nop())
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
go hub.Run(ctx)
return hub
}
// testServer creates an HTTP test server with WebSocket handler.
func newTestServer(t *testing.T, hub realtime.Hub) *httptest.Server {
t.Helper()
wsHandler := realtime.NewHandler(hub, logging.Nop(), realtime.HandlerConfig{
Broadcaster: nil, // No Redis for unit tests
AuthRequired: false,
})
r := chi.NewRouter()
r.Mount("/ws", wsHandler.Routes())
r.Get("/ws/stats", func(w http.ResponseWriter, r *http.Request) {
stats := wsHandler.GetStats()
httpresponse.OK(w, r, stats)
})
return httptest.NewServer(r)
}
// wsURL converts http:// URL to ws:// URL.
func wsURL(server *httptest.Server, path string) string {
return "ws" + strings.TrimPrefix(server.URL, "http") + path
}
func TestWebSocket_Upgrade(t *testing.T) {
hub := newTestHub(t)
server := newTestServer(t, hub)
defer server.Close()
// Connect via WebSocket
conn, resp, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws"), nil)
if err != nil {
t.Fatalf("failed to connect: %v", err)
}
defer conn.Close()
if resp.StatusCode != http.StatusSwitchingProtocols {
t.Errorf("expected status 101, got %d", resp.StatusCode)
}
// Give hub time to register
time.Sleep(50 * time.Millisecond)
if hub.ConnectionCount() != 1 {
t.Errorf("expected 1 connection, got %d", hub.ConnectionCount())
}
}
func TestWebSocket_RoomJoinViaURL(t *testing.T) {
hub := newTestHub(t)
server := newTestServer(t, hub)
defer server.Close()
// Connect to specific room via URL path
conn, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws/test-room"), nil)
if err != nil {
t.Fatalf("failed to connect: %v", err)
}
defer conn.Close()
// Give hub time to register and join room
time.Sleep(50 * time.Millisecond)
if hub.RoomCount("test-room") != 1 {
t.Errorf("expected 1 client in room, got %d", hub.RoomCount("test-room"))
}
}
func TestWebSocket_RoomJoinViaQuery(t *testing.T) {
hub := newTestHub(t)
server := newTestServer(t, hub)
defer server.Close()
// Connect with room query parameter
conn, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws?room=query-room"), nil)
if err != nil {
t.Fatalf("failed to connect: %v", err)
}
defer conn.Close()
// Give hub time to register and join room
time.Sleep(50 * time.Millisecond)
if hub.RoomCount("query-room") != 1 {
t.Errorf("expected 1 client in query-room, got %d", hub.RoomCount("query-room"))
}
}
func TestWebSocket_MessageBroadcast(t *testing.T) {
hub := newTestHub(t)
server := newTestServer(t, hub)
defer server.Close()
// Connect two clients to the same room
conn1, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws/broadcast-room"), nil)
if err != nil {
t.Fatalf("client 1 failed to connect: %v", err)
}
defer conn1.Close()
conn2, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws/broadcast-room"), nil)
if err != nil {
t.Fatalf("client 2 failed to connect: %v", err)
}
defer conn2.Close()
// Give hub time to register both clients
time.Sleep(50 * time.Millisecond)
if hub.RoomCount("broadcast-room") != 2 {
t.Fatalf("expected 2 clients in room, got %d", hub.RoomCount("broadcast-room"))
}
// Send a message from client 1
msg := realtime.Message{
Type: realtime.MessageTypeChat,
Room: "broadcast-room",
Data: json.RawMessage(`{"text":"hello"}`),
}
if err := conn1.WriteJSON(msg); err != nil {
t.Fatalf("failed to send message: %v", err)
}
// Both clients should receive the message
var wg sync.WaitGroup
wg.Add(2)
readMessage := func(conn *websocket.Conn, name string) {
defer wg.Done()
conn.SetReadDeadline(time.Now().Add(time.Second))
var received realtime.Message
if err := conn.ReadJSON(&received); err != nil {
t.Errorf("%s failed to read message: %v", name, err)
return
}
if received.Type != realtime.MessageTypeChat {
t.Errorf("%s: expected type 'chat', got %s", name, received.Type)
}
}
go readMessage(conn1, "client1")
go readMessage(conn2, "client2")
wg.Wait()
}
func TestWebSocket_GlobalBroadcast(t *testing.T) {
hub := newTestHub(t)
server := newTestServer(t, hub)
defer server.Close()
// Connect two clients without room (global)
conn1, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws"), nil)
if err != nil {
t.Fatalf("client 1 failed to connect: %v", err)
}
defer conn1.Close()
conn2, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws"), nil)
if err != nil {
t.Fatalf("client 2 failed to connect: %v", err)
}
defer conn2.Close()
// Give hub time to register
time.Sleep(50 * time.Millisecond)
if hub.ConnectionCount() != 2 {
t.Fatalf("expected 2 connections, got %d", hub.ConnectionCount())
}
// Send a global message (no room)
msg := realtime.Message{
Type: realtime.MessageTypeChat,
Data: json.RawMessage(`{"text":"global message"}`),
}
if err := conn1.WriteJSON(msg); err != nil {
t.Fatalf("failed to send message: %v", err)
}
// Both clients should receive
var wg sync.WaitGroup
wg.Add(2)
readMessage := func(conn *websocket.Conn, name string) {
defer wg.Done()
conn.SetReadDeadline(time.Now().Add(time.Second))
var received realtime.Message
if err := conn.ReadJSON(&received); err != nil {
t.Errorf("%s failed to read message: %v", name, err)
return
}
}
go readMessage(conn1, "client1")
go readMessage(conn2, "client2")
wg.Wait()
}
func TestWebSocket_Stats(t *testing.T) {
hub := newTestHub(t)
server := newTestServer(t, hub)
defer server.Close()
// Check stats before any connections
resp, err := http.Get(server.URL + "/ws/stats")
if err != nil {
t.Fatalf("failed to get stats: %v", err)
}
resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Errorf("expected status 200, got %d", resp.StatusCode)
}
// Connect a client
conn, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws"), nil)
if err != nil {
t.Fatalf("failed to connect: %v", err)
}
defer conn.Close()
// Give hub time to register
time.Sleep(50 * time.Millisecond)
// Check stats after connection
resp, err = http.Get(server.URL + "/ws/stats")
if err != nil {
t.Fatalf("failed to get stats: %v", err)
}
defer resp.Body.Close()
var result map[string]any
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
t.Fatalf("failed to decode stats: %v", err)
}
data, ok := result["data"].(map[string]any)
if !ok {
t.Fatal("expected 'data' field in response")
}
count, ok := data["total_connections"].(float64)
if !ok {
t.Fatal("expected 'total_connections' in data")
}
if int(count) != 1 {
t.Errorf("expected 1 connection in stats, got %d", int(count))
}
}
func TestWebSocket_Disconnect(t *testing.T) {
hub := newTestHub(t)
server := newTestServer(t, hub)
defer server.Close()
// Connect
conn, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws"), nil)
if err != nil {
t.Fatalf("failed to connect: %v", err)
}
// Give hub time to register
time.Sleep(50 * time.Millisecond)
if hub.ConnectionCount() != 1 {
t.Fatalf("expected 1 connection, got %d", hub.ConnectionCount())
}
// Close connection
conn.Close()
// Give hub time to unregister
time.Sleep(100 * time.Millisecond)
if hub.ConnectionCount() != 0 {
t.Errorf("expected 0 connections after disconnect, got %d", hub.ConnectionCount())
}
}
func TestWebSocket_PingPong(t *testing.T) {
hub := newTestHub(t)
server := newTestServer(t, hub)
defer server.Close()
conn, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws"), nil)
if err != nil {
t.Fatalf("failed to connect: %v", err)
}
defer conn.Close()
// Send ping message
msg := realtime.Message{
Type: realtime.MessageTypePing,
}
if err := conn.WriteJSON(msg); err != nil {
t.Fatalf("failed to send ping: %v", err)
}
// Should receive pong
conn.SetReadDeadline(time.Now().Add(time.Second))
var pong realtime.Message
if err := conn.ReadJSON(&pong); err != nil {
t.Fatalf("failed to read pong: %v", err)
}
if pong.Type != realtime.MessageTypePong {
t.Errorf("expected pong message, got %s", pong.Type)
}
}
func TestWebSocket_RoomIsolation(t *testing.T) {
hub := newTestHub(t)
server := newTestServer(t, hub)
defer server.Close()
// Connect client to room A
connA, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws/room-a"), nil)
if err != nil {
t.Fatalf("client A failed to connect: %v", err)
}
defer connA.Close()
// Connect client to room B
connB, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws/room-b"), nil)
if err != nil {
t.Fatalf("client B failed to connect: %v", err)
}
defer connB.Close()
// Give hub time to register
time.Sleep(50 * time.Millisecond)
// Send message to room A
msg := realtime.Message{
Type: realtime.MessageTypeChat,
Room: "room-a",
Data: json.RawMessage(`{"text":"room A only"}`),
}
if err := connA.WriteJSON(msg); err != nil {
t.Fatalf("failed to send message: %v", err)
}
// Client A should receive it
connA.SetReadDeadline(time.Now().Add(500 * time.Millisecond))
var received realtime.Message
if err := connA.ReadJSON(&received); err != nil {
t.Errorf("client A should receive message: %v", err)
}
// Client B should NOT receive it (timeout expected)
connB.SetReadDeadline(time.Now().Add(200 * time.Millisecond))
if err := connB.ReadJSON(&received); err == nil {
t.Error("client B should NOT receive room-a message")
}
}

View File

@ -2,12 +2,8 @@
package api
import (
"net/http"
"git.threesix.ai/jordan/sp3-solo-1770327084/pkg/app"
"git.threesix.ai/jordan/sp3-solo-1770327084/pkg/auth"
"git.threesix.ai/jordan/sp3-solo-1770327084/pkg/httpresponse"
"git.threesix.ai/jordan/sp3-solo-1770327084/pkg/realtime"
"git.threesix.ai/jordan/sp3-solo-1770327084/services/chat-api/internal/api/handlers"
"git.threesix.ai/jordan/sp3-solo-1770327084/services/chat-api/internal/config"
"git.threesix.ai/jordan/sp3-solo-1770327084/services/chat-api/internal/service"
@ -18,13 +14,7 @@ import (
// This allows the monorepo to expose multiple services under a single domain:
// - https://domain/api/chat-api/health
// - https://domain/api/chat-api/examples
// - https://domain/api/chat-api/ws (WebSocket)
func RegisterRoutes(
application *app.App,
exampleService *service.ExampleService,
hub realtime.Hub,
broadcaster realtime.Broadcaster,
) {
func RegisterRoutes(application *app.App, exampleService *service.ExampleService) {
logger := application.Logger()
cfg := config.Load()
@ -32,12 +22,6 @@ func RegisterRoutes(
healthHandler := handlers.NewHealth(logger)
exampleHandler := handlers.NewExample(exampleService, logger)
// Initialize WebSocket handler
wsHandler := realtime.NewHandler(hub, logger, realtime.HandlerConfig{
Broadcaster: broadcaster,
AuthRequired: cfg.AuthEnabled,
})
// Build and mount OpenAPI spec
spec := NewServiceSpec()
application.EnableDocs(spec)
@ -51,17 +35,6 @@ func RegisterRoutes(
r.Get("/examples", app.Wrap(exampleHandler.List))
r.Get("/examples/{id}", app.Wrap(exampleHandler.Get))
// WebSocket routes
// GET /ws - connect to global channel
// GET /ws/{room} - connect and join a room
r.Mount("/ws", wsHandler.Routes())
// WebSocket stats endpoint
r.Get("/ws/stats", func(w http.ResponseWriter, r *http.Request) {
stats := wsHandler.GetStats()
httpresponse.OK(w, r, stats)
})
// Protected routes (auth required when enabled)
r.Group(func(r app.Router) {
if cfg.AuthEnabled {

View File

@ -8,8 +8,7 @@ func NewServiceSpec() *openapi.OpenAPISpec {
WithDescription("REST API for the chat-api service").
WithBearerSecurity("bearer", "JWT authentication token").
WithTag("Health", "Service health endpoints").
WithTag("Examples", "Example CRUD endpoints").
WithTag("WebSocket", "Real-time WebSocket endpoints")
WithTag("Examples", "Example CRUD endpoints")
// Define reusable schemas
spec.WithSchema("Example", openapi.Object(map[string]openapi.Schema{
@ -109,65 +108,5 @@ func NewServiceSpec() *openapi.OpenAPISpec {
},
})
// WebSocket schemas
spec.WithSchema("WebSocketMessage", openapi.Object(map[string]openapi.Schema{
"id": openapi.UUID().WithDescription("Unique message identifier"),
"type": openapi.String().WithDescription("Message type: chat, presence, notification, system, error"),
"room": openapi.String().WithDescription("Target room (empty for global messages)"),
"from": openapi.String().WithDescription("Sender's client ID"),
"data": openapi.Object(nil).WithDescription("Message payload"),
"timestamp": openapi.DateTime().WithDescription("Message timestamp"),
}, "type"))
spec.WithSchema("WebSocketStats", openapi.Object(map[string]openapi.Schema{
"total_connections": openapi.Integer().WithDescription("Total active WebSocket connections"),
"room_counts": openapi.Object(nil).WithDescription("Connection count per room"),
}, "total_connections"))
// WebSocket upgrade endpoint (documented for reference, actual upgrade is protocol switch)
spec.AddPath("/api/chat-api/ws", "get", map[string]any{
"summary": "WebSocket connection",
"description": "Upgrades HTTP to WebSocket for real-time chat. Connect to global channel.",
"tags": []string{"WebSocket"},
"responses": map[string]any{
"101": map[string]any{
"description": "Switching Protocols - WebSocket upgrade successful",
},
"401": openapi.OpResponse("Unauthorized (when AUTH_ENABLED=true)", openapi.ErrorResponseSchema()),
},
})
// WebSocket with room
spec.AddPath("/api/chat-api/ws/{room}", "get", map[string]any{
"summary": "WebSocket connection with room",
"description": "Upgrades HTTP to WebSocket and joins the specified room.",
"tags": []string{"WebSocket"},
"parameters": []any{
map[string]any{
"name": "room",
"in": "path",
"description": "Room name to join on connect",
"required": true,
"schema": openapi.String(),
},
},
"responses": map[string]any{
"101": map[string]any{
"description": "Switching Protocols - WebSocket upgrade successful",
},
"401": openapi.OpResponse("Unauthorized (when AUTH_ENABLED=true)", openapi.ErrorResponseSchema()),
},
})
// WebSocket stats endpoint
spec.AddPath("/api/chat-api/ws/stats", "get", map[string]any{
"summary": "WebSocket statistics",
"description": "Returns current WebSocket connection statistics.",
"tags": []string{"WebSocket"},
"responses": map[string]any{
"200": openapi.OpResponse("Success", openapi.ResponseSchema(openapi.Ref("WebSocketStats"))),
},
})
return spec
}

View File

@ -18,9 +18,6 @@ type Config struct {
// Auth
AuthEnabled bool
JWTSecret string
// Redis
RedisURL string
}
// Load reads configuration from environment variables.
@ -33,7 +30,5 @@ func Load() *Config {
AuthEnabled: strings.EqualFold(os.Getenv("AUTH_ENABLED"), "true"),
JWTSecret: os.Getenv("JWT_SECRET"),
RedisURL: os.Getenv("REDIS_URL"),
}
}