slack-auth-1770276413/pkg/realtime/websocket.go
jordan 5812d7a868
All checks were successful
ci/woodpecker/manual/woodpecker Pipeline was successful
ci/woodpecker/push/woodpecker Pipeline was successful
Initialize project from skeleton template
2026-02-05 07:26:54 +00:00

224 lines
5.2 KiB
Go

package realtime
import (
"context"
"encoding/json"
"net/http"
"sync"
"time"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"git.threesix.ai/jordan/slack-auth-1770276413/pkg/logging"
)
// Tuning parameters shared by the read and write pumps.
const (
	// writeWait bounds how long a single write to the peer may take.
	writeWait = 10 * time.Second

	// pongWait is how long the reader waits for the next pong from the peer.
	pongWait = 60 * time.Second

	// pingPeriod is the interval between outgoing pings. It is nine tenths
	// of pongWait and must always stay below pongWait.
	pingPeriod = pongWait - pongWait/10

	// maxMessageSize caps the size of a message accepted from the peer (64KB).
	maxMessageSize = 64 << 10

	// sendBufferSize is the capacity of each client's outbound message queue.
	sendBufferSize = 256
)
// WSClient represents a WebSocket connection to the hub.
//
// Outbound messages are queued on the buffered send channel, which writePump
// drains. Close closes that channel; the closed flag (guarded by mu) lets Send
// detect this and refuse the message instead of panicking with
// "send on closed channel".
type WSClient struct {
	id     string
	userID string
	hub    Hub
	conn   *websocket.Conn
	send   chan *Message
	logger *logging.Logger
	onMsg  func(*WSClient, *Message) // Optional message callback

	closeOnce sync.Once

	// mu guards closed and orders close(send) after any in-flight Send.
	// Send holds the read lock while it touches the channel; Close takes the
	// write lock before closing it.
	mu     sync.RWMutex
	closed bool
}

// Ensure WSClient implements Connection at compile time.
var _ Connection = (*WSClient)(nil)

// WSClientConfig configures a new WebSocket client.
type WSClientConfig struct {
	// UserID is the authenticated user ID (empty if anonymous).
	UserID string
	// OnMessage is called for each incoming message.
	// If nil, messages are ignored (useful for broadcast-only connections).
	OnMessage func(*WSClient, *Message)
}

// NewWSClient creates a new WebSocket client from an upgraded connection.
//
// The client receives a freshly generated UUID as its ID, a send queue of
// sendBufferSize messages, and a logger derived from logger with the
// "ws-client" component. The returned client is not registered with hub
// and does not start any goroutine until Run is called.
func NewWSClient(hub Hub, conn *websocket.Conn, logger *logging.Logger, cfg WSClientConfig) *WSClient {
	return &WSClient{
		id:     uuid.New().String(),
		userID: cfg.UserID,
		hub:    hub,
		conn:   conn,
		send:   make(chan *Message, sendBufferSize),
		logger: logger.WithComponent("ws-client"),
		onMsg:  cfg.OnMessage,
	}
}

// ID returns the unique connection identifier.
func (c *WSClient) ID() string {
	return c.id
}

// UserID returns the authenticated user ID.
func (c *WSClient) UserID() string {
	return c.userID
}

// Send queues a message for delivery without blocking.
//
// It returns true when the message was queued. It returns false when the
// client has already been closed, or when the send buffer is full (in which
// case the drop is logged). Send is safe to call concurrently with Close.
func (c *WSClient) Send(msg *Message) bool {
	c.mu.RLock()
	defer c.mu.RUnlock()

	// Once Close has run the channel is closed; sending on it would panic.
	if c.closed {
		return false
	}

	select {
	case c.send <- msg:
		return true
	default:
		// Buffer full, message dropped
		c.logger.Warn("send buffer full, dropping message", "client_id", c.id)
		return false
	}
}

// Close gracefully closes the connection.
//
// It marks the client closed and closes the send channel, which makes
// writePump send a close frame and exit. Close is idempotent and safe to call
// from multiple goroutines; after it returns, Send always reports false.
func (c *WSClient) Close() {
	c.closeOnce.Do(func() {
		// The write lock waits for in-flight Sends (read-lock holders) to
		// finish, so the channel is never closed underneath a sender.
		c.mu.Lock()
		defer c.mu.Unlock()
		c.closed = true
		close(c.send)
	})
}
// Run attaches the client to its hub and services the connection.
//
// Run registers the client with the hub itself and unregisters it on return.
// The write pump runs in its own goroutine while the read pump runs on the
// calling goroutine, so Run blocks until the read pump returns.
func (c *WSClient) Run(ctx context.Context) {
	hub := c.hub
	hub.Register(c)
	defer hub.Unregister(c)

	// Writer in the background, reader in the foreground.
	go c.writePump(ctx)
	c.readPump(ctx)
}
// readPump consumes inbound messages until ctx is cancelled or a read fails.
//
// On exit it closes the client (and with it the send channel) and the
// underlying connection. Each decoded message has From overwritten with this
// client's ID, and Timestamp filled in only when the peer left it zero.
// Messages of type MessageTypePing are answered with a MessageTypePong and
// are not passed to the callback; everything else goes to onMsg when set.
func (c *WSClient) readPump(ctx context.Context) {
	defer func() {
		c.Close()
		_ = c.conn.Close()
	}()

	// extendDeadline pushes the read deadline pongWait into the future.
	extendDeadline := func() {
		_ = c.conn.SetReadDeadline(time.Now().Add(pongWait))
	}

	c.conn.SetReadLimit(maxMessageSize)
	extendDeadline()
	// Every pong from the peer renews the read deadline.
	c.conn.SetPongHandler(func(string) error {
		extendDeadline()
		return nil
	})

	// Cancellation is only observed between reads; ReadMessage itself blocks.
	for ctx.Err() == nil {
		_, raw, readErr := c.conn.ReadMessage()
		if readErr != nil {
			// Only close errors with a code other than going-away or
			// abnormal-closure are worth a log line.
			if websocket.IsUnexpectedCloseError(readErr, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				c.logger.Debug("websocket read error", "client_id", c.id, "error", readErr)
			}
			return
		}

		var incoming Message
		if decodeErr := json.Unmarshal(raw, &incoming); decodeErr != nil {
			// A malformed payload is skipped; the connection stays open.
			c.logger.Debug("invalid message format", "client_id", c.id, "error", decodeErr)
			continue
		}

		// Server-controlled fields: the sender is always this connection.
		incoming.From = c.id
		if incoming.Timestamp.IsZero() {
			incoming.Timestamp = time.Now().UTC()
		}

		switch {
		case incoming.Type == MessageTypePing:
			// Application-level ping: reply directly, do not dispatch.
			c.Send(&Message{
				Type:      MessageTypePong,
				Timestamp: time.Now().UTC(),
			})
		case c.onMsg != nil:
			c.onMsg(c, &incoming)
		}
	}
}
// writePump delivers queued messages and periodic pings to the peer.
//
// It exits when ctx is cancelled, when the send channel is closed, or when a
// write fails. In the first two cases it sends a close frame first. On exit
// the ping ticker is stopped and the connection is closed.
func (c *WSClient) writePump(ctx context.Context) {
	pinger := time.NewTicker(pingPeriod)
	// Deferred calls run last-in first-out: ticker stop, then connection close.
	defer func() { _ = c.conn.Close() }()
	defer pinger.Stop()

	// armDeadline gives the next write at most writeWait to complete.
	armDeadline := func() {
		_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
	}
	// sayGoodbye sends a close frame; failures are deliberately ignored
	// because the connection is being torn down anyway.
	sayGoodbye := func() {
		armDeadline()
		_ = c.conn.WriteMessage(websocket.CloseMessage, []byte{})
	}

	for {
		select {
		case <-ctx.Done():
			sayGoodbye()
			return

		case outgoing, open := <-c.send:
			if !open {
				// Channel closed
				sayGoodbye()
				return
			}
			armDeadline()
			if err := c.conn.WriteJSON(outgoing); err != nil {
				c.logger.Debug("websocket write error", "client_id", c.id, "error", err)
				return
			}

		case <-pinger.C:
			armDeadline()
			if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				return
			}
		}
	}
}
// Upgrader is the package-wide WebSocket upgrader used by UpgradeConnection.
//
// As configured, CheckOrigin accepts every request regardless of its Origin
// header. Replace CheckOrigin before deploying to production.
var Upgrader = websocket.Upgrader{
	ReadBufferSize:  1 << 10,
	WriteBufferSize: 1 << 10,
	// TODO: Configure for production (check Origin header)
	CheckOrigin: func(*http.Request) bool { return true },
}
// UpgradeConnection switches the HTTP request over to the WebSocket protocol
// using the package-level Upgrader, passing no extra response headers.
// It returns the upgraded connection, or the error reported by the upgrader.
func UpgradeConnection(w http.ResponseWriter, r *http.Request) (*websocket.Conn, error) {
	conn, err := Upgrader.Upgrade(w, r, nil)
	return conn, err
}