//! Storage for quarantined assertions awaiting admin review. //! //! Quarantined assertions are stored at `\x00QUAR:{timestamp}:{hash_hex}` to enable //! efficient time-ordered scanning. Admin can approve or reject quarantined content. //! //! # Flow //! //! 1. Content Defense flags assertion for quarantine //! 2. Assertion is stored in quarantine (NOT indexed) //! 3. Admin reviews via API (`GET /v1/admin/quarantine`) //! 4. Admin approves → assertion is indexed normally //! 5. Admin rejects → assertion remains quarantined, logged for audit use crate::key_codec; use crate::{KVStore, Result, StorageError}; use async_trait::async_trait; use std::sync::Arc; use stemedb_core::types::{Hash, QuarantineEvent}; use tracing::{debug, instrument}; /// Storage trait for quarantined assertions. /// /// Provides operations for storing, listing, and resolving quarantined content. #[async_trait] pub trait QuarantineStore: Send + Sync { /// Write a quarantine event to storage. /// /// Key format: `\x00QUAR:{timestamp}:{hash_hex}` async fn write_quarantine(&self, event: &QuarantineEvent) -> Result<()>; /// Get a specific quarantine event by hash. /// /// Returns `None` if the event does not exist. async fn get_quarantine(&self, hash: &Hash) -> Result>; /// Get all pending (unreviewed) quarantine events. /// /// Returns events ordered by timestamp (oldest first). /// Optionally limit the number of results. async fn list_pending(&self, limit: usize) -> Result>; /// Approve a quarantined assertion. /// /// Marks the event as reviewed and approved, returns the event with its /// original assertion bytes for indexing. /// /// Returns `Err(NotFound)` if the event does not exist. async fn approve(&self, hash: &Hash) -> Result; /// Reject a quarantined assertion. /// /// Marks the event as reviewed and rejected. The assertion will remain /// in quarantine for audit trail. /// /// Returns `Err(NotFound)` if the event does not exist. async fn reject(&self, hash: &Hash) -> Result<()>; /// Get the total count of pending quarantine events. async fn pending_count(&self) -> Result; /// Get all quarantine events (including reviewed ones). /// /// Returns events ordered by timestamp (oldest first). async fn list_all(&self, limit: usize) -> Result>; } /// Generic implementation of `QuarantineStore` backed by any `KVStore`. pub struct GenericQuarantineStore { store: Arc, } impl GenericQuarantineStore { /// Create a new quarantine store backed by the given KV store. pub fn new(store: Arc) -> Self { Self { store } } /// Parse a key into (timestamp, hash). /// /// Key format: `\x00QUAR:{timestamp}:{hash_hex}` /// /// Note: This function is primarily used for testing key format validation. /// Production code uses the secondary index for O(1) lookups by hash. #[cfg(test)] fn parse_key(key: &[u8]) -> Option<(u64, Hash)> { let key_str = std::str::from_utf8(key).ok()?; // Remove the leading \x00 if present let key_str = key_str.strip_prefix('\x00').unwrap_or(key_str); let parts: Vec<&str> = key_str.split(':').collect(); if parts.len() != 3 || parts[0] != "QUAR" { return None; } let timestamp = parts[1].parse::().ok()?; let hash_hex = parts[2]; let hash_bytes = hex::decode(hash_hex).ok()?; if hash_bytes.len() != 32 { return None; } let mut hash = [0u8; 32]; hash.copy_from_slice(&hash_bytes); Some((timestamp, hash)) } } #[async_trait] impl QuarantineStore for GenericQuarantineStore { #[instrument(skip(self, event), fields(hash = %hex::encode(event.hash), reason = ?event.reason))] async fn write_quarantine(&self, event: &QuarantineEvent) -> Result<()> { let hash_hex = hex::encode(event.hash); let key = key_codec::quarantine_key(event.timestamp, &hash_hex); let serialized = stemedb_core::serde::serialize(event) .map_err(|e| StorageError::Serialization(e.to_string()))?; // Write quarantine entry self.store.put(&key, &serialized).await?; // Write hash→timestamp index for O(1) lookup by hash let index_key = key_codec::quarantine_hash_index_key(&hash_hex); self.store.put(&index_key, &event.timestamp.to_be_bytes()).await?; debug!( hash = %hash_hex, reason = ?event.reason, quality_score = event.quality.score, "Wrote quarantine event" ); Ok(()) } #[instrument(skip(self), fields(hash = %hex::encode(hash)))] async fn get_quarantine(&self, hash: &Hash) -> Result> { let hash_hex = hex::encode(hash); // O(1) lookup via secondary index let index_key = key_codec::quarantine_hash_index_key(&hash_hex); let timestamp_bytes = match self.store.get(&index_key).await? { Some(bytes) if bytes.len() == 8 => bytes, Some(_) => { debug!(hash = %hash_hex, "Invalid timestamp in quarantine index"); return Ok(None); } None => return Ok(None), }; let mut ts_arr = [0u8; 8]; ts_arr.copy_from_slice(×tamp_bytes); let timestamp = u64::from_be_bytes(ts_arr); // Lookup the actual quarantine entry let key = key_codec::quarantine_key(timestamp, &hash_hex); match self.store.get(&key).await? { Some(data) => { let event: QuarantineEvent = stemedb_core::serde::deserialize(&data) .map_err(|e| StorageError::Serialization(e.to_string()))?; Ok(Some(event)) } None => Ok(None), } } #[instrument(skip(self))] async fn list_pending(&self, limit: usize) -> Result> { let entries = self.store.scan_prefix(&key_codec::quarantine_scan_prefix()).await?; let mut events = Vec::new(); for (_key, data) in entries { if events.len() >= limit { break; } match stemedb_core::serde::deserialize::(&data) { Ok(event) if event.is_pending() => events.push(event), Ok(_) => {} // Skip reviewed events Err(e) => { debug!(error = %e, "Skipping malformed quarantine event"); } } } // Sort by timestamp (oldest first) - should already be sorted by key prefix events.sort_by_key(|e| e.timestamp); debug!(count = events.len(), limit = limit, "Retrieved pending quarantine events"); Ok(events) } #[instrument(skip(self), fields(hash = %hex::encode(hash)))] async fn approve(&self, hash: &Hash) -> Result { let hash_hex = hex::encode(hash); // O(1) lookup via secondary index let index_key = key_codec::quarantine_hash_index_key(&hash_hex); let timestamp_bytes = match self.store.get(&index_key).await? { Some(bytes) if bytes.len() == 8 => bytes, _ => { debug!(hash = %hash_hex, "Quarantine event not found"); return Err(StorageError::NotFound(hash_hex)); } }; let mut ts_arr = [0u8; 8]; ts_arr.copy_from_slice(×tamp_bytes); let timestamp = u64::from_be_bytes(ts_arr); let key = key_codec::quarantine_key(timestamp, &hash_hex); let data = self.store.get(&key).await?.ok_or_else(|| { debug!(hash = %hash_hex, "Quarantine entry missing despite index"); StorageError::NotFound(hash_hex.clone()) })?; let mut event: QuarantineEvent = stemedb_core::serde::deserialize(&data) .map_err(|e| StorageError::Serialization(e.to_string()))?; if event.reviewed { // Already reviewed, return as-is return Ok(event); } event.mark_reviewed(true); let serialized = stemedb_core::serde::serialize(&event) .map_err(|e| StorageError::Serialization(e.to_string()))?; self.store.put(&key, &serialized).await?; debug!(hash = %hash_hex, "Approved quarantine event"); Ok(event) } #[instrument(skip(self), fields(hash = %hex::encode(hash)))] async fn reject(&self, hash: &Hash) -> Result<()> { let hash_hex = hex::encode(hash); // O(1) lookup via secondary index let index_key = key_codec::quarantine_hash_index_key(&hash_hex); let timestamp_bytes = match self.store.get(&index_key).await? { Some(bytes) if bytes.len() == 8 => bytes, _ => { debug!(hash = %hash_hex, "Quarantine event not found"); return Err(StorageError::NotFound(hash_hex)); } }; let mut ts_arr = [0u8; 8]; ts_arr.copy_from_slice(×tamp_bytes); let timestamp = u64::from_be_bytes(ts_arr); let key = key_codec::quarantine_key(timestamp, &hash_hex); let data = self.store.get(&key).await?.ok_or_else(|| { debug!(hash = %hash_hex, "Quarantine entry missing despite index"); StorageError::NotFound(hash_hex.clone()) })?; let mut event: QuarantineEvent = stemedb_core::serde::deserialize(&data) .map_err(|e| StorageError::Serialization(e.to_string()))?; if event.reviewed { // Already reviewed return Ok(()); } event.mark_reviewed(false); let serialized = stemedb_core::serde::serialize(&event) .map_err(|e| StorageError::Serialization(e.to_string()))?; self.store.put(&key, &serialized).await?; debug!(hash = %hash_hex, "Rejected quarantine event"); Ok(()) } #[instrument(skip(self))] async fn pending_count(&self) -> Result { let entries = self.store.scan_prefix(&key_codec::quarantine_scan_prefix()).await?; let mut count = 0; for (_key, data) in entries { match stemedb_core::serde::deserialize::(&data) { Ok(event) if event.is_pending() => count += 1, Ok(_) => {} Err(e) => { debug!(error = %e, "Skipping malformed quarantine event"); } } } debug!(count = count, "Counted pending quarantine events"); Ok(count) } #[instrument(skip(self))] async fn list_all(&self, limit: usize) -> Result> { let entries = self.store.scan_prefix(&key_codec::quarantine_scan_prefix()).await?; let mut events = Vec::new(); for (_key, data) in entries { if events.len() >= limit { break; } match stemedb_core::serde::deserialize::(&data) { Ok(event) => events.push(event), Err(e) => { debug!(error = %e, "Skipping malformed quarantine event"); } } } events.sort_by_key(|e| e.timestamp); debug!(count = events.len(), limit = limit, "Retrieved all quarantine events"); Ok(events) } } #[cfg(test)] mod tests { use super::*; use crate::HybridStore; use stemedb_core::types::{ContentQuality, QuarantineReason}; fn create_event(hash: Hash, reason: QuarantineReason, timestamp: u64) -> QuarantineEvent { QuarantineEvent::new( hash, vec![1, 2, 3, 4], // Mock assertion bytes reason, ContentQuality::new(), timestamp, ) } #[tokio::test] async fn test_write_and_get_quarantine() { let store = Arc::new(HybridStore::open_temp().expect("store")); let quar_store = GenericQuarantineStore::new(store); let hash = [1u8; 32]; let event = create_event(hash, QuarantineReason::Duplicate, 1000); quar_store.write_quarantine(&event).await.expect("write"); let retrieved = quar_store.get_quarantine(&hash).await.expect("get").expect("should exist"); assert_eq!(retrieved.hash, hash); assert_eq!(retrieved.reason, QuarantineReason::Duplicate); assert!(!retrieved.reviewed); } #[tokio::test] async fn test_list_pending() { let store = Arc::new(HybridStore::open_temp().expect("store")); let quar_store = GenericQuarantineStore::new(store); let e1 = create_event([1u8; 32], QuarantineReason::LowQuality, 1000); let e2 = create_event([2u8; 32], QuarantineReason::Duplicate, 2000); let e3 = create_event([3u8; 32], QuarantineReason::UntrustedHighConfidence, 3000); quar_store.write_quarantine(&e1).await.expect("write e1"); quar_store.write_quarantine(&e2).await.expect("write e2"); quar_store.write_quarantine(&e3).await.expect("write e3"); // All pending let pending = quar_store.list_pending(10).await.expect("list_pending"); assert_eq!(pending.len(), 3); // Approve one quar_store.approve(&e1.hash).await.expect("approve"); // Two pending let pending_after = quar_store.list_pending(10).await.expect("list_pending"); assert_eq!(pending_after.len(), 2); } #[tokio::test] async fn test_approve() { let store = Arc::new(HybridStore::open_temp().expect("store")); let quar_store = GenericQuarantineStore::new(store); let hash = [42u8; 32]; let event = create_event(hash, QuarantineReason::Duplicate, 1000); quar_store.write_quarantine(&event).await.expect("write"); // Approve let approved = quar_store.approve(&hash).await.expect("approve"); assert!(approved.reviewed); assert_eq!(approved.approved, Some(true)); // Verify persisted let retrieved = quar_store.get_quarantine(&hash).await.expect("get").expect("should exist"); assert!(retrieved.reviewed); assert_eq!(retrieved.approved, Some(true)); } #[tokio::test] async fn test_reject() { let store = Arc::new(HybridStore::open_temp().expect("store")); let quar_store = GenericQuarantineStore::new(store); let hash = [42u8; 32]; let event = create_event(hash, QuarantineReason::LowQuality, 1000); quar_store.write_quarantine(&event).await.expect("write"); // Reject quar_store.reject(&hash).await.expect("reject"); // Verify persisted let retrieved = quar_store.get_quarantine(&hash).await.expect("get").expect("should exist"); assert!(retrieved.reviewed); assert_eq!(retrieved.approved, Some(false)); } #[tokio::test] async fn test_approve_nonexistent() { let store = Arc::new(HybridStore::open_temp().expect("store")); let quar_store = GenericQuarantineStore::new(store); let nonexistent_hash = [99u8; 32]; let result = quar_store.approve(&nonexistent_hash).await; assert!(matches!(result, Err(StorageError::NotFound(_)))); } #[tokio::test] async fn test_reject_nonexistent() { let store = Arc::new(HybridStore::open_temp().expect("store")); let quar_store = GenericQuarantineStore::new(store); let nonexistent_hash = [99u8; 32]; let result = quar_store.reject(&nonexistent_hash).await; assert!(matches!(result, Err(StorageError::NotFound(_)))); } #[tokio::test] async fn test_pending_count() { let store = Arc::new(HybridStore::open_temp().expect("store")); let quar_store = GenericQuarantineStore::new(store); let e1 = create_event([1u8; 32], QuarantineReason::LowQuality, 1000); let e2 = create_event([2u8; 32], QuarantineReason::Duplicate, 2000); quar_store.write_quarantine(&e1).await.expect("write e1"); quar_store.write_quarantine(&e2).await.expect("write e2"); assert_eq!(quar_store.pending_count().await.expect("count"), 2); quar_store.approve(&e1.hash).await.expect("approve"); assert_eq!(quar_store.pending_count().await.expect("count"), 1); } #[tokio::test] async fn test_list_pending_with_limit() { let store = Arc::new(HybridStore::open_temp().expect("store")); let quar_store = GenericQuarantineStore::new(store); for i in 0..10 { let event = create_event([i; 32], QuarantineReason::LowQuality, (i as u64) * 1000); quar_store.write_quarantine(&event).await.expect("write"); } let limited = quar_store.list_pending(3).await.expect("list_pending"); assert_eq!(limited.len(), 3); } #[tokio::test] async fn test_parse_key() { let event = create_event([42u8; 32], QuarantineReason::Duplicate, 12345); let key = key_codec::quarantine_key(event.timestamp, &hex::encode(event.hash)); let (timestamp, hash) = GenericQuarantineStore::::parse_key(&key).expect("parse should succeed"); assert_eq!(timestamp, 12345); assert_eq!(hash, event.hash); } }