rdev/internal/telemetry/middleware.go
jordan 72d16929ca feat: Implement hexagonal architecture with services, webhooks, queue, and telemetry
Major refactoring to hexagonal (ports & adapters) architecture:

- Add service layer (apikey_service, project_service) for business logic
- Add webhook system with dispatcher and delivery tracking
- Add command queue with priority-based processing
- Add rate limiting with sliding window algorithm
- Add audit logging for command execution
- Add OpenTelemetry integration (traces, metrics, spans)
- Add circuit breaker for fault tolerance
- Add cached repository wrapper for performance
- Add comprehensive validation package
- Add Kubernetes client integration for pod management
- Add database migrations (allowed_ips, audit_log, rate_limiting, queue, webhooks)
- Add network policy and PodDisruptionBudget for k8s
- Remove legacy executor and projects/registry packages
- Untrack secrets.yaml (now managed via envault)
- Add coverage.out to .gitignore
- Add e2e test infrastructure with docker-compose
- Add comprehensive documentation (API, architecture, operations, plans)
- Add golangci-lint config and pre-commit hook

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-25 19:57:46 -07:00

162 lines
4.9 KiB
Go

package telemetry
import (
	"fmt"
	"net"
	"net/http"
	"regexp"
	"strings"

	"github.com/go-chi/chi/v5"
	"github.com/go-chi/chi/v5/middleware"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/propagation"
	semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
	"go.opentelemetry.io/otel/trace"
)
// Middleware returns an HTTP middleware that traces requests using OpenTelemetry.
//
// For every request it extracts any trace context carried in the request
// headers, starts a server-kind span named "<METHOD> <route>" on a tracer named
// serviceName, and hands the span's context to the next handler. Request
// attributes (method, path, scheme, host, user agent, route, client address and
// the chi request ID when present) are set up front; the response status code,
// an "error" flag for statuses >= 400 and the response body size are added once
// the handler returns.
func Middleware(serviceName string) func(http.Handler) http.Handler {
	tracer := otel.Tracer(serviceName)

	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			// Extract trace context from incoming request headers.
			ctx := otel.GetTextMapPropagator().Extract(r.Context(), propagation.HeaderCarrier(r.Header))

			// Best-effort route before the handler runs. chi usually has not
			// finished routing yet, so this is typically the normalized path.
			routePattern := getRoutePattern(r)
			if routePattern == "" {
				routePattern = r.URL.Path
			}

			// Span name: "METHOD /route".
			spanName := fmt.Sprintf("%s %s", r.Method, routePattern)

			ctx, span := tracer.Start(ctx, spanName,
				trace.WithSpanKind(trace.SpanKindServer),
				trace.WithAttributes(
					semconv.HTTPRequestMethodKey.String(r.Method),
					semconv.URLPath(r.URL.Path),
					semconv.URLScheme(getScheme(r)),
					semconv.ServerAddress(r.Host),
					semconv.UserAgentOriginal(r.UserAgent()),
					semconv.HTTPRoute(routePattern),
				),
			)
			defer span.End()

			// Add request ID if available (from chi middleware).
			if reqID := middleware.GetReqID(ctx); reqID != "" {
				span.SetAttributes(attribute.String("request.id", reqID))
			}

			span.SetAttributes(semconv.ClientAddress(requestClientAddress(r)))

			// Wrap the response writer to capture status code and body size.
			rw := &responseWriter{ResponseWriter: w, statusCode: http.StatusOK}

			next.ServeHTTP(rw, r.WithContext(ctx))

			// chi assembles the route pattern while it routes, so the complete
			// pattern is only available after the handler chain has run. Prefer
			// it over the pre-routing guess to keep span names low-cardinality.
			if rctx := chi.RouteContext(ctx); rctx != nil {
				if matched := rctx.RoutePattern(); matched != "" && matched != routePattern {
					span.SetName(fmt.Sprintf("%s %s", r.Method, matched))
					span.SetAttributes(semconv.HTTPRoute(matched))
				}
			}

			// Record response attributes.
			span.SetAttributes(semconv.HTTPResponseStatusCode(rw.statusCode))

			// Mark span as error if status >= 400.
			if rw.statusCode >= 400 {
				span.SetAttributes(attribute.Bool("error", true))
			}

			// Add response size if anything was written.
			if rw.bytesWritten > 0 {
				span.SetAttributes(attribute.Int64("http.response.body.size", rw.bytesWritten))
			}
		})
	}
}

// requestClientAddress returns the best available client address for r.
//
// Precedence: the X-Real-IP header, then the first entry of the
// X-Forwarded-For list, then the host part of r.RemoteAddr. If RemoteAddr is
// not in host:port form it is returned unchanged.
func requestClientAddress(r *http.Request) string {
	if ip := strings.TrimSpace(r.Header.Get("X-Real-IP")); ip != "" {
		return ip
	}
	if forwarded := r.Header.Get("X-Forwarded-For"); forwarded != "" {
		// The header is a comma-separated list; take the left-most entry
		// rather than recording the whole proxy chain as one address.
		first, _, _ := strings.Cut(forwarded, ",")
		if ip := strings.TrimSpace(first); ip != "" {
			return ip
		}
	}
	if host, _, err := net.SplitHostPort(r.RemoteAddr); err == nil {
		return host
	}
	return r.RemoteAddr
}
// responseWriter wraps http.ResponseWriter to capture the response status code
// and the number of body bytes written.
type responseWriter struct {
	http.ResponseWriter
	statusCode   int   // final status code recorded for the response
	bytesWritten int64 // running total of bytes accepted by Write
	wroteHeader  bool  // true once the final status has been recorded
}

// WriteHeader records the status code and forwards the call to the wrapped
// writer.
//
// Only the first final status is recorded: later calls are still forwarded but
// do not overwrite it, and informational 1xx codes (other than 101 Switching
// Protocols) are not recorded because a final 2xx-5xx header may still follow
// them.
func (rw *responseWriter) WriteHeader(code int) {
	informational := code >= 100 && code < 200 && code != http.StatusSwitchingProtocols
	if !rw.wroteHeader && !informational {
		rw.statusCode = code
		rw.wroteHeader = true
	}
	rw.ResponseWriter.WriteHeader(code)
}

// Write forwards b to the wrapped writer and adds the number of bytes it
// reports as written to bytesWritten. It returns the wrapped writer's result
// unchanged.
//
// A Write before any final WriteHeader implies a 200 OK response, so the
// recorded status is fixed at that point.
func (rw *responseWriter) Write(b []byte) (int, error) {
	if !rw.wroteHeader {
		rw.statusCode = http.StatusOK
		rw.wroteHeader = true
	}
	n, err := rw.ResponseWriter.Write(b)
	rw.bytesWritten += int64(n)
	return n, err
}

// Unwrap returns the underlying ResponseWriter for middleware that needs it.
func (rw *responseWriter) Unwrap() http.ResponseWriter {
	return rw.ResponseWriter
}
// getRoutePattern resolves a low-cardinality route label for the request.
// It prefers the pattern reported by chi's route context; when there is no
// route context, or its pattern is empty, it returns the URL path passed
// through normalizePath.
func getRoutePattern(r *http.Request) string {
	if rctx := chi.RouteContext(r.Context()); rctx != nil {
		if pattern := rctx.RoutePattern(); pattern != "" {
			return pattern
		}
	}

	// No chi pattern: normalize the raw path to avoid cardinality explosion.
	return normalizePath(r.URL.Path)
}
// pathNormalizers lists, in match order, the rewrite rules used to replace
// variable path segments with placeholders. Each rule pairs a compiled
// pattern with the replacement template applied when the pattern matches.
var pathNormalizers = []struct {
	pattern *regexp.Regexp
	replace string
}{
	{
		// /keys/<uuid> -> /keys/{id}
		pattern: regexp.MustCompile(`^/keys/[^/]+$`),
		replace: "/keys/{id}",
	},
	{
		// /projects/<id>/claude-config/<type>/<name>, keeping <type>
		pattern: regexp.MustCompile(`^/projects/[^/]+/claude-config/(commands|skills|agents)/[^/]+$`),
		replace: "/projects/{id}/claude-config/$1/{name}",
	},
	{
		// /projects/<id> plus any sub-path, which is kept as-is
		pattern: regexp.MustCompile(`^/projects/[^/]+(/.*)?$`),
		replace: "/projects/{id}$1",
	},
}
// normalizePath rewrites path using the first pathNormalizers rule whose
// pattern matches it, swapping variable segments for placeholders so span
// names stay low-cardinality. A path that matches no rule is returned
// unchanged.
func normalizePath(path string) string {
	for i := range pathNormalizers {
		rule := &pathNormalizers[i]
		if !rule.pattern.MatchString(path) {
			continue
		}
		// First matching rule wins; later rules are not consulted.
		return rule.pattern.ReplaceAllString(path, rule.replace)
	}
	return path
}
// getScheme reports the scheme to record for the request: "https" when the
// request carries TLS state, otherwise the value of the X-Forwarded-Proto
// header if it is set, otherwise "http".
func getScheme(r *http.Request) string {
	switch forwarded := r.Header.Get("X-Forwarded-Proto"); {
	case r.TLS != nil:
		// A TLS connection takes precedence over the forwarded header.
		return "https"
	case forwarded != "":
		return forwarded
	default:
		return "http"
	}
}
// SpanFromRequest returns the span stored in the request's context, as
// reported by trace.SpanFromContext. Handlers can use it to attach extra
// attributes or events to the request's span.
func SpanFromRequest(r *http.Request) trace.Span {
	ctx := r.Context()
	return trace.SpanFromContext(ctx)
}