//! Partition tolerance tests for distributed consistency. //! //! These tests verify that StemeDB continues to accept writes during network //! partitions and converges correctly after partition heals. //! //! # Test Strategy //! //! We simulate partitions by: //! 1. Creating multiple in-process "nodes" with separate membership views //! 2. "Partitioning" = stopping gossip propagation between groups //! 3. Verifying writes succeed on both sides of the partition //! 4. "Healing" = resuming gossip propagation //! 5. Verifying convergence via CRDT merge #![allow(clippy::unwrap_used, clippy::expect_used)] use std::collections::{HashMap, HashSet}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::Arc; use stemedb_cluster::config::SwimConfig; use stemedb_cluster::membership::{NodeId, NodeInfo, SwimMembership}; use stemedb_cluster::sharding::{MetaRange, RangeRouter}; use stemedb_core::serde::serialize; use stemedb_core::testing::AssertionBuilder; use stemedb_core::types::HlcTimestamp; use stemedb_merkle::MerkleTree; use stemedb_storage::crdt::{AssertionTransfer, CrdtAssertionStore}; use stemedb_storage::HybridStore; use tempfile::tempdir; // ============================================================================= // Test Helpers // ============================================================================= fn test_addr(port: u16) -> SocketAddr { SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port) } fn test_node_id(n: u8) -> NodeId { NodeId::from_bytes([n; 16]) } fn test_node_info(n: u8) -> NodeInfo { let id = test_node_id(n); NodeInfo::new(id, test_addr(9090 + n as u16), test_addr(8080 + n as u16)) } /// A simulated cluster node for partition tolerance testing. struct SimNode { id: NodeId, #[allow(dead_code)] membership: Arc, router: Arc, #[allow(dead_code)] store: Arc, crdt_store: Arc>, merkle_tree: MerkleTree, /// Maps hash -> (subject, data) for sync operations. hash_to_data: HashMap<[u8; 32], (String, Vec)>, #[allow(dead_code)] temp_dir: tempfile::TempDir, } impl SimNode { /// Create a new simulated node. fn new(n: u8) -> Self { let id = test_node_id(n); let info = test_node_info(n); let temp_dir = tempdir().expect("create temp dir"); let store = Arc::new(HybridStore::open(temp_dir.path()).expect("open store")); let crdt_store = Arc::new(CrdtAssertionStore::new(store.clone(), *id.as_bytes())); let membership = Arc::new(SwimMembership::new(info, SwimConfig::default())); let router = Arc::new(RangeRouter::new(id)); Self { id, membership, router, store, crdt_store, merkle_tree: MerkleTree::new(), hash_to_data: HashMap::new(), temp_dir, } } /// Initialize sharding with the given nodes. fn init_shards(&self, nodes: &[NodeId], num_shards: u32, replication_factor: u32) { let meta = MetaRange::with_initial_shards(num_shards, nodes, replication_factor); self.router.update_meta_range(meta); } /// Add an assertion to this node (simulating a local write). async fn write_assertion(&mut self, subject: &str, predicate: &str, hlc_time: u64) -> [u8; 32] { let assertion = AssertionBuilder::new() .subject(subject) .predicate(predicate) .hlc_timestamp(HlcTimestamp::new(hlc_time, *self.id.as_bytes())) .source_hash(rand_hash()) .build(); let data = serialize(&assertion).expect("serialize"); let hash = self.crdt_store.put_assertion(subject, &data).await.expect("put"); self.merkle_tree.insert(hash).expect("insert"); self.hash_to_data.insert(hash, (subject.to_string(), data)); hash } /// Check if this node can accept a write for the given subject. fn can_accept_write(&self, subject: &str) -> bool { // Route the subject to a shard let shard_id = match self.router.route_subject(subject) { Ok(id) => id, Err(_) => return false, }; // Check if local node is a replica for this shard match self.router.get_replicas(shard_id) { Ok(replicas) => replicas.contains(&self.id), Err(_) => false, } } /// Get all leaves (assertion hashes). fn leaves(&self) -> Vec<[u8; 32]> { self.merkle_tree.leaves().to_vec() } /// Canonical Merkle root for convergence verification. fn canonical_merkle_root(&self) -> Option<[u8; 32]> { let mut sorted_leaves = self.merkle_tree.leaves().to_vec(); if sorted_leaves.is_empty() { return None; } sorted_leaves.sort(); let mut canonical = MerkleTree::new(); for leaf in sorted_leaves { canonical.insert(leaf).ok()?; } canonical.root().ok() } /// Sync from another node (simulating anti-entropy after partition heals). async fn sync_from(&mut self, other: &SimNode) { let my_leaves: HashSet<_> = self.leaves().into_iter().collect(); for hash in other.leaves() { if !my_leaves.contains(&hash) { if let Some((subject, data)) = other.hash_to_data.get(&hash) { let transfer = AssertionTransfer { hash, data: data.clone() }; if self .crdt_store .merge_with_data(subject, std::slice::from_ref(&transfer)) .await .is_ok() { self.merkle_tree.insert(hash).expect("insert"); self.hash_to_data.insert(hash, (subject.clone(), data.clone())); } } } } } } /// Generate a random hash for test assertions. fn rand_hash() -> [u8; 32] { use std::time::{SystemTime, UNIX_EPOCH}; let nanos = SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_nanos()).unwrap_or(0); let mut hash = [0u8; 32]; hash[..16].copy_from_slice(&nanos.to_le_bytes()); // Add some randomness with thread ID let tid = std::thread::current().id(); hash[16..24].copy_from_slice(&format!("{tid:?}").as_bytes()[..8.min(format!("{tid:?}").len())]); hash } // ============================================================================= // Partition Tolerance Tests // ============================================================================= /// Test: Writes succeed on both sides of a partition. /// /// Simulates a 3-node cluster partitioned into [A] and [B, C]. /// Both sides should continue accepting writes for their shards. #[tokio::test] async fn test_write_succeeds_during_partition() { // Create 3 nodes let mut node_a = SimNode::new(1); let mut node_b = SimNode::new(2); let node_c = SimNode::new(3); let nodes = vec![node_a.id, node_b.id, node_c.id]; // Initialize shards: 4 shards, RF=2 // Each node will be replica for some shards node_a.init_shards(&nodes, 4, 2); node_b.init_shards(&nodes, 4, 2); node_c.init_shards(&nodes, 4, 2); // PARTITION: A is isolated from B and C // (In this simulation, we simply don't sync between partitions) // Find subjects that route to shards replicated on node A let mut subject_for_a = None; for i in 0..100 { let subject = format!("test:subject:{i}"); if node_a.can_accept_write(&subject) { subject_for_a = Some(subject); break; } } // Find subjects that route to shards replicated on node B let mut subject_for_b = None; for i in 100..200 { let subject = format!("test:subject:{i}"); if node_b.can_accept_write(&subject) { subject_for_b = Some(subject); break; } } let subject_a = subject_for_a.expect("should find subject for node A"); let subject_b = subject_for_b.expect("should find subject for node B"); // Both sides of partition can write let hash_a = node_a.write_assertion(&subject_a, "predicate", 1000).await; let hash_b = node_b.write_assertion(&subject_b, "predicate", 2000).await; // Verify writes succeeded assert!(!hash_a.iter().all(|&b| b == 0), "Node A write should succeed"); assert!(!hash_b.iter().all(|&b| b == 0), "Node B write should succeed"); // Each node has its own assertion assert_eq!(node_a.leaves().len(), 1); assert_eq!(node_b.leaves().len(), 1); } /// Test: Post-partition convergence. /// /// After a partition heals, both sides should have all writes /// via anti-entropy sync. #[tokio::test] async fn test_post_partition_convergence() { let mut node_a = SimNode::new(1); let mut node_b = SimNode::new(2); let nodes = vec![node_a.id, node_b.id]; node_a.init_shards(&nodes, 4, 2); node_b.init_shards(&nodes, 4, 2); // PARTITION: A and B are isolated // Node A writes assertion A1 let _hash_a = node_a.write_assertion("subject:a", "pred", 1000).await; // Node B writes assertion B1 let _hash_b = node_b.write_assertion("subject:b", "pred", 2000).await; // Before heal: each has only its own assert_eq!(node_a.leaves().len(), 1); assert_eq!(node_b.leaves().len(), 1); assert_ne!(node_a.canonical_merkle_root(), node_b.canonical_merkle_root()); // PARTITION HEALS: Sync both ways node_a.sync_from(&node_b).await; node_b.sync_from(&node_a).await; // After heal: both have all assertions assert_eq!(node_a.leaves().len(), 2, "Node A should have 2 assertions after sync"); assert_eq!(node_b.leaves().len(), 2, "Node B should have 2 assertions after sync"); // Canonical roots should match assert_eq!( node_a.canonical_merkle_root(), node_b.canonical_merkle_root(), "Nodes should converge after partition heals" ); } /// Test: Concurrent writes to same subject from both sides of partition. /// /// Both partitions write to the same subject. After heal: /// - Both assertions should exist (append-only) /// - Lens should resolve deterministically #[tokio::test] async fn test_concurrent_writes_both_survive() { let mut node_a = SimNode::new(1); let mut node_b = SimNode::new(2); let nodes = vec![node_a.id, node_b.id]; node_a.init_shards(&nodes, 4, 2); node_b.init_shards(&nodes, 4, 2); // Both write to same subject during partition let subject = "claim:earth-shape"; let hash_a = node_a.write_assertion(subject, "is:round", 1000).await; let hash_b = node_b.write_assertion(subject, "is:spheroid", 2000).await; // Hashes are different (different predicates, different HLC times) assert_ne!(hash_a, hash_b); // PARTITION HEALS node_a.sync_from(&node_b).await; node_b.sync_from(&node_a).await; // Both assertions survive - append-only means no data loss let a_leaves: HashSet<_> = node_a.leaves().into_iter().collect(); let b_leaves: HashSet<_> = node_b.leaves().into_iter().collect(); assert!(a_leaves.contains(&hash_a), "Node A should have assertion A"); assert!(a_leaves.contains(&hash_b), "Node A should have assertion B after sync"); assert!(b_leaves.contains(&hash_a), "Node B should have assertion A after sync"); assert!(b_leaves.contains(&hash_b), "Node B should have assertion B"); // Same set on both nodes assert_eq!(a_leaves, b_leaves, "Both nodes should have identical assertion sets"); } /// Test: Multi-partition scenario with 4 nodes. /// /// Partition into [A, B] and [C, D]. Each partition writes. /// After heal, all 4 nodes should converge. #[tokio::test] async fn test_multi_partition_convergence() { let mut node_a = SimNode::new(1); let mut node_b = SimNode::new(2); let mut node_c = SimNode::new(3); let mut node_d = SimNode::new(4); let nodes = vec![node_a.id, node_b.id, node_c.id, node_d.id]; for node in [&mut node_a, &mut node_b, &mut node_c, &mut node_d] { node.init_shards(&nodes, 8, 2); } // PARTITION: [A, B] and [C, D] // Partition 1 writes let _h1 = node_a.write_assertion("partition1:data", "value1", 1000).await; node_b.sync_from(&node_a).await; // Sync within partition // Partition 2 writes let _h2 = node_c.write_assertion("partition2:data", "value2", 2000).await; node_d.sync_from(&node_c).await; // Sync within partition // Before heal: partitions have different data assert_ne!(node_a.canonical_merkle_root(), node_c.canonical_merkle_root()); // PARTITION HEALS: Full mesh sync node_a.sync_from(&node_c).await; node_b.sync_from(&node_d).await; node_c.sync_from(&node_a).await; node_d.sync_from(&node_b).await; // All nodes should have same canonical root let root_a = node_a.canonical_merkle_root(); let root_b = node_b.canonical_merkle_root(); let root_c = node_c.canonical_merkle_root(); let root_d = node_d.canonical_merkle_root(); assert_eq!(root_a, root_b, "A and B should match"); assert_eq!(root_b, root_c, "B and C should match"); assert_eq!(root_c, root_d, "C and D should match"); // All should have 2 assertions assert_eq!(node_a.leaves().len(), 2); assert_eq!(node_b.leaves().len(), 2); assert_eq!(node_c.leaves().len(), 2); assert_eq!(node_d.leaves().len(), 2); } /// Test: Rapid writes during partition don't cause data loss. /// /// Simulate high-frequency writes on both sides of partition, /// then verify all writes survive after heal. #[tokio::test] async fn test_rapid_writes_during_partition_no_data_loss() { let mut node_a = SimNode::new(1); let mut node_b = SimNode::new(2); let nodes = vec![node_a.id, node_b.id]; node_a.init_shards(&nodes, 4, 2); node_b.init_shards(&nodes, 4, 2); // Rapid writes on both sides let mut hashes_a = Vec::new(); let mut hashes_b = Vec::new(); for i in 0..10 { let subject = format!("rapid:a:{i}"); hashes_a.push(node_a.write_assertion(&subject, "pred", 1000 + i).await); } for i in 0..10 { let subject = format!("rapid:b:{i}"); hashes_b.push(node_b.write_assertion(&subject, "pred", 2000 + i).await); } // Before heal assert_eq!(node_a.leaves().len(), 10); assert_eq!(node_b.leaves().len(), 10); // PARTITION HEALS node_a.sync_from(&node_b).await; node_b.sync_from(&node_a).await; // All 20 assertions should exist on both nodes assert_eq!(node_a.leaves().len(), 20, "Node A should have all 20 assertions"); assert_eq!(node_b.leaves().len(), 20, "Node B should have all 20 assertions"); // Verify specific hashes let a_leaves: HashSet<_> = node_a.leaves().into_iter().collect(); let b_leaves: HashSet<_> = node_b.leaves().into_iter().collect(); for hash in &hashes_a { assert!(a_leaves.contains(hash), "Node A should have its own assertion"); assert!(b_leaves.contains(hash), "Node B should have A's assertion after sync"); } for hash in &hashes_b { assert!(a_leaves.contains(hash), "Node A should have B's assertion after sync"); assert!(b_leaves.contains(hash), "Node B should have its own assertion"); } }