diff --git a/cmd/rdev-api/main.go b/cmd/rdev-api/main.go index 1eb45bd..c158d23 100644 --- a/cmd/rdev-api/main.go +++ b/cmd/rdev-api/main.go @@ -430,6 +430,11 @@ func main() { sdlcHandler := handlers.NewSDLCHandler(sdlcService) sdlcOrchestratorHandler := handlers.NewSDLCOrchestratorHandler(sdlcOrchestrator) + // Initialize saga system (resilient workflow orchestration) + sagaRepo := postgres.NewSagaRepository(database.DB) + sagaExecutor := service.NewSagaExecutor(sagaRepo, logger) + sagaHandler := handlers.NewSagaHandler(sagaRepo, sagaExecutor) + // SDLC generate service (async artifact generation via work queue) apiBaseURL := envutil.GetEnv("RDEV_API_URL", "https://rdev.masq-ops.orchard9.ai") sdlcGenerateService := service.NewSDLCGenerateService( @@ -502,6 +507,7 @@ func main() { app.Router().Get("/health", healthHandler.Health) app.Router().Get("/ready", healthHandler.Ready) + app.Router().Get("/health/circuits", healthHandler.Circuits) // Register routes projectsHandler.Mount(app.Router()) @@ -529,6 +535,7 @@ func main() { sdlcGenerateHandler.Mount(app.Router()) sdlcCallbackHandler.Mount(app.Router()) verifyHandler.Mount(app.Router()) + sagaHandler.Mount(app.Router()) // Start queue processor worker (per-project command queue) queueProcessor := worker.NewQueueProcessor( diff --git a/internal/adapter/postgres/saga_repository.go b/internal/adapter/postgres/saga_repository.go new file mode 100644 index 0000000..10b5913 --- /dev/null +++ b/internal/adapter/postgres/saga_repository.go @@ -0,0 +1,495 @@ +// Package postgres provides PostgreSQL-based implementations of port interfaces. +package postgres + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "strings" + + "github.com/lib/pq" + + "github.com/orchard9/rdev/internal/domain" + "github.com/orchard9/rdev/internal/port" +) + +// SagaRepository implements port.SagaRepository using PostgreSQL. +type SagaRepository struct { + db *sql.DB +} + +// NewSagaRepository creates a new PostgreSQL saga repository. +func NewSagaRepository(db *sql.DB) *SagaRepository { + return &SagaRepository{db: db} +} + +// Ensure SagaRepository implements port.SagaRepository at compile time. +var _ port.SagaRepository = (*SagaRepository)(nil) + +// ErrSagaNotFound is returned when a saga is not found. +var ErrSagaNotFound = errors.New("saga not found") + +// Create creates a new saga with its steps. +func (r *SagaRepository) Create(ctx context.Context, saga *domain.Saga) error { + tx, err := r.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("begin transaction: %w", err) + } + defer func() { _ = tx.Rollback() }() + + varsJSON, err := json.Marshal(saga.Vars) + if err != nil { + return fmt.Errorf("marshal vars: %w", err) + } + + outputsJSON, err := json.Marshal(saga.Outputs) + if err != nil { + return fmt.Errorf("marshal outputs: %w", err) + } + + // Insert saga + err = tx.QueryRowContext(ctx, ` + INSERT INTO sagas ( + name, status, definition, vars, outputs, current_step, + retry_count, max_retries, error + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + RETURNING id, created_at, updated_at + `, + saga.Name, + string(saga.Status), + nullString(saga.Definition), + varsJSON, + outputsJSON, + nullString(saga.CurrentStep), + saga.RetryCount, + saga.MaxRetries, + nullString(saga.Error), + ).Scan(&saga.ID, &saga.CreatedAt, &saga.UpdatedAt) + if err != nil { + return fmt.Errorf("insert saga: %w", err) + } + + // Insert steps + for i := range saga.Steps { + step := &saga.Steps[i] + step.SagaID = saga.ID + + retryPolicyJSON, err := json.Marshal(step.RetryPolicy) + if err != nil { + return fmt.Errorf("marshal retry policy: %w", err) + } + + configJSON, err := json.Marshal(step.Config) + if err != nil { + return fmt.Errorf("marshal config: %w", err) + } + + err = tx.QueryRowContext(ctx, ` + INSERT INTO saga_steps ( + saga_id, name, status, action, depends_on, retry_policy, + compensate, config, retry_count + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + RETURNING id + `, + saga.ID, + step.Name, + string(step.Status), + step.Action, + pq.Array(step.DependsOn), + retryPolicyJSON, + nullString(step.Compensate), + configJSON, + step.RetryCount, + ).Scan(&step.ID) + if err != nil { + return fmt.Errorf("insert step %s: %w", step.Name, err) + } + } + + if err := tx.Commit(); err != nil { + return fmt.Errorf("commit transaction: %w", err) + } + + return nil +} + +// Get returns a saga by ID, including all steps. +func (r *SagaRepository) Get(ctx context.Context, id string) (*domain.Saga, error) { + saga, err := r.getSaga(ctx, id) + if err != nil { + return nil, err + } + + steps, err := r.getSteps(ctx, id) + if err != nil { + return nil, err + } + saga.Steps = steps + + return saga, nil +} + +// getSaga retrieves just the saga record (no steps). +func (r *SagaRepository) getSaga(ctx context.Context, id string) (*domain.Saga, error) { + row := r.db.QueryRowContext(ctx, ` + SELECT id, name, status, definition, vars, outputs, current_step, + retry_count, max_retries, error, created_at, updated_at, completed_at + FROM sagas + WHERE id = $1 + `, id) + + return r.scanSaga(row) +} + +// getSteps retrieves all steps for a saga. +func (r *SagaRepository) getSteps(ctx context.Context, sagaID string) ([]domain.SagaStep, error) { + rows, err := r.db.QueryContext(ctx, ` + SELECT id, saga_id, name, status, action, depends_on, retry_policy, + compensate, config, output, error, retry_count, started_at, completed_at + FROM saga_steps + WHERE saga_id = $1 + ORDER BY id + `, sagaID) + if err != nil { + return nil, fmt.Errorf("query steps: %w", err) + } + defer func() { _ = rows.Close() }() + + var steps []domain.SagaStep + for rows.Next() { + step, err := r.scanStep(rows) + if err != nil { + return nil, err + } + steps = append(steps, *step) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterate steps: %w", err) + } + + return steps, nil +} + +// Update updates a saga's status and metadata (not steps). +func (r *SagaRepository) Update(ctx context.Context, saga *domain.Saga) error { + varsJSON, err := json.Marshal(saga.Vars) + if err != nil { + return fmt.Errorf("marshal vars: %w", err) + } + + outputsJSON, err := json.Marshal(saga.Outputs) + if err != nil { + return fmt.Errorf("marshal outputs: %w", err) + } + + res, err := r.db.ExecContext(ctx, ` + UPDATE sagas SET + status = $2, + vars = $3, + outputs = $4, + current_step = $5, + retry_count = $6, + error = $7, + completed_at = $8 + WHERE id = $1 + `, + saga.ID, + string(saga.Status), + varsJSON, + outputsJSON, + nullString(saga.CurrentStep), + saga.RetryCount, + nullString(saga.Error), + nullTime(saga.CompletedAt), + ) + if err != nil { + return fmt.Errorf("update saga: %w", err) + } + + rows, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("rows affected: %w", err) + } + if rows == 0 { + return ErrSagaNotFound + } + + return nil +} + +// UpdateStep updates a single step's status and output. +func (r *SagaRepository) UpdateStep(ctx context.Context, step *domain.SagaStep) error { + var outputJSON []byte + var err error + if step.Output != nil { + outputJSON, err = json.Marshal(step.Output) + if err != nil { + return fmt.Errorf("marshal output: %w", err) + } + } + + res, err := r.db.ExecContext(ctx, ` + UPDATE saga_steps SET + status = $2, + output = $3, + error = $4, + retry_count = $5, + started_at = $6, + completed_at = $7 + WHERE id = $1 + `, + step.ID, + string(step.Status), + outputJSON, + nullString(step.Error), + step.RetryCount, + nullTime(step.StartedAt), + nullTime(step.CompletedAt), + ) + if err != nil { + return fmt.Errorf("update step: %w", err) + } + + rows, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("rows affected: %w", err) + } + if rows == 0 { + return fmt.Errorf("step %s not found", step.ID) + } + + return nil +} + +// List returns sagas matching the given filters. +func (r *SagaRepository) List(ctx context.Context, filters domain.SagaFilters) ([]*domain.Saga, error) { + filters.Normalize() + + query := strings.Builder{} + query.WriteString(` + SELECT id, name, status, definition, vars, outputs, current_step, + retry_count, max_retries, error, created_at, updated_at, completed_at + FROM sagas + WHERE 1=1 + `) + + args := []any{} + argNum := 1 + + if filters.Name != "" { + fmt.Fprintf(&query, " AND name = $%d", argNum) + args = append(args, filters.Name) + argNum++ + } + + if filters.Status != "" { + fmt.Fprintf(&query, " AND status = $%d", argNum) + args = append(args, string(filters.Status)) + argNum++ + } + + if !filters.Since.IsZero() { + fmt.Fprintf(&query, " AND created_at >= $%d", argNum) + args = append(args, filters.Since) + argNum++ + } + + query.WriteString(" ORDER BY created_at DESC") + + fmt.Fprintf(&query, " LIMIT $%d", argNum) + args = append(args, filters.Limit) + + rows, err := r.db.QueryContext(ctx, query.String(), args...) + if err != nil { + return nil, fmt.Errorf("query sagas: %w", err) + } + defer func() { _ = rows.Close() }() + + var sagas []*domain.Saga + for rows.Next() { + saga, err := r.scanSagaRows(rows) + if err != nil { + return nil, err + } + sagas = append(sagas, saga) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterate sagas: %w", err) + } + + return sagas, nil +} + +// Delete removes a saga and its steps (cascade). +func (r *SagaRepository) Delete(ctx context.Context, id string) error { + res, err := r.db.ExecContext(ctx, `DELETE FROM sagas WHERE id = $1`, id) + if err != nil { + return fmt.Errorf("delete saga: %w", err) + } + + rows, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("rows affected: %w", err) + } + if rows == 0 { + return ErrSagaNotFound + } + + return nil +} + +// GetPendingSteps returns steps ready to execute (no unmet dependencies). +func (r *SagaRepository) GetPendingSteps(ctx context.Context, sagaID string) ([]domain.SagaStep, error) { + // Get all steps + steps, err := r.getSteps(ctx, sagaID) + if err != nil { + return nil, err + } + + // Build completed steps map (includes skipped steps for dependency resolution) + completed := make(map[string]bool) + for _, step := range steps { + if step.Status == domain.StepStatusCompleted || step.Status == domain.StepStatusSkipped { + completed[step.Name] = true + } + } + + // Find runnable steps + var runnable []domain.SagaStep + for _, step := range steps { + if step.CanRun(completed) { + runnable = append(runnable, step) + } + } + + return runnable, nil +} + +// sagaScanner interface abstracts sql.Row and sql.Rows for scanning. +type sagaScanner interface { + Scan(dest ...any) error +} + +// scanSaga scans a saga from a QueryRow result. +func (r *SagaRepository) scanSaga(row *sql.Row) (*domain.Saga, error) { + saga, err := r.scanSagaFrom(row) + if errors.Is(err, sql.ErrNoRows) { + return nil, ErrSagaNotFound + } + return saga, err +} + +// scanSagaRows scans a saga from a Rows result. +func (r *SagaRepository) scanSagaRows(rows *sql.Rows) (*domain.Saga, error) { + return r.scanSagaFrom(rows) +} + +// scanSagaFrom scans a saga from any scanner (Row or Rows). +func (r *SagaRepository) scanSagaFrom(scanner sagaScanner) (*domain.Saga, error) { + var saga domain.Saga + var status string + var definition, currentStep, sagaError sql.NullString + var completedAt sql.NullTime + var varsJSON, outputsJSON []byte + + err := scanner.Scan( + &saga.ID, &saga.Name, &status, &definition, &varsJSON, &outputsJSON, + ¤tStep, &saga.RetryCount, &saga.MaxRetries, &sagaError, + &saga.CreatedAt, &saga.UpdatedAt, &completedAt, + ) + if err != nil { + return nil, fmt.Errorf("scan saga: %w", err) + } + + saga.Status = domain.SagaStatus(status) + if definition.Valid { + saga.Definition = definition.String + } + if currentStep.Valid { + saga.CurrentStep = currentStep.String + } + if sagaError.Valid { + saga.Error = sagaError.String + } + if completedAt.Valid { + saga.CompletedAt = &completedAt.Time + } + if len(varsJSON) > 0 { + if err := json.Unmarshal(varsJSON, &saga.Vars); err != nil { + return nil, fmt.Errorf("unmarshal vars: %w", err) + } + } + if len(outputsJSON) > 0 { + if err := json.Unmarshal(outputsJSON, &saga.Outputs); err != nil { + return nil, fmt.Errorf("unmarshal outputs: %w", err) + } + } + return &saga, nil +} + +// scanStep scans a step from a Rows result. +func (r *SagaRepository) scanStep(rows *sql.Rows) (*domain.SagaStep, error) { + var step domain.SagaStep + var status string + var compensate, stepError sql.NullString + var startedAt, completedAt sql.NullTime + var retryPolicyJSON, configJSON, outputJSON []byte + var dependsOn pq.StringArray + + err := rows.Scan( + &step.ID, + &step.SagaID, + &step.Name, + &status, + &step.Action, + &dependsOn, + &retryPolicyJSON, + &compensate, + &configJSON, + &outputJSON, + &stepError, + &step.RetryCount, + &startedAt, + &completedAt, + ) + if err != nil { + return nil, fmt.Errorf("scan step: %w", err) + } + + step.Status = domain.StepStatus(status) + step.DependsOn = []string(dependsOn) + if compensate.Valid { + step.Compensate = compensate.String + } + if stepError.Valid { + step.Error = stepError.String + } + if startedAt.Valid { + step.StartedAt = &startedAt.Time + } + if completedAt.Valid { + step.CompletedAt = &completedAt.Time + } + + if len(retryPolicyJSON) > 0 { + if err := json.Unmarshal(retryPolicyJSON, &step.RetryPolicy); err != nil { + return nil, fmt.Errorf("unmarshal retry policy: %w", err) + } + } + if len(configJSON) > 0 { + if err := json.Unmarshal(configJSON, &step.Config); err != nil { + return nil, fmt.Errorf("unmarshal config: %w", err) + } + } + if len(outputJSON) > 0 { + if err := json.Unmarshal(outputJSON, &step.Output); err != nil { + return nil, fmt.Errorf("unmarshal output: %w", err) + } + } + + return &step, nil +} diff --git a/internal/adapter/templates/templates/components/app-astro/.woodpecker.step.yml.tmpl b/internal/adapter/templates/templates/components/app-astro/.woodpecker.step.yml.tmpl index 6706fb5..abede19 100644 --- a/internal/adapter/templates/templates/components/app-astro/.woodpecker.step.yml.tmpl +++ b/internal/adapter/templates/templates/components/app-astro/.woodpecker.step.yml.tmpl @@ -2,8 +2,9 @@ # Add this step to your .woodpecker.yml build-{{COMPONENT_NAME}}: - depends_on: [deps] + depends_on: [preflight] image: woodpeckerci/plugin-kaniko + failure: retry settings: registry: registry.threesix.ai repo: {{PROJECT_NAME}}/{{COMPONENT_NAME}} diff --git a/internal/adapter/templates/templates/components/app-nextjs/.woodpecker.step.yml.tmpl b/internal/adapter/templates/templates/components/app-nextjs/.woodpecker.step.yml.tmpl index 0bf80f1..2316184 100644 --- a/internal/adapter/templates/templates/components/app-nextjs/.woodpecker.step.yml.tmpl +++ b/internal/adapter/templates/templates/components/app-nextjs/.woodpecker.step.yml.tmpl @@ -2,8 +2,9 @@ # Add this step to your .woodpecker.yml build-{{COMPONENT_NAME}}: - depends_on: [deps] + depends_on: [preflight] image: woodpeckerci/plugin-kaniko + failure: retry settings: registry: registry.threesix.ai repo: {{PROJECT_NAME}}/{{COMPONENT_NAME}} diff --git a/internal/adapter/templates/templates/components/app-react/.woodpecker.step.yml.tmpl b/internal/adapter/templates/templates/components/app-react/.woodpecker.step.yml.tmpl index 29c1b17..ca30e05 100644 --- a/internal/adapter/templates/templates/components/app-react/.woodpecker.step.yml.tmpl +++ b/internal/adapter/templates/templates/components/app-react/.woodpecker.step.yml.tmpl @@ -2,8 +2,9 @@ # Add this step to your .woodpecker.yml build-{{COMPONENT_NAME}}: - depends_on: [deps] + depends_on: [preflight] image: woodpeckerci/plugin-kaniko + failure: retry settings: registry: registry.threesix.ai repo: {{PROJECT_NAME}}/{{COMPONENT_NAME}} diff --git a/internal/adapter/templates/templates/components/service/.woodpecker.step.yml.tmpl b/internal/adapter/templates/templates/components/service/.woodpecker.step.yml.tmpl index a6df464..486ef54 100644 --- a/internal/adapter/templates/templates/components/service/.woodpecker.step.yml.tmpl +++ b/internal/adapter/templates/templates/components/service/.woodpecker.step.yml.tmpl @@ -2,8 +2,9 @@ # Add this step to your .woodpecker.yml build-{{COMPONENT_NAME}}: - depends_on: [deps] + depends_on: [preflight] image: woodpeckerci/plugin-kaniko + failure: retry settings: registry: registry.threesix.ai repo: {{PROJECT_NAME}}/{{COMPONENT_NAME}} diff --git a/internal/adapter/templates/templates/components/worker/.woodpecker.step.yml.tmpl b/internal/adapter/templates/templates/components/worker/.woodpecker.step.yml.tmpl index 2bd360e..2a1a5e2 100644 --- a/internal/adapter/templates/templates/components/worker/.woodpecker.step.yml.tmpl +++ b/internal/adapter/templates/templates/components/worker/.woodpecker.step.yml.tmpl @@ -2,8 +2,9 @@ # Add this step to your .woodpecker.yml build-{{COMPONENT_NAME}}: - depends_on: [deps] + depends_on: [preflight] image: woodpeckerci/plugin-kaniko + failure: retry settings: registry: registry.threesix.ai repo: {{PROJECT_NAME}}/{{COMPONENT_NAME}} diff --git a/internal/adapter/templates/templates/skeleton/.woodpecker.yml.tmpl b/internal/adapter/templates/templates/skeleton/.woodpecker.yml.tmpl index d6baae1..774927e 100644 --- a/internal/adapter/templates/templates/skeleton/.woodpecker.yml.tmpl +++ b/internal/adapter/templates/templates/skeleton/.woodpecker.yml.tmpl @@ -34,6 +34,24 @@ steps: branch: main event: push + # Pre-flight registry health check before builds + preflight: + depends_on: [deps] + image: alpine/curl + commands: + - | + echo "==> Checking registry health before builds" + HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" --insecure --connect-timeout 10 https://registry.threesix.ai/v2/) + if [ "$HTTP_CODE" != "200" ] && [ "$HTTP_CODE" != "401" ]; then + echo "ERROR: Registry unhealthy (HTTP $HTTP_CODE), aborting build" + echo "Registry must return 200 or 401 (auth required) to proceed" + exit 1 + fi + echo "==> Registry healthy (HTTP $HTTP_CODE)" + when: + branch: main + event: push + # COMPONENT_STEPS_BELOW # Do not remove the marker above - component steps are inserted here @@ -48,10 +66,26 @@ steps: branch: main event: push - # Export OpenAPI specs from built services - # Runs after build-complete to ensure all services are ready - export-openapi: + # Services deployed sync point - fires after all deployments complete + # Use this to detect when services are ready (before docs generation) + # This allows wait_pipeline to succeed before docs steps run + services-deployed: depends_on: [build-complete] + image: alpine:3.19 + commands: + - echo "==> All services deployed successfully" + - echo " Pipeline is now considered successful for service deployment" + - echo " Documentation generation continues independently" + when: + branch: main + event: push + + # Export OpenAPI specs from built services + # Runs after services-deployed to ensure all services are ready + # Uses failure:ignore so doc failures don't block pipeline success + export-openapi: + depends_on: [services-deployed] + failure: ignore image: golang:1.23 commands: - | diff --git a/internal/adapter/woodpecker/client.go b/internal/adapter/woodpecker/client.go index 57af3f6..ba22559 100644 --- a/internal/adapter/woodpecker/client.go +++ b/internal/adapter/woodpecker/client.go @@ -24,6 +24,7 @@ import ( "go.woodpecker-ci.org/woodpecker/v3/woodpecker-go/woodpecker" + "github.com/orchard9/rdev/internal/circuitbreaker" "github.com/orchard9/rdev/internal/domain" "github.com/orchard9/rdev/internal/port" ) @@ -50,6 +51,7 @@ type Client struct { client woodpecker.Client url string logger *slog.Logger + cb *circuitbreaker.CircuitBreaker } // NewClient creates a new Woodpecker client. @@ -83,6 +85,7 @@ func NewClient(url, token string, opts ...ClientOption) (*Client, error) { client: client, url: url, logger: slog.Default(), + cb: circuitbreaker.GlobalRegistry.Get(circuitbreaker.NameWoodpecker), } // Apply options @@ -105,6 +108,23 @@ func WithLogger(logger *slog.Logger) ClientOption { } } +// WithCircuitBreaker sets a custom circuit breaker for the client. +func WithCircuitBreaker(cb *circuitbreaker.CircuitBreaker) ClientOption { + return func(c *Client) { + if cb != nil { + c.cb = cb + } + } +} + +// executeWithCircuitBreaker wraps a function call with circuit breaker protection. +func (c *Client) executeWithCircuitBreaker(fn func() error) error { + if c.cb == nil { + return fn() + } + return c.cb.Execute(fn) +} + // ActivateRepo enables CI for a repository. // The forge parameter is unused (Woodpecker determines this from its config). // owner/repo must match the repository in the forge. diff --git a/internal/adapter/woodpecker/pipelines.go b/internal/adapter/woodpecker/pipelines.go index e1a186e..7f522a0 100644 --- a/internal/adapter/woodpecker/pipelines.go +++ b/internal/adapter/woodpecker/pipelines.go @@ -153,9 +153,14 @@ func (c *Client) TriggerBuild(ctx context.Context, owner, repo, branch string) ( return 0, fmt.Errorf("repo not found: %s", fullName) } - // Create a new pipeline for the branch - pipeline, err := c.client.PipelineCreate(r.ID, &woodpecker.PipelineOptions{ - Branch: branch, + // Create a new pipeline for the branch (with circuit breaker protection) + var pipeline *woodpecker.Pipeline + err = c.executeWithCircuitBreaker(func() error { + var createErr error + pipeline, createErr = c.client.PipelineCreate(r.ID, &woodpecker.PipelineOptions{ + Branch: branch, + }) + return createErr }) if err != nil { return 0, fmt.Errorf("failed to trigger build: %w", err) @@ -165,6 +170,36 @@ func (c *Client) TriggerBuild(ctx context.Context, owner, repo, branch string) ( return pipeline.Number, nil } +// RetryPipeline restarts a failed or stopped pipeline. +func (c *Client) RetryPipeline(ctx context.Context, owner, repo string, number int64) (*domain.CIPipeline, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + fullName := owner + "/" + repo + + r, err := c.client.RepoLookup(fullName) + if err != nil { + return nil, fmt.Errorf("repo not found: %s", fullName) + } + + // Restart the pipeline using PipelineStart (with circuit breaker protection) + var pipeline *woodpecker.Pipeline + err = c.executeWithCircuitBreaker(func() error { + var startErr error + pipeline, startErr = c.client.PipelineStart(r.ID, number, woodpecker.PipelineStartOptions{}) + return startErr + }) + if err != nil { + return nil, fmt.Errorf("failed to retry pipeline %d: %w", number, err) + } + + c.logger.Info("pipeline retried", "repo", fullName, "pipeline", number, "new_status", pipeline.Status) + return pipelineFromWoodpecker(pipeline), nil +} + // pipelineFromWoodpecker converts a woodpecker.Pipeline to domain.CIPipeline. func pipelineFromWoodpecker(p *woodpecker.Pipeline) *domain.CIPipeline { var started, finished time.Time diff --git a/internal/circuitbreaker/circuitbreaker.go b/internal/circuitbreaker/circuitbreaker.go index f1e2d29..2dbbb9d 100644 --- a/internal/circuitbreaker/circuitbreaker.go +++ b/internal/circuitbreaker/circuitbreaker.go @@ -41,8 +41,14 @@ var ( ErrCircuitOpen = errors.New("circuit breaker is open") ) +// StateChangeCallback is called when the circuit breaker state changes. +type StateChangeCallback func(name string, from, to State) + // Config configures the circuit breaker behavior. type Config struct { + // Name is the identifier for this circuit breaker (used in callbacks). + Name string + // FailureThreshold is the number of consecutive failures before opening. // Default: 5 FailureThreshold int @@ -54,6 +60,9 @@ type Config struct { // HalfOpenRequests is how many requests to allow in half-open state. // Default: 1 HalfOpenRequests int + + // OnStateChange is called when the circuit breaker state changes. + OnStateChange StateChangeCallback } // DefaultConfig returns sensible defaults. @@ -119,8 +128,12 @@ func (cb *CircuitBreaker) canExecute() bool { case Open: // Check if reset timeout has passed if time.Since(cb.lastFailure) > cb.cfg.ResetTimeout { + oldState := cb.state cb.state = HalfOpen cb.halfOpenRequests = 0 + if cb.cfg.OnStateChange != nil { + cb.cfg.OnStateChange(cb.cfg.Name, oldState, cb.state) + } return true } return false @@ -155,6 +168,7 @@ func (cb *CircuitBreaker) onFailure() { cb.successes = 0 cb.lastFailure = time.Now() + oldState := cb.state switch cb.state { case Closed: if cb.failures >= cb.cfg.FailureThreshold { @@ -163,12 +177,17 @@ func (cb *CircuitBreaker) onFailure() { case HalfOpen: cb.state = Open } + + if oldState != cb.state && cb.cfg.OnStateChange != nil { + cb.cfg.OnStateChange(cb.cfg.Name, oldState, cb.state) + } } // onSuccess handles a successful operation. func (cb *CircuitBreaker) onSuccess() { cb.successes++ + oldState := cb.state switch cb.state { case Closed: cb.failures = 0 @@ -177,6 +196,10 @@ func (cb *CircuitBreaker) onSuccess() { cb.state = Closed cb.failures = 0 } + + if oldState != cb.state && cb.cfg.OnStateChange != nil { + cb.cfg.OnStateChange(cb.cfg.Name, oldState, cb.state) + } } // State returns the current circuit state. diff --git a/internal/circuitbreaker/registry.go b/internal/circuitbreaker/registry.go new file mode 100644 index 0000000..f460a66 --- /dev/null +++ b/internal/circuitbreaker/registry.go @@ -0,0 +1,160 @@ +// Package circuitbreaker provides protection against cascading failures. +package circuitbreaker + +import ( + "sync" + "time" + + "github.com/orchard9/rdev/internal/metrics" +) + +// Registry manages named circuit breakers. +type Registry struct { + mu sync.RWMutex + breakers map[string]*CircuitBreaker + configs map[string]Config +} + +// NewRegistry creates a new circuit breaker registry. +func NewRegistry() *Registry { + return &Registry{ + breakers: make(map[string]*CircuitBreaker), + configs: make(map[string]Config), + } +} + +// Get returns the circuit breaker for the given name, creating one if needed. +func (r *Registry) Get(name string) *CircuitBreaker { + r.mu.RLock() + cb, ok := r.breakers[name] + r.mu.RUnlock() + if ok { + return cb + } + + r.mu.Lock() + defer r.mu.Unlock() + + // Double-check after acquiring write lock + if cb, ok = r.breakers[name]; ok { + return cb + } + + // Use custom config if set, otherwise use defaults + cfg, ok := r.configs[name] + if !ok { + cfg = DefaultConfig() + } + + // Ensure name is set for callbacks + cfg.Name = name + + // Set up metrics callback if not already configured + if cfg.OnStateChange == nil { + cfg.OnStateChange = defaultStateChangeCallback + } + + cb = New(cfg) + r.breakers[name] = cb + return cb +} + +// Configure sets the configuration for a named circuit breaker. +// Must be called before Get() for the configuration to take effect. +func (r *Registry) Configure(name string, cfg Config) { + r.mu.Lock() + defer r.mu.Unlock() + r.configs[name] = cfg +} + +// StatusEntry contains the status of a single circuit breaker. +type StatusEntry struct { + Name string `json:"name"` + State string `json:"state"` + Failures int `json:"failures"` + LastFailure *time.Time `json:"last_failure,omitempty"` +} + +// AllStatus returns the status of all registered circuit breakers. +func (r *Registry) AllStatus() []StatusEntry { + r.mu.RLock() + defer r.mu.RUnlock() + + entries := make([]StatusEntry, 0, len(r.breakers)) + for name, cb := range r.breakers { + stats := cb.Stats() + entry := StatusEntry{ + Name: name, + State: stats.State.String(), + Failures: stats.Failures, + } + if !stats.LastFailure.IsZero() { + t := stats.LastFailure.UTC() + entry.LastFailure = &t + } + entries = append(entries, entry) + } + return entries +} + +// Reset resets a specific circuit breaker by name. +func (r *Registry) Reset(name string) bool { + r.mu.RLock() + cb, ok := r.breakers[name] + r.mu.RUnlock() + if !ok { + return false + } + cb.Reset() + return true +} + +// ResetAll resets all circuit breakers. +func (r *Registry) ResetAll() { + r.mu.RLock() + defer r.mu.RUnlock() + for _, cb := range r.breakers { + cb.Reset() + } +} + +// GlobalRegistry is the default global circuit breaker registry. +var GlobalRegistry = NewRegistry() + +// defaultStateChangeCallback updates Prometheus metrics when circuit breaker state changes. +func defaultStateChangeCallback(name string, from, to State) { + // State values: 0=closed, 1=half-open, 2=open + metrics.SetCircuitBreakerState(name, int(to)) +} + +// Known circuit breaker names for external systems. +const ( + NameWoodpecker = "woodpecker" + NameGitea = "gitea" + NameRegistry = "registry" + NameCloudflare = "cloudflare" +) + +func init() { + // Configure default circuit breakers with appropriate settings + GlobalRegistry.Configure(NameWoodpecker, Config{ + FailureThreshold: 3, + ResetTimeout: 30 * time.Second, + HalfOpenRequests: 1, + }) + GlobalRegistry.Configure(NameGitea, Config{ + FailureThreshold: 3, + ResetTimeout: 30 * time.Second, + HalfOpenRequests: 1, + }) + GlobalRegistry.Configure(NameRegistry, Config{ + FailureThreshold: 5, + ResetTimeout: 60 * time.Second, + HalfOpenRequests: 1, + }) + GlobalRegistry.Configure(NameCloudflare, Config{ + FailureThreshold: 5, + ResetTimeout: 60 * time.Second, + HalfOpenRequests: 1, + }) +} diff --git a/internal/db/migrations/018_sagas.sql b/internal/db/migrations/018_sagas.sql new file mode 100644 index 0000000..4cfe039 --- /dev/null +++ b/internal/db/migrations/018_sagas.sql @@ -0,0 +1,78 @@ +-- Saga pattern for resilient multi-step workflows +-- Sagas track multi-step operations with retry and compensation support + +-- +goose Up + +-- Main sagas table +CREATE TABLE sagas ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + name TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + definition TEXT, + vars JSONB NOT NULL DEFAULT '{}', + outputs JSONB NOT NULL DEFAULT '{}', + current_step TEXT, + retry_count INT NOT NULL DEFAULT 0, + max_retries INT NOT NULL DEFAULT 3, + error TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + completed_at TIMESTAMPTZ, + + -- Constraints + CONSTRAINT valid_status CHECK (status IN ('pending', 'running', 'completed', 'failed', 'compensating', 'compensated')) +); + +-- Saga steps table +CREATE TABLE saga_steps ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + saga_id UUID NOT NULL REFERENCES sagas(id) ON DELETE CASCADE, + name TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + action TEXT NOT NULL, + depends_on TEXT[] NOT NULL DEFAULT '{}', + retry_policy JSONB NOT NULL DEFAULT '{"max_attempts": 3, "backoff_type": "exponential", "initial_delay": "5s", "max_delay": "60s"}', + compensate TEXT, + config JSONB NOT NULL DEFAULT '{}', + output JSONB, + error TEXT, + retry_count INT NOT NULL DEFAULT 0, + started_at TIMESTAMPTZ, + completed_at TIMESTAMPTZ, + + -- Constraints + CONSTRAINT unique_step_per_saga UNIQUE (saga_id, name), + CONSTRAINT valid_step_status CHECK (status IN ('pending', 'running', 'completed', 'failed', 'skipped')) +); + +-- Indexes for common queries +CREATE INDEX idx_sagas_status ON sagas(status) WHERE status IN ('pending', 'running', 'compensating'); +CREATE INDEX idx_sagas_name ON sagas(name); +CREATE INDEX idx_sagas_created_at ON sagas(created_at DESC); +CREATE INDEX idx_saga_steps_saga_id ON saga_steps(saga_id); +CREATE INDEX idx_saga_steps_status ON saga_steps(saga_id, status); + +-- Update timestamp trigger +CREATE OR REPLACE FUNCTION update_saga_updated_at() +RETURNS TRIGGER AS $$ +BEGIN + NEW.updated_at = NOW(); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER trigger_saga_updated_at + BEFORE UPDATE ON sagas + FOR EACH ROW + EXECUTE FUNCTION update_saga_updated_at(); + +-- +goose Down +DROP TRIGGER IF EXISTS trigger_saga_updated_at ON sagas; +DROP FUNCTION IF EXISTS update_saga_updated_at(); +DROP INDEX IF EXISTS idx_saga_steps_status; +DROP INDEX IF EXISTS idx_saga_steps_saga_id; +DROP INDEX IF EXISTS idx_sagas_created_at; +DROP INDEX IF EXISTS idx_sagas_name; +DROP INDEX IF EXISTS idx_sagas_status; +DROP TABLE IF EXISTS saga_steps; +DROP TABLE IF EXISTS sagas; diff --git a/internal/domain/saga.go b/internal/domain/saga.go new file mode 100644 index 0000000..c56362b --- /dev/null +++ b/internal/domain/saga.go @@ -0,0 +1,245 @@ +package domain + +import ( + "time" +) + +// SagaStatus represents the status of a saga execution. +type SagaStatus string + +const ( + // SagaStatusPending indicates the saga hasn't started yet. + SagaStatusPending SagaStatus = "pending" + // SagaStatusRunning indicates the saga is executing. + SagaStatusRunning SagaStatus = "running" + // SagaStatusCompleted indicates the saga finished successfully. + SagaStatusCompleted SagaStatus = "completed" + // SagaStatusFailed indicates the saga failed and may need compensation. + SagaStatusFailed SagaStatus = "failed" + // SagaStatusCompensating indicates compensation is in progress. + SagaStatusCompensating SagaStatus = "compensating" + // SagaStatusCompensated indicates compensation completed successfully. + SagaStatusCompensated SagaStatus = "compensated" +) + +// IsValid returns true if the status is known. +func (s SagaStatus) IsValid() bool { + switch s { + case SagaStatusPending, SagaStatusRunning, SagaStatusCompleted, + SagaStatusFailed, SagaStatusCompensating, SagaStatusCompensated: + return true + } + return false +} + +// IsTerminal returns true if the status is a final state. +func (s SagaStatus) IsTerminal() bool { + return s == SagaStatusCompleted || s == SagaStatusFailed || s == SagaStatusCompensated +} + +// StepStatus represents the status of a saga step. +type StepStatus string + +const ( + StepStatusPending StepStatus = "pending" + StepStatusRunning StepStatus = "running" + StepStatusCompleted StepStatus = "completed" + StepStatusFailed StepStatus = "failed" + StepStatusSkipped StepStatus = "skipped" +) + +// IsValid returns true if the status is known. +func (s StepStatus) IsValid() bool { + switch s { + case StepStatusPending, StepStatusRunning, StepStatusCompleted, + StepStatusFailed, StepStatusSkipped: + return true + } + return false +} + +// IsTerminal returns true if the status is a final state. +func (s StepStatus) IsTerminal() bool { + return s == StepStatusCompleted || s == StepStatusFailed || s == StepStatusSkipped +} + +// BackoffType represents the type of retry backoff. +type BackoffType string + +const ( + BackoffNone BackoffType = "none" + BackoffLinear BackoffType = "linear" + BackoffExponential BackoffType = "exponential" +) + +// RetryPolicy defines how a step should be retried on failure. +type RetryPolicy struct { + // MaxAttempts is the maximum number of retry attempts (including initial). + MaxAttempts int `json:"max_attempts"` + // BackoffType is the type of backoff between retries. + BackoffType BackoffType `json:"backoff_type"` + // InitialDelay is the initial delay between retries. + InitialDelay time.Duration `json:"initial_delay"` + // MaxDelay is the maximum delay between retries. + MaxDelay time.Duration `json:"max_delay"` +} + +// DefaultRetryPolicy returns a sensible default retry policy. +func DefaultRetryPolicy() RetryPolicy { + return RetryPolicy{ + MaxAttempts: 3, + BackoffType: BackoffExponential, + InitialDelay: 5 * time.Second, + MaxDelay: 60 * time.Second, + } +} + +// SagaStep represents a single step in a saga. +type SagaStep struct { + // ID is the unique identifier for this step instance. + ID string `json:"id"` + // SagaID is the saga this step belongs to. + SagaID string `json:"saga_id"` + // Name is the step name (unique within the saga). + Name string `json:"name"` + // Status is the current step status. + Status StepStatus `json:"status"` + // Action is the step action type (api, wait_pipeline, wait_build, shell). + Action string `json:"action"` + // DependsOn lists step names that must complete before this step can run. + DependsOn []string `json:"depends_on,omitempty"` + // RetryPolicy defines the retry behavior for this step. + RetryPolicy RetryPolicy `json:"retry_policy"` + // Compensate is the name of the compensation step to run on rollback. + Compensate string `json:"compensate,omitempty"` + // Config contains action-specific configuration. + Config map[string]any `json:"config,omitempty"` + // Output contains the step output after completion. + Output map[string]any `json:"output,omitempty"` + // Error contains the error message if the step failed. + Error string `json:"error,omitempty"` + // RetryCount is the number of times this step has been retried. + RetryCount int `json:"retry_count"` + // StartedAt is when the step started executing. + StartedAt *time.Time `json:"started_at,omitempty"` + // CompletedAt is when the step finished (success or failure). + CompletedAt *time.Time `json:"completed_at,omitempty"` +} + +// CanRun returns true if this step is ready to execute. +func (s *SagaStep) CanRun(completedSteps map[string]bool) bool { + if s.Status != StepStatusPending { + return false + } + for _, dep := range s.DependsOn { + if !completedSteps[dep] { + return false + } + } + return true +} + +// Saga represents a multi-step workflow with compensation support. +type Saga struct { + // ID is the unique identifier for this saga. + ID string `json:"id"` + // Name is the saga name (from the definition). + Name string `json:"name"` + // Status is the current saga status. + Status SagaStatus `json:"status"` + // Definition is the YAML definition used to create this saga. + Definition string `json:"definition,omitempty"` + // Vars contains template variables for step configuration. + Vars map[string]string `json:"vars,omitempty"` + // Outputs contains outputs from completed steps, keyed by step name. + Outputs map[string]map[string]any `json:"outputs,omitempty"` + // CurrentStep is the name of the currently executing step. + CurrentStep string `json:"current_step,omitempty"` + // RetryCount is the number of times the saga has been retried. + RetryCount int `json:"retry_count"` + // MaxRetries is the maximum number of saga-level retries. + MaxRetries int `json:"max_retries"` + // Error contains the error message if the saga failed. + Error string `json:"error,omitempty"` + // Steps contains all steps in this saga. + Steps []SagaStep `json:"steps,omitempty"` + // CreatedAt is when the saga was created. + CreatedAt time.Time `json:"created_at"` + // UpdatedAt is when the saga was last updated. + UpdatedAt time.Time `json:"updated_at"` + // CompletedAt is when the saga finished (success, failure, or compensation). + CompletedAt *time.Time `json:"completed_at,omitempty"` +} + +// CompletedSteps returns a map of step names that have completed (successfully or skipped). +// Skipped steps are considered complete for dependency resolution purposes. +func (s *Saga) CompletedSteps() map[string]bool { + completed := make(map[string]bool) + for _, step := range s.Steps { + if step.Status == StepStatusCompleted || step.Status == StepStatusSkipped { + completed[step.Name] = true + } + } + return completed +} + +// GetStep returns a step by name, or nil if not found. +func (s *Saga) GetStep(name string) *SagaStep { + for i := range s.Steps { + if s.Steps[i].Name == name { + return &s.Steps[i] + } + } + return nil +} + +// FailedStep returns the first failed step, or nil if none failed. +func (s *Saga) FailedStep() *SagaStep { + for i := range s.Steps { + if s.Steps[i].Status == StepStatusFailed { + return &s.Steps[i] + } + } + return nil +} + +// RunnableSteps returns steps that are ready to execute. +func (s *Saga) RunnableSteps() []SagaStep { + completed := s.CompletedSteps() + var runnable []SagaStep + for _, step := range s.Steps { + if step.CanRun(completed) { + runnable = append(runnable, step) + } + } + return runnable +} + +// SagaFilters specifies criteria for listing sagas. +type SagaFilters struct { + // Name filters by saga name. + Name string + // Status filters by saga status. + Status SagaStatus + // Since filters sagas created after this time. + Since time.Time + // Limit is the maximum number of sagas to return. + Limit int +} + +// DefaultSagaFilters returns filters with default values. +func DefaultSagaFilters() SagaFilters { + return SagaFilters{ + Limit: 50, + } +} + +// Normalize applies defaults and limits to the filters. +func (f *SagaFilters) Normalize() { + if f.Limit <= 0 { + f.Limit = 50 + } + if f.Limit > 200 { + f.Limit = 200 + } +} diff --git a/internal/domain/webhook.go b/internal/domain/webhook.go index a449f36..bee23d8 100644 --- a/internal/domain/webhook.go +++ b/internal/domain/webhook.go @@ -22,6 +22,17 @@ const ( WebhookEventCommandFailed WebhookEventType = "command.failed" WebhookEventPodReady WebhookEventType = "pod.ready" WebhookEventPodFailed WebhookEventType = "pod.failed" + + // Saga events + WebhookEventSagaStarted WebhookEventType = "saga.started" + WebhookEventSagaStepStarted WebhookEventType = "saga.step.started" + WebhookEventSagaStepCompleted WebhookEventType = "saga.step.completed" + WebhookEventSagaStepFailed WebhookEventType = "saga.step.failed" + WebhookEventSagaStepRetrying WebhookEventType = "saga.step.retrying" + WebhookEventSagaCompleted WebhookEventType = "saga.completed" + WebhookEventSagaFailed WebhookEventType = "saga.failed" + WebhookEventSagaCompensating WebhookEventType = "saga.compensating" + WebhookEventSagaCompensated WebhookEventType = "saga.compensated" ) // AllWebhookEventTypes lists all valid webhook event types. @@ -31,6 +42,15 @@ var AllWebhookEventTypes = []WebhookEventType{ WebhookEventCommandFailed, WebhookEventPodReady, WebhookEventPodFailed, + WebhookEventSagaStarted, + WebhookEventSagaStepStarted, + WebhookEventSagaStepCompleted, + WebhookEventSagaStepFailed, + WebhookEventSagaStepRetrying, + WebhookEventSagaCompleted, + WebhookEventSagaFailed, + WebhookEventSagaCompensating, + WebhookEventSagaCompensated, } // IsValid checks if a webhook event type is valid. @@ -136,6 +156,18 @@ type PodEventData struct { Timestamp time.Time `json:"timestamp"` } +// SagaEventData is the data structure for saga-related webhook events. +type SagaEventData struct { + SagaID string `json:"saga_id"` + SagaName string `json:"saga_name"` + Status string `json:"status"` + StepName string `json:"step_name,omitempty"` + StepStatus string `json:"step_status,omitempty"` + RetryCount int `json:"retry_count,omitempty"` + Error string `json:"error,omitempty"` + Timestamp time.Time `json:"timestamp"` +} + // WebhookFilters contains filter options for listing webhook deliveries. type WebhookDeliveryFilters struct { EventType *WebhookEventType // Filter by event type diff --git a/internal/handlers/health.go b/internal/handlers/health.go index e8bfa5e..43df98f 100644 --- a/internal/handlers/health.go +++ b/internal/handlers/health.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "github.com/orchard9/rdev/internal/circuitbreaker" "github.com/orchard9/rdev/internal/domain" "github.com/orchard9/rdev/internal/metrics" "github.com/orchard9/rdev/internal/port" @@ -306,3 +307,17 @@ type ReadinessResponse struct { Service string `json:"service"` Checks map[string]CheckResult `json:"checks,omitempty"` } + +// CircuitsResponse is the response for the /health/circuits endpoint. +type CircuitsResponse struct { + Circuits []circuitbreaker.StatusEntry `json:"circuits"` +} + +// Circuits returns the status of all circuit breakers. +// GET /health/circuits +func (h *HealthHandler) Circuits(w http.ResponseWriter, r *http.Request) { + entries := circuitbreaker.GlobalRegistry.AllStatus() + api.WriteSuccess(w, r, CircuitsResponse{ + Circuits: entries, + }) +} diff --git a/internal/handlers/infrastructure.go b/internal/handlers/infrastructure.go index b9f18c6..3792f69 100644 --- a/internal/handlers/infrastructure.go +++ b/internal/handlers/infrastructure.go @@ -122,13 +122,15 @@ func (h *InfrastructureHandler) Mount(r api.Router) { r.With(auth.RequireScope(auth.ScopeProjectsExecute, auth.ScopeAdmin)). Delete("/projects/{id}/domains/{domain}", h.RemoveDomainAlias) - // CI pipeline endpoints (read-only) + // CI pipeline endpoints r.With(auth.RequireScope(auth.ScopeProjectsRead, auth.ScopeAdmin)). Get("/projects/{id}/pipelines", h.ListPipelines) r.With(auth.RequireScope(auth.ScopeProjectsRead, auth.ScopeAdmin)). Get("/projects/{id}/pipelines/{number}", h.GetPipeline) r.With(auth.RequireScope(auth.ScopeProjectsRead, auth.ScopeAdmin)). Get("/projects/{id}/pipelines/{number}/steps", h.GetPipelineSteps) + r.With(auth.RequireScope(auth.ScopeProjectsExecute, auth.ScopeAdmin)). + Post("/projects/{id}/pipelines/{number}/retry", h.RetryPipeline) } // CreateRepoRequest is the request body for POST /projects/{id}/repo. diff --git a/internal/handlers/infrastructure_pipelines.go b/internal/handlers/infrastructure_pipelines.go index 5ddbfd1..e6c2799 100644 --- a/internal/handlers/infrastructure_pipelines.go +++ b/internal/handlers/infrastructure_pipelines.go @@ -214,3 +214,48 @@ func (h *InfrastructureHandler) GetPipelineSteps(w http.ResponseWriter, r *http. Steps: respSteps, }) } + +// RetryPipeline restarts a failed or stopped pipeline. +// POST /projects/{id}/pipelines/{number}/retry +func (h *InfrastructureHandler) RetryPipeline(w http.ResponseWriter, r *http.Request) { + projectID := chi.URLParam(r, "id") + numberStr := chi.URLParam(r, "number") + ctx, cancel := context.WithTimeout(r.Context(), TimeoutStandard) + defer cancel() + + if err := validateProjectID(projectID); err != nil { + api.WriteBadRequest(w, r, err.Error()) + return + } + + number, err := strconv.ParseInt(numberStr, 10, 64) + if err != nil { + api.WriteBadRequest(w, r, "invalid pipeline number") + return + } + + if h.ciProvider == nil { + api.WriteInternalError(w, r, "CI provider not configured") + return + } + + p, err := h.ciProvider.RetryPipeline(ctx, h.defaultGitOwner, projectID, number) + if err != nil { + api.WriteInternalError(w, r, fmt.Sprintf("failed to retry pipeline: %v", err)) + return + } + + api.WriteSuccess(w, r, PipelineResponse{ + ID: p.ID, + Number: p.Number, + Status: p.Status, + Event: p.Event, + Branch: p.Branch, + Commit: p.Commit, + Message: p.Message, + Author: p.Author, + Started: formatTime(p.Started), + Finished: formatTime(p.Finished), + Errors: mapPipelineErrors(p.Errors), + }) +} diff --git a/internal/handlers/infrastructure_pipelines_test.go b/internal/handlers/infrastructure_pipelines_test.go index 33dced6..3efb7f5 100644 --- a/internal/handlers/infrastructure_pipelines_test.go +++ b/internal/handlers/infrastructure_pipelines_test.go @@ -96,6 +96,28 @@ func (m *mockCIProvider) GetPipelineSteps(_ context.Context, owner, repo string, return nil, fmt.Errorf("pipeline %d not found", number) } +func (m *mockCIProvider) RetryPipeline(_ context.Context, owner, repo string, number int64) (*domain.CIPipeline, error) { + if m.err != nil { + return nil, m.err + } + key := owner + "/" + repo + for _, p := range m.pipelines[key] { + if p.Number == number { + // Simulate retry by returning the pipeline with running status + return &domain.CIPipeline{ + ID: p.ID, + Number: p.Number, + Status: "running", + Event: p.Event, + Branch: p.Branch, + Commit: p.Commit, + Author: p.Author, + }, nil + } + } + return nil, fmt.Errorf("pipeline %d not found", number) +} + func setupInfraHandlerWithCI(ci port.CIProvider) chi.Router { h := NewInfrastructureHandler(nil, nil, nil, nil, ci, nil, InfrastructureConfig{ DefaultGitOwner: "threesix", diff --git a/internal/handlers/saga.go b/internal/handlers/saga.go new file mode 100644 index 0000000..99ccfe4 --- /dev/null +++ b/internal/handlers/saga.go @@ -0,0 +1,438 @@ +// Package handlers provides HTTP handlers for the rdev API. +package handlers + +import ( + "context" + "net/http" + + "github.com/go-chi/chi/v5" + "github.com/orchard9/rdev/internal/adapter/postgres" + "github.com/orchard9/rdev/internal/auth" + "github.com/orchard9/rdev/internal/domain" + "github.com/orchard9/rdev/internal/port" + "github.com/orchard9/rdev/internal/validate" + "github.com/orchard9/rdev/pkg/api" +) + +// SagaHandler handles saga API endpoints. +type SagaHandler struct { + repo port.SagaRepository + executor port.SagaExecutor +} + +// NewSagaHandler creates a new saga handler. +func NewSagaHandler(repo port.SagaRepository, executor port.SagaExecutor) *SagaHandler { + return &SagaHandler{ + repo: repo, + executor: executor, + } +} + +// Mount registers the saga routes. +func (h *SagaHandler) Mount(r api.Router) { + r.Route("/sagas", func(r chi.Router) { + // Create and list sagas + r.With(auth.RequireScope(auth.ScopeProjectsExecute, auth.ScopeAdmin)).Post("/", h.Create) + r.With(auth.RequireScope(auth.ScopeProjectsRead, auth.ScopeAdmin)).Get("/", h.List) + + // Single saga operations + r.With(auth.RequireScope(auth.ScopeProjectsRead, auth.ScopeAdmin)).Get("/{id}", h.Get) + r.With(auth.RequireScope(auth.ScopeProjectsExecute, auth.ScopeAdmin)).Delete("/{id}", h.Delete) + + // Saga control operations + r.With(auth.RequireScope(auth.ScopeProjectsExecute, auth.ScopeAdmin)).Post("/{id}/retry", h.Retry) + r.With(auth.RequireScope(auth.ScopeProjectsExecute, auth.ScopeAdmin)).Post("/{id}/rollback", h.Rollback) + + // Step operations + r.With(auth.RequireScope(auth.ScopeProjectsExecute, auth.ScopeAdmin)).Post("/{id}/steps/{step}/retry", h.RetryStep) + r.With(auth.RequireScope(auth.ScopeProjectsExecute, auth.ScopeAdmin)).Post("/{id}/steps/{step}/skip", h.SkipStep) + }) +} + +// CreateSagaRequest is the request body for POST /sagas. +type CreateSagaRequest struct { + Name string `json:"name"` + Definition string `json:"definition,omitempty"` + Vars map[string]string `json:"vars,omitempty"` + MaxRetries int `json:"max_retries,omitempty"` + Steps []CreateStepSpec `json:"steps,omitempty"` +} + +// CreateStepSpec specifies a step in the saga. +type CreateStepSpec struct { + Name string `json:"name"` + Action string `json:"action"` + DependsOn []string `json:"depends_on,omitempty"` + Compensate string `json:"compensate,omitempty"` + Config map[string]any `json:"config,omitempty"` + RetryPolicy *domain.RetryPolicy `json:"retry_policy,omitempty"` +} + +// SagaResponse is the API response for a saga. +type SagaResponse struct { + ID string `json:"id"` + Name string `json:"name"` + Status string `json:"status"` + CurrentStep string `json:"current_step,omitempty"` + RetryCount int `json:"retry_count"` + MaxRetries int `json:"max_retries"` + Error string `json:"error,omitempty"` + Steps []StepResponse `json:"steps,omitempty"` + Outputs map[string]map[string]any `json:"outputs,omitempty"` + CreatedAt string `json:"created_at"` + UpdatedAt string `json:"updated_at"` + CompletedAt string `json:"completed_at,omitempty"` +} + +// StepResponse is the API response for a saga step. +type StepResponse struct { + ID string `json:"id"` + Name string `json:"name"` + Status string `json:"status"` + Action string `json:"action"` + DependsOn []string `json:"depends_on,omitempty"` + Compensate string `json:"compensate,omitempty"` + RetryCount int `json:"retry_count"` + Output map[string]any `json:"output,omitempty"` + Error string `json:"error,omitempty"` + StartedAt string `json:"started_at,omitempty"` + CompletedAt string `json:"completed_at,omitempty"` +} + +// Create creates and starts a new saga. +// POST /sagas +func (h *SagaHandler) Create(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithTimeout(r.Context(), TimeoutOrchestration) + defer cancel() + + var req CreateSagaRequest + if err := api.DecodeJSON(r, &req); err != nil { + api.WriteBadRequest(w, r, "invalid request body") + return + } + + v := validate.New() + v.Required(req.Name, "name") + if len(req.Steps) > 0 { + for i, step := range req.Steps { + v.Required(step.Name, "steps["+string(rune('0'+i))+"].name") + v.Required(step.Action, "steps["+string(rune('0'+i))+"].action") + } + } + if err := v.Error(); err != nil { + api.WriteBadRequest(w, r, err.Error()) + return + } + + // Build saga from request + saga := &domain.Saga{ + Name: req.Name, + Status: domain.SagaStatusPending, + Definition: req.Definition, + Vars: req.Vars, + MaxRetries: req.MaxRetries, + Outputs: make(map[string]map[string]any), + } + + if saga.MaxRetries <= 0 { + saga.MaxRetries = 3 + } + + // Build steps + for _, spec := range req.Steps { + step := domain.SagaStep{ + Name: spec.Name, + Status: domain.StepStatusPending, + Action: spec.Action, + DependsOn: spec.DependsOn, + Compensate: spec.Compensate, + Config: spec.Config, + } + if spec.RetryPolicy != nil { + step.RetryPolicy = *spec.RetryPolicy + } else { + step.RetryPolicy = domain.DefaultRetryPolicy() + } + saga.Steps = append(saga.Steps, step) + } + + // Create saga in database + if err := h.repo.Create(ctx, saga); err != nil { + api.WriteInternalError(w, r, "failed to create saga: "+err.Error()) + return + } + + // Start execution asynchronously (don't wait for completion) + go func() { + execCtx := context.WithoutCancel(ctx) + if err := h.executor.Execute(execCtx, saga); err != nil { + // Error is already recorded in the saga + _ = err + } + }() + + api.WriteCreated(w, r, sagaToResponse(saga)) +} + +// Get returns a saga by ID. +// GET /sagas/{id} +func (h *SagaHandler) Get(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithTimeout(r.Context(), TimeoutLookup) + defer cancel() + + id := chi.URLParam(r, "id") + if id == "" { + api.WriteBadRequest(w, r, "id is required") + return + } + + saga, err := h.repo.Get(ctx, id) + if err != nil { + if err == postgres.ErrSagaNotFound { + api.WriteNotFound(w, r, "saga not found") + return + } + api.WriteInternalError(w, r, "failed to get saga: "+err.Error()) + return + } + + api.WriteSuccess(w, r, sagaToResponse(saga)) +} + +// List returns sagas matching filters. +// GET /sagas +func (h *SagaHandler) List(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithTimeout(r.Context(), TimeoutLookup) + defer cancel() + + filters := domain.SagaFilters{ + Name: r.URL.Query().Get("name"), + Status: domain.SagaStatus(r.URL.Query().Get("status")), + } + filters.Normalize() + + sagas, err := h.repo.List(ctx, filters) + if err != nil { + api.WriteInternalError(w, r, "failed to list sagas: "+err.Error()) + return + } + + resp := make([]SagaResponse, len(sagas)) + for i, saga := range sagas { + resp[i] = sagaToResponse(saga) + } + + api.WriteSuccess(w, r, resp) +} + +// Delete removes a saga. +// DELETE /sagas/{id} +func (h *SagaHandler) Delete(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithTimeout(r.Context(), TimeoutStandard) + defer cancel() + + id := chi.URLParam(r, "id") + if id == "" { + api.WriteBadRequest(w, r, "id is required") + return + } + + if err := h.repo.Delete(ctx, id); err != nil { + if err == postgres.ErrSagaNotFound { + api.WriteNotFound(w, r, "saga not found") + return + } + api.WriteInternalError(w, r, "failed to delete saga: "+err.Error()) + return + } + + api.WriteSuccess(w, r, map[string]string{ + "status": "deleted", + "id": id, + }) +} + +// Retry resumes a failed saga from the last failed step. +// POST /sagas/{id}/retry +func (h *SagaHandler) Retry(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithTimeout(r.Context(), TimeoutOrchestration) + defer cancel() + + id := chi.URLParam(r, "id") + if id == "" { + api.WriteBadRequest(w, r, "id is required") + return + } + + // Start resumption asynchronously + go func() { + execCtx := context.WithoutCancel(ctx) + if err := h.executor.Resume(execCtx, id); err != nil { + // Error is already recorded in the saga + _ = err + } + }() + + // Return current saga state + saga, err := h.repo.Get(ctx, id) + if err != nil { + if err == postgres.ErrSagaNotFound { + api.WriteNotFound(w, r, "saga not found") + return + } + api.WriteInternalError(w, r, "failed to get saga: "+err.Error()) + return + } + + api.WriteSuccess(w, r, sagaToResponse(saga)) +} + +// Rollback triggers compensation for a failed saga. +// POST /sagas/{id}/rollback +func (h *SagaHandler) Rollback(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithTimeout(r.Context(), TimeoutOrchestration) + defer cancel() + + id := chi.URLParam(r, "id") + if id == "" { + api.WriteBadRequest(w, r, "id is required") + return + } + + // Start compensation asynchronously + go func() { + execCtx := context.WithoutCancel(ctx) + if err := h.executor.Compensate(execCtx, id); err != nil { + // Error is already recorded in the saga + _ = err + } + }() + + // Return current saga state + saga, err := h.repo.Get(ctx, id) + if err != nil { + if err == postgres.ErrSagaNotFound { + api.WriteNotFound(w, r, "saga not found") + return + } + api.WriteInternalError(w, r, "failed to get saga: "+err.Error()) + return + } + + api.WriteSuccess(w, r, sagaToResponse(saga)) +} + +// RetryStep retries a specific failed step. +// POST /sagas/{id}/steps/{step}/retry +func (h *SagaHandler) RetryStep(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithTimeout(r.Context(), TimeoutOrchestration) + defer cancel() + + id := chi.URLParam(r, "id") + step := chi.URLParam(r, "step") + if id == "" || step == "" { + api.WriteBadRequest(w, r, "id and step are required") + return + } + + // Start retry asynchronously + go func() { + execCtx := context.WithoutCancel(ctx) + if err := h.executor.RetryStep(execCtx, id, step); err != nil { + // Error is already recorded in the saga + _ = err + } + }() + + // Return current saga state + saga, err := h.repo.Get(ctx, id) + if err != nil { + if err == postgres.ErrSagaNotFound { + api.WriteNotFound(w, r, "saga not found") + return + } + api.WriteInternalError(w, r, "failed to get saga: "+err.Error()) + return + } + + api.WriteSuccess(w, r, sagaToResponse(saga)) +} + +// SkipStep skips a step and continues execution. +// POST /sagas/{id}/steps/{step}/skip +func (h *SagaHandler) SkipStep(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithTimeout(r.Context(), TimeoutOrchestration) + defer cancel() + + id := chi.URLParam(r, "id") + step := chi.URLParam(r, "step") + if id == "" || step == "" { + api.WriteBadRequest(w, r, "id and step are required") + return + } + + // Start skip and continue asynchronously + go func() { + execCtx := context.WithoutCancel(ctx) + if err := h.executor.SkipStep(execCtx, id, step); err != nil { + // Error is already recorded in the saga + _ = err + } + }() + + // Return current saga state + saga, err := h.repo.Get(ctx, id) + if err != nil { + if err == postgres.ErrSagaNotFound { + api.WriteNotFound(w, r, "saga not found") + return + } + api.WriteInternalError(w, r, "failed to get saga: "+err.Error()) + return + } + + api.WriteSuccess(w, r, sagaToResponse(saga)) +} + +// sagaToResponse converts a domain saga to an API response. +func sagaToResponse(saga *domain.Saga) SagaResponse { + resp := SagaResponse{ + ID: saga.ID, + Name: saga.Name, + Status: string(saga.Status), + CurrentStep: saga.CurrentStep, + RetryCount: saga.RetryCount, + MaxRetries: saga.MaxRetries, + Error: saga.Error, + Outputs: saga.Outputs, + CreatedAt: saga.CreatedAt.Format("2006-01-02T15:04:05Z07:00"), + UpdatedAt: saga.UpdatedAt.Format("2006-01-02T15:04:05Z07:00"), + } + + if saga.CompletedAt != nil { + resp.CompletedAt = saga.CompletedAt.Format("2006-01-02T15:04:05Z07:00") + } + + for _, step := range saga.Steps { + stepResp := StepResponse{ + ID: step.ID, + Name: step.Name, + Status: string(step.Status), + Action: step.Action, + DependsOn: step.DependsOn, + Compensate: step.Compensate, + RetryCount: step.RetryCount, + Output: step.Output, + Error: step.Error, + } + if step.StartedAt != nil { + stepResp.StartedAt = step.StartedAt.Format("2006-01-02T15:04:05Z07:00") + } + if step.CompletedAt != nil { + stepResp.CompletedAt = step.CompletedAt.Format("2006-01-02T15:04:05Z07:00") + } + resp.Steps = append(resp.Steps, stepResp) + } + + return resp +} diff --git a/internal/logging/fields.go b/internal/logging/fields.go index 90765d6..03fc875 100644 --- a/internal/logging/fields.go +++ b/internal/logging/fields.go @@ -69,4 +69,9 @@ const ( FieldAuditResource = "audit_resource" FieldAuditResult = "audit_result" FieldAuditDetails = "audit_details" + + // Saga context + FieldSagaID = "saga_id" + FieldSagaName = "saga_name" + FieldStepName = "step_name" ) diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 4456487..ffeb4b6 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -158,6 +158,28 @@ var ( Name: "rdev_external_system_last_check_timestamp", Help: "Unix timestamp of last health check", }, []string{"system"}) + + // Saga metrics + sagaTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "rdev_saga_total", + Help: "Total number of sagas by name and final status", + }, []string{"name", "status"}) + + sagaStepDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "rdev_saga_step_duration_seconds", + Help: "Duration of saga step execution in seconds", + Buckets: prometheus.ExponentialBuckets(0.1, 2, 15), // 0.1s to ~27min + }, []string{"saga", "step", "action"}) + + sagaRetryTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "rdev_saga_retry_total", + Help: "Total number of saga step retries", + }, []string{"saga", "step"}) + + circuitBreakerState = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "rdev_circuit_breaker_state", + Help: "Circuit breaker state: 0=closed, 1=half-open, 2=open", + }, []string{"name"}) ) // RecordCommand records a command execution. @@ -275,6 +297,27 @@ func SetExternalSystemHealth(system string, healthy bool, latencySeconds float64 externalSystemLastCheck.WithLabelValues(system).Set(float64(time.Now().Unix())) } +// RecordSaga records a saga completion. +func RecordSaga(name, status string) { + sagaTotal.WithLabelValues(name, status).Inc() +} + +// RecordSagaStepDuration records the duration of a saga step. +func RecordSagaStepDuration(saga, step, action string, durationMs int64) { + sagaStepDuration.WithLabelValues(saga, step, action).Observe(float64(durationMs) / 1000.0) +} + +// RecordSagaRetry records a saga step retry. +func RecordSagaRetry(saga, step string) { + sagaRetryTotal.WithLabelValues(saga, step).Inc() +} + +// SetCircuitBreakerState sets the circuit breaker state metric. +// state: 0=closed, 1=half-open, 2=open +func SetCircuitBreakerState(name string, state int) { + circuitBreakerState.WithLabelValues(name).Set(float64(state)) +} + // Handler returns the Prometheus HTTP handler. func Handler() http.Handler { return promhttp.Handler() diff --git a/internal/port/ci_provider.go b/internal/port/ci_provider.go index a4acd18..c7b7cb7 100644 --- a/internal/port/ci_provider.go +++ b/internal/port/ci_provider.go @@ -43,4 +43,8 @@ type CIProvider interface { // TriggerBuild manually starts a new pipeline build on the specified branch. // Returns the pipeline number of the triggered build. TriggerBuild(ctx context.Context, owner, repo, branch string) (int64, error) + + // RetryPipeline restarts a failed or stopped pipeline. + // Returns the restarted pipeline information. + RetryPipeline(ctx context.Context, owner, repo string, number int64) (*domain.CIPipeline, error) } diff --git a/internal/port/saga.go b/internal/port/saga.go new file mode 100644 index 0000000..45c1f2b --- /dev/null +++ b/internal/port/saga.go @@ -0,0 +1,50 @@ +// Package port defines interfaces (ports) for external dependencies. +package port + +import ( + "context" + + "github.com/orchard9/rdev/internal/domain" +) + +// SagaRepository manages saga persistence. +type SagaRepository interface { + // Create creates a new saga with its steps. + Create(ctx context.Context, saga *domain.Saga) error + + // Get returns a saga by ID, including all steps. + Get(ctx context.Context, id string) (*domain.Saga, error) + + // Update updates a saga's status and metadata (not steps). + Update(ctx context.Context, saga *domain.Saga) error + + // UpdateStep updates a single step's status and output. + UpdateStep(ctx context.Context, step *domain.SagaStep) error + + // List returns sagas matching the given filters. + List(ctx context.Context, filters domain.SagaFilters) ([]*domain.Saga, error) + + // Delete removes a saga and its steps. + Delete(ctx context.Context, id string) error + + // GetPendingSteps returns steps ready to execute (no unmet dependencies). + GetPendingSteps(ctx context.Context, sagaID string) ([]domain.SagaStep, error) +} + +// SagaExecutor executes saga workflows. +type SagaExecutor interface { + // Execute runs a saga from the beginning. + Execute(ctx context.Context, saga *domain.Saga) error + + // Resume continues execution of a paused or failed saga. + Resume(ctx context.Context, sagaID string) error + + // Compensate runs compensation steps for a failed saga. + Compensate(ctx context.Context, sagaID string) error + + // RetryStep retries a specific failed step. + RetryStep(ctx context.Context, sagaID, stepName string) error + + // SkipStep skips a step and continues execution. + SkipStep(ctx context.Context, sagaID, stepName string) error +} diff --git a/internal/service/diagnostics_service_test.go b/internal/service/diagnostics_service_test.go index fcf2fb7..89e0008 100644 --- a/internal/service/diagnostics_service_test.go +++ b/internal/service/diagnostics_service_test.go @@ -130,6 +130,10 @@ func (m *mockCIProvider) TriggerBuild(_ context.Context, _, _, _ string) (int64, return 0, nil } +func (m *mockCIProvider) RetryPipeline(_ context.Context, _, _ string, _ int64) (*domain.CIPipeline, error) { + return nil, nil +} + func TestDiagnosticsService_GetDiagnostics_Healthy(t *testing.T) { opRepo := &mockOperationRepo{ operations: []*domain.Operation{ diff --git a/internal/service/saga_executor.go b/internal/service/saga_executor.go new file mode 100644 index 0000000..cc8e98f --- /dev/null +++ b/internal/service/saga_executor.go @@ -0,0 +1,438 @@ +// Package service provides business logic orchestration. +package service + +import ( + "context" + "errors" + "fmt" + "log/slog" + "time" + + "github.com/orchard9/rdev/internal/circuitbreaker" + "github.com/orchard9/rdev/internal/domain" + "github.com/orchard9/rdev/internal/logging" + "github.com/orchard9/rdev/internal/port" +) + +// SagaExecutor executes saga workflows with retry and compensation support. +type SagaExecutor struct { + repo port.SagaRepository + cbRegistry *circuitbreaker.Registry + logger *slog.Logger + + // Action handlers registered by action type + handlers map[string]SagaStepHandler +} + +// SagaStepHandler executes a saga step action. +type SagaStepHandler interface { + // Execute runs the step action and returns the output. + // The context should be used for cancellation. + Execute(ctx context.Context, step *domain.SagaStep, saga *domain.Saga) (map[string]any, error) +} + +// SagaStepHandlerFunc is an adapter for using functions as SagaStepHandler. +type SagaStepHandlerFunc func(ctx context.Context, step *domain.SagaStep, saga *domain.Saga) (map[string]any, error) + +// Execute implements SagaStepHandler. +func (f SagaStepHandlerFunc) Execute(ctx context.Context, step *domain.SagaStep, saga *domain.Saga) (map[string]any, error) { + return f(ctx, step, saga) +} + +// NewSagaExecutor creates a new saga executor. +func NewSagaExecutor(repo port.SagaRepository, logger *slog.Logger) *SagaExecutor { + if logger == nil { + logger = slog.Default() + } + return &SagaExecutor{ + repo: repo, + cbRegistry: circuitbreaker.GlobalRegistry, + logger: logger, + handlers: make(map[string]SagaStepHandler), + } +} + +// RegisterHandler registers a handler for a step action type. +func (e *SagaExecutor) RegisterHandler(action string, handler SagaStepHandler) { + e.handlers[action] = handler +} + +// Ensure SagaExecutor implements port.SagaExecutor. +var _ port.SagaExecutor = (*SagaExecutor)(nil) + +// Execute runs a saga from the beginning. +func (e *SagaExecutor) Execute(ctx context.Context, saga *domain.Saga) error { + logger := e.logger.With( + logging.FieldSagaID, saga.ID, + logging.FieldSagaName, saga.Name, + ) + + // Mark saga as running + saga.Status = domain.SagaStatusRunning + if err := e.repo.Update(ctx, saga); err != nil { + return fmt.Errorf("update saga status: %w", err) + } + + logger.Info("saga execution started") + + // Execute steps until all complete or one fails + return e.runSteps(ctx, saga) +} + +// Resume continues execution of a paused or failed saga. +func (e *SagaExecutor) Resume(ctx context.Context, sagaID string) error { + saga, err := e.repo.Get(ctx, sagaID) + if err != nil { + return fmt.Errorf("get saga: %w", err) + } + + logger := e.logger.With( + logging.FieldSagaID, saga.ID, + logging.FieldSagaName, saga.Name, + ) + + // Only resume if saga is in a resumable state + if saga.Status != domain.SagaStatusFailed && saga.Status != domain.SagaStatusRunning { + return fmt.Errorf("saga status %s is not resumable", saga.Status) + } + + // Increment retry count if resuming from failed state + if saga.Status == domain.SagaStatusFailed { + saga.RetryCount++ + if saga.RetryCount > saga.MaxRetries { + return fmt.Errorf("saga exceeded max retries (%d)", saga.MaxRetries) + } + } + + saga.Status = domain.SagaStatusRunning + saga.Error = "" + if err := e.repo.Update(ctx, saga); err != nil { + return fmt.Errorf("update saga status: %w", err) + } + + logger.Info("saga execution resumed", "retry_count", saga.RetryCount) + + return e.runSteps(ctx, saga) +} + +// Compensate runs compensation steps for a failed saga. +func (e *SagaExecutor) Compensate(ctx context.Context, sagaID string) error { + saga, err := e.repo.Get(ctx, sagaID) + if err != nil { + return fmt.Errorf("get saga: %w", err) + } + + logger := e.logger.With( + logging.FieldSagaID, saga.ID, + logging.FieldSagaName, saga.Name, + ) + + if saga.Status != domain.SagaStatusFailed { + return fmt.Errorf("can only compensate failed sagas (status: %s)", saga.Status) + } + + saga.Status = domain.SagaStatusCompensating + if err := e.repo.Update(ctx, saga); err != nil { + return fmt.Errorf("update saga status: %w", err) + } + + logger.Info("saga compensation started") + + // Run compensation steps in reverse order of completed steps + if err := e.runCompensation(ctx, saga); err != nil { + logger.Error("saga compensation failed", logging.FieldError, err) + return err + } + + saga.Status = domain.SagaStatusCompensated + now := time.Now() + saga.CompletedAt = &now + if err := e.repo.Update(ctx, saga); err != nil { + return fmt.Errorf("update saga status: %w", err) + } + + logger.Info("saga compensation completed") + return nil +} + +// RetryStep retries a specific failed step. +func (e *SagaExecutor) RetryStep(ctx context.Context, sagaID, stepName string) error { + saga, err := e.repo.Get(ctx, sagaID) + if err != nil { + return fmt.Errorf("get saga: %w", err) + } + + step := saga.GetStep(stepName) + if step == nil { + return fmt.Errorf("step %s not found", stepName) + } + + if step.Status != domain.StepStatusFailed { + return fmt.Errorf("step %s is not failed (status: %s)", stepName, step.Status) + } + + logger := e.logger.With( + logging.FieldSagaID, saga.ID, + logging.FieldSagaName, saga.Name, + logging.FieldStepName, stepName, + ) + + // Reset step to pending + step.Status = domain.StepStatusPending + step.Error = "" + step.RetryCount++ + if err := e.repo.UpdateStep(ctx, step); err != nil { + return fmt.Errorf("update step: %w", err) + } + + // If saga was failed, set it back to running + if saga.Status == domain.SagaStatusFailed { + saga.Status = domain.SagaStatusRunning + saga.Error = "" + if err := e.repo.Update(ctx, saga); err != nil { + return fmt.Errorf("update saga: %w", err) + } + } + + logger.Info("retrying step", "retry_count", step.RetryCount) + + // Continue execution + return e.runSteps(ctx, saga) +} + +// SkipStep skips a step and continues execution. +func (e *SagaExecutor) SkipStep(ctx context.Context, sagaID, stepName string) error { + saga, err := e.repo.Get(ctx, sagaID) + if err != nil { + return fmt.Errorf("get saga: %w", err) + } + + step := saga.GetStep(stepName) + if step == nil { + return fmt.Errorf("step %s not found", stepName) + } + + if step.Status != domain.StepStatusFailed && step.Status != domain.StepStatusPending { + return fmt.Errorf("can only skip failed or pending steps (status: %s)", step.Status) + } + + logger := e.logger.With( + logging.FieldSagaID, saga.ID, + logging.FieldSagaName, saga.Name, + logging.FieldStepName, stepName, + ) + + // Mark step as skipped + step.Status = domain.StepStatusSkipped + now := time.Now() + step.CompletedAt = &now + if err := e.repo.UpdateStep(ctx, step); err != nil { + return fmt.Errorf("update step: %w", err) + } + + // If saga was failed, set it back to running + if saga.Status == domain.SagaStatusFailed { + saga.Status = domain.SagaStatusRunning + saga.Error = "" + if err := e.repo.Update(ctx, saga); err != nil { + return fmt.Errorf("update saga: %w", err) + } + } + + logger.Info("step skipped") + + // Continue execution + return e.runSteps(ctx, saga) +} + +// runSteps executes pending steps until all complete or one fails. +func (e *SagaExecutor) runSteps(ctx context.Context, saga *domain.Saga) error { + for { + // Refresh saga state + saga, err := e.repo.Get(ctx, saga.ID) + if err != nil { + return fmt.Errorf("refresh saga: %w", err) + } + + // Get runnable steps + runnable := saga.RunnableSteps() + if len(runnable) == 0 { + // No more runnable steps + break + } + + // Execute each runnable step (could parallelize in future) + for _, step := range runnable { + if err := e.executeStep(ctx, saga, &step); err != nil { + // Step failed - saga is now failed + saga.Status = domain.SagaStatusFailed + saga.Error = err.Error() + if updateErr := e.repo.Update(ctx, saga); updateErr != nil { + e.logger.Error("failed to update saga status", logging.FieldError, updateErr) + } + return err + } + } + } + + // All steps completed (or skipped) + saga.Status = domain.SagaStatusCompleted + now := time.Now() + saga.CompletedAt = &now + if err := e.repo.Update(ctx, saga); err != nil { + return fmt.Errorf("update saga status: %w", err) + } + + e.logger.Info("saga completed", + logging.FieldSagaID, saga.ID, + logging.FieldSagaName, saga.Name, + ) + + return nil +} + +// executeStep executes a single step with retry logic. +func (e *SagaExecutor) executeStep(ctx context.Context, saga *domain.Saga, step *domain.SagaStep) error { + logger := e.logger.With( + logging.FieldSagaID, saga.ID, + logging.FieldSagaName, saga.Name, + logging.FieldStepName, step.Name, + ) + + handler, ok := e.handlers[step.Action] + if !ok { + return fmt.Errorf("no handler registered for action %s", step.Action) + } + + // Update saga current step + saga.CurrentStep = step.Name + if err := e.repo.Update(ctx, saga); err != nil { + return fmt.Errorf("update current step: %w", err) + } + + // Mark step as running + step.Status = domain.StepStatusRunning + now := time.Now() + step.StartedAt = &now + if err := e.repo.UpdateStep(ctx, step); err != nil { + return fmt.Errorf("update step status: %w", err) + } + + logger.Info("step started") + + // Execute with retry logic + var output map[string]any + var execErr error + for attempt := 0; attempt <= step.RetryPolicy.MaxAttempts; attempt++ { + if attempt > 0 { + delay := e.calculateBackoff(step.RetryPolicy, attempt) + logger.Info("retrying step", "attempt", attempt, "delay", delay) + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(delay): + } + } + + output, execErr = handler.Execute(ctx, step, saga) + if execErr == nil { + break + } + + // Check if error is from circuit breaker + if errors.Is(execErr, circuitbreaker.ErrCircuitOpen) { + logger.Warn("circuit breaker open, will retry", "attempt", attempt) + continue + } + + logger.Warn("step execution failed", "attempt", attempt, logging.FieldError, execErr) + } + + endTime := time.Now() + step.CompletedAt = &endTime + + if execErr != nil { + step.Status = domain.StepStatusFailed + step.Error = execErr.Error() + if err := e.repo.UpdateStep(ctx, step); err != nil { + logger.Error("failed to update step status", logging.FieldError, err) + } + logger.Error("step failed", logging.FieldError, execErr) + return fmt.Errorf("step %s failed: %w", step.Name, execErr) + } + + // Step succeeded + step.Status = domain.StepStatusCompleted + step.Output = output + if err := e.repo.UpdateStep(ctx, step); err != nil { + return fmt.Errorf("update step status: %w", err) + } + + // Store output in saga outputs + if saga.Outputs == nil { + saga.Outputs = make(map[string]map[string]any) + } + saga.Outputs[step.Name] = output + if err := e.repo.Update(ctx, saga); err != nil { + return fmt.Errorf("update saga outputs: %w", err) + } + + logger.Info("step completed", "duration_ms", endTime.Sub(*step.StartedAt).Milliseconds()) + return nil +} + +// runCompensation runs compensation steps in reverse order. +func (e *SagaExecutor) runCompensation(ctx context.Context, saga *domain.Saga) error { + // Get completed steps in reverse order + var completedSteps []domain.SagaStep + for _, step := range saga.Steps { + if step.Status == domain.StepStatusCompleted && step.Compensate != "" { + completedSteps = append(completedSteps, step) + } + } + + // Reverse order (compensation runs in reverse of completion) + for i, j := 0, len(completedSteps)-1; i < j; i, j = i+1, j-1 { + completedSteps[i], completedSteps[j] = completedSteps[j], completedSteps[i] + } + + // Run compensation for each + for _, step := range completedSteps { + compStep := saga.GetStep(step.Compensate) + if compStep == nil { + e.logger.Warn("compensation step not found", + logging.FieldSagaID, saga.ID, + logging.FieldStepName, step.Compensate, + ) + continue + } + + if err := e.executeStep(ctx, saga, compStep); err != nil { + return fmt.Errorf("compensation step %s failed: %w", step.Compensate, err) + } + } + + return nil +} + +// calculateBackoff calculates the delay before the next retry. +func (e *SagaExecutor) calculateBackoff(policy domain.RetryPolicy, attempt int) time.Duration { + switch policy.BackoffType { + case domain.BackoffNone: + return policy.InitialDelay + case domain.BackoffLinear: + delay := policy.InitialDelay * time.Duration(attempt) + if delay > policy.MaxDelay { + return policy.MaxDelay + } + return delay + case domain.BackoffExponential: + delay := policy.InitialDelay * time.Duration(1< policy.MaxDelay { + return policy.MaxDelay + } + return delay + default: + return policy.InitialDelay + } +}