Implements the foundation of tidalDB's data pipeline:

**Phase 1 – Schema primitives**
- EntityId newtype (u64, big-endian ordering)
- SignalTypeDefinition with pre-computed decay λ, deduped/sorted windows
- SchemaBuilder with full constraint validation (duplicates, identifiers, half-life, windows, velocity)
- LumenError wrapping all subsystems with required From impls

**Phase 2 – Write-Ahead Log**
- Length-prefixed, BLAKE3-protected entry format
- Group-commit writer (batch up to 100 events / 10 ms)
- Double-buffered content-hash deduplication
- Checkpoint, truncation, and crash-recovery with full replay
- Integration, property, and UAT tests (incl. 5,500-event deterministic UAT)
- Proptest coverage scaled to 10,000 events/run (was ≤500) to meet acceptance criterion; cases reduced 100→10 to keep runtime comparable

**Phase 3 – Storage engine**
- StorageEngine trait (get/put/delete/scan/batch/flush)
- Key encoding: [EntityId][0x00][Tag][suffix] with ordering/prefix helpers
- InMemoryBackend (BTreeMap + RwLock)
- FjallStorage with three isolated keyspaces and atomic batch helper
- Property tests for key ordering and round-trip correctness

Also adds planning docs for phases 4-5, research docs, architecture overview, and roadmap updates.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Task 02: Warm-Tier Bucketed Counters
Context
- Milestone: 1 -- Signal Engine
- Phase: m1p4 -- Signal Ledger
- Depends On: None (uses types from m1p1 but no m1p4 tasks)
- Blocks: Task 03 (Signal Ledger and Velocity)
- Complexity: M
Objective
Deliver BucketedCounter, the warm-tier data structure that maintains per-minute and per-hour bucketed event counts for windowed aggregation. A single BucketedCounter instance supports simultaneous 1h, 24h, and 7d window queries by summing the appropriate range of buckets. This follows the Scotty stream-slicing approach where partial aggregates per time slice are shared across all concurrent windows.
The BucketedCounter is the data structure that makes db.read_windowed_count(item_42, "view", Window::TwentyFourHours) work without scanning raw events. A 1h query sums 60 minute buckets. A 24h query sums 24 hour buckets. A 7d query sums 168 hour buckets. An all-time query reads a single atomic counter. No duplicated storage, no SWAG stacks (deferred), no background materializer thread.
Requirements
- Per-minute buckets: 60 AtomicU32 counters for the last 60 minutes
- Per-hour buckets: 168 AtomicU32 counters for the last 168 hours (7 days)
- All-time counter: a single AtomicU64 for the unbounded total
- Current bucket pointers: AtomicU8 for the minute index (0..59), AtomicU8 for the hour index (0..167)
- increment(): atomically increment the current minute bucket and the all-time counter
- windowed_count(): sum the appropriate bucket range for a given Window
- rotate_minute(): zero the next minute bucket and advance the pointer
- rotate_hour(): aggregate the last 60 minute buckets into the current hour bucket, zero the next hour bucket, advance the pointer
- All operations are atomic -- no mutex, no unsafe
- Send + Sync
Technical Design
Module Structure
tidal/src/signals/
warm.rs -- BucketedCounter, all methods
Public API
// === signals/warm.rs ===
use std::sync::atomic::{AtomicU8, AtomicU32, AtomicU64, Ordering};
use crate::schema::Window;
/// Number of per-minute bucket slots (covers 1 hour).
pub const MINUTE_BUCKETS: usize = 60;
/// Number of per-hour bucket slots (covers 7 days).
pub const HOUR_BUCKETS: usize = 168;
/// Warm-tier bucketed event counter for a single signal type on a single entity.
///
/// Supports simultaneous windowed count queries across 1h, 24h, 7d, and
/// all-time windows by summing the appropriate range of time-bucketed counters
/// (30d is not supported in M1; see below).
///
/// # Design
///
/// Per-minute buckets cover the last 60 minutes. Per-hour buckets cover the
/// last 168 hours (7 days). The all-time counter is unbounded.
///
/// Window queries:
/// 1h = sum of last 60 minute buckets
/// 24h = sum of last 24 hour buckets
/// 7d = sum of last 168 hour buckets
/// 30d = not supported in M1 (requires cold-tier rollups)
/// all = single atomic counter
///
/// Bucket rotation is trigger-based (called by SignalLedger on signal writes
/// when enough time has elapsed), not background-thread-based. This keeps M1
/// simple while being correct.
pub struct BucketedCounter {
/// Per-minute event count buckets. Circular buffer: current_minute is the
/// slot being written, and (current_minute + 1) % MINUTE_BUCKETS is the oldest.
minute_buckets: [AtomicU32; MINUTE_BUCKETS],
/// Per-hour event count buckets. Circular buffer.
hour_buckets: [AtomicU32; HOUR_BUCKETS],
/// Current minute bucket index (0..59).
current_minute: AtomicU8,
/// Current hour bucket index (0..167).
current_hour: AtomicU8,
/// All-time total event count.
all_time_count: AtomicU64,
/// Timestamp (nanos) of the last minute rotation.
last_minute_rotation_ns: AtomicU64,
/// Timestamp (nanos) of the last hour rotation.
last_hour_rotation_ns: AtomicU64,
}
impl BucketedCounter {
/// Construct a new counter with all buckets zeroed.
pub fn new() -> Self;
/// Construct with initial rotation timestamps.
pub fn with_start_time(now_ns: u64) -> Self;
/// Increment the current minute bucket and all-time counter by 1.
///
/// Also checks if minute/hour rotation is needed based on `now_ns`.
/// If a rotation is due, it is performed inline (trigger-based).
///
/// Cost: 2 atomic fetch_add + optional rotation.
pub fn increment(&self, now_ns: u64);
/// Increment by a count other than 1 (for batch replay).
pub fn increment_by(&self, count: u32, now_ns: u64);
/// Query the windowed event count for a given window.
///
/// Sums the appropriate circular buffer range:
/// OneHour -> sum last 60 minute buckets
/// TwentyFourHours -> sum last 24 hour buckets
/// SevenDays -> sum last 168 hour buckets
/// ThirtyDays -> NOT SUPPORTED in M1 (returns 0 with tracing::warn)
/// AllTime -> single atomic load
///
/// Cost: O(bucket_count) atomic loads.
pub fn windowed_count(&self, window: Window) -> u64;
/// Read the all-time total event count.
pub fn all_time_count(&self) -> u64;
/// Read the count in the current minute bucket only.
/// Used for fine-grained velocity computation.
pub fn current_minute_count(&self) -> u32;
/// Rotate the minute pointer: zero the next slot, advance `current_minute`.
///
/// Called when at least 60 seconds have elapsed since the last rotation.
/// Returns the count from the expired bucket (for hour aggregation).
pub fn rotate_minute(&self) -> u32;
/// Rotate the hour pointer: set the next hour bucket from aggregated
/// minute data, advance `current_hour`.
///
/// Called when at least 3600 seconds have elapsed since the last rotation.
/// `minute_aggregate` is the sum of the last 60 minute buckets (provided
/// by the caller after summing).
pub fn rotate_hour(&self, minute_aggregate: u32);
/// Snapshot all state for checkpoint serialization.
/// Returns (minute_buckets, hour_buckets, current_minute, current_hour,
/// all_time_count, last_minute_rotation_ns, last_hour_rotation_ns).
pub fn snapshot(&self) -> BucketedCounterSnapshot;
/// Restore from a checkpoint snapshot.
pub fn restore(&self, snapshot: &BucketedCounterSnapshot);
}
/// Serializable snapshot of a BucketedCounter.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BucketedCounterSnapshot {
pub minute_buckets: [u32; MINUTE_BUCKETS],
pub hour_buckets: [u32; HOUR_BUCKETS],
pub current_minute: u8,
pub current_hour: u8,
pub all_time_count: u64,
pub last_minute_rotation_ns: u64,
pub last_hour_rotation_ns: u64,
}
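Because the live fields are atomics while BucketedCounterSnapshot holds plain integers, snapshot() and restore() come down to per-slot loads and stores. The following is a minimal sketch of that copy on a cut-down, hypothetical MiniCounter (60 minute slots plus the all-time total only; MiniCounter and MiniSnapshot are illustrative names, not part of the task). It assumes Relaxed per-slot access and that restore() runs before the counter is shared, since increments racing with restore() could be overwritten.

```rust
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};

const SLOTS: usize = 60;

/// Hypothetical cut-down counter: minute slots and the all-time total only.
struct MiniCounter {
    slots: [AtomicU32; SLOTS],
    all_time: AtomicU64,
}

/// Plain-integer copy of MiniCounter, suitable for serialization.
#[derive(Debug, Clone, PartialEq, Eq)]
struct MiniSnapshot {
    slots: [u32; SLOTS],
    all_time: u64,
}

impl MiniCounter {
    fn new() -> Self {
        Self {
            slots: std::array::from_fn(|_| AtomicU32::new(0)),
            all_time: AtomicU64::new(0),
        }
    }

    /// Copy every atomic into a plain array. Each load is atomic on its own,
    /// but the snapshot as a whole is not a consistent cut under concurrent writers.
    fn snapshot(&self) -> MiniSnapshot {
        MiniSnapshot {
            slots: std::array::from_fn(|i| self.slots[i].load(Ordering::Relaxed)),
            all_time: self.all_time.load(Ordering::Relaxed),
        }
    }

    /// Overwrite every atomic from the snapshot (assumed to run before sharing).
    fn restore(&self, snap: &MiniSnapshot) {
        for (slot, &value) in self.slots.iter().zip(snap.slots.iter()) {
            slot.store(value, Ordering::Relaxed);
        }
        self.all_time.store(snap.all_time, Ordering::Relaxed);
    }
}
```

A snapshot taken while writers are active may mix slightly different moments across slots, which is the same one-bucket imprecision the design already accepts for reads.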
Internal Design
Circular buffer indexing:
The minute buckets form a circular buffer. current_minute points to the slot currently being incremented. On rotation, the next slot (wrapping at 60) is zeroed first and only then does the pointer advance to it, so no increment can land in a slot that still holds a stale count from the previous lap.
To query the last N minutes, we read N slots ending at current_minute (inclusive), wrapping backwards through the circular buffer:
fn sum_last_n_minutes(&self, n: usize) -> u64 {
let current = self.current_minute.load(Ordering::Acquire) as usize;
let mut total: u64 = 0;
for i in 0..n {
let idx = (current + MINUTE_BUCKETS - i) % MINUTE_BUCKETS;
total += u64::from(self.minute_buckets[idx].load(Ordering::Relaxed));
}
total
}
The same pattern applies to hour buckets with current_hour and 168 slots.
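A small standalone sketch of the index walk, parameterized by buffer length so one function covers both the 60 minute slots and the 168 hour slots. last_n_indices is a hypothetical helper; adding len before subtracting i keeps the usize arithmetic from underflowing.

```rust
/// Slot indices visited by a "last n buckets" query, newest first.
/// `current` is the slot being written; `len` is the ring size (60 or 168).
fn last_n_indices(current: usize, n: usize, len: usize) -> Vec<usize> {
    assert!(
        current < len && n <= len,
        "current must be a valid slot and n at most one full lap"
    );
    // Walk backwards from `current`, wrapping past slot 0 to the end of the ring.
    (0..n).map(|i| (current + len - i) % len).collect()
}
```

For example, with current_minute at slot 2, a 5-minute walk visits slots 2, 1, 0, 59, 58.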
Relaxed ordering for bucket reads:
Bucket reads use Ordering::Relaxed because windowed counts are inherently approximate -- a query at time T may see a bucket that was incremented at T-1ms or T+1ms due to scheduling. The ranking system does not require exact counts; it requires counts that are correct to within one bucket boundary (60 seconds). Relaxed ordering is safe and avoids unnecessary memory fences on the read path.
Bucket writes (increments) also use Ordering::Relaxed on fetch_add. fetch_add is a single atomic read-modify-write under any ordering, so concurrent increments are never lost; Relaxed only gives up ordering relative to other memory operations, which the counter does not need.
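To illustrate that Relaxed fetch_add does not drop increments, here is a sketch that hammers one AtomicU32 from several scoped threads (relaxed_increments is a hypothetical helper, not part of the task's API). std::thread::scope joins every spawned thread before returning, and that join is what guarantees the final Relaxed load observes all increments.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::thread;

/// Increment one shared bucket `per_thread` times from each of `threads`
/// threads using Relaxed ordering, then return the final value.
fn relaxed_increments(threads: u32, per_thread: u32) -> u32 {
    let bucket = AtomicU32::new(0);
    thread::scope(|s| {
        for _ in 0..threads {
            s.spawn(|| {
                for _ in 0..per_thread {
                    // Atomic read-modify-write: no lost updates, no fence.
                    bucket.fetch_add(1, Ordering::Relaxed);
                }
            });
        }
    }); // all scoped threads are joined here
    bucket.load(Ordering::Relaxed)
}
```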
Trigger-based rotation:
M1 does not have a background materializer thread. Instead, rotation is checked on each increment() call:
pub fn increment(&self, now_ns: u64) {
    // Check whether a minute rotation is due (60 seconds in nanos).
    // saturating_add guards against overflow for timestamps near u64::MAX.
    let last_minute = self.last_minute_rotation_ns.load(Ordering::Relaxed);
    if now_ns >= last_minute.saturating_add(60_000_000_000) {
        // Private helper (not shown): uses a CAS on last_minute_rotation_ns
        // so exactly one concurrent caller performs the rotation.
        self.maybe_rotate_minutes(now_ns);
    }
    // Increment the current minute bucket.
    let idx = self.current_minute.load(Ordering::Acquire) as usize;
    self.minute_buckets[idx].fetch_add(1, Ordering::Relaxed);
    // Increment the all-time counter.
    self.all_time_count.fetch_add(1, Ordering::Relaxed);
}
The rotation check is cheap (one Relaxed load plus a comparison). Actual rotation happens at most once per minute per counter, that is, per entity and signal type. Multiple concurrent callers that detect a due rotation may race, but the rotation logic uses a CAS on last_minute_rotation_ns so that exactly one caller performs the rotation.
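The CAS step can be sketched on a hypothetical, minute-only MinuteRing (MinuteRing, MINUTE_NS, and this body of maybe_rotate_minutes are illustrative assumptions, not the task's code). Beyond what the text specifies, it assumes two design choices: the rotation timestamp advances by whole minutes so bucket edges stay aligned to the start time, and after an idle gap one slot is zeroed per elapsed minute, capped at a full lap. It also assumes one rotation loop finishes before the next rotation becomes due; it does not handle two rotation loops overlapping.

```rust
use std::sync::atomic::{AtomicU32, AtomicU64, AtomicU8, Ordering};

const MINUTE_BUCKETS: usize = 60;
const MINUTE_NS: u64 = 60_000_000_000;

/// Hypothetical minute-only ring illustrating CAS-guarded catch-up rotation.
struct MinuteRing {
    buckets: [AtomicU32; MINUTE_BUCKETS],
    current: AtomicU8,
    last_rotation_ns: AtomicU64,
}

impl MinuteRing {
    fn with_start_time(now_ns: u64) -> Self {
        Self {
            buckets: std::array::from_fn(|_| AtomicU32::new(0)),
            current: AtomicU8::new(0),
            last_rotation_ns: AtomicU64::new(now_ns),
        }
    }

    fn increment(&self, now_ns: u64) {
        // Cheap check: one Relaxed load and a comparison.
        let last = self.last_rotation_ns.load(Ordering::Relaxed);
        if now_ns >= last.saturating_add(MINUTE_NS) {
            self.maybe_rotate_minutes(now_ns);
        }
        let idx = self.current.load(Ordering::Acquire) as usize;
        self.buckets[idx].fetch_add(1, Ordering::Relaxed);
    }

    fn maybe_rotate_minutes(&self, now_ns: u64) {
        let last = self.last_rotation_ns.load(Ordering::Acquire);
        let elapsed_minutes = now_ns.saturating_sub(last) / MINUTE_NS;
        if elapsed_minutes == 0 {
            return; // another caller already rotated
        }
        // Advance by whole minutes; new_last <= now_ns, so no overflow.
        let new_last = last + elapsed_minutes * MINUTE_NS;
        // Only the caller whose CAS succeeds performs the rotation.
        if self
            .last_rotation_ns
            .compare_exchange(last, new_last, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            return;
        }
        // Zero one slot per elapsed minute; more than a full lap is redundant.
        let steps = elapsed_minutes.min(MINUTE_BUCKETS as u64);
        for _ in 0..steps {
            let cur = self.current.load(Ordering::Relaxed) as usize;
            let next = (cur + 1) % MINUTE_BUCKETS;
            // Zero the next slot before publishing it as current.
            self.buckets[next].store(0, Ordering::Relaxed);
            self.current.store(next as u8, Ordering::Release);
        }
    }

    fn sum_last_n_minutes(&self, n: usize) -> u64 {
        let current = self.current.load(Ordering::Acquire) as usize;
        (0..n)
            .map(|i| (current + MINUTE_BUCKETS - i) % MINUTE_BUCKETS)
            .map(|idx| u64::from(self.buckets[idx].load(Ordering::Relaxed)))
            .sum()
    }
}
```

A caller that loses the CAS simply proceeds to increment; if it loaded the old pointer, its event lands in the just-finished minute, which stays within the one-bucket tolerance the design accepts.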
30-day window:
Not supported in M1. The 30d window requires cold-tier hourly rollups (720 hour buckets or disk-backed data). For M1, windowed_count(Window::ThirtyDays) returns 0 and emits a tracing::warn!. This is documented in the Window type and the API.
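A sketch of how windowed_count might map each Window variant onto bucket ranges, including the ThirtyDays fallback. The Window enum here is a local stand-in for crate::schema::Window; Plan and plan_for are hypothetical; the bucket slices are assumed to be already ordered newest first (for example, by the circular-buffer walk above); and eprintln! stands in for tracing::warn! so the example has no dependencies.

```rust
/// Local stand-in for crate::schema::Window (variant names from this task).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Window {
    OneHour,
    TwentyFourHours,
    SevenDays,
    ThirtyDays,
    AllTime,
}

/// What a windowed query reads in M1.
#[derive(Debug, PartialEq, Eq)]
enum Plan {
    Minutes(usize), // sum this many of the newest minute buckets
    Hours(usize),   // sum this many of the newest hour buckets
    AllTime,        // load the all-time counter
    Unsupported,    // not answerable from the warm tier
}

fn plan_for(window: Window) -> Plan {
    match window {
        Window::OneHour => Plan::Minutes(60),
        Window::TwentyFourHours => Plan::Hours(24),
        Window::SevenDays => Plan::Hours(168),
        // 30d would need 720 hour buckets or cold-tier rollups.
        Window::ThirtyDays => Plan::Unsupported,
        Window::AllTime => Plan::AllTime,
    }
}

fn windowed_count(window: Window, minutes_newest_first: &[u32], hours_newest_first: &[u32], all_time: u64) -> u64 {
    let sum = |b: &[u32], n: usize| b.iter().take(n).map(|&c| u64::from(c)).sum::<u64>();
    match plan_for(window) {
        Plan::Minutes(n) => sum(minutes_newest_first, n),
        Plan::Hours(n) => sum(hours_newest_first, n),
        Plan::AllTime => all_time,
        Plan::Unsupported => {
            // Stand-in for tracing::warn!.
            eprintln!("warning: {window:?} is not supported in M1; returning 0");
            0
        }
    }
}
```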
Error Handling
All methods are infallible. The unsupported ThirtyDays window returns 0 with a warning log rather than an error.
Test Strategy
Property Tests
use proptest::prelude::*;
// P3: Windowed count equals event count in window (1h window).
proptest! {
    #[test]
    fn windowed_count_1h_matches_events(
        event_times_secs in prop::collection::vec(0u64..7200, 1..500),
    ) {
        // The counter assumes non-decreasing timestamps, so replay in time order.
        let mut event_times_secs = event_times_secs;
        event_times_secs.sort_unstable();
        let counter = BucketedCounter::with_start_time(0);
        for &t_secs in &event_times_secs {
            counter.increment(t_secs * 1_000_000_000);
        }
        // windowed_count takes no timestamp, so the 1h window ends at the
        // latest event the counter has seen.
        let query_time_secs = *event_times_secs.last().unwrap();
        let window_start = query_time_secs.saturating_sub(3600);
        let expected = event_times_secs.iter()
            .filter(|&&t| t > window_start && t <= query_time_secs)
            .count() as u64;
        let actual = counter.windowed_count(Window::OneHour);
        // Buckets are 60 s wide, so events within one bucket of the window
        // start may fall on either side of the boundary.
        let tolerance = event_times_secs.iter()
            .filter(|&&t| t.abs_diff(window_start) < 60)
            .count() as u64;
        prop_assert!(
            actual.abs_diff(expected) <= tolerance,
            "actual={actual}, expected={expected}, tolerance={tolerance}"
        );
    }
}
// All-time count equals total event count.
proptest! {
#[test]
fn all_time_count_matches_total(
event_count in 0u64..10_000,
) {
let counter = BucketedCounter::with_start_time(0);
for i in 0..event_count {
let t_ns = i * 1_000_000;
counter.increment(t_ns);
}
prop_assert_eq!(counter.all_time_count(), event_count);
}
}
// Circular buffer wrapping: counts survive full rotation.
proptest! {
#[test]
fn minute_rotation_preserves_total(
events_per_minute in prop::collection::vec(0u32..100, 60..120),
) {
let counter = BucketedCounter::with_start_time(0);
let mut total = 0u64;
for (minute_idx, &count) in events_per_minute.iter().enumerate() {
let base_ns = (minute_idx as u64) * 60_000_000_000;
for j in 0..count {
let t_ns = base_ns + u64::from(j) * 1_000_000;
counter.increment(t_ns);
total += 1;
}
}
prop_assert_eq!(counter.all_time_count(), total);
}
}
Unit Tests
#[test]
fn new_counter_is_zeroed() {
let counter = BucketedCounter::new();
assert_eq!(counter.all_time_count(), 0);
assert_eq!(counter.windowed_count(Window::OneHour), 0);
assert_eq!(counter.windowed_count(Window::TwentyFourHours), 0);
assert_eq!(counter.windowed_count(Window::SevenDays), 0);
assert_eq!(counter.windowed_count(Window::AllTime), 0);
}
#[test]
fn single_increment() {
let counter = BucketedCounter::with_start_time(0);
counter.increment(1_000_000_000); // 1 second
assert_eq!(counter.all_time_count(), 1);
assert_eq!(counter.windowed_count(Window::OneHour), 1);
assert_eq!(counter.windowed_count(Window::AllTime), 1);
}
#[test]
fn multiple_increments_same_minute() {
let counter = BucketedCounter::with_start_time(0);
for i in 0..100 {
counter.increment(i * 100_000_000); // every 100ms for 10 seconds
}
assert_eq!(counter.all_time_count(), 100);
assert_eq!(counter.windowed_count(Window::OneHour), 100);
}
#[test]
fn minute_rotation_zeros_next_bucket() {
let counter = BucketedCounter::with_start_time(0);
// Fill minute 0 with 10 events
for i in 0..10 {
counter.increment(i * 1_000_000_000);
}
assert_eq!(counter.windowed_count(Window::OneHour), 10);
// Advance past minute boundary (61 seconds)
counter.increment(61_000_000_000);
assert_eq!(counter.all_time_count(), 11);
// The 1h window should include both minutes
let count_1h = counter.windowed_count(Window::OneHour);
assert_eq!(count_1h, 11);
}
#[test]
fn events_outside_1h_window_not_counted() {
let counter = BucketedCounter::with_start_time(0);
// Add a single event at t=0, which should age out of the 1h window
counter.increment(0);
// Advance time past 1 hour with many rotations
for minute in 1..=70 {
let t_ns = minute * 60_000_000_000u64;
counter.increment(t_ns);
}
// The 1h window should contain 60 events (minutes 11-70), not 71
let count_1h = counter.windowed_count(Window::OneHour);
// The events from minute 0 through minute 10 have rotated out
assert!(count_1h <= 61, "1h count was {count_1h}, expected <= 61");
assert_eq!(counter.all_time_count(), 71);
}
#[test]
fn hour_rotation_aggregates_minutes() {
let counter = BucketedCounter::with_start_time(0);
// Simulate 2 hours of events: 5 per minute
for minute in 0..120 {
let base_ns = minute * 60_000_000_000u64;
for j in 0..5 {
counter.increment(base_ns + j * 1_000_000_000);
}
}
assert_eq!(counter.all_time_count(), 600);
// Only 2 hours have elapsed, so at least one hour rotation has stored
// minute data in an hour bucket and the 24h window must be non-zero
let count_24h = counter.windowed_count(Window::TwentyFourHours);
assert!(count_24h > 0, "24h window should have events");
}
#[test]
fn all_time_window_reads_atomic_counter() {
let counter = BucketedCounter::with_start_time(0);
for i in 0..1000 {
counter.increment(i * 1_000_000);
}
assert_eq!(counter.windowed_count(Window::AllTime), 1000);
}
#[test]
fn thirty_day_window_returns_zero() {
let counter = BucketedCounter::with_start_time(0);
counter.increment(1_000_000_000);
// ThirtyDays not supported in M1
assert_eq!(counter.windowed_count(Window::ThirtyDays), 0);
}
#[test]
fn snapshot_and_restore_roundtrip() {
let counter = BucketedCounter::with_start_time(0);
for i in 0..50 {
counter.increment(i * 2_000_000_000); // every 2 seconds
}
let snapshot = counter.snapshot();
let restored = BucketedCounter::new();
restored.restore(&snapshot);
assert_eq!(restored.all_time_count(), counter.all_time_count());
assert_eq!(
restored.windowed_count(Window::OneHour),
counter.windowed_count(Window::OneHour)
);
assert_eq!(
restored.windowed_count(Window::AllTime),
counter.windowed_count(Window::AllTime)
);
}
#[test]
fn increment_by_adds_multiple() {
let counter = BucketedCounter::with_start_time(0);
counter.increment_by(42, 1_000_000_000);
assert_eq!(counter.all_time_count(), 42);
assert_eq!(counter.windowed_count(Window::OneHour), 42);
}
Acceptance Criteria
- BucketedCounter has 60 per-minute buckets (AtomicU32) and 168 per-hour buckets (AtomicU32)
- increment() atomically increments the current minute bucket and the all-time counter
- windowed_count(Window::OneHour) sums the last 60 minute buckets
- windowed_count(Window::TwentyFourHours) sums the last 24 hour buckets
- windowed_count(Window::SevenDays) sums the last 168 hour buckets
- windowed_count(Window::AllTime) returns the atomic counter value
- windowed_count(Window::ThirtyDays) returns 0 (not supported in M1)
- Trigger-based minute rotation: when 60+ seconds have elapsed, the next slot is zeroed and the pointer advanced
- Trigger-based hour rotation: when 3600+ seconds have elapsed, the minute aggregate is stored in an hour bucket
- snapshot() and restore() roundtrip preserves all state
- All-time count matches the total number of increment() calls (property tested)
- No unsafe code
- cargo clippy -- -D warnings passes
- All property tests and unit tests pass
Research References
- docs/research/tidaldb_signal_ledger.md -- Section 6 (BucketedCounter design), Section 7 (Scotty stream-slicing: "divide the event stream into non-overlapping time slices, compute partial aggregates per slice, and share these across all concurrent windows")
- Traub, J. et al., "Scotty: General and Efficient Open-Source Window Aggregation," EDBT 2019 -- stream-slicing approach for shared bucket counters
Spec References
- docs/specs/03-signal-system.md -- Section 3 (WarmSignalState with minute_buckets[60], hour_buckets[168], AtomicU32), Section 6 (windowed aggregation: bucket granularity table, rotation logic, concurrency during rotation), performance targets (Section 12: windowed count 1h ~120ns, 7d ~336ns, all_time ~2ns)
Implementation Notes
- AtomicU32 is used for minute and hour buckets because a single bucket will not realistically exceed u32::MAX (about 4.29 billion) events. At 100,000 events/second (far above tidalDB's target), one minute accumulates 6M events and one hour 360M, both well within u32. AtomicU64 is used for the all-time count because it can exceed u32 over the lifetime of a database.
- The Relaxed ordering on bucket reads is justified in the Internal Design section. This is an intentional, documented exception to the general "no Relaxed without justification" rule.
- BucketedCounter is NOT #[repr(C, align(64))]. It is warm-tier, not hot-tier, and cache-line alignment would only add padding to this roughly 944-byte struct (60 + 168 four-byte buckets, two one-byte indices, three eight-byte fields, plus alignment padding). The hot-tier HotSignalState is the only cache-line-aligned struct.
- Do NOT implement SWAG two-stacks. Bucketed counters are simpler and sufficient for M1. SWAG is deferred: it would give O(1) amortized aggregation, but the O(60) or O(168) bucket summation is already sub-microsecond.
- Do NOT implement weighted sum buckets (minute_weight_sums, hour_weight_sums from the spec). M1 only counts events, not weighted sums. Weighted sums are an M2+ concern for signals like completion (ratio 0-1) and dwell_time (duration). The spec's WarmSignalState includes them, but they are deferred.
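A quick check of the size and headroom arithmetic in these notes, assuming a #[repr(C)] copy of exactly the fields listed in the Public API (BucketedCounterLayout is a hypothetical mirror, not the real type; the spec's WarmSignalState, with its deferred weight-sum buckets, would be larger). With repr(C) the layout is 240 + 672 bytes of buckets, two one-byte indices, 6 bytes of padding, and three eight-byte fields, for 944 bytes. The default Rust layout may reorder fields but cannot be smaller, since 938 bytes of fields rounded up to the 8-byte alignment is 944.

```rust
use std::sync::atomic::{AtomicU32, AtomicU64, AtomicU8};

/// Hypothetical field-for-field mirror of BucketedCounter.
/// #[repr(C)] pins declaration order so the byte arithmetic is exact.
#[allow(dead_code)]
#[repr(C)]
struct BucketedCounterLayout {
    minute_buckets: [AtomicU32; 60], // 240 bytes
    hour_buckets: [AtomicU32; 168],  // 672 bytes
    current_minute: AtomicU8,        // 1 byte
    current_hour: AtomicU8,          // 1 byte, then 6 bytes of padding
    all_time_count: AtomicU64,       // 8 bytes
    last_minute_rotation_ns: AtomicU64,
    last_hour_rotation_ns: AtomicU64,
}

/// Worst-case rate quoted in the notes (events per second).
const EVENTS_PER_SEC: u64 = 100_000;
const PER_MINUTE: u64 = EVENTS_PER_SEC * 60; // 6,000,000
const PER_HOUR: u64 = EVENTS_PER_SEC * 3_600; // 360,000,000

// Compile-time check: even a full hour bucket at that rate fits in a u32.
const _: () = assert!(PER_HOUR <= u32::MAX as u64);

fn layout_size() -> usize {
    std::mem::size_of::<BucketedCounterLayout>()
}
```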