tidaldb/docs/planning/milestone-8/phase-6/task-01-simulated-cluster.md
jordan f4cfd6c81f feat: complete M8 replication primitives + forage enhancements + docs
2026-02-24 13:17:19 -07:00


Task 01: SimulatedCluster Harness

Delivers

SimulatedCluster in tidal/src/testing/cluster.rs: a test harness that creates a multi-region tidalDB cluster using InProcessTransport. It exposes a small API for spinning up N regions × M shards, writing signals, and asserting cross-region replication state. All Phase 8.6 UAT tests use it.
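
The N regions × M shards layout with a single leader can be sketched independently of tidalDB. The block below is a minimal illustration, not the harness itself: `layout` is a hypothetical helper, and `RegionId`, `ShardId`, and `NodeRole` are stand-in definitions whose real shapes may differ. It uses a `BTreeMap` so that iteration order is stable across runs, and takes the shard count as `u16` so no narrowing cast is needed.

```rust
use std::collections::BTreeMap;

// Stand-in types for illustration; the real tidalDB definitions may differ.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct RegionId(pub u16);

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct ShardId(pub u16);

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeRole {
    Leader,
    Follower,
}

/// Hypothetical helper: enumerate every (region, shard) slot and mark exactly
/// one of them as leader. Returns `None` if `leader` is not part of the grid.
pub fn layout(
    regions: &[RegionId],
    shards_per_region: u16,
    leader: (RegionId, ShardId),
) -> Option<BTreeMap<(RegionId, ShardId), NodeRole>> {
    let mut nodes = BTreeMap::new();
    for &region in regions {
        for shard in 0..shards_per_region {
            let key = (region, ShardId(shard));
            let role = if key == leader {
                NodeRole::Leader
            } else {
                NodeRole::Follower
            };
            nodes.insert(key, role);
        }
    }
    // Reject a configuration whose leader slot was never created.
    nodes.contains_key(&leader).then_some(nodes)
}
```

With two regions and three shards per region this yields six slots, one of which is the leader. A leader that falls outside the grid is rejected up front rather than producing a cluster with no leader.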

Complexity: M

Dependencies

  • All phases 8.1–8.5 complete

Technical Design

// tidal/src/testing/cluster.rs
// Only compiled with #[cfg(test)] or --features test-utils

/// A fully simulated multi-region tidalDB cluster.
///
/// All network communication happens via `InProcessTransport` (in-memory
/// channels). No actual network, no actual disk I/O (ephemeral mode).
/// Designed for deterministic, repeatable integration tests.
pub struct SimulatedCluster {
    /// All nodes in the cluster, indexed by (region, shard).
    nodes: HashMap<(RegionId, ShardId), SimulatedNode>,
    /// Shared transport factory for the entire cluster.
    transport_factory: Arc<InProcessTransportFactory>,
    /// Shared control plane (single per cluster).
    control_plane: Arc<ControlPlane>,
    /// Schema used by all nodes.
    schema: Arc<Schema>,
}

pub struct SimulatedNode {
    pub region_id: RegionId,
    pub shard_id: ShardId,
    pub role: NodeRole,
    pub db: TidalDb,
}

pub struct ClusterConfig {
    pub regions: Vec<RegionId>,
    pub shards_per_region: usize,
    /// Which (region, shard) is the primary leader (shard 0 in region 0 by default).
    pub leader: (RegionId, ShardId),
    pub schema: Schema,
}

impl SimulatedCluster {
    /// Build a cluster from the given configuration.
    ///
    /// All nodes start immediately; WAL shipping begins automatically.
    pub async fn build(config: ClusterConfig) -> Self {
        let factory = Arc::new(InProcessTransportFactory::new());
        let topology = ClusterTopology {
            shards: config.regions.iter().flat_map(|&region| {
                (0..config.shards_per_region).map(move |s| ShardAssignment {
                    shard_id: ShardId(s as u16),
                    region_id: region,
                })
            }).collect(),
        };
        let topology = Arc::new(topology);
        let control_plane = Arc::new(ControlPlane::new(
            Arc::new(RwLock::new((*topology).clone())),
            Arc::new(TenantRouter::new(
                Arc::new(ShardRouter::new(topology.clone())),
                topology.clone(),
            )),
            Arc::new(ReplicationLagGauge::new()),
        ));

        let mut nodes = HashMap::new();
        for &region in &config.regions {
            for shard in 0..config.shards_per_region {
                let shard_id = ShardId(shard as u16);
                // Exactly one (region, shard) pair is the leader; every other node follows.
                // Computed once and reused below (assumes `NodeRole: Copy`).
                let role = if (region, shard_id) == config.leader {
                    NodeRole::Leader
                } else {
                    NodeRole::Follower
                };
                let transport = factory.connect(region);

                let db = TidalDb::builder()
                    .ephemeral()
                    .with_schema(config.schema.clone())
                    .with_cluster(NodeConfig { region_id: region, shard_id, role })
                    .with_transport(Arc::new(transport))
                    .with_control_plane(control_plane.clone())
                    .open()
                    .expect("ephemeral node should open");

                nodes.insert(
                    (region, shard_id),
                    SimulatedNode { region_id: region, shard_id, role, db },
                );
            }
        }

        Self { nodes, transport_factory: factory, control_plane, schema: Arc::new(config.schema) }
    }

    /// Get the leader node.
    pub fn leader(&self) -> &SimulatedNode {
        self.nodes.values().find(|n| n.role == NodeRole::Leader).unwrap()
    }

    /// Get a follower in a specific region.
    pub fn follower_in(&self, region: RegionId) -> Option<&SimulatedNode> {
        self.nodes.values().find(|n| n.region_id == region && n.role == NodeRole::Follower)
    }

    /// Write a signal to the cluster leader.
    pub fn write_signal(&self, signal: &str, entity: EntityId, weight: f64) {
        self.leader().db.signal(signal, entity, weight, Timestamp::now()).unwrap();
    }

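    /// Wait until every follower reports zero replication lag.
    ///
    /// Panics if the cluster has not converged within `timeout`.
    pub async fn await_full_convergence(&self, timeout: Duration) {
        let deadline = Instant::now() + timeout;
        loop {
            // Check convergence before the deadline, so a cluster that converged
            // during the last sleep is not reported as a timeout.
            // Note: lag is looked up by shard id only, so followers of the same
            // shard in different regions are not distinguished here.
            let all_converged = self.nodes.values()
                .filter(|n| n.role == NodeRole::Follower)
                .all(|n| self.control_plane.lag_for(n.shard_id) == 0);
            if all_converged { return; }
            if Instant::now() >= deadline {
                panic!("convergence timeout: cluster did not converge within {:?}", timeout);
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    }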

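    /// Read the decay score for `entity` from a node in `region`.
    ///
    /// Uses the first node found for the region. With more than one shard per
    /// region, `HashMap` iteration order makes that choice arbitrary.
    /// Read errors are collapsed into `None`.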
    pub fn read_decay_score(&self, region: RegionId, entity: EntityId, signal: &str) -> Option<f64> {
        self.nodes.values()
            .find(|n| n.region_id == region)
            .and_then(|n| n.db.read_decay_score(entity, signal, 0).ok().flatten())
    }

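    /// Applied WAL sequence number reported by a node in `region`, or 0 if the
    /// region has no nodes. The node is the first one found for the region, so
    /// the result is only well defined with one shard per region.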
    pub fn applied_seqno(&self, region: RegionId) -> u64 {
        self.nodes.values()
            .find(|n| n.region_id == region)
            .map(|n| n.db.applied_seqno())
            .unwrap_or(0)
    }

    /// Inject a network partition between two regions (via the transport factory).
    pub fn inject_partition(&self, from: RegionId, to: RegionId) -> NetworkPartition {
        self.transport_factory.inject_partition(from, to)
    }

    /// Heal all network partitions.
    pub fn heal_all_partitions(&self) {
        self.transport_factory.heal_all();
    }
}
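
The partition API above returns a handle whose lifetime scopes the fault. One way such a handle could work is sketched below, under stated assumptions: `PartitionTable` and `PartitionGuard` are hypothetical names standing in for whatever `InProcessTransportFactory` and `NetworkPartition` do internally, `RegionId` is a stand-in type, and the partition is assumed to block traffic in both directions.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

// Stand-in region identifier; the real `RegionId` may differ.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct RegionId(pub u16);

/// Hypothetical shared set of blocked (from, to) region pairs.
#[derive(Clone, Default)]
pub struct PartitionTable {
    blocked: Arc<Mutex<HashSet<(RegionId, RegionId)>>>,
}

/// Hypothetical RAII handle: dropping it heals the partition it created.
pub struct PartitionGuard {
    table: PartitionTable,
    pairs: [(RegionId, RegionId); 2],
}

impl PartitionTable {
    /// Block traffic between `a` and `b` (assumed bidirectional).
    pub fn inject(&self, a: RegionId, b: RegionId) -> PartitionGuard {
        let pairs = [(a, b), (b, a)];
        self.blocked.lock().unwrap().extend(pairs);
        PartitionGuard { table: self.clone(), pairs }
    }

    /// A transport would consult this before delivering each message.
    pub fn can_deliver(&self, from: RegionId, to: RegionId) -> bool {
        !self.blocked.lock().unwrap().contains(&(from, to))
    }

    /// Remove every partition, regardless of outstanding guards.
    pub fn heal_all(&self) {
        self.blocked.lock().unwrap().clear();
    }
}

impl Drop for PartitionGuard {
    fn drop(&mut self) {
        // Skip cleanup on a poisoned lock rather than panic inside `drop`.
        if let Ok(mut blocked) = self.table.blocked.lock() {
            for pair in &self.pairs {
                blocked.remove(pair);
            }
        }
    }
}
```

Tying the fault to a guard means a test that panics or returns early still heals the partition, so later tests that share the transport are not affected. Dropping a guard after `heal_all` is harmless because removing an absent pair is a no-op.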

Acceptance Criteria

  • SimulatedCluster::build(config) creates N×M nodes, all connected via InProcessTransport
  • leader() returns the single leader node; follower_in(region) returns a follower for the given region
  • write_signal(signal, entity, weight) writes to the leader and returns without error
  • await_full_convergence(timeout) blocks until all followers have lag = 0, or panics on timeout
  • read_decay_score(region, entity, signal) reads from the specified region's node
  • inject_partition(from, to) returns a NetworkPartition handle; traffic between those regions is dropped while the handle is live
  • heal_all_partitions() restores transport for all region pairs
  • Smoke test: 2-region cluster, write 10 signals, await_full_convergence(2s), verify decay score matches in both regions
  • cargo clippy -- -D warnings and cargo fmt --check pass
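
The "blocks until lag = 0, or panics on timeout" criterion is a poll-with-deadline loop. The sketch below shows that pattern in isolation, using only the standard library and a blocking sleep instead of tokio. `poll_until` is a hypothetical helper, not part of the harness, and it returns a `Result` instead of panicking so the caller decides how to fail.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical helper: poll `condition` every `interval` until it returns
/// true or `timeout` elapses. Returns the number of polls on success and the
/// elapsed time on timeout.
pub fn poll_until<F>(
    timeout: Duration,
    interval: Duration,
    mut condition: F,
) -> Result<u32, Duration>
where
    F: FnMut() -> bool,
{
    let start = Instant::now();
    let mut polls = 0u32;
    loop {
        polls += 1;
        // Evaluate the condition first, so the condition is always checked
        // at least once, even with a zero timeout.
        if condition() {
            return Ok(polls);
        }
        let elapsed = start.elapsed();
        if elapsed >= timeout {
            return Err(elapsed);
        }
        // Never sleep past the deadline.
        thread::sleep(interval.min(timeout - elapsed));
    }
}
```

In a smoke test the closure would wrap the lag check, for example comparing each follower's lag to zero, and the caller would turn an `Err` into a test failure that reports how long the cluster was given.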