Milestone 8 (phases 1-4): - Shard-aware WAL segment naming, BatchHeader v2, ShardRouter - Transport trait, InProcessTransport, WalShipper, FollowerDb - HLC, PNCounter, LWWRegister, CrdtSignalState, ReconciliationEngine - Session replication bridge with SeqNo/HWM, idempotency store Forage application: - Multi-source discovery engine with MAB exploration - Embedding-based label system, server handlers, UI refresh Other: - QUICKSTART.md, README.md, milestone-8 planning docs - Hard negative union semantics, RLHF export enhancements - Recovery benchmark and visibility test expansions - Split 8 oversized source files per CODING_GUIDELINES §9 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
7.8 KiB
7.8 KiB
Task 06: Multi-Tenancy Integration Tests
Delivers
Integration test suite in `tidal/tests/m8p5_multitenancy.rs` verifying: per-tenant rate limiting, noisy-neighbor isolation, region residency routing, tenant migration with zero downtime, and rolling upgrade.
Complexity: M
Dependencies
- Tasks 01–05 complete
Technical Design
// tidal/tests/m8p5_multitenancy.rs
use tidaldb::replication::{
TenantId, TenantConfig, TenantRouter, ClusterTopology, ShardAssignment,
ControlPlane, TenantMigration, RollingUpgradeCoordinator,
InProcessTransportFactory,
};
/// Builds the cluster layout shared by the tests in this file: three shards,
/// where shard `n` is assigned to region `n` (for `n` = 0, 1, 2).
fn three_shard_topology() -> ClusterTopology {
    // Small local constructor so each shard/region pairing reads as one call.
    let assign = |shard_id, region_id| ShardAssignment { shard_id, region_id };

    let shards = vec![
        assign(ShardId(0), RegionId(0)),
        assign(ShardId(1), RegionId(1)),
        assign(ShardId(2), RegionId(2)),
    ];

    ClusterTopology { shards }
}
/// Rate limiting: a tenant configured at 100 signals/sec absorbs an immediate
/// burst of 200 signals and is rejected with `QuotaExceeded` on the 201st.
///
/// The 200-token figure encodes this test's assumption that burst capacity is
/// 2x the configured per-second rate.
#[test]
fn test_tenant_rate_limiting() {
    let topology = Arc::new(three_shard_topology());
    let shard_router = Arc::new(ShardRouter::new(topology.clone()));
    let tenant_router = Arc::new(TenantRouter::new(shard_router, topology));

    let tenant = TenantId(1);
    tenant_router.register_tenant(TenantConfig {
        tenant_id: tenant,
        max_signals_per_sec: Some(100),
        max_entities: None,
        max_storage_bytes: None,
        required_regions: vec![],
        label: "test-tenant".into(),
    });

    let limiter = tenant_router.rate_limiter_for(tenant).unwrap();

    // Drain the bucket: 200 immediate acquires, tallying each outcome.
    let mut allowed = 0;
    let mut throttled = 0;
    for _ in 0..200 {
        match limiter.try_acquire() {
            Ok(()) => allowed += 1,
            Err(TidalError::QuotaExceeded(_)) => throttled += 1,
            // Surface the actual error so a failure is diagnosable.
            Err(other) => panic!("unexpected error: {other:?}"),
        }
    }

    // Both tallies are checked so neither counter is dead code and a failure
    // reports how many acquires actually got through.
    assert_eq!(allowed, 200, "all 200 burst signals should be admitted");
    assert_eq!(throttled, 0, "burst capacity should absorb 200 signals");

    // The bucket is now expected to be empty, so one more acquire must fail.
    // NOTE(review): if the limiter refills on wall-clock time, a stall of
    // >= 10ms before this call could mint a fresh token at 100/sec — confirm
    // whether a controllable clock is available for this test.
    assert!(
        matches!(limiter.try_acquire(), Err(TidalError::QuotaExceeded(_))),
        "bucket should be empty after 200 signals"
    );
}
/// Noisy neighbor: hammering tenant A's limiter far past its quota must leave
/// tenant B's limiter untouched.
#[test]
fn test_noisy_neighbor_isolation() {
    let topology = Arc::new(three_shard_topology());
    let shard_router = Arc::new(ShardRouter::new(topology.clone()));
    let tenant_router = Arc::new(TenantRouter::new(shard_router, topology));

    // Registers a tenant whose only configured limit is its signal rate.
    let register = |tenant_id, max_signals_per_sec, label: &str| {
        tenant_router.register_tenant(TenantConfig {
            tenant_id,
            max_signals_per_sec,
            max_entities: None,
            max_storage_bytes: None,
            required_regions: vec![],
            label: label.into(),
        });
    };

    let tenant_a = TenantId(1);
    let tenant_b = TenantId(2);
    register(tenant_a, Some(10), "noisy-tenant"); // low limit
    register(tenant_b, Some(10_000), "good-tenant"); // high limit

    let limiter_a = tenant_router.rate_limiter_for(tenant_a).unwrap();
    let limiter_b = tenant_router.rate_limiter_for(tenant_b).unwrap();

    // Exhaust tenant A's bucket; individual outcomes are deliberately ignored.
    (0..1000).for_each(|_| {
        let _ = limiter_a.try_acquire();
    });

    // Tenant B should not be affected: every one of its 100 acquires succeeds.
    assert!(
        (0..100).all(|_| limiter_b.try_acquire().is_ok()),
        "tenant B should not be throttled by tenant A's exhaustion"
    );
}
/// Residency policy: a tenant whose `required_regions` is `[RegionId(1)]`
/// must have every entity routed to an assignment in region 1.
#[test]
fn test_tenant_residency_policy() {
    let topology = Arc::new(three_shard_topology());
    let tenant_router = Arc::new(TenantRouter::new(
        Arc::new(ShardRouter::new(topology.clone())),
        topology,
    ));

    let eu_tenant = TenantId(10);
    let eu_config = TenantConfig {
        tenant_id: eu_tenant,
        max_signals_per_sec: None,
        max_entities: None,
        max_storage_bytes: None,
        required_regions: vec![RegionId(1)], // EU residency
        label: "eu-tenant".into(),
    };
    tenant_router.register_tenant(eu_config);

    // Route 100 distinct entities; each must resolve to region 1.
    for entity_idx in 0u64..100 {
        let entity = EntityId::new(entity_idx);
        let routed = tenant_router.route(eu_tenant, entity).unwrap();
        assert_eq!(
            routed.region_id,
            RegionId(1),
            "entity {} should be in region 1 per residency policy",
            entity_idx
        );
    }
}
/// Tenant migration: move tenant 1 from shard 0 to shard 2 with zero downtime.
///
/// Walks the four migration phases in order — prepare target, dual-write,
/// finalize, GC — writing 100 signals before the migration and 50 more during
/// dual-write. Afterwards the target must report all 150 signals and the
/// source must report none.
#[tokio::test]
async fn test_tenant_migration_zero_downtime() {
    let (db0, db2, factory) = setup_migration_cluster().await;
    let tenant = TenantId(1);

    // Write 100 signals to tenant 1 on shard 0 before migration.
    for i in 0..100u64 {
        db0.signal_for_tenant(tenant, "view", EntityId::new(i + 10), 1.0, Timestamp::now())
            .unwrap();
    }

    let migration = TenantMigration::new(
        tenant,
        ShardId(0),
        ShardId(2),
        db0.control_plane().clone(),
        db0.tenant_router().clone(),
        factory.transport(RegionId(0)),
    );

    // Phase 1: ship existing WAL to target.
    migration.prepare_target().await.unwrap();

    // Phase 2: enter dual-write; write 50 more signals through the source.
    migration.enter_dual_write().await.unwrap();
    for i in 100..150u64 {
        db0.signal_for_tenant(tenant, "view", EntityId::new(i + 10), 1.0, Timestamp::now())
            .unwrap();
    }

    // Phase 3: finalize.
    // NOTE(review): the fixed 100ms pause presumably gives in-flight
    // dual-writes time to land on the target before cut-over. If the
    // migration API exposes an explicit catch-up wait, prefer it over a
    // timing-based sleep — confirm.
    tokio::time::sleep(Duration::from_millis(100)).await;
    migration.finalize().await.unwrap();

    // All 150 signals should be present on shard 2 (new home).
    let count_on_target = db2.total_signal_count_for_tenant(tenant, "view").unwrap();
    assert_eq!(count_on_target, 150, "all signals must be on target shard after migration");

    // Phase 4: GC (use 0 window for test).
    migration.gc_source(0).unwrap();
    let count_on_source = db0.total_signal_count_for_tenant(tenant, "view").unwrap();
    assert_eq!(count_on_source, 0, "source shard must have no tenant data after GC");
}
/// Rolling upgrade: drain a node, "upgrade" it, and rejoin it while the leader
/// keeps accepting writes; after rejoin the node must report zero replication
/// lag.
///
/// Zero lag is used as the proxy for "no data loss": the test does not read
/// the signals back from the follower itself.
#[tokio::test]
async fn test_rolling_upgrade_no_data_loss() {
    // The followers and the transport factory are not touched directly, but
    // they are bound to underscore-prefixed names (not `_`) so they stay alive
    // until the end of the test instead of being dropped immediately.
    let (db_leader, _db_followers, _factory) = setup_three_node_cluster().await;

    let coordinator = RollingUpgradeCoordinator::new(
        db_leader.control_plane().clone(),
        db_leader.wal_shipper().clone(),
    );

    // Drain shard 1.
    // NOTE(review): presumably this is the first follower returned by
    // `setup_three_node_cluster` — confirm against that helper.
    coordinator.drain(ShardId(1)).await.unwrap();

    // Write 200 signals during the "upgrade window".
    for i in 0..200u64 {
        db_leader.signal("view", EntityId::new(i + 1), 1.0, Timestamp::now()).unwrap();
    }

    // Rejoin (simulated: follower is already running, just re-enables routing).
    coordinator.rejoin(ShardId(1)).await.unwrap();

    // The rejoined shard must have caught up on everything written while it
    // was drained.
    // NOTE(review): this assumes `rejoin` does not resolve until catch-up has
    // completed; otherwise the lag read below is racy — confirm.
    let lag = db_leader.control_plane().lag_for(ShardId(1));
    assert_eq!(lag, 0, "no replication lag after rejoin");
}
Acceptance Criteria
- `test_tenant_rate_limiting`: 200-signal burst absorbed (burst capacity is 2× the 100 signals/sec rate); 201st signal returns `QuotaExceeded` within 1ms
- `test_noisy_neighbor_isolation`: exhausting tenant A's rate limiter has no effect on tenant B
- `test_tenant_residency_policy`: all 100 entities for an EU-residency tenant route to region 1
- `test_tenant_migration_zero_downtime`: all 150 signals present on target shard after migration; source has 0 after GC
- `test_rolling_upgrade_no_data_loss`: 200 signals written during drain window present on rejoined follower
- All 5 tests pass in `cargo test --test m8p5_multitenancy`
- `cargo clippy -D warnings` and `cargo fmt` pass