Task 02: IdempotencyKey + IdempotencyStore
Delivers
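`IdempotencyKey(u128)`, a BLAKE3-derived key per session operation, and `IdempotencyStore` (bounded LRU, default capacity 100K), both in `tidal/src/replication/idempotency.rs`. Duplicate session writes arriving via replication are detected in O(1) expected time and silently discarded. Duplicates older than the LRU window are caught by the SessionSeqNo high-water mark (HWM) instead.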
Complexity: S
Dependencies
- Task 01 (SessionSeqNo)
Technical Design
```rust
// tidal/src/replication/idempotency.rs
use blake3::Hasher;

// `SessionId` and `SessionSeqNo` (Task 01) must be in scope here;
// their module paths are not specified in this task.

/// Per-operation idempotency key derived from session context.
///
/// Derived as: BLAKE3(session_id_bytes || seqno_bytes || operation_bytes),
/// truncated to the first 16 bytes.
///
/// By the birthday bound, a 128-bit key only reaches roughly a 39% chance of
/// any collision after about 2^64 distinct operations, far beyond any
/// realistic workload. Storing 16 bytes instead of the full 32-byte BLAKE3
/// hash halves the key size with no practical difference for deduplication.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct IdempotencyKey(pub u128);

impl IdempotencyKey {
    /// Derive an idempotency key for a session operation.
    ///
    /// - `session_id`: the session this operation belongs to
    /// - `seqno`: monotonic sequence number (see `SessionSeqNo`)
    /// - `operation_bytes`: serialized operation payload (canonically ordered)
    pub fn derive(session_id: SessionId, seqno: SessionSeqNo, operation_bytes: &[u8]) -> Self {
        let mut hasher = Hasher::new();
        hasher.update(&session_id.as_bytes());
        hasher.update(&seqno.0.to_le_bytes());
        hasher.update(operation_bytes);
        let hash = hasher.finalize();
        // Take the first 16 of the 32 digest bytes as a little-endian u128.
        let bytes: [u8; 16] = hash.as_bytes()[..16]
            .try_into()
            .expect("BLAKE3 digest is 32 bytes");
        Self(u128::from_le_bytes(bytes))
    }
}
```
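The birthday-bound figure for 128-bit keys can be checked numerically. The sketch below uses the standard approximation p ≈ 1 - exp(-n(n-1)/2^(b+1)) for n uniformly random b-bit keys, assuming truncated BLAKE3 output behaves like a uniform random value. `collision_probability` is a hypothetical helper for illustration, not part of the crate.

```rust
/// Approximate probability of at least one collision among `n` uniformly
/// random keys of `bits` bits (birthday bound):
/// p ≈ 1 - exp(-n(n-1) / 2^(bits+1)).
pub fn collision_probability(n: f64, bits: i32) -> f64 {
    let space = 2f64.powi(bits); // number of distinct keys, 2^bits
    let x = n * (n - 1.0) / (2.0 * space);
    // 1 - e^(-x), computed via exp_m1 to stay accurate when x is tiny.
    -(-x).exp_m1()
}
```

With 2^64 keys this gives about 0.39; with 10^12 keys it is on the order of 10^-15. Running the same function with `bits = 64` and 10^9 keys gives about 0.027, which is one reason to prefer 128-bit keys over 64-bit ones.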
```rust
// (continued) tidal/src/replication/idempotency.rs
use std::num::NonZeroUsize;
use std::sync::Mutex;

use lru::LruCache;

/// Bounded LRU store for idempotency keys.
///
/// Capacity: 100K entries by default. The raw key payload is
/// 100K × 16 B ≈ 1.6 MB; hash-table slots and LRU list pointers add
/// per-entry overhead on top of that (budget: < 10 MB).
/// When capacity is reached, the least-recently-seen key is evicted,
/// so idempotency is guaranteed for the last 100K distinct operations.
///
/// Older operations fall back to the SessionSeqNo HWM check, which is
/// unbounded and always monotonic (a write with seqno <= hwm is never re-applied).
///
/// Thread-safe: uses a `Mutex<LruCache>`.
pub struct IdempotencyStore {
    cache: Mutex<LruCache<IdempotencyKey, ()>>,
    capacity: usize,
}

impl IdempotencyStore {
    /// Create a new store with the given capacity. Panics if `capacity` is 0.
    pub fn new(capacity: usize) -> Self {
        let cap = NonZeroUsize::new(capacity).expect("capacity must be > 0");
        Self {
            cache: Mutex::new(LruCache::new(cap)),
            capacity,
        }
    }

    /// Create a store with the default capacity (100K).
    pub fn default_capacity() -> Self {
        Self::new(100_000)
    }

    /// Check if a key has been seen before and record it if not.
    ///
    /// Returns `true` if the key is new (should apply the operation).
    /// Returns `false` if the key was already seen (duplicate; skip).
    pub fn check_and_record(&self, key: IdempotencyKey) -> bool {
        let mut cache = self.cache.lock().unwrap();
        // `put` inserts a new key, or refreshes an existing key's recency and
        // returns its old value. `contains` would not refresh recency, which
        // would turn eviction into plain insertion order.
        cache.put(key, ()).is_none()
    }

    /// Current number of tracked keys.
    pub fn len(&self) -> usize {
        self.cache.lock().unwrap().len()
    }

    /// Returns `true` if no keys are tracked (clippy's
    /// `len_without_is_empty` expects this alongside `len`).
    pub fn is_empty(&self) -> bool {
        self.cache.lock().unwrap().is_empty()
    }

    /// Returns the configured capacity.
    pub fn capacity(&self) -> usize {
        self.capacity
    }
}
```
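To exercise the `check_and_record` contract without the `lru` crate, a std-only stand-in can track a logical clock per key: a `HashMap` maps each key to its last-seen tick, and a `BTreeMap` orders keys by recency so eviction pops the smallest tick. This is an O(log n) sketch, not the O(1) linked-list LRU the store uses, and `SketchIdempotencySet` is a hypothetical name.

```rust
use std::collections::{BTreeMap, HashMap};

/// Hypothetical std-only stand-in for `IdempotencyStore` (O(log n) per call).
pub struct SketchIdempotencySet {
    capacity: usize,
    tick: u64,                     // logical clock, bumped on every call
    last_seen: HashMap<u128, u64>, // key -> tick of its most recent sighting
    by_tick: BTreeMap<u64, u128>,  // tick -> key, ordered oldest first
}

impl SketchIdempotencySet {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be > 0");
        Self {
            capacity,
            tick: 0,
            last_seen: HashMap::new(),
            by_tick: BTreeMap::new(),
        }
    }

    /// Returns `true` for a new key, `false` for a duplicate. In both cases
    /// the key becomes the most recently seen.
    pub fn check_and_record(&mut self, key: u128) -> bool {
        self.tick += 1;
        let is_new = match self.last_seen.insert(key, self.tick) {
            Some(old_tick) => {
                self.by_tick.remove(&old_tick); // drop the stale recency slot
                false
            }
            None => true,
        };
        self.by_tick.insert(self.tick, key);
        // Over capacity: evict the least-recently-seen key.
        if self.last_seen.len() > self.capacity {
            if let Some((_, oldest)) = self.by_tick.pop_first() {
                self.last_seen.remove(&oldest);
            }
        }
        is_new
    }

    pub fn len(&self) -> usize {
        self.last_seen.len()
    }

    pub fn is_empty(&self) -> bool {
        self.last_seen.is_empty()
    }
}
```

Refreshing recency on a duplicate hit is what makes eviction "least-recently-seen"; a version that only tested membership would evict in insertion order, and an evicted key looks new again on its next arrival.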
Integration in SegmentReceiver
```rust
// In tidal/src/replication/receive.rs (additions)
impl SegmentReceiver {
    fn apply_session_event(
        &self,
        event: &SessionWalEvent,
        idempotency_store: &IdempotencyStore,
        seqno_tracker: &SessionSeqNoTracker,
    ) -> Result<()> {
        // Layer 1: SeqNo HWM check (fast, unbounded).
        if let Some(seqno) = event.session_seqno {
            if !seqno_tracker.should_apply(event.session_id, seqno) {
                return Ok(()); // duplicate: skip
            }
        }
        // Layer 2: idempotency key check (bounded LRU, catches within-window dupes).
        // The key is recorded before the event is applied, so if
        // `apply_wal_event` fails, a retry of the same event is skipped here.
        if let Some(key_int) = event.idempotency_key {
            let key = IdempotencyKey(key_int);
            if !idempotency_store.check_and_record(key) {
                return Ok(()); // duplicate: skip
            }
        }
        // Apply the event.
        self.session_manager.apply_wal_event(event)
    }
}
```
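The two-layer check can be exercised in isolation. The sketch below is hypothetical: `DedupGate` models `SessionSeqNoTracker::should_apply` as "admit only if the seqno exceeds the session's HWM, then advance the HWM", which is an assumption about Task 01's semantics, and it uses an unbounded `HashSet` in place of the bounded LRU.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical model of the receiver's two dedup layers.
#[derive(Default)]
pub struct DedupGate {
    hwm: HashMap<u64, u64>, // session id -> highest admitted seqno (assumed semantics)
    seen: HashSet<u128>,    // unbounded stand-in for IdempotencyStore
}

impl DedupGate {
    /// Returns `true` if the event should be applied.
    pub fn admit(&mut self, session: u64, seqno: Option<u64>, key: Option<u128>) -> bool {
        // Layer 1: reject seqnos at or below the session's HWM, else advance it.
        if let Some(s) = seqno {
            if let Some(&hwm) = self.hwm.get(&session) {
                if s <= hwm {
                    return false;
                }
            }
            self.hwm.insert(session, s);
        }
        // Layer 2: reject keys that were already recorded.
        if let Some(k) = key {
            if !self.seen.insert(k) {
                return false;
            }
        }
        true
    }
}
```

Under this assumed model, any replay that carries a seqno is rejected by layer 1 regardless of age, while layer 2 covers events that reach it without a usable seqno.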
Acceptance Criteria
- `IdempotencyKey::derive(session_id, seqno, bytes)` produces a deterministic `u128` for the same inputs
- Different inputs produce different keys with overwhelming probability (no test for this; it follows from BLAKE3's collision resistance, truncated to 128 bits)
- `IdempotencyStore::check_and_record(key)` returns `true` on the first call and `false` on any subsequent call with the same key
- LRU eviction: when the store exceeds `capacity` distinct keys, the least-recently-seen entries are evicted; evicted keys return `true` on re-insert (they look new again; the fallback to the SeqNo HWM handles correctness)
- `IdempotencyStore::len()` returns 0 after initialization and grows up to `capacity`
- Memory bound: a 100K-entry store consumes < 10 MB (`std::mem::size_of` reports only inline sizes, so use it to estimate per-entry cost and measure heap usage separately)
- `cargo clippy -- -D warnings` and `cargo fmt --check` pass
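A rough check of the memory budget: the raw keys alone take 100,000 × `size_of::<u128>()` = 1.6 MB. The sketch below adds a per-entry overhead parameter for the hash-table slot and LRU list pointers; any value passed for it is an illustrative assumption, not a measurement of the `lru` crate. It takes 10 MB as 10^7 bytes, and both helper names are hypothetical.

```rust
use std::mem::size_of;

/// Estimated bytes for `entries` keys, given an assumed per-entry overhead
/// (hash-table slot + LRU list pointers) on top of the 16-byte key.
pub fn estimated_store_bytes(entries: usize, overhead_per_entry: usize) -> usize {
    entries * (size_of::<u128>() + overhead_per_entry)
}

/// Largest per-entry overhead that keeps `entries` keys within `budget` bytes.
/// `entries` must be > 0; returns 0 if the keys alone exceed the budget.
pub fn max_overhead_per_entry(entries: usize, budget: usize) -> usize {
    (budget / entries).saturating_sub(size_of::<u128>())
}
```

For 100K entries and a 10^7-byte budget, this leaves room for up to 84 bytes of bookkeeping per entry, so the criterion should still be confirmed by measuring actual heap usage.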