Add CRC32C checksums to WAL record format (v2), implement crash recovery with automatic truncation of corrupt records, add feature-gated group commit buffer for batched fsync under concurrent load, and implement log rotation via segment files with global offset addressing.

Key changes:
- Record format v2: [len:u32][crc32c:u32][blake3:32][payload:N]
- recover_file() scans and truncates corrupt tail records
- GroupCommitBuffer batches fsync via MPSC channel (tokio feature gate)
- SegmentManager with binary search resolution and cursor-based cleanup
- Journal::read() auto-refreshes segments on miss for writer/reader split
- Split recovery.rs and key_codec.rs into directory modules for 500-line max

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
146 lines
4.8 KiB
Rust
146 lines
4.8 KiB
Rust
#![allow(missing_docs, clippy::unwrap_used, clippy::expect_used)]
|
|
|
|
use criterion::{criterion_group, criterion_main, Criterion};
|
|
use stemedb_storage::key_codec;
|
|
use stemedb_storage::{HybridStore, KVStore};
|
|
use tokio::runtime::Runtime;
|
|
|
|
fn sequential_put(c: &mut Criterion) {
|
|
let rt = Runtime::new().expect("runtime");
|
|
let store = HybridStore::open_temp().expect("store");
|
|
|
|
c.bench_function("sequential_put_10k", |b| {
|
|
b.iter(|| {
|
|
rt.block_on(async {
|
|
for i in 0..10_000u64 {
|
|
let hash_hex = format!("bench_{}", i);
|
|
let key = key_codec::assertion_key("Bench", &hash_hex);
|
|
let value = format!("value_{}", i);
|
|
store.put(&key, value.as_bytes()).await.unwrap();
|
|
}
|
|
})
|
|
})
|
|
});
|
|
}
|
|
|
|
fn random_get(c: &mut Criterion) {
|
|
let rt = Runtime::new().expect("runtime");
|
|
let store = HybridStore::open_temp().expect("store");
|
|
|
|
// Pre-populate (read-heavy keys → redb via S: tag)
|
|
rt.block_on(async {
|
|
for i in 0..10_000u64 {
|
|
let key = key_codec::subject_predicate_key("Bench", &format!("pred_{}", i));
|
|
let value = format!("value_{}", i);
|
|
store.put(&key, value.as_bytes()).await.unwrap();
|
|
}
|
|
});
|
|
|
|
c.bench_function("random_get_10k", |b| {
|
|
b.iter(|| {
|
|
rt.block_on(async {
|
|
for i in 0..10_000u64 {
|
|
let key = key_codec::subject_predicate_key("Bench", &format!("pred_{}", i));
|
|
let _ = store.get(&key).await.unwrap();
|
|
}
|
|
})
|
|
})
|
|
});
|
|
}
|
|
|
|
fn prefix_scan(c: &mut Criterion) {
|
|
let rt = Runtime::new().expect("runtime");
|
|
let store = HybridStore::open_temp().expect("store");
|
|
|
|
// Pre-populate: 1K keys under "target", 9K under "other"
|
|
rt.block_on(async {
|
|
for i in 0..1_000u64 {
|
|
let key = key_codec::subject_predicate_key("target", &format!("pred_{}", i));
|
|
store.put(&key, b"matching").await.unwrap();
|
|
}
|
|
for i in 0..9_000u64 {
|
|
let key = key_codec::subject_predicate_key("other", &format!("pred_{}", i));
|
|
store.put(&key, b"non_matching").await.unwrap();
|
|
}
|
|
});
|
|
|
|
c.bench_function("prefix_scan_1k_of_10k", |b| {
|
|
b.iter(|| {
|
|
rt.block_on(async {
|
|
let prefix = key_codec::subject_scan_prefix("target");
|
|
let results = store.scan_prefix(&prefix).await.unwrap();
|
|
assert_eq!(results.len(), 1_000);
|
|
})
|
|
})
|
|
});
|
|
}
|
|
|
|
fn atomic_increment(c: &mut Criterion) {
|
|
let rt = Runtime::new().expect("runtime");
|
|
let store = HybridStore::open_temp().expect("store");
|
|
|
|
c.bench_function("atomic_increment_10k", |b| {
|
|
b.iter(|| {
|
|
rt.block_on(async {
|
|
for i in 0..10_000u64 {
|
|
let hash_hex = format!("counter_{}", i % 100);
|
|
let key = key_codec::vote_count_key("Bench", &hash_hex);
|
|
store.fetch_and_add_u64(&key, 1).await.unwrap();
|
|
}
|
|
})
|
|
})
|
|
});
|
|
}
|
|
|
|
fn mixed_workload(c: &mut Criterion) {
|
|
let rt = Runtime::new().expect("runtime");
|
|
let store = HybridStore::open_temp().expect("store");
|
|
|
|
// Pre-populate read-heavy keys
|
|
rt.block_on(async {
|
|
for i in 0..1_000u64 {
|
|
let key = key_codec::subject_predicate_key("mixed", &format!("pred_{}", i));
|
|
store.put(&key, b"initial_value").await.unwrap();
|
|
}
|
|
});
|
|
|
|
c.bench_function("mixed_70r_20w_10s", |b| {
|
|
b.iter(|| {
|
|
rt.block_on(async {
|
|
for i in 0..1_000u64 {
|
|
match i % 10 {
|
|
// 70% reads (redb path)
|
|
0..=6 => {
|
|
let key = key_codec::subject_predicate_key(
|
|
"mixed",
|
|
&format!("pred_{}", i % 1000),
|
|
);
|
|
let _ = store.get(&key).await.unwrap();
|
|
}
|
|
// 20% writes (fjall path)
|
|
7 | 8 => {
|
|
let key = key_codec::assertion_key("mixed", &format!("write_{}", i));
|
|
store.put(&key, b"new_value").await.unwrap();
|
|
}
|
|
// 10% scans (redb path)
|
|
_ => {
|
|
let prefix = key_codec::subject_scan_prefix("mixed");
|
|
let _ = store.scan_prefix(&prefix).await.unwrap();
|
|
}
|
|
}
|
|
}
|
|
})
|
|
})
|
|
});
|
|
}
|
|
|
|
criterion_group!(
|
|
benches,
|
|
sequential_put,
|
|
random_get,
|
|
prefix_scan,
|
|
atomic_increment,
|
|
mixed_workload
|
|
);
|
|
criterion_main!(benches);
|