rdev/internal/handlers/sessions_exec.go
jordan 3dbde72966
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
feat: add claude_id tracking and session improvements for interactive dev
- Add claude_id field to sessions (migration 026) for tracking Claude
  process IDs across pod restarts
- Extend session repository with UpdateClaudeID and session lookup methods
- Improve kubernetes executor with better error handling and exec streaming
- Add claudebox client/server improvements for session lifecycle
- Expand sessions handler with exec streaming endpoint
- Add comprehensive tests for sessions and kubernetes executor

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-24 00:20:32 -07:00

386 lines
12 KiB
Go

package handlers
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"net/url"
	"time"

	"github.com/go-chi/chi/v5"
	"github.com/google/uuid"
	"github.com/orchard9/rdev/internal/domain"
	"github.com/orchard9/rdev/internal/logging"
	"github.com/orchard9/rdev/internal/port"
	"github.com/orchard9/rdev/pkg/api"
)
// Payload types for the session exec endpoint
// (POST /projects/{id}/sessions/{sid}/exec).
type (
	// SessionExecRequest is the JSON body accepted when executing a command
	// inside a session.
	SessionExecRequest struct {
		// Type selects the command kind: "claude", "shell", or "git".
		Type string `json:"type"`

		// Prompt is the prompt text used by claude commands.
		Prompt string `json:"prompt,omitempty"`

		// Command is the command used by shell and git commands.
		Command string `json:"command,omitempty"`

		// Args are extra arguments appended after Prompt/Command.
		Args []string `json:"args,omitempty"`

		// StreamID optionally lets the client choose the output stream ID.
		StreamID string `json:"stream_id,omitempty"`

		// ContinueConversation asks to resume the session's stored claude_session_id.
		ContinueConversation bool `json:"continue_conversation,omitempty"`

		// ConversationID explicitly overrides the --resume ID.
		ConversationID string `json:"conversation_id,omitempty"`
	}

	// SessionExecResponse is the JSON body returned after a session exec
	// command has been accepted.
	SessionExecResponse struct {
		ID        string `json:"id"`
		SessionID string `json:"session_id"`
		Type      string `json:"type"`
		Status    string `json:"status"`
		StreamURL string `json:"stream_url"`
	}
)
// Exec executes a command in the context of an active session.
// POST /projects/{id}/sessions/{sid}/exec
//
// The body is a SessionExecRequest. Its type selects the command kind:
// "claude" requires prompt, while "shell" and "git" require command. Any args
// are appended after the prompt/command. For claude commands, conversation_id
// (when set) is used as the resume ID. Otherwise continue_conversation resumes
// the session's stored claude session ID, if it has one.
//
// The handler responds 404 when the session does not exist or belongs to
// another project, and 400 when it is not active. Otherwise it writes a created
// response whose stream_url points at the Events endpoint. The command itself
// runs in a background goroutine after the response is written.
func (h *SessionsHandler) Exec(w http.ResponseWriter, r *http.Request) {
	projectID := chi.URLParam(r, "id")
	if err := domain.ValidateProjectID(projectID); err != nil {
		api.WriteBadRequest(w, r, "invalid project id")
		return
	}
	sid := chi.URLParam(r, "sid")
	if sid == "" {
		api.WriteBadRequest(w, r, "session id is required")
		return
	}

	var req SessionExecRequest
	if err := api.DecodeJSON(r, &req); err != nil {
		api.WriteBadRequest(w, r, "invalid request body")
		return
	}

	// Validate the command type and its required field. In the same pass, build
	// the argv: the prompt (claude) or command (shell/git) followed by extra args.
	cmdType := domain.CommandType(req.Type)
	var args []string
	switch cmdType {
	case domain.CommandTypeClaude:
		if req.Prompt == "" {
			api.WriteBadRequest(w, r, "prompt is required for claude commands")
			return
		}
		args = append([]string{req.Prompt}, req.Args...)
	case domain.CommandTypeShell:
		if req.Command == "" {
			api.WriteBadRequest(w, r, "command is required for shell commands")
			return
		}
		args = append([]string{req.Command}, req.Args...)
	case domain.CommandTypeGit:
		if req.Command == "" {
			api.WriteBadRequest(w, r, "command is required for git commands")
			return
		}
		args = append([]string{req.Command}, req.Args...)
	default:
		api.WriteBadRequest(w, r, "type must be claude, shell, or git")
		return
	}

	ctx, cancel := context.WithTimeout(r.Context(), TimeoutStandard)
	defer cancel()

	// Get the session and verify it belongs to this project and is active.
	// A session owned by another project is reported as not found.
	session, err := h.sessionService.Get(ctx, domain.SessionID(sid))
	if err != nil {
		if errors.Is(err, domain.ErrSessionNotFound) {
			api.WriteNotFound(w, r, "session not found")
			return
		}
		api.WriteInternalError(w, r, "Failed to get session")
		return
	}
	if string(session.ProjectID) != projectID {
		api.WriteNotFound(w, r, "session not found")
		return
	}
	if !session.IsActive() {
		api.WriteBadRequest(w, r, "session is not active")
		return
	}

	// Best-effort activity touch; a failure here must not block the command.
	_ = h.sessionService.TouchActivity(ctx, session.ID)

	// Use the client-provided stream ID if present, otherwise generate one.
	streamID := req.StreamID
	if streamID == "" {
		streamID = "session-" + uuid.New().String()
	}

	cmd := &domain.Command{
		ID:        domain.CommandID(streamID),
		ProjectID: domain.ProjectID(projectID),
		Type:      cmdType,
		Args:      args,
		StartedAt: time.Now(),
	}

	// For claude commands, an explicit conversation ID wins over "continue the
	// stored conversation".
	if cmdType == domain.CommandTypeClaude {
		if req.ConversationID != "" {
			cmd.ResumeSessionID = req.ConversationID
		} else if req.ContinueConversation && session.ClaudeSessionID != "" {
			cmd.ResumeSessionID = session.ClaudeSessionID
		}
	}

	// stream_id may be client-supplied. Escape it so characters such as '&',
	// '#' or spaces cannot corrupt the returned URL. Events decodes it back via
	// URL.Query().
	streamURL := fmt.Sprintf("/projects/%s/sessions/%s/events?stream_id=%s",
		projectID, sid, url.QueryEscape(streamID))

	// Compute the background context before writing the response. It keeps the
	// request's logger/trace values but is not cancelled when the handler returns.
	bgCtx := context.WithoutCancel(r.Context())

	api.WriteCreated(w, r, SessionExecResponse{
		ID:        streamID,
		SessionID: string(session.ID),
		Type:      req.Type,
		Status:    "running",
		StreamURL: streamURL,
	})

	go h.executeSessionCommand(bgCtx, cmd, session, streamID, req.Prompt)
}
// executeSessionCommand runs cmd in the session's pod and publishes its output
// to the stream identified by streamID.
//
// Output handling:
//   - Non-claude commands publish every line as an "output" event.
//   - For claude commands, stderr lines become "error" events.
//   - Claude stdout lines that decode as a JSON object are forwarded as
//     "claude_event". While forwarding, the handler captures the first
//     "session_id" string it sees, plus the "result" string of a
//     type=="result" event as the assistant reply.
//   - Claude stdout lines that do not decode this way are published as
//     "output" events.
//
// When a claude session ID was captured, the prompt and assistant reply are
// written to a conversation record (created here if the session has none). The
// session's claude session ID and conversation record ID are then persisted if
// either changed.
//
// Finally a "complete" event is published. It carries an "error" field when
// Execute reported one. The stream is closed after a grace period.
//
// parentCtx must already be detached from any request lifecycle (use
// context.WithoutCancel). prompt is the original user prompt text, used for
// conversation message persistence.
func (h *SessionsHandler) executeSessionCommand(parentCtx context.Context, cmd *domain.Command, session *domain.Session, streamID, prompt string) {
	ctx, cancel := context.WithTimeout(parentCtx, TimeoutLongRunning)
	defer cancel()
	log := logging.FromContext(parentCtx)

	// Filled in by the stdout branch of the output callback below.
	var capturedSessionID, assistantResult string

	// NOTE(review): result's fields are read after this call whether or not
	// Execute reports an error. That assumes it always returns a usable result;
	// confirm against the executor implementation.
	result, execErr := h.executor.Execute(ctx, cmd, session.PodName, func(line domain.OutputLine) {
		if cmd.Type != domain.CommandTypeClaude {
			h.streams.Publish(streamID, port.StreamEvent{
				Type: "output",
				Data: map[string]any{
					"line":   line.Line,
					"stream": line.Stream,
				},
			})
			return
		}

		// Claude commands: stderr is surfaced as error events.
		if line.Stream == "stderr" {
			h.streams.Publish(streamID, port.StreamEvent{
				Type: "error",
				Data: map[string]any{"line": line.Line},
			})
			return
		}

		// Claude stdout is expected to be JSONL. Lines that do not decode as a
		// JSON object (e.g. startup messages) are forwarded as plain output.
		var raw map[string]json.RawMessage
		if err := json.Unmarshal([]byte(line.Line), &raw); err != nil {
			h.streams.Publish(streamID, port.StreamEvent{
				Type: "output",
				Data: map[string]any{
					"line":   line.Line,
					"stream": line.Stream,
				},
			})
			return
		}

		// Capture session_id from the first event that carries a string value.
		// The decode is best-effort: non-string values leave it empty.
		if sidRaw, ok := raw["session_id"]; ok && capturedSessionID == "" {
			_ = json.Unmarshal(sidRaw, &capturedSessionID)
		}

		// Capture the assistant's final text from the type=="result" event.
		var evtType string
		if typeRaw, ok := raw["type"]; ok && json.Unmarshal(typeRaw, &evtType) == nil && evtType == "result" {
			if resRaw, ok := raw["result"]; ok {
				_ = json.Unmarshal(resRaw, &assistantResult)
			}
		}

		h.streams.Publish(streamID, port.StreamEvent{
			Type: "claude_event",
			Data: map[string]any{"event": json.RawMessage(line.Line)},
		})
	})
	if execErr != nil {
		log.Error("session command execution failed",
			logging.FieldError, execErr,
			"session_id", session.ID,
			"stream_id", streamID,
		)
	}

	// Persist claude_session_id and write message history for claude commands.
	// Use parentCtx (not ctx) so persistence is not cancelled by the command timeout.
	convID := session.ConversationRecordID
	if cmd.Type == domain.CommandTypeClaude && capturedSessionID != "" {
		// Ensure a conversation record exists (created on first exec, reused after).
		if convID == "" && h.conversationService != nil {
			sessionIDPrefix := string(session.ID)
			if len(sessionIDPrefix) > 8 {
				sessionIDPrefix = sessionIDPrefix[:8]
			}
			conv, err := h.conversationService.CreateConversation(parentCtx,
				string(session.ProjectID),
				fmt.Sprintf("Session %s", sessionIDPrefix),
			)
			if err != nil {
				log.Error("failed to create conversation record",
					logging.FieldError, err,
					"session_id", session.ID,
				)
			} else {
				convID = string(conv.ID)
			}
		}

		if convID != "" && h.conversationService != nil {
			// Write the user prompt as a message.
			if prompt != "" {
				if _, err := h.conversationService.AddMessage(parentCtx, domain.ConversationID(convID), domain.MessageRoleUser, prompt); err != nil {
					log.Warn("failed to persist user message",
						logging.FieldError, err,
						"session_id", session.ID,
					)
				}
			}
			// Write the assistant response as a message.
			if assistantResult != "" {
				if _, err := h.conversationService.AddMessage(parentCtx, domain.ConversationID(convID), domain.MessageRoleAssistant, assistantResult); err != nil {
					log.Warn("failed to persist assistant message",
						logging.FieldError, err,
						"session_id", session.ID,
					)
				}
			}
		}

		// Persist when either ID changed. Checking only the claude session ID
		// would drop a conversation record created above whenever claude reports
		// the already-stored session ID. A new record would then be created on
		// every exec.
		if capturedSessionID != session.ClaudeSessionID || convID != session.ConversationRecordID {
			if err := h.sessionService.SetClaudeSessionID(parentCtx, session.ID, capturedSessionID, convID); err != nil {
				log.Error("failed to persist claude session ID",
					logging.FieldError, err,
					"session_id", session.ID,
				)
			}
		}
	}

	// Publish the completion event. The "error" field is added only on failure,
	// so existing clients see the same payload on success.
	completion := map[string]any{
		"exit_code":         result.ExitCode,
		"duration_ms":       result.DurationMs,
		"claude_session_id": capturedSessionID,
		"conversation_id":   convID,
	}
	if execErr != nil {
		completion["error"] = execErr.Error()
	}
	h.streams.Publish(streamID, port.StreamEvent{
		Type: "complete",
		Data: completion,
	})

	// Allow 5 seconds for SSE clients to receive the completion event and any
	// Last-Event-ID reconnects.
	time.Sleep(5 * time.Second)
	h.streams.Close(streamID)
}
// Events streams session command output via Server-Sent Events.
// GET /projects/{id}/sessions/{sid}/events?stream_id=...
//
// stream_id is required and names the stream returned by Exec. When the client
// sends a Last-Event-ID header, the subscription resumes after that event.
//
// The handler first writes a "connected" event. It then relays stream events
// until one of the following happens:
//   - the client disconnects;
//   - the stream channel closes;
//   - a "complete" event is delivered.
//
// A "heartbeat" event is sent every 30 seconds while waiting.
func (h *SessionsHandler) Events(w http.ResponseWriter, r *http.Request) {
	projectID := chi.URLParam(r, "id")
	if err := domain.ValidateProjectID(projectID); err != nil {
		api.WriteBadRequest(w, r, "invalid project id")
		return
	}
	sid := chi.URLParam(r, "sid")
	if sid == "" {
		api.WriteBadRequest(w, r, "session id is required")
		return
	}
	// Exec always publishes to a non-empty stream ID, so subscribing to "" could
	// only ever yield heartbeats. Reject it instead of holding the connection open.
	streamID := r.URL.Query().Get("stream_id")
	if streamID == "" {
		api.WriteBadRequest(w, r, "stream_id is required")
		return
	}
	lastEventID := r.Header.Get("Last-Event-ID")

	lookupCtx, lookupCancel := context.WithTimeout(r.Context(), TimeoutFastLookup)
	session, err := h.sessionService.Get(lookupCtx, domain.SessionID(sid))
	lookupCancel()
	if err != nil {
		if errors.Is(err, domain.ErrSessionNotFound) {
			api.WriteNotFound(w, r, "session not found")
			return
		}
		api.WriteInternalError(w, r, "Failed to get session")
		return
	}
	if string(session.ProjectID) != projectID {
		api.WriteNotFound(w, r, "session not found")
		return
	}

	// Best-effort activity touch with its own budget, so a slow lookup does not
	// eat into it. Release the timer right away instead of for the whole stream.
	touchCtx, touchCancel := context.WithTimeout(r.Context(), TimeoutFastLookup)
	_ = h.sessionService.TouchActivity(touchCtx, session.ID)
	touchCancel()

	// Check streaming support before committing to SSE headers. A non-streaming
	// writer then gets an ordinary error response.
	flusher, ok := w.(http.Flusher)
	if !ok {
		api.WriteInternalError(w, r, "SSE not supported")
		return
	}

	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Access-Control-Allow-Origin", "*")

	// NOTE(review): streamID is not checked against sid. Any stream ID can be
	// subscribed through any session the caller can reach. Confirm whether
	// stream ownership should be enforced.
	var events <-chan port.StreamEvent
	var cleanup func()
	if lastEventID != "" {
		events, cleanup = h.streams.SubscribeFromID(streamID, lastEventID)
	} else {
		events, cleanup = h.streams.Subscribe(streamID)
	}
	defer cleanup()

	writeSSE(w, flusher, "connected", map[string]any{
		"session_id":   sid,
		"stream_id":    streamID,
		"reconnecting": lastEventID != "",
	})

	// Relay events until the client disconnects, the stream closes, or the
	// command completes.
	reqCtx := r.Context()
	heartbeat := time.NewTicker(30 * time.Second)
	defer heartbeat.Stop()

	for {
		select {
		case <-reqCtx.Done():
			return
		case event, ok := <-events:
			if !ok {
				return
			}
			writeSSEWithID(w, flusher, event.ID, event.Type, event.Data)
			if event.Type == "complete" {
				return
			}
		case <-heartbeat.C:
			writeSSE(w, flusher, "heartbeat", map[string]any{
				"timestamp": time.Now().UTC().Format(time.RFC3339),
			})
		}
	}
}