tidaldb/docs/planning/milestone-8/phase-3/task-05-reconciliation-engine.md
jordan f4cfd6c81f feat: complete M8 replication primitives + forage enhancements + docs
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 13:17:19 -07:00

Task 05: ReconciliationEngine

Delivers

ReconciliationEngine in tidal/src/replication/reconcile.rs. It takes two StateSnapshots (one from each of two shards that diverged during a partition), produces a MergePlan (a list of signal-counter merges plus LWW hard-negative resolutions), and applies that plan idempotently.
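The plan/apply split can be illustrated in miniature. The sketch below is a toy model, not the tidal implementation. The Merge trait, the u64 max-register standing in for a CRDT, and the free functions plan and apply are all hypothetical. It shows the two properties this task relies on: the plan covers the union of keys from both snapshots, so keys seen on only one side survive, and applying by overwrite makes a second application a no-op.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical stand-in for a CRDT: merge must be commutative,
/// associative and idempotent; Default is the "empty" state.
pub trait Merge: Clone + Default {
    fn merge(&mut self, other: &Self);
}

/// Toy CRDT for the example: a max-register.
impl Merge for u64 {
    fn merge(&mut self, other: &Self) {
        *self = (*self).max(*other);
    }
}

/// Toy plan: one (key, merged value) entry per key present in either
/// snapshot, emitted in key order so the output is deterministic.
pub fn plan<K: Ord + Clone, V: Merge>(
    local: &BTreeMap<K, V>,
    remote: &BTreeMap<K, V>,
) -> Vec<(K, V)> {
    let keys: BTreeSet<&K> = local.keys().chain(remote.keys()).collect();
    keys.into_iter()
        .map(|k| {
            // Start from the local value (or empty) and fold in the remote one.
            let mut merged = local.get(k).cloned().unwrap_or_default();
            if let Some(r) = remote.get(k) {
                merged.merge(r);
            }
            (k.clone(), merged)
        })
        .collect()
}

/// Toy apply: overwrite each key with its planned value.
pub fn apply<K: Ord + Clone, V: Clone>(state: &mut BTreeMap<K, V>, plan: &[(K, V)]) {
    for (k, v) in plan {
        state.insert(k.clone(), v.clone());
    }
}
```

Overwriting makes a second apply() a no-op. It is only lossless if the live state has not advanced between plan() and apply(); if it can (this task does not say), merging each planned value into the live value instead of overwriting it would keep apply() idempotent without discarding newer writes.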

Complexity: L

Dependencies

  • Task 04 (CrdtSignalState)
  • Task 03 (LWWRegister for hard negatives)
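For reference, here is a sketch of the kind of per-replica counter that CrdtSignalState (Task 04) is presumably built on. It is a generic PN-counter and is not taken from the Task 03/04 code. The PnCounter name, the u32 replica ids and the method names are hypothetical, and the decay parameter (lambda) that CrdtSignalState carries is not modeled. Merging takes the per-replica maximum of the increment and decrement totals. As a result, events that both replicas saw before the partition are counted once, not twice.

```rust
use std::collections::BTreeMap;

/// Hypothetical PN-counter: per-replica increment (p) and decrement (n) totals.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct PnCounter {
    p: BTreeMap<u32, u64>,
    n: BTreeMap<u32, u64>,
}

impl PnCounter {
    /// Record `by` increments originated by `replica`.
    pub fn incr(&mut self, replica: u32, by: u64) {
        *self.p.entry(replica).or_insert(0) += by;
    }

    /// Record `by` decrements originated by `replica`.
    pub fn decr(&mut self, replica: u32, by: u64) {
        *self.n.entry(replica).or_insert(0) += by;
    }

    /// Current value: all increments minus all decrements.
    pub fn value(&self) -> i64 {
        self.p.values().sum::<u64>() as i64 - self.n.values().sum::<u64>() as i64
    }

    /// Per-replica max of both maps. Commutative, associative and idempotent,
    /// so re-merging or merging in either order yields the same state.
    pub fn merge(&mut self, other: &PnCounter) {
        for (mine, theirs) in [(&mut self.p, &other.p), (&mut self.n, &other.n)] {
            for (&replica, &count) in theirs {
                let slot = mine.entry(replica).or_insert(0);
                *slot = (*slot).max(count);
            }
        }
    }
}
```

Consider a shared count of 5 from replica 1. One side then adds +2, and the other side adds +3 and -1. The merged value is 9, whereas adding the two sides' values naively gives 14.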

Technical Design

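// tidal/src/replication/reconcile.rs

use std::sync::Arc;

// EntityId, SignalTypeId, CrdtSignalState, HardNegAction, StateSnapshot,
// SignalLedger and HardNegIndex are crate-internal types defined in other
// modules; their import paths are omitted here.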

/// A plan for merging diverged state from two shards.
///
/// Produced by `ReconciliationEngine::plan()`, applied by `apply()`.
/// The plan is deterministic and idempotent: applying it twice is safe.
/// It derives serde's Serialize/Deserialize so it can be written to the audit
/// log; this assumes every field type (EntityId, SignalTypeId, CrdtSignalState,
/// HardNegAction) implements the serde traits as well.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MergePlan {
    /// Signal counter merges: (entity_id, signal_type_id) -> merged CrdtSignalState
    pub signal_merges: Vec<SignalMergeOp>,
    /// Hard-negative resolutions: (user_id, item_id) -> winning LWW value
    pub hardneg_resolutions: Vec<HardNegResolutionOp>,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SignalMergeOp {
    pub entity_id: EntityId,
    pub signal_type_id: SignalTypeId,
    pub merged_state: CrdtSignalState,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct HardNegResolutionOp {
    pub user_id: EntityId,
    pub item_id: EntityId,
    /// The winning hard-negative action after LWW resolution.
    /// `None` means "remove the hard negative" (explicit unhide won).
    pub action: Option<HardNegAction>,
}

/// Produces and applies reconciliation plans for partitioned shards.
pub struct ReconciliationEngine {
    signal_ledger: Arc<SignalLedger>,
    hard_neg_index: Arc<HardNegIndex>,
}

impl ReconciliationEngine {
    pub fn new(
        signal_ledger: Arc<SignalLedger>,
        hard_neg_index: Arc<HardNegIndex>,
    ) -> Self {
        Self { signal_ledger, hard_neg_index }
    }

    /// Produce a merge plan from two diverged state snapshots.
    ///
    /// The plan covers every signal key and hard-negative key present in either
    /// snapshot (the union of both), not only keys whose values differ. Keys
    /// present on only one shard are carried over unchanged, so no data is lost.
    /// Keys are visited in sorted order, so the same pair of snapshots always
    /// yields the same plan.
    pub fn plan(
        &self,
        local_snapshot: &StateSnapshot,
        remote_snapshot: &StateSnapshot,
    ) -> MergePlan {
        // BTreeSet rather than HashSet: its iteration order is stable, which keeps
        // the plan deterministic. Assumes the snapshot key types implement Ord.
        use std::collections::BTreeSet;

        let mut signal_merges = Vec::new();
        let mut hardneg_resolutions = Vec::new();

        // Merge signal states: union of both snapshots, CRDT-merged per key.
        let all_keys: BTreeSet<_> = local_snapshot
            .signal_keys()
            .chain(remote_snapshot.signal_keys())
            .collect();

        for key in all_keys {
            let local = local_snapshot.signal_state(key);
            let remote = remote_snapshot.signal_state(key);
            let mut merged = local.cloned().unwrap_or_else(|| CrdtSignalState::new(key.lambda));
            if let Some(r) = remote {
                merged.merge(r);
            }
            signal_merges.push(SignalMergeOp {
                entity_id: key.entity_id,
                signal_type_id: key.signal_type_id,
                merged_state: merged,
            });
        }

        // Resolve hard negatives: LWW by HLC timestamp.
        let all_neg_keys: BTreeSet<_> = local_snapshot
            .hardneg_keys()
            .chain(remote_snapshot.hardneg_keys())
            .collect();

        for key in all_neg_keys {
            let local = local_snapshot.hardneg_register(key);
            let remote = remote_snapshot.hardneg_register(key);
            let mut reg = local.cloned().unwrap_or_default();
            if let Some(r) = remote {
                reg.merge(r);
            }
            hardneg_resolutions.push(HardNegResolutionOp {
                user_id: key.user_id,
                item_id: key.item_id,
                action: reg.get().cloned(),
            });
        }

        MergePlan { signal_merges, hardneg_resolutions }
    }

    /// Apply a merge plan to the local state.
    ///
    /// Idempotent: applying the same plan twice produces identical state.
    pub fn apply(&self, plan: &MergePlan) -> crate::Result<()> {
        for op in &plan.signal_merges {
            self.signal_ledger.apply_crdt_state(
                op.entity_id,
                op.signal_type_id,
                &op.merged_state,
            )?;
        }
        for op in &plan.hardneg_resolutions {
            match &op.action {
                Some(action) => {
                    self.hard_neg_index.apply_action(op.user_id, op.item_id, action.clone())?;
                }
                None => {
                    self.hard_neg_index.remove(op.user_id, op.item_id)?;
                }
            }
        }
        Ok(())
    }
}
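The None case in HardNegResolutionOp relies on an explicit unhide being a timestamped write in its own right, so that it can win or lose against a concurrent hide. A minimal sketch of such a register follows. Hlc, HardNegRegister and the single-variant HardNegAction are hypothetical stand-ins for the Task 03 LWWRegister and the real types. The HLC is compared as (wall_ms, logical, node), so writes from two different nodes never tie and merge order does not matter.

```rust
/// Hypothetical hybrid logical clock stamp; derived Ord compares the fields
/// lexicographically in declaration order: (wall_ms, logical, node).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Hlc {
    pub wall_ms: u64,
    pub logical: u32,
    pub node: u16,
}

/// Hypothetical hard-negative action (the real HardNegAction may carry more data).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HardNegAction {
    Hide,
}

/// LWW register whose value is Option<HardNegAction>. An explicit unhide is
/// stored as a timestamped None (a tombstone), not as a missing entry.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct HardNegRegister {
    latest: Option<(Hlc, Option<HardNegAction>)>,
}

impl HardNegRegister {
    /// Record a write; it replaces the current value only if strictly newer.
    pub fn write(&mut self, at: Hlc, action: Option<HardNegAction>) {
        let newer = match &self.latest {
            Some((current, _)) => at > *current,
            None => true,
        };
        if newer {
            self.latest = Some((at, action));
        }
    }

    /// Merge another replica's register: the write with the higher HLC wins.
    pub fn merge(&mut self, other: &HardNegRegister) {
        if let Some((at, action)) = &other.latest {
            self.write(*at, action.clone());
        }
    }

    /// Resolved action, matching HardNegResolutionOp::action (None = not hidden).
    pub fn get(&self) -> Option<HardNegAction> {
        self.latest.as_ref().and_then(|(_, action)| action.clone())
    }
}
```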

Acceptance Criteria

  • ReconciliationEngine::plan(local, remote) covers every signal key and hard-negative key present in either snapshot
  • Signal merge: no double-counting (property test: the merged value equals the shared pre-partition total plus the events originated on each side during the partition)
  • Hard-negative merge: LWW by HLC timestamp; a hide is never lost during a merge unless a later unhide supersedes it (test: concurrent hide + unhide resolves to hide when the hide has the higher HLC)
  • MergePlan is serializable (for audit logging)
  • apply(plan) is idempotent: applying the same plan twice produces identical state
  • tidalctl reconcile --since <ts> tool uses this engine (wired in Phase 8.6 UAT; stub here)
  • cargo clippy -- -D warnings and cargo fmt --check pass
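The no-double-counting and idempotency criteria lend themselves to a property test. The sketch below avoids external crates by using a small xorshift generator; a real test would more likely use a property-testing crate. The grow-only Counter (a map from replica id to events originated) is a hypothetical stand-in for CrdtSignalState without decay. Each case builds a shared pre-partition history and adds events on each side. It then checks three things: the merged value equals shared + local + remote, merge order does not matter, and re-merging changes nothing.

```rust
use std::collections::BTreeMap;

/// Minimal xorshift64 PRNG so the sketch needs no external crates.
struct XorShift(u64);

impl XorShift {
    fn next_below(&mut self, bound: u64) -> u64 {
        self.0 ^= self.0 << 13;
        self.0 ^= self.0 >> 7;
        self.0 ^= self.0 << 17;
        self.0 % bound
    }
}

/// Stand-in grow-only counter: replica id -> events originated by that replica.
type Counter = BTreeMap<u8, u64>;

/// Per-replica max merge.
fn merge(into: &mut Counter, other: &Counter) {
    for (&replica, &n) in other {
        let slot = into.entry(replica).or_insert(0);
        *slot = (*slot).max(n);
    }
}

fn merged(a: &Counter, b: &Counter) -> Counter {
    let mut out = a.clone();
    merge(&mut out, b);
    out
}

fn value(c: &Counter) -> u64 {
    c.values().sum()
}

/// Runs `cases` random partition scenarios and returns how many violate
/// no-double-counting, commutativity or idempotency.
fn count_violations(seed: u64, cases: u32) -> u32 {
    let mut rng = XorShift(seed.max(1)); // xorshift state must be non-zero
    let mut violations = 0;
    for _ in 0..cases {
        // Shared pre-partition history: both sides start from the same counter.
        let mut shared = Counter::new();
        for replica in 0..3u8 {
            shared.insert(replica, rng.next_below(20));
        }
        let (mut local, mut remote) = (shared.clone(), shared.clone());
        // During the partition, local originates events as replica 0, remote as 1.
        let on_local = rng.next_below(20);
        let on_remote = rng.next_below(20);
        *local.entry(0).or_insert(0) += on_local;
        *remote.entry(1).or_insert(0) += on_remote;

        let m = merged(&local, &remote);
        let expected = value(&shared) + on_local + on_remote;
        let commutes = m == merged(&remote, &local);
        let idempotent = merged(&m, &remote) == m;
        if value(&m) != expected || !commutes || !idempotent {
            violations += 1;
        }
    }
    violations
}
```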