tidaldb/docs/planning/milestone-7/phase-4/task-02-signal-wal-metrics.md
2026-02-23 22:41:16 -07:00

Task 02: Signal System + WAL Metrics

Delivers

Atomic counters and gauges for WAL health (lag bytes, compacted segments), checkpoint freshness (age in seconds), signal ledger size (hot entry count), signal write throughput (total writes), and signal write latency (histogram with fixed buckets). All metrics are rendered in the Prometheus text exposition format via MetricsState::render_prometheus().

Complexity: M

Dependencies

  • task-01 complete (QueryStats struct establishes the instrumentation pattern)
  • tidal/src/db/metrics.rs -- existing MetricsState with uptime and health gauges
  • tidal/src/wal/mod.rs -- WalHandle, segment management
  • tidal/src/signals/checkpoint/meta.rs -- CheckpointMeta with checkpoint_time_ns
  • tidal/src/signals/mod.rs -- SignalLedger with entries() DashMap

Technical Design

1. Add atomic counters to MetricsState

In tidal/src/db/metrics.rs, extend MetricsState:

use std::sync::atomic::{AtomicU64, Ordering};

pub struct MetricsState {
    // ... existing fields ...

    // ── Signal + WAL metrics (m7p4) ────────────────────────────────────
    /// Total bytes of WAL segments not yet compacted.
    #[cfg(feature = "metrics")]
    pub(crate) wal_lag_bytes: AtomicU64,
    /// Cumulative count of WAL segments compacted since open.
    #[cfg(feature = "metrics")]
    pub(crate) wal_compacted_segments_total: AtomicU64,
    /// Nanosecond timestamp of the most recent checkpoint. 0 if none.
    #[cfg(feature = "metrics")]
    pub(crate) last_checkpoint_ns: AtomicU64,
    /// Number of entries in the signal ledger hot tier.
    #[cfg(feature = "metrics")]
    pub(crate) signal_hot_entries: AtomicU64,
    /// Total signal writes since open.
    #[cfg(feature = "metrics")]
    pub(crate) signal_writes_total: AtomicU64,
    /// Histogram for signal write latency in microseconds.
    /// Fixed buckets: [1, 5, 10, 25, 50, 100, 250, 500, 1000, 5000, 10000].
    #[cfg(feature = "metrics")]
    pub(crate) signal_write_latency: LatencyHistogram,
}

2. LatencyHistogram

A minimal fixed-bucket histogram that satisfies Prometheus conventions without pulling in a library:

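/// Fixed-bucket histogram for Prometheus exposition.
///
/// Buckets are cumulative: each bucket counts observations <= its upper bound.
/// Thread-safe via per-bucket `AtomicU64`. No locks, no allocations on observe.
///
/// Gated like the fields that use it: with the feature off, an ungated struct
/// and const would trigger dead-code warnings under `clippy -D warnings`.
#[cfg(feature = "metrics")]
pub struct LatencyHistogram {
    /// Upper bounds of each bucket, in microseconds.
    bounds: &'static [u64],
    /// Cumulative count for each bucket (index matches `bounds`).
    buckets: Vec<AtomicU64>,
    /// Count of all observations, including those above the largest bound.
    count: AtomicU64,
    /// Sum of all observed values (microseconds).
    sum: AtomicU64,
}

/// Fixed bucket boundaries for signal write latency.
#[cfg(feature = "metrics")]
const WRITE_LATENCY_BOUNDS: &[u64] = &[1, 5, 10, 25, 50, 100, 250, 500, 1000, 5000, 10000];

#[cfg(feature = "metrics")]
impl LatencyHistogram {
    pub fn new(bounds: &'static [u64]) -> Self {
        let buckets = bounds.iter().map(|_| AtomicU64::new(0)).collect();
        Self {
            bounds,
            buckets,
            count: AtomicU64::new(0),
            sum: AtomicU64::new(0),
        }
    }

    /// Record an observation. O(buckets) -- 11 iterations for write latency.
    pub fn observe(&self, value_us: u64) {
        for (i, &bound) in self.bounds.iter().enumerate() {
            if value_us <= bound {
                // Relaxed: histogram accuracy does not require ordering
                // guarantees with other fields. Prometheus scrapes are
                // inherently approximate.
                self.buckets[i].fetch_add(1, Ordering::Relaxed);
            }
        }
        self.count.fetch_add(1, Ordering::Relaxed);
        self.sum.fetch_add(value_us, Ordering::Relaxed);
    }

    /// Render Prometheus histogram lines for the given metric name:
    /// HELP, TYPE histogram, one _bucket line per bound, the mandatory
    /// le="+Inf" bucket (equal to the total count), then _sum and _count.
    pub fn render_prometheus(&self, name: &str, help: &str) -> String {
        use std::fmt::Write;

        let mut out = String::new();
        // Writing into a String cannot fail, so the fmt::Result is ignored.
        let _ = writeln!(out, "# HELP {name} {help}");
        let _ = writeln!(out, "# TYPE {name} histogram");
        // A scrape can race an observe(), so take a running max to keep the
        // rendered buckets monotonic, as Prometheus requires.
        let mut cumulative = 0u64;
        for (bound, bucket) in self.bounds.iter().zip(&self.buckets) {
            cumulative = cumulative.max(bucket.load(Ordering::Relaxed));
            let _ = writeln!(out, "{name}_bucket{{le=\"{bound}\"}} {cumulative}");
        }
        let count = cumulative.max(self.count.load(Ordering::Relaxed));
        let _ = writeln!(out, "{name}_bucket{{le=\"+Inf\"}} {count}");
        let _ = writeln!(out, "{name}_sum {}", self.sum.load(Ordering::Relaxed));
        let _ = writeln!(out, "{name}_count {count}");
        out
    }
}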

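Note: the histogram buckets are cumulative per Prometheus convention. Each observe() increments every bucket whose bound is >= the observed value. An observation above the largest bound (10000 microseconds) increments no finite bucket, only count and sum. buckets[last] therefore equals count only if no write took longer than 10 ms. The rendered output must include a le="+Inf" bucket whose value is count, which the Prometheus text format requires. The observe() loop is only 11 iterations, so its overhead is negligible.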
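The following standalone illustration of that note uses a stripped-down histogram. The name `Hist` is hypothetical; it keeps the same bounds as WRITE_LATENCY_BOUNDS and drops the sum field. A 20 ms observation moves only the count, so the last finite bucket and the +Inf value diverge.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Same bounds as WRITE_LATENCY_BOUNDS above, in microseconds.
const BOUNDS: &[u64] = &[1, 5, 10, 25, 50, 100, 250, 500, 1000, 5000, 10000];

/// Hypothetical minimal cumulative histogram (no sum, no rendering).
pub struct Hist {
    buckets: Vec<AtomicU64>,
    count: AtomicU64,
}

impl Hist {
    pub fn new() -> Self {
        Self {
            buckets: BOUNDS.iter().map(|_| AtomicU64::new(0)).collect(),
            count: AtomicU64::new(0),
        }
    }

    /// Cumulative observe: bump every bucket whose bound is >= the value.
    pub fn observe(&self, value_us: u64) {
        for (bucket, &bound) in self.buckets.iter().zip(BOUNDS) {
            if value_us <= bound {
                bucket.fetch_add(1, Ordering::Relaxed);
            }
        }
        self.count.fetch_add(1, Ordering::Relaxed);
    }

    /// Value of the le="10000" bucket.
    pub fn last_finite_bucket(&self) -> u64 {
        self.buckets[BOUNDS.len() - 1].load(Ordering::Relaxed)
    }

    /// The le="+Inf" bucket is, by definition, the total count.
    pub fn inf_bucket(&self) -> u64 {
        self.count.load(Ordering::Relaxed)
    }
}
```

After `observe(5)` and `observe(20_000)`, the le="10000" bucket holds 1 while the +Inf bucket holds 2. This is why the renderer has to emit +Inf from count rather than reuse the last finite bucket.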

3. Instrument signal write path

In tidal/src/db/signals.rs (the TidalDb::signal() method):

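// At the top of signals.rs. Ordering is only used under the feature; an
// ungated import would be an unused-import error under clippy -D warnings.
#[cfg(feature = "metrics")]
use std::sync::atomic::Ordering;

// Inside impl TidalDb:
pub fn signal(&self, signal_type: &str, entity_id: EntityId, weight: f64, ts: Timestamp) -> crate::Result<()> {
    #[cfg(feature = "metrics")]
    let write_start = std::time::Instant::now();

    // ... existing write logic ...
    // An early return via `?` skips the block below, so only successful
    // writes are counted and timed.

    #[cfg(feature = "metrics")]
    {
        // as_micros() returns u128; truncating to u64 is safe for any
        // realistic write duration.
        let elapsed_us = write_start.elapsed().as_micros() as u64;
        self.metrics.signal_writes_total.fetch_add(1, Ordering::Relaxed);
        self.metrics.signal_write_latency.observe(elapsed_us);
    }

    Ok(())
}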
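The write path above records metrics only when execution reaches the end of signal(). If failed writes should also be counted, one option is a drop guard that records on scope exit.

This is a minimal sketch under three assumptions, all hypothetical:

- `WriteMetrics` stands in for the relevant MetricsState fields.
- `write_signal()` is a toy replacement for TidalDb::signal().
- Latency is accumulated as a plain sum only to keep the example self-contained.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

/// Hypothetical stand-in for the MetricsState fields touched on the write path.
#[derive(Default)]
pub struct WriteMetrics {
    pub writes_total: AtomicU64,
    pub latency_sum_us: AtomicU64,
}

/// Hypothetical RAII timer: records one write when dropped, so early
/// returns via `?` or `return Err(..)` are still counted and timed.
pub struct WriteTimer<'a> {
    metrics: &'a WriteMetrics,
    start: Instant,
}

impl<'a> WriteTimer<'a> {
    pub fn start(metrics: &'a WriteMetrics) -> Self {
        Self { metrics, start: Instant::now() }
    }
}

impl Drop for WriteTimer<'_> {
    fn drop(&mut self) {
        let elapsed_us = self.start.elapsed().as_micros() as u64;
        self.metrics.writes_total.fetch_add(1, Ordering::Relaxed);
        // In TidalDb this would be signal_write_latency.observe(elapsed_us).
        self.metrics.latency_sum_us.fetch_add(elapsed_us, Ordering::Relaxed);
    }
}

/// Toy write path that can fail after the timer has started.
pub fn write_signal(metrics: &WriteMetrics, weight: f64) -> Result<(), String> {
    // Named binding: `let _ = ...` would drop the guard immediately.
    let _timer = WriteTimer::start(metrics);
    if !weight.is_finite() {
        return Err("weight must be finite".into()); // still recorded on drop
    }
    Ok(())
}
```

A design note on the binding: always bind the guard to a named variable such as `_timer`. `let _ = WriteTimer::start(...)` drops the guard at once and records a near-zero latency.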

4. Instrument WAL compaction

After WAL compaction in the checkpoint thread (m7p1), update:

#[cfg(feature = "metrics")]
{
    self.metrics.wal_compacted_segments_total.fetch_add(compacted_count, Ordering::Relaxed);
}
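No step above shows where wal_lag_bytes gets its value, although the acceptance criteria expect it to be refreshed after a WAL segment scan. The following sketches such a scan under a hypothetical layout: uncompacted segments are `.wal` files in a single directory, and compaction deletes them. The real WalHandle may already track segment sizes, which would make a directory scan unnecessary.

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Sum the sizes of WAL segment files that have not been compacted yet.
///
/// Hypothetical layout: one directory, segment files end in `.wal`, and
/// compaction removes them. Other files in the directory are ignored.
pub fn wal_lag_bytes(wal_dir: &Path) -> io::Result<u64> {
    let mut total = 0u64;
    for entry in fs::read_dir(wal_dir)? {
        let entry = entry?;
        let path = entry.path();
        if path.extension().is_some_and(|ext| ext == "wal") {
            total += entry.metadata()?.len();
        }
    }
    Ok(total)
}
```

The caller would store the result right after compaction finishes, for example `self.metrics.wal_lag_bytes.store(bytes, Ordering::Relaxed)`.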

5. Update checkpoint age on checkpoint write

In the periodic checkpoint callback, after the checkpoint has been written successfully (a failed checkpoint must not reset the age):

#[cfg(feature = "metrics")]
{
    let now_ns = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos() as u64;
    self.metrics.last_checkpoint_ns.store(now_ns, Ordering::Relaxed);
}
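The exported gauge reports an age, not the raw timestamp, so the age has to be derived from last_checkpoint_ns at scrape time. The following sketches that derivation as pure functions with the hypothetical names `now_ns` and `checkpoint_age_seconds`. Keeping them pure makes two edge cases unit-testable: no checkpoint yet, and a wall clock that stepped backwards.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Current wall-clock time in nanoseconds since the Unix epoch
/// (same conversion as the checkpoint callback above).
pub fn now_ns() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos() as u64
}

/// Checkpoint age in seconds for the gauge.
///
/// `last_cp_ns == 0` means no checkpoint has completed and yields 0.0.
/// A clock that stepped backwards also yields 0.0 instead of underflowing
/// (plain `-` on u64 would panic in debug builds).
pub fn checkpoint_age_seconds(last_cp_ns: u64, now_ns: u64) -> f64 {
    if last_cp_ns == 0 {
        return 0.0;
    }
    now_ns.saturating_sub(last_cp_ns) as f64 / 1_000_000_000.0
}
```

Reporting 0.0 before the first checkpoint has a side effect: a freshly opened database looks fully up to date, so a staleness alert on this gauge cannot fire until one checkpoint has completed.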

6. Periodically update ledger entry count

In the checkpoint thread or a dedicated metrics-refresh callback:

#[cfg(feature = "metrics")]
if let Some(ledger) = &self.ledger {
    self.metrics.signal_hot_entries.store(
        ledger.entries().len() as u64,
        Ordering::Relaxed,
    );
}
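The refresh cadence is left open ("checkpoint thread or a dedicated metrics-refresh callback"). If a dedicated thread is chosen, the following is a minimal std-only sketch. A hypothetical `spawn_refresher()` runs a closure on a fixed interval. It stops when signalled through a channel or when the sender is dropped.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical metrics-refresh loop: calls `refresh` immediately and then
/// once per `interval` until a message arrives or the sender is dropped.
pub fn spawn_refresher<F>(interval: Duration, mut refresh: F) -> (mpsc::Sender<()>, JoinHandle<()>)
where
    F: FnMut() + Send + 'static,
{
    let (stop_tx, stop_rx) = mpsc::channel::<()>();
    let handle = thread::spawn(move || loop {
        refresh();
        // recv_timeout doubles as the sleep, so shutdown is prompt.
        match stop_rx.recv_timeout(interval) {
            Err(RecvTimeoutError::Timeout) => continue,
            Ok(()) | Err(RecvTimeoutError::Disconnected) => break,
        }
    });
    (stop_tx, handle)
}
```

For the hot-entry gauge, the closure would capture `Arc` handles to the metrics and the ledger and perform the store shown above. Using `recv_timeout` as the wait means shutdown does not have to sit out a full interval.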

7. Render all new metrics in Prometheus format

Extend MetricsState::render_prometheus():

// WAL lag bytes
write_gauge(&mut out, "tidaldb_wal_lag_bytes",
    "Bytes of WAL segments not yet compacted",
    self.wal_lag_bytes.load(Ordering::Relaxed) as f64);

// WAL compacted segments total
write_counter(&mut out, "tidaldb_wal_compacted_segments_total",
    "Total WAL segments compacted since open",
    self.wal_compacted_segments_total.load(Ordering::Relaxed) as f64);

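// Checkpoint age (0 until the first checkpoint completes)
let last_cp_ns = self.last_checkpoint_ns.load(Ordering::Relaxed);
let now_ns = std::time::SystemTime::now()
    .duration_since(std::time::UNIX_EPOCH)
    .unwrap_or_default()
    .as_nanos() as u64;
// saturating_sub: a wall clock that steps backwards must not underflow
// (plain subtraction panics on u64 underflow in debug builds).
let checkpoint_age_ns = if last_cp_ns > 0 { now_ns.saturating_sub(last_cp_ns) } else { 0 };
write_gauge(&mut out, "tidaldb_checkpoint_age_seconds",
    "Seconds since the last successful checkpoint",
    checkpoint_age_ns as f64 / 1_000_000_000.0);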

// Signal hot entries
write_gauge(&mut out, "tidaldb_signal_hot_entries",
    "Number of entries in the signal ledger hot tier",
    self.signal_hot_entries.load(Ordering::Relaxed) as f64);

// Signal writes total
write_counter(&mut out, "tidaldb_signal_writes_total",
    "Total signal writes since database open",
    self.signal_writes_total.load(Ordering::Relaxed) as f64);

// Signal write latency histogram
out.push_str(&self.signal_write_latency.render_prometheus(
    "tidaldb_signal_write_latency_us",
    "Signal write latency in microseconds",
));
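write_gauge and write_counter are called above but not defined in this task. They may already exist next to the uptime and health gauges in metrics.rs. If not, the following sketches what they could look like, using hypothetical signatures that match the calls above.

```rust
use std::fmt::Write;

/// Append one unlabeled Prometheus sample with its HELP and TYPE lines.
fn write_metric(out: &mut String, name: &str, help: &str, kind: &str, value: f64) {
    // Writing into a String cannot fail, so the fmt::Result is ignored.
    let _ = writeln!(out, "# HELP {name} {help}");
    let _ = writeln!(out, "# TYPE {name} {kind}");
    // f64 Display prints 42.0 as "42" and 2.5 as "2.5"; both are valid.
    let _ = writeln!(out, "{name} {value}");
}

pub fn write_gauge(out: &mut String, name: &str, help: &str, value: f64) {
    write_metric(out, name, help, "gauge", value);
}

pub fn write_counter(out: &mut String, name: &str, help: &str, value: f64) {
    write_metric(out, name, help, "counter", value);
}
```

The help strings passed here are fixed literals. Arbitrary HELP text would need its backslashes and line feeds escaped to remain valid in the text format.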

8. Metric names (string literals)

Metric name                           Type       Description
tidaldb_wal_lag_bytes                 gauge      Bytes of WAL segments not yet compacted
tidaldb_wal_compacted_segments_total  counter    Total WAL segments compacted since open
tidaldb_checkpoint_age_seconds        gauge      Seconds since the last successful checkpoint
tidaldb_signal_hot_entries            gauge      Number of entries in the signal ledger hot tier
tidaldb_signal_writes_total           counter    Total signal writes since database open
tidaldb_signal_write_latency_us       histogram  Signal write latency in microseconds

Acceptance Criteria

  • MetricsState extended with 5 AtomicU64 fields (counters and gauges) + 1 histogram, all #[cfg(feature = "metrics")]
  • LatencyHistogram struct with observe() and render_prometheus()
  • tidaldb_wal_lag_bytes updated after WAL segment scan
  • tidaldb_wal_compacted_segments_total incremented on compaction
  • tidaldb_checkpoint_age_seconds computed from last_checkpoint_ns
  • tidaldb_signal_hot_entries updated periodically from ledger.entries().len()
  • tidaldb_signal_writes_total incremented on every successful signal() call
  • tidaldb_signal_write_latency_us histogram records the duration of every successful signal write
  • /metrics endpoint renders all 6 new metrics in valid Prometheus format
  • All metrics gated behind #[cfg(feature = "metrics")]
  • Unit tests for LatencyHistogram::observe() and render_prometheus()
  • cargo clippy -- -D warnings passes both with and without --features metrics, and cargo fmt --check passes

Test Strategy

#[cfg(all(test, feature = "metrics"))]
mod tests {
    use super::*;

    #[test]
    fn histogram_observe_increments_correct_buckets() {
        let hist = LatencyHistogram::new(WRITE_LATENCY_BOUNDS);
        hist.observe(5); // should increment buckets: 5, 10, 25, 50, 100, 250, 500, 1000, 5000, 10000
        assert_eq!(hist.buckets[0].load(Ordering::Relaxed), 0); // bucket 1
        assert_eq!(hist.buckets[1].load(Ordering::Relaxed), 1); // bucket 5
        assert_eq!(hist.buckets[2].load(Ordering::Relaxed), 1); // bucket 10
        assert_eq!(hist.count.load(Ordering::Relaxed), 1);
        assert_eq!(hist.sum.load(Ordering::Relaxed), 5);
    }

    #[test]
    fn histogram_render_prometheus_valid_format() {
        let hist = LatencyHistogram::new(WRITE_LATENCY_BOUNDS);
        hist.observe(50);
        hist.observe(200);
        let output = hist.render_prometheus("test_latency_us", "Test latency");
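        assert!(output.contains("# HELP test_latency_us Test latency"));
        assert!(output.contains("# TYPE test_latency_us histogram"));
        // 50 falls in le="50"; 200 first fits le="250".
        assert!(output.contains("test_latency_us_bucket{le=\"50\"} 1"));
        assert!(output.contains("test_latency_us_bucket{le=\"250\"} 2"));
        assert!(output.contains("test_latency_us_bucket{le=\"+Inf\"} 2"));
        assert!(output.contains("test_latency_us_sum 250"));
        assert!(output.contains("test_latency_us_count 2"));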
    }

    #[test]
    fn metrics_state_renders_signal_metrics() {
        let state = MetricsState::new();
        state.signal_writes_total.store(42, Ordering::Relaxed);
        let output = state.render_prometheus();
        // Match the whole sample line; a bare "42" could appear anywhere.
        assert!(output.contains("tidaldb_signal_writes_total 42"));
    }
}

Integration test:

#[cfg(feature = "metrics")]
#[test]
fn signal_write_increments_metrics() {
    let db = make_test_db_with_schema();
    db.signal("view", EntityId::new(1), 1.0, Timestamp::now()).unwrap();
    db.signal("view", EntityId::new(2), 1.0, Timestamp::now()).unwrap();

    let metrics = db.metrics();
    let prom = metrics.render_prometheus();
    // Assumes the fixture performs no signal writes of its own, so exactly
    // 2 writes show up in both the counter and the histogram count.
    assert!(prom.contains("tidaldb_signal_writes_total 2"));
    assert!(prom.contains("tidaldb_signal_write_latency_us_count 2"));
}
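Substring checks on rendered output are brittle. `name 2` also matches `name 20`, and a lower bound such as "at least 2 writes" cannot be expressed at all. The following sketches a small test helper, with the hypothetical name `sample_value`, that parses the value of one unlabeled sample.

```rust
/// Hypothetical test helper: value of the unlabeled sample `name value`
/// (optionally followed by a timestamp) in Prometheus text output.
/// Comment lines are skipped, and names must match exactly, so asking for
/// `x_total` does not pick up `x_total_extra`.
pub fn sample_value(prom: &str, name: &str) -> Option<f64> {
    prom.lines()
        .filter(|line| !line.starts_with('#'))
        .find_map(|line| {
            let mut parts = line.split_whitespace();
            let metric = parts.next()?;
            let value = parts.next()?;
            if metric == name { value.parse().ok() } else { None }
        })
}
```

With this helper, the integration test could assert `sample_value(&prom, "tidaldb_signal_writes_total").unwrap() >= 2.0` when the fixture may write signals of its own.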