// Package service provides business logic orchestration.
package service

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"time"

	"github.com/orchard9/rdev/internal/circuitbreaker"
	"github.com/orchard9/rdev/internal/domain"
	"github.com/orchard9/rdev/internal/logging"
	"github.com/orchard9/rdev/internal/port"
)

// SagaExecutor executes saga workflows with retry and compensation support.
//
// State is loaded and persisted through repo; the work of each step is
// delegated to the handler registered for the step's action string.
type SagaExecutor struct {
	// repo loads and persists sagas and their steps.
	repo port.SagaRepository
	// cbRegistry is set to circuitbreaker.GlobalRegistry by NewSagaExecutor.
	// NOTE(review): it is not read anywhere in the visible part of this file.
	cbRegistry *circuitbreaker.Registry
	// logger is the base logger; methods derive per-saga/per-step loggers from it.
	logger *slog.Logger

	// Action handlers registered by action type
	handlers map[string]SagaStepHandler
}

// SagaStepHandler executes a saga step action.
type SagaStepHandler interface {
	// Execute runs the step action and returns the output.
	// The context should be used for cancellation.
	//
	// A nil error marks the step as completed and the returned map is stored
	// as the step's output; a non-nil error makes the executor retry the step
	// according to its retry policy.
	Execute(ctx context.Context, step *domain.SagaStep, saga *domain.Saga) (map[string]any, error)
}

// SagaStepHandlerFunc is an adapter for using functions as SagaStepHandler.
type SagaStepHandlerFunc func(ctx context.Context, step *domain.SagaStep, saga *domain.Saga) (map[string]any, error)

// Execute implements SagaStepHandler by calling f with the same arguments.
func (f SagaStepHandlerFunc) Execute(ctx context.Context, step *domain.SagaStep, saga *domain.Saga) (map[string]any, error) {
	return f(ctx, step, saga)
}

// NewSagaExecutor creates a new saga executor.
//
// A nil logger is replaced with slog.Default(). The executor starts with an
// empty handler table; register handlers with RegisterHandler before running
// sagas, otherwise every step fails with "no handler registered".
func NewSagaExecutor(repo port.SagaRepository, logger *slog.Logger) *SagaExecutor {
	if logger == nil {
		logger = slog.Default()
	}
	return &SagaExecutor{
		repo:       repo,
		cbRegistry: circuitbreaker.GlobalRegistry,
		logger:     logger,
		handlers:   make(map[string]SagaStepHandler),
	}
}

// RegisterHandler registers a handler for a step action type.
//
// Registering the same action twice replaces the earlier handler. This is a
// plain map write with no locking, so it must not race with step execution.
func (e *SagaExecutor) RegisterHandler(action string, handler SagaStepHandler) {
	e.handlers[action] = handler
}

// Ensure SagaExecutor implements port.SagaExecutor.
var _ port.SagaExecutor = (*SagaExecutor)(nil)

// Execute runs a saga from the beginning.
func (e *SagaExecutor) Execute(ctx context.Context, saga *domain.Saga) error { logger := e.logger.With( logging.FieldSagaID, saga.ID, logging.FieldSagaName, saga.Name, ) // Mark saga as running saga.Status = domain.SagaStatusRunning if err := e.repo.Update(ctx, saga); err != nil { return fmt.Errorf("update saga status: %w", err) } logger.Info("saga execution started") // Execute steps until all complete or one fails return e.runSteps(ctx, saga) } // Resume continues execution of a paused or failed saga. func (e *SagaExecutor) Resume(ctx context.Context, sagaID string) error { saga, err := e.repo.Get(ctx, sagaID) if err != nil { return fmt.Errorf("get saga: %w", err) } logger := e.logger.With( logging.FieldSagaID, saga.ID, logging.FieldSagaName, saga.Name, ) // Only resume if saga is in a resumable state if saga.Status != domain.SagaStatusFailed && saga.Status != domain.SagaStatusRunning { return fmt.Errorf("saga status %s is not resumable", saga.Status) } // Increment retry count if resuming from failed state if saga.Status == domain.SagaStatusFailed { saga.RetryCount++ if saga.RetryCount > saga.MaxRetries { return fmt.Errorf("saga exceeded max retries (%d)", saga.MaxRetries) } } saga.Status = domain.SagaStatusRunning saga.Error = "" if err := e.repo.Update(ctx, saga); err != nil { return fmt.Errorf("update saga status: %w", err) } logger.Info("saga execution resumed", "retry_count", saga.RetryCount) return e.runSteps(ctx, saga) } // Compensate runs compensation steps for a failed saga. 
func (e *SagaExecutor) Compensate(ctx context.Context, sagaID string) error { saga, err := e.repo.Get(ctx, sagaID) if err != nil { return fmt.Errorf("get saga: %w", err) } logger := e.logger.With( logging.FieldSagaID, saga.ID, logging.FieldSagaName, saga.Name, ) if saga.Status != domain.SagaStatusFailed { return fmt.Errorf("can only compensate failed sagas (status: %s)", saga.Status) } saga.Status = domain.SagaStatusCompensating if err := e.repo.Update(ctx, saga); err != nil { return fmt.Errorf("update saga status: %w", err) } logger.Info("saga compensation started") // Run compensation steps in reverse order of completed steps if err := e.runCompensation(ctx, saga); err != nil { logger.Error("saga compensation failed", logging.FieldError, err) return err } saga.Status = domain.SagaStatusCompensated now := time.Now() saga.CompletedAt = &now if err := e.repo.Update(ctx, saga); err != nil { return fmt.Errorf("update saga status: %w", err) } logger.Info("saga compensation completed") return nil } // RetryStep retries a specific failed step. 
func (e *SagaExecutor) RetryStep(ctx context.Context, sagaID, stepName string) error { saga, err := e.repo.Get(ctx, sagaID) if err != nil { return fmt.Errorf("get saga: %w", err) } step := saga.GetStep(stepName) if step == nil { return fmt.Errorf("step %s not found", stepName) } if step.Status != domain.StepStatusFailed { return fmt.Errorf("step %s is not failed (status: %s)", stepName, step.Status) } logger := e.logger.With( logging.FieldSagaID, saga.ID, logging.FieldSagaName, saga.Name, logging.FieldStepName, stepName, ) // Reset step to pending step.Status = domain.StepStatusPending step.Error = "" step.RetryCount++ if err := e.repo.UpdateStep(ctx, step); err != nil { return fmt.Errorf("update step: %w", err) } // If saga was failed, set it back to running if saga.Status == domain.SagaStatusFailed { saga.Status = domain.SagaStatusRunning saga.Error = "" if err := e.repo.Update(ctx, saga); err != nil { return fmt.Errorf("update saga: %w", err) } } logger.Info("retrying step", "retry_count", step.RetryCount) // Continue execution return e.runSteps(ctx, saga) } // SkipStep skips a step and continues execution. 
func (e *SagaExecutor) SkipStep(ctx context.Context, sagaID, stepName string) error { saga, err := e.repo.Get(ctx, sagaID) if err != nil { return fmt.Errorf("get saga: %w", err) } step := saga.GetStep(stepName) if step == nil { return fmt.Errorf("step %s not found", stepName) } if step.Status != domain.StepStatusFailed && step.Status != domain.StepStatusPending { return fmt.Errorf("can only skip failed or pending steps (status: %s)", step.Status) } logger := e.logger.With( logging.FieldSagaID, saga.ID, logging.FieldSagaName, saga.Name, logging.FieldStepName, stepName, ) // Mark step as skipped step.Status = domain.StepStatusSkipped now := time.Now() step.CompletedAt = &now if err := e.repo.UpdateStep(ctx, step); err != nil { return fmt.Errorf("update step: %w", err) } // If saga was failed, set it back to running if saga.Status == domain.SagaStatusFailed { saga.Status = domain.SagaStatusRunning saga.Error = "" if err := e.repo.Update(ctx, saga); err != nil { return fmt.Errorf("update saga: %w", err) } } logger.Info("step skipped") // Continue execution return e.runSteps(ctx, saga) } // runSteps executes pending steps until all complete or one fails. 
func (e *SagaExecutor) runSteps(ctx context.Context, saga *domain.Saga) error { for { // Refresh saga state saga, err := e.repo.Get(ctx, saga.ID) if err != nil { return fmt.Errorf("refresh saga: %w", err) } // Get runnable steps runnable := saga.RunnableSteps() if len(runnable) == 0 { // No more runnable steps break } // Execute each runnable step (could parallelize in future) for _, step := range runnable { if err := e.executeStep(ctx, saga, &step); err != nil { // Step failed - saga is now failed saga.Status = domain.SagaStatusFailed saga.Error = err.Error() if updateErr := e.repo.Update(ctx, saga); updateErr != nil { e.logger.Error("failed to update saga status", logging.FieldError, updateErr) } return err } } } // All steps completed (or skipped) saga.Status = domain.SagaStatusCompleted now := time.Now() saga.CompletedAt = &now if err := e.repo.Update(ctx, saga); err != nil { return fmt.Errorf("update saga status: %w", err) } e.logger.Info("saga completed", logging.FieldSagaID, saga.ID, logging.FieldSagaName, saga.Name, ) return nil } // executeStep executes a single step with retry logic. 
func (e *SagaExecutor) executeStep(ctx context.Context, saga *domain.Saga, step *domain.SagaStep) error { logger := e.logger.With( logging.FieldSagaID, saga.ID, logging.FieldSagaName, saga.Name, logging.FieldStepName, step.Name, ) handler, ok := e.handlers[step.Action] if !ok { return fmt.Errorf("no handler registered for action %s", step.Action) } // Update saga current step saga.CurrentStep = step.Name if err := e.repo.Update(ctx, saga); err != nil { return fmt.Errorf("update current step: %w", err) } // Mark step as running step.Status = domain.StepStatusRunning now := time.Now() step.StartedAt = &now if err := e.repo.UpdateStep(ctx, step); err != nil { return fmt.Errorf("update step status: %w", err) } logger.Info("step started") // Execute with retry logic var output map[string]any var execErr error for attempt := 0; attempt <= step.RetryPolicy.MaxAttempts; attempt++ { if attempt > 0 { delay := e.calculateBackoff(step.RetryPolicy, attempt) logger.Info("retrying step", "attempt", attempt, "delay", delay) select { case <-ctx.Done(): return ctx.Err() case <-time.After(delay): } } output, execErr = handler.Execute(ctx, step, saga) if execErr == nil { break } // Check if error is from circuit breaker if errors.Is(execErr, circuitbreaker.ErrCircuitOpen) { logger.Warn("circuit breaker open, will retry", "attempt", attempt) continue } logger.Warn("step execution failed", "attempt", attempt, logging.FieldError, execErr) } endTime := time.Now() step.CompletedAt = &endTime if execErr != nil { step.Status = domain.StepStatusFailed step.Error = execErr.Error() if err := e.repo.UpdateStep(ctx, step); err != nil { logger.Error("failed to update step status", logging.FieldError, err) } logger.Error("step failed", logging.FieldError, execErr) return fmt.Errorf("step %s failed: %w", step.Name, execErr) } // Step succeeded step.Status = domain.StepStatusCompleted step.Output = output if err := e.repo.UpdateStep(ctx, step); err != nil { return fmt.Errorf("update step status: %w", 
err) } // Store output in saga outputs if saga.Outputs == nil { saga.Outputs = make(map[string]map[string]any) } saga.Outputs[step.Name] = output if err := e.repo.Update(ctx, saga); err != nil { return fmt.Errorf("update saga outputs: %w", err) } logger.Info("step completed", "duration_ms", endTime.Sub(*step.StartedAt).Milliseconds()) return nil } // runCompensation runs compensation steps in reverse order. func (e *SagaExecutor) runCompensation(ctx context.Context, saga *domain.Saga) error { // Get completed steps in reverse order var completedSteps []domain.SagaStep for _, step := range saga.Steps { if step.Status == domain.StepStatusCompleted && step.Compensate != "" { completedSteps = append(completedSteps, step) } } // Reverse order (compensation runs in reverse of completion) for i, j := 0, len(completedSteps)-1; i < j; i, j = i+1, j-1 { completedSteps[i], completedSteps[j] = completedSteps[j], completedSteps[i] } // Run compensation for each for _, step := range completedSteps { compStep := saga.GetStep(step.Compensate) if compStep == nil { e.logger.Warn("compensation step not found", logging.FieldSagaID, saga.ID, logging.FieldStepName, step.Compensate, ) continue } if err := e.executeStep(ctx, saga, compStep); err != nil { return fmt.Errorf("compensation step %s failed: %w", step.Compensate, err) } } return nil } // calculateBackoff calculates the delay before the next retry. func (e *SagaExecutor) calculateBackoff(policy domain.RetryPolicy, attempt int) time.Duration { switch policy.BackoffType { case domain.BackoffNone: return policy.InitialDelay case domain.BackoffLinear: delay := policy.InitialDelay * time.Duration(attempt) if delay > policy.MaxDelay { return policy.MaxDelay } return delay case domain.BackoffExponential: delay := policy.InitialDelay * time.Duration(1< policy.MaxDelay { return policy.MaxDelay } return delay default: return policy.InitialDelay } }