stemedb/applications/aphoria/src/scan/scanner.rs
jordan 422e2d4416 feat(aphoria): wire claims through StemeDB — Gap Closure Phase 1
Claims now flow through StemeDB's append-only knowledge graph instead of
mutable TOML files. This resolves all 6 critical claim-bypass code paths:

- Bridge: lossless AuthoredClaim ↔ Assertion round-trip (comparison, status, lifecycle mapping)
- LocalEpisteme: ingest_authored_claim() and fetch_authored_claims() with AUTHORED_CLAIM predicate index
- EpistemeClaimStore: ClaimStore trait backed by StemeDB (append-only delete via deprecation)
- CLI handlers: all claim commands read/write through StemeDB
- Scanner: loads claims from StemeDB with auto-migration fallback to TOML
- Export: new `aphoria claims export` serializes StemeDB claims to TOML/JSON

Also cleans up dead code (EpistemeConfig.url), renames ingest_claims→ingest_observations,
fixes ClaimFilter.authority_tier type, adds Draft variant to ClaimStatus, and fixes
pre-existing clippy warnings (too_many_arguments, filter_next→rfind).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-12 02:02:51 -07:00

650 lines
24 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! Core scanner logic for conflict detection and observation recording.
use std::collections::HashSet;
use std::path::Path;
use std::time::Instant;
use tracing::{info, instrument};
use crate::bridge::{self, observation_to_assertion};
use crate::claims_file::ClaimsFile;
use crate::config::{AphoriaConfig, SyncMode};
use crate::episteme::{current_timestamp_millis, ConceptIndex, EphemeralDetector, LocalEpisteme};
use crate::error::AphoriaError;
use crate::extractors::INLINE_MARKER_PREDICATE;
use crate::hosted::HostedClient;
use crate::pending_markers::{PendingMarker, PendingMarkersFile};
use crate::policy::PolicyManager;
use crate::types::{
ConflictResult, DriftResult, FileSource, Observation, ScanArgs, ScanMode, ScanResult,
ScanTiming,
};
use crate::verify;
use crate::walker::{walk_project, walk_staged_files};
use super::walker::extract_claims_from_files;
/// Outcome of the conflict-checking stage of a scan.
///
/// Produced by `check_conflicts` for both scan modes and consumed by
/// `run_scan` when it assembles the final `ScanResult`.
pub(super) struct ConflictCheckResult {
/// Conflicts reported by the conflict detector for the extracted claims.
pub conflicts: Vec<ConflictResult>,
/// Drift results from the persistent path; always empty in ephemeral mode.
pub drifts: Vec<DriftResult>,
/// Number of observations written back during the scan.
/// Always 0 in ephemeral mode and whenever sync is not in effect.
pub observations_recorded: usize,
/// Authored claims fetched from StemeDB during persistent mode.
/// Empty in ephemeral mode, and also when the fetch from StemeDB fails
/// (the caller then falls back to its own claim-loading path).
pub authored_claims: Vec<crate::types::AuthoredClaim>,
}
/// Run a scan on the specified project.
///
/// This is the main entry point for scanning a codebase. It:
/// 1. Walks the project directory
/// 2. Extracts claims from config and code
/// 3. Checks for conflicts against authoritative sources
/// 4. (Optional) Persists claims to Episteme storage if `mode == Persistent`
/// 5. (Optional) Records observations for claims with no conflicts if `sync == true`
/// 6. Returns a formatted report
///
/// # Scan Modes
///
/// - **Ephemeral** (default): Fast in-memory scan. No disk I/O for Episteme.
/// Uses `EphemeralDetector` for conflict detection. Does not support
/// diff/baseline features or observation write-back.
///
/// - **Persistent**: Full scan with Episteme storage. Enables diff, baseline,
/// alias creation, and observation write-back (when `--sync` is enabled).
#[instrument(skip(config), fields(path = %args.path.display(), format = %args.format, mode = ?args.mode, sync = args.sync, file_source = ?args.file_source, benchmark = args.benchmark))]
pub async fn run_scan(args: ScanArgs, config: &AphoriaConfig) -> Result<ScanResult, AphoriaError> {
info!("Starting scan");
let total_start = Instant::now();
let project_root = args.path.canonicalize().unwrap_or_else(|_| args.path.clone());
// 1. Walk the project to find files (or just staged files)
let walk_start = Instant::now();
let files = match args.file_source {
FileSource::All => walk_project(&project_root, config)?,
FileSource::Staged => walk_staged_files(&project_root, config)?,
};
let walk_ms = walk_start.elapsed().as_millis() as u64;
info!(files_found = files.len(), file_source = ?args.file_source, walk_ms, "Project walk complete");
// 2. Extract claims from files (LLM extraction only in persistent mode)
let extraction_start = Instant::now();
let all_claims = extract_claims_from_files(&files, config, args.mode, &project_root).await?;
let extraction_ms = extraction_start.elapsed().as_millis() as u64;
info!(claims_extracted = all_claims.len(), extraction_ms, "Extraction complete");
// 2.5. Sync inline markers to pending_markers.toml if enabled
if config.extractors.inline_markers.enabled && config.extractors.inline_markers.sync_to_pending
{
let marker_count = sync_pending_markers(&all_claims, &project_root)?;
if marker_count > 0 {
info!(markers_synced = marker_count, "Pending markers synced");
}
}
// 3. Check for conflicts - mode determines which path
let conflict_start = Instant::now();
let result = check_conflicts(&args, &all_claims, &project_root, config).await?;
let conflict_ms = conflict_start.elapsed().as_millis() as u64;
let total_ms = total_start.elapsed().as_millis() as u64;
// 4. Verify authored claims against observations
// Use claims from StemeDB (persistent mode) or fall back to TOML (ephemeral mode)
let verify_report = {
let authored_claims = if !result.authored_claims.is_empty() {
result.authored_claims
} else {
load_authored_claims_for_scan(&project_root, config).await?
};
if authored_claims.is_empty() {
None
} else {
let active_claims: Vec<_> = authored_claims
.into_iter()
.filter(|c| c.status == crate::types::ClaimStatus::Active)
.collect();
if active_claims.is_empty() {
None
} else {
info!(claims = active_claims.len(), "Verifying authored claims");
Some(verify::verify_claims(&active_claims, &all_claims))
}
}
};
// 5. Calculate lines of code if benchmark mode
let lines_of_code = if args.benchmark { Some(count_lines_of_code(&files)) } else { None };
// 6. Build timing info if benchmark mode
let timing = if args.benchmark {
Some(ScanTiming { walk_ms, extraction_ms, conflict_ms, total_ms, lines_of_code })
} else {
None
};
// 7. Populate claims if requested (clone and sort by file, then line)
let claims = if args.show_claims {
let mut sorted = all_claims.to_vec();
sorted.sort_by(|a, b| a.file.cmp(&b.file).then(a.line.cmp(&b.line)));
Some(sorted)
} else {
None
};
// 8. Build result
let project_name =
project_root.file_name().and_then(|s| s.to_str()).unwrap_or("unknown").to_string();
Ok(ScanResult {
project: project_name,
scan_id: generate_scan_id(),
files_scanned: files.len(),
claims_extracted: all_claims.len(),
conflicts: result.conflicts,
drifts: result.drifts,
format: args.format.clone(),
debug: args.debug,
strict: args.strict,
observations_recorded: result.observations_recorded,
timing,
claims,
observations: all_claims.to_vec(), // Always populate for verification/coverage
deprecated_usages: vec![], // TODO: Populate from lifecycle store during scan
verify: verify_report,
})
}
/// Total the non-blank lines across all scanned files.
///
/// A line counts when it still has content after trimming whitespace.
/// Files that cannot be read as UTF-8 text contribute zero instead of
/// failing the scan. Used for benchmark reporting.
fn count_lines_of_code(files: &[crate::walker::WalkedFile]) -> usize {
    let mut total = 0;
    for file in files {
        // Unreadable files are skipped on purpose: this is a best-effort metric.
        if let Ok(content) = std::fs::read_to_string(&file.path) {
            total += content.lines().filter(|line| !line.trim().is_empty()).count();
        }
    }
    total
}
/// Dispatch conflict checking to the ephemeral or persistent implementation.
///
/// The path is selected by `args.mode`. The ephemeral path only produces
/// conflicts, so the remaining fields of the result are filled with empty
/// or zero values.
async fn check_conflicts(
    args: &ScanArgs,
    all_claims: &[Observation],
    project_root: &Path,
    config: &AphoriaConfig,
) -> Result<ConflictCheckResult, AphoriaError> {
    match args.mode {
        ScanMode::Persistent => {
            check_conflicts_persistent(all_claims, project_root, config, args.sync).await
        }
        ScanMode::Ephemeral => {
            let conflicts =
                check_conflicts_ephemeral(all_claims, project_root, config, args.debug).await?;
            // Ephemeral mode is intentionally stateless: no drift detection,
            // no recorded observations, no authored claims from storage.
            Ok(ConflictCheckResult {
                conflicts,
                drifts: Vec::new(),
                observations_recorded: 0,
                authored_claims: Vec::new(),
            })
        }
    }
}
/// In-memory conflict detection that does not use Episteme storage.
///
/// Loads (or generates) the project signing key and the configured policies,
/// feeds the policies to an `EphemeralDetector`, and runs either the regular
/// or the debug conflict check depending on `debug`.
///
/// # Errors
///
/// Fails if the signing key cannot be loaded or generated, or if the
/// policies cannot be loaded.
async fn check_conflicts_ephemeral(
    all_claims: &[Observation],
    project_root: &Path,
    config: &AphoriaConfig,
    debug: bool,
) -> Result<Vec<ConflictResult>, AphoriaError> {
    info!("Using ephemeral detector (no persistence)");
    let signing_key = bridge::load_or_generate_key(project_root)?;

    // Policies are read via the corpus cache directory.
    let manager = PolicyManager::new(&config.corpus.cache_dir);
    let loaded_policies = manager.load_policies(&config.policies)?;

    let mut detector = EphemeralDetector::new(&signing_key, &config.corpus).await;
    detector.ingest_policies(&loaded_policies);

    let found = if debug {
        detector.check_conflicts_debug(all_claims, config)
    } else {
        detector.check_conflicts(all_claims, config)
    };
    Ok(found)
}
/// Full conflict detection with Episteme persistence.
///
/// When `sync` is enabled, claims with no authority conflict are written back
/// as Tier 4 (Community) observations, creating "project memory".
///
/// Drift detection runs AFTER authority conflict detection: claims that have
/// no authority conflict are checked against prior observations to detect
/// value changes.
///
/// # Hosted Mode
///
/// When `[hosted]` is configured with a URL, sync is automatically enabled
/// and observations are pushed to the remote server. The `sync_mode` setting
/// controls whether local storage is also used:
///
/// - `remote-only`: Only push to remote (no local storage)
/// - `local-and-remote`: Store locally AND push to remote
async fn check_conflicts_persistent(
all_claims: &[Observation],
project_root: &Path,
config: &AphoriaConfig,
sync: bool,
) -> Result<ConflictCheckResult, AphoriaError> {
// Auto-enable sync when hosted mode is configured
let effective_sync = sync || config.hosted.is_enabled();
let hosted_enabled = config.hosted.is_enabled();
info!(
sync = effective_sync,
hosted = hosted_enabled,
"Using persistent mode (with Episteme storage)"
);
// Open local Episteme and ingest claims
let mut episteme = LocalEpisteme::open(config, project_root).await?;
if !all_claims.is_empty() {
episteme.ingest_observations(all_claims).await?;
}
// Build authoritative corpus from bundled sources AND imported Trust Packs
// If config.corpus.use_community is enabled, this will also include community patterns
let mut corpus = episteme.build_corpus_with_stores(&config.corpus).await?;
// Include assertions imported from Trust Packs
let imported_assertions = episteme.fetch_authoritative_assertions().await?;
if !imported_assertions.is_empty() {
info!(
count = imported_assertions.len(),
"Including imported Trust Pack assertions in conflict detection"
);
corpus.extend(imported_assertions);
}
// Merge predicate aliases from config AND from persisted/imported Trust Packs
// This ensures both config-defined and pack-imported aliases are used for
// semantic predicate matching (Phase 6.5.3)
let mut all_predicate_aliases = config.predicate_aliases.to_alias_sets();
all_predicate_aliases.extend(episteme.predicate_aliases().iter().cloned());
if !all_predicate_aliases.is_empty() {
info!(
config_count = config.predicate_aliases.to_alias_sets().len(),
stored_count = episteme.predicate_aliases().len(),
total_count = all_predicate_aliases.len(),
"Using predicate aliases for index normalization"
);
}
// Build index WITH predicate alias normalization so both authority and code
// predicates use canonical forms (e.g., "required" normalizes to "enabled")
let index = ConceptIndex::build_with_aliases(&corpus, &all_predicate_aliases);
let conflicts = episteme.check_conflicts(all_claims, config, &index).await?;
// Find claims that DO have an authority conflict
let conflicting_paths: HashSet<_> =
conflicts.iter().map(|c| c.claim.concept_path.as_str()).collect();
// Non-conflicting claims are candidates for drift detection and observation write-back
let non_conflicting_claims: Vec<_> = all_claims
.iter()
.filter(|c| !conflicting_paths.contains(c.concept_path.as_str()))
.cloned()
.collect();
// Drift detection: check non-conflicting claims against prior observations
let drifts = episteme.check_drift(&non_conflicting_claims).await?;
// Find claims that drifted (we don't want to overwrite them with new observations)
let drifting_paths: HashSet<_> = drifts.iter().map(|d| d.claim.concept_path.as_str()).collect();
// Write observations for novel claims (no conflict AND no drift) if sync enabled
let observations_recorded = if effective_sync {
// Novel claims are those with NO authority conflict AND NO drift
let novel_claims: Vec<_> = non_conflicting_claims
.iter()
.filter(|c| !drifting_paths.contains(c.concept_path.as_str()))
.cloned()
.collect();
let mut local_count = 0;
let mut remote_count = 0;
// Local persistence (unless hosted mode is remote-only without fallback)
let should_persist_locally =
!hosted_enabled || config.hosted.sync_mode == SyncMode::LocalAndRemote;
if should_persist_locally && !novel_claims.is_empty() {
local_count = episteme.ingest_observations(&novel_claims).await?;
info!(count = local_count, "Recorded observations locally");
}
// Remote push (if hosted mode is enabled)
if hosted_enabled && !novel_claims.is_empty() {
// Get project name for fallback
let project_name =
project_root.file_name().and_then(|s| s.to_str()).unwrap_or("unknown");
// Load signing key for hosted client
let signing_key = bridge::load_or_generate_key(project_root)?;
// Create hosted client
if let Some(client) =
HostedClient::new(&config.hosted, &config.community, &signing_key, project_name)?
{
// Convert claims to observations
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let observations: Vec<_> = novel_claims
.iter()
.map(|c| observation_to_assertion(c, &signing_key, timestamp, None))
.collect();
remote_count = client.push_observations(observations)?;
info!(count = remote_count, "Pushed observations to hosted server");
}
}
// Aggregate observations into pattern records (Phase 4 - community corpus)
if config.corpus.aggregation_enabled && should_persist_locally && !novel_claims.is_empty() {
let project_hash = compute_project_hash(project_root);
if let Err(e) =
aggregate_observations_to_patterns(&novel_claims, &episteme, &project_hash).await
{
// Log error but don't fail the scan
tracing::warn!(error = %e, "Failed to aggregate observations to patterns");
}
}
// Return the higher count (they should be the same for LocalAndRemote)
local_count.max(remote_count)
} else {
0
};
// Fetch authored claims from StemeDB before shutdown (avoids opening DB again later)
let authored_claims = match episteme.fetch_authored_claims().await {
Ok(claims) => claims,
Err(e) => {
info!(error = %e, "Could not fetch authored claims from StemeDB");
vec![]
}
};
// Shut down Episteme
episteme.shutdown().await;
Ok(ConflictCheckResult { conflicts, drifts, observations_recorded, authored_claims })
}
/// Build a scan identifier of the form `scan-<timestamp>`.
///
/// The suffix is whatever `current_timestamp_millis()` returns at call time.
pub fn generate_scan_id() -> String {
    let stamp = current_timestamp_millis();
    format!("scan-{}", stamp)
}
/// Walk a project and extract its claims, skipping conflict detection.
///
/// Used for the community preview, to show which observations would be
/// shared. Extraction always runs with `ScanMode::Ephemeral`, so no LLM
/// extraction is performed for the preview.
///
/// # Errors
///
/// Propagates errors from walking the project or extracting claims.
#[instrument(skip(config), fields(path = %args.path.display(), file_source = ?args.file_source))]
pub async fn extract_claims(
    args: &ScanArgs,
    config: &AphoriaConfig,
) -> Result<Vec<Observation>, AphoriaError> {
    info!("Extracting claims for preview");

    // Use the path as given when it cannot be canonicalized.
    let project_root = match args.path.canonicalize() {
        Ok(resolved) => resolved,
        Err(_) => args.path.clone(),
    };

    // Collect either every project file or only the staged ones.
    let walked = match args.file_source {
        FileSource::All => walk_project(&project_root, config)?,
        FileSource::Staged => walk_staged_files(&project_root, config)?,
    };
    info!(files_found = walked.len(), "Project walk complete");

    // Ephemeral mode - no LLM extraction
    let preview_claims =
        extract_claims_from_files(&walked, config, ScanMode::Ephemeral, &project_root).await?;
    info!(claims_extracted = preview_claims.len(), "Extraction complete");

    Ok(preview_claims)
}
/// Record newly seen inline markers in the project's pending-markers file.
///
/// Only observations whose predicate is `INLINE_MARKER_PREDICATE` are
/// considered. Each one must carry a text value holding JSON with an
/// `invariant` string; `consequence` and `category` are optional. An
/// observation is skipped when its value is not text, its JSON does not parse
/// (a warning is logged), it has no `invariant`, or a marker is already
/// recorded at the same file and line.
///
/// Returns the number of markers added. The file is saved, and a notice is
/// printed to stdout, only when at least one marker was added.
///
/// # Errors
///
/// Fails if the pending-markers file cannot be loaded or saved.
fn sync_pending_markers(
    observations: &[Observation],
    project_root: &Path,
) -> Result<usize, AphoriaError> {
    // Nothing to do (and no file access) when there are no marker observations.
    let mut marker_observations =
        observations.iter().filter(|o| o.predicate == INLINE_MARKER_PREDICATE).peekable();
    if marker_observations.peek().is_none() {
        return Ok(0);
    }

    let markers_path = PendingMarkersFile::default_path(project_root);
    let mut markers_file = PendingMarkersFile::load(&markers_path)?;
    let mut added_count = 0;

    for observation in marker_observations {
        // Marker payloads are JSON carried in a text value; ignore anything else.
        let stemedb_core::types::ObjectValue::Text(value_str) = &observation.value else {
            continue;
        };

        let marker_data: serde_json::Value = match serde_json::from_str(value_str) {
            Ok(parsed) => parsed,
            Err(e) => {
                tracing::warn!(
                    file = %observation.file,
                    line = observation.line,
                    error = %e,
                    json = %value_str,
                    "Failed to parse marker JSON"
                );
                continue;
            }
        };

        // A marker without an invariant carries nothing to review.
        let Some(invariant) = marker_data["invariant"].as_str().map(str::to_owned) else {
            continue;
        };

        // De-duplicate by location: one pending marker per (file, line).
        if markers_file.find_by_location(&observation.file, observation.line).is_some() {
            continue;
        }

        let consequence = marker_data["consequence"].as_str().map(str::to_owned);
        let category = marker_data["category"].as_str().map(str::to_owned);
        markers_file.add(PendingMarker::new(
            observation.file.clone(),
            observation.line,
            invariant,
            consequence,
            category,
        ));
        added_count += 1;
    }

    if added_count == 0 {
        return Ok(0);
    }

    markers_file.save(&markers_path)?;
    // User-facing output in CLI context
    #[allow(clippy::print_stdout)]
    {
        println!(
            " Detected {} new claim marker(s). Run 'aphoria claims list-markers' to review.",
            added_count
        );
    }
    Ok(added_count)
}
/// Fold a batch of observations into community pattern aggregates.
///
/// Observations are grouped by (wildcarded concept path, predicate, value).
/// For each group:
/// - if a matching pattern already exists, its `observation_count` grows by
///   the group size, `last_seen` is refreshed, and `project_count` is bumped
///   when `project_hash` is not yet associated with the pattern;
/// - otherwise a new `PatternAggregate` is created and stored.
///
/// # Errors
///
/// Fails if the system clock is before the Unix epoch or if any pattern
/// store operation fails.
async fn aggregate_observations_to_patterns(
    observations: &[Observation],
    episteme: &LocalEpisteme,
    project_hash: &str,
) -> Result<(), AphoriaError> {
    use crate::community::{CommunityObjectValue, PatternAggregate, StemeDBPatternStore};
    use std::collections::HashMap;

    info!(
        observations = observations.len(),
        project_hash, "Aggregating observations into community patterns"
    );

    // Pattern store backed by Episteme's KV store and predicate index
    let pattern_store =
        StemeDBPatternStore::new(episteme.get_kv_store(), episteme.get_predicate_index());

    // Count observations per (subject, predicate, value); only the group size
    // is needed downstream.
    let mut occurrences: HashMap<(String, String, CommunityObjectValue), usize> = HashMap::new();
    for obs in observations {
        let key = (
            // Wildcard the project path for community sharing
            crate::community::wildcard_project_path(&obs.concept_path),
            obs.predicate.clone(),
            CommunityObjectValue::from(&obs.value),
        );
        *occurrences.entry(key).or_insert(0) += 1;
    }
    info!(unique_patterns = occurrences.len(), "Grouped observations by pattern");

    // Seconds since the Unix epoch, used for first/last-seen stamps
    let timestamp = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_err(|e| AphoriaError::Storage(format!("Failed to get timestamp: {}", e)))?
        .as_secs();

    let mut created = 0;
    let mut updated = 0;
    for ((subject, predicate, value), seen) in occurrences {
        let existing = pattern_store.get_pattern_by_spv(&subject, &predicate, &value).await?;
        if let Some(mut agg) = existing {
            // Known pattern: bump counters and refresh the timestamp
            agg.observation_count += seen as u64;
            agg.last_seen = timestamp;
            // Count the project only the first time it contributes
            if !pattern_store.has_project(&agg, project_hash).await? {
                agg.project_count += 1;
            }
            pattern_store.update_pattern(&agg).await?;
            updated += 1;
        } else {
            // Unknown pattern: create it and store it via a PatternAggregator
            let agg = PatternAggregate::new(subject, predicate, value, timestamp);
            let aggregator = crate::community::PatternAggregator::new(
                episteme.get_kv_store(),
                episteme.get_predicate_index(),
            );
            aggregator.add_pattern(&agg).await?;
            created += 1;
        }
    }

    info!(
        created,
        updated,
        total = created + updated,
        "Aggregated observations into community patterns"
    );
    Ok(())
}
/// Load authored claims for scan verification.
///
/// Attempts to load claims from StemeDB first, but only when the
/// `.aphoria/db` directory already exists under `project_root`. If StemeDB
/// cannot be opened or holds no authored claims, falls back to `claims.toml`
/// (migration path).
///
/// # Errors
///
/// Fails if fetching authored claims from an opened StemeDB fails, or if the
/// claims file cannot be loaded. A failure to *open* StemeDB is logged and
/// treated as "no claims in StemeDB".
async fn load_authored_claims_for_scan(
    project_root: &Path,
    config: &AphoriaConfig,
) -> Result<Vec<crate::types::AuthoredClaim>, AphoriaError> {
    // Only try StemeDB if the database directory already exists.
    // This avoids creating storage in ephemeral mode and avoids lock
    // contention when persistent mode already holds the DB open.
    let db_dir = project_root.join(".aphoria/db");
    if db_dir.exists() {
        match LocalEpisteme::open(config, project_root).await {
            Ok(mut episteme) => {
                // Hold on to the fetch result so the store is shut down even
                // when the fetch failed; only then surface the error.
                let fetched = episteme.fetch_authored_claims().await;
                episteme.shutdown().await;
                let stemedb_claims = fetched?;
                if !stemedb_claims.is_empty() {
                    return Ok(stemedb_claims);
                }
            }
            Err(e) => {
                info!(error = %e, "Could not open StemeDB for claim loading, falling back to TOML");
            }
        }
    }

    // Fallback: load from claims.toml if it exists (migration path)
    let claims_path = ClaimsFile::default_path(project_root);
    let claims_file = ClaimsFile::load(&claims_path)?;
    if !claims_file.is_empty() {
        info!(claims = claims_file.len(), "Loaded authored claims from claims.toml");
    }
    Ok(claims_file.claims)
}
/// Derive a project identifier from the project root path.
///
/// The identifier is the hex-encoded BLAKE3 digest of the path's lossy
/// string form. It depends only on that string, so the same path always
/// yields the same value across scans.
fn compute_project_hash(project_root: &Path) -> String {
    let path_text = project_root.to_string_lossy();
    let digest = blake3::hash(path_text.as_bytes());
    hex::encode(digest.as_bytes())
}