tidaldb/tidal/tests/m7p2_load.rs
jordan f4cfd6c81f feat: complete M8 replication primitives + forage enhancements + docs
Milestone 8 (phases 1-4):
- Shard-aware WAL segment naming, BatchHeader v2, ShardRouter
- Transport trait, InProcessTransport, WalShipper, FollowerDb
- HLC, PNCounter, LWWRegister, CrdtSignalState, ReconciliationEngine
- Session replication bridge with SeqNo/HWM, idempotency store

Forage application:
- Multi-source discovery engine with MAB exploration
- Embedding-based label system, server handlers, UI refresh

Other:
- QUICKSTART.md, README.md, milestone-8 planning docs
- Hard negative union semantics, RLHF export enhancements
- Recovery benchmark and visibility test expansions
- Split 8 oversized source files per CODING_GUIDELINES §9

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 13:17:19 -07:00

460 lines
15 KiB
Rust
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// M7p2 load test integration suite.
//
// Tests: degradation level progression, all-queries-ok under overload,
// rate limiter isolation, session TTL sweeper, degradation in response,
// shutdown session cleanup.
#![allow(clippy::unwrap_used)]
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;
use tidaldb::schema::{DecaySpec, EntityId, EntityKind, SchemaBuilder, Timestamp, Window};
use tidaldb::{AgentPolicy, DegradationLevel, TidalDb, TidalError};
// ── Setup helper ──────────────────────────────────────────────────────────────
/// Open an ephemeral [`TidalDb`] pre-populated for the load tests.
///
/// The schema declares two item signals (`view` and `like`) and two session
/// policies: `short_ttl` (500 ms maximum session duration) and `normal`
/// (one hour maximum session duration). The rate limiter is configured
/// explicitly with `RateLimiterConfig::limited(100.0, 200.0)`.
///
/// Items with ids `1..=item_count` are written with metadata, and the first
/// `min(item_count, 100)` of them each receive one `view` signal that is
/// back-dated by one minute per item id.
///
/// # Panics
///
/// Panics if the schema fails to build, the database fails to open, or any
/// seed write is rejected.
fn setup_db(item_count: u64) -> TidalDb {
    // Spacing between the seeded `view` signals.
    const NANOS_PER_MINUTE: u64 = 60_000_000_000;

    let view_decay = DecaySpec::Exponential {
        half_life: Duration::from_secs(7 * 24 * 3600),
    };
    let like_decay = DecaySpec::Exponential {
        half_life: Duration::from_secs(14 * 24 * 3600),
    };

    let mut builder = SchemaBuilder::new();
    let _ = builder
        .signal("view", EntityKind::Item, view_decay)
        .windows(&[Window::OneHour, Window::TwentyFourHours, Window::AllTime])
        .velocity(true)
        .add();
    let _ = builder
        .signal("like", EntityKind::Item, like_decay)
        .windows(&[Window::TwentyFourHours, Window::AllTime])
        .velocity(false)
        .add();

    // Sessions under this policy expire after 500 ms (used by the sweeper tests).
    let short_ttl = AgentPolicy {
        max_session_duration: Duration::from_millis(500),
        max_signals_per_session: 1000,
        allowed_signals: Vec::new(),
        denied_signals: Vec::new(),
    };
    builder.session_policy("short_ttl", short_ttl);

    // Long-lived policy used by the rate-limiting tests.
    let normal = AgentPolicy {
        max_session_duration: Duration::from_secs(3600),
        max_signals_per_session: 10_000,
        allowed_signals: Vec::new(),
        denied_signals: Vec::new(),
    };
    builder.session_policy("normal", normal);

    let schema = builder.build().unwrap();

    // The default rate limiter is unlimited; the load tests exercise the
    // 100/sec, 200-burst limits, so configure them explicitly.
    let db = TidalDb::builder()
        .ephemeral()
        .with_schema(schema)
        .with_rate_limiter_config(tidaldb::load::RateLimiterConfig::limited(100.0, 200.0))
        .open()
        .unwrap();

    // Seed items.
    for i in 1..=item_count {
        let meta = HashMap::from([
            ("title".to_string(), format!("Item {i}")),
            ("category".to_string(), "test".to_string()),
            ("format".to_string(), "video".to_string()),
            ("creator_id".to_string(), (i % 10).to_string()),
        ]);
        db.write_item_with_metadata(EntityId::new(i), &meta)
            .unwrap();
    }

    // Seed signals: one `view` per item, each one minute older than the last.
    let base_ts = Timestamp::now().as_nanos();
    let seeded_items = item_count.min(100);
    for i in 1..=seeded_items {
        let age_nanos = i * NANOS_PER_MINUTE;
        let ts = Timestamp::from_nanos(base_ts.saturating_sub(age_nanos));
        db.signal("view", EntityId::new(i), 1.0, ts).unwrap();
    }

    db
}
// ── Test 1: Degradation level progression ────────────────────────────────────
/// Twenty threads enter the `LoadDetector` and hold their guards across a
/// barrier, so every entry happens before any guard is released.
///
/// With thresholds `(5, 10, 15)` the levels recorded at entry must include
/// both `Full` and `NoDiversity`, and once every guard has been dropped the
/// in-flight counter must read zero.
#[test]
fn degradation_progresses_under_concurrent_queries() {
    const THREADS: usize = 20;

    // Use low thresholds: reduced=5, coarse=10, no_diversity=15.
    let thresholds = tidaldb::load::DegradationThresholds::new(5, 10, 15).unwrap();
    let detector = Arc::new(tidaldb::load::LoadDetector::new(thresholds));
    let levels = Arc::new(std::sync::Mutex::new(Vec::new()));
    let barrier = Arc::new(std::sync::Barrier::new(THREADS));

    // `collect` spawns every worker before the first `join`, which the
    // barrier requires: no thread can pass it until all twenty have entered.
    let workers: Vec<_> = (0..THREADS)
        .map(|_| {
            let detector = Arc::clone(&detector);
            let levels = Arc::clone(&levels);
            let barrier = Arc::clone(&barrier);
            thread::spawn(move || {
                let (level, guard) = detector.enter();
                barrier.wait(); // all threads hold their guards simultaneously
                levels.lock().unwrap().push(level);
                drop(guard);
            })
        })
        .collect();
    for worker in workers {
        worker.join().unwrap();
    }

    let observed = levels.lock().unwrap();
    // With 20 concurrent entries and thresholds at 5/10/15 the expected split is
    // entries 1-4: Full, 5-9: ReducedCandidates, 10-14: CoarseAggregates,
    // 15-20: NoDiversity. Only the two extremes are asserted here.
    assert!(
        observed.contains(&DegradationLevel::NoDiversity),
        "expected NoDiversity, got: {observed:?}"
    );
    assert!(
        observed.contains(&DegradationLevel::Full),
        "expected Full, got: {observed:?}"
    );

    // All guards dropped; counter must be zero.
    assert_eq!(detector.in_flight(), 0);
}
// ── Test 2: All queries return Ok under overload ──────────────────────────────
/// Fifty threads call `retrieve()` in a tight loop for two seconds.
///
/// `TidalError::Backpressure` is tolerated under overload; any other error
/// is counted and fails the test. At least one query must succeed.
#[test]
fn all_queries_return_ok_under_overload() {
    let db = Arc::new(setup_db(100));
    let stop = Arc::new(AtomicBool::new(false));
    let error_count = Arc::new(AtomicUsize::new(0));
    let query_count = Arc::new(AtomicUsize::new(0));

    let workers: Vec<_> = (0..50)
        .map(|_| {
            let db = Arc::clone(&db);
            let stop = Arc::clone(&stop);
            let errors = Arc::clone(&error_count);
            let queries = Arc::clone(&query_count);
            thread::spawn(move || loop {
                if stop.load(Ordering::Relaxed) {
                    break;
                }
                let query = tidaldb::query::retrieve::Retrieve::builder()
                    .profile("hot")
                    .limit(10)
                    .build()
                    .unwrap();
                match db.retrieve(&query) {
                    Ok(_) => {
                        queries.fetch_add(1, Ordering::Relaxed);
                    }
                    // Backpressure is acceptable under overload.
                    Err(TidalError::Backpressure { .. }) => {}
                    Err(_) => {
                        errors.fetch_add(1, Ordering::Relaxed);
                    }
                }
            })
        })
        .collect();

    // Let the workers hammer the database, then ask them to stop.
    thread::sleep(Duration::from_secs(2));
    stop.store(true, Ordering::Release);
    for worker in workers {
        worker.join().unwrap();
    }

    let total_queries = query_count.load(Ordering::Relaxed);
    let total_errors = error_count.load(Ordering::Relaxed);
    assert!(total_queries > 0, "expected some queries to complete");
    assert_eq!(
        total_errors, 0,
        "expected zero non-backpressure errors, got {total_errors}"
    );
}
// ── Test 3: Non-session signals bypass the rate limiter ──────────────────────
/// Writes 1000 `view` signals straight through `db.signal`, with no session.
///
/// `Ok` and `TidalError::Backpressure` are both tolerated; a
/// `TidalError::RateLimited` result, or any other error, fails the test.
#[test]
fn non_session_signals_bypass_rate_limiter() {
    let db = setup_db(10);
    // 1000 non-session signal writes must never return RateLimited.
    for i in 0..1000u64 {
        let ts = Timestamp::from_nanos(Timestamp::now().as_nanos().saturating_add(i));
        let Err(e) = db.signal("view", EntityId::new(1), 1.0, ts) else {
            continue;
        };
        match e {
            // Backpressure is acceptable.
            TidalError::Backpressure { .. } => {}
            TidalError::RateLimited { .. } => {
                panic!("non-session signal must never be rate-limited");
            }
            e => panic!("unexpected error: {e}"),
        }
    }
}
// ── Test 4: Rate limiter isolates per-agent-session ───────────────────────────
/// Agent A is flooded with 300 session signals, more than the 200-signal
/// burst that `setup_db` configures on the rate limiter.
/// Agent B writes through its own session and must not be rejected.
#[test]
fn rate_limiter_isolates_per_agent_session() {
    let db = setup_db(10);
    let handle_a = db
        .start_session(1, "agent-a", "normal", HashMap::new())
        .unwrap();
    let handle_b = db
        .start_session(2, "agent-b", "normal", HashMap::new())
        .unwrap();

    // Flood agent A with 300 signals (configured burst cap = 200).
    let mut a_accepted = 0u32;
    let mut a_rejected = 0u32;
    for i in 0..300u64 {
        let ts = Timestamp::from_nanos(Timestamp::now().as_nanos().saturating_add(i));
        let outcome = db.session_signal(&handle_a, "view", EntityId::new(1), 1.0, ts, None);
        let tally = match outcome {
            Ok(()) => &mut a_accepted,
            Err(TidalError::RateLimited { .. }) => &mut a_rejected,
            Err(e) => panic!("unexpected error for agent-a: {e}"),
        };
        *tally += 1;
    }

    // Agent B should still be able to write (separate bucket).
    let b_result = db.session_signal(
        &handle_b,
        "view",
        EntityId::new(2),
        1.0,
        Timestamp::now(),
        None,
    );
    assert!(
        b_result.is_ok(),
        "agent-b must not be rate-limited by agent-a"
    );

    // Agent A should have hit the limit, but only after some writes succeeded.
    assert!(a_rejected > 0, "expected agent-a to be rate-limited");
    assert!(a_accepted > 0, "expected some agent-a signals to succeed");

    db.close_session(handle_a).unwrap();
    db.close_session(handle_b).unwrap();
}
// ── Test 5: Rate limiter bucket cleaned up on session close ───────────────────
/// After a session has written a signal, at least one rate-limiter bucket
/// exists; closing that session must bring the bucket count back to zero.
#[test]
fn close_session_cleans_up_rate_limiter_bucket() {
    let db = setup_db(10);
    let handle = db
        .start_session(1, "agent-rl", "normal", HashMap::new())
        .unwrap();

    // Write one signal to ensure bucket is created; its result is irrelevant.
    let ts = Timestamp::now();
    let _ = db.session_signal(&handle, "view", EntityId::new(1), 1.0, ts, None);

    let buckets_while_open = db.rate_limiter_bucket_count();
    assert!(buckets_while_open >= 1, "bucket must exist after write");

    db.close_session(handle).unwrap();

    let buckets_after_close = db.rate_limiter_bucket_count();
    assert_eq!(
        buckets_after_close, 0,
        "bucket must be removed after close_session"
    );
}
// ── Test 6: Session TTL sweeper closes expired sessions ───────────────────────
/// A `short_ttl` session (500 ms maximum duration) that has outlived its TTL
/// must be removed by `force_sweep`, leave a retrievable snapshot behind,
/// and have its handle's `closed` flag set.
#[test]
fn sweeper_auto_closes_expired_sessions() {
    let db = setup_db(10);

    // Start a session with short_ttl policy (500ms max duration).
    let handle = db
        .start_session(1, "agent-x", "short_ttl", HashMap::new())
        .unwrap();
    let session_id = handle.id;
    assert_eq!(db.active_sessions().len(), 1);

    // Wait past the TTL, then sweep by hand (ephemeral mode, sweeper thread
    // not auto-spawned).
    thread::sleep(Duration::from_millis(600));
    db.force_sweep();

    // Session should be removed from active sessions.
    let still_active = db.active_sessions().len();
    assert_eq!(still_active, 0, "expired session must be swept");

    // The session snapshot must be archived.
    let snapshot = db.session_snapshot(session_id).unwrap();
    assert_eq!(snapshot.id, session_id);

    // The handle's closed flag must be set.
    let handle_closed = handle.closed.load(Ordering::Acquire);
    assert!(handle_closed, "handle.closed must be true after auto-close");
}
// ── Test 7: Sweeper does not close unexpired sessions ─────────────────────────
/// A sweep run right after a `normal` session (one hour maximum duration)
/// starts must leave that session active.
#[test]
fn sweeper_does_not_close_active_sessions() {
    let db = setup_db(10);

    // Start a session with the normal policy (1 hour max duration).
    let handle = db
        .start_session(1, "agent-y", "normal", HashMap::new())
        .unwrap();

    db.force_sweep();

    // Session must still be active.
    let active = db.active_sessions().len();
    assert_eq!(active, 1, "unexpired session must remain");

    db.close_session(handle).unwrap();
}
// ── Test 8: Degradation level visible in retrieve response ────────────────────
/// A single `retrieve()` call with no concurrent load must report
/// `DegradationLevel::Full` in its response.
#[test]
fn degradation_level_in_retrieve_response() {
    let db = setup_db(50);

    let builder = tidaldb::query::retrieve::Retrieve::builder()
        .profile("hot")
        .limit(10);
    let query = builder.build().unwrap();

    // Under zero load the level must be Full.
    let level = db.retrieve(&query).unwrap().degradation_level;
    assert_eq!(level, DegradationLevel::Full);
}
// ── Test 9: Shutdown force-closes remaining active sessions ───────────────────
/// Three sessions are started and never explicitly closed; `close()` on the
/// database must still succeed.
#[test]
fn shutdown_closes_all_active_sessions() {
    let db = setup_db(10);

    // Start 3 sessions, close none.
    let handles = [1, 2, 3].map(|user| {
        db.start_session(user, "agent-z", "normal", HashMap::new())
            .unwrap()
    });
    assert_eq!(db.active_sessions().len(), 3);

    // Drop the handles (does NOT auto-close sessions).
    drop(handles);

    // close() must force-close all 3.
    // No panic == pass; the active sessions were cleaned up internally.
    db.close().unwrap();
}
// ── Test 10: auto_closed field on SessionSummary ──────────────────────────────
/// The summary returned by an explicit `close_session` call must report
/// `auto_closed == false`.
#[test]
fn session_summary_auto_closed_false_for_explicit_close() {
    let db = setup_db(10);
    let handle = db
        .start_session(1, "agent-ac", "normal", HashMap::new())
        .unwrap();

    let auto_closed = db.close_session(handle).unwrap().auto_closed;
    assert!(!auto_closed, "explicit close must set auto_closed=false");
}
/// The summary returned by `close_session_internal(id, true)` must report
/// `auto_closed == true`.
#[test]
fn session_summary_auto_closed_true_for_internal_close() {
    let db = setup_db(10);
    // The handle stays bound until the end of the test; only its id is used.
    let handle = db
        .start_session(1, "agent-ac2", "normal", HashMap::new())
        .unwrap();

    // Internal close without SessionHandle.
    let auto_closed = db
        .close_session_internal(handle.id, true)
        .unwrap()
        .auto_closed;
    assert!(
        auto_closed,
        "close_session_internal must set auto_closed=true"
    );
}
// ── Test 11: Sweeper thread start + cancellation ──────────────────────────────
/// Starts the sweeper thread, then closes the database and checks that
/// `close()` returns within the 3-second bound asserted below.
#[test]
fn sweeper_thread_cancellation() {
    let db = Arc::new(setup_db(10));

    // Start the sweeper thread.
    TidalDb::start_sweeper(&db);

    // Close the database -- should signal the sweeper and join within ~1s.
    let start = std::time::Instant::now();
    let owned_db = Arc::try_unwrap(db).expect("test holds the only Arc");
    owned_db.close().unwrap();
    let elapsed = start.elapsed();

    // The sweeper is expected to wake about once per second, so the
    // 3-second bound leaves generous headroom for the join.
    let limit = Duration::from_secs(3);
    assert!(elapsed < limit, "close took too long: {elapsed:?}");
}