tidaldb/tidal/tests/m8_uat.rs
jordan eca7765e8d fix: heal_region re-delivers missed WAL batches so partitioned followers converge immediately after heal
- Extract redeliver_missed(tx, db, log) helper into cluster_transport.rs
- heal_region now removes partition then immediately ships any missed
  batch-log entries to the healed follower's channel
- await_convergence refactored to call the same helper (no logic change)
- tidal-server: reload_text_index before search in cluster mode
- tidal-server: write_signal returns Result instead of panicking on unknown signal
- tidal-server: leader shows lag_events=0 (writes directly, no receiver thread)
- tidal-server: fix cluster mode error propagation (ServerError::from)
- docs/runbooks/cluster.md: add full cluster operations runbook
- docker/: add Dockerfile for containerised cluster deployment
- README.md: add tidal-server HTTP API getting-started section
- Split oversized source files per CODING_GUIDELINES §9

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-25 11:57:01 -07:00

670 lines
23 KiB
Rust

//! M8 UAT: End-to-end acceptance tests for the Distributed Fabric milestone.
//!
//! Validates all five UAT steps:
//! 1. Cross-region signal replication (convergence within timeout)
//! 2. Leader crash and follower promotion (no data loss)
//! 3. Degraded query during partition (query succeeds with partial data)
//! 4. Partition heal and reconciliation (CRDT merge, hard negatives)
//! 5. Tenant migration state machine (zero-downtime transitions)
//!
//! Plus three performance assertions:
//! - Replication latency p99 < 2000ms (in-process)
//! - Failover (promotion) < 10s
//! - Reconciliation of 1000 events < 100ms
#![allow(
clippy::unwrap_used,
clippy::items_after_statements,
clippy::doc_markdown,
clippy::significant_drop_tightening,
clippy::suboptimal_flops,
clippy::cast_precision_loss,
clippy::needless_pass_by_value,
clippy::cast_possible_truncation
)]
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use tidaldb::replication::crdt::{CrdtSignalState, HlcTimestamp, LWWRegister};
use tidaldb::replication::lag::ReplicationLagGauge;
use tidaldb::replication::reconcile::{HardNegAction, ReconciliationEngine, StateSnapshot};
use tidaldb::replication::state::ReplicationState;
use tidaldb::replication::{
ClusterTopology, ControlPlane, MigrationState, RegionId, ShardId, TenantConfig, TenantId,
TenantMigration, TenantRouter,
};
use tidaldb::schema::{DecaySpec, EntityId, EntityKind, SchemaBuilder, Timestamp, Window};
use tidaldb::signals::SignalTypeId;
use tidaldb::testing::cluster::{ClusterConfig, SimulatedCluster};
use tidaldb::testing::faults::NetworkPartition;
// ── Shared helpers ───────────────────────────────────────────────────────
/// Builds the two-signal schema shared by every test in this file.
///
/// - `view` on items: exponential decay with a 7-day half-life, tracked over
///   1-hour and 24-hour windows.
/// - `like` on items: exponential decay with a 1-day half-life, tracked over
///   a 1-hour window.
///
/// Velocity tracking is disabled for both signals. Panics if the builder
/// rejects the resulting schema.
fn m8_schema() -> tidaldb::schema::Schema {
    const DAY_SECS: u64 = 24 * 3600;

    let view_decay = DecaySpec::Exponential {
        half_life: Duration::from_secs(7 * DAY_SECS),
    };
    let like_decay = DecaySpec::Exponential {
        half_life: Duration::from_secs(DAY_SECS),
    };

    let mut schema_builder = SchemaBuilder::new();
    let _ = schema_builder
        .signal("view", EntityKind::Item, view_decay)
        .windows(&[Window::OneHour, Window::TwentyFourHours])
        .velocity(false)
        .add();
    let _ = schema_builder
        .signal("like", EntityKind::Item, like_decay)
        .windows(&[Window::OneHour])
        .velocity(false)
        .add();
    schema_builder.build().unwrap()
}
fn three_region_config() -> ClusterConfig {
ClusterConfig {
regions: vec![RegionId(0), RegionId(1), RegionId(2)],
leader_region: RegionId(0),
schema: m8_schema(),
}
}
/// Cluster of regions 0 and 1 with region 0 as the initial leader,
/// using the shared [`m8_schema`].
fn two_region_config() -> ClusterConfig {
    let regions = Vec::from([RegionId(0), RegionId(1)]);
    ClusterConfig {
        regions,
        leader_region: RegionId(0),
        schema: m8_schema(),
    }
}
// ── UAT Step 1: Cross-region signal replication ──────────────────────────
/// UAT step 1: cross-region signal replication.
///
/// Writes 25 `view` signals for one item on the leader (region 0) and waits
/// up to 2s for convergence. Then checks that:
/// - both followers report a decay score within a loose 0.5 of the leader's;
/// - every region's score is well above zero;
/// - the relay log and each follower's applied count equal 25.
#[test]
fn uat_step1_cross_region_replication() {
    let cluster = SimulatedCluster::build(three_region_config());
    let item = EntityId::new(1);

    // Every write lands on the leader, region 0.
    for _signal in 0..25 {
        cluster.write_signal("view", item, 1.0).unwrap();
    }
    // The in-process relay is expected to converge well inside 2 seconds.
    cluster.await_convergence(Duration::from_secs(2));

    // Reads happen in region order 0, 1, 2.
    let [score_east, score_west, score_south] = [
        cluster
            .read_decay_score(RegionId(0), item, "view")
            .expect("leader must have score"),
        cluster
            .read_decay_score(RegionId(1), item, "view")
            .expect("follower region 1 must have score"),
        cluster
            .read_decay_score(RegionId(2), item, "view")
            .expect("follower region 2 must have score"),
    ];

    // Leader writes and follower replays happen at slightly different
    // wall-clock instants, so their decay factors differ a little. With a
    // 7-day half-life and sub-second skew the gap is tiny; 0.5 is generous.
    let epsilon = 0.5;
    let west_gap = (score_east - score_west).abs();
    assert!(
        west_gap < epsilon,
        "region 1 score {score_west} diverges from leader score {score_east} by > {epsilon}"
    );
    let south_gap = (score_east - score_south).abs();
    assert!(
        south_gap < epsilon,
        "region 2 score {score_south} diverges from leader score {score_east} by > {epsilon}"
    );

    // 25 accumulated events must leave every score comfortably above zero.
    assert!(score_east > 10.0, "leader score {score_east} is too low");
    assert!(score_west > 10.0, "follower 1 score {score_west} is too low");
    assert!(score_south > 10.0, "follower 2 score {score_south} is too low");

    // The relay log and both followers must account for all 25 events.
    assert_eq!(cluster.relay_log_len(), 25);
    for follower in [RegionId(1), RegionId(2)] {
        assert_eq!(cluster.applied_count(follower), 25);
    }
}
// ── UAT Step 2: Leader crash and follower promotion ──────────────────────
/// UAT step 2: leader crash and follower promotion.
///
/// Writes 100 signals on the leader and converges. It then simulates a
/// leader crash by promoting region 1, and checks that:
/// - promotion finishes within the 10s failover budget;
/// - region 1 becomes leader with the pre-crash data intact;
/// - 10 signals written to the new leader afterwards are actually
///   replicated to region 2.
#[test]
fn uat_step2_leader_crash_and_failover() {
    let cluster = SimulatedCluster::build(three_region_config());
    let item = EntityId::new(2);

    // Write 100 signals on the original leader (region 0).
    for _ in 0..100 {
        cluster.write_signal("view", item, 1.0).unwrap();
    }
    // Wait for all followers to receive the events.
    cluster.await_convergence(Duration::from_secs(2));

    // Record the pre-crash score on region 1, which will become the new leader.
    let pre_crash_score = cluster
        .read_decay_score(RegionId(1), item, "view")
        .expect("follower must have score before crash");
    assert!(
        pre_crash_score > 50.0,
        "pre-crash score {pre_crash_score} too low; 100 events should yield > 50"
    );

    // "Crash" the leader by promoting region 1.
    let start = Instant::now();
    cluster.promote_leader(RegionId(1));
    let failover_time = start.elapsed();
    // Promotion is in-process with no election protocol, so it should be
    // near-instant; 10s is the milestone's failover budget.
    assert!(
        failover_time < Duration::from_secs(10),
        "failover took {:?}, must be < 10s",
        failover_time
    );
    assert_eq!(cluster.leader_region(), RegionId(1));

    // The new leader must still hold all 100 pre-crash signals.
    let new_leader_score = cluster
        .read_decay_score(RegionId(1), item, "view")
        .expect("new leader must have score");
    assert!(
        new_leader_score > 50.0,
        "new leader score {new_leader_score} too low after promotion"
    );

    // Take a region 2 baseline before any post-crash write. Without it, the
    // final `> 50.0` check is already met by the 100 pre-crash events and
    // cannot detect that the post-crash writes were never replicated.
    let r2_before = cluster
        .read_decay_score(RegionId(2), item, "view")
        .expect("region 2 must have score before post-crash writes");

    // Write 10 more signals to the new leader, then converge.
    for _ in 0..10 {
        cluster.write_signal("view", item, 1.0).unwrap();
    }
    cluster.await_convergence(Duration::from_secs(2));

    let score_r2 = cluster
        .read_decay_score(RegionId(2), item, "view")
        .expect("region 2 must have score");
    assert!(
        score_r2 > 50.0,
        "region 2 score {score_r2} too low after post-crash writes"
    );
    // Going from 100 to 110 events should raise the score by ~10%; decay over
    // the test is negligible with a 7-day half-life. Requiring a 5% rise proves
    // delivery without depending on the absolute score scale.
    assert!(
        score_r2 > r2_before * 1.05,
        "region 2 score {score_r2} did not grow from {r2_before}; post-crash writes were not replicated"
    );
}
// ── UAT Step 3: Degraded query during partition ──────────────────────────
/// UAT step 3: degraded query during a partition.
///
/// Seeds 10 signals, isolates region 2, then writes 5 more. Leader queries
/// must still succeed. Region 1 must receive the partition-era writes, while
/// region 2 stays at the 10 pre-partition events and lags behind both.
#[test]
fn uat_step3_degraded_query_during_partition() {
    let cluster = SimulatedCluster::build(three_region_config());
    let item = EntityId::new(3);

    // Seed some data before the partition.
    for _ in 0..10 {
        cluster.write_signal("view", item, 1.0).unwrap();
    }
    cluster.await_convergence(Duration::from_secs(1));

    // Isolate region 2. The guard is held until the end of the test.
    let _partition = NetworkPartition::isolate(RegionId(2), cluster.partitioned_regions());

    // Write more signals during the partition; only region 1 can receive them.
    for _ in 0..5 {
        cluster.write_signal("view", item, 1.0).unwrap();
    }
    cluster.await_convergence(Duration::from_secs(1));

    // The leader query must succeed.
    let leader_score = cluster
        .read_decay_score(RegionId(0), item, "view")
        .expect("leader must have score");
    assert!(
        leader_score > 5.0,
        "leader score {leader_score} must reflect all 15 events"
    );

    // Region 1 (the non-partitioned follower) must also have the new events.
    let r1_score = cluster
        .read_decay_score(RegionId(1), item, "view")
        .expect("region 1 must have score");
    assert!(
        r1_score > 5.0,
        "region 1 score {r1_score} must reflect 15 events"
    );
    // A score threshold alone cannot tell 10 events from 15. The applied
    // count proves region 1 actually received the partition-era writes.
    assert_eq!(
        cluster.applied_count(RegionId(1)),
        15,
        "region 1 should have all 15 events applied"
    );

    // Region 2 (partitioned) should still have only the 10 pre-partition events.
    let r2_score = cluster
        .read_decay_score(RegionId(2), item, "view")
        .expect("region 2 must have pre-partition score");
    let r2_applied = cluster.applied_count(RegionId(2));
    assert_eq!(
        r2_applied, 10,
        "region 2 should only have 10 events applied"
    );

    // Region 2 is missing 5 events, so it must trail both other regions.
    assert!(
        r2_score < leader_score,
        "partitioned region 2 ({r2_score}) should lag behind leader ({leader_score})"
    );
    assert!(
        r2_score < r1_score,
        "partitioned region 2 ({r2_score}) should lag behind region 1 ({r1_score})"
    );
}
// ── UAT Step 4: Partition heal and reconciliation ────────────────────────
/// UAT step 4: partition heal and reconciliation.
///
/// Runs three checks in sequence:
/// 1. Cluster catch-up. On a fresh three-region cluster, write 20 signals,
///    isolate region 2 and write 30 more, then drop the partition guard and
///    converge. Region 2 must end up with all 50 events applied and a decay
///    score close to the leader's.
/// 2. CRDT merge. Plan a reconciliation between two snapshots that hold the
///    same (item, signal type 0) state contributed by different shards. The
///    merged decay score must equal the sum of both sides.
/// 3. Hard negatives. An older `Unhide` against a newer `Hide` must resolve,
///    last-writer-wins, to `Hide`. Applying the plan must mark the
///    (user, item) pair as negative in the hard-negative index.
///
/// Despite the UAT numbering, this test does not reuse step 3's cluster; it
/// builds its own.
#[test]
fn uat_step4_partition_heal_and_reconciliation() {
let cluster = SimulatedCluster::build(three_region_config());
let item = EntityId::new(4);
// Phase 1: Write some events, then partition region 2.
for _ in 0..20 {
cluster.write_signal("view", item, 1.0).unwrap();
}
cluster.await_convergence(Duration::from_secs(1));
// Partition region 2. The guard lives only inside this block, so leaving
// the block drops it and heals the partition.
{
let _partition = NetworkPartition::isolate(RegionId(2), cluster.partitioned_regions());
// Write 30 more events to the leader during partition.
for _ in 0..30 {
cluster.write_signal("view", item, 1.0).unwrap();
}
// Converge region 1 only (region 2 is partitioned).
cluster.await_convergence(Duration::from_secs(1));
// Verify region 2 is behind: it still has only the 20 pre-partition events.
assert_eq!(
cluster.applied_count(RegionId(2)),
20,
"region 2 should only have 20 events"
);
}
// Partition dropped here -- healed automatically.
// Phase 2: Converge after partition heal. Region 2 must now pick up the
// 30 events it missed while isolated.
cluster.await_convergence(Duration::from_secs(2));
// Region 2 should now have all 50 events.
assert_eq!(
cluster.applied_count(RegionId(2)),
50,
"region 2 should have all 50 events after partition heal"
);
let score_east = cluster.read_decay_score(RegionId(0), item, "view").unwrap();
let score_south = cluster.read_decay_score(RegionId(2), item, "view").unwrap();
// Scores should be close (within wall-clock timing epsilon).
let epsilon = 1.0;
assert!(
(score_east - score_south).abs() < epsilon,
"post-heal scores diverge: leader={score_east}, region2={score_south}"
);
// ── CRDT reconciliation sub-test ─────────────────────────────────
// Test the ReconciliationEngine with diverged CRDT states. These states are
// built by hand and are independent of the cluster above.
// Decay rate for a 7-day half-life, matching the `view` signal in m8_schema.
// NOTE(review): assumes CrdtSignalState expects lambda in 1/seconds even
// though event timestamps below are in nanoseconds -- confirm.
let lambda = std::f64::consts::LN_2 / (7.0 * 24.0 * 3600.0);
let now_ns = Timestamp::now().as_nanos();
// Node A (shard 0) has 50 events.
let mut state_a = CrdtSignalState::new(lambda);
for _ in 0..50 {
state_a.on_signal(ShardId(0), 1.0, now_ns);
}
// Node B (shard 1) has 30 events (accumulated during partition).
let mut state_b = CrdtSignalState::new(lambda);
for _ in 0..30 {
state_b.on_signal(ShardId(1), 1.0, now_ns);
}
// Clones keep state_a/state_b available for computing the expected score.
let mut snap_a = StateSnapshot::new();
snap_a.add_signal_state(item, SignalTypeId::new(0), state_a.clone());
let mut snap_b = StateSnapshot::new();
snap_b.add_signal_state(item, SignalTypeId::new(0), state_b.clone());
// Use a standalone ledger and hard-neg index for reconciliation. The
// no-op WAL writer avoids any on-disk persistence in this test.
let recon_schema = m8_schema();
let ledger = Arc::new(tidaldb::signals::SignalLedger::new(
recon_schema,
Box::new(tidaldb::signals::NoopWalWriter),
));
let hard_negs = Arc::new(tidaldb::entities::HardNegIndex::new());
let engine = ReconciliationEngine::new(Arc::clone(&ledger), Arc::clone(&hard_negs));
let plan = engine.plan(&snap_a, &snap_b);
// Plan should have 1 signal merge (entity 4, signal type 0).
assert_eq!(
plan.signal_merges.len(),
1,
"plan should have exactly 1 signal merge"
);
// Verify merged score = sum of both sides. The contributions come from
// different shards, so the merge is expected to add them rather than pick one.
let merged = &plan.signal_merges[0];
let merged_score = merged.merged_state.decay_score(now_ns);
let expected = state_a.decay_score(now_ns) + state_b.decay_score(now_ns);
assert!(
(merged_score - expected).abs() < 1e-6,
"merged CRDT score {merged_score} != expected {expected}"
);
// Apply the plan.
engine.apply(&plan).unwrap();
// ── Hard negative sub-test ───────────────────────────────────────
let user = EntityId::new(100);
// Simulate: region 2 hides an item during partition (newer timestamp).
// The wall_ns values are arbitrary; only their relative order matters here.
let ts_hide = HlcTimestamp {
wall_ns: 200,
logical: 0,
node_id: 2,
};
let ts_older = HlcTimestamp {
wall_ns: 100,
logical: 0,
node_id: 0,
};
let mut snap_local = StateSnapshot::new();
// Local side: an older unhide.
let mut reg_local: LWWRegister<HardNegAction> = LWWRegister::empty();
reg_local.write(HardNegAction::Unhide, ts_older);
snap_local.add_hardneg_register(user, item, reg_local);
let mut snap_remote = StateSnapshot::new();
// Remote side: hide at a later timestamp.
let mut reg_remote: LWWRegister<HardNegAction> = LWWRegister::empty();
reg_remote.write(HardNegAction::Hide, ts_hide);
snap_remote.add_hardneg_register(user, item, reg_remote);
// Reuses the same engine, so the resolution lands in `hard_negs` above.
let plan = engine.plan(&snap_local, &snap_remote);
assert_eq!(
plan.hardneg_resolutions.len(),
1,
"should have 1 hard-neg resolution"
);
assert_eq!(
plan.hardneg_resolutions[0].action,
Some(HardNegAction::Hide),
"LWW should resolve to Hide (newer timestamp)"
);
engine.apply(&plan).unwrap();
// Verify hard negative is applied.
// NOTE(review): `as u32` silently truncates ids above u32::MAX. That is fine
// for item id 4, but prefer `u32::try_from` if this check is generalised.
assert!(
hard_negs.is_negative(user.as_u64(), item.as_u64() as u32),
"hard negative must be applied after reconciliation"
);
}
// ── UAT Step 5: Tenant migration state machine ──────────────────────────
/// UAT step 5: tenant migration state machine.
///
/// Drives `TenantMigration` through Idle -> PreparingTarget -> DualWrite ->
/// Finalizing -> Complete while tenant-scoped signals are written to the
/// cluster leader: 100 before migration and 50 during dual-write. Every
/// transition must succeed, and the leader's score must show that the
/// dual-write-phase signals landed.
///
/// Note: the migration itself runs against a standalone single-shard control
/// plane (`ClusterTopology::single()`, `ReplicationState::single()`), not the
/// simulated cluster. This test therefore exercises the state machine next to
/// live writes, not data movement between shards.
#[test]
fn uat_step5_tenant_migration() {
    let cluster = SimulatedCluster::build(three_region_config());
    let tenant = TenantId(42);
    let item = EntityId::new(5);

    // Register the tenant on the leader's router.
    cluster
        .leader()
        .db
        .tenant_router()
        .register_tenant(TenantConfig {
            tenant_id: tenant,
            max_signals_per_sec: None,
            max_entities: None,
            max_storage_bytes: None,
            required_regions: vec![RegionId(0)],
            label: "migrating-tenant".into(),
        });

    // Write 100 signals before migration.
    for _ in 0..100 {
        cluster
            .leader()
            .db
            .signal_for_tenant(tenant, "view", item, 1.0, Timestamp::now())
            .unwrap();
    }
    cluster.await_convergence(Duration::from_secs(1));

    // Baseline before migration, used below to prove the dual-write signals
    // landed. A fixed `> 50.0` threshold is already met by these 100 events.
    let pre_migration_score = cluster
        .read_decay_score(RegionId(0), item, "view")
        .expect("leader must have score before migration");

    // Build the TenantMigration state machine on a standalone control plane.
    let topo = Arc::new(RwLock::new(ClusterTopology::single()));
    let router = Arc::new(TenantRouter::new(Arc::clone(&topo)));
    let rep_state = Arc::new(ReplicationState::single());
    let lag = Arc::new(ReplicationLagGauge::new(ShardId::SINGLE, rep_state));
    let cp = Arc::new(ControlPlane::new(topo, Arc::clone(&router), lag));
    let migration = TenantMigration::new(tenant, ShardId(0), ShardId(1), cp, router);

    // Step 1: Idle -> PreparingTarget
    assert_eq!(migration.current_state(), MigrationState::Idle);
    let seqno = migration.prepare_target(42).unwrap();
    assert_eq!(seqno, 42);
    assert!(matches!(
        migration.current_state(),
        MigrationState::PreparingTarget { .. }
    ));

    // Step 2: PreparingTarget -> DualWrite
    let cutover = migration.enter_dual_write(100).unwrap();
    assert_eq!(cutover, 100);
    assert!(matches!(
        migration.current_state(),
        MigrationState::DualWrite { .. }
    ));

    // Write 50 more signals during dual-write.
    for _ in 0..50 {
        cluster
            .leader()
            .db
            .signal_for_tenant(tenant, "view", item, 1.0, Timestamp::now())
            .unwrap();
    }

    // Step 3: DualWrite -> Finalizing
    migration.finalize(150).unwrap();
    assert!(matches!(
        migration.current_state(),
        MigrationState::Finalizing { .. }
    ));

    // Step 4: Finalizing -> Complete (gc_window_ns = 0 for instant GC).
    migration.gc_source(0).unwrap();
    assert_eq!(migration.current_state(), MigrationState::Complete);

    // Verify all 150 signals are present on the leader.
    let score = cluster
        .read_decay_score(RegionId(0), item, "view")
        .expect("leader must have score");
    assert!(
        score > 50.0,
        "leader score {score} must reflect 150 signals"
    );
    // Going from 100 to 150 events should scale the score by ~1.5x; decay over
    // the test is negligible with a 7-day half-life. Requiring > 1.25x proves
    // the dual-write signals landed without depending on the absolute scale.
    assert!(
        score > pre_migration_score * 1.25,
        "leader score {score} did not grow enough from pre-migration {pre_migration_score}; dual-write signals missing"
    );
}
// ── Performance: Replication latency p99 ─────────────────────────────────
/// Perf: replication latency p99 must be under 2000ms.
///
/// For 1000 distinct items, measures the time from `write_signal()` on the
/// leader, through `await_convergence`, to `read_decay_score()` on the
/// follower returning `Some`. p50 and p99 use the nearest-rank method.
#[test]
fn perf_replication_latency_p99() {
    const SAMPLES: u64 = 1000;
    let cluster = SimulatedCluster::build(two_region_config());
    let mut latencies: Vec<Duration> = Vec::with_capacity(SAMPLES as usize);
    for i in 0..SAMPLES {
        let item = EntityId::new(1_000_000 + i);
        let before = Instant::now();
        cluster.write_signal("view", item, 1.0).unwrap();
        cluster.await_convergence(Duration::from_secs(3));
        let score = cluster.read_decay_score(RegionId(1), item, "view");
        assert!(
            score.is_some(),
            "follower must have score for entity {i} after convergence"
        );
        latencies.push(before.elapsed());
    }
    latencies.sort_unstable();

    let p50 = latencies[nearest_rank_index(latencies.len(), 50)];
    let p99 = latencies[nearest_rank_index(latencies.len(), 99)];
    let p99_ms = p99.as_millis();
    println!(
        "Replication latency: p50={}us p99={}us ({}ms)",
        p50.as_micros(),
        p99.as_micros(),
        p99_ms,
    );
    // Compare durations directly. This is equivalent to the whole-ms check
    // `p99_ms < 2000`, without relying on integer truncation.
    assert!(
        p99 < Duration::from_millis(2000),
        "replication latency p99 = {p99_ms}ms, must be < 2000ms"
    );
}

/// 0-based index of the nearest-rank `pct`-th percentile in a sorted sample
/// of `len` values. The 1-based rank is `ceil(pct * len / 100)`; for example,
/// `len = 1000, pct = 99` gives index 989.
///
/// # Panics
/// Panics if `len == 0` or `pct` is outside `1..=100`.
fn nearest_rank_index(len: usize, pct: usize) -> usize {
    assert!(len > 0, "percentile of an empty sample is undefined");
    assert!(
        (1..=100).contains(&pct),
        "percentile must be in 1..=100, got {pct}"
    );
    // Integer ceil(len * pct / 100), always >= 1 given the checks above.
    (len * pct + 99) / 100 - 1
}
// ── Performance: Failover under 10s ──────────────────────────────────────
/// Perf: follower promotion after a leader crash must complete in under 10s.
///
/// Seeds 50 signals, converges, times `promote_leader(RegionId(1))`, then
/// checks that the new leader accepts a write: its score must increase, not
/// merely be positive (which the seeded data already guarantees).
#[test]
fn perf_failover_under_10s() {
    let cluster = SimulatedCluster::build(three_region_config());
    let item = EntityId::new(10);

    // Write some data.
    for _ in 0..50 {
        cluster.write_signal("view", item, 1.0).unwrap();
    }
    cluster.await_convergence(Duration::from_secs(1));

    let start = Instant::now();
    cluster.promote_leader(RegionId(1));
    let elapsed = start.elapsed();
    println!("Failover completed in {:?}", elapsed);
    assert!(
        elapsed < Duration::from_secs(10),
        "failover must complete within 10 seconds, took {:?}",
        elapsed
    );

    // Baseline on the promoted leader before the post-failover write.
    let before_write = cluster
        .read_decay_score(RegionId(1), item, "view")
        .expect("promoted leader must have pre-failover score");

    // Verify the new leader can accept writes.
    cluster.write_signal("view", item, 1.0).unwrap();
    let score = cluster
        .read_decay_score(RegionId(1), item, "view")
        .expect("new leader must have score");
    assert!(score > 0.0, "new leader must have positive score");
    // A 1.0-weight event far outweighs the sub-second decay with a 7-day
    // half-life, so an accepted write must raise the score.
    assert!(
        score > before_write,
        "new leader score {score} did not increase from {before_write} after a post-failover write"
    );
}
// ── Performance: Reconciliation overhead ─────────────────────────────────
/// Perf: reconciling 1000 events must take under 100ms.
///
/// Two snapshots each hold 500 single-event CRDT states on disjoint entity
/// ranges (0..500 from shard 0, 500..1000 from shard 1), so planning yields
/// one merge per entity. The timed region covers both `plan` and `apply`.
#[test]
fn perf_reconciliation_overhead() {
    let lambda = std::f64::consts::LN_2 / (7.0 * 24.0 * 3600.0);
    let now_ns = Timestamp::now().as_nanos();

    // Fresh CRDT state carrying exactly one weight-1.0 event from `shard`.
    let one_event_state = |shard: ShardId| {
        let mut state = CrdtSignalState::new(lambda);
        state.on_signal(shard, 1.0, now_ns);
        state
    };

    let mut snap_a = StateSnapshot::new();
    let mut snap_b = StateSnapshot::new();
    for id in 0u64..500 {
        snap_a.add_signal_state(
            EntityId::new(id),
            SignalTypeId::new(0),
            one_event_state(ShardId(0)),
        );
    }
    for id in 500u64..1000 {
        snap_b.add_signal_state(
            EntityId::new(id),
            SignalTypeId::new(0),
            one_event_state(ShardId(1)),
        );
    }

    let ledger = Arc::new(tidaldb::signals::SignalLedger::new(
        m8_schema(),
        Box::new(tidaldb::signals::NoopWalWriter),
    ));
    let hard_negs = Arc::new(tidaldb::entities::HardNegIndex::new());
    let engine = ReconciliationEngine::new(Arc::clone(&ledger), Arc::clone(&hard_negs));

    let timer = Instant::now();
    let plan = engine.plan(&snap_a, &snap_b);
    engine.apply(&plan).unwrap();
    let elapsed = timer.elapsed();

    let merge_count = plan.signal_merges.len();
    println!(
        "Reconciliation of {} signal merges took {:?}",
        merge_count, elapsed
    );
    assert_eq!(
        merge_count, 1000,
        "plan should have 1000 signal merges (500 + 500 disjoint entities)"
    );
    assert!(
        elapsed < Duration::from_millis(100),
        "reconciliation took {:?}, must be < 100ms",
        elapsed
    );
}