#![allow( clippy::cast_precision_loss, clippy::cast_sign_loss, clippy::missing_const_for_fn )] //! UAT tests for Milestone 1, Phase 2: Write-Ahead Log. //! //! These tests verify acceptance criteria that are NOT sufficiently covered //! by the existing `wal_integration.rs` tests. Each test uses only the public //! WAL API surface: `WalHandle`, `WalConfig`, `SignalEvent`. use std::sync::Arc; use std::time::Duration; use tidaldb::wal::{SignalEvent, WalConfig, WalHandle}; fn uat_config(dir: &std::path::Path) -> WalConfig { WalConfig { dir: dir.to_path_buf(), segment_size: 16 * 1024 * 1024, batch_size: 100, batch_timeout: Duration::from_millis(10), dedup_window: Duration::from_secs(30), } } fn make_event(id: u64) -> SignalEvent { SignalEvent { entity_id: id, signal_type: 1, weight: 1.0, timestamp_nanos: id * 1_000_000_000, } } // --------------------------------------------------------------------------- // UAT-01: First sequence number is exactly 1 // // Spec: "Sequence numbers are monotonically increasing u64, starting at 1" // The existing tests verify monotonicity but not the exact starting value. 
// ---------------------------------------------------------------------------

#[test]
fn uat_01_first_seq_starts_at_one() {
    let dir = tempfile::tempdir().expect("tempdir creation should succeed");
    let config = uat_config(dir.path());
    let (handle, replayed, _session_events) =
        WalHandle::open(config).expect("open should succeed");

    assert!(
        replayed.is_empty(),
        "fresh WAL should have no replayed events"
    );

    let seq = handle.append(make_event(1)).expect("append should succeed");
    assert_eq!(
        seq, 1,
        "very first event must get sequence number 1, got {seq}"
    );

    let seq2 = handle.append(make_event(2)).expect("append should succeed");
    assert_eq!(
        seq2, 2,
        "second event must get sequence number 2, got {seq2}"
    );

    handle.shutdown().expect("shutdown should succeed");
}

// ---------------------------------------------------------------------------
// UAT-02: Crash simulation via Drop (no explicit shutdown)
//
// Spec: "Crash simulation = write events, drop WalHandle without clean
// shutdown, reopen and verify"
//
// The WalHandle Drop implementation sends a best-effort Shutdown and joins
// the writer thread. This simulates a non-graceful close where the caller
// forgets to call shutdown(). Events that were already fsynced in committed
// batches must survive.
// ---------------------------------------------------------------------------

#[test]
fn uat_02_drop_without_shutdown_recovers_committed_events() {
    let dir = tempfile::tempdir().expect("tempdir creation should succeed");

    // Write events and drop the handle without calling shutdown.
    {
        let config = uat_config(dir.path());
        let (handle, _, _) = WalHandle::open(config).expect("open should succeed");
        for i in 1..=50 {
            handle.append(make_event(i)).expect("append should succeed");
        }
        // Each append() blocks until the batch is fsynced. So by the time we
        // reach this point, all 50 events are durable on disk. Now drop the
        // handle without calling shutdown() -- the Drop impl does best-effort
        // cleanup but the committed events must survive regardless.
        drop(handle);
    }

    // Reopen and verify all committed events are present.
    let config = uat_config(dir.path());
    let (handle, replayed, _session_events) =
        WalHandle::open(config).expect("reopen should succeed");
    assert_eq!(
        replayed.len(),
        50,
        "all 50 committed events should survive a Drop-only close, got {}",
        replayed.len()
    );

    // Verify data integrity of replayed events (written with ids 1..=50).
    for (i, event) in replayed.iter().enumerate() {
        let expected = make_event((i + 1) as u64);
        assert_eq!(
            event.entity_id, expected.entity_id,
            "event {i} entity_id mismatch after Drop recovery"
        );
    }

    handle.shutdown().expect("shutdown should succeed");
}

// ---------------------------------------------------------------------------
// UAT-03: Replay from checkpoint produces identical event data
//
// Spec: "WAL replay from any checkpoint produces identical state to
// uninterrupted execution"
//
// The existing tests check counts. This test verifies byte-level identity:
// every field of every post-checkpoint replayed event matches the originally
// written event.
// ---------------------------------------------------------------------------

#[test]
fn uat_03_replay_produces_identical_state() {
    let dir = tempfile::tempdir().expect("tempdir creation should succeed");

    // Generate 200 events with varied fields (different signal types, weights,
    // timestamps) to maximize coverage of the serialization path.
    let events: Vec<SignalEvent> = (0..200u64)
        .map(|i| SignalEvent {
            entity_id: i * 7 + 42,
            signal_type: u8::try_from(i % 256).expect("i % 256 always fits in u8"),
            weight: ((i % 50) as f32).mul_add(0.1, 0.5),
            timestamp_nanos: 1_000_000_000 + i * 500_000,
        })
        .collect();

    // Session 1: write all events, checkpoint at event 100, write remaining.
    let config = uat_config(dir.path());
    let (handle, _, _) = WalHandle::open(config).expect("open should succeed");
    let mut seqs = Vec::with_capacity(events.len());
    for event in &events {
        let seq = handle.append(event.clone()).expect("append should succeed");
        assert!(seq > 0, "unique event should get real sequence number");
        seqs.push(seq);
    }

    // Checkpoint at the 100th event
    let checkpoint_seq = seqs[99];
    handle
        .checkpoint(checkpoint_seq)
        .expect("checkpoint should succeed");
    handle.shutdown().expect("shutdown should succeed");

    // Session 2: reopen and verify replayed events match exactly.
    let config = uat_config(dir.path());
    let (handle, replayed, _session_events) =
        WalHandle::open(config).expect("reopen should succeed");

    // Replay returns events with seq > checkpoint_seq, so at least the 100
    // events written after the checkpointed (100th) event must come back.
    assert!(
        replayed.len() >= 100,
        "expected at least 100 replayed events (post-checkpoint), got {}",
        replayed.len()
    );

    // Verify byte-level identity of the tail: the last 100 replayed events
    // must equal events[100..200] field by field (`i` is the offset past the
    // checkpoint).
    let post_checkpoint_replay = replayed.iter().skip(replayed.len() - 100);
    for (i, (replayed_event, original)) in post_checkpoint_replay
        .zip(&events[100..])
        .enumerate()
    {
        assert_eq!(
            replayed_event.entity_id, original.entity_id,
            "event {i} entity_id mismatch in replay"
        );
        assert_eq!(
            replayed_event.signal_type, original.signal_type,
            "event {i} signal_type mismatch in replay"
        );
        assert_eq!(
            replayed_event.weight.to_bits(),
            original.weight.to_bits(),
            "event {i} weight mismatch in replay (bits differ)"
        );
        assert_eq!(
            replayed_event.timestamp_nanos, original.timestamp_nanos,
            "event {i} timestamp_nanos mismatch in replay"
        );
    }

    handle.shutdown().expect("shutdown should succeed");
}

// ---------------------------------------------------------------------------
// UAT-04: Truncate after checkpoint, then new writes succeed
//
// Spec: "WAL can be truncated after a checkpoint without losing committed
// state"
//
// This tests the full cycle: write -> checkpoint -> truncate -> write more ->
// reopen -> verify that the post-truncation writes survive and the WAL is
// fully operational.
// ---------------------------------------------------------------------------

#[test]
fn uat_04_truncate_then_continue_writing() {
    let dir = tempfile::tempdir().expect("tempdir creation should succeed");

    // Use small segments (and small batches) to force multiple segment files;
    // everything else matches the baseline UAT configuration.
    let make_config = |d: &std::path::Path| WalConfig {
        segment_size: 512,
        batch_size: 10,
        ..uat_config(d)
    };

    // Write 100 events (will span multiple segments due to small segment size).
    let config = make_config(dir.path());
    let (handle, _, _) = WalHandle::open(config).expect("open should succeed");
    let mut seqs = Vec::with_capacity(100);
    for i in 1..=100 {
        let seq = handle.append(make_event(i)).expect("append should succeed");
        seqs.push(seq);
    }

    // Checkpoint at event 80
    let checkpoint_seq = seqs[79];
    handle
        .checkpoint(checkpoint_seq)
        .expect("checkpoint should succeed");

    // Truncate all segments before the checkpoint
    handle
        .truncate_before(checkpoint_seq)
        .expect("truncate should succeed");

    // Write 50 more events after truncation
    let mut post_truncation_events = Vec::with_capacity(50);
    for i in 101..=150 {
        let event = make_event(i);
        post_truncation_events.push(event.clone());
        let seq = handle
            .append(event)
            .expect("post-truncation append should succeed");
        assert!(seq > 0, "post-truncation event should get real seq");
    }
    handle.shutdown().expect("shutdown should succeed");

    // Reopen and verify the post-truncation events are present.
    let config = make_config(dir.path());
    let (handle, replayed, _session_events) =
        WalHandle::open(config).expect("reopen should succeed");

    // The 50 post-truncation events must be in the replay.
    assert!(
        replayed.len() >= 50,
        "expected at least 50 replayed events (post-truncation writes), got {}",
        replayed.len()
    );

    // Verify the post-truncation events appear at the end of the replay.
    let tail = replayed.iter().skip(replayed.len() - 50);
    for (i, (event, expected)) in tail.zip(&post_truncation_events).enumerate() {
        assert_eq!(
            event.entity_id, expected.entity_id,
            "post-truncation event {i} entity_id mismatch"
        );
    }

    // Verify the WAL can accept new writes after reopen post-truncation.
    let new_seq = handle
        .append(make_event(9999))
        .expect("new append after reopen should succeed");
    assert!(
        new_seq > 0,
        "new event after reopen post-truncation should get real seq"
    );

    handle.shutdown().expect("shutdown should succeed");
}

// ---------------------------------------------------------------------------
// UAT-05: Group commit batches concurrent events together
//
// Spec: "Group commit batches up to 100 events or 10ms, whichever comes first;
// fsync is called per batch, not per event"
//
// We submit many events concurrently from multiple threads and verify that:
// - every append gets a unique, non-zero sequence number;
// - the sequence numbers are dense (no gaps or duplicates under concurrency);
// - the total latency for N concurrent appends is NOT proportional to N
//   individual fsyncs (which would take seconds). This latency bound is the
//   observable evidence that group commit is batching fsyncs.
// ---------------------------------------------------------------------------

#[test]
fn uat_05_group_commit_batches_concurrent_events() {
    let dir = tempfile::tempdir().expect("tempdir creation should succeed");
    // The baseline config already uses the spec's batch limits (100 / 10ms).
    let config = uat_config(dir.path());
    let (handle, _, _) = WalHandle::open(config).expect("open should succeed");
    let handle = Arc::new(handle);

    let num_threads: usize = 4;
    let events_per_thread: usize = 250; // 1000 total events

    let start = std::time::Instant::now();
    let mut threads = Vec::with_capacity(num_threads);
    for t in 0..num_threads {
        let h = Arc::clone(&handle);
        threads.push(std::thread::spawn(move || {
            // Checked conversion instead of a silently truncating `as u8`.
            let signal_type = u8::try_from(t).expect("thread index should fit in u8");
            let mut thread_seqs = Vec::with_capacity(events_per_thread);
            for i in 0..events_per_thread {
                let entity_id = (t * events_per_thread + i) as u64;
                let event = SignalEvent {
                    entity_id,
                    signal_type,
                    weight: 1.0,
                    timestamp_nanos: entity_id * 1_000,
                };
                let seq = h.append(event).expect("concurrent append should succeed");
                thread_seqs.push(seq);
            }
            thread_seqs
        }));
    }

    let mut all_seqs = Vec::with_capacity(num_threads * events_per_thread);
    for t in threads {
        all_seqs.extend(t.join().expect("thread should join"));
    }
    let elapsed = start.elapsed();

    let handle = Arc::try_unwrap(handle).expect("should be sole owner");
    handle.shutdown().expect("shutdown should succeed");

    // All 1000 events should have real sequence numbers (no dedup).
    let non_zero: Vec<u64> = all_seqs.iter().copied().filter(|&s| s > 0).collect();
    assert_eq!(
        non_zero.len(),
        num_threads * events_per_thread,
        "all events should get unique sequence numbers"
    );

    // Verify sequence numbers are unique: no duplicates when sorted.
    let mut sorted = non_zero.clone();
    sorted.sort_unstable();
    sorted.dedup();
    assert_eq!(
        sorted.len(),
        non_zero.len(),
        "no duplicate sequence numbers"
    );

    // The sequence numbers should be contiguous: last - first + 1 == count.
    // Compare in u64 (a lossless widening of the length) rather than
    // narrowing the u64 span to usize.
    let min_seq = *sorted.first().expect("non-empty");
    let max_seq = *sorted.last().expect("non-empty");
    assert_eq!(
        max_seq - min_seq + 1,
        sorted.len() as u64,
        "sequence numbers should be contiguous (no gaps)"
    );

    // If fsync was per-event, 1000 fsyncs at ~1ms each would take ~1s+.
    // With group commit, this should complete much faster.
    // Use a generous threshold to avoid flaky CI, but still catch
    // per-event fsync pathology.
    assert!(
        elapsed.as_secs() < 10,
        "1000 concurrent events took {elapsed:?}; if batching works this should be fast"
    );

    // Verify replay integrity
    let config = uat_config(dir.path());
    let (handle, replayed, _session_events) =
        WalHandle::open(config).expect("reopen should succeed");
    assert_eq!(
        replayed.len(),
        num_threads * events_per_thread,
        "all events should survive replay"
    );
    handle.shutdown().expect("shutdown should succeed");
}

// ---------------------------------------------------------------------------
// UAT-06: Dedup survives across sessions (within dedup window)
//
// Spec: "Duplicate events (same BLAKE3 hash) are silently deduplicated"
//
// After reopening the WAL, events replayed during recovery are populated
// into the dedup window. Resubmitting the same event should still return
// Ok(0).
// ---------------------------------------------------------------------------

#[test]
fn uat_06_dedup_survives_reopen() {
    let dir = tempfile::tempdir().expect("tempdir creation should succeed");

    // `WalHandle::open` takes its config by value, so build a fresh one per
    // session. The dedup window is long enough to span both sessions.
    let make_config = || WalConfig {
        dedup_window: Duration::from_secs(60),
        ..uat_config(dir.path())
    };

    // Session 1: write an event
    let (handle, _, _) = WalHandle::open(make_config()).expect("open should succeed");
    let event = make_event(42);
    let seq = handle.append(event.clone()).expect("append should succeed");
    assert!(seq > 0, "first append should get real seq");
    handle.shutdown().expect("shutdown should succeed");

    // Session 2: reopen and try to write the same event
    let (handle, replayed, _session_events) =
        WalHandle::open(make_config()).expect("reopen should succeed");
    assert_eq!(replayed.len(), 1, "should replay the one event");

    // The same event should be detected as duplicate even after reopen.
    let dup_seq = handle.append(event).expect("dup append should succeed");
    assert_eq!(
        dup_seq, 0,
        "duplicate event after reopen should return seq=0 (dedup), got {dup_seq}"
    );

    handle.shutdown().expect("shutdown should succeed");
}

// ---------------------------------------------------------------------------
// UAT-07: Multiple checkpoint-truncate cycles maintain correctness
//
// This exercises the checkpoint-truncate cycle repeatedly to verify no
// state corruption accumulates over multiple cycles.
// ---------------------------------------------------------------------------

#[test]
fn uat_07_multiple_checkpoint_truncate_cycles() {
    let dir = tempfile::tempdir().expect("tempdir creation should succeed");

    // Simulate the real checkpoint-truncate lifecycle over 5 cycles:
    //
    // 1. Open WAL, replay events (if any)
    // 2. Write new events
    // 3. Checkpoint at the LAST event written
    // 4. Truncate segments before checkpoint (removes old segments)
    // 5. Shutdown
    //
    // The key invariant: after checkpoint + truncate, the _next_ reopen
    // has zero replayed events (all events were checkpointed and the
    // containing segment was truncated). This is CORRECT -- the materializer
    // already consumed them. Replaying them again would be a double-apply bug.
    // The WAL replay condition is strict greater-than (event_seq > checkpoint_seq),
    // so checkpoint events are never replayed, preventing double-apply.
    //
    // Both halves of that invariant are asserted below on every cycle:
    // nothing is replayed, and new sequence numbers keep climbing past the
    // previous checkpoint (a counter that restarted after truncation would
    // hand out seqs <= the checkpoint, which replay would then skip).
    let mut last_checkpoint_seq = 0u64;
    for cycle in 0..5u64 {
        let (handle, replayed, _session_events) =
            WalHandle::open(uat_config(dir.path())).expect("open should succeed");

        // Events from completed cycles are checkpointed and must NOT be replayed.
        assert!(
            replayed.is_empty(),
            "cycle {cycle}: checkpointed events must not be replayed, got {} events",
            replayed.len()
        );

        // Write 50 new events in this cycle.
        let base = cycle * 50 + 1;
        let mut cycle_seqs = Vec::with_capacity(50);
        for i in base..base + 50 {
            let seq = handle.append(make_event(i)).expect("append should succeed");
            assert!(seq > 0, "event should get real seq in cycle {cycle}");
            cycle_seqs.push(seq);
        }

        let first_seq = *cycle_seqs.first().expect("non-empty");
        assert!(
            first_seq > last_checkpoint_seq,
            "cycle {cycle}: first seq {first_seq} should be > previous checkpoint seq {last_checkpoint_seq}"
        );

        // Checkpoint at the last event of this cycle.
        let cp_seq = *cycle_seqs.last().expect("non-empty");
        handle
            .checkpoint(cp_seq)
            .expect("checkpoint should succeed");
        last_checkpoint_seq = cp_seq;

        // Truncate old segments now that checkpoint is durable.
        handle
            .truncate_before(last_checkpoint_seq)
            .expect("truncate should succeed");

        handle.shutdown().expect("shutdown should succeed");
    }

    // Final reopen: verify the WAL is operational after 5 cycles.
    let (handle, replayed, _session_events) =
        WalHandle::open(uat_config(dir.path())).expect("final reopen should succeed");
    assert!(
        replayed.is_empty(),
        "final reopen: checkpointed events must not be replayed, got {} events",
        replayed.len()
    );

    // The WAL should be fully operational: new writes succeed and get
    // sequence numbers higher than the last checkpoint.
    let new_seq = handle
        .append(make_event(9999))
        .expect("append after multi-cycle recovery should succeed");
    assert!(new_seq > 0);
    assert!(
        new_seq > last_checkpoint_seq,
        "new seq {new_seq} should be > last checkpoint seq {last_checkpoint_seq}"
    );

    handle.shutdown().expect("shutdown should succeed");
}