rdev/internal/circuitbreaker/circuitbreaker.go
jordan 72d16929ca feat: Implement hexagonal architecture with services, webhooks, queue, and telemetry
Major refactoring to hexagonal (ports & adapters) architecture:

- Add service layer (apikey_service, project_service) for business logic
- Add webhook system with dispatcher and delivery tracking
- Add command queue with priority-based processing
- Add rate limiting with sliding window algorithm
- Add audit logging for command execution
- Add OpenTelemetry integration (traces, metrics, spans)
- Add circuit breaker for fault tolerance
- Add cached repository wrapper for performance
- Add comprehensive validation package
- Add Kubernetes client integration for pod management
- Add database migrations (allowed_ips, audit_log, rate_limiting, queue, webhooks)
- Add network policy and PodDisruptionBudget for k8s
- Remove legacy executor and projects/registry packages
- Untrack secrets.yaml (now managed via envault)
- Add coverage.out to .gitignore
- Add e2e test infrastructure with docker-compose
- Add comprehensive documentation (API, architecture, operations, plans)
- Add golangci-lint config and pre-commit hook

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-25 19:57:46 -07:00

221 lines
4.5 KiB
Go

// Package circuitbreaker provides protection against cascading failures.
//
// The circuit breaker pattern prevents repeated calls to a failing service,
// allowing it time to recover. After a threshold of failures, the circuit
// "opens" and returns errors immediately without attempting the operation.
package circuitbreaker
import (
"errors"
"sync"
"time"
)
// State represents the circuit breaker state.
type State int
const (
// Closed is the normal operating state - requests are allowed through.
Closed State = iota
// Open means the circuit is tripped - requests fail immediately.
Open
// HalfOpen means we're testing if the service has recovered.
HalfOpen
)
func (s State) String() string {
switch s {
case Closed:
return "closed"
case Open:
return "open"
case HalfOpen:
return "half-open"
default:
return "unknown"
}
}
// Errors returned by the circuit breaker.
var (
ErrCircuitOpen = errors.New("circuit breaker is open")
)
// Config configures the circuit breaker behavior.
type Config struct {
// FailureThreshold is the number of consecutive failures before opening.
// Default: 5
FailureThreshold int
// ResetTimeout is how long to wait before attempting recovery (half-open).
// Default: 30 seconds
ResetTimeout time.Duration
// HalfOpenRequests is how many requests to allow in half-open state.
// Default: 1
HalfOpenRequests int
}
// DefaultConfig returns sensible defaults.
func DefaultConfig() Config {
return Config{
FailureThreshold: 5,
ResetTimeout: 30 * time.Second,
HalfOpenRequests: 1,
}
}
// CircuitBreaker implements the circuit breaker pattern.
type CircuitBreaker struct {
cfg Config
mu sync.RWMutex
state State
failures int
successes int
lastFailure time.Time
halfOpenRequests int
}
// New creates a new circuit breaker with the given configuration.
func New(cfg Config) *CircuitBreaker {
if cfg.FailureThreshold <= 0 {
cfg.FailureThreshold = 5
}
if cfg.ResetTimeout <= 0 {
cfg.ResetTimeout = 30 * time.Second
}
if cfg.HalfOpenRequests <= 0 {
cfg.HalfOpenRequests = 1
}
return &CircuitBreaker{
cfg: cfg,
state: Closed,
}
}
// Execute runs the function if the circuit allows it.
// Returns ErrCircuitOpen if the circuit is open.
func (cb *CircuitBreaker) Execute(fn func() error) error {
if !cb.canExecute() {
return ErrCircuitOpen
}
err := fn()
cb.recordResult(err)
return err
}
// canExecute checks if a request should be allowed.
func (cb *CircuitBreaker) canExecute() bool {
cb.mu.Lock()
defer cb.mu.Unlock()
switch cb.state {
case Closed:
return true
case Open:
// Check if reset timeout has passed
if time.Since(cb.lastFailure) > cb.cfg.ResetTimeout {
cb.state = HalfOpen
cb.halfOpenRequests = 0
return true
}
return false
case HalfOpen:
// Allow limited requests in half-open state
if cb.halfOpenRequests < cb.cfg.HalfOpenRequests {
cb.halfOpenRequests++
return true
}
return false
}
return false
}
// recordResult updates state based on operation outcome.
func (cb *CircuitBreaker) recordResult(err error) {
cb.mu.Lock()
defer cb.mu.Unlock()
if err != nil {
cb.onFailure()
} else {
cb.onSuccess()
}
}
// onFailure handles a failed operation.
func (cb *CircuitBreaker) onFailure() {
cb.failures++
cb.successes = 0
cb.lastFailure = time.Now()
switch cb.state {
case Closed:
if cb.failures >= cb.cfg.FailureThreshold {
cb.state = Open
}
case HalfOpen:
cb.state = Open
}
}
// onSuccess handles a successful operation.
func (cb *CircuitBreaker) onSuccess() {
cb.successes++
switch cb.state {
case Closed:
cb.failures = 0
case HalfOpen:
// Successful probe - close the circuit
cb.state = Closed
cb.failures = 0
}
}
// State returns the current circuit state.
func (cb *CircuitBreaker) State() State {
cb.mu.RLock()
defer cb.mu.RUnlock()
return cb.state
}
// Stats returns current circuit statistics.
func (cb *CircuitBreaker) Stats() Stats {
cb.mu.RLock()
defer cb.mu.RUnlock()
return Stats{
State: cb.state,
Failures: cb.failures,
Successes: cb.successes,
LastFailure: cb.lastFailure,
}
}
// Reset manually resets the circuit breaker to closed state.
func (cb *CircuitBreaker) Reset() {
cb.mu.Lock()
defer cb.mu.Unlock()
cb.state = Closed
cb.failures = 0
cb.successes = 0
cb.lastFailure = time.Time{}
cb.halfOpenRequests = 0
}
// Stats contains circuit breaker statistics.
type Stats struct {
State State
Failures int
Successes int
LastFailure time.Time
}