stemedb/crates/stemedb-ingest/src/worker/tests/epoch_cascade.rs
jordan 42d4e09508 feat: Index persistence (Phase 5C) - vector hot/cold, visual checkpoint
Phase 5C (Index Persistence) implementation:
- PersistentVectorIndex with hot/cold architecture
  - Hot: in-memory HNSW for recent vectors
  - Cold: memory-mapped HNSW loaded from disk
  - Background builder for WAL replay and atomic swap
  - BLAKE3 integrity verification
- PersistentVisualIndex with checkpoint persistence
  - BkTreeSnapshot with rkyv serialization
  - CRC32C corruption detection
  - Atomic write pattern (temp → fsync → rename)
- Key codec additions for vector index metadata
- Split large files into modules (<500 lines each)
  - battery_pre_sentinel.rs → battery/ directory
  - visual_index.rs → visual_index/ directory
  - persistent.rs → persistent/ directory
- Refactored ingest worker tests for clarity
- Updated roadmap to mark Phase 5 complete

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-02 15:43:18 -07:00

193 lines
7.5 KiB
Rust

//! Epoch supersession cascade tests.
//!
//! Tests for supersession marker writing, transitive cascade,
//! and cycle detection in epoch chains.
use super::*;
use stemedb_storage::key_codec;
/// Test: a superseding epoch leaves a SUPERSEDED marker behind for its predecessor.
///
/// Setup: epoch A has no predecessor; epoch B declares that it supersedes A.
/// Verify: the `SUPERSEDED:A` key holds B's ID, and both epoch records are stored.
#[tokio::test]
async fn test_cascade_writes_superseded_marker() {
    const ID_A: [u8; 32] = [1u8; 32];
    const ID_B: [u8; 32] = [2u8; 32];

    // Fixtures: A is a root epoch, B temporally supersedes A.
    let older = stemedb_core::types::Epoch {
        id: ID_A,
        name: "Epoch A".to_string(),
        supersedes: None,
        supersession_type: None,
        start_timestamp: 1000,
        end_timestamp: None,
    };
    let newer = testing::test_epoch_with_supersession(
        ID_B,
        "Epoch B",
        ID_A,
        stemedb_core::types::SupersessionType::Temporal,
    );

    // Fresh WAL and store under a temp directory that lives until the test returns.
    let tmp = tempdir().expect("Failed to create temp dir");
    let mut wal = Journal::open(&tmp.path().join("wal")).expect("Failed to open journal");
    let db = HybridStore::open(&tmp.path().join("db")).expect("Failed to open store");

    // Queue the records in supersession order: A, then B.
    wal.append(serialize_epoch(&older).expect("ser")).expect("append");
    wal.append(serialize_epoch(&newer).expect("ser")).expect("append");

    let db = Arc::new(db);
    let mut ingest = IngestWorker::new(Arc::new(Mutex::new(wal)), Arc::clone(&db))
        .await
        .expect("Failed to create worker");

    // Keep stepping until a step reports that nothing was processed.
    while ingest.step().await.expect("step") > 0 {}

    // The marker for A must exist and carry B's ID.
    let superseded_a = db
        .get(&key_codec::superseded_key(&hex::encode(ID_A)))
        .await
        .expect("get")
        .expect("marker should exist");
    assert_eq!(superseded_a.as_slice(), &ID_B, "SUPERSEDED marker should point to epoch B");

    // Both epoch records must have been persisted as well.
    let stored_a = db.get(&key_codec::epoch_key(&hex::encode(ID_A))).await.expect("get");
    assert!(stored_a.is_some(), "Epoch A should be stored");
    let stored_b = db.get(&key_codec::epoch_key(&hex::encode(ID_B))).await.expect("get");
    assert!(stored_b.is_some(), "Epoch B should be stored");
}
/// Test: supersession cascades transitively through the whole ancestor chain.
///
/// Setup: a three-link chain where C supersedes B and B supersedes A.
/// Verify: `SUPERSEDED:A` and `SUPERSEDED:B` both hold C's ID, and C itself has no marker.
#[tokio::test]
async fn test_cascade_transitive() {
    const ID_A: [u8; 32] = [1u8; 32];
    const ID_B: [u8; 32] = [2u8; 32];
    const ID_C: [u8; 32] = [3u8; 32];

    // Fixtures for the chain C -> B -> A (each arrow reads "supersedes").
    let root = stemedb_core::types::Epoch {
        id: ID_A,
        name: "Epoch A".to_string(),
        supersedes: None,
        supersession_type: None,
        start_timestamp: 1000,
        end_timestamp: None,
    };
    let middle = testing::test_epoch_with_supersession(
        ID_B,
        "Epoch B",
        ID_A,
        stemedb_core::types::SupersessionType::Temporal,
    );
    let head = testing::test_epoch_with_supersession(
        ID_C,
        "Epoch C",
        ID_B,
        stemedb_core::types::SupersessionType::Temporal,
    );

    // Fresh WAL and store under a temp directory that lives until the test returns.
    let tmp = tempdir().expect("Failed to create temp dir");
    let mut wal = Journal::open(&tmp.path().join("wal")).expect("Failed to open journal");
    let db = HybridStore::open(&tmp.path().join("db")).expect("Failed to open store");

    // Queue oldest first: A, then B, then C.
    wal.append(serialize_epoch(&root).expect("ser")).expect("append");
    wal.append(serialize_epoch(&middle).expect("ser")).expect("append");
    wal.append(serialize_epoch(&head).expect("ser")).expect("append");

    let db = Arc::new(db);
    let mut ingest = IngestWorker::new(Arc::new(Mutex::new(wal)), Arc::clone(&db))
        .await
        .expect("Failed to create worker");

    // Keep stepping until a step reports that nothing was processed.
    while ingest.step().await.expect("step") > 0 {}

    // Once C has been ingested, the markers for A and B must both be present.
    let superseded_a = db
        .get(&key_codec::superseded_key(&hex::encode(ID_A)))
        .await
        .expect("get")
        .expect("SUPERSEDED:A should exist");
    let superseded_b = db
        .get(&key_codec::superseded_key(&hex::encode(ID_B)))
        .await
        .expect("get")
        .expect("SUPERSEDED:B should exist");

    // A is only superseded by C indirectly (via B), yet its marker must name C,
    // i.e. the most recent superseder rather than the immediate one.
    assert_eq!(superseded_a.as_slice(), &ID_C, "SUPERSEDED:A should point to C (the latest)");
    assert_eq!(superseded_b.as_slice(), &ID_C, "SUPERSEDED:B should point to C");

    // C is the head of the chain, so nothing may mark it as superseded.
    let superseded_c = db
        .get(&key_codec::superseded_key(&hex::encode(ID_C)))
        .await
        .expect("get");
    assert!(superseded_c.is_none(), "C should not have a SUPERSEDED marker");
}
/// Test: Cycle in supersession chain is handled gracefully.
///
/// Setup: epoch A declares that it supersedes B while B declares that it supersedes A.
/// Verify: ingestion finishes within a fixed time budget, both epochs are stored,
/// and at least one SUPERSEDED marker exists.
#[tokio::test]
async fn test_cascade_cycle_detection() {
    let dir = tempdir().expect("Failed to create temp dir");
    let wal_dir = dir.path().join("wal");
    let db_dir = dir.path().join("db");
    let mut journal = Journal::open(&wal_dir).expect("Failed to open journal");
    let store = HybridStore::open(&db_dir).expect("Failed to open store");

    // Create a cycle: A supersedes B, B supersedes A.
    // This is pathological but we must not hang.
    let epoch_a = stemedb_core::types::Epoch {
        id: [1u8; 32],
        name: "Epoch A".to_string(),
        supersedes: Some([2u8; 32]), // A supersedes B
        supersession_type: Some(stemedb_core::types::SupersessionType::Temporal),
        start_timestamp: 1000,
        end_timestamp: None,
    };
    let epoch_b = stemedb_core::types::Epoch {
        id: [2u8; 32],
        name: "Epoch B".to_string(),
        supersedes: Some([1u8; 32]), // B supersedes A (cycle!)
        supersession_type: Some(stemedb_core::types::SupersessionType::Temporal),
        start_timestamp: 2000,
        end_timestamp: None,
    };

    // Ingest both
    journal.append(serialize_epoch(&epoch_a).expect("ser")).expect("append");
    journal.append(serialize_epoch(&epoch_b).expect("ser")).expect("append");
    let journal = Arc::new(Mutex::new(journal));
    let store = Arc::new(store);
    let mut worker =
        IngestWorker::new(journal, store.clone()).await.expect("Failed to create worker");

    // This should NOT hang - cycle detection should kick in.
    // The drain loop is bounded by a timeout so that a regression surfaces as a
    // test failure instead of a stalled test run.
    // NOTE(review): the timeout can only fire if the worker yields to the runtime
    // while it is stuck; a purely synchronous spin inside `step` would still
    // stall - confirm against the worker implementation.
    tokio::time::timeout(std::time::Duration::from_secs(10), async {
        while worker.step().await.expect("step") > 0 {}
    })
    .await
    .expect("ingest did not finish in time; supersession cycle detection may be broken");

    // Verify both epochs are stored (the cycle doesn't break storage)
    let epoch_a_key = key_codec::epoch_key(&hex::encode([1u8; 32]));
    let epoch_b_key = key_codec::epoch_key(&hex::encode([2u8; 32]));
    assert!(store.get(&epoch_a_key).await.expect("get").is_some(), "Epoch A should be stored");
    assert!(store.get(&epoch_b_key).await.expect("get").is_some(), "Epoch B should be stored");

    // At least one SUPERSEDED marker should exist. The test deliberately does not
    // pin which of the two epochs ends up marked.
    let marker_a =
        store.get(&key_codec::superseded_key(&hex::encode([1u8; 32]))).await.expect("get");
    let marker_b =
        store.get(&key_codec::superseded_key(&hex::encode([2u8; 32]))).await.expect("get");
    assert!(
        marker_a.is_some() || marker_b.is_some(),
        "At least one SUPERSEDED marker should exist"
    );
}