//! Local Episteme instance for persistent storage and alias management. //! //! Provides ingestion, conflict checking, and auto-alias creation backed by //! write-ahead log and KV store. use std::path::Path; use std::sync::Arc; use ed25519_dalek::SigningKey; use stemedb_core::types::{AliasOrigin, Assertion, ConceptAlias, ConceptPath, SourceClass}; use stemedb_ingest::{serialize_assertion, Ingestor}; use stemedb_storage::{ AliasStore, GenericAliasStore, GenericPredicateIndexStore, HybridStore, KVStore, PredicateIndexStore, }; use stemedb_wal::Journal; use tokio::sync::Mutex; use tracing::{debug, info, instrument, warn}; use crate::bridge::{claim_to_assertion, load_or_generate_key}; use crate::config::AphoriaConfig; use crate::types::{ConflictResult, ConflictingSource, ExtractedClaim, Verdict}; use crate::AphoriaError; use super::concept_index::ConceptIndex; use super::conflict::compute_conflict_score; use super::corpus::current_timestamp; /// Local Episteme instance for Aphoria. pub struct LocalEpisteme { journal: Arc>, /// Store is owned by this struct but accessed via the Ingestor and other stores. /// Keeping a reference ensures the store outlives dependent structs. store: Arc, ingestor: Ingestor, signing_key: SigningKey, /// AliasStore for persisting cross-scheme aliases discovered during conflict detection. alias_store: GenericAliasStore>, /// PredicateIndexStore for querying assertions by predicate (e.g., "acknowledged"). predicate_index_store: GenericPredicateIndexStore>, } impl LocalEpisteme { /// Open or create a local Episteme instance. #[instrument(skip(config), fields(data_dir = %config.episteme.data_dir.display()))] pub async fn open(config: &AphoriaConfig, project_root: &Path) -> Result { let data_dir = &config.episteme.data_dir; // Create directories if needed std::fs::create_dir_all(data_dir)?; // Canonicalize paths (required by fjall/lsm-tree) let data_dir = data_dir.canonicalize().map_err(|e| { AphoriaError::Storage(format!("Failed to canonicalize data_dir: {}", e)) })?; let wal_dir = data_dir.join("wal"); let store_dir = data_dir.join("store"); std::fs::create_dir_all(&wal_dir)?; std::fs::create_dir_all(&store_dir)?; info!("Opening local Episteme at {}", data_dir.display()); // Open WAL let journal = Arc::new(Mutex::new( Journal::open(&wal_dir).map_err(|e| AphoriaError::Storage(e.to_string()))?, )); // Open store let store = Arc::new( HybridStore::open(&store_dir).map_err(|e| AphoriaError::Storage(e.to_string()))?, ); // Create ingestor let mut ingestor = Ingestor::new(journal.clone(), store.clone()) .await .map_err(|e| AphoriaError::Storage(e.to_string()))?; ingestor.start(); // Load or generate signing key let signing_key = load_or_generate_key(project_root).map_err(|e| AphoriaError::Storage(e.to_string()))?; // Create alias store for auto-alias persistence let alias_store = GenericAliasStore::new(store.clone()); // Create predicate index store for predicate-based queries let predicate_index_store = GenericPredicateIndexStore::new(store.clone()); Ok(Self { journal, store, ingestor, signing_key, alias_store, predicate_index_store }) } /// Ingest a batch of extracted claims into Episteme. #[instrument(skip(self, claims), fields(claim_count = claims.len()))] pub async fn ingest_claims(&self, claims: &[ExtractedClaim]) -> Result { let timestamp = current_timestamp(); let mut ingested = 0; // Collect claims with "acknowledged" predicate for predicate index let mut acknowledged_claims = Vec::new(); for claim in claims { let assertion = claim_to_assertion(claim, &self.signing_key, timestamp); // Serialize and write to WAL let record_bytes = serialize_assertion(&assertion) .map_err(|e| AphoriaError::Storage(e.to_string()))?; // Compute hash for predicate indexing (same as Ingestor uses) let hash = *blake3::hash(&record_bytes[8..]).as_bytes(); // Skip 8-byte header let mut journal = self.journal.lock().await; journal.append(record_bytes).map_err(|e| AphoriaError::Storage(e.to_string()))?; // Track acknowledged claims for predicate index update if claim.predicate == "acknowledged" { acknowledged_claims.push(hash); } debug!( concept_path = %claim.concept_path, predicate = %claim.predicate, "Ingested claim" ); ingested += 1; } // Sync WAL { let mut journal = self.journal.lock().await; journal.force_sync().map_err(|e| AphoriaError::Storage(e.to_string()))?; } // Wait for ingestion to process self.ingestor.process_pending().await.map_err(|e| AphoriaError::Storage(e.to_string()))?; // Update predicate index for acknowledged claims for hash in acknowledged_claims { if let Err(e) = self.predicate_index_store.add_to_predicate_index("acknowledged", &hash).await { warn!(hash = %hex::encode(hash), error = %e, "Failed to add to predicate index"); } } info!(ingested, "Ingested claims into Episteme"); Ok(ingested) } /// Check for conflicts between extracted claims and authoritative sources. /// /// Uses tail-path matching via `ConceptIndex` to find conflicts across different /// URI schemes. For example, a code claim at `code://rust/myapp/tls/cert_verification` /// will match authoritative assertions at `rfc://5246/tls/cert_verification`. /// /// When `config.aliases.auto_create_aliases` is enabled, this method will /// automatically persist aliases for matched concepts, enabling faster future /// queries via `QueryEngine` with `resolve_aliases: true`. #[instrument(skip(self, claims, config, index), fields(claim_count = claims.len()))] pub async fn check_conflicts( &self, claims: &[ExtractedClaim], config: &AphoriaConfig, index: &ConceptIndex, ) -> Result, AphoriaError> { let mut results = Vec::new(); let mut aliases_created = 0usize; let timestamp = current_timestamp(); let agent_id = self.agent_id(); for claim in claims { // Look up authoritative assertions matching this claim's tail path let auth_assertions = match index.lookup(&claim.concept_path, &claim.predicate) { Some(assertions) => assertions, None => continue, // No authoritative coverage for this concept }; // Find conflicting authoritative sources let mut conflicts = Vec::new(); for assertion in auth_assertions { // Skip if it's our own assertion (same source class) if assertion.source_class == SourceClass::Expert { continue; } // Auto-create alias if enabled (regardless of value conflict) // This bridges the code path to the authoritative path for future queries if config.aliases.auto_create_aliases { if let Err(e) = self .create_alias_if_new( &claim.concept_path, &assertion.subject, agent_id, timestamp, ) .await { warn!( code_path = %claim.concept_path, auth_path = %assertion.subject, error = %e, "Failed to create alias" ); } else { aliases_created += 1; } } // Check if value differs (for conflict reporting) if assertion.object != claim.value { // Only consider Tier 0-2 as authoritative if assertion.source_class.tier() <= 2 { let rfc_citation = ConflictingSource::extract_citation(&assertion.subject); conflicts.push(ConflictingSource { path: assertion.subject.clone(), source_class: assertion.source_class, value: assertion.object.clone(), confidence: assertion.confidence, rfc_citation, }); } } } if conflicts.is_empty() { continue; } // Compute conflict score let conflict_score = compute_conflict_score(&conflicts, claim.confidence); // Determine verdict let verdict = if conflict_score >= config.thresholds.block { Verdict::Block } else if conflict_score >= config.thresholds.flag { Verdict::Flag } else { Verdict::Pass }; results.push(ConflictResult { claim: claim.clone(), conflicts, conflict_score, verdict, acknowledged: None, trace: None, // Persistent mode doesn't populate traces (for now) }); } info!( conflicts = results.len(), blocks = results.iter().filter(|r| r.verdict == Verdict::Block).count(), flags = results.iter().filter(|r| r.verdict == Verdict::Flag).count(), aliases_created, "Conflict check complete" ); Ok(results) } /// Ingest authoritative assertions (RFC, OWASP, etc.). #[instrument(skip(self, assertions), fields(count = assertions.len()))] pub async fn ingest_authoritative( &self, assertions: &[Assertion], ) -> Result { let mut ingested = 0; for assertion in assertions { let record_bytes = serialize_assertion(assertion).map_err(|e| AphoriaError::Storage(e.to_string()))?; let mut journal = self.journal.lock().await; journal.append(record_bytes).map_err(|e| AphoriaError::Storage(e.to_string()))?; ingested += 1; } // Sync and process { let mut journal = self.journal.lock().await; journal.force_sync().map_err(|e| AphoriaError::Storage(e.to_string()))?; } self.ingestor.process_pending().await.map_err(|e| AphoriaError::Storage(e.to_string()))?; info!(ingested, "Ingested authoritative assertions"); Ok(ingested) } /// Fetch all acknowledgment assertions. /// /// Returns all assertions with predicate "acknowledged" for policy export. /// These are conflicts that have been reviewed and marked as intentional. pub async fn fetch_acknowledgments(&self) -> Result, AphoriaError> { // Use predicate index to find all "acknowledged" assertions let hashes = self .predicate_index_store .get_by_predicate("acknowledged") .await .map_err(|e| AphoriaError::Storage(e.to_string()))?; let mut assertions = Vec::new(); // Load each assertion from the store using the hash-to-subject reverse index for hash in hashes { let hash_hex = hex::encode(hash); // Look up subject from reverse index let reverse_key = stemedb_storage::key_codec::hash_subject_key(&hash_hex); let subject = match self.store.get(&reverse_key).await { Ok(Some(bytes)) => match String::from_utf8(bytes) { Ok(s) => s, Err(e) => { warn!(hash = %hash_hex, error = %e, "Invalid UTF-8 in reverse index"); continue; } }, Ok(None) => { warn!(hash = %hash_hex, "No reverse index entry for assertion"); continue; } Err(e) => { warn!(hash = %hash_hex, error = %e, "Failed to read reverse index"); continue; } }; // Load assertion using subject + hash let assertion_key = stemedb_storage::key_codec::assertion_key(&subject, &hash_hex); match self.store.get(&assertion_key).await { Ok(Some(bytes)) => match stemedb_core::serde::deserialize::(&bytes) { Ok(assertion) => assertions.push(assertion), Err(e) => { warn!(hash = %hash_hex, error = %e, "Failed to deserialize assertion"); } }, Ok(None) => { warn!(hash = %hash_hex, "Assertion not found in store"); } Err(e) => { warn!(hash = %hash_hex, error = %e, "Failed to read assertion"); } } } info!(count = assertions.len(), "Fetched acknowledgment assertions"); Ok(assertions) } /// Fetch manual aliases for policy export. /// /// Returns all aliases stored in the local Episteme instance. /// These can be auto-detected aliases from conflict detection or /// manually created aliases. pub async fn fetch_manual_aliases(&self) -> Result, AphoriaError> { let alias_tuples = self .alias_store .list_all_aliases() .await .map_err(|e| AphoriaError::Storage(e.to_string()))?; let timestamp = current_timestamp(); let agent_id = self.agent_id(); // Convert (alias_str, canonical_str) tuples to ConceptAlias structs let aliases = alias_tuples .into_iter() .filter_map(|(alias_str, canonical_str)| { let alias_path = ConceptPath::parse(&alias_str).ok()?; let canonical_path = ConceptPath::parse(&canonical_str).ok()?; Some(ConceptAlias::new( alias_path, canonical_path, agent_id, timestamp, AliasOrigin::Manual, // Treat all exported aliases as manual )) }) .collect(); Ok(aliases) } /// Shut down the Episteme instance gracefully. pub async fn shutdown(&mut self) { info!("Shutting down local Episteme"); self.ingestor.shutdown(std::time::Duration::from_secs(2)).await; } /// Get the signing key's public key bytes for alias creation. pub fn agent_id(&self) -> [u8; 32] { self.signing_key.verifying_key().to_bytes() } /// Create an alias from a code path to an authoritative path, if it doesn't already exist. /// /// This is used during conflict detection to persist the relationship between /// code concepts and their authoritative counterparts. #[instrument(skip(self), fields(code_path = %code_path, auth_path = %auth_path))] async fn create_alias_if_new( &self, code_path: &str, auth_path: &str, agent_id: [u8; 32], timestamp: u64, ) -> Result<(), AphoriaError> { // Check if alias already exists let existing = self .alias_store .get_canonical(code_path) .await .map_err(|e| AphoriaError::Storage(e.to_string()))?; if existing.is_some() { debug!("Alias already exists, skipping"); return Ok(()); } // Parse paths let alias_path = ConceptPath::parse(code_path) .map_err(|e| AphoriaError::Storage(format!("Invalid code path: {}", e)))?; let canonical_path = ConceptPath::parse(auth_path) .map_err(|e| AphoriaError::Storage(format!("Invalid auth path: {}", e)))?; // Create and persist alias let alias = ConceptAlias::new( alias_path, canonical_path, agent_id, timestamp, AliasOrigin::AutoDetected, ); self.alias_store .set_alias(&alias) .await .map_err(|e| AphoriaError::Storage(e.to_string()))?; debug!("Created auto-detected alias"); Ok(()) } /// Get a reference to the alias store for querying created aliases. #[allow(dead_code)] pub fn alias_store(&self) -> &GenericAliasStore> { &self.alias_store } /// Get a reference to the underlying KV store. /// /// Used for direct storage operations like importing policies. pub fn store(&self) -> &Arc { &self.store } }