stemedb/crates/stemedb-ingest/src/content_defense.rs
jordan a734be3a0d feat: Phase 7 Content Defense + code structure refactoring
Content Defense (Phase 7):
- Add SimilarityIndex with MinHash/LSH for near-duplicate detection
- Add QuarantineStore for flagged assertions awaiting admin review
- Add CircuitBreakerStore for per-agent circuit breaker state
- Add ContentDefenseLayer for ingestion pipeline integration
- Add API endpoints for quarantine and circuit breaker management
- Add research module with gap detection and documentation fetching

Code Structure Improvements:
- Extract research CLI commands to research_commands.rs
- Extract API routers to routers.rs module
- Extract key_codec extraction functions to separate module
- Extract test modules to separate files across multiple crates
- All files now under 500 line limit per pre-commit hook

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 12:44:05 -07:00

453 lines
16 KiB
Rust

//! Content defense layer for spam detection and quality control.
//!
//! This module provides the `ContentDefenseLayer` that coordinates:
//! - Bloom filter for fast duplicate detection
//! - MinHash + LSH for near-duplicate detection
//! - Quality scoring for spam and low-quality content detection
//! - Suspicious pattern detection (untrusted + high confidence)
//!
//! # Usage
//!
//! ```ignore
//! use stemedb_ingest::ContentDefenseLayer;
//!
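//! let defense = ContentDefenseLayer::with_defaults(similarity_index, quarantine_store);
//!
//! // Check content before indexing
//! let (decision, _quality) = defense
//!     .check(&assertion, &assertion_bytes, assertion_hash, trust_tier)
//!     .await?;
//! match decision {
//!     QuarantineDecision::Pass => {
//!         // index normally, then register it so later duplicates are caught
//!         defense.add_to_index(&assertion, timestamp).await?;
//!     }
//!     QuarantineDecision::Quarantine(reason) => {
//!         // `check` has already written the event to the quarantine store;
//!         // skip indexing
//!     }
//! }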
//! ```
use std::sync::Arc;
use stemedb_core::types::{
Assertion, ContentQuality, Hash, QuarantineDecision, QuarantineEvent, QuarantineReason,
TrustTier,
};
use stemedb_storage::{
ContentQualityScorer, QualityScoringConfig, QuarantineStore, Result as StorageResult,
SimilarityIndex,
};
use tracing::{debug, info, instrument};
use crate::error::Result;
/// Feature switches and tuning knobs for [`ContentDefenseLayer`].
///
/// Each `enable_*` flag gates one stage of [`ContentDefenseLayer::check`].
/// The [`Default`] implementation switches every stage on.
#[derive(Clone, Debug)]
pub struct ContentDefenseConfig {
    /// Gates the similarity lookup in `check` and the index write in
    /// `add_to_index`. When `false`, neither touches the similarity index.
    pub enable_duplicate_detection: bool,
    /// Gates the quality-threshold stage of `check`. The quality score itself
    /// is still computed and returned when this is `false`; it just never
    /// triggers a quarantine.
    pub enable_quality_scoring: bool,
    /// Gates the suspicious-pattern stage of `check` (trust tier combined with
    /// the assertion's confidence).
    pub enable_pattern_detection: bool,
    /// Settings handed to the `ContentQualityScorer` when the layer is built.
    pub quality_config: QualityScoringConfig,
}
impl Default for ContentDefenseConfig {
fn default() -> Self {
Self {
enable_duplicate_detection: true,
enable_quality_scoring: true,
enable_pattern_detection: true,
quality_config: QualityScoringConfig::default(),
}
}
}
/// Content defense layer that coordinates spam and quality checks.
///
/// This layer is intended to sit between signature verification and storage
/// in the ingestion pipeline. `check` evaluates each assertion in the order
/// below and stops at the first stage that fails:
///
/// 1. **Quality scoring**: always computed; the score is returned to the caller
/// 2. **Pattern detection**: suspicious trust-tier / confidence combination
/// 3. **Quality threshold**: the score from step 1 must meet the scorer's threshold
/// 4. **Duplicate detection**: similarity lookup keyed on subject and predicate
///
/// Steps 2-4 can each be switched off through `ContentDefenseConfig`.
/// An assertion that fails a stage is written to the quarantine store and
/// reported as `QuarantineDecision::Quarantine`.
///
/// NOTE(review): the Bloom-filter and MinHash/LSH machinery is presumably
/// provided by the `SimilarityIndex` implementation; it is not visible here.
pub struct ContentDefenseLayer<S, Q> {
/// Shared similarity index: queried by `check`, written by `add_to_index`,
/// and rebuilt by `rebuild_bloom_filter`.
similarity_index: Arc<S>,
/// Quality scorer, constructed in `new` from `config.quality_config`.
quality_scorer: ContentQualityScorer,
/// Shared quarantine store that receives one event per quarantined assertion.
quarantine_store: Arc<Q>,
/// Configuration supplied at construction; exposed read-only via `config()`.
config: ContentDefenseConfig,
}
impl<S, Q> ContentDefenseLayer<S, Q>
where
    S: SimilarityIndex,
    Q: QuarantineStore,
{
    /// Builds a defense layer from its collaborators and an explicit configuration.
    ///
    /// The quality scorer is constructed here from a clone of
    /// `config.quality_config`.
    pub fn new(
        similarity_index: Arc<S>,
        quarantine_store: Arc<Q>,
        config: ContentDefenseConfig,
    ) -> Self {
        Self {
            similarity_index,
            quality_scorer: ContentQualityScorer::new(config.quality_config.clone()),
            quarantine_store,
            config,
        }
    }

    /// Builds a defense layer using `ContentDefenseConfig::default()`.
    pub fn with_defaults(similarity_index: Arc<S>, quarantine_store: Arc<Q>) -> Self {
        let config = ContentDefenseConfig::default();
        Self::new(similarity_index, quarantine_store, config)
    }

    /// Borrows the configuration this layer was built with.
    pub fn config(&self) -> &ContentDefenseConfig {
        &self.config
    }

    /// Runs an assertion through every enabled defense stage.
    ///
    /// Stages run in this order and the first failing stage wins:
    /// suspicious-pattern check, quality threshold, duplicate lookup.
    /// The quality score is computed up front regardless of configuration
    /// and is returned alongside the decision.
    ///
    /// # Arguments
    ///
    /// * `assertion` - The assertion to evaluate
    /// * `assertion_bytes` - Serialized form of the assertion, copied into the
    ///   quarantine event if the assertion is quarantined
    /// * `assertion_hash` - Hash identifying the assertion in the quarantine event
    /// * `trust_tier` - Trust tier of the submitting agent
    ///
    /// # Returns
    ///
    /// - `Ok((QuarantineDecision::Pass, quality))` - every enabled stage passed
    /// - `Ok((QuarantineDecision::Quarantine(reason), quality))` - a stage failed
    ///   and the event has already been written to the quarantine store
    ///
    /// # Errors
    ///
    /// Returns a storage error (wrapped via `IngestError::Storage`) when the
    /// similarity lookup or the quarantine write fails.
    #[instrument(skip(self, assertion, assertion_bytes), fields(
        subject = %assertion.subject,
        predicate = %assertion.predicate,
        trust_tier = ?trust_tier,
    ))]
    pub async fn check(
        &self,
        assertion: &Assertion,
        assertion_bytes: &[u8],
        assertion_hash: Hash,
        trust_tier: TrustTier,
    ) -> Result<(QuarantineDecision, ContentQuality)> {
        // Scoring is pure computation, so do it before anything that needs I/O.
        let mut quality = self.quality_scorer.score(assertion, trust_tier);

        // Stage: suspicious trust-tier / confidence combination.
        let looks_suspicious = self.config.enable_pattern_detection
            && self.quality_scorer.is_suspicious_pattern(trust_tier, assertion.confidence);
        if looks_suspicious {
            debug!(
                confidence = assertion.confidence,
                "Suspicious pattern: untrusted agent with high confidence"
            );
            let reason = QuarantineReason::UntrustedHighConfidence;
            return self
                .quarantine(assertion_hash, assertion_bytes, reason, quality, assertion)
                .await;
        }

        // Stage: quality threshold.
        let below_threshold =
            self.config.enable_quality_scoring && !self.quality_scorer.meets_threshold(&quality);
        if below_threshold {
            debug!(score = quality.score, entropy = quality.entropy, "Low quality score");
            let reason = QuarantineReason::LowQuality;
            return self
                .quarantine(assertion_hash, assertion_bytes, reason, quality, assertion)
                .await;
        }

        // Stage: duplicate lookup. Runs last because it is the only stage
        // that awaits the similarity index.
        if self.config.enable_duplicate_detection {
            let similarity = self
                .similarity_index
                .check_similarity(&assertion.subject, &assertion.predicate)
                .await
                .map_err(crate::error::IngestError::Storage)?;

            if similarity.is_duplicate {
                quality.duplicate = true;
                debug!(
                    max_similarity = similarity.max_similarity,
                    similar_count = similarity.similar_entries.len(),
                    "Near-duplicate detected"
                );
                // Only the first similar entry is recorded on the event.
                let closest = similarity.similar_entries.first().copied();
                return self
                    .quarantine_with_similar(
                        assertion_hash,
                        assertion_bytes,
                        QuarantineReason::Duplicate,
                        quality,
                        closest,
                        assertion,
                    )
                    .await;
            }
        }

        debug!("Content defense: passed all checks");
        Ok((QuarantineDecision::Pass, quality))
    }

    /// Registers an assertion's subject and predicate in the similarity index.
    ///
    /// Call this after an assertion has been indexed so that later
    /// submissions can be matched against it. Does nothing when duplicate
    /// detection is disabled in the configuration.
    ///
    /// # Errors
    ///
    /// Returns a storage error (wrapped via `IngestError::Storage`) when the
    /// index write fails.
    #[instrument(skip(self, assertion), fields(
        subject = %assertion.subject,
        predicate = %assertion.predicate,
    ))]
    pub async fn add_to_index(&self, assertion: &Assertion, timestamp: u64) -> Result<()> {
        if !self.config.enable_duplicate_detection {
            return Ok(());
        }
        self.similarity_index
            .add(&assertion.subject, &assertion.predicate, timestamp)
            .await
            .map_err(crate::error::IngestError::Storage)?;
        Ok(())
    }

    /// Quarantines an assertion that has no associated similar entry.
    async fn quarantine(
        &self,
        hash: Hash,
        assertion_bytes: &[u8],
        reason: QuarantineReason,
        quality: ContentQuality,
        assertion: &Assertion,
    ) -> Result<(QuarantineDecision, ContentQuality)> {
        let no_similar_entry = None;
        self.quarantine_with_similar(
            hash,
            assertion_bytes,
            reason,
            quality,
            no_similar_entry,
            assertion,
        )
        .await
    }

    /// Builds a quarantine event, persists it, and returns the quarantine decision.
    ///
    /// The event carries the optional `similar_to` hash and, when the
    /// assertion has at least one signature, the agent id of the first one.
    ///
    /// # Errors
    ///
    /// Returns a storage error (wrapped via `IngestError::Storage`) when the
    /// quarantine write fails.
    async fn quarantine_with_similar(
        &self,
        hash: Hash,
        assertion_bytes: &[u8],
        reason: QuarantineReason,
        quality: ContentQuality,
        similar_to: Option<Hash>,
        assertion: &Assertion,
    ) -> Result<(QuarantineDecision, ContentQuality)> {
        // Nanoseconds since the Unix epoch, or 0 if the system clock reads
        // earlier than the epoch. `as_nanos` yields `u128`; the `as u64` cast
        // keeps the low 64 bits, which is lossless until the count exceeds
        // `u64::MAX` (roughly the year 2554).
        let captured_at = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)
        {
            Ok(elapsed) => elapsed.as_nanos() as u64,
            Err(_) => 0,
        };

        let base_event = QuarantineEvent::new(
            hash,
            assertion_bytes.to_vec(),
            reason,
            quality.clone(),
            captured_at,
        );

        let linked_event = match similar_to {
            Some(similar) => base_event.with_similar_to(similar),
            None => base_event,
        };

        // Attribute the event to the first signer, when there is one.
        let event = match assertion.signatures.first() {
            Some(sig) => linked_event.with_agent_id(sig.agent_id),
            None => linked_event,
        };

        self.quarantine_store
            .write_quarantine(&event)
            .await
            .map_err(crate::error::IngestError::Storage)?;

        info!(
            hash = %hex::encode(hash),
            reason = ?reason,
            "Assertion quarantined"
        );

        Ok((QuarantineDecision::Quarantine(reason), quality))
    }

    /// Asks the similarity index to rebuild its Bloom filter from persisted data.
    ///
    /// Intended to be called on startup to restore in-memory state. The
    /// result is passed through from the index unchanged.
    pub async fn rebuild_bloom_filter(&self) -> StorageResult<usize> {
        self.similarity_index.rebuild_bloom_filter().await
    }

    /// Returns the quarantine store's count of pending events.
    pub async fn pending_quarantine_count(&self) -> StorageResult<usize> {
        self.quarantine_store.pending_count().await
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use stemedb_core::testing::AssertionBuilder;
    use stemedb_core::types::{LifecycleStage, ObjectValue};
    use stemedb_storage::{GenericQuarantineStore, GenericSimilarityIndex, HybridStore};

    /// Builds a proposed-stage assertion with confidence 0.5 and a fixed text object.
    fn create_test_assertion(subject: &str, predicate: &str) -> Assertion {
        let object = ObjectValue::Text("test value for content defense".to_string());
        AssertionBuilder::new()
            .subject(subject)
            .predicate(predicate)
            .object(object)
            .confidence(0.5)
            .lifecycle(LifecycleStage::Proposed)
            .build()
    }

    /// A well-formed assertion from a verified agent passes every stage.
    #[tokio::test]
    async fn test_pass_normal_assertion() {
        let backing = Arc::new(HybridStore::open_temp().expect("store"));
        let index = Arc::new(GenericSimilarityIndex::with_defaults(Arc::clone(&backing)));
        let quarantined = Arc::new(GenericQuarantineStore::new(Arc::clone(&backing)));
        let layer = ContentDefenseLayer::with_defaults(index, quarantined);

        let candidate = create_test_assertion("Tesla_Inc", "has_revenue");
        let encoded = stemedb_core::serde::serialize(&candidate).expect("serialize");
        let digest = *blake3::hash(&encoded).as_bytes();

        let (verdict, quality) =
            layer.check(&candidate, &encoded, digest, TrustTier::Verified).await.expect("check");

        assert!(verdict.is_pass(), "Normal assertion should pass");
        assert!(quality.score >= 0.4, "Quality score should be acceptable");
    }

    /// Very short subject/predicate strings fall below the quality threshold.
    #[tokio::test]
    async fn test_quarantine_short_subject() {
        let backing = Arc::new(HybridStore::open_temp().expect("store"));
        let index = Arc::new(GenericSimilarityIndex::with_defaults(Arc::clone(&backing)));
        let quarantined = Arc::new(GenericQuarantineStore::new(Arc::clone(&backing)));
        let layer = ContentDefenseLayer::with_defaults(index, quarantined);

        let candidate = create_test_assertion("AB", "x");
        let encoded = stemedb_core::serde::serialize(&candidate).expect("serialize");
        let digest = *blake3::hash(&encoded).as_bytes();

        let (verdict, _quality) =
            layer.check(&candidate, &encoded, digest, TrustTier::Verified).await.expect("check");

        assert!(verdict.is_quarantine(), "Short content should be quarantined");
        assert_eq!(verdict.reason(), Some(QuarantineReason::LowQuality));
    }

    /// An untrusted agent claiming 0.95 confidence trips the pattern stage.
    #[tokio::test]
    async fn test_quarantine_untrusted_high_confidence() {
        let backing = Arc::new(HybridStore::open_temp().expect("store"));
        let index = Arc::new(GenericSimilarityIndex::with_defaults(Arc::clone(&backing)));
        let quarantined = Arc::new(GenericQuarantineStore::new(Arc::clone(&backing)));
        let layer = ContentDefenseLayer::with_defaults(index, quarantined);

        let mut candidate = create_test_assertion("Tesla_Inc", "has_revenue");
        candidate.confidence = 0.95;
        let encoded = stemedb_core::serde::serialize(&candidate).expect("serialize");
        let digest = *blake3::hash(&encoded).as_bytes();

        let (verdict, _quality) =
            layer.check(&candidate, &encoded, digest, TrustTier::Untrusted).await.expect("check");

        assert!(verdict.is_quarantine(), "Untrusted + high confidence should be quarantined");
        assert_eq!(verdict.reason(), Some(QuarantineReason::UntrustedHighConfidence));
    }

    /// Re-submitting an already-indexed subject/predicate pair is flagged as a duplicate.
    #[tokio::test]
    async fn test_quarantine_duplicate() {
        let backing = Arc::new(HybridStore::open_temp().expect("store"));
        let index = Arc::new(GenericSimilarityIndex::with_defaults(Arc::clone(&backing)));
        let quarantined = Arc::new(GenericQuarantineStore::new(Arc::clone(&backing)));
        let layer =
            ContentDefenseLayer::with_defaults(Arc::clone(&index), Arc::clone(&quarantined));

        // The original submission goes through cleanly...
        let original = create_test_assertion("Tesla_Inc", "has_revenue");
        let original_bytes = stemedb_core::serde::serialize(&original).expect("serialize");
        let original_digest = *blake3::hash(&original_bytes).as_bytes();
        let (first_verdict, _) = layer
            .check(&original, &original_bytes, original_digest, TrustTier::Verified)
            .await
            .expect("check");
        assert!(first_verdict.is_pass());

        // ...and is then registered with the similarity index.
        layer.add_to_index(&original, 1000).await.expect("add_to_index");

        // An identical resubmission must now be caught.
        let repeat = create_test_assertion("Tesla_Inc", "has_revenue");
        let repeat_bytes = stemedb_core::serde::serialize(&repeat).expect("serialize");
        let repeat_digest = *blake3::hash(&repeat_bytes).as_bytes();
        let (second_verdict, second_quality) = layer
            .check(&repeat, &repeat_bytes, repeat_digest, TrustTier::Verified)
            .await
            .expect("check");

        assert!(second_verdict.is_quarantine(), "Duplicate should be quarantined");
        assert_eq!(second_verdict.reason(), Some(QuarantineReason::Duplicate));
        assert!(second_quality.duplicate, "Quality should indicate duplicate");
    }

    /// With duplicate detection switched off, an identical resubmission passes.
    #[tokio::test]
    async fn test_config_disable_duplicate_detection() {
        let backing = Arc::new(HybridStore::open_temp().expect("store"));
        let index = Arc::new(GenericSimilarityIndex::with_defaults(Arc::clone(&backing)));
        let quarantined = Arc::new(GenericQuarantineStore::new(Arc::clone(&backing)));
        let settings =
            ContentDefenseConfig { enable_duplicate_detection: false, ..Default::default() };
        let layer =
            ContentDefenseLayer::new(Arc::clone(&index), Arc::clone(&quarantined), settings);

        // Attempt to register the original submission.
        let original = create_test_assertion("Tesla_Inc", "has_revenue");
        layer.add_to_index(&original, 1000).await.expect("add_to_index");

        // The identical follow-up is not checked against the index.
        let repeat = create_test_assertion("Tesla_Inc", "has_revenue");
        let repeat_bytes = stemedb_core::serde::serialize(&repeat).expect("serialize");
        let repeat_digest = *blake3::hash(&repeat_bytes).as_bytes();
        let (verdict, _) = layer
            .check(&repeat, &repeat_bytes, repeat_digest, TrustTier::Verified)
            .await
            .expect("check");

        assert!(verdict.is_pass(), "Should pass when duplicate detection disabled");
    }
}