diff --git a/.sdlc/features/mesh-interop/design.md b/.sdlc/features/mesh-interop/design.md new file mode 100644 index 0000000..b1903ac --- /dev/null +++ b/.sdlc/features/mesh-interop/design.md @@ -0,0 +1,33 @@ +# Technical Design: Service Mesh Interop + +## Architecture + +``` +Client --Bearer token--> Chat-svc --POST /validate--> Auth-svc + | + +--Enqueue(chat_task)--> PostgreSQL jobs table --> Worker-svc +``` + +## Component Changes + +### 1. Auth-svc: /validate Endpoint +- **New handler**: `handlers/validate.go` - accepts POST with Bearer token, validates via existing JWT validator, returns user info +- **Route**: `POST /api/auth-svc/validate` (public endpoint, token is in the request body/header) +- **OpenAPI**: Document in `spec.go` + +### 2. Chat-svc: Auth Client + Queue Producer +- **New adapter**: `internal/adapter/authclient/client.go` - wraps `svc.Client` to call auth-svc /validate +- **New port**: `internal/port/auth.go` - `AuthValidator` interface +- **New adapter**: `internal/adapter/jobqueue/producer.go` - wraps `queue.Producer` +- **New port**: `internal/port/queue.go` - `TaskProducer` interface +- **Wire into routes**: Protected routes validate via auth-svc, handlers can enqueue tasks + +### 3. Worker-svc: Chat Task Handler +- **New handler**: `internal/handlers/chat_task.go` - processes `chat_task` jobs +- **Register**: In `main.go`, register the handler + +## Patterns +- Use `pkg/svc.NewClient("auth-svc")` for service discovery +- Use `pkg/queue.Producer` interface for enqueuing +- Follow hexagonal architecture (ports + adapters) +- All handlers return `error`, wrapped with `app.Wrap()` diff --git a/.sdlc/features/mesh-interop/manifest.yaml b/.sdlc/features/mesh-interop/manifest.yaml new file mode 100644 index 0000000..8a5d553 --- /dev/null +++ b/.sdlc/features/mesh-interop/manifest.yaml @@ -0,0 +1,37 @@ +slug: mesh-interop +title: Service Mesh Interop +created: 2026-02-07T21:28:48.335667969Z +phase: implementation +phase_history: + - phase: draft + entered: 2026-02-07T21:28:48.335667969Z + - phase: implementation + entered: 2026-02-07T21:30:00.000000000Z +artifacts: + audit: + status: pending + path: audit.md + design: + status: approved + path: design.md + approved_by: user + approved_at: 2026-02-07T21:29:47.287882658Z + qa_plan: + status: pending + path: qa-plan.md + qa_results: + status: pending + path: qa-results.md + review: + status: pending + path: review.md + spec: + status: approved + path: spec.md + approved_by: user + approved_at: 2026-02-07T21:29:47.287882658Z + tasks: + status: approved + path: tasks.md + approved_by: user + approved_at: 2026-02-07T21:29:47.287882658Z diff --git a/.sdlc/features/mesh-interop/spec.md b/.sdlc/features/mesh-interop/spec.md new file mode 100644 index 0000000..967c162 --- /dev/null +++ b/.sdlc/features/mesh-interop/spec.md @@ -0,0 +1,23 @@ +# Feature Spec: Service Mesh Interop + +## Summary +Enable inter-service communication: Chat Service validates tokens via auth-svc, and pushes async tasks to the PostgreSQL job queue for Worker to process. + +## Requirements +1. **Auth-svc /validate endpoint**: Auth Service exposes `POST /api/auth-svc/validate` that accepts a Bearer token and returns the validated user info. +2. **Chat-svc token validation**: Chat Service calls `http://auth-svc/api/auth-svc/validate` via `pkg/svc` client to check tokens on protected endpoints. +3. **Chat-svc queue producer**: Chat Service pushes jobs to the PostgreSQL queue (via `pkg/queue`) for async processing by Worker. +4. **Worker job processing**: Worker registers a handler for `chat_task` job type and processes tasks from the queue. + +## Non-Goals +- No new database schemas beyond existing jobs table +- No Redis (using existing PostgreSQL queue) +- No new frontend changes + +## Acceptance Criteria +- [ ] Auth-svc has a `/validate` endpoint that validates JWT tokens +- [ ] Chat-svc calls auth-svc to validate tokens on protected routes +- [ ] Chat-svc can enqueue jobs to the queue +- [ ] Worker processes `chat_task` jobs +- [ ] All components have tests +- [ ] All tests pass diff --git a/.sdlc/features/mesh-interop/tasks.md b/.sdlc/features/mesh-interop/tasks.md new file mode 100644 index 0000000..f173731 --- /dev/null +++ b/.sdlc/features/mesh-interop/tasks.md @@ -0,0 +1,38 @@ +# Implementation Tasks: Service Mesh Interop + +## Task 1: Add /validate endpoint to auth-svc +- **ID**: task-1 +- **Status**: completed +- **Blocked by**: none +- **Files**: + - `services/auth-svc/internal/api/handlers/validate.go` (new) + - `services/auth-svc/internal/api/handlers/validate_test.go` (new) + - `services/auth-svc/internal/api/routes.go` (modify) + - `services/auth-svc/internal/api/spec.go` (modify) +- **Scope**: Add POST /api/auth-svc/validate endpoint that validates a Bearer token and returns user info + +## Task 2: Add auth-svc client and queue producer to chat-svc +- **ID**: task-2 +- **Status**: completed +- **Blocked by**: task-1 +- **Files**: + - `services/chat-svc/internal/port/auth.go` (new) + - `services/chat-svc/internal/port/queue.go` (new) + - `services/chat-svc/internal/adapter/authclient/client.go` (new) + - `services/chat-svc/internal/adapter/jobqueue/producer.go` (new) + - `services/chat-svc/internal/api/handlers/task.go` (new) + - `services/chat-svc/internal/api/handlers/task_test.go` (new) + - `services/chat-svc/internal/api/routes.go` (modify) + - `services/chat-svc/internal/api/spec.go` (modify) + - `services/chat-svc/cmd/server/main.go` (modify) +- **Scope**: Add inter-service auth validation and job queue producer to chat-svc + +## Task 3: Register chat_task handler in worker-svc +- **ID**: task-3 +- **Status**: completed +- **Blocked by**: none +- **Files**: + - `workers/worker-svc/internal/handlers/chat_task.go` (new) + - `workers/worker-svc/internal/handlers/chat_task_test.go` (new) + - `workers/worker-svc/cmd/worker/main.go` (modify) +- **Scope**: Register and implement chat_task job handler in worker-svc diff --git a/services/auth-svc/internal/api/handlers/validate.go b/services/auth-svc/internal/api/handlers/validate.go new file mode 100644 index 0000000..344c544 --- /dev/null +++ b/services/auth-svc/internal/api/handlers/validate.go @@ -0,0 +1,55 @@ +package handlers + +import ( + "net/http" + + "git.threesix.ai/jordan/sp4-v2-1770499323/pkg/auth" + "git.threesix.ai/jordan/sp4-v2-1770499323/pkg/httperror" + "git.threesix.ai/jordan/sp4-v2-1770499323/pkg/httpresponse" + "git.threesix.ai/jordan/sp4-v2-1770499323/pkg/logging" +) + +// Validate handles token validation requests from sibling services. +type Validate struct { + validator auth.Validator + logger *logging.Logger +} + +// NewValidate creates a new Validate handler with injected dependencies. +func NewValidate(validator auth.Validator, logger *logging.Logger) *Validate { + return &Validate{ + validator: validator, + logger: logger.WithComponent("ValidateHandler"), + } +} + +// ValidateResponse is the response body for a successful token validation. +type ValidateResponse struct { + UserID string `json:"user_id"` + Email string `json:"email,omitempty"` + Roles []string `json:"roles,omitempty"` + Scopes []string `json:"scopes,omitempty"` +} + +// Check validates the Bearer token from the Authorization header +// and returns the authenticated user info. +func (h *Validate) Check(w http.ResponseWriter, r *http.Request) error { + token := auth.ExtractBearerToken(r) + if token == "" { + return httperror.Unauthorized("missing bearer token") + } + + user, err := h.validator.Validate(r.Context(), token) + if err != nil { + h.logger.Debug("token validation failed", "error", err) + return httperror.Unauthorized("invalid token") + } + + httpresponse.OK(w, r, ValidateResponse{ + UserID: user.ID, + Email: user.Email, + Roles: user.Roles, + Scopes: user.Scopes, + }) + return nil +} diff --git a/services/auth-svc/internal/api/handlers/validate_test.go b/services/auth-svc/internal/api/handlers/validate_test.go new file mode 100644 index 0000000..c6991b2 --- /dev/null +++ b/services/auth-svc/internal/api/handlers/validate_test.go @@ -0,0 +1,107 @@ +package handlers + +import ( + "context" + "encoding/json" + "errors" + "net/http" + "net/http/httptest" + "testing" + + "git.threesix.ai/jordan/sp4-v2-1770499323/pkg/auth" + "git.threesix.ai/jordan/sp4-v2-1770499323/pkg/logging" +) + +// mockValidator implements auth.Validator for testing. +type mockValidator struct { + user *auth.User + err error +} + +func (m *mockValidator) Validate(ctx context.Context, token string) (*auth.User, error) { + if m.err != nil { + return nil, m.err + } + return m.user, nil +} + +func TestValidate_Check(t *testing.T) { + tests := []struct { + name string + token string + validator *mockValidator + wantStatus int + wantUserID string + }{ + { + name: "valid token", + token: "valid-jwt-token", + validator: &mockValidator{ + user: &auth.User{ + ID: "user-123", + Email: "user@example.com", + Roles: []string{"admin"}, + }, + }, + wantStatus: http.StatusOK, + wantUserID: "user-123", + }, + { + name: "missing token", + token: "", + validator: &mockValidator{}, + wantStatus: http.StatusUnauthorized, + }, + { + name: "invalid token", + token: "bad-token", + validator: &mockValidator{ + err: errors.New("token invalid"), + }, + wantStatus: http.StatusUnauthorized, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handler := NewValidate(tt.validator, logging.Nop()) + + req := httptest.NewRequest(http.MethodPost, "/api/auth-svc/validate", nil) + if tt.token != "" { + req.Header.Set("Authorization", "Bearer "+tt.token) + } + w := httptest.NewRecorder() + + err := handler.Check(w, req) + if err != nil { + // Handler returns error for app.Wrap to handle + if tt.wantStatus == http.StatusOK { + t.Fatalf("unexpected error: %v", err) + } + return + } + + if tt.wantStatus != http.StatusOK { + t.Fatalf("expected error but got nil") + } + + if w.Code != http.StatusOK { + t.Errorf("expected status %d, got %d", http.StatusOK, w.Code) + } + + var resp map[string]any + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + + data, ok := resp["data"].(map[string]any) + if !ok { + t.Fatal("expected 'data' field in response") + } + + if data["user_id"] != tt.wantUserID { + t.Errorf("expected user_id %q, got %q", tt.wantUserID, data["user_id"]) + } + }) + } +} diff --git a/services/auth-svc/internal/api/routes.go b/services/auth-svc/internal/api/routes.go index b26a2c9..f2fad63 100644 --- a/services/auth-svc/internal/api/routes.go +++ b/services/auth-svc/internal/api/routes.go @@ -18,9 +18,16 @@ func RegisterRoutes(application *app.App, exampleService *service.ExampleService logger := application.Logger() cfg := config.Load() + // Create JWT validator (shared between middleware and /validate endpoint) + jwtValidator := auth.NewJWTValidator(auth.JWTConfig{ + Secret: []byte(cfg.JWTSecret), + Issuer: "sp4-v2-1770499323", + }) + // Initialize handlers with injected services healthHandler := handlers.NewHealth(logger) exampleHandler := handlers.NewExample(exampleService, logger) + validateHandler := handlers.NewValidate(jwtValidator, logger) // Build and mount OpenAPI spec spec := NewServiceSpec() @@ -31,6 +38,9 @@ func RegisterRoutes(application *app.App, exampleService *service.ExampleService application.Route("/api/auth-svc", func(r app.Router) { r.Get("/health", healthHandler.Check) + // Token validation endpoint (called by sibling services) + r.Post("/validate", app.Wrap(validateHandler.Check)) + // Public routes (no auth required) r.Get("/examples", app.Wrap(exampleHandler.List)) r.Get("/examples/{id}", app.Wrap(exampleHandler.Get)) @@ -39,10 +49,7 @@ func RegisterRoutes(application *app.App, exampleService *service.ExampleService r.Group(func(r app.Router) { if cfg.AuthEnabled { r.Use(auth.Middleware(auth.MiddlewareConfig{ - Validator: auth.NewJWTValidator(auth.JWTConfig{ - Secret: []byte(cfg.JWTSecret), - Issuer: "sp4-v2-1770499323", - }), + Validator: jwtValidator, })) } diff --git a/services/auth-svc/internal/api/spec.go b/services/auth-svc/internal/api/spec.go index 373124c..20db302 100644 --- a/services/auth-svc/internal/api/spec.go +++ b/services/auth-svc/internal/api/spec.go @@ -8,6 +8,7 @@ func NewServiceSpec() *openapi.OpenAPISpec { WithDescription("REST API for the auth-svc service"). WithBearerSecurity("bearer", "JWT authentication token"). WithTag("Health", "Service health endpoints"). + WithTag("Auth", "Authentication endpoints"). WithTag("Examples", "Example CRUD endpoints") // Define reusable schemas @@ -24,6 +25,13 @@ func NewServiceSpec() *openapi.OpenAPISpec { "description": openapi.StringWithMinMax(0, 500).WithDescription("Optional description"), }, "name")) + spec.WithSchema("ValidateResponse", openapi.Object(map[string]openapi.Schema{ + "user_id": openapi.String().WithDescription("Authenticated user ID"), + "email": openapi.String().WithDescription("User email"), + "roles": openapi.Array(openapi.String()).WithDescription("User roles"), + "scopes": openapi.Array(openapi.String()).WithDescription("User scopes"), + }, "user_id")) + spec.WithSchema("UpdateExampleRequest", openapi.Object(map[string]openapi.Schema{ "name": openapi.StringWithMinMax(1, 100).WithDescription("Updated name"), "description": openapi.StringWithMinMax(0, 500).WithDescription("Updated description"), @@ -41,6 +49,18 @@ func NewServiceSpec() *openapi.OpenAPISpec { }, }) + // Validate token + spec.AddPath("/api/auth-svc/validate", "post", map[string]any{ + "summary": "Validate token", + "description": "Validates a Bearer token and returns user info. Used by sibling services for token validation.", + "tags": []string{"Auth"}, + "security": []map[string][]string{{"bearer": {}}}, + "responses": map[string]any{ + "200": openapi.OpResponse("Token is valid", openapi.ResponseSchema(openapi.Ref("ValidateResponse"))), + "401": openapi.OpResponse("Invalid or missing token", openapi.ErrorResponseSchema()), + }, + }) + // List examples spec.AddPath("/api/auth-svc/examples", "get", map[string]any{ "summary": "List examples", diff --git a/services/chat-svc/cmd/server/main.go b/services/chat-svc/cmd/server/main.go index 6fcb844..c689b13 100644 --- a/services/chat-svc/cmd/server/main.go +++ b/services/chat-svc/cmd/server/main.go @@ -2,10 +2,18 @@ package main import ( + "context" + "git.threesix.ai/jordan/sp4-v2-1770499323/pkg/app" + "git.threesix.ai/jordan/sp4-v2-1770499323/pkg/database" "git.threesix.ai/jordan/sp4-v2-1770499323/pkg/logging" + "git.threesix.ai/jordan/sp4-v2-1770499323/pkg/queue" + "git.threesix.ai/jordan/sp4-v2-1770499323/services/chat-svc/internal/adapter/authclient" + "git.threesix.ai/jordan/sp4-v2-1770499323/services/chat-svc/internal/adapter/jobqueue" "git.threesix.ai/jordan/sp4-v2-1770499323/services/chat-svc/internal/adapter/memory" "git.threesix.ai/jordan/sp4-v2-1770499323/services/chat-svc/internal/api" + "git.threesix.ai/jordan/sp4-v2-1770499323/services/chat-svc/internal/config" + "git.threesix.ai/jordan/sp4-v2-1770499323/services/chat-svc/internal/port" "git.threesix.ai/jordan/sp4-v2-1770499323/services/chat-svc/internal/service" ) @@ -13,9 +21,41 @@ func main() { // Create logger logger := logging.Default() + // Load config + cfg := config.Load() + // Create adapters (repositories) exampleRepo := memory.NewExampleRepository() + // Create adapters (inter-service auth client) + var authValidator port.AuthValidator + authClient, err := authclient.New() + if err != nil { + logger.Warn("auth-svc client not configured, token validation disabled", "error", err) + } else { + authValidator = authClient + } + + // Create adapters (job queue producer) + var taskProducer port.TaskProducer + if cfg.Database.URL != "" { + pool, err := database.Connect(context.Background(), cfg.Database.URL, database.Options{ + MaxOpenConns: cfg.Database.MaxOpenConns, + MaxIdleConns: cfg.Database.MaxIdleConns, + ConnMaxLifetime: cfg.Database.ConnMaxLifetime, + }) + if err != nil { + logger.Warn("database not available, task queue disabled", "error", err) + } else { + defer pool.Close() + jobQueue := queue.NewPostgresQueue(pool.DB, logger) + taskProducer = jobqueue.New(jobQueue) + logger.Info("task queue enabled") + } + } else { + logger.Warn("DATABASE_URL not configured, task queue disabled") + } + // Create services (business logic) exampleService := service.NewExampleService(exampleRepo, logger) @@ -23,7 +63,7 @@ func main() { application := app.New("chat-svc", app.WithDefaultPort(8001)) // Register routes with dependency injection - api.RegisterRoutes(application, exampleService) + api.RegisterRoutes(application, exampleService, authValidator, taskProducer) // Start server application.Run() diff --git a/services/chat-svc/internal/adapter/authclient/client.go b/services/chat-svc/internal/adapter/authclient/client.go new file mode 100644 index 0000000..b6b8f5a --- /dev/null +++ b/services/chat-svc/internal/adapter/authclient/client.go @@ -0,0 +1,68 @@ +// Package authclient provides an adapter for validating tokens via the auth-svc. +package authclient + +import ( + "context" + "fmt" + "net/http" + + "git.threesix.ai/jordan/sp4-v2-1770499323/pkg/httpclient" + "git.threesix.ai/jordan/sp4-v2-1770499323/pkg/svc" + "git.threesix.ai/jordan/sp4-v2-1770499323/services/chat-svc/internal/port" +) + +// compile-time check +var _ port.AuthValidator = (*Client)(nil) + +// validateEnvelope matches the auth-svc response envelope: {data: ValidateResponse, meta: ...} +type validateEnvelope struct { + Data port.AuthUser `json:"data"` +} + +// Client validates tokens by calling auth-svc's /validate endpoint. +type Client struct { + baseURL string + httpClient *httpclient.Client +} + +// New creates a new auth-svc client adapter. +// Returns an error if AUTH_SVC_URL is not configured. +func New() (*Client, error) { + baseURL := svc.ServiceURL("auth-svc") + if baseURL == "" { + return nil, fmt.Errorf("auth-svc not configured (missing AUTH_SVC_URL env var)") + } + + httpClient := httpclient.New(httpclient.Config{ + MaxRetries: 3, + CircuitBreaker: httpclient.NewCircuitBreaker(httpclient.DefaultCircuitBreakerConfig()), + }) + + return &Client{ + baseURL: baseURL, + httpClient: httpClient, + }, nil +} + +// ValidateToken calls auth-svc's /validate endpoint with the given Bearer token. +func (c *Client) ValidateToken(ctx context.Context, token string) (*port.AuthUser, error) { + url := c.baseURL + "/api/auth-svc/validate" + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, nil) + if err != nil { + return nil, fmt.Errorf("create validate request: %w", err) + } + req.Header.Set("Authorization", "Bearer "+token) + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("auth-svc request failed: %w", err) + } + + envelope, err := svc.DecodeResponse[validateEnvelope](resp) + if err != nil { + return nil, fmt.Errorf("auth-svc validation failed: %w", err) + } + + return &envelope.Data, nil +} diff --git a/services/chat-svc/internal/adapter/jobqueue/producer.go b/services/chat-svc/internal/adapter/jobqueue/producer.go new file mode 100644 index 0000000..cac46a0 --- /dev/null +++ b/services/chat-svc/internal/adapter/jobqueue/producer.go @@ -0,0 +1,37 @@ +// Package jobqueue provides an adapter for enqueuing async tasks. +package jobqueue + +import ( + "context" + "fmt" + + "git.threesix.ai/jordan/sp4-v2-1770499323/pkg/queue" + "git.threesix.ai/jordan/sp4-v2-1770499323/services/chat-svc/internal/port" +) + +// compile-time check +var _ port.TaskProducer = (*Producer)(nil) + +// Producer enqueues chat tasks to the job queue. +type Producer struct { + queue queue.Producer +} + +// New creates a new task producer. +func New(q queue.Producer) *Producer { + return &Producer{queue: q} +} + +// EnqueueTask enqueues a chat_task job with the given action and payload. +func (p *Producer) EnqueueTask(ctx context.Context, action string, payload map[string]any) (string, error) { + if payload == nil { + payload = make(map[string]any) + } + payload["action"] = action + + jobID, err := p.queue.Enqueue(ctx, "chat_task", payload) + if err != nil { + return "", fmt.Errorf("enqueue chat task: %w", err) + } + return jobID, nil +} diff --git a/services/chat-svc/internal/api/handlers/task.go b/services/chat-svc/internal/api/handlers/task.go new file mode 100644 index 0000000..709b949 --- /dev/null +++ b/services/chat-svc/internal/api/handlers/task.go @@ -0,0 +1,54 @@ +package handlers + +import ( + "net/http" + + "git.threesix.ai/jordan/sp4-v2-1770499323/pkg/app" + "git.threesix.ai/jordan/sp4-v2-1770499323/pkg/httperror" + "git.threesix.ai/jordan/sp4-v2-1770499323/pkg/httpresponse" + "git.threesix.ai/jordan/sp4-v2-1770499323/pkg/logging" + "git.threesix.ai/jordan/sp4-v2-1770499323/services/chat-svc/internal/port" +) + +// Task handles HTTP requests for enqueuing async tasks. +type Task struct { + producer port.TaskProducer + logger *logging.Logger +} + +// NewTask creates a new Task handler with injected dependencies. +func NewTask(producer port.TaskProducer, logger *logging.Logger) *Task { + return &Task{ + producer: producer, + logger: logger.WithComponent("TaskHandler"), + } +} + +// EnqueueRequest is the request body for enqueuing a task. +type EnqueueRequest struct { + Action string `json:"action" validate:"required"` + Payload map[string]any `json:"payload"` +} + +// EnqueueResponse is the response body for a successfully enqueued task. +type EnqueueResponse struct { + JobID string `json:"job_id"` +} + +// Enqueue creates an async task for background processing. +func (h *Task) Enqueue(w http.ResponseWriter, r *http.Request) error { + var req EnqueueRequest + if err := app.BindAndValidate(r, &req); err != nil { + return err + } + + jobID, err := h.producer.EnqueueTask(r.Context(), req.Action, req.Payload) + if err != nil { + h.logger.Error("failed to enqueue task", "action", req.Action, "error", err) + return httperror.Internal("failed to enqueue task") + } + + h.logger.Info("task enqueued", "job_id", jobID, "action", req.Action) + httpresponse.Accepted(w, r, EnqueueResponse{JobID: jobID}) + return nil +} diff --git a/services/chat-svc/internal/api/handlers/task_test.go b/services/chat-svc/internal/api/handlers/task_test.go new file mode 100644 index 0000000..c1ce755 --- /dev/null +++ b/services/chat-svc/internal/api/handlers/task_test.go @@ -0,0 +1,117 @@ +package handlers + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "net/http" + "net/http/httptest" + "testing" + + "git.threesix.ai/jordan/sp4-v2-1770499323/pkg/logging" + "git.threesix.ai/jordan/sp4-v2-1770499323/services/chat-svc/internal/port" +) + +// mockTaskProducer implements port.TaskProducer for testing. +type mockTaskProducer struct { + lastAction string + lastPayload map[string]any + jobID string + err error +} + +var _ port.TaskProducer = (*mockTaskProducer)(nil) + +func (m *mockTaskProducer) EnqueueTask(ctx context.Context, action string, payload map[string]any) (string, error) { + m.lastAction = action + m.lastPayload = payload + if m.err != nil { + return "", m.err + } + return m.jobID, nil +} + +func TestTask_Enqueue(t *testing.T) { + tests := []struct { + name string + body any + producer *mockTaskProducer + wantStatus int + wantJobID string + }{ + { + name: "valid request", + body: EnqueueRequest{ + Action: "send_notification", + Payload: map[string]any{"user_id": "user-123", "message": "Hello"}, + }, + producer: &mockTaskProducer{jobID: "job-456"}, + wantStatus: http.StatusAccepted, + wantJobID: "job-456", + }, + { + name: "empty body", + body: nil, + producer: &mockTaskProducer{}, + wantStatus: http.StatusBadRequest, + }, + { + name: "producer error", + body: EnqueueRequest{ + Action: "send_notification", + }, + producer: &mockTaskProducer{err: errors.New("queue unavailable")}, + wantStatus: http.StatusInternalServerError, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handler := NewTask(tt.producer, logging.Nop()) + + var body []byte + if tt.body != nil { + var err error + body, err = json.Marshal(tt.body) + if err != nil { + t.Fatalf("failed to marshal body: %v", err) + } + } + + req := httptest.NewRequest(http.MethodPost, "/api/chat-svc/tasks", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + + err := handler.Enqueue(w, req) + if err != nil { + if tt.wantStatus == http.StatusAccepted { + t.Fatalf("unexpected error: %v", err) + } + return + } + + if tt.wantStatus != http.StatusAccepted { + t.Fatalf("expected error but got nil") + } + + if w.Code != http.StatusAccepted { + t.Errorf("expected status %d, got %d", http.StatusAccepted, w.Code) + } + + var resp map[string]any + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + + data, ok := resp["data"].(map[string]any) + if !ok { + t.Fatal("expected 'data' field in response") + } + + if data["job_id"] != tt.wantJobID { + t.Errorf("expected job_id %q, got %q", tt.wantJobID, data["job_id"]) + } + }) + } +} diff --git a/services/chat-svc/internal/api/routes.go b/services/chat-svc/internal/api/routes.go index e7c3e40..035c546 100644 --- a/services/chat-svc/internal/api/routes.go +++ b/services/chat-svc/internal/api/routes.go @@ -2,19 +2,19 @@ package api import ( + "context" + "git.threesix.ai/jordan/sp4-v2-1770499323/pkg/app" "git.threesix.ai/jordan/sp4-v2-1770499323/pkg/auth" "git.threesix.ai/jordan/sp4-v2-1770499323/services/chat-svc/internal/api/handlers" "git.threesix.ai/jordan/sp4-v2-1770499323/services/chat-svc/internal/config" + "git.threesix.ai/jordan/sp4-v2-1770499323/services/chat-svc/internal/port" "git.threesix.ai/jordan/sp4-v2-1770499323/services/chat-svc/internal/service" ) // RegisterRoutes registers all HTTP routes for the service. // Routes are mounted under /api/chat-svc to match the ingress path routing. -// This allows the monorepo to expose multiple services under a single domain: -// - https://domain/api/chat-svc/health -// - https://domain/api/chat-svc/examples -func RegisterRoutes(application *app.App, exampleService *service.ExampleService) { +func RegisterRoutes(application *app.App, exampleService *service.ExampleService, authValidator port.AuthValidator, taskProducer port.TaskProducer) { logger := application.Logger() cfg := config.Load() @@ -38,17 +38,50 @@ func RegisterRoutes(application *app.App, exampleService *service.ExampleService // Protected routes (auth required when enabled) r.Group(func(r app.Router) { if cfg.AuthEnabled { - r.Use(auth.Middleware(auth.MiddlewareConfig{ - Validator: auth.NewJWTValidator(auth.JWTConfig{ + // Use auth-svc for token validation when available, + // otherwise fall back to local JWT validation. + var validator auth.Validator + if authValidator != nil { + validator = &authSvcValidator{client: authValidator} + } else { + validator = auth.NewJWTValidator(auth.JWTConfig{ Secret: []byte(cfg.JWTSecret), Issuer: "sp4-v2-1770499323", - }), + }) + } + r.Use(auth.Middleware(auth.MiddlewareConfig{ + Validator: validator, })) } r.Post("/examples", app.Wrap(exampleHandler.Create)) r.Put("/examples/{id}", app.Wrap(exampleHandler.Update)) r.Delete("/examples/{id}", app.Wrap(exampleHandler.Delete)) + + // Task enqueue endpoint (requires auth, requires queue) + if taskProducer != nil { + taskHandler := handlers.NewTask(taskProducer, logger) + r.Post("/tasks", app.Wrap(taskHandler.Enqueue)) + } }) }) } + +// authSvcValidator adapts port.AuthValidator to auth.Validator, +// allowing the auth-svc HTTP client to be used with auth.Middleware. +type authSvcValidator struct { + client port.AuthValidator +} + +func (v *authSvcValidator) Validate(ctx context.Context, token string) (*auth.User, error) { + user, err := v.client.ValidateToken(ctx, token) + if err != nil { + return nil, err + } + return &auth.User{ + ID: user.UserID, + Email: user.Email, + Roles: user.Roles, + Scopes: user.Scopes, + }, nil +} diff --git a/services/chat-svc/internal/api/spec.go b/services/chat-svc/internal/api/spec.go index 25ae6b8..b117eed 100644 --- a/services/chat-svc/internal/api/spec.go +++ b/services/chat-svc/internal/api/spec.go @@ -8,7 +8,8 @@ func NewServiceSpec() *openapi.OpenAPISpec { WithDescription("REST API for the chat-svc service"). WithBearerSecurity("bearer", "JWT authentication token"). WithTag("Health", "Service health endpoints"). - WithTag("Examples", "Example CRUD endpoints") + WithTag("Examples", "Example CRUD endpoints"). + WithTag("Tasks", "Async task endpoints") // Define reusable schemas spec.WithSchema("Example", openapi.Object(map[string]openapi.Schema{ @@ -29,6 +30,15 @@ func NewServiceSpec() *openapi.OpenAPISpec { "description": openapi.StringWithMinMax(0, 500).WithDescription("Updated description"), })) + spec.WithSchema("EnqueueRequest", openapi.Object(map[string]openapi.Schema{ + "action": openapi.StringWithMinMax(1, 100).WithDescription("Task action to perform"), + "payload": openapi.Object(map[string]openapi.Schema{}).WithDescription("Arbitrary task payload"), + }, "action")) + + spec.WithSchema("EnqueueResponse", openapi.Object(map[string]openapi.Schema{ + "job_id": openapi.String().WithDescription("ID of the enqueued job"), + }, "job_id")) + // Health spec.AddPath("/api/chat-svc/health", "get", map[string]any{ "summary": "Health check", @@ -108,5 +118,20 @@ func NewServiceSpec() *openapi.OpenAPISpec { }, }) + // Enqueue task + spec.AddPath("/api/chat-svc/tasks", "post", map[string]any{ + "summary": "Enqueue task", + "description": "Enqueues an async task for background processing. Requires authentication.", + "tags": []string{"Tasks"}, + "security": []map[string][]string{{"bearer": {}}}, + "requestBody": openapi.RequestBody(openapi.Ref("EnqueueRequest"), true), + "responses": map[string]any{ + "202": openapi.OpResponse("Task accepted", openapi.ResponseSchema(openapi.Ref("EnqueueResponse"))), + "400": openapi.OpResponse("Bad request", openapi.ErrorResponseSchema()), + "401": openapi.OpResponse("Unauthorized", openapi.ErrorResponseSchema()), + "500": openapi.OpResponse("Internal error", openapi.ErrorResponseSchema()), + }, + }) + return spec } diff --git a/services/chat-svc/internal/port/auth.go b/services/chat-svc/internal/port/auth.go new file mode 100644 index 0000000..e8d80ca --- /dev/null +++ b/services/chat-svc/internal/port/auth.go @@ -0,0 +1,18 @@ +package port + +import ( + "context" +) + +// AuthUser represents a validated user from the auth service. +type AuthUser struct { + UserID string `json:"user_id"` + Email string `json:"email,omitempty"` + Roles []string `json:"roles,omitempty"` + Scopes []string `json:"scopes,omitempty"` +} + +// AuthValidator validates tokens against the auth service. +type AuthValidator interface { + ValidateToken(ctx context.Context, token string) (*AuthUser, error) +} diff --git a/services/chat-svc/internal/port/queue.go b/services/chat-svc/internal/port/queue.go new file mode 100644 index 0000000..6d23625 --- /dev/null +++ b/services/chat-svc/internal/port/queue.go @@ -0,0 +1,10 @@ +package port + +import ( + "context" +) + +// TaskProducer enqueues async tasks for background processing. +type TaskProducer interface { + EnqueueTask(ctx context.Context, action string, payload map[string]any) (jobID string, err error) +} diff --git a/workers/worker-svc/cmd/worker/main.go b/workers/worker-svc/cmd/worker/main.go index 8602cb5..faf4731 100644 --- a/workers/worker-svc/cmd/worker/main.go +++ b/workers/worker-svc/cmd/worker/main.go @@ -75,9 +75,8 @@ func main() { }) // Register job handlers - // TODO: Register your job handlers here - // handler.RegisterHandler("send_email", emailHandler) - // handler.RegisterHandler("process_image", imageHandler) + chatTaskHandler := handlers.NewChatTaskHandler(logger) + handler.RegisterHandler("chat_task", chatTaskHandler.Handle) // Setup signal handling sigCh := make(chan os.Signal, 1) diff --git a/workers/worker-svc/internal/handlers/chat_task.go b/workers/worker-svc/internal/handlers/chat_task.go new file mode 100644 index 0000000..4198f34 --- /dev/null +++ b/workers/worker-svc/internal/handlers/chat_task.go @@ -0,0 +1,71 @@ +package handlers + +import ( + "context" + "fmt" + + "git.threesix.ai/jordan/sp4-v2-1770499323/pkg/logging" + "git.threesix.ai/jordan/sp4-v2-1770499323/pkg/queue" +) + +// ChatTaskHandler processes chat_task jobs from the queue. +type ChatTaskHandler struct { + logger *logging.Logger +} + +// NewChatTaskHandler creates a new ChatTaskHandler. +func NewChatTaskHandler(logger *logging.Logger) *ChatTaskHandler { + return &ChatTaskHandler{ + logger: logger.WithComponent("chat_task"), + } +} + +// Handle processes a single chat_task job. +func (h *ChatTaskHandler) Handle(ctx context.Context, job *queue.Job) error { + h.logger.Info("processing chat task", + "job_id", job.ID, + "payload", job.Payload, + ) + + action, _ := job.Payload["action"].(string) + if action == "" { + return fmt.Errorf("missing required payload field: action") + } + + switch action { + case "send_notification": + return h.handleSendNotification(ctx, job) + case "process_message": + return h.handleProcessMessage(ctx, job) + default: + return fmt.Errorf("unknown chat task action: %s", action) + } +} + +func (h *ChatTaskHandler) handleSendNotification(ctx context.Context, job *queue.Job) error { + userID, _ := job.Payload["user_id"].(string) + message, _ := job.Payload["message"].(string) + + h.logger.Info("sending notification", + "job_id", job.ID, + "user_id", userID, + "message", message, + ) + + // TODO: Implement actual notification delivery + return nil +} + +func (h *ChatTaskHandler) handleProcessMessage(ctx context.Context, job *queue.Job) error { + channelID, _ := job.Payload["channel_id"].(string) + content, _ := job.Payload["content"].(string) + + h.logger.Info("processing message", + "job_id", job.ID, + "channel_id", channelID, + "content_length", len(content), + ) + + // TODO: Implement actual message processing + return nil +} diff --git a/workers/worker-svc/internal/handlers/chat_task_test.go b/workers/worker-svc/internal/handlers/chat_task_test.go new file mode 100644 index 0000000..de0126f --- /dev/null +++ b/workers/worker-svc/internal/handlers/chat_task_test.go @@ -0,0 +1,65 @@ +package handlers + +import ( + "context" + "testing" + + "git.threesix.ai/jordan/sp4-v2-1770499323/pkg/logging" + "git.threesix.ai/jordan/sp4-v2-1770499323/pkg/queue" +) + +func TestChatTaskHandler_Handle(t *testing.T) { + tests := []struct { + name string + payload map[string]any + wantErr bool + }{ + { + name: "send_notification action", + payload: map[string]any{ + "action": "send_notification", + "user_id": "user-123", + "message": "Hello!", + }, + wantErr: false, + }, + { + name: "process_message action", + payload: map[string]any{ + "action": "process_message", + "channel_id": "chan-456", + "content": "Some message content", + }, + wantErr: false, + }, + { + name: "missing action", + payload: map[string]any{}, + wantErr: true, + }, + { + name: "unknown action", + payload: map[string]any{ + "action": "unknown_action", + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handler := NewChatTaskHandler(logging.Nop()) + + job := &queue.Job{ + ID: "job-test-123", + Type: "chat_task", + Payload: tt.payload, + } + + err := handler.Handle(context.Background(), job) + if (err != nil) != tt.wantErr { + t.Errorf("Handle() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +}