stemedb/crates/stemedb-sim/src/helpers.rs
jordan 02ecac9a07 fix: merge upstream 10 commits, fix DashMap deadlock, deterministic sim ingestion
Merged 10 upstream commits (MemTable, read-your-writes tests, feed endpoint,
security hardening, signed assertions, source registry, dashboard enhancements)
and fixed all test failures across the full workspace (2656/2656 passing).

Key fixes:
- fix(cluster): DashMap deadlock in swim.rs suspect_node/fail_node/alive_node
  - DashMap::get_mut RefMut + iter() on same map = non-reentrant write lock deadlock
  - Fix: extract clone in scoped block to drop RefMut before calling update_node_gauges()
  - 6 previously-hanging SWIM tests now pass in <2s
- fix(sim): replace background-task+polling ingestion with synchronous process_pending()
  - smoke_high_volume_simulation was CPU-starved under 2656 parallel tests
  - Removed ingestor.start() + wait_until_ingested() pattern throughout sim
  - All arena functions now call ingestor.process_pending() directly (deterministic)
- fix(test): v2 signature helper used wrong hash (rkyv vs canonical compute_content_hash_v2)
- fix(test): quota test signed "test" but v1 requires "subject:predicate" format
- fix(test): http_validation now accepts 400 for valid-format-but-invalid-crypto hex
- fix(test): scale_adaptive micro tier assertions updated (auto_promote upstream change)
- config: add nextest.toml with slow-timeout for background-task-tests group

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-20 20:27:32 -07:00

115 lines
3.8 KiB
Rust

//! Helper functions for WAL operations and assertion verification.
use std::sync::Arc;
use stemedb_core::serde::serialize;
use stemedb_core::types::{Assertion, Hash, Vote};
use stemedb_ingest::{serialize_assertion, serialize_vote};
use stemedb_wal::Journal;
use tokio::sync::Mutex;
use crate::types::{ErrorKind, SimulationError};
/// Result from writing to WAL, includes the raw bytes and the journal offset after the write.
pub(crate) struct WalWriteResult {
pub raw_bytes: Vec<u8>,
/// The journal offset AFTER this write.
pub end_offset: u64,
}
/// Write an assertion to the WAL and track it for verification.
/// Returns the raw bytes and the journal offset after the write.
pub(crate) async fn write_assertion_to_wal(
journal: &Arc<Mutex<Journal>>,
assertion: &Assertion,
) -> Result<WalWriteResult, String> {
// Serialize with header for WAL
let wal_bytes =
serialize_assertion(assertion).map_err(|e| format!("Failed to serialize: {}", e))?;
// Serialize raw for hash computation
let raw_bytes = serialize(assertion).map_err(|e| format!("Failed to serialize raw: {}", e))?;
// Write to WAL and get the offset after write
let mut journal_lock = journal.lock().await;
let end_offset =
journal_lock.append(wal_bytes).map_err(|e| format!("WAL write failed: {}", e))?;
Ok(WalWriteResult { raw_bytes, end_offset })
}
/// Write a vote to the WAL.
///
/// The vote flows through the full pipeline: WAL → IngestWorker → VoteStore,
/// which automatically updates vote count and aggregate weight caches.
/// Returns the journal offset after the write.
pub(crate) async fn write_vote_to_wal(
journal: &Arc<Mutex<Journal>>,
vote: &Vote,
) -> Result<u64, String> {
let wal_bytes = serialize_vote(vote).map_err(|e| format!("Failed to serialize vote: {}", e))?;
let mut journal_lock = journal.lock().await;
let end_offset =
journal_lock.append(wal_bytes).map_err(|e| format!("WAL vote write failed: {}", e))?;
Ok(end_offset)
}
/// Compute the content-addressed hash of an assertion.
pub(crate) fn compute_assertion_hash(assertion: &Assertion) -> Hash {
let bytes = match serialize(assertion) {
Ok(b) => b,
Err(_) => return [0u8; 32],
};
*blake3::hash(&bytes).as_bytes()
}
/// Verify that an assertion matches expected subject, predicate, and text value.
///
/// Used by arena3 tests to validate MV winner properties.
pub(crate) fn verify_assertion_text(
assertion: &Assertion,
expected_subject: &str,
expected_predicate: &str,
expected_value: &str,
test_name: &str,
) -> Result<(), SimulationError> {
use stemedb_core::types::ObjectValue;
if assertion.subject != expected_subject || assertion.predicate != expected_predicate {
return Err(SimulationError {
tick: 0,
kind: ErrorKind::MaterializerFailure,
message: format!(
"{}: subject/predicate mismatch. Expected {}:{}, got {}:{}",
test_name,
expected_subject,
expected_predicate,
assertion.subject,
assertion.predicate
),
});
}
if let ObjectValue::Text(ref value) = assertion.object {
if value != expected_value {
return Err(SimulationError {
tick: 0,
kind: ErrorKind::MaterializerFailure,
message: format!(
"{}: wrong value. Expected '{}', got '{}'",
test_name, expected_value, value
),
});
}
} else {
return Err(SimulationError {
tick: 0,
kind: ErrorKind::MaterializerFailure,
message: format!("{}: object is not Text", test_name),
});
}
Ok(())
}