#![allow(clippy::unwrap_used)] //! Session durability tests: persistent archive, hint-keyword ranking, //! per-signal windowed counts, and audit truncation. use std::collections::HashMap; use std::time::Duration; use tidaldb::TidalDb; use tidaldb::schema::{ AgentPolicy, DecaySpec, EntityId, EntityKind, SchemaBuilder, Timestamp, Window, }; use tidaldb::session::MAX_AUDIT_ENTRIES; fn test_schema() -> tidaldb::schema::Schema { let mut builder = SchemaBuilder::new(); for sig in &["view", "like", "reward", "skip"] { let _ = builder .signal( sig, EntityKind::Item, DecaySpec::Exponential { half_life: Duration::from_secs(7 * 24 * 3600), }, ) .windows(&[Window::OneHour, Window::TwentyFourHours]) .velocity(false) .add(); } let _ = builder.session_policy( "default_policy", AgentPolicy { allowed_signals: vec!["reward".to_string(), "view".to_string()], denied_signals: vec!["skip".to_string()], max_session_duration: Duration::from_secs(3600), max_signals_per_session: 10_000, }, ); builder.build().unwrap() } // ── Test 1: Archived session readable after close and reopen ──────────────── #[test] fn archived_session_readable_after_close_and_reopen() { let dir = tempfile::tempdir().unwrap(); let schema = test_schema(); let session_id; let written; { let db = TidalDb::builder() .with_data_dir(dir.path()) .with_schema(schema.clone()) .open() .unwrap(); let handle = db .start_session(1, "agent-a", "default_policy", HashMap::new()) .unwrap(); session_id = handle.id; let ts = Timestamp::now(); for i in 1u64..=3 { let mut meta = HashMap::new(); meta.insert("title".to_string(), format!("item-{i}")); db.write_item_with_metadata(EntityId::new(i), &meta) .unwrap(); } db.session_signal(&handle, "reward", EntityId::new(1), 1.0, ts, None) .unwrap(); db.session_signal(&handle, "view", EntityId::new(2), 0.5, ts, None) .unwrap(); let summary = db.close_session(handle).unwrap(); written = summary.signals_written; db.close().unwrap(); } // Reopen — snapshot must be readable from storage. { let db = TidalDb::builder() .with_data_dir(dir.path()) .with_schema(schema) .open() .unwrap(); let snap = db.session_snapshot(session_id).unwrap(); assert_eq!( snap.signals_written, written, "signals_written survives reopen" ); assert_eq!(snap.signals_written, 2); assert_eq!(snap.signals_rejected, 0); db.close().unwrap(); } } // ── Test 2: Hint keywords boost matching items ─────────────────────────────── #[test] fn hint_keywords_boost_matching_items() { let db = TidalDb::builder() .ephemeral() .with_schema(test_schema()) .open() .unwrap(); // Write 5 jazz items and 5 rock items. for i in 1u64..=5 { let mut meta = HashMap::new(); meta.insert("genre".to_string(), "jazz".to_string()); meta.insert("title".to_string(), format!("jazz-track-{i}")); db.write_item_with_metadata(EntityId::new(i), &meta) .unwrap(); } for i in 6u64..=10 { let mut meta = HashMap::new(); meta.insert("genre".to_string(), "rock".to_string()); meta.insert("title".to_string(), format!("rock-track-{i}")); db.write_item_with_metadata(EntityId::new(i), &meta) .unwrap(); } let handle = db .start_session(1, "agent-a", "default_policy", HashMap::new()) .unwrap(); let session_id = handle.id; let ts = Timestamp::now(); // Signal with annotation hinting jazz preference. db.session_signal( &handle, "reward", EntityId::new(1), 1.0, ts, Some("jazz fusion acoustic".to_string()), ) .unwrap(); // Query FOR SESSION. let query = tidaldb::query::retrieve::RetrieveBuilder::new( EntityKind::Item, tidaldb::query::retrieve::ProfileRef::new("hot"), ) .limit(10) .for_session(session_id) .build() .unwrap(); let results = db.retrieve(&query).unwrap(); assert!(!results.items.is_empty(), "should return results"); assert!( results.session_snapshot.is_some(), "FOR SESSION query must attach session snapshot" ); // Jazz items (1–5) should appear in results (session hint matched metadata). let jazz_count = results .items .iter() .filter(|r| r.entity_id.as_u64() <= 5) .count(); assert!( jazz_count > 0, "at least one jazz item should appear in FOR SESSION results" ); db.close_session(handle).unwrap(); db.close().unwrap(); } // ── Test 3: Per-signal windowed counts in snapshot ─────────────────────────── #[test] fn per_signal_snapshot_shows_windowed_counts() { let db = TidalDb::builder() .ephemeral() .with_schema(test_schema()) .open() .unwrap(); let mut meta = HashMap::new(); meta.insert("title".to_string(), "item-1".to_string()); db.write_item_with_metadata(EntityId::new(1), &meta) .unwrap(); let handle = db .start_session(2, "agent-b", "default_policy", HashMap::new()) .unwrap(); let session_id = handle.id; let ts = Timestamp::now(); // Write 5 "reward" signals. for _ in 0..5 { db.session_signal(&handle, "reward", EntityId::new(1), 1.0, ts, None) .unwrap(); } // Write 3 "view" signals. for _ in 0..3 { db.session_signal(&handle, "view", EntityId::new(1), 0.5, ts, None) .unwrap(); } let snap = db.session_snapshot(session_id).unwrap(); assert!( snap.signals.contains_key("reward"), "reward should appear in signals map" ); assert!( snap.signals.contains_key("view"), "view should appear in signals map" ); let reward = &snap.signals["reward"]; assert_eq!( reward.window_1h, 5, "reward window_1h should count 5 signals" ); assert!( reward.decay_score > 0.0, "reward decay_score should be positive" ); let view = &snap.signals["view"]; assert_eq!(view.window_1h, 3, "view window_1h should count 3 signals"); db.close_session(handle).unwrap(); db.close().unwrap(); } // ── Test 4: Audit truncation marker ───────────────────────────────────────── #[test] fn audit_truncation_marker_set_when_cap_exceeded() { let db = TidalDb::builder() .ephemeral() .with_schema(test_schema()) .open() .unwrap(); let mut meta = HashMap::new(); meta.insert("title".to_string(), "item-1".to_string()); db.write_item_with_metadata(EntityId::new(1), &meta) .unwrap(); // Use a policy with a very large signal cap so it doesn't interfere. let handle = db .start_session(3, "agent-c", "default_policy", HashMap::new()) .unwrap(); let session_id = handle.id; let ts = Timestamp::now(); // Write MAX_AUDIT_ENTRIES + 1 signals (all "reward" which is allowed). for _ in 0..=MAX_AUDIT_ENTRIES { let _ = db.session_signal(&handle, "reward", EntityId::new(1), 1.0, ts, None); } // audit_truncated flag is visible in the live snapshot. let snap = db.session_snapshot(session_id).unwrap(); assert!( snap.audit_truncated, "audit_truncated should be true after exceeding MAX_AUDIT_ENTRIES" ); // session_audit() returns the capped entries (MAX_AUDIT_ENTRIES). let entries = db.session_audit(session_id).unwrap(); assert_eq!( entries.len(), MAX_AUDIT_ENTRIES, "audit log capped at MAX_AUDIT_ENTRIES" ); db.close_session(handle).unwrap(); db.close().unwrap(); } // ── Test 5: Annotation timestamps preserved in snapshot ───────────────────── #[test] fn annotation_timestamps_preserved() { let db = TidalDb::builder() .ephemeral() .with_schema(test_schema()) .open() .unwrap(); let mut meta = HashMap::new(); meta.insert("title".to_string(), "item-1".to_string()); db.write_item_with_metadata(EntityId::new(1), &meta) .unwrap(); let handle = db .start_session(4, "agent-d", "default_policy", HashMap::new()) .unwrap(); let session_id = handle.id; let ts = Timestamp::now(); db.session_signal( &handle, "reward", EntityId::new(1), 1.0, ts, Some("piano solo".to_string()), ) .unwrap(); let snap = db.session_snapshot(session_id).unwrap(); assert_eq!(snap.annotations.len(), 1); let (ann_ts, ann_text) = &snap.annotations[0]; assert!(*ann_ts > 0, "annotation timestamp should be non-zero"); assert_eq!(ann_text, "piano solo"); db.close_session(handle).unwrap(); db.close().unwrap(); } // ── Test 6: Active session state restored after crash ──────────────────────── /// Proves that an active (never-closed) session is restored from the WAL /// journal after a simulated crash. The "crash" is simulated by dropping /// the `TidalDb` without calling `close_session()` — the WAL has a /// `SessionStart` and N `SessionSignal` records but no `SessionClose`, /// so on reopen the session must appear as active with all signals intact. #[test] fn active_session_state_restored_after_crash() { let dir = tempfile::tempdir().unwrap(); let schema = test_schema(); let session_id; { let db = TidalDb::builder() .with_data_dir(dir.path()) .with_schema(schema.clone()) .open() .unwrap(); // Write an item so session signals have a valid target. let mut meta = HashMap::new(); meta.insert("title".to_string(), "item-1".to_string()); db.write_item_with_metadata(EntityId::new(1), &meta) .unwrap(); let handle = db .start_session(42, "agent-crash", "default_policy", HashMap::new()) .unwrap(); session_id = handle.id; let ts = Timestamp::now(); // Write 4 "reward" signals and 3 "view" signals (7 total). for _ in 0..4 { db.session_signal(&handle, "reward", EntityId::new(1), 1.0, ts, None) .unwrap(); } for _ in 0..3 { db.session_signal(&handle, "view", EntityId::new(1), 0.5, ts, None) .unwrap(); } // Verify signals are live before "crash". let snap_before = db.session_snapshot(session_id).unwrap(); assert_eq!(snap_before.signals_written, 7); // Simulate crash: drop the db without calling close_session(). // The Drop impl flushes the WAL, but the session was never archived. drop(db); } // Reopen — the session should be restored as active from the WAL journal. { let db = TidalDb::builder() .with_data_dir(dir.path()) .with_schema(schema) .open() .unwrap(); // The session must appear in active_sessions(). let active = db.active_sessions(); let restored = active .iter() .find(|info| info.id == session_id) .expect("session should be restored as active after crash"); assert_eq!( restored.user_id, 42, "restored session should have the original user_id" ); assert_eq!( restored.signals_written, 7, "all 7 signals should be replayed from WAL" ); // Snapshot must contain per-signal-type data. let snap = db.session_snapshot(session_id).unwrap(); assert_eq!( snap.signals_written, 7, "snapshot signals_written must match total replayed signals" ); let reward = snap .signals .get("reward") .expect("reward signal type should exist in restored snapshot"); assert_eq!( reward.window_1h, 4, "reward window_1h should count 4 replayed signals" ); assert!( reward.decay_score > 0.0, "reward decay_score should be positive after replay" ); let view = snap .signals .get("view") .expect("view signal type should exist in restored snapshot"); assert_eq!( view.window_1h, 3, "view window_1h should count 3 replayed signals" ); assert!( view.decay_score > 0.0, "view decay_score should be positive after replay" ); // Entity 1 should appear in signaled_entities. assert!( snap.signaled_entities.contains(&1), "entity 1 should be in signaled_entities after replay" ); db.close().unwrap(); } } // ── Test 6: Session metadata survives crash recovery ───────────────────────── /// Verifies that `start_session` metadata (e.g. `{"tool": "planner"}`) is /// rehydrated from the storage start record during WAL replay, so that /// `session_snapshot().metadata` is populated after a crash. #[test] fn metadata_survives_crash() { let dir = tempfile::tempdir().unwrap(); let schema = test_schema(); let session_id; { let db = TidalDb::builder() .with_data_dir(dir.path()) .with_schema(schema.clone()) .open() .unwrap(); let mut meta = HashMap::new(); meta.insert("tool".to_string(), "planner".to_string()); meta.insert("context".to_string(), "daily-feed".to_string()); let handle = db .start_session(10, "agent-meta", "default_policy", meta) .unwrap(); session_id = handle.id; // Write a signal so the WAL has a SessionSignal record too. let mut item_meta = HashMap::new(); item_meta.insert("title".to_string(), "item-1".to_string()); db.write_item_with_metadata(EntityId::new(1), &item_meta) .unwrap(); let ts = Timestamp::now(); db.session_signal(&handle, "reward", EntityId::new(1), 1.0, ts, None) .unwrap(); // Simulate crash: drop without close_session(). drop(db); } // Reopen — metadata must be present in the restored active session. { let db = TidalDb::builder() .with_data_dir(dir.path()) .with_schema(schema) .open() .unwrap(); let active = db.active_sessions(); assert!( active.iter().any(|info| info.id == session_id), "session must be restored as active after crash" ); let snap = db.session_snapshot(session_id).unwrap(); assert_eq!( snap.metadata.get("tool").map(String::as_str), Some("planner"), "tool metadata must survive crash recovery" ); assert_eq!( snap.metadata.get("context").map(String::as_str), Some("daily-feed"), "context metadata must survive crash recovery" ); db.close().unwrap(); } } // ── Test 7: Preference annotations survive crash recovery ──────────────────── /// Verifies that session signal annotations ("more jazz today") are replayed /// from the WAL during crash recovery, so that `SessionContext::keywords` is /// populated for FOR SESSION ranking after a restart. #[test] fn annotations_survive_crash() { let dir = tempfile::tempdir().unwrap(); let schema = test_schema(); let session_id; { let db = TidalDb::builder() .with_data_dir(dir.path()) .with_schema(schema.clone()) .open() .unwrap(); let handle = db .start_session(11, "agent-ann", "default_policy", HashMap::new()) .unwrap(); session_id = handle.id; let mut item_meta = HashMap::new(); item_meta.insert("genre".to_string(), "jazz".to_string()); db.write_item_with_metadata(EntityId::new(1), &item_meta) .unwrap(); let ts = Timestamp::now(); // Write two annotated signals. db.session_signal( &handle, "reward", EntityId::new(1), 1.0, ts, Some("more jazz today".to_string()), ) .unwrap(); db.session_signal( &handle, "view", EntityId::new(1), 0.5, ts, Some("acoustic vibes".to_string()), ) .unwrap(); // Simulate crash. drop(db); } // Reopen — annotations must be present so FOR SESSION ranking can use them. { let db = TidalDb::builder() .with_data_dir(dir.path()) .with_schema(schema) .open() .unwrap(); let active = db.active_sessions(); assert!( active.iter().any(|info| info.id == session_id), "session must be restored as active after crash" ); let snap = db.session_snapshot(session_id).unwrap(); assert_eq!( snap.annotations.len(), 2, "both annotations must be replayed from WAL" ); let texts: Vec<&str> = snap.annotations.iter().map(|(_, s)| s.as_str()).collect(); assert!( texts.contains(&"more jazz today"), "jazz annotation must survive crash recovery" ); assert!( texts.contains(&"acoustic vibes"), "acoustic annotation must survive crash recovery" ); db.close().unwrap(); } } // ── Test 8: WAL replay preserves signal counts exactly ─────────────────────── /// Property-like correctness test: write exactly K signals of one type into /// an active session, "crash" (drop without close_session), reopen, and /// verify the replayed count is exactly K — not K-1, not K+1. /// /// This directly tests the acceptance criterion from the roadmap: /// "WAL replay of session signals restores SessionSignalState accumulators /// correctly." #[test] fn wal_replay_restores_signal_counts_exactly() { let dir = tempfile::tempdir().unwrap(); let schema = test_schema(); const K: u64 = 5; let session_id; { let db = TidalDb::builder() .with_data_dir(dir.path()) .with_schema(schema.clone()) .open() .unwrap(); let mut meta = HashMap::new(); meta.insert("title".to_string(), "target".to_string()); db.write_item_with_metadata(EntityId::new(99), &meta) .unwrap(); let handle = db .start_session(7, "agent-replay", "default_policy", HashMap::new()) .unwrap(); session_id = handle.id; let ts = Timestamp::now(); // Write exactly K "reward" signals, all targeting the same entity. for _ in 0..K { db.session_signal(&handle, "reward", EntityId::new(99), 1.0, ts, None) .unwrap(); } // Sanity check before "crash". let snap = db.session_snapshot(session_id).unwrap(); assert_eq!(snap.signals_written, K); assert_eq!(snap.signals["reward"].window_1h, K); // Drop without close_session — session left active in WAL. drop(db); } // Reopen and verify exact replay. { let db = TidalDb::builder() .with_data_dir(dir.path()) .with_schema(schema) .open() .unwrap(); // Session must be active (not closed — no SessionClose in WAL). let active = db.active_sessions(); assert!( active.iter().any(|info| info.id == session_id), "session must be restored as active" ); let snap = db.session_snapshot(session_id).unwrap(); // The critical assertion: exact signal count after WAL replay. assert_eq!( snap.signals_written, K, "total signals_written must be exactly {K} after replay" ); let reward = snap .signals .get("reward") .expect("reward signal type must exist after replay"); assert_eq!( reward.window_1h, K, "reward window_1h must be exactly {K} after replay" ); // Only one entity was signaled. assert_eq!( snap.signaled_entities.len(), 1, "exactly one entity should appear in signaled_entities" ); assert_eq!( snap.signaled_entities[0], 99, "signaled entity should be 99" ); // Decay score should be positive (signals were recent). assert!( reward.decay_score > 0.0, "decay_score must be positive for recently replayed signals" ); db.close().unwrap(); } }