tidaldb/docs/planning/milestone-8/phase-3/task-01-hlc.md
jordan f4cfd6c81f feat: complete M8 replication primitives + forage enhancements + docs
Milestone 8 (phases 1-4):
- Shard-aware WAL segment naming, BatchHeader v2, ShardRouter
- Transport trait, InProcessTransport, WalShipper, FollowerDb
- HLC, PNCounter, LWWRegister, CrdtSignalState, ReconciliationEngine
- Session replication bridge with SeqNo/HWM, idempotency store

Forage application:
- Multi-source discovery engine with MAB exploration
- Embedding-based label system, server handlers, UI refresh

Other:
- QUICKSTART.md, README.md, milestone-8 planning docs
- Hard negative union semantics, RLHF export enhancements
- Recovery benchmark and visibility test expansions
- Split 8 oversized source files per CODING_GUIDELINES §9

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 13:17:19 -07:00


# Task 01: Hybrid Logical Clock (HLC)
## Delivers
`Hlc` (Hybrid Logical Clock) and its `HlcTimestamp` type in `tidal/src/replication/crdt/hlc.rs`. `Hlc` provides `now()` and `update(remote)` with a per-node monotonic guarantee; `HlcTimestamp` implements `PartialOrd`/`Ord` lexicographically by `(wall_ns, logical, node_id)`. `LWWRegister` uses these timestamps to order concurrent writes across nodes consistently with causality.
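To show how `LWWRegister` consumes the ordering, here is a minimal standalone sketch. `LwwRegister` and its `merge` method are hypothetical stand-ins (the real `LWWRegister` is specified in a separate task), and this `HlcTimestamp` only mirrors the planned fields. Because the order is total, two replicas that merge the same writes in opposite orders pick the same winner.

```rust
/// Mirrors the planned `HlcTimestamp` fields. Derived `PartialOrd`/`Ord`
/// compare fields in declaration order, giving the same
/// (wall_ns, logical, node_id) lexicographic order as a manual impl.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct HlcTimestamp {
    wall_ns: u64,
    logical: u32,
    node_id: u16,
}

/// Hypothetical last-writer-wins register: keeps the value whose
/// timestamp is greatest.
#[derive(Debug, Clone)]
struct LwwRegister<T> {
    value: T,
    ts: HlcTimestamp,
}

impl<T: Clone> LwwRegister<T> {
    /// Adopt `other` only if its timestamp is strictly greater.
    fn merge(&mut self, other: &Self) {
        if other.ts > self.ts {
            self.value = other.value.clone();
            self.ts = other.ts;
        }
    }
}

fn main() {
    // Two concurrent writes with identical (wall_ns, logical) on different nodes.
    let a = LwwRegister {
        value: "from-node-1",
        ts: HlcTimestamp { wall_ns: 1_000, logical: 2, node_id: 1 },
    };
    let b = LwwRegister {
        value: "from-node-2",
        ts: HlcTimestamp { wall_ns: 1_000, logical: 2, node_id: 2 },
    };

    // Replica 1 sees a then b; replica 2 sees b then a.
    let mut r1 = a.clone();
    r1.merge(&b);
    let mut r2 = b.clone();
    r2.merge(&a);

    // node_id breaks the tie, so both replicas converge on node 2's write.
    assert_eq!(r1.value, "from-node-2");
    assert_eq!(r2.value, "from-node-2");
}
```

Deriving `Ord` would also work for the real `HlcTimestamp` as long as the field order stays `wall_ns`, `logical`, `node_id`; the manual impl makes that dependency explicit.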
## Complexity: S
## Dependencies
- Phase 8.1 (ShardId used as node_id)
## Technical Design
HLC (Kulkarni et al., 2014) combines a wall clock with a logical counter. Below, `wall` is the current physical clock reading and `clock` is the last timestamp this node issued:
- On a local or send event (`now()`): `pt = max(wall, clock.wall); l = if pt == clock.wall { clock.logical + 1 } else { 0 }; clock = (pt, l)`
- On `receive(msg_hlc)` (`update(remote)`): `pt = max(wall, msg_hlc.wall, clock.wall)`, then `clock = (pt, l)` where `l` is:
  - `max(clock.logical, msg_hlc.logical) + 1` if `pt == clock.wall && pt == msg_hlc.wall`
  - `clock.logical + 1` if only `pt == clock.wall`
  - `msg_hlc.logical + 1` if only `pt == msg_hlc.wall`
  - `0` otherwise (physical time has moved past both)
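The two rules can be exercised as pure functions on `(wall_ns, logical)` pairs. This is a standalone sketch: the `Clock` alias and the `send`/`receive` names are illustrative, not part of the planned API, and `node_id` is omitted because it does not affect the rules. The scenario shows a node absorbing a remote clock that runs ahead of its own.

```rust
/// (wall_ns, logical) of the last issued timestamp.
type Clock = (u64, u32);

/// Local or send event; `wall` is the physical clock reading.
fn send(clock: Clock, wall: u64) -> Clock {
    let pt = wall.max(clock.0);
    let l = if pt == clock.0 { clock.1 + 1 } else { 0 };
    (pt, l)
}

/// Receive event: merge the remote timestamp `msg` into `clock`.
fn receive(clock: Clock, msg: Clock, wall: u64) -> Clock {
    let pt = wall.max(msg.0).max(clock.0);
    let l = if pt == clock.0 && pt == msg.0 {
        clock.1.max(msg.1) + 1
    } else if pt == clock.0 {
        clock.1 + 1
    } else if pt == msg.0 {
        msg.1 + 1
    } else {
        0
    };
    (pt, l)
}

fn main() {
    // Node B's physical clock reads 100, but it receives (200, 3) from a
    // node whose clock runs ahead: B adopts wall 200 and logical 3 + 1.
    let b = receive((90, 0), (200, 3), 100);
    assert_eq!(b, (200, 4));

    // B's physical clock (150) is still behind 200, so the logical
    // counter keeps ordering events until physical time catches up.
    let b = send(b, 150);
    assert_eq!(b, (200, 5));

    // Physical time overtakes the HLC wall value; logical resets to 0.
    let b = send(b, 250);
    assert_eq!(b, (250, 0));
}
```

The logical counter absorbs the skew: B never issues a timestamp below one it has seen, and the HLC wall component never runs ahead of the largest physical clock reading observed in the system.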
```rust
// tidal/src/replication/crdt/hlc.rs
use std::cmp::Ordering;
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};

/// Hybrid Logical Clock timestamp.
///
/// Combines wall-clock time (ns) with a logical counter to provide
/// causal ordering even with clock skew between nodes.
///
/// Ordering: (wall_ns, logical, node_id), lexicographic.
/// Events with the same wall time are ordered by the logical counter.
/// A single node never issues the same (wall_ns, logical) twice, but two
/// nodes can; node_id breaks that cross-node tie so the order is total.
/// (The serde derives require serde's `derive` feature.)
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct HlcTimestamp {
    pub wall_ns: u64,
    pub logical: u32,
    pub node_id: u16, // ShardId 0 for single-node deployments
}

impl PartialOrd for HlcTimestamp {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for HlcTimestamp {
    fn cmp(&self, other: &Self) -> Ordering {
        self.wall_ns
            .cmp(&other.wall_ns)
            .then(self.logical.cmp(&other.logical))
            .then(self.node_id.cmp(&other.node_id))
    }
}

/// A per-node HLC clock.
///
/// `(wall_ns, logical)` must change together, so the pair lives under one
/// `Mutex`. With two separate atomics, two threads that read the same state
/// can both succeed at a `wall_ns` compare-exchange that leaves the value
/// unchanged and return identical timestamps. The critical section is a few
/// integer operations.
pub struct Hlc {
    node_id: u16,
    /// (wall_ns, logical) of the last issued timestamp.
    state: Mutex<(u64, u32)>,
}

impl Hlc {
    pub fn new(node_id: u16) -> Self {
        Self {
            node_id,
            state: Mutex::new((0, 0)),
        }
    }

    fn wall_now() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_nanos() as u64
    }

    /// Generate a new HLC timestamp for a local or send event.
    pub fn now(&self) -> HlcTimestamp {
        let wall = Self::wall_now();
        // The state is a plain pair that is always valid, so recovering
        // from a poisoned lock is safe.
        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
        let (cur_wall, cur_logical) = *state;
        let next = if wall > cur_wall {
            (wall, 0)
        } else {
            // Wall clock stalled or moved backwards: keep the previous wall
            // value and bump the logical counter so the result still grows.
            (cur_wall, cur_logical + 1)
        };
        *state = next;
        HlcTimestamp { wall_ns: next.0, logical: next.1, node_id: self.node_id }
    }

    /// Update the clock on receiving a remote HLC timestamp.
    pub fn update(&self, remote: HlcTimestamp) -> HlcTimestamp {
        let wall = Self::wall_now();
        let mut state = self.state.lock().unwrap_or_else(|e| e.into_inner());
        let (cur_wall, cur_logical) = *state;
        let pt = wall.max(remote.wall_ns).max(cur_wall);
        let logical = if pt == cur_wall && pt == remote.wall_ns {
            cur_logical.max(remote.logical) + 1
        } else if pt == cur_wall {
            cur_logical + 1
        } else if pt == remote.wall_ns {
            remote.logical + 1
        } else {
            0
        };
        *state = (pt, logical);
        HlcTimestamp { wall_ns: pt, logical, node_id: self.node_id }
    }
}
```
## Acceptance Criteria
- [ ] `HlcTimestamp` ordering is `(wall_ns, logical, node_id)` lexicographic
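- [ ] `Hlc::now()` returns strictly increasing timestamps within a single node (property test: across 10K sequential calls, each timestamp is greater than the previous one)
- [ ] `Hlc::update(remote)` advances the clock to `remote.wall_ns` when that is ahead of both the local wall clock and the last issued timestamp, and always returns a timestamp greater than `remote` and than every timestamp this node issued earlier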
- [ ] `Hlc` is thread-safe (`Send + Sync`); concurrent `now()` calls from 4 threads produce unique timestamps
- [ ] `HlcTimestamp` derives `Serialize`, `Deserialize`, `Copy`, `Clone`, `PartialEq`, `Eq`
- [ ] `cargo clippy -- -D warnings` and `cargo fmt --check` pass
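The two behavioural criteria (sequential strict increase, uniqueness across 4 threads) could be checked with a test along these lines. To stay self-contained, this sketch uses a hypothetical minimal `Clock` (the `now()` rule only, behind a `Mutex`) in place of `Hlc`; in the crate, the test would call `Hlc::now()` and compare `HlcTimestamp` values instead.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical stand-in for `Hlc`: only the `now()` rule.
struct Clock {
    state: Mutex<(u64, u32)>, // (wall_ns, logical)
}

impl Clock {
    fn now(&self) -> (u64, u32) {
        let wall = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_nanos() as u64;
        let mut s = self.state.lock().unwrap();
        *s = if wall > s.0 { (wall, 0) } else { (s.0, s.1 + 1) };
        *s
    }
}

fn main() {
    let clock = Arc::new(Clock { state: Mutex::new((0, 0)) });

    // Criterion: 10K sequential calls are strictly increasing.
    let mut prev = clock.now();
    for _ in 0..10_000 {
        let next = clock.now();
        assert!(next > prev, "{next:?} is not greater than {prev:?}");
        prev = next;
    }

    // Criterion: 4 threads x 10K calls never produce a duplicate.
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let c = Arc::clone(&clock);
            thread::spawn(move || (0..10_000).map(|_| c.now()).collect::<Vec<_>>())
        })
        .collect();
    let mut seen = HashSet::new();
    for h in handles {
        for ts in h.join().unwrap() {
            assert!(seen.insert(ts), "duplicate timestamp {ts:?}");
        }
    }
    assert_eq!(seen.len(), 40_000);
}
```

Collecting each thread's results and inserting them into one `HashSet` catches duplicates regardless of how the threads interleave, which is the failure mode the criterion guards against.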