tidaldb/docs/planning/milestone-8/phase-5/task-03-control-plane.md
jordan f4cfd6c81f feat: complete M8 replication primitives + forage enhancements + docs
2026-02-24 13:17:19 -07:00

# Task 03: ControlPlane
## Delivers
`ControlPlane` in `tidal/src/replication/control.rs`, embedded in the leader node. It manages cluster topology (shard-to-region assignments, tenant placement, region health) and exposes cluster health metrics, serializable to JSON, for external monitoring. There is no separate service: it runs as a background task within the leader process.
## Complexity: L
## Dependencies
- Task 01 (TenantId, TenantConfig)
- Task 02 (TenantRouter, ClusterTopology)
- Phase 8.2, Task 06 (ReplicationLagGauge)
## Technical Design
```rust
// tidal/src/replication/control.rs
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use dashmap::DashMap;
use serde::{Deserialize, Serialize};

// Types from Task 02 and Phase 8.2, Task 06; module paths assumed.
use super::{
    ClusterTopology, RegionId, ReplicationLagGauge, ShardAssignment, ShardId, TenantRouter,
};

/// A shard with no heartbeat for longer than this makes its region `Offline`.
const OFFLINE_AFTER_NS: u64 = 30_000_000_000; // 30 s
/// A shard lagging any follower by more than this makes its region `Degraded`.
const DEGRADED_LAG_NS: u64 = 5_000_000_000; // 5 s

/// Embedded cluster controller running on the leader node.
///
/// Tracks cluster topology, tenant placement, and shard health.
/// Exposes a `ClusterHealth` snapshot for external monitoring via the
/// existing `MetricsState` integration.
///
/// Design constraint: no external service. The control plane is an
/// in-process component, consistent with tidalDB's embeddable philosophy.
pub struct ControlPlane {
    topology: Arc<RwLock<ClusterTopology>>,
    tenant_router: Arc<TenantRouter>,
    // Not read yet: `health()` takes lag from `ShardStats`. Without the
    // `allow`, `cargo clippy -- -D warnings` fails on the unread field.
    #[allow(dead_code)]
    lag_gauge: Arc<ReplicationLagGauge>,
    shard_stats: DashMap<ShardId, ShardStats>,
    /// Regions that have reported at least one heartbeat. The stored value is
    /// a placeholder; `health()` derives each region's state from `shard_stats`.
    region_health: DashMap<RegionId, RegionHealth>,
}

/// Per-shard operational statistics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShardStats {
    pub shard_id: ShardId,
    pub region_id: RegionId,
    pub entity_count: u64,
    /// WAL events applied per second (EMA, α = 0.1).
    pub signal_throughput_eps: f64,
    /// Replication lag to each follower region, in nanoseconds behind the
    /// leader. Time-based (not seqno distance) so it can be compared with
    /// the 5 s `Degraded` threshold.
    pub replication_lag: HashMap<RegionId, u64>,
    /// Approximate disk usage for this shard's WAL directory (bytes).
    pub disk_bytes: u64,
    /// Last heartbeat from this shard (ns since epoch).
    pub last_heartbeat_ns: u64,
}

/// Per-region health state, declared in increasing severity so `Ord`
/// can select the worst state across a region's shards.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum RegionHealth {
    Healthy,
    /// Replication lag > 5 s.
    Degraded,
    /// No heartbeat for > 30 s.
    Offline,
}

/// Full cluster health snapshot.
///
/// Serializable to JSON for external monitoring dashboards. (Prometheus
/// scrapes its own text exposition format, so it would need an adapter.)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterHealth {
    pub snapshot_ns: u64,
    pub shards: Vec<ShardStats>,
    pub regions: HashMap<RegionId, RegionHealth>,
    pub tenant_count: usize,
    pub total_entities: u64,
    pub total_signals_eps: f64,
}

impl ControlPlane {
    pub fn new(
        topology: Arc<RwLock<ClusterTopology>>,
        tenant_router: Arc<TenantRouter>,
        lag_gauge: Arc<ReplicationLagGauge>,
    ) -> Self {
        Self {
            topology,
            tenant_router,
            lag_gauge,
            shard_stats: DashMap::new(),
            region_health: DashMap::new(),
        }
    }

    /// Update shard statistics (called by each shard on its heartbeat interval).
    pub fn record_shard_heartbeat(&self, stats: ShardStats) {
        self.region_health.insert(stats.region_id, RegionHealth::Healthy);
        self.shard_stats.insert(stats.shard_id, stats);
    }

    /// Classify a single shard against the offline and lag thresholds.
    fn shard_health(s: &ShardStats, now_ns: u64) -> RegionHealth {
        if now_ns.saturating_sub(s.last_heartbeat_ns) > OFFLINE_AFTER_NS {
            RegionHealth::Offline
        } else if s.replication_lag.values().any(|&lag| lag > DEGRADED_LAG_NS) {
            RegionHealth::Degraded
        } else {
            RegionHealth::Healthy
        }
    }

    /// Compute and return the current cluster health snapshot.
    pub fn health(&self) -> ClusterHealth {
        let now_ns = crate::util::now_ns();
        let shards: Vec<ShardStats> =
            self.shard_stats.iter().map(|r| r.value().clone()).collect();

        // A region reports the worst state of any of its shards (not an
        // arbitrary one), so a single stale shard marks it Offline. A known
        // region with no shard stats is also Offline.
        let regions: HashMap<RegionId, RegionHealth> = self
            .region_health
            .iter()
            .map(|r| {
                let region = *r.key();
                let health = shards
                    .iter()
                    .filter(|s| s.region_id == region)
                    .map(|s| Self::shard_health(s, now_ns))
                    .max()
                    .unwrap_or(RegionHealth::Offline);
                (region, health)
            })
            .collect();

        let total_entities: u64 = shards.iter().map(|s| s.entity_count).sum();
        let total_signals_eps: f64 = shards.iter().map(|s| s.signal_throughput_eps).sum();

        ClusterHealth {
            snapshot_ns: now_ns,
            shards,
            regions,
            tenant_count: self.tenant_router.tenant_count(),
            total_entities,
            total_signals_eps,
        }
    }

    /// Update topology: add or reassign a shard.
    ///
    /// `TenantRouter` re-computes routes on its next call, provided it reads
    /// the same `Arc<RwLock<ClusterTopology>>` that was passed to `new`.
    pub fn update_topology(&self, assignment: ShardAssignment) {
        let mut topology = self.topology.write().expect("topology lock poisoned");
        let pos = topology
            .shards
            .iter()
            .position(|s| s.shard_id == assignment.shard_id);
        match pos {
            Some(i) => topology.shards[i] = assignment,
            None => topology.shards.push(assignment),
        }
    }

    /// JSON representation of `ClusterHealth` for external monitoring.
    pub fn health_json(&self) -> String {
        serde_json::to_string_pretty(&self.health()).unwrap_or_else(|e| {
            // Build the error object with serde_json so the message is escaped;
            // `format!` would emit invalid JSON if the message contains quotes.
            serde_json::json!({ "error": e.to_string() }).to_string()
        })
    }
}
```
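`signal_throughput_eps` is documented as an EMA with α = 0.1, but the task does not say how a shard produces it. A minimal sketch, assuming each shard counts WAL events applied between heartbeats and folds that interval's rate into `ema ← α·rate + (1 − α)·ema`. `ThroughputEma` is a hypothetical helper, not part of the specified API, and seeding the estimate with the first sample (rather than 0) is an assumed choice.

```rust
/// Hypothetical helper: smooths a shard's WAL apply rate before it is
/// reported as `ShardStats::signal_throughput_eps`.
pub struct ThroughputEma {
    alpha: f64,
    value: Option<f64>,
}

impl ThroughputEma {
    /// `alpha` is the weight given to the newest sample, in (0, 1].
    pub fn new(alpha: f64) -> Self {
        assert!(alpha > 0.0 && alpha <= 1.0, "alpha must be in (0, 1]");
        Self { alpha, value: None }
    }

    /// Folds in one heartbeat interval (`events` applied over `interval_secs`)
    /// and returns the smoothed events-per-second estimate.
    pub fn observe(&mut self, events: u64, interval_secs: f64) -> f64 {
        // An empty or negative interval carries no rate information: keep the estimate.
        if interval_secs <= 0.0 {
            return self.value();
        }
        let rate = events as f64 / interval_secs;
        let next = match self.value {
            // Assumption: seed with the first sample so the estimate does not start at 0.
            None => rate,
            // ema <- alpha * rate + (1 - alpha) * ema
            Some(prev) => self.alpha * rate + (1.0 - self.alpha) * prev,
        };
        self.value = Some(next);
        next
    }

    /// Current estimate (0.0 before the first non-empty interval).
    pub fn value(&self) -> f64 {
        self.value.unwrap_or(0.0)
    }
}

fn main() {
    let mut ema = ThroughputEma::new(0.1);
    println!("{:.1}", ema.observe(1_000, 1.0)); // 1000.0 (seed)
    println!("{:.1}", ema.observe(2_000, 1.0)); // 1100.0 = 0.1 * 2000 + 0.9 * 1000
    println!("{:.1}", ema.observe(2_000, 1.0)); // 1190.0 = 0.1 * 2000 + 0.9 * 1100
}
```

With α = 0.1, a sustained rate change needs about 22 heartbeats to close 90% of the gap (0.9^22 ≈ 0.098), so the reported throughput is deliberately slow to react to short bursts.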
### MetricsState Integration
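```rust
// tidal/src/db/metrics.rs (extension)
use crate::replication::control::ClusterHealth;

impl MetricsState {
    /// Cluster health snapshot, or `None` when no control plane is configured
    /// (single-node deployments).
    ///
    /// Assumes `MetricsState` holds `control_plane: Option<Arc<ControlPlane>>`.
    pub fn cluster_health(&self) -> Option<ClusterHealth> {
        self.control_plane.as_ref().map(|cp| cp.health())
    }
}
```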
## Acceptance Criteria
- [ ] `ControlPlane::health()` returns a `ClusterHealth` with per-shard stats for all registered shards
- [ ] `RegionHealth::Offline` is reported for the region of a shard whose `last_heartbeat_ns` is more than 30 seconds old
- [ ] `RegionHealth::Degraded` is reported for the region of a shard whose `replication_lag` to any follower exceeds 5 s
- [ ] `health_json()` produces valid JSON deserializable back to `ClusterHealth` (round-trip test)
- [ ] `update_topology(assignment)` is reflected in the next `health()` call and the next `TenantRouter::route()` call
- [ ] `MetricsState::cluster_health()` returns `None` on single-node deployments (control plane not configured)
- [ ] Control plane heartbeat test: 3 simulated shards, update stats for each, verify `health()` lists all 3 shards and reports their regions as `Healthy`
- [ ] `cargo clippy -- -D warnings` and `cargo fmt --check` pass
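The `Offline` and `Degraded` criteria can be exercised against a std-only model of the classification rule: per-shard thresholds (strictly more than 30 s since the last heartbeat, or strictly more than 5 s of lag to any follower), with each region reporting the worst state among its shards. Worst-of aggregation is an assumption for regions hosting several shards; the spec only fixes the per-shard thresholds. `ShardSample` and `classify_regions` are hypothetical stand-ins for `ShardStats` and the region aggregation in `health()`, lag is assumed to be in nanoseconds, and `RegionId` is modeled as `u32`.

```rust
use std::collections::HashMap;

const OFFLINE_AFTER_NS: u64 = 30_000_000_000; // 30 s
const DEGRADED_LAG_NS: u64 = 5_000_000_000; // 5 s

/// Declared in increasing severity so `max()` picks the worst state.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum RegionHealth {
    Healthy,
    Degraded,
    Offline,
}

/// Hypothetical stand-in for `ShardStats` with only the fields the rule reads.
struct ShardSample {
    region_id: u32,
    last_heartbeat_ns: u64,
    /// Lag to each follower, in nanoseconds (assumed unit).
    replication_lag_ns: Vec<u64>,
}

/// Per-shard rule: a stale heartbeat outranks lag; both thresholds are strict.
fn shard_health(s: &ShardSample, now_ns: u64) -> RegionHealth {
    if now_ns.saturating_sub(s.last_heartbeat_ns) > OFFLINE_AFTER_NS {
        RegionHealth::Offline
    } else if s.replication_lag_ns.iter().any(|&lag| lag > DEGRADED_LAG_NS) {
        RegionHealth::Degraded
    } else {
        RegionHealth::Healthy
    }
}

/// Region health = worst health among that region's shards.
fn classify_regions(shards: &[ShardSample], now_ns: u64) -> HashMap<u32, RegionHealth> {
    let mut regions: HashMap<u32, RegionHealth> = HashMap::new();
    for s in shards {
        let h = shard_health(s, now_ns);
        regions
            .entry(s.region_id)
            .and_modify(|worst| *worst = (*worst).max(h))
            .or_insert(h);
    }
    regions
}

fn main() {
    let now: u64 = 100_000_000_000; // t = 100 s
    let fresh = now - 1_000_000_000; // heartbeat 1 s ago

    // Acceptance case: three shards in three regions, all fresh and caught up.
    let mut shards: Vec<ShardSample> = (1..=3)
        .map(|region_id| ShardSample {
            region_id,
            last_heartbeat_ns: fresh,
            replication_lag_ns: vec![0],
        })
        .collect();
    let healthy = classify_regions(&shards, now);
    assert!(healthy.values().all(|&h| h == RegionHealth::Healthy));

    // Region 2 lags a follower by 6 s; region 3's shard has been silent for 31 s.
    shards[1].replication_lag_ns = vec![6_000_000_000];
    shards[2].last_heartbeat_ns = now - 31_000_000_000;
    // A second, healthy shard in region 3 does not rescue it (worst-of).
    shards.push(ShardSample {
        region_id: 3,
        last_heartbeat_ns: fresh,
        replication_lag_ns: vec![0],
    });

    let mixed = classify_regions(&shards, now);
    assert_eq!(mixed[&1], RegionHealth::Healthy);
    assert_eq!(mixed[&2], RegionHealth::Degraded);
    assert_eq!(mixed[&3], RegionHealth::Offline);
    println!("{:?}", mixed[&3]); // Offline
}
```

Worst-of is the conservative choice: one silent shard marks its whole region `Offline`. Requiring every shard in the region to be stale is the looser alternative.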