//! End-to-end integration test for the write → materialize → query flow. //! //! This test proves the complete StemeDB pipeline works: //! 1. POST /v1/assert - Write a signed assertion to the WAL //! 2. IngestWorker - Process the WAL and write to KV store + indexes //! 3. Materializer - Resolve conflicts and write to MV:{subject}:{predicate} //! 4. GET /v1/query - Read the materialized view through a lens //! //! The test uses Ed25519 signatures, tokio::sync::Notify for event-driven //! materialization, and axum's test utilities for HTTP requests. #![allow(clippy::expect_used)] use axum::{ body::Body, http::{Request, StatusCode}, }; use ed25519_dalek::{Signer, SigningKey}; use rand::rngs::OsRng; use serde_json::json; use std::sync::Arc; use tokio::sync::{Mutex, Notify}; use tower::ServiceExt; use stemedb_api::{create_router, AppState}; use stemedb_ingest::worker::IngestWorker; use stemedb_lens::VoteAwareConsensusLens; use stemedb_query::Materializer; use stemedb_storage::{GenericVoteStore, SledStore}; use stemedb_wal::Journal; // Test configuration constants const INGEST_ITERATIONS: usize = 10; const INGEST_SLEEP_MS: u64 = 10; const MATERIALIZER_ITERATIONS: usize = 5; const MATERIALIZER_TIMEOUT_MS: u64 = 200; const WORKER_SHUTDOWN_MS: u64 = 50; const POLLING_TIMEOUT_MS: u64 = 500; const POLLING_INTERVAL_MS: u64 = 50; /// Test environment that keeps temp directories alive. struct TestEnvironment { _temp_dir: tempfile::TempDir, state: AppState, store: Arc, journal: Arc>, } /// Helper to create a test environment with temporary directories. async fn create_test_environment() -> TestEnvironment { let temp_dir = tempfile::tempdir().expect("Failed to create temp dir"); let wal_dir = temp_dir.path().join("wal"); let db_dir = temp_dir.path().join("db"); std::fs::create_dir_all(&wal_dir).expect("Failed to create WAL dir"); std::fs::create_dir_all(&db_dir).expect("Failed to create DB dir"); let journal = Journal::open(&wal_dir).expect("Failed to open journal"); let store = SledStore::open(&db_dir).expect("Failed to open store"); let journal_arc = Arc::new(Mutex::new(journal)); let store_arc = Arc::new(store); // Open a second journal handle for AppState (WAL supports multiple readers) let journal_for_state = Journal::open(&wal_dir).expect("Failed to open second journal handle"); let state = AppState::new(journal_for_state, (*store_arc).clone()); TestEnvironment { _temp_dir: temp_dir, state, store: store_arc, journal: journal_arc } } /// Sign a message using Ed25519 and return the signature + public key. fn sign_message(message: &str) -> ([u8; 32], [u8; 64]) { let mut csprng = OsRng; let signing_key = SigningKey::generate(&mut csprng); let verifying_key = signing_key.verifying_key(); let signature = signing_key.sign(message.as_bytes()); (verifying_key.to_bytes(), signature.to_bytes()) } #[tokio::test] async fn test_e2e_write_materialize_query_flow() { // ======================================================================== // Phase 1: Setup - Create environment and spawn background workers // ======================================================================== let env = create_test_environment().await; let state = env.state.clone(); let store = Arc::clone(&env.store); let journal = Arc::clone(&env.journal); // Create a notification channel for event-driven materialization let notify = Arc::new(Notify::new()); // Spawn IngestWorker in the background with notification let ingest_notify = Arc::clone(¬ify); let mut ingest_worker = IngestWorker::new(Arc::clone(&journal), Arc::clone(&store)) .await .expect("Failed to create IngestWorker") .with_notify(ingest_notify); let ingest_handle = tokio::spawn(async move { // Run a few ingestion steps (not infinite loop for tests) for _ in 0..INGEST_ITERATIONS { match ingest_worker.step().await { Ok(0) => { // No data, sleep briefly tokio::time::sleep(tokio::time::Duration::from_millis(INGEST_SLEEP_MS)).await; } Ok(_) => { // Processed data, continue immediately } Err(e) => { tracing::error!("Ingestion error: {:?}", e); break; } } } }); // Spawn Materializer in the background (event-driven mode) let materializer_notify = Arc::clone(¬ify); let vote_store = Arc::new(GenericVoteStore::new(Arc::clone(&store))); let lens = VoteAwareConsensusLens::new(vote_store); let materializer = Arc::new(Materializer::new(Arc::clone(&store), Box::new(lens))); let materializer_clone = Arc::clone(&materializer); let materializer_handle = tokio::spawn(async move { // Run event-driven materialization with a short timeout // We'll only run a few passes for the test for _ in 0..MATERIALIZER_ITERATIONS { let was_notified = tokio::time::timeout( tokio::time::Duration::from_millis(MATERIALIZER_TIMEOUT_MS), materializer_notify.notified(), ) .await .is_ok(); if was_notified { if let Err(e) = materializer_clone.step().await { tracing::error!("Materialization error: {:?}", e); } } } }); // ======================================================================== // Phase 2: Write - Create a properly signed assertion via API // ======================================================================== let subject = "Tesla_Inc"; let predicate = "has_revenue"; let message = format!("{}:{}", subject, predicate); let (agent_id, signature) = sign_message(&message); let timestamp = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .expect("Time error") .as_secs(); let assertion_request = json!({ "subject": subject, "predicate": predicate, "object": {"type": "Number", "value": 96.7}, "confidence": 0.95, "signatures": [{ "agent_id": hex::encode(agent_id), "signature": hex::encode(signature), "timestamp": timestamp }], "source_hash": hex::encode([3u8; 32]) }); let app = create_router(state.clone()); let create_request = Request::builder() .uri("/v1/assert") .method("POST") .header("content-type", "application/json") .body(Body::from(serde_json::to_vec(&assertion_request).expect("JSON serialization"))) .expect("Failed to build request"); let create_response = app.clone().oneshot(create_request).await.expect("Request failed"); let status = create_response.status(); // Debug: Print response if not successful if status != StatusCode::CREATED { let body_bytes = axum::body::to_bytes(create_response.into_body(), usize::MAX) .await .expect("Failed to read error body"); let error_text = String::from_utf8_lossy(&body_bytes); panic!("Assertion creation failed with status {}: {}", status, error_text); } assert_eq!(status, StatusCode::CREATED, "Assertion creation should succeed"); let create_body = axum::body::to_bytes(create_response.into_body(), usize::MAX) .await .expect("Failed to read response body"); let create_json: serde_json::Value = serde_json::from_slice(&create_body).expect("Failed to parse JSON"); let assertion_hash = create_json["hash"].as_str().expect("Missing hash field").to_string(); assert_eq!(create_json["status"], "created"); assert!(!assertion_hash.is_empty(), "Hash should not be empty"); // ======================================================================== // Phase 3: Wait - Allow ingestion and materialization to complete // ======================================================================== // Poll for the assertion to be available instead of blind sleep // This is more robust and fails fast if something goes wrong let start = tokio::time::Instant::now(); let timeout = tokio::time::Duration::from_millis(POLLING_TIMEOUT_MS); let poll_interval = tokio::time::Duration::from_millis(POLLING_INTERVAL_MS); loop { // Try to query the assertion let test_query = Request::builder() .uri(format!("/v1/query?subject={}&predicate={}", subject, predicate)) .method("GET") .body(Body::empty()) .expect("Failed to build test query"); let test_response = app.clone().oneshot(test_query).await.expect("Query failed"); let test_body = axum::body::to_bytes(test_response.into_body(), usize::MAX) .await .expect("Failed to read body"); let test_json: serde_json::Value = serde_json::from_slice(&test_body).expect("Failed to parse JSON"); if test_json["total_count"].as_u64().unwrap_or(0) > 0 { // Assertion is available, proceed break; } if start.elapsed() > timeout { panic!("Timeout waiting for assertion to be ingested and materialized"); } tokio::time::sleep(poll_interval).await; } // ======================================================================== // Phase 4: Query - Read the assertion via lens-based query // ======================================================================== let query_request = Request::builder() .uri(format!("/v1/query?subject={}&predicate={}&lens=Consensus", subject, predicate)) .method("GET") .body(Body::empty()) .expect("Failed to build query request"); let query_response = app.oneshot(query_request).await.expect("Query request failed"); assert_eq!(query_response.status(), StatusCode::OK, "Query should succeed"); let query_body = axum::body::to_bytes(query_response.into_body(), usize::MAX) .await .expect("Failed to read query response body"); let query_json: serde_json::Value = serde_json::from_slice(&query_body).expect("Failed to parse query JSON"); // ======================================================================== // Phase 5: Verify - Assertion matches what we created // ======================================================================== let assertions = query_json["assertions"].as_array().expect("Missing assertions array"); assert_eq!(assertions.len(), 1, "Should return exactly one assertion after lens resolution"); let returned_assertion = &assertions[0]; // Verify the assertion fields assert_eq!( returned_assertion["hash"].as_str().expect("Missing hash"), assertion_hash, "Returned assertion hash should match created hash" ); assert_eq!(returned_assertion["subject"].as_str().expect("Missing subject"), subject); assert_eq!(returned_assertion["predicate"].as_str().expect("Missing predicate"), predicate); assert_eq!( returned_assertion["object"]["type"].as_str().expect("Missing object type"), "Number" ); assert!( (returned_assertion["object"]["value"].as_f64().expect("Missing object value") - 96.7) .abs() < 0.01 ); assert!( (returned_assertion["confidence"].as_f64().expect("Missing confidence") - 0.95).abs() < 0.01 ); // Verify signature round-trip let signatures = returned_assertion["signatures"].as_array().expect("Missing signatures array"); assert_eq!(signatures.len(), 1, "Should have one signature"); assert_eq!( signatures[0]["agent_id"].as_str().expect("Missing agent_id"), hex::encode(agent_id) ); assert_eq!( signatures[0]["signature"].as_str().expect("Missing signature"), hex::encode(signature) ); // Verify metadata assert_eq!(query_json["total_count"], 1); assert_eq!(query_json["has_more"], false); // ======================================================================== // Cleanup - Abort background workers // ======================================================================== ingest_handle.abort(); materializer_handle.abort(); // Wait a bit to ensure clean shutdown tokio::time::sleep(tokio::time::Duration::from_millis(WORKER_SHUTDOWN_MS)).await; } #[tokio::test] async fn test_e2e_query_with_no_lens_returns_all_candidates() { // ======================================================================== // Setup // ======================================================================== let env = create_test_environment().await; let state = env.state.clone(); let store = Arc::clone(&env.store); let journal = Arc::clone(&env.journal); let notify = Arc::new(Notify::new()); let ingest_notify = Arc::clone(¬ify); let mut ingest_worker = IngestWorker::new(Arc::clone(&journal), Arc::clone(&store)) .await .expect("Failed to create IngestWorker") .with_notify(ingest_notify); // ======================================================================== // Create two competing assertions for the same subject+predicate // ======================================================================== let subject = "Apple_Inc"; let predicate = "has_revenue"; // Assertion 1: revenue = 380.0 let message1 = format!("{}:{}", subject, predicate); let (agent_id1, signature1) = sign_message(&message1); let timestamp1 = 1000; let assertion1 = json!({ "subject": subject, "predicate": predicate, "object": {"type": "Number", "value": 380.0}, "confidence": 0.9, "signatures": [{ "agent_id": hex::encode(agent_id1), "signature": hex::encode(signature1), "timestamp": timestamp1 }], "source_hash": hex::encode([1u8; 32]) }); // Assertion 2: revenue = 385.0 (newer timestamp) let message2 = format!("{}:{}", subject, predicate); let (agent_id2, signature2) = sign_message(&message2); let timestamp2 = 2000; let assertion2 = json!({ "subject": subject, "predicate": predicate, "object": {"type": "Number", "value": 385.0}, "confidence": 0.85, "signatures": [{ "agent_id": hex::encode(agent_id2), "signature": hex::encode(signature2), "timestamp": timestamp2 }], "source_hash": hex::encode([2u8; 32]) }); let app = create_router(state.clone()); // Create both assertions for assertion in [&assertion1, &assertion2] { let request = Request::builder() .uri("/v1/assert") .method("POST") .header("content-type", "application/json") .body(Body::from(serde_json::to_vec(assertion).expect("JSON serialization"))) .expect("Failed to build request"); let response = app.clone().oneshot(request).await.expect("Request failed"); assert_eq!(response.status(), StatusCode::CREATED); } // Process ingestion for _ in 0..INGEST_ITERATIONS { if ingest_worker.step().await.expect("Ingestion failed") == 0 { break; } } // ======================================================================== // Query without a lens - should return both assertions // ======================================================================== let query_request = Request::builder() .uri(format!("/v1/query?subject={}&predicate={}", subject, predicate)) .method("GET") .body(Body::empty()) .expect("Failed to build query request"); let query_response = app.oneshot(query_request).await.expect("Query request failed"); assert_eq!(query_response.status(), StatusCode::OK); let query_body = axum::body::to_bytes(query_response.into_body(), usize::MAX) .await .expect("Failed to read query response body"); let query_json: serde_json::Value = serde_json::from_slice(&query_body).expect("Failed to parse query JSON"); let assertions = query_json["assertions"].as_array().expect("Missing assertions array"); assert_eq!(assertions.len(), 2, "Without a lens, should return all candidate assertions"); // Verify both values are present let values: Vec = assertions.iter().map(|a| a["object"]["value"].as_f64().expect("Missing value")).collect(); assert!(values.contains(&380.0), "Should contain first assertion value"); assert!(values.contains(&385.0), "Should contain second assertion value"); assert_eq!(query_json["total_count"], 2); } #[tokio::test] async fn test_e2e_lens_resolution_picks_most_recent() { // ======================================================================== // Setup // ======================================================================== let env = create_test_environment().await; let state = env.state.clone(); let store = Arc::clone(&env.store); let journal = Arc::clone(&env.journal); let notify = Arc::new(Notify::new()); let ingest_notify = Arc::clone(¬ify); let mut ingest_worker = IngestWorker::new(Arc::clone(&journal), Arc::clone(&store)) .await .expect("Failed to create IngestWorker") .with_notify(ingest_notify); // ======================================================================== // Create two assertions with different timestamps // ======================================================================== let subject = "Microsoft_Corp"; let predicate = "has_ceo"; // Older assertion: Satya (timestamp 1000) let message1 = format!("{}:{}", subject, predicate); let (agent_id1, signature1) = sign_message(&message1); let old_assertion = json!({ "subject": subject, "predicate": predicate, "object": {"type": "Text", "value": "Satya_Nadella"}, "confidence": 0.9, "signatures": [{ "agent_id": hex::encode(agent_id1), "signature": hex::encode(signature1), "timestamp": 1000 }], "source_hash": hex::encode([1u8; 32]) }); // Newer assertion: Bill (timestamp 5000) - hypothetically outdated info let message2 = format!("{}:{}", subject, predicate); let (agent_id2, signature2) = sign_message(&message2); let new_assertion = json!({ "subject": subject, "predicate": predicate, "object": {"type": "Text", "value": "Bill_Gates"}, "confidence": 0.8, "signatures": [{ "agent_id": hex::encode(agent_id2), "signature": hex::encode(signature2), "timestamp": 5000 }], "source_hash": hex::encode([2u8; 32]) }); let app = create_router(state.clone()); // Create both assertions for assertion in [&old_assertion, &new_assertion] { let request = Request::builder() .uri("/v1/assert") .method("POST") .header("content-type", "application/json") .body(Body::from(serde_json::to_vec(assertion).expect("JSON serialization"))) .expect("Failed to build request"); let response = app.clone().oneshot(request).await.expect("Request failed"); assert_eq!(response.status(), StatusCode::CREATED); } // Process ingestion for _ in 0..INGEST_ITERATIONS { if ingest_worker.step().await.expect("Ingestion failed") == 0 { break; } } // ======================================================================== // Query with Recency lens - should pick the newer one // ======================================================================== let query_request = Request::builder() .uri(format!("/v1/query?subject={}&predicate={}&lens=Recency", subject, predicate)) .method("GET") .body(Body::empty()) .expect("Failed to build query request"); let query_response = app.oneshot(query_request).await.expect("Query request failed"); assert_eq!(query_response.status(), StatusCode::OK); let query_body = axum::body::to_bytes(query_response.into_body(), usize::MAX) .await .expect("Failed to read query response body"); let query_json: serde_json::Value = serde_json::from_slice(&query_body).expect("Failed to parse query JSON"); let assertions = query_json["assertions"].as_array().expect("Missing assertions array"); assert_eq!(assertions.len(), 1, "Recency lens should return exactly one assertion"); // Verify it picked the newer one (Bill_Gates at timestamp 5000) let winner = &assertions[0]; assert_eq!( winner["object"]["value"].as_str().expect("Missing value"), "Bill_Gates", "Recency lens should pick the most recent assertion" ); assert_eq!(winner["signatures"][0]["timestamp"].as_u64().expect("Missing timestamp"), 5000); } #[tokio::test] async fn test_e2e_empty_query_returns_no_results() { // ======================================================================== // Setup with no data // ======================================================================== let env = create_test_environment().await; let app = create_router(env.state); // ======================================================================== // Query for non-existent subject // ======================================================================== let query_request = Request::builder() .uri("/v1/query?subject=Nonexistent_Entity&predicate=some_property") .method("GET") .body(Body::empty()) .expect("Failed to build query request"); let query_response = app.oneshot(query_request).await.expect("Query request failed"); assert_eq!(query_response.status(), StatusCode::OK); let query_body = axum::body::to_bytes(query_response.into_body(), usize::MAX) .await .expect("Failed to read query response body"); let query_json: serde_json::Value = serde_json::from_slice(&query_body).expect("Failed to parse query JSON"); assert_eq!(query_json["assertions"], json!([])); assert_eq!(query_json["total_count"], 0); assert_eq!(query_json["has_more"], false); }