8.6 KiB
8.6 KiB
Media Pipeline
Critical Rules
- ALL media operations are async jobs. Upload, process, generate - everything goes through the job queue.
- NEVER wait synchronously. POST returns a job ID immediately. Results come via SSE.
- NEVER simulate progress. Real progress comes from real events. Fake progress is a lie.
- Storage is opaque. Backend returns URLs. Frontend never constructs storage paths.
- GCS in production, MemoryStore in dev. When
GCS_BUCKETenv var is set, storage uses GCS. Otherwise, an in-memory store serves files at/storage/.
Architecture Overview
┌─────────────┐ POST /generate/* ┌──────────────┐ enqueue ┌──────────────┐
│ Frontend │ ─────────────────────▶│ Service │ ───────────▶│ CRDB │
│ │ { jobId } (202) │ (thin) │ │ Queue │
│ │ ◀─────────────────────│ │ └──────┬───────┘
│ │ │ │ │
│ │ │ │ dequeue│
│ │ │ │ ▼
│ │ │ │ ┌──────────────┐
│ │ │ │ │ Worker │
│ │ │ │ │ (AI work) │
│ │ │ │ └──────┬───────┘
│ │ │ │ │
│ │ SSE stream │ SSE Hub │ Redis sub │ persist to
│ │ ◀─────────────────────│ ◀──────────│─────────────│◀── storage
└─────────────┘ └──────────────┘ │ (GCS)
└─────┘
Storage
Backend: pkg/storage/
The storage package provides a Store interface with two implementations:
| Implementation | When | Env Vars |
|---|---|---|
GCSStore |
GCS_BUCKET is set (production, deployed) |
GCS_BUCKET, GCS_SERVICE_ACCOUNT_JSON |
MemoryStore |
No GCS_BUCKET (local dev, standalone) |
None |
type Store interface {
Upload(ctx context.Context, path string, data []byte, contentType string) (string, error)
UploadPresigned(ctx context.Context, path string, contentType string) (*PresignedUpload, error)
GetURL(ctx context.Context, path string) (string, error)
Delete(ctx context.Context, path string) error
List(ctx context.Context, prefix string) ([]MediaObject, error)
}
Initialization (service main.go)
Storage is initialized early in main() — before the queue, since standalone queue handlers need it:
var mediaStore storage.Store
if bucket := os.Getenv("GCS_BUCKET"); bucket != "" {
mediaStore, _ = storage.NewGCSStore(bucket, os.Getenv("GCS_SERVICE_ACCOUNT_JSON"), logger)
} else {
memStore := storage.NewMemoryStore("http://localhost:" + port + "/storage")
mediaStore = memStore
// Mount memStore.ServeHTTP at /storage/* for dev mode
}
Object Path Convention
All media is stored under media/{userID}/:
- Generated images:
media/{userID}/images/{jobID}_{index}.png - Generated videos:
media/{userID}/videos/{jobID}_{index}.mp4 - Uploads:
media/{userID}/{uuid}/{filename}
Generation Auto-Persist
Image and video generation handlers accept a storage.Store. When non-nil, generated results are automatically persisted and SSE events contain permanent URLs instead of temporary provider URLs or base64.
generation.ImageHandler(mediagenManager, store, pub, logger)
generation.VideoHandler(mediagenManager, store, pub, logger)
Upload Flow (Presigned URL)
Frontend Backend Storage (GCS/Memory)
│ │ │
│ POST /media/upload/init │ │
│ {filename, contentType} │ │
│──────────────────────────▶│ │
│ │ UploadPresigned() │
│ │─────────────────────────────▶│
│ {uploadURL, objectPath} │ │
│◀──────────────────────────│ │
│ │ │
│ PUT uploadURL (file body) │ │
│─────────────────────────────────────────────────────────▶│
│ 200 OK │ │
│◀─────────────────────────────────────────────────────────│
│ │ │
│ POST /media/upload/complete│ │
│ {objectPath} │ │
│──────────────────────────▶│ GetURL() │
│ │─────────────────────────────▶│
│ {url, path} │ │
│◀──────────────────────────│ │
Frontend Hook
import { useMediaUpload } from '@project/realtime';
const { upload, isUploading, progress, error, reset } = useMediaUpload({
apiPrefix: '',
serviceName: 'example-api',
headers: { Authorization: `Bearer ${token}` },
});
// Upload a file
const result = await upload(file); // { url, path }
Media Library
Endpoints
| Method | Path | Description |
|---|---|---|
POST |
/media/upload/init |
Get presigned upload URL |
POST |
/media/upload/complete |
Confirm upload, get final URL |
GET |
/media |
List user's media (optional ?prefix=images) |
DELETE |
/media/{path...} |
Delete a media object |
All endpoints require authentication.
Frontend Components
import { MediaUploader, MediaLibrary } from '@project/ui';
// Upload drop zone
<MediaUploader
upload={mediaUpload.upload}
isUploading={mediaUpload.isUploading}
progress={mediaUpload.progress}
onUploadComplete={() => refetchMedia()}
/>
// Media grid with preview
<MediaLibrary
items={items}
onDelete={(path) => deleteMedia(path)}
/>
Event Reference
| Event | When | Payload |
|---|---|---|
generation_started |
Generation begins | { jobId } |
generation_progress |
Progress update | { jobId, progress, message } |
generation_complete |
Generation done (URLs are persistent) | { jobId, result } |
generation_failed |
Error occurred | { jobId, error } |
upload_started |
Upload job begins | { jobId } |
upload_progress |
Chunk uploaded | { jobId, progress } |
upload_complete |
Upload done | { jobId, result: { url, path } } |
upload_failed |
Error occurred | { jobId, error } |
Common Mistakes
DON'T: Construct storage URLs
// WRONG
const url = `/storage/uploads/${userId}/${fileId}.jpg`;
DO: Use URLs from backend
// CORRECT
const url = media.url; // Backend provides complete URL
DON'T: Proxy uploads through the backend
// WRONG - wastes backend bandwidth
await fetch('/api/upload', { body: file });
DO: Upload directly to storage via presigned URL
// CORRECT - frontend uploads directly to GCS
const { uploadURL } = await initUpload(file);
await fetch(uploadURL, { method: 'PUT', body: file });