stemedb/crates/stemedb-query/tests/e2e_pipeline.rs
jordan 3320c24afa feat: WAL hardening (Phase 5B) - CRC32C, crash recovery, group commit, log rotation
Add CRC32C checksums to WAL record format (v2), implement crash recovery
with automatic truncation of corrupt records, add feature-gated group commit
buffer for batched fsync under concurrent load, and implement log rotation
via segment files with global offset addressing.

Key changes:
- Record format v2: [len:u32][crc32c:u32][blake3:32][payload:N]
- recover_file() scans and truncates corrupt tail records
- GroupCommitBuffer batches fsync via MPSC channel (tokio feature gate)
- SegmentManager with binary search resolution and cursor-based cleanup
- Journal::read() auto-refreshes segments on miss for writer/reader split
- Split recovery.rs and key_codec.rs into directory modules for 500-line max

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-02 12:36:35 -07:00

428 lines
17 KiB
Rust

//! E2E integration tests for the StemeDB pipeline.
//!
//! Validates the full data flow: WAL -> IngestWorker -> Materializer -> QueryEngine
//!
//! # Test Coverage
//!
//! | Test | Pipeline Stage | Validates |
//! |------|---------------|-----------|
//! | `test_e2e_write_materialize_read` | Full | Basic happy path |
//! | `test_e2e_vote_consensus` | Full + Votes | Vote-weighted resolution |
//! | `test_e2e_update_winner` | Full + Update | Winner changes on re-materialize |
//! | `test_e2e_cursor_persistence` | WAL + Ingest | Cursor survives worker restart |
//! | `test_e2e_notify_integration` | Ingest | Event-driven notification channel |
#![allow(clippy::expect_used)] // Test code uses expect() for clear failure messages
use ed25519_dalek::{Signer, SigningKey};
use rand::rngs::OsRng;
use std::sync::Arc;
use stemedb_core::serde::serialize;
use stemedb_core::testing::AssertionBuilder;
use stemedb_core::types::{Assertion, LifecycleStage, ObjectValue, SignatureEntry, Vote};
use stemedb_ingest::worker::{serialize_assertion, IngestWorker};
use stemedb_lens::{RecencyLens, SyncLensWrapper, VoteAwareConsensusLens};
use stemedb_query::{Materializer, Query, QueryEngine};
use stemedb_storage::{key_codec, GenericVoteStore, HybridStore, KVStore, VoteStore};
use stemedb_wal::Journal;
use tempfile::tempdir;
use tokio::sync::{Mutex, Notify};
// ============================================================================
// TEST HELPERS
// ============================================================================
/// Build a test assertion carrying a single Ed25519 signature.
///
/// A fresh signing key is generated from the OS RNG on every call. The signed
/// message is `"{subject}:{predicate}"`, which is the format IngestWorker is
/// expected to verify. The assertion is a numeric object with confidence 0.95
/// in the `Proposed` lifecycle stage; `timestamp` is used both for the
/// assertion and for its signature entry.
fn create_signed_assertion(
    subject: &str,
    predicate: &str,
    value: f64,
    timestamp: u64,
) -> Assertion {
    let keypair = SigningKey::generate(&mut OsRng);
    let signed_text = format!("{}:{}", subject, predicate);

    // The agent id is the public half of the throwaway key pair.
    let entry = SignatureEntry {
        agent_id: keypair.verifying_key().to_bytes(),
        signature: keypair.sign(signed_text.as_bytes()).to_bytes(),
        timestamp,
    };

    AssertionBuilder::new()
        .subject(subject)
        .predicate(predicate)
        .object_number(value)
        .confidence(0.95)
        .lifecycle(LifecycleStage::Proposed)
        .timestamp(timestamp)
        .signatures(vec![entry])
        .build()
}
/// Derive the content address of an assertion.
///
/// The assertion is serialized and the resulting bytes are hashed with BLAKE3.
/// This is intended to mirror the hash the ingestion pipeline computes, so the
/// result can be used to look up stored assertions by key.
///
/// Panics if serialization fails.
fn compute_assertion_hash(assertion: &Assertion) -> [u8; 32] {
    let encoded = serialize(assertion).expect("serialize assertion");
    let digest = blake3::hash(&encoded);
    *digest.as_bytes()
}
/// Build an unsigned test vote for the given assertion hash.
///
/// The agent id is all zeros except for its first byte, which is set to
/// `agent_idx` so that different indices yield distinct agents. The signature
/// is a zero-filled placeholder and the optional provenance fields are unset.
fn create_vote(assertion_hash: [u8; 32], agent_idx: u8, weight: f32, timestamp: u64) -> Vote {
    let agent_id = {
        let mut id = [0u8; 32];
        id[0] = agent_idx;
        id
    };

    Vote {
        assertion_hash,
        agent_id,
        weight,
        signature: [0u8; 64],
        timestamp,
        source_url: None,
        observed_context: None,
    }
}
// ============================================================================
// E2E TESTS
// ============================================================================
/// Test 1: happy path through every pipeline stage.
///
/// Walks a single signed assertion through the whole system:
/// 1. Append the serialized assertion to the WAL
/// 2. Let IngestWorker pull it from the WAL into the KV store
/// 3. Let Materializer build the view for the subject+predicate pair
/// 4. Ask QueryEngine for that pair and check the assertion comes back
#[tokio::test]
async fn test_e2e_write_materialize_read() {
    let tmp = tempdir().expect("create temp dir");
    let wal_path = tmp.path().join("wal");
    let db_path = tmp.path().join("db");

    // --- Stage 1: put one assertion into the WAL ---
    let original = create_signed_assertion("Tesla_Inc", "has_revenue", 96.7, 1000);
    let mut wal = Journal::open(&wal_path).expect("open journal");
    wal.append(serialize_assertion(&original).expect("serialize")).expect("append to WAL");

    // --- Stage 2: a single ingest step moves it into the store ---
    let wal = Arc::new(Mutex::new(wal));
    let kv = Arc::new(HybridStore::open(&db_path).expect("open store"));
    let ingest_signal = Arc::new(Notify::new());
    let mut ingester = IngestWorker::new(wal.clone(), kv.clone())
        .await
        .expect("create worker")
        .with_notify(ingest_signal.clone());
    let ingested = ingester.step().await.expect("ingest step");
    assert!(ingested > 0, "should have processed data from WAL");

    // The assertion must be retrievable under its content-addressed key.
    let content_hash = hex::encode(compute_assertion_hash(&original));
    let stored_key = key_codec::assertion_key("Tesla_Inc", &content_hash);
    let stored = kv.get(&stored_key).await.expect("get assertion");
    assert!(stored.is_some(), "assertion should be stored at H: key");

    // Exactly one subject+predicate index entry must exist for the subject.
    let index_prefix = key_codec::subject_predicate_scan_prefix("Tesla_Inc");
    let index_entries = kv.scan_prefix(&index_prefix).await.expect("scan SP: prefix");
    assert_eq!(index_entries.len(), 1, "should have one SP: index entry");

    // --- Stage 3: materialize the view ---
    let votes = Arc::new(GenericVoteStore::new(kv.clone()));
    let consensus_lens = VoteAwareConsensusLens::new(votes);
    let materializer = Materializer::new(kv.clone(), Box::new(consensus_lens));
    let summary = materializer.step().await.expect("materialize step");
    assert_eq!(summary.pairs_scanned, 1, "should scan one subject+predicate pair");
    assert_eq!(summary.views_updated, 1, "should update one materialized view");

    // The materialized view record itself must now be present.
    let view_key = key_codec::mv_key("Tesla_Inc", "has_revenue");
    let view = kv.get(&view_key).await.expect("get MV");
    assert!(view.is_some(), "materialized view should exist");

    // --- Stage 4: read it back through the query engine ---
    let engine = QueryEngine::new(kv.clone());
    let revenue_query = Query::builder().subject("Tesla_Inc").predicate("has_revenue").build();
    let answer = engine.execute(&revenue_query).await.expect("execute query");
    assert_eq!(answer.assertions.len(), 1, "query should return one assertion");

    let returned = &answer.assertions[0];
    assert_eq!(returned.subject, "Tesla_Inc");
    assert_eq!(returned.predicate, "has_revenue");
    assert_eq!(returned.object, ObjectValue::Number(96.7));
}
/// Test 2: votes decide between competing assertions.
///
/// Checks that materializing with VoteAwareConsensusLens surfaces the
/// assertion backed by the larger total vote weight:
/// 1. Ingest two assertions sharing subject+predicate but with different values
/// 2. Cast three votes of weight 0.9 for assertion_a
/// 3. Cast one vote of weight 0.2 for assertion_b
/// 4. Materialize and query; the value of assertion_a must be returned
#[tokio::test]
async fn test_e2e_vote_consensus() {
    let tmp = tempdir().expect("create temp dir");
    let wal_path = tmp.path().join("wal");
    let db_path = tmp.path().join("db");

    // Two rival claims about the same subject+predicate.
    let assertion_a = create_signed_assertion("Semaglutide", "muscle_effect", -5.0, 1000);
    let assertion_b = create_signed_assertion("Semaglutide", "muscle_effect", -15.0, 1100);

    // Both go into the WAL, a first.
    let mut wal = Journal::open(&wal_path).expect("open journal");
    wal.append(serialize_assertion(&assertion_a).expect("ser")).expect("append");
    wal.append(serialize_assertion(&assertion_b).expect("ser")).expect("append");

    // One ingest step per record.
    let wal = Arc::new(Mutex::new(wal));
    let kv = Arc::new(HybridStore::open(&db_path).expect("open store"));
    let mut ingester = IngestWorker::new(wal.clone(), kv.clone()).await.expect("create worker");
    let first_step = ingester.step().await.expect("step 1");
    assert!(first_step > 0, "should ingest first assertion");
    let second_step = ingester.step().await.expect("step 2");
    assert!(second_step > 0, "should ingest second assertion");

    // Content hashes identify each assertion for storage lookups and votes.
    let hash_a = compute_assertion_hash(&assertion_a);
    let hash_b = compute_assertion_hash(&assertion_b);

    // Both assertions must have landed in the store.
    let key_a = key_codec::assertion_key("Semaglutide", &hex::encode(hash_a));
    let key_b = key_codec::assertion_key("Semaglutide", &hex::encode(hash_b));
    let found_a = kv.get(&key_a).await.expect("get a");
    assert!(found_a.is_some(), "assertion_a should be stored");
    let found_b = kv.get(&key_b).await.expect("get b");
    assert!(found_b.is_some(), "assertion_b should be stored");

    let vote_store = Arc::new(GenericVoteStore::new(kv.clone()));

    // Three distinct agents back assertion_a (3 x 0.9 = 2.7 total).
    for agent_idx in 0u8..3 {
        let ballot = create_vote(hash_a, agent_idx, 0.9, 2000 + u64::from(agent_idx));
        vote_store.put_vote(&ballot, "Semaglutide").await.expect("put vote for a");
    }

    // A single agent backs assertion_b (0.2 total).
    let ballot_b = create_vote(hash_b, 10, 0.2, 2100);
    vote_store.put_vote(&ballot_b, "Semaglutide").await.expect("put vote for b");

    // Materialize using the vote-aware lens over the same vote store.
    let consensus_lens = VoteAwareConsensusLens::new(Arc::clone(&vote_store));
    let materializer = Materializer::new(kv.clone(), Box::new(consensus_lens));
    let summary = materializer.step().await.expect("materialize");
    assert_eq!(summary.views_updated, 1);

    // The query must surface assertion_a's value.
    let engine = QueryEngine::new(kv.clone());
    let effect_query = Query::builder().subject("Semaglutide").predicate("muscle_effect").build();
    let answer = engine.execute(&effect_query).await.expect("query");
    assert_eq!(answer.assertions.len(), 1);
    assert_eq!(
        answer.assertions[0].object,
        ObjectValue::Number(-5.0),
        "should return assertion_a (higher vote weight)"
    );
}
/// Test 3: re-materialization replaces the winner.
///
/// Checks that a later assertion displaces an earlier one under RecencyLens:
/// 1. Ingest and materialize v1 (timestamp 1000)
/// 2. The query returns v1
/// 3. Append v2 (timestamp 2000) for the same subject+predicate
/// 4. Ingest with a new worker and materialize again
/// 5. The same query now returns v2
#[tokio::test]
async fn test_e2e_update_winner() {
    let tmp = tempdir().expect("create temp dir");
    let wal_path = tmp.path().join("wal");
    let db_path = tmp.path().join("db");

    // --- Round 1: only v1 exists ---
    let first = create_signed_assertion("Apple_Inc", "stock_price", 150.0, 1000);
    let mut wal = Journal::open(&wal_path).expect("open journal");
    wal.append(serialize_assertion(&first).expect("ser")).expect("append v1");
    let wal = Arc::new(Mutex::new(wal));
    let kv = Arc::new(HybridStore::open(&db_path).expect("open store"));

    let mut first_worker = IngestWorker::new(wal.clone(), kv.clone()).await.expect("worker");
    let first_ingested = first_worker.step().await.expect("step v1");
    assert!(first_ingested > 0);

    // RecencyLens is wrapped in SyncLensWrapper before being handed to the Materializer.
    let first_lens = SyncLensWrapper(RecencyLens);
    let first_materializer = Materializer::new(kv.clone(), Box::new(first_lens));
    first_materializer.step().await.expect("materialize v1");

    let engine = QueryEngine::new(kv.clone());
    let price_query = Query::builder().subject("Apple_Inc").predicate("stock_price").build();
    let before = engine.execute(&price_query).await.expect("query v1");
    assert_eq!(before.assertions.len(), 1);
    let initial_winner = &before.assertions[0];
    assert_eq!(initial_winner.object, ObjectValue::Number(150.0));
    assert_eq!(initial_winner.timestamp, 1000);

    // --- Round 2: v2 arrives with a later timestamp ---
    let second = create_signed_assertion("Apple_Inc", "stock_price", 175.0, 2000);

    // The guard is a temporary, so the journal lock is released at the end of this statement.
    wal.lock().await.append(serialize_assertion(&second).expect("ser")).expect("append v2");

    // A brand-new worker over the same journal and store picks up the new record.
    let mut second_worker = IngestWorker::new(wal.clone(), kv.clone()).await.expect("worker2");
    let second_ingested = second_worker.step().await.expect("step v2");
    assert!(second_ingested > 0, "should process new assertion");

    // Both versions must be present in the store at this point.
    let first_key = key_codec::assertion_key(
        "Apple_Inc",
        &hex::encode(compute_assertion_hash(&first)),
    );
    let second_key = key_codec::assertion_key(
        "Apple_Inc",
        &hex::encode(compute_assertion_hash(&second)),
    );
    let first_found = kv.get(&first_key).await.expect("get v1");
    assert!(first_found.is_some(), "v1 should be stored");
    let second_found = kv.get(&second_key).await.expect("get v2");
    assert!(second_found.is_some(), "v2 should be stored");

    // Materialize again with a fresh RecencyLens.
    let second_lens = SyncLensWrapper(RecencyLens);
    let second_materializer = Materializer::new(kv.clone(), Box::new(second_lens));
    second_materializer.step().await.expect("materialize v2");

    // Re-running the identical query must now yield v2.
    let after = engine.execute(&price_query).await.expect("query v2");
    assert_eq!(after.assertions.len(), 1);
    let updated_winner = &after.assertions[0];
    assert_eq!(
        updated_winner.object,
        ObjectValue::Number(175.0),
        "should return v2 (more recent)"
    );
    assert_eq!(updated_winner.timestamp, 2000);
}
/// Test: the ingest cursor survives a worker restart.
///
/// Three assertions are written to the WAL. A first worker takes two steps and
/// is dropped; a second worker built over the same journal and store must then
/// need exactly one productive step, showing that it resumed after the
/// already-processed records rather than replaying them.
#[tokio::test]
async fn test_e2e_cursor_persistence() {
    let tmp = tempdir().expect("create temp dir");
    let wal_path = tmp.path().join("wal");
    let db_path = tmp.path().join("db");

    // Three assertions about three different subjects.
    let first = create_signed_assertion("Entity_A", "prop", 1.0, 1000);
    let second = create_signed_assertion("Entity_B", "prop", 2.0, 2000);
    let third = create_signed_assertion("Entity_C", "prop", 3.0, 3000);

    let mut wal = Journal::open(&wal_path).expect("open journal");
    for pending in [&first, &second, &third] {
        wal.append(serialize_assertion(pending).expect("ser")).expect("append");
    }
    let wal = Arc::new(Mutex::new(wal));
    let kv = Arc::new(HybridStore::open(&db_path).expect("open store"));

    // First worker: two steps, one per record.
    let mut worker1 = IngestWorker::new(wal.clone(), kv.clone()).await.expect("worker1");
    worker1.step().await.expect("step 1");
    worker1.step().await.expect("step 2");

    // The first two assertions must be stored.
    let first_key =
        key_codec::assertion_key("Entity_A", &hex::encode(compute_assertion_hash(&first)));
    let second_key =
        key_codec::assertion_key("Entity_B", &hex::encode(compute_assertion_hash(&second)));
    let first_found = kv.get(&first_key).await.expect("get a1");
    assert!(first_found.is_some(), "a1 should be stored");
    let second_found = kv.get(&second_key).await.expect("get a2");
    assert!(second_found.is_some(), "a2 should be stored");

    // Simulate a restart by discarding the first worker.
    drop(worker1);

    // Second worker: drain the WAL, counting the steps that made progress.
    let mut worker2 = IngestWorker::new(wal.clone(), kv.clone()).await.expect("worker2");
    let mut steps = 0;
    loop {
        let processed = worker2.step().await.expect("step");
        if processed > 0 {
            steps += 1;
        } else {
            break;
        }
    }
    assert_eq!(steps, 1, "worker2 should only process 1 new assertion");

    // The third assertion must now be stored as well.
    let third_key =
        key_codec::assertion_key("Entity_C", &hex::encode(compute_assertion_hash(&third)));
    let third_found = kv.get(&third_key).await.expect("get a3");
    assert!(third_found.is_some(), "a3 should be stored");
}
/// Test: Event-driven materialization via Notify.
///
/// Validates that an IngestWorker with a `Notify` attached signals that
/// `Notify` when it ingests a record, which is the hook downstream consumers
/// (such as the Materializer) can wait on.
///
/// The waiter is registered with `Notified::enable()` *before* the worker
/// runs, so the test does not rely on a background task plus a wall-clock
/// sleep to get the listener in place. With the waiter already registered, the
/// notification is observed whether the worker wakes it via `notify_one` or
/// `notify_waiters`.
///
/// Fails if no notification arrives within five seconds of the ingest step.
#[tokio::test]
async fn test_e2e_notify_integration() {
    let dir = tempdir().expect("create temp dir");
    let wal_dir = dir.path().join("wal");
    let db_dir = dir.path().join("db");

    let assertion = create_signed_assertion("Tesla_Inc", "deliveries", 500000.0, 1000);
    let mut journal = Journal::open(&wal_dir).expect("open journal");
    journal.append(serialize_assertion(&assertion).expect("ser")).expect("append");

    let journal = Arc::new(Mutex::new(journal));
    let store = Arc::new(HybridStore::open(&db_dir).expect("open store"));
    let notify = Arc::new(Notify::new());

    // Keep a separate handle for listening: `notify` itself is moved into the
    // worker below, and the `Notified` future borrows the handle it came from.
    let listener_notify = Arc::clone(&notify);
    let notified = listener_notify.notified();
    tokio::pin!(notified);
    // Register the waiter now, without awaiting, so a notification sent during
    // `step()` cannot be missed.
    notified.as_mut().enable();

    // Ingest with notify attached
    let mut worker = IngestWorker::new(journal.clone(), store.clone())
        .await
        .expect("worker")
        .with_notify(notify);
    worker.step().await.expect("step");

    // The timeout only bounds the failure case; on success this resolves
    // as soon as the future is polled.
    let wait_result =
        tokio::time::timeout(std::time::Duration::from_secs(5), notified).await;
    assert!(wait_result.is_ok(), "notification should have been received");
}