stemedb/crates/stemedb-query/tests/e2e_pipeline.rs
jordan c59066949a feat: Add quickstart "Beyond Hello World" sections with Skeptic and Layered endpoints
- Add Layered() method to Go SDK for per-source-class consensus queries
- Add LayeredQueryParams, LayeredResult, TierResolution types to Go SDK
- Create conflict example demonstrating Skeptic and Layered endpoints
- Update quickstart.md with sections 6 (conflict detection) and 7 (authority tiers)
- Remove tracked Go binary and add data/ to .gitignore

The new quickstart sections demonstrate Episteme's differentiating features:
- Skeptic endpoint shows "Trust but Verify" conflict analysis
- Layered endpoint shows per-tier resolution (Clinical vs Anecdotal)

Note: Pre-existing large files flagged by pre-commit hook (technical debt from prior sessions)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-01 21:00:59 -07:00

531 lines
21 KiB
Rust

//! E2E integration tests for the StemeDB pipeline.
//!
//! Validates the full data flow: WAL -> IngestWorker -> Materializer -> QueryEngine
//!
//! # Test Coverage
//!
//! | Test | Pipeline Stage | Validates |
//! |------|---------------|-----------|
//! | `test_e2e_write_materialize_read` | Full | Basic happy path |
//! | `test_e2e_vote_consensus` | Full + Votes | Vote-weighted resolution |
//! | `test_e2e_update_winner` | Full + Update | Winner changes on re-materialize |
//! | `test_e2e_cursor_persistence` | WAL + Ingest | Cursor survives worker restart |
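//! | `test_e2e_notify_integration` | Ingest | Event-driven notification channel |
//! | `test_e2e_decay_reduces_old_confidence` | Ingest + Query | Decay half-life lowers effective confidence of old assertions |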
#![allow(clippy::expect_used)] // Test code uses expect() for clear failure messages
use ed25519_dalek::{Signer, SigningKey};
use rand::rngs::OsRng;
use std::sync::Arc;
use stemedb_core::serde::serialize;
use stemedb_core::testing::AssertionBuilder;
use stemedb_core::types::{Assertion, LifecycleStage, ObjectValue, SignatureEntry, Vote};
use stemedb_ingest::worker::{serialize_assertion, IngestWorker};
use stemedb_lens::{RecencyLens, SyncLensWrapper, VoteAwareConsensusLens};
use stemedb_query::{Materializer, Query, QueryEngine};
use stemedb_storage::{GenericVoteStore, KVStore, SledStore, VoteStore};
use stemedb_wal::Journal;
use tempfile::tempdir;
use tokio::sync::{Mutex, Notify};
// ============================================================================
// TEST HELPERS
// ============================================================================
/// Build an [`Assertion`] carrying one Ed25519 signature from a throwaway key.
///
/// A fresh signing key is generated on every call. The signed message is the
/// string `"{subject}:{predicate}"`; the original author notes that this is the
/// format IngestWorker verifies (that logic is not visible in this file).
///
/// The assertion is built with `value` as a numeric object, confidence `0.95`,
/// lifecycle [`LifecycleStage::Proposed`], and `timestamp` applied to both the
/// assertion and its single signature entry.
fn create_signed_assertion(
    subject: &str,
    predicate: &str,
    value: f64,
    timestamp: u64,
) -> Assertion {
    let signing_key = SigningKey::generate(&mut OsRng);

    // Sign "<subject>:<predicate>" with the freshly generated key.
    let signed_message = format!("{}:{}", subject, predicate);
    let signature = signing_key.sign(signed_message.as_bytes());

    // The agent id is the raw public key that corresponds to the signing key.
    let entry = SignatureEntry {
        agent_id: signing_key.verifying_key().to_bytes(),
        signature: signature.to_bytes(),
        timestamp,
    };

    AssertionBuilder::new()
        .subject(subject)
        .predicate(predicate)
        .object_number(value)
        .confidence(0.95)
        .lifecycle(LifecycleStage::Proposed)
        .timestamp(timestamp)
        .signatures(vec![entry])
        .build()
}
/// Derive the 32-byte BLAKE3 digest of an assertion's serialized form.
///
/// The tests in this file use the digest to build `H:{hex}` lookup keys and as
/// the target of votes, so it is expected to agree with the hash computed by
/// the ingestion pipeline.
///
/// # Panics
///
/// Panics if the assertion cannot be serialized.
fn compute_assertion_hash(assertion: &Assertion) -> [u8; 32] {
    let encoded = serialize(assertion).expect("serialize assertion");
    let digest = blake3::hash(&encoded);
    *digest.as_bytes()
}
/// Build a test [`Vote`] for the assertion identified by `assertion_hash`.
///
/// The agent id is 32 bytes that are all zero except the first, which is set
/// to `agent_idx`, so distinct indices give distinct voters. The signature
/// field is filled with 64 zero bytes (no real signature is produced).
fn create_vote(assertion_hash: [u8; 32], agent_idx: u8, weight: f32, timestamp: u64) -> Vote {
    let mut voter = [0u8; 32];
    voter[0] = agent_idx;

    Vote {
        assertion_hash,
        agent_id: voter,
        weight,
        signature: [0u8; 64],
        timestamp,
    }
}
// ============================================================================
// E2E TESTS
// ============================================================================
/// Test 1: Happy path through every pipeline stage.
///
/// Walks a single assertion through the whole flow:
/// 1. Append a signed assertion to the WAL
/// 2. One IngestWorker step copies it into the KV store (`H:` and `SP:` keys)
/// 3. One Materializer step writes `MV:{subject}:{predicate}`
/// 4. QueryEngine returns exactly that assertion
#[tokio::test]
async fn test_e2e_write_materialize_read() {
    let tmp = tempdir().expect("create temp dir");
    let wal_path = tmp.path().join("wal");
    let db_path = tmp.path().join("db");

    // --- Stage 1: append one signed assertion to the WAL ---
    let assertion = create_signed_assertion("Tesla_Inc", "has_revenue", 96.7, 1000);
    let mut journal = Journal::open(&wal_path).expect("open journal");
    let payload = serialize_assertion(&assertion).expect("serialize");
    journal.append(payload).expect("append to WAL");

    // --- Stage 2: a single ingest step drains the WAL into the store ---
    let journal = Arc::new(Mutex::new(journal));
    let store = Arc::new(SledStore::open(&db_path).expect("open store"));
    let notify = Arc::new(Notify::new());
    let mut worker = IngestWorker::new(journal.clone(), store.clone())
        .await
        .expect("create worker")
        .with_notify(notify.clone());

    let bytes_processed = worker.step().await.expect("ingest step");
    assert!(bytes_processed > 0, "should have processed data from WAL");

    // The assertion must be retrievable under its content hash.
    let digest = compute_assertion_hash(&assertion);
    let hash_key = format!("H:{}", hex::encode(digest)).into_bytes();
    let stored = store.get(&hash_key).await.expect("get assertion");
    assert!(stored.is_some(), "assertion should be stored at H: key");

    // Exactly one compound-index entry must exist for this subject+predicate.
    let index_prefix = b"SP:Tesla_Inc:has_revenue";
    let index_entries = store.scan_prefix(index_prefix).await.expect("scan SP: prefix");
    assert_eq!(index_entries.len(), 1, "should have one SP: index entry");

    // --- Stage 3: materialize the view ---
    let vote_store = Arc::new(GenericVoteStore::new(store.clone()));
    let lens = VoteAwareConsensusLens::new(vote_store);
    let materializer = Materializer::new(store.clone(), Box::new(lens));

    let report = materializer.step().await.expect("materialize step");
    assert_eq!(report.pairs_scanned, 1, "should scan one subject+predicate pair");
    assert_eq!(report.views_updated, 1, "should update one materialized view");

    // The materialized view key must now be present.
    let view_key = b"MV:Tesla_Inc:has_revenue";
    let view = store.get(view_key).await.expect("get MV");
    assert!(view.is_some(), "materialized view should exist");

    // --- Stage 4: read it back through the query engine ---
    let engine = QueryEngine::new(store.clone());
    let query = Query::builder().subject("Tesla_Inc").predicate("has_revenue").build();
    let result = engine.execute(&query).await.expect("execute query");

    assert_eq!(result.assertions.len(), 1, "query should return one assertion");
    let found = &result.assertions[0];
    assert_eq!(found.subject, "Tesla_Inc");
    assert_eq!(found.predicate, "has_revenue");
    assert_eq!(found.object, ObjectValue::Number(96.7));
}
/// Test 2: Votes decide between competing assertions.
///
/// Checks that materializing with `VoteAwareConsensusLens` surfaces the
/// assertion with the larger aggregate vote weight:
/// 1. Ingest two assertions sharing subject+predicate but with different values
/// 2. Cast three votes of weight 0.9 for assertion_a
/// 3. Cast one vote of weight 0.2 for assertion_b
/// 4. After materialization the query returns assertion_a's value
#[tokio::test]
async fn test_e2e_vote_consensus() {
    let tmp = tempdir().expect("create temp dir");
    let wal_path = tmp.path().join("wal");
    let db_path = tmp.path().join("db");

    // Two rival claims about the same subject+predicate.
    let assertion_a = create_signed_assertion("Semaglutide", "muscle_effect", -5.0, 1000);
    let assertion_b = create_signed_assertion("Semaglutide", "muscle_effect", -15.0, 1100);

    // Append both to the WAL, a first.
    let mut journal = Journal::open(&wal_path).expect("open journal");
    journal.append(serialize_assertion(&assertion_a).expect("ser")).expect("append");
    journal.append(serialize_assertion(&assertion_b).expect("ser")).expect("append");

    // Ingest: one worker step per WAL record.
    let journal = Arc::new(Mutex::new(journal));
    let store = Arc::new(SledStore::open(&db_path).expect("open store"));
    let mut worker =
        IngestWorker::new(journal.clone(), store.clone()).await.expect("create worker");

    let first_step = worker.step().await.expect("step 1");
    assert!(first_step > 0, "should ingest first assertion");
    let second_step = worker.step().await.expect("step 2");
    assert!(second_step > 0, "should ingest second assertion");

    // Both assertions must be present under the H: prefix.
    let stored = store.scan_prefix(b"H:").await.expect("scan H:");
    assert_eq!(stored.len(), 2, "should have two assertions");

    // Record the votes.
    let vote_store = Arc::new(GenericVoteStore::new(store.clone()));
    let hash_a = compute_assertion_hash(&assertion_a);
    let hash_b = compute_assertion_hash(&assertion_b);

    // Three distinct agents back assertion_a (3 x 0.9 = 2.7 total).
    for i in 0..3 {
        let vote = create_vote(hash_a, i, 0.9, 2000 + u64::from(i));
        vote_store.put_vote(&vote).await.expect("put vote for a");
    }

    // A single agent backs assertion_b (0.2 total).
    let vote_b = create_vote(hash_b, 10, 0.2, 2100);
    vote_store.put_vote(&vote_b).await.expect("put vote for b");

    // Materialize using the vote-aware lens.
    let lens = VoteAwareConsensusLens::new(Arc::clone(&vote_store));
    let materializer = Materializer::new(store.clone(), Box::new(lens));
    let report = materializer.step().await.expect("materialize");
    assert_eq!(report.views_updated, 1);

    // The winner exposed by the query engine must be assertion_a.
    let engine = QueryEngine::new(store.clone());
    let query = Query::builder().subject("Semaglutide").predicate("muscle_effect").build();
    let result = engine.execute(&query).await.expect("query");

    assert_eq!(result.assertions.len(), 1);
    assert_eq!(
        result.assertions[0].object,
        ObjectValue::Number(-5.0),
        "should return assertion_a (higher vote weight)"
    );
}
/// Test 3: Re-materialization replaces the winner.
///
/// Checks that a later assertion displaces the earlier one once it has been
/// ingested and the view is rebuilt with `RecencyLens`:
/// 1. Ingest and materialize v1 (timestamp 1000); the query returns v1
/// 2. Append v2 (timestamp 2000) for the same subject+predicate
/// 3. Ingest with a second worker and materialize again
/// 4. The query now returns v2
#[tokio::test]
async fn test_e2e_update_winner() {
    let tmp = tempdir().expect("create temp dir");
    let wal_path = tmp.path().join("wal");
    let db_path = tmp.path().join("db");

    // --- Phase 1: only v1 exists ---
    let assertion_v1 = create_signed_assertion("Apple_Inc", "stock_price", 150.0, 1000);
    let mut journal = Journal::open(&wal_path).expect("open journal");
    journal.append(serialize_assertion(&assertion_v1).expect("ser")).expect("append v1");

    let journal = Arc::new(Mutex::new(journal));
    let store = Arc::new(SledStore::open(&db_path).expect("open store"));

    // Ingest v1.
    let mut worker = IngestWorker::new(journal.clone(), store.clone()).await.expect("worker");
    let ingested_v1 = worker.step().await.expect("step v1");
    assert!(ingested_v1 > 0);

    // RecencyLens is wrapped in SyncLensWrapper before being handed to the Materializer.
    let first_lens = SyncLensWrapper(RecencyLens);
    let first_materializer = Materializer::new(store.clone(), Box::new(first_lens));
    first_materializer.step().await.expect("materialize v1");

    // With a single assertion, the query must return v1.
    let engine = QueryEngine::new(store.clone());
    let query = Query::builder().subject("Apple_Inc").predicate("stock_price").build();
    let before = engine.execute(&query).await.expect("query v1");
    assert_eq!(before.assertions.len(), 1);
    assert_eq!(before.assertions[0].object, ObjectValue::Number(150.0));
    assert_eq!(before.assertions[0].timestamp, 1000);

    // --- Phase 2: a newer assertion arrives ---
    let assertion_v2 = create_signed_assertion("Apple_Inc", "stock_price", 175.0, 2000);

    // Append v2 through the shared journal; the lock guard is released at the end of the scope.
    {
        let mut wal = journal.lock().await;
        wal.append(serialize_assertion(&assertion_v2).expect("ser")).expect("append v2");
    }

    // A second worker on the same journal and store picks up the new record.
    let mut worker2 = IngestWorker::new(journal.clone(), store.clone()).await.expect("worker2");
    let ingested_v2 = worker2.step().await.expect("step v2");
    assert!(ingested_v2 > 0, "should process new assertion");

    // Both versions must now be stored.
    let stored = store.scan_prefix(b"H:").await.expect("scan");
    assert_eq!(stored.len(), 2, "should have two assertions");

    // Rebuild the view with a fresh RecencyLens materializer.
    let second_lens = SyncLensWrapper(RecencyLens);
    let second_materializer = Materializer::new(store.clone(), Box::new(second_lens));
    second_materializer.step().await.expect("materialize v2");

    // The same query must now surface v2.
    let after = engine.execute(&query).await.expect("query v2");
    assert_eq!(after.assertions.len(), 1);
    assert_eq!(
        after.assertions[0].object,
        ObjectValue::Number(175.0),
        "should return v2 (more recent)"
    );
    assert_eq!(after.assertions[0].timestamp, 2000);
}
/// Test: The ingest cursor survives a worker restart.
///
/// Three records are written to the WAL. A first worker processes two of them
/// and is dropped; a second worker built on the same journal and store must
/// then process exactly one record (the third) rather than replaying the
/// first two.
#[tokio::test]
async fn test_e2e_cursor_persistence() {
    let tmp = tempdir().expect("create temp dir");
    let wal_path = tmp.path().join("wal");
    let db_path = tmp.path().join("db");

    // Three assertions about three different subjects.
    let first = create_signed_assertion("Entity_A", "prop", 1.0, 1000);
    let second = create_signed_assertion("Entity_B", "prop", 2.0, 2000);
    let third = create_signed_assertion("Entity_C", "prop", 3.0, 3000);

    let mut journal = Journal::open(&wal_path).expect("open journal");
    journal.append(serialize_assertion(&first).expect("ser")).expect("append");
    journal.append(serialize_assertion(&second).expect("ser")).expect("append");
    journal.append(serialize_assertion(&third).expect("ser")).expect("append");

    let journal = Arc::new(Mutex::new(journal));
    let store = Arc::new(SledStore::open(&db_path).expect("open store"));

    // First worker: take exactly two steps.
    let mut worker1 = IngestWorker::new(journal.clone(), store.clone()).await.expect("worker1");
    worker1.step().await.expect("step 1");
    worker1.step().await.expect("step 2");

    let after_first_worker = store.scan_prefix(b"H:").await.expect("scan");
    assert_eq!(after_first_worker.len(), 2, "worker1 should have processed 2 assertions");

    // Simulate a restart by discarding the first worker.
    drop(worker1);

    // Second worker: count the steps that report progress until the WAL is drained.
    let mut worker2 = IngestWorker::new(journal.clone(), store.clone()).await.expect("worker2");
    let mut productive_steps = 0;
    while worker2.step().await.expect("step") > 0 {
        productive_steps += 1;
    }
    assert_eq!(productive_steps, 1, "worker2 should only process 1 new assertion");

    // All three assertions must be stored once the second worker has finished.
    let after_second_worker = store.scan_prefix(b"H:").await.expect("scan");
    assert_eq!(after_second_worker.len(), 3, "should have all 3 assertions");
}
/// Test: Event-driven materialization via Notify.
///
/// Validates that an IngestWorker configured with `with_notify` signals the
/// shared [`Notify`] when it processes a WAL record, which is how downstream
/// consumers (e.g. the Materializer) learn that new data is available.
///
/// A background task waits up to five seconds on the `Notify` and reports
/// whether it was woken. The test fails with an explicit message if no
/// notification arrived, instead of surfacing a panic from inside the task.
#[tokio::test]
async fn test_e2e_notify_integration() {
    let dir = tempdir().expect("create temp dir");
    let wal_dir = dir.path().join("wal");
    let db_dir = dir.path().join("db");

    let assertion = create_signed_assertion("Tesla_Inc", "deliveries", 500000.0, 1000);
    let mut journal = Journal::open(&wal_dir).expect("open journal");
    journal.append(serialize_assertion(&assertion).expect("ser")).expect("append");

    let journal = Arc::new(Mutex::new(journal));
    let store = Arc::new(SledStore::open(&db_dir).expect("open store"));
    let notify = Arc::new(Notify::new());

    // The listener returns `true` if it was notified and `false` on timeout.
    // Returning the outcome (rather than panicking in the task and mirroring
    // it into a shared flag) lets the assertion below report the failure.
    let listener_notify = Arc::clone(&notify);
    let listener = tokio::spawn(async move {
        tokio::time::timeout(std::time::Duration::from_secs(5), listener_notify.notified())
            .await
            .is_ok()
    });

    // Give the listener time to start waiting before the worker runs.
    // NOTE(review): whether this is strictly required depends on how
    // IngestWorker signals the Notify (not visible in this file) — confirm.
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    // Ingest with notify attached
    let mut worker = IngestWorker::new(journal.clone(), store.clone())
        .await
        .expect("worker")
        .with_notify(notify);
    worker.step().await.expect("step");

    // Wait for the listener to finish and check its verdict.
    let notification_received = listener.await.expect("listener task");
    assert!(notification_received, "notification should have been received");
}
// ============================================================================
// DECAY INTEGRATION TESTS
// ============================================================================
/// Test: A decay half-life lowers the effective confidence of old assertions.
///
/// Two assertions share a subject+predicate: an old one (one year before the
/// query's `as_of`) with confidence 0.95, and a recent one (one week before)
/// with confidence 0.6. Without `decay_halflife` the old one has the highest
/// confidence; with a one-year half-life the old one drops to about 0.475
/// while the recent one stays near 0.592, so the recent one ranks highest.
#[tokio::test]
async fn test_e2e_decay_reduces_old_confidence() {
    use std::cmp::Ordering;

    let tmp = tempdir().expect("create temp dir");
    let wal_path = tmp.path().join("wal");
    let db_path = tmp.path().join("db");

    // Fixed clock values keep the decay arithmetic deterministic.
    let one_year_seconds: u64 = 365 * 24 * 60 * 60;
    let one_week_seconds: u64 = 7 * 24 * 60 * 60;
    let now: u64 = 1_000_000_000;
    let one_year_ago = now - one_year_seconds;
    let one_week_ago = now - one_week_seconds;

    // Old claim: HIGH raw confidence (0.95), exactly one half-life old,
    // so roughly 0.475 once decayed.
    let mut old_assertion =
        create_signed_assertion("Semaglutide", "muscle_effect", -5.0, one_year_ago);
    old_assertion.confidence = 0.95;

    // Recent claim: LOWER raw confidence (0.6), one week old,
    // so roughly 0.59 once decayed.
    let mut new_assertion =
        create_signed_assertion("Semaglutide", "muscle_effect", -2.0, one_week_ago);
    new_assertion.confidence = 0.6;

    // Append both to the WAL and ingest them, one worker step each.
    let mut journal = Journal::open(&wal_path).expect("open journal");
    journal.append(serialize_assertion(&old_assertion).expect("ser")).expect("append");
    journal.append(serialize_assertion(&new_assertion).expect("ser")).expect("append");

    let journal = Arc::new(Mutex::new(journal));
    let store = Arc::new(SledStore::open(&db_path).expect("open store"));
    let mut worker = IngestWorker::new(journal.clone(), store.clone()).await.expect("worker");
    worker.step().await.expect("step 1");
    worker.step().await.expect("step 2");

    // Both assertions must have been stored.
    let stored = store.scan_prefix(b"H:").await.expect("scan");
    assert_eq!(stored.len(), 2, "should have two assertions");

    // --- No decay: raw confidences apply, so 0.95 beats 0.6 ---
    let engine = QueryEngine::new(store.clone());
    let query_no_decay = Query::builder().subject("Semaglutide").predicate("muscle_effect").build();
    let result_no_decay = engine.execute(&query_no_decay).await.expect("query no decay");
    assert_eq!(result_no_decay.assertions.len(), 2);

    // Pick the returned assertion with the greatest confidence; pairs that
    // cannot be compared are treated as equal.
    let top_no_decay = result_no_decay
        .assertions
        .iter()
        .max_by(|a, b| a.confidence.partial_cmp(&b.confidence).unwrap_or(Ordering::Equal))
        .expect("at least one assertion");
    assert_eq!(
        top_no_decay.object,
        ObjectValue::Number(-5.0),
        "Without decay, old high-confidence assertion has highest confidence"
    );

    // --- With a one-year half-life, evaluated as of `now` ---
    //   old: 0.95 * 2^(-1)        = 0.475
    //   new: 0.6  * 2^(-(7/365)) ~= 0.59
    let query_with_decay = Query::builder()
        .subject("Semaglutide")
        .predicate("muscle_effect")
        .decay_halflife(one_year_seconds)
        .as_of(now) // pin "now" so the expected values below are stable
        .build();
    let result_with_decay = engine.execute(&query_with_decay).await.expect("query with decay");
    assert_eq!(result_with_decay.assertions.len(), 2);

    // Same selection as above, now over the decayed confidences.
    let top_with_decay = result_with_decay
        .assertions
        .iter()
        .max_by(|a, b| a.confidence.partial_cmp(&b.confidence).unwrap_or(Ordering::Equal))
        .expect("at least one assertion");
    assert_eq!(
        top_with_decay.object,
        ObjectValue::Number(-2.0),
        "With decay, newer assertion should have higher effective confidence"
    );

    // Check the decayed values themselves, locating each assertion by its object value.
    let old_decayed = result_with_decay
        .assertions
        .iter()
        .find(|a| a.object == ObjectValue::Number(-5.0))
        .expect("find old assertion");
    let new_decayed = result_with_decay
        .assertions
        .iter()
        .find(|a| a.object == ObjectValue::Number(-2.0))
        .expect("find new assertion");

    // Expected: 0.95 * 2^(-1) ~= 0.475, within a 0.02 tolerance.
    assert!(
        (old_decayed.confidence - 0.475).abs() < 0.02,
        "Old assertion should decay to ~0.475, got {}",
        old_decayed.confidence
    );

    // Expected: 0.6 * 2^(-(7/365)) ~= 0.592, within a 0.02 tolerance.
    assert!(
        (new_decayed.confidence - 0.592).abs() < 0.02,
        "New assertion should decay minimally to ~0.592, got {}",
        new_decayed.confidence
    );
}