tidaldb/tidal/tests/m1p4_signal_ledger_uat.rs
jordan 192c473f55 feat: complete Milestone 5 — full-text search, RRF fusion, and creator search
- M5p1: BM25 text indexing via Tantivy with background syncer (0.26ms @ 10K docs)
- M5p2: RRF fusion layer combining BM25 + ANN scores (46µs @ 1K candidates)
- M5p3: unified Search query API (8-stage pipeline, BM25 + vector + ranking)
- M5p4: creator text + vector indexing and creator search executor (< 20ms @ 200 creators)
- Refactor db/mod.rs into focused sub-modules (creators, items, sessions, signals, etc.)
- Decompose monolithic files into directory modules (query/executor, ranking/diversity, etc.)
- Split brute.rs → brute/mod.rs + brute/tests.rs; extract search executor helpers
- Add benches: fusion, search, session, text_index
- Add M5 UAT test suites (m5_uat, m5_search, m5p4_creator_search, text_index)
- Update blog posts, roadmap, content strategy, and M5 planning docs
- Add tmp/ and .claude/worktrees/ to .gitignore

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-21 23:53:16 -07:00

591 lines
20 KiB
Rust

//! UAT for Milestone 1, Phase 4: Signal Ledger -- Decay Scores and Windowed Aggregation.
//!
//! Verifies the acceptance criteria from ROADMAP.md m1p4 through the public
//! `TidalDb` API only: `TidalDb::builder()`, `db.signal()`, `db.write_item()`,
//! `db.read_decay_score()`, `db.read_windowed_count()`, `db.read_velocity()`,
//! and `db.close()`.
//!
//! Tests:
//! UAT-01: Out-of-order event handling produces correct decay score
//! UAT-02: Windowed count correctness (in-window vs out-of-window events)
//! UAT-03: Velocity = `windowed_count` / `window_duration_seconds`
//! UAT-04: Checkpoint + WAL replay preserves windowed counts and all-time counts
//! UAT-05: Decay formula matches analytical brute-force to 6 decimal places
#![allow(clippy::unwrap_used, clippy::cast_precision_loss)]
use std::collections::HashMap;
use std::time::Duration;
use tidaldb::TidalDb;
use tidaldb::schema::{DecaySpec, EntityId, EntityKind, SchemaBuilder, Timestamp, Window};
// ── Schema helpers ──────────────────────────────────────────────────────────
/// Standard two-signal item schema shared by most UATs in this file.
///
/// * `view` — exponential decay with a 7-day half-life; counted over the
///   1h, 24h and 7d windows; velocity disabled.
/// * `like` — exponential decay with a 14-day half-life; counted over the
///   24h and 7d windows; velocity disabled.
fn build_schema() -> tidaldb::schema::Schema {
    // Exponential decay whose half-life is expressed in whole days.
    let half_life_days = |days: u64| DecaySpec::Exponential {
        half_life: Duration::from_secs(days * 24 * 3600),
    };

    let mut schema_builder = SchemaBuilder::new();

    let _ = schema_builder
        .signal("view", EntityKind::Item, half_life_days(7))
        .windows(&[Window::OneHour, Window::TwentyFourHours, Window::SevenDays])
        .velocity(false)
        .add();

    let _ = schema_builder
        .signal("like", EntityKind::Item, half_life_days(14))
        .windows(&[Window::TwentyFourHours, Window::SevenDays])
        .velocity(false)
        .add();

    schema_builder.build().expect("schema must be valid")
}
/// Minimal item metadata for `write_item`: a single `title` entry of the form `"Item {i}"`.
fn metadata(i: u64) -> HashMap<String, String> {
    HashMap::from([(String::from("title"), format!("Item {i}"))])
}
/// Reference decay score computed from scratch: `Σ weight · exp(-λ·Δt)` where
/// `λ = ln 2 / half_life_secs` and `Δt` is each event's age in seconds at
/// `query_time_ns`. Events timestamped after the query time are treated as age 0.
fn analytical_decay(
    events: &[(f64, u64)], // (weight, timestamp_ns)
    half_life_secs: f64,
    query_time_ns: u64,
) -> f64 {
    let decay_rate = std::f64::consts::LN_2 / half_life_secs;
    // Contribution of a single event after decaying to the query instant.
    let contribution = |&(weight, event_ns): &(f64, u64)| {
        let age_secs = query_time_ns.saturating_sub(event_ns) as f64 / 1e9;
        weight * (-decay_rate * age_secs).exp()
    };
    events.iter().map(contribution).sum()
}
// ── UAT-01: Out-of-order event handling ─────────────────────────────────────
/// Write an event at T=now, then write an event at T=now-5min.
/// Verify the decay score matches the analytical computation that accounts
/// for both events. The out-of-order event's weight should be pre-decayed
/// by its age relative to the most recent event.
#[test]
fn uat_01_out_of_order_events_produce_correct_decay_score() {
    let schema = build_schema();
    let db = TidalDb::builder()
        .ephemeral()
        .with_schema(schema)
        .open()
        .expect("open should succeed");
    let entity = EntityId::new(42);
    let half_life_secs = 7.0 * 24.0 * 3600.0;
    // Use timestamps relative to now() so lazy decay in read_decay_score is minimal.
    let now_ns = Timestamp::now().as_nanos();
    let five_min_ns: u64 = 5 * 60 * 1_000_000_000;
    let t_recent = Timestamp::from_nanos(now_ns);
    let t_old = Timestamp::from_nanos(now_ns - five_min_ns);
    // Write the recent event first.
    db.signal("view", entity, 1.0, t_recent)
        .expect("signal write failed");
    // Write the older event second (out-of-order).
    db.signal("view", entity, 1.0, t_old)
        .expect("signal write failed");
    // Read the score. `read_decay_score` applies lazy decay from last_update_ns to now().
    let actual = db
        .read_decay_score(entity, "view", 0)
        .expect("read_decay_score failed")
        .expect("must have a score");

    // A single weight-1.0 event can never score above 1.0 under exponential
    // decay, so a score above 1.0 proves the out-of-order event was folded
    // into the running score rather than dropped.
    assert!(
        actual > 1.0,
        "out-of-order event must contribute to the score (expected > 1.0), got {actual}"
    );

    // The analytical score at query time T_query is:
    //   event_at_t_recent: 1.0 * exp(-lambda * (T_query - now_ns))
    //   event_at_t_old:    1.0 * exp(-lambda * (T_query - (now_ns - 5min)))
    let query_ns = Timestamp::now().as_nanos();
    let analytical = analytical_decay(
        &[(1.0, now_ns), (1.0, now_ns - five_min_ns)],
        half_life_secs,
        query_ns,
    );
    let rel_err = (actual - analytical).abs() / analytical.abs().max(1e-15);
    assert!(
        rel_err < 1e-3,
        "out-of-order decay mismatch: actual={actual:.10}, analytical={analytical:.10}, rel_err={rel_err:.2e}"
    );

    // With a 7-day half-life, 5 minutes of decay removes only ~0.034% of the
    // older event's weight (ln 2 * 300 s / 604 800 s ≈ 3.4e-4), so the total
    // should be very close to 2.0.
    assert!(
        (actual - 2.0).abs() < 0.01,
        "expected ~2.0 for two nearly-simultaneous events with 7-day half-life, got {actual}"
    );
    db.close().expect("close failed");
}
/// Out-of-order write with a real time gap: a weight-2 event 5 s ago is
/// written first, followed by a weight-3 event 10 s ago. The score returned by
/// `TidalDb` must agree with the analytical brute-force sum.
#[test]
fn uat_01b_out_of_order_with_gap_matches_analytical() {
    const NANOS_PER_SEC: u64 = 1_000_000_000;

    let db = TidalDb::builder()
        .ephemeral()
        .with_schema(build_schema())
        .open()
        .expect("open should succeed");
    let entity = EntityId::new(1);
    let half_life_secs = 7.0 * 24.0 * 3600.0;

    // Anchor the events a few seconds before "now" so the lazy decay applied
    // at read time stays small and predictable.
    let now_ns = Timestamp::now().as_nanos();
    let recent_ns = now_ns - 5 * NANOS_PER_SEC;
    let older_ns = now_ns - 10 * NANOS_PER_SEC;

    // (weight, timestamp) in write order: the newer event lands first, so the
    // older one arrives out of order.
    let writes = [(2.0, recent_ns), (3.0, older_ns)];
    for &(weight, ts_ns) in &writes {
        db.signal("view", entity, weight, Timestamp::from_nanos(ts_ns))
            .expect("signal write");
    }

    // Internally the running score at t_recent is 2.0 + 3.0 * exp(-lambda * 5s);
    // `read_decay_score` then decays it forward to now().
    let actual = db
        .read_decay_score(entity, "view", 0)
        .expect("read")
        .expect("some");
    let query_ns = Timestamp::now().as_nanos();
    let analytical = analytical_decay(&writes, half_life_secs, query_ns);

    // With a 7-day half-life, 5-10 s of decay is negligible: expect about 5.0.
    assert!(
        actual > 4.9 && actual < 5.1,
        "expected ~5.0 for w=2+w=3 with negligible decay, got {actual}"
    );

    let rel_err = (actual - analytical).abs() / analytical.abs().max(1e-15);
    assert!(
        rel_err < 1e-3,
        "out-of-order decay mismatch: actual={actual:.6e}, analytical={analytical:.6e}, rel_err={rel_err:.6e}"
    );

    db.close().expect("close failed");
}
// ── UAT-02: Windowed count correctness ──────────────────────────────────────
/// Write N events with timestamps within the 1h window and M events outside it.
/// Verify `read_windowed_count` returns exactly N for the 1h window.
///
/// The M stale events are also checked against the 24h window, which covers
/// both groups (N + M). This proves the stale events were recorded and merely
/// excluded from the 1h count, not silently discarded on ingest.
#[test]
fn uat_02_windowed_count_in_window_only() {
    const MINUTE_NS: u64 = 60_000_000_000;

    let schema = build_schema();
    let db = TidalDb::builder()
        .ephemeral()
        .with_schema(schema)
        .open()
        .expect("open should succeed");
    let entity = EntityId::new(42);
    // "Now" in terms of test time. All events will be relative to this.
    let now_ns = Timestamp::now().as_nanos();

    // M events 2h00m-2h04m ago: outside the 1h window, inside the 24h window.
    let out_of_window_count = 5u64;
    for i in 0..out_of_window_count {
        let ts = Timestamp::from_nanos(now_ns - (120 + i) * MINUTE_NS);
        db.signal("view", entity, 1.0, ts)
            .expect("signal write failed");
    }

    // N events 1-10 minutes ago: well inside the 1h window.
    let in_window_count = 10u64;
    for i in 0..in_window_count {
        let ts = Timestamp::from_nanos(now_ns - (i + 1) * MINUTE_NS);
        db.signal("view", entity, 1.0, ts)
            .expect("signal write failed");
    }

    // The 1h window must count only the N recent events.
    let count_1h = db
        .read_windowed_count(entity, "view", Window::OneHour)
        .expect("read_windowed_count failed");
    assert_eq!(
        count_1h, in_window_count,
        "1h windowed count should be {in_window_count}, got {count_1h}"
    );

    // The 24h window covers both groups.
    let count_24h = db
        .read_windowed_count(entity, "view", Window::TwentyFourHours)
        .expect("read_windowed_count failed");
    let expected_24h = in_window_count + out_of_window_count;
    assert_eq!(
        count_24h, expected_24h,
        "24h windowed count should be {expected_24h}, got {count_24h}"
    );

    db.close().expect("close failed");
}
/// Verify `AllTime` count accumulates all events regardless of time.
///
/// The events are spread two hours apart over roughly four days, so every event
/// except the newest falls outside the 1h window. `AllTime` must still count
/// all of them, which distinguishes it from a bounded window.
#[test]
fn uat_02b_all_time_count_accumulates_all() {
    // Use a schema with AllTime window.
    let mut builder = SchemaBuilder::new();
    let _ = builder
        .signal(
            "view",
            EntityKind::Item,
            DecaySpec::Exponential {
                half_life: Duration::from_secs(7 * 24 * 3600),
            },
        )
        .windows(&[Window::OneHour, Window::AllTime])
        .velocity(false)
        .add();
    let schema = builder.build().expect("valid");
    let db = TidalDb::builder()
        .ephemeral()
        .with_schema(schema)
        .open()
        .expect("open");
    let entity = EntityId::new(1);
    let now_ns = Timestamp::now().as_nanos();
    let two_hours_ns: u64 = 2 * 3_600_000_000_000;

    // Write events from 98h ago up to now, oldest first, so the stream
    // arrives in timestamp order.
    let total = 50u64;
    for i in (0..total).rev() {
        let ts = Timestamp::from_nanos(now_ns - i * two_hours_ns);
        db.signal("view", entity, 1.0, ts).expect("signal");
    }

    let all_time = db
        .read_windowed_count(entity, "view", Window::AllTime)
        .expect("read");
    assert_eq!(
        all_time, total,
        "AllTime count should be {total}, got {all_time}"
    );

    // Only the event at `now` is within the last hour; the next one is 2h old.
    let one_hour = db
        .read_windowed_count(entity, "view", Window::OneHour)
        .expect("read 1h");
    assert_eq!(
        one_hour, 1,
        "1h count should include only the newest event, got {one_hour}"
    );

    db.close().expect("close");
}
// ── UAT-03: Velocity correctness ────────────────────────────────────────────
/// Verify velocity = `windowed_count` / `window_duration_seconds` through public API.
///
/// The 1h windowed count is pinned to the number of events written before the
/// ratio is checked. Otherwise a count of 0 would produce a velocity of
/// 0.0 == 0 / 3600, and the ratio check would pass vacuously.
#[test]
fn uat_03_velocity_equals_count_over_duration() {
    let mut builder = SchemaBuilder::new();
    let _ = builder
        .signal(
            "view",
            EntityKind::Item,
            DecaySpec::Exponential {
                half_life: Duration::from_secs(7 * 24 * 3600),
            },
        )
        .windows(&[Window::OneHour, Window::AllTime])
        .velocity(true)
        .add();
    let schema = builder.build().expect("valid");
    let db = TidalDb::builder()
        .ephemeral()
        .with_schema(schema)
        .open()
        .expect("open");
    let entity = EntityId::new(42);
    let now_ns = Timestamp::now().as_nanos();

    // Write 20 events, one per second from now back to 19 s ago (all within the 1h window).
    let event_count = 20u64;
    for i in 0..event_count {
        let ts = Timestamp::from_nanos(now_ns - i * 1_000_000_000);
        db.signal("view", entity, 1.0, ts).expect("signal");
    }

    let count_1h = db
        .read_windowed_count(entity, "view", Window::OneHour)
        .expect("read count");
    assert_eq!(
        count_1h, event_count,
        "1h windowed count should be {event_count}, got {count_1h}"
    );

    let velocity_1h = db
        .read_velocity(entity, "view", Window::OneHour)
        .expect("read velocity");
    let expected_velocity = count_1h as f64 / 3600.0; // 1h = 3600s
    let diff = (velocity_1h - expected_velocity).abs();
    assert!(
        diff < 1e-12,
        "velocity should be count/duration: velocity={velocity_1h}, \
         expected={expected_velocity}, count={count_1h}, diff={diff}"
    );

    // AllTime velocity is always 0.0 (undefined for unbounded window).
    let velocity_all = db
        .read_velocity(entity, "view", Window::AllTime)
        .expect("read velocity alltime");
    assert!(
        velocity_all.abs() < 1e-15,
        "AllTime velocity should be 0.0, got {velocity_all}"
    );

    db.close().expect("close");
}
// ── UAT-04: Checkpoint + WAL replay preserves windowed and all-time counts ──
/// Write signals, close the database, reopen, and verify that windowed counts
/// and all-time counts match pre-close values.
///
/// All 50 events are less than 25 minutes old. The 1h window must therefore
/// hold all of them both before close and after reopen, and the tests compare
/// both counts exactly.
#[test]
fn uat_04_checkpoint_replay_preserves_counts() {
    let tmp = tempfile::tempdir().expect("tempdir failed");
    let entity = EntityId::new(42);
    let score_before: f64;
    let all_time_before: u64;
    let one_hour_before: u64;
    // Use a schema with AllTime to verify that counter.
    let make_schema = || {
        let mut builder = SchemaBuilder::new();
        let _ = builder
            .signal(
                "view",
                EntityKind::Item,
                DecaySpec::Exponential {
                    half_life: Duration::from_secs(7 * 24 * 3600),
                },
            )
            .windows(&[Window::OneHour, Window::AllTime])
            .velocity(false)
            .add();
        builder.build().expect("valid")
    };

    // === First session: write signals, read state, close ===
    {
        let db = TidalDb::builder()
            .with_data_dir(tmp.path())
            .with_schema(make_schema())
            .open()
            .expect("open failed (first session)");
        db.write_item(entity, &metadata(42)).expect("write_item");
        // Write 50 signals, one every 30 s, from now back to 24.5 min ago.
        let now_ns = Timestamp::now().as_nanos();
        for i in 0..50u64 {
            let ts = Timestamp::from_nanos(now_ns - i * 30_000_000_000);
            db.signal("view", entity, 1.0, ts).expect("signal");
        }
        score_before = db
            .read_decay_score(entity, "view", 0)
            .expect("read score")
            .expect("some");
        all_time_before = db
            .read_windowed_count(entity, "view", Window::AllTime)
            .expect("read all_time");
        one_hour_before = db
            .read_windowed_count(entity, "view", Window::OneHour)
            .expect("read 1h");
        assert_eq!(all_time_before, 50, "pre-close all_time should be 50");
        // Every event is < 25 min old, so the 1h window must hold all 50.
        assert_eq!(
            one_hour_before, 50,
            "pre-close 1h should be 50 (all events are < 25 min old), got {one_hour_before}"
        );
        db.close().expect("close first session");
    }

    // === Second session: reopen and verify state survived ===
    {
        let db = TidalDb::builder()
            .with_data_dir(tmp.path())
            .with_schema(make_schema())
            .open()
            .expect("open failed (second session)");
        let score_after = db
            .read_decay_score(entity, "view", 0)
            .expect("read score (second)")
            .expect("some after recovery");
        let all_time_after = db
            .read_windowed_count(entity, "view", Window::AllTime)
            .expect("read all_time (second)");
        // Decay scores: allow 0.1% tolerance for time elapsed between sessions.
        let rel_err = (score_after - score_before).abs() / score_before.abs().max(1e-15);
        assert!(
            rel_err < 0.001,
            "recovered decay score deviates > 0.1%: before={score_before:.8}, after={score_after:.8}, rel_err={rel_err:.6e}"
        );
        // All-time count must be exact.
        assert_eq!(
            all_time_after, all_time_before,
            "all-time count must survive checkpoint+replay: before={all_time_before}, after={all_time_after}"
        );
        // 1h windowed count must also be exact: only a moment has passed since
        // the first session, so every event is still inside the window.
        let one_hour_after = db
            .read_windowed_count(entity, "view", Window::OneHour)
            .expect("read 1h (second)");
        assert_eq!(
            one_hour_after, one_hour_before,
            "1h windowed count must survive checkpoint+replay: before={one_hour_before}, after={one_hour_after}"
        );
        db.close().expect("close second session");
    }
}
// ── UAT-05: Decay formula matches analytical to 6 decimal places ────────────
/// Write 100 events with controlled timestamps through the `TidalDb` public API.
/// Compute the analytical brute-force decay score and compare to `read_decay_score`.
/// The relative error must be < 1e-6 (6 decimal places).
///
/// `read_decay_score` decays to its own internal `Timestamp::now()`, which the
/// test cannot observe. The test therefore samples the clock immediately before
/// and after the read. Every weight is positive, so the analytical score is
/// non-increasing in query time, and the true value lies between the analytical
/// scores at those two instants. Comparing against that bracket removes clock
/// skew from the check, so the 1e-6 tolerance measures the decay math itself.
#[test]
fn uat_05_decay_formula_matches_analytical_6_decimal_places() {
    let schema = build_schema();
    let db = TidalDb::builder()
        .ephemeral()
        .with_schema(schema)
        .open()
        .expect("open");
    let entity = EntityId::new(42);
    let half_life_secs = 7.0 * 24.0 * 3600.0;
    let rel_tol = 1e-6_f64;

    // Events from 100 s ago to 1 s ago, so lazy decay to query time is minimal.
    let now_ns = Timestamp::now().as_nanos();
    let mut events: Vec<(f64, u64)> = Vec::with_capacity(100);
    for i in 0..100u64 {
        let weight = (i as f64).mul_add(0.01, 1.0); // varying weights: 1.00, 1.01, ..., 1.99
        let ts_ns = now_ns - (100 - i) * 1_000_000_000;
        events.push((weight, ts_ns));
        db.signal("view", entity, weight, Timestamp::from_nanos(ts_ns))
            .expect("signal");
    }

    // Bracket the engine's internal query instant between two clock samples.
    let before_ns = Timestamp::now().as_nanos();
    let actual = db
        .read_decay_score(entity, "view", 0)
        .expect("read_decay_score")
        .expect("some");
    let after_ns = Timestamp::now().as_nanos();

    // An earlier query time means less decay, so it gives the upper bound.
    let upper = analytical_decay(&events, half_life_secs, before_ns);
    let lower = analytical_decay(&events, half_life_secs, after_ns);
    assert!(
        actual <= upper * (1.0 + rel_tol) && actual >= lower * (1.0 - rel_tol),
        "decay score mismatch to 6 decimal places: actual={actual:.10}, \
         expected within [{lower:.10}, {upper:.10}] at rel_tol={rel_tol:.0e}"
    );

    // Sanity check: ~150 (sum of the 100 weights) with minimal decay.
    let sum_weights: f64 = events.iter().map(|(w, _)| w).sum();
    assert!(
        actual > sum_weights * 0.99 && actual < sum_weights * 1.01,
        "score should be close to sum of weights ({sum_weights:.2}) with minimal decay, got {actual:.6}"
    );

    db.close().expect("close");
}
/// Stress the decay math over a full half-life. It writes 200 unit-weight
/// events spaced evenly across the past 7 days, oldest first, and compares the
/// engine's score against the analytical brute-force sum.
#[test]
fn uat_05b_decay_over_7_days_matches_analytical() {
    let db = TidalDb::builder()
        .ephemeral()
        .with_schema(build_schema())
        .open()
        .expect("open");
    let entity = EntityId::new(99);
    let half_life_secs = 7.0 * 24.0 * 3600.0;

    let now_ns = Timestamp::now().as_nanos();
    let seven_days_ns: u64 = 7 * 24 * 3_600_000_000_000;
    let spacing_ns = seven_days_ns / 200;
    let start_ns = now_ns - seven_days_ns;

    // Event k sits at start + k * spacing, all with weight 1.0.
    let events: Vec<(f64, u64)> = (0..200u64)
        .map(|k| (1.0, start_ns + k * spacing_ns))
        .collect();
    for &(weight, ts_ns) in &events {
        db.signal("view", entity, weight, Timestamp::from_nanos(ts_ns))
            .expect("signal");
    }

    let actual = db
        .read_decay_score(entity, "view", 0)
        .expect("read")
        .expect("some");
    let query_ns = Timestamp::now().as_nanos();
    let analytical = analytical_decay(&events, half_life_secs, query_ns);

    // Relative error, falling back to absolute error if the expected value is ~0.
    // The events span one half-life; 1e-3 is the accepted tolerance here.
    let abs_err = (actual - analytical).abs();
    let rel_err = if analytical.abs() < 1e-15 {
        abs_err
    } else {
        abs_err / analytical.abs()
    };
    assert!(
        rel_err < 1e-3,
        "7-day spread decay mismatch: actual={actual:.10}, analytical={analytical:.10}, \
         rel_err={rel_err:.2e}"
    );

    db.close().expect("close");
}