tidaldb/docs/planning/milestone-2/phase-5/task-02-retrieve-executor-pipeline.md
jordan 6fdaa1584b feat: complete M1 signal engine — m0p3 samples/docs, m1p5 TidalDb API, examples, and periodic checkpoint
2026-02-20 22:45:10 -07:00

Task 02: RETRIEVE Executor Pipeline

Context

  • Milestone: 2 -- Ranked Retrieval
  • Phase: m2p5 -- Query Parser and RETRIEVE Executor
  • Depends On: Task 01 (Retrieve, Results, RetrieveResult, Cursor, QueryError, ProfileRef)
  • Blocks: Task 03 (M2 UAT Integration Test)
  • Complexity: L

Objective

Deliver the RetrieveExecutor -- the orchestrator that wires m2p1 (vector index), m2p2 (filter engine), m2p3 (profile executor), and m2p4 (diversity selector) into a single 5-stage pipeline that executes a Retrieve query and returns Results. This is the "one query" entry point where a developer calls db.retrieve(query) and gets ranked, filtered, diverse results.

The executor also delivers TidalDb::retrieve() -- the public API method that constructs the executor from the database's internal state and delegates to it. After this task, the full RETRIEVE query path works end-to-end.

The key performance gate is end-to-end RETRIEVE latency < 50ms at 10K items, measured with Criterion. Estimated per-stage costs: candidate generation (~10ms ANN or ~5ms scan), filter evaluation (~1ms), scoring (~100us), diversity (~1ms), result assembly (~100us). These estimates sum to roughly 12ms on the ANN path, so most of the 50ms budget is headroom.
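
The arithmetic behind the stage estimates can be checked with a short sketch. The function names `stage_budgets`, `total_budget`, and `headroom` are hypothetical and exist only for this illustration; the durations are the ANN-path figures quoted above.

```rust
use std::time::Duration;

/// Per-stage latency estimates quoted in the objective (ANN path).
/// Hypothetical helper, not part of the tidaldb API.
pub fn stage_budgets() -> [(&'static str, Duration); 5] {
    [
        ("candidate_generation", Duration::from_millis(10)),
        ("filter_evaluation", Duration::from_millis(1)),
        ("signal_scoring", Duration::from_micros(100)),
        ("diversity", Duration::from_millis(1)),
        ("result_assembly", Duration::from_micros(100)),
    ]
}

/// Sum of all stage estimates.
pub fn total_budget() -> Duration {
    stage_budgets().iter().map(|(_, d)| *d).sum()
}

/// Headroom left under the 50 ms end-to-end gate.
pub fn headroom() -> Duration {
    Duration::from_millis(50).saturating_sub(total_budget())
}
```

Under these figures the stages total 12.2 ms, leaving 37.8 ms for overhead not itemized in the list, such as validation, profile resolution, and metadata enrichment.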

Requirements

  • RetrieveExecutor struct: borrows all subsystem references needed for query execution
  • 5-stage pipeline: candidate generation -> filter evaluation -> signal scoring -> diversity enforcement -> result assembly
  • Candidate generation routes to one of three strategies based on the profile's CandidateStrategy: Ann (ANN search), Scan (full entity scan), SignalRanked (top-K by signal value)
  • Filter evaluation uses FilterEvaluator from m2p2 to apply metadata filters to the candidate set
  • Signal scoring uses ProfileExecutor from m2p3 to score and sort candidates
  • Diversity enforcement uses DiversitySelector from m2p4 to enforce max_per_creator and format_mix
  • Result assembly constructs Results from the diversity output, including signal snapshots and pagination cursor
  • TidalDb::retrieve() public method wires the executor to the public API
  • Criterion benchmarks meeting the < 50ms target at 10K items
  • No unsafe code

Technical Design

Module Structure

tidal/src/
  query/
    executor.rs  -- RetrieveExecutor, pipeline stages (this task)
    mod.rs       -- add `pub mod executor;` and re-export RetrieveExecutor
  lib.rs         -- add TidalDb::retrieve() method
tidal/benches/
  query.rs       -- Criterion benchmarks (this task)
tidal/Cargo.toml -- add [[bench]] name = "query" harness = false

Public API

// === query/executor.rs ===

use crate::query::retrieve::*;
use crate::ranking::diversity::{DiversityConstraints, DiversitySelector};
use crate::ranking::executor::{ProfileExecutor, ScoredCandidate};
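// CandidateStrategy is matched on in Stage 1; it is assumed here to live
// alongside RankingProfile.
use crate::ranking::profile::{CandidateStrategy, RankingProfile};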
use crate::ranking::registry::ProfileRegistry;
use crate::schema::{EntityId, EntityKind, Schema, Timestamp};
use crate::signals::SignalLedger;
use crate::storage::indexes::filter::{FilterEvaluator, FilterResult};
use crate::storage::vector::registry::EmbeddingSlotRegistry;
use crate::storage::StorageEngine;

/// Executes RETRIEVE queries by orchestrating all M2 subsystems.
///
/// The executor is a stateless orchestrator -- it holds borrowed references
/// to the subsystems it coordinates and has no state of its own. If the
/// executor is dropped, no data is lost.
///
/// # Pipeline Stages
///
/// ```text
/// Stage 1: Candidate Generation
///   ANN search | Full scan | Signal-ranked
///   -> candidate set: Vec<EntityId> (200-500 candidates)
///
/// Stage 2: Filter Evaluation
///   FilterEvaluator::evaluate() -> bitmap intersection
///   -> surviving candidates (100-500)
///
/// Stage 3: Signal Scoring
///   ProfileExecutor::score() -> Vec<ScoredCandidate> sorted by score
///   -> scored, sorted, gate-filtered candidates
///
/// Stage 4: Diversity Enforcement
///   DiversitySelector::select() -> DiversityResult
///   -> reordered candidates satisfying constraints
///
/// Stage 5: Result Assembly
///   Take first `limit` items, build RetrieveResult with signal snapshots
///   -> Results with next_cursor
/// ```
///
/// # Performance
///
/// Target: end-to-end < 50ms at 10K items.
/// Stage budgets: candidate gen ~10ms, filter ~1ms, scoring ~100us,
/// diversity ~1ms, assembly ~100us.
pub struct RetrieveExecutor<'a> {
    /// Signal ledger for signal reads during scoring.
    ledger: &'a SignalLedger,

    /// Entity store for metadata reads (creator_id, format).
    entity_store: &'a dyn StorageEngine,

    /// Vector index registry for ANN candidate generation.
    vector_index: &'a EmbeddingSlotRegistry,

    /// Filter evaluator for metadata filter application.
    filter_evaluator: &'a FilterEvaluator<'a>,

    /// Profile registry for resolving profile names to definitions.
    profile_registry: &'a ProfileRegistry,

    /// Schema for resolving signal type IDs and entity metadata.
    schema: &'a Schema,
}

impl<'a> RetrieveExecutor<'a> {
    /// Create a new executor with references to all required subsystems.
    pub fn new(
        ledger: &'a SignalLedger,
        entity_store: &'a dyn StorageEngine,
        vector_index: &'a EmbeddingSlotRegistry,
        filter_evaluator: &'a FilterEvaluator<'a>,
        profile_registry: &'a ProfileRegistry,
        schema: &'a Schema,
    ) -> Self {
        Self {
            ledger,
            entity_store,
            vector_index,
            filter_evaluator,
            profile_registry,
            schema,
        }
    }

    /// Execute a RETRIEVE query.
    ///
    /// This is the main entry point. It validates the query, constructs
    /// the pipeline, executes each stage, and returns the result set.
    ///
    /// # Errors
    ///
    /// Returns `QueryError::ProfileNotFound` if the profile does not exist.
    /// Returns `QueryError::UnsupportedStrategy` if the profile's candidate
    /// strategy is not supported in M2 (e.g., Relationship, CohortTrending).
    /// Returns `QueryError::IndexNotAvailable` if ANN retrieval is requested
    /// but no vector index exists for the entity kind.
    /// Returns `QueryError::StorageError` on underlying storage failures.
    pub fn retrieve(&self, query: &Retrieve) -> Result<Results, QueryError> {
        // Validate the query
        query.validate(self.profile_registry)?;

        // Resolve the profile
        let profile = self.resolve_profile(&query.profile)?;
        let now = Timestamp::now();

        // Stage 1: Candidate Generation
        let mut candidates = self.generate_candidates(query, profile, now)?;

        // Apply exclude list
        if !query.exclude.is_empty() {
            let exclude_set: std::collections::HashSet<EntityId> =
                query.exclude.iter().copied().collect();
            candidates.retain(|id| !exclude_set.contains(id));
        }

        // Stage 2: Filter Evaluation
        let candidates = self.apply_filters(query, candidates)?;

        // Stage 3: Signal Scoring
        let scored = self.score_candidates(&candidates, profile, now);

        let total_scored = scored.len();

        // Stage 4: Diversity Enforcement
        let (diverse_candidates, constraints_satisfied) =
            self.apply_diversity(query, scored)?;

        // Stage 5: Result Assembly
        self.assemble_results(query, diverse_candidates, total_scored, constraints_satisfied)
    }

    /// Resolve a ProfileRef to a RankingProfile.
    fn resolve_profile(
        &self,
        profile_ref: &ProfileRef,
    ) -> Result<&'a RankingProfile, QueryError> {
        match profile_ref.version {
            Some(v) => self
                .profile_registry
                .get_versioned(&profile_ref.name, v)
                .ok_or_else(|| {
                    QueryError::ProfileNotFound(format!(
                        "{}@v{}",
                        profile_ref.name, v
                    ))
                }),
            None => self
                .profile_registry
                .get(&profile_ref.name)
                .ok_or_else(|| {
                    QueryError::ProfileNotFound(profile_ref.name.clone())
                }),
        }
    }
}
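
The two lookup paths in `resolve_profile` can be illustrated with a toy registry. This is a sketch under assumptions: `Registry`, `Profile`, and `resolve` are hypothetical stand-ins, and the rule that an unversioned lookup returns the highest registered version is assumed rather than taken from the real `ProfileRegistry`.

```rust
use std::collections::{BTreeMap, HashMap};

/// Hypothetical stand-in for a ranking profile.
#[derive(Debug, Clone, PartialEq)]
pub struct Profile {
    pub name: String,
    pub version: u32,
}

/// Hypothetical registry: name -> (version -> profile).
#[derive(Default)]
pub struct Registry {
    profiles: HashMap<String, BTreeMap<u32, Profile>>,
}

impl Registry {
    pub fn register(&mut self, name: &str, version: u32) {
        self.profiles
            .entry(name.to_string())
            .or_default()
            .insert(version, Profile { name: name.to_string(), version });
    }

    /// Unversioned lookup: assumed to return the highest version.
    pub fn get(&self, name: &str) -> Option<&Profile> {
        self.profiles.get(name)?.values().next_back()
    }

    /// Versioned lookup: exact match only.
    pub fn get_versioned(&self, name: &str, version: u32) -> Option<&Profile> {
        self.profiles.get(name)?.get(&version)
    }
}

/// Mirrors the match in `resolve_profile`; the error string uses the same
/// "name@vN" shape as the listing above.
pub fn resolve<'r>(
    reg: &'r Registry,
    name: &str,
    version: Option<u32>,
) -> Result<&'r Profile, String> {
    match version {
        Some(v) => reg
            .get_versioned(name, v)
            .ok_or_else(|| format!("{}@v{}", name, v)),
        None => reg.get(name).ok_or_else(|| name.to_string()),
    }
}
```

Keeping versions in an ordered map makes the latest-version lookup a single `next_back()` call, at the cost of one extra map level per profile name.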

Stage 1: Candidate Generation

impl<'a> RetrieveExecutor<'a> {
    /// Generate the initial candidate set based on the profile's strategy.
    ///
    /// For M2, three strategies are implemented:
    /// - `Ann`: ANN search over the default embedding slot for the entity kind
    /// - `Scan`: Full entity scan (all entity IDs in the store)
    /// - `SignalRanked`: Top-K entities by signal value from the ledger
    ///
    /// The pool is overprovisioned to 4x the requested limit, with a floor
    /// of 200, so that enough candidates survive filtering, scoring, and
    /// diversity to fill the page.
    fn generate_candidates(
        &self,
        query: &Retrieve,
        profile: &RankingProfile,
        now: Timestamp,
    ) -> Result<Vec<EntityId>, QueryError> {
        // saturating_mul guards against overflow on very large limits.
        let overprovision = query.limit.saturating_mul(4).max(200);

        match profile.candidate_strategy() {
            CandidateStrategy::Ann { .. } => {
                self.generate_ann_candidates(query, overprovision)
            }
            CandidateStrategy::Scan { .. } => {
                self.generate_scan_candidates(query, overprovision)
            }
            CandidateStrategy::SignalRanked { .. } => {
                self.generate_signal_ranked_candidates(query, profile, overprovision, now)
            }
            other => Err(QueryError::UnsupportedStrategy(format!(
                "{other:?} is not supported in M2"
            ))),
        }
    }

    /// ANN candidate generation: query the vector index.
    ///
    /// Uses the default embedding slot for the entity kind.
    /// For M2, no user preference vector is available, so a zero vector
    /// stands in as the query vector (see the caveats in the body).
    ///
    /// Filters are not pushed into the ANN search in M2; they are applied
    /// afterwards in Stage 2.
    fn generate_ann_candidates(
        &self,
        query: &Retrieve,
        top_k: usize,
    ) -> Result<Vec<EntityId>, QueryError> {
        let slot = self
            .vector_index
            .default_slot(query.entity_kind)
            .ok_or_else(|| {
                QueryError::IndexNotAvailable(format!(
                    "no embedding slot for {:?}",
                    query.entity_kind
                ))
            })?;

        // For M2, no user preference vector is available (that is M3+).
        // Use a zero vector as a placeholder. For L2 metric this produces
        // distance=1.0 from all normalized vectors (arbitrary order).
        // For cosine metric, verify USearch handles zero-norm gracefully;
        // if not, the ANN strategy must fall back to Scan for M2.
        let dimensions = slot.dimensions();
        let query_vector = vec![0.0f32; dimensions];

        // M2: apply filters post-ANN in Stage 2 (sequential approach).
        // Filter push-down into USearch predicate callbacks is an M3+ optimization.
        // See OVERVIEW.md Open Question 3.
        let results = slot
            .search(&query_vector, top_k, None)
            .map_err(|e| QueryError::IndexNotAvailable(format!("ANN search failed: {e}")))?;

        Ok(results.into_iter().map(|r| r.id).collect())
    }

    /// Scan candidate generation: iterate all entities of the kind.
    ///
    /// Used for profiles like `new` (sorted by created_at) and `alphabetical`.
    /// Loads all entity IDs from the store and returns up to `top_k`.
    fn generate_scan_candidates(
        &self,
        query: &Retrieve,
        top_k: usize,
    ) -> Result<Vec<EntityId>, QueryError> {
        let candidates = self
            .entity_store
            .scan_entity_ids(query.entity_kind, top_k)
            .map_err(|e| QueryError::StorageError(format!("{e}")))?;
        Ok(candidates)
    }

    /// Signal-ranked candidate generation: top-K by signal value.
    ///
    /// Used for profiles like `most_viewed`, `most_liked`. Reads signal
    /// state from the ledger for all entities and returns the top-K by
    /// the profile's primary signal.
    fn generate_signal_ranked_candidates(
        &self,
        query: &Retrieve,
        profile: &RankingProfile,
        top_k: usize,
        now: Timestamp,
    ) -> Result<Vec<EntityId>, QueryError> {
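        // Get the primary signal name from the profile's first boost.
        // A profile that resolves but has no primary signal is misconfigured
        // for this strategy, so report it as an unsupported strategy rather
        // than as a missing profile.
        let primary_signal = profile
            .primary_signal()
            .ok_or_else(|| {
                QueryError::UnsupportedStrategy(
                    "signal-ranked profile has no primary signal".to_string(),
                )
            })?;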

        let candidates = self
            .ledger
            .top_entities_by_signal(primary_signal, top_k, now)
            .map_err(|e| QueryError::StorageError(format!("{e}")))?;

        Ok(candidates)
    }
}
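
As a worked example of the Stage 1 sizing rule and the exclude step that follows it, the sketch below reproduces both on plain `u64` IDs. `overprovision` and `apply_exclude` are hypothetical free functions written for this illustration; the real logic lives inside `RetrieveExecutor`.

```rust
use std::collections::HashSet;

/// Candidate-pool size for Stage 1: four times the page size, never
/// below 200. saturating_mul avoids overflow on very large limits.
pub fn overprovision(limit: usize) -> usize {
    limit.saturating_mul(4).max(200)
}

/// Drop excluded IDs while preserving candidate order. Building the
/// HashSet once makes each membership test O(1) on average.
pub fn apply_exclude(mut candidates: Vec<u64>, exclude: &[u64]) -> Vec<u64> {
    if exclude.is_empty() {
        return candidates;
    }
    let exclude_set: HashSet<u64> = exclude.iter().copied().collect();
    candidates.retain(|id| !exclude_set.contains(id));
    candidates
}
```

With `limit = 25` the floor applies and the pool is 200; with `limit = 100` the pool is 400. Because exclusion runs after the pool is drawn, a long exclude list shrinks the pool and reduces the margin that overprovisioning was meant to provide.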

Stage 2: Filter Evaluation

impl<'a> RetrieveExecutor<'a> {
    /// Apply metadata filters to the candidate set.
    ///
    /// If no filters are specified, the candidate set passes through unchanged.
    /// If filters are specified, evaluate them as a bitmap and intersect with
    /// the candidate set.
    ///
    /// For M2, ANN-then-filter is the approach: candidates are generated
    /// first, then filters are applied as a post-processing step. Filter
    /// push-down into ANN (via predicate callbacks) is an M3+ optimization.
    ///
    /// Note: Stage 1 as specified above applies no pre-filter bitmap for any
    /// strategy, so this stage always runs when the query carries a filter.
    fn apply_filters(
        &self,
        query: &Retrieve,
        candidates: Vec<EntityId>,
    ) -> Result<Vec<EntityId>, QueryError> {
        let combined = match query.combined_filter() {
            Some(expr) => expr,
            None => return Ok(candidates),
        };

        let filter_result = self
            .filter_evaluator
            .evaluate(&combined)
            .map_err(|e| QueryError::InvalidFilter {
                field: "filter".to_string(),
                reason: format!("{e}"),
            })?;

        match filter_result {
            FilterResult::Bitmap(bitmap) => {
                // Intersect candidates with the filter bitmap.
                //
                // M2 limitation: RoaringBitmap uses u32 keys. Entity IDs are
                // u64 but M2 is bounded to 10K items (well within u32::MAX).
                // M7+ will upgrade to RoaringTreemap for full u64 support.
                // See m2p2 task-01 for the INDEX_ROOT_ID/u32 design rationale.
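                Ok(candidates
                    .into_iter()
                    .filter(|id| {
                        debug_assert!(
                            id.as_u64() <= u64::from(u32::MAX),
                            "entity ID {id} exceeds u32 range -- upgrade to RoaringTreemap"
                        );
                        // Checked narrowing: in release builds an `as u32`
                        // cast would wrap an oversized ID onto an unrelated
                        // bitmap key. Oversized IDs are treated as non-matching.
                        u32::try_from(id.as_u64())
                            .map_or(false, |key| bitmap.contains(key))
                    })
                    .collect())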
            }
            FilterResult::Predicate(predicate) => {
                // Evaluate predicate per candidate
                Ok(candidates
                    .into_iter()
                    .filter(|id| predicate(id.as_u64()))
                    .collect())
            }
        }
    }
}
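
The u64-to-u32 narrowing in the bitmap branch is easy to get subtly wrong, so here is a minimal sketch of the intersection on plain integers. `BTreeSet<u32>` is only a stand-in for the real bitmap type, and `intersect_with_bitmap` is a hypothetical name.

```rust
use std::collections::BTreeSet;
use std::convert::TryFrom;

/// Keep only candidates whose ID is present in a u32-keyed filter set.
/// `BTreeSet<u32>` stands in for the bitmap used by the filter engine.
pub fn intersect_with_bitmap(candidates: Vec<u64>, bitmap: &BTreeSet<u32>) -> Vec<u64> {
    candidates
        .into_iter()
        .filter(|&id| match u32::try_from(id) {
            Ok(key) => bitmap.contains(&key),
            // An ID above u32::MAX cannot be a member of a u32-keyed set.
            Err(_) => false,
        })
        .collect()
}
```

The ID `(1 << 32) + 1` shows why the checked conversion matters: a plain `as u32` cast would wrap it to key 1 and report a match that does not exist.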

Stage 3: Signal Scoring

impl<'a> RetrieveExecutor<'a> {
    /// Score surviving candidates using the profile's scoring rules.
    ///
    /// Delegates to `ProfileExecutor::score()` from m2p3. The result is
    /// a sorted, gate-filtered list of `ScoredCandidate` with scores
    /// normalized to [0.0, 1.0].
    fn score_candidates(
        &self,
        candidates: &[EntityId],
        profile: &RankingProfile,
        now: Timestamp,
    ) -> Vec<ScoredCandidate> {
        if candidates.is_empty() {
            return Vec::new();
        }

        let profile_executor = ProfileExecutor::new(self.ledger);
        let mut scored = profile_executor.score(candidates, profile, now, None);

        // Enrich scored candidates with creator_id and format for diversity.
        //
        // Degradation semantics: if metadata enrichment fails for a candidate
        // (storage error or missing record), the candidate proceeds without
        // creator_id/format. For diversity, it will be treated as a unique
        // creator (None creator_id never matches another None), and no format
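        // constraint applies. This is the "degrade, do not fail" policy from
        // Spec 08 Section 13. Enrichment failures are not yet surfaced in
        // Results.warnings; Stage 5 carries a TODO to thread them through.
        //
        // NOTE: the entity kind is hardcoded to Item here. Threading
        // query.entity_kind into this stage would be needed for other kinds.
        for candidate in &mut scored {
            if let Ok(Some(metadata)) = self.entity_store.get_metadata(
                candidate.entity_id,
                EntityKind::Item,
            ) {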
                candidate.creator_id = metadata.creator_id;
                candidate.format = metadata.format.clone();
            }
            // On error: candidate proceeds with creator_id=None, format=None.
            // This is intentional degradation, not a bug.
        }

        scored
    }
}
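
One way the "degrade, do not fail" enrichment could also collect warnings is sketched below. Everything here is hypothetical: `Candidate`, `Metadata`, the `Lookup` map that simulates storage outcomes, and the warning strings are illustration only and do not reflect the real `ScoredCandidate` or `StorageEngine` types.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a scored candidate.
#[derive(Debug, Clone, PartialEq)]
pub struct Candidate {
    pub entity_id: u64,
    pub score: f64,
    pub creator_id: Option<u64>,
    pub format: Option<String>,
}

/// Hypothetical stand-in for entity metadata.
#[derive(Debug, Clone)]
pub struct Metadata {
    pub creator_id: Option<u64>,
    pub format: Option<String>,
}

/// Simulated store: Err = storage failure, Ok(None) = missing record.
/// An ID absent from the map is also treated as a missing record.
pub type Lookup = HashMap<u64, Result<Option<Metadata>, String>>;

/// Enrich candidates in place. Failures never abort the loop; each one
/// is recorded as a warning and the candidate keeps creator_id = None.
pub fn enrich(candidates: &mut [Candidate], store: &Lookup) -> Vec<String> {
    let mut warnings = Vec::new();
    for c in candidates.iter_mut() {
        match store.get(&c.entity_id) {
            Some(Ok(Some(meta))) => {
                c.creator_id = meta.creator_id;
                c.format = meta.format.clone();
            }
            Some(Ok(None)) | None => {
                warnings.push(format!("entity {}: metadata missing", c.entity_id));
            }
            Some(Err(e)) => {
                warnings.push(format!("entity {}: metadata read failed: {}", c.entity_id, e));
            }
        }
    }
    warnings
}
```

Returning the warnings from the enrichment step, rather than discarding them, is one possible route to populating a `warnings` field during result assembly.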

Stage 4: Diversity Enforcement

impl<'a> RetrieveExecutor<'a> {
    /// Apply diversity constraints to the scored candidate list.
    ///
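    /// If the query carries no diversity constraints, candidates pass
    /// through unchanged. Profile-level defaults are not consulted here.
    fn apply_diversity(
        &self,
        query: &Retrieve,
        candidates: Vec<ScoredCandidate>,
    ) -> Result<(Vec<ScoredCandidate>, bool), QueryError> {
        // Only query-level constraints are active in this stage.
        let constraints = match &query.diversity {
            Some(c) => c.clone(),
            None => {
                // No query-level diversity: return candidates as-is
                return Ok((candidates, true));
            }
        };

        // Select enough items to cover the cursor offset plus one page.
        // Selecting only `query.limit` would leave every page after the
        // first empty once Stage 5 slices at the offset.
        let offset = query.cursor.as_ref().map(|c| c.offset()).unwrap_or(0);
        let target_count = offset.saturating_add(query.limit);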
        let selector = DiversitySelector::new();
        let result = selector.select(candidates, &constraints, target_count);

        let satisfied = result.violations.is_empty();
        Ok((result.selected, satisfied))
    }
}
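
`DiversitySelector` comes from m2p4 and its algorithm is not shown in this task. As an illustration of what a `max_per_creator` constraint means, the sketch below uses a simple greedy pass. This is an assumed technique for the example, not the selector's actual implementation, and `Scored` and `select_max_per_creator` are hypothetical names.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a scored candidate.
#[derive(Debug, Clone, PartialEq)]
pub struct Scored {
    pub entity_id: u64,
    pub score: f64,
    pub creator_id: Option<u64>,
}

/// Greedy pass over candidates already sorted by descending score:
/// accept a candidate unless its creator already holds
/// `max_per_creator` slots. Candidates without a creator_id are always
/// accepted, matching the "None never matches None" rule in Stage 3.
pub fn select_max_per_creator(
    sorted: &[Scored],
    max_per_creator: usize,
    target: usize,
) -> Vec<Scored> {
    let mut per_creator: HashMap<u64, usize> = HashMap::new();
    let mut selected = Vec::with_capacity(target.min(sorted.len()));
    for candidate in sorted {
        if selected.len() == target {
            break;
        }
        if let Some(creator) = candidate.creator_id {
            let count = per_creator.entry(creator).or_insert(0);
            if *count >= max_per_creator {
                continue; // creator quota exhausted, skip this candidate
            }
            *count += 1;
        }
        selected.push(candidate.clone());
    }
    selected
}
```

A greedy pass keeps the output in score order and runs in a single scan, which fits comfortably inside the ~1ms diversity budget at a few hundred candidates.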

Stage 5: Result Assembly

impl<'a> RetrieveExecutor<'a> {
    /// Assemble the final Results from the diversity output.
    ///
    /// Applies pagination (cursor offset + limit), constructs RetrieveResult
    /// for each item, and computes the next_cursor.
    fn assemble_results(
        &self,
        query: &Retrieve,
        candidates: Vec<ScoredCandidate>,
        total_scored: usize,
        constraints_satisfied: bool,
    ) -> Result<Results, QueryError> {
        // Apply pagination offset from cursor
        let offset = query
            .cursor
            .as_ref()
            .map(|c| c.offset())
            .unwrap_or(0);

        let limit = query.limit;
        let page_start = std::cmp::min(offset, candidates.len());
        // saturating_add avoids overflow when limit is near usize::MAX.
        let page_end = std::cmp::min(page_start.saturating_add(limit), candidates.len());

        let items: Vec<RetrieveResult> = candidates[page_start..page_end]
            .iter()
            .enumerate()
            .map(|(i, candidate)| RetrieveResult {
                entity_id: candidate.entity_id,
                score: candidate.score,
                rank: offset + i + 1, // 1-based rank
                signal_snapshot: candidate.signal_snapshot.clone(),
            })
            .collect();

        // Compute next cursor
        let next_cursor = if page_end < candidates.len() {
            Some(Cursor::from_offset(page_end))
        } else {
            None
        };

        Ok(Results {
            items,
            next_cursor,
            total_scored,
            constraints_satisfied,
            warnings: vec![], // TODO: thread warnings from Stage 3 enrichment failures
        })
    }
}
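
The page-slicing arithmetic in Stage 5 can be isolated into a pure function, which makes the edge cases easy to check. `page_bounds` is a hypothetical helper written for this illustration.

```rust
/// Compute (page_start, page_end, next_offset) for offset pagination
/// over `len` candidates. `next_offset` is Some only when items remain
/// after this page. Both bounds are clamped to `len`, so an offset
/// past the end yields an empty page instead of a slice panic.
pub fn page_bounds(offset: usize, limit: usize, len: usize) -> (usize, usize, Option<usize>) {
    let start = offset.min(len);
    let end = start.saturating_add(limit).min(len);
    let next = if end < len { Some(end) } else { None };
    (start, end, next)
}
```

For 100 candidates and a limit of 25, the first page is `[0..25)` with a next offset of 25. For 30 candidates and a cursor at 25, the last page is `[25..30)` with no next cursor.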

TidalDb::retrieve() Public API

// === lib.rs (modification to TidalDb impl) ===

use crate::query::executor::RetrieveExecutor;
use crate::query::retrieve::{QueryError, Results, Retrieve};
use crate::storage::indexes::filter::FilterEvaluator;

impl TidalDb {
    /// Execute a RETRIEVE query.
    ///
    /// This is the primary ranked retrieval entry point. Given a declarative
    /// query (profile, filters, diversity, limit), the database generates
    /// candidates, applies filters, scores with signals, enforces diversity,
    /// and returns a ranked result set.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let query = Retrieve::builder()
    ///     .profile("trending")
    ///     .diversity(DiversityConstraints::new().max_per_creator(1))
    ///     .limit(25)
    ///     .build()?;
    /// let results = db.retrieve(&query)?;
    /// for item in &results.items {
    ///     println!("#{}: entity={} score={:.3}",
    ///              item.rank, item.entity_id, item.score);
    /// }
    /// ```
    pub fn retrieve(&self, query: &Retrieve) -> Result<Results, QueryError> {
        // Construct FilterEvaluator per-query to avoid self-referential borrows.
        //
        // FilterEvaluator<'_> borrows from the bitmap and range indexes, which
        // are fields of TidalDb. Storing a FilterEvaluator<'self> in TidalDb
        // would make TidalDb self-referential (a struct containing references
        // to its own fields), which Rust prohibits. The per-query construction
        // is cheap: FilterEvaluator holds only references (no allocation).
        let filter_evaluator = FilterEvaluator::new(
            &self.bitmap_indexes,
            &self.range_indexes,
        );

        let executor = RetrieveExecutor::new(
            &self.signal_ledger,
            self.entity_store(),
            &self.embedding_registry,
            &filter_evaluator,
            &self.profile_registry,
            &self.schema,
        );
        executor.retrieve(query)
    }
}
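
The reason for building `FilterEvaluator` per query can be shown with a reduced model. `Db`, `Evaluator`, and their methods below are hypothetical stand-ins, and `BTreeSet<u32>` replaces the real index types. The point is only the borrowing pattern: the evaluator borrows from the database for the duration of one call and is never stored inside it.

```rust
use std::collections::{BTreeSet, HashMap};

/// Hypothetical database owning its indexes.
pub struct Db {
    bitmap_indexes: HashMap<String, BTreeSet<u32>>,
}

/// Hypothetical evaluator holding only a borrowed view of the indexes.
/// Storing this inside `Db` would require `Db` to reference its own
/// field, which safe Rust does not allow.
pub struct Evaluator<'a> {
    bitmaps: &'a HashMap<String, BTreeSet<u32>>,
}

impl<'a> Evaluator<'a> {
    pub fn new(bitmaps: &'a HashMap<String, BTreeSet<u32>>) -> Self {
        Evaluator { bitmaps }
    }

    pub fn matches(&self, key: &str, id: u32) -> bool {
        self.bitmaps.get(key).map_or(false, |set| set.contains(&id))
    }
}

impl Db {
    pub fn new() -> Self {
        Db { bitmap_indexes: HashMap::new() }
    }

    pub fn index(&mut self, key: &str, id: u32) {
        self.bitmap_indexes
            .entry(key.to_string())
            .or_default()
            .insert(id);
    }

    /// Per-call construction: the evaluator lives only inside this
    /// method and costs one pointer copy to create.
    pub fn count_matching(&self, key: &str, ids: &[u32]) -> usize {
        let evaluator = Evaluator::new(&self.bitmap_indexes);
        ids.iter().filter(|&&id| evaluator.matches(key, id)).count()
    }
}
```

Because the evaluator is dropped at the end of each call, later `&mut self` operations such as `index` remain possible without any lifetime conflict.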

Criterion Benchmarks

// === tidal/benches/query.rs ===

use criterion::{criterion_group, criterion_main, Criterion};
use tempfile::TempDir;

use tidaldb::query::retrieve::Retrieve;
use tidaldb::ranking::diversity::DiversityConstraints;
use tidaldb::schema::*;
use tidaldb::{Config, TidalDb};
// Note: `FilterExpr` (used by the filtered benchmark) and the helpers
// `build_m2_schema`, `item_metadata`, and `generate_embedding` must also be
// in scope; their definitions are not part of this listing.

/// Setup: create a TidalDb with 10K items, embeddings, and signal state.
///
/// Items have:
/// - Metadata: category (10 values), format (4 values), creator_id (200 creators)
/// - Embeddings: 64-dim vectors (small for benchmark speed)
/// - Signals: 5 signal events per item (50K total)
fn setup_10k_db() -> (TidalDb, TempDir) {
    let dir = TempDir::new().unwrap();
    let schema = build_m2_schema();
    let db = TidalDb::open(Config {
        data_dir: dir.path().to_owned(),
        schema,
    })
    .unwrap();

    // Write 10K items with metadata and embeddings
    for i in 0..10_000u64 {
        let metadata = item_metadata(i);
        let embedding = generate_embedding(i, 64);
        db.write_item(EntityId::new(i + 1), &metadata, Some(&embedding))
            .unwrap();
    }

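    // Write 50K signal events: exactly 5 per item, one of each signal type.
    let now = Timestamp::now();
    let seven_days_nanos = 7 * 24 * 3600 * 1_000_000_000u64;
    let signal_types = ["view", "like", "skip", "share", "completion"];
    for i in 0..50_000u64 {
        let entity = EntityId::new((i % 10_000) + 1);
        // Index by pass number (i / 10_000), not by i: 10_000 is a multiple
        // of 5, so `i % 5` would give each item the same type five times.
        let signal = signal_types[(i / 10_000) as usize];
        // Spread timestamps over the full 7-day window. `i * 7919 % 50_000`
        // is a permutation of 0..50_000 because 7919 and 50_000 are coprime.
        let offset = (i * 7919 % 50_000) * (seven_days_nanos / 50_000);
        let ts = Timestamp::from_nanos(now.as_nanos().saturating_sub(offset));
        db.signal(signal, entity, 1.0, ts).unwrap();
    }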

    (db, dir)
}

/// KEY BENCHMARK: end-to-end trending RETRIEVE at 10K items.
/// Target: < 50ms.
fn bench_retrieve_trending_10k(c: &mut Criterion) {
    let (db, _dir) = setup_10k_db();

    let query = Retrieve::builder()
        .profile("trending")
        .diversity(DiversityConstraints::new().max_per_creator(1))
        .limit(25)
        .build()
        .unwrap();

    c.bench_function("retrieve_trending_10k_items", |b| {
        b.iter(|| {
            let results = db.retrieve(&query).unwrap();
            assert!(!results.is_empty());
        })
    });
}

/// Benchmark: new profile (full scan, no ANN) at 10K items.
/// Expected: < 10ms (scan + metadata sort, no vector search).
fn bench_retrieve_new_10k(c: &mut Criterion) {
    let (db, _dir) = setup_10k_db();

    let query = Retrieve::builder()
        .profile("new")
        .limit(20)
        .build()
        .unwrap();

    c.bench_function("retrieve_new_10k_items", |b| {
        b.iter(|| {
            let results = db.retrieve(&query).unwrap();
            assert!(!results.is_empty());
        })
    });
}

/// Benchmark: hot profile with category filter at 10K items.
fn bench_retrieve_hot_filtered_10k(c: &mut Criterion) {
    let (db, _dir) = setup_10k_db();

    let query = Retrieve::builder()
        .profile("hot")
        .filter(FilterExpr::eq("category", "jazz"))
        .limit(20)
        .build()
        .unwrap();

    c.bench_function("retrieve_hot_filtered_10k_items", |b| {
        b.iter(|| {
            // May be empty if no item has category "jazz", so no assertion.
            // Returning the result keeps it from being flagged as unused.
            db.retrieve(&query).unwrap()
        })
    });
}

/// Benchmark: controversial profile at 10K items.
fn bench_retrieve_controversial_10k(c: &mut Criterion) {
    let (db, _dir) = setup_10k_db();

    let query = Retrieve::builder()
        .profile("controversial")
        .limit(10)
        .build()
        .unwrap();

    c.bench_function("retrieve_controversial_10k_items", |b| {
        b.iter(|| {
            let results = db.retrieve(&query).unwrap();
            assert!(!results.is_empty());
        })
    });
}

criterion_group!(
    benches,
    bench_retrieve_trending_10k,
    bench_retrieve_new_10k,
    bench_retrieve_hot_filtered_10k,
    bench_retrieve_controversial_10k,
);
criterion_main!(benches);
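
The benchmark setup calls a `generate_embedding` helper that is not defined in this listing. One possible deterministic implementation is sketched below, assuming the helper should return a unit-length `Vec<f32>` that is stable across runs; the signature and the choice of SplitMix64 as the generator are assumptions made for this example.

```rust
/// Deterministic pseudo-random unit vector for item `seed`.
/// Assumed shape of the benchmark helper; uses the SplitMix64 mixing
/// function so the same seed always yields the same vector.
pub fn generate_embedding(seed: u64, dims: usize) -> Vec<f32> {
    let mut state: u64 = seed.wrapping_add(0x9E37_79B9_7F4A_7C15);
    let mut v: Vec<f32> = (0..dims)
        .map(|_| {
            // One SplitMix64 step.
            state = state.wrapping_add(0x9E37_79B9_7F4A_7C15);
            let mut z = state;
            z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
            z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
            z ^= z >> 31;
            // Map the top 24 bits to the range [-1.0, 1.0).
            ((z >> 40) as f32 / (1u64 << 23) as f32) - 1.0
        })
        .collect();

    // Normalize to unit length so L2 and cosine metrics behave alike.
    let norm = v.iter().map(|x| x * x).sum::<f32>().sqrt();
    if norm > 0.0 {
        for x in &mut v {
            *x /= norm;
        }
    }
    v
}
```

A seeded generator keeps benchmark runs comparable, since the index contents and therefore the ANN search work are identical from run to run.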

Error Handling

  • Profile not found: QueryError::ProfileNotFound with the profile name. Occurs in validation before pipeline execution.
  • Unsupported strategy: QueryError::UnsupportedStrategy for Relationship, Hybrid, CohortTrending in M2. Occurs in candidate generation.
  • No vector index: QueryError::IndexNotAvailable when ANN strategy is requested but no embedding slot exists. Occurs in ANN candidate generation.
  • Filter evaluation failure: QueryError::InvalidFilter when a filter references a non-existent field or index. Occurs in Stage 2.
  • Storage error: QueryError::StorageError wraps underlying storage failures during entity reads.
  • Empty results: NOT an error. The pipeline returns Results with an empty items vec, total_scored: 0, and constraints_satisfied: true. This is valid -- the filter may exclude everything.

Test Strategy

Unit Tests

// === Pipeline Stage Tests ===
// These test each stage independently with mock/test data.

#[test]
fn exclude_list_removes_candidates() {
    // Setup: candidates [1, 2, 3, 4, 5], exclude [2, 4]
    // After exclude: [1, 3, 5]
    let candidates = vec![
        EntityId::new(1),
        EntityId::new(2),
        EntityId::new(3),
        EntityId::new(4),
        EntityId::new(5),
    ];
    let exclude = vec![EntityId::new(2), EntityId::new(4)];
    let exclude_set: std::collections::HashSet<EntityId> =
        exclude.iter().copied().collect();
    let filtered: Vec<EntityId> = candidates
        .into_iter()
        .filter(|id| !exclude_set.contains(id))
        .collect();
    assert_eq!(filtered.len(), 3);
    assert_eq!(filtered[0], EntityId::new(1));
    assert_eq!(filtered[1], EntityId::new(3));
    assert_eq!(filtered[2], EntityId::new(5));
}

#[test]
fn result_assembly_pagination_first_page() {
    // 100 scored candidates, limit 25, no cursor
    // -> items[0..25], rank 1-25, next_cursor at offset 25
    let candidates: Vec<ScoredCandidate> = (0..100u64)
        .map(|i| ScoredCandidate::new(EntityId::new(i + 1), 1.0 - (i as f64 * 0.01)))
        .collect();

    // Built only to show the query shape; the slicing below does not use it.
    let _query = Retrieve::builder().profile("test").limit(25).build().unwrap();
    // (simplified test -- full test requires executor)

    // Verify page slicing
    let offset = 0;
    let limit = 25;
    let page_end = std::cmp::min(offset + limit, candidates.len());
    assert_eq!(page_end, 25);

    let items: Vec<RetrieveResult> = candidates[offset..page_end]
        .iter()
        .enumerate()
        .map(|(i, c)| RetrieveResult {
            entity_id: c.entity_id,
            score: c.score,
            rank: offset + i + 1,
            signal_snapshot: vec![],
        })
        .collect();

    assert_eq!(items.len(), 25);
    assert_eq!(items[0].rank, 1);
    assert_eq!(items[24].rank, 25);
    assert!(page_end < candidates.len()); // next_cursor should exist
}

#[test]
fn result_assembly_pagination_last_page() {
    // 30 scored candidates, limit 25, cursor at offset 25
    // -> items[25..30], rank 26-30, no next_cursor
    let candidates_len = 30;
    let offset = 25;
    let limit = 25;
    let page_end = std::cmp::min(offset + limit, candidates_len);
    assert_eq!(page_end, 30);
    assert_eq!(page_end - offset, 5); // 5 items on last page
    assert!(page_end >= candidates_len); // no next_cursor
}

#[test]
fn result_assembly_empty_candidates() {
    // 0 scored candidates -> empty results, no cursor
    let candidates: Vec<ScoredCandidate> = vec![];
    assert!(candidates.is_empty());
    // Results should have items: [], total_scored: 0, constraints_satisfied: true
}

#[test]
fn result_assembly_ranks_are_one_based() {
    let items: Vec<RetrieveResult> = (0..5)
        .map(|i| RetrieveResult {
            entity_id: EntityId::new(i + 1),
            score: 0.5,
            rank: i as usize + 1,
            signal_snapshot: vec![],
        })
        .collect();
    assert_eq!(items[0].rank, 1);
    assert_eq!(items[4].rank, 5);
}

#[test]
fn scores_descending_in_results() {
    // Verify that results maintain score ordering from scoring stage
    let items: Vec<RetrieveResult> = vec![
        RetrieveResult { entity_id: EntityId::new(1), score: 0.9, rank: 1, signal_snapshot: vec![] },
        RetrieveResult { entity_id: EntityId::new(2), score: 0.7, rank: 2, signal_snapshot: vec![] },
        RetrieveResult { entity_id: EntityId::new(3), score: 0.5, rank: 3, signal_snapshot: vec![] },
    ];
    for pair in items.windows(2) {
        assert!(pair[0].score >= pair[1].score);
    }
}

// === Profile resolution tests ===

#[test]
fn resolve_profile_latest_version() {
    let mut registry = ProfileRegistry::new();
    // register_builtins adds trending, hot, new, etc.
    register_builtins(&mut registry, &[]);
    let profile = registry.get("trending");
    assert!(profile.is_some());
}

#[test]
fn resolve_profile_unknown_name() {
    let registry = ProfileRegistry::new();
    let profile = registry.get("nonexistent");
    assert!(profile.is_none());
}

// === Candidate strategy routing tests ===

#[test]
fn scan_strategy_returns_entity_ids() {
    // Verify that scan candidate generation returns IDs from the store
    // (full integration test in Task 03; this is a unit-level sanity check)
    let ids: Vec<EntityId> = (1..=100u64).map(EntityId::new).collect();
    let top_k = 50;
    let result: Vec<EntityId> = ids.into_iter().take(top_k).collect();
    assert_eq!(result.len(), 50);
}

Integration Tests

Integration tests covering full pipeline execution are in Task 03 (m2_uat.rs). This task's test strategy focuses on unit-level testing of individual pipeline stages and the wiring between them.

Acceptance Criteria

  • RetrieveExecutor::new() takes borrowed references to ledger, entity_store, vector_index, filter_evaluator, profile_registry, schema
  • RetrieveExecutor::retrieve() executes the 5-stage pipeline and returns Results
  • Stage 1: candidate generation routes to Ann, Scan, or SignalRanked based on profile's CandidateStrategy
  • Stage 1: Ann strategy queries the vector index via EmbeddingSlotRegistry and returns entity IDs
  • Stage 1: Scan strategy loads all entity IDs from the entity store
  • Stage 1: SignalRanked strategy reads top-K entities by signal value from the ledger
  • Stage 1: Unsupported strategies (Relationship, Hybrid, CohortTrending) return QueryError::UnsupportedStrategy
  • Stage 1: exclude list is applied after candidate generation (removed via HashSet lookup)
  • Stage 2: filter evaluation uses FilterEvaluator and intersects with candidate set
  • Stage 2: no filters = candidate set passes through unchanged
  • Stage 2: empty filter result (zero matching items) returns empty Results, not an error
  • Stage 3: delegates to ProfileExecutor::score() from m2p3
  • Stage 3: enriches ScoredCandidate with creator_id and format from entity metadata
  • Stage 4: applies DiversitySelector when diversity constraints are present
  • Stage 4: no diversity constraints = candidates pass through unchanged, constraints_satisfied: true
  • Stage 5: slices candidates to [offset..offset+limit] based on cursor
  • Stage 5: builds RetrieveResult with 1-based rank and signal snapshot
  • Stage 5: computes next_cursor when more results exist beyond the page
  • Stage 5: next_cursor is None when the page contains all remaining results
  • TidalDb::retrieve() public method wires the executor correctly
  • Criterion benchmarks implemented and passing:
    • retrieve_trending_10k_items -- target < 50ms
    • retrieve_new_10k_items -- target < 10ms
    • retrieve_hot_filtered_10k_items -- measured
    • retrieve_controversial_10k_items -- measured
  • No unsafe code
  • cargo clippy -- -D warnings passes
  • All unit tests pass
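
The Stage 5 criteria above (slice to [offset..offset+limit], 1-based rank, next_cursor only when more results remain) can be sketched in isolation. This is a minimal illustration, not the executor's actual code: Scored, Row, and assemble_page are hypothetical stand-ins for ScoredCandidate, RetrieveResult, and the result assembly step, and the cursor is modeled as a bare offset. It also assumes rank is the global position in the scored list rather than the position within the page, which the criteria do not state.

```rust
/// Hypothetical stand-in for a scored candidate leaving Stage 4.
#[derive(Debug, Clone, PartialEq)]
struct Scored {
    entity_id: u64,
    score: f64,
}

/// Hypothetical stand-in for one assembled result row.
#[derive(Debug, PartialEq)]
struct Row {
    entity_id: u64,
    score: f64,
    rank: usize,
}

/// Slice `candidates` to `[offset..offset + limit]`, assign 1-based ranks,
/// and return the next page's offset if results remain beyond this page.
/// Assumption: `rank` is the global position, so page two starts at offset + 1.
fn assemble_page(
    candidates: &[Scored],
    offset: usize,
    limit: usize,
) -> (Vec<Row>, Option<usize>) {
    // Clamp both bounds so an out-of-range cursor yields an empty page
    // instead of a slice panic.
    let start = offset.min(candidates.len());
    let end = offset.saturating_add(limit).min(candidates.len());

    let rows = candidates[start..end]
        .iter()
        .enumerate()
        .map(|(i, c)| Row {
            entity_id: c.entity_id,
            score: c.score,
            rank: start + i + 1,
        })
        .collect();

    // A next cursor exists only when the page stops short of the end.
    let next = if end < candidates.len() { Some(end) } else { None };
    (rows, next)
}
```

With 30 candidates and a limit of 25, the first call yields 25 rows and a next offset of 25; the second yields the remaining 5 rows and no cursor, matching the last-page test in the unit tests.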

Research References

Spec References

  • docs/specs/08-query-engine.md -- Section 4 (Query planning: CandidateStrategy, plan construction), Section 5 (Execution pipeline: all 6 stages), Section 7 (Filter evaluation: bitmap intersection, short-circuit), Section 8 (Pagination: cursor decode, offset, limit), Section 11 (Performance targets: < 50ms end-to-end)
  • docs/specs/09-ranking-scoring.md -- Section 3 (CandidateStrategy variants), Section 4 (Scoring pipeline), Section 9 (Diversity enforcement as Stage 8)

Implementation Notes

  • Add a [[bench]] entry with name = "query" and harness = false to tidal/Cargo.toml.
  • The RetrieveExecutor is intentionally stateless -- it borrows references to all subsystems. This means it is cheap to construct (no allocation) and the caller (TidalDb) can create a new executor for every query. No caching or connection pooling is needed.
  • scan_entity_ids() is a new method needed on StorageEngine (or on TidalDb directly) that returns all entity IDs of a given kind. If this method does not exist yet, it should be added as part of this task. It reads the entity keyspace prefix and collects IDs. At 10K items this is ~1ms.
  • top_entities_by_signal() is a new method needed on SignalLedger that returns the top-K entity IDs by signal value. For M2, this iterates over all entities in the hot tier and returns the top-K by decay score. At 10K entities this is ~2ms. A sorted index for signal values is an M6 optimization.
  • The get_metadata() method on StorageEngine (for reading creator_id and format) needs to return a structured metadata object, not raw bytes. If M1's read_item() returns raw bytes, this task should add a get_item_metadata() helper that parses the metadata into a struct with creator_id: Option<EntityId> and format: Option<String>. The exact metadata format depends on how write_item() stores metadata in M1/M2.
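  • The benchmark setup function (setup_10k_db) creates a fresh database for each benchmark group run, which takes several seconds. Build the database once per group, outside the closure passed to the bencher, so that setup is excluded from the measured time, and tune sample_size and measurement_time on criterion::BenchmarkGroup so each group finishes in a reasonable time. If benchmarks are still too slow, consider sharing one database across groups via lazy_static or OnceCell.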
  • The ANN candidate generation in M2 uses a zero vector as the query vector. This is a placeholder; user preference vectors replace it in M3. Under an L2 metric, a zero query vector is equidistant from all normalized vectors (distance = 1.0), so candidate ordering is effectively arbitrary. Under a cosine metric, a zero query vector makes the similarity undefined (0/0); if USearch is configured with the cosine metric and returns an error for zero-norm queries, fall back to CandidateStrategy::Scan instead. Verify USearch's zero-vector behavior during integration.
  • ScoredCandidate.signal_snapshot dependency: the ScoredCandidate struct from m2p3 (task-03 of phase 3) MUST include a signal_snapshot: Vec<(String, f64)> field. The result assembly stage (Stage 5) reads this field directly. Verify this field exists in m2p3's ScoredCandidate before implementing Stage 5; if missing, add it as part of this task.
  • FilterEvaluator is NOT stored in TidalDb: construct it per-query in TidalDb::retrieve() by passing references to self.bitmap_indexes and self.range_indexes. See the TidalDb::retrieve() code snippet above for the correct wiring. Do not add filter_evaluator as a field on TidalDb -- it would create a self-referential struct.
  • Results.warnings accumulation: the executor should accumulate warnings into a Vec<String> and pass it into the Results struct. Metadata enrichment failures (in Stage 3) are one source. Start with an empty vec and push warnings as they occur during pipeline execution. Do not propagate degradation warnings as errors.
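
The zero-vector note above can be checked numerically without touching the index. The sketch below uses plain slices and does not call USearch; l2_distance, cosine_similarity, and is_zero_norm are hypothetical helpers written for this illustration. It shows only the arithmetic: every unit vector sits at L2 distance 1.0 from the origin, and a naive cosine computation against a zero vector evaluates 0/0. How USearch itself reports a zero-norm query still has to be verified during integration.

```rust
/// Euclidean (L2) distance between two equal-length vectors.
fn l2_distance(a: &[f32], b: &[f32]) -> f32 {
    a.iter()
        .zip(b)
        .map(|(x, y)| (x - y) * (x - y))
        .sum::<f32>()
        .sqrt()
}

/// Naive cosine similarity: dot(a, b) / (|a| * |b|).
/// Evaluates to NaN when either vector has zero norm (0.0 / 0.0).
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
    let norm_a = a.iter().map(|x| x * x).sum::<f32>().sqrt();
    let norm_b = b.iter().map(|x| x * x).sum::<f32>().sqrt();
    dot / (norm_a * norm_b)
}

/// Hypothetical guard the executor could run before an ANN query under a
/// cosine metric, choosing the Scan fallback when it returns true.
fn is_zero_norm(v: &[f32]) -> bool {
    v.iter().all(|x| *x == 0.0)
}
```

A guard like is_zero_norm lets the fallback decision be made before the index is queried, rather than by interpreting an error from the index afterwards; which approach fits depends on the behavior observed during integration.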