// Package handlers provides HTTP handlers for the rdev API. package handlers import ( "context" "encoding/json" "fmt" "net/http" "sync" "sync/atomic" "time" "github.com/go-chi/chi/v5" "github.com/orchard9/rdev/internal/executor" "github.com/orchard9/rdev/internal/projects" "github.com/orchard9/rdev/pkg/api" ) // ProjectsHandler handles project-related endpoints. type ProjectsHandler struct { registry *projects.Registry executor *executor.Executor streams *streamManager cmdID atomic.Uint64 } // NewProjectsHandler creates a new projects handler. func NewProjectsHandler() *ProjectsHandler { return &ProjectsHandler{ registry: projects.NewRegistry("rdev"), executor: executor.New("rdev"), streams: newStreamManager(), } } // Mount registers the projects routes. func (h *ProjectsHandler) Mount(r api.Router) { r.Route("/projects", func(r chi.Router) { r.Get("/", h.List) r.Get("/{id}", h.Get) r.Post("/{id}/claude", h.RunClaude) r.Post("/{id}/shell", h.RunShell) r.Post("/{id}/git", h.RunGit) r.Get("/{id}/events", h.Events) }) } // List returns all available projects. // GET /projects func (h *ProjectsHandler) List(w http.ResponseWriter, r *http.Request) { // Refresh status from K8s ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) defer cancel() h.registry.RefreshStatus(ctx) projects := h.registry.List() api.WriteSuccess(w, r, projects) } // Get returns a specific project by ID. // GET /projects/{id} func (h *ProjectsHandler) Get(w http.ResponseWriter, r *http.Request) { id := chi.URLParam(r, "id") project, ok := h.registry.Get(id) if !ok { api.WriteNotFound(w, r, fmt.Sprintf("project not found: %s", id)) return } // Refresh this project's status ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) defer cancel() h.registry.RefreshStatus(ctx) api.WriteSuccess(w, r, project) } // ClaudeRequest is the request body for POST /projects/{id}/claude. type ClaudeRequest struct { Prompt string `json:"prompt"` StreamID string `json:"stream_id,omitempty"` } // RunClaude executes a Claude command in the project's claudebox. // POST /projects/{id}/claude func (h *ProjectsHandler) RunClaude(w http.ResponseWriter, r *http.Request) { id := chi.URLParam(r, "id") project, ok := h.registry.Get(id) if !ok { api.WriteNotFound(w, r, fmt.Sprintf("project not found: %s", id)) return } var req ClaudeRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { api.WriteBadRequest(w, r, "invalid request body") return } if req.Prompt == "" { api.WriteBadRequest(w, r, "prompt is required") return } // Generate command ID cmdNum := h.cmdID.Add(1) cmdID := fmt.Sprintf("cmd-%s-%03d", id, cmdNum) if req.StreamID != "" { cmdID = req.StreamID } // Create the command cmd := &executor.Command{ ID: cmdID, PodName: project.PodName, Type: executor.CommandTypeClaude, Args: []string{req.Prompt}, StartedAt: time.Now(), } // Execute in background go h.executeCommand(cmd) result := map[string]any{ "id": cmdID, "project": id, "type": "claude", "status": "running", "stream_url": fmt.Sprintf("/projects/%s/events?stream_id=%s", id, cmdID), } api.WriteCreated(w, r, result) } // ShellRequest is the request body for POST /projects/{id}/shell. type ShellRequest struct { Command string `json:"command"` StreamID string `json:"stream_id,omitempty"` } // RunShell executes a shell command in the project's claudebox. // POST /projects/{id}/shell func (h *ProjectsHandler) RunShell(w http.ResponseWriter, r *http.Request) { id := chi.URLParam(r, "id") project, ok := h.registry.Get(id) if !ok { api.WriteNotFound(w, r, fmt.Sprintf("project not found: %s", id)) return } var req ShellRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { api.WriteBadRequest(w, r, "invalid request body") return } if req.Command == "" { api.WriteBadRequest(w, r, "command is required") return } // Generate command ID cmdNum := h.cmdID.Add(1) cmdID := fmt.Sprintf("cmd-%s-%03d", id, cmdNum) if req.StreamID != "" { cmdID = req.StreamID } // Create the command cmd := &executor.Command{ ID: cmdID, PodName: project.PodName, Type: executor.CommandTypeShell, Args: []string{req.Command}, StartedAt: time.Now(), } // Execute in background go h.executeCommand(cmd) result := map[string]any{ "id": cmdID, "project": id, "type": "shell", "status": "running", "stream_url": fmt.Sprintf("/projects/%s/events?stream_id=%s", id, cmdID), } api.WriteCreated(w, r, result) } // GitRequest is the request body for POST /projects/{id}/git. type GitRequest struct { Args []string `json:"args"` StreamID string `json:"stream_id,omitempty"` } // RunGit executes a git command in the project's claudebox. // POST /projects/{id}/git func (h *ProjectsHandler) RunGit(w http.ResponseWriter, r *http.Request) { id := chi.URLParam(r, "id") project, ok := h.registry.Get(id) if !ok { api.WriteNotFound(w, r, fmt.Sprintf("project not found: %s", id)) return } var req GitRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { api.WriteBadRequest(w, r, "invalid request body") return } if len(req.Args) == 0 { api.WriteBadRequest(w, r, "args is required") return } // Generate command ID cmdNum := h.cmdID.Add(1) cmdID := fmt.Sprintf("cmd-%s-%03d", id, cmdNum) if req.StreamID != "" { cmdID = req.StreamID } // Create the command cmd := &executor.Command{ ID: cmdID, PodName: project.PodName, Type: executor.CommandTypeGit, Args: req.Args, StartedAt: time.Now(), } // Execute in background go h.executeCommand(cmd) result := map[string]any{ "id": cmdID, "project": id, "type": "git", "status": "running", "stream_url": fmt.Sprintf("/projects/%s/events?stream_id=%s", id, cmdID), } api.WriteCreated(w, r, result) } // executeCommand runs a command and streams output to subscribers. func (h *ProjectsHandler) executeCommand(cmd *executor.Command) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) defer cancel() result := h.executor.Exec(ctx, cmd, func(stream, line string) { h.streams.Send(cmd.ID, "output", map[string]any{ "line": line, "stream": stream, }) }) // Send completion event h.streams.Send(cmd.ID, "complete", map[string]any{ "exit_code": result.ExitCode, "duration_ms": result.DurationMs, }) // Clean up stream after a delay go func() { time.Sleep(30 * time.Second) h.streams.Close(cmd.ID) }() } // Events streams command output via Server-Sent Events. // GET /projects/{id}/events func (h *ProjectsHandler) Events(w http.ResponseWriter, r *http.Request) { id := chi.URLParam(r, "id") streamID := r.URL.Query().Get("stream_id") if !h.registry.Exists(id) { api.WriteNotFound(w, r, fmt.Sprintf("project not found: %s", id)) return } // Set SSE headers w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("Access-Control-Allow-Origin", "*") flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "SSE not supported", http.StatusInternalServerError) return } // Subscribe to events events := h.streams.Subscribe(streamID) defer h.streams.Unsubscribe(streamID, events) // Send initial connected event writeSSE(w, flusher, "connected", map[string]any{ "project": id, "stream_id": streamID, }) // Stream events until client disconnects or stream closes ctx := r.Context() heartbeat := time.NewTicker(30 * time.Second) defer heartbeat.Stop() for { select { case <-ctx.Done(): return case event, ok := <-events: if !ok { return } writeSSE(w, flusher, event.Type, event.Data) if event.Type == "complete" { return } case <-heartbeat.C: writeSSE(w, flusher, "heartbeat", map[string]any{ "timestamp": time.Now().UTC().Format(time.RFC3339), }) } } } // writeSSE writes a Server-Sent Event. func writeSSE(w http.ResponseWriter, flusher http.Flusher, event string, data map[string]any) { dataBytes, _ := json.Marshal(data) fmt.Fprintf(w, "event: %s\n", event) fmt.Fprintf(w, "data: %s\n\n", dataBytes) flusher.Flush() } // streamManager manages SSE event streams. type streamManager struct { mu sync.RWMutex streams map[string][]chan streamEvent } type streamEvent struct { Type string Data map[string]any } func newStreamManager() *streamManager { return &streamManager{ streams: make(map[string][]chan streamEvent), } } func (sm *streamManager) Subscribe(streamID string) chan streamEvent { sm.mu.Lock() defer sm.mu.Unlock() ch := make(chan streamEvent, 100) sm.streams[streamID] = append(sm.streams[streamID], ch) return ch } func (sm *streamManager) Unsubscribe(streamID string, ch chan streamEvent) { sm.mu.Lock() defer sm.mu.Unlock() channels := sm.streams[streamID] for i, c := range channels { if c == ch { sm.streams[streamID] = append(channels[:i], channels[i+1:]...) close(ch) break } } } func (sm *streamManager) Send(streamID, eventType string, data map[string]any) { sm.mu.RLock() defer sm.mu.RUnlock() for _, ch := range sm.streams[streamID] { select { case ch <- streamEvent{Type: eventType, Data: data}: default: // Channel full, skip } } } func (sm *streamManager) Close(streamID string) { sm.mu.Lock() defer sm.mu.Unlock() for _, ch := range sm.streams[streamID] { close(ch) } delete(sm.streams, streamID) }