Break monolith source files into focused modules: - stemedb-core/types.rs → types/ directory (assertion, source, gold_standard, etc.) - stemedb-storage: audit_store, quota_store, trust_rank_store, vector_index, vote_store → module directories - stemedb-ingest/worker.rs → worker/ with separate test modules - stemedb-query: engine, materializer, query → module directories - stemedb-lens: epoch_aware, skeptic → module directories - stemedb-sim/lib.rs → agent, arenas/, helpers, runner, strategy, types - stemedb-api/tests: integration_tests → http_basic, http_validation, http_epoch, http_pipeline - stemedb-api/tests: e2e_flow_test → e2e_full_pipeline, e2e_lens_resolution - stemedb-query/tests: e2e_pipeline → e2e_pipeline + e2e_decay Also adds new features: gold standard verification, escalation handlers, admin endpoints, concept hierarchy spec, arena roadmap, and Go SDK. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
416 lines
16 KiB
Rust
416 lines
16 KiB
Rust
//! E2E integration tests for the StemeDB pipeline.
//!
//! Validates the full data flow: WAL -> IngestWorker -> Materializer -> QueryEngine
//!
//! # Test Coverage
//!
//! | Test | Pipeline Stage | Validates |
//! |------|----------------|-----------|
//! | `test_e2e_write_materialize_read` | Full | Basic happy path |
//! | `test_e2e_vote_consensus` | Full + Votes | Vote-weighted resolution |
//! | `test_e2e_update_winner` | Full + Update | Winner changes on re-materialize |
//! | `test_e2e_cursor_persistence` | WAL + Ingest | Cursor survives worker restart |
//! | `test_e2e_notify_integration` | Ingest | Event-driven notification channel |
|
|
|
|
#![allow(clippy::expect_used)] // Test code uses expect() for clear failure messages
|
|
|
|
use ed25519_dalek::{Signer, SigningKey};
|
|
use rand::rngs::OsRng;
|
|
use std::sync::Arc;
|
|
use stemedb_core::serde::serialize;
|
|
use stemedb_core::testing::AssertionBuilder;
|
|
use stemedb_core::types::{Assertion, LifecycleStage, ObjectValue, SignatureEntry, Vote};
|
|
use stemedb_ingest::worker::{serialize_assertion, IngestWorker};
|
|
use stemedb_lens::{RecencyLens, SyncLensWrapper, VoteAwareConsensusLens};
|
|
use stemedb_query::{Materializer, Query, QueryEngine};
|
|
use stemedb_storage::{GenericVoteStore, KVStore, SledStore, VoteStore};
|
|
use stemedb_wal::Journal;
|
|
use tempfile::tempdir;
|
|
use tokio::sync::{Mutex, Notify};
|
|
|
|
// ============================================================================
|
|
// TEST HELPERS
|
|
// ============================================================================
|
|
|
|
/// Create a signed assertion with Ed25519 signature.
|
|
///
|
|
/// The signature signs the message `"{subject}:{predicate}"` which matches
|
|
/// IngestWorker's verification logic.
|
|
fn create_signed_assertion(
|
|
subject: &str,
|
|
predicate: &str,
|
|
value: f64,
|
|
timestamp: u64,
|
|
) -> Assertion {
|
|
let mut csprng = OsRng;
|
|
let signing_key = SigningKey::generate(&mut csprng);
|
|
let verifying_key = signing_key.verifying_key();
|
|
|
|
let message = format!("{}:{}", subject, predicate);
|
|
let signature = signing_key.sign(message.as_bytes());
|
|
|
|
AssertionBuilder::new()
|
|
.subject(subject)
|
|
.predicate(predicate)
|
|
.object_number(value)
|
|
.confidence(0.95)
|
|
.lifecycle(LifecycleStage::Proposed)
|
|
.timestamp(timestamp)
|
|
.signatures(vec![SignatureEntry {
|
|
agent_id: verifying_key.to_bytes(),
|
|
signature: signature.to_bytes(),
|
|
timestamp,
|
|
}])
|
|
.build()
|
|
}
|
|
|
|
/// Compute the content-addressed hash of an assertion.
|
|
///
|
|
/// Matches the hash computation in the ingestion pipeline.
|
|
fn compute_assertion_hash(assertion: &Assertion) -> [u8; 32] {
|
|
let bytes = serialize(assertion).expect("serialize assertion");
|
|
*blake3::hash(&bytes).as_bytes()
|
|
}
|
|
|
|
/// Create a test vote for an assertion.
|
|
fn create_vote(assertion_hash: [u8; 32], agent_idx: u8, weight: f32, timestamp: u64) -> Vote {
|
|
let mut agent_id = [0u8; 32];
|
|
agent_id[0] = agent_idx;
|
|
|
|
Vote {
|
|
assertion_hash,
|
|
agent_id,
|
|
weight,
|
|
signature: [0u8; 64],
|
|
timestamp,
|
|
source_url: None,
|
|
observed_context: None,
|
|
}
|
|
}
|
|
|
|
// ============================================================================
|
|
// E2E TESTS
|
|
// ============================================================================
|
|
|
|
/// Test 1: Basic pipeline - write, ingest, materialize, query.
|
|
///
|
|
/// Proves the fundamental flow works end-to-end:
|
|
/// 1. Write signed assertion to WAL
|
|
/// 2. IngestWorker reads from WAL and stores in KV
|
|
/// 3. Materializer creates MV:{subject}:{predicate}
|
|
/// 4. QueryEngine returns the assertion
|
|
#[tokio::test]
|
|
async fn test_e2e_write_materialize_read() {
|
|
let dir = tempdir().expect("create temp dir");
|
|
let wal_dir = dir.path().join("wal");
|
|
let db_dir = dir.path().join("db");
|
|
|
|
// === Step 1: Write assertion to WAL ===
|
|
let assertion = create_signed_assertion("Tesla_Inc", "has_revenue", 96.7, 1000);
|
|
|
|
let mut journal = Journal::open(&wal_dir).expect("open journal");
|
|
let payload = serialize_assertion(&assertion).expect("serialize");
|
|
journal.append(payload).expect("append to WAL");
|
|
|
|
// === Step 2: Run IngestWorker to process WAL ===
|
|
let journal = Arc::new(Mutex::new(journal));
|
|
let store = Arc::new(SledStore::open(&db_dir).expect("open store"));
|
|
let notify = Arc::new(Notify::new());
|
|
|
|
let mut worker = IngestWorker::new(journal.clone(), store.clone())
|
|
.await
|
|
.expect("create worker")
|
|
.with_notify(notify.clone());
|
|
|
|
let bytes_processed = worker.step().await.expect("ingest step");
|
|
assert!(bytes_processed > 0, "should have processed data from WAL");
|
|
|
|
// Verify assertion stored at H:{hash}
|
|
let assertion_hash = compute_assertion_hash(&assertion);
|
|
let h_key = format!("H:{}", hex::encode(assertion_hash)).into_bytes();
|
|
let stored = store.get(&h_key).await.expect("get assertion");
|
|
assert!(stored.is_some(), "assertion should be stored at H: key");
|
|
|
|
// Verify compound index SP:{subject}:{predicate} created
|
|
let sp_key = b"SP:Tesla_Inc:has_revenue";
|
|
let sp_entries = store.scan_prefix(sp_key).await.expect("scan SP: prefix");
|
|
assert_eq!(sp_entries.len(), 1, "should have one SP: index entry");
|
|
|
|
// === Step 3: Run Materializer ===
|
|
let vote_store = Arc::new(GenericVoteStore::new(store.clone()));
|
|
let lens = VoteAwareConsensusLens::new(vote_store);
|
|
let materializer = Materializer::new(store.clone(), Box::new(lens));
|
|
|
|
let report = materializer.step().await.expect("materialize step");
|
|
assert_eq!(report.pairs_scanned, 1, "should scan one subject+predicate pair");
|
|
assert_eq!(report.views_updated, 1, "should update one materialized view");
|
|
|
|
// Verify MV:{subject}:{predicate} written
|
|
let mv_key = b"MV:Tesla_Inc:has_revenue";
|
|
let mv_data = store.get(mv_key).await.expect("get MV");
|
|
assert!(mv_data.is_some(), "materialized view should exist");
|
|
|
|
// === Step 4: Query via QueryEngine ===
|
|
let engine = QueryEngine::new(store.clone());
|
|
let query = Query::builder().subject("Tesla_Inc").predicate("has_revenue").build();
|
|
|
|
let result = engine.execute(&query).await.expect("execute query");
|
|
assert_eq!(result.assertions.len(), 1, "query should return one assertion");
|
|
assert_eq!(result.assertions[0].subject, "Tesla_Inc");
|
|
assert_eq!(result.assertions[0].predicate, "has_revenue");
|
|
assert_eq!(result.assertions[0].object, ObjectValue::Number(96.7));
|
|
}
|
|
|
|
/// Test 2: Vote-weighted consensus.
|
|
///
|
|
/// Proves that VoteAwareConsensusLens selects the assertion with highest
|
|
/// aggregate vote weight:
|
|
/// 1. Write two assertions with same subject+predicate but different values
|
|
/// 2. Add votes favoring assertion_a (3 votes, weight 0.9 each)
|
|
/// 3. Add 1 vote for assertion_b (weight 0.2)
|
|
/// 4. Materializer picks assertion_a as winner
|
|
#[tokio::test]
|
|
async fn test_e2e_vote_consensus() {
|
|
let dir = tempdir().expect("create temp dir");
|
|
let wal_dir = dir.path().join("wal");
|
|
let db_dir = dir.path().join("db");
|
|
|
|
// Create two competing assertions
|
|
let assertion_a = create_signed_assertion("Semaglutide", "muscle_effect", -5.0, 1000);
|
|
let assertion_b = create_signed_assertion("Semaglutide", "muscle_effect", -15.0, 1100);
|
|
|
|
// Write both to WAL
|
|
let mut journal = Journal::open(&wal_dir).expect("open journal");
|
|
journal.append(serialize_assertion(&assertion_a).expect("ser")).expect("append");
|
|
journal.append(serialize_assertion(&assertion_b).expect("ser")).expect("append");
|
|
|
|
// Ingest both
|
|
let journal = Arc::new(Mutex::new(journal));
|
|
let store = Arc::new(SledStore::open(&db_dir).expect("open store"));
|
|
|
|
let mut worker =
|
|
IngestWorker::new(journal.clone(), store.clone()).await.expect("create worker");
|
|
|
|
let bytes1 = worker.step().await.expect("step 1");
|
|
assert!(bytes1 > 0, "should ingest first assertion");
|
|
|
|
let bytes2 = worker.step().await.expect("step 2");
|
|
assert!(bytes2 > 0, "should ingest second assertion");
|
|
|
|
// Verify both are stored
|
|
let h_entries = store.scan_prefix(b"H:").await.expect("scan H:");
|
|
assert_eq!(h_entries.len(), 2, "should have two assertions");
|
|
|
|
// Add votes via VoteStore
|
|
let vote_store = Arc::new(GenericVoteStore::new(store.clone()));
|
|
|
|
let hash_a = compute_assertion_hash(&assertion_a);
|
|
let hash_b = compute_assertion_hash(&assertion_b);
|
|
|
|
// assertion_a gets 3 votes (total weight = 2.7)
|
|
for i in 0..3 {
|
|
let vote = create_vote(hash_a, i, 0.9, 2000 + i as u64);
|
|
vote_store.put_vote(&vote).await.expect("put vote for a");
|
|
}
|
|
|
|
// assertion_b gets 1 vote (total weight = 0.2)
|
|
let vote_b = create_vote(hash_b, 10, 0.2, 2100);
|
|
vote_store.put_vote(&vote_b).await.expect("put vote for b");
|
|
|
|
// Materialize with VoteAwareConsensusLens
|
|
let lens = VoteAwareConsensusLens::new(Arc::clone(&vote_store));
|
|
let materializer = Materializer::new(store.clone(), Box::new(lens));
|
|
|
|
let report = materializer.step().await.expect("materialize");
|
|
assert_eq!(report.views_updated, 1);
|
|
|
|
// Query should return assertion_a (higher aggregate weight)
|
|
let engine = QueryEngine::new(store.clone());
|
|
let query = Query::builder().subject("Semaglutide").predicate("muscle_effect").build();
|
|
|
|
let result = engine.execute(&query).await.expect("query");
|
|
assert_eq!(result.assertions.len(), 1);
|
|
assert_eq!(
|
|
result.assertions[0].object,
|
|
ObjectValue::Number(-5.0),
|
|
"should return assertion_a (higher vote weight)"
|
|
);
|
|
}
|
|
|
|
/// Test 3: Winner changes after new assertion.
|
|
///
|
|
/// Proves that re-materialization updates the winner when new data arrives:
|
|
/// 1. Write assertion_v1 (timestamp=1000), ingest, materialize
|
|
/// 2. Query returns v1
|
|
/// 3. Write assertion_v2 (timestamp=2000, same subject+predicate)
|
|
/// 4. Re-ingest and re-materialize with RecencyLens
|
|
/// 5. Query returns v2 (newer timestamp wins)
|
|
#[tokio::test]
|
|
async fn test_e2e_update_winner() {
|
|
let dir = tempdir().expect("create temp dir");
|
|
let wal_dir = dir.path().join("wal");
|
|
let db_dir = dir.path().join("db");
|
|
|
|
// === Phase 1: Initial assertion ===
|
|
let assertion_v1 = create_signed_assertion("Apple_Inc", "stock_price", 150.0, 1000);
|
|
|
|
let mut journal = Journal::open(&wal_dir).expect("open journal");
|
|
journal.append(serialize_assertion(&assertion_v1).expect("ser")).expect("append v1");
|
|
|
|
let journal = Arc::new(Mutex::new(journal));
|
|
let store = Arc::new(SledStore::open(&db_dir).expect("open store"));
|
|
|
|
// Ingest v1
|
|
let mut worker = IngestWorker::new(journal.clone(), store.clone()).await.expect("worker");
|
|
let bytes = worker.step().await.expect("step v1");
|
|
assert!(bytes > 0);
|
|
|
|
// Materialize with RecencyLens (wrapped for AsyncLens trait)
|
|
let lens = SyncLensWrapper(RecencyLens);
|
|
let materializer = Materializer::new(store.clone(), Box::new(lens));
|
|
materializer.step().await.expect("materialize v1");
|
|
|
|
// Query returns v1
|
|
let engine = QueryEngine::new(store.clone());
|
|
let query = Query::builder().subject("Apple_Inc").predicate("stock_price").build();
|
|
|
|
let result = engine.execute(&query).await.expect("query v1");
|
|
assert_eq!(result.assertions.len(), 1);
|
|
assert_eq!(result.assertions[0].object, ObjectValue::Number(150.0));
|
|
assert_eq!(result.assertions[0].timestamp, 1000);
|
|
|
|
// === Phase 2: New assertion with higher timestamp ===
|
|
let assertion_v2 = create_signed_assertion("Apple_Inc", "stock_price", 175.0, 2000);
|
|
|
|
// Write to WAL
|
|
{
|
|
let mut journal = journal.lock().await;
|
|
journal.append(serialize_assertion(&assertion_v2).expect("ser")).expect("append v2");
|
|
}
|
|
|
|
// Re-ingest (worker resumes from cursor)
|
|
let mut worker2 = IngestWorker::new(journal.clone(), store.clone()).await.expect("worker2");
|
|
let bytes2 = worker2.step().await.expect("step v2");
|
|
assert!(bytes2 > 0, "should process new assertion");
|
|
|
|
// Verify both assertions are now stored
|
|
let h_entries = store.scan_prefix(b"H:").await.expect("scan");
|
|
assert_eq!(h_entries.len(), 2, "should have two assertions");
|
|
|
|
// Re-materialize
|
|
let lens2 = SyncLensWrapper(RecencyLens);
|
|
let materializer2 = Materializer::new(store.clone(), Box::new(lens2));
|
|
materializer2.step().await.expect("materialize v2");
|
|
|
|
// Query returns v2 (newer timestamp wins with RecencyLens)
|
|
let result2 = engine.execute(&query).await.expect("query v2");
|
|
assert_eq!(result2.assertions.len(), 1);
|
|
assert_eq!(
|
|
result2.assertions[0].object,
|
|
ObjectValue::Number(175.0),
|
|
"should return v2 (more recent)"
|
|
);
|
|
assert_eq!(result2.assertions[0].timestamp, 2000);
|
|
}
|
|
|
|
/// Test: Cursor persistence across worker restarts.
|
|
///
|
|
/// Validates that the IngestWorker correctly resumes from the last cursor,
|
|
/// avoiding replay of already-processed records.
|
|
#[tokio::test]
|
|
async fn test_e2e_cursor_persistence() {
|
|
let dir = tempdir().expect("create temp dir");
|
|
let wal_dir = dir.path().join("wal");
|
|
let db_dir = dir.path().join("db");
|
|
|
|
// Write 3 assertions to WAL
|
|
let a1 = create_signed_assertion("Entity_A", "prop", 1.0, 1000);
|
|
let a2 = create_signed_assertion("Entity_B", "prop", 2.0, 2000);
|
|
let a3 = create_signed_assertion("Entity_C", "prop", 3.0, 3000);
|
|
|
|
let mut journal = Journal::open(&wal_dir).expect("open journal");
|
|
journal.append(serialize_assertion(&a1).expect("ser")).expect("append");
|
|
journal.append(serialize_assertion(&a2).expect("ser")).expect("append");
|
|
journal.append(serialize_assertion(&a3).expect("ser")).expect("append");
|
|
|
|
let journal = Arc::new(Mutex::new(journal));
|
|
let store = Arc::new(SledStore::open(&db_dir).expect("open store"));
|
|
|
|
// Worker 1: Process first 2 assertions
|
|
let mut worker1 = IngestWorker::new(journal.clone(), store.clone()).await.expect("worker1");
|
|
worker1.step().await.expect("step 1");
|
|
worker1.step().await.expect("step 2");
|
|
|
|
// Verify 2 assertions stored
|
|
let h_entries = store.scan_prefix(b"H:").await.expect("scan");
|
|
assert_eq!(h_entries.len(), 2, "worker1 should have processed 2 assertions");
|
|
|
|
// Drop worker1, simulate restart
|
|
drop(worker1);
|
|
|
|
// Worker 2: Should resume from cursor and only process the third assertion
|
|
let mut worker2 = IngestWorker::new(journal.clone(), store.clone()).await.expect("worker2");
|
|
|
|
let mut steps = 0;
|
|
while worker2.step().await.expect("step") > 0 {
|
|
steps += 1;
|
|
}
|
|
assert_eq!(steps, 1, "worker2 should only process 1 new assertion");
|
|
|
|
// Verify all 3 assertions now stored
|
|
let h_entries = store.scan_prefix(b"H:").await.expect("scan");
|
|
assert_eq!(h_entries.len(), 3, "should have all 3 assertions");
|
|
}
|
|
|
|
/// Test: Event-driven materialization via Notify.
|
|
///
|
|
/// Validates that the IngestWorker signals downstream consumers (Materializer)
|
|
/// when new data is available.
|
|
#[tokio::test]
|
|
async fn test_e2e_notify_integration() {
|
|
let dir = tempdir().expect("create temp dir");
|
|
let wal_dir = dir.path().join("wal");
|
|
let db_dir = dir.path().join("db");
|
|
|
|
let assertion = create_signed_assertion("Tesla_Inc", "deliveries", 500000.0, 1000);
|
|
|
|
let mut journal = Journal::open(&wal_dir).expect("open journal");
|
|
journal.append(serialize_assertion(&assertion).expect("ser")).expect("append");
|
|
|
|
let journal = Arc::new(Mutex::new(journal));
|
|
let store = Arc::new(SledStore::open(&db_dir).expect("open store"));
|
|
let notify = Arc::new(Notify::new());
|
|
|
|
// Track if notification was received
|
|
let notify_clone = Arc::clone(¬ify);
|
|
let notification_received = Arc::new(std::sync::atomic::AtomicBool::new(false));
|
|
let notification_received_clone = Arc::clone(¬ification_received);
|
|
|
|
// Spawn listener that waits for notification
|
|
let listener = tokio::spawn(async move {
|
|
tokio::time::timeout(std::time::Duration::from_secs(5), notify_clone.notified())
|
|
.await
|
|
.expect("timeout waiting for notification");
|
|
notification_received_clone.store(true, std::sync::atomic::Ordering::SeqCst);
|
|
});
|
|
|
|
// Give listener time to start
|
|
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
|
|
|
// Ingest with notify attached
|
|
let mut worker = IngestWorker::new(journal.clone(), store.clone())
|
|
.await
|
|
.expect("worker")
|
|
.with_notify(notify);
|
|
|
|
worker.step().await.expect("step");
|
|
|
|
// Wait for listener to complete
|
|
listener.await.expect("listener task");
|
|
|
|
assert!(
|
|
notification_received.load(std::sync::atomic::Ordering::SeqCst),
|
|
"notification should have been received"
|
|
);
|
|
}
|