- applications/iknowyou: new Next.js chat application with persona-aware conversations, briefing API, cohort logic, vLLM streaming, and sidebar navigation - tidal M8: add replication control plane (control.rs), tenant migration state machine (migration.rs), tenant/upgrade coordinators, cluster/fault test harnesses - tidal M8 tests: expand m8p2/m8p3/m8p4 test suites; add m8p5_multitenancy and m8_uat - tidal db: split replication_ops out of db/mod.rs (was 647 lines, now 574) - .claude: add kai-park, kaya-osei, mira-vasquez agents; add aeries-design-architect, aeries-fullstack-engineer, aeries-product-visionary skills - docs: update ROADMAP.md Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
696 lines
22 KiB
Rust
696 lines
22 KiB
Rust
//! M8p2 WAL replication integration tests.
//!
//! Tests the full replication pipeline: the leader writes signals, WAL segments
//! are shipped (or injected directly) to a follower, and the follower's ledger
//! reflects the replicated signals. Also verifies that followers reject writes
//! and that the replication lag gauge tracks leader/follower progress.
|
|
#![allow(
|
|
clippy::unwrap_used,
|
|
clippy::items_after_statements,
|
|
clippy::doc_markdown,
|
|
clippy::significant_drop_tightening,
|
|
clippy::suboptimal_flops,
|
|
clippy::cast_precision_loss
|
|
)]
|
|
|
|
use std::collections::HashMap;
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
|
|
use tidaldb::db::config::{NodeConfig, NodeRole};
|
|
use tidaldb::query::retrieve::Retrieve;
|
|
use tidaldb::replication::lag::ReplicationLagGauge;
|
|
use tidaldb::replication::shard::ShardId;
|
|
use tidaldb::replication::state::ReplicationState;
|
|
use tidaldb::replication::transport::{Transport, TransportError, WalSegmentPayload};
|
|
use tidaldb::replication::{InProcessTransportFactory, WalSegmentId};
|
|
use tidaldb::schema::{DecaySpec, EntityId, EntityKind, SchemaBuilder, Timestamp, Window};
|
|
use tidaldb::signals::{NoopWalWriter, SignalLedger};
|
|
use tidaldb::wal::format::batch::{EventRecord, HEADER_SIZE, encode_batch};
|
|
use tidaldb::{TidalDb, TidalError};
|
|
|
|
// ── Shared test transport ────────────────────────────────────────────────
|
|
|
|
/// Channel-based transport used by all integration tests that inject WAL
|
|
/// segments into a follower. All six test-local transport structs were
|
|
/// identical; this consolidates them into a single definition.
|
|
struct ChannelTransport {
|
|
rx: crossbeam::channel::Receiver<WalSegmentPayload>,
|
|
}
|
|
|
|
impl Transport for ChannelTransport {
|
|
fn send_segment(
|
|
&self,
|
|
_to: ShardId,
|
|
_payload: WalSegmentPayload,
|
|
) -> Result<(), TransportError> {
|
|
Ok(())
|
|
}
|
|
fn recv_segment(&self) -> Option<WalSegmentPayload> {
|
|
self.rx.recv().ok()
|
|
}
|
|
fn local_shard(&self) -> ShardId {
|
|
ShardId::SINGLE
|
|
}
|
|
}
|
|
|
|
/// Build a minimal schema with one signal type.
|
|
fn make_schema() -> tidaldb::schema::Schema {
|
|
let mut builder = SchemaBuilder::new();
|
|
let _ = builder
|
|
.signal(
|
|
"view",
|
|
EntityKind::Item,
|
|
DecaySpec::Exponential {
|
|
half_life: Duration::from_secs(7 * 24 * 3600),
|
|
},
|
|
)
|
|
.windows(&[Window::AllTime])
|
|
.velocity(false)
|
|
.add();
|
|
builder.build().expect("schema must be valid")
|
|
}
|
|
|
|
/// Resolve the signal type ID for "view" using a throwaway ledger.
|
|
///
|
|
/// Signal type IDs are deterministic (alphabetically sorted, starting at 0).
|
|
/// For a schema with one signal type "view", the ID is always 0.
|
|
fn resolve_view_type_id(schema: &tidaldb::schema::Schema) -> tidaldb::signals::SignalTypeId {
|
|
let ledger = SignalLedger::new(schema.clone(), Box::new(NoopWalWriter));
|
|
ledger.resolve_signal_type("view").unwrap()
|
|
}
|
|
|
|
/// Open a follower TidalDb (ephemeral, follower role).
|
|
fn open_follower(schema: tidaldb::schema::Schema) -> TidalDb {
|
|
TidalDb::builder()
|
|
.ephemeral()
|
|
.with_schema(schema)
|
|
.with_cluster(NodeConfig {
|
|
role: NodeRole::Follower,
|
|
..NodeConfig::default()
|
|
})
|
|
.open()
|
|
.expect("follower should open")
|
|
}
|
|
|
|
/// Open a leader TidalDb (ephemeral, leader role).
|
|
fn open_leader(schema: tidaldb::schema::Schema) -> TidalDb {
|
|
TidalDb::builder()
|
|
.ephemeral()
|
|
.with_schema(schema)
|
|
.with_cluster(NodeConfig {
|
|
role: NodeRole::Leader,
|
|
..NodeConfig::default()
|
|
})
|
|
.open()
|
|
.expect("leader should open")
|
|
}
|
|
|
|
// ── Test 1: Follower rejects write calls ─────────────────────────────────
|
|
|
|
#[test]
fn follower_rejects_signal_write() {
    let follower = open_follower(make_schema());

    // A follower only applies replicated data; direct signal writes must fail.
    let attempt = follower.signal("view", EntityId::new(1), 1.0, Timestamp::now());
    let err = attempt.expect_err("follower should reject signal writes");
    assert!(
        matches!(err, TidalError::ReadOnly(_)),
        "expected ReadOnly error, got: {err}"
    );

    follower.close().unwrap();
}
|
|
|
|
#[test]
fn follower_rejects_write_item() {
    let follower = open_follower(make_schema());

    let mut meta = HashMap::new();
    meta.insert("title".to_string(), "test".to_string());

    // Item metadata writes are writes too, so the follower must refuse them.
    let attempt = follower.write_item_with_metadata(EntityId::new(1), &meta);
    let err = attempt.expect_err("follower should reject item writes");
    assert!(
        matches!(err, TidalError::ReadOnly(_)),
        "expected ReadOnly error, got: {err}"
    );

    follower.close().unwrap();
}
|
|
|
|
// ── Test 2: Leader accepts writes ────────────────────────────────────────
|
|
|
|
#[test]
fn leader_accepts_signal_write() {
    let leader = open_leader(make_schema());
    let entity = EntityId::new(1);

    leader
        .signal("view", entity, 1.0, Timestamp::now())
        .expect("leader should accept signal writes");

    // The write must be observable through the normal read path.
    let recorded = leader
        .read_decay_score(entity, "view", 0)
        .expect("read should succeed");
    assert!(recorded.is_some(), "signal should have been recorded");

    leader.close().unwrap();
}
|
|
|
|
// ── Test 3: Direct payload injection into follower ledger ────────────────
|
|
|
|
#[test]
fn payload_injection_updates_follower_ledger() {
    let schema = make_schema();
    let follower = open_follower(schema.clone());

    // Resolve the signal type ID using a standalone ledger (same schema).
    let type_id = resolve_view_type_id(&schema);
    let state = follower.replication_state().clone();

    // A one-event WAL batch starting at sequence number 1.
    let events = vec![EventRecord {
        entity_id: 42,
        signal_type: type_id.as_u16() as u8,
        weight: 3.0,
        timestamp_nanos: 1_000_000_000,
    }];
    let bytes = encode_batch(&events, 1, 1).unwrap();

    // `apply_payload` is private, so inject through a channel-backed transport
    // and let the follower's replication receiver apply the segment.
    let (tx, rx) = crossbeam::channel::bounded(4);
    let transport = Arc::new(ChannelTransport { rx });
    follower.start_replication(Arc::clone(&transport)).unwrap();

    tx.send(WalSegmentPayload {
        id: WalSegmentId::new(tidaldb::replication::RegionId::SINGLE, ShardId::SINGLE, 1),
        bytes,
        event_count: 1,
    })
    .unwrap();

    // Poll (with a deadline) instead of sleeping a fixed interval: the receiver
    // runs asynchronously, so a fixed sleep is slow when it is fast and flaky
    // when the machine is loaded.
    let visible = || {
        follower
            .read_decay_score(EntityId::new(42), "view", 0)
            .expect("read should succeed")
            .is_some()
    };
    let deadline = std::time::Instant::now() + Duration::from_secs(5);
    while !(visible() && state.applied_seqno(ShardId::SINGLE) == Some(1))
        && std::time::Instant::now() < deadline
    {
        std::thread::sleep(Duration::from_millis(5));
    }

    // The follower's ledger was updated...
    assert!(visible(), "signal should be visible on follower");

    // ...and its replication state advanced to the batch's last sequence number.
    assert_eq!(
        state.applied_seqno(ShardId::SINGLE),
        Some(1),
        "replication state should have advanced"
    );

    // Shutdown: drop the sender so the receiver exits.
    drop(tx);
    follower.close().unwrap();
}
|
|
|
|
// ── Test 4: Idempotent replay ────────────────────────────────────────────
|
|
|
|
#[test]
fn replay_is_idempotent() {
    let schema = make_schema();
    let follower = open_follower(schema.clone());
    let type_id = resolve_view_type_id(&schema);
    let state = follower.replication_state().clone();

    let (tx, rx) = crossbeam::channel::bounded(4);
    let transport = Arc::new(ChannelTransport { rx });
    follower.start_replication(Arc::clone(&transport)).unwrap();

    // Stamp events one second in the past. With a 7-day half-life, timestamps
    // near the Unix epoch decay to ~0.0 by read time, and a doubled weight would
    // then be indistinguishable from a single one.
    // NOTE(review): assumes `timestamp_nanos` counts from the Unix epoch.
    let now_ns = u64::try_from(
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("system clock is before the Unix epoch")
            .as_nanos(),
    )
    .expect("current time fits in u64 nanoseconds");
    let event_ns = now_ns - 1_000_000_000;

    // One-event segment whose batch start seq and segment id both equal `seqno`.
    let segment = |entity_id: u64, weight: f32, seqno: u64| {
        let events = vec![EventRecord {
            entity_id,
            signal_type: type_id.as_u16() as u8,
            weight,
            timestamp_nanos: event_ns,
        }];
        WalSegmentPayload {
            id: WalSegmentId::new(
                tidaldb::replication::RegionId::SINGLE,
                ShardId::SINGLE,
                seqno,
            ),
            bytes: encode_batch(&events, 1, seqno).unwrap(),
            event_count: 1,
        }
    };

    // Wait (bounded) until the follower has applied through `target`.
    let wait_for_applied = |target: u64| {
        let deadline = std::time::Instant::now() + Duration::from_secs(5);
        while state.applied_seqno(ShardId::SINGLE) != Some(target)
            && std::time::Instant::now() < deadline
        {
            std::thread::sleep(Duration::from_millis(5));
        }
        assert_eq!(
            state.applied_seqno(ShardId::SINGLE),
            Some(target),
            "follower should reach seqno {target}"
        );
    };
    let score_of_entity_10 = || {
        follower
            .read_decay_score(EntityId::new(10), "view", 0)
            .expect("read should succeed")
    };

    // First delivery is applied normally.
    tx.send(segment(10, 5.0, 1)).unwrap();
    wait_for_applied(1);
    let once = score_of_entity_10().expect("entity should exist in ledger");

    // Redeliver seqno 1, then a sentinel segment (seqno 2, different entity).
    // Assuming the receiver applies segments in arrival order, once seqno 2 is
    // applied the duplicate has already been processed.
    tx.send(segment(10, 5.0, 1)).unwrap();
    tx.send(segment(11, 1.0, 2)).unwrap();
    wait_for_applied(2);
    let after_replay = score_of_entity_10().expect("entity should exist in ledger");

    // Re-applying the duplicate would roughly double the score (5.0 -> 10.0).
    assert!(
        (after_replay - once).abs() <= once.abs() * 1e-6,
        "replayed segment must be a no-op: score changed from {once} to {after_replay}"
    );

    drop(tx);
    follower.close().unwrap();
}
|
|
|
|
// ── Test 5: InProcessTransport end-to-end ────────────────────────────────
|
|
|
|
#[test]
fn in_process_transport_delivers_segment() {
    let shard_ids = vec![ShardId(0), ShardId(1)];
    let mut by_shard = InProcessTransportFactory::new(&shard_ids).build();

    let sender = by_shard.remove(&ShardId(0)).unwrap();
    let receiver = by_shard.remove(&ShardId(1)).unwrap();

    // A scratch ledger is only needed to resolve the `view` signal type.
    let ledger = SignalLedger::new(make_schema(), Box::new(NoopWalWriter));
    let view_id = ledger.resolve_signal_type("view").unwrap();

    // Shard 0 ships a single-event segment (seqno 42) addressed to shard 1.
    let events = vec![EventRecord {
        entity_id: 99,
        signal_type: view_id.as_u16() as u8,
        weight: 2.0,
        timestamp_nanos: 500,
    }];
    let encoded = encode_batch(&events, 1, 42).unwrap();

    let outgoing = WalSegmentPayload {
        id: WalSegmentId::new(tidaldb::replication::RegionId::SINGLE, ShardId(0), 42),
        bytes: encoded.clone(),
        event_count: 1,
    };
    sender.send_segment(ShardId(1), outgoing).unwrap();

    // Shard 1 must observe exactly what was sent.
    let received = match receiver.recv_segment() {
        Some(payload) => payload,
        None => panic!("shard 1 should receive the segment"),
    };
    assert_eq!(received.id.seqno, 42);
    assert_eq!(received.event_count, 1);
    assert_eq!(received.bytes, encoded);

    // Tear down explicitly: sender first, then receiver.
    drop(sender);
    drop(receiver);
}
|
|
|
|
// ── Test 6: ReplicationLagGauge ──────────────────────────────────────────
|
|
|
|
#[test]
fn replication_lag_gauge_tracks_lag() {
    let state = Arc::new(ReplicationState::single());
    let gauge = ReplicationLagGauge::new(ShardId::SINGLE, Arc::clone(&state));

    // Nothing shipped, nothing applied: no lag.
    assert_eq!(gauge.lag_segments(), 0);

    // The leader moves ahead by ten segments.
    gauge.update_leader_seqno(10);
    assert_eq!(gauge.lag_segments(), 10);

    // The follower catches up partially, then completely.
    for (applied, expected_lag) in [(7, 3), (10, 0)] {
        state.advance(ShardId::SINGLE, applied);
        assert_eq!(gauge.lag_segments(), expected_lag);
    }
}
|
|
|
|
// ── Test 7: Full pipeline: leader -> transport -> follower ───────────────
|
|
|
|
#[test]
fn full_pipeline_leader_to_follower() {
    let schema = make_schema();

    // Open leader and follower over the same schema.
    let leader = open_leader(schema.clone());
    let follower = open_follower(schema.clone());

    // Resolve type ID using a standalone ledger (same schema).
    let type_id = resolve_view_type_id(&schema);
    let follower_state = follower.replication_state().clone();

    // Wire up a channel-based transport for the follower.
    let (tx, rx) = crossbeam::channel::bounded(16);
    let transport = Arc::new(ChannelTransport { rx });
    follower.start_replication(Arc::clone(&transport)).unwrap();

    // Write signals on the leader.
    let ts = Timestamp::from_nanos(2_000_000_000);
    leader.signal("view", EntityId::new(100), 1.0, ts).unwrap();
    leader.signal("view", EntityId::new(101), 2.0, ts).unwrap();

    // Simulate the shipper: encode the same two signals as a WAL batch and
    // deliver it to the follower through the transport.
    let events = vec![
        EventRecord {
            entity_id: 100,
            signal_type: type_id.as_u16() as u8,
            weight: 1.0,
            timestamp_nanos: 2_000_000_000,
        },
        EventRecord {
            entity_id: 101,
            signal_type: type_id.as_u16() as u8,
            weight: 2.0,
            timestamp_nanos: 2_000_000_000,
        },
    ];
    let batch_bytes = encode_batch(&events, 1, 1).unwrap();

    tx.send(WalSegmentPayload {
        id: WalSegmentId::new(tidaldb::replication::RegionId::SINGLE, ShardId::SINGLE, 1),
        bytes: batch_bytes,
        event_count: 2,
    })
    .unwrap();

    // Batch has 2 events starting at seq 1, so last seq = 1 + 2 - 1 = 2.
    let expected_applied = Some(2);

    // Poll (with a deadline) rather than sleeping a fixed 100 ms: the receiver
    // is asynchronous, and a fixed sleep is flaky on a loaded machine.
    let visible = |id: u64| {
        follower
            .read_decay_score(EntityId::new(id), "view", 0)
            .unwrap()
            .is_some()
    };
    let deadline = std::time::Instant::now() + Duration::from_secs(5);
    while !(visible(100)
        && visible(101)
        && follower_state.applied_seqno(ShardId::SINGLE) == expected_applied)
        && std::time::Instant::now() < deadline
    {
        std::thread::sleep(Duration::from_millis(5));
    }

    // Verify the follower has the signals.
    assert!(visible(100), "entity 100 should be visible on follower");
    assert!(visible(101), "entity 101 should be visible on follower");

    // Verify replication state.
    assert_eq!(
        follower_state.applied_seqno(ShardId::SINGLE),
        expected_applied,
        "replication state should reflect applied batch"
    );

    // Cleanup.
    drop(tx);
    leader.close().unwrap();
    follower.close().unwrap();
}
|
|
|
|
// ── Test 8: Follower rejects session writes ──────────────────────────────
|
|
|
|
#[test]
fn follower_rejects_start_session() {
    let follower = open_follower(make_schema());

    // Opening a session mutates state, so a read-only follower must refuse it.
    let attempt = follower.start_session(1, "test-agent", "default", HashMap::new());
    let err = attempt.expect_err("follower should reject start_session");
    assert!(
        matches!(err, TidalError::ReadOnly(_)),
        "expected ReadOnly error, got: {err}"
    );

    follower.close().unwrap();
}
|
|
|
|
// ── Test 9: 1K-signal decay score equivalence (6 decimal places) ─────────
|
|
|
|
#[test]
fn replication_decay_scores_match() {
    let schema = make_schema();
    let leader = open_leader(schema.clone());
    let follower = open_follower(schema.clone());

    let type_id = resolve_view_type_id(&schema);
    let follower_state = follower.replication_state().clone();

    // Wire up transport for follower.
    let (tx, rx) = crossbeam::channel::bounded(16);
    let transport = Arc::new(ChannelTransport { rx });
    follower.start_replication(Arc::clone(&transport)).unwrap();

    // Anchor timestamps a few seconds before "now". With a 7-day half-life,
    // timestamps near the Unix epoch decay to ~0.0 by read time, which would
    // make the score comparison below pass no matter what was replicated.
    // NOTE(review): assumes `Timestamp` nanos count from the Unix epoch.
    let now_ns = u64::try_from(
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("system clock is before the Unix epoch")
            .as_nanos(),
    )
    .expect("current time fits in u64 nanoseconds");
    let base_ns = now_ns - 10_000_000_000;

    // Write 1,000 signals on the leader with varying timestamps and weights,
    // recording the identical events for shipping to the follower.
    let mut all_events = Vec::with_capacity(1000);
    for i in 0u64..1000 {
        let ts_ns = base_ns + i * 1_000_000; // 1ms apart
        let weight = 1.0 + (i as f64) * 0.001;
        leader
            .signal("view", EntityId::new(i + 1), weight, Timestamp::from_nanos(ts_ns))
            .unwrap();

        all_events.push(EventRecord {
            entity_id: i + 1,
            signal_type: type_id.as_u16() as u8,
            weight: weight as f32,
            timestamp_nanos: ts_ns,
        });
    }

    // Ship events in batches of 200 (encode_batch max is 256). A batch starting
    // at seq S with N events covers S..=S+N-1, so each batch must start where the
    // previous one ended; numbering batches 1, 2, 3, ... would make them overlap.
    let mut next_seq: u64 = 1;
    for chunk in all_events.chunks(200) {
        let event_count = chunk.len() as u64;
        let batch_bytes = encode_batch(chunk, 1, next_seq).unwrap();
        tx.send(WalSegmentPayload {
            id: WalSegmentId::new(
                tidaldb::replication::RegionId::SINGLE,
                ShardId::SINGLE,
                next_seq,
            ),
            bytes: batch_bytes,
            event_count,
        })
        .unwrap();
        next_seq += event_count;
    }
    let last_seq = next_seq - 1;

    // Wait (bounded) for every batch to be applied instead of a fixed sleep.
    let deadline = std::time::Instant::now() + Duration::from_secs(10);
    while follower_state.applied_seqno(ShardId::SINGLE) != Some(last_seq)
        && std::time::Instant::now() < deadline
    {
        std::thread::sleep(Duration::from_millis(5));
    }
    assert_eq!(
        follower_state.applied_seqno(ShardId::SINGLE),
        Some(last_seq),
        "follower should apply every replicated batch"
    );

    // Compare decay scores (decay_rate_idx=0 reads current score at now()).
    // A score missing on either side counts as a mismatch rather than as 0.0.
    let mut mismatches = Vec::new();
    for i in 0u64..1000 {
        let entity = EntityId::new(i + 1);
        let leader_score = leader.read_decay_score(entity, "view", 0).unwrap();
        let follower_score = follower.read_decay_score(entity, "view", 0).unwrap();

        let matched = match (leader_score, follower_score) {
            (Some(l), Some(f)) => (l - f).abs() <= 1e-6,
            _ => false,
        };
        if !matched {
            mismatches.push((i + 1, leader_score, follower_score));
        }
    }

    assert!(
        mismatches.is_empty(),
        "all 1,000 decay scores should match to 6 decimal places; {} mismatched, first few: {:?}",
        mismatches.len(),
        &mismatches[..mismatches.len().min(5)]
    );

    drop(tx);
    leader.close().unwrap();
    follower.close().unwrap();
}
|
|
|
|
// ── Test 10: Follower serves retrieve queries ────────────────────────────
|
|
|
|
#[test]
fn follower_serves_retrieve_queries() {
    let schema = make_schema();
    let follower = open_follower(schema.clone());

    let type_id = resolve_view_type_id(&schema);
    let follower_state = follower.replication_state().clone();

    // Wire up transport.
    let (tx, rx) = crossbeam::channel::bounded(4);
    let transport = Arc::new(ChannelTransport { rx });
    follower.start_replication(Arc::clone(&transport)).unwrap();

    // Replicate some signals to the follower.
    let events = vec![
        EventRecord {
            entity_id: 200,
            signal_type: type_id.as_u16() as u8,
            weight: 5.0,
            timestamp_nanos: 1_000_000_000,
        },
        EventRecord {
            entity_id: 201,
            signal_type: type_id.as_u16() as u8,
            weight: 3.0,
            timestamp_nanos: 1_000_000_000,
        },
    ];
    let bytes = encode_batch(&events, 1, 1).unwrap();

    tx.send(WalSegmentPayload {
        id: WalSegmentId::new(tidaldb::replication::RegionId::SINGLE, ShardId::SINGLE, 1),
        bytes,
        event_count: 2,
    })
    .unwrap();

    // Wait (bounded) until the batch (2 events from seq 1, ending at seq 2) is
    // applied, so the query below really runs against replicated data. A fixed
    // sleep gave no such guarantee.
    let deadline = std::time::Instant::now() + Duration::from_secs(5);
    while follower_state.applied_seqno(ShardId::SINGLE) != Some(2)
        && std::time::Instant::now() < deadline
    {
        std::thread::sleep(Duration::from_millis(5));
    }
    assert_eq!(
        follower_state.applied_seqno(ShardId::SINGLE),
        Some(2),
        "replicated batch should be applied before querying"
    );

    // Execute a retrieve query on the follower — should NOT return ReadOnly.
    // NOTE: We assert `is_ok()` rather than checking result contents because
    // signals-only replication does not populate items_storage (the retrieve
    // pipeline requires item metadata to produce ranked results). This test
    // validates that the follower's read-path is accessible, not that
    // replicated signals produce ranked output.
    let query = Retrieve::builder().build().unwrap();
    let result = follower.retrieve(&query);
    assert!(
        result.is_ok(),
        "follower should serve retrieve queries, got: {:?}",
        result.err()
    );

    drop(tx);
    follower.close().unwrap();
}
|
|
|
|
// ── Test 11: Corrupted segment is rejected ───────────────────────────────
|
|
|
|
#[test]
fn corrupted_segment_is_rejected() {
    let schema = make_schema();
    let follower = open_follower(schema.clone());

    let type_id = resolve_view_type_id(&schema);

    let (tx, rx) = crossbeam::channel::bounded(8);
    let transport = Arc::new(ChannelTransport { rx });
    follower.start_replication(Arc::clone(&transport)).unwrap();

    // Build a valid batch, then corrupt it.
    let events = vec![EventRecord {
        entity_id: 500,
        signal_type: type_id.as_u16() as u8,
        weight: 10.0,
        timestamp_nanos: 1_000_000_000,
    }];
    let mut corrupted = encode_batch(&events, 1, 1).unwrap();

    // Flip a byte just past the batch header (`HEADER_SIZE` bytes) so the
    // payload no longer matches the batch's integrity checksum.
    let corrupt_offset = HEADER_SIZE + 1;
    assert!(
        corrupted.len() > corrupt_offset,
        "batch too short to corrupt payload"
    );
    corrupted[corrupt_offset] ^= 0xFF;

    tx.send(WalSegmentPayload {
        id: WalSegmentId::new(tidaldb::replication::RegionId::SINGLE, ShardId::SINGLE, 1),
        bytes: corrupted,
        event_count: 1,
    })
    .unwrap();

    // Also send a valid segment (seqno 2) to prove the receiver keeps running.
    let valid_events = vec![EventRecord {
        entity_id: 501,
        signal_type: type_id.as_u16() as u8,
        weight: 7.0,
        timestamp_nanos: 2_000_000_000,
    }];
    let valid_bytes = encode_batch(&valid_events, 1, 2).unwrap();

    tx.send(WalSegmentPayload {
        id: WalSegmentId::new(tidaldb::replication::RegionId::SINGLE, ShardId::SINGLE, 2),
        bytes: valid_bytes,
        event_count: 1,
    })
    .unwrap();

    // Poll (with a deadline) until the valid segment's entity appears instead
    // of sleeping a fixed 200 ms. Assuming segments are applied in arrival
    // order, the corrupted segment has already been handled by then.
    let score_of = |id: u64| {
        follower
            .read_decay_score(EntityId::new(id), "view", 0)
            .unwrap()
    };
    let deadline = std::time::Instant::now() + Duration::from_secs(5);
    while score_of(501).is_none() && std::time::Instant::now() < deadline {
        std::thread::sleep(Duration::from_millis(5));
    }

    // Entity 500 (from corrupted segment) should NOT be present.
    assert!(
        score_of(500).is_none(),
        "corrupted segment entity should not appear"
    );

    // Entity 501 (from valid segment) SHOULD be present.
    assert!(
        score_of(501).is_some(),
        "valid segment after corruption should be applied"
    );

    drop(tx);
    follower.close().unwrap();
}
|
|
|
|
// ── Test 12: Replication lag converges to zero ───────────────────────────
|
|
|
|
#[test]
fn replication_lag_converges_to_zero() {
    let state = Arc::new(ReplicationState::single());
    let gauge = ReplicationLagGauge::new(ShardId::SINGLE, Arc::clone(&state));

    // First wave: the leader ships segments 1..=10 one at a time.
    (1..=10u64).for_each(|seqno| {
        gauge.update_leader_seqno(seqno);
    });
    assert_eq!(gauge.lag_segments(), 10);

    // The follower applies the same range, step by step.
    (1..=10u64).for_each(|seqno| {
        state.advance(ShardId::SINGLE, seqno);
    });
    assert_eq!(gauge.lag_segments(), 0, "lag should be 0 after catching up");

    // Second wave: the leader ships 11..=20.
    (11..=20u64).for_each(|seqno| {
        gauge.update_leader_seqno(seqno);
    });
    assert_eq!(gauge.lag_segments(), 10);

    // The follower jumps straight to the newest segment.
    state.advance(ShardId::SINGLE, 20);
    assert_eq!(
        gauge.lag_segments(),
        0,
        "lag should converge to 0 again after second batch"
    );
}
|