Task 01: SimulatedCluster Harness
Delivers
SimulatedCluster in tidal/src/testing/cluster.rs: a test harness that creates a multi-region tidalDB cluster wired together with InProcessTransport. It exposes a small API for spinning up N regions × M shards, writing signals, and asserting cross-region replication state. All Phase 8.6 UAT tests use it.
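The N regions × M shards layout can be illustrated with a minimal, standalone sketch of how a harness might enumerate node slots and pick the single leader. `RegionId`, `ShardId`, and `NodeRole` below are simplified stand-ins rather than tidalDB's actual types, and `plan_nodes` is a hypothetical helper; the real harness also opens a `TidalDb` for every slot.

```rust
use std::collections::HashMap;

/// Simplified stand-in for tidalDB's region identifier (hypothetical).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct RegionId(pub u16);

/// Simplified stand-in for tidalDB's shard identifier (hypothetical).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct ShardId(pub u16);

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeRole {
    Leader,
    Follower,
}

/// Enumerate every (region, shard) slot of an N-region × M-shard cluster.
/// The slot equal to `leader` becomes the leader; all others are followers.
pub fn plan_nodes(
    regions: &[RegionId],
    shards_per_region: u16,
    leader: (RegionId, ShardId),
) -> HashMap<(RegionId, ShardId), NodeRole> {
    regions
        .iter()
        .flat_map(|&region| (0..shards_per_region).map(move |s| (region, ShardId(s))))
        .map(|slot| {
            let role = if slot == leader { NodeRole::Leader } else { NodeRole::Follower };
            (slot, role)
        })
        .collect()
}
```

With two regions and three shards per region this yields six slots and exactly one leader. A `leader` pair that lies outside the configured layout yields no leader at all, which is why `leader()` should fail loudly rather than silently.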
Complexity: M
Dependencies
- All phases 8.1–8.5 complete
Technical Design
```rust
// tidal/src/testing/cluster.rs
// Only compiled with #[cfg(test)] or --features test-utils.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

// tokio's Instant (rather than std's) keeps the timeout consistent with
// tokio::time::sleep, including under paused test time.
use tokio::time::Instant;

// Crate-internal types (TidalDb, ControlPlane, ClusterTopology, ShardAssignment,
// ShardRouter, TenantRouter, ReplicationLagGauge, InProcessTransportFactory,
// NetworkPartition, NodeConfig, NodeRole, RegionId, ShardId, EntityId, Timestamp,
// Schema) and the RwLock passed to ControlPlane come from their own modules;
// import paths are omitted here.

/// A fully simulated multi-region tidalDB cluster.
///
/// All network communication happens via `InProcessTransport` (in-memory
/// channels). No actual network, no actual disk I/O (ephemeral mode).
/// Designed for deterministic, repeatable integration tests.
pub struct SimulatedCluster {
    /// All nodes in the cluster, indexed by (region, shard).
    nodes: HashMap<(RegionId, ShardId), SimulatedNode>,
    /// Shared transport factory for the entire cluster.
    transport_factory: Arc<InProcessTransportFactory>,
    /// Shared control plane (single per cluster).
    control_plane: Arc<ControlPlane>,
    /// Schema used by all nodes.
    schema: Arc<Schema>,
}

pub struct SimulatedNode {
    pub region_id: RegionId,
    pub shard_id: ShardId,
    pub role: NodeRole,
    pub db: TidalDb,
}

pub struct ClusterConfig {
    pub regions: Vec<RegionId>,
    pub shards_per_region: usize,
    /// Which (region, shard) is the primary leader (conventionally shard 0 in region 0).
    pub leader: (RegionId, ShardId),
    pub schema: Schema,
}

impl SimulatedCluster {
    /// Build a cluster from the given configuration.
    ///
    /// All nodes start immediately; WAL shipping begins automatically.
    pub async fn build(config: ClusterConfig) -> Self {
        let factory = Arc::new(InProcessTransportFactory::new());

        // One ShardAssignment per (region, shard) pair: N regions × M shards.
        let topology = Arc::new(ClusterTopology {
            shards: config
                .regions
                .iter()
                .flat_map(|&region| {
                    (0..config.shards_per_region).map(move |s| ShardAssignment {
                        shard_id: ShardId(s as u16),
                        region_id: region,
                    })
                })
                .collect(),
        });

        let control_plane = Arc::new(ControlPlane::new(
            Arc::new(RwLock::new((*topology).clone())),
            Arc::new(TenantRouter::new(
                Arc::new(ShardRouter::new(topology.clone())),
                topology.clone(),
            )),
            Arc::new(ReplicationLagGauge::new()),
        ));

        let mut nodes = HashMap::new();
        for &region in &config.regions {
            for shard in 0..config.shards_per_region {
                let shard_id = ShardId(shard as u16);
                let is_leader = (region, shard_id) == config.leader;
                let transport = factory.connect(region);
                let db = TidalDb::builder()
                    .ephemeral()
                    .with_schema(config.schema.clone())
                    .with_cluster(NodeConfig {
                        region_id: region,
                        shard_id,
                        role: if is_leader { NodeRole::Leader } else { NodeRole::Follower },
                    })
                    .with_transport(Arc::new(transport))
                    .with_control_plane(control_plane.clone())
                    .open()
                    .expect("failed to open simulated node");
                nodes.insert(
                    (region, shard_id),
                    SimulatedNode {
                        region_id: region,
                        shard_id,
                        role: if is_leader { NodeRole::Leader } else { NodeRole::Follower },
                        db,
                    },
                );
            }
        }

        Self { nodes, transport_factory: factory, control_plane, schema: Arc::new(config.schema) }
    }

    /// Get the leader node. Panics if `config.leader` matched no node.
    pub fn leader(&self) -> &SimulatedNode {
        self.nodes
            .values()
            .find(|n| n.role == NodeRole::Leader)
            .expect("cluster has no leader; check ClusterConfig::leader")
    }

    /// Get a follower in a specific region.
    /// NOTE: if the region has several followers, HashMap order decides which one.
    pub fn follower_in(&self, region: RegionId) -> Option<&SimulatedNode> {
        self.nodes.values().find(|n| n.region_id == region && n.role == NodeRole::Follower)
    }

    /// Write a signal to the cluster leader. Panics if the write fails.
    pub fn write_signal(&self, signal: &str, entity: EntityId, weight: f64) {
        self.leader()
            .db
            .signal(signal, entity, weight, Timestamp::now())
            .expect("signal write to leader failed");
    }

    /// Wait until every follower has applied all leader events (lag == 0),
    /// polling every 50 ms. Panics if that does not happen within `timeout`.
    pub async fn await_full_convergence(&self, timeout: Duration) {
        let deadline = Instant::now() + timeout;
        loop {
            // NOTE: `lag_for` takes only a ShardId, so followers of the same
            // shard in different regions share one lag reading.
            let all_converged = self
                .nodes
                .values()
                .filter(|n| n.role == NodeRole::Follower)
                .all(|n| self.control_plane.lag_for(n.shard_id) == 0);
            // Check convergence before the deadline so that a successful
            // final poll is not reported as a timeout.
            if all_converged {
                return;
            }
            if Instant::now() >= deadline {
                panic!("convergence timeout: cluster did not converge within {timeout:?}");
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    }

    /// Read a decay score from a node in the given region.
    /// NOTE: with shards_per_region > 1 this reads from an arbitrary shard
    /// (HashMap order), which may not own `entity`.
    pub fn read_decay_score(&self, region: RegionId, entity: EntityId, signal: &str) -> Option<f64> {
        self.nodes
            .values()
            .find(|n| n.region_id == region)
            .and_then(|n| n.db.read_decay_score(entity, signal, 0).ok().flatten())
    }

    /// Applied WAL sequence number on a node in the given region
    /// (0 if the region has no node). Same shard caveat as `read_decay_score`.
    pub fn applied_seqno(&self, region: RegionId) -> u64 {
        self.nodes
            .values()
            .find(|n| n.region_id == region)
            .map(|n| n.db.applied_seqno())
            .unwrap_or(0)
    }

    /// Inject a network partition between two regions (via the transport factory).
    /// Traffic between them is dropped until the returned handle is dropped.
    pub fn inject_partition(&self, from: RegionId, to: RegionId) -> NetworkPartition {
        self.transport_factory.inject_partition(from, to)
    }

    /// Heal all network partitions.
    pub fn heal_all_partitions(&self) {
        self.transport_factory.heal_all();
    }
}
```
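A convergence wait is easiest to reason about when the condition is checked before the deadline, so a poll that succeeds right at the timeout is not reported as a failure. The standalone, synchronous sketch below shows that pattern; `await_until` is a hypothetical helper that uses `std::thread::sleep` in place of `tokio::time::sleep` and returns `Err` instead of panicking, so it can be exercised without an async runtime.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Poll `converged` every `poll` until it returns true or `timeout` elapses.
///
/// The condition is evaluated before the deadline check, so a poll that
/// succeeds exactly at the deadline still counts as converged. On timeout the
/// elapsed time is returned so the caller can report it (or panic, as a test
/// harness would).
pub fn await_until<F>(mut converged: F, timeout: Duration, poll: Duration) -> Result<(), Duration>
where
    F: FnMut() -> bool,
{
    let start = Instant::now();
    let deadline = start + timeout;
    loop {
        if converged() {
            return Ok(());
        }
        if Instant::now() >= deadline {
            return Err(start.elapsed());
        }
        thread::sleep(poll);
    }
}
```

In a harness the predicate would be something like "every follower reports lag 0"; keeping it a plain closure leaves the polling policy independent of how lag is measured.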
Acceptance Criteria
- `SimulatedCluster::build(config)` creates N×M nodes, all connected via `InProcessTransport`
- `leader()` returns the single leader node; `follower_in(region)` returns a follower for the given region
- `write_signal(signal, entity, weight)` writes to the leader and returns without error
- `await_full_convergence(timeout)` blocks until all followers have lag = 0, or panics on timeout
- `read_decay_score(region, entity, signal)` reads from the specified region's node
- `inject_partition(from, to)` returns a `NetworkPartition` handle; traffic between those regions is dropped while the handle is live
- `heal_all_partitions()` restores transport for all region pairs
- Smoke test: 2-region cluster, write 10 signals, `await_full_convergence(2s)`, verify the decay score matches in both regions
- `cargo clippy -- -D warnings` and `cargo fmt --check` pass
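The partition criterion implies RAII semantics for `NetworkPartition`: a partition lasts exactly as long as its handle. The std-only sketch below models that, assuming regions are plain `u16` ids and that a partition blocks traffic in both directions. `PartitionTable` and `Region` are hypothetical and do not describe the actual `InProcessTransportFactory` internals.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical region id; tidalDB's real `RegionId` is a distinct type.
pub type Region = u16;

/// Partitions are modeled as undirected: (a, b) and (b, a) share one key.
fn key(a: Region, b: Region) -> (Region, Region) {
    if a <= b { (a, b) } else { (b, a) }
}

/// Shared table of partitioned region pairs, with a count of live handles per pair.
#[derive(Clone, Default)]
pub struct PartitionTable {
    blocked: Arc<Mutex<HashMap<(Region, Region), usize>>>,
}

/// RAII handle: traffic between the pair is dropped while this value is alive.
pub struct NetworkPartition {
    table: PartitionTable,
    pair: (Region, Region),
}

impl PartitionTable {
    pub fn inject_partition(&self, from: Region, to: Region) -> NetworkPartition {
        let pair = key(from, to);
        *self.blocked.lock().unwrap().entry(pair).or_insert(0) += 1;
        NetworkPartition { table: self.clone(), pair }
    }

    /// Clear every partition, regardless of outstanding handles.
    pub fn heal_all(&self) {
        self.blocked.lock().unwrap().clear();
    }

    /// A transport would consult this before delivering each message.
    pub fn is_blocked(&self, from: Region, to: Region) -> bool {
        self.blocked.lock().unwrap().contains_key(&key(from, to))
    }
}

impl Drop for NetworkPartition {
    fn drop(&mut self) {
        let mut map = self.table.blocked.lock().unwrap();
        let remaining = match map.get_mut(&self.pair) {
            Some(count) => {
                *count -= 1;
                *count
            }
            // Already cleared by heal_all(): nothing to undo.
            None => return,
        };
        if remaining == 0 {
            map.remove(&self.pair);
        }
    }
}
```

Counting handles lets two overlapping partitions of the same pair coexist, so dropping one does not heal the other. One caveat of this sketch: a handle that outlives `heal_all()` and a later re-injection of the same pair will release that newer partition when dropped; a per-pair generation number would close that gap.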