// M7p2 load test integration suite. // // Tests: degradation level progression, all-queries-ok under overload, // rate limiter isolation, session TTL sweeper, degradation in response, // shutdown session cleanup. #![allow(clippy::unwrap_used)] use std::collections::HashMap; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::thread; use std::time::Duration; use tidaldb::schema::{DecaySpec, EntityId, EntityKind, SchemaBuilder, Timestamp, Window}; use tidaldb::{AgentPolicy, DegradationLevel, TidalDb, TidalError}; // ── Setup helper ────────────────────────────────────────────────────────────── /// Build a test database with `item_count` items and a short-TTL policy. fn setup_db(item_count: u64) -> TidalDb { let mut builder = SchemaBuilder::new(); let _ = builder .signal( "view", EntityKind::Item, DecaySpec::Exponential { half_life: Duration::from_secs(7 * 24 * 3600), }, ) .windows(&[Window::OneHour, Window::TwentyFourHours, Window::AllTime]) .velocity(true) .add(); let _ = builder .signal( "like", EntityKind::Item, DecaySpec::Exponential { half_life: Duration::from_secs(14 * 24 * 3600), }, ) .windows(&[Window::TwentyFourHours, Window::AllTime]) .velocity(false) .add(); // Short-TTL policy: 500ms max session duration. builder.session_policy( "short_ttl", AgentPolicy { max_session_duration: Duration::from_millis(500), max_signals_per_session: 1000, allowed_signals: vec![], denied_signals: vec![], }, ); // Normal policy for rate limiting tests. builder.session_policy( "normal", AgentPolicy { max_session_duration: Duration::from_secs(3600), max_signals_per_session: 10_000, allowed_signals: vec![], denied_signals: vec![], }, ); let schema = builder.build().unwrap(); // Explicitly configure the rate limiter for load tests. // Default is unlimited; these tests exercise the 100/sec, 200-burst limits. let rl_config = tidaldb::load::RateLimiterConfig::limited(100.0, 200.0); let db = TidalDb::builder() .ephemeral() .with_schema(schema) .with_rate_limiter_config(rl_config) .open() .unwrap(); // Seed items. for i in 1..=item_count { let mut meta = HashMap::new(); meta.insert("title".to_string(), format!("Item {i}")); meta.insert("category".to_string(), "test".to_string()); meta.insert("format".to_string(), "video".to_string()); meta.insert("creator_id".to_string(), format!("{}", i % 10)); db.write_item_with_metadata(EntityId::new(i), &meta) .unwrap(); } // Seed signals. let base_ts = Timestamp::now().as_nanos(); for i in 1..=item_count.min(100) { let ts = Timestamp::from_nanos(base_ts.saturating_sub(i * 60_000_000_000)); db.signal("view", EntityId::new(i), 1.0, ts).unwrap(); } db } // ── Test 1: Degradation level progression ──────────────────────────────────── /// Test that the `LoadDetector` transitions through all 4 degradation levels /// when N concurrent threads each hold a guard simultaneously. #[test] fn degradation_progresses_under_concurrent_queries() { // Use low thresholds: reduced=5, coarse=10, no_diversity=15. let detector = Arc::new(tidaldb::load::LoadDetector::new( tidaldb::load::DegradationThresholds::new(5, 10, 15).unwrap(), )); let levels = Arc::new(std::sync::Mutex::new(Vec::new())); let barrier = Arc::new(std::sync::Barrier::new(20)); let mut handles = Vec::new(); for _ in 0..20 { let d = Arc::clone(&detector); let l = Arc::clone(&levels); let b = Arc::clone(&barrier); handles.push(thread::spawn(move || { let (level, guard) = d.enter(); b.wait(); // all threads hold their guards simultaneously l.lock().unwrap().push(level); drop(guard); })); } for h in handles { h.join().unwrap(); } let observed = levels.lock().unwrap(); // With 20 concurrent entries and thresholds at 5/10/15: // entries 1–4: Full, 5–9: ReducedCandidates, 10–14: CoarseAggregates, 15–20: NoDiversity assert!( observed.iter().any(|l| *l == DegradationLevel::NoDiversity), "expected NoDiversity, got: {observed:?}" ); assert!( observed.iter().any(|l| *l == DegradationLevel::Full), "expected Full, got: {observed:?}" ); // All guards dropped; counter must be zero. assert_eq!(detector.in_flight(), 0); } // ── Test 2: All queries return Ok under overload ────────────────────────────── /// 50 concurrent threads hammer `retrieve()` for 2 seconds. /// Zero non-backpressure errors are acceptable. #[test] fn all_queries_return_ok_under_overload() { let db = Arc::new(setup_db(100)); let stop = Arc::new(AtomicBool::new(false)); let error_count = Arc::new(AtomicUsize::new(0)); let query_count = Arc::new(AtomicUsize::new(0)); let mut handles = Vec::new(); for _ in 0..50 { let db = Arc::clone(&db); let stop = Arc::clone(&stop); let errors = Arc::clone(&error_count); let queries = Arc::clone(&query_count); handles.push(thread::spawn(move || { while !stop.load(Ordering::Relaxed) { let query = tidaldb::query::retrieve::Retrieve::builder() .profile("hot") .limit(10) .build() .unwrap(); match db.retrieve(&query) { Ok(_) => { queries.fetch_add(1, Ordering::Relaxed); } Err(e) => { // Backpressure is acceptable under overload. if !matches!(e, TidalError::Backpressure { .. }) { errors.fetch_add(1, Ordering::Relaxed); } } } } })); } thread::sleep(Duration::from_secs(2)); stop.store(true, Ordering::Release); for h in handles { h.join().unwrap(); } let total_queries = query_count.load(Ordering::Relaxed); let total_errors = error_count.load(Ordering::Relaxed); assert!(total_queries > 0, "expected some queries to complete"); assert_eq!( total_errors, 0, "expected zero non-backpressure errors, got {total_errors}" ); } // ── Test 3: Backpressure check logic ───────────────────────────────────────── /// Verify that the backpressure check correctly gates the WAL-full scenario. /// We test the logic by using the default threshold (1000) and confirming /// that normal signal writes bypass it (channel is empty). #[test] fn non_session_signals_bypass_rate_limiter() { let db = setup_db(10); // 1000 non-session signal writes must never return RateLimited. for i in 0..1000u64 { let ts = Timestamp::from_nanos(Timestamp::now().as_nanos().saturating_add(i)); match db.signal("view", EntityId::new(1), 1.0, ts) { Ok(()) => {} Err(TidalError::Backpressure { .. }) => {} // acceptable Err(TidalError::RateLimited { .. }) => { panic!("non-session signal must never be rate-limited"); } Err(e) => panic!("unexpected error: {e}"), } } } // ── Test 4: Rate limiter isolates per-agent-session ─────────────────────────── /// Agent A is flooded past its burst capacity (200 by default). /// Agent B's separate bucket must remain unaffected. #[test] fn rate_limiter_isolates_per_agent_session() { let db = setup_db(10); let handle_a = db .start_session(1, "agent-a", "normal", HashMap::new()) .unwrap(); let handle_b = db .start_session(2, "agent-b", "normal", HashMap::new()) .unwrap(); // Flood agent A with 300 signals (burst cap = 200 by default). let mut a_accepted = 0u32; let mut a_rejected = 0u32; for i in 0..300u64 { let ts = Timestamp::from_nanos(Timestamp::now().as_nanos().saturating_add(i)); match db.session_signal(&handle_a, "view", EntityId::new(1), 1.0, ts, None) { Ok(()) => a_accepted += 1, Err(TidalError::RateLimited { .. }) => a_rejected += 1, Err(e) => panic!("unexpected error for agent-a: {e}"), } } // Agent B should still be able to write (separate bucket). let ts = Timestamp::now(); let result = db.session_signal(&handle_b, "view", EntityId::new(2), 1.0, ts, None); assert!( result.is_ok(), "agent-b must not be rate-limited by agent-a" ); // Agent A should have hit the limit. assert!(a_rejected > 0, "expected agent-a to be rate-limited"); assert!(a_accepted > 0, "expected some agent-a signals to succeed"); db.close_session(handle_a).unwrap(); db.close_session(handle_b).unwrap(); } // ── Test 5: Rate limiter bucket cleaned up on session close ─────────────────── #[test] fn close_session_cleans_up_rate_limiter_bucket() { let db = setup_db(10); let handle = db .start_session(1, "agent-rl", "normal", HashMap::new()) .unwrap(); // Write one signal to ensure bucket is created. let _ = db.session_signal( &handle, "view", EntityId::new(1), 1.0, Timestamp::now(), None, ); assert!( db.rate_limiter_bucket_count() >= 1, "bucket must exist after write" ); db.close_session(handle).unwrap(); assert_eq!( db.rate_limiter_bucket_count(), 0, "bucket must be removed after close_session" ); } // ── Test 6: Session TTL sweeper closes expired sessions ─────────────────────── #[test] fn sweeper_auto_closes_expired_sessions() { let db = setup_db(10); // Start a session with short_ttl policy (500ms max duration). let handle = db .start_session(1, "agent-x", "short_ttl", HashMap::new()) .unwrap(); let session_id = handle.id; assert_eq!(db.active_sessions().len(), 1); // Wait past the TTL. thread::sleep(Duration::from_millis(600)); // Manually trigger a sweep (ephemeral mode, sweeper thread not auto-spawned). db.force_sweep(); // Session should be removed from active sessions. assert_eq!( db.active_sessions().len(), 0, "expired session must be swept" ); // The session snapshot must be archived. let snapshot = db.session_snapshot(session_id).unwrap(); assert_eq!(snapshot.id, session_id); // The handle's closed flag must be set. assert!( handle.closed.load(Ordering::Acquire), "handle.closed must be true after auto-close" ); } // ── Test 7: Sweeper does not close unexpired sessions ───────────────────────── #[test] fn sweeper_does_not_close_active_sessions() { let db = setup_db(10); // Start a session with the normal policy (1 hour max duration). let handle = db .start_session(1, "agent-y", "normal", HashMap::new()) .unwrap(); db.force_sweep(); // Session must still be active. assert_eq!( db.active_sessions().len(), 1, "unexpired session must remain" ); db.close_session(handle).unwrap(); } // ── Test 8: Degradation level visible in retrieve response ──────────────────── #[test] fn degradation_level_in_retrieve_response() { let db = setup_db(50); let query = tidaldb::query::retrieve::Retrieve::builder() .profile("hot") .limit(10) .build() .unwrap(); let results = db.retrieve(&query).unwrap(); // Under zero load the level must be Full. assert_eq!(results.degradation_level, DegradationLevel::Full); } // ── Test 9: Shutdown force-closes remaining active sessions ─────────────────── #[test] fn shutdown_closes_all_active_sessions() { let db = setup_db(10); // Start 3 sessions, close none. let h1 = db .start_session(1, "agent-z", "normal", HashMap::new()) .unwrap(); let h2 = db .start_session(2, "agent-z", "normal", HashMap::new()) .unwrap(); let h3 = db .start_session(3, "agent-z", "normal", HashMap::new()) .unwrap(); assert_eq!(db.active_sessions().len(), 3); // Drop the handles (does NOT auto-close sessions). drop(h1); drop(h2); drop(h3); // close() must force-close all 3. db.close().unwrap(); // No panic == pass; the active sessions were cleaned up internally. } // ── Test 10: auto_closed field on SessionSummary ────────────────────────────── #[test] fn session_summary_auto_closed_false_for_explicit_close() { let db = setup_db(10); let handle = db .start_session(1, "agent-ac", "normal", HashMap::new()) .unwrap(); let summary = db.close_session(handle).unwrap(); assert!( !summary.auto_closed, "explicit close must set auto_closed=false" ); } #[test] fn session_summary_auto_closed_true_for_internal_close() { let db = setup_db(10); let handle = db .start_session(1, "agent-ac2", "normal", HashMap::new()) .unwrap(); let session_id = handle.id; // Internal close without SessionHandle. let summary = db.close_session_internal(session_id, true).unwrap(); assert!( summary.auto_closed, "close_session_internal must set auto_closed=true" ); } // ── Test 11: Sweeper thread start + cancellation ────────────────────────────── #[test] fn sweeper_thread_cancellation() { let db = Arc::new(setup_db(10)); // Start the sweeper thread. TidalDb::start_sweeper(&db); // Close the database -- should signal the sweeper and join within ~1s. let start = std::time::Instant::now(); Arc::try_unwrap(db) .expect("test holds the only Arc") .close() .unwrap(); let elapsed = start.elapsed(); // The close should complete in well under 2 seconds (sweeper wakes every 1s). assert!( elapsed < Duration::from_secs(3), "close took too long: {elapsed:?}" ); }