All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
Adds complete media storage pipeline with GCS presigned uploads, AI image/video/text generation via queue-based workers, realtime SSE event streaming, and comprehensive skeleton packages (storage, mediagen, textgen, generation, realtime, persona, routing, ai-client). Includes security fixes for media delete authorization, nil pointer guards in handlers, video persistence via download-then-upload, consistent signed URLs, and Image→ImageIcon rename to avoid DOM collision. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
162 lines
4.2 KiB
Go
162 lines
4.2 KiB
Go
package citadel
|
|
|
|
import (
|
|
"context"
|
|
"log/slog"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/orchard9/rdev/internal/domain"
|
|
"github.com/orchard9/rdev/internal/port"
|
|
)
|
|
|
|
// AuditShipper wraps an existing AuditLogger and also ships audit events to Citadel.
|
|
// It buffers events and flushes them in batches for efficiency.
|
|
type AuditShipper struct {
|
|
inner port.AuditLogger
|
|
client port.CitadelClient
|
|
tenantID string // rdev-platform tenant ID
|
|
logger *slog.Logger
|
|
|
|
mu sync.Mutex
|
|
buffer []map[string]any
|
|
done chan struct{}
|
|
}
|
|
|
|
// NewAuditShipper wraps an existing AuditLogger with Citadel shipping.
|
|
// tenantID is the Citadel tenant ID for the rdev-platform environment.
|
|
func NewAuditShipper(inner port.AuditLogger, client port.CitadelClient, tenantID string, logger *slog.Logger) *AuditShipper {
|
|
if logger == nil {
|
|
logger = slog.Default()
|
|
}
|
|
s := &AuditShipper{
|
|
inner: inner,
|
|
client: client,
|
|
tenantID: tenantID,
|
|
logger: logger.With("component", "audit_shipper"),
|
|
buffer: make([]map[string]any, 0, 64),
|
|
done: make(chan struct{}),
|
|
}
|
|
go s.flushLoop()
|
|
return s
|
|
}
|
|
|
|
// LogCommandStart records the start of a command and ships to Citadel.
|
|
func (s *AuditShipper) LogCommandStart(ctx context.Context, entry *domain.AuditLogEntry) error {
|
|
// Always write to primary store first
|
|
if err := s.inner.LogCommandStart(ctx, entry); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Buffer for Citadel (best-effort, don't block)
|
|
s.enqueue(map[string]any{
|
|
"message": "audit: command started",
|
|
"level": "info",
|
|
"service": "rdev-platform",
|
|
"event_type": "audit",
|
|
"audit_action": "command_start",
|
|
"command_id": entry.CommandID,
|
|
"command_type": string(entry.CommandType),
|
|
"project_id": entry.ProjectID,
|
|
"api_key_id": entry.APIKeyID,
|
|
"client_ip": entry.ClientIP,
|
|
"timestamp": entry.StartedAt.Format(time.RFC3339Nano),
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
// LogCommandEnd records the completion of a command and ships to Citadel.
|
|
func (s *AuditShipper) LogCommandEnd(ctx context.Context, commandID string, result *domain.AuditResult) error {
|
|
// Always write to primary store first
|
|
if err := s.inner.LogCommandEnd(ctx, commandID, result); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Buffer for Citadel (best-effort)
|
|
s.enqueue(map[string]any{
|
|
"message": "audit: command completed",
|
|
"level": auditStatusToLevel(result.Status),
|
|
"service": "rdev-platform",
|
|
"event_type": "audit",
|
|
"audit_action": "command_end",
|
|
"command_id": commandID,
|
|
"status": string(result.Status),
|
|
"exit_code": result.ExitCode,
|
|
"duration_ms": result.DurationMs,
|
|
"error_message": result.ErrorMessage,
|
|
"output_size_bytes": result.OutputSizeBytes,
|
|
"timestamp": time.Now().Format(time.RFC3339Nano),
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
// List delegates to the inner AuditLogger.
|
|
func (s *AuditShipper) List(ctx context.Context, filters domain.AuditFilters) ([]domain.AuditLogEntry, error) {
|
|
return s.inner.List(ctx, filters)
|
|
}
|
|
|
|
// Get delegates to the inner AuditLogger.
|
|
func (s *AuditShipper) Get(ctx context.Context, commandID string) (*domain.AuditLogEntry, error) {
|
|
return s.inner.Get(ctx, commandID)
|
|
}
|
|
|
|
// Close flushes remaining events and stops the background goroutine.
|
|
func (s *AuditShipper) Close() {
|
|
close(s.done)
|
|
s.flush()
|
|
}
|
|
|
|
func (s *AuditShipper) enqueue(event map[string]any) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.buffer = append(s.buffer, event)
|
|
}
|
|
|
|
func (s *AuditShipper) flushLoop() {
|
|
ticker := time.NewTicker(5 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
s.flush()
|
|
case <-s.done:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *AuditShipper) flush() {
|
|
s.mu.Lock()
|
|
if len(s.buffer) == 0 {
|
|
s.mu.Unlock()
|
|
return
|
|
}
|
|
events := s.buffer
|
|
s.buffer = make([]map[string]any, 0, 64)
|
|
s.mu.Unlock()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
if err := s.client.IngestBatch(ctx, s.tenantID, events); err != nil {
|
|
s.logger.Warn("failed to ship audit events to citadel",
|
|
"error", err,
|
|
"event_count", len(events),
|
|
)
|
|
}
|
|
}
|
|
|
|
func auditStatusToLevel(status domain.AuditStatus) string {
|
|
switch status {
|
|
case domain.AuditStatusError:
|
|
return "error"
|
|
case domain.AuditStatusCancelled:
|
|
return "warn"
|
|
default:
|
|
return "info"
|
|
}
|
|
}
|