rdev/internal/service/saga_executor.go
jordan f20fc6c51c
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
feat(saga): implement enterprise-grade resilience architecture
Fixes issues from code review of resilience implementation:

- Wire saga system in main.go (SagaRepository, SagaExecutor, SagaHandler)
- Fix CompletedSteps() to include skipped steps for dependency resolution
- Fix reverse loop bug in saga compensation (use standard swap pattern)
- Add circuit breaker state change callbacks for Prometheus metrics

Phase 1 (Build Resilience):
- Add failure:retry to all component Kaniko build steps
- Add preflight registry health check before builds
- Add services-deployed sync point to decouple docs from critical path

Phase 2 (API Resilience):
- Add pipeline retry endpoint (POST /projects/{id}/pipelines/{number}/retry)
- Wire circuit breakers with metrics callbacks
- Add /health/circuits endpoint for circuit breaker status

Phase 3 (Saga Engine):
- Full domain model (Saga, SagaStep, RetryPolicy, BackoffType)
- PostgreSQL saga repository with CRUD and step management
- Saga executor with retry, compensation, skip step support
- Saga API handlers with CRUD and control operations

Phase 4 (Observability):
- Add saga metrics (total, step_duration, retry, circuit_breaker_state)
- Add logging fields (saga_id, saga_name, step_name)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-08 01:58:02 -07:00

439 lines
12 KiB
Go

// Package service provides business logic orchestration.
package service
import (
"context"
"errors"
"fmt"
"log/slog"
"time"
"github.com/orchard9/rdev/internal/circuitbreaker"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/logging"
"github.com/orchard9/rdev/internal/port"
)
// SagaExecutor runs saga workflows. It dispatches each step to the handler
// registered for the step's Action, retries failed attempts according to the
// step's RetryPolicy, and can run compensation steps for a failed saga.
type SagaExecutor struct {
	// repo loads and persists saga and step state.
	repo port.SagaRepository
	// cbRegistry is the circuit breaker registry; NewSagaExecutor sets it to
	// circuitbreaker.GlobalRegistry.
	cbRegistry *circuitbreaker.Registry
	// logger receives structured execution logs.
	logger *slog.Logger
	// handlers maps a step Action name to the handler that performs it.
	handlers map[string]SagaStepHandler
}
// SagaStepHandler performs the action behind a saga step. The executor selects
// a handler by the step's Action name.
type SagaStepHandler interface {
	// Execute performs the action of step within saga and returns the step's
	// output, which the executor records under the step name. Implementations
	// should honour cancellation of ctx.
	Execute(ctx context.Context, step *domain.SagaStep, saga *domain.Saga) (map[string]any, error)
}
// SagaStepHandlerFunc lets an ordinary function with the right signature be
// used wherever a SagaStepHandler is expected.
type SagaStepHandlerFunc func(ctx context.Context, step *domain.SagaStep, saga *domain.Saga) (map[string]any, error)

// Execute satisfies SagaStepHandler by invoking fn itself.
func (fn SagaStepHandlerFunc) Execute(ctx context.Context, step *domain.SagaStep, saga *domain.Saga) (map[string]any, error) {
	return fn(ctx, step, saga)
}
// NewSagaExecutor returns an executor backed by repo, with no handlers
// registered and the global circuit breaker registry attached. A nil logger
// falls back to slog.Default().
func NewSagaExecutor(repo port.SagaRepository, logger *slog.Logger) *SagaExecutor {
	exec := &SagaExecutor{
		repo:       repo,
		cbRegistry: circuitbreaker.GlobalRegistry,
		logger:     slog.Default(),
		handlers:   map[string]SagaStepHandler{},
	}
	if logger != nil {
		exec.logger = logger
	}
	return exec
}
// RegisterHandler associates handler with the step action name action,
// replacing any handler previously registered for it.
//
// A nil handler removes the registration instead of storing nil. Without
// this, a lookup would succeed with a nil interface and panic when the step
// runs. A step with an unregistered action instead fails with a
// "no handler registered" error. The handler map is created on demand, so
// registering on a zero-value SagaExecutor does not panic.
//
// The map is not guarded by a lock; register handlers before sagas execute.
func (e *SagaExecutor) RegisterHandler(action string, handler SagaStepHandler) {
	if handler == nil {
		delete(e.handlers, action) // no-op on a nil map
		return
	}
	if e.handlers == nil {
		e.handlers = make(map[string]SagaStepHandler)
	}
	e.handlers[action] = handler
}
// Compile-time interface checks: the build fails if *SagaExecutor stops
// satisfying port.SagaExecutor.
var (
	_ port.SagaExecutor = (*SagaExecutor)(nil)
)
// Execute starts running saga. The saga is first persisted with
// SagaStatusRunning; its steps are then driven by runSteps, whose result
// (nil on completion, otherwise the first error) is returned.
func (e *SagaExecutor) Execute(ctx context.Context, saga *domain.Saga) error {
	sagaAttrs := []any{
		logging.FieldSagaID, saga.ID,
		logging.FieldSagaName, saga.Name,
	}

	saga.Status = domain.SagaStatusRunning
	if err := e.repo.Update(ctx, saga); err != nil {
		return fmt.Errorf("update saga status: %w", err)
	}

	e.logger.Info("saga execution started", sagaAttrs...)
	return e.runSteps(ctx, saga)
}
// Resume continues a saga that is in SagaStatusFailed or SagaStatusRunning.
//
// Resuming a failed saga consumes one retry: RetryCount is incremented and,
// if it now exceeds MaxRetries, an error is returned without persisting
// anything. Otherwise the saga is persisted as running with its error
// cleared, and execution continues via runSteps.
func (e *SagaExecutor) Resume(ctx context.Context, sagaID string) error {
	saga, err := e.repo.Get(ctx, sagaID)
	if err != nil {
		return fmt.Errorf("get saga: %w", err)
	}
	sagaLog := e.logger.With(
		logging.FieldSagaID, saga.ID,
		logging.FieldSagaName, saga.Name,
	)

	switch saga.Status {
	case domain.SagaStatusRunning:
		// Resumable as-is; no retry is consumed.
	case domain.SagaStatusFailed:
		saga.RetryCount++
		if saga.RetryCount > saga.MaxRetries {
			return fmt.Errorf("saga exceeded max retries (%d)", saga.MaxRetries)
		}
	default:
		return fmt.Errorf("saga status %s is not resumable", saga.Status)
	}

	saga.Status = domain.SagaStatusRunning
	saga.Error = ""
	if err := e.repo.Update(ctx, saga); err != nil {
		return fmt.Errorf("update saga status: %w", err)
	}

	sagaLog.Info("saga execution resumed", "retry_count", saga.RetryCount)
	return e.runSteps(ctx, saga)
}
// Compensate rolls back a saga by running the compensation steps of its
// completed steps (see runCompensation).
//
// The saga must be in SagaStatusFailed or SagaStatusCompensating. It is
// persisted as compensating before any compensation runs. A failed
// compensation leaves it in that state and returns the error. Accepting
// SagaStatusCompensating here is what allows such a run to be retried;
// neither Resume nor Compensate would otherwise accept the saga again.
// On success the saga is persisted as SagaStatusCompensated with CompletedAt
// set.
//
// NOTE(review): a retried run executes every compensation step again,
// including those that already succeeded, so compensation handlers should be
// idempotent.
func (e *SagaExecutor) Compensate(ctx context.Context, sagaID string) error {
	saga, err := e.repo.Get(ctx, sagaID)
	if err != nil {
		return fmt.Errorf("get saga: %w", err)
	}
	logger := e.logger.With(
		logging.FieldSagaID, saga.ID,
		logging.FieldSagaName, saga.Name,
	)

	if saga.Status != domain.SagaStatusFailed && saga.Status != domain.SagaStatusCompensating {
		return fmt.Errorf("can only compensate failed or compensating sagas (status: %s)", saga.Status)
	}

	saga.Status = domain.SagaStatusCompensating
	if err := e.repo.Update(ctx, saga); err != nil {
		return fmt.Errorf("update saga status: %w", err)
	}
	logger.Info("saga compensation started")

	if err := e.runCompensation(ctx, saga); err != nil {
		// The saga stays in SagaStatusCompensating, from which Compensate can
		// be called again.
		logger.Error("saga compensation failed", logging.FieldError, err)
		return err
	}

	saga.Status = domain.SagaStatusCompensated
	now := time.Now()
	saga.CompletedAt = &now
	if err := e.repo.Update(ctx, saga); err != nil {
		return fmt.Errorf("update saga status: %w", err)
	}
	logger.Info("saga compensation completed")
	return nil
}
// RetryStep re-runs the failed step stepName and then continues the saga.
//
// The step must exist and be in StepStatusFailed. The saga must not be
// completed, compensating or compensated. The step is reset to pending with
// its error cleared and its RetryCount incremented. A failed saga is switched
// back to running with its error cleared. Execution then continues via
// runSteps.
func (e *SagaExecutor) RetryStep(ctx context.Context, sagaID, stepName string) error {
	saga, err := e.repo.Get(ctx, sagaID)
	if err != nil {
		return fmt.Errorf("get saga: %w", err)
	}
	step := saga.GetStep(stepName)
	if step == nil {
		return fmt.Errorf("step %s not found", stepName)
	}
	if step.Status != domain.StepStatusFailed {
		return fmt.Errorf("step %s is not failed (status: %s)", stepName, step.Status)
	}
	// Re-running steps of a saga that has finished, or that has been or is
	// being rolled back, would let runSteps mark it SagaStatusCompleted and
	// overwrite the compensation outcome.
	switch saga.Status {
	case domain.SagaStatusCompleted, domain.SagaStatusCompensating, domain.SagaStatusCompensated:
		return fmt.Errorf("cannot retry step %s: saga status is %s", stepName, saga.Status)
	}

	logger := e.logger.With(
		logging.FieldSagaID, saga.ID,
		logging.FieldSagaName, saga.Name,
		logging.FieldStepName, stepName,
	)

	// Reset the step so RunnableSteps can pick it up again.
	step.Status = domain.StepStatusPending
	step.Error = ""
	step.RetryCount++
	if err := e.repo.UpdateStep(ctx, step); err != nil {
		return fmt.Errorf("update step: %w", err)
	}

	if saga.Status == domain.SagaStatusFailed {
		saga.Status = domain.SagaStatusRunning
		saga.Error = ""
		if err := e.repo.Update(ctx, saga); err != nil {
			return fmt.Errorf("update saga: %w", err)
		}
	}

	logger.Info("retrying step", "retry_count", step.RetryCount)
	return e.runSteps(ctx, saga)
}
// SkipStep marks the step stepName as skipped and then continues the saga.
//
// The step must exist and be failed or pending. The saga must not be
// completed, compensating or compensated. The step is persisted as
// StepStatusSkipped with CompletedAt set. A failed saga is switched back to
// running with its error cleared. Execution then continues via runSteps.
func (e *SagaExecutor) SkipStep(ctx context.Context, sagaID, stepName string) error {
	saga, err := e.repo.Get(ctx, sagaID)
	if err != nil {
		return fmt.Errorf("get saga: %w", err)
	}
	step := saga.GetStep(stepName)
	if step == nil {
		return fmt.Errorf("step %s not found", stepName)
	}
	if step.Status != domain.StepStatusFailed && step.Status != domain.StepStatusPending {
		return fmt.Errorf("can only skip failed or pending steps (status: %s)", step.Status)
	}
	// Continuing a saga that has finished, or that has been or is being
	// rolled back, would let runSteps mark it SagaStatusCompleted and
	// overwrite the compensation outcome.
	switch saga.Status {
	case domain.SagaStatusCompleted, domain.SagaStatusCompensating, domain.SagaStatusCompensated:
		return fmt.Errorf("cannot skip step %s: saga status is %s", stepName, saga.Status)
	}

	logger := e.logger.With(
		logging.FieldSagaID, saga.ID,
		logging.FieldSagaName, saga.Name,
		logging.FieldStepName, stepName,
	)

	step.Status = domain.StepStatusSkipped
	now := time.Now()
	step.CompletedAt = &now
	if err := e.repo.UpdateStep(ctx, step); err != nil {
		return fmt.Errorf("update step: %w", err)
	}

	if saga.Status == domain.SagaStatusFailed {
		saga.Status = domain.SagaStatusRunning
		saga.Error = ""
		if err := e.repo.Update(ctx, saga); err != nil {
			return fmt.Errorf("update saga: %w", err)
		}
	}

	logger.Info("step skipped")
	return e.runSteps(ctx, saga)
}
// runSteps drives a saga forward until no step is runnable or one fails.
//
// Each round reloads the saga from the repository, so that state persisted
// by executeStep is visible. It then executes the steps returned by
// RunnableSteps one after another.
//
// If a step fails, the saga is marked SagaStatusFailed with the error text
// and the step error is returned. Persisting that status is best effort: an
// update failure is only logged. When a round finds nothing runnable, the
// latest saga state is persisted as SagaStatusCompleted with CompletedAt set.
//
// NOTE(review): "nothing runnable" is treated as "done"; confirm that
// RunnableSteps is only empty once every step is completed or skipped.
func (e *SagaExecutor) runSteps(ctx context.Context, saga *domain.Saga) error {
	logger := e.logger.With(
		logging.FieldSagaID, saga.ID,
		logging.FieldSagaName, saga.Name,
	)

	for {
		// Reassign saga rather than shadowing it with :=. The completion
		// update after the loop must persist the freshly loaded state
		// (outputs, current step), not the caller's stale copy.
		refreshed, err := e.repo.Get(ctx, saga.ID)
		if err != nil {
			return fmt.Errorf("refresh saga: %w", err)
		}
		saga = refreshed

		runnable := saga.RunnableSteps()
		if len(runnable) == 0 {
			break
		}

		// Steps run sequentially; this could be parallelized in future.
		for _, step := range runnable {
			if err := e.executeStep(ctx, saga, &step); err != nil {
				saga.Status = domain.SagaStatusFailed
				saga.Error = err.Error()
				if updateErr := e.repo.Update(ctx, saga); updateErr != nil {
					logger.Error("failed to update saga status", logging.FieldError, updateErr)
				}
				return err
			}
		}
	}

	saga.Status = domain.SagaStatusCompleted
	now := time.Now()
	saga.CompletedAt = &now
	if err := e.repo.Update(ctx, saga); err != nil {
		return fmt.Errorf("update saga status: %w", err)
	}
	logger.Info("saga completed")
	return nil
}
// executeStep runs one step through its registered handler, retrying
// according to the step's RetryPolicy, and persists the outcome.
//
// The handler is looked up by step.Action. An unknown action returns an
// error without touching the step. Otherwise the steps are:
//  1. Persist saga.CurrentStep, then the step's Running status and StartedAt.
//  2. Invoke the handler up to RetryPolicy.MaxAttempts+1 times, waiting
//     calculateBackoff(policy, attempt) before each retry. A negative
//     MaxAttempts is treated as 0.
//  3. On success, mark the step completed with its output, store the output
//     in saga.Outputs[step.Name], and persist both records.
//  4. On failure, mark the step failed (best effort; an update error is only
//     logged) and return an error wrapping the last cause. This includes
//     cancellation of ctx while waiting to retry.
//
// NOTE(review): the name MaxAttempts suggests a total-attempt limit, but the
// loop makes MaxAttempts+1 calls. Confirm against the RetryPolicy definition.
func (e *SagaExecutor) executeStep(ctx context.Context, saga *domain.Saga, step *domain.SagaStep) error {
	logger := e.logger.With(
		logging.FieldSagaID, saga.ID,
		logging.FieldSagaName, saga.Name,
		logging.FieldStepName, step.Name,
	)

	handler, ok := e.handlers[step.Action]
	if !ok {
		return fmt.Errorf("no handler registered for action %s", step.Action)
	}

	saga.CurrentStep = step.Name
	if err := e.repo.Update(ctx, saga); err != nil {
		return fmt.Errorf("update current step: %w", err)
	}

	step.Status = domain.StepStatusRunning
	startedAt := time.Now()
	step.StartedAt = &startedAt
	if err := e.repo.UpdateStep(ctx, step); err != nil {
		return fmt.Errorf("update step status: %w", err)
	}
	logger.Info("step started")

	// Clamp at zero so the handler always runs at least once. With a negative
	// value the loop body would never execute, and the step would be recorded
	// as completed without the handler being called.
	retries := max(step.RetryPolicy.MaxAttempts, 0)

	var output map[string]any
	var execErr error
attempts:
	for attempt := 0; attempt <= retries; attempt++ {
		if attempt > 0 {
			delay := e.calculateBackoff(step.RetryPolicy, attempt)
			logger.Info("retrying step", "attempt", attempt, "delay", delay)
			select {
			case <-ctx.Done():
				// Fall through to the failure path instead of returning, so the
				// step is not left persisted as running.
				execErr = ctx.Err()
				break attempts
			case <-time.After(delay):
			}
		}

		output, execErr = handler.Execute(ctx, step, saga)
		if execErr == nil {
			break
		}
		if errors.Is(execErr, circuitbreaker.ErrCircuitOpen) {
			logger.Warn("circuit breaker open, will retry", "attempt", attempt)
		} else {
			logger.Warn("step execution failed", "attempt", attempt, logging.FieldError, execErr)
		}
	}

	endTime := time.Now()
	step.CompletedAt = &endTime

	if execErr != nil {
		step.Status = domain.StepStatusFailed
		step.Error = execErr.Error()
		if err := e.repo.UpdateStep(ctx, step); err != nil {
			logger.Error("failed to update step status", logging.FieldError, err)
		}
		logger.Error("step failed", logging.FieldError, execErr)
		return fmt.Errorf("step %s failed: %w", step.Name, execErr)
	}

	step.Status = domain.StepStatusCompleted
	step.Output = output
	if err := e.repo.UpdateStep(ctx, step); err != nil {
		return fmt.Errorf("update step status: %w", err)
	}

	if saga.Outputs == nil {
		saga.Outputs = make(map[string]map[string]any)
	}
	saga.Outputs[step.Name] = output
	if err := e.repo.Update(ctx, saga); err != nil {
		return fmt.Errorf("update saga outputs: %w", err)
	}

	logger.Info("step completed", "duration_ms", endTime.Sub(startedAt).Milliseconds())
	return nil
}
// runCompensation executes the compensation step of every completed step
// that names one, most recently completed first.
//
// Only steps in StepStatusCompleted with a non-empty Compensate are
// considered. They are ordered by CompletedAt, newest first, so later work is
// undone before the work it followed. Steps with equal completion times fall
// back to reverse declaration order. A missing CompletedAt sorts as oldest.
// A Compensate name that does not resolve via GetStep is logged and skipped.
// The first compensation error aborts the run and is returned.
func (e *SagaExecutor) runCompensation(ctx context.Context, saga *domain.Saga) error {
	var toUndo []domain.SagaStep
	for _, step := range saga.Steps {
		if step.Status == domain.StepStatusCompleted && step.Compensate != "" {
			toUndo = append(toUndo, step)
		}
	}

	// Reverse declaration order first; it serves as the tie-breaker for the
	// stable sort below.
	for i, j := 0, len(toUndo)-1; i < j; i, j = i+1, j-1 {
		toUndo[i], toUndo[j] = toUndo[j], toUndo[i]
	}

	// Sort by completion time, newest first. Execution order is decided by
	// RunnableSteps, so it need not match declaration order. This is a stable
	// insertion sort: only strictly newer entries move forward. The list has
	// at most one entry per step, so the quadratic bound does not matter.
	completedAt := func(s *domain.SagaStep) time.Time {
		if s.CompletedAt == nil {
			return time.Time{}
		}
		return *s.CompletedAt
	}
	for i := 1; i < len(toUndo); i++ {
		for j := i; j > 0 && completedAt(&toUndo[j]).After(completedAt(&toUndo[j-1])); j-- {
			toUndo[j], toUndo[j-1] = toUndo[j-1], toUndo[j]
		}
	}

	for _, step := range toUndo {
		compStep := saga.GetStep(step.Compensate)
		if compStep == nil {
			e.logger.Warn("compensation step not found",
				logging.FieldSagaID, saga.ID,
				logging.FieldStepName, step.Compensate,
			)
			continue
		}
		if err := e.executeStep(ctx, saga, compStep); err != nil {
			return fmt.Errorf("compensation step %s failed: %w", step.Compensate, err)
		}
	}
	return nil
}
// calculateBackoff returns how long to wait before retry number attempt
// (1 for the first retry) under policy:
//
//   - BackoffNone: InitialDelay.
//   - BackoffLinear: InitialDelay * attempt, capped at MaxDelay.
//   - BackoffExponential: InitialDelay * 2^(attempt-1), capped at MaxDelay.
//     attempt values below 1 behave like 1.
//   - any other type: InitialDelay.
//
// The exponential delay is built by repeated doubling that stops once the
// next doubling would exceed MaxDelay. This prevents large attempt numbers
// from overflowing time.Duration into a negative or zero delay that slips
// under the cap.
//
// NOTE(review): a zero MaxDelay caps linear and exponential delays at zero
// (no wait). Confirm that policies always set MaxDelay.
func (e *SagaExecutor) calculateBackoff(policy domain.RetryPolicy, attempt int) time.Duration {
	switch policy.BackoffType {
	case domain.BackoffNone:
		return policy.InitialDelay
	case domain.BackoffLinear:
		return min(policy.InitialDelay*time.Duration(attempt), policy.MaxDelay)
	case domain.BackoffExponential:
		delay := policy.InitialDelay
		for i := 1; i < attempt; i++ {
			// Doubling past MaxDelay/2 would exceed the cap (or overflow).
			if delay > policy.MaxDelay/2 {
				return policy.MaxDelay
			}
			delay *= 2
		}
		return min(delay, policy.MaxDelay)
	default:
		return policy.InitialDelay
	}
}