jordan f4cfd6c81f feat: complete M8 replication primitives + forage enhancements + docs
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 13:17:19 -07:00

# Task 01: SimulatedCluster Harness
## Delivers
`SimulatedCluster` in `tidal/src/testing/cluster.rs`: a test harness that builds a multi-region tidalDB cluster inside one process, with every node connected through `InProcessTransport`. It exposes a small API for spinning up N regions × M shards, writing signals to the leader, and asserting cross-region replication state. All Phase 8.6 UAT tests build on it.
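The N × M node layout can be sketched on its own. This is a minimal, std-only sketch that assumes `RegionId`, `ShardId`, and `NodeRole` are small `Copy` newtypes and enums; the stand-ins below are hypothetical, not the crate's definitions, and `layout` is an illustrative helper rather than part of the planned API. It uses a `BTreeMap` instead of a `HashMap` so that iteration order, and therefore lookups such as "first follower in region R", is identical on every run.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-ins for the crate's RegionId / ShardId / NodeRole.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RegionId(pub u16);

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ShardId(pub u16);

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeRole {
    Leader,
    Follower,
}

/// Enumerate every (region, shard) slot in an N x M layout and assign roles.
/// Exactly the `leader` slot gets `NodeRole::Leader`; every other slot follows.
/// BTreeMap keys sort by region, then shard, so iteration order is stable.
pub fn layout(
    regions: &[RegionId],
    shards_per_region: u16,
    leader: (RegionId, ShardId),
) -> BTreeMap<(RegionId, ShardId), NodeRole> {
    regions
        .iter()
        .flat_map(|&region| (0..shards_per_region).map(move |s| (region, ShardId(s))))
        .map(|key| {
            let role = if key == leader { NodeRole::Leader } else { NodeRole::Follower };
            (key, role)
        })
        .collect()
}
```

For example, `layout(&[RegionId(0), RegionId(1)], 3, (RegionId(0), ShardId(0)))` yields six slots, exactly one of which is `Leader`.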
## Complexity: M
## Dependencies
- All phases 8.1–8.5 complete
## Technical Design
```rust
// tidal/src/testing/cluster.rs
//
// Compiled only for tests or with `--features test-utils`, e.g. via
// `#[cfg(any(test, feature = "test-utils"))] pub mod testing;` in lib.rs.

use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::Duration;

// tokio's Instant follows the runtime clock, so deadlines also behave under
// `tokio::time::pause()` in tests.
use tokio::time::Instant;

// Crate-internal types (TidalDb, ControlPlane, ClusterTopology, ShardAssignment,
// ShardRouter, TenantRouter, ReplicationLagGauge, InProcessTransportFactory,
// NetworkPartition, NodeConfig, NodeRole, RegionId, ShardId, EntityId, Schema,
// Timestamp) are imported from their defining modules.

/// A fully simulated multi-region tidalDB cluster.
///
/// All network communication happens via `InProcessTransport` (in-memory
/// channels). No actual network, no actual disk I/O (ephemeral mode).
/// Designed for deterministic, repeatable integration tests.
pub struct SimulatedCluster {
    /// All nodes in the cluster, indexed by (region, shard).
    nodes: HashMap<(RegionId, ShardId), SimulatedNode>,
    /// Shared transport factory for the entire cluster.
    transport_factory: Arc<InProcessTransportFactory>,
    /// Shared control plane (one per cluster).
    control_plane: Arc<ControlPlane>,
    /// Schema used by all nodes.
    schema: Arc<Schema>,
}

pub struct SimulatedNode {
    pub region_id: RegionId,
    pub shard_id: ShardId,
    pub role: NodeRole,
    pub db: TidalDb,
}

pub struct ClusterConfig {
    pub regions: Vec<RegionId>,
    pub shards_per_region: usize,
    /// The single (region, shard) acting as primary leader, conventionally
    /// shard 0 in the first region.
    pub leader: (RegionId, ShardId),
    pub schema: Schema,
}

impl SimulatedCluster {
    /// Build a cluster from the given configuration.
    ///
    /// All nodes start immediately; WAL shipping begins automatically.
    /// Panics if `config.leader` does not name a node in the cluster.
    pub async fn build(config: ClusterConfig) -> Self {
        let factory = Arc::new(InProcessTransportFactory::new());

        // One ShardAssignment per (region, shard): N regions x M shards.
        let topology = Arc::new(ClusterTopology {
            shards: config
                .regions
                .iter()
                .flat_map(|&region| {
                    (0..config.shards_per_region).map(move |s| ShardAssignment {
                        shard_id: ShardId(s as u16),
                        region_id: region,
                    })
                })
                .collect(),
        });

        let control_plane = Arc::new(ControlPlane::new(
            Arc::new(RwLock::new((*topology).clone())),
            Arc::new(TenantRouter::new(
                Arc::new(ShardRouter::new(Arc::clone(&topology))),
                Arc::clone(&topology),
            )),
            Arc::new(ReplicationLagGauge::new()),
        ));

        let mut nodes = HashMap::new();
        for &region in &config.regions {
            for shard in 0..config.shards_per_region {
                let shard_id = ShardId(shard as u16);
                // NodeRole is assumed to be Copy (it is used twice below).
                let role = if (region, shard_id) == config.leader {
                    NodeRole::Leader
                } else {
                    NodeRole::Follower
                };
                let db = TidalDb::builder()
                    .ephemeral()
                    .with_schema(config.schema.clone())
                    .with_cluster(NodeConfig { region_id: region, shard_id, role })
                    .with_transport(Arc::new(factory.connect(region)))
                    .with_control_plane(Arc::clone(&control_plane))
                    .open()
                    .expect("failed to open simulated node");
                nodes.insert(
                    (region, shard_id),
                    SimulatedNode { region_id: region, shard_id, role, db },
                );
            }
        }
        assert!(
            nodes.contains_key(&config.leader),
            "ClusterConfig::leader is not a node in this cluster"
        );

        Self {
            nodes,
            transport_factory: factory,
            control_plane,
            schema: Arc::new(config.schema),
        }
    }

    /// Schema shared by every node in the cluster.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Get the leader node.
    pub fn leader(&self) -> &SimulatedNode {
        self.nodes
            .values()
            .find(|n| n.role == NodeRole::Leader)
            .expect("cluster has no leader")
    }

    /// Get a follower in a specific region (any shard).
    pub fn follower_in(&self, region: RegionId) -> Option<&SimulatedNode> {
        self.nodes
            .values()
            .find(|n| n.region_id == region && n.role == NodeRole::Follower)
    }

    /// Write a signal to the cluster leader. Panics if the write fails.
    pub fn write_signal(&self, signal: &str, entity: EntityId, weight: f64) {
        self.leader()
            .db
            .signal(signal, entity, weight, Timestamp::now())
            .expect("leader rejected signal write");
    }

    /// Wait until every follower reports zero replication lag.
    ///
    /// Panics if the cluster does not converge within `timeout`.
    pub async fn await_full_convergence(&self, timeout: Duration) {
        let deadline = Instant::now() + timeout;
        loop {
            let all_converged = self
                .nodes
                .values()
                .filter(|n| n.role == NodeRole::Follower)
                .all(|n| self.control_plane.lag_for(n.shard_id) == 0);
            if all_converged {
                return;
            }
            // Checked after the convergence test so a cluster that converges
            // on the final poll is not reported as a timeout.
            if Instant::now() >= deadline {
                panic!("convergence timeout: cluster did not converge within {timeout:?}");
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    }

    /// Read a decay score from `region`.
    ///
    /// Queries every shard in the region and returns the first score found, so
    /// the result does not depend on HashMap iteration order as long as only
    /// the owning shard holds `entity`. Assumes `EntityId: Copy`.
    pub fn read_decay_score(&self, region: RegionId, entity: EntityId, signal: &str) -> Option<f64> {
        self.nodes
            .values()
            .filter(|n| n.region_id == region)
            .find_map(|n| n.db.read_decay_score(entity, signal, 0).ok().flatten())
    }

    /// Number of WAL events applied (the applied seqno) by the node at
    /// (`region`, `shard`), or 0 if no such node exists.
    pub fn applied_seqno(&self, region: RegionId, shard: ShardId) -> u64 {
        self.nodes
            .get(&(region, shard))
            .map_or(0, |n| n.db.applied_seqno())
    }

    /// Inject a network partition between two regions (via the transport
    /// factory). Traffic stays blocked while the returned handle is alive.
    #[must_use = "the partition lasts only while the returned handle is alive"]
    pub fn inject_partition(&self, from: RegionId, to: RegionId) -> NetworkPartition {
        self.transport_factory.inject_partition(from, to)
    }

    /// Heal all network partitions.
    pub fn heal_all_partitions(&self) {
        self.transport_factory.heal_all();
    }
}
```
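The "dropped while the handle is live" requirement maps naturally onto an RAII guard. Below is a minimal sketch of how the transport factory might track partitions, assuming a partition is directional (traffic from `from` to `to`; a symmetric split takes two calls) and that regions are plain `u16` ids. `PartitionTable`, `is_blocked`, and the internals of `NetworkPartition` shown here are hypothetical, not the planned `InProcessTransportFactory` implementation.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, MutexGuard, PoisonError};

/// Hypothetical region id used only in this sketch.
pub type RegionId = u16;
type Link = (RegionId, RegionId);

#[derive(Default)]
struct State {
    /// Bumped by `heal_all` so guards created earlier become no-ops.
    epoch: u64,
    /// Directed link -> number of live guards blocking it.
    blocked: HashMap<Link, usize>,
}

/// Shared partition state; clones refer to the same table.
#[derive(Clone, Default)]
pub struct PartitionTable {
    state: Arc<Mutex<State>>,
}

/// Blocks one directed link for as long as it is alive.
#[must_use = "the partition heals as soon as this guard is dropped"]
pub struct NetworkPartition {
    table: PartitionTable,
    link: Link,
    epoch: u64,
}

impl PartitionTable {
    // Recover from poisoning so a panicking test cannot trigger a second
    // panic inside `Drop`.
    fn lock(&self) -> MutexGuard<'_, State> {
        self.state.lock().unwrap_or_else(PoisonError::into_inner)
    }

    /// Drop traffic from `from` to `to` until the returned guard is dropped.
    pub fn inject_partition(&self, from: RegionId, to: RegionId) -> NetworkPartition {
        let mut state = self.lock();
        *state.blocked.entry((from, to)).or_insert(0) += 1;
        NetworkPartition { table: self.clone(), link: (from, to), epoch: state.epoch }
    }

    /// A transport would consult this before delivering each message.
    pub fn is_blocked(&self, from: RegionId, to: RegionId) -> bool {
        self.lock().blocked.contains_key(&(from, to))
    }

    /// Clear every partition, including ones whose guards are still alive.
    pub fn heal_all(&self) {
        let mut state = self.lock();
        state.blocked.clear();
        state.epoch += 1;
    }
}

impl Drop for NetworkPartition {
    fn drop(&mut self) {
        let mut state = self.table.lock();
        if state.epoch != self.epoch {
            return; // already healed by heal_all
        }
        if let Some(count) = state.blocked.get_mut(&self.link) {
            *count -= 1;
            if *count == 0 {
                state.blocked.remove(&self.link);
            }
        }
    }
}
```

Reference counting lets two tests block the same link independently, and the epoch makes a guard that outlives `heal_all` a no-op instead of un-blocking a newer partition on the same link.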
## Acceptance Criteria
- [ ] `SimulatedCluster::build(config)` creates N×M nodes, all connected via `InProcessTransport`
- [ ] `leader()` returns the single leader node; `follower_in(region)` returns a follower for the given region
- [ ] `write_signal(signal, entity, weight)` writes to the leader and returns without error
- [ ] `await_full_convergence(timeout)` resolves once every follower reports lag = 0, and panics on timeout
- [ ] `read_decay_score(region, entity, signal)` reads from the specified region's node
- [ ] `inject_partition(from, to)` returns a `NetworkPartition` handle; traffic between those regions is dropped while the handle is live
- [ ] `heal_all_partitions()` restores transport for all region pairs
- [ ] Smoke test: 2-region cluster, write 10 signals, `await_full_convergence(2s)`, verify the decay scores match in both regions
- [ ] `cargo clippy -- -D warnings` and `cargo fmt --check` pass
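The shape of the smoke test can be prototyped without tidalDB. This sketch assumes replication is modeled as a leader pushing `(seqno, weight)` events to each follower over `std::sync::mpsc` channels, with lag defined as the leader's high-water mark minus the follower's applied seqno. Summed weights stand in for decay scores, because the decay function is not specified here. `Event`, `Replica`, `spawn_follower`, and `await_convergence` are hypothetical names.

```rust
use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

/// Hypothetical WAL event: a sequence number plus a signal weight.
#[derive(Clone, Copy, Debug)]
pub struct Event {
    pub seqno: u64,
    pub weight: f64,
}

/// Follower state: highest applied seqno and the running sum of weights.
#[derive(Debug, Default)]
pub struct Replica {
    pub applied_seqno: u64,
    pub total_weight: f64,
}

/// Spawn a follower thread that applies events from an in-memory channel.
/// The thread exits once every `Sender` for its channel has been dropped.
pub fn spawn_follower() -> (Sender<Event>, Arc<Mutex<Replica>>, JoinHandle<()>) {
    let (tx, rx) = mpsc::channel::<Event>();
    let state = Arc::new(Mutex::new(Replica::default()));
    let worker_state = Arc::clone(&state);
    let handle = thread::spawn(move || {
        for ev in rx {
            let mut replica = worker_state.lock().unwrap();
            replica.applied_seqno = ev.seqno;
            replica.total_weight += ev.weight;
        }
    });
    (tx, state, handle)
}

/// Poll `converged` every `interval` until it returns true (`Ok`) or
/// `timeout` elapses (`Err` with the elapsed time). Convergence is tested
/// before the deadline so success on the final poll still counts.
pub fn await_convergence(
    timeout: Duration,
    interval: Duration,
    mut converged: impl FnMut() -> bool,
) -> Result<(), Duration> {
    let start = Instant::now();
    loop {
        if converged() {
            return Ok(());
        }
        let elapsed = start.elapsed();
        if elapsed >= timeout {
            return Err(elapsed);
        }
        thread::sleep(interval);
    }
}
```

Returning `Err(elapsed)` rather than panicking keeps the polling helper reusable; a harness method can still `panic!` on `Err`, as `await_full_convergence` is specified to do.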