tidaldb/docs/planning/milestone-8/phase-4/task-04-hardneg-monotonicity.md
jordan f4cfd6c81f feat: complete M8 replication primitives + forage enhancements + docs
2026-02-24 13:17:19 -07:00

Task 04: Hard-Negative Monotonicity During Convergence

Delivers

Modified HardNegIndex merge behavior in tidal/src/entities/hard_neg.rs to enforce union semantics during convergence: a hide from any shard always wins while replication is converging, even if another shard holds (or later delivers) an Unhide whose HLC timestamp does not exceed the hide's. An explicit unhide is honored only when it carries an HLC timestamp strictly higher than the hide timestamp, as resolved by the existing LWWRegister<HardNegAction>.

Complexity: M

Dependencies

  • Task 03 (SessionReplicationBridge -- brings hard negatives into replication flow)
  • Phase 8.3, Task 03 (LWWRegister)

Technical Design

The Problem

Consider two shards whose states diverge across a network partition:

  • Shard A: user hides item X at HLC(t=100)
  • Shard B: user un-hides item X at HLC(t=50) (old operation, pre-partition)

When the partition heals, shard B's state has Unhide(t=50) and shard A's state has Hide(t=100). The LWW register resolves this correctly: t=100 > t=50, so Hide wins.
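The resolution described above can be sketched with a minimal last-writer-wins register. This is an illustration only: `Lww`, `Action`, and `resolve_partition_example` are hypothetical stand-ins, HLC timestamps are reduced to a plain `u64`, and the real LWWRegister from Phase 8.3 may differ in API and tie-breaking.

```rust
// Hypothetical stand-in for the LWWRegister from Phase 8.3, Task 03.
// HLC timestamps are reduced to a plain u64 for illustration.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Action {
    Hide,
    Unhide,
}

#[derive(Debug, Clone, Default)]
pub struct Lww {
    slot: Option<(u64, Action)>,
}

impl Lww {
    /// Accept the write only if its timestamp is strictly newer.
    pub fn write(&mut self, action: Action, ts: u64) {
        let newer = match self.slot {
            Some((current, _)) => ts > current,
            None => true,
        };
        if newer {
            self.slot = Some((ts, action));
        }
    }

    /// Merge another register by replaying its winning write.
    pub fn merge(&mut self, other: &Lww) {
        if let Some((ts, action)) = other.slot {
            self.write(action, ts);
        }
    }

    /// Current winning action, if any write has been accepted.
    pub fn get(&self) -> Option<Action> {
        self.slot.map(|(_, action)| action)
    }
}

/// Builds shard A (Hide at t=100) and shard B (Unhide at t=50),
/// merges in both directions, and returns the two resolved values.
pub fn resolve_partition_example() -> (Option<Action>, Option<Action>) {
    let mut shard_a = Lww::default();
    shard_a.write(Action::Hide, 100);
    let mut shard_b = Lww::default();
    shard_b.write(Action::Unhide, 50);

    let mut a_then_b = shard_a.clone();
    a_then_b.merge(&shard_b);
    let mut b_then_a = shard_b.clone();
    b_then_a.merge(&shard_a);
    (a_then_b.get(), b_then_a.get())
}
```

In this sketch both merge orders resolve to Hide, which is the order-independence the LWW register is relied on for once both shards have exchanged state.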

But during the convergence window (before B has received A's segment), shard B might serve the un-hidden item X to the user. This is the safety violation we must prevent.

The Solution

Union semantics during convergence: the HardNegIndex accumulates all hide operations from all replicating shards immediately (before full reconciliation). A Remove (explicit unhide) takes effect only once the LWW register resolves to Unhide, that is, once the unhide's HLC timestamp is strictly higher than the hide's.

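// tidal/src/entities/hard_neg.rs

use dashmap::DashMap;
// LWWRegister, HlcTimestamp, EntityId, and Result are crate-internal types;
// their import paths are not shown here.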

/// Hard negative action stored per (user_id, item_id) pair.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum HardNegAction {
    Hide,
    Mute,
    Block,
    Unhide, // explicit removal; its HLC timestamp is carried by the LWW register
}

/// Hard negative entry with LWW register for convergence.
#[derive(Debug, Clone)]
pub struct HardNegEntry {
    /// LWW register: tracks the most recent action (by HLC).
    pub register: LWWRegister<HardNegAction>,
    /// Union flag: set to `true` when any shard has contributed a hide/mute/block.
    /// Reset to `false` only when the LWW register definitively resolves to `Unhide`.
    pub union_active: bool,
}

impl HardNegEntry {
    /// Returns `true` if this entry should suppress the item from appearing
    /// in query results.
    ///
    /// During convergence: `union_active` is set; item is suppressed.
    /// After convergence: `union_active` reflects LWW resolution.
    pub fn is_active(&self) -> bool {
        if self.union_active {
            return true;
        }
        // LWW resolution only.
        matches!(
            self.register.get(),
            Some(HardNegAction::Hide | HardNegAction::Mute | HardNegAction::Block)
        )
    }

    /// Apply a remote hard-negative action from replication.
    ///
    /// Union rule: any positive hard-negative action (hide/mute/block) sets
    /// `union_active = true`. Only a fully-resolved LWW Unhide clears it.
    pub fn apply_remote(&mut self, action: HardNegAction, ts: HlcTimestamp) {
        match &action {
            HardNegAction::Unhide => {
                // LWW only: if this Unhide beats the current register, try to clear.
                self.register.write(action, ts);
                // Clear union_active only if the register definitively has Unhide.
                if matches!(self.register.get(), Some(HardNegAction::Unhide)) {
                    self.union_active = false;
                }
            }
            _ => {
                // Hide/Mute/Block: set union_active unconditionally.
                self.register.write(action, ts);
                self.union_active = true;
            }
        }
    }
}

/// Index of hard negatives for a shard.
pub struct HardNegIndex {
    /// (user_id, item_id) -> HardNegEntry
    entries: DashMap<(EntityId, EntityId), HardNegEntry>,
}

impl HardNegIndex {
    /// Apply a local hard-negative action (user-initiated, not from replication).
    pub fn apply_action(
        &self,
        user_id: EntityId,
        item_id: EntityId,
        action: HardNegAction,
        ts: HlcTimestamp,
    ) -> Result<()> {
        let mut entry = self.entries
            .entry((user_id, item_id))
            .or_insert_with(|| HardNegEntry {
                register: LWWRegister::empty(),
                union_active: false,
            });
        // Local and replicated actions go through the same union/LWW rule.
        entry.apply_remote(action, ts);
        Ok(())
    }

    /// Merge a remote HardNegEntry from replication.
    ///
    /// Union semantics: if the remote entry has `union_active` set, set it locally too.
    pub fn merge_remote(
        &self,
        user_id: EntityId,
        item_id: EntityId,
        remote: &HardNegEntry,
    ) {
        let mut local = self.entries
            .entry((user_id, item_id))
            .or_insert_with(|| HardNegEntry {
                register: LWWRegister::empty(),
                union_active: false,
            });
        local.register.merge(&remote.register);
        // Union rule: if remote had an active negative, propagate.
        if remote.union_active {
            local.union_active = true;
        }
        // Re-evaluate after merge: if the register definitively says Unhide, clear.
        if matches!(local.register.get(), Some(HardNegAction::Unhide)) && !remote.union_active {
            local.union_active = false;
        }
    }

    /// Check if a (user_id, item_id) pair is hard-negated (should be filtered).
    pub fn is_negated(&self, user_id: EntityId, item_id: EntityId) -> bool {
        self.entries
            .get(&(user_id, item_id))
            .map(|e| e.is_active())
            .unwrap_or(false)
    }

    /// Remove a hard negative (explicit unhide with the given HLC timestamp).
    ///
    /// The hard negative is cleared only if `ts` beats the current register.
    /// If no entry exists for the pair, this is a no-op.
    pub fn remove(&self, user_id: EntityId, item_id: EntityId, ts: HlcTimestamp) -> Result<()> {
        if let Some(mut entry) = self.entries.get_mut(&(user_id, item_id)) {
            entry.apply_remote(HardNegAction::Unhide, ts);
        }
        Ok(())
    }
}
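To make the ordering behavior of apply_remote easier to follow, here is a simplified, self-contained model of the entry. It is a sketch under assumptions: `Lww`, `Entry`, `Action`, and `replay` are hypothetical names, timestamps are plain `u64` values rather than HLC timestamps, and Mute/Block are omitted because they follow the same path as Hide.

```rust
// Simplified model of HardNegEntry. Lww is a hypothetical stand-in for
// LWWRegister; timestamps are plain u64 values instead of HLC timestamps.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Action {
    Hide,
    Unhide,
}

#[derive(Debug, Clone, Default)]
pub struct Lww {
    slot: Option<(u64, Action)>,
}

impl Lww {
    /// Strictly newer timestamps win; ties keep the existing value.
    pub fn write(&mut self, action: Action, ts: u64) {
        if self.slot.map_or(true, |(current, _)| ts > current) {
            self.slot = Some((ts, action));
        }
    }

    pub fn get(&self) -> Option<Action> {
        self.slot.map(|(_, action)| action)
    }
}

#[derive(Debug, Clone, Default)]
pub struct Entry {
    pub register: Lww,
    pub union_active: bool,
}

impl Entry {
    /// Suppressed if the union flag is set or the register resolves to Hide.
    pub fn is_active(&self) -> bool {
        self.union_active || self.register.get() == Some(Action::Hide)
    }

    /// Same rule as the design: Hide sets the flag unconditionally,
    /// Unhide clears it only when it wins the register.
    pub fn apply_remote(&mut self, action: Action, ts: u64) {
        self.register.write(action, ts);
        match action {
            Action::Unhide => {
                if self.register.get() == Some(Action::Unhide) {
                    self.union_active = false;
                }
            }
            Action::Hide => self.union_active = true,
        }
    }
}

/// Replays (action, ts) pairs in order and reports whether the item
/// ends up suppressed.
pub fn replay(ops: &[(Action, u64)]) -> bool {
    let mut entry = Entry::default();
    for &(action, ts) in ops {
        entry.apply_remote(action, ts);
    }
    entry.is_active()
}
```

In this model, `replay(&[(Action::Hide, 100), (Action::Unhide, 50)])` stays suppressed and `replay(&[(Action::Hide, 50), (Action::Unhide, 100)])` does not. Following the rule as written, a stale Hide that arrives after a newer Unhide, as in `replay(&[(Action::Unhide, 100), (Action::Hide, 50)])`, also leaves the item suppressed, because the flag is set even though the register still holds Unhide.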

Acceptance Criteria

  • HardNegEntry::is_active() returns true when union_active = true, regardless of the LWW register state
  • apply_remote(Hide, t=100) followed by apply_remote(Unhide, t=50) leaves union_active = true (hide wins, Unhide loses LWW)
  • apply_remote(Hide, t=50) followed by apply_remote(Unhide, t=100) sets union_active = false (Unhide wins LWW)
  • merge_remote with a remote entry whose union_active is true always sets local union_active = true
  • Property test: concurrent hide on shard A + unhide on shard B with lower HLC → after merge, item is negated on both shards
  • is_negated() is called during RETRIEVE/SEARCH result post-filtering (verified by existing HardNeg integration test with updated merge logic)
  • cargo clippy -- -D warnings and cargo fmt pass
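The property-test criterion above could be approximated, before wiring in a property-testing crate, by an exhaustive check over a small timestamp range. The sketch below is an assumption-laden model rather than the real test: `Lww`, `Entry`, `Action`, and `hide_wins_for_all_pairs` are hypothetical names, timestamps are plain `u64` values, and `merge_remote` mirrors the design's merge rule on the simplified types.

```rust
// Hypothetical simplified types; the real code uses LWWRegister and HLC timestamps.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Action {
    Hide,
    Unhide,
}

#[derive(Debug, Clone, Default)]
pub struct Lww {
    slot: Option<(u64, Action)>,
}

impl Lww {
    /// Strictly newer timestamps win; ties keep the existing value.
    pub fn write(&mut self, action: Action, ts: u64) {
        if self.slot.map_or(true, |(current, _)| ts > current) {
            self.slot = Some((ts, action));
        }
    }

    /// Merge by replaying the other register's winning write.
    pub fn merge(&mut self, other: &Lww) {
        if let Some((ts, action)) = other.slot {
            self.write(action, ts);
        }
    }

    pub fn get(&self) -> Option<Action> {
        self.slot.map(|(_, action)| action)
    }
}

#[derive(Debug, Clone, Default)]
pub struct Entry {
    pub register: Lww,
    pub union_active: bool,
}

impl Entry {
    pub fn is_active(&self) -> bool {
        self.union_active || self.register.get() == Some(Action::Hide)
    }

    /// Hide sets the flag; Unhide clears it only if it wins the register.
    pub fn apply(&mut self, action: Action, ts: u64) {
        self.register.write(action, ts);
        match action {
            Action::Hide => self.union_active = true,
            Action::Unhide => {
                if self.register.get() == Some(Action::Unhide) {
                    self.union_active = false;
                }
            }
        }
    }

    /// Mirrors the merge rule from the design on the simplified types.
    pub fn merge_remote(&mut self, remote: &Entry) {
        self.register.merge(&remote.register);
        if remote.union_active {
            self.union_active = true;
        }
        if self.register.get() == Some(Action::Unhide) && !remote.union_active {
            self.union_active = false;
        }
    }
}

/// Checks every pair with unhide_ts < hide_ts <= max_ts: shard A hides,
/// shard B unhides, the shards exchange snapshots, and both must end negated.
pub fn hide_wins_for_all_pairs(max_ts: u64) -> bool {
    for hide_ts in 1..=max_ts {
        for unhide_ts in 1..hide_ts {
            let mut a = Entry::default();
            a.apply(Action::Hide, hide_ts);
            let mut b = Entry::default();
            b.apply(Action::Unhide, unhide_ts);
            let (snap_a, snap_b) = (a.clone(), b.clone());
            a.merge_remote(&snap_b);
            b.merge_remote(&snap_a);
            if !(a.is_active() && b.is_active()) {
                return false;
            }
        }
    }
    true
}
```

A randomized property test would cover larger timestamp ranges and interleavings; the exhaustive loop is only meant to pin down the invariant being asserted.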