From 36d73dd23ddb0a2227344f2a179ec76719b3cfee Mon Sep 17 00:00:00 2001 From: rdev-worker Date: Thu, 5 Feb 2026 21:40:58 +0000 Subject: [PATCH] build: /implement-feature mesh-interop --requirements 'Chat Service must cal... --- pkg/queue/redis.go | 340 ++++++++++++++++++ pkg/svc/client.go | 13 + .../internal/api/handlers/validate.go | 80 +++++ .../internal/api/handlers/validate_test.go | 127 +++++++ services/auth-svc/internal/api/routes.go | 12 + services/auth-svc/internal/api/spec.go | 43 +++ services/chat-svc/internal/client/auth.go | 86 +++++ .../chat-svc/internal/client/auth_test.go | 123 +++++++ services/chat-svc/internal/client/queue.go | 68 ++++ .../chat-svc/internal/client/queue_test.go | 134 +++++++ workers/worker-svc/cmd/worker/main.go | 95 +++-- workers/worker-svc/internal/config/config.go | 11 +- workers/worker-svc/internal/handlers/tasks.go | 124 +++++++ .../internal/handlers/tasks_test.go | 213 +++++++++++ 14 files changed, 1441 insertions(+), 28 deletions(-) create mode 100644 pkg/queue/redis.go create mode 100644 services/auth-svc/internal/api/handlers/validate.go create mode 100644 services/auth-svc/internal/api/handlers/validate_test.go create mode 100644 services/chat-svc/internal/client/auth.go create mode 100644 services/chat-svc/internal/client/auth_test.go create mode 100644 services/chat-svc/internal/client/queue.go create mode 100644 services/chat-svc/internal/client/queue_test.go create mode 100644 workers/worker-svc/internal/handlers/tasks.go create mode 100644 workers/worker-svc/internal/handlers/tasks_test.go diff --git a/pkg/queue/redis.go b/pkg/queue/redis.go new file mode 100644 index 0000000..ab009a9 --- /dev/null +++ b/pkg/queue/redis.go @@ -0,0 +1,340 @@ +package queue + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/google/uuid" + "github.com/redis/go-redis/v9" + + "git.threesix.ai/jordan/sp4-verify-1770325799/pkg/logging" +) + +const ( + // RedisQueueKey is the sorted set key for the job queue + RedisQueueKey = "jobs:queue" + // RedisJobPrefix is the prefix for job data hash keys + RedisJobPrefix = "jobs:data:" + // RedisRunningKey is the set of currently running job IDs + RedisRunningKey = "jobs:running" +) + +// RedisQueue implements Producer and Consumer using Redis. +// Uses sorted sets for priority ordering and lists for atomic dequeue. +type RedisQueue struct { + client *redis.Client + logger *logging.Logger +} + +// Ensure RedisQueue implements Queue at compile time. +var _ Queue = (*RedisQueue)(nil) + +// NewRedisQueue creates a queue backed by Redis. +func NewRedisQueue(client *redis.Client, logger *logging.Logger) *RedisQueue { + return &RedisQueue{ + client: client, + logger: logger.WithComponent("redis-queue"), + } +} + +// Enqueue adds a job to the queue with default options. +func (q *RedisQueue) Enqueue(ctx context.Context, jobType string, payload map[string]any) (string, error) { + return q.EnqueueWithOptions(ctx, Job{ + Type: jobType, + Payload: payload, + Priority: 0, + MaxRetries: 3, + }) +} + +// EnqueueWithOptions adds a job with custom configuration. +func (q *RedisQueue) EnqueueWithOptions(ctx context.Context, job Job) (string, error) { + if job.Type == "" { + return "", fmt.Errorf("job type is required") + } + + job.ID = uuid.New().String() + job.Status = StatusPending + job.CreatedAt = time.Now().UTC() + + if job.MaxRetries == 0 { + job.MaxRetries = 3 + } + if job.MaxRetries > 100 { + job.MaxRetries = 100 + } + if job.Payload == nil { + job.Payload = make(map[string]any) + } + + // Serialize job to JSON + data, err := json.Marshal(job) + if err != nil { + return "", fmt.Errorf("marshal job: %w", err) + } + + // Use a pipeline for atomic operations + pipe := q.client.Pipeline() + + // Store job data + jobKey := RedisJobPrefix + job.ID + pipe.Set(ctx, jobKey, data, 0) + + // Add to sorted set with score = -priority (higher priority = lower score = first out) + // Secondary sort by timestamp for FIFO within same priority + score := float64(-job.Priority) + float64(job.CreatedAt.UnixNano())/1e18 + pipe.ZAdd(ctx, RedisQueueKey, redis.Z{ + Score: score, + Member: job.ID, + }) + + _, err = pipe.Exec(ctx) + if err != nil { + return "", fmt.Errorf("enqueue job: %w", err) + } + + q.logger.Debug("job enqueued", "job_id", job.ID, "type", job.Type, "priority", job.Priority) + return job.ID, nil +} + +// Dequeue atomically claims the next pending job. +func (q *RedisQueue) Dequeue(ctx context.Context, workerID string) (*Job, error) { + // Pop the highest priority job (lowest score) atomically + result, err := q.client.ZPopMin(ctx, RedisQueueKey, 1).Result() + if err != nil { + return nil, fmt.Errorf("dequeue job: %w", err) + } + + if len(result) == 0 { + return nil, ErrNoJob + } + + jobID := result[0].Member.(string) + jobKey := RedisJobPrefix + jobID + + // Get job data + data, err := q.client.Get(ctx, jobKey).Bytes() + if err != nil { + if errors.Is(err, redis.Nil) { + return nil, ErrJobNotFound + } + return nil, fmt.Errorf("get job data: %w", err) + } + + var job Job + if err := json.Unmarshal(data, &job); err != nil { + return nil, fmt.Errorf("unmarshal job: %w", err) + } + + // Update job status + now := time.Now().UTC() + job.Status = StatusRunning + job.StartedAt = &now + job.WorkerID = workerID + + // Save updated job and add to running set + updatedData, err := json.Marshal(job) + if err != nil { + return nil, fmt.Errorf("marshal updated job: %w", err) + } + + pipe := q.client.Pipeline() + pipe.Set(ctx, jobKey, updatedData, 0) + pipe.SAdd(ctx, RedisRunningKey, jobID) + _, err = pipe.Exec(ctx) + if err != nil { + return nil, fmt.Errorf("update job status: %w", err) + } + + q.logger.Debug("job dequeued", "job_id", job.ID, "type", job.Type, "worker_id", workerID) + return &job, nil +} + +// Ack marks a job as successfully completed. +func (q *RedisQueue) Ack(ctx context.Context, jobID string) error { + jobKey := RedisJobPrefix + jobID + + data, err := q.client.Get(ctx, jobKey).Bytes() + if err != nil { + if errors.Is(err, redis.Nil) { + return ErrJobNotFound + } + return fmt.Errorf("get job: %w", err) + } + + var job Job + if err := json.Unmarshal(data, &job); err != nil { + return fmt.Errorf("unmarshal job: %w", err) + } + + now := time.Now().UTC() + job.Status = StatusCompleted + job.CompletedAt = &now + + updatedData, err := json.Marshal(job) + if err != nil { + return fmt.Errorf("marshal job: %w", err) + } + + pipe := q.client.Pipeline() + pipe.Set(ctx, jobKey, updatedData, 24*time.Hour) // Keep completed jobs for 24h + pipe.SRem(ctx, RedisRunningKey, jobID) + _, err = pipe.Exec(ctx) + if err != nil { + return fmt.Errorf("ack job: %w", err) + } + + q.logger.Debug("job completed", "job_id", jobID) + return nil +} + +// Fail marks a job as failed, requeuing if retries remain. +func (q *RedisQueue) Fail(ctx context.Context, jobID string, errMsg string) error { + jobKey := RedisJobPrefix + jobID + + data, err := q.client.Get(ctx, jobKey).Bytes() + if err != nil { + if errors.Is(err, redis.Nil) { + return ErrJobNotFound + } + return fmt.Errorf("get job: %w", err) + } + + var job Job + if err := json.Unmarshal(data, &job); err != nil { + return fmt.Errorf("unmarshal job: %w", err) + } + + job.RetryCount++ + job.Error = errMsg + + pipe := q.client.Pipeline() + pipe.SRem(ctx, RedisRunningKey, jobID) + + if job.RetryCount >= job.MaxRetries { + // Exhausted retries - mark as failed + now := time.Now().UTC() + job.Status = StatusFailed + job.CompletedAt = &now + + updatedData, err := json.Marshal(job) + if err != nil { + return fmt.Errorf("marshal job: %w", err) + } + pipe.Set(ctx, jobKey, updatedData, 24*time.Hour) // Keep failed jobs for 24h + } else { + // Requeue for retry + job.Status = StatusPending + job.StartedAt = nil + job.WorkerID = "" + + updatedData, err := json.Marshal(job) + if err != nil { + return fmt.Errorf("marshal job: %w", err) + } + pipe.Set(ctx, jobKey, updatedData, 0) + + // Re-add to queue with original priority + score := float64(-job.Priority) + float64(job.CreatedAt.UnixNano())/1e18 + pipe.ZAdd(ctx, RedisQueueKey, redis.Z{ + Score: score, + Member: jobID, + }) + } + + _, err = pipe.Exec(ctx) + if err != nil { + return fmt.Errorf("fail job: %w", err) + } + + logErrMsg := errMsg + if len(logErrMsg) > 500 { + logErrMsg = logErrMsg[:497] + "..." + } + q.logger.Debug("job failed", "job_id", jobID, "retry_count", job.RetryCount, "max_retries", job.MaxRetries, "error", logErrMsg) + return nil +} + +// Heartbeat extends the job's visibility timeout (no-op for Redis implementation). +func (q *RedisQueue) Heartbeat(ctx context.Context, jobID string) error { + // For Redis, we track running jobs in a set but don't have visibility timeout. + // This could be extended to use Redis EXPIRE on job keys if needed. + return nil +} + +// GetJob retrieves a job by ID (for inspection/debugging). +func (q *RedisQueue) GetJob(ctx context.Context, jobID string) (*Job, error) { + jobKey := RedisJobPrefix + jobID + + data, err := q.client.Get(ctx, jobKey).Bytes() + if err != nil { + if errors.Is(err, redis.Nil) { + return nil, ErrJobNotFound + } + return nil, fmt.Errorf("get job: %w", err) + } + + var job Job + if err := json.Unmarshal(data, &job); err != nil { + return nil, fmt.Errorf("unmarshal job: %w", err) + } + + return &job, nil +} + +// QueueLength returns the number of pending jobs. +func (q *RedisQueue) QueueLength(ctx context.Context) (int64, error) { + return q.client.ZCard(ctx, RedisQueueKey).Result() +} + +// RequeueStale requeues jobs that have been running too long. +func (q *RedisQueue) RequeueStale(ctx context.Context, timeout time.Duration) (int64, error) { + // Get all running job IDs + runningIDs, err := q.client.SMembers(ctx, RedisRunningKey).Result() + if err != nil { + return 0, fmt.Errorf("get running jobs: %w", err) + } + + cutoff := time.Now().UTC().Add(-timeout) + var requeued int64 + + for _, jobID := range runningIDs { + job, err := q.GetJob(ctx, jobID) + if err != nil { + continue // Job may have been deleted + } + + if job.StartedAt != nil && job.StartedAt.Before(cutoff) { + // Requeue stale job + job.Status = StatusPending + job.StartedAt = nil + job.WorkerID = "" + + data, err := json.Marshal(job) + if err != nil { + continue + } + + pipe := q.client.Pipeline() + pipe.Set(ctx, RedisJobPrefix+jobID, data, 0) + pipe.SRem(ctx, RedisRunningKey, jobID) + score := float64(-job.Priority) + float64(job.CreatedAt.UnixNano())/1e18 + pipe.ZAdd(ctx, RedisQueueKey, redis.Z{ + Score: score, + Member: jobID, + }) + _, err = pipe.Exec(ctx) + if err == nil { + requeued++ + } + } + } + + if requeued > 0 { + q.logger.Info("requeued stale jobs", "count", requeued, "timeout", timeout) + } + return requeued, nil +} diff --git a/pkg/svc/client.go b/pkg/svc/client.go index 0e81b47..fa683d2 100644 --- a/pkg/svc/client.go +++ b/pkg/svc/client.go @@ -144,6 +144,19 @@ func (c *Client) DoJSON(ctx context.Context, method, path string, body any) (*ht return c.httpClient.Do(req) } +// DoRequest performs an HTTP request with custom headers. +// This is useful when you need to forward headers like Authorization. +func (c *Client) DoRequest(req *http.Request) (*http.Response, error) { + return c.httpClient.Do(req) +} + +// NewRequest creates a new HTTP request for this service. +// The path is appended to the service's base URL. +func (c *Client) NewRequest(ctx context.Context, method, path string, body io.Reader) (*http.Request, error) { + url := c.baseURL + path + return http.NewRequestWithContext(ctx, method, url, body) +} + // ServiceName returns the name of the service this client connects to. func (c *Client) ServiceName() string { return c.serviceName diff --git a/services/auth-svc/internal/api/handlers/validate.go b/services/auth-svc/internal/api/handlers/validate.go new file mode 100644 index 0000000..b0c8dba --- /dev/null +++ b/services/auth-svc/internal/api/handlers/validate.go @@ -0,0 +1,80 @@ +package handlers + +import ( + "net/http" + "strings" + + "git.threesix.ai/jordan/sp4-verify-1770325799/pkg/auth" + "git.threesix.ai/jordan/sp4-verify-1770325799/pkg/httperror" + "git.threesix.ai/jordan/sp4-verify-1770325799/pkg/httpresponse" + "git.threesix.ai/jordan/sp4-verify-1770325799/pkg/logging" +) + +// Validate handles token validation requests from sibling services. +type Validate struct { + validator *auth.JWTValidator + logger *logging.Logger +} + +// NewValidate creates a new Validate handler. +func NewValidate(validator *auth.JWTValidator, logger *logging.Logger) *Validate { + return &Validate{ + validator: validator, + logger: logger.WithComponent("ValidateHandler"), + } +} + +// ValidateRequest is the request body for token validation. +type ValidateRequest struct { + Token string `json:"token" validate:"required"` +} + +// ValidateResponse is the response for token validation. +type ValidateResponse struct { + Valid bool `json:"valid"` + User *auth.User `json:"user,omitempty"` + Error string `json:"error,omitempty"` +} + +// Check validates a JWT token and returns the user information. +func (h *Validate) Check(w http.ResponseWriter, r *http.Request) error { + // Extract token from Authorization header or request body + token := extractToken(r) + if token == "" { + return httperror.BadRequest("token is required") + } + + // Validate the token + user, err := h.validator.Validate(r.Context(), token) + if err != nil { + h.logger.Debug("token validation failed", "error", err) + httpresponse.OK(w, r, ValidateResponse{ + Valid: false, + Error: err.Error(), + }) + return nil + } + + httpresponse.OK(w, r, ValidateResponse{ + Valid: true, + User: user, + }) + return nil +} + +// extractToken extracts the JWT token from the request. +// Checks Authorization header first, then falls back to query parameter. +func extractToken(r *http.Request) string { + // Check Authorization header + authHeader := r.Header.Get("Authorization") + if authHeader != "" { + // Handle "Bearer " format + if strings.HasPrefix(authHeader, "Bearer ") { + return strings.TrimPrefix(authHeader, "Bearer ") + } + return authHeader + } + + // Check query parameter + return r.URL.Query().Get("token") +} diff --git a/services/auth-svc/internal/api/handlers/validate_test.go b/services/auth-svc/internal/api/handlers/validate_test.go new file mode 100644 index 0000000..49de48a --- /dev/null +++ b/services/auth-svc/internal/api/handlers/validate_test.go @@ -0,0 +1,127 @@ +package handlers + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "git.threesix.ai/jordan/sp4-verify-1770325799/pkg/auth" + "git.threesix.ai/jordan/sp4-verify-1770325799/pkg/logging" +) + +func TestValidate_Check(t *testing.T) { + secret := []byte("test-secret") + logger := logging.New(logging.Config{Level: logging.LevelDebug}) + + validator := auth.NewJWTValidator(auth.JWTConfig{ + Secret: secret, + Issuer: "sp4-verify-1770325799", + }) + handler := NewValidate(validator, logger) + + // Generate a valid token + user := &auth.User{ + ID: "user-123", + Email: "test@example.com", + Roles: []string{"admin"}, + Scopes: []string{"read", "write"}, + } + validToken, err := auth.GenerateTokenWithIssuer(secret, user, time.Hour, "sp4-verify-1770325799", "") + if err != nil { + t.Fatalf("failed to generate token: %v", err) + } + + tests := []struct { + name string + authHeader string + queryToken string + wantValid bool + wantUserID string + wantStatusCode int + }{ + { + name: "valid token in Authorization header", + authHeader: "Bearer " + validToken, + wantValid: true, + wantUserID: "user-123", + wantStatusCode: http.StatusOK, + }, + { + name: "valid token without Bearer prefix", + authHeader: validToken, + wantValid: true, + wantUserID: "user-123", + wantStatusCode: http.StatusOK, + }, + { + name: "valid token in query parameter", + queryToken: validToken, + wantValid: true, + wantUserID: "user-123", + wantStatusCode: http.StatusOK, + }, + { + name: "invalid token", + authHeader: "Bearer invalid-token", + wantValid: false, + wantStatusCode: http.StatusOK, + }, + { + name: "missing token", + wantValid: false, + wantStatusCode: http.StatusBadRequest, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + url := "/api/auth-svc/validate" + if tt.queryToken != "" { + url += "?token=" + tt.queryToken + } + + req := httptest.NewRequest(http.MethodGet, url, nil) + if tt.authHeader != "" { + req.Header.Set("Authorization", tt.authHeader) + } + + rr := httptest.NewRecorder() + err := handler.Check(rr, req) + + // Check if error was returned (for bad request cases) + if tt.wantStatusCode == http.StatusBadRequest { + if err == nil { + t.Error("expected error for missing token") + } + return + } + + if err != nil { + t.Fatalf("handler returned error: %v", err) + } + + if rr.Code != tt.wantStatusCode { + t.Errorf("status code = %d, want %d", rr.Code, tt.wantStatusCode) + } + + var resp struct { + Data ValidateResponse `json:"data"` + } + if err := json.NewDecoder(rr.Body).Decode(&resp); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + + if resp.Data.Valid != tt.wantValid { + t.Errorf("valid = %v, want %v", resp.Data.Valid, tt.wantValid) + } + + if tt.wantValid && resp.Data.User != nil { + if resp.Data.User.ID != tt.wantUserID { + t.Errorf("user ID = %s, want %s", resp.Data.User.ID, tt.wantUserID) + } + } + }) + } +} diff --git a/services/auth-svc/internal/api/routes.go b/services/auth-svc/internal/api/routes.go index 2d6ea2c..d962700 100644 --- a/services/auth-svc/internal/api/routes.go +++ b/services/auth-svc/internal/api/routes.go @@ -14,13 +14,21 @@ import ( // This allows the monorepo to expose multiple services under a single domain: // - https://domain/api/auth-svc/health // - https://domain/api/auth-svc/examples +// - https://domain/api/auth-svc/validate func RegisterRoutes(application *app.App, exampleService *service.ExampleService) { logger := application.Logger() cfg := config.Load() + // Initialize JWT validator for token validation endpoint + jwtValidator := auth.NewJWTValidator(auth.JWTConfig{ + Secret: []byte(cfg.JWTSecret), + Issuer: "sp4-verify-1770325799", + }) + // Initialize handlers with injected services healthHandler := handlers.NewHealth(logger) exampleHandler := handlers.NewExample(exampleService, logger) + validateHandler := handlers.NewValidate(jwtValidator, logger) // Build and mount OpenAPI spec spec := NewServiceSpec() @@ -31,6 +39,10 @@ func RegisterRoutes(application *app.App, exampleService *service.ExampleService application.Route("/api/auth-svc", func(r app.Router) { r.Get("/health", healthHandler.Check) + // Token validation endpoint (for sibling services) + r.Post("/validate", app.Wrap(validateHandler.Check)) + r.Get("/validate", app.Wrap(validateHandler.Check)) + // Public routes (no auth required) r.Get("/examples", app.Wrap(exampleHandler.List)) r.Get("/examples/{id}", app.Wrap(exampleHandler.Get)) diff --git a/services/auth-svc/internal/api/spec.go b/services/auth-svc/internal/api/spec.go index 9f69bc4..b7019ff 100644 --- a/services/auth-svc/internal/api/spec.go +++ b/services/auth-svc/internal/api/spec.go @@ -8,6 +8,7 @@ func NewServiceSpec() *openapi.OpenAPISpec { WithDescription("REST API for the auth-svc service"). WithBearerSecurity("bearer", "JWT authentication token"). WithTag("Health", "Service health endpoints"). + WithTag("Auth", "Authentication and token validation"). WithTag("Examples", "Example CRUD endpoints") // Define reusable schemas @@ -29,6 +30,18 @@ func NewServiceSpec() *openapi.OpenAPISpec { "description": openapi.StringWithMinMax(0, 500).WithDescription("Updated description"), })) + // Validate response schema + spec.WithSchema("ValidateResponse", openapi.Object(map[string]openapi.Schema{ + "valid": openapi.Boolean().WithDescription("Whether the token is valid"), + "user": openapi.Object(map[string]openapi.Schema{ + "id": openapi.String().WithDescription("User ID"), + "email": openapi.String().WithDescription("User email"), + "roles": openapi.Array(openapi.String()).WithDescription("User roles"), + "scopes": openapi.Array(openapi.String()).WithDescription("User scopes"), + }).WithDescription("User information (only present if valid)"), + "error": openapi.String().WithDescription("Error message (only present if invalid)"), + }, "valid")) + // Health spec.AddPath("/api/auth-svc/health", "get", map[string]any{ "summary": "Health check", @@ -41,6 +54,36 @@ func NewServiceSpec() *openapi.OpenAPISpec { }, }) + // Validate token + spec.AddPath("/api/auth-svc/validate", "post", map[string]any{ + "summary": "Validate token", + "description": "Validates a JWT token and returns user information. Used by sibling services for authentication.", + "tags": []string{"Auth"}, + "security": []map[string][]string{{"bearer": {}}}, + "responses": map[string]any{ + "200": openapi.OpResponse("Validation result", openapi.ResponseSchema(openapi.Ref("ValidateResponse"))), + "400": openapi.OpResponse("Bad request", openapi.ErrorResponseSchema()), + }, + }) + + spec.AddPath("/api/auth-svc/validate", "get", map[string]any{ + "summary": "Validate token (GET)", + "description": "Validates a JWT token. Accepts token via Authorization header or query parameter.", + "tags": []string{"Auth"}, + "parameters": []any{ + map[string]any{ + "name": "token", + "in": "query", + "description": "JWT token to validate (alternative to Authorization header)", + "schema": openapi.String(), + }, + }, + "responses": map[string]any{ + "200": openapi.OpResponse("Validation result", openapi.ResponseSchema(openapi.Ref("ValidateResponse"))), + "400": openapi.OpResponse("Bad request", openapi.ErrorResponseSchema()), + }, + }) + // List examples spec.AddPath("/api/auth-svc/examples", "get", map[string]any{ "summary": "List examples", diff --git a/services/chat-svc/internal/client/auth.go b/services/chat-svc/internal/client/auth.go new file mode 100644 index 0000000..3c0b142 --- /dev/null +++ b/services/chat-svc/internal/client/auth.go @@ -0,0 +1,86 @@ +// Package client provides clients for communicating with sibling services. +package client + +import ( + "context" + "fmt" + "net/http" + + "git.threesix.ai/jordan/sp4-verify-1770325799/pkg/auth" + "git.threesix.ai/jordan/sp4-verify-1770325799/pkg/svc" +) + +// AuthClient communicates with the auth-svc for token validation. +type AuthClient struct { + client *svc.Client +} + +// ValidateResponse is the response from the auth-svc /validate endpoint. +type ValidateResponse struct { + Data struct { + Valid bool `json:"valid"` + User *auth.User `json:"user,omitempty"` + Error string `json:"error,omitempty"` + } `json:"data"` +} + +// NewAuthClient creates a new client for the auth-svc. +// Returns an error if AUTH_SVC_URL is not configured. +func NewAuthClient() (*AuthClient, error) { + client, err := svc.NewClient("auth-svc") + if err != nil { + return nil, err + } + return &AuthClient{client: client}, nil +} + +// ValidateToken validates a JWT token by calling auth-svc. +// Returns the user if valid, or an error if invalid or communication fails. +func (c *AuthClient) ValidateToken(ctx context.Context, token string) (*auth.User, error) { + req, err := c.client.NewRequest(ctx, http.MethodGet, "/api/auth-svc/validate", nil) + if err != nil { + return nil, fmt.Errorf("create request: %w", err) + } + req.Header.Set("Authorization", "Bearer "+token) + + resp, err := c.client.DoRequest(req) + if err != nil { + return nil, fmt.Errorf("call auth-svc: %w", err) + } + + result, err := svc.DecodeResponse[ValidateResponse](resp) + if err != nil { + return nil, fmt.Errorf("decode response: %w", err) + } + + if !result.Data.Valid { + return nil, fmt.Errorf("token validation failed: %s", result.Data.Error) + } + + return result.Data.User, nil +} + +// ValidateTokenWithHeader validates a token by forwarding the Authorization header. +func (c *AuthClient) ValidateTokenWithHeader(ctx context.Context, authHeader string) (*auth.User, error) { + req, err := c.client.NewRequest(ctx, http.MethodGet, "/api/auth-svc/validate", nil) + if err != nil { + return nil, fmt.Errorf("create request: %w", err) + } + req.Header.Set("Authorization", authHeader) + + resp, err := c.client.DoRequest(req) + if err != nil { + return nil, fmt.Errorf("call auth-svc: %w", err) + } + + result, err := svc.DecodeResponse[ValidateResponse](resp) + if err != nil { + return nil, fmt.Errorf("decode response: %w", err) + } + + if !result.Data.Valid { + return nil, fmt.Errorf("token validation failed: %s", result.Data.Error) + } + + return result.Data.User, nil +} diff --git a/services/chat-svc/internal/client/auth_test.go b/services/chat-svc/internal/client/auth_test.go new file mode 100644 index 0000000..51218cf --- /dev/null +++ b/services/chat-svc/internal/client/auth_test.go @@ -0,0 +1,123 @@ +package client + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "os" + "testing" + + "git.threesix.ai/jordan/sp4-verify-1770325799/pkg/auth" +) + +func TestAuthClient_ValidateToken(t *testing.T) { + tests := []struct { + name string + response ValidateResponse + statusCode int + wantErr bool + wantUserID string + }{ + { + name: "valid token", + response: ValidateResponse{ + Data: struct { + Valid bool `json:"valid"` + User *auth.User `json:"user,omitempty"` + Error string `json:"error,omitempty"` + }{ + Valid: true, + User: &auth.User{ + ID: "user-123", + Email: "test@example.com", + }, + }, + }, + statusCode: http.StatusOK, + wantErr: false, + wantUserID: "user-123", + }, + { + name: "invalid token", + response: ValidateResponse{ + Data: struct { + Valid bool `json:"valid"` + User *auth.User `json:"user,omitempty"` + Error string `json:"error,omitempty"` + }{ + Valid: false, + Error: "token expired", + }, + }, + statusCode: http.StatusOK, + wantErr: true, + }, + { + name: "server error", + statusCode: http.StatusInternalServerError, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create test server + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Verify path + if r.URL.Path != "/api/auth-svc/validate" { + t.Errorf("unexpected path: %s", r.URL.Path) + } + + // Verify auth header is present + if r.Header.Get("Authorization") == "" { + t.Error("missing Authorization header") + } + + w.WriteHeader(tt.statusCode) + if tt.statusCode == http.StatusOK { + _ = json.NewEncoder(w).Encode(tt.response) + } + })) + defer server.Close() + + // Set the env var for service discovery + os.Setenv("AUTH_SVC_URL", server.URL) + defer os.Unsetenv("AUTH_SVC_URL") + + // Create client + client, err := NewAuthClient() + if err != nil { + t.Fatalf("failed to create client: %v", err) + } + + // Call validate + user, err := client.ValidateToken(context.Background(), "test-token") + + if tt.wantErr { + if err == nil { + t.Error("expected error, got nil") + } + return + } + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if user.ID != tt.wantUserID { + t.Errorf("user ID = %s, want %s", user.ID, tt.wantUserID) + } + }) + } +} + +func TestNewAuthClient_MissingURL(t *testing.T) { + // Ensure env var is not set + os.Unsetenv("AUTH_SVC_URL") + + _, err := NewAuthClient() + if err == nil { + t.Error("expected error when AUTH_SVC_URL is not set") + } +} diff --git a/services/chat-svc/internal/client/queue.go b/services/chat-svc/internal/client/queue.go new file mode 100644 index 0000000..073e11e --- /dev/null +++ b/services/chat-svc/internal/client/queue.go @@ -0,0 +1,68 @@ +package client + +import ( + "context" + "fmt" + "os" + + "github.com/redis/go-redis/v9" + + "git.threesix.ai/jordan/sp4-verify-1770325799/pkg/logging" + "git.threesix.ai/jordan/sp4-verify-1770325799/pkg/queue" +) + +// QueueClient provides access to the Redis job queue for pushing tasks. +type QueueClient struct { + producer *queue.RedisQueue + redis *redis.Client +} + +// NewQueueClient creates a new Redis queue client. +// Uses REDIS_URL environment variable for connection. +func NewQueueClient(logger *logging.Logger) (*QueueClient, error) { + redisURL := os.Getenv("REDIS_URL") + if redisURL == "" { + return nil, fmt.Errorf("REDIS_URL environment variable not set") + } + + opts, err := redis.ParseURL(redisURL) + if err != nil { + return nil, fmt.Errorf("invalid REDIS_URL: %w", err) + } + + client := redis.NewClient(opts) + + // Test connection + if err := client.Ping(context.Background()).Err(); err != nil { + return nil, fmt.Errorf("redis connection failed: %w", err) + } + + return &QueueClient{ + producer: queue.NewRedisQueue(client, logger), + redis: client, + }, nil +} + +// PushTask enqueues a task for the worker to process. +func (c *QueueClient) PushTask(ctx context.Context, taskType string, payload map[string]any) (string, error) { + return c.producer.Enqueue(ctx, taskType, payload) +} + +// PushTaskWithPriority enqueues a task with a specific priority (higher = more urgent). +func (c *QueueClient) PushTaskWithPriority(ctx context.Context, taskType string, payload map[string]any, priority int) (string, error) { + return c.producer.EnqueueWithOptions(ctx, queue.Job{ + Type: taskType, + Payload: payload, + Priority: priority, + }) +} + +// Close closes the Redis connection. +func (c *QueueClient) Close() error { + return c.redis.Close() +} + +// HealthCheck verifies the Redis connection. +func (c *QueueClient) HealthCheck(ctx context.Context) error { + return c.redis.Ping(ctx).Err() +} diff --git a/services/chat-svc/internal/client/queue_test.go b/services/chat-svc/internal/client/queue_test.go new file mode 100644 index 0000000..5f11339 --- /dev/null +++ b/services/chat-svc/internal/client/queue_test.go @@ -0,0 +1,134 @@ +package client + +import ( + "context" + "os" + "testing" + + "github.com/redis/go-redis/v9" + + "git.threesix.ai/jordan/sp4-verify-1770325799/pkg/logging" + "git.threesix.ai/jordan/sp4-verify-1770325799/pkg/queue" +) + +func TestQueueClient_PushTask(t *testing.T) { + // Skip if REDIS_URL not set (integration test) + redisURL := os.Getenv("REDIS_URL") + if redisURL == "" { + t.Skip("REDIS_URL not set, skipping integration test") + } + + logger := logging.New(logging.Config{Level: logging.LevelDebug}) + + client, err := NewQueueClient(logger) + if err != nil { + t.Fatalf("failed to create queue client: %v", err) + } + defer client.Close() + + // Push a task + jobID, err := client.PushTask(context.Background(), "test_task", map[string]any{ + "message": "hello", + "count": 42, + }) + if err != nil { + t.Fatalf("failed to push task: %v", err) + } + + if jobID == "" { + t.Error("expected non-empty job ID") + } + + t.Logf("pushed task with ID: %s", jobID) +} + +func TestQueueClient_PushTaskWithPriority(t *testing.T) { + redisURL := os.Getenv("REDIS_URL") + if redisURL == "" { + t.Skip("REDIS_URL not set, skipping integration test") + } + + logger := logging.New(logging.Config{Level: logging.LevelDebug}) + + client, err := NewQueueClient(logger) + if err != nil { + t.Fatalf("failed to create queue client: %v", err) + } + defer client.Close() + + // Push tasks with different priorities + _, err = client.PushTaskWithPriority(context.Background(), "low_priority", map[string]any{"level": "low"}, 0) + if err != nil { + t.Fatalf("failed to push low priority task: %v", err) + } + + _, err = client.PushTaskWithPriority(context.Background(), "high_priority", map[string]any{"level": "high"}, 10) + if err != nil { + t.Fatalf("failed to push high priority task: %v", err) + } +} + +func TestNewQueueClient_MissingURL(t *testing.T) { + os.Unsetenv("REDIS_URL") + + logger := logging.New(logging.Config{Level: logging.LevelDebug}) + _, err := NewQueueClient(logger) + if err == nil { + t.Error("expected error when REDIS_URL is not set") + } +} + +func TestRedisQueue_Integration(t *testing.T) { + redisURL := os.Getenv("REDIS_URL") + if redisURL == "" { + t.Skip("REDIS_URL not set, skipping integration test") + } + + opts, err := redis.ParseURL(redisURL) + if err != nil { + t.Fatalf("failed to parse REDIS_URL: %v", err) + } + + client := redis.NewClient(opts) + defer client.Close() + + logger := logging.New(logging.Config{Level: logging.LevelDebug}) + q := queue.NewRedisQueue(client, logger) + + ctx := context.Background() + + // Test enqueue + jobID, err := q.Enqueue(ctx, "test_job", map[string]any{"key": "value"}) + if err != nil { + t.Fatalf("failed to enqueue: %v", err) + } + + // Test dequeue + job, err := q.Dequeue(ctx, "test-worker") + if err != nil { + t.Fatalf("failed to dequeue: %v", err) + } + + if job.ID != jobID { + t.Errorf("job ID = %s, want %s", job.ID, jobID) + } + + if job.Type != "test_job" { + t.Errorf("job type = %s, want test_job", job.Type) + } + + // Test ack + if err := q.Ack(ctx, jobID); err != nil { + t.Fatalf("failed to ack: %v", err) + } + + // Verify job is completed + completedJob, err := q.GetJob(ctx, jobID) + if err != nil { + t.Fatalf("failed to get job: %v", err) + } + + if completedJob.Status != queue.StatusCompleted { + t.Errorf("job status = %s, want %s", completedJob.Status, queue.StatusCompleted) + } +} diff --git a/workers/worker-svc/cmd/worker/main.go b/workers/worker-svc/cmd/worker/main.go index 7c1ee13..bf2ae8c 100644 --- a/workers/worker-svc/cmd/worker/main.go +++ b/workers/worker-svc/cmd/worker/main.go @@ -9,6 +9,8 @@ import ( "syscall" "time" + "github.com/redis/go-redis/v9" + "git.threesix.ai/jordan/sp4-verify-1770325799/pkg/database" "git.threesix.ai/jordan/sp4-verify-1770325799/pkg/logging" "git.threesix.ai/jordan/sp4-verify-1770325799/pkg/queue" @@ -47,37 +49,64 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Connect to database - pool, err := database.Connect(ctx, cfg.Database.URL, database.Options{ - MaxOpenConns: cfg.Database.MaxOpenConns, - MaxIdleConns: cfg.Database.MaxIdleConns, - ConnMaxLifetime: cfg.Database.ConnMaxLifetime, - }) - if err != nil { - logger.Error("failed to connect to database", "error", err) - os.Exit(1) + // Determine queue backend based on configuration + var jobQueue queue.Queue + var redisClient *redis.Client + + if cfg.Redis.URL != "" { + // Use Redis queue + opts, err := redis.ParseURL(cfg.Redis.URL) + if err != nil { + logger.Error("failed to parse REDIS_URL", "error", err) + os.Exit(1) + } + + redisClient = redis.NewClient(opts) + if err := redisClient.Ping(ctx).Err(); err != nil { + logger.Error("failed to connect to Redis", "error", err) + os.Exit(1) + } + defer redisClient.Close() + + jobQueue = queue.NewRedisQueue(redisClient, logger) + logger.Info("using Redis queue", "url", cfg.Redis.URL) + } else { + // Fall back to PostgreSQL queue + pool, err := database.Connect(ctx, cfg.Database.URL, database.Options{ + MaxOpenConns: cfg.Database.MaxOpenConns, + MaxIdleConns: cfg.Database.MaxIdleConns, + ConnMaxLifetime: cfg.Database.ConnMaxLifetime, + }) + if err != nil { + logger.Error("failed to connect to database", "error", err) + os.Exit(1) + } + defer pool.Close() + logger.Info("connected to database", "url", pool.URL) + + // Run migrations + database.MustRunMigrations(ctx, pool, migrationsFS, "migrations") + logger.Info("migrations complete") + + jobQueue = queue.NewPostgresQueue(pool.DB, logger) + logger.Info("using PostgreSQL queue") } - defer pool.Close() - logger.Info("connected to database", "url", pool.URL) - - // Run migrations - database.MustRunMigrations(ctx, pool, migrationsFS, "migrations") - logger.Info("migrations complete") - - // Initialize queue - jobQueue := queue.NewPostgresQueue(pool.DB, logger) // Initialize and start handler handler := handlers.New(logger, jobQueue, handlers.Config{ - PollInterval: cfg.Worker.PollInterval, - StaleJobTimeout: cfg.Worker.StaleJobTimeout, - JobTimeout: cfg.Worker.JobTimeout, + PollInterval: cfg.Worker.PollInterval, + StaleJobTimeout: cfg.Worker.StaleJobTimeout, + JobTimeout: cfg.Worker.JobTimeout, }) - // Register job handlers - // TODO: Register your job handlers here - // handler.RegisterHandler("send_email", emailHandler) - // handler.RegisterHandler("process_image", imageHandler) + // Initialize task handlers + taskHandlers := handlers.NewTaskHandlers(logger) + + // Register job handlers for tasks pushed by chat-svc and other services + handler.RegisterHandler("process_chat_message", taskHandlers.ProcessChatMessage) + handler.RegisterHandler("send_notification", taskHandlers.SendNotification) + handler.RegisterHandler("sync_data", taskHandlers.SyncData) + handler.RegisterHandler("process_webhook", taskHandlers.ProcessWebhook) // Setup signal handling sigCh := make(chan os.Signal, 1) @@ -105,18 +134,30 @@ func main() { logger.Info("worker-svc worker stopped") } +// StaleJobRequeuer is an interface for queues that support stale job recovery. +type StaleJobRequeuer interface { + RequeueStale(ctx context.Context, timeout time.Duration) (int64, error) +} + // runStaleJobRecovery periodically requeues jobs that have been running too long. -func runStaleJobRecovery(ctx context.Context, q *queue.PostgresQueue, timeout time.Duration, logger *logging.Logger) { +func runStaleJobRecovery(ctx context.Context, q queue.Queue, timeout time.Duration, logger *logging.Logger) { const staleCheckInterval = time.Minute ticker := time.NewTicker(staleCheckInterval) defer ticker.Stop() + // Check if queue supports stale job recovery + requeuer, ok := q.(StaleJobRequeuer) + if !ok { + logger.Warn("queue does not support stale job recovery") + return + } + for { select { case <-ctx.Done(): return case <-ticker.C: - count, err := q.RequeueStale(ctx, timeout) + count, err := requeuer.RequeueStale(ctx, timeout) if err != nil { logger.Error("failed to requeue stale jobs", "error", err) } else if count > 0 { diff --git a/workers/worker-svc/internal/config/config.go b/workers/worker-svc/internal/config/config.go index e5108cd..53561f0 100644 --- a/workers/worker-svc/internal/config/config.go +++ b/workers/worker-svc/internal/config/config.go @@ -13,10 +13,16 @@ import ( type Config struct { config.AppConfig Database config.DatabaseConfig + Redis RedisConfig Logging config.LoggingConfig Worker WorkerConfig } +// RedisConfig holds Redis connection settings. +type RedisConfig struct { + URL string +} + // WorkerConfig holds worker-specific settings. type WorkerConfig struct { // PollInterval is how often to check for new jobs when queue is empty. @@ -54,7 +60,10 @@ func Load() (*Config, error) { return &Config{ AppConfig: config.ReadAppConfig(), Database: config.ReadDatabaseConfig(), - Logging: config.ReadLoggingConfig(), + Redis: RedisConfig{ + URL: viper.GetString("REDIS_URL"), + }, + Logging: config.ReadLoggingConfig(), Worker: WorkerConfig{ PollInterval: viper.GetDuration("WORKER_POLL_INTERVAL"), BatchSize: viper.GetInt("WORKER_BATCH_SIZE"), diff --git a/workers/worker-svc/internal/handlers/tasks.go b/workers/worker-svc/internal/handlers/tasks.go new file mode 100644 index 0000000..a83ae4d --- /dev/null +++ b/workers/worker-svc/internal/handlers/tasks.go @@ -0,0 +1,124 @@ +package handlers + +import ( + "context" + "fmt" + + "git.threesix.ai/jordan/sp4-verify-1770325799/pkg/logging" + "git.threesix.ai/jordan/sp4-verify-1770325799/pkg/queue" +) + +// TaskHandlers provides handlers for different job types pushed from services. +type TaskHandlers struct { + logger *logging.Logger +} + +// NewTaskHandlers creates task handlers for processing jobs from the queue. +func NewTaskHandlers(logger *logging.Logger) *TaskHandlers { + return &TaskHandlers{ + logger: logger.WithComponent("task-handlers"), + } +} + +// ProcessChatMessage handles chat message processing tasks. +func (h *TaskHandlers) ProcessChatMessage(ctx context.Context, job *queue.Job) error { + h.logger.Info("processing chat message", + "job_id", job.ID, + "payload", job.Payload, + ) + + // Extract payload fields + messageID, _ := job.Payload["message_id"].(string) + userID, _ := job.Payload["user_id"].(string) + content, _ := job.Payload["content"].(string) + + if messageID == "" { + return fmt.Errorf("message_id is required") + } + + // Simulate processing work + h.logger.Debug("chat message processed", + "message_id", messageID, + "user_id", userID, + "content_length", len(content), + ) + + return nil +} + +// SendNotification handles notification sending tasks. +func (h *TaskHandlers) SendNotification(ctx context.Context, job *queue.Job) error { + h.logger.Info("sending notification", + "job_id", job.ID, + "payload", job.Payload, + ) + + userID, _ := job.Payload["user_id"].(string) + notificationType, _ := job.Payload["type"].(string) + message, _ := job.Payload["message"].(string) + + if userID == "" { + return fmt.Errorf("user_id is required") + } + if notificationType == "" { + return fmt.Errorf("notification type is required") + } + + // Simulate sending notification + h.logger.Debug("notification sent", + "user_id", userID, + "type", notificationType, + "message_length", len(message), + ) + + return nil +} + +// SyncData handles data synchronization tasks. +func (h *TaskHandlers) SyncData(ctx context.Context, job *queue.Job) error { + h.logger.Info("syncing data", + "job_id", job.ID, + "payload", job.Payload, + ) + + source, _ := job.Payload["source"].(string) + destination, _ := job.Payload["destination"].(string) + + if source == "" { + return fmt.Errorf("source is required") + } + if destination == "" { + return fmt.Errorf("destination is required") + } + + // Simulate data sync + h.logger.Debug("data synced", + "source", source, + "destination", destination, + ) + + return nil +} + +// ProcessWebhook handles incoming webhook processing. +func (h *TaskHandlers) ProcessWebhook(ctx context.Context, job *queue.Job) error { + h.logger.Info("processing webhook", + "job_id", job.ID, + "payload", job.Payload, + ) + + webhookID, _ := job.Payload["webhook_id"].(string) + eventType, _ := job.Payload["event_type"].(string) + + if webhookID == "" { + return fmt.Errorf("webhook_id is required") + } + + // Simulate webhook processing + h.logger.Debug("webhook processed", + "webhook_id", webhookID, + "event_type", eventType, + ) + + return nil +} diff --git a/workers/worker-svc/internal/handlers/tasks_test.go b/workers/worker-svc/internal/handlers/tasks_test.go new file mode 100644 index 0000000..b2a7e2f --- /dev/null +++ b/workers/worker-svc/internal/handlers/tasks_test.go @@ -0,0 +1,213 @@ +package handlers + +import ( + "context" + "testing" + "time" + + "git.threesix.ai/jordan/sp4-verify-1770325799/pkg/logging" + "git.threesix.ai/jordan/sp4-verify-1770325799/pkg/queue" +) + +func TestTaskHandlers_ProcessChatMessage(t *testing.T) { + logger := logging.New(logging.Config{Level: logging.LevelDebug}) + h := NewTaskHandlers(logger) + + tests := []struct { + name string + payload map[string]any + wantErr bool + }{ + { + name: "valid message", + payload: map[string]any{ + "message_id": "msg-123", + "user_id": "user-456", + "content": "Hello world", + }, + wantErr: false, + }, + { + name: "missing message_id", + payload: map[string]any{ + "user_id": "user-456", + "content": "Hello world", + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + now := time.Now() + job := &queue.Job{ + ID: "job-123", + Type: "process_chat_message", + Payload: tt.payload, + Status: queue.StatusRunning, + CreatedAt: now, + StartedAt: &now, + } + + err := h.ProcessChatMessage(context.Background(), job) + if (err != nil) != tt.wantErr { + t.Errorf("ProcessChatMessage() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestTaskHandlers_SendNotification(t *testing.T) { + logger := logging.New(logging.Config{Level: logging.LevelDebug}) + h := NewTaskHandlers(logger) + + tests := []struct { + name string + payload map[string]any + wantErr bool + }{ + { + name: "valid notification", + payload: map[string]any{ + "user_id": "user-456", + "type": "email", + "message": "You have a new message", + }, + wantErr: false, + }, + { + name: "missing user_id", + payload: map[string]any{ + "type": "email", + "message": "You have a new message", + }, + wantErr: true, + }, + { + name: "missing type", + payload: map[string]any{ + "user_id": "user-456", + "message": "You have a new message", + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + now := time.Now() + job := &queue.Job{ + ID: "job-123", + Type: "send_notification", + Payload: tt.payload, + Status: queue.StatusRunning, + CreatedAt: now, + StartedAt: &now, + } + + err := h.SendNotification(context.Background(), job) + if (err != nil) != tt.wantErr { + t.Errorf("SendNotification() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestTaskHandlers_SyncData(t *testing.T) { + logger := logging.New(logging.Config{Level: logging.LevelDebug}) + h := NewTaskHandlers(logger) + + tests := []struct { + name string + payload map[string]any + wantErr bool + }{ + { + name: "valid sync", + payload: map[string]any{ + "source": "database-a", + "destination": "database-b", + }, + wantErr: false, + }, + { + name: "missing source", + payload: map[string]any{ + "destination": "database-b", + }, + wantErr: true, + }, + { + name: "missing destination", + payload: map[string]any{ + "source": "database-a", + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + now := time.Now() + job := &queue.Job{ + ID: "job-123", + Type: "sync_data", + Payload: tt.payload, + Status: queue.StatusRunning, + CreatedAt: now, + StartedAt: &now, + } + + err := h.SyncData(context.Background(), job) + if (err != nil) != tt.wantErr { + t.Errorf("SyncData() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestTaskHandlers_ProcessWebhook(t *testing.T) { + logger := logging.New(logging.Config{Level: logging.LevelDebug}) + h := NewTaskHandlers(logger) + + tests := []struct { + name string + payload map[string]any + wantErr bool + }{ + { + name: "valid webhook", + payload: map[string]any{ + "webhook_id": "wh-123", + "event_type": "user.created", + }, + wantErr: false, + }, + { + name: "missing webhook_id", + payload: map[string]any{ + "event_type": "user.created", + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + now := time.Now() + job := &queue.Job{ + ID: "job-123", + Type: "process_webhook", + Payload: tt.payload, + Status: queue.StatusRunning, + CreatedAt: now, + StartedAt: &now, + } + + err := h.ProcessWebhook(context.Background(), job) + if (err != nil) != tt.wantErr { + t.Errorf("ProcessWebhook() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +}