11 KiB
Task 02: Signal System + WAL Metrics
Delivers
Atomic counters and gauges for WAL health (lag bytes, compacted segments), checkpoint freshness (age in seconds), signal ledger size (hot entry count), signal write throughput (total writes), and signal write latency (histogram with fixed buckets). All metrics rendered in Prometheus text format via MetricsState::render_prometheus().
Complexity: M
Dependencies
- task-01 complete (QueryStats struct establishes the instrumentation pattern)
- `tidal/src/db/metrics.rs` -- existing `MetricsState` with uptime and health gauges
- `tidal/src/wal/mod.rs` -- `WalHandle`, segment management
- `tidal/src/signals/checkpoint/meta.rs` -- `CheckpointMeta` with `checkpoint_time_ns`
- `tidal/src/signals/mod.rs` -- `SignalLedger` with `entries()` DashMap
Technical Design
1. Add atomic counters to MetricsState
In tidal/src/db/metrics.rs, extend MetricsState:
use std::sync::atomic::{AtomicU64, Ordering};
pub struct MetricsState {
    // ... existing fields ...

    // ── Signal + WAL metrics (m7p4) ────────────────────────────────────
    // Every field in this group exists only when the `metrics` feature is on.

    // Gauges (overwritten with the latest observed value).
    /// Size in bytes of the WAL segments that have not been compacted yet.
    #[cfg(feature = "metrics")]
    pub(crate) wal_lag_bytes: AtomicU64,
    /// Wall-clock time of the latest checkpoint, in nanoseconds since the Unix
    /// epoch; `0` means no checkpoint has been recorded yet.
    #[cfg(feature = "metrics")]
    pub(crate) last_checkpoint_ns: AtomicU64,
    /// Number of entries currently held in the signal ledger's hot tier.
    #[cfg(feature = "metrics")]
    pub(crate) signal_hot_entries: AtomicU64,

    // Counters (only ever increase while the database is open).
    /// WAL segments compacted since the database was opened.
    #[cfg(feature = "metrics")]
    pub(crate) wal_compacted_segments_total: AtomicU64,
    /// Signal writes recorded since the database was opened.
    #[cfg(feature = "metrics")]
    pub(crate) signal_writes_total: AtomicU64,

    // Histograms.
    /// Signal write latency in microseconds, bucketed at upper bounds
    /// 1, 5, 10, 25, 50, 100, 250, 500, 1000, 5000 and 10000.
    #[cfg(feature = "metrics")]
    pub(crate) signal_write_latency: LatencyHistogram,
}
2. LatencyHistogram
A minimal fixed-bucket histogram that satisfies Prometheus conventions without pulling in a library:
/// Fixed-bucket histogram for Prometheus exposition.
///
/// Buckets are stored cumulatively: each bucket counts the observations that are
/// `<=` its upper bound. An observation larger than the last bound lands in no
/// explicit bucket. It is still included in `count` and `sum`, and it appears in
/// the `le="+Inf"` bucket that [`LatencyHistogram::render_prometheus`] emits.
///
/// Thread-safe via per-bucket `AtomicU64`. No locks, no allocations on observe.
#[derive(Debug)]
pub struct LatencyHistogram {
    /// Upper bounds of each bucket, in microseconds; expected to be strictly increasing.
    bounds: &'static [u64],
    /// Cumulative count for each bucket (index matches `bounds`).
    buckets: Vec<AtomicU64>,
    /// Count of all observations, including those above the largest bound.
    count: AtomicU64,
    /// Sum of all observed values (microseconds).
    sum: AtomicU64,
}

/// Fixed bucket boundaries for signal write latency, in microseconds.
const WRITE_LATENCY_BOUNDS: &[u64] = &[1, 5, 10, 25, 50, 100, 250, 500, 1000, 5000, 10000];

impl LatencyHistogram {
    /// Creates an empty histogram with the given bucket upper bounds.
    ///
    /// # Panics
    ///
    /// In debug builds, panics if `bounds` is not strictly increasing. Prometheus
    /// requires increasing `le` labels, and cumulative counts only make sense for
    /// sorted bounds.
    pub fn new(bounds: &'static [u64]) -> Self {
        debug_assert!(
            bounds.windows(2).all(|pair| pair[0] < pair[1]),
            "histogram bounds must be strictly increasing"
        );
        Self {
            bounds,
            buckets: bounds.iter().map(|_| AtomicU64::new(0)).collect(),
            count: AtomicU64::new(0),
            sum: AtomicU64::new(0),
        }
    }

    /// Records one observation of `value_us` microseconds.
    ///
    /// O(buckets) -- 11 iterations for write latency.
    pub fn observe(&self, value_us: u64) {
        // Relaxed: histogram accuracy does not require ordering guarantees with
        // other fields. Prometheus scrapes are inherently approximate, and the
        // renderer repairs any transient non-monotonicity this allows.
        for (bucket, &bound) in self.buckets.iter().zip(self.bounds) {
            if value_us <= bound {
                bucket.fetch_add(1, Ordering::Relaxed);
            }
        }
        self.count.fetch_add(1, Ordering::Relaxed);
        self.sum.fetch_add(value_us, Ordering::Relaxed);
    }

    /// Renders this histogram in the Prometheus text exposition format.
    ///
    /// The output contains, in order:
    /// - the `# HELP` and `# TYPE` headers;
    /// - one `_bucket` line per bound;
    /// - the mandatory `le="+Inf"` bucket;
    /// - the `_sum` and `_count` lines.
    ///
    /// Every line ends with `\n`, so the result can be appended directly to a
    /// scrape body.
    pub fn render_prometheus(&self, name: &str, help: &str) -> String {
        use std::fmt::Write as _;

        // The exposition format requires `\` and line feeds in HELP text to be escaped.
        let help = help.replace('\\', "\\\\").replace('\n', "\\n");
        let mut out = String::with_capacity(64 * (self.bounds.len() + 5));

        // `fmt::Write` for `String` is infallible, so the `writeln!` results are ignored.
        let _ = writeln!(out, "# HELP {name} {help}");
        let _ = writeln!(out, "# TYPE {name} histogram");

        // Buckets are read one at a time while writers may be incrementing them.
        // A snapshot can therefore briefly see a bucket lower than the one before
        // it. The running max keeps the emitted series non-decreasing, as
        // Prometheus requires.
        let mut cumulative = 0u64;
        for (bound, bucket) in self.bounds.iter().zip(&self.buckets) {
            cumulative = cumulative.max(bucket.load(Ordering::Relaxed));
            let _ = writeln!(out, "{name}_bucket{{le=\"{bound}\"}} {cumulative}");
        }

        // `+Inf` must equal `_count`. It covers observations above the largest
        // bound, and it is clamped so it never falls below the last finite bucket.
        let total = cumulative.max(self.count.load(Ordering::Relaxed));
        let _ = writeln!(out, "{name}_bucket{{le=\"+Inf\"}} {total}");
        let _ = writeln!(out, "{name}_sum {}", self.sum.load(Ordering::Relaxed));
        let _ = writeln!(out, "{name}_count {total}");
        out
    }
}
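Note: the histogram buckets are cumulative per Prometheus convention. Each observe() increments ALL buckets whose bound >= the observed value. As a result, buckets[last] equals count only when no observation exceeded the largest bound (10000 µs). Slower writes are counted in count and sum but in no explicit bucket. For that reason, render_prometheus() must emit the mandatory le="+Inf" bucket with the value of count. The observe() loop is only 11 iterations -- negligible overhead.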
3. Instrument signal write path
In tidal/src/db/signals.rs (the TidalDb::signal() method):
/// Signal write entry point (the write logic itself is elided in this design).
///
/// With the `metrics` feature enabled, each call that reaches the end of the
/// write logic increments `signal_writes_total`. It also records its wall-clock
/// duration, in microseconds, in `signal_write_latency`.
pub fn signal(
    &self,
    signal_type: &str,
    entity_id: EntityId,
    weight: f64,
    ts: Timestamp,
) -> crate::Result<()> {
    // Start the clock before any write work so the latency covers the whole call.
    #[cfg(feature = "metrics")]
    let write_start = std::time::Instant::now();

    // ... existing write logic ...

    // NOTE(review): if the write logic above returns early via `?`, failed writes
    // are neither counted nor timed. Confirm that "successful writes" is the
    // intended definition.
    #[cfg(feature = "metrics")]
    {
        use std::sync::atomic::Ordering;

        // `as_micros()` returns u128. Saturate rather than silently truncate
        // (`as u64`) in the practically impossible overflow case.
        let elapsed_us = u64::try_from(write_start.elapsed().as_micros()).unwrap_or(u64::MAX);
        self.metrics.signal_writes_total.fetch_add(1, Ordering::Relaxed);
        self.metrics.signal_write_latency.observe(elapsed_us);
    }
    Ok(())
}
4. Instrument WAL compaction
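After WAL compaction in the checkpoint thread (m7p1), increment the compaction counter as shown below. The same pass must also refresh `wal_lag_bytes` by storing the total size of the remaining uncompacted segments found by the WAL segment scan. No other step in this design updates that gauge, so without it `tidaldb_wal_lag_bytes` would always read 0: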
#[cfg(feature = "metrics")]
{
    // Counter semantics: add the number of segments removed by this compaction pass.
    // NOTE(review): `compacted_count` must be a u64 for `fetch_add`. This snippet
    // does not refresh `wal_lag_bytes`; that gauge needs its own update after the
    // segment scan.
    let compacted_total = &self.metrics.wal_compacted_segments_total;
    compacted_total.fetch_add(compacted_count, Ordering::Relaxed);
}
5. Update checkpoint age on checkpoint write
In the periodic checkpoint callback:
#[cfg(feature = "metrics")]
{
    // Wall-clock time since the Unix epoch. A clock set before the epoch yields
    // zero, which the renderer interprets as "no checkpoint yet".
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    // u64 nanoseconds reach the year 2554, so the `as` cast does not truncate in practice.
    let checkpoint_ns = since_epoch.as_nanos() as u64;
    self.metrics
        .last_checkpoint_ns
        .store(checkpoint_ns, Ordering::Relaxed);
}
6. Periodically update ledger entry count
In the checkpoint thread or a dedicated metrics-refresh callback:
#[cfg(feature = "metrics")]
if let Some(ledger) = self.ledger.as_ref() {
    // Gauge semantics: overwrite with the current hot-tier size instead of accumulating.
    let hot_entries = ledger.entries().len() as u64;
    self.metrics
        .signal_hot_entries
        .store(hot_entries, Ordering::Relaxed);
}
7. Render all new metrics in Prometheus format
Extend MetricsState::render_prometheus():
// WAL lag bytes
write_gauge(
    &mut out,
    "tidaldb_wal_lag_bytes",
    "Bytes of WAL segments not yet compacted",
    self.wal_lag_bytes.load(Ordering::Relaxed) as f64,
);
// WAL compacted segments total
write_counter(
    &mut out,
    "tidaldb_wal_compacted_segments_total",
    "Total WAL segments compacted since open",
    self.wal_compacted_segments_total.load(Ordering::Relaxed) as f64,
);
// Checkpoint age. `last_checkpoint_ns` is wall-clock (SystemTime) nanoseconds,
// and the wall clock can step backwards (e.g. after an NTP correction). Saturate,
// so a checkpoint stamped "in the future" reports age 0 instead of underflowing
// the u64 subtraction (a panic in debug builds).
let last_cp_ns = self.last_checkpoint_ns.load(Ordering::Relaxed);
let now_ns = std::time::SystemTime::now()
    .duration_since(std::time::UNIX_EPOCH)
    .unwrap_or_default()
    .as_nanos() as u64;
// NOTE(review): 0 is also reported when no checkpoint has completed yet, which is
// indistinguishable from a fresh checkpoint for alerting purposes.
let checkpoint_age_ns = if last_cp_ns > 0 {
    now_ns.saturating_sub(last_cp_ns)
} else {
    0
};
write_gauge(
    &mut out,
    "tidaldb_checkpoint_age_seconds",
    "Seconds since the last successful checkpoint",
    checkpoint_age_ns as f64 / 1_000_000_000.0,
);
// Signal hot entries
write_gauge(
    &mut out,
    "tidaldb_signal_hot_entries",
    "Number of entries in the signal ledger hot tier",
    self.signal_hot_entries.load(Ordering::Relaxed) as f64,
);
// Signal writes total
write_counter(
    &mut out,
    "tidaldb_signal_writes_total",
    "Total signal writes since database open",
    self.signal_writes_total.load(Ordering::Relaxed) as f64,
);
// Signal write latency histogram
out.push_str(&self.signal_write_latency.render_prometheus(
    "tidaldb_signal_write_latency_us",
    "Signal write latency in microseconds",
));
8. Metric names (string literals)
| Metric name | Type | Description |
|---|---|---|
| `tidaldb_wal_lag_bytes` | gauge | Bytes of WAL segments not yet compacted |
| `tidaldb_wal_compacted_segments_total` | counter | Total WAL segments compacted since open |
| `tidaldb_checkpoint_age_seconds` | gauge | Seconds since the last successful checkpoint |
| `tidaldb_signal_hot_entries` | gauge | Number of entries in the signal ledger hot tier |
| `tidaldb_signal_writes_total` | counter | Total signal writes since database open |
| `tidaldb_signal_write_latency_us` | histogram | Signal write latency in microseconds |
Acceptance Criteria
- `MetricsState` extended with 5 atomic counters/gauges + 1 histogram, all `#[cfg(feature = "metrics")]`
- `LatencyHistogram` struct with `observe()` and `render_prometheus()`
- `tidaldb_wal_lag_bytes` updated after the WAL segment scan
- `tidaldb_wal_compacted_segments_total` incremented on compaction
- `tidaldb_checkpoint_age_seconds` computed from `last_checkpoint_ns`
- `tidaldb_signal_hot_entries` updated periodically from `ledger.entries().len()`
- `tidaldb_signal_writes_total` incremented on every successful `signal()` call
- `tidaldb_signal_write_latency_us` histogram records the duration of every successful signal write
- `/metrics` endpoint renders all 6 new metrics in valid Prometheus format (the histogram includes the `le="+Inf"` bucket)
- All metrics gated behind `#[cfg(feature = "metrics")]`
- Unit tests for `LatencyHistogram::observe()` and `render_prometheus()`
- `cargo clippy -- -D warnings` and `cargo fmt --check` pass
Test Strategy
#[cfg(test)]
mod tests {
    use super::*;

    /// Snapshot of every bucket of `hist`, in bound order.
    fn bucket_counts(hist: &LatencyHistogram) -> Vec<u64> {
        hist.buckets
            .iter()
            .map(|bucket| bucket.load(Ordering::Relaxed))
            .collect()
    }

    #[test]
    fn histogram_observe_increments_correct_buckets() {
        let hist = LatencyHistogram::new(WRITE_LATENCY_BOUNDS);
        // Bounds are inclusive: 5 lands in the `le=5` bucket and every larger one.
        hist.observe(5);
        assert_eq!(bucket_counts(&hist), vec![0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]);
        assert_eq!(hist.count.load(Ordering::Relaxed), 1);
        assert_eq!(hist.sum.load(Ordering::Relaxed), 5);
    }

    #[test]
    fn histogram_value_above_largest_bound_only_counts_in_inf() {
        let hist = LatencyHistogram::new(WRITE_LATENCY_BOUNDS);
        hist.observe(20_000);
        assert!(bucket_counts(&hist).iter().all(|&count| count == 0));
        assert_eq!(hist.count.load(Ordering::Relaxed), 1);

        let output = hist.render_prometheus("test_latency_us", "Test latency");
        assert!(output.contains("test_latency_us_bucket{le=\"10000\"} 0\n"));
        assert!(output.contains("test_latency_us_bucket{le=\"+Inf\"} 1\n"));
        assert!(output.contains("test_latency_us_count 1\n"));
    }

    #[test]
    fn histogram_render_prometheus_valid_format() {
        let hist = LatencyHistogram::new(WRITE_LATENCY_BOUNDS);
        hist.observe(50);
        hist.observe(200);
        let output = hist.render_prometheus("test_latency_us", "Test latency");
        assert!(output.contains("# HELP test_latency_us Test latency\n"));
        assert!(output.contains("# TYPE test_latency_us histogram\n"));
        assert!(output.contains("test_latency_us_bucket{le=\"100\"} 1\n"));
        assert!(output.contains("test_latency_us_bucket{le=\"250\"} 2\n"));
        assert!(output.contains("test_latency_us_bucket{le=\"+Inf\"} 2\n"));
        assert!(output.contains("test_latency_us_sum 250\n"));
        assert!(output.contains("test_latency_us_count 2\n"));
    }

    #[test]
    fn metrics_state_renders_signal_metrics() {
        let state = MetricsState::new();
        state.signal_writes_total.store(42, Ordering::Relaxed);
        let output = state.render_prometheus();
        // Check the sample line itself. A bare `contains("42")` would also pass on
        // any other metric value or digit run that happens to contain "42".
        let value: f64 = output
            .lines()
            .find_map(|line| line.strip_prefix("tidaldb_signal_writes_total "))
            .expect("tidaldb_signal_writes_total sample line missing")
            .trim()
            .parse()
            .expect("tidaldb_signal_writes_total value is not a number");
        assert_eq!(value, 42.0);
    }
}
Integration test:
#[test]
fn signal_write_increments_metrics() {
    /// Parses the value of the first sample line named exactly `metric`.
    fn sample_value(prom: &str, metric: &str) -> f64 {
        prom.lines()
            .find_map(|line| line.strip_prefix(metric)?.strip_prefix(' '))
            .unwrap_or_else(|| panic!("{metric} sample line missing"))
            .trim()
            .parse()
            .unwrap_or_else(|err| panic!("{metric} value is not a number: {err}"))
    }

    let db = make_test_db_with_schema();
    db.signal("view", EntityId::new(1), 1.0, Timestamp::now()).unwrap();
    db.signal("view", EntityId::new(2), 1.0, Timestamp::now()).unwrap();

    let metrics = db.metrics();
    let prom = metrics.render_prometheus();

    // At least the 2 writes above must be recorded. The check uses `>=` in case
    // the fixture writes signals itself. The counter and the histogram are
    // updated together, so they must agree.
    let writes = sample_value(&prom, "tidaldb_signal_writes_total");
    let observed = sample_value(&prom, "tidaldb_signal_write_latency_us_count");
    assert!(writes >= 2.0, "expected >= 2 signal writes, got {writes}");
    assert!(observed >= 2.0, "expected >= 2 latency observations, got {observed}");
    assert_eq!(writes, observed);
}