stemedb/crates/stemedb-chaos/tests/partition_tests.rs
jordan b3e8a9a058 feat: Multi-application expansion with chaos testing and community UI
Major additions:
- Community Next.js app (port 18187) for browsing claims with API docs
- stemedb-chaos crate: Fault injection, chaos testing, CRDT properties
- Latent ingestion system: Reddit/FDA ingesters with ADK-Go agents
- Disputed claims handling: Manual review workflows and validation
- Aphoria security scanner: New extractors (SQL injection, command
  injection, weak crypto, TLS version), policy-based ignores, UAT reports
- Docker infrastructure: Dockerfile, docker-compose.yml for full stack
- VulnBank demo: Intentionally vulnerable multi-language test corpus

SDK & API enhancements:
- Source registry handlers for tracking data provenance
- Metrics endpoint
- Skeptic filtering improvements

Code quality:
- Split 14 large files (>500 lines) into focused modules
- All files now under 500-line limit per project guidelines

Documentation:
- Chaos testing guide, circuit breakers, observability docs
- Phase 7 UAT documentation updates
- Martin Kleppmann technical writer agent

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 01:24:14 -07:00

356 lines
12 KiB
Rust

//! Partition tolerance tests for distributed consistency.
//!
//! These tests verify that StemeDB clusters:
//! - Continue accepting writes during network partitions
//! - Converge correctly after partition heals
//! - Handle node failures and recovery
//! - Survive cascading failures
//!
//! # Test Strategy
//!
//! We use the `TestCluster` harness with `NetworkController` to simulate
//! partitions without requiring real network operations.
#![allow(clippy::unwrap_used, clippy::expect_used)]
use std::time::Duration;
use stemedb_chaos::{
TestCluster, TestClusterAccessExt, TestClusterConvergenceExt, TestClusterCreationExt,
TestClusterLifecycleExt, TestClusterSyncExt,
};
/// Test: a 5-node cluster that loses 2 nodes converges again after recovery.
///
/// Scenario:
/// - one baseline write on node 0 is synced to the whole cluster
/// - nodes 3 and 4 are killed; nodes 0, 1 and 2 each accept one more write
/// - the three survivors sync and must converge on their own
/// - nodes 3 and 4 are restarted, are expected to report zero assertions,
///   and then catch up through a full sync
/// - all 5 nodes must end converged with 4 assertions each
#[tokio::test]
async fn test_5_node_kill_2_convergence() {
    let mut cluster = TestCluster::spawn(5).await.expect("spawn cluster");

    // Seed one baseline assertion on node 0 and replicate it to every node.
    cluster.get_node_mut(0).write_assertion("baseline:1", "pred", 1000).await.expect("write");
    cluster.sync_all().await.expect("sync baseline");
    cluster.assert_converged();

    // Take nodes 3 and 4 down.
    for victim in [3, 4] {
        cluster.kill_node(victim);
    }

    // One additional write lands on each of the remaining nodes.
    let survivor_writes =
        [(0, "after_kill:1", 2000), (1, "after_kill:2", 3000), (2, "after_kill:3", 4000)];
    for (writer, subject, num) in survivor_writes {
        cluster.get_node_mut(writer).write_assertion(subject, "pred", num).await.expect("write");
    }

    // Sync while two nodes are dead; exactly three nodes should be alive
    // and they must agree with each other.
    cluster.sync_all().await.expect("sync survivors");
    assert_eq!(cluster.alive_node_indices().len(), 3);
    cluster.assert_converged();

    // Bring the dead nodes back. The test expects them to come up empty,
    // i.e. they need a sync before they hold any data.
    cluster.restart_node(3).expect("restart 3");
    cluster.restart_node(4).expect("restart 4");
    for revived in [3, 4] {
        assert_eq!(cluster.get_node(revived).assertion_count(), 0);
    }

    // A full sync must bring every node to the same state.
    cluster.sync_all().await.expect("sync all");
    cluster.assert_converged();

    // 1 baseline + 3 writes made while nodes 3 and 4 were down.
    (0..5).for_each(|i| {
        assert_eq!(cluster.get_node(i).assertion_count(), 4, "Node {i} should have 4 assertions");
    });
}
/// Test: two partitioned groups converge once the partition is healed.
///
/// Scenario:
/// - the cluster is split into `[0, 1, 2]` and `[3, 4]`
/// - each side accepts one write (node 0 and node 3 respectively)
/// - a sync during the partition must leave every node with exactly one
///   assertion and the cluster as a whole NOT converged
/// - after healing and syncing, every node must hold both assertions
#[tokio::test]
async fn test_partition_between_groups_convergence() {
    let mut cluster = TestCluster::spawn(5).await.expect("spawn cluster");

    // Split the cluster: nodes 0-2 on one side, nodes 3-4 on the other.
    cluster.network().partition(&[0, 1, 2], &[3, 4]);

    // One write per side of the partition.
    let side_writes = [(0, "group1:data", "value_a", 1000, "write A"), (3, "group2:data", "value_b", 2000, "write B")];
    for (writer, subject, second, num, label) in side_writes {
        cluster.get_node_mut(writer).write_assertion(subject, second, num).await.expect(label);
    }

    // This sync runs while the partition is still in place.
    cluster.sync_all().await.expect("sync");

    // Isolation check: every node holds exactly one assertion (its own
    // side's write), so nothing crossed the partition.
    for i in 0..5 {
        assert_eq!(cluster.get_node(i).assertion_count(), 1);
    }

    // The two sides hold different data, so the cluster must not report
    // convergence yet.
    assert!(!cluster.is_converged(), "Should not be converged during partition");

    // Remove the partition and sync again.
    cluster.network().heal();
    cluster.sync_all().await.expect("sync after heal");

    // Both writes must now be present on every node.
    cluster.assert_converged();
    (0..5).for_each(|i| {
        assert_eq!(
            cluster.get_node(i).assertion_count(),
            2,
            "Node {i} should have 2 assertions after heal"
        );
    });
}
/// Test: the order in which nodes sync does not affect the converged state.
///
/// 100 unique assertions are spread round-robin over 3 nodes, then the nodes
/// are synced pairwise in two different orders followed by a full sync.
/// Every node must end up converged with all 100 assertions.
#[tokio::test]
async fn test_message_reordering_convergence() {
    let mut cluster = TestCluster::spawn(3).await.expect("spawn cluster");

    // Distribute 100 distinct assertions round-robin across the 3 nodes.
    for i in 0..100 {
        cluster
            .get_node_mut(i % 3)
            .write_assertion(&format!("rapid:{i}"), "pred", 1000 + i as u64)
            .await
            .expect("write");
    }

    // Pairwise syncs in two different orders to mimic reordered delivery:
    // first the chain 0 -> 1 -> 2, then the chain 2 -> 0 -> 1.
    let schedule = [
        (0, 1, "sync 0->1"),
        (1, 2, "sync 1->2"),
        (2, 0, "sync 2->0"),
        (0, 1, "sync 0->1"),
    ];
    for (src, dst, label) in schedule {
        cluster.sync_pair(src, dst).await.expect(label);
    }

    // Finish with a sync across the whole cluster.
    cluster.sync_all().await.expect("sync all");

    // Every node must hold the complete set of 100 assertions.
    cluster.assert_converged();
    (0..3).for_each(|i| {
        assert_eq!(
            cluster.get_node(i).assertion_count(),
            100,
            "Node {i} should have 100 assertions"
        );
    });
}
/// Test: re-delivering the same assertions is idempotent.
///
/// 50 assertions are written to node 0 and then pushed to nodes 1 and 2 five
/// times over. Each node must still report exactly 50 assertions afterwards,
/// i.e. repeated syncs must not create duplicates.
#[tokio::test]
async fn test_message_duplication_idempotent() {
    let mut cluster = TestCluster::spawn(3).await.expect("spawn cluster");

    // All 50 assertions originate on node 0.
    for i in 0..50 {
        let subject = format!("dup:{i}");
        cluster
            .get_node_mut(0)
            .write_assertion(&subject, "pred", 1000 + i as u64)
            .await
            .expect("write");
    }

    // Repeat the same node-0-to-peer syncs five times to simulate
    // duplicated delivery.
    for _ in 0..5 {
        for peer in [1, 2] {
            cluster.sync_pair(0, peer).await.expect("sync");
        }
    }

    // The count must stay at 50 on every node despite the repeated syncs.
    cluster.assert_converged();
    for i in 0..3 {
        let count = cluster.get_node(i).assertion_count();
        assert_eq!(count, 50, "Node {i} should have exactly 50 assertions (no duplicates)");
    }
}
/// Test: the cluster survives a cascade of node failures and recovers.
///
/// Nodes 0, 1 and 2 are killed one after another; before each of the last
/// two kills the next node in line accepts a write and the cluster syncs.
/// Nodes 3 and 4 must hold all 3 assertions once they are the only ones left,
/// and after restarting 0, 1 and 2 (syncing after each restart) all 5 nodes
/// must converge on those 3 assertions.
#[tokio::test]
async fn test_cascading_failure_recovery() {
    let mut cluster = TestCluster::spawn(5).await.expect("spawn cluster");

    // Baseline write on node 0, replicated to everyone.
    cluster.get_node_mut(0).write_assertion("baseline", "pred", 1000).await.expect("write");
    cluster.sync_all().await.expect("sync baseline");

    // Each step: kill one node, write to the next one, sync the cluster.
    let cascade = [(0, 1, "after_0", 2000), (1, 2, "after_1", 3000)];
    for (victim, writer, subject, num) in cascade {
        cluster.kill_node(victim);
        cluster.get_node_mut(writer).write_assertion(subject, "pred", num).await.expect("write");
        cluster.sync_all().await.expect("sync");
    }

    // Final kill: after this only nodes 3 and 4 are left.
    cluster.kill_node(2);

    // The two remaining nodes must agree and hold all 3 assertions.
    cluster.assert_converged();
    assert_eq!(cluster.alive_node_indices().len(), 2);
    for remaining in [3, 4] {
        assert_eq!(cluster.get_node(remaining).assertion_count(), 3);
    }

    // Bring the dead nodes back one at a time, syncing after each restart.
    let restarts = [(0, "restart 0"), (1, "restart 1"), (2, "restart 2")];
    for (node, label) in restarts {
        cluster.restart_node(node).expect(label);
        cluster.sync_all().await.expect("sync");
    }

    // The full cluster must be back in agreement with all 3 assertions.
    cluster.assert_converged();
    (0..5).for_each(|i| {
        assert_eq!(cluster.get_node(i).assertion_count(), 3, "Node {i} should have 3 assertions");
    });
}
/// Test: a suspected peer returns to Alive instead of staying a false positive.
///
/// The membership transitions are driven by hand on node 0's membership
/// view: node 1 is registered as alive, marked suspect while latency is
/// injected between the two nodes, and marked alive again once the latency
/// is cleared. The test checks node 0's recorded state for node 1 after the
/// suspect step (`Suspect`) and after the recovery step (`Alive`).
#[tokio::test]
async fn test_swim_suspicion_not_false_positive() {
    use std::net::SocketAddr;
    use stemedb_cluster::membership::{NodeInfo, NodeState};

    const LOOPBACK: [u8; 4] = [127, 0, 0, 1];

    // NOTE(review): the meaning of the `4` and `2` arguments is not visible
    // from this file — confirm against `spawn_with_config`.
    let cluster = TestCluster::spawn_with_config(5, 4, 2).await.expect("spawn");

    // Node 0 is the observer; node 1 is the peer whose state is tracked.
    let observer = cluster.get_node(0);
    let node1_id = cluster.get_node(1).node_id();
    let node1_info = NodeInfo::new(
        node1_id,
        SocketAddr::from((LOOPBACK, 9091)),
        SocketAddr::from((LOOPBACK, 8081)),
    );

    // Make node 1 known to node 0's membership as an alive peer.
    observer.membership().alive_node(node1_id, node1_info.clone());

    // Inject 100 ms of latency in both directions between nodes 0 and 1.
    cluster.network().set_latency_bidirectional(0, 1, Duration::from_millis(100));

    // Simulate a failed probe: node 0 marks node 1 as suspect.
    observer.membership().suspect_node(node1_id);
    assert_eq!(observer.get_peer_state(node1_id), Some(NodeState::Suspect));

    // Remove the injected latency, one direction at a time.
    cluster.network().clear_latency(0, 1);
    cluster.network().clear_latency(1, 0);

    // Simulate a successful response: node 0 marks node 1 alive again.
    observer.membership().alive_node(node1_id, node1_info);
    assert_eq!(observer.get_peer_state(node1_id), Some(NodeState::Alive));
}
/// Test: Writes during asymmetric partition.
///
/// Only one direction of communication is blocked: node 0 can send to
/// node 1, but node 1 cannot send to node 0.
///
/// Verifies:
/// - a `0 -> 1` sync merges node 0's write into node 1
/// - a `1 -> 0` sync merges nothing while that direction is blocked
/// - after the partition is healed, all 3 nodes converge and each holds
///   both writes
#[tokio::test]
async fn test_asymmetric_partition() {
    let mut cluster = TestCluster::spawn(3).await.expect("spawn cluster");

    // Asymmetric partition: 0 can send to 1, but 1 cannot send to 0
    cluster.network().partition_one_way(1, 0);

    // One write on each side of the one-way block
    cluster.get_node_mut(0).write_assertion("from_0", "pred", 1000).await.expect("write");
    cluster.get_node_mut(1).write_assertion("from_1", "pred", 2000).await.expect("write");

    // Sync: 0->1 should work, 1->0 should be blocked
    let merged_1 = cluster.sync_pair(0, 1).await.expect("sync 0->1");
    let merged_0 = cluster.sync_pair(1, 0).await.expect("sync 1->0");
    assert_eq!(merged_1, 1, "Node 1 should receive from 0");
    assert_eq!(merged_0, 0, "Node 0 should not receive from 1 (blocked)");

    // Heal and verify convergence
    cluster.network().heal();
    cluster.sync_all().await.expect("sync all");
    cluster.assert_converged();

    // Convergence alone only says the nodes agree; also check that both
    // writes survived, matching the per-node count checks in the other
    // convergence tests in this file.
    for i in 0..3 {
        assert_eq!(
            cluster.get_node(i).assertion_count(),
            2,
            "Node {i} should have 2 assertions after heal"
        );
    }
}
/// Test: every node keeps accepting writes while fully isolated.
///
/// Three partition calls cut each of the 4 nodes off from all others. Each
/// node then accepts one local write and must report exactly 1 assertion.
/// After healing and syncing, all nodes must converge on all 4 assertions.
#[tokio::test]
async fn test_write_availability_during_partition() {
    let mut cluster = TestCluster::spawn(4).await.expect("spawn cluster");

    // Isolate every node: 0 from {1, 2, 3}, then 1 from {2, 3}, then 2 from 3.
    cluster.network().partition(&[0], &[1, 2, 3]);
    cluster.network().partition(&[1], &[2, 3]);
    cluster.network().partition(&[2], &[3]);

    // A local write must succeed on each isolated node.
    for i in 0..4 {
        let subject = format!("isolated:{i}");
        cluster
            .get_node_mut(i)
            .write_assertion(&subject, "pred", 1000 + i as u64)
            .await
            .expect("write should succeed");
    }

    // No sync has run yet, so each node holds only its own write.
    (0..4).for_each(|i| assert_eq!(cluster.get_node(i).assertion_count(), 1));

    // Remove every partition and let the nodes exchange data.
    cluster.network().heal();
    cluster.sync_all().await.expect("sync");

    // Each node must now hold all 4 writes.
    cluster.assert_converged();
    for i in 0..4 {
        let count = cluster.get_node(i).assertion_count();
        assert_eq!(count, 4, "Node {i} should have 4 assertions after heal");
    }
}