//! Entity state rebuild from durable storage and periodic checkpoint thread.
//!
//! This module contains:
//! - [`rebuild_entity_state`] and [`rebuild_suggestion_index`], which scan the
//!   storage engines and repopulate in-memory structures, and
//! - [`run_checkpoint_thread`], the body of the background thread that
//!   periodically checkpoints signal state, compacts the WAL, and refreshes
//!   monitoring gauges.

use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Duration;

use crate::cohort::CohortSignalLedger;
use crate::query::suggest::SuggestionIndex;
use crate::schema::{TidalError, Timestamp};
use crate::signals::{DEFAULT_MAX_SIGNAL_ENTRIES, SignalLedger, trim_cold_entries};
use crate::storage::{StorageEngine, Tag};

use super::metadata::deserialize_metadata;
use super::metrics::MetricsState;
use super::storage_box::StorageBox;

// ── Index health metrics handles ────────────────────────────────────────────

/// Handles to live index instances for periodic metrics refresh.
///
/// When the `metrics` feature is enabled, carries `Arc`/clone references to
/// the text index, embedding registry, and bitmap indexes so the checkpoint
/// thread can read their current sizes without touching `TidalDb` itself.
///
/// When the `metrics` feature is disabled, this is a zero-size type so the
/// function signature remains unchanged and the compiler eliminates all
/// overhead.
#[cfg(feature = "metrics")]
pub(super) struct IndexMetricsHandles {
    // NOTE(review): the generic arguments of the next two field types appear
    // to have been stripped from this copy of the file (`Option>` / `Arc>`).
    // They cannot be inferred from this module alone; restore them from the
    // canonical source before building.
    /// Optional text index; `refresh_index_metrics` calls `index_stats()` on it
    /// when present.
    pub text_index: Option>,
    /// Embedding registry behind a lock; `refresh_index_metrics` calls
    /// `.read()` on it and then `index_stats()` on the guard.
    pub embedding_registry: Arc>,
    /// Bitmap indexes whose `total_cardinality()` values are summed into a
    /// single gauge by `refresh_index_metrics`.
    pub bitmap_category: crate::storage::indexes::bitmap::BitmapIndex,
    pub bitmap_format: crate::storage::indexes::bitmap::BitmapIndex,
    pub bitmap_creator: crate::storage::indexes::bitmap::BitmapIndex,
    pub bitmap_tag: crate::storage::indexes::bitmap::BitmapIndex,
}

/// Zero-size stand-in used when the `metrics` feature is disabled.
#[cfg(not(feature = "metrics"))]
pub(super) struct IndexMetricsHandles;

/// Rebuild in-memory entity state from durable storage on restart.
///
/// Scans the users keyspace for relationship edges and the items keyspace for
/// `creator_id` metadata. Populates:
/// 1. `user_state.blocked` from `RelationshipType::Blocks` edges
/// 2. `user_state.seen` (hidden items) from `RelationshipType::Hide` edges
/// 3. `user_state.follows` from `RelationshipType::Follows` edges
///    (both the forward user -> creator and reverse creator -> follower maps)
/// 4. `creator_items` bitmap from items with `creator_id` metadata
/// 5. `interaction_ledger` from `RelationshipType::InteractionWeight` edges
///
/// `RelationshipType::Mute` edges are counted in the log output but are not
/// loaded into any in-memory structure.
///
/// For ephemeral mode, all engines are empty, so this is effectively a no-op.
///
/// # Errors
///
/// Returns the first error yielded by either storage scan, converted with
/// `TidalError::from`. Entries processed before the failing one have already
/// been applied to the in-memory structures at that point.
pub(super) fn rebuild_entity_state(
    storage: &StorageBox,
    user_state: &crate::entities::UserStateIndex,
    creator_items: &crate::entities::CreatorItemsBitmap,
    interaction_ledger: &crate::entities::InteractionLedger,
) -> crate::Result<()> {
    use crate::entities::relationship::{
        RelationshipType, deserialize_relationship_value, parse_relationship_to,
    };
    use crate::storage::keys::parse_key;

    // Scan the users keyspace for all relationship edges.
    // The relationship key format is:
    //   [from_entity_id: 8 BE][0x00][Tag::Rel (0x04)][rel_type: 1][to_entity_id: 8 BE]
    // We scan with an empty prefix to get all keys, then filter for Tag::Rel.
    let mut rel_count = 0u64;
    for entry in storage.users_engine().scan_prefix(&[]) {
        let (key, value) = entry.map_err(TidalError::from)?;

        // Only process relationship keys (Tag::Rel = 0x04).
        if let Some((from_id, Tag::Rel, suffix)) = parse_key(&key) {
            // suffix = [rel_type: 1 byte][to_entity_id: 8 BE]
            // An empty suffix has no type byte; skip it so the index below
            // cannot panic.
            if suffix.is_empty() {
                continue;
            }
            let rel_type_byte = suffix[0];
            // Unknown relationship type bytes and keys whose target ID cannot
            // be parsed are skipped silently rather than failing the rebuild.
            let Some(rel_type) = RelationshipType::from_byte(rel_type_byte) else {
                continue;
            };
            let Some(to_id) = parse_relationship_to(&key) else {
                continue;
            };
            let from_id_u64 = from_id.as_u64();

            match rel_type {
                RelationshipType::Blocks => {
                    user_state.add_block_creator(from_id_u64, to_id.as_u64());
                    rel_count += 1;
                }
                RelationshipType::Hide => {
                    // The target ID is narrowed to u32 here; IDs above
                    // u32::MAX would be truncated (lint explicitly allowed).
                    #[allow(clippy::cast_possible_truncation)]
                    user_state.add_hide(from_id_u64, to_id.as_u64() as u32);
                    rel_count += 1;
                }
                RelationshipType::Follows => {
                    // Forward: user -> followed creator
                    user_state.add_follow(from_id_u64, to_id.as_u64());
                    // Reverse: creator -> follower users
                    user_state.add_creator_follower(to_id.as_u64(), from_id_u64);
                    rel_count += 1;
                }
                RelationshipType::InteractionWeight => {
                    // Reconstruct interaction weight from the stored edge value.
                    // Edges whose value fails to deserialize are neither
                    // recorded nor counted.
                    if let Some((weight, ts_nanos)) = deserialize_relationship_value(&value) {
                        interaction_ledger.record(from_id_u64, to_id.as_u64(), weight, ts_nanos);
                        rel_count += 1;
                    }
                }
                RelationshipType::Mute => {
                    // Mute edges do not have in-memory state (yet).
                    rel_count += 1;
                }
            }
        }
    }

    // Scan items keyspace for creator_id metadata to rebuild creator_items bitmap.
    let mut item_count = 0u64;
    let item_scan_start = std::time::Instant::now();
    for entry in storage.items_engine().scan_prefix(&[]) {
        let (key, value) = entry.map_err(TidalError::from)?;
        if let Some((entity_id, Tag::Meta, _suffix)) = parse_key(&key) {
            let meta = deserialize_metadata(&value);
            // Items without a `creator_id`, or whose `creator_id` does not
            // parse, are skipped.
            // NOTE(review): the turbofish type argument is missing in this
            // copy (`parse::()`); presumably an integer ID type matching
            // `add_item` — restore from the canonical source.
            if let Some(creator_str) = meta.get("creator_id")
                && let Ok(creator_id) = creator_str.parse::()
            {
                // Item ID is narrowed to u32 (truncation lint explicitly allowed).
                #[allow(clippy::cast_possible_truncation)]
                creator_items.add_item(creator_id, entity_id.as_u64() as u32);
                item_count += 1;
                // Progress log every 10,000 items so long rebuilds are visible.
                if item_count.is_multiple_of(10_000) {
                    tracing::info!(rebuilt = item_count, "entity state rebuild in progress");
                }
            }
        }
    }

    if item_count > 0 {
        tracing::info!(
            rebuilt = item_count,
            elapsed_ms = item_scan_start.elapsed().as_millis(),
            "entity state item scan complete"
        );
    }

    // Stay quiet when nothing was rebuilt (e.g. empty engines).
    if rel_count > 0 || item_count > 0 {
        tracing::info!(
            relationships = rel_count,
            creator_items = item_count,
            "entity state rebuilt from durable storage"
        );
    }

    Ok(())
}

/// Rebuild `SuggestionIndex` title terms from durable item metadata on restart.
///
/// Scans the items keyspace for `Tag::Meta` keys, deserializes metadata, and
/// calls `suggestion_index.index_title(title)` for each item that has a `"title"`
/// field. This ensures autocomplete works correctly after a restart without
/// requiring all items to be re-written.
///
/// Unlike `rebuild_entity_state`, this function returns nothing: scan entries
/// that yield an error are skipped and the rebuild continues.
///
/// For ephemeral mode the engine is empty, so this is a no-op.
pub(super) fn rebuild_suggestion_index(storage: &StorageBox, suggestion_index: &SuggestionIndex) {
    let mut indexed = 0u64;
    for entry in storage.items_engine().scan_prefix(&[]) {
        // NOTE(review): scan errors are dropped here without a log line or a
        // counter, so a partially rebuilt index is indistinguishable from a
        // complete one — confirm this best-effort behaviour is intended.
        let Ok((key, value)) = entry else { continue };
        if let Some((_entity_id, Tag::Meta, _suffix)) = crate::storage::keys::parse_key(&key) {
            let meta = deserialize_metadata(&value);
            if let Some(title) = meta.get("title") {
                suggestion_index.index_title(title);
                indexed += 1;
            }
        }
    }
    if indexed > 0 {
        tracing::info!(
            items = indexed,
            "suggestion index rebuilt from durable storage"
        );
    }
}

/// Background thread body: checkpoint signal state to storage every 30 seconds.
///
/// The loop sleeps in 500ms ticks and checks the shutdown flag after each
/// tick, so the thread exits promptly when `shutdown_inner()` is called.
/// Intervals are accumulated from those ticks only; time spent doing the work
/// below is not counted towards the next interval.
///
/// Every 10s (with the `metrics` feature) index health gauges are refreshed.
/// Every 30s one checkpoint cycle runs, in this order:
/// 1. (metrics) update the signal hot-entries gauge;
/// 2. trim the signal ledger if it holds more than
///    `DEFAULT_MAX_SIGNAL_ENTRIES` entries;
/// 3. checkpoint the global signal ledger; on failure log an error and bump
///    `checkpoint_failures_total`;
/// 4. on success, and when a WAL directory is configured and the sequence is
///    non-zero, compact WAL segments covered by the checkpoint. Compaction
///    failure is non-fatal: a warning is logged and the next cycle retries;
/// 5. checkpoint the cohort signal ledger (only if it has entries) with the
///    same `CheckpointMeta` and the same storage engine. This step is
///    attempted whether or not step 3 succeeded.
///
/// Only runs in persistent mode (ephemeral opens never spawn this thread).
///
/// The `Arc` arguments are intentionally passed by value: the thread must own
/// them for its entire lifetime (references cannot satisfy the `'static` bound
/// required by `std::thread::spawn`).
// NOTE(review): the type arguments of the parameters below appear to have been
// stripped from this copy of the file. The imports and usages suggest
// `Arc<AtomicBool>`, `Arc<SignalLedger>`, `Arc<CohortSignalLedger>`,
// `Box<dyn StorageEngine>`, `Arc<AtomicU64>`, `Option<PathBuf>` and
// `Arc<MetricsState>` respectively — confirm against the canonical source.
#[allow(clippy::needless_pass_by_value, clippy::too_many_arguments)]
pub(super) fn run_checkpoint_thread(
    shutdown: Arc,
    ledger: Arc,
    cohort_ledger: Arc,
    storage: Box,
    last_wal_seq: Arc,
    wal_dir: Option,
    metrics: Arc,
    index_handles: IndexMetricsHandles,
) {
    const CHECKPOINT_INTERVAL: Duration = Duration::from_secs(30);
    /// Index health metrics (Tantivy, `USearch`, bitmap) refresh every 10s -- 3x more
    /// frequent than checkpoints so operators get near-real-time index visibility.
    const INDEX_METRICS_INTERVAL: Duration = Duration::from_secs(10);
    const POLL_INTERVAL: Duration = Duration::from_millis(500);

    // Tick accumulators; each is reset to zero when its interval fires.
    let mut elapsed = Duration::ZERO;
    let mut index_metrics_elapsed = Duration::ZERO;

    loop {
        std::thread::sleep(POLL_INTERVAL);
        // Checked after every tick; no final checkpoint is written from this
        // function on shutdown.
        if shutdown.load(Ordering::Acquire) {
            break;
        }
        elapsed += POLL_INTERVAL;
        index_metrics_elapsed += POLL_INTERVAL;

        // Refresh index health metrics every 10s (faster than checkpoint).
        #[cfg(feature = "metrics")]
        if index_metrics_elapsed >= INDEX_METRICS_INTERVAL {
            index_metrics_elapsed = Duration::ZERO;
            refresh_index_metrics(&index_handles, &metrics);
        }

        if elapsed >= CHECKPOINT_INTERVAL {
            elapsed = Duration::ZERO;

            // Update signal hot entries gauge.
            #[cfg(feature = "metrics")]
            {
                metrics
                    .signal_hot_entries
                    .store(ledger.entries().len() as u64, Ordering::Relaxed);
            }
            // (index health metrics refreshed every 10s in the block above)

            // Trim signal ledger if over the memory budget (5M entries ~5.4 GB).
            let entry_count = ledger.entries().len();
            if entry_count > DEFAULT_MAX_SIGNAL_ENTRIES {
                tracing::info!(
                    entry_count,
                    max_entries = DEFAULT_MAX_SIGNAL_ENTRIES,
                    "signal ledger exceeds memory budget — trimming cold entries"
                );
                let evicted = trim_cold_entries(ledger.entries(), DEFAULT_MAX_SIGNAL_ENTRIES);
                tracing::info!(
                    evicted,
                    remaining = ledger.entries().len(),
                    "signal ledger trim complete"
                );
            }

            // Snapshot the WAL sequence once; the same value is recorded in the
            // checkpoint metadata and later used as the compaction bound.
            let seq = last_wal_seq.load(Ordering::Relaxed);
            let meta = crate::signals::checkpoint::CheckpointMeta {
                checkpoint_time_ns: Timestamp::now().as_nanos(),
                wal_sequence: seq,
                payload_hash: [0u8; 32], // computed by checkpoint()
            };

            // `meta` is passed by value here and again to the cohort checkpoint
            // below, so `CheckpointMeta` must be `Copy`.
            if let Err(e) = ledger.checkpoint(storage.as_ref(), meta) {
                tracing::error!(error = %e, "periodic signal checkpoint failed");
                metrics
                    .checkpoint_failures_total
                    .fetch_add(1, Ordering::Relaxed);
            } else {
                tracing::debug!("periodic signal checkpoint written");

                // Update checkpoint age metric.
                #[cfg(feature = "metrics")]
                {
                    // A clock before the Unix epoch yields 0 via
                    // `unwrap_or_default`; the u128 nanosecond count is
                    // narrowed to u64.
                    let now_ns = std::time::SystemTime::now()
                        .duration_since(std::time::UNIX_EPOCH)
                        .unwrap_or_default()
                        .as_nanos() as u64;
                    metrics.last_checkpoint_ns.store(now_ns, Ordering::Relaxed);
                }

                // Compact WAL segments covered by the checkpoint.
                // This runs AFTER the checkpoint is durable, so deleted
                // segments are guaranteed to be redundant.
                // NOTE(review): compaction happens before the cohort checkpoint
                // further down is attempted. If cohort recovery relies on WAL
                // segments at or below `seq`, a failed cohort checkpoint would
                // find them already deleted — confirm the recovery model.
                if let Some(ref dir) = wal_dir
                    && seq > 0
                {
                    match crate::wal::compaction::compact_wal(dir, seq) {
                        Ok(result) => {
                            #[cfg(feature = "metrics")]
                            {
                                metrics
                                    .wal_compacted_segments_total
                                    .fetch_add(result.segments_deleted as u64, Ordering::Relaxed);
                            }
                            let _ = result; // suppress unused warning when metrics disabled
                        }
                        Err(e) => {
                            tracing::warn!(error = %e, "WAL compaction after checkpoint failed");
                        }
                    }

                    // Update WAL lag bytes: sum remaining segment file sizes.
                    // Runs whether or not compaction succeeded.
                    #[cfg(feature = "metrics")]
                    {
                        let lag = compute_wal_lag_bytes(dir);
                        metrics.wal_lag_bytes.store(lag, Ordering::Relaxed);
                    }
                }
            }

            // Checkpoint cohort signal state with the same meta.
            // Skipped when the cohort ledger is empty. A failure is logged but,
            // unlike the global checkpoint, does not bump
            // `checkpoint_failures_total`.
            if cohort_ledger.entry_count() > 0
                && let Err(e) = cohort_ledger.checkpoint(storage.as_ref(), meta)
            {
                tracing::error!(error = %e, "periodic cohort checkpoint failed");
            }
        }
    }

    // Suppress unused-variable warnings when metrics feature is disabled.
    let _ = &metrics;
    let _ = &index_handles;
}

/// Refresh index health metrics from the live index handles.
///
/// Called from `run_checkpoint_thread` each time its 10-second
/// `INDEX_METRICS_INTERVAL` elapses (more often than the 30-second checkpoint
/// cycle). Reads current stats from the Tantivy text index, `USearch`
/// embedding registry, and bitmap indexes, then stores them into the
/// corresponding `MetricsState` atomic gauges.
///
/// Gauges for a source that is unavailable are left at their previous value:
/// the Tantivy gauges when `text_index` is `None`, and the `USearch` gauges
/// when acquiring the registry read guard returns `Err`.
///
/// All stores use `Relaxed` ordering because these are monitoring gauges --
/// a slightly stale value is acceptable, and no other thread depends on the
/// freshness of any individual gauge.
#[cfg(feature = "metrics")]
fn refresh_index_metrics(handles: &IndexMetricsHandles, metrics: &MetricsState) {
    // Tantivy text index.
    if let Some(ref text) = handles.text_index {
        let (segments, docs) = text.index_stats();
        metrics
            .tantivy_segment_count
            .store(segments as u64, Ordering::Relaxed);
        metrics.tantivy_indexed_docs.store(docs, Ordering::Relaxed);
    }

    // USearch embedding registry.
    // An `Err` from `.read()` (presumably a poisoned lock — confirm the lock
    // type) is ignored and the gauges keep their last values.
    if let Ok(registry) = handles.embedding_registry.read() {
        let (vectors, bytes) = registry.index_stats();
        metrics
            .usearch_vector_count
            .store(vectors, Ordering::Relaxed);
        metrics
            .usearch_index_size_bytes
            .store(bytes, Ordering::Relaxed);
    }

    // Bitmap indexes: sum cardinality across all four index types.
    let cardinality = handles.bitmap_category.total_cardinality()
        + handles.bitmap_format.total_cardinality()
        + handles.bitmap_creator.total_cardinality()
        + handles.bitmap_tag.total_cardinality();
    metrics
        .bitmap_index_cardinality
        .store(cardinality, Ordering::Relaxed);
}

/// Sum the file sizes of all remaining WAL segment files in the directory.
///
/// Returns 0 if the directory cannot be read or contains no segments.
/// Errors on individual file metadata reads are treated as 0 bytes
/// (non-fatal: this is a best-effort monitoring metric).
#[cfg(feature = "metrics")]
fn compute_wal_lag_bytes(wal_dir: &std::path::Path) -> u64 {
    // Listing failure means "no lag information"; report 0 rather than erroring.
    let Ok(segments) = crate::wal::segment::list_segments(wal_dir) else {
        return 0;
    };
    segments
        .iter()
        .map(|(_, path)| std::fs::metadata(path).map(|m| m.len()).unwrap_or(0))
        .sum()
}