// Package handlers provides HTTP handlers for the rdev API. package handlers import ( "context" "net/http" "github.com/go-chi/chi/v5" "github.com/orchard9/rdev/internal/adapter/postgres" "github.com/orchard9/rdev/internal/auth" "github.com/orchard9/rdev/internal/domain" "github.com/orchard9/rdev/internal/port" "github.com/orchard9/rdev/internal/validate" "github.com/orchard9/rdev/pkg/api" ) // SagaHandler handles saga API endpoints. type SagaHandler struct { repo port.SagaRepository executor port.SagaExecutor } // NewSagaHandler creates a new saga handler. func NewSagaHandler(repo port.SagaRepository, executor port.SagaExecutor) *SagaHandler { return &SagaHandler{ repo: repo, executor: executor, } } // Mount registers the saga routes. func (h *SagaHandler) Mount(r api.Router) { r.Route("/sagas", func(r chi.Router) { // Create and list sagas r.With(auth.RequireScope(auth.ScopeProjectsExecute, auth.ScopeAdmin)).Post("/", h.Create) r.With(auth.RequireScope(auth.ScopeProjectsRead, auth.ScopeAdmin)).Get("/", h.List) // Single saga operations r.With(auth.RequireScope(auth.ScopeProjectsRead, auth.ScopeAdmin)).Get("/{id}", h.Get) r.With(auth.RequireScope(auth.ScopeProjectsExecute, auth.ScopeAdmin)).Delete("/{id}", h.Delete) // Saga control operations r.With(auth.RequireScope(auth.ScopeProjectsExecute, auth.ScopeAdmin)).Post("/{id}/retry", h.Retry) r.With(auth.RequireScope(auth.ScopeProjectsExecute, auth.ScopeAdmin)).Post("/{id}/rollback", h.Rollback) // Step operations r.With(auth.RequireScope(auth.ScopeProjectsExecute, auth.ScopeAdmin)).Post("/{id}/steps/{step}/retry", h.RetryStep) r.With(auth.RequireScope(auth.ScopeProjectsExecute, auth.ScopeAdmin)).Post("/{id}/steps/{step}/skip", h.SkipStep) }) } // CreateSagaRequest is the request body for POST /sagas. 
type CreateSagaRequest struct {
	// Name is the saga name; Create rejects the request when it is empty.
	Name string `json:"name"`
	// Definition is copied verbatim onto the saga.
	// NOTE(review): its format is not visible in this file — confirm.
	Definition string `json:"definition,omitempty"`
	// Vars is copied verbatim onto the saga.
	Vars map[string]string `json:"vars,omitempty"`
	// MaxRetries is the saga-level retry limit; Create substitutes 3 when
	// the value is zero or negative.
	MaxRetries int `json:"max_retries,omitempty"`
	// Steps lists the steps to create, in request order.
	Steps []CreateStepSpec `json:"steps,omitempty"`
}

// CreateStepSpec specifies a step in the saga.
type CreateStepSpec struct {
	// Name is required by Create's validation.
	Name string `json:"name"`
	// Action is required by Create's validation.
	Action string `json:"action"`
	// DependsOn is copied onto the step.
	// NOTE(review): presumably names of other steps — confirm.
	DependsOn []string `json:"depends_on,omitempty"`
	// Compensate is copied onto the step.
	// NOTE(review): presumably the action run on rollback — confirm.
	Compensate string `json:"compensate,omitempty"`
	// Config is free-form step configuration copied onto the step.
	Config map[string]any `json:"config,omitempty"`
	// RetryPolicy overrides the step retry policy; when nil, Create uses
	// domain.DefaultRetryPolicy().
	RetryPolicy *domain.RetryPolicy `json:"retry_policy,omitempty"`
}

// SagaResponse is the API response for a saga.
type SagaResponse struct {
	ID     string `json:"id"`
	Name   string `json:"name"`
	// Status is the string form of the saga's domain status.
	Status      string `json:"status"`
	CurrentStep string `json:"current_step,omitempty"`
	RetryCount  int    `json:"retry_count"`
	MaxRetries  int    `json:"max_retries"`
	Error       string `json:"error,omitempty"`
	// Steps holds one entry per saga step, in the saga's step order.
	Steps []StepResponse `json:"steps,omitempty"`
	// Outputs is the saga's Outputs map, passed through as-is.
	// NOTE(review): presumably keyed by step name — confirm.
	Outputs map[string]map[string]any `json:"outputs,omitempty"`
	// CreatedAt and UpdatedAt are formatted with the layout
	// "2006-01-02T15:04:05Z07:00".
	CreatedAt string `json:"created_at"`
	UpdatedAt string `json:"updated_at"`
	// CompletedAt uses the same layout and is left empty (omitted) when the
	// saga has no completion time.
	CompletedAt string `json:"completed_at,omitempty"`
}

// StepResponse is the API response for a saga step.
type StepResponse struct {
	ID   string `json:"id"`
	Name string `json:"name"`
	// Status is the string form of the step's domain status.
	Status     string         `json:"status"`
	Action     string         `json:"action"`
	DependsOn  []string       `json:"depends_on,omitempty"`
	Compensate string         `json:"compensate,omitempty"`
	RetryCount int            `json:"retry_count"`
	Output     map[string]any `json:"output,omitempty"`
	Error      string         `json:"error,omitempty"`
	// StartedAt and CompletedAt are formatted with the layout
	// "2006-01-02T15:04:05Z07:00" and left empty (omitted) when unset.
	StartedAt   string `json:"started_at,omitempty"`
	CompletedAt string `json:"completed_at,omitempty"`
}

// Create creates and starts a new saga.
// POST /sagas func (h *SagaHandler) Create(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), TimeoutOrchestration) defer cancel() var req CreateSagaRequest if err := api.DecodeJSON(r, &req); err != nil { api.WriteBadRequest(w, r, "invalid request body") return } v := validate.New() v.Required(req.Name, "name") if len(req.Steps) > 0 { for i, step := range req.Steps { v.Required(step.Name, "steps["+string(rune('0'+i))+"].name") v.Required(step.Action, "steps["+string(rune('0'+i))+"].action") } } if err := v.Error(); err != nil { api.WriteBadRequest(w, r, err.Error()) return } // Build saga from request saga := &domain.Saga{ Name: req.Name, Status: domain.SagaStatusPending, Definition: req.Definition, Vars: req.Vars, MaxRetries: req.MaxRetries, Outputs: make(map[string]map[string]any), } if saga.MaxRetries <= 0 { saga.MaxRetries = 3 } // Build steps for _, spec := range req.Steps { step := domain.SagaStep{ Name: spec.Name, Status: domain.StepStatusPending, Action: spec.Action, DependsOn: spec.DependsOn, Compensate: spec.Compensate, Config: spec.Config, } if spec.RetryPolicy != nil { step.RetryPolicy = *spec.RetryPolicy } else { step.RetryPolicy = domain.DefaultRetryPolicy() } saga.Steps = append(saga.Steps, step) } // Create saga in database if err := h.repo.Create(ctx, saga); err != nil { api.WriteInternalError(w, r, "failed to create saga: "+err.Error()) return } // Start execution asynchronously (don't wait for completion) go func() { execCtx := context.WithoutCancel(ctx) if err := h.executor.Execute(execCtx, saga); err != nil { // Error is already recorded in the saga _ = err } }() api.WriteCreated(w, r, sagaToResponse(saga)) } // Get returns a saga by ID. 
// GET /sagas/{id} func (h *SagaHandler) Get(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), TimeoutLookup) defer cancel() id := chi.URLParam(r, "id") if id == "" { api.WriteBadRequest(w, r, "id is required") return } saga, err := h.repo.Get(ctx, id) if err != nil { if err == postgres.ErrSagaNotFound { api.WriteNotFound(w, r, "saga not found") return } api.WriteInternalError(w, r, "failed to get saga: "+err.Error()) return } api.WriteSuccess(w, r, sagaToResponse(saga)) } // List returns sagas matching filters. // GET /sagas func (h *SagaHandler) List(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), TimeoutLookup) defer cancel() filters := domain.SagaFilters{ Name: r.URL.Query().Get("name"), Status: domain.SagaStatus(r.URL.Query().Get("status")), } filters.Normalize() sagas, err := h.repo.List(ctx, filters) if err != nil { api.WriteInternalError(w, r, "failed to list sagas: "+err.Error()) return } resp := make([]SagaResponse, len(sagas)) for i, saga := range sagas { resp[i] = sagaToResponse(saga) } api.WriteSuccess(w, r, resp) } // Delete removes a saga. // DELETE /sagas/{id} func (h *SagaHandler) Delete(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), TimeoutStandard) defer cancel() id := chi.URLParam(r, "id") if id == "" { api.WriteBadRequest(w, r, "id is required") return } if err := h.repo.Delete(ctx, id); err != nil { if err == postgres.ErrSagaNotFound { api.WriteNotFound(w, r, "saga not found") return } api.WriteInternalError(w, r, "failed to delete saga: "+err.Error()) return } api.WriteSuccess(w, r, map[string]string{ "status": "deleted", "id": id, }) } // Retry resumes a failed saga from the last failed step. 
// POST /sagas/{id}/retry func (h *SagaHandler) Retry(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), TimeoutOrchestration) defer cancel() id := chi.URLParam(r, "id") if id == "" { api.WriteBadRequest(w, r, "id is required") return } // Start resumption asynchronously go func() { execCtx := context.WithoutCancel(ctx) if err := h.executor.Resume(execCtx, id); err != nil { // Error is already recorded in the saga _ = err } }() // Return current saga state saga, err := h.repo.Get(ctx, id) if err != nil { if err == postgres.ErrSagaNotFound { api.WriteNotFound(w, r, "saga not found") return } api.WriteInternalError(w, r, "failed to get saga: "+err.Error()) return } api.WriteSuccess(w, r, sagaToResponse(saga)) } // Rollback triggers compensation for a failed saga. // POST /sagas/{id}/rollback func (h *SagaHandler) Rollback(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), TimeoutOrchestration) defer cancel() id := chi.URLParam(r, "id") if id == "" { api.WriteBadRequest(w, r, "id is required") return } // Start compensation asynchronously go func() { execCtx := context.WithoutCancel(ctx) if err := h.executor.Compensate(execCtx, id); err != nil { // Error is already recorded in the saga _ = err } }() // Return current saga state saga, err := h.repo.Get(ctx, id) if err != nil { if err == postgres.ErrSagaNotFound { api.WriteNotFound(w, r, "saga not found") return } api.WriteInternalError(w, r, "failed to get saga: "+err.Error()) return } api.WriteSuccess(w, r, sagaToResponse(saga)) } // RetryStep retries a specific failed step. 
// POST /sagas/{id}/steps/{step}/retry func (h *SagaHandler) RetryStep(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), TimeoutOrchestration) defer cancel() id := chi.URLParam(r, "id") step := chi.URLParam(r, "step") if id == "" || step == "" { api.WriteBadRequest(w, r, "id and step are required") return } // Start retry asynchronously go func() { execCtx := context.WithoutCancel(ctx) if err := h.executor.RetryStep(execCtx, id, step); err != nil { // Error is already recorded in the saga _ = err } }() // Return current saga state saga, err := h.repo.Get(ctx, id) if err != nil { if err == postgres.ErrSagaNotFound { api.WriteNotFound(w, r, "saga not found") return } api.WriteInternalError(w, r, "failed to get saga: "+err.Error()) return } api.WriteSuccess(w, r, sagaToResponse(saga)) } // SkipStep skips a step and continues execution. // POST /sagas/{id}/steps/{step}/skip func (h *SagaHandler) SkipStep(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), TimeoutOrchestration) defer cancel() id := chi.URLParam(r, "id") step := chi.URLParam(r, "step") if id == "" || step == "" { api.WriteBadRequest(w, r, "id and step are required") return } // Start skip and continue asynchronously go func() { execCtx := context.WithoutCancel(ctx) if err := h.executor.SkipStep(execCtx, id, step); err != nil { // Error is already recorded in the saga _ = err } }() // Return current saga state saga, err := h.repo.Get(ctx, id) if err != nil { if err == postgres.ErrSagaNotFound { api.WriteNotFound(w, r, "saga not found") return } api.WriteInternalError(w, r, "failed to get saga: "+err.Error()) return } api.WriteSuccess(w, r, sagaToResponse(saga)) } // sagaToResponse converts a domain saga to an API response. 
func sagaToResponse(saga *domain.Saga) SagaResponse { resp := SagaResponse{ ID: saga.ID, Name: saga.Name, Status: string(saga.Status), CurrentStep: saga.CurrentStep, RetryCount: saga.RetryCount, MaxRetries: saga.MaxRetries, Error: saga.Error, Outputs: saga.Outputs, CreatedAt: saga.CreatedAt.Format("2006-01-02T15:04:05Z07:00"), UpdatedAt: saga.UpdatedAt.Format("2006-01-02T15:04:05Z07:00"), } if saga.CompletedAt != nil { resp.CompletedAt = saga.CompletedAt.Format("2006-01-02T15:04:05Z07:00") } for _, step := range saga.Steps { stepResp := StepResponse{ ID: step.ID, Name: step.Name, Status: string(step.Status), Action: step.Action, DependsOn: step.DependsOn, Compensate: step.Compensate, RetryCount: step.RetryCount, Output: step.Output, Error: step.Error, } if step.StartedAt != nil { stepResp.StartedAt = step.StartedAt.Format("2006-01-02T15:04:05Z07:00") } if step.CompletedAt != nil { stepResp.CompletedAt = step.CompletedAt.Format("2006-01-02T15:04:05Z07:00") } resp.Steps = append(resp.Steps, stepResp) } return resp }