# Task 03: SessionReplicationBridge

## Delivers

`SessionReplicationBridge` in `tidal/src/replication/session_bridge.rs`. Bundles session journal entries alongside WAL segments for transport to follower nodes. Session events travel on a separate channel from signal WAL segments, so session I/O stays off the signal-critical path.

## Complexity: M

## Dependencies

- Task 01 (SessionSeqNo + WAL format extension)
- Task 02 (IdempotencyKey + IdempotencyStore)
- Phase 8.2 (Transport trait, WalShipper)

## Technical Design

```rust
// tidal/src/replication/session_bridge.rs

use std::sync::Arc;

use dashmap::DashMap;

/// Replicates session journal entries to follower nodes.
///
/// Session events piggyback on the same `Transport` as WAL segments but
/// use a dedicated `SessionPayload` envelope, not the signal WAL format.
/// This separation lets us tune session replication (e.g., smaller MTU,
/// higher frequency) independently of signal WAL shipping.
pub struct SessionReplicationBridge {
    transport: Arc<dyn Transport>,
    session_journal: Arc,
    idempotency_store: Arc<IdempotencyStore>,
    seqno_tracker: Arc,
    /// Highest seqno shipped per (session_id, region_id) pair.
    ship_hwm: DashMap<(SessionId, RegionId), SessionSeqNo>,
}

/// Envelope for session events shipped over the Transport.
///
/// Distinct from `WalSegmentPayload` -- the transport multiplexes these
/// by payload kind byte (0x01 = WAL segment, 0x02 = session batch).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SessionPayload {
    pub kind: PayloadKind,
    pub source_region: RegionId,
    pub source_shard: ShardId,
    pub events: Vec,
    /// BLAKE3 checksum of serialized `events` bytes.
    pub checksum: [u8; 32],
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[repr(u8)]
pub enum PayloadKind {
    WalSegment = 0x01,
    SessionBatch = 0x02,
}

impl SessionReplicationBridge {
    pub fn new(
        transport: Arc<dyn Transport>,
        session_journal: Arc,
        idempotency_store: Arc<IdempotencyStore>,
        seqno_tracker: Arc,
    ) -> Self {
        Self {
            transport,
            session_journal,
            idempotency_store,
            seqno_tracker,
            ship_hwm: DashMap::new(),
        }
    }

    /// Ship all un-shipped session events for `session_id` to `target_region`.
    ///
    /// Fetches events from the session journal whose seqno > current ship HWM.
    /// Bundles them into a `SessionPayload`, ships via `Transport::send_session_batch`.
    /// Updates ship HWM on success.
    pub async fn ship_session(
        &self,
        session_id: SessionId,
        target_region: RegionId,
    ) -> Result<(), TransportError> {
        let hwm_key = (session_id, target_region);
        let current_hwm = self.ship_hwm
            .get(&hwm_key)
            .map(|v| *v)
            .unwrap_or(SessionSeqNo::ZERO);

        let events = self.session_journal.events_after(session_id, current_hwm)?;
        if events.is_empty() {
            return Ok(());
        }

        // Fall back to the current HWM (not ZERO) so that a batch without
        // seqnos can never move the HWM backwards.
        let highest_seqno = events.iter()
            .filter_map(|e| e.session_seqno)
            .max()
            .unwrap_or(current_hwm);

        let payload = self.build_payload(events)?;
        self.transport.send_session_batch(target_region, payload).await?;

        // Only advance the HWM once the send has succeeded.
        self.ship_hwm.insert(hwm_key, highest_seqno);
        Ok(())
    }

    /// Receive and apply an incoming `SessionPayload` from a remote region.
    ///
    /// Validates checksum, then applies each event through the idempotency
    /// store + seqno tracker pipeline before forwarding to the session manager.
    /// Returns the number of events actually applied.
    pub async fn receive_session_batch(
        &self,
        payload: SessionPayload,
        session_manager: &SessionManager,
    ) -> Result<usize, TidalError> {
        // Validate BLAKE3 checksum.
        let serialized = bincode::serialize(&payload.events)?;
        let expected = blake3::hash(&serialized);
        if expected.as_bytes() != &payload.checksum {
            return Err(TidalError::CorruptedWal("session batch checksum mismatch".into()));
        }

        let mut applied = 0;
        for event in &payload.events {
            // Layer 1: SeqNo HWM.
            if let Some(seqno) = event.session_seqno {
                if !self.seqno_tracker.should_apply(event.session_id, seqno) {
                    continue;
                }
            }
            // Layer 2: Idempotency key.
            if let Some(key_int) = event.idempotency_key {
                let key = IdempotencyKey(key_int);
                if !self.idempotency_store.check_and_record(key) {
                    continue;
                }
            }
            session_manager.apply_wal_event(event)?;
            applied += 1;
        }
        Ok(applied)
    }

    fn build_payload(&self, events: Vec) -> Result {
        let serialized = bincode::serialize(&events)?;
        let checksum = *blake3::hash(&serialized).as_bytes();
        Ok(SessionPayload {
            kind: PayloadKind::SessionBatch,
            source_region: self.session_journal.region_id(),
            source_shard: ShardId(0), // session journal is not sharded by entity
            events,
            checksum,
        })
    }
}
```

### Transport Extension

```rust
// tidal/src/replication/transport.rs (extension to Transport trait)

#[async_trait::async_trait]
pub trait Transport: Send + Sync + 'static {
    // --- existing methods (unchanged) ---
    async fn send_segment(
        &self,
        target: RegionId,
        payload: WalSegmentPayload,
    ) -> Result<(), TransportError>;

    async fn recv_segment(&self) -> Result<WalSegmentPayload, TransportError>;

    // --- new session methods ---
    async fn send_session_batch(
        &self,
        target: RegionId,
        payload: SessionPayload,
    ) -> Result<(), TransportError>;

    async fn recv_session_batch(&self) -> Result<SessionPayload, TransportError>;
}
```

## Acceptance Criteria

- [ ] `SessionReplicationBridge::ship_session(session_id, target)` fetches only events with seqno > current ship HWM; it is a no-op when there are no new events
- [ ] `receive_session_batch` validates the BLAKE3 checksum; returns `TidalError::CorruptedWal` on mismatch
- [ ] Duplicate events (same idempotency key, or seqno <= HWM) are silently dropped; applied count reflects only new events
- [ ] `PayloadKind::SessionBatch` (0x02) is distinct from `PayloadKind::WalSegment` (0x01); transport multiplexes by kind byte
- [ ] `Transport` trait extended with `send_session_batch` / `recv_session_batch`; `InProcessTransport` implements both new methods
- [ ] Unit test: ship 10 session events, receive on follower, verify 10 applied; re-ship same events, verify 0 applied (idempotent)
- [ ] `cargo clippy -- -D warnings` and `cargo fmt` pass
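
The duplicate-handling criterion and the 10-then-0 unit test can be sketched with a std-only, in-memory model of the two dedup layers on the receive path. Everything here is a hypothetical stand-in rather than the real Tidal types: `Event`, `Dedup`, and `sample_batch` are invented for illustration, plain integers replace `SessionId`, `SessionSeqNo`, and `IdempotencyKey`, and the sketch assumes that `should_apply` advances the HWM when it accepts a seqno, which the design above does not spell out.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical stand-in for a replicated session event.
#[derive(Debug, Clone)]
pub struct Event {
    pub session_id: u64,
    pub session_seqno: Option<u64>,
    pub idempotency_key: Option<u128>,
}

/// In-memory model of the two dedup layers used on the receive path.
#[derive(Default)]
pub struct Dedup {
    /// Layer 1: highest seqno accepted so far, per session.
    hwm: HashMap<u64, u64>,
    /// Layer 2: idempotency keys that have already been recorded.
    seen: HashSet<u128>,
}

impl Dedup {
    /// Accepts `seqno` only if it is above the session's HWM.
    /// Assumption: accepting a seqno also advances the HWM.
    fn should_apply(&mut self, session_id: u64, seqno: u64) -> bool {
        match self.hwm.get(&session_id) {
            Some(&h) if seqno <= h => false,
            _ => {
                self.hwm.insert(session_id, seqno);
                true
            }
        }
    }

    /// Returns true only the first time `key` is seen.
    fn check_and_record(&mut self, key: u128) -> bool {
        self.seen.insert(key)
    }

    /// Runs a batch through both layers and returns how many events passed.
    pub fn apply_batch(&mut self, events: &[Event]) -> usize {
        let mut applied = 0;
        for e in events {
            if let Some(seqno) = e.session_seqno {
                if !self.should_apply(e.session_id, seqno) {
                    continue;
                }
            }
            if let Some(key) = e.idempotency_key {
                if !self.check_and_record(key) {
                    continue;
                }
            }
            // The real bridge would call `session_manager.apply_wal_event` here.
            applied += 1;
        }
        applied
    }
}

/// Builds `n` events for one session with seqnos 1..=n and unique keys.
pub fn sample_batch(session_id: u64, n: u64) -> Vec<Event> {
    (1..=n)
        .map(|i| Event {
            session_id,
            session_seqno: Some(i),
            idempotency_key: Some((u128::from(session_id) << 64) | u128::from(i)),
        })
        .collect()
}
```

Feeding `sample_batch(7, 10)` to a fresh `Dedup` twice mirrors the unit test: the first pass counts every event and the second counts none, because layer 1 rejects each seqno at or below the HWM before layer 2 is consulted. An event that carries no seqno but reuses a recorded key is still dropped by layer 2, which is why the design keeps both checks.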