All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
Adds complete media storage pipeline with GCS presigned uploads, AI image/video/text generation via queue-based workers, realtime SSE event streaming, and comprehensive skeleton packages (storage, mediagen, textgen, generation, realtime, persona, routing, ai-client). Includes security fixes for media delete authorization, nil pointer guards in handlers, video persistence via download-then-upload, consistent signed URLs, and Image→ImageIcon rename to avoid DOM collision. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
245 lines
6.6 KiB
Go
245 lines
6.6 KiB
Go
// Package citadel provides an HTTP client adapter for the Citadel logging platform.
|
|
//
|
|
// This adapter communicates with a partner-hosted Citadel instance
|
|
// (e.g., citadel-staging.orchard9.ai) to manage environments and ship logs.
|
|
package citadel
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"net/http"
|
|
"time"
|
|
|
|
"github.com/orchard9/rdev/internal/port"
|
|
)
|
|
|
|
// Client implements port.CitadelClient via HTTP.
|
|
type Client struct {
|
|
baseURL string
|
|
apiKey string
|
|
httpClient *http.Client
|
|
logger *slog.Logger
|
|
}
|
|
|
|
// Ensure Client implements port.CitadelClient.
|
|
var _ port.CitadelClient = (*Client)(nil)
|
|
|
|
// Config holds configuration for the Citadel client.
|
|
type Config struct {
|
|
// URL is the base URL of the Citadel instance (e.g., "https://citadel-staging.orchard9.ai").
|
|
URL string
|
|
// APIKey is the API key for authentication (starts with "ck_live_" or "ck_dev_").
|
|
APIKey string
|
|
}
|
|
|
|
// NewClient creates a new Citadel HTTP client.
|
|
func NewClient(cfg Config, logger *slog.Logger) *Client {
|
|
if logger == nil {
|
|
logger = slog.Default()
|
|
}
|
|
return &Client{
|
|
baseURL: cfg.URL,
|
|
apiKey: cfg.APIKey,
|
|
httpClient: &http.Client{
|
|
Timeout: 30 * time.Second,
|
|
},
|
|
logger: logger.With("component", "citadel_client"),
|
|
}
|
|
}
|
|
|
|
// CreateEnvironment creates a new Citadel environment.
|
|
func (c *Client) CreateEnvironment(ctx context.Context, name string) (*port.CitadelEnvironment, error) {
|
|
body := map[string]string{"name": name}
|
|
data, err := json.Marshal(body)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("marshal request: %w", err)
|
|
}
|
|
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/api/v1/environments", bytes.NewReader(data))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create request: %w", err)
|
|
}
|
|
c.setHeaders(req)
|
|
|
|
resp, err := c.httpClient.Do(req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create environment request failed: %w", err)
|
|
}
|
|
defer func() { _ = resp.Body.Close() }()
|
|
|
|
if resp.StatusCode == http.StatusConflict {
|
|
// Environment already exists — fetch and return it
|
|
c.logger.Info("citadel environment already exists, fetching", "name", name)
|
|
return c.GetEnvironment(ctx, name)
|
|
}
|
|
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
return nil, c.readError(resp)
|
|
}
|
|
|
|
var result struct {
|
|
TenantID string `json:"tenant_id"`
|
|
Name string `json:"name"`
|
|
}
|
|
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
|
return nil, fmt.Errorf("decode response: %w", err)
|
|
}
|
|
|
|
c.logger.Info("citadel environment created", "name", name, "tenant_id", result.TenantID)
|
|
|
|
return &port.CitadelEnvironment{
|
|
TenantID: result.TenantID,
|
|
Name: result.Name,
|
|
}, nil
|
|
}
|
|
|
|
// DeleteEnvironment removes a Citadel environment.
|
|
func (c *Client) DeleteEnvironment(ctx context.Context, tenantID string) error {
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, c.baseURL+"/api/v1/environments/"+tenantID, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("create request: %w", err)
|
|
}
|
|
c.setHeaders(req)
|
|
|
|
resp, err := c.httpClient.Do(req)
|
|
if err != nil {
|
|
return fmt.Errorf("delete environment request failed: %w", err)
|
|
}
|
|
defer func() { _ = resp.Body.Close() }()
|
|
|
|
// 404 is fine — environment may already be gone
|
|
if resp.StatusCode == http.StatusNotFound {
|
|
return nil
|
|
}
|
|
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
return c.readError(resp)
|
|
}
|
|
|
|
c.logger.Info("citadel environment deleted", "tenant_id", tenantID)
|
|
return nil
|
|
}
|
|
|
|
// GetEnvironment returns an environment by name.
|
|
func (c *Client) GetEnvironment(ctx context.Context, name string) (*port.CitadelEnvironment, error) {
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+"/api/v1/environments?name="+name, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create request: %w", err)
|
|
}
|
|
c.setHeaders(req)
|
|
|
|
resp, err := c.httpClient.Do(req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("get environment request failed: %w", err)
|
|
}
|
|
defer func() { _ = resp.Body.Close() }()
|
|
|
|
if resp.StatusCode == http.StatusNotFound {
|
|
return nil, nil
|
|
}
|
|
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
return nil, c.readError(resp)
|
|
}
|
|
|
|
var result struct {
|
|
TenantID string `json:"tenant_id"`
|
|
Name string `json:"name"`
|
|
}
|
|
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
|
return nil, fmt.Errorf("decode response: %w", err)
|
|
}
|
|
|
|
return &port.CitadelEnvironment{
|
|
TenantID: result.TenantID,
|
|
Name: result.Name,
|
|
}, nil
|
|
}
|
|
|
|
// IngestEvent sends a single log event to Citadel.
|
|
func (c *Client) IngestEvent(ctx context.Context, tenantID string, event map[string]any) error {
|
|
data, err := json.Marshal(event)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal event: %w", err)
|
|
}
|
|
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/api/v1/ingest/event", bytes.NewReader(data))
|
|
if err != nil {
|
|
return fmt.Errorf("create request: %w", err)
|
|
}
|
|
c.setHeaders(req)
|
|
req.Header.Set("X-Tenant-ID", tenantID)
|
|
|
|
resp, err := c.httpClient.Do(req)
|
|
if err != nil {
|
|
return fmt.Errorf("ingest event request failed: %w", err)
|
|
}
|
|
defer func() { _ = resp.Body.Close() }()
|
|
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
return c.readError(resp)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// IngestBatch sends a batch of log events to Citadel.
|
|
func (c *Client) IngestBatch(ctx context.Context, tenantID string, events []map[string]any) error {
|
|
data, err := json.Marshal(events)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal events: %w", err)
|
|
}
|
|
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/api/v1/ingest", bytes.NewReader(data))
|
|
if err != nil {
|
|
return fmt.Errorf("create request: %w", err)
|
|
}
|
|
c.setHeaders(req)
|
|
req.Header.Set("X-Tenant-ID", tenantID)
|
|
|
|
resp, err := c.httpClient.Do(req)
|
|
if err != nil {
|
|
return fmt.Errorf("ingest batch request failed: %w", err)
|
|
}
|
|
defer func() { _ = resp.Body.Close() }()
|
|
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
return c.readError(resp)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Healthy returns true if the Citadel instance is reachable.
|
|
func (c *Client) Healthy(ctx context.Context) bool {
|
|
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
defer cancel()
|
|
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+"/health", nil)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
|
|
resp, err := c.httpClient.Do(req)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
defer func() { _ = resp.Body.Close() }()
|
|
|
|
return resp.StatusCode == http.StatusOK
|
|
}
|
|
|
|
func (c *Client) setHeaders(req *http.Request) {
|
|
req.Header.Set("Content-Type", "application/json")
|
|
req.Header.Set("Authorization", "Bearer "+c.apiKey)
|
|
}
|
|
|
|
func (c *Client) readError(resp *http.Response) error {
|
|
body, _ := io.ReadAll(io.LimitReader(resp.Body, 1024))
|
|
return fmt.Errorf("citadel API error (HTTP %d): %s", resp.StatusCode, string(body))
|
|
}
|