//! Drift detection for local Episteme operations. //! //! Tracks changes between current code claims and prior observations. use stemedb_core::types::Assertion; use tracing::{debug, info, instrument}; use crate::types::{predicates, DriftResult, Observation, Verdict}; use crate::AphoriaError; use super::helpers::assertion_to_prior_observation; use super::local::LocalEpisteme; impl LocalEpisteme { /// Check for drift between current claims and prior observations. /// /// A drift is detected when a claim's value differs from a previously /// recorded observation for the same concept path. Only claims that /// have no authority conflict should be passed here. /// /// Returns a list of drift results for claims whose values changed. #[instrument(skip(self, claims), fields(claim_count = claims.len()))] pub async fn check_drift( &self, claims: &[Observation], ) -> Result, AphoriaError> { let mut drifts = Vec::new(); for claim in claims { // Look up prior observations for this concept let observations = self.fetch_observations_for_concept(&claim.concept_path).await?; // If there's a prior observation, check if the value changed if let Some(prior_assertion) = observations.first() { // Value differs - this is drift if prior_assertion.object != claim.value { let prior = assertion_to_prior_observation(prior_assertion); drifts.push(DriftResult { claim: claim.clone(), prior, verdict: Verdict::Drift, }); debug!( concept_path = %claim.concept_path, prior_value = ?prior_assertion.object, current_value = ?claim.value, "Drift detected" ); } } } info!(drifts = drifts.len(), "Drift check complete"); Ok(drifts) } /// Fetch prior observations for a specific concept path. /// /// Returns observations sorted by timestamp descending (most recent first). #[instrument(skip(self), fields(concept_path = %concept_path))] pub async fn fetch_observations_for_concept( &self, concept_path: &str, ) -> Result, AphoriaError> { use stemedb_storage::PredicateIndexStore; // Get all observation hashes from the predicate index let hashes = self.predicate_index_store.get_by_predicate(predicates::OBSERVATION).await.map_err( |e| { AphoriaError::Storage(format!( "Failed to get observation hashes for predicate index: {e}" )) }, )?; let mut observations = Vec::new(); for hash in hashes { if let Some(assertion) = self.load_assertion_by_hash(&hash).await { // Check if this observation is for the same concept (subject match) // The observation's subject should match the concept_path if assertion.subject == concept_path { observations.push(assertion); } } } // Sort by timestamp descending (most recent first) observations.sort_by(|a, b| b.timestamp.cmp(&a.timestamp)); debug!(concept_path, count = observations.len(), "Fetched observations for concept"); Ok(observations) } }