All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
- Add claude_id field to sessions (migration 026) for tracking Claude process IDs across pod restarts - Extend session repository with UpdateClaudeID and session lookup methods - Improve kubernetes executor with better error handling and exec streaming - Add claudebox client/server improvements for session lifecycle - Expand sessions handler with exec streaming endpoint - Add comprehensive tests for sessions and kubernetes executor Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
386 lines
12 KiB
Go
386 lines
12 KiB
Go
package handlers
|
|
|
|
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"net/url"
	"time"

	"github.com/go-chi/chi/v5"
	"github.com/google/uuid"
	"github.com/orchard9/rdev/internal/domain"
	"github.com/orchard9/rdev/internal/logging"
	"github.com/orchard9/rdev/internal/port"
	"github.com/orchard9/rdev/pkg/api"
)
|
|
|
|
// SessionExecRequest is the JSON payload accepted by the session exec
// endpoint. Which of Prompt or Command is required depends on Type.
type SessionExecRequest struct {
	// Type selects the kind of command: "claude", "shell", or "git".
	Type string `json:"type"`

	// Prompt is the prompt text; required for claude commands.
	Prompt string `json:"prompt,omitempty"`

	// Command is the command to run; required for shell and git commands.
	Command string `json:"command,omitempty"`

	// Args holds additional arguments appended after the prompt or command.
	Args []string `json:"args,omitempty"`

	// StreamID is an optional client-provided stream ID; one is generated
	// when it is left empty.
	StreamID string `json:"stream_id,omitempty"`

	// ContinueConversation asks a claude command to resume the
	// claude_session_id stored on the session, if there is one.
	ContinueConversation bool `json:"continue_conversation,omitempty"`

	// ConversationID is an explicit --resume ID; when set it takes
	// precedence over ContinueConversation.
	ConversationID string `json:"conversation_id,omitempty"`
}
|
|
|
|
// SessionExecResponse is the JSON body returned once a session exec
// command has been accepted.
type SessionExecResponse struct {
	// ID identifies the command; it doubles as the stream ID.
	ID string `json:"id"`

	// SessionID is the session the command runs in.
	SessionID string `json:"session_id"`

	// Type echoes the requested command type.
	Type string `json:"type"`

	// Status is the command status at response time.
	Status string `json:"status"`

	// StreamURL is the events endpoint to follow for the command's output.
	StreamURL string `json:"stream_url"`
}
|
|
|
|
// Exec executes a command in the context of an active session.
|
|
// POST /projects/{id}/sessions/{sid}/exec
|
|
func (h *SessionsHandler) Exec(w http.ResponseWriter, r *http.Request) {
|
|
projectID := chi.URLParam(r, "id")
|
|
if err := domain.ValidateProjectID(projectID); err != nil {
|
|
api.WriteBadRequest(w, r, "invalid project id")
|
|
return
|
|
}
|
|
|
|
sid := chi.URLParam(r, "sid")
|
|
if sid == "" {
|
|
api.WriteBadRequest(w, r, "session id is required")
|
|
return
|
|
}
|
|
|
|
var req SessionExecRequest
|
|
if err := api.DecodeJSON(r, &req); err != nil {
|
|
api.WriteBadRequest(w, r, "invalid request body")
|
|
return
|
|
}
|
|
|
|
// Validate command type and type-specific fields.
|
|
cmdType := domain.CommandType(req.Type)
|
|
switch cmdType {
|
|
case domain.CommandTypeClaude:
|
|
if req.Prompt == "" {
|
|
api.WriteBadRequest(w, r, "prompt is required for claude commands")
|
|
return
|
|
}
|
|
case domain.CommandTypeShell:
|
|
if req.Command == "" {
|
|
api.WriteBadRequest(w, r, "command is required for shell commands")
|
|
return
|
|
}
|
|
case domain.CommandTypeGit:
|
|
if req.Command == "" {
|
|
api.WriteBadRequest(w, r, "command is required for git commands")
|
|
return
|
|
}
|
|
default:
|
|
api.WriteBadRequest(w, r, "type must be claude, shell, or git")
|
|
return
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(r.Context(), TimeoutStandard)
|
|
defer cancel()
|
|
|
|
// Get session and verify it belongs to this project and is active.
|
|
session, err := h.sessionService.Get(ctx, domain.SessionID(sid))
|
|
if err != nil {
|
|
if errors.Is(err, domain.ErrSessionNotFound) {
|
|
api.WriteNotFound(w, r, "session not found")
|
|
return
|
|
}
|
|
api.WriteInternalError(w, r, "Failed to get session")
|
|
return
|
|
}
|
|
if string(session.ProjectID) != projectID {
|
|
api.WriteNotFound(w, r, "session not found")
|
|
return
|
|
}
|
|
if !session.IsActive() {
|
|
api.WriteBadRequest(w, r, "session is not active")
|
|
return
|
|
}
|
|
|
|
// Touch activity.
|
|
_ = h.sessionService.TouchActivity(ctx, session.ID)
|
|
|
|
// Build command args.
|
|
var args []string
|
|
switch cmdType {
|
|
case domain.CommandTypeClaude:
|
|
args = append([]string{req.Prompt}, req.Args...)
|
|
case domain.CommandTypeShell, domain.CommandTypeGit:
|
|
args = append([]string{req.Command}, req.Args...)
|
|
}
|
|
|
|
// Generate stream ID.
|
|
streamID := req.StreamID
|
|
if streamID == "" {
|
|
streamID = "session-" + uuid.New().String()
|
|
}
|
|
|
|
cmd := &domain.Command{
|
|
ID: domain.CommandID(streamID),
|
|
ProjectID: domain.ProjectID(projectID),
|
|
Type: cmdType,
|
|
Args: args,
|
|
StartedAt: time.Now(),
|
|
}
|
|
|
|
// Populate ResumeSessionID for claude commands.
|
|
if cmdType == domain.CommandTypeClaude {
|
|
if req.ConversationID != "" {
|
|
cmd.ResumeSessionID = req.ConversationID
|
|
} else if req.ContinueConversation && session.ClaudeSessionID != "" {
|
|
cmd.ResumeSessionID = session.ClaudeSessionID
|
|
}
|
|
}
|
|
|
|
streamURL := fmt.Sprintf("/projects/%s/sessions/%s/events?stream_id=%s", projectID, sid, streamID)
|
|
|
|
// Compute background context before writing response so it carries logger/trace values
|
|
// from the request context but is not cancelled when the HTTP handler returns.
|
|
bgCtx := context.WithoutCancel(r.Context())
|
|
api.WriteCreated(w, r, SessionExecResponse{
|
|
ID: streamID,
|
|
SessionID: string(session.ID),
|
|
Type: req.Type,
|
|
Status: "running",
|
|
StreamURL: streamURL,
|
|
})
|
|
go h.executeSessionCommand(bgCtx, cmd, session, streamID, req.Prompt)
|
|
}
|
|
|
|
// executeSessionCommand runs a command and streams output to subscribers.
|
|
// For claude commands it parses JSONL output to capture session_id, persist conversation
|
|
// records, and write user/assistant messages.
|
|
// parentCtx must already be detached from any request lifecycle (use context.WithoutCancel).
|
|
// prompt is the original user prompt text, used for conversation message persistence.
|
|
func (h *SessionsHandler) executeSessionCommand(parentCtx context.Context, cmd *domain.Command, session *domain.Session, streamID, prompt string) {
|
|
ctx, cancel := context.WithTimeout(parentCtx, TimeoutLongRunning)
|
|
defer cancel()
|
|
|
|
log := logging.FromContext(parentCtx)
|
|
|
|
var capturedSessionID string
|
|
var assistantResult string
|
|
|
|
result, _ := h.executor.Execute(ctx, cmd, session.PodName, func(line domain.OutputLine) {
|
|
if cmd.Type != domain.CommandTypeClaude {
|
|
// Non-claude commands: plain output events.
|
|
h.streams.Publish(streamID, port.StreamEvent{
|
|
Type: "output",
|
|
Data: map[string]any{
|
|
"line": line.Line,
|
|
"stream": line.Stream,
|
|
},
|
|
})
|
|
return
|
|
}
|
|
|
|
// Claude commands: parse JSONL.
|
|
if line.Stream == "stderr" {
|
|
h.streams.Publish(streamID, port.StreamEvent{
|
|
Type: "error",
|
|
Data: map[string]any{"line": line.Line},
|
|
})
|
|
return
|
|
}
|
|
|
|
var raw map[string]json.RawMessage
|
|
if err := json.Unmarshal([]byte(line.Line), &raw); err == nil {
|
|
// Capture session_id from the first event that carries it.
|
|
if sidRaw, ok := raw["session_id"]; ok && capturedSessionID == "" {
|
|
_ = json.Unmarshal(sidRaw, &capturedSessionID)
|
|
}
|
|
// Capture assistant result text from the "result" event.
|
|
if typeRaw, ok := raw["type"]; ok {
|
|
var evtType string
|
|
if _ = json.Unmarshal(typeRaw, &evtType); evtType == "result" {
|
|
if resRaw, ok := raw["result"]; ok {
|
|
_ = json.Unmarshal(resRaw, &assistantResult)
|
|
}
|
|
}
|
|
}
|
|
h.streams.Publish(streamID, port.StreamEvent{
|
|
Type: "claude_event",
|
|
Data: map[string]any{"event": json.RawMessage(line.Line)},
|
|
})
|
|
} else {
|
|
// Non-JSON line from claude (e.g., startup messages): emit as output.
|
|
h.streams.Publish(streamID, port.StreamEvent{
|
|
Type: "output",
|
|
Data: map[string]any{
|
|
"line": line.Line,
|
|
"stream": line.Stream,
|
|
},
|
|
})
|
|
}
|
|
})
|
|
|
|
// Persist claude_session_id and write message history for claude commands.
|
|
// Use parentCtx (not ctx) so persistence is not cancelled by the command timeout.
|
|
convID := session.ConversationRecordID
|
|
if cmd.Type == domain.CommandTypeClaude && capturedSessionID != "" {
|
|
// Ensure conversation record exists (create on first exec, reuse after).
|
|
if convID == "" && h.conversationService != nil {
|
|
sessionIDPrefix := string(session.ID)
|
|
if len(sessionIDPrefix) > 8 {
|
|
sessionIDPrefix = sessionIDPrefix[:8]
|
|
}
|
|
conv, err := h.conversationService.CreateConversation(parentCtx,
|
|
string(session.ProjectID),
|
|
fmt.Sprintf("Session %s", sessionIDPrefix),
|
|
)
|
|
if err != nil {
|
|
log.Error("failed to create conversation record",
|
|
logging.FieldError, err,
|
|
"session_id", session.ID,
|
|
)
|
|
} else {
|
|
convID = string(conv.ID)
|
|
}
|
|
}
|
|
|
|
if convID != "" && h.conversationService != nil {
|
|
// Write user prompt as a message.
|
|
if prompt != "" {
|
|
if _, err := h.conversationService.AddMessage(parentCtx, domain.ConversationID(convID), domain.MessageRoleUser, prompt); err != nil {
|
|
log.Warn("failed to persist user message",
|
|
logging.FieldError, err,
|
|
"session_id", session.ID,
|
|
)
|
|
}
|
|
}
|
|
// Write assistant response as a message.
|
|
if assistantResult != "" {
|
|
if _, err := h.conversationService.AddMessage(parentCtx, domain.ConversationID(convID), domain.MessageRoleAssistant, assistantResult); err != nil {
|
|
log.Warn("failed to persist assistant message",
|
|
logging.FieldError, err,
|
|
"session_id", session.ID,
|
|
)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Persist the claude session ID if it changed.
|
|
if capturedSessionID != session.ClaudeSessionID {
|
|
if err := h.sessionService.SetClaudeSessionID(parentCtx, session.ID, capturedSessionID, convID); err != nil {
|
|
log.Error("failed to persist claude session ID",
|
|
logging.FieldError, err,
|
|
"session_id", session.ID,
|
|
)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Publish completion event.
|
|
h.streams.Publish(streamID, port.StreamEvent{
|
|
Type: "complete",
|
|
Data: map[string]any{
|
|
"exit_code": result.ExitCode,
|
|
"duration_ms": result.DurationMs,
|
|
"claude_session_id": capturedSessionID,
|
|
"conversation_id": convID,
|
|
},
|
|
})
|
|
|
|
// Allow 5 seconds for SSE clients to receive the completion event and any Last-Event-ID reconnects.
|
|
time.Sleep(5 * time.Second)
|
|
h.streams.Close(streamID)
|
|
}
|
|
|
|
// Events streams session command output via Server-Sent Events.
|
|
// GET /projects/{id}/sessions/{sid}/events
|
|
func (h *SessionsHandler) Events(w http.ResponseWriter, r *http.Request) {
|
|
projectID := chi.URLParam(r, "id")
|
|
if err := domain.ValidateProjectID(projectID); err != nil {
|
|
api.WriteBadRequest(w, r, "invalid project id")
|
|
return
|
|
}
|
|
|
|
sid := chi.URLParam(r, "sid")
|
|
if sid == "" {
|
|
api.WriteBadRequest(w, r, "session id is required")
|
|
return
|
|
}
|
|
|
|
streamID := r.URL.Query().Get("stream_id")
|
|
lastEventID := r.Header.Get("Last-Event-ID")
|
|
|
|
lookupCtx, lookupCancel := context.WithTimeout(r.Context(), TimeoutFastLookup)
|
|
session, err := h.sessionService.Get(lookupCtx, domain.SessionID(sid))
|
|
lookupCancel()
|
|
if err != nil {
|
|
if errors.Is(err, domain.ErrSessionNotFound) {
|
|
api.WriteNotFound(w, r, "session not found")
|
|
return
|
|
}
|
|
api.WriteInternalError(w, r, "Failed to get session")
|
|
return
|
|
}
|
|
if string(session.ProjectID) != projectID {
|
|
api.WriteNotFound(w, r, "session not found")
|
|
return
|
|
}
|
|
|
|
// Touch activity with its own budget so it does not fail if the lookup was slow.
|
|
touchCtx, touchCancel := context.WithTimeout(r.Context(), TimeoutFastLookup)
|
|
defer touchCancel()
|
|
_ = h.sessionService.TouchActivity(touchCtx, session.ID)
|
|
|
|
// Set SSE headers.
|
|
w.Header().Set("Content-Type", "text/event-stream")
|
|
w.Header().Set("Cache-Control", "no-cache")
|
|
w.Header().Set("Connection", "keep-alive")
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
|
|
flusher, ok := w.(http.Flusher)
|
|
if !ok {
|
|
api.WriteInternalError(w, r, "SSE not supported")
|
|
return
|
|
}
|
|
|
|
// Subscribe to events with Last-Event-ID support.
|
|
var events <-chan port.StreamEvent
|
|
var cleanup func()
|
|
if lastEventID != "" {
|
|
events, cleanup = h.streams.SubscribeFromID(streamID, lastEventID)
|
|
} else {
|
|
events, cleanup = h.streams.Subscribe(streamID)
|
|
}
|
|
defer cleanup()
|
|
|
|
// Send initial connected event.
|
|
writeSSE(w, flusher, "connected", map[string]any{
|
|
"session_id": sid,
|
|
"stream_id": streamID,
|
|
"reconnecting": lastEventID != "",
|
|
})
|
|
|
|
// Stream events until client disconnects or stream closes.
|
|
reqCtx := r.Context()
|
|
heartbeat := time.NewTicker(30 * time.Second)
|
|
defer heartbeat.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-reqCtx.Done():
|
|
return
|
|
case event, ok := <-events:
|
|
if !ok {
|
|
return
|
|
}
|
|
writeSSEWithID(w, flusher, event.ID, event.Type, event.Data)
|
|
if event.Type == "complete" {
|
|
return
|
|
}
|
|
case <-heartbeat.C:
|
|
writeSSE(w, flusher, "heartbeat", map[string]any{
|
|
"timestamp": time.Now().UTC().Format(time.RFC3339),
|
|
})
|
|
}
|
|
}
|
|
}
|