Implements the foundation of tidalDB's data pipeline: **Phase 1 – Schema primitives** - EntityId newtype (u64, big-endian ordering) - SignalTypeDefinition with pre-computed decay λ, deduped/sorted windows - SchemaBuilder with full constraint validation (duplicates, identifiers, half-life, windows, velocity) - LumenError wrapping all subsystems with required From impls **Phase 2 – Write-Ahead Log** - Length-prefixed, BLAKE3-protected entry format - Group-commit writer (batch up to 100 events / 10 ms) - Double-buffered content-hash deduplication - Checkpoint, truncation, and crash-recovery with full replay - Integration, property, and UAT tests (incl. 5,500-event deterministic UAT) - Proptest coverage scaled to 10 000 events/run (was ≤500) to meet acceptance criterion; cases reduced 100→10 to keep runtime comparable **Phase 3 – Storage engine** - StorageEngine trait (get/put/delete/scan/batch/flush) - Key encoding: [EntityId][0x00][Tag][suffix] with ordering/prefix helpers - InMemoryBackend (BTreeMap + RwLock) - FjallStorage with three isolated keyspaces and atomic batch helper - Property tests for key ordering and round-trip correctness Also adds planning docs for phases 4-5, research docs, architecture overview, and roadmap updates. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
24 KiB
Task 04: Checkpoint and Restore
Context
Milestone: 1 -- Signal Engine Phase: m1p4 -- Signal Ledger Depends On: Task 03 (Signal Ledger and Velocity) Blocks: m1p5 (Entity CRUD and Signal Write API) Complexity: M
Objective
Deliver the checkpoint and restore mechanism for the SignalLedger. Hot-tier decay scores and warm-tier bucketed counters are in-memory state. Without persistence, a crash loses all signal aggregates and requires full WAL replay from the beginning of time. Checkpoint/restore writes the current in-memory state to the StorageEngine (via Tag::Sig) periodically, so that crash recovery only needs to replay WAL events since the last checkpoint.
This task implements:
- Checkpoint: Serialize all
DashMapentries to theStorageEngineusing the existing key encoding (encode_key(entity_id, Tag::Sig, suffix)). - Restore: On startup, scan the
Tag::Sigkey range and reconstructEntitySignalEntryinstances into theDashMap. - Serialization format: A compact binary format for
HotSignalStateandBucketedCounterSnapshot.
The checkpoint is a consistent snapshot of the signal ledger at a point in time. After restore, WAL events after the checkpoint's sequence number are replayed to bring the state up to date. The WAL replay mechanism itself is m1p2's responsibility; this task provides the checkpoint() and restore() methods that m1p5 will call.
Requirements
SignalLedger::checkpoint()writes all entries toStorageEngineviaTag::SigkeysSignalLedger::restore()reads allTag::Sigkeys and populates theDashMap- Key format:
encode_key(entity_id, Tag::Sig, &[signal_type_id_hi, signal_type_id_lo]) - Value format: deterministic binary serialization of hot-tier + warm-tier state
- Checkpoint must be consistent: no partial entries (use
write_batchfor atomicity) - Restore + re-checkpoint produces identical storage content (roundtrip property)
- Checkpoint duration target: < 2 seconds for 10,000 entity-signal pairs
StorageEngineis passed by reference --SignalLedgerdoes not own storage (m1p5'sTidalDBowns both)- No external serialization dependencies (no serde, no bincode) -- hand-rolled binary for control and
#![forbid(unsafe_code)]compatibility
Technical Design
Module Structure
tidal/src/signals/
checkpoint.rs -- checkpoint, restore, serialization helpers
Public API
// === signals/checkpoint.rs ===
use crate::schema::EntityId;
use crate::storage::{StorageEngine, Tag, WriteBatch, encode_key, entity_tag_prefix, parse_key};
use super::ledger::{SignalLedger, EntitySignalEntry};
use super::hot::HotSignalState;
use super::warm::BucketedCounterSnapshot;
use super::SignalTypeId;
/// Checkpoint sequence metadata stored alongside the signal state.
/// Used by the WAL replay mechanism to know where to start replaying.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CheckpointMeta {
/// Timestamp (nanos) when the checkpoint was taken.
pub checkpoint_time_ns: u64,
/// WAL sequence number at checkpoint time.
/// Events with sequence > this number must be replayed after restore.
pub wal_sequence: u64,
}
impl SignalLedger {
/// Write all in-memory signal state to the storage engine.
///
/// Iterates over the DashMap and serializes each entry to a key-value pair
/// using `Tag::Sig`. Uses `write_batch` for atomicity -- either all entries
/// are written or none.
///
/// The checkpoint metadata (timestamp, WAL sequence) is written to a
/// well-known key: `encode_key(EntityId::new(0), Tag::Sig, b"meta")`.
///
/// # Key Format
///
/// Per-entry key: `[entity_id: 8 BE][0x00][Tag::Sig][signal_type_id: 2 BE]`
/// Meta key: `[0x00..0x00 (8 bytes)][0x00][Tag::Sig][b"meta"]`
///
/// # Errors
///
/// Returns `LumenError::Storage` if the write batch fails.
pub fn checkpoint(
&self,
storage: &dyn StorageEngine,
meta: CheckpointMeta,
) -> crate::Result<()>;
/// Restore in-memory signal state from the storage engine.
///
/// Scans all keys with `Tag::Sig` prefix for each entity kind's keyspace,
/// deserializes the values, and populates the DashMap.
///
/// Returns the checkpoint metadata (for the WAL to know where to resume).
/// Returns `None` if no checkpoint exists (first boot).
///
/// # Errors
///
/// Returns `LumenError::Storage` on I/O failure.
/// Returns `LumenError::Internal` on deserialization failure (corrupt checkpoint).
pub fn restore(
&self,
storage: &dyn StorageEngine,
) -> crate::Result<Option<CheckpointMeta>>;
/// Return the number of entries currently in the DashMap.
/// Used for diagnostics and testing.
pub fn entry_count(&self) -> usize;
}
/// Serialize an EntitySignalEntry to bytes.
///
/// Binary format (all values little-endian for simplicity):
///
/// ```text
/// Offset Size Field
/// 0 1 version (0x01)
/// 1 8 entity_id (u64 LE)
/// 9 2 signal_type_id (u16 LE)
/// 11 2 flags (u16 LE)
/// 13 8 last_update_ns (u64 LE)
/// 21 8 decay_score_0 (f64 LE, as u64 bits)
/// 29 8 decay_score_1 (f64 LE)
/// 37 8 decay_score_2 (f64 LE)
/// 45 1 current_minute (u8)
/// 46 1 current_hour (u8)
/// 47 8 all_time_count (u64 LE)
/// 55 8 last_minute_rotation_ns (u64 LE)
/// 63 8 last_hour_rotation_ns (u64 LE)
/// 71 240 minute_buckets (60 * u32 LE)
/// 311 672 hour_buckets (168 * u32 LE)
/// Total: 983 bytes
/// ```
pub fn serialize_entry(
entity_id: EntityId,
signal_type_id: SignalTypeId,
entry: &EntitySignalEntry,
) -> Vec<u8>;
/// Deserialize an EntitySignalEntry from bytes.
///
/// Returns (entity_id, signal_type_id, entry) or an error if the format is invalid.
pub fn deserialize_entry(
bytes: &[u8],
) -> Result<(EntityId, SignalTypeId, EntitySignalEntry), String>;
/// Serialize CheckpointMeta to bytes.
///
/// Format: [version: 1][checkpoint_time_ns: 8 LE][wal_sequence: 8 LE] = 17 bytes
pub fn serialize_meta(meta: &CheckpointMeta) -> Vec<u8>;
/// Deserialize CheckpointMeta from bytes.
pub fn deserialize_meta(bytes: &[u8]) -> Result<CheckpointMeta, String>;
Internal Design
Key encoding for checkpoint entries:
Each (EntityId, SignalTypeId) pair maps to a storage key using the existing encode_key function:
let suffix = signal_type_id.as_u16().to_be_bytes();
let key = encode_key(entity_id, Tag::Sig, &suffix);
This produces: [entity_id: 8 BE][0x00][0x02][signal_type_id: 2 BE] -- 12 bytes total. The Tag::Sig byte (0x02) ensures checkpoint entries live in a separate namespace from event data (Tag::Evt) and metadata (Tag::Meta).
Checkpoint meta key:
The checkpoint metadata is stored at a well-known key using EntityId::new(0) as the entity ID:
let meta_key = encode_key(EntityId::new(0), Tag::Sig, b"meta");
Entity ID 0 is reserved for system-level keys. The suffix b"meta" distinguishes the checkpoint metadata from any entity-signal pair (whose suffix is exactly 2 bytes, never 4).
Atomic checkpoint via write_batch:
The checkpoint writes all entries plus the metadata in a single WriteBatch. This ensures that the checkpoint is either fully written or not written at all. If the process crashes during checkpoint, the previous checkpoint remains valid.
pub fn checkpoint(&self, storage: &dyn StorageEngine, meta: CheckpointMeta) -> crate::Result<()> {
let mut batch = WriteBatch::new();
// Write checkpoint metadata
let meta_key = encode_key(EntityId::new(0), Tag::Sig, b"meta");
batch.put(meta_key, serialize_meta(&meta));
// Write all entries
for entry_ref in self.entries.iter() {
let &(entity_id, signal_type_id) = entry_ref.key();
let entry = entry_ref.value();
let suffix = signal_type_id.as_u16().to_be_bytes();
let key = encode_key(entity_id, Tag::Sig, &suffix);
let value = serialize_entry(entity_id, signal_type_id, entry);
batch.put(key, value);
}
storage.write_batch(batch)?;
storage.flush()?;
Ok(())
}
Restore via prefix scan:
On restore, we scan all keys under Tag::Sig for each entity kind. However, at M1 scope, we only have one keyspace (items). The scan uses entity_tag_prefix is not sufficient since we need to scan across ALL entities. Instead, we scan all keys in the keyspace and filter by Tag::Sig:
Actually, a simpler approach: scan by a known pattern. Since all checkpoint keys have Tag::Sig (0x02) at byte position 9, and we want all of them, we scan the entire keyspace and filter. But scan_prefix requires a prefix. We can iterate entity IDs 0..MAX, but that is impractical.
Better approach: the SignalLedger::restore accepts a &dyn StorageEngine that represents a single keyspace (items in M1). It performs scan_prefix(&[]) -- an empty prefix that returns all keys -- and filters for Tag::Sig keys, excluding the meta key.
Wait -- scan_prefix with empty prefix returns all keys. Then parse_key extracts the tag. This works.
pub fn restore(&self, storage: &dyn StorageEngine) -> crate::Result<Option<CheckpointMeta>> {
let mut meta: Option<CheckpointMeta> = None;
// Read the meta key first
let meta_key = encode_key(EntityId::new(0), Tag::Sig, b"meta");
if let Some(meta_bytes) = storage.get(&meta_key)? {
meta = Some(deserialize_meta(&meta_bytes)
.map_err(|e| LumenError::Internal(format!("corrupt checkpoint meta: {e}")))?);
}
// Scan all Tag::Sig keys (excluding meta)
// Use entity_id=0 tag prefix to get the meta, then scan higher entity IDs
// Actually, iterate all keys and filter:
for (key, value) in storage.scan_prefix(&[]) {
if let Some((entity_id, Tag::Sig, suffix)) = parse_key(&key) {
// Skip the meta key
if entity_id == EntityId::new(0) && suffix == b"meta" {
continue;
}
let (eid, stid, entry) = deserialize_entry(&value)
.map_err(|e| LumenError::Internal(format!("corrupt checkpoint entry: {e}")))?;
self.entries.insert((eid, stid), entry);
}
}
Ok(meta)
}
Serialization format:
Hand-rolled binary serialization is used instead of serde/bincode because:
- Zero additional dependencies
- Full control over format stability
- Trivial to implement for fixed-layout structs
- Compatible with
#![forbid(unsafe_code)]without question
The format uses a version byte (0x01) at offset 0. If the format changes in future milestones, the version byte enables backward-compatible deserialization.
Little-endian is used for serialized values (vs big-endian for storage keys). The choice does not matter for correctness; little-endian matches the native byte order on x86/ARM64/RISC-V (the target platforms) and avoids byte-swapping on the common path.
Error Handling
- Storage write failure: returns
LumenError::Storage(StorageError::...). - Corrupt checkpoint data (deserialization failure): returns
LumenError::Internal(...)with a descriptive message. This should never happen in normal operation -- it indicates disk corruption or a bug. - No checkpoint found on restore: returns
Ok(None). The caller (m1p5'sTidalDB::open) handles this by starting with empty state and replaying the entire WAL.
Test Strategy
Property Tests
use proptest::prelude::*;
// Checkpoint-restore roundtrip preserves all state.
proptest! {
#[test]
fn checkpoint_restore_roundtrip(
entity_count in 1usize..50,
signals_per_entity in 1usize..20,
) {
let schema = test_schema();
let ledger = SignalLedger::new(schema.clone(), Box::new(NoopWalWriter));
// Populate with random signals
let now_ns = 1_000_000_000_000u64;
for entity in 0..entity_count as u64 {
for i in 0..signals_per_entity {
let ts = Timestamp::from_nanos(now_ns + (i as u64) * 1_000_000_000);
ledger.record_signal("view", EntityId::new(entity + 1), 1.0, ts).unwrap();
}
}
// Checkpoint to in-memory storage
let storage = InMemoryBackend::new();
let meta = CheckpointMeta { checkpoint_time_ns: now_ns, wal_sequence: 42 };
ledger.checkpoint(&storage, meta).unwrap();
// Restore into a fresh ledger
let ledger2 = SignalLedger::new(schema, Box::new(NoopWalWriter));
let restored_meta = ledger2.restore(&storage).unwrap();
// Meta matches
prop_assert_eq!(restored_meta, Some(meta));
// Entry count matches
prop_assert_eq!(ledger2.entry_count(), ledger.entry_count());
// Spot-check: decay scores match for all entities
for entity in 0..entity_count as u64 {
let eid = EntityId::new(entity + 1);
let original = ledger.read_decay_score(eid, "view", 0).unwrap();
let restored = ledger2.read_decay_score(eid, "view", 0).unwrap();
match (original, restored) {
(Some(o), Some(r)) => {
// Stored scores should match exactly (no lazy decay applied yet)
prop_assert!((o - r).abs() < 1e-10,
"entity {entity}: original={o}, restored={r}");
}
(None, None) => {}
_ => prop_assert!(false, "entity {entity}: mismatch in Some/None"),
}
}
// Spot-check: windowed counts match
for entity in 0..entity_count as u64 {
let eid = EntityId::new(entity + 1);
let orig_count = ledger.read_windowed_count(eid, "view", Window::AllTime).unwrap();
let rest_count = ledger2.read_windowed_count(eid, "view", Window::AllTime).unwrap();
prop_assert_eq!(orig_count, rest_count,
"entity {entity}: all-time count mismatch");
}
}
}
// Serialization roundtrip for individual entries.
proptest! {
#[test]
fn serialize_deserialize_entry_roundtrip(
entity_id_val in 1u64..1_000_000,
signal_type_id_val in 0u16..64,
score_0 in 0.0f64..1e12,
score_1 in 0.0f64..1e12,
score_2 in 0.0f64..1e12,
last_update in 0u64..2_000_000_000_000,
all_time in 0u64..1_000_000,
) {
let entity_id = EntityId::new(entity_id_val);
let signal_type_id = SignalTypeId::new(signal_type_id_val);
let hot = HotSignalState::new(entity_id_val, signal_type_id_val);
hot.restore(last_update, &[score_0, score_1, score_2]);
let warm = BucketedCounter::new();
// Set all-time count via increment_by
// (Or we test with the snapshot directly)
let entry = EntitySignalEntry { hot, warm };
let bytes = serialize_entry(entity_id, signal_type_id, &entry);
let (eid, stid, restored) = deserialize_entry(&bytes).unwrap();
prop_assert_eq!(eid, entity_id);
prop_assert_eq!(stid, signal_type_id);
prop_assert!((restored.hot.stored_score(0) - score_0).abs() < 1e-15);
prop_assert!((restored.hot.stored_score(1) - score_1).abs() < 1e-15);
prop_assert!((restored.hot.stored_score(2) - score_2).abs() < 1e-15);
prop_assert_eq!(restored.hot.last_update_ns(), last_update);
}
}
// Meta serialization roundtrip.
proptest! {
#[test]
fn serialize_deserialize_meta_roundtrip(
checkpoint_time_ns: u64,
wal_sequence: u64,
) {
let meta = CheckpointMeta { checkpoint_time_ns, wal_sequence };
let bytes = serialize_meta(&meta);
let restored = deserialize_meta(&bytes).unwrap();
prop_assert_eq!(restored, meta);
}
}
Unit Tests
#[test]
fn checkpoint_to_empty_storage() {
let schema = test_schema();
let ledger = SignalLedger::new(schema, Box::new(NoopWalWriter));
// Record some signals
let now = Timestamp::now();
for i in 0..10 {
ledger.record_signal("view", EntityId::new(i + 1), 1.0, now).unwrap();
}
let storage = InMemoryBackend::new();
let meta = CheckpointMeta { checkpoint_time_ns: now.as_nanos(), wal_sequence: 100 };
ledger.checkpoint(&storage, meta).unwrap();
// Verify keys were written
// Meta key + 10 entity keys = 11 total
let all_keys: Vec<_> = storage.scan_prefix(&[]).collect();
assert_eq!(all_keys.len(), 11, "expected 11 keys, got {}", all_keys.len());
}
#[test]
fn restore_from_empty_storage() {
let schema = test_schema();
let ledger = SignalLedger::new(schema, Box::new(NoopWalWriter));
let storage = InMemoryBackend::new();
let meta = ledger.restore(&storage).unwrap();
assert!(meta.is_none(), "no checkpoint should return None");
assert_eq!(ledger.entry_count(), 0);
}
#[test]
fn restore_preserves_decay_scores() {
let schema = test_schema();
let ledger = SignalLedger::new(schema.clone(), Box::new(NoopWalWriter));
// Write signals with known values
let ts = Timestamp::from_nanos(1_000_000_000_000);
ledger.record_signal("view", EntityId::new(42), 5.0, ts).unwrap();
ledger.record_signal("view", EntityId::new(42), 3.0,
Timestamp::from_nanos(1_001_000_000_000)).unwrap();
// Checkpoint
let storage = InMemoryBackend::new();
let meta = CheckpointMeta { checkpoint_time_ns: 1_002_000_000_000, wal_sequence: 50 };
ledger.checkpoint(&storage, meta).unwrap();
// Restore
let ledger2 = SignalLedger::new(schema, Box::new(NoopWalWriter));
let restored_meta = ledger2.restore(&storage).unwrap().unwrap();
assert_eq!(restored_meta.wal_sequence, 50);
// Scores should match
let query_ts = Timestamp::from_nanos(1_002_000_000_000);
let original = ledger.read_decay_score(EntityId::new(42), "view", 0).unwrap();
let restored = ledger2.read_decay_score(EntityId::new(42), "view", 0).unwrap();
assert!(original.is_some());
assert!(restored.is_some());
}
#[test]
fn restore_preserves_windowed_counts() {
let schema = test_schema();
let ledger = SignalLedger::new(schema.clone(), Box::new(NoopWalWriter));
let ts = Timestamp::from_nanos(1_000_000_000_000);
for i in 0..100 {
ledger.record_signal("view", EntityId::new(1), 1.0,
Timestamp::from_nanos(ts.as_nanos() + i * 100_000_000)).unwrap();
}
let storage = InMemoryBackend::new();
let meta = CheckpointMeta { checkpoint_time_ns: ts.as_nanos() + 10_000_000_000, wal_sequence: 0 };
ledger.checkpoint(&storage, meta).unwrap();
let ledger2 = SignalLedger::new(schema, Box::new(NoopWalWriter));
ledger2.restore(&storage).unwrap();
let count_orig = ledger.read_windowed_count(EntityId::new(1), "view", Window::AllTime).unwrap();
let count_rest = ledger2.read_windowed_count(EntityId::new(1), "view", Window::AllTime).unwrap();
assert_eq!(count_orig, count_rest);
assert_eq!(count_rest, 100);
}
#[test]
fn serialize_entry_version_byte() {
let entry = EntitySignalEntry {
hot: HotSignalState::new(1, 0),
warm: BucketedCounter::new(),
};
let bytes = serialize_entry(EntityId::new(1), SignalTypeId::new(0), &entry);
assert_eq!(bytes[0], 0x01, "version byte should be 0x01");
}
#[test]
fn deserialize_entry_rejects_wrong_version() {
let mut bytes = vec![0x00; 983]; // wrong version byte
let result = deserialize_entry(&bytes);
assert!(result.is_err());
}
#[test]
fn deserialize_entry_rejects_truncated_data() {
let result = deserialize_entry(&[0x01, 0x00]); // too short
assert!(result.is_err());
}
#[test]
fn checkpoint_overwrites_previous() {
let schema = test_schema();
let ledger = SignalLedger::new(schema.clone(), Box::new(NoopWalWriter));
let storage = InMemoryBackend::new();
// First checkpoint with 5 entities
let ts = Timestamp::now();
for i in 0..5 {
ledger.record_signal("view", EntityId::new(i + 1), 1.0, ts).unwrap();
}
ledger.checkpoint(&storage, CheckpointMeta { checkpoint_time_ns: 1, wal_sequence: 10 }).unwrap();
// Second checkpoint with 3 more entities (8 total)
for i in 5..8 {
ledger.record_signal("view", EntityId::new(i + 1), 1.0, ts).unwrap();
}
ledger.checkpoint(&storage, CheckpointMeta { checkpoint_time_ns: 2, wal_sequence: 20 }).unwrap();
// Restore should have all 8 entries
let ledger2 = SignalLedger::new(schema, Box::new(NoopWalWriter));
let meta = ledger2.restore(&storage).unwrap().unwrap();
assert_eq!(meta.wal_sequence, 20);
assert_eq!(ledger2.entry_count(), 8);
}
Acceptance Criteria
SignalLedger::checkpoint()writes all entries toStorageEngineviaTag::Sigkeys in a singleWriteBatchSignalLedger::restore()reads allTag::Sigkeys and populates theDashMap- Checkpoint metadata (timestamp, WAL sequence) stored at well-known key and recoverable on restore
- Checkpoint-restore roundtrip preserves: decay scores (to 15 decimal places), windowed counts (exact), all-time counts (exact)
- Serialization format has a version byte; deserialization rejects unknown versions
- Deserialization rejects truncated or corrupt data with descriptive error
InMemoryBackendused for all tests (deterministic, no I/O)- No
unsafecode cargo clippy -- -D warningspasses- All property tests and unit tests pass
Research References
- docs/research/tidaldb_signal_ledger.md -- Section 10 (checkpoint/restore: "hot-tier state serialized to
entity_signal_stateCF every 30-60 seconds") - thoughts.md -- Part II.1 (WAL as source of truth: "everything else is derived state that can always be recomputed from events")
Spec References
- docs/specs/03-signal-system.md -- Section 3 (cold tier:
entity_signal_stateCF for crash recovery checkpoint), Section 9 (background materializer: "checkpoint hot-tier state every 30-60 seconds"), invariant INV-CR-2 (checkpoint consistency: "the hot-tier checkpoint, when restored and replayed from the checkpoint's WAL position, produces state identical to the pre-crash state"), crash recovery targets (Section 12: hot-tier restore < 10 seconds for 10M entities) - docs/specs/00-architecture-overview.md -- Section 3 (Materializer trait:
checkpoint()writes state to storage,restore()reads it back)
Implementation Notes
- The
StorageEngineis passed as&dyn StorageEngineto bothcheckpoint()andrestore(). In m1p5,TidalDBowns both theSignalLedgerand theFjallStorage. It passes the appropriate keyspace backend to checkpoint/restore. - The checkpoint writes to the same keyspace as entity metadata and events. The
Tag::Sigdiscriminant in the key encoding ensures no collisions withTag::MetaorTag::Evtkeys. - At M1 scale (100 entities, 3 signal types, ~300 entries), checkpoint serializes 300 * 983 bytes = ~295 KB. Trivially fast.
- At production scale (10M entities, 6 signal types, ~60M entries), checkpoint serializes ~60M * 983 bytes = ~59 GB. This is too large for a single batch write. However, production-scale checkpointing is an M5/M6 concern. M1's checkpoint is designed for correctness, not production scale. The batch approach works at M1 scale.
- Do NOT implement incremental/delta checkpointing. Full checkpoint on every call. Incremental checkpointing (only writing changed entries) is an optimization for M5+.
- Do NOT implement checkpoint scheduling. m1p5's
TidalDBwill callcheckpoint()on shutdown. Periodic checkpointing (every 30 seconds) is a m1p2/materializer concern. - The
scan_prefix(&[])approach for restore scans ALL keys, not justTag::Sigkeys. This is correct but not optimal -- at M1 scale it is fast. At production scale, a dedicated scan with aTag::Sig-specific prefix would be needed. This optimization is deferred.