//! Availability tests for distributed consistency. //! //! These tests verify that StemeDB provides high availability: //! - Reads succeed on any replica that has the shard //! - Writes are accepted by any replica (not just leader) //! - Node failures don't block operations on other nodes #![allow(clippy::unwrap_used, clippy::expect_used)] use std::collections::HashMap; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::Arc; use stemedb_cluster::config::SwimConfig; use stemedb_cluster::membership::{NodeId, NodeInfo, SwimMembership}; use stemedb_cluster::sharding::{MetaRange, RangeRouter}; use stemedb_core::serde::serialize; use stemedb_core::testing::AssertionBuilder; use stemedb_core::types::HlcTimestamp; use stemedb_merkle::MerkleTree; use stemedb_storage::crdt::{AssertionTransfer, CrdtAssertionStore}; use stemedb_storage::HybridStore; use tempfile::tempdir; // ============================================================================= // Test Helpers // ============================================================================= fn test_addr(port: u16) -> SocketAddr { SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port) } fn test_node_id(n: u8) -> NodeId { NodeId::from_bytes([n; 16]) } fn test_node_info(n: u8) -> NodeInfo { let id = test_node_id(n); NodeInfo::new(id, test_addr(9090 + n as u16), test_addr(8080 + n as u16)) } /// A simulated cluster node for availability testing. struct AvailabilityNode { id: NodeId, #[allow(dead_code)] membership: Arc, router: Arc, #[allow(dead_code)] store: Arc, crdt_store: Arc>, merkle_tree: MerkleTree, hash_to_data: HashMap<[u8; 32], (String, Vec)>, /// Simulated node failure state. failed: bool, #[allow(dead_code)] temp_dir: tempfile::TempDir, } impl AvailabilityNode { fn new(n: u8) -> Self { let id = test_node_id(n); let info = test_node_info(n); let temp_dir = tempdir().expect("create temp dir"); let store = Arc::new(HybridStore::open(temp_dir.path()).expect("open store")); let crdt_store = Arc::new(CrdtAssertionStore::new(store.clone(), *id.as_bytes())); let membership = Arc::new(SwimMembership::new(info, SwimConfig::default())); let router = Arc::new(RangeRouter::new(id)); Self { id, membership, router, store, crdt_store, merkle_tree: MerkleTree::new(), hash_to_data: HashMap::new(), failed: false, temp_dir, } } fn init_shards(&self, nodes: &[NodeId], num_shards: u32, replication_factor: u32) { let meta = MetaRange::with_initial_shards(num_shards, nodes, replication_factor); self.router.update_meta_range(meta); } /// Check if this node is a replica for the given subject's shard. #[allow(dead_code)] fn is_replica_for(&self, subject: &str) -> bool { if self.failed { return false; } let shard_id = match self.router.route_subject(subject) { Ok(id) => id, Err(_) => return false, }; match self.router.get_replicas(shard_id) { Ok(replicas) => replicas.contains(&self.id), Err(_) => false, } } /// Check if this node is the leader for the given subject's shard. fn is_leader_for(&self, subject: &str) -> bool { if self.failed { return false; } let shard_id = match self.router.route_subject(subject) { Ok(id) => id, Err(_) => return false, }; match self.router.get_leader(shard_id) { Ok(leader) => leader == self.id, Err(_) => false, } } /// Write an assertion (succeeds if node is not failed). async fn write(&mut self, subject: &str, predicate: &str, hlc_time: u64) -> Option<[u8; 32]> { if self.failed { return None; } let assertion = AssertionBuilder::new() .subject(subject) .predicate(predicate) .hlc_timestamp(HlcTimestamp::new(hlc_time, *self.id.as_bytes())) .source_hash(rand_hash()) .build(); let data = serialize(&assertion).expect("serialize"); let hash = self.crdt_store.put_assertion(subject, &data).await.expect("put"); self.merkle_tree.insert(hash).expect("insert"); self.hash_to_data.insert(hash, (subject.to_string(), data)); Some(hash) } /// Read assertion data (succeeds if node is not failed). async fn read(&self, subject: &str, hash: &[u8; 32]) -> Option> { if self.failed { return None; } self.crdt_store.get_assertion(subject, hash).await.ok().flatten() } /// Simulate node failure. fn fail(&mut self) { self.failed = true; } /// Recover from failure. #[allow(dead_code)] fn recover(&mut self) { self.failed = false; } /// Check if node is available. fn is_available(&self) -> bool { !self.failed } /// Get all leaves. fn leaves(&self) -> Vec<[u8; 32]> { self.merkle_tree.leaves().to_vec() } /// Sync from another node. async fn sync_from(&mut self, other: &AvailabilityNode) { if self.failed || other.failed { return; } let my_leaves: std::collections::HashSet<_> = self.leaves().into_iter().collect(); for hash in other.leaves() { if !my_leaves.contains(&hash) { if let Some((subject, data)) = other.hash_to_data.get(&hash) { let transfer = AssertionTransfer { hash, data: data.clone() }; if self .crdt_store .merge_with_data(subject, std::slice::from_ref(&transfer)) .await .is_ok() { self.merkle_tree.insert(hash).expect("insert"); self.hash_to_data.insert(hash, (subject.clone(), data.clone())); } } } } } } fn rand_hash() -> [u8; 32] { use std::time::{SystemTime, UNIX_EPOCH}; let nanos = SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_nanos()).unwrap_or(0); let mut hash = [0u8; 32]; hash[..16].copy_from_slice(&nanos.to_le_bytes()); let tid = std::thread::current().id(); hash[16..24].copy_from_slice(&format!("{tid:?}").as_bytes()[..8.min(format!("{tid:?}").len())]); hash } // ============================================================================= // Availability Tests // ============================================================================= /// Test: Read succeeds on any replica that has the shard. /// /// Write data to one node, sync to replicas, verify read works on any replica. #[tokio::test] async fn test_read_any_replica() { let mut node_a = AvailabilityNode::new(1); let mut node_b = AvailabilityNode::new(2); let mut node_c = AvailabilityNode::new(3); let nodes = vec![node_a.id, node_b.id, node_c.id]; // RF=3 means all nodes are replicas for all shards node_a.init_shards(&nodes, 4, 3); node_b.init_shards(&nodes, 4, 3); node_c.init_shards(&nodes, 4, 3); // Write on node A let subject = "test:subject"; let hash = node_a.write(subject, "predicate", 1000).await.expect("write"); // Sync to all replicas node_b.sync_from(&node_a).await; node_c.sync_from(&node_a).await; // Read should succeed on all replicas let data_a = node_a.read(subject, &hash).await; let data_b = node_b.read(subject, &hash).await; let data_c = node_c.read(subject, &hash).await; assert!(data_a.is_some(), "Read should succeed on node A (writer)"); assert!(data_b.is_some(), "Read should succeed on node B (replica)"); assert!(data_c.is_some(), "Read should succeed on node C (replica)"); // Data should be identical assert_eq!(data_a, data_b, "Data should match across replicas A and B"); assert_eq!(data_b, data_c, "Data should match across replicas B and C"); } /// Test: Write is accepted by any replica (not just leader). /// /// StemeDB uses leaderless replication - any replica can accept writes. #[tokio::test] async fn test_write_any_replica() { let mut node_a = AvailabilityNode::new(1); let mut node_b = AvailabilityNode::new(2); let mut node_c = AvailabilityNode::new(3); let nodes = vec![node_a.id, node_b.id, node_c.id]; // RF=3 means all nodes are replicas node_a.init_shards(&nodes, 4, 3); node_b.init_shards(&nodes, 4, 3); node_c.init_shards(&nodes, 4, 3); let subject = "test:subject"; // Identify who is leader and who isn't let a_is_leader = node_a.is_leader_for(subject); let b_is_leader = node_b.is_leader_for(subject); // Find a non-leader node let (non_leader_writes, non_leader_id) = if !a_is_leader { let hash = node_a.write(subject, "from-non-leader", 1000).await; (hash.is_some(), "A") } else if !b_is_leader { let hash = node_b.write(subject, "from-non-leader", 1000).await; (hash.is_some(), "B") } else { let hash = node_c.write(subject, "from-non-leader", 1000).await; (hash.is_some(), "C") }; assert!( non_leader_writes, "Non-leader node {} should accept writes (leaderless replication)", non_leader_id ); } /// Test: Node failure doesn't block operations on other nodes. /// /// When one node fails, other nodes should continue serving reads and writes. #[tokio::test] async fn test_node_failure_isolation() { let mut node_a = AvailabilityNode::new(1); let mut node_b = AvailabilityNode::new(2); let mut node_c = AvailabilityNode::new(3); let nodes = vec![node_a.id, node_b.id, node_c.id]; node_a.init_shards(&nodes, 4, 3); node_b.init_shards(&nodes, 4, 3); node_c.init_shards(&nodes, 4, 3); // Initial write on A let subject = "test:subject"; let hash1 = node_a.write(subject, "before-failure", 1000).await.expect("write"); // Sync before failure node_b.sync_from(&node_a).await; node_c.sync_from(&node_a).await; // NODE A FAILS node_a.fail(); assert!(!node_a.is_available(), "Node A should be unavailable"); // Verify node A operations fail let a_read = node_a.read(subject, &hash1).await; let a_write = node_a.write(subject, "during-failure", 2000).await; assert!(a_read.is_none(), "Read on failed node should fail"); assert!(a_write.is_none(), "Write on failed node should fail"); // BUT: B and C should continue working normally assert!(node_b.is_available(), "Node B should still be available"); assert!(node_c.is_available(), "Node C should still be available"); // Reads still work on B and C let b_read = node_b.read(subject, &hash1).await; let c_read = node_c.read(subject, &hash1).await; assert!(b_read.is_some(), "Read on node B should succeed during A failure"); assert!(c_read.is_some(), "Read on node C should succeed during A failure"); // Writes still work on B and C let hash2 = node_b.write(subject, "during-a-failure", 2000).await; let hash3 = node_c.write(subject, "also-during-failure", 3000).await; assert!(hash2.is_some(), "Write on node B should succeed during A failure"); assert!(hash3.is_some(), "Write on node C should succeed during A failure"); // Sync between surviving nodes node_b.sync_from(&node_c).await; node_c.sync_from(&node_b).await; // Both B and C should have all data assert_eq!(node_b.leaves().len(), 3, "Node B should have 3 assertions"); assert_eq!(node_c.leaves().len(), 3, "Node C should have 3 assertions"); } /// Test: Read availability with quorum. /// /// With RF=3 and 2 nodes available, reads should succeed. #[tokio::test] async fn test_read_quorum_availability() { let mut node_a = AvailabilityNode::new(1); let mut node_b = AvailabilityNode::new(2); let mut node_c = AvailabilityNode::new(3); let nodes = vec![node_a.id, node_b.id, node_c.id]; node_a.init_shards(&nodes, 4, 3); node_b.init_shards(&nodes, 4, 3); node_c.init_shards(&nodes, 4, 3); // Write and sync to all let subject = "test:subject"; let hash = node_a.write(subject, "predicate", 1000).await.expect("write"); node_b.sync_from(&node_a).await; node_c.sync_from(&node_a).await; // Fail one node - quorum (2/3) still available node_c.fail(); // Read should succeed on remaining nodes let read_a = node_a.read(subject, &hash).await; let read_b = node_b.read(subject, &hash).await; assert!(read_a.is_some(), "Read on A should succeed with quorum available"); assert!(read_b.is_some(), "Read on B should succeed with quorum available"); } /// Test: Write availability with quorum. /// /// With RF=3 and 2 nodes available, writes should succeed. #[tokio::test] async fn test_write_quorum_availability() { let mut node_a = AvailabilityNode::new(1); let mut node_b = AvailabilityNode::new(2); let mut node_c = AvailabilityNode::new(3); let nodes = vec![node_a.id, node_b.id, node_c.id]; node_a.init_shards(&nodes, 4, 3); node_b.init_shards(&nodes, 4, 3); node_c.init_shards(&nodes, 4, 3); // Fail one node node_c.fail(); // Writes should succeed on remaining nodes let subject = "test:subject"; let write_a = node_a.write(subject, "pred1", 1000).await; let write_b = node_b.write(subject, "pred2", 2000).await; assert!(write_a.is_some(), "Write on A should succeed with quorum available"); assert!(write_b.is_some(), "Write on B should succeed with quorum available"); // Sync between surviving nodes node_a.sync_from(&node_b).await; node_b.sync_from(&node_a).await; // Both should have both writes assert_eq!(node_a.leaves().len(), 2); assert_eq!(node_b.leaves().len(), 2); } /// Test: All replicas eventually have identical data. /// /// This is the core eventual consistency guarantee. #[tokio::test] async fn test_eventual_consistency_across_replicas() { let mut node_a = AvailabilityNode::new(1); let mut node_b = AvailabilityNode::new(2); let mut node_c = AvailabilityNode::new(3); let nodes = vec![node_a.id, node_b.id, node_c.id]; node_a.init_shards(&nodes, 4, 3); node_b.init_shards(&nodes, 4, 3); node_c.init_shards(&nodes, 4, 3); // Each node writes independently let h1 = node_a.write("s1", "p1", 1000).await.expect("write"); let h2 = node_b.write("s2", "p2", 2000).await.expect("write"); let h3 = node_c.write("s3", "p3", 3000).await.expect("write"); // Before sync: each has only its own assert_eq!(node_a.leaves().len(), 1); assert_eq!(node_b.leaves().len(), 1); assert_eq!(node_c.leaves().len(), 1); // Full mesh sync (simulating anti-entropy) node_a.sync_from(&node_b).await; node_a.sync_from(&node_c).await; node_b.sync_from(&node_a).await; node_b.sync_from(&node_c).await; node_c.sync_from(&node_a).await; node_c.sync_from(&node_b).await; // After sync: all have all data assert_eq!(node_a.leaves().len(), 3, "Node A should have all 3 assertions"); assert_eq!(node_b.leaves().len(), 3, "Node B should have all 3 assertions"); assert_eq!(node_c.leaves().len(), 3, "Node C should have all 3 assertions"); // Verify specific hashes let a_leaves: std::collections::HashSet<_> = node_a.leaves().into_iter().collect(); let b_leaves: std::collections::HashSet<_> = node_b.leaves().into_iter().collect(); let c_leaves: std::collections::HashSet<_> = node_c.leaves().into_iter().collect(); assert!(a_leaves.contains(&h1) && a_leaves.contains(&h2) && a_leaves.contains(&h3)); assert!(b_leaves.contains(&h1) && b_leaves.contains(&h2) && b_leaves.contains(&h3)); assert!(c_leaves.contains(&h1) && c_leaves.contains(&h2) && c_leaves.contains(&h3)); // All sets should be identical assert_eq!(a_leaves, b_leaves, "A and B should have identical data"); assert_eq!(b_leaves, c_leaves, "B and C should have identical data"); }