//! E2E integration tests for the StemeDB pipeline.
//!
//! Validates the full data flow: WAL -> IngestWorker -> Materializer -> QueryEngine
//!
//! # Test Coverage
//!
//! | Test | Pipeline Stage | Validates |
//! |------|---------------|-----------|
//! | `test_e2e_write_materialize_read` | Full | Basic happy path |
//! | `test_e2e_vote_consensus` | Full + Votes | Vote-weighted resolution |
//! | `test_e2e_update_winner` | Full + Update | Winner changes on re-materialize |
//! | `test_e2e_cursor_persistence` | WAL + Ingest | Cursor survives worker restart |
//! | `test_e2e_notify_integration` | Ingest | Event-driven notification channel |

#![allow(clippy::expect_used)] // Test code uses expect() for clear failure messages

use ed25519_dalek::{Signer, SigningKey};
use rand::rngs::OsRng;
use std::sync::Arc;
use stemedb_core::serde::serialize;
use stemedb_core::testing::AssertionBuilder;
use stemedb_core::types::{Assertion, LifecycleStage, ObjectValue, SignatureEntry, Vote};
use stemedb_ingest::worker::{serialize_assertion, IngestWorker};
use stemedb_lens::{RecencyLens, SyncLensWrapper, VoteAwareConsensusLens};
use stemedb_query::{Materializer, Query, QueryEngine};
use stemedb_storage::{key_codec, GenericVoteStore, HybridStore, KVStore, VoteStore};
use stemedb_wal::Journal;
use tempfile::tempdir;
use tokio::sync::{Mutex, Notify};

// ============================================================================
// TEST HELPERS
// ============================================================================

/// Build an [`Assertion`] carrying a single version-1 Ed25519 signature.
///
/// A fresh keypair is generated on every call. The signed message is
/// `"{subject}:{predicate}"`, which matches IngestWorker's v1 verification
/// logic. The resulting assertion has a numeric object (`value`), confidence
/// `0.95`, and lifecycle stage `Proposed`; `timestamp` is used for both the
/// assertion and its signature entry.
fn create_signed_assertion(
    subject: &str,
    predicate: &str,
    value: f64,
    timestamp: u64,
) -> Assertion {
    // Throwaway keypair: each assertion is signed by its own one-off agent.
    let keypair = SigningKey::generate(&mut OsRng);
    let signed_payload = format!("{}:{}", subject, predicate);

    let signature_entry = SignatureEntry {
        agent_id: keypair.verifying_key().to_bytes(),
        signature: keypair.sign(signed_payload.as_bytes()).to_bytes(),
        timestamp,
        version: 1,
    };

    AssertionBuilder::new()
        .subject(subject)
        .predicate(predicate)
        .object_number(value)
        .confidence(0.95)
        .lifecycle(LifecycleStage::Proposed)
        .timestamp(timestamp)
        .signatures(vec![signature_entry])
        .build()
}

/// Return the BLAKE3 digest of the serialized assertion.
///
/// Matches the hash computation in the ingestion pipeline. Panics if the
/// assertion cannot be serialized.
fn compute_assertion_hash(assertion: &Assertion) -> [u8; 32] {
    let encoded = serialize(assertion).expect("serialize assertion");
    let digest = blake3::hash(&encoded);
    *digest.as_bytes()
}

/// Build an unsigned test [`Vote`] for the given assertion hash.
///
/// The agent id is all zeroes except for its first byte, which is set to
/// `agent_idx`; the signature is left zeroed and the optional source/context
/// fields are `None`.
fn create_vote(assertion_hash: [u8; 32], agent_idx: u8, weight: f32, timestamp: u64) -> Vote {
    let mut voter = [0u8; 32];
    voter[0] = agent_idx;

    Vote {
        assertion_hash,
        agent_id: voter,
        weight,
        signature: [0u8; 64],
        timestamp,
        source_url: None,
        observed_context: None,
    }
}

// ============================================================================
// E2E TESTS
// ============================================================================

/// Test 1: Basic pipeline - write, ingest, materialize, query.
///
/// Proves the fundamental flow works end-to-end:
/// 1. Write signed assertion to WAL
/// 2. IngestWorker reads from WAL and stores in KV
/// 3. Materializer creates MV:{subject}:{predicate}
/// 4.
QueryEngine returns the assertion #[tokio::test] async fn test_e2e_write_materialize_read() { let dir = tempdir().expect("create temp dir"); let wal_dir = dir.path().join("wal"); let db_dir = dir.path().join("db"); // === Step 1: Write assertion to WAL === let assertion = create_signed_assertion("Tesla_Inc", "has_revenue", 96.7, 1000); let mut journal = Journal::open(&wal_dir).expect("open journal"); let payload = serialize_assertion(&assertion).expect("serialize"); journal.append(payload).expect("append to WAL"); // === Step 2: Run IngestWorker to process WAL === let journal = Arc::new(Mutex::new(journal)); let store = Arc::new(HybridStore::open(&db_dir).expect("open store")); let notify = Arc::new(Notify::new()); let mut worker = IngestWorker::new(journal.clone(), store.clone()) .await .expect("create worker") .with_notify(notify.clone()); let bytes_processed = worker.step().await.expect("ingest step"); assert!(bytes_processed > 0, "should have processed data from WAL"); // Verify assertion stored at {subject}\x00H:{hash} let assertion_hash = compute_assertion_hash(&assertion); let h_key = key_codec::assertion_key("Tesla_Inc", &hex::encode(assertion_hash)); let stored = store.get(&h_key).await.expect("get assertion"); assert!(stored.is_some(), "assertion should be stored at H: key"); // Verify compound index {subject}\x00SP:{predicate} created let sp_prefix = key_codec::subject_predicate_scan_prefix("Tesla_Inc"); let sp_entries = store.scan_prefix(&sp_prefix).await.expect("scan SP: prefix"); assert_eq!(sp_entries.len(), 1, "should have one SP: index entry"); // === Step 3: Run Materializer === let vote_store = Arc::new(GenericVoteStore::new(store.clone())); let lens = VoteAwareConsensusLens::new(vote_store); let materializer = Materializer::new(store.clone(), Box::new(lens)); let report = materializer.step().await.expect("materialize step"); assert_eq!(report.pairs_scanned, 1, "should scan one subject+predicate pair"); assert_eq!(report.views_updated, 1, "should 
update one materialized view"); // Verify {subject}\x00MV:{predicate} written let mv_key = key_codec::mv_key("Tesla_Inc", "has_revenue"); let mv_data = store.get(&mv_key).await.expect("get MV"); assert!(mv_data.is_some(), "materialized view should exist"); // === Step 4: Query via QueryEngine === let engine = QueryEngine::new(store.clone()); let query = Query::builder().subject("Tesla_Inc").predicate("has_revenue").build(); let result = engine.execute(&query).await.expect("execute query"); assert_eq!(result.assertions.len(), 1, "query should return one assertion"); assert_eq!(result.assertions[0].subject, "Tesla_Inc"); assert_eq!(result.assertions[0].predicate, "has_revenue"); assert_eq!(result.assertions[0].object, ObjectValue::Number(96.7)); } /// Test 2: Vote-weighted consensus. /// /// Proves that VoteAwareConsensusLens selects the assertion with highest /// aggregate vote weight: /// 1. Write two assertions with same subject+predicate but different values /// 2. Add votes favoring assertion_a (3 votes, weight 0.9 each) /// 3. Add 1 vote for assertion_b (weight 0.2) /// 4. 
Materializer picks assertion_a as winner #[tokio::test] async fn test_e2e_vote_consensus() { let dir = tempdir().expect("create temp dir"); let wal_dir = dir.path().join("wal"); let db_dir = dir.path().join("db"); // Create two competing assertions let assertion_a = create_signed_assertion("Semaglutide", "muscle_effect", -5.0, 1000); let assertion_b = create_signed_assertion("Semaglutide", "muscle_effect", -15.0, 1100); // Write both to WAL let mut journal = Journal::open(&wal_dir).expect("open journal"); journal.append(serialize_assertion(&assertion_a).expect("ser")).expect("append"); journal.append(serialize_assertion(&assertion_b).expect("ser")).expect("append"); // Ingest both let journal = Arc::new(Mutex::new(journal)); let store = Arc::new(HybridStore::open(&db_dir).expect("open store")); let mut worker = IngestWorker::new(journal.clone(), store.clone()).await.expect("create worker"); let bytes1 = worker.step().await.expect("step 1"); assert!(bytes1 > 0, "should ingest first assertion"); let bytes2 = worker.step().await.expect("step 2"); assert!(bytes2 > 0, "should ingest second assertion"); // Compute hashes for both assertions let hash_a = compute_assertion_hash(&assertion_a); let hash_b = compute_assertion_hash(&assertion_b); // Verify both are stored let h_key_a = key_codec::assertion_key("Semaglutide", &hex::encode(hash_a)); let h_key_b = key_codec::assertion_key("Semaglutide", &hex::encode(hash_b)); assert!(store.get(&h_key_a).await.expect("get a").is_some(), "assertion_a should be stored"); assert!(store.get(&h_key_b).await.expect("get b").is_some(), "assertion_b should be stored"); // Add votes via VoteStore let vote_store = Arc::new(GenericVoteStore::new(store.clone())); // assertion_a gets 3 votes (total weight = 2.7) for i in 0..3 { let vote = create_vote(hash_a, i, 0.9, 2000 + i as u64); vote_store.put_vote(&vote, "Semaglutide").await.expect("put vote for a"); } // assertion_b gets 1 vote (total weight = 0.2) let vote_b = create_vote(hash_b, 10, 
0.2, 2100); vote_store.put_vote(&vote_b, "Semaglutide").await.expect("put vote for b"); // Materialize with VoteAwareConsensusLens let lens = VoteAwareConsensusLens::new(Arc::clone(&vote_store)); let materializer = Materializer::new(store.clone(), Box::new(lens)); let report = materializer.step().await.expect("materialize"); assert_eq!(report.views_updated, 1); // Query should return assertion_a (higher aggregate weight) let engine = QueryEngine::new(store.clone()); let query = Query::builder().subject("Semaglutide").predicate("muscle_effect").build(); let result = engine.execute(&query).await.expect("query"); assert_eq!(result.assertions.len(), 1); assert_eq!( result.assertions[0].object, ObjectValue::Number(-5.0), "should return assertion_a (higher vote weight)" ); } /// Test 3: Winner changes after new assertion. /// /// Proves that re-materialization updates the winner when new data arrives: /// 1. Write assertion_v1 (timestamp=1000), ingest, materialize /// 2. Query returns v1 /// 3. Write assertion_v2 (timestamp=2000, same subject+predicate) /// 4. Re-ingest and re-materialize with RecencyLens /// 5. 
Query returns v2 (newer timestamp wins) #[tokio::test] async fn test_e2e_update_winner() { let dir = tempdir().expect("create temp dir"); let wal_dir = dir.path().join("wal"); let db_dir = dir.path().join("db"); // === Phase 1: Initial assertion === let assertion_v1 = create_signed_assertion("Apple_Inc", "stock_price", 150.0, 1000); let mut journal = Journal::open(&wal_dir).expect("open journal"); journal.append(serialize_assertion(&assertion_v1).expect("ser")).expect("append v1"); let journal = Arc::new(Mutex::new(journal)); let store = Arc::new(HybridStore::open(&db_dir).expect("open store")); // Ingest v1 let mut worker = IngestWorker::new(journal.clone(), store.clone()).await.expect("worker"); let bytes = worker.step().await.expect("step v1"); assert!(bytes > 0); // Materialize with RecencyLens (wrapped for AsyncLens trait) let lens = SyncLensWrapper(RecencyLens); let materializer = Materializer::new(store.clone(), Box::new(lens)); materializer.step().await.expect("materialize v1"); // Query returns v1 let engine = QueryEngine::new(store.clone()); let query = Query::builder().subject("Apple_Inc").predicate("stock_price").build(); let result = engine.execute(&query).await.expect("query v1"); assert_eq!(result.assertions.len(), 1); assert_eq!(result.assertions[0].object, ObjectValue::Number(150.0)); assert_eq!(result.assertions[0].timestamp, 1000); // === Phase 2: New assertion with higher timestamp === let assertion_v2 = create_signed_assertion("Apple_Inc", "stock_price", 175.0, 2000); // Write to WAL { let mut journal = journal.lock().await; journal.append(serialize_assertion(&assertion_v2).expect("ser")).expect("append v2"); } // Re-ingest (worker resumes from cursor) let mut worker2 = IngestWorker::new(journal.clone(), store.clone()).await.expect("worker2"); let bytes2 = worker2.step().await.expect("step v2"); assert!(bytes2 > 0, "should process new assertion"); // Verify both assertions are now stored let hash_v1 = compute_assertion_hash(&assertion_v1); let 
hash_v2 = compute_assertion_hash(&assertion_v2); let key_v1 = key_codec::assertion_key("Apple_Inc", &hex::encode(hash_v1)); let key_v2 = key_codec::assertion_key("Apple_Inc", &hex::encode(hash_v2)); assert!(store.get(&key_v1).await.expect("get v1").is_some(), "v1 should be stored"); assert!(store.get(&key_v2).await.expect("get v2").is_some(), "v2 should be stored"); // Re-materialize let lens2 = SyncLensWrapper(RecencyLens); let materializer2 = Materializer::new(store.clone(), Box::new(lens2)); materializer2.step().await.expect("materialize v2"); // Query returns v2 (newer timestamp wins with RecencyLens) let result2 = engine.execute(&query).await.expect("query v2"); assert_eq!(result2.assertions.len(), 1); assert_eq!( result2.assertions[0].object, ObjectValue::Number(175.0), "should return v2 (more recent)" ); assert_eq!(result2.assertions[0].timestamp, 2000); } /// Test: Cursor persistence across worker restarts. /// /// Validates that the IngestWorker correctly resumes from the last cursor, /// avoiding replay of already-processed records. 
#[tokio::test] async fn test_e2e_cursor_persistence() { let dir = tempdir().expect("create temp dir"); let wal_dir = dir.path().join("wal"); let db_dir = dir.path().join("db"); // Write 3 assertions to WAL let a1 = create_signed_assertion("Entity_A", "prop", 1.0, 1000); let a2 = create_signed_assertion("Entity_B", "prop", 2.0, 2000); let a3 = create_signed_assertion("Entity_C", "prop", 3.0, 3000); let mut journal = Journal::open(&wal_dir).expect("open journal"); journal.append(serialize_assertion(&a1).expect("ser")).expect("append"); journal.append(serialize_assertion(&a2).expect("ser")).expect("append"); journal.append(serialize_assertion(&a3).expect("ser")).expect("append"); let journal = Arc::new(Mutex::new(journal)); let store = Arc::new(HybridStore::open(&db_dir).expect("open store")); // Worker 1: Process first 2 assertions let mut worker1 = IngestWorker::new(journal.clone(), store.clone()).await.expect("worker1"); worker1.step().await.expect("step 1"); worker1.step().await.expect("step 2"); // Verify 2 assertions stored let hash1 = compute_assertion_hash(&a1); let hash2 = compute_assertion_hash(&a2); let key1 = key_codec::assertion_key("Entity_A", &hex::encode(hash1)); let key2 = key_codec::assertion_key("Entity_B", &hex::encode(hash2)); assert!(store.get(&key1).await.expect("get a1").is_some(), "a1 should be stored"); assert!(store.get(&key2).await.expect("get a2").is_some(), "a2 should be stored"); // Drop worker1, simulate restart drop(worker1); // Worker 2: Should resume from cursor and only process the third assertion let mut worker2 = IngestWorker::new(journal.clone(), store.clone()).await.expect("worker2"); let mut steps = 0; while worker2.step().await.expect("step") > 0 { steps += 1; } assert_eq!(steps, 1, "worker2 should only process 1 new assertion"); // Verify all 3 assertions now stored let hash3 = compute_assertion_hash(&a3); let key3 = key_codec::assertion_key("Entity_C", &hex::encode(hash3)); assert!(store.get(&key3).await.expect("get 
a3").is_some(), "a3 should be stored"); } /// Test: Event-driven materialization via Notify. /// /// Validates that the IngestWorker signals downstream consumers (Materializer) /// when new data is available. #[tokio::test] async fn test_e2e_notify_integration() { let dir = tempdir().expect("create temp dir"); let wal_dir = dir.path().join("wal"); let db_dir = dir.path().join("db"); let assertion = create_signed_assertion("Tesla_Inc", "deliveries", 500000.0, 1000); let mut journal = Journal::open(&wal_dir).expect("open journal"); journal.append(serialize_assertion(&assertion).expect("ser")).expect("append"); let journal = Arc::new(Mutex::new(journal)); let store = Arc::new(HybridStore::open(&db_dir).expect("open store")); let notify = Arc::new(Notify::new()); // Track if notification was received let notify_clone = Arc::clone(¬ify); let notification_received = Arc::new(std::sync::atomic::AtomicBool::new(false)); let notification_received_clone = Arc::clone(¬ification_received); // Spawn listener that waits for notification let listener = tokio::spawn(async move { tokio::time::timeout(std::time::Duration::from_secs(5), notify_clone.notified()) .await .expect("timeout waiting for notification"); notification_received_clone.store(true, std::sync::atomic::Ordering::SeqCst); }); // Give listener time to start tokio::time::sleep(std::time::Duration::from_millis(50)).await; // Ingest with notify attached let mut worker = IngestWorker::new(journal.clone(), store.clone()) .await .expect("worker") .with_notify(notify); worker.step().await.expect("step"); // Wait for listener to complete listener.await.expect("listener task"); assert!( notification_received.load(std::sync::atomic::Ordering::SeqCst), "notification should have been received" ); }