stemedb/crates/stemedb-sim/src/lib.rs
jordan 1ce4004807 feat: Complete Phase 2 (The Cortex) - query, lens, and API layers
This commit adds the read path (Cortex) to complement the write path (Spine):

## Crates
- stemedb-api: HTTP API with axum + utoipa OpenAPI
  - /v1/assert, /v1/query, /v1/epoch, /v1/skeptic, /v1/trace, /v1/audit
  - Metered endpoints with quota enforcement
  - Ed25519 signature verification
- stemedb-lens: Truth resolution lenses
  - RecencyLens, ConsensusLens, ConfidenceLens
  - VoteAwareConsensusLens (Ballot Box pattern)
  - TrustAwareAuthorityLens (The Hive pattern)
  - SkepticLens (conflict analysis)
  - EpochAwareLens (paradigm-safe queries)
- stemedb-query: Query engine with materialized views

## Storage Extensions
- VoteStore: Vote aggregation with cached counts
- TrustRankStore: Agent reputation with decay
- AuditStore: Query audit trail
- IndexStore: SP/P/S index structures
- SupersessionStore: Epoch supersession chains

## SDKs
- sdk/go/steme: Go HTTP client with Ed25519 signing
- sdk/go/adk: ADK-Go tools for AI agents

## Documentation
- Updated CLAUDE.md, architecture.md, roadmap.md
- New ai-lookup entries for all services
- Use case docs for consumer health intelligence
- Arena roadmap for simulation advancement

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-01 13:22:44 -07:00

1526 lines
53 KiB
Rust

//! StemeDB Simulation Library
//!
//! This module provides the core simulation logic for validating the StemeDB
//! "Spine" (Durability + Schema + Ingestion) and "Cortex" (Query + Lenses)
//! under agent-driven stress tests.
//!
//! # Design Philosophy
//!
//! Following "Philosophy of Software Design" principles:
//! - **Deep Module**: Simple `run_simulation()` interface hides all complexity
//! - **Define Errors Out of Existence**: Failures are collected, not panicked
//! - **Strategic Programming**: Built for testability and extension
//!
//! # Arena Phases
//!
//! - **Arena 0**: Spine validation (WAL + Ingestor + KV Store)
//! - **Arena 1**: Query path validation (QueryEngine + Lenses + Lifecycle + Audits)
//! - **Arena 2**: Voting & Consensus (VoteStore + VoteAwareConsensusLens)
//!
//! # Example
//!
//! ```ignore
//! use stemedb_sim::{run_simulation, SimulationConfig};
//!
//! let config = SimulationConfig::default();
//! let result = run_simulation(config).await?;
//!
//! assert_eq!(result.assertions_verified, result.assertions_written);
//! assert!(result.errors.is_empty());
//! ```
use ed25519_dalek::{Signature, Signer, SigningKey, Verifier, VerifyingKey};
use rand::rngs::OsRng;
use std::sync::Arc;
use std::time::{Duration, Instant};
use stemedb_core::serde::serialize;
use stemedb_core::types::{
Assertion, ContributingAssertion, Hash, LifecycleStage, ObjectValue, QueryAudit, QueryId,
QueryParams, SignatureEntry, SourceClass, Vote,
};
use stemedb_ingest::{serialize_assertion, serialize_vote, Ingestor};
use stemedb_lens::{AsyncLens, Lens, RecencyLens, VoteAwareConsensusLens};
use stemedb_query::{Query, QueryEngine};
use stemedb_storage::{AuditStore, GenericAuditStore, GenericVoteStore, KVStore, SledStore};
use stemedb_wal::Journal;
use thiserror::Error;
use tokio::sync::Mutex;
use tracing::{debug, info, warn};
// ============================================================================
// Public Types
// ============================================================================
/// The outcome of a simulation run.
///
/// This struct captures both success metrics and any errors encountered,
/// allowing callers to programmatically verify simulation outcomes.
#[derive(Debug, Clone)]
pub struct SimulationResult {
    /// Number of assertions successfully written to WAL.
    pub assertions_written: u64,
    /// Number of assertions read back and verified after ingestion.
    pub assertions_verified: u64,
    /// Number of queries executed via QueryEngine.
    pub queries_executed: u64,
    /// Number of votes written to WAL.
    pub votes_written: u64,
    /// Whether the recency lens test passed.
    pub recency_test_passed: bool,
    /// Whether the lifecycle filtering test passed.
    pub lifecycle_test_passed: bool,
    /// Whether the query audit verification passed.
    pub audit_test_passed: bool,
    /// Whether the vote-aware consensus test passed (Arena 2.2).
    pub vote_consensus_test_passed: bool,
    /// Whether the troll resistance test passed (Arena 2.3).
    pub troll_resistance_test_passed: bool,
    /// Errors encountered during the simulation (non-fatal).
    /// An empty vector indicates complete success.
    pub errors: Vec<SimulationError>,
    /// Number of agents that participated.
    pub agent_count: usize,
    /// Number of ticks executed.
    pub tick_count: usize,
}

impl SimulationResult {
    /// Returns true if the simulation completed without errors and every
    /// scenario test passed.
    ///
    /// Note: We don't check assertions_written == assertions_verified because
    /// Arena 2 tests write additional test assertions that are verified through
    /// the VoteAwareConsensusLens rather than the main verification loop.
    pub fn is_success(&self) -> bool {
        self.errors.is_empty()
            && self.recency_test_passed
            && self.lifecycle_test_passed
            && self.audit_test_passed
            && self.vote_consensus_test_passed
            && self.troll_resistance_test_passed
    }

    /// Returns a human-readable, single-line summary of the simulation.
    ///
    /// The line starts with the overall verdict and headline counters
    /// (queries on success, error count on failure), followed by one
    /// `name=✓` / `name=✗` entry per scenario test so a failing run shows
    /// which scenario broke.
    pub fn summary(&self) -> String {
        // Previously both branches of every conditional produced an empty
        // string, so the per-scenario section carried no information.
        let mark = |passed: bool| if passed { "✓" } else { "✗" };
        let scenarios = format!(
            "recency={}, lifecycle={}, audit={}, vote_consensus={}, troll_resist={}",
            mark(self.recency_test_passed),
            mark(self.lifecycle_test_passed),
            mark(self.audit_test_passed),
            mark(self.vote_consensus_test_passed),
            mark(self.troll_resistance_test_passed),
        );

        if self.is_success() {
            format!(
                "✅ Success: {} assertions, {} votes, {} queries | {}",
                self.assertions_written, self.votes_written, self.queries_executed, scenarios
            )
        } else {
            format!(
                "❌ Failed: {} assertions, {} votes, {} errors | {}",
                self.assertions_written,
                self.votes_written,
                self.errors.len(),
                scenarios
            )
        }
    }
}
/// A non-fatal error encountered during simulation.
///
/// Errors of this type are appended to `SimulationResult::errors` rather than
/// aborting the run, so a single simulation can report several of them.
#[derive(Debug, Clone)]
pub struct SimulationError {
/// Which tick the error occurred on (0-indexed), or 0 for scenario errors.
///
/// Because scenario-level errors also use 0, this value alone cannot
/// distinguish "first tick" from "scenario"; consult `message` for context.
pub tick: u64,
/// The category of error.
pub kind: ErrorKind,
/// Human-readable error description.
pub message: String,
}
/// Categories of simulation errors.
///
/// The kind is a coarse classification for programmatic checks; the
/// accompanying `SimulationError::message` carries the details.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorKind {
/// Failed to write assertion to WAL.
///
/// Also reported when waiting for the ingestion cursor times out.
WriteFailure,
/// Assertion not found in KV store after ingestion.
VerificationFailure,
/// Ed25519 signature verification failed.
///
/// Also reported when the stored agent public key cannot be parsed.
SignatureInvalid,
/// Deserialization of stored data failed.
///
/// Also reported when a queried assertion's content differs from what was written.
StorageCorruption,
/// Serialization of assertion failed.
SerializationFailure,
/// Query execution failed.
///
/// Also reported when a query succeeds but returns no results or an
/// unexpected number or kind of results.
QueryFailure,
/// Lens resolution produced unexpected result.
LensResolutionFailure,
/// Audit trail verification failed.
AuditFailure,
/// Failed to write vote to WAL.
VoteWriteFailure,
/// Vote-aware consensus resolution failed.
VoteConsensusFailure,
}
/// Tunable parameters for one simulation run.
#[derive(Debug, Clone)]
pub struct SimulationConfig {
    /// How many agents to instantiate.
    pub agent_count: usize,
    /// How many ticks (one assertion written per tick) to run.
    pub tick_count: usize,
    /// Ingestion wait budget, in milliseconds.
    pub ingestion_wait_ms: u64,
}

impl Default for SimulationConfig {
    /// A small, quick configuration: 3 agents, 10 ticks, 500 ms ingestion wait.
    fn default() -> Self {
        let agent_count = 3;
        let tick_count = 10;
        let ingestion_wait_ms = 500;
        SimulationConfig {
            agent_count,
            tick_count,
            ingestion_wait_ms,
        }
    }
}
/// Fatal errors that prevent the simulation from running.
///
/// Returned as the `Err` side of `run_simulation` when the storage
/// infrastructure cannot be brought up. Each variant carries the underlying
/// error rendered as a string.
#[derive(Error, Debug)]
pub enum SimulationSetupError {
/// Failed to create temporary directory for WAL.
#[error("Failed to create WAL directory: {0}")]
WalDirectory(String),
/// Failed to create temporary directory for KV store.
#[error("Failed to create store directory: {0}")]
StoreDirectory(String),
/// Failed to open the WAL journal.
#[error("Failed to open journal: {0}")]
JournalOpen(String),
/// Failed to open the KV store.
#[error("Failed to open store: {0}")]
StoreOpen(String),
/// Failed to create the ingestor.
#[error("Failed to create ingestor: {0}")]
IngestorCreate(String),
}
// ============================================================================
// Internal Types
// ============================================================================
/// A simulated participant: a display name plus an Ed25519 keypair.
struct Agent {
    pub id: String,
    signing_key: SigningKey,
    verifying_key: VerifyingKey,
}

impl Agent {
    /// Build an agent called `id` with a freshly generated keypair.
    pub fn new(id: &str) -> Self {
        let signing_key = SigningKey::generate(&mut OsRng);
        let verifying_key = VerifyingKey::from(&signing_key);
        Self {
            id: id.to_string(),
            signing_key,
            verifying_key,
        }
    }

    /// The agent's public (verifying) key as raw bytes.
    pub fn public_key(&self) -> [u8; 32] {
        self.verifying_key.to_bytes()
    }

    /// Current wall-clock time as whole seconds since the Unix epoch.
    ///
    /// Falls back to 0 if the system clock reports a time before the epoch.
    fn unix_now_secs() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.as_secs())
    }

    /// Sign an assertion using the `Proposed` lifecycle and the current time.
    pub fn sign_assertion(&self, subject: &str, predicate: &str, object: ObjectValue) -> Assertion {
        self.sign_assertion_with_options(subject, predicate, object, LifecycleStage::Proposed, None)
    }

    /// Sign an assertion with an explicit lifecycle stage.
    ///
    /// `custom_timestamp` overrides the timestamp when `Some`; otherwise the
    /// current Unix time in seconds is used. The same timestamp is stored on
    /// both the assertion and its single signature entry.
    pub fn sign_assertion_with_options(
        &self,
        subject: &str,
        predicate: &str,
        object: ObjectValue,
        lifecycle: LifecycleStage,
        custom_timestamp: Option<u64>,
    ) -> Assertion {
        let timestamp = custom_timestamp.unwrap_or_else(Self::unix_now_secs);

        // Simulation shortcut: the signed payload is just "subject:predicate".
        // A real system would sign a hash of the full fact data.
        let payload = format!("{}:{}", subject, predicate);
        let signature: Signature = self.signing_key.sign(payload.as_bytes());

        let signed_by = SignatureEntry {
            agent_id: self.public_key(),
            signature: signature.to_bytes(),
            timestamp,
        };

        Assertion {
            subject: subject.to_string(),
            predicate: predicate.to_string(),
            object,
            parent_hash: None,
            source_hash: [0u8; 32],
            source_class: SourceClass::Expert,
            visual_hash: None,
            epoch: None,
            lifecycle,
            signatures: vec![signed_by],
            confidence: 1.0,
            timestamp,
            vector: None,
        }
    }

    /// Cast a signed vote of the given `weight` on `assertion_hash`.
    ///
    /// The signature covers the assertion hash itself, binding the vote to
    /// that specific assertion.
    pub fn vote(&self, assertion_hash: Hash, weight: f32) -> Vote {
        let timestamp = Self::unix_now_secs();
        let signature: Signature = self.signing_key.sign(&assertion_hash);
        Vote {
            assertion_hash,
            agent_id: self.public_key(),
            weight,
            signature: signature.to_bytes(),
            timestamp,
        }
    }
}
// ============================================================================
// Helper Functions
// ============================================================================
/// Result from writing to WAL, includes the raw bytes and the journal offset after the write.
///
/// Produced by `write_assertion_to_wal`.
struct WalWriteResult {
/// The assertion as encoded by `serialize` (the second, "raw" encoding made
/// alongside the WAL-framed one).
raw_bytes: Vec<u8>,
/// The journal offset AFTER this write (use this as target for wait_until_ingested)
end_offset: u64,
}
/// Append one assertion to the WAL.
///
/// The assertion is encoded twice: once with `serialize_assertion` for the
/// journal entry, and once with `serialize` so the caller gets the raw bytes.
///
/// # Returns
/// * `Ok(WalWriteResult)` with the raw bytes and the journal offset after the append
/// * `Err(String)` describing which serialization step or the WAL append failed
async fn write_assertion_to_wal(
    journal: &Arc<Mutex<Journal>>,
    assertion: &Assertion,
) -> Result<WalWriteResult, String> {
    let framed = match serialize_assertion(assertion) {
        Ok(encoded) => encoded,
        Err(err) => return Err(format!("Failed to serialize: {}", err)),
    };
    let raw_bytes = match serialize(assertion) {
        Ok(encoded) => encoded,
        Err(err) => return Err(format!("Failed to serialize raw: {}", err)),
    };

    // The journal lock is taken only after both encodings succeeded and is
    // released as soon as the append statement finishes.
    let end_offset = journal
        .lock()
        .await
        .append(framed)
        .map_err(|err| format!("WAL write failed: {}", err))?;

    Ok(WalWriteResult { raw_bytes, end_offset })
}
/// Append one vote to the WAL.
///
/// The vote flows through the full pipeline: WAL → IngestWorker → VoteStore,
/// which automatically updates vote count and aggregate weight caches.
///
/// # Returns
/// * `Ok(offset)` — the journal offset after the append
/// * `Err(String)` if serialization or the WAL append failed
async fn write_vote_to_wal(journal: &Arc<Mutex<Journal>>, vote: &Vote) -> Result<u64, String> {
    let framed = match serialize_vote(vote) {
        Ok(encoded) => encoded,
        Err(err) => return Err(format!("Failed to serialize vote: {}", err)),
    };

    // Lock is held only for the duration of the append statement.
    journal
        .lock()
        .await
        .append(framed)
        .map_err(|err| format!("WAL vote write failed: {}", err))
}
/// Compute the content-addressed hash of an assertion.
///
/// The hash is BLAKE3 over the bytes produced by `serialize`. If
/// serialization fails, an all-zero hash is returned instead of an error.
fn compute_assertion_hash(assertion: &Assertion) -> Hash {
    serialize(assertion)
        .map(|encoded| *blake3::hash(&encoded).as_bytes())
        .unwrap_or([0u8; 32])
}
/// The cursor key used by the ingestor to track its progress.
///
/// `wait_until_ingested` reads this key and decodes the value as an 8-byte
/// little-endian `u64` offset.
// NOTE(review): must stay identical to the key the ingestor writes — confirm
// against the stemedb-ingest crate. The `allow(dead_code)` below looks stale,
// since this constant is referenced by `wait_until_ingested`.
#[allow(dead_code)]
const CURSOR_KEY: &[u8] = b"__CURSOR__:ingest";
/// Block (asynchronously) until the ingestion cursor has reached `target_offset`.
///
/// Instead of sleeping for a fixed time, this polls the cursor stored under
/// `CURSOR_KEY` every 10ms. A missing, unreadable, or wrongly sized cursor
/// value is treated as "not there yet" and simply retried.
///
/// # Arguments
/// * `store` - The KVStore holding the cursor
/// * `target_offset` - Offset the cursor must reach or exceed
/// * `max_wait_ms` - Give up once more than this many milliseconds have elapsed
///
/// # Returns
/// * `Ok(())` once the cursor is at or beyond the target
/// * `Err(SimulationError)` (tick 0, kind `WriteFailure`) on timeout
#[allow(dead_code)]
async fn wait_until_ingested<S: KVStore>(
    store: &S,
    target_offset: u64,
    max_wait_ms: u64,
) -> Result<(), SimulationError> {
    const POLL_INTERVAL: Duration = Duration::from_millis(10);

    let deadline = Duration::from_millis(max_wait_ms);
    let started = Instant::now();

    loop {
        // Decode the cursor as a little-endian u64; anything else counts as absent.
        let observed = match store.get(CURSOR_KEY).await {
            Ok(Some(raw)) => <[u8; 8]>::try_from(raw.as_slice())
                .ok()
                .map(u64::from_le_bytes),
            _ => None,
        };

        if let Some(cursor) = observed.filter(|&position| position >= target_offset) {
            debug!(cursor, target_offset, "Ingestion sync: cursor reached target");
            return Ok(());
        }

        // The deadline is checked after the poll, so at least one read always happens.
        if started.elapsed() > deadline {
            let message = format!(
                "Ingestion sync timeout: cursor did not reach {} within {}ms",
                target_offset, max_wait_ms
            );
            return Err(SimulationError {
                tick: 0,
                kind: ErrorKind::WriteFailure,
                message,
            });
        }

        tokio::time::sleep(POLL_INTERVAL).await;
    }
}
// ============================================================================
// Core Simulation Function
// ============================================================================
/// Run the simulation with the given configuration.
///
/// This is the primary entry point for the simulation. It:
/// 1. Sets up storage infrastructure (WAL + KV) in temporary directories
/// 2. Starts the ingestor
/// 3. Creates agents with cryptographic identities
/// 4. Runs assertion ticks (one signed assertion written to the WAL per tick)
/// 5. Waits for ingestion (cursor-based, bounded by `ingestion_wait_ms`)
/// 6. Verifies assertions via QueryEngine, including signatures (Arena 1)
/// 7. Tests Recency Lens (Arena 1.2)
/// 8. Tests Lifecycle Filtering (Arena 1.3)
/// 9. Verifies Query Audit Trail (Arena 1.4)
/// 10. Tests Vote-Aware Consensus (Arena 2.2)
/// 11. Tests Troll Vote Resistance (Arena 2.3)
/// 12. Logs a summary
///
/// A configuration with `agent_count == 0` cannot run any scenario; it is
/// reported as a collected error in the returned result (no infrastructure
/// is started) rather than causing a panic.
///
/// # Returns
///
/// - `Ok(SimulationResult)` on completion (even with verification errors)
/// - `Err(SimulationSetupError)` if infrastructure setup fails
pub async fn run_simulation(
    config: SimulationConfig,
) -> Result<SimulationResult, SimulationSetupError> {
    info!("🚀 Starting StemeDB Simulation: 'The Arena' (Phase 1: Query Path)");
    info!(
        " Config: {} agents, {} ticks, {}ms ingestion wait",
        config.agent_count, config.tick_count, config.ingestion_wait_ms
    );

    // Initialize the result first so configuration problems can be reported through it.
    let mut result = SimulationResult {
        assertions_written: 0,
        assertions_verified: 0,
        queries_executed: 0,
        votes_written: 0,
        recency_test_passed: false,
        lifecycle_test_passed: false,
        audit_test_passed: false,
        vote_consensus_test_passed: false,
        troll_resistance_test_passed: false,
        errors: Vec::new(),
        agent_count: config.agent_count,
        tick_count: config.tick_count,
    };

    // Every stage below picks agents by index. With zero agents the tick loop
    // would compute `tick % 0` and the scenario tests would index an empty
    // slice, so report the problem instead of panicking.
    if config.agent_count == 0 {
        result.errors.push(SimulationError {
            tick: 0,
            kind: ErrorKind::WriteFailure,
            message: "Simulation requires at least 1 agent (agent_count is 0)".to_string(),
        });
        warn!("{}", result.summary());
        for err in &result.errors {
            warn!(" [Tick {}] {:?}: {}", err.tick, err.kind, err.message);
        }
        return Ok(result);
    }

    // 1. Setup Storage (WAL + KV)
    let temp_wal_dir =
        tempfile::tempdir().map_err(|e| SimulationSetupError::WalDirectory(e.to_string()))?;
    let temp_db_dir =
        tempfile::tempdir().map_err(|e| SimulationSetupError::StoreDirectory(e.to_string()))?;
    let journal = Arc::new(Mutex::new(
        Journal::open(temp_wal_dir.path())
            .map_err(|e| SimulationSetupError::JournalOpen(e.to_string()))?,
    ));
    let store = Arc::new(
        SledStore::open(temp_db_dir.path())
            .map_err(|e| SimulationSetupError::StoreOpen(e.to_string()))?,
    );
    debug!(" WAL initialized at {:?}", temp_wal_dir.path());
    debug!(" KV Store initialized at {:?}", temp_db_dir.path());

    // 2. Start Ingestor
    let mut ingestor = Ingestor::new(journal.clone(), store.clone())
        .await
        .map_err(|e| SimulationSetupError::IngestorCreate(e.to_string()))?;
    ingestor.start();
    debug!(" Ingestor started (background worker).");

    // 3. Setup Agents (names cycle through three roles by index)
    let agents: Vec<Agent> = (0..config.agent_count)
        .map(|i| {
            let name = match i % 3 {
                0 => format!("Scientist_{}", i),
                1 => format!("Researcher_{}", i),
                _ => format!("Troll_{}", i),
            };
            Agent::new(&name)
        })
        .collect();
    info!(" Swarm of {} agents instantiated.", agents.len());

    // 4. Run Simulation Ticks (Write Phase)
    let mut assertions_data: Vec<(Assertion, Vec<u8>)> = Vec::with_capacity(config.tick_count);
    let mut last_journal_offset = 0u64;
    for tick in 0..config.tick_count {
        // `agents` is non-empty here (guarded above), so the modulo is safe.
        let agent = &agents[tick % agents.len()];
        let assertion = agent.sign_assertion(
            &format!("Entity_{}", tick),
            "has_property",
            ObjectValue::Text(format!("Value_{}", tick)),
        );
        match write_assertion_to_wal(&journal, &assertion).await {
            Ok(wal_result) => {
                debug!(" [Tick {}] Agent '{}' wrote to WAL", tick, agent.id);
                result.assertions_written += 1;
                last_journal_offset = wal_result.end_offset;
                assertions_data.push((assertion, wal_result.raw_bytes));
            }
            Err(e) => {
                result.errors.push(SimulationError {
                    tick: tick as u64,
                    kind: ErrorKind::WriteFailure,
                    message: e,
                });
            }
        }
    }
    info!(" {} assertions written to WAL.", result.assertions_written);

    // 5. Wait for Ingestion (cursor-based sync)
    info!("⏳ Waiting for ingestion to reach offset {}...", last_journal_offset);
    if let Err(e) =
        wait_until_ingested(&*store, last_journal_offset, config.ingestion_wait_ms).await
    {
        result.errors.push(e);
    }

    // ========================================================================
    // ARENA 1: Query Path Verification
    // ========================================================================
    info!("🔬 Arena 1: Verifying Query Path...");

    // 6. Create QueryEngine and verify assertions via queries
    let engine = QueryEngine::new(store.clone());
    for (i, (original_assertion, _)) in assertions_data.iter().enumerate() {
        let tick = i as u64;

        // Query via QueryEngine (not direct KV access)
        let query = Query::builder()
            .subject(&original_assertion.subject)
            .predicate(&original_assertion.predicate)
            .build();
        let query_result = match engine.execute(&query).await {
            Ok(r) => r,
            Err(e) => {
                result.errors.push(SimulationError {
                    tick,
                    kind: ErrorKind::QueryFailure,
                    message: format!("Query execution failed: {}", e),
                });
                continue;
            }
        };
        result.queries_executed += 1;

        let Some(found) = query_result.assertions.first() else {
            result.errors.push(SimulationError {
                tick,
                kind: ErrorKind::QueryFailure,
                message: format!(
                    "Query returned no results for {}:{}",
                    original_assertion.subject, original_assertion.predicate
                ),
            });
            continue;
        };

        // Verify content matches
        if found.subject != original_assertion.subject
            || found.object != original_assertion.object
        {
            result.errors.push(SimulationError {
                tick,
                kind: ErrorKind::StorageCorruption,
                message: format!(
                    "Content mismatch: expected subject='{}', got='{}'",
                    original_assertion.subject, found.subject
                ),
            });
            continue;
        }

        // Verify signature. A stored assertion without any signature entry is
        // reported as an error rather than indexed blindly.
        let Some(sig_entry) = found.signatures.first() else {
            result.errors.push(SimulationError {
                tick,
                kind: ErrorKind::SignatureInvalid,
                message: format!("Assertion for '{}' carries no signatures", found.subject),
            });
            continue;
        };
        let verifying_key = match VerifyingKey::from_bytes(&sig_entry.agent_id) {
            Ok(k) => k,
            Err(e) => {
                result.errors.push(SimulationError {
                    tick,
                    kind: ErrorKind::SignatureInvalid,
                    message: format!("Invalid agent public key: {}", e),
                });
                continue;
            }
        };
        let signature = Signature::from_bytes(&sig_entry.signature);
        // Must mirror the payload built in `Agent::sign_assertion_with_options`.
        let message = format!("{}:{}", found.subject, found.predicate);
        if let Err(e) = verifying_key.verify(message.as_bytes(), &signature) {
            result.errors.push(SimulationError {
                tick,
                kind: ErrorKind::SignatureInvalid,
                message: format!("Signature verification failed: {}", e),
            });
            continue;
        }

        debug!(
            " [Query {}] Assertion for '{}' verified via QueryEngine.",
            i, found.subject
        );
        result.assertions_verified += 1;
    }

    // ========================================================================
    // 7. Arena 1.2: Recency Lens Test
    // ========================================================================
    info!("🔬 Arena 1.2: Testing Recency Lens...");
    result.recency_test_passed =
        run_recency_lens_test(&journal, &store, &agents, &mut result).await;

    // ========================================================================
    // 8. Arena 1.3: Lifecycle Filtering Test
    // ========================================================================
    info!("🔬 Arena 1.3: Testing Lifecycle Filtering...");
    result.lifecycle_test_passed = run_lifecycle_test(&journal, &store, &agents, &mut result).await;

    // Wait for these new assertions to be ingested
    tokio::time::sleep(std::time::Duration::from_millis(config.ingestion_wait_ms)).await;

    // ========================================================================
    // 9. Arena 1.4: Query Audit Verification
    // ========================================================================
    info!("🔬 Arena 1.4: Testing Query Audit Trail...");
    result.audit_test_passed = run_audit_test(&store, &agents, &mut result).await;

    // ========================================================================
    // ARENA 2: Voting & Consensus
    // ========================================================================
    info!("🗳️ Arena 2: Verifying Voting & Consensus...");

    // ========================================================================
    // 10. Arena 2.2: Conflicting Assertions with Votes
    // ========================================================================
    info!("🗳️ Arena 2.2: Testing Vote-Aware Consensus...");
    result.vote_consensus_test_passed =
        run_vote_consensus_test(&journal, &store, &agents, &mut result).await;

    // Wait for votes to be ingested
    tokio::time::sleep(std::time::Duration::from_millis(config.ingestion_wait_ms)).await;

    // ========================================================================
    // 11. Arena 2.3: Troll Vote Resistance
    // ========================================================================
    info!("🗳️ Arena 2.3: Testing Troll Vote Resistance...");
    result.troll_resistance_test_passed =
        run_troll_resistance_test(&journal, &store, &agents, &mut result).await;

    // Wait for final ingestion
    tokio::time::sleep(std::time::Duration::from_millis(config.ingestion_wait_ms)).await;

    // 12. Log summary
    if result.is_success() {
        info!("{}", result.summary());
    } else {
        warn!("{}", result.summary());
        for err in &result.errors {
            warn!(" [Tick {}] {:?}: {}", err.tick, err.kind, err.message);
        }
    }
    Ok(result)
}
// ============================================================================
// Arena 1.2: Recency Lens Test
// ============================================================================
/// Test that RecencyLens correctly selects the most recent assertion.
///
/// Creates two assertions for the same subject+predicate with different timestamps
/// (1000 and 2000), writes both to the WAL, waits a fixed 300ms for ingestion,
/// then verifies that the lens resolves to the newer one.
///
/// # Returns
/// `true` if the lens picked the assertion with timestamp 2000 and value
/// `"new_value"`. On any failure — including an empty `agents` slice — an
/// error is pushed onto `result.errors` and `false` is returned.
/// `result.queries_executed` is incremented only on success.
async fn run_recency_lens_test<S: KVStore + 'static>(
    journal: &Arc<Mutex<Journal>>,
    store: &Arc<S>,
    agents: &[Agent],
    result: &mut SimulationResult,
) -> bool {
    // Report an empty swarm as an error instead of panicking on `agents[0]`.
    let Some(agent) = agents.first() else {
        result.errors.push(SimulationError {
            tick: 0,
            kind: ErrorKind::LensResolutionFailure,
            message: "Recency test: need at least 1 agent".to_string(),
        });
        return false;
    };
    let subject = "RecencyTest_Entity";
    let predicate = "test_property";

    // Create older assertion (timestamp = 1000)
    let old_assertion = agent.sign_assertion_with_options(
        subject,
        predicate,
        ObjectValue::Text("old_value".to_string()),
        LifecycleStage::Proposed,
        Some(1000),
    );
    // Create newer assertion (timestamp = 2000)
    let new_assertion = agent.sign_assertion_with_options(
        subject,
        predicate,
        ObjectValue::Text("new_value".to_string()),
        LifecycleStage::Proposed,
        Some(2000),
    );

    // Write both to WAL, older first; stop at the first failed write.
    for (label, assertion) in [("old", &old_assertion), ("new", &new_assertion)] {
        if let Err(e) = write_assertion_to_wal(journal, assertion).await {
            result.errors.push(SimulationError {
                tick: 0,
                kind: ErrorKind::WriteFailure,
                message: format!("Recency test: failed to write {} assertion: {}", label, e),
            });
            return false;
        }
    }

    // Wait for ingestion
    tokio::time::sleep(std::time::Duration::from_millis(300)).await;

    // Query to get candidates
    let engine = QueryEngine::new(store.clone());
    let query = Query::builder().subject(subject).predicate(predicate).build();
    let query_result = match engine.execute(&query).await {
        Ok(r) => r,
        Err(e) => {
            result.errors.push(SimulationError {
                tick: 0,
                kind: ErrorKind::QueryFailure,
                message: format!("Recency test: query failed: {}", e),
            });
            return false;
        }
    };
    if query_result.assertions.len() < 2 {
        result.errors.push(SimulationError {
            tick: 0,
            kind: ErrorKind::QueryFailure,
            message: format!(
                "Recency test: expected 2 assertions, got {}",
                query_result.assertions.len()
            ),
        });
        return false;
    }

    // Apply RecencyLens
    let lens = RecencyLens;
    let resolution = lens.resolve(&query_result.assertions);
    let Some(winner) = resolution.winner else {
        result.errors.push(SimulationError {
            tick: 0,
            kind: ErrorKind::LensResolutionFailure,
            message: "Recency test: lens returned no winner".to_string(),
        });
        return false;
    };

    // Winner should be the newer assertion (timestamp = 2000, value = "new_value")
    let picked_newest = winner.timestamp == 2000
        && matches!(&winner.object, ObjectValue::Text(value) if value == "new_value");
    if picked_newest {
        debug!(" Recency lens correctly selected newest assertion");
        result.queries_executed += 1;
        return true;
    }

    result.errors.push(SimulationError {
        tick: 0,
        kind: ErrorKind::LensResolutionFailure,
        message: format!(
            "Recency test: wrong winner selected (timestamp={}, expected 2000)",
            winner.timestamp
        ),
    });
    false
}
// ============================================================================
// Arena 1.3: Lifecycle Filtering Test
// ============================================================================
/// Test that lifecycle filtering correctly filters assertions.
///
/// Creates a Proposed and an Approved assertion for the same subject+predicate,
/// writes both to the WAL, waits a fixed 300ms for ingestion, then verifies
/// that querying with lifecycle=Approved returns only the Approved one.
///
/// # Returns
/// `true` if exactly one assertion comes back, it is `Approved`, and its value
/// is `"ES256"`. On any failure — including an empty `agents` slice — an error
/// is pushed onto `result.errors` and `false` is returned.
/// `result.queries_executed` is incremented only on success.
async fn run_lifecycle_test<S: KVStore + 'static>(
    journal: &Arc<Mutex<Journal>>,
    store: &Arc<S>,
    agents: &[Agent],
    result: &mut SimulationResult,
) -> bool {
    // Report an empty swarm as an error instead of panicking on `agents[0]`.
    let Some(agent) = agents.first() else {
        result.errors.push(SimulationError {
            tick: 0,
            kind: ErrorKind::QueryFailure,
            message: "Lifecycle test: need at least 1 agent".to_string(),
        });
        return false;
    };
    let subject = "LifecycleTest_Entity";
    let predicate = "jwt_algorithm";

    // Proposed assertion (the bug - using RS256)
    let proposed = agent.sign_assertion_with_options(
        subject,
        predicate,
        ObjectValue::Text("RS256".to_string()),
        LifecycleStage::Proposed,
        Some(1000),
    );
    // Approved assertion (the fix - using ES256)
    let approved = agent.sign_assertion_with_options(
        subject,
        predicate,
        ObjectValue::Text("ES256".to_string()),
        LifecycleStage::Approved,
        Some(2000),
    );

    // Write both to WAL, proposed first; stop at the first failed write.
    for (label, assertion) in [("proposed", &proposed), ("approved", &approved)] {
        if let Err(e) = write_assertion_to_wal(journal, assertion).await {
            result.errors.push(SimulationError {
                tick: 0,
                kind: ErrorKind::WriteFailure,
                message: format!("Lifecycle test: failed to write {}: {}", label, e),
            });
            return false;
        }
    }

    // Wait for ingestion
    tokio::time::sleep(std::time::Duration::from_millis(300)).await;

    // Query with lifecycle filter
    let engine = QueryEngine::new(store.clone());
    let query = Query::builder()
        .subject(subject)
        .predicate(predicate)
        .lifecycle(LifecycleStage::Approved)
        .build();
    let query_result = match engine.execute(&query).await {
        Ok(r) => r,
        Err(e) => {
            result.errors.push(SimulationError {
                tick: 0,
                kind: ErrorKind::QueryFailure,
                message: format!("Lifecycle test: query failed: {}", e),
            });
            return false;
        }
    };

    // Should return exactly 1 assertion (the Approved one)
    if query_result.assertions.len() != 1 {
        result.errors.push(SimulationError {
            tick: 0,
            kind: ErrorKind::QueryFailure,
            message: format!(
                "Lifecycle test: expected 1 Approved assertion, got {}",
                query_result.assertions.len()
            ),
        });
        return false;
    }
    // Index 0 is in bounds: the length was just checked to be exactly 1.
    let found = &query_result.assertions[0];
    if found.lifecycle != LifecycleStage::Approved {
        result.errors.push(SimulationError {
            tick: 0,
            kind: ErrorKind::QueryFailure,
            message: format!("Lifecycle test: expected Approved, got {:?}", found.lifecycle),
        });
        return false;
    }

    // Verify it's the ES256 value (the fix)
    if matches!(&found.object, ObjectValue::Text(value) if value == "ES256") {
        debug!(" Lifecycle filter correctly returned only Approved assertion (ES256)");
        result.queries_executed += 1;
        return true;
    }

    result.errors.push(SimulationError {
        tick: 0,
        kind: ErrorKind::QueryFailure,
        message: "Lifecycle test: wrong object value returned".to_string(),
    });
    false
}
// ============================================================================
// Arena 1.4: Query Audit Verification
// ============================================================================
/// Test that query audits are correctly stored and retrievable.
///
/// Creates a query audit attributed to the first agent's public key, stores it,
/// then verifies it can be retrieved via `get_audits_for_agent()` with a
/// matching query id, agent id, and exactly one contributing assertion.
///
/// # Returns
/// `true` if the audit round-trips. On any failure — including an empty
/// `agents` slice — an `AuditFailure` error is pushed onto `result.errors`
/// and `false` is returned.
async fn run_audit_test<S: KVStore + 'static>(
    store: &Arc<S>,
    agents: &[Agent],
    result: &mut SimulationResult,
) -> bool {
    // Report an empty swarm as an error instead of panicking on `agents[0]`.
    let Some(agent) = agents.first() else {
        result.errors.push(SimulationError {
            tick: 0,
            kind: ErrorKind::AuditFailure,
            message: "Audit test: need at least 1 agent".to_string(),
        });
        return false;
    };
    let audit_store = GenericAuditStore::new(store.clone());
    let agent_id = agent.public_key();

    // Create a query audit
    let timestamp = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0);
    let query_id: QueryId = *blake3::hash(b"test_query").as_bytes();
    let audit = QueryAudit {
        query_id,
        agent_id: Some(agent_id),
        timestamp,
        params: QueryParams {
            subject: Some("AuditTest_Entity".to_string()),
            predicate: Some("test_property".to_string()),
            lifecycle: Some(LifecycleStage::Approved),
            epoch: None,
            lens: Some("Recency".to_string()),
        },
        // Placeholder hashes: this test exercises the store round-trip only.
        result_hash: Some([1u8; 32]),
        result_confidence: 0.95,
        contributing_assertions: vec![ContributingAssertion {
            assertion_hash: [2u8; 32],
            weight: 1.0,
            source_hash: [3u8; 32],
            lifecycle: LifecycleStage::Approved,
        }],
    };

    // Store the audit
    if let Err(e) = audit_store.put_audit(&audit).await {
        result.errors.push(SimulationError {
            tick: 0,
            kind: ErrorKind::AuditFailure,
            message: format!("Audit test: failed to store audit: {}", e),
        });
        return false;
    }

    // Retrieve audits for this agent
    let audits = match audit_store.get_audits_for_agent(&agent_id, 0, None, 10).await {
        Ok(a) => a,
        Err(e) => {
            result.errors.push(SimulationError {
                tick: 0,
                kind: ErrorKind::AuditFailure,
                message: format!("Audit test: failed to retrieve audits: {}", e),
            });
            return false;
        }
    };

    // Verify we got our audit back
    let Some(found) = audits.first() else {
        result.errors.push(SimulationError {
            tick: 0,
            kind: ErrorKind::AuditFailure,
            message: "Audit test: no audits found for agent".to_string(),
        });
        return false;
    };
    if found.query_id != query_id {
        result.errors.push(SimulationError {
            tick: 0,
            kind: ErrorKind::AuditFailure,
            message: "Audit test: query_id mismatch".to_string(),
        });
        return false;
    }
    if found.agent_id != Some(agent_id) {
        result.errors.push(SimulationError {
            tick: 0,
            kind: ErrorKind::AuditFailure,
            message: "Audit test: agent_id mismatch".to_string(),
        });
        return false;
    }

    // Verify contributing assertions
    if found.contributing_assertions.len() != 1 {
        result.errors.push(SimulationError {
            tick: 0,
            kind: ErrorKind::AuditFailure,
            message: format!(
                "Audit test: expected 1 contributing assertion, got {}",
                found.contributing_assertions.len()
            ),
        });
        return false;
    }

    debug!(" Audit trail correctly stored and retrieved for agent");
    true
}
// ============================================================================
// Arena 2.2: Vote-Aware Consensus Test
// ============================================================================
/// Test that VoteAwareConsensusLens correctly selects the assertion with most votes.
///
/// Scenario:
/// - Scientist_Alpha asserts "Protein_X binds Receptor_Y" (confidence 0.8)
/// - Scientist_Beta asserts "Protein_X binds Receptor_Z" (confidence 0.8)
/// - Alpha votes for own assertion (weight 1.0)
/// - Beta votes for own assertion (weight 1.0)
/// - Believer (third agent) votes for Alpha's assertion
/// - Query with VoteAwareConsensusLens
/// - Verify Alpha's assertion wins (2 votes vs 1)
async fn run_vote_consensus_test<S: KVStore + 'static>(
journal: &Arc<Mutex<Journal>>,
store: &Arc<S>,
agents: &[Agent],
result: &mut SimulationResult,
) -> bool {
// Need at least 3 agents: Alpha, Beta, Believer
if agents.len() < 3 {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::VoteConsensusFailure,
message: "Vote consensus test: need at least 3 agents".to_string(),
});
return false;
}
let alpha = &agents[0];
let beta = &agents[1];
let believer = &agents[2];
let subject = "Protein_X";
let predicate = "binds";
// Alpha's assertion: Protein_X binds Receptor_Y
let alpha_assertion = alpha.sign_assertion_with_options(
subject,
predicate,
ObjectValue::Text("Receptor_Y".to_string()),
LifecycleStage::Proposed,
Some(1000),
);
// Beta's conflicting assertion: Protein_X binds Receptor_Z
let beta_assertion = beta.sign_assertion_with_options(
subject,
predicate,
ObjectValue::Text("Receptor_Z".to_string()),
LifecycleStage::Proposed,
Some(1001),
);
// Write both assertions to WAL
if let Err(e) = write_assertion_to_wal(journal, &alpha_assertion).await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::WriteFailure,
message: format!("Vote consensus test: failed to write Alpha assertion: {}", e),
});
return false;
}
result.assertions_written += 1;
if let Err(e) = write_assertion_to_wal(journal, &beta_assertion).await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::WriteFailure,
message: format!("Vote consensus test: failed to write Beta assertion: {}", e),
});
return false;
}
result.assertions_written += 1;
// Wait for assertions to be ingested
tokio::time::sleep(std::time::Duration::from_millis(300)).await;
// Compute assertion hashes
let alpha_hash = compute_assertion_hash(&alpha_assertion);
let beta_hash = compute_assertion_hash(&beta_assertion);
// Alpha votes for own assertion (weight 1.0)
let alpha_vote = alpha.vote(alpha_hash, 1.0);
if let Err(e) = write_vote_to_wal(journal, &alpha_vote).await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::VoteWriteFailure,
message: format!("Vote consensus test: failed to write Alpha vote: {}", e),
});
return false;
}
result.votes_written += 1;
// Beta votes for own assertion (weight 1.0)
let beta_vote = beta.vote(beta_hash, 1.0);
if let Err(e) = write_vote_to_wal(journal, &beta_vote).await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::VoteWriteFailure,
message: format!("Vote consensus test: failed to write Beta vote: {}", e),
});
return false;
}
result.votes_written += 1;
// Believer votes for Alpha's assertion (weight 1.0) - this tips the balance
let believer_vote = believer.vote(alpha_hash, 1.0);
if let Err(e) = write_vote_to_wal(journal, &believer_vote).await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::VoteWriteFailure,
message: format!("Vote consensus test: failed to write Believer vote: {}", e),
});
return false;
}
result.votes_written += 1;
// Wait for votes to be ingested (IngestWorker now uses VoteStore.put_vote())
tokio::time::sleep(std::time::Duration::from_millis(300)).await;
// Query to get candidates
let engine = QueryEngine::new(store.clone());
let query = Query::builder().subject(subject).predicate(predicate).build();
let query_result = match engine.execute(&query).await {
Ok(r) => r,
Err(e) => {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::QueryFailure,
message: format!("Vote consensus test: query failed: {}", e),
});
return false;
}
};
if query_result.assertions.len() < 2 {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::QueryFailure,
message: format!(
"Vote consensus test: expected 2 assertions, got {}",
query_result.assertions.len()
),
});
return false;
}
// Apply VoteAwareConsensusLens
let vote_store = Arc::new(GenericVoteStore::new(store.clone()));
let lens = VoteAwareConsensusLens::new(vote_store);
let resolution = lens.resolve_async(&query_result.assertions).await;
match resolution.winner {
Some(winner) => {
// Winner should be Alpha's assertion (Receptor_Y) - it has 2 votes vs Beta's 1
if let ObjectValue::Text(ref value) = winner.object {
if value == "Receptor_Y" {
debug!(
" Vote-aware consensus correctly selected Alpha's assertion (2 votes vs 1)"
);
result.queries_executed += 1;
return true;
}
}
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::VoteConsensusFailure,
message: format!(
"Vote consensus test: wrong winner selected (expected Receptor_Y, got {:?})",
winner.object
),
});
false
}
None => {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::VoteConsensusFailure,
message: "Vote consensus test: lens returned no winner".to_string(),
});
false
}
}
}
// ============================================================================
// Arena 2.3: Troll Vote Resistance Test
// ============================================================================
/// Test that trolls cannot overturn consensus with self-votes.
///
/// Scenario:
/// - Scientist asserts high-confidence fact about Entity_X
/// - Scientist votes for own assertion
/// - Troll creates low-confidence contradicting assertion
/// - Troll votes for own assertion
/// - Another scientist (ally) also votes for Scientist's assertion
/// - Verify Scientist's assertion still wins despite Troll vote
async fn run_troll_resistance_test<S: KVStore + 'static>(
journal: &Arc<Mutex<Journal>>,
store: &Arc<S>,
agents: &[Agent],
result: &mut SimulationResult,
) -> bool {
// Need at least 3 agents: Scientist, Troll, Ally
if agents.len() < 3 {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::VoteConsensusFailure,
message: "Troll resistance test: need at least 3 agents".to_string(),
});
return false;
}
let scientist = &agents[0];
let troll = &agents[1];
let ally = &agents[2];
let subject = "Entity_X";
let predicate = "property";
// Scientist's high-confidence assertion
let scientist_assertion = scientist.sign_assertion_with_options(
subject,
predicate,
ObjectValue::Text("verified_value".to_string()),
LifecycleStage::Proposed,
Some(2000),
);
// Troll's low-confidence contradicting assertion
let troll_assertion = troll.sign_assertion_with_options(
subject,
predicate,
ObjectValue::Text("fake_value".to_string()),
LifecycleStage::Proposed,
Some(2001),
);
// Write both assertions to WAL
if let Err(e) = write_assertion_to_wal(journal, &scientist_assertion).await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::WriteFailure,
message: format!("Troll resistance test: failed to write scientist assertion: {}", e),
});
return false;
}
result.assertions_written += 1;
if let Err(e) = write_assertion_to_wal(journal, &troll_assertion).await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::WriteFailure,
message: format!("Troll resistance test: failed to write troll assertion: {}", e),
});
return false;
}
result.assertions_written += 1;
// Wait for assertions to be ingested
tokio::time::sleep(std::time::Duration::from_millis(300)).await;
// Compute assertion hashes
let scientist_hash = compute_assertion_hash(&scientist_assertion);
let troll_hash = compute_assertion_hash(&troll_assertion);
// Scientist votes for own assertion (weight 1.0)
let scientist_vote = scientist.vote(scientist_hash, 1.0);
if let Err(e) = write_vote_to_wal(journal, &scientist_vote).await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::VoteWriteFailure,
message: format!("Troll resistance test: failed to write scientist vote: {}", e),
});
return false;
}
result.votes_written += 1;
// Troll votes for own assertion (weight 1.0)
let troll_vote = troll.vote(troll_hash, 1.0);
if let Err(e) = write_vote_to_wal(journal, &troll_vote).await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::VoteWriteFailure,
message: format!("Troll resistance test: failed to write troll vote: {}", e),
});
return false;
}
result.votes_written += 1;
// Ally votes for scientist's assertion (weight 1.0) - tips balance in scientist's favor
let ally_vote = ally.vote(scientist_hash, 1.0);
if let Err(e) = write_vote_to_wal(journal, &ally_vote).await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::VoteWriteFailure,
message: format!("Troll resistance test: failed to write ally vote: {}", e),
});
return false;
}
result.votes_written += 1;
// Wait for votes to be ingested (IngestWorker now uses VoteStore.put_vote())
tokio::time::sleep(std::time::Duration::from_millis(300)).await;
// Query to get candidates
let engine = QueryEngine::new(store.clone());
let query = Query::builder().subject(subject).predicate(predicate).build();
let query_result = match engine.execute(&query).await {
Ok(r) => r,
Err(e) => {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::QueryFailure,
message: format!("Troll resistance test: query failed: {}", e),
});
return false;
}
};
if query_result.assertions.len() < 2 {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::QueryFailure,
message: format!(
"Troll resistance test: expected 2 assertions, got {}",
query_result.assertions.len()
),
});
return false;
}
// Apply VoteAwareConsensusLens
let vote_store = Arc::new(GenericVoteStore::new(store.clone()));
let lens = VoteAwareConsensusLens::new(vote_store);
let resolution = lens.resolve_async(&query_result.assertions).await;
match resolution.winner {
Some(winner) => {
// Winner should be scientist's assertion (verified_value) - it has 2 votes vs troll's 1
if let ObjectValue::Text(ref value) = winner.object {
if value == "verified_value" {
debug!(" Troll resistance: consensus correctly resisted troll (2 votes vs 1)");
result.queries_executed += 1;
return true;
}
}
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::VoteConsensusFailure,
message: format!(
"Troll resistance test: troll won! (expected verified_value, got {:?})",
winner.object
),
});
false
}
None => {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::VoteConsensusFailure,
message: "Troll resistance test: lens returned no winner".to_string(),
});
false
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs the full simulation with `SimulationConfig::default()` and checks
    /// the counters and the per-scenario pass flags.
    #[tokio::test]
    async fn test_simulation_default_config_succeeds() {
        let config = SimulationConfig::default();
        let result = run_simulation(config).await.expect("Simulation should not fail setup");

        assert!(result.is_success(), "Simulation should succeed: {:?}", result.errors);
        // Arena 0 + Arena 2 assertions: 10 base + 2 (vote consensus) + 2 (troll resistance)
        assert_eq!(result.assertions_written, 14);
        assert_eq!(result.assertions_verified, 10);
        // Arena 2 votes: 3 (vote consensus) + 3 (troll resistance)
        assert_eq!(result.votes_written, 6);
        assert!(result.recency_test_passed, "Recency test should pass");
        assert!(result.lifecycle_test_passed, "Lifecycle test should pass");
        assert!(result.audit_test_passed, "Audit test should pass");
        assert!(result.vote_consensus_test_passed, "Vote consensus test should pass");
        assert!(result.troll_resistance_test_passed, "Troll resistance test should pass");
    }

    /// Runs the simulation with a non-default agent/tick count and checks that
    /// the counters scale with `tick_count` while the vote scenarios stay fixed.
    #[tokio::test]
    async fn test_simulation_custom_config() {
        let config = SimulationConfig { agent_count: 5, tick_count: 20, ingestion_wait_ms: 600 };
        let result = run_simulation(config).await.expect("Simulation should not fail setup");

        // Print the collected errors on failure, matching the default-config test.
        assert!(result.is_success(), "Simulation should succeed: {:?}", result.errors);
        // 20 base + 2 (vote consensus) + 2 (troll resistance)
        assert_eq!(result.assertions_written, 24);
        assert_eq!(result.assertions_verified, 20);
        assert_eq!(result.votes_written, 6);
        assert_eq!(result.agent_count, 5);
        assert_eq!(result.tick_count, 20);
    }

    /// Checks that `SimulationResult::summary()` renders each scenario flag as
    /// `name=✓` when it passed and `name=✗` when it failed.
    ///
    /// This test awaits nothing, so it is a plain synchronous `#[test]`.
    #[test]
    fn test_simulation_result_summary() {
        let success = SimulationResult {
            assertions_written: 10,
            assertions_verified: 10,
            queries_executed: 12,
            votes_written: 6,
            recency_test_passed: true,
            lifecycle_test_passed: true,
            audit_test_passed: true,
            vote_consensus_test_passed: true,
            troll_resistance_test_passed: true,
            errors: vec![],
            agent_count: 3,
            tick_count: 10,
        };
        // NOTE(review): the original also asserted `summary().contains("")`, which is
        // always true (the empty pattern matches any string). The intended overall
        // status marker is not recoverable from this file; add an assertion for it
        // once the marker emitted by `summary()` is confirmed.
        let success_summary = success.summary();
        assert!(success_summary.contains("recency=✓"));
        assert!(success_summary.contains("lifecycle=✓"));
        assert!(success_summary.contains("audit=✓"));
        assert!(success_summary.contains("vote_consensus=✓"));
        assert!(success_summary.contains("troll_resist=✓"));

        let failure = SimulationResult {
            assertions_written: 10,
            assertions_verified: 8,
            queries_executed: 10,
            votes_written: 6,
            recency_test_passed: false,
            lifecycle_test_passed: true,
            audit_test_passed: true,
            vote_consensus_test_passed: true,
            troll_resistance_test_passed: false,
            errors: vec![SimulationError {
                tick: 5,
                kind: ErrorKind::VerificationFailure,
                message: "Test error".to_string(),
            }],
            agent_count: 3,
            tick_count: 10,
        };
        // Same caveat as above: the vacuous `contains("")` check was dropped.
        let failure_summary = failure.summary();
        assert!(failure_summary.contains("recency=✗"));
        assert!(failure_summary.contains("troll_resist=✗"));
    }
}