421 lines
15 KiB
Go
421 lines
15 KiB
Go
// Package main is the entry point for the persona-api service.
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"embed"
|
|
"flag"
|
|
"fmt"
|
|
"os"
|
|
"time"
|
|
|
|
"github.com/redis/go-redis/v9"
|
|
|
|
"git.threesix.ai/jordan/persona-community-3/pkg/album"
|
|
"git.threesix.ai/jordan/persona-community-3/pkg/app"
|
|
"git.threesix.ai/jordan/persona-community-3/pkg/personagen"
|
|
"git.threesix.ai/jordan/persona-community-3/pkg/database"
|
|
"git.threesix.ai/jordan/persona-community-3/pkg/gemini"
|
|
"git.threesix.ai/jordan/persona-community-3/pkg/laozhang"
|
|
"git.threesix.ai/jordan/persona-community-3/pkg/logging"
|
|
"git.threesix.ai/jordan/persona-community-3/pkg/mediagen"
|
|
mediagenAdapters "git.threesix.ai/jordan/persona-community-3/pkg/mediagen/adapters"
|
|
"git.threesix.ai/jordan/persona-community-3/pkg/generation"
|
|
emailpkg "git.threesix.ai/jordan/persona-community-3/pkg/email"
|
|
"git.threesix.ai/jordan/persona-community-3/pkg/notify"
|
|
"git.threesix.ai/jordan/persona-community-3/pkg/queue"
|
|
"git.threesix.ai/jordan/persona-community-3/pkg/realtime"
|
|
"git.threesix.ai/jordan/persona-community-3/pkg/storage"
|
|
"git.threesix.ai/jordan/persona-community-3/pkg/textgen"
|
|
textgenAdapters "git.threesix.ai/jordan/persona-community-3/pkg/textgen/adapters"
|
|
emailadapter "git.threesix.ai/jordan/persona-community-3/services/persona-api/internal/adapter/email"
|
|
componentemail "git.threesix.ai/jordan/persona-community-3/services/persona-api/internal/email"
|
|
"git.threesix.ai/jordan/persona-community-3/services/persona-api/internal/adapter/memory"
|
|
"git.threesix.ai/jordan/persona-community-3/services/persona-api/internal/adapter/postgres"
|
|
"git.threesix.ai/jordan/persona-community-3/services/persona-api/internal/api"
|
|
"git.threesix.ai/jordan/persona-community-3/services/persona-api/internal/config"
|
|
"git.threesix.ai/jordan/persona-community-3/services/persona-api/internal/port"
|
|
"git.threesix.ai/jordan/persona-community-3/services/persona-api/internal/service"
|
|
)
|
|
|
|
//go:embed migrations/*.sql
|
|
var migrationsFS embed.FS
|
|
|
|
func main() {
|
|
// Parse flags
|
|
exportOpenAPI := flag.Bool("export-openapi", false, "Export OpenAPI spec to stdout and exit")
|
|
flag.Parse()
|
|
|
|
// If exporting OpenAPI, generate spec and exit (used by CI for docs generation)
|
|
if *exportOpenAPI {
|
|
spec := api.NewServiceSpec()
|
|
jsonBytes, err := spec.JSON()
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "failed to generate OpenAPI spec: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
fmt.Println(string(jsonBytes))
|
|
os.Exit(0)
|
|
}
|
|
|
|
// Load config
|
|
cfg := config.Load()
|
|
|
|
// Create logger
|
|
logger := logging.Default()
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
// Create SSE hub for async event delivery (generation progress, chat, etc.)
|
|
sseHub := realtime.NewSSEHub(logger.Logger)
|
|
|
|
// Initialize storage backend (before queue, since standalone queue handlers use it).
|
|
// GCS_BUCKET set = production (GCS). Otherwise = dev (in-memory).
|
|
listenPort := fmt.Sprintf("%d", 8001)
|
|
var mediaStore storage.Store
|
|
if bucket := os.Getenv("GCS_BUCKET"); bucket != "" {
|
|
gcsStore, err := storage.NewGCSStore(ctx, bucket, os.Getenv("GCS_SERVICE_ACCOUNT_JSON"), logger.Logger)
|
|
if err != nil {
|
|
logger.Error("failed to create GCS store", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
defer func() { _ = gcsStore.Close() }()
|
|
mediaStore = gcsStore
|
|
logger.Info("storage initialized (GCS)", "bucket", bucket)
|
|
} else {
|
|
memStore := storage.NewMemoryStore("http://localhost:" + listenPort + "/storage")
|
|
mediaStore = memStore
|
|
logger.Info("storage initialized (in-memory dev mode)")
|
|
}
|
|
|
|
// Select backend based on DATABASE_URL availability.
|
|
// With DATABASE_URL: Postgres repos + DB queue (production)
|
|
// Without DATABASE_URL: in-memory repos + in-process AI (development)
|
|
exampleRepo := memory.NewExampleRepository()
|
|
albumRepo := memory.NewAlbumRepository()
|
|
var userRepo port.UserRepository
|
|
var sessionRepo port.SessionRepository
|
|
var authCodeRepo port.AuthCodeRepository
|
|
var mediaRepo port.MediaRepository
|
|
var jobQueue queue.Producer
|
|
var jobReader queue.JobReader
|
|
|
|
if cfg.Database.URL != "" {
|
|
// Connect to database (shared pool for queue + auth repos).
|
|
dbPool, err := database.Connect(ctx, cfg.Database.URL, database.Options{
|
|
MaxOpenConns: cfg.Database.MaxOpenConns,
|
|
MaxIdleConns: cfg.Database.MaxIdleConns,
|
|
ConnMaxLifetime: cfg.Database.ConnMaxLifetime,
|
|
})
|
|
if err != nil {
|
|
logger.Error("failed to connect to database", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
logger.Info("connected to database")
|
|
|
|
// Verify the database connection is actually alive before proceeding.
|
|
if err := dbPool.DB.PingContext(ctx); err != nil {
|
|
logger.Error("database health check failed", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
logger.Info("database health check passed")
|
|
|
|
// Run auth migrations.
|
|
if err := database.RunMigrations(ctx, dbPool, migrationsFS, "migrations"); err != nil {
|
|
logger.Error("failed to run auth migrations", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
logger.Info("auth migrations complete")
|
|
|
|
// Postgres-backed repositories.
|
|
userRepo = postgres.NewUserRepository(dbPool.DB)
|
|
sessionRepo = postgres.NewSessionRepository(dbPool.DB)
|
|
authCodeRepo = postgres.NewAuthCodeRepository(dbPool.DB)
|
|
mediaRepo = postgres.NewMediaObjectRepository(dbPool.DB)
|
|
|
|
// DB-backed queue.
|
|
jobQueue, jobReader = setupDBQueue(ctx, cfg, dbPool, sseHub, logger)
|
|
} else {
|
|
logger.Info("DATABASE_URL not set — running in standalone mode (in-memory queue + in-process AI)")
|
|
userRepo = memory.NewUserRepository(cfg.DevUserEmail, cfg.DevUserPassword)
|
|
sessionRepo = memory.NewSessionRepository()
|
|
authCodeRepo = memory.NewAuthCodeRepository()
|
|
mediaRepo = memory.NewMediaRepository()
|
|
jobQueue, jobReader = setupStandaloneQueue(ctx, mediaStore, albumRepo, sseHub, logger)
|
|
}
|
|
|
|
// Validate required config.
|
|
if cfg.JWTSecret == "" {
|
|
logger.Error("JWT_SECRET must be set (even in development)")
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Load email renderer (HTML templates embedded at build time).
|
|
emailRenderer, err := emailpkg.NewRendererFromFS(componentemail.TemplateFS, "templates", emailpkg.BrandConfig{
|
|
AppName: cfg.AppName,
|
|
AppURL: cfg.AppURL,
|
|
SupportEmail: cfg.SupportEmail,
|
|
LogoURL: cfg.LogoURL,
|
|
PrimaryColor: cfg.BrandColor,
|
|
})
|
|
if err != nil {
|
|
logger.Error("failed to load email templates", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
logger.Info("email renderer loaded", "templates", len(emailRenderer.Purposes()))
|
|
|
|
// Create email sender — notify service in production (NOTIFY_URL set), log-only for dev.
|
|
var emailSender port.EmailSender
|
|
if cfg.NotifyURL != "" {
|
|
notifyClient, err := notify.NewClient(notify.Config{
|
|
URL: cfg.NotifyURL,
|
|
APIKey: cfg.NotifyAPIKey,
|
|
Logger: logger.Logger,
|
|
})
|
|
if err != nil {
|
|
logger.Error("failed to create notify client", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
emailSender = emailadapter.NewNotifySender(notifyClient, emailRenderer, cfg.NotifyHost, cfg.NotifyFrom, logger)
|
|
logger.Info("email sender initialized (notify)", "url", cfg.NotifyURL, "host", cfg.NotifyHost)
|
|
} else {
|
|
emailSender = emailadapter.NewLogSender(logger)
|
|
logger.Info("email sender initialized (log-only dev mode)")
|
|
}
|
|
|
|
// Create services (business logic)
|
|
exampleService := service.NewExampleService(exampleRepo, logger)
|
|
albumService := service.NewAlbumService(albumRepo, jobQueue, logger)
|
|
authService := service.NewAuthService(
|
|
userRepo, sessionRepo, authCodeRepo, emailSender,
|
|
cfg.JWTSecret, cfg.RegistrationEnabled, logger,
|
|
)
|
|
|
|
// Create application
|
|
application := app.New("persona-api", app.WithDefaultPort(8001))
|
|
|
|
// Mount in-memory storage HTTP handler for dev mode
|
|
if memStore, ok := mediaStore.(*storage.MemoryStore); ok {
|
|
application.Router().Handle("/storage/*", memStore)
|
|
}
|
|
|
|
// Register routes with dependency injection
|
|
api.RegisterRoutes(application, &api.Dependencies{
|
|
ExampleService: exampleService,
|
|
AuthService: authService,
|
|
AlbumService: albumService,
|
|
Queue: jobQueue,
|
|
JobReader: jobReader,
|
|
SSEHub: sseHub,
|
|
Store: mediaStore,
|
|
MediaRepo: mediaRepo,
|
|
EmailRenderer: emailRenderer,
|
|
})
|
|
|
|
// Start background cleanup of expired sessions and auth codes.
|
|
go runCleanup(ctx, sessionRepo, authCodeRepo, logger)
|
|
|
|
// Start server
|
|
application.Run()
|
|
}
|
|
|
|
// setupDBQueue initializes the production queue backend using the shared database pool + optional Redis.
|
|
// Returns both Producer (for enqueue) and JobReader (for status polling).
|
|
func setupDBQueue(ctx context.Context, cfg *config.Config, pool *database.Pool, sseHub *realtime.SSEHub, logger *logging.Logger) (queue.Producer, queue.JobReader) {
|
|
if err := queue.RunMigrations(ctx, pool); err != nil {
|
|
logger.Error("failed to run queue migrations", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
logger.Info("queue migrations complete")
|
|
|
|
jobQueue := queue.NewQueue(pool.DB, logger)
|
|
|
|
// Start Redis SSE subscriber if configured.
|
|
if cfg.RedisURL != "" {
|
|
opts, err := redis.ParseURL(cfg.RedisURL)
|
|
if err != nil {
|
|
logger.Error("failed to parse REDIS_URL", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
redisClient := redis.NewClient(opts)
|
|
if err := redisClient.Ping(ctx).Err(); err != nil {
|
|
logger.Error("failed to connect to Redis", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
logger.Info("connected to Redis")
|
|
|
|
go func() {
|
|
if err := realtime.RunSSESubscriber(ctx, redisClient, sseHub, logger.Logger); err != nil {
|
|
logger.Error("SSE Redis subscriber stopped", "error", err)
|
|
}
|
|
}()
|
|
} else {
|
|
logger.Warn("REDIS_URL not set — SSE events from worker will not be delivered")
|
|
}
|
|
|
|
return jobQueue, jobQueue
|
|
}
|
|
|
|
// setupStandaloneQueue initializes an in-memory queue with in-process AI handlers.
|
|
// This mode requires no database or Redis — everything runs in a single process.
|
|
// Returns both Producer (for enqueue) and JobReader (for status polling).
|
|
func setupStandaloneQueue(ctx context.Context, store storage.Store, albumUpdater album.AlbumUpdater, sseHub *realtime.SSEHub, logger *logging.Logger) (queue.Producer, queue.JobReader) {
|
|
memQueue := queue.NewMemoryQueue(logger.Logger)
|
|
|
|
// LocalPublisher delivers events directly to the SSE hub (no Redis needed).
|
|
pub := realtime.NewLocalPublisher(sseHub)
|
|
|
|
// Initialize AI providers
|
|
mediagenManager := initMediagen(ctx, logger)
|
|
textgenManager := initTextgen(ctx, logger)
|
|
|
|
// Register job handlers (same handlers the worker uses).
|
|
if mediagenManager != nil {
|
|
memQueue.RegisterHandler("generate_image", generation.ImageHandler(mediagenManager, store, pub, logger))
|
|
memQueue.RegisterHandler("generate_video", generation.VideoHandler(mediagenManager, store, pub, logger))
|
|
memQueue.RegisterHandler("generate_anchor", album.AnchorHandler(mediagenManager, store, pub, albumUpdater, logger))
|
|
memQueue.RegisterHandler("generate_shot", album.ShotHandler(mediagenManager, store, pub, albumUpdater, logger))
|
|
}
|
|
if textgenManager != nil {
|
|
memQueue.RegisterHandler("generate_text", generation.TextHandler(textgenManager, pub, logger))
|
|
memQueue.RegisterHandler("ai_chat_response", generation.ChatResponseHandler(textgenManager, pub, logger))
|
|
}
|
|
|
|
// Persona generation requires both textgen (5-stage LLM pipeline) and mediagen (20 images + 4 videos).
|
|
if textgenManager != nil && mediagenManager != nil {
|
|
memQueue.RegisterHandler("persona_generate", personagen.QueueHandler(textgenManager, mediagenManager, store, pub, logger.Logger))
|
|
}
|
|
|
|
return memQueue, memQueue
|
|
}
|
|
|
|
// initMediagen creates a mediagen manager from available AI provider credentials.
|
|
func initMediagen(ctx context.Context, logger *logging.Logger) *mediagen.Manager {
|
|
var laozhangMediaProvider *mediagenAdapters.LaoZhangProvider
|
|
var geminiMediaProvider *mediagenAdapters.GeminiProvider
|
|
|
|
if apiKey := os.Getenv("LAOZHANG_API_KEY"); apiKey != "" {
|
|
client, err := laozhang.NewClient(laozhang.Config{
|
|
APIKey: apiKey,
|
|
VideoTimeout: 5 * time.Minute,
|
|
Logger: logger.Logger,
|
|
})
|
|
if err != nil {
|
|
logger.Warn("failed to create LaoZhang client", "error", err)
|
|
} else {
|
|
laozhangMediaProvider = mediagenAdapters.NewLaoZhangProvider(client)
|
|
logger.Info("LaoZhang media provider initialized")
|
|
}
|
|
}
|
|
|
|
if apiKey := os.Getenv("GEMINI_API_KEY"); apiKey != "" {
|
|
client, err := gemini.NewClient(ctx, gemini.Config{
|
|
APIKey: apiKey,
|
|
Logger: logger.Logger,
|
|
})
|
|
if err != nil {
|
|
logger.Warn("failed to create Gemini client", "error", err)
|
|
} else {
|
|
geminiMediaProvider = mediagenAdapters.NewGeminiProvider(client)
|
|
logger.Info("Gemini media provider initialized")
|
|
}
|
|
}
|
|
|
|
if laozhangMediaProvider == nil && geminiMediaProvider == nil {
|
|
logger.Warn("no media generation providers available (set LAOZHANG_API_KEY or GEMINI_API_KEY)")
|
|
return nil
|
|
}
|
|
|
|
mgCfg := mediagen.ProductionConfig(mediagen.ProviderSet{
|
|
LaoZhang: laozhangMediaProvider,
|
|
Gemini: geminiMediaProvider,
|
|
}, mediagen.WithLogger(logger.Logger))
|
|
if laozhangMediaProvider != nil {
|
|
mgCfg.VideoProviders = append(mgCfg.VideoProviders, laozhangMediaProvider)
|
|
}
|
|
if geminiMediaProvider != nil {
|
|
mgCfg.VideoProviders = append(mgCfg.VideoProviders, geminiMediaProvider)
|
|
}
|
|
|
|
mgr, err := mediagen.NewManager(mgCfg)
|
|
if err != nil {
|
|
logger.Warn("failed to create mediagen manager", "error", err)
|
|
return nil
|
|
}
|
|
logger.Info("mediagen manager initialized (image + video)")
|
|
return mgr
|
|
}
|
|
|
|
// initTextgen creates a textgen manager from available AI provider credentials.
|
|
func initTextgen(ctx context.Context, logger *logging.Logger) *textgen.Manager {
|
|
var textProviders []textgen.TextGenerator
|
|
|
|
if apiKey := os.Getenv("LAOZHANG_API_KEY"); apiKey != "" {
|
|
client, err := laozhang.NewClient(laozhang.Config{
|
|
APIKey: apiKey,
|
|
Logger: logger.Logger,
|
|
})
|
|
if err != nil {
|
|
logger.Warn("failed to create LaoZhang text client", "error", err)
|
|
} else {
|
|
textProviders = append(textProviders, textgenAdapters.NewLaoZhangTextProvider(client, ""))
|
|
}
|
|
}
|
|
|
|
if apiKey := os.Getenv("GEMINI_API_KEY"); apiKey != "" {
|
|
provider, err := textgenAdapters.NewGeminiTextProvider(ctx, textgenAdapters.GeminiTextConfig{
|
|
APIKey: apiKey,
|
|
})
|
|
if err != nil {
|
|
logger.Warn("failed to create Gemini text provider", "error", err)
|
|
} else {
|
|
textProviders = append(textProviders, provider)
|
|
}
|
|
}
|
|
|
|
if len(textProviders) == 0 {
|
|
logger.Warn("no text generation providers available")
|
|
return nil
|
|
}
|
|
|
|
tgCfg := textgen.ProductionConfig(textgen.ProviderSet{}, textgen.WithLogger(logger.Logger))
|
|
tgCfg.Providers = textProviders
|
|
|
|
mgr, err := textgen.NewManager(tgCfg)
|
|
if err != nil {
|
|
logger.Warn("failed to create textgen manager", "error", err)
|
|
return nil
|
|
}
|
|
logger.Info("textgen manager initialized")
|
|
return mgr
|
|
}
|
|
|
|
// runCleanup periodically removes expired sessions and auth codes.
|
|
// Runs every hour. Stops when ctx is cancelled.
|
|
func runCleanup(ctx context.Context, sessions port.SessionRepository, codes port.AuthCodeRepository, logger *logging.Logger) {
|
|
ticker := time.NewTicker(1 * time.Hour)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
sessCount, err := sessions.DeleteExpired(ctx)
|
|
if err != nil {
|
|
logger.Warn("failed to cleanup expired sessions", "error", err)
|
|
} else if sessCount > 0 {
|
|
logger.Info("cleaned up expired sessions", "count", sessCount)
|
|
}
|
|
|
|
codeCount, err := codes.DeleteExpired(ctx)
|
|
if err != nil {
|
|
logger.Warn("failed to cleanup expired auth codes", "error", err)
|
|
} else if codeCount > 0 {
|
|
logger.Info("cleaned up expired auth codes", "count", codeCount)
|
|
}
|
|
}
|
|
}
|
|
}
|