15 KiB
15 KiB
Task 01: DegradationLevel Enum + Load Detector
Delivers
DegradationLevel enum representing the four quality stages. LoadDetector struct with an AtomicU64 in-flight counter, configurable thresholds, and an RAII InFlightGuard that decrements on drop. DegradationThresholds config struct with defaults matching the spec.
Complexity: M
Dependencies
- None from prior m7p2 tasks (this is the foundation)
- m7p1
MetricsState(for gauge reporting -- optional integration, not blocking)
Technical Design
1. Create the load module
Create tidal/src/load/mod.rs and tidal/src/load/detector.rs. Add pub mod load; to tidal/src/lib.rs.
2. DegradationLevel enum
// tidal/src/load/mod.rs
pub mod detector;
pub mod rate_limiter;
pub use detector::{DegradationLevel, DegradationThresholds, InFlightGuard, LoadDetector};
pub use rate_limiter::RateLimiter;
// tidal/src/load/detector.rs
use std::sync::atomic::{AtomicU64, Ordering};
/// The four degradation stages, ordered from best to worst quality.
///
/// Each level corresponds to a specific set of optimizations that reduce
/// per-query work. The level is computed once at query entry from the
/// in-flight counter and threaded through the executor -- it is NOT
/// re-checked mid-pipeline to avoid inconsistent behavior within a
/// single query.
///
/// Ordering: Full > ReducedCandidates > CoarseAggregates > NoDiversity
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(u8)]
pub enum DegradationLevel {
/// Normal operation. All stages run at full fidelity.
Full = 0,
/// ANN top_k reduced (500 -> 200), BM25 candidate limit halved.
/// Signal scoring and diversity remain unchanged.
ReducedCandidates = 1,
/// In addition to ReducedCandidates: windowed count reads fall back
/// to AllTime; velocity reads fall back to 24h window.
CoarseAggregates = 2,
/// In addition to CoarseAggregates: diversity enforcement skipped entirely.
/// This is the last resort before rejecting queries.
NoDiversity = 3,
}
impl DegradationLevel {
/// Whether this level reduces candidate generation work.
#[must_use]
pub const fn reduces_candidates(self) -> bool {
matches!(
self,
Self::ReducedCandidates | Self::CoarseAggregates | Self::NoDiversity
)
}
/// Whether this level coarsens signal aggregation reads.
#[must_use]
pub const fn coarsens_aggregates(self) -> bool {
matches!(self, Self::CoarseAggregates | Self::NoDiversity)
}
/// Whether this level skips diversity enforcement.
#[must_use]
pub const fn skips_diversity(self) -> bool {
matches!(self, Self::NoDiversity)
}
}
impl std::fmt::Display for DegradationLevel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Full => f.write_str("full"),
Self::ReducedCandidates => f.write_str("reduced_candidates"),
Self::CoarseAggregates => f.write_str("coarse_aggregates"),
Self::NoDiversity => f.write_str("no_diversity"),
}
}
}
3. DegradationThresholds config
/// Configurable thresholds for degradation level transitions.
///
/// The load detector computes the degradation level by comparing the
/// current in-flight query count against these thresholds. The thresholds
/// are inclusive: at exactly `reduced_candidates` in-flight queries, the
/// level transitions from `Full` to `ReducedCandidates`.
///
/// Invariant: `reduced_candidates < coarse_aggregates < no_diversity`.
/// Enforced at construction time.
#[derive(Debug, Clone, Copy)]
pub struct DegradationThresholds {
/// In-flight count at which candidate generation is reduced.
pub reduced_candidates: u64,
/// In-flight count at which signal aggregation is coarsened.
pub coarse_aggregates: u64,
/// In-flight count at which diversity is disabled.
pub no_diversity: u64,
}
impl Default for DegradationThresholds {
fn default() -> Self {
Self {
reduced_candidates: 200,
coarse_aggregates: 500,
no_diversity: 1000,
}
}
}
impl DegradationThresholds {
/// Create new thresholds, validating the ordering invariant.
///
/// # Errors
///
/// Returns `Err` if the thresholds are not strictly increasing.
pub fn new(
reduced_candidates: u64,
coarse_aggregates: u64,
no_diversity: u64,
) -> Result<Self, String> {
if reduced_candidates == 0 {
return Err("reduced_candidates must be > 0".to_string());
}
if reduced_candidates >= coarse_aggregates {
return Err(format!(
"reduced_candidates ({reduced_candidates}) must be < coarse_aggregates ({coarse_aggregates})"
));
}
if coarse_aggregates >= no_diversity {
return Err(format!(
"coarse_aggregates ({coarse_aggregates}) must be < no_diversity ({no_diversity})"
));
}
Ok(Self {
reduced_candidates,
coarse_aggregates,
no_diversity,
})
}
}
4. LoadDetector
/// Tracks in-flight query count and computes degradation level.
///
/// Thread-safe via `AtomicU64`. The counter is incremented via `enter()`
/// which returns an `InFlightGuard` that decrements on drop. This RAII
/// pattern guarantees the counter is always consistent, even if the
/// query executor panics.
///
/// The `DegradationLevel` is sampled once at `enter()` time and returned
/// alongside the guard. The executor must NOT re-read the level mid-query
/// to avoid inconsistent behavior within a single request.
pub struct LoadDetector {
in_flight: AtomicU64,
thresholds: DegradationThresholds,
}
impl LoadDetector {
/// Create a new detector with the given thresholds.
#[must_use]
pub const fn new(thresholds: DegradationThresholds) -> Self {
Self {
in_flight: AtomicU64::new(0),
thresholds,
}
}
/// Enter a query. Increments the in-flight counter and returns:
/// - The `DegradationLevel` computed from the NEW counter value
/// - An `InFlightGuard` that decrements the counter on drop
///
/// The level is computed AFTER incrementing so that the Nth query
/// sees itself in the count. This is the conservative choice:
/// it triggers degradation one query earlier than checking before
/// incrementing.
#[must_use]
pub fn enter(&self) -> (DegradationLevel, InFlightGuard<'_>) {
// Relaxed is sufficient: we only need atomicity, not
// happens-before ordering with any other memory location.
// The in-flight counter is a statistical gauge, not a
// synchronization primitive.
let current = self.in_flight.fetch_add(1, Ordering::Relaxed) + 1;
let level = self.level_for(current);
(level, InFlightGuard { detector: self })
}
/// Read the current in-flight count without modifying it.
///
/// Useful for metrics reporting and health checks.
#[must_use]
pub fn in_flight(&self) -> u64 {
self.in_flight.load(Ordering::Relaxed)
}
/// Compute the degradation level for a given in-flight count.
#[must_use]
fn level_for(&self, count: u64) -> DegradationLevel {
if count >= self.thresholds.no_diversity {
DegradationLevel::NoDiversity
} else if count >= self.thresholds.coarse_aggregates {
DegradationLevel::CoarseAggregates
} else if count >= self.thresholds.reduced_candidates {
DegradationLevel::ReducedCandidates
} else {
DegradationLevel::Full
}
}
}
5. InFlightGuard (RAII decrement)
/// RAII guard that decrements the `LoadDetector` in-flight counter on drop.
///
/// Created by `LoadDetector::enter()`. Must not be forgotten (`mem::forget`
/// would leak the count). In practice this is held on the stack of the
/// `TidalDb::retrieve()` / `TidalDb::search()` methods and drops when
/// the method returns (success or error).
pub struct InFlightGuard<'a> {
detector: &'a LoadDetector,
}
impl Drop for InFlightGuard<'_> {
fn drop(&mut self) {
// Relaxed: same reasoning as in enter(). The decrement only needs
// to be atomic, not ordered with respect to other memory.
self.detector.in_flight.fetch_sub(1, Ordering::Relaxed);
}
}
6. Wire into TidalDb
Add the LoadDetector as a field on TidalDb:
// In tidal/src/db/mod.rs, add to TidalDb struct:
load_detector: Arc<crate::load::LoadDetector>,
// In from_config() and from_parts(), initialize:
load_detector: Arc::new(crate::load::LoadDetector::new(
crate::load::DegradationThresholds::default(),
)),
Expose a public accessor so that the Config can override thresholds in a future task:
impl TidalDb {
/// Access the load detector for metrics and health check reporting.
#[must_use]
pub fn load_detector(&self) -> &crate::load::LoadDetector {
&self.load_detector
}
}
Acceptance Criteria
DegradationLevelenum with 4 variants andu8reprDegradationLevelderivesDebug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, HashDegradationLevel::Displayproduces lowercase snake_case strings- Helper methods:
reduces_candidates(),coarsens_aggregates(),skips_diversity() DegradationThresholdswith default (200, 500, 1000) and validated constructorLoadDetector::enter()returns(DegradationLevel, InFlightGuard)atomicallyInFlightGuarddecrements counter on drop (verified by test)LoadDetector::in_flight()returns current gauge valueLoadDetectorwired intoTidalDbbehindArcpub mod load;added tolib.rs- All tests pass:
cargo test --manifest-path tidal/Cargo.toml cargo clippy -D warningsclean
Test Strategy
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
#[test]
fn default_thresholds() {
let t = DegradationThresholds::default();
assert_eq!(t.reduced_candidates, 200);
assert_eq!(t.coarse_aggregates, 500);
assert_eq!(t.no_diversity, 1000);
}
#[test]
fn thresholds_validation_rejects_bad_order() {
assert!(DegradationThresholds::new(500, 200, 1000).is_err());
assert!(DegradationThresholds::new(200, 1000, 500).is_err());
assert!(DegradationThresholds::new(0, 500, 1000).is_err());
}
#[test]
fn thresholds_validation_accepts_good_order() {
assert!(DegradationThresholds::new(100, 300, 600).is_ok());
}
#[test]
fn level_display() {
assert_eq!(DegradationLevel::Full.to_string(), "full");
assert_eq!(
DegradationLevel::ReducedCandidates.to_string(),
"reduced_candidates"
);
assert_eq!(
DegradationLevel::CoarseAggregates.to_string(),
"coarse_aggregates"
);
assert_eq!(DegradationLevel::NoDiversity.to_string(), "no_diversity");
}
#[test]
fn level_ordering() {
assert!(DegradationLevel::Full < DegradationLevel::ReducedCandidates);
assert!(DegradationLevel::ReducedCandidates < DegradationLevel::CoarseAggregates);
assert!(DegradationLevel::CoarseAggregates < DegradationLevel::NoDiversity);
}
#[test]
fn level_helper_methods() {
assert!(!DegradationLevel::Full.reduces_candidates());
assert!(DegradationLevel::ReducedCandidates.reduces_candidates());
assert!(DegradationLevel::CoarseAggregates.reduces_candidates());
assert!(DegradationLevel::NoDiversity.reduces_candidates());
assert!(!DegradationLevel::Full.coarsens_aggregates());
assert!(!DegradationLevel::ReducedCandidates.coarsens_aggregates());
assert!(DegradationLevel::CoarseAggregates.coarsens_aggregates());
assert!(DegradationLevel::NoDiversity.coarsens_aggregates());
assert!(!DegradationLevel::Full.skips_diversity());
assert!(!DegradationLevel::ReducedCandidates.skips_diversity());
assert!(!DegradationLevel::CoarseAggregates.skips_diversity());
assert!(DegradationLevel::NoDiversity.skips_diversity());
}
#[test]
fn detector_starts_at_zero() {
let d = LoadDetector::new(DegradationThresholds::default());
assert_eq!(d.in_flight(), 0);
}
#[test]
fn enter_increments_and_guard_decrements() {
let d = LoadDetector::new(DegradationThresholds::default());
assert_eq!(d.in_flight(), 0);
let (level, guard) = d.enter();
assert_eq!(level, DegradationLevel::Full);
assert_eq!(d.in_flight(), 1);
drop(guard);
assert_eq!(d.in_flight(), 0);
}
#[test]
fn degradation_level_transitions() {
let thresholds = DegradationThresholds::new(2, 4, 6).unwrap();
let d = LoadDetector::new(thresholds);
// Enter 1: count=1, Full
let (l1, _g1) = d.enter();
assert_eq!(l1, DegradationLevel::Full);
// Enter 2: count=2, ReducedCandidates
let (l2, _g2) = d.enter();
assert_eq!(l2, DegradationLevel::ReducedCandidates);
// Enter 3: count=3, still ReducedCandidates
let (l3, _g3) = d.enter();
assert_eq!(l3, DegradationLevel::ReducedCandidates);
// Enter 4: count=4, CoarseAggregates
let (l4, _g4) = d.enter();
assert_eq!(l4, DegradationLevel::CoarseAggregates);
// Enter 5: count=5, still CoarseAggregates
let (l5, _g5) = d.enter();
assert_eq!(l5, DegradationLevel::CoarseAggregates);
// Enter 6: count=6, NoDiversity
let (l6, _g6) = d.enter();
assert_eq!(l6, DegradationLevel::NoDiversity);
}
#[test]
fn guard_drop_on_panic_path() {
let d = LoadDetector::new(DegradationThresholds::default());
// Simulate a panic path: the guard must still decrement.
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let (_level, _guard) = d.enter();
panic!("simulated query executor panic");
}));
assert!(result.is_err());
assert_eq!(d.in_flight(), 0, "guard must decrement even on panic");
}
// Property test: in_flight never goes negative.
mod proptests {
use super::*;
use proptest::prelude::*;
proptest! {
#[test]
fn in_flight_never_negative(ops in proptest::collection::vec(prop::bool::ANY, 1..100)) {
let d = LoadDetector::new(DegradationThresholds::default());
let mut guards = Vec::new();
for enter in ops {
if enter {
guards.push(d.enter().1);
} else if let Some(g) = guards.pop() {
drop(g);
}
}
// After dropping all remaining guards:
drop(guards);
prop_assert_eq!(d.in_flight(), 0);
}
}
}
}