tidaldb/tidal/src/signals/checkpoint/format.rs
jordan 192c473f55 feat: complete Milestone 5 — full-text search, RRF fusion, and creator search
- M5p1: BM25 text indexing via Tantivy with background syncer (0.26ms @ 10K docs)
- M5p2: RRF fusion layer combining BM25 + ANN scores (46µs @ 1K candidates)
- M5p3: unified Search query API (8-stage pipeline, BM25 + vector + ranking)
- M5p4: creator text + vector indexing and creator search executor (< 20ms @ 200 creators)
- Refactor db/mod.rs into focused sub-modules (creators, items, sessions, signals, etc.)
- Decompose monolithic files into directory modules (query/executor, ranking/diversity, etc.)
- Split brute.rs → brute/mod.rs + brute/tests.rs; extract search executor helpers
- Add benches: fusion, search, session, text_index
- Add M5 UAT test suites (m5_uat, m5_search, m5p4_creator_search, text_index)
- Update blog posts, roadmap, content strategy, and M5 planning docs
- Add tmp/ and .claude/worktrees/ to .gitignore

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-21 23:53:16 -07:00

370 lines
12 KiB
Rust

//! Binary entry serialization for signal checkpoint records.
//!
//! Each `EntitySignalEntry` serializes to a fixed 983-byte record containing
//! the hot-tier decay scores and warm-tier bucketed counters. All payload
//! values use little-endian byte order; a version byte at offset 0 enables
//! future backward-compatible format changes.
use crate::schema::EntityId;
use super::super::SignalTypeId;
use super::super::hot::HotSignalState;
use super::super::ledger::EntitySignalEntry;
use super::super::warm::{BucketedCounter, BucketedCounterSnapshot, HOUR_BUCKETS, MINUTE_BUCKETS};
// ── Constants ─────────────────────────────────────────────────────────────────
/// Format revision stored at byte 0 of every record; `deserialize_entry`
/// rejects any record carrying a different value.
const VERSION: u8 = 1;
/// Total size of a serialized entry in bytes: a 71-byte header followed by
/// 60 minute buckets and 168 hour buckets, each a 4-byte `u32`.
pub(super) const ENTRY_SIZE: usize = 983;
/// Bit 0 of the `flags` field: set when velocity tracking is enabled for this signal.
const FLAG_VELOCITY_ENABLED: u16 = 1 << 0;
// ── Serialization ─────────────────────────────────────────────────────────────
/// Encode one signal entry as a fixed-size, 983-byte checkpoint record.
///
/// The hot tier supplies the flags, the last-update timestamp and the three
/// decay scores. The warm tier is captured through a single `snapshot()` call
/// and appended after them.
///
/// # Binary layout (all payload values little-endian)
///
/// ```text
/// Offset Size Field
/// 0 1 version (0x01)
/// 1 8 entity_id (u64 LE)
/// 9 2 signal_type_id (u16 LE)
/// 11 2 flags (u16 LE) -- bit 0: velocity_enabled
/// 13 8 last_update_ns (u64 LE)
/// 21 8 decay_score_0 (f64 bits LE)
/// 29 8 decay_score_1 (f64 bits LE)
/// 37 8 decay_score_2 (f64 bits LE)
/// 45 1 current_minute (u8)
/// 46 1 current_hour (u8)
/// 47 8 all_time_count (u64 LE)
/// 55 8 last_minute_rotation_ns (u64 LE)
/// 63 8 last_hour_rotation_ns (u64 LE)
/// 71 240 minute_buckets (60 x u32 LE)
/// 311 672 hour_buckets (168 x u32 LE)
/// Total: 983 bytes
/// ```
#[must_use]
pub fn serialize_entry(
    entity_id: EntityId,
    signal_type_id: SignalTypeId,
    entry: &EntitySignalEntry,
) -> Vec<u8> {
    let hot = &entry.hot;
    // Flags are derived from the hot tier's immutable configuration.
    let flags: u16 = if hot.velocity_enabled() {
        FLAG_VELOCITY_ENABLED
    } else {
        0
    };

    let mut out = Vec::with_capacity(ENTRY_SIZE);

    // Header: version, identity, flags, hot-tier timestamp.
    out.push(VERSION);
    out.extend_from_slice(&entity_id.as_u64().to_le_bytes());
    out.extend_from_slice(&signal_type_id.as_u16().to_le_bytes());
    out.extend_from_slice(&flags.to_le_bytes());
    out.extend_from_slice(&hot.last_update_ns().to_le_bytes());

    // Three decay scores, written as raw f64 bit patterns.
    out.extend((0..3).flat_map(|slot| hot.stored_score(slot).to_bits().to_le_bytes()));

    // Warm tier: read once through `snapshot()` and encode its fields in layout order.
    let warm = entry.warm.snapshot();
    out.extend([warm.current_minute, warm.current_hour]);
    out.extend_from_slice(&warm.all_time_count.to_le_bytes());
    out.extend_from_slice(&warm.last_minute_rotation_ns.to_le_bytes());
    out.extend_from_slice(&warm.last_hour_rotation_ns.to_le_bytes());
    out.extend(warm.minute_buckets.iter().flat_map(|b| b.to_le_bytes()));
    out.extend(warm.hour_buckets.iter().flat_map(|b| b.to_le_bytes()));

    debug_assert_eq!(out.len(), ENTRY_SIZE, "serialize_entry produced wrong size");
    out
}
/// Deserialize an `EntitySignalEntry` from a checkpoint record.
///
/// Decodes the layout documented on [`serialize_entry`] and rebuilds both tiers.
/// The hot tier is rebuilt with `HotSignalState::with_flags` followed by
/// `restore`. The warm tier is rebuilt with `BucketedCounter::restore` on the
/// decoded snapshot.
///
/// Returns `(entity_id, signal_type_id, entry)` on success.
///
/// # Errors
///
/// Returns `Err` if:
/// - The slice is not exactly `ENTRY_SIZE` (983) bytes
/// - The version byte is not `0x01`
/// - `current_minute` is not below `MINUTE_BUCKETS`, or `current_hour` is not
///   below `HOUR_BUCKETS`. Otherwise the cursor bytes of a corrupt record would
///   be handed to the warm tier unchecked.
/// - Any sub-slice conversion fails due to offset math errors
pub fn deserialize_entry(
    bytes: &[u8],
) -> Result<(EntityId, SignalTypeId, EntitySignalEntry), String> {
    // Bucket-array offsets. The hour offset is derived from the minute bucket
    // count. The compile-time check below fails the build if the warm-tier
    // bucket counts ever drift from the fixed `ENTRY_SIZE` record layout.
    const MINUTE_BUCKETS_OFFSET: usize = 71;
    const HOUR_BUCKETS_OFFSET: usize = MINUTE_BUCKETS_OFFSET + 4 * MINUTE_BUCKETS;
    const _: () = assert!(
        HOUR_BUCKETS_OFFSET + 4 * HOUR_BUCKETS == ENTRY_SIZE,
        "checkpoint layout does not match ENTRY_SIZE; bump VERSION and update the layout"
    );

    if bytes.len() != ENTRY_SIZE {
        return Err(format!("expected {ENTRY_SIZE} bytes, got {}", bytes.len()));
    }
    // [0] version check
    if bytes[0] != VERSION {
        return Err(format!(
            "unknown checkpoint version 0x{:02x}, expected 0x{:02x}",
            bytes[0], VERSION
        ));
    }

    // [1..13] identity and flags
    let entity_id_val = u64::from_le_bytes(decode_field(bytes, 1, "entity_id")?);
    let signal_type_id_val = u16::from_le_bytes(decode_field(bytes, 9, "signal_type_id")?);
    let flags = u16::from_le_bytes(decode_field(bytes, 11, "flags")?);
    // Only bit 0 is interpreted; any other bits are ignored.
    let velocity_enabled = (flags & FLAG_VELOCITY_ENABLED) != 0;

    // [13..45] hot tier: last update timestamp and three decay scores (f64 bits)
    let last_update_ns = u64::from_le_bytes(decode_field(bytes, 13, "last_update_ns")?);
    let score_0 = f64::from_bits(u64::from_le_bytes(decode_field(bytes, 21, "score_0")?));
    let score_1 = f64::from_bits(u64::from_le_bytes(decode_field(bytes, 29, "score_1")?));
    let score_2 = f64::from_bits(u64::from_le_bytes(decode_field(bytes, 37, "score_2")?));

    // [45..47] warm-tier bucket cursors. They index the bucket arrays, so an
    // out-of-range value can only come from a corrupt record.
    let current_minute = bytes[45];
    if usize::from(current_minute) >= MINUTE_BUCKETS {
        return Err(format!(
            "current_minute {current_minute} out of range, expected < {MINUTE_BUCKETS}"
        ));
    }
    let current_hour = bytes[46];
    if usize::from(current_hour) >= HOUR_BUCKETS {
        return Err(format!(
            "current_hour {current_hour} out of range, expected < {HOUR_BUCKETS}"
        ));
    }

    // [47..71] warm-tier totals and rotation timestamps
    let all_time_count = u64::from_le_bytes(decode_field(bytes, 47, "all_time_count")?);
    let last_minute_rotation_ns =
        u64::from_le_bytes(decode_field(bytes, 55, "last_minute_rotation_ns")?);
    let last_hour_rotation_ns =
        u64::from_le_bytes(decode_field(bytes, 63, "last_hour_rotation_ns")?);

    // [71..311] minute buckets and [311..983] hour buckets (u32 LE each)
    let minute_buckets: [u32; MINUTE_BUCKETS] =
        decode_u32_buckets(bytes, MINUTE_BUCKETS_OFFSET, "minute_buckets")?;
    let hour_buckets: [u32; HOUR_BUCKETS] =
        decode_u32_buckets(bytes, HOUR_BUCKETS_OFFSET, "hour_buckets")?;

    // Reconstruct hot tier
    let hot = HotSignalState::with_flags(entity_id_val, signal_type_id_val, velocity_enabled);
    hot.restore(last_update_ns, &[score_0, score_1, score_2]);
    // Reconstruct warm tier from snapshot
    let warm = BucketedCounter::new();
    warm.restore(&BucketedCounterSnapshot {
        minute_buckets,
        hour_buckets,
        current_minute,
        current_hour,
        all_time_count,
        last_minute_rotation_ns,
        last_hour_rotation_ns,
    });
    Ok((
        EntityId::new(entity_id_val),
        SignalTypeId::new(signal_type_id_val),
        EntitySignalEntry { hot, warm },
    ))
}

/// Copy the `N` bytes at `start..start + N` into a fixed-size array.
///
/// `deserialize_entry` checks the record length first, so this fails only if a
/// hard-coded offset is wrong. The error names the field to make such a bug
/// easy to locate.
fn decode_field<const N: usize>(
    bytes: &[u8],
    start: usize,
    name: &str,
) -> Result<[u8; N], String> {
    bytes
        .get(start..start + N)
        .and_then(|raw| <[u8; N]>::try_from(raw).ok())
        .ok_or_else(|| format!("offset math error at {name} [{start}..{}]", start + N))
}

/// Decode `N` consecutive little-endian `u32` values starting at `start`.
///
/// Fails only if `bytes` is too short to hold all `N` values.
fn decode_u32_buckets<const N: usize>(
    bytes: &[u8],
    start: usize,
    name: &str,
) -> Result<[u32; N], String> {
    let end = start + 4 * N;
    let raw = bytes
        .get(start..end)
        .ok_or_else(|| format!("offset math error at {name} [{start}..{end}]"))?;
    let mut buckets = [0u32; N];
    // A slice of exactly `4 * N` bytes yields exactly `N` four-byte chunks.
    for (bucket, word) in buckets.iter_mut().zip(raw.chunks_exact(4)) {
        *bucket = u32::from_le_bytes([word[0], word[1], word[2], word[3]]);
    }
    Ok(buckets)
}
// ── Tests ─────────────────────────────────────────────────────────────────────
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn serialize_entry_version_byte() {
        let entry = EntitySignalEntry {
            hot: HotSignalState::new(1, 0),
            warm: BucketedCounter::new(),
        };
        let bytes = serialize_entry(EntityId::new(1), SignalTypeId::new(0), &entry);
        assert_eq!(bytes[0], 0x01, "version byte should be 0x01");
    }

    #[test]
    fn serialize_entry_correct_length() {
        let entry = EntitySignalEntry {
            hot: HotSignalState::new(42, 3),
            warm: BucketedCounter::new(),
        };
        let bytes = serialize_entry(EntityId::new(42), SignalTypeId::new(3), &entry);
        assert_eq!(bytes.len(), ENTRY_SIZE);
    }

    // Pins the documented header offsets for entity_id, signal_type_id and flags.
    #[test]
    fn serialize_entry_header_layout() {
        let entry = EntitySignalEntry {
            hot: HotSignalState::with_flags(0x0102_0304_0506_0708, 0x0a0b, true),
            warm: BucketedCounter::new(),
        };
        let bytes = serialize_entry(
            EntityId::new(0x0102_0304_0506_0708),
            SignalTypeId::new(0x0a0b),
            &entry,
        );
        assert_eq!(&bytes[1..9], &0x0102_0304_0506_0708u64.to_le_bytes());
        assert_eq!(&bytes[9..11], &0x0a0bu16.to_le_bytes());
        assert_eq!(&bytes[11..13], &FLAG_VELOCITY_ENABLED.to_le_bytes());
    }

    #[test]
    fn deserialize_entry_rejects_wrong_version() {
        let bytes = vec![0x00u8; ENTRY_SIZE];
        assert!(deserialize_entry(&bytes).is_err());
    }

    #[test]
    fn deserialize_entry_rejects_truncated_data() {
        let result = deserialize_entry(&[0x01, 0x00]);
        assert!(result.is_err());
    }

    #[test]
    fn deserialize_entry_rejects_wrong_length() {
        let bytes = vec![0x01u8; ENTRY_SIZE - 1];
        assert!(deserialize_entry(&bytes).is_err());
    }

    // The enabled case is covered by the roundtrip test; this covers a clear flag.
    #[test]
    fn velocity_flag_clear_roundtrips() {
        let entry = EntitySignalEntry {
            hot: HotSignalState::with_flags(5, 1, false),
            warm: BucketedCounter::new(),
        };
        let bytes = serialize_entry(EntityId::new(5), SignalTypeId::new(1), &entry);
        assert_eq!(&bytes[11..13], &[0u8, 0u8]);
        let (_, _, restored) = deserialize_entry(&bytes).expect("deserialize ok");
        assert!(!restored.hot.velocity_enabled());
    }

    #[test]
    fn serialize_deserialize_entry_roundtrip() {
        let entity_id = EntityId::new(99);
        let signal_type_id = SignalTypeId::new(2);
        let hot = HotSignalState::with_flags(99, 2, true);
        hot.restore(1_000_000_000_000, &[3.125, 2.71, 1.41]);
        let warm = BucketedCounter::with_start_time(1_000_000_000_000);
        warm.increment(1_000_000_000_000);
        warm.increment(1_000_000_000_001);
        let entry = EntitySignalEntry { hot, warm };
        let bytes = serialize_entry(entity_id, signal_type_id, &entry);
        assert_eq!(bytes.len(), ENTRY_SIZE);
        let (eid, stid, restored) = deserialize_entry(&bytes).expect("deserialize ok");
        assert_eq!(eid, entity_id);
        assert_eq!(stid, signal_type_id);
        // Scores are stored as raw f64 bits, so the roundtrip must be bit-exact.
        assert_eq!(
            restored.hot.stored_score(0).to_bits(),
            entry.hot.stored_score(0).to_bits()
        );
        assert_eq!(
            restored.hot.stored_score(1).to_bits(),
            entry.hot.stored_score(1).to_bits()
        );
        assert_eq!(
            restored.hot.stored_score(2).to_bits(),
            entry.hot.stored_score(2).to_bits()
        );
        assert_eq!(restored.hot.last_update_ns(), 1_000_000_000_000);
        assert!(restored.hot.velocity_enabled());
        assert_eq!(restored.warm.all_time_count(), 2);
        // Re-encoding the restored entry must reproduce the original record exactly.
        assert_eq!(serialize_entry(eid, stid, &restored), bytes);
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod proptests {
    use proptest::prelude::*;
    use super::*;
    // Entry serialization roundtrip for arbitrary hot-tier state, velocity flag,
    // and warm-tier count.
    proptest! {
        #[test]
        fn serialize_deserialize_entry_roundtrip(
            entity_id_val in 1u64..1_000_000,
            signal_type_id_val in 0u16..64,
            velocity_enabled in any::<bool>(),
            score_0 in 0.0f64..1e12,
            score_1 in 0.0f64..1e12,
            score_2 in 0.0f64..1e12,
            last_update in 0u64..2_000_000_000_000u64,
            all_time in 0u32..1_000_000,
        ) {
            let entity_id = EntityId::new(entity_id_val);
            let signal_type_id = SignalTypeId::new(signal_type_id_val);
            let hot = HotSignalState::with_flags(
                entity_id_val,
                signal_type_id_val,
                velocity_enabled,
            );
            hot.restore(last_update, &[score_0, score_1, score_2]);
            let warm = BucketedCounter::new();
            warm.increment_by(all_time, 0);
            let entry = EntitySignalEntry { hot, warm };
            let bytes = serialize_entry(entity_id, signal_type_id, &entry);
            prop_assert_eq!(bytes.len(), ENTRY_SIZE);
            let (eid, stid, restored) = deserialize_entry(&bytes).unwrap();
            prop_assert_eq!(eid, entity_id);
            prop_assert_eq!(stid, signal_type_id);
            // Scores are stored as raw f64 bits, so the roundtrip must be bit-exact.
            prop_assert_eq!(restored.hot.stored_score(0).to_bits(), score_0.to_bits());
            prop_assert_eq!(restored.hot.stored_score(1).to_bits(), score_1.to_bits());
            prop_assert_eq!(restored.hot.stored_score(2).to_bits(), score_2.to_bits());
            prop_assert_eq!(restored.hot.last_update_ns(), last_update);
            prop_assert_eq!(restored.hot.velocity_enabled(), velocity_enabled);
            // Compare against the pre-serialization counter rather than assuming
            // how `increment_by` maps `all_time` onto the total.
            prop_assert_eq!(restored.warm.all_time_count(), entry.warm.all_time_count());
        }
    }
}