//! Convergence verification tests for distributed consistency. //! //! These tests verify that two diverged nodes converge after sync. //! The CRDT properties are tested individually but end-to-end convergence //! is what we verify here. //! //! # Test Harness //! //! Uses in-process "nodes" that each have their own: //! - `KVStore` (temp dir) //! - `CrdtAssertionStore` //! - `MerkleTreeManager` //! //! Sync is simulated by calling `get_state()` → `merge()` directly, //! rather than going through gRPC. // Allow expect in test code for convenience #![allow(clippy::expect_used)] use std::collections::HashMap; use std::sync::Arc; use stemedb_core::serde::serialize; use stemedb_core::testing::AssertionBuilder; use stemedb_core::types::HlcTimestamp; use stemedb_lens::{HlcRecencyLens, Lens}; use stemedb_merkle::MerkleTree; use stemedb_storage::crdt::{AssertionTransfer, CrdtAssertionStore}; use stemedb_storage::HybridStore; use tempfile::tempdir; /// A simulated node for convergence testing. struct TestNode { #[allow(dead_code)] name: String, node_id: [u8; 16], #[allow(dead_code)] store: Arc, crdt_store: Arc>, merkle_tree: MerkleTree, /// Maps hash -> (subject, data) for sync operations. /// In production, this would be stored/retrieved differently. hash_to_data: HashMap<[u8; 32], (String, Vec)>, #[allow(dead_code)] temp_dir: tempfile::TempDir, } impl TestNode { /// Create a new test node with a unique node ID. fn new(name: &str, node_id: [u8; 16]) -> Self { let temp_dir = tempdir().expect("create temp dir"); let store = Arc::new(HybridStore::open(temp_dir.path()).expect("open store")); let crdt_store = Arc::new(CrdtAssertionStore::new(store.clone(), node_id)); Self { name: name.to_string(), node_id, store, crdt_store, merkle_tree: MerkleTree::new(), hash_to_data: HashMap::new(), temp_dir, } } /// Add an assertion to this node. async fn add_assertion(&mut self, subject: &str, predicate: &str, hlc_time: u64) -> [u8; 32] { let assertion = AssertionBuilder::new() .subject(subject) .predicate(predicate) .hlc_timestamp(HlcTimestamp::new(hlc_time, self.node_id)) .source_hash(rand_hash()) // Unique source hash for determinism .build(); let data = serialize(&assertion).expect("serialize"); let hash = self.crdt_store.put_assertion(subject, &data).await.expect("put"); self.merkle_tree.insert(hash).expect("insert"); self.hash_to_data.insert(hash, (subject.to_string(), data)); hash } /// Get the Merkle root of this node (order-sensitive). #[allow(dead_code)] fn merkle_root(&self) -> Option<[u8; 32]> { self.merkle_tree.root().ok() } /// Get a canonical Merkle root (sorted leaves) for convergence verification. /// /// The standard Merkle tree preserves insertion order, which differs between /// nodes depending on sync timing. For convergence testing, we compute a /// canonical root by sorting leaves first, ensuring deterministic comparison. fn canonical_merkle_root(&self) -> Option<[u8; 32]> { let mut sorted_leaves = self.merkle_tree.leaves().to_vec(); if sorted_leaves.is_empty() { return None; } sorted_leaves.sort(); // Rebuild tree with sorted leaves let mut canonical = MerkleTree::new(); for leaf in sorted_leaves { canonical.insert(leaf).ok()?; } canonical.root().ok() } /// Get all leaves (assertion hashes) in the Merkle tree. fn leaves(&self) -> Vec<[u8; 32]> { self.merkle_tree.leaves().to_vec() } /// Sync from another node by fetching missing assertions. /// /// This simulates the anti-entropy sync process: we find hashes that /// the other node has but we don't, then copy the assertion data. async fn sync_from(&mut self, other: &TestNode) { let my_leaves: std::collections::HashSet<_> = self.leaves().into_iter().collect(); // Find what the other node has that we don't for hash in other.leaves() { if !my_leaves.contains(&hash) { // In a real system, assertion data is fetched via RPC. // Here we use the hash_to_data map as our "RPC layer". if let Some((subject, data)) = other.hash_to_data.get(&hash) { let transfer = AssertionTransfer { hash, data: data.clone() }; if self .crdt_store .merge_with_data(subject, std::slice::from_ref(&transfer)) .await .is_ok() { self.merkle_tree.insert(hash).expect("insert"); // Also store in our hash_to_data for transitive sync self.hash_to_data.insert(hash, (subject.clone(), data.clone())); } } } } } } /// Generate a random hash for test assertions. fn rand_hash() -> [u8; 32] { use std::time::{SystemTime, UNIX_EPOCH}; let nanos = SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_nanos()).unwrap_or(0); let mut hash = [0u8; 32]; hash[..16].copy_from_slice(&nanos.to_le_bytes()); hash[16..32].copy_from_slice(&nanos.wrapping_add(1).to_le_bytes()); hash } // ============================================================================= // Convergence Tests // ============================================================================= /// Test: Two nodes with disjoint assertion sets converge after sync. /// /// Node A has [A1, A2], Node B has [B1]. /// After sync: both have [A1, A2, B1] and Merkle roots match. #[tokio::test] async fn test_two_node_disjoint_convergence() { let mut node_a = TestNode::new("NodeA", [1u8; 16]); let mut node_b = TestNode::new("NodeB", [2u8; 16]); // Node A adds A1, A2 let a1 = node_a.add_assertion("A", "pred1", 1000).await; let a2 = node_a.add_assertion("A", "pred2", 2000).await; // Node B adds B1 let b1 = node_b.add_assertion("B", "pred1", 1500).await; // Before sync: canonical roots differ assert_ne!(node_a.canonical_merkle_root(), node_b.canonical_merkle_root()); assert_eq!(node_a.leaves().len(), 2); assert_eq!(node_b.leaves().len(), 1); // Sync: A pulls from B, B pulls from A node_a.sync_from(&node_b).await; node_b.sync_from(&node_a).await; // After sync: both have all three assertions let a_leaves: std::collections::HashSet<_> = node_a.leaves().into_iter().collect(); let b_leaves: std::collections::HashSet<_> = node_b.leaves().into_iter().collect(); assert!(a_leaves.contains(&a1), "Node A should have A1"); assert!(a_leaves.contains(&a2), "Node A should have A2"); assert!(a_leaves.contains(&b1), "Node A should have B1 after sync"); assert!(b_leaves.contains(&a1), "Node B should have A1 after sync"); assert!(b_leaves.contains(&a2), "Node B should have A2 after sync"); assert!(b_leaves.contains(&b1), "Node B should have B1"); // Canonical Merkle roots should match (same set of leaves) assert_eq!( node_a.canonical_merkle_root(), node_b.canonical_merkle_root(), "Canonical Merkle roots should match after convergence" ); } /// Test: Two nodes with overlapping assertion sets converge without duplicates. /// /// Node A has [X, Y], Node B has [Y, Z]. /// After sync: both have [X, Y, Z] with no duplicates. #[tokio::test] async fn test_two_node_overlapping_convergence() { let mut node_a = TestNode::new("NodeA", [1u8; 16]); let mut node_b = TestNode::new("NodeB", [2u8; 16]); // Node A adds X, Y let x = node_a.add_assertion("X", "pred", 1000).await; let y = node_a.add_assertion("Y", "pred", 2000).await; // Copy Y to Node B (simulating earlier sync) if let Some((subject, y_data)) = node_a.hash_to_data.get(&y).cloned() { let transfer = AssertionTransfer { hash: y, data: y_data.clone() }; let _ = node_b.crdt_store.merge_with_data(&subject, std::slice::from_ref(&transfer)).await; node_b.merkle_tree.insert(y).expect("insert"); node_b.hash_to_data.insert(y, (subject, y_data)); } // Node B adds Z let z = node_b.add_assertion("Z", "pred", 3000).await; // Verify initial state assert_eq!(node_a.leaves().len(), 2); // X, Y assert_eq!(node_b.leaves().len(), 2); // Y, Z // Sync both ways node_a.sync_from(&node_b).await; node_b.sync_from(&node_a).await; // After sync: both have X, Y, Z let a_leaves: std::collections::HashSet<_> = node_a.leaves().into_iter().collect(); let b_leaves: std::collections::HashSet<_> = node_b.leaves().into_iter().collect(); assert_eq!(a_leaves.len(), 3, "Node A should have 3 assertions"); assert_eq!(b_leaves.len(), 3, "Node B should have 3 assertions"); assert!(a_leaves.contains(&x)); assert!(a_leaves.contains(&y)); assert!(a_leaves.contains(&z)); assert!(b_leaves.contains(&x)); assert!(b_leaves.contains(&y)); assert!(b_leaves.contains(&z)); // Canonical Merkle roots should match assert_eq!(node_a.canonical_merkle_root(), node_b.canonical_merkle_root()); } /// Test: HlcRecencyLens produces same winner on both nodes after convergence. /// /// After sync, both nodes should resolve to the same "most recent" assertion /// using HlcRecencyLens, demonstrating deterministic resolution. #[tokio::test] async fn test_lens_determinism_after_convergence() { let mut node_a = TestNode::new("NodeA", [1u8; 16]); let mut node_b = TestNode::new("NodeB", [2u8; 16]); // Both nodes add assertions for the same subject but with different HLC times // Node A: older assertion (HLC time 1000) let _older = node_a.add_assertion("S1", "pred", 1000).await; // Node B: newer assertion (HLC time 2000) let _newer = node_b.add_assertion("S1", "pred", 2000).await; // Sync both ways node_a.sync_from(&node_b).await; node_b.sync_from(&node_a).await; // Verify both nodes have the same assertions assert_eq!( node_a.canonical_merkle_root(), node_b.canonical_merkle_root(), "Canonical Merkle roots should match after sync" ); // Now use HlcRecencyLens on both nodes // In a real system, we'd query the CRDT store for all assertions for "S1" // and resolve with the lens. Here we verify the lens is deterministic // by creating equivalent assertion sets. let lens = HlcRecencyLens; // Create assertions that would be in both stores after sync let older_assertion = AssertionBuilder::new() .subject("S1") .predicate("pred") .hlc_timestamp(HlcTimestamp::new(1000, [1u8; 16])) .build(); let newer_assertion = AssertionBuilder::new() .subject("S1") .predicate("pred") .hlc_timestamp(HlcTimestamp::new(2000, [2u8; 16])) .build(); // Resolve on "Node A" (order: older first) let resolution_a = lens.resolve(&[older_assertion.clone(), newer_assertion.clone()]); // Resolve on "Node B" (order: newer first) let resolution_b = lens.resolve(&[newer_assertion.clone(), older_assertion.clone()]); // Both should pick the same winner (newer assertion with HLC 2000) assert!(resolution_a.winner.is_some()); assert!(resolution_b.winner.is_some()); let winner_a = resolution_a.winner.as_ref().expect("winner_a"); let winner_b = resolution_b.winner.as_ref().expect("winner_b"); assert_eq!( winner_a.hlc_timestamp.time_ntp64, winner_b.hlc_timestamp.time_ntp64, "Both nodes should resolve to the same HLC time" ); assert_eq!( winner_a.hlc_timestamp.time_ntp64, 2000, "Winner should be the assertion with HLC 2000" ); } /// Test: Three-node chain convergence. /// /// A → B sync, B → C sync, C → A sync. /// All three should converge to the same state. #[tokio::test] async fn test_three_node_chain_convergence() { let mut node_a = TestNode::new("NodeA", [1u8; 16]); let mut node_b = TestNode::new("NodeB", [2u8; 16]); let mut node_c = TestNode::new("NodeC", [3u8; 16]); // Each node adds one assertion let a1 = node_a.add_assertion("A", "pred", 1000).await; let b1 = node_b.add_assertion("B", "pred", 2000).await; let c1 = node_c.add_assertion("C", "pred", 3000).await; // Chain sync: A → B → C → A node_b.sync_from(&node_a).await; // B gets A1 node_c.sync_from(&node_b).await; // C gets A1, B1 node_a.sync_from(&node_c).await; // A gets B1, C1 // Additional round to fully propagate node_b.sync_from(&node_a).await; // B gets C1 node_c.sync_from(&node_b).await; // C is already complete // All nodes should have all assertions let a_leaves: std::collections::HashSet<_> = node_a.leaves().into_iter().collect(); let b_leaves: std::collections::HashSet<_> = node_b.leaves().into_iter().collect(); let c_leaves: std::collections::HashSet<_> = node_c.leaves().into_iter().collect(); for (name, leaves) in [("A", &a_leaves), ("B", &b_leaves), ("C", &c_leaves)] { assert!(leaves.contains(&a1), "Node {} should have A1", name); assert!(leaves.contains(&b1), "Node {} should have B1", name); assert!(leaves.contains(&c1), "Node {} should have C1", name); } // All canonical Merkle roots should match assert_eq!(node_a.canonical_merkle_root(), node_b.canonical_merkle_root()); assert_eq!(node_b.canonical_merkle_root(), node_c.canonical_merkle_root()); } /// Test: Merge commutativity - merge(A,B) == merge(B,A). /// /// CRDT property: the order of merge operations shouldn't matter. #[tokio::test] async fn test_merge_commutativity() { // Create two separate merge scenarios let mut node_ab = TestNode::new("NodeAB", [1u8; 16]); let mut node_ba = TestNode::new("NodeBA", [1u8; 16]); // Same node_id for comparable hashes // Assertions from "Node A" let a1 = AssertionBuilder::new() .subject("X") .predicate("p1") .hlc_timestamp(HlcTimestamp::new(1000, [1u8; 16])) .source_hash([10u8; 32]) .build(); let a2 = AssertionBuilder::new() .subject("X") .predicate("p2") .hlc_timestamp(HlcTimestamp::new(2000, [1u8; 16])) .source_hash([20u8; 32]) .build(); // Assertions from "Node B" let b1 = AssertionBuilder::new() .subject("X") .predicate("p3") .hlc_timestamp(HlcTimestamp::new(1500, [2u8; 16])) .source_hash([30u8; 32]) .build(); // Merge order 1: A then B let data_a1 = serialize(&a1).expect("serialize"); let data_a2 = serialize(&a2).expect("serialize"); let data_b1 = serialize(&b1).expect("serialize"); let hash_a1 = node_ab.crdt_store.put_assertion("X", &data_a1).await.expect("put"); let hash_a2 = node_ab.crdt_store.put_assertion("X", &data_a2).await.expect("put"); let hash_b1 = node_ab.crdt_store.put_assertion("X", &data_b1).await.expect("put"); node_ab.merkle_tree.insert(hash_a1).expect("insert"); node_ab.merkle_tree.insert(hash_a2).expect("insert"); node_ab.merkle_tree.insert(hash_b1).expect("insert"); // Merge order 2: B then A let hash_b1_2 = node_ba.crdt_store.put_assertion("X", &data_b1).await.expect("put"); let hash_a1_2 = node_ba.crdt_store.put_assertion("X", &data_a1).await.expect("put"); let hash_a2_2 = node_ba.crdt_store.put_assertion("X", &data_a2).await.expect("put"); node_ba.merkle_tree.insert(hash_b1_2).expect("insert"); node_ba.merkle_tree.insert(hash_a1_2).expect("insert"); node_ba.merkle_tree.insert(hash_a2_2).expect("insert"); // Hashes should be identical (content-addressed) assert_eq!(hash_a1, hash_a1_2, "A1 hash should be deterministic"); assert_eq!(hash_a2, hash_a2_2, "A2 hash should be deterministic"); assert_eq!(hash_b1, hash_b1_2, "B1 hash should be deterministic"); // Canonical Merkle roots should be identical regardless of merge order assert_eq!( node_ab.canonical_merkle_root(), node_ba.canonical_merkle_root(), "Merge order should not affect final state" ); } /// Test: Empty sync is a no-op. #[tokio::test] async fn test_empty_sync_is_noop() { let mut node_a = TestNode::new("NodeA", [1u8; 16]); let node_b = TestNode::new("NodeB", [2u8; 16]); // Node A has assertions let a1 = node_a.add_assertion("A", "pred", 1000).await; let root_before = node_a.canonical_merkle_root(); // Sync from empty node B node_a.sync_from(&node_b).await; // Node A should be unchanged assert_eq!(node_a.canonical_merkle_root(), root_before); assert!(node_a.leaves().contains(&a1)); } /// Test: Idempotent sync - syncing twice doesn't change state. #[tokio::test] async fn test_idempotent_sync() { let mut node_a = TestNode::new("NodeA", [1u8; 16]); let mut node_b = TestNode::new("NodeB", [2u8; 16]); // Both nodes have assertions let _a1 = node_a.add_assertion("A", "pred", 1000).await; let _b1 = node_b.add_assertion("B", "pred", 2000).await; // First sync node_a.sync_from(&node_b).await; let root_after_first_sync = node_a.canonical_merkle_root(); let leaves_after_first_sync = node_a.leaves().len(); // Second sync (should be no-op) node_a.sync_from(&node_b).await; let root_after_second_sync = node_a.canonical_merkle_root(); let leaves_after_second_sync = node_a.leaves().len(); assert_eq!( root_after_first_sync, root_after_second_sync, "Syncing twice should not change state" ); assert_eq!( leaves_after_first_sync, leaves_after_second_sync, "Number of assertions should not change on re-sync" ); }