# Task 06: Load Test

## Delivers

An integration test that simulates 3x overload with concurrent query and write threads. It verifies degradation level progression, backpressure behavior, rate-limiting isolation between agents, and session TTL cleanup. It confirms the acceptance criterion that, under sustained overload, every well-formed query returns results (never `ServiceUnavailable`).

## Complexity: L

## Dependencies

- task-01 (`DegradationLevel`, `LoadDetector`)
- task-02 (executor degradation branches)
- task-03 (`Results.degradation_level`, `TidalError::Backpressure`)
- task-04 (`RateLimiter`, `TidalError::RateLimited`)
- task-05 (session TTL sweeper, `SessionSummary.auto_closed`)

## Technical Design

### 1. Test file location

`tidal/tests/m7p2_load.rs`

This is an integration test because it:

1. Spawns multiple threads to simulate concurrent load
2. Exercises the full `TidalDb` API end-to-end
3. Requires real wall-clock timing for rate limiter and sweeper behavior
4. Needs a running WAL with actual channel backpressure

### 2. Test setup helper

```rust
// tidal/tests/m7p2_load.rs
#![allow(clippy::unwrap_used)]

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;

use tidaldb::{TidalDb, TidalError, DegradationLevel};
use tidaldb::schema::{DecaySpec, EntityId, EntityKind, SchemaBuilder, Timestamp, Window};
use tidaldb::query::retrieve::Retrieve;
// Only needed once degradation_level_in_search_response is implemented.
#[allow(unused_imports)]
use tidaldb::query::search::Search;

/// Create a test database with items, signals, and a short-TTL policy.
fn setup_db(item_count: u64) -> Arc<TidalDb> {
    let mut builder = SchemaBuilder::new();
    let _ = builder
        .signal(
            "view",
            EntityKind::Item,
            DecaySpec::Exponential {
                half_life: Duration::from_secs(7 * 24 * 3600),
            },
        )
        .windows(&[Window::OneHour, Window::TwentyFourHours, Window::AllTime])
        .velocity(true)
        .add();
    let _ = builder
        .signal(
            "like",
            EntityKind::Item,
            DecaySpec::Exponential {
                half_life: Duration::from_secs(14 * 24 * 3600),
            },
        )
        .windows(&[Window::TwentyFourHours, Window::AllTime])
        .velocity(false)
        .add();

    // Short-TTL policy for sweeper testing.
    builder.session_policy(tidaldb::AgentPolicy {
        name: "short_ttl".to_string(),
        max_session_duration: Duration::from_millis(500),
        max_signals_per_session: 1000,
        allowed_signals: vec![],
        denied_signals: vec![],
        rate_limit_per_second: None,
    });

    // Normal policy for rate limiting tests.
    builder.session_policy(tidaldb::AgentPolicy {
        name: "normal".to_string(),
        max_session_duration: Duration::from_secs(3600),
        max_signals_per_session: 10_000,
        allowed_signals: vec![],
        denied_signals: vec![],
        rate_limit_per_second: None,
    });

    let schema = builder.build().unwrap();
    let db = TidalDb::builder()
        .ephemeral()
        .with_schema(schema)
        .open()
        .unwrap();

    // Seed items.
    for i in 1..=item_count {
        let mut meta = HashMap::new();
        meta.insert("title".to_string(), format!("Item {i}"));
        meta.insert("category".to_string(), "test".to_string());
        meta.insert("format".to_string(), "video".to_string());
        meta.insert("creator_id".to_string(), format!("{}", i % 10));
        db.write_item_with_metadata(EntityId::new(i), &meta).unwrap();
    }

    // Seed signals: one "view" per item, spaced one minute apart into the past.
    let base_ts = Timestamp::now().as_nanos();
    for i in 1..=item_count.min(100) {
        let ts = Timestamp::from_nanos(base_ts - i * 60_000_000_000);
        db.signal("view", EntityId::new(i), 1.0, ts).unwrap();
    }

    Arc::new(db)
}
```

### 3. Test 1: Degradation level progression under concurrent queries

```rust
#[test]
fn degradation_progresses_under_concurrent_queries() {
    // Use low thresholds (reduced=5, coarse=10, no_diversity=15) so that
    // 20 threads are enough to reach every degradation level.
    //
    // Strategy:
    // 1. Spawn 20 threads that each call LoadDetector::enter().
    // 2. Hold every guard at a barrier so all 20 entries overlap.
    // 3. Collect the level returned to each thread and assert that both
    //    Full and NoDiversity were observed.
    //
    // NOTE: This test drives the LoadDetector directly with low thresholds
    // rather than relying on timing of real queries, because real queries
    // complete too fast on modern hardware to sustain 15+ in-flight.

    let detector = tidaldb::load::LoadDetector::new(
        tidaldb::load::DegradationThresholds::new(5, 10, 15).unwrap(),
    );

    // Simulate 20 concurrent "queries" that each hold the guard for a moment.
    let detector = Arc::new(detector);
    let levels = Arc::new(std::sync::Mutex::new(Vec::new()));
    let barrier = Arc::new(std::sync::Barrier::new(20));

    let mut handles = Vec::new();
    for _ in 0..20 {
        let d = Arc::clone(&detector);
        let l = Arc::clone(&levels);
        let b = Arc::clone(&barrier);
        handles.push(thread::spawn(move || {
            let (level, guard) = d.enter();
            b.wait(); // all threads hold their guards simultaneously
            l.lock().unwrap().push(level);
            drop(guard);
        }));
    }

    for h in handles {
        h.join().unwrap();
    }

    let observed = levels.lock().unwrap();
    // With 20 concurrent entries and thresholds at 5/10/15:
    // - Entries 1-4: Full
    // - Entries 5-9: ReducedCandidates
    // - Entries 10-14: CoarseAggregates
    // - Entries 15-20: NoDiversity
    assert!(
        observed.iter().any(|l| *l == DegradationLevel::NoDiversity),
        "expected at least one NoDiversity, got: {observed:?}"
    );
    assert!(
        observed.iter().any(|l| *l == DegradationLevel::Full),
        "expected at least one Full, got: {observed:?}"
    );

    // After all guards are dropped, in_flight should be 0.
    assert_eq!(detector.in_flight(), 0);
}
```
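
The entry-by-entry breakdown in the comments above only holds if `enter()` classifies load from the in-flight count that includes the entering query, and compares it with `>=` against each threshold. The real `LoadDetector` from task-01 is not shown here, so the following is a minimal sketch of that assumed behavior. `Detector`, `Guard`, and `Level` are hypothetical stand-ins, not the crate's types.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for `DegradationLevel`; variant names follow the test comments.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Level {
    Full,
    ReducedCandidates,
    CoarseAggregates,
    NoDiversity,
}

/// Hypothetical detector that tracks in-flight queries with one atomic counter.
pub struct Detector {
    in_flight: AtomicUsize,
    reduced: usize,
    coarse: usize,
    no_diversity: usize,
}

/// RAII guard: dropping it decrements the in-flight count.
pub struct Guard<'a>(&'a Detector);

impl Drop for Guard<'_> {
    fn drop(&mut self) {
        self.0.in_flight.fetch_sub(1, Ordering::AcqRel);
    }
}

impl Detector {
    pub fn new(reduced: usize, coarse: usize, no_diversity: usize) -> Self {
        Self { in_flight: AtomicUsize::new(0), reduced, coarse, no_diversity }
    }

    /// Registers one query and classifies load from the count that includes it.
    pub fn enter(&self) -> (Level, Guard<'_>) {
        let n = self.in_flight.fetch_add(1, Ordering::AcqRel) + 1;
        let level = if n >= self.no_diversity {
            Level::NoDiversity
        } else if n >= self.coarse {
            Level::CoarseAggregates
        } else if n >= self.reduced {
            Level::ReducedCandidates
        } else {
            Level::Full
        };
        (level, Guard(self))
    }

    pub fn in_flight(&self) -> usize {
        self.in_flight.load(Ordering::Acquire)
    }
}

#[test]
fn twenty_overlapping_entries_reach_every_level() {
    let d = Detector::new(5, 10, 15);
    // Keep all 20 guards alive so the entries overlap, as the barrier does above.
    let held: Vec<_> = (0..20).map(|_| d.enter()).collect();
    assert_eq!(held.iter().filter(|(l, _)| *l == Level::Full).count(), 4);
    assert_eq!(held.iter().filter(|(l, _)| *l == Level::NoDiversity).count(), 6);
    drop(held);
    assert_eq!(d.in_flight(), 0);
}
```

If the real detector classifies from the count before the increment, the boundaries shift by one entry, but the two `any(...)` assertions in Test 1 still hold.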

### 4. Test 2: All queries return Ok under overload

```rust
#[test]
fn all_queries_return_ok_under_overload() {
    let db = setup_db(100);
    let stop = Arc::new(AtomicBool::new(false));
    let error_count = Arc::new(AtomicUsize::new(0));
    let query_count = Arc::new(AtomicUsize::new(0));

    // Spawn 50 query threads.
    let mut handles = Vec::new();
    for _ in 0..50 {
        let db = Arc::clone(&db);
        let stop = Arc::clone(&stop);
        let errors = Arc::clone(&error_count);
        let queries = Arc::clone(&query_count);
        handles.push(thread::spawn(move || {
            while !stop.load(Ordering::Relaxed) {
                let query = Retrieve::builder().profile("hot").limit(10).build().unwrap();
                match db.retrieve(&query) {
                    Ok(_) => {
                        queries.fetch_add(1, Ordering::Relaxed);
                    }
                    Err(e) => {
                        // Only count non-backpressure errors.
                        if !matches!(e, TidalError::Backpressure { .. }) {
                            errors.fetch_add(1, Ordering::Relaxed);
                        }
                    }
                }
            }
        }));
    }

    // Run for 2 seconds.
    thread::sleep(Duration::from_secs(2));
    stop.store(true, Ordering::Release);

    for h in handles {
        h.join().unwrap();
    }

    let total_queries = query_count.load(Ordering::Relaxed);
    let total_errors = error_count.load(Ordering::Relaxed);
    assert!(total_queries > 0, "expected some queries to complete");
    assert_eq!(
        total_errors, 0,
        "expected zero non-backpressure errors, got {total_errors}"
    );
}
```
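
The spawn, stop-flag, and tally pattern above could be factored into a reusable harness if more overload tests are added. The sketch below is one possible shape, not part of the planned test file: `hammer` and `Tally` are hypothetical names, and the operation is a plain closure rather than a `TidalDb` call. Unlike the loop in Test 2, each worker runs the operation at least once before checking the stop flag, which makes the "some queries completed" assertion independent of scheduling.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Outcome counters collected across all worker threads.
pub struct Tally {
    pub ok: usize,
    pub failed: usize,
}

/// Runs `op` in a loop on `threads` workers for roughly `run_for`, then joins them.
/// Hypothetical helper; each worker calls `op` at least once.
pub fn hammer<F>(threads: usize, run_for: Duration, op: F) -> Tally
where
    F: Fn() -> Result<(), String> + Send + Sync + 'static,
{
    let op = Arc::new(op);
    let stop = Arc::new(AtomicBool::new(false));
    let ok = Arc::new(AtomicUsize::new(0));
    let failed = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let op = Arc::clone(&op);
            let stop = Arc::clone(&stop);
            let ok = Arc::clone(&ok);
            let failed = Arc::clone(&failed);
            thread::spawn(move || loop {
                match (*op)() {
                    Ok(()) => ok.fetch_add(1, Ordering::Relaxed),
                    Err(_) => failed.fetch_add(1, Ordering::Relaxed),
                };
                if stop.load(Ordering::Relaxed) {
                    break;
                }
            })
        })
        .collect();

    thread::sleep(run_for);
    stop.store(true, Ordering::Relaxed);
    for h in handles {
        h.join().expect("worker thread panicked");
    }

    // join() synchronizes with each worker, so the counters are final here.
    Tally {
        ok: ok.load(Ordering::Relaxed),
        failed: failed.load(Ordering::Relaxed),
    }
}

#[test]
fn harness_counts_successes_and_failures() {
    let t = hammer(4, Duration::from_millis(10), || Ok(()));
    assert!(t.ok >= 4);
    assert_eq!(t.failed, 0);
}
```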

### 5. Test 3: Backpressure on write path

```rust
#[test]
fn backpressure_returns_retry_after() {
    // This test requires a durable db with an actual WAL channel.
    // Flood the WAL channel with signals faster than the writer can drain.
    //
    // NOTE: With a bounded channel of 10,000 and a threshold at 8,000,
    // we need to enqueue ~8,000 signals very fast. In practice the WAL
    // writer is fast enough that this may not trigger. Use a lower
    // threshold for the test or mock the WAL channel.
    //
    // Alternative: test the backpressure check logic directly by
    // constructing a TidalDb with a custom BackpressureConfig that
    // has a threshold of 0 (always backpressure).
}
```
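
The threshold-of-0 alternative can be reasoned about without the real WAL. The sketch below models the check as "reject when queue depth has reached the threshold", which is an assumption about how task-03 implements it. `WriteQueue` and this `Backpressure` struct are hypothetical stand-ins, and the `retry_after` field is inferred only from the test name.

```rust
use std::collections::VecDeque;
use std::time::Duration;

/// Hypothetical error mirroring `TidalError::Backpressure`; `retry_after` is an assumed field.
#[derive(Debug, PartialEq, Eq)]
pub struct Backpressure {
    pub retry_after: Duration,
}

/// Hypothetical bounded write queue standing in for the WAL channel.
pub struct WriteQueue {
    pending: VecDeque<u64>,
    threshold: usize,
}

impl WriteQueue {
    pub fn new(threshold: usize) -> Self {
        Self { pending: VecDeque::new(), threshold }
    }

    /// Rejects the write once the queue depth has reached the threshold.
    pub fn try_enqueue(&mut self, signal: u64) -> Result<(), Backpressure> {
        if self.pending.len() >= self.threshold {
            return Err(Backpressure { retry_after: Duration::from_millis(50) });
        }
        self.pending.push_back(signal);
        Ok(())
    }

    /// Simulates the writer draining one entry from the front of the queue.
    pub fn drain_one(&mut self) -> Option<u64> {
        self.pending.pop_front()
    }
}

#[test]
fn threshold_zero_always_rejects() {
    let mut q = WriteQueue::new(0);
    let err = q.try_enqueue(1).unwrap_err();
    assert!(err.retry_after > Duration::ZERO);
}
```

With a threshold of 0 the very first write is rejected, so the test needs no flooding and no timing assumptions.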

### 6. Test 4: Rate limiter isolates agents

```rust
#[test]
fn rate_limiter_isolates_per_agent_session() {
    let db = setup_db(10);

    // Agent A: session with burst capacity = 200 (default).
    let handle_a = db.start_session(1, "agent-a", "normal", HashMap::new()).unwrap();

    // Agent B: separate session.
    let handle_b = db.start_session(2, "agent-b", "normal", HashMap::new()).unwrap();

    // Flood agent A past its rate limit.
    let mut a_accepted = 0u32;
    let mut a_rejected = 0u32;
    // The loop variable is typed as u64 so it can be added to the nanosecond
    // timestamp directly; an untyped `0..300` falls back to i32, for which
    // `u64::from` is not implemented.
    for i in 0..300u64 {
        let ts = Timestamp::from_nanos(Timestamp::now().as_nanos() + i);
        match db.session_signal(&handle_a, "view", EntityId::new(1), 1.0, ts, None) {
            Ok(()) => a_accepted += 1,
            Err(TidalError::RateLimited { .. }) => a_rejected += 1,
            Err(e) => panic!("unexpected error for agent-a: {e}"),
        }
    }

    // Agent B should still be able to write (separate bucket).
    let ts = Timestamp::now();
    let result = db.session_signal(&handle_b, "view", EntityId::new(2), 1.0, ts, None);
    assert!(result.is_ok(), "agent-b should not be rate limited by agent-a");

    // Agent A should have hit the limit.
    assert!(a_rejected > 0, "expected agent-a to be rate limited");
    assert!(a_accepted > 0, "expected some agent-a signals to succeed");

    db.close_session(handle_a).unwrap();
    db.close_session(handle_b).unwrap();
}
```
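
Why 300 signals reliably trip a burst capacity of 200 while leaving agent B untouched follows from the token-bucket model: each session owns its own bucket, and a tight loop drains tokens far faster than they refill. The sketch below illustrates this with an injected clock so the arithmetic is exact. `Limiter` and `TokenBucket` are hypothetical, and the refill rate of 100 tokens per second is an assumption; only the burst capacity of 200 comes from this document.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical token bucket; time is passed in so tests stay deterministic.
pub struct TokenBucket {
    capacity: f64,
    tokens: f64,
    refill_per_sec: f64,
    last: Instant,
}

impl TokenBucket {
    pub fn new(capacity: f64, refill_per_sec: f64, now: Instant) -> Self {
        Self { capacity, tokens: capacity, refill_per_sec, last: now }
    }

    /// Refills for the elapsed time, then takes one token if available.
    pub fn try_acquire(&mut self, now: Instant) -> bool {
        let elapsed = now.saturating_duration_since(self.last).as_secs_f64();
        self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity);
        self.last = now;
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}

/// Hypothetical limiter keeping one bucket per session id.
pub struct Limiter {
    buckets: HashMap<u64, TokenBucket>,
    capacity: f64,
    refill_per_sec: f64,
}

impl Limiter {
    pub fn new(capacity: f64, refill_per_sec: f64) -> Self {
        Self { buckets: HashMap::new(), capacity, refill_per_sec }
    }

    /// Creates the session's bucket on first use, then charges it one token.
    pub fn try_acquire(&mut self, session: u64, now: Instant) -> bool {
        let (capacity, rate) = (self.capacity, self.refill_per_sec);
        self.buckets
            .entry(session)
            .or_insert_with(|| TokenBucket::new(capacity, rate, now))
            .try_acquire(now)
    }
}

#[test]
fn draining_one_session_leaves_the_other_full() {
    let now = Instant::now();
    let mut limiter = Limiter::new(200.0, 100.0);
    let accepted = (0..300).filter(|_| limiter.try_acquire(1, now)).count();
    assert_eq!(accepted, 200);
    assert!(limiter.try_acquire(2, now));
    // One second later session 1 has refilled and is accepted again.
    assert!(limiter.try_acquire(1, now + Duration::from_secs(1)));
}
```

In the real test the clock is not frozen, so a few tokens refill during the loop and the accepted count can exceed 200. That is why Test 4 asserts only `a_rejected > 0` and `a_accepted > 0` rather than exact counts.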

### 7. Test 5: Rate limiter does not affect non-session signals

```rust
#[test]
fn non_session_signals_bypass_rate_limiter() {
    let db = setup_db(10);

    // Write 1000 non-session signals in a tight loop.
    // None should be rate limited.
    for i in 0..1000u64 {
        let ts = Timestamp::from_nanos(Timestamp::now().as_nanos() + i);
        let result = db.signal("view", EntityId::new(1), 1.0, ts);
        // May get Backpressure (WAL full) but never RateLimited.
        match result {
            Ok(()) => {}
            Err(TidalError::Backpressure { .. }) => {} // acceptable
            Err(TidalError::RateLimited { .. }) => {
                panic!("non-session signal should never be rate limited");
            }
            Err(e) => panic!("unexpected error: {e}"),
        }
    }
}
```

### 8. Test 6: Session TTL sweeper auto-closes expired sessions

```rust
#[test]
fn sweeper_auto_closes_expired_sessions() {
    let db = setup_db(10);

    // Start a session with the short_ttl policy (500ms max duration).
    let handle = db.start_session(1, "agent-x", "short_ttl", HashMap::new()).unwrap();

    // Verify it's active.
    assert_eq!(db.active_sessions().len(), 1);

    // Sleep past the TTL.
    thread::sleep(Duration::from_millis(600));

    // The session is only archived once a sweep runs. Relying on the
    // background thread is not deterministic: it may not be spawned in
    // ephemeral mode, and otherwise the test would have to wait 62s for it.
    //
    // sweep_expired_sessions() is pub(crate), so an integration test under
    // tests/ cannot call it. A #[cfg(test)] method on the library does not
    // help either: the library is built without cfg(test) when integration
    // tests are compiled. Options are a #[doc(hidden)] pub force_sweep()
    // or a helper gated behind a test-only Cargo feature.
    //
    // Until such a helper exists, verify expiry through the public API:
    // after the TTL, a session_signal should fail with SessionExpired.
    let ts = Timestamp::now();
    let result = db.session_signal(&handle, "view", EntityId::new(1), 1.0, ts, None);
    assert!(
        matches!(result, Err(TidalError::SessionExpired { .. })),
        "expired session should reject signals, got: {result:?}"
    );

    // The SessionHandle still exists, but the session is expired.
    // The sweeper (or explicit close) will archive it.
}
```
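
A sweep that takes the current time as a parameter is straightforward to test without sleeping. The sketch below shows that shape under stated assumptions: `Sessions` and `sweep_expired` are hypothetical names, and the real sweeper from task-05 may track more state (for example `SessionSummary.auto_closed`). A `force_sweep()` helper could be a thin wrapper that passes the current time to such a function.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Per-session bookkeeping needed to decide expiry.
struct Session {
    started_at: Instant,
    max_duration: Duration,
}

/// Hypothetical registry of active sessions keyed by session id.
pub struct Sessions {
    active: HashMap<u64, Session>,
}

impl Sessions {
    pub fn new() -> Self {
        Self { active: HashMap::new() }
    }

    pub fn start(&mut self, id: u64, max_duration: Duration, now: Instant) {
        self.active.insert(id, Session { started_at: now, max_duration });
    }

    /// Removes every session whose age has reached its maximum duration
    /// and returns the removed ids in ascending order.
    pub fn sweep_expired(&mut self, now: Instant) -> Vec<u64> {
        let mut expired: Vec<u64> = self
            .active
            .iter()
            .filter(|(_, s)| now.saturating_duration_since(s.started_at) >= s.max_duration)
            .map(|(id, _)| *id)
            .collect();
        expired.sort_unstable();
        for id in &expired {
            self.active.remove(id);
        }
        expired
    }

    pub fn active_count(&self) -> usize {
        self.active.len()
    }
}

#[test]
fn only_the_short_ttl_session_is_swept() {
    let t0 = Instant::now();
    let mut sessions = Sessions::new();
    sessions.start(1, Duration::from_millis(500), t0);
    sessions.start(2, Duration::from_secs(3600), t0);
    // Advance the injected clock instead of sleeping.
    let swept = sessions.sweep_expired(t0 + Duration::from_millis(600));
    assert_eq!(swept, vec![1]);
    assert_eq!(sessions.active_count(), 1);
}
```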

### 9. Test 7: Degradation level visible in response

```rust
#[test]
fn degradation_level_in_retrieve_response() {
    let db = setup_db(50);

    let query = Retrieve::builder().profile("hot").limit(10).build().unwrap();
    let results = db.retrieve(&query).unwrap();

    // Under no load, degradation should be Full.
    assert_eq!(results.degradation_level, DegradationLevel::Full);
}

#[test]
fn degradation_level_in_search_response() {
    // NOTE: Search requires text index setup. If text index is not
    // available in ephemeral mode, use a vector-only search or skip.
}
```

### 10. Test 8: Shutdown cleans up all sessions

```rust
#[test]
fn shutdown_closes_all_active_sessions() {
    let db = setup_db(10);

    // Start 5 sessions, close none.
    let mut handles = Vec::new();
    for i in 1..=5u64 {
        let h = db.start_session(
            i,
            "agent-z",
            "normal",
            HashMap::new(),
        ).unwrap();
        handles.push(h);
    }

    assert_eq!(db.active_sessions().len(), 5);

    // Drop handles (they won't auto-close -- that's the SessionHandle contract).
    drop(handles);

    // Close the database. This should force-close all 5 sessions.
    // (We need to consume the Arc, which is tricky in test setup.
    // Use Arc::try_unwrap or test with a non-Arc db.)
}
```
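
The `Arc::try_unwrap` route mentioned in the closing comment works only when the test holds the last strong reference, so every clone handed to worker threads must be dropped first. The sketch below shows the pattern with a hypothetical `Db` whose `close` consumes `self`. Whether the real `TidalDb` shutdown method takes `self` by value, and what it returns, is an assumption.

```rust
use std::sync::Arc;

/// Hypothetical database handle; `close` takes ownership, as the comment above implies.
pub struct Db {
    open_sessions: usize,
}

impl Db {
    /// Closes the database and reports how many sessions were force-closed.
    pub fn close(self) -> usize {
        self.open_sessions
    }
}

/// Recovers sole ownership from the `Arc` and closes the database.
/// Returns the `Arc` unchanged if other strong references still exist.
pub fn close_shared(db: Arc<Db>) -> Result<usize, Arc<Db>> {
    Arc::try_unwrap(db).map(Db::close)
}

#[test]
fn close_succeeds_only_for_the_last_owner() {
    let db = Arc::new(Db { open_sessions: 5 });
    let extra = Arc::clone(&db);
    // A second owner still exists, so unwrapping fails.
    assert!(close_shared(db).is_err());
    // `extra` is now the only owner and can be consumed.
    assert_eq!(close_shared(extra).ok(), Some(5));
}
```

In Test 8 no clones are created, so calling `Arc::try_unwrap` on the value returned by `setup_db` should succeed. Having `setup_db` return a plain `TidalDb` for this one test would avoid the unwrap entirely.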

## Acceptance Criteria

- [ ] Integration test file at `tidal/tests/m7p2_load.rs`
- [ ] Test: degradation level progresses from Full -> NoDiversity as in-flight increases
- [ ] Test: all queries return Ok under 50-thread overload (zero non-backpressure errors)
- [ ] Test: backpressure error returned when WAL queue saturated (or logic tested directly)
- [ ] Test: rate limiter isolates agent-a from agent-b (separate token buckets)
- [ ] Test: non-session signals bypass rate limiter
- [ ] Test: expired sessions detected and auto-closed by sweeper
- [ ] Test: degradation level visible in `Results.degradation_level`
- [ ] Test: shutdown force-closes remaining active sessions
- [ ] All tests pass: `cargo test --manifest-path tidal/Cargo.toml --test m7p2_load`
- [ ] All existing test suites still pass
- [ ] `cargo clippy -- -D warnings` clean

## Test Strategy

All tests are in `tidal/tests/m7p2_load.rs` as described above. The tests are structured to be deterministic where possible: they call `LoadDetector::enter()` directly and should trigger the session sweep through a test helper rather than relying on timing, because `sweep_expired_sessions()` is `pub(crate)` and not reachable from `tidal/tests/`. Where wall-clock behavior is under test, they use short, bounded waits.

The 50-thread overload test (Test 2) runs for only 2 seconds to keep CI fast. The rate limiter test (Test 4) uses a tight loop of 300 signals to reliably exceed the default burst capacity of 200.

For the backpressure test, if saturating the real WAL channel is impractical in CI, test the check logic directly by setting a threshold of 0 and asserting that the first signal write returns `TidalError::Backpressure`.