tidaldb/docs/planning/milestone-8/phase-4/task-03-session-replication-bridge.md
jordan f4cfd6c81f feat: complete M8 replication primitives + forage enhancements + docs
2026-02-24 13:17:19 -07:00


# Task 03: SessionReplicationBridge
## Delivers
`SessionReplicationBridge` in `tidal/src/replication/session_bridge.rs`. Packages session journal entries into `SessionPayload` batches and ships them to follower nodes over the same `Transport` used for WAL segments. Session batches travel under their own payload kind rather than inside signal WAL segments, so session I/O stays off the signal-critical path.
## Complexity: M
## Dependencies
- Task 01 (SessionSeqNo + WAL format extension)
- Task 02 (IdempotencyKey + IdempotencyStore)
- Phase 8.2 (Transport trait, WalShipper)
## Technical Design
```rust
// tidal/src/replication/session_bridge.rs
use std::sync::Arc;

use dashmap::DashMap;

/// Replicates session journal entries to follower nodes.
///
/// Session events piggyback on the same `Transport` as WAL segments but
/// use a dedicated `SessionPayload` envelope, not the signal WAL format.
/// This separation lets us tune session replication (e.g., smaller MTU,
/// higher frequency) independently of signal WAL shipping.
pub struct SessionReplicationBridge {
    transport: Arc<dyn Transport>,
    session_journal: Arc<SessionJournal>,
    idempotency_store: Arc<IdempotencyStore>,
    seqno_tracker: Arc<SessionSeqNoTracker>,
    /// Highest seqno shipped per (session_id, region_id) pair.
    ship_hwm: DashMap<(SessionId, RegionId), SessionSeqNo>,
}

/// Envelope for session events shipped over the Transport.
///
/// Distinct from `WalSegmentPayload` -- the transport multiplexes these
/// by payload kind byte (0x01 = WAL segment, 0x02 = session batch).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SessionPayload {
    pub kind: PayloadKind,
    pub source_region: RegionId,
    pub source_shard: ShardId,
    pub events: Vec<SessionWalEvent>,
    /// BLAKE3 checksum of serialized `events` bytes.
    pub checksum: [u8; 32],
}

/// Payload discriminant used by the transport for multiplexing.
///
/// NOTE: serde's derive encodes unit variants by variant index, not by the
/// `repr(u8)` value, so the 0x01/0x02 kind byte likely has to be written
/// explicitly (`kind as u8`) when framing.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[repr(u8)]
pub enum PayloadKind {
    WalSegment = 0x01,
    SessionBatch = 0x02,
}

impl SessionReplicationBridge {
    pub fn new(
        transport: Arc<dyn Transport>,
        session_journal: Arc<SessionJournal>,
        idempotency_store: Arc<IdempotencyStore>,
        seqno_tracker: Arc<SessionSeqNoTracker>,
    ) -> Self {
        Self {
            transport,
            session_journal,
            idempotency_store,
            seqno_tracker,
            ship_hwm: DashMap::new(),
        }
    }

    /// Ship all un-shipped session events for `session_id` to `target_region`.
    ///
    /// Fetches events from the session journal whose seqno > current ship HWM,
    /// bundles them into a `SessionPayload`, and ships it via
    /// `Transport::send_session_batch`. The ship HWM is updated only on success.
    pub async fn ship_session(
        &self,
        session_id: SessionId,
        target_region: RegionId,
    ) -> Result<(), TransportError> {
        let hwm_key = (session_id, target_region);
        let current_hwm = self
            .ship_hwm
            .get(&hwm_key)
            .map(|v| *v)
            .unwrap_or(SessionSeqNo::ZERO);

        // NOTE: `?` here and on `build_payload` assumes journal and
        // serialization errors convert into `TransportError`.
        let events = self.session_journal.events_after(session_id, current_hwm)?;
        if events.is_empty() {
            return Ok(());
        }

        // Fall back to the current HWM rather than ZERO so the mark cannot
        // regress if no event in the batch carries a seqno.
        let highest_seqno = events
            .iter()
            .filter_map(|e| e.session_seqno)
            .max()
            .unwrap_or(current_hwm);

        let payload = self.build_payload(events)?;
        self.transport.send_session_batch(target_region, payload).await?;
        self.ship_hwm.insert(hwm_key, highest_seqno);
        Ok(())
    }

    /// Receive and apply an incoming `SessionPayload` from a remote region.
    ///
    /// Validates the checksum, then passes each event through the seqno
    /// tracker and the idempotency store before forwarding it to the session
    /// manager. Returns the number of events actually applied.
    pub async fn receive_session_batch(
        &self,
        payload: SessionPayload,
        session_manager: &SessionManager,
    ) -> Result<usize> {
        // Validate the BLAKE3 checksum by re-hashing the serialized events.
        let serialized = bincode::serialize(&payload.events)?;
        let actual = blake3::hash(&serialized);
        if actual.as_bytes() != &payload.checksum {
            return Err(TidalError::CorruptedWal("session batch checksum mismatch".into()));
        }

        let mut applied = 0;
        for event in &payload.events {
            // Layer 1: SeqNo HWM. Skip events at or below the tracked mark.
            if let Some(seqno) = event.session_seqno {
                if !self.seqno_tracker.should_apply(event.session_id, seqno) {
                    continue;
                }
            }
            // Layer 2: Idempotency key. Skip keys that were already recorded.
            if let Some(key_int) = event.idempotency_key {
                let key = IdempotencyKey(key_int);
                if !self.idempotency_store.check_and_record(key) {
                    continue;
                }
            }
            session_manager.apply_wal_event(event)?;
            applied += 1;
        }
        Ok(applied)
    }

    fn build_payload(&self, events: Vec<SessionWalEvent>) -> Result<SessionPayload> {
        let serialized = bincode::serialize(&events)?;
        let checksum = *blake3::hash(&serialized).as_bytes();
        Ok(SessionPayload {
            kind: PayloadKind::SessionBatch,
            source_region: self.session_journal.region_id(),
            source_shard: ShardId(0), // session journal is not sharded by entity
            events,
            checksum,
        })
    }
}
```
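
The ship-side high-water-mark bookkeeping described in `ship_session` can be sketched in isolation. This is a minimal model using only the standard library, not the bridge itself: `ShipHwm`, `pending`, and `advance` are hypothetical names, the integer aliases stand in for the real `SessionId`, `RegionId`, and `SessionSeqNo` types, and it assumes seqnos start at 1 so that 0 plays the role of `SessionSeqNo::ZERO`.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for the real `SessionId`, `RegionId`, `SessionSeqNo`.
type SessionId = u64;
type RegionId = u16;
type SeqNo = u64;

/// In-memory model of the per-(session, region) ship high-water mark.
#[derive(Default)]
pub struct ShipHwm {
    hwm: HashMap<(SessionId, RegionId), SeqNo>,
}

impl ShipHwm {
    /// Returns the seqnos in `journal` not yet shipped to `region`.
    /// Assumption: 0 means "nothing shipped yet".
    pub fn pending(&self, session: SessionId, region: RegionId, journal: &[SeqNo]) -> Vec<SeqNo> {
        let current = self.hwm.get(&(session, region)).copied().unwrap_or(0);
        journal.iter().copied().filter(|s| *s > current).collect()
    }

    /// Records a successful send. The mark only ever moves forward,
    /// and an empty batch leaves it unchanged.
    pub fn advance(&mut self, session: SessionId, region: RegionId, shipped: &[SeqNo]) {
        if let Some(max) = shipped.iter().copied().max() {
            let entry = self.hwm.entry((session, region)).or_insert(0);
            *entry = (*entry).max(max);
        }
    }
}
```

Calling `pending` and then `advance` only after the send succeeds mirrors the "update HWM on success" rule: a failed send leaves the mark where it was, so the same events are picked up again on the next attempt. Marks are tracked per target region, so shipping to one region does not suppress shipping to another.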
### Transport Extension
```rust
// tidal/src/replication/transport.rs (extension to Transport trait)
#[async_trait::async_trait]
pub trait Transport: Send + Sync + 'static {
    // --- existing methods (unchanged) ---
    async fn send_segment(
        &self,
        target: RegionId,
        payload: WalSegmentPayload,
    ) -> Result<(), TransportError>;

    async fn recv_segment(&self) -> Result<WalSegmentPayload, TransportError>;

    // --- new session methods ---
    async fn send_session_batch(
        &self,
        target: RegionId,
        payload: SessionPayload,
    ) -> Result<(), TransportError>;

    async fn recv_session_batch(&self) -> Result<SessionPayload, TransportError>;
}
```
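
Multiplexing by kind byte can be illustrated with a small framing sketch. It is not the transport's actual wire format: `frame` and `demux` are hypothetical helpers, and the body is treated as opaque, already-encoded bytes. The one firm point it relies on is that `kind as u8` yields the `repr(u8)` discriminant, whereas a serde-derived encoding of the enum would typically write the variant index instead.

```rust
use std::convert::TryFrom;

/// Mirrors the `PayloadKind` discriminants from the design.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PayloadKind {
    WalSegment = 0x01,
    SessionBatch = 0x02,
}

impl TryFrom<u8> for PayloadKind {
    /// The unrecognized byte is handed back as the error.
    type Error = u8;

    fn try_from(byte: u8) -> Result<Self, Self::Error> {
        match byte {
            0x01 => Ok(PayloadKind::WalSegment),
            0x02 => Ok(PayloadKind::SessionBatch),
            other => Err(other),
        }
    }
}

/// Hypothetical framing helper: prefix the encoded body with the kind byte.
pub fn frame(kind: PayloadKind, body: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(1 + body.len());
    out.push(kind as u8);
    out.extend_from_slice(body);
    out
}

/// Hypothetical demux helper: split a frame into its kind and body.
/// Returns `None` for an empty frame or an unknown kind byte.
pub fn demux(frame: &[u8]) -> Option<(PayloadKind, &[u8])> {
    let (&first, body) = frame.split_first()?;
    let kind = PayloadKind::try_from(first).ok()?;
    Some((kind, body))
}
```

A receiver built this way can route `WalSegment` frames to the existing segment path and `SessionBatch` frames to `receive_session_batch` without decoding the body first.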
## Acceptance Criteria
- [ ] `SessionReplicationBridge::ship_session(session_id, target)` fetches only events with seqno > current ship HWM; returns `Ok(())` without sending when there are no new events
- [ ] `receive_session_batch` validates the BLAKE3 checksum; returns `TidalError::CorruptedWal` on mismatch
- [ ] Duplicate events (idempotency key already recorded, or seqno <= HWM) are silently dropped; applied count reflects only new events
- [ ] `PayloadKind::SessionBatch` (0x02) is distinct from `PayloadKind::WalSegment` (0x01); transport multiplexes by kind byte
- [ ] `Transport` trait extended with `send_session_batch` / `recv_session_batch`; `InProcessTransport` implements both new methods
- [ ] Unit test: ship 10 session events, receive on follower, verify 10 applied; re-ship same events, verify 0 applied (idempotent)
- [ ] `cargo clippy -- -D warnings` and `cargo fmt --check` pass
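
The idempotent round trip in the unit-test criterion can be modeled without any of the real types. The sketch below is an assumption-laden stand-in: `Event` and `Dedup` are hypothetical, the key width (`u128`) is a guess, and the seqno layer is assumed to advance its mark as soon as it accepts an event, which the real `SessionSeqNoTracker::should_apply` may or may not do.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical, simplified stand-in for `SessionWalEvent`.
#[derive(Debug, Clone)]
pub struct Event {
    pub session_id: u64,
    pub seqno: Option<u64>,
    pub idempotency_key: Option<u128>,
}

/// In-memory model of the seqno tracker plus the idempotency store.
#[derive(Default)]
pub struct Dedup {
    hwm: HashMap<u64, u64>,
    seen_keys: HashSet<u128>,
}

impl Dedup {
    /// Applies a batch and returns how many events were new.
    pub fn apply_batch(&mut self, events: &[Event]) -> usize {
        let mut applied = 0;
        for e in events {
            // Layer 1: drop anything at or below the session's high-water mark.
            if let Some(seqno) = e.seqno {
                let hwm = self.hwm.entry(e.session_id).or_insert(0);
                if seqno <= *hwm {
                    continue;
                }
                *hwm = seqno;
            }
            // Layer 2: drop anything whose idempotency key was already recorded.
            if let Some(key) = e.idempotency_key {
                if !self.seen_keys.insert(key) {
                    continue;
                }
            }
            applied += 1;
        }
        applied
    }
}
```

Feeding the same ten-event batch through `apply_batch` twice should count ten events the first time and none the second, which is the behavior the unit test is meant to pin down. Events that carry only an idempotency key are still caught by the second layer.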