# Task 04: Checkpoint and Restore

## Context

**Milestone:** 1 -- Signal Engine

**Phase:** m1p4 -- Signal Ledger

**Depends On:** Task 03 (Signal Ledger and Velocity)

**Blocks:** m1p5 (Entity CRUD and Signal Write API)

**Complexity:** M

## Objective

Deliver the checkpoint and restore mechanism for the `SignalLedger`. Hot-tier decay scores and warm-tier bucketed counters are in-memory state. Without persistence, a crash loses all signal aggregates, and recovery requires replaying the entire WAL from its first event. Checkpoint/restore periodically writes the current in-memory state to the `StorageEngine` (via `Tag::Sig`), so that crash recovery only needs to replay WAL events recorded after the last checkpoint.

This task implements:

1. **Checkpoint:** Serialize all `DashMap` entries to the `StorageEngine` using the existing key encoding (`encode_key(entity_id, Tag::Sig, suffix)`).
2. **Restore:** On startup, scan for `Tag::Sig` keys and reconstruct `EntitySignalEntry` instances into the `DashMap`.
3. **Serialization format:** A compact binary format for `HotSignalState` and `BucketedCounterSnapshot`.

The checkpoint is a snapshot of the signal ledger as of its WAL sequence number. After restore, WAL events with a sequence number greater than the checkpoint's are replayed to bring the state up to date. `DashMap` iteration is not a point-in-time snapshot across shards, so the snapshot is only consistent if no `record_signal` calls run concurrently with `checkpoint()` (as is the case when m1p5 checkpoints on shutdown). The WAL replay mechanism itself is m1p2's responsibility; this task provides the `checkpoint()` and `restore()` methods that m1p5 will call.

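To make the restore-then-replay contract concrete, here is a toy model, assuming a checkpoint that holds per-key counts plus the last WAL sequence it already includes. `WalEvent`, `Checkpoint`, and `recover` are hypothetical stand-ins, not tidalDB types; only the "replay events with sequence > `wal_sequence`" rule comes from this task.

```rust
use std::collections::HashMap;

/// Hypothetical WAL record: a sequence number and the key it increments.
#[derive(Debug, Clone, Copy)]
pub struct WalEvent {
    pub seq: u64,
    pub key: u64,
}

/// Hypothetical checkpoint: aggregated counts plus the last WAL sequence folded into them.
#[derive(Debug, Clone, Default)]
pub struct Checkpoint {
    pub counts: HashMap<u64, u64>,
    pub wal_sequence: u64,
}

/// Rebuild state from an optional checkpoint plus the WAL.
///
/// With a checkpoint, only events with `seq > wal_sequence` are applied;
/// without one (first boot), every event is replayed from empty state.
pub fn recover(checkpoint: Option<&Checkpoint>, wal: &[WalEvent]) -> HashMap<u64, u64> {
    let (mut counts, replay_after) = match checkpoint {
        Some(cp) => (cp.counts.clone(), Some(cp.wal_sequence)),
        None => (HashMap::new(), None),
    };
    for ev in wal {
        // Skip events already included in the checkpoint.
        if matches!(replay_after, Some(seq) if ev.seq <= seq) {
            continue;
        }
        *counts.entry(ev.key).or_insert(0) += 1;
    }
    counts
}
```

Recovering from a checkpoint taken after event 3 must give the same result as replaying all five events from scratch. An event that landed both in the snapshot and in the replay range would be counted twice, which is why concurrent writes during `checkpoint()` break consistency.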
## Requirements

- `SignalLedger::checkpoint()` writes all entries to `StorageEngine` via `Tag::Sig` keys
- `SignalLedger::restore()` reads all `Tag::Sig` keys and populates the `DashMap`
- Key format: `encode_key(entity_id, Tag::Sig, &[signal_type_id_hi, signal_type_id_lo])` (big-endian signal type ID)
- Value format: deterministic binary serialization of hot-tier + warm-tier state
- Checkpoint must be consistent: no partial entries (use `write_batch` for atomicity)
- Restore + re-checkpoint produces identical storage content (roundtrip property)
- Checkpoint duration target: < 2 seconds for 10,000 entity-signal pairs
- `StorageEngine` is passed by reference -- `SignalLedger` does not own storage (m1p5's `TidalDB` owns both)
- No external serialization dependencies (no serde, no bincode) -- hand-rolled binary for control and `#![forbid(unsafe_code)]` compatibility

## Technical Design

### Module Structure

```
tidal/src/signals/
    checkpoint.rs   -- checkpoint, restore, serialization helpers
```

### Public API

```rust
// === signals/checkpoint.rs ===

use crate::schema::EntityId;
use crate::storage::{StorageEngine, Tag, WriteBatch, encode_key, parse_key};

use super::ledger::{SignalLedger, EntitySignalEntry};
use super::hot::HotSignalState;
use super::warm::BucketedCounterSnapshot;
use super::SignalTypeId;

/// Checkpoint sequence metadata stored alongside the signal state.
/// Used by the WAL replay mechanism to know where to start replaying.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CheckpointMeta {
    /// Timestamp (nanos) when the checkpoint was taken.
    pub checkpoint_time_ns: u64,
    /// WAL sequence number at checkpoint time.
    /// Events with sequence > this number must be replayed after restore.
    pub wal_sequence: u64,
}

// Signatures only; the bodies are sketched under Internal Design.
impl SignalLedger {
    /// Write all in-memory signal state to the storage engine.
    ///
    /// Iterates over the DashMap and serializes each entry to a key-value pair
    /// using `Tag::Sig`. Uses `write_batch` for atomicity -- either all entries
    /// are written or none.
    ///
    /// The checkpoint metadata (timestamp, WAL sequence) is written to a
    /// well-known key: `encode_key(EntityId::new(0), Tag::Sig, b"meta")`.
    ///
    /// # Key Format
    ///
    /// Per-entry key: `[entity_id: 8 BE][0x00][Tag::Sig][signal_type_id: 2 BE]`
    /// Meta key:      `[0x00..0x00 (8 bytes)][0x00][Tag::Sig][b"meta"]`
    ///
    /// # Errors
    ///
    /// Returns `LumenError::Storage` if the write batch or the flush fails.
    pub fn checkpoint(
        &self,
        storage: &dyn StorageEngine,
        meta: CheckpointMeta,
    ) -> crate::Result<()>;

    /// Restore in-memory signal state from the storage engine.
    ///
    /// Scans every key in the given keyspace, keeps the `Tag::Sig` keys
    /// (skipping the meta key), deserializes their values, and populates the DashMap.
    ///
    /// Returns the checkpoint metadata (for the WAL to know where to resume),
    /// or `None` if no checkpoint exists (first boot).
    ///
    /// # Errors
    ///
    /// Returns `LumenError::Storage` on I/O failure.
    /// Returns `LumenError::Internal` on deserialization failure (corrupt checkpoint).
    pub fn restore(
        &self,
        storage: &dyn StorageEngine,
    ) -> crate::Result<Option<CheckpointMeta>>;

    /// Return the number of entries currently in the DashMap.
    /// Used for diagnostics and testing.
    pub fn entry_count(&self) -> usize;
}

/// Serialize an EntitySignalEntry to bytes.
///
/// Binary format (all values little-endian for simplicity):
///
/// ```text
/// Offset  Size  Field
///      0     1  version (0x01)
///      1     8  entity_id (u64 LE)
///      9     2  signal_type_id (u16 LE)
///     11     2  flags (u16 LE)
///     13     8  last_update_ns (u64 LE)
///     21     8  decay_score_0 (f64 LE, as u64 bits)
///     29     8  decay_score_1 (f64 LE, as u64 bits)
///     37     8  decay_score_2 (f64 LE, as u64 bits)
///     45     1  current_minute (u8)
///     46     1  current_hour (u8)
///     47     8  all_time_count (u64 LE)
///     55     8  last_minute_rotation_ns (u64 LE)
///     63     8  last_hour_rotation_ns (u64 LE)
///     71   240  minute_buckets (60 * u32 LE)
///    311   672  hour_buckets (168 * u32 LE)
/// Total: 983 bytes
/// ```
pub fn serialize_entry(
    entity_id: EntityId,
    signal_type_id: SignalTypeId,
    entry: &EntitySignalEntry,
) -> Vec<u8>;

/// Deserialize an EntitySignalEntry from bytes.
///
/// Returns (entity_id, signal_type_id, entry), or an error if the format is
/// invalid (unknown version byte, truncated input).
pub fn deserialize_entry(
    bytes: &[u8],
) -> Result<(EntityId, SignalTypeId, EntitySignalEntry), String>;

/// Serialize CheckpointMeta to bytes.
///
/// Format: [version: 1][checkpoint_time_ns: 8 LE][wal_sequence: 8 LE] = 17 bytes
pub fn serialize_meta(meta: &CheckpointMeta) -> Vec<u8>;

/// Deserialize CheckpointMeta from bytes.
pub fn deserialize_meta(bytes: &[u8]) -> Result<CheckpointMeta, String>;
```

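A minimal sketch of `serialize_meta` and `deserialize_meta` for the 17-byte layout above, with `CheckpointMeta` mirrored locally so the block compiles on its own. The length and version checks illustrate the "reject unknown versions and truncated data" behavior; the exact error strings are illustrative.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CheckpointMeta {
    pub checkpoint_time_ns: u64,
    pub wal_sequence: u64,
}

const META_VERSION: u8 = 0x01;
const META_LEN: usize = 17;

/// [version: 1][checkpoint_time_ns: 8 LE][wal_sequence: 8 LE]
pub fn serialize_meta(meta: &CheckpointMeta) -> Vec<u8> {
    let mut out = Vec::with_capacity(META_LEN);
    out.push(META_VERSION);
    out.extend_from_slice(&meta.checkpoint_time_ns.to_le_bytes());
    out.extend_from_slice(&meta.wal_sequence.to_le_bytes());
    out
}

pub fn deserialize_meta(bytes: &[u8]) -> Result<CheckpointMeta, String> {
    if bytes.len() != META_LEN {
        return Err(format!("meta: expected {} bytes, got {}", META_LEN, bytes.len()));
    }
    if bytes[0] != META_VERSION {
        return Err(format!("meta: unsupported version {:#04x}", bytes[0]));
    }
    // The length check above guarantees both 8-byte slices are in bounds.
    let read_u64 = |off: usize| {
        let mut buf = [0u8; 8];
        buf.copy_from_slice(&bytes[off..off + 8]);
        u64::from_le_bytes(buf)
    };
    Ok(CheckpointMeta {
        checkpoint_time_ns: read_u64(1),
        wal_sequence: read_u64(9),
    })
}
```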
### Internal Design

**Key encoding for checkpoint entries:**

Each `(EntityId, SignalTypeId)` pair maps to a storage key using the existing `encode_key` function:

```rust
let suffix = signal_type_id.as_u16().to_be_bytes();
let key = encode_key(entity_id, Tag::Sig, &suffix);
```

This produces `[entity_id: 8 BE][0x00][0x02][signal_type_id: 2 BE]`, 12 bytes total. The `Tag::Sig` byte (0x02) keeps checkpoint entries in a separate namespace from event data (`Tag::Evt`) and metadata (`Tag::Meta`).

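A standalone sketch of the 12-byte layout just described. `encode_sig_key` is a hypothetical stand-in for `encode_key(entity_id, Tag::Sig, &suffix)`: it hard-codes the `0x00` separator and the `Tag::Sig` byte `0x02` given in this document, and takes plain integers instead of `EntityId`/`SignalTypeId`.

```rust
/// Hypothetical constants mirroring the key layout in this document.
const SEPARATOR: u8 = 0x00;
const TAG_SIG: u8 = 0x02;

/// Build `[entity_id: 8 BE][0x00][Tag::Sig][signal_type_id: 2 BE]`.
pub fn encode_sig_key(entity_id: u64, signal_type_id: u16) -> Vec<u8> {
    let mut key = Vec::with_capacity(12);
    key.extend_from_slice(&entity_id.to_be_bytes());
    key.push(SEPARATOR);
    key.push(TAG_SIG);
    key.extend_from_slice(&signal_type_id.to_be_bytes());
    key
}
```

Big-endian IDs make byte-wise key order equal numeric order. With little-endian IDs, entity 256 (`[0x00, 0x01, ...]`) would sort before entity 255 (`[0xFF, 0x00, ...]`).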
**Checkpoint meta key:**

The checkpoint metadata is stored at a well-known key using `EntityId::new(0)` as the entity ID:

```rust
let meta_key = encode_key(EntityId::new(0), Tag::Sig, b"meta");
```

Entity ID 0 is reserved for system-level keys. The suffix `b"meta"` distinguishes the checkpoint metadata from any entity-signal pair, whose suffix is always exactly 2 bytes (never 4).

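A sketch of classifying a parsed `Tag::Sig` key by entity ID and suffix length. It assumes `parse_key` has already split the key into an entity ID (shown here as a plain `u64`) and a suffix slice. `SigKey` and `classify_sig_key` are hypothetical.

```rust
/// Hypothetical classification of a parsed `Tag::Sig` key.
#[derive(Debug, PartialEq, Eq)]
pub enum SigKey {
    /// Checkpoint metadata: entity 0 with suffix `b"meta"`.
    Meta,
    /// Per-(entity, signal type) entry: a 2-byte big-endian suffix.
    Entry { entity_id: u64, signal_type_id: u16 },
    /// Any other suffix under `Tag::Sig` is unexpected.
    Unknown,
}

pub fn classify_sig_key(entity_id: u64, suffix: &[u8]) -> SigKey {
    if entity_id == 0 && suffix == b"meta" {
        return SigKey::Meta;
    }
    match suffix {
        [hi, lo] => SigKey::Entry {
            entity_id,
            signal_type_id: u16::from_be_bytes([*hi, *lo]),
        },
        _ => SigKey::Unknown,
    }
}
```

Because entry suffixes are always 2 bytes, an entry can never be mistaken for the meta key. An unexpected suffix shows up as `Unknown` instead of being passed to the entry deserializer.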
**Atomic checkpoint via write_batch:**

The checkpoint writes all entries plus the metadata in a single `WriteBatch`, so the checkpoint is either fully written or not written at all. If the process crashes during a checkpoint, the previous checkpoint remains valid.

```rust
pub fn checkpoint(&self, storage: &dyn StorageEngine, meta: CheckpointMeta) -> crate::Result<()> {
    let mut batch = WriteBatch::new();

    // Write checkpoint metadata
    let meta_key = encode_key(EntityId::new(0), Tag::Sig, b"meta");
    batch.put(meta_key, serialize_meta(&meta));

    // Write all entries. DashMap iteration walks shard by shard and is not a
    // point-in-time snapshot, so callers must not record signals concurrently.
    for entry_ref in self.entries.iter() {
        let &(entity_id, signal_type_id) = entry_ref.key();
        let entry = entry_ref.value();
        let suffix = signal_type_id.as_u16().to_be_bytes();
        let key = encode_key(entity_id, Tag::Sig, &suffix);
        let value = serialize_entry(entity_id, signal_type_id, entry);
        batch.put(key, value);
    }

    // One atomic write, then flush so the checkpoint is durable before returning.
    storage.write_batch(batch)?;
    storage.flush()?;
    Ok(())
}
```

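The all-or-nothing behavior relied on above can be illustrated with a toy in-memory store, assuming a `BTreeMap` behind a `std::sync::RwLock`. `ToyStore` is hypothetical and is not the crate's `InMemoryBackend` or `StorageEngine`. The whole batch is applied while one write guard is held, so a concurrent reader sees either none of the batch or all of it.

```rust
use std::collections::BTreeMap;
use std::sync::RwLock;

/// Toy key-value store: a sorted map behind a reader-writer lock.
#[derive(Default)]
pub struct ToyStore {
    map: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
}

impl ToyStore {
    /// Apply every put under a single write guard, so readers observe the
    /// state either before the batch or after it, never in between.
    pub fn write_batch(&self, batch: Vec<(Vec<u8>, Vec<u8>)>) {
        let mut guard = self.map.write().expect("lock poisoned");
        for (key, value) in batch {
            guard.insert(key, value);
        }
    }

    pub fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
        self.map.read().expect("lock poisoned").get(key).cloned()
    }

    pub fn len(&self) -> usize {
        self.map.read().expect("lock poisoned").len()
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
```

A lock only protects against concurrent readers within one process. For a persistent backend such as `FjallStorage`, crash atomicity has to come from the engine's own batch commit.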
**Restore via prefix scan:**

Restore must find every `Tag::Sig` key across all entities. `entity_tag_prefix` does not help here. Keys are ordered by entity ID first, so the `Tag::Sig` keys of different entities are interleaved with their `Tag::Meta` and `Tag::Evt` keys rather than forming one contiguous range. Issuing one prefix scan per entity ID over `0..=u64::MAX` is impractical.

At M1 scope there is only one keyspace (items), so `SignalLedger::restore` accepts a `&dyn StorageEngine` representing that keyspace. It calls `scan_prefix(&[])`, where the empty prefix matches every key. It then uses `parse_key` to extract each key's tag (the tag byte sits at offset 9) and keeps only `Tag::Sig` keys. The meta key is excluded from the scan because it is read separately with `get`.

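A sketch of the scan-and-filter strategy over a sorted in-memory map, assuming the key layout above (`[entity_id: 8 BE][0x00][tag][suffix]`). Manual byte parsing stands in for `parse_key`, and `sig_entry_keys` is hypothetical. Only the `Tag::Sig` value 0x02 comes from this document; the other tag byte in the usage check (0x01) is arbitrary.

```rust
use std::collections::BTreeMap;

const TAG_SIG: u8 = 0x02;

/// Return (entity_id, suffix) for every `Tag::Sig` key except the meta key.
/// Every key is visited (the equivalent of `scan_prefix(&[])`) because the
/// Sig keys of different entities are interleaved with other tags.
pub fn sig_entry_keys(map: &BTreeMap<Vec<u8>, Vec<u8>>) -> Vec<(u64, Vec<u8>)> {
    let mut out = Vec::new();
    for key in map.keys() {
        // Need 8 ID bytes, the separator, and the tag byte.
        if key.len() < 10 || key[8] != 0x00 || key[9] != TAG_SIG {
            continue;
        }
        let mut id = [0u8; 8];
        id.copy_from_slice(&key[..8]);
        let entity_id = u64::from_be_bytes(id);
        let suffix = &key[10..];
        if entity_id == 0 && suffix == b"meta" {
            continue; // checkpoint metadata is read separately
        }
        out.push((entity_id, suffix.to_vec()));
    }
    out
}
```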
```rust
pub fn restore(&self, storage: &dyn StorageEngine) -> crate::Result<Option<CheckpointMeta>> {
    let mut meta: Option<CheckpointMeta> = None;

    // Read the meta key first; its absence means no checkpoint exists (first boot).
    let meta_key = encode_key(EntityId::new(0), Tag::Sig, b"meta");
    if let Some(meta_bytes) = storage.get(&meta_key)? {
        meta = Some(deserialize_meta(&meta_bytes)
            .map_err(|e| LumenError::Internal(format!("corrupt checkpoint meta: {e}")))?);
    }

    // Scan every key in the keyspace and keep only Tag::Sig entries.
    for (key, value) in storage.scan_prefix(&[]) {
        if let Some((entity_id, Tag::Sig, suffix)) = parse_key(&key) {
            // Skip the meta key (already handled above)
            if entity_id == EntityId::new(0) && suffix == b"meta" {
                continue;
            }
            let (eid, stid, entry) = deserialize_entry(&value)
                .map_err(|e| LumenError::Internal(format!("corrupt checkpoint entry: {e}")))?;
            // The IDs embedded in the value must agree with the key they were stored under.
            if eid != entity_id || suffix != stid.as_u16().to_be_bytes() {
                return Err(LumenError::Internal(
                    "corrupt checkpoint entry: key/value ID mismatch".to_string(),
                ));
            }
            self.entries.insert((eid, stid), entry);
        }
    }

    Ok(meta)
}
```

**Serialization format:**

Hand-rolled binary serialization is used instead of serde/bincode because of:

1. Zero additional dependencies
2. Full control over format stability
3. Trivial to implement for fixed-layout structs
4. Compatible with `#![forbid(unsafe_code)]` without question

The format uses a version byte (0x01) at offset 0. If the format changes in future milestones, the version byte lets the deserializer recognize which layout it is reading, enabling backward-compatible deserialization.

Little-endian is used for serialized values, while storage keys use big-endian. Keys must be big-endian so that byte-wise ordering matches numeric ordering. Values are never compared, so their byte order does not affect correctness. Little-endian matches the native byte order on x86/ARM64/RISC-V (the target platforms) and avoids byte-swapping on the common path.

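A sketch of an encoder/decoder for the 983-byte entry layout, assuming a hypothetical flat `EntryRecord` that mirrors the table's fields. Mapping it to and from `HotSignalState` and `BucketedCounterSnapshot` is left to the real implementation. Scores go through `f64::to_bits`/`from_bits`, so they round-trip exactly.

```rust
/// Hypothetical flat mirror of the fields in the 983-byte layout.
#[derive(Debug, Clone, PartialEq)]
pub struct EntryRecord {
    pub entity_id: u64,
    pub signal_type_id: u16,
    pub flags: u16,
    pub last_update_ns: u64,
    pub decay_scores: [f64; 3],
    pub current_minute: u8,
    pub current_hour: u8,
    pub all_time_count: u64,
    pub last_minute_rotation_ns: u64,
    pub last_hour_rotation_ns: u64,
    pub minute_buckets: [u32; 60],
    pub hour_buckets: [u32; 168],
}

pub const ENTRY_VERSION: u8 = 0x01;
pub const ENTRY_LEN: usize = 983;

pub fn encode_entry(r: &EntryRecord) -> Vec<u8> {
    let mut out = Vec::with_capacity(ENTRY_LEN);
    out.push(ENTRY_VERSION);
    out.extend_from_slice(&r.entity_id.to_le_bytes());
    out.extend_from_slice(&r.signal_type_id.to_le_bytes());
    out.extend_from_slice(&r.flags.to_le_bytes());
    out.extend_from_slice(&r.last_update_ns.to_le_bytes());
    for score in &r.decay_scores {
        // Raw bit pattern, so every finite value (and -0.0) round-trips exactly.
        out.extend_from_slice(&score.to_bits().to_le_bytes());
    }
    out.push(r.current_minute);
    out.push(r.current_hour);
    out.extend_from_slice(&r.all_time_count.to_le_bytes());
    out.extend_from_slice(&r.last_minute_rotation_ns.to_le_bytes());
    out.extend_from_slice(&r.last_hour_rotation_ns.to_le_bytes());
    for bucket in r.minute_buckets.iter().chain(r.hour_buckets.iter()) {
        out.extend_from_slice(&bucket.to_le_bytes());
    }
    debug_assert_eq!(out.len(), ENTRY_LEN);
    out
}

/// Forward-only reader; indexing is in bounds because decode_entry checks the length first.
struct Reader<'a> {
    buf: &'a [u8],
    pos: usize,
}

impl Reader<'_> {
    fn take<const N: usize>(&mut self) -> [u8; N] {
        let mut out = [0u8; N];
        out.copy_from_slice(&self.buf[self.pos..self.pos + N]);
        self.pos += N;
        out
    }
}

pub fn decode_entry(bytes: &[u8]) -> Result<EntryRecord, String> {
    if bytes.len() != ENTRY_LEN {
        return Err(format!("entry: expected {} bytes, got {}", ENTRY_LEN, bytes.len()));
    }
    if bytes[0] != ENTRY_VERSION {
        return Err(format!("entry: unsupported version {:#04x}", bytes[0]));
    }
    let mut rd = Reader { buf: bytes, pos: 1 };
    let entity_id = u64::from_le_bytes(rd.take());
    let signal_type_id = u16::from_le_bytes(rd.take());
    let flags = u16::from_le_bytes(rd.take());
    let last_update_ns = u64::from_le_bytes(rd.take());
    let mut decay_scores = [0.0f64; 3];
    for score in &mut decay_scores {
        *score = f64::from_bits(u64::from_le_bytes(rd.take()));
    }
    let [current_minute] = rd.take::<1>();
    let [current_hour] = rd.take::<1>();
    let all_time_count = u64::from_le_bytes(rd.take());
    let last_minute_rotation_ns = u64::from_le_bytes(rd.take());
    let last_hour_rotation_ns = u64::from_le_bytes(rd.take());
    let mut minute_buckets = [0u32; 60];
    for b in &mut minute_buckets {
        *b = u32::from_le_bytes(rd.take());
    }
    let mut hour_buckets = [0u32; 168];
    for b in &mut hour_buckets {
        *b = u32::from_le_bytes(rd.take());
    }
    Ok(EntryRecord {
        entity_id, signal_type_id, flags, last_update_ns, decay_scores,
        current_minute, current_hour, all_time_count,
        last_minute_rotation_ns, last_hour_rotation_ns, minute_buckets, hour_buckets,
    })
}
```

Every field has a fixed width. That means a single length check up front is enough to keep all later slicing in bounds, and the offsets in the layout table fall out of the write order.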
### Error Handling

- Storage write failure: returns `LumenError::Storage(StorageError::...)`.
- Corrupt checkpoint data (deserialization failure): returns `LumenError::Internal(...)` with a descriptive message. This should never happen in normal operation; it indicates disk corruption or a bug.
- No checkpoint found on restore: returns `Ok(None)`. The caller (m1p5's `TidalDB::open`) handles this by starting with empty state and replaying the entire WAL.

## Test Strategy

### Property Tests

```rust
use proptest::prelude::*;

// Checkpoint-restore roundtrip preserves all state.
proptest! {
    #[test]
    fn checkpoint_restore_roundtrip(
        entity_count in 1usize..50,
        signals_per_entity in 1usize..20,
    ) {
        let schema = test_schema();
        let ledger = SignalLedger::new(schema.clone(), Box::new(NoopWalWriter));

        // Populate: one "view" signal per second for each entity
        let now_ns = 1_000_000_000_000u64;
        for entity in 0..entity_count as u64 {
            for i in 0..signals_per_entity {
                let ts = Timestamp::from_nanos(now_ns + (i as u64) * 1_000_000_000);
                ledger.record_signal("view", EntityId::new(entity + 1), 1.0, ts).unwrap();
            }
        }

        // Checkpoint to in-memory storage
        let storage = InMemoryBackend::new();
        let meta = CheckpointMeta { checkpoint_time_ns: now_ns, wal_sequence: 42 };
        ledger.checkpoint(&storage, meta).unwrap();

        // Restore into a fresh ledger
        let ledger2 = SignalLedger::new(schema, Box::new(NoopWalWriter));
        let restored_meta = ledger2.restore(&storage).unwrap();

        // Meta matches
        prop_assert_eq!(restored_meta, Some(meta));

        // Entry count matches
        prop_assert_eq!(ledger2.entry_count(), ledger.entry_count());

        // Decay scores match for all entities
        for entity in 0..entity_count as u64 {
            let eid = EntityId::new(entity + 1);
            let original = ledger.read_decay_score(eid, "view", 0).unwrap();
            let restored = ledger2.read_decay_score(eid, "view", 0).unwrap();
            match (original, restored) {
                (Some(o), Some(r)) => {
                    // Stored scores should match exactly (no lazy decay applied yet)
                    prop_assert!((o - r).abs() < 1e-10,
                        "entity {entity}: original={o}, restored={r}");
                }
                (None, None) => {}
                _ => prop_assert!(false, "entity {entity}: mismatch in Some/None"),
            }
        }

        // All-time counts match for all entities
        for entity in 0..entity_count as u64 {
            let eid = EntityId::new(entity + 1);
            let orig_count = ledger.read_windowed_count(eid, "view", Window::AllTime).unwrap();
            let rest_count = ledger2.read_windowed_count(eid, "view", Window::AllTime).unwrap();
            prop_assert_eq!(orig_count, rest_count,
                "entity {entity}: all-time count mismatch");
        }
    }
}

// Serialization roundtrip for individual entries (hot-tier fields).
// Warm-tier contents are covered by the windowed-count checks above.
proptest! {
    #[test]
    fn serialize_deserialize_entry_roundtrip(
        entity_id_val in 1u64..1_000_000,
        signal_type_id_val in 0u16..64,
        score_0 in 0.0f64..1e12,
        score_1 in 0.0f64..1e12,
        score_2 in 0.0f64..1e12,
        last_update in 0u64..2_000_000_000_000,
    ) {
        let entity_id = EntityId::new(entity_id_val);
        let signal_type_id = SignalTypeId::new(signal_type_id_val);

        let hot = HotSignalState::new(entity_id_val, signal_type_id_val);
        hot.restore(last_update, &[score_0, score_1, score_2]);
        let warm = BucketedCounter::new();

        let entry = EntitySignalEntry { hot, warm };
        let bytes = serialize_entry(entity_id, signal_type_id, &entry);
        prop_assert_eq!(bytes.len(), 983);
        let (eid, stid, restored) = deserialize_entry(&bytes).unwrap();

        prop_assert_eq!(eid, entity_id);
        prop_assert_eq!(stid, signal_type_id);
        // Scores are stored as raw u64 bit patterns, so they must round-trip bit-exactly.
        prop_assert_eq!(restored.hot.stored_score(0).to_bits(), score_0.to_bits());
        prop_assert_eq!(restored.hot.stored_score(1).to_bits(), score_1.to_bits());
        prop_assert_eq!(restored.hot.stored_score(2).to_bits(), score_2.to_bits());
        prop_assert_eq!(restored.hot.last_update_ns(), last_update);
    }
}

// Meta serialization roundtrip.
proptest! {
    #[test]
    fn serialize_deserialize_meta_roundtrip(
        checkpoint_time_ns: u64,
        wal_sequence: u64,
    ) {
        let meta = CheckpointMeta { checkpoint_time_ns, wal_sequence };
        let bytes = serialize_meta(&meta);
        let restored = deserialize_meta(&bytes).unwrap();
        prop_assert_eq!(restored, meta);
    }
}
```

### Unit Tests

```rust
#[test]
fn checkpoint_to_empty_storage() {
    let schema = test_schema();
    let ledger = SignalLedger::new(schema, Box::new(NoopWalWriter));

    // Record one signal for each of 10 entities (fixed timestamp keeps the test deterministic)
    let now = Timestamp::from_nanos(1_000_000_000_000);
    for i in 0..10 {
        ledger.record_signal("view", EntityId::new(i + 1), 1.0, now).unwrap();
    }

    let storage = InMemoryBackend::new();
    let meta = CheckpointMeta { checkpoint_time_ns: now.as_nanos(), wal_sequence: 100 };
    ledger.checkpoint(&storage, meta).unwrap();

    // Verify keys were written: meta key + 10 entity keys = 11 total
    let all_keys: Vec<_> = storage.scan_prefix(&[]).collect();
    assert_eq!(all_keys.len(), 11, "expected 11 keys, got {}", all_keys.len());
}

#[test]
fn restore_from_empty_storage() {
    let schema = test_schema();
    let ledger = SignalLedger::new(schema, Box::new(NoopWalWriter));

    let storage = InMemoryBackend::new();
    let meta = ledger.restore(&storage).unwrap();

    assert!(meta.is_none(), "no checkpoint should return None");
    assert_eq!(ledger.entry_count(), 0);
}

#[test]
fn restore_preserves_decay_scores() {
    let schema = test_schema();
    let ledger = SignalLedger::new(schema.clone(), Box::new(NoopWalWriter));

    // Write signals with known values
    let ts = Timestamp::from_nanos(1_000_000_000_000);
    ledger.record_signal("view", EntityId::new(42), 5.0, ts).unwrap();
    ledger.record_signal("view", EntityId::new(42), 3.0,
        Timestamp::from_nanos(1_001_000_000_000)).unwrap();

    // Checkpoint
    let storage = InMemoryBackend::new();
    let meta = CheckpointMeta { checkpoint_time_ns: 1_002_000_000_000, wal_sequence: 50 };
    ledger.checkpoint(&storage, meta).unwrap();

    // Restore
    let ledger2 = SignalLedger::new(schema, Box::new(NoopWalWriter));
    let restored_meta = ledger2.restore(&storage).unwrap().unwrap();
    assert_eq!(restored_meta.wal_sequence, 50);

    // Stored scores should be present and match
    let original = ledger.read_decay_score(EntityId::new(42), "view", 0).unwrap()
        .expect("original score present");
    let restored = ledger2.read_decay_score(EntityId::new(42), "view", 0).unwrap()
        .expect("restored score present");
    assert!((original - restored).abs() < 1e-10, "original={original}, restored={restored}");
}

#[test]
fn restore_preserves_windowed_counts() {
    let schema = test_schema();
    let ledger = SignalLedger::new(schema.clone(), Box::new(NoopWalWriter));

    // 100 signals, 100 ms apart
    let ts = Timestamp::from_nanos(1_000_000_000_000);
    for i in 0..100 {
        ledger.record_signal("view", EntityId::new(1), 1.0,
            Timestamp::from_nanos(ts.as_nanos() + i * 100_000_000)).unwrap();
    }

    let storage = InMemoryBackend::new();
    let meta = CheckpointMeta { checkpoint_time_ns: ts.as_nanos() + 10_000_000_000, wal_sequence: 0 };
    ledger.checkpoint(&storage, meta).unwrap();

    let ledger2 = SignalLedger::new(schema, Box::new(NoopWalWriter));
    ledger2.restore(&storage).unwrap();

    let count_orig = ledger.read_windowed_count(EntityId::new(1), "view", Window::AllTime).unwrap();
    let count_rest = ledger2.read_windowed_count(EntityId::new(1), "view", Window::AllTime).unwrap();
    assert_eq!(count_orig, count_rest);
    assert_eq!(count_rest, 100);
}

#[test]
fn serialize_entry_version_byte() {
    let entry = EntitySignalEntry {
        hot: HotSignalState::new(1, 0),
        warm: BucketedCounter::new(),
    };
    let bytes = serialize_entry(EntityId::new(1), SignalTypeId::new(0), &entry);
    assert_eq!(bytes[0], 0x01, "version byte should be 0x01");
}

#[test]
fn deserialize_entry_rejects_wrong_version() {
    let bytes = vec![0x00; 983]; // correct length, wrong version byte
    let result = deserialize_entry(&bytes);
    assert!(result.is_err());
}

#[test]
fn deserialize_entry_rejects_truncated_data() {
    let result = deserialize_entry(&[0x01, 0x00]); // too short
    assert!(result.is_err());
}

#[test]
fn checkpoint_overwrites_previous() {
    let schema = test_schema();
    let ledger = SignalLedger::new(schema.clone(), Box::new(NoopWalWriter));
    let storage = InMemoryBackend::new();

    // First checkpoint with 5 entities
    let ts = Timestamp::from_nanos(1_000_000_000_000);
    for i in 0..5 {
        ledger.record_signal("view", EntityId::new(i + 1), 1.0, ts).unwrap();
    }
    ledger.checkpoint(&storage, CheckpointMeta { checkpoint_time_ns: 1, wal_sequence: 10 }).unwrap();

    // Second checkpoint with 3 more entities (8 total)
    for i in 5..8 {
        ledger.record_signal("view", EntityId::new(i + 1), 1.0, ts).unwrap();
    }
    ledger.checkpoint(&storage, CheckpointMeta { checkpoint_time_ns: 2, wal_sequence: 20 }).unwrap();

    // Restore should see the latest meta and all 8 entries
    let ledger2 = SignalLedger::new(schema, Box::new(NoopWalWriter));
    let meta = ledger2.restore(&storage).unwrap().unwrap();
    assert_eq!(meta.wal_sequence, 20);
    assert_eq!(ledger2.entry_count(), 8);
}
```

## Acceptance Criteria

- [ ] `SignalLedger::checkpoint()` writes all entries to `StorageEngine` via `Tag::Sig` keys in a single `WriteBatch`
- [ ] `SignalLedger::restore()` reads all `Tag::Sig` keys and populates the `DashMap`
- [ ] Checkpoint metadata (timestamp, WAL sequence) stored at well-known key and recoverable on restore
- [ ] Checkpoint-restore roundtrip preserves: decay scores (bit-exact, since they are stored as raw `u64` bit patterns), windowed counts (exact), all-time counts (exact)
- [ ] Serialization format has a version byte; deserialization rejects unknown versions
- [ ] Deserialization rejects truncated or corrupt data with a descriptive error
- [ ] `InMemoryBackend` used for all tests (deterministic, no I/O)
- [ ] No `unsafe` code
- [ ] `cargo clippy -- -D warnings` passes
- [ ] All property tests and unit tests pass

## Research References

- [docs/research/tidaldb_signal_ledger.md](../../../research/tidaldb_signal_ledger.md) -- Section 10 (checkpoint/restore: "hot-tier state serialized to `entity_signal_state` CF every 30-60 seconds")
- [thoughts.md](../../../../thoughts.md) -- Part II.1 (WAL as source of truth: "everything else is derived state that can always be recomputed from events")

## Spec References

- [docs/specs/03-signal-system.md](../../../specs/03-signal-system.md) -- Section 3 (cold tier: `entity_signal_state` CF for crash recovery checkpoint), Section 9 (background materializer: "checkpoint hot-tier state every 30-60 seconds"), invariant INV-CR-2 (checkpoint consistency: "the hot-tier checkpoint, when restored and replayed from the checkpoint's WAL position, produces state identical to the pre-crash state"), crash recovery targets (Section 12: hot-tier restore < 10 seconds for 10M entities)
- [docs/specs/00-architecture-overview.md](../../../specs/00-architecture-overview.md) -- Section 3 (Materializer trait: `checkpoint()` writes state to storage, `restore()` reads it back)

## Implementation Notes

- The `StorageEngine` is passed as `&dyn StorageEngine` to both `checkpoint()` and `restore()`. In m1p5, `TidalDB` owns both the `SignalLedger` and the `FjallStorage`, and passes the appropriate keyspace backend to checkpoint/restore.
- The checkpoint writes to the same keyspace as entity metadata and events. The `Tag::Sig` discriminant in the key encoding ensures no collisions with `Tag::Meta` or `Tag::Evt` keys.
- At M1 scale (100 entities, 3 signal types, ~300 entries), checkpoint serializes 300 * 983 bytes = ~295 KB. Trivially fast.
- At production scale (10M entities, 6 signal types, ~60M entries), checkpoint serializes ~60M * 983 bytes = ~59 GB. This is too large for a single batch write. However, production-scale checkpointing is an M5/M6 concern: M1's checkpoint is designed for correctness, not production scale, and the single-batch approach works at M1 scale.
- Do NOT implement incremental/delta checkpointing. Full checkpoint on every call. Incremental checkpointing (only writing changed entries) is an optimization for M5+.
- Do NOT implement checkpoint scheduling. m1p5's `TidalDB` will call `checkpoint()` on shutdown. Periodic checkpointing (every 30 seconds) is an m1p2/materializer concern.
- The `scan_prefix(&[])` approach for restore scans ALL keys, not just `Tag::Sig` keys. This is correct but not optimal; at M1 scale it is fast. At production scale, `Tag::Sig` keys would need to share a common prefix (for example, via a dedicated keyspace), which the current entity-first key layout does not provide. This optimization is deferred.
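The sizing figures above follow from the fixed 983-byte value. A tiny hypothetical helper makes the arithmetic explicit and can optionally add the 12-byte `Tag::Sig` key (8 + 1 + 1 + 2); the figures in the notes count values only.

```rust
/// Fixed size of one serialized entry value (see the layout table).
pub const VALUE_BYTES: u64 = 983;
/// Size of one per-entry `Tag::Sig` key: 8 (entity) + 1 (separator) + 1 (tag) + 2 (signal type).
pub const KEY_BYTES: u64 = 12;

/// Approximate checkpoint payload for `entries` entity-signal pairs,
/// ignoring the 17-byte meta value and any storage-engine overhead.
pub fn checkpoint_bytes(entries: u64, include_keys: bool) -> u64 {
    let per_entry = if include_keys { VALUE_BYTES + KEY_BYTES } else { VALUE_BYTES };
    entries * per_entry
}
```

At the requirement's 10,000-pair target this comes to about 10 MB per checkpoint including keys, which is the volume the "< 2 seconds" budget has to cover.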