# Task 06: ReplicationState

## Delivers

`ReplicationState` in `tidal/src/replication/state.rs`, which tracks the per-shard high-water-mark seqno using one `AtomicU64` per shard, so reads and advances are lock-free. It serializes to and deserializes from bytes for checkpoint persistence. Followers use it to track which WAL segments have been applied.
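A follower can use the high-water-mark in a replay loop that checks each segment before applying it. This is an illustration under stated assumptions, not the task's API. `ShardId` and `replay` are hypothetical. The marks are a plain `HashMap<ShardId, u64>` standing in for `ReplicationState`, and `apply` stands in for the local state machine. The sketch also assumes each shard's segments are applied in seqno order starting at 1, because a single high-water-mark per shard cannot record gaps.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for `crate::replication::shard::ShardId`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct ShardId(pub u32);

/// Hypothetical helper: replays `(shard, seqno)` segments against
/// per-shard high-water-marks and calls `apply` only for segments not yet
/// applied. Returns how many segments were applied.
/// Assumes per-shard seqnos arrive in order, starting at 1.
pub fn replay<F: FnMut(ShardId, u64)>(
    hwm: &mut HashMap<ShardId, u64>,
    segments: &[(ShardId, u64)],
    mut apply: F,
) -> usize {
    let mut applied = 0;
    for &(shard, seqno) in segments {
        // Unknown shards are skipped, mirroring `advance` ignoring them.
        let Some(mark) = hwm.get_mut(&shard) else {
            continue;
        };
        if seqno <= *mark {
            continue; // at or below the high-water-mark: replay is a no-op
        }
        apply(shard, seqno);
        // Advance only after the segment has been fully applied.
        *mark = seqno;
        applied += 1;
    }
    applied
}
```

Suppose the mark starts at 0 and the follower replays `[(ShardId(0), 1), (ShardId(0), 2), (ShardId(0), 1), (ShardId(0), 3)]`. It applies seqnos 1, 2 and 3 once each, skips the duplicate 1, and leaves the mark at 3.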
## Complexity: S

## Dependencies

- Task 05 (NodeConfig -- establishes the set of known shards)
## Technical Design

```rust
// tidal/src/replication/state.rs

use crate::replication::shard::ShardId;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Tracks the per-shard replication high-water-mark.
///
/// Each shard tracks the last WAL segment seqno that has been fully
/// applied to the local state machine. Segments with seqno <=
/// high_water_mark are idempotent no-ops on replay.
///
/// Thread-safe without locks: the shard map is fixed at construction and
/// never mutated afterwards, so concurrent callers only touch the atomics.
/// `snapshot()` is O(n_shards) and copies the current seqnos, not the atomics.
#[derive(Debug)]
pub struct ReplicationState {
    /// Per-shard high-water-mark seqno.
    /// `0` means "no segments applied yet" (initial state). A segment with
    /// seqno 0 would therefore be treated as already applied, so segment
    /// seqnos are assumed to start at 1.
    applied: HashMap<ShardId, Arc<AtomicU64>>,
}

impl ReplicationState {
    /// Create a new `ReplicationState` tracking the given shards.
    ///
    /// All high-water-marks start at 0 (no segments applied).
    pub fn new(shards: &[ShardId]) -> Self {
        let applied = shards
            .iter()
            .map(|&s| (s, Arc::new(AtomicU64::new(0))))
            .collect();
        Self { applied }
    }

    /// Create a single-node `ReplicationState` (tracks only `ShardId(0)`).
    pub fn single() -> Self {
        Self::new(&[ShardId::SINGLE])
    }

    /// Get the high-water-mark seqno for a shard.
    ///
    /// Returns `None` if the shard is unknown to this state.
    pub fn applied_seqno(&self, shard_id: ShardId) -> Option<u64> {
        self.applied.get(&shard_id).map(|a| a.load(Ordering::Acquire))
    }

    /// Advance the high-water-mark for a shard.
    ///
    /// Only moves forward: a seqno less than or equal to the current
    /// high-water-mark is a no-op, as is a shard unknown to this state.
    pub fn advance(&self, shard_id: ShardId, seqno: u64) {
        if let Some(atomic) = self.applied.get(&shard_id) {
            let mut current = atomic.load(Ordering::Acquire);
            // CAS loop: stop once `seqno` is installed or another thread
            // has already moved the mark to or past it.
            while seqno > current {
                match atomic.compare_exchange_weak(
                    current,
                    seqno,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => break,
                    // Lost a race, or `_weak` failed spuriously: retry
                    // against the value actually observed.
                    Err(actual) => current = actual,
                }
            }
        }
    }

    /// Returns all tracked shards and their current seqnos.
    ///
    /// Each shard is read independently, so under concurrent `advance`
    /// calls the result is not an atomic cut across shards.
    pub fn snapshot(&self) -> HashMap<ShardId, u64> {
        self.applied
            .iter()
            .map(|(&s, a)| (s, a.load(Ordering::Acquire)))
            .collect()
    }

    /// Serialize for checkpoint persistence as a JSON map of shard -> seqno.
    ///
    /// Assumes `ShardId` derives `Serialize`/`Deserialize` (e.g. as a
    /// newtype over an integer) so it can serve as a JSON object key.
    pub fn to_checkpoint_bytes(&self) -> Vec<u8> {
        serde_json::to_vec(&self.snapshot())
            .expect("ReplicationState serialization is infallible")
    }

    /// Restore from checkpoint bytes, tracking exactly `shards`.
    ///
    /// Shards missing from the checkpoint start at 0; shards present in the
    /// checkpoint but absent from `shards` are dropped. Corrupt bytes are an
    /// error rather than a silent reset to 0: a reset would make
    /// already-applied segments look new, and they would be re-applied.
    pub fn from_checkpoint_bytes(
        bytes: &[u8],
        shards: &[ShardId],
    ) -> Result<Self, serde_json::Error> {
        let snap: HashMap<ShardId, u64> = serde_json::from_slice(bytes)?;
        let applied = shards
            .iter()
            .map(|&s| {
                let seqno = snap.get(&s).copied().unwrap_or(0);
                (s, Arc::new(AtomicU64::new(seqno)))
            })
            .collect();
        Ok(Self { applied })
    }
}
```
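The CAS loop in `advance` implements an atomic monotonic max. The sketch below isolates that loop on a bare `AtomicU64`, without `ShardId` or the map, and runs the four-thread concurrent-advance scenario from the acceptance criteria. `advance_max` and `concurrent_advance` are hypothetical helper names. The standard library's `AtomicU64::fetch_max` (stable since Rust 1.45) performs the same read-modify-write and could replace the hand-written loop.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Monotonic max via CAS: the same loop as `ReplicationState::advance`.
pub fn advance_max(atomic: &AtomicU64, seqno: u64) {
    let mut current = atomic.load(Ordering::Acquire);
    while seqno > current {
        match atomic.compare_exchange_weak(current, seqno, Ordering::AcqRel, Ordering::Acquire) {
            Ok(_) => break,
            Err(actual) => current = actual, // lost a race or spurious failure: retry
        }
    }
}

/// `threads` workers advance one shared mark with interleaved seqnos
/// (worker `t` sends t+1, t+1+threads, ...). Returns the final mark, which
/// should equal the largest seqno sent: `threads * per_thread`.
pub fn concurrent_advance(threads: u64, per_thread: u64) -> u64 {
    let mark = Arc::new(AtomicU64::new(0));
    let handles: Vec<_> = (0..threads)
        .map(|t| {
            let mark = Arc::clone(&mark);
            thread::spawn(move || {
                for i in 0..per_thread {
                    let seqno = i * threads + t + 1;
                    advance_max(&mark, seqno);
                    // Monotonicity: the mark never drops below what this worker just sent.
                    assert!(mark.load(Ordering::Acquire) >= seqno);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().expect("worker panicked");
    }
    mark.load(Ordering::Acquire)
}
```

The two forms differ in one respect. `fetch_max` is always a read-modify-write. The loop, by contrast, exits after a plain load when `seqno` is not ahead of the mark, which is the common case when a follower replays segments it has already applied.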
## Acceptance Criteria

- [ ] `ReplicationState::single()` tracks only `ShardId(0)`; initial seqno = 0
- [ ] `ReplicationState::advance(shard, seqno)` atomically advances the high-water-mark; never decreases
- [ ] `ReplicationState::applied_seqno(shard)` returns `None` for unknown shards
- [ ] `advance` is safe to call from multiple threads concurrently (CAS loop)
- [ ] `to_checkpoint_bytes` + `from_checkpoint_bytes` roundtrip preserves all shard seqnos
- [ ] `ReplicationState` is `Send + Sync`
- [ ] Unit tests: advance monotonicity, concurrent advance from 4 threads, checkpoint roundtrip
- [ ] `cargo clippy -- -D warnings` and `cargo fmt --check` pass
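The `Send + Sync` criterion can be checked at compile time instead of by a runtime test. This sketch uses hypothetical stand-ins that mirror the design's field layout (`HashMap<ShardId, Arc<AtomicU64>>`). `assert_send_sync` is an illustrative helper name, not an existing API. If a later change swapped `Arc` for `Rc`, the `const` item would no longer compile.

```rust
use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;

/// Hypothetical stand-in for `crate::replication::shard::ShardId`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct ShardId(pub u32);

/// Stand-in with the same field layout as the design's `ReplicationState`.
#[derive(Debug)]
pub struct ReplicationState {
    pub applied: HashMap<ShardId, Arc<AtomicU64>>,
}

/// Compiles only when `T: Send + Sync`; does nothing at runtime.
pub fn assert_send_sync<T: Send + Sync>() {}

// Checked at compile time: if a field change made the type lose `Send`
// or `Sync`, this item would fail to type-check.
const _: fn() = assert_send_sync::<ReplicationState>;
```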