rdev/internal/handlers/health.go
jordan f20fc6c51c
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
feat(saga): implement enterprise-grade resilience architecture
Fixes issues from code review of resilience implementation:

- Wire saga system in main.go (SagaRepository, SagaExecutor, SagaHandler)
- Fix CompletedSteps() to include skipped steps for dependency resolution
- Fix reverse loop bug in saga compensation (use standard swap pattern)
- Add circuit breaker state change callbacks for Prometheus metrics

Phase 1 (Build Resilience):
- Add failure:retry to all component Kaniko build steps
- Add preflight registry health check before builds
- Add services-deployed sync point to decouple docs from critical path

Phase 2 (API Resilience):
- Add pipeline retry endpoint (POST /projects/{id}/pipelines/{number}/retry)
- Wire circuit breakers with metrics callbacks
- Add /health/circuits endpoint for circuit breaker status

Phase 3 (Saga Engine):
- Full domain model (Saga, SagaStep, RetryPolicy, BackoffType)
- PostgreSQL saga repository with CRUD and step management
- Saga executor with retry, compensation, skip step support
- Saga API handlers with CRUD and control operations

Phase 4 (Observability):
- Add saga metrics (total, step_duration, retry, circuit_breaker_state)
- Add logging fields (saga_id, saga_name, step_name)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-08 01:58:02 -07:00

324 lines
8.5 KiB
Go

// Package handlers provides HTTP handlers for the rdev API.
package handlers
import (
"context"
"fmt"
"net/http"
"strings"
"time"
"github.com/orchard9/rdev/internal/circuitbreaker"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/metrics"
"github.com/orchard9/rdev/internal/port"
"github.com/orchard9/rdev/pkg/api"
)
// ExecutorHealthChecker is the narrow view of a background executor that the
// health handler needs in order to report on it.
type ExecutorHealthChecker interface {
	// WorkerID returns the identifier that is embedded in the health message.
	WorkerID() string
	// Running reports whether the executor is currently running.
	Running() bool
}
// ExternalHealthStatusProvider provides cached external system health statuses.
//
// The readiness handler calls GetAllStatuses on every request and reports each
// returned entry under an "external:<system>" key.
type ExternalHealthStatusProvider interface {
// GetAllStatuses returns one status per external system, keyed by system.
GetAllStatuses() map[domain.ExternalSystem]domain.ExternalSystemStatus
}
// HealthHandler handles health and readiness checks.
//
// Every dependency is optional: Ready only runs the checks whose field is
// non-nil. Only the database and Kubernetes checks can make Ready fail; the
// remaining checks are recorded for information only.
type HealthHandler struct {
// serviceName is reported in the Health response and in a successful Ready
// response.
serviceName string
// db is pinged by the readiness check; a failed ping marks the service not ready.
db port.DatabasePinger
// k8sChecker is asked for the server version; an error marks the service not ready.
k8sChecker port.KubernetesChecker
// agentRegistry lists the code agents whose availability is reported (informational).
agentRegistry port.CodeAgentRegistry
// workExecutor is the background executor whose running state is reported (informational).
workExecutor ExecutorHealthChecker
// registryChecker checks the container registry (informational).
registryChecker port.RegistryChecker
// externalChecker supplies cached statuses of external systems (informational).
externalChecker ExternalHealthStatusProvider
}
// NewHealthHandler builds a HealthHandler for the named service with the two
// dependencies that decide readiness: the database pinger and the Kubernetes
// checker. Either may be nil, in which case the corresponding check is skipped.
// The optional, informational dependencies are attached afterwards with the
// With* methods.
func NewHealthHandler(serviceName string, db port.DatabasePinger, k8sChecker port.KubernetesChecker) *HealthHandler {
	handler := new(HealthHandler)
	handler.serviceName = serviceName
	handler.db = db
	handler.k8sChecker = k8sChecker
	return handler
}
// WithAgentRegistry adds a code agent registry for health monitoring.
//
// It stores registry on the handler and returns the same handler so that
// configuration calls can be chained. Once set, Ready records one
// "agent:<provider>" entry per agent obtained from the registry; those entries
// are informational and do not change the readiness result.
func (h *HealthHandler) WithAgentRegistry(registry port.CodeAgentRegistry) *HealthHandler {
h.agentRegistry = registry
return h
}
// WithWorkExecutor adds a work executor for health monitoring.
//
// It stores executor on the handler and returns the same handler so that
// configuration calls can be chained. Once set, Ready records a
// "work_executor" entry; it is informational and does not change the
// readiness result.
func (h *HealthHandler) WithWorkExecutor(executor ExecutorHealthChecker) *HealthHandler {
h.workExecutor = executor
return h
}
// WithRegistryChecker adds a registry checker for health monitoring.
//
// It stores checker on the handler and returns the same handler so that
// configuration calls can be chained. Once set, Ready records a "registry"
// entry; it is informational and does not change the readiness result.
func (h *HealthHandler) WithRegistryChecker(checker port.RegistryChecker) *HealthHandler {
h.registryChecker = checker
return h
}
// WithExternalHealthChecker adds a cached external health checker for monitoring.
//
// It stores checker on the handler and returns the same handler so that
// configuration calls can be chained. Once set, Ready records one
// "external:<system>" entry per status the checker returns; those entries do
// not change the readiness result.
func (h *HealthHandler) WithExternalHealthChecker(checker ExternalHealthStatusProvider) *HealthHandler {
h.externalChecker = checker
return h
}
// Health is the liveness endpoint. It touches no dependency and always
// answers with a success payload carrying the status "ok" and the service
// name, so it only fails when the process itself cannot serve requests.
// GET /health
func (h *HealthHandler) Health(w http.ResponseWriter, r *http.Request) {
	payload := map[string]string{
		"service": h.serviceName,
		"status":  "ok",
	}
	api.WriteSuccess(w, r, payload)
}
// Ready returns a readiness check with dependency health.
//
// Each configured dependency is checked with a context derived from the
// request and bounded by TimeoutFastLookup. Only the database and Kubernetes
// checks are required: if either is unhealthy the handler answers
// 503 NOT_READY. Code agents, the work executor, the registry and the cached
// external systems are recorded for information only.
//
// On success the response carries status "ready", the service name and every
// collected check result.
//
// NOTE(review): the failure path calls api.WriteError with only a code and a
// message, so the collected per-check results are not part of the 503 response
// as written here — confirm whether that is intended.
// GET /ready
func (h *HealthHandler) Ready(w http.ResponseWriter, r *http.Request) {
	ctx, cancel := context.WithTimeout(r.Context(), TimeoutFastLookup)
	defer cancel()

	checks := make(map[string]CheckResult)
	allHealthy := true

	// Database check (required).
	if h.db != nil {
		dbCheck := h.checkDatabase(ctx)
		checks["database"] = dbCheck
		if !dbCheck.Healthy {
			allHealthy = false
		}
	}

	// Kubernetes check (required).
	if h.k8sChecker != nil {
		k8sCheck := h.checkKubernetes(ctx)
		checks["kubernetes"] = k8sCheck
		if !k8sCheck.Healthy {
			allHealthy = false
		}
	}

	// Code agent checks (informational - don't affect overall readiness).
	if h.agentRegistry != nil {
		for name, check := range h.checkCodeAgents(ctx) {
			checks["agent:"+name] = check
		}
	}

	// Work executor check (informational).
	if h.workExecutor != nil {
		checks["work_executor"] = h.checkWorkExecutor()
	}

	// Registry check (informational - doesn't affect overall readiness).
	if h.registryChecker != nil {
		checks["registry"] = h.checkRegistry(ctx)
	}

	// External system checks (cached, from background worker). A healthy
	// system reports "connected"; an unhealthy one reports its recorded error.
	if h.externalChecker != nil {
		for system, status := range h.externalChecker.GetAllStatuses() {
			message := status.Error
			if status.Healthy {
				message = "connected"
			}
			checks["external:"+string(system)] = CheckResult{
				Healthy:   status.Healthy,
				Message:   message,
				Latency:   status.Latency.String(),
				LastCheck: status.LastChecked,
			}
		}
	}

	if !allHealthy {
		api.WriteError(w, r, http.StatusServiceUnavailable, "NOT_READY",
			"Service not ready - one or more checks failed")
		return
	}

	api.WriteSuccess(w, r, ReadinessResponse{
		Status:  "ready",
		Service: h.serviceName,
		Checks:  checks,
	})
}
// checkDatabase pings the database with ctx and times the call. The result is
// healthy with the message "connected" when the ping succeeds; otherwise it is
// unhealthy and the message carries the ping error prefixed with
// "connection failed: ".
func (h *HealthHandler) checkDatabase(ctx context.Context) CheckResult {
	begun := time.Now()
	pingErr := h.db.PingContext(ctx)
	elapsed := time.Since(begun)

	result := CheckResult{
		Healthy:   pingErr == nil,
		Message:   "connected",
		Latency:   elapsed.String(),
		LastCheck: time.Now().UTC(),
	}
	if pingErr != nil {
		result.Message = "connection failed: " + pingErr.Error()
	}
	return result
}
// checkKubernetes asks the Kubernetes checker for the server version and times
// the call. The context parameter is unused because ServerVersion takes no
// arguments, so the caller's deadline is not applied to this call here.
//
// On error the message is normalised by substring match: errors mentioning
// "timeout" or "deadline" become "connection timeout", errors mentioning
// "refused" become "connection refused", and anything else is passed through.
func (h *HealthHandler) checkKubernetes(_ context.Context) CheckResult {
	begun := time.Now()
	// Server version is a lightweight API call; only its error matters.
	_, err := h.k8sChecker.ServerVersion()
	elapsed := time.Since(begun)

	message := "connected"
	if err != nil {
		message = err.Error()
		switch {
		case strings.Contains(message, "timeout"), strings.Contains(message, "deadline"):
			message = "connection timeout"
		case strings.Contains(message, "refused"):
			message = "connection refused"
		}
	}

	return CheckResult{
		Healthy:   err == nil,
		Message:   message,
		Latency:   elapsed.String(),
		LastCheck: time.Now().UTC(),
	}
}
// checkCodeAgents probes every provider the registry lists and returns one
// result per provider, keyed by the provider's string form. Providers for
// which the registry returns a nil agent are skipped. Each message has the
// form "available (<name>)" or "unavailable (<name>)".
func (h *HealthHandler) checkCodeAgents(ctx context.Context) map[string]CheckResult {
	report := make(map[string]CheckResult)
	for _, id := range h.agentRegistry.Available() {
		agent := h.agentRegistry.Get(id)
		if agent == nil {
			continue
		}

		begun := time.Now()
		up := agent.Available(ctx)
		elapsed := time.Since(begun)

		state := "unavailable"
		if up {
			state = "available"
		}

		report[string(id)] = CheckResult{
			Healthy:   up,
			Message:   fmt.Sprintf("%s (%s)", state, agent.Name()),
			Latency:   elapsed.String(),
			LastCheck: time.Now().UTC(),
		}
	}
	return report
}
// checkWorkExecutor checks whether the work executor is running.
//
// The result is healthy exactly when the executor reports that it is running.
// The message is "worker <id>: running" or "worker <id>: stopped". No latency
// is recorded for this check.
func (h *HealthHandler) checkWorkExecutor() CheckResult {
	running := h.workExecutor.Running()

	// Pick the state word first so the worker ID is fetched and the message
	// formatted only once, whichever state applies.
	state := "stopped"
	if running {
		state = "running"
	}

	return CheckResult{
		Healthy:   running,
		Message:   fmt.Sprintf("worker %s: %s", h.workExecutor.WorkerID(), state),
		LastCheck: time.Now().UTC(),
	}
}
// checkRegistry runs the registry checker, publishes the outcome to the
// registry health metric and converts the status into a CheckResult.
//
// The metric receives the latency in seconds; a latency string that is empty
// or cannot be parsed as a duration is reported as 0. The result message is
// "connected" when healthy and the checker's error text otherwise.
func (h *HealthHandler) checkRegistry(ctx context.Context) CheckResult {
	status := h.registryChecker.Check(ctx)

	// The checker reports latency as a duration string such as "45ms".
	var seconds float64
	if status.Latency != "" {
		if parsed, err := time.ParseDuration(status.Latency); err == nil {
			seconds = parsed.Seconds()
		}
	}
	metrics.SetRegistryHealth(status.Healthy, seconds)

	message := status.Error
	if status.Healthy {
		message = "connected"
	}

	return CheckResult{
		Healthy:   status.Healthy,
		Message:   message,
		Latency:   status.Latency,
		LastCheck: status.LastChecked,
	}
}
// CheckResult represents the result of a health check.
//
// It is serialized as part of the readiness response.
type CheckResult struct {
// Healthy is true when the checked dependency responded successfully.
Healthy bool `json:"healthy"`
// Message is a short human-readable outcome, e.g. "connected" or an error text.
Message string `json:"message"`
// Latency is the check duration as a string; it is omitted from the JSON
// when empty (the work executor check does not set it).
Latency string `json:"latency,omitempty"`
// LastCheck is when the check ran, or for cached checks the time reported
// by the status provider.
LastCheck time.Time `json:"last_check"`
}
// ReadinessResponse is the response for the /ready endpoint.
type ReadinessResponse struct {
// Status is the overall readiness state, "ready" or "not_ready".
Status string `json:"status"`
// Service is the name of the reporting service.
Service string `json:"service"`
// Checks holds the individual results keyed by check name (for example
// "database", "kubernetes", "agent:<provider>", "external:<system>");
// it is omitted from the JSON when empty.
Checks map[string]CheckResult `json:"checks,omitempty"`
}
// CircuitsResponse is the response for the /health/circuits endpoint.
type CircuitsResponse struct {
// Circuits lists the status entries obtained from the global circuit
// breaker registry.
Circuits []circuitbreaker.StatusEntry `json:"circuits"`
}
// Circuits reports the current status of every circuit breaker known to the
// global circuit breaker registry, wrapped in a CircuitsResponse.
// GET /health/circuits
func (h *HealthHandler) Circuits(w http.ResponseWriter, r *http.Request) {
	api.WriteSuccess(w, r, CircuitsResponse{
		Circuits: circuitbreaker.GlobalRegistry.AllStatus(),
	})
}