rdev/internal/adapter/citadel/audit_shipper.go
jordan a8c8a0a14d
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
feat: add GCS-based persistent media storage, AI generation pipeline, and composable skeleton packages
Adds complete media storage pipeline with GCS presigned uploads, AI image/video/text generation
via queue-based workers, realtime SSE event streaming, and comprehensive skeleton packages
(storage, mediagen, textgen, generation, realtime, persona, routing, ai-client). Includes
security fixes for media delete authorization, nil pointer guards in handlers, video persistence
via download-then-upload, consistent signed URLs, and Image→ImageIcon rename to avoid DOM collision.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 21:29:09 -07:00

162 lines
4.2 KiB
Go

package citadel
import (
"context"
"log/slog"
"sync"
"time"
"github.com/orchard9/rdev/internal/domain"
"github.com/orchard9/rdev/internal/port"
)
// AuditShipper wraps an existing AuditLogger and also ships audit events to Citadel.
// It buffers events in memory and flushes them in batches for efficiency; the
// batches are sent by a background goroutine started in NewAuditShipper.
type AuditShipper struct {
// inner is the wrapped AuditLogger that every method delegates to.
inner port.AuditLogger
// client sends buffered events to Citadel via IngestBatch.
client port.CitadelClient
tenantID string // rdev-platform tenant ID, passed to IngestBatch on every flush
// logger reports shipping failures.
logger *slog.Logger
// mu guards buffer.
mu sync.Mutex
// buffer holds events that have been enqueued but not yet shipped.
buffer []map[string]any
// done is closed by Close to stop the background flush goroutine.
done chan struct{}
}
// NewAuditShipper builds an AuditShipper around inner and starts the background
// goroutine that periodically flushes buffered events to Citadel through client.
//
// tenantID is the Citadel tenant ID for the rdev-platform environment.
// A nil logger is replaced with slog.Default(). The returned shipper should be
// closed with Close to stop the background goroutine.
func NewAuditShipper(inner port.AuditLogger, client port.CitadelClient, tenantID string, logger *slog.Logger) *AuditShipper {
	base := logger
	if base == nil {
		base = slog.Default()
	}

	shipper := new(AuditShipper)
	shipper.inner = inner
	shipper.client = client
	shipper.tenantID = tenantID
	shipper.logger = base.With("component", "audit_shipper")
	shipper.buffer = make([]map[string]any, 0, 64)
	shipper.done = make(chan struct{})

	// The flush loop runs until Close closes the done channel.
	go shipper.flushLoop()

	return shipper
}
// LogCommandStart records the start of a command in the wrapped AuditLogger and
// then queues a "command_start" event for shipment to Citadel.
//
// The primary write happens first; if it fails its error is returned unchanged
// and nothing is queued. Queuing only appends to the in-memory buffer, so it
// does not perform any network I/O and never fails the call.
func (s *AuditShipper) LogCommandStart(ctx context.Context, entry *domain.AuditLogEntry) error {
	// Always write to the primary store first.
	if err := s.inner.LogCommandStart(ctx, entry); err != nil {
		return err
	}

	// Shipping is best-effort: if the inner logger accepted a nil entry there is
	// nothing to describe, and dereferencing it here would panic after the
	// primary write has already succeeded.
	if entry == nil {
		return nil
	}

	s.enqueue(map[string]any{
		"message":      "audit: command started",
		"level":        "info",
		"service":      "rdev-platform",
		"event_type":   "audit",
		"audit_action": "command_start",
		"command_id":   entry.CommandID,
		"command_type": string(entry.CommandType),
		"project_id":   entry.ProjectID,
		"api_key_id":   entry.APIKeyID,
		"client_ip":    entry.ClientIP,
		"timestamp":    entry.StartedAt.Format(time.RFC3339Nano),
	})
	return nil
}
// LogCommandEnd records the completion of a command in the wrapped AuditLogger
// and then queues a "command_end" event for shipment to Citadel.
//
// The primary write happens first; if it fails its error is returned unchanged
// and nothing is queued. The shipped event's level is derived from the result
// status and its timestamp is the time of this call.
func (s *AuditShipper) LogCommandEnd(ctx context.Context, commandID string, result *domain.AuditResult) error {
	// Always write to the primary store first.
	if err := s.inner.LogCommandEnd(ctx, commandID, result); err != nil {
		return err
	}

	// Shipping is best-effort: if the inner logger accepted a nil result there is
	// nothing to report, and dereferencing it here would panic after the primary
	// write has already succeeded.
	if result == nil {
		return nil
	}

	s.enqueue(map[string]any{
		"message":           "audit: command completed",
		"level":             auditStatusToLevel(result.Status),
		"service":           "rdev-platform",
		"event_type":        "audit",
		"audit_action":      "command_end",
		"command_id":        commandID,
		"status":            string(result.Status),
		"exit_code":         result.ExitCode,
		"duration_ms":       result.DurationMs,
		"error_message":     result.ErrorMessage,
		"output_size_bytes": result.OutputSizeBytes,
		"timestamp":         time.Now().Format(time.RFC3339Nano),
	})
	return nil
}
// List returns the audit log entries matching filters, as reported by the
// wrapped AuditLogger. Nothing is shipped to Citadel for reads.
func (s *AuditShipper) List(ctx context.Context, filters domain.AuditFilters) ([]domain.AuditLogEntry, error) {
	entries, err := s.inner.List(ctx, filters)
	return entries, err
}
// Get returns the audit log entry for commandID, as reported by the wrapped
// AuditLogger. Nothing is shipped to Citadel for reads.
func (s *AuditShipper) Get(ctx context.Context, commandID string) (*domain.AuditLogEntry, error) {
	entry, err := s.inner.Get(ctx, commandID)
	return entry, err
}
// Close stops the background flush goroutine and makes one final attempt to
// ship any events still in the buffer.
//
// Close is safe to call more than once: only the first call closes the done
// channel, so a repeated call does not panic with "close of closed channel".
// Every call still runs flush, which returns immediately when the buffer is
// empty.
//
// Close does not wait for a flush that flushLoop may already have in progress.
func (s *AuditShipper) Close() {
	// The mutex serialises concurrent Close calls so the check-then-close
	// below cannot race with itself.
	s.mu.Lock()
	select {
	case <-s.done:
		// Already closed by an earlier call.
	default:
		close(s.done)
	}
	s.mu.Unlock()

	// flush acquires the mutex itself, so it must run after the unlock above.
	s.flush()
}
// enqueue appends event to the pending buffer under the mutex. The event is
// sent to Citadel by a later call to flush.
func (s *AuditShipper) enqueue(event map[string]any) {
	s.mu.Lock()
	s.buffer = append(s.buffer, event)
	s.mu.Unlock()
}
// flushLoop calls flush every five seconds until the done channel is closed.
// It is started as a goroutine by NewAuditShipper.
func (s *AuditShipper) flushLoop() {
	const interval = 5 * time.Second

	tick := time.NewTicker(interval)
	defer tick.Stop()

	for {
		select {
		case <-s.done:
			return
		case <-tick.C:
			s.flush()
		}
	}
}
// flush sends all currently buffered events to Citadel in a single batch.
//
// The buffer is swapped for a fresh slice while holding the mutex, and the
// network call is made after the mutex is released so that enqueue is not
// blocked while shipping. The call is bounded by a 10-second timeout. On
// failure a warning is logged and the batch is discarded; it is not re-queued.
func (s *AuditShipper) flush() {
	s.mu.Lock()
	pending := s.buffer
	if len(pending) > 0 {
		s.buffer = make([]map[string]any, 0, 64)
	}
	s.mu.Unlock()

	if len(pending) == 0 {
		return
	}

	shipCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	err := s.client.IngestBatch(shipCtx, s.tenantID, pending)
	if err == nil {
		return
	}
	s.logger.Warn("failed to ship audit events to citadel",
		"error", err,
		"event_count", len(pending),
	)
}
// auditStatusToLevel maps an audit status to the log level string used in the
// shipped event: "error" for AuditStatusError, "warn" for AuditStatusCancelled,
// and "info" for every other status.
func auditStatusToLevel(status domain.AuditStatus) string {
	if status == domain.AuditStatusError {
		return "error"
	}
	if status == domain.AuditStatusCancelled {
		return "warn"
	}
	return "info"
}