//! M8p2 WAL Replication integration tests. //! //! Tests the full replication pipeline: leader writes signals, segments are //! shipped (or directly injected) to a follower, and the follower's ledger //! reflects the replicated signals. Also verifies follower write rejection //! and replication lag gauge. #![allow( clippy::unwrap_used, clippy::items_after_statements, clippy::doc_markdown, clippy::significant_drop_tightening, clippy::suboptimal_flops, clippy::cast_precision_loss )] use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use tidaldb::db::config::{NodeConfig, NodeRole}; use tidaldb::query::retrieve::Retrieve; use tidaldb::replication::lag::ReplicationLagGauge; use tidaldb::replication::shard::ShardId; use tidaldb::replication::state::ReplicationState; use tidaldb::replication::transport::{Transport, TransportError, WalSegmentPayload}; use tidaldb::replication::{InProcessTransportFactory, WalSegmentId}; use tidaldb::schema::{DecaySpec, EntityId, EntityKind, SchemaBuilder, Timestamp, Window}; use tidaldb::signals::{NoopWalWriter, SignalLedger}; use tidaldb::wal::format::batch::{EventRecord, HEADER_SIZE, encode_batch}; use tidaldb::{TidalDb, TidalError}; // ── Shared test transport ──────────────────────────────────────────────── /// Channel-based transport used by all integration tests that inject WAL /// segments into a follower. All six test-local transport structs were /// identical; this consolidates them into a single definition. struct ChannelTransport { rx: crossbeam::channel::Receiver, } impl Transport for ChannelTransport { fn send_segment( &self, _to: ShardId, _payload: WalSegmentPayload, ) -> Result<(), TransportError> { Ok(()) } fn recv_segment(&self) -> Option { self.rx.recv().ok() } fn local_shard(&self) -> ShardId { ShardId::SINGLE } } /// Build a minimal schema with one signal type. 
///
/// The single signal is `view` on items: exponential decay with a 7-day
/// half-life, an all-time window only, and velocity tracking disabled.
fn make_schema() -> tidaldb::schema::Schema {
    const SEVEN_DAYS: Duration = Duration::from_secs(7 * 24 * 3600);

    let mut schema_builder = SchemaBuilder::new();
    let _ = schema_builder
        .signal(
            "view",
            EntityKind::Item,
            DecaySpec::Exponential {
                half_life: SEVEN_DAYS,
            },
        )
        .windows(&[Window::AllTime])
        .velocity(false)
        .add();
    schema_builder.build().expect("schema must be valid")
}

/// Look up the `view` signal type ID via a short-lived ledger built from
/// `schema` (the ledger is discarded immediately afterwards).
///
/// Signal type IDs are deterministic (alphabetically sorted, starting at 0),
/// so for a schema whose only signal type is "view" the ID is always 0.
fn resolve_view_type_id(schema: &tidaldb::schema::Schema) -> tidaldb::signals::SignalTypeId {
    SignalLedger::new(schema.clone(), Box::new(NoopWalWriter))
        .resolve_signal_type("view")
        .unwrap()
}

/// Open an ephemeral `TidalDb` configured with the follower role.
fn open_follower(schema: tidaldb::schema::Schema) -> TidalDb {
    let cluster = NodeConfig {
        role: NodeRole::Follower,
        ..NodeConfig::default()
    };
    TidalDb::builder()
        .ephemeral()
        .with_schema(schema)
        .with_cluster(cluster)
        .open()
        .expect("follower should open")
}

/// Open a leader TidalDb (ephemeral, leader role).
fn open_leader(schema: tidaldb::schema::Schema) -> TidalDb { TidalDb::builder() .ephemeral() .with_schema(schema) .with_cluster(NodeConfig { role: NodeRole::Leader, ..NodeConfig::default() }) .open() .expect("leader should open") } // ── Test 1: Follower rejects write calls ───────────────────────────────── #[test] fn follower_rejects_signal_write() { let schema = make_schema(); let follower = open_follower(schema); let err = follower .signal("view", EntityId::new(1), 1.0, Timestamp::now()) .expect_err("follower should reject signal writes"); assert!( matches!(err, TidalError::ReadOnly(_)), "expected ReadOnly error, got: {err}" ); follower.close().unwrap(); } #[test] fn follower_rejects_write_item() { let schema = make_schema(); let follower = open_follower(schema); let meta = HashMap::from([("title".to_string(), "test".to_string())]); let err = follower .write_item_with_metadata(EntityId::new(1), &meta) .expect_err("follower should reject item writes"); assert!( matches!(err, TidalError::ReadOnly(_)), "expected ReadOnly error, got: {err}" ); follower.close().unwrap(); } // ── Test 2: Leader accepts writes ──────────────────────────────────────── #[test] fn leader_accepts_signal_write() { let schema = make_schema(); let leader = open_leader(schema); leader .signal("view", EntityId::new(1), 1.0, Timestamp::now()) .expect("leader should accept signal writes"); let score = leader .read_decay_score(EntityId::new(1), "view", 0) .expect("read should succeed"); assert!(score.is_some(), "signal should have been recorded"); leader.close().unwrap(); } // ── Test 3: Direct payload injection into follower ledger ──────────────── #[test] fn payload_injection_updates_follower_ledger() { let schema = make_schema(); let follower = open_follower(schema.clone()); // Resolve the signal type ID using a standalone ledger (same schema). let type_id = resolve_view_type_id(&schema); let state = follower.replication_state().clone(); // Build a WAL batch payload. 
let events = vec![EventRecord { entity_id: 42, signal_type: type_id.as_u16() as u8, weight: 3.0, timestamp_nanos: 1_000_000_000, }]; let bytes = encode_batch(&events, 1, 1).unwrap(); // Apply it through the receiver's apply_payload (via the public module). // We cannot call apply_payload directly (it is private), so we use // the InProcessTransport + spawn_receiver path instead. // Create a channel-based transport. let (tx, rx) = crossbeam::channel::bounded(4); let transport = Arc::new(ChannelTransport { rx }); follower.start_replication(Arc::clone(&transport)).unwrap(); // Send the payload. tx.send(WalSegmentPayload { id: WalSegmentId::new(tidaldb::replication::RegionId::SINGLE, ShardId::SINGLE, 1), bytes, event_count: 1, }) .unwrap(); // Give the receiver a moment to process. std::thread::sleep(Duration::from_millis(100)); // Verify the follower's ledger was updated. let score = follower .read_decay_score(EntityId::new(42), "view", 0) .expect("read should succeed"); assert!(score.is_some(), "signal should be visible on follower"); // Verify replication state advanced. let applied = state.applied_seqno(ShardId::SINGLE); assert_eq!(applied, Some(1), "replication state should have advanced"); // Shutdown: drop sender so receiver exits. drop(tx); follower.close().unwrap(); } // ── Test 4: Idempotent replay ──────────────────────────────────────────── #[test] fn replay_is_idempotent() { let schema = make_schema(); let ledger = Arc::new(SignalLedger::new(schema, Box::new(NoopWalWriter))); let state = Arc::new(ReplicationState::single()); let type_id = ledger.resolve_signal_type("view").unwrap(); let events = vec![EventRecord { entity_id: 10, signal_type: type_id.as_u16() as u8, weight: 5.0, timestamp_nanos: 1_000_000_000, }]; let bytes = encode_batch(&events, 1, 1).unwrap(); // Build a transport that delivers the same segment twice. 
let (tx, rx) = crossbeam::channel::bounded(4); let transport = Arc::new(ChannelTransport { rx }); let handle = tidaldb::replication::spawn_receiver( Arc::clone(&transport), Arc::clone(&ledger), Arc::clone(&state), ); // Send the same segment twice. for _ in 0..2 { tx.send(WalSegmentPayload { id: WalSegmentId::new(tidaldb::replication::RegionId::SINGLE, ShardId::SINGLE, 1), bytes: bytes.clone(), event_count: 1, }) .unwrap(); } std::thread::sleep(Duration::from_millis(100)); drop(tx); handle.join(); // The entity should exist with weight=5.0, not 10.0. // (Idempotent replay means the second apply was a no-op.) assert_eq!(state.applied_seqno(ShardId::SINGLE), Some(1)); // Read the hot tier directly to verify only one application. let entry = ledger.entries().get(&(EntityId::new(10), type_id)); assert!(entry.is_some(), "entity should exist in ledger"); } // ── Test 5: InProcessTransport end-to-end ──────────────────────────────── #[test] fn in_process_transport_delivers_segment() { let shards = vec![ShardId(0), ShardId(1)]; let mut transports = InProcessTransportFactory::new(&shards).build(); let t0 = transports.remove(&ShardId(0)).unwrap(); let t1 = transports.remove(&ShardId(1)).unwrap(); let schema = make_schema(); let ledger = Arc::new(SignalLedger::new(schema, Box::new(NoopWalWriter))); let type_id = ledger.resolve_signal_type("view").unwrap(); // Shard 0 sends a segment to shard 1. let events = vec![EventRecord { entity_id: 99, signal_type: type_id.as_u16() as u8, weight: 2.0, timestamp_nanos: 500, }]; let bytes = encode_batch(&events, 1, 42).unwrap(); t0.send_segment( ShardId(1), WalSegmentPayload { id: WalSegmentId::new(tidaldb::replication::RegionId::SINGLE, ShardId(0), 42), bytes: bytes.clone(), event_count: 1, }, ) .unwrap(); // Shard 1 receives. 
let payload = t1.recv_segment(); assert!(payload.is_some(), "shard 1 should receive the segment"); let payload = payload.unwrap(); assert_eq!(payload.id.seqno, 42); assert_eq!(payload.event_count, 1); assert_eq!(payload.bytes, bytes); // Drop both transports to clean up. drop(t0); drop(t1); } // ── Test 6: ReplicationLagGauge ────────────────────────────────────────── #[test] fn replication_lag_gauge_tracks_lag() { let state = Arc::new(ReplicationState::single()); let gauge = ReplicationLagGauge::new(ShardId::SINGLE, Arc::clone(&state)); // Initially, both leader and applied are 0 => lag = 0. assert_eq!(gauge.lag_segments(), 0); // Leader moves ahead. gauge.update_leader_seqno(10); assert_eq!(gauge.lag_segments(), 10); // Follower catches up partially. state.advance(ShardId::SINGLE, 7); assert_eq!(gauge.lag_segments(), 3); // Follower catches up completely. state.advance(ShardId::SINGLE, 10); assert_eq!(gauge.lag_segments(), 0); } // ── Test 7: Full pipeline: leader -> transport -> follower ─────────────── #[test] fn full_pipeline_leader_to_follower() { let schema = make_schema(); // Open leader and follower. let leader = open_leader(schema.clone()); let follower = open_follower(schema.clone()); // Resolve type ID using a standalone ledger (same schema). let type_id = resolve_view_type_id(&schema); let follower_state = follower.replication_state().clone(); // Wire up a channel-based transport for the follower. let (tx, rx) = crossbeam::channel::bounded(16); let transport = Arc::new(ChannelTransport { rx }); follower.start_replication(Arc::clone(&transport)).unwrap(); // Write signals on the leader. let ts = Timestamp::from_nanos(2_000_000_000); leader.signal("view", EntityId::new(100), 1.0, ts).unwrap(); leader.signal("view", EntityId::new(101), 2.0, ts).unwrap(); // Simulate the shipper: build a WAL payload from the leader's signals // and send it to the follower via the transport. 
let events = vec![ EventRecord { entity_id: 100, signal_type: type_id.as_u16() as u8, weight: 1.0, timestamp_nanos: 2_000_000_000, }, EventRecord { entity_id: 101, signal_type: type_id.as_u16() as u8, weight: 2.0, timestamp_nanos: 2_000_000_000, }, ]; let batch_bytes = encode_batch(&events, 1, 1).unwrap(); tx.send(WalSegmentPayload { id: WalSegmentId::new(tidaldb::replication::RegionId::SINGLE, ShardId::SINGLE, 1), bytes: batch_bytes, event_count: 2, }) .unwrap(); // Wait for the follower to process. std::thread::sleep(Duration::from_millis(100)); // Verify the follower has the signals. let score_100 = follower .read_decay_score(EntityId::new(100), "view", 0) .unwrap(); let score_101 = follower .read_decay_score(EntityId::new(101), "view", 0) .unwrap(); assert!( score_100.is_some(), "entity 100 should be visible on follower" ); assert!( score_101.is_some(), "entity 101 should be visible on follower" ); // Verify replication state. // Batch has 2 events starting at seq 1, so last seq = 1 + 2 - 1 = 2. let applied = follower_state.applied_seqno(ShardId::SINGLE); assert_eq!( applied, Some(2), "replication state should reflect applied batch" ); // Cleanup. drop(tx); leader.close().unwrap(); follower.close().unwrap(); } // ── Test 8: Follower rejects session writes ────────────────────────────── #[test] fn follower_rejects_start_session() { let schema = make_schema(); let follower = open_follower(schema); // start_session is a write operation — followers must reject it. 
let err = follower .start_session(1, "test-agent", "default", HashMap::new()) .expect_err("follower should reject start_session"); assert!( matches!(err, TidalError::ReadOnly(_)), "expected ReadOnly error, got: {err}" ); follower.close().unwrap(); } // ── Test 9: 1K-signal decay score equivalence (6 decimal places) ───────── #[test] fn replication_decay_scores_match() { let schema = make_schema(); let leader = open_leader(schema.clone()); let follower = open_follower(schema.clone()); let type_id = resolve_view_type_id(&schema); let follower_state = follower.replication_state().clone(); // Wire up transport for follower. let (tx, rx) = crossbeam::channel::bounded(16); let transport = Arc::new(ChannelTransport { rx }); follower.start_replication(Arc::clone(&transport)).unwrap(); // Write 1,000 signals on the leader with varying timestamps and weights. let base_ns = 1_000_000_000u64; let mut all_events = Vec::with_capacity(1000); for i in 0u64..1000 { let ts = Timestamp::from_nanos(base_ns + i * 1_000_000); // 1ms apart let weight = 1.0 + (i as f64) * 0.001; let entity = EntityId::new(i + 1); leader.signal("view", entity, weight, ts).unwrap(); all_events.push(EventRecord { entity_id: i + 1, signal_type: type_id.as_u16() as u8, weight: weight as f32, timestamp_nanos: base_ns + i * 1_000_000, }); } // Ship events in batches of 200 (encode_batch max is 256). let batch_size = 200; for (batch_idx, chunk) in all_events.chunks(batch_size).enumerate() { let seqno = (batch_idx + 1) as u64; let batch_bytes = encode_batch(chunk, 1, seqno).unwrap(); tx.send(WalSegmentPayload { id: WalSegmentId::new( tidaldb::replication::RegionId::SINGLE, ShardId::SINGLE, seqno, ), bytes: batch_bytes, event_count: chunk.len() as u64, }) .unwrap(); } // Wait for processing. std::thread::sleep(Duration::from_millis(200)); // Compare decay scores (decay_rate_idx=0 reads current score at now()). 
let mut mismatches = 0; for i in 0u64..1000 { let entity = EntityId::new(i + 1); let leader_score = leader .read_decay_score(entity, "view", 0) .unwrap() .unwrap_or(0.0); let follower_score = follower .read_decay_score(entity, "view", 0) .unwrap() .unwrap_or(0.0); if (leader_score - follower_score).abs() > 1e-6 { mismatches += 1; } } assert_eq!( mismatches, 0, "all 1,000 decay scores should match to 6 decimal places" ); // Verify replication state advanced. let applied = follower_state.applied_seqno(ShardId::SINGLE); assert!(applied.is_some(), "replication state should have advanced"); drop(tx); leader.close().unwrap(); follower.close().unwrap(); } // ── Test 10: Follower serves retrieve queries ──────────────────────────── #[test] fn follower_serves_retrieve_queries() { let schema = make_schema(); let follower = open_follower(schema.clone()); let type_id = resolve_view_type_id(&schema); // Wire up transport. let (tx, rx) = crossbeam::channel::bounded(4); let transport = Arc::new(ChannelTransport { rx }); follower.start_replication(Arc::clone(&transport)).unwrap(); // Replicate some signals to the follower. let events = vec![ EventRecord { entity_id: 200, signal_type: type_id.as_u16() as u8, weight: 5.0, timestamp_nanos: 1_000_000_000, }, EventRecord { entity_id: 201, signal_type: type_id.as_u16() as u8, weight: 3.0, timestamp_nanos: 1_000_000_000, }, ]; let bytes = encode_batch(&events, 1, 1).unwrap(); tx.send(WalSegmentPayload { id: WalSegmentId::new(tidaldb::replication::RegionId::SINGLE, ShardId::SINGLE, 1), bytes, event_count: 2, }) .unwrap(); std::thread::sleep(Duration::from_millis(100)); // Execute a retrieve query on the follower — should NOT return ReadOnly. // NOTE: We assert `is_ok()` rather than checking result contents because // signals-only replication does not populate items_storage (the retrieve // pipeline requires item metadata to produce ranked results). 
This test // validates that the follower's read-path is accessible, not that // replicated signals produce ranked output. let query = Retrieve::builder().build().unwrap(); let result = follower.retrieve(&query); assert!( result.is_ok(), "follower should serve retrieve queries, got: {:?}", result.err() ); drop(tx); follower.close().unwrap(); } // ── Test 11: Corrupted segment causes receiver to stop ─────────────────── #[test] fn corrupted_segment_is_rejected() { // New contract: a BLAKE3 failure causes the receiver thread to exit with // WalError::Corruption so operators can trigger remediation. The receiver // does NOT silently skip the corrupt payload and continue. let schema = make_schema(); let follower = open_follower(schema.clone()); let type_id = resolve_view_type_id(&schema); let (tx, rx) = crossbeam::channel::bounded(8); let transport = Arc::new(ChannelTransport { rx }); follower.start_replication(Arc::clone(&transport)).unwrap(); // Build a valid batch, then corrupt it. let events = vec![EventRecord { entity_id: 500, signal_type: type_id.as_u16() as u8, weight: 10.0, timestamp_nanos: 1_000_000_000, }]; let mut corrupted = encode_batch(&events, 1, 1).unwrap(); // Flip a byte in the payload region (past the 64-byte header) to trigger BLAKE3 mismatch. let corrupt_offset = HEADER_SIZE + 1; assert!( corrupted.len() > corrupt_offset, "batch too short to corrupt payload" ); corrupted[corrupt_offset] ^= 0xFF; tx.send(WalSegmentPayload { id: WalSegmentId::new(tidaldb::replication::RegionId::SINGLE, ShardId::SINGLE, 1), bytes: corrupted, event_count: 1, }) .unwrap(); // Queue a valid segment after the corrupt one. Under the new contract the // receiver has already stopped after the corrupt payload, so entity 501 // will NOT be applied either. 
let valid_events = vec![EventRecord { entity_id: 501, signal_type: type_id.as_u16() as u8, weight: 7.0, timestamp_nanos: 2_000_000_000, }]; let valid_bytes = encode_batch(&valid_events, 1, 2).unwrap(); tx.send(WalSegmentPayload { id: WalSegmentId::new(tidaldb::replication::RegionId::SINGLE, ShardId::SINGLE, 2), bytes: valid_bytes, event_count: 1, }) .unwrap(); // Give the receiver thread time to process the corrupt payload and exit. std::thread::sleep(Duration::from_millis(200)); // Entity 500 (from corrupted segment) must NOT be present. let score_500 = follower .read_decay_score(EntityId::new(500), "view", 0) .unwrap(); assert!( score_500.is_none(), "corrupted segment entity should not appear" ); // Entity 501 is also absent: the receiver stopped on corruption and did // not process subsequent payloads. Operators must restart replication // after investigating the corruption. let score_501 = follower .read_decay_score(EntityId::new(501), "view", 0) .unwrap(); assert!( score_501.is_none(), "receiver should have stopped after corruption; entity 501 must not appear" ); // close() logs the corruption warning but does not return an error // (WalError is not TidalError; lifecycle logs and moves on). drop(tx); follower.close().unwrap(); } // ── Test 12: Builder with_transport auto-starts follower receiver ───────── // // Validates that `TidalDbBuilder::with_transport` wires the segment receiver // automatically at open time so the caller never needs to invoke // `start_replication` manually. #[test] fn with_transport_auto_wires_follower_receiver() { let schema = make_schema(); let type_id = resolve_view_type_id(&schema); // Build a channel transport. The sender (`tx`) plays the role of a // leader shipping WAL segments; the receiver is handed to the follower. let (tx, rx) = crossbeam::channel::bounded::(16); let follower_transport = Arc::new(ChannelTransport { rx }) as Arc; // Open the follower via the builder — receiver must start automatically. 
// No explicit call to `start_replication` is made. let follower = TidalDb::builder() .ephemeral() .with_schema(schema.clone()) .with_cluster(NodeConfig { role: NodeRole::Follower, ..NodeConfig::default() }) .with_transport(follower_transport) .open() .expect("follower with auto-wired transport should open"); // Open a standalone leader (no transport — we ship segments manually). let leader = open_leader(schema.clone()); // Write one signal on the leader. let ts = Timestamp::from_nanos(1_000_000_000); leader.signal("view", EntityId::new(42), 2.5, ts).unwrap(); // Encode the same event into a WAL batch and ship it to the follower. let events = vec![EventRecord { entity_id: 42, signal_type: type_id.as_u16() as u8, weight: 2.5_f32, timestamp_nanos: 1_000_000_000, }]; let batch_bytes = encode_batch(&events, 1, 1).unwrap(); tx.send(WalSegmentPayload { id: WalSegmentId::new(tidaldb::replication::RegionId::SINGLE, ShardId::SINGLE, 1), bytes: batch_bytes, event_count: 1, }) .unwrap(); // Give the auto-started receiver time to apply the segment. std::thread::sleep(Duration::from_millis(100)); // The follower must have the replicated signal. let follower_score = follower .read_decay_score(EntityId::new(42), "view", 0) .unwrap(); assert!( follower_score.is_some(), "follower should see entity 42 after auto-wired replication" ); // Replication state must reflect the applied seqno. assert_eq!( follower.replication_state().applied_seqno(ShardId::SINGLE), Some(1), "follower replication state should be at seqno 1" ); // Drop sender first so the receiver thread can exit cleanly. drop(tx); leader.close().unwrap(); follower.close().unwrap(); } // ── Test 13: Replication lag converges to zero ─────────────────────────── #[test] fn replication_lag_converges_to_zero() { let state = Arc::new(ReplicationState::single()); let gauge = ReplicationLagGauge::new(ShardId::SINGLE, Arc::clone(&state)); // Simulate 10 segments shipped. 
for seqno in 1..=10u64 { gauge.update_leader_seqno(seqno); } assert_eq!(gauge.lag_segments(), 10); // Follower applies segments 1..=10. for seqno in 1..=10u64 { state.advance(ShardId::SINGLE, seqno); } assert_eq!(gauge.lag_segments(), 0, "lag should be 0 after catching up"); // Another batch: leader ships 11..=20. for seqno in 11..=20u64 { gauge.update_leader_seqno(seqno); } assert_eq!(gauge.lag_segments(), 10); // Follower catches up. state.advance(ShardId::SINGLE, 20); assert_eq!( gauge.lag_segments(), 0, "lag should converge to 0 again after second batch" ); }