//! Helper functions for WAL operations and ingestion synchronization.

use std::sync::Arc;
use std::time::{Duration, Instant};

use stemedb_core::serde::serialize;
use stemedb_core::types::{Assertion, Hash, Vote};
use stemedb_ingest::{serialize_assertion, serialize_vote};
use stemedb_storage::{key_codec, KVStore};
use stemedb_wal::Journal;
use tokio::sync::Mutex;
use tracing::debug;

use crate::types::{ErrorKind, SimulationError};

/// Outcome of appending an assertion to the WAL.
///
/// Carries the header-less serialized assertion together with the offset that
/// `Journal::append` reported for the write.
pub(crate) struct WalWriteResult {
    /// Assertion serialized with `stemedb_core::serde::serialize` (no WAL header).
    // NOTE(review): element type restored as `u8`; the type argument was missing
    // in the reviewed text — confirm.
    pub raw_bytes: Vec<u8>,
    /// Offset returned by `Journal::append`; use this as the target for
    /// `wait_until_ingested`.
    ///
    /// NOTE(review): originally documented as the journal offset AFTER this
    /// write, while a comment inside `wait_until_ingested` describes the value
    /// returned by `append` as the record's START offset. Confirm which is
    /// correct against `stemedb_wal`.
    pub end_offset: u64,
}

/// Write an assertion to the WAL and keep its raw bytes for later verification.
///
/// The assertion is serialized twice: once via `serialize_assertion` (the
/// header-carrying form that is appended to the journal) and once via
/// `serialize` (the raw form, kept for hash computation). Both serializations
/// happen before the journal mutex is taken, so the lock is held only for the
/// append itself.
///
/// # Errors
/// Returns a human-readable message if either serialization or the journal
/// append fails.
// NOTE(review): the generic arguments (`Arc<Mutex<Journal>>`, `Result<_, String>`)
// were missing in the reviewed text and are reconstructed from the imports and
// the `map_err(|e| format!(..))?` calls below — confirm against callers.
pub(crate) async fn write_assertion_to_wal(
    journal: &Arc<Mutex<Journal>>,
    assertion: &Assertion,
) -> Result<WalWriteResult, String> {
    // Serialize with header for WAL
    let wal_bytes =
        serialize_assertion(assertion).map_err(|e| format!("Failed to serialize: {}", e))?;

    // Serialize raw for hash computation
    let raw_bytes =
        serialize(assertion).map_err(|e| format!("Failed to serialize raw: {}", e))?;

    // Append under the lock and capture the offset the journal reports.
    let mut journal_lock = journal.lock().await;
    let end_offset = journal_lock
        .append(wal_bytes)
        .map_err(|e| format!("WAL write failed: {}", e))?;

    Ok(WalWriteResult { raw_bytes, end_offset })
}

/// Write a vote to the WAL.
///
/// The vote flows through the full pipeline: WAL → IngestWorker → VoteStore,
/// which automatically updates vote count and aggregate weight caches.
/// Returns the journal offset after the write.
pub(crate) async fn write_vote_to_wal( journal: &Arc>, vote: &Vote, ) -> Result { let wal_bytes = serialize_vote(vote).map_err(|e| format!("Failed to serialize vote: {}", e))?; let mut journal_lock = journal.lock().await; let end_offset = journal_lock.append(wal_bytes).map_err(|e| format!("WAL vote write failed: {}", e))?; Ok(end_offset) } /// Compute the content-addressed hash of an assertion. pub(crate) fn compute_assertion_hash(assertion: &Assertion) -> Hash { let bytes = match serialize(assertion) { Ok(b) => b, Err(_) => return [0u8; 32], }; *blake3::hash(&bytes).as_bytes() } /// The cursor key used by the ingestor to track its progress. /// Uses key_codec format: `\x00META:cursor:ingest` pub(crate) fn cursor_key() -> Vec { key_codec::cursor_key() } /// Wait until the ingestor cursor reaches or exceeds the target offset. /// /// This replaces hardcoded sleep timers with cursor-based polling, making /// tests deterministic rather than timing-dependent. /// /// Polls every 10ms and times out after max_wait_ms milliseconds. /// /// # Arguments /// * `store` - The KVStore to read the cursor from /// * `target_offset` - The minimum cursor offset to wait for /// * `max_wait_ms` - Maximum time to wait in milliseconds /// /// # Returns /// * `Ok(())` if cursor reached target /// * `Err(SimulationError)` if timeout exceeded pub(crate) async fn wait_until_ingested( store: &S, target_offset: u64, max_wait_ms: u64, ) -> Result<(), SimulationError> { let start = Instant::now(); let timeout = Duration::from_millis(max_wait_ms); let poll_interval = Duration::from_millis(10); loop { // Read current cursor position if let Ok(Some(bytes)) = store.get(&cursor_key()).await { if let Ok(arr) = <[u8; 8]>::try_from(bytes.as_slice()) { let cursor = u64::from_le_bytes(arr); // Use > (strictly greater) because journal.append() returns the START offset // of the record. The cursor must move PAST this offset to confirm the record // was fully processed. 
if cursor > target_offset { debug!(cursor, target_offset, "Ingestion sync: cursor passed target"); return Ok(()); } } } // Check timeout if start.elapsed() > timeout { return Err(SimulationError { tick: 0, kind: ErrorKind::WriteFailure, message: format!( "Ingestion sync timeout: cursor did not reach {} within {}ms", target_offset, max_wait_ms ), }); } tokio::time::sleep(poll_interval).await; } } /// Verify that an assertion matches expected subject, predicate, and text value. /// /// Used by arena3 tests to validate MV winner properties. pub(crate) fn verify_assertion_text( assertion: &Assertion, expected_subject: &str, expected_predicate: &str, expected_value: &str, test_name: &str, ) -> Result<(), SimulationError> { use stemedb_core::types::ObjectValue; if assertion.subject != expected_subject || assertion.predicate != expected_predicate { return Err(SimulationError { tick: 0, kind: ErrorKind::MaterializerFailure, message: format!( "{}: subject/predicate mismatch. Expected {}:{}, got {}:{}", test_name, expected_subject, expected_predicate, assertion.subject, assertion.predicate ), }); } if let ObjectValue::Text(ref value) = assertion.object { if value != expected_value { return Err(SimulationError { tick: 0, kind: ErrorKind::MaterializerFailure, message: format!( "{}: wrong value. Expected '{}', got '{}'", test_name, expected_value, value ), }); } } else { return Err(SimulationError { tick: 0, kind: ErrorKind::MaterializerFailure, message: format!("{}: object is not Text", test_name), }); } Ok(()) }