# Task 03: SessionReplicationBridge

## Delivers

`SessionReplicationBridge` in `tidal/src/replication/session_bridge.rs`. It batches session journal entries and ships them to follower nodes over the same `Transport` used for WAL segments. Session events travel in their own `SessionPayload` envelope rather than inside signal WAL segments, so session I/O stays off the signal-critical path.

## Complexity: M

## Dependencies

- Task 01 (SessionSeqNo + WAL format extension)
- Task 02 (IdempotencyKey + IdempotencyStore)
- Phase 8.2 (Transport trait, WalShipper)

## Technical Design

```rust
// tidal/src/replication/session_bridge.rs

use std::sync::Arc;

use dashmap::DashMap;

/// Replicates session journal entries to follower nodes.
///
/// Session events piggyback on the same `Transport` as WAL segments but
/// use a dedicated `SessionPayload` envelope, not the signal WAL format.
/// This separation lets us tune session replication (e.g., smaller MTU,
/// higher frequency) independently of signal WAL shipping.
pub struct SessionReplicationBridge {
    transport: Arc<dyn Transport>,
    session_journal: Arc<SessionJournal>,
    idempotency_store: Arc<IdempotencyStore>,
    seqno_tracker: Arc<SessionSeqNoTracker>,
    /// Highest seqno shipped per (session_id, region_id) pair.
    ship_hwm: DashMap<(SessionId, RegionId), SessionSeqNo>,
}

/// Envelope for session events shipped over the Transport.
///
/// Distinct from `WalSegmentPayload` -- the transport multiplexes these
/// by payload kind byte (0x01 = WAL segment, 0x02 = session batch).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SessionPayload {
    pub kind: PayloadKind,
    pub source_region: RegionId,
    pub source_shard: ShardId,
    pub events: Vec<SessionWalEvent>,
    /// BLAKE3 checksum of serialized `events` bytes.
    pub checksum: [u8; 32],
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[repr(u8)]
pub enum PayloadKind {
    WalSegment = 0x01,
    SessionBatch = 0x02,
}

impl SessionReplicationBridge {
    pub fn new(
        transport: Arc<dyn Transport>,
        session_journal: Arc<SessionJournal>,
        idempotency_store: Arc<IdempotencyStore>,
        seqno_tracker: Arc<SessionSeqNoTracker>,
    ) -> Self {
        Self {
            transport,
            session_journal,
            idempotency_store,
            seqno_tracker,
            ship_hwm: DashMap::new(),
        }
    }

    /// Ship all un-shipped session events for `session_id` to `target_region`.
    ///
    /// Fetches events from the session journal whose seqno > current ship HWM.
    /// Bundles them into a `SessionPayload`, ships via `Transport::send_session_batch`.
    /// Updates ship HWM on success.
    ///
    /// Note: the `?` on the journal and payload calls assumes their error
    /// types convert into `TransportError`.
    pub async fn ship_session(
        &self,
        session_id: SessionId,
        target_region: RegionId,
    ) -> Result<(), TransportError> {
        let hwm_key = (session_id, target_region);
        let current_hwm = self.ship_hwm
            .get(&hwm_key)
            .map(|v| *v)
            .unwrap_or(SessionSeqNo::ZERO);

        let events = self.session_journal.events_after(session_id, current_hwm)?;
        if events.is_empty() {
            return Ok(());
        }

        // Fall back to the current HWM (not ZERO) when no event carries a
        // seqno, so the insert below can never move the HWM backwards.
        let highest_seqno = events.iter()
            .filter_map(|e| e.session_seqno)
            .max()
            .unwrap_or(current_hwm);

        let payload = self.build_payload(events)?;
        self.transport.send_session_batch(target_region, payload).await?;

        self.ship_hwm.insert(hwm_key, highest_seqno);
        Ok(())
    }

    /// Receive and apply an incoming `SessionPayload` from a remote region.
    ///
    /// Validates checksum, then applies each event through the idempotency
    /// store + seqno tracker pipeline before forwarding to the session manager.
    pub async fn receive_session_batch(
        &self,
        payload: SessionPayload,
        session_manager: &SessionManager,
    ) -> Result<usize> {
        // Validate BLAKE3 checksum: re-serialize the events and compare the
        // recomputed hash against the one carried in the payload.
        let serialized = bincode::serialize(&payload.events)?;
        let expected = blake3::hash(&serialized);
        if expected.as_bytes() != &payload.checksum {
            return Err(TidalError::CorruptedWal("session batch checksum mismatch".into()));
        }

        let mut applied = 0;
        for event in &payload.events {
            // Layer 1: SeqNo HWM.
            if let Some(seqno) = event.session_seqno {
                if !self.seqno_tracker.should_apply(event.session_id, seqno) {
                    continue;
                }
            }
            // Layer 2: Idempotency key.
            if let Some(key_int) = event.idempotency_key {
                let key = IdempotencyKey(key_int);
                if !self.idempotency_store.check_and_record(key) {
                    continue;
                }
            }
            session_manager.apply_wal_event(event)?;
            applied += 1;
        }
        Ok(applied)
    }

    fn build_payload(&self, events: Vec<SessionWalEvent>) -> Result<SessionPayload> {
        let serialized = bincode::serialize(&events)?;
        let checksum = *blake3::hash(&serialized).as_bytes();
        Ok(SessionPayload {
            kind: PayloadKind::SessionBatch,
            source_region: self.session_journal.region_id(),
            source_shard: ShardId(0), // session journal is not sharded by entity
            events,
            checksum,
        })
    }
}
```

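The ship-HWM bookkeeping in `ship_session` can be illustrated in isolation. The following is a minimal, synchronous sketch using only the standard library. `ShipHwm`, `pending`, and `mark_shipped` are hypothetical names, plain `u64` values stand in for `SessionId`, `RegionId`, and `SessionSeqNo` (with `0` playing the role of `SessionSeqNo::ZERO`), and a `HashMap` replaces the `DashMap`. It is not the bridge implementation.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the bridge's `ship_hwm` map.
/// Keys are (session_id, region_id); values are the highest shipped seqno.
#[derive(Default)]
pub struct ShipHwm {
    hwm: HashMap<(u64, u64), u64>,
}

impl ShipHwm {
    /// Returns the seqnos from `journal` that are strictly above the
    /// current HWM for `key`, i.e. the events that still need shipping.
    pub fn pending(&self, key: (u64, u64), journal: &[u64]) -> Vec<u64> {
        let current = self.hwm.get(&key).copied().unwrap_or(0);
        journal.iter().copied().filter(|&s| s > current).collect()
    }

    /// Records a successful send. The HWM only moves forward, so an
    /// empty or stale batch never regresses it.
    pub fn mark_shipped(&mut self, key: (u64, u64), shipped: &[u64]) {
        if let Some(&max) = shipped.iter().max() {
            let entry = self.hwm.entry(key).or_insert(0);
            *entry = (*entry).max(max);
        }
    }
}
```

Because the HWM is keyed by the (session, region) pair, shipping a session to one region leaves its pending set for every other region untouched.
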
### Transport Extension

```rust
// tidal/src/replication/transport.rs (extension to Transport trait)

#[async_trait::async_trait]
pub trait Transport: Send + Sync + 'static {
    // --- existing methods (unchanged) ---
    async fn send_segment(
        &self,
        target: RegionId,
        payload: WalSegmentPayload,
    ) -> Result<(), TransportError>;

    async fn recv_segment(&self) -> Result<WalSegmentPayload, TransportError>;

    // --- new session methods ---
    async fn send_session_batch(
        &self,
        target: RegionId,
        payload: SessionPayload,
    ) -> Result<(), TransportError>;

    async fn recv_session_batch(&self) -> Result<SessionPayload, TransportError>;
}
```

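One way a transport might demultiplex frames by the kind byte is sketched below. This is an illustration only: the one-byte-prefix framing, `FrameError`, `encode_frame`, and `decode_frame` are assumptions rather than part of the `Transport` trait, and `PayloadKind` is redeclared locally (without the serde derives) to keep the block self-contained.

```rust
use std::convert::TryFrom;

/// Local copy of the spec's kind discriminants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PayloadKind {
    WalSegment = 0x01,
    SessionBatch = 0x02,
}

/// Hypothetical error type for malformed frames.
#[derive(Debug, PartialEq, Eq)]
pub enum FrameError {
    Empty,
    UnknownKind(u8),
}

impl TryFrom<u8> for PayloadKind {
    type Error = FrameError;

    fn try_from(byte: u8) -> Result<Self, Self::Error> {
        match byte {
            0x01 => Ok(PayloadKind::WalSegment),
            0x02 => Ok(PayloadKind::SessionBatch),
            other => Err(FrameError::UnknownKind(other)),
        }
    }
}

/// Prefixes `body` with the kind byte (assumed framing).
pub fn encode_frame(kind: PayloadKind, body: &[u8]) -> Vec<u8> {
    let mut frame = Vec::with_capacity(1 + body.len());
    frame.push(kind as u8);
    frame.extend_from_slice(body);
    frame
}

/// Splits a frame into its kind and body so the receiver can route it
/// to the WAL path or the session path.
pub fn decode_frame(frame: &[u8]) -> Result<(PayloadKind, &[u8]), FrameError> {
    let (&kind_byte, body) = frame.split_first().ok_or(FrameError::Empty)?;
    Ok((PayloadKind::try_from(kind_byte)?, body))
}
```

Rejecting unknown kind bytes explicitly, instead of defaulting to one of the two paths, keeps a future third payload kind from being misrouted by older followers.
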
## Acceptance Criteria

- [ ] `SessionReplicationBridge::ship_session(session_id, target)` fetches only events with seqno > current ship HWM; it is a no-op when there are no new events
- [ ] `receive_session_batch` validates the BLAKE3 checksum; returns `TidalError::CorruptedWal` on mismatch
- [ ] Duplicate events (seqno <= HWM, or an already-recorded idempotency key) are silently dropped; applied count reflects only new events
- [ ] `PayloadKind::SessionBatch` (0x02) is distinct from `PayloadKind::WalSegment` (0x01); transport multiplexes by kind byte
- [ ] `Transport` trait extended with `send_session_batch` / `recv_session_batch`; `InProcessTransport` implements both new methods
- [ ] Unit test: ship 10 session events, receive on follower, verify 10 applied; re-ship same events, verify 0 applied (idempotent)
- [ ] `cargo clippy -- -D warnings` and `cargo fmt` pass
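
The idempotency criterion above (10 applied on first delivery, 0 on redelivery) can be sketched against simplified stand-ins. Everything here is hypothetical: `Event`, `Follower`, `apply_batch`, and `make_events` are illustrative names, `u64` replaces the real ID and seqno types, and in-memory `HashMap`/`HashSet` collections replace `SessionSeqNoTracker` and `IdempotencyStore`. The sketch also assumes that accepting an event advances the per-session seqno HWM, which the spec does not state.

```rust
use std::collections::{HashMap, HashSet};

/// Simplified stand-in for `SessionWalEvent`.
#[derive(Clone)]
pub struct Event {
    pub session_id: u64,
    pub session_seqno: Option<u64>,
    pub idempotency_key: Option<u64>,
}

/// Hypothetical follower-side state for the two dedup layers.
#[derive(Default)]
pub struct Follower {
    seqno_hwm: HashMap<u64, u64>,
    seen_keys: HashSet<u64>,
}

impl Follower {
    /// Applies a batch and returns how many events passed both layers.
    pub fn apply_batch(&mut self, events: &[Event]) -> usize {
        let mut applied = 0;
        for event in events {
            // Layer 1: drop events at or below the per-session seqno HWM.
            if let Some(seqno) = event.session_seqno {
                let hwm = self.seqno_hwm.entry(event.session_id).or_insert(0);
                if seqno <= *hwm {
                    continue;
                }
                *hwm = seqno;
            }
            // Layer 2: drop events whose idempotency key was already recorded.
            // `HashSet::insert` returns false when the key is already present.
            if let Some(key) = event.idempotency_key {
                if !self.seen_keys.insert(key) {
                    continue;
                }
            }
            applied += 1;
        }
        applied
    }
}

/// Builds `n` events for one session with seqnos 1..=n and unique keys.
pub fn make_events(session_id: u64, n: u64) -> Vec<Event> {
    (1..=n)
        .map(|i| Event {
            session_id,
            session_seqno: Some(i),
            idempotency_key: Some(session_id * 1_000 + i),
        })
        .collect()
}
```

Calling `apply_batch` twice with the output of `make_events(42, 10)` exercises the seqno layer on the second pass. An event with no seqno but a previously seen key is caught by the idempotency layer instead.
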