stemedb/crates/stemedb-storage/src/hybrid_backend.rs
jordan 3320c24afa feat: WAL hardening (Phase 5B) - CRC32C, crash recovery, group commit, log rotation
Add CRC32C checksums to WAL record format (v2), implement crash recovery
with automatic truncation of corrupt records, add feature-gated group commit
buffer for batched fsync under concurrent load, and implement log rotation
via segment files with global offset addressing.

Key changes:
- Record format v2: [len:u32][crc32c:u32][blake3:32][payload:N]
- recover_file() scans and truncates corrupt tail records
- GroupCommitBuffer batches fsync via MPSC channel (tokio feature gate)
- SegmentManager with binary search resolution and cursor-based cleanup
- Journal::read() auto-refreshes segments on miss for writer/reader split
- Split recovery.rs and key_codec.rs into directory modules for 500-line max

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-02 12:36:35 -07:00

353 lines
14 KiB
Rust

use crate::error::{Result, StorageError};
use crate::fjall_backend::FjallStore;
use crate::key_codec;
use crate::redb_backend::RedbStore;
use crate::traits::KVStore;
use async_trait::async_trait;
use std::path::Path;
use tracing::instrument;
/// Which backend handles a given key.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Backend {
/// Fjall (LSM) — optimized for write-heavy workloads.
Fjall,
/// Redb (B-tree) — optimized for read-heavy workloads.
Redb,
}
/// Hybrid storage backend that routes keys to fjall (write-heavy) or redb (read-heavy).
///
/// Keys follow the `key_codec` format:
/// - Subject-prefixed: `{subject}\x00{TAG}:{suffix}`
/// - Global: `\x00{TAG}:{suffix}`
///
/// Routing extracts the TAG and dispatches:
/// - **Fjall**: `H:` (assertions), `V:` (votes), `VC:` (vote counts), `VW:` (vote weights),
/// `E:` (epochs), `SUPERSEDED:`, `META:` (cursors, counters)
/// - **Redb**: `S:` (subject index), `SP:` (compound index), `MV:` (materialized views),
/// `TRUST:` (trust ranks), `AUD:` (audits), `QUOTA:` (quotas), `TP:` (trust packs),
/// `GS:` (gold standards), `ESC:` (escalations), and everything else
pub struct HybridStore {
fjall: FjallStore,
redb: RedbStore,
_temp_dir: Option<tempfile::TempDir>,
}
impl std::fmt::Debug for HybridStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("HybridStore").finish()
}
}
/// Route a key to the appropriate backend based on its tag.
///
/// Uses `key_codec::extract_tag` to parse the tag portion from keys in
/// `{subject}\x00{TAG}:{suffix}` or `\x00{TAG}:{suffix}` format.
fn route(key: &[u8]) -> Backend {
let tag = key_codec::extract_tag(key);
if tag.starts_with(b"H:")
|| tag.starts_with(b"V:")
|| tag.starts_with(b"VC:")
|| tag.starts_with(b"VW:")
|| tag.starts_with(b"E:")
|| tag.starts_with(b"SUPERSEDED:")
|| tag.starts_with(b"META:")
{
Backend::Fjall
} else {
Backend::Redb
}
}
/// Check if a prefix is ambiguous — it could match keys in both backends.
///
/// This happens when scanning by subject only (`{subject}\x00`) since a subject
/// can have keys in both fjall (assertions, votes) and redb (indexes, views).
fn is_cross_backend_prefix(prefix: &[u8]) -> bool {
// A subject-only prefix ends with \x00 and has no tag after it
if prefix.is_empty() {
return false;
}
let tag = key_codec::extract_tag(prefix);
// If the extracted tag is empty, the prefix doesn't specify which backend
tag.is_empty()
}
impl HybridStore {
/// Open or create a HybridStore at the given path.
///
/// Creates `fjall/` and `redb/` subdirectories under the given path.
#[instrument(skip_all)]
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
let base = path.as_ref();
let fjall_path = base.join("fjall");
let redb_path = base.join("redb");
std::fs::create_dir_all(&fjall_path).map_err(StorageError::Io)?;
std::fs::create_dir_all(&redb_path).map_err(StorageError::Io)?;
let fjall = FjallStore::open(&fjall_path)?;
let redb = RedbStore::open(redb_path.join("data.redb"))?;
Ok(Self { fjall, redb, _temp_dir: None })
}
/// Open a temporary HybridStore for testing.
///
/// Both backends share one temp directory with `fjall/` and `redb/` subdirectories.
pub fn open_temp() -> Result<Self> {
let temp_dir = tempfile::tempdir().map_err(StorageError::Io)?;
let redb_dir = temp_dir.path().join("redb");
std::fs::create_dir_all(&redb_dir).map_err(StorageError::Io)?;
let fjall = FjallStore::open(temp_dir.path().join("fjall"))?;
let redb = RedbStore::open(redb_dir.join("data.redb"))?;
Ok(Self { fjall, redb, _temp_dir: Some(temp_dir) })
}
}
#[async_trait]
impl KVStore for HybridStore {
#[instrument(skip_all, fields(key_len = key.len()))]
async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
match route(key) {
Backend::Fjall => self.fjall.get(key).await,
Backend::Redb => self.redb.get(key).await,
}
}
#[instrument(skip_all, fields(key_len = key.len(), value_len = value.len()))]
async fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
match route(key) {
Backend::Fjall => self.fjall.put(key, value).await,
Backend::Redb => self.redb.put(key, value).await,
}
}
#[instrument(skip_all, fields(key_len = key.len()))]
async fn delete(&self, key: &[u8]) -> Result<()> {
match route(key) {
Backend::Fjall => self.fjall.delete(key).await,
Backend::Redb => self.redb.delete(key).await,
}
}
#[instrument(skip_all, fields(prefix_len = prefix.len()))]
async fn scan_prefix(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
if is_cross_backend_prefix(prefix) {
// Subject-only prefix — scan both backends and merge
let mut results = self.fjall.scan_prefix(prefix).await?;
results.extend(self.redb.scan_prefix(prefix).await?);
results.sort_by(|a, b| a.0.cmp(&b.0));
return Ok(results);
}
match route(prefix) {
Backend::Fjall => self.fjall.scan_prefix(prefix).await,
Backend::Redb => self.redb.scan_prefix(prefix).await,
}
}
#[instrument(skip_all)]
async fn flush(&self) -> Result<()> {
// Flush fjall first (write-heavy, most critical for durability),
// then redb (always durable after commit, so this is a no-op).
self.fjall.flush().await?;
self.redb.flush().await?;
Ok(())
}
#[instrument(skip_all, fields(key_len = key.len(), delta))]
async fn fetch_and_add_u64(&self, key: &[u8], delta: u64) -> Result<u64> {
match route(key) {
Backend::Fjall => self.fjall.fetch_and_add_u64(key, delta).await,
Backend::Redb => self.redb.fetch_and_add_u64(key, delta).await,
}
}
#[instrument(skip_all, fields(key_len = key.len()))]
async fn compare_and_swap_f32<F>(&self, key: &[u8], update_fn: F) -> Result<f32>
where
F: Fn(f32) -> f32 + Send + Sync,
{
match route(key) {
Backend::Fjall => self.fjall.compare_and_swap_f32(key, update_fn).await,
Backend::Redb => self.redb.compare_and_swap_f32(key, update_fn).await,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::key_codec;
// ── Basic KVStore contract tests ──
#[tokio::test]
async fn test_hybrid_store_roundtrip() {
let store = HybridStore::open_temp().expect("Failed to create temp DB");
let key = b"test_key";
let value = b"test_value";
store.put(key, value).await.expect("Put failed");
let retrieved = store.get(key).await.expect("Get failed");
assert_eq!(retrieved, Some(value.to_vec()));
store.delete(key).await.expect("Delete failed");
let deleted = store.get(key).await.expect("Get failed");
assert_eq!(deleted, None);
}
#[tokio::test]
async fn test_hybrid_scan_prefix() {
let store = HybridStore::open_temp().expect("Failed to create temp DB");
let k1 = key_codec::subject_index_key("subject1");
let k2 = key_codec::subject_predicate_key("subject1", "pred");
let k3 = key_codec::subject_index_key("subject2");
store.put(&k1, b"val1").await.unwrap();
store.put(&k2, b"val2").await.unwrap();
store.put(&k3, b"val3").await.unwrap();
let prefix = key_codec::subject_scan_prefix("subject1");
let results = store.scan_prefix(&prefix).await.unwrap();
assert_eq!(results.len(), 2);
}
#[tokio::test]
async fn test_hybrid_fetch_and_add() {
let store = HybridStore::open_temp().expect("Failed to create temp DB");
// Vote count (fjall path — subject-prefixed)
let vc_key = key_codec::vote_count_key("Tesla", "abc123");
let val = store.fetch_and_add_u64(&vc_key, 5).await.unwrap();
assert_eq!(val, 5);
let val = store.fetch_and_add_u64(&vc_key, 3).await.unwrap();
assert_eq!(val, 8);
// Quota counter (redb path — global)
let qt_key = key_codec::quota_key("agent1", 1000);
let val = store.fetch_and_add_u64(&qt_key, 10).await.unwrap();
assert_eq!(val, 10);
}
#[tokio::test]
async fn test_hybrid_compare_and_swap_f32() {
let store = HybridStore::open_temp().expect("Failed to create temp DB");
// Vote weight (fjall path — subject-prefixed)
let vw_key = key_codec::vote_weight_key("Tesla", "abc123");
let val = store.compare_and_swap_f32(&vw_key, |c| c + 1.5).await.unwrap();
assert!((val - 1.5).abs() < f32::EPSILON);
// Trust rank (redb path — global)
let tr_key = key_codec::trust_rank_key("agent1");
let val = store.compare_and_swap_f32(&tr_key, |c| c + 0.8).await.unwrap();
assert!((val - 0.8).abs() < f32::EPSILON);
}
#[tokio::test]
async fn test_hybrid_flush() {
let store = HybridStore::open_temp().expect("Failed to create temp DB");
let h_key = key_codec::assertion_key("Tesla", "hash1");
let s_key = key_codec::subject_index_key("Tesla");
store.put(&h_key, b"assertion_data").await.unwrap();
store.put(&s_key, b"index_data").await.unwrap();
store.flush().await.expect("Flush should succeed");
}
// ── Routing tests with key_codec keys ──
#[test]
fn test_routing_fjall_subject_prefixed() {
// Subject-prefixed write-heavy keys → Fjall
assert_eq!(route(&key_codec::assertion_key("Tesla", "abc")), Backend::Fjall);
assert_eq!(route(&key_codec::vote_key("Tesla", "abc", "def")), Backend::Fjall);
assert_eq!(route(&key_codec::vote_count_key("Tesla", "abc")), Backend::Fjall);
assert_eq!(route(&key_codec::vote_weight_key("Tesla", "abc")), Backend::Fjall);
}
#[test]
fn test_routing_fjall_global() {
// Global write-heavy keys → Fjall
assert_eq!(route(&key_codec::epoch_key("deadbeef")), Backend::Fjall);
assert_eq!(route(&key_codec::superseded_key("deadbeef")), Backend::Fjall);
assert_eq!(route(&key_codec::cursor_key()), Backend::Fjall);
assert_eq!(route(&key_codec::assertion_count_key()), Backend::Fjall);
}
#[test]
fn test_routing_redb_subject_prefixed() {
// Subject-prefixed read-heavy keys → Redb
assert_eq!(route(&key_codec::subject_index_key("Tesla")), Backend::Redb);
assert_eq!(route(&key_codec::subject_predicate_key("Tesla", "rev")), Backend::Redb);
assert_eq!(route(&key_codec::mv_key("Tesla", "revenue")), Backend::Redb);
assert_eq!(route(&key_codec::gold_standard_key("Earth", "shape")), Backend::Redb);
}
#[test]
fn test_routing_redb_global() {
// Global read-heavy keys → Redb
assert_eq!(route(&key_codec::trust_rank_key("agent1")), Backend::Redb);
assert_eq!(route(&key_codec::quota_key("agent1", 1000)), Backend::Redb);
assert_eq!(route(&key_codec::audit_key("query1")), Backend::Redb);
assert_eq!(route(&key_codec::escalation_key(1000, "hash1")), Backend::Redb);
assert_eq!(route(&key_codec::trust_pack_key(&[1u8; 32])), Backend::Redb);
}
#[test]
fn test_routing_default_to_redb() {
assert_eq!(route(b"unknown:key"), Backend::Redb);
assert_eq!(route(b""), Backend::Redb);
}
#[tokio::test]
async fn test_cross_backend_isolation() {
let store = HybridStore::open_temp().expect("Failed to create temp DB");
// Write to fjall (assertion — subject-prefixed)
let h_key = key_codec::assertion_key("Tesla", "hash1");
store.put(&h_key, b"assertion").await.unwrap();
// Write to redb (index — subject-prefixed)
let s_key = key_codec::subject_index_key("Tesla");
store.put(&s_key, b"index").await.unwrap();
// Both should be retrievable
assert_eq!(store.get(&h_key).await.unwrap(), Some(b"assertion".to_vec()));
assert_eq!(store.get(&s_key).await.unwrap(), Some(b"index".to_vec()));
// Delete from one backend shouldn't affect the other
store.delete(&h_key).await.unwrap();
assert_eq!(store.get(&h_key).await.unwrap(), None);
assert_eq!(store.get(&s_key).await.unwrap(), Some(b"index".to_vec()));
}
#[tokio::test]
async fn test_prefix_scan_within_backend() {
let store = HybridStore::open_temp().expect("Failed to create temp DB");
// Write assertion hashes (fjall — subject-prefixed)
let h1 = key_codec::assertion_key("Earth", "aaa");
let h2 = key_codec::assertion_key("Earth", "bbb");
store.put(&h1, b"val1").await.unwrap();
store.put(&h2, b"val2").await.unwrap();
// Write index entries (redb — global)
let tr1 = key_codec::trust_rank_key("agent_a");
let tr2 = key_codec::trust_rank_key("agent_b");
store.put(&tr1, b"rank1").await.unwrap();
store.put(&tr2, b"rank2").await.unwrap();
// Scan fjall (subject prefix)
let earth_prefix = key_codec::subject_scan_prefix("Earth");
let h_results = store.scan_prefix(&earth_prefix).await.unwrap();
assert_eq!(h_results.len(), 2);
// Scan redb (global prefix)
let trust_prefix = key_codec::trust_rank_scan_prefix();
let tr_results = store.scan_prefix(&trust_prefix).await.unwrap();
assert_eq!(tr_results.len(), 2);
}
}