stemedb/crates/stemedb-sync/tests/convergence.rs
jordan d3a88585fe feat: Phase 6 UAT - Admission control, HLC recency, cluster coordination
This commit covers Phase 6 features, together with related Phase 2A linter work and code reorganization:

## Admission Control (Phase 6 admission middleware)
- AdmissionStore implementation backed by TrustRankStore
- PoW verification with tier-based difficulty computation
- Trust tier progression (Newcomer → Established → Trusted → Authority)
- API integration with admission status endpoints

## HLC Recency Lens (Phase 6C)
- HlcRecencyLens for distributed system ordering
- Hybrid logical clock integration with causality preservation

## Cluster Coordination (Phase 6C)
- Multi-node cluster tests (availability, partition tolerance)
- CRDT convergence tests for anti-entropy sync
- Gateway handler improvements

## Aphoria Code Linter (Phase 2A)
- RFC/OWASP corpus builders with network fetching and caching
- Concept hierarchy with auto-alias creation on conflict detection
- Multiple security extractors (TLS, JWT, CORS, secrets, rate limiting)

## Code Organization
- Split large files into modules to comply with 500-line limit
- Improved test organization with separate test modules
- Fixed rkyv serialization for EigenTrustState (AgentScore struct)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 00:43:37 -07:00

477 lines
18 KiB
Rust

//! Convergence verification tests for distributed consistency.
//!
//! These tests verify that diverged nodes (two or three per test) converge
//! after sync. Individual CRDT properties are tested on their own; end-to-end
//! convergence is what we verify here.
//!
//! # Test Harness
//!
//! Uses in-process "nodes" that each have their own:
//! - `HybridStore` (temp dir)
//! - `CrdtAssertionStore`
//! - `MerkleTree` of assertion hashes
//!
//! Sync is simulated by copying missing assertion bytes between nodes and
//! calling `CrdtAssertionStore::merge_with_data` directly, rather than going
//! through gRPC.
// Allow expect in test code for convenience
#![allow(clippy::expect_used)]
use std::collections::HashMap;
use std::sync::Arc;
use stemedb_core::serde::serialize;
use stemedb_core::testing::AssertionBuilder;
use stemedb_core::types::HlcTimestamp;
use stemedb_lens::{HlcRecencyLens, Lens};
use stemedb_merkle::MerkleTree;
use stemedb_storage::crdt::{AssertionTransfer, CrdtAssertionStore};
use stemedb_storage::HybridStore;
use tempfile::tempdir;
/// A simulated node for convergence testing.
///
/// Each node owns a temp-dir-backed `HybridStore`, a `CrdtAssertionStore`
/// layered on it, a `MerkleTree` of assertion hashes, and a side table of raw
/// assertion bytes that stands in for the network during sync.
struct TestNode {
/// Human-readable label (e.g. "NodeA"); not read by the current tests.
#[allow(dead_code)]
name: String,
/// Identifier given to the CRDT store and stamped into the HLC timestamps of
/// assertions this node creates locally.
node_id: [u8; 16],
/// Direct handle to the backing store. Not read by the current tests
/// (`crdt_store` holds its own `Arc` clone of the same store).
#[allow(dead_code)]
store: Arc<HybridStore>,
/// CRDT layer that local assertions are written to and peer data is merged into.
crdt_store: Arc<CrdtAssertionStore<HybridStore>>,
/// Merkle tree of assertion hashes; its leaves are what the tests treat as
/// this node's assertion set.
merkle_tree: MerkleTree,
/// Maps hash -> (subject, data) for sync operations.
/// In production, this would be stored/retrieved differently.
hash_to_data: HashMap<[u8; 32], (String, Vec<u8>)>,
/// Owns the temporary directory the store lives in. Never read, but it must
/// outlive the store. It is declared last on purpose: struct fields are
/// dropped in declaration order, so the store handles above are released
/// before `TempDir`'s drop removes the directory.
#[allow(dead_code)]
temp_dir: tempfile::TempDir,
}
impl TestNode {
    /// Create a new test node identified by `node_id`.
    ///
    /// The node gets a fresh temporary directory holding its `HybridStore`, a
    /// `CrdtAssertionStore` layered on that store with `node_id`, and an empty
    /// Merkle tree and data table.
    ///
    /// # Panics
    /// Panics if the temp dir or the store cannot be created.
    fn new(name: &str, node_id: [u8; 16]) -> Self {
        let temp_dir = tempdir().expect("create temp dir");
        let store = Arc::new(HybridStore::open(temp_dir.path()).expect("open store"));
        let crdt_store = Arc::new(CrdtAssertionStore::new(Arc::clone(&store), node_id));
        Self {
            name: name.to_string(),
            node_id,
            store,
            crdt_store,
            merkle_tree: MerkleTree::new(),
            hash_to_data: HashMap::new(),
            temp_dir,
        }
    }

    /// Create, store, and index a new assertion on this node.
    ///
    /// The assertion is stamped with an HLC of (`hlc_time`, this node's id),
    /// serialized, written through the CRDT store, appended to the Merkle tree,
    /// and recorded in `hash_to_data` so peers can pull it with `sync_from`.
    /// Returns the hash reported by `put_assertion`.
    ///
    /// # Panics
    /// Panics if serialization, the store write, or the Merkle insert fails.
    async fn add_assertion(&mut self, subject: &str, predicate: &str, hlc_time: u64) -> [u8; 32] {
        let assertion = AssertionBuilder::new()
            .subject(subject)
            .predicate(predicate)
            .hlc_timestamp(HlcTimestamp::new(hlc_time, self.node_id))
            // Fresh source hash per call, so two otherwise-identical
            // assertions are still distinct values.
            .source_hash(rand_hash())
            .build();
        let data = serialize(&assertion).expect("serialize");
        let hash = self.crdt_store.put_assertion(subject, &data).await.expect("put");
        self.merkle_tree.insert(hash).expect("insert");
        self.hash_to_data.insert(hash, (subject.to_string(), data));
        hash
    }

    /// Get the Merkle root of this node (order-sensitive).
    ///
    /// Unused by the current tests (hence `dead_code`); kept for debugging.
    #[allow(dead_code)]
    fn merkle_root(&self) -> Option<[u8; 32]> {
        self.merkle_tree.root().ok()
    }

    /// Get a canonical Merkle root (sorted leaves) for convergence verification.
    ///
    /// The standard Merkle tree preserves insertion order, which differs between
    /// nodes depending on sync timing. For convergence testing, we compute a
    /// canonical root by sorting leaves first, ensuring deterministic comparison.
    /// Returns `None` for an empty tree or if rebuilding the tree fails.
    fn canonical_merkle_root(&self) -> Option<[u8; 32]> {
        let mut sorted_leaves = self.merkle_tree.leaves().to_vec();
        if sorted_leaves.is_empty() {
            return None;
        }
        // Leaves are plain byte arrays, so sort stability is irrelevant.
        sorted_leaves.sort_unstable();
        let mut canonical = MerkleTree::new();
        for leaf in sorted_leaves {
            canonical.insert(leaf).ok()?;
        }
        canonical.root().ok()
    }

    /// Get all leaves (assertion hashes) in the Merkle tree, in insertion order.
    fn leaves(&self) -> Vec<[u8; 32]> {
        self.merkle_tree.leaves().to_vec()
    }

    /// Sync from another node by fetching missing assertions.
    ///
    /// Simulates one direction of anti-entropy. For every leaf in `other`'s
    /// Merkle tree that is not already a local leaf, the recorded bytes are
    /// merged via `CrdtAssertionStore::merge_with_data`. The hash is then
    /// indexed in the local Merkle tree and `hash_to_data`, so it can be
    /// forwarded again on a later transitive sync.
    ///
    /// # Panics
    /// Panics if `other` has a leaf with no recorded data, or if the merge or
    /// the Merkle insert fails. Failing here keeps a broken sync from showing up
    /// later as a confusing root mismatch, or from letting no-op/idempotence
    /// checks pass vacuously.
    async fn sync_from(&mut self, other: &TestNode) {
        let my_leaves: std::collections::HashSet<[u8; 32]> =
            self.merkle_tree.leaves().iter().copied().collect();

        for hash in other.merkle_tree.leaves().iter().copied() {
            if my_leaves.contains(&hash) {
                continue;
            }
            // In a real system, assertion data is fetched via RPC.
            // Here we use the hash_to_data map as our "RPC layer".
            let (subject, data) = other
                .hash_to_data
                .get(&hash)
                .expect("peer leaf has no recorded assertion data");
            let transfer = AssertionTransfer { hash, data: data.clone() };
            self.crdt_store
                .merge_with_data(subject, std::slice::from_ref(&transfer))
                .await
                .expect("merge assertion from peer");
            self.merkle_tree.insert(hash).expect("insert");
            // Record locally so this node can serve the assertion transitively.
            self.hash_to_data.insert(hash, (subject.clone(), transfer.data));
        }
    }
}
/// Generate a fresh, process-unique 32-byte value for use as a test source hash.
///
/// Not cryptographically random.
/// - The first 16 bytes are the current wall-clock time in nanoseconds, so values
///   also differ across test runs.
/// - The last 16 bytes come from a process-wide counter.
///
/// The counter guarantees that no two calls ever return the same value, even
/// when the system clock's resolution is coarser than the gap between calls.
/// A purely time-based value cannot guarantee that.
fn rand_hash() -> [u8; 32] {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static COUNTER: AtomicU64 = AtomicU64::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_nanos())
        .unwrap_or(0);
    // Relaxed suffices: only the uniqueness of each fetch_add result matters,
    // not its ordering relative to other memory operations.
    let seq = u128::from(COUNTER.fetch_add(1, Ordering::Relaxed));

    let mut hash = [0u8; 32];
    hash[..16].copy_from_slice(&nanos.to_le_bytes());
    hash[16..].copy_from_slice(&seq.to_le_bytes());
    hash
}
// =============================================================================
// Convergence Tests
// =============================================================================
/// Test: Two nodes with disjoint assertion sets converge after sync.
///
/// Node A has [A1, A2], Node B has [B1].
/// After sync: both have [A1, A2, B1] and Merkle roots match.
#[tokio::test]
async fn test_two_node_disjoint_convergence() {
let mut node_a = TestNode::new("NodeA", [1u8; 16]);
let mut node_b = TestNode::new("NodeB", [2u8; 16]);
// Node A adds A1, A2
let a1 = node_a.add_assertion("A", "pred1", 1000).await;
let a2 = node_a.add_assertion("A", "pred2", 2000).await;
// Node B adds B1
let b1 = node_b.add_assertion("B", "pred1", 1500).await;
// Before sync: canonical roots differ
assert_ne!(node_a.canonical_merkle_root(), node_b.canonical_merkle_root());
assert_eq!(node_a.leaves().len(), 2);
assert_eq!(node_b.leaves().len(), 1);
// Sync: A pulls from B, B pulls from A
node_a.sync_from(&node_b).await;
node_b.sync_from(&node_a).await;
// After sync: both have all three assertions
let a_leaves: std::collections::HashSet<_> = node_a.leaves().into_iter().collect();
let b_leaves: std::collections::HashSet<_> = node_b.leaves().into_iter().collect();
assert!(a_leaves.contains(&a1), "Node A should have A1");
assert!(a_leaves.contains(&a2), "Node A should have A2");
assert!(a_leaves.contains(&b1), "Node A should have B1 after sync");
assert!(b_leaves.contains(&a1), "Node B should have A1 after sync");
assert!(b_leaves.contains(&a2), "Node B should have A2 after sync");
assert!(b_leaves.contains(&b1), "Node B should have B1");
// Canonical Merkle roots should match (same set of leaves)
assert_eq!(
node_a.canonical_merkle_root(),
node_b.canonical_merkle_root(),
"Canonical Merkle roots should match after convergence"
);
}
/// Test: two nodes with overlapping assertion sets converge without duplicates.
///
/// Node A has [X, Y]; node B has [Y, Z], with Y shared by an earlier sync.
/// After a bidirectional sync both hold exactly [X, Y, Z]. The shared Y must
/// not be merged or indexed a second time.
#[tokio::test]
async fn test_two_node_overlapping_convergence() {
    let mut node_a = TestNode::new("NodeA", [1u8; 16]);
    let mut node_b = TestNode::new("NodeB", [2u8; 16]);

    // Node A adds X, Y
    let x = node_a.add_assertion("X", "pred", 1000).await;
    let y = node_a.add_assertion("Y", "pred", 2000).await;

    // Copy Y to node B to simulate an earlier sync. Setup failures panic here
    // instead of surfacing later as a misleading leaf-count mismatch.
    let (subject, y_data) =
        node_a.hash_to_data.get(&y).cloned().expect("Y should be recorded on node A");
    let transfer = AssertionTransfer { hash: y, data: y_data };
    node_b
        .crdt_store
        .merge_with_data(&subject, std::slice::from_ref(&transfer))
        .await
        .expect("seed Y on node B");
    node_b.merkle_tree.insert(y).expect("insert");
    node_b.hash_to_data.insert(y, (subject, transfer.data));

    // Node B adds Z
    let z = node_b.add_assertion("Z", "pred", 3000).await;

    // Verify initial state
    assert_eq!(node_a.leaves().len(), 2); // X, Y
    assert_eq!(node_b.leaves().len(), 2); // Y, Z

    // Sync both ways
    node_a.sync_from(&node_b).await;
    node_b.sync_from(&node_a).await;

    // Count the raw leaf lists: a HashSet would silently hide duplicate leaves.
    assert_eq!(node_a.leaves().len(), 3, "Node A should have 3 assertions");
    assert_eq!(node_b.leaves().len(), 3, "Node B should have 3 assertions");

    let a_leaves: std::collections::HashSet<_> = node_a.leaves().into_iter().collect();
    let b_leaves: std::collections::HashSet<_> = node_b.leaves().into_iter().collect();
    for (label, hash) in [("X", x), ("Y", y), ("Z", z)] {
        assert!(a_leaves.contains(&hash), "Node A should have {}", label);
        assert!(b_leaves.contains(&hash), "Node B should have {}", label);
    }

    // Canonical Merkle roots should match
    assert_eq!(node_a.canonical_merkle_root(), node_b.canonical_merkle_root());
}
/// Test: HlcRecencyLens produces the same winner on both nodes after convergence.
///
/// Two nodes write competing assertions for the same subject with different
/// HLC times and then sync. After that, both nodes must hold both assertions,
/// and `HlcRecencyLens` must pick the newer one regardless of the order in
/// which the candidates are presented.
#[tokio::test]
async fn test_lens_determinism_after_convergence() {
    let mut node_a = TestNode::new("NodeA", [1u8; 16]);
    let mut node_b = TestNode::new("NodeB", [2u8; 16]);

    // Node A: older assertion (HLC time 1000); Node B: newer (HLC time 2000).
    let older = node_a.add_assertion("S1", "pred", 1000).await;
    let newer = node_b.add_assertion("S1", "pred", 2000).await;

    // Sync both ways
    node_a.sync_from(&node_b).await;
    node_b.sync_from(&node_a).await;

    // Both competing assertions must have reached both nodes.
    for (name, node) in [("A", &node_a), ("B", &node_b)] {
        let leaves = node.leaves();
        assert!(leaves.contains(&older), "Node {} should hold the older assertion", name);
        assert!(leaves.contains(&newer), "Node {} should hold the newer assertion", name);
    }
    assert_eq!(
        node_a.canonical_merkle_root(),
        node_b.canonical_merkle_root(),
        "Canonical Merkle roots should match after sync"
    );

    // The lens runs on locally rebuilt equivalents of the synced assertions,
    // not on values read back from the stores. What this checks is that the
    // winner does not depend on input order (each node may list the candidates
    // differently).
    let lens = HlcRecencyLens;
    let older_assertion = AssertionBuilder::new()
        .subject("S1")
        .predicate("pred")
        .hlc_timestamp(HlcTimestamp::new(1000, [1u8; 16]))
        .build();
    let newer_assertion = AssertionBuilder::new()
        .subject("S1")
        .predicate("pred")
        .hlc_timestamp(HlcTimestamp::new(2000, [2u8; 16]))
        .build();

    // "Node A" sees older first; "Node B" sees newer first.
    let resolution_a = lens.resolve(&[older_assertion.clone(), newer_assertion.clone()]);
    let resolution_b = lens.resolve(&[newer_assertion, older_assertion]);

    let winner_a = resolution_a.winner.as_ref().expect("Node A resolution should have a winner");
    let winner_b = resolution_b.winner.as_ref().expect("Node B resolution should have a winner");
    assert_eq!(
        winner_a.hlc_timestamp.time_ntp64, winner_b.hlc_timestamp.time_ntp64,
        "Both nodes should resolve to the same HLC time"
    );
    assert_eq!(
        winner_a.hlc_timestamp.time_ntp64, 2000,
        "Winner should be the assertion with HLC 2000"
    );
}
/// Test: three nodes converge through a chain of one-way syncs.
///
/// Each node starts with one assertion. Data flows A → B → C → A, then a
/// second A → B → C pass finishes propagation. Afterwards all three nodes
/// hold every assertion and share one canonical Merkle root.
#[tokio::test]
async fn test_three_node_chain_convergence() {
    let mut node_a = TestNode::new("NodeA", [1u8; 16]);
    let mut node_b = TestNode::new("NodeB", [2u8; 16]);
    let mut node_c = TestNode::new("NodeC", [3u8; 16]);

    let a1 = node_a.add_assertion("A", "pred", 1000).await;
    let b1 = node_b.add_assertion("B", "pred", 2000).await;
    let c1 = node_c.add_assertion("C", "pred", 3000).await;

    // First lap around the ring:
    //   B <- A  => B = {B1, A1}
    //   C <- B  => C = {C1, B1, A1}
    //   A <- C  => A = {A1, C1, B1}
    node_b.sync_from(&node_a).await;
    node_c.sync_from(&node_b).await;
    node_a.sync_from(&node_c).await;

    // Second pass: B picks up C1; C already has everything.
    node_b.sync_from(&node_a).await;
    node_c.sync_from(&node_b).await;

    for (name, node) in [("A", &node_a), ("B", &node_b), ("C", &node_c)] {
        let leaves: std::collections::HashSet<_> = node.leaves().into_iter().collect();
        assert!(leaves.contains(&a1), "Node {} should have A1", name);
        assert!(leaves.contains(&b1), "Node {} should have B1", name);
        assert!(leaves.contains(&c1), "Node {} should have C1", name);
    }

    let (root_a, root_b, root_c) = (
        node_a.canonical_merkle_root(),
        node_b.canonical_merkle_root(),
        node_c.canonical_merkle_root(),
    );
    assert_eq!(root_a, root_b);
    assert_eq!(root_b, root_c);
}
/// Test: merge commutativity — merge(A, B) == merge(B, A).
///
/// CRDT property: applying the same assertions in a different order must
/// yield identical content hashes and an identical canonical Merkle root.
#[tokio::test]
async fn test_merge_commutativity() {
    // Two independent replicas; sharing a node_id keeps their hashes comparable.
    let mut node_ab = TestNode::new("NodeAB", [1u8; 16]);
    let mut node_ba = TestNode::new("NodeBA", [1u8; 16]);

    // Build and serialize one assertion about subject "X" with a fixed source hash.
    let encode = |predicate: &str, hlc_time: u64, origin: u8, source: u8| {
        let assertion = AssertionBuilder::new()
            .subject("X")
            .predicate(predicate)
            .hlc_timestamp(HlcTimestamp::new(hlc_time, [origin; 16]))
            .source_hash([source; 32])
            .build();
        serialize(&assertion).expect("serialize")
    };

    // Two assertions originating on "Node A", one on "Node B".
    let data_a1 = encode("p1", 1000, 1, 10);
    let data_a2 = encode("p2", 2000, 1, 20);
    let data_b1 = encode("p3", 1500, 2, 30);

    // Order AB: node A's assertions land first, then node B's.
    let mut ab_hashes = [[0u8; 32]; 3];
    for (slot, data) in ab_hashes.iter_mut().zip([&data_a1, &data_a2, &data_b1]) {
        *slot = node_ab.crdt_store.put_assertion("X", data).await.expect("put");
    }
    for hash in ab_hashes {
        node_ab.merkle_tree.insert(hash).expect("insert");
    }
    let [hash_a1, hash_a2, hash_b1] = ab_hashes;

    // Order BA: node B's assertion lands first, then node A's.
    let mut ba_hashes = [[0u8; 32]; 3];
    for (slot, data) in ba_hashes.iter_mut().zip([&data_b1, &data_a1, &data_a2]) {
        *slot = node_ba.crdt_store.put_assertion("X", data).await.expect("put");
    }
    for hash in ba_hashes {
        node_ba.merkle_tree.insert(hash).expect("insert");
    }
    let [hash_b1_2, hash_a1_2, hash_a2_2] = ba_hashes;

    // Content addressing: each assertion hashes the same on both replicas.
    assert_eq!(hash_a1, hash_a1_2, "A1 hash should be deterministic");
    assert_eq!(hash_a2, hash_a2_2, "A2 hash should be deterministic");
    assert_eq!(hash_b1, hash_b1_2, "B1 hash should be deterministic");

    assert_eq!(
        node_ab.canonical_merkle_root(),
        node_ba.canonical_merkle_root(),
        "Merge order should not affect final state"
    );
}
/// Test: pulling from a peer that holds no assertions leaves the local node unchanged.
#[tokio::test]
async fn test_empty_sync_is_noop() {
    let mut node_a = TestNode::new("NodeA", [1u8; 16]);
    let empty_peer = TestNode::new("NodeB", [2u8; 16]);

    let a1 = node_a.add_assertion("A", "pred", 1000).await;
    let snapshot = node_a.canonical_merkle_root();

    node_a.sync_from(&empty_peer).await;

    // Root is untouched and the local assertion is still present.
    assert_eq!(node_a.canonical_merkle_root(), snapshot);
    assert!(node_a.leaves().contains(&a1));
}
/// Test: idempotent sync — syncing twice from the same peer doesn't change state.
///
/// The first sync must actually transfer B's assertion. Otherwise both syncs
/// would be no-ops and the idempotence check would pass vacuously. The second
/// sync must then leave the leaf list and the canonical root untouched.
#[tokio::test]
async fn test_idempotent_sync() {
    let mut node_a = TestNode::new("NodeA", [1u8; 16]);
    let mut node_b = TestNode::new("NodeB", [2u8; 16]);

    // Both nodes have assertions
    let a1 = node_a.add_assertion("A", "pred", 1000).await;
    let b1 = node_b.add_assertion("B", "pred", 2000).await;

    // First sync: must bring over exactly B1.
    node_a.sync_from(&node_b).await;
    let leaves_after_first_sync = node_a.leaves();
    assert_eq!(leaves_after_first_sync.len(), 2, "First sync should add exactly one assertion");
    assert!(leaves_after_first_sync.contains(&a1), "Node A should keep A1");
    assert!(leaves_after_first_sync.contains(&b1), "First sync should transfer B1");
    let root_after_first_sync = node_a.canonical_merkle_root();

    // Second sync (should be no-op)
    node_a.sync_from(&node_b).await;
    let root_after_second_sync = node_a.canonical_merkle_root();
    let leaves_after_second_sync = node_a.leaves();

    assert_eq!(
        root_after_first_sync, root_after_second_sync,
        "Syncing twice should not change state"
    );
    // Compare the full ordered leaf lists, not just their lengths.
    assert_eq!(
        leaves_after_first_sync, leaves_after_second_sync,
        "Leaves should not change on re-sync"
    );
}