stemedb/crates/stemedb-cluster/tests/partition_tolerance.rs
jordan d3a88585fe feat: Phase 6 UAT - Admission control, HLC recency, cluster coordination
This commit includes comprehensive work on Phase 6 features:

## Admission Control (Phase 6 admission middleware)
- AdmissionStore implementation backed by TrustRankStore
- PoW verification with tier-based difficulty computation
- Trust tier progression (Newcomer → Established → Trusted → Authority)
- API integration with admission status endpoints

## HLC Recency Lens (Phase 6C)
- HlcRecencyLens for distributed system ordering
- Hybrid logical clock integration with causality preservation

## Cluster Coordination (Phase 6C)
- Multi-node cluster tests (availability, partition tolerance)
- CRDT convergence tests for anti-entropy sync
- Gateway handler improvements

## Aphoria Code Linter (Phase 2A)
- RFC/OWASP corpus builders with network fetching and caching
- Concept hierarchy with auto-alias creation on conflict detection
- Multiple security extractors (TLS, JWT, CORS, secrets, rate limiting)

## Code Organization
- Split large files into modules to comply with 500-line limit
- Improved test organization with separate test modules
- Fixed rkyv serialization for EigenTrustState (AgentScore struct)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 00:43:37 -07:00

431 lines
15 KiB
Rust

//! Partition tolerance tests for distributed consistency.
//!
//! These tests verify that StemeDB continues to accept writes during network
//! partitions and converges correctly after partition heals.
//!
//! # Test Strategy
//!
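//! We simulate partitions by:
//! 1. Creating multiple in-process "nodes", each with its own store, shard
//!    router, and membership view
//! 2. "Partitioning" = not running anti-entropy sync between groups (no real
//!    network or gossip traffic is involved)
//! 3. Verifying writes succeed on both sides of the partition
//! 4. "Healing" = running anti-entropy sync (`SimNode::sync_from`) across groups
//! 5. Verifying convergence after CRDT merge by comparing assertion sets and
//!    canonical Merkle roots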
#![allow(clippy::unwrap_used, clippy::expect_used)]
use std::collections::{HashMap, HashSet};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use stemedb_cluster::config::SwimConfig;
use stemedb_cluster::membership::{NodeId, NodeInfo, SwimMembership};
use stemedb_cluster::sharding::{MetaRange, RangeRouter};
use stemedb_core::serde::serialize;
use stemedb_core::testing::AssertionBuilder;
use stemedb_core::types::HlcTimestamp;
use stemedb_merkle::MerkleTree;
use stemedb_storage::crdt::{AssertionTransfer, CrdtAssertionStore};
use stemedb_storage::HybridStore;
use tempfile::tempdir;
// =============================================================================
// Test Helpers
// =============================================================================
/// Loopback socket address on `port`; distinct ports distinguish simulated nodes.
fn test_addr(port: u16) -> SocketAddr {
    let loopback = IpAddr::V4(Ipv4Addr::LOCALHOST);
    SocketAddr::new(loopback, port)
}
/// Deterministic node id whose 16 bytes are all `n`.
fn test_node_id(n: u8) -> NodeId {
    let bytes: [u8; 16] = [n; 16];
    NodeId::from_bytes(bytes)
}
/// `NodeInfo` for test node `n`, with loopback addresses on ports 9090+n and 8080+n.
fn test_node_info(n: u8) -> NodeInfo {
    // Widening u8 -> u16 cannot overflow; 9090 + 255 still fits in u16.
    let offset = u16::from(n);
    let (first_addr, second_addr) = (test_addr(9090 + offset), test_addr(8080 + offset));
    NodeInfo::new(test_node_id(n), first_addr, second_addr)
}
/// A simulated cluster node for partition tolerance testing.
///
/// Each node owns an on-disk store in its own temp directory. It also keeps an
/// in-memory Merkle tree of assertion hashes and a hash -> payload index, so
/// `SimNode::sync_from` can replay its assertions onto another node.
///
/// Field order matters: struct fields are dropped in declaration order, so
/// `temp_dir` (declared last) is dropped only after `store` and `crdt_store`.
struct SimNode {
    /// Identity of this node; also stamped into the HLC timestamps of its writes.
    id: NodeId,
    // Not read by the current tests; kept so each node carries its own
    // membership view, as the partition model describes.
    #[allow(dead_code)]
    membership: Arc<SwimMembership>,
    /// Shard router consulted by `can_accept_write`.
    router: Arc<RangeRouter>,
    // Not read directly (`crdt_store` holds its own clone of this Arc); kept to
    // mirror the node owning its underlying store.
    #[allow(dead_code)]
    store: Arc<HybridStore>,
    /// CRDT layer over `store` that receives local writes and merged transfers.
    crdt_store: Arc<CrdtAssertionStore<HybridStore>>,
    /// Receives leaves in local write/sync order; `canonical_merkle_root`
    /// rebuilds from sorted leaves for cross-node comparison.
    merkle_tree: MerkleTree,
    /// Maps hash -> (subject, data) for sync operations.
    hash_to_data: HashMap<[u8; 32], (String, Vec<u8>)>,
    // Never read, but must stay alive: dropping a `tempfile::TempDir` deletes
    // the directory backing `store`.
    #[allow(dead_code)]
    temp_dir: tempfile::TempDir,
}
impl SimNode {
    /// Create a new simulated node whose id is `[n; 16]`.
    ///
    /// The node gets a fresh temp-dir-backed `HybridStore`, a CRDT store over
    /// it, a default-config membership view, and an empty router.
    ///
    /// # Panics
    ///
    /// Panics if the temp directory or the store cannot be created.
    fn new(n: u8) -> Self {
        let id = test_node_id(n);
        let info = test_node_info(n);
        let temp_dir = tempdir().expect("create temp dir");
        let store = Arc::new(HybridStore::open(temp_dir.path()).expect("open store"));
        let crdt_store = Arc::new(CrdtAssertionStore::new(store.clone(), *id.as_bytes()));
        let membership = Arc::new(SwimMembership::new(info, SwimConfig::default()));
        let router = Arc::new(RangeRouter::new(id));
        Self {
            id,
            membership,
            router,
            store,
            crdt_store,
            merkle_tree: MerkleTree::new(),
            hash_to_data: HashMap::new(),
            temp_dir,
        }
    }

    /// Install a `MetaRange` into this node's router.
    ///
    /// The range has `num_shards` initial shards over `nodes`, each with
    /// `replication_factor` replicas.
    fn init_shards(&self, nodes: &[NodeId], num_shards: u32, replication_factor: u32) {
        let meta = MetaRange::with_initial_shards(num_shards, nodes, replication_factor);
        self.router.update_meta_range(meta);
    }

    /// Write an assertion locally, simulating a client write to this node.
    ///
    /// The assertion is stamped with HLC time `hlc_time` and this node's id,
    /// plus a fresh source hash from `rand_hash`. Returns the hash the CRDT
    /// store assigned. That hash is also recorded in the Merkle tree and in
    /// `hash_to_data` so it can later be synced to peers.
    ///
    /// # Panics
    ///
    /// Panics if serialization, the store write, or the Merkle insert fails.
    async fn write_assertion(&mut self, subject: &str, predicate: &str, hlc_time: u64) -> [u8; 32] {
        let assertion = AssertionBuilder::new()
            .subject(subject)
            .predicate(predicate)
            .hlc_timestamp(HlcTimestamp::new(hlc_time, *self.id.as_bytes()))
            .source_hash(rand_hash())
            .build();
        let data = serialize(&assertion).expect("serialize");
        let hash = self.crdt_store.put_assertion(subject, &data).await.expect("put");
        self.merkle_tree.insert(hash).expect("insert");
        self.hash_to_data.insert(hash, (subject.to_string(), data));
        hash
    }

    /// Whether this node is a replica for the shard that `subject` routes to.
    ///
    /// Returns `false` if either routing or the replica lookup fails.
    fn can_accept_write(&self, subject: &str) -> bool {
        let shard_id = match self.router.route_subject(subject) {
            Ok(id) => id,
            Err(_) => return false,
        };
        match self.router.get_replicas(shard_id) {
            Ok(replicas) => replicas.contains(&self.id),
            Err(_) => false,
        }
    }

    /// All assertion hashes in this node's Merkle tree, in insertion order.
    fn leaves(&self) -> Vec<[u8; 32]> {
        self.merkle_tree.leaves().to_vec()
    }

    /// Order-independent Merkle root for comparing nodes' assertion sets.
    ///
    /// Leaves are sorted before a fresh tree is built. Two nodes holding the
    /// same set of hashes therefore get the same root regardless of write/sync
    /// order. Returns `None` only when the node has no leaves.
    ///
    /// # Panics
    ///
    /// Panics if building the canonical tree fails. Tree errors deliberately
    /// do not map to `None`: otherwise two failing nodes would compare equal
    /// and a convergence assertion would pass spuriously.
    fn canonical_merkle_root(&self) -> Option<[u8; 32]> {
        let mut sorted_leaves = self.leaves();
        if sorted_leaves.is_empty() {
            return None;
        }
        // Equal [u8; 32] values are indistinguishable, so an unstable sort is safe.
        sorted_leaves.sort_unstable();
        let mut canonical = MerkleTree::new();
        for leaf in sorted_leaves {
            canonical.insert(leaf).expect("insert into canonical tree");
        }
        Some(canonical.root().expect("root of non-empty canonical tree"))
    }

    /// Pull every assertion `other` has that this node lacks.
    ///
    /// This simulates anti-entropy after a partition heals. Each missing hash
    /// is replayed through `merge_with_data` with its original payload, then
    /// recorded in this node's Merkle tree and `hash_to_data`. Because of
    /// that, transitive syncs (A -> B -> C) also work.
    ///
    /// # Panics
    ///
    /// Panics if `other` has a leaf with no recorded payload, or if the merge
    /// or Merkle insert fails. Failing here keeps a broken sync from surfacing
    /// later as a confusing leaf-count or root mismatch.
    async fn sync_from(&mut self, other: &SimNode) {
        let mut known: HashSet<[u8; 32]> = self.leaves().into_iter().collect();
        for hash in other.leaves() {
            // `insert` returns false for hashes already present (including
            // repeats within `other`), so each hash is merged at most once.
            if !known.insert(hash) {
                continue;
            }
            let (subject, data) = other
                .hash_to_data
                .get(&hash)
                .expect("every leaf written or synced by a SimNode has a recorded payload");
            let transfer = AssertionTransfer { hash, data: data.clone() };
            self.crdt_store
                .merge_with_data(subject, std::slice::from_ref(&transfer))
                .await
                .expect("merge synced assertion");
            self.merkle_tree.insert(hash).expect("insert");
            self.hash_to_data.insert(hash, (subject.clone(), data.clone()));
        }
    }
}
/// Generate a distinct, hard-to-predict 32-byte value for use as a test
/// assertion's source hash.
///
/// Layout:
/// - bytes 0..16: wall-clock nanoseconds since the Unix epoch (0 if the clock
///   reads before the epoch);
/// - bytes 16..24: a process-wide call counter;
/// - bytes 24..32: a `RandomState`-keyed hash of that counter.
///
/// The counter guarantees uniqueness within a process even when the clock is
/// coarse or repeats. This is not cryptographic randomness.
fn rand_hash() -> [u8; 32] {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Process-wide sequence number. The thread id's Debug text is no substitute:
    // its first 8 bytes are always the literal "ThreadId", so it adds no entropy.
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);

    let nanos = SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_nanos()).unwrap_or(0);

    let mut hasher = RandomState::new().build_hasher();
    hasher.write_u64(seq);
    let mixed = hasher.finish();

    let mut hash = [0u8; 32];
    hash[..16].copy_from_slice(&nanos.to_le_bytes());
    hash[16..24].copy_from_slice(&seq.to_le_bytes());
    hash[24..].copy_from_slice(&mixed.to_le_bytes());
    hash
}
// =============================================================================
// Partition Tolerance Tests
// =============================================================================
/// Test: writes succeed on both sides of a partition.
///
/// Simulates a 3-node cluster partitioned into [A] and [B, C]. Each side picks
/// a subject whose shard it replicates and writes it without any cross-sync.
#[tokio::test]
async fn test_write_succeeds_during_partition() {
    let mut node_a = SimNode::new(1);
    let mut node_b = SimNode::new(2);
    let node_c = SimNode::new(3);
    let members = vec![node_a.id, node_b.id, node_c.id];

    // 4 shards, RF=2: every node ends up replicating some subset of shards.
    for node in [&node_a, &node_b, &node_c] {
        node.init_shards(&members, 4, 2);
    }

    // PARTITION: A is cut off from B and C. Nothing below syncs across it.

    // First subject in each range whose shard the given node replicates.
    let subject_a = (0..100)
        .map(|i| format!("test:subject:{i}"))
        .find(|candidate| node_a.can_accept_write(candidate))
        .expect("should find subject for node A");
    let subject_b = (100..200)
        .map(|i| format!("test:subject:{i}"))
        .find(|candidate| node_b.can_accept_write(candidate))
        .expect("should find subject for node B");

    // Both sides of the partition accept a write.
    let hash_a = node_a.write_assertion(&subject_a, "predicate", 1000).await;
    let hash_b = node_b.write_assertion(&subject_b, "predicate", 2000).await;

    // A non-zero hash means the store accepted the write.
    assert!(hash_a.iter().any(|&byte| byte != 0), "Node A write should succeed");
    assert!(hash_b.iter().any(|&byte| byte != 0), "Node B write should succeed");

    // No sync happened, so each side holds only its own write.
    assert_eq!(node_a.leaves().len(), 1);
    assert_eq!(node_b.leaves().len(), 1);
}
/// Test: post-partition convergence.
///
/// Two nodes write disjoint subjects while partitioned. After bidirectional
/// anti-entropy sync they must hold the same assertions and canonical root.
#[tokio::test]
async fn test_post_partition_convergence() {
    let mut node_a = SimNode::new(1);
    let mut node_b = SimNode::new(2);
    let members = [node_a.id, node_b.id];
    for node in [&node_a, &node_b] {
        node.init_shards(&members, 4, 2);
    }

    // PARTITION: each side writes one assertion in isolation.
    node_a.write_assertion("subject:a", "pred", 1000).await;
    node_b.write_assertion("subject:b", "pred", 2000).await;

    // Before heal: each node only knows its own write, so roots diverge.
    for node in [&node_a, &node_b] {
        assert_eq!(node.leaves().len(), 1);
    }
    assert_ne!(node_a.canonical_merkle_root(), node_b.canonical_merkle_root());

    // PARTITION HEALS: anti-entropy in both directions.
    node_a.sync_from(&node_b).await;
    node_b.sync_from(&node_a).await;

    assert_eq!(node_a.leaves().len(), 2, "Node A should have 2 assertions after sync");
    assert_eq!(node_b.leaves().len(), 2, "Node B should have 2 assertions after sync");

    let (root_a, root_b) = (node_a.canonical_merkle_root(), node_b.canonical_merkle_root());
    assert_eq!(root_a, root_b, "Nodes should converge after partition heals");
}
/// Test: concurrent writes to the same subject from both sides of a partition.
///
/// Both partitions write to the same subject. After heal, both assertions must
/// exist on both nodes: the store is append-only, so neither write replaces the
/// other. Lens-based resolution between the two is not exercised here.
#[tokio::test]
async fn test_concurrent_writes_both_survive() {
    let mut node_a = SimNode::new(1);
    let mut node_b = SimNode::new(2);
    let nodes = vec![node_a.id, node_b.id];
    node_a.init_shards(&nodes, 4, 2);
    node_b.init_shards(&nodes, 4, 2);

    // Both write to the same subject during the partition.
    let subject = "claim:earth-shape";
    let hash_a = node_a.write_assertion(subject, "is:round", 1000).await;
    let hash_b = node_b.write_assertion(subject, "is:spheroid", 2000).await;

    // Distinct assertions: predicate, HLC time, node id and source hash all differ.
    assert_ne!(hash_a, hash_b);

    // PARTITION HEALS
    node_a.sync_from(&node_b).await;
    node_b.sync_from(&node_a).await;

    // Both assertions survive; append-only means no data loss.
    let a_leaves: HashSet<_> = node_a.leaves().into_iter().collect();
    let b_leaves: HashSet<_> = node_b.leaves().into_iter().collect();
    assert!(a_leaves.contains(&hash_a), "Node A should have assertion A");
    assert!(a_leaves.contains(&hash_b), "Node A should have assertion B after sync");
    assert!(b_leaves.contains(&hash_a), "Node B should have assertion A after sync");
    assert!(b_leaves.contains(&hash_b), "Node B should have assertion B");

    // Exactly those two writes, identical on both nodes.
    let expected = HashSet::from([hash_a, hash_b]);
    assert_eq!(a_leaves, expected, "Node A should hold exactly the two concurrent writes");
    assert_eq!(a_leaves, b_leaves, "Both nodes should have identical assertion sets");
    assert_eq!(
        node_a.canonical_merkle_root(),
        node_b.canonical_merkle_root(),
        "Canonical roots should match after heal"
    );
}
/// Test: Multi-partition scenario with 4 nodes.
///
/// Partition into [A, B] and [C, D]. Each partition writes.
/// After heal, all 4 nodes should converge.
#[tokio::test]
async fn test_multi_partition_convergence() {
let mut node_a = SimNode::new(1);
let mut node_b = SimNode::new(2);
let mut node_c = SimNode::new(3);
let mut node_d = SimNode::new(4);
let nodes = vec![node_a.id, node_b.id, node_c.id, node_d.id];
for node in [&mut node_a, &mut node_b, &mut node_c, &mut node_d] {
node.init_shards(&nodes, 8, 2);
}
// PARTITION: [A, B] and [C, D]
// Partition 1 writes
let _h1 = node_a.write_assertion("partition1:data", "value1", 1000).await;
node_b.sync_from(&node_a).await; // Sync within partition
// Partition 2 writes
let _h2 = node_c.write_assertion("partition2:data", "value2", 2000).await;
node_d.sync_from(&node_c).await; // Sync within partition
// Before heal: partitions have different data
assert_ne!(node_a.canonical_merkle_root(), node_c.canonical_merkle_root());
// PARTITION HEALS: Full mesh sync
node_a.sync_from(&node_c).await;
node_b.sync_from(&node_d).await;
node_c.sync_from(&node_a).await;
node_d.sync_from(&node_b).await;
// All nodes should have same canonical root
let root_a = node_a.canonical_merkle_root();
let root_b = node_b.canonical_merkle_root();
let root_c = node_c.canonical_merkle_root();
let root_d = node_d.canonical_merkle_root();
assert_eq!(root_a, root_b, "A and B should match");
assert_eq!(root_b, root_c, "B and C should match");
assert_eq!(root_c, root_d, "C and D should match");
// All should have 2 assertions
assert_eq!(node_a.leaves().len(), 2);
assert_eq!(node_b.leaves().len(), 2);
assert_eq!(node_c.leaves().len(), 2);
assert_eq!(node_d.leaves().len(), 2);
}
/// Test: rapid writes during a partition don't cause data loss.
///
/// Each side performs ten back-to-back writes while partitioned. After heal,
/// every one of the twenty assertions must be present on both nodes.
#[tokio::test]
async fn test_rapid_writes_during_partition_no_data_loss() {
    let mut node_a = SimNode::new(1);
    let mut node_b = SimNode::new(2);
    let members = [node_a.id, node_b.id];
    node_a.init_shards(&members, 4, 2);
    node_b.init_shards(&members, 4, 2);

    // Burst of writes on side A, then on side B, each with increasing HLC time.
    let mut written_by_a = Vec::with_capacity(10);
    for seq in 0..10u64 {
        let subject = format!("rapid:a:{seq}");
        let hash = node_a.write_assertion(&subject, "pred", 1000 + seq).await;
        written_by_a.push(hash);
    }
    let mut written_by_b = Vec::with_capacity(10);
    for seq in 0..10u64 {
        let subject = format!("rapid:b:{seq}");
        let hash = node_b.write_assertion(&subject, "pred", 2000 + seq).await;
        written_by_b.push(hash);
    }

    // Before heal, each side only sees its own burst.
    assert_eq!(node_a.leaves().len(), 10);
    assert_eq!(node_b.leaves().len(), 10);

    // PARTITION HEALS
    node_a.sync_from(&node_b).await;
    node_b.sync_from(&node_a).await;

    assert_eq!(node_a.leaves().len(), 20, "Node A should have all 20 assertions");
    assert_eq!(node_b.leaves().len(), 20, "Node B should have all 20 assertions");

    // Every individual hash made it to both sides.
    let a_leaves: HashSet<_> = node_a.leaves().into_iter().collect();
    let b_leaves: HashSet<_> = node_b.leaves().into_iter().collect();
    for hash in &written_by_a {
        assert!(a_leaves.contains(hash), "Node A should have its own assertion");
        assert!(b_leaves.contains(hash), "Node B should have A's assertion after sync");
    }
    for hash in &written_by_b {
        assert!(a_leaves.contains(hash), "Node A should have B's assertion after sync");
        assert!(b_leaves.contains(hash), "Node B should have its own assertion");
    }
}