# Media Pipeline

## Critical Rules

- **ALL media operations are async jobs.** Upload, process, generate: everything goes through the job queue.
- **NEVER wait synchronously.** `POST` returns a job ID immediately (HTTP 202). Results arrive via SSE.
- **NEVER simulate progress.** Real progress comes from real events. Fake progress is a lie.
- **Storage is opaque.** The backend returns URLs. The frontend never constructs storage paths.
- **GCS in production, MemoryStore in dev.** When the `GCS_BUCKET` env var is set, storage uses GCS. Otherwise, an in-memory store serves files at `/storage/`.

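
The first two rules can be sketched from the client side as follows. This is a minimal illustration under stated assumptions, not the project's actual client: `startJob`, `HttpPost`, and `JobResponse` are hypothetical names, and only the `202` status and the `{ jobId }` response body are taken from this document.

```typescript
// Hypothetical minimal HTTP abstraction so the sketch does not depend on a
// particular fetch implementation.
interface JobResponse {
  status: number;
  json(): Promise<unknown>;
}
type HttpPost = (path: string, body: unknown) => Promise<JobResponse>;

// Enqueue a job and return its ID immediately. Nothing here waits for the
// job's result; results are expected to arrive later as SSE events.
async function startJob(post: HttpPost, path: string, body: unknown): Promise<string> {
  const res = await post(path, body);
  if (res.status !== 202) {
    throw new Error(`expected 202 Accepted, got ${res.status}`);
  }
  const data = (await res.json()) as { jobId?: unknown };
  if (typeof data.jobId !== "string" || data.jobId === "") {
    throw new Error("response did not contain a jobId");
  }
  return data.jobId;
}
```

A caller would keep the returned ID and match it against incoming SSE events, for example `const jobId = await startJob(post, "/generate/image", { prompt })`, where the concrete path and request body are placeholders.
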
## Architecture Overview

```
┌─────────────┐   POST /generate/*    ┌──────────────┐   enqueue   ┌──────────────┐
│  Frontend   │ ─────────────────────▶│   Service    │ ───────────▶│     CRDB     │
│             │   { jobId } (202)     │   (thin)     │             │    Queue     │
│             │ ◀─────────────────────│              │             └──────┬───────┘
│             │                       │              │                    │
│             │                       │              │             dequeue│
│             │                       │              │                    ▼
│             │                       │              │             ┌──────────────┐
│             │                       │              │             │    Worker    │
│             │                       │              │             │  (AI work)   │
│             │                       │              │             └──────┬───────┘
│             │                       │              │                    │
│             │   SSE stream          │   SSE Hub    │      Redis sub     │ persist to
│             │ ◀─────────────────────│   ◀──────────│────────────────────│◀── storage
└─────────────┘                       └──────────────┘                    │    (GCS)
                                                                          └─────┘
```

## Storage

### Backend: `pkg/storage/`

The storage package provides a `Store` interface with two implementations:

| Implementation | When | Env Vars |
|---|---|---|
| `GCSStore` | `GCS_BUCKET` is set (production, deployed) | `GCS_BUCKET`, `GCS_SERVICE_ACCOUNT_JSON` |
| `MemoryStore` | No `GCS_BUCKET` (local dev, standalone) | None |

```go
type Store interface {
	Upload(ctx context.Context, path string, data []byte, contentType string) (string, error)
	UploadPresigned(ctx context.Context, path string, contentType string) (*PresignedUpload, error)
	GetURL(ctx context.Context, path string) (string, error)
	Delete(ctx context.Context, path string) error
	List(ctx context.Context, prefix string) ([]MediaObject, error)
}
```

### Initialization (service main.go)

Storage is initialized early in `main()`, before the queue, because standalone queue handlers depend on it:

```go
var mediaStore storage.Store
if bucket := os.Getenv("GCS_BUCKET"); bucket != "" {
	gcsStore, err := storage.NewGCSStore(bucket, os.Getenv("GCS_SERVICE_ACCOUNT_JSON"), logger)
	if err != nil {
		// Fail fast instead of discarding the error and running without storage.
		log.Fatalf("init GCS store: %v", err)
	}
	mediaStore = gcsStore
} else {
	memStore := storage.NewMemoryStore("http://localhost:" + port + "/storage")
	mediaStore = memStore
	// Mount memStore.ServeHTTP at /storage/* for dev mode
}
```

### Object Path Convention

All media is stored under `media/{userID}/`:
- Generated images: `media/{userID}/images/{jobID}_{index}.png`
- Generated videos: `media/{userID}/videos/{jobID}_{index}.mp4`
- Uploads: `media/{userID}/{uuid}/{filename}`

### Generation Auto-Persist

Image and video generation handlers accept a `storage.Store`. When it is non-nil, generated results are persisted automatically, and SSE events carry permanent URLs instead of temporary provider URLs or base64 data.

```go
generation.ImageHandler(mediagenManager, store, pub, logger)
generation.VideoHandler(mediagenManager, store, pub, logger)
```

## Upload Flow (Presigned URL)

```
Frontend                     Backend                        Storage (GCS/Memory)
│                            │                              │
│ POST /media/upload/init    │                              │
│ {filename, contentType}    │                              │
│───────────────────────────▶│                              │
│                            │ UploadPresigned()            │
│                            │─────────────────────────────▶│
│ {uploadURL, objectPath}    │                              │
│◀───────────────────────────│                              │
│                            │                              │
│ PUT uploadURL (file body)  │                              │
│──────────────────────────────────────────────────────────▶│
│ 200 OK                     │                              │
│◀──────────────────────────────────────────────────────────│
│                            │                              │
│ POST /media/upload/complete│                              │
│ {objectPath}               │                              │
│───────────────────────────▶│ GetURL()                     │
│                            │─────────────────────────────▶│
│ {url, path}                │                              │
│◀───────────────────────────│                              │
```

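
The three steps in the diagram above could be wired together roughly as follows. This is a sketch under stated assumptions rather than the code behind `useMediaUpload`: `uploadViaPresignedUrl`, `Http`, `HttpRequest`, `HttpResponse`, and `UploadSource` are hypothetical names, the HTTP function is injected so the flow can be exercised without a network, and sending `Content-Type` on the `PUT` is an assumption about what the presigned URL expects.

```typescript
// Minimal shapes so the sketch is self-contained (hypothetical, not the
// project's real types).
interface HttpResponse {
  ok: boolean;
  status: number;
  json(): Promise<unknown>;
}
interface HttpRequest {
  method: "POST" | "PUT";
  headers?: Record<string, string>;
  body: unknown;
}
type Http = (url: string, req: HttpRequest) => Promise<HttpResponse>;

interface UploadSource {
  name: string; // filename sent to /media/upload/init
  type: string; // MIME type, e.g. "image/png"
  data: unknown; // raw file body for the PUT
}

async function uploadViaPresignedUrl(
  http: Http,
  file: UploadSource,
): Promise<{ url: string; path: string }> {
  const jsonHeaders = { "Content-Type": "application/json" };

  // Step 1: ask the backend for a presigned upload URL.
  const initRes = await http("/media/upload/init", {
    method: "POST",
    headers: jsonHeaders,
    body: JSON.stringify({ filename: file.name, contentType: file.type }),
  });
  if (!initRes.ok) throw new Error(`init failed: ${initRes.status}`);
  const { uploadURL, objectPath } = (await initRes.json()) as {
    uploadURL: string;
    objectPath: string;
  };

  // Step 2: send the bytes straight to storage, bypassing the backend.
  const putRes = await http(uploadURL, {
    method: "PUT",
    headers: { "Content-Type": file.type }, // assumed requirement
    body: file.data,
  });
  if (!putRes.ok) throw new Error(`storage PUT failed: ${putRes.status}`);

  // Step 3: confirm the upload; the backend returns the final URL and path.
  const doneRes = await http("/media/upload/complete", {
    method: "POST",
    headers: jsonHeaders,
    body: JSON.stringify({ objectPath }),
  });
  if (!doneRes.ok) throw new Error(`complete failed: ${doneRes.status}`);
  return (await doneRes.json()) as { url: string; path: string };
}
```

Note that the frontend only passes `objectPath` back to the backend unchanged; it never builds or inspects the path itself, which keeps storage opaque.
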
### Frontend Hook

```typescript
import { useMediaUpload } from '@project/realtime';

const { upload, isUploading, progress, error, reset } = useMediaUpload({
  apiPrefix: '',
  serviceName: 'example-api',
  headers: { Authorization: `Bearer ${token}` },
});

// Upload a file
const result = await upload(file); // { url, path }
```

## Media Library

### Endpoints

| Method | Path | Description |
|--------|------|-------------|
| `POST` | `/media/upload/init` | Get presigned upload URL |
| `POST` | `/media/upload/complete` | Confirm upload, get final URL |
| `GET` | `/media` | List user's media (optional `?prefix=images`) |
| `DELETE` | `/media/{path...}` | Delete a media object |

All endpoints require authentication.

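
As a rough illustration of how a client might address the list and delete endpoints, the helpers below build request URLs. `mediaListUrl` and `mediaDeleteUrl` are hypothetical names, and the sketch assumes the API is served without a path prefix and that the `path` given to `DELETE` is passed through exactly as the backend returned it.

```typescript
// Build the list URL. `prefix` is optional and maps to `?prefix=...`.
function mediaListUrl(prefix?: string): string {
  return prefix ? `/media?prefix=${encodeURIComponent(prefix)}` : "/media";
}

// Build the delete URL from a backend-provided path. Each segment is encoded
// on its own so that "/" keeps its meaning as a separator.
function mediaDeleteUrl(path: string): string {
  const encoded = path
    .split("/")
    .map((segment) => encodeURIComponent(segment))
    .join("/");
  return `/media/${encoded}`;
}
```

Both helpers only produce API URLs. The request itself still needs the authentication headers, and the path value comes from a previous backend response rather than being assembled on the frontend.
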
### Frontend Components

```tsx
import { MediaUploader, MediaLibrary } from '@project/ui';

// Upload drop zone
<MediaUploader
  upload={mediaUpload.upload}
  isUploading={mediaUpload.isUploading}
  progress={mediaUpload.progress}
  onUploadComplete={() => refetchMedia()}
/>

// Media grid with preview
<MediaLibrary
  items={items}
  onDelete={(path) => deleteMedia(path)}
/>
```

## Event Reference

| Event | When | Payload |
|-------|------|---------|
| `generation_started` | Generation begins | `{ jobId }` |
| `generation_progress` | Progress update | `{ jobId, progress, message }` |
| `generation_complete` | Generation done (URLs are persistent) | `{ jobId, result }` |
| `generation_failed` | Error occurred | `{ jobId, error }` |
| `upload_started` | Upload job begins | `{ jobId }` |
| `upload_progress` | Chunk uploaded | `{ jobId, progress }` |
| `upload_complete` | Upload done | `{ jobId, result: { url, path } }` |
| `upload_failed` | Error occurred | `{ jobId, error }` |

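
One way to consume these events without simulating progress is to fold them into a per-job state object. The sketch below is illustrative only: the event names and payload fields come from the table above, while `MediaEvent`, `JobState`, and `reduceJobEvent` are hypothetical, and the numeric scale of `progress` is not specified here, so the value is stored exactly as received.

```typescript
type Phase = "started" | "progress" | "complete" | "failed";

interface MediaEvent {
  name: string; // e.g. "generation_progress" or "upload_complete"
  data: {
    jobId: string;
    progress?: number;
    message?: string;
    result?: unknown;
    error?: unknown;
  };
}

interface JobState {
  jobId: string;
  phase: Phase;
  progress: number | null; // null until the server reports a real value
  result?: unknown;
  error?: unknown;
}

function reduceJobEvent(prev: JobState | undefined, ev: MediaEvent): JobState {
  const match = /^(generation|upload)_(started|progress|complete|failed)$/.exec(ev.name);
  if (!match) throw new Error(`unknown event: ${ev.name}`);
  const phase = match[2] as Phase;
  const base: JobState = prev ?? { jobId: ev.data.jobId, phase, progress: null };
  switch (phase) {
    case "started":
      return { jobId: ev.data.jobId, phase, progress: null };
    case "progress":
      // Only server-reported progress is stored; nothing is interpolated.
      return { ...base, phase, progress: ev.data.progress ?? base.progress };
    case "complete":
      return { ...base, phase, result: ev.data.result };
    case "failed":
      return { ...base, phase, error: ev.data.error };
  }
}
```

A UI built on this state would render an indeterminate indicator while `progress` is `null` instead of inventing a percentage.
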
## Common Mistakes

### DON'T: Construct storage URLs

```typescript
// WRONG
const url = `/storage/uploads/${userId}/${fileId}.jpg`;
```

### DO: Use URLs from backend

```typescript
// CORRECT
const url = media.url; // Backend provides complete URL
```

### DON'T: Proxy uploads through the backend

```typescript
// WRONG - wastes backend bandwidth
await fetch('/api/upload', { method: 'POST', body: file });
```

### DO: Upload directly to storage via presigned URL

```typescript
// CORRECT - frontend uploads directly to GCS
const { uploadURL } = await initUpload(file);
await fetch(uploadURL, { method: 'PUT', body: file });
```