//! M8p3 CRDT property tests. //! //! Verifies the three CMA (commutative, associative, idempotent) properties //! for `PNCounter` and `LWWRegister` using proptest (10,000 cases each). //! `CrdtSignalState` CMA is verified by unit tests in //! `tidal/src/replication/crdt/signal_state.rs`. //! Also covers `HardNegAction` hide/unhide LWW semantics and a two-node //! reconciliation integration test using `ReconciliationEngine`. #![allow(clippy::unwrap_used)] use std::sync::Arc; use std::time::Duration; use proptest::prelude::*; use tidaldb::entities::HardNegIndex; use tidaldb::replication::crdt::hlc::HlcTimestamp; use tidaldb::replication::crdt::{CrdtSignalState, LWWRegister, PNCounter}; use tidaldb::replication::reconcile::{HardNegAction, ReconciliationEngine, StateSnapshot}; use tidaldb::replication::shard::ShardId; use tidaldb::schema::{DecaySpec, EntityId, EntityKind, SchemaBuilder, Window}; use tidaldb::signals::{NoopWalWriter, SignalLedger, SignalTypeId}; // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- /// 7-day half-life lambda (matches the standard "view" signal). const LAMBDA: f64 = std::f64::consts::LN_2 / (7.0 * 24.0 * 3600.0); fn make_schema() -> tidaldb::schema::Schema { let mut builder = SchemaBuilder::new(); let _ = builder .signal( "view", EntityKind::Item, DecaySpec::Exponential { half_life: Duration::from_secs(7 * 24 * 3600), }, ) .windows(&[Window::AllTime]) .velocity(false) .add(); builder.build().expect("schema must be valid") } fn make_ledger() -> Arc { Arc::new(SignalLedger::new(make_schema(), Box::new(NoopWalWriter))) } fn make_engine() -> (ReconciliationEngine, Arc, Arc) { let ledger = make_ledger(); let hard_neg = Arc::new(HardNegIndex::new()); let engine = ReconciliationEngine::new(Arc::clone(&ledger), Arc::clone(&hard_neg)); (engine, ledger, hard_neg) } // --------------------------------------------------------------------------- // Proptest strategies // --------------------------------------------------------------------------- /// Arbitrary `ShardId` in range [0, 8) -- enough to exercise multi-node /// merge without blowing up `HashMap` sizes. fn arb_shard_id() -> impl Strategy { (0u16..8).prop_map(ShardId) } /// Arbitrary `PNCounter` with up to 4 nodes, each with up to 10K increments /// and up to 10K decrements. fn arb_pn_counter() -> impl Strategy { prop::collection::vec((arb_shard_id(), 0u64..10_000, 0u64..10_000), 0..=4).prop_map(|entries| { let mut counter = PNCounter::new(); for (node, inc, dec) in entries { if inc > 0 { counter.increment(node, inc); } if dec > 0 { counter.decrement(node, dec); } } counter }) } /// Arbitrary `HlcTimestamp` within a bounded range. fn arb_hlc_timestamp() -> impl Strategy { (0..=1_000_000u64, 0..=100u32, 0..=10u16).prop_map(|(w, l, n)| HlcTimestamp { wall_ns: w, logical: l, node_id: n, }) } /// Arbitrary `LWWRegister` -- either empty or with a single write. fn arb_lww_register() -> impl Strategy> { prop_oneof![ Just(LWWRegister::empty()), (any::(), arb_hlc_timestamp()).prop_map(|(v, ts)| { let mut r = LWWRegister::empty(); r.write(v, ts); r }), ] } /// Arbitrary `HardNegAction`. fn arb_hard_neg_action() -> impl Strategy { prop_oneof![Just(HardNegAction::Hide), Just(HardNegAction::Unhide),] } /// A (`HardNegAction`, `HlcTimestamp`) pair for register writes. fn arb_action_write() -> impl Strategy { (arb_hard_neg_action(), arb_hlc_timestamp()) } // ========================================================================= // 1. PNCounter CMA properties // ========================================================================= proptest! { #![proptest_config(ProptestConfig::with_cases(10_000))] /// merge(A, B) == merge(B, A) -- same value(), total_positive(), total_negative(). #[test] fn pn_counter_commutative(a in arb_pn_counter(), b in arb_pn_counter()) { let mut ab = a.clone(); ab.merge(&b); let mut ba = b.clone(); ba.merge(&a); prop_assert_eq!(ab.value(), ba.value(), "value differs: merge(A,B)={} vs merge(B,A)={}", ab.value(), ba.value()); prop_assert_eq!(ab.total_positive(), ba.total_positive(), "total_positive differs"); prop_assert_eq!(ab.total_negative(), ba.total_negative(), "total_negative differs"); } /// merge(A, merge(B, C)) == merge(merge(A, B), C). #[test] fn pn_counter_associative( a in arb_pn_counter(), b in arb_pn_counter(), c in arb_pn_counter(), ) { // (A merge B) merge C let mut ab_c = a.clone(); ab_c.merge(&b); ab_c.merge(&c); // A merge (B merge C) let mut bc = b; bc.merge(&c); let mut a_bc = a; a_bc.merge(&bc); prop_assert_eq!(ab_c.value(), a_bc.value(), "associativity violated for value()"); prop_assert_eq!(ab_c.total_positive(), a_bc.total_positive(), "associativity violated for total_positive()"); prop_assert_eq!(ab_c.total_negative(), a_bc.total_negative(), "associativity violated for total_negative()"); } /// merge(A, A) == A -- self-merge does not change value. #[test] fn pn_counter_idempotent(a in arb_pn_counter()) { let snapshot = a.clone(); let mut merged = a; merged.merge(&snapshot); prop_assert_eq!(merged.value(), snapshot.value(), "idempotency violated for value()"); prop_assert_eq!(merged.total_positive(), snapshot.total_positive(), "idempotency violated for total_positive()"); prop_assert_eq!(merged.total_negative(), snapshot.total_negative(), "idempotency violated for total_negative()"); } /// After merge, total_positive == sum of independent node contributions /// (no duplication). When two counters have disjoint node sets, the merged /// total_positive should equal the sum of both. #[test] fn pn_counter_no_double_count( amount_a in 1u64..10_000, amount_b in 1u64..10_000, node_a_raw in 0u16..4, node_b_raw in 4u16..8, // disjoint from node_a ) { let node_a = ShardId(node_a_raw); let node_b = ShardId(node_b_raw); let mut a = PNCounter::new(); a.increment(node_a, amount_a); let mut b = PNCounter::new(); b.increment(node_b, amount_b); let mut merged = a.clone(); merged.merge(&b); prop_assert_eq!( merged.total_positive(), amount_a + amount_b, "disjoint nodes: total_positive should be sum of contributions" ); } } // ========================================================================= // 2. LWWRegister CMA properties // ========================================================================= proptest! { #![proptest_config(ProptestConfig::with_cases(10_000))] /// merge(A, B) == merge(B, A) -- same get() value and timestamp(). #[test] fn lww_register_commutative(a in arb_lww_register(), b in arb_lww_register()) { let mut ab = a.clone(); ab.merge(&b); let mut ba = b.clone(); ba.merge(&a); prop_assert_eq!(ab.get(), ba.get(), "commutativity violated for get()"); prop_assert_eq!(ab.timestamp(), ba.timestamp(), "commutativity violated for timestamp()"); } /// merge(A, merge(B, C)) == merge(merge(A, B), C). #[test] fn lww_register_associative( a in arb_lww_register(), b in arb_lww_register(), c in arb_lww_register(), ) { let mut ab_c = a.clone(); ab_c.merge(&b); ab_c.merge(&c); let mut bc = b; bc.merge(&c); let mut a_bc = a; a_bc.merge(&bc); prop_assert_eq!(ab_c.get(), a_bc.get(), "associativity violated for get()"); prop_assert_eq!(ab_c.timestamp(), a_bc.timestamp(), "associativity violated for timestamp()"); } /// merge(A, A) == A. #[test] fn lww_register_idempotent(a in arb_lww_register()) { let snapshot = a.clone(); let mut merged = a; merged.merge(&snapshot); prop_assert_eq!(merged.get(), snapshot.get(), "idempotency violated for get()"); prop_assert_eq!(merged.timestamp(), snapshot.timestamp(), "idempotency violated for timestamp()"); } /// Write with strictly higher HLC timestamp always wins regardless of /// merge order. #[test] fn lww_register_higher_hlc_wins( v1 in any::(), v2 in any::(), ts1 in arb_hlc_timestamp(), ) { // Construct ts2 strictly greater than ts1. let ts2 = HlcTimestamp { wall_ns: ts1.wall_ns + 1, logical: ts1.logical, node_id: ts1.node_id, }; // Forward merge order: write v1 at ts1, merge with v2 at ts2. let mut fwd: LWWRegister = LWWRegister::empty(); fwd.write(v1, ts1); let mut other: LWWRegister = LWWRegister::empty(); other.write(v2, ts2); fwd.merge(&other); prop_assert_eq!(fwd.get(), Some(&v2), "higher HLC should win in forward merge"); // Reverse merge order. let mut rev: LWWRegister = LWWRegister::empty(); rev.write(v2, ts2); let mut other2: LWWRegister = LWWRegister::empty(); other2.write(v1, ts1); rev.merge(&other2); prop_assert_eq!(rev.get(), Some(&v2), "higher HLC should win in reverse merge"); } } // ========================================================================= // 3. HardNegAction hide/unhide semantics // ========================================================================= proptest! { #![proptest_config(ProptestConfig::with_cases(10_000))] /// Two writes (Hide at ts1, Unhide at ts2); higher HLC always wins. #[test] fn hard_neg_hide_wins_with_higher_hlc( ts_low in arb_hlc_timestamp(), ) { let ts_high = HlcTimestamp { wall_ns: ts_low.wall_ns + 1, logical: ts_low.logical, node_id: ts_low.node_id, }; // Hide at higher ts, Unhide at lower ts. let mut reg_a: LWWRegister = LWWRegister::empty(); reg_a.write(HardNegAction::Hide, ts_high); let mut reg_b: LWWRegister = LWWRegister::empty(); reg_b.write(HardNegAction::Unhide, ts_low); let mut merged = reg_a.clone(); merged.merge(®_b); prop_assert_eq!(merged.get(), Some(&HardNegAction::Hide), "Hide with higher HLC must win"); // Reverse merge order. let mut merged_rev = reg_b.clone(); merged_rev.merge(®_a); prop_assert_eq!(merged_rev.get(), Some(&HardNegAction::Hide), "Hide with higher HLC must win (reverse merge)"); // Unhide at higher ts, Hide at lower ts. let mut reg_c: LWWRegister = LWWRegister::empty(); reg_c.write(HardNegAction::Unhide, ts_high); let mut reg_d: LWWRegister = LWWRegister::empty(); reg_d.write(HardNegAction::Hide, ts_low); let mut merged2 = reg_c.clone(); merged2.merge(®_d); prop_assert_eq!(merged2.get(), Some(&HardNegAction::Unhide), "Unhide with higher HLC must win"); } /// N writes with random timestamps; result is always the write with max /// timestamp. We generate a sequence of (action, timestamp) pairs and /// apply them via merge. The winner should always be the one with the /// maximum HlcTimestamp. #[test] fn hard_neg_latest_always_wins( writes in prop::collection::vec(arb_action_write(), 2..=10), ) { // Find the expected winner: the write with the maximum timestamp. let (expected_action, expected_ts) = writes.iter() .max_by_key(|(_, ts)| *ts) .unwrap(); // Build individual registers and merge them all together. let mut merged: LWWRegister = LWWRegister::empty(); for (action, ts) in &writes { let mut r: LWWRegister = LWWRegister::empty(); r.write(action.clone(), *ts); merged.merge(&r); } prop_assert_eq!(merged.get(), Some(expected_action), "latest write (ts={:?}) should win", expected_ts); prop_assert_eq!(merged.timestamp(), Some(*expected_ts), "timestamp should be the max across all writes"); } } // ========================================================================= // 4. Integration test: two-node reconciliation // ========================================================================= /// Two-node reconciliation test. /// /// Constructs `StateSnapshot`s from two simulated nodes (each with independent /// signal contributions), runs `ReconciliationEngine::plan()`, applies the /// plan, and verifies the merged decay score is approximately the sum of the /// individual contributions. #[test] fn two_node_reconciliation_signal_merge() { let (engine, ledger, _) = make_engine(); let entity = EntityId::new(42); let sig_id = SignalTypeId::new(0); // "view" -- only signal type // Use a fixed recent timestamp so decay is negligible. let now_ns = tidaldb::schema::Timestamp::now().as_nanos(); // Node 0: contributed 3.0 let node0_state = CrdtSignalState::from_node_contribution(ShardId(0), 3.0, now_ns, LAMBDA); let score_node0 = node0_state.decay_score(now_ns); // Node 1: contributed 7.0 let node1_state = CrdtSignalState::from_node_contribution(ShardId(1), 7.0, now_ns, LAMBDA); let score_node1 = node1_state.decay_score(now_ns); // Build snapshots. let mut local_snap = StateSnapshot::new(); local_snap.add_signal_state(entity, sig_id, node0_state); let mut remote_snap = StateSnapshot::new(); remote_snap.add_signal_state(entity, sig_id, node1_state); // Plan and apply. let plan = engine.plan(&local_snap, &remote_snap); assert_eq!( plan.signal_merges.len(), 1, "should have exactly one signal merge" ); assert!( plan.hardneg_resolutions.is_empty(), "no hard-neg ops expected" ); // Verify the merged state in the plan. let merged_score = plan.signal_merges[0].merged_state.decay_score(now_ns); let expected = score_node0 + score_node1; assert!( (merged_score - expected).abs() < 1e-6, "merged score {merged_score} should equal sum {expected} of node contributions" ); // Apply the plan to the ledger. engine.apply(&plan).unwrap(); // Verify the ledger has the entry. assert!( ledger.entries().get(&(entity, sig_id)).is_some(), "ledger should have an entry for the entity after apply" ); } /// Two-node reconciliation with hard negatives. /// /// Verifies that LWW resolution for hard negatives works end-to-end through /// the reconciliation engine. #[test] fn two_node_reconciliation_hardneg_lww() { let (engine, _, hard_neg) = make_engine(); let user = EntityId::new(100); let item = EntityId::new(200); // Local: hide at t=1000 let mut local_snap = StateSnapshot::new(); let mut local_reg = LWWRegister::empty(); local_reg.write( HardNegAction::Hide, HlcTimestamp { wall_ns: 1000, logical: 0, node_id: 0, }, ); local_snap.add_hardneg_register(user, item, local_reg); // Remote: unhide at t=2000 (later -- should win) let mut remote_snap = StateSnapshot::new(); let mut remote_reg = LWWRegister::empty(); remote_reg.write( HardNegAction::Unhide, HlcTimestamp { wall_ns: 2000, logical: 0, node_id: 1, }, ); remote_snap.add_hardneg_register(user, item, remote_reg); let plan = engine.plan(&local_snap, &remote_snap); assert_eq!(plan.hardneg_resolutions.len(), 1); assert_eq!( plan.hardneg_resolutions[0].action, Some(HardNegAction::Unhide), "unhide at t=2000 should beat hide at t=1000" ); // Pre-populate a hard negative, then apply the unhide resolution. hard_neg.add(100, 200); assert!(hard_neg.is_negative(100, 200)); engine.apply(&plan).unwrap(); assert!( !hard_neg.is_negative(100, 200), "item 200 should no longer be hidden for user 100 after unhide resolution" ); } /// Two-node reconciliation: combined signals + hard negatives in one plan. #[test] fn two_node_reconciliation_combined() { let (engine, ledger, hard_neg) = make_engine(); let entity = EntityId::new(1); let sig_id = SignalTypeId::new(0); let user = EntityId::new(10); let item = EntityId::new(20); let now_ns = tidaldb::schema::Timestamp::now().as_nanos(); // Build local snapshot: node 0 signal + hide. let mut local_snap = StateSnapshot::new(); let node0 = CrdtSignalState::from_node_contribution(ShardId(0), 5.0, now_ns, LAMBDA); local_snap.add_signal_state(entity, sig_id, node0); let mut hide_reg = LWWRegister::empty(); hide_reg.write( HardNegAction::Hide, HlcTimestamp { wall_ns: 5000, logical: 0, node_id: 0, }, ); local_snap.add_hardneg_register(user, item, hide_reg); // Build remote snapshot: node 1 signal + unhide (earlier -- hide should win). let mut remote_snap = StateSnapshot::new(); let node1 = CrdtSignalState::from_node_contribution(ShardId(1), 3.0, now_ns, LAMBDA); remote_snap.add_signal_state(entity, sig_id, node1); let mut unhide_reg = LWWRegister::empty(); unhide_reg.write( HardNegAction::Unhide, HlcTimestamp { wall_ns: 1000, logical: 0, node_id: 1, }, ); remote_snap.add_hardneg_register(user, item, unhide_reg); let plan = engine.plan(&local_snap, &remote_snap); assert_eq!(plan.signal_merges.len(), 1); assert_eq!(plan.hardneg_resolutions.len(), 1); // Signal: merged score should be ~8.0 let merged_score = plan.signal_merges[0].merged_state.decay_score(now_ns); assert!( (merged_score - 8.0).abs() < 1e-6, "merged signal score {merged_score} should be ~8.0" ); // Hard neg: hide at t=5000 wins over unhide at t=1000. assert_eq!( plan.hardneg_resolutions[0].action, Some(HardNegAction::Hide) ); // Apply and verify. engine.apply(&plan).unwrap(); assert!( ledger.entries().get(&(entity, sig_id)).is_some(), "ledger should have signal entry" ); assert!( hard_neg.is_negative(10, 20), "item 20 should be hidden for user 10" ); } // ========================================================================= // 5. Scale integration: 500-event two-node reconciliation // ========================================================================= /// Two nodes each process 500 signals, reconcile, verify merged score. /// /// Each node records 500 events with weight 1.0 spread across 1ms intervals. /// After reconciliation the merged score should equal the sum of both /// nodes' individual scores (1000 total contributions). #[test] fn two_node_reconciliation_500_events() { let (engine, ledger, _) = make_engine(); let entity = EntityId::new(99); let sig_id = SignalTypeId::new(0); let now_ns = tidaldb::schema::Timestamp::now().as_nanos(); // Node 0: 500 events. let mut node0_state = CrdtSignalState::from_node_contribution(ShardId(0), 0.0, now_ns, LAMBDA); for i in 0..500u64 { let t = now_ns + i * 1_000_000; // 1ms apart node0_state.on_signal(ShardId(0), 1.0, t); } let end_ns = now_ns + 499 * 1_000_000; // Node 1: 500 events. let mut node1_state = CrdtSignalState::from_node_contribution(ShardId(1), 0.0, now_ns, LAMBDA); for i in 0..500u64 { let t = now_ns + i * 1_000_000; node1_state.on_signal(ShardId(1), 1.0, t); } let score_node0 = node0_state.decay_score(end_ns); let score_node1 = node1_state.decay_score(end_ns); let mut local_snap = StateSnapshot::new(); local_snap.add_signal_state(entity, sig_id, node0_state); let mut remote_snap = StateSnapshot::new(); remote_snap.add_signal_state(entity, sig_id, node1_state); let plan = engine.plan(&local_snap, &remote_snap); assert_eq!(plan.signal_merges.len(), 1); let merged_score = plan.signal_merges[0].merged_state.decay_score(end_ns); let expected = score_node0 + score_node1; assert!( (merged_score - expected).abs() < 1e-6, "500+500 event reconciliation: merged {merged_score} should equal sum {expected}" ); engine.apply(&plan).unwrap(); assert!( ledger.entries().get(&(entity, sig_id)).is_some(), "ledger should have entry after 500-event reconciliation" ); } // ========================================================================= // 6. MergePlan serde roundtrip // ========================================================================= /// Serialize and deserialize a `MergePlan`, verify structural equality. #[test] fn merge_plan_serde_roundtrip() { use tidaldb::replication::reconcile::{HardNegResolutionOp, MergePlan, SignalMergeOp}; let now_ns = tidaldb::schema::Timestamp::now().as_nanos(); let plan = MergePlan { signal_merges: vec![SignalMergeOp { entity_id: EntityId::new(1), signal_type_id: SignalTypeId::new(0), merged_state: CrdtSignalState::from_node_contribution(ShardId(0), 5.0, now_ns, LAMBDA), }], hardneg_resolutions: vec![HardNegResolutionOp { user_id: EntityId::new(10), item_id: EntityId::new(20), action: Some(HardNegAction::Hide), }], }; let json = serde_json::to_string(&plan).expect("serialize"); let decoded: MergePlan = serde_json::from_str(&json).expect("deserialize"); assert_eq!(decoded.signal_merges.len(), 1); assert_eq!(decoded.hardneg_resolutions.len(), 1); assert_eq!(decoded.signal_merges[0].entity_id, EntityId::new(1)); // Compare decay scores approximately (f64 JSON roundtrip loses last ULP). let orig_score = plan.signal_merges[0].merged_state.decay_score(now_ns); let decoded_score = decoded.signal_merges[0].merged_state.decay_score(now_ns); assert!( (orig_score - decoded_score).abs() < 1e-10, "CrdtSignalState decay_score should survive serde roundtrip: orig={orig_score}, decoded={decoded_score}" ); assert_eq!( decoded.hardneg_resolutions[0].action, Some(HardNegAction::Hide) ); assert_eq!(decoded.hardneg_resolutions[0].user_id, EntityId::new(10)); assert_eq!(decoded.hardneg_resolutions[0].item_id, EntityId::new(20)); }