rdev/internal/circuitbreaker/circuitbreaker.go
jordan f20fc6c51c
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
feat(saga): implement enterprise-grade resilience architecture
Fixes issues from code review of resilience implementation:

- Wire saga system in main.go (SagaRepository, SagaExecutor, SagaHandler)
- Fix CompletedSteps() to include skipped steps for dependency resolution
- Fix reverse loop bug in saga compensation (use standard swap pattern)
- Add circuit breaker state change callbacks for Prometheus metrics

Phase 1 (Build Resilience):
- Add failure:retry to all component Kaniko build steps
- Add preflight registry health check before builds
- Add services-deployed sync point to decouple docs from critical path

Phase 2 (API Resilience):
- Add pipeline retry endpoint (POST /projects/{id}/pipelines/{number}/retry)
- Wire circuit breakers with metrics callbacks
- Add /health/circuits endpoint for circuit breaker status

Phase 3 (Saga Engine):
- Full domain model (Saga, SagaStep, RetryPolicy, BackoffType)
- PostgreSQL saga repository with CRUD and step management
- Saga executor with retry, compensation, skip step support
- Saga API handlers with CRUD and control operations

Phase 4 (Observability):
- Add saga metrics (total, step_duration, retry, circuit_breaker_state)
- Add logging fields (saga_id, saga_name, step_name)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-08 01:58:02 -07:00

244 lines
5.2 KiB
Go

// Package circuitbreaker provides protection against cascading failures.
//
// The circuit breaker pattern prevents repeated calls to a failing service,
// allowing it time to recover. After a threshold of failures, the circuit
// "opens" and returns errors immediately without attempting the operation.
package circuitbreaker
import (
"errors"
"sync"
"time"
)
// State identifies which phase of its lifecycle a circuit breaker is in.
type State int

// The circuit breaker states, numbered from zero in declaration order.
const (
	// Closed is the healthy state: every request is let through.
	Closed State = iota
	// Open is the tripped state: requests are rejected without being run.
	Open
	// HalfOpen is the probing state used to test whether the service recovered.
	HalfOpen
)

// String returns the lowercase name of the state ("closed", "open" or
// "half-open"). Any value outside the declared constants yields "unknown".
func (s State) String() string {
	names := [...]string{
		Closed:   "closed",
		Open:     "open",
		HalfOpen: "half-open",
	}
	if s < Closed || int(s) >= len(names) {
		return "unknown"
	}
	return names[s]
}
// ErrCircuitOpen is the sentinel error Execute returns when a call is
// rejected without being run. Match it with errors.Is.
var ErrCircuitOpen = errors.New("circuit breaker is open")
// StateChangeCallback is called when the circuit breaker state changes.
//
// name is the breaker's configured Config.Name, from is the state being left
// and to is the state being entered.
//
// The breaker invokes the callback synchronously while holding its internal
// mutex. The callback should therefore return quickly and must not call
// methods on the same CircuitBreaker, which would block on that mutex.
type StateChangeCallback func(name string, from, to State)
// Config configures the circuit breaker behavior.
//
// New replaces any non-positive FailureThreshold, ResetTimeout or
// HalfOpenRequests with its documented default.
type Config struct {
// Name is the identifier for this circuit breaker (used in callbacks).
// It is passed as the first argument to OnStateChange.
Name string
// FailureThreshold is the number of consecutive failures before opening.
// A success while closed resets the count.
// Default: 5
FailureThreshold int
// ResetTimeout is how long to wait before attempting recovery (half-open).
// It is measured from the time of the most recent recorded failure.
// Default: 30 seconds
ResetTimeout time.Duration
// HalfOpenRequests is how many requests to allow in half-open state.
// Default: 1
HalfOpenRequests int
// OnStateChange is called when the circuit breaker state changes.
// It may be nil, in which case no notification is sent.
OnStateChange StateChangeCallback
}
// DefaultConfig returns a Config populated with the package defaults:
// 5 consecutive failures to open, a 30 second reset timeout and a single
// half-open request. Name and OnStateChange are left at their zero values.
func DefaultConfig() Config {
	var cfg Config
	cfg.FailureThreshold = 5
	cfg.ResetTimeout = 30 * time.Second
	cfg.HalfOpenRequests = 1
	return cfg
}
// CircuitBreaker implements the circuit breaker pattern.
//
// Every method in this file locks mu before touching the other fields. The
// struct contains a mutex and must not be copied; use the pointer returned
// by New.
type CircuitBreaker struct {
// cfg is the configuration the breaker was built with.
cfg Config
// mu guards all mutable fields below.
mu sync.RWMutex
// state is the current circuit state.
state State
// failures is incremented on every recorded failure and zeroed by a
// success in the closed or half-open state, or by Reset.
failures int
// successes is incremented on every recorded success and zeroed by any
// failure or by Reset.
successes int
// lastFailure is the time of the most recent recorded failure; the open
// state compares it against cfg.ResetTimeout.
lastFailure time.Time
// halfOpenRequests is the half-open admission counter that is compared
// against cfg.HalfOpenRequests.
halfOpenRequests int
}
// New builds a circuit breaker from cfg, starting in the Closed state.
//
// Non-positive values of FailureThreshold, ResetTimeout and HalfOpenRequests
// are replaced by 5, 30 seconds and 1 respectively. cfg is received by value,
// so the caller's copy is not modified.
func New(cfg Config) *CircuitBreaker {
	if cfg.FailureThreshold < 1 {
		cfg.FailureThreshold = 5
	}
	if cfg.ResetTimeout <= 0 {
		cfg.ResetTimeout = 30 * time.Second
	}
	if cfg.HalfOpenRequests < 1 {
		cfg.HalfOpenRequests = 1
	}

	cb := &CircuitBreaker{cfg: cfg}
	cb.state = Closed
	return cb
}
// Execute runs fn if the circuit allows it and records the outcome.
//
// When the call is rejected (the circuit is open, or it is half-open and no
// further requests are being admitted), fn is not invoked and ErrCircuitOpen
// is returned. Otherwise the value returned by fn is recorded as a success
// (nil) or a failure (non-nil) and passed back to the caller unchanged.
//
// If fn panics, the call is recorded as a failure and the panic continues to
// propagate to the caller; Execute does not recover it.
func (cb *CircuitBreaker) Execute(fn func() error) error {
	if !cb.canExecute() {
		return ErrCircuitOpen
	}

	// An admitted call must always report an outcome. Without this, a panic
	// in fn during half-open would use up an admission with no result ever
	// recorded, and the breaker could keep rejecting calls indefinitely.
	recorded := false
	defer func() {
		if !recorded {
			cb.recordResult(errors.New("circuit breaker: protected function panicked"))
		}
	}()

	err := fn()
	// Mark before recording so a panic raised inside the state-change
	// callback is not counted as a second result for the same call.
	recorded = true
	cb.recordResult(err)
	return err
}
// canExecute reports whether a request should be allowed through right now.
//
//   - Closed: always true.
//   - Open: false until more than cfg.ResetTimeout has elapsed since the last
//     recorded failure; the first call after that moves the breaker to
//     HalfOpen, fires OnStateChange and is admitted.
//   - HalfOpen: true until cfg.HalfOpenRequests requests have been admitted.
//
// The request that triggers the Open -> HalfOpen transition counts as the
// first half-open request, so no more than cfg.HalfOpenRequests requests are
// admitted per half-open period.
//
// OnStateChange is invoked while cb.mu is held.
func (cb *CircuitBreaker) canExecute() bool {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	switch cb.state {
	case Closed:
		return true
	case Open:
		// Still cooling down: keep rejecting.
		if time.Since(cb.lastFailure) <= cb.cfg.ResetTimeout {
			return false
		}
		cb.state = HalfOpen
		// This request is itself a probe, so it is charged to the budget.
		cb.halfOpenRequests = 1
		if cb.cfg.OnStateChange != nil {
			cb.cfg.OnStateChange(cb.cfg.Name, Open, HalfOpen)
		}
		return true
	case HalfOpen:
		// Allow limited requests in half-open state.
		if cb.halfOpenRequests >= cb.cfg.HalfOpenRequests {
			return false
		}
		cb.halfOpenRequests++
		return true
	default:
		return false
	}
}
// recordResult feeds the outcome of an executed operation into the state
// machine: a nil err counts as a success, anything else as a failure.
// It holds cb.mu for the duration of the update.
func (cb *CircuitBreaker) recordResult(err error) {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	if err == nil {
		cb.onSuccess()
		return
	}
	cb.onFailure()
}
// onFailure applies a failed operation to the breaker. It takes no lock
// itself; recordResult calls it with cb.mu held.
//
// It bumps the failure count, clears the success count and stamps
// lastFailure. The circuit opens when a half-open probe fails, or when the
// closed circuit reaches cfg.FailureThreshold failures. OnStateChange fires
// only if the state actually changed.
func (cb *CircuitBreaker) onFailure() {
	prev := cb.state

	cb.failures++
	cb.successes = 0
	cb.lastFailure = time.Now()

	tripped := prev == HalfOpen ||
		(prev == Closed && cb.failures >= cb.cfg.FailureThreshold)
	if tripped {
		cb.state = Open
	}

	if cb.state != prev && cb.cfg.OnStateChange != nil {
		cb.cfg.OnStateChange(cb.cfg.Name, prev, cb.state)
	}
}
// onSuccess applies a successful operation to the breaker. It takes no lock
// itself; recordResult calls it with cb.mu held.
//
// The success count is always incremented. In the closed state the failure
// count is cleared. In the half-open state the failure count is cleared, the
// circuit closes and OnStateChange fires. In the open state nothing else
// changes.
func (cb *CircuitBreaker) onSuccess() {
	cb.successes++

	prev := cb.state
	if prev != Closed && prev != HalfOpen {
		return
	}
	cb.failures = 0
	if prev == Closed {
		return
	}

	// A half-open probe succeeded: the service is considered recovered.
	cb.state = Closed
	if cb.cfg.OnStateChange != nil {
		cb.cfg.OnStateChange(cb.cfg.Name, HalfOpen, Closed)
	}
}
// State reports the circuit state at the moment of the call, read under the
// shared (read) lock.
func (cb *CircuitBreaker) State() State {
	cb.mu.RLock()
	current := cb.state
	cb.mu.RUnlock()
	return current
}
// Stats returns a snapshot of the breaker's state, failure and success
// counters and last failure time, all read under the same read lock.
func (cb *CircuitBreaker) Stats() Stats {
	cb.mu.RLock()
	defer cb.mu.RUnlock()

	var snap Stats
	snap.State = cb.state
	snap.Failures = cb.failures
	snap.Successes = cb.successes
	snap.LastFailure = cb.lastFailure
	return snap
}
// Reset manually resets the circuit breaker to closed state.
//
// It clears the failure and success counters, the last failure time and the
// half-open request counter. If the breaker was not already closed,
// OnStateChange is called with the previous state and Closed, so observers
// driven by the callback also see manual resets.
//
// As with every other transition, the callback runs while cb.mu is held.
func (cb *CircuitBreaker) Reset() {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	prev := cb.state

	cb.state = Closed
	cb.failures = 0
	cb.successes = 0
	cb.lastFailure = time.Time{}
	cb.halfOpenRequests = 0

	if prev != Closed && cb.cfg.OnStateChange != nil {
		cb.cfg.OnStateChange(cb.cfg.Name, prev, Closed)
	}
}
// Stats contains circuit breaker statistics.
// It is a point-in-time copy returned by (*CircuitBreaker).Stats.
type Stats struct {
// State is the circuit state when the snapshot was taken.
State State
// Failures is the breaker's failure counter; it is reset to zero by a
// success in the closed or half-open state and by Reset.
Failures int
// Successes is the breaker's success counter; it is reset to zero by any
// failure and by Reset.
Successes int
// LastFailure is the time of the most recent recorded failure, or the zero
// time if none has been recorded since creation or the last Reset.
LastFailure time.Time
}