Compare commits
No commits in common. "feature/websocket-chat" and "main" have entirely different histories.
feature/we
...
main
@ -1,4 +0,0 @@
|
|||||||
name: feature/websocket-chat
|
|
||||||
feature: websocket-chat
|
|
||||||
base_branch: main
|
|
||||||
created_at: 2026-02-05T21:46:00.799440711Z
|
|
||||||
@ -1,36 +0,0 @@
|
|||||||
version: 1
|
|
||||||
project:
|
|
||||||
name: workspace
|
|
||||||
branches:
|
|
||||||
main: main
|
|
||||||
feature_prefix: feature/
|
|
||||||
phases:
|
|
||||||
enabled:
|
|
||||||
- draft
|
|
||||||
- specified
|
|
||||||
- planned
|
|
||||||
- ready
|
|
||||||
- implementation
|
|
||||||
- review
|
|
||||||
- audit
|
|
||||||
- qa
|
|
||||||
- merge
|
|
||||||
- released
|
|
||||||
required_artifacts:
|
|
||||||
audit:
|
|
||||||
- audit
|
|
||||||
planned:
|
|
||||||
- spec
|
|
||||||
- design
|
|
||||||
- tasks
|
|
||||||
- qa_plan
|
|
||||||
qa:
|
|
||||||
- qa_results
|
|
||||||
review:
|
|
||||||
- review
|
|
||||||
specified:
|
|
||||||
- spec
|
|
||||||
compliance:
|
|
||||||
require_approvals: true
|
|
||||||
require_branch: true
|
|
||||||
require_qa: true
|
|
||||||
@ -1,210 +0,0 @@
|
|||||||
# Technical Design: WebSocket Chat
|
|
||||||
|
|
||||||
## Architecture Overview
|
|
||||||
|
|
||||||
This feature integrates the existing `pkg/realtime` package into the `chat-api` service to provide WebSocket chat functionality with Redis pub/sub for horizontal scaling.
|
|
||||||
|
|
||||||
```
|
|
||||||
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
|
|
||||||
│ Client 1 │ │ Client 2 │ │ Client N │
|
|
||||||
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘
|
|
||||||
│ WebSocket │ WebSocket │ WebSocket
|
|
||||||
▼ ▼ ▼
|
|
||||||
┌──────────────────────────────────────────────────────┐
|
|
||||||
│ chat-api Pod 1 │
|
|
||||||
│ ┌─────────────────────────────────────────────────┐ │
|
|
||||||
│ │ realtime.Handler │ │
|
|
||||||
│ │ ┌─────────────────────────────────────────┐ │ │
|
|
||||||
│ │ │ LocalHub │ │ │
|
|
||||||
│ │ │ - connections map[id]Connection │ │ │
|
|
||||||
│ │ │ - rooms map[room]map[id]struct{} │ │ │
|
|
||||||
│ │ └─────────────────────────────────────────┘ │ │
|
|
||||||
│ └─────────────────────────────────────────────────┘ │
|
|
||||||
│ │ │
|
|
||||||
│ ┌─────────────────────────────────────────────────┐ │
|
|
||||||
│ │ RedisBroadcaster │ │
|
|
||||||
│ │ - Publish() → Redis channel │ │
|
|
||||||
│ │ - Run() ← Redis PSubscribe │ │
|
|
||||||
│ └─────────────────────────────────────────────────┘ │
|
|
||||||
└──────────────────────────┬───────────────────────────┘
|
|
||||||
│
|
|
||||||
┌────────────┴────────────┐
|
|
||||||
│ Redis Pub/Sub │
|
|
||||||
│ realtime:global │
|
|
||||||
│ realtime:room:{name} │
|
|
||||||
└────────────┬────────────┘
|
|
||||||
│
|
|
||||||
┌──────────────────────────┴───────────────────────────┐
|
|
||||||
│ chat-api Pod 2 │
|
|
||||||
│ (same structure) │
|
|
||||||
└──────────────────────────────────────────────────────┘
|
|
||||||
```
|
|
||||||
|
|
||||||
## Component Design
|
|
||||||
|
|
||||||
### 1. Configuration Extension
|
|
||||||
|
|
||||||
Extend `internal/config/config.go` to include Redis configuration:
|
|
||||||
|
|
||||||
```go
|
|
||||||
type Config struct {
|
|
||||||
// ... existing fields ...
|
|
||||||
|
|
||||||
// Redis
|
|
||||||
RedisURL string
|
|
||||||
}
|
|
||||||
|
|
||||||
func Load() *Config {
|
|
||||||
return &Config{
|
|
||||||
// ... existing ...
|
|
||||||
RedisURL: getEnv("REDIS_URL", "redis://localhost:6379"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
### 2. Service Bootstrap (main.go)
|
|
||||||
|
|
||||||
Wire up the realtime components in the service entry point:
|
|
||||||
|
|
||||||
```go
|
|
||||||
func main() {
|
|
||||||
cfg := config.Load()
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
// Create hub (local connection registry)
|
|
||||||
hub := realtime.NewHub(logger)
|
|
||||||
go hub.Run(ctx)
|
|
||||||
|
|
||||||
// Create Redis broadcaster (for multi-pod)
|
|
||||||
var broadcaster realtime.Broadcaster
|
|
||||||
if cfg.RedisURL != "" {
|
|
||||||
redisClient := redis.NewClient(parseRedisURL(cfg.RedisURL))
|
|
||||||
broadcaster = realtime.NewRedisBroadcaster(redisClient, hub, logger)
|
|
||||||
go broadcaster.Run(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register routes with realtime handler
|
|
||||||
api.RegisterRoutes(application, exampleService, hub, broadcaster)
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
### 3. Route Registration
|
|
||||||
|
|
||||||
Add WebSocket routes in `internal/api/routes.go`:
|
|
||||||
|
|
||||||
```go
|
|
||||||
func RegisterRoutes(
|
|
||||||
application *app.App,
|
|
||||||
exampleService *service.ExampleService,
|
|
||||||
hub realtime.Hub,
|
|
||||||
broadcaster realtime.Broadcaster,
|
|
||||||
) {
|
|
||||||
// ... existing routes ...
|
|
||||||
|
|
||||||
// WebSocket handler
|
|
||||||
wsHandler := realtime.NewHandler(hub, logger, realtime.HandlerConfig{
|
|
||||||
Broadcaster: broadcaster,
|
|
||||||
OnConnect: func(conn realtime.Connection) {
|
|
||||||
logger.Info("client connected", "id", conn.ID())
|
|
||||||
},
|
|
||||||
OnDisconnect: func(conn realtime.Connection) {
|
|
||||||
logger.Info("client disconnected", "id", conn.ID())
|
|
||||||
},
|
|
||||||
OnMessage: func(conn realtime.Connection, msg *realtime.Message) *realtime.Message {
|
|
||||||
// Set message type if not specified
|
|
||||||
if msg.Type == "" {
|
|
||||||
msg.Type = realtime.MessageTypeChat
|
|
||||||
}
|
|
||||||
return msg
|
|
||||||
},
|
|
||||||
AuthRequired: cfg.AuthEnabled,
|
|
||||||
})
|
|
||||||
|
|
||||||
// Mount WebSocket routes
|
|
||||||
application.Mount("/api/chat-api/ws", wsHandler.Routes())
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
### 4. Message Flow
|
|
||||||
|
|
||||||
1. **Client → Server (Incoming)**
|
|
||||||
- Client sends JSON message via WebSocket
|
|
||||||
- `WSClient.readPump()` decodes message
|
|
||||||
- `OnMessage` callback transforms/filters
|
|
||||||
- `RedisBroadcaster.Publish()` sends to Redis channel
|
|
||||||
|
|
||||||
2. **Redis → All Pods (Distribution)**
|
|
||||||
- Redis delivers to all subscribed pods
|
|
||||||
- `RedisBroadcaster.Run()` receives via PSubscribe
|
|
||||||
- Filters out messages from same pod (echo prevention)
|
|
||||||
- Forwards to `LocalHub.Broadcast()`
|
|
||||||
|
|
||||||
3. **Server → Clients (Outgoing)**
|
|
||||||
- `LocalHub.doBroadcast()` iterates room connections
|
|
||||||
- `WSClient.Send()` queues message
|
|
||||||
- `WSClient.writePump()` writes to WebSocket
|
|
||||||
|
|
||||||
### 5. Redis Channel Naming
|
|
||||||
|
|
||||||
| Pattern | Usage |
|
|
||||||
|---------|-------|
|
|
||||||
| `realtime:global` | Messages without room (broadcast to all) |
|
|
||||||
| `realtime:room:{name}` | Room-specific messages |
|
|
||||||
|
|
||||||
### 6. Graceful Shutdown
|
|
||||||
|
|
||||||
The existing `pkg/realtime` package handles:
|
|
||||||
- Context cancellation triggers hub shutdown
|
|
||||||
- Hub closes all connections
|
|
||||||
- WSClient sends close frame
|
|
||||||
- Redis broadcaster closes subscription
|
|
||||||
|
|
||||||
## File Changes
|
|
||||||
|
|
||||||
| File | Change |
|
|
||||||
|------|--------|
|
|
||||||
| `services/chat-api/internal/config/config.go` | Add `RedisURL` field |
|
|
||||||
| `services/chat-api/cmd/server/main.go` | Wire hub, broadcaster, Redis client |
|
|
||||||
| `services/chat-api/internal/api/routes.go` | Mount WebSocket handler |
|
|
||||||
| `services/chat-api/internal/api/spec.go` | Add WebSocket endpoint documentation |
|
|
||||||
|
|
||||||
## Testing Strategy
|
|
||||||
|
|
||||||
### Unit Tests
|
|
||||||
- Config loading with Redis URL
|
|
||||||
- Message transformation in OnMessage callback
|
|
||||||
|
|
||||||
### Integration Tests
|
|
||||||
- WebSocket connection upgrade
|
|
||||||
- Message send/receive flow
|
|
||||||
- Room-based isolation
|
|
||||||
- Multi-client broadcast
|
|
||||||
|
|
||||||
### Manual Testing
|
|
||||||
```bash
|
|
||||||
# Terminal 1: Start server
|
|
||||||
./scripts/dev.sh
|
|
||||||
|
|
||||||
# Terminal 2: Connect with wscat
|
|
||||||
wscat -c ws://localhost:8001/api/chat-api/ws
|
|
||||||
|
|
||||||
# Terminal 3: Another client
|
|
||||||
wscat -c ws://localhost:8001/api/chat-api/ws
|
|
||||||
|
|
||||||
# Send message from Terminal 2, verify received in Terminal 3
|
|
||||||
{"type":"chat","data":{"content":"Hello!"}}
|
|
||||||
```
|
|
||||||
|
|
||||||
## Security Considerations
|
|
||||||
|
|
||||||
1. **Authentication**: Optional auth via `AUTH_ENABLED` config
|
|
||||||
2. **Origin Check**: WebSocket upgrader validates Origin header (configurable)
|
|
||||||
3. **Message Size**: 64KB limit prevents memory exhaustion
|
|
||||||
4. **Buffer Limits**: 256-message buffer drops excess (prevents slow-client backpressure)
|
|
||||||
|
|
||||||
## Rollout Plan
|
|
||||||
|
|
||||||
1. Deploy with `REDIS_URL` set to enable cross-pod messaging
|
|
||||||
2. Without `REDIS_URL`, operates in single-pod mode (local broadcast only)
|
|
||||||
3. No database migrations required
|
|
||||||
4. Feature is additive (no breaking changes)
|
|
||||||
@ -1,50 +0,0 @@
|
|||||||
slug: websocket-chat
|
|
||||||
title: WebSocket Chat
|
|
||||||
created: 2026-02-05T21:41:24.394924276Z
|
|
||||||
branch: feature/websocket-chat
|
|
||||||
phase: implementation
|
|
||||||
phase_history:
|
|
||||||
- phase: draft
|
|
||||||
entered: 2026-02-05T21:41:24.394924276Z
|
|
||||||
exited: 2026-02-05T21:45:16.70675913Z
|
|
||||||
- phase: specified
|
|
||||||
entered: 2026-02-05T21:45:16.70675913Z
|
|
||||||
exited: 2026-02-05T21:45:55.781510539Z
|
|
||||||
- phase: planned
|
|
||||||
entered: 2026-02-05T21:45:55.781510539Z
|
|
||||||
exited: 2026-02-05T21:46:05.885128246Z
|
|
||||||
- phase: ready
|
|
||||||
entered: 2026-02-05T21:46:05.885128246Z
|
|
||||||
exited: 2026-02-05T21:46:05.895747946Z
|
|
||||||
- phase: implementation
|
|
||||||
entered: 2026-02-05T21:46:05.895747946Z
|
|
||||||
artifacts:
|
|
||||||
audit:
|
|
||||||
status: pending
|
|
||||||
path: audit.md
|
|
||||||
design:
|
|
||||||
status: approved
|
|
||||||
path: design.md
|
|
||||||
approved_by: user
|
|
||||||
approved_at: 2026-02-05T21:44:58.283726232Z
|
|
||||||
qa_plan:
|
|
||||||
status: approved
|
|
||||||
path: qa-plan.md
|
|
||||||
approved_by: user
|
|
||||||
approved_at: 2026-02-05T21:45:51.612922454Z
|
|
||||||
qa_results:
|
|
||||||
status: pending
|
|
||||||
path: qa-results.md
|
|
||||||
review:
|
|
||||||
status: pending
|
|
||||||
path: review.md
|
|
||||||
spec:
|
|
||||||
status: approved
|
|
||||||
path: spec.md
|
|
||||||
approved_by: user
|
|
||||||
approved_at: 2026-02-05T21:44:57.405079826Z
|
|
||||||
tasks:
|
|
||||||
status: approved
|
|
||||||
path: tasks.md
|
|
||||||
approved_by: user
|
|
||||||
approved_at: 2026-02-05T21:44:59.042871875Z
|
|
||||||
@ -1,38 +0,0 @@
|
|||||||
# QA Plan: WebSocket Chat
|
|
||||||
|
|
||||||
## Test Scope
|
|
||||||
|
|
||||||
### Unit Tests
|
|
||||||
| Test | File | Description |
|
|
||||||
|------|------|-------------|
|
|
||||||
| Config loading | `internal/config/config_test.go` | Verify RedisURL loads from env |
|
|
||||||
|
|
||||||
### Integration Tests
|
|
||||||
| Test | File | Description |
|
|
||||||
|------|------|-------------|
|
|
||||||
| WebSocket upgrade | `internal/api/handlers/websocket_test.go` | Verify HTTP→WS upgrade |
|
|
||||||
| Message broadcast | `internal/api/handlers/websocket_test.go` | Verify messages reach all clients |
|
|
||||||
| Room isolation | `internal/api/handlers/websocket_test.go` | Verify room messages stay in room |
|
|
||||||
| Ping/pong | `internal/api/handlers/websocket_test.go` | Verify heartbeat mechanism |
|
|
||||||
|
|
||||||
### Manual Tests
|
|
||||||
| Test | Steps | Expected |
|
|
||||||
|------|-------|----------|
|
|
||||||
| Basic chat | 1. Start server<br>2. Connect two wscat clients<br>3. Send message from one | Other client receives message |
|
|
||||||
| Room chat | 1. Connect clients to `/ws/room1`<br>2. Connect client to `/ws/room2`<br>3. Send to room1 | Only room1 clients receive |
|
|
||||||
| Disconnect | 1. Connect client<br>2. Kill client<br>3. Check server logs | Clean disconnect logged |
|
|
||||||
|
|
||||||
## Acceptance Criteria
|
|
||||||
- [ ] WebSocket endpoint accessible at `/api/chat-api/ws`
|
|
||||||
- [ ] Messages broadcast to all connected clients
|
|
||||||
- [ ] Room-based messaging isolates conversations
|
|
||||||
- [ ] Connection lifecycle properly managed
|
|
||||||
|
|
||||||
## Test Commands
|
|
||||||
```bash
|
|
||||||
# Run all Go tests
|
|
||||||
go test ./services/chat-api/... -v
|
|
||||||
|
|
||||||
# Manual WebSocket test
|
|
||||||
wscat -c ws://localhost:8001/api/chat-api/ws
|
|
||||||
```
|
|
||||||
@ -1,105 +0,0 @@
|
|||||||
# Feature Specification: WebSocket Chat
|
|
||||||
|
|
||||||
## Overview
|
|
||||||
|
|
||||||
Add real-time WebSocket chat capability to the chat-api service. This feature enables bidirectional communication between clients and the server, with message distribution via Redis pub/sub for horizontal scaling across multiple pods.
|
|
||||||
|
|
||||||
## Requirements
|
|
||||||
|
|
||||||
### Core Requirements
|
|
||||||
|
|
||||||
1. **WebSocket Endpoint**: `GET /ws` upgrades HTTP connections to WebSocket
|
|
||||||
2. **Message Publication**: Incoming messages from clients are published to a Redis channel
|
|
||||||
3. **Message Broadcasting**: A Redis subscriber broadcasts received messages to all connected clients
|
|
||||||
|
|
||||||
### Functional Requirements
|
|
||||||
|
|
||||||
| ID | Requirement | Priority |
|
|
||||||
|----|-------------|----------|
|
|
||||||
| FR-1 | WebSocket endpoint at `GET /api/chat-api/ws` upgrades HTTP to WebSocket | Must |
|
|
||||||
| FR-2 | WebSocket endpoint at `GET /api/chat-api/ws/{room}` allows joining specific rooms | Must |
|
|
||||||
| FR-3 | Incoming chat messages are published to Redis pub/sub channel | Must |
|
|
||||||
| FR-4 | Redis subscriber receives messages and broadcasts to all connected clients in the room | Must |
|
|
||||||
| FR-5 | Messages without a room are broadcast to all connected clients (global) | Must |
|
|
||||||
| FR-6 | Automatic ping/pong heartbeat maintains connection health | Must |
|
|
||||||
| FR-7 | Graceful connection lifecycle with proper cleanup on disconnect | Must |
|
|
||||||
|
|
||||||
### Non-Functional Requirements
|
|
||||||
|
|
||||||
| ID | Requirement | Priority |
|
|
||||||
|----|-------------|----------|
|
|
||||||
| NFR-1 | Connection supports 64KB max message size | Must |
|
|
||||||
| NFR-2 | 60-second pong timeout for dead connection detection | Must |
|
|
||||||
| NFR-3 | 256-message send buffer per connection | Must |
|
|
||||||
| NFR-4 | Multi-pod support via Redis pub/sub | Must |
|
|
||||||
| NFR-5 | Graceful shutdown closes all connections cleanly | Must |
|
|
||||||
|
|
||||||
## Message Format
|
|
||||||
|
|
||||||
Messages follow the standard `realtime.Message` structure:
|
|
||||||
|
|
||||||
```json
|
|
||||||
{
|
|
||||||
"id": "uuid",
|
|
||||||
"type": "chat",
|
|
||||||
"room": "room-name",
|
|
||||||
"from": "client-id",
|
|
||||||
"data": { "content": "message text" },
|
|
||||||
"timestamp": "2024-01-15T10:30:00Z"
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
### Message Types
|
|
||||||
|
|
||||||
- `chat` - User chat messages
|
|
||||||
- `presence` - Online/offline status changes
|
|
||||||
- `notification` - Server notifications
|
|
||||||
- `system` - System messages
|
|
||||||
- `error` - Error messages
|
|
||||||
- `ping`/`pong` - Client-initiated heartbeat
|
|
||||||
|
|
||||||
## API Endpoints
|
|
||||||
|
|
||||||
### WebSocket Connection
|
|
||||||
|
|
||||||
```
|
|
||||||
GET /api/chat-api/ws
|
|
||||||
GET /api/chat-api/ws/{room}
|
|
||||||
GET /api/chat-api/ws?room={room}
|
|
||||||
```
|
|
||||||
|
|
||||||
**Upgrade Headers:**
|
|
||||||
- `Connection: Upgrade`
|
|
||||||
- `Upgrade: websocket`
|
|
||||||
|
|
||||||
**Response:** 101 Switching Protocols (on success)
|
|
||||||
|
|
||||||
## Configuration
|
|
||||||
|
|
||||||
| Environment Variable | Description | Default |
|
|
||||||
|---------------------|-------------|---------|
|
|
||||||
| `REDIS_URL` | Redis connection URL | `redis://localhost:6379` |
|
|
||||||
| `AUTH_ENABLED` | Require authentication for WebSocket | `false` |
|
|
||||||
|
|
||||||
## Out of Scope
|
|
||||||
|
|
||||||
- Message persistence/history
|
|
||||||
- Typing indicators
|
|
||||||
- Read receipts
|
|
||||||
- User presence tracking (beyond connection events)
|
|
||||||
- Rate limiting (future enhancement)
|
|
||||||
|
|
||||||
## Success Criteria
|
|
||||||
|
|
||||||
1. WebSocket connections can be established at `/api/chat-api/ws`
|
|
||||||
2. Messages sent by one client are received by all other connected clients
|
|
||||||
3. Room-based messaging isolates conversations
|
|
||||||
4. Multi-pod deployment correctly distributes messages via Redis
|
|
||||||
5. Connection cleanup occurs properly on disconnect
|
|
||||||
|
|
||||||
## Dependencies
|
|
||||||
|
|
||||||
- Existing `pkg/realtime` package (WebSocket handling, hub, Redis broadcaster)
|
|
||||||
- Redis server for pub/sub
|
|
||||||
- `github.com/gorilla/websocket` (already in go.mod)
|
|
||||||
- `github.com/redis/go-redis/v9` (already in go.mod)
|
|
||||||
@ -1,304 +0,0 @@
|
|||||||
# Implementation Tasks: WebSocket Chat
|
|
||||||
|
|
||||||
## Task Overview
|
|
||||||
|
|
||||||
| ID | Task | Status | Blocked By |
|
|
||||||
|----|------|--------|------------|
|
|
||||||
| WS-1 | Extend config with Redis URL | pending | - |
|
|
||||||
| WS-2 | Wire realtime components in main.go | pending | WS-1 |
|
|
||||||
| WS-3 | Mount WebSocket handler in routes.go | pending | WS-2 |
|
|
||||||
| WS-4 | Add WebSocket documentation to spec.go | pending | WS-3 |
|
|
||||||
| WS-5 | Add integration tests for WebSocket chat | pending | WS-3 |
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## WS-1: Extend config with Redis URL
|
|
||||||
|
|
||||||
**Status:** pending
|
|
||||||
|
|
||||||
### Description
|
|
||||||
Add Redis URL configuration to the chat-api service config.
|
|
||||||
|
|
||||||
### Files to Modify
|
|
||||||
- `services/chat-api/internal/config/config.go`
|
|
||||||
|
|
||||||
### Implementation Details
|
|
||||||
|
|
||||||
Add to the `Config` struct:
|
|
||||||
```go
|
|
||||||
// Redis configuration for realtime pub/sub
|
|
||||||
RedisURL string
|
|
||||||
```
|
|
||||||
|
|
||||||
Add to the `Load()` function:
|
|
||||||
```go
|
|
||||||
RedisURL: os.Getenv("REDIS_URL"),
|
|
||||||
```
|
|
||||||
|
|
||||||
### Acceptance Criteria
|
|
||||||
- [ ] `Config` struct has `RedisURL` field
|
|
||||||
- [ ] `Load()` reads from `REDIS_URL` environment variable
|
|
||||||
- [ ] Empty string is valid (disables Redis, uses local-only broadcast)
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## WS-2: Wire realtime components in main.go
|
|
||||||
|
|
||||||
**Status:** pending
|
|
||||||
**Blocked By:** WS-1
|
|
||||||
|
|
||||||
### Description
|
|
||||||
Initialize the realtime hub and Redis broadcaster in the service entry point.
|
|
||||||
|
|
||||||
### Files to Modify
|
|
||||||
- `services/chat-api/cmd/server/main.go`
|
|
||||||
|
|
||||||
### Implementation Details
|
|
||||||
|
|
||||||
1. Import required packages:
|
|
||||||
```go
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"github.com/redis/go-redis/v9"
|
|
||||||
"git.threesix.ai/jordan/sp3-verify-1770325830/pkg/realtime"
|
|
||||||
"git.threesix.ai/jordan/sp3-verify-1770325830/services/chat-api/internal/config"
|
|
||||||
)
|
|
||||||
```
|
|
||||||
|
|
||||||
2. Load config and create context:
|
|
||||||
```go
|
|
||||||
cfg := config.Load()
|
|
||||||
ctx := context.Background()
|
|
||||||
```
|
|
||||||
|
|
||||||
3. Create hub:
|
|
||||||
```go
|
|
||||||
hub := realtime.NewHub(logger)
|
|
||||||
go hub.Run(ctx)
|
|
||||||
```
|
|
||||||
|
|
||||||
4. Create Redis broadcaster (optional):
|
|
||||||
```go
|
|
||||||
var broadcaster realtime.Broadcaster
|
|
||||||
if cfg.RedisURL != "" {
|
|
||||||
opts, err := redis.ParseURL(cfg.RedisURL)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error("invalid redis url", "error", err)
|
|
||||||
} else {
|
|
||||||
redisClient := redis.NewClient(opts)
|
|
||||||
broadcaster = realtime.NewRedisBroadcaster(redisClient, hub, logger)
|
|
||||||
go broadcaster.Run(ctx)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
5. Pass hub and broadcaster to route registration:
|
|
||||||
```go
|
|
||||||
api.RegisterRoutes(application, exampleService, hub, broadcaster)
|
|
||||||
```
|
|
||||||
|
|
||||||
### Acceptance Criteria
|
|
||||||
- [ ] Hub is created and running in a goroutine
|
|
||||||
- [ ] Redis broadcaster is created when `REDIS_URL` is set
|
|
||||||
- [ ] Redis broadcaster is nil when `REDIS_URL` is empty
|
|
||||||
- [ ] Hub and broadcaster are passed to route registration
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## WS-3: Mount WebSocket handler in routes.go
|
|
||||||
|
|
||||||
**Status:** pending
|
|
||||||
**Blocked By:** WS-2
|
|
||||||
|
|
||||||
### Description
|
|
||||||
Update the route registration to accept realtime components and mount the WebSocket handler.
|
|
||||||
|
|
||||||
### Files to Modify
|
|
||||||
- `services/chat-api/internal/api/routes.go`
|
|
||||||
|
|
||||||
### Implementation Details
|
|
||||||
|
|
||||||
1. Update function signature:
|
|
||||||
```go
|
|
||||||
func RegisterRoutes(
|
|
||||||
application *app.App,
|
|
||||||
exampleService *service.ExampleService,
|
|
||||||
hub realtime.Hub,
|
|
||||||
broadcaster realtime.Broadcaster,
|
|
||||||
) {
|
|
||||||
```
|
|
||||||
|
|
||||||
2. Import realtime package:
|
|
||||||
```go
|
|
||||||
import "git.threesix.ai/jordan/sp3-verify-1770325830/pkg/realtime"
|
|
||||||
```
|
|
||||||
|
|
||||||
3. Create and configure WebSocket handler:
|
|
||||||
```go
|
|
||||||
wsHandler := realtime.NewHandler(hub, logger, realtime.HandlerConfig{
|
|
||||||
Broadcaster: broadcaster,
|
|
||||||
OnConnect: func(conn realtime.Connection) {
|
|
||||||
logger.Info("websocket client connected",
|
|
||||||
"client_id", conn.ID(),
|
|
||||||
"user_id", conn.UserID(),
|
|
||||||
)
|
|
||||||
},
|
|
||||||
OnDisconnect: func(conn realtime.Connection) {
|
|
||||||
logger.Info("websocket client disconnected",
|
|
||||||
"client_id", conn.ID(),
|
|
||||||
)
|
|
||||||
},
|
|
||||||
OnMessage: func(conn realtime.Connection, msg *realtime.Message) *realtime.Message {
|
|
||||||
if msg.Type == "" {
|
|
||||||
msg.Type = realtime.MessageTypeChat
|
|
||||||
}
|
|
||||||
return msg
|
|
||||||
},
|
|
||||||
AuthRequired: cfg.AuthEnabled,
|
|
||||||
})
|
|
||||||
```
|
|
||||||
|
|
||||||
4. Mount WebSocket routes:
|
|
||||||
```go
|
|
||||||
application.Mount("/api/chat-api/ws", wsHandler.Routes())
|
|
||||||
```
|
|
||||||
|
|
||||||
### Acceptance Criteria
|
|
||||||
- [ ] Function signature accepts `hub` and `broadcaster` parameters
|
|
||||||
- [ ] WebSocket handler is created with appropriate callbacks
|
|
||||||
- [ ] Handler is mounted at `/api/chat-api/ws`
|
|
||||||
- [ ] Auth requirement respects `AUTH_ENABLED` config
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## WS-4: Add WebSocket documentation to spec.go
|
|
||||||
|
|
||||||
**Status:** pending
|
|
||||||
**Blocked By:** WS-3
|
|
||||||
|
|
||||||
### Description
|
|
||||||
Document the WebSocket endpoint in the OpenAPI specification.
|
|
||||||
|
|
||||||
### Files to Modify
|
|
||||||
- `services/chat-api/internal/api/spec.go`
|
|
||||||
|
|
||||||
### Implementation Details
|
|
||||||
|
|
||||||
Add WebSocket tag:
|
|
||||||
```go
|
|
||||||
spec.WithTag("WebSocket", "Real-time WebSocket endpoints")
|
|
||||||
```
|
|
||||||
|
|
||||||
Add WebSocket path documentation:
|
|
||||||
```go
|
|
||||||
spec.AddPath("/api/chat-api/ws", "get", map[string]any{
|
|
||||||
"summary": "WebSocket connection",
|
|
||||||
"description": "Upgrades to WebSocket for real-time chat. Messages are broadcast to all connected clients via Redis pub/sub.",
|
|
||||||
"tags": []string{"WebSocket"},
|
|
||||||
"responses": map[string]any{
|
|
||||||
"101": map[string]any{
|
|
||||||
"description": "Switching Protocols - WebSocket upgrade successful",
|
|
||||||
},
|
|
||||||
"401": openapi.OpResponse("Unauthorized - authentication required", nil),
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
spec.AddPath("/api/chat-api/ws/{room}", "get", map[string]any{
|
|
||||||
"summary": "WebSocket connection to room",
|
|
||||||
"description": "Upgrades to WebSocket and joins the specified room. Messages are broadcast only to clients in the same room.",
|
|
||||||
"tags": []string{"WebSocket"},
|
|
||||||
"parameters": []map[string]any{
|
|
||||||
{
|
|
||||||
"name": "room",
|
|
||||||
"in": "path",
|
|
||||||
"required": true,
|
|
||||||
"description": "Room identifier to join",
|
|
||||||
"schema": map[string]any{"type": "string"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
"responses": map[string]any{
|
|
||||||
"101": map[string]any{
|
|
||||||
"description": "Switching Protocols - WebSocket upgrade successful",
|
|
||||||
},
|
|
||||||
"401": openapi.OpResponse("Unauthorized - authentication required", nil),
|
|
||||||
},
|
|
||||||
})
|
|
||||||
```
|
|
||||||
|
|
||||||
### Acceptance Criteria
|
|
||||||
- [ ] WebSocket tag added to spec
|
|
||||||
- [ ] `/api/chat-api/ws` endpoint documented
|
|
||||||
- [ ] `/api/chat-api/ws/{room}` endpoint documented with room parameter
|
|
||||||
- [ ] Response codes documented (101, 401)
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## WS-5: Add integration tests for WebSocket chat
|
|
||||||
|
|
||||||
**Status:** pending
|
|
||||||
**Blocked By:** WS-3
|
|
||||||
|
|
||||||
### Description
|
|
||||||
Add tests to verify WebSocket functionality.
|
|
||||||
|
|
||||||
### Files to Create
|
|
||||||
- `services/chat-api/internal/api/handlers/websocket_test.go`
|
|
||||||
|
|
||||||
### Implementation Details
|
|
||||||
|
|
||||||
Test cases:
|
|
||||||
1. **Connection upgrade** - Verify WebSocket upgrade succeeds
|
|
||||||
2. **Message broadcast** - Verify messages sent by one client reach others
|
|
||||||
3. **Room isolation** - Verify room messages don't leak to other rooms
|
|
||||||
4. **Ping/pong** - Verify heartbeat mechanism
|
|
||||||
|
|
||||||
Example test structure:
|
|
||||||
```go
|
|
||||||
package handlers_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"net/http"
|
|
||||||
"net/http/httptest"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/gorilla/websocket"
|
|
||||||
"git.threesix.ai/jordan/sp3-verify-1770325830/pkg/logging"
|
|
||||||
"git.threesix.ai/jordan/sp3-verify-1770325830/pkg/realtime"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestWebSocket_Connection(t *testing.T) {
|
|
||||||
// Create hub
|
|
||||||
hub := realtime.NewHub(logging.Nop())
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
defer cancel()
|
|
||||||
go hub.Run(ctx)
|
|
||||||
|
|
||||||
// Create handler
|
|
||||||
handler := realtime.NewHandler(hub, logging.Nop(), realtime.HandlerConfig{})
|
|
||||||
|
|
||||||
// Create test server
|
|
||||||
server := httptest.NewServer(handler.Routes())
|
|
||||||
defer server.Close()
|
|
||||||
|
|
||||||
// Connect WebSocket
|
|
||||||
wsURL := "ws" + server.URL[4:] + "/"
|
|
||||||
conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("dial failed: %v", err)
|
|
||||||
}
|
|
||||||
defer conn.Close()
|
|
||||||
|
|
||||||
// Verify connection count
|
|
||||||
time.Sleep(50 * time.Millisecond)
|
|
||||||
if hub.ConnectionCount() != 1 {
|
|
||||||
t.Errorf("expected 1 connection, got %d", hub.ConnectionCount())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
### Acceptance Criteria
|
|
||||||
- [ ] Test WebSocket connection upgrade works
|
|
||||||
- [ ] Test message broadcast to multiple clients
|
|
||||||
- [ ] Test room-based message isolation
|
|
||||||
- [ ] All tests pass
|
|
||||||
@ -1,63 +0,0 @@
|
|||||||
version: 1
|
|
||||||
project:
|
|
||||||
name: workspace
|
|
||||||
active_work:
|
|
||||||
features:
|
|
||||||
- slug: websocket-chat
|
|
||||||
branch: feature/websocket-chat
|
|
||||||
phase: implementation
|
|
||||||
blocked: []
|
|
||||||
last_updated: 2026-02-05T21:46:05.89630832Z
|
|
||||||
last_action: TRANSITION
|
|
||||||
last_actor: cli
|
|
||||||
history:
|
|
||||||
- timestamp: 2026-02-05T21:41:24.395367921Z
|
|
||||||
action: CREATE_FEATURE
|
|
||||||
feature: websocket-chat
|
|
||||||
actor: cli
|
|
||||||
result: success
|
|
||||||
- timestamp: 2026-02-05T21:44:57.410152204Z
|
|
||||||
action: APPROVE_ARTIFACT
|
|
||||||
feature: websocket-chat
|
|
||||||
actor: user
|
|
||||||
result: success
|
|
||||||
- timestamp: 2026-02-05T21:44:58.284093042Z
|
|
||||||
action: APPROVE_ARTIFACT
|
|
||||||
feature: websocket-chat
|
|
||||||
actor: user
|
|
||||||
result: success
|
|
||||||
- timestamp: 2026-02-05T21:44:59.043340487Z
|
|
||||||
action: APPROVE_ARTIFACT
|
|
||||||
feature: websocket-chat
|
|
||||||
actor: user
|
|
||||||
result: success
|
|
||||||
- timestamp: 2026-02-05T21:45:16.7071891Z
|
|
||||||
action: TRANSITION
|
|
||||||
feature: websocket-chat
|
|
||||||
actor: cli
|
|
||||||
result: success
|
|
||||||
- timestamp: 2026-02-05T21:45:51.613524427Z
|
|
||||||
action: APPROVE_ARTIFACT
|
|
||||||
feature: websocket-chat
|
|
||||||
actor: user
|
|
||||||
result: success
|
|
||||||
- timestamp: 2026-02-05T21:45:55.782110288Z
|
|
||||||
action: TRANSITION
|
|
||||||
feature: websocket-chat
|
|
||||||
actor: cli
|
|
||||||
result: success
|
|
||||||
- timestamp: 2026-02-05T21:46:00.807673749Z
|
|
||||||
action: CREATE_BRANCH
|
|
||||||
feature: websocket-chat
|
|
||||||
actor: cli
|
|
||||||
result: success
|
|
||||||
- timestamp: 2026-02-05T21:46:05.891255118Z
|
|
||||||
action: TRANSITION
|
|
||||||
feature: websocket-chat
|
|
||||||
actor: cli
|
|
||||||
result: success
|
|
||||||
- timestamp: 2026-02-05T21:46:05.896302369Z
|
|
||||||
action: TRANSITION
|
|
||||||
feature: websocket-chat
|
|
||||||
actor: cli
|
|
||||||
result: success
|
|
||||||
@ -2,49 +2,17 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
|
|
||||||
"github.com/redis/go-redis/v9"
|
|
||||||
|
|
||||||
"git.threesix.ai/jordan/sp3-verify-1770325830/pkg/app"
|
"git.threesix.ai/jordan/sp3-verify-1770325830/pkg/app"
|
||||||
"git.threesix.ai/jordan/sp3-verify-1770325830/pkg/logging"
|
"git.threesix.ai/jordan/sp3-verify-1770325830/pkg/logging"
|
||||||
"git.threesix.ai/jordan/sp3-verify-1770325830/pkg/realtime"
|
|
||||||
"git.threesix.ai/jordan/sp3-verify-1770325830/services/chat-api/internal/adapter/memory"
|
"git.threesix.ai/jordan/sp3-verify-1770325830/services/chat-api/internal/adapter/memory"
|
||||||
"git.threesix.ai/jordan/sp3-verify-1770325830/services/chat-api/internal/api"
|
"git.threesix.ai/jordan/sp3-verify-1770325830/services/chat-api/internal/api"
|
||||||
"git.threesix.ai/jordan/sp3-verify-1770325830/services/chat-api/internal/config"
|
|
||||||
"git.threesix.ai/jordan/sp3-verify-1770325830/services/chat-api/internal/service"
|
"git.threesix.ai/jordan/sp3-verify-1770325830/services/chat-api/internal/service"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
// Load configuration
|
|
||||||
cfg := config.Load()
|
|
||||||
|
|
||||||
// Create logger
|
// Create logger
|
||||||
logger := logging.Default()
|
logger := logging.Default()
|
||||||
|
|
||||||
// Create context for background goroutines
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
|
|
||||||
// Create realtime hub (local connection registry)
|
|
||||||
hub := realtime.NewHub(logger)
|
|
||||||
go hub.Run(ctx)
|
|
||||||
|
|
||||||
// Create Redis broadcaster for multi-pod scaling (optional)
|
|
||||||
var broadcaster realtime.Broadcaster
|
|
||||||
if cfg.RedisURL != "" {
|
|
||||||
opts, err := redis.ParseURL(cfg.RedisURL)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error("invalid redis url", "error", err)
|
|
||||||
} else {
|
|
||||||
redisClient := redis.NewClient(opts)
|
|
||||||
broadcaster = realtime.NewRedisBroadcaster(redisClient, hub, logger)
|
|
||||||
go broadcaster.Run(ctx)
|
|
||||||
logger.Info("redis broadcaster enabled", "url", cfg.RedisURL)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
logger.Info("redis broadcaster disabled, running in single-pod mode")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create adapters (repositories)
|
// Create adapters (repositories)
|
||||||
exampleRepo := memory.NewExampleRepository()
|
exampleRepo := memory.NewExampleRepository()
|
||||||
|
|
||||||
@ -54,14 +22,8 @@ func main() {
|
|||||||
// Create application
|
// Create application
|
||||||
application := app.New("chat-api", app.WithDefaultPort(8001))
|
application := app.New("chat-api", app.WithDefaultPort(8001))
|
||||||
|
|
||||||
// Register shutdown hook to cancel context
|
|
||||||
application.OnShutdown(func(_ context.Context) error {
|
|
||||||
cancel()
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
// Register routes with dependency injection
|
// Register routes with dependency injection
|
||||||
api.RegisterRoutes(application, exampleService, hub, broadcaster)
|
api.RegisterRoutes(application, exampleService)
|
||||||
|
|
||||||
// Start server
|
// Start server
|
||||||
application.Run()
|
application.Run()
|
||||||
|
|||||||
@ -1,271 +0,0 @@
|
|||||||
package handlers
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"net/http"
|
|
||||||
"net/http/httptest"
|
|
||||||
"strings"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/gorilla/websocket"
|
|
||||||
|
|
||||||
"git.threesix.ai/jordan/sp3-verify-1770325830/pkg/logging"
|
|
||||||
"git.threesix.ai/jordan/sp3-verify-1770325830/pkg/realtime"
|
|
||||||
)
|
|
||||||
|
|
||||||
// testSetup creates a hub and handler for WebSocket tests.
|
|
||||||
func testSetup(t *testing.T) (*realtime.LocalHub, *realtime.Handler, context.CancelFunc) {
|
|
||||||
t.Helper()
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
logger := logging.Nop()
|
|
||||||
|
|
||||||
hub := realtime.NewHub(logger)
|
|
||||||
go hub.Run(ctx)
|
|
||||||
|
|
||||||
handler := realtime.NewHandler(hub, logger, realtime.HandlerConfig{
|
|
||||||
Broadcaster: nil, // No Redis for tests
|
|
||||||
AuthRequired: false,
|
|
||||||
OnMessage: func(conn realtime.Connection, msg *realtime.Message) *realtime.Message {
|
|
||||||
if msg.Type == "" {
|
|
||||||
msg.Type = realtime.MessageTypeChat
|
|
||||||
}
|
|
||||||
return msg
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
return hub, handler, cancel
|
|
||||||
}
|
|
||||||
|
|
||||||
// dial connects to a WebSocket test server.
|
|
||||||
func dial(t *testing.T, server *httptest.Server, path string) *websocket.Conn {
|
|
||||||
t.Helper()
|
|
||||||
url := "ws" + strings.TrimPrefix(server.URL, "http") + path
|
|
||||||
conn, _, err := websocket.DefaultDialer.Dial(url, nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to dial websocket: %v", err)
|
|
||||||
}
|
|
||||||
return conn
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWebSocket_Connection(t *testing.T) {
|
|
||||||
hub, handler, cancel := testSetup(t)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
server := httptest.NewServer(handler.Routes())
|
|
||||||
defer server.Close()
|
|
||||||
|
|
||||||
// Connect
|
|
||||||
conn := dial(t, server, "/")
|
|
||||||
defer conn.Close()
|
|
||||||
|
|
||||||
// Wait for registration
|
|
||||||
time.Sleep(50 * time.Millisecond)
|
|
||||||
|
|
||||||
// Verify connection count
|
|
||||||
if count := hub.ConnectionCount(); count != 1 {
|
|
||||||
t.Errorf("expected 1 connection, got %d", count)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close and verify cleanup
|
|
||||||
conn.Close()
|
|
||||||
time.Sleep(50 * time.Millisecond)
|
|
||||||
|
|
||||||
if count := hub.ConnectionCount(); count != 0 {
|
|
||||||
t.Errorf("expected 0 connections after close, got %d", count)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWebSocket_RoomConnection(t *testing.T) {
|
|
||||||
hub, handler, cancel := testSetup(t)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
server := httptest.NewServer(handler.Routes())
|
|
||||||
defer server.Close()
|
|
||||||
|
|
||||||
// Connect to room
|
|
||||||
conn := dial(t, server, "/test-room")
|
|
||||||
defer conn.Close()
|
|
||||||
|
|
||||||
// Wait for registration
|
|
||||||
time.Sleep(50 * time.Millisecond)
|
|
||||||
|
|
||||||
// Verify room count
|
|
||||||
if count := hub.RoomCount("test-room"); count != 1 {
|
|
||||||
t.Errorf("expected 1 connection in room, got %d", count)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWebSocket_MessageBroadcast(t *testing.T) {
|
|
||||||
_, handler, cancel := testSetup(t)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
server := httptest.NewServer(handler.Routes())
|
|
||||||
defer server.Close()
|
|
||||||
|
|
||||||
// Connect two clients
|
|
||||||
conn1 := dial(t, server, "/")
|
|
||||||
defer conn1.Close()
|
|
||||||
|
|
||||||
conn2 := dial(t, server, "/")
|
|
||||||
defer conn2.Close()
|
|
||||||
|
|
||||||
// Wait for registration
|
|
||||||
time.Sleep(50 * time.Millisecond)
|
|
||||||
|
|
||||||
// Send message from conn1
|
|
||||||
msg := realtime.Message{
|
|
||||||
Type: realtime.MessageTypeChat,
|
|
||||||
Data: json.RawMessage(`{"content":"Hello"}`),
|
|
||||||
}
|
|
||||||
if err := conn1.WriteJSON(msg); err != nil {
|
|
||||||
t.Fatalf("failed to send message: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read from conn2
|
|
||||||
conn2.SetReadDeadline(time.Now().Add(2 * time.Second))
|
|
||||||
var received realtime.Message
|
|
||||||
if err := conn2.ReadJSON(&received); err != nil {
|
|
||||||
t.Fatalf("failed to receive message: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if received.Type != realtime.MessageTypeChat {
|
|
||||||
t.Errorf("expected type %s, got %s", realtime.MessageTypeChat, received.Type)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWebSocket_RoomIsolation(t *testing.T) {
|
|
||||||
_, handler, cancel := testSetup(t)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
server := httptest.NewServer(handler.Routes())
|
|
||||||
defer server.Close()
|
|
||||||
|
|
||||||
// Connect to room1
|
|
||||||
conn1 := dial(t, server, "/room1")
|
|
||||||
defer conn1.Close()
|
|
||||||
|
|
||||||
// Connect to room2
|
|
||||||
conn2 := dial(t, server, "/room2")
|
|
||||||
defer conn2.Close()
|
|
||||||
|
|
||||||
// Wait for registration
|
|
||||||
time.Sleep(50 * time.Millisecond)
|
|
||||||
|
|
||||||
// Send message to room1
|
|
||||||
msg := realtime.Message{
|
|
||||||
Type: realtime.MessageTypeChat,
|
|
||||||
Room: "room1",
|
|
||||||
Data: json.RawMessage(`{"content":"Room1 Only"}`),
|
|
||||||
}
|
|
||||||
if err := conn1.WriteJSON(msg); err != nil {
|
|
||||||
t.Fatalf("failed to send message: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set short timeout on conn2 - it should NOT receive the message
|
|
||||||
conn2.SetReadDeadline(time.Now().Add(200 * time.Millisecond))
|
|
||||||
var received realtime.Message
|
|
||||||
err := conn2.ReadJSON(&received)
|
|
||||||
|
|
||||||
// We expect a timeout error since room2 should not receive room1 messages
|
|
||||||
if err == nil {
|
|
||||||
t.Errorf("room2 client received message meant for room1: %+v", received)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWebSocket_PingPong(t *testing.T) {
|
|
||||||
_, handler, cancel := testSetup(t)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
server := httptest.NewServer(handler.Routes())
|
|
||||||
defer server.Close()
|
|
||||||
|
|
||||||
conn := dial(t, server, "/")
|
|
||||||
defer conn.Close()
|
|
||||||
|
|
||||||
// Wait for registration
|
|
||||||
time.Sleep(50 * time.Millisecond)
|
|
||||||
|
|
||||||
// Send ping
|
|
||||||
msg := realtime.Message{
|
|
||||||
Type: realtime.MessageTypePing,
|
|
||||||
}
|
|
||||||
if err := conn.WriteJSON(msg); err != nil {
|
|
||||||
t.Fatalf("failed to send ping: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Expect pong
|
|
||||||
conn.SetReadDeadline(time.Now().Add(2 * time.Second))
|
|
||||||
var received realtime.Message
|
|
||||||
if err := conn.ReadJSON(&received); err != nil {
|
|
||||||
t.Fatalf("failed to receive pong: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if received.Type != realtime.MessageTypePong {
|
|
||||||
t.Errorf("expected type %s, got %s", realtime.MessageTypePong, received.Type)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWebSocket_AuthRequired(t *testing.T) {
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
defer cancel()
|
|
||||||
logger := logging.Nop()
|
|
||||||
|
|
||||||
hub := realtime.NewHub(logger)
|
|
||||||
go hub.Run(ctx)
|
|
||||||
|
|
||||||
handler := realtime.NewHandler(hub, logger, realtime.HandlerConfig{
|
|
||||||
AuthRequired: true, // Require auth
|
|
||||||
})
|
|
||||||
|
|
||||||
server := httptest.NewServer(handler.Routes())
|
|
||||||
defer server.Close()
|
|
||||||
|
|
||||||
// Try to connect without auth
|
|
||||||
url := "ws" + strings.TrimPrefix(server.URL, "http") + "/"
|
|
||||||
_, resp, err := websocket.DefaultDialer.Dial(url, nil)
|
|
||||||
|
|
||||||
// Should fail with 401
|
|
||||||
if err == nil {
|
|
||||||
t.Error("expected connection to fail without auth")
|
|
||||||
}
|
|
||||||
if resp != nil && resp.StatusCode != http.StatusUnauthorized {
|
|
||||||
t.Errorf("expected status 401, got %d", resp.StatusCode)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWebSocket_MultipleClients(t *testing.T) {
|
|
||||||
hub, handler, cancel := testSetup(t)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
server := httptest.NewServer(handler.Routes())
|
|
||||||
defer server.Close()
|
|
||||||
|
|
||||||
// Connect multiple clients
|
|
||||||
const numClients = 5
|
|
||||||
conns := make([]*websocket.Conn, numClients)
|
|
||||||
for i := 0; i < numClients; i++ {
|
|
||||||
conns[i] = dial(t, server, "/")
|
|
||||||
defer conns[i].Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for registration
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
|
|
||||||
if count := hub.ConnectionCount(); count != numClients {
|
|
||||||
t.Errorf("expected %d connections, got %d", numClients, count)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close half the clients
|
|
||||||
for i := 0; i < numClients/2; i++ {
|
|
||||||
conns[i].Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
|
|
||||||
expected := numClients - numClients/2
|
|
||||||
if count := hub.ConnectionCount(); count != expected {
|
|
||||||
t.Errorf("expected %d connections after partial close, got %d", expected, count)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -4,7 +4,6 @@ package api
|
|||||||
import (
|
import (
|
||||||
"git.threesix.ai/jordan/sp3-verify-1770325830/pkg/app"
|
"git.threesix.ai/jordan/sp3-verify-1770325830/pkg/app"
|
||||||
"git.threesix.ai/jordan/sp3-verify-1770325830/pkg/auth"
|
"git.threesix.ai/jordan/sp3-verify-1770325830/pkg/auth"
|
||||||
"git.threesix.ai/jordan/sp3-verify-1770325830/pkg/realtime"
|
|
||||||
"git.threesix.ai/jordan/sp3-verify-1770325830/services/chat-api/internal/api/handlers"
|
"git.threesix.ai/jordan/sp3-verify-1770325830/services/chat-api/internal/api/handlers"
|
||||||
"git.threesix.ai/jordan/sp3-verify-1770325830/services/chat-api/internal/config"
|
"git.threesix.ai/jordan/sp3-verify-1770325830/services/chat-api/internal/config"
|
||||||
"git.threesix.ai/jordan/sp3-verify-1770325830/services/chat-api/internal/service"
|
"git.threesix.ai/jordan/sp3-verify-1770325830/services/chat-api/internal/service"
|
||||||
@ -15,13 +14,7 @@ import (
|
|||||||
// This allows the monorepo to expose multiple services under a single domain:
|
// This allows the monorepo to expose multiple services under a single domain:
|
||||||
// - https://domain/api/chat-api/health
|
// - https://domain/api/chat-api/health
|
||||||
// - https://domain/api/chat-api/examples
|
// - https://domain/api/chat-api/examples
|
||||||
// - wss://domain/api/chat-api/ws
|
func RegisterRoutes(application *app.App, exampleService *service.ExampleService) {
|
||||||
func RegisterRoutes(
|
|
||||||
application *app.App,
|
|
||||||
exampleService *service.ExampleService,
|
|
||||||
hub realtime.Hub,
|
|
||||||
broadcaster realtime.Broadcaster,
|
|
||||||
) {
|
|
||||||
logger := application.Logger()
|
logger := application.Logger()
|
||||||
cfg := config.Load()
|
cfg := config.Load()
|
||||||
|
|
||||||
@ -29,36 +22,10 @@ func RegisterRoutes(
|
|||||||
healthHandler := handlers.NewHealth(logger)
|
healthHandler := handlers.NewHealth(logger)
|
||||||
exampleHandler := handlers.NewExample(exampleService, logger)
|
exampleHandler := handlers.NewExample(exampleService, logger)
|
||||||
|
|
||||||
// Initialize WebSocket handler
|
|
||||||
wsHandler := realtime.NewHandler(hub, logger, realtime.HandlerConfig{
|
|
||||||
Broadcaster: broadcaster,
|
|
||||||
OnConnect: func(conn realtime.Connection) {
|
|
||||||
logger.Info("websocket client connected",
|
|
||||||
"client_id", conn.ID(),
|
|
||||||
"user_id", conn.UserID(),
|
|
||||||
)
|
|
||||||
},
|
|
||||||
OnDisconnect: func(conn realtime.Connection) {
|
|
||||||
logger.Info("websocket client disconnected",
|
|
||||||
"client_id", conn.ID(),
|
|
||||||
)
|
|
||||||
},
|
|
||||||
OnMessage: func(conn realtime.Connection, msg *realtime.Message) *realtime.Message {
|
|
||||||
if msg.Type == "" {
|
|
||||||
msg.Type = realtime.MessageTypeChat
|
|
||||||
}
|
|
||||||
return msg
|
|
||||||
},
|
|
||||||
AuthRequired: cfg.AuthEnabled,
|
|
||||||
})
|
|
||||||
|
|
||||||
// Build and mount OpenAPI spec
|
// Build and mount OpenAPI spec
|
||||||
spec := NewServiceSpec()
|
spec := NewServiceSpec()
|
||||||
application.EnableDocs(spec)
|
application.EnableDocs(spec)
|
||||||
|
|
||||||
// Mount WebSocket routes
|
|
||||||
application.Mount("/api/chat-api/ws", wsHandler.Routes())
|
|
||||||
|
|
||||||
// Register API routes under /api/{service-name} to match ingress path routing.
|
// Register API routes under /api/{service-name} to match ingress path routing.
|
||||||
// The ingress routes /api/chat-api/* to this service.
|
// The ingress routes /api/chat-api/* to this service.
|
||||||
application.Route("/api/chat-api", func(r app.Router) {
|
application.Route("/api/chat-api", func(r app.Router) {
|
||||||
|
|||||||
@ -8,8 +8,7 @@ func NewServiceSpec() *openapi.OpenAPISpec {
|
|||||||
WithDescription("REST API for the chat-api service").
|
WithDescription("REST API for the chat-api service").
|
||||||
WithBearerSecurity("bearer", "JWT authentication token").
|
WithBearerSecurity("bearer", "JWT authentication token").
|
||||||
WithTag("Health", "Service health endpoints").
|
WithTag("Health", "Service health endpoints").
|
||||||
WithTag("Examples", "Example CRUD endpoints").
|
WithTag("Examples", "Example CRUD endpoints")
|
||||||
WithTag("WebSocket", "Real-time WebSocket endpoints")
|
|
||||||
|
|
||||||
// Define reusable schemas
|
// Define reusable schemas
|
||||||
spec.WithSchema("Example", openapi.Object(map[string]openapi.Schema{
|
spec.WithSchema("Example", openapi.Object(map[string]openapi.Schema{
|
||||||
@ -109,50 +108,5 @@ func NewServiceSpec() *openapi.OpenAPISpec {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
// WebSocket message schema
|
|
||||||
spec.WithSchema("WebSocketMessage", openapi.Object(map[string]openapi.Schema{
|
|
||||||
"id": openapi.String().WithDescription("Message ID (set by server)"),
|
|
||||||
"type": openapi.String().WithDescription("Message type: chat, presence, notification, system, error, ping, pong"),
|
|
||||||
"room": openapi.String().WithDescription("Target room (optional)"),
|
|
||||||
"from": openapi.String().WithDescription("Sender client ID (set by server)"),
|
|
||||||
"data": openapi.Object(nil).WithDescription("Message payload"),
|
|
||||||
"timestamp": openapi.DateTime().WithDescription("Message timestamp"),
|
|
||||||
}, "type"))
|
|
||||||
|
|
||||||
// WebSocket connection
|
|
||||||
spec.AddPath("/api/chat-api/ws", "get", map[string]any{
|
|
||||||
"summary": "WebSocket connection",
|
|
||||||
"description": "Upgrades to WebSocket for real-time chat. Messages are broadcast to all connected clients via Redis pub/sub. Send JSON messages matching the WebSocketMessage schema.",
|
|
||||||
"tags": []string{"WebSocket"},
|
|
||||||
"responses": map[string]any{
|
|
||||||
"101": map[string]any{
|
|
||||||
"description": "Switching Protocols - WebSocket upgrade successful",
|
|
||||||
},
|
|
||||||
"401": openapi.OpResponse("Unauthorized - authentication required (when AUTH_ENABLED=true)", openapi.ErrorResponseSchema()),
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
// WebSocket connection to room
|
|
||||||
spec.AddPath("/api/chat-api/ws/{room}", "get", map[string]any{
|
|
||||||
"summary": "WebSocket connection to room",
|
|
||||||
"description": "Upgrades to WebSocket and joins the specified room. Messages are broadcast only to clients in the same room.",
|
|
||||||
"tags": []string{"WebSocket"},
|
|
||||||
"parameters": []any{
|
|
||||||
map[string]any{
|
|
||||||
"name": "room",
|
|
||||||
"in": "path",
|
|
||||||
"required": true,
|
|
||||||
"description": "Room identifier to join",
|
|
||||||
"schema": map[string]any{"type": "string"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
"responses": map[string]any{
|
|
||||||
"101": map[string]any{
|
|
||||||
"description": "Switching Protocols - WebSocket upgrade successful",
|
|
||||||
},
|
|
||||||
"401": openapi.OpResponse("Unauthorized - authentication required (when AUTH_ENABLED=true)", openapi.ErrorResponseSchema()),
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
return spec
|
return spec
|
||||||
}
|
}
|
||||||
|
|||||||
@ -18,9 +18,6 @@ type Config struct {
|
|||||||
// Auth
|
// Auth
|
||||||
AuthEnabled bool
|
AuthEnabled bool
|
||||||
JWTSecret string
|
JWTSecret string
|
||||||
|
|
||||||
// Redis configuration for realtime pub/sub
|
|
||||||
RedisURL string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load reads configuration from environment variables.
|
// Load reads configuration from environment variables.
|
||||||
@ -33,6 +30,5 @@ func Load() *Config {
|
|||||||
|
|
||||||
AuthEnabled: strings.EqualFold(os.Getenv("AUTH_ENABLED"), "true"),
|
AuthEnabled: strings.EqualFold(os.Getenv("AUTH_ENABLED"), "true"),
|
||||||
JWTSecret: os.Getenv("JWT_SECRET"),
|
JWTSecret: os.Getenv("JWT_SECRET"),
|
||||||
RedisURL: os.Getenv("REDIS_URL"),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user