package handlers import ( "context" "encoding/json" "errors" "fmt" "net/http" "time" "github.com/go-chi/chi/v5" "github.com/google/uuid" "github.com/orchard9/rdev/internal/domain" "github.com/orchard9/rdev/internal/logging" "github.com/orchard9/rdev/internal/port" "github.com/orchard9/rdev/pkg/api" ) // SessionExecRequest is the JSON body for executing a command in a session. type SessionExecRequest struct { Type string `json:"type"` // "claude", "shell", or "git" Prompt string `json:"prompt,omitempty"` // For claude commands Command string `json:"command,omitempty"` // For shell/git commands Args []string `json:"args,omitempty"` // Additional arguments StreamID string `json:"stream_id,omitempty"` // Client-provided stream ID (optional) ContinueConversation bool `json:"continue_conversation,omitempty"` // Resume stored claude_session_id ConversationID string `json:"conversation_id,omitempty"` // Explicit --resume ID override } // SessionExecResponse is the JSON response for a session exec command. type SessionExecResponse struct { ID string `json:"id"` SessionID string `json:"session_id"` Type string `json:"type"` Status string `json:"status"` StreamURL string `json:"stream_url"` } // Exec executes a command in the context of an active session. // POST /projects/{id}/sessions/{sid}/exec func (h *SessionsHandler) Exec(w http.ResponseWriter, r *http.Request) { projectID := chi.URLParam(r, "id") if err := domain.ValidateProjectID(projectID); err != nil { api.WriteBadRequest(w, r, "invalid project id") return } sid := chi.URLParam(r, "sid") if sid == "" { api.WriteBadRequest(w, r, "session id is required") return } var req SessionExecRequest if err := api.DecodeJSON(r, &req); err != nil { api.WriteBadRequest(w, r, "invalid request body") return } // Validate command type and type-specific fields. cmdType := domain.CommandType(req.Type) switch cmdType { case domain.CommandTypeClaude: if req.Prompt == "" { api.WriteBadRequest(w, r, "prompt is required for claude commands") return } case domain.CommandTypeShell: if req.Command == "" { api.WriteBadRequest(w, r, "command is required for shell commands") return } case domain.CommandTypeGit: if req.Command == "" { api.WriteBadRequest(w, r, "command is required for git commands") return } default: api.WriteBadRequest(w, r, "type must be claude, shell, or git") return } ctx, cancel := context.WithTimeout(r.Context(), TimeoutStandard) defer cancel() // Get session and verify it belongs to this project and is active. session, err := h.sessionService.Get(ctx, domain.SessionID(sid)) if err != nil { if errors.Is(err, domain.ErrSessionNotFound) { api.WriteNotFound(w, r, "session not found") return } api.WriteInternalError(w, r, "Failed to get session") return } if string(session.ProjectID) != projectID { api.WriteNotFound(w, r, "session not found") return } if !session.IsActive() { api.WriteBadRequest(w, r, "session is not active") return } // Touch activity. _ = h.sessionService.TouchActivity(ctx, session.ID) // Build command args. var args []string switch cmdType { case domain.CommandTypeClaude: args = append([]string{req.Prompt}, req.Args...) case domain.CommandTypeShell, domain.CommandTypeGit: args = append([]string{req.Command}, req.Args...) } // Generate stream ID. streamID := req.StreamID if streamID == "" { streamID = "session-" + uuid.New().String() } cmd := &domain.Command{ ID: domain.CommandID(streamID), ProjectID: domain.ProjectID(projectID), Type: cmdType, Args: args, StartedAt: time.Now(), } // Populate ResumeSessionID for claude commands. if cmdType == domain.CommandTypeClaude { if req.ConversationID != "" { cmd.ResumeSessionID = req.ConversationID } else if req.ContinueConversation && session.ClaudeSessionID != "" { cmd.ResumeSessionID = session.ClaudeSessionID } } streamURL := fmt.Sprintf("/projects/%s/sessions/%s/events?stream_id=%s", projectID, sid, streamID) // Compute background context before writing response so it carries logger/trace values // from the request context but is not cancelled when the HTTP handler returns. bgCtx := context.WithoutCancel(r.Context()) api.WriteCreated(w, r, SessionExecResponse{ ID: streamID, SessionID: string(session.ID), Type: req.Type, Status: "running", StreamURL: streamURL, }) go h.executeSessionCommand(bgCtx, cmd, session, streamID, req.Prompt) } // executeSessionCommand runs a command and streams output to subscribers. // For claude commands it parses JSONL output to capture session_id, persist conversation // records, and write user/assistant messages. // parentCtx must already be detached from any request lifecycle (use context.WithoutCancel). // prompt is the original user prompt text, used for conversation message persistence. func (h *SessionsHandler) executeSessionCommand(parentCtx context.Context, cmd *domain.Command, session *domain.Session, streamID, prompt string) { ctx, cancel := context.WithTimeout(parentCtx, TimeoutLongRunning) defer cancel() log := logging.FromContext(parentCtx) var capturedSessionID string var assistantResult string result, _ := h.executor.Execute(ctx, cmd, session.PodName, func(line domain.OutputLine) { if cmd.Type != domain.CommandTypeClaude { // Non-claude commands: plain output events. h.streams.Publish(streamID, port.StreamEvent{ Type: "output", Data: map[string]any{ "line": line.Line, "stream": line.Stream, }, }) return } // Claude commands: parse JSONL. if line.Stream == "stderr" { h.streams.Publish(streamID, port.StreamEvent{ Type: "error", Data: map[string]any{"line": line.Line}, }) return } var raw map[string]json.RawMessage if err := json.Unmarshal([]byte(line.Line), &raw); err == nil { // Capture session_id from the first event that carries it. if sidRaw, ok := raw["session_id"]; ok && capturedSessionID == "" { _ = json.Unmarshal(sidRaw, &capturedSessionID) } // Capture assistant result text from the "result" event. if typeRaw, ok := raw["type"]; ok { var evtType string if _ = json.Unmarshal(typeRaw, &evtType); evtType == "result" { if resRaw, ok := raw["result"]; ok { _ = json.Unmarshal(resRaw, &assistantResult) } } } h.streams.Publish(streamID, port.StreamEvent{ Type: "claude_event", Data: map[string]any{"event": json.RawMessage(line.Line)}, }) } else { // Non-JSON line from claude (e.g., startup messages): emit as output. h.streams.Publish(streamID, port.StreamEvent{ Type: "output", Data: map[string]any{ "line": line.Line, "stream": line.Stream, }, }) } }) // Persist claude_session_id and write message history for claude commands. // Use parentCtx (not ctx) so persistence is not cancelled by the command timeout. convID := session.ConversationRecordID if cmd.Type == domain.CommandTypeClaude && capturedSessionID != "" { // Ensure conversation record exists (create on first exec, reuse after). if convID == "" && h.conversationService != nil { sessionIDPrefix := string(session.ID) if len(sessionIDPrefix) > 8 { sessionIDPrefix = sessionIDPrefix[:8] } conv, err := h.conversationService.CreateConversation(parentCtx, string(session.ProjectID), fmt.Sprintf("Session %s", sessionIDPrefix), ) if err != nil { log.Error("failed to create conversation record", logging.FieldError, err, "session_id", session.ID, ) } else { convID = string(conv.ID) } } if convID != "" && h.conversationService != nil { // Write user prompt as a message. if prompt != "" { if _, err := h.conversationService.AddMessage(parentCtx, domain.ConversationID(convID), domain.MessageRoleUser, prompt); err != nil { log.Warn("failed to persist user message", logging.FieldError, err, "session_id", session.ID, ) } } // Write assistant response as a message. if assistantResult != "" { if _, err := h.conversationService.AddMessage(parentCtx, domain.ConversationID(convID), domain.MessageRoleAssistant, assistantResult); err != nil { log.Warn("failed to persist assistant message", logging.FieldError, err, "session_id", session.ID, ) } } } // Persist the claude session ID if it changed. if capturedSessionID != session.ClaudeSessionID { if err := h.sessionService.SetClaudeSessionID(parentCtx, session.ID, capturedSessionID, convID); err != nil { log.Error("failed to persist claude session ID", logging.FieldError, err, "session_id", session.ID, ) } } } // Publish completion event. h.streams.Publish(streamID, port.StreamEvent{ Type: "complete", Data: map[string]any{ "exit_code": result.ExitCode, "duration_ms": result.DurationMs, "claude_session_id": capturedSessionID, "conversation_id": convID, }, }) // Allow 5 seconds for SSE clients to receive the completion event and any Last-Event-ID reconnects. time.Sleep(5 * time.Second) h.streams.Close(streamID) } // Events streams session command output via Server-Sent Events. // GET /projects/{id}/sessions/{sid}/events func (h *SessionsHandler) Events(w http.ResponseWriter, r *http.Request) { projectID := chi.URLParam(r, "id") if err := domain.ValidateProjectID(projectID); err != nil { api.WriteBadRequest(w, r, "invalid project id") return } sid := chi.URLParam(r, "sid") if sid == "" { api.WriteBadRequest(w, r, "session id is required") return } streamID := r.URL.Query().Get("stream_id") lastEventID := r.Header.Get("Last-Event-ID") lookupCtx, lookupCancel := context.WithTimeout(r.Context(), TimeoutFastLookup) session, err := h.sessionService.Get(lookupCtx, domain.SessionID(sid)) lookupCancel() if err != nil { if errors.Is(err, domain.ErrSessionNotFound) { api.WriteNotFound(w, r, "session not found") return } api.WriteInternalError(w, r, "Failed to get session") return } if string(session.ProjectID) != projectID { api.WriteNotFound(w, r, "session not found") return } // Touch activity with its own budget so it does not fail if the lookup was slow. touchCtx, touchCancel := context.WithTimeout(r.Context(), TimeoutFastLookup) defer touchCancel() _ = h.sessionService.TouchActivity(touchCtx, session.ID) // Set SSE headers. w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("Access-Control-Allow-Origin", "*") flusher, ok := w.(http.Flusher) if !ok { api.WriteInternalError(w, r, "SSE not supported") return } // Subscribe to events with Last-Event-ID support. var events <-chan port.StreamEvent var cleanup func() if lastEventID != "" { events, cleanup = h.streams.SubscribeFromID(streamID, lastEventID) } else { events, cleanup = h.streams.Subscribe(streamID) } defer cleanup() // Send initial connected event. writeSSE(w, flusher, "connected", map[string]any{ "session_id": sid, "stream_id": streamID, "reconnecting": lastEventID != "", }) // Stream events until client disconnects or stream closes. reqCtx := r.Context() heartbeat := time.NewTicker(30 * time.Second) defer heartbeat.Stop() for { select { case <-reqCtx.Done(): return case event, ok := <-events: if !ok { return } writeSSEWithID(w, flusher, event.ID, event.Type, event.Data) if event.Type == "complete" { return } case <-heartbeat.C: writeSSE(w, flusher, "heartbeat", map[string]any{ "timestamp": time.Now().UTC().Format(time.RFC3339), }) } } }