//! M8 UAT: End-to-end acceptance tests for the Distributed Fabric milestone. //! //! Validates all five UAT steps: //! 1. Cross-region signal replication (convergence within timeout) //! 2. Leader crash and follower promotion (no data loss) //! 3. Degraded query during partition (query succeeds with partial data) //! 4. Partition heal and reconciliation (CRDT merge, hard negatives) //! 5. Tenant migration state machine (zero-downtime transitions) //! //! Plus three performance assertions: //! - Replication latency p99 < 2000ms (in-process) //! - Failover (promotion) < 10s //! - Reconciliation of 1000 events < 100ms #![allow( clippy::unwrap_used, clippy::items_after_statements, clippy::doc_markdown, clippy::significant_drop_tightening, clippy::suboptimal_flops, clippy::cast_precision_loss, clippy::needless_pass_by_value, clippy::cast_possible_truncation )] use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; use tidaldb::replication::crdt::{CrdtSignalState, HlcTimestamp, LWWRegister}; use tidaldb::replication::lag::ReplicationLagGauge; use tidaldb::replication::reconcile::{HardNegAction, ReconciliationEngine, StateSnapshot}; use tidaldb::replication::state::ReplicationState; use tidaldb::replication::{ ClusterTopology, ControlPlane, MigrationState, RegionId, ShardId, TenantConfig, TenantId, TenantMigration, TenantRouter, }; use tidaldb::schema::{DecaySpec, EntityId, EntityKind, SchemaBuilder, Timestamp, Window}; use tidaldb::signals::SignalTypeId; use tidaldb::testing::cluster::{ClusterConfig, SimulatedCluster}; use tidaldb::testing::faults::NetworkPartition; // ── Shared helpers ─────────────────────────────────────────────────────── fn m8_schema() -> tidaldb::schema::Schema { let mut builder = SchemaBuilder::new(); let _ = builder .signal( "view", EntityKind::Item, DecaySpec::Exponential { half_life: Duration::from_secs(7 * 24 * 3600), }, ) .windows(&[Window::OneHour, Window::TwentyFourHours]) .velocity(false) .add(); let _ = builder .signal( "like", EntityKind::Item, DecaySpec::Exponential { half_life: Duration::from_secs(24 * 3600), }, ) .windows(&[Window::OneHour]) .velocity(false) .add(); builder.build().unwrap() } fn three_region_config() -> ClusterConfig { ClusterConfig { regions: vec![RegionId(0), RegionId(1), RegionId(2)], leader_region: RegionId(0), schema: m8_schema(), } } fn two_region_config() -> ClusterConfig { ClusterConfig { regions: vec![RegionId(0), RegionId(1)], leader_region: RegionId(0), schema: m8_schema(), } } // ── UAT Step 1: Cross-region signal replication ────────────────────────── /// Write 25 signals to the leader (region 0). After convergence, /// verify that all three regions report the same decay score within /// floating-point epsilon. #[test] fn uat_step1_cross_region_replication() { let cluster = SimulatedCluster::build(three_region_config()); let item = EntityId::new(1); // Write 25 signals in region 0 (leader). for _ in 0..25 { cluster.write_signal("view", item, 1.0).unwrap(); } // Wait for convergence (< 2 seconds on in-process relay). cluster.await_convergence(Duration::from_secs(2)); // Read decay scores from all regions. let score_east = cluster .read_decay_score(RegionId(0), item, "view") .expect("leader must have score"); let score_west = cluster .read_decay_score(RegionId(1), item, "view") .expect("follower region 1 must have score"); let score_south = cluster .read_decay_score(RegionId(2), item, "view") .expect("follower region 2 must have score"); // All regions should report the same score within epsilon. // The small difference comes from the fact that leader writes and // follower replays happen at slightly different wall-clock times, // causing slightly different decay factors. With a 7-day half-life // and sub-second elapsed time, the difference is negligible. let epsilon = 0.5; // generous epsilon for wall-clock timing differences assert!( (score_east - score_west).abs() < epsilon, "region 1 score {score_west} diverges from leader score {score_east} by > {epsilon}" ); assert!( (score_east - score_south).abs() < epsilon, "region 2 score {score_south} diverges from leader score {score_east} by > {epsilon}" ); // All scores must be significantly greater than zero (25 events accumulated). assert!(score_east > 10.0, "leader score {score_east} is too low"); assert!( score_west > 10.0, "follower 1 score {score_west} is too low" ); assert!( score_south > 10.0, "follower 2 score {score_south} is too low" ); // Verify relay log has all 25 events. assert_eq!(cluster.relay_log_len(), 25); // Verify all followers applied all events. assert_eq!(cluster.applied_count(RegionId(1)), 25); assert_eq!(cluster.applied_count(RegionId(2)), 25); } // ── UAT Step 2: Leader crash and follower promotion ────────────────────── /// Write 100 signals to the leader. Converge. "Crash" the leader by /// promoting a follower. Verify the new leader has all signals. /// Write post-crash signals to the new leader. #[test] fn uat_step2_leader_crash_and_failover() { let cluster = SimulatedCluster::build(three_region_config()); let item = EntityId::new(2); // Write 100 signals on the leader (region 0). for _ in 0..100 { cluster.write_signal("view", item, 1.0).unwrap(); } // Wait for all followers to receive the events. cluster.await_convergence(Duration::from_secs(2)); // Record pre-crash score on region 1 (will become new leader). let pre_crash_score = cluster .read_decay_score(RegionId(1), item, "view") .expect("follower must have score before crash"); assert!( pre_crash_score > 50.0, "pre-crash score {pre_crash_score} too low; 100 events should yield > 50" ); // "Crash" the leader: promote region 1. let start = Instant::now(); cluster.promote_leader(RegionId(1)); let failover_time = start.elapsed(); // Failover must complete instantly (in-process, no election protocol). assert!( failover_time < Duration::from_secs(10), "failover took {:?}, must be < 10s", failover_time ); // Verify the new leader is region 1. assert_eq!(cluster.leader_region(), RegionId(1)); // New leader must have all 100 signals. let new_leader_score = cluster .read_decay_score(RegionId(1), item, "view") .expect("new leader must have score"); assert!( new_leader_score > 50.0, "new leader score {new_leader_score} too low after promotion" ); // Write 10 more signals to the new leader. for _ in 0..10 { cluster.write_signal("view", item, 1.0).unwrap(); } // Converge: region 2 should receive the 10 new signals. cluster.await_convergence(Duration::from_secs(2)); let score_r2 = cluster .read_decay_score(RegionId(2), item, "view") .expect("region 2 must have score"); // Region 2 should have 100 (original) + 10 (post-crash) events. assert!( score_r2 > 50.0, "region 2 score {score_r2} too low after post-crash writes" ); } // ── UAT Step 3: Degraded query during partition ────────────────────────── /// Inject a partition isolating region 2. Write more signals. /// Query the leader -- query must succeed. Region 2 does not receive /// the new signals. #[test] fn uat_step3_degraded_query_during_partition() { let cluster = SimulatedCluster::build(three_region_config()); let item = EntityId::new(3); // Seed some data before partition. for _ in 0..10 { cluster.write_signal("view", item, 1.0).unwrap(); } cluster.await_convergence(Duration::from_secs(1)); // Inject partition: region 2 is isolated. let _partition = NetworkPartition::isolate(RegionId(2), cluster.partitioned_regions()); // Write more signals during the partition. for _ in 0..5 { cluster.write_signal("view", item, 1.0).unwrap(); } // Converge: only region 1 should get the new signals. cluster.await_convergence(Duration::from_secs(1)); // Leader query must succeed. let leader_score = cluster .read_decay_score(RegionId(0), item, "view") .expect("leader must have score"); assert!( leader_score > 5.0, "leader score {leader_score} must reflect all 15 events" ); // Region 1 (non-partitioned follower) should also have the new events. let r1_score = cluster .read_decay_score(RegionId(1), item, "view") .expect("region 1 must have score"); assert!( r1_score > 5.0, "region 1 score {r1_score} must reflect 15 events" ); // Region 2 (partitioned) should still have only the 10 pre-partition events. let r2_score = cluster .read_decay_score(RegionId(2), item, "view") .expect("region 2 must have pre-partition score"); let r2_applied = cluster.applied_count(RegionId(2)); assert_eq!( r2_applied, 10, "region 2 should only have 10 events applied" ); // Region 2 score < leader score (it's missing 5 events). assert!( r2_score < leader_score, "partitioned region 2 ({r2_score}) should lag behind leader ({leader_score})" ); } // ── UAT Step 4: Partition heal and reconciliation ──────────────────────── /// Heal the partition from Step 3. Verify region 2 catches up. /// Also test CRDT reconciliation of diverged signal states and /// hard negative propagation. #[test] fn uat_step4_partition_heal_and_reconciliation() { let cluster = SimulatedCluster::build(three_region_config()); let item = EntityId::new(4); // Phase 1: Write some events, then partition region 2. for _ in 0..20 { cluster.write_signal("view", item, 1.0).unwrap(); } cluster.await_convergence(Duration::from_secs(1)); // Partition region 2. { let _partition = NetworkPartition::isolate(RegionId(2), cluster.partitioned_regions()); // Write 30 more events to the leader during partition. for _ in 0..30 { cluster.write_signal("view", item, 1.0).unwrap(); } // Converge region 1 only (region 2 is partitioned). cluster.await_convergence(Duration::from_secs(1)); // Verify region 2 is behind. assert_eq!( cluster.applied_count(RegionId(2)), 20, "region 2 should only have 20 events" ); } // Partition dropped here -- healed automatically. // Phase 2: Converge after partition heal. cluster.await_convergence(Duration::from_secs(2)); // Region 2 should now have all 50 events. assert_eq!( cluster.applied_count(RegionId(2)), 50, "region 2 should have all 50 events after partition heal" ); let score_east = cluster.read_decay_score(RegionId(0), item, "view").unwrap(); let score_south = cluster.read_decay_score(RegionId(2), item, "view").unwrap(); // Scores should be close (within wall-clock timing epsilon). let epsilon = 1.0; assert!( (score_east - score_south).abs() < epsilon, "post-heal scores diverge: leader={score_east}, region2={score_south}" ); // ── CRDT reconciliation sub-test ───────────────────────────────── // Test the ReconciliationEngine with diverged CRDT states. let lambda = std::f64::consts::LN_2 / (7.0 * 24.0 * 3600.0); let now_ns = Timestamp::now().as_nanos(); // Node A (shard 0) has 50 events. let mut state_a = CrdtSignalState::new(lambda); for _ in 0..50 { state_a.on_signal(ShardId(0), 1.0, now_ns); } // Node B (shard 1) has 30 events (accumulated during partition). let mut state_b = CrdtSignalState::new(lambda); for _ in 0..30 { state_b.on_signal(ShardId(1), 1.0, now_ns); } let mut snap_a = StateSnapshot::new(); snap_a.add_signal_state(item, SignalTypeId::new(0), state_a.clone()); let mut snap_b = StateSnapshot::new(); snap_b.add_signal_state(item, SignalTypeId::new(0), state_b.clone()); // Use a standalone ledger and hard-neg index for reconciliation. let recon_schema = m8_schema(); let ledger = Arc::new(tidaldb::signals::SignalLedger::new( recon_schema, Box::new(tidaldb::signals::NoopWalWriter), )); let hard_negs = Arc::new(tidaldb::entities::HardNegIndex::new()); let engine = ReconciliationEngine::new(Arc::clone(&ledger), Arc::clone(&hard_negs)); let plan = engine.plan(&snap_a, &snap_b); // Plan should have 1 signal merge (entity 4, signal type 0). assert_eq!( plan.signal_merges.len(), 1, "plan should have exactly 1 signal merge" ); // Verify merged score = sum of both sides. let merged = &plan.signal_merges[0]; let merged_score = merged.merged_state.decay_score(now_ns); let expected = state_a.decay_score(now_ns) + state_b.decay_score(now_ns); assert!( (merged_score - expected).abs() < 1e-6, "merged CRDT score {merged_score} != expected {expected}" ); // Apply the plan. engine.apply(&plan).unwrap(); // ── Hard negative sub-test ─────────────────────────────────────── let user = EntityId::new(100); // Simulate: region 2 hides an item during partition (newer timestamp). let ts_hide = HlcTimestamp { wall_ns: 200, logical: 0, node_id: 2, }; let ts_older = HlcTimestamp { wall_ns: 100, logical: 0, node_id: 0, }; let mut snap_local = StateSnapshot::new(); // Local side: an older unhide. let mut reg_local: LWWRegister = LWWRegister::empty(); reg_local.write(HardNegAction::Unhide, ts_older); snap_local.add_hardneg_register(user, item, reg_local); let mut snap_remote = StateSnapshot::new(); // Remote side: hide at a later timestamp. let mut reg_remote: LWWRegister = LWWRegister::empty(); reg_remote.write(HardNegAction::Hide, ts_hide); snap_remote.add_hardneg_register(user, item, reg_remote); let plan = engine.plan(&snap_local, &snap_remote); assert_eq!( plan.hardneg_resolutions.len(), 1, "should have 1 hard-neg resolution" ); assert_eq!( plan.hardneg_resolutions[0].action, Some(HardNegAction::Hide), "LWW should resolve to Hide (newer timestamp)" ); engine.apply(&plan).unwrap(); // Verify hard negative is applied. assert!( hard_negs.is_negative(user.as_u64(), item.as_u64() as u32), "hard negative must be applied after reconciliation" ); } // ── UAT Step 5: Tenant migration state machine ────────────────────────── /// Drive the `TenantMigration` state machine through all transitions /// while simultaneously writing signals. Verify all transitions succeed. #[test] fn uat_step5_tenant_migration() { let cluster = SimulatedCluster::build(three_region_config()); let tenant = TenantId(42); let item = EntityId::new(5); // Register tenant on the leader's router. cluster .leader() .db .tenant_router() .register_tenant(TenantConfig { tenant_id: tenant, max_signals_per_sec: None, max_entities: None, max_storage_bytes: None, required_regions: vec![RegionId(0)], label: "migrating-tenant".into(), }); // Write 100 signals before migration. for _ in 0..100 { cluster .leader() .db .signal_for_tenant(tenant, "view", item, 1.0, Timestamp::now()) .unwrap(); } cluster.await_convergence(Duration::from_secs(1)); // Build the TenantMigration state machine. let topo = Arc::new(RwLock::new(ClusterTopology::single())); let router = Arc::new(TenantRouter::new(Arc::clone(&topo))); let rep_state = Arc::new(ReplicationState::single()); let lag = Arc::new(ReplicationLagGauge::new(ShardId::SINGLE, rep_state)); let cp = Arc::new(ControlPlane::new(topo, Arc::clone(&router), lag)); let migration = TenantMigration::new(tenant, ShardId(0), ShardId(1), cp, router); // Step 1: Idle -> PreparingTarget assert_eq!(migration.current_state(), MigrationState::Idle); let seqno = migration.prepare_target(42).unwrap(); assert_eq!(seqno, 42); assert!(matches!( migration.current_state(), MigrationState::PreparingTarget { .. } )); // Step 2: PreparingTarget -> DualWrite let cutover = migration.enter_dual_write(100).unwrap(); assert_eq!(cutover, 100); assert!(matches!( migration.current_state(), MigrationState::DualWrite { .. } )); // Write 50 more signals during dual-write. for _ in 0..50 { cluster .leader() .db .signal_for_tenant(tenant, "view", item, 1.0, Timestamp::now()) .unwrap(); } // Step 3: DualWrite -> Finalizing migration.finalize(150).unwrap(); assert!(matches!( migration.current_state(), MigrationState::Finalizing { .. } )); // Step 4: Finalizing -> Complete (gc_window_ns = 0 for instant GC). migration.gc_source(0).unwrap(); assert_eq!(migration.current_state(), MigrationState::Complete); // Verify all 150 signals are present on the leader. let score = cluster .read_decay_score(RegionId(0), item, "view") .expect("leader must have score"); assert!( score > 50.0, "leader score {score} must reflect 150 signals" ); } // ── Performance: Replication latency p99 ───────────────────────────────── /// Measure time from `write_signal()` on leader to `read_decay_score()` /// on follower returning `Some`. 1000 samples, p99 < 2000ms. #[test] fn perf_replication_latency_p99() { let cluster = SimulatedCluster::build(two_region_config()); let mut latencies_us: Vec = Vec::with_capacity(1000); for i in 0u64..1000 { let item = EntityId::new(1_000_000 + i); let before = Instant::now(); cluster.write_signal("view", item, 1.0).unwrap(); cluster.await_convergence(Duration::from_secs(3)); let score = cluster.read_decay_score(RegionId(1), item, "view"); assert!( score.is_some(), "follower must have score for entity {i} after convergence" ); let elapsed = before.elapsed().as_micros(); latencies_us.push(elapsed); } latencies_us.sort_unstable(); let p99_idx = (latencies_us.len() as f64 * 0.99) as usize; let p99_us = latencies_us[p99_idx.min(latencies_us.len() - 1)]; let p99_ms = p99_us / 1_000; println!( "Replication latency: p50={}us p99={}us ({}ms)", latencies_us[latencies_us.len() / 2], p99_us, p99_ms, ); assert!( p99_ms < 2000, "replication latency p99 = {p99_ms}ms, must be < 2000ms" ); } // ── Performance: Failover under 10s ────────────────────────────────────── /// Measure time to promote a follower after leader crash. #[test] fn perf_failover_under_10s() { let cluster = SimulatedCluster::build(three_region_config()); // Write some data. for _ in 0..50 { cluster .write_signal("view", EntityId::new(10), 1.0) .unwrap(); } cluster.await_convergence(Duration::from_secs(1)); let start = Instant::now(); cluster.promote_leader(RegionId(1)); let elapsed = start.elapsed(); println!("Failover completed in {:?}", elapsed); assert!( elapsed < Duration::from_secs(10), "failover must complete within 10 seconds, took {:?}", elapsed ); // Verify the new leader can accept writes. cluster .write_signal("view", EntityId::new(10), 1.0) .unwrap(); let score = cluster .read_decay_score(RegionId(1), EntityId::new(10), "view") .expect("new leader must have score"); assert!(score > 0.0, "new leader must have positive score"); } // ── Performance: Reconciliation overhead ───────────────────────────────── /// Reconcile 1000 events (500 per side). Measure time. Assert < 100ms. #[test] fn perf_reconciliation_overhead() { let lambda = std::f64::consts::LN_2 / (7.0 * 24.0 * 3600.0); let now_ns = Timestamp::now().as_nanos(); // Build two snapshots: 500 distinct entities per side. let mut snap_a = StateSnapshot::new(); let mut snap_b = StateSnapshot::new(); for i in 0u64..500 { let mut state = CrdtSignalState::new(lambda); state.on_signal(ShardId(0), 1.0, now_ns); snap_a.add_signal_state(EntityId::new(i), SignalTypeId::new(0), state); } for i in 500u64..1000 { let mut state = CrdtSignalState::new(lambda); state.on_signal(ShardId(1), 1.0, now_ns); snap_b.add_signal_state(EntityId::new(i), SignalTypeId::new(0), state); } let schema = m8_schema(); let ledger = Arc::new(tidaldb::signals::SignalLedger::new( schema, Box::new(tidaldb::signals::NoopWalWriter), )); let hard_negs = Arc::new(tidaldb::entities::HardNegIndex::new()); let engine = ReconciliationEngine::new(Arc::clone(&ledger), Arc::clone(&hard_negs)); let start = Instant::now(); let plan = engine.plan(&snap_a, &snap_b); engine.apply(&plan).unwrap(); let elapsed = start.elapsed(); println!( "Reconciliation of {} signal merges took {:?}", plan.signal_merges.len(), elapsed ); assert_eq!( plan.signal_merges.len(), 1000, "plan should have 1000 signal merges (500 + 500 disjoint entities)" ); assert!( elapsed < Duration::from_millis(100), "reconciliation took {:?}, must be < 100ms", elapsed ); }