Add CRC32C checksums to WAL record format (v2), implement crash recovery with automatic truncation of corrupt records, add feature-gated group commit buffer for batched fsync under concurrent load, and implement log rotation via segment files with global offset addressing. Key changes: - Record format v2: [len:u32][crc32c:u32][blake3:32][payload:N] - recover_file() scans and truncates corrupt tail records - GroupCommitBuffer batches fsync via MPSC channel (tokio feature gate) - SegmentManager with binary search resolution and cursor-based cleanup - Journal::read() auto-refreshes segments on miss for writer/reader split - Split recovery.rs and key_codec.rs into directory modules for 500-line max Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
124 lines
4.3 KiB
Rust
124 lines
4.3 KiB
Rust
//! Shared test utilities for HTTP integration tests.
|
|
|
|
#![allow(clippy::expect_used)]
|
|
#![allow(dead_code)]
|
|
|
|
use ed25519_dalek::{Signer, SigningKey};
|
|
use rand::rngs::OsRng;
|
|
use serde_json::json;
|
|
use std::sync::Arc;
|
|
use stemedb_api::AppState;
|
|
use stemedb_ingest::Ingestor;
|
|
use stemedb_storage::HybridStore;
|
|
use stemedb_wal::Journal;
|
|
use tokio::sync::Mutex;
|
|
|
|
/// Test environment that keeps temp directories alive for the test duration.
|
|
pub struct TestEnvironment {
|
|
pub _temp_dir: tempfile::TempDir,
|
|
pub state: AppState,
|
|
}
|
|
|
|
/// Test environment with full ingestor for roundtrip tests.
|
|
pub struct TestEnvironmentWithIngestor {
|
|
pub _temp_dir: tempfile::TempDir,
|
|
pub state: AppState,
|
|
pub ingestor: Ingestor<HybridStore>,
|
|
}
|
|
|
|
/// Helper to create a test environment with temporary directories.
|
|
pub async fn create_test_env() -> TestEnvironment {
|
|
let temp_dir = tempfile::tempdir().expect("failed to create temp dir");
|
|
let wal_dir = temp_dir.path().join("wal");
|
|
let db_dir = temp_dir.path().join("db");
|
|
|
|
std::fs::create_dir_all(&wal_dir).expect("failed to create wal dir");
|
|
std::fs::create_dir_all(&db_dir).expect("failed to create db dir");
|
|
|
|
let write_journal = Journal::open(&wal_dir).expect("failed to open write journal");
|
|
let read_journal = Journal::open(&wal_dir).expect("failed to open read journal");
|
|
let store = Arc::new(HybridStore::open(&db_dir).expect("failed to open store"));
|
|
|
|
let state = AppState::new(write_journal, read_journal, store);
|
|
|
|
TestEnvironment { _temp_dir: temp_dir, state }
|
|
}
|
|
|
|
/// Helper to create a test environment with a running ingestor for roundtrip tests.
|
|
///
|
|
/// Note: We need to share the same store between AppState and Ingestor.
|
|
pub async fn create_test_env_with_ingestor() -> TestEnvironmentWithIngestor {
|
|
let temp_dir = tempfile::tempdir().expect("failed to create temp dir");
|
|
let wal_dir = temp_dir.path().join("wal");
|
|
let db_dir = temp_dir.path().join("db");
|
|
|
|
std::fs::create_dir_all(&wal_dir).expect("failed to create wal dir");
|
|
std::fs::create_dir_all(&db_dir).expect("failed to create db dir");
|
|
|
|
// Create shared store
|
|
let store = Arc::new(HybridStore::open(&db_dir).expect("failed to open store"));
|
|
|
|
// Journal for ingestor (reading) - WAL allows multiple readers
|
|
let journal_for_ingestor =
|
|
Arc::new(Mutex::new(Journal::open(&wal_dir).expect("failed to open journal for ingestor")));
|
|
|
|
// Create ingestor with shared store
|
|
let ingestor = Ingestor::new(journal_for_ingestor, store.clone())
|
|
.await
|
|
.expect("failed to create ingestor");
|
|
|
|
// Create AppState with write and read journals
|
|
let write_journal = Journal::open(&wal_dir).expect("failed to open write journal");
|
|
let read_journal = Journal::open(&wal_dir).expect("failed to open read journal");
|
|
let state = AppState::new(write_journal, read_journal, store);
|
|
|
|
TestEnvironmentWithIngestor { _temp_dir: temp_dir, state, ingestor }
|
|
}
|
|
|
|
/// Helper to sign a message using Ed25519.
|
|
#[allow(dead_code)]
|
|
pub fn sign_message(message: &str) -> ([u8; 32], [u8; 64]) {
|
|
let mut csprng = OsRng;
|
|
let signing_key = SigningKey::generate(&mut csprng);
|
|
let verifying_key = signing_key.verifying_key();
|
|
|
|
let signature = signing_key.sign(message.as_bytes());
|
|
|
|
(verifying_key.to_bytes(), signature.to_bytes())
|
|
}
|
|
|
|
/// Create a properly signed assertion for testing.
|
|
pub fn create_signed_assertion_json(
|
|
subject: &str,
|
|
predicate: &str,
|
|
value: f64,
|
|
) -> serde_json::Value {
|
|
let mut csprng = OsRng;
|
|
let signing_key = SigningKey::generate(&mut csprng);
|
|
let verifying_key = signing_key.verifying_key();
|
|
|
|
let message = format!("{}:{}", subject, predicate);
|
|
let signature = signing_key.sign(message.as_bytes());
|
|
|
|
let timestamp = std::time::SystemTime::now()
|
|
.duration_since(std::time::UNIX_EPOCH)
|
|
.map(|d| d.as_secs())
|
|
.unwrap_or(0);
|
|
|
|
json!({
|
|
"subject": subject,
|
|
"predicate": predicate,
|
|
"object": {"type": "Number", "value": value},
|
|
"confidence": 0.95,
|
|
"source_class": "Expert",
|
|
"lifecycle": "Proposed",
|
|
"signatures": [{
|
|
"agent_id": hex::encode(verifying_key.to_bytes()),
|
|
"signature": hex::encode(signature.to_bytes()),
|
|
"timestamp": timestamp
|
|
}],
|
|
"source_hash": "0".repeat(64),
|
|
"timestamp": timestamp
|
|
})
|
|
}
|