Milestone 8 (phases 1-4): - Shard-aware WAL segment naming, BatchHeader v2, ShardRouter - Transport trait, InProcessTransport, WalShipper, FollowerDb - HLC, PNCounter, LWWRegister, CrdtSignalState, ReconciliationEngine - Session replication bridge with SeqNo/HWM, idempotency store Forage application: - Multi-source discovery engine with MAB exploration - Embedding-based label system, server handlers, UI refresh Other: - QUICKSTART.md, README.md, milestone-8 planning docs - Hard negative union semantics, RLHF export enhancements - Recovery benchmark and visibility test expansions - Split 8 oversized source files per CODING_GUIDELINES §9 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
333 lines
12 KiB
Rust
333 lines
12 KiB
Rust
//! Recovery SLA integration tests.
|
|
//!
|
|
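//! Asserts hard latency bounds on cold-start recovery time.
//! The small-scale tests run on every `cargo test` invocation; the expensive
//! 1M-item variants (with and without a WAL backlog) are marked `#[ignore]`
//! and must be run explicitly with `-- --ignored`.
//!
//! Scope: timing starts immediately before the prepared data directory is
//! reopened with `TidalDb::open()`. Setup ends with a clean `db.close()`, which
//! leaves the WAL backlog at ~0, so the plain tests measure checkpoint restore +
//! index rebuild. The WAL backlog tests additionally inject un-checkpointed WAL
//! segments after the close so that WAL replay is part of the timed recovery.
//!
//! See `benches/recovery.rs` for the matching Criterion microbenchmark.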
|
|
|
|
#![allow(clippy::unwrap_used)]
|
|
|
|
use std::time::{Duration, Instant};
|
|
|
|
use tidaldb::TidalDb;
|
|
use tidaldb::schema::{DecaySpec, EntityId, EntityKind, SchemaBuilder, Timestamp, Window};
|
|
|
|
fn bench_schema() -> tidaldb::schema::Schema {
|
|
let mut builder = SchemaBuilder::new();
|
|
let _ = builder
|
|
.signal(
|
|
"view",
|
|
EntityKind::Item,
|
|
DecaySpec::Exponential {
|
|
half_life: Duration::from_secs(7 * 24 * 3600),
|
|
},
|
|
)
|
|
.windows(&[Window::AllTime])
|
|
.velocity(false)
|
|
.add();
|
|
builder.build().expect("valid schema")
|
|
}
|
|
|
|
fn generate_test_data(dir: &std::path::Path, entity_count: u64) {
|
|
let schema = bench_schema();
|
|
let db = TidalDb::builder()
|
|
.with_data_dir(dir)
|
|
.with_schema(schema)
|
|
.open()
|
|
.unwrap();
|
|
|
|
let base_ns = 1_000_000_000_000u64;
|
|
for i in 1..=entity_count {
|
|
let ts = Timestamp::from_nanos(base_ns + i * 1_000_000);
|
|
db.signal("view", EntityId::new(i), 1.0, ts).unwrap();
|
|
|
|
if entity_count >= 100_000 && i % 100_000 == 0 {
|
|
eprintln!(" setup: {i}/{entity_count} entities written");
|
|
}
|
|
}
|
|
db.close().unwrap();
|
|
}
|
|
|
|
/// Assert that recovery from a 1000-item checkpoint completes in under 2 seconds.
///
/// Only `open()` is timed (checkpoint restore + index rebuild). The read that
/// follows is a correctness check and is deliberately excluded from the
/// measurement, matching the 1M-item and WAL-backlog SLA tests.
#[test]
fn small_scale_recovery_smoke_test() {
    let dir = tempfile::tempdir().unwrap();
    generate_test_data(dir.path(), 1000);

    let schema = bench_schema();
    let start = Instant::now();
    let db = TidalDb::builder()
        .with_data_dir(dir.path())
        .with_schema(schema)
        .open()
        .unwrap();
    // Stop the clock as soon as the database is open, before any reads, so the
    // SLA assertion below measures recovery only.
    let elapsed = start.elapsed();
    eprintln!("Recovery time (1K items): {elapsed:?}");

    let count = db
        .read_windowed_count(EntityId::new(500), "view", Window::AllTime)
        .unwrap();
    assert_eq!(count, 1, "entity 500 should have exactly 1 signal");

    assert!(
        elapsed < Duration::from_secs(2),
        "1000-entity recovery took {elapsed:?}, expected < 2s"
    );

    db.close().unwrap();
}
|
|
|
|
/// Assert that recovery from a 10K-item checkpoint completes in under 5 seconds.
///
/// This is the CI-safe version of the 1M-item SLA test below.
/// Run the full benchmark with: cargo bench --bench recovery
///
/// Only `open()` is timed (checkpoint restore + index rebuild). The read that
/// follows is a correctness check and is deliberately excluded from the
/// measurement, matching the 1M-item and WAL-backlog SLA tests.
#[test]
fn recovery_under_5_seconds_10k_items() {
    let dir = tempfile::tempdir().unwrap();
    generate_test_data(dir.path(), 10_000);

    let schema = bench_schema();
    let start = Instant::now();
    let db = TidalDb::builder()
        .with_data_dir(dir.path())
        .with_schema(schema)
        .open()
        .unwrap();
    // Stop the clock as soon as the database is open, before any reads, so the
    // SLA assertion below measures recovery only.
    let elapsed = start.elapsed();
    eprintln!("Recovery time (10K items): {elapsed:?}");

    let count = db
        .read_windowed_count(EntityId::new(5000), "view", Window::AllTime)
        .unwrap();
    assert_eq!(count, 1, "entity 5000 should have exactly 1 signal");

    assert!(
        elapsed < Duration::from_secs(5),
        "10K-entity recovery took {elapsed:?}, expected < 5s"
    );

    db.close().unwrap();
}
|
|
|
|
/// Full-scale recovery SLA: reopening a 1M-item checkpoint must take under 30 seconds.
///
/// This is the spec-minimum bound from task-05. Building the 1M-item dataset
/// is a one-time setup of roughly 5-10 minutes. The test is therefore
/// `#[ignore]`d so CI skips it on every commit. Run it locally before changing
/// the checkpoint format, WAL replay logic, or entity rebuild:
///
/// ```bash
/// cargo test --manifest-path tidal/Cargo.toml --test m7_recovery_sla -- --ignored
/// ```
///
/// Scope: only `open()` is timed. Setup ends with a clean `db.close()`, which
/// leaves the WAL backlog at ~0, so the measurement is effectively checkpoint
/// restore + index rebuild.
#[test]
#[ignore = "expensive: generates 1M items (~5min setup), run with --ignored"]
fn recovery_under_30_seconds() {
    const ENTITY_COUNT: u64 = 1_000_000;
    const PROBE_ENTITY: u64 = 500_000;
    let sla = Duration::from_secs(30);

    let tmp = tempfile::tempdir().unwrap();
    let data_dir = tmp.path();
    generate_test_data(data_dir, ENTITY_COUNT);
    eprintln!(" setup: checkpoint written, starting recovery timing");

    let schema = bench_schema();
    let timer = Instant::now();
    let db = TidalDb::builder()
        .with_data_dir(data_dir)
        .with_schema(schema)
        .open()
        .unwrap();
    let elapsed = timer.elapsed();
    eprintln!("Recovery time (1M items): {elapsed:?}");

    let probe_count = db
        .read_windowed_count(EntityId::new(PROBE_ENTITY), "view", Window::AllTime)
        .unwrap();
    assert_eq!(probe_count, 1, "entity 500000 should have exactly 1 signal");

    // 30-second SLA from task-05 spec.
    assert!(
        elapsed < sla,
        "1M-entity recovery took {elapsed:?}, expected < 30s"
    );

    db.close().unwrap();
}
|
|
|
|
// ── WAL backlog recovery tests ──────────────────────────────────────────────
|
|
//
|
|
// These tests verify recovery when a checkpoint exists on disk but there are
|
|
// also unprocessed WAL events written after the checkpoint (the "backlog").
|
|
//
|
|
// The approach:
|
|
// 1. Write base signals via `TidalDb`, call `db.close()` -> checkpoint on disk,
|
|
// WAL compacted.
|
|
// 2. Inject raw WAL segment files into the `wal/` directory using the public
|
|
// `encode_batch` + `segment_filename` API. The injected batches have
|
|
// sequence numbers above the checkpoint boundary, simulating events written
|
|
// to the WAL but never checkpointed (i.e., a crash before checkpoint).
|
|
// 3. Reopen the data directory. Recovery replays the injected WAL events.
|
|
// 4. Verify that backlog entity IDs are readable (proves WAL replay occurred).
|
|
//
|
|
// NOTE: `TidalDb::Drop` calls `shutdown_inner()` which performs a full checkpoint
|
|
// and WAL compaction. We cannot use `std::mem::forget` because it would leak the
|
|
// file lock. Instead we inject WAL segments post-close, which produces the exact
|
|
// same on-disk state as a crash-with-pending-WAL scenario.
|
|
|
|
use tidaldb::replication::ShardId;
|
|
use tidaldb::wal::checkpoint::CheckpointManager;
|
|
use tidaldb::wal::format::{EventRecord, MAX_EVENTS_PER_BATCH, encode_batch};
|
|
use tidaldb::wal::segment::segment_filename;
|
|
|
|
/// Inject `backlog_count` raw WAL signal events into the WAL directory,
|
|
/// starting at sequence `checkpoint_seq + 1`. Uses entity IDs
|
|
/// `base_entity + 1 ..= base_entity + backlog_count` with `signal_type = 0`
|
|
/// ("view", the only signal in `bench_schema`, assigned ID 0 alphabetically).
|
|
///
|
|
/// Returns the first backlog entity ID for verification.
|
|
fn inject_wal_backlog(data_dir: &std::path::Path, base_entity: u64, backlog_count: u64) -> u64 {
|
|
let wal_dir = data_dir.join("wal");
|
|
std::fs::create_dir_all(&wal_dir).unwrap();
|
|
|
|
// Read the current checkpoint to determine where to start injected seqs.
|
|
let checkpoint = CheckpointManager::read(&wal_dir).unwrap();
|
|
let checkpoint_seq = checkpoint.map_or(0, |(seq, _)| seq);
|
|
|
|
let base_ns = 1_000_000_000_000u64;
|
|
let first_backlog_entity = base_entity + 1;
|
|
|
|
// Build event records for the backlog.
|
|
let events: Vec<EventRecord> = (1..=backlog_count)
|
|
.map(|i| EventRecord {
|
|
entity_id: base_entity + i,
|
|
signal_type: 0, // "view" is the only signal, assigned ID 0
|
|
weight: 1.0,
|
|
timestamp_nanos: base_ns + (base_entity + i) * 1_000_000,
|
|
})
|
|
.collect();
|
|
|
|
// Encode into batches (max 256 events per batch), write as a single
|
|
// WAL segment file starting at checkpoint_seq + 1.
|
|
let mut seq = checkpoint_seq + 1;
|
|
let seg_path = wal_dir.join(segment_filename(ShardId::SINGLE, seq));
|
|
let mut segment_bytes: Vec<u8> = Vec::new();
|
|
|
|
for chunk in events.chunks(usize::from(MAX_EVENTS_PER_BATCH)) {
|
|
let batch_ts = chunk[0].timestamp_nanos;
|
|
let batch_bytes = encode_batch(chunk, seq, batch_ts).unwrap();
|
|
segment_bytes.extend_from_slice(&batch_bytes);
|
|
seq += chunk.len() as u64;
|
|
}
|
|
|
|
std::fs::write(&seg_path, &segment_bytes).unwrap();
|
|
|
|
first_backlog_entity
|
|
}
|
|
|
|
/// Assert that recovery from a 1000-item checkpoint + 500-signal WAL backlog
/// completes in under 5 seconds and that backlog signals are actually replayed.
///
/// Both the first and the last injected entity are checked. The backlog is
/// split into batches of at most `MAX_EVENTS_PER_BATCH` events, so checking
/// only the first entity would not detect a replay that stops after the first
/// batch.
///
/// This is the CI-safe smoke test for WAL backlog recovery.
#[test]
fn small_scale_wal_backlog_smoke_test() {
    let dir = tempfile::tempdir().unwrap();
    let base_count = 1000u64;
    let backlog_count = 500u64;

    // Phase 1: Write base signals and checkpoint via clean close.
    generate_test_data(dir.path(), base_count);

    // Phase 2: Inject raw WAL events that simulate a crash before checkpoint.
    let first_backlog = inject_wal_backlog(dir.path(), base_count, backlog_count);
    let last_backlog = first_backlog + backlog_count - 1;

    // Phase 3: Reopen and time recovery (checkpoint restore + WAL replay).
    let schema = bench_schema();
    let start = Instant::now();
    let db = TidalDb::builder()
        .with_data_dir(dir.path())
        .with_schema(schema)
        .open()
        .unwrap();
    let elapsed = start.elapsed();

    eprintln!("Recovery time (1K checkpoint + 500 WAL backlog): {elapsed:?}");

    // Verify base entity survived the checkpoint.
    let base_count_val = db
        .read_windowed_count(EntityId::new(500), "view", Window::AllTime)
        .unwrap();
    assert_eq!(
        base_count_val, 1,
        "base entity 500 should have 1 signal from checkpoint"
    );

    // Verify the first backlog entity was replayed from WAL.
    let backlog_count_val = db
        .read_windowed_count(EntityId::new(first_backlog), "view", Window::AllTime)
        .unwrap();
    assert_eq!(
        backlog_count_val, 1,
        "backlog entity {first_backlog} should have 1 signal from WAL replay"
    );

    // Verify the last backlog entity was replayed as well. This proves every
    // batch in the injected segment was applied, not just the first.
    let last_backlog_val = db
        .read_windowed_count(EntityId::new(last_backlog), "view", Window::AllTime)
        .unwrap();
    assert_eq!(
        last_backlog_val, 1,
        "last backlog entity {last_backlog} should have 1 signal from WAL replay"
    );

    assert!(
        elapsed < Duration::from_secs(5),
        "WAL backlog recovery took {elapsed:?}, expected < 5s"
    );

    db.close().unwrap();
}
|
|
|
|
/// Assert that recovery from a 1M-item checkpoint + 50K-signal WAL backlog
/// completes in under 30 seconds, and that both checkpointed and backlog
/// signals are present afterwards.
///
/// The checks cover one base entity from the checkpoint plus the first and
/// last injected backlog entities. The 50K backlog spans many batches of at
/// most `MAX_EVENTS_PER_BATCH` events, so checking the last entity proves the
/// whole segment was replayed.
///
/// This is the full-scale WAL backlog SLA test. Marked `#[ignore]` because
/// generating 1M items takes several minutes. Run explicitly:
///
/// ```bash
/// cargo test --manifest-path tidal/Cargo.toml --test m7_recovery_sla -- --ignored recovery_with_wal_backlog_under_30_seconds
/// ```
#[test]
#[ignore = "expensive: generates 1M + 50K items, run with --ignored"]
fn recovery_with_wal_backlog_under_30_seconds() {
    let dir = tempfile::tempdir().unwrap();
    let base_count = 1_000_000u64;
    let backlog_count = 50_000u64;

    // Phase 1: Write base signals and checkpoint via clean close.
    generate_test_data(dir.path(), base_count);
    eprintln!(" setup: checkpoint written ({base_count} base items)");

    // Phase 2: Inject raw WAL events that simulate a crash before checkpoint.
    let first_backlog = inject_wal_backlog(dir.path(), base_count, backlog_count);
    let last_backlog = first_backlog + backlog_count - 1;
    eprintln!(" setup: {backlog_count} WAL backlog events injected, starting recovery timing");

    // Phase 3: Reopen and time recovery (checkpoint restore + WAL replay).
    let schema = bench_schema();
    let start = Instant::now();
    let db = TidalDb::builder()
        .with_data_dir(dir.path())
        .with_schema(schema)
        .open()
        .unwrap();
    let elapsed = start.elapsed();

    eprintln!("Recovery time (1M checkpoint + 50K WAL backlog): {elapsed:?}");

    // Verify a base entity survived the checkpoint.
    let base_count_val = db
        .read_windowed_count(EntityId::new(500_000), "view", Window::AllTime)
        .unwrap();
    assert_eq!(
        base_count_val, 1,
        "base entity 500000 should have 1 signal from checkpoint"
    );

    // Verify the first backlog entity was replayed from WAL.
    let backlog_count_val = db
        .read_windowed_count(EntityId::new(first_backlog), "view", Window::AllTime)
        .unwrap();
    assert_eq!(
        backlog_count_val, 1,
        "backlog entity {first_backlog} should have 1 signal from WAL replay"
    );

    // Verify the last backlog entity was replayed as well, i.e. every batch
    // of the injected segment was applied.
    let last_backlog_val = db
        .read_windowed_count(EntityId::new(last_backlog), "view", Window::AllTime)
        .unwrap();
    assert_eq!(
        last_backlog_val, 1,
        "last backlog entity {last_backlog} should have 1 signal from WAL replay"
    );

    // 30-second SLA.
    assert!(
        elapsed < Duration::from_secs(30),
        "WAL backlog recovery took {elapsed:?}, expected < 30s"
    );

    db.close().unwrap();
}
|