tidaldb/docs/planning/milestone-8/phase-6/task-02-fault-injection.md
jordan f4cfd6c81f feat: complete M8 replication primitives + forage enhancements + docs
2026-02-24 13:17:19 -07:00


Task 02: Fault Injection

Delivers

NetworkPartition and ShardCrash live in tidal/src/testing/faults.rs. NetworkPartition intercepts Transport::send_segment and Transport::send_session_batch calls and drops them for the specified region pairs. ShardCrash takes a shard's primary offline and triggers follower promotion. Both are RAII handles: a fault is active while its handle is alive. Dropping a NetworkPartition heals the route; dropping a ShardCrash rejoins the shard only if auto_rejoin was set.
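The RAII behaviour described above can be illustrated with a minimal, std-only sketch. `Routes` and `PartitionGuard` are hypothetical stand-ins invented for this example (they are not the real `InProcessTransportFactory` or `NetworkPartition`), and regions are plain `u32` values by assumption.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the transport factory: a shared set of
/// blocked (from, to) routes and nothing else.
#[derive(Default)]
struct Routes {
    blocked: Mutex<HashSet<(u32, u32)>>,
}

impl Routes {
    fn is_blocked(&self, from: u32, to: u32) -> bool {
        self.blocked.lock().unwrap().contains(&(from, to))
    }
}

/// RAII fault handle: the route stays blocked for as long as this value lives.
struct PartitionGuard {
    route: (u32, u32),
    routes: Arc<Routes>,
}

impl PartitionGuard {
    fn inject(from: u32, to: u32, routes: Arc<Routes>) -> Self {
        routes.blocked.lock().unwrap().insert((from, to));
        Self { route: (from, to), routes }
    }
}

impl Drop for PartitionGuard {
    fn drop(&mut self) {
        // Healing happens here, so an early return from the test body
        // still removes the fault.
        self.routes.blocked.lock().unwrap().remove(&self.route);
    }
}

/// Returns (blocked while the guard is alive, blocked after it is dropped).
fn guard_lifecycle() -> (bool, bool) {
    let routes = Arc::new(Routes::default());
    let during = {
        let _partition = PartitionGuard::inject(1, 2, Arc::clone(&routes));
        routes.is_blocked(1, 2)
    }; // `_partition` goes out of scope here and heals the route
    (during, routes.is_blocked(1, 2))
}
```

One practical consequence of this design: the handle has to be bound to a named variable such as `_partition`. Binding it to the bare pattern `_` drops the value immediately, so the fault would be healed before the test body runs.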

Complexity: M

Dependencies

  • Task 01 (SimulatedCluster)
  • Phase 8.2, Task 01 (Transport trait)
  • Phase 8.2, Task 05 (FollowerDb, NodeRole)

Technical Design

// tidal/src/testing/faults.rs
// Compiled only under #[cfg(any(test, feature = "test-utils"))].

/// RAII handle for a network partition between two regions.
///
/// While this handle is alive, all `Transport::send_segment` and
/// `Transport::send_session_batch` calls from `from` to `to` (and
/// optionally `to` to `from` for symmetric partitions) are dropped
/// without delivery.
///
/// When the handle is dropped, the partition is automatically healed.
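pub struct NetworkPartition {
    from: RegionId,
    to: RegionId,
    symmetric: bool,
    transport_factory: Arc<InProcessTransportFactory>,
    /// Factory-wide drop counts for (`from` -> `to`, `to` -> `from`) at injection time.
    baseline: (u64, u64),
}

impl NetworkPartition {
    /// Create a one-way partition: `from` cannot reach `to`.
    pub fn one_way(
        from: RegionId,
        to: RegionId,
        factory: Arc<InProcessTransportFactory>,
    ) -> Self {
        Self::inject(from, to, false, factory)
    }

    /// Create a symmetric partition: neither side can reach the other.
    pub fn symmetric(
        region_a: RegionId,
        region_b: RegionId,
        factory: Arc<InProcessTransportFactory>,
    ) -> Self {
        Self::inject(region_a, region_b, true, factory)
    }

    fn inject(
        from: RegionId,
        to: RegionId,
        symmetric: bool,
        factory: Arc<InProcessTransportFactory>,
    ) -> Self {
        // Snapshot the counters before blocking: `dropped_count` is cumulative
        // since the factory was created, and an earlier partition may already
        // have dropped segments on this route.
        let baseline = (factory.dropped_count(from, to), factory.dropped_count(to, from));
        factory.block_route(from, to);
        if symmetric {
            factory.block_route(to, from);
        }
        Self { from, to, symmetric, transport_factory: factory, baseline }
    }

    /// Number of segments dropped since this partition was injected.
    /// A symmetric partition counts drops in both directions.
    pub fn dropped_segments(&self) -> u64 {
        let factory = &self.transport_factory;
        let forward = factory.dropped_count(self.from, self.to).saturating_sub(self.baseline.0);
        if !self.symmetric {
            return forward;
        }
        forward + factory.dropped_count(self.to, self.from).saturating_sub(self.baseline.1)
    }
}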

impl Drop for NetworkPartition {
    fn drop(&mut self) {
        self.transport_factory.unblock_route(self.from, self.to);
        if self.symmetric {
            self.transport_factory.unblock_route(self.to, self.from);
        }
    }
}

/// RAII handle for a simulated shard crash.
///
/// Crashes the primary of the given shard. The primary is taken offline
/// (stops processing WAL writes, stops shipping to followers). The most
/// advanced follower is promoted to leader automatically.
///
/// When the handle is dropped with `auto_rejoin` set, a rejoin is spawned
/// (simulating a node restart); otherwise the shard stays offline until
/// `rejoin()` is called.
pub struct ShardCrash {
    crashed_shard: ShardId,
    original_leader_seqno: u64,
    cluster: Arc<SimulatedCluster>,
    auto_rejoin: bool,
}

impl ShardCrash {
    /// Crash the primary of `shard_id`.
    ///
    /// `auto_rejoin`: if true, the shard restarts and rejoins on drop.
    pub async fn crash(
        shard_id: ShardId,
        cluster: Arc<SimulatedCluster>,
        auto_rejoin: bool,
    ) -> Self {
        // Record the shard's current seqno before crash.
        let original_seqno = cluster.applied_seqno_for(shard_id);

        // Take the shard offline: stop WAL shipping, stop write processing.
        cluster.take_shard_offline(shard_id).await;

        // Promote the most advanced follower (if any).
        cluster.promote_best_follower(shard_id).await;

        Self {
            crashed_shard: shard_id,
            original_leader_seqno: original_seqno,
            cluster,
            auto_rejoin,
        }
    }

    /// Applied seqno of the crashed shard's primary, recorded just before the crash.
    pub fn pre_crash_seqno(&self) -> u64 {
        self.original_leader_seqno
    }

    /// Manually rejoin the crashed shard (ship missed WAL, re-enable as follower).
    pub async fn rejoin(&self) {
        self.cluster.rejoin_shard(self.crashed_shard).await;
    }
}

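impl Drop for ShardCrash {
    fn drop(&mut self) {
        if !self.auto_rejoin {
            return;
        }
        // Best-effort async rejoin on drop (may race with test teardown).
        // `tokio::spawn` panics when called outside a runtime, so look up the
        // current handle and skip the rejoin if no runtime is available.
        if let Ok(handle) = tokio::runtime::Handle::try_current() {
            let cluster = self.cluster.clone();
            let shard = self.crashed_shard;
            handle.spawn(async move {
                cluster.rejoin_shard(shard).await;
            });
        }
    }
}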

/// Extension to InProcessTransportFactory for fault injection.
impl InProcessTransportFactory {
    /// Block all traffic from `from` to `to`.
    pub fn block_route(&self, from: RegionId, to: RegionId) {
        self.blocked_routes.write().unwrap().insert((from, to));
    }

    /// Unblock traffic from `from` to `to`.
    pub fn unblock_route(&self, from: RegionId, to: RegionId) {
        self.blocked_routes.write().unwrap().remove(&(from, to));
    }

    /// Heal all partitions.
    pub fn heal_all(&self) {
        self.blocked_routes.write().unwrap().clear();
    }

    /// Count of segments dropped on a specific route since the factory was created.
    pub fn dropped_count(&self, from: RegionId, to: RegionId) -> u64 {
        self.drop_counters
            .get(&(from, to))
            .map(|c| c.load(Ordering::Relaxed))
            .unwrap_or(0)
    }

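    /// Replay the last session batch that was dropped on its way to region `to`.
    /// Used by idempotency tests to simulate duplicate delivery.
    pub async fn replay_last_session_batch(&self, to: RegionId) {
        // Clone the batch in its own statement so the mutex guard is released
        // before the `.await`. A guard created in the `if let` scrutinee would
        // stay alive for the whole block and be held across the await point.
        let batch = self.last_session_batch.lock().unwrap().get(&to).cloned();
        if let Some(batch) = batch {
            self.deliver_session_batch(to, batch).await;
        }
    }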
}
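The design above does not show where the transport consults `blocked_routes` or increments `drop_counters`. The following std-only sketch shows one way the send path might do it. `ToyFactory`, its `delivered` log, the boolean return value of `send_segment`, and the `u32` region type are all assumptions made for illustration; the real `Transport::send_segment` signature is not given in this document.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::RwLock;

type RegionId = u32; // assumption: the real RegionId is probably a newtype
type Route = (RegionId, RegionId);

/// Toy factory holding only the fault-injection state.
struct ToyFactory {
    blocked_routes: RwLock<HashSet<Route>>,
    drop_counters: HashMap<Route, AtomicU64>,
    delivered: RwLock<Vec<(Route, u64)>>,
}

impl ToyFactory {
    /// Counters are registered per route up front, so they can be read and
    /// bumped through `&self` without taking a lock.
    fn new(routes: &[Route]) -> Self {
        Self {
            blocked_routes: RwLock::new(HashSet::new()),
            drop_counters: routes.iter().map(|r| (*r, AtomicU64::new(0))).collect(),
            delivered: RwLock::new(Vec::new()),
        }
    }

    fn block_route(&self, from: RegionId, to: RegionId) {
        self.blocked_routes.write().unwrap().insert((from, to));
    }

    fn unblock_route(&self, from: RegionId, to: RegionId) {
        self.blocked_routes.write().unwrap().remove(&(from, to));
    }

    /// Hypothetical send hook: returns true if delivered, false if dropped.
    fn send_segment(&self, from: RegionId, to: RegionId, seqno: u64) -> bool {
        let blocked = self.blocked_routes.read().unwrap().contains(&(from, to));
        if blocked {
            if let Some(counter) = self.drop_counters.get(&(from, to)) {
                counter.fetch_add(1, Ordering::Relaxed);
            }
            return false;
        }
        self.delivered.write().unwrap().push(((from, to), seqno));
        true
    }

    fn dropped_count(&self, from: RegionId, to: RegionId) -> u64 {
        self.drop_counters
            .get(&(from, to))
            .map(|c| c.load(Ordering::Relaxed))
            .unwrap_or(0)
    }
}

/// One-way partition 1 -> 2: returns (drops 1->2, drops 2->1, delivered count).
fn one_way_scenario() -> (u64, u64, usize) {
    let factory = ToyFactory::new(&[(1, 2), (2, 1)]);
    factory.block_route(1, 2);
    for seqno in 1..=3 {
        factory.send_segment(1, 2, seqno); // dropped
    }
    factory.send_segment(2, 1, 1); // reverse direction still delivers
    factory.unblock_route(1, 2);
    factory.send_segment(1, 2, 4); // delivers after healing
    let delivered = factory.delivered.read().unwrap().len();
    (factory.dropped_count(1, 2), factory.dropped_count(2, 1), delivered)
}
```

Because the counters in this sketch are cumulative for the lifetime of the factory, a per-partition count has to be derived by subtracting the value observed when the partition was injected.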

Acceptance Criteria

  • NetworkPartition::one_way(a, b) drops all segments sent from a to b; segments sent from b to a are still delivered
  • NetworkPartition::symmetric(a, b) drops segments in both directions
  • Dropping NetworkPartition heals the route; subsequent segments deliver normally
  • dropped_segments() accurately counts segments dropped since partition injection
  • ShardCrash::crash(shard, cluster, false) takes the shard offline; a follower is promoted
  • After ShardCrash::rejoin(), the previously crashed shard catches up from WAL segments and its applied seqno matches the current leader's
  • heal_all() restores all blocked routes in one call
  • Partition test: inject a partition, write 50 segments, verify they are not applied on the isolated follower; heal, verify they are applied
  • cargo clippy -- -D warnings and cargo fmt --check pass
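The partition test in the criteria above relies on the follower catching up once the route heals, even though the blocked transport discards segments rather than buffering them. The toy model below sketches that flow under a stated assumption: each shipping attempt sends everything the follower is still missing, so a single attempt after healing closes the gap. `ToyPair` and its fields are hypothetical and do not correspond to `SimulatedCluster` or `WalShipper` APIs.

```rust
/// Toy leader/follower pair. `blocked` stands in for a live NetworkPartition.
struct ToyPair {
    leader_seqno: u64,     // highest segment written on the leader
    follower_applied: u64, // highest segment applied on the follower
    blocked: bool,
}

impl ToyPair {
    /// Write `n` segments on the leader, attempting to ship after each one.
    fn write_segments(&mut self, n: u64) {
        for _ in 0..n {
            self.leader_seqno += 1;
            self.ship();
        }
    }

    /// Assumed shipper behaviour: every attempt sends all segments the
    /// follower is missing; a blocked route drops the whole attempt.
    fn ship(&mut self) {
        if !self.blocked {
            self.follower_applied = self.leader_seqno;
        }
    }
}

/// Returns (follower seqno while partitioned, follower seqno after healing).
fn partition_scenario() -> (u64, u64) {
    let mut pair = ToyPair { leader_seqno: 0, follower_applied: 0, blocked: false };

    pair.blocked = true; // inject the partition
    pair.write_segments(50);
    let during = pair.follower_applied;

    pair.blocked = false; // heal (the real handle does this on drop)
    pair.ship(); // next shipping attempt catches the follower up
    (during, pair.follower_applied)
}
```

If the real shipper does not retry segments that were dropped, the second half of the test would need an explicit catch-up step after healing; the sketch only shows the shape of the assertion, not the mechanism.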