rdev/internal/adapter/citadel/client.go
jordan a8c8a0a14d
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
feat: add GCS-based persistent media storage, AI generation pipeline, and composable skeleton packages
Adds complete media storage pipeline with GCS presigned uploads, AI image/video/text generation
via queue-based workers, realtime SSE event streaming, and comprehensive skeleton packages
(storage, mediagen, textgen, generation, realtime, persona, routing, ai-client). Includes
security fixes for media delete authorization, nil pointer guards in handlers, video persistence
via download-then-upload, consistent signed URLs, and Image→ImageIcon rename to avoid DOM collision.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 21:29:09 -07:00

245 lines
6.6 KiB
Go

// Package citadel provides an HTTP client adapter for the Citadel logging platform.
//
// This adapter communicates with a partner-hosted Citadel instance
// (e.g., citadel-staging.orchard9.ai) to manage environments and ship logs.
package citadel
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"net/url"
	"time"

	"github.com/orchard9/rdev/internal/port"
)
// Client implements port.CitadelClient via HTTP.
//
// Each method builds its request URL by appending a fixed API path to
// baseURL and sends it through httpClient.
type Client struct {
	// baseURL is prepended verbatim to every endpoint path.
	baseURL string
	// apiKey is sent as a Bearer token in the Authorization header.
	apiKey string
	// httpClient performs all outbound requests.
	httpClient *http.Client
	// logger receives informational messages about environment operations.
	logger *slog.Logger
}

// Ensure Client implements port.CitadelClient.
// This is a compile-time check only; the blank variable is never used.
var _ port.CitadelClient = (*Client)(nil)
// Config holds configuration for the Citadel client.
type Config struct {
	// URL is the base URL of the Citadel instance (e.g., "https://citadel-staging.orchard9.ai").
	// Endpoint paths that begin with "/" are appended to it as-is, so a
	// trailing slash here would produce a double slash in request URLs.
	URL string
	// APIKey is the API key for authentication (starts with "ck_live_" or "ck_dev_").
	APIKey string
}
// NewClient builds a Client from cfg.
//
// A nil logger is replaced by slog.Default(). The logger stored on the
// client carries the attribute component=citadel_client, and the underlying
// http.Client is given a 30-second timeout.
func NewClient(cfg Config, logger *slog.Logger) *Client {
	base := logger
	if base == nil {
		base = slog.Default()
	}

	transport := &http.Client{Timeout: 30 * time.Second}

	client := new(Client)
	client.baseURL = cfg.URL
	client.apiKey = cfg.APIKey
	client.httpClient = transport
	client.logger = base.With("component", "citadel_client")
	return client
}
// CreateEnvironment creates a new Citadel environment.
//
// It POSTs {"name": name} to /api/v1/environments and decodes the tenant ID
// and name from a 2xx response. If Citadel answers 409 Conflict the
// environment already exists, so it is looked up by name via GetEnvironment
// and returned instead. Any other non-2xx status is returned as an error
// built from the response body.
//
// A nil error always comes with a non-nil environment: if the lookup after a
// conflict finds nothing (for example the environment was removed in
// between), an error is returned rather than (nil, nil).
func (c *Client) CreateEnvironment(ctx context.Context, name string) (*port.CitadelEnvironment, error) {
	body := map[string]string{"name": name}
	data, err := json.Marshal(body)
	if err != nil {
		return nil, fmt.Errorf("marshal request: %w", err)
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/api/v1/environments", bytes.NewReader(data))
	if err != nil {
		return nil, fmt.Errorf("create request: %w", err)
	}
	c.setHeaders(req)

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("create environment request failed: %w", err)
	}
	defer func() { _ = resp.Body.Close() }()

	if resp.StatusCode == http.StatusConflict {
		// Environment already exists — fetch and return it.
		c.logger.Info("citadel environment already exists, fetching", "name", name)
		env, err := c.GetEnvironment(ctx, name)
		if err != nil {
			return nil, err
		}
		// GetEnvironment reports "not found" as (nil, nil); surface that as
		// an error here so callers never dereference a nil environment.
		if env == nil {
			return nil, fmt.Errorf("citadel environment %q reported as existing but could not be found", name)
		}
		return env, nil
	}
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return nil, c.readError(resp)
	}

	var result struct {
		TenantID string `json:"tenant_id"`
		Name     string `json:"name"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return nil, fmt.Errorf("decode response: %w", err)
	}

	c.logger.Info("citadel environment created", "name", name, "tenant_id", result.TenantID)
	return &port.CitadelEnvironment{
		TenantID: result.TenantID,
		Name:     result.Name,
	}, nil
}
// DeleteEnvironment removes a Citadel environment.
//
// It issues DELETE /api/v1/environments/<tenantID>. The tenant ID is
// path-escaped so that characters such as '/', '?' or '#' stay inside the
// final path segment and cannot redirect the request to another resource.
// A 404 response is treated as success because the environment may already
// be gone; any other non-2xx status is returned as an error.
func (c *Client) DeleteEnvironment(ctx context.Context, tenantID string) error {
	endpoint := c.baseURL + "/api/v1/environments/" + url.PathEscape(tenantID)
	req, err := http.NewRequestWithContext(ctx, http.MethodDelete, endpoint, nil)
	if err != nil {
		return fmt.Errorf("create request: %w", err)
	}
	c.setHeaders(req)

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("delete environment request failed: %w", err)
	}
	defer func() { _ = resp.Body.Close() }()

	// 404 is fine — environment may already be gone.
	if resp.StatusCode == http.StatusNotFound {
		return nil
	}
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return c.readError(resp)
	}

	c.logger.Info("citadel environment deleted", "tenant_id", tenantID)
	return nil
}
// GetEnvironment returns an environment by name.
//
// It issues GET /api/v1/environments?name=<name>. The name is query-escaped
// so that characters such as '&', '#', '+' or spaces are sent as part of the
// value instead of altering the query string.
//
// A 404 response yields (nil, nil), so callers must check the returned
// environment for nil. Any other non-2xx status is returned as an error
// built from the response body.
func (c *Client) GetEnvironment(ctx context.Context, name string) (*port.CitadelEnvironment, error) {
	endpoint := c.baseURL + "/api/v1/environments?name=" + url.QueryEscape(name)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return nil, fmt.Errorf("create request: %w", err)
	}
	c.setHeaders(req)

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("get environment request failed: %w", err)
	}
	defer func() { _ = resp.Body.Close() }()

	// Not found is reported as the absence of a value, not as an error.
	if resp.StatusCode == http.StatusNotFound {
		return nil, nil
	}
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return nil, c.readError(resp)
	}

	var result struct {
		TenantID string `json:"tenant_id"`
		Name     string `json:"name"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return nil, fmt.Errorf("decode response: %w", err)
	}

	return &port.CitadelEnvironment{
		TenantID: result.TenantID,
		Name:     result.Name,
	}, nil
}
// IngestEvent ships one log event to Citadel on behalf of tenantID.
//
// The event is JSON-encoded and POSTed to /api/v1/ingest/event with the
// tenant identified by the X-Tenant-ID header. A 2xx status means success;
// anything else is returned as an error built from the response body.
func (c *Client) IngestEvent(ctx context.Context, tenantID string, event map[string]any) error {
	payload, marshalErr := json.Marshal(event)
	if marshalErr != nil {
		return fmt.Errorf("marshal event: %w", marshalErr)
	}

	endpoint := c.baseURL + "/api/v1/ingest/event"
	req, reqErr := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(payload))
	if reqErr != nil {
		return fmt.Errorf("create request: %w", reqErr)
	}
	c.setHeaders(req)
	req.Header.Set("X-Tenant-ID", tenantID)

	resp, doErr := c.httpClient.Do(req)
	if doErr != nil {
		return fmt.Errorf("ingest event request failed: %w", doErr)
	}
	defer func() { _ = resp.Body.Close() }()

	if succeeded := resp.StatusCode >= 200 && resp.StatusCode < 300; succeeded {
		return nil
	}
	return c.readError(resp)
}
// IngestBatch sends a batch of log events to Citadel.
//
// The events are JSON-encoded as an array and POSTed to /api/v1/ingest with
// the tenant identified by the X-Tenant-ID header. A 2xx status means
// success; anything else is returned as an error built from the response
// body.
//
// A nil or empty batch is a no-op and returns nil without contacting the
// server: a nil slice would otherwise be encoded as JSON null rather than an
// array, and an empty array carries nothing to ingest.
func (c *Client) IngestBatch(ctx context.Context, tenantID string, events []map[string]any) error {
	if len(events) == 0 {
		return nil
	}

	data, err := json.Marshal(events)
	if err != nil {
		return fmt.Errorf("marshal events: %w", err)
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/api/v1/ingest", bytes.NewReader(data))
	if err != nil {
		return fmt.Errorf("create request: %w", err)
	}
	c.setHeaders(req)
	req.Header.Set("X-Tenant-ID", tenantID)

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("ingest batch request failed: %w", err)
	}
	defer func() { _ = resp.Body.Close() }()

	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return c.readError(resp)
	}
	return nil
}
// Healthy reports whether a GET to <baseURL>/health answers with 200 OK.
//
// The probe runs under a context derived from ctx with a five-second
// timeout. No authentication headers are attached. Any failure to build or
// send the request, and any status other than 200, yields false.
func (c *Client) Healthy(ctx context.Context) bool {
	probeCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	probe, buildErr := http.NewRequestWithContext(probeCtx, http.MethodGet, c.baseURL+"/health", nil)
	if buildErr != nil {
		return false
	}

	resp, sendErr := c.httpClient.Do(probe)
	if sendErr != nil {
		return false
	}
	defer func() { _ = resp.Body.Close() }()

	switch resp.StatusCode {
	case http.StatusOK:
		return true
	default:
		return false
	}
}
// setHeaders stamps the JSON content type and the bearer-token
// Authorization header (built from the client's API key) onto req.
func (c *Client) setHeaders(req *http.Request) {
	bearer := "Bearer " + c.apiKey

	hdr := req.Header
	hdr.Set("Content-Type", "application/json")
	hdr.Set("Authorization", bearer)
}
// readError turns a response into an error carrying the HTTP status code
// and up to the first 1024 bytes of the response body.
func (c *Client) readError(resp *http.Response) error {
	const maxErrBody = 1024

	// Best effort: if reading the body fails, whatever was read so far is
	// still reported alongside the status code.
	snippet, _ := io.ReadAll(io.LimitReader(resp.Body, maxErrBody))
	return fmt.Errorf("citadel API error (HTTP %d): %s", resp.StatusCode, snippet)
}