Major refactoring to hexagonal (ports & adapters) architecture: - Add service layer (apikey_service, project_service) for business logic - Add webhook system with dispatcher and delivery tracking - Add command queue with priority-based processing - Add rate limiting with sliding window algorithm - Add audit logging for command execution - Add OpenTelemetry integration (traces, metrics, spans) - Add circuit breaker for fault tolerance - Add cached repository wrapper for performance - Add comprehensive validation package - Add Kubernetes client integration for pod management - Add database migrations (allowed_ips, audit_log, rate_limiting, queue, webhooks) - Add network policy and PodDisruptionBudget for k8s - Remove legacy executor and projects/registry packages - Untrack secrets.yaml (now managed via envault) - Add coverage.out to .gitignore - Add e2e test infrastructure with docker-compose - Add comprehensive documentation (API, architecture, operations, plans) - Add golangci-lint config and pre-commit hook Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
221 lines
4.5 KiB
Go
221 lines
4.5 KiB
Go
// Package circuitbreaker provides protection against cascading failures.
|
|
//
|
|
// The circuit breaker pattern prevents repeated calls to a failing service,
|
|
// allowing it time to recover. After a threshold of failures, the circuit
|
|
// "opens" and returns errors immediately without attempting the operation.
|
|
package circuitbreaker
|
|
|
|
import (
|
|
"errors"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// State is the position of a CircuitBreaker in its closed → open → half-open
// cycle. The zero value is Closed.
type State int

const (
	// Closed is the normal operating state - requests are allowed through.
	Closed State = iota
	// Open means the circuit is tripped - requests fail immediately.
	Open
	// HalfOpen means we're testing if the service has recovered.
	HalfOpen
)

// String implements fmt.Stringer. It returns "closed", "open" or "half-open"
// for the declared constants and "unknown" for any other value.
func (s State) String() string {
	names := [...]string{
		Closed:   "closed",
		Open:     "open",
		HalfOpen: "half-open",
	}
	// Guard both ends: State is a plain int, so negative or out-of-range
	// values can be constructed by callers.
	if s < Closed || int(s) >= len(names) {
		return "unknown"
	}
	return names[s]
}
|
|
|
|
// ErrCircuitOpen is returned by CircuitBreaker.Execute when the breaker
// rejects a call without invoking the protected function.
var ErrCircuitOpen = errors.New("circuit breaker is open")
|
|
|
|
// Config holds the tunables for a CircuitBreaker. New replaces any
// non-positive field with its default, so a zero Config is usable.
type Config struct {
	// FailureThreshold is the number of consecutive failures (a success in
	// the Closed state resets the count) that trips the breaker from Closed
	// to Open.
	// Default: 5
	FailureThreshold int

	// ResetTimeout is how long the breaker stays Open, measured from the most
	// recently recorded failure, before it admits half-open probe requests.
	// Default: 30 seconds
	ResetTimeout time.Duration

	// HalfOpenRequests caps how many probe requests are admitted while the
	// breaker is HalfOpen.
	// Default: 1
	HalfOpenRequests int
}

// DefaultConfig returns a Config populated with the package defaults: a
// threshold of 5 failures, a 30 second reset timeout and a single half-open
// probe.
func DefaultConfig() Config {
	var cfg Config
	cfg.FailureThreshold = 5
	cfg.ResetTimeout = 30 * time.Second
	cfg.HalfOpenRequests = 1
	return cfg
}
|
|
|
|
// CircuitBreaker implements the circuit breaker pattern.
|
|
type CircuitBreaker struct {
|
|
cfg Config
|
|
|
|
mu sync.RWMutex
|
|
state State
|
|
failures int
|
|
successes int
|
|
lastFailure time.Time
|
|
halfOpenRequests int
|
|
}
|
|
|
|
// New creates a new circuit breaker with the given configuration.
|
|
func New(cfg Config) *CircuitBreaker {
|
|
if cfg.FailureThreshold <= 0 {
|
|
cfg.FailureThreshold = 5
|
|
}
|
|
if cfg.ResetTimeout <= 0 {
|
|
cfg.ResetTimeout = 30 * time.Second
|
|
}
|
|
if cfg.HalfOpenRequests <= 0 {
|
|
cfg.HalfOpenRequests = 1
|
|
}
|
|
|
|
return &CircuitBreaker{
|
|
cfg: cfg,
|
|
state: Closed,
|
|
}
|
|
}
|
|
|
|
// Execute runs the function if the circuit allows it.
|
|
// Returns ErrCircuitOpen if the circuit is open.
|
|
func (cb *CircuitBreaker) Execute(fn func() error) error {
|
|
if !cb.canExecute() {
|
|
return ErrCircuitOpen
|
|
}
|
|
|
|
err := fn()
|
|
cb.recordResult(err)
|
|
return err
|
|
}
|
|
|
|
// canExecute checks if a request should be allowed.
|
|
func (cb *CircuitBreaker) canExecute() bool {
|
|
cb.mu.Lock()
|
|
defer cb.mu.Unlock()
|
|
|
|
switch cb.state {
|
|
case Closed:
|
|
return true
|
|
|
|
case Open:
|
|
// Check if reset timeout has passed
|
|
if time.Since(cb.lastFailure) > cb.cfg.ResetTimeout {
|
|
cb.state = HalfOpen
|
|
cb.halfOpenRequests = 0
|
|
return true
|
|
}
|
|
return false
|
|
|
|
case HalfOpen:
|
|
// Allow limited requests in half-open state
|
|
if cb.halfOpenRequests < cb.cfg.HalfOpenRequests {
|
|
cb.halfOpenRequests++
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// recordResult updates state based on operation outcome.
|
|
func (cb *CircuitBreaker) recordResult(err error) {
|
|
cb.mu.Lock()
|
|
defer cb.mu.Unlock()
|
|
|
|
if err != nil {
|
|
cb.onFailure()
|
|
} else {
|
|
cb.onSuccess()
|
|
}
|
|
}
|
|
|
|
// onFailure handles a failed operation.
|
|
func (cb *CircuitBreaker) onFailure() {
|
|
cb.failures++
|
|
cb.successes = 0
|
|
cb.lastFailure = time.Now()
|
|
|
|
switch cb.state {
|
|
case Closed:
|
|
if cb.failures >= cb.cfg.FailureThreshold {
|
|
cb.state = Open
|
|
}
|
|
case HalfOpen:
|
|
cb.state = Open
|
|
}
|
|
}
|
|
|
|
// onSuccess handles a successful operation.
|
|
func (cb *CircuitBreaker) onSuccess() {
|
|
cb.successes++
|
|
|
|
switch cb.state {
|
|
case Closed:
|
|
cb.failures = 0
|
|
case HalfOpen:
|
|
// Successful probe - close the circuit
|
|
cb.state = Closed
|
|
cb.failures = 0
|
|
}
|
|
}
|
|
|
|
// State returns the current circuit state.
|
|
func (cb *CircuitBreaker) State() State {
|
|
cb.mu.RLock()
|
|
defer cb.mu.RUnlock()
|
|
return cb.state
|
|
}
|
|
|
|
// Stats returns current circuit statistics.
|
|
func (cb *CircuitBreaker) Stats() Stats {
|
|
cb.mu.RLock()
|
|
defer cb.mu.RUnlock()
|
|
|
|
return Stats{
|
|
State: cb.state,
|
|
Failures: cb.failures,
|
|
Successes: cb.successes,
|
|
LastFailure: cb.lastFailure,
|
|
}
|
|
}
|
|
|
|
// Reset manually resets the circuit breaker to closed state.
|
|
func (cb *CircuitBreaker) Reset() {
|
|
cb.mu.Lock()
|
|
defer cb.mu.Unlock()
|
|
|
|
cb.state = Closed
|
|
cb.failures = 0
|
|
cb.successes = 0
|
|
cb.lastFailure = time.Time{}
|
|
cb.halfOpenRequests = 0
|
|
}
|
|
|
|
// Stats contains circuit breaker statistics.
|
|
type Stats struct {
|
|
State State
|
|
Failures int
|
|
Successes int
|
|
LastFailure time.Time
|
|
}
|