stemedb/crates/stemedb-storage/src/redb_backend.rs
jordan 3320c24afa feat: WAL hardening (Phase 5B) - CRC32C, crash recovery, group commit, log rotation
Add CRC32C checksums to WAL record format (v2), implement crash recovery
with automatic truncation of corrupt records, add feature-gated group commit
buffer for batched fsync under concurrent load, and implement log rotation
via segment files with global offset addressing.

Key changes:
- Record format v2: [len:u32][crc32c:u32][blake3:32][payload:N]
- recover_file() scans and truncates corrupt tail records
- GroupCommitBuffer batches fsync via MPSC channel (tokio feature gate)
- SegmentManager with binary search resolution and cursor-based cleanup
- Journal::read() auto-refreshes segments on miss for writer/reader split
- Split recovery.rs and key_codec.rs into directory modules for 500-line max

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-02 12:36:35 -07:00

281 lines
10 KiB
Rust

use crate::error::{Result, StorageError};
use crate::traits::KVStore;
use async_trait::async_trait;
use redb::ReadableTable;
use std::path::Path;
use std::sync::Arc;
use tracing::instrument;
/// Definition of the single redb table (named `"data"`) that every `RedbStore`
/// operation in this file reads from or writes to. Keys and values are raw byte slices.
const DATA_TABLE: redb::TableDefinition<&[u8], &[u8]> = redb::TableDefinition::new("data");
/// Wrap any displayable error in [`StorageError::Backend`], keeping only its
/// rendered message.
///
/// Shaped so it can be handed straight to `Result::map_err`.
fn redb_err(e: impl std::fmt::Display) -> StorageError {
    let message = e.to_string();
    StorageError::Backend(message)
}
/// Return the smallest byte string that sorts strictly after every key
/// beginning with `prefix`.
///
/// Trailing `0xFF` bytes are dropped and the last remaining byte is incremented.
/// Yields `None` when no such bound exists, i.e. when `prefix` is empty or
/// consists solely of `0xFF` bytes.
fn prefix_successor(prefix: &[u8]) -> Option<Vec<u8>> {
    // Rightmost byte that can still be bumped without overflowing.
    let pivot = prefix.iter().rposition(|&byte| byte != 0xFF)?;
    let mut successor = prefix[..=pivot].to_vec();
    successor[pivot] += 1;
    Some(successor)
}
/// Redb (B-tree) implementation of the KVStore trait.
///
/// Used for read-heavy key prefixes: indexes (`S:`, `SP:`), materialized views (`MV:`),
/// trust ranks (`TR:`), audits (`QA:`), quotas (`QT:`), trust packs (`TP:`),
/// gold standards (`GS:`), and escalations (`ESC:`).
pub struct RedbStore {
/// Reference-counted handle to the underlying redb database.
db: Arc<redb::Database>,
/// Temporary-directory guard. `Some` only for stores built by `open_temp`; it is
/// never read, only held so the directory lives exactly as long as the store.
_temp_dir: Option<tempfile::TempDir>,
}
/// Minimal `Debug` rendering for [`RedbStore`].
impl std::fmt::Debug for RedbStore {
    /// Writes only the type name; neither the database handle nor the
    /// temp-dir guard is rendered.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("RedbStore")
    }
}
impl RedbStore {
    /// Open the redb database file at `path`, creating it when it does not exist.
    ///
    /// # Errors
    ///
    /// Returns [`StorageError::Backend`] if redb cannot create or open the file.
    #[instrument(skip_all)]
    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
        redb::Database::create(path.as_ref())
            .map(|database| Self { db: Arc::new(database), _temp_dir: None })
            .map_err(redb_err)
    }

    /// Create a store backed by a `data.redb` file inside a fresh temporary directory.
    ///
    /// Intended for tests. The temporary directory is owned by the returned store,
    /// so it is removed when the store is dropped.
    ///
    /// # Errors
    ///
    /// Returns [`StorageError::Io`] if the temporary directory cannot be created, or
    /// [`StorageError::Backend`] if redb cannot create the database file.
    pub fn open_temp() -> Result<Self> {
        let scratch = tempfile::tempdir().map_err(StorageError::Io)?;
        let database =
            redb::Database::create(scratch.path().join("data.redb")).map_err(redb_err)?;
        Ok(Self { db: Arc::new(database), _temp_dir: Some(scratch) })
    }
}
#[async_trait]
impl KVStore for RedbStore {
/// Look up `key` and return a copy of its value, or `None` when the key is absent.
///
/// A database whose data table has not been created yet (nothing was ever written)
/// is treated as empty rather than as an error.
#[instrument(skip_all, fields(key_len = key.len()))]
async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
    let snapshot = self.db.begin_read().map_err(redb_err)?;
    let data = match snapshot.open_table(DATA_TABLE) {
        // Table is only created by the first write; no table means no keys.
        Err(redb::TableError::TableDoesNotExist(_)) => return Ok(None),
        other => other.map_err(redb_err)?,
    };
    let found = data.get(key).map_err(redb_err)?;
    Ok(found.map(|guard| guard.value().to_vec()))
}
/// Store `value` under `key`, replacing any previous value, in its own committed
/// write transaction.
#[instrument(skip_all, fields(key_len = key.len(), value_len = value.len()))]
async fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
    let txn = self.db.begin_write().map_err(redb_err)?;
    let mut data = txn.open_table(DATA_TABLE).map_err(redb_err)?;
    data.insert(key, value).map_err(redb_err)?;
    // The table borrows the transaction, so release it before committing.
    drop(data);
    txn.commit().map_err(redb_err)
}
/// Remove `key` in its own committed write transaction.
///
/// Removing a key that is not present is not an error.
#[instrument(skip_all, fields(key_len = key.len()))]
async fn delete(&self, key: &[u8]) -> Result<()> {
    let txn = self.db.begin_write().map_err(redb_err)?;
    let mut data = txn.open_table(DATA_TABLE).map_err(redb_err)?;
    data.remove(key).map_err(redb_err)?;
    // The table borrows the transaction, so release it before committing.
    drop(data);
    txn.commit().map_err(redb_err)
}
/// Collect every `(key, value)` pair whose key starts with `prefix`, in the order
/// the table's range iterator yields them.
///
/// A database whose data table has not been created yet yields an empty vector.
#[instrument(skip_all, fields(prefix_len = prefix.len()))]
async fn scan_prefix(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
    let snapshot = self.db.begin_read().map_err(redb_err)?;
    let data = match snapshot.open_table(DATA_TABLE) {
        Err(redb::TableError::TableDoesNotExist(_)) => return Ok(Vec::new()),
        other => other.map_err(redb_err)?,
    };

    // Exclusive upper bound of the prefix range. Kept in a local so the borrowed
    // slice outlives the range iterator built from it.
    let upper_bound = prefix_successor(prefix);
    let entries = match upper_bound.as_deref() {
        Some(end_key) => data.range(prefix..end_key),
        // No successor exists (empty or all-0xFF prefix): scan from prefix to the end.
        None => data.range(prefix..),
    }
    .map_err(redb_err)?;

    entries
        .map(|entry| {
            entry
                .map(|(k, v)| (k.value().to_vec(), v.value().to_vec()))
                .map_err(redb_err)
        })
        .collect::<Result<Vec<_>>>()
}
/// Flush pending writes; always succeeds without doing any work.
///
/// Every mutating method in this impl commits its own write transaction before
/// returning, so nothing is buffered at this layer.
#[instrument(skip_all)]
async fn flush(&self) -> Result<()> {
// redb is always durable after commit — flush is a no-op
// NOTE(review): this relies on redb's commit durability setting being left at its
// default; nothing in this file changes it — confirm if that ever stops being true.
Ok(())
}
/// Add `delta` to the little-endian `u64` counter stored at `key` and return the
/// **new** value, all within one write transaction.
///
/// A missing key counts as `0`. The addition saturates at `u64::MAX`.
///
/// # Errors
///
/// Returns [`StorageError::Serialization`] if the stored value is not exactly
/// 8 bytes, or [`StorageError::Backend`] on any redb failure.
#[instrument(skip_all, fields(key_len = key.len(), delta))]
async fn fetch_and_add_u64(&self, key: &[u8], delta: u64) -> Result<u64> {
    let txn = self.db.begin_write().map_err(redb_err)?;
    let mut data = txn.open_table(DATA_TABLE).map_err(redb_err)?;

    // The read guard lives only for this statement, so the table can be
    // mutated afterwards.
    let current = data
        .get(key)
        .map_err(redb_err)?
        .map(|guard| {
            let bytes = guard.value();
            <[u8; 8]>::try_from(bytes).map(u64::from_le_bytes).map_err(|_| {
                StorageError::Serialization(format!(
                    "Corrupted u64 counter: expected 8 bytes, got {}",
                    bytes.len()
                ))
            })
        })
        .transpose()?
        .unwrap_or(0);

    let updated = current.saturating_add(delta);
    data.insert(key, updated.to_le_bytes().as_slice()).map_err(redb_err)?;
    // The table borrows the transaction, so release it before committing.
    drop(data);
    txn.commit().map_err(redb_err)?;
    Ok(updated)
}
/// Read the little-endian `f32` stored at `key`, pass it through `update_fn`, store
/// the result and return it, all within one write transaction.
///
/// A missing key is presented to `update_fn` as `0.0`.
///
/// # Errors
///
/// Returns [`StorageError::Serialization`] if the stored value is not exactly
/// 4 bytes, or [`StorageError::Backend`] on any redb failure.
#[instrument(skip_all, fields(key_len = key.len()))]
async fn compare_and_swap_f32<F>(&self, key: &[u8], update_fn: F) -> Result<f32>
where
    F: Fn(f32) -> f32 + Send + Sync,
{
    let txn = self.db.begin_write().map_err(redb_err)?;
    let mut data = txn.open_table(DATA_TABLE).map_err(redb_err)?;

    // The read guard lives only for this statement, so the table can be
    // mutated afterwards.
    let current = data
        .get(key)
        .map_err(redb_err)?
        .map(|guard| {
            let bytes = guard.value();
            <[u8; 4]>::try_from(bytes).map(f32::from_le_bytes).map_err(|_| {
                StorageError::Serialization(format!(
                    "Corrupted f32 value: expected 4 bytes, got {}",
                    bytes.len()
                ))
            })
        })
        .transpose()?
        .unwrap_or(0.0);

    let updated = update_fn(current);
    data.insert(key, updated.to_le_bytes().as_slice()).map_err(redb_err)?;
    // The table borrows the transaction, so release it before committing.
    drop(data);
    txn.commit().map_err(redb_err)?;
    Ok(updated)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Create a fresh store in a temporary directory; panics if creation fails.
    fn temp_store() -> RedbStore {
        RedbStore::open_temp().expect("Failed to create temp DB")
    }

    /// A value written with `put` is visible to `get` and gone after `delete`.
    #[tokio::test]
    async fn test_redb_store_roundtrip() {
        let store = temp_store();
        let (key, value) = (b"test_key", b"test_value");

        store.put(key, value).await.expect("Put failed");
        assert_eq!(store.get(key).await.expect("Get failed"), Some(value.to_vec()));

        store.delete(key).await.expect("Delete failed");
        assert_eq!(store.get(key).await.expect("Get failed"), None);
    }

    /// Only keys carrying the requested prefix are returned, in key order.
    #[tokio::test]
    async fn test_redb_scan_prefix() {
        let store = temp_store();
        let seed: [(&[u8], &[u8]); 3] =
            [(b"prefix:1", b"val1"), (b"prefix:2", b"val2"), (b"other:3", b"val3")];
        for (key, value) in seed.iter() {
            store.put(key, value).await.unwrap();
        }

        let results = store.scan_prefix(b"prefix:").await.unwrap();
        let expected = vec![
            (b"prefix:1".to_vec(), b"val1".to_vec()),
            (b"prefix:2".to_vec(), b"val2".to_vec()),
        ];
        assert_eq!(results, expected);
    }

    /// The counter starts at zero and each call returns the updated total.
    #[tokio::test]
    async fn test_redb_fetch_and_add() {
        let store = temp_store();

        let after_first = store.fetch_and_add_u64(b"counter", 5).await.unwrap();
        assert_eq!(after_first, 5);

        let after_second = store.fetch_and_add_u64(b"counter", 3).await.unwrap();
        assert_eq!(after_second, 8);
    }

    /// The update closure sees `0.0` for a missing key, then the stored value.
    #[tokio::test]
    async fn test_redb_compare_and_swap_f32() {
        let store = temp_store();

        let first = store.compare_and_swap_f32(b"weight", |w| w + 1.5).await.unwrap();
        assert!((first - 1.5).abs() < f32::EPSILON);

        let second = store.compare_and_swap_f32(b"weight", |w| w + 2.0).await.unwrap();
        assert!((second - 3.5).abs() < f32::EPSILON);
    }

    /// `flush` succeeds after a write.
    #[tokio::test]
    async fn test_redb_flush() {
        let store = temp_store();
        store.put(b"key", b"value").await.unwrap();
        store.flush().await.expect("Flush should succeed");
    }

    /// Reading from a database that has never been written to yields `None`.
    #[tokio::test]
    async fn test_redb_get_nonexistent_table() {
        let store = temp_store();
        // Nothing has been written, so the data table does not exist yet.
        assert_eq!(store.get(b"missing").await.unwrap(), None);
    }

    /// Scanning a database that has never been written to yields no rows.
    #[tokio::test]
    async fn test_redb_scan_prefix_empty_table() {
        let store = temp_store();
        let rows = store.scan_prefix(b"prefix:").await.unwrap();
        assert!(rows.is_empty());
    }

    /// Table-driven check of `prefix_successor`, including carry and no-successor cases.
    #[test]
    fn test_prefix_successor() {
        let cases: [(&[u8], Option<Vec<u8>>); 5] = [
            (b"abc", Some(b"abd".to_vec())),
            (b"ab\xff", Some(b"ac".to_vec())),
            (b"\xff\xff\xff", None),
            (b"", None),
            (b"a\xff\xff", Some(b"b".to_vec())),
        ];
        for (input, expected) in cases.iter() {
            assert_eq!(prefix_successor(input), *expected);
        }
    }
}