tidaldb/docs/planning/milestone-7/phase-2/task-01-degradation-level-load-detector.md
2026-02-23 22:41:16 -07:00

15 KiB

Task 01: DegradationLevel Enum + Load Detector

Delivers

DegradationLevel enum representing the four quality stages. LoadDetector struct with an AtomicU64 in-flight counter, configurable thresholds, and an RAII InFlightGuard that decrements on drop. DegradationThresholds config struct with defaults matching the spec.

Complexity: M

Dependencies

  • None from prior m7p2 tasks (this is the foundation)
  • m7p1 MetricsState (for gauge reporting -- optional integration, not blocking)

Technical Design

1. Create the load module

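Create tidal/src/load/mod.rs and tidal/src/load/detector.rs. Add pub mod load; to tidal/src/lib.rs. Note that the mod.rs listing below also declares and re-exports rate_limiter; those two lines compile only once tidal/src/load/rate_limiter.rs exists, so leave them out until the task that delivers the rate limiter lands.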

2. DegradationLevel enum

// tidal/src/load/mod.rs

pub mod detector;
pub mod rate_limiter;

pub use detector::{DegradationLevel, DegradationThresholds, InFlightGuard, LoadDetector};
pub use rate_limiter::RateLimiter;
// tidal/src/load/detector.rs

use std::sync::atomic::{AtomicU64, Ordering};

/// The four degradation stages, ordered from best to worst quality.
///
/// Each level corresponds to a specific set of optimizations that reduce
/// per-query work. The level is computed once at query entry from the
/// in-flight counter and threaded through the executor -- it is NOT
/// re-checked mid-pipeline to avoid inconsistent behavior within a
/// single query.
///
/// Quality ordering (best to worst):
/// `Full`, `ReducedCandidates`, `CoarseAggregates`, `NoDiversity`.
///
/// The derived `Ord` follows the explicit discriminants, so it runs in the
/// opposite direction to quality: `Full` is the *smallest* value and
/// `Full < ReducedCandidates < CoarseAggregates < NoDiversity`. A larger
/// value therefore means more degradation, not better results.
///
/// Levels are cumulative: each one keeps the reductions of the levels
/// before it and adds its own.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(u8)]
pub enum DegradationLevel {
    /// Normal operation. All stages run at full fidelity.
    Full = 0,
    /// ANN top_k reduced (500 -> 200), BM25 candidate limit halved.
    /// Signal scoring and diversity remain unchanged.
    ReducedCandidates = 1,
    /// In addition to ReducedCandidates: windowed count reads fall back
    /// to AllTime; velocity reads fall back to 24h window.
    CoarseAggregates = 2,
    /// In addition to CoarseAggregates: diversity enforcement skipped entirely.
    /// This is the last resort before rejecting queries.
    NoDiversity = 3,
}

impl DegradationLevel {
    /// Returns `true` when candidate generation runs with reduced limits,
    /// which is the case for every level except `Full`.
    #[must_use]
    pub const fn reduces_candidates(self) -> bool {
        // Exhaustive on purpose: adding a variant must force a decision here.
        match self {
            Self::Full => false,
            Self::ReducedCandidates | Self::CoarseAggregates | Self::NoDiversity => true,
        }
    }

    /// Returns `true` when signal aggregation reads are coarsened
    /// (`CoarseAggregates` and `NoDiversity`).
    #[must_use]
    pub const fn coarsens_aggregates(self) -> bool {
        match self {
            Self::Full | Self::ReducedCandidates => false,
            Self::CoarseAggregates | Self::NoDiversity => true,
        }
    }

    /// Returns `true` when diversity enforcement is skipped
    /// (`NoDiversity` only).
    #[must_use]
    pub const fn skips_diversity(self) -> bool {
        match self {
            Self::Full | Self::ReducedCandidates | Self::CoarseAggregates => false,
            Self::NoDiversity => true,
        }
    }
}

impl std::fmt::Display for DegradationLevel {
    /// Writes the level as a lowercase snake_case label.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the label first so there is a single write call.
        let label = match *self {
            Self::Full => "full",
            Self::ReducedCandidates => "reduced_candidates",
            Self::CoarseAggregates => "coarse_aggregates",
            Self::NoDiversity => "no_diversity",
        };
        f.write_str(label)
    }
}

3. DegradationThresholds config

/// Configurable thresholds for degradation level transitions.
///
/// The load detector computes the degradation level by comparing the
/// current in-flight query count against these thresholds. The thresholds
/// are inclusive: at exactly `reduced_candidates` in-flight queries, the
/// level transitions from `Full` to `ReducedCandidates`.
///
/// Invariant: `0 < reduced_candidates < coarse_aggregates < no_diversity`.
/// `DegradationThresholds::new` validates this and returns `Err` otherwise.
/// The fields are public, so a value built with a struct literal is NOT
/// validated; prefer `new` (or `Default`) when constructing thresholds.
#[derive(Debug, Clone, Copy)]
pub struct DegradationThresholds {
    /// In-flight count at which candidate generation is reduced.
    pub reduced_candidates: u64,
    /// In-flight count at which signal aggregation is coarsened.
    pub coarse_aggregates: u64,
    /// In-flight count at which diversity is disabled.
    pub no_diversity: u64,
}

impl Default for DegradationThresholds {
    /// Default thresholds: 200 / 500 / 1000 in-flight queries.
    fn default() -> Self {
        const REDUCED_CANDIDATES_AT: u64 = 200;
        const COARSE_AGGREGATES_AT: u64 = 500;
        const NO_DIVERSITY_AT: u64 = 1000;

        Self {
            reduced_candidates: REDUCED_CANDIDATES_AT,
            coarse_aggregates: COARSE_AGGREGATES_AT,
            no_diversity: NO_DIVERSITY_AT,
        }
    }
}

impl DegradationThresholds {
    /// Builds a threshold set after checking the ordering invariant
    /// `0 < reduced_candidates < coarse_aggregates < no_diversity`.
    ///
    /// # Errors
    ///
    /// Returns `Err` with a human-readable message when
    /// `reduced_candidates` is zero, or when the three values are not
    /// strictly increasing. Checks run in that order and the first
    /// violation found is the one reported.
    pub fn new(
        reduced_candidates: u64,
        coarse_aggregates: u64,
        no_diversity: u64,
    ) -> Result<Self, String> {
        let violation = if reduced_candidates == 0 {
            Some("reduced_candidates must be > 0".to_string())
        } else if reduced_candidates >= coarse_aggregates {
            Some(format!(
                "reduced_candidates ({reduced_candidates}) must be < coarse_aggregates ({coarse_aggregates})"
            ))
        } else if coarse_aggregates >= no_diversity {
            Some(format!(
                "coarse_aggregates ({coarse_aggregates}) must be < no_diversity ({no_diversity})"
            ))
        } else {
            None
        };

        match violation {
            Some(message) => Err(message),
            None => Ok(Self {
                reduced_candidates,
                coarse_aggregates,
                no_diversity,
            }),
        }
    }
}

4. LoadDetector

/// Tracks in-flight query count and computes degradation level.
///
/// Thread-safe via `AtomicU64`. The counter is incremented via `enter()`
/// which returns an `InFlightGuard` that decrements on drop. Because the
/// decrement lives in `Drop`, the counter is also restored when a panic
/// unwinds through the query executor.
///
/// The `DegradationLevel` is sampled once at `enter()` time and returned
/// alongside the guard. The executor must NOT re-read the level mid-query
/// to avoid inconsistent behavior within a single request.
///
/// `Debug` is derived so that types holding a detector can derive `Debug`
/// themselves; the output shows the counter value at the time of
/// formatting.
#[derive(Debug)]
pub struct LoadDetector {
    /// Number of queries currently between `enter()` and guard drop.
    in_flight: AtomicU64,
    /// Transition points, fixed for the lifetime of the detector.
    thresholds: DegradationThresholds,
}

impl LoadDetector {
    /// Builds a detector with an in-flight count of zero.
    #[must_use]
    pub const fn new(thresholds: DegradationThresholds) -> Self {
        let in_flight = AtomicU64::new(0);
        Self {
            in_flight,
            thresholds,
        }
    }

    /// Registers a query as in flight.
    ///
    /// Returns the `DegradationLevel` for this query together with an
    /// `InFlightGuard` that gives the slot back when dropped.
    ///
    /// The level is derived from the counter value AFTER the increment,
    /// so the query counts itself. Compared with sampling before the
    /// increment this is the conservative choice: degradation starts one
    /// query earlier.
    #[must_use]
    pub fn enter(&self) -> (DegradationLevel, InFlightGuard<'_>) {
        // Relaxed ordering is enough here. Only the increment itself has
        // to be atomic; the counter is a statistical gauge and does not
        // publish or guard any other memory.
        let before = self.in_flight.fetch_add(1, Ordering::Relaxed);
        let including_this_query = before + 1;
        let level = self.level_for(including_this_query);
        let guard = InFlightGuard { detector: self };
        (level, guard)
    }

    /// Returns a snapshot of the in-flight count without changing it.
    ///
    /// Intended for metrics reporting and health checks.
    #[must_use]
    pub fn in_flight(&self) -> u64 {
        self.in_flight.load(Ordering::Relaxed)
    }

    /// Maps an in-flight count to a level. Thresholds are inclusive and
    /// are tested from the most degraded level downwards, so the first
    /// arm that matches wins.
    #[must_use]
    fn level_for(&self, count: u64) -> DegradationLevel {
        let limits = &self.thresholds;
        match count {
            n if n >= limits.no_diversity => DegradationLevel::NoDiversity,
            n if n >= limits.coarse_aggregates => DegradationLevel::CoarseAggregates,
            n if n >= limits.reduced_candidates => DegradationLevel::ReducedCandidates,
            _ => DegradationLevel::Full,
        }
    }
}

5. InFlightGuard (RAII decrement)

/// RAII guard that decrements the `LoadDetector` in-flight counter on drop.
///
/// Created by `LoadDetector::enter()`. Must not be forgotten (`mem::forget`
/// would leak the count). In practice this is held on the stack of the
/// `TidalDb::retrieve()` / `TidalDb::search()` methods and drops when
/// the method returns (success or error).
///
/// Bind the guard to a named variable such as `_guard`. The bare `_`
/// pattern does not bind, so `let (level, _) = detector.enter();` drops
/// the guard at the end of that statement and the query stops being
/// counted while it is still running. `#[must_use]` flags a guard value
/// that is discarded outright, but it cannot catch the `_` pattern.
#[must_use = "dropping the guard immediately releases the in-flight slot"]
pub struct InFlightGuard<'a> {
    /// Detector whose counter this guard decrements when dropped.
    detector: &'a LoadDetector,
}

impl Drop for InFlightGuard<'_> {
    /// Gives the in-flight slot back to the detector.
    fn drop(&mut self) {
        let counter = &self.detector.in_flight;
        // Relaxed for the same reason as the increment in enter(): the
        // decrement has to be atomic, but it orders no other memory.
        counter.fetch_sub(1, Ordering::Relaxed);
    }
}

6. Wire into TidalDb

Add the LoadDetector as a field on TidalDb:

// In tidal/src/db/mod.rs, add to TidalDb struct:
load_detector: Arc<crate::load::LoadDetector>,

// In from_config() and from_parts(), initialize:
load_detector: Arc::new(crate::load::LoadDetector::new(
    crate::load::DegradationThresholds::default(),
)),

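Expose a public read-only accessor for metrics and health-check reporting. Overriding the thresholds from Config is left to a future task; until then the detector is always built with DegradationThresholds::default():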

impl TidalDb {
    /// Borrows the shared load detector, e.g. to read the in-flight gauge
    /// for metrics or health checks.
    #[must_use]
    pub fn load_detector(&self) -> &crate::load::LoadDetector {
        self.load_detector.as_ref()
    }
}

Acceptance Criteria

  • DegradationLevel enum with 4 variants and u8 repr
  • DegradationLevel derives Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash
  • DegradationLevel::Display produces lowercase snake_case strings
  • Helper methods: reduces_candidates(), coarsens_aggregates(), skips_diversity()
  • DegradationThresholds with default (200, 500, 1000) and validated constructor
  • LoadDetector::enter() atomically increments the in-flight counter and returns (DegradationLevel, InFlightGuard), with the level computed from the post-increment count
  • InFlightGuard decrements counter on drop (verified by test)
  • LoadDetector::in_flight() returns current gauge value
  • LoadDetector wired into TidalDb behind Arc
  • pub mod load; added to lib.rs
  • All tests pass: cargo test --manifest-path tidal/Cargo.toml
  • cargo clippy --manifest-path tidal/Cargo.toml -- -D warnings is clean

Test Strategy

#[cfg(test)]
// Tests unwrap freely: a failed unwrap is a test failure, which is the intent.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn default_thresholds() {
        let t = DegradationThresholds::default();
        assert_eq!(t.reduced_candidates, 200);
        assert_eq!(t.coarse_aggregates, 500);
        assert_eq!(t.no_diversity, 1000);
    }

    #[test]
    fn thresholds_validation_rejects_bad_order() {
        assert!(DegradationThresholds::new(500, 200, 1000).is_err());
        assert!(DegradationThresholds::new(200, 1000, 500).is_err());
        assert!(DegradationThresholds::new(0, 500, 1000).is_err());
        // Equal neighbours violate the *strict* ordering invariant.
        assert!(DegradationThresholds::new(200, 200, 1000).is_err());
        assert!(DegradationThresholds::new(200, 500, 500).is_err());
    }

    #[test]
    fn thresholds_validation_accepts_good_order() {
        assert!(DegradationThresholds::new(100, 300, 600).is_ok());
    }

    #[test]
    fn level_display() {
        assert_eq!(DegradationLevel::Full.to_string(), "full");
        assert_eq!(
            DegradationLevel::ReducedCandidates.to_string(),
            "reduced_candidates"
        );
        assert_eq!(
            DegradationLevel::CoarseAggregates.to_string(),
            "coarse_aggregates"
        );
        assert_eq!(DegradationLevel::NoDiversity.to_string(), "no_diversity");
    }

    // `Ord` follows the discriminants: a larger value means more degradation.
    #[test]
    fn level_ordering() {
        assert!(DegradationLevel::Full < DegradationLevel::ReducedCandidates);
        assert!(DegradationLevel::ReducedCandidates < DegradationLevel::CoarseAggregates);
        assert!(DegradationLevel::CoarseAggregates < DegradationLevel::NoDiversity);
    }

    #[test]
    fn level_helper_methods() {
        assert!(!DegradationLevel::Full.reduces_candidates());
        assert!(DegradationLevel::ReducedCandidates.reduces_candidates());
        assert!(DegradationLevel::CoarseAggregates.reduces_candidates());
        assert!(DegradationLevel::NoDiversity.reduces_candidates());

        assert!(!DegradationLevel::Full.coarsens_aggregates());
        assert!(!DegradationLevel::ReducedCandidates.coarsens_aggregates());
        assert!(DegradationLevel::CoarseAggregates.coarsens_aggregates());
        assert!(DegradationLevel::NoDiversity.coarsens_aggregates());

        assert!(!DegradationLevel::Full.skips_diversity());
        assert!(!DegradationLevel::ReducedCandidates.skips_diversity());
        assert!(!DegradationLevel::CoarseAggregates.skips_diversity());
        assert!(DegradationLevel::NoDiversity.skips_diversity());
    }

    #[test]
    fn detector_starts_at_zero() {
        let d = LoadDetector::new(DegradationThresholds::default());
        assert_eq!(d.in_flight(), 0);
    }

    #[test]
    fn enter_increments_and_guard_decrements() {
        let d = LoadDetector::new(DegradationThresholds::default());
        assert_eq!(d.in_flight(), 0);

        let (level, guard) = d.enter();
        assert_eq!(level, DegradationLevel::Full);
        assert_eq!(d.in_flight(), 1);

        drop(guard);
        assert_eq!(d.in_flight(), 0);
    }

    #[test]
    fn degradation_level_transitions() {
        let thresholds = DegradationThresholds::new(2, 4, 6).unwrap();
        let d = LoadDetector::new(thresholds);

        // Enter 1: count=1, Full
        let (l1, _g1) = d.enter();
        assert_eq!(l1, DegradationLevel::Full);

        // Enter 2: count=2, ReducedCandidates
        let (l2, _g2) = d.enter();
        assert_eq!(l2, DegradationLevel::ReducedCandidates);

        // Enter 3: count=3, still ReducedCandidates
        let (l3, _g3) = d.enter();
        assert_eq!(l3, DegradationLevel::ReducedCandidates);

        // Enter 4: count=4, CoarseAggregates
        let (l4, _g4) = d.enter();
        assert_eq!(l4, DegradationLevel::CoarseAggregates);

        // Enter 5: count=5, still CoarseAggregates
        let (l5, _g5) = d.enter();
        assert_eq!(l5, DegradationLevel::CoarseAggregates);

        // Enter 6: count=6, NoDiversity
        let (l6, _g6) = d.enter();
        assert_eq!(l6, DegradationLevel::NoDiversity);

        // All six guards are still alive at this point.
        assert_eq!(d.in_flight(), 6);
    }

    #[test]
    fn level_recovers_after_guards_drop() {
        let thresholds = DegradationThresholds::new(2, 4, 6).unwrap();
        let d = LoadDetector::new(thresholds);

        let (_l1, g1) = d.enter();
        let (l2, g2) = d.enter();
        assert_eq!(l2, DegradationLevel::ReducedCandidates);

        // Releasing both slots brings the next query back to full fidelity.
        drop(g2);
        drop(g1);
        let (l3, _g3) = d.enter();
        assert_eq!(l3, DegradationLevel::Full);
    }

    #[test]
    fn guard_drop_on_panic_path() {
        let d = LoadDetector::new(DegradationThresholds::default());

        // Simulate a panic path: the guard must still decrement.
        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let (_level, _guard) = d.enter();
            panic!("simulated query executor panic");
        }));

        assert!(result.is_err());
        assert_eq!(d.in_flight(), 0, "guard must decrement even on panic");
    }

    // Property test: the gauge always equals the number of live guards.
    // The counter is a `u64`, so "going negative" would show up as a
    // wrap-around to a huge value; the per-step check below catches that,
    // which a check of the final value alone would not.
    mod proptests {
        use super::*;
        use proptest::prelude::*;

        proptest! {
            #[test]
            fn in_flight_never_negative(ops in proptest::collection::vec(prop::bool::ANY, 1..100)) {
                let d = LoadDetector::new(DegradationThresholds::default());
                let mut guards = Vec::new();
                let mut live: u64 = 0;
                for enter in ops {
                    if enter {
                        guards.push(d.enter().1);
                        live += 1;
                    } else if let Some(g) = guards.pop() {
                        drop(g);
                        live -= 1;
                    }
                    prop_assert_eq!(d.in_flight(), live);
                }
                // After dropping all remaining guards:
                drop(guards);
                prop_assert_eq!(d.in_flight(), 0);
            }
        }
    }
}