tidaldb/docs/planning/milestone-8/phase-1/task-06-replication-state.md
jordan f4cfd6c81f feat: complete M8 replication primitives + forage enhancements + docs
Milestone 8 (phases 1-4):
- Shard-aware WAL segment naming, BatchHeader v2, ShardRouter
- Transport trait, InProcessTransport, WalShipper, FollowerDb
- HLC, PNCounter, LWWRegister, CrdtSignalState, ReconciliationEngine
- Session replication bridge with SeqNo/HWM, idempotency store

Forage application:
- Multi-source discovery engine with MAB exploration
- Embedding-based label system, server handlers, UI refresh

Other:
- QUICKSTART.md, README.md, milestone-8 planning docs
- Hard negative union semantics, RLHF export enhancements
- Recovery benchmark and visibility test expansions
- Split 8 oversized source files per CODING_GUIDELINES §9

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 13:17:19 -07:00


# Task 06: ReplicationState
## Delivers
`ReplicationState` in `tidal/src/replication/state.rs` tracking per-shard high-water-mark seqno with `AtomicU64` for lock-free reads. Serialize/deserialize for checkpoint persistence. Used by followers to track which segments have been applied.
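A minimal sketch of the follower side. It assumes one applier thread per shard and segment seqnos starting at 1. A segment at or below the shard's high-water-mark is skipped. Otherwise it is applied and the mark moves to its seqno. `Segment` and `apply_segment_once` are hypothetical names, and a lone `AtomicU64` stands in for one shard's entry in `ReplicationState`.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical shipped WAL segment; only its seqno matters here.
pub struct Segment {
    pub seqno: u64,
}

/// Applies `segment` with `apply` unless its seqno is at or below the
/// shard's high-water-mark `hwm`, then moves the mark to that seqno.
/// Returns `true` if the segment was applied, `false` if it was skipped.
///
/// Assumes a single applier per shard: nothing else writes `hwm`
/// between the check and the store.
pub fn apply_segment_once<F: FnMut(&Segment)>(
    hwm: &AtomicU64,
    segment: &Segment,
    mut apply: F,
) -> bool {
    if segment.seqno <= hwm.load(Ordering::Acquire) {
        return false; // already applied: replay is a no-op
    }
    apply(segment);
    // Release: a reader that loads the new mark with Acquire also sees
    // the memory writes `apply` made.
    hwm.store(segment.seqno, Ordering::Release);
    true
}
```

The mark moves only after `apply` returns, so no segment is ever reported as applied before its effects are in place.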
## Complexity: S
## Dependencies
- Task 05 (NodeConfig -- establishes the set of known shards)
## Technical Design
```rust
// tidal/src/replication/state.rs
use crate::replication::shard::ShardId;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Tracks the per-shard replication high-water-mark.
///
/// For each shard, records the seqno of the last WAL segment that has been
/// fully applied to the local state machine. Followers treat segments with
/// seqno <= high-water-mark as already applied, so replaying them is a no-op.
///
/// Thread-safe: the shard set is fixed at construction and each
/// high-water-mark is an `AtomicU64`, so all methods take `&self` without a
/// lock. `snapshot()` is O(n_shards) and copies the current values.
#[derive(Debug)]
pub struct ReplicationState {
    /// Per-shard high-water-mark seqno.
    /// `0` means "no segments applied yet" (initial state), so segment
    /// seqnos are assumed to start at 1.
    applied: HashMap<ShardId, Arc<AtomicU64>>,
}

impl ReplicationState {
    /// Create a new `ReplicationState` tracking the given shards.
    ///
    /// All high-water-marks start at 0 (no segments applied).
    pub fn new(shards: &[ShardId]) -> Self {
        let applied = shards
            .iter()
            .map(|&s| (s, Arc::new(AtomicU64::new(0))))
            .collect();
        Self { applied }
    }

    /// Create a single-node `ReplicationState` (tracks only `ShardId(0)`).
    pub fn single() -> Self {
        Self::new(&[ShardId::SINGLE])
    }

    /// Get the high-water-mark seqno for a shard.
    ///
    /// Returns `None` if the shard is unknown to this state.
    pub fn applied_seqno(&self, shard_id: ShardId) -> Option<u64> {
        self.applied.get(&shard_id).map(|a| a.load(Ordering::Acquire))
    }

    /// Update the high-water-mark for a shard.
    ///
    /// Only advances forward: a seqno at or below the current
    /// high-water-mark is ignored, as is a call for an unknown shard.
    pub fn advance(&self, shard_id: ShardId, seqno: u64) {
        if let Some(atomic) = self.applied.get(&shard_id) {
            let mut current = atomic.load(Ordering::Acquire);
            loop {
                if seqno <= current {
                    break; // already at or past this seqno
                }
                match atomic.compare_exchange_weak(
                    current,
                    seqno,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => break,
                    // Another thread moved the mark (or a spurious failure):
                    // retry against the value it now holds.
                    Err(actual) => current = actual,
                }
            }
        }
    }
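
    /// Returns all tracked shards and their current seqnos.
    ///
    /// Each shard is loaded separately, so under concurrent `advance` calls
    /// the map may mix older and newer marks across shards.
    pub fn snapshot(&self) -> HashMap<ShardId, u64> {
        self.applied
            .iter()
            .map(|(&s, a)| (s, a.load(Ordering::Acquire)))
            .collect()
    }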
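
    /// Serialize the current high-water-marks for checkpoint persistence.
    pub fn to_checkpoint_bytes(&self) -> Vec<u8> {
        let snap = self.snapshot();
        serde_json::to_vec(&snap).expect("ReplicationState serialization is infallible")
    }

    /// Restore from checkpoint bytes.
    ///
    /// Shards in `shards` but absent from the checkpoint start at 0; shards
    /// in the checkpoint but not in `shards` are dropped. Malformed bytes
    /// return an error instead of silently resetting every high-water-mark
    /// to 0, which would cause already-applied segments to be re-applied.
    pub fn from_checkpoint_bytes(
        bytes: &[u8],
        shards: &[ShardId],
    ) -> Result<Self, serde_json::Error> {
        let snap: HashMap<ShardId, u64> = serde_json::from_slice(bytes)?;
        let applied = shards
            .iter()
            .map(|&s| {
                let seqno = snap.get(&s).copied().unwrap_or(0);
                (s, Arc::new(AtomicU64::new(seqno)))
            })
            .collect();
        Ok(Self { applied })
    }
}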
```
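The CAS loop in `advance` hand-rolls an atomic maximum. `AtomicU64::fetch_max` (stable since Rust 1.45) performs the same forward-only update in one call. This is a sketch under stated assumptions: `HwmTable` is a hypothetical name, `ShardId(u16)` is a stand-in for the crate's type, and the `Arc` wrapper is dropped for brevity.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical stand-in for `crate::replication::shard::ShardId`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ShardId(pub u16);

/// Hypothetical table of per-shard high-water-marks.
pub struct HwmTable {
    applied: HashMap<ShardId, AtomicU64>,
}

impl HwmTable {
    pub fn new(shards: &[ShardId]) -> Self {
        let applied = shards.iter().map(|&s| (s, AtomicU64::new(0))).collect();
        Self { applied }
    }

    /// Forward-only update; calls for unknown shards are ignored.
    pub fn advance(&self, shard_id: ShardId, seqno: u64) {
        if let Some(atomic) = self.applied.get(&shard_id) {
            // Atomic max: the stored value becomes max(current, seqno).
            atomic.fetch_max(seqno, Ordering::AcqRel);
        }
    }

    pub fn applied_seqno(&self, shard_id: ShardId) -> Option<u64> {
        self.applied.get(&shard_id).map(|a| a.load(Ordering::Acquire))
    }
}
```

Both versions leave the same final value. One difference is that `fetch_max` always performs a read-modify-write. The CAS loop returns after a plain load when the mark is already at or past `seqno`, so it may avoid contended writes when old segments are replayed heavily. The acceptance criteria name the CAS loop, so this is an alternative rather than a required change.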
## Acceptance Criteria
- [ ] `ReplicationState::single()` tracks only `ShardId(0)`; initial seqno = 0
- [ ] `ReplicationState::advance(shard, seqno)` atomically advances the high-water-mark; never decreases
- [ ] `ReplicationState::applied_seqno(shard)` returns `None` for unknown shards
- [ ] `advance` is safe to call from multiple threads concurrently (CAS loop)
- [ ] `to_checkpoint_bytes` + `from_checkpoint_bytes` roundtrip preserves all shard seqnos
- [ ] `ReplicationState` is `Send + Sync`
- [ ] Unit tests: advance monotonicity, concurrent advance from 4 threads, checkpoint roundtrip
- [ ] `cargo clippy -- -D warnings` and `cargo fmt --check` pass
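The sketch below shows one way to write the "concurrent advance from 4 threads" test listed in the criteria. To stay self-contained, it runs the same CAS loop factored into a hypothetical free function, `advance_hwm`, over a single `AtomicU64`. In the crate, the threads would share one `ReplicationState` (for example through `std::thread::scope` or an `Arc`) and call `advance` instead. `concurrent_advance` is also a hypothetical helper.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;

/// Forward-only CAS loop, mirroring `ReplicationState::advance` for a
/// single shard's `AtomicU64`.
pub fn advance_hwm(atomic: &AtomicU64, seqno: u64) {
    let mut current = atomic.load(Ordering::Acquire);
    while seqno > current {
        match atomic.compare_exchange_weak(current, seqno, Ordering::AcqRel, Ordering::Acquire) {
            Ok(_) => break,
            // Lost a race (or spurious failure): retry against the fresh value.
            Err(actual) => current = actual,
        }
    }
}

/// Runs 4 threads that each advance one shared mark through interleaved
/// seqnos (thread `t` submits `4 * i + t + 1` for `i` in `0..per_thread`).
/// Returns the final mark and whether every thread saw a mark that never
/// decreased and was never below the seqno it had just submitted.
pub fn concurrent_advance(per_thread: u64) -> (u64, bool) {
    let hwm = AtomicU64::new(0);
    let hwm_ref = &hwm;
    let all_ok = thread::scope(|s| {
        let handles: Vec<_> = (0..4u64)
            .map(move |t| {
                s.spawn(move || {
                    let mut last = 0;
                    let mut ok = true;
                    for i in 0..per_thread {
                        let seqno = 4 * i + t + 1;
                        advance_hwm(hwm_ref, seqno);
                        let seen = hwm_ref.load(Ordering::Acquire);
                        ok &= seen >= last && seen >= seqno;
                        last = seen;
                    }
                    ok
                })
            })
            .collect();
        // Join every handle; a panicked thread fails the test via `expect`.
        let results: Vec<bool> = handles
            .into_iter()
            .map(|h| h.join().expect("advance thread panicked"))
            .collect();
        results.into_iter().all(|ok| ok)
    });
    (hwm.load(Ordering::Acquire), all_ok)
}
```

With `per_thread = 1000` the largest submitted seqno is `4 * 999 + 3 + 1 = 4000`, so that is the expected final mark. Successive loads of one atomic by one thread never go backwards in its modification order. The per-thread check can therefore only fail if the loop ever stored a smaller value or left the mark below a submitted seqno.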