feat(saga): implement enterprise-grade resilience architecture
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed

Fixes issues from code review of resilience implementation:

- Wire saga system in main.go (SagaRepository, SagaExecutor, SagaHandler)
- Fix CompletedSteps() to include skipped steps for dependency resolution
- Fix reverse loop bug in saga compensation (use standard swap pattern)
- Add circuit breaker state change callbacks for Prometheus metrics

Phase 1 (Build Resilience):
- Add failure:retry to all component Kaniko build steps
- Add preflight registry health check before builds
- Add services-deployed sync point to decouple docs from critical path

Phase 2 (API Resilience):
- Add pipeline retry endpoint (POST /projects/{id}/pipelines/{number}/retry)
- Wire circuit breakers with metrics callbacks
- Add /health/circuits endpoint for circuit breaker status

Phase 3 (Saga Engine):
- Full domain model (Saga, SagaStep, RetryPolicy, BackoffType)
- PostgreSQL saga repository with CRUD and step management
- Saga executor with retry, compensation, skip step support
- Saga API handlers with CRUD and control operations

Phase 4 (Observability):
- Add saga metrics (total, step_duration, retry, circuit_breaker_state)
- Add logging fields (saga_id, saga_name, step_name)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
jordan 2026-02-08 01:58:02 -07:00
parent 1a2a36e11b
commit f20fc6c51c
26 changed files with 2212 additions and 12 deletions

View File

@ -430,6 +430,11 @@ func main() {
sdlcHandler := handlers.NewSDLCHandler(sdlcService)
sdlcOrchestratorHandler := handlers.NewSDLCOrchestratorHandler(sdlcOrchestrator)
// Initialize saga system (resilient workflow orchestration)
sagaRepo := postgres.NewSagaRepository(database.DB)
sagaExecutor := service.NewSagaExecutor(sagaRepo, logger)
sagaHandler := handlers.NewSagaHandler(sagaRepo, sagaExecutor)
// SDLC generate service (async artifact generation via work queue)
apiBaseURL := envutil.GetEnv("RDEV_API_URL", "https://rdev.masq-ops.orchard9.ai")
sdlcGenerateService := service.NewSDLCGenerateService(
@ -502,6 +507,7 @@ func main() {
app.Router().Get("/health", healthHandler.Health)
app.Router().Get("/ready", healthHandler.Ready)
app.Router().Get("/health/circuits", healthHandler.Circuits)
// Register routes
projectsHandler.Mount(app.Router())
@ -529,6 +535,7 @@ func main() {
sdlcGenerateHandler.Mount(app.Router())
sdlcCallbackHandler.Mount(app.Router())
verifyHandler.Mount(app.Router())
sagaHandler.Mount(app.Router())
// Start queue processor worker (per-project command queue)
queueProcessor := worker.NewQueueProcessor(

View File

@ -0,0 +1,495 @@
// Package postgres provides PostgreSQL-based implementations of port interfaces.
package postgres
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"strings"
"github.com/lib/pq"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/port"
)
// SagaRepository implements port.SagaRepository using PostgreSQL.
type SagaRepository struct {
db *sql.DB
}
// NewSagaRepository creates a new PostgreSQL saga repository.
func NewSagaRepository(db *sql.DB) *SagaRepository {
return &SagaRepository{db: db}
}
// Ensure SagaRepository implements port.SagaRepository at compile time.
var _ port.SagaRepository = (*SagaRepository)(nil)
// ErrSagaNotFound is returned when a saga is not found.
var ErrSagaNotFound = errors.New("saga not found")
// Create creates a new saga with its steps.
func (r *SagaRepository) Create(ctx context.Context, saga *domain.Saga) error {
tx, err := r.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("begin transaction: %w", err)
}
defer func() { _ = tx.Rollback() }()
varsJSON, err := json.Marshal(saga.Vars)
if err != nil {
return fmt.Errorf("marshal vars: %w", err)
}
outputsJSON, err := json.Marshal(saga.Outputs)
if err != nil {
return fmt.Errorf("marshal outputs: %w", err)
}
// Insert saga
err = tx.QueryRowContext(ctx, `
INSERT INTO sagas (
name, status, definition, vars, outputs, current_step,
retry_count, max_retries, error
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
RETURNING id, created_at, updated_at
`,
saga.Name,
string(saga.Status),
nullString(saga.Definition),
varsJSON,
outputsJSON,
nullString(saga.CurrentStep),
saga.RetryCount,
saga.MaxRetries,
nullString(saga.Error),
).Scan(&saga.ID, &saga.CreatedAt, &saga.UpdatedAt)
if err != nil {
return fmt.Errorf("insert saga: %w", err)
}
// Insert steps
for i := range saga.Steps {
step := &saga.Steps[i]
step.SagaID = saga.ID
retryPolicyJSON, err := json.Marshal(step.RetryPolicy)
if err != nil {
return fmt.Errorf("marshal retry policy: %w", err)
}
configJSON, err := json.Marshal(step.Config)
if err != nil {
return fmt.Errorf("marshal config: %w", err)
}
err = tx.QueryRowContext(ctx, `
INSERT INTO saga_steps (
saga_id, name, status, action, depends_on, retry_policy,
compensate, config, retry_count
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
RETURNING id
`,
saga.ID,
step.Name,
string(step.Status),
step.Action,
pq.Array(step.DependsOn),
retryPolicyJSON,
nullString(step.Compensate),
configJSON,
step.RetryCount,
).Scan(&step.ID)
if err != nil {
return fmt.Errorf("insert step %s: %w", step.Name, err)
}
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("commit transaction: %w", err)
}
return nil
}
// Get returns a saga by ID, including all steps.
func (r *SagaRepository) Get(ctx context.Context, id string) (*domain.Saga, error) {
saga, err := r.getSaga(ctx, id)
if err != nil {
return nil, err
}
steps, err := r.getSteps(ctx, id)
if err != nil {
return nil, err
}
saga.Steps = steps
return saga, nil
}
// getSaga retrieves just the saga record (no steps).
func (r *SagaRepository) getSaga(ctx context.Context, id string) (*domain.Saga, error) {
row := r.db.QueryRowContext(ctx, `
SELECT id, name, status, definition, vars, outputs, current_step,
retry_count, max_retries, error, created_at, updated_at, completed_at
FROM sagas
WHERE id = $1
`, id)
return r.scanSaga(row)
}
// getSteps retrieves all steps for a saga.
func (r *SagaRepository) getSteps(ctx context.Context, sagaID string) ([]domain.SagaStep, error) {
rows, err := r.db.QueryContext(ctx, `
SELECT id, saga_id, name, status, action, depends_on, retry_policy,
compensate, config, output, error, retry_count, started_at, completed_at
FROM saga_steps
WHERE saga_id = $1
ORDER BY id
`, sagaID)
if err != nil {
return nil, fmt.Errorf("query steps: %w", err)
}
defer func() { _ = rows.Close() }()
var steps []domain.SagaStep
for rows.Next() {
step, err := r.scanStep(rows)
if err != nil {
return nil, err
}
steps = append(steps, *step)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("iterate steps: %w", err)
}
return steps, nil
}
// Update updates a saga's status and metadata (not steps).
func (r *SagaRepository) Update(ctx context.Context, saga *domain.Saga) error {
varsJSON, err := json.Marshal(saga.Vars)
if err != nil {
return fmt.Errorf("marshal vars: %w", err)
}
outputsJSON, err := json.Marshal(saga.Outputs)
if err != nil {
return fmt.Errorf("marshal outputs: %w", err)
}
res, err := r.db.ExecContext(ctx, `
UPDATE sagas SET
status = $2,
vars = $3,
outputs = $4,
current_step = $5,
retry_count = $6,
error = $7,
completed_at = $8
WHERE id = $1
`,
saga.ID,
string(saga.Status),
varsJSON,
outputsJSON,
nullString(saga.CurrentStep),
saga.RetryCount,
nullString(saga.Error),
nullTime(saga.CompletedAt),
)
if err != nil {
return fmt.Errorf("update saga: %w", err)
}
rows, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("rows affected: %w", err)
}
if rows == 0 {
return ErrSagaNotFound
}
return nil
}
// UpdateStep updates a single step's status and output.
func (r *SagaRepository) UpdateStep(ctx context.Context, step *domain.SagaStep) error {
var outputJSON []byte
var err error
if step.Output != nil {
outputJSON, err = json.Marshal(step.Output)
if err != nil {
return fmt.Errorf("marshal output: %w", err)
}
}
res, err := r.db.ExecContext(ctx, `
UPDATE saga_steps SET
status = $2,
output = $3,
error = $4,
retry_count = $5,
started_at = $6,
completed_at = $7
WHERE id = $1
`,
step.ID,
string(step.Status),
outputJSON,
nullString(step.Error),
step.RetryCount,
nullTime(step.StartedAt),
nullTime(step.CompletedAt),
)
if err != nil {
return fmt.Errorf("update step: %w", err)
}
rows, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("rows affected: %w", err)
}
if rows == 0 {
return fmt.Errorf("step %s not found", step.ID)
}
return nil
}
// List returns sagas matching the given filters.
func (r *SagaRepository) List(ctx context.Context, filters domain.SagaFilters) ([]*domain.Saga, error) {
filters.Normalize()
query := strings.Builder{}
query.WriteString(`
SELECT id, name, status, definition, vars, outputs, current_step,
retry_count, max_retries, error, created_at, updated_at, completed_at
FROM sagas
WHERE 1=1
`)
args := []any{}
argNum := 1
if filters.Name != "" {
fmt.Fprintf(&query, " AND name = $%d", argNum)
args = append(args, filters.Name)
argNum++
}
if filters.Status != "" {
fmt.Fprintf(&query, " AND status = $%d", argNum)
args = append(args, string(filters.Status))
argNum++
}
if !filters.Since.IsZero() {
fmt.Fprintf(&query, " AND created_at >= $%d", argNum)
args = append(args, filters.Since)
argNum++
}
query.WriteString(" ORDER BY created_at DESC")
fmt.Fprintf(&query, " LIMIT $%d", argNum)
args = append(args, filters.Limit)
rows, err := r.db.QueryContext(ctx, query.String(), args...)
if err != nil {
return nil, fmt.Errorf("query sagas: %w", err)
}
defer func() { _ = rows.Close() }()
var sagas []*domain.Saga
for rows.Next() {
saga, err := r.scanSagaRows(rows)
if err != nil {
return nil, err
}
sagas = append(sagas, saga)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("iterate sagas: %w", err)
}
return sagas, nil
}
// Delete removes a saga and its steps (cascade).
func (r *SagaRepository) Delete(ctx context.Context, id string) error {
res, err := r.db.ExecContext(ctx, `DELETE FROM sagas WHERE id = $1`, id)
if err != nil {
return fmt.Errorf("delete saga: %w", err)
}
rows, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("rows affected: %w", err)
}
if rows == 0 {
return ErrSagaNotFound
}
return nil
}
// GetPendingSteps returns steps ready to execute (no unmet dependencies).
func (r *SagaRepository) GetPendingSteps(ctx context.Context, sagaID string) ([]domain.SagaStep, error) {
// Get all steps
steps, err := r.getSteps(ctx, sagaID)
if err != nil {
return nil, err
}
// Build completed steps map (includes skipped steps for dependency resolution)
completed := make(map[string]bool)
for _, step := range steps {
if step.Status == domain.StepStatusCompleted || step.Status == domain.StepStatusSkipped {
completed[step.Name] = true
}
}
// Find runnable steps
var runnable []domain.SagaStep
for _, step := range steps {
if step.CanRun(completed) {
runnable = append(runnable, step)
}
}
return runnable, nil
}
// sagaScanner interface abstracts sql.Row and sql.Rows for scanning.
type sagaScanner interface {
Scan(dest ...any) error
}
// scanSaga scans a saga from a QueryRow result.
func (r *SagaRepository) scanSaga(row *sql.Row) (*domain.Saga, error) {
saga, err := r.scanSagaFrom(row)
if errors.Is(err, sql.ErrNoRows) {
return nil, ErrSagaNotFound
}
return saga, err
}
// scanSagaRows scans a saga from a Rows result.
func (r *SagaRepository) scanSagaRows(rows *sql.Rows) (*domain.Saga, error) {
return r.scanSagaFrom(rows)
}
// scanSagaFrom scans a saga from any scanner (Row or Rows).
func (r *SagaRepository) scanSagaFrom(scanner sagaScanner) (*domain.Saga, error) {
var saga domain.Saga
var status string
var definition, currentStep, sagaError sql.NullString
var completedAt sql.NullTime
var varsJSON, outputsJSON []byte
err := scanner.Scan(
&saga.ID, &saga.Name, &status, &definition, &varsJSON, &outputsJSON,
&currentStep, &saga.RetryCount, &saga.MaxRetries, &sagaError,
&saga.CreatedAt, &saga.UpdatedAt, &completedAt,
)
if err != nil {
return nil, fmt.Errorf("scan saga: %w", err)
}
saga.Status = domain.SagaStatus(status)
if definition.Valid {
saga.Definition = definition.String
}
if currentStep.Valid {
saga.CurrentStep = currentStep.String
}
if sagaError.Valid {
saga.Error = sagaError.String
}
if completedAt.Valid {
saga.CompletedAt = &completedAt.Time
}
if len(varsJSON) > 0 {
if err := json.Unmarshal(varsJSON, &saga.Vars); err != nil {
return nil, fmt.Errorf("unmarshal vars: %w", err)
}
}
if len(outputsJSON) > 0 {
if err := json.Unmarshal(outputsJSON, &saga.Outputs); err != nil {
return nil, fmt.Errorf("unmarshal outputs: %w", err)
}
}
return &saga, nil
}
// scanStep scans a step from a Rows result.
func (r *SagaRepository) scanStep(rows *sql.Rows) (*domain.SagaStep, error) {
var step domain.SagaStep
var status string
var compensate, stepError sql.NullString
var startedAt, completedAt sql.NullTime
var retryPolicyJSON, configJSON, outputJSON []byte
var dependsOn pq.StringArray
err := rows.Scan(
&step.ID,
&step.SagaID,
&step.Name,
&status,
&step.Action,
&dependsOn,
&retryPolicyJSON,
&compensate,
&configJSON,
&outputJSON,
&stepError,
&step.RetryCount,
&startedAt,
&completedAt,
)
if err != nil {
return nil, fmt.Errorf("scan step: %w", err)
}
step.Status = domain.StepStatus(status)
step.DependsOn = []string(dependsOn)
if compensate.Valid {
step.Compensate = compensate.String
}
if stepError.Valid {
step.Error = stepError.String
}
if startedAt.Valid {
step.StartedAt = &startedAt.Time
}
if completedAt.Valid {
step.CompletedAt = &completedAt.Time
}
if len(retryPolicyJSON) > 0 {
if err := json.Unmarshal(retryPolicyJSON, &step.RetryPolicy); err != nil {
return nil, fmt.Errorf("unmarshal retry policy: %w", err)
}
}
if len(configJSON) > 0 {
if err := json.Unmarshal(configJSON, &step.Config); err != nil {
return nil, fmt.Errorf("unmarshal config: %w", err)
}
}
if len(outputJSON) > 0 {
if err := json.Unmarshal(outputJSON, &step.Output); err != nil {
return nil, fmt.Errorf("unmarshal output: %w", err)
}
}
return &step, nil
}

View File

@ -2,8 +2,9 @@
# Add this step to your .woodpecker.yml
build-{{COMPONENT_NAME}}:
depends_on: [deps]
depends_on: [preflight]
image: woodpeckerci/plugin-kaniko
failure: retry
settings:
registry: registry.threesix.ai
repo: {{PROJECT_NAME}}/{{COMPONENT_NAME}}

View File

@ -2,8 +2,9 @@
# Add this step to your .woodpecker.yml
build-{{COMPONENT_NAME}}:
depends_on: [deps]
depends_on: [preflight]
image: woodpeckerci/plugin-kaniko
failure: retry
settings:
registry: registry.threesix.ai
repo: {{PROJECT_NAME}}/{{COMPONENT_NAME}}

View File

@ -2,8 +2,9 @@
# Add this step to your .woodpecker.yml
build-{{COMPONENT_NAME}}:
depends_on: [deps]
depends_on: [preflight]
image: woodpeckerci/plugin-kaniko
failure: retry
settings:
registry: registry.threesix.ai
repo: {{PROJECT_NAME}}/{{COMPONENT_NAME}}

View File

@ -2,8 +2,9 @@
# Add this step to your .woodpecker.yml
build-{{COMPONENT_NAME}}:
depends_on: [deps]
depends_on: [preflight]
image: woodpeckerci/plugin-kaniko
failure: retry
settings:
registry: registry.threesix.ai
repo: {{PROJECT_NAME}}/{{COMPONENT_NAME}}

View File

@ -2,8 +2,9 @@
# Add this step to your .woodpecker.yml
build-{{COMPONENT_NAME}}:
depends_on: [deps]
depends_on: [preflight]
image: woodpeckerci/plugin-kaniko
failure: retry
settings:
registry: registry.threesix.ai
repo: {{PROJECT_NAME}}/{{COMPONENT_NAME}}

View File

@ -34,6 +34,24 @@ steps:
branch: main
event: push
# Pre-flight registry health check before builds
preflight:
depends_on: [deps]
image: alpine/curl
commands:
- |
echo "==> Checking registry health before builds"
HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" --insecure --connect-timeout 10 https://registry.threesix.ai/v2/)
if [ "$HTTP_CODE" != "200" ] && [ "$HTTP_CODE" != "401" ]; then
echo "ERROR: Registry unhealthy (HTTP $HTTP_CODE), aborting build"
echo "Registry must return 200 or 401 (auth required) to proceed"
exit 1
fi
echo "==> Registry healthy (HTTP $HTTP_CODE)"
when:
branch: main
event: push
# COMPONENT_STEPS_BELOW
# Do not remove the marker above - component steps are inserted here
@ -48,10 +66,26 @@ steps:
branch: main
event: push
# Export OpenAPI specs from built services
# Runs after build-complete to ensure all services are ready
export-openapi:
# Services deployed sync point - fires after all deployments complete
# Use this to detect when services are ready (before docs generation)
# This allows wait_pipeline to succeed before docs steps run
services-deployed:
depends_on: [build-complete]
image: alpine:3.19
commands:
- echo "==> All services deployed successfully"
- echo " Pipeline is now considered successful for service deployment"
- echo " Documentation generation continues independently"
when:
branch: main
event: push
# Export OpenAPI specs from built services
# Runs after services-deployed to ensure all services are ready
# Uses failure:ignore so doc failures don't block pipeline success
export-openapi:
depends_on: [services-deployed]
failure: ignore
image: golang:1.23
commands:
- |

View File

@ -24,6 +24,7 @@ import (
"go.woodpecker-ci.org/woodpecker/v3/woodpecker-go/woodpecker"
"github.com/orchard9/rdev/internal/circuitbreaker"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/port"
)
@ -50,6 +51,7 @@ type Client struct {
client woodpecker.Client
url string
logger *slog.Logger
cb *circuitbreaker.CircuitBreaker
}
// NewClient creates a new Woodpecker client.
@ -83,6 +85,7 @@ func NewClient(url, token string, opts ...ClientOption) (*Client, error) {
client: client,
url: url,
logger: slog.Default(),
cb: circuitbreaker.GlobalRegistry.Get(circuitbreaker.NameWoodpecker),
}
// Apply options
@ -105,6 +108,23 @@ func WithLogger(logger *slog.Logger) ClientOption {
}
}
// WithCircuitBreaker sets a custom circuit breaker for the client.
func WithCircuitBreaker(cb *circuitbreaker.CircuitBreaker) ClientOption {
return func(c *Client) {
if cb != nil {
c.cb = cb
}
}
}
// executeWithCircuitBreaker wraps a function call with circuit breaker protection.
func (c *Client) executeWithCircuitBreaker(fn func() error) error {
if c.cb == nil {
return fn()
}
return c.cb.Execute(fn)
}
// ActivateRepo enables CI for a repository.
// The forge parameter is unused (Woodpecker determines this from its config).
// owner/repo must match the repository in the forge.

View File

@ -153,9 +153,14 @@ func (c *Client) TriggerBuild(ctx context.Context, owner, repo, branch string) (
return 0, fmt.Errorf("repo not found: %s", fullName)
}
// Create a new pipeline for the branch
pipeline, err := c.client.PipelineCreate(r.ID, &woodpecker.PipelineOptions{
Branch: branch,
// Create a new pipeline for the branch (with circuit breaker protection)
var pipeline *woodpecker.Pipeline
err = c.executeWithCircuitBreaker(func() error {
var createErr error
pipeline, createErr = c.client.PipelineCreate(r.ID, &woodpecker.PipelineOptions{
Branch: branch,
})
return createErr
})
if err != nil {
return 0, fmt.Errorf("failed to trigger build: %w", err)
@ -165,6 +170,36 @@ func (c *Client) TriggerBuild(ctx context.Context, owner, repo, branch string) (
return pipeline.Number, nil
}
// RetryPipeline restarts a failed or stopped pipeline.
func (c *Client) RetryPipeline(ctx context.Context, owner, repo string, number int64) (*domain.CIPipeline, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
fullName := owner + "/" + repo
r, err := c.client.RepoLookup(fullName)
if err != nil {
return nil, fmt.Errorf("repo not found: %s", fullName)
}
// Restart the pipeline using PipelineStart (with circuit breaker protection)
var pipeline *woodpecker.Pipeline
err = c.executeWithCircuitBreaker(func() error {
var startErr error
pipeline, startErr = c.client.PipelineStart(r.ID, number, woodpecker.PipelineStartOptions{})
return startErr
})
if err != nil {
return nil, fmt.Errorf("failed to retry pipeline %d: %w", number, err)
}
c.logger.Info("pipeline retried", "repo", fullName, "pipeline", number, "new_status", pipeline.Status)
return pipelineFromWoodpecker(pipeline), nil
}
// pipelineFromWoodpecker converts a woodpecker.Pipeline to domain.CIPipeline.
func pipelineFromWoodpecker(p *woodpecker.Pipeline) *domain.CIPipeline {
var started, finished time.Time

View File

@ -41,8 +41,14 @@ var (
ErrCircuitOpen = errors.New("circuit breaker is open")
)
// StateChangeCallback is called when the circuit breaker state changes.
type StateChangeCallback func(name string, from, to State)
// Config configures the circuit breaker behavior.
type Config struct {
// Name is the identifier for this circuit breaker (used in callbacks).
Name string
// FailureThreshold is the number of consecutive failures before opening.
// Default: 5
FailureThreshold int
@ -54,6 +60,9 @@ type Config struct {
// HalfOpenRequests is how many requests to allow in half-open state.
// Default: 1
HalfOpenRequests int
// OnStateChange is called when the circuit breaker state changes.
OnStateChange StateChangeCallback
}
// DefaultConfig returns sensible defaults.
@ -119,8 +128,12 @@ func (cb *CircuitBreaker) canExecute() bool {
case Open:
// Check if reset timeout has passed
if time.Since(cb.lastFailure) > cb.cfg.ResetTimeout {
oldState := cb.state
cb.state = HalfOpen
cb.halfOpenRequests = 0
if cb.cfg.OnStateChange != nil {
cb.cfg.OnStateChange(cb.cfg.Name, oldState, cb.state)
}
return true
}
return false
@ -155,6 +168,7 @@ func (cb *CircuitBreaker) onFailure() {
cb.successes = 0
cb.lastFailure = time.Now()
oldState := cb.state
switch cb.state {
case Closed:
if cb.failures >= cb.cfg.FailureThreshold {
@ -163,12 +177,17 @@ func (cb *CircuitBreaker) onFailure() {
case HalfOpen:
cb.state = Open
}
if oldState != cb.state && cb.cfg.OnStateChange != nil {
cb.cfg.OnStateChange(cb.cfg.Name, oldState, cb.state)
}
}
// onSuccess handles a successful operation.
func (cb *CircuitBreaker) onSuccess() {
cb.successes++
oldState := cb.state
switch cb.state {
case Closed:
cb.failures = 0
@ -177,6 +196,10 @@ func (cb *CircuitBreaker) onSuccess() {
cb.state = Closed
cb.failures = 0
}
if oldState != cb.state && cb.cfg.OnStateChange != nil {
cb.cfg.OnStateChange(cb.cfg.Name, oldState, cb.state)
}
}
// State returns the current circuit state.

View File

@ -0,0 +1,160 @@
// Package circuitbreaker provides protection against cascading failures.
package circuitbreaker
import (
"sync"
"time"
"github.com/orchard9/rdev/internal/metrics"
)
// Registry manages named circuit breakers.
type Registry struct {
mu sync.RWMutex
breakers map[string]*CircuitBreaker
configs map[string]Config
}
// NewRegistry creates a new circuit breaker registry.
func NewRegistry() *Registry {
return &Registry{
breakers: make(map[string]*CircuitBreaker),
configs: make(map[string]Config),
}
}
// Get returns the circuit breaker for the given name, creating one if needed.
func (r *Registry) Get(name string) *CircuitBreaker {
r.mu.RLock()
cb, ok := r.breakers[name]
r.mu.RUnlock()
if ok {
return cb
}
r.mu.Lock()
defer r.mu.Unlock()
// Double-check after acquiring write lock
if cb, ok = r.breakers[name]; ok {
return cb
}
// Use custom config if set, otherwise use defaults
cfg, ok := r.configs[name]
if !ok {
cfg = DefaultConfig()
}
// Ensure name is set for callbacks
cfg.Name = name
// Set up metrics callback if not already configured
if cfg.OnStateChange == nil {
cfg.OnStateChange = defaultStateChangeCallback
}
cb = New(cfg)
r.breakers[name] = cb
return cb
}
// Configure sets the configuration for a named circuit breaker.
// Must be called before Get() for the configuration to take effect.
func (r *Registry) Configure(name string, cfg Config) {
r.mu.Lock()
defer r.mu.Unlock()
r.configs[name] = cfg
}
// StatusEntry contains the status of a single circuit breaker.
type StatusEntry struct {
Name string `json:"name"`
State string `json:"state"`
Failures int `json:"failures"`
LastFailure *time.Time `json:"last_failure,omitempty"`
}
// AllStatus returns the status of all registered circuit breakers.
func (r *Registry) AllStatus() []StatusEntry {
r.mu.RLock()
defer r.mu.RUnlock()
entries := make([]StatusEntry, 0, len(r.breakers))
for name, cb := range r.breakers {
stats := cb.Stats()
entry := StatusEntry{
Name: name,
State: stats.State.String(),
Failures: stats.Failures,
}
if !stats.LastFailure.IsZero() {
t := stats.LastFailure.UTC()
entry.LastFailure = &t
}
entries = append(entries, entry)
}
return entries
}
// Reset resets a specific circuit breaker by name.
func (r *Registry) Reset(name string) bool {
r.mu.RLock()
cb, ok := r.breakers[name]
r.mu.RUnlock()
if !ok {
return false
}
cb.Reset()
return true
}
// ResetAll resets all circuit breakers.
func (r *Registry) ResetAll() {
r.mu.RLock()
defer r.mu.RUnlock()
for _, cb := range r.breakers {
cb.Reset()
}
}
// GlobalRegistry is the default global circuit breaker registry.
var GlobalRegistry = NewRegistry()
// defaultStateChangeCallback updates Prometheus metrics when circuit breaker state changes.
func defaultStateChangeCallback(name string, from, to State) {
// State values: 0=closed, 1=half-open, 2=open
metrics.SetCircuitBreakerState(name, int(to))
}
// Known circuit breaker names for external systems.
const (
NameWoodpecker = "woodpecker"
NameGitea = "gitea"
NameRegistry = "registry"
NameCloudflare = "cloudflare"
)
func init() {
// Configure default circuit breakers with appropriate settings
GlobalRegistry.Configure(NameWoodpecker, Config{
FailureThreshold: 3,
ResetTimeout: 30 * time.Second,
HalfOpenRequests: 1,
})
GlobalRegistry.Configure(NameGitea, Config{
FailureThreshold: 3,
ResetTimeout: 30 * time.Second,
HalfOpenRequests: 1,
})
GlobalRegistry.Configure(NameRegistry, Config{
FailureThreshold: 5,
ResetTimeout: 60 * time.Second,
HalfOpenRequests: 1,
})
GlobalRegistry.Configure(NameCloudflare, Config{
FailureThreshold: 5,
ResetTimeout: 60 * time.Second,
HalfOpenRequests: 1,
})
}

View File

@ -0,0 +1,78 @@
-- Saga pattern for resilient multi-step workflows
-- Sagas track multi-step operations with retry and compensation support
-- +goose Up
-- Main sagas table
CREATE TABLE sagas (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
definition TEXT,
vars JSONB NOT NULL DEFAULT '{}',
outputs JSONB NOT NULL DEFAULT '{}',
current_step TEXT,
retry_count INT NOT NULL DEFAULT 0,
max_retries INT NOT NULL DEFAULT 3,
error TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
completed_at TIMESTAMPTZ,
-- Constraints
CONSTRAINT valid_status CHECK (status IN ('pending', 'running', 'completed', 'failed', 'compensating', 'compensated'))
);
-- Saga steps table
CREATE TABLE saga_steps (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
saga_id UUID NOT NULL REFERENCES sagas(id) ON DELETE CASCADE,
name TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
action TEXT NOT NULL,
depends_on TEXT[] NOT NULL DEFAULT '{}',
retry_policy JSONB NOT NULL DEFAULT '{"max_attempts": 3, "backoff_type": "exponential", "initial_delay": "5s", "max_delay": "60s"}',
compensate TEXT,
config JSONB NOT NULL DEFAULT '{}',
output JSONB,
error TEXT,
retry_count INT NOT NULL DEFAULT 0,
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
-- Constraints
CONSTRAINT unique_step_per_saga UNIQUE (saga_id, name),
CONSTRAINT valid_step_status CHECK (status IN ('pending', 'running', 'completed', 'failed', 'skipped'))
);
-- Indexes for common queries
CREATE INDEX idx_sagas_status ON sagas(status) WHERE status IN ('pending', 'running', 'compensating');
CREATE INDEX idx_sagas_name ON sagas(name);
CREATE INDEX idx_sagas_created_at ON sagas(created_at DESC);
CREATE INDEX idx_saga_steps_saga_id ON saga_steps(saga_id);
CREATE INDEX idx_saga_steps_status ON saga_steps(saga_id, status);
-- Update timestamp trigger
CREATE OR REPLACE FUNCTION update_saga_updated_at()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trigger_saga_updated_at
BEFORE UPDATE ON sagas
FOR EACH ROW
EXECUTE FUNCTION update_saga_updated_at();
-- +goose Down
DROP TRIGGER IF EXISTS trigger_saga_updated_at ON sagas;
DROP FUNCTION IF EXISTS update_saga_updated_at();
DROP INDEX IF EXISTS idx_saga_steps_status;
DROP INDEX IF EXISTS idx_saga_steps_saga_id;
DROP INDEX IF EXISTS idx_sagas_created_at;
DROP INDEX IF EXISTS idx_sagas_name;
DROP INDEX IF EXISTS idx_sagas_status;
DROP TABLE IF EXISTS saga_steps;
DROP TABLE IF EXISTS sagas;

245
internal/domain/saga.go Normal file
View File

@ -0,0 +1,245 @@
package domain
import (
"time"
)
// SagaStatus represents the status of a saga execution.
type SagaStatus string
const (
// SagaStatusPending indicates the saga hasn't started yet.
SagaStatusPending SagaStatus = "pending"
// SagaStatusRunning indicates the saga is executing.
SagaStatusRunning SagaStatus = "running"
// SagaStatusCompleted indicates the saga finished successfully.
SagaStatusCompleted SagaStatus = "completed"
// SagaStatusFailed indicates the saga failed and may need compensation.
SagaStatusFailed SagaStatus = "failed"
// SagaStatusCompensating indicates compensation is in progress.
SagaStatusCompensating SagaStatus = "compensating"
// SagaStatusCompensated indicates compensation completed successfully.
SagaStatusCompensated SagaStatus = "compensated"
)
// IsValid returns true if the status is known.
func (s SagaStatus) IsValid() bool {
switch s {
case SagaStatusPending, SagaStatusRunning, SagaStatusCompleted,
SagaStatusFailed, SagaStatusCompensating, SagaStatusCompensated:
return true
}
return false
}
// IsTerminal returns true if the status is a final state.
func (s SagaStatus) IsTerminal() bool {
return s == SagaStatusCompleted || s == SagaStatusFailed || s == SagaStatusCompensated
}
// StepStatus represents the status of a saga step.
type StepStatus string
const (
StepStatusPending StepStatus = "pending"
StepStatusRunning StepStatus = "running"
StepStatusCompleted StepStatus = "completed"
StepStatusFailed StepStatus = "failed"
StepStatusSkipped StepStatus = "skipped"
)
// IsValid returns true if the status is known.
func (s StepStatus) IsValid() bool {
switch s {
case StepStatusPending, StepStatusRunning, StepStatusCompleted,
StepStatusFailed, StepStatusSkipped:
return true
}
return false
}
// IsTerminal returns true if the status is a final state.
func (s StepStatus) IsTerminal() bool {
return s == StepStatusCompleted || s == StepStatusFailed || s == StepStatusSkipped
}
// BackoffType represents the type of retry backoff.
type BackoffType string
const (
BackoffNone BackoffType = "none"
BackoffLinear BackoffType = "linear"
BackoffExponential BackoffType = "exponential"
)
// RetryPolicy defines how a step should be retried on failure.
type RetryPolicy struct {
// MaxAttempts is the maximum number of retry attempts (including initial).
MaxAttempts int `json:"max_attempts"`
// BackoffType is the type of backoff between retries.
BackoffType BackoffType `json:"backoff_type"`
// InitialDelay is the initial delay between retries.
InitialDelay time.Duration `json:"initial_delay"`
// MaxDelay is the maximum delay between retries.
MaxDelay time.Duration `json:"max_delay"`
}
// DefaultRetryPolicy returns a sensible default retry policy.
func DefaultRetryPolicy() RetryPolicy {
return RetryPolicy{
MaxAttempts: 3,
BackoffType: BackoffExponential,
InitialDelay: 5 * time.Second,
MaxDelay: 60 * time.Second,
}
}
// SagaStep represents a single step in a saga.
type SagaStep struct {
// ID is the unique identifier for this step instance.
ID string `json:"id"`
// SagaID is the saga this step belongs to.
SagaID string `json:"saga_id"`
// Name is the step name (unique within the saga).
Name string `json:"name"`
// Status is the current step status.
Status StepStatus `json:"status"`
// Action is the step action type (api, wait_pipeline, wait_build, shell).
Action string `json:"action"`
// DependsOn lists step names that must complete before this step can run.
DependsOn []string `json:"depends_on,omitempty"`
// RetryPolicy defines the retry behavior for this step.
RetryPolicy RetryPolicy `json:"retry_policy"`
// Compensate is the name of the compensation step to run on rollback.
Compensate string `json:"compensate,omitempty"`
// Config contains action-specific configuration.
Config map[string]any `json:"config,omitempty"`
// Output contains the step output after completion.
Output map[string]any `json:"output,omitempty"`
// Error contains the error message if the step failed.
Error string `json:"error,omitempty"`
// RetryCount is the number of times this step has been retried.
RetryCount int `json:"retry_count"`
// StartedAt is when the step started executing.
StartedAt *time.Time `json:"started_at,omitempty"`
// CompletedAt is when the step finished (success or failure).
CompletedAt *time.Time `json:"completed_at,omitempty"`
}
// CanRun returns true if this step is ready to execute.
func (s *SagaStep) CanRun(completedSteps map[string]bool) bool {
if s.Status != StepStatusPending {
return false
}
for _, dep := range s.DependsOn {
if !completedSteps[dep] {
return false
}
}
return true
}
// Saga represents a multi-step workflow with compensation support.
type Saga struct {
// ID is the unique identifier for this saga.
ID string `json:"id"`
// Name is the saga name (from the definition).
Name string `json:"name"`
// Status is the current saga status.
Status SagaStatus `json:"status"`
// Definition is the YAML definition used to create this saga.
Definition string `json:"definition,omitempty"`
// Vars contains template variables for step configuration.
Vars map[string]string `json:"vars,omitempty"`
// Outputs contains outputs from completed steps, keyed by step name.
Outputs map[string]map[string]any `json:"outputs,omitempty"`
// CurrentStep is the name of the currently executing step.
CurrentStep string `json:"current_step,omitempty"`
// RetryCount is the number of times the saga has been retried.
RetryCount int `json:"retry_count"`
// MaxRetries is the maximum number of saga-level retries.
MaxRetries int `json:"max_retries"`
// Error contains the error message if the saga failed.
Error string `json:"error,omitempty"`
// Steps contains all steps in this saga.
Steps []SagaStep `json:"steps,omitempty"`
// CreatedAt is when the saga was created.
CreatedAt time.Time `json:"created_at"`
// UpdatedAt is when the saga was last updated.
UpdatedAt time.Time `json:"updated_at"`
// CompletedAt is when the saga finished (success, failure, or compensation).
CompletedAt *time.Time `json:"completed_at,omitempty"`
}
// CompletedSteps returns a map of step names that have completed (successfully or skipped).
// Skipped steps are considered complete for dependency resolution purposes.
func (s *Saga) CompletedSteps() map[string]bool {
completed := make(map[string]bool)
for _, step := range s.Steps {
if step.Status == StepStatusCompleted || step.Status == StepStatusSkipped {
completed[step.Name] = true
}
}
return completed
}
// GetStep returns a step by name, or nil if not found.
func (s *Saga) GetStep(name string) *SagaStep {
for i := range s.Steps {
if s.Steps[i].Name == name {
return &s.Steps[i]
}
}
return nil
}
// FailedStep returns the first failed step, or nil if none failed.
func (s *Saga) FailedStep() *SagaStep {
for i := range s.Steps {
if s.Steps[i].Status == StepStatusFailed {
return &s.Steps[i]
}
}
return nil
}
// RunnableSteps returns steps that are ready to execute.
func (s *Saga) RunnableSteps() []SagaStep {
completed := s.CompletedSteps()
var runnable []SagaStep
for _, step := range s.Steps {
if step.CanRun(completed) {
runnable = append(runnable, step)
}
}
return runnable
}
// SagaFilters specifies criteria for listing sagas.
type SagaFilters struct {
// Name filters by saga name.
Name string
// Status filters by saga status.
Status SagaStatus
// Since filters sagas created after this time.
Since time.Time
// Limit is the maximum number of sagas to return.
Limit int
}
// DefaultSagaFilters returns filters with default values.
func DefaultSagaFilters() SagaFilters {
return SagaFilters{
Limit: 50,
}
}
// Normalize applies defaults and limits to the filters.
func (f *SagaFilters) Normalize() {
if f.Limit <= 0 {
f.Limit = 50
}
if f.Limit > 200 {
f.Limit = 200
}
}

View File

@ -22,6 +22,17 @@ const (
WebhookEventCommandFailed WebhookEventType = "command.failed"
WebhookEventPodReady WebhookEventType = "pod.ready"
WebhookEventPodFailed WebhookEventType = "pod.failed"
// Saga events
WebhookEventSagaStarted WebhookEventType = "saga.started"
WebhookEventSagaStepStarted WebhookEventType = "saga.step.started"
WebhookEventSagaStepCompleted WebhookEventType = "saga.step.completed"
WebhookEventSagaStepFailed WebhookEventType = "saga.step.failed"
WebhookEventSagaStepRetrying WebhookEventType = "saga.step.retrying"
WebhookEventSagaCompleted WebhookEventType = "saga.completed"
WebhookEventSagaFailed WebhookEventType = "saga.failed"
WebhookEventSagaCompensating WebhookEventType = "saga.compensating"
WebhookEventSagaCompensated WebhookEventType = "saga.compensated"
)
// AllWebhookEventTypes lists all valid webhook event types.
@ -31,6 +42,15 @@ var AllWebhookEventTypes = []WebhookEventType{
WebhookEventCommandFailed,
WebhookEventPodReady,
WebhookEventPodFailed,
WebhookEventSagaStarted,
WebhookEventSagaStepStarted,
WebhookEventSagaStepCompleted,
WebhookEventSagaStepFailed,
WebhookEventSagaStepRetrying,
WebhookEventSagaCompleted,
WebhookEventSagaFailed,
WebhookEventSagaCompensating,
WebhookEventSagaCompensated,
}
// IsValid checks if a webhook event type is valid.
@ -136,6 +156,18 @@ type PodEventData struct {
Timestamp time.Time `json:"timestamp"`
}
// SagaEventData is the data structure for saga-related webhook events.
type SagaEventData struct {
SagaID string `json:"saga_id"`
SagaName string `json:"saga_name"`
Status string `json:"status"`
StepName string `json:"step_name,omitempty"`
StepStatus string `json:"step_status,omitempty"`
RetryCount int `json:"retry_count,omitempty"`
Error string `json:"error,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
// WebhookFilters contains filter options for listing webhook deliveries.
type WebhookDeliveryFilters struct {
EventType *WebhookEventType // Filter by event type

View File

@ -8,6 +8,7 @@ import (
"strings"
"time"
"github.com/orchard9/rdev/internal/circuitbreaker"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/metrics"
"github.com/orchard9/rdev/internal/port"
@ -306,3 +307,17 @@ type ReadinessResponse struct {
Service string `json:"service"`
Checks map[string]CheckResult `json:"checks,omitempty"`
}
// CircuitsResponse is the response for the /health/circuits endpoint.
type CircuitsResponse struct {
Circuits []circuitbreaker.StatusEntry `json:"circuits"`
}
// Circuits returns the status of all circuit breakers.
// GET /health/circuits
func (h *HealthHandler) Circuits(w http.ResponseWriter, r *http.Request) {
entries := circuitbreaker.GlobalRegistry.AllStatus()
api.WriteSuccess(w, r, CircuitsResponse{
Circuits: entries,
})
}

View File

@ -122,13 +122,15 @@ func (h *InfrastructureHandler) Mount(r api.Router) {
r.With(auth.RequireScope(auth.ScopeProjectsExecute, auth.ScopeAdmin)).
Delete("/projects/{id}/domains/{domain}", h.RemoveDomainAlias)
// CI pipeline endpoints (read-only)
// CI pipeline endpoints
r.With(auth.RequireScope(auth.ScopeProjectsRead, auth.ScopeAdmin)).
Get("/projects/{id}/pipelines", h.ListPipelines)
r.With(auth.RequireScope(auth.ScopeProjectsRead, auth.ScopeAdmin)).
Get("/projects/{id}/pipelines/{number}", h.GetPipeline)
r.With(auth.RequireScope(auth.ScopeProjectsRead, auth.ScopeAdmin)).
Get("/projects/{id}/pipelines/{number}/steps", h.GetPipelineSteps)
r.With(auth.RequireScope(auth.ScopeProjectsExecute, auth.ScopeAdmin)).
Post("/projects/{id}/pipelines/{number}/retry", h.RetryPipeline)
}
// CreateRepoRequest is the request body for POST /projects/{id}/repo.

View File

@ -214,3 +214,48 @@ func (h *InfrastructureHandler) GetPipelineSteps(w http.ResponseWriter, r *http.
Steps: respSteps,
})
}
// RetryPipeline restarts a failed or stopped pipeline.
// POST /projects/{id}/pipelines/{number}/retry
func (h *InfrastructureHandler) RetryPipeline(w http.ResponseWriter, r *http.Request) {
projectID := chi.URLParam(r, "id")
numberStr := chi.URLParam(r, "number")
ctx, cancel := context.WithTimeout(r.Context(), TimeoutStandard)
defer cancel()
if err := validateProjectID(projectID); err != nil {
api.WriteBadRequest(w, r, err.Error())
return
}
number, err := strconv.ParseInt(numberStr, 10, 64)
if err != nil {
api.WriteBadRequest(w, r, "invalid pipeline number")
return
}
if h.ciProvider == nil {
api.WriteInternalError(w, r, "CI provider not configured")
return
}
p, err := h.ciProvider.RetryPipeline(ctx, h.defaultGitOwner, projectID, number)
if err != nil {
api.WriteInternalError(w, r, fmt.Sprintf("failed to retry pipeline: %v", err))
return
}
api.WriteSuccess(w, r, PipelineResponse{
ID: p.ID,
Number: p.Number,
Status: p.Status,
Event: p.Event,
Branch: p.Branch,
Commit: p.Commit,
Message: p.Message,
Author: p.Author,
Started: formatTime(p.Started),
Finished: formatTime(p.Finished),
Errors: mapPipelineErrors(p.Errors),
})
}

View File

@ -96,6 +96,28 @@ func (m *mockCIProvider) GetPipelineSteps(_ context.Context, owner, repo string,
return nil, fmt.Errorf("pipeline %d not found", number)
}
func (m *mockCIProvider) RetryPipeline(_ context.Context, owner, repo string, number int64) (*domain.CIPipeline, error) {
if m.err != nil {
return nil, m.err
}
key := owner + "/" + repo
for _, p := range m.pipelines[key] {
if p.Number == number {
// Simulate retry by returning the pipeline with running status
return &domain.CIPipeline{
ID: p.ID,
Number: p.Number,
Status: "running",
Event: p.Event,
Branch: p.Branch,
Commit: p.Commit,
Author: p.Author,
}, nil
}
}
return nil, fmt.Errorf("pipeline %d not found", number)
}
func setupInfraHandlerWithCI(ci port.CIProvider) chi.Router {
h := NewInfrastructureHandler(nil, nil, nil, nil, ci, nil, InfrastructureConfig{
DefaultGitOwner: "threesix",

438
internal/handlers/saga.go Normal file
View File

@ -0,0 +1,438 @@
// Package handlers provides HTTP handlers for the rdev API.
package handlers
import (
"context"
"net/http"
"github.com/go-chi/chi/v5"
"github.com/orchard9/rdev/internal/adapter/postgres"
"github.com/orchard9/rdev/internal/auth"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/port"
"github.com/orchard9/rdev/internal/validate"
"github.com/orchard9/rdev/pkg/api"
)
// SagaHandler handles saga API endpoints.
type SagaHandler struct {
repo port.SagaRepository
executor port.SagaExecutor
}
// NewSagaHandler creates a new saga handler.
func NewSagaHandler(repo port.SagaRepository, executor port.SagaExecutor) *SagaHandler {
return &SagaHandler{
repo: repo,
executor: executor,
}
}
// Mount registers the saga routes.
func (h *SagaHandler) Mount(r api.Router) {
r.Route("/sagas", func(r chi.Router) {
// Create and list sagas
r.With(auth.RequireScope(auth.ScopeProjectsExecute, auth.ScopeAdmin)).Post("/", h.Create)
r.With(auth.RequireScope(auth.ScopeProjectsRead, auth.ScopeAdmin)).Get("/", h.List)
// Single saga operations
r.With(auth.RequireScope(auth.ScopeProjectsRead, auth.ScopeAdmin)).Get("/{id}", h.Get)
r.With(auth.RequireScope(auth.ScopeProjectsExecute, auth.ScopeAdmin)).Delete("/{id}", h.Delete)
// Saga control operations
r.With(auth.RequireScope(auth.ScopeProjectsExecute, auth.ScopeAdmin)).Post("/{id}/retry", h.Retry)
r.With(auth.RequireScope(auth.ScopeProjectsExecute, auth.ScopeAdmin)).Post("/{id}/rollback", h.Rollback)
// Step operations
r.With(auth.RequireScope(auth.ScopeProjectsExecute, auth.ScopeAdmin)).Post("/{id}/steps/{step}/retry", h.RetryStep)
r.With(auth.RequireScope(auth.ScopeProjectsExecute, auth.ScopeAdmin)).Post("/{id}/steps/{step}/skip", h.SkipStep)
})
}
// CreateSagaRequest is the request body for POST /sagas.
type CreateSagaRequest struct {
Name string `json:"name"`
Definition string `json:"definition,omitempty"`
Vars map[string]string `json:"vars,omitempty"`
MaxRetries int `json:"max_retries,omitempty"`
Steps []CreateStepSpec `json:"steps,omitempty"`
}
// CreateStepSpec specifies a step in the saga.
type CreateStepSpec struct {
Name string `json:"name"`
Action string `json:"action"`
DependsOn []string `json:"depends_on,omitempty"`
Compensate string `json:"compensate,omitempty"`
Config map[string]any `json:"config,omitempty"`
RetryPolicy *domain.RetryPolicy `json:"retry_policy,omitempty"`
}
// SagaResponse is the API response for a saga.
type SagaResponse struct {
ID string `json:"id"`
Name string `json:"name"`
Status string `json:"status"`
CurrentStep string `json:"current_step,omitempty"`
RetryCount int `json:"retry_count"`
MaxRetries int `json:"max_retries"`
Error string `json:"error,omitempty"`
Steps []StepResponse `json:"steps,omitempty"`
Outputs map[string]map[string]any `json:"outputs,omitempty"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
CompletedAt string `json:"completed_at,omitempty"`
}
// StepResponse is the API response for a saga step.
type StepResponse struct {
ID string `json:"id"`
Name string `json:"name"`
Status string `json:"status"`
Action string `json:"action"`
DependsOn []string `json:"depends_on,omitempty"`
Compensate string `json:"compensate,omitempty"`
RetryCount int `json:"retry_count"`
Output map[string]any `json:"output,omitempty"`
Error string `json:"error,omitempty"`
StartedAt string `json:"started_at,omitempty"`
CompletedAt string `json:"completed_at,omitempty"`
}
// Create creates and starts a new saga.
// POST /sagas
func (h *SagaHandler) Create(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), TimeoutOrchestration)
defer cancel()
var req CreateSagaRequest
if err := api.DecodeJSON(r, &req); err != nil {
api.WriteBadRequest(w, r, "invalid request body")
return
}
v := validate.New()
v.Required(req.Name, "name")
if len(req.Steps) > 0 {
for i, step := range req.Steps {
v.Required(step.Name, "steps["+string(rune('0'+i))+"].name")
v.Required(step.Action, "steps["+string(rune('0'+i))+"].action")
}
}
if err := v.Error(); err != nil {
api.WriteBadRequest(w, r, err.Error())
return
}
// Build saga from request
saga := &domain.Saga{
Name: req.Name,
Status: domain.SagaStatusPending,
Definition: req.Definition,
Vars: req.Vars,
MaxRetries: req.MaxRetries,
Outputs: make(map[string]map[string]any),
}
if saga.MaxRetries <= 0 {
saga.MaxRetries = 3
}
// Build steps
for _, spec := range req.Steps {
step := domain.SagaStep{
Name: spec.Name,
Status: domain.StepStatusPending,
Action: spec.Action,
DependsOn: spec.DependsOn,
Compensate: spec.Compensate,
Config: spec.Config,
}
if spec.RetryPolicy != nil {
step.RetryPolicy = *spec.RetryPolicy
} else {
step.RetryPolicy = domain.DefaultRetryPolicy()
}
saga.Steps = append(saga.Steps, step)
}
// Create saga in database
if err := h.repo.Create(ctx, saga); err != nil {
api.WriteInternalError(w, r, "failed to create saga: "+err.Error())
return
}
// Start execution asynchronously (don't wait for completion)
go func() {
execCtx := context.WithoutCancel(ctx)
if err := h.executor.Execute(execCtx, saga); err != nil {
// Error is already recorded in the saga
_ = err
}
}()
api.WriteCreated(w, r, sagaToResponse(saga))
}
// Get returns a saga by ID.
// GET /sagas/{id}
func (h *SagaHandler) Get(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), TimeoutLookup)
defer cancel()
id := chi.URLParam(r, "id")
if id == "" {
api.WriteBadRequest(w, r, "id is required")
return
}
saga, err := h.repo.Get(ctx, id)
if err != nil {
if err == postgres.ErrSagaNotFound {
api.WriteNotFound(w, r, "saga not found")
return
}
api.WriteInternalError(w, r, "failed to get saga: "+err.Error())
return
}
api.WriteSuccess(w, r, sagaToResponse(saga))
}
// List returns sagas matching filters.
// GET /sagas
func (h *SagaHandler) List(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), TimeoutLookup)
defer cancel()
filters := domain.SagaFilters{
Name: r.URL.Query().Get("name"),
Status: domain.SagaStatus(r.URL.Query().Get("status")),
}
filters.Normalize()
sagas, err := h.repo.List(ctx, filters)
if err != nil {
api.WriteInternalError(w, r, "failed to list sagas: "+err.Error())
return
}
resp := make([]SagaResponse, len(sagas))
for i, saga := range sagas {
resp[i] = sagaToResponse(saga)
}
api.WriteSuccess(w, r, resp)
}
// Delete removes a saga.
// DELETE /sagas/{id}
func (h *SagaHandler) Delete(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), TimeoutStandard)
defer cancel()
id := chi.URLParam(r, "id")
if id == "" {
api.WriteBadRequest(w, r, "id is required")
return
}
if err := h.repo.Delete(ctx, id); err != nil {
if err == postgres.ErrSagaNotFound {
api.WriteNotFound(w, r, "saga not found")
return
}
api.WriteInternalError(w, r, "failed to delete saga: "+err.Error())
return
}
api.WriteSuccess(w, r, map[string]string{
"status": "deleted",
"id": id,
})
}
// Retry resumes a failed saga from the last failed step.
// POST /sagas/{id}/retry
func (h *SagaHandler) Retry(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), TimeoutOrchestration)
defer cancel()
id := chi.URLParam(r, "id")
if id == "" {
api.WriteBadRequest(w, r, "id is required")
return
}
// Start resumption asynchronously
go func() {
execCtx := context.WithoutCancel(ctx)
if err := h.executor.Resume(execCtx, id); err != nil {
// Error is already recorded in the saga
_ = err
}
}()
// Return current saga state
saga, err := h.repo.Get(ctx, id)
if err != nil {
if err == postgres.ErrSagaNotFound {
api.WriteNotFound(w, r, "saga not found")
return
}
api.WriteInternalError(w, r, "failed to get saga: "+err.Error())
return
}
api.WriteSuccess(w, r, sagaToResponse(saga))
}
// Rollback triggers compensation for a failed saga.
// POST /sagas/{id}/rollback
func (h *SagaHandler) Rollback(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), TimeoutOrchestration)
defer cancel()
id := chi.URLParam(r, "id")
if id == "" {
api.WriteBadRequest(w, r, "id is required")
return
}
// Start compensation asynchronously
go func() {
execCtx := context.WithoutCancel(ctx)
if err := h.executor.Compensate(execCtx, id); err != nil {
// Error is already recorded in the saga
_ = err
}
}()
// Return current saga state
saga, err := h.repo.Get(ctx, id)
if err != nil {
if err == postgres.ErrSagaNotFound {
api.WriteNotFound(w, r, "saga not found")
return
}
api.WriteInternalError(w, r, "failed to get saga: "+err.Error())
return
}
api.WriteSuccess(w, r, sagaToResponse(saga))
}
// RetryStep retries a specific failed step.
// POST /sagas/{id}/steps/{step}/retry
func (h *SagaHandler) RetryStep(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), TimeoutOrchestration)
defer cancel()
id := chi.URLParam(r, "id")
step := chi.URLParam(r, "step")
if id == "" || step == "" {
api.WriteBadRequest(w, r, "id and step are required")
return
}
// Start retry asynchronously
go func() {
execCtx := context.WithoutCancel(ctx)
if err := h.executor.RetryStep(execCtx, id, step); err != nil {
// Error is already recorded in the saga
_ = err
}
}()
// Return current saga state
saga, err := h.repo.Get(ctx, id)
if err != nil {
if err == postgres.ErrSagaNotFound {
api.WriteNotFound(w, r, "saga not found")
return
}
api.WriteInternalError(w, r, "failed to get saga: "+err.Error())
return
}
api.WriteSuccess(w, r, sagaToResponse(saga))
}
// SkipStep skips a step and continues execution.
// POST /sagas/{id}/steps/{step}/skip
func (h *SagaHandler) SkipStep(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), TimeoutOrchestration)
defer cancel()
id := chi.URLParam(r, "id")
step := chi.URLParam(r, "step")
if id == "" || step == "" {
api.WriteBadRequest(w, r, "id and step are required")
return
}
// Start skip and continue asynchronously
go func() {
execCtx := context.WithoutCancel(ctx)
if err := h.executor.SkipStep(execCtx, id, step); err != nil {
// Error is already recorded in the saga
_ = err
}
}()
// Return current saga state
saga, err := h.repo.Get(ctx, id)
if err != nil {
if err == postgres.ErrSagaNotFound {
api.WriteNotFound(w, r, "saga not found")
return
}
api.WriteInternalError(w, r, "failed to get saga: "+err.Error())
return
}
api.WriteSuccess(w, r, sagaToResponse(saga))
}
// sagaToResponse converts a domain saga to an API response.
func sagaToResponse(saga *domain.Saga) SagaResponse {
resp := SagaResponse{
ID: saga.ID,
Name: saga.Name,
Status: string(saga.Status),
CurrentStep: saga.CurrentStep,
RetryCount: saga.RetryCount,
MaxRetries: saga.MaxRetries,
Error: saga.Error,
Outputs: saga.Outputs,
CreatedAt: saga.CreatedAt.Format("2006-01-02T15:04:05Z07:00"),
UpdatedAt: saga.UpdatedAt.Format("2006-01-02T15:04:05Z07:00"),
}
if saga.CompletedAt != nil {
resp.CompletedAt = saga.CompletedAt.Format("2006-01-02T15:04:05Z07:00")
}
for _, step := range saga.Steps {
stepResp := StepResponse{
ID: step.ID,
Name: step.Name,
Status: string(step.Status),
Action: step.Action,
DependsOn: step.DependsOn,
Compensate: step.Compensate,
RetryCount: step.RetryCount,
Output: step.Output,
Error: step.Error,
}
if step.StartedAt != nil {
stepResp.StartedAt = step.StartedAt.Format("2006-01-02T15:04:05Z07:00")
}
if step.CompletedAt != nil {
stepResp.CompletedAt = step.CompletedAt.Format("2006-01-02T15:04:05Z07:00")
}
resp.Steps = append(resp.Steps, stepResp)
}
return resp
}

View File

@ -69,4 +69,9 @@ const (
FieldAuditResource = "audit_resource"
FieldAuditResult = "audit_result"
FieldAuditDetails = "audit_details"
// Saga context
FieldSagaID = "saga_id"
FieldSagaName = "saga_name"
FieldStepName = "step_name"
)

View File

@ -158,6 +158,28 @@ var (
Name: "rdev_external_system_last_check_timestamp",
Help: "Unix timestamp of last health check",
}, []string{"system"})
// Saga metrics
sagaTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "rdev_saga_total",
Help: "Total number of sagas by name and final status",
}, []string{"name", "status"})
sagaStepDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "rdev_saga_step_duration_seconds",
Help: "Duration of saga step execution in seconds",
Buckets: prometheus.ExponentialBuckets(0.1, 2, 15), // 0.1s to ~27min
}, []string{"saga", "step", "action"})
sagaRetryTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "rdev_saga_retry_total",
Help: "Total number of saga step retries",
}, []string{"saga", "step"})
circuitBreakerState = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "rdev_circuit_breaker_state",
Help: "Circuit breaker state: 0=closed, 1=half-open, 2=open",
}, []string{"name"})
)
// RecordCommand records a command execution.
@ -275,6 +297,27 @@ func SetExternalSystemHealth(system string, healthy bool, latencySeconds float64
externalSystemLastCheck.WithLabelValues(system).Set(float64(time.Now().Unix()))
}
// RecordSaga records a saga completion.
func RecordSaga(name, status string) {
sagaTotal.WithLabelValues(name, status).Inc()
}
// RecordSagaStepDuration records the duration of a saga step.
func RecordSagaStepDuration(saga, step, action string, durationMs int64) {
sagaStepDuration.WithLabelValues(saga, step, action).Observe(float64(durationMs) / 1000.0)
}
// RecordSagaRetry records a saga step retry.
func RecordSagaRetry(saga, step string) {
sagaRetryTotal.WithLabelValues(saga, step).Inc()
}
// SetCircuitBreakerState sets the circuit breaker state metric.
// state: 0=closed, 1=half-open, 2=open
func SetCircuitBreakerState(name string, state int) {
circuitBreakerState.WithLabelValues(name).Set(float64(state))
}
// Handler returns the Prometheus HTTP handler.
func Handler() http.Handler {
return promhttp.Handler()

View File

@ -43,4 +43,8 @@ type CIProvider interface {
// TriggerBuild manually starts a new pipeline build on the specified branch.
// Returns the pipeline number of the triggered build.
TriggerBuild(ctx context.Context, owner, repo, branch string) (int64, error)
// RetryPipeline restarts a failed or stopped pipeline.
// Returns the restarted pipeline information.
RetryPipeline(ctx context.Context, owner, repo string, number int64) (*domain.CIPipeline, error)
}

50
internal/port/saga.go Normal file
View File

@ -0,0 +1,50 @@
// Package port defines interfaces (ports) for external dependencies.
package port
import (
"context"
"github.com/orchard9/rdev/internal/domain"
)
// SagaRepository manages saga persistence.
type SagaRepository interface {
// Create creates a new saga with its steps.
Create(ctx context.Context, saga *domain.Saga) error
// Get returns a saga by ID, including all steps.
Get(ctx context.Context, id string) (*domain.Saga, error)
// Update updates a saga's status and metadata (not steps).
Update(ctx context.Context, saga *domain.Saga) error
// UpdateStep updates a single step's status and output.
UpdateStep(ctx context.Context, step *domain.SagaStep) error
// List returns sagas matching the given filters.
List(ctx context.Context, filters domain.SagaFilters) ([]*domain.Saga, error)
// Delete removes a saga and its steps.
Delete(ctx context.Context, id string) error
// GetPendingSteps returns steps ready to execute (no unmet dependencies).
GetPendingSteps(ctx context.Context, sagaID string) ([]domain.SagaStep, error)
}
// SagaExecutor executes saga workflows.
type SagaExecutor interface {
// Execute runs a saga from the beginning.
Execute(ctx context.Context, saga *domain.Saga) error
// Resume continues execution of a paused or failed saga.
Resume(ctx context.Context, sagaID string) error
// Compensate runs compensation steps for a failed saga.
Compensate(ctx context.Context, sagaID string) error
// RetryStep retries a specific failed step.
RetryStep(ctx context.Context, sagaID, stepName string) error
// SkipStep skips a step and continues execution.
SkipStep(ctx context.Context, sagaID, stepName string) error
}

View File

@ -130,6 +130,10 @@ func (m *mockCIProvider) TriggerBuild(_ context.Context, _, _, _ string) (int64,
return 0, nil
}
func (m *mockCIProvider) RetryPipeline(_ context.Context, _, _ string, _ int64) (*domain.CIPipeline, error) {
return nil, nil
}
func TestDiagnosticsService_GetDiagnostics_Healthy(t *testing.T) {
opRepo := &mockOperationRepo{
operations: []*domain.Operation{

View File

@ -0,0 +1,438 @@
// Package service provides business logic orchestration.
package service
import (
"context"
"errors"
"fmt"
"log/slog"
"time"
"github.com/orchard9/rdev/internal/circuitbreaker"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/logging"
"github.com/orchard9/rdev/internal/port"
)
// SagaExecutor executes saga workflows with retry and compensation support.
type SagaExecutor struct {
repo port.SagaRepository
cbRegistry *circuitbreaker.Registry
logger *slog.Logger
// Action handlers registered by action type
handlers map[string]SagaStepHandler
}
// SagaStepHandler executes a saga step action.
type SagaStepHandler interface {
// Execute runs the step action and returns the output.
// The context should be used for cancellation.
Execute(ctx context.Context, step *domain.SagaStep, saga *domain.Saga) (map[string]any, error)
}
// SagaStepHandlerFunc is an adapter for using functions as SagaStepHandler.
type SagaStepHandlerFunc func(ctx context.Context, step *domain.SagaStep, saga *domain.Saga) (map[string]any, error)
// Execute implements SagaStepHandler.
func (f SagaStepHandlerFunc) Execute(ctx context.Context, step *domain.SagaStep, saga *domain.Saga) (map[string]any, error) {
return f(ctx, step, saga)
}
// NewSagaExecutor creates a new saga executor.
func NewSagaExecutor(repo port.SagaRepository, logger *slog.Logger) *SagaExecutor {
if logger == nil {
logger = slog.Default()
}
return &SagaExecutor{
repo: repo,
cbRegistry: circuitbreaker.GlobalRegistry,
logger: logger,
handlers: make(map[string]SagaStepHandler),
}
}
// RegisterHandler registers a handler for a step action type.
func (e *SagaExecutor) RegisterHandler(action string, handler SagaStepHandler) {
e.handlers[action] = handler
}
// Ensure SagaExecutor implements port.SagaExecutor.
var _ port.SagaExecutor = (*SagaExecutor)(nil)
// Execute runs a saga from the beginning.
func (e *SagaExecutor) Execute(ctx context.Context, saga *domain.Saga) error {
logger := e.logger.With(
logging.FieldSagaID, saga.ID,
logging.FieldSagaName, saga.Name,
)
// Mark saga as running
saga.Status = domain.SagaStatusRunning
if err := e.repo.Update(ctx, saga); err != nil {
return fmt.Errorf("update saga status: %w", err)
}
logger.Info("saga execution started")
// Execute steps until all complete or one fails
return e.runSteps(ctx, saga)
}
// Resume continues execution of a paused or failed saga.
func (e *SagaExecutor) Resume(ctx context.Context, sagaID string) error {
saga, err := e.repo.Get(ctx, sagaID)
if err != nil {
return fmt.Errorf("get saga: %w", err)
}
logger := e.logger.With(
logging.FieldSagaID, saga.ID,
logging.FieldSagaName, saga.Name,
)
// Only resume if saga is in a resumable state
if saga.Status != domain.SagaStatusFailed && saga.Status != domain.SagaStatusRunning {
return fmt.Errorf("saga status %s is not resumable", saga.Status)
}
// Increment retry count if resuming from failed state
if saga.Status == domain.SagaStatusFailed {
saga.RetryCount++
if saga.RetryCount > saga.MaxRetries {
return fmt.Errorf("saga exceeded max retries (%d)", saga.MaxRetries)
}
}
saga.Status = domain.SagaStatusRunning
saga.Error = ""
if err := e.repo.Update(ctx, saga); err != nil {
return fmt.Errorf("update saga status: %w", err)
}
logger.Info("saga execution resumed", "retry_count", saga.RetryCount)
return e.runSteps(ctx, saga)
}
// Compensate runs compensation steps for a failed saga.
func (e *SagaExecutor) Compensate(ctx context.Context, sagaID string) error {
saga, err := e.repo.Get(ctx, sagaID)
if err != nil {
return fmt.Errorf("get saga: %w", err)
}
logger := e.logger.With(
logging.FieldSagaID, saga.ID,
logging.FieldSagaName, saga.Name,
)
if saga.Status != domain.SagaStatusFailed {
return fmt.Errorf("can only compensate failed sagas (status: %s)", saga.Status)
}
saga.Status = domain.SagaStatusCompensating
if err := e.repo.Update(ctx, saga); err != nil {
return fmt.Errorf("update saga status: %w", err)
}
logger.Info("saga compensation started")
// Run compensation steps in reverse order of completed steps
if err := e.runCompensation(ctx, saga); err != nil {
logger.Error("saga compensation failed", logging.FieldError, err)
return err
}
saga.Status = domain.SagaStatusCompensated
now := time.Now()
saga.CompletedAt = &now
if err := e.repo.Update(ctx, saga); err != nil {
return fmt.Errorf("update saga status: %w", err)
}
logger.Info("saga compensation completed")
return nil
}
// RetryStep retries a specific failed step.
func (e *SagaExecutor) RetryStep(ctx context.Context, sagaID, stepName string) error {
saga, err := e.repo.Get(ctx, sagaID)
if err != nil {
return fmt.Errorf("get saga: %w", err)
}
step := saga.GetStep(stepName)
if step == nil {
return fmt.Errorf("step %s not found", stepName)
}
if step.Status != domain.StepStatusFailed {
return fmt.Errorf("step %s is not failed (status: %s)", stepName, step.Status)
}
logger := e.logger.With(
logging.FieldSagaID, saga.ID,
logging.FieldSagaName, saga.Name,
logging.FieldStepName, stepName,
)
// Reset step to pending
step.Status = domain.StepStatusPending
step.Error = ""
step.RetryCount++
if err := e.repo.UpdateStep(ctx, step); err != nil {
return fmt.Errorf("update step: %w", err)
}
// If saga was failed, set it back to running
if saga.Status == domain.SagaStatusFailed {
saga.Status = domain.SagaStatusRunning
saga.Error = ""
if err := e.repo.Update(ctx, saga); err != nil {
return fmt.Errorf("update saga: %w", err)
}
}
logger.Info("retrying step", "retry_count", step.RetryCount)
// Continue execution
return e.runSteps(ctx, saga)
}
// SkipStep skips a step and continues execution.
func (e *SagaExecutor) SkipStep(ctx context.Context, sagaID, stepName string) error {
saga, err := e.repo.Get(ctx, sagaID)
if err != nil {
return fmt.Errorf("get saga: %w", err)
}
step := saga.GetStep(stepName)
if step == nil {
return fmt.Errorf("step %s not found", stepName)
}
if step.Status != domain.StepStatusFailed && step.Status != domain.StepStatusPending {
return fmt.Errorf("can only skip failed or pending steps (status: %s)", step.Status)
}
logger := e.logger.With(
logging.FieldSagaID, saga.ID,
logging.FieldSagaName, saga.Name,
logging.FieldStepName, stepName,
)
// Mark step as skipped
step.Status = domain.StepStatusSkipped
now := time.Now()
step.CompletedAt = &now
if err := e.repo.UpdateStep(ctx, step); err != nil {
return fmt.Errorf("update step: %w", err)
}
// If saga was failed, set it back to running
if saga.Status == domain.SagaStatusFailed {
saga.Status = domain.SagaStatusRunning
saga.Error = ""
if err := e.repo.Update(ctx, saga); err != nil {
return fmt.Errorf("update saga: %w", err)
}
}
logger.Info("step skipped")
// Continue execution
return e.runSteps(ctx, saga)
}
// runSteps executes pending steps until all complete or one fails.
func (e *SagaExecutor) runSteps(ctx context.Context, saga *domain.Saga) error {
for {
// Refresh saga state
saga, err := e.repo.Get(ctx, saga.ID)
if err != nil {
return fmt.Errorf("refresh saga: %w", err)
}
// Get runnable steps
runnable := saga.RunnableSteps()
if len(runnable) == 0 {
// No more runnable steps
break
}
// Execute each runnable step (could parallelize in future)
for _, step := range runnable {
if err := e.executeStep(ctx, saga, &step); err != nil {
// Step failed - saga is now failed
saga.Status = domain.SagaStatusFailed
saga.Error = err.Error()
if updateErr := e.repo.Update(ctx, saga); updateErr != nil {
e.logger.Error("failed to update saga status", logging.FieldError, updateErr)
}
return err
}
}
}
// All steps completed (or skipped)
saga.Status = domain.SagaStatusCompleted
now := time.Now()
saga.CompletedAt = &now
if err := e.repo.Update(ctx, saga); err != nil {
return fmt.Errorf("update saga status: %w", err)
}
e.logger.Info("saga completed",
logging.FieldSagaID, saga.ID,
logging.FieldSagaName, saga.Name,
)
return nil
}
// executeStep executes a single step with retry logic.
func (e *SagaExecutor) executeStep(ctx context.Context, saga *domain.Saga, step *domain.SagaStep) error {
logger := e.logger.With(
logging.FieldSagaID, saga.ID,
logging.FieldSagaName, saga.Name,
logging.FieldStepName, step.Name,
)
handler, ok := e.handlers[step.Action]
if !ok {
return fmt.Errorf("no handler registered for action %s", step.Action)
}
// Update saga current step
saga.CurrentStep = step.Name
if err := e.repo.Update(ctx, saga); err != nil {
return fmt.Errorf("update current step: %w", err)
}
// Mark step as running
step.Status = domain.StepStatusRunning
now := time.Now()
step.StartedAt = &now
if err := e.repo.UpdateStep(ctx, step); err != nil {
return fmt.Errorf("update step status: %w", err)
}
logger.Info("step started")
// Execute with retry logic
var output map[string]any
var execErr error
for attempt := 0; attempt <= step.RetryPolicy.MaxAttempts; attempt++ {
if attempt > 0 {
delay := e.calculateBackoff(step.RetryPolicy, attempt)
logger.Info("retrying step", "attempt", attempt, "delay", delay)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(delay):
}
}
output, execErr = handler.Execute(ctx, step, saga)
if execErr == nil {
break
}
// Check if error is from circuit breaker
if errors.Is(execErr, circuitbreaker.ErrCircuitOpen) {
logger.Warn("circuit breaker open, will retry", "attempt", attempt)
continue
}
logger.Warn("step execution failed", "attempt", attempt, logging.FieldError, execErr)
}
endTime := time.Now()
step.CompletedAt = &endTime
if execErr != nil {
step.Status = domain.StepStatusFailed
step.Error = execErr.Error()
if err := e.repo.UpdateStep(ctx, step); err != nil {
logger.Error("failed to update step status", logging.FieldError, err)
}
logger.Error("step failed", logging.FieldError, execErr)
return fmt.Errorf("step %s failed: %w", step.Name, execErr)
}
// Step succeeded
step.Status = domain.StepStatusCompleted
step.Output = output
if err := e.repo.UpdateStep(ctx, step); err != nil {
return fmt.Errorf("update step status: %w", err)
}
// Store output in saga outputs
if saga.Outputs == nil {
saga.Outputs = make(map[string]map[string]any)
}
saga.Outputs[step.Name] = output
if err := e.repo.Update(ctx, saga); err != nil {
return fmt.Errorf("update saga outputs: %w", err)
}
logger.Info("step completed", "duration_ms", endTime.Sub(*step.StartedAt).Milliseconds())
return nil
}
// runCompensation runs compensation steps in reverse order.
func (e *SagaExecutor) runCompensation(ctx context.Context, saga *domain.Saga) error {
// Get completed steps in reverse order
var completedSteps []domain.SagaStep
for _, step := range saga.Steps {
if step.Status == domain.StepStatusCompleted && step.Compensate != "" {
completedSteps = append(completedSteps, step)
}
}
// Reverse order (compensation runs in reverse of completion)
for i, j := 0, len(completedSteps)-1; i < j; i, j = i+1, j-1 {
completedSteps[i], completedSteps[j] = completedSteps[j], completedSteps[i]
}
// Run compensation for each
for _, step := range completedSteps {
compStep := saga.GetStep(step.Compensate)
if compStep == nil {
e.logger.Warn("compensation step not found",
logging.FieldSagaID, saga.ID,
logging.FieldStepName, step.Compensate,
)
continue
}
if err := e.executeStep(ctx, saga, compStep); err != nil {
return fmt.Errorf("compensation step %s failed: %w", step.Compensate, err)
}
}
return nil
}
// calculateBackoff calculates the delay before the next retry.
func (e *SagaExecutor) calculateBackoff(policy domain.RetryPolicy, attempt int) time.Duration {
switch policy.BackoffType {
case domain.BackoffNone:
return policy.InitialDelay
case domain.BackoffLinear:
delay := policy.InitialDelay * time.Duration(attempt)
if delay > policy.MaxDelay {
return policy.MaxDelay
}
return delay
case domain.BackoffExponential:
delay := policy.InitialDelay * time.Duration(1<<uint(attempt-1))
if delay > policy.MaxDelay {
return policy.MaxDelay
}
return delay
default:
return policy.InitialDelay
}
}