tidaldb/tidal/tests/m8p2_replication.rs
jordan 98bdc18a49 feat: add iknowyou app + complete M8 replication extensions + Aeries agents/skills
- applications/iknowyou: new Next.js chat application with persona-aware conversations,
  briefing API, cohort logic, vLLM streaming, and sidebar navigation
- tidal M8: add replication control plane (control.rs), tenant migration state machine
  (migration.rs), tenant/upgrade coordinators, cluster/fault test harnesses
- tidal M8 tests: expand m8p2/m8p3/m8p4 test suites; add m8p5_multitenancy and m8_uat
- tidal db: split replication_ops out of db/mod.rs (was 647 lines, now 574)
- .claude: add kai-park, kaya-osei, mira-vasquez agents; add aeries-design-architect,
  aeries-fullstack-engineer, aeries-product-visionary skills
- docs: update ROADMAP.md

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-24 21:09:11 -07:00

696 lines
22 KiB
Rust

//! M8p2 WAL Replication integration tests.
//!
//! Tests the full replication pipeline: leader writes signals, segments are
//! shipped (or directly injected) to a follower, and the follower's ledger
//! reflects the replicated signals. Also verifies follower write rejection
//! and replication lag gauge.
#![allow(
clippy::unwrap_used,
clippy::items_after_statements,
clippy::doc_markdown,
clippy::significant_drop_tightening,
clippy::suboptimal_flops,
clippy::cast_precision_loss
)]
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tidaldb::db::config::{NodeConfig, NodeRole};
use tidaldb::query::retrieve::Retrieve;
use tidaldb::replication::lag::ReplicationLagGauge;
use tidaldb::replication::shard::ShardId;
use tidaldb::replication::state::ReplicationState;
use tidaldb::replication::transport::{Transport, TransportError, WalSegmentPayload};
use tidaldb::replication::{InProcessTransportFactory, WalSegmentId};
use tidaldb::schema::{DecaySpec, EntityId, EntityKind, SchemaBuilder, Timestamp, Window};
use tidaldb::signals::{NoopWalWriter, SignalLedger};
use tidaldb::wal::format::batch::{EventRecord, HEADER_SIZE, encode_batch};
use tidaldb::{TidalDb, TidalError};
// ── Shared test transport ────────────────────────────────────────────────
/// Test-only [`Transport`] backed by a crossbeam channel.
///
/// A test keeps the sending half of the channel and pushes
/// `WalSegmentPayload`s into it; whoever consumes this transport pulls them
/// back out through `recv_segment`. This single definition replaces the six
/// identical per-test transport structs that previously existed.
struct ChannelTransport {
    /// Receiving half of the injection channel.
    rx: crossbeam::channel::Receiver<WalSegmentPayload>,
}

impl Transport for ChannelTransport {
    /// Outbound sends are irrelevant here: the payload is discarded and
    /// success is reported unconditionally.
    fn send_segment(
        &self,
        _to: ShardId,
        _payload: WalSegmentPayload,
    ) -> Result<(), TransportError> {
        Ok(())
    }

    /// Returns the next injected segment, or `None` when `recv` on the
    /// underlying channel reports an error.
    fn recv_segment(&self) -> Option<WalSegmentPayload> {
        match self.rx.recv() {
            Ok(segment) => Some(segment),
            Err(_) => None,
        }
    }

    /// Always identifies itself as `ShardId::SINGLE`.
    fn local_shard(&self) -> ShardId {
        ShardId::SINGLE
    }
}
/// Construct the schema shared by every test in this file.
///
/// It declares exactly one signal, `"view"`, on `EntityKind::Item`, with
/// exponential decay (seven-day half-life), a single all-time window, and
/// velocity disabled.
///
/// # Panics
///
/// Panics if the builder rejects the schema.
fn make_schema() -> tidaldb::schema::Schema {
    const SEVEN_DAYS_SECS: u64 = 7 * 24 * 3600;

    let view_decay = DecaySpec::Exponential {
        half_life: Duration::from_secs(SEVEN_DAYS_SECS),
    };

    let mut schema_builder = SchemaBuilder::new();
    // The value returned by `add()` is intentionally discarded.
    let _ = schema_builder
        .signal("view", EntityKind::Item, view_decay)
        .windows(&[Window::AllTime])
        .velocity(false)
        .add();

    schema_builder.build().expect("schema must be valid")
}
/// Look up the numeric type ID of the `"view"` signal for `schema`.
///
/// A scratch `SignalLedger` with a no-op WAL writer is built from a clone of
/// the schema purely to perform the lookup; it is dropped on return.
///
/// Per the original note on this helper, signal type IDs are assigned
/// deterministically (alphabetical order, starting at 0), so a schema whose
/// only signal is `"view"` always yields ID 0.
///
/// # Panics
///
/// Panics if `"view"` cannot be resolved against `schema`.
fn resolve_view_type_id(schema: &tidaldb::schema::Schema) -> tidaldb::signals::SignalTypeId {
    let scratch_ledger = SignalLedger::new(schema.clone(), Box::new(NoopWalWriter));
    let view_id = scratch_ledger.resolve_signal_type("view").unwrap();
    view_id
}
/// Open an ephemeral `TidalDb` configured with `NodeRole::Follower`.
///
/// All other `NodeConfig` fields keep their defaults.
///
/// # Panics
///
/// Panics if the database fails to open.
fn open_follower(schema: tidaldb::schema::Schema) -> TidalDb {
    let cluster = NodeConfig {
        role: NodeRole::Follower,
        ..NodeConfig::default()
    };

    let opened = TidalDb::builder()
        .ephemeral()
        .with_schema(schema)
        .with_cluster(cluster)
        .open();

    opened.expect("follower should open")
}
/// Open an ephemeral `TidalDb` configured with `NodeRole::Leader`.
///
/// All other `NodeConfig` fields keep their defaults.
///
/// # Panics
///
/// Panics if the database fails to open.
fn open_leader(schema: tidaldb::schema::Schema) -> TidalDb {
    let cluster = NodeConfig {
        role: NodeRole::Leader,
        ..NodeConfig::default()
    };

    let opened = TidalDb::builder()
        .ephemeral()
        .with_schema(schema)
        .with_cluster(cluster)
        .open();

    opened.expect("leader should open")
}
// ── Test 1: Follower rejects write calls ─────────────────────────────────
/// A follower must refuse `signal` and report `TidalError::ReadOnly`.
#[test]
fn follower_rejects_signal_write() {
    let follower = open_follower(make_schema());

    let attempt = follower.signal("view", EntityId::new(1), 1.0, Timestamp::now());
    let err = attempt.expect_err("follower should reject signal writes");

    let is_read_only = matches!(err, TidalError::ReadOnly(_));
    assert!(is_read_only, "expected ReadOnly error, got: {err}");

    follower.close().unwrap();
}
/// A follower must refuse `write_item_with_metadata` and report
/// `TidalError::ReadOnly`.
#[test]
fn follower_rejects_write_item() {
    let follower = open_follower(make_schema());

    // Single-entry metadata map: title -> test.
    let mut meta = HashMap::new();
    meta.insert("title".to_string(), "test".to_string());

    let attempt = follower.write_item_with_metadata(EntityId::new(1), &meta);
    let err = attempt.expect_err("follower should reject item writes");

    let is_read_only = matches!(err, TidalError::ReadOnly(_));
    assert!(is_read_only, "expected ReadOnly error, got: {err}");

    follower.close().unwrap();
}
// ── Test 2: Leader accepts writes ────────────────────────────────────────
/// A leader accepts `signal`, and the write can be read back as a decay score.
#[test]
fn leader_accepts_signal_write() {
    let leader = open_leader(make_schema());

    let write = leader.signal("view", EntityId::new(1), 1.0, Timestamp::now());
    write.expect("leader should accept signal writes");

    let read_back = leader.read_decay_score(EntityId::new(1), "view", 0);
    let score = read_back.expect("read should succeed");
    assert!(score.is_some(), "signal should have been recorded");

    leader.close().unwrap();
}
// ── Test 3: Direct payload injection into follower ledger ────────────────
/// Injects one encoded WAL batch into a follower and checks that both the
/// follower's ledger and its replication state reflect it.
///
/// `apply_payload` is private, so the batch is delivered through the public
/// path instead: a channel-backed transport handed to `start_replication`.
#[test]
fn payload_injection_updates_follower_ledger() {
    let schema = make_schema();
    let follower = open_follower(schema.clone());
    // Resolve the signal type ID using a standalone ledger (same schema).
    let type_id = resolve_view_type_id(&schema);
    let state = follower.replication_state().clone();

    // Build a WAL batch payload holding a single event for entity 42.
    let events = vec![EventRecord {
        entity_id: 42,
        // Checked narrowing: fail loudly rather than silently truncate the ID.
        signal_type: u8::try_from(type_id.as_u16()).expect("signal type id must fit in u8"),
        weight: 3.0,
        timestamp_nanos: 1_000_000_000,
    }];
    let bytes = encode_batch(&events, 1, 1).unwrap();

    // Create a channel-based transport and start the follower's receiver on it.
    let (tx, rx) = crossbeam::channel::bounded(4);
    let transport = Arc::new(ChannelTransport { rx });
    follower.start_replication(Arc::clone(&transport)).unwrap();

    // Send the payload.
    tx.send(WalSegmentPayload {
        id: WalSegmentId::new(tidaldb::replication::RegionId::SINGLE, ShardId::SINGLE, 1),
        bytes,
        event_count: 1,
    })
    .unwrap();

    // Wait for the receiver thread by polling with a deadline instead of a
    // fixed sleep: fast when the segment is applied promptly, tolerant of a
    // loaded machine. If the deadline passes, the assertions below fail with
    // their usual messages.
    let deadline = std::time::Instant::now() + Duration::from_secs(5);
    loop {
        let visible = follower
            .read_decay_score(EntityId::new(42), "view", 0)
            .expect("read should succeed")
            .is_some();
        let advanced = state.applied_seqno(ShardId::SINGLE) == Some(1);
        if (visible && advanced) || std::time::Instant::now() >= deadline {
            break;
        }
        std::thread::sleep(Duration::from_millis(5));
    }

    // Verify the follower's ledger was updated.
    let score = follower
        .read_decay_score(EntityId::new(42), "view", 0)
        .expect("read should succeed");
    assert!(score.is_some(), "signal should be visible on follower");

    // Verify replication state advanced.
    let applied = state.applied_seqno(ShardId::SINGLE);
    assert_eq!(applied, Some(1), "replication state should have advanced");

    // Shutdown: drop sender so receiver exits.
    drop(tx);
    follower.close().unwrap();
}
// ── Test 4: Idempotent replay ────────────────────────────────────────────
/// Delivers the same segment (seqno 1) twice to a receiver spawned directly
/// on a standalone ledger, then checks that the applied seqno is 1 and the
/// entity exists.
///
/// NOTE(review): the intent is that the second delivery is a no-op (weight
/// stays 5.0 rather than becoming 10.0), but the assertions below only check
/// the applied seqno and the entry's presence — the accumulated weight itself
/// is not inspected. Consider asserting it once the entry API allows.
#[test]
fn replay_is_idempotent() {
    let ledger = Arc::new(SignalLedger::new(make_schema(), Box::new(NoopWalWriter)));
    let state = Arc::new(ReplicationState::single());
    let type_id = ledger.resolve_signal_type("view").unwrap();

    let events = vec![EventRecord {
        entity_id: 10,
        signal_type: type_id.as_u16() as u8,
        weight: 5.0,
        timestamp_nanos: 1_000_000_000,
    }];
    let encoded = encode_batch(&events, 1, 1).unwrap();

    // Receiver fed by a channel-backed transport.
    let (tx, rx) = crossbeam::channel::bounded(4);
    let transport = Arc::new(ChannelTransport { rx });
    let receiver = tidaldb::replication::spawn_receiver(
        Arc::clone(&transport),
        Arc::clone(&ledger),
        Arc::clone(&state),
    );

    // Each call produces a fresh copy of the identical segment.
    let duplicate_segment = || WalSegmentPayload {
        id: WalSegmentId::new(tidaldb::replication::RegionId::SINGLE, ShardId::SINGLE, 1),
        bytes: encoded.clone(),
        event_count: 1,
    };
    tx.send(duplicate_segment()).unwrap();
    tx.send(duplicate_segment()).unwrap();

    // Let the receiver drain both copies, then close the channel and wait
    // for the receiver to finish.
    std::thread::sleep(Duration::from_millis(100));
    drop(tx);
    receiver.join();

    assert_eq!(state.applied_seqno(ShardId::SINGLE), Some(1));

    // Look the entity up directly in the ledger's entries.
    let key = (EntityId::new(10), type_id);
    let entry = ledger.entries().get(&key);
    assert!(entry.is_some(), "entity should exist in ledger");
}
// ── Test 5: InProcessTransport end-to-end ────────────────────────────────
/// Sends one segment from shard 0 to shard 1 over transports built by
/// `InProcessTransportFactory` and checks that it arrives unchanged
/// (seqno, event count, and bytes).
#[test]
fn in_process_transport_delivers_segment() {
    let shards = vec![ShardId(0), ShardId(1)];
    let mut transports = InProcessTransportFactory::new(&shards).build();
    let t0 = transports.remove(&ShardId(0)).unwrap();
    let t1 = transports.remove(&ShardId(1)).unwrap();

    let schema = make_schema();
    let ledger = Arc::new(SignalLedger::new(schema, Box::new(NoopWalWriter)));
    let type_id = ledger.resolve_signal_type("view").unwrap();

    // Shard 0 sends a segment to shard 1.
    let events = vec![EventRecord {
        entity_id: 99,
        // Checked narrowing: fail loudly rather than silently truncate the ID.
        signal_type: u8::try_from(type_id.as_u16()).expect("signal type id must fit in u8"),
        weight: 2.0,
        timestamp_nanos: 500,
    }];
    let bytes = encode_batch(&events, 1, 42).unwrap();
    t0.send_segment(
        ShardId(1),
        WalSegmentPayload {
            id: WalSegmentId::new(tidaldb::replication::RegionId::SINGLE, ShardId(0), 42),
            bytes: bytes.clone(),
            event_count: 1,
        },
    )
    .unwrap();

    // Shard 1 receives.
    let payload = t1
        .recv_segment()
        .expect("shard 1 should receive the segment");
    assert_eq!(payload.id.seqno, 42);
    assert_eq!(payload.event_count, 1);
    assert_eq!(payload.bytes, bytes);

    // Drop both transports to clean up.
    drop(t0);
    drop(t1);
}
// ── Test 6: ReplicationLagGauge ──────────────────────────────────────────
/// Drives a `ReplicationLagGauge` by hand and checks the reported lag after
/// each step: leader advances, follower partially catches up, follower fully
/// catches up.
#[test]
fn replication_lag_gauge_tracks_lag() {
    let applied_state = Arc::new(ReplicationState::single());
    let lag_gauge = ReplicationLagGauge::new(ShardId::SINGLE, Arc::clone(&applied_state));

    // Nothing shipped and nothing applied yet: no lag.
    assert_eq!(lag_gauge.lag_segments(), 0);

    // Leader is at 10, follower still at the start: lag of 10.
    lag_gauge.update_leader_seqno(10);
    assert_eq!(lag_gauge.lag_segments(), 10);

    // Follower applies up to 7: three segments behind.
    applied_state.advance(ShardId::SINGLE, 7);
    assert_eq!(lag_gauge.lag_segments(), 3);

    // Follower applies up to 10: fully caught up.
    applied_state.advance(ShardId::SINGLE, 10);
    assert_eq!(lag_gauge.lag_segments(), 0);
}
// ── Test 7: Full pipeline: leader -> transport -> follower ───────────────
/// End-to-end check: signals written on a leader are hand-packaged into a WAL
/// batch (standing in for the shipper), delivered to a follower through a
/// channel-backed transport, and then read back from the follower.
#[test]
fn full_pipeline_leader_to_follower() {
    let schema = make_schema();

    // Open leader and follower.
    let leader = open_leader(schema.clone());
    let follower = open_follower(schema.clone());

    // Resolve type ID using a standalone ledger (same schema).
    let type_id = resolve_view_type_id(&schema);
    // Checked narrowing: fail loudly rather than silently truncate the ID.
    let signal_type = u8::try_from(type_id.as_u16()).expect("signal type id must fit in u8");
    let follower_state = follower.replication_state().clone();

    // Wire up a channel-based transport for the follower.
    let (tx, rx) = crossbeam::channel::bounded(16);
    let transport = Arc::new(ChannelTransport { rx });
    follower.start_replication(Arc::clone(&transport)).unwrap();

    // Write signals on the leader.
    let ts = Timestamp::from_nanos(2_000_000_000);
    leader.signal("view", EntityId::new(100), 1.0, ts).unwrap();
    leader.signal("view", EntityId::new(101), 2.0, ts).unwrap();

    // Simulate the shipper: build a WAL payload mirroring the leader's
    // signals and send it to the follower via the transport.
    let events = vec![
        EventRecord {
            entity_id: 100,
            signal_type,
            weight: 1.0,
            timestamp_nanos: 2_000_000_000,
        },
        EventRecord {
            entity_id: 101,
            signal_type,
            weight: 2.0,
            timestamp_nanos: 2_000_000_000,
        },
    ];
    let batch_bytes = encode_batch(&events, 1, 1).unwrap();
    tx.send(WalSegmentPayload {
        id: WalSegmentId::new(tidaldb::replication::RegionId::SINGLE, ShardId::SINGLE, 1),
        bytes: batch_bytes,
        event_count: 2,
    })
    .unwrap();

    // Wait for the follower by polling with a deadline instead of a fixed
    // sleep, so the test neither races the receiver thread on a slow machine
    // nor waits longer than necessary on a fast one. On timeout the
    // assertions below fail with their usual messages.
    let deadline = std::time::Instant::now() + Duration::from_secs(5);
    loop {
        let both_visible = follower
            .read_decay_score(EntityId::new(100), "view", 0)
            .unwrap()
            .is_some()
            && follower
                .read_decay_score(EntityId::new(101), "view", 0)
                .unwrap()
                .is_some();
        let advanced = follower_state.applied_seqno(ShardId::SINGLE) == Some(2);
        if (both_visible && advanced) || std::time::Instant::now() >= deadline {
            break;
        }
        std::thread::sleep(Duration::from_millis(5));
    }

    // Verify the follower has the signals.
    let score_100 = follower
        .read_decay_score(EntityId::new(100), "view", 0)
        .unwrap();
    let score_101 = follower
        .read_decay_score(EntityId::new(101), "view", 0)
        .unwrap();
    assert!(
        score_100.is_some(),
        "entity 100 should be visible on follower"
    );
    assert!(
        score_101.is_some(),
        "entity 101 should be visible on follower"
    );

    // Verify replication state.
    // Batch has 2 events starting at seq 1, so last seq = 1 + 2 - 1 = 2.
    let applied = follower_state.applied_seqno(ShardId::SINGLE);
    assert_eq!(
        applied,
        Some(2),
        "replication state should reflect applied batch"
    );

    // Cleanup: dropping the sender lets the receiver exit.
    drop(tx);
    leader.close().unwrap();
    follower.close().unwrap();
}
// ── Test 8: Follower rejects session writes ──────────────────────────────
/// `start_session` is a write operation, so a follower must refuse it with
/// `TidalError::ReadOnly`.
#[test]
fn follower_rejects_start_session() {
    let follower = open_follower(make_schema());

    let attempt = follower.start_session(1, "test-agent", "default", HashMap::new());
    let err = attempt.expect_err("follower should reject start_session");

    let is_read_only = matches!(err, TidalError::ReadOnly(_));
    assert!(is_read_only, "expected ReadOnly error, got: {err}");

    follower.close().unwrap();
}
// ── Test 9: 1K-signal decay score equivalence (6 decimal places) ─────────
/// Writes 1,000 signals on a leader, replicates the equivalent events to a
/// follower in batches of 200, and compares every entity's decay score on
/// both nodes to within 1e-6.
///
/// NOTE(review): missing scores on either side are mapped to 0.0 by
/// `unwrap_or(0.0)`, so an entity absent on the follower only counts as a
/// mismatch if the leader's score exceeds 1e-6. If scores are decayed to the
/// current wall clock (as the comment on the comparison loop suggests),
/// timestamps around 1s after the epoch with a seven-day half-life may decay
/// both sides to ~0 and make the comparison vacuous — confirm, and consider
/// asserting presence explicitly.
#[test]
fn replication_decay_scores_match() {
    let schema = make_schema();
    let leader = open_leader(schema.clone());
    let follower = open_follower(schema.clone());
    let type_id = resolve_view_type_id(&schema);
    // Checked narrowing: fail loudly rather than silently truncate the ID.
    let signal_type = u8::try_from(type_id.as_u16()).expect("signal type id must fit in u8");
    let follower_state = follower.replication_state().clone();

    // Wire up transport for follower.
    let (tx, rx) = crossbeam::channel::bounded(16);
    let transport = Arc::new(ChannelTransport { rx });
    follower.start_replication(Arc::clone(&transport)).unwrap();

    // Write 1,000 signals on the leader with varying timestamps and weights,
    // recording the matching WAL event for each.
    let base_ns = 1_000_000_000u64;
    let mut all_events = Vec::with_capacity(1000);
    for i in 0u64..1000 {
        let ts_nanos = base_ns + i * 1_000_000; // 1ms apart
        let weight = 1.0 + (i as f64) * 0.001;
        let entity = EntityId::new(i + 1);
        leader
            .signal("view", entity, weight, Timestamp::from_nanos(ts_nanos))
            .unwrap();
        all_events.push(EventRecord {
            entity_id: i + 1,
            signal_type,
            weight: weight as f32,
            timestamp_nanos: ts_nanos,
        });
    }

    // Ship events in batches of 200 (encode_batch max is 256).
    // NOTE(review): the third `encode_batch` argument here is the batch index
    // (1..=5), while each batch holds 200 events; elsewhere in this file that
    // argument is described as the first event's sequence number. Confirm the
    // resulting overlap in sequence ranges is intended.
    let batch_size = 200;
    for (batch_idx, chunk) in all_events.chunks(batch_size).enumerate() {
        let seqno = (batch_idx + 1) as u64;
        let batch_bytes = encode_batch(chunk, 1, seqno).unwrap();
        tx.send(WalSegmentPayload {
            id: WalSegmentId::new(
                tidaldb::replication::RegionId::SINGLE,
                ShardId::SINGLE,
                seqno,
            ),
            bytes: batch_bytes,
            event_count: chunk.len() as u64,
        })
        .unwrap();
    }

    // Wait for processing by polling (with a deadline) until the last entity
    // of the last batch is readable on the follower, rather than sleeping a
    // fixed 200ms and hoping the receiver thread has finished. On timeout the
    // comparisons below run exactly as they would have after the sleep.
    let last_entity = EntityId::new(1000);
    let deadline = std::time::Instant::now() + Duration::from_secs(5);
    while follower
        .read_decay_score(last_entity, "view", 0)
        .unwrap()
        .is_none()
        && std::time::Instant::now() < deadline
    {
        std::thread::sleep(Duration::from_millis(5));
    }

    // Compare decay scores (decay_rate_idx=0 reads current score at now()).
    let mut mismatches = 0;
    for i in 0u64..1000 {
        let entity = EntityId::new(i + 1);
        let leader_score = leader
            .read_decay_score(entity, "view", 0)
            .unwrap()
            .unwrap_or(0.0);
        let follower_score = follower
            .read_decay_score(entity, "view", 0)
            .unwrap()
            .unwrap_or(0.0);
        if (leader_score - follower_score).abs() > 1e-6 {
            mismatches += 1;
        }
    }
    assert_eq!(
        mismatches, 0,
        "all 1,000 decay scores should match to 6 decimal places"
    );

    // Verify replication state advanced.
    let applied = follower_state.applied_seqno(ShardId::SINGLE);
    assert!(applied.is_some(), "replication state should have advanced");

    drop(tx);
    leader.close().unwrap();
    follower.close().unwrap();
}
// ── Test 10: Follower serves retrieve queries ────────────────────────────
/// Replicates two signals to a follower and checks that a `Retrieve` query
/// against the follower succeeds (i.e. the read path is not blocked by the
/// follower's read-only role).
#[test]
fn follower_serves_retrieve_queries() {
    let schema = make_schema();
    let follower = open_follower(schema.clone());
    let type_id = resolve_view_type_id(&schema);
    // Checked narrowing: fail loudly rather than silently truncate the ID.
    let signal_type = u8::try_from(type_id.as_u16()).expect("signal type id must fit in u8");

    // Wire up transport.
    let (tx, rx) = crossbeam::channel::bounded(4);
    let transport = Arc::new(ChannelTransport { rx });
    follower.start_replication(Arc::clone(&transport)).unwrap();

    // Replicate some signals to the follower.
    let events = vec![
        EventRecord {
            entity_id: 200,
            signal_type,
            weight: 5.0,
            timestamp_nanos: 1_000_000_000,
        },
        EventRecord {
            entity_id: 201,
            signal_type,
            weight: 3.0,
            timestamp_nanos: 1_000_000_000,
        },
    ];
    let bytes = encode_batch(&events, 1, 1).unwrap();
    tx.send(WalSegmentPayload {
        id: WalSegmentId::new(tidaldb::replication::RegionId::SINGLE, ShardId::SINGLE, 1),
        bytes,
        event_count: 2,
    })
    .unwrap();

    // Give the receiver time to apply the batch by polling (with a deadline)
    // for the batch's last entity, instead of a fixed sleep. On timeout the
    // query below still runs, as it did after the sleep.
    let deadline = std::time::Instant::now() + Duration::from_secs(5);
    while follower
        .read_decay_score(EntityId::new(201), "view", 0)
        .unwrap()
        .is_none()
        && std::time::Instant::now() < deadline
    {
        std::thread::sleep(Duration::from_millis(5));
    }

    // Execute a retrieve query on the follower — should NOT return ReadOnly.
    // NOTE: We assert `is_ok()` rather than checking result contents because
    // signals-only replication does not populate items_storage (the retrieve
    // pipeline requires item metadata to produce ranked results). This test
    // validates that the follower's read-path is accessible, not that
    // replicated signals produce ranked output.
    let query = Retrieve::builder().build().unwrap();
    let result = follower.retrieve(&query);
    assert!(
        result.is_ok(),
        "follower should serve retrieve queries, got: {:?}",
        result.err()
    );

    drop(tx);
    follower.close().unwrap();
}
// ── Test 11: Corrupted segment is rejected ───────────────────────────────
/// Sends a deliberately corrupted segment followed by a valid one and checks
/// that the corrupted segment's entity never appears on the follower while
/// the valid segment's entity does — i.e. the receiver rejects bad data and
/// keeps running.
#[test]
fn corrupted_segment_is_rejected() {
    let schema = make_schema();
    let follower = open_follower(schema.clone());
    let type_id = resolve_view_type_id(&schema);
    // Checked narrowing: fail loudly rather than silently truncate the ID.
    let signal_type = u8::try_from(type_id.as_u16()).expect("signal type id must fit in u8");

    let (tx, rx) = crossbeam::channel::bounded(8);
    let transport = Arc::new(ChannelTransport { rx });
    follower.start_replication(Arc::clone(&transport)).unwrap();

    // Build a valid batch, then corrupt it.
    let events = vec![EventRecord {
        entity_id: 500,
        signal_type,
        weight: 10.0,
        timestamp_nanos: 1_000_000_000,
    }];
    let mut corrupted = encode_batch(&events, 1, 1).unwrap();
    // Flip a byte in the payload region (past the 64-byte header) to trigger BLAKE3 mismatch.
    let corrupt_offset = HEADER_SIZE + 1;
    assert!(
        corrupted.len() > corrupt_offset,
        "batch too short to corrupt payload"
    );
    corrupted[corrupt_offset] ^= 0xFF;
    tx.send(WalSegmentPayload {
        id: WalSegmentId::new(tidaldb::replication::RegionId::SINGLE, ShardId::SINGLE, 1),
        bytes: corrupted,
        event_count: 1,
    })
    .unwrap();

    // Also send a valid segment (seqno 2) to prove the receiver keeps running.
    let valid_events = vec![EventRecord {
        entity_id: 501,
        signal_type,
        weight: 7.0,
        timestamp_nanos: 2_000_000_000,
    }];
    let valid_bytes = encode_batch(&valid_events, 1, 2).unwrap();
    tx.send(WalSegmentPayload {
        id: WalSegmentId::new(tidaldb::replication::RegionId::SINGLE, ShardId::SINGLE, 2),
        bytes: valid_bytes,
        event_count: 1,
    })
    .unwrap();

    // Poll (with a deadline) until the valid follow-up segment is visible,
    // instead of sleeping a fixed 200ms. Assuming segments are consumed in
    // send order by a single receiver, seeing entity 501 also means the
    // corrupted segment has already been handled. On timeout the assertions
    // below fail with their usual messages.
    let deadline = std::time::Instant::now() + Duration::from_secs(5);
    while follower
        .read_decay_score(EntityId::new(501), "view", 0)
        .unwrap()
        .is_none()
        && std::time::Instant::now() < deadline
    {
        std::thread::sleep(Duration::from_millis(5));
    }

    // Entity 500 (from corrupted segment) should NOT be present.
    let score_500 = follower
        .read_decay_score(EntityId::new(500), "view", 0)
        .unwrap();
    assert!(
        score_500.is_none(),
        "corrupted segment entity should not appear"
    );

    // Entity 501 (from valid segment) SHOULD be present.
    let score_501 = follower
        .read_decay_score(EntityId::new(501), "view", 0)
        .unwrap();
    assert!(
        score_501.is_some(),
        "valid segment after corruption should be applied"
    );

    drop(tx);
    follower.close().unwrap();
}
// ── Test 12: Replication lag converges to zero ───────────────────────────
/// Runs two ship/catch-up rounds against a `ReplicationLagGauge` and checks
/// that the reported lag returns to zero after each round.
#[test]
fn replication_lag_converges_to_zero() {
    let applied_state = Arc::new(ReplicationState::single());
    let lag_gauge = ReplicationLagGauge::new(ShardId::SINGLE, Arc::clone(&applied_state));

    // Round one: leader reports seqnos 1 through 10, one at a time.
    (1..=10u64).for_each(|seqno| {
        lag_gauge.update_leader_seqno(seqno);
    });
    assert_eq!(lag_gauge.lag_segments(), 10);

    // Follower applies the same ten seqnos, one at a time.
    (1..=10u64).for_each(|seqno| {
        applied_state.advance(ShardId::SINGLE, seqno);
    });
    assert_eq!(
        lag_gauge.lag_segments(),
        0,
        "lag should be 0 after catching up"
    );

    // Round two: leader reports seqnos 11 through 20.
    (11..=20u64).for_each(|seqno| {
        lag_gauge.update_leader_seqno(seqno);
    });
    assert_eq!(lag_gauge.lag_segments(), 10);

    // This time the follower jumps straight to 20 in a single step.
    applied_state.advance(ShardId::SINGLE, 20);
    assert_eq!(
        lag_gauge.lag_segments(),
        0,
        "lag should converge to 0 again after second batch"
    );
}