Implements the foundation of tidalDB's data pipeline:

**Phase 1 – Schema primitives**
- EntityId newtype (u64, big-endian ordering)
- SignalTypeDefinition with pre-computed decay λ, deduped/sorted windows
- SchemaBuilder with full constraint validation (duplicates, identifiers, half-life, windows, velocity)
- LumenError wrapping all subsystems with required From impls

**Phase 2 – Write-Ahead Log**
- Length-prefixed, BLAKE3-protected entry format
- Group-commit writer (batch up to 100 events / 10 ms)
- Double-buffered content-hash deduplication
- Checkpoint, truncation, and crash-recovery with full replay
- Integration, property, and UAT tests (incl. 5,500-event deterministic UAT)
- Proptest coverage scaled to 10 000 events/run (was ≤500) to meet acceptance criterion; cases reduced 100→10 to keep runtime comparable

**Phase 3 – Storage engine**
- StorageEngine trait (get/put/delete/scan/batch/flush)
- Key encoding: [EntityId][0x00][Tag][suffix] with ordering/prefix helpers
- InMemoryBackend (BTreeMap + RwLock)
- FjallStorage with three isolated keyspaces and atomic batch helper
- Property tests for key ordering and round-trip correctness

Also adds planning docs for phases 4-5, research docs, architecture overview, and roadmap updates.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Task 01: StorageEngine Trait and Key Encoding
Context
Milestone: 1 -- Signal Engine
Phase: m1p3 -- Storage Engine Trait and fjall Backend
Status: COMPLETE
Depends On: m1p1 (EntityId, EntityKind)
Blocks: Task 02 (FjallBackend), Task 03 (InMemoryBackend)
Complexity: M
Objective
Define the StorageEngine trait that abstracts all persistent entity state access, the key encoding scheme that colocates entity data for efficient prefix scans, and the supporting types (WriteBatch, BatchOp, PrefixIterator, StorageError).
This is the boundary that keeps the rest of tidalDB storage-engine-agnostic. The WAL (m1p2) is the signal event source of truth; the storage engine is where derived entity state (metadata, signal checkpoints, indexes) lives. Every higher module — signal ledger, entity API, query engine — talks to a StorageEngine, never to fjall directly.
Requirements
- `StorageEngine` is a `Send + Sync` object-safe trait
- Operations: `get(&[u8]) -> Result<Option<Vec<u8>>>`, `put(&[u8], &[u8]) -> Result<()>`, `delete(&[u8]) -> Result<()>`, `scan_prefix(&[u8]) -> PrefixIterator<'_>`, `write_batch(WriteBatch) -> Result<()>`, `flush() -> Result<()>`
- Key encoding: `[entity_id: 8 bytes BE][0x00][Tag: 1 byte][suffix: variable]`
  - 8-byte big-endian entity ID: byte-lexicographic order matches numeric order
  - `0x00` NUL separator between entity ID and tag
  - 1-byte `Tag` discriminant for data category within the keyspace
- `Tag` enum: `Evt`=0x01 (raw events), `Sig`=0x02 (signal state), `Meta`=0x03 (entity metadata), `Rel`=0x04 (relationships), `Mv`=0x05 (materialized views), `Idx`=0x06 (inverted index)
- `entity_prefix(entity_id)` returns 9 bytes, `[entity_id: 8 BE][0x00]`: scans all tags for one entity
- `entity_tag_prefix(entity_id, tag)` returns 10 bytes, `[entity_id: 8 BE][0x00][tag: 1]`: scans one tag for one entity
- `encode_key(entity_id, tag, suffix)` and `parse_key(key)` round-trip correctly for all inputs
- `WriteBatch` collects `Put` and `Delete` operations; `write_batch()` applies them atomically
- `PrefixIterator<'_>` is a type alias for `Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>), StorageError>> + '_>`
- `StorageError` integrates with `LumenError::Storage`
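The trait itself is only described by its method list above, so the following is a minimal sketch of how those signatures could fit together. The `&self` receivers, the `Result<T>` alias, the trimmed `StorageError`, and the toy `MemBackend` are assumptions made for illustration; they are not the actual `InMemoryBackend` from Task 03.

```rust
use std::collections::BTreeMap;
use std::sync::RwLock;

// Trimmed stand-ins for the real error and batch types (assumptions).
#[derive(Debug)]
pub enum StorageError {
    Backend(String),
}
pub type Result<T> = std::result::Result<T, StorageError>;
pub type PrefixIterator<'a> = Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a>;

#[derive(Debug, Clone)]
pub enum BatchOp {
    Put { key: Vec<u8>, value: Vec<u8> },
    Delete { key: Vec<u8> },
}

#[derive(Debug, Default, Clone)]
pub struct WriteBatch {
    ops: Vec<BatchOp>,
}

impl WriteBatch {
    pub fn put(&mut self, key: Vec<u8>, value: Vec<u8>) -> &mut Self {
        self.ops.push(BatchOp::Put { key, value });
        self
    }
    pub fn delete(&mut self, key: Vec<u8>) -> &mut Self {
        self.ops.push(BatchOp::Delete { key });
        self
    }
}

/// Object-safe: no generic methods, and every method takes `&self`.
pub trait StorageEngine: Send + Sync {
    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;
    fn put(&self, key: &[u8], value: &[u8]) -> Result<()>;
    fn delete(&self, key: &[u8]) -> Result<()>;
    fn scan_prefix(&self, prefix: &[u8]) -> PrefixIterator<'_>;
    fn write_batch(&self, batch: WriteBatch) -> Result<()>;
    fn flush(&self) -> Result<()>;
}

/// Hypothetical toy backend; a poisoned lock is recovered rather than reported.
#[derive(Default)]
pub struct MemBackend {
    map: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
}

impl StorageEngine for MemBackend {
    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        let map = self.map.read().unwrap_or_else(|e| e.into_inner());
        Ok(map.get(key).cloned())
    }
    fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
        let mut map = self.map.write().unwrap_or_else(|e| e.into_inner());
        map.insert(key.to_vec(), value.to_vec());
        Ok(())
    }
    fn delete(&self, key: &[u8]) -> Result<()> {
        let mut map = self.map.write().unwrap_or_else(|e| e.into_inner());
        map.remove(key);
        Ok(())
    }
    fn scan_prefix(&self, prefix: &[u8]) -> PrefixIterator<'_> {
        // Snapshot the matching pairs under the read lock, then iterate lock-free.
        let map = self.map.read().unwrap_or_else(|e| e.into_inner());
        let hits: Vec<Result<(Vec<u8>, Vec<u8>)>> = map
            .range(prefix.to_vec()..)
            .take_while(|(k, _)| k.starts_with(prefix))
            .map(|(k, v)| Ok((k.clone(), v.clone())))
            .collect();
        Box::new(hits.into_iter())
    }
    fn write_batch(&self, batch: WriteBatch) -> Result<()> {
        // One write lock spans the whole batch, so readers see all ops or none.
        let mut map = self.map.write().unwrap_or_else(|e| e.into_inner());
        for op in batch.ops {
            match op {
                BatchOp::Put { key, value } => { map.insert(key, value); }
                BatchOp::Delete { key } => { map.remove(&key); }
            }
        }
        Ok(())
    }
    fn flush(&self) -> Result<()> { Ok(()) }
}

fn main() {
    // Higher layers hold the engine as a trait object, never a concrete backend.
    let engine: Box<dyn StorageEngine> = Box::new(MemBackend::default());
    let mut batch = WriteBatch::default();
    batch.put(b"a1".to_vec(), b"x".to_vec()).put(b"a2".to_vec(), b"y".to_vec());
    engine.write_batch(batch).unwrap();
    println!("{} keys under prefix", engine.scan_prefix(b"a").count());
}
```

Because `Box<dyn StorageEngine>` compiles, the sketch also doubles as a check that this particular set of signatures is object-safe. Collecting scan results into a `Vec` keeps the toy backend simple; a real backend would more likely stream from its own iterator.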
Technical Design
Key Encoding
[entity_id: u64 BE, 8 bytes][NUL: 0x00, 1 byte][Tag: u8, 1 byte][suffix: 0..N bytes]
Total prefix for entity scan: 9 bytes
Total prefix for tag scan: 10 bytes
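To make the two prefix lengths concrete, here is a small sketch of how a 9-byte or 10-byte prefix narrows a scan over an ordered key space. It uses a plain `BTreeMap` as a stand-in for the storage backend and raw `u64`/`u8` values in place of `EntityId` and `Tag`; `count_prefix` is a hypothetical helper, not part of the planned API.

```rust
use std::collections::BTreeMap;

/// [entity_id: 8 BE][0x00]
fn entity_prefix(entity_id: u64) -> [u8; 9] {
    let mut p = [0u8; 9];
    p[..8].copy_from_slice(&entity_id.to_be_bytes());
    p // p[8] is already the NUL separator
}

/// [entity_id: 8 BE][0x00][tag: 1]
fn entity_tag_prefix(entity_id: u64, tag: u8) -> [u8; 10] {
    let mut p = [0u8; 10];
    p[..8].copy_from_slice(&entity_id.to_be_bytes());
    p[9] = tag;
    p
}

/// Full key: the 10-byte tag prefix followed by the suffix.
fn encode_key(entity_id: u64, tag: u8, suffix: &[u8]) -> Vec<u8> {
    let mut k = entity_tag_prefix(entity_id, tag).to_vec();
    k.extend_from_slice(suffix);
    k
}

/// Count keys starting with `prefix` by seeking to it and walking forward
/// until the first key that no longer matches.
fn count_prefix(map: &BTreeMap<Vec<u8>, Vec<u8>>, prefix: &[u8]) -> usize {
    map.range(prefix.to_vec()..)
        .take_while(|(k, _)| k.starts_with(prefix))
        .count()
}

fn main() {
    const SIG: u8 = 0x02;
    const META: u8 = 0x03;
    let mut map = BTreeMap::new();
    map.insert(encode_key(7, SIG, b"views"), vec![]);
    map.insert(encode_key(7, SIG, b"likes"), vec![]);
    map.insert(encode_key(7, META, b"title"), vec![]);
    map.insert(encode_key(8, SIG, b"views"), vec![]);
    // The 9-byte prefix covers every tag of entity 7; the 10-byte prefix only Sig.
    println!("entity 7: {}", count_prefix(&map, &entity_prefix(7)));
    println!("entity 7 / Sig: {}", count_prefix(&map, &entity_tag_prefix(7, SIG)));
}
```

Since all keys of one entity share the same leading 9 bytes, they sit next to each other in sorted order, which is what makes the seek-and-walk scan sufficient.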
Why big-endian for entity IDs? Byte-lexicographic order of the 8-byte encoding must match numeric order of the u64 value. Big-endian achieves this: EntityId(1) → [0,0,0,0,0,0,0,1], EntityId(256) → [0,0,0,0,0,0,1,0]. Little-endian would break the ordering: 256 would encode as [0,1,0,0,0,0,0,0] and sort before 1, which would encode as [1,0,0,0,0,0,0,0].
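The ordering argument can be checked directly with the standard library's byte conversions. This is a small sketch; `be` and `le` are hypothetical helper names used only here.

```rust
/// Big-endian encoding: most significant byte first.
fn be(id: u64) -> [u8; 8] {
    id.to_be_bytes()
}

/// Little-endian encoding: least significant byte first.
fn le(id: u64) -> [u8; 8] {
    id.to_le_bytes()
}

fn main() {
    // Arrays compare lexicographically, the same way a sorted key space does.
    assert_eq!(be(256), [0, 0, 0, 0, 0, 0, 1, 0]);
    assert!(be(1) < be(256)); // byte order agrees with numeric order

    assert_eq!(le(256), [0, 1, 0, 0, 0, 0, 0, 0]);
    assert!(le(1) > le(256)); // byte order disagrees: 256 sorts before 1
}
```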
Why NUL separator? Prevents a variable-length entity ID prefix from colliding with suffixes. With fixed 8-byte IDs the separator is redundant but is kept for consistency with the subject-prefix pattern from thoughts.md and for future extensibility.
Public API
```rust
// === keys.rs ===
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum Tag {
    Evt  = 0x01, // raw event records (signal WAL overflow/cold tier)
    Sig  = 0x02, // signal state checkpoints
    Meta = 0x03, // entity metadata (title, category, created_at, ...)
    Rel  = 0x04, // relationship edges (follows, blocks, interaction weights)
    Mv   = 0x05, // materialized views (pre-computed aggregates)
    Idx  = 0x06, // inverted index entries
}

/// Build a full key: [entity_id: 8 BE][0x00][tag: 1][suffix]
pub fn encode_key(entity_id: EntityId, tag: Tag, suffix: &[u8]) -> Vec<u8>;

/// Parse a key back into (entity_id, tag, suffix).
/// Returns Err on keys too short to contain entity_id + separator + tag.
pub fn parse_key(key: &[u8]) -> Result<(EntityId, Tag, &[u8]), StorageError>;

/// Prefix for all keys belonging to one entity: [entity_id: 8 BE][0x00]
pub fn entity_prefix(entity_id: EntityId) -> [u8; 9];

/// Prefix for one tag of one entity: [entity_id: 8 BE][0x00][tag: 1]
pub fn entity_tag_prefix(entity_id: EntityId, tag: Tag) -> [u8; 10];

// === batch.rs ===
#[derive(Debug, Clone)]
pub enum BatchOp {
    Put { key: Vec<u8>, value: Vec<u8> },
    Delete { key: Vec<u8> },
}

#[derive(Debug, Default, Clone)]
pub struct WriteBatch {
    ops: Vec<BatchOp>,
}

impl WriteBatch {
    pub fn new() -> Self;
    pub fn put(&mut self, key: Vec<u8>, value: Vec<u8>) -> &mut Self;
    pub fn delete(&mut self, key: Vec<u8>) -> &mut Self;
    pub fn ops(&self) -> &[BatchOp];
    pub fn is_empty(&self) -> bool;
    pub fn len(&self) -> usize;
}

// === iterator.rs ===
/// Boxed prefix scan iterator yielding (key, value) pairs.
pub type PrefixIterator<'a> = Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>), StorageError>> + 'a>;

// === error.rs ===
#[derive(Debug, thiserror::Error)]
pub enum StorageError {
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),
    #[error("storage backend error: {0}")]
    Backend(String),
    #[error("key parse error: {0}")]
    KeyParse(String),
    #[error("engine closed")]
    Closed,
}
```
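The key functions are listed as signatures only, so here is one possible way their bodies could look. It is a sketch under assumptions: `EntityId` is a local stand-in for the m1p1 newtype, `StorageError` is reduced to the one variant needed, and rejecting a non-NUL separator byte is an extra check that the task does not spell out.

```rust
use std::convert::TryFrom;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EntityId(u64); // stand-in for the m1p1 newtype

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum Tag {
    Evt = 0x01,
    Sig = 0x02,
    Meta = 0x03,
    Rel = 0x04,
    Mv = 0x05,
    Idx = 0x06,
}

#[derive(Debug, PartialEq)]
pub enum StorageError {
    KeyParse(String),
}

impl TryFrom<u8> for Tag {
    type Error = StorageError;
    fn try_from(byte: u8) -> Result<Self, Self::Error> {
        Ok(match byte {
            0x01 => Tag::Evt,
            0x02 => Tag::Sig,
            0x03 => Tag::Meta,
            0x04 => Tag::Rel,
            0x05 => Tag::Mv,
            0x06 => Tag::Idx,
            // Catch-all: unknown (possibly future) tag bytes become an error.
            other => {
                return Err(StorageError::KeyParse(format!("unknown tag byte {other:#04x}")))
            }
        })
    }
}

/// Build a full key: [entity_id: 8 BE][0x00][tag: 1][suffix]
pub fn encode_key(entity_id: EntityId, tag: Tag, suffix: &[u8]) -> Vec<u8> {
    let mut key = Vec::with_capacity(10 + suffix.len());
    key.extend_from_slice(&entity_id.0.to_be_bytes());
    key.push(0x00);
    key.push(tag as u8);
    key.extend_from_slice(suffix);
    key
}

/// Parse a key back into (entity_id, tag, suffix); the suffix borrows from `key`.
pub fn parse_key(key: &[u8]) -> Result<(EntityId, Tag, &[u8]), StorageError> {
    if key.len() < 10 {
        return Err(StorageError::KeyParse(format!("key too short: {} bytes", key.len())));
    }
    // Assumption: a non-NUL separator is treated as a malformed key.
    if key[8] != 0x00 {
        return Err(StorageError::KeyParse("missing NUL separator".into()));
    }
    let mut id = [0u8; 8];
    id.copy_from_slice(&key[..8]);
    let tag = Tag::try_from(key[9])?;
    Ok((EntityId(u64::from_be_bytes(id)), tag, &key[10..]))
}

fn main() {
    let key = encode_key(EntityId(42), Tag::Sig, b"views");
    assert_eq!(parse_key(&key), Ok((EntityId(42), Tag::Sig, &b"views"[..])));
    assert!(parse_key(&key[..9]).is_err()); // tag byte missing
}
```

Returning the suffix as a borrowed slice matches the `&[u8]` in the `parse_key` signature and avoids an allocation per parsed key.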
Test Strategy
Property Tests (proptest)
```rust
use proptest::prelude::*;

// encode_key / parse_key roundtrip for all tags and suffixes
proptest! {
    #[test]
    fn key_roundtrip(
        id: u64,
        tag in prop_oneof![
            Just(Tag::Evt), Just(Tag::Sig), Just(Tag::Meta),
            Just(Tag::Rel), Just(Tag::Mv), Just(Tag::Idx),
        ],
        suffix in prop::collection::vec(any::<u8>(), 0..64),
    ) {
        let entity_id = EntityId::new(id);
        let key = encode_key(entity_id, tag, &suffix);
        let (parsed_id, parsed_tag, parsed_suffix) = parse_key(&key).unwrap();
        prop_assert_eq!(parsed_id, entity_id);
        prop_assert_eq!(parsed_tag, tag);
        prop_assert_eq!(parsed_suffix, suffix.as_slice());
    }
}

// Byte-lexicographic order of encoded keys matches numeric order of entity IDs
proptest! {
    #[test]
    fn key_ordering_matches_entity_id_ordering(a: u64, b: u64) {
        let key_a = encode_key(EntityId::new(a), Tag::Meta, b"");
        let key_b = encode_key(EntityId::new(b), Tag::Meta, b"");
        prop_assert_eq!(
            key_a.cmp(&key_b),
            a.cmp(&b),
            "key ordering must match entity ID ordering"
        );
    }
}

// entity_prefix is a prefix of every key for that entity, whatever the tag
proptest! {
    #[test]
    fn entity_prefix_is_prefix_of_all_entity_keys(id: u64) {
        let entity_id = EntityId::new(id);
        let prefix = entity_prefix(entity_id);
        for tag in [Tag::Evt, Tag::Sig, Tag::Meta, Tag::Rel, Tag::Mv, Tag::Idx] {
            let key = encode_key(entity_id, tag, b"suffix");
            prop_assert!(key.starts_with(&prefix));
        }
    }
}

// entity_tag_prefix is a prefix of every key for that entity and tag
proptest! {
    #[test]
    fn entity_tag_prefix_is_precise(id: u64, suffix in prop::collection::vec(any::<u8>(), 0..32)) {
        let entity_id = EntityId::new(id);
        let prefix = entity_tag_prefix(entity_id, Tag::Sig);
        let key = encode_key(entity_id, Tag::Sig, &suffix);
        prop_assert!(key.starts_with(&prefix));
        // Tag::Meta key does NOT start with Tag::Sig prefix
        let other_key = encode_key(entity_id, Tag::Meta, &suffix);
        prop_assert!(!other_key.starts_with(&prefix));
    }
}
```
Unit Tests
```rust
#[test]
fn tag_byte_values() {
    assert_eq!(Tag::Evt as u8, 0x01);
    assert_eq!(Tag::Sig as u8, 0x02);
    assert_eq!(Tag::Meta as u8, 0x03);
    assert_eq!(Tag::Rel as u8, 0x04);
    assert_eq!(Tag::Mv as u8, 0x05);
    assert_eq!(Tag::Idx as u8, 0x06);
}

#[test]
fn entity_prefix_length() {
    let prefix = entity_prefix(EntityId::new(1));
    assert_eq!(prefix.len(), 9);
}

#[test]
fn entity_tag_prefix_length() {
    let prefix = entity_tag_prefix(EntityId::new(1), Tag::Meta);
    assert_eq!(prefix.len(), 10);
}

#[test]
fn parse_key_rejects_short_input() {
    assert!(parse_key(b"").is_err());
    assert!(parse_key(&[0u8; 8]).is_err()); // missing NUL + tag
    assert!(parse_key(&[0u8; 9]).is_err()); // missing tag
}

#[test]
fn write_batch_ops_order_preserved() {
    let mut batch = WriteBatch::new();
    batch.put(b"k1".to_vec(), b"v1".to_vec());
    batch.delete(b"k2".to_vec());
    batch.put(b"k3".to_vec(), b"v3".to_vec());
    assert_eq!(batch.len(), 3);
    assert!(matches!(batch.ops()[0], BatchOp::Put { .. }));
    assert!(matches!(batch.ops()[1], BatchOp::Delete { .. }));
    assert!(matches!(batch.ops()[2], BatchOp::Put { .. }));
}
```
Acceptance Criteria
- `encode_key`/`parse_key` round-trip correctly for all 6 `Tag` variants and arbitrary suffixes (property tested)
- Byte-lexicographic ordering of encoded keys matches numeric ordering of `EntityId` (property tested)
- `entity_prefix` is 9 bytes and is a prefix of every key for that entity (property tested)
- `entity_tag_prefix` is 10 bytes and is a prefix of only keys with the matching entity+tag (property tested)
- `parse_key` returns `StorageError::KeyParse` for inputs shorter than 10 bytes
- `WriteBatch` preserves insertion order of operations
- `StorageEngine` trait is object-safe (`dyn StorageEngine` compiles)
- `StorageEngine: Send + Sync`, enforced by the trait bound
- `cargo clippy -- -D warnings` passes
Research References
- thoughts.md, Part V.12 (subject-prefix keys: `[entity_id][NUL][TAG][suffix]`, rationale for co-location, entity-scoped prefix scans)
- CODING_GUIDELINES.md, Section 2 (key encoding: big-endian for byte-lexicographic ordering, NUL separator convention)
Implementation Notes
- `Tag` uses `#[repr(u8)]` for direct byte encoding. A `TryFrom<u8>` impl whose catch-all arm returns `StorageError::KeyParse` allows forward-compatible decoding of unknown future tag values (`From<u8>` cannot fail, so the fallible trait is required).
- `PrefixIterator<'_>` is a type alias (not a newtype), so a backend can return any boxed iterator without an extra wrapper type. The alias is still a `Box<dyn Iterator>`: the boxing is what keeps `scan_prefix` usable through `dyn StorageEngine`. The `'_` lifetime ties the iterator to the borrow of the backend.
- `StorageError` uses `thiserror` (already in `Cargo.toml`) for `Display` and `Error` implementations.
- Do NOT add `serde` to the storage error types. Error propagation uses `From` impls, not serialization.
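As an illustration of propagation through `From` impls, the sketch below shows a `StorageError` flowing into `LumenError::Storage` via the `?` operator. Only the `Storage` variant name comes from this task; the rest of `LumenError`'s shape, and the helpers `read_from_closed_engine` and `load_metadata`, are hypothetical. `Display` is written by hand here to stay dependency-free, where the real type derives it with `thiserror`.

```rust
use std::fmt;

#[derive(Debug)]
pub enum StorageError {
    Backend(String),
    Closed,
}

impl fmt::Display for StorageError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StorageError::Backend(msg) => write!(f, "storage backend error: {msg}"),
            StorageError::Closed => write!(f, "engine closed"),
        }
    }
}

impl std::error::Error for StorageError {}

// Hypothetical shape of the top-level error type.
#[derive(Debug)]
pub enum LumenError {
    Storage(StorageError),
}

impl From<StorageError> for LumenError {
    fn from(e: StorageError) -> Self {
        LumenError::Storage(e)
    }
}

/// Stand-in for a storage call that fails.
fn read_from_closed_engine() -> Result<Vec<u8>, StorageError> {
    Err(StorageError::Closed)
}

/// `?` converts `StorageError` into `LumenError` through the `From` impl.
fn load_metadata() -> Result<Vec<u8>, LumenError> {
    let bytes = read_from_closed_engine()?;
    Ok(bytes)
}

fn main() {
    match load_metadata() {
        Err(LumenError::Storage(e)) => println!("storage failure: {e}"),
        Ok(bytes) => println!("read {} bytes", bytes.len()),
    }
}
```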