// Package main is the entry point for the persona-api service. package main import ( "context" "embed" "flag" "fmt" "os" "time" "github.com/redis/go-redis/v9" "git.threesix.ai/jordan/persona-community-5/pkg/album" "git.threesix.ai/jordan/persona-community-5/pkg/app" "git.threesix.ai/jordan/persona-community-5/pkg/personagen" "git.threesix.ai/jordan/persona-community-5/pkg/database" "git.threesix.ai/jordan/persona-community-5/pkg/gemini" "git.threesix.ai/jordan/persona-community-5/pkg/laozhang" "git.threesix.ai/jordan/persona-community-5/pkg/logging" "git.threesix.ai/jordan/persona-community-5/pkg/mediagen" mediagenAdapters "git.threesix.ai/jordan/persona-community-5/pkg/mediagen/adapters" "git.threesix.ai/jordan/persona-community-5/pkg/generation" emailpkg "git.threesix.ai/jordan/persona-community-5/pkg/email" "git.threesix.ai/jordan/persona-community-5/pkg/notify" "git.threesix.ai/jordan/persona-community-5/pkg/queue" pkgconfig "git.threesix.ai/jordan/persona-community-5/pkg/config" "git.threesix.ai/jordan/persona-community-5/pkg/realtime" "git.threesix.ai/jordan/persona-community-5/pkg/storage" "git.threesix.ai/jordan/persona-community-5/pkg/textgen" textgenAdapters "git.threesix.ai/jordan/persona-community-5/pkg/textgen/adapters" emailadapter "git.threesix.ai/jordan/persona-community-5/services/persona-api/internal/adapter/email" componentemail "git.threesix.ai/jordan/persona-community-5/services/persona-api/internal/email" "git.threesix.ai/jordan/persona-community-5/services/persona-api/internal/adapter/memory" "git.threesix.ai/jordan/persona-community-5/services/persona-api/internal/adapter/postgres" "git.threesix.ai/jordan/persona-community-5/services/persona-api/internal/api" "git.threesix.ai/jordan/persona-community-5/services/persona-api/internal/config" "git.threesix.ai/jordan/persona-community-5/services/persona-api/internal/port" "git.threesix.ai/jordan/persona-community-5/services/persona-api/internal/service" ) //go:embed migrations/*.sql var migrationsFS embed.FS func main() { // Parse flags exportOpenAPI := flag.Bool("export-openapi", false, "Export OpenAPI spec to stdout and exit") flag.Parse() // If exporting OpenAPI, generate spec and exit (used by CI for docs generation) if *exportOpenAPI { spec := api.NewServiceSpec() jsonBytes, err := spec.JSON() if err != nil { fmt.Fprintf(os.Stderr, "failed to generate OpenAPI spec: %v\n", err) os.Exit(1) } fmt.Println(string(jsonBytes)) os.Exit(0) } // Initialize viper with AutomaticEnv so Viper reads env vars injected by // the platform (K8s secrets, envFrom). Must happen before any config reads. pkgconfig.MustInit(pkgconfig.Options{ AppName: "persona-api", DefaultPort: 8001, }) // Load config cfg := config.Load() // Create logger logger := logging.Default() ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Create SSE hub for async event delivery (generation progress, chat, etc.) sseHub := realtime.NewSSEHub(logger.Logger) // Initialize storage backend (before queue, since standalone queue handlers use it). // GCS_BUCKET set = production (GCS). Otherwise = dev (in-memory). listenPort := fmt.Sprintf("%d", 8001) var mediaStore storage.Store if bucket := os.Getenv("GCS_BUCKET"); bucket != "" { gcsStore, err := storage.NewGCSStore(ctx, bucket, os.Getenv("GCS_SERVICE_ACCOUNT_JSON"), logger.Logger) if err != nil { logger.Error("failed to create GCS store", "error", err) os.Exit(1) } defer func() { _ = gcsStore.Close() }() mediaStore = gcsStore logger.Info("storage initialized (GCS)", "bucket", bucket) } else { memStore := storage.NewMemoryStore("http://localhost:" + listenPort + "/storage") mediaStore = memStore logger.Info("storage initialized (in-memory dev mode)") } // Select backend based on DATABASE_URL availability. // With DATABASE_URL: Postgres repos + DB queue (production) // Without DATABASE_URL: in-memory repos + in-process AI (development) exampleRepo := memory.NewExampleRepository() albumRepo := memory.NewAlbumRepository() var personaRepo port.PersonaRepository var userRepo port.UserRepository var sessionRepo port.SessionRepository var authCodeRepo port.AuthCodeRepository var mediaRepo port.MediaRepository var jobQueue queue.Producer var jobReader queue.JobReader if cfg.Database.URL != "" { // Connect to database (shared pool for queue + auth repos). dbPool, err := database.Connect(ctx, cfg.Database.URL, database.Options{ MaxOpenConns: cfg.Database.MaxOpenConns, MaxIdleConns: cfg.Database.MaxIdleConns, ConnMaxLifetime: cfg.Database.ConnMaxLifetime, }) if err != nil { logger.Error("failed to connect to database", "error", err) os.Exit(1) } logger.Info("connected to database") // Verify the database connection is actually alive before proceeding. if err := dbPool.DB.PingContext(ctx); err != nil { logger.Error("database health check failed", "error", err) os.Exit(1) } logger.Info("database health check passed") // Run auth migrations. if err := database.RunMigrations(ctx, dbPool, migrationsFS, "migrations"); err != nil { logger.Error("failed to run auth migrations", "error", err) os.Exit(1) } logger.Info("auth migrations complete") // Postgres-backed repositories. userRepo = postgres.NewUserRepository(dbPool.DB) sessionRepo = postgres.NewSessionRepository(dbPool.DB) authCodeRepo = postgres.NewAuthCodeRepository(dbPool.DB) mediaRepo = postgres.NewMediaObjectRepository(dbPool.DB) personaRepo = postgres.NewPersonaRepository(dbPool.DB) // DB-backed queue. jobQueue, jobReader = setupDBQueue(ctx, cfg, dbPool, sseHub, logger) } else { logger.Info("DATABASE_URL not set — running in standalone mode (in-memory queue + in-process AI)") userRepo = memory.NewUserRepository(cfg.DevUserEmail, cfg.DevUserPassword) sessionRepo = memory.NewSessionRepository() authCodeRepo = memory.NewAuthCodeRepository() mediaRepo = memory.NewMediaRepository() personaRepo = memory.NewPersonaRepository() jobQueue, jobReader = setupStandaloneQueue(ctx, mediaStore, albumRepo, sseHub, logger) } // Validate required config. if cfg.JWTSecret == "" { logger.Error("JWT_SECRET must be set (even in development)") os.Exit(1) } // Load email renderer (HTML templates embedded at build time). emailRenderer, err := emailpkg.NewRendererFromFS(componentemail.TemplateFS, "templates", emailpkg.BrandConfig{ AppName: cfg.AppName, AppURL: cfg.AppURL, SupportEmail: cfg.SupportEmail, LogoURL: cfg.LogoURL, PrimaryColor: cfg.BrandColor, }) if err != nil { logger.Error("failed to load email templates", "error", err) os.Exit(1) } logger.Info("email renderer loaded", "templates", len(emailRenderer.Purposes())) // Create email sender — notify service in production (NOTIFY_URL + NOTIFY_API_KEY set), log-only for dev. // NOTIFY_URL is a global platform credential; NOTIFY_API_KEY is project-scoped and may not be // provisioned yet. If either is missing, fall back to log-only mode instead of crashing. var emailSender port.EmailSender if cfg.NotifyURL != "" && cfg.NotifyAPIKey != "" { notifyClient, err := notify.NewClient(notify.Config{ URL: cfg.NotifyURL, APIKey: cfg.NotifyAPIKey, Logger: logger.Logger, }) if err != nil { logger.Error("failed to create notify client", "error", err) os.Exit(1) } emailSender = emailadapter.NewNotifySender(notifyClient, emailRenderer, cfg.NotifyHost, cfg.NotifyFrom, logger) logger.Info("email sender initialized (notify)", "url", cfg.NotifyURL, "host", cfg.NotifyHost) } else { emailSender = emailadapter.NewLogSender(logger) if cfg.NotifyURL != "" { logger.Warn("email sender in log-only mode (NOTIFY_API_KEY not set)") } else { logger.Info("email sender initialized (log-only dev mode)") } } // Create services (business logic) exampleService := service.NewExampleService(exampleRepo, logger) albumService := service.NewAlbumService(albumRepo, jobQueue, logger) personaService := service.NewPersonaService(personaRepo, jobQueue, sseHub, logger) authService := service.NewAuthService( userRepo, sessionRepo, authCodeRepo, emailSender, cfg.JWTSecret, cfg.RegistrationEnabled, logger, ) // Create application application := app.New("persona-api", app.WithDefaultPort(8001)) // Mount in-memory storage HTTP handler for dev mode if memStore, ok := mediaStore.(*storage.MemoryStore); ok { application.Router().Handle("/storage/*", memStore) } // Register routes with dependency injection api.RegisterRoutes(application, &api.Dependencies{ ExampleService: exampleService, AuthService: authService, AlbumService: albumService, PersonaService: personaService, Queue: jobQueue, JobReader: jobReader, SSEHub: sseHub, Store: mediaStore, MediaRepo: mediaRepo, EmailRenderer: emailRenderer, }) // Start background cleanup of expired sessions and auth codes. go runCleanup(ctx, sessionRepo, authCodeRepo, logger) // Start server application.Run() } // setupDBQueue initializes the production queue backend using the shared database pool + optional Redis. // Returns both Producer (for enqueue) and JobReader (for status polling). func setupDBQueue(ctx context.Context, cfg *config.Config, pool *database.Pool, sseHub *realtime.SSEHub, logger *logging.Logger) (queue.Producer, queue.JobReader) { if err := queue.RunMigrations(ctx, pool); err != nil { logger.Error("failed to run queue migrations", "error", err) os.Exit(1) } logger.Info("queue migrations complete") jobQueue := queue.NewQueue(pool.DB, logger) // Start Redis SSE subscriber if configured. if cfg.RedisURL != "" { opts, err := redis.ParseURL(cfg.RedisURL) if err != nil { logger.Error("failed to parse REDIS_URL", "error", err) os.Exit(1) } redisClient := redis.NewClient(opts) if err := redisClient.Ping(ctx).Err(); err != nil { logger.Error("failed to connect to Redis", "error", err) os.Exit(1) } logger.Info("connected to Redis") go func() { if err := realtime.RunSSESubscriber(ctx, redisClient, sseHub, logger.Logger); err != nil { logger.Error("SSE Redis subscriber stopped", "error", err) } }() } else { logger.Warn("REDIS_URL not set — SSE events from worker will not be delivered") } return jobQueue, jobQueue } // setupStandaloneQueue initializes an in-memory queue with in-process AI handlers. // This mode requires no database or Redis — everything runs in a single process. // Returns both Producer (for enqueue) and JobReader (for status polling). func setupStandaloneQueue(ctx context.Context, store storage.Store, albumUpdater album.AlbumUpdater, sseHub *realtime.SSEHub, logger *logging.Logger) (queue.Producer, queue.JobReader) { memQueue := queue.NewMemoryQueue(logger.Logger) // LocalPublisher delivers events directly to the SSE hub (no Redis needed). pub := realtime.NewLocalPublisher(sseHub) // Initialize AI providers mediagenManager := initMediagen(ctx, logger) textgenManager := initTextgen(ctx, logger) // Register job handlers (same handlers the worker uses). if mediagenManager != nil { memQueue.RegisterHandler("generate_image", generation.ImageHandler(mediagenManager, store, pub, logger)) memQueue.RegisterHandler("generate_video", generation.VideoHandler(mediagenManager, store, pub, logger)) memQueue.RegisterHandler("generate_anchor", album.AnchorHandler(mediagenManager, store, pub, albumUpdater, logger)) memQueue.RegisterHandler("generate_shot", album.ShotHandler(mediagenManager, store, pub, albumUpdater, logger)) } if textgenManager != nil { memQueue.RegisterHandler("generate_text", generation.TextHandler(textgenManager, pub, logger)) memQueue.RegisterHandler("ai_chat_response", generation.ChatResponseHandler(textgenManager, pub, logger)) } // Persona generation requires both textgen (5-stage LLM pipeline) and mediagen (20 images + 4 videos). if textgenManager != nil && mediagenManager != nil { memQueue.RegisterHandler("persona_generate", personagen.QueueHandler(textgenManager, mediagenManager, store, pub, logger.Logger)) } return memQueue, memQueue } // initMediagen creates a mediagen manager from available AI provider credentials. func initMediagen(ctx context.Context, logger *logging.Logger) *mediagen.Manager { var laozhangMediaProvider *mediagenAdapters.LaoZhangProvider var geminiMediaProvider *mediagenAdapters.GeminiProvider if apiKey := os.Getenv("LAOZHANG_API_KEY"); apiKey != "" { client, err := laozhang.NewClient(laozhang.Config{ APIKey: apiKey, VideoTimeout: 5 * time.Minute, Logger: logger.Logger, }) if err != nil { logger.Warn("failed to create LaoZhang client", "error", err) } else { laozhangMediaProvider = mediagenAdapters.NewLaoZhangProvider(client) logger.Info("LaoZhang media provider initialized") } } if apiKey := os.Getenv("GEMINI_API_KEY"); apiKey != "" { client, err := gemini.NewClient(ctx, gemini.Config{ APIKey: apiKey, Logger: logger.Logger, }) if err != nil { logger.Warn("failed to create Gemini client", "error", err) } else { geminiMediaProvider = mediagenAdapters.NewGeminiProvider(client) logger.Info("Gemini media provider initialized") } } if laozhangMediaProvider == nil && geminiMediaProvider == nil { logger.Warn("no media generation providers available (set LAOZHANG_API_KEY or GEMINI_API_KEY)") return nil } mgCfg := mediagen.ProductionConfig(mediagen.ProviderSet{ LaoZhang: laozhangMediaProvider, Gemini: geminiMediaProvider, }, mediagen.WithLogger(logger.Logger)) if laozhangMediaProvider != nil { mgCfg.VideoProviders = append(mgCfg.VideoProviders, laozhangMediaProvider) } if geminiMediaProvider != nil { mgCfg.VideoProviders = append(mgCfg.VideoProviders, geminiMediaProvider) } mgr, err := mediagen.NewManager(mgCfg) if err != nil { logger.Warn("failed to create mediagen manager", "error", err) return nil } logger.Info("mediagen manager initialized (image + video)") return mgr } // initTextgen creates a textgen manager from available AI provider credentials. func initTextgen(ctx context.Context, logger *logging.Logger) *textgen.Manager { var textProviders []textgen.TextGenerator if apiKey := os.Getenv("LAOZHANG_API_KEY"); apiKey != "" { client, err := laozhang.NewClient(laozhang.Config{ APIKey: apiKey, Logger: logger.Logger, }) if err != nil { logger.Warn("failed to create LaoZhang text client", "error", err) } else { textProviders = append(textProviders, textgenAdapters.NewLaoZhangTextProvider(client, "")) } } if apiKey := os.Getenv("GEMINI_API_KEY"); apiKey != "" { provider, err := textgenAdapters.NewGeminiTextProvider(ctx, textgenAdapters.GeminiTextConfig{ APIKey: apiKey, }) if err != nil { logger.Warn("failed to create Gemini text provider", "error", err) } else { textProviders = append(textProviders, provider) } } if len(textProviders) == 0 { logger.Warn("no text generation providers available") return nil } tgCfg := textgen.ProductionConfig(textgen.ProviderSet{}, textgen.WithLogger(logger.Logger)) tgCfg.Providers = textProviders mgr, err := textgen.NewManager(tgCfg) if err != nil { logger.Warn("failed to create textgen manager", "error", err) return nil } logger.Info("textgen manager initialized") return mgr } // runCleanup periodically removes expired sessions and auth codes. // Runs every hour. Stops when ctx is cancelled. func runCleanup(ctx context.Context, sessions port.SessionRepository, codes port.AuthCodeRepository, logger *logging.Logger) { ticker := time.NewTicker(1 * time.Hour) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: sessCount, err := sessions.DeleteExpired(ctx) if err != nil { logger.Warn("failed to cleanup expired sessions", "error", err) } else if sessCount > 0 { logger.Info("cleaned up expired sessions", "count", sessCount) } codeCount, err := codes.DeleteExpired(ctx) if err != nil { logger.Warn("failed to cleanup expired auth codes", "error", err) } else if codeCount > 0 { logger.Info("cleaned up expired auth codes", "count", codeCount) } } } }