//! Handler for querying assertions. use axum::{ extract::{Query as AxumQuery, State}, http::HeaderMap, Json, }; use tracing::{debug, instrument}; use crate::{ dto::{ AssertionResponse, ErrorResponse, LensDto, QueryParams, QueryResponse, SourceWarningDto, }, error::{ApiError, Result}, hex as hex_utils, services::{make_source_warning, should_exclude_source, SourceStatusEnricher}, state::AppState, }; use stemedb_core::types::{ Assertion, ContributingAssertion, LifecycleStage, QueryAudit, QueryParams as AuditQueryParams, }; /// Pre-computed metadata from candidate assertions for audit logging. /// /// This avoids cloning entire assertions before lens resolution. /// We only keep the fields needed for audit: hash, source_hash, lifecycle. struct CandidateMetadata { hash: [u8; 32], source_hash: [u8; 32], lifecycle: LifecycleStage, } use stemedb_lens::{ AsyncLens, ConfidenceLens, ConsensusLens, EpochAwareLens, Lens, RecencyLens, TrustAwareAuthorityLens, VoteAwareConsensusLens, }; use stemedb_query::Query; use stemedb_storage::{AuditStore, GenericAuditStore, GenericTrustRankStore, GenericVoteStore}; /// Query assertions with optional filters and lens-based conflict resolution. /// /// This endpoint builds a query from parameters (subject, predicate, lifecycle, epoch, limit), /// executes it via the QueryEngine, optionally applies a Lens for conflict resolution, /// and returns matching assertions. Returns early with empty results if no assertions match. /// /// # Lens Resolution /// When a lens is specified, it resolves conflicts among matching assertions and returns /// only the winning assertion based on the lens strategy (Recency, Consensus, Authority, etc.) /// /// # Audit Trail /// Every query is logged to the audit trail for incident investigation. /// Include the `X-Agent-Id` header (hex-encoded, 32 bytes) to associate queries with an agent. #[utoipa::path( get, path = "/v1/query", responses( (status = 200, description = "Query successful", body = QueryResponse), (status = 400, description = "Invalid request", body = ErrorResponse), (status = 500, description = "Internal server error", body = ErrorResponse) ), tag = "query" )] #[instrument(skip(state, headers), fields( subject = ?params.subject, predicate = ?params.predicate, lifecycle = ?params.lifecycle, lens = ?params.lens ))] pub async fn query_assertions( State(state): State, headers: HeaderMap, AxumQuery(params): AxumQuery, ) -> Result> { let query_start = std::time::Instant::now(); metrics::counter!("stemedb_queries_total", "endpoint" => "query").increment(1); let query_start_timestamp = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .map(|d| d.as_secs()) .unwrap_or(0); // Extract agent_id from headers if present let agent_id = extract_agent_id(&headers); // Capture params for audit before they're moved let audit_params = AuditQueryParams { subject: params.subject.clone(), predicate: params.predicate.clone(), lifecycle: params.lifecycle.map(Into::into), epoch: params.epoch.as_ref().and_then(|h| hex_utils::decode_hash_32(h).ok()), lens: params.lens.map(|l| format!("{:?}", l)), }; // Build the query let mut builder = Query::builder(); if let Some(ref subject) = params.subject { builder = builder.subject(subject.clone()); } if let Some(ref predicate) = params.predicate { builder = builder.predicate(predicate.clone()); } if let Some(lifecycle_dto) = params.lifecycle { builder = builder.lifecycle(lifecycle_dto.into()); } if let Some(ref epoch_hex) = params.epoch { let epoch = hex_utils::decode_hash_32(epoch_hex)?; builder = builder.epoch(epoch); } builder = builder.limit(params.limit); if let Some(max_stale) = params.max_stale { builder = builder.max_stale(max_stale); } if let Some(ref visual_near) = params.visual_near { // Clamp threshold to valid range (0-64, max hamming distance for 8-byte hash) let threshold = params.visual_threshold.unwrap_or(8).min(64); builder = builder.visual_near(visual_near.clone(), threshold); } if let Some(as_of) = params.as_of { builder = builder.as_of(as_of); } if let Some(decay_halflife) = params.decay_halflife { builder = builder.decay_halflife(decay_halflife); } if let Some(source_class_decay) = params.source_class_decay { builder = builder.source_class_decay(source_class_decay); } // Validate and apply conflict score filters if let Some(min_score) = params.min_conflict_score { if !min_score.is_finite() || !(0.0..=1.0).contains(&min_score) { return Err(ApiError::InvalidRequest(format!( "min_conflict_score must be between 0.0 and 1.0, got: {}", min_score ))); } builder = builder.min_conflict_score(min_score); } if let Some(max_score) = params.max_conflict_score { if !max_score.is_finite() || !(0.0..=1.0).contains(&max_score) { return Err(ApiError::InvalidRequest(format!( "max_conflict_score must be between 0.0 and 1.0, got: {}", max_score ))); } builder = builder.max_conflict_score(max_score); } let query = builder.build(); // Execute the query let query_engine = state.query_engine(); let result = query_engine.execute(&query).await?; // Return early if no assertions found (before lens application) if result.assertions.is_empty() { // Log empty query audit log_query_audit(&state, agent_id, query_start_timestamp, &audit_params, None, 0.0, vec![]) .await; return Ok(Json(QueryResponse { assertions: vec![], total_count: 0, has_more: result.has_more, conflict_score: None, resolution_confidence: None, changes_since: None, })); } // Pre-compute candidate metadata for audit BEFORE lens consumes assertions. // This avoids cloning all assertions - we only keep what's needed for audit. let candidate_metadata: Vec = result .assertions .iter() .filter_map(|a| { stemedb_core::serde::serialize(a).ok().map(|s| CandidateMetadata { hash: *blake3::hash(&s).as_bytes(), source_hash: a.source_hash, lifecycle: a.lifecycle, }) }) .collect(); // Apply lens if specified let (assertions, resolution_confidence, conflict_score) = if let Some(lens_dto) = params.lens { let (winner, confidence, conflict) = apply_lens_with_confidence(lens_dto, result.assertions, state.store.clone()).await?; (winner, Some(confidence), Some(conflict)) } else { // No lens = return all candidates with full confidence, no conflict score (result.assertions, None, None) }; // Sort subjectless queries by timestamp descending for consistent ordering. // Only applies when both subject and lens are None (broad scan without resolution). let assertions = if params.subject.is_none() && params.lens.is_none() { let mut sorted = assertions; sorted.sort_unstable_by(|a, b| b.timestamp.cmp(&a.timestamp)); sorted } else { assertions }; // Compute contributing assertions for audit using pre-computed metadata let contributing = build_contributing_from_metadata(&candidate_metadata, &assertions)?; // Compute result hash (hash of the winning assertion, if any) let result_hash = if let Some(winner) = assertions.first() { let serialized = stemedb_core::serde::serialize(winner) .map_err(|e| ApiError::Serialization(format!("Failed to serialize winner: {}", e)))?; Some(*blake3::hash(&serialized).as_bytes()) } else { None }; // Log query audit (fire and forget - don't fail the query if audit fails) log_query_audit( &state, agent_id, query_start_timestamp, &audit_params, result_hash, resolution_confidence.unwrap_or(1.0), contributing, ) .await; // --- Source Status Enrichment (P3.2 Cascade Flagging) --- // Collect unique source hashes for batch lookup let unique_source_hashes: Vec<[u8; 32]> = assertions .iter() .map(|a| a.source_hash) .collect::>() .into_iter() .collect(); // Batch lookup source statuses from registry let enricher = SourceStatusEnricher::new(state.store.clone()); let source_statuses = enricher.batch_lookup(&unique_source_hashes).await?; // Filter if exclude_quarantined_sources is true let exclude_quarantined = params.exclude_quarantined_sources; let assertions: Vec<_> = if exclude_quarantined { assertions .into_iter() .filter(|a| !should_exclude_source(source_statuses.get(&a.source_hash))) .collect() } else { assertions }; // Convert to response DTOs with warnings attached let assertion_responses: Vec = assertions .into_iter() .map(|a| { let source_hash = a.source_hash; let warning = source_statuses.get(&source_hash).and_then(make_source_warning); assertion_to_dto_with_warning(a, warning) }) .collect::>>()?; let total_count = assertion_responses.len(); metrics::histogram!("stemedb_query_latency_seconds", "endpoint" => "query") .record(query_start.elapsed().as_secs_f64()); Ok(Json(QueryResponse { assertions: assertion_responses, total_count, has_more: result.has_more, conflict_score, resolution_confidence, changes_since: None, })) } /// Extract agent_id from X-Agent-Id header if present and valid. fn extract_agent_id(headers: &HeaderMap) -> Option<[u8; 32]> { headers .get("X-Agent-Id") .and_then(|v| v.to_str().ok()) .and_then(|s| hex_utils::decode_hash_32(s).ok()) } /// Log a query audit record. Errors are logged but don't fail the query. async fn log_query_audit( state: &AppState, agent_id: Option<[u8; 32]>, timestamp: u64, params: &AuditQueryParams, result_hash: Option<[u8; 32]>, result_confidence: f32, contributing_assertions: Vec, ) { // Generate query_id from content hash of params + timestamp let query_id = generate_query_id(params, timestamp); let audit = QueryAudit { query_id, agent_id, timestamp, params: params.clone(), result_hash, result_confidence, contributing_assertions, }; let audit_store = GenericAuditStore::new(state.store.clone()); if let Err(e) = audit_store.put_audit(&audit).await { // Log but don't fail the query debug!(error = %e, query_id = %hex::encode(query_id), "Failed to log query audit"); } } /// Generate a deterministic query_id from params and timestamp. /// /// The query_id is a BLAKE3 hash of the canonical string representation /// of the query parameters plus timestamp. This ensures: /// - Deterministic: same params + timestamp = same ID /// - Collision-resistant: BLAKE3 provides cryptographic security fn generate_query_id(params: &AuditQueryParams, timestamp: u64) -> [u8; 32] { // Build a canonical string representation for hashing let mut content = String::new(); if let Some(ref s) = params.subject { content.push_str(s); } content.push(':'); if let Some(ref p) = params.predicate { content.push_str(p); } content.push(':'); if let Some(l) = params.lifecycle { content.push_str(&format!("{:?}", l)); } content.push(':'); if let Some(ref e) = params.epoch { content.push_str(&hex::encode(e)); } content.push(':'); if let Some(ref l) = params.lens { content.push_str(l); } content.push(':'); content.push_str(×tamp.to_string()); // BLAKE3 is collision-resistant; no additional entropy needed *blake3::hash(content.as_bytes()).as_bytes() } /// Build ContributingAssertion records from pre-computed metadata and winners. /// /// This function uses pre-computed candidate hashes, avoiding the need to /// clone all candidate assertions before lens resolution. O(n + w) total. fn build_contributing_from_metadata( candidates: &[CandidateMetadata], winners: &[Assertion], ) -> Result> { use std::collections::HashSet; // Pre-compute winner hashes once: O(w) where w = number of winners let winner_hashes: HashSet<[u8; 32]> = winners .iter() .filter_map(|w| { stemedb_core::serde::serialize(w).ok().map(|s| *blake3::hash(&s).as_bytes()) }) .collect(); let contributing: Vec = candidates .iter() .map(|c| ContributingAssertion { assertion_hash: c.hash, weight: if winner_hashes.contains(&c.hash) { 1.0 } else { 0.0 }, source_hash: c.source_hash, lifecycle: c.lifecycle, }) .collect(); Ok(contributing) } /// Apply the specified lens to resolve conflicts and return confidence and conflict score. async fn apply_lens_with_confidence( lens_dto: LensDto, assertions: Vec, store: std::sync::Arc, ) -> Result<(Vec, f32, f32)> { let assertion_count = assertions.len(); let resolution = match lens_dto { LensDto::Recency => { let lens = RecencyLens; lens.resolve(&assertions) } LensDto::Consensus => { let lens = ConsensusLens; lens.resolve(&assertions) } LensDto::Confidence => { let lens = ConfidenceLens; lens.resolve(&assertions) } LensDto::VoteAwareConsensus => { let vote_store = std::sync::Arc::new(GenericVoteStore::new(store.clone())); let lens = VoteAwareConsensusLens::new(vote_store); lens.resolve_async(&assertions).await } // Authority and TrustAwareAuthority both route to TrustAwareAuthorityLens. // Authority is the user-friendly name; TrustAwareAuthority is the explicit name. LensDto::Authority | LensDto::TrustAwareAuthority => { let trust_store = std::sync::Arc::new(GenericTrustRankStore::new(store)); let lens = TrustAwareAuthorityLens::new(trust_store); lens.resolve_async(&assertions).await } LensDto::EpochAware => { // EpochAwareLens filters assertions from superseded epochs before // delegating to RecencyLens for final resolution. let lens = EpochAwareLens::with_recency(store); lens.resolve_async(&assertions).await } LensDto::LayeredConsensus => { // LayeredConsensus returns a different response type with per-tier results. // Use the dedicated /v1/layered endpoint for this lens. return Err(ApiError::InvalidRequest( "LayeredConsensus lens requires the /v1/layered endpoint for per-tier results. \ Use GET /v1/layered?subject=X&predicate=Y instead." .to_string(), )); } LensDto::Constraints => { // Constraints lens returns a different response type with categorized constraints. // Use the dedicated /v1/constraints endpoint for this lens. return Err(ApiError::InvalidRequest( "Constraints lens requires the /v1/constraints endpoint for categorized results. \ Use GET /v1/constraints?subject=X instead." .to_string(), )); } }; let confidence = resolution.resolution_confidence; let conflict = resolution.conflict_score; let winner = resolution.winner.ok_or_else(|| { ApiError::InvalidRequest(format!( "Lens {:?} failed to resolve a winner among {} assertions", lens_dto, assertion_count )) })?; Ok((vec![winner], confidence, conflict)) } /// Convert an internal Assertion to an AssertionResponse DTO with optional warning. /// /// Returns an error if serialization fails (should never happen for assertions /// that came from the database, as they were already serialized once). pub(crate) fn assertion_to_dto_with_warning( assertion: Assertion, source_warning: Option, ) -> Result { // Compute hash - propagate errors instead of silent fallback let serialized = stemedb_core::serde::serialize(&assertion) .map_err(|e| ApiError::Serialization(format!("Failed to serialize assertion: {}", e)))?; let hash = blake3::hash(&serialized); Ok(AssertionResponse { hash: hash.to_hex().to_string(), subject: assertion.subject, predicate: assertion.predicate, object: assertion.object.into(), parent_hash: assertion.parent_hash.map(hex::encode), source_hash: hex::encode(assertion.source_hash), source_class: assertion.source_class.into(), visual_hash: assertion.visual_hash.map(hex::encode), epoch: assertion.epoch.map(hex::encode), lifecycle: assertion.lifecycle.into(), signatures: assertion.signatures.into_iter().map(Into::into).collect(), confidence: assertion.confidence, timestamp: assertion.timestamp, vector: assertion.vector, source_metadata: assertion.source_metadata.and_then(|bytes| String::from_utf8(bytes).ok()), source_warning, }) } /// Convert an internal Assertion to an AssertionResponse DTO (no warning). /// /// Returns an error if serialization fails (should never happen for assertions /// that came from the database, as they were already serialized once). #[allow(dead_code)] pub(crate) fn assertion_to_dto(assertion: Assertion) -> Result { assertion_to_dto_with_warning(assertion, None) }