feat: implement mesh-interop service communication
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful

Add auth-svc /validate endpoint for token checking
Add chat-svc with auth client and Redis task queue
Add worker-svc chat handler for task processing

Co-Authored-By: Claude Code <claude@anthropic.com>
This commit is contained in:
rdev-worker 2026-02-07 16:45:22 +00:00
parent 927537046a
commit 5a877ca1a1
21 changed files with 1158 additions and 24 deletions

View File

@ -0,0 +1,54 @@
package handlers
import (
"net/http"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/auth"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/httperror"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/httpresponse"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/logging"
)
// Validate handles token validation requests from sibling services.
type Validate struct {
validator *auth.JWTValidator
logger *logging.Logger
}
// NewValidate creates a new Validate handler.
func NewValidate(validator *auth.JWTValidator, logger *logging.Logger) *Validate {
return &Validate{
validator: validator,
logger: logger.WithComponent("ValidateHandler"),
}
}
// ValidateResponse is returned on successful token validation.
type ValidateResponse struct {
UserID string `json:"user_id"`
Email string `json:"email,omitempty"`
Roles []string `json:"roles,omitempty"`
Scopes []string `json:"scopes,omitempty"`
}
// Check validates the Bearer token from the Authorization header.
func (h *Validate) Check(w http.ResponseWriter, r *http.Request) error {
token := auth.ExtractBearerToken(r)
if token == "" {
return httperror.Unauthorized("missing authorization token")
}
user, err := h.validator.Validate(r.Context(), token)
if err != nil {
h.logger.Debug("token validation failed", "error", err)
return httperror.Unauthorized("invalid token")
}
httpresponse.OK(w, r, ValidateResponse{
UserID: user.ID,
Email: user.Email,
Roles: user.Roles,
Scopes: user.Scopes,
})
return nil
}

View File

@ -0,0 +1,168 @@
package handlers
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/go-chi/chi/v5"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/app"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/auth"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/logging"
)
func newTestValidateHandler() *Validate {
validator := auth.NewJWTValidator(auth.JWTConfig{
Secret: []byte("test-secret"),
Issuer: "sp4-debug-1770477266",
})
return NewValidate(validator, logging.Nop())
}
func generateTestToken(t *testing.T, user *auth.User) string {
t.Helper()
token, err := auth.GenerateTokenWithIssuer(
[]byte("test-secret"),
user,
time.Hour,
"sp4-debug-1770477266",
"",
)
if err != nil {
t.Fatalf("failed to generate token: %v", err)
}
return token
}
func TestValidate_Check_ValidToken(t *testing.T) {
handler := newTestValidateHandler()
user := &auth.User{
ID: "user-123",
Email: "test@example.com",
Roles: []string{"admin"},
Scopes: []string{"read", "write"},
}
token := generateTestToken(t, user)
r := chi.NewRouter()
r.Post("/api/auth-svc/validate", app.Wrap(handler.Check))
req := httptest.NewRequest(http.MethodPost, "/api/auth-svc/validate", nil)
req.Header.Set("Authorization", "Bearer "+token)
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Errorf("expected status 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]any
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
t.Fatalf("failed to decode response: %v", err)
}
data, ok := resp["data"].(map[string]any)
if !ok {
t.Fatal("expected 'data' field in response")
}
if data["user_id"] != "user-123" {
t.Errorf("expected user_id 'user-123', got %v", data["user_id"])
}
if data["email"] != "test@example.com" {
t.Errorf("expected email 'test@example.com', got %v", data["email"])
}
}
func TestValidate_Check_MissingToken(t *testing.T) {
handler := newTestValidateHandler()
r := chi.NewRouter()
r.Post("/api/auth-svc/validate", app.Wrap(handler.Check))
req := httptest.NewRequest(http.MethodPost, "/api/auth-svc/validate", nil)
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusUnauthorized {
t.Errorf("expected status 401, got %d: %s", w.Code, w.Body.String())
}
}
func TestValidate_Check_InvalidToken(t *testing.T) {
handler := newTestValidateHandler()
r := chi.NewRouter()
r.Post("/api/auth-svc/validate", app.Wrap(handler.Check))
req := httptest.NewRequest(http.MethodPost, "/api/auth-svc/validate", nil)
req.Header.Set("Authorization", "Bearer invalid-token")
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusUnauthorized {
t.Errorf("expected status 401, got %d: %s", w.Code, w.Body.String())
}
}
func TestValidate_Check_ExpiredToken(t *testing.T) {
handler := newTestValidateHandler()
user := &auth.User{ID: "user-123", Email: "test@example.com"}
// Generate token that's already expired
token, err := auth.GenerateTokenWithIssuer(
[]byte("test-secret"),
user,
-time.Hour, // negative duration = expired
"sp4-debug-1770477266",
"",
)
if err != nil {
t.Fatalf("failed to generate token: %v", err)
}
r := chi.NewRouter()
r.Post("/api/auth-svc/validate", app.Wrap(handler.Check))
req := httptest.NewRequest(http.MethodPost, "/api/auth-svc/validate", nil)
req.Header.Set("Authorization", "Bearer "+token)
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusUnauthorized {
t.Errorf("expected status 401, got %d: %s", w.Code, w.Body.String())
}
}
func TestValidate_Check_WrongSecret(t *testing.T) {
handler := newTestValidateHandler()
user := &auth.User{ID: "user-123"}
// Generate token with different secret
token, err := auth.GenerateTokenWithIssuer(
[]byte("wrong-secret"),
user,
time.Hour,
"sp4-debug-1770477266",
"",
)
if err != nil {
t.Fatalf("failed to generate token: %v", err)
}
r := chi.NewRouter()
r.Post("/api/auth-svc/validate", app.Wrap(handler.Check))
req := httptest.NewRequest(http.MethodPost, "/api/auth-svc/validate", nil)
req.Header.Set("Authorization", "Bearer "+token)
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusUnauthorized {
t.Errorf("expected status 401, got %d: %s", w.Code, w.Body.String())
}
}

View File

@ -22,6 +22,13 @@ func RegisterRoutes(application *app.App, exampleService *service.ExampleService
healthHandler := handlers.NewHealth(logger) healthHandler := handlers.NewHealth(logger)
exampleHandler := handlers.NewExample(exampleService, logger) exampleHandler := handlers.NewExample(exampleService, logger)
// Token validation handler for inter-service auth
jwtValidator := auth.NewJWTValidator(auth.JWTConfig{
Secret: []byte(cfg.JWTSecret),
Issuer: "sp4-debug-1770477266",
})
validateHandler := handlers.NewValidate(jwtValidator, logger)
// Build and mount OpenAPI spec // Build and mount OpenAPI spec
spec := NewServiceSpec() spec := NewServiceSpec()
application.EnableDocs(spec) application.EnableDocs(spec)
@ -31,6 +38,9 @@ func RegisterRoutes(application *app.App, exampleService *service.ExampleService
application.Route("/api/auth-svc", func(r app.Router) { application.Route("/api/auth-svc", func(r app.Router) {
r.Get("/health", healthHandler.Check) r.Get("/health", healthHandler.Check)
// Token validation endpoint for sibling services
r.Post("/validate", app.Wrap(validateHandler.Check))
// Public routes (no auth required) // Public routes (no auth required)
r.Get("/examples", app.Wrap(exampleHandler.List)) r.Get("/examples", app.Wrap(exampleHandler.List))
r.Get("/examples/{id}", app.Wrap(exampleHandler.Get)) r.Get("/examples/{id}", app.Wrap(exampleHandler.Get))

View File

@ -8,6 +8,7 @@ func NewServiceSpec() *openapi.OpenAPISpec {
WithDescription("REST API for the auth-svc service"). WithDescription("REST API for the auth-svc service").
WithBearerSecurity("bearer", "JWT authentication token"). WithBearerSecurity("bearer", "JWT authentication token").
WithTag("Health", "Service health endpoints"). WithTag("Health", "Service health endpoints").
WithTag("Auth", "Authentication endpoints").
WithTag("Examples", "Example CRUD endpoints") WithTag("Examples", "Example CRUD endpoints")
// Define reusable schemas // Define reusable schemas
@ -29,6 +30,13 @@ func NewServiceSpec() *openapi.OpenAPISpec {
"description": openapi.StringWithMinMax(0, 500).WithDescription("Updated description"), "description": openapi.StringWithMinMax(0, 500).WithDescription("Updated description"),
})) }))
spec.WithSchema("ValidateResponse", openapi.Object(map[string]openapi.Schema{
"user_id": openapi.String().WithDescription("User identifier"),
"email": openapi.String().WithDescription("User email"),
"roles": openapi.Array(openapi.String()).WithDescription("User roles"),
"scopes": openapi.Array(openapi.String()).WithDescription("User scopes"),
}, "user_id"))
// Health // Health
spec.AddPath("/api/auth-svc/health", "get", map[string]any{ spec.AddPath("/api/auth-svc/health", "get", map[string]any{
"summary": "Health check", "summary": "Health check",
@ -41,6 +49,18 @@ func NewServiceSpec() *openapi.OpenAPISpec {
}, },
}) })
// Validate token
spec.AddPath("/api/auth-svc/validate", "post", map[string]any{
"summary": "Validate token",
"description": "Validates a Bearer token and returns the authenticated user info. Used by sibling services for token verification.",
"tags": []string{"Auth"},
"security": []map[string][]string{{"bearer": {}}},
"responses": map[string]any{
"200": openapi.OpResponse("Token is valid", openapi.ResponseSchema(openapi.Ref("ValidateResponse"))),
"401": openapi.OpResponse("Invalid or missing token", openapi.ErrorResponseSchema()),
},
})
// List examples // List examples
spec.AddPath("/api/auth-svc/examples", "get", map[string]any{ spec.AddPath("/api/auth-svc/examples", "get", map[string]any{
"summary": "List examples", "summary": "List examples",

View File

@ -2,28 +2,55 @@
package main package main
import ( import (
"context"
"os"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/app" "git.threesix.ai/jordan/sp4-debug-1770477266/pkg/app"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/database"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/logging" "git.threesix.ai/jordan/sp4-debug-1770477266/pkg/logging"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/queue"
"git.threesix.ai/jordan/sp4-debug-1770477266/services/chat-svc/internal/adapter/memory" "git.threesix.ai/jordan/sp4-debug-1770477266/services/chat-svc/internal/adapter/memory"
"git.threesix.ai/jordan/sp4-debug-1770477266/services/chat-svc/internal/api" "git.threesix.ai/jordan/sp4-debug-1770477266/services/chat-svc/internal/api"
"git.threesix.ai/jordan/sp4-debug-1770477266/services/chat-svc/internal/config"
"git.threesix.ai/jordan/sp4-debug-1770477266/services/chat-svc/internal/service" "git.threesix.ai/jordan/sp4-debug-1770477266/services/chat-svc/internal/service"
"git.threesix.ai/jordan/sp4-debug-1770477266/services/chat-svc/internal/taskqueue"
) )
func main() { func main() {
// Create logger // Create logger
logger := logging.Default() logger := logging.Default()
// Load config
cfg := config.Load()
// Create adapters (repositories) // Create adapters (repositories)
exampleRepo := memory.NewExampleRepository() exampleRepo := memory.NewExampleRepository()
// Create services (business logic) // Create services (business logic)
exampleService := service.NewExampleService(exampleRepo, logger) exampleService := service.NewExampleService(exampleRepo, logger)
// Connect to database for queue producer (shared with worker-svc)
var producer *taskqueue.Producer
if cfg.Database.URL != "" {
pool, err := database.Connect(context.Background(), cfg.Database.URL, database.Options{})
if err != nil {
logger.Error("failed to connect to database for queue", "error", err)
os.Exit(1)
}
defer pool.Close()
jobQueue := queue.NewPostgresQueue(pool.DB, logger)
producer = taskqueue.NewProducer(jobQueue, logger)
logger.Info("task queue producer initialized")
} else {
logger.Warn("DATABASE_URL not set, chat task queue disabled")
}
// Create application // Create application
application := app.New("chat-svc", app.WithDefaultPort(8001)) application := app.New("chat-svc", app.WithDefaultPort(8001))
// Register routes with dependency injection // Register routes with dependency injection
api.RegisterRoutes(application, exampleService) api.RegisterRoutes(application, exampleService, producer)
// Start server // Start server
application.Run() application.Run()

View File

@ -2,8 +2,6 @@ name: chat-svc
type: service type: service
port: 8001 port: 8001
path: services/chat-svc path: services/chat-svc
dependencies: [] dependencies:
# Add dependencies as needed: - auth-svc
# - postgres - postgres
# - redis
# - other-service

View File

@ -0,0 +1,69 @@
package handlers
import (
"net/http"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/app"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/auth"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/httperror"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/httpresponse"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/logging"
"git.threesix.ai/jordan/sp4-debug-1770477266/services/chat-svc/internal/taskqueue"
)
// Chat handles chat message endpoints.
type Chat struct {
producer *taskqueue.Producer
logger *logging.Logger
}
// NewChat creates a new Chat handler.
func NewChat(producer *taskqueue.Producer, logger *logging.Logger) *Chat {
return &Chat{
producer: producer,
logger: logger.WithComponent("ChatHandler"),
}
}
// SendRequest is the request body for sending a chat message.
type SendRequest struct {
Message string `json:"message" validate:"required,min=1,max=5000"`
}
// SendResponse is returned after a message is queued for processing.
type SendResponse struct {
JobID string `json:"job_id"`
Status string `json:"status"`
Message string `json:"message"`
}
// Send accepts a chat message and pushes it to the worker queue for processing.
func (h *Chat) Send(w http.ResponseWriter, r *http.Request) error {
if h.producer == nil {
return httperror.ServiceUnavailable("task queue not configured")
}
var req SendRequest
if err := app.BindAndValidate(r, &req); err != nil {
return err
}
// Get authenticated user from context
user := auth.GetUser(r.Context())
if user == nil {
return httperror.Unauthorized("authentication required")
}
jobID, err := h.producer.EnqueueChatProcess(r.Context(), user.ID, req.Message)
if err != nil {
h.logger.Error("failed to enqueue chat message", "error", err, "user_id", user.ID)
return httperror.Internal("failed to queue message for processing")
}
httpresponse.Accepted(w, r, SendResponse{
JobID: jobID,
Status: "queued",
Message: "message queued for processing",
})
return nil
}

View File

@ -0,0 +1,170 @@
package handlers
import (
"bytes"
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"sync"
"testing"
"github.com/go-chi/chi/v5"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/auth"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/logging"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/queue"
"git.threesix.ai/jordan/sp4-debug-1770477266/services/chat-svc/internal/taskqueue"
)
// mockQueueProducer implements queue.Producer for testing.
type mockQueueProducer struct {
mu sync.Mutex
jobs []queue.Job
}
func (m *mockQueueProducer) Enqueue(ctx context.Context, jobType string, payload map[string]any) (string, error) {
return m.EnqueueWithOptions(ctx, queue.Job{
Type: jobType,
Payload: payload,
})
}
func (m *mockQueueProducer) EnqueueWithOptions(ctx context.Context, job queue.Job) (string, error) {
m.mu.Lock()
defer m.mu.Unlock()
job.ID = "mock-job-id"
m.jobs = append(m.jobs, job)
return job.ID, nil
}
func newTestChatHandler() (*Chat, *mockQueueProducer) {
mockQueue := &mockQueueProducer{}
producer := taskqueue.NewProducer(mockQueue, logging.Nop())
handler := NewChat(producer, logging.Nop())
return handler, mockQueue
}
func TestChat_Send_Success(t *testing.T) {
handler, mockQueue := newTestChatHandler()
r := chi.NewRouter()
r.Post("/api/chat-svc/send", func(w http.ResponseWriter, r *http.Request) {
// Inject authenticated user into context
ctx := auth.SetUser(r.Context(), &auth.User{ID: "user-123", Email: "test@example.com"})
r = r.WithContext(ctx)
if err := handler.Send(w, r); err != nil {
t.Fatalf("unexpected error: %v", err)
}
})
body, _ := json.Marshal(SendRequest{Message: "Hello world"})
req := httptest.NewRequest(http.MethodPost, "/api/chat-svc/send", bytes.NewReader(body))
req.Header.Set("Content-Type", "application/json")
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusAccepted {
t.Errorf("expected status 202, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]any
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
t.Fatalf("failed to decode response: %v", err)
}
data, ok := resp["data"].(map[string]any)
if !ok {
t.Fatal("expected 'data' field in response")
}
if data["job_id"] != "mock-job-id" {
t.Errorf("expected job_id 'mock-job-id', got %v", data["job_id"])
}
if data["status"] != "queued" {
t.Errorf("expected status 'queued', got %v", data["status"])
}
// Verify job was enqueued
mockQueue.mu.Lock()
defer mockQueue.mu.Unlock()
if len(mockQueue.jobs) != 1 {
t.Fatalf("expected 1 enqueued job, got %d", len(mockQueue.jobs))
}
if mockQueue.jobs[0].Payload["user_id"] != "user-123" {
t.Errorf("expected user_id 'user-123' in payload, got %v", mockQueue.jobs[0].Payload["user_id"])
}
}
func TestChat_Send_NoAuth(t *testing.T) {
handler, _ := newTestChatHandler()
r := chi.NewRouter()
r.Post("/api/chat-svc/send", func(w http.ResponseWriter, r *http.Request) {
if err := handler.Send(w, r); err != nil {
w.WriteHeader(http.StatusUnauthorized)
return
}
})
body, _ := json.Marshal(SendRequest{Message: "Hello"})
req := httptest.NewRequest(http.MethodPost, "/api/chat-svc/send", bytes.NewReader(body))
req.Header.Set("Content-Type", "application/json")
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusUnauthorized {
t.Errorf("expected status 401, got %d", w.Code)
}
}
func TestChat_Send_NilProducer(t *testing.T) {
handler := NewChat(nil, logging.Nop())
r := chi.NewRouter()
r.Post("/api/chat-svc/send", func(w http.ResponseWriter, r *http.Request) {
ctx := auth.SetUser(r.Context(), &auth.User{ID: "user-123"})
r = r.WithContext(ctx)
if err := handler.Send(w, r); err != nil {
w.WriteHeader(http.StatusServiceUnavailable)
return
}
})
body, _ := json.Marshal(SendRequest{Message: "Hello"})
req := httptest.NewRequest(http.MethodPost, "/api/chat-svc/send", bytes.NewReader(body))
req.Header.Set("Content-Type", "application/json")
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusServiceUnavailable {
t.Errorf("expected status 503, got %d", w.Code)
}
}
func TestChat_Send_EmptyMessage(t *testing.T) {
handler, _ := newTestChatHandler()
r := chi.NewRouter()
r.Post("/api/chat-svc/send", func(w http.ResponseWriter, r *http.Request) {
ctx := auth.SetUser(r.Context(), &auth.User{ID: "user-123"})
r = r.WithContext(ctx)
if err := handler.Send(w, r); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
})
body, _ := json.Marshal(map[string]string{"message": ""})
req := httptest.NewRequest(http.MethodPost, "/api/chat-svc/send", bytes.NewReader(body))
req.Header.Set("Content-Type", "application/json")
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
// Empty message should fail validation (required,min=1)
if w.Code != http.StatusBadRequest {
t.Errorf("expected status 400, got %d: %s", w.Code, w.Body.String())
}
}

View File

@ -4,23 +4,24 @@ package api
import ( import (
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/app" "git.threesix.ai/jordan/sp4-debug-1770477266/pkg/app"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/auth" "git.threesix.ai/jordan/sp4-debug-1770477266/pkg/auth"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/svc"
"git.threesix.ai/jordan/sp4-debug-1770477266/services/chat-svc/internal/api/handlers" "git.threesix.ai/jordan/sp4-debug-1770477266/services/chat-svc/internal/api/handlers"
"git.threesix.ai/jordan/sp4-debug-1770477266/services/chat-svc/internal/authclient"
"git.threesix.ai/jordan/sp4-debug-1770477266/services/chat-svc/internal/config" "git.threesix.ai/jordan/sp4-debug-1770477266/services/chat-svc/internal/config"
"git.threesix.ai/jordan/sp4-debug-1770477266/services/chat-svc/internal/service" "git.threesix.ai/jordan/sp4-debug-1770477266/services/chat-svc/internal/service"
"git.threesix.ai/jordan/sp4-debug-1770477266/services/chat-svc/internal/taskqueue"
) )
// RegisterRoutes registers all HTTP routes for the service. // RegisterRoutes registers all HTTP routes for the service.
// Routes are mounted under /api/chat-svc to match the ingress path routing. // Routes are mounted under /api/chat-svc to match the ingress path routing.
// This allows the monorepo to expose multiple services under a single domain: func RegisterRoutes(application *app.App, exampleService *service.ExampleService, producer *taskqueue.Producer) {
// - https://domain/api/chat-svc/health
// - https://domain/api/chat-svc/examples
func RegisterRoutes(application *app.App, exampleService *service.ExampleService) {
logger := application.Logger() logger := application.Logger()
cfg := config.Load() cfg := config.Load()
// Initialize handlers with injected services // Initialize handlers with injected services
healthHandler := handlers.NewHealth(logger) healthHandler := handlers.NewHealth(logger)
exampleHandler := handlers.NewExample(exampleService, logger) exampleHandler := handlers.NewExample(exampleService, logger)
chatHandler := handlers.NewChat(producer, logger)
// Build and mount OpenAPI spec // Build and mount OpenAPI spec
spec := NewServiceSpec() spec := NewServiceSpec()
@ -38,17 +39,30 @@ func RegisterRoutes(application *app.App, exampleService *service.ExampleService
// Protected routes (auth required when enabled) // Protected routes (auth required when enabled)
r.Group(func(r app.Router) { r.Group(func(r app.Router) {
if cfg.AuthEnabled { if cfg.AuthEnabled {
r.Use(auth.Middleware(auth.MiddlewareConfig{ // Use remote auth-svc validation if configured, otherwise fall back to local JWT
Validator: auth.NewJWTValidator(auth.JWTConfig{ if svc.ServiceConfigured("auth-svc") {
Secret: []byte(cfg.JWTSecret), ac, err := authclient.New(logger)
Issuer: "sp4-debug-1770477266", if err != nil {
}), logger.Error("failed to create auth client", "error", err)
})) } else {
r.Use(authclient.Middleware(ac))
}
} else {
r.Use(auth.Middleware(auth.MiddlewareConfig{
Validator: auth.NewJWTValidator(auth.JWTConfig{
Secret: []byte(cfg.JWTSecret),
Issuer: "sp4-debug-1770477266",
}),
}))
}
} }
r.Post("/examples", app.Wrap(exampleHandler.Create)) r.Post("/examples", app.Wrap(exampleHandler.Create))
r.Put("/examples/{id}", app.Wrap(exampleHandler.Update)) r.Put("/examples/{id}", app.Wrap(exampleHandler.Update))
r.Delete("/examples/{id}", app.Wrap(exampleHandler.Delete)) r.Delete("/examples/{id}", app.Wrap(exampleHandler.Delete))
// Chat endpoints (require auth, push to worker queue)
r.Post("/send", app.Wrap(chatHandler.Send))
}) })
}) })
} }

View File

@ -8,6 +8,7 @@ func NewServiceSpec() *openapi.OpenAPISpec {
WithDescription("REST API for the chat-svc service"). WithDescription("REST API for the chat-svc service").
WithBearerSecurity("bearer", "JWT authentication token"). WithBearerSecurity("bearer", "JWT authentication token").
WithTag("Health", "Service health endpoints"). WithTag("Health", "Service health endpoints").
WithTag("Chat", "Chat messaging endpoints").
WithTag("Examples", "Example CRUD endpoints") WithTag("Examples", "Example CRUD endpoints")
// Define reusable schemas // Define reusable schemas
@ -29,6 +30,16 @@ func NewServiceSpec() *openapi.OpenAPISpec {
"description": openapi.StringWithMinMax(0, 500).WithDescription("Updated description"), "description": openapi.StringWithMinMax(0, 500).WithDescription("Updated description"),
})) }))
spec.WithSchema("SendRequest", openapi.Object(map[string]openapi.Schema{
"message": openapi.StringWithMinMax(1, 5000).WithDescription("Chat message to process"),
}, "message"))
spec.WithSchema("SendResponse", openapi.Object(map[string]openapi.Schema{
"job_id": openapi.String().WithDescription("Queued job identifier"),
"status": openapi.String().WithDescription("Job status").WithExample("queued"),
"message": openapi.String().WithDescription("Status message"),
}, "job_id", "status"))
// Health // Health
spec.AddPath("/api/chat-svc/health", "get", map[string]any{ spec.AddPath("/api/chat-svc/health", "get", map[string]any{
"summary": "Health check", "summary": "Health check",
@ -94,6 +105,21 @@ func NewServiceSpec() *openapi.OpenAPISpec {
}, },
}) })
// Send chat message
spec.AddPath("/api/chat-svc/send", "post", map[string]any{
"summary": "Send chat message",
"description": "Sends a chat message for async processing by the worker. Requires authentication.",
"tags": []string{"Chat"},
"security": []map[string][]string{{"bearer": {}}},
"requestBody": openapi.RequestBody(openapi.Ref("SendRequest"), true),
"responses": map[string]any{
"202": openapi.OpResponse("Message queued", openapi.ResponseSchema(openapi.Ref("SendResponse"))),
"401": openapi.OpResponse("Unauthorized", openapi.ErrorResponseSchema()),
"422": openapi.OpResponse("Validation error", openapi.ErrorResponseSchema()),
"503": openapi.OpResponse("Queue unavailable", openapi.ErrorResponseSchema()),
},
})
// Delete example // Delete example
spec.AddPath("/api/chat-svc/examples/{id}", "delete", map[string]any{ spec.AddPath("/api/chat-svc/examples/{id}", "delete", map[string]any{
"summary": "Delete example", "summary": "Delete example",

View File

@ -0,0 +1,77 @@
// Package authclient provides a client for validating tokens via the auth-svc.
package authclient
import (
"context"
"fmt"
"net/http"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/auth"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/httpclient"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/logging"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/svc"
)
// ValidateResponse is the envelope response from auth-svc /validate endpoint.
type ValidateResponse struct {
Data ValidateData `json:"data"`
}
// ValidateData is the user info returned by auth-svc.
type ValidateData struct {
UserID string `json:"user_id"`
Email string `json:"email,omitempty"`
Roles []string `json:"roles,omitempty"`
Scopes []string `json:"scopes,omitempty"`
}
// Client validates tokens by calling auth-svc.
type Client struct {
baseURL string
httpClient *httpclient.Client
logger *logging.Logger
}
// New creates a new auth client that calls auth-svc/validate.
// Requires AUTH_SVC_URL environment variable to be set.
func New(logger *logging.Logger) (*Client, error) {
baseURL := svc.ServiceURL("auth-svc")
if baseURL == "" {
return nil, fmt.Errorf("auth-svc not configured (missing AUTH_SVC_URL env var)")
}
return &Client{
baseURL: baseURL,
httpClient: httpclient.New(httpclient.Config{}),
logger: logger.WithComponent("authclient"),
}, nil
}
// Validate calls POST /api/auth-svc/validate with the Bearer token.
// Returns the authenticated user or an error.
func (c *Client) Validate(ctx context.Context, token string) (*auth.User, error) {
url := c.baseURL + "/api/auth-svc/validate"
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, nil)
if err != nil {
return nil, fmt.Errorf("create request: %w", err)
}
req.Header.Set("Authorization", "Bearer "+token)
req.Header.Set("Accept", "application/json")
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("call auth-svc: %w", err)
}
result, err := svc.DecodeResponse[ValidateResponse](resp)
if err != nil {
return nil, fmt.Errorf("auth-svc validation failed: %w", err)
}
return &auth.User{
ID: result.Data.UserID,
Email: result.Data.Email,
Roles: result.Data.Roles,
Scopes: result.Data.Scopes,
}, nil
}

View File

@ -0,0 +1,128 @@
package authclient
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/httpclient"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/logging"
)
func TestClient_Validate_Success(t *testing.T) {
// Create a mock auth-svc server
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/api/auth-svc/validate" {
t.Errorf("unexpected path: %s", r.URL.Path)
}
if r.Method != http.MethodPost {
t.Errorf("unexpected method: %s", r.Method)
}
authHeader := r.Header.Get("Authorization")
if authHeader != "Bearer valid-token" {
w.WriteHeader(http.StatusUnauthorized)
json.NewEncoder(w).Encode(map[string]any{"error": "invalid token"})
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(ValidateResponse{
Data: ValidateData{
UserID: "user-123",
Email: "test@example.com",
Roles: []string{"admin"},
Scopes: []string{"read"},
},
})
}))
defer server.Close()
client := &Client{
baseURL: server.URL,
httpClient: httpclient.New(httpclient.Config{MaxRetries: 1}),
logger: logging.Nop(),
}
user, err := client.Validate(context.Background(), "valid-token")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if user.ID != "user-123" {
t.Errorf("expected user ID 'user-123', got '%s'", user.ID)
}
if user.Email != "test@example.com" {
t.Errorf("expected email 'test@example.com', got '%s'", user.Email)
}
if len(user.Roles) != 1 || user.Roles[0] != "admin" {
t.Errorf("expected roles [admin], got %v", user.Roles)
}
}
func TestClient_Validate_InvalidToken(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusUnauthorized)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]any{"error": "invalid token"})
}))
defer server.Close()
client := &Client{
baseURL: server.URL,
httpClient: httpclient.New(httpclient.Config{MaxRetries: 1}),
logger: logging.Nop(),
}
_, err := client.Validate(context.Background(), "bad-token")
if err == nil {
t.Fatal("expected error for invalid token")
}
}
func TestClient_Validate_ServerError(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}))
defer server.Close()
client := &Client{
baseURL: server.URL,
httpClient: httpclient.New(httpclient.Config{MaxRetries: 1}),
logger: logging.Nop(),
}
_, err := client.Validate(context.Background(), "some-token")
if err == nil {
t.Fatal("expected error for server error")
}
}
func TestClient_Validate_BearerTokenPassedCorrectly(t *testing.T) {
var receivedAuth string
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
receivedAuth = r.Header.Get("Authorization")
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(ValidateResponse{
Data: ValidateData{UserID: "user-1"},
})
}))
defer server.Close()
client := &Client{
baseURL: server.URL,
httpClient: httpclient.New(httpclient.Config{MaxRetries: 1}),
logger: logging.Nop(),
}
_, err := client.Validate(context.Background(), "my-token-123")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if receivedAuth != "Bearer my-token-123" {
t.Errorf("expected 'Bearer my-token-123', got '%s'", receivedAuth)
}
}

View File

@ -0,0 +1,34 @@
package authclient
import (
"net/http"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/auth"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/httpresponse"
)
// Middleware validates tokens by calling auth-svc.
// Extracts the Bearer token from the Authorization header, calls auth-svc/validate,
// and stores the authenticated user in the request context.
func Middleware(client *Client) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
token := auth.ExtractBearerToken(r)
if token == "" {
httpresponse.Unauthorized(w, r, "missing authorization token")
return
}
user, err := client.Validate(r.Context(), token)
if err != nil {
client.logger.Debug("token validation via auth-svc failed", "error", err)
httpresponse.Unauthorized(w, r, "invalid token")
return
}
ctx := auth.SetUser(r.Context(), user)
ctx = auth.SetToken(ctx, token)
next.ServeHTTP(w, r.WithContext(ctx))
})
}
}

View File

@ -0,0 +1,118 @@
package authclient
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"github.com/go-chi/chi/v5"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/auth"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/httpclient"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/logging"
)
func newMockAuthServer(t *testing.T) *httptest.Server {
t.Helper()
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
authHeader := r.Header.Get("Authorization")
if authHeader == "Bearer valid-token" {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(ValidateResponse{
Data: ValidateData{
UserID: "user-123",
Email: "test@example.com",
},
})
return
}
w.WriteHeader(http.StatusUnauthorized)
json.NewEncoder(w).Encode(map[string]any{"error": "invalid token"})
}))
}
func TestMiddleware_ValidToken(t *testing.T) {
server := newMockAuthServer(t)
defer server.Close()
client := &Client{
baseURL: server.URL,
httpClient: httpclient.New(httpclient.Config{MaxRetries: 1}),
logger: logging.Nop(),
}
var capturedUserID string
r := chi.NewRouter()
r.Use(Middleware(client))
r.Get("/test", func(w http.ResponseWriter, r *http.Request) {
user := auth.GetUser(r.Context())
if user != nil {
capturedUserID = user.ID
}
w.WriteHeader(http.StatusOK)
})
req := httptest.NewRequest(http.MethodGet, "/test", nil)
req.Header.Set("Authorization", "Bearer valid-token")
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Errorf("expected status 200, got %d: %s", w.Code, w.Body.String())
}
if capturedUserID != "user-123" {
t.Errorf("expected user ID 'user-123', got '%s'", capturedUserID)
}
}
func TestMiddleware_MissingToken(t *testing.T) {
server := newMockAuthServer(t)
defer server.Close()
client := &Client{
baseURL: server.URL,
httpClient: httpclient.New(httpclient.Config{MaxRetries: 1}),
logger: logging.Nop(),
}
r := chi.NewRouter()
r.Use(Middleware(client))
r.Get("/test", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})
req := httptest.NewRequest(http.MethodGet, "/test", nil)
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusUnauthorized {
t.Errorf("expected status 401, got %d", w.Code)
}
}
func TestMiddleware_InvalidToken(t *testing.T) {
server := newMockAuthServer(t)
defer server.Close()
client := &Client{
baseURL: server.URL,
httpClient: httpclient.New(httpclient.Config{MaxRetries: 1}),
logger: logging.Nop(),
}
r := chi.NewRouter()
r.Use(Middleware(client))
r.Get("/test", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})
req := httptest.NewRequest(http.MethodGet, "/test", nil)
req.Header.Set("Authorization", "Bearer invalid-token")
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusUnauthorized {
t.Errorf("expected status 401, got %d", w.Code)
}
}

View File

@ -18,6 +18,9 @@ type Config struct {
// Auth // Auth
AuthEnabled bool AuthEnabled bool
JWTSecret string JWTSecret string
// Redis queue URL for pushing tasks to worker-svc
RedisURL string
} }
// Load reads configuration from environment variables. // Load reads configuration from environment variables.
@ -30,5 +33,6 @@ func Load() *Config {
AuthEnabled: strings.EqualFold(os.Getenv("AUTH_ENABLED"), "true"), AuthEnabled: strings.EqualFold(os.Getenv("AUTH_ENABLED"), "true"),
JWTSecret: os.Getenv("JWT_SECRET"), JWTSecret: os.Getenv("JWT_SECRET"),
RedisURL: os.Getenv("REDIS_URL"),
} }
} }

View File

@ -0,0 +1,43 @@
// Package taskqueue provides a producer for pushing tasks to the worker queue.
package taskqueue
import (
"context"
"fmt"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/logging"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/queue"
)
// Job types that chat-svc produces for the worker.
const (
JobTypeChatProcess = "chat.process"
)
// Producer enqueues tasks for the worker-svc to process.
type Producer struct {
queue queue.Producer
logger *logging.Logger
}
// NewProducer creates a new task producer.
func NewProducer(q queue.Producer, logger *logging.Logger) *Producer {
return &Producer{
queue: q,
logger: logger.WithComponent("taskqueue"),
}
}
// EnqueueChatProcess enqueues a chat processing task for the worker.
func (p *Producer) EnqueueChatProcess(ctx context.Context, userID string, message string) (string, error) {
jobID, err := p.queue.Enqueue(ctx, JobTypeChatProcess, map[string]any{
"user_id": userID,
"message": message,
})
if err != nil {
return "", fmt.Errorf("enqueue chat.process: %w", err)
}
p.logger.Info("enqueued chat task", "job_id", jobID, "user_id", userID)
return jobID, nil
}

View File

@ -0,0 +1,60 @@
package taskqueue
import (
"context"
"sync"
"testing"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/logging"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/queue"
)
// mockProducer implements queue.Producer for testing.
type mockProducer struct {
mu sync.Mutex
jobs []queue.Job
}
func (m *mockProducer) Enqueue(ctx context.Context, jobType string, payload map[string]any) (string, error) {
return m.EnqueueWithOptions(ctx, queue.Job{
Type: jobType,
Payload: payload,
})
}
func (m *mockProducer) EnqueueWithOptions(ctx context.Context, job queue.Job) (string, error) {
m.mu.Lock()
defer m.mu.Unlock()
job.ID = "test-job-id"
m.jobs = append(m.jobs, job)
return job.ID, nil
}
func TestProducer_EnqueueChatProcess(t *testing.T) {
mock := &mockProducer{}
producer := NewProducer(mock, logging.Nop())
jobID, err := producer.EnqueueChatProcess(context.Background(), "user-123", "Hello world")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if jobID != "test-job-id" {
t.Errorf("expected job ID 'test-job-id', got '%s'", jobID)
}
if len(mock.jobs) != 1 {
t.Fatalf("expected 1 job, got %d", len(mock.jobs))
}
job := mock.jobs[0]
if job.Type != JobTypeChatProcess {
t.Errorf("expected job type '%s', got '%s'", JobTypeChatProcess, job.Type)
}
if job.Payload["user_id"] != "user-123" {
t.Errorf("expected user_id 'user-123', got '%v'", job.Payload["user_id"])
}
if job.Payload["message"] != "Hello world" {
t.Errorf("expected message 'Hello world', got '%v'", job.Payload["message"])
}
}

View File

@ -75,9 +75,7 @@ func main() {
}) })
// Register job handlers // Register job handlers
// TODO: Register your job handlers here handler.RegisterHandler(handlers.JobTypeChatProcess, handlers.NewChatProcessHandler(logger))
// handler.RegisterHandler("send_email", emailHandler)
// handler.RegisterHandler("process_image", imageHandler)
// Setup signal handling // Setup signal handling
sigCh := make(chan os.Signal, 1) sigCh := make(chan os.Signal, 1)

View File

@ -1,8 +1,5 @@
name: worker-svc name: worker-svc
type: worker type: worker
path: workers/worker-svc path: workers/worker-svc
dependencies: [] dependencies:
# Add dependencies as needed: - postgres
# - postgres
# - redis
# - rabbitmq

View File

@ -0,0 +1,43 @@
package handlers
import (
"context"
"fmt"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/logging"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/queue"
)
// JobTypeChatProcess is the job type for chat message processing.
const JobTypeChatProcess = "chat.process"
// NewChatProcessHandler creates a handler for chat.process jobs.
func NewChatProcessHandler(logger *logging.Logger) queue.Handler {
log := logger.WithComponent("chat-handler")
return func(ctx context.Context, job *queue.Job) error {
userID, _ := job.Payload["user_id"].(string)
message, _ := job.Payload["message"].(string)
if userID == "" {
return fmt.Errorf("missing user_id in payload")
}
if message == "" {
return fmt.Errorf("missing message in payload")
}
log.Info("processing chat message",
"job_id", job.ID,
"user_id", userID,
"message_len", len(message),
)
// TODO: Implement actual chat processing logic (e.g., AI response, storage, notifications)
log.Info("chat message processed",
"job_id", job.ID,
"user_id", userID,
)
return nil
}
}

View File

@ -0,0 +1,76 @@
package handlers
import (
"context"
"testing"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/logging"
"git.threesix.ai/jordan/sp4-debug-1770477266/pkg/queue"
)
func TestNewChatProcessHandler_Success(t *testing.T) {
handler := NewChatProcessHandler(logging.Nop())
job := &queue.Job{
ID: "job-123",
Type: JobTypeChatProcess,
Payload: map[string]any{
"user_id": "user-456",
"message": "Hello, world!",
},
}
err := handler(context.Background(), job)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
func TestNewChatProcessHandler_MissingUserID(t *testing.T) {
handler := NewChatProcessHandler(logging.Nop())
job := &queue.Job{
ID: "job-123",
Type: JobTypeChatProcess,
Payload: map[string]any{
"message": "Hello",
},
}
err := handler(context.Background(), job)
if err == nil {
t.Fatal("expected error for missing user_id")
}
}
func TestNewChatProcessHandler_MissingMessage(t *testing.T) {
handler := NewChatProcessHandler(logging.Nop())
job := &queue.Job{
ID: "job-123",
Type: JobTypeChatProcess,
Payload: map[string]any{
"user_id": "user-456",
},
}
err := handler(context.Background(), job)
if err == nil {
t.Fatal("expected error for missing message")
}
}
func TestNewChatProcessHandler_EmptyPayload(t *testing.T) {
handler := NewChatProcessHandler(logging.Nop())
job := &queue.Job{
ID: "job-123",
Type: JobTypeChatProcess,
Payload: map[string]any{},
}
err := handler(context.Background(), job)
if err == nil {
t.Fatal("expected error for empty payload")
}
}