persona-community-1/services/persona-api/cmd/server/main.go
2026-02-23 10:21:29 +00:00

421 lines
15 KiB
Go

// Package main is the entry point for the persona-api service.
package main
import (
"context"
"embed"
"flag"
"fmt"
"os"
"time"
"github.com/redis/go-redis/v9"
"git.threesix.ai/jordan/persona-community-1/pkg/album"
"git.threesix.ai/jordan/persona-community-1/pkg/app"
"git.threesix.ai/jordan/persona-community-1/pkg/personagen"
"git.threesix.ai/jordan/persona-community-1/pkg/database"
"git.threesix.ai/jordan/persona-community-1/pkg/gemini"
"git.threesix.ai/jordan/persona-community-1/pkg/laozhang"
"git.threesix.ai/jordan/persona-community-1/pkg/logging"
"git.threesix.ai/jordan/persona-community-1/pkg/mediagen"
mediagenAdapters "git.threesix.ai/jordan/persona-community-1/pkg/mediagen/adapters"
"git.threesix.ai/jordan/persona-community-1/pkg/generation"
emailpkg "git.threesix.ai/jordan/persona-community-1/pkg/email"
"git.threesix.ai/jordan/persona-community-1/pkg/notify"
"git.threesix.ai/jordan/persona-community-1/pkg/queue"
"git.threesix.ai/jordan/persona-community-1/pkg/realtime"
"git.threesix.ai/jordan/persona-community-1/pkg/storage"
"git.threesix.ai/jordan/persona-community-1/pkg/textgen"
textgenAdapters "git.threesix.ai/jordan/persona-community-1/pkg/textgen/adapters"
emailadapter "git.threesix.ai/jordan/persona-community-1/services/persona-api/internal/adapter/email"
componentemail "git.threesix.ai/jordan/persona-community-1/services/persona-api/internal/email"
"git.threesix.ai/jordan/persona-community-1/services/persona-api/internal/adapter/memory"
"git.threesix.ai/jordan/persona-community-1/services/persona-api/internal/adapter/postgres"
"git.threesix.ai/jordan/persona-community-1/services/persona-api/internal/api"
"git.threesix.ai/jordan/persona-community-1/services/persona-api/internal/config"
"git.threesix.ai/jordan/persona-community-1/services/persona-api/internal/port"
"git.threesix.ai/jordan/persona-community-1/services/persona-api/internal/service"
)
//go:embed migrations/*.sql
var migrationsFS embed.FS
func main() {
// Parse flags
exportOpenAPI := flag.Bool("export-openapi", false, "Export OpenAPI spec to stdout and exit")
flag.Parse()
// If exporting OpenAPI, generate spec and exit (used by CI for docs generation)
if *exportOpenAPI {
spec := api.NewServiceSpec()
jsonBytes, err := spec.JSON()
if err != nil {
fmt.Fprintf(os.Stderr, "failed to generate OpenAPI spec: %v\n", err)
os.Exit(1)
}
fmt.Println(string(jsonBytes))
os.Exit(0)
}
// Load config
cfg := config.Load()
// Create logger
logger := logging.Default()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create SSE hub for async event delivery (generation progress, chat, etc.)
sseHub := realtime.NewSSEHub(logger.Logger)
// Initialize storage backend (before queue, since standalone queue handlers use it).
// GCS_BUCKET set = production (GCS). Otherwise = dev (in-memory).
listenPort := fmt.Sprintf("%d", 8001)
var mediaStore storage.Store
if bucket := os.Getenv("GCS_BUCKET"); bucket != "" {
gcsStore, err := storage.NewGCSStore(ctx, bucket, os.Getenv("GCS_SERVICE_ACCOUNT_JSON"), logger.Logger)
if err != nil {
logger.Error("failed to create GCS store", "error", err)
os.Exit(1)
}
defer func() { _ = gcsStore.Close() }()
mediaStore = gcsStore
logger.Info("storage initialized (GCS)", "bucket", bucket)
} else {
memStore := storage.NewMemoryStore("http://localhost:" + listenPort + "/storage")
mediaStore = memStore
logger.Info("storage initialized (in-memory dev mode)")
}
// Select backend based on DATABASE_URL availability.
// With DATABASE_URL: Postgres repos + DB queue (production)
// Without DATABASE_URL: in-memory repos + in-process AI (development)
exampleRepo := memory.NewExampleRepository()
albumRepo := memory.NewAlbumRepository()
var userRepo port.UserRepository
var sessionRepo port.SessionRepository
var authCodeRepo port.AuthCodeRepository
var mediaRepo port.MediaRepository
var jobQueue queue.Producer
var jobReader queue.JobReader
if cfg.Database.URL != "" {
// Connect to database (shared pool for queue + auth repos).
dbPool, err := database.Connect(ctx, cfg.Database.URL, database.Options{
MaxOpenConns: cfg.Database.MaxOpenConns,
MaxIdleConns: cfg.Database.MaxIdleConns,
ConnMaxLifetime: cfg.Database.ConnMaxLifetime,
})
if err != nil {
logger.Error("failed to connect to database", "error", err)
os.Exit(1)
}
logger.Info("connected to database")
// Verify the database connection is actually alive before proceeding.
if err := dbPool.DB.PingContext(ctx); err != nil {
logger.Error("database health check failed", "error", err)
os.Exit(1)
}
logger.Info("database health check passed")
// Run auth migrations.
if err := database.RunMigrations(ctx, dbPool, migrationsFS, "migrations"); err != nil {
logger.Error("failed to run auth migrations", "error", err)
os.Exit(1)
}
logger.Info("auth migrations complete")
// Postgres-backed repositories.
userRepo = postgres.NewUserRepository(dbPool.DB)
sessionRepo = postgres.NewSessionRepository(dbPool.DB)
authCodeRepo = postgres.NewAuthCodeRepository(dbPool.DB)
mediaRepo = postgres.NewMediaObjectRepository(dbPool.DB)
// DB-backed queue.
jobQueue, jobReader = setupDBQueue(ctx, cfg, dbPool, sseHub, logger)
} else {
logger.Info("DATABASE_URL not set — running in standalone mode (in-memory queue + in-process AI)")
userRepo = memory.NewUserRepository(cfg.DevUserEmail, cfg.DevUserPassword)
sessionRepo = memory.NewSessionRepository()
authCodeRepo = memory.NewAuthCodeRepository()
mediaRepo = memory.NewMediaRepository()
jobQueue, jobReader = setupStandaloneQueue(ctx, mediaStore, albumRepo, sseHub, logger)
}
// Validate required config.
if cfg.JWTSecret == "" {
logger.Error("JWT_SECRET must be set (even in development)")
os.Exit(1)
}
// Load email renderer (HTML templates embedded at build time).
emailRenderer, err := emailpkg.NewRendererFromFS(componentemail.TemplateFS, "templates", emailpkg.BrandConfig{
AppName: cfg.AppName,
AppURL: cfg.AppURL,
SupportEmail: cfg.SupportEmail,
LogoURL: cfg.LogoURL,
PrimaryColor: cfg.BrandColor,
})
if err != nil {
logger.Error("failed to load email templates", "error", err)
os.Exit(1)
}
logger.Info("email renderer loaded", "templates", len(emailRenderer.Purposes()))
// Create email sender — notify service in production (NOTIFY_URL set), log-only for dev.
var emailSender port.EmailSender
if cfg.NotifyURL != "" {
notifyClient, err := notify.NewClient(notify.Config{
URL: cfg.NotifyURL,
APIKey: cfg.NotifyAPIKey,
Logger: logger.Logger,
})
if err != nil {
logger.Error("failed to create notify client", "error", err)
os.Exit(1)
}
emailSender = emailadapter.NewNotifySender(notifyClient, emailRenderer, cfg.NotifyHost, cfg.NotifyFrom, logger)
logger.Info("email sender initialized (notify)", "url", cfg.NotifyURL, "host", cfg.NotifyHost)
} else {
emailSender = emailadapter.NewLogSender(logger)
logger.Info("email sender initialized (log-only dev mode)")
}
// Create services (business logic)
exampleService := service.NewExampleService(exampleRepo, logger)
albumService := service.NewAlbumService(albumRepo, jobQueue, logger)
authService := service.NewAuthService(
userRepo, sessionRepo, authCodeRepo, emailSender,
cfg.JWTSecret, cfg.RegistrationEnabled, logger,
)
// Create application
application := app.New("persona-api", app.WithDefaultPort(8001))
// Mount in-memory storage HTTP handler for dev mode
if memStore, ok := mediaStore.(*storage.MemoryStore); ok {
application.Router().Handle("/storage/*", memStore)
}
// Register routes with dependency injection
api.RegisterRoutes(application, &api.Dependencies{
ExampleService: exampleService,
AuthService: authService,
AlbumService: albumService,
Queue: jobQueue,
JobReader: jobReader,
SSEHub: sseHub,
Store: mediaStore,
MediaRepo: mediaRepo,
EmailRenderer: emailRenderer,
})
// Start background cleanup of expired sessions and auth codes.
go runCleanup(ctx, sessionRepo, authCodeRepo, logger)
// Start server
application.Run()
}
// setupDBQueue initializes the production queue backend using the shared database pool + optional Redis.
// Returns both Producer (for enqueue) and JobReader (for status polling).
func setupDBQueue(ctx context.Context, cfg *config.Config, pool *database.Pool, sseHub *realtime.SSEHub, logger *logging.Logger) (queue.Producer, queue.JobReader) {
if err := queue.RunMigrations(ctx, pool); err != nil {
logger.Error("failed to run queue migrations", "error", err)
os.Exit(1)
}
logger.Info("queue migrations complete")
jobQueue := queue.NewQueue(pool.DB, logger)
// Start Redis SSE subscriber if configured.
if cfg.RedisURL != "" {
opts, err := redis.ParseURL(cfg.RedisURL)
if err != nil {
logger.Error("failed to parse REDIS_URL", "error", err)
os.Exit(1)
}
redisClient := redis.NewClient(opts)
if err := redisClient.Ping(ctx).Err(); err != nil {
logger.Error("failed to connect to Redis", "error", err)
os.Exit(1)
}
logger.Info("connected to Redis")
go func() {
if err := realtime.RunSSESubscriber(ctx, redisClient, sseHub, logger.Logger); err != nil {
logger.Error("SSE Redis subscriber stopped", "error", err)
}
}()
} else {
logger.Warn("REDIS_URL not set — SSE events from worker will not be delivered")
}
return jobQueue, jobQueue
}
// setupStandaloneQueue initializes an in-memory queue with in-process AI handlers.
// This mode requires no database or Redis — everything runs in a single process.
// Returns both Producer (for enqueue) and JobReader (for status polling).
func setupStandaloneQueue(ctx context.Context, store storage.Store, albumUpdater album.AlbumUpdater, sseHub *realtime.SSEHub, logger *logging.Logger) (queue.Producer, queue.JobReader) {
memQueue := queue.NewMemoryQueue(logger.Logger)
// LocalPublisher delivers events directly to the SSE hub (no Redis needed).
pub := realtime.NewLocalPublisher(sseHub)
// Initialize AI providers
mediagenManager := initMediagen(ctx, logger)
textgenManager := initTextgen(ctx, logger)
// Register job handlers (same handlers the worker uses).
if mediagenManager != nil {
memQueue.RegisterHandler("generate_image", generation.ImageHandler(mediagenManager, store, pub, logger))
memQueue.RegisterHandler("generate_video", generation.VideoHandler(mediagenManager, store, pub, logger))
memQueue.RegisterHandler("generate_anchor", album.AnchorHandler(mediagenManager, store, pub, albumUpdater, logger))
memQueue.RegisterHandler("generate_shot", album.ShotHandler(mediagenManager, store, pub, albumUpdater, logger))
}
if textgenManager != nil {
memQueue.RegisterHandler("generate_text", generation.TextHandler(textgenManager, pub, logger))
memQueue.RegisterHandler("ai_chat_response", generation.ChatResponseHandler(textgenManager, pub, logger))
}
// Persona generation requires both textgen (5-stage LLM pipeline) and mediagen (20 images + 4 videos).
if textgenManager != nil && mediagenManager != nil {
memQueue.RegisterHandler("persona_generate", personagen.QueueHandler(textgenManager, mediagenManager, store, pub, logger.Logger))
}
return memQueue, memQueue
}
// initMediagen creates a mediagen manager from available AI provider credentials.
func initMediagen(ctx context.Context, logger *logging.Logger) *mediagen.Manager {
var laozhangMediaProvider *mediagenAdapters.LaoZhangProvider
var geminiMediaProvider *mediagenAdapters.GeminiProvider
if apiKey := os.Getenv("LAOZHANG_API_KEY"); apiKey != "" {
client, err := laozhang.NewClient(laozhang.Config{
APIKey: apiKey,
VideoTimeout: 5 * time.Minute,
Logger: logger.Logger,
})
if err != nil {
logger.Warn("failed to create LaoZhang client", "error", err)
} else {
laozhangMediaProvider = mediagenAdapters.NewLaoZhangProvider(client)
logger.Info("LaoZhang media provider initialized")
}
}
if apiKey := os.Getenv("GEMINI_API_KEY"); apiKey != "" {
client, err := gemini.NewClient(ctx, gemini.Config{
APIKey: apiKey,
Logger: logger.Logger,
})
if err != nil {
logger.Warn("failed to create Gemini client", "error", err)
} else {
geminiMediaProvider = mediagenAdapters.NewGeminiProvider(client)
logger.Info("Gemini media provider initialized")
}
}
if laozhangMediaProvider == nil && geminiMediaProvider == nil {
logger.Warn("no media generation providers available (set LAOZHANG_API_KEY or GEMINI_API_KEY)")
return nil
}
mgCfg := mediagen.ProductionConfig(mediagen.ProviderSet{
LaoZhang: laozhangMediaProvider,
Gemini: geminiMediaProvider,
}, mediagen.WithLogger(logger.Logger))
if laozhangMediaProvider != nil {
mgCfg.VideoProviders = append(mgCfg.VideoProviders, laozhangMediaProvider)
}
if geminiMediaProvider != nil {
mgCfg.VideoProviders = append(mgCfg.VideoProviders, geminiMediaProvider)
}
mgr, err := mediagen.NewManager(mgCfg)
if err != nil {
logger.Warn("failed to create mediagen manager", "error", err)
return nil
}
logger.Info("mediagen manager initialized (image + video)")
return mgr
}
// initTextgen creates a textgen manager from available AI provider credentials.
func initTextgen(ctx context.Context, logger *logging.Logger) *textgen.Manager {
var textProviders []textgen.TextGenerator
if apiKey := os.Getenv("LAOZHANG_API_KEY"); apiKey != "" {
client, err := laozhang.NewClient(laozhang.Config{
APIKey: apiKey,
Logger: logger.Logger,
})
if err != nil {
logger.Warn("failed to create LaoZhang text client", "error", err)
} else {
textProviders = append(textProviders, textgenAdapters.NewLaoZhangTextProvider(client, ""))
}
}
if apiKey := os.Getenv("GEMINI_API_KEY"); apiKey != "" {
provider, err := textgenAdapters.NewGeminiTextProvider(ctx, textgenAdapters.GeminiTextConfig{
APIKey: apiKey,
})
if err != nil {
logger.Warn("failed to create Gemini text provider", "error", err)
} else {
textProviders = append(textProviders, provider)
}
}
if len(textProviders) == 0 {
logger.Warn("no text generation providers available")
return nil
}
tgCfg := textgen.ProductionConfig(textgen.ProviderSet{}, textgen.WithLogger(logger.Logger))
tgCfg.Providers = textProviders
mgr, err := textgen.NewManager(tgCfg)
if err != nil {
logger.Warn("failed to create textgen manager", "error", err)
return nil
}
logger.Info("textgen manager initialized")
return mgr
}
// runCleanup periodically removes expired sessions and auth codes.
// Runs every hour. Stops when ctx is cancelled.
func runCleanup(ctx context.Context, sessions port.SessionRepository, codes port.AuthCodeRepository, logger *logging.Logger) {
ticker := time.NewTicker(1 * time.Hour)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
sessCount, err := sessions.DeleteExpired(ctx)
if err != nil {
logger.Warn("failed to cleanup expired sessions", "error", err)
} else if sessCount > 0 {
logger.Info("cleaned up expired sessions", "count", sessCount)
}
codeCount, err := codes.DeleteExpired(ctx)
if err != nil {
logger.Warn("failed to cleanup expired auth codes", "error", err)
} else if codeCount > 0 {
logger.Info("cleaned up expired auth codes", "count", codeCount)
}
}
}
}