use super::error::WalError; /// Magic bytes identifying a tidalDB WAL batch frame: "TIDL" in LE byte order. /// /// Stored as `[0x44, 0x4C, 0x49, 0x54]` which is `0x54494C44` as a u32 LE. /// This allows `u32::from_le_bytes(magic) == 0x54494C44` to validate. pub const MAGIC: [u8; 4] = [0x44, 0x4C, 0x49, 0x54]; /// Current wire format version. pub const FORMAT_VERSION: u8 = 1; /// Record type discriminant for signal events. pub const RECORD_TYPE_SIGNAL: u8 = 0x01; /// Size of the batch header in bytes (one cache line). pub const HEADER_SIZE: usize = 64; /// Size of a single event record in bytes. pub const EVENT_SIZE: usize = 21; /// Maximum number of events in a single batch. pub const MAX_EVENTS_PER_BATCH: u16 = 256; /// Decoded batch header. #[derive(Debug, Clone, PartialEq, Eq)] pub struct BatchHeader { pub version: u8, pub flags: u8, pub event_count: u16, pub first_seq: u64, pub batch_timestamp: u64, pub payload_len: u32, pub checksum: [u8; 32], } /// A single signal event record in wire format. #[derive(Debug, Clone, PartialEq)] pub struct EventRecord { pub entity_id: u64, pub signal_type: u8, pub weight: f32, pub timestamp_nanos: u64, } impl EventRecord { /// Serialize this event into the 21-byte wire format. #[must_use] pub fn to_bytes(&self) -> [u8; EVENT_SIZE] { let mut buf = [0u8; EVENT_SIZE]; buf[0..8].copy_from_slice(&self.entity_id.to_le_bytes()); buf[8] = self.signal_type; buf[9..13].copy_from_slice(&self.weight.to_le_bytes()); buf[13..21].copy_from_slice(&self.timestamp_nanos.to_le_bytes()); buf } /// Deserialize an event from 21 bytes of wire format. /// /// # Errors /// /// Returns `WalError::Corruption` if the slice is not exactly 21 bytes. pub fn from_bytes(bytes: &[u8]) -> Result { if bytes.len() != EVENT_SIZE { return Err(WalError::Corruption { message: format!( "event record: expected {EVENT_SIZE} bytes, got {}", bytes.len() ), }); } let entity_id = u64::from_le_bytes(bytes[0..8].try_into().map_err(|_| WalError::Corruption { message: "event record: invalid entity_id bytes".into(), })?); let signal_type = bytes[8]; let weight = f32::from_le_bytes(bytes[9..13].try_into().map_err(|_| WalError::Corruption { message: "event record: invalid weight bytes".into(), })?); let timestamp_nanos = u64::from_le_bytes(bytes[13..21].try_into().map_err(|_| WalError::Corruption { message: "event record: invalid timestamp bytes".into(), })?); Ok(Self { entity_id, signal_type, weight, timestamp_nanos, }) } } /// Encode a batch of events into the WAL wire format. /// /// Produces a byte vector containing the 64-byte header followed by /// tightly packed 21-byte event records. The BLAKE3 checksum covers /// `header[0..32] || event_bytes`. /// /// # Errors /// /// Returns `WalError::Corruption` if `events` is empty or exceeds /// `MAX_EVENTS_PER_BATCH`. pub fn encode_batch( events: &[EventRecord], first_seq: u64, batch_ts: u64, ) -> Result, WalError> { let event_count = events.len(); if event_count == 0 || event_count > usize::from(MAX_EVENTS_PER_BATCH) { return Err(WalError::Corruption { message: format!( "batch event count {event_count} out of range [1, {MAX_EVENTS_PER_BATCH}]" ), }); } let payload_len = event_count * EVENT_SIZE; let total_len = HEADER_SIZE + payload_len; let mut buf = vec![0u8; total_len]; // Write header fields [0..32] buf[0..4].copy_from_slice(&MAGIC); buf[4] = FORMAT_VERSION; buf[5] = 0; // flags: reserved #[allow(clippy::cast_possible_truncation)] let count_u16 = event_count as u16; buf[6..8].copy_from_slice(&count_u16.to_le_bytes()); buf[8..16].copy_from_slice(&first_seq.to_le_bytes()); buf[16..24].copy_from_slice(&batch_ts.to_le_bytes()); #[allow(clippy::cast_possible_truncation)] let payload_len_u32 = payload_len as u32; buf[24..28].copy_from_slice(&payload_len_u32.to_le_bytes()); // [28..32] reserved, already zeroed // Write event records starting at offset 64 for (i, event) in events.iter().enumerate() { let offset = HEADER_SIZE + i * EVENT_SIZE; buf[offset..offset + EVENT_SIZE].copy_from_slice(&event.to_bytes()); } // Compute BLAKE3 over header[0..32] || event_bytes let checksum = compute_checksum(&buf[0..32], &buf[HEADER_SIZE..]); buf[32..64].copy_from_slice(checksum.as_bytes()); Ok(buf) } /// Decode a batch from raw bytes. /// /// Two-phase validation: /// - Phase 1: magic bytes, version, payload length bounds /// - Phase 2: BLAKE3 checksum verification /// /// # Errors /// /// Returns `WalError::Corruption` on any validation failure. pub fn decode_batch(bytes: &[u8]) -> Result<(BatchHeader, Vec), WalError> { if bytes.len() < HEADER_SIZE { return Err(WalError::Corruption { message: format!( "batch too short for header: {} bytes, need {HEADER_SIZE}", bytes.len() ), }); } // Phase 1: structural validation if bytes[0..4] != MAGIC { return Err(WalError::Corruption { message: "invalid magic bytes".into(), }); } let version = bytes[4]; if version != FORMAT_VERSION { return Err(WalError::Corruption { message: format!("unsupported format version: {version}"), }); } let flags = bytes[5]; let event_count = u16::from_le_bytes(bytes[6..8].try_into().map_err(|_| WalError::Corruption { message: "invalid event_count bytes".into(), })?); if event_count == 0 || event_count > MAX_EVENTS_PER_BATCH { return Err(WalError::Corruption { message: format!("event count {event_count} out of range [1, {MAX_EVENTS_PER_BATCH}]"), }); } let first_seq = u64::from_le_bytes(bytes[8..16].try_into().map_err(|_| WalError::Corruption { message: "invalid first_seq bytes".into(), })?); let batch_timestamp = u64::from_le_bytes(bytes[16..24].try_into().map_err(|_| WalError::Corruption { message: "invalid batch_timestamp bytes".into(), })?); let payload_len = u32::from_le_bytes(bytes[24..28].try_into().map_err(|_| WalError::Corruption { message: "invalid payload_len bytes".into(), })?); let expected_payload = u32::from(event_count) * EVENT_SIZE as u32; if payload_len != expected_payload { return Err(WalError::Corruption { message: format!( "payload_len {payload_len} != event_count {event_count} * {EVENT_SIZE}" ), }); } let total_len = HEADER_SIZE + payload_len as usize; if bytes.len() < total_len { return Err(WalError::Corruption { message: format!( "batch truncated: have {} bytes, need {total_len}", bytes.len() ), }); } // Extract stored checksum let mut checksum = [0u8; 32]; checksum.copy_from_slice(&bytes[32..64]); // Phase 2: BLAKE3 verification let event_bytes = &bytes[HEADER_SIZE..total_len]; let computed = compute_checksum(&bytes[0..32], event_bytes); if computed.as_bytes() != &checksum { return Err(WalError::Corruption { message: "BLAKE3 checksum mismatch".into(), }); } // Parse event records let mut events = Vec::with_capacity(usize::from(event_count)); for i in 0..usize::from(event_count) { let offset = i * EVENT_SIZE; let event = EventRecord::from_bytes(&event_bytes[offset..offset + EVENT_SIZE])?; events.push(event); } let header = BatchHeader { version, flags, event_count, first_seq, batch_timestamp, payload_len, checksum, }; Ok((header, events)) } /// Compute the BLAKE3 checksum for a batch. /// /// Input: `header_prefix[0..32] || event_bytes`. /// The hash field at `[32..64]` is NOT part of the hash input. fn compute_checksum(header_prefix: &[u8], event_bytes: &[u8]) -> blake3::Hash { let mut hasher = blake3::Hasher::new(); hasher.update(header_prefix); hasher.update(event_bytes); hasher.finalize() } /// Compute the per-event content hash used for deduplication. /// /// Returns the first 128 bits of the BLAKE3 hash of the 21-byte event record. /// /// # Panics /// /// Cannot panic. The `expect` is on a `try_into` converting a 16-byte slice /// (from a 32-byte BLAKE3 hash) into `[u8; 16]`, which is infallible. #[must_use] pub fn event_content_hash(event: &EventRecord) -> u128 { let bytes = event.to_bytes(); let hash = blake3::hash(&bytes); let hash_bytes: &[u8; 32] = hash.as_bytes(); u128::from_le_bytes( hash_bytes[..16] .try_into() .expect("BLAKE3 hash is always 32 bytes; first 16 is infallible"), ) } #[cfg(test)] #[allow(clippy::unwrap_used)] mod tests { use super::*; fn sample_event(id: u64) -> EventRecord { EventRecord { entity_id: id, signal_type: RECORD_TYPE_SIGNAL, weight: 1.0, timestamp_nanos: 1_000_000_000, } } #[test] fn event_record_roundtrip() { let event = sample_event(42); let bytes = event.to_bytes(); assert_eq!(bytes.len(), EVENT_SIZE); let decoded = EventRecord::from_bytes(&bytes).expect("decode should succeed"); assert_eq!(decoded, event); } #[test] fn event_record_wrong_size() { let result = EventRecord::from_bytes(&[0u8; 10]); assert!(result.is_err()); } #[test] fn encode_decode_roundtrip_single() { let events = vec![sample_event(1)]; let encoded = encode_batch(&events, 100, 999).expect("encode should succeed"); assert_eq!(encoded.len(), HEADER_SIZE + EVENT_SIZE); let (header, decoded_events) = decode_batch(&encoded).expect("decode should succeed"); assert_eq!(header.version, FORMAT_VERSION); assert_eq!(header.event_count, 1); assert_eq!(header.first_seq, 100); assert_eq!(header.batch_timestamp, 999); assert_eq!(header.payload_len, EVENT_SIZE as u32); assert_eq!(decoded_events.len(), 1); assert_eq!(decoded_events[0], events[0]); } #[test] fn encode_decode_roundtrip_multi() { let events: Vec = (0..50).map(sample_event).collect(); let encoded = encode_batch(&events, 1, 42).expect("encode should succeed"); let (header, decoded) = decode_batch(&encoded).expect("decode should succeed"); assert_eq!(header.event_count, 50); assert_eq!(decoded.len(), 50); for (original, decoded_ev) in events.iter().zip(decoded.iter()) { assert_eq!(original, decoded_ev); } } #[test] fn encode_empty_batch_fails() { let result = encode_batch(&[], 1, 1); assert!(result.is_err()); } #[test] fn encode_oversized_batch_fails() { let events: Vec = (0..=u64::from(MAX_EVENTS_PER_BATCH)) .map(sample_event) .collect(); let result = encode_batch(&events, 1, 1); assert!(result.is_err()); } #[test] fn corrupt_payload_byte_fails_blake3() { let events = vec![sample_event(1), sample_event(2)]; let mut encoded = encode_batch(&events, 1, 1).expect("encode should succeed"); // Flip a byte in the payload (event data area) let payload_offset = HEADER_SIZE + 5; encoded[payload_offset] ^= 0xFF; let result = decode_batch(&encoded); assert!(result.is_err()); let err_msg = result.expect_err("should fail").to_string(); assert!(err_msg.contains("checksum")); } #[test] fn corrupt_header_field_fails_blake3() { let events = vec![sample_event(1)]; let mut encoded = encode_batch(&events, 1, 1).expect("encode should succeed"); // Corrupt the first_seq field in the header (byte 8) encoded[10] ^= 0xFF; let result = decode_batch(&encoded); assert!(result.is_err()); } #[test] fn invalid_magic_detected() { let events = vec![sample_event(1)]; let mut encoded = encode_batch(&events, 1, 1).expect("encode should succeed"); encoded[0] = 0xFF; // corrupt magic let result = decode_batch(&encoded); assert!(result.is_err()); let err_msg = result.expect_err("should fail").to_string(); assert!(err_msg.contains("magic")); } #[test] fn truncated_header_detected() { let result = decode_batch(&[0u8; 32]); assert!(result.is_err()); } #[test] fn truncated_payload_detected() { let events = vec![sample_event(1)]; let encoded = encode_batch(&events, 1, 1).expect("encode should succeed"); // Truncate: give header but only partial payload let result = decode_batch(&encoded[..HEADER_SIZE + 5]); assert!(result.is_err()); } #[test] fn magic_bytes_are_tidl() { // Verify 0x54494C44 LE = "TIDL" assert_eq!(u32::from_le_bytes(MAGIC), 0x5449_4C44); } #[test] fn event_content_hash_deterministic() { let event = sample_event(42); let h1 = event_content_hash(&event); let h2 = event_content_hash(&event); assert_eq!(h1, h2); } #[test] fn event_content_hash_differs_for_different_events() { let h1 = event_content_hash(&sample_event(1)); let h2 = event_content_hash(&sample_event(2)); assert_ne!(h1, h2); } #[test] fn header_size_is_cache_line() { assert_eq!(HEADER_SIZE, 64); } #[test] fn event_size_is_21() { assert_eq!(EVENT_SIZE, 21); } mod proptests { use super::*; use proptest::prelude::*; fn arb_event() -> impl Strategy { (any::(), any::(), any::(), any::()).prop_map( |(entity_id, signal_type, weight, timestamp_nanos)| EventRecord { entity_id, signal_type, weight, timestamp_nanos, }, ) } proptest! { #[test] fn event_roundtrip(event in arb_event()) { let bytes = event.to_bytes(); let decoded = EventRecord::from_bytes(&bytes)?; prop_assert_eq!(decoded.entity_id, event.entity_id); prop_assert_eq!(decoded.signal_type, event.signal_type); // f32 NaN != NaN, so compare bits prop_assert_eq!( decoded.weight.to_bits(), event.weight.to_bits() ); prop_assert_eq!(decoded.timestamp_nanos, event.timestamp_nanos); } #[test] fn batch_roundtrip( events in proptest::collection::vec(arb_event(), 1..=100), first_seq in any::(), batch_ts in any::(), ) { let encoded = encode_batch(&events, first_seq, batch_ts)?; let (header, decoded) = decode_batch(&encoded)?; prop_assert_eq!(header.first_seq, first_seq); prop_assert_eq!(header.batch_timestamp, batch_ts); prop_assert_eq!(decoded.len(), events.len()); for (orig, dec) in events.iter().zip(decoded.iter()) { prop_assert_eq!(orig.entity_id, dec.entity_id); prop_assert_eq!(orig.signal_type, dec.signal_type); prop_assert_eq!(orig.weight.to_bits(), dec.weight.to_bits()); prop_assert_eq!(orig.timestamp_nanos, dec.timestamp_nanos); } } #[test] fn corrupt_any_payload_byte_fails( events in proptest::collection::vec(arb_event(), 1..=50), corrupt_offset in 0usize..1050, ) { let encoded = encode_batch(&events, 1, 1)?; let payload_start = HEADER_SIZE; let payload_end = encoded.len(); let payload_size = payload_end - payload_start; if payload_size == 0 { return Ok(()); } let actual_offset = payload_start + (corrupt_offset % payload_size); let mut corrupted = encoded; corrupted[actual_offset] ^= 0xFF; let result = decode_batch(&corrupted); prop_assert!(result.is_err()); } } } }