//! m8p5 integration tests: Control Plane, Multi-Tenancy, and Routing. //! All tests are sync (`#[test]`). No `tokio::test`. #![allow(clippy::unwrap_used)] use std::sync::{Arc, RwLock}; use tidaldb::replication::state::ReplicationState; use tidaldb::replication::{ ClusterTopology, ControlPlane, MigrationState, RegionId, ReplicationLagGauge, ShardAssignment, ShardId, TenantConfig, TenantId, TenantMigration, TenantRouter, UpgradePhase, }; use tidaldb::schema::EntityId; fn make_router_and_cp() -> (Arc, Arc) { let topo = Arc::new(RwLock::new(ClusterTopology::single())); let router = Arc::new(TenantRouter::new(Arc::clone(&topo))); let state = Arc::new(ReplicationState::single()); let lag = Arc::new(ReplicationLagGauge::new(ShardId::SINGLE, state)); let cp = Arc::new(ControlPlane::new(topo, Arc::clone(&router), lag)); (router, cp) } fn make_two_shard_router_and_cp() -> (Arc, Arc) { let topo = Arc::new(RwLock::new(ClusterTopology { shards: vec![ ShardAssignment { shard_id: ShardId(0), region_id: RegionId(0), }, ShardAssignment { shard_id: ShardId(1), region_id: RegionId(1), }, ], })); let router = Arc::new(TenantRouter::new(Arc::clone(&topo))); let state = Arc::new(ReplicationState::single()); let lag = Arc::new(ReplicationLagGauge::new(ShardId::SINGLE, state)); let cp = Arc::new(ControlPlane::new(topo, Arc::clone(&router), lag)); (router, cp) } // ─── Rate Limiting ────────────────────────────────────────────────────────── #[test] fn test_tenant_rate_limiting() { use tidaldb::TidalError; let (router, _cp) = make_router_and_cp(); let mut cfg = TenantConfig::default_tenant(); cfg.max_signals_per_sec = Some(100); router.register_tenant(cfg); let limiter = router.rate_limiter_for(TenantId::DEFAULT).expect("limiter"); // Burst capacity = 2× rate = 200 tokens. All 200 should succeed. 
for i in 0..200 { assert!( limiter.try_acquire().is_ok(), "acquisition #{i} should succeed (burst capacity = 200)" ); } let err = limiter.try_acquire().expect_err("should be quota exceeded"); assert!(matches!(err, TidalError::QuotaExceeded(_))); } // ─── Noisy Neighbor ────────────────────────────────────────────────────────── #[test] fn test_noisy_neighbor_isolation() { let (router, _cp) = make_router_and_cp(); let mut cfg_a = TenantConfig::default_tenant(); cfg_a.max_signals_per_sec = Some(50); router.register_tenant(cfg_a); let mut cfg_b = TenantConfig::unlimited(TenantId(1), "tenant-b"); cfg_b.max_signals_per_sec = Some(50); router.register_tenant(cfg_b); let limiter_a = router .rate_limiter_for(TenantId::DEFAULT) .expect("limiter A"); let limiter_b = router.rate_limiter_for(TenantId(1)).expect("limiter B"); // Exhaust tenant A's bucket (2× burst = 100 tokens). for _ in 0..100 { let _ = limiter_a.try_acquire(); } assert!( limiter_a.try_acquire().is_err(), "tenant A should be rate limited" ); assert!( limiter_b.try_acquire().is_ok(), "tenant B should not be affected by tenant A's exhaustion" ); } // ─── Residency Policy ──────────────────────────────────────────────────────── #[test] fn test_tenant_residency_policy() { let topo = Arc::new(RwLock::new(ClusterTopology { shards: vec![ ShardAssignment { shard_id: ShardId(0), region_id: RegionId(0), }, ShardAssignment { shard_id: ShardId(1), region_id: RegionId(1), }, ], })); let router = Arc::new(TenantRouter::new(Arc::clone(&topo))); let cfg = TenantConfig { tenant_id: TenantId(42), max_signals_per_sec: None, max_entities: None, max_storage_bytes: None, required_regions: vec![RegionId(1)], label: "region-1-only".to_string(), }; router.register_tenant(cfg); for i in 0u64..100 { let assignment = router.route(TenantId(42), EntityId::new(i)).expect("route"); assert_eq!( assignment.shard_id, ShardId(1), "entity {i} routed to wrong shard" ); assert_eq!(assignment.region_id, RegionId(1)); } } // ─── Migration State 
Machine ───────────────────────────────────────────────── #[test] fn test_migration_state_machine() { use tidaldb::TidalError; let (router, cp) = make_router_and_cp(); let migration = TenantMigration::new( TenantId(10), ShardId::SINGLE, ShardId(1), Arc::clone(&cp), Arc::clone(&router), ); assert_eq!(migration.current_state(), MigrationState::Idle); // Invalid: enter_dual_write before prepare_target. let err = migration.enter_dual_write(0).expect_err("should fail"); assert!(matches!(err, TidalError::InvalidState(_))); // Idle -> PreparingTarget let seqno = migration.prepare_target(50).expect("prepare_target"); assert_eq!(seqno, 50); assert!(matches!( migration.current_state(), MigrationState::PreparingTarget { last_shipped_seqno: 50 } )); // Invalid: prepare_target again. let err = migration.prepare_target(99).expect_err("should fail"); assert!(matches!(err, TidalError::InvalidState(_))); // PreparingTarget -> DualWrite let cutover = migration.enter_dual_write(100).expect("enter_dual_write"); assert_eq!(cutover, 100); assert!(matches!( migration.current_state(), MigrationState::DualWrite { cutover_seqno: 100 } )); // Invalid: finalize when target behind cutover — NotReady (not InvalidState). let err = migration.finalize(50).expect_err("should fail"); assert!( matches!(err, TidalError::NotReady(_)), "expected NotReady, got: {err}" ); // DualWrite -> Finalizing migration.finalize(150).expect("finalize"); assert!(matches!( migration.current_state(), MigrationState::Finalizing { .. } )); // Invalid: finalize again. let err = migration.finalize(200).expect_err("should fail"); assert!(matches!(err, TidalError::InvalidState(_))); // Finalizing -> Complete (GC window = 0) migration.gc_source(0).expect("gc_source"); assert_eq!(migration.current_state(), MigrationState::Complete); // Invalid: transition from Complete. 
let err = migration.prepare_target(1).expect_err("should fail"); assert!(matches!(err, TidalError::InvalidState(_))); } /// GC window enforcement: `gc_source` must reject calls before the window elapses. #[test] fn test_gc_source_rejects_before_window_elapses() { use tidaldb::TidalError; let (router, cp) = make_router_and_cp(); let migration = TenantMigration::new( TenantId(11), ShardId::SINGLE, ShardId(1), Arc::clone(&cp), Arc::clone(&router), ); migration.prepare_target(10).unwrap(); migration.enter_dual_write(20).unwrap(); migration.finalize(25).unwrap(); // A 10-minute GC window has definitely not elapsed since finalize() just ran. let err = migration .gc_source(600_000_000_000) .expect_err("should be rejected"); assert!( matches!(err, TidalError::InvalidState(_)), "expected InvalidState for GC window not elapsed, got: {err}" ); } // ─── Dual-Write Routing ─────────────────────────────────────────────────────── /// During dual-write mode, `write_assignments` returns both source and target shards. #[test] fn test_dual_write_routing_returns_both_shards() { let (router, _cp) = make_two_shard_router_and_cp(); router.set_dual_write(TenantId(5), ShardId(0), ShardId(1)); assert!(router.is_dual_write(TenantId(5))); let assignments = router .write_assignments(TenantId(5), EntityId::new(42)) .expect("write_assignments"); assert_eq!(assignments.len(), 2, "dual-write must return 2 assignments"); let shard_ids: Vec<_> = assignments.iter().map(|a| a.shard_id).collect(); assert!( shard_ids.contains(&ShardId(0)), "source shard must be in assignments" ); assert!( shard_ids.contains(&ShardId(1)), "target shard must be in assignments" ); } /// After `finalize_migration`, routing pins to the target shard. #[test] fn test_finalize_migration_pins_to_target() { let (router, _cp) = make_two_shard_router_and_cp(); router.set_dual_write(TenantId(7), ShardId(0), ShardId(1)); router.finalize_migration(TenantId(7), ShardId(1)); // No longer in dual-write mode. 
assert!(!router.is_dual_write(TenantId(7))); // Pinned to target shard. assert_eq!(router.pinned_shard(TenantId(7)), Some(ShardId(1))); // All routing goes to the target shard. for i in 0u64..20 { let assignment = router.route(TenantId(7), EntityId::new(i)).unwrap(); assert_eq!( assignment.shard_id, ShardId(1), "post-migration routing must pin to target shard" ); } // write_assignments returns only the target after finalization. let writes = router .write_assignments(TenantId(7), EntityId::new(99)) .unwrap(); assert_eq!(writes.len(), 1); assert_eq!(writes[0].shard_id, ShardId(1)); } // ─── Rolling Upgrade ───────────────────────────────────────────────────────── #[test] fn test_rolling_upgrade_drain_rejoin() { use tidaldb::TidalError; // Use a 2-shard topology so that draining 1 shard still leaves 1 serving. let (_router, cp) = make_two_shard_router_and_cp(); let coordinator = tidaldb::replication::RollingUpgradeCoordinator::new(Arc::clone(&cp)); assert_eq!(coordinator.current_phase(), UpgradePhase::Ready); assert!(!coordinator.is_drained(ShardId(0))); coordinator.drain(ShardId(0)).expect("drain shard 0"); assert!(coordinator.is_drained(ShardId(0))); assert!(matches!( coordinator.current_phase(), UpgradePhase::Draining { shard_id: ShardId(0) } )); // Cannot start a second concurrent drain. let err = coordinator.drain(ShardId(1)).expect_err("should fail"); assert!(matches!(err, TidalError::InvalidState(_))); coordinator.rejoin(ShardId(0)).expect("rejoin"); assert!(!coordinator.is_drained(ShardId(0))); assert_eq!(coordinator.current_phase(), UpgradePhase::Ready); // Can drain the other shard after rejoin. coordinator.drain(ShardId(1)).expect("drain shard 1"); assert!(coordinator.is_drained(ShardId(1))); } /// Draining the sole shard in a single-node cluster must be refused. 
#[test]
fn test_drain_single_node_cluster_is_refused() {
    use tidaldb::TidalError;

    let (_router, cp) = make_router_and_cp(); // single-shard
    let coordinator = tidaldb::replication::RollingUpgradeCoordinator::new(Arc::clone(&cp));

    let err = coordinator
        .drain(ShardId::SINGLE)
        .expect_err("must refuse to drain the only node");
    assert!(
        matches!(err, TidalError::InvalidState(_)),
        "expected InvalidState, got: {err}"
    );

    // A refused drain must have no side effects: the shard is not marked drained and the
    // coordinator never leaves the Ready phase.
    assert!(!coordinator.is_drained(ShardId::SINGLE));
    assert_eq!(coordinator.current_phase(), UpgradePhase::Ready);
}