Phase 5C (Index Persistence) implementation: - PersistentVectorIndex with hot/cold architecture - Hot: in-memory HNSW for recent vectors - Cold: memory-mapped HNSW loaded from disk - Background builder for WAL replay and atomic swap - BLAKE3 integrity verification - PersistentVisualIndex with checkpoint persistence - BkTreeSnapshot with rkyv serialization - CRC32C corruption detection - Atomic write pattern (temp → fsync → rename) - Key codec additions for vector index metadata - Split large files into modules (<500 lines each) - battery_pre_sentinel.rs → battery/ directory - visual_index.rs → visual_index/ directory - persistent.rs → persistent/ directory - Refactored ingest worker tests for clarity - Updated roadmap to mark Phase 5 complete Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
258 lines
10 KiB
Rust
//! Battery 7: Materialized View Consistency.
//!
//! Purpose: Aphoria queries MVs for fast conflict checks. Stale or inconsistent
//! MVs produce wrong verdicts. These tests validate MV staleness detection and
//! basic MV integrity.
//!
//! Note: Changelog and `since` queries are defined in the spec but NOT YET
//! IMPLEMENTED in the Materializer. Tests 7.2-7.4 are implemented as stubs
//! documenting the expected behavior. Tests 7.1, 7.5, and 7.6 validate the
//! currently implemented features.
//!
//! # Test Coverage
//!
//! | Test | Feature | Validates |
//! |------|---------|-----------|
//! | `test_mv_initial_materialization` | MV creation | Structure and storage |
//! | `test_mv_winner_changes_on_update` | Changelog | STUB - not implemented |
//! | `test_mv_no_changelog_when_winner_unchanged` | Changelog | STUB - not implemented |
//! | `test_mv_since_query_returns_changelog` | Since query | STUB - not implemented |
//! | `test_mv_max_stale_fast_path` | Staleness | Fresh MV fast path |
//! | `test_mv_max_stale_slow_path` | Staleness | Stale MV slow path |

#![allow(clippy::expect_used)] // Test code uses expect() for clear failure messages

use super::helpers::*;

/// Battery 7.1: Initial materialization creates an MV with the correct structure.
///
/// Validates:
/// - The MV is written to the correct key: `{subject}\x00MV:{predicate}`
/// - The MV contains the winning assertion
/// - The MV confidence matches the assertion's confidence
/// - The `materialized_at` timestamp is set
#[tokio::test]
async fn test_mv_initial_materialization() {
    let tmp_dir = tempdir().expect("create temp dir");
    let db_dir = tmp_dir.path().join("db");
    let store = Arc::new(HybridStore::open(&db_dir).expect("open store"));
    let index_store = GenericIndexStore::new(store.clone());

    let base_ts = 1000;

    // Create assertion A with confidence 0.9
    let assertion_a = create_signed_assertion_with_source(
        "Subject_H",
        "predicate_mv",
        ObjectValue::Text("value_a".to_string()),
        SourceClass::Clinical,
        0.9,
        base_ts,
    );

    // Store assertion directly (bypass WAL for simplicity)
    store_assertion_direct(&store, &index_store, &assertion_a).await;

    // Create materializer with RecencyLens
    let lens: Box<dyn AsyncLens> = Box::new(SyncLensWrapper(RecencyLens));
    let materializer = Materializer::new(store.clone(), lens);

    // Run materialization step
    let report = materializer.step().await.expect("materialization step");
    assert_eq!(report.views_updated, 1, "should update 1 view");

    // Read the MV from storage
    let mv = materializer
        .get_materialized_view("Subject_H", "predicate_mv")
        .await
        .expect("get MV")
        .expect("MV should exist");

    // Validate MV structure
    assert_eq!(mv.winner.subject, "Subject_H", "MV winner subject matches");
    assert_eq!(mv.winner.predicate, "predicate_mv", "MV winner predicate matches");
    assert_eq!(mv.winner.confidence, 0.9, "MV winner confidence matches");
    assert_eq!(mv.lens_name, "Recency", "MV lens name is correct");
    assert_eq!(mv.candidates_count, 1, "MV candidates_count is 1");
    assert!(mv.materialized_at > 0, "MV materialized_at timestamp is set");

    // Verify the MV is at the correct key
    let mv_key = key_codec::mv_key("Subject_H", "predicate_mv");
    let mv_bytes = store.get(&mv_key).await.expect("get MV key");
    assert!(mv_bytes.is_some(), "MV should be stored at correct key");
}
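The MV key layout that Battery 7.1 checks, `{subject}\x00MV:{predicate}`, can be sketched as a byte-level builder. This is a hypothetical re-implementation: `mv_key_sketch` and `subject_prefix` are illustrative names, not part of `key_codec`, and the sketch assumes the key is exactly the UTF-8 subject, one NUL byte, the literal `MV:`, and the UTF-8 predicate. The real `key_codec::mv_key` may encode it differently.

```rust
/// Hypothetical builder for the documented layout `{subject}\x00MV:{predicate}`.
/// Not the crate's `key_codec::mv_key`; an illustration under stated assumptions.
fn mv_key_sketch(subject: &str, predicate: &str) -> Vec<u8> {
    // 1 byte for the NUL separator + 3 bytes for the "MV:" tag
    let mut key = Vec::with_capacity(subject.len() + 4 + predicate.len());
    key.extend_from_slice(subject.as_bytes());
    key.push(0x00); // NUL separates the subject from the record-type tag
    key.extend_from_slice(b"MV:");
    key.extend_from_slice(predicate.as_bytes());
    key
}

/// Hypothetical prefix shared by every key belonging to `subject`,
/// assuming subjects never contain a NUL byte.
fn subject_prefix(subject: &str) -> Vec<u8> {
    let mut prefix = subject.as_bytes().to_vec();
    prefix.push(0x00);
    prefix
}
```

The NUL separator is what stops a prefix scan for `Subject_H` from also matching keys of a subject such as `Subject_HX`.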

/// Battery 7.2: Winner changes on update (STUB - changelog not implemented).
///
/// Expected behavior:
/// - Ingest A (confidence 0.9), materialize
/// - Ingest B (same S/P, confidence 0.95), materialize again
/// - MV winner changes to B
/// - Changelog has 2 entries: initial (winner=A), update (previous=A, new=B)
///
/// Current status: MaterializedView does not track a changelog yet.
/// `ChangeEntry` is defined, but the Materializer doesn't write it.
#[tokio::test]
#[ignore = "Changelog not yet implemented - see ChangeEntry in stemedb-core"]
async fn test_mv_winner_changes_on_update() {
    // TODO: Implement when Materializer writes ChangeEntry records
    // Expected key pattern: MVC:{subject}:{predicate}:{timestamp}
    panic!("Not yet implemented: Materializer needs to write ChangeEntry records");
}
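The changelog key pattern `MVC:{subject}:{predicate}:{timestamp}` named in the Battery 7.2 TODO could be built as follows. This is a sketch under assumptions: `changelog_key` and `changelog_prefix` are hypothetical helpers, and zero-padding the timestamp to 20 digits (the width of `u64::MAX`) is an assumed choice that makes lexicographic key order in the store match chronological order.

```rust
/// Hypothetical key builder for `MVC:{subject}:{predicate}:{timestamp}`.
/// The 20-digit zero padding is an assumption so byte order == time order.
fn changelog_key(subject: &str, predicate: &str, timestamp: u64) -> String {
    format!("MVC:{subject}:{predicate}:{timestamp:020}")
}

/// Hypothetical prefix for scanning every changelog entry of one subject/predicate pair.
fn changelog_prefix(subject: &str, predicate: &str) -> String {
    format!("MVC:{subject}:{predicate}:")
}
```

Without the padding, a key ending in `:999` would sort after one ending in `:1001`. Because the pattern uses `:` as its separator, a subject or predicate containing `:` would make keys ambiguous; the MV key avoids this by using a NUL separator.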

/// Battery 7.3: No changelog when winner unchanged (STUB - changelog not implemented).
///
/// Expected behavior:
/// - Ingest A (confidence 0.9), materialize
/// - Ingest B (same S/P, confidence 0.5), materialize again
/// - MV winner stays A (B has lower confidence)
/// - No new changelog entry after the second materialization
///
/// Current status: MaterializedView does not track a changelog yet.
#[tokio::test]
#[ignore = "Changelog not yet implemented - see ChangeEntry in stemedb-core"]
async fn test_mv_no_changelog_when_winner_unchanged() {
    // TODO: Implement when Materializer writes ChangeEntry records
    panic!("Not yet implemented: Materializer needs to write ChangeEntry records");
}
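Batteries 7.2 and 7.3 together pin down one rule: a changelog entry is written when the materialized winner changes and never when it stays the same. A minimal sketch of that rule, assuming winners are identified by a string ID and the changelog is an in-memory `Vec`. `ChangeEntrySketch` and `record_if_changed` are hypothetical and are not the `ChangeEntry` type in stemedb-core.

```rust
/// Hypothetical changelog record; `previous` is None for the first materialization.
#[derive(Debug, Clone, PartialEq)]
struct ChangeEntrySketch {
    timestamp: u64,
    previous: Option<String>,
    new: String,
}

/// Appends an entry only if the winner differs from the one currently in the MV.
/// `current` is the MV winner before this step (None if no MV exists yet).
/// Returns true when an entry was appended.
fn record_if_changed(
    log: &mut Vec<ChangeEntrySketch>,
    current: Option<&str>,
    new_winner: &str,
    timestamp: u64,
) -> bool {
    if current == Some(new_winner) {
        return false; // winner unchanged: no changelog entry (Battery 7.3)
    }
    log.push(ChangeEntrySketch {
        timestamp,
        previous: current.map(str::to_owned),
        new: new_winner.to_owned(),
    });
    true
}
```

Replaying the two scenarios gives one entry for the initial materialization (`previous = None`), nothing when A stays the winner, and an entry with `previous = A, new = B` once B takes over.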

/// Battery 7.4: Since query returns changelog (STUB - since query not implemented).
///
/// Expected behavior:
/// - Ingest A at T=1000, materialize at T=1001
/// - Ingest B at T=2000, materialize at T=2001
/// - Query with `since: 1500` returns only changelog entries after T=1500
/// - The result includes the entry for B's materialization but not A's
///
/// Current status: the Query struct has no `since` field yet.
#[tokio::test]
#[ignore = "Since query not yet implemented - Query struct needs `since` field"]
async fn test_mv_since_query_returns_changelog() {
    // TODO: Add `since` field to Query struct
    // TODO: Implement changelog query API in QueryEngine or Materializer
    panic!("Not yet implemented: Query.since field and changelog query API");
}
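The `since` semantics in Battery 7.4 reduce to a filter over a timestamp-ordered changelog. The sketch assumes entries are already sorted by timestamp and reads "after T=1500" as strictly greater than 1500; `ChangeEntrySketch` and `entries_since` are hypothetical. `slice::partition_point` binary-searches for the first entry past `since`.

```rust
/// Hypothetical changelog record: when the MV was materialized and which winner it chose.
#[derive(Debug, Clone, PartialEq)]
struct ChangeEntrySketch {
    timestamp: u64,
    winner: String,
}

/// Returns the entries with `timestamp > since`.
/// Assumes `log` is sorted by timestamp (as an ordered key scan would yield it).
fn entries_since(log: &[ChangeEntrySketch], since: u64) -> &[ChangeEntrySketch] {
    // Index of the first entry whose timestamp is strictly after `since`
    let start = log.partition_point(|e| e.timestamp <= since);
    &log[start..]
}
```

With the 7.4 scenario (materializations at T=1001 and T=2001) and `since: 1500`, only the entry for B is returned.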

/// Battery 7.5: max_stale fast path when MV is fresh.
///
/// Validates:
/// - With `max_stale: 60`, a freshly materialized MV is accepted
/// - The query returns the MV winner, consistent with the fast path (MV lookup)
///   being used; which path ran is not directly observable
#[tokio::test]
async fn test_mv_max_stale_fast_path() {
    let tmp_dir = tempdir().expect("create temp dir");
    let db_dir = tmp_dir.path().join("db");
    let store = Arc::new(HybridStore::open(&db_dir).expect("open store"));
    let index_store = GenericIndexStore::new(store.clone());

    let base_ts = 1000;

    // Create assertion A
    let assertion_a = create_signed_assertion_with_source(
        "Subject_I",
        "predicate_fresh",
        ObjectValue::Text("fresh_value".to_string()),
        SourceClass::Clinical,
        0.9,
        base_ts,
    );

    store_assertion_direct(&store, &index_store, &assertion_a).await;

    // Materialize
    let lens: Box<dyn AsyncLens> = Box::new(SyncLensWrapper(RecencyLens));
    let materializer = Materializer::new(store.clone(), lens);
    materializer.step().await.expect("materialization step");

    // Query immediately with max_stale: 60 (MV is fresh)
    let query =
        Query::builder().subject("Subject_I").predicate("predicate_fresh").max_stale(60).build();

    let engine = QueryEngine::new(store.clone());
    let result = engine.execute(&query).await.expect("execute query");

    // Verify the result matches the MV winner (expected to come from the fast path)
    assert_eq!(result.assertions.len(), 1, "fast path should return 1 result");
    assert_eq!(result.assertions[0].confidence, 0.9, "result confidence matches MV");

    // The MV should be used because it is fresh (materialized just now).
    // Note: We can't directly observe "fast path vs slow path" without instrumentation,
    // but we verify the result is consistent with the MV being used.
}
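The freshness decision behind Battery 7.5 amounts to comparing the MV's age with `max_stale`. The sketch assumes `materialized_at` is in seconds since the Unix epoch and that an MV counts as fresh only when its age is strictly below `max_stale`; `is_fresh` and `now_secs` are hypothetical helpers, not QueryEngine's actual code.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Current time in whole seconds since the Unix epoch (0 if the clock is before 1970).
fn now_secs() -> u64 {
    SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_secs()).unwrap_or(0)
}

/// Hypothetical freshness check: fresh iff age < max_stale (both in seconds).
/// `saturating_sub` treats an MV stamped in the future (clock skew) as age 0
/// instead of underflowing.
fn is_fresh(materialized_at: u64, now: u64, max_stale: u64) -> bool {
    now.saturating_sub(materialized_at) < max_stale
}
```

In the 7.5 scenario the query runs immediately after materialization, so the age is a second or two and `max_stale: 60` accepts the MV.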

/// Battery 7.6: max_stale slow path when MV is stale.
///
/// Validates:
/// - When the MV is older than the `max_stale` threshold, the slow path is used
/// - The query recomputes from candidate assertions instead of using the MV
///
/// Note: This test relies on `Query.max_stale` being enforced by QueryEngine.
/// Current implementation: `max_stale` exists in Query but may not be enforced yet.
#[tokio::test]
async fn test_mv_max_stale_slow_path() {
    let tmp_dir = tempdir().expect("create temp dir");
    let db_dir = tmp_dir.path().join("db");
    let store = Arc::new(HybridStore::open(&db_dir).expect("open store"));
    let index_store = GenericIndexStore::new(store.clone());

    let base_ts = 1000;

    // Create assertion A
    let assertion_a = create_signed_assertion_with_source(
        "Subject_J",
        "predicate_stale",
        ObjectValue::Text("stale_value".to_string()),
        SourceClass::Clinical,
        0.9,
        base_ts,
    );

    store_assertion_direct(&store, &index_store, &assertion_a).await;

    // Materialize
    let lens: Box<dyn AsyncLens> = Box::new(SyncLensWrapper(RecencyLens));
    let materializer = Materializer::new(store.clone(), lens);
    materializer.step().await.expect("materialization step");

    // Get the MV to check its timestamp
    let mv = materializer
        .get_materialized_view("Subject_J", "predicate_stale")
        .await
        .expect("get MV")
        .expect("MV should exist");

    let mv_age_seconds =
        std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).expect("time").as_secs()
            - mv.materialized_at;

    // Query with max_stale: 0 (forces the slow path)
    let query = Query::builder()
        .subject("Subject_J")
        .predicate("predicate_stale")
        .max_stale(0) // Force slow path
        .build();

    let engine = QueryEngine::new(store.clone());
    let result = engine.execute(&query).await.expect("execute query");

    // Verify we still get the correct result (slow path recomputes from the index)
    assert_eq!(result.assertions.len(), 1, "slow path should return 1 result");
    assert_eq!(result.assertions[0].confidence, 0.9, "result confidence matches assertion");

    // Staleness check behavior:
    // With max_stale: 0, even a freshly materialized MV should be considered stale,
    // so the slow path should be used. However, we can't directly observe which
    // path was taken without instrumentation/metrics.
    //
    // This test therefore validates that the query returns correct results
    // regardless of whether the fast or slow path is used.

    // Additional validation: verify the MV age is reasonable
    assert!(
        mv_age_seconds < 5,
        "MV should be very fresh (materialized seconds ago), got {} seconds",
        mv_age_seconds
    );
}
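Battery 7.6 can only infer which path ran. A sketch of the path selection it expects, assuming `materialized_at` is in seconds and an MV is fresh only when its age is strictly below `max_stale`. `QueryPath` and `select_path` are hypothetical and do not describe QueryEngine's real internals.

```rust
/// Hypothetical label for the path a query took.
#[derive(Debug, PartialEq)]
enum QueryPath {
    Fast, // served from the materialized view
    Slow, // recomputed from candidate assertions
}

/// Hypothetical path selection:
/// - no MV stored            -> Slow
/// - MV age < max_stale      -> Fast
/// - otherwise (stale MV)    -> Slow
fn select_path(materialized_at: Option<u64>, now: u64, max_stale: u64) -> QueryPath {
    match materialized_at {
        // saturating_sub: an MV stamped in the future counts as age 0
        Some(ts) if now.saturating_sub(ts) < max_stale => QueryPath::Fast,
        _ => QueryPath::Slow,
    }
}
```

Under the strict comparison, `max_stale = 0` can never be satisfied, which is what 7.6 relies on to force the slow path. Returning the chosen path alongside results (or counting it in a metric) is one way to add the instrumentation these tests mention, so they could assert the path directly instead of inferring it.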