rdev/internal/handlers/saga.go
jordan f20fc6c51c
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
feat(saga): implement enterprise-grade resilience architecture
Fixes issues from code review of resilience implementation:

- Wire saga system in main.go (SagaRepository, SagaExecutor, SagaHandler)
- Fix CompletedSteps() to include skipped steps for dependency resolution
- Fix reverse loop bug in saga compensation (use standard swap pattern)
- Add circuit breaker state change callbacks for Prometheus metrics

Phase 1 (Build Resilience):
- Add failure:retry to all component Kaniko build steps
- Add preflight registry health check before builds
- Add services-deployed sync point to decouple docs from critical path

Phase 2 (API Resilience):
- Add pipeline retry endpoint (POST /projects/{id}/pipelines/{number}/retry)
- Wire circuit breakers with metrics callbacks
- Add /health/circuits endpoint for circuit breaker status

Phase 3 (Saga Engine):
- Full domain model (Saga, SagaStep, RetryPolicy, BackoffType)
- PostgreSQL saga repository with CRUD and step management
- Saga executor with retry, compensation, skip step support
- Saga API handlers with CRUD and control operations

Phase 4 (Observability):
- Add saga metrics (total, step_duration, retry, circuit_breaker_state)
- Add logging fields (saga_id, saga_name, step_name)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-08 01:58:02 -07:00

439 lines
12 KiB
Go

// Package handlers provides HTTP handlers for the rdev API.
package handlers
import (
	"context"
	"errors"
	"net/http"
	"strconv"

	"github.com/go-chi/chi/v5"
	"github.com/orchard9/rdev/internal/adapter/postgres"
	"github.com/orchard9/rdev/internal/auth"
	"github.com/orchard9/rdev/internal/domain"
	"github.com/orchard9/rdev/internal/port"
	"github.com/orchard9/rdev/internal/validate"
	"github.com/orchard9/rdev/pkg/api"
)
// SagaHandler serves the HTTP endpoints mounted under /sagas.
type SagaHandler struct {
	// repo is the saga store the handlers use to create, fetch, list and
	// delete sagas.
	repo port.SagaRepository
	// executor runs sagas in the background: execute, resume, compensate,
	// and retry or skip a single step.
	executor port.SagaExecutor
}
// NewSagaHandler returns a SagaHandler wired to the given repository and
// executor. Neither argument is validated here.
func NewSagaHandler(repo port.SagaRepository, executor port.SagaExecutor) *SagaHandler {
	handler := new(SagaHandler)
	handler.repo = repo
	handler.executor = executor
	return handler
}
// Mount attaches every saga endpoint to r under the /sagas prefix.
//
// Read-only endpoints are guarded by the projects-read scope and mutating
// endpoints by the projects-execute scope; the admin scope is listed
// alongside both.
func (h *SagaHandler) Mount(r api.Router) {
	r.Route("/sagas", func(sub chi.Router) {
		canRead := auth.RequireScope(auth.ScopeProjectsRead, auth.ScopeAdmin)
		canExecute := auth.RequireScope(auth.ScopeProjectsExecute, auth.ScopeAdmin)

		// Collection endpoints.
		sub.With(canExecute).Post("/", h.Create)
		sub.With(canRead).Get("/", h.List)

		// Endpoints addressing one saga.
		sub.With(canRead).Get("/{id}", h.Get)
		sub.With(canExecute).Delete("/{id}", h.Delete)

		// Whole-saga control.
		sub.With(canExecute).Post("/{id}/retry", h.Retry)
		sub.With(canExecute).Post("/{id}/rollback", h.Rollback)

		// Per-step control.
		sub.With(canExecute).Post("/{id}/steps/{step}/retry", h.RetryStep)
		sub.With(canExecute).Post("/{id}/steps/{step}/skip", h.SkipStep)
	})
}
// CreateSagaRequest is the JSON body accepted by POST /sagas.
type CreateSagaRequest struct {
	// Name is required; Create rejects the request when it is empty.
	Name string `json:"name"`
	// Definition is copied onto the saga unchanged.
	// NOTE(review): its format is not visible in this file — confirm.
	Definition string `json:"definition,omitempty"`
	// Vars is copied onto the saga unchanged.
	Vars map[string]string `json:"vars,omitempty"`
	// MaxRetries is replaced with 3 by Create when it is zero or negative.
	MaxRetries int `json:"max_retries,omitempty"`
	// Steps lists the steps to create; each one needs a name and an action.
	Steps []CreateStepSpec `json:"steps,omitempty"`
}
// CreateStepSpec describes one step inside a CreateSagaRequest.
type CreateStepSpec struct {
	// Name is required.
	Name string `json:"name"`
	// Action is required.
	Action string `json:"action"`
	// DependsOn is copied onto the step unchanged.
	// NOTE(review): presumably names of steps that must finish first — confirm.
	DependsOn []string `json:"depends_on,omitempty"`
	// Compensate is copied onto the step unchanged.
	// NOTE(review): presumably the action run on rollback — confirm.
	Compensate string `json:"compensate,omitempty"`
	// Config is copied onto the step unchanged.
	Config map[string]any `json:"config,omitempty"`
	// RetryPolicy overrides the step's retry policy; when it is nil, Create
	// uses domain.DefaultRetryPolicy().
	RetryPolicy *domain.RetryPolicy `json:"retry_policy,omitempty"`
}
// SagaResponse is the JSON representation of a saga returned by the saga
// endpoints. It is built from a domain.Saga by sagaToResponse.
type SagaResponse struct {
	ID          string                    `json:"id"`
	Name        string                    `json:"name"`
	Status      string                    `json:"status"`
	CurrentStep string                    `json:"current_step,omitempty"`
	RetryCount  int                       `json:"retry_count"`
	MaxRetries  int                       `json:"max_retries"`
	Error       string                    `json:"error,omitempty"`
	Steps       []StepResponse            `json:"steps,omitempty"`
	Outputs     map[string]map[string]any `json:"outputs,omitempty"`
	// Timestamps are strings in the "2006-01-02T15:04:05Z07:00" layout.
	CreatedAt string `json:"created_at"`
	UpdatedAt string `json:"updated_at"`
	// CompletedAt is left empty (and omitted) when the saga has no
	// completion time.
	CompletedAt string `json:"completed_at,omitempty"`
}
// StepResponse is the JSON representation of one saga step, as embedded in
// SagaResponse.Steps.
type StepResponse struct {
	ID         string         `json:"id"`
	Name       string         `json:"name"`
	Status     string         `json:"status"`
	Action     string         `json:"action"`
	DependsOn  []string       `json:"depends_on,omitempty"`
	Compensate string         `json:"compensate,omitempty"`
	RetryCount int            `json:"retry_count"`
	Output     map[string]any `json:"output,omitempty"`
	Error      string         `json:"error,omitempty"`
	// StartedAt and CompletedAt are left empty (and omitted) when the step
	// has no corresponding time.
	StartedAt   string `json:"started_at,omitempty"`
	CompletedAt string `json:"completed_at,omitempty"`
}
// Create validates the request, persists a new saga and starts executing it
// in the background.
// POST /sagas
//
// Responses:
//   - 400 when the body cannot be decoded or a required field is missing.
//   - 500 when the repository fails to store the saga.
//   - 201 with the saga as it stands after the repository call returns.
//
// The response is written before execution starts, so it never observes a
// saga that the executor is mutating concurrently. Progress can be followed
// with GET /sagas/{id}.
func (h *SagaHandler) Create(w http.ResponseWriter, r *http.Request) {
	ctx, cancel := context.WithTimeout(r.Context(), TimeoutOrchestration)
	defer cancel()

	var req CreateSagaRequest
	if err := api.DecodeJSON(r, &req); err != nil {
		api.WriteBadRequest(w, r, "invalid request body")
		return
	}

	v := validate.New()
	v.Required(req.Name, "name")
	for i, step := range req.Steps {
		// strconv.Itoa renders the index correctly for any step count; adding
		// the index to the rune '0' only works for indexes 0 through 9.
		field := "steps[" + strconv.Itoa(i) + "]"
		v.Required(step.Name, field+".name")
		v.Required(step.Action, field+".action")
	}
	if err := v.Error(); err != nil {
		api.WriteBadRequest(w, r, err.Error())
		return
	}

	// Build saga from request.
	saga := &domain.Saga{
		Name:       req.Name,
		Status:     domain.SagaStatusPending,
		Definition: req.Definition,
		Vars:       req.Vars,
		MaxRetries: req.MaxRetries,
		Outputs:    make(map[string]map[string]any),
	}
	if saga.MaxRetries <= 0 {
		saga.MaxRetries = 3
	}

	// Build steps, falling back to the default retry policy when the request
	// does not supply one.
	for _, spec := range req.Steps {
		step := domain.SagaStep{
			Name:       spec.Name,
			Status:     domain.StepStatusPending,
			Action:     spec.Action,
			DependsOn:  spec.DependsOn,
			Compensate: spec.Compensate,
			Config:     spec.Config,
		}
		if spec.RetryPolicy != nil {
			step.RetryPolicy = *spec.RetryPolicy
		} else {
			step.RetryPolicy = domain.DefaultRetryPolicy()
		}
		saga.Steps = append(saga.Steps, step)
	}

	// Create saga in database.
	if err := h.repo.Create(ctx, saga); err != nil {
		api.WriteInternalError(w, r, "failed to create saga: "+err.Error())
		return
	}

	// Respond first: the executor receives this same *domain.Saga, so
	// serialising it after the goroutine has started would be a data race.
	api.WriteCreated(w, r, sagaToResponse(saga))

	// Start execution asynchronously (don't wait for completion). The
	// execution context keeps the request's values but is detached from its
	// cancellation and timeout, so it outlives this handler.
	execCtx := context.WithoutCancel(ctx)
	go func() {
		// The executor is expected to record failures on the saga itself, so
		// the returned error is intentionally dropped here.
		_ = h.executor.Execute(execCtx, saga)
	}()
}
// Get returns a saga by ID.
// GET /sagas/{id}
//
// Responses:
//   - 400 when the id path parameter is empty.
//   - 404 when the repository reports postgres.ErrSagaNotFound.
//   - 500 for any other repository error.
//   - 200 with the saga otherwise.
func (h *SagaHandler) Get(w http.ResponseWriter, r *http.Request) {
	ctx, cancel := context.WithTimeout(r.Context(), TimeoutLookup)
	defer cancel()

	id := chi.URLParam(r, "id")
	if id == "" {
		api.WriteBadRequest(w, r, "id is required")
		return
	}

	saga, err := h.repo.Get(ctx, id)
	if err != nil {
		// errors.Is also matches when the sentinel has been wrapped with
		// extra context on its way up from the repository.
		if errors.Is(err, postgres.ErrSagaNotFound) {
			api.WriteNotFound(w, r, "saga not found")
			return
		}
		api.WriteInternalError(w, r, "failed to get saga: "+err.Error())
		return
	}

	api.WriteSuccess(w, r, sagaToResponse(saga))
}
// List returns the sagas that match the optional "name" and "status" query
// parameters.
// GET /sagas
//
// The filters are passed through SagaFilters.Normalize before the repository
// is queried. A repository failure yields a 500; otherwise the response is
// a 200 with a (possibly empty) array of sagas.
func (h *SagaHandler) List(w http.ResponseWriter, r *http.Request) {
	ctx, cancel := context.WithTimeout(r.Context(), TimeoutLookup)
	defer cancel()

	query := r.URL.Query()

	var filters domain.SagaFilters
	filters.Name = query.Get("name")
	filters.Status = domain.SagaStatus(query.Get("status"))
	filters.Normalize()

	found, err := h.repo.List(ctx, filters)
	if err != nil {
		api.WriteInternalError(w, r, "failed to list sagas: "+err.Error())
		return
	}

	// Always a non-nil slice, even when nothing matched.
	items := make([]SagaResponse, 0, len(found))
	for _, s := range found {
		items = append(items, sagaToResponse(s))
	}

	api.WriteSuccess(w, r, items)
}
// Delete removes a saga.
// DELETE /sagas/{id}
//
// Responses:
//   - 400 when the id path parameter is empty.
//   - 404 when the repository reports postgres.ErrSagaNotFound.
//   - 500 for any other repository error.
//   - 200 with {"status": "deleted", "id": <id>} otherwise.
func (h *SagaHandler) Delete(w http.ResponseWriter, r *http.Request) {
	ctx, cancel := context.WithTimeout(r.Context(), TimeoutStandard)
	defer cancel()

	id := chi.URLParam(r, "id")
	if id == "" {
		api.WriteBadRequest(w, r, "id is required")
		return
	}

	if err := h.repo.Delete(ctx, id); err != nil {
		// errors.Is also matches when the sentinel has been wrapped with
		// extra context on its way up from the repository.
		if errors.Is(err, postgres.ErrSagaNotFound) {
			api.WriteNotFound(w, r, "saga not found")
			return
		}
		api.WriteInternalError(w, r, "failed to delete saga: "+err.Error())
		return
	}

	api.WriteSuccess(w, r, map[string]string{
		"status": "deleted",
		"id":     id,
	})
}
// Retry resumes a failed saga from the last failed step.
// POST /sagas/{id}/retry
//
// The saga is looked up before any background work is started, so an unknown
// ID yields a 404 without invoking the executor, and the returned state is
// the one read before the resume was triggered.
//
// Responses:
//   - 400 when the id path parameter is empty.
//   - 404 when the repository reports postgres.ErrSagaNotFound.
//   - 500 for any other repository error.
//   - 200 with the saga as read before the resume started.
func (h *SagaHandler) Retry(w http.ResponseWriter, r *http.Request) {
	ctx, cancel := context.WithTimeout(r.Context(), TimeoutOrchestration)
	defer cancel()

	id := chi.URLParam(r, "id")
	if id == "" {
		api.WriteBadRequest(w, r, "id is required")
		return
	}

	saga, err := h.repo.Get(ctx, id)
	if err != nil {
		if errors.Is(err, postgres.ErrSagaNotFound) {
			api.WriteNotFound(w, r, "saga not found")
			return
		}
		api.WriteInternalError(w, r, "failed to get saga: "+err.Error())
		return
	}

	// Start resumption asynchronously. The execution context keeps the
	// request's values but is detached from its cancellation and timeout.
	execCtx := context.WithoutCancel(ctx)
	go func() {
		// The executor is expected to record failures on the saga itself, so
		// the returned error is intentionally dropped here.
		_ = h.executor.Resume(execCtx, id)
	}()

	api.WriteSuccess(w, r, sagaToResponse(saga))
}
// Rollback triggers compensation for a failed saga.
// POST /sagas/{id}/rollback
//
// The saga is looked up before any background work is started, so an unknown
// ID yields a 404 without invoking the executor, and the returned state is
// the one read before compensation was triggered.
//
// Responses:
//   - 400 when the id path parameter is empty.
//   - 404 when the repository reports postgres.ErrSagaNotFound.
//   - 500 for any other repository error.
//   - 200 with the saga as read before compensation started.
func (h *SagaHandler) Rollback(w http.ResponseWriter, r *http.Request) {
	ctx, cancel := context.WithTimeout(r.Context(), TimeoutOrchestration)
	defer cancel()

	id := chi.URLParam(r, "id")
	if id == "" {
		api.WriteBadRequest(w, r, "id is required")
		return
	}

	saga, err := h.repo.Get(ctx, id)
	if err != nil {
		if errors.Is(err, postgres.ErrSagaNotFound) {
			api.WriteNotFound(w, r, "saga not found")
			return
		}
		api.WriteInternalError(w, r, "failed to get saga: "+err.Error())
		return
	}

	// Start compensation asynchronously. The execution context keeps the
	// request's values but is detached from its cancellation and timeout.
	execCtx := context.WithoutCancel(ctx)
	go func() {
		// The executor is expected to record failures on the saga itself, so
		// the returned error is intentionally dropped here.
		_ = h.executor.Compensate(execCtx, id)
	}()

	api.WriteSuccess(w, r, sagaToResponse(saga))
}
// RetryStep retries a specific failed step.
// POST /sagas/{id}/steps/{step}/retry
//
// The saga is looked up before any background work is started, so an unknown
// ID yields a 404 without invoking the executor, and the returned state is
// the one read before the retry was triggered. The step name is not checked
// here; it is passed to the executor as given.
//
// Responses:
//   - 400 when the id or step path parameter is empty.
//   - 404 when the repository reports postgres.ErrSagaNotFound.
//   - 500 for any other repository error.
//   - 200 with the saga as read before the retry started.
func (h *SagaHandler) RetryStep(w http.ResponseWriter, r *http.Request) {
	ctx, cancel := context.WithTimeout(r.Context(), TimeoutOrchestration)
	defer cancel()

	id := chi.URLParam(r, "id")
	step := chi.URLParam(r, "step")
	if id == "" || step == "" {
		api.WriteBadRequest(w, r, "id and step are required")
		return
	}

	saga, err := h.repo.Get(ctx, id)
	if err != nil {
		if errors.Is(err, postgres.ErrSagaNotFound) {
			api.WriteNotFound(w, r, "saga not found")
			return
		}
		api.WriteInternalError(w, r, "failed to get saga: "+err.Error())
		return
	}

	// Start retry asynchronously. The execution context keeps the request's
	// values but is detached from its cancellation and timeout.
	execCtx := context.WithoutCancel(ctx)
	go func() {
		// The executor is expected to record failures on the saga itself, so
		// the returned error is intentionally dropped here.
		_ = h.executor.RetryStep(execCtx, id, step)
	}()

	api.WriteSuccess(w, r, sagaToResponse(saga))
}
// SkipStep skips a step and continues execution.
// POST /sagas/{id}/steps/{step}/skip
//
// The saga is looked up before any background work is started, so an unknown
// ID yields a 404 without invoking the executor, and the returned state is
// the one read before the skip was triggered. The step name is not checked
// here; it is passed to the executor as given.
//
// Responses:
//   - 400 when the id or step path parameter is empty.
//   - 404 when the repository reports postgres.ErrSagaNotFound.
//   - 500 for any other repository error.
//   - 200 with the saga as read before the skip started.
func (h *SagaHandler) SkipStep(w http.ResponseWriter, r *http.Request) {
	ctx, cancel := context.WithTimeout(r.Context(), TimeoutOrchestration)
	defer cancel()

	id := chi.URLParam(r, "id")
	step := chi.URLParam(r, "step")
	if id == "" || step == "" {
		api.WriteBadRequest(w, r, "id and step are required")
		return
	}

	saga, err := h.repo.Get(ctx, id)
	if err != nil {
		if errors.Is(err, postgres.ErrSagaNotFound) {
			api.WriteNotFound(w, r, "saga not found")
			return
		}
		api.WriteInternalError(w, r, "failed to get saga: "+err.Error())
		return
	}

	// Start skip-and-continue asynchronously. The execution context keeps the
	// request's values but is detached from its cancellation and timeout.
	execCtx := context.WithoutCancel(ctx)
	go func() {
		// The executor is expected to record failures on the saga itself, so
		// the returned error is intentionally dropped here.
		_ = h.executor.SkipStep(execCtx, id, step)
	}()

	api.WriteSuccess(w, r, sagaToResponse(saga))
}
// sagaToResponse maps a domain saga and its steps onto the API response
// shape.
//
// Every timestamp is rendered with the same layout; optional timestamps
// (saga completion, step start and completion) stay empty when unset. saga
// must be non-nil.
func sagaToResponse(saga *domain.Saga) SagaResponse {
	const layout = "2006-01-02T15:04:05Z07:00"

	var completedAt string
	if saga.CompletedAt != nil {
		completedAt = saga.CompletedAt.Format(layout)
	}

	out := SagaResponse{
		ID:          saga.ID,
		Name:        saga.Name,
		Status:      string(saga.Status),
		CurrentStep: saga.CurrentStep,
		RetryCount:  saga.RetryCount,
		MaxRetries:  saga.MaxRetries,
		Error:       saga.Error,
		Outputs:     saga.Outputs,
		CreatedAt:   saga.CreatedAt.Format(layout),
		UpdatedAt:   saga.UpdatedAt.Format(layout),
		CompletedAt: completedAt,
	}

	// Steps stays nil when the saga has no steps.
	for i := range saga.Steps {
		src := &saga.Steps[i]

		item := StepResponse{
			ID:         src.ID,
			Name:       src.Name,
			Status:     string(src.Status),
			Action:     src.Action,
			DependsOn:  src.DependsOn,
			Compensate: src.Compensate,
			RetryCount: src.RetryCount,
			Output:     src.Output,
			Error:      src.Error,
		}
		if src.StartedAt != nil {
			item.StartedAt = src.StartedAt.Format(layout)
		}
		if src.CompletedAt != nil {
			item.CompletedAt = src.CompletedAt.Format(layout)
		}

		out.Steps = append(out.Steps, item)
	}

	return out
}