stemedb/crates/stemedb-api/tests/e2e_full_pipeline.rs
jordan 3320c24afa feat: WAL hardening (Phase 5B) - CRC32C, crash recovery, group commit, log rotation
Add CRC32C checksums to WAL record format (v2), implement crash recovery
with automatic truncation of corrupt records, add feature-gated group commit
buffer for batched fsync under concurrent load, and implement log rotation
via segment files with global offset addressing.

Key changes:
- Record format v2: [len:u32][crc32c:u32][blake3:32][payload:N]
- recover_file() scans and truncates corrupt tail records
- GroupCommitBuffer batches fsync via MPSC channel (tokio feature gate)
- SegmentManager with binary search resolution and cursor-based cleanup
- Journal::read() auto-refreshes segments on miss for writer/reader split
- Split recovery.rs and key_codec.rs into directory modules to stay within the 500-line file limit

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-02 12:36:35 -07:00

327 lines
12 KiB
Rust

//! End-to-end integration test for the complete write → materialize → query flow.
//!
//! This test proves the complete StemeDB pipeline works:
//! 1. POST /v1/assert - Write a signed assertion to the WAL
//! 2. IngestWorker - Process the WAL and write to KV store + indexes
//! 3. Materializer - Resolve conflicts and write to MV:{subject}:{predicate}
//! 4. GET /v1/query - Read the materialized view through a lens
//!
//! The test uses Ed25519 signatures, tokio::sync::Notify for event-driven
//! materialization, and axum's test utilities for HTTP requests.
#![allow(clippy::expect_used)]
mod common;
use axum::{
body::Body,
http::{Request, StatusCode},
};
use ed25519_dalek::{Signer, SigningKey};
use rand::rngs::OsRng;
use serde_json::json;
use std::sync::Arc;
use tokio::sync::{Mutex, Notify};
use tower::ServiceExt;
use stemedb_api::create_router;
use stemedb_ingest::worker::IngestWorker;
use stemedb_lens::VoteAwareConsensusLens;
use stemedb_query::Materializer;
use stemedb_storage::{GenericVoteStore, HybridStore};
use stemedb_wal::Journal;
// Test configuration constants. Every `*_MS` value is a duration in
// milliseconds; the plain counts are loop budgets for background tasks.

/// Iteration budget for the background ingest loop.
const INGEST_ITERATIONS: usize = 10;
/// Pause after an ingest step that reported no new data.
const INGEST_SLEEP_MS: u64 = 10;
/// Number of notification waits performed by the background materializer loop.
const MATERIALIZER_ITERATIONS: usize = 5;
/// Upper bound on each materializer wait for an ingest notification.
const MATERIALIZER_TIMEOUT_MS: u64 = 200;
/// Grace period given to the background workers during teardown.
const WORKER_SHUTDOWN_MS: u64 = 50;
/// Upper bound on how long the test polls `/v1/query` for the new assertion.
const POLLING_TIMEOUT_MS: u64 = 500;
/// Delay between consecutive `/v1/query` polls.
const POLLING_INTERVAL_MS: u64 = 50;
/// Test environment that keeps temp directories alive.
///
/// Field order is significant: Rust drops struct fields in declaration order.
/// The temp directory is therefore declared LAST, so it is deleted only after
/// this struct's app state, store and journal handles have been dropped. Those
/// handles are opened on the `db/` and `wal/` subdirectories inside it.
struct TestEnvironment {
    /// API state built from a write journal, a read journal and the shared store.
    state: stemedb_api::AppState,
    /// Shared key-value store rooted at `<temp>/db`.
    store: Arc<HybridStore>,
    /// Journal handle (on `<temp>/wal`) intended for the ingest worker.
    journal: Arc<Mutex<Journal>>,
    /// Owns the temporary root; dropping it removes the directory tree.
    _temp_dir: tempfile::TempDir,
}
/// Helper to create a test environment with temporary directories.
///
/// Creates `wal/` and `db/` under a fresh temp root. It opens a [`HybridStore`]
/// on `db/` and three independent [`Journal`] handles on the same `wal/`:
/// one wrapped in `Arc<Mutex<_>>` for the ingest worker, plus a write handle
/// and a read handle passed to `AppState::new`.
async fn create_test_environment() -> TestEnvironment {
    let root = tempfile::tempdir().expect("Failed to create temp dir");
    let wal_path = root.path().join("wal");
    let db_path = root.path().join("db");

    std::fs::create_dir_all(&wal_path).expect("Failed to create WAL dir");
    std::fs::create_dir_all(&db_path).expect("Failed to create DB dir");

    let store = Arc::new(HybridStore::open(&db_path).expect("Failed to open store"));

    // Journals are opened in a fixed order: the ingest handle first, then
    // AppState's writer and reader. Call arguments are evaluated left to right,
    // so the write journal is opened before the read journal.
    let ingest_journal = Journal::open(&wal_path).expect("Failed to open journal for ingest");
    let journal = Arc::new(Mutex::new(ingest_journal));
    let state = stemedb_api::AppState::new(
        Journal::open(&wal_path).expect("Failed to open write journal"),
        Journal::open(&wal_path).expect("Failed to open read journal"),
        Arc::clone(&store),
    );

    TestEnvironment { state, store, journal, _temp_dir: root }
}
/// Sign `message` with a freshly generated Ed25519 key.
///
/// Returns `(verifying_key_bytes, signature_bytes)`: the 32-byte public key
/// comes FIRST, then the 64-byte signature over `message.as_bytes()`.
/// Each call generates a new random key from the OS CSPRNG, so separate
/// calls produce different public keys (agent ids).
fn sign_message(message: &str) -> ([u8; 32], [u8; 64]) {
    let signing_key = SigningKey::generate(&mut OsRng);
    let signature = signing_key.sign(message.as_bytes());
    (signing_key.verifying_key().to_bytes(), signature.to_bytes())
}
/// End-to-end check of the write → ingest → materialize → query pipeline.
///
/// 1. Spawn an `IngestWorker` and an event-driven `Materializer` sharing a `Notify`.
/// 2. POST a signed assertion to `/v1/assert` and expect `201 Created`.
/// 3. Poll `GET /v1/query` until the assertion is visible (bounded by `POLLING_TIMEOUT_MS`).
/// 4. Query through the `Consensus` lens and verify every returned field.
/// 5. Abort the workers, re-raising any panic they hit.
#[tokio::test]
async fn test_e2e_write_materialize_query_flow() {
    // ========================================================================
    // Phase 1: Setup - Create environment and spawn background workers
    // ========================================================================
    let env = create_test_environment().await;
    let state = env.state.clone();
    let store = Arc::clone(&env.store);
    let journal = Arc::clone(&env.journal);
    // Create a notification channel for event-driven materialization
    let notify = Arc::new(Notify::new());
    // Spawn IngestWorker in the background with notification
    let ingest_notify = Arc::clone(&notify);
    let mut ingest_worker = IngestWorker::new(Arc::clone(&journal), Arc::clone(&store))
        .await
        .expect("Failed to create IngestWorker")
        .with_notify(ingest_notify);
    let mut ingest_handle = tokio::spawn(async move {
        // The assertion is written only AFTER this task is spawned. A fixed
        // iteration budget (INGEST_ITERATIONS * INGEST_SLEEP_MS ≈ 100ms) could
        // therefore run out before the data reaches the WAL. So keep polling until
        // something is ingested, then allow up to INGEST_ITERATIONS consecutive
        // idle polls before stopping. Cleanup aborts the task either way.
        let mut ingested_any = false;
        let mut idle_polls = 0;
        while idle_polls < INGEST_ITERATIONS {
            match ingest_worker.step().await {
                Ok(0) => {
                    // No data, sleep briefly
                    if ingested_any {
                        idle_polls += 1;
                    }
                    tokio::time::sleep(tokio::time::Duration::from_millis(INGEST_SLEEP_MS)).await;
                }
                Ok(_) => {
                    // Processed data, continue immediately
                    ingested_any = true;
                    idle_polls = 0;
                }
                Err(e) => {
                    tracing::error!("Ingestion error: {:?}", e);
                    // Tests usually have no tracing subscriber, so also return
                    // the error (as text, which is always `Send`). A poll timeout
                    // can then report why ingestion stopped.
                    return Err(format!("{:?}", e));
                }
            }
        }
        Ok(())
    });
    // Spawn Materializer in the background (event-driven mode)
    let materializer_notify = Arc::clone(&notify);
    let vote_store = Arc::new(GenericVoteStore::new(Arc::clone(&store)));
    let lens = VoteAwareConsensusLens::new(vote_store);
    let materializer = Arc::new(Materializer::new(Arc::clone(&store), Box::new(lens)));
    let materializer_clone = Arc::clone(&materializer);
    let materializer_handle = tokio::spawn(async move {
        // Run event-driven materialization with a short timeout per wait.
        // NOTE(review): a fresh `notified()` future is created for each wait. If
        // the ingest worker signals with `notify_waiters`, a signal sent while
        // this task is inside `step()` is lost. `notify_one` would store a permit
        // instead. Confirm which one `with_notify` uses.
        for _ in 0..MATERIALIZER_ITERATIONS {
            let was_notified = tokio::time::timeout(
                tokio::time::Duration::from_millis(MATERIALIZER_TIMEOUT_MS),
                materializer_notify.notified(),
            )
            .await
            .is_ok();
            if was_notified {
                if let Err(e) = materializer_clone.step().await {
                    tracing::error!("Materialization error: {:?}", e);
                }
            }
        }
    });
    // ========================================================================
    // Phase 2: Write - Create a properly signed assertion via API
    // ========================================================================
    let subject = "Tesla_Inc";
    let predicate = "has_revenue";
    let message = format!("{}:{}", subject, predicate);
    let (agent_id, signature) = sign_message(&message);
    let timestamp = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .expect("Time error")
        .as_secs();
    let assertion_request = json!({
        "subject": subject,
        "predicate": predicate,
        "object": {"type": "Number", "value": 96.7},
        "confidence": 0.95,
        "signatures": [{
            "agent_id": hex::encode(agent_id),
            "signature": hex::encode(signature),
            "timestamp": timestamp
        }],
        "source_hash": hex::encode([3u8; 32])
    });
    let app = create_router(state.clone());
    let create_request = Request::builder()
        .uri("/v1/assert")
        .method("POST")
        .header("content-type", "application/json")
        .body(Body::from(serde_json::to_vec(&assertion_request).expect("JSON serialization")))
        .expect("Failed to build request");
    let create_response = app.clone().oneshot(create_request).await.expect("Request failed");
    let create_status = create_response.status();
    // Read the body before asserting, so a failure message can include the
    // server's error text.
    let create_body = axum::body::to_bytes(create_response.into_body(), usize::MAX)
        .await
        .expect("Failed to read response body");
    assert_eq!(
        create_status,
        StatusCode::CREATED,
        "Assertion creation failed with status {}: {}",
        create_status,
        String::from_utf8_lossy(&create_body)
    );
    let create_json: serde_json::Value =
        serde_json::from_slice(&create_body).expect("Failed to parse JSON");
    let assertion_hash = create_json["hash"].as_str().expect("Missing hash field").to_string();
    assert_eq!(create_json["status"], "created");
    assert!(!assertion_hash.is_empty(), "Hash should not be empty");
    // ========================================================================
    // Phase 3: Wait - Allow ingestion and materialization to complete
    // ========================================================================
    // Poll until the assertion is visible instead of sleeping blindly; this
    // fails fast with diagnostics if the pipeline stalls.
    let start = tokio::time::Instant::now();
    let timeout = tokio::time::Duration::from_millis(POLLING_TIMEOUT_MS);
    let poll_interval = tokio::time::Duration::from_millis(POLLING_INTERVAL_MS);
    loop {
        // Try to query the assertion
        let test_query = Request::builder()
            .uri(format!("/v1/query?subject={}&predicate={}", subject, predicate))
            .method("GET")
            .body(Body::empty())
            .expect("Failed to build test query");
        let test_response = app.clone().oneshot(test_query).await.expect("Query failed");
        let test_status = test_response.status();
        let test_body = axum::body::to_bytes(test_response.into_body(), usize::MAX)
            .await
            .expect("Failed to read body");
        let test_json: serde_json::Value =
            serde_json::from_slice(&test_body).expect("Failed to parse JSON");
        if test_json["total_count"].as_u64().unwrap_or(0) > 0 {
            // Assertion is available, proceed
            break;
        }
        if start.elapsed() > timeout {
            // Report whether ingestion stopped (and why) next to the last response.
            // This timeout path panics, so the handle is never awaited a second time.
            let ingest_state = if ingest_handle.is_finished() {
                match (&mut ingest_handle).await {
                    Ok(Ok(())) => "ingest worker exited cleanly".to_string(),
                    Ok(Err(e)) => format!("ingest worker failed: {}", e),
                    Err(e) => format!("ingest worker task did not complete: {}", e),
                }
            } else {
                "ingest worker still running".to_string()
            };
            panic!(
                "Timeout waiting for assertion to be ingested and materialized ({}); \
                 last query status {}: {}",
                ingest_state, test_status, test_json
            );
        }
        tokio::time::sleep(poll_interval).await;
    }
    // ========================================================================
    // Phase 4: Query - Read the assertion via lens-based query
    // ========================================================================
    let query_request = Request::builder()
        .uri(format!("/v1/query?subject={}&predicate={}&lens=Consensus", subject, predicate))
        .method("GET")
        .body(Body::empty())
        .expect("Failed to build query request");
    let query_response = app.oneshot(query_request).await.expect("Query request failed");
    assert_eq!(query_response.status(), StatusCode::OK, "Query should succeed");
    let query_body = axum::body::to_bytes(query_response.into_body(), usize::MAX)
        .await
        .expect("Failed to read query response body");
    let query_json: serde_json::Value =
        serde_json::from_slice(&query_body).expect("Failed to parse query JSON");
    // ========================================================================
    // Phase 5: Verify - Assertion matches what we created
    // ========================================================================
    let assertions = query_json["assertions"].as_array().expect("Missing assertions array");
    assert_eq!(assertions.len(), 1, "Should return exactly one assertion after lens resolution");
    let returned_assertion = &assertions[0];
    // Verify the assertion fields
    assert_eq!(
        returned_assertion["hash"].as_str().expect("Missing hash"),
        assertion_hash,
        "Returned assertion hash should match created hash"
    );
    assert_eq!(returned_assertion["subject"].as_str().expect("Missing subject"), subject);
    assert_eq!(returned_assertion["predicate"].as_str().expect("Missing predicate"), predicate);
    assert_eq!(
        returned_assertion["object"]["type"].as_str().expect("Missing object type"),
        "Number"
    );
    assert!(
        (returned_assertion["object"]["value"].as_f64().expect("Missing object value") - 96.7)
            .abs()
            < 0.01
    );
    assert!(
        (returned_assertion["confidence"].as_f64().expect("Missing confidence") - 0.95).abs()
            < 0.01
    );
    // Verify signature round-trip
    let signatures = returned_assertion["signatures"].as_array().expect("Missing signatures array");
    assert_eq!(signatures.len(), 1, "Should have one signature");
    assert_eq!(
        signatures[0]["agent_id"].as_str().expect("Missing agent_id"),
        hex::encode(agent_id)
    );
    assert_eq!(
        signatures[0]["signature"].as_str().expect("Missing signature"),
        hex::encode(signature)
    );
    // Verify metadata
    assert_eq!(query_json["total_count"], 1);
    assert_eq!(query_json["has_more"], false);
    // ========================================================================
    // Cleanup - Abort background workers and surface any panic they hit
    // ========================================================================
    ingest_handle.abort();
    materializer_handle.abort();
    // Await each aborted task, bounded by the shutdown grace period, instead of
    // sleeping a fixed time. Cancellation is expected and ignored; a panic inside
    // a worker is a real bug and is re-raised here.
    let shutdown_grace = tokio::time::Duration::from_millis(WORKER_SHUTDOWN_MS);
    if let Some(join_err) = tokio::time::timeout(shutdown_grace, ingest_handle)
        .await
        .ok()
        .and_then(Result::err)
        .filter(tokio::task::JoinError::is_panic)
    {
        std::panic::resume_unwind(join_err.into_panic());
    }
    if let Some(join_err) = tokio::time::timeout(shutdown_grace, materializer_handle)
        .await
        .ok()
        .and_then(Result::err)
        .filter(tokio::task::JoinError::is_panic)
    {
        std::panic::resume_unwind(join_err.into_panic());
    }
}