persona-community-1/pkg/album/handler.go
jordan 4004f88f4a
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
ci/woodpecker/manual/woodpecker Pipeline was successful
Initialize project from skeleton template
2026-02-23 10:20:59 +00:00

266 lines
8.6 KiB
Go

package album
import (
"context"
"fmt"
"io"
"log/slog"
"net/http"
"time"
"git.threesix.ai/jordan/persona-community-1/pkg/logging"
"git.threesix.ai/jordan/persona-community-1/pkg/mediagen"
"git.threesix.ai/jordan/persona-community-1/pkg/queue"
"git.threesix.ai/jordan/persona-community-1/pkg/realtime"
"git.threesix.ai/jordan/persona-community-1/pkg/storage"
)
// httpClient fetches anchor images at job execution time.
// 30-second timeout is sufficient for public storage URLs.
var httpClient = &http.Client{Timeout: 30 * time.Second}
// sendAlbumEvent sends an SSE event to the user channel and logs failures at warn level.
func sendAlbumEvent(pub realtime.EventPublisher, userID string, eventType string, result any) {
event := &realtime.SSEEvent{
Type: eventType,
Result: result,
}
if err := pub.SendToUser(userID, event); err != nil {
slog.Warn("failed to send album SSE event", "error", err, "type", eventType)
}
}
// AnchorHandler returns a queue.Handler that generates the anchor image for an album.
// The anchor is generated from the subject description alone (no reference image).
// On success it persists the URL and emits album_anchor_complete via SSE.
// On failure it emits album_anchor_failed via SSE.
//
// Job type: "generate_anchor"
func AnchorHandler(mg *mediagen.Manager, store storage.Store, pub realtime.EventPublisher, updater AlbumUpdater, logger *logging.Logger) queue.Handler {
return func(ctx context.Context, job *queue.Job) error {
albumID, _ := job.Payload["albumId"].(string)
userID, _ := job.Payload["userId"].(string)
subjectDesc, _ := job.Payload["subjectDesc"].(string)
if albumID == "" || userID == "" {
return fmt.Errorf("generate_anchor: missing albumId or userId in payload")
}
start := time.Now()
resp, err := mg.GenerateImage(ctx, mediagen.ImageRequest{
Prompt: subjectDesc,
Count: 1,
})
elapsed := time.Since(start)
if err != nil {
logger.Error("anchor generation failed", "error", err, "job_id", job.ID, "album_id", albumID)
sendAlbumEvent(pub, userID, EventAlbumAnchorFailed, AlbumAnchorFailedData{
AlbumID: albumID,
Error: "Anchor generation failed: " + err.Error(),
})
return err
}
if len(resp.Images) == 0 {
err = fmt.Errorf("no images returned from provider")
logger.Error("anchor generation returned no images", "job_id", job.ID, "album_id", albumID)
sendAlbumEvent(pub, userID, EventAlbumAnchorFailed, AlbumAnchorFailedData{
AlbumID: albumID,
Error: err.Error(),
})
return err
}
img := resp.Images[0]
// Persist anchor to storage.
anchorURL := img.URL
if store != nil && len(img.Data) > 0 {
storagePath := fmt.Sprintf("albums/%s/%s/anchor.png", userID, albumID)
url, uploadErr := store.Upload(ctx, storagePath, img.Data, "image/png")
if uploadErr != nil {
logger.Warn("failed to persist anchor to storage, using inline URL", "error", uploadErr, "job_id", job.ID)
} else {
anchorURL = url
}
}
if anchorURL == "" {
err = fmt.Errorf("anchor has no URL after generation")
sendAlbumEvent(pub, userID, EventAlbumAnchorFailed, AlbumAnchorFailedData{
AlbumID: albumID,
Error: err.Error(),
})
return err
}
// Persist anchor URL to the album repository.
if err := updater.UpdateAnchor(ctx, AlbumID(albumID), userID, anchorURL, job.ID); err != nil {
logger.Error("failed to persist anchor URL", "error", err, "job_id", job.ID, "album_id", albumID)
sendAlbumEvent(pub, userID, EventAlbumAnchorFailed, AlbumAnchorFailedData{
AlbumID: albumID,
Error: "Failed to save anchor: " + err.Error(),
})
return err
}
logger.Info("anchor generation complete",
"job_id", job.ID, "album_id", albumID,
"provider", resp.Provider, "elapsed", elapsed)
sendAlbumEvent(pub, userID, EventAlbumAnchorComplete, AlbumAnchorCompleteData{
AlbumID: albumID,
AnchorURL: anchorURL,
})
return nil
}
}
// ShotHandler returns a queue.Handler that generates a single shot for an album.
// The anchor image bytes are fetched from AnchorURL at execution time.
// On success it persists the URL and emits album_shot_complete via SSE.
// On failure it emits album_shot_failed via SSE.
//
// Job type: "generate_shot"
func ShotHandler(mg *mediagen.Manager, store storage.Store, pub realtime.EventPublisher, updater AlbumUpdater, logger *logging.Logger) queue.Handler {
return func(ctx context.Context, job *queue.Job) error {
albumID, _ := job.Payload["albumId"].(string)
userID, _ := job.Payload["userId"].(string)
anchorURL, _ := job.Payload["anchorUrl"].(string)
subjectDesc, _ := job.Payload["subjectDesc"].(string)
direction, _ := job.Payload["direction"].(string)
shotIndex := 0
if si, ok := job.Payload["shotIndex"].(float64); ok {
shotIndex = int(si)
}
if albumID == "" || userID == "" {
return fmt.Errorf("generate_shot: missing albumId or userId in payload")
}
// Emit shot-generating event so the frontend shows a shimmer immediately.
sendAlbumEvent(pub, userID, EventAlbumShotGenerating, map[string]any{
"albumId": albumID,
"shotIndex": shotIndex,
})
// Fetch anchor image bytes from storage.
var anchorBytes []byte
var anchorMime string
if anchorURL != "" {
data, err := fetchBytes(ctx, anchorURL)
if err != nil {
logger.Warn("failed to fetch anchor image, proceeding without reference",
"error", err, "job_id", job.ID, "anchor_url", anchorURL)
} else {
anchorBytes = data
anchorMime = "image/png"
}
}
// Build prompt: subject description + shot direction.
prompt := subjectDesc
if direction != "" {
prompt = subjectDesc + ", " + direction
}
imageReq := mediagen.ImageRequest{
Prompt: prompt,
Count: 1,
}
if len(anchorBytes) > 0 {
imageReq.ReferenceImage = anchorBytes
imageReq.ReferenceMime = anchorMime
}
start := time.Now()
resp, err := mg.GenerateImage(ctx, imageReq)
elapsed := time.Since(start)
if err != nil {
logger.Error("shot generation failed",
"error", err, "job_id", job.ID, "album_id", albumID, "shot_index", shotIndex)
_ = updater.UpdateShot(ctx, AlbumID(albumID), userID, shotIndex, "", ShotFailed, err.Error())
sendAlbumEvent(pub, userID, EventAlbumShotFailed, AlbumShotFailedData{
AlbumID: albumID,
ShotIndex: shotIndex,
Error: "Shot generation failed: " + err.Error(),
})
return err
}
if len(resp.Images) == 0 {
errMsg := "no images returned from provider"
_ = updater.UpdateShot(ctx, AlbumID(albumID), userID, shotIndex, "", ShotFailed, errMsg)
sendAlbumEvent(pub, userID, EventAlbumShotFailed, AlbumShotFailedData{
AlbumID: albumID,
ShotIndex: shotIndex,
Error: errMsg,
})
return fmt.Errorf(errMsg)
}
img := resp.Images[0]
// Persist shot image to storage.
imageURL := img.URL
if store != nil && len(img.Data) > 0 {
storagePath := fmt.Sprintf("albums/%s/%s/shots/%d.png", userID, albumID, shotIndex)
url, uploadErr := store.Upload(ctx, storagePath, img.Data, "image/png")
if uploadErr != nil {
logger.Warn("failed to persist shot to storage, using inline URL",
"error", uploadErr, "job_id", job.ID)
} else {
imageURL = url
}
}
// Persist shot URL to the album repository.
if err := updater.UpdateShot(ctx, AlbumID(albumID), userID, shotIndex, imageURL, ShotComplete, ""); err != nil {
logger.Error("failed to persist shot URL",
"error", err, "job_id", job.ID, "album_id", albumID, "shot_index", shotIndex)
sendAlbumEvent(pub, userID, EventAlbumShotFailed, AlbumShotFailedData{
AlbumID: albumID,
ShotIndex: shotIndex,
Error: "Failed to save shot: " + err.Error(),
})
return err
}
logger.Info("shot generation complete",
"job_id", job.ID, "album_id", albumID,
"shot_index", shotIndex, "provider", resp.Provider, "elapsed", elapsed)
sendAlbumEvent(pub, userID, EventAlbumShotComplete, AlbumShotCompleteData{
AlbumID: albumID,
ShotIndex: shotIndex,
ImageURL: imageURL,
})
return nil
}
}
// fetchBytes downloads raw bytes from a URL.
// Used to load anchor images at shot-generation time.
func fetchBytes(ctx context.Context, url string) ([]byte, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, fmt.Errorf("create request: %w", err)
}
resp, err := httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("fetch: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("fetch: HTTP %d", resp.StatusCode)
}
const maxSize = 20 << 20 // 20 MB limit for anchor images
data, err := io.ReadAll(io.LimitReader(resp.Body, maxSize))
if err != nil {
return nil, fmt.Errorf("read: %w", err)
}
return data, nil
}