Milestone 8 (phases 1-4):
- Shard-aware WAL segment naming, BatchHeader v2, ShardRouter
- Transport trait, InProcessTransport, WalShipper, FollowerDb
- HLC, PNCounter, LWWRegister, CrdtSignalState, ReconciliationEngine
- Session replication bridge with SeqNo/HWM, idempotency store

Forage application:
- Multi-source discovery engine with MAB exploration
- Embedding-based label system, server handlers, UI refresh

Other:
- QUICKSTART.md, README.md, milestone-8 planning docs
- Hard negative union semantics, RLHF export enhancements
- Recovery benchmark and visibility test expansions
- Split 8 oversized source files per CODING_GUIDELINES §9

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Task 01: SessionSeqNo + WAL Format Extension
Delivers
Adds a SessionSeqNo(u64) type in tidal/src/wal/format/session.rs and a per-session tracker in tidal/src/session/state.rs. Every session write operation carries a monotonically increasing sequence number. The receiver keeps a high-water mark (HWM) per session and treats any write with seqno <= hwm as an idempotent no-op.
Complexity: S
Dependencies
- Phase 8.2 (WAL shipping, SegmentReceiver)
Technical Design
// tidal/src/wal/format/session.rs

/// Monotonic sequence number for session writes.
///
/// Incremented once per session write operation (preference signal,
/// annotation, search query, interaction). Used by the receiver to
/// enforce idempotent replay and exactly-once semantics.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash,
    serde::Serialize, serde::Deserialize,
)]
pub struct SessionSeqNo(pub u64);

impl SessionSeqNo {
    pub const ZERO: Self = Self(0);

    pub fn next(self) -> Self {
        Self(self.0 + 1)
    }
}

impl std::fmt::Display for SessionSeqNo {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "ssn:{}", self.0)
    }
}

/// Extended session WAL event -- backward-compatible with existing format.
///
/// The `session_seqno` and `idempotency_key` fields are appended to the
/// existing `SessionWalEvent` bytes. Old readers that don't understand
/// the extension fields still decode the core event; they will silently
/// ignore the extra bytes (length-prefixed framing ensures this).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SessionWalEvent {
    // --- existing fields (unchanged) ---
    pub session_id: SessionId,
    pub kind: SessionEventKind,
    pub timestamp_ns: u64,
    pub payload: SessionEventPayload,

    // --- new fields (m8p4 extension) ---
    /// Monotonically increasing sequence number for this session's writes.
    /// Starts at 1 for the first write in a session.
    #[serde(default)]
    pub session_seqno: Option<SessionSeqNo>,

    /// BLAKE3-derived idempotency key for exactly-once delivery.
    /// `None` for events written before m8p4.
    #[serde(default)]
    pub idempotency_key: Option<u128>,
}
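The basic behaviour of the sequence-number type can be checked in isolation. The following is a minimal sketch, assuming a stand-in copy of SessionSeqNo without the serde derives so that it compiles with the standard library alone. The `checked_next` helper is hypothetical and not part of the design; it only illustrates one way to avoid the overflow that `self.0 + 1` would hit at `u64::MAX`.

```rust
use std::fmt;

// Stand-in for SessionSeqNo, minus the serde derives, so the snippet
// builds without external crates.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SessionSeqNo(pub u64);

impl SessionSeqNo {
    pub const ZERO: Self = Self(0);

    pub fn next(self) -> Self {
        Self(self.0 + 1)
    }

    /// Hypothetical helper (not in the design): returns None instead of
    /// overflowing when the counter is already at u64::MAX.
    pub fn checked_next(self) -> Option<Self> {
        self.0.checked_add(1).map(Self)
    }
}

impl fmt::Display for SessionSeqNo {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "ssn:{}", self.0)
    }
}
```

With these definitions, `SessionSeqNo::ZERO.next()` equals `SessionSeqNo(1)`, compares greater than `ZERO`, and formats as `ssn:1`.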
// tidal/src/session/state.rs (additions only)

use dashmap::DashMap;

/// Per-session monotonic write counter.
///
/// Tracks the highest seqno applied locally. Writes with seqno <= hwm
/// are silently dropped (idempotent replay is safe; the state is already
/// reflected in local storage).
#[derive(Debug, Default)]
pub struct SessionSeqNoTracker {
    /// Map from SessionId to highest applied SessionSeqNo.
    hwm: DashMap<SessionId, SessionSeqNo>,
}

impl SessionSeqNoTracker {
    pub fn new() -> Self {
        Self { hwm: DashMap::new() }
    }

    /// Returns `true` if this write should be applied (seqno > hwm).
    /// Returns `false` if the write is a duplicate and should be skipped.
    /// Updates the HWM on accept.
    pub fn should_apply(&self, session_id: SessionId, seqno: SessionSeqNo) -> bool {
        // The entry guard holds the shard lock, so compare-and-update is atomic.
        let mut entry = self.hwm.entry(session_id).or_insert(SessionSeqNo::ZERO);
        if seqno > *entry {
            *entry = seqno;
            true
        } else {
            false
        }
    }

    /// Current HWM for a session (returns ZERO if unknown).
    pub fn hwm(&self, session_id: SessionId) -> SessionSeqNo {
        self.hwm
            .get(&session_id)
            .map(|v| *v)
            .unwrap_or(SessionSeqNo::ZERO)
    }

    /// Initialize or reset HWM for a session (used on follower startup).
    pub fn set_hwm(&self, session_id: SessionId, seqno: SessionSeqNo) {
        self.hwm.insert(session_id, seqno);
    }
}
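The accept/reject rule of the tracker can be exercised without the rest of the crate. This is a minimal sketch under stated assumptions: `SessionId` is aliased to `u64`, `SessionSeqNo` is a local stand-in, and a `Mutex<HashMap>` replaces `DashMap` so the snippet needs no external crates. The type name `Tracker` is hypothetical.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Assumption for this sketch: the real SessionId is defined elsewhere.
type SessionId = u64;

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct SessionSeqNo(pub u64);

/// Same accept/reject rule as SessionSeqNoTracker, with a Mutex<HashMap>
/// standing in for DashMap.
#[derive(Debug, Default)]
pub struct Tracker {
    hwm: Mutex<HashMap<SessionId, SessionSeqNo>>,
}

impl Tracker {
    /// Accepts the write only if seqno is strictly above the session's
    /// HWM, and advances the HWM when it accepts.
    pub fn should_apply(&self, session_id: SessionId, seqno: SessionSeqNo) -> bool {
        let mut map = self.hwm.lock().unwrap();
        let entry = map.entry(session_id).or_insert(SessionSeqNo(0));
        if seqno > *entry {
            *entry = seqno;
            true
        } else {
            false
        }
    }

    /// Current HWM for a session, or SessionSeqNo(0) if unknown.
    pub fn hwm(&self, session_id: SessionId) -> SessionSeqNo {
        let map = self.hwm.lock().unwrap();
        map.get(&session_id).copied().unwrap_or(SessionSeqNo(0))
    }
}
```

Two consequences of the rule are worth noting. A write that arrives after a higher seqno has already been accepted (for example seqno 2 after seqno 3) is dropped, so the scheme relies on in-order delivery within a session. A write carrying seqno 0 is never applied, which is consistent with sequence numbers starting at 1.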
Sequence Number Assignment
// In session write path (tidal/src/session/mod.rs)
impl SessionManager {
    fn next_seqno(&self, session_id: SessionId) -> SessionSeqNo {
        // Increment-then-fetch per session: the first call for a session
        // returns ssn:1, matching "starts at 1" on SessionWalEvent.
        let mut counter = self.seqno_counters
            .entry(session_id)
            .or_insert(SessionSeqNo::ZERO);
        *counter = counter.next();
        *counter
    }
}
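The assignment logic can be sketched standalone to show that counters are independent per session and that the first number handed out is 1. The `SeqNoAllocator` type, the `u64` alias for `SessionId`, and the `Mutex<HashMap>` in place of the concurrent map behind `seqno_counters` are all assumptions made for this illustration.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Assumption for this sketch: the real SessionId is defined elsewhere.
type SessionId = u64;

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct SessionSeqNo(pub u64);

/// Hypothetical stand-in for the seqno_counters field of SessionManager.
#[derive(Default)]
pub struct SeqNoAllocator {
    counters: Mutex<HashMap<SessionId, SessionSeqNo>>,
}

impl SeqNoAllocator {
    /// Increments the session's counter and returns the new value.
    /// The lock is held across the read-modify-write, so two concurrent
    /// callers for the same session never receive the same number.
    pub fn next_seqno(&self, session_id: SessionId) -> SessionSeqNo {
        let mut map = self.counters.lock().unwrap();
        let counter = map.entry(session_id).or_insert(SessionSeqNo(0));
        *counter = SessionSeqNo(counter.0 + 1);
        *counter
    }
}
```

Because the counter starts at zero and is incremented before being returned, a receiver whose HWM defaults to zero accepts the very first write of every session.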
Acceptance Criteria
- SessionSeqNo is Copy + Clone + Ord + Hash + Serialize + Deserialize
- SessionSeqNoTracker::should_apply(id, seqno) returns true for the first call with a given seqno, false on duplicate, and true again for a higher seqno
- HWM persists in memory; on follower node restart, WAL replay re-establishes HWM by scanning all SessionWalEvent entries in order
- SessionWalEvent with session_seqno: None (pre-m8p4 events) is decoded without error; should_apply returns true for all legacy events
- cargo clippy -- -D warnings and cargo fmt pass
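The restart and legacy-event criteria can be illustrated together with a small replay loop. This is a sketch under stated assumptions rather than the actual follower code: `Event` is a trimmed-down stand-in for SessionWalEvent, `SessionId` is aliased to `u64`, and `replay` is a hypothetical function name. Since `should_apply` as designed takes a plain SessionSeqNo, the sketch treats `None` as "always apply" at the call site.

```rust
use std::collections::HashMap;

// Assumption for this sketch: the real SessionId is defined elsewhere.
type SessionId = u64;

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct SessionSeqNo(pub u64);

/// Trimmed-down stand-in for SessionWalEvent: only the fields replay needs.
pub struct Event {
    pub session_id: SessionId,
    pub session_seqno: Option<SessionSeqNo>,
}

/// Scans events in WAL order and returns the rebuilt HWM map together
/// with the number of events that would be applied.
pub fn replay(events: &[Event]) -> (HashMap<SessionId, SessionSeqNo>, usize) {
    let mut hwm: HashMap<SessionId, SessionSeqNo> = HashMap::new();
    let mut applied = 0;
    for ev in events {
        match ev.session_seqno {
            // Legacy (pre-m8p4) event: no seqno, so it is always applied
            // and the HWM is left untouched.
            None => applied += 1,
            Some(seqno) => {
                let entry = hwm.entry(ev.session_id).or_insert(SessionSeqNo(0));
                if seqno > *entry {
                    *entry = seqno;
                    applied += 1;
                }
            }
        }
    }
    (hwm, applied)
}
```

After the scan, the returned map holds the highest seqno seen per session, which is the value a follower would pass to `set_hwm` on startup.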