tidaldb/docs/planning/milestone-7/phase-3/task-04-signal-state-memory-analysis.md
2026-02-23 22:41:16 -07:00

Task 04: Signal State Memory Analysis + Trimming

Delivers

Precise measurement of per-entity signal state memory footprint at scale. Analysis of total memory consumption for 1M items x 10 signal types x 5 windows. If the footprint exceeds 10 GB, an LRU trimming mechanism that evicts cold entries from the DashMap.

Complexity

L

Dependencies

  • task-01 complete (1M-item TidalDb populated with signal data)
  • docs/research/tidaldb_signal_ledger.md (per-entity memory model)

Technical Design

1. Per-entry memory audit

The EntitySignalEntry contains:

pub struct EntitySignalEntry {
    pub hot: HotSignalState,   // 64 bytes (cache-line aligned)
    pub warm: BucketedCounter, // measured below
}

HotSignalState: Exactly 64 bytes (compile-time assert). Contains 3 AtomicU64 decay scores, 1 AtomicU64 timestamp, entity_id, signal_type_id, flags, padding.

BucketedCounter: Contains:

  • minute_buckets: [AtomicU32; 60] = 240 bytes
  • hour_buckets: [AtomicU32; 168] = 672 bytes
  • current_minute: AtomicU8 = 1 byte (+ alignment padding)
  • current_hour: AtomicU8 = 1 byte (+ alignment padding)
  • all_time_count: AtomicU64 = 8 bytes
  • last_minute_rotation_ns: AtomicU64 = 8 bytes
  • last_hour_rotation_ns: AtomicU64 = 8 bytes
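  • Alignment padding to meet AtomicU64 alignment = ~6 bytes if the two AtomicU8 fields are packed together (layout-dependent)

Total BucketedCounter: ~944 bytes (938 bytes of fields, rounded up to a multiple of 8; verify with std::mem::size_of).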

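Total EntitySignalEntry: ~1,008 bytes per entry (64 + 944). If HotSignalState is declared with a 64-byte alignment attribute, the enclosing struct inherits that alignment and its size rounds up to the next multiple of 64, i.e. 1,024 bytes. The size_of audit in step 4 settles which figure applies.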
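The arithmetic above can be checked with stand-in types. The sketch below is a minimal illustration, not the real TidalDB definitions: `HotSketch`, `WarmSketch`, and `EntrySketch` are hypothetical mirrors of the fields listed in this section, `#[repr(C)]` is used only to make the layout deterministic, and the 64-byte alignment attribute on the hot state is an assumption. Sizes assume a 64-bit target where AtomicU64 is 8-byte aligned.

```rust
use std::mem::size_of;
use std::sync::atomic::{AtomicU32, AtomicU64, AtomicU8};

/// Hypothetical stand-in for HotSignalState: 44 bytes of fields,
/// rounded up to one 64-byte cache line by the alignment attribute.
#[allow(dead_code)]
#[repr(C, align(64))]
pub struct HotSketch {
    decay_scores: [AtomicU64; 3],
    last_update_ns: AtomicU64,
    entity_id: u64,
    signal_type_id: u16,
    flags: u16,
}

/// Hypothetical stand-in for BucketedCounter, fields ordered by
/// descending alignment so that only tail padding remains.
#[allow(dead_code)]
#[repr(C)]
pub struct WarmSketch {
    all_time_count: AtomicU64,
    last_minute_rotation_ns: AtomicU64,
    last_hour_rotation_ns: AtomicU64,
    minute_buckets: [AtomicU32; 60],
    hour_buckets: [AtomicU32; 168],
    current_minute: AtomicU8,
    current_hour: AtomicU8,
}

/// Hypothetical stand-in for EntitySignalEntry.
#[allow(dead_code)]
#[repr(C)]
pub struct EntrySketch {
    hot: HotSketch,
    warm: WarmSketch,
}

/// Returns the (hot, warm, entry) sizes in bytes.
pub fn sketch_sizes() -> (usize, usize, usize) {
    (
        size_of::<HotSketch>(),
        size_of::<WarmSketch>(),
        size_of::<EntrySketch>(),
    )
}
```

Under these assumptions the three sizes are 64, 944, and 1,024 bytes: the hot state's 64-byte alignment propagates to the enclosing struct, so 64 + 944 = 1,008 is rounded up to 1,024.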

2. DashMap overhead

DashMap stores entries in sharded hash maps. Per entry, the overhead includes:

  • Hash table slot: ~48 bytes (key + value pointer + hash + metadata)
  • Key: (EntityId, SignalTypeId) = 10 bytes (u64 + u16), padded to 16
  • Allocation overhead: ~16 bytes (allocator metadata)

Estimated per-entry DashMap overhead: ~80 bytes.

Total per-entry cost: ~1,088 bytes.
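As a quick check of the key padding and the sum above, the following sketch models the key as a plain `(u64, u16)` tuple. `KeySketch` and the three constants are hypothetical names that restate this section's planning estimates; they are not measured values and not part of DashMap's API. A 64-bit target is assumed.

```rust
use std::mem::size_of;

/// Hypothetical stand-in for the (EntityId, SignalTypeId) key, assuming
/// EntityId wraps a u64 and SignalTypeId wraps a u16.
pub type KeySketch = (u64, u16);

/// Planning estimates from the list above, in bytes (not measured).
pub const SLOT_BYTES: usize = 48;
pub const PADDED_KEY_BYTES: usize = 16;
pub const ALLOCATOR_BYTES: usize = 16;

/// Size of the key tuple after the compiler pads it to the u64 alignment.
pub fn key_size() -> usize {
    size_of::<KeySketch>()
}

/// Adds the estimated DashMap overhead to a given value size.
pub fn per_entry_cost(value_bytes: usize) -> usize {
    value_bytes + SLOT_BYTES + PADDED_KEY_BYTES + ALLOCATOR_BYTES
}
```

With a 1,008-byte value, `per_entry_cost` returns 1,088, matching the total above. The 10 bytes of key payload occupy 16 bytes because the tuple's size must be a multiple of its 8-byte alignment.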

3. Scale projection

| Scale | Entries | Memory (at ~1,088 bytes/entry) |
|---|---|---|
| 1M items x 1 signal type | 1M | ~1.09 GB |
| 1M items x 5 signal types | 5M | ~5.4 GB |
| 1M items x 10 signal types | 10M | ~10.9 GB |
| 10M items x 10 signal types | 100M | ~109 GB |

At 1M items x 10 signal types the projection is ~10.9 GB (~10.1 GiB), just over the 10 GB threshold in either unit. This is the trigger for LRU trimming.
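The projection is a plain multiplication, but the unit matters this close to the threshold. The helper names below (`projected_bytes`, `to_gb`, `to_gib`) are hypothetical and only illustrate the arithmetic for the ~1,088 bytes/entry estimate.

```rust
/// Bytes needed for `items * signal_types` entries at `bytes_per_entry` each.
pub fn projected_bytes(items: u64, signal_types: u64, bytes_per_entry: u64) -> u64 {
    items * signal_types * bytes_per_entry
}

/// Converts bytes to decimal gigabytes (10^9 bytes).
pub fn to_gb(bytes: u64) -> f64 {
    bytes as f64 / 1e9
}

/// Converts bytes to binary gibibytes (2^30 bytes).
pub fn to_gib(bytes: u64) -> f64 {
    bytes as f64 / (1u64 << 30) as f64
}
```

For 1M items x 10 signal types, `projected_bytes(1_000_000, 10, 1_088)` is 10,880,000,000 bytes, which is 10.88 GB or about 10.13 GiB.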

4. Measurement implementation

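Write a test that reports the sizes the compiler actually produces instead of relying on the theoretical figures above. The RSS measurement is platform-specific, so the test below falls back to a size_of-based projection: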

#[test]
#[ignore]
fn signal_state_memory_audit() {
    use std::mem;

    println!("=== Signal State Memory Audit ===");
    println!("HotSignalState:    {} bytes", mem::size_of::<HotSignalState>());
    println!("BucketedCounter:   {} bytes", mem::size_of::<BucketedCounter>());
    println!("EntitySignalEntry: {} bytes", mem::size_of::<EntitySignalEntry>());
    println!("DashMap key (EntityId, SignalTypeId): {} bytes",
        mem::size_of::<(EntityId, SignalTypeId)>());

    let entry_size = mem::size_of::<EntitySignalEntry>();
    // The ~80-byte estimate from section 2 already includes the padded key,
    // so the key size is printed above but not added a second time here.
    let overhead_estimate = 80; // DashMap per-entry overhead (hash slot + key + allocator)
    let total_per_entry = entry_size + overhead_estimate;

    println!("Estimated total per entry: {} bytes", total_per_entry);

    // Project at scale
    for &(items, signals) in &[
        (100_000u64, 5u64),
        (1_000_000, 5),
        (1_000_000, 10),
        (10_000_000, 10),
    ] {
        let total = items * signals * total_per_entry as u64;
        let gb = total as f64 / (1024.0 * 1024.0 * 1024.0);
        println!("{items} items x {signals} signals = {total} bytes ({gb:.2} GB)");
    }

    // Populate a ledger with 100K items x 10 signal types (1M DashMap entries)
    let ledger = build_measurement_ledger(10); // 10 signal types
    let ts = Timestamp::now();
    for i in 0..100_000u64 {
        for sig in &["view", "like", "share", "skip", "completion",
                     "follow", "save", "comment", "repost", "block"] {
            ledger.record_signal(sig, EntityId::new(i), 1.0, ts).unwrap();
        }
    }

    let entry_count = ledger.entries().len();
    let measured_per_entry = {
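        // TODO: replace this with a process RSS delta taken before and after population
        // (platform-specific; on macOS use mach_task_basic_info). Until then the value
        // returned below is the calculated per-entry size, not a measurement.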
        let approx_bytes = entry_count as u64 * total_per_entry as u64;
        println!("Actual DashMap entries: {entry_count}");
        println!("Projected memory at 100K items: {approx_bytes} bytes ({:.2} MB)",
            approx_bytes as f64 / (1024.0 * 1024.0));
        total_per_entry
    };

    // Threshold check
    let threshold_gb = 10.0;
    let projected_1m_10sig = 1_000_000u64 * 10 * measured_per_entry as u64;
    let projected_gb = projected_1m_10sig as f64 / (1024.0 * 1024.0 * 1024.0);
    println!("\nProjected 1M x 10 signals: {projected_gb:.2} GB (threshold: {threshold_gb} GB)");

    if projected_gb > threshold_gb {
        println!("WARNING: Exceeds {threshold_gb} GB threshold -- LRU trimming required");
    } else {
        println!("OK: Within {threshold_gb} GB threshold -- LRU trimming not required");
    }
}
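The RSS delta mentioned in the test could be obtained on Linux by reading `/proc/self/status`. The sketch below is one possible approach under that assumption; the function names are hypothetical, it is not wired into the ledger, and on platforms without procfs `current_rss_bytes` simply returns `None`.

```rust
use std::fs;

/// Extracts the resident set size in bytes from the text of /proc/<pid>/status.
/// Returns None if no well-formed `VmRSS:` line is present.
pub fn parse_vm_rss_bytes(status: &str) -> Option<u64> {
    let line = status.lines().find(|l| l.starts_with("VmRSS:"))?;
    let mut parts = line["VmRSS:".len()..].split_whitespace();
    let kib: u64 = parts.next()?.parse().ok()?;
    // The kernel reports this field in kB, meaning 1024-byte units.
    Some(kib * 1024)
}

/// Current process RSS in bytes; None where /proc/self/status is unavailable.
pub fn current_rss_bytes() -> Option<u64> {
    let status = fs::read_to_string("/proc/self/status").ok()?;
    parse_vm_rss_bytes(&status)
}

/// Per-entry cost derived from RSS readings taken before and after population.
pub fn measured_bytes_per_entry(before: u64, after: u64, entries: u64) -> Option<u64> {
    if entries == 0 {
        return None;
    }
    Some(after.saturating_sub(before) / entries)
}
```

A test would call `current_rss_bytes()` once before building the ledger and once after, then pass both readings and `entry_count` to `measured_bytes_per_entry`. RSS includes allocator slack and unrelated allocations, so the result is an upper-bound approximation rather than an exact per-entry figure.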

5. LRU trimming (conditional implementation)

If the memory threshold is exceeded, implement a background trimmer that evicts cold entries:

// tidal/src/signals/trimmer.rs

use std::sync::atomic::{AtomicU64, Ordering};
use dashmap::DashMap;
use crate::schema::EntityId;
use super::SignalTypeId;
use super::ledger::types::EntitySignalEntry;

/// Maximum number of entries to retain in the signal ledger DashMap.
/// Beyond this, the trimmer evicts the coldest entries (oldest last_update_ns).
pub const DEFAULT_MAX_ENTRIES: usize = 5_000_000; // ~5 GB at ~1KB/entry

/// Trims the signal ledger DashMap by evicting entries with the oldest
/// `last_update_ns` timestamps until the entry count is at or below `max_entries`.
///
/// This is O(N log N) when eviction fires (sort by timestamp, remove bottom M).
/// Called periodically by the checkpoint background thread, not on every write.
pub fn trim_cold_entries(
    entries: &DashMap<(EntityId, SignalTypeId), EntitySignalEntry>,
    max_entries: usize,
) -> usize {
    let len = entries.len();
    if len <= max_entries {
        return 0;
    }

    let to_evict = len - max_entries;

    // Collect (key, last_update_ns) for all entries
    let mut timestamps: Vec<((EntityId, SignalTypeId), u64)> = entries
        .iter()
        .map(|entry| {
            let key = *entry.key();
            let ts = entry.value().hot.last_update_ns();
            (key, ts)
        })
        .collect();

    // Sort by timestamp ascending (oldest first)
    timestamps.sort_unstable_by_key(|&(_, ts)| ts);

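    // Evict the oldest entries. Count only keys that were still present, since a
    // concurrent caller may have removed an entry after the snapshot was taken.
    let mut evicted = 0;
    for (key, _) in timestamps.into_iter().take(to_evict) {
        if entries.remove(&key).is_some() {
            evicted += 1;
        }
    }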

    tracing::info!(
        evicted,
        remaining = entries.len(),
        "signal ledger LRU trim completed"
    );

    evicted
}
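If the full sort becomes a bottleneck at tens of millions of entries, a selection step is a possible alternative: only the set of coldest keys is needed, not their order. The sketch below is a variation on the function above rather than the planned implementation. It works on an already collected snapshot, `coldest_keys` is a hypothetical helper, and it uses the standard library's `select_nth_unstable_by_key`, which partitions in O(N) on average.

```rust
/// Returns the keys of the `to_evict` entries with the smallest timestamps.
/// The returned keys are in no particular order.
pub fn coldest_keys<K: Copy>(mut snapshot: Vec<(K, u64)>, to_evict: usize) -> Vec<K> {
    if to_evict == 0 {
        return Vec::new();
    }
    if to_evict < snapshot.len() {
        // Partition so that indices 0..to_evict hold the smallest timestamps.
        // Elements before the pivot index compare less than or equal to it.
        snapshot.select_nth_unstable_by_key(to_evict - 1, |&(_, ts)| ts);
        snapshot.truncate(to_evict);
    }
    // If to_evict >= len, every entry in the snapshot is a candidate.
    snapshot.into_iter().map(|(key, _)| key).collect()
}
```

The caller would then remove each returned key from the map exactly as in the loop above. Ties at the cutoff timestamp are broken arbitrarily, which matches the behavior of the unstable sort.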

6. Integration with checkpoint cycle

If trimming is needed, call it from the existing periodic checkpoint thread:

// In the checkpoint/background loop. trim_cold_entries returns 0 without logging
// when the map is already within bounds and logs its own summary otherwise,
// so no outer length check or second log line is needed.
trim_cold_entries(ledger.entries(), DEFAULT_MAX_ENTRIES);

7. BucketedCounter size reduction (optional optimization)

If the memory budget is tight, consider reducing HOUR_BUCKETS from 168 (7 days) to 24 (1 day) for signal types that only use 24h or shorter windows. This saves 576 bytes per entry (144 fewer AtomicU32 slots):

// Compact variant for signals with only 1h and 24h windows
pub struct CompactBucketedCounter {
    minute_buckets: [AtomicU32; 60],  // 240 bytes
    hour_buckets: [AtomicU32; 24],    // 96 bytes
    // ... same fields, total ~368 bytes vs ~944 bytes
}

This is a deeper refactor and should only be pursued if trimming alone is insufficient.
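The 576-byte saving can be sanity-checked with a stand-in type parameterized by the number of hour buckets. `CounterSketch` is a hypothetical mirror of the fields listed in section 1, with `#[repr(C)]` used only to pin the layout; it is not the real BucketedCounter, and a 64-bit target is assumed.

```rust
use std::mem::size_of;
use std::sync::atomic::{AtomicU32, AtomicU64, AtomicU8};

/// Hypothetical counter layout with a configurable hour-bucket count.
#[allow(dead_code)]
#[repr(C)]
pub struct CounterSketch<const HOURS: usize> {
    all_time_count: AtomicU64,
    last_minute_rotation_ns: AtomicU64,
    last_hour_rotation_ns: AtomicU64,
    minute_buckets: [AtomicU32; 60],
    hour_buckets: [AtomicU32; HOURS],
    current_minute: AtomicU8,
    current_hour: AtomicU8,
}

/// Size in bytes of a counter with `HOURS` hour buckets.
pub fn counter_size<const HOURS: usize>() -> usize {
    size_of::<CounterSketch<HOURS>>()
}

/// Bytes saved per entry by going from 168 hour buckets to 24.
pub fn compaction_savings() -> usize {
    counter_size::<168>() - counter_size::<24>()
}
```

Under these assumptions the 168-bucket layout is 944 bytes and the 24-bucket layout is 368 bytes, a difference of 576 bytes (144 slots x 4 bytes).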

Acceptance Criteria

  • std::mem::size_of for HotSignalState, BucketedCounter, EntitySignalEntry documented
  • Memory projection table for 100K, 1M, 10M items x 5 and 10 signal types
  • Actual DashMap population at 100K items (1M entries with 10 signal types) measured (RSS or calculated)
  • If projected 1M x 10 signals > 10 GB: trim_cold_entries implemented and tested
  • If projected 1M x 10 signals <= 10 GB: document the finding with margin analysis
  • Trimming correctness: after trim, entries.len() <= max_entries
  • Trimming fairness: evicted entries have the oldest last_update_ns timestamps
  • Results documented in docs/profiling/signal-memory-analysis.md

Test Strategy

  1. Size assertion test:
#[test]
fn entity_signal_entry_size_documented() {
    assert_eq!(std::mem::size_of::<HotSignalState>(), 64);
    // Document actual BucketedCounter size (do not hard-code until measured)
    let bc_size = std::mem::size_of::<BucketedCounter>();
    assert!(bc_size < 1200, "BucketedCounter size {bc_size} unexpectedly large");
    println!("BucketedCounter: {bc_size} bytes");
    println!("EntitySignalEntry: {} bytes", std::mem::size_of::<EntitySignalEntry>());
}
  2. Trimmer correctness (if implemented):
#[test]
fn trim_cold_entries_evicts_oldest() {
    let entries = DashMap::new();
    // Insert 100 entries with ascending timestamps
    for i in 0..100u64 {
        let key = (EntityId::new(i), SignalTypeId::new(0));
        let entry = EntitySignalEntry { /* ... */ };
        entry.hot.restore(i * 1_000_000_000, &[0.0]); // timestamp = i seconds
        entries.insert(key, entry);
    }

    let evicted = trim_cold_entries(&entries, 80);
    assert_eq!(evicted, 20);
    assert_eq!(entries.len(), 80);

    // Verify the 20 oldest (timestamps 0-19) were evicted
    for i in 0..20u64 {
        assert!(entries.get(&(EntityId::new(i), SignalTypeId::new(0))).is_none(),
            "Entry {i} should have been evicted");
    }
    // Verify the 80 newest (timestamps 20-99) survive
    for i in 20..100u64 {
        assert!(entries.get(&(EntityId::new(i), SignalTypeId::new(0))).is_some(),
            "Entry {i} should have survived");
    }
}
  3. Property test (if implemented):
proptest! {
    #[test]
    fn trim_never_exceeds_max(
        entry_count in 100usize..10_000,
        max_entries in 50usize..5_000,
    ) {
        let entries = DashMap::new();
        for i in 0..entry_count as u64 {
            let key = (EntityId::new(i), SignalTypeId::new(0));
            let entry = make_test_entry(i);
            entries.insert(key, entry);
        }
        trim_cold_entries(&entries, max_entries);
        prop_assert!(entries.len() <= max_entries);
    }
}