Add CRC32C checksums to WAL record format (v2), implement crash recovery with automatic truncation of corrupt records, add feature-gated group commit buffer for batched fsync under concurrent load, and implement log rotation via segment files with global offset addressing. Key changes: - Record format v2: [len:u32][crc32c:u32][blake3:32][payload:N] - recover_file() scans and truncates corrupt tail records - GroupCommitBuffer batches fsync via MPSC channel (tokio feature gate) - SegmentManager with binary search resolution and cursor-based cleanup - Journal::read() auto-refreshes segments on miss for writer/reader split - Split recovery.rs and key_codec.rs into directory modules for 500-line max Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
235 lines
8.7 KiB
Rust
235 lines
8.7 KiB
Rust
//! Skeptic View builder for "Trust but Verify" queries.
//!
//! While `QueryEngine` returns assertions and `Materializer` creates single-winner
//! views, `SkepticResolver` builds conflict analysis views that surface disagreement.
//!
//! # Use Case
//!
//! A medical researcher queries "Semaglutide muscle_effect" and instead of getting
//! a single answer, they see:
//! - Status: Contested (conflict_score: 0.72)
//! - Claim A: "Significant loss" (45% support)
//! - Claim B: "Minimal loss" (35% support)
//! - Claim C: "No effect" (20% support)
//!
//! This enables transparent decision-making where users understand the certainty
//! of the data they're using.
//!
//! # Example
//!
//! ```ignore
//! use stemedb_core::types::ResolutionStatus;
//! use stemedb_query::SkepticResolver;
//!
//! let resolver = SkepticResolver::new(store, vote_store, trust_store);
//!
//! // `resolve` yields `None` when nothing is stored for the subject+predicate pair.
//! if let Some(view) = resolver.resolve("Semaglutide", "muscle_effect").await? {
//!     if view.analysis.status == ResolutionStatus::Contested {
//!         println!("⚠️ This fact is disputed!");
//!         for claim in &view.analysis.claims {
//!             println!(" {} - {:.0}% support", claim.value, claim.weight_share * 100.0);
//!         }
//!     }
//! }
//! ```
|
|
|
|
use crate::error::Result;
|
|
use std::sync::Arc;
|
|
use stemedb_core::types::{ConflictAnalysis, EntityId, RelationId};
|
|
use stemedb_lens::{AnalysisLens, SkepticLens};
|
|
use stemedb_storage::trust_rank_store::TrustRankStore;
|
|
use stemedb_storage::vote_store::VoteStore;
|
|
use stemedb_storage::{key_codec, GenericIndexStore, IndexStore, KVStore};
|
|
use tracing::instrument;
|
|
|
|
/// A "Trust but Verify" view that shows disagreement instead of hiding it.
|
|
///
|
|
/// Where `MaterializedView` answers "What is the answer?",
|
|
/// `SkepticView` answers "What are the competing claims and how much do they disagree?"
|
|
#[derive(Debug, Clone)]
|
|
pub struct SkepticView {
|
|
/// The subject that was queried.
|
|
pub subject: EntityId,
|
|
/// The predicate that was queried.
|
|
pub predicate: RelationId,
|
|
/// The conflict analysis from SkepticLens.
|
|
pub analysis: ConflictAnalysis,
|
|
/// Unix timestamp when this view was computed.
|
|
pub computed_at: u64,
|
|
/// Which lens was used (always "Skeptic" for now).
|
|
pub lens_name: String,
|
|
}
|
|
|
|
/// Resolves subject+predicate pairs into SkepticViews.
|
|
///
|
|
/// Unlike the `Materializer` which picks winners, `SkepticResolver` surfaces
|
|
/// all competing claims with their relative support.
|
|
pub struct SkepticResolver<S, V, T> {
|
|
store: Arc<S>,
|
|
index_store: GenericIndexStore<Arc<S>>,
|
|
lens: SkepticLens<V, T>,
|
|
}
|
|
|
|
impl<S, V, T> SkepticResolver<S, V, T>
|
|
where
|
|
S: KVStore + 'static,
|
|
V: VoteStore + 'static,
|
|
T: TrustRankStore + 'static,
|
|
{
|
|
/// Create a new SkepticResolver.
|
|
pub fn new(store: Arc<S>, vote_store: Arc<V>, trust_store: Arc<T>) -> Self {
|
|
let index_store = GenericIndexStore::new(store.clone());
|
|
let lens = SkepticLens::new(vote_store, trust_store);
|
|
Self { store, index_store, lens }
|
|
}
|
|
|
|
/// Resolve a subject+predicate pair into a SkepticView.
|
|
///
|
|
/// Returns `None` if no assertions exist for the given subject+predicate.
|
|
#[instrument(skip(self), fields(subject = %subject, predicate = %predicate))]
|
|
pub async fn resolve(&self, subject: &str, predicate: &str) -> Result<Option<SkepticView>> {
|
|
// Fetch all candidate assertions using the compound index
|
|
let hash_list = self.index_store.get_by_subject_predicate(subject, predicate).await?;
|
|
|
|
if hash_list.is_empty() {
|
|
return Ok(None);
|
|
}
|
|
|
|
// Load all assertions
|
|
let mut candidates = Vec::with_capacity(hash_list.len());
|
|
for hash in hash_list {
|
|
let key = key_codec::assertion_key(subject, &hex::encode(hash));
|
|
if let Some(data) = self.store.get(&key).await? {
|
|
if let Ok(assertion) = stemedb_core::serde::deserialize(&data) {
|
|
candidates.push(assertion);
|
|
}
|
|
}
|
|
}
|
|
|
|
if candidates.is_empty() {
|
|
return Ok(None);
|
|
}
|
|
|
|
// Run the SkepticLens
|
|
let analysis = self.lens.analyze(&candidates).await;
|
|
|
|
Ok(Some(SkepticView {
|
|
subject: subject.to_string(),
|
|
predicate: predicate.to_string(),
|
|
analysis,
|
|
computed_at: std::time::SystemTime::now()
|
|
.duration_since(std::time::UNIX_EPOCH)
|
|
.map(|d| d.as_secs())
|
|
.unwrap_or(0),
|
|
lens_name: self.lens.name().to_string(),
|
|
}))
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use stemedb_core::testing::AssertionBuilder;
    use stemedb_core::types::ResolutionStatus;
    use stemedb_storage::{GenericTrustRankStore, GenericVoteStore, HybridStore};

    /// Opens a fresh temporary store shared behind an `Arc`.
    fn temp_store() -> Arc<HybridStore> {
        Arc::new(HybridStore::open_temp().expect("store"))
    }

    /// Current wall-clock time as whole seconds since the Unix epoch.
    fn unix_now_secs() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("time")
            .as_secs()
    }

    /// Builds a numeric assertion, writes it to `store` under its blake3 hash,
    /// and registers that hash in `index_store` for `subject` + `predicate`.
    async fn store_assertion(
        store: &Arc<HybridStore>,
        index_store: &GenericIndexStore<Arc<HybridStore>>,
        subject: &str,
        predicate: &str,
        value: f64,
        confidence: f32,
    ) {
        let assertion = AssertionBuilder::new()
            .subject(subject)
            .predicate(predicate)
            .object_number(value)
            .confidence(confidence)
            .build();

        let payload = stemedb_core::serde::serialize(&assertion).expect("serialize");
        let digest = blake3::hash(&payload);

        let storage_key = key_codec::assertion_key(subject, &digest.to_hex());
        store.put(&storage_key, &payload).await.expect("put");

        let digest_bytes: [u8; 32] = *digest.as_bytes();
        index_store.add_to_indexes(subject, predicate, &digest_bytes).await.expect("index");
    }

    #[tokio::test]
    async fn test_resolve_empty() {
        let store = temp_store();
        let vote_store = Arc::new(GenericVoteStore::new(store.clone()));
        let trust_store = Arc::new(GenericTrustRankStore::new(store.clone()));
        let resolver = SkepticResolver::new(store, vote_store, trust_store);

        // Nothing was stored, so there is nothing to resolve.
        let outcome = resolver.resolve("NonExistent", "predicate").await.expect("resolve");
        assert!(outcome.is_none());
    }

    #[tokio::test]
    async fn test_resolve_single_claim() {
        let store = temp_store();
        let index_store = GenericIndexStore::new(store.clone());
        store_assertion(&store, &index_store, "Drug", "effect", 100.0, 0.9).await;

        let vote_store = Arc::new(GenericVoteStore::new(store.clone()));
        let trust_store = Arc::new(GenericTrustRankStore::new(store.clone()));
        let resolver = SkepticResolver::new(store, vote_store, trust_store);

        let outcome = resolver.resolve("Drug", "effect").await.expect("resolve");
        assert!(outcome.is_some());

        // A lone claim has nothing to disagree with.
        let view = outcome.expect("view");
        assert_eq!(view.subject, "Drug");
        assert_eq!(view.predicate, "effect");
        assert_eq!(view.analysis.status, ResolutionStatus::Unanimous);
        assert_eq!(view.analysis.claims.len(), 1);
    }

    #[tokio::test]
    async fn test_resolve_contested_claims() {
        let store = temp_store();
        let index_store = GenericIndexStore::new(store.clone());

        // Two conflicting claims carrying equal weight.
        for value in [100.0, 200.0] {
            store_assertion(&store, &index_store, "Drug", "effect", value, 0.5).await;
        }

        let vote_store = Arc::new(GenericVoteStore::new(store.clone()));
        let trust_store = Arc::new(GenericTrustRankStore::new(store.clone()));
        let resolver = SkepticResolver::new(store, vote_store, trust_store);

        let outcome = resolver.resolve("Drug", "effect").await.expect("resolve");
        assert!(outcome.is_some());

        let view = outcome.expect("view");
        assert_eq!(view.analysis.status, ResolutionStatus::Contested);
        assert_eq!(view.analysis.claims.len(), 2);
        assert!(view.analysis.conflict_score > 0.9); // Near 1.0 for 50/50 split
    }

    #[tokio::test]
    async fn test_resolve_includes_computed_at() {
        let store = temp_store();
        let index_store = GenericIndexStore::new(store.clone());
        store_assertion(&store, &index_store, "Drug", "effect", 100.0, 0.9).await;

        let vote_store = Arc::new(GenericVoteStore::new(store.clone()));
        let trust_store = Arc::new(GenericTrustRankStore::new(store.clone()));
        let resolver = SkepticResolver::new(store, vote_store, trust_store);

        let outcome = resolver.resolve("Drug", "effect").await.expect("resolve");
        let view = outcome.expect("view");

        // computed_at should be recent (within the last minute) and not in the future.
        let now = unix_now_secs();
        assert!(view.computed_at <= now);
        assert!(view.computed_at > now - 60);
    }
}
|