stemedb/crates/stemedb-cluster/tests/availability.rs
jordan d3a88585fe feat: Phase 6 UAT - Admission control, HLC recency, cluster coordination
This commit includes comprehensive work on Phase 6 features:

## Admission Control (Phase 6 admission middleware)
- AdmissionStore implementation backed by TrustRankStore
- PoW verification with tier-based difficulty computation
- Trust tier progression (Newcomer → Established → Trusted → Authority)
- API integration with admission status endpoints

## HLC Recency Lens (Phase 6C)
- HlcRecencyLens for distributed system ordering
- Hybrid logical clock integration with causality preservation

## Cluster Coordination (Phase 6C)
- Multi-node cluster tests (availability, partition tolerance)
- CRDT convergence tests for anti-entropy sync
- Gateway handler improvements

## Aphoria Code Linter (Phase 2A)
- RFC/OWASP corpus builders with network fetching and caching
- Concept hierarchy with auto-alias creation on conflict detection
- Multiple security extractors (TLS, JWT, CORS, secrets, rate limiting)

## Code Organization
- Split large files into modules to comply with 500-line limit
- Improved test organization with separate test modules
- Fixed rkyv serialization for EigenTrustState (AgentScore struct)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 00:43:37 -07:00

465 lines
16 KiB
Rust

//! Availability tests for distributed consistency.
//!
//! These tests verify that StemeDB provides high availability:
//! - Reads succeed on any replica that has the shard
//! - Writes are accepted by any replica (not just leader)
//! - Node failures don't block operations on other nodes
#![allow(clippy::unwrap_used, clippy::expect_used)]
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use stemedb_cluster::config::SwimConfig;
use stemedb_cluster::membership::{NodeId, NodeInfo, SwimMembership};
use stemedb_cluster::sharding::{MetaRange, RangeRouter};
use stemedb_core::serde::serialize;
use stemedb_core::testing::AssertionBuilder;
use stemedb_core::types::HlcTimestamp;
use stemedb_merkle::MerkleTree;
use stemedb_storage::crdt::{AssertionTransfer, CrdtAssertionStore};
use stemedb_storage::HybridStore;
use tempfile::tempdir;
// =============================================================================
// Test Helpers
// =============================================================================
/// Build a loopback (127.0.0.1) socket address on `port` for test nodes.
fn test_addr(port: u16) -> SocketAddr {
    let loopback = IpAddr::V4(Ipv4Addr::LOCALHOST);
    SocketAddr::new(loopback, port)
}
/// Deterministic test node id whose 16 bytes are all equal to `n`.
fn test_node_id(n: u8) -> NodeId {
    let raw = [n; 16];
    NodeId::from_bytes(raw)
}
/// Build the `NodeInfo` for test node `n`.
///
/// Both of its addresses are on 127.0.0.1, at ports `9090 + n` and `8080 + n` (passed to
/// `NodeInfo::new` in that order).
fn test_node_info(n: u8) -> NodeInfo {
    // Lossless widening; the largest port produced is 9090 + 255, well within u16.
    let offset = u16::from(n);
    let id = test_node_id(n);
    NodeInfo::new(id, test_addr(9090 + offset), test_addr(8080 + offset))
}
/// A simulated cluster node for availability testing.
///
/// Each node owns an on-disk `HybridStore` inside its own temporary directory, a CRDT layer
/// over that store, a shard router, and a local Merkle tree of the assertion hashes it holds.
/// Failure is simulated with the `failed` flag rather than by tearing any state down.
struct AvailabilityNode {
    /// This node's identity; compared against router answers in the replica/leader checks.
    id: NodeId,
    // Never read by these tests; presumably retained so the node mirrors a real member's
    // state. The allow silences the resulting dead-code warning.
    #[allow(dead_code)]
    membership: Arc<SwimMembership>,
    /// Shard routing table, populated by `init_shards`.
    router: Arc<RangeRouter>,
    // Not read directly: `crdt_store` wraps a clone of this same `Arc`.
    #[allow(dead_code)]
    store: Arc<HybridStore>,
    /// CRDT layer used for every read, write, and merge on this node.
    crdt_store: Arc<CrdtAssertionStore<HybridStore>>,
    /// Merkle tree whose leaves are the hashes of every assertion written to or synced into
    /// this node.
    merkle_tree: MerkleTree,
    /// hash -> (subject, serialized assertion); peers read this in `sync_from` to pull
    /// assertions they are missing.
    hash_to_data: HashMap<[u8; 32], (String, Vec<u8>)>,
    /// Simulated node failure state.
    failed: bool,
    // Owns the store's directory. It is declared last so it is dropped after the stores above
    // (struct fields drop in declaration order); only held, never read, hence the allow.
    #[allow(dead_code)]
    temp_dir: tempfile::TempDir,
}
impl AvailabilityNode {
    /// Create a healthy node backed by a fresh `HybridStore` in its own temporary directory.
    ///
    /// The node id comes from `test_node_id(n)` and its membership info from
    /// `test_node_info(n)`. The Merkle tree and hash index start empty, and the router has no
    /// shard metadata until `init_shards` is called.
    ///
    /// # Panics
    /// Panics if the temporary directory or the store cannot be created.
    fn new(n: u8) -> Self {
        let id = test_node_id(n);
        let info = test_node_info(n);
        let temp_dir = tempdir().expect("create temp dir");
        let store = Arc::new(HybridStore::open(temp_dir.path()).expect("open store"));
        // The CRDT layer shares the same underlying store and is given this node's id bytes
        // (presumably as its replica/actor identity).
        let crdt_store = Arc::new(CrdtAssertionStore::new(store.clone(), *id.as_bytes()));
        let membership = Arc::new(SwimMembership::new(info, SwimConfig::default()));
        let router = Arc::new(RangeRouter::new(id));
        Self {
            id,
            membership,
            router,
            store,
            crdt_store,
            merkle_tree: MerkleTree::new(),
            hash_to_data: HashMap::new(),
            failed: false,
            temp_dir,
        }
    }

    /// Install shard metadata built from `num_shards`, the member `nodes`, and
    /// `replication_factor` into this node's router.
    ///
    /// The tests call this with identical arguments on every node so that their routing
    /// tables presumably agree.
    fn init_shards(&self, nodes: &[NodeId], num_shards: u32, replication_factor: u32) {
        let meta = MetaRange::with_initial_shards(num_shards, nodes, replication_factor);
        self.router.update_meta_range(meta);
    }

    /// Check if this node is a replica for the given subject's shard.
    ///
    /// Returns `false` when the node is failed, or when routing or replica lookup errors.
    #[allow(dead_code)] // counterpart of `is_leader_for`; not used by the current tests
    fn is_replica_for(&self, subject: &str) -> bool {
        !self.failed
            && self
                .router
                .route_subject(subject)
                .ok()
                .and_then(|shard_id| self.router.get_replicas(shard_id).ok())
                .is_some_and(|replicas| replicas.contains(&self.id))
    }

    /// Check if this node is the leader for the given subject's shard.
    ///
    /// Returns `false` when the node is failed, or when routing or leader lookup errors.
    fn is_leader_for(&self, subject: &str) -> bool {
        !self.failed
            && self
                .router
                .route_subject(subject)
                .ok()
                .and_then(|shard_id| self.router.get_leader(shard_id).ok())
                .is_some_and(|leader| leader == self.id)
    }

    /// Write an assertion (succeeds if node is not failed).
    ///
    /// The assertion is stamped with HLC time `hlc_time` and this node's id. It is stored via
    /// the CRDT layer, added to the Merkle tree, and indexed in `hash_to_data` for later
    /// syncs. Returns the assertion's hash, or `None` if the node is failed.
    ///
    /// # Panics
    /// Panics if serialization, storage, or Merkle insertion fails.
    async fn write(&mut self, subject: &str, predicate: &str, hlc_time: u64) -> Option<[u8; 32]> {
        if self.failed {
            return None;
        }
        let assertion = AssertionBuilder::new()
            .subject(subject)
            .predicate(predicate)
            .hlc_timestamp(HlcTimestamp::new(hlc_time, *self.id.as_bytes()))
            // A fresh source hash keeps otherwise-identical assertions distinct.
            .source_hash(rand_hash())
            .build();
        let data = serialize(&assertion).expect("serialize");
        let hash = self.crdt_store.put_assertion(subject, &data).await.expect("put");
        self.merkle_tree.insert(hash).expect("insert");
        self.hash_to_data.insert(hash, (subject.to_string(), data));
        Some(hash)
    }

    /// Read assertion data (succeeds if node is not failed).
    ///
    /// Returns `None` if the node is failed, the lookup errors, or the assertion is absent.
    async fn read(&self, subject: &str, hash: &[u8; 32]) -> Option<Vec<u8>> {
        if self.failed {
            return None;
        }
        self.crdt_store.get_assertion(subject, hash).await.ok().flatten()
    }

    /// Simulate node failure.
    fn fail(&mut self) {
        self.failed = true;
    }

    /// Recover from failure.
    #[allow(dead_code)] // available for recovery scenarios; not used by the current tests
    fn recover(&mut self) {
        self.failed = false;
    }

    /// Check if node is available.
    fn is_available(&self) -> bool {
        !self.failed
    }

    /// Get all leaves (assertion hashes) of this node's Merkle tree as an owned vector.
    fn leaves(&self) -> Vec<[u8; 32]> {
        self.merkle_tree.leaves().to_vec()
    }

    /// Sync from another node: pull every assertion `other` holds that this node lacks.
    ///
    /// This does nothing if either node is failed. A hash is only recorded locally (in the
    /// Merkle tree and the hash index) if the CRDT merge succeeds. A failed merge is skipped
    /// rather than aborting the sync.
    async fn sync_from(&mut self, other: &AvailabilityNode) {
        if self.failed || other.failed {
            return;
        }
        // Snapshot taken before merging, as in a single anti-entropy round.
        let known: std::collections::HashSet<_> = self.leaves().into_iter().collect();
        for hash in other.leaves() {
            if known.contains(&hash) {
                continue;
            }
            let Some((subject, data)) = other.hash_to_data.get(&hash) else {
                continue;
            };
            let transfer = AssertionTransfer { hash, data: data.clone() };
            let merged = self
                .crdt_store
                .merge_with_data(subject, std::slice::from_ref(&transfer))
                .await
                .is_ok();
            if merged {
                self.merkle_tree.insert(hash).expect("insert");
                // Reuse the payload already copied for the transfer instead of cloning again.
                self.hash_to_data.insert(hash, (subject.clone(), transfer.data));
            }
        }
    }
}
/// Produce a 32-byte pseudo-random source hash that is unique per call within the process.
///
/// Layout:
/// - bytes 0..16: current wall-clock time in nanoseconds (little-endian);
/// - bytes 16..24: a process-wide, monotonically increasing call counter (little-endian);
/// - bytes 24..32: zero.
///
/// The counter guarantees distinct results even when two calls land in the same clock tick
/// (coarse timers on some platforms) or run on different threads. The previous
/// thread-id-based bytes were always the ASCII prefix `"ThreadId"`, so they added no
/// uniqueness.
fn rand_hash() -> [u8; 32] {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static COUNTER: AtomicU64 = AtomicU64::new(0);

    let nanos = SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_nanos()).unwrap_or(0);
    // Relaxed is sufficient: fetch_add still hands every caller a distinct value, and no
    // other memory is synchronized through this counter.
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);

    let mut hash = [0u8; 32];
    hash[..16].copy_from_slice(&nanos.to_le_bytes());
    hash[16..24].copy_from_slice(&seq.to_le_bytes());
    hash
}
// =============================================================================
// Availability Tests
// =============================================================================
/// Test: Read succeeds on any replica that has the shard.
///
/// One assertion is written on the first node and pulled by the other two. Every replica,
/// writer included, must then serve a byte-identical copy.
#[tokio::test]
async fn test_read_any_replica() {
    let mut writer = AvailabilityNode::new(1);
    let mut replica_b = AvailabilityNode::new(2);
    let mut replica_c = AvailabilityNode::new(3);
    let member_ids = vec![writer.id, replica_b.id, replica_c.id];

    // 4 shards at RF=3 across 3 nodes: every node replicates every shard.
    for node in [&writer, &replica_b, &replica_c] {
        node.init_shards(&member_ids, 4, 3);
    }

    let subject = "test:subject";
    let hash = writer.write(subject, "predicate", 1000).await.expect("write");

    // Push the new assertion out to both replicas.
    replica_b.sync_from(&writer).await;
    replica_c.sync_from(&writer).await;

    let from_a = writer.read(subject, &hash).await;
    let from_b = replica_b.read(subject, &hash).await;
    let from_c = replica_c.read(subject, &hash).await;

    assert!(from_a.is_some(), "Read should succeed on node A (writer)");
    assert!(from_b.is_some(), "Read should succeed on node B (replica)");
    assert!(from_c.is_some(), "Read should succeed on node C (replica)");

    // Every copy must be byte-identical.
    assert_eq!(from_a, from_b, "Data should match across replicas A and B");
    assert_eq!(from_b, from_c, "Data should match across replicas B and C");
}
/// Test: Write is accepted by any replica (not just leader).
///
/// StemeDB uses leaderless replication - any replica can accept writes. Every replica, leader
/// or not, is asked to write and must accept. The test also requires that at least one
/// non-leader was exercised, so the leaderless claim is actually covered rather than tested
/// on a single arbitrarily chosen node.
#[tokio::test]
async fn test_write_any_replica() {
    let mut node_a = AvailabilityNode::new(1);
    let mut node_b = AvailabilityNode::new(2);
    let mut node_c = AvailabilityNode::new(3);
    let nodes = vec![node_a.id, node_b.id, node_c.id];
    // RF=3 means all nodes are replicas
    node_a.init_shards(&nodes, 4, 3);
    node_b.init_shards(&nodes, 4, 3);
    node_c.init_shards(&nodes, 4, 3);
    let subject = "test:subject";

    // Distinct HLC times keep each replica's write timestamp unique.
    let replicas = [
        ("A", &mut node_a, 1000),
        ("B", &mut node_b, 2000),
        ("C", &mut node_c, 3000),
    ];
    let mut non_leaders = 0;
    for (name, node, hlc_time) in replicas {
        let is_leader = node.is_leader_for(subject);
        let predicate = if is_leader { "from-leader" } else { "from-non-leader" };
        let accepted = node.write(subject, predicate, hlc_time).await.is_some();
        if is_leader {
            assert!(accepted, "Leader node {} should accept writes", name);
        } else {
            non_leaders += 1;
            assert!(
                accepted,
                "Non-leader node {} should accept writes (leaderless replication)",
                name
            );
        }
    }
    assert!(non_leaders > 0, "At least one replica should be a non-leader for {}", subject);
}
/// Test: Node failure doesn't block operations on other nodes.
///
/// When one node fails, other nodes should continue serving reads and writes. After
/// syncing with each other, the survivors must hold exactly the same assertions: the
/// pre-failure write plus both writes made during the failure.
#[tokio::test]
async fn test_node_failure_isolation() {
    let mut node_a = AvailabilityNode::new(1);
    let mut node_b = AvailabilityNode::new(2);
    let mut node_c = AvailabilityNode::new(3);
    let nodes = vec![node_a.id, node_b.id, node_c.id];
    node_a.init_shards(&nodes, 4, 3);
    node_b.init_shards(&nodes, 4, 3);
    node_c.init_shards(&nodes, 4, 3);
    // Initial write on A
    let subject = "test:subject";
    let hash1 = node_a.write(subject, "before-failure", 1000).await.expect("write");
    // Sync before failure
    node_b.sync_from(&node_a).await;
    node_c.sync_from(&node_a).await;
    // NODE A FAILS
    node_a.fail();
    assert!(!node_a.is_available(), "Node A should be unavailable");
    // Verify node A operations fail
    let a_read = node_a.read(subject, &hash1).await;
    let a_write = node_a.write(subject, "during-failure", 2000).await;
    assert!(a_read.is_none(), "Read on failed node should fail");
    assert!(a_write.is_none(), "Write on failed node should fail");
    // BUT: B and C should continue working normally
    assert!(node_b.is_available(), "Node B should still be available");
    assert!(node_c.is_available(), "Node C should still be available");
    // Reads still work on B and C
    let b_read = node_b.read(subject, &hash1).await;
    let c_read = node_c.read(subject, &hash1).await;
    assert!(b_read.is_some(), "Read on node B should succeed during A failure");
    assert!(c_read.is_some(), "Read on node C should succeed during A failure");
    // Writes still work on B and C; keep the hashes so convergence can be checked exactly.
    let hash2 = node_b
        .write(subject, "during-a-failure", 2000)
        .await
        .expect("Write on node B should succeed during A failure");
    let hash3 = node_c
        .write(subject, "also-during-failure", 3000)
        .await
        .expect("Write on node C should succeed during A failure");
    // Sync between surviving nodes
    node_b.sync_from(&node_c).await;
    node_c.sync_from(&node_b).await;
    // Both B and C should have all data
    assert_eq!(node_b.leaves().len(), 3, "Node B should have 3 assertions");
    assert_eq!(node_c.leaves().len(), 3, "Node C should have 3 assertions");
    // A count alone could hide a wrong or duplicated hash; verify the exact contents.
    let b_leaves: std::collections::HashSet<_> = node_b.leaves().into_iter().collect();
    let c_leaves: std::collections::HashSet<_> = node_c.leaves().into_iter().collect();
    for hash in [hash1, hash2, hash3] {
        assert!(b_leaves.contains(&hash), "Node B should hold every surviving write");
        assert!(c_leaves.contains(&hash), "Node C should hold every surviving write");
    }
    assert_eq!(b_leaves, c_leaves, "B and C should have identical data");
}
/// Test: Read availability with quorum.
///
/// One assertion is replicated to all three nodes and then one replica is taken down. The
/// remaining two (a 2/3 quorum at RF=3) must still serve it.
#[tokio::test]
async fn test_read_quorum_availability() {
    let mut node_a = AvailabilityNode::new(1);
    let mut node_b = AvailabilityNode::new(2);
    let mut node_c = AvailabilityNode::new(3);
    let member_ids = vec![node_a.id, node_b.id, node_c.id];
    for node in [&node_a, &node_b, &node_c] {
        node.init_shards(&member_ids, 4, 3);
    }

    // Write on A, then replicate it to B and C.
    let subject = "test:subject";
    let hash = node_a.write(subject, "predicate", 1000).await.expect("write");
    for replica in [&mut node_b, &mut node_c] {
        replica.sync_from(&node_a).await;
    }

    // Lose one replica; A and B still form a quorum.
    node_c.fail();

    let survivor_a = node_a.read(subject, &hash).await;
    let survivor_b = node_b.read(subject, &hash).await;
    assert!(survivor_a.is_some(), "Read on A should succeed with quorum available");
    assert!(survivor_b.is_some(), "Read on B should succeed with quorum available");
}
/// Test: Write availability with quorum.
///
/// With RF=3 and 2 nodes available, writes should succeed. After the survivors sync, each
/// must hold both writes, and the failed node must have received neither.
#[tokio::test]
async fn test_write_quorum_availability() {
    let mut node_a = AvailabilityNode::new(1);
    let mut node_b = AvailabilityNode::new(2);
    let mut node_c = AvailabilityNode::new(3);
    let nodes = vec![node_a.id, node_b.id, node_c.id];
    node_a.init_shards(&nodes, 4, 3);
    node_b.init_shards(&nodes, 4, 3);
    node_c.init_shards(&nodes, 4, 3);
    // Fail one node
    node_c.fail();
    // Writes should succeed on remaining nodes; keep the hashes to verify convergence.
    let subject = "test:subject";
    let hash_a = node_a
        .write(subject, "pred1", 1000)
        .await
        .expect("Write on A should succeed with quorum available");
    let hash_b = node_b
        .write(subject, "pred2", 2000)
        .await
        .expect("Write on B should succeed with quorum available");
    // Sync between surviving nodes
    node_a.sync_from(&node_b).await;
    node_b.sync_from(&node_a).await;
    // Both should have both writes - check identity, not just the count.
    assert_eq!(node_a.leaves().len(), 2, "Node A should have both assertions");
    assert_eq!(node_b.leaves().len(), 2, "Node B should have both assertions");
    for (name, leaves) in [("A", node_a.leaves()), ("B", node_b.leaves())] {
        assert!(
            leaves.contains(&hash_a) && leaves.contains(&hash_b),
            "Node {name} should hold both writes"
        );
    }
    // The failed node took no part in writes or syncs.
    assert!(node_c.leaves().is_empty(), "Failed node C should not have received writes");
}
/// Test: All replicas eventually have identical data.
///
/// This is the core eventual consistency guarantee.
#[tokio::test]
async fn test_eventual_consistency_across_replicas() {
let mut node_a = AvailabilityNode::new(1);
let mut node_b = AvailabilityNode::new(2);
let mut node_c = AvailabilityNode::new(3);
let nodes = vec![node_a.id, node_b.id, node_c.id];
node_a.init_shards(&nodes, 4, 3);
node_b.init_shards(&nodes, 4, 3);
node_c.init_shards(&nodes, 4, 3);
// Each node writes independently
let h1 = node_a.write("s1", "p1", 1000).await.expect("write");
let h2 = node_b.write("s2", "p2", 2000).await.expect("write");
let h3 = node_c.write("s3", "p3", 3000).await.expect("write");
// Before sync: each has only its own
assert_eq!(node_a.leaves().len(), 1);
assert_eq!(node_b.leaves().len(), 1);
assert_eq!(node_c.leaves().len(), 1);
// Full mesh sync (simulating anti-entropy)
node_a.sync_from(&node_b).await;
node_a.sync_from(&node_c).await;
node_b.sync_from(&node_a).await;
node_b.sync_from(&node_c).await;
node_c.sync_from(&node_a).await;
node_c.sync_from(&node_b).await;
// After sync: all have all data
assert_eq!(node_a.leaves().len(), 3, "Node A should have all 3 assertions");
assert_eq!(node_b.leaves().len(), 3, "Node B should have all 3 assertions");
assert_eq!(node_c.leaves().len(), 3, "Node C should have all 3 assertions");
// Verify specific hashes
let a_leaves: std::collections::HashSet<_> = node_a.leaves().into_iter().collect();
let b_leaves: std::collections::HashSet<_> = node_b.leaves().into_iter().collect();
let c_leaves: std::collections::HashSet<_> = node_c.leaves().into_iter().collect();
assert!(a_leaves.contains(&h1) && a_leaves.contains(&h2) && a_leaves.contains(&h3));
assert!(b_leaves.contains(&h1) && b_leaves.contains(&h2) && b_leaves.contains(&h3));
assert!(c_leaves.contains(&h1) && c_leaves.contains(&h2) && c_leaves.contains(&h3));
// All sets should be identical
assert_eq!(a_leaves, b_leaves, "A and B should have identical data");
assert_eq!(b_leaves, c_leaves, "B and C should have identical data");
}