build: /implement-feature mesh-interop --requirements 'Chat Service must cal...
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
This commit is contained in:
parent
1d68629d6f
commit
34f37a44b8
33
.sdlc/features/mesh-interop/design.md
Normal file
33
.sdlc/features/mesh-interop/design.md
Normal file
@ -0,0 +1,33 @@
|
||||
# Technical Design: Service Mesh Interop
|
||||
|
||||
## Architecture
|
||||
|
||||
```
|
||||
Client --Bearer token--> Chat-svc --POST /validate--> Auth-svc
|
||||
|
|
||||
+--Enqueue(chat_task)--> PostgreSQL jobs table --> Worker-svc
|
||||
```
|
||||
|
||||
## Component Changes
|
||||
|
||||
### 1. Auth-svc: /validate Endpoint
|
||||
- **New handler**: `handlers/validate.go` - accepts POST with Bearer token, validates via existing JWT validator, returns user info
|
||||
- **Route**: `POST /api/auth-svc/validate` (public endpoint, token is in the request body/header)
|
||||
- **OpenAPI**: Document in `spec.go`
|
||||
|
||||
### 2. Chat-svc: Auth Client + Queue Producer
|
||||
- **New adapter**: `internal/adapter/authclient/client.go` - wraps `svc.Client` to call auth-svc /validate
|
||||
- **New port**: `internal/port/auth.go` - `AuthValidator` interface
|
||||
- **New adapter**: `internal/adapter/jobqueue/producer.go` - wraps `queue.Producer`
|
||||
- **New port**: `internal/port/queue.go` - `TaskProducer` interface
|
||||
- **Wire into routes**: Protected routes validate via auth-svc, handlers can enqueue tasks
|
||||
|
||||
### 3. Worker-svc: Chat Task Handler
|
||||
- **New handler**: `internal/handlers/chat_task.go` - processes `chat_task` jobs
|
||||
- **Register**: In `main.go`, register the handler
|
||||
|
||||
## Patterns
|
||||
- Use `pkg/svc.NewClient("auth-svc")` for service discovery
|
||||
- Use `pkg/queue.Producer` interface for enqueuing
|
||||
- Follow hexagonal architecture (ports + adapters)
|
||||
- All handlers return `error`, wrapped with `app.Wrap()`
|
||||
37
.sdlc/features/mesh-interop/manifest.yaml
Normal file
37
.sdlc/features/mesh-interop/manifest.yaml
Normal file
@ -0,0 +1,37 @@
|
||||
slug: mesh-interop
|
||||
title: Service Mesh Interop
|
||||
created: 2026-02-07T21:28:48.335667969Z
|
||||
phase: implementation
|
||||
phase_history:
|
||||
- phase: draft
|
||||
entered: 2026-02-07T21:28:48.335667969Z
|
||||
- phase: implementation
|
||||
entered: 2026-02-07T21:30:00.000000000Z
|
||||
artifacts:
|
||||
audit:
|
||||
status: pending
|
||||
path: audit.md
|
||||
design:
|
||||
status: approved
|
||||
path: design.md
|
||||
approved_by: user
|
||||
approved_at: 2026-02-07T21:29:47.287882658Z
|
||||
qa_plan:
|
||||
status: pending
|
||||
path: qa-plan.md
|
||||
qa_results:
|
||||
status: pending
|
||||
path: qa-results.md
|
||||
review:
|
||||
status: pending
|
||||
path: review.md
|
||||
spec:
|
||||
status: approved
|
||||
path: spec.md
|
||||
approved_by: user
|
||||
approved_at: 2026-02-07T21:29:47.287882658Z
|
||||
tasks:
|
||||
status: approved
|
||||
path: tasks.md
|
||||
approved_by: user
|
||||
approved_at: 2026-02-07T21:29:47.287882658Z
|
||||
23
.sdlc/features/mesh-interop/spec.md
Normal file
23
.sdlc/features/mesh-interop/spec.md
Normal file
@ -0,0 +1,23 @@
|
||||
# Feature Spec: Service Mesh Interop
|
||||
|
||||
## Summary
|
||||
Enable inter-service communication: Chat Service validates tokens via auth-svc, and pushes async tasks to the PostgreSQL job queue for Worker to process.
|
||||
|
||||
## Requirements
|
||||
1. **Auth-svc /validate endpoint**: Auth Service exposes `POST /api/auth-svc/validate` that accepts a Bearer token and returns the validated user info.
|
||||
2. **Chat-svc token validation**: Chat Service calls `http://auth-svc/api/auth-svc/validate` via `pkg/svc` client to check tokens on protected endpoints.
|
||||
3. **Chat-svc queue producer**: Chat Service pushes jobs to the PostgreSQL queue (via `pkg/queue`) for async processing by Worker.
|
||||
4. **Worker job processing**: Worker registers a handler for `chat_task` job type and processes tasks from the queue.
|
||||
|
||||
## Non-Goals
|
||||
- No new database schemas beyond existing jobs table
|
||||
- No Redis (using existing PostgreSQL queue)
|
||||
- No new frontend changes
|
||||
|
||||
## Acceptance Criteria
|
||||
- [ ] Auth-svc has a `/validate` endpoint that validates JWT tokens
|
||||
- [ ] Chat-svc calls auth-svc to validate tokens on protected routes
|
||||
- [ ] Chat-svc can enqueue jobs to the queue
|
||||
- [ ] Worker processes `chat_task` jobs
|
||||
- [ ] All components have tests
|
||||
- [ ] All tests pass
|
||||
38
.sdlc/features/mesh-interop/tasks.md
Normal file
38
.sdlc/features/mesh-interop/tasks.md
Normal file
@ -0,0 +1,38 @@
|
||||
# Implementation Tasks: Service Mesh Interop
|
||||
|
||||
## Task 1: Add /validate endpoint to auth-svc
|
||||
- **ID**: task-1
|
||||
- **Status**: completed
|
||||
- **Blocked by**: none
|
||||
- **Files**:
|
||||
- `services/auth-svc/internal/api/handlers/validate.go` (new)
|
||||
- `services/auth-svc/internal/api/handlers/validate_test.go` (new)
|
||||
- `services/auth-svc/internal/api/routes.go` (modify)
|
||||
- `services/auth-svc/internal/api/spec.go` (modify)
|
||||
- **Scope**: Add POST /api/auth-svc/validate endpoint that validates a Bearer token and returns user info
|
||||
|
||||
## Task 2: Add auth-svc client and queue producer to chat-svc
|
||||
- **ID**: task-2
|
||||
- **Status**: completed
|
||||
- **Blocked by**: task-1
|
||||
- **Files**:
|
||||
- `services/chat-svc/internal/port/auth.go` (new)
|
||||
- `services/chat-svc/internal/port/queue.go` (new)
|
||||
- `services/chat-svc/internal/adapter/authclient/client.go` (new)
|
||||
- `services/chat-svc/internal/adapter/jobqueue/producer.go` (new)
|
||||
- `services/chat-svc/internal/api/handlers/task.go` (new)
|
||||
- `services/chat-svc/internal/api/handlers/task_test.go` (new)
|
||||
- `services/chat-svc/internal/api/routes.go` (modify)
|
||||
- `services/chat-svc/internal/api/spec.go` (modify)
|
||||
- `services/chat-svc/cmd/server/main.go` (modify)
|
||||
- **Scope**: Add inter-service auth validation and job queue producer to chat-svc
|
||||
|
||||
## Task 3: Register chat_task handler in worker-svc
|
||||
- **ID**: task-3
|
||||
- **Status**: completed
|
||||
- **Blocked by**: none
|
||||
- **Files**:
|
||||
- `workers/worker-svc/internal/handlers/chat_task.go` (new)
|
||||
- `workers/worker-svc/internal/handlers/chat_task_test.go` (new)
|
||||
- `workers/worker-svc/cmd/worker/main.go` (modify)
|
||||
- **Scope**: Register and implement chat_task job handler in worker-svc
|
||||
55
services/auth-svc/internal/api/handlers/validate.go
Normal file
55
services/auth-svc/internal/api/handlers/validate.go
Normal file
@ -0,0 +1,55 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/pkg/auth"
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/pkg/httperror"
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/pkg/httpresponse"
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/pkg/logging"
|
||||
)
|
||||
|
||||
// Validate handles token validation requests from sibling services.
|
||||
type Validate struct {
|
||||
validator auth.Validator
|
||||
logger *logging.Logger
|
||||
}
|
||||
|
||||
// NewValidate creates a new Validate handler with injected dependencies.
|
||||
func NewValidate(validator auth.Validator, logger *logging.Logger) *Validate {
|
||||
return &Validate{
|
||||
validator: validator,
|
||||
logger: logger.WithComponent("ValidateHandler"),
|
||||
}
|
||||
}
|
||||
|
||||
// ValidateResponse is the response body for a successful token validation.
|
||||
type ValidateResponse struct {
|
||||
UserID string `json:"user_id"`
|
||||
Email string `json:"email,omitempty"`
|
||||
Roles []string `json:"roles,omitempty"`
|
||||
Scopes []string `json:"scopes,omitempty"`
|
||||
}
|
||||
|
||||
// Check validates the Bearer token from the Authorization header
|
||||
// and returns the authenticated user info.
|
||||
func (h *Validate) Check(w http.ResponseWriter, r *http.Request) error {
|
||||
token := auth.ExtractBearerToken(r)
|
||||
if token == "" {
|
||||
return httperror.Unauthorized("missing bearer token")
|
||||
}
|
||||
|
||||
user, err := h.validator.Validate(r.Context(), token)
|
||||
if err != nil {
|
||||
h.logger.Debug("token validation failed", "error", err)
|
||||
return httperror.Unauthorized("invalid token")
|
||||
}
|
||||
|
||||
httpresponse.OK(w, r, ValidateResponse{
|
||||
UserID: user.ID,
|
||||
Email: user.Email,
|
||||
Roles: user.Roles,
|
||||
Scopes: user.Scopes,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
107
services/auth-svc/internal/api/handlers/validate_test.go
Normal file
107
services/auth-svc/internal/api/handlers/validate_test.go
Normal file
@ -0,0 +1,107 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/pkg/auth"
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/pkg/logging"
|
||||
)
|
||||
|
||||
// mockValidator implements auth.Validator for testing.
|
||||
type mockValidator struct {
|
||||
user *auth.User
|
||||
err error
|
||||
}
|
||||
|
||||
func (m *mockValidator) Validate(ctx context.Context, token string) (*auth.User, error) {
|
||||
if m.err != nil {
|
||||
return nil, m.err
|
||||
}
|
||||
return m.user, nil
|
||||
}
|
||||
|
||||
func TestValidate_Check(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
token string
|
||||
validator *mockValidator
|
||||
wantStatus int
|
||||
wantUserID string
|
||||
}{
|
||||
{
|
||||
name: "valid token",
|
||||
token: "valid-jwt-token",
|
||||
validator: &mockValidator{
|
||||
user: &auth.User{
|
||||
ID: "user-123",
|
||||
Email: "user@example.com",
|
||||
Roles: []string{"admin"},
|
||||
},
|
||||
},
|
||||
wantStatus: http.StatusOK,
|
||||
wantUserID: "user-123",
|
||||
},
|
||||
{
|
||||
name: "missing token",
|
||||
token: "",
|
||||
validator: &mockValidator{},
|
||||
wantStatus: http.StatusUnauthorized,
|
||||
},
|
||||
{
|
||||
name: "invalid token",
|
||||
token: "bad-token",
|
||||
validator: &mockValidator{
|
||||
err: errors.New("token invalid"),
|
||||
},
|
||||
wantStatus: http.StatusUnauthorized,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
handler := NewValidate(tt.validator, logging.Nop())
|
||||
|
||||
req := httptest.NewRequest(http.MethodPost, "/api/auth-svc/validate", nil)
|
||||
if tt.token != "" {
|
||||
req.Header.Set("Authorization", "Bearer "+tt.token)
|
||||
}
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
err := handler.Check(w, req)
|
||||
if err != nil {
|
||||
// Handler returns error for app.Wrap to handle
|
||||
if tt.wantStatus == http.StatusOK {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if tt.wantStatus != http.StatusOK {
|
||||
t.Fatalf("expected error but got nil")
|
||||
}
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected status %d, got %d", http.StatusOK, w.Code)
|
||||
}
|
||||
|
||||
var resp map[string]any
|
||||
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
|
||||
t.Fatalf("failed to decode response: %v", err)
|
||||
}
|
||||
|
||||
data, ok := resp["data"].(map[string]any)
|
||||
if !ok {
|
||||
t.Fatal("expected 'data' field in response")
|
||||
}
|
||||
|
||||
if data["user_id"] != tt.wantUserID {
|
||||
t.Errorf("expected user_id %q, got %q", tt.wantUserID, data["user_id"])
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -18,9 +18,16 @@ func RegisterRoutes(application *app.App, exampleService *service.ExampleService
|
||||
logger := application.Logger()
|
||||
cfg := config.Load()
|
||||
|
||||
// Create JWT validator (shared between middleware and /validate endpoint)
|
||||
jwtValidator := auth.NewJWTValidator(auth.JWTConfig{
|
||||
Secret: []byte(cfg.JWTSecret),
|
||||
Issuer: "sp4-v2-1770499323",
|
||||
})
|
||||
|
||||
// Initialize handlers with injected services
|
||||
healthHandler := handlers.NewHealth(logger)
|
||||
exampleHandler := handlers.NewExample(exampleService, logger)
|
||||
validateHandler := handlers.NewValidate(jwtValidator, logger)
|
||||
|
||||
// Build and mount OpenAPI spec
|
||||
spec := NewServiceSpec()
|
||||
@ -31,6 +38,9 @@ func RegisterRoutes(application *app.App, exampleService *service.ExampleService
|
||||
application.Route("/api/auth-svc", func(r app.Router) {
|
||||
r.Get("/health", healthHandler.Check)
|
||||
|
||||
// Token validation endpoint (called by sibling services)
|
||||
r.Post("/validate", app.Wrap(validateHandler.Check))
|
||||
|
||||
// Public routes (no auth required)
|
||||
r.Get("/examples", app.Wrap(exampleHandler.List))
|
||||
r.Get("/examples/{id}", app.Wrap(exampleHandler.Get))
|
||||
@ -39,10 +49,7 @@ func RegisterRoutes(application *app.App, exampleService *service.ExampleService
|
||||
r.Group(func(r app.Router) {
|
||||
if cfg.AuthEnabled {
|
||||
r.Use(auth.Middleware(auth.MiddlewareConfig{
|
||||
Validator: auth.NewJWTValidator(auth.JWTConfig{
|
||||
Secret: []byte(cfg.JWTSecret),
|
||||
Issuer: "sp4-v2-1770499323",
|
||||
}),
|
||||
Validator: jwtValidator,
|
||||
}))
|
||||
}
|
||||
|
||||
|
||||
@ -8,6 +8,7 @@ func NewServiceSpec() *openapi.OpenAPISpec {
|
||||
WithDescription("REST API for the auth-svc service").
|
||||
WithBearerSecurity("bearer", "JWT authentication token").
|
||||
WithTag("Health", "Service health endpoints").
|
||||
WithTag("Auth", "Authentication endpoints").
|
||||
WithTag("Examples", "Example CRUD endpoints")
|
||||
|
||||
// Define reusable schemas
|
||||
@ -24,6 +25,13 @@ func NewServiceSpec() *openapi.OpenAPISpec {
|
||||
"description": openapi.StringWithMinMax(0, 500).WithDescription("Optional description"),
|
||||
}, "name"))
|
||||
|
||||
spec.WithSchema("ValidateResponse", openapi.Object(map[string]openapi.Schema{
|
||||
"user_id": openapi.String().WithDescription("Authenticated user ID"),
|
||||
"email": openapi.String().WithDescription("User email"),
|
||||
"roles": openapi.Array(openapi.String()).WithDescription("User roles"),
|
||||
"scopes": openapi.Array(openapi.String()).WithDescription("User scopes"),
|
||||
}, "user_id"))
|
||||
|
||||
spec.WithSchema("UpdateExampleRequest", openapi.Object(map[string]openapi.Schema{
|
||||
"name": openapi.StringWithMinMax(1, 100).WithDescription("Updated name"),
|
||||
"description": openapi.StringWithMinMax(0, 500).WithDescription("Updated description"),
|
||||
@ -41,6 +49,18 @@ func NewServiceSpec() *openapi.OpenAPISpec {
|
||||
},
|
||||
})
|
||||
|
||||
// Validate token
|
||||
spec.AddPath("/api/auth-svc/validate", "post", map[string]any{
|
||||
"summary": "Validate token",
|
||||
"description": "Validates a Bearer token and returns user info. Used by sibling services for token validation.",
|
||||
"tags": []string{"Auth"},
|
||||
"security": []map[string][]string{{"bearer": {}}},
|
||||
"responses": map[string]any{
|
||||
"200": openapi.OpResponse("Token is valid", openapi.ResponseSchema(openapi.Ref("ValidateResponse"))),
|
||||
"401": openapi.OpResponse("Invalid or missing token", openapi.ErrorResponseSchema()),
|
||||
},
|
||||
})
|
||||
|
||||
// List examples
|
||||
spec.AddPath("/api/auth-svc/examples", "get", map[string]any{
|
||||
"summary": "List examples",
|
||||
|
||||
@ -2,10 +2,18 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/pkg/app"
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/pkg/database"
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/pkg/logging"
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/pkg/queue"
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/services/chat-svc/internal/adapter/authclient"
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/services/chat-svc/internal/adapter/jobqueue"
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/services/chat-svc/internal/adapter/memory"
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/services/chat-svc/internal/api"
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/services/chat-svc/internal/config"
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/services/chat-svc/internal/port"
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/services/chat-svc/internal/service"
|
||||
)
|
||||
|
||||
@ -13,9 +21,41 @@ func main() {
|
||||
// Create logger
|
||||
logger := logging.Default()
|
||||
|
||||
// Load config
|
||||
cfg := config.Load()
|
||||
|
||||
// Create adapters (repositories)
|
||||
exampleRepo := memory.NewExampleRepository()
|
||||
|
||||
// Create adapters (inter-service auth client)
|
||||
var authValidator port.AuthValidator
|
||||
authClient, err := authclient.New()
|
||||
if err != nil {
|
||||
logger.Warn("auth-svc client not configured, token validation disabled", "error", err)
|
||||
} else {
|
||||
authValidator = authClient
|
||||
}
|
||||
|
||||
// Create adapters (job queue producer)
|
||||
var taskProducer port.TaskProducer
|
||||
if cfg.Database.URL != "" {
|
||||
pool, err := database.Connect(context.Background(), cfg.Database.URL, database.Options{
|
||||
MaxOpenConns: cfg.Database.MaxOpenConns,
|
||||
MaxIdleConns: cfg.Database.MaxIdleConns,
|
||||
ConnMaxLifetime: cfg.Database.ConnMaxLifetime,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Warn("database not available, task queue disabled", "error", err)
|
||||
} else {
|
||||
defer pool.Close()
|
||||
jobQueue := queue.NewPostgresQueue(pool.DB, logger)
|
||||
taskProducer = jobqueue.New(jobQueue)
|
||||
logger.Info("task queue enabled")
|
||||
}
|
||||
} else {
|
||||
logger.Warn("DATABASE_URL not configured, task queue disabled")
|
||||
}
|
||||
|
||||
// Create services (business logic)
|
||||
exampleService := service.NewExampleService(exampleRepo, logger)
|
||||
|
||||
@ -23,7 +63,7 @@ func main() {
|
||||
application := app.New("chat-svc", app.WithDefaultPort(8001))
|
||||
|
||||
// Register routes with dependency injection
|
||||
api.RegisterRoutes(application, exampleService)
|
||||
api.RegisterRoutes(application, exampleService, authValidator, taskProducer)
|
||||
|
||||
// Start server
|
||||
application.Run()
|
||||
|
||||
68
services/chat-svc/internal/adapter/authclient/client.go
Normal file
68
services/chat-svc/internal/adapter/authclient/client.go
Normal file
@ -0,0 +1,68 @@
|
||||
// Package authclient provides an adapter for validating tokens via the auth-svc.
|
||||
package authclient
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/pkg/httpclient"
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/pkg/svc"
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/services/chat-svc/internal/port"
|
||||
)
|
||||
|
||||
// compile-time check
|
||||
var _ port.AuthValidator = (*Client)(nil)
|
||||
|
||||
// validateEnvelope matches the auth-svc response envelope: {data: ValidateResponse, meta: ...}
|
||||
type validateEnvelope struct {
|
||||
Data port.AuthUser `json:"data"`
|
||||
}
|
||||
|
||||
// Client validates tokens by calling auth-svc's /validate endpoint.
|
||||
type Client struct {
|
||||
baseURL string
|
||||
httpClient *httpclient.Client
|
||||
}
|
||||
|
||||
// New creates a new auth-svc client adapter.
|
||||
// Returns an error if AUTH_SVC_URL is not configured.
|
||||
func New() (*Client, error) {
|
||||
baseURL := svc.ServiceURL("auth-svc")
|
||||
if baseURL == "" {
|
||||
return nil, fmt.Errorf("auth-svc not configured (missing AUTH_SVC_URL env var)")
|
||||
}
|
||||
|
||||
httpClient := httpclient.New(httpclient.Config{
|
||||
MaxRetries: 3,
|
||||
CircuitBreaker: httpclient.NewCircuitBreaker(httpclient.DefaultCircuitBreakerConfig()),
|
||||
})
|
||||
|
||||
return &Client{
|
||||
baseURL: baseURL,
|
||||
httpClient: httpClient,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ValidateToken calls auth-svc's /validate endpoint with the given Bearer token.
|
||||
func (c *Client) ValidateToken(ctx context.Context, token string) (*port.AuthUser, error) {
|
||||
url := c.baseURL + "/api/auth-svc/validate"
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create validate request: %w", err)
|
||||
}
|
||||
req.Header.Set("Authorization", "Bearer "+token)
|
||||
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("auth-svc request failed: %w", err)
|
||||
}
|
||||
|
||||
envelope, err := svc.DecodeResponse[validateEnvelope](resp)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("auth-svc validation failed: %w", err)
|
||||
}
|
||||
|
||||
return &envelope.Data, nil
|
||||
}
|
||||
37
services/chat-svc/internal/adapter/jobqueue/producer.go
Normal file
37
services/chat-svc/internal/adapter/jobqueue/producer.go
Normal file
@ -0,0 +1,37 @@
|
||||
// Package jobqueue provides an adapter for enqueuing async tasks.
|
||||
package jobqueue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/pkg/queue"
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/services/chat-svc/internal/port"
|
||||
)
|
||||
|
||||
// compile-time check
|
||||
var _ port.TaskProducer = (*Producer)(nil)
|
||||
|
||||
// Producer enqueues chat tasks to the job queue.
|
||||
type Producer struct {
|
||||
queue queue.Producer
|
||||
}
|
||||
|
||||
// New creates a new task producer.
|
||||
func New(q queue.Producer) *Producer {
|
||||
return &Producer{queue: q}
|
||||
}
|
||||
|
||||
// EnqueueTask enqueues a chat_task job with the given action and payload.
|
||||
func (p *Producer) EnqueueTask(ctx context.Context, action string, payload map[string]any) (string, error) {
|
||||
if payload == nil {
|
||||
payload = make(map[string]any)
|
||||
}
|
||||
payload["action"] = action
|
||||
|
||||
jobID, err := p.queue.Enqueue(ctx, "chat_task", payload)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("enqueue chat task: %w", err)
|
||||
}
|
||||
return jobID, nil
|
||||
}
|
||||
54
services/chat-svc/internal/api/handlers/task.go
Normal file
54
services/chat-svc/internal/api/handlers/task.go
Normal file
@ -0,0 +1,54 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/pkg/app"
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/pkg/httperror"
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/pkg/httpresponse"
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/pkg/logging"
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/services/chat-svc/internal/port"
|
||||
)
|
||||
|
||||
// Task handles HTTP requests for enqueuing async tasks.
|
||||
type Task struct {
|
||||
producer port.TaskProducer
|
||||
logger *logging.Logger
|
||||
}
|
||||
|
||||
// NewTask creates a new Task handler with injected dependencies.
|
||||
func NewTask(producer port.TaskProducer, logger *logging.Logger) *Task {
|
||||
return &Task{
|
||||
producer: producer,
|
||||
logger: logger.WithComponent("TaskHandler"),
|
||||
}
|
||||
}
|
||||
|
||||
// EnqueueRequest is the request body for enqueuing a task.
|
||||
type EnqueueRequest struct {
|
||||
Action string `json:"action" validate:"required"`
|
||||
Payload map[string]any `json:"payload"`
|
||||
}
|
||||
|
||||
// EnqueueResponse is the response body for a successfully enqueued task.
|
||||
type EnqueueResponse struct {
|
||||
JobID string `json:"job_id"`
|
||||
}
|
||||
|
||||
// Enqueue creates an async task for background processing.
|
||||
func (h *Task) Enqueue(w http.ResponseWriter, r *http.Request) error {
|
||||
var req EnqueueRequest
|
||||
if err := app.BindAndValidate(r, &req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
jobID, err := h.producer.EnqueueTask(r.Context(), req.Action, req.Payload)
|
||||
if err != nil {
|
||||
h.logger.Error("failed to enqueue task", "action", req.Action, "error", err)
|
||||
return httperror.Internal("failed to enqueue task")
|
||||
}
|
||||
|
||||
h.logger.Info("task enqueued", "job_id", jobID, "action", req.Action)
|
||||
httpresponse.Accepted(w, r, EnqueueResponse{JobID: jobID})
|
||||
return nil
|
||||
}
|
||||
117
services/chat-svc/internal/api/handlers/task_test.go
Normal file
117
services/chat-svc/internal/api/handlers/task_test.go
Normal file
@ -0,0 +1,117 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/pkg/logging"
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/services/chat-svc/internal/port"
|
||||
)
|
||||
|
||||
// mockTaskProducer implements port.TaskProducer for testing.
|
||||
type mockTaskProducer struct {
|
||||
lastAction string
|
||||
lastPayload map[string]any
|
||||
jobID string
|
||||
err error
|
||||
}
|
||||
|
||||
var _ port.TaskProducer = (*mockTaskProducer)(nil)
|
||||
|
||||
func (m *mockTaskProducer) EnqueueTask(ctx context.Context, action string, payload map[string]any) (string, error) {
|
||||
m.lastAction = action
|
||||
m.lastPayload = payload
|
||||
if m.err != nil {
|
||||
return "", m.err
|
||||
}
|
||||
return m.jobID, nil
|
||||
}
|
||||
|
||||
func TestTask_Enqueue(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
body any
|
||||
producer *mockTaskProducer
|
||||
wantStatus int
|
||||
wantJobID string
|
||||
}{
|
||||
{
|
||||
name: "valid request",
|
||||
body: EnqueueRequest{
|
||||
Action: "send_notification",
|
||||
Payload: map[string]any{"user_id": "user-123", "message": "Hello"},
|
||||
},
|
||||
producer: &mockTaskProducer{jobID: "job-456"},
|
||||
wantStatus: http.StatusAccepted,
|
||||
wantJobID: "job-456",
|
||||
},
|
||||
{
|
||||
name: "empty body",
|
||||
body: nil,
|
||||
producer: &mockTaskProducer{},
|
||||
wantStatus: http.StatusBadRequest,
|
||||
},
|
||||
{
|
||||
name: "producer error",
|
||||
body: EnqueueRequest{
|
||||
Action: "send_notification",
|
||||
},
|
||||
producer: &mockTaskProducer{err: errors.New("queue unavailable")},
|
||||
wantStatus: http.StatusInternalServerError,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
handler := NewTask(tt.producer, logging.Nop())
|
||||
|
||||
var body []byte
|
||||
if tt.body != nil {
|
||||
var err error
|
||||
body, err = json.Marshal(tt.body)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to marshal body: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
req := httptest.NewRequest(http.MethodPost, "/api/chat-svc/tasks", bytes.NewReader(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
err := handler.Enqueue(w, req)
|
||||
if err != nil {
|
||||
if tt.wantStatus == http.StatusAccepted {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if tt.wantStatus != http.StatusAccepted {
|
||||
t.Fatalf("expected error but got nil")
|
||||
}
|
||||
|
||||
if w.Code != http.StatusAccepted {
|
||||
t.Errorf("expected status %d, got %d", http.StatusAccepted, w.Code)
|
||||
}
|
||||
|
||||
var resp map[string]any
|
||||
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
|
||||
t.Fatalf("failed to decode response: %v", err)
|
||||
}
|
||||
|
||||
data, ok := resp["data"].(map[string]any)
|
||||
if !ok {
|
||||
t.Fatal("expected 'data' field in response")
|
||||
}
|
||||
|
||||
if data["job_id"] != tt.wantJobID {
|
||||
t.Errorf("expected job_id %q, got %q", tt.wantJobID, data["job_id"])
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -2,19 +2,19 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/pkg/app"
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/pkg/auth"
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/services/chat-svc/internal/api/handlers"
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/services/chat-svc/internal/config"
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/services/chat-svc/internal/port"
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/services/chat-svc/internal/service"
|
||||
)
|
||||
|
||||
// RegisterRoutes registers all HTTP routes for the service.
|
||||
// Routes are mounted under /api/chat-svc to match the ingress path routing.
|
||||
// This allows the monorepo to expose multiple services under a single domain:
|
||||
// - https://domain/api/chat-svc/health
|
||||
// - https://domain/api/chat-svc/examples
|
||||
func RegisterRoutes(application *app.App, exampleService *service.ExampleService) {
|
||||
func RegisterRoutes(application *app.App, exampleService *service.ExampleService, authValidator port.AuthValidator, taskProducer port.TaskProducer) {
|
||||
logger := application.Logger()
|
||||
cfg := config.Load()
|
||||
|
||||
@ -38,17 +38,50 @@ func RegisterRoutes(application *app.App, exampleService *service.ExampleService
|
||||
// Protected routes (auth required when enabled)
|
||||
r.Group(func(r app.Router) {
|
||||
if cfg.AuthEnabled {
|
||||
r.Use(auth.Middleware(auth.MiddlewareConfig{
|
||||
Validator: auth.NewJWTValidator(auth.JWTConfig{
|
||||
// Use auth-svc for token validation when available,
|
||||
// otherwise fall back to local JWT validation.
|
||||
var validator auth.Validator
|
||||
if authValidator != nil {
|
||||
validator = &authSvcValidator{client: authValidator}
|
||||
} else {
|
||||
validator = auth.NewJWTValidator(auth.JWTConfig{
|
||||
Secret: []byte(cfg.JWTSecret),
|
||||
Issuer: "sp4-v2-1770499323",
|
||||
}),
|
||||
})
|
||||
}
|
||||
r.Use(auth.Middleware(auth.MiddlewareConfig{
|
||||
Validator: validator,
|
||||
}))
|
||||
}
|
||||
|
||||
r.Post("/examples", app.Wrap(exampleHandler.Create))
|
||||
r.Put("/examples/{id}", app.Wrap(exampleHandler.Update))
|
||||
r.Delete("/examples/{id}", app.Wrap(exampleHandler.Delete))
|
||||
|
||||
// Task enqueue endpoint (requires auth, requires queue)
|
||||
if taskProducer != nil {
|
||||
taskHandler := handlers.NewTask(taskProducer, logger)
|
||||
r.Post("/tasks", app.Wrap(taskHandler.Enqueue))
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// authSvcValidator adapts port.AuthValidator to auth.Validator,
|
||||
// allowing the auth-svc HTTP client to be used with auth.Middleware.
|
||||
type authSvcValidator struct {
|
||||
client port.AuthValidator
|
||||
}
|
||||
|
||||
func (v *authSvcValidator) Validate(ctx context.Context, token string) (*auth.User, error) {
|
||||
user, err := v.client.ValidateToken(ctx, token)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &auth.User{
|
||||
ID: user.UserID,
|
||||
Email: user.Email,
|
||||
Roles: user.Roles,
|
||||
Scopes: user.Scopes,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -8,7 +8,8 @@ func NewServiceSpec() *openapi.OpenAPISpec {
|
||||
WithDescription("REST API for the chat-svc service").
|
||||
WithBearerSecurity("bearer", "JWT authentication token").
|
||||
WithTag("Health", "Service health endpoints").
|
||||
WithTag("Examples", "Example CRUD endpoints")
|
||||
WithTag("Examples", "Example CRUD endpoints").
|
||||
WithTag("Tasks", "Async task endpoints")
|
||||
|
||||
// Define reusable schemas
|
||||
spec.WithSchema("Example", openapi.Object(map[string]openapi.Schema{
|
||||
@ -29,6 +30,15 @@ func NewServiceSpec() *openapi.OpenAPISpec {
|
||||
"description": openapi.StringWithMinMax(0, 500).WithDescription("Updated description"),
|
||||
}))
|
||||
|
||||
spec.WithSchema("EnqueueRequest", openapi.Object(map[string]openapi.Schema{
|
||||
"action": openapi.StringWithMinMax(1, 100).WithDescription("Task action to perform"),
|
||||
"payload": openapi.Object(map[string]openapi.Schema{}).WithDescription("Arbitrary task payload"),
|
||||
}, "action"))
|
||||
|
||||
spec.WithSchema("EnqueueResponse", openapi.Object(map[string]openapi.Schema{
|
||||
"job_id": openapi.String().WithDescription("ID of the enqueued job"),
|
||||
}, "job_id"))
|
||||
|
||||
// Health
|
||||
spec.AddPath("/api/chat-svc/health", "get", map[string]any{
|
||||
"summary": "Health check",
|
||||
@ -108,5 +118,20 @@ func NewServiceSpec() *openapi.OpenAPISpec {
|
||||
},
|
||||
})
|
||||
|
||||
// Enqueue task
|
||||
spec.AddPath("/api/chat-svc/tasks", "post", map[string]any{
|
||||
"summary": "Enqueue task",
|
||||
"description": "Enqueues an async task for background processing. Requires authentication.",
|
||||
"tags": []string{"Tasks"},
|
||||
"security": []map[string][]string{{"bearer": {}}},
|
||||
"requestBody": openapi.RequestBody(openapi.Ref("EnqueueRequest"), true),
|
||||
"responses": map[string]any{
|
||||
"202": openapi.OpResponse("Task accepted", openapi.ResponseSchema(openapi.Ref("EnqueueResponse"))),
|
||||
"400": openapi.OpResponse("Bad request", openapi.ErrorResponseSchema()),
|
||||
"401": openapi.OpResponse("Unauthorized", openapi.ErrorResponseSchema()),
|
||||
"500": openapi.OpResponse("Internal error", openapi.ErrorResponseSchema()),
|
||||
},
|
||||
})
|
||||
|
||||
return spec
|
||||
}
|
||||
|
||||
18
services/chat-svc/internal/port/auth.go
Normal file
18
services/chat-svc/internal/port/auth.go
Normal file
@ -0,0 +1,18 @@
|
||||
package port
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
// AuthUser represents a validated user from the auth service.
|
||||
type AuthUser struct {
|
||||
UserID string `json:"user_id"`
|
||||
Email string `json:"email,omitempty"`
|
||||
Roles []string `json:"roles,omitempty"`
|
||||
Scopes []string `json:"scopes,omitempty"`
|
||||
}
|
||||
|
||||
// AuthValidator validates tokens against the auth service.
|
||||
type AuthValidator interface {
|
||||
ValidateToken(ctx context.Context, token string) (*AuthUser, error)
|
||||
}
|
||||
10
services/chat-svc/internal/port/queue.go
Normal file
10
services/chat-svc/internal/port/queue.go
Normal file
@ -0,0 +1,10 @@
|
||||
package port
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
// TaskProducer enqueues async tasks for background processing.
|
||||
type TaskProducer interface {
|
||||
EnqueueTask(ctx context.Context, action string, payload map[string]any) (jobID string, err error)
|
||||
}
|
||||
@ -75,9 +75,8 @@ func main() {
|
||||
})
|
||||
|
||||
// Register job handlers
|
||||
// TODO: Register your job handlers here
|
||||
// handler.RegisterHandler("send_email", emailHandler)
|
||||
// handler.RegisterHandler("process_image", imageHandler)
|
||||
chatTaskHandler := handlers.NewChatTaskHandler(logger)
|
||||
handler.RegisterHandler("chat_task", chatTaskHandler.Handle)
|
||||
|
||||
// Setup signal handling
|
||||
sigCh := make(chan os.Signal, 1)
|
||||
|
||||
71
workers/worker-svc/internal/handlers/chat_task.go
Normal file
71
workers/worker-svc/internal/handlers/chat_task.go
Normal file
@ -0,0 +1,71 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/pkg/logging"
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/pkg/queue"
|
||||
)
|
||||
|
||||
// ChatTaskHandler processes chat_task jobs from the queue.
|
||||
type ChatTaskHandler struct {
|
||||
logger *logging.Logger
|
||||
}
|
||||
|
||||
// NewChatTaskHandler creates a new ChatTaskHandler.
|
||||
func NewChatTaskHandler(logger *logging.Logger) *ChatTaskHandler {
|
||||
return &ChatTaskHandler{
|
||||
logger: logger.WithComponent("chat_task"),
|
||||
}
|
||||
}
|
||||
|
||||
// Handle processes a single chat_task job.
|
||||
func (h *ChatTaskHandler) Handle(ctx context.Context, job *queue.Job) error {
|
||||
h.logger.Info("processing chat task",
|
||||
"job_id", job.ID,
|
||||
"payload", job.Payload,
|
||||
)
|
||||
|
||||
action, _ := job.Payload["action"].(string)
|
||||
if action == "" {
|
||||
return fmt.Errorf("missing required payload field: action")
|
||||
}
|
||||
|
||||
switch action {
|
||||
case "send_notification":
|
||||
return h.handleSendNotification(ctx, job)
|
||||
case "process_message":
|
||||
return h.handleProcessMessage(ctx, job)
|
||||
default:
|
||||
return fmt.Errorf("unknown chat task action: %s", action)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *ChatTaskHandler) handleSendNotification(ctx context.Context, job *queue.Job) error {
|
||||
userID, _ := job.Payload["user_id"].(string)
|
||||
message, _ := job.Payload["message"].(string)
|
||||
|
||||
h.logger.Info("sending notification",
|
||||
"job_id", job.ID,
|
||||
"user_id", userID,
|
||||
"message", message,
|
||||
)
|
||||
|
||||
// TODO: Implement actual notification delivery
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *ChatTaskHandler) handleProcessMessage(ctx context.Context, job *queue.Job) error {
|
||||
channelID, _ := job.Payload["channel_id"].(string)
|
||||
content, _ := job.Payload["content"].(string)
|
||||
|
||||
h.logger.Info("processing message",
|
||||
"job_id", job.ID,
|
||||
"channel_id", channelID,
|
||||
"content_length", len(content),
|
||||
)
|
||||
|
||||
// TODO: Implement actual message processing
|
||||
return nil
|
||||
}
|
||||
65
workers/worker-svc/internal/handlers/chat_task_test.go
Normal file
65
workers/worker-svc/internal/handlers/chat_task_test.go
Normal file
@ -0,0 +1,65 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/pkg/logging"
|
||||
"git.threesix.ai/jordan/sp4-v2-1770499323/pkg/queue"
|
||||
)
|
||||
|
||||
func TestChatTaskHandler_Handle(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
payload map[string]any
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "send_notification action",
|
||||
payload: map[string]any{
|
||||
"action": "send_notification",
|
||||
"user_id": "user-123",
|
||||
"message": "Hello!",
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "process_message action",
|
||||
payload: map[string]any{
|
||||
"action": "process_message",
|
||||
"channel_id": "chan-456",
|
||||
"content": "Some message content",
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "missing action",
|
||||
payload: map[string]any{},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "unknown action",
|
||||
payload: map[string]any{
|
||||
"action": "unknown_action",
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
handler := NewChatTaskHandler(logging.Nop())
|
||||
|
||||
job := &queue.Job{
|
||||
ID: "job-test-123",
|
||||
Type: "chat_task",
|
||||
Payload: tt.payload,
|
||||
}
|
||||
|
||||
err := handler.Handle(context.Background(), job)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("Handle() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user