package handlers

import (
	"net/http"
	"time"

	"github.com/go-chi/chi/v5"
	"github.com/gorilla/websocket"

	"github.com/orchard9/rdev/internal/auth"
	"github.com/orchard9/rdev/internal/port"
	"github.com/orchard9/rdev/pkg/api"
)

// BuildsWSHandler serves build events to clients over WebSocket connections.
type BuildsWSHandler struct {
	// streams is the publisher StreamEvents subscribes to for a task's events.
	streams port.StreamPublisher
	// upgrader turns the incoming HTTP request into a WebSocket connection.
	upgrader websocket.Upgrader
}

// NewBuildsWSHandler returns a BuildsWSHandler that reads build events from
// streams. The embedded upgrader uses 1 KiB read and write buffers.
func NewBuildsWSHandler(streams port.StreamPublisher) *BuildsWSHandler {
	// NOTE(review): every Origin is accepted, so cross-site pages can open this
	// socket. Restrict this before exposing the endpoint in production.
	allowAnyOrigin := func(*http.Request) bool {
		return true // Allow all origins (configure for production)
	}

	upgrader := websocket.Upgrader{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
		CheckOrigin:     allowAnyOrigin,
	}

	return &BuildsWSHandler{
		streams:  streams,
		upgrader: upgrader,
	}
}

// Mount attaches the handler's WebSocket route to r. The route is wrapped in
// auth.RequireScope with the build-read and admin scopes.
func (h *BuildsWSHandler) Mount(r api.Router) {
	requireScope := auth.RequireScope(auth.ScopeBuildRead, auth.ScopeAdmin)
	r.With(requireScope).Get("/builds/{taskId}/ws", h.StreamEvents)
}

// wsMessage is the JSON envelope written to the client for every frame.
// All fields except Type are omitted from the output when empty.
type wsMessage struct {
	ID        string         `json:"id,omitempty"`        // event ID, when the message carries a stream event
	Type      string         `json:"type"`                // message or event type, e.g. "connected"
	TaskID    string         `json:"task_id,omitempty"`   // task the message belongs to
	Timestamp string         `json:"timestamp,omitempty"` // RFC 3339 formatted time
	Data      map[string]any `json:"data,omitempty"`      // type-specific payload
}

// StreamEvents handles WebSocket connections for streaming build events.
// GET /builds/{taskId}/ws func (h *BuildsWSHandler) StreamEvents(w http.ResponseWriter, r *http.Request) { taskID := chi.URLParam(r, "taskId") if taskID == "" { api.WriteBadRequest(w, r, "task ID is required") return } // Get optional last event ID from query string lastEventID := r.URL.Query().Get("last_event_id") // Upgrade to WebSocket conn, err := h.upgrader.Upgrade(w, r, nil) if err != nil { // Upgrade already wrote error response return } defer func() { _ = conn.Close() }() // Subscribe to events var events <-chan port.StreamEvent var cleanup func() if lastEventID != "" { events, cleanup = h.streams.SubscribeFromID(taskID, lastEventID) } else { events, cleanup = h.streams.Subscribe(taskID) } defer cleanup() // Send connected message _ = conn.WriteJSON(wsMessage{ Type: "connected", TaskID: taskID, Timestamp: time.Now().UTC().Format(time.RFC3339), Data: map[string]any{ "reconnecting": lastEventID != "", }, }) // Set up ping/pong for keepalive conn.SetPongHandler(func(string) error { return conn.SetReadDeadline(time.Now().Add(60 * time.Second)) }) // Start a goroutine to read from WebSocket (for close detection) done := make(chan struct{}) go func() { defer close(done) for { _, _, err := conn.ReadMessage() if err != nil { return } } }() // Stream events pingTicker := time.NewTicker(30 * time.Second) defer pingTicker.Stop() for { select { case <-done: // Client disconnected return case event, ok := <-events: if !ok { // Stream closed _ = conn.WriteJSON(wsMessage{ Type: "stream_closed", TaskID: taskID, Timestamp: time.Now().UTC().Format(time.RFC3339), }) return } // Convert port.StreamEvent to wsMessage msg := wsMessage{ ID: event.ID, Type: event.Type, TaskID: event.TaskID, Timestamp: event.Timestamp.Format(time.RFC3339), Data: event.Data, } if err := conn.WriteJSON(msg); err != nil { return // Write error, close connection } // Check for terminal events if event.Type == "build.completed" || event.Type == "build.failed" { // Give client time to process final 
message time.Sleep(100 * time.Millisecond) return } case <-pingTicker.C: if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { return } } } }