Task 02: Query Executor Degradation Branches
Delivers
Wire DegradationLevel into RetrieveExecutor and SearchExecutor so that the pipeline respects the current degradation level: candidate generation, signal aggregation, and diversity enforcement each get a degradation-aware code path.
Complexity: M
Dependencies
- task-01 (DegradationLevel, LoadDetector, InFlightGuard)
- m2p5 RetrieveExecutor (6-stage pipeline)
- m5p3 SearchExecutor (8-stage pipeline)
Technical Design
1. Thread DegradationLevel through executor construction
Both executors are constructed per query in TidalDb::retrieve() and TidalDb::search(). The degradation level is stored as a field on each executor and set through a builder method.
// In tidal/src/query/executor/mod.rs, add to RetrieveExecutor:
pub struct RetrieveExecutor<'a> {
    // ... existing fields ...
    degradation_level: crate::load::DegradationLevel,
}

impl<'a> RetrieveExecutor<'a> {
    // Add builder method:
    #[must_use]
    pub const fn with_degradation_level(
        mut self,
        level: crate::load::DegradationLevel,
    ) -> Self {
        self.degradation_level = level;
        self
    }
}

// In tidal/src/query/search/executor.rs, add to SearchExecutor:
pub struct SearchExecutor<'a> {
    // ... existing fields ...
    degradation_level: crate::load::DegradationLevel,
}

impl<'a> SearchExecutor<'a> {
    #[must_use]
    pub const fn with_degradation_level(
        mut self,
        level: crate::load::DegradationLevel,
    ) -> Self {
        self.degradation_level = level;
        self
    }
}
Default to DegradationLevel::Full in new() so that all existing tests pass without modification.
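For illustration, a self-contained sketch of the default-then-override pattern. It assumes DegradationLevel can derive Default with Full marked #[default]; the real enum comes from task-01, and Executor here is a hypothetical stand-in for either executor:

```rust
/// Stand-in for task-01's enum (assumed shape, not the real definition).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum DegradationLevel {
    #[default]
    Full,
    ReducedCandidates,
    CoarseAggregates,
    NoDiversity,
}

/// Hypothetical stand-in for RetrieveExecutor / SearchExecutor.
pub struct Executor {
    limit: usize,
    degradation_level: DegradationLevel,
}

impl Executor {
    /// The constructor signature is unchanged; the new field starts at
    /// `Full`, so callers that never opt in keep today's behavior.
    pub fn new(limit: usize) -> Self {
        Self {
            limit,
            degradation_level: DegradationLevel::default(),
        }
    }

    /// Builder override, called once per query after `new()`.
    #[must_use]
    pub const fn with_degradation_level(mut self, level: DegradationLevel) -> Self {
        self.degradation_level = level;
        self
    }

    pub const fn limit(&self) -> usize {
        self.limit
    }

    pub const fn degradation_level(&self) -> DegradationLevel {
        self.degradation_level
    }
}
```

Because the override is a consuming builder, existing `new()` call sites compile untouched, which is what keeps the current test suite green.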
2. Wire LoadDetector into TidalDb::retrieve() and TidalDb::search()
In tidal/src/db/query_ops.rs (or wherever retrieve() / search() are implemented):
impl TidalDb {
    pub fn retrieve(&self, query: &Retrieve) -> crate::Result<Results> {
        // Enter the load detector. Bind the guard to `_guard`, not `_`:
        // a `_` pattern would drop it at the end of this statement, while
        // `_guard` keeps the query counted as in flight until the method
        // returns.
        let (degradation_level, _guard) = self.load_detector.enter();
        tracing::debug!(
            degradation = %degradation_level,
            in_flight = self.load_detector.in_flight(),
            "query entry"
        );
        // Build the executor as before, then set the degradation level:
        let executor = RetrieveExecutor::new(/* ... */)
            .with_degradation_level(degradation_level);
        // ... rest of the method
    }
}
Same pattern for search().
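Step 2 depends on the guard living until the method returns. A minimal sketch of that RAII shape, assuming an atomic in-flight counter; the capacity field and the 1x/2x/3x cut-offs below are hypothetical, not task-01's actual policy:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DegradationLevel {
    Full,
    ReducedCandidates,
    CoarseAggregates,
    NoDiversity,
}

/// Hypothetical detector: degrades as in-flight queries exceed `capacity`.
pub struct LoadDetector {
    in_flight: AtomicUsize,
    capacity: usize,
}

/// Decrements the in-flight counter when dropped.
pub struct InFlightGuard<'a> {
    counter: &'a AtomicUsize,
}

impl Drop for InFlightGuard<'_> {
    fn drop(&mut self) {
        self.counter.fetch_sub(1, Ordering::AcqRel);
    }
}

impl LoadDetector {
    pub const fn new(capacity: usize) -> Self {
        Self { in_flight: AtomicUsize::new(0), capacity }
    }

    pub fn in_flight(&self) -> usize {
        self.in_flight.load(Ordering::Acquire)
    }

    /// Registers a query; returns the level it should run at plus a guard.
    pub fn enter(&self) -> (DegradationLevel, InFlightGuard<'_>) {
        // Count this query too, so the level reflects load including itself.
        let current = self.in_flight.fetch_add(1, Ordering::AcqRel) + 1;
        let guard = InFlightGuard { counter: &self.in_flight };
        // Hypothetical cut-offs at 1x, 2x and 3x capacity.
        let level = if current <= self.capacity {
            DegradationLevel::Full
        } else if current <= 2 * self.capacity {
            DegradationLevel::ReducedCandidates
        } else if current <= 3 * self.capacity {
            DegradationLevel::CoarseAggregates
        } else {
            DegradationLevel::NoDiversity
        };
        (level, guard)
    }
}
```

Writing `let _ = detector.enter();` would drop the guard immediately and undercount load, which is why step 2 binds it to `_guard`.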
3. Stage 1 -- ReducedCandidates: ANN top_k and BM25 limit
In SearchExecutor::execute(), the ANN top_k is currently computed as:
let k = (query.limit as usize * 20).max(200);
Under ReducedCandidates, reduce the over-fetch factor:
let k = if self.degradation_level.reduces_candidates() {
    // Reduced: 10x over-fetch with a floor of 100, capped at 200
    // regardless of query limit. For a typical limit=20 query this
    // halves k (400 -> 200).
    (query.limit as usize * 10).clamp(100, 200)
} else {
    (query.limit as usize * 20).max(200)
};
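Pulling the k computation into a pure function makes the numbers easy to check and gives reduced_candidates_caps_ann_top_k a direct target without a test hook. A sketch, assuming reduces_candidates() is true at every level from ReducedCandidates upward; the name ann_top_k is hypothetical:

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DegradationLevel {
    Full,
    ReducedCandidates,
    CoarseAggregates,
    NoDiversity,
}

impl DegradationLevel {
    /// Assumed cumulative: every level except Full reduces candidates.
    pub const fn reduces_candidates(self) -> bool {
        !matches!(self, Self::Full)
    }
}

/// ANN over-fetch size for a query returning `limit` items.
pub fn ann_top_k(limit: usize, level: DegradationLevel) -> usize {
    if level.reduces_candidates() {
        // 10x over-fetch, floor of 100, hard ceiling of 200.
        limit.saturating_mul(10).clamp(100, 200)
    } else {
        // 20x over-fetch with a floor of 200.
        limit.saturating_mul(20).max(200)
    }
}
```

One consequence worth deciding explicitly: for limit > 200 the reduced k is smaller than the limit itself, so under ReducedCandidates the ANN source alone cannot fill a page.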
For BM25, the AllScoresCollector currently returns all matches. Under degradation, truncate the BM25 results after collection:
if self.degradation_level.reduces_candidates() {
    // Cap BM25 candidates at half the normal ANN over-fetch budget to
    // reduce downstream scoring work (the BM25 search itself still runs
    // in full). Truncation keeps the top results and drops only the long
    // tail, provided bm25_results is sorted by descending score; sort
    // first if the collector returns matches in doc order.
    let bm25_cap = (query.limit as usize * 10).max(100);
    bm25_results.truncate(bm25_cap);
}
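The truncation only trims the long tail if the hits are in descending score order. A sketch that makes that precondition explicit, assuming hits are (doc_id, score) pairs (a hypothetical shape, not the collector's real output type):

```rust
/// (doc_id, bm25_score); a hypothetical shape for illustration.
pub type Bm25Hit = (u64, f32);

/// Budget from step 3: half the normal ANN over-fetch, floor of 100.
pub fn bm25_cap(limit: usize) -> usize {
    limit.saturating_mul(10).max(100)
}

/// Keeps the `cap` best hits in descending score order. Sorting first is
/// what makes `truncate` drop only the long tail; on doc-id-ordered input
/// a bare `truncate` would drop arbitrary hits.
pub fn truncate_bm25(mut hits: Vec<Bm25Hit>, cap: usize) -> Vec<Bm25Hit> {
    // `total_cmp` orders every f32 (NaN included), so no unwrap is needed;
    // ties break on doc_id to keep the output deterministic.
    hits.sort_unstable_by(|a, b| b.1.total_cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
    hits.truncate(cap);
    hits
}
```

If sorting every match is itself too costly under load, `select_nth_unstable_by` can partition out the top `cap` hits in linear time so that only those need sorting.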
For RetrieveExecutor, the Scan candidate strategy currently caps at the universe size, with a default over-fetch of 10x the query limit. Under degradation, halve the over-fetch factor:
// In candidate_gen::scan_candidates (taking the level as a parameter),
// or at the call site (using self.degradation_level):
let scan_cap = if degradation_level.reduces_candidates() {
    query.limit * 5 // reduced from the default 10x over-fetch
} else {
    query.limit * 10
};
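A sketch of the scan cap with the universe bound applied after the over-fetch factor, assuming universe is the number of items the Scan strategy could visit (a hypothetical parameter):

```rust
/// Hypothetical scan cap: over-fetch factor first, then the universe bound.
pub fn scan_cap(limit: usize, universe: usize, reduces_candidates: bool) -> usize {
    // 5x under ReducedCandidates, otherwise the default 10x over-fetch.
    let factor = if reduces_candidates { 5 } else { 10 };
    // Never ask the scan for more items than exist.
    limit.saturating_mul(factor).min(universe)
}
```

For small universes both branches collapse to the universe size, so the reduction only pays off when the catalogue is much larger than 10x the limit.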
4. Stage 3 -- CoarseAggregates: signal read fallback
The signal scoring stage reads windowed counts and velocity values through ProfileExecutor::score() -> helpers::read_agg(). Under CoarseAggregates, the executor should override the window argument.
Add a degradation-aware helper that substitutes coarse windows:
// In tidal/src/ranking/executor/helpers.rs or a new helper module:
use crate::load::DegradationLevel;
use crate::schema::Window;

/// Adjust the window for signal reads under degradation.
///
/// Under `CoarseAggregates` or `NoDiversity`:
/// - Windowed count requests fall back to `AllTime` (cheapest read)
/// - Velocity requests fall back to `TwentyFourHours` (widest cached window)
///
/// This avoids per-bucket scans in SWAG and uses pre-aggregated values.
/// Both helpers are `const fn`, so `DegradationLevel::coarsens_aggregates`
/// must also be a `const fn` (task-01).
#[must_use]
pub const fn degraded_window(
    window: Window,
    degradation: DegradationLevel,
) -> Window {
    if degradation.coarsens_aggregates() {
        Window::AllTime
    } else {
        window
    }
}

/// Adjust the velocity window under degradation.
#[must_use]
pub const fn degraded_velocity_window(
    window: Window,
    degradation: DegradationLevel,
) -> Window {
    if degradation.coarsens_aggregates() {
        Window::TwentyFourHours
    } else {
        window
    }
}
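Because both helpers are const fn, the predicates they call must be const fn too. A sketch of task-01-style predicates, assuming the levels are cumulative (as the doc comment's "CoarseAggregates or NoDiversity" implies) and using a hypothetical Window enum whose OneHour and SevenDays variants are illustrative only:

```rust
/// Stand-in for task-01's enum, assumed ordered least to most degraded.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DegradationLevel {
    Full,
    ReducedCandidates,
    CoarseAggregates,
    NoDiversity,
}

impl DegradationLevel {
    // Trait methods such as `PartialOrd::ge` cannot be called in a const fn
    // on stable Rust, so compare discriminants with `as u8` instead.
    pub const fn reduces_candidates(self) -> bool {
        self as u8 >= Self::ReducedCandidates as u8
    }
    pub const fn coarsens_aggregates(self) -> bool {
        self as u8 >= Self::CoarseAggregates as u8
    }
    pub const fn skips_diversity(self) -> bool {
        self as u8 >= Self::NoDiversity as u8
    }
}

/// Hypothetical subset of `crate::schema::Window`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Window {
    OneHour,
    TwentyFourHours,
    SevenDays,
    AllTime,
}

pub const fn degraded_window(window: Window, degradation: DegradationLevel) -> Window {
    if degradation.coarsens_aggregates() {
        Window::AllTime
    } else {
        window
    }
}

pub const fn degraded_velocity_window(window: Window, degradation: DegradationLevel) -> Window {
    if degradation.coarsens_aggregates() {
        Window::TwentyFourHours
    } else {
        window
    }
}
```

Keeping the predicates cumulative means each stage checks one method and never has to enumerate which higher levels also apply.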
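Wire this into ProfileExecutor by threading the DegradationLevel through the scoring path. The cleanest approach is to add a degradation_level field to ProfileExecutor (initialized to DegradationLevel::Full in new(), like the executors) with a builder method, and have each executor pass self.degradation_level when it constructs its ProfileExecutor:
pub struct ProfileExecutor<'a> {
    // ... existing fields ...
    degradation_level: DegradationLevel,
}

impl<'a> ProfileExecutor<'a> {
    #[must_use]
    pub const fn with_degradation(mut self, level: DegradationLevel) -> Self {
        self.degradation_level = level;
        self
    }
}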
Then in the read_agg call sites inside score_candidate(), apply the degradation window substitution.
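How the threading might look end to end: a sketch in which ProfileExecutor carries the level and each read_agg call site routes its window through the same substitution as degraded_window and degraded_velocity_window. AggStore, the signal names, and the score formula (a plain sum) are hypothetical:

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum DegradationLevel {
    #[default]
    Full,
    ReducedCandidates,
    CoarseAggregates,
    NoDiversity,
}

impl DegradationLevel {
    /// Assumed cumulative: NoDiversity also coarsens aggregates.
    pub const fn coarsens_aggregates(self) -> bool {
        matches!(self, Self::CoarseAggregates | Self::NoDiversity)
    }
}

/// Hypothetical subset of the schema's windows.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Window {
    OneHour,
    TwentyFourHours,
    AllTime,
}

/// Hypothetical pre-aggregated store keyed by (signal, window).
pub struct AggStore(pub HashMap<(&'static str, Window), f64>);

impl AggStore {
    fn read_agg(&self, signal: &'static str, window: Window) -> f64 {
        self.0.get(&(signal, window)).copied().unwrap_or(0.0)
    }
}

pub struct ProfileExecutor<'a> {
    store: &'a AggStore,
    degradation_level: DegradationLevel,
}

impl<'a> ProfileExecutor<'a> {
    pub fn new(store: &'a AggStore) -> Self {
        Self { store, degradation_level: DegradationLevel::default() }
    }

    #[must_use]
    pub const fn with_degradation(mut self, level: DegradationLevel) -> Self {
        self.degradation_level = level;
        self
    }

    /// Each read_agg call site substitutes its window before reading.
    pub fn score_candidate(&self, count_window: Window, velocity_window: Window) -> f64 {
        let coarse = self.degradation_level.coarsens_aggregates();
        let count_w = if coarse { Window::AllTime } else { count_window };
        let velocity_w = if coarse { Window::TwentyFourHours } else { velocity_window };
        self.store.read_agg("views", count_w) + self.store.read_agg("views_velocity", velocity_w)
    }
}
```

Doing the substitution at the call site, rather than inside read_agg, keeps read_agg's contract unchanged for callers outside the query path.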
5. Stage 4 -- NoDiversity: skip diversity enforcement
In RetrieveExecutor::execute(), the diversity stage currently runs unconditionally when constraints are present. Under NoDiversity, skip it:
// Stage 4: Diversity Enforcement
let (final_candidates, constraints_satisfied) =
    if self.degradation_level.skips_diversity() {
        // NoDiversity: skip the diversity pass entirely.
        // Record a warning so the caller knows quality is reduced.
        warnings.push(
            "diversity enforcement skipped due to load degradation".to_string(),
        );
        // Requested constraints were not enforced, so report them as
        // satisfied only when none were requested.
        (scored, effective_diversity.is_none())
    } else if let Some(diversity) = effective_diversity {
        let result = DiversitySelector::select(&scored, diversity, scored.len());
        // ... existing diversity logic ...
    } else {
        (scored, true)
    };
Same pattern in SearchExecutor::execute():
let (final_candidates, constraints_satisfied) =
    if self.degradation_level.skips_diversity() {
        warnings.push(
            "diversity enforcement skipped due to load degradation".to_string(),
        );
        (scored, query.diversity.is_none())
    } else if let Some(ref diversity) = query.diversity {
        // ... existing diversity logic ...
    } else {
        (scored, true)
    };
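The branch structure can be exercised in isolation. A sketch mirroring the no_diversity_skips_enforcement and full_level_enforces_diversity test plans, with a hypothetical Diversity type holding only max_per_creator and candidates assumed pre-sorted by score. It reports constraints_satisfied = false whenever constraints were requested but not enforced, so callers are not told an unenforced max_per_creator held:

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DegradationLevel {
    Full,
    ReducedCandidates,
    CoarseAggregates,
    NoDiversity,
}

impl DegradationLevel {
    pub const fn skips_diversity(self) -> bool {
        matches!(self, Self::NoDiversity)
    }
}

#[derive(Clone, Debug, PartialEq)]
pub struct Scored {
    pub id: u32,
    pub creator: u32,
}

/// Hypothetical stand-in for the query's diversity constraints.
pub struct Diversity {
    pub max_per_creator: usize,
}

/// Returns (final candidates, constraints_satisfied).
pub fn diversity_stage(
    scored: Vec<Scored>,
    diversity: Option<&Diversity>,
    level: DegradationLevel,
    warnings: &mut Vec<String>,
) -> (Vec<Scored>, bool) {
    if level.skips_diversity() {
        warnings.push("diversity enforcement skipped due to load degradation".to_string());
        // Satisfied only trivially, i.e. when no constraints were requested.
        return (scored, diversity.is_none());
    }
    match diversity {
        Some(d) => {
            // Greedy cap: walk in score order, keep at most N per creator.
            let mut per_creator: HashMap<u32, usize> = HashMap::new();
            let mut kept = Vec::new();
            for c in scored {
                let n = per_creator.entry(c.creator).or_insert(0);
                if *n < d.max_per_creator {
                    *n += 1;
                    kept.push(c);
                }
            }
            (kept, true)
        }
        None => (scored, true),
    }
}
```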
6. Thread degradation level into Results
The executor already constructs Results at the end of execute(). Pass the degradation level through to the response. This is covered in detail by task-03 (which adds the field to Results and SearchResults), but the executor must set it:
Ok(Results {
    items,
    next_cursor,
    total_candidates: total_scored,
    constraints_satisfied,
    warnings,
    session_snapshot: self.session_snapshot.clone(),
    degradation_level: self.degradation_level,
})
Acceptance Criteria
- RetrieveExecutor has a degradation_level field, default Full
- SearchExecutor has a degradation_level field, default Full
- with_degradation_level() builder method on both executors
- ANN top_k reduced under ReducedCandidates (capped at 200)
- BM25 results truncated under ReducedCandidates (halved cap)
- Scan candidate over-fetch reduced under ReducedCandidates
- Signal windowed reads fall back to AllTime under CoarseAggregates
- Velocity reads fall back to 24h under CoarseAggregates
- Diversity pass skipped under NoDiversity, with a warning
- LoadDetector::enter() called in TidalDb::retrieve() and TidalDb::search()
- InFlightGuard held for the duration of each query method
- All existing executor tests still pass (degradation defaults to Full)
- cargo clippy -- -D warnings clean
Test Strategy
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::load::DegradationLevel;

    // Test that an executor with DegradationLevel::Full behaves identically
    // to the pre-degradation baseline (all existing tests implicitly cover this).

    #[test]
    fn reduced_candidates_caps_ann_top_k() {
        // Build a SearchExecutor with ReducedCandidates level.
        // Verify the ANN search k parameter is <= 200.
        // This requires either inspecting the k value via a test hook
        // or verifying that fewer candidates are produced.
    }

    #[test]
    fn no_diversity_skips_enforcement() {
        // Build 10 items all from the same creator.
        // Query with max_per_creator=2 and NoDiversity.
        // All 10 should be returned (diversity not enforced).
        // Verify warning message includes "load degradation".
    }

    #[test]
    fn no_diversity_adds_warning() {
        // Run a query under NoDiversity.
        // Assert results.warnings contains the degradation message.
    }

    #[test]
    fn full_level_enforces_diversity() {
        // Same setup as no_diversity_skips_enforcement but with Full level.
        // Only 2 per creator should be returned.
    }

    // Property test: for any DegradationLevel, a valid query always
    // returns Ok (not Err). Because overload only changes the level a
    // query runs at, this covers the requirement that "under 3x overload,
    // all well-formed queries return results."
    mod proptests {
        use super::*;
        use proptest::prelude::*;

        proptest! {
            #[test]
            fn query_always_succeeds_regardless_of_degradation(
                level in prop::sample::select(vec![
                    DegradationLevel::Full,
                    DegradationLevel::ReducedCandidates,
                    DegradationLevel::CoarseAggregates,
                    DegradationLevel::NoDiversity,
                ])
            ) {
                // Build a minimal executor with the given degradation level.
                // Execute a simple query. Assert Ok.
            }
        }
    }
}