package personagen import ( "context" "encoding/json" "fmt" "log/slog" "net/http" "time" "git.threesix.ai/jordan/persona-community-5/pkg/mediagen" "git.threesix.ai/jordan/persona-community-5/pkg/persona" "git.threesix.ai/jordan/persona-community-5/pkg/queue" "git.threesix.ai/jordan/persona-community-5/pkg/realtime" "git.threesix.ai/jordan/persona-community-5/pkg/storage" "git.threesix.ai/jordan/persona-community-5/pkg/textgen" ) // anchorFetchClient is used to download anchor images from storage URLs. var anchorFetchClient = &http.Client{Timeout: 30 * time.Second} // Stage constants matching domain.PersonaStage in the API service. const ( StageSpec = "spec" StageAnchor = "anchor" StageAvatar = "avatar" StageBanner = "banner" StageGalleryBatch = "gallery_batch" StageVideo = "video" ) // Video motion types in generation order. var videoOrder = []persona.MotionType{ persona.MotionSmileReveal, persona.MotionPersonality, persona.MotionLifestyle, persona.MotionInvitation, } // PipelineDeps holds dependencies for the staged pipeline handler. type PipelineDeps struct { TextGen *textgen.Manager MediaGen *mediagen.Manager Store storage.Store Pub realtime.EventPublisher Personas PersonaStore Queue queue.Producer Logger *slog.Logger } // StagedQueueHandler returns a queue.Handler for processing persona_generate jobs // using a stage-based pipeline. Each stage processes one unit of work, updates the // persona row, publishes a job_update SSE event, and enqueues the next stage. // // Job payload: {"persona_id": "...", "stage": "spec|anchor|avatar|banner|gallery_batch|video"} func StagedQueueHandler(deps PipelineDeps) queue.Handler { return func(ctx context.Context, job *queue.Job) error { personaID, _ := job.Payload["persona_id"].(string) if personaID == "" { return fmt.Errorf("missing persona_id in persona_generate job payload") } stage, _ := job.Payload["stage"].(string) if stage == "" { return fmt.Errorf("missing stage in persona_generate job payload") } logger := deps.Logger.With("job_id", job.ID, "persona_id", personaID, "stage", stage) logger.Info("processing persona generation stage") var err error switch stage { case StageSpec: err = handleStageSpec(ctx, deps, personaID, job.ID, logger) case StageAnchor: err = handleStageAnchor(ctx, deps, personaID, job.ID, logger) case StageAvatar: err = handleStageAvatar(ctx, deps, personaID, job.ID, logger) case StageBanner: err = handleStageBanner(ctx, deps, personaID, job.ID, logger) case StageGalleryBatch: err = handleStageGalleryBatch(ctx, deps, personaID, job.ID, logger) case StageVideo: err = handleStageVideo(ctx, deps, personaID, job.ID, logger) default: err = fmt.Errorf("unknown stage: %s", stage) } if err != nil { logger.Error("stage failed", "error", err) publishJobUpdate(deps.Pub, personaID, stage, "error", 0, err.Error()) } return err } } // handleStageSpec generates the persona spec via LLM and updates the persona row. func handleStageSpec(ctx context.Context, deps PipelineDeps, personaID, jobID string, logger *slog.Logger) error { rec, err := deps.Personas.GetByID(ctx, personaID) if err != nil { return fmt.Errorf("load persona: %w", err) } // Update status to generating. rec.Status = "generating" if err := deps.Personas.Update(ctx, rec); err != nil { return fmt.Errorf("update persona status: %w", err) } svc := New(deps.TextGen, deps.MediaGen, deps.Store, logger) seed := SeedParams{ Description: rec.Description, Gender: rec.Gender, Name: rec.Name, } spec, err := svc.GenerateSpec(ctx, seed) if err != nil { rec.Status = "failed" _ = deps.Personas.Update(ctx, rec) return fmt.Errorf("spec generation: %w", err) } // Serialize full spec to JSON for storage. specJSON, err := json.Marshal(spec) if err != nil { return fmt.Errorf("marshal spec: %w", err) } // Update persona with spec data. rec.SpecJSON = specJSON rec.Name = spec.Name.First if spec.Name.Last != "" { rec.Name = spec.Name.First + " " + spec.Name.Last } rec.Handle = generateHandleFromSpec(spec) rec.Tags = extractTags(spec) if err := deps.Personas.Update(ctx, rec); err != nil { return fmt.Errorf("update persona spec: %w", err) } publishJobUpdate(deps.Pub, personaID, StageSpec, "complete", 100, "") // Enqueue anchor stage. if _, err := deps.Queue.Enqueue(ctx, "persona_generate", map[string]any{ "persona_id": personaID, "stage": StageAnchor, }); err != nil { return fmt.Errorf("enqueue anchor stage: %w", err) } logger.Info("spec stage complete, anchor enqueued") return nil } // handleStageAnchor generates the anchor (position 1) image. func handleStageAnchor(ctx context.Context, deps PipelineDeps, personaID, jobID string, logger *slog.Logger) error { rec, err := deps.Personas.GetByID(ctx, personaID) if err != nil { return fmt.Errorf("load persona: %w", err) } spec, err := unmarshalSpec(rec.SpecJSON) if err != nil { return err } svc := New(deps.TextGen, deps.MediaGen, deps.Store, logger) // Generate position 1 (the identity anchor). if err := svc.generatePosition(ctx, spec, 1); err != nil { return fmt.Errorf("anchor generation: %w", err) } // Find position 1 URL. var anchorURL string for _, img := range spec.ImageMatrix { if img.Position == 1 { anchorURL = img.URL break } } rec.AnchorURL = anchorURL rec.ImageURLs = appendIfMissing(rec.ImageURLs, anchorURL) // Update spec JSON with the generated image URL/status. updatedSpec, _ := json.Marshal(spec) rec.SpecJSON = updatedSpec if err := deps.Personas.Update(ctx, rec); err != nil { return fmt.Errorf("update persona anchor: %w", err) } publishJobUpdate(deps.Pub, personaID, StageAnchor, "complete", 100, "") // Enqueue avatar, banner, and gallery_batch in parallel. for _, nextStage := range []string{StageAvatar, StageBanner, StageGalleryBatch} { if _, err := deps.Queue.Enqueue(ctx, "persona_generate", map[string]any{ "persona_id": personaID, "stage": nextStage, }); err != nil { return fmt.Errorf("enqueue %s stage: %w", nextStage, err) } } logger.Info("anchor stage complete, avatar/banner/gallery_batch enqueued") return nil } // handleStageAvatar generates the circular portrait avatar. func handleStageAvatar(ctx context.Context, deps PipelineDeps, personaID, jobID string, logger *slog.Logger) error { rec, err := deps.Personas.GetByID(ctx, personaID) if err != nil { return fmt.Errorf("load persona: %w", err) } spec, err := unmarshalSpec(rec.SpecJSON) if err != nil { return err } svc := New(deps.TextGen, deps.MediaGen, deps.Store, logger) // Restore anchor from storage for identity consistency. if err := restoreAnchor(ctx, svc, deps.Store, spec); err != nil { logger.Warn("could not restore anchor for avatar", "error", err) } avatarBytes, err := svc.GenerateAvatar(ctx, spec) if err != nil { return fmt.Errorf("avatar generation: %w", err) } storagePath := fmt.Sprintf("personas/%s/avatar.png", spec.ID) url, err := deps.Store.Upload(ctx, storagePath, avatarBytes, "image/png") if err != nil { return fmt.Errorf("upload avatar: %w", err) } rec.AvatarURL = url if err := deps.Personas.Update(ctx, rec); err != nil { return fmt.Errorf("update persona avatar: %w", err) } publishJobUpdate(deps.Pub, personaID, StageAvatar, "complete", 100, "") logger.Info("avatar stage complete") return nil } // handleStageBanner generates the 3:1 landscape banner image. func handleStageBanner(ctx context.Context, deps PipelineDeps, personaID, jobID string, logger *slog.Logger) error { rec, err := deps.Personas.GetByID(ctx, personaID) if err != nil { return fmt.Errorf("load persona: %w", err) } spec, err := unmarshalSpec(rec.SpecJSON) if err != nil { return err } svc := New(deps.TextGen, deps.MediaGen, deps.Store, logger) // Restore anchor for identity consistency. if err := restoreAnchor(ctx, svc, deps.Store, spec); err != nil { logger.Warn("could not restore anchor for banner", "error", err) } bannerBytes, err := svc.GenerateBanner(ctx, spec, "lifestyle") if err != nil { return fmt.Errorf("banner generation: %w", err) } storagePath := fmt.Sprintf("personas/%s/banner.png", spec.ID) url, err := deps.Store.Upload(ctx, storagePath, bannerBytes, "image/png") if err != nil { return fmt.Errorf("upload banner: %w", err) } rec.BannerURL = url if err := deps.Personas.Update(ctx, rec); err != nil { return fmt.Errorf("update persona banner: %w", err) } publishJobUpdate(deps.Pub, personaID, StageBanner, "complete", 100, "") logger.Info("banner stage complete") return nil } // handleStageGalleryBatch generates the next 10 unfilled image positions. func handleStageGalleryBatch(ctx context.Context, deps PipelineDeps, personaID, jobID string, logger *slog.Logger) error { rec, err := deps.Personas.GetByID(ctx, personaID) if err != nil { return fmt.Errorf("load persona: %w", err) } spec, err := unmarshalSpec(rec.SpecJSON) if err != nil { return err } svc := New(deps.TextGen, deps.MediaGen, deps.Store, logger) // Restore anchor for identity consistency. if err := restoreAnchor(ctx, svc, deps.Store, spec); err != nil { logger.Warn("could not restore anchor for gallery batch", "error", err) } // Find unfilled positions (skip position 1 which is the anchor, already done). var unfilled []int for _, img := range spec.ImageMatrix { if img.Status != persona.ImageStatusComplete && img.Position != 1 { unfilled = append(unfilled, img.Position) } } // Take up to 10 positions for this batch. batchSize := 10 if len(unfilled) < batchSize { batchSize = len(unfilled) } batch := unfilled[:batchSize] for _, pos := range batch { if err := svc.generatePosition(ctx, spec, pos); err != nil { logger.Error("gallery image generation failed", "position", pos, "error", err) return fmt.Errorf("gallery position %d: %w", pos, err) } // Find the URL for this position. var imgURL string for _, img := range spec.ImageMatrix { if img.Position == pos { imgURL = img.URL break } } rec.ImageURLs = appendIfMissing(rec.ImageURLs, imgURL) } // Update spec JSON with new image URLs/statuses. updatedSpec, _ := json.Marshal(spec) rec.SpecJSON = updatedSpec if err := deps.Personas.Update(ctx, rec); err != nil { return fmt.Errorf("update persona gallery: %w", err) } // Count total completed images (including anchor). completedImages := countCompletedImages(spec) totalImages := len(spec.ImageMatrix) progress := (completedImages * 100) / totalImages publishJobUpdate(deps.Pub, personaID, StageGalleryBatch, "complete", progress, "") // If fewer than totalImages are complete, enqueue another gallery_batch. if completedImages < totalImages { if _, err := deps.Queue.Enqueue(ctx, "persona_generate", map[string]any{ "persona_id": personaID, "stage": StageGalleryBatch, }); err != nil { return fmt.Errorf("enqueue next gallery batch: %w", err) } logger.Info("gallery batch complete, more images needed", "completed", completedImages, "total", totalImages) } else { // All images done, enqueue first video. if _, err := deps.Queue.Enqueue(ctx, "persona_generate", map[string]any{ "persona_id": personaID, "stage": StageVideo, }); err != nil { return fmt.Errorf("enqueue video stage: %w", err) } logger.Info("all gallery images complete, video stage enqueued") } return nil } // handleStageVideo generates the next missing video. func handleStageVideo(ctx context.Context, deps PipelineDeps, personaID, jobID string, logger *slog.Logger) error { rec, err := deps.Personas.GetByID(ctx, personaID) if err != nil { return fmt.Errorf("load persona: %w", err) } spec, err := unmarshalSpec(rec.SpecJSON) if err != nil { return err } svc := New(deps.TextGen, deps.MediaGen, deps.Store, logger) // Restore anchor for identity consistency. if err := restoreAnchor(ctx, svc, deps.Store, spec); err != nil { logger.Warn("could not restore anchor for video", "error", err) } // Find the next missing video in order. completedVideos := len(rec.VideoURLs) if completedVideos >= len(videoOrder) { // All videos done — mark complete. rec.Status = "complete" if err := deps.Personas.Update(ctx, rec); err != nil { return fmt.Errorf("update persona complete: %w", err) } publishJobUpdate(deps.Pub, personaID, StageVideo, "complete", 100, "") logger.Info("all videos complete, persona generation finished") return nil } motionType := videoOrder[completedVideos] logger.Info("generating video", "motion_type", motionType, "index", completedVideos) videoSpec, err := svc.GenerateVideo(ctx, spec, motionType) if err != nil { // Videos are best-effort — log the failure and continue to next. logger.Warn("video generation failed (non-fatal)", "motion_type", motionType, "error", err) publishJobUpdate(deps.Pub, personaID, StageVideo, "error", 0, fmt.Sprintf("%s video failed: %s", motionType, err.Error())) } else { rec.VideoURLs = append(rec.VideoURLs, videoSpec.URL) } // Update spec JSON. updatedSpec, _ := json.Marshal(spec) rec.SpecJSON = updatedSpec if err := deps.Personas.Update(ctx, rec); err != nil { return fmt.Errorf("update persona video: %w", err) } // Check if more videos needed (count by index, not by success). nextIndex := completedVideos + 1 if nextIndex < len(videoOrder) { if _, err := deps.Queue.Enqueue(ctx, "persona_generate", map[string]any{ "persona_id": personaID, "stage": StageVideo, }); err != nil { return fmt.Errorf("enqueue next video: %w", err) } progress := (nextIndex * 100) / len(videoOrder) publishJobUpdate(deps.Pub, personaID, StageVideo, "complete", progress, "") logger.Info("video stage complete, next video enqueued", "completed", nextIndex, "total", len(videoOrder)) } else { // All videos done — mark persona complete. rec.Status = "complete" if err := deps.Personas.Update(ctx, rec); err != nil { return fmt.Errorf("update persona complete: %w", err) } publishJobUpdate(deps.Pub, personaID, StageVideo, "complete", 100, "") logger.Info("all videos complete, persona generation finished") } return nil } // publishJobUpdate sends a job_update SSE event to channel:personas. func publishJobUpdate(pub realtime.EventPublisher, personaID, stage, status string, progress int, errMsg string) { event := &realtime.SSEEvent{ Type: "job_update", Progress: progress, Result: map[string]any{ "persona_id": personaID, "stage": stage, "status": status, "progress": progress, }, } if errMsg != "" { event.Error = errMsg } if err := pub.SendToChannel("channel:personas", event); err != nil { slog.Warn("failed to publish job_update event", "error", err, "persona_id", personaID, "stage", stage) } } // unmarshalSpec deserializes a PersonaSpec from JSON stored on the persona record. func unmarshalSpec(specJSON json.RawMessage) (*persona.PersonaSpec, error) { if len(specJSON) == 0 { return nil, fmt.Errorf("persona has no spec_json (spec stage may not have completed)") } var spec persona.PersonaSpec if err := json.Unmarshal(specJSON, &spec); err != nil { return nil, fmt.Errorf("unmarshal spec: %w", err) } return &spec, nil } // restoreAnchor downloads the anchor image (position 1) from storage and sets it // on the service for identity consistency in subsequent generations. func restoreAnchor(ctx context.Context, svc *Service, store storage.Store, spec *persona.PersonaSpec) error { if store == nil { return fmt.Errorf("no storage configured") } // Find position 1 URL from spec. var anchorURL string for _, img := range spec.ImageMatrix { if img.Position == 1 && img.URL != "" { anchorURL = img.URL break } } if anchorURL == "" { return fmt.Errorf("no anchor URL found in spec") } // Download anchor image from storage URL. anchorBytes, err := storage.FetchURL(ctx, anchorFetchClient, anchorURL, 50<<20) // 50 MB max if err != nil { return fmt.Errorf("fetch anchor image: %w", err) } svc.SetAnchor(anchorBytes) return nil } // generateHandleFromSpec creates a URL-safe handle from the spec name. func generateHandleFromSpec(spec *persona.PersonaSpec) string { name := spec.Name.First if spec.Name.Last != "" { name += " " + spec.Name.Last } return generateHandle(name) } // generateHandle creates a URL-safe handle from a name (same logic as persona-api service). func generateHandle(name string) string { // Reuse the service's handle generation logic. h := name // Simplified: lowercase, replace non-alphanumeric with underscore, trim. result := make([]byte, 0, len(h)) for _, c := range []byte(h) { if (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') { result = append(result, c) } else if c >= 'A' && c <= 'Z' { result = append(result, c+32) // lowercase } else { if len(result) > 0 && result[len(result)-1] != '_' { result = append(result, '_') } } } s := string(result) if len(s) > 40 { s = s[:40] } // Trim trailing underscores. for len(s) > 0 && s[len(s)-1] == '_' { s = s[:len(s)-1] } // Append timestamp suffix. suffix := fmt.Sprintf("_%d", now().UnixMilli()%100000) return s + suffix } // extractTags extracts 8 tags from the spec for the persona row. func extractTags(spec *persona.PersonaSpec) []string { tags := make([]string, 0, 8) // Add gender. if spec.DNA != nil { tags = append(tags, string(spec.DNA.Identity.Gender)) tags = append(tags, string(spec.DNA.Identity.Ethnicity)) } // Add occupation from identity. if spec.Name.First != "" { tags = append(tags, spec.Name.First) } // Add fashion context. if spec.Lifestyle.FashionSense.Primary != "" { tags = append(tags, string(spec.Lifestyle.FashionSense.Primary)) } // Add interests. for _, interest := range spec.Lifestyle.Interests.Creative { if len(tags) >= 8 { break } tags = append(tags, interest) } for _, interest := range spec.Lifestyle.Interests.Active { if len(tags) >= 8 { break } tags = append(tags, interest) } if len(tags) > 8 { tags = tags[:8] } return tags } // countCompletedImages counts how many images have status "complete" in the spec. func countCompletedImages(spec *persona.PersonaSpec) int { count := 0 for _, img := range spec.ImageMatrix { if img.Status == persona.ImageStatusComplete { count++ } } return count } // appendIfMissing appends a string to a slice if it's not already present and not empty. func appendIfMissing(slice []string, s string) []string { if s == "" { return slice } for _, existing := range slice { if existing == s { return slice } } return append(slice, s) }