stemedb/applications/aphoria/src/episteme/mod.rs
jordan a734be3a0d feat: Phase 7 Content Defense + code structure refactoring
Content Defense (Phase 7):
- Add SimilarityIndex with MinHash/LSH for near-duplicate detection
- Add QuarantineStore for flagged assertions awaiting admin review
- Add CircuitBreakerStore for per-agent circuit breaker state
- Add ContentDefenseLayer for ingestion pipeline integration
- Add API endpoints for quarantine and circuit breaker management
- Add research module with gap detection and documentation fetching

Code Structure Improvements:
- Extract research CLI commands to research_commands.rs
- Extract API routers to routers.rs module
- Extract key_codec extraction functions to separate module
- Extract test modules to separate files across multiple crates
- All files now under 500 line limit per pre-commit hook

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 12:44:05 -07:00

437 lines
16 KiB
Rust

//! Local Episteme integration for Aphoria.
//!
//! Provides a simplified interface to the local Episteme instance for:
//! - Ingesting assertions from extracted claims
//! - Querying for conflicts with authoritative sources
//! - Managing the authoritative corpus
//! - Auto-creating aliases when conflicts are detected (Phase 2A.3)
mod corpus;
#[cfg(test)]
mod tests;
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use ed25519_dalek::SigningKey;
use stemedb_core::types::{AliasOrigin, Assertion, ConceptAlias, ConceptPath, SourceClass};
use stemedb_ingest::{serialize_assertion, Ingestor};
use stemedb_storage::{AliasStore, GenericAliasStore, HybridStore};
use stemedb_wal::Journal;
use tokio::sync::Mutex;
use tracing::{debug, info, instrument, warn};
use crate::bridge::{claim_to_assertion, load_or_generate_key};
use crate::config::AphoriaConfig;
use crate::types::{ConflictResult, ConflictingSource, ExtractedClaim, Verdict};
use crate::AphoriaError;
use corpus::current_timestamp;
pub use corpus::{create_authoritative_assertion, create_authoritative_corpus};
/// Lookup table that groups assertions by the tail of their subject path.
///
/// Each entry is keyed by `{second_to_last}/{last}::{predicate}`, where the two
/// segments are the final non-empty `/`-separated components of the subject
/// after any `scheme://` prefix has been removed. Subjects that live under
/// different URI schemes therefore land in the same bucket as long as they end
/// in the same two segments and share a predicate.
///
/// # Example
///
/// With predicate `enabled`, both subjects below map to
/// `"tls/cert_verification::enabled"`:
/// - `rfc://5246/tls/cert_verification`
/// - `code://rust/myapp/client/tls/cert_verification`
pub struct ConceptIndex {
    /// Tail-path key → every assertion that produced that key, in input order.
    entries: HashMap<String, Vec<Assertion>>,
}

impl ConceptIndex {
    /// Index `assertions` by their tail-path key.
    ///
    /// Assertions whose subject yields no key (fewer than two non-empty path
    /// segments) are left out of the index. Indexed assertions are cloned.
    pub fn build(assertions: &[Assertion]) -> Self {
        // Upper bound on the number of distinct keys: one per assertion.
        let mut entries: HashMap<String, Vec<Assertion>> =
            HashMap::with_capacity(assertions.len());

        let keyed = assertions.iter().filter_map(|assertion| {
            Self::make_key(&assertion.subject, &assertion.predicate).map(|key| (key, assertion))
        });
        for (key, assertion) in keyed {
            entries.entry(key).or_default().push(assertion.clone());
        }

        Self { entries }
    }

    /// Return the assertions sharing the tail-path key of `subject`/`predicate`.
    ///
    /// Yields `None` when no key can be derived from `subject` or when nothing
    /// was indexed under that key.
    pub fn lookup(&self, subject: &str, predicate: &str) -> Option<&Vec<Assertion>> {
        Self::make_key(subject, predicate).and_then(|key| self.entries.get(&key))
    }

    /// Derive the index key for a subject/predicate pair.
    ///
    /// Steps:
    /// 1. Drop everything up to and including the first `"://"`, if present.
    /// 2. Walk the remaining path from the right, ignoring empty segments.
    /// 3. Require two segments; otherwise return `None`.
    /// 4. Produce `"{second_to_last}/{last}::{predicate}"`.
    pub fn make_key(subject: &str, predicate: &str) -> Option<String> {
        let path = match subject.split_once("://") {
            Some((_scheme, rest)) => rest,
            None => subject,
        };

        // Iterating from the right avoids collecting the segments into a Vec.
        let mut tail = path.rsplit('/').filter(|segment| !segment.is_empty());
        match (tail.next(), tail.next()) {
            (Some(last), Some(second_to_last)) => {
                Some(format!("{}/{}::{}", second_to_last, last, predicate))
            }
            _ => None,
        }
    }
}
/// Local Episteme instance for Aphoria.
///
/// Bundles the write-ahead journal, the backing store, the ingestor that is
/// built from both, the signing key, and an alias store layered on the same
/// backing store. Instances are created with `LocalEpisteme::open`.
pub struct LocalEpisteme {
/// Write-ahead journal. The same `Arc` is handed to the ingestor when the
/// instance is opened; the ingest methods lock it to append serialized
/// assertions and to force a sync.
journal: Arc<Mutex<Journal>>,
/// Store is owned by this struct but accessed via the Ingestor and AliasStore.
/// Keeping a reference ensures the store outlives dependent structs.
#[allow(dead_code)]
store: Arc<HybridStore>,
/// Ingestor constructed from `journal` and `store`. It is started when the
/// instance is opened, asked to process pending records after each ingest
/// batch, and stopped by `shutdown`.
ingestor: Ingestor<HybridStore>,
/// Key passed to `claim_to_assertion` when converting extracted claims; the
/// bytes of its verifying key are what `agent_id` returns.
signing_key: SigningKey,
/// AliasStore for persisting cross-scheme aliases discovered during conflict detection.
alias_store: GenericAliasStore<Arc<HybridStore>>,
}
impl LocalEpisteme {
    /// Wrap any displayable error in [`AphoriaError::Storage`].
    fn storage_err<E: std::fmt::Display>(err: E) -> AphoriaError {
        AphoriaError::Storage(err.to_string())
    }

    /// Open or create a local Episteme instance.
    ///
    /// Creates `config.episteme.data_dir` (plus its `wal/` and `store/`
    /// subdirectories) when missing, opens the journal and the store, builds
    /// and starts the ingestor, and loads or generates the signing key for
    /// `project_root`.
    ///
    /// # Errors
    ///
    /// Directory-creation failures are propagated through `?`; every other
    /// failure is reported as [`AphoriaError::Storage`].
    #[instrument(skip(config), fields(data_dir = %config.episteme.data_dir.display()))]
    pub async fn open(config: &AphoriaConfig, project_root: &Path) -> Result<Self, AphoriaError> {
        let data_dir = &config.episteme.data_dir;

        // The directory must exist before it can be canonicalized.
        std::fs::create_dir_all(data_dir)?;

        // Canonicalize paths (required by fjall/lsm-tree)
        let data_dir = data_dir.canonicalize().map_err(|e| {
            AphoriaError::Storage(format!("Failed to canonicalize data_dir: {}", e))
        })?;

        let wal_dir = data_dir.join("wal");
        let store_dir = data_dir.join("store");
        std::fs::create_dir_all(&wal_dir)?;
        std::fs::create_dir_all(&store_dir)?;

        info!("Opening local Episteme at {}", data_dir.display());

        let journal =
            Arc::new(Mutex::new(Journal::open(&wal_dir).map_err(Self::storage_err)?));
        let store = Arc::new(HybridStore::open(&store_dir).map_err(Self::storage_err)?);

        // The ingestor shares the journal and the store with this struct.
        let mut ingestor = Ingestor::new(journal.clone(), store.clone())
            .await
            .map_err(Self::storage_err)?;
        ingestor.start();

        let signing_key = load_or_generate_key(project_root).map_err(Self::storage_err)?;

        // Alias store for auto-alias persistence, backed by the same store.
        let alias_store = GenericAliasStore::new(store.clone());

        Ok(Self { journal, store, ingestor, signing_key, alias_store })
    }

    /// Serialize one assertion and append it to the journal.
    ///
    /// The journal lock is taken per record and released before returning.
    async fn append_assertion(&self, assertion: &Assertion) -> Result<(), AphoriaError> {
        let record_bytes = serialize_assertion(assertion).map_err(Self::storage_err)?;
        let mut journal = self.journal.lock().await;
        journal.append(record_bytes).map_err(Self::storage_err)?;
        Ok(())
    }

    /// Force a journal sync, then have the ingestor process pending records.
    async fn sync_and_process(&self) -> Result<(), AphoriaError> {
        {
            // Scope the guard so the lock is released before processing.
            let mut journal = self.journal.lock().await;
            journal.force_sync().map_err(Self::storage_err)?;
        }
        self.ingestor.process_pending().await.map_err(Self::storage_err)?;
        Ok(())
    }

    /// Ingest a batch of extracted claims into Episteme.
    ///
    /// Each claim is converted to an assertion with this instance's signing
    /// key and a single timestamp taken at the start of the batch, then
    /// appended to the journal. After the batch the journal is synced and the
    /// ingestor processes pending records.
    ///
    /// Returns the number of claims appended.
    ///
    /// # Errors
    ///
    /// Stops at the first failure and returns [`AphoriaError::Storage`];
    /// records appended before the failure are not rolled back here.
    #[instrument(skip(self, claims), fields(claim_count = claims.len()))]
    pub async fn ingest_claims(&self, claims: &[ExtractedClaim]) -> Result<usize, AphoriaError> {
        let timestamp = current_timestamp();
        let mut ingested = 0;

        for claim in claims {
            let assertion = claim_to_assertion(claim, &self.signing_key, timestamp);
            self.append_assertion(&assertion).await?;
            debug!(
                concept_path = %claim.concept_path,
                predicate = %claim.predicate,
                "Ingested claim"
            );
            ingested += 1;
        }

        self.sync_and_process().await?;

        info!(ingested, "Ingested claims into Episteme");
        Ok(ingested)
    }

    /// Check for conflicts between extracted claims and authoritative sources.
    ///
    /// Uses tail-path matching via `ConceptIndex` to find conflicts across different
    /// URI schemes. For example, a code claim at `code://rust/myapp/tls/cert_verification`
    /// will match authoritative assertions at `rfc://5246/tls/cert_verification`.
    ///
    /// For every matched assertion that is not `SourceClass::Expert`:
    /// - when `config.aliases.auto_create_aliases` is enabled, an alias from the
    ///   claim's path to the assertion's subject is persisted if the claim's
    ///   path has no alias yet (regardless of whether the values differ);
    /// - when the assertion's value differs from the claim's and its tier is
    ///   0-2, it is recorded as a conflicting source.
    ///
    /// Claims with at least one conflicting source produce a `ConflictResult`
    /// whose verdict is `Block`, `Flag` or `Pass` depending on how the conflict
    /// score compares with `config.thresholds.block` and `config.thresholds.flag`.
    ///
    /// Alias persistence is best-effort: failures are logged and do not abort
    /// the check. The `aliases_created` field of the completion log counts only
    /// aliases that were actually written, not lookups that found an existing one.
    #[instrument(skip(self, claims, config, index), fields(claim_count = claims.len()))]
    pub async fn check_conflicts(
        &self,
        claims: &[ExtractedClaim],
        config: &AphoriaConfig,
        index: &ConceptIndex,
    ) -> Result<Vec<ConflictResult>, AphoriaError> {
        let mut results = Vec::new();
        let mut aliases_created = 0usize;
        let timestamp = current_timestamp();
        let agent_id = self.agent_id();

        for claim in claims {
            // No index entry means no authoritative coverage for this concept.
            let Some(auth_assertions) = index.lookup(&claim.concept_path, &claim.predicate)
            else {
                continue;
            };

            let mut conflicts = Vec::new();
            for assertion in auth_assertions {
                // Skip if it's our own assertion (same source class)
                if assertion.source_class == SourceClass::Expert {
                    continue;
                }

                // Bridge the code path to the authoritative path even when the
                // values agree, so the relationship is recorded either way.
                if config.aliases.auto_create_aliases {
                    match self
                        .create_alias_if_new(
                            &claim.concept_path,
                            &assertion.subject,
                            agent_id,
                            timestamp,
                        )
                        .await
                    {
                        Ok(true) => aliases_created += 1,
                        // Already aliased: nothing was written, nothing to count.
                        Ok(false) => {}
                        Err(e) => {
                            warn!(
                                code_path = %claim.concept_path,
                                auth_path = %assertion.subject,
                                error = %e,
                                "Failed to create alias"
                            );
                        }
                    }
                }

                // Only a differing value from a Tier 0-2 source is a conflict.
                if assertion.object != claim.value && assertion.source_class.tier() <= 2 {
                    conflicts.push(ConflictingSource {
                        path: assertion.subject.clone(),
                        source_class: assertion.source_class,
                        value: assertion.object.clone(),
                        confidence: assertion.confidence,
                    });
                }
            }

            if conflicts.is_empty() {
                continue;
            }

            let conflict_score = compute_conflict_score(&conflicts, claim.confidence);
            let verdict = if conflict_score >= config.thresholds.block {
                Verdict::Block
            } else if conflict_score >= config.thresholds.flag {
                Verdict::Flag
            } else {
                Verdict::Pass
            };

            results.push(ConflictResult {
                claim: claim.clone(),
                conflicts,
                conflict_score,
                verdict,
                acknowledged: None,
            });
        }

        info!(
            conflicts = results.len(),
            blocks = results.iter().filter(|r| r.verdict == Verdict::Block).count(),
            flags = results.iter().filter(|r| r.verdict == Verdict::Flag).count(),
            aliases_created,
            "Conflict check complete"
        );
        Ok(results)
    }

    /// Ingest authoritative assertions (RFC, OWASP, etc.).
    ///
    /// Assertions are appended to the journal as given (they are not re-signed
    /// here), then the journal is synced and the ingestor processes pending
    /// records. Returns the number of assertions appended.
    ///
    /// # Errors
    ///
    /// Stops at the first failure and returns [`AphoriaError::Storage`].
    #[instrument(skip(self, assertions), fields(count = assertions.len()))]
    pub async fn ingest_authoritative(
        &self,
        assertions: &[Assertion],
    ) -> Result<usize, AphoriaError> {
        let mut ingested = 0;
        for assertion in assertions {
            self.append_assertion(assertion).await?;
            ingested += 1;
        }

        self.sync_and_process().await?;

        info!(ingested, "Ingested authoritative assertions");
        Ok(ingested)
    }

    /// Shut down the Episteme instance gracefully.
    ///
    /// Asks the ingestor to shut down, passing it a two-second duration.
    pub async fn shutdown(&mut self) {
        info!("Shutting down local Episteme");
        self.ingestor.shutdown(std::time::Duration::from_secs(2)).await;
    }

    /// Get the signing key's public key bytes for alias creation.
    pub fn agent_id(&self) -> [u8; 32] {
        self.signing_key.verifying_key().to_bytes()
    }

    /// Create an alias from a code path to an authoritative path, if it doesn't already exist.
    ///
    /// This is used during conflict detection to persist the relationship between
    /// code concepts and their authoritative counterparts.
    ///
    /// Existence is checked on `code_path` only: once a code path has any
    /// canonical mapping, later calls for the same code path write nothing,
    /// whatever `auth_path` they carry.
    ///
    /// Returns `Ok(true)` when a new alias was persisted and `Ok(false)` when
    /// one already existed.
    ///
    /// # Errors
    ///
    /// Returns [`AphoriaError::Storage`] when the alias store fails or when
    /// either path cannot be parsed as a `ConceptPath`.
    #[instrument(skip(self), fields(code_path = %code_path, auth_path = %auth_path))]
    async fn create_alias_if_new(
        &self,
        code_path: &str,
        auth_path: &str,
        agent_id: [u8; 32],
        timestamp: u64,
    ) -> Result<bool, AphoriaError> {
        let existing =
            self.alias_store.get_canonical(code_path).await.map_err(Self::storage_err)?;
        if existing.is_some() {
            debug!("Alias already exists, skipping");
            return Ok(false);
        }

        let alias_path = ConceptPath::parse(code_path)
            .map_err(|e| AphoriaError::Storage(format!("Invalid code path: {}", e)))?;
        let canonical_path = ConceptPath::parse(auth_path)
            .map_err(|e| AphoriaError::Storage(format!("Invalid auth path: {}", e)))?;

        let alias = ConceptAlias::new(
            alias_path,
            canonical_path,
            agent_id,
            timestamp,
            AliasOrigin::AutoDetected,
        );
        self.alias_store.set_alias(&alias).await.map_err(Self::storage_err)?;

        debug!("Created auto-detected alias");
        Ok(true)
    }

    /// Get a reference to the alias store for querying created aliases.
    #[allow(dead_code)]
    pub fn alias_store(&self) -> &GenericAliasStore<Arc<HybridStore>> {
        &self.alias_store
    }
}
/// Score how strongly a set of authoritative sources contradicts a code claim.
///
/// Returns `0.0` for an empty `conflicts` slice. Otherwise two candidate scores
/// are computed and the larger one, capped at `1.0`, is returned:
///
/// 1. **Boosted score**: `max_authority_weight * (1.0 - code_weight) * max_confidence`,
///    where `code_weight` is the authority weight of `SourceClass::Expert` (the
///    class code claims are treated as) and the maxima are taken over all
///    conflicting sources.
///
/// 2. **Normalized score**: a linear map from the lowest tier number among the
///    conflicting sources to a score, `0.4 + (3 - tier) / 3 * 0.55`:
///    - Tier 0 → 0.95
///    - Tier 1 → ~0.77
///    - Tier 2 → ~0.58
///    - Tier 3 → 0.40
///
/// `_claim_confidence` is accepted for interface stability but does not
/// currently influence the result.
fn compute_conflict_score(conflicts: &[ConflictingSource], _claim_confidence: f32) -> f32 {
    /// Largest value of an `f32` iterator; incomparable pairs are treated as equal.
    fn max_f32(values: impl Iterator<Item = f32>) -> Option<f32> {
        values.max_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
    }

    if conflicts.is_empty() {
        return 0.0;
    }

    // Candidate 1: authority spread between the strongest source and code,
    // scaled by the most confident conflicting source.
    let strongest_weight =
        max_f32(conflicts.iter().map(|c| c.source_class.authority_weight())).unwrap_or(0.0);
    let highest_confidence = max_f32(conflicts.iter().map(|c| c.confidence)).unwrap_or(1.0);
    let code_weight = SourceClass::Expert.authority_weight();
    let boosted = strongest_weight * (1.0 - code_weight) * highest_confidence;

    // Candidate 2: tier distance 0→3 mapped linearly onto 0.4→0.95.
    let lowest_tier = conflicts.iter().map(|c| c.source_class.tier()).min().unwrap_or(3);
    let normalized = 0.4 + (3.0 - lowest_tier as f32) / 3.0 * 0.55;

    f32::min(f32::max(normalized, boosted), 1.0)
}