stemedb/crates/stemedb-chaos/tests/consistency_tests.rs
jordan b3e8a9a058 feat: Multi-application expansion with chaos testing and community UI
Major additions:
- Community Next.js app (port 18187) for browsing claims with API docs
- stemedb-chaos crate: Fault injection, chaos testing, CRDT properties
- Latent ingestion system: Reddit/FDA ingesters with ADK-Go agents
- Disputed claims handling: Manual review workflows and validation
- Aphoria security scanner: New extractors (SQL injection, command
  injection, weak crypto, TLS version), policy-based ignores, UAT reports
- Docker infrastructure: Dockerfile, docker-compose.yml for full stack
- VulnBank demo: Intentionally vulnerable multi-language test corpus

SDK & API enhancements:
- Source registry handlers for tracking data provenance
- Metrics endpoint
- Skeptic filtering improvements

Code quality:
- Split 14 large files (>500 lines) into focused modules
- All files now under 500-line limit per project guidelines

Documentation:
- Chaos testing guide, circuit breakers, observability docs
- Phase 7 UAT documentation updates
- Martin Kleppmann technical writer agent

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 01:24:14 -07:00

490 lines
17 KiB
Rust

//! Jepsen-style consistency tests for CRDT invariants and HLC behavior.
//!
//! These tests verify:
//! - CRDT eventual consistency
//! - CRDT commutativity, associativity, and idempotence
//! - HLC behavior under clock skew
//! - Correct supersession ordering with skewed clocks
//!
//! # Reference
//!
//! Inspired by Jepsen testing methodology for distributed systems.
#![allow(clippy::unwrap_used, clippy::expect_used)]
use std::collections::HashSet;
use stemedb_chaos::crdt_properties::{
verify_all_properties, verify_associativity, verify_commutativity, verify_eventual_consistency,
verify_idempotence,
};
use stemedb_chaos::{
TestCluster, TestClusterAccessExt, TestClusterConvergenceExt, TestClusterCreationExt,
TestClusterSyncExt,
};
/// Test: CRDT eventual consistency across 5 nodes.
///
/// Distributes 1000 uniquely-named assertions round-robin over a five-node
/// cluster, runs a full `sync_all`, and then requires that:
/// - every node reports exactly 1000 distinct leaves,
/// - every node's leaf set is equal to node 0's,
/// - the cluster passes `assert_converged`.
#[tokio::test]
async fn test_crdt_eventual_consistency() {
    let mut cluster = TestCluster::spawn(5).await.expect("spawn cluster");

    // Assertion `i` is written to node `i % 5`, so each node originates 200.
    for i in 0..1000 {
        cluster
            .get_node_mut(i % 5)
            .write_assertion(&format!("concurrent:{i}"), "pred", 1000 + i as u64)
            .await
            .expect("write");
    }

    // Propagate every node's writes to every other node.
    cluster.sync_all().await.expect("sync");

    // Snapshot each node's leaves as a set so that ordering is irrelevant.
    let mut sets: Vec<HashSet<[u8; 32]>> = Vec::with_capacity(5);
    for node_idx in 0..5 {
        sets.push(cluster.get_node(node_idx).leaves().into_iter().collect());
    }

    // Every node must hold the complete population.
    for (i, set) in sets.iter().enumerate() {
        assert_eq!(set.len(), 1000, "Node {i} should have 1000 assertions");
    }

    // Node 0 is the reference; every other node must match it exactly.
    for (i, other) in sets.iter().enumerate().skip(1) {
        assert_eq!(sets[0], *other, "Node 0 and node {i} should have identical sets");
    }

    // Finally, let the cluster check its own convergence criterion.
    cluster.assert_converged();
}
/// Test: CRDT commutativity.
///
/// Node 0 ingests subjects `a:0..a:2` in ascending order while node 1 ingests
/// subjects `b:2..b:0` in descending order. The two hash lists are checked
/// with `verify_commutativity`, then the nodes are synced in both directions
/// and their canonical Merkle roots must be equal.
#[tokio::test]
async fn test_crdt_commutativity() {
    let mut cluster = TestCluster::spawn(2).await.expect("spawn cluster");

    // Node 0: three assertions written in ascending index order.
    let mut ascending = Vec::new();
    for i in 0..3 {
        ascending.push(
            cluster
                .get_node_mut(0)
                .write_assertion(&format!("a:{i}"), "pred", 1000 + i as u64)
                .await
                .expect("write"),
        );
    }

    // Node 1: three assertions written in descending index order. The
    // subjects differ from node 0's, so the two nodes start out disjoint.
    let mut descending = Vec::new();
    for i in [2, 1, 0] {
        descending.push(
            cluster
                .get_node_mut(1)
                .write_assertion(&format!("b:{i}"), "pred", 2000 + i as u64)
                .await
                .expect("write"),
        );
    }

    // Property-level check on the raw hash lists.
    assert!(verify_commutativity(&ascending, &descending));

    // Cluster-level check: sync 0 -> 1, then 1 -> 0.
    cluster.sync_pair(0, 1).await.expect("sync");
    cluster.sync_pair(1, 0).await.expect("sync");

    // The canonical root must not depend on the order in which data arrived.
    let root_0 = cluster.get_node(0).canonical_merkle_root();
    let root_1 = cluster.get_node(1).canonical_merkle_root();
    assert_eq!(root_0, root_1, "Canonical roots should match regardless of merge order");
}
/// Test: CRDT associativity.
///
/// Targets the law `(A merge B) merge C = A merge (B merge C)`.
///
/// Each of three nodes starts with one distinct assertion. The hashes are
/// first checked with `verify_associativity`; the nodes are then synced
/// pairwise in two different groupings and node 0's canonical Merkle root is
/// compared after each grouping.
///
/// Note: the second grouping runs on the same cluster after the first one has
/// completed, so it exercises "further merges leave the converged root
/// unchanged" rather than an independent merge from scratch.
#[tokio::test]
async fn test_crdt_associativity() {
    let mut cluster = TestCluster::spawn(3).await.expect("spawn cluster");

    // Disjoint starting state: node 0 = {X}, node 1 = {Y}, node 2 = {Z}.
    let hash_x = cluster.get_node_mut(0).write_assertion("x", "pred", 1000).await.expect("write X");
    let hash_y = cluster.get_node_mut(1).write_assertion("y", "pred", 2000).await.expect("write Y");
    let hash_z = cluster.get_node_mut(2).write_assertion("z", "pred", 3000).await.expect("write Z");

    // Property-level check on singleton sets.
    assert!(verify_associativity(&[hash_x], &[hash_y], &[hash_z]));

    // Grouping 1, step 1: merge A and B in both directions.
    cluster.sync_pair(0, 1).await.expect("sync A->B");
    cluster.sync_pair(1, 0).await.expect("sync B->A");

    // Nodes 0 and 1 should now both hold X and Y.
    for node in 0..2 {
        assert_eq!(cluster.get_node(node).assertion_count(), 2);
    }

    // Grouping 1, step 2: merge the combined AB state with C.
    cluster.sync_pair(0, 2).await.expect("sync AB->C");
    cluster.sync_pair(2, 0).await.expect("sync C->AB");
    let root_ab_then_c = cluster.get_node(0).canonical_merkle_root();

    // Grouping 2, step 1: merge B and C in both directions.
    cluster.sync_pair(1, 2).await.expect("sync B->C");
    cluster.sync_pair(2, 1).await.expect("sync C->B");

    // Grouping 2, step 2: merge A with the combined BC state.
    cluster.sync_pair(0, 1).await.expect("sync A->BC");
    cluster.sync_pair(1, 0).await.expect("sync BC->A");
    let root_a_then_bc = cluster.get_node(0).canonical_merkle_root();

    // Node 0's root must be the same after either grouping.
    assert_eq!(root_ab_then_c, root_a_then_bc, "Merge should be associative");

    // Every node ends up with X, Y and Z.
    for node in 0..3 {
        assert_eq!(cluster.get_node(node).assertion_count(), 3);
    }
}
/// Test: CRDT idempotence.
///
/// Writes ten assertions to node 0, replicates them to node 1, and then
/// repeats the sync in both directions five more times. Node 0's canonical
/// Merkle root and both nodes' assertion counts must be unchanged by the
/// repeated syncs, i.e. re-merging already-known data creates no duplicates.
#[tokio::test]
async fn test_crdt_idempotence() {
    let mut cluster = TestCluster::spawn(2).await.expect("spawn cluster");

    // Only node 0 originates data in this test.
    for i in 0..10 {
        cluster
            .get_node_mut(0)
            .write_assertion(&format!("idempotent:{i}"), "pred", 1000 + i as u64)
            .await
            .expect("write");
    }

    // Property-level check on node 0's leaves.
    let hashes = cluster.get_node(0).leaves();
    assert!(verify_idempotence(&hashes));

    // First replication: node 0 -> node 1.
    cluster.sync_pair(0, 1).await.expect("sync");
    for node in 0..2 {
        assert_eq!(cluster.get_node(node).assertion_count(), 10);
    }

    // Baseline taken after the first sync, before the redundant ones.
    let baseline_root = cluster.get_node(0).canonical_merkle_root();
    let baseline_count = cluster.get_node(0).assertion_count();

    // Redundant syncs in both directions; none of them carries new data.
    for _round in 0..5 {
        cluster.sync_pair(0, 1).await.expect("sync");
        cluster.sync_pair(1, 0).await.expect("sync");
    }

    // Nothing may have changed relative to the baseline.
    assert_eq!(cluster.get_node(0).canonical_merkle_root(), baseline_root);
    assert_eq!(cluster.get_node(0).assertion_count(), baseline_count);
    assert_eq!(cluster.get_node(1).assertion_count(), baseline_count);
}
/// Test: HLC handles clock skew.
///
/// Gives node 1 a +5000 ms offset and node 2 a -5000 ms offset (node 0 is
/// left untouched), confirms the clock controller and the nodes report those
/// offsets, then writes one assertion per node and checks that the cluster
/// still converges to three assertions everywhere.
#[tokio::test]
async fn test_hlc_handles_clock_skew() {
    let mut cluster = TestCluster::spawn(3).await.expect("spawn cluster");

    // Skew two of the three nodes in opposite directions.
    cluster.clock().inject_skew(1, 5000); // +5 seconds
    cluster.clock().inject_skew(2, -5000); // -5 seconds

    // The controller should flag both the 10 s and the 5 s gap as significant.
    assert!(
        cluster.clock().has_significant_skew(1, 2),
        "Should detect 10s skew between nodes 1 and 2"
    );
    assert!(
        cluster.clock().has_significant_skew(0, 1),
        "Should detect 5s skew between nodes 0 and 1"
    );

    // The controller should echo back the configured offsets.
    assert_eq!(cluster.clock().get_offset(1), 5000, "Node 1 should have +5s offset");
    assert_eq!(cluster.clock().get_offset(2), -5000, "Node 2 should have -5s offset");

    // One write per node, each made under that node's skewed clock.
    for i in 0..3 {
        cluster
            .get_node_mut(i)
            .write_assertion(&format!("skewed:{i}"), "pred", 1000 + i as u64)
            .await
            .expect("write");
    }

    // Each node must report the offset that was injected for it.
    for &(node, expected_offset) in &[(0, 0), (1, 5000), (2, -5000)] {
        assert_eq!(cluster.get_node(node).clock_offset_ms(), expected_offset);
    }

    // Skew must not prevent convergence.
    cluster.sync_all().await.expect("sync");
    cluster.assert_converged();
    for node in 0..3 {
        assert_eq!(cluster.get_node(node).assertion_count(), 3);
    }
}
/// Test: HLC monotonicity under partition.
///
/// With nodes 0 and 1 partitioned from each other, samples each node's
/// `current_hlc` 100 times (one sample before every write) and requires each
/// node's sample sequence to be non-decreasing. The partition is then healed
/// and the cluster must converge to all 200 assertions.
#[tokio::test]
async fn test_hlc_monotonic_under_partition() {
    let mut cluster = TestCluster::spawn(2).await.expect("spawn cluster");

    // Isolate the two nodes from each other.
    cluster.network().partition(&[0], &[1]);

    // Per-node log of sampled HLC values.
    let mut timestamps_0 = Vec::with_capacity(100);
    let mut timestamps_1 = Vec::with_capacity(100);

    for i in 0..100 {
        // Sample both clocks first ...
        timestamps_0.push(cluster.get_node(0).current_hlc());
        timestamps_1.push(cluster.get_node(1).current_hlc());

        // ... then write on both nodes so the clocks have a reason to move.
        cluster
            .get_node_mut(0)
            .write_assertion(&format!("mono0:{i}"), "pred", 1000 + i as u64)
            .await
            .expect("write");
        cluster
            .get_node_mut(1)
            .write_assertion(&format!("mono1:{i}"), "pred", 2000 + i as u64)
            .await
            .expect("write");
    }

    // Each adjacent pair of samples must be in non-decreasing order.
    for (i, window) in timestamps_0.windows(2).enumerate() {
        let (earlier, later) = (&window[0], &window[1]);
        assert!(
            earlier <= later,
            "Node 0 timestamp {} not monotonic at index {i}",
            earlier.time_ntp64
        );
    }
    for (i, window) in timestamps_1.windows(2).enumerate() {
        let (earlier, later) = (&window[0], &window[1]);
        assert!(
            earlier <= later,
            "Node 1 timestamp {} not monotonic at index {i}",
            earlier.time_ntp64
        );
    }

    // Reconnect the nodes and let them exchange their 100 writes each.
    cluster.network().heal();
    cluster.sync_all().await.expect("sync");

    // After healing, both nodes hold the union: 200 assertions.
    cluster.assert_converged();
    for node in 0..2 {
        assert_eq!(cluster.get_node(node).assertion_count(), 200);
    }
}
/// Test: Supersession ordering with clock skew.
///
/// Injects a +2000 ms offset on node 1 and checks that an HLC sample from
/// node 1 is between 1.5 s and 2.5 s ahead of a sample from node 0. It then
/// writes epoch `v1` on node 0 and epoch `v2` (predicate `supersedes:v1`) on
/// node 1, syncs, and verifies that all three nodes hold both assertions.
///
/// Only presence of both epochs is asserted here; the relative ordering of
/// the two assertions is not inspected.
#[tokio::test]
async fn test_supersession_ordering_with_clock_skew() {
    // NTP64 keeps whole seconds in the upper 32 bits, so one second is 2^32.
    const ONE_SECOND_NTP64: u64 = 1 << 32;

    let mut cluster = TestCluster::spawn(3).await.expect("spawn cluster");

    // Node 1 runs two seconds fast; node 0 is the unskewed reference.
    cluster.clock().inject_skew(1, 2000);
    assert_eq!(cluster.get_node(0).clock_offset_ms(), 0, "Node 0 should have no skew");
    assert_eq!(cluster.get_node(1).clock_offset_ms(), 2000, "Node 1 should have +2s skew");

    // Sample both clocks back to back so wall-clock drift between the two
    // reads stays small compared to the injected skew.
    let ts_node0 = cluster.get_node(0).current_hlc();
    let ts_node1 = cluster.get_node(1).current_hlc();
    let diff_ntp64 = ts_node1.time_ntp64.saturating_sub(ts_node0.time_ntp64);

    // Accept anything from 1.5 s to 2.5 s to absorb execution time.
    let tolerated = (ONE_SECOND_NTP64 * 3 / 2)..=(ONE_SECOND_NTP64 * 5 / 2);
    assert!(
        tolerated.contains(&diff_ntp64),
        "Node 1 timestamp should be ~2s ahead, got diff: {} seconds",
        diff_ntp64 as f64 / ONE_SECOND_NTP64 as f64
    );

    // Epoch E1 originates on the unskewed node.
    let hash_e1 = cluster
        .get_node_mut(0)
        .write_assertion("epoch:v1", "supersedes:none", 1000)
        .await
        .expect("write E1");

    // Epoch E2, which names v1 as superseded, originates on the fast node.
    let hash_e2 = cluster
        .get_node_mut(1)
        .write_assertion("epoch:v2", "supersedes:v1", 2000)
        .await
        .expect("write E2");

    cluster.sync_all().await.expect("sync");

    // Storage is append-only: superseding must not remove the older epoch.
    cluster.assert_converged();
    for i in 0..3 {
        let leaves = cluster.get_node(i).leaves();
        assert!(leaves.contains(&hash_e1), "Node {i} should have E1");
        assert!(leaves.contains(&hash_e2), "Node {i} should have E2");
    }
}
/// Test: Concurrent writes to same subject under partition.
///
/// While partitioned, node 0 and node 1 each write a different assertion for
/// the subject `drug:aspirin`. After the partition heals and the nodes sync,
/// both assertions must be present on both nodes (nothing is overwritten) and
/// the cluster must be converged.
#[tokio::test]
async fn test_concurrent_writes_same_subject_under_partition() {
    let mut cluster = TestCluster::spawn(2).await.expect("spawn cluster");

    // Cut the link so neither node can see the other's write.
    cluster.network().partition(&[0], &[1]);

    // Conflicting claims about the same subject, one per side.
    let hash_0 = cluster
        .get_node_mut(0)
        .write_assertion("drug:aspirin", "dosage:100mg", 1000)
        .await
        .expect("write from 0");
    let hash_1 = cluster
        .get_node_mut(1)
        .write_assertion("drug:aspirin", "dosage:200mg", 2000)
        .await
        .expect("write from 1");

    // Distinct content must map to distinct hashes.
    assert_ne!(hash_0, hash_1, "Different values should produce different hashes");

    // Restore connectivity and reconcile.
    cluster.network().heal();
    cluster.sync_all().await.expect("sync");

    // Neither write may be lost on either node.
    for i in 0..2 {
        let leaves = cluster.get_node(i).leaves();
        assert!(leaves.contains(&hash_0), "Node {i} should have assertion from node 0");
        assert!(leaves.contains(&hash_1), "Node {i} should have assertion from node 1");
    }

    // Exactly the two conflicting assertions exist on each node.
    for node in 0..2 {
        assert_eq!(cluster.get_node(node).assertion_count(), 2);
    }
    cluster.assert_converged();
}
/// Test: Large Merkle diff eventual convergence.
///
/// Starts two nodes with very different, fully disjoint populations (1500
/// assertions on node 0, 500 on node 1), runs `sync_all` three times, and
/// requires both nodes to end up converged with all 2000 assertions.
#[tokio::test]
async fn test_large_merkle_diff_eventual_convergence() {
    let mut cluster = TestCluster::spawn(2).await.expect("spawn cluster");

    // Node 0: the large side of the diff.
    for i in 0..1500 {
        cluster
            .get_node_mut(0)
            .write_assertion(&format!("large_a:{i}"), "pred", 1000 + i as u64)
            .await
            .expect("write to 0");
    }

    // Node 1: the small side of the diff.
    for i in 0..500 {
        cluster
            .get_node_mut(1)
            .write_assertion(&format!("large_b:{i}"), "pred", 2000 + i as u64)
            .await
            .expect("write to 1");
    }

    // Sanity check the starting imbalance before any sync happens.
    assert_eq!(cluster.get_node(0).assertion_count(), 1500);
    assert_eq!(cluster.get_node(1).assertion_count(), 500);

    // Allow up to three full sync rounds for the diff to be reconciled.
    for _round in 0..3 {
        cluster.sync_all().await.expect("sync");
    }

    // Both nodes must now hold the union of the two populations.
    cluster.assert_converged();
    for i in 0..2 {
        let count = cluster.get_node(i).assertion_count();
        assert_eq!(count, 2000, "Node {i} should have 2000 assertions after convergence");
    }
}
/// Test: All CRDT properties over fixed, overlapping input sets.
///
/// Builds three hash sets whose byte values overlap (`0..10`, `5..15`,
/// `10..20`, each hash being 32 copies of one byte) and feeds them to
/// `verify_all_properties`. The aggregate result is asserted first, so a
/// failure reports the full list from `failures()`; the individual flags are
/// asserted afterwards to document exactly which properties are covered.
///
/// This is a plain synchronous test: nothing here awaits, so no async runtime
/// is started for it.
#[test]
fn test_all_crdt_properties() {
    // Deterministic inputs: each hash is a single byte repeated 32 times.
    let set_a: Vec<[u8; 32]> = (0..10).map(|byte| [byte; 32]).collect();
    let set_b: Vec<[u8; 32]> = (5..15).map(|byte| [byte; 32]).collect();
    let set_c: Vec<[u8; 32]> = (10..20).map(|byte| [byte; 32]).collect();

    let result = verify_all_properties(&set_a, &set_b, &set_c);

    // Aggregate check first so that every failing property is listed at once.
    assert!(result.all_passed(), "CRDT properties failed: {:?}", result.failures());

    // Per-property checks.
    assert!(result.commutative, "Commutativity failed");
    assert!(result.associative, "Associativity failed");
    assert!(result.idempotent_a, "Idempotence A failed");
    assert!(result.idempotent_b, "Idempotence B failed");
    assert!(result.idempotent_c, "Idempotence C failed");
}
/// Test: Eventual consistency property verification.
///
/// Passes 50 fixed hashes (byte values `0..50`, each repeated 32 times) to
/// `verify_eventual_consistency` and requires it to return `true`.
///
/// This is a plain synchronous test: nothing here awaits, so no async runtime
/// is started for it.
#[test]
fn test_eventual_consistency_property() {
    // Deterministic stand-in for a sequence of operations.
    let operations: Vec<[u8; 32]> = (0..50).map(|byte| [byte; 32]).collect();

    assert!(verify_eventual_consistency(&operations), "Eventual consistency property should hold");
}