# Task 04: Checkpoint and Restore
## Context
**Milestone:** 1 -- Signal Engine
**Phase:** m1p4 -- Signal Ledger
**Depends On:** Task 03 (Signal Ledger and Velocity)
**Blocks:** m1p5 (Entity CRUD and Signal Write API)
**Complexity:** M
## Objective
Deliver the checkpoint and restore mechanism for the `SignalLedger`. Hot-tier decay scores and warm-tier bucketed counters are in-memory state. Without persistence, a crash loses all signal aggregates and requires full WAL replay from the beginning of time. Checkpoint/restore writes the current in-memory state to the `StorageEngine` (via `Tag::Sig`) periodically, so that crash recovery only needs to replay WAL events since the last checkpoint.
This task implements:
1. **Checkpoint:** Serialize all `DashMap` entries to the `StorageEngine` using the existing key encoding (`encode_key(entity_id, Tag::Sig, suffix)`).
2. **Restore:** On startup, scan the `Tag::Sig` key range and reconstruct `EntitySignalEntry` instances into the `DashMap`.
3. **Serialization format:** A compact binary format for `HotSignalState` and `BucketedCounterSnapshot`.
The checkpoint is a consistent snapshot of the signal ledger at a point in time. After restore, WAL events after the checkpoint's sequence number are replayed to bring the state up to date. The WAL replay mechanism itself is m1p2's responsibility; this task provides the `checkpoint()` and `restore()` methods that m1p5 will call.
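The recovery order described above (restore first, then replay only newer WAL events) can be sketched as follows. This is a minimal, self-contained model rather than the tidalDB API: `WalEvent` and `events_to_replay` are hypothetical names introduced for illustration, and only `CheckpointMeta` and the "sequence > `wal_sequence`" rule come from this document.

```rust
/// Mirrors the `CheckpointMeta` struct defined in this task.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CheckpointMeta {
    pub checkpoint_time_ns: u64,
    pub wal_sequence: u64,
}

/// Hypothetical WAL event: only the sequence number matters for this sketch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WalEvent {
    pub sequence: u64,
}

/// Select the events that must be replayed after `restore()` returns `meta`.
///
/// `None` (no checkpoint, first boot) replays everything; `Some(meta)`
/// replays only events with `sequence > meta.wal_sequence`.
pub fn events_to_replay(events: &[WalEvent], meta: Option<CheckpointMeta>) -> Vec<WalEvent> {
    events
        .iter()
        .copied()
        .filter(|e| match meta {
            Some(m) => e.sequence > m.wal_sequence,
            None => true,
        })
        .collect()
}
```

With a checkpoint taken at sequence 3 and a WAL holding sequences 1 through 5, only events 4 and 5 would be replayed; with no checkpoint, all five would be.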
## Requirements
- `SignalLedger::checkpoint()` writes all entries to `StorageEngine` via `Tag::Sig` keys
- `SignalLedger::restore()` reads all `Tag::Sig` keys and populates the `DashMap`
- Key format: `encode_key(entity_id, Tag::Sig, &[signal_type_id_hi, signal_type_id_lo])`
- Value format: deterministic binary serialization of hot-tier + warm-tier state
- Checkpoint must be consistent: no partial entries (use `write_batch` for atomicity)
- Restore + re-checkpoint produces identical storage content (roundtrip property)
- Checkpoint duration target: < 2 seconds for 10,000 entity-signal pairs
- `StorageEngine` is passed by reference -- `SignalLedger` does not own storage (m1p5's `TidalDB` owns both)
- No external serialization dependencies (no serde, no bincode) -- hand-rolled binary for control and `#![forbid(unsafe_code)]` compatibility
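The roundtrip requirement (restore followed by re-checkpoint yields identical storage content) can be illustrated with a deliberately tiny model. Everything here is an assumption made for the sketch: the `Storage` and `Ledger` aliases stand in for `StorageEngine` and the `DashMap`, and each entry is reduced to a single `u64` count instead of the full 983-byte value. Only the key layout follows this document.

```rust
use std::collections::BTreeMap;

/// Stand-in for the storage engine: an ordered byte-key map.
pub type Storage = BTreeMap<Vec<u8>, Vec<u8>>;
/// Stand-in for the ledger: (entity_id, signal_type_id) -> a single count.
pub type Ledger = BTreeMap<(u64, u16), u64>;

/// Write every ledger entry under `[entity_id: 8 BE][0x00][0x02][signal: 2 BE]`.
pub fn checkpoint(ledger: &Ledger) -> Storage {
    ledger
        .iter()
        .map(|(&(entity, signal), &count)| {
            let mut key = entity.to_be_bytes().to_vec();
            key.extend_from_slice(&[0x00, 0x02]);
            key.extend_from_slice(&signal.to_be_bytes());
            (key, count.to_le_bytes().to_vec())
        })
        .collect()
}

/// Rebuild the ledger from storage, ignoring anything that is not a
/// 12-byte entry key with an 8-byte value.
pub fn restore(storage: &Storage) -> Ledger {
    storage
        .iter()
        .filter(|(key, value)| key.len() == 12 && value.len() == 8)
        .map(|(key, value)| {
            let mut entity = [0u8; 8];
            let mut signal = [0u8; 2];
            let mut count = [0u8; 8];
            entity.copy_from_slice(&key[0..8]);
            signal.copy_from_slice(&key[10..12]);
            count.copy_from_slice(value);
            (
                (u64::from_be_bytes(entity), u16::from_be_bytes(signal)),
                u64::from_le_bytes(count),
            )
        })
        .collect()
}
```

The property to check is `checkpoint(&restore(&checkpoint(&ledger))) == checkpoint(&ledger)`. In the real implementation the same comparison would be made over the keys and values produced by `SignalLedger::checkpoint()`.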
## Technical Design
### Module Structure
```
tidal/src/signals/
    checkpoint.rs   -- checkpoint, restore, serialization helpers
```
### Public API
```rust
// === signals/checkpoint.rs ===
use crate::schema::EntityId;
use crate::storage::{StorageEngine, Tag, WriteBatch, encode_key, entity_tag_prefix, parse_key};
use super::ledger::{SignalLedger, EntitySignalEntry};
use super::hot::HotSignalState;
use super::warm::BucketedCounterSnapshot;
use super::SignalTypeId;
/// Checkpoint sequence metadata stored alongside the signal state.
/// Used by the WAL replay mechanism to know where to start replaying.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CheckpointMeta {
    /// Timestamp (nanos) when the checkpoint was taken.
    pub checkpoint_time_ns: u64,
    /// WAL sequence number at checkpoint time.
    /// Events with sequence > this number must be replayed after restore.
    pub wal_sequence: u64,
}
impl SignalLedger {
    /// Write all in-memory signal state to the storage engine.
    ///
    /// Iterates over the DashMap and serializes each entry to a key-value pair
    /// using `Tag::Sig`. Uses `write_batch` for atomicity -- either all entries
    /// are written or none.
    ///
    /// The checkpoint metadata (timestamp, WAL sequence) is written to a
    /// well-known key: `encode_key(EntityId::new(0), Tag::Sig, b"meta")`.
    ///
    /// # Key Format
    ///
    /// Per-entry key: `[entity_id: 8 BE][0x00][Tag::Sig][signal_type_id: 2 BE]`
    /// Meta key: `[0x00..0x00 (8 bytes)][0x00][Tag::Sig][b"meta"]`
    ///
    /// # Errors
    ///
    /// Returns `LumenError::Storage` if the write batch or the subsequent flush fails.
    pub fn checkpoint(
        &self,
        storage: &dyn StorageEngine,
        meta: CheckpointMeta,
    ) -> crate::Result<()>;
    /// Restore in-memory signal state from the storage engine.
    ///
    /// Scans every key in the supplied keyspace, keeps those tagged `Tag::Sig`,
    /// deserializes the values, and populates the DashMap.
    ///
    /// Returns the checkpoint metadata (for the WAL to know where to resume).
    /// Returns `None` if no checkpoint exists (first boot).
    ///
    /// # Errors
    ///
    /// Returns `LumenError::Storage` on I/O failure.
    /// Returns `LumenError::Internal` on deserialization failure (corrupt checkpoint).
    pub fn restore(
        &self,
        storage: &dyn StorageEngine,
    ) -> crate::Result<Option<CheckpointMeta>>;
    /// Return the number of entries currently in the DashMap.
    /// Used for diagnostics and testing.
    pub fn entry_count(&self) -> usize;
}
/// Serialize an EntitySignalEntry to bytes.
///
/// Binary format (all values little-endian for simplicity):
///
/// ```text
/// Offset  Size  Field
///      0     1  version (0x01)
///      1     8  entity_id (u64 LE)
///      9     2  signal_type_id (u16 LE)
///     11     2  flags (u16 LE)
///     13     8  last_update_ns (u64 LE)
///     21     8  decay_score_0 (f64 LE, as u64 bits)
///     29     8  decay_score_1 (f64 LE)
///     37     8  decay_score_2 (f64 LE)
///     45     1  current_minute (u8)
///     46     1  current_hour (u8)
///     47     8  all_time_count (u64 LE)
///     55     8  last_minute_rotation_ns (u64 LE)
///     63     8  last_hour_rotation_ns (u64 LE)
///     71   240  minute_buckets (60 * u32 LE)
///    311   672  hour_buckets (168 * u32 LE)
/// Total: 983 bytes
/// ```
pub fn serialize_entry(
    entity_id: EntityId,
    signal_type_id: SignalTypeId,
    entry: &EntitySignalEntry,
) -> Vec<u8>;
/// Deserialize an EntitySignalEntry from bytes.
///
/// Returns (entity_id, signal_type_id, entry) or an error if the format is invalid.
pub fn deserialize_entry(
    bytes: &[u8],
) -> Result<(EntityId, SignalTypeId, EntitySignalEntry), String>;
/// Serialize CheckpointMeta to bytes.
///
/// Format: [version: 1][checkpoint_time_ns: 8 LE][wal_sequence: 8 LE] = 17 bytes
pub fn serialize_meta(meta: &CheckpointMeta) -> Vec<u8>;
/// Deserialize CheckpointMeta from bytes.
pub fn deserialize_meta(bytes: &[u8]) -> Result<CheckpointMeta, String>;
```
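The offsets in the `serialize_entry` layout table can be cross-checked by deriving them from the field sizes. The sketch below is illustrative only: `FIELDS`, `offsets`, and `entry_len` are hypothetical helper names, and the sizes are copied from the table above.

```rust
/// Field names and sizes (in bytes), copied from the layout table.
pub const FIELDS: [(&str, usize); 15] = [
    ("version", 1),
    ("entity_id", 8),
    ("signal_type_id", 2),
    ("flags", 2),
    ("last_update_ns", 8),
    ("decay_score_0", 8),
    ("decay_score_1", 8),
    ("decay_score_2", 8),
    ("current_minute", 1),
    ("current_hour", 1),
    ("all_time_count", 8),
    ("last_minute_rotation_ns", 8),
    ("last_hour_rotation_ns", 8),
    ("minute_buckets", 60 * 4),
    ("hour_buckets", 168 * 4),
];

/// Return `(name, offset, size)` for every field, accumulating offsets in order.
pub fn offsets() -> Vec<(&'static str, usize, usize)> {
    let mut next = 0;
    FIELDS
        .iter()
        .map(|&(name, size)| {
            let offset = next;
            next += size;
            (name, offset, size)
        })
        .collect()
}

/// Total serialized length: the sum of all field sizes.
pub fn entry_len() -> usize {
    FIELDS.iter().map(|&(_, size)| size).sum()
}
```

A check like this could live in a unit test so that a future format change cannot silently drift from the documented offsets.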
### Internal Design
**Key encoding for checkpoint entries:**
Each `(EntityId, SignalTypeId)` pair maps to a storage key using the existing `encode_key` function:
```rust
let suffix = signal_type_id.as_u16().to_be_bytes();
let key = encode_key(entity_id, Tag::Sig, &suffix);
```
This produces: `[entity_id: 8 BE][0x00][0x02][signal_type_id: 2 BE]` -- 12 bytes total. The `Tag::Sig` byte (0x02) ensures checkpoint entries live in a separate namespace from event data (`Tag::Evt`) and metadata (`Tag::Meta`).
**Checkpoint meta key:**
The checkpoint metadata is stored at a well-known key using `EntityId::new(0)` as the entity ID:
```rust
let meta_key = encode_key(EntityId::new(0), Tag::Sig, b"meta");
```
Entity ID 0 is reserved for system-level keys. The suffix `b"meta"` distinguishes the checkpoint metadata from any entity-signal pair (whose suffix is exactly 2 bytes, never 4).
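To make the two key shapes concrete, here is a small self-contained sketch. `encode_key_sketch` is a stand-in written from the layout described in this section, not the storage module's `encode_key`; `entry_key`, `meta_key`, and `is_meta_key` are likewise hypothetical helpers. The tag value `0x02` is taken from the text above.

```rust
/// Tag byte for signal checkpoint keys, as stated in this document.
pub const TAG_SIG: u8 = 0x02;

/// Stand-in for `encode_key`: `[entity_id: 8 BE][0x00][tag][suffix]`.
/// Assumed from the layout described here, not copied from the storage module.
pub fn encode_key_sketch(entity_id: u64, tag: u8, suffix: &[u8]) -> Vec<u8> {
    let mut key = Vec::with_capacity(10 + suffix.len());
    key.extend_from_slice(&entity_id.to_be_bytes());
    key.push(0x00);
    key.push(tag);
    key.extend_from_slice(suffix);
    key
}

/// Key for one `(entity, signal type)` checkpoint entry: 12 bytes.
pub fn entry_key(entity_id: u64, signal_type_id: u16) -> Vec<u8> {
    encode_key_sketch(entity_id, TAG_SIG, &signal_type_id.to_be_bytes())
}

/// Well-known key for the checkpoint metadata: entity ID 0, suffix `b"meta"`.
pub fn meta_key() -> Vec<u8> {
    encode_key_sketch(0, TAG_SIG, b"meta")
}

/// True if `key` is the metadata key rather than a per-entry key.
pub fn is_meta_key(key: &[u8]) -> bool {
    key == meta_key().as_slice()
}
```

Because an entry key is always 12 bytes and the meta key is 14, the two cannot be equal, even for entity ID 0. Big-endian entity IDs also mean byte-wise key order matches numeric entity order.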
**Atomic checkpoint via write_batch:**
The checkpoint writes all entries plus the metadata in a single `WriteBatch`. This ensures that the checkpoint is either fully written or not written at all. If the process crashes during checkpoint, the previous checkpoint remains valid.
```rust
pub fn checkpoint(&self, storage: &dyn StorageEngine, meta: CheckpointMeta) -> crate::Result<()> {
    let mut batch = WriteBatch::new();
    // Write checkpoint metadata
    let meta_key = encode_key(EntityId::new(0), Tag::Sig, b"meta");
    batch.put(meta_key, serialize_meta(&meta));
    // Write all entries
    for entry_ref in self.entries.iter() {
        let &(entity_id, signal_type_id) = entry_ref.key();
        let entry = entry_ref.value();
        let suffix = signal_type_id.as_u16().to_be_bytes();
        let key = encode_key(entity_id, Tag::Sig, &suffix);
        let value = serialize_entry(entity_id, signal_type_id, entry);
        batch.put(key, value);
    }
    storage.write_batch(batch)?;
    storage.flush()?;
    Ok(())
}
```
**Restore via prefix scan:**
Restore must visit every `Tag::Sig` key across all entities. `entity_tag_prefix` is not sufficient for this: it builds a prefix for a single entity, and because the tag byte (0x02) sits at byte offset 9, after the 8-byte entity ID and the `0x00` separator, no single key prefix selects all `Tag::Sig` keys. Iterating entity IDs `0..MAX` is impractical.
Instead, `SignalLedger::restore` accepts a `&dyn StorageEngine` that represents a single keyspace (items, the only keyspace at M1 scope). It calls `scan_prefix(&[])`, where the empty prefix matches every key, and uses `parse_key` to keep only `Tag::Sig` keys, skipping the meta key.
```rust
pub fn restore(&self, storage: &dyn StorageEngine) -> crate::Result<Option<CheckpointMeta>> {
    let mut meta: Option<CheckpointMeta> = None;
    // Read the meta key first
    let meta_key = encode_key(EntityId::new(0), Tag::Sig, b"meta");
    if let Some(meta_bytes) = storage.get(&meta_key)? {
        meta = Some(deserialize_meta(&meta_bytes)
            .map_err(|e| LumenError::Internal(format!("corrupt checkpoint meta: {e}")))?);
    }
    // Scan every key in the keyspace and keep only Tag::Sig entries
    for (key, value) in storage.scan_prefix(&[]) {
        if let Some((entity_id, Tag::Sig, suffix)) = parse_key(&key) {
            // Skip the meta key (already read above)
            if entity_id == EntityId::new(0) && suffix == b"meta" {
                continue;
            }
            let (eid, stid, entry) = deserialize_entry(&value)
                .map_err(|e| LumenError::Internal(format!("corrupt checkpoint entry: {e}")))?;
            self.entries.insert((eid, stid), entry);
        }
    }
    Ok(meta)
}
```
**Serialization format:**
Hand-rolled binary serialization is used instead of serde/bincode because:
1. Zero additional dependencies
2. Full control over format stability
3. Trivial to implement for fixed-layout structs
4. Compatible with `#![forbid(unsafe_code)]` without question
The format uses a version byte (0x01) at offset 0. If the format changes in future milestones, the version byte enables backward-compatible deserialization.
Little-endian is used for serialized values (vs big-endian for storage keys). The choice does not matter for correctness; little-endian matches the native byte order on x86/ARM64/RISC-V (the target platforms) and avoids byte-swapping on the common path.
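As an illustration of the hand-rolled approach, the 17-byte `CheckpointMeta` format could be implemented roughly as below. This is a sketch under the format stated in the Public API section (`[version: 1][checkpoint_time_ns: 8 LE][wal_sequence: 8 LE]`); the constants `META_VERSION` and `META_LEN` and the error strings are assumptions, not part of the spec.

```rust
/// Mirrors the `CheckpointMeta` struct defined in this task.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CheckpointMeta {
    pub checkpoint_time_ns: u64,
    pub wal_sequence: u64,
}

/// Hypothetical constants for this sketch.
const META_VERSION: u8 = 0x01;
const META_LEN: usize = 17;

/// Encode as `[version: 1][checkpoint_time_ns: 8 LE][wal_sequence: 8 LE]`.
pub fn serialize_meta(meta: &CheckpointMeta) -> Vec<u8> {
    let mut out = Vec::with_capacity(META_LEN);
    out.push(META_VERSION);
    out.extend_from_slice(&meta.checkpoint_time_ns.to_le_bytes());
    out.extend_from_slice(&meta.wal_sequence.to_le_bytes());
    out
}

/// Decode, rejecting wrong lengths and unknown versions with a message.
pub fn deserialize_meta(bytes: &[u8]) -> Result<CheckpointMeta, String> {
    if bytes.len() != META_LEN {
        return Err(format!("expected {META_LEN} bytes, got {}", bytes.len()));
    }
    if bytes[0] != META_VERSION {
        return Err(format!("unsupported meta version {:#04x}", bytes[0]));
    }
    // Copy into fixed-size arrays so `from_le_bytes` can be used without unsafe.
    let mut time = [0u8; 8];
    let mut seq = [0u8; 8];
    time.copy_from_slice(&bytes[1..9]);
    seq.copy_from_slice(&bytes[9..17]);
    Ok(CheckpointMeta {
        checkpoint_time_ns: u64::from_le_bytes(time),
        wal_sequence: u64::from_le_bytes(seq),
    })
}
```

Checking the length before the version byte means truncated input is reported as truncation rather than as a version mismatch. The entry format would follow the same pattern with more fields.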
### Error Handling
- Storage write failure: returns `LumenError::Storage(StorageError::...)`.
- Corrupt checkpoint data (deserialization failure): returns `LumenError::Internal(...)` with a descriptive message. This should never happen in normal operation -- it indicates disk corruption or a bug.
- No checkpoint found on restore: returns `Ok(None)`. The caller (m1p5's `TidalDB::open`) handles this by starting with empty state and replaying the entire WAL.
## Test Strategy
### Property Tests
```rust
use proptest::prelude::*;
// Checkpoint-restore roundtrip preserves all state.
proptest! {
    #[test]
    fn checkpoint_restore_roundtrip(
        entity_count in 1usize..50,
        signals_per_entity in 1usize..20,
    ) {
        let schema = test_schema();
        let ledger = SignalLedger::new(schema.clone(), Box::new(NoopWalWriter));
        // Populate with random signals
        let now_ns = 1_000_000_000_000u64;
        for entity in 0..entity_count as u64 {
            for i in 0..signals_per_entity {
                let ts = Timestamp::from_nanos(now_ns + (i as u64) * 1_000_000_000);
                ledger.record_signal("view", EntityId::new(entity + 1), 1.0, ts).unwrap();
            }
        }
        // Checkpoint to in-memory storage
        let storage = InMemoryBackend::new();
        let meta = CheckpointMeta { checkpoint_time_ns: now_ns, wal_sequence: 42 };
        ledger.checkpoint(&storage, meta).unwrap();
        // Restore into a fresh ledger
        let ledger2 = SignalLedger::new(schema, Box::new(NoopWalWriter));
        let restored_meta = ledger2.restore(&storage).unwrap();
        // Meta matches
        prop_assert_eq!(restored_meta, Some(meta));
        // Entry count matches
        prop_assert_eq!(ledger2.entry_count(), ledger.entry_count());
        // Spot-check: decay scores match for all entities
        for entity in 0..entity_count as u64 {
            let eid = EntityId::new(entity + 1);
            let original = ledger.read_decay_score(eid, "view", 0).unwrap();
            let restored = ledger2.read_decay_score(eid, "view", 0).unwrap();
            match (original, restored) {
                (Some(o), Some(r)) => {
                    // Stored scores should match exactly (no lazy decay applied yet)
                    prop_assert!((o - r).abs() < 1e-10,
                        "entity {entity}: original={o}, restored={r}");
                }
                (None, None) => {}
                _ => prop_assert!(false, "entity {entity}: mismatch in Some/None"),
            }
        }
        // Spot-check: windowed counts match
        for entity in 0..entity_count as u64 {
            let eid = EntityId::new(entity + 1);
            let orig_count = ledger.read_windowed_count(eid, "view", Window::AllTime).unwrap();
            let rest_count = ledger2.read_windowed_count(eid, "view", Window::AllTime).unwrap();
            prop_assert_eq!(orig_count, rest_count,
                "entity {entity}: all-time count mismatch");
        }
    }
}
// Serialization roundtrip for individual entries.
proptest! {
    #[test]
    fn serialize_deserialize_entry_roundtrip(
        entity_id_val in 1u64..1_000_000,
        signal_type_id_val in 0u16..64,
        score_0 in 0.0f64..1e12,
        score_1 in 0.0f64..1e12,
        score_2 in 0.0f64..1e12,
        last_update in 0u64..2_000_000_000_000,
        all_time in 0u64..1_000_000,
    ) {
        let entity_id = EntityId::new(entity_id_val);
        let signal_type_id = SignalTypeId::new(signal_type_id_val);
        let hot = HotSignalState::new(entity_id_val, signal_type_id_val);
        hot.restore(last_update, &[score_0, score_1, score_2]);
        let warm = BucketedCounter::new();
        // TODO: `all_time` is generated but not yet applied. Set the all-time
        // count on `warm` (via increment_by, or by building the snapshot
        // directly) and assert that it survives the roundtrip.
        let entry = EntitySignalEntry { hot, warm };
        let bytes = serialize_entry(entity_id, signal_type_id, &entry);
        let (eid, stid, restored) = deserialize_entry(&bytes).unwrap();
        prop_assert_eq!(eid, entity_id);
        prop_assert_eq!(stid, signal_type_id);
        prop_assert!((restored.hot.stored_score(0) - score_0).abs() < 1e-15);
        prop_assert!((restored.hot.stored_score(1) - score_1).abs() < 1e-15);
        prop_assert!((restored.hot.stored_score(2) - score_2).abs() < 1e-15);
        prop_assert_eq!(restored.hot.last_update_ns(), last_update);
    }
}
// Meta serialization roundtrip.
proptest! {
    #[test]
    fn serialize_deserialize_meta_roundtrip(
        checkpoint_time_ns: u64,
        wal_sequence: u64,
    ) {
        let meta = CheckpointMeta { checkpoint_time_ns, wal_sequence };
        let bytes = serialize_meta(&meta);
        let restored = deserialize_meta(&bytes).unwrap();
        prop_assert_eq!(restored, meta);
    }
}
```
### Unit Tests
```rust
#[test]
fn checkpoint_to_empty_storage() {
    let schema = test_schema();
    let ledger = SignalLedger::new(schema, Box::new(NoopWalWriter));
    // Record some signals
    let now = Timestamp::now();
    for i in 0..10 {
        ledger.record_signal("view", EntityId::new(i + 1), 1.0, now).unwrap();
    }
    let storage = InMemoryBackend::new();
    let meta = CheckpointMeta { checkpoint_time_ns: now.as_nanos(), wal_sequence: 100 };
    ledger.checkpoint(&storage, meta).unwrap();
    // Verify keys were written
    // Meta key + 10 entity keys = 11 total
    let all_keys: Vec<_> = storage.scan_prefix(&[]).collect();
    assert_eq!(all_keys.len(), 11, "expected 11 keys, got {}", all_keys.len());
}
#[test]
fn restore_from_empty_storage() {
    let schema = test_schema();
    let ledger = SignalLedger::new(schema, Box::new(NoopWalWriter));
    let storage = InMemoryBackend::new();
    let meta = ledger.restore(&storage).unwrap();
    assert!(meta.is_none(), "no checkpoint should return None");
    assert_eq!(ledger.entry_count(), 0);
}
#[test]
fn restore_preserves_decay_scores() {
    let schema = test_schema();
    let ledger = SignalLedger::new(schema.clone(), Box::new(NoopWalWriter));
    // Write signals with known values
    let ts = Timestamp::from_nanos(1_000_000_000_000);
    ledger.record_signal("view", EntityId::new(42), 5.0, ts).unwrap();
    ledger.record_signal("view", EntityId::new(42), 3.0,
        Timestamp::from_nanos(1_001_000_000_000)).unwrap();
    // Checkpoint
    let storage = InMemoryBackend::new();
    let meta = CheckpointMeta { checkpoint_time_ns: 1_002_000_000_000, wal_sequence: 50 };
    ledger.checkpoint(&storage, meta).unwrap();
    // Restore
    let ledger2 = SignalLedger::new(schema, Box::new(NoopWalWriter));
    let restored_meta = ledger2.restore(&storage).unwrap().unwrap();
    assert_eq!(restored_meta.wal_sequence, 50);
    // Scores should match (same 1e-10 tolerance as the roundtrip property test)
    let original = ledger.read_decay_score(EntityId::new(42), "view", 0).unwrap()
        .expect("original score should exist");
    let restored = ledger2.read_decay_score(EntityId::new(42), "view", 0).unwrap()
        .expect("restored score should exist");
    assert!((original - restored).abs() < 1e-10,
        "original={original}, restored={restored}");
}
#[test]
fn restore_preserves_windowed_counts() {
    let schema = test_schema();
    let ledger = SignalLedger::new(schema.clone(), Box::new(NoopWalWriter));
    let ts = Timestamp::from_nanos(1_000_000_000_000);
    for i in 0..100 {
        ledger.record_signal("view", EntityId::new(1), 1.0,
            Timestamp::from_nanos(ts.as_nanos() + i * 100_000_000)).unwrap();
    }
    let storage = InMemoryBackend::new();
    let meta = CheckpointMeta { checkpoint_time_ns: ts.as_nanos() + 10_000_000_000, wal_sequence: 0 };
    ledger.checkpoint(&storage, meta).unwrap();
    let ledger2 = SignalLedger::new(schema, Box::new(NoopWalWriter));
    ledger2.restore(&storage).unwrap();
    let count_orig = ledger.read_windowed_count(EntityId::new(1), "view", Window::AllTime).unwrap();
    let count_rest = ledger2.read_windowed_count(EntityId::new(1), "view", Window::AllTime).unwrap();
    assert_eq!(count_orig, count_rest);
    assert_eq!(count_rest, 100);
}
#[test]
fn serialize_entry_version_byte() {
    let entry = EntitySignalEntry {
        hot: HotSignalState::new(1, 0),
        warm: BucketedCounter::new(),
    };
    let bytes = serialize_entry(EntityId::new(1), SignalTypeId::new(0), &entry);
    assert_eq!(bytes[0], 0x01, "version byte should be 0x01");
}
#[test]
fn deserialize_entry_rejects_wrong_version() {
    // Correct length (983 bytes) but version byte 0x00 instead of 0x01
    let bytes = vec![0x00; 983];
    let result = deserialize_entry(&bytes);
    assert!(result.is_err());
}
#[test]
fn deserialize_entry_rejects_truncated_data() {
    let result = deserialize_entry(&[0x01, 0x00]); // too short
    assert!(result.is_err());
}
#[test]
fn checkpoint_overwrites_previous() {
    let schema = test_schema();
    let ledger = SignalLedger::new(schema.clone(), Box::new(NoopWalWriter));
    let storage = InMemoryBackend::new();
    // First checkpoint with 5 entities
    let ts = Timestamp::now();
    for i in 0..5 {
        ledger.record_signal("view", EntityId::new(i + 1), 1.0, ts).unwrap();
    }
    ledger.checkpoint(&storage, CheckpointMeta { checkpoint_time_ns: 1, wal_sequence: 10 }).unwrap();
    // Second checkpoint with 3 more entities (8 total)
    for i in 5..8 {
        ledger.record_signal("view", EntityId::new(i + 1), 1.0, ts).unwrap();
    }
    ledger.checkpoint(&storage, CheckpointMeta { checkpoint_time_ns: 2, wal_sequence: 20 }).unwrap();
    // Restore should have all 8 entries
    let ledger2 = SignalLedger::new(schema, Box::new(NoopWalWriter));
    let meta = ledger2.restore(&storage).unwrap().unwrap();
    assert_eq!(meta.wal_sequence, 20);
    assert_eq!(ledger2.entry_count(), 8);
}
```
## Acceptance Criteria
- [ ] `SignalLedger::checkpoint()` writes all entries to `StorageEngine` via `Tag::Sig` keys in a single `WriteBatch`
- [ ] `SignalLedger::restore()` reads all `Tag::Sig` keys and populates the `DashMap`
- [ ] Checkpoint metadata (timestamp, WAL sequence) stored at well-known key and recoverable on restore
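- [ ] Checkpoint-restore roundtrip preserves: decay scores (within the `1e-15` absolute tolerance used by the serialization roundtrip test; scores are stored as raw `f64` bits, so no precision loss is expected), windowed counts (exact), all-time counts (exact)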
- [ ] Serialization format has a version byte; deserialization rejects unknown versions
- [ ] Deserialization rejects truncated or corrupt data with descriptive error
- [ ] `InMemoryBackend` used for all tests (deterministic, no I/O)
- [ ] No `unsafe` code
- [ ] `cargo clippy -- -D warnings` passes
- [ ] All property tests and unit tests pass
## Research References
- [docs/research/tidaldb_signal_ledger.md](../../../research/tidaldb_signal_ledger.md) -- Section 10 (checkpoint/restore: "hot-tier state serialized to `entity_signal_state` CF every 30-60 seconds")
- [thoughts.md](../../../../thoughts.md) -- Part II.1 (WAL as source of truth: "everything else is derived state that can always be recomputed from events")
## Spec References
- [docs/specs/03-signal-system.md](../../../specs/03-signal-system.md) -- Section 3 (cold tier: `entity_signal_state` CF for crash recovery checkpoint), Section 9 (background materializer: "checkpoint hot-tier state every 30-60 seconds"), invariant INV-CR-2 (checkpoint consistency: "the hot-tier checkpoint, when restored and replayed from the checkpoint's WAL position, produces state identical to the pre-crash state"), crash recovery targets (Section 12: hot-tier restore < 10 seconds for 10M entities)
- [docs/specs/00-architecture-overview.md](../../../specs/00-architecture-overview.md) -- Section 3 (Materializer trait: `checkpoint()` writes state to storage, `restore()` reads it back)
## Implementation Notes
- The `StorageEngine` is passed as `&dyn StorageEngine` to both `checkpoint()` and `restore()`. In m1p5, `TidalDB` owns both the `SignalLedger` and the `FjallStorage`. It passes the appropriate keyspace backend to checkpoint/restore.
- The checkpoint writes to the same keyspace as entity metadata and events. The `Tag::Sig` discriminant in the key encoding ensures no collisions with `Tag::Meta` or `Tag::Evt` keys.
- At M1 scale (100 entities, 3 signal types, ~300 entries), checkpoint serializes 300 * 983 bytes = ~295 KB. Trivially fast.
- At production scale (10M entities, 6 signal types, ~60M entries), checkpoint serializes ~60M * 983 bytes = ~59 GB. This is too large for a single batch write. However, production-scale checkpointing is an M5/M6 concern. M1's checkpoint is designed for correctness, not production scale. The batch approach works at M1 scale.
- Do NOT implement incremental/delta checkpointing. Full checkpoint on every call. Incremental checkpointing (only writing changed entries) is an optimization for M5+.
- Do NOT implement checkpoint scheduling. m1p5's `TidalDB` will call `checkpoint()` on shutdown. Periodic checkpointing (every 30-60 seconds, per the signal-system spec) is an m1p2/materializer concern.
- The `scan_prefix(&[])` approach for restore scans ALL keys, not just `Tag::Sig` keys. This is correct but not optimal -- at M1 scale it is fast. At production scale, a dedicated scan with a `Tag::Sig`-specific prefix would be needed. This optimization is deferred.
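The size estimates in the notes above follow directly from the 983-byte entry size. A minimal sketch of that arithmetic, with `ENTRY_BYTES` and `checkpoint_bytes` as hypothetical names; it counts values only and ignores keys, the meta entry, and storage-engine overhead:

```rust
/// Serialized size of one entry, from the layout in this document.
pub const ENTRY_BYTES: u64 = 983;

/// Approximate payload size of a full checkpoint in bytes
/// (values only; keys and storage-engine overhead are ignored).
pub fn checkpoint_bytes(entities: u64, signal_types: u64) -> u64 {
    entities * signal_types * ENTRY_BYTES
}
```

For example, `checkpoint_bytes(100, 3)` gives 294,900 bytes (about 295 KB), and `checkpoint_bytes(10_000_000, 6)` gives 58,980,000,000 bytes (about 59 GB). The 10,000-pair duration target corresponds to roughly 9.8 MB.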