# Task 02: Warm-Tier Bucketed Counters ## Context **Milestone:** 1 -- Signal Engine **Phase:** m1p4 -- Signal Ledger **Depends On:** None (uses types from m1p1 but no m1p4 tasks) **Blocks:** Task 03 (Signal Ledger and Velocity) **Complexity:** M ## Objective Deliver `BucketedCounter`, the warm-tier data structure that maintains per-minute and per-hour bucketed event counts for windowed aggregation. A single `BucketedCounter` instance supports simultaneous 1h, 24h, and 7d window queries by summing the appropriate range of buckets. This follows the Scotty stream-slicing approach where partial aggregates per time slice are shared across all concurrent windows. The `BucketedCounter` is the data structure that makes `db.read_windowed_count(item_42, "view", Window::TwentyFourHours)` work without scanning raw events. A 1h query sums 60 minute buckets. A 24h query sums 24 hour buckets. A 7d query sums 168 hour buckets. An all-time query reads a single atomic counter. No duplicated storage, no SWAG stacks (deferred), no background materializer thread. ## Requirements - Per-minute buckets: 60 `AtomicU32` counters for the last 60 minutes - Per-hour buckets: 168 `AtomicU32` counters for the last 168 hours (7 days) - All-time counter: single `AtomicU64` for the unbounded total - Current bucket pointer: `AtomicU8` for minute index (0..59), `AtomicU8` for hour index (0..167) - `increment()`: atomically increment the current minute bucket and all-time counter - `windowed_count()`: sum the appropriate bucket range for a given `Window` - `rotate_minute()`: zero the next minute bucket and advance the pointer - `rotate_hour()`: aggregate the last 60 minute buckets into the current hour bucket, zero the next hour bucket, advance the pointer - All operations are atomic -- no mutex, no `unsafe` - `Send + Sync` ## Technical Design ### Module Structure ``` tidal/src/signals/ warm.rs -- BucketedCounter, all methods ``` ### Public API ```rust // === signals/warm.rs === use std::sync::atomic::{AtomicU8, AtomicU32, AtomicU64, Ordering}; use crate::schema::Window; /// Number of per-minute bucket slots (covers 1 hour). pub const MINUTE_BUCKETS: usize = 60; /// Number of per-hour bucket slots (covers 7 days). pub const HOUR_BUCKETS: usize = 168; /// Warm-tier bucketed event counter for a single signal type on a single entity. /// /// Supports simultaneous windowed count queries across 1h, 24h, 7d, 30d, and /// all-time windows by summing the appropriate range of time-bucketed counters. /// /// # Design /// /// Per-minute buckets cover the last 60 minutes. Per-hour buckets cover the /// last 168 hours (7 days). The all-time counter is unbounded. /// /// Window queries: /// 1h = sum of last 60 minute buckets /// 24h = sum of last 24 hour buckets /// 7d = sum of last 168 hour buckets /// 30d = not supported in M1 (requires cold-tier rollups) /// all = single atomic counter /// /// Bucket rotation is trigger-based (called by SignalLedger on signal writes /// when enough time has elapsed), not background-thread-based. This keeps M1 /// simple while being correct. pub struct BucketedCounter { /// Per-minute event count buckets. Index 0 is always the "oldest" bucket /// relative to current_minute. Circular buffer. minute_buckets: [AtomicU32; MINUTE_BUCKETS], /// Per-hour event count buckets. Circular buffer. hour_buckets: [AtomicU32; HOUR_BUCKETS], /// Current minute bucket index (0..59). current_minute: AtomicU8, /// Current hour bucket index (0..167). current_hour: AtomicU8, /// All-time total event count. all_time_count: AtomicU64, /// Timestamp (nanos) of the last minute rotation. last_minute_rotation_ns: AtomicU64, /// Timestamp (nanos) of the last hour rotation. last_hour_rotation_ns: AtomicU64, } impl BucketedCounter { /// Construct a new counter with all buckets zeroed. pub fn new() -> Self; /// Construct with initial rotation timestamps. pub fn with_start_time(now_ns: u64) -> Self; /// Increment the current minute bucket and all-time counter by 1. /// /// Also checks if minute/hour rotation is needed based on `now_ns`. /// If a rotation is due, it is performed inline (trigger-based). /// /// Cost: 2 atomic fetch_add + optional rotation. pub fn increment(&self, now_ns: u64); /// Increment by a count other than 1 (for batch replay). pub fn increment_by(&self, count: u32, now_ns: u64); /// Query the windowed event count for a given window. /// /// Sums the appropriate circular buffer range: /// OneHour -> sum last 60 minute buckets /// TwentyFourHours -> sum last 24 hour buckets /// SevenDays -> sum last 168 hour buckets /// ThirtyDays -> NOT SUPPORTED in M1 (returns 0 with tracing::warn) /// AllTime -> single atomic load /// /// Cost: O(bucket_count) atomic loads. pub fn windowed_count(&self, window: Window) -> u64; /// Read the all-time total event count. pub fn all_time_count(&self) -> u64; /// Read the count in the current minute bucket only. /// Used for fine-grained velocity computation. pub fn current_minute_count(&self) -> u32; /// Rotate the minute pointer: zero the next slot, advance `current_minute`. /// /// Called when at least 60 seconds have elapsed since the last rotation. /// Returns the count from the expired bucket (for hour aggregation). pub fn rotate_minute(&self) -> u32; /// Rotate the hour pointer: set the next hour bucket from aggregated /// minute data, advance `current_hour`. /// /// Called when at least 3600 seconds have elapsed since the last rotation. /// `minute_aggregate` is the sum of the last 60 minute buckets (provided /// by the caller after summing). pub fn rotate_hour(&self, minute_aggregate: u32); /// Snapshot all state for checkpoint serialization. /// Returns (minute_buckets, hour_buckets, current_minute, current_hour, /// all_time_count, last_minute_rotation_ns, last_hour_rotation_ns). pub fn snapshot(&self) -> BucketedCounterSnapshot; /// Restore from a checkpoint snapshot. pub fn restore(&self, snapshot: &BucketedCounterSnapshot); } /// Serializable snapshot of a BucketedCounter. #[derive(Debug, Clone, PartialEq, Eq)] pub struct BucketedCounterSnapshot { pub minute_buckets: [u32; MINUTE_BUCKETS], pub hour_buckets: [u32; HOUR_BUCKETS], pub current_minute: u8, pub current_hour: u8, pub all_time_count: u64, pub last_minute_rotation_ns: u64, pub last_hour_rotation_ns: u64, } ``` ### Internal Design **Circular buffer indexing:** The minute buckets form a circular buffer. `current_minute` points to the slot currently being incremented. On rotation, the pointer advances to the next slot (wrapping at 60), and that slot is zeroed before use. To query the last N minutes, we read N slots ending at `current_minute` (inclusive), wrapping backwards through the circular buffer: ```rust fn sum_last_n_minutes(&self, n: usize) -> u64 { let current = self.current_minute.load(Ordering::Acquire) as usize; let mut total: u64 = 0; for i in 0..n { let idx = (current + MINUTE_BUCKETS - i) % MINUTE_BUCKETS; total += u64::from(self.minute_buckets[idx].load(Ordering::Relaxed)); } total } ``` The same pattern applies to hour buckets with `current_hour` and 168 slots. **Relaxed ordering for bucket reads:** Bucket reads use `Ordering::Relaxed` because windowed counts are inherently approximate -- a query at time T may see a bucket that was incremented at T-1ms or T+1ms due to scheduling. The ranking system does not require exact counts; it requires counts that are correct to within one bucket boundary (60 seconds). Relaxed ordering is safe and avoids unnecessary memory fences on the read path. Bucket writes (increments) also use `Ordering::Relaxed` on `fetch_add` because the only ordering guarantee needed is that the increment is eventually visible, which Relaxed provides. **Trigger-based rotation:** M1 does not have a background materializer thread. Instead, rotation is checked on each `increment()` call: ```rust pub fn increment(&self, now_ns: u64) { // Check if minute rotation is needed let last_minute = self.last_minute_rotation_ns.load(Ordering::Relaxed); if now_ns >= last_minute + 60_000_000_000 { // 60 seconds in nanos self.maybe_rotate_minutes(now_ns); } // Increment current minute bucket let idx = self.current_minute.load(Ordering::Acquire) as usize; self.minute_buckets[idx].fetch_add(1, Ordering::Relaxed); // Increment all-time counter self.all_time_count.fetch_add(1, Ordering::Relaxed); } ``` The rotation check is cheap (one Relaxed load + comparison). Actual rotation happens at most once per minute per entity. Multiple concurrent callers that detect rotation due may race, but the rotation logic uses CAS on `last_minute_rotation_ns` to ensure exactly one caller performs the rotation. **30-day window:** Not supported in M1. The 30d window requires cold-tier hourly rollups (720 hour buckets or disk-backed data). For M1, `windowed_count(Window::ThirtyDays)` returns 0 and emits a `tracing::warn!`. This is documented in the `Window` type and the API. ### Error Handling No fallible operations. All methods are infallible. Invalid window variants (ThirtyDays) return 0 with a warning log, not an error. ## Test Strategy ### Property Tests ```rust use proptest::prelude::*; // P3: Windowed count equals event count in window (1h window). proptest! { #[test] fn windowed_count_1h_matches_events( event_times_secs in prop::collection::vec(0u64..7200, 1..500), query_time_secs in 3600u64..7200, ) { let counter = BucketedCounter::with_start_time(0); // Convert to nanoseconds and insert events for &t_secs in &event_times_secs { let t_ns = t_secs * 1_000_000_000; counter.increment(t_ns); } // Count events analytically in the 1h window ending at query_time let query_ns = query_time_secs * 1_000_000_000; let window_start = query_time_secs.saturating_sub(3600); let expected = event_times_secs.iter() .filter(|&&t| t > window_start && t <= query_time_secs) .count() as u64; let actual = counter.windowed_count(Window::OneHour); // Allow +/- 1 bucket boundary tolerance (events at exact boundary) let tolerance = event_times_secs.iter() .filter(|&&t| { let boundary = query_time_secs.saturating_sub(3600); t == boundary || t == boundary + 1 }) .count() as u64; prop_assert!( actual.abs_diff(expected) <= tolerance + 1, "actual={actual}, expected={expected}, tolerance={tolerance}" ); } } // All-time count equals total event count. proptest! { #[test] fn all_time_count_matches_total( event_count in 0u64..10_000, ) { let counter = BucketedCounter::with_start_time(0); for i in 0..event_count { let t_ns = i * 1_000_000; counter.increment(t_ns); } prop_assert_eq!(counter.all_time_count(), event_count); } } // Circular buffer wrapping: counts survive full rotation. proptest! { #[test] fn minute_rotation_preserves_total( events_per_minute in prop::collection::vec(0u32..100, 60..120), ) { let counter = BucketedCounter::with_start_time(0); let mut total = 0u64; for (minute_idx, &count) in events_per_minute.iter().enumerate() { let base_ns = (minute_idx as u64) * 60_000_000_000; for j in 0..count { let t_ns = base_ns + u64::from(j) * 1_000_000; counter.increment(t_ns); total += 1; } } prop_assert_eq!(counter.all_time_count(), total); } } ``` ### Unit Tests ```rust #[test] fn new_counter_is_zeroed() { let counter = BucketedCounter::new(); assert_eq!(counter.all_time_count(), 0); assert_eq!(counter.windowed_count(Window::OneHour), 0); assert_eq!(counter.windowed_count(Window::TwentyFourHours), 0); assert_eq!(counter.windowed_count(Window::SevenDays), 0); assert_eq!(counter.windowed_count(Window::AllTime), 0); } #[test] fn single_increment() { let counter = BucketedCounter::with_start_time(0); counter.increment(1_000_000_000); // 1 second assert_eq!(counter.all_time_count(), 1); assert_eq!(counter.windowed_count(Window::OneHour), 1); assert_eq!(counter.windowed_count(Window::AllTime), 1); } #[test] fn multiple_increments_same_minute() { let counter = BucketedCounter::with_start_time(0); for i in 0..100 { counter.increment(i * 100_000_000); // every 100ms for 10 seconds } assert_eq!(counter.all_time_count(), 100); assert_eq!(counter.windowed_count(Window::OneHour), 100); } #[test] fn minute_rotation_zeros_next_bucket() { let counter = BucketedCounter::with_start_time(0); // Fill minute 0 with 10 events for i in 0..10 { counter.increment(i * 1_000_000_000); } assert_eq!(counter.windowed_count(Window::OneHour), 10); // Advance past minute boundary (61 seconds) counter.increment(61_000_000_000); assert_eq!(counter.all_time_count(), 11); // The 1h window should include both minutes let count_1h = counter.windowed_count(Window::OneHour); assert_eq!(count_1h, 11); } #[test] fn events_outside_1h_window_not_counted() { let counter = BucketedCounter::with_start_time(0); // Add events at t=0 (ancient) counter.increment(0); // Advance time past 1 hour with many rotations for minute in 1..=70 { let t_ns = minute * 60_000_000_000u64; counter.increment(t_ns); } // The 1h window should contain 60 events (minutes 11-70), not 71 let count_1h = counter.windowed_count(Window::OneHour); // The events from minute 0 through minute 10 have rotated out assert!(count_1h <= 61, "1h count was {count_1h}, expected <= 61"); assert_eq!(counter.all_time_count(), 71); } #[test] fn hour_rotation_aggregates_minutes() { let counter = BucketedCounter::with_start_time(0); // Simulate 2 hours of events: 5 per minute for minute in 0..120 { let base_ns = minute * 60_000_000_000u64; for j in 0..5 { counter.increment(base_ns + j * 1_000_000_000); } } assert_eq!(counter.all_time_count(), 600); // 24h window should include all events (only 2 hours elapsed) let count_24h = counter.windowed_count(Window::TwentyFourHours); assert!(count_24h > 0, "24h window should have events"); } #[test] fn all_time_window_reads_atomic_counter() { let counter = BucketedCounter::with_start_time(0); for i in 0..1000 { counter.increment(i * 1_000_000); } assert_eq!(counter.windowed_count(Window::AllTime), 1000); } #[test] fn thirty_day_window_returns_zero() { let counter = BucketedCounter::with_start_time(0); counter.increment(1_000_000_000); // ThirtyDays not supported in M1 assert_eq!(counter.windowed_count(Window::ThirtyDays), 0); } #[test] fn snapshot_and_restore_roundtrip() { let counter = BucketedCounter::with_start_time(0); for i in 0..50 { counter.increment(i * 2_000_000_000); // every 2 seconds } let snapshot = counter.snapshot(); let restored = BucketedCounter::new(); restored.restore(&snapshot); assert_eq!(restored.all_time_count(), counter.all_time_count()); assert_eq!( restored.windowed_count(Window::OneHour), counter.windowed_count(Window::OneHour) ); assert_eq!( restored.windowed_count(Window::AllTime), counter.windowed_count(Window::AllTime) ); } #[test] fn increment_by_adds_multiple() { let counter = BucketedCounter::with_start_time(0); counter.increment_by(42, 1_000_000_000); assert_eq!(counter.all_time_count(), 42); assert_eq!(counter.windowed_count(Window::OneHour), 42); } ``` ## Acceptance Criteria - [ ] `BucketedCounter` has 60 per-minute buckets (`AtomicU32`) and 168 per-hour buckets (`AtomicU32`) - [ ] `increment()` atomically increments current minute bucket and all-time counter - [ ] `windowed_count(Window::OneHour)` sums last 60 minute buckets - [ ] `windowed_count(Window::TwentyFourHours)` sums last 24 hour buckets - [ ] `windowed_count(Window::SevenDays)` sums last 168 hour buckets - [ ] `windowed_count(Window::AllTime)` returns atomic counter value - [ ] `windowed_count(Window::ThirtyDays)` returns 0 (not supported in M1) - [ ] Trigger-based minute rotation: when 60+ seconds elapsed, next slot is zeroed and pointer advanced - [ ] Trigger-based hour rotation: when 3600+ seconds elapsed, minute aggregate stored in hour bucket - [ ] `snapshot()` and `restore()` roundtrip preserves all state - [ ] All-time count matches total number of `increment()` calls (property tested) - [ ] No `unsafe` code - [ ] `cargo clippy -- -D warnings` passes - [ ] All property tests and unit tests pass ## Research References - [docs/research/tidaldb_signal_ledger.md](../../../research/tidaldb_signal_ledger.md) -- Section 6 (BucketedCounter design), Section 7 (Scotty stream-slicing: "divide the event stream into non-overlapping time slices, compute partial aggregates per slice, and share these across all concurrent windows") - Traub, J. et al., "Scotty: General and Efficient Open-Source Window Aggregation," EDBT 2019 -- stream-slicing approach for shared bucket counters ## Spec References - [docs/specs/03-signal-system.md](../../../specs/03-signal-system.md) -- Section 3 (WarmSignalState with `minute_buckets[60]`, `hour_buckets[168]`, `AtomicU32`), Section 6 (windowed aggregation: bucket granularity table, rotation logic, concurrency during rotation), performance targets (Section 12: windowed count 1h ~120ns, 7d ~336ns, all_time ~2ns) ## Implementation Notes - `AtomicU32` is used for minute and hour buckets because a single bucket cannot exceed 2^32 events. At 100,000 events/second (far above tidalDB's target), one minute accumulates 6M events -- well within u32. - `AtomicU64` is used for all-time count because it can exceed u32 over the lifetime of a database. - The `Relaxed` ordering on bucket reads is justified in the Internal Design section. This is an intentional, documented exception to the general "no Relaxed without justification" rule. - `BucketedCounter` is NOT `#[repr(C, align(64))]`. It is warm-tier, not hot-tier. Cache-line alignment would waste space for the ~1.8KB struct. The hot-tier `HotSignalState` is the only cache-line-aligned struct. - Do NOT implement SWAG two-stacks. Bucketed counters are simpler and sufficient for M1. SWAG is deferred because it provides O(1) amortized aggregation, but our O(60) or O(168) summation is already sub-microsecond. - Do NOT implement weighted sum buckets (`minute_weight_sums`, `hour_weight_sums` from the spec). M1 only counts events, not weighted sums. Weighted sums are a M2+ concern for signals like `completion` (ratio 0-1) and `dwell_time` (duration). The spec's `WarmSignalState` includes them but they are deferred.