stemedb/crates/stemedb-query/tests/battery/battery11_replication.rs
jordan 2b0923f20e feat: Distributed replication foundation (Phase 6A) - HLC, Merkle trees, CRDT stores, sync protocol
- Add Hybrid Logical Clock (HLC) for causality tracking across nodes
- Implement Merkle tree for efficient diff/sync with BLAKE3 hashing
- Add CRDT-aware stores for assertions and votes with vector clocks
- Create stemedb-sync crate with anti-entropy and gossip protocols
- Add stemedb-rpc crate with gRPC sync service (proto definitions)
- Implement SupersessionChain for tracking assertion lifecycles
- Add Aphoria application for code analysis/reporting
- Add battery11 replication test scaffolding
- Fix .gitignore to exclude nested target directories

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-02 19:31:54 -07:00

315 lines
11 KiB
Rust

//! Battery 11: Two-Node Replication Tests
//!
//! Tests for gossip broadcast and anti-entropy sync between two nodes.
//! Verifies that assertions replicate correctly and nodes converge.
#![allow(clippy::expect_used)] // Test code uses expect() for clear failure messages
use std::sync::Arc;
use std::time::Duration;
use ed25519_dalek::{Signer, SigningKey};
use rand::rngs::OsRng;
use stemedb_core::serde::serialize;
use stemedb_core::testing::AssertionBuilder;
use stemedb_core::types::{LifecycleStage, ObjectValue, SignatureEntry, SourceClass};
use stemedb_ingest::GossipBroadcast; // Import trait for methods
use stemedb_merkle::MerkleTree;
use stemedb_storage::crdt::CrdtAssertionStore;
use stemedb_storage::{key_codec, HybridStore, KVStore};
use stemedb_sync::gossip::GossipBroadcaster;
use stemedb_sync::merkle_manager::MerkleTreeManager;
use stemedb_sync::SyncConfig;
use tempfile::tempdir;
/// Build a serialized assertion carrying a single Ed25519 signature entry.
///
/// A brand-new signing key is generated on every call, so the `agent_id`
/// differs between calls even when all arguments are equal. The signature is
/// computed over the string `"{subject}:{predicate}"` only, not over the
/// serialized assertion.
///
/// `value` is stored as `ObjectValue::Number`, i.e. converted to `f64`.
///
/// # Panics
///
/// Panics if the assertion cannot be serialized.
fn create_test_assertion(subject: &str, predicate: &str, value: i64, timestamp: u64) -> Vec<u8> {
    let signing_key = SigningKey::generate(&mut OsRng);
    let signed_text = format!("{subject}:{predicate}");

    // Assemble the signature entry up front so the builder chain stays short.
    let entry = SignatureEntry {
        agent_id: signing_key.verifying_key().to_bytes(),
        signature: signing_key.sign(signed_text.as_bytes()).to_bytes(),
        timestamp,
        version: 1,
    };

    let assertion = AssertionBuilder::new()
        .subject(subject)
        .predicate(predicate)
        .object(ObjectValue::Number(value as f64))
        .source_class(SourceClass::Regulatory)
        .confidence(0.9)
        .lifecycle(LifecycleStage::Proposed)
        .timestamp(timestamp)
        .signatures(vec![entry])
        .build();

    serialize(&assertion).expect("serialize assertion")
}
/// A single in-process node used by the replication tests.
///
/// Bundles a key-value store, the Merkle tree manager layered on it, and a
/// CRDT assertion store sharing the same underlying store.
struct TestNode {
    /// Backing key-value store, shared with the other components.
    store: Arc<HybridStore>,
    /// Tracks assertion hashes as Merkle leaves for root/diff comparisons.
    merkle_manager: Arc<MerkleTreeManager<HybridStore>>,
    /// CRDT view over `store`; only some tests touch it, hence the allow.
    #[allow(dead_code)]
    crdt_store: Arc<CrdtAssertionStore<HybridStore>>,
    /// Identifier handed to the CRDT store; never read back by the tests.
    #[allow(dead_code)]
    node_id: [u8; 16],
    /// Owns the temporary directory the store was opened in, keeping it
    /// alive for as long as the node exists.
    _temp_dir: tempfile::TempDir,
}
impl TestNode {
    /// Spin up a node with its own temporary directory and empty store.
    ///
    /// # Panics
    ///
    /// Panics if the temp directory, the store, or the Merkle manager cannot
    /// be created.
    async fn new(node_id: [u8; 16]) -> Self {
        let temp_dir = tempdir().expect("create temp dir");
        let opened = HybridStore::open(temp_dir.path()).expect("open store");
        let store = Arc::new(opened);

        let manager = MerkleTreeManager::load_or_create(store.clone())
            .await
            .expect("create merkle manager");

        // The CRDT store is handed its own `Arc` handle to the same store.
        let crdt = CrdtAssertionStore::new(store.clone(), node_id);

        Self {
            store,
            merkle_manager: Arc::new(manager),
            crdt_store: Arc::new(crdt),
            node_id,
            _temp_dir: temp_dir,
        }
    }

    /// Persist `data` under its BLAKE3 hash and record that hash as a Merkle leaf.
    ///
    /// The storage key is always built with the subject `"test_subject"`,
    /// regardless of what subject the serialized assertion carries.
    async fn ingest_assertion(&self, data: &[u8]) {
        let digest = blake3::hash(data);
        let digest_hex = digest.to_hex().to_string();

        // Write the payload first ...
        let storage_key = key_codec::assertion_key("test_subject", &digest_hex);
        self.store.put(&storage_key, data).await.expect("put assertion");

        // ... then register the raw 32-byte hash with the Merkle manager.
        self.merkle_manager.insert(*digest.as_bytes()).await.expect("insert into merkle");
    }

    /// Report whether a payload is stored under `hash` for `"test_subject"`.
    #[allow(dead_code)]
    async fn has_assertion(&self, hash: &[u8; 32]) -> bool {
        let storage_key = key_codec::assertion_key("test_subject", &hex::encode(hash));
        let found = self.store.get(&storage_key).await.expect("get assertion");
        found.is_some()
    }

    /// Number of entries reported by the Merkle manager.
    #[allow(dead_code)]
    async fn assertion_count(&self) -> usize {
        self.merkle_manager.len().await
    }

    /// Current Merkle root as reported by the manager (`None` is passed
    /// through unchanged).
    ///
    /// # Panics
    ///
    /// Panics if the manager returns an error.
    async fn merkle_root(&self) -> Option<[u8; 32]> {
        let root = self.merkle_manager.root().await;
        root.expect("get root")
    }
}
/// Test 1: Merkle root comparison for identical trees.
#[tokio::test]
async fn test_identical_trees_same_root() {
let node_a = TestNode::new([1u8; 16]).await;
let node_b = TestNode::new([2u8; 16]).await;
// Insert same assertions in same order
let data1 = create_test_assertion("test_subject", "price", 100, 1000);
let data2 = create_test_assertion("test_subject", "price", 200, 1001);
node_a.ingest_assertion(&data1).await;
node_a.ingest_assertion(&data2).await;
node_b.ingest_assertion(&data1).await;
node_b.ingest_assertion(&data2).await;
// Merkle roots should match
let root_a = node_a.merkle_root().await.expect("root A");
let root_b = node_b.merkle_root().await.expect("root B");
assert_eq!(root_a, root_b, "Identical trees should have same root");
}
/// Test 2: nodes holding different payloads report different Merkle roots.
#[tokio::test]
async fn test_different_trees_different_roots() {
    let node_a = TestNode::new([1u8; 16]).await;
    let node_b = TestNode::new([2u8; 16]).await;

    // Each node receives exactly one payload, and the two payloads differ.
    let only_on_a = create_test_assertion("test_subject", "price", 100, 1000);
    let only_on_b = create_test_assertion("test_subject", "price", 200, 1001);
    node_a.ingest_assertion(&only_on_a).await;
    node_b.ingest_assertion(&only_on_b).await;

    let root_a = node_a.merkle_root().await.expect("root A");
    let root_b = node_b.merkle_root().await.expect("root B");
    assert_ne!(root_a, root_b, "Different trees should have different roots");
}
/// Test 3: diffing two Merkle trees reports the one hash that only the
/// second node holds.
#[tokio::test]
async fn test_merkle_diff_finds_missing() {
    use stemedb_merkle::DiffResult;

    let node_a = TestNode::new([1u8; 16]).await;
    let node_b = TestNode::new([2u8; 16]).await;

    let first = create_test_assertion("test_subject", "price", 100, 1000);
    let second = create_test_assertion("test_subject", "price", 200, 1001);
    let third = create_test_assertion("test_subject", "price", 300, 1002);

    // Node A holds the first two payloads; node B holds all three.
    for payload in [&first, &second] {
        node_a.ingest_assertion(payload).await;
    }
    for payload in [&first, &second, &third] {
        node_b.ingest_assertion(payload).await;
    }

    // Rebuild standalone trees from each manager's leaves.
    let leaves_a = node_a.merkle_manager.leaves().await;
    let leaves_b = node_b.merkle_manager.leaves().await;

    let mut tree_a = MerkleTree::new();
    for &leaf in &leaves_a {
        tree_a.insert(leaf).expect("insert");
    }
    let mut tree_b = MerkleTree::new();
    for &leaf in &leaves_b {
        tree_b.insert(leaf).expect("insert");
    }

    let diff = DiffResult::diff(&tree_a, &tree_b);
    assert_eq!(diff.missing_hashes.len(), 1, "Should find 1 missing hash");

    // The single reported hash must be the BLAKE3 hash of the third payload.
    let third_hash = *blake3::hash(&third).as_bytes();
    assert!(diff.missing_hashes.contains(&third_hash), "Missing hash should be data3");
}
/// Test 4: the gossip broadcaster starts enabled and follows
/// `disable()` / `enable()` toggles.
#[tokio::test]
async fn test_gossip_enable_disable() {
    // An empty peer list is passed, so there is nothing to connect to.
    let gossip = GossipBroadcaster::new(vec![]).await.expect("create broadcaster");
    assert!(gossip.is_enabled(), "Should be enabled by default");

    gossip.disable();
    assert!(!gossip.is_enabled(), "Should be disabled after disable()");

    gossip.enable();
    assert!(gossip.is_enabled(), "Should be enabled after enable()");
}
/// Test 5: leaves written before `checkpoint()` are visible, in insertion
/// order, after the store is reopened at the same path.
#[tokio::test]
async fn test_merkle_checkpoint_restore() {
    let temp_dir = tempdir().expect("create temp dir");
    let store_path = temp_dir.path().to_path_buf();

    let hash1 = [1u8; 32];
    let hash2 = [2u8; 32];
    let hash3 = [3u8; 32];

    // First session: insert three leaves and checkpoint. The scope ends the
    // session by dropping the manager and its store handle.
    {
        let store = Arc::new(HybridStore::open(&store_path).expect("open store"));
        let writer = MerkleTreeManager::load_or_create(store).await.expect("create manager");
        writer.insert(hash1).await.expect("insert 1");
        writer.insert(hash2).await.expect("insert 2");
        writer.insert(hash3).await.expect("insert 3");
        writer.checkpoint().await.expect("checkpoint");
    }

    // Second session: reopen the same path and inspect what was restored.
    {
        let store = Arc::new(HybridStore::open(&store_path).expect("open store"));
        let reader = MerkleTreeManager::load_or_create(store).await.expect("create manager");
        assert_eq!(reader.len().await, 3, "Should have 3 leaves after restore");

        let leaves = reader.leaves().await;
        let expected = [
            (hash1, "First leaf should match"),
            (hash2, "Second leaf should match"),
            (hash3, "Third leaf should match"),
        ];
        for (idx, (want, msg)) in expected.iter().enumerate() {
            assert_eq!(leaves[idx], *want, "{}", msg);
        }
    }
}
/// Test 6: writing the same payload under the same content-derived key
/// several times still reads back that payload.
#[tokio::test]
async fn test_content_addressed_idempotent() {
    let node = TestNode::new([1u8; 16]).await;

    // The key is derived from the payload's BLAKE3 hash. Writes go straight
    // to the node's key-value store.
    let data = create_test_assertion("test_subject", "price", 100, 1000);
    let hash_hex = hex::encode(blake3::hash(&data).as_bytes());
    let key = key_codec::assertion_key("test_subject", &hash_hex);

    // Three identical writes to the same key.
    for label in ["put 1", "put 2", "put 3"] {
        node.store.put(&key, &data).await.expect(label);
    }

    let retrieved = node.store.get(&key).await.expect("get").expect("should exist");
    assert_eq!(retrieved, data, "Should retrieve same data");
}
/// Test 7: `merge_with_data` on the CRDT store accepts two transfers and
/// both hashes are reported present afterwards.
#[tokio::test]
async fn test_crdt_merge_with_data() {
    use stemedb_storage::crdt::AssertionTransfer;

    let node = TestNode::new([1u8; 16]).await;

    let payload_a = create_test_assertion("test_subject", "predA", 100, 1000);
    let payload_b = create_test_assertion("test_subject", "predB", 200, 1001);

    // Hash each payload before it is moved into its transfer record.
    let hash_a = *blake3::hash(&payload_a).as_bytes();
    let hash_b = *blake3::hash(&payload_b).as_bytes();

    let transfers = vec![
        AssertionTransfer { hash: hash_a, data: payload_a },
        AssertionTransfer { hash: hash_b, data: payload_b },
    ];

    let merged = node.crdt_store.merge_with_data("test_subject", &transfers).await.expect("merge");
    assert_eq!(merged, 2, "Should have merged 2 assertions");

    // Both hashes must now be visible through the CRDT store.
    let has_a = node.crdt_store.has_assertion("test_subject", &hash_a).await.expect("has 1");
    assert!(has_a);
    let has_b = node.crdt_store.has_assertion("test_subject", &hash_b).await.expect("has 2");
    assert!(has_b);
}
/// Test 8: values set through the `SyncConfig` builder methods are readable
/// from the resulting config's fields.
#[tokio::test]
async fn test_sync_config_builder() {
    let interval = Duration::from_secs(30);

    let config = SyncConfig::new()
        .with_peer("http://localhost:9090")
        .with_peer("http://localhost:9091")
        .with_gossip_enabled(true)
        .with_gossip_fanout(2)
        .with_anti_entropy_interval(interval);

    // Two peers were added, and every other setting mirrors the builder call.
    assert_eq!(config.peers.len(), 2);
    assert!(config.gossip_enabled);
    assert_eq!(config.gossip_fanout, 2);
    assert_eq!(config.anti_entropy_interval, Duration::from_secs(30));
}