package handlers

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"net/url"
	"time"

	"github.com/go-chi/chi/v5"
	"github.com/orchard9/rdev/internal/domain"
	"github.com/orchard9/rdev/internal/port"
	"github.com/orchard9/rdev/pkg/api"
)

// SessionExecRequest is the JSON body for executing a command in a session.
type SessionExecRequest struct {
	// Type selects the command kind: "claude", "shell", or "git".
	Type string `json:"type"`
	// Prompt is required when Type is "claude".
	Prompt string `json:"prompt,omitempty"`
	// Command is required when Type is "shell" or "git".
	Command string `json:"command,omitempty"`
	// Args are appended after the prompt/command when building the argument list.
	Args []string `json:"args,omitempty"`
	// StreamID is an optional client-provided stream ID; one is generated when empty.
	StreamID string `json:"stream_id,omitempty"`
}

// SessionExecResponse is the JSON response for a session exec command.
type SessionExecResponse struct {
	ID        string `json:"id"`
	SessionID string `json:"session_id"`
	Type      string `json:"type"`
	Status    string `json:"status"`
	StreamURL string `json:"stream_url"`
}

// Exec executes a command in the context of an active session.
// POST /projects/{id}/sessions/{sid}/exec
//
// The request is validated, the session is checked to belong to the project
// and to be active, and the command is then started in a background goroutine.
// The handler responds immediately with a "running" status and the URL of the
// event stream on which output and the completion event are published.
func (h *SessionsHandler) Exec(w http.ResponseWriter, r *http.Request) {
	projectID := chi.URLParam(r, "id")
	if err := domain.ValidateProjectID(projectID); err != nil {
		api.WriteBadRequest(w, r, "invalid project id")
		return
	}

	sid := chi.URLParam(r, "sid")
	if sid == "" {
		api.WriteBadRequest(w, r, "session id is required")
		return
	}

	var req SessionExecRequest
	if err := api.DecodeJSON(r, &req); err != nil {
		api.WriteBadRequest(w, r, "invalid request body")
		return
	}

	// Validate the command type and its type-specific field, building the
	// argument list in the same switch so the two cannot drift apart.
	cmdType := domain.CommandType(req.Type)
	var args []string
	switch cmdType {
	case domain.CommandTypeClaude:
		if req.Prompt == "" {
			api.WriteBadRequest(w, r, "prompt is required for claude commands")
			return
		}
		args = append([]string{req.Prompt}, req.Args...)
	case domain.CommandTypeShell:
		if req.Command == "" {
			api.WriteBadRequest(w, r, "command is required for shell commands")
			return
		}
		args = append([]string{req.Command}, req.Args...)
	case domain.CommandTypeGit:
		if req.Command == "" {
			api.WriteBadRequest(w, r, "command is required for git commands")
			return
		}
		args = append([]string{req.Command}, req.Args...)
	default:
		api.WriteBadRequest(w, r, "type must be claude, shell, or git")
		return
	}

	ctx, cancel := context.WithTimeout(r.Context(), TimeoutStandard)
	defer cancel()

	// Get the session and verify it belongs to this project and is active.
	session, err := h.sessionService.Get(ctx, domain.SessionID(sid))
	if err != nil {
		if errors.Is(err, domain.ErrSessionNotFound) {
			api.WriteNotFound(w, r, "session not found")
			return
		}
		api.WriteInternalError(w, r, "Failed to get session")
		return
	}
	// A session under a different project is reported as not found rather
	// than as a mismatch.
	if string(session.ProjectID) != projectID {
		api.WriteNotFound(w, r, "session not found")
		return
	}
	if !session.IsActive() {
		api.WriteBadRequest(w, r, "session is not active")
		return
	}

	// Touching activity is best-effort: a failure here must not fail the
	// exec request, so the error is deliberately ignored.
	_ = h.sessionService.TouchActivity(ctx, session.ID)

	// Use the client's stream ID when given, otherwise generate one.
	streamID := req.StreamID
	if streamID == "" {
		streamID = fmt.Sprintf("session-%s-%d", sid, time.Now().UnixNano())
	}

	cmd := &domain.Command{
		ID:        domain.CommandID(streamID),
		ProjectID: domain.ProjectID(projectID),
		Type:      cmdType,
		Args:      args,
		StartedAt: time.Now(),
	}

	// Execute in a background goroutine; executeSessionCommand detaches from
	// the request context's cancellation so the command outlives this request.
	go h.executeSessionCommand(r.Context(), cmd, session.PodName, streamID)

	// The stream ID may be client-provided, so escape it before placing it in
	// the query string; otherwise characters such as '&' or '#' would yield a
	// URL that does not round-trip to the same stream.
	streamURL := fmt.Sprintf("/projects/%s/sessions/%s/events?stream_id=%s",
		projectID, sid, url.QueryEscape(streamID))

	api.WriteCreated(w, r, SessionExecResponse{
		ID:        streamID,
		SessionID: string(session.ID),
		Type:      req.Type,
		Status:    "running",
		StreamURL: streamURL,
	})
}

// executeSessionCommand runs a command and streams output to subscribers.
//
// Each output line is published to streamID as an "output" event. When
// execution ends a single "complete" event is published; if the executor
// returned an error, that event carries exit_code -1 and an "error" message
// instead of the executor's result. The stream is closed after a grace period.
func (h *SessionsHandler) executeSessionCommand(parentCtx context.Context, cmd *domain.Command, podName, streamID string) {
	// closeGrace gives subscribers time to receive the completion event
	// before the stream is closed.
	const closeGrace = 30 * time.Second

	// Keep the parent's values but not its cancellation: the HTTP request
	// that started this command finishes long before the command does.
	ctx, cancel := context.WithTimeout(context.WithoutCancel(parentCtx), TimeoutLongRunning)
	defer cancel()

	started := time.Now()
	result, err := h.executor.Execute(ctx, cmd, podName, func(line domain.OutputLine) {
		h.streams.Publish(streamID, port.StreamEvent{
			Type: "output",
			Data: map[string]any{
				"line":   line.Line,
				"stream": line.Stream,
			},
		})
	})

	// Publish the completion event. On failure the result is not read at all:
	// it cannot be trusted when err is non-nil, and dereferencing it could
	// panic in this goroutine.
	// NOTE(review): assumes Execute's result is meaningless on error — confirm
	// against the executor implementation.
	completion := port.StreamEvent{Type: "complete"}
	if err != nil {
		completion.Data = map[string]any{
			"exit_code":   -1,
			"duration_ms": time.Since(started).Milliseconds(),
			"error":       err.Error(),
		}
	} else {
		completion.Data = map[string]any{
			"exit_code":   result.ExitCode,
			"duration_ms": result.DurationMs,
		}
	}
	h.streams.Publish(streamID, completion)

	time.Sleep(closeGrace)
	h.streams.Close(streamID)
}

// Events streams session command output via Server-Sent Events.
// GET /projects/{id}/sessions/{sid}/events
//
// The stream to follow is taken from the stream_id query parameter. When the
// request carries a Last-Event-ID header the subscription is made via
// SubscribeFromID with that ID. The handler returns when the client
// disconnects, the event channel is closed, or a "complete" event has been
// written.
func (h *SessionsHandler) Events(w http.ResponseWriter, r *http.Request) {
	// heartbeatInterval is how often a heartbeat event is written while the
	// stream is open.
	const heartbeatInterval = 30 * time.Second

	projectID := chi.URLParam(r, "id")
	if err := domain.ValidateProjectID(projectID); err != nil {
		api.WriteBadRequest(w, r, "invalid project id")
		return
	}

	sid := chi.URLParam(r, "sid")
	if sid == "" {
		api.WriteBadRequest(w, r, "session id is required")
		return
	}

	streamID := r.URL.Query().Get("stream_id")
	lastEventID := r.Header.Get("Last-Event-ID")

	ctx, cancel := context.WithTimeout(r.Context(), TimeoutFastLookup)
	defer cancel()

	// Verify the session exists and belongs to the project.
	session, err := h.sessionService.Get(ctx, domain.SessionID(sid))
	if err != nil {
		if errors.Is(err, domain.ErrSessionNotFound) {
			api.WriteNotFound(w, r, "session not found")
			return
		}
		api.WriteInternalError(w, r, "Failed to get session")
		return
	}
	if string(session.ProjectID) != projectID {
		api.WriteNotFound(w, r, "session not found")
		return
	}

	// Touching activity is best-effort; a failure must not block streaming.
	_ = h.sessionService.TouchActivity(ctx, session.ID)

	// Check for flush support before setting any SSE headers, so the error
	// response is not sent with a text/event-stream content type.
	flusher, ok := w.(http.Flusher)
	if !ok {
		api.WriteInternalError(w, r, "SSE not supported")
		return
	}

	// Set SSE headers.
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Access-Control-Allow-Origin", "*")

	// Subscribe to events with Last-Event-ID support.
	var (
		events  <-chan port.StreamEvent
		cleanup func()
	)
	if lastEventID != "" {
		events, cleanup = h.streams.SubscribeFromID(streamID, lastEventID)
	} else {
		events, cleanup = h.streams.Subscribe(streamID)
	}
	defer cleanup()

	// Send the initial connected event.
	writeSSE(w, flusher, "connected", map[string]any{
		"session_id":   sid,
		"stream_id":    streamID,
		"reconnecting": lastEventID != "",
	})

	// Stream events until the client disconnects or the stream closes.
	reqCtx := r.Context()
	heartbeat := time.NewTicker(heartbeatInterval)
	defer heartbeat.Stop()

	for {
		select {
		case <-reqCtx.Done():
			return
		case event, ok := <-events:
			if !ok {
				return
			}
			writeSSEWithID(w, flusher, event.ID, event.Type, event.Data)
			// "complete" is the final event of a command's stream.
			if event.Type == "complete" {
				return
			}
		case <-heartbeat.C:
			writeSSE(w, flusher, "heartbeat", map[string]any{
				"timestamp": time.Now().UTC().Format(time.RFC3339),
			})
		}
	}
}