# Media Pipeline

## Critical Rules

- **ALL media operations are async jobs.** Upload, process, generate: everything goes through the job queue.
- **NEVER wait synchronously.** POST returns a job ID immediately. Results come via SSE.
- **NEVER simulate progress.** Real progress comes from real events. Fake progress is a lie.
- **Storage is opaque.** Backend returns URLs. Frontend never constructs storage paths.
- **GCS in production, MemoryStore in dev.** When the `GCS_BUCKET` env var is set, storage uses GCS. Otherwise, an in-memory store serves files at `/storage/`.

## Architecture Overview

```
┌─────────────┐   POST /generate/*    ┌──────────────┐   enqueue   ┌──────────────┐
│  Frontend   │ ─────────────────────▶│   Service    │ ───────────▶│    CRDB      │
│             │   { jobId } (202)     │   (thin)     │             │    Queue     │
│             │ ◀─────────────────────│              │             └──────┬───────┘
│             │                       │              │                    │
│             │                       │              │             dequeue│
│             │                       │              │                    ▼
│             │                       │              │             ┌──────────────┐
│             │                       │              │             │    Worker    │
│             │                       │              │             │  (AI work)   │
│             │                       │              │             └──────┬───────┘
│             │                       │              │                    │
│             │      SSE stream       │   SSE Hub    │  Redis sub         │ persist to
│             │ ◀─────────────────────│ ◀────────────│────────────────────│◀── storage
└─────────────┘                       └──────────────┘                    │    (GCS)
                                                                          └─────┘
```

## Storage

### Backend: `pkg/storage/`

The storage package provides a `Store` interface with two implementations:

| Implementation | When | Env Vars |
|---|---|---|
| `GCSStore` | `GCS_BUCKET` is set (production, deployed) | `GCS_BUCKET`, `GCS_SERVICE_ACCOUNT_JSON` |
| `MemoryStore` | No `GCS_BUCKET` (local dev, standalone) | None |

```go
type Store interface {
	Upload(ctx context.Context, path string, data []byte, contentType string) (string, error)
	UploadPresigned(ctx context.Context, path string, contentType string) (*PresignedUpload, error)
	GetURL(ctx context.Context, path string) (string, error)
	Delete(ctx context.Context, path string) error
	List(ctx context.Context, prefix string) ([]MediaObject, error)
}
```

### Initialization (service main.go)

Storage is initialized early in `main()`, before the queue, since standalone queue handlers need it:

```go
var mediaStore storage.Store
if bucket := os.Getenv("GCS_BUCKET"); bucket != "" {
	// NOTE: the second return value is discarded here; check it in real code.
	mediaStore, _ = storage.NewGCSStore(bucket, os.Getenv("GCS_SERVICE_ACCOUNT_JSON"), logger)
} else {
	memStore := storage.NewMemoryStore("http://localhost:" + port + "/storage")
	mediaStore = memStore
	// Mount memStore.ServeHTTP at /storage/* for dev mode
}
```

### Object Path Convention

All media is stored under `media/{userID}/`:

- Generated images: `media/{userID}/images/{jobID}_{index}.png`
- Generated videos: `media/{userID}/videos/{jobID}_{index}.mp4`
- Uploads: `media/{userID}/{uuid}/{filename}`

### Generation Auto-Persist

Image and video generation handlers accept a `storage.Store`. When it is non-nil, generated results are persisted automatically, and SSE events carry permanent URLs instead of temporary provider URLs or base64 data.

```go
generation.ImageHandler(mediagenManager, store, pub, logger)
generation.VideoHandler(mediagenManager, store, pub, logger)
```

## Upload Flow (Presigned URL)

```
Frontend                    Backend                    Storage (GCS/Memory)
   │                           │                              │
   │ POST /media/upload/init   │                              │
   │ {filename, contentType}   │                              │
   │──────────────────────────▶│                              │
   │                           │ UploadPresigned()            │
   │                           │─────────────────────────────▶│
   │ {uploadURL, objectPath}   │                              │
   │◀──────────────────────────│                              │
   │                           │                              │
   │ PUT uploadURL (file body) │                              │
   │─────────────────────────────────────────────────────────▶│
   │ 200 OK                    │                              │
   │◀─────────────────────────────────────────────────────────│
   │                           │                              │
   │ POST /media/upload/complete│                             │
   │ {objectPath}              │                              │
   │──────────────────────────▶│ GetURL()                     │
   │                           │─────────────────────────────▶│
   │ {url, path}               │                              │
   │◀──────────────────────────│                              │
```

### Frontend Hook

```typescript
import { useMediaUpload } from '@project/realtime';

const { upload, isUploading, progress, error, reset } = useMediaUpload({
  apiPrefix: '',
  serviceName: 'example-api',
  headers: { Authorization: `Bearer ${token}` },
});

// Upload a file
const result = await upload(file); // { url, path }
```

## Media Library

### Endpoints

| Method | Path | Description |
|--------|------|-------------|
| `POST` | `/media/upload/init` | Get presigned upload URL |
| `POST` | `/media/upload/complete` | Confirm upload, get final URL |
| `GET` | `/media` | List user's media (optional `?prefix=images`) |
| `DELETE` | `/media/{path...}` | Delete a media object |

All endpoints require authentication.

### Frontend Components

```tsx
import { MediaUploader, MediaLibrary } from '@project/ui';

// Upload drop zone
<MediaUploader …={() => refetchMedia()} />

// Media grid with preview
<MediaLibrary …={(path) => deleteMedia(path)} />
```

## Event Reference

| Event | When | Payload |
|-------|------|---------|
| `generation_started` | Generation begins | `{ jobId }` |
| `generation_progress` | Progress update | `{ jobId, progress, message }` |
| `generation_complete` | Generation done (URLs are persistent) | `{ jobId, result }` |
| `generation_failed` | Error occurred | `{ jobId, error }` |
| `upload_started` | Upload job begins | `{ jobId }` |
| `upload_progress` | Chunk uploaded | `{ jobId, progress }` |
| `upload_complete` | Upload done | `{ jobId, result: { url, path } }` |
| `upload_failed` | Error occurred | `{ jobId, error }` |

## Common Mistakes

### DON'T: Construct storage URLs

```typescript
// WRONG
const url = `/storage/uploads/${userId}/${fileId}.jpg`;
```

### DO: Use URLs from backend

```typescript
// CORRECT
const url = media.url; // Backend provides complete URL
```

### DON'T: Proxy uploads through the backend

```typescript
// WRONG - wastes backend bandwidth
await fetch('/api/upload', { method: 'POST', body: file });
```

### DO: Upload directly to storage via presigned URL

```typescript
// CORRECT - frontend uploads directly to storage (GCS in production)
const { uploadURL } = await initUpload(file);
await fetch(uploadURL, { method: 'PUT', body: file });
```
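
The direct-upload pattern above can be sketched end to end. This is a minimal sketch, not the `useMediaUpload` implementation: the endpoint paths and the request/response fields come from the upload flow diagram, while `uploadViaPresignedUrl`, `FetchLike`, `UploadableFile`, the injected `fetchFn`, and the `Content-Type` header on the `PUT` are assumptions made for illustration.

```typescript
// Minimal response/fetch shapes so the sketch does not depend on DOM typings.
interface HttpResponse {
  ok: boolean;
  status: number;
  json(): Promise<unknown>;
}

type FetchLike = (
  url: string,
  init: { method: string; headers?: Record<string, string>; body?: unknown },
) => Promise<HttpResponse>;

// Field names taken from the upload flow diagram.
interface InitResponse { uploadURL: string; objectPath: string; }
interface UploadResult { url: string; path: string; }

// Hypothetical minimal file shape; a browser File satisfies it structurally.
interface UploadableFile { name: string; type: string; }

async function uploadViaPresignedUrl(
  file: UploadableFile,
  fetchFn: FetchLike,
  authHeaders: Record<string, string> = {},
): Promise<UploadResult> {
  const jsonHeaders = { 'Content-Type': 'application/json', ...authHeaders };

  // 1. Ask the backend for a presigned upload URL.
  const initRes = await fetchFn('/media/upload/init', {
    method: 'POST',
    headers: jsonHeaders,
    body: JSON.stringify({ filename: file.name, contentType: file.type }),
  });
  if (!initRes.ok) throw new Error(`upload init failed: ${initRes.status}`);
  const { uploadURL, objectPath } = (await initRes.json()) as InitResponse;

  // 2. PUT the file body straight to storage. Assumption: the presigned URL
  //    carries its own authorization, so no auth headers are sent here.
  const putRes = await fetchFn(uploadURL, {
    method: 'PUT',
    headers: { 'Content-Type': file.type },
    body: file,
  });
  if (!putRes.ok) throw new Error(`storage PUT failed: ${putRes.status}`);

  // 3. Confirm the upload; the backend answers with the final URL and path.
  const doneRes = await fetchFn('/media/upload/complete', {
    method: 'POST',
    headers: jsonHeaders,
    body: JSON.stringify({ objectPath }),
  });
  if (!doneRes.ok) throw new Error(`upload complete failed: ${doneRes.status}`);
  return (await doneRes.json()) as UploadResult;
}
```

In a browser, a thin wrapper such as `(url, init) => fetch(url, init as RequestInit)` could be passed as `fetchFn`. Injecting it keeps the three-step flow testable without a network, and the returned `url` is used as-is, never rebuilt from `path`.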