tidaldb/tidal/tests/m1_uat.rs
jordan 192c473f55 feat: complete Milestone 5 — full-text search, RRF fusion, and creator search
- M5p1: BM25 text indexing via Tantivy with background syncer (0.26ms @ 10K docs)
- M5p2: RRF fusion layer combining BM25 + ANN scores (46µs @ 1K candidates)
- M5p3: unified Search query API (8-stage pipeline, BM25 + vector + ranking)
- M5p4: creator text + vector indexing and creator search executor (< 20ms @ 200 creators)
- Refactor db/mod.rs into focused sub-modules (creators, items, sessions, signals, etc.)
- Decompose monolithic files into directory modules (query/executor, ranking/diversity, etc.)
- Split brute.rs → brute/mod.rs + brute/tests.rs; extract search executor helpers
- Add benches: fusion, search, session, text_index
- Add M5 UAT test suites (m5_uat, m5_search, m5p4_creator_search, text_index)
- Update blog posts, roadmap, content strategy, and M5 planning docs
- Add tmp/ and .claude/worktrees/ to .gitignore

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-21 23:53:16 -07:00

645 lines
24 KiB
Rust

// Lint relaxations for this test file: tests call `unwrap` so that any failure
// aborts the test with a panic, and the numeric `as` casts below are applied to
// test-generated counts, indices, and tolerances.
#![allow(
clippy::unwrap_used,
clippy::cast_precision_loss,
clippy::cast_possible_truncation,
clippy::cast_sign_loss
)]
//! Milestone 1 User Acceptance Test.
//!
//! Proves the full M1 lifecycle: schema declaration, entity CRUD, signal
//! ingestion with WAL-backed durability, decay score reads, windowed counts,
//! velocity, and crash recovery.
//!
//! The main test (`m1_milestone_uat`) follows the ROADMAP.md scenario:
//! 1. Open with schema (view/like/skip)
//! 2. Write 100 items with metadata
//! 3. Write signal events spanning last 7 days
//! 4-5. Read decay score, windowed count, velocity for item #42
//! 6-7. Write a new event, verify immediate visibility
//! 8-9. Close, reopen, verify durability
//!
//! Note: The ROADMAP specifies 10K events but persistent-mode WAL writes
//! are serialized through group commit with a 10ms batch timeout. In a
//! single-threaded test, each event waits for the timeout. We use 1K events
//! to keep the test under 15s while still exercising all code paths. The
//! `benches/` suite validates throughput at scale.
use std::collections::HashMap;
use std::time::Duration;
use tidaldb::TidalDb;
use tidaldb::schema::{DecaySpec, EntityId, EntityKind, SchemaBuilder, Timestamp, Window};
// ── Schema construction ─────────────────────────────────────────────────────
/// Builds the three-signal schema (`view`, `like`, `skip`) used by the tests in this file.
///
/// Every signal targets `EntityKind::Item` and uses exponential decay; the
/// signals differ in half-life, tracked windows, and the velocity flag.
///
/// # Panics
///
/// Panics if `SchemaBuilder::build` returns an error.
fn m1_schema() -> tidaldb::schema::Schema {
    const SECS_PER_DAY: u64 = 24 * 3600;

    let mut schema = SchemaBuilder::new();

    // "view": half-life 7 days; windows 1h / 24h / 7d; velocity = true.
    let _ = schema
        .signal(
            "view",
            EntityKind::Item,
            DecaySpec::Exponential {
                half_life: Duration::from_secs(7 * SECS_PER_DAY),
            },
        )
        .windows(&[Window::OneHour, Window::TwentyFourHours, Window::SevenDays])
        .velocity(true)
        .add();

    // "like": half-life 14 days; windows 24h / 7d / all-time; velocity = true.
    let _ = schema
        .signal(
            "like",
            EntityKind::Item,
            DecaySpec::Exponential {
                half_life: Duration::from_secs(14 * SECS_PER_DAY),
            },
        )
        .windows(&[Window::TwentyFourHours, Window::SevenDays, Window::AllTime])
        .velocity(true)
        .add();

    // "skip": half-life 1 day; windows 1h / 24h; velocity = false.
    let _ = schema
        .signal(
            "skip",
            EntityKind::Item,
            DecaySpec::Exponential {
                half_life: Duration::from_secs(SECS_PER_DAY),
            },
        )
        .windows(&[Window::OneHour, Window::TwentyFourHours])
        .velocity(false)
        .add();

    schema.build().unwrap()
}
// ── Analytical helpers ──────────────────────────────────────────────────────
/// Reference (closed-form) exponential decay score for `events` as of `now_ns`.
///
/// Each event is a `(weight, timestamp_ns)` pair and contributes
/// `weight * exp(-lambda * age_secs)`, where `age_secs` is the event's age at
/// `now_ns` converted from nanoseconds to seconds. An event stamped later than
/// `now_ns` is treated as having age zero, so it contributes its full weight.
///
/// Returns `0.0` for an empty slice.
fn analytical_decay(events: &[(f64, u64)], lambda: f64, now_ns: u64) -> f64 {
    let mut total = 0.0;
    for &(weight, stamp_ns) in events {
        // Saturating subtraction clamps future-dated events to an age of zero.
        let age_secs = now_ns.saturating_sub(stamp_ns) as f64 / 1e9;
        total += weight * (-lambda * age_secs).exp();
    }
    total
}
/// Tiny linear congruential generator that gives the tests a deterministic,
/// dependency-free stream of pseudo-random `u64` values.
struct Lcg {
    state: u64,
}

impl Lcg {
    // Knuth LCG constants.
    const MULTIPLIER: u64 = 6_364_136_223_846_793_005;
    const INCREMENT: u64 = 1_442_695_040_888_963_407;

    /// Creates a generator whose first output is derived from `seed`.
    fn new(seed: u64) -> Self {
        Lcg { state: seed }
    }

    /// Advances the generator one step and returns the new state.
    ///
    /// Arithmetic wraps, so the recurrence is taken modulo 2^64.
    fn next(&mut self) -> u64 {
        let advanced = self
            .state
            .wrapping_mul(Self::MULTIPLIER)
            .wrapping_add(Self::INCREMENT);
        self.state = advanced;
        advanced
    }

    /// Returns the next output reduced into `[0, max)`.
    ///
    /// # Panics
    ///
    /// Panics if `max` is zero (remainder by zero).
    fn next_range(&mut self, max: u64) -> u64 {
        let raw = self.next();
        raw % max
    }
}
// ── Compile-time assertions ─────────────────────────────────────────────────
/// Compile-time proof that `TidalDb` is `Send + Sync`, so a handle can be
/// shared across threads. The closure is never invoked; it only has to
/// type-check.
const _: fn() = || {
    fn require_send_sync<T: Send + Sync>() {}
    require_send_sync::<TidalDb>();
};
// ── Focused acceptance criteria tests ───────────────────────────────────────
/// An ephemeral database opens with the M1 schema, answers a health check,
/// and closes without error.
#[test]
fn m1p5_open_close_lifecycle() {
    let schema = m1_schema();
    let db = TidalDb::builder()
        .ephemeral()
        .with_schema(schema)
        .open()
        .unwrap();

    db.health_check().unwrap();
    db.close().unwrap();
}
/// `shutdown()` succeeds on a freshly opened ephemeral database.
#[test]
fn m1p5_shutdown_alias_works() {
    let schema = m1_schema();
    let db = TidalDb::builder()
        .ephemeral()
        .with_schema(schema)
        .open()
        .unwrap();

    db.shutdown().unwrap();
}
/// Metadata written for an item can be read back unchanged.
#[test]
fn m1p5_write_item_and_read_metadata() {
    let schema = m1_schema();
    let db = TidalDb::builder()
        .ephemeral()
        .with_schema(schema)
        .open()
        .unwrap();

    let item = EntityId::new(42);
    let fields: HashMap<String, String> = [("title", "Test Article"), ("category", "tech")]
        .into_iter()
        .map(|(key, value)| (key.to_string(), value.to_string()))
        .collect();
    db.write_item(item, &fields).unwrap();

    // Presence is asserted first so a missing record fails with a clear message.
    let stored = db.get_item_metadata(item).unwrap();
    assert!(stored.is_some(), "metadata should exist after write");
    let stored = stored.unwrap();

    assert_eq!(stored.get("title").unwrap(), "Test Article");
    assert_eq!(stored.get("category").unwrap(), "tech");

    db.close().unwrap();
}
/// A single just-written "view" signal yields a decay score that has barely
/// decayed from the event's weight of 1.0.
#[test]
fn m1p5_signal_updates_decay_score() {
    let schema = m1_schema();
    let db = TidalDb::builder()
        .ephemeral()
        .with_schema(schema)
        .open()
        .unwrap();

    let entity = EntityId::new(1);
    let written_at = Timestamp::now();
    db.signal("view", entity, 1.0, written_at).unwrap();

    let maybe_score = db.read_decay_score(entity, "view", 0).unwrap();
    assert!(maybe_score.is_some(), "should have a score after signal");

    // Almost no time has passed since the write, so almost no decay is expected.
    let value = maybe_score.unwrap();
    assert!(
        value > 0.99,
        "score for just-written event should be close to 1.0, got {}",
        value
    );

    db.close().unwrap();
}
/// Five recent "view" events are all counted in the 1h window, and velocity
/// equals that count divided by the window length in seconds.
#[test]
fn m1p5_windowed_count_and_velocity() {
    let db = TidalDb::builder()
        .ephemeral()
        .with_schema(m1_schema())
        .open()
        .unwrap();
    let id = EntityId::new(1);
    let now = Timestamp::now();

    // Five weight-1.0 events, one per second, covering the most recent five
    // seconds so they all fall within the 1h window.
    //
    // They are written oldest-first so timestamps are non-decreasing: the
    // windowed counter's rotation relies on that for exact counts (see the
    // ordering note in `m1_milestone_uat`). Writing newest-first could
    // miscount whenever the five seconds straddle a minute boundary.
    for age_secs in (0..5u64).rev() {
        let ts = Timestamp::from_nanos(now.as_nanos() - (age_secs * 1_000_000_000));
        db.signal("view", id, 1.0, ts).unwrap();
    }

    let count = db.read_windowed_count(id, "view", Window::OneHour).unwrap();
    assert_eq!(count, 5, "windowed count should match number of events");

    // Velocity is expected to be count / window length (3600 s for 1h).
    let velocity = db.read_velocity(id, "view", Window::OneHour).unwrap();
    let expected_velocity = 5.0 / 3600.0;
    assert!(
        (velocity - expected_velocity).abs() < 1e-10,
        "velocity should be count/window_secs, got {velocity}, expected {expected_velocity}"
    );

    db.close().unwrap();
}
/// Signalling with a type that the schema does not declare is rejected with
/// an error rather than accepted.
#[test]
fn m1p5_signal_error_on_unknown_type() {
    let schema = m1_schema();
    let db = TidalDb::builder()
        .ephemeral()
        .with_schema(schema)
        .open()
        .unwrap();

    // "nonexistent" is not one of view/like/skip.
    let outcome = db.signal("nonexistent", EntityId::new(1), 1.0, Timestamp::now());
    assert!(
        outcome.is_err(),
        "signal with unknown type should return error"
    );

    db.close().unwrap();
}
// ── Full M1 UAT scenario ───────────────────────────────────────────────────
/// End-to-end M1 acceptance scenario against a database opened on a temporary
/// data directory.
///
/// Writes 100 items and 1,000 signal events, checks decay score, windowed
/// counts, and velocity for item 42 against independently computed
/// expectations, then closes and reopens the database from the same directory
/// to confirm those values and the item metadata survive. Finishes with coarse
/// latency smoke checks on a separate ephemeral database.
#[test]
fn m1_milestone_uat() {
let dir = tempfile::tempdir().unwrap();
// `now` is captured once up front; all generated event timestamps and the
// expected window counts below are computed relative to it.
let now = Timestamp::now();
let now_ns = now.as_nanos();
// Decay constants (lambda = ln 2 / half-life, matching the "view" signal's
// 7-day half-life declared in `m1_schema`).
let view_half_life_secs = 7.0 * 24.0 * 3600.0;
let view_lambda = std::f64::consts::LN_2 / view_half_life_secs;
let seven_days_ns: u64 = 7 * 24 * 3600 * 1_000_000_000;
let one_hour_ns: u64 = 3600 * 1_000_000_000;
let twenty_four_hours_ns: u64 = 24 * 3600 * 1_000_000_000;
// Event count: 1000 events across 100 entities x 3 signal types.
// Single-threaded WAL writes wait for batch timeout (~10ms each),
// so 1000 events keeps the test under 15s while exercising all paths.
let event_count = 1_000u64;
// ── Step 1: Open with schema ────────────────────────────────────────
let schema = m1_schema();
let db = TidalDb::builder()
.with_data_dir(dir.path())
.with_schema(schema.clone())
.open()
.unwrap();
// ── Step 2: Write 100 items with metadata ───────────────────────────
for i in 0..100u64 {
let id = EntityId::new(i);
let mut meta = HashMap::new();
meta.insert("title".to_string(), format!("Item {i}"));
meta.insert("category".to_string(), format!("cat_{}", i % 10));
db.write_item(id, &meta).unwrap();
}
// Verify metadata for item #42
let meta42 = db.get_item_metadata(EntityId::new(42)).unwrap();
assert!(meta42.is_some(), "item 42 metadata should exist");
assert_eq!(meta42.as_ref().unwrap().get("title").unwrap(), "Item 42");
// ── Step 3: Write signal events spanning last 7 days ────────────────
//
// Deterministic LCG for reproducibility. Each event:
// entity_id = i % 100
// signal_type = one of view/like/skip based on i % 3
// timestamp = now - random offset within [0, 7 days)
// weight = 1.0
let mut rng = Lcg::new(42);
let signal_types = ["view", "like", "skip"];
// Generate all events first, then sort by timestamp so the BucketedCounter
// receives events in temporal order (its rotation logic is trigger-based
// and requires monotonically increasing timestamps for accurate counts).
struct EventSpec {
entity_id: u64,
sig_idx: usize,
ts_ns: u64,
}
let mut events: Vec<EventSpec> = (0..event_count)
.map(|i| {
let offset_ns = rng.next_range(seven_days_ns);
EventSpec {
entity_id: i % 100,
sig_idx: (i % 3) as usize,
ts_ns: now_ns.saturating_sub(offset_ns),
}
})
.collect();
events.sort_by_key(|e| e.ts_ns);
// Track events for item #42 + signal "view" for analytical verification.
let mut item42_view_events: Vec<(f64, u64)> = Vec::new();
for event in &events {
let entity_id = EntityId::new(event.entity_id);
let sig = signal_types[event.sig_idx];
let ts = Timestamp::from_nanos(event.ts_ns);
let weight = 1.0;
db.signal(sig, entity_id, weight, ts).unwrap();
if event.entity_id == 42 && sig == "view" {
item42_view_events.push((weight, event.ts_ns));
}
}
// Guard: the analytical checks below are meaningless without at least one
// tracked event.
assert!(
!item42_view_events.is_empty(),
"should have generated some view events for item 42"
);
// ── Step 4: Read decay score for item #42, signal "view" ────────────
//
// The decay score is computed at read-time using Timestamp::now(), which
// will be slightly after our `now`. We compute the analytical score at
// the moment of reading and allow a tolerance that accounts for the small
// time delta.
let read_time_before = Timestamp::now().as_nanos();
let score42 = db
.read_decay_score(EntityId::new(42), "view", 0)
.unwrap()
.expect("item 42 should have a view decay score");
let read_time_after = Timestamp::now().as_nanos();
// Compute analytical bounds: score at read_time_before and read_time_after.
let analytical_before = analytical_decay(&item42_view_events, view_lambda, read_time_before);
let analytical_after = analytical_decay(&item42_view_events, view_lambda, read_time_after);
// The actual read happened somewhere between before and after.
// The score should be in [analytical_after, analytical_before] (since more
// decay means lower score, and read_time_after > read_time_before).
//
// But the internal running-score accumulation may differ slightly from the
// analytical formula due to floating-point non-associativity. The running
// score applies decay incrementally: S = S_prev * exp(-lambda*dt) + w,
// while the analytical formula sums independently. For events with similar
// timestamps the difference is negligible, but for 7 days of spread events
// with an LCG we allow 1e-6 relative tolerance.
let analytical_mid = analytical_decay(
&item42_view_events,
view_lambda,
(read_time_before + read_time_after) / 2,
);
let tolerance = analytical_mid.abs() * 1e-6 + 1e-9; // relative + absolute floor
// Total allowance = numeric tolerance + the spread between the two bracketing
// analytical scores (uncertainty about when exactly the read happened).
assert!(
(score42 - analytical_mid).abs() < tolerance + (analytical_before - analytical_after).abs(),
"decay score {score42} should match analytical {analytical_mid} within tolerance; \
analytical_before={analytical_before}, analytical_after={analytical_after}"
);
// ── Step 5: Read windowed count for item #42, "view", 24h ───────────
//
// Note: The BucketedCounter uses hour-granularity buckets for the 24h
// window. For dense event streams this is accurate; for the exact count
// we filter events ourselves and compare.
let expected_24h_count = item42_view_events
.iter()
.filter(|&&(_, ts_ns)| now_ns.saturating_sub(ts_ns) <= twenty_four_hours_ns)
.count() as u64;
let actual_24h_count = db
.read_windowed_count(EntityId::new(42), "view", Window::TwentyFourHours)
.unwrap();
// The warm tier uses hour-bucket granularity for 24h windows, so it may
// differ by up to the count in a single hour bucket boundary. We allow
// a margin of the events in the boundary hour.
//
// For correctness at the M1 level, we verify the count is in the right
// ballpark. The 1h window uses minute buckets and is always precise.
let expected_1h_count = item42_view_events
.iter()
.filter(|&&(_, ts_ns)| now_ns.saturating_sub(ts_ns) <= one_hour_ns)
.count() as u64;
let actual_1h_count = db
.read_windowed_count(EntityId::new(42), "view", Window::OneHour)
.unwrap();
// 1h window with minute buckets: the BucketedCounter's trigger-based
// rotation can leave at most 1 residual event in the current minute bucket
// after a full rotation cycle (60 minute buckets cleared, then 1 increment).
// When events span 7 days and entity 42's events are sparse, this boundary
// effect produces a +/- 1 discrepancy. Allow tolerance of 1.
assert!(
(actual_1h_count as i64 - expected_1h_count as i64).unsigned_abs() <= 1,
"1h windowed count should be close to expected: got {actual_1h_count}, expected {expected_1h_count}"
);
// 24h count: allow tolerance for bucket-boundary effects.
// The hour-bucket design means events near the 24h boundary may or may not
// be counted depending on which hour bucket they land in.
// Tolerance is 15% of the expected count, but never less than 5 events.
let tolerance_24h = (expected_24h_count as f64 * 0.15).max(5.0) as u64;
assert!(
(actual_24h_count as i64 - expected_24h_count as i64).unsigned_abs() <= tolerance_24h,
"24h windowed count {actual_24h_count} should be close to {expected_24h_count} \
(tolerance {tolerance_24h})"
);
// ── Step 5b: Read velocity for item #42, "view", 1h ────────────────
let velocity = db
.read_velocity(EntityId::new(42), "view", Window::OneHour)
.unwrap();
// Velocity is expected to be count / 3600.0. It is compared against the
// count the database just reported (`actual_1h_count`), not the analytical
// count, so the +/- 1 count tolerance does not apply here and only a
// floating-point epsilon is allowed.
let expected_velocity = actual_1h_count as f64 / 3600.0;
assert!(
(velocity - expected_velocity).abs() < 1e-10,
"velocity should be count/window_secs: got {velocity}, expected {expected_velocity}"
);
// ── Step 6: Write a new "view" event for item #42 ───────────────────
let new_event_ts = Timestamp::now();
db.signal("view", EntityId::new(42), 1.0, new_event_ts)
.unwrap();
// Update our tracking for analytical comparison.
item42_view_events.push((1.0, new_event_ts.as_nanos()));
// ── Step 7: Immediately re-read and verify new event is visible ─────
let score42_after = db
.read_decay_score(EntityId::new(42), "view", 0)
.unwrap()
.expect("should still have score");
// The new score must not be lower than the old one (we added a fresh event).
assert!(
score42_after >= score42,
"score after new event ({score42_after}) should be >= before ({score42})"
);
let count_1h_after = db
.read_windowed_count(EntityId::new(42), "view", Window::OneHour)
.unwrap();
// The new event is at "now", so it must be in the 1h window (count >= 1).
// However, writing the new event may trigger minute rotation that clears
// any residual count from the pre-existing events (which were from days ago).
// So the count might not increase relative to actual_1h_count -- it might
// drop to 1 (only the new event). The invariant: the new event is visible.
assert!(
count_1h_after >= 1,
"1h count should include the new event: got {count_1h_after}"
);
let velocity_after = db
.read_velocity(EntityId::new(42), "view", Window::OneHour)
.unwrap();
// As in step 5b, velocity is checked against the count just read.
let expected_velocity_after = count_1h_after as f64 / 3600.0;
assert!(
(velocity_after - expected_velocity_after).abs() < 1e-10,
"velocity after new event: got {velocity_after}, expected {expected_velocity_after}"
);
// Capture values for post-recovery comparison.
let pre_close_score = score42_after;
let pre_close_1h_count = count_1h_after;
let pre_close_velocity = velocity_after;
// ── Step 8: Close and reopen ────────────────────────────────────────
db.close().unwrap();
// Reopen on the same data directory with a freshly built, identical schema.
let schema2 = m1_schema();
let db2 = TidalDb::builder()
.with_data_dir(dir.path())
.with_schema(schema2)
.open()
.unwrap();
// ── Step 9: Re-read all values for item #42 after recovery ──────────
//
// Decay score will have decayed slightly more due to time elapsed during
// close/reopen. We verify it is close to the pre-close value.
let recovered_score = db2
.read_decay_score(EntityId::new(42), "view", 0)
.unwrap()
.expect("score should survive recovery");
// The score should be very close to the pre-close score. The only
// difference is additional time decay during the close/reopen cycle
// (typically < 1 second). We allow 1% relative tolerance.
let recovery_tolerance = pre_close_score * 0.01 + 1e-9;
assert!(
(recovered_score - pre_close_score).abs() < recovery_tolerance,
"recovered score {recovered_score} should match pre-close {pre_close_score} \
within {recovery_tolerance}"
);
let recovered_1h_count = db2
.read_windowed_count(EntityId::new(42), "view", Window::OneHour)
.unwrap();
// The 1h windowed count after recovery should match the pre-close value.
// WAL replay re-applies all events in order, producing the same bucket state.
// Allow +/- 1 tolerance for bucket-boundary effects during replay.
assert!(
(recovered_1h_count as i64 - pre_close_1h_count as i64).unsigned_abs() <= 1,
"1h count should survive recovery: got {recovered_1h_count}, expected {pre_close_1h_count}"
);
let recovered_velocity = db2
.read_velocity(EntityId::new(42), "view", Window::OneHour)
.unwrap();
// Velocity = count/3600. With +/- 1 on count, velocity tolerance is 1/3600.
let velocity_tolerance = 1.0 / 3600.0 + 1e-10;
assert!(
(recovered_velocity - pre_close_velocity).abs() < velocity_tolerance,
"velocity should survive recovery: got {recovered_velocity}, expected {pre_close_velocity}"
);
// Verify metadata also survives recovery.
let meta42_recovered = db2.get_item_metadata(EntityId::new(42)).unwrap();
assert!(
meta42_recovered.is_some(),
"metadata should survive recovery"
);
assert_eq!(
meta42_recovered.unwrap().get("title").unwrap(),
"Item 42",
"metadata content should survive recovery"
);
db2.close().unwrap();
// ── Performance assertions (with generous headroom) ─────────────────
//
// These are smoke-test bounds, not strict benchmarks. The benches/ suite
// enforces the real targets. We just verify no pathological regression.
//
// Use ephemeral mode for perf checks to avoid WAL batch-timeout latency.
let perf_db = TidalDb::builder()
.ephemeral()
.with_schema(m1_schema())
.open()
.unwrap();
// Seed one "view" event for each of entities 0..100, spaced one second
// apart going back from `now_ns`.
for i in 0..100u64 {
perf_db
.signal(
"view",
EntityId::new(i),
1.0,
Timestamp::from_nanos(now_ns - i * 1_000_000_000),
)
.unwrap();
}
// Decay score read: spec < 100ns, allow < 10us per read.
let perf_start = std::time::Instant::now();
let iterations = 1_000u64;
for _ in 0..iterations {
let _ = perf_db
.read_decay_score(EntityId::new(42), "view", 0)
.unwrap();
}
let perf_elapsed = perf_start.elapsed();
// Average over all iterations (integer division).
let per_read_ns = perf_elapsed.as_nanos() / iterations as u128;
assert!(
per_read_ns < 10_000, // 10us -- generous, spec is 100ns
"decay score read too slow: {per_read_ns}ns per read"
);
// Signal write (ephemeral, no WAL): spec < 100us amortized.
// Timestamps start 1 s after `now_ns` and advance 1 ms per write, so they
// are strictly increasing and later than the seeded events.
let perf_start = std::time::Instant::now();
let write_iterations = 1_000u64;
for i in 0..write_iterations {
perf_db
.signal(
"view",
EntityId::new(42),
1.0,
Timestamp::from_nanos(now_ns + 1_000_000_000 + i * 1_000_000),
)
.unwrap();
}
let write_elapsed = perf_start.elapsed();
let per_write_us = write_elapsed.as_micros() / write_iterations as u128;
assert!(
per_write_us < 1_000, // 1ms -- generous
"signal write too slow: {per_write_us}us per write"
);
// 200-entity scoring pass: spec < 5us, allow < 500us.
// Only 100 entities were seeded, so each pass reads every entity twice
// (`eid % 100`).
let perf_start = std::time::Instant::now();
let scoring_iterations = 100u64;
for _ in 0..scoring_iterations {
let mut sum = 0.0f64;
for eid in 0..200u64 {
if let Some(score) = perf_db
.read_decay_score(EntityId::new(eid % 100), "view", 0)
.unwrap()
{
sum += score;
}
}
// Prevent optimization from eliding the loop.
std::hint::black_box(sum);
}
let scoring_elapsed = perf_start.elapsed();
let per_pass_us = scoring_elapsed.as_micros() / scoring_iterations as u128;
assert!(
per_pass_us < 500, // 500us -- generous, spec is 5us (direct hot-tier access)
"200-entity scoring pass too slow: {per_pass_us}us per pass"
);
perf_db.close().unwrap();
}