# Ingestor Service

- Crate: `stemedb-ingest`
- Status: Implemented (Phase 1)

## Purpose

The Ingestor is the background worker that bridges the Write-Ahead Log (WAL) to the KV storage engine. It continuously tails the WAL and persists records to the HybridStore (fjall + redb) using content-addressed keys.

## Architecture

```text
[WAL Journal] ---> [IngestWorker] ---> [KVStore (HybridStore)]
                                                  |
                                                  v
                                           [Subject Index]
```
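
The tail-and-persist loop described above can be sketched as follows. This is a minimal, dependency-free sketch and not the crate's implementation. `MemWal`, `MemStore`, `ingest_pass` and the integer cursor are hypothetical stand-ins for `Journal`, `HybridStore` and whatever offset bookkeeping `IngestWorker` actually uses. The sketch assumes the worker writes each record and only then advances its cursor, so a failed write is retried on the next pass instead of being skipped.

```rust
use std::collections::BTreeMap;

/// Hypothetical in-memory stand-in for the WAL `Journal`.
struct MemWal {
    records: Vec<Vec<u8>>,
}

/// Hypothetical in-memory stand-in for the `KVStore`.
#[derive(Default)]
struct MemStore {
    kv: BTreeMap<Vec<u8>, Vec<u8>>,
}

impl MemStore {
    /// Persist one record; empty payloads are rejected so the sketch
    /// has a failure path to exercise.
    fn persist(&mut self, record: &[u8]) -> Result<(), String> {
        if record.is_empty() {
            return Err("empty record".to_string());
        }
        // Placeholder key: the real ingestor derives BLAKE3-based keys.
        let key = [b"K:".as_slice(), record].concat();
        self.kv.insert(key, record.to_vec());
        Ok(())
    }
}

/// One polling pass: ingest every record from `cursor` onward and return
/// the new cursor. Stops at the first failure so that record is retried.
fn ingest_pass(wal: &MemWal, store: &mut MemStore, mut cursor: usize) -> usize {
    while let Some(record) = wal.records.get(cursor) {
        if store.persist(record).is_err() {
            break; // leave the cursor on the failed record
        }
        cursor += 1; // advance only after a successful write
    }
    cursor
}
```

Advancing the cursor only after a successful write gives at-least-once delivery. With content-addressed keys, replaying a record rewrites the same key with the same value, so duplicates would be harmless.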

## Key Components

### RecordType

Discriminator for WAL payloads, stored as the first byte of the 8-byte record header:

- `Assertion = 0`: knowledge claims
- `Vote = 1`: consensus votes
- `Epoch = 2`: paradigm definitions
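
The sketch below shows one way such a discriminator might be declared. It assumes a `#[repr(u8)]` enum with a fallible conversion from the header byte. The `TryFrom` impl and the `UnknownRecordType` error type are illustrative assumptions, not the crate's confirmed API.

```rust
use std::convert::TryFrom;

/// Discriminator stored in the first byte of every WAL payload.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum RecordType {
    /// Knowledge claims.
    Assertion = 0,
    /// Consensus votes.
    Vote = 1,
    /// Paradigm definitions.
    Epoch = 2,
}

/// Hypothetical error for header bytes that match no known record type.
#[derive(Debug, PartialEq, Eq)]
pub struct UnknownRecordType(pub u8);

impl TryFrom<u8> for RecordType {
    type Error = UnknownRecordType;

    /// Map a raw header byte back to its record type.
    fn try_from(byte: u8) -> Result<Self, Self::Error> {
        match byte {
            0 => Ok(RecordType::Assertion),
            1 => Ok(RecordType::Vote),
            2 => Ok(RecordType::Epoch),
            other => Err(UnknownRecordType(other)),
        }
    }
}
```

Rejecting unknown bytes explicitly lets a reader surface a corrupt or newer-format record instead of misinterpreting it.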

## Storage Layout

| Key Pattern | Value | Description |
|---|---|---|
| `H:{blake3_hash}` | Serialized Assertion | Content-addressed assertion store |
| `V:{assertion_hash}:{vote_hash}` | Serialized Vote | Votes on assertions |
| `E:{epoch_id_hex}` | Serialized Epoch | Epoch definitions |
| `S:{subject}` | BLAKE3 hash bytes | Subject adjacency index |
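
The key patterns above could be built with small helpers like the ones below. This is a sketch under stated assumptions. The helper names are hypothetical, and the sketch assumes the braced placeholders are rendered as lowercase hex; the table states this explicitly only for `epoch_id_hex`. The subject is assumed to be used verbatim as UTF-8.

```rust
use std::fmt::Write;

/// Render bytes as lowercase hex (assumed key encoding).
fn hex(bytes: &[u8]) -> String {
    let mut out = String::with_capacity(bytes.len() * 2);
    for b in bytes {
        write!(out, "{:02x}", b).expect("writing to a String cannot fail");
    }
    out
}

/// `H:{blake3_hash}`: content-addressed assertion.
fn assertion_key(hash: &[u8; 32]) -> Vec<u8> {
    format!("H:{}", hex(hash)).into_bytes()
}

/// `V:{assertion_hash}:{vote_hash}`: a vote on an assertion.
fn vote_key(assertion_hash: &[u8; 32], vote_hash: &[u8; 32]) -> Vec<u8> {
    format!("V:{}:{}", hex(assertion_hash), hex(vote_hash)).into_bytes()
}

/// `E:{epoch_id_hex}`: an epoch definition.
fn epoch_key(epoch_id: &[u8]) -> Vec<u8> {
    format!("E:{}", hex(epoch_id)).into_bytes()
}

/// `S:{subject}`: subject adjacency index entry.
fn subject_key(subject: &str) -> Vec<u8> {
    format!("S:{}", subject).into_bytes()
}
```

All votes on one assertion share the `V:{assertion_hash}:` prefix. A prefix scan over that string in an ordered store would therefore enumerate every vote on the assertion.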
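
## Usage

```rust
use std::sync::Arc;

use stemedb_ingest::{Ingestor, serialize_assertion};
use stemedb_storage::HybridStore;
use stemedb_wal::Journal;
// Assumed: tokio's async Mutex, since the journal lock is awaited below.
use tokio::sync::Mutex;
// `Assertion` comes from the crate that defines the data model (import not shown).

// Runs inside an async fn that returns a `Result` (e.g. under `#[tokio::main]`).

// Create components
let journal = Arc::new(Mutex::new(Journal::open("./wal")?));
let store = Arc::new(HybridStore::open("./db")?);

// Create and start the ingestor
let mut ingestor = Ingestor::new(journal.clone(), store);
ingestor.start(); // spawns a background task

// Write to the WAL (records are ingested automatically)
let assertion = Assertion { ... }; // fields elided
let payload = serialize_assertion(&assertion)?;
journal.lock().await.append(payload)?;
```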

## Serialization

Each record carries an 8-byte header. This ensures the rkyv payload starts at an 8-byte offset, preserving the alignment rkyv expects:

```text
[type: u8][padding: 7 bytes][rkyv payload...]
```

Helper functions:

- `serialize_assertion(&Assertion) -> Result<Vec<u8>>`
- `serialize_vote(&Vote) -> Result<Vec<u8>>`
- `serialize_epoch(&Epoch) -> Result<Vec<u8>>`
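
The header layout can be illustrated with a pair of framing helpers. This is a hedged sketch, and `frame` and `split_header` are hypothetical names. The payload bytes are treated as opaque; the real helpers produce them with rkyv. The sketch also assumes the seven padding bytes are written as zeros. Note that the header keeps the payload aligned only if the buffer holding the whole record is itself 8-byte aligned.

```rust
/// Size of the record header: 1 type byte followed by 7 padding bytes.
const HEADER_LEN: usize = 8;

/// Prepend the 8-byte header to an already-serialized payload.
fn frame(record_type: u8, payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(HEADER_LEN + payload.len());
    out.push(record_type);
    out.extend_from_slice(&[0u8; HEADER_LEN - 1]); // zeroed padding (assumed)
    out.extend_from_slice(payload);
    out
}

/// Split a framed record into its type byte and payload.
/// Returns `None` if the record is shorter than the header.
fn split_header(record: &[u8]) -> Option<(u8, &[u8])> {
    if record.len() < HEADER_LEN {
        return None;
    }
    Some((record[0], &record[HEADER_LEN..]))
}
```

Padding with a fixed-size header costs 7 bytes per record. In exchange, the payload offset stays at a multiple of 8 regardless of record type.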

## Testing

The ingestor has integration tests covering:
- Single assertion ingestion
- Vote ingestion
- Epoch ingestion
- Multiple record processing
- Subject index creation

## Related

- Storage Service - KVStore trait and HybridStore (fjall + redb)
- Content Addressing - BLAKE3 hashing