tidaldb/docs/planning/milestone-8/phase-3/task-02-pn-counter.md
jordan f4cfd6c81f feat: complete M8 replication primitives + forage enhancements + docs
Milestone 8 (phases 1-4):
- Shard-aware WAL segment naming, BatchHeader v2, ShardRouter
- Transport trait, InProcessTransport, WalShipper, FollowerDb
- HLC, PNCounter, LWWRegister, CrdtSignalState, ReconciliationEngine
- Session replication bridge with SeqNo/HWM, idempotency store

Forage application:
- Multi-source discovery engine with MAB exploration
- Embedding-based label system, server handlers, UI refresh

Other:
- QUICKSTART.md, README.md, milestone-8 planning docs
- Hard negative union semantics, RLHF export enhancements
- Recovery benchmark and visibility test expansions
- Split 8 oversized source files per CODING_GUIDELINES §9

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 13:17:19 -07:00

91 lines
3.2 KiB
Markdown

# Task 02: PNCounter
## Delivers
`PNCounter` in `tidal/src/replication/crdt/pn_counter.rs`. Per-node P and N vectors (backed by `HashMap<ShardId, u64>`). Supports `increment`, `decrement`, `merge`, `value`. Property tests verify commutativity, monotonicity, and associativity (CMA) across 100K random operations over 5 nodes.
## Complexity: M
## Dependencies
- Phase 8.1 (ShardId)
## Technical Design
```rust
// tidal/src/replication/crdt/pn_counter.rs
/// Positive-Negative Counter CRDT.
///
/// Each node (ShardId) maintains its own P (increment) and N (decrement)
/// totals. The global value = sum(P) - sum(N). Merge takes the per-node
/// max of each component -- safe because values only ever increase within
/// a node.
///
/// Properties:
/// - Commutative: merge(A, B) == merge(B, A)
/// - Associative: merge(A, merge(B, C)) == merge(merge(A, B), C)
/// - Idempotent: merge(A, A) == A
#[derive(Debug, Clone, Default, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct PNCounter {
positive: HashMap<ShardId, u64>,
negative: HashMap<ShardId, u64>,
}
impl PNCounter {
pub fn new() -> Self {
Self::default()
}
/// Increment by `amount` for this node.
pub fn increment(&mut self, node: ShardId, amount: u64) {
*self.positive.entry(node).or_default() += amount;
}
/// Decrement by `amount` for this node.
pub fn decrement(&mut self, node: ShardId, amount: u64) {
*self.negative.entry(node).or_default() += amount;
}
/// Merge another counter into this one.
///
/// Takes the per-node maximum of both P and N components.
/// Safe because each node's contribution only grows.
pub fn merge(&mut self, other: &PNCounter) {
for (&node, &val) in &other.positive {
let entry = self.positive.entry(node).or_default();
*entry = (*entry).max(val);
}
for (&node, &val) in &other.negative {
let entry = self.negative.entry(node).or_default();
*entry = (*entry).max(val);
}
}
/// Returns the current value: sum(P) - sum(N).
///
/// Saturates at 0 (never negative).
pub fn value(&self) -> u64 {
let p: u64 = self.positive.values().sum();
let n: u64 = self.negative.values().sum();
p.saturating_sub(n)
}
/// Total positive contributions across all nodes.
pub fn total_positive(&self) -> u64 {
self.positive.values().sum()
}
}
```
## Acceptance Criteria
- [ ] `PNCounter::increment(node, amount)` increases the P component for `node`
- [ ] `PNCounter::decrement(node, amount)` increases the N component for `node`
- [ ] `PNCounter::value()` returns `sum(P) - sum(N)`, saturating at 0
- [ ] `PNCounter::merge` is commutative: `merge(A, B) == merge(B, A)` (property test: 100K random sequences, 5 nodes)
- [ ] `PNCounter::merge` is associative: `merge(A, merge(B, C)) == merge(merge(A, B), C)` (property test)
- [ ] `PNCounter::merge` is idempotent: `merge(A, A) == A` (property test)
- [ ] No double-counting: after merging two counters that each received N independent increments (no overlap), `value() == N * 2` (property test)
- [ ] `cargo clippy -D warnings` and `cargo fmt` pass