rdev/internal/domain/saga.go
jordan f20fc6c51c
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
feat(saga): implement enterprise-grade resilience architecture
Fixes issues from code review of resilience implementation:

- Wire saga system in main.go (SagaRepository, SagaExecutor, SagaHandler)
- Fix CompletedSteps() to include skipped steps for dependency resolution
- Fix reverse loop bug in saga compensation (use standard swap pattern)
- Add circuit breaker state change callbacks for Prometheus metrics

Phase 1 (Build Resilience):
- Add failure:retry to all component Kaniko build steps
- Add preflight registry health check before builds
- Add services-deployed sync point to decouple docs from critical path

Phase 2 (API Resilience):
- Add pipeline retry endpoint (POST /projects/{id}/pipelines/{number}/retry)
- Wire circuit breakers with metrics callbacks
- Add /health/circuits endpoint for circuit breaker status

Phase 3 (Saga Engine):
- Full domain model (Saga, SagaStep, RetryPolicy, BackoffType)
- PostgreSQL saga repository with CRUD and step management
- Saga executor with retry, compensation, skip step support
- Saga API handlers with CRUD and control operations

Phase 4 (Observability):
- Add saga metrics (total, step_duration, retry, circuit_breaker_state)
- Add logging fields (saga_id, saga_name, step_name)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-08 01:58:02 -07:00

246 lines
7.7 KiB
Go

package domain
import (
"time"
)
// SagaStatus represents the status of a saga execution.
// It is a string type; the constants below are the values IsValid accepts.
type SagaStatus string

const (
	// SagaStatusPending indicates the saga hasn't started yet.
	SagaStatusPending SagaStatus = "pending"
	// SagaStatusRunning indicates the saga is executing.
	SagaStatusRunning SagaStatus = "running"
	// SagaStatusCompleted indicates the saga finished successfully.
	// IsTerminal reports true for this status.
	SagaStatusCompleted SagaStatus = "completed"
	// SagaStatusFailed indicates the saga failed and may need compensation.
	// IsTerminal reports true for this status even though compensation may
	// still follow.
	SagaStatusFailed SagaStatus = "failed"
	// SagaStatusCompensating indicates compensation is in progress.
	// IsTerminal reports false for this status.
	SagaStatusCompensating SagaStatus = "compensating"
	// SagaStatusCompensated indicates compensation completed successfully.
	// IsTerminal reports true for this status.
	SagaStatusCompensated SagaStatus = "compensated"
)
// IsValid reports whether s is one of the six declared saga statuses:
// pending, running, completed, failed, compensating or compensated.
// Any other value, including the empty string, yields false.
func (s SagaStatus) IsValid() bool {
	known := [...]SagaStatus{
		SagaStatusPending,
		SagaStatusRunning,
		SagaStatusCompleted,
		SagaStatusFailed,
		SagaStatusCompensating,
		SagaStatusCompensated,
	}
	for _, candidate := range known {
		if s == candidate {
			return true
		}
	}
	return false
}
// IsTerminal reports whether s is a final saga state. Completed, failed and
// compensated are final; every other value (pending, running, compensating,
// or an unknown string) is not.
func (s SagaStatus) IsTerminal() bool {
	switch s {
	case SagaStatusCompleted, SagaStatusFailed, SagaStatusCompensated:
		return true
	default:
		return false
	}
}
// StepStatus represents the status of a saga step.
// It is a string type; the constants below are the values IsValid accepts.
type StepStatus string

const (
	// StepStatusPending is the only status from which SagaStep.CanRun
	// reports a step as runnable.
	StepStatusPending StepStatus = "pending"
	// StepStatusRunning is neither pending nor terminal: CanRun and
	// IsTerminal both report false for it.
	StepStatusRunning StepStatus = "running"
	// StepStatusCompleted is a terminal status. Saga.CompletedSteps includes
	// steps in this status.
	StepStatusCompleted StepStatus = "completed"
	// StepStatusFailed is a terminal status. Saga.FailedStep looks for the
	// first step in this status.
	StepStatusFailed StepStatus = "failed"
	// StepStatusSkipped is a terminal status. Saga.CompletedSteps treats
	// skipped steps as complete for dependency resolution.
	StepStatusSkipped StepStatus = "skipped"
)
// IsValid reports whether s is one of the five declared step statuses:
// pending, running, completed, failed or skipped. Any other value,
// including the empty string, yields false.
func (s StepStatus) IsValid() bool {
	return s == StepStatusPending ||
		s == StepStatusRunning ||
		s == StepStatusCompleted ||
		s == StepStatusFailed ||
		s == StepStatusSkipped
}
// IsTerminal reports whether s is a final step state. Completed, failed and
// skipped are final; pending, running and any unknown value are not.
func (s StepStatus) IsTerminal() bool {
	switch s {
	case StepStatusCompleted, StepStatusFailed, StepStatusSkipped:
		return true
	default:
		return false
	}
}
// BackoffType represents the type of retry backoff.
// It is the type of the RetryPolicy.BackoffType field.
type BackoffType string

const (
	// BackoffNone is the "none" backoff type.
	// NOTE(review): presumably the delay does not grow between retries; the
	// delay computation is not in this file — confirm against the executor.
	BackoffNone BackoffType = "none"
	// BackoffLinear is the "linear" backoff type.
	// NOTE(review): presumably the delay grows linearly per attempt — confirm
	// against the executor.
	BackoffLinear BackoffType = "linear"
	// BackoffExponential is the "exponential" backoff type. It is the type
	// DefaultRetryPolicy selects.
	// NOTE(review): growth factor is not defined in this file — confirm
	// against the executor.
	BackoffExponential BackoffType = "exponential"
)
// RetryPolicy defines how a step should be retried on failure.
//
// InitialDelay and MaxDelay are time.Duration values. That type has no
// custom JSON marshaling, so encoding/json writes and reads them as integer
// nanosecond counts.
type RetryPolicy struct {
	// MaxAttempts is the maximum number of retry attempts (including initial).
	// NOTE(review): handling of values <= 0 is not defined in this file.
	MaxAttempts int `json:"max_attempts"`
	// BackoffType is the type of backoff between retries.
	BackoffType BackoffType `json:"backoff_type"`
	// InitialDelay is the initial delay between retries.
	InitialDelay time.Duration `json:"initial_delay"`
	// MaxDelay is the maximum delay between retries.
	MaxDelay time.Duration `json:"max_delay"`
}
// DefaultRetryPolicy builds the stock retry policy: three attempts in total,
// exponential backoff, a five-second initial delay and a sixty-second cap on
// the delay.
func DefaultRetryPolicy() RetryPolicy {
	var policy RetryPolicy
	policy.MaxAttempts = 3
	policy.BackoffType = BackoffExponential
	policy.InitialDelay = 5 * time.Second
	policy.MaxDelay = time.Minute // same value as 60 * time.Second
	return policy
}
// SagaStep represents a single step in a saga.
//
// Steps are held by value in Saga.Steps and are addressed by Name.
type SagaStep struct {
	// ID is the unique identifier for this step instance.
	ID string `json:"id"`
	// SagaID is the saga this step belongs to.
	SagaID string `json:"saga_id"`
	// Name is the step name (unique within the saga). Saga.GetStep and the
	// DependsOn entries of other steps refer to a step by this name.
	Name string `json:"name"`
	// Status is the current step status. CanRun only reports true while it
	// is StepStatusPending.
	Status StepStatus `json:"status"`
	// Action is the step action type (api, wait_pipeline, wait_build, shell).
	Action string `json:"action"`
	// DependsOn lists step names that must complete before this step can run.
	// CanRun checks each name against the completed-steps map it is given.
	DependsOn []string `json:"depends_on,omitempty"`
	// RetryPolicy defines the retry behavior for this step.
	RetryPolicy RetryPolicy `json:"retry_policy"`
	// Compensate is the name of the compensation step to run on rollback.
	Compensate string `json:"compensate,omitempty"`
	// Config contains action-specific configuration.
	// NOTE(review): the expected keys per action are not defined in this file.
	Config map[string]any `json:"config,omitempty"`
	// Output contains the step output after completion.
	Output map[string]any `json:"output,omitempty"`
	// Error contains the error message if the step failed.
	Error string `json:"error,omitempty"`
	// RetryCount is the number of times this step has been retried.
	RetryCount int `json:"retry_count"`
	// StartedAt is when the step started executing. A nil value is omitted
	// from the JSON encoding.
	StartedAt *time.Time `json:"started_at,omitempty"`
	// CompletedAt is when the step finished (success or failure). A nil
	// value is omitted from the JSON encoding.
	CompletedAt *time.Time `json:"completed_at,omitempty"`
}
// CanRun reports whether the step may be started now. That is the case when
// the step is still pending and every name in DependsOn maps to true in
// completedSteps. A missing key or a nil map counts as "not completed".
func (s *SagaStep) CanRun(completedSteps map[string]bool) bool {
	ready := s.Status == StepStatusPending
	// Walk the dependencies only while the step is still eligible; the first
	// unmet dependency ends the scan.
	for i := 0; ready && i < len(s.DependsOn); i++ {
		ready = completedSteps[s.DependsOn[i]]
	}
	return ready
}
// Saga represents a multi-step workflow with compensation support.
//
// Its methods (CompletedSteps, GetStep, FailedStep, RunnableSteps) only read
// the Steps slice.
type Saga struct {
	// ID is the unique identifier for this saga.
	ID string `json:"id"`
	// Name is the saga name (from the definition).
	Name string `json:"name"`
	// Status is the current saga status.
	Status SagaStatus `json:"status"`
	// Definition is the YAML definition used to create this saga.
	Definition string `json:"definition,omitempty"`
	// Vars contains template variables for step configuration.
	Vars map[string]string `json:"vars,omitempty"`
	// Outputs contains outputs from completed steps, keyed by step name.
	Outputs map[string]map[string]any `json:"outputs,omitempty"`
	// CurrentStep is the name of the currently executing step.
	CurrentStep string `json:"current_step,omitempty"`
	// RetryCount is the number of times the saga has been retried.
	RetryCount int `json:"retry_count"`
	// MaxRetries is the maximum number of saga-level retries.
	// NOTE(review): enforcement happens outside this file — confirm how a
	// value of 0 is interpreted.
	MaxRetries int `json:"max_retries"`
	// Error contains the error message if the saga failed.
	Error string `json:"error,omitempty"`
	// Steps contains all steps in this saga. GetStep and FailedStep return
	// pointers into this slice; RunnableSteps returns copies of its elements.
	Steps []SagaStep `json:"steps,omitempty"`
	// CreatedAt is when the saga was created.
	CreatedAt time.Time `json:"created_at"`
	// UpdatedAt is when the saga was last updated.
	UpdatedAt time.Time `json:"updated_at"`
	// CompletedAt is when the saga finished (success, failure, or compensation).
	// A nil value is omitted from the JSON encoding.
	CompletedAt *time.Time `json:"completed_at,omitempty"`
}
// CompletedSteps builds a set of the names of all steps whose status is
// completed or skipped. Skipped steps count as done so that steps depending
// on them are not blocked. The result is always a non-nil map; steps in any
// other status have no entry.
func (s *Saga) CompletedSteps() map[string]bool {
	done := make(map[string]bool)
	for i := range s.Steps {
		switch s.Steps[i].Status {
		case StepStatusCompleted, StepStatusSkipped:
			done[s.Steps[i].Name] = true
		}
	}
	return done
}
// GetStep looks up the first step whose Name equals name. The returned
// pointer refers to the element inside s.Steps, so changes made through it
// are visible on the saga. It returns nil when no step matches.
func (s *Saga) GetStep(name string) *SagaStep {
	for idx := 0; idx < len(s.Steps); idx++ {
		if candidate := &s.Steps[idx]; candidate.Name == name {
			return candidate
		}
	}
	return nil
}
// FailedStep scans the steps in order and yields a pointer to the first one
// whose status is failed. The pointer refers to the element inside s.Steps.
// It returns nil when no step has failed.
func (s *Saga) FailedStep() *SagaStep {
	var found *SagaStep
	for i := range s.Steps {
		if s.Steps[i].Status != StepStatusFailed {
			continue
		}
		found = &s.Steps[i]
		break
	}
	return found
}
// RunnableSteps collects, in declaration order, copies of every step for
// which CanRun holds against the saga's current CompletedSteps set. The
// result is nil when nothing is runnable.
func (s *Saga) RunnableSteps() []SagaStep {
	var ready []SagaStep
	done := s.CompletedSteps()
	for i := range s.Steps {
		// Work on a copy so the returned elements are detached from s.Steps.
		candidate := s.Steps[i]
		if !candidate.CanRun(done) {
			continue
		}
		ready = append(ready, candidate)
	}
	return ready
}
// SagaFilters specifies criteria for listing sagas.
//
// NOTE(review): how the zero value of Name, Status and Since is interpreted
// (presumably "no filter") is decided by the code that applies the filters,
// which is not in this file — confirm against the repository.
type SagaFilters struct {
	// Name filters by saga name.
	Name string
	// Status filters by saga status.
	Status SagaStatus
	// Since filters sagas created after this time.
	Since time.Time
	// Limit is the maximum number of sagas to return. Normalize replaces
	// values <= 0 with 50 and caps values above 200 at 200.
	Limit int
}
// DefaultSagaFilters yields a filter set whose only non-zero field is Limit,
// set to 50.
func DefaultSagaFilters() SagaFilters {
	var filters SagaFilters
	filters.Limit = 50
	return filters
}
// Normalize clamps Limit in place: a non-positive limit becomes the default
// of 50, and a limit above 200 is reduced to 200. Values from 1 through 200
// are left untouched. No other field is modified.
func (f *SagaFilters) Normalize() {
	switch {
	case f.Limit <= 0:
		f.Limit = 50
	case f.Limit > 200:
		f.Limit = 200
	}
}