stemedb/crates/stemedb-storage/src/key_codec/mod.rs
jordan 2b0923f20e feat: Distributed replication foundation (Phase 6A) - HLC, Merkle trees, CRDT stores, sync protocol
- Add Hybrid Logical Clock (HLC) for causality tracking across nodes
- Implement Merkle tree for efficient diff/sync with BLAKE3 hashing
- Add CRDT-aware stores for assertions and votes with vector clocks
- Create stemedb-sync crate with anti-entropy and gossip protocols
- Add stemedb-rpc crate with gRPC sync service (proto definitions)
- Implement SupersessionChain for tracking assertion lifecycles
- Add Aphoria application for code analysis/reporting
- Add battery11 replication test scaffolding
- Fix .gitignore to exclude nested target directories

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-02 19:31:54 -07:00

422 lines
14 KiB
Rust

//! Central key encoding/decoding for subject-prefix range sharding.
//!
//! ALL storage keys flow through this module. Keys are partitioned into two families:
//!
//! **Subject-prefixed keys** — co-located by subject for range sharding:
//! ```text
//! {subject}\x00{TAG}:{suffix}
//! ```
//!
//! **Global keys** — metadata, trust, quotas, epochs (sort first under `\x00`):
//! ```text
//! \x00{TAG}:{suffix}
//! ```
//!
//! A prefix scan on `{subject}\x00` returns ALL data for that subject.
//! A prefix scan on `\x00` returns ALL global metadata.
use crate::error::{Result, StorageError};
/// NUL byte that terminates the subject in subject-prefixed keys and is the
/// first byte of every global key.
pub const SEPARATOR: u8 = b'\0';
// ── Subject validation ──────────────────────────────────────────────
/// Check that a subject can be embedded in a storage key.
///
/// A subject that contained the separator byte (`\x00`) would corrupt key
/// boundaries, so this MUST be called on every inbound subject at the
/// ingestion boundary. Empty subjects are rejected as well.
///
/// # Errors
///
/// Returns `StorageError::InputValidation` when the subject contains a null
/// byte or is empty.
pub fn validate_subject(subject: &str) -> Result<()> {
    let has_separator = subject.bytes().any(|byte| byte == SEPARATOR);
    if has_separator {
        let message = String::from("Subject must not contain null byte (\\x00)");
        return Err(StorageError::InputValidation(message));
    }
    if subject.is_empty() {
        let message = String::from("Subject must not be empty");
        return Err(StorageError::InputValidation(message));
    }
    Ok(())
}
// ── Key builders ────────────────────────────────────────────────────
/// Assemble a subject-scoped key laid out as `{subject}\x00{tag}{suffix}`.
///
/// `tag` and `suffix` are copied verbatim; nothing is inserted between them,
/// so callers include the trailing `:` in `tag` themselves.
fn subject_key(subject: &str, tag: &[u8], suffix: &[u8]) -> Vec<u8> {
    let pieces: [&[u8]; 4] = [subject.as_bytes(), &[SEPARATOR], tag, suffix];
    pieces.concat()
}
/// Assemble a global key laid out as `\x00{tag}{suffix}`.
///
/// `tag` and `suffix` are copied verbatim after the leading separator byte.
fn global_key(tag: &[u8], suffix: &[u8]) -> Vec<u8> {
    let pieces: [&[u8]; 3] = [&[SEPARATOR], tag, suffix];
    pieces.concat()
}
// ── Subject-prefixed keys ───────────────────────────────────────────
/// Key for a single assertion stored under `subject`.
///
/// Layout: `{subject}\x00H:{hash_hex}`
pub fn assertion_key(subject: &str, hash_hex: &str) -> Vec<u8> {
    const TAG: &[u8] = b"H:";
    subject_key(subject, TAG, hash_hex.as_bytes())
}
/// Key of the per-subject index entry.
///
/// Layout: `{subject}\x00S:`
pub fn subject_index_key(subject: &str) -> Vec<u8> {
    const TAG: &[u8] = b"S:";
    const NO_SUFFIX: &[u8] = b"";
    subject_key(subject, TAG, NO_SUFFIX)
}
/// Key of the subject+predicate index entry.
///
/// Layout: `{subject}\x00SP:{predicate}`
pub fn subject_predicate_key(subject: &str, predicate: &str) -> Vec<u8> {
    const TAG: &[u8] = b"SP:";
    subject_key(subject, TAG, predicate.as_bytes())
}
/// Key of the materialized view for one `(subject, predicate)` pair.
///
/// Layout: `{subject}\x00MV:{predicate}`
pub fn mv_key(subject: &str, predicate: &str) -> Vec<u8> {
    const TAG: &[u8] = b"MV:";
    subject_key(subject, TAG, predicate.as_bytes())
}
/// Key of a single vote cast on an assertion.
///
/// Layout: `{subject}\x00V:{assert_hex}:{vote_hex}`
pub fn vote_key(subject: &str, assertion_hex: &str, vote_hex: &str) -> Vec<u8> {
    let joined = [assertion_hex, vote_hex].join(":");
    subject_key(subject, b"V:", joined.as_bytes())
}
/// Prefix shared by every vote key of one assertion.
///
/// Layout: `{subject}\x00V:{assert_hex}:`
pub fn vote_scan_prefix(subject: &str, assertion_hex: &str) -> Vec<u8> {
    let with_colon = [assertion_hex, ":"].concat();
    subject_key(subject, b"V:", with_colon.as_bytes())
}
/// Key of the cached vote count for one assertion.
///
/// Layout: `{subject}\x00VC:{assert_hex}`
pub fn vote_count_key(subject: &str, assertion_hex: &str) -> Vec<u8> {
    const TAG: &[u8] = b"VC:";
    subject_key(subject, TAG, assertion_hex.as_bytes())
}
/// Key of the cached vote weight for one assertion.
///
/// Layout: `{subject}\x00VW:{assert_hex}`
pub fn vote_weight_key(subject: &str, assertion_hex: &str) -> Vec<u8> {
    const TAG: &[u8] = b"VW:";
    subject_key(subject, TAG, assertion_hex.as_bytes())
}
/// Prefix for scanning every cached vote count under a subject.
///
/// Layout: `{subject}\x00VC:`
pub fn vote_count_prefix(subject: &str) -> Vec<u8> {
    const TAG: &[u8] = b"VC:";
    const NO_SUFFIX: &[u8] = b"";
    subject_key(subject, TAG, NO_SUFFIX)
}
/// Prefix for scanning every assertion stored under a subject.
///
/// Layout: `{subject}\x00H:`
pub fn assertion_prefix(subject: &str) -> Vec<u8> {
    const TAG: &[u8] = b"H:";
    const NO_SUFFIX: &[u8] = b"";
    subject_key(subject, TAG, NO_SUFFIX)
}
/// Key of the gold-standard entry for one `(subject, predicate)` pair.
///
/// Layout: `{subject}\x00GS:{predicate}`
pub fn gold_standard_key(subject: &str, predicate: &str) -> Vec<u8> {
    const TAG: &[u8] = b"GS:";
    subject_key(subject, TAG, predicate.as_bytes())
}
/// Prefix matching every subject+predicate index key of a subject.
///
/// Layout: `{subject}\x00SP:`
pub fn subject_predicate_scan_prefix(subject: &str) -> Vec<u8> {
    const TAG: &[u8] = b"SP:";
    const NO_SUFFIX: &[u8] = b"";
    subject_key(subject, TAG, NO_SUFFIX)
}
/// Prefix matching ALL keys stored under a subject, whatever their tag.
///
/// Layout: `{subject}\x00`
pub fn subject_scan_prefix(subject: &str) -> Vec<u8> {
    let pieces: [&[u8]; 2] = [subject.as_bytes(), &[SEPARATOR]];
    pieces.concat()
}
// ── Global keys ─────────────────────────────────────────────────────
/// Global key of an agent's trust rank.
///
/// Layout: `\x00TRUST:{agent_id_hex}`
pub fn trust_rank_key(agent_id_hex: &str) -> Vec<u8> {
    const TAG: &[u8] = b"TRUST:";
    global_key(TAG, agent_id_hex.as_bytes())
}
/// Global key of an agent's quota record for one window.
///
/// Layout: `\x00QUOTA:{agent_hex}:{window}`, where `window` is written as
/// unpadded decimal text.
pub fn quota_key(agent_hex: &str, window: u64) -> Vec<u8> {
    let mut suffix = String::from(agent_hex);
    suffix.push(':');
    suffix.push_str(&window.to_string());
    global_key(b"QUOTA:", suffix.as_bytes())
}
/// Global key of an agent's quota limit.
///
/// Layout: `\x00QLIMIT:{agent_id_hex}`
pub fn quota_limit_key(agent_id_hex: &str) -> Vec<u8> {
    const TAG: &[u8] = b"QLIMIT:";
    global_key(TAG, agent_id_hex.as_bytes())
}
/// Global key of an epoch record.
///
/// Layout: `\x00E:{epoch_id_hex}`
pub fn epoch_key(epoch_id_hex: &str) -> Vec<u8> {
    const TAG: &[u8] = b"E:";
    global_key(TAG, epoch_id_hex.as_bytes())
}
/// Global key of the superseded marker for an epoch.
///
/// Layout: `\x00SUPERSEDED:{epoch_id_hex}`
pub fn superseded_key(epoch_id_hex: &str) -> Vec<u8> {
    const TAG: &[u8] = b"SUPERSEDED:";
    global_key(TAG, epoch_id_hex.as_bytes())
}
/// Global key of the supersession record for a target hash.
///
/// Layout: `\x00SUP:{target_hash_hex}`
pub fn supersession_key(target_hash_hex: &str) -> Vec<u8> {
    const TAG: &[u8] = b"SUP:";
    global_key(TAG, target_hash_hex.as_bytes())
}
/// Global key of a per-agent supersession index entry.
///
/// Layout: `\x00SUP:IDX:{agent_hex}:{ts_be_bytes}`. The timestamp bytes are
/// appended raw, exactly as passed in.
pub fn supersession_index_key(agent_hex: &str, timestamp_be_bytes: &[u8]) -> Vec<u8> {
    let pieces: [&[u8]; 3] = [agent_hex.as_bytes(), b":", timestamp_be_bytes];
    let suffix = pieces.concat();
    global_key(b"SUP:IDX:", &suffix)
}
/// Prefix matching every supersession index entry of one agent.
///
/// Layout: `\x00SUP:IDX:{agent_hex}:`
pub fn supersession_index_prefix(agent_hex: &str) -> Vec<u8> {
    let with_colon = [agent_hex, ":"].concat();
    global_key(b"SUP:IDX:", with_colon.as_bytes())
}
/// Global key of an audit record.
///
/// Layout: `\x00AUD:{query_id_hex}`
pub fn audit_key(query_id_hex: &str) -> Vec<u8> {
    const TAG: &[u8] = b"AUD:";
    global_key(TAG, query_id_hex.as_bytes())
}
/// Global key of a per-agent audit index entry.
///
/// Layout: `\x00AUDA:{agent_hex}:{timestamp_hex}:{query_hex}`
pub fn audit_agent_index_key(agent_hex: &str, timestamp_hex: &str, query_hex: &str) -> Vec<u8> {
    let joined = [agent_hex, timestamp_hex, query_hex].join(":");
    global_key(b"AUDA:", joined.as_bytes())
}
/// Prefix matching every audit index entry of one agent.
///
/// Layout: `\x00AUDA:{agent_hex}:`
pub fn audit_agent_prefix(agent_hex: &str) -> Vec<u8> {
    let mut suffix = String::with_capacity(agent_hex.len() + 1);
    suffix.push_str(agent_hex);
    suffix.push(':');
    global_key(b"AUDA:", suffix.as_bytes())
}
/// Prefix for listing audit records.
///
/// Layout: `\x00AUD:`
pub fn audit_scan_prefix() -> Vec<u8> {
    const TAG: &[u8] = b"AUD:";
    const NO_SUFFIX: &[u8] = b"";
    global_key(TAG, NO_SUFFIX)
}
/// Global key of an escalation record.
///
/// Layout: `\x00ESC:{timestamp}:{id_hex}`. The timestamp is written as
/// unpadded decimal text, so the byte order of these keys follows numeric
/// order only between timestamps with the same number of digits.
pub fn escalation_key(timestamp: u64, id_hex: &str) -> Vec<u8> {
    let mut suffix = timestamp.to_string();
    suffix.push(':');
    suffix.push_str(id_hex);
    global_key(b"ESC:", suffix.as_bytes())
}
/// Prefix for scanning escalation records.
///
/// Layout: `\x00ESC:`
pub fn escalation_scan_prefix() -> Vec<u8> {
    const TAG: &[u8] = b"ESC:";
    const NO_SUFFIX: &[u8] = b"";
    global_key(TAG, NO_SUFFIX)
}
/// Global key of a trust pack.
///
/// Layout: `\x00TP:{pack_id_bytes}`. The pack id bytes are appended raw.
pub fn trust_pack_key(pack_id: &[u8]) -> Vec<u8> {
    const TAG: &[u8] = b"TP:";
    global_key(TAG, pack_id)
}
/// Prefix for scanning trust packs.
///
/// Layout: `\x00TP:`
pub fn trust_pack_scan_prefix() -> Vec<u8> {
    const TAG: &[u8] = b"TP:";
    const NO_SUFFIX: &[u8] = b"";
    global_key(TAG, NO_SUFFIX)
}
/// Global key of a gold-standard verification entry.
///
/// Layout: `\x00GS_VERIFIED:{agent_hex}:{subject}:{predicate}`. The three
/// parts are joined with `:` and are not escaped.
pub fn gs_verified_key(agent_hex: &str, subject: &str, predicate: &str) -> Vec<u8> {
    let joined = [agent_hex, subject, predicate].join(":");
    global_key(b"GS_VERIFIED:", joined.as_bytes())
}
/// Global key of the ingest cursor.
///
/// Layout: `\x00META:cursor:ingest`
pub fn cursor_key() -> Vec<u8> {
    const NAME: &[u8] = b"META:cursor:ingest";
    const NO_SUFFIX: &[u8] = b"";
    global_key(NAME, NO_SUFFIX)
}
/// Global key of the assertion counter.
///
/// Layout: `\x00META:assertion_count`
pub fn assertion_count_key() -> Vec<u8> {
    const NAME: &[u8] = b"META:assertion_count";
    const NO_SUFFIX: &[u8] = b"";
    global_key(NAME, NO_SUFFIX)
}
/// Prefix for scanning every trust rank entry (used for decay).
///
/// Layout: `\x00TRUST:`
pub fn trust_rank_scan_prefix() -> Vec<u8> {
    const TAG: &[u8] = b"TRUST:";
    const NO_SUFFIX: &[u8] = b"";
    global_key(TAG, NO_SUFFIX)
}
// ── Secondary indexes ───────────────────────────────────────────────
/// Global key of a subject's entry in the known-subjects index.
///
/// Layout: `\x00SUBJECTS:{subject}`
pub fn subjects_index_key(subject: &str) -> Vec<u8> {
    const TAG: &[u8] = b"SUBJECTS:";
    global_key(TAG, subject.as_bytes())
}
/// Prefix for scanning the known-subjects index.
///
/// Layout: `\x00SUBJECTS:`
pub fn subjects_scan_prefix() -> Vec<u8> {
    const TAG: &[u8] = b"SUBJECTS:";
    const NO_SUFFIX: &[u8] = b"";
    global_key(TAG, NO_SUFFIX)
}
/// Global key of an entry in the gold-standard listing index.
///
/// Layout: `\x00GS_LIST:{subject}:{predicate}`. The two parts are joined with
/// `:` and are not escaped.
pub fn gs_list_key(subject: &str, predicate: &str) -> Vec<u8> {
    let joined = [subject, predicate].join(":");
    global_key(b"GS_LIST:", joined.as_bytes())
}
/// Prefix for scanning the gold-standard listing index.
///
/// Layout: `\x00GS_LIST:`
pub fn gs_list_scan_prefix() -> Vec<u8> {
    const TAG: &[u8] = b"GS_LIST:";
    const NO_SUFFIX: &[u8] = b"";
    global_key(TAG, NO_SUFFIX)
}
/// Global key of the reverse index entry mapping a hash to its subject.
///
/// Layout: `\x00HASH_SUBJECT:{hash_hex}`
pub fn hash_subject_key(hash_hex: &str) -> Vec<u8> {
    const TAG: &[u8] = b"HASH_SUBJECT:";
    global_key(TAG, hash_hex.as_bytes())
}
// ── Vector Index Persistence ─────────────────────────────────────────
//
// These keys are reserved for KV-backed cursor persistence (future phase).
// Currently, PersistentVectorIndex stores version in filename and cursors
// are rebuilt from WAL replay.
/// Global key of the vector index metadata.
///
/// Layout: `\x00VI:meta`. `dead_code` is allowed because the key is reserved
/// for future KV-backed persistence.
#[allow(dead_code)]
pub fn vi_meta_key() -> Vec<u8> {
    const NAME: &[u8] = b"VI:meta";
    const NO_SUFFIX: &[u8] = b"";
    global_key(NAME, NO_SUFFIX)
}
/// Global key of the vector index hot cursor.
///
/// Layout: `\x00VI:hot_cursor`. The value stored here is the WAL offset from
/// which the hot index should replay on restart. `dead_code` is allowed
/// because the key is reserved for future KV-backed persistence.
#[allow(dead_code)]
pub fn vi_hot_cursor_key() -> Vec<u8> {
    const NAME: &[u8] = b"VI:hot_cursor";
    const NO_SUFFIX: &[u8] = b"";
    global_key(NAME, NO_SUFFIX)
}
/// Global key of the vector index cold version.
///
/// Layout: `\x00VI:cold_version`. The value stored here is the version number
/// of the current cold index snapshot. `dead_code` is allowed because the key
/// is reserved for future KV-backed persistence.
#[allow(dead_code)]
pub fn vi_cold_version_key() -> Vec<u8> {
    const NAME: &[u8] = b"VI:cold_version";
    const NO_SUFFIX: &[u8] = b"";
    global_key(NAME, NO_SUFFIX)
}
// ── Visual Index Persistence ─────────────────────────────────────────
/// Global key of the visual index metadata.
///
/// Layout: `\x00VH:meta`. The `dead_code` allowance suggests no caller uses
/// this key yet.
#[allow(dead_code)]
pub fn vh_meta_key() -> Vec<u8> {
    const NAME: &[u8] = b"VH:meta";
    const NO_SUFFIX: &[u8] = b"";
    global_key(NAME, NO_SUFFIX)
}
// ── Concept Alias Keys ───────────────────────────────────────────────
/// Global forward key of a concept alias.
///
/// Layout: `\x00CA:{alias_path}`. The entry maps an alias path to its
/// canonical ConceptPath.
pub fn alias_key(alias_path: &str) -> Vec<u8> {
    const TAG: &[u8] = b"CA:";
    global_key(TAG, alias_path.as_bytes())
}
/// Global reverse key of a concept alias.
///
/// Layout: `\x00CAR:{canonical_path}`. The entry maps a canonical path to all
/// of its alias paths (stored as `Vec<String>`).
pub fn alias_reverse_key(canonical_path: &str) -> Vec<u8> {
    const TAG: &[u8] = b"CAR:";
    global_key(TAG, canonical_path.as_bytes())
}
/// Prefix for listing every forward alias key in the store.
///
/// Layout: `\x00CA:`
pub fn alias_scan_prefix() -> Vec<u8> {
    const TAG: &[u8] = b"CA:";
    const NO_SUFFIX: &[u8] = b"";
    global_key(TAG, NO_SUFFIX)
}
// ── Key extraction / parsing ────────────────────────────────────────
/// Recover the subject from a `\x00SUBJECTS:{subject}` key.
///
/// Returns `None` when the key does not start with `\x00SUBJECTS:` or when the
/// bytes after that prefix are not valid UTF-8.
pub fn extract_subject_from_subjects_key(key: &[u8]) -> Option<String> {
    const PREFIX: &[u8] = b"\x00SUBJECTS:";
    let raw_subject = key.strip_prefix(PREFIX)?;
    let subject = std::str::from_utf8(raw_subject).ok()?;
    Some(subject.to_owned())
}
/// Split a `{subject}\x00SP:{predicate}` key into `(subject, predicate)`.
///
/// Returns `None` when the key has no separator, is a global key (separator
/// at offset 0), carries a tag other than `SP:`, has an empty predicate, or
/// holds a subject or predicate that is not valid UTF-8.
pub fn extract_sp_key(key: &[u8]) -> Option<(String, String)> {
    const SP_TAG: &[u8] = b"SP:";
    // A separator at offset 0 marks a global key, which has no subject.
    let sep_at = memchr::memchr(SEPARATOR, key).filter(|&pos| pos != 0)?;
    let (subject_bytes, from_sep) = key.split_at(sep_at);
    // `from_sep` starts with the separator itself; skip it before matching the tag.
    let predicate_bytes = from_sep[1..].strip_prefix(SP_TAG)?;
    let subject = std::str::from_utf8(subject_bytes).ok()?;
    let predicate = std::str::from_utf8(predicate_bytes).ok()?;
    // The subject cannot be empty here because `sep_at` is at least 1.
    if predicate.is_empty() {
        return None;
    }
    Some((subject.to_owned(), predicate.to_owned()))
}
/// Return everything that follows the first separator byte of a key.
///
/// For subject-prefixed keys that is the part after `{subject}\x00`; for
/// global keys it is the part after the leading `\x00`. A key without any
/// separator is returned unchanged.
pub fn extract_tag(key: &[u8]) -> &[u8] {
    // A global key has its first separator at offset 0, so one search covers
    // both key families.
    match memchr::memchr(SEPARATOR, key) {
        Some(sep_at) => &key[sep_at + 1..],
        None => key,
    }
}
/// Report whether `key` is a global key, i.e. its first byte is `\x00`.
///
/// An empty key is not global.
pub fn is_global_key(key: &[u8]) -> bool {
    key.starts_with(&[SEPARATOR])
}
/// Borrow the subject portion of a subject-prefixed key.
///
/// Returns `None` for global keys, for keys that contain no separator, and
/// for keys whose subject bytes are not valid UTF-8.
pub fn extract_subject(key: &[u8]) -> Option<&str> {
    if is_global_key(key) {
        None
    } else {
        let subject_len = memchr::memchr(SEPARATOR, key)?;
        std::str::from_utf8(&key[..subject_len]).ok()
    }
}
/// Recover the alias path from a `\x00CA:{alias_path}` key.
///
/// Returns `None` when the key does not start with `\x00CA:` or when the
/// bytes after that prefix are not valid UTF-8.
pub fn extract_alias_path(key: &[u8]) -> Option<String> {
    const PREFIX: &[u8] = b"\x00CA:";
    let raw_path = key.strip_prefix(PREFIX)?;
    let alias_path = std::str::from_utf8(raw_path).ok()?;
    Some(alias_path.to_owned())
}
#[cfg(test)]
mod tests;