Milestone 8 (phases 1-4):
- Shard-aware WAL segment naming, BatchHeader v2, ShardRouter
- Transport trait, InProcessTransport, WalShipper, FollowerDb
- HLC, PNCounter, LWWRegister, CrdtSignalState, ReconciliationEngine
- Session replication bridge with SeqNo/HWM, idempotency store

Forage application:
- Multi-source discovery engine with MAB exploration
- Embedding-based label system, server handlers, UI refresh

Other:
- QUICKSTART.md, README.md, milestone-8 planning docs
- Hard negative union semantics, RLHF export enhancements
- Recovery benchmark and visibility test expansions
- Split 8 oversized source files per CODING_GUIDELINES §9

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Task 03: SessionReplicationBridge
Delivers
SessionReplicationBridge in tidal/src/replication/session_bridge.rs. It ships session journal entries to follower nodes over the same Transport that carries WAL segments, but in a dedicated SessionPayload envelope rather than the signal WAL format. Keeping session events on their own logical channel leaves the signal-critical path unaffected by session I/O.
Complexity: M
Dependencies
- Task 01 (SessionSeqNo + WAL format extension)
- Task 02 (IdempotencyKey + IdempotencyStore)
- Phase 8.2 (Transport trait, WalShipper)
Technical Design
// tidal/src/replication/session_bridge.rs

use std::sync::Arc;

use dashmap::DashMap;

/// Replicates session journal entries to follower nodes.
///
/// Session events piggyback on the same `Transport` as WAL segments but
/// use a dedicated `SessionPayload` envelope, not the signal WAL format.
/// This separation lets us tune session replication (e.g., smaller MTU,
/// higher frequency) independently of signal WAL shipping.
pub struct SessionReplicationBridge {
    transport: Arc<dyn Transport>,
    session_journal: Arc<SessionJournal>,
    idempotency_store: Arc<IdempotencyStore>,
    seqno_tracker: Arc<SessionSeqNoTracker>,
    /// Highest seqno shipped per (session_id, region_id) pair.
    ship_hwm: DashMap<(SessionId, RegionId), SessionSeqNo>,
}

/// Envelope for session events shipped over the Transport.
///
/// Distinct from `WalSegmentPayload` -- the transport multiplexes these
/// by payload kind byte (0x01 = WAL segment, 0x02 = session batch).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SessionPayload {
    pub kind: PayloadKind,
    pub source_region: RegionId,
    pub source_shard: ShardId,
    pub events: Vec<SessionWalEvent>,
    /// BLAKE3 checksum of serialized `events` bytes.
    pub checksum: [u8; 32],
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[repr(u8)]
pub enum PayloadKind {
    WalSegment = 0x01,
    SessionBatch = 0x02,
}

impl SessionReplicationBridge {
    pub fn new(
        transport: Arc<dyn Transport>,
        session_journal: Arc<SessionJournal>,
        idempotency_store: Arc<IdempotencyStore>,
        seqno_tracker: Arc<SessionSeqNoTracker>,
    ) -> Self {
        Self {
            transport,
            session_journal,
            idempotency_store,
            seqno_tracker,
            ship_hwm: DashMap::new(),
        }
    }

    /// Ship all un-shipped session events for `session_id` to `target_region`.
    ///
    /// Fetches events from the session journal whose seqno > current ship HWM.
    /// Bundles them into a `SessionPayload`, ships via `Transport::send_session_batch`.
    /// Updates ship HWM on success.
    pub async fn ship_session(
        &self,
        session_id: SessionId,
        target_region: RegionId,
    ) -> Result<(), TransportError> {
        let hwm_key = (session_id, target_region);
        let current_hwm = self.ship_hwm
            .get(&hwm_key)
            .map(|v| *v)
            .unwrap_or(SessionSeqNo::ZERO);

        let events = self.session_journal.events_after(session_id, current_hwm)?;
        if events.is_empty() {
            return Ok(());
        }

        // Fall back to the current HWM (not ZERO) if no event carries a seqno,
        // so the HWM can never move backwards.
        let highest_seqno = events.iter()
            .filter_map(|e| e.session_seqno)
            .max()
            .unwrap_or(current_hwm);

        let payload = self.build_payload(events)?;
        self.transport.send_session_batch(target_region, payload).await?;

        // Only advance the HWM once the send has succeeded.
        self.ship_hwm.insert(hwm_key, highest_seqno);
        Ok(())
    }

    /// Receive and apply an incoming `SessionPayload` from a remote region.
    ///
    /// Validates checksum, then applies each event through the idempotency
    /// store + seqno tracker pipeline before forwarding to the session manager.
    pub async fn receive_session_batch(
        &self,
        payload: SessionPayload,
        session_manager: &SessionManager,
    ) -> Result<usize> {
        // Validate BLAKE3 checksum.
        let serialized = bincode::serialize(&payload.events)?;
        let expected = blake3::hash(&serialized);
        if expected.as_bytes() != &payload.checksum {
            return Err(TidalError::CorruptedWal("session batch checksum mismatch".into()));
        }

        let mut applied = 0;
        for event in &payload.events {
            // Layer 1: SeqNo HWM.
            if let Some(seqno) = event.session_seqno {
                if !self.seqno_tracker.should_apply(event.session_id, seqno) {
                    continue;
                }
            }
            // Layer 2: Idempotency key.
            if let Some(key_int) = event.idempotency_key {
                let key = IdempotencyKey(key_int);
                if !self.idempotency_store.check_and_record(key) {
                    continue;
                }
            }
            session_manager.apply_wal_event(event)?;
            applied += 1;
        }
        Ok(applied)
    }

    fn build_payload(&self, events: Vec<SessionWalEvent>) -> Result<SessionPayload> {
        let serialized = bincode::serialize(&events)?;
        let checksum = *blake3::hash(&serialized).as_bytes();
        Ok(SessionPayload {
            kind: PayloadKind::SessionBatch,
            source_region: self.session_journal.region_id(),
            source_shard: ShardId(0), // session journal is not sharded by entity
            events,
            checksum,
        })
    }
}
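
The two-layer duplicate filter in receive_session_batch can be modelled without any of the real types. The sketch below is illustrative only: Event, DedupFilter, admit, and apply_batch are hypothetical stand-ins for SessionWalEvent, SessionSeqNoTracker, and IdempotencyStore, and it assumes the tracker advances its high-water mark when an event is admitted (the design above does not say where that happens).

```rust
use std::collections::{HashMap, HashSet};

/// Stand-in for `SessionWalEvent`; only the fields the filter reads.
#[derive(Debug, Clone)]
pub struct Event {
    pub session_id: u64,
    pub session_seqno: Option<u64>,
    pub idempotency_key: Option<u128>,
}

/// Hypothetical in-memory model of the seqno tracker plus idempotency store.
#[derive(Default)]
pub struct DedupFilter {
    /// Highest seqno applied per session.
    applied_hwm: HashMap<u64, u64>,
    /// Idempotency keys already recorded.
    seen_keys: HashSet<u128>,
}

impl DedupFilter {
    /// Returns true if the event passes both layers and should be applied.
    pub fn admit(&mut self, event: &Event) -> bool {
        // Layer 1: drop anything at or below the per-session high-water mark.
        if let Some(seqno) = event.session_seqno {
            let hwm = self.applied_hwm.get(&event.session_id).copied();
            if hwm.map_or(false, |h| seqno <= h) {
                return false;
            }
        }
        // Layer 2: drop keys that were already recorded.
        if let Some(key) = event.idempotency_key {
            if !self.seen_keys.insert(key) {
                return false;
            }
        }
        // Advance the HWM only after both layers admit the event.
        if let Some(seqno) = event.session_seqno {
            self.applied_hwm.insert(event.session_id, seqno);
        }
        true
    }

    /// Runs a batch through the filter and returns how many events passed.
    pub fn apply_batch(&mut self, events: &[Event]) -> usize {
        let mut applied = 0;
        for event in events {
            if self.admit(event) {
                applied += 1;
            }
        }
        applied
    }
}
```

Feeding the same ten-event batch through apply_batch twice yields 10 and then 0, which is the behaviour the idempotency unit test is meant to pin down. An event with a fresh seqno but an already-seen key is still dropped by layer 2.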
Transport Extension
// tidal/src/replication/transport.rs (extension to Transport trait)

#[async_trait::async_trait]
pub trait Transport: Send + Sync + 'static {
    // --- existing methods (unchanged) ---
    async fn send_segment(
        &self,
        target: RegionId,
        payload: WalSegmentPayload,
    ) -> Result<(), TransportError>;

    async fn recv_segment(&self) -> Result<WalSegmentPayload, TransportError>;

    // --- new session methods ---
    async fn send_session_batch(
        &self,
        target: RegionId,
        payload: SessionPayload,
    ) -> Result<(), TransportError>;

    async fn recv_session_batch(&self) -> Result<SessionPayload, TransportError>;
}
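
One detail worth checking when the transport multiplexes by kind byte: with a derived serde::Serialize, bincode typically encodes a fieldless enum by variant index rather than by its #[repr(u8)] discriminant, so the 0x01 / 0x02 values may not appear on the wire unless the tag is written explicitly. A minimal sketch of explicit tagging follows. The helpers as_byte, frame, and demux are hypothetical and not part of the Transport trait, and PayloadKind is redeclared without the serde derives so the block stands alone.

```rust
use std::convert::TryFrom;

/// Payload kinds as described in this task: one tag byte per frame.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PayloadKind {
    WalSegment = 0x01,
    SessionBatch = 0x02,
}

impl PayloadKind {
    /// The tag byte written at the start of a frame.
    pub fn as_byte(self) -> u8 {
        self as u8
    }
}

impl TryFrom<u8> for PayloadKind {
    /// The unrecognised byte is handed back to the caller.
    type Error = u8;

    fn try_from(byte: u8) -> Result<Self, Self::Error> {
        match byte {
            0x01 => Ok(PayloadKind::WalSegment),
            0x02 => Ok(PayloadKind::SessionBatch),
            other => Err(other),
        }
    }
}

/// Hypothetical framing helper: prefix an already-serialized body with the kind byte.
pub fn frame(kind: PayloadKind, body: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(1 + body.len());
    out.push(kind.as_byte());
    out.extend_from_slice(body);
    out
}

/// Hypothetical demux helper: split a frame into its kind and body.
/// Returns `None` for an empty frame or an unknown kind byte.
pub fn demux(bytes: &[u8]) -> Option<(PayloadKind, &[u8])> {
    let (&tag, body) = bytes.split_first()?;
    let kind = PayloadKind::try_from(tag).ok()?;
    Some((kind, body))
}
```

A receiver would call demux on each incoming frame and route the body to the WAL segment path or the session batch path according to the returned kind; unknown tags are rejected instead of being misread as one of the two known kinds.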
Acceptance Criteria
- SessionReplicationBridge::ship_session(session_id, target) fetches only events with seqno > current ship HWM; does nothing on empty diff
- receive_session_batch validates the BLAKE3 checksum; returns TidalError::CorruptedWal on mismatch
- Duplicate events (same idempotency key, or seqno <= HWM) are silently dropped; applied count reflects only new events
- PayloadKind::SessionBatch (0x02) is distinct from PayloadKind::WalSegment (0x01); transport multiplexes by kind byte
- Transport trait extended with send_session_batch / recv_session_batch; InProcessTransport implements both new methods
- Unit test: ship 10 session events, receive on follower, verify 10 applied; re-ship same events, verify 0 applied (idempotent)
- cargo clippy -- -D warnings and cargo fmt pass
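
The ship-side criterion (only events above the ship HWM, nothing on an empty diff) can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not the bridge itself: ShipTracker, pending, and mark_shipped are hypothetical names, plain integers stand in for SessionId, RegionId, and SessionSeqNo, and 0 plays the role of SessionSeqNo::ZERO.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory shipper state: the highest seqno shipped per
/// (session_id, region_id) pair, using plain integers as stand-in ids.
#[derive(Default)]
pub struct ShipTracker {
    ship_hwm: HashMap<(u64, u32), u64>,
}

impl ShipTracker {
    /// Returns the seqnos from `journal` that still need shipping to the
    /// target region, i.e. those strictly above the current HWM (0 if none yet).
    pub fn pending(&self, session: u64, region: u32, journal: &[u64]) -> Vec<u64> {
        let hwm = self.ship_hwm.get(&(session, region)).copied().unwrap_or(0);
        journal.iter().copied().filter(|&s| s > hwm).collect()
    }

    /// Records a successful send. The HWM only moves forward, so recording
    /// an older batch after a newer one cannot regress it.
    pub fn mark_shipped(&mut self, session: u64, region: u32, shipped: &[u64]) {
        if let Some(&highest) = shipped.iter().max() {
            let slot = self.ship_hwm.entry((session, region)).or_insert(0);
            *slot = (*slot).max(highest);
        }
    }
}
```

After shipping seqnos 1 through 10 to one region, pending returns an empty list for that region while a second region still sees all ten, because the HWM is tracked per (session, region) pair. Taking the maximum in mark_shipped is a design choice of this sketch; it keeps the mark monotonic even if sends for the same pair complete out of order.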