rdev/internal/handlers/queue.go
jordan bc47e426b0 feat: Add CI pipeline proxy, DNS alias management, and worker executor system
- Add ListPipelines/GetPipeline to CIProvider port with Woodpecker adapter
- Add DNS alias endpoints: GET/POST/DELETE /projects/{id}/domains
- Implement worker executor daemon, build executor, and git operations
- Add build service, worker service, and build audit tracking
- Add worker registry with PostgreSQL adapter and migration
- Add multi-provider code agent interface (Claude Code + OpenCode)
- Add create-and-build combo endpoint
- Update landing-page cookbook to reflect all gaps closed
- Fix tech debt: unified validation, auth scopes, error wrapping, slog patterns

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-27 21:05:28 -07:00

358 lines
9.6 KiB
Go

// Package handlers provides HTTP handlers for the rdev API.
package handlers
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strconv"
"github.com/go-chi/chi/v5"
"github.com/orchard9/rdev/internal/auth"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/port"
"github.com/orchard9/rdev/internal/sanitize"
"github.com/orchard9/rdev/pkg/api"
)
// QueueHandler serves the HTTP endpoints under /projects/{id}/queue:
// enqueueing, listing, inspecting, cancelling, and reporting statistics for
// a project's queued commands.
type QueueHandler struct {
// queue is the command queue the handlers delegate to
// (Enqueue, List, GetByID, Cancel, GetStats).
queue port.CommandQueue
// projects is consulted at the start of every handler to confirm that the
// project named in the URL exists.
projects port.ProjectRepository
}
// NewQueueHandler builds a QueueHandler backed by the given command queue and
// project repository. Both dependencies are stored as provided.
func NewQueueHandler(queue port.CommandQueue, projects port.ProjectRepository) *QueueHandler {
	handler := new(QueueHandler)
	handler.queue = queue
	handler.projects = projects
	return handler
}
// Mount attaches the queue endpoints to r under /projects/{id}/queue.
//
//	POST   /          -> Enqueue
//	GET    /          -> List
//	GET    /stats     -> Stats
//	GET    /{cmdId}   -> GetByID
//	DELETE /{cmdId}   -> Cancel
func (h *QueueHandler) Mount(r api.Router) {
	// Registration order is kept as-is so that /stats is declared before the
	// /{cmdId} wildcard.
	register := func(sub chi.Router) {
		sub.Post("/", h.Enqueue)
		sub.Get("/", h.List)
		sub.Get("/stats", h.Stats)
		sub.Get("/{cmdId}", h.GetByID)
		sub.Delete("/{cmdId}", h.Cancel)
	}
	r.Route("/projects/{id}/queue", register)
}
// EnqueueRequest is the request body for POST /projects/{id}/queue.
//
// Enqueue rejects the request as a bad request when CommandType is not one of
// the accepted values, when Command is empty or longer than MaxCommandSize
// bytes, or when Command fails the sanitizer for its command type.
type EnqueueRequest struct {
// For command_type "git", Command must itself be a JSON-encoded array of
// strings (the git arguments).
Command string `json:"command"` // Required: the command to execute
CommandType string `json:"command_type"` // Required: claude, shell, or git
// WorkingDir is copied onto the queued command without validation in the
// Enqueue handler.
WorkingDir string `json:"working_dir,omitempty"` // Optional: working directory
Priority int `json:"priority,omitempty"` // Optional: higher = more urgent (default: 0)
}
// MaxCommandSize is the maximum allowed length, in bytes, of the Command
// field of an enqueue request (10 KiB = 10240 bytes). Enqueue compares it
// against len(req.Command) after the body has been decoded, so it bounds the
// decoded command string rather than the raw HTTP request body.
const MaxCommandSize = 10 * 1024
// EnqueueResponse is the response for POST /projects/{id}/queue.
type EnqueueResponse struct {
ID string `json:"id"` // ID of the newly queued command
ProjectID string `json:"project_id"` // Project ID taken from the request URL
Status string `json:"status"` // Status of the queued command, as a string
// StreamURL has the form /projects/{id}/events?stream_id={command ID}.
StreamURL string `json:"stream_url"`
// Position is the number of pending commands listed for the project right
// after enqueueing (the listing is requested with Limit 1000). It is
// omitted from the JSON when zero.
Position int `json:"position,omitempty"` // Approximate queue position
}
// Enqueue adds a command to the project's queue.
// POST /projects/{id}/queue
//
// The request body is an EnqueueRequest. The handler responds with:
//   - not found when the project does not exist;
//   - bad request when the body is not valid JSON or exceeds the body size
//     limit, when command_type is unknown, when command is empty or longer
//     than MaxCommandSize bytes, or when command fails the sanitizer for its
//     command type;
//   - internal error when the project lookup or the enqueue itself fails;
//   - created, with an EnqueueResponse, on success.
func (h *QueueHandler) Enqueue(w http.ResponseWriter, r *http.Request) {
	// maxBodySize bounds the raw request body. MaxCommandSize only limits the
	// decoded command string, which is checked after the whole body has been
	// read; without this cap a client could stream an arbitrarily large
	// payload into the JSON decoder. 1 MiB leaves ample room for a
	// maximally-escaped 10 KiB command plus the other fields.
	const maxBodySize = 1 << 20

	projectID := chi.URLParam(r, "id")

	// Check project exists
	exists, err := h.projects.Exists(r.Context(), domain.ProjectID(projectID))
	if err != nil {
		api.WriteInternalError(w, r, "failed to check project")
		return
	}
	if !exists {
		api.WriteNotFound(w, r, fmt.Sprintf("project not found: %s", projectID))
		return
	}

	// Parse request. An over-limit body makes Decode fail, which is reported
	// as an invalid request body like any other decode error.
	r.Body = http.MaxBytesReader(w, r.Body, maxBodySize)
	var req EnqueueRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		api.WriteBadRequest(w, r, "invalid request body")
		return
	}

	// Validate command type
	var cmdType domain.CommandType
	switch req.CommandType {
	case "claude":
		cmdType = domain.CommandTypeClaude
	case "shell":
		cmdType = domain.CommandTypeShell
	case "git":
		cmdType = domain.CommandTypeGit
	default:
		api.WriteBadRequest(w, r, "command_type must be one of: claude, shell, git")
		return
	}

	// Validate command
	if req.Command == "" {
		api.WriteBadRequest(w, r, "command is required")
		return
	}

	// Validate command size (len counts bytes, not runes)
	if len(req.Command) > MaxCommandSize {
		api.WriteBadRequest(w, r, fmt.Sprintf("command exceeds maximum size of %d bytes", MaxCommandSize))
		return
	}

	// Sanitize based on command type
	switch cmdType {
	case domain.CommandTypeClaude:
		if err := sanitize.ClaudePrompt(req.Command); err != nil {
			api.WriteBadRequest(w, r, err.Error())
			return
		}
	case domain.CommandTypeShell:
		if err := sanitize.ShellCommand(req.Command); err != nil {
			api.WriteBadRequest(w, r, err.Error())
			return
		}
	case domain.CommandTypeGit:
		// For git, the command should be JSON-encoded args
		var gitArgs []string
		if err := json.Unmarshal([]byte(req.Command), &gitArgs); err != nil {
			api.WriteBadRequest(w, r, "git command must be JSON array of args")
			return
		}
		if err := sanitize.GitArgs(gitArgs); err != nil {
			api.WriteBadRequest(w, r, err.Error())
			return
		}
	}

	// Get API key ID for audit trail; left empty when the request context
	// carries no API key.
	var apiKeyID string
	if apiKey := auth.GetAPIKey(r.Context()); apiKey != nil {
		apiKeyID = string(apiKey.ID)
	}

	// Create queued command
	cmd := &domain.QueuedCommand{
		ProjectID:   projectID,
		Command:     req.Command,
		CommandType: cmdType,
		WorkingDir:  req.WorkingDir,
		Status:      domain.QueueStatusPending,
		Priority:    req.Priority,
		APIKeyID:    apiKeyID,
	}

	// Enqueue
	if err := h.queue.Enqueue(r.Context(), cmd); err != nil {
		api.WriteInternalError(w, r, "failed to enqueue command")
		return
	}

	// Get approximate queue position. This is deliberately best-effort: the
	// command has already been enqueued, so a failed listing must not fail
	// the request. On error the position stays zero and is omitted from the
	// response.
	var position int
	pendingStatus := domain.QueueStatusPending
	if pending, err := h.queue.List(r.Context(), projectID, &domain.QueueFilters{
		Status:    &pendingStatus,
		Limit:     1000,
		SortOrder: "asc",
	}); err == nil {
		position = len(pending)
	}

	api.WriteCreated(w, r, EnqueueResponse{
		ID:        string(cmd.ID),
		ProjectID: projectID,
		Status:    string(cmd.Status),
		StreamURL: fmt.Sprintf("/projects/%s/events?stream_id=%s", projectID, cmd.ID),
		Position:  position,
	})
}
// ListResponse is the response for GET /projects/{id}/queue.
type ListResponse struct {
Commands []*domain.QueuedCommand `json:"commands"` // Commands returned for this request; List substitutes an empty slice for nil
// Total is set to len(Commands), i.e. the number of commands in this
// response, not the number of commands matching the filters overall.
Total int `json:"total"`
Limit int `json:"limit"` // Limit used for the query (filters.Limit)
Offset int `json:"offset"` // Offset used for the query (filters.Offset)
}
// List returns queued commands for a project.
// GET /projects/{id}/queue
//
// Optional query parameters refine the defaults from
// domain.DefaultQueueFilters: status, limit (1..1000), offset (>= 0) and sort
// ("asc" or "desc"). A limit, offset or sort value that does not parse or is
// out of range is ignored and the default is kept.
func (h *QueueHandler) List(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	projectID := chi.URLParam(r, "id")

	// The project must exist before the queue is consulted.
	found, err := h.projects.Exists(ctx, domain.ProjectID(projectID))
	switch {
	case err != nil:
		api.WriteInternalError(w, r, "failed to check project")
		return
	case !found:
		api.WriteNotFound(w, r, fmt.Sprintf("project not found: %s", projectID))
		return
	}

	// Overlay the query parameters on top of the default filters.
	query := r.URL.Query()
	filters := domain.DefaultQueueFilters()

	if raw := query.Get("status"); raw != "" {
		wanted := domain.QueueStatus(raw)
		filters.Status = &wanted
	}
	if raw := query.Get("limit"); raw != "" {
		n, convErr := strconv.Atoi(raw)
		if convErr == nil && n > 0 && n <= 1000 {
			filters.Limit = n
		}
	}
	if raw := query.Get("offset"); raw != "" {
		n, convErr := strconv.Atoi(raw)
		if convErr == nil && n >= 0 {
			filters.Offset = n
		}
	}
	switch order := query.Get("sort"); order {
	case "asc", "desc":
		filters.SortOrder = order
	}

	// Fetch the commands.
	commands, err := h.queue.List(ctx, projectID, filters)
	if err != nil {
		api.WriteInternalError(w, r, "failed to list commands")
		return
	}
	// Use an empty slice so the response never carries a nil Commands field.
	if commands == nil {
		commands = []*domain.QueuedCommand{}
	}

	resp := ListResponse{
		Commands: commands,
		Total:    len(commands),
		Limit:    filters.Limit,
		Offset:   filters.Offset,
	}
	api.WriteSuccess(w, r, resp)
}
// GetByID returns a specific queued command.
// GET /projects/{id}/queue/{cmdId}
//
// A command that exists but belongs to a different project is reported as
// not found, exactly like a missing command.
func (h *QueueHandler) GetByID(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	projectID := chi.URLParam(r, "id")
	cmdID := chi.URLParam(r, "cmdId")

	// Shared response for "no such command in this project".
	commandMissing := func() {
		api.WriteNotFound(w, r, fmt.Sprintf("command not found: %s", cmdID))
	}

	// The project must exist before the command is looked up.
	found, err := h.projects.Exists(ctx, domain.ProjectID(projectID))
	switch {
	case err != nil:
		api.WriteInternalError(w, r, "failed to check project")
		return
	case !found:
		api.WriteNotFound(w, r, fmt.Sprintf("project not found: %s", projectID))
		return
	}

	// Look the command up by ID.
	cmd, err := h.queue.GetByID(ctx, domain.QueuedCommandID(cmdID))
	switch {
	case err == nil:
		// Found; continue with the ownership check below.
	case errors.Is(err, domain.ErrCommandNotFound):
		commandMissing()
		return
	default:
		api.WriteInternalError(w, r, "failed to get command")
		return
	}

	// Only expose the command when it belongs to the project in the URL.
	if cmd.ProjectID == projectID {
		api.WriteSuccess(w, r, cmd)
		return
	}
	commandMissing()
}
// Cancel cancels a pending queued command.
// DELETE /projects/{id}/queue/{cmdId}
//
// The command is first fetched to confirm it belongs to the project in the
// URL; a command owned by another project is reported as not found. Any
// error from the queue's Cancel other than domain.ErrCommandNotFound is
// returned to the client as a bad request carrying the error text.
func (h *QueueHandler) Cancel(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	projectID := chi.URLParam(r, "id")
	rawCmdID := chi.URLParam(r, "cmdId")
	queuedID := domain.QueuedCommandID(rawCmdID)

	// Shared response for "no such command in this project".
	commandMissing := func() {
		api.WriteNotFound(w, r, fmt.Sprintf("command not found: %s", rawCmdID))
	}

	// The project must exist before the command is looked up.
	found, err := h.projects.Exists(ctx, domain.ProjectID(projectID))
	switch {
	case err != nil:
		api.WriteInternalError(w, r, "failed to check project")
		return
	case !found:
		api.WriteNotFound(w, r, fmt.Sprintf("project not found: %s", projectID))
		return
	}

	// Confirm the command exists and is owned by this project.
	cmd, err := h.queue.GetByID(ctx, queuedID)
	switch {
	case err == nil:
		// Found; continue with the ownership check below.
	case errors.Is(err, domain.ErrCommandNotFound):
		commandMissing()
		return
	default:
		api.WriteInternalError(w, r, "failed to get command")
		return
	}
	if cmd.ProjectID != projectID {
		commandMissing()
		return
	}

	// Perform the cancellation and translate the outcome.
	err = h.queue.Cancel(ctx, queuedID)
	switch {
	case err == nil:
		api.WriteSuccess(w, r, map[string]any{
			"id":      rawCmdID,
			"status":  "cancelled",
			"message": "command cancelled successfully",
		})
	case errors.Is(err, domain.ErrCommandNotFound):
		commandMissing()
	default:
		api.WriteBadRequest(w, r, err.Error())
	}
}
// Stats returns queue statistics for a project.
// GET /projects/{id}/queue/stats
//
// Responds with not found when the project does not exist, and with an
// internal error when the project lookup or the statistics query fails.
func (h *QueueHandler) Stats(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	projectID := chi.URLParam(r, "id")

	// The project must exist before statistics are gathered.
	found, err := h.projects.Exists(ctx, domain.ProjectID(projectID))
	switch {
	case err != nil:
		api.WriteInternalError(w, r, "failed to check project")
		return
	case !found:
		api.WriteNotFound(w, r, fmt.Sprintf("project not found: %s", projectID))
		return
	}

	// Fetch the statistics and pass them through unchanged.
	stats, err := h.queue.GetStats(ctx, projectID)
	if err == nil {
		api.WriteSuccess(w, r, stats)
		return
	}
	api.WriteInternalError(w, r, "failed to get queue stats")
}