Major additions: - Community Next.js app (port 18187) for browsing claims with API docs - stemedb-chaos crate: Fault injection, chaos testing, CRDT properties - Latent ingestion system: Reddit/FDA ingesters with ADK-Go agents - Disputed claims handling: Manual review workflows and validation - Aphoria security scanner: New extractors (SQL injection, command injection, weak crypto, TLS version), policy-based ignores, UAT reports - Docker infrastructure: Dockerfile, docker-compose.yml for full stack - VulnBank demo: Intentionally vulnerable multi-language test corpus SDK & API enhancements: - Source registry handlers for tracking data provenance - Metrics endpoint - Skeptic filtering improvements Code quality: - Split 14 large files (>500 lines) into focused modules - All files now under 500-line limit per project guidelines Documentation: - Chaos testing guide, circuit breakers, observability docs - Phase 7 UAT documentation updates - Martin Kleppmann technical writer agent Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
315 lines
11 KiB
Rust
315 lines
11 KiB
Rust
//! Battery 11: Two-Node Replication Tests
|
|
//!
|
|
//! Tests for gossip broadcast and anti-entropy sync between two nodes.
|
|
//! Verifies that assertions replicate correctly and nodes converge.
|
|
|
|
#![allow(clippy::expect_used)] // Test code uses expect() for clear failure messages
|
|
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
|
|
use ed25519_dalek::{Signer, SigningKey};
|
|
use rand::rngs::OsRng;
|
|
use stemedb_core::serde::serialize;
|
|
use stemedb_core::testing::AssertionBuilder;
|
|
use stemedb_core::types::{LifecycleStage, ObjectValue, SignatureEntry, SourceClass};
|
|
use stemedb_ingest::GossipBroadcast; // Import trait for methods
|
|
use stemedb_merkle::MerkleTree;
|
|
use stemedb_storage::crdt::CrdtAssertionStore;
|
|
use stemedb_storage::{key_codec, HybridStore, KVStore};
|
|
use stemedb_sync::gossip::GossipBroadcaster;
|
|
use stemedb_sync::merkle_manager::MerkleTreeManager;
|
|
use stemedb_sync::SyncConfig;
|
|
use tempfile::tempdir;
|
|
|
|
/// Create a signed assertion for testing.
|
|
fn create_test_assertion(subject: &str, predicate: &str, value: i64, timestamp: u64) -> Vec<u8> {
|
|
let mut csprng = OsRng;
|
|
let signing_key = SigningKey::generate(&mut csprng);
|
|
let verifying_key = signing_key.verifying_key();
|
|
|
|
let message = format!("{}:{}", subject, predicate);
|
|
let signature = signing_key.sign(message.as_bytes());
|
|
|
|
let assertion = AssertionBuilder::new()
|
|
.subject(subject)
|
|
.predicate(predicate)
|
|
.object(ObjectValue::Number(value as f64))
|
|
.source_class(SourceClass::Regulatory) // Using valid variant
|
|
.confidence(0.9)
|
|
.lifecycle(LifecycleStage::Proposed)
|
|
.timestamp(timestamp)
|
|
.signatures(vec![SignatureEntry {
|
|
agent_id: verifying_key.to_bytes(),
|
|
signature: signature.to_bytes(),
|
|
timestamp,
|
|
version: 1,
|
|
}])
|
|
.build();
|
|
|
|
serialize(&assertion).expect("serialize assertion")
|
|
}
|
|
|
|
/// Test node with storage and sync components.
|
|
struct TestNode {
|
|
store: Arc<HybridStore>,
|
|
merkle_manager: Arc<MerkleTreeManager<HybridStore>>,
|
|
#[allow(dead_code)]
|
|
crdt_store: Arc<CrdtAssertionStore<HybridStore>>,
|
|
#[allow(dead_code)]
|
|
node_id: [u8; 16],
|
|
_temp_dir: tempfile::TempDir,
|
|
}
|
|
|
|
impl TestNode {
|
|
async fn new(node_id: [u8; 16]) -> Self {
|
|
let temp_dir = tempdir().expect("create temp dir");
|
|
let store = Arc::new(HybridStore::open(temp_dir.path()).expect("open store"));
|
|
let merkle_manager = Arc::new(
|
|
MerkleTreeManager::load_or_create(store.clone()).await.expect("create merkle manager"),
|
|
);
|
|
// CrdtAssertionStore takes S where it stores Arc<S> internally
|
|
let crdt_store = Arc::new(CrdtAssertionStore::new(store.clone(), node_id));
|
|
|
|
Self { store, merkle_manager, crdt_store, node_id, _temp_dir: temp_dir }
|
|
}
|
|
|
|
/// Store an assertion and update Merkle tree.
|
|
async fn ingest_assertion(&self, data: &[u8]) {
|
|
let hash = blake3::hash(data);
|
|
let hash_bytes = *hash.as_bytes();
|
|
let hash_hex = hash.to_hex().to_string();
|
|
|
|
// Store assertion
|
|
let key = key_codec::assertion_key("test_subject", &hash_hex);
|
|
self.store.put(&key, data).await.expect("put assertion");
|
|
|
|
// Update Merkle tree
|
|
self.merkle_manager.insert(hash_bytes).await.expect("insert into merkle");
|
|
}
|
|
|
|
/// Check if an assertion exists by hash.
|
|
#[allow(dead_code)]
|
|
async fn has_assertion(&self, hash: &[u8; 32]) -> bool {
|
|
let hash_hex = hex::encode(hash);
|
|
let key = key_codec::assertion_key("test_subject", &hash_hex);
|
|
self.store.get(&key).await.expect("get assertion").is_some()
|
|
}
|
|
|
|
/// Get assertion count.
|
|
#[allow(dead_code)]
|
|
async fn assertion_count(&self) -> usize {
|
|
self.merkle_manager.len().await
|
|
}
|
|
|
|
/// Get Merkle root.
|
|
async fn merkle_root(&self) -> Option<[u8; 32]> {
|
|
self.merkle_manager.root().await.expect("get root")
|
|
}
|
|
}
|
|
|
|
/// Test 1: Merkle root comparison for identical trees.
|
|
#[tokio::test]
|
|
async fn test_identical_trees_same_root() {
|
|
let node_a = TestNode::new([1u8; 16]).await;
|
|
let node_b = TestNode::new([2u8; 16]).await;
|
|
|
|
// Insert same assertions in same order
|
|
let data1 = create_test_assertion("test_subject", "price", 100, 1000);
|
|
let data2 = create_test_assertion("test_subject", "price", 200, 1001);
|
|
|
|
node_a.ingest_assertion(&data1).await;
|
|
node_a.ingest_assertion(&data2).await;
|
|
|
|
node_b.ingest_assertion(&data1).await;
|
|
node_b.ingest_assertion(&data2).await;
|
|
|
|
// Merkle roots should match
|
|
let root_a = node_a.merkle_root().await.expect("root A");
|
|
let root_b = node_b.merkle_root().await.expect("root B");
|
|
|
|
assert_eq!(root_a, root_b, "Identical trees should have same root");
|
|
}
|
|
|
|
/// Test 2: Merkle root comparison for different trees.
|
|
#[tokio::test]
|
|
async fn test_different_trees_different_roots() {
|
|
let node_a = TestNode::new([1u8; 16]).await;
|
|
let node_b = TestNode::new([2u8; 16]).await;
|
|
|
|
// Insert different assertions
|
|
let data1 = create_test_assertion("test_subject", "price", 100, 1000);
|
|
let data2 = create_test_assertion("test_subject", "price", 200, 1001);
|
|
|
|
node_a.ingest_assertion(&data1).await;
|
|
node_b.ingest_assertion(&data2).await;
|
|
|
|
// Merkle roots should differ
|
|
let root_a = node_a.merkle_root().await.expect("root A");
|
|
let root_b = node_b.merkle_root().await.expect("root B");
|
|
|
|
assert_ne!(root_a, root_b, "Different trees should have different roots");
|
|
}
|
|
|
|
/// Test 3: Merkle diff finds missing assertions.
|
|
#[tokio::test]
|
|
async fn test_merkle_diff_finds_missing() {
|
|
use stemedb_merkle::DiffResult;
|
|
|
|
let node_a = TestNode::new([1u8; 16]).await;
|
|
let node_b = TestNode::new([2u8; 16]).await;
|
|
|
|
// Node A has assertions 1, 2
|
|
let data1 = create_test_assertion("test_subject", "price", 100, 1000);
|
|
let data2 = create_test_assertion("test_subject", "price", 200, 1001);
|
|
let data3 = create_test_assertion("test_subject", "price", 300, 1002);
|
|
|
|
node_a.ingest_assertion(&data1).await;
|
|
node_a.ingest_assertion(&data2).await;
|
|
|
|
// Node B has assertions 1, 2, 3
|
|
node_b.ingest_assertion(&data1).await;
|
|
node_b.ingest_assertion(&data2).await;
|
|
node_b.ingest_assertion(&data3).await;
|
|
|
|
// Build Merkle trees from leaves
|
|
let leaves_a = node_a.merkle_manager.leaves().await;
|
|
let leaves_b = node_b.merkle_manager.leaves().await;
|
|
|
|
let mut tree_a = MerkleTree::new();
|
|
for leaf in &leaves_a {
|
|
tree_a.insert(*leaf).expect("insert");
|
|
}
|
|
|
|
let mut tree_b = MerkleTree::new();
|
|
for leaf in &leaves_b {
|
|
tree_b.insert(*leaf).expect("insert");
|
|
}
|
|
|
|
// Diff should find the missing assertion
|
|
let diff = DiffResult::diff(&tree_a, &tree_b);
|
|
|
|
assert_eq!(diff.missing_hashes.len(), 1, "Should find 1 missing hash");
|
|
|
|
// The missing hash should be data3
|
|
let hash3 = *blake3::hash(&data3).as_bytes();
|
|
assert!(diff.missing_hashes.contains(&hash3), "Missing hash should be data3");
|
|
}
|
|
|
|
/// Test 4: Gossip broadcaster can be enabled/disabled.
|
|
#[tokio::test]
|
|
async fn test_gossip_enable_disable() {
|
|
// Create broadcaster with no peers (won't try to connect)
|
|
let broadcaster = GossipBroadcaster::new(vec![]).await.expect("create broadcaster");
|
|
|
|
assert!(broadcaster.is_enabled(), "Should be enabled by default");
|
|
|
|
broadcaster.disable();
|
|
assert!(!broadcaster.is_enabled(), "Should be disabled after disable()");
|
|
|
|
broadcaster.enable();
|
|
assert!(broadcaster.is_enabled(), "Should be enabled after enable()");
|
|
}
|
|
|
|
/// Test 5: Merkle tree checkpoint and restore.
|
|
#[tokio::test]
|
|
async fn test_merkle_checkpoint_restore() {
|
|
let temp_dir = tempdir().expect("create temp dir");
|
|
let store_path = temp_dir.path().to_path_buf();
|
|
|
|
// Insert some assertions and checkpoint
|
|
let hash1 = [1u8; 32];
|
|
let hash2 = [2u8; 32];
|
|
let hash3 = [3u8; 32];
|
|
|
|
{
|
|
let store = Arc::new(HybridStore::open(&store_path).expect("open store"));
|
|
let manager = MerkleTreeManager::load_or_create(store).await.expect("create manager");
|
|
|
|
manager.insert(hash1).await.expect("insert 1");
|
|
manager.insert(hash2).await.expect("insert 2");
|
|
manager.insert(hash3).await.expect("insert 3");
|
|
|
|
manager.checkpoint().await.expect("checkpoint");
|
|
}
|
|
|
|
// Reopen and verify
|
|
{
|
|
let store = Arc::new(HybridStore::open(&store_path).expect("open store"));
|
|
let manager = MerkleTreeManager::load_or_create(store).await.expect("create manager");
|
|
|
|
assert_eq!(manager.len().await, 3, "Should have 3 leaves after restore");
|
|
|
|
let leaves = manager.leaves().await;
|
|
assert_eq!(leaves[0], hash1, "First leaf should match");
|
|
assert_eq!(leaves[1], hash2, "Second leaf should match");
|
|
assert_eq!(leaves[2], hash3, "Third leaf should match");
|
|
}
|
|
}
|
|
|
|
/// Test 6: Content-addressed storage is idempotent.
|
|
#[tokio::test]
|
|
async fn test_content_addressed_idempotent() {
|
|
let node = TestNode::new([1u8; 16]).await;
|
|
|
|
// Same assertion stored multiple times via CRDT store
|
|
let data = create_test_assertion("test_subject", "price", 100, 1000);
|
|
let hash = *blake3::hash(&data).as_bytes();
|
|
let hash_hex = hex::encode(hash);
|
|
|
|
// Store same data multiple times
|
|
let key = key_codec::assertion_key("test_subject", &hash_hex);
|
|
node.store.put(&key, &data).await.expect("put 1");
|
|
node.store.put(&key, &data).await.expect("put 2");
|
|
node.store.put(&key, &data).await.expect("put 3");
|
|
|
|
// Should still retrieve the same data (content-addressed, no duplicates)
|
|
let retrieved = node.store.get(&key).await.expect("get").expect("should exist");
|
|
assert_eq!(retrieved, data, "Should retrieve same data");
|
|
}
|
|
|
|
/// Test 7: CRDT assertion store merge with data.
|
|
#[tokio::test]
|
|
async fn test_crdt_merge_with_data() {
|
|
use stemedb_storage::crdt::AssertionTransfer;
|
|
|
|
let node = TestNode::new([1u8; 16]).await;
|
|
|
|
// Create some assertion data
|
|
let data1 = create_test_assertion("test_subject", "predA", 100, 1000);
|
|
let data2 = create_test_assertion("test_subject", "predB", 200, 1001);
|
|
|
|
let hash1 = *blake3::hash(&data1).as_bytes();
|
|
let hash2 = *blake3::hash(&data2).as_bytes();
|
|
|
|
// Merge assertions via CRDT store
|
|
let transfers = vec![
|
|
AssertionTransfer { hash: hash1, data: data1.clone() },
|
|
AssertionTransfer { hash: hash2, data: data2.clone() },
|
|
];
|
|
|
|
let merged = node.crdt_store.merge_with_data("test_subject", &transfers).await.expect("merge");
|
|
|
|
assert_eq!(merged, 2, "Should have merged 2 assertions");
|
|
|
|
// Verify assertions are stored
|
|
assert!(node.crdt_store.has_assertion("test_subject", &hash1).await.expect("has 1"));
|
|
assert!(node.crdt_store.has_assertion("test_subject", &hash2).await.expect("has 2"));
|
|
}
|
|
|
|
/// Test 8: SyncConfig builder pattern.
|
|
#[tokio::test]
|
|
async fn test_sync_config_builder() {
|
|
let config = SyncConfig::new()
|
|
.with_peer("http://localhost:18182")
|
|
.with_peer("http://localhost:18192")
|
|
.with_gossip_enabled(true)
|
|
.with_gossip_fanout(2)
|
|
.with_anti_entropy_interval(Duration::from_secs(30));
|
|
|
|
assert_eq!(config.peers.len(), 2);
|
|
assert!(config.gossip_enabled);
|
|
assert_eq!(config.gossip_fanout, 2);
|
|
assert_eq!(config.anti_entropy_interval, Duration::from_secs(30));
|
|
}
|