rdev/internal/webhook/dispatcher.go
jordan 853ec4cf81 fix: go.work race condition with batch components and idempotent provisioning
Three coordinated fixes for CI pipeline race conditions:

1. Woodpecker step dependencies: Added depends_on: [deps] to all 6 component
   templates (service, worker, cli, app-astro, app-react, app-nextjs) so build
   steps wait for go work sync to complete.

2. Idempotent resource provisioning: Modified provisionResources() to check
   for existing database/cache before creating, preventing "already exists"
   errors on component re-adds.

3. Batch component endpoint: POST /projects/{id}/components/batch enables
   atomic multi-component additions in a single git commit. Validates all
   components upfront, provisions infra sequentially, commits code components
   atomically.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-05 12:31:40 -07:00

409 lines
11 KiB
Go

// Package webhook provides webhook dispatch functionality.
package webhook
import (
"bytes"
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"sync"
"time"
"github.com/google/uuid"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/port"
)
// DispatcherConfig controls how a Dispatcher delivers webhooks: how many
// deliveries run in parallel, how failed deliveries are retried, and how much
// of each HTTP response is kept.
type DispatcherConfig struct {
	// WorkerCount sets how many goroutines pull delivery jobs concurrently.
	WorkerCount int
	// MaxRetries caps how many times a failed delivery is re-attempted.
	MaxRetries int
	// Timeout bounds each outbound webhook HTTP request.
	Timeout time.Duration
	// RetryBackoff is the base delay before a retry; the delay doubles with
	// every further attempt.
	RetryBackoff time.Duration
	// MaxResponseBodySize limits how many bytes of a response body are read
	// and stored on the delivery record.
	MaxResponseBodySize int
	// Logger receives the dispatcher's structured log output.
	Logger *slog.Logger
}
// DefaultDispatcherConfig returns a freshly allocated configuration with the
// stock settings: 10 workers, up to 3 retries starting at a 5s backoff, a 30s
// request timeout, a 1KB response-body cap and the process-wide slog logger.
// Each call returns a new value, so callers may modify it freely.
func DefaultDispatcherConfig() *DispatcherConfig {
	cfg := new(DispatcherConfig)
	cfg.WorkerCount = 10
	cfg.MaxRetries = 3
	cfg.Timeout = 30 * time.Second
	cfg.RetryBackoff = 5 * time.Second
	cfg.MaxResponseBodySize = 1 << 10 // 1KB
	cfg.Logger = slog.Default()
	return cfg
}
// deliveryJob is one queued attempt to deliver an event to a single webhook.
// A retry is a copy of the failed job with the same deliveryID and a bumped
// retryCount.
type deliveryJob struct {
	webhook    *domain.Webhook      // target subscription (URL, optional secret)
	event      *domain.WebhookEvent // event being delivered
	deliveryID string               // ID shared by the first attempt and all of its retries
	retryCount int                  // number of earlier attempts; 0 on the first try
}
// Dispatcher delivers webhook events over HTTP. A fixed pool of worker
// goroutines consumes a buffered job queue, and failed deliveries are
// re-queued after an exponential backoff.
type Dispatcher struct {
	repo   port.WebhookRepository // subscription lookup and delivery-attempt recording
	config *DispatcherConfig
	client *http.Client // shared HTTP client; Timeout comes from config.Timeout

	// jobs is the buffered queue the workers read from.
	jobs chan deliveryJob

	// ctx is cancelled by Stop. Workers, outbound requests and scheduled
	// retries all observe it; wg tracks the worker goroutines.
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}
// NewDispatcher creates a Dispatcher backed by repo. Call Start to launch its
// workers.
//
// A nil cfg selects DefaultDispatcherConfig(). Every dispatcher method logs
// through cfg.Logger, so a cfg whose Logger is nil is given slog.Default().
// That substitution is made on a copy, leaving the caller's config untouched.
// Any other cfg is used as-is.
func NewDispatcher(repo port.WebhookRepository, cfg *DispatcherConfig) *Dispatcher {
	switch {
	case cfg == nil:
		cfg = DefaultDispatcherConfig()
	case cfg.Logger == nil:
		patched := *cfg
		patched.Logger = slog.Default()
		cfg = &patched
	}
	ctx, cancel := context.WithCancel(context.Background())
	return &Dispatcher{
		repo:   repo,
		config: cfg,
		client: &http.Client{
			Timeout: cfg.Timeout,
		},
		jobs:   make(chan deliveryJob, 1000), // buffered queue; Dispatch drops jobs when it is full
		ctx:    ctx,
		cancel: cancel,
	}
}

// Ensure Dispatcher implements port.WebhookDispatcher at compile time.
var _ port.WebhookDispatcher = (*Dispatcher)(nil)
// Start launches config.WorkerCount worker goroutines that consume the job
// queue until the dispatcher is stopped.
//
// It returns an error, and starts nothing, when WorkerCount is not positive.
// With no workers the queue would never drain, and queued deliveries would be
// silently lost.
func (d *Dispatcher) Start() error {
	n := d.config.WorkerCount
	if n < 1 {
		return fmt.Errorf("webhook dispatcher: worker count must be positive, got %d", n)
	}
	d.config.Logger.Info("webhook dispatcher starting", "workers", n)
	d.wg.Add(n)
	for i := 0; i < n; i++ {
		go d.worker(i)
	}
	return nil
}
// Stop shuts the dispatcher down and blocks until every worker goroutine has
// exited. It cancels the dispatcher context, which does three things:
//   - in-flight HTTP deliveries are aborted;
//   - pending retry timers stop re-queueing;
//   - workers leave their loop.
//
// Jobs still in the queue are not delivered.
//
// The job channel is deliberately left open. Dispatch and the retry timers
// created by processJob send on it from other goroutines without any
// synchronization against Stop, so closing it could make one of those sends
// panic with "send on closed channel". Workers already exit via the context.
// Not closing the channel also makes repeated Stop calls safe.
func (d *Dispatcher) Stop() {
	d.config.Logger.Info("webhook dispatcher stopping")
	d.cancel()
	d.wg.Wait()
	d.config.Logger.Info("webhook dispatcher stopped")
}
// Health reports whether the dispatcher is still live, meaning Stop has not
// yet cancelled its context.
func (d *Dispatcher) Health() bool {
	return d.ctx.Err() == nil
}
// QueueSize reports how many delivery jobs are buffered and waiting for a
// worker. It is a point-in-time snapshot and may already be stale on return.
func (d *Dispatcher) QueueSize() int {
	pending := len(d.jobs)
	return pending
}
// Dispatch fans event out to every enabled webhook of projectID that
// subscribes to event.Type. It only looks up the subscriptions and enqueues
// one delivery job per webhook; the HTTP requests are made later by the
// worker goroutines.
//
// Enqueueing never blocks. When the queue is full, that webhook's job is
// dropped with a warning and Dispatch still returns nil. The only error it
// returns is a failure to list the webhooks.
func (d *Dispatcher) Dispatch(ctx context.Context, projectID string, event *domain.WebhookEvent) error {
	targets, err := d.repo.ListEnabledByProjectAndEvent(ctx, projectID, event.Type)
	if err != nil {
		return fmt.Errorf("list webhooks: %w", err)
	}
	if len(targets) == 0 {
		return nil
	}

	d.config.Logger.Debug("dispatching webhook event",
		"project_id", projectID,
		"event_type", event.Type,
		"webhook_count", len(targets),
	)

	for _, wh := range targets {
		job := deliveryJob{
			webhook:    wh,
			event:      event,
			deliveryID: uuid.New().String(),
		}
		select {
		case d.jobs <- job:
		default:
			d.config.Logger.Warn("webhook job queue full, dropping event",
				"webhook_id", wh.ID,
				"event_type", event.Type,
			)
		}
	}
	return nil
}
// worker is the loop run by one pool goroutine. It takes jobs off the queue
// and processes them one at a time. It returns, marking itself done on the
// wait group, once the dispatcher context is cancelled or the job channel is
// closed.
func (d *Dispatcher) worker(id int) {
	defer d.wg.Done()
	d.config.Logger.Debug("webhook worker started", "worker_id", id)
	for {
		var job deliveryJob
		var open bool
		select {
		case <-d.ctx.Done():
			d.config.Logger.Debug("webhook worker stopping", "worker_id", id)
			return
		case job, open = <-d.jobs:
		}
		if !open {
			d.config.Logger.Debug("webhook worker job channel closed", "worker_id", id)
			return
		}
		d.processJob(job)
	}
}
// processJob handles one job in three steps:
//  1. It makes one delivery attempt for job.
//  2. It stores the outcome through the repository.
//  3. If the attempt failed and retries remain, it schedules the job to be
//     re-queued after RetryBackoff * 2^retryCount.
//
// Recording is synchronous: the worker waits for RecordDelivery, bounded by
// its own 10s timeout. That context derives from context.Background rather
// than the dispatcher context, so an attempt's outcome can still be stored
// while the dispatcher is shutting down. A failed write is logged, not
// retried.
func (d *Dispatcher) processJob(job deliveryJob) {
	attempt := d.deliver(job)

	recordCtx, cancelRecord := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancelRecord()
	if err := d.repo.RecordDelivery(recordCtx, attempt); err != nil {
		d.config.Logger.Error("failed to record webhook delivery",
			"webhook_id", job.webhook.ID,
			"delivery_id", attempt.ID,
			"error", err,
		)
	}

	if attempt.Success || job.retryCount >= d.config.MaxRetries {
		return
	}

	delay := d.config.RetryBackoff * time.Duration(1<<job.retryCount)
	next := job
	next.retryCount++
	d.config.Logger.Info("scheduling webhook retry",
		"webhook_id", job.webhook.ID,
		"delivery_id", attempt.ID,
		"retry_count", next.retryCount,
		"backoff", delay,
	)

	// The timer callback checks for shutdown before enqueueing and again
	// while enqueueing. A full queue drops the retry with a warning.
	time.AfterFunc(delay, func() {
		if d.ctx.Err() != nil {
			d.config.Logger.Debug("skipping webhook retry due to shutdown",
				"webhook_id", next.webhook.ID,
			)
			return
		}
		select {
		case d.jobs <- next:
		case <-d.ctx.Done():
			d.config.Logger.Debug("webhook retry cancelled due to shutdown",
				"webhook_id", next.webhook.ID,
			)
		default:
			d.config.Logger.Warn("failed to queue webhook retry, queue full",
				"webhook_id", next.webhook.ID,
			)
		}
	})
}
// deliver makes one HTTP POST attempt for job and returns a record describing
// it. It always returns a non-nil record and never an error. Every failure is
// captured in the record's Success and ErrorMessage fields:
//   - payload encoding;
//   - request construction;
//   - transport errors, including dispatcher shutdown;
//   - non-2xx status codes.
//
// The record ID is "<deliveryID>-<retryCount>", so each attempt of a job has
// its own ID. The JSON body is HMAC-SHA256 signed when the webhook has a
// secret.
//
// At most MaxResponseBodySize bytes of the response are stored. Before the
// body is closed, a bounded amount of the unread remainder is drained so the
// HTTP transport can reuse the keep-alive connection.
func (d *Dispatcher) deliver(job deliveryJob) *domain.WebhookDelivery {
	// maxDrainBytes caps how much unread response body is discarded before
	// closing. A larger remainder makes the transport drop the connection
	// instead of reusing it, which is acceptable.
	const maxDrainBytes = 64 << 10

	delivery := &domain.WebhookDelivery{
		ID:          domain.WebhookDeliveryID(fmt.Sprintf("%s-%d", job.deliveryID, job.retryCount)),
		WebhookID:   job.webhook.ID,
		EventType:   job.event.Type,
		DeliveredAt: time.Now(),
		RetryCount:  job.retryCount,
	}

	// Build the JSON envelope; its ID is the per-attempt delivery ID.
	payload := domain.WebhookPayload{
		ID:        string(delivery.ID),
		Event:     job.event.Type,
		Timestamp: job.event.Timestamp,
		ProjectID: job.event.ProjectID,
		Data:      job.event.Data,
	}
	payloadBytes, err := json.Marshal(payload)
	if err != nil {
		delivery.Success = false
		delivery.ErrorMessage = fmt.Sprintf("failed to marshal payload: %v", err)
		return delivery
	}
	delivery.Payload = string(payloadBytes)

	// The request is bound to the dispatcher context, so Stop aborts it.
	req, err := http.NewRequestWithContext(d.ctx, http.MethodPost, job.webhook.URL, bytes.NewReader(payloadBytes))
	if err != nil {
		delivery.Success = false
		delivery.ErrorMessage = fmt.Sprintf("failed to create request: %v", err)
		return delivery
	}

	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("User-Agent", "rdev-webhook/1.0")
	req.Header.Set("X-Webhook-Event", string(job.event.Type))
	req.Header.Set("X-Webhook-Delivery", string(delivery.ID))
	if job.webhook.HasSecret() {
		signature := d.signPayload(payloadBytes, job.webhook.Secret)
		req.Header.Set("X-Webhook-Signature", signature)
	}

	resp, err := d.client.Do(req)
	if err != nil {
		delivery.Success = false
		delivery.ErrorMessage = fmt.Sprintf("request failed: %v", err)
		d.config.Logger.Debug("webhook delivery failed",
			"webhook_id", job.webhook.ID,
			"delivery_id", delivery.ID,
			"error", err,
		)
		return delivery
	}
	defer func() {
		// Drain what the bounded read below left behind so net/http can
		// return the connection to its idle pool, then close.
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, maxDrainBytes))
		_ = resp.Body.Close()
	}()
	delivery.ResponseStatus = resp.StatusCode

	// Keep only a bounded prefix of the body for the record. A read error
	// just leaves a shorter body; the status code alone decides success.
	bodyBytes, _ := io.ReadAll(io.LimitReader(resp.Body, int64(d.config.MaxResponseBodySize)))
	delivery.ResponseBody = string(bodyBytes)

	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		delivery.Success = false
		delivery.ErrorMessage = fmt.Sprintf("received non-2xx status: %d", resp.StatusCode)
		d.config.Logger.Debug("webhook delivery failed",
			"webhook_id", job.webhook.ID,
			"delivery_id", delivery.ID,
			"status", resp.StatusCode,
		)
		return delivery
	}

	delivery.Success = true
	d.config.Logger.Debug("webhook delivered successfully",
		"webhook_id", job.webhook.ID,
		"delivery_id", delivery.ID,
		"status", resp.StatusCode,
	)
	return delivery
}
// signPayload computes the X-Webhook-Signature header value. It is the
// lowercase hex HMAC-SHA256 of payload, keyed by secret and prefixed with
// "sha256=".
func (d *Dispatcher) signPayload(payload []byte, secret string) string {
	h := hmac.New(sha256.New, []byte(secret))
	_, _ = h.Write(payload) // hash.Hash.Write never returns an error
	digest := h.Sum(nil)
	return "sha256=" + hex.EncodeToString(digest)
}
// DispatchToURL synchronously POSTs a one-off callback to url, outside the
// webhook subscription system. There is no queueing, retry, delivery record
// or signature.
//
// The body is the JSON envelope
// {"id": <new UUID>, "event": eventType, "timestamp": <now, UTC RFC 3339>, "data": payload}.
//
// An empty url is a no-op that returns nil. Otherwise an error is returned for
// any of these:
//   - payload encoding;
//   - request construction;
//   - transport failures, including dispatcher shutdown, since the request
//     uses the dispatcher context;
//   - non-2xx responses.
func (d *Dispatcher) DispatchToURL(url string, eventType string, payload map[string]any) error {
	// maxDrainBytes caps how much of the unread response body is discarded
	// before closing. Draining lets net/http reuse the keep-alive connection.
	const maxDrainBytes = 64 << 10

	if url == "" {
		return nil
	}

	fullPayload := map[string]any{
		"id":        uuid.New().String(),
		"event":     eventType,
		"timestamp": time.Now().UTC().Format(time.RFC3339),
		"data":      payload,
	}
	payloadBytes, err := json.Marshal(fullPayload)
	if err != nil {
		return fmt.Errorf("marshal payload: %w", err)
	}

	req, err := http.NewRequestWithContext(d.ctx, http.MethodPost, url, bytes.NewReader(payloadBytes))
	if err != nil {
		return fmt.Errorf("create request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("User-Agent", "rdev-webhook/1.0")
	req.Header.Set("X-Webhook-Event", eventType)

	resp, err := d.client.Do(req)
	if err != nil {
		return fmt.Errorf("request failed: %w", err)
	}
	defer func() {
		// The body is otherwise never read; drain it (bounded) so the
		// connection can go back to the transport's idle pool.
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, maxDrainBytes))
		_ = resp.Body.Close()
	}()

	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return fmt.Errorf("received non-2xx status: %d", resp.StatusCode)
	}

	d.config.Logger.Debug("callback sent successfully",
		"url", url,
		"event", eventType,
		"status", resp.StatusCode,
	)
	return nil
}