Compare commits
No commits in common. "feature/websocket-chat" and "main" have entirely different histories.
feature/we
...
main
@ -1,4 +0,0 @@
|
|||||||
name: feature/websocket-chat
|
|
||||||
feature: websocket-chat
|
|
||||||
base_branch: main
|
|
||||||
created_at: 2026-02-05T21:54:23.43461145Z
|
|
||||||
@ -1,36 +0,0 @@
|
|||||||
version: 1
|
|
||||||
project:
|
|
||||||
name: workspace
|
|
||||||
branches:
|
|
||||||
main: main
|
|
||||||
feature_prefix: feature/
|
|
||||||
phases:
|
|
||||||
enabled:
|
|
||||||
- draft
|
|
||||||
- specified
|
|
||||||
- planned
|
|
||||||
- ready
|
|
||||||
- implementation
|
|
||||||
- review
|
|
||||||
- audit
|
|
||||||
- qa
|
|
||||||
- merge
|
|
||||||
- released
|
|
||||||
required_artifacts:
|
|
||||||
audit:
|
|
||||||
- audit
|
|
||||||
planned:
|
|
||||||
- spec
|
|
||||||
- design
|
|
||||||
- tasks
|
|
||||||
- qa_plan
|
|
||||||
qa:
|
|
||||||
- qa_results
|
|
||||||
review:
|
|
||||||
- review
|
|
||||||
specified:
|
|
||||||
- spec
|
|
||||||
compliance:
|
|
||||||
require_approvals: true
|
|
||||||
require_branch: true
|
|
||||||
require_qa: true
|
|
||||||
@ -1,173 +0,0 @@
|
|||||||
# Technical Design: WebSocket Chat with Redis Pub/Sub
|
|
||||||
|
|
||||||
## Architecture
|
|
||||||
|
|
||||||
The implementation leverages the existing `pkg/realtime` package which provides:
|
|
||||||
- `LocalHub`: In-memory connection and room management
|
|
||||||
- `RedisBroadcaster`: Cross-pod message distribution via Redis pub/sub
|
|
||||||
- `Handler`: HTTP handler for WebSocket upgrade and lifecycle
|
|
||||||
|
|
||||||
```
|
|
||||||
┌─────────────────────────────────────────────────────────────────┐
|
|
||||||
│ chat-api Service │
|
|
||||||
├─────────────────────────────────────────────────────────────────┤
|
|
||||||
│ │
|
|
||||||
│ ┌──────────────┐ ┌──────────────┐ ┌─────────────────┐ │
|
|
||||||
│ │ WebSocket │────▶│ LocalHub │────▶│ Redis Broadcaster│ │
|
|
||||||
│ │ Handler │ │ │ │ │ │
|
|
||||||
│ └──────────────┘ └──────────────┘ └────────┬────────┘ │
|
|
||||||
│ │ ▲ │ │
|
|
||||||
│ │ │ ▼ │
|
|
||||||
│ ▼ │ ┌──────────────┐ │
|
|
||||||
│ ┌──────────────┐ │ │ Redis │ │
|
|
||||||
│ │ Clients │ └──────────────│ Pub/Sub │ │
|
|
||||||
│ │ (WebSocket) │ └──────────────┘ │
|
|
||||||
│ └──────────────┘ │
|
|
||||||
│ │
|
|
||||||
└─────────────────────────────────────────────────────────────────┘
|
|
||||||
```
|
|
||||||
|
|
||||||
## Components
|
|
||||||
|
|
||||||
### 1. Configuration (`internal/config/config.go`)
|
|
||||||
|
|
||||||
Add Redis URL configuration:
|
|
||||||
|
|
||||||
```go
|
|
||||||
type Config struct {
|
|
||||||
// ... existing fields
|
|
||||||
RedisURL string
|
|
||||||
}
|
|
||||||
|
|
||||||
func Load() *Config {
|
|
||||||
return &Config{
|
|
||||||
// ... existing
|
|
||||||
RedisURL: os.Getenv("REDIS_URL"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
### 2. Main Entry Point (`cmd/server/main.go`)
|
|
||||||
|
|
||||||
Initialize realtime components with context for graceful shutdown:
|
|
||||||
|
|
||||||
```go
|
|
||||||
func main() {
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
// Create hub and start event loop
|
|
||||||
hub := realtime.NewHub(logger)
|
|
||||||
go hub.Run(ctx)
|
|
||||||
|
|
||||||
// Create Redis broadcaster (if configured)
|
|
||||||
var broadcaster realtime.Broadcaster
|
|
||||||
if cfg.RedisURL != "" {
|
|
||||||
redisClient := redis.NewClient(&redis.Options{...})
|
|
||||||
broadcaster = realtime.NewRedisBroadcaster(redisClient, hub, logger)
|
|
||||||
go broadcaster.Run(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Pass to route registration
|
|
||||||
api.RegisterRoutes(application, exampleService, hub, broadcaster)
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
### 3. Route Registration (`internal/api/routes.go`)
|
|
||||||
|
|
||||||
Mount WebSocket handler:
|
|
||||||
|
|
||||||
```go
|
|
||||||
func RegisterRoutes(app *app.App, exampleService *service.ExampleService,
|
|
||||||
hub realtime.Hub, broadcaster realtime.Broadcaster) {
|
|
||||||
|
|
||||||
wsHandler := realtime.NewHandler(hub, logger, realtime.HandlerConfig{
|
|
||||||
Broadcaster: broadcaster,
|
|
||||||
AuthRequired: cfg.AuthEnabled,
|
|
||||||
})
|
|
||||||
|
|
||||||
app.Route("/api/chat-api", func(r app.Router) {
|
|
||||||
// ... existing routes
|
|
||||||
|
|
||||||
// WebSocket routes
|
|
||||||
r.Mount("/ws", wsHandler.Routes())
|
|
||||||
})
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
### 4. WebSocket Handler (`pkg/realtime/handler.go`)
|
|
||||||
|
|
||||||
The existing handler already provides:
|
|
||||||
- `Routes()` returning Chi router with `GET /` and `GET /{room}`
|
|
||||||
- `HandleWebSocket()` for upgrade and lifecycle
|
|
||||||
- `GetStats()` for connection statistics
|
|
||||||
|
|
||||||
Add stats endpoint in routes.go:
|
|
||||||
|
|
||||||
```go
|
|
||||||
r.Get("/ws/stats", func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
stats := wsHandler.GetStats()
|
|
||||||
httpresponse.OK(w, r, stats)
|
|
||||||
})
|
|
||||||
```
|
|
||||||
|
|
||||||
## Message Flow
|
|
||||||
|
|
||||||
### Outbound (Client → Server → Redis → All Pods)
|
|
||||||
|
|
||||||
1. Client sends JSON message via WebSocket
|
|
||||||
2. `WSClient.readPump()` decodes message
|
|
||||||
3. `Handler.makeMessageHandler()` processes message
|
|
||||||
4. If broadcaster configured: `Broadcaster.Publish()` to Redis
|
|
||||||
5. Redis distributes to all subscribed pods
|
|
||||||
6. Each pod's `RedisBroadcaster.Run()` receives message
|
|
||||||
7. `Hub.Broadcast()` delivers to local connections
|
|
||||||
|
|
||||||
### Inbound (Redis → Server → Clients)
|
|
||||||
|
|
||||||
1. `RedisBroadcaster.Run()` subscribes to Redis channels
|
|
||||||
2. Receives message, skips if from same pod (echo prevention)
|
|
||||||
3. Calls `Hub.Broadcast()` with message
|
|
||||||
4. `LocalHub.doBroadcast()` delivers to room or all connections
|
|
||||||
5. Each `Connection.Send()` queues to client's send buffer
|
|
||||||
6. `WSClient.writePump()` writes to WebSocket
|
|
||||||
|
|
||||||
## Redis Channel Structure
|
|
||||||
|
|
||||||
- `realtime:global` - Messages without room targeting
|
|
||||||
- `realtime:room:{room}` - Messages for specific room
|
|
||||||
|
|
||||||
## Configuration
|
|
||||||
|
|
||||||
Environment variables:
|
|
||||||
|
|
||||||
| Variable | Description | Default |
|
|
||||||
|----------|-------------|---------|
|
|
||||||
| `REDIS_URL` | Redis connection URL | (empty = local-only mode) |
|
|
||||||
| `AUTH_ENABLED` | Require authentication for WebSocket | `false` |
|
|
||||||
|
|
||||||
## Graceful Shutdown
|
|
||||||
|
|
||||||
1. Server receives SIGTERM/SIGINT
|
|
||||||
2. Context cancelled
|
|
||||||
3. `Hub.Run()` exits, closes all connections
|
|
||||||
4. `RedisBroadcaster.Run()` closes Redis subscription
|
|
||||||
5. Server shutdown completes
|
|
||||||
|
|
||||||
## Files to Modify
|
|
||||||
|
|
||||||
1. `services/chat-api/internal/config/config.go` - Add RedisURL
|
|
||||||
2. `services/chat-api/cmd/server/main.go` - Initialize hub/broadcaster
|
|
||||||
3. `services/chat-api/internal/api/routes.go` - Mount WebSocket handler
|
|
||||||
4. `services/chat-api/.env.example` - Add REDIS_URL
|
|
||||||
|
|
||||||
## Files to Create
|
|
||||||
|
|
||||||
1. `services/chat-api/internal/api/handlers/ws.go` - Stats handler wrapper
|
|
||||||
2. `services/chat-api/internal/api/handlers/ws_test.go` - WebSocket tests
|
|
||||||
|
|
||||||
## Dependencies
|
|
||||||
|
|
||||||
Already available in `pkg/go.mod`:
|
|
||||||
- `github.com/gorilla/websocket v1.5.3`
|
|
||||||
- `github.com/redis/go-redis/v9 v9.7.0`
|
|
||||||
@ -1,50 +0,0 @@
|
|||||||
slug: websocket-chat
|
|
||||||
title: WebSocket Chat with Redis Pub/Sub
|
|
||||||
created: 2026-02-05T21:52:07.328706369Z
|
|
||||||
branch: feature/websocket-chat
|
|
||||||
phase: implementation
|
|
||||||
phase_history:
|
|
||||||
- phase: draft
|
|
||||||
entered: 2026-02-05T21:52:07.328706369Z
|
|
||||||
exited: 2026-02-05T21:53:53.812434642Z
|
|
||||||
- phase: specified
|
|
||||||
entered: 2026-02-05T21:53:53.812434642Z
|
|
||||||
exited: 2026-02-05T21:54:20.249171969Z
|
|
||||||
- phase: planned
|
|
||||||
entered: 2026-02-05T21:54:20.249171969Z
|
|
||||||
exited: 2026-02-05T21:54:29.234820907Z
|
|
||||||
- phase: ready
|
|
||||||
entered: 2026-02-05T21:54:29.234820907Z
|
|
||||||
exited: 2026-02-05T21:54:29.239877785Z
|
|
||||||
- phase: implementation
|
|
||||||
entered: 2026-02-05T21:54:29.239877785Z
|
|
||||||
artifacts:
|
|
||||||
audit:
|
|
||||||
status: pending
|
|
||||||
path: audit.md
|
|
||||||
design:
|
|
||||||
status: approved
|
|
||||||
path: design.md
|
|
||||||
approved_by: user
|
|
||||||
approved_at: 2026-02-05T21:53:28.832971074Z
|
|
||||||
qa_plan:
|
|
||||||
status: approved
|
|
||||||
path: qa-plan.md
|
|
||||||
approved_by: user
|
|
||||||
approved_at: 2026-02-05T21:54:13.064966346Z
|
|
||||||
qa_results:
|
|
||||||
status: pending
|
|
||||||
path: qa-results.md
|
|
||||||
review:
|
|
||||||
status: pending
|
|
||||||
path: review.md
|
|
||||||
spec:
|
|
||||||
status: approved
|
|
||||||
path: spec.md
|
|
||||||
approved_by: user
|
|
||||||
approved_at: 2026-02-05T21:53:27.872265485Z
|
|
||||||
tasks:
|
|
||||||
status: approved
|
|
||||||
path: tasks.md
|
|
||||||
approved_by: user
|
|
||||||
approved_at: 2026-02-05T21:53:29.394129418Z
|
|
||||||
@ -1,46 +0,0 @@
|
|||||||
# QA Plan: WebSocket Chat with Redis Pub/Sub
|
|
||||||
|
|
||||||
## Test Categories
|
|
||||||
|
|
||||||
### 1. Unit Tests
|
|
||||||
|
|
||||||
| Test | Description | Location |
|
|
||||||
|------|-------------|----------|
|
|
||||||
| Config loading | RedisURL read from environment | `config/config_test.go` |
|
|
||||||
| Hub registration | Connections register/unregister | `pkg/realtime/hub_test.go` |
|
|
||||||
| Room management | Join/leave room operations | `pkg/realtime/hub_test.go` |
|
|
||||||
| Message broadcast | Room and global broadcast | `pkg/realtime/hub_test.go` |
|
|
||||||
|
|
||||||
### 2. Integration Tests
|
|
||||||
|
|
||||||
| Test | Description | Location |
|
|
||||||
|------|-------------|----------|
|
|
||||||
| WebSocket upgrade | GET /ws returns 101 Switching Protocols | `handlers/ws_test.go` |
|
|
||||||
| Room join URL | GET /ws/{room} joins room | `handlers/ws_test.go` |
|
|
||||||
| Room join query | GET /ws?room=x joins room | `handlers/ws_test.go` |
|
|
||||||
| Message echo | Client receives own messages | `handlers/ws_test.go` |
|
|
||||||
| Multi-client broadcast | All clients receive messages | `handlers/ws_test.go` |
|
|
||||||
| Stats endpoint | GET /ws/stats returns counts | `handlers/ws_test.go` |
|
|
||||||
| Connection cleanup | Disconnected clients removed | `handlers/ws_test.go` |
|
|
||||||
|
|
||||||
### 3. Manual Tests
|
|
||||||
|
|
||||||
| Test | Steps | Expected |
|
|
||||||
|------|-------|----------|
|
|
||||||
| wscat connection | `wscat -c ws://localhost:8001/api/chat-api/ws` | Connection established |
|
|
||||||
| Send message | Send JSON `{"type":"chat","data":{"text":"hello"}}` | Message echoed back |
|
|
||||||
| Room broadcast | Two clients join same room, one sends | Both receive message |
|
|
||||||
| Stats check | `curl localhost:8001/api/chat-api/ws/stats` | Returns connection count |
|
|
||||||
|
|
||||||
## Test Environment
|
|
||||||
|
|
||||||
- Redis running via docker-compose
|
|
||||||
- Service running on port 8001
|
|
||||||
- WebSocket client: wscat or websocat
|
|
||||||
|
|
||||||
## Pass Criteria
|
|
||||||
|
|
||||||
- All unit tests pass
|
|
||||||
- All integration tests pass
|
|
||||||
- Manual WebSocket connection test successful
|
|
||||||
- Message broadcast verified with multiple clients
|
|
||||||
@ -1,73 +0,0 @@
|
|||||||
# Feature Specification: WebSocket Chat with Redis Pub/Sub
|
|
||||||
|
|
||||||
## Overview
|
|
||||||
|
|
||||||
Implement a WebSocket endpoint at `GET /ws` that upgrades HTTP connections to WebSocket for real-time chat functionality. Messages received from clients are published to Redis channels, and a Redis subscriber broadcasts incoming messages to all connected clients.
|
|
||||||
|
|
||||||
## Requirements
|
|
||||||
|
|
||||||
### Functional Requirements
|
|
||||||
|
|
||||||
1. **WebSocket Upgrade Endpoint**
|
|
||||||
- `GET /api/chat-api/ws` upgrades HTTP to WebSocket
|
|
||||||
- `GET /api/chat-api/ws/{room}` joins a specific room on connect
|
|
||||||
- Support optional `?room=` query parameter for room selection
|
|
||||||
|
|
||||||
2. **Message Publishing**
|
|
||||||
- Incoming WebSocket messages are published to Redis pub/sub channels
|
|
||||||
- Room-specific messages go to `realtime:room:{room}` channel
|
|
||||||
- Global messages (no room) go to `realtime:global` channel
|
|
||||||
|
|
||||||
3. **Message Broadcasting**
|
|
||||||
- Redis subscriber receives messages from all channels
|
|
||||||
- Messages are broadcast to all connected WebSocket clients
|
|
||||||
- Room-targeted messages only go to clients in that room
|
|
||||||
- Cross-pod broadcasting ensures messages reach all service instances
|
|
||||||
|
|
||||||
4. **Message Format**
|
|
||||||
```json
|
|
||||||
{
|
|
||||||
"id": "uuid",
|
|
||||||
"type": "chat|presence|notification|system",
|
|
||||||
"room": "room-name",
|
|
||||||
"from": "client-id",
|
|
||||||
"data": {},
|
|
||||||
"timestamp": "2026-02-05T00:00:00Z"
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
### Non-Functional Requirements
|
|
||||||
|
|
||||||
1. **Connection Management**
|
|
||||||
- Automatic ping/pong heartbeat (60s timeout)
|
|
||||||
- Graceful connection cleanup on disconnect
|
|
||||||
- Maximum message size: 64KB
|
|
||||||
- Send buffer size: 256 messages
|
|
||||||
|
|
||||||
2. **Scalability**
|
|
||||||
- Multi-pod deployment via Redis pub/sub
|
|
||||||
- Pod ID tracking to prevent message echo
|
|
||||||
|
|
||||||
3. **Configuration**
|
|
||||||
- Redis URL via `REDIS_URL` environment variable
|
|
||||||
- Auth optional via existing `AUTH_ENABLED` flag
|
|
||||||
|
|
||||||
## Acceptance Criteria
|
|
||||||
|
|
||||||
- [ ] WebSocket endpoint accessible at `/api/chat-api/ws`
|
|
||||||
- [ ] Room-based WebSocket endpoint at `/api/chat-api/ws/{room}`
|
|
||||||
- [ ] Messages published to Redis channels
|
|
||||||
- [ ] Redis subscriber broadcasts to connected clients
|
|
||||||
- [ ] Room-targeted messages only reach room members
|
|
||||||
- [ ] Global messages reach all connected clients
|
|
||||||
- [ ] Connection stats endpoint at `/api/chat-api/ws/stats`
|
|
||||||
- [ ] Graceful shutdown of WebSocket connections
|
|
||||||
- [ ] Tests cover WebSocket handler and Redis integration
|
|
||||||
|
|
||||||
## Out of Scope
|
|
||||||
|
|
||||||
- Message persistence (database storage)
|
|
||||||
- Message history retrieval
|
|
||||||
- User authentication enforcement (auth remains optional)
|
|
||||||
- Rate limiting
|
|
||||||
- Message encryption
|
|
||||||
@ -1,142 +0,0 @@
|
|||||||
# Implementation Tasks: WebSocket Chat with Redis Pub/Sub
|
|
||||||
|
|
||||||
## Task 1: Add Redis Configuration
|
|
||||||
|
|
||||||
**ID:** `redis-config`
|
|
||||||
**Scope:** Add Redis URL to service configuration
|
|
||||||
|
|
||||||
**Files:**
|
|
||||||
- `services/chat-api/internal/config/config.go`
|
|
||||||
- `services/chat-api/.env.example`
|
|
||||||
|
|
||||||
**Changes:**
|
|
||||||
1. Add `RedisURL string` field to Config struct
|
|
||||||
2. Read from `REDIS_URL` environment variable
|
|
||||||
3. Add `REDIS_URL` to `.env.example` with comment
|
|
||||||
|
|
||||||
**Acceptance:**
|
|
||||||
- Config struct has RedisURL field
|
|
||||||
- Load() reads REDIS_URL from environment
|
|
||||||
- .env.example documents the configuration
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Task 2: Initialize Hub and Broadcaster
|
|
||||||
|
|
||||||
**ID:** `hub-init`
|
|
||||||
**Blocked By:** `redis-config`
|
|
||||||
**Scope:** Initialize realtime components in main.go
|
|
||||||
|
|
||||||
**Files:**
|
|
||||||
- `services/chat-api/cmd/server/main.go`
|
|
||||||
|
|
||||||
**Changes:**
|
|
||||||
1. Add context with cancel for graceful shutdown
|
|
||||||
2. Create LocalHub and start event loop
|
|
||||||
3. Create RedisBroadcaster if RedisURL configured
|
|
||||||
4. Update RegisterRoutes call to pass hub and broadcaster
|
|
||||||
5. Register shutdown hook to cancel context
|
|
||||||
|
|
||||||
**Acceptance:**
|
|
||||||
- Hub event loop runs in goroutine
|
|
||||||
- Redis broadcaster runs if configured
|
|
||||||
- Context cancelled on shutdown
|
|
||||||
- Service starts without errors
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Task 3: Mount WebSocket Handler
|
|
||||||
|
|
||||||
**ID:** `ws-routes`
|
|
||||||
**Blocked By:** `hub-init`
|
|
||||||
**Scope:** Register WebSocket routes in routes.go
|
|
||||||
|
|
||||||
**Files:**
|
|
||||||
- `services/chat-api/internal/api/routes.go`
|
|
||||||
|
|
||||||
**Changes:**
|
|
||||||
1. Update RegisterRoutes signature to accept Hub and Broadcaster
|
|
||||||
2. Create realtime.Handler with configuration
|
|
||||||
3. Mount handler at `/ws` path under `/api/chat-api`
|
|
||||||
4. Add stats endpoint at `/ws/stats`
|
|
||||||
|
|
||||||
**Acceptance:**
|
|
||||||
- WebSocket upgrade works at `/api/chat-api/ws`
|
|
||||||
- Room-based endpoint at `/api/chat-api/ws/{room}`
|
|
||||||
- Stats endpoint returns connection count
|
|
||||||
- Auth middleware applied when AUTH_ENABLED=true
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Task 4: Add WebSocket OpenAPI Documentation
|
|
||||||
|
|
||||||
**ID:** `ws-openapi`
|
|
||||||
**Blocked By:** `ws-routes`
|
|
||||||
**Scope:** Document WebSocket endpoints in OpenAPI spec
|
|
||||||
|
|
||||||
**Files:**
|
|
||||||
- `services/chat-api/internal/api/spec.go`
|
|
||||||
|
|
||||||
**Changes:**
|
|
||||||
1. Add `/ws` endpoint documentation (upgrade endpoint)
|
|
||||||
2. Add `/ws/{room}` endpoint documentation
|
|
||||||
3. Add `/ws/stats` endpoint with response schema
|
|
||||||
4. Document WebSocket message format in schemas
|
|
||||||
|
|
||||||
**Acceptance:**
|
|
||||||
- OpenAPI spec includes WebSocket endpoints
|
|
||||||
- Stats endpoint has proper response schema
|
|
||||||
- Message format documented
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Task 5: Create WebSocket Integration Tests
|
|
||||||
|
|
||||||
**ID:** `ws-tests`
|
|
||||||
**Blocked By:** `ws-routes`
|
|
||||||
**Scope:** Test WebSocket handler functionality
|
|
||||||
|
|
||||||
**Files:**
|
|
||||||
- `services/chat-api/internal/api/handlers/ws_test.go`
|
|
||||||
|
|
||||||
**Tests:**
|
|
||||||
1. WebSocket connection upgrade succeeds
|
|
||||||
2. Room join via URL parameter works
|
|
||||||
3. Room join via query parameter works
|
|
||||||
4. Message broadcast to room members
|
|
||||||
5. Global message broadcast to all clients
|
|
||||||
6. Stats endpoint returns correct counts
|
|
||||||
7. Connection cleanup on disconnect
|
|
||||||
|
|
||||||
**Acceptance:**
|
|
||||||
- All tests pass
|
|
||||||
- Coverage includes happy path and error cases
|
|
||||||
- Tests use httptest for server simulation
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Task Summary
|
|
||||||
|
|
||||||
| ID | Task | Blocked By | Status |
|
|
||||||
|----|------|------------|--------|
|
|
||||||
| `redis-config` | Add Redis Configuration | - | pending |
|
|
||||||
| `hub-init` | Initialize Hub and Broadcaster | redis-config | pending |
|
|
||||||
| `ws-routes` | Mount WebSocket Handler | hub-init | pending |
|
|
||||||
| `ws-openapi` | Add WebSocket OpenAPI Documentation | ws-routes | pending |
|
|
||||||
| `ws-tests` | Create WebSocket Integration Tests | ws-routes | pending |
|
|
||||||
|
|
||||||
## Dependency Graph
|
|
||||||
|
|
||||||
```
|
|
||||||
redis-config
|
|
||||||
│
|
|
||||||
▼
|
|
||||||
hub-init
|
|
||||||
│
|
|
||||||
▼
|
|
||||||
ws-routes
|
|
||||||
│
|
|
||||||
├─────────┐
|
|
||||||
▼ ▼
|
|
||||||
ws-openapi ws-tests
|
|
||||||
```
|
|
||||||
@ -1,63 +0,0 @@
|
|||||||
version: 1
|
|
||||||
project:
|
|
||||||
name: workspace
|
|
||||||
active_work:
|
|
||||||
features:
|
|
||||||
- slug: websocket-chat
|
|
||||||
branch: feature/websocket-chat
|
|
||||||
phase: implementation
|
|
||||||
blocked: []
|
|
||||||
last_updated: 2026-02-05T21:54:29.240473436Z
|
|
||||||
last_action: TRANSITION
|
|
||||||
last_actor: cli
|
|
||||||
history:
|
|
||||||
- timestamp: 2026-02-05T21:52:07.329080213Z
|
|
||||||
action: CREATE_FEATURE
|
|
||||||
feature: websocket-chat
|
|
||||||
actor: cli
|
|
||||||
result: success
|
|
||||||
- timestamp: 2026-02-05T21:53:27.872711534Z
|
|
||||||
action: APPROVE_ARTIFACT
|
|
||||||
feature: websocket-chat
|
|
||||||
actor: user
|
|
||||||
result: success
|
|
||||||
- timestamp: 2026-02-05T21:53:28.83345252Z
|
|
||||||
action: APPROVE_ARTIFACT
|
|
||||||
feature: websocket-chat
|
|
||||||
actor: user
|
|
||||||
result: success
|
|
||||||
- timestamp: 2026-02-05T21:53:29.394664165Z
|
|
||||||
action: APPROVE_ARTIFACT
|
|
||||||
feature: websocket-chat
|
|
||||||
actor: user
|
|
||||||
result: success
|
|
||||||
- timestamp: 2026-02-05T21:53:53.812877094Z
|
|
||||||
action: TRANSITION
|
|
||||||
feature: websocket-chat
|
|
||||||
actor: cli
|
|
||||||
result: success
|
|
||||||
- timestamp: 2026-02-05T21:54:13.065523174Z
|
|
||||||
action: APPROVE_ARTIFACT
|
|
||||||
feature: websocket-chat
|
|
||||||
actor: user
|
|
||||||
result: success
|
|
||||||
- timestamp: 2026-02-05T21:54:20.24969848Z
|
|
||||||
action: TRANSITION
|
|
||||||
feature: websocket-chat
|
|
||||||
actor: cli
|
|
||||||
result: success
|
|
||||||
- timestamp: 2026-02-05T21:54:23.438644211Z
|
|
||||||
action: CREATE_BRANCH
|
|
||||||
feature: websocket-chat
|
|
||||||
actor: cli
|
|
||||||
result: success
|
|
||||||
- timestamp: 2026-02-05T21:54:29.235335345Z
|
|
||||||
action: TRANSITION
|
|
||||||
feature: websocket-chat
|
|
||||||
actor: cli
|
|
||||||
result: success
|
|
||||||
- timestamp: 2026-02-05T21:54:29.240466754Z
|
|
||||||
action: TRANSITION
|
|
||||||
feature: websocket-chat
|
|
||||||
actor: cli
|
|
||||||
result: success
|
|
||||||
@ -19,6 +19,3 @@ JWT_SECRET=dev-secret-change-in-production
|
|||||||
|
|
||||||
# Database (if needed)
|
# Database (if needed)
|
||||||
DATABASE_URL=postgres://dev:dev@localhost:5432/sp3-solo-1770327084?sslmode=disable
|
DATABASE_URL=postgres://dev:dev@localhost:5432/sp3-solo-1770327084?sslmode=disable
|
||||||
|
|
||||||
# Redis (for WebSocket cross-pod broadcasting, empty = local-only mode)
|
|
||||||
REDIS_URL=redis://localhost:6379/0
|
|
||||||
|
|||||||
@ -2,16 +2,10 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
|
|
||||||
"github.com/redis/go-redis/v9"
|
|
||||||
|
|
||||||
"git.threesix.ai/jordan/sp3-solo-1770327084/pkg/app"
|
"git.threesix.ai/jordan/sp3-solo-1770327084/pkg/app"
|
||||||
"git.threesix.ai/jordan/sp3-solo-1770327084/pkg/logging"
|
"git.threesix.ai/jordan/sp3-solo-1770327084/pkg/logging"
|
||||||
"git.threesix.ai/jordan/sp3-solo-1770327084/pkg/realtime"
|
|
||||||
"git.threesix.ai/jordan/sp3-solo-1770327084/services/chat-api/internal/adapter/memory"
|
"git.threesix.ai/jordan/sp3-solo-1770327084/services/chat-api/internal/adapter/memory"
|
||||||
"git.threesix.ai/jordan/sp3-solo-1770327084/services/chat-api/internal/api"
|
"git.threesix.ai/jordan/sp3-solo-1770327084/services/chat-api/internal/api"
|
||||||
"git.threesix.ai/jordan/sp3-solo-1770327084/services/chat-api/internal/config"
|
|
||||||
"git.threesix.ai/jordan/sp3-solo-1770327084/services/chat-api/internal/service"
|
"git.threesix.ai/jordan/sp3-solo-1770327084/services/chat-api/internal/service"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -19,48 +13,17 @@ func main() {
|
|||||||
// Create logger
|
// Create logger
|
||||||
logger := logging.Default()
|
logger := logging.Default()
|
||||||
|
|
||||||
// Load configuration
|
|
||||||
cfg := config.Load()
|
|
||||||
|
|
||||||
// Create context for graceful shutdown of background goroutines
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
// Create adapters (repositories)
|
// Create adapters (repositories)
|
||||||
exampleRepo := memory.NewExampleRepository()
|
exampleRepo := memory.NewExampleRepository()
|
||||||
|
|
||||||
// Create services (business logic)
|
// Create services (business logic)
|
||||||
exampleService := service.NewExampleService(exampleRepo, logger)
|
exampleService := service.NewExampleService(exampleRepo, logger)
|
||||||
|
|
||||||
// Create WebSocket hub and start event loop
|
|
||||||
hub := realtime.NewHub(logger)
|
|
||||||
go hub.Run(ctx)
|
|
||||||
|
|
||||||
// Create Redis broadcaster if configured
|
|
||||||
var broadcaster realtime.Broadcaster
|
|
||||||
if cfg.RedisURL != "" {
|
|
||||||
opt, err := redis.ParseURL(cfg.RedisURL)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error("invalid REDIS_URL", "error", err)
|
|
||||||
} else {
|
|
||||||
redisClient := redis.NewClient(opt)
|
|
||||||
broadcaster = realtime.NewRedisBroadcaster(redisClient, hub, logger)
|
|
||||||
go broadcaster.Run(ctx)
|
|
||||||
logger.Info("redis broadcaster enabled", "url", cfg.RedisURL)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create application
|
// Create application
|
||||||
application := app.New("chat-api", app.WithDefaultPort(8001))
|
application := app.New("chat-api", app.WithDefaultPort(8001))
|
||||||
|
|
||||||
// Register shutdown hook to cancel context
|
|
||||||
application.OnShutdown(func(_ context.Context) error {
|
|
||||||
cancel()
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
// Register routes with dependency injection
|
// Register routes with dependency injection
|
||||||
api.RegisterRoutes(application, exampleService, hub, broadcaster)
|
api.RegisterRoutes(application, exampleService)
|
||||||
|
|
||||||
// Start server
|
// Start server
|
||||||
application.Run()
|
application.Run()
|
||||||
|
|||||||
@ -1,386 +0,0 @@
|
|||||||
package handlers
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"net/http"
|
|
||||||
"net/http/httptest"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/go-chi/chi/v5"
|
|
||||||
"github.com/gorilla/websocket"
|
|
||||||
|
|
||||||
"git.threesix.ai/jordan/sp3-solo-1770327084/pkg/httpresponse"
|
|
||||||
"git.threesix.ai/jordan/sp3-solo-1770327084/pkg/logging"
|
|
||||||
"git.threesix.ai/jordan/sp3-solo-1770327084/pkg/realtime"
|
|
||||||
)
|
|
||||||
|
|
||||||
// testHub wraps LocalHub for testing.
|
|
||||||
func newTestHub(t *testing.T) *realtime.LocalHub {
|
|
||||||
t.Helper()
|
|
||||||
hub := realtime.NewHub(logging.Nop())
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
t.Cleanup(cancel)
|
|
||||||
go hub.Run(ctx)
|
|
||||||
return hub
|
|
||||||
}
|
|
||||||
|
|
||||||
// testServer creates an HTTP test server with WebSocket handler.
|
|
||||||
func newTestServer(t *testing.T, hub realtime.Hub) *httptest.Server {
|
|
||||||
t.Helper()
|
|
||||||
|
|
||||||
wsHandler := realtime.NewHandler(hub, logging.Nop(), realtime.HandlerConfig{
|
|
||||||
Broadcaster: nil, // No Redis for unit tests
|
|
||||||
AuthRequired: false,
|
|
||||||
})
|
|
||||||
|
|
||||||
r := chi.NewRouter()
|
|
||||||
r.Mount("/ws", wsHandler.Routes())
|
|
||||||
r.Get("/ws/stats", func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
stats := wsHandler.GetStats()
|
|
||||||
httpresponse.OK(w, r, stats)
|
|
||||||
})
|
|
||||||
|
|
||||||
return httptest.NewServer(r)
|
|
||||||
}
|
|
||||||
|
|
||||||
// wsURL converts http:// URL to ws:// URL.
|
|
||||||
func wsURL(server *httptest.Server, path string) string {
|
|
||||||
return "ws" + strings.TrimPrefix(server.URL, "http") + path
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWebSocket_Upgrade(t *testing.T) {
|
|
||||||
hub := newTestHub(t)
|
|
||||||
server := newTestServer(t, hub)
|
|
||||||
defer server.Close()
|
|
||||||
|
|
||||||
// Connect via WebSocket
|
|
||||||
conn, resp, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws"), nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to connect: %v", err)
|
|
||||||
}
|
|
||||||
defer conn.Close()
|
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusSwitchingProtocols {
|
|
||||||
t.Errorf("expected status 101, got %d", resp.StatusCode)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Give hub time to register
|
|
||||||
time.Sleep(50 * time.Millisecond)
|
|
||||||
|
|
||||||
if hub.ConnectionCount() != 1 {
|
|
||||||
t.Errorf("expected 1 connection, got %d", hub.ConnectionCount())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWebSocket_RoomJoinViaURL(t *testing.T) {
|
|
||||||
hub := newTestHub(t)
|
|
||||||
server := newTestServer(t, hub)
|
|
||||||
defer server.Close()
|
|
||||||
|
|
||||||
// Connect to specific room via URL path
|
|
||||||
conn, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws/test-room"), nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to connect: %v", err)
|
|
||||||
}
|
|
||||||
defer conn.Close()
|
|
||||||
|
|
||||||
// Give hub time to register and join room
|
|
||||||
time.Sleep(50 * time.Millisecond)
|
|
||||||
|
|
||||||
if hub.RoomCount("test-room") != 1 {
|
|
||||||
t.Errorf("expected 1 client in room, got %d", hub.RoomCount("test-room"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWebSocket_RoomJoinViaQuery(t *testing.T) {
|
|
||||||
hub := newTestHub(t)
|
|
||||||
server := newTestServer(t, hub)
|
|
||||||
defer server.Close()
|
|
||||||
|
|
||||||
// Connect with room query parameter
|
|
||||||
conn, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws?room=query-room"), nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to connect: %v", err)
|
|
||||||
}
|
|
||||||
defer conn.Close()
|
|
||||||
|
|
||||||
// Give hub time to register and join room
|
|
||||||
time.Sleep(50 * time.Millisecond)
|
|
||||||
|
|
||||||
if hub.RoomCount("query-room") != 1 {
|
|
||||||
t.Errorf("expected 1 client in query-room, got %d", hub.RoomCount("query-room"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWebSocket_MessageBroadcast(t *testing.T) {
|
|
||||||
hub := newTestHub(t)
|
|
||||||
server := newTestServer(t, hub)
|
|
||||||
defer server.Close()
|
|
||||||
|
|
||||||
// Connect two clients to the same room
|
|
||||||
conn1, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws/broadcast-room"), nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("client 1 failed to connect: %v", err)
|
|
||||||
}
|
|
||||||
defer conn1.Close()
|
|
||||||
|
|
||||||
conn2, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws/broadcast-room"), nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("client 2 failed to connect: %v", err)
|
|
||||||
}
|
|
||||||
defer conn2.Close()
|
|
||||||
|
|
||||||
// Give hub time to register both clients
|
|
||||||
time.Sleep(50 * time.Millisecond)
|
|
||||||
|
|
||||||
if hub.RoomCount("broadcast-room") != 2 {
|
|
||||||
t.Fatalf("expected 2 clients in room, got %d", hub.RoomCount("broadcast-room"))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send a message from client 1
|
|
||||||
msg := realtime.Message{
|
|
||||||
Type: realtime.MessageTypeChat,
|
|
||||||
Room: "broadcast-room",
|
|
||||||
Data: json.RawMessage(`{"text":"hello"}`),
|
|
||||||
}
|
|
||||||
if err := conn1.WriteJSON(msg); err != nil {
|
|
||||||
t.Fatalf("failed to send message: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Both clients should receive the message
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
wg.Add(2)
|
|
||||||
|
|
||||||
readMessage := func(conn *websocket.Conn, name string) {
|
|
||||||
defer wg.Done()
|
|
||||||
conn.SetReadDeadline(time.Now().Add(time.Second))
|
|
||||||
var received realtime.Message
|
|
||||||
if err := conn.ReadJSON(&received); err != nil {
|
|
||||||
t.Errorf("%s failed to read message: %v", name, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if received.Type != realtime.MessageTypeChat {
|
|
||||||
t.Errorf("%s: expected type 'chat', got %s", name, received.Type)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
go readMessage(conn1, "client1")
|
|
||||||
go readMessage(conn2, "client2")
|
|
||||||
wg.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWebSocket_GlobalBroadcast(t *testing.T) {
|
|
||||||
hub := newTestHub(t)
|
|
||||||
server := newTestServer(t, hub)
|
|
||||||
defer server.Close()
|
|
||||||
|
|
||||||
// Connect two clients without room (global)
|
|
||||||
conn1, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws"), nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("client 1 failed to connect: %v", err)
|
|
||||||
}
|
|
||||||
defer conn1.Close()
|
|
||||||
|
|
||||||
conn2, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws"), nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("client 2 failed to connect: %v", err)
|
|
||||||
}
|
|
||||||
defer conn2.Close()
|
|
||||||
|
|
||||||
// Give hub time to register
|
|
||||||
time.Sleep(50 * time.Millisecond)
|
|
||||||
|
|
||||||
if hub.ConnectionCount() != 2 {
|
|
||||||
t.Fatalf("expected 2 connections, got %d", hub.ConnectionCount())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send a global message (no room)
|
|
||||||
msg := realtime.Message{
|
|
||||||
Type: realtime.MessageTypeChat,
|
|
||||||
Data: json.RawMessage(`{"text":"global message"}`),
|
|
||||||
}
|
|
||||||
if err := conn1.WriteJSON(msg); err != nil {
|
|
||||||
t.Fatalf("failed to send message: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Both clients should receive
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
wg.Add(2)
|
|
||||||
|
|
||||||
readMessage := func(conn *websocket.Conn, name string) {
|
|
||||||
defer wg.Done()
|
|
||||||
conn.SetReadDeadline(time.Now().Add(time.Second))
|
|
||||||
var received realtime.Message
|
|
||||||
if err := conn.ReadJSON(&received); err != nil {
|
|
||||||
t.Errorf("%s failed to read message: %v", name, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
go readMessage(conn1, "client1")
|
|
||||||
go readMessage(conn2, "client2")
|
|
||||||
wg.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWebSocket_Stats(t *testing.T) {
|
|
||||||
hub := newTestHub(t)
|
|
||||||
server := newTestServer(t, hub)
|
|
||||||
defer server.Close()
|
|
||||||
|
|
||||||
// Check stats before any connections
|
|
||||||
resp, err := http.Get(server.URL + "/ws/stats")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to get stats: %v", err)
|
|
||||||
}
|
|
||||||
resp.Body.Close()
|
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
|
||||||
t.Errorf("expected status 200, got %d", resp.StatusCode)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Connect a client
|
|
||||||
conn, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws"), nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to connect: %v", err)
|
|
||||||
}
|
|
||||||
defer conn.Close()
|
|
||||||
|
|
||||||
// Give hub time to register
|
|
||||||
time.Sleep(50 * time.Millisecond)
|
|
||||||
|
|
||||||
// Check stats after connection
|
|
||||||
resp, err = http.Get(server.URL + "/ws/stats")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to get stats: %v", err)
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
var result map[string]any
|
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
|
||||||
t.Fatalf("failed to decode stats: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
data, ok := result["data"].(map[string]any)
|
|
||||||
if !ok {
|
|
||||||
t.Fatal("expected 'data' field in response")
|
|
||||||
}
|
|
||||||
|
|
||||||
count, ok := data["total_connections"].(float64)
|
|
||||||
if !ok {
|
|
||||||
t.Fatal("expected 'total_connections' in data")
|
|
||||||
}
|
|
||||||
|
|
||||||
if int(count) != 1 {
|
|
||||||
t.Errorf("expected 1 connection in stats, got %d", int(count))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWebSocket_Disconnect(t *testing.T) {
|
|
||||||
hub := newTestHub(t)
|
|
||||||
server := newTestServer(t, hub)
|
|
||||||
defer server.Close()
|
|
||||||
|
|
||||||
// Connect
|
|
||||||
conn, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws"), nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to connect: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Give hub time to register
|
|
||||||
time.Sleep(50 * time.Millisecond)
|
|
||||||
|
|
||||||
if hub.ConnectionCount() != 1 {
|
|
||||||
t.Fatalf("expected 1 connection, got %d", hub.ConnectionCount())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close connection
|
|
||||||
conn.Close()
|
|
||||||
|
|
||||||
// Give hub time to unregister
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
|
|
||||||
if hub.ConnectionCount() != 0 {
|
|
||||||
t.Errorf("expected 0 connections after disconnect, got %d", hub.ConnectionCount())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWebSocket_PingPong(t *testing.T) {
|
|
||||||
hub := newTestHub(t)
|
|
||||||
server := newTestServer(t, hub)
|
|
||||||
defer server.Close()
|
|
||||||
|
|
||||||
conn, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws"), nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to connect: %v", err)
|
|
||||||
}
|
|
||||||
defer conn.Close()
|
|
||||||
|
|
||||||
// Send ping message
|
|
||||||
msg := realtime.Message{
|
|
||||||
Type: realtime.MessageTypePing,
|
|
||||||
}
|
|
||||||
if err := conn.WriteJSON(msg); err != nil {
|
|
||||||
t.Fatalf("failed to send ping: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Should receive pong
|
|
||||||
conn.SetReadDeadline(time.Now().Add(time.Second))
|
|
||||||
var pong realtime.Message
|
|
||||||
if err := conn.ReadJSON(&pong); err != nil {
|
|
||||||
t.Fatalf("failed to read pong: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if pong.Type != realtime.MessageTypePong {
|
|
||||||
t.Errorf("expected pong message, got %s", pong.Type)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWebSocket_RoomIsolation(t *testing.T) {
|
|
||||||
hub := newTestHub(t)
|
|
||||||
server := newTestServer(t, hub)
|
|
||||||
defer server.Close()
|
|
||||||
|
|
||||||
// Connect client to room A
|
|
||||||
connA, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws/room-a"), nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("client A failed to connect: %v", err)
|
|
||||||
}
|
|
||||||
defer connA.Close()
|
|
||||||
|
|
||||||
// Connect client to room B
|
|
||||||
connB, _, err := websocket.DefaultDialer.Dial(wsURL(server, "/ws/room-b"), nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("client B failed to connect: %v", err)
|
|
||||||
}
|
|
||||||
defer connB.Close()
|
|
||||||
|
|
||||||
// Give hub time to register
|
|
||||||
time.Sleep(50 * time.Millisecond)
|
|
||||||
|
|
||||||
// Send message to room A
|
|
||||||
msg := realtime.Message{
|
|
||||||
Type: realtime.MessageTypeChat,
|
|
||||||
Room: "room-a",
|
|
||||||
Data: json.RawMessage(`{"text":"room A only"}`),
|
|
||||||
}
|
|
||||||
if err := connA.WriteJSON(msg); err != nil {
|
|
||||||
t.Fatalf("failed to send message: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Client A should receive it
|
|
||||||
connA.SetReadDeadline(time.Now().Add(500 * time.Millisecond))
|
|
||||||
var received realtime.Message
|
|
||||||
if err := connA.ReadJSON(&received); err != nil {
|
|
||||||
t.Errorf("client A should receive message: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Client B should NOT receive it (timeout expected)
|
|
||||||
connB.SetReadDeadline(time.Now().Add(200 * time.Millisecond))
|
|
||||||
if err := connB.ReadJSON(&received); err == nil {
|
|
||||||
t.Error("client B should NOT receive room-a message")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -2,12 +2,8 @@
|
|||||||
package api
|
package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"net/http"
|
|
||||||
|
|
||||||
"git.threesix.ai/jordan/sp3-solo-1770327084/pkg/app"
|
"git.threesix.ai/jordan/sp3-solo-1770327084/pkg/app"
|
||||||
"git.threesix.ai/jordan/sp3-solo-1770327084/pkg/auth"
|
"git.threesix.ai/jordan/sp3-solo-1770327084/pkg/auth"
|
||||||
"git.threesix.ai/jordan/sp3-solo-1770327084/pkg/httpresponse"
|
|
||||||
"git.threesix.ai/jordan/sp3-solo-1770327084/pkg/realtime"
|
|
||||||
"git.threesix.ai/jordan/sp3-solo-1770327084/services/chat-api/internal/api/handlers"
|
"git.threesix.ai/jordan/sp3-solo-1770327084/services/chat-api/internal/api/handlers"
|
||||||
"git.threesix.ai/jordan/sp3-solo-1770327084/services/chat-api/internal/config"
|
"git.threesix.ai/jordan/sp3-solo-1770327084/services/chat-api/internal/config"
|
||||||
"git.threesix.ai/jordan/sp3-solo-1770327084/services/chat-api/internal/service"
|
"git.threesix.ai/jordan/sp3-solo-1770327084/services/chat-api/internal/service"
|
||||||
@ -18,13 +14,7 @@ import (
|
|||||||
// This allows the monorepo to expose multiple services under a single domain:
|
// This allows the monorepo to expose multiple services under a single domain:
|
||||||
// - https://domain/api/chat-api/health
|
// - https://domain/api/chat-api/health
|
||||||
// - https://domain/api/chat-api/examples
|
// - https://domain/api/chat-api/examples
|
||||||
// - https://domain/api/chat-api/ws (WebSocket)
|
func RegisterRoutes(application *app.App, exampleService *service.ExampleService) {
|
||||||
func RegisterRoutes(
|
|
||||||
application *app.App,
|
|
||||||
exampleService *service.ExampleService,
|
|
||||||
hub realtime.Hub,
|
|
||||||
broadcaster realtime.Broadcaster,
|
|
||||||
) {
|
|
||||||
logger := application.Logger()
|
logger := application.Logger()
|
||||||
cfg := config.Load()
|
cfg := config.Load()
|
||||||
|
|
||||||
@ -32,12 +22,6 @@ func RegisterRoutes(
|
|||||||
healthHandler := handlers.NewHealth(logger)
|
healthHandler := handlers.NewHealth(logger)
|
||||||
exampleHandler := handlers.NewExample(exampleService, logger)
|
exampleHandler := handlers.NewExample(exampleService, logger)
|
||||||
|
|
||||||
// Initialize WebSocket handler
|
|
||||||
wsHandler := realtime.NewHandler(hub, logger, realtime.HandlerConfig{
|
|
||||||
Broadcaster: broadcaster,
|
|
||||||
AuthRequired: cfg.AuthEnabled,
|
|
||||||
})
|
|
||||||
|
|
||||||
// Build and mount OpenAPI spec
|
// Build and mount OpenAPI spec
|
||||||
spec := NewServiceSpec()
|
spec := NewServiceSpec()
|
||||||
application.EnableDocs(spec)
|
application.EnableDocs(spec)
|
||||||
@ -51,17 +35,6 @@ func RegisterRoutes(
|
|||||||
r.Get("/examples", app.Wrap(exampleHandler.List))
|
r.Get("/examples", app.Wrap(exampleHandler.List))
|
||||||
r.Get("/examples/{id}", app.Wrap(exampleHandler.Get))
|
r.Get("/examples/{id}", app.Wrap(exampleHandler.Get))
|
||||||
|
|
||||||
// WebSocket routes
|
|
||||||
// GET /ws - connect to global channel
|
|
||||||
// GET /ws/{room} - connect and join a room
|
|
||||||
r.Mount("/ws", wsHandler.Routes())
|
|
||||||
|
|
||||||
// WebSocket stats endpoint
|
|
||||||
r.Get("/ws/stats", func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
stats := wsHandler.GetStats()
|
|
||||||
httpresponse.OK(w, r, stats)
|
|
||||||
})
|
|
||||||
|
|
||||||
// Protected routes (auth required when enabled)
|
// Protected routes (auth required when enabled)
|
||||||
r.Group(func(r app.Router) {
|
r.Group(func(r app.Router) {
|
||||||
if cfg.AuthEnabled {
|
if cfg.AuthEnabled {
|
||||||
|
|||||||
@ -8,8 +8,7 @@ func NewServiceSpec() *openapi.OpenAPISpec {
|
|||||||
WithDescription("REST API for the chat-api service").
|
WithDescription("REST API for the chat-api service").
|
||||||
WithBearerSecurity("bearer", "JWT authentication token").
|
WithBearerSecurity("bearer", "JWT authentication token").
|
||||||
WithTag("Health", "Service health endpoints").
|
WithTag("Health", "Service health endpoints").
|
||||||
WithTag("Examples", "Example CRUD endpoints").
|
WithTag("Examples", "Example CRUD endpoints")
|
||||||
WithTag("WebSocket", "Real-time WebSocket endpoints")
|
|
||||||
|
|
||||||
// Define reusable schemas
|
// Define reusable schemas
|
||||||
spec.WithSchema("Example", openapi.Object(map[string]openapi.Schema{
|
spec.WithSchema("Example", openapi.Object(map[string]openapi.Schema{
|
||||||
@ -109,65 +108,5 @@ func NewServiceSpec() *openapi.OpenAPISpec {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
// WebSocket schemas
|
|
||||||
spec.WithSchema("WebSocketMessage", openapi.Object(map[string]openapi.Schema{
|
|
||||||
"id": openapi.UUID().WithDescription("Unique message identifier"),
|
|
||||||
"type": openapi.String().WithDescription("Message type: chat, presence, notification, system, error"),
|
|
||||||
"room": openapi.String().WithDescription("Target room (empty for global messages)"),
|
|
||||||
"from": openapi.String().WithDescription("Sender's client ID"),
|
|
||||||
"data": openapi.Object(nil).WithDescription("Message payload"),
|
|
||||||
"timestamp": openapi.DateTime().WithDescription("Message timestamp"),
|
|
||||||
}, "type"))
|
|
||||||
|
|
||||||
spec.WithSchema("WebSocketStats", openapi.Object(map[string]openapi.Schema{
|
|
||||||
"total_connections": openapi.Integer().WithDescription("Total active WebSocket connections"),
|
|
||||||
"room_counts": openapi.Object(nil).WithDescription("Connection count per room"),
|
|
||||||
}, "total_connections"))
|
|
||||||
|
|
||||||
// WebSocket upgrade endpoint (documented for reference, actual upgrade is protocol switch)
|
|
||||||
spec.AddPath("/api/chat-api/ws", "get", map[string]any{
|
|
||||||
"summary": "WebSocket connection",
|
|
||||||
"description": "Upgrades HTTP to WebSocket for real-time chat. Connect to global channel.",
|
|
||||||
"tags": []string{"WebSocket"},
|
|
||||||
"responses": map[string]any{
|
|
||||||
"101": map[string]any{
|
|
||||||
"description": "Switching Protocols - WebSocket upgrade successful",
|
|
||||||
},
|
|
||||||
"401": openapi.OpResponse("Unauthorized (when AUTH_ENABLED=true)", openapi.ErrorResponseSchema()),
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
// WebSocket with room
|
|
||||||
spec.AddPath("/api/chat-api/ws/{room}", "get", map[string]any{
|
|
||||||
"summary": "WebSocket connection with room",
|
|
||||||
"description": "Upgrades HTTP to WebSocket and joins the specified room.",
|
|
||||||
"tags": []string{"WebSocket"},
|
|
||||||
"parameters": []any{
|
|
||||||
map[string]any{
|
|
||||||
"name": "room",
|
|
||||||
"in": "path",
|
|
||||||
"description": "Room name to join on connect",
|
|
||||||
"required": true,
|
|
||||||
"schema": openapi.String(),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
"responses": map[string]any{
|
|
||||||
"101": map[string]any{
|
|
||||||
"description": "Switching Protocols - WebSocket upgrade successful",
|
|
||||||
},
|
|
||||||
"401": openapi.OpResponse("Unauthorized (when AUTH_ENABLED=true)", openapi.ErrorResponseSchema()),
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
// WebSocket stats endpoint
|
|
||||||
spec.AddPath("/api/chat-api/ws/stats", "get", map[string]any{
|
|
||||||
"summary": "WebSocket statistics",
|
|
||||||
"description": "Returns current WebSocket connection statistics.",
|
|
||||||
"tags": []string{"WebSocket"},
|
|
||||||
"responses": map[string]any{
|
|
||||||
"200": openapi.OpResponse("Success", openapi.ResponseSchema(openapi.Ref("WebSocketStats"))),
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
return spec
|
return spec
|
||||||
}
|
}
|
||||||
|
|||||||
@ -18,9 +18,6 @@ type Config struct {
|
|||||||
// Auth
|
// Auth
|
||||||
AuthEnabled bool
|
AuthEnabled bool
|
||||||
JWTSecret string
|
JWTSecret string
|
||||||
|
|
||||||
// Redis
|
|
||||||
RedisURL string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load reads configuration from environment variables.
|
// Load reads configuration from environment variables.
|
||||||
@ -33,7 +30,5 @@ func Load() *Config {
|
|||||||
|
|
||||||
AuthEnabled: strings.EqualFold(os.Getenv("AUTH_ENABLED"), "true"),
|
AuthEnabled: strings.EqualFold(os.Getenv("AUTH_ENABLED"), "true"),
|
||||||
JWTSecret: os.Getenv("JWT_SECRET"),
|
JWTSecret: os.Getenv("JWT_SECRET"),
|
||||||
|
|
||||||
RedisURL: os.Getenv("REDIS_URL"),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user