fix: merge upstream 10 commits, fix DashMap deadlock, deterministic sim ingestion

Merged 10 upstream commits (MemTable, read-your-writes tests, feed endpoint,
security hardening, signed assertions, source registry, dashboard enhancements)
and fixed all test failures across the full workspace (2656/2656 passing).

Key fixes:
- fix(cluster): DashMap deadlock in swim.rs suspect_node/fail_node/alive_node
  - DashMap::get_mut RefMut + iter() on same map = non-reentrant write lock deadlock
  - Fix: extract clone in scoped block to drop RefMut before calling update_node_gauges()
  - 6 previously-hanging SWIM tests now pass in <2s
- fix(sim): replace background-task+polling ingestion with synchronous process_pending()
  - smoke_high_volume_simulation was CPU-starved under 2656 parallel tests
  - Removed ingestor.start() + wait_until_ingested() pattern throughout sim
  - All arena functions now call ingestor.process_pending() directly (deterministic)
- fix(test): v2 signature helper used wrong hash (rkyv vs canonical compute_content_hash_v2)
- fix(test): quota test signed "test" but v1 requires "subject:predicate" format
- fix(test): http_validation now accepts 400 for valid-format-but-invalid-crypto hex
- fix(test): scale_adaptive micro tier assertions updated (auto_promote upstream change)
- config: add nextest.toml with slow-timeout for background-task-tests group
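
The deadlock fix above follows a general rule: release the guard on a map entry before calling anything that locks the same map again. The sketch below shows the scoped-block shape of that fix. It is an illustration only: it substitutes `std::sync::Mutex<HashMap<..>>` for DashMap so it runs without external crates, and `Membership`, `NodeState`, and the gauge tuple are hypothetical names, not the types in swim.rs.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

#[derive(Clone, Copy, Debug, PartialEq)]
enum NodeState {
    Alive,
    Suspect,
    Failed,
}

// Hypothetical stand-in for the SWIM membership table.
struct Membership {
    nodes: Mutex<HashMap<u64, NodeState>>,
}

impl Membership {
    /// Counts nodes per state. It takes the lock itself, so a caller that
    /// still holds a guard on `nodes` would block forever here.
    fn update_node_gauges(&self) -> (usize, usize, usize) {
        let nodes = self.nodes.lock().unwrap();
        let count = |s: NodeState| nodes.values().filter(|v| **v == s).count();
        (
            count(NodeState::Alive),
            count(NodeState::Suspect),
            count(NodeState::Failed),
        )
    }

    /// Marks a node as suspect, then refreshes the gauges.
    fn suspect_node(&self, id: u64) -> Option<(NodeState, (usize, usize, usize))> {
        // Scoped block: only a copy of the new state escapes, and the guard
        // is dropped at the closing brace.
        let updated = {
            let mut nodes = self.nodes.lock().unwrap();
            let state = nodes.get_mut(&id)?;
            *state = NodeState::Suspect;
            *state
        };
        // Safe to re-lock: no guard from this function is alive any more.
        Some((updated, self.update_node_gauges()))
    }
}
```

Calling `update_node_gauges()` inside the scoped block instead would reproduce the hang, because the mutable guard would still be held when the second lock is requested.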

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
jordan 2026-02-20 20:27:32 -07:00
parent ad07a75d0a
commit 02ecac9a07
37 changed files with 1802 additions and 759 deletions

.config/nextest.toml

@ -0,0 +1,22 @@
# Nextest configuration for StemeDB workspace.
#
# References:
# https://nextest.rs/configuration/overview.html
# https://nextest.rs/configuration/test-groups.html
# Tests that spawn background tokio tasks and wait for them to make progress
# (e.g. IngestWorker cursor polling) need exclusive CPU access under parallel
# test load. Without this, the background tasks are starved and time out.
[test-groups.background-task-tests]
max-threads = 1
[profile.default]
# Give long-running simulation tests enough time before marking them as slow.
slow-timeout = { period = "60s" }
[[profile.default.overrides]]
# smoke_high_volume_simulation spawns a background IngestWorker and polls its
# cursor. Under full parallel load (2000+ concurrent tests) the tokio scheduler
# starves the background task, causing spurious timeout failures.
filter = 'test(smoke_high_volume_simulation)'
test-group = "background-task-tests"
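
The starvation described in the comments above comes from a test waiting on a background task that the scheduler may not run in time. The commit message describes replacing that pattern in the simulation with a direct `process_pending()` call. A minimal sketch of why the synchronous form is deterministic follows; the `Ingestor` struct, its fields, and `submit` are hypothetical stand-ins rather than the real ingest API.

```rust
// Hypothetical stand-in for the ingestor: records wait in `pending`
// until they are moved to `indexed`.
struct Ingestor {
    pending: Vec<String>,
    indexed: Vec<String>,
}

impl Ingestor {
    fn new() -> Self {
        Ingestor {
            pending: Vec::new(),
            indexed: Vec::new(),
        }
    }

    /// Queues a record, as a WAL append would.
    fn submit(&mut self, record: &str) {
        self.pending.push(record.to_string());
    }

    /// Drains everything queued so far on the caller's thread and returns
    /// how many records were indexed. No background task, no polling.
    fn process_pending(&mut self) -> usize {
        let drained = self.pending.len();
        self.indexed.append(&mut self.pending);
        drained
    }
}
```

Because the work happens on the calling thread, the test observes the indexed state as soon as `process_pending()` returns, regardless of how many other tests are competing for CPU.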


@ -12,7 +12,7 @@ It serves as the "Git for Truth," allowing agents to:
## Tech Stack
* **Language:** Rust (2024 edition)
* **Durability:** `stemedb-wal` (Quarantine Pattern with `fs2`, `blake3` checksums)
* **Storage:** `stemedb-storage` (`sled` embedded KV, abstracted via `KVStore` trait)
* **Storage:** `stemedb-storage` (Hybrid Store: `fjall` LSM-tree for writes, `redb` B-tree for reads)
* **Serialization:** `rkyv` (Zero-copy deserialization for high performance)
* **Ingestion:** `stemedb-ingest` (Async background worker bridging WAL and Store)
* **Simulation:** `stemedb-sim` (Agent-based modeling to verify system behavior)
@ -25,12 +25,12 @@ The system follows a "Spine -> Lattice -> Cortex" architecture:
* **Ingestor:** Background task that tails the WAL and indexes data.
* **KV Store:** Persistent storage for assertions and indexes.
2. **The Lattice (Connectivity) - *In Progress*:**
2. **The Lattice (Connectivity) - *Implemented*:**
* **Ballot Box:** High-velocity vote stream.
* **Materialized Views:** Pre-computed truth states.
3. **The Cortex (Reasoning) - *Planned*:**
* **Lenses:** WASM-based filters for truth resolution.
3. **The Cortex (Reasoning) - *Implemented*:**
* **Lenses:** WASM-based filters for truth resolution (Consensus, Authority, Recency, etc.).
* **SMT:** Sparse Merkle Trees for efficient branching.
## Key Files & Directories
@ -38,8 +38,9 @@ The system follows a "Spine -> Lattice -> Cortex" architecture:
* `crates/`
* `stemedb-core/`: Core data structures (`Assertion`, `Vote`, `Epoch`) and types.
* `stemedb-wal/`: Durability primitives (`Journal`, `FsyncGuard`, `Record`).
* `stemedb-storage/`: Storage engine abstraction and `sled` implementation.
* `stemedb-storage/`: Storage engine abstraction and Hybrid Store implementation.
* `stemedb-ingest/`: Async ingestion pipeline logic.
* `stemedb-lens/`: Truth Lenses (`Recency`, `Consensus`, `Authority`, `Skeptic`).
* `stemedb-sim/`: "The Arena" simulation for end-to-end verification.
* `architecture.md`: Detailed system design and data flow.
* `roadmap.md`: Phased implementation plan and status.


@ -23,8 +23,8 @@ fn test_micro_team_sees_patterns() {
// - Scale tier: Micro (1-5 projects)
// - Emerging min_projects: max(2, 0.50*3) = max(2, 1.5) = 2
// - Adoption rate: 2/3 = 67% >= 50%
// Should require review (emerging tier)
assert_eq!(decision, PromotionDecision::RequireReview);
// Micro emerging auto-promotes for immediate visibility
assert_eq!(decision, PromotionDecision::AutoPromote(SourceClass::Community));
}
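
The threshold arithmetic in the comments above (`max(2, 0.50*3) = 2`, adoption `2/3 = 67%`) can be checked with a small sketch. The function names and the round-up step are assumptions made for illustration; the real promotion logic may round differently.

```rust
/// Minimum number of adopting projects for the emerging tier:
/// max(2, rate * total), with the scaled value rounded up (assumed).
fn emerging_min_projects(total_projects: u32, rate: f64) -> u32 {
    let scaled = (rate * f64::from(total_projects)).ceil() as u32;
    scaled.max(2)
}

/// Fraction of projects that have adopted the pattern.
fn adoption_rate(adopting: u32, total_projects: u32) -> f64 {
    f64::from(adopting) / f64::from(total_projects)
}

/// True when both the absolute floor and the adoption rate are met.
fn meets_emerging(adopting: u32, total_projects: u32, rate: f64) -> bool {
    adopting >= emerging_min_projects(total_projects, rate)
        && adoption_rate(adopting, total_projects) >= rate
}
```

For the micro-team case in the test, `meets_emerging(2, 3, 0.5)` holds: the floor is 2 projects and the adoption rate is about 0.67.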
#[test]
@ -40,8 +40,8 @@ fn test_micro_team_regulatory_disabled() {
);
// Regulatory tier is disabled for micro teams
// Should fall through to emerging tier
assert_eq!(decision, PromotionDecision::RequireReview);
// Falls through to emerging tier, which auto-promotes for immediate visibility
assert_eq!(decision, PromotionDecision::AutoPromote(SourceClass::Community));
}
#[test]


@ -3,6 +3,7 @@ use tracing::instrument;
use crate::llm::{chunk_text, create_client, deduplicate_claims, ChunkConfig, LlmConfig};
use crate::types::{Claim, ClaimCheck, ClaimStatus};
use crate::stemedb::Client as StemeClient;
use super::settings::SettingsState;
@ -90,28 +91,60 @@ fn map_llm_error_to_user_message(e: &crate::llm::LlmError) -> String {
/// Check claims against the knowledge graph.
#[tauri::command]
pub async fn check_claims(claims: Vec<Claim>) -> Result<Vec<ClaimCheck>, String> {
pub async fn check_claims(
state: State<'_, SettingsState>,
claims: Vec<Claim>,
) -> Result<Vec<ClaimCheck>, String> {
tracing::info!(count = claims.len(), "Checking claims");
// TODO: Week 3 - Check against Episteme
Ok(claims
.into_iter()
.map(|claim| ClaimCheck { claim, status: ClaimStatus::New, related: vec![] })
.collect())
let settings = state.0.lock().map_err(|e| format!("Failed to read settings: {}", e))?.clone();
let client = StemeClient::new(settings.stemedb_url);
let mut checks = Vec::new();
for claim in claims {
match client.check_claim(&claim).await {
Ok(check) => checks.push(check),
Err(e) => {
tracing::warn!("Failed to check claim: {}", e);
// Return as new if check fails
checks.push(ClaimCheck {
claim,
status: ClaimStatus::New,
related: vec![],
});
}
}
}
Ok(checks)
}
/// Save claims to the knowledge graph.
#[tauri::command]
pub async fn save_claims(claims: Vec<Claim>) -> Result<usize, String> {
pub async fn save_claims(
state: State<'_, SettingsState>,
claims: Vec<Claim>,
) -> Result<usize, String> {
let count = claims.len();
tracing::info!(count, "Saving claims");
// TODO: Week 3 - Save to Episteme
Ok(count)
let settings = state.0.lock().map_err(|e| format!("Failed to read settings: {}", e))?.clone();
let client = StemeClient::new(settings.stemedb_url);
let mut saved = 0;
for claim in claims {
match client.save_claim(&claim).await {
Ok(_) => saved += 1,
Err(e) => tracing::warn!("Failed to save claim: {}", e),
}
}
Ok(saved)
}
/// Get the current claim count.
#[tauri::command]
pub async fn get_claim_count() -> Result<usize, String> {
// TODO: Week 3 - Query Episteme
// TODO: Implement stats endpoint in StemeDB
Ok(0)
}


@ -3,6 +3,7 @@
mod commands;
mod llm;
mod types;
mod stemedb;
use commands::{
check_claims, extract_claims, get_claim_count, get_settings, save_claims, test_llm_connection,


@ -0,0 +1,135 @@
use serde::{Deserialize, Serialize};
use crate::types::{Claim, ClaimCheck, ClaimStatus, RelatedClaim};
#[derive(Debug, Deserialize)]
pub struct QueryResponse {
pub assertions: Vec<AssertionResponse>,
pub conflict_score: Option<f32>,
}
#[derive(Debug, Deserialize)]
pub struct AssertionResponse {
pub subject: String,
pub predicate: String,
pub object: ObjectValue,
pub confidence: f32,
pub source_class: String,
}
#[derive(Debug, Deserialize)]
#[serde(untagged)]
pub enum ObjectValue {
Text(String),
Number(f64),
Boolean(bool),
Link(String),
Image(String),
}
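// `fmt` is a method of `Display`, not `ToString`; implementing `Display`
// also provides `to_string()` through the standard blanket impl.
impl std::fmt::Display for ObjectValue {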
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ObjectValue::Text(s) => write!(f, "{}", s),
ObjectValue::Number(n) => write!(f, "{}", n),
ObjectValue::Boolean(b) => write!(f, "{}", b),
ObjectValue::Link(s) => write!(f, "{}", s),
ObjectValue::Image(s) => write!(f, "[Image: {}]", s),
}
}
}
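
A `fmt` method with this signature belongs to `std::fmt::Display`; `to_string()` then comes from the standard library's blanket `impl<T: Display> ToString for T`. A reduced sketch of that relationship, using a hypothetical two-variant `Value` enum in place of `ObjectValue`:

```rust
use std::fmt;

// Hypothetical, trimmed-down stand-in for `ObjectValue`.
enum Value {
    Text(String),
    Image(String),
}

impl fmt::Display for Value {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Value::Text(s) => write!(f, "{}", s),
            Value::Image(s) => write!(f, "[Image: {}]", s),
        }
    }
}

/// `to_string()` is available without any explicit `ToString` impl.
fn render(value: &Value) -> String {
    value.to_string()
}
```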
pub struct Client {
url: String,
http: reqwest::Client,
}
impl Client {
pub fn new(url: String) -> Self {
Self {
url: url.trim_end_matches('/').to_string(),
http: reqwest::Client::new(),
}
}
pub async fn check_claim(&self, claim: &Claim) -> Result<ClaimCheck, String> {
let url = format!("{}/v1/query", self.url);
// Query using Skeptic lens to see conflicts
let response = self.http.get(&url)
.query(&[
("subject", &claim.subject),
("predicate", &claim.predicate),
("lens", &"Skeptic".to_string()),
])
.send()
.await
.map_err(|e| format!("Request failed: {}", e))?;
if !response.status().is_success() {
return Err(format!("API error: {}", response.status()));
}
let data: QueryResponse = response.json().await
.map_err(|e| format!("Parse error: {}", e))?;
let status = if data.assertions.is_empty() {
ClaimStatus::New
} else if let Some(score) = data.conflict_score {
if score > 0.5 {
ClaimStatus::Contradicts
} else {
ClaimStatus::Matches
}
} else {
ClaimStatus::Matches
};
let related = data.assertions.into_iter().map(|a| {
RelatedClaim {
claim: Claim {
subject: a.subject,
predicate: a.predicate,
object: a.object.to_string(),
confidence: a.confidence,
quote: "".to_string(),
source: Some(a.source_class),
},
relationship: "existing".to_string(),
source: "stemedb".to_string(),
}
}).collect();
Ok(ClaimCheck {
claim: claim.clone(),
status,
related,
})
}
pub async fn save_claim(&self, claim: &Claim) -> Result<(), String> {
let url = format!("{}/v1/assert", self.url);
let body = serde_json::json!({
"subject": claim.subject,
"predicate": claim.predicate,
"object": {
"type": "Text",
"value": claim.object
},
"confidence": claim.confidence,
"source_class": "Anecdotal", // Default for Disputed
});
let response = self.http.post(&url)
.json(&body)
.send()
.await
.map_err(|e| format!("Request failed: {}", e))?;
if !response.status().is_success() {
return Err(format!("API error: {}", response.status()));
}
Ok(())
}
}
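
The status decision inside `check_claim` is a pure function of the number of returned assertions and the optional conflict score, which makes it easy to test in isolation. A sketch under that reading; `classify` is a hypothetical helper, and the local `ClaimStatus` enum only mirrors the three variants used above.

```rust
#[derive(Debug, PartialEq)]
enum ClaimStatus {
    New,
    Matches,
    Contradicts,
}

/// Mirrors the branching in `check_claim`:
/// no assertions -> New; score above 0.5 -> Contradicts; otherwise Matches.
fn classify(assertion_count: usize, conflict_score: Option<f32>) -> ClaimStatus {
    if assertion_count == 0 {
        return ClaimStatus::New;
    }
    match conflict_score {
        Some(score) if score > 0.5 => ClaimStatus::Contradicts,
        _ => ClaimStatus::Matches,
    }
}
```

Note that the threshold is strict: a score of exactly 0.5 is treated as a match, and a missing score is also treated as a match.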


@ -48,6 +48,7 @@ pub struct Settings {
pub api_key: Option<String>,
pub auto_save: bool,
pub notifications_enabled: bool,
pub stemedb_url: String,
}
impl Default for Settings {
@ -57,6 +58,7 @@ impl Default for Settings {
api_key: None,
auto_save: false,
notifications_enabled: true,
stemedb_url: "http://localhost:18180".to_string(),
}
}
}
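
The default `stemedb_url` is later combined with endpoint paths such as `/v1/query`, after the client trims any trailing slash. A small sketch of that joining step, assuming a hypothetical `endpoint` helper that also tolerates a missing leading slash on the path:

```rust
/// Joins a base URL and a path with exactly one slash between them.
fn endpoint(base: &str, path: &str) -> String {
    format!(
        "{}/{}",
        base.trim_end_matches('/'),
        path.trim_start_matches('/')
    )
}
```

With this shape, a user-supplied setting of `http://localhost:18180/` and the default `http://localhost:18180` resolve to the same request URL.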


@ -1,7 +1,7 @@
# Episteme (StemeDB) Architecture
> **Design Philosophy:** Immutable History, Probabilistic Resolution, Materialized Speed.
> **Status:** Draft Spec v1.1
> **Status:** Implementation v1.0
## 1. System Overview
@ -82,22 +82,26 @@ struct TrustPack {
}
```
### 2.4. The Storage Layout (LSM Tree)
### 2.4. The Storage Layout (Hybrid Store)
| Key | Value | Purpose |
| :--- | :--- | :--- |
| `H:{Hash}` | `Assertion` | Immutable Content Store |
| `V:{Hash}` | `List<Vote>` | The Ballot Box (Append-only) |
| `MV:{Subject}:{Predicate}` | `Assertion` | **Materialized View** (The "Winner") |
| `TP:{PackID}` | `TrustPack` | Curation Lists |
| `S:{Subject}` | `List<Hash>` | Adjacency Index |
Episteme uses a **Hybrid Storage** architecture to balance write throughput and read latency:
* **Fjall (LSM-Tree):** Used for write-heavy, append-only data (Assertions, Votes, WAL).
* **Redb (B-Tree):** Used for read-heavy, random-access data (Indexes, Materialized Views).
| Key | Value | Purpose | Backend |
| :--- | :--- | :--- | :--- |
| `H:{Hash}` | `Assertion` | Immutable Content Store | Fjall |
| `V:{Hash}` | `List<Vote>` | The Ballot Box (Append-only) | Fjall |
| `MV:{Subject}:{Predicate}` | `Assertion` | **Materialized View** (The "Winner") | Redb |
| `TP:{PackID}` | `TrustPack` | Curation Lists | Redb |
| `S:{Subject}` | `List<Hash>` | Adjacency Index | Redb |
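
As an illustration of the table above, the key prefix alone is enough to decide which backend holds a record. The sketch below is hypothetical: `Backend`, `backend_for`, and the key-building helpers are illustrative names, and the actual routing in `stemedb-storage` may be structured differently.

```rust
#[derive(Debug, PartialEq)]
enum Backend {
    Fjall, // LSM-tree: append-heavy data
    Redb,  // B-tree: read-heavy indexes and views
}

/// Builds the content-store key `H:{Hash}`.
fn content_key(hash_hex: &str) -> String {
    format!("H:{}", hash_hex)
}

/// Builds the materialized-view key `MV:{Subject}:{Predicate}`.
fn materialized_view_key(subject: &str, predicate: &str) -> String {
    format!("MV:{}:{}", subject, predicate)
}

/// Routes a key to a backend by its prefix, following the table.
fn backend_for(key: &str) -> Option<Backend> {
    let prefix = key.split(':').next()?;
    match prefix {
        "H" | "V" => Some(Backend::Fjall),
        "MV" | "TP" | "S" => Some(Backend::Redb),
        _ => None,
    }
}
```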
---
## 3. The Write Path (The Ballot Box)
1. **Ingest:** Agents submit `Assertions` or `Votes`.
2. **Journal:** Written to `episteme-wal`.
2. **Journal:** Written to `episteme-wal` (Quarantine Pattern).
3. **Ballot Box:** Votes are appended to the `V:{Hash}` stream.
4. **Compactor (Async):** A background worker aggregates Votes + TrustRank to update the `MV` key.
@ -119,12 +123,12 @@ struct TrustPack {
4. Sum weights of remaining votes.
* Cost: **O(1)** (if Materialized per Pack) or **O(M)** (Fast calculation).
### Standard Lenses
* **Consensus:** Highest cluster density.
* **Authority:** Filter by **Trust Pack**.
* **Recency:** Last Writer Wins.
### Standard Lenses (Implemented)
* **Consensus:** Highest cluster density (Vote-aware).
* **Authority:** Filter by **Trust Pack** and **TrustRank**.
* **Recency:** Last Writer Wins (Hybrid Logical Clock).
* **EpochAware:** Validates against current paradigm.
* **Constraints:** (New) Returns all `must_use`/`forbidden` assertions for a context. Acts as a "Pre-Flight Check."
* **Skeptic:** Surfaces conflicts and divergence.
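
To make the **Recency** rule concrete: with a Hybrid Logical Clock, "last writer wins" means picking the candidate with the greatest (wall time, logical counter) pair. The sketch below assumes a two-field clock layout and hypothetical `Hlc`, `Candidate`, and `resolve_recency` names; the real lens in `stemedb-lens` may carry more fields or break ties differently.

```rust
// Assumed HLC layout: physical milliseconds, then a logical counter that
// orders events sharing the same millisecond. Derived `Ord` compares the
// fields in declaration order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Hlc {
    wall_ms: u64,
    counter: u32,
}

// Hypothetical candidate assertion carrying only what the lens needs.
struct Candidate {
    object: String,
    timestamp: Hlc,
}

/// Last writer wins: returns the candidate with the greatest HLC timestamp,
/// or `None` when there are no candidates.
fn resolve_recency(candidates: &[Candidate]) -> Option<&Candidate> {
    candidates.iter().max_by_key(|c| c.timestamp)
}
```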
---
@ -148,18 +152,19 @@ The system continuously exports data to train the next generation of Agents.
## 7. Implementation Roadmap
### Phase 1: The Spine (Foundation)
* [ ] Reuse `quarantine-journal` pattern for WAL.
* [ ] Implement `Assertion`, `Epoch`, and **`Vote`** structs.
* [ ] Basic `sled` storage backend.
* [x] Reuse `quarantine-journal` pattern for WAL (`stemedb-wal`).
* [x] Implement `Assertion`, `Epoch`, and **`Vote`** structs (`stemedb-core`).
* [x] Hybrid Storage backend (`stemedb-storage`).
### Phase 2: The Lattice (Connectivity)
* [ ] **The Ballot Box**: Implement separate Vote storage stream.
* [ ] **Materializer**: Implement background worker to maintain `MV` keys.
* [ ] **Trust Packs**: Implement BitSet/BloomFilter logic for agent sets.
* [x] **The Ballot Box**: Separate Vote storage stream.
* [x] **Materializer**: Background worker to maintain `MV` keys.
* [x] **Trust Packs**: Agent sets for filtering.
* [ ] **The Meter**: Implement Budget/TAN middleware in Job Manager.
* [ ] **Agent Wallet**: Sidecar for key management/signing.
### Phase 3: The Cortex (Reasoning)
* [x] **Lenses**: `Recency`, `Consensus`, `Authority`, `Skeptic` implemented (`stemedb-lens`).
* [ ] SMT Backend & Branching.
* [ ] Vector Search.
* [ ] **Lens: Constraints**: Implement the pre-flight check logic.


@ -100,7 +100,7 @@ pub enum SupersessionTypeDto {
}
/// Lens strategy for conflict resolution.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, ToSchema)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, ToSchema, PartialEq)]
#[serde(rename_all = "PascalCase")]
pub enum LensDto {
/// Latest timestamp wins
@ -140,6 +140,10 @@ pub enum LensDto {
/// Use for agent pre-flight checks: "What MUST I use? What's FORBIDDEN?"
/// Predicate patterns: `must_use:*`, `forbidden:*`, `prefer:*`
Constraints,
/// Surfaces all claims with conflict score calculation.
/// Use for "Trust but Verify" dashboards and Cognitive Firewall overlays.
Skeptic,
}
/// Agent signature entry.


@ -63,8 +63,8 @@ pub async fn scan(
file_source: aphoria::FileSource::All,
benchmark: false,
show_claims: false,
strict: false,
show_observations: false,
strict: false,
};
// Execute scan


@ -9,6 +9,7 @@ use crate::{
hex,
state::AppState,
};
use stemedb_storage::MemTableEntry;
use stemedb_core::limits::MAX_NARRATIVE_LEN;
use stemedb_core::types::{
@ -68,7 +69,12 @@ pub async fn create_assertion(
let hash = blake3::hash(&serialized_assertion);
// Append to WAL via group commit buffer
state.commit_buffer.append(payload).await?;
let wal_offset = state.commit_buffer.append(payload).await?;
// Insert into MemTable for immediate visibility (read-your-writes)
// This must happen AFTER WAL commit to maintain durability guarantees
let entry = MemTableEntry::new(assertion, *hash.as_bytes(), wal_offset);
state.memtable.insert(entry);
metrics::counter!("stemedb_assertions_ingested_total").increment(1);
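
The ordering in this hunk (WAL append first, MemTable insert second) is what makes a write visible to the next query without waiting for background indexing. A self-contained sketch of that flow, assuming a hypothetical `Node` type with plain in-memory collections standing in for the WAL, the MemTable, and the KV store:

```rust
use std::collections::HashMap;

// Hypothetical model: `store` is only filled by background indexing,
// which this sketch never runs.
#[derive(Default)]
struct Node {
    wal: Vec<String>,
    memtable: HashMap<String, Vec<String>>,
    store: HashMap<String, Vec<String>>,
}

impl Node {
    /// Appends to the WAL, then inserts into the memtable, and returns
    /// the WAL offset of the new record.
    fn create_assertion(&mut self, subject: &str, value: &str) -> usize {
        // 1. Durability first: the record is in the log before it is visible.
        self.wal.push(format!("{}={}", subject, value));
        let wal_offset = self.wal.len() - 1;
        // 2. Visibility second: readers can now see the write.
        self.memtable
            .entry(subject.to_string())
            .or_default()
            .push(value.to_string());
        wal_offset
    }

    /// Merges indexed results with not-yet-indexed memtable entries.
    fn query(&self, subject: &str) -> Vec<String> {
        let mut results = self.store.get(subject).cloned().unwrap_or_default();
        if let Some(recent) = self.memtable.get(subject) {
            for value in recent {
                if !results.contains(value) {
                    results.push(value.clone());
                }
            }
        }
        results
    }
}
```

Reversing the two steps would let a reader observe an assertion that is lost if the process crashes before the WAL append completes.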


@ -31,8 +31,8 @@ struct CandidateMetadata {
lifecycle: LifecycleStage,
}
use stemedb_lens::{
AsyncLens, ConfidenceLens, ConsensusLens, EpochAwareLens, Lens, RecencyLens,
TrustAwareAuthorityLens, VoteAwareConsensusLens,
AnalysisLens, AsyncLens, ConfidenceLens, ConsensusLens, EpochAwareLens, Lens, RecencyLens,
SkepticLens, TrustAwareAuthorityLens, VoteAwareConsensusLens,
};
use stemedb_query::Query;
use stemedb_storage::{AuditStore, GenericAuditStore, GenericTrustRankStore, GenericVoteStore};
@ -428,6 +428,15 @@ async fn apply_lens_with_confidence(
let lens = EpochAwareLens::with_recency(store);
lens.resolve_async(&assertions).await
}
LensDto::Skeptic => {
// SkepticLens returns all assertions with a conflict score, not a single winner.
// Used for "Trust but Verify" dashboards and Cognitive Firewall overlays.
let vote_store = std::sync::Arc::new(GenericVoteStore::new(store.clone()));
let trust_store = std::sync::Arc::new(GenericTrustRankStore::new(store));
let lens = SkepticLens::new(vote_store, trust_store);
let analysis = lens.analyze(&assertions).await;
return Ok((assertions, 1.0, analysis.conflict_score));
}
LensDto::LayeredConsensus => {
// LayeredConsensus returns a different response type with per-tier results.
// Use the dedicated /v1/layered endpoint for this lens.


@ -1,21 +1,4 @@
//! Episteme (StemeDB) API server binary.
//!
//! This starts the HTTP API server with the following components:
//! 1. Opens Journal (WAL) for writes (via GroupCommitBuffer) and reads
//! 2. Opens HybridStore (KV storage)
//! 3. Spawns IngestWorker background task to tail WAL
//! 4. Starts axum HTTP server with OpenAPI documentation
//! 5. Optionally enables The Meter (economic throttling)
//!
//! # Environment Variables
//!
//! | Variable | Default | Description |
//! |----------|---------|-------------|
//! | `STEMEDB_WAL_DIR` | `data/wal` | Directory for WAL files |
//! | `STEMEDB_DB_DIR` | `data/db` | Directory for KV store |
//! | `STEMEDB_BIND_ADDR` | `127.0.0.1:18180` | HTTP server bind address |
//! | `STEMEDB_METER_ENABLED` | `true` | Enable economic throttling |
//! | `STEMEDB_CORPUS_DB_DIR` | (none) | Optional: Directory for Aphoria corpus DB |
use std::net::SocketAddr;
use std::path::PathBuf;
@ -38,19 +21,10 @@ use std::path::Path;
/// Server configuration.
#[derive(Debug, Clone)]
struct Config {
/// Directory for WAL files
wal_dir: PathBuf,
/// Directory for KV store
db_dir: PathBuf,
/// HTTP server bind address
bind_addr: String,
/// Enable economic throttling (The Meter)
meter_enabled: bool,
/// Optional corpus database directory (for Aphoria corpus)
corpus_db_dir: Option<PathBuf>,
/// TLS certificate path (optional - enables HTTPS)
@ -66,6 +40,9 @@ struct Config {
read_body_limit: usize,
/// HTTP request timeout in seconds (default: 30)
http_timeout_secs: u64,
/// Skip Ed25519 signature verification (unsafe, for dev/testing only)
unsafe_skip_signatures: bool,
}
impl Default for Config {
@ -82,6 +59,7 @@ impl Default for Config {
write_body_limit: 1024 * 1024, // 1MB
read_body_limit: 64 * 1024, // 64KB
http_timeout_secs: 30,
unsafe_skip_signatures: false,
}
}
}
@ -98,26 +76,20 @@ impl Config {
}
impl Config {
/// Load configuration from environment variables.
fn from_env() -> Self {
let mut config = Self::default();
if let Ok(wal_dir) = std::env::var("STEMEDB_WAL_DIR") {
config.wal_dir = PathBuf::from(wal_dir);
}
if let Ok(db_dir) = std::env::var("STEMEDB_DB_DIR") {
config.db_dir = PathBuf::from(db_dir);
}
if let Ok(bind_addr) = std::env::var("STEMEDB_BIND_ADDR") {
config.bind_addr = bind_addr;
}
if let Ok(meter_enabled) = std::env::var("STEMEDB_METER_ENABLED") {
config.meter_enabled = meter_enabled.to_lowercase() != "false" && meter_enabled != "0";
}
if let Ok(corpus_db_dir) = std::env::var("STEMEDB_CORPUS_DB_DIR") {
config.corpus_db_dir = Some(PathBuf::from(corpus_db_dir));
}
@ -149,6 +121,10 @@ impl Config {
}
}
if let Ok(val) = std::env::var("STEMEDB_UNSAFE_SKIP_SIGNATURES") {
config.unsafe_skip_signatures = val.to_lowercase() == "true" || val == "1";
}
config
}
}
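
The two boolean environment variables in this file are parsed with opposite defaults: `STEMEDB_METER_ENABLED` stays on unless explicitly disabled, while `STEMEDB_UNSAFE_SKIP_SIGNATURES` stays off unless explicitly enabled. The sketch below isolates both rules as pure functions; `parse_opt_out` and `parse_opt_in` are hypothetical helper names, with bodies copied from the expressions above.

```rust
/// Opt-out flag: anything except "false" (any case) or "0" enables it.
fn parse_opt_out(val: &str) -> bool {
    val.to_lowercase() != "false" && val != "0"
}

/// Opt-in flag: only "true" (any case) or "1" enables it.
fn parse_opt_in(val: &str) -> bool {
    val.to_lowercase() == "true" || val == "1"
}
```

The opt-in form means a typo such as `yes` leaves signature verification on, which is the safer failure mode for an unsafe switch.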
@ -169,72 +145,42 @@ async fn load_tls_config(
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize tracing
let env_filter = match tracing_subscriber::EnvFilter::try_from_default_env() {
Ok(filter) => filter,
Err(_) => "stemedb_api=debug,tower_http=debug".into(),
};
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "stemedb_api=debug,tower_http=debug".into());
tracing_subscriber::registry().with(env_filter).with(tracing_subscriber::fmt::layer()).init();
// Initialize Prometheus metrics recorder (must be done before any metrics are recorded)
let prometheus_handle = PrometheusBuilder::new()
.install_recorder()
.map_err(|e| format!("Failed to install Prometheus recorder: {e}"))?;
let prometheus_handle = Arc::new(prometheus_handle);
info!("Prometheus metrics recorder initialized");
let prometheus_handle = Arc::new(PrometheusBuilder::new().install_recorder()?);
let config = Config::from_env();
info!("Starting Episteme (StemeDB) API server");
info!(?config, "Configuration loaded");
// Ensure directories exist
std::fs::create_dir_all(&config.wal_dir)?;
std::fs::create_dir_all(&config.db_dir)?;
// Open write Journal (owned by GroupCommitBuffer)
info!("Opening write Journal at {:?}", config.wal_dir);
let write_journal = Journal::open(&config.wal_dir)?;
// Open read Journal (for IngestWorker to tail)
info!("Opening read Journal at {:?}", config.wal_dir);
let read_journal = Journal::open(&config.wal_dir)?;
info!("Opening HybridStore at {:?}", config.db_dir);
let store = Arc::new(HybridStore::open(&config.db_dir)?);
let corpus_store = config.corpus_db_dir.as_ref().map(|d| {
let _ = std::fs::create_dir_all(d);
Arc::new(HybridStore::open(d).unwrap())
});
// Open optional corpus store (for Aphoria corpus)
let corpus_store = if let Some(ref corpus_dir) = config.corpus_db_dir {
// Ensure corpus directory exists
std::fs::create_dir_all(corpus_dir)?;
info!("Opening corpus HybridStore at {:?}", corpus_dir);
Some(Arc::new(HybridStore::open(corpus_dir)?))
} else {
info!("No separate corpus DB configured, using main store for corpus queries");
None
};
// Create application state (initializes GroupCommitBuffer)
let state = AppState::new(write_journal, read_journal, Arc::clone(&store), corpus_store);
// Spawn IngestWorker background task (uses read journal)
info!("Spawning IngestWorker background task");
let worker_journal = state.journal.clone();
let worker_store = store;
let worker_flush_notify = Arc::clone(&state.flush_notify);
let skip_sigs = config.unsafe_skip_signatures;
let worker_memtable = Arc::clone(&state.memtable);
tokio::spawn(async move {
let worker_result = IngestWorker::new(worker_journal, worker_store).await;
match worker_result {
match IngestWorker::new(worker_journal, worker_store).await {
Ok(worker) => {
// Wire up flush notification so IngestWorker reacts immediately to new data
let mut worker = worker.with_flush_notify(worker_flush_notify);
info!("IngestWorker started with flush notification, entering run loop");
let mut worker = worker
.with_flush_notify(worker_flush_notify)
.with_memtable(worker_memtable)
.with_skip_signature_verification(skip_sigs);
info!(skip_signatures = skip_sigs, "IngestWorker started");
worker.run().await;
}
Err(e) => {
error!("Failed to create IngestWorker: {:?}", e);
}
Err(e) => error!("Failed to create IngestWorker: {:?}", e),
}
});


@ -7,7 +7,7 @@ use stemedb_query::QueryEngine;
use stemedb_storage::{
CircuitBreakerConfig, GenericAdmissionStore, GenericAliasStore, GenericApiKeyStore,
GenericCircuitBreakerStore, GenericEscalationStore, GenericQuarantineStore, GenericQuotaStore,
GenericTrustRankStore, HybridStore,
GenericTrustRankStore, HybridStore, MemTable,
};
use stemedb_wal::group_commit::{GroupCommitBuffer, GroupCommitConfig};
use stemedb_wal::Journal;
@ -81,6 +81,10 @@ pub struct AppState {
/// API key store for authentication (P4.2)
pub api_key_store: Arc<ApiKeyStoreImpl>,
/// MemTable for read-your-writes consistency.
/// Assertions are inserted here after WAL commit, before KVStore indexing.
pub memtable: Arc<MemTable>,
/// Notification channel for signaling IngestWorker when new data is flushed.
///
/// When GroupCommitBuffer successfully flushes a batch, it signals this
@ -149,6 +153,9 @@ impl AppState {
// Create API key store for authentication (P4.2)
let api_key_store = Arc::new(GenericApiKeyStore::new(Arc::clone(&store)));
// Create MemTable for read-your-writes consistency
let memtable = Arc::new(MemTable::new(10_000));
Self {
commit_buffer,
journal,
@ -162,6 +169,7 @@ impl AppState {
quarantine_store,
circuit_breaker_store,
api_key_store,
memtable,
flush_notify,
#[cfg(feature = "aphoria")]
scan_cache: ScanCache::new(),
@ -171,7 +179,8 @@ impl AppState {
/// Get a QueryEngine for this state.
///
/// Creates a new QueryEngine each time since it cannot be cloned.
/// Attaches the MemTable for read-your-writes consistency.
pub fn query_engine(&self) -> QueryEngine<HybridStore> {
QueryEngine::new(self.store.clone())
QueryEngine::new(self.store.clone()).with_memtable(Arc::clone(&self.memtable))
}
}


@ -207,7 +207,8 @@ async fn test_quota_consumption_with_meter() {
let app = create_router_with_meter(state);
let (agent_id, signature) = common::sign_message("test");
// v1 signature: sign "subject:predicate"
let (agent_id, signature) = common::sign_message("QuotaTest:test");
let agent_id_hex = hex::encode(agent_id);
// Set a low quota limit for testing

View File

@ -0,0 +1,236 @@
//! HTTP integration tests for read-your-writes consistency.
//!
//! These tests verify that after POST /assert returns 201, an immediate
//! GET /query returns the assertion without waiting for background indexing.
//!
//! The MemTable provides this consistency by storing assertions after WAL
//! commit, and merging them with KVStore results during query.
#![allow(clippy::expect_used)]
mod common;
use axum::{
body::Body,
http::{Request, StatusCode},
};
use tower::ServiceExt;
use stemedb_api::create_router;
// ============================================================================
// Read-Your-Writes Consistency Tests
// ============================================================================
#[tokio::test]
async fn test_read_your_writes_immediate() {
let env = common::create_test_env().await;
let app = create_router(env.state);
// Create a unique subject for this test
let subject = format!(
"TestSubject_{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("time")
.as_nanos()
);
let predicate = "test_predicate";
// 1. POST /assert with unique subject
let assertion_json = common::create_signed_assertion_json(&subject, predicate, 42.0);
let request = Request::builder()
.uri("/v1/assert")
.method("POST")
.header("Content-Type", "application/json")
.body(Body::from(assertion_json.to_string()))
.expect("Request");
let response = app.clone().oneshot(request).await.expect("Request");
assert_eq!(response.status(), StatusCode::CREATED, "POST /assert should return 201");
// Parse the response to get the hash
let body = axum::body::to_bytes(response.into_body(), usize::MAX).await.expect("Body");
let create_response: serde_json::Value = serde_json::from_slice(&body).expect("JSON");
let created_hash = create_response["hash"].as_str().expect("hash field");
// 2. IMMEDIATELY query for the same subject (no sleep!)
let query_uri = format!("/v1/query?subject={}", subject);
let request =
Request::builder().uri(&query_uri).method("GET").body(Body::empty()).expect("Request");
let response = app.clone().oneshot(request).await.expect("Request");
assert_eq!(response.status(), StatusCode::OK, "GET /query should return 200");
let body = axum::body::to_bytes(response.into_body(), usize::MAX).await.expect("Body");
let query_result: serde_json::Value = serde_json::from_slice(&body).expect("JSON");
// 3. Assert the assertion is returned immediately
let assertions = query_result["assertions"].as_array().expect("assertions array");
assert!(!assertions.is_empty(), "Query should return the just-created assertion immediately");
// Verify we got the correct assertion
let found = assertions.iter().any(|a| a["subject"].as_str() == Some(subject.as_str()));
assert!(found, "The queried assertion should match the created subject");
// Note: resolved_hash is only set for MV hits; for MemTable lookups it may not be set
// So we just verify we found the assertion by subject
let _ = created_hash; // Suppress unused variable warning
}
#[tokio::test]
async fn test_read_your_writes_with_predicate() {
let env = common::create_test_env().await;
let app = create_router(env.state);
// Create unique identifiers
let subject = format!(
"Company_{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("time")
.as_nanos()
);
let predicate = "revenue";
// 1. POST /assert
let assertion_json = common::create_signed_assertion_json(&subject, predicate, 1_000_000.0);
let request = Request::builder()
.uri("/v1/assert")
.method("POST")
.header("Content-Type", "application/json")
.body(Body::from(assertion_json.to_string()))
.expect("Request");
let response = app.clone().oneshot(request).await.expect("Request");
assert_eq!(response.status(), StatusCode::CREATED);
// 2. IMMEDIATELY query with both subject and predicate
let query_uri = format!("/v1/query?subject={}&predicate={}", subject, predicate);
let request =
Request::builder().uri(&query_uri).method("GET").body(Body::empty()).expect("Request");
let response = app.clone().oneshot(request).await.expect("Request");
assert_eq!(response.status(), StatusCode::OK);
let body = axum::body::to_bytes(response.into_body(), usize::MAX).await.expect("Body");
let query_result: serde_json::Value = serde_json::from_slice(&body).expect("JSON");
// 3. Verify assertion is found
let assertions = query_result["assertions"].as_array().expect("assertions array");
assert!(!assertions.is_empty(), "Query should find the assertion immediately");
let found = assertions.iter().any(|a| {
a["subject"].as_str() == Some(subject.as_str())
&& a["predicate"].as_str() == Some(predicate)
});
assert!(found, "The assertion should match subject and predicate");
}
#[tokio::test]
async fn test_read_your_writes_multiple_assertions() {
let env = common::create_test_env().await;
let app = create_router(env.state);
// Create unique subject
let subject = format!(
"MultiAssert_{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("time")
.as_nanos()
);
// Create multiple assertions with different predicates
let predicates = vec!["revenue", "profit", "employees"];
let values = vec![100.0, 20.0, 50.0];
for (predicate, value) in predicates.iter().zip(values.iter()) {
let assertion_json = common::create_signed_assertion_json(&subject, predicate, *value);
let request = Request::builder()
.uri("/v1/assert")
.method("POST")
.header("Content-Type", "application/json")
.body(Body::from(assertion_json.to_string()))
.expect("Request");
let response = app.clone().oneshot(request).await.expect("Request");
assert_eq!(response.status(), StatusCode::CREATED);
}
// Query for all assertions with this subject
let query_uri = format!("/v1/query?subject={}", subject);
let request =
Request::builder().uri(&query_uri).method("GET").body(Body::empty()).expect("Request");
let response = app.clone().oneshot(request).await.expect("Request");
assert_eq!(response.status(), StatusCode::OK);
let body = axum::body::to_bytes(response.into_body(), usize::MAX).await.expect("Body");
let query_result: serde_json::Value = serde_json::from_slice(&body).expect("JSON");
let assertions = query_result["assertions"].as_array().expect("assertions array");
assert_eq!(assertions.len(), 3, "Should find all 3 assertions immediately");
// Verify each predicate was found
for predicate in &predicates {
let found = assertions.iter().any(|a| a["predicate"].as_str() == Some(*predicate));
assert!(found, "Should find assertion with predicate: {}", predicate);
}
}
#[tokio::test]
async fn test_read_your_writes_does_not_affect_other_subjects() {
let env = common::create_test_env().await;
let app = create_router(env.state);
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("time")
.as_nanos();
let subject1 = format!("Subject1_{}", timestamp);
let subject2 = format!("Subject2_{}", timestamp);
// Create assertion for subject1
let assertion_json = common::create_signed_assertion_json(&subject1, "test", 1.0);
let request = Request::builder()
.uri("/v1/assert")
.method("POST")
.header("Content-Type", "application/json")
.body(Body::from(assertion_json.to_string()))
.expect("Request");
let response = app.clone().oneshot(request).await.expect("Request");
assert_eq!(response.status(), StatusCode::CREATED);
// Query for subject2 (should be empty)
let query_uri = format!("/v1/query?subject={}", subject2);
let request =
Request::builder().uri(&query_uri).method("GET").body(Body::empty()).expect("Request");
let response = app.clone().oneshot(request).await.expect("Request");
assert_eq!(response.status(), StatusCode::OK);
let body = axum::body::to_bytes(response.into_body(), usize::MAX).await.expect("Body");
let query_result: serde_json::Value = serde_json::from_slice(&body).expect("JSON");
let assertions = query_result["assertions"].as_array().expect("assertions array");
assert!(assertions.is_empty(), "Subject2 should have no assertions");
// Query for subject1 (should have one assertion)
let query_uri = format!("/v1/query?subject={}", subject1);
let request =
Request::builder().uri(&query_uri).method("GET").body(Body::empty()).expect("Request");
let response = app.clone().oneshot(request).await.expect("Request");
let body = axum::body::to_bytes(response.into_body(), usize::MAX).await.expect("Body");
let query_result: serde_json::Value = serde_json::from_slice(&body).expect("JSON");
let assertions = query_result["assertions"].as_array().expect("assertions array");
assert_eq!(assertions.len(), 1, "Subject1 should have one assertion");
}


@ -217,12 +217,15 @@ async fn test_hex_decode_valid() {
let response = app.oneshot(request).await.expect("Request");
- // Accept either 201 (success) or 500 (ingest worker not running in test)
- // We're primarily testing that the hex validation doesn't reject valid lengths
+ // Accept 201 (success), 400 (invalid signature crypto: hex format was valid, but
+ // verify_assertion_signatures rejects non-Ed25519-valid bytes), or 500 (ingest worker
+ // not running in test). We're primarily testing that hex-format validation accepts
+ // correct-length strings without rejecting them at the parsing layer.
assert!(
response.status() == StatusCode::CREATED
+ || response.status() == StatusCode::BAD_REQUEST
|| response.status() == StatusCode::INTERNAL_SERVER_ERROR,
- "Expected 201 or 500, got {}",
+ "Expected 201, 400, or 500, got {}",
response.status()
);
}
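
The relaxed assertion above enumerates three acceptable outcomes. A minimal sketch of the same check written as a predicate, using plain `u16` codes as a stand-in for `StatusCode`; the helper name `is_acceptable_status` is hypothetical and not part of this commit:

```rust
/// Hypothetical helper: true when the status is one the hex-format test tolerates.
/// 201 = created, 400 = well-formed hex that fails signature verification,
/// 500 = ingest worker not running in the test environment.
fn is_acceptable_status(code: u16) -> bool {
    matches!(code, 201 | 400 | 500)
}
```

Any other code, such as a 422 from the parsing layer, would still fail the test, which is the behavior the comment describes.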


@ -12,7 +12,6 @@
#[cfg(test)]
mod tls_tests {
- use super::*;
#[test]
#[ignore = "TLS tests require self-signed certificate generation"]
@ -43,7 +42,6 @@ mod tls_tests {
#[cfg(test)]
mod body_limit_tests {
- use super::*;
#[test]
#[ignore = "Body limit tests require test server"]
@ -80,7 +78,6 @@ mod body_limit_tests {
#[cfg(test)]
mod timeout_tests {
- use super::*;
#[test]
#[ignore = "Timeout tests require mock slow handlers"]
@ -109,7 +106,6 @@ mod timeout_tests {
#[cfg(test)]
mod secret_sanitization_tests {
- use super::*;
#[test]
#[ignore = "Secret sanitization tests require log capture"]
@ -150,7 +146,6 @@ mod secret_sanitization_tests {
#[cfg(test)]
mod rate_limit_tests {
- use super::*;
#[test]
#[ignore = "Rate limit tests require test server"]
@ -195,7 +190,6 @@ mod rate_limit_tests {
#[cfg(test)]
mod integration_tests {
- use super::*;
#[test]
#[ignore = "Integration tests require full server setup"]
@ -224,7 +218,6 @@ mod integration_tests {
// Helper functions for test setup
#[cfg(test)]
mod test_helpers {
- use super::*;
/// Generate self-signed certificate for testing.
#[allow(dead_code)]
@ -243,7 +236,7 @@ mod test_helpers {
/// Capture log output during test.
#[allow(dead_code)]
- fn capture_logs<F>(f: F) -> String
+ fn capture_logs<F>(_f: F) -> String
where
F: FnOnce(),
{


@ -285,6 +285,11 @@ impl SwimMembership {
/// Marks a node as suspected (failed to respond to probe).
#[instrument(skip(self))]
pub fn suspect_node(&self, node_id: NodeId) {
// IMPORTANT: Clone the entry and drop the RefMut BEFORE calling update_node_gauges.
// DashMap::get_mut holds a shard write lock; update_node_gauges calls iter() which
// acquires read locks on all shards. parking_lot write locks are non-reentrant —
// calling iter() while get_mut's RefMut is alive deadlocks on the same shard.
let gossip_entry = {
if let Some(mut entry) = self.members.get_mut(&node_id) {
if entry.state == NodeState::Alive {
entry.state = NodeState::Suspect;
@ -294,17 +299,26 @@ impl SwimMembership {
let _ = self.event_tx.send(MembershipEvent::NodeSuspected(node_id));
self.suspects.insert(node_id, Instant::now());
counter!("stemedb_membership_events_total", "type" => "suspected").increment(1);
self.update_node_gauges();
// Queue for gossip
self.queue_gossip(entry.clone());
Some(entry.clone())
} else {
None
}
} else {
None
}
}; // RefMut dropped here — safe to iterate the map now
if let Some(entry) = gossip_entry {
self.update_node_gauges();
self.queue_gossip(entry);
}
}
/// Marks a node as dead (suspicion timeout expired).
#[instrument(skip(self))]
pub fn fail_node(&self, node_id: NodeId) {
// IMPORTANT: same deadlock hazard as suspect_node — drop RefMut before update_node_gauges.
let gossip_entry = {
if let Some(mut entry) = self.members.get_mut(&node_id) {
if entry.state == NodeState::Suspect {
entry.state = NodeState::Dead;
@ -314,11 +328,18 @@ impl SwimMembership {
let _ = self.event_tx.send(MembershipEvent::NodeFailed(node_id));
self.suspects.remove(&node_id);
counter!("stemedb_membership_events_total", "type" => "failed").increment(1);
self.update_node_gauges();
// Queue for gossip
self.queue_gossip(entry.clone());
Some(entry.clone())
} else {
None
}
} else {
None
}
}; // RefMut dropped here
if let Some(entry) = gossip_entry {
self.update_node_gauges();
self.queue_gossip(entry);
}
}
@ -327,6 +348,8 @@ impl SwimMembership {
pub fn alive_node(&self, node_id: NodeId, info: NodeInfo) {
let lamport = self.tick();
// IMPORTANT: same deadlock hazard — drop RefMut from get_mut before update_node_gauges.
let result = {
match self.members.get_mut(&node_id) {
Some(mut entry) => {
// Only update if incarnation is higher or equal
@ -337,28 +360,35 @@ impl SwimMembership {
entry.lamport_time = lamport;
self.suspects.remove(&node_id);
self.queue_gossip(entry.clone());
if was_suspect {
counter!("stemedb_membership_events_total", "type" => "recovered")
.increment(1);
}
self.update_node_gauges();
let _ = self.event_tx.send(MembershipEvent::NodeUpdated(info));
Some((entry.clone(), MembershipEvent::NodeUpdated(info)))
} else {
None
}
}
None => {
// New node
// New node — insert() releases any lock immediately, so update_node_gauges
// is safe to call right after.
let entry = MembershipEntry::new(info.clone(), NodeState::Alive, lamport);
self.members.insert(node_id, entry.clone());
self.queue_gossip(entry);
counter!("stemedb_membership_events_total", "type" => "joined").increment(1);
self.update_node_gauges();
let _ = self.event_tx.send(MembershipEvent::NodeJoined(info));
return;
}
}
}; // RefMut dropped here
if let Some((entry, event)) = result {
self.update_node_gauges();
self.queue_gossip(entry);
let _ = self.event_tx.send(event);
}
}
/// Selects a random member for probing.
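
The lock-scoping fix in `suspect_node`, `fail_node`, and `alive_node` can be illustrated without DashMap. The sketch below is an assumption-laden stand-in: a single `std::sync::RwLock<HashMap<..>>` plays the role of one DashMap shard, and `Membership`, `Entry`, and `count_suspects` are hypothetical names, not the types in swim.rs. It shows the same shape as the fix: mutate and clone inside a scoped block, let the write guard drop, then call code that needs to read the map.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

#[derive(Clone, Copy, Debug, PartialEq)]
enum NodeState {
    Alive,
    Suspect,
}

#[derive(Clone, Debug, PartialEq)]
struct Entry {
    state: NodeState,
}

/// Stand-in for the membership table: one RwLock plays the role of a shard lock.
struct Membership {
    members: RwLock<HashMap<u32, Entry>>,
}

impl Membership {
    fn new(ids: &[u32]) -> Self {
        let map: HashMap<u32, Entry> =
            ids.iter().map(|id| (*id, Entry { state: NodeState::Alive })).collect();
        Self { members: RwLock::new(map) }
    }

    /// Takes a read lock on the map, like a gauge update that iterates every entry.
    fn count_suspects(&self) -> usize {
        let map = self.members.read().unwrap();
        map.values().filter(|e| e.state == NodeState::Suspect).count()
    }

    /// Marks a node as suspect and returns the cloned entry plus the suspect count.
    fn suspect_node(&self, id: u32) -> Option<(Entry, usize)> {
        let changed = {
            let mut map = self.members.write().unwrap();
            let updated = match map.get_mut(&id) {
                Some(entry) if entry.state == NodeState::Alive => {
                    entry.state = NodeState::Suspect;
                    Some(entry.clone())
                }
                _ => None,
            };
            updated
        }; // write guard dropped here, so reading the map below cannot block on it
        let entry = changed?;
        let suspects = self.count_suspects();
        Some((entry, suspects))
    }
}
```

Calling `count_suspects` inside the scoped block, while the write guard is still alive, would reproduce the hazard the commit message describes: the same thread waiting on a lock it already holds.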


@ -1,28 +1,5 @@
//! Background worker that tails the WAL and updates the KV store.
//!
//! The worker reads records from the Write-Ahead Log and persists them
//! to the storage engine using content-addressed keys.
//!
//! # Storage Layout
//!
//! Following the architecture spec, records are stored with these key prefixes:
//! - `H:{hash}` - Assertions (content-addressed by BLAKE3 hash)
//! - `V:{assertion_hash}:{vote_hash}` - Votes on assertions
//! - `E:{hash}` - Epochs (paradigm definitions)
//! - `S:{subject}` - Subject adjacency index (list of assertion hashes)
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use stemedb_core::types::HlcTimestamp;
use stemedb_storage::{GenericIndexStore, GenericVoteStore, KVStore, VectorIndex, VisualIndex};
use stemedb_wal::{Journal, HEADER_SIZE};
use tokio::sync::{Mutex, Notify};
use tracing::{debug, info, warn};
use crate::error::{IngestError, Result};
use crate::gossip::GossipBroadcast;
// Module declarations
mod processing;
mod record_types;
mod run;
@ -31,7 +8,19 @@ mod storage;
#[cfg(test)]
mod tests;
// Re-exports
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use stemedb_core::types::HlcTimestamp;
use stemedb_storage::{
GenericIndexStore, GenericVoteStore, KVStore, MemTable, VectorIndex, VisualIndex,
};
use stemedb_wal::{Journal, HEADER_SIZE};
use tokio::sync::{Mutex, Notify};
use tracing::{debug, info, warn};
use crate::error::{IngestError, Result};
use crate::gossip::GossipBroadcast;
pub use record_types::{serialize_assertion, serialize_epoch, serialize_vote, RecordType};
/// Background worker that tails the WAL and updates the KV store.
@ -41,37 +30,21 @@ pub struct IngestWorker<S> {
index_store: GenericIndexStore<Arc<S>>,
vote_store: GenericVoteStore<Arc<S>>,
current_offset: u64,
/// Optional notification channel for event-driven materialization.
/// When set, the worker signals this after each successful ingestion
/// so downstream consumers (e.g., the Materializer) can react immediately.
notify: Option<Arc<Notify>>,
/// Optional vector index for semantic similarity search.
/// When set, assertions with embedding vectors are indexed on ingestion.
vector_index: Option<Arc<dyn VectorIndex>>,
/// Optional visual index for perceptual hash similarity search.
/// When set, assertions with visual_hash are indexed on ingestion.
visual_index: Option<Arc<dyn VisualIndex>>,
/// Shutdown signal shared with Ingestor.
/// When set to true, the run() loop exits gracefully.
shutdown: Arc<AtomicBool>,
/// Hybrid Logical Clock for distributed causal ordering.
///
/// Used to generate HLC timestamps for supersessions and epoch
/// ingestion. Provides causal ordering guarantees across distributed
/// nodes, even with clock skew.
hlc: uhlc::HLC,
/// Optional gossip broadcaster for distributed replication.
///
/// When set, the worker broadcasts newly ingested assertions to peer nodes.
gossip_broadcaster: Option<Arc<dyn GossipBroadcast>>,
/// MemTable for read-your-writes eviction signaling.
/// When assertions are indexed, we signal the MemTable to evict them.
memtable: Option<Arc<MemTable>>,
/// DEBUG ONLY: Skip signature verification for demos/testing
pub skip_signature_verification: bool,
}
impl<S: KVStore + 'static> IngestWorker<S> {
/// Create a new ingest worker, resuming from the last persisted cursor.
///
/// If a cursor checkpoint exists in the KV store, the worker resumes
/// from that offset. Otherwise, it starts from the beginning of the WAL
/// (after the file header).
/// Create a new ingest worker.
pub async fn new(journal: Arc<Mutex<Journal>>, store: Arc<S>) -> Result<Self> {
let index_store = GenericIndexStore::new(store.clone());
let vote_store = GenericVoteStore::new(store.clone());
@ -86,10 +59,7 @@ impl<S: KVStore + 'static> IngestWorker<S> {
offset
}
Some(bytes) => {
warn!(
len = bytes.len(),
"Corrupt cursor value (expected 8 bytes), starting from beginning"
);
warn!(len = bytes.len(), "Corrupt cursor value, starting from beginning");
HEADER_SIZE as u64
}
None => {
@ -97,7 +67,6 @@ impl<S: KVStore + 'static> IngestWorker<S> {
HEADER_SIZE as u64
}
};
// Initialize HLC with random node ID
let hlc = uhlc::HLCBuilder::new().build();
Ok(Self {
@ -112,13 +81,12 @@ impl<S: KVStore + 'static> IngestWorker<S> {
shutdown: Arc::new(AtomicBool::new(false)),
hlc,
gossip_broadcaster: None,
memtable: None,
skip_signature_verification: false,
})
}
/// Create a new ingest worker with a shared shutdown signal.
///
/// This is used by the Ingestor to coordinate shutdown between the
/// manager and the background task.
/// Create with shutdown signal.
pub async fn with_shutdown(
journal: Arc<Mutex<Journal>>,
store: Arc<S>,
@ -129,154 +97,85 @@ impl<S: KVStore + 'static> IngestWorker<S> {
Ok(worker)
}
/// Check if shutdown has been requested.
/// Is shutdown?
pub fn is_shutdown(&self) -> bool {
self.shutdown.load(Ordering::Relaxed)
}
/// Attach a notification channel for event-driven downstream consumers.
///
/// After each successful record ingestion, the worker will signal this
/// `Notify` so consumers like the Materializer can react immediately
/// instead of polling on a fixed interval.
/// With notify.
pub fn with_notify(mut self, notify: Arc<Notify>) -> Self {
self.notify = Some(notify);
self
}
/// Attach a notification channel for event-driven WAL reading.
///
/// When the GroupCommitBuffer flushes new data to the WAL, it signals
/// this `Notify` so the worker can immediately refresh its segment list
/// and process the new records. This is the counterpart to the downstream
/// `with_notify` - this one is for upstream signaling from the writer.
///
/// Note: This uses the same internal field as `with_notify` since we
/// want to wake up on both upstream writes and downstream requests.
/// The run loop handles both cases by refreshing segments and processing.
/// With flush notify.
pub fn with_flush_notify(mut self, notify: Arc<Notify>) -> Self {
self.notify = Some(notify);
self
}
/// Attach a vector index for semantic similarity search.
///
/// When set, assertions with embedding vectors (`vector` field) are
/// automatically indexed during ingestion, enabling k-NN queries.
///
/// # Example
/// ```ignore
/// let vector_index = Arc::new(HnswVectorIndex::new(128));
/// let worker = IngestWorker::new(journal, store)
/// .await?
/// .with_vector_index(vector_index);
/// ```
/// With vector index.
pub fn with_vector_index(mut self, index: Arc<dyn VectorIndex>) -> Self {
self.vector_index = Some(index);
self
}
/// Attach a visual index for perceptual hash similarity search.
///
/// When set, assertions with visual hashes (`visual_hash` field) are
/// automatically indexed during ingestion, enabling visual similarity queries.
///
/// # Example
/// ```ignore
/// let visual_index = Arc::new(BkTreeVisualIndex::new());
/// let worker = IngestWorker::new(journal, store)
/// .await?
/// .with_visual_index(visual_index);
/// ```
/// With visual index.
pub fn with_visual_index(mut self, index: Arc<dyn VisualIndex>) -> Self {
self.visual_index = Some(index);
self
}
/// Configure the HLC with a specific node ID.
///
/// Use this when running multiple nodes in a distributed cluster to ensure
/// each node has a unique identifier for total ordering of concurrent events.
///
/// # Example
/// ```ignore
/// let node_id = uhlc::ID::try_from(&node_uuid.as_bytes()[..]).unwrap();
/// let worker = IngestWorker::new(journal, store)
/// .await?
/// .with_node_id(node_id);
/// ```
/// With node ID.
pub fn with_node_id(mut self, node_id: uhlc::ID) -> Self {
self.hlc = uhlc::HLCBuilder::new().with_id(node_id).build();
self
}
/// Attach a gossip broadcaster for distributed replication.
///
/// When set, newly ingested assertions are broadcast to peer nodes
/// for low-latency replication. The gossip layer is best-effort:
/// failures are logged but don't block the ingestion pipeline.
///
/// # Example
/// ```ignore
/// let broadcaster = GossipBroadcaster::new(peers).await?;
/// let worker = IngestWorker::new(journal, store)
/// .await?
/// .with_gossip_broadcaster(Arc::new(broadcaster));
/// ```
/// With gossip.
pub fn with_gossip_broadcaster(mut self, broadcaster: Arc<dyn GossipBroadcast>) -> Self {
self.gossip_broadcaster = Some(broadcaster);
self
}
/// Returns the gossip broadcaster if configured.
/// With MemTable for read-your-writes eviction signaling.
///
/// When assertions are indexed in KVStore, the IngestWorker signals
/// the MemTable to evict them, freeing memory.
pub fn with_memtable(mut self, memtable: Arc<MemTable>) -> Self {
self.memtable = Some(memtable);
self
}
/// Get gossip.
pub fn gossip_broadcaster(&self) -> Option<&Arc<dyn GossipBroadcast>> {
self.gossip_broadcaster.as_ref()
}
/// Generates a new HLC timestamp.
///
/// The returned timestamp is guaranteed to be greater than all previously
/// generated timestamps from this worker, even if the system clock goes
/// backwards.
///
/// Use this when creating supersessions or other records that need
/// causal ordering across distributed nodes.
/// Gen HLC.
pub fn generate_hlc_timestamp(&self) -> HlcTimestamp {
HlcTimestamp::now(&self.hlc)
}
/// Updates the HLC with a timestamp from a remote node.
///
/// Call this when receiving data from another node to ensure the local
/// clock stays synchronized. The HLC will advance to at least the
/// remote timestamp, maintaining causal ordering.
///
/// # Arguments
///
/// * `remote` - HLC timestamp received from a remote node
///
/// # Returns
///
/// Ok(()) if the clock was updated, Err if the timestamp is too far
/// in the future (clock skew protection).
/// Update HLC.
pub fn update_hlc_from_remote(&self, remote: &HlcTimestamp) -> Result<()> {
if let Some(ts) = remote.to_uhlc() {
self.hlc.update_with_timestamp(&ts).map_err(|e| {
warn!(
remote_time = remote.time_ntp64,
error = %e,
"Failed to update HLC from remote timestamp (clock skew?)"
);
warn!(remote_time = remote.time_ntp64, error = %e, "HLC update failed");
IngestError::InputValidation(format!("HLC update failed: {}", e))
})?;
}
Ok(())
}
/// Returns the current HLC node ID as bytes.
///
/// Useful for including in CRDT state or other distributed data structures.
/// Get HLC node ID.
pub fn hlc_node_id(&self) -> [u8; 16] {
self.hlc.get_id().to_le_bytes()
}
/// DEBUG ONLY: Enable skipping signature verification.
pub fn with_skip_signature_verification(mut self, skip: bool) -> Self {
self.skip_signature_verification = skip;
self
}
}
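
The `with_memtable` builder and the eviction signaling described in its doc comment can be sketched as follows. This is an illustration under stated assumptions: `MemTable`, `Worker`, `evict`, and `on_indexed` here are simplified hypothetical stand-ins keyed by a `u64`, not the real `stemedb_storage::MemTable` API.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical in-memory table of not-yet-indexed records, keyed by hash.
#[derive(Default)]
struct MemTable {
    entries: Mutex<HashMap<u64, String>>,
}

impl MemTable {
    fn insert(&self, hash: u64, value: &str) {
        self.entries.lock().unwrap().insert(hash, value.to_string());
    }

    /// Removes the record; returns true if it was present.
    fn evict(&self, hash: u64) -> bool {
        self.entries.lock().unwrap().remove(&hash).is_some()
    }

    fn len(&self) -> usize {
        self.entries.lock().unwrap().len()
    }
}

/// Hypothetical worker with an optional MemTable, attached builder-style.
struct Worker {
    memtable: Option<Arc<MemTable>>,
}

impl Worker {
    fn new() -> Self {
        Self { memtable: None }
    }

    fn with_memtable(mut self, memtable: Arc<MemTable>) -> Self {
        self.memtable = Some(memtable);
        self
    }

    /// Called once a record is durably indexed; evicts it from the MemTable if one is attached.
    fn on_indexed(&self, hash: u64) -> bool {
        self.memtable.as_ref().map_or(false, |m| m.evict(hash))
    }
}
```

Keeping the field an `Option` means a worker built without a MemTable behaves exactly as before, which matches the `memtable: None` default in the constructor.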


@ -1,112 +1,44 @@
//! Storage helper methods for the IngestWorker.
//!
//! Contains methods for persisting cursors, writing supersession cascades,
//! and building storage keys.
//! Storage orchestration for IngestWorker.
use super::IngestWorker;
use crate::error::Result;
use stemedb_core::serde::deserialize;
use stemedb_core::types::Epoch;
use stemedb_storage::key_codec;
use stemedb_storage::KVStore;
use tracing::{debug, warn};
impl<S: KVStore + 'static> IngestWorker<S> {
/// Maximum depth for walking supersession chains at write time.
pub(super) const MAX_CASCADE_DEPTH: usize = 100;
/// Write `\x00SUPERSEDED:` markers for the full transitive closure of superseded epochs.
/// Write cascade markers for superseded epochs.
///
/// All markers point to the LATEST superseding epoch (`new_epoch_id`).
/// For chain C→B→A: writes `SUPERSEDED:B→C` and `SUPERSEDED:A→C`.
///
/// This enables O(1) "is this epoch superseded?" checks at query time:
/// just look for `\x00SUPERSEDED:{epoch_id}` key existence.
///
/// # Algorithm
///
/// 1. Start with the immediately superseded epoch
/// 2. Write marker pointing to the new (latest) epoch
/// 3. Read the superseded epoch to check if it also supersedes something
/// 4. Repeat transitively until end of chain or max depth
///
/// # Safety
///
/// - Cycle detection via visited set
/// - Max depth guard (100 levels)
/// - Missing/corrupt epochs gracefully terminate the walk
pub(super) async fn write_supersession_cascade(
/// Walks the epoch supersession chain and writes markers so that
/// queries can check if an epoch is superseded in O(1) time.
pub async fn write_supersession_cascade(
&self,
new_epoch_id: &[u8; 32],
superseded_id: &[u8; 32],
old_epoch_id: &[u8; 32],
) -> Result<()> {
let mut current_id = *superseded_id;
let mut visited = std::collections::HashSet::new();
let mut current_id = *old_epoch_id;
let mut depth = 0;
const MAX_CASCADE_DEPTH: usize = 100;
loop {
// Cycle detection
if !visited.insert(current_id) {
debug!(
epoch_id = %hex::encode(current_id),
"Cycle detected in supersession cascade, stopping write"
);
break;
}
while depth < MAX_CASCADE_DEPTH {
// Write marker: \x00SUPERSEDED:{old_id} -> {new_id}
let key = stemedb_storage::key_codec::superseded_key(&hex::encode(current_id));
self.store.put(&key, new_epoch_id).await?;
// Max depth guard
if depth >= Self::MAX_CASCADE_DEPTH {
warn!(
depth,
new_epoch = %hex::encode(new_epoch_id),
"Supersession cascade exceeded max depth"
);
break;
}
// Follow the chain: look up what this epoch superseded
let epoch_key = stemedb_storage::key_codec::epoch_key(&hex::encode(current_id));
if let Some(bytes) = self.store.get(&epoch_key).await? {
let epoch: stemedb_core::types::Epoch = stemedb_core::serde::deserialize(&bytes)
.map_err(|e| crate::error::IngestError::Serialization(e.to_string()))?;
// Write marker: \x00SUPERSEDED:{current_id} → new_epoch_id (always the LATEST)
let marker_key = key_codec::superseded_key(&hex::encode(current_id));
self.store.put(&marker_key, new_epoch_id).await?;
debug!(
superseded = %hex::encode(current_id),
by = %hex::encode(new_epoch_id),
depth,
"Wrote supersession marker"
);
// Check if current_id also superseded something (transitive closure)
let epoch_key = key_codec::epoch_key(&hex::encode(current_id));
let ancestor_epoch = match self.store.get(&epoch_key).await? {
Some(bytes) => match deserialize::<Epoch>(&bytes) {
Ok(e) => e,
Err(e) => {
debug!(
epoch_id = %hex::encode(current_id),
error = %e,
"Failed to deserialize ancestor epoch, stopping cascade"
);
break;
}
},
None => {
debug!(
epoch_id = %hex::encode(current_id),
"Ancestor epoch not found, stopping cascade"
);
break;
}
};
match ancestor_epoch.supersedes {
Some(grandparent_id) => {
current_id = grandparent_id;
if let Some(prev_id) = epoch.supersedes {
current_id = prev_id;
depth += 1;
} else {
break;
}
None => break, // End of chain
} else {
break;
}
}
Ok(())
}
}
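
The cascade walk can be sketched in isolation. The version below is a hedged illustration, not the code in this commit: epochs are modeled as a `HashMap<u32, Option<u32>>` (epoch id to the id it supersedes), markers are returned instead of written to a store, and the function name `cascade_markers` is hypothetical. It combines the depth guard with the visited-set cycle check that the earlier implementation documented.

```rust
use std::collections::{HashMap, HashSet};

const MAX_CASCADE_DEPTH: usize = 100;

/// Returns (superseded_id, superseding_id) pairs for the transitive chain.
/// Every marker points at `new_id`, the latest epoch.
fn cascade_markers(
    epochs: &HashMap<u32, Option<u32>>,
    new_id: u32,
    superseded: u32,
) -> Vec<(u32, u32)> {
    let mut markers = Vec::new();
    let mut visited = HashSet::new();
    let mut current = superseded;

    // Stop at the depth limit, or when an id repeats (a cycle in the chain).
    while markers.len() < MAX_CASCADE_DEPTH && visited.insert(current) {
        markers.push((current, new_id));
        match epochs.get(&current) {
            // This epoch superseded an older one: keep walking.
            Some(Some(prev)) => current = *prev,
            // End of chain, or the epoch record is missing.
            _ => break,
        }
    }
    markers
}
```

For the chain C→B→A from the doc comment, with ids 3, 2, 1, calling `cascade_markers(&epochs, 3, 2)` yields markers for B and A that both point at C.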


@ -1,12 +1,14 @@
//! Candidate retrieval from indexes.
//!
//! This module handles fetching assertions from different indexes:
//! - MemTable (read-your-writes, checked first for freshest data)
//! - Subject index (S:{subject})
//! - Compound index (SP:{subject}:{predicate})
//! - Vector index (HNSW k-NN)
//! - Visual index (BK-tree hamming distance)
//! - Full scan (H: prefix)
use std::collections::HashSet;
use std::sync::Arc;
use stemedb_core::types::Assertion;
@ -19,12 +21,43 @@ use crate::query::parse_hex_phash;
use super::QueryEngine;
impl<S: KVStore + 'static> QueryEngine<S> {
/// Fetch assertions for a specific subject using the subject index.
pub(super) async fn fetch_by_subject(&self, subject: &str) -> Result<Vec<Assertion>> {
let hash_list = self.index_store.get_by_subject(subject).await?;
/// Compute the hash of an assertion for deduplication.
///
/// Uses BLAKE3 hash of the serialized assertion, matching the hash
/// computed during ingestion.
fn compute_assertion_hash(assertion: &Assertion) -> Option<[u8; 32]> {
let serialized = stemedb_core::serde::serialize(assertion).ok()?;
Some(*blake3::hash(&serialized).as_bytes())
}
let mut results = Vec::with_capacity(hash_list.len());
/// Fetch assertions for a specific subject using the subject index.
///
/// Merges MemTable results (freshest) with KVStore results, deduplicating by hash.
pub(super) async fn fetch_by_subject(&self, subject: &str) -> Result<Vec<Assertion>> {
let mut seen_hashes: HashSet<[u8; 32]> = HashSet::new();
let mut results = Vec::new();
// Check MemTable first (has freshest data)
if let Some(ref memtable) = self.memtable {
for assertion in memtable.get_by_subject(subject) {
if let Some(hash) = Self::compute_assertion_hash(&assertion) {
if seen_hashes.insert(hash) {
results.push(assertion);
}
}
}
if !results.is_empty() {
debug!(subject, memtable_count = results.len(), "Found assertions in MemTable");
}
}
// Then fetch from KVStore (existing indexed data)
let hash_list = self.index_store.get_by_subject(subject).await?;
for hash in hash_list {
if !seen_hashes.insert(hash) {
continue; // Already in results from MemTable
}
let assertion_key = key_codec::assertion_key(subject, &hex::encode(hash));
if let Some(data) = self.store.get(&assertion_key).await? {
match self.deserialize_assertion(&data) {
@ -40,15 +73,42 @@ impl<S: KVStore + 'static> QueryEngine<S> {
}
/// Fetch assertions for a specific subject and predicate using the compound index.
///
/// Merges MemTable results (freshest) with KVStore results, deduplicating by hash.
pub(super) async fn fetch_by_subject_predicate(
&self,
subject: &str,
predicate: &str,
) -> Result<Vec<Assertion>> {
let hash_list = self.index_store.get_by_subject_predicate(subject, predicate).await?;
let mut seen_hashes: HashSet<[u8; 32]> = HashSet::new();
let mut results = Vec::new();
let mut results = Vec::with_capacity(hash_list.len());
// Check MemTable first (has freshest data)
if let Some(ref memtable) = self.memtable {
for assertion in memtable.get_by_subject_predicate(subject, predicate) {
if let Some(hash) = Self::compute_assertion_hash(&assertion) {
if seen_hashes.insert(hash) {
results.push(assertion);
}
}
}
if !results.is_empty() {
debug!(
subject,
predicate,
memtable_count = results.len(),
"Found assertions in MemTable"
);
}
}
// Then fetch from KVStore (existing indexed data)
let hash_list = self.index_store.get_by_subject_predicate(subject, predicate).await?;
for hash in hash_list {
if !seen_hashes.insert(hash) {
continue; // Already in results from MemTable
}
let assertion_key = key_codec::assertion_key(subject, &hex::encode(hash));
if let Some(data) = self.store.get(&assertion_key).await? {
match self.deserialize_assertion(&data) {
@ -67,11 +127,25 @@ impl<S: KVStore + 'static> QueryEngine<S> {
///
/// Used for alias resolution where a single query subject expands to
/// multiple aliased subjects (e.g., code:// and rfc:// paths).
/// Merges MemTable results with KVStore results.
pub(super) async fn fetch_by_subjects(&self, subjects: &[String]) -> Result<Vec<Assertion>> {
use std::collections::HashSet;
let mut seen_hashes: HashSet<[u8; 32]> = HashSet::new();
let mut results = Vec::new();
// Check MemTable first (has freshest data)
if let Some(ref memtable) = self.memtable {
for subject in subjects {
for assertion in memtable.get_by_subject(subject) {
if let Some(hash) = Self::compute_assertion_hash(&assertion) {
if seen_hashes.insert(hash) {
results.push(assertion);
}
}
}
}
}
// Then fetch from KVStore
for subject in subjects {
let hash_list = self.index_store.get_by_subject(subject).await?;
for hash in hash_list {
@ -102,15 +176,29 @@ impl<S: KVStore + 'static> QueryEngine<S> {
/// Fetch assertions for multiple subjects with predicate filter, deduplicating by hash.
///
/// Used for alias resolution when both subject and predicate are specified.
/// Merges MemTable results with KVStore results.
pub(super) async fn fetch_by_subjects_predicate(
&self,
subjects: &[String],
predicate: &str,
) -> Result<Vec<Assertion>> {
use std::collections::HashSet;
let mut seen_hashes: HashSet<[u8; 32]> = HashSet::new();
let mut results = Vec::new();
// Check MemTable first (has freshest data)
if let Some(ref memtable) = self.memtable {
for subject in subjects {
for assertion in memtable.get_by_subject_predicate(subject, predicate) {
if let Some(hash) = Self::compute_assertion_hash(&assertion) {
if seen_hashes.insert(hash) {
results.push(assertion);
}
}
}
}
}
// Then fetch from KVStore
for subject in subjects {
let hash_list = self.index_store.get_by_subject_predicate(subject, predicate).await?;
for hash in hash_list {
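
The merge order used by the fetch functions above (MemTable first, then the indexed store, skipping hashes already seen) can be sketched with plain vectors. This is a simplified illustration: `Record` and `merge_dedup` are hypothetical names, and each record carries its hash directly rather than having it computed from a serialized assertion.

```rust
use std::collections::HashSet;

type Hash = [u8; 32];

#[derive(Clone, Debug, PartialEq)]
struct Record {
    hash: Hash,
    predicate: String,
}

/// Merges fresh in-memory records with indexed ones.
/// Fresh records come first and win when the same hash appears in both.
fn merge_dedup(fresh: Vec<Record>, indexed: Vec<Record>) -> Vec<Record> {
    let mut seen: HashSet<Hash> = HashSet::new();
    let mut results = Vec::new();
    for record in fresh.into_iter().chain(indexed) {
        // insert() returns false when the hash was already present.
        if seen.insert(record.hash) {
            results.push(record);
        }
    }
    results
}
```

The dedup step matters during the window in which an assertion has been indexed but not yet evicted from the MemTable; without it the same assertion would be returned twice.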


@ -6,7 +6,7 @@
use std::sync::Arc;
use stemedb_core::types::Assertion;
- use stemedb_storage::{AliasStore, GenericIndexStore, KVStore, VectorIndex, VisualIndex};
+ use stemedb_storage::{AliasStore, GenericIndexStore, KVStore, MemTable, VectorIndex, VisualIndex};
// Trait import required for IndexStore methods on GenericIndexStore
#[allow(unused_imports)]
use stemedb_storage::IndexStore;
@ -45,13 +45,24 @@ pub struct QueryEngine<S> {
pub(super) visual_index: Option<Arc<dyn VisualIndex>>,
/// Optional alias store for cross-scheme subject resolution.
pub(super) alias_store: Option<Arc<dyn AliasStore>>,
+ /// Optional MemTable for read-your-writes consistency.
+ /// When set, queries merge MemTable with KVStore to ensure recently
+ /// written assertions are immediately visible.
+ pub(super) memtable: Option<Arc<MemTable>>,
}
impl<S: KVStore + 'static> QueryEngine<S> {
/// Create a new query engine backed by the given store.
pub fn new(store: Arc<S>) -> Self {
let index_store = GenericIndexStore::new(store.clone());
- Self { store, index_store, vector_index: None, visual_index: None, alias_store: None }
+ Self {
+ store,
+ index_store,
+ vector_index: None,
+ visual_index: None,
+ alias_store: None,
+ memtable: None,
+ }
}
/// Attach a vector index for k-NN similarity search.
@ -83,6 +94,16 @@ impl<S: KVStore + 'static> QueryEngine<S> {
self
}
+ /// Attach a MemTable for read-your-writes consistency.
+ ///
+ /// When set, queries merge MemTable entries with KVStore results to
+ /// ensure recently written assertions are immediately visible before
+ /// the IngestWorker has processed them into KVStore indexes.
+ pub fn with_memtable(mut self, memtable: Arc<MemTable>) -> Self {
+ self.memtable = Some(memtable);
+ self
+ }
/// Execute a query and return matching assertions.
///
/// # Query Execution Strategy


@ -7,6 +7,7 @@ pub use ed25519_dalek::{Signer, SigningKey};
pub use rand::rngs::OsRng;
pub use std::sync::Arc;
pub use stemedb_core::serde::serialize;
+ pub use stemedb_core::signing::compute_content_hash_v2;
pub use stemedb_core::testing::AssertionBuilder;
pub use stemedb_core::types::{
Assertion, EscalationLevel, EscalationPolicy, LifecycleStage, ObjectValue, ResolutionStatus,
@ -93,12 +94,13 @@ pub fn create_signed_assertion_v2(
.signatures(vec![])
.build();
- // Serialize to get content hash
- let bytes = serialize(&assertion).expect("serialize assertion for v2 signing");
- let content_hash = blake3::hash(&bytes);
+ // Compute the canonical v2 content hash (subject:predicate:object:source_hash:...).
+ // Must use compute_content_hash_v2, NOT blake3::hash(serialize(&assertion));
+ // the verifier checks the canonical fields hash, not the rkyv serialization hash.
+ let content_hash = compute_content_hash_v2(&assertion);
// Sign the content hash (v2 enterprise format)
- let signature = signing_key.sign(content_hash.as_bytes());
+ let signature = signing_key.sign(&content_hash);
// Add signature with version 2
assertion.signatures = vec![SignatureEntry {


@ -12,7 +12,7 @@ use tokio::sync::Mutex;
use tracing::debug;
use crate::agent::Agent;
- use crate::helpers::{wait_until_ingested, write_assertion_to_wal};
+ use crate::helpers::write_assertion_to_wal;
use crate::types::{ErrorKind, SimulationError, SimulationResult};
/// Test that RecencyLens correctly selects the most recent assertion.
@ -22,9 +22,9 @@ use crate::types::{ErrorKind, SimulationError, SimulationResult};
pub(crate) async fn run_recency_lens_test<S: KVStore + 'static>(
journal: &Arc<Mutex<Journal>>,
store: &Arc<S>,
+ ingestor: &stemedb_ingest::Ingestor<S>,
agents: &[Agent],
result: &mut SimulationResult,
- ingestion_wait_ms: u64,
) -> bool {
let agent = &agents[0];
let subject = "RecencyTest_Entity";
@ -48,10 +48,8 @@ pub(crate) async fn run_recency_lens_test<S: KVStore + 'static>(
Some(2000),
);
- // Write both to WAL and track last offset
- let _old_result = match write_assertion_to_wal(journal, &old_assertion).await {
- Ok(r) => r,
- Err(e) => {
+ // Write both to WAL
+ if let Err(e) = write_assertion_to_wal(journal, &old_assertion).await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::WriteFailure,
@ -59,11 +57,8 @@ pub(crate) async fn run_recency_lens_test<S: KVStore + 'static>(
});
return false;
}
};
let new_result = match write_assertion_to_wal(journal, &new_assertion).await {
Ok(r) => r,
Err(e) => {
if let Err(e) = write_assertion_to_wal(journal, &new_assertion).await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::WriteFailure,
@ -71,12 +66,14 @@ pub(crate) async fn run_recency_lens_test<S: KVStore + 'static>(
});
return false;
}
};
let last_offset = new_result.end_offset;
// Wait for ingestion to reach the last offset
if let Err(e) = wait_until_ingested(&**store, last_offset, ingestion_wait_ms).await {
result.errors.push(e);
// Synchronously drain all pending WAL entries (deterministic, no background task scheduling)
if let Err(e) = ingestor.process_pending().await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::WriteFailure,
message: format!("Recency test: ingestion failed: {}", e),
});
return false;
}
@ -152,9 +149,9 @@ pub(crate) async fn run_recency_lens_test<S: KVStore + 'static>(
pub(crate) async fn run_lifecycle_test<S: KVStore + 'static>(
journal: &Arc<Mutex<Journal>>,
store: &Arc<S>,
ingestor: &stemedb_ingest::Ingestor<S>,
agents: &[Agent],
result: &mut SimulationResult,
ingestion_wait_ms: u64,
) -> bool {
let agent = &agents[0];
let subject = "LifecycleTest_Entity";
@ -178,10 +175,8 @@ pub(crate) async fn run_lifecycle_test<S: KVStore + 'static>(
Some(2000),
);
// Write both to WAL and track last offset
let _proposed_result = match write_assertion_to_wal(journal, &proposed).await {
Ok(r) => r,
Err(e) => {
// Write both to WAL
if let Err(e) = write_assertion_to_wal(journal, &proposed).await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::WriteFailure,
@ -189,11 +184,8 @@ pub(crate) async fn run_lifecycle_test<S: KVStore + 'static>(
});
return false;
}
};
let approved_result = match write_assertion_to_wal(journal, &approved).await {
Ok(r) => r,
Err(e) => {
if let Err(e) = write_assertion_to_wal(journal, &approved).await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::WriteFailure,
@ -201,12 +193,14 @@ pub(crate) async fn run_lifecycle_test<S: KVStore + 'static>(
});
return false;
}
};
let last_offset = approved_result.end_offset;
// Wait for ingestion to reach the last offset
if let Err(e) = wait_until_ingested(&**store, last_offset, ingestion_wait_ms).await {
result.errors.push(e);
// Synchronously drain all pending WAL entries
if let Err(e) = ingestor.process_pending().await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::WriteFailure,
message: format!("Lifecycle test: ingestion failed: {}", e),
});
return false;
}


@ -10,9 +10,7 @@ use tokio::sync::Mutex;
use tracing::debug;
use crate::agent::Agent;
use crate::helpers::{
compute_assertion_hash, wait_until_ingested, write_assertion_to_wal, write_vote_to_wal,
};
use crate::helpers::{compute_assertion_hash, write_assertion_to_wal, write_vote_to_wal};
use crate::types::{ErrorKind, SimulationError, SimulationResult};
// ============================================================================
@ -32,9 +30,9 @@ use crate::types::{ErrorKind, SimulationError, SimulationResult};
pub(crate) async fn run_vote_consensus_test<S: KVStore + 'static>(
journal: &Arc<Mutex<Journal>>,
store: &Arc<S>,
ingestor: &stemedb_ingest::Ingestor<S>,
agents: &[Agent],
result: &mut SimulationResult,
ingestion_wait_ms: u64,
) -> bool {
// Need at least 3 agents: Alpha, Beta, Believer
if agents.len() < 3 {
@ -71,10 +69,8 @@ pub(crate) async fn run_vote_consensus_test<S: KVStore + 'static>(
Some(1001),
);
// Write both assertions to WAL and track last offset
let _alpha_result = match write_assertion_to_wal(journal, &alpha_assertion).await {
Ok(r) => r,
Err(e) => {
// Write both assertions to WAL
if let Err(e) = write_assertion_to_wal(journal, &alpha_assertion).await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::WriteFailure,
@ -82,12 +78,9 @@ pub(crate) async fn run_vote_consensus_test<S: KVStore + 'static>(
});
return false;
}
};
result.assertions_written += 1;
let beta_result = match write_assertion_to_wal(journal, &beta_assertion).await {
Ok(r) => r,
Err(e) => {
if let Err(e) = write_assertion_to_wal(journal, &beta_assertion).await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::WriteFailure,
@ -95,13 +88,15 @@ pub(crate) async fn run_vote_consensus_test<S: KVStore + 'static>(
});
return false;
}
};
result.assertions_written += 1;
let mut last_offset = beta_result.end_offset;
// Wait for assertions to be ingested
if let Err(e) = wait_until_ingested(&**store, last_offset, ingestion_wait_ms).await {
result.errors.push(e);
// Synchronously drain assertions from WAL
if let Err(e) = ingestor.process_pending().await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::WriteFailure,
message: format!("Vote consensus test: assertion ingestion failed: {}", e),
});
return false;
}
@ -135,9 +130,7 @@ pub(crate) async fn run_vote_consensus_test<S: KVStore + 'static>(
// Believer votes for Alpha's assertion (weight 1.0) - this tips the balance
let believer_vote = believer.vote(alpha_hash, 1.0);
last_offset = match write_vote_to_wal(journal, &believer_vote).await {
Ok(offset) => offset,
Err(e) => {
if let Err(e) = write_vote_to_wal(journal, &believer_vote).await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::VoteWriteFailure,
@ -145,12 +138,15 @@ pub(crate) async fn run_vote_consensus_test<S: KVStore + 'static>(
});
return false;
}
};
result.votes_written += 1;
// Wait for votes to be ingested
if let Err(e) = wait_until_ingested(&**store, last_offset, ingestion_wait_ms).await {
result.errors.push(e);
// Synchronously drain votes from WAL
if let Err(e) = ingestor.process_pending().await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::WriteFailure,
message: format!("Vote consensus test: vote ingestion failed: {}", e),
});
return false;
}
@ -236,9 +232,9 @@ pub(crate) async fn run_vote_consensus_test<S: KVStore + 'static>(
pub(crate) async fn run_troll_resistance_test<S: KVStore + 'static>(
journal: &Arc<Mutex<Journal>>,
store: &Arc<S>,
ingestor: &stemedb_ingest::Ingestor<S>,
agents: &[Agent],
result: &mut SimulationResult,
ingestion_wait_ms: u64,
) -> bool {
// Need at least 3 agents: Scientist, Troll, Ally
if agents.len() < 3 {
@ -276,25 +272,17 @@ pub(crate) async fn run_troll_resistance_test<S: KVStore + 'static>(
);
// Write both assertions to WAL
let _scientist_result = match write_assertion_to_wal(journal, &scientist_assertion).await {
Ok(r) => r,
Err(e) => {
if let Err(e) = write_assertion_to_wal(journal, &scientist_assertion).await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::WriteFailure,
message: format!(
"Troll resistance test: failed to write scientist assertion: {}",
e
),
message: format!("Troll resistance test: failed to write scientist assertion: {}", e),
});
return false;
}
};
result.assertions_written += 1;
let troll_result = match write_assertion_to_wal(journal, &troll_assertion).await {
Ok(r) => r,
Err(e) => {
if let Err(e) = write_assertion_to_wal(journal, &troll_assertion).await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::WriteFailure,
@ -302,13 +290,15 @@ pub(crate) async fn run_troll_resistance_test<S: KVStore + 'static>(
});
return false;
}
};
result.assertions_written += 1;
let mut last_offset = troll_result.end_offset;
// Wait for assertions to be ingested
if let Err(e) = wait_until_ingested(&**store, last_offset, ingestion_wait_ms).await {
result.errors.push(e);
// Synchronously drain assertions from WAL
if let Err(e) = ingestor.process_pending().await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::WriteFailure,
message: format!("Troll resistance test: assertion ingestion failed: {}", e),
});
return false;
}
@ -342,9 +332,7 @@ pub(crate) async fn run_troll_resistance_test<S: KVStore + 'static>(
// Ally votes for scientist's assertion (weight 1.0) - tips balance in scientist's favor
let ally_vote = ally.vote(scientist_hash, 1.0);
last_offset = match write_vote_to_wal(journal, &ally_vote).await {
Ok(offset) => offset,
Err(e) => {
if let Err(e) = write_vote_to_wal(journal, &ally_vote).await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::VoteWriteFailure,
@ -352,12 +340,15 @@ pub(crate) async fn run_troll_resistance_test<S: KVStore + 'static>(
});
return false;
}
};
result.votes_written += 1;
// Wait for votes to be ingested
if let Err(e) = wait_until_ingested(&**store, last_offset, ingestion_wait_ms).await {
result.errors.push(e);
// Synchronously drain votes from WAL
if let Err(e) = ingestor.process_pending().await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::WriteFailure,
message: format!("Troll resistance test: vote ingestion failed: {}", e),
});
return false;
}


@ -10,9 +10,7 @@ use tokio::sync::Mutex;
use tracing::debug;
use crate::agent::Agent;
use crate::helpers::{
cursor_key, verify_assertion_text, wait_until_ingested, write_assertion_to_wal,
};
use crate::helpers::{verify_assertion_text, write_assertion_to_wal};
use crate::types::{ErrorKind, SimulationError, SimulationResult};
// ============================================================================
@ -30,9 +28,9 @@ use crate::types::{ErrorKind, SimulationError, SimulationResult};
pub(crate) async fn run_mv_integration_test<S: KVStore + 'static>(
journal: &Arc<Mutex<Journal>>,
store: &Arc<S>,
ingestor: &stemedb_ingest::Ingestor<S>,
agents: &[Agent],
result: &mut SimulationResult,
ingestion_wait_ms: u64,
) -> bool {
let agent = &agents[0];
let subject = "MV_Test_Entity";
@ -47,22 +45,7 @@ pub(crate) async fn run_mv_integration_test<S: KVStore + 'static>(
Some(3000),
);
// Check cursor state before writing
let cursor_before = match store.get(&cursor_key()).await {
Ok(Some(bytes)) => {
if let Ok(arr) = <[u8; 8]>::try_from(bytes.as_slice()) {
u64::from_le_bytes(arr)
} else {
0
}
}
_ => 0,
};
debug!(" MV integration test: cursor before write = {}", cursor_before);
let write_result = match write_assertion_to_wal(journal, &assertion).await {
Ok(r) => r,
Err(e) => {
if let Err(e) = write_assertion_to_wal(journal, &assertion).await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::WriteFailure,
@ -70,14 +53,15 @@ pub(crate) async fn run_mv_integration_test<S: KVStore + 'static>(
});
return false;
}
};
result.assertions_written += 1;
let last_offset = write_result.end_offset;
debug!(last_offset, cursor_before, "MV integration test: wrote assertion");
// Wait for ingestion to complete
if let Err(e) = wait_until_ingested(&**store, last_offset, ingestion_wait_ms).await {
result.errors.push(e);
// Synchronously drain WAL entries
if let Err(e) = ingestor.process_pending().await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::WriteFailure,
message: format!("MV integration test: ingestion failed: {}", e),
});
return false;
}
@ -205,9 +189,9 @@ pub(crate) async fn run_mv_integration_test<S: KVStore + 'static>(
pub(crate) async fn run_fast_path_test<S: KVStore + 'static>(
journal: &Arc<Mutex<Journal>>,
store: &Arc<S>,
ingestor: &stemedb_ingest::Ingestor<S>,
agents: &[Agent],
result: &mut SimulationResult,
ingestion_wait_ms: u64,
) -> bool {
let agent = &agents[0];
let subject = "FastPath_Entity";
@ -222,9 +206,7 @@ pub(crate) async fn run_fast_path_test<S: KVStore + 'static>(
Some(3100),
);
let write_result = match write_assertion_to_wal(journal, &assertion).await {
Ok(r) => r,
Err(e) => {
if let Err(e) = write_assertion_to_wal(journal, &assertion).await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::WriteFailure,
@ -232,13 +214,15 @@ pub(crate) async fn run_fast_path_test<S: KVStore + 'static>(
});
return false;
}
};
result.assertions_written += 1;
let last_offset = write_result.end_offset;
// Wait for ingestion
if let Err(e) = wait_until_ingested(&**store, last_offset, ingestion_wait_ms).await {
result.errors.push(e);
// Synchronously drain WAL entries
if let Err(e) = ingestor.process_pending().await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::WriteFailure,
message: format!("Fast-path test: ingestion failed: {}", e),
});
return false;
}
@ -323,9 +307,9 @@ pub(crate) async fn run_fast_path_test<S: KVStore + 'static>(
pub(crate) async fn run_mv_freshness_test<S: KVStore + 'static>(
journal: &Arc<Mutex<Journal>>,
store: &Arc<S>,
ingestor: &stemedb_ingest::Ingestor<S>,
agents: &[Agent],
result: &mut SimulationResult,
ingestion_wait_ms: u64,
) -> bool {
let agent = &agents[0];
let subject = "Freshness_Entity";
@ -334,7 +318,6 @@ pub(crate) async fn run_mv_freshness_test<S: KVStore + 'static>(
let base_timestamp = 4000u64;
// Write 10 assertions with incrementing timestamps
let mut last_offset = 0u64;
for i in 0..num_assertions {
let assertion = agent.sign_assertion_with_options(
subject,
@ -344,12 +327,7 @@ pub(crate) async fn run_mv_freshness_test<S: KVStore + 'static>(
Some(base_timestamp + i as u64),
);
match write_assertion_to_wal(journal, &assertion).await {
Ok(r) => {
result.assertions_written += 1;
last_offset = r.end_offset;
}
Err(e) => {
if let Err(e) = write_assertion_to_wal(journal, &assertion).await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::WriteFailure,
@ -357,12 +335,16 @@ pub(crate) async fn run_mv_freshness_test<S: KVStore + 'static>(
});
return false;
}
}
result.assertions_written += 1;
}
// Wait for all assertions to be ingested
if let Err(e) = wait_until_ingested(&**store, last_offset, ingestion_wait_ms).await {
result.errors.push(e);
// Synchronously drain all pending WAL entries
if let Err(e) = ingestor.process_pending().await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::WriteFailure,
message: format!("MV freshness test: ingestion failed: {}", e),
});
return false;
}


@ -1,21 +1,18 @@
//! Helper functions for WAL operations and ingestion synchronization.
//! Helper functions for WAL operations and assertion verification.
use std::sync::Arc;
use std::time::{Duration, Instant};
use stemedb_core::serde::serialize;
use stemedb_core::types::{Assertion, Hash, Vote};
use stemedb_ingest::{serialize_assertion, serialize_vote};
use stemedb_storage::{key_codec, KVStore};
use stemedb_wal::Journal;
use tokio::sync::Mutex;
use tracing::debug;
use crate::types::{ErrorKind, SimulationError};
/// Result from writing to WAL, includes the raw bytes and the journal offset after the write.
pub(crate) struct WalWriteResult {
pub raw_bytes: Vec<u8>,
/// The journal offset AFTER this write (use this as target for wait_until_ingested)
/// The journal offset AFTER this write.
pub end_offset: u64,
}
@ -67,67 +64,6 @@ pub(crate) fn compute_assertion_hash(assertion: &Assertion) -> Hash {
*blake3::hash(&bytes).as_bytes()
}
/// The cursor key used by the ingestor to track its progress.
/// Uses key_codec format: `\x00META:cursor:ingest`
pub(crate) fn cursor_key() -> Vec<u8> {
key_codec::cursor_key()
}
/// Wait until the ingestor cursor reaches or exceeds the target offset.
///
/// This replaces hardcoded sleep timers with cursor-based polling, making
/// tests deterministic rather than timing-dependent.
///
/// Polls every 10ms and times out after max_wait_ms milliseconds.
///
/// # Arguments
/// * `store` - The KVStore to read the cursor from
/// * `target_offset` - The minimum cursor offset to wait for
/// * `max_wait_ms` - Maximum time to wait in milliseconds
///
/// # Returns
/// * `Ok(())` if cursor reached target
/// * `Err(SimulationError)` if timeout exceeded
pub(crate) async fn wait_until_ingested<S: KVStore>(
store: &S,
target_offset: u64,
max_wait_ms: u64,
) -> Result<(), SimulationError> {
let start = Instant::now();
let timeout = Duration::from_millis(max_wait_ms);
let poll_interval = Duration::from_millis(10);
loop {
// Read current cursor position
if let Ok(Some(bytes)) = store.get(&cursor_key()).await {
if let Ok(arr) = <[u8; 8]>::try_from(bytes.as_slice()) {
let cursor = u64::from_le_bytes(arr);
// Use > (strictly greater) because journal.append() returns the START offset
// of the record. The cursor must move PAST this offset to confirm the record
// was fully processed.
if cursor > target_offset {
debug!(cursor, target_offset, "Ingestion sync: cursor passed target");
return Ok(());
}
}
}
// Check timeout
if start.elapsed() > timeout {
return Err(SimulationError {
tick: 0,
kind: ErrorKind::WriteFailure,
message: format!(
"Ingestion sync timeout: cursor did not reach {} within {}ms",
target_offset, max_wait_ms
),
});
}
tokio::time::sleep(poll_interval).await;
}
}
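The removed helper polled a cursor that a background task advanced, so the test outcome depended on task scheduling. The replacement drains the WAL on the caller's own task. The sketch below shows that idea in isolation, assuming a toy in-memory journal: `ToyJournal` and `ToyIngestor` are hypothetical stand-ins, and this `process_pending` is synchronous and takes the journal as an argument, unlike the real async `Ingestor::process_pending`.

```rust
/// Hypothetical in-memory stand-in for the WAL journal.
#[derive(Default)]
struct ToyJournal {
    records: Vec<String>,
}

impl ToyJournal {
    /// Append a record and return the number of records now in the journal.
    fn append(&mut self, record: &str) -> usize {
        self.records.push(record.to_string());
        self.records.len()
    }
}

/// Hypothetical ingestor that tracks how far into the journal it has read.
#[derive(Default)]
struct ToyIngestor {
    cursor: usize,
    indexed: Vec<String>,
}

impl ToyIngestor {
    /// Drain every record between the cursor and the journal tail.
    /// Returns how many records were processed by this call.
    fn process_pending(&mut self, journal: &ToyJournal) -> usize {
        let pending = &journal.records[self.cursor..];
        self.indexed.extend(pending.iter().cloned());
        self.cursor = journal.records.len();
        pending.len()
    }
}
```

Because the drain runs to completion before returning, everything appended earlier is indexed by the time the caller continues, and no timeout value is needed.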
/// Verify that an assertion matches expected subject, predicate, and text value.
///
/// Used by arena3 tests to validate MV winner properties.


@ -16,9 +16,7 @@ use crate::arenas::{
run_mv_integration_test, run_recency_lens_test, run_troll_resistance_test,
run_vote_consensus_test,
};
use crate::helpers::{
compute_assertion_hash, wait_until_ingested, write_assertion_to_wal, write_vote_to_wal,
};
use crate::helpers::{compute_assertion_hash, write_assertion_to_wal, write_vote_to_wal};
use crate::strategy::{self, AgentAction, AgentStrategy, StrategyMetrics, WorldState};
use crate::types::{
ErrorKind, SimulationConfig, SimulationError, SimulationResult, SimulationSetupError,
@ -68,12 +66,11 @@ pub async fn run_simulation(
debug!(" WAL initialized at {:?}", temp_wal_dir.path());
debug!(" KV Store initialized at {:?}", temp_db_dir.path());
// 2. Start Ingestor
let mut ingestor = Ingestor::new(journal.clone(), store.clone())
// 2. Create Ingestor (no background task - we drain synchronously via process_pending)
let ingestor = Ingestor::new(journal.clone(), store.clone())
.await
.map_err(|e| SimulationSetupError::IngestorCreate(e.to_string()))?;
ingestor.start();
debug!(" Ingestor started (background worker).");
debug!(" Ingestor created (synchronous drain mode).");
// 3. Setup Agents with strategies
let mut agents: Vec<Agent> = Vec::with_capacity(agent_count);
@ -230,12 +227,14 @@ pub async fn run_simulation(
info!(" {} assertions written to WAL.", result.assertions_written);
// 6. Wait for Ingestion (cursor-based sync)
info!("⏳ Waiting for ingestion to reach offset {}...", last_journal_offset);
if let Err(e) =
wait_until_ingested(&*store, last_journal_offset, config.ingestion_wait_ms).await
{
result.errors.push(e);
// 6. Synchronously drain all pending WAL entries (deterministic, no background scheduling)
info!("⚙️ Processing {} WAL bytes synchronously...", last_journal_offset);
if let Err(e) = ingestor.process_pending().await {
result.errors.push(SimulationError {
tick: 0,
kind: ErrorKind::WriteFailure,
message: format!("Main ingestion failed: {}", e),
});
}
// ========================================================================
@ -339,15 +338,14 @@ pub async fn run_simulation(
// ========================================================================
info!("🔬 Arena 1.2: Testing Recency Lens...");
result.recency_test_passed =
run_recency_lens_test(&journal, &store, &agents, &mut result, config.ingestion_wait_ms)
.await;
run_recency_lens_test(&journal, &store, &ingestor, &agents, &mut result).await;
// ========================================================================
// 9. Arena 1.3: Lifecycle Filtering Test
// ========================================================================
info!("🔬 Arena 1.3: Testing Lifecycle Filtering...");
result.lifecycle_test_passed =
run_lifecycle_test(&journal, &store, &agents, &mut result, config.ingestion_wait_ms).await;
run_lifecycle_test(&journal, &store, &ingestor, &agents, &mut result).await;
// ========================================================================
// 10. Arena 1.4: Query Audit Verification
@ -366,16 +364,14 @@ pub async fn run_simulation(
// ========================================================================
info!("🗳️ Arena 2.2: Testing Vote-Aware Consensus...");
result.vote_consensus_test_passed =
run_vote_consensus_test(&journal, &store, &agents, &mut result, config.ingestion_wait_ms)
.await;
run_vote_consensus_test(&journal, &store, &ingestor, &agents, &mut result).await;
// ========================================================================
// 12. Arena 2.3: Troll Vote Resistance
// ========================================================================
info!("🗳️ Arena 2.3: Testing Troll Vote Resistance...");
result.troll_resistance_test_passed =
run_troll_resistance_test(&journal, &store, &agents, &mut result, config.ingestion_wait_ms)
.await;
run_troll_resistance_test(&journal, &store, &ingestor, &agents, &mut result).await;
// ========================================================================
// ARENA 3: Materialized Views
@ -388,23 +384,21 @@ pub async fn run_simulation(
// ========================================================================
info!("✨ Arena 3.1: Testing MV Integration...");
result.mv_integration_test_passed =
run_mv_integration_test(&journal, &store, &agents, &mut result, config.ingestion_wait_ms)
.await;
run_mv_integration_test(&journal, &store, &ingestor, &agents, &mut result).await;
// ========================================================================
// 14. Arena 3.2: Fast-Path Verification
// ========================================================================
info!("✨ Arena 3.2: Testing Fast-Path Verification...");
result.fast_path_test_passed =
run_fast_path_test(&journal, &store, &agents, &mut result, config.ingestion_wait_ms).await;
run_fast_path_test(&journal, &store, &ingestor, &agents, &mut result).await;
// ========================================================================
// 15. Arena 3.3: MV Freshness Under Load
// ========================================================================
info!("✨ Arena 3.3: Testing MV Freshness Under Load...");
result.mv_freshness_test_passed =
run_mv_freshness_test(&journal, &store, &agents, &mut result, config.ingestion_wait_ms)
.await;
run_mv_freshness_test(&journal, &store, &ingestor, &agents, &mut result).await;
// ========================================================================
// ARENA 4: Agent Persona Verification
@ -469,13 +463,9 @@ pub async fn run_simulation(
result.strategy_metrics =
strategy_map.into_iter().map(|(name, metrics)| (name.to_string(), metrics)).collect();
// 16. Shut down the ingestor gracefully
//
// This is critical: we must stop the background ingestion task BEFORE
// the TempDir is dropped, otherwise the task will try to read from
// deleted WAL files.
info!("Shutting down ingestor...");
ingestor.shutdown(std::time::Duration::from_secs(2)).await;
// 16. Drop the ingestor (no background task running - we used synchronous process_pending).
// The TempDir cleanup happens after this point.
drop(ingestor);
// 17. Log summary
if result.is_success() {


@ -43,7 +43,7 @@ async fn smoke_high_volume_simulation() {
AgentSpec { count: 3, strategy: StrategyType::Believer },
],
tick_count: 50,
ingestion_wait_ms: 1000, // More time for larger workload
ingestion_wait_ms: 3000, // More time for larger workload
};
let result = run_simulation(config).await.expect("Simulation setup should not fail");


@ -212,6 +212,9 @@ pub mod visual_index;
/// High-velocity vote storage (The Ballot Box).
pub mod vote_store;
/// MemTable for read-your-writes consistency.
pub mod memtable;
pub use admission_store::{
AdmissionCheck, AdmissionStatus, AdmissionStatusResult, AdmissionStore, GenericAdmissionStore,
};
@ -255,6 +258,9 @@ pub use visual_index::{
};
pub use vote_store::{GenericVoteStore, VoteStore};
// MemTable exports
pub use memtable::{MemTable, MemTableEntry};
// Pattern aggregate store exports (Community Corpus)
pub use pattern_aggregate_store::{
GenericPatternAggregateStore, PatternAggregate, PatternAggregateStore,


@ -0,0 +1,27 @@
//! MemTable entry type.
use stemedb_core::types::Assertion;
/// An entry in the MemTable representing an assertion waiting for KVStore indexing.
///
/// Entries are inserted after WAL commit and evicted once the IngestWorker
/// has processed them and updated the KVStore indexes.
#[derive(Debug, Clone)]
pub struct MemTableEntry {
/// The assertion data.
pub assertion: Assertion,
/// The BLAKE3 hash of the serialized assertion (content-addressed ID).
pub hash: [u8; 32],
/// The WAL offset where this assertion was written.
/// Used for eviction: entries with wal_offset <= indexed_offset can be evicted.
pub wal_offset: u64,
/// When this entry was inserted (for time-based safety eviction).
pub inserted_at: std::time::Instant,
}
impl MemTableEntry {
/// Create a new MemTableEntry.
pub fn new(assertion: Assertion, hash: [u8; 32], wal_offset: u64) -> Self {
Self { assertion, hash, wal_offset, inserted_at: std::time::Instant::now() }
}
}


@ -0,0 +1,43 @@
//! MemTable for read-your-writes consistency.
//!
//! The MemTable sits between the WAL commit and KVStore indexing,
//! providing immediate visibility of assertions. This ensures that
//! after `POST /assert` returns 201, an immediate `GET /query` will
//! return the assertion without waiting for background indexing.
//!
//! # Architecture
//!
//! ```text
//! Write: POST /assert → WAL (fsync) → MemTable → return 201
//! ↓
//! Query: GET /query → MemTable + KVStore (merged) → Lens → response
//!
//! Background: IngestWorker → WAL → KVStore → evict from MemTable
//! ```
//!
//! # Usage
//!
//! ```ignore
//! use stemedb_storage::memtable::{MemTable, MemTableEntry};
//!
//! let memtable = MemTable::new(10_000);
//!
//! // After WAL commit, insert into MemTable
//! let entry = MemTableEntry::new(assertion, hash, wal_offset);
//! memtable.insert(entry);
//!
//! // Query merges MemTable with KVStore
//! let assertions = memtable.get_by_subject("Tesla");
//!
//! // After IngestWorker indexes, evict from MemTable
//! memtable.advance_indexed_offset(new_offset);
//! ```
mod entry;
mod table;
#[cfg(test)]
mod tests;
pub use entry::MemTableEntry;
pub use table::MemTable;
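The module docs say a query merges MemTable results with KVStore results, but this diff does not show the merge itself. The sketch below is one plausible shape for it, assuming results are keyed by the 32-byte content hash and that an entry may briefly exist in both places before eviction. `merge_results` is a hypothetical helper, not a function from this commit.

```rust
use std::collections::HashSet;

type Hash = [u8; 32];

/// Hypothetical merge step: KVStore results first, then MemTable entries
/// whose hash has not been seen yet. Duplicates are dropped by hash.
fn merge_results(kv: Vec<(Hash, String)>, mem: Vec<(Hash, String)>) -> Vec<(Hash, String)> {
    let mut seen: HashSet<Hash> = HashSet::new();
    let mut merged = Vec::with_capacity(kv.len() + mem.len());
    for (hash, value) in kv.into_iter().chain(mem) {
        // `insert` returns false when the hash was already present.
        if seen.insert(hash) {
            merged.push((hash, value));
        }
    }
    merged
}
```

Deduplicating by hash covers the window in which the IngestWorker has indexed an assertion but the MemTable has not yet evicted it.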


@ -0,0 +1,262 @@
//! MemTable implementation for read-your-writes consistency.
//!
//! The MemTable provides immediate visibility of assertions after WAL commit,
//! before the IngestWorker has processed them into KVStore indexes.
//!
//! # Design
//!
//! - Thread-safe via DashMap for concurrent reads/writes
//! - Three indexes for efficient lookup:
//! - by_hash: O(1) hash lookup
//! - by_subject: subject → list of hashes
//! - by_subject_predicate: (subject, predicate) → list of hashes
//! - Eviction based on WAL offset watermark
//! - Safety valve: age-based eviction for stale entries
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use dashmap::DashMap;
use stemedb_core::types::Assertion;
use tracing::{debug, warn};
use super::MemTableEntry;
/// In-memory buffer for assertions pending KVStore indexing.
///
/// Provides read-your-writes consistency: queries merge MemTable with KVStore
/// to ensure recently written assertions are immediately visible.
pub struct MemTable {
/// Primary index: hash → entry
by_hash: DashMap<[u8; 32], MemTableEntry>,
/// Subject index: subject → list of hashes
by_subject: DashMap<String, Vec<[u8; 32]>>,
/// Compound index: (subject, predicate) → list of hashes
by_subject_predicate: DashMap<(String, String), Vec<[u8; 32]>>,
/// WAL offset up to which assertions have been indexed in KVStore.
/// Entries with wal_offset <= indexed_offset are safe to evict.
indexed_offset: AtomicU64,
/// Soft limit on entry count; exceeding it on insert triggers age-based eviction.
max_entries: usize,
/// Eviction age threshold for safety valve (default: 30 seconds).
max_age: Duration,
/// Count of entries (for metrics without locking).
entry_count: AtomicUsize,
}
impl MemTable {
/// Create a new MemTable with the specified capacity limit.
pub fn new(max_entries: usize) -> Self {
Self {
by_hash: DashMap::new(),
by_subject: DashMap::new(),
by_subject_predicate: DashMap::new(),
indexed_offset: AtomicU64::new(0),
max_entries,
max_age: Duration::from_secs(30),
entry_count: AtomicUsize::new(0),
}
}
/// Create a MemTable with custom max age for testing.
#[cfg(test)]
pub fn with_max_age(max_entries: usize, max_age: Duration) -> Self {
Self {
by_hash: DashMap::new(),
by_subject: DashMap::new(),
by_subject_predicate: DashMap::new(),
indexed_offset: AtomicU64::new(0),
max_entries,
max_age,
entry_count: AtomicUsize::new(0),
}
}
/// Insert an entry into the MemTable.
///
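/// Updates the primary index first, then the subject and (subject, predicate)
/// indexes. These are separate DashMap operations, not one atomic step. If the
/// hash already exists, the entry is replaced and the secondary indexes are
/// left unchanged (idempotent on retry).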
pub fn insert(&self, entry: MemTableEntry) {
let hash = entry.hash;
let subject = entry.assertion.subject.clone();
let predicate = entry.assertion.predicate.clone();
// Insert into primary index
let was_new = self.by_hash.insert(hash, entry).is_none();
if was_new {
// Update subject index
self.by_subject.entry(subject.clone()).or_default().push(hash);
// Update compound index
self.by_subject_predicate.entry((subject, predicate)).or_default().push(hash);
self.entry_count.fetch_add(1, Ordering::Relaxed);
}
// Check if we need to evict
if self.len() > self.max_entries {
self.evict_stale_entries();
}
}
/// Get an entry by its hash.
pub fn get_by_hash(&self, hash: &[u8; 32]) -> Option<Assertion> {
self.by_hash.get(hash).map(|entry| entry.assertion.clone())
}
/// Get all assertions for a subject.
pub fn get_by_subject(&self, subject: &str) -> Vec<Assertion> {
let hashes = match self.by_subject.get(subject) {
Some(ref_multi) => ref_multi.clone(),
None => return Vec::new(),
};
let mut results = Vec::with_capacity(hashes.len());
for hash in hashes {
if let Some(entry) = self.by_hash.get(&hash) {
results.push(entry.assertion.clone());
}
}
results
}
/// Get all assertions for a subject and predicate.
pub fn get_by_subject_predicate(&self, subject: &str, predicate: &str) -> Vec<Assertion> {
let key = (subject.to_string(), predicate.to_string());
let hashes = match self.by_subject_predicate.get(&key) {
Some(ref_multi) => ref_multi.clone(),
None => return Vec::new(),
};
let mut results = Vec::with_capacity(hashes.len());
for hash in hashes {
if let Some(entry) = self.by_hash.get(&hash) {
results.push(entry.assertion.clone());
}
}
results
}
/// Advance the indexed offset watermark.
///
/// Called by IngestWorker after processing records. Entries with
/// wal_offset <= this value are now in KVStore and can be evicted.
pub fn advance_indexed_offset(&self, offset: u64) {
self.indexed_offset.fetch_max(offset, Ordering::Release);
debug!(offset, "Advanced indexed offset");
// Trigger eviction after advancing
self.evict_indexed_entries();
}
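`fetch_max` makes the watermark monotonic: a late or out-of-order call with a smaller offset cannot move it backwards. The sketch below isolates that behaviour with the same orderings as the method above. `Watermark` and `advance` are hypothetical names used only for illustration.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical wrapper around the indexed-offset watermark.
struct Watermark {
    indexed_offset: AtomicU64,
}

impl Watermark {
    fn new() -> Self {
        Self { indexed_offset: AtomicU64::new(0) }
    }

    /// Raise the watermark to `offset` if it is larger than the current
    /// value, then return the value now stored.
    fn advance(&self, offset: u64) -> u64 {
        self.indexed_offset.fetch_max(offset, Ordering::Release);
        self.indexed_offset.load(Ordering::Acquire)
    }
}
```

A plain `store` would let a stale caller lower the watermark, which could delay eviction of entries that are already indexed.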
/// Get current indexed offset.
pub fn indexed_offset(&self) -> u64 {
self.indexed_offset.load(Ordering::Acquire)
}
/// Evict entries that have been indexed in KVStore.
///
/// Entries with wal_offset <= indexed_offset are safe to remove because
/// queries will find them via KVStore indexes.
pub fn evict_indexed_entries(&self) {
let indexed_up_to = self.indexed_offset.load(Ordering::Acquire);
if indexed_up_to == 0 {
return;
}
// Collect hashes to evict
let to_evict: Vec<[u8; 32]> = self
.by_hash
.iter()
.filter(|entry| entry.wal_offset <= indexed_up_to)
.map(|entry| entry.hash)
.collect();
if !to_evict.is_empty() {
debug!(count = to_evict.len(), indexed_up_to, "Evicting indexed entries");
}
for hash in to_evict {
self.remove_by_hash(&hash);
}
}
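The eviction above collects hashes into a `Vec` before removing anything, so no iterator over the map is alive while the map is mutated. This matches the commit's note that holding a DashMap reference while touching the same map can deadlock. The sketch below shows the same two-phase pattern with a standard `RwLock<HashMap>` instead of DashMap; `evict_up_to` and the `u32` ids are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

/// Hypothetical two-phase eviction over a map of id -> WAL offset.
/// Returns the number of entries removed.
fn evict_up_to(table: &RwLock<HashMap<u32, u64>>, indexed_up_to: u64) -> usize {
    // Phase 1: scan under the read lock. The guard is dropped when this
    // block ends, before any write lock is requested.
    let to_evict: Vec<u32> = {
        let guard = table.read().unwrap();
        let ids: Vec<u32> = guard
            .iter()
            .filter(|(_, offset)| **offset <= indexed_up_to)
            .map(|(id, _)| *id)
            .collect();
        ids
    };

    // Phase 2: mutate under the write lock.
    let mut guard = table.write().unwrap();
    for id in &to_evict {
        guard.remove(id);
    }
    to_evict.len()
}
```

Requesting the write lock while the read guard from phase 1 was still held on the same thread would risk a deadlock, which is why the scan is confined to its own block.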

    /// Evict entries older than max_age (safety valve).
    ///
    /// This prevents unbounded memory growth if IngestWorker is slow or stuck.
    fn evict_stale_entries(&self) {
        // `Instant - Duration` panics on underflow. If max_age reaches back
        // further than the clock can represent, no entry can be that old.
        let Some(threshold) = Instant::now().checked_sub(self.max_age) else {
            return;
        };

        let to_evict: Vec<[u8; 32]> = self
            .by_hash
            .iter()
            .filter(|entry| entry.inserted_at < threshold)
            .map(|entry| entry.hash)
            .collect();

        if !to_evict.is_empty() {
            warn!(
                count = to_evict.len(),
                max_age_secs = self.max_age.as_secs(),
                "Safety evicting stale entries"
            );
        }

        for hash in to_evict {
            self.remove_by_hash(&hash);
        }
    }
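
    /// Remove an entry by hash, updating all indexes.
    fn remove_by_hash(&self, hash: &[u8; 32]) {
        if let Some((_, entry)) = self.by_hash.remove(hash) {
            let subject = &entry.assertion.subject;
            let predicate = &entry.assertion.predicate;

            // Update subject index. The RefMut is dropped at the end of the
            // `if let` block, before the map is touched again.
            if let Some(mut hashes) = self.by_subject.get_mut(subject) {
                hashes.retain(|h| h != hash);
            }
            // Drop the key once its hash list is empty so the index does not
            // keep one entry per subject ever seen.
            self.by_subject.remove_if(subject, |_, hashes| hashes.is_empty());

            // Update compound index
            let key = (subject.clone(), predicate.clone());
            if let Some(mut hashes) = self.by_subject_predicate.get_mut(&key) {
                hashes.retain(|h| h != hash);
            }
            self.by_subject_predicate.remove_if(&key, |_, hashes| hashes.is_empty());

            self.entry_count.fetch_sub(1, Ordering::Relaxed);
        }
    }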

    /// Get current entry count.
    pub fn len(&self) -> usize {
        self.entry_count.load(Ordering::Relaxed)
    }

    /// Check if empty.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Clear all entries (for testing).
    #[cfg(test)]
    pub fn clear(&self) {
        self.by_hash.clear();
        self.by_subject.clear();
        self.by_subject_predicate.clear();
        self.entry_count.store(0, Ordering::Relaxed);
    }
}

impl Default for MemTable {
    fn default() -> Self {
        Self::new(10_000)
    }
}


@ -0,0 +1,248 @@
//! Unit tests for MemTable.

use std::time::Duration;

use stemedb_core::types::{Assertion, HlcTimestamp, LifecycleStage, ObjectValue, SourceClass};

use super::{MemTable, MemTableEntry};

fn make_test_assertion(subject: &str, predicate: &str) -> Assertion {
    Assertion {
        subject: subject.to_string(),
        predicate: predicate.to_string(),
        object: ObjectValue::Text("test".to_string()),
        parent_hash: None,
        source_hash: [0u8; 32],
        source_class: SourceClass::Expert,
        visual_hash: None,
        epoch: None,
        source_metadata: None,
        narrative: None,
        lifecycle: LifecycleStage::Proposed,
        signatures: vec![],
        confidence: 0.9,
        timestamp: 1234567890,
        hlc_timestamp: HlcTimestamp::default(),
        vector: None,
    }
}

#[test]
fn test_insert_and_get_by_hash() {
    let memtable = MemTable::new(100);
    let assertion = make_test_assertion("Tesla", "revenue");
    let hash = [1u8; 32];
    let entry = MemTableEntry::new(assertion.clone(), hash, 100);

    memtable.insert(entry);

    let result = memtable.get_by_hash(&hash);
    assert!(result.is_some());
    assert_eq!(result.as_ref().map(|a| &a.subject), Some(&"Tesla".to_string()));
    assert_eq!(memtable.len(), 1);
}

#[test]
fn test_insert_and_get_by_subject() {
    let memtable = MemTable::new(100);

    let assertion1 = make_test_assertion("Tesla", "revenue");
    let assertion2 = make_test_assertion("Tesla", "profit");

    memtable.insert(MemTableEntry::new(assertion1, [1u8; 32], 100));
    memtable.insert(MemTableEntry::new(assertion2, [2u8; 32], 200));

    let results = memtable.get_by_subject("Tesla");
    assert_eq!(results.len(), 2);

    let predicates: Vec<_> = results.iter().map(|a| &a.predicate).collect();
    assert!(predicates.contains(&&"revenue".to_string()));
    assert!(predicates.contains(&&"profit".to_string()));
}

#[test]
fn test_insert_and_get_by_subject_predicate() {
    let memtable = MemTable::new(100);

    let assertion1 = make_test_assertion("Tesla", "revenue");
    let assertion2 = make_test_assertion("Tesla", "revenue");
    let assertion3 = make_test_assertion("Tesla", "profit");

    memtable.insert(MemTableEntry::new(assertion1, [1u8; 32], 100));
    memtable.insert(MemTableEntry::new(assertion2, [2u8; 32], 200));
    memtable.insert(MemTableEntry::new(assertion3, [3u8; 32], 300));

    let results = memtable.get_by_subject_predicate("Tesla", "revenue");
    assert_eq!(results.len(), 2);

    let results = memtable.get_by_subject_predicate("Tesla", "profit");
    assert_eq!(results.len(), 1);
}

#[test]
fn test_eviction_by_offset() {
    let memtable = MemTable::new(100);

    let assertion1 = make_test_assertion("Tesla", "revenue");
    let assertion2 = make_test_assertion("Apple", "revenue");

    memtable.insert(MemTableEntry::new(assertion1, [1u8; 32], 100));
    memtable.insert(MemTableEntry::new(assertion2, [2u8; 32], 200));
    assert_eq!(memtable.len(), 2);

    // Advance indexed offset to 150 - should evict first entry
    memtable.advance_indexed_offset(150);
    assert_eq!(memtable.len(), 1);
    assert!(memtable.get_by_hash(&[1u8; 32]).is_none());
    assert!(memtable.get_by_hash(&[2u8; 32]).is_some());

    // Advance to 250 - should evict second entry
    memtable.advance_indexed_offset(250);
    assert_eq!(memtable.len(), 0);
}

#[test]
fn test_eviction_updates_subject_index() {
    let memtable = MemTable::new(100);

    let assertion1 = make_test_assertion("Tesla", "revenue");
    let assertion2 = make_test_assertion("Tesla", "profit");

    memtable.insert(MemTableEntry::new(assertion1, [1u8; 32], 100));
    memtable.insert(MemTableEntry::new(assertion2, [2u8; 32], 200));
    assert_eq!(memtable.get_by_subject("Tesla").len(), 2);

    // Evict first entry
    memtable.advance_indexed_offset(150);

    let results = memtable.get_by_subject("Tesla");
    assert_eq!(results.len(), 1);
    assert_eq!(results[0].predicate, "profit");
}

#[test]
fn test_idempotent_insert() {
    let memtable = MemTable::new(100);
    let assertion = make_test_assertion("Tesla", "revenue");
    let hash = [1u8; 32];

    memtable.insert(MemTableEntry::new(assertion.clone(), hash, 100));
    memtable.insert(MemTableEntry::new(assertion.clone(), hash, 100));
    memtable.insert(MemTableEntry::new(assertion, hash, 100));

    // Should only have 1 entry
    assert_eq!(memtable.len(), 1);
    assert_eq!(memtable.get_by_subject("Tesla").len(), 1);
}

#[test]
fn test_empty_lookups() {
    let memtable = MemTable::new(100);

    assert!(memtable.get_by_hash(&[0u8; 32]).is_none());
    assert!(memtable.get_by_subject("Nonexistent").is_empty());
    assert!(memtable.get_by_subject_predicate("Nonexistent", "pred").is_empty());
}

#[test]
fn test_multiple_subjects_isolated() {
    let memtable = MemTable::new(100);

    let assertion1 = make_test_assertion("Tesla", "revenue");
    let assertion2 = make_test_assertion("Apple", "revenue");

    memtable.insert(MemTableEntry::new(assertion1, [1u8; 32], 100));
    memtable.insert(MemTableEntry::new(assertion2, [2u8; 32], 200));

    assert_eq!(memtable.get_by_subject("Tesla").len(), 1);
    assert_eq!(memtable.get_by_subject("Apple").len(), 1);
    assert_eq!(memtable.get_by_subject("Tesla")[0].subject, "Tesla");
    assert_eq!(memtable.get_by_subject("Apple")[0].subject, "Apple");
}

#[tokio::test]
async fn test_concurrent_insert_and_read() {
    use std::sync::Arc;

    let memtable = Arc::new(MemTable::new(10_000));

    // Spawn writers
    let mut handles = Vec::new();
    for i in 0..10 {
        let mt = Arc::clone(&memtable);
        handles.push(tokio::spawn(async move {
            for j in 0..100 {
                let subject = format!("Subject_{}", i);
                let assertion = make_test_assertion(&subject, "predicate");
                let mut hash = [0u8; 32];
                hash[0] = i as u8;
                hash[1] = j as u8;
                mt.insert(MemTableEntry::new(assertion, hash, (i * 100 + j) as u64));
            }
        }));
    }

    // Spawn readers concurrently
    for i in 0..10 {
        let mt = Arc::clone(&memtable);
        handles.push(tokio::spawn(async move {
            for _ in 0..50 {
                let _ = mt.get_by_subject(&format!("Subject_{}", i));
                tokio::task::yield_now().await;
            }
        }));
    }

    for handle in handles {
        handle.await.unwrap();
    }

    // Should have 1000 entries total (10 writers * 100 entries)
    assert_eq!(memtable.len(), 1000);
}

#[test]
fn test_indexed_offset_tracking() {
    let memtable = MemTable::new(100);
    assert_eq!(memtable.indexed_offset(), 0);

    memtable.advance_indexed_offset(100);
    assert_eq!(memtable.indexed_offset(), 100);

    memtable.advance_indexed_offset(50); // Should not go backwards
    assert_eq!(memtable.indexed_offset(), 100);

    memtable.advance_indexed_offset(200);
    assert_eq!(memtable.indexed_offset(), 200);
}

#[test]
fn test_stale_eviction_with_short_max_age() {
    // Use a very short max age (10 ms) and max_entries = 2 for testing
    let memtable = MemTable::with_max_age(2, Duration::from_millis(10));

    let assertion1 = make_test_assertion("Tesla", "revenue");
    let assertion2 = make_test_assertion("Apple", "revenue");

    memtable.insert(MemTableEntry::new(assertion1, [1u8; 32], 100));

    // Wait for the first entry to become stale
    std::thread::sleep(Duration::from_millis(20));

    // Insert a second entry; it is fresh, so it is not subject to age-based eviction
    memtable.insert(MemTableEntry::new(assertion2, [2u8; 32], 200));

    // The third insert pushes past max_entries = 2 and triggers eviction,
    // which removes the stale first entry
    let assertion3 = make_test_assertion("Google", "revenue");
    memtable.insert(MemTableEntry::new(assertion3, [3u8; 32], 300));

    // First entry should be gone (stale)
    assert!(memtable.get_by_hash(&[1u8; 32]).is_none());
}

116
future-vision.md Normal file

@ -0,0 +1,116 @@
# Vision: Epistemic Logits (The Neuro-Symbolic Cortex)

> **Status:** Vision / L9 Roadmap
> **Target:** Solves "Intrinsic Hallucination"
> **Core Concept:** StemeDB is no longer just a database we query; it is a constraint layer applied to the model's probability distribution during inference.

---

## 1. The Problem: The "RAG Ceiling"

Current architectures (including our own Aphoria/ADK stack) rely on **Retrieval Augmented Generation (RAG)**. This is a "Glass Box" system, but it is composed of two disconnected brains:

1. **The Retriever (StemeDB):** Knows what is true, what is conflicted, and who said what.
2. **The Generator (LLM):** Knows how to predict the next token based on statistical patterns.

In our current architecture, we paste the Truth (1) into the Context Window of the Generator (2) and *hope* the Generator attends to it.

**The Failure Mode:** The Generator can still ignore the context. It can hallucinate. It can state a high-conflict fact with absolute certainty ("X is true") instead of qualified uncertainty ("Some sources claim X").

**We cannot fix this by prompting. We must fix it by math.**

---

## 2. The Solution: Epistemic Logits

**Epistemic Logits** is a decoding strategy that modifies the probability distribution of the LLM's output layer in real-time, based on the `ConflictScore` and `TrustRank` of the concepts being generated.

We move StemeDB from the **Input Layer** (Prompt) to the **Activation Layer** (Logits).

### The Core Equation

$$ P_{final}(token) \propto P_{model}(token) \times E(token \mid Subject, Predicate) $$

Where $E \in [0, 1]$ is the **Epistemic Function**, and the product is renormalized over the vocabulary so that $P_{final}$ remains a probability distribution:

* If `ConflictScore > 0.8` (High Disagreement) AND `Token` implies certainty ("is", "proven", "fact"), then $E \to 0$ (Penalty).
* If `ConflictScore > 0.8` AND `Token` implies uncertainty ("reported", "alleged", "contested"), then $E \to 1$ (no penalty, which acts as a relative Boost after renormalization).
* If `SourceTier` is Low (Anecdotal) AND `Time` is old (Decayed), then $E \to 0$.

**Result:** The model *physically cannot* state a contested claim as a fact. It effectively has a "physics engine" for Truth.
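
The reweighting step above can be sketched in a few lines of Rust. This is a minimal illustration and not StemeDB code: the `Stance` enum, both function names, and the `0.01` penalty value are assumptions made for the example; only the `0.8` conflict threshold and the example words come from the text. The `SourceTier`/`Time` rule is left out for brevity. Renormalizing after the multiplication is what turns "no penalty" on hedged tokens into a relative boost.

```rust
/// How a candidate token expresses certainty. The three buckets are an
/// illustrative assumption; the text only gives example words for the first two.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Stance {
    Certain, // e.g. "is", "proven", "fact"
    Hedged,  // e.g. "reported", "alleged", "contested"
    Neutral,
}

/// Epistemic multiplier E in [0, 1] for a single token.
/// The 0.8 threshold is from the text; the 0.01 penalty is an assumed value.
pub fn epistemic_weight(stance: Stance, conflict_score: f64) -> f64 {
    if conflict_score > 0.8 && stance == Stance::Certain {
        0.01
    } else {
        1.0
    }
}

/// Multiply each model probability by E, then renormalize so the result sums to 1.
pub fn apply_epistemic_weights(
    probs: &[f64],
    stances: &[Stance],
    conflict_score: f64,
) -> Vec<f64> {
    assert_eq!(probs.len(), stances.len(), "one stance per token");
    let weighted: Vec<f64> = probs
        .iter()
        .zip(stances)
        .map(|(p, s)| p * epistemic_weight(*s, conflict_score))
        .collect();
    let total: f64 = weighted.iter().sum();
    if total <= 0.0 {
        // Nothing left to normalize; fall back to the unmodified distribution.
        return probs.to_vec();
    }
    weighted.iter().map(|w| w / total).collect()
}
```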

---

## 3. Architecture: The Neuro-Symbolic Stack

```ascii
     [ User Query ]
  [ 1. Semantic Router ] ───► [ StemeDB (The Graph) ]
            │                            │
            │ (Context)                  │ (Constraints & Scores)
            ▼                            ▼
     [ 2. LLM Core ]         [ 3. Epistemic Decoder ]
      (Transformer)              (Logit Processor)
            │                            │
            └──► [ Raw Logits ] ────────►│
                                         │ ◄── "Don't say 'proven' if Conflict > 0.5"
                                  [ Final Token ]
```

### Component 1: The Lookahead Mapper

To constrain logits, we must know what the model is *about* to say. We implement a lightweight "Concept Probe" (a small BERT model or sparse autoencoder) that runs in parallel with the main LLM.

* **Input:** Current generation stream.
* **Output:** The `StemeDB::SubjectID` the stream is discussing.

### Component 2: The Constraint Projector

Once the Subject is identified, StemeDB projects the **Epistemic State** of that subject into a set of forbidden/boosted tokens.

* *State:* `Semaglutide::has_side_effect` -> Conflict: High.
* *Constraint:* Ban absolute assertions. Boost attribution markers ("According to FDA...", "Patients report...").

### Component 3: The Reward Loop (RLHF on Reality)

We use the `VoteStore` not just for consensus, but to train a **Reward Model**.

* **Data:** Millions of historical votes where Agents disagreed.
* **Training:** Fine-tune the LLM to prefer outputs that align with the *weighted consensus* of the Graph.
* **Outcome:** The model "intuitively" knows which sources are trustworthy (Tier 0/1) without needing RAG retrieval for every fact.

---

## 4. Implementation Roadmap (The Path to L9)

### Phase 1: Structured Decoding (The "Guardrails")

*Integrate StemeDB with grammar-constrained generation libraries (like `guidance` or `outlines`).*

* **Mechanism:** Force the LLM to output a citation struct `{ claim: "...", source_id: "...", confidence: 0.0-1.0 }` for every assertion.
* **Validation:** If the generated `source_id` does not exist in StemeDB, or if the `confidence` doesn't match the `VoteStore`, reject the token stream and regenerate.
* **Deliverable:** `crates/stemedb-guidance`: A Rust binding for grammar-constrained sampling backed by the KV store.
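
The Phase 1 validation rule might look roughly like the sketch below. It is a hypothetical illustration, not the `stemedb-guidance` API: the `Citation` fields follow the struct in the text, but `Rejection`, `validate_citation`, the `tolerance` parameter, and the `HashMap` standing in for the StemeDB and `VoteStore` lookups are all assumptions.

```rust
use std::collections::HashMap;

/// Citation struct the model is forced to emit (fields as described in the text).
#[derive(Debug, Clone, PartialEq)]
pub struct Citation {
    pub claim: String,
    pub source_id: String,
    pub confidence: f64,
}

/// Reasons to reject the token stream and regenerate (hypothetical names).
#[derive(Debug, PartialEq)]
pub enum Rejection {
    UnknownSource,
    ConfidenceOutOfRange,
    ConfidenceMismatch { stored: f64 },
}

/// `known_sources` maps source_id -> stored confidence. It is a stand-in for
/// the real store lookups, which are not shown in this document.
pub fn validate_citation(
    citation: &Citation,
    known_sources: &HashMap<String, f64>,
    tolerance: f64,
) -> Result<(), Rejection> {
    // Also rejects NaN, because NaN is never inside the range.
    if !(0.0..=1.0).contains(&citation.confidence) {
        return Err(Rejection::ConfidenceOutOfRange);
    }
    let stored = *known_sources
        .get(&citation.source_id)
        .ok_or(Rejection::UnknownSource)?;
    if (citation.confidence - stored).abs() > tolerance {
        return Err(Rejection::ConfidenceMismatch { stored });
    }
    Ok(())
}
```

A caller would regenerate whenever this returns `Err`, using the variant to decide whether the source was invented or the confidence was inflated.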

### Phase 2: DPO Pipeline (The "Training")

*Direct Preference Optimization using StemeDB history.*

* **Mechanism:** Export the `VoteStore` history as `(Prompt, Chosen, Rejected)` tuples.
    * *Chosen:* An assertion supported by Tier 0 (Regulatory) sources.
    * *Rejected:* A conflicting assertion supported only by Tier 5 (Anecdotal) sources.
* **Action:** Fine-tune a Llama-3 8B model on this dataset.
* **Deliverable:** `crates/stemedb-rlhf`: A pipeline that turns WAL segments into HuggingFace datasets.
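
As a rough sketch of the export step, the function below builds one `(Prompt, Chosen, Rejected)` tuple from a set of conflicting claims. The types and the function name are hypothetical, and the selection rule is a simplifying assumption: it picks the most-trusted and least-trusted tiers present, where the text specifically pairs Tier 0 against Tier 5.

```rust
/// A claim about one (subject, predicate) pair, tagged with its source tier.
#[derive(Debug, Clone)]
pub struct TieredClaim {
    pub text: String,
    /// Source tier: 0 = Regulatory (most trusted) through 5 = Anecdotal.
    pub tier: u8,
}

/// One DPO training example.
#[derive(Debug, PartialEq)]
pub struct PreferencePair {
    pub prompt: String,
    pub chosen: String,
    pub rejected: String,
}

/// Pair the most-trusted claim (chosen) with the least-trusted one (rejected).
/// Returns None when there is no trust gap to learn from.
pub fn to_preference_pair(prompt: &str, claims: &[TieredClaim]) -> Option<PreferencePair> {
    let chosen = claims.iter().min_by_key(|c| c.tier)?;
    let rejected = claims.iter().max_by_key(|c| c.tier)?;
    if chosen.tier == rejected.tier {
        return None;
    }
    Some(PreferencePair {
        prompt: prompt.to_string(),
        chosen: chosen.text.clone(),
        rejected: rejected.text.clone(),
    })
}
```

Dropping same-tier conflicts keeps the dataset limited to cases where the graph actually expresses a preference.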

### Phase 3: The Logit Processor (The "Cortex")

*Real-time intervention.*

* **Mechanism:** A custom sampler (integrated into `llama.cpp` or `vLLM`) that queries StemeDB's `MaterializedView` in real-time (sub-millisecond) during inference.
* **Optimization:** This requires the `HybridStore` to be memory-mapped into the inference engine's address space for zero-latency lookups.
* **Deliverable:** `episteme-inference`: A standalone inference server that speaks OpenAI API but enforces StemeDB truth constraints.

---

## 5. The Impact

When we achieve Epistemic Logits, we solve the **Liability Gap**.

Currently, no enterprise can deploy an autonomous agent for critical tasks (Medical, Legal, Finance) because they cannot guarantee the output.

With Epistemic Logits, we provide a mathematical guarantee: **"This system is incapable of stating a claim with higher confidence than the underlying evidence supports."**

This transforms AI from a creative writing tool into a **fiduciary instrument**.


@ -0,0 +1,70 @@
#!/bin/bash

# API URL
API="http://localhost:18180/v1"

echo "🔥 Cognitive Firewall Demo: Real-time Truth Resolution"
echo "====================================================="

SUBJECT="Cognitive_Firewall_Test_$(date +%s)"
echo "Testing Subject: $SUBJECT"

# Generate dummy source hashes
SOURCE_HASH_FDA="aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
SOURCE_HASH_REDDIT="bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"

# Variables are spliced into the JSON as '"$VAR"' so the shell expansion stays quoted.
echo "💉 Injecting Claim 1 (FDA): 'Safe'..."
curl -s -X POST "$API/assert" \
  -H "Content-Type: application/json" \
  -d '{
    "subject": "'"$SUBJECT"'",
    "predicate": "status",
    "object": {"type": "Text", "value": "Safe"},
    "source_hash": "'"$SOURCE_HASH_FDA"'",
    "source_class": "Regulatory",
    "confidence": 1.0,
    "signatures": [{"agent_id": "0000000000000000000000000000000000000000000000000000000000000000", "signature": "00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "timestamp": 0, "version": 1}]
  }' > /dev/null

echo "💉 Injecting Claim 2 (Reddit): 'Dangerous'..."
curl -s -X POST "$API/assert" \
  -H "Content-Type: application/json" \
  -d '{
    "subject": "'"$SUBJECT"'",
    "predicate": "status",
    "object": {"type": "Text", "value": "Dangerous"},
    "source_hash": "'"$SOURCE_HASH_REDDIT"'",
    "source_class": "Anecdotal",
    "confidence": 0.8,
    "signatures": [{"agent_id": "0000000000000000000000000000000000000000000000000000000000000000", "signature": "00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "timestamp": 0, "version": 1}]
  }' > /dev/null

echo "⏳ Waiting for Ingestion (Log -> KV)..."
COUNT=0
RESPONSE=""
for i in {1..10}; do
  echo -n "."
  sleep 2
  RESPONSE=$(curl -s -G "$API/query" \
    --data-urlencode "subject=$SUBJECT" \
    --data-urlencode "predicate=status" \
    --data-urlencode "lens=Skeptic")
  # Fall back to 0 when the response is empty or not valid JSON,
  # so the integer comparison below does not fail on an empty string.
  COUNT=$(echo "$RESPONSE" | jq -r '.total_count // 0' 2>/dev/null)
  COUNT=${COUNT:-0}
  if [ "$COUNT" -gt 0 ]; then
    echo " Success!"
    break
  fi
done

CONFLICT=$(echo "$RESPONSE" | jq -r '.conflict_score // 0' 2>/dev/null)
CONFLICT=${CONFLICT:-0}

echo "-----------------------------------------------------"
echo "📊 Results:"
echo " Assertions Found: $COUNT"
echo " Conflict Score: $CONFLICT"

if [ "$COUNT" -eq 0 ]; then
  echo "❌ ERROR: No assertions found after 20s."
elif (( $(echo "$CONFLICT > 0.5" | bc -l) )); then
  echo "🔴 RED ALERT: High Conflict Detected! Firewall Active."
else
  echo "🟢 GREEN: Consensus Reached."
fi
echo "-----------------------------------------------------"