//! m8p4 integration tests: session continuity and agent memory across regions. //! //! Tests use components directly (no `TidalDb` instances needed) for speed //! and determinism. No async, no sleep, no background threads. #![allow(clippy::unwrap_used)] use std::sync::Arc; use std::time::Instant; use tidaldb::entities::HardNegIndex; use tidaldb::replication::crdt::HlcTimestamp; use tidaldb::replication::{ IdempotencyKey, IdempotencyStore, InProcessSessionTransportFactory, SessionReplicationBridge, ShardId, }; use tidaldb::session::SessionSeqNoTracker; use tidaldb::wal::format::session::{SessionSeqNo, SessionWalEvent}; // -- helpers ------------------------------------------------------------------ const fn ts(wall_ns: u64, logical: u32, node_id: u16) -> HlcTimestamp { HlcTimestamp { wall_ns, logical, node_id, } } fn signal_event(session_id: u64, entity_id: u64, seqno: u64) -> SessionWalEvent { SessionWalEvent::Signal { session_id, entity_id, weight: 1.0, ts_ns: 1_000_000_000, signal_name: "view".to_string(), annotation: None, session_seqno: Some(SessionSeqNo(seqno)), idempotency_key: Some(u128::from(seqno) * 1_000_000 + u128::from(session_id)), } } fn make_two_bridges() -> (SessionReplicationBridge, SessionReplicationBridge) { let shards = [ShardId(0), ShardId(1)]; let mut transports = InProcessSessionTransportFactory::new(&shards).build(); let t0 = transports.remove(&ShardId(0)).unwrap(); let t1 = transports.remove(&ShardId(1)).unwrap(); let b0 = SessionReplicationBridge::new( t0, Arc::new(IdempotencyStore::new(1000)), Arc::new(SessionSeqNoTracker::new()), ); let b1 = SessionReplicationBridge::new( t1, Arc::new(IdempotencyStore::new(1000)), Arc::new(SessionSeqNoTracker::new()), ); (b0, b1) } // -- Test 1: SeqNo HWM rejects duplicates ------------------------------------- #[test] fn test_session_seqno_hwm_rejects_duplicates() { let tracker = SessionSeqNoTracker::new(); let session = 1u64; // Sequence 1..5 -- all accepted. for i in 1..=5u64 { assert!( tracker.should_apply(session, SessionSeqNo(i)), "seqno {i} should be accepted" ); } // Re-send seqno 3 -- rejected. assert!( !tracker.should_apply(session, SessionSeqNo(3)), "seqno 3 is a duplicate, should be rejected" ); // Seqno 6 -- accepted (monotone resume). assert!(tracker.should_apply(session, SessionSeqNo(6))); // HWM should be 6. assert_eq!(tracker.hwm(session), SessionSeqNo(6)); } // -- Test 2: 5 session signals replicated region A -> B ----------------------- #[test] fn test_session_cross_region_visibility() { let (b0, b1) = make_two_bridges(); let session_id = 10u64; let events: Vec = (1..=5) .map(|i| signal_event(session_id, 100 + i, i)) .collect(); // Ship from region A (shard 0) to region B (shard 1). let start = Instant::now(); let shipped = b0.ship(ShardId(1), session_id, &events).unwrap(); assert_eq!(shipped, 5, "all 5 events should be shipped"); // Receive and apply on region B. let mut received = Vec::new(); let applied = b1.recv_and_apply(|e| received.push(e.clone())).unwrap(); let elapsed = start.elapsed(); assert_eq!(applied, 5, "all 5 events should be applied on follower"); assert_eq!(received.len(), 5); // AC: session visible in region B within 2 seconds (in-process transport). assert!( elapsed.as_secs() < 2, "cross-region visibility must be < 2s, was {elapsed:?}" ); } // -- Test 3: Idempotent replication (no double-counting) ---------------------- #[test] fn test_session_replication_idempotent() { let (b0, b1) = make_two_bridges(); let session_id = 20u64; let events = vec![signal_event(session_id, 200, 1)]; // First ship -- gets through. b0.ship(ShardId(1), session_id, &events).unwrap(); let applied_first = b1.recv_and_apply(|_| {}).unwrap(); assert_eq!(applied_first, 1); // Second ship of the same events -- ship HWM blocks re-shipping. let reshipped = b0.ship(ShardId(1), session_id, &events).unwrap(); assert_eq!(reshipped, 0, "ship HWM should prevent re-shipping"); } // -- Test 4: Sequential events only ship the new ones ------------------------- #[test] fn test_incremental_shipping() { let (b0, b1) = make_two_bridges(); let session_id = 30u64; // Ship batch 1 (seqno 1-3). let batch1: Vec<_> = (1..=3) .map(|i| signal_event(session_id, 300 + i, i)) .collect(); let shipped1 = b0.ship(ShardId(1), session_id, &batch1).unwrap(); assert_eq!(shipped1, 3); b1.recv_and_apply(|_| {}).unwrap(); // Ship batch 2 (seqno 1-5, but 1-3 already sent -> only 4-5 shipped). let batch2: Vec<_> = (1..=5) .map(|i| signal_event(session_id, 300 + i, i)) .collect(); let shipped2 = b0.ship(ShardId(1), session_id, &batch2).unwrap(); assert_eq!(shipped2, 2, "only new events (seqno 4-5) should be shipped"); let mut count = 0; let applied = b1.recv_and_apply(|_| count += 1).unwrap(); assert_eq!(applied, 2); assert_eq!(count, 2); } // -- Test 5: HardNeg monotonicity -- hide wins with lower unhide HLC ---------- #[test] fn test_hardneg_monotonicity_hide_wins() { let idx = HardNegIndex::new(); let user_id = 3u64; let item_id = 300u32; // Shard A: user hides item at HLC t=100. idx.apply_replication_hide(user_id, item_id, ts(100, 0, 0)); // Shard B: user had an earlier unhide at HLC t=50 (arrives during convergence). let cleared = idx.apply_replication_unhide(user_id, item_id, ts(50, 0, 1)); // Hide wins: unhide's ts (50) < hide's ts (100). assert!(!cleared, "unhide with lower HLC should be blocked"); assert!( idx.is_negative(user_id, item_id), "item must still be suppressed after lower-ts unhide" ); } // -- Test 6: Explicit unhide with higher HLC does clear the hide -------------- #[test] fn test_hardneg_explicit_unhide_with_higher_hlc() { let idx = HardNegIndex::new(); let user_id = 4u64; let item_id = 400u32; // Hide at t=50. idx.apply_replication_hide(user_id, item_id, ts(50, 0, 0)); assert!(idx.is_negative(user_id, item_id)); // Explicit unhide at t=200 (> 50). let cleared = idx.apply_replication_unhide(user_id, item_id, ts(200, 0, 1)); assert!(cleared, "unhide with strictly higher HLC should succeed"); assert!( !idx.is_negative(user_id, item_id), "item must be visible again after successful unhide" ); } // -- Test 7: Multiple hides from multiple shards -- union accumulates --------- #[test] fn test_hardneg_multi_shard_union() { let idx = HardNegIndex::new(); let user_id = 5u64; let item_id = 500u32; // Shard 0 hides at t=10. idx.apply_replication_hide(user_id, item_id, ts(10, 0, 0)); // Shard 1 hides at t=30 (higher). idx.apply_replication_hide(user_id, item_id, ts(30, 0, 1)); // Shard 2 hides at t=20 (middle). idx.apply_replication_hide(user_id, item_id, ts(20, 0, 2)); // Max hide ts is 30. Unhide at t=25 is blocked. assert!(!idx.apply_replication_unhide(user_id, item_id, ts(25, 0, 0))); assert!(idx.is_negative(user_id, item_id)); // Unhide at t=40 (> 30) succeeds. assert!(idx.apply_replication_unhide(user_id, item_id, ts(40, 0, 0))); assert!(!idx.is_negative(user_id, item_id)); } // -- Test 8: Idempotency store catches within-window duplicates --------------- #[test] fn test_idempotency_store_deduplication() { let store = IdempotencyStore::new(100); let key = IdempotencyKey::derive(1, SessionSeqNo(1), b"op"); assert!(store.check_and_record(key), "first time: accepted"); assert!(!store.check_and_record(key), "second time: duplicate"); assert!(!store.check_and_record(key), "third time: still duplicate"); // Different key is accepted. let key2 = IdempotencyKey::derive(1, SessionSeqNo(2), b"op"); assert!(store.check_and_record(key2)); }