# Task 07: Crash Fencing for M6 State

## Delivers

Property tests proving crash recovery correctness for all M6 state surfaces: `CohortSignalLedger`, `CollectionIndex`, `CoEngagementIndex`, and active sessions. Each state surface is tested against its relevant crash points with random event sequences. After a simulated crash and reopen, every invariant must hold.

## Complexity: L

## Dependencies

- Task 01 (CrashPoint enum + fault injection hooks)

## Technical Design

### 1. CohortSignalLedger crash recovery tests

The cohort ledger checkpoints atomically via `WriteBatch`. If the checkpoint is interrupted, the previous checkpoint (or empty state) is the restore point. The cohort ledger is best-effort (it does not go through the WAL), so signals written to the cohort ledger but not yet checkpointed are lost on crash. This is by design: the acceptance criterion is that the restored state is consistent (no corrupt entries), not that every cohort signal survives.

```rust
// tidal/tests/m7_crash_m6.rs

use std::collections::HashMap;
use std::time::Duration;

use tidaldb::cohort::CohortSignalLedger;
use tidaldb::schema::{DecaySpec, EntityId, EntityKind, SchemaBuilder, Timestamp, Window};
use tidaldb::signals::checkpoint::CheckpointMeta;
use tidaldb::signals::SignalTypeId;
use tidaldb::storage::InMemoryBackend;
// Later sections also use `TidalDb`, `Visibility`, and `CoEngagementIndex`, plus the
// `test_schema_with_*` / `test_schema_full` helpers; their import paths and
// definitions are not specified in this task.

fn test_schema() -> tidaldb::schema::Schema {
    let mut builder = SchemaBuilder::new();
    let _ = builder
        .signal(
            "view",
            EntityKind::Item,
            DecaySpec::Exponential { half_life: Duration::from_secs(7 * 24 * 3600) },
        )
        .windows(&[Window::AllTime])
        .velocity(false)
        .add();
    builder.build().unwrap()
}

#[test]
fn cohort_ledger_checkpoint_restore_roundtrip() {
    let schema = test_schema();
    let ledger = CohortSignalLedger::new(&schema);
    let type_id = ledger.resolve_signal_type("view").unwrap();
    let ts_ns = Timestamp::now().as_nanos();

    // Record signals across multiple cohorts.
    for i in 1..=100u64 {
        ledger.record("cohort_a", EntityId::new(i), type_id, 1.0, ts_ns);
        if i % 3 == 0 {
            ledger.record("cohort_b", EntityId::new(i), type_id, 2.0, ts_ns);
        }
    }

    let storage = InMemoryBackend::new();
    let meta = CheckpointMeta {
        checkpoint_time_ns: ts_ns,
        wal_sequence: 42,
        payload_hash: [0u8; 32], // cohort uses same meta but no BLAKE3 yet
    };
    ledger.checkpoint(&storage, meta).unwrap();

    // Restore into a fresh ledger.
    let ledger2 = CohortSignalLedger::new(&schema);
    let restored_meta = ledger2.restore(&storage).unwrap().unwrap();
    assert_eq!(restored_meta.wal_sequence, 42);
    assert_eq!(ledger2.entry_count(), ledger.entry_count());

    // Verify specific counts.
    let count_a = ledger2
        .read_windowed_count("cohort_a", EntityId::new(1), "view", Window::AllTime)
        .unwrap();
    assert_eq!(count_a, 1);

    // One signal of weight 2.0: the windowed count is 1, not 2.
    let count_b = ledger2
        .read_windowed_count("cohort_b", EntityId::new(3), "view", Window::AllTime)
        .unwrap();
    assert_eq!(count_b, 1);

    // A cohort not in the ledger returns 0.
    let count_c = ledger2
        .read_windowed_count("cohort_c", EntityId::new(1), "view", Window::AllTime)
        .unwrap();
    assert_eq!(count_c, 0);
}
```

### 2. CohortSignalLedger crash during checkpoint

The property test below simulates a crash after a completed checkpoint: signals recorded after the checkpoint are discarded with the live ledger, and restore must return exactly the checkpointed state. Interrupting the `WriteBatch` itself mid-write is the job of the Task 01 fault injection hooks; the same restore assertions apply once a crash point fires inside `checkpoint`.

```rust
use proptest::prelude::*;

proptest! {
    #![proptest_config(ProptestConfig::with_cases(500))]

    #[test]
    fn cohort_ledger_survives_checkpoint_crash(
        cohort_count in 1usize..5,
        entities_per_cohort in 1usize..20,
        signals_per_entity in 1usize..10,
    ) {
        let schema = test_schema();
        let storage = InMemoryBackend::new();
        let base_ns = 1_000_000_000_000u64;

        // Phase 1: Write signals and do a clean checkpoint.
        let ledger = CohortSignalLedger::new(&schema);
        // Resolve by name rather than hard-coding an ID, matching test 1.
        let type_id: SignalTypeId = ledger.resolve_signal_type("view").unwrap();
        for c in 0..cohort_count {
            let cohort_name = format!("cohort_{c}");
            for e in 1..=entities_per_cohort as u64 {
                for s in 0..signals_per_entity {
                    let ts_ns = base_ns + (s as u64) * 1_000_000;
                    ledger.record(&cohort_name, EntityId::new(e), type_id, 1.0, ts_ns);
                }
            }
        }
        let meta = CheckpointMeta {
            checkpoint_time_ns: base_ns,
            wal_sequence: 100,
            payload_hash: [0u8; 32],
        };
        ledger.checkpoint(&storage, meta).unwrap();
        let checkpoint_entry_count = ledger.entry_count();

        // Phase 2: Write more signals (these will be lost on crash).
        for e in 1..=3u64 {
            ledger.record("cohort_0", EntityId::new(e + 1000), type_id, 1.0, base_ns);
        }
        prop_assert!(ledger.entry_count() > checkpoint_entry_count);

        // Phase 3: Simulate a crash by dropping the live ledger without
        // checkpointing the new signals, then restore from the checkpoint.
        drop(ledger);
        let ledger2 = CohortSignalLedger::new(&schema);
        let restored_meta = ledger2.restore(&storage).unwrap().unwrap();
        prop_assert_eq!(restored_meta.wal_sequence, 100);

        // Restored entry count must match the CHECKPOINT entry count,
        // not the live entry count (post-checkpoint signals are lost).
        prop_assert_eq!(ledger2.entry_count(), checkpoint_entry_count);
    }
}
```

### 3. CollectionIndex crash recovery

Collections are persisted to fjall via `Tag::Collection` keys. The `rebuild_collections` function scans these keys on open. Test that after a crash during `add_to_collection`, the collection state on reopen is consistent: either the item is in the collection or it is not, with no partial state. The tests below establish the clean-restart baseline; the crash-during-add variant reuses their reopen-and-verify assertions with a Task 01 crash point injected.

```rust
#[test]
fn collection_persists_across_restart() {
    let dir = tempfile::tempdir().unwrap();
    let schema = test_schema_with_collections(); // schema with text fields for items

    // Phase 1: Create collection, add items, close.
    {
        let db = TidalDb::builder()
            .with_data_dir(dir.path())
            .with_schema(schema.clone())
            .open()
            .unwrap();

        // Write items first (collections reference items).
        for i in 1..=10u64 {
            db.write_item(EntityId::new(i), &HashMap::new()).unwrap();
        }

        let cid = db.create_collection(42, "favorites", Visibility::Private).unwrap();
        for i in 1..=5u64 {
            db.add_to_collection(cid, EntityId::new(i)).unwrap();
        }

        db.close().unwrap();
    }

    // Phase 2: Reopen and verify collection state.
    {
        let db = TidalDb::builder()
            .with_data_dir(dir.path())
            .with_schema(schema)
            .open()
            .unwrap();

        let collections = db.list_collections(42).unwrap();
        assert_eq!(collections.len(), 1);
        assert_eq!(collections[0].name, "favorites");
        assert_eq!(collections[0].item_count, 5);

        db.close().unwrap();
    }
}

proptest! {
    #![proptest_config(ProptestConfig::with_cases(200))]

    #[test]
    fn collection_items_survive_restart(
        item_count in 1usize..50,
        items_in_collection in 1usize..50,
    ) {
        let n_items = item_count;
        let n_in_coll = items_in_collection.min(n_items);
        let dir = tempfile::tempdir().unwrap();
        let schema = test_schema_with_collections();

        {
            let db = TidalDb::builder()
                .with_data_dir(dir.path())
                .with_schema(schema.clone())
                .open()
                .unwrap();

            for i in 1..=n_items as u64 {
                db.write_item(EntityId::new(i), &HashMap::new()).unwrap();
            }
            let cid = db.create_collection(1, "test", Visibility::Public).unwrap();
            for i in 1..=n_in_coll as u64 {
                db.add_to_collection(cid, EntityId::new(i)).unwrap();
            }
            db.close().unwrap();
        }

        {
            let db = TidalDb::builder()
                .with_data_dir(dir.path())
                .with_schema(schema)
                .open()
                .unwrap();

            let collections = db.list_collections(1).unwrap();
            prop_assert_eq!(collections.len(), 1);
            prop_assert_eq!(collections[0].item_count, n_in_coll);
            db.close().unwrap();
        }
    }
}
```

### 4. CoEngagementIndex crash recovery

The co-engagement index checkpoints during shutdown. Test that edges survive a clean restart and that checkpoint/restore preserves the edge count exactly. Because capacity eviction runs on `record_positive` and not on restore, a restored index satisfies the weight-based eviction invariant (`edge_count <= capacity`) exactly when the checkpointed index did.
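A toy model makes the interaction between eviction and restore concrete. The sketch below is hypothetical and is not TidalDB's `CoEngagementIndex`: `ToyCoEngagement` is an illustrative name, "weight-based eviction" is assumed to mean dropping the lowest-weight edge whenever `record_positive` pushes the index over capacity, and restore is assumed to reinsert checkpointed edges verbatim. Under those assumptions the capacity bound is enforced only on the write path, and a restored index reproduces the checkpointed edge set exactly.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for `CoEngagementIndex` (not TidalDB code): a
/// capacity-bounded map of (item_a, item_b) -> weight.
struct ToyCoEngagement {
    capacity: usize,
    edges: HashMap<(u64, u64), f32>,
}

impl ToyCoEngagement {
    fn with_capacity(capacity: usize) -> Self {
        Self { capacity, edges: HashMap::new() }
    }

    /// Write path: accumulate weight, then evict the lowest-weight edge while
    /// over capacity (the assumed eviction policy).
    fn record_positive(&mut self, a: u64, b: u64, w: f32) {
        *self.edges.entry((a, b)).or_insert(0.0) += w;
        while self.edges.len() > self.capacity {
            let victim = *self
                .edges
                .iter()
                .min_by(|x, y| x.1.total_cmp(y.1))
                .map(|(k, _)| k)
                .unwrap();
            self.edges.remove(&victim);
        }
    }

    /// Checkpoint: serialize every edge, sorted so the payload is deterministic.
    fn checkpoint(&self) -> Vec<(u64, u64, f32)> {
        let mut out: Vec<_> = self.edges.iter().map(|(&(a, b), &w)| (a, b, w)).collect();
        out.sort_by(|x, y| (x.0, x.1).cmp(&(y.0, y.1)));
        out
    }

    /// Restore: reinsert edges verbatim. No eviction runs here, so the
    /// restored edge count equals the checkpointed edge count.
    fn restore(&mut self, payload: &[(u64, u64, f32)]) {
        self.edges = payload.iter().map(|&(a, b, w)| ((a, b), w)).collect();
    }

    fn edge_count(&self) -> usize {
        self.edges.len()
    }
}

fn main() {
    let mut live = ToyCoEngagement::with_capacity(50);
    for i in 0..80u64 {
        live.record_positive(i, i + 1, i as f32 + 0.5);
    }
    assert!(live.edge_count() <= 50); // bound enforced on the write path

    let payload = live.checkpoint();
    let mut restored = ToyCoEngagement::with_capacity(50);
    restored.restore(&payload);

    assert_eq!(restored.edge_count(), live.edge_count());
    assert_eq!(restored.checkpoint(), payload);
    println!("edges after restore: {}", restored.edge_count());
}
```

In this model, a checkpoint taken while the index is within capacity always restores within capacity, so comparing edge counts before and after restore is enough to check the invariant.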
```rust
#[test]
fn co_engagement_survives_restart() {
    let dir = tempfile::tempdir().unwrap();
    let schema = test_schema_with_entities();

    {
        let db = TidalDb::builder()
            .with_data_dir(dir.path())
            .with_schema(schema.clone())
            .open()
            .unwrap();

        // Write items and signals that trigger co-engagement.
        for i in 1..=20u64 {
            db.write_item(EntityId::new(i), &HashMap::new()).unwrap();
        }

        // Simulate engagement: items 1-10 are liked in sequence. `db.signal`
        // carries no user ID, so this relies on co-engagement edges being
        // derivable from these signals alone; if edges require an actor,
        // issue the likes through a user- or session-scoped API instead.
        for i in 1..=10u64 {
            let ts = Timestamp::from_nanos(1_000_000_000_000 + i * 1_000_000);
            db.signal("like", EntityId::new(i), 1.0, ts).unwrap();
        }

        let edge_count_before = db.co_engagement().edge_count();
        assert!(edge_count_before > 0, "co-engagement edges should exist");

        db.close().unwrap();
    }

    {
        let db = TidalDb::builder()
            .with_data_dir(dir.path())
            .with_schema(schema)
            .open()
            .unwrap();

        let edge_count_after = db.co_engagement().edge_count();
        // Edges should survive the restart (checkpoint in shutdown).
        assert!(edge_count_after > 0, "co-engagement edges should survive restart");

        db.close().unwrap();
    }
}

proptest! {
    #![proptest_config(ProptestConfig::with_cases(200))]

    #[test]
    fn co_engagement_capacity_invariant_after_restart(
        edge_count in 1usize..100,
    ) {
        let index = CoEngagementIndex::with_capacity(50);
        let storage = InMemoryBackend::new();

        // Insert up to 99 distinct (i, i + 1) edges directly.
        for i in 0..edge_count {
            index.insert_edge(i as u64, (i + 1) as u64, (i as f32) + 0.5);
        }
        let checkpointed = index.edge_count();

        // Checkpoint.
        index.checkpoint(&storage).unwrap();

        // Restore into a fresh index with the same capacity.
        let index2 = CoEngagementIndex::with_capacity(50);
        index2.restore(&storage).unwrap();

        // Capacity eviction happens on record_positive, not on restore, so
        // restore must reproduce exactly the checkpointed edge count.
        prop_assert_eq!(index2.edge_count(), checkpointed);
    }
}
```

### 5. Session crash recovery

Sessions are journaled to `sessions.log` via the WAL writer thread.
If a session-start record is written but no session-close record follows, the session should be restored as active on reopen. Test this with the existing `session_durability` test pattern. The tests below approximate a crash from the session's point of view: the database shuts down with `close()`, but the session itself is never closed, so the journal holds a session-start with no matching session-close.

```rust
#[test]
fn active_session_restored_after_crash() {
    let dir = tempfile::tempdir().unwrap();
    let schema = test_schema_with_sessions();

    {
        let db = TidalDb::builder()
            .with_data_dir(dir.path())
            .with_schema(schema.clone())
            .open()
            .unwrap();

        // Start a session but do NOT close it (simulating a crash).
        let handle = db.start_session(42, "test-agent", "default").unwrap();

        // Write some session signals.
        handle.signal("view", EntityId::new(1), 1.0, Timestamp::now()).unwrap();
        handle.signal("view", EntityId::new(2), 1.0, Timestamp::now()).unwrap();

        // Graceful database close (writes a checkpoint), but no session-close
        // is journaled, so the session is still "open" on disk.
        db.close().unwrap();
    }

    {
        let db = TidalDb::builder()
            .with_data_dir(dir.path())
            .with_schema(schema)
            .open()
            .unwrap();

        // The session should be restored with its signals.
        // The exact API depends on the session restore implementation.
        // At minimum: the session's signal effects (view counts) should be present.
        let count = db
            .read_windowed_count(EntityId::new(1), "view", Window::AllTime)
            .unwrap();
        assert!(count >= 1, "session signals should survive restart");

        db.close().unwrap();
    }
}

#[test]
fn session_metadata_restored_after_crash() {
    let dir = tempfile::tempdir().unwrap();
    let schema = test_schema_with_sessions();

    {
        let db = TidalDb::builder()
            .with_data_dir(dir.path())
            .with_schema(schema.clone())
            .open()
            .unwrap();

        let handle = db.start_session(42, "test-agent", "default").unwrap();
        handle.annotate(EntityId::new(1), "good recommendation").unwrap();

        // Do NOT close the session: simulate a crash.
        db.close().unwrap();
    }

    {
        let db = TidalDb::builder()
            .with_data_dir(dir.path())
            .with_schema(schema)
            .open()
            .unwrap();

        // Session journal recovery should have replayed the session-start
        // and the annotation signal.
        // The exact verification depends on whether session state is
        // queryable after restore. At minimum: the database should open
        // without error.
        db.health_check().unwrap();

        db.close().unwrap();
    }
}
```

### 6. Cross-surface crash test

Verify that a crash during a mixed workload (signals + collections + co-engagement + cohort) does not corrupt any surface. As written, the test below writes items, signals, and a collection, then performs a clean close and reopen; its explicit assertions cover signals, collections, and item metadata, with `health_check()` standing in for the remaining surfaces.

```rust
#[test]
fn mixed_workload_crash_recovery() {
    let dir = tempfile::tempdir().unwrap();
    let schema = test_schema_full();

    {
        let db = TidalDb::builder()
            .with_data_dir(dir.path())
            .with_schema(schema.clone())
            .open()
            .unwrap();

        // Write items.
        for i in 1..=100u64 {
            let mut meta = HashMap::new();
            meta.insert("title".to_string(), format!("Item {i}"));
            meta.insert("creator_id".to_string(), (i % 10).to_string());
            db.write_item_with_metadata(EntityId::new(i), &meta).unwrap();
        }

        // Write signals.
        for i in 1..=100u64 {
            let ts = Timestamp::from_nanos(1_000_000_000_000 + i * 1_000_000);
            db.signal("view", EntityId::new(i), 1.0, ts).unwrap();
        }

        // Create collection.
        let cid = db.create_collection(1, "test", Visibility::Public).unwrap();
        for i in 1..=10u64 {
            db.add_to_collection(cid, EntityId::new(i)).unwrap();
        }

        db.close().unwrap();
    }

    {
        let db = TidalDb::builder()
            .with_data_dir(dir.path())
            .with_schema(schema)
            .open()
            .unwrap();

        // Verify all surfaces are consistent.
        db.health_check().unwrap();

        // Signals survived.
        let count = db
            .read_windowed_count(EntityId::new(50), "view", Window::AllTime)
            .unwrap();
        assert!(count >= 1);

        // Collections survived.
        let collections = db.list_collections(1).unwrap();
        assert_eq!(collections.len(), 1);
        assert_eq!(collections[0].item_count, 10);

        // Items survived.
        let meta = db.get_item_metadata(EntityId::new(1)).unwrap();
        assert!(meta.is_some());

        db.close().unwrap();
    }
}
```

## Acceptance Criteria

- [ ] `cohort_ledger_checkpoint_restore_roundtrip`: cohort signal state round-trips correctly through checkpoint/restore
- [ ] `cohort_ledger_survives_checkpoint_crash`: 500 proptest cases; restore after a simulated crash yields exactly the last checkpointed state, with no corrupt entries
- [ ] `collection_persists_across_restart`: collection with items survives clean restart
- [ ] `collection_items_survive_restart`: 200 proptest cases, collection item counts match after restart
- [ ] `co_engagement_survives_restart`: co-engagement edges persist through shutdown/reopen
- [ ] `co_engagement_capacity_invariant_after_restart`: 200 proptest cases, edge count preserved through checkpoint/restore
- [ ] `active_session_restored_after_crash`: session signals survive when the session is not closed before shutdown
- [ ] `session_metadata_restored_after_crash`: session journal events replayed on open
- [ ] `mixed_workload_crash_recovery`: all surfaces consistent after mixed write + restart
- [ ] All tests pass with `cargo test --test m7_crash_m6`

## Test Strategy

All tests described in the Technical Design section are the deliverable. The key principle: the end-to-end tests write state through the public `TidalDb` API, close the database, reopen it, and verify the state through the public API. This tests the full persistence pipeline (in-memory -> checkpoint/fjall -> restore/rebuild) without relying on internal implementation details. The cohort ledger and co-engagement property tests are the exception: they drive `CohortSignalLedger` and `CoEngagementIndex` checkpoint/restore directly against `InMemoryBackend`.

Property tests use proptest with varied workload sizes to catch edge cases in serialization, key encoding, and scan/rebuild logic.
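Most of the tests above simulate a crash by discarding un-checkpointed in-memory state or by closing without a session-close record. Interrupting a checkpoint mid-write is what the Task 01 crash points are for. The following is a self-contained toy, not TidalDB code: `CrashPoint`, `ToyStore`, and `checkpoint_with_crash` are hypothetical names, and the store models the atomic `WriteBatch` contract described in section 1 (staged writes become visible all at once or not at all). A deterministic pseudo-random sweep over workload sizes and crash points, in the spirit of the proptest cases, checks that recovery always sees either the previous checkpoint or the new one, never a mix.

```rust
use std::collections::BTreeMap;

// Toy model, not TidalDB code: `CrashPoint`, `ToyStore`, and
// `checkpoint_with_crash` are hypothetical names for illustration.

/// Hypothetical crash points inside a checkpoint.
#[derive(Clone, Copy, Debug, PartialEq)]
enum CrashPoint {
    Never,
    AfterStagingEntries(usize), // crash after staging N entries, before commit
    BeforeCommit,               // every entry staged, commit never runs
}

/// Only `durable` survives a crash; the staged batch lives on the stack.
#[derive(Default)]
struct ToyStore {
    durable: BTreeMap<u64, u64>,
}

/// Stage every entry, then publish the batch in one assignment, modelling an
/// atomic batch commit. Returns false if the injected crash fired.
fn checkpoint_with_crash(store: &mut ToyStore, state: &BTreeMap<u64, u64>, crash: CrashPoint) -> bool {
    let mut batch = BTreeMap::new();
    for (i, (k, v)) in state.iter().enumerate() {
        if crash == CrashPoint::AfterStagingEntries(i) {
            return false; // the partial batch is dropped; `durable` is untouched
        }
        batch.insert(*k, *v);
    }
    if crash == CrashPoint::BeforeCommit {
        return false;
    }
    store.durable = batch;
    true
}

/// xorshift64: a tiny deterministic PRNG so the sweep is reproducible.
fn xorshift(seed: &mut u64) -> u64 {
    *seed ^= *seed << 13;
    *seed ^= *seed >> 7;
    *seed ^= *seed << 17;
    *seed
}

/// One randomized case: a clean checkpoint, then a second checkpoint with a
/// random crash point. Recovery must see exactly one complete checkpoint.
fn run_case(seed: &mut u64) -> bool {
    let mut store = ToyStore::default();
    let n1 = xorshift(seed) % 20 + 1;
    let old: BTreeMap<u64, u64> = (0..n1).map(|k| (k, k * 10)).collect();
    assert!(checkpoint_with_crash(&mut store, &old, CrashPoint::Never));

    let n2 = xorshift(seed) % 20 + 1;
    let new: BTreeMap<u64, u64> = (0..n2).map(|k| (k, k * 10 + 1)).collect();
    let crash = match xorshift(seed) % 3 {
        0 => CrashPoint::Never,
        1 => CrashPoint::AfterStagingEntries((xorshift(seed) % n2) as usize),
        _ => CrashPoint::BeforeCommit,
    };
    let committed = checkpoint_with_crash(&mut store, &new, crash);

    let expected = if committed { &new } else { &old };
    committed == (crash == CrashPoint::Never) && store.durable == *expected
}

fn main() {
    let mut seed = 0x9E37_79B9_7F4A_7C15u64;
    assert!((0..500).all(|_| run_case(&mut seed)));
    println!("500 cases: recovered state was always the old or the new checkpoint");
}
```

The invariant checked here mirrors the cohort ledger criterion: after any crash point, the restore point is the previous checkpoint, and a completed commit replaces it wholesale.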