Major refactoring to hexagonal (ports & adapters) architecture: - Add service layer (apikey_service, project_service) for business logic - Add webhook system with dispatcher and delivery tracking - Add command queue with priority-based processing - Add rate limiting with sliding window algorithm - Add audit logging for command execution - Add OpenTelemetry integration (traces, metrics, spans) - Add circuit breaker for fault tolerance - Add cached repository wrapper for performance - Add comprehensive validation package - Add Kubernetes client integration for pod management - Add database migrations (allowed_ips, audit_log, rate_limiting, queue, webhooks) - Add network policy and PodDisruptionBudget for k8s - Remove legacy executor and projects/registry packages - Untrack secrets.yaml (now managed via envault) - Add coverage.out to .gitignore - Add e2e test infrastructure with docker-compose - Add comprehensive documentation (API, architecture, operations, plans) - Add golangci-lint config and pre-commit hook Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
356 lines
9.4 KiB
Go
356 lines
9.4 KiB
Go
// Package webhook provides webhook dispatch functionality.
|
|
package webhook
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/hmac"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/orchard9/rdev/internal/domain"
|
|
"github.com/orchard9/rdev/internal/port"
|
|
)
|
|
|
|
// DispatcherConfig holds configuration for the webhook dispatcher.
//
// Passing a nil *DispatcherConfig to NewDispatcher selects
// DefaultDispatcherConfig().
type DispatcherConfig struct {
	// WorkerCount is the number of concurrent delivery workers.
	// Start launches exactly this many worker goroutines.
	WorkerCount int
	// MaxRetries is the maximum number of retry attempts for failed deliveries.
	// A failed job is re-queued only while its retry count is below this value.
	MaxRetries int
	// Timeout is the HTTP request timeout for webhook deliveries.
	// It is applied as the Timeout of the dispatcher's http.Client.
	Timeout time.Duration
	// RetryBackoff defines the base backoff duration for retries (exponential).
	// The delay before retry n (zero-based) is RetryBackoff * 2^n.
	RetryBackoff time.Duration
	// MaxResponseBodySize is the maximum size of response body to store.
	// It is the byte limit used when reading the endpoint's response.
	MaxResponseBodySize int
	// Logger is the logger to use.
	Logger *slog.Logger
}
|
|
|
|
// DefaultDispatcherConfig returns sensible defaults.
|
|
func DefaultDispatcherConfig() *DispatcherConfig {
|
|
return &DispatcherConfig{
|
|
WorkerCount: 10,
|
|
MaxRetries: 3,
|
|
Timeout: 30 * time.Second,
|
|
RetryBackoff: 5 * time.Second,
|
|
MaxResponseBodySize: 1024, // 1KB
|
|
Logger: slog.Default(),
|
|
}
|
|
}
|
|
|
|
// deliveryJob represents a webhook delivery job.
// One job corresponds to one delivery attempt; a retry is a new job that
// carries the same deliveryID with an incremented retryCount.
type deliveryJob struct {
	// webhook is the delivery target (its URL and optional signing secret are
	// read during delivery).
	webhook *domain.Webhook
	// event supplies the type, timestamp, project ID and data of the payload.
	event *domain.WebhookEvent
	// deliveryID is the base identifier shared by the first attempt and all of
	// its retries.
	deliveryID string
	// retryCount is the zero-based attempt number; 0 is the initial delivery.
	retryCount int
}
|
|
|
|
// Dispatcher handles webhook delivery with worker pool and retry logic.
//
// Dispatch enqueues one job per subscribed webhook. The workers launched by
// Start take jobs from the queue, perform the HTTP delivery, record the
// outcome through repo and re-queue failed jobs after a backoff.
type Dispatcher struct {
	// repo looks up subscribed webhooks and records delivery attempts.
	repo port.WebhookRepository
	config *DispatcherConfig
	// client is shared by all workers; its timeout comes from config.Timeout.
	client *http.Client

	// Job queue
	jobs chan deliveryJob

	// Shutdown management
	// ctx is cancelled by Stop; workers watch it and delivery requests are
	// bound to it.
	ctx context.Context
	cancel context.CancelFunc
	// wg tracks the worker goroutines so Stop can wait for them.
	wg sync.WaitGroup
}
|
|
|
|
// NewDispatcher creates a new webhook dispatcher.
|
|
func NewDispatcher(repo port.WebhookRepository, cfg *DispatcherConfig) *Dispatcher {
|
|
if cfg == nil {
|
|
cfg = DefaultDispatcherConfig()
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
return &Dispatcher{
|
|
repo: repo,
|
|
config: cfg,
|
|
client: &http.Client{
|
|
Timeout: cfg.Timeout,
|
|
},
|
|
jobs: make(chan deliveryJob, 1000), // Buffered channel for job queue
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
}
|
|
}
|
|
|
|
// Ensure Dispatcher implements port.WebhookDispatcher at compile time.
// The blank assignment has no runtime effect; it only makes the build fail
// if *Dispatcher stops satisfying the interface.
var _ port.WebhookDispatcher = (*Dispatcher)(nil)
|
|
|
|
// Start starts the background dispatcher workers.
|
|
func (d *Dispatcher) Start() error {
|
|
d.config.Logger.Info("webhook dispatcher starting", "workers", d.config.WorkerCount)
|
|
|
|
// Start worker goroutines
|
|
for i := 0; i < d.config.WorkerCount; i++ {
|
|
d.wg.Add(1)
|
|
go d.worker(i)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stop gracefully shuts down the dispatcher.
|
|
func (d *Dispatcher) Stop() {
|
|
d.config.Logger.Info("webhook dispatcher stopping")
|
|
d.cancel()
|
|
close(d.jobs)
|
|
d.wg.Wait()
|
|
d.config.Logger.Info("webhook dispatcher stopped")
|
|
}
|
|
|
|
// Health returns true if the dispatcher is running and healthy.
|
|
func (d *Dispatcher) Health() bool {
|
|
select {
|
|
case <-d.ctx.Done():
|
|
return false
|
|
default:
|
|
return true
|
|
}
|
|
}
|
|
|
|
// QueueSize returns the current number of pending jobs in the queue.
|
|
func (d *Dispatcher) QueueSize() int {
|
|
return len(d.jobs)
|
|
}
|
|
|
|
// Dispatch sends an event to all subscribed webhooks for a project.
|
|
// This is a non-blocking operation - deliveries happen in the background.
|
|
func (d *Dispatcher) Dispatch(ctx context.Context, projectID string, event *domain.WebhookEvent) error {
|
|
// Find all enabled webhooks that subscribe to this event type
|
|
webhooks, err := d.repo.ListEnabledByProjectAndEvent(ctx, projectID, event.Type)
|
|
if err != nil {
|
|
return fmt.Errorf("list webhooks: %w", err)
|
|
}
|
|
|
|
if len(webhooks) == 0 {
|
|
return nil // No webhooks to dispatch to
|
|
}
|
|
|
|
d.config.Logger.Debug("dispatching webhook event",
|
|
"project_id", projectID,
|
|
"event_type", event.Type,
|
|
"webhook_count", len(webhooks),
|
|
)
|
|
|
|
// Queue delivery jobs for each webhook
|
|
for _, webhook := range webhooks {
|
|
deliveryID := uuid.New().String()
|
|
|
|
select {
|
|
case d.jobs <- deliveryJob{
|
|
webhook: webhook,
|
|
event: event,
|
|
deliveryID: deliveryID,
|
|
retryCount: 0,
|
|
}:
|
|
// Job queued successfully
|
|
default:
|
|
// Job queue is full, log warning
|
|
d.config.Logger.Warn("webhook job queue full, dropping event",
|
|
"webhook_id", webhook.ID,
|
|
"event_type", event.Type,
|
|
)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// worker processes delivery jobs from the queue.
|
|
func (d *Dispatcher) worker(id int) {
|
|
defer d.wg.Done()
|
|
|
|
d.config.Logger.Debug("webhook worker started", "worker_id", id)
|
|
|
|
for {
|
|
select {
|
|
case <-d.ctx.Done():
|
|
d.config.Logger.Debug("webhook worker stopping", "worker_id", id)
|
|
return
|
|
case job, ok := <-d.jobs:
|
|
if !ok {
|
|
d.config.Logger.Debug("webhook worker job channel closed", "worker_id", id)
|
|
return
|
|
}
|
|
d.processJob(job)
|
|
}
|
|
}
|
|
}
|
|
|
|
// processJob delivers a webhook and handles retries.
|
|
func (d *Dispatcher) processJob(job deliveryJob) {
|
|
delivery := d.deliver(job)
|
|
|
|
// Record the delivery attempt
|
|
recordCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
if err := d.repo.RecordDelivery(recordCtx, delivery); err != nil {
|
|
d.config.Logger.Error("failed to record webhook delivery",
|
|
"webhook_id", job.webhook.ID,
|
|
"delivery_id", delivery.ID,
|
|
"error", err,
|
|
)
|
|
}
|
|
|
|
// Handle retry if delivery failed
|
|
if !delivery.Success && job.retryCount < d.config.MaxRetries {
|
|
// Calculate exponential backoff
|
|
backoff := d.config.RetryBackoff * time.Duration(1<<job.retryCount)
|
|
|
|
d.config.Logger.Info("scheduling webhook retry",
|
|
"webhook_id", job.webhook.ID,
|
|
"delivery_id", delivery.ID,
|
|
"retry_count", job.retryCount+1,
|
|
"backoff", backoff,
|
|
)
|
|
|
|
// Schedule retry after backoff with shutdown awareness
|
|
time.AfterFunc(backoff, func() {
|
|
// Check if dispatcher is shutting down before attempting to queue
|
|
select {
|
|
case <-d.ctx.Done():
|
|
// Dispatcher shutting down, don't queue retry
|
|
d.config.Logger.Debug("skipping webhook retry due to shutdown",
|
|
"webhook_id", job.webhook.ID,
|
|
)
|
|
return
|
|
default:
|
|
}
|
|
|
|
// Attempt to queue the retry
|
|
select {
|
|
case d.jobs <- deliveryJob{
|
|
webhook: job.webhook,
|
|
event: job.event,
|
|
deliveryID: job.deliveryID,
|
|
retryCount: job.retryCount + 1,
|
|
}:
|
|
// Retry queued successfully
|
|
case <-d.ctx.Done():
|
|
// Dispatcher shut down while waiting to queue
|
|
d.config.Logger.Debug("webhook retry cancelled due to shutdown",
|
|
"webhook_id", job.webhook.ID,
|
|
)
|
|
default:
|
|
d.config.Logger.Warn("failed to queue webhook retry, queue full",
|
|
"webhook_id", job.webhook.ID,
|
|
)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// deliver performs the actual HTTP delivery of a webhook.
|
|
func (d *Dispatcher) deliver(job deliveryJob) *domain.WebhookDelivery {
|
|
delivery := &domain.WebhookDelivery{
|
|
ID: domain.WebhookDeliveryID(fmt.Sprintf("%s-%d", job.deliveryID, job.retryCount)),
|
|
WebhookID: job.webhook.ID,
|
|
EventType: job.event.Type,
|
|
DeliveredAt: time.Now(),
|
|
RetryCount: job.retryCount,
|
|
}
|
|
|
|
// Build payload
|
|
payload := domain.WebhookPayload{
|
|
ID: string(delivery.ID),
|
|
Event: job.event.Type,
|
|
Timestamp: job.event.Timestamp,
|
|
ProjectID: job.event.ProjectID,
|
|
Data: job.event.Data,
|
|
}
|
|
|
|
payloadBytes, err := json.Marshal(payload)
|
|
if err != nil {
|
|
delivery.Success = false
|
|
delivery.ErrorMessage = fmt.Sprintf("failed to marshal payload: %v", err)
|
|
return delivery
|
|
}
|
|
delivery.Payload = string(payloadBytes)
|
|
|
|
// Create request
|
|
req, err := http.NewRequestWithContext(d.ctx, http.MethodPost, job.webhook.URL, bytes.NewReader(payloadBytes))
|
|
if err != nil {
|
|
delivery.Success = false
|
|
delivery.ErrorMessage = fmt.Sprintf("failed to create request: %v", err)
|
|
return delivery
|
|
}
|
|
|
|
// Set headers
|
|
req.Header.Set("Content-Type", "application/json")
|
|
req.Header.Set("User-Agent", "rdev-webhook/1.0")
|
|
req.Header.Set("X-Webhook-Event", string(job.event.Type))
|
|
req.Header.Set("X-Webhook-Delivery", string(delivery.ID))
|
|
|
|
// Sign payload if secret is configured
|
|
if job.webhook.HasSecret() {
|
|
signature := d.signPayload(payloadBytes, job.webhook.Secret)
|
|
req.Header.Set("X-Webhook-Signature", signature)
|
|
}
|
|
|
|
// Send request
|
|
resp, err := d.client.Do(req)
|
|
if err != nil {
|
|
delivery.Success = false
|
|
delivery.ErrorMessage = fmt.Sprintf("request failed: %v", err)
|
|
d.config.Logger.Debug("webhook delivery failed",
|
|
"webhook_id", job.webhook.ID,
|
|
"delivery_id", delivery.ID,
|
|
"error", err,
|
|
)
|
|
return delivery
|
|
}
|
|
defer func() { _ = resp.Body.Close() }()
|
|
|
|
delivery.ResponseStatus = resp.StatusCode
|
|
|
|
// Read response body (limited)
|
|
bodyBytes, _ := io.ReadAll(io.LimitReader(resp.Body, int64(d.config.MaxResponseBodySize)))
|
|
delivery.ResponseBody = string(bodyBytes)
|
|
|
|
// Check if delivery was successful (2xx status codes)
|
|
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
|
|
delivery.Success = true
|
|
d.config.Logger.Debug("webhook delivered successfully",
|
|
"webhook_id", job.webhook.ID,
|
|
"delivery_id", delivery.ID,
|
|
"status", resp.StatusCode,
|
|
)
|
|
} else {
|
|
delivery.Success = false
|
|
delivery.ErrorMessage = fmt.Sprintf("received non-2xx status: %d", resp.StatusCode)
|
|
d.config.Logger.Debug("webhook delivery failed",
|
|
"webhook_id", job.webhook.ID,
|
|
"delivery_id", delivery.ID,
|
|
"status", resp.StatusCode,
|
|
)
|
|
}
|
|
|
|
return delivery
|
|
}
|
|
|
|
// signPayload creates an HMAC-SHA256 signature of the payload.
|
|
func (d *Dispatcher) signPayload(payload []byte, secret string) string {
|
|
mac := hmac.New(sha256.New, []byte(secret))
|
|
mac.Write(payload)
|
|
return "sha256=" + hex.EncodeToString(mac.Sum(nil))
|
|
}
|