stemedb/crates/stemedb-lens/src/vote_aware_consensus.rs
jordan 3320c24afa feat: WAL hardening (Phase 5B) - CRC32C, crash recovery, group commit, log rotation
Add CRC32C checksums to WAL record format (v2), implement crash recovery
with automatic truncation of corrupt records, add feature-gated group commit
buffer for batched fsync under concurrent load, and implement log rotation
via segment files with global offset addressing.

Key changes:
- Record format v2: [len:u32][crc32c:u32][blake3:32][payload:N]
- recover_file() scans and truncates corrupt tail records
- GroupCommitBuffer batches fsync via MPSC channel (tokio feature gate)
- SegmentManager with binary search resolution and cursor-based cleanup
- Journal::read() auto-refreshes segments on miss for writer/reader split
- Split recovery.rs and key_codec.rs into directory modules for 500-line max

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-02 12:36:35 -07:00

464 lines
17 KiB
Rust

//! Vote-Aware Consensus Lens: Resolves based on actual vote counts from VoteStore.
//!
//! This lens integrates with the Ballot Box pattern to use real vote counts
//! instead of grouping by object value like the basic ConsensusLens.
//!
//! # Design Philosophy
//!
//! Follows the "Deep Module" principle:
//! - Simple interface: `resolve_async(&[Assertion])` returns winner
//! - Complex implementation: Queries VoteStore, ranks by votes, handles ties
//! - O(1) vote lookups via VoteStore's cached counters
use crate::traits::{compute_conflict_score, Resolution};
use async_trait::async_trait;
use stemedb_core::types::{Assertion, Hash};
use stemedb_storage::vote_store::VoteStore;
use tracing::{debug, instrument};
/// Asynchronous counterpart of the Lens trait, for resolution strategies that
/// must perform I/O while deciding on a winner.
///
/// # Contract
///
/// - **Stateless:** an implementation must not keep internal state between calls.
/// - **Deterministic:** identical candidates and identical storage contents
///   must always yield the same resolution.
/// - **Fast:** lookups are expected to be O(1), served from VoteStore's cached
///   vote counters.
///
/// # Implementation Notes
///
/// Implementations are expected to degrade gracefully on edge cases:
/// - No candidates: return `Resolution::empty()`
/// - Exactly one candidate: return it as-is (trivial resolution)
/// - Ties: apply a consistent tiebreaker (e.g., highest timestamp)
/// - Candidates without any votes: count them as 0 votes / 0 weight
#[async_trait]
pub trait AsyncLens: Send + Sync {
    /// Collapse a set of candidate assertions into a single answer.
    ///
    /// # Arguments
    /// * `candidates` - Every assertion that matched the query filters
    ///
    /// # Returns
    /// A resolution carrying the winning assertion (if there is one) together
    /// with its metadata.
    async fn resolve_async(&self, candidates: &[Assertion]) -> Resolution;

    /// Human-readable lens name, intended for logging and debugging output.
    fn name(&self) -> &'static str;
}
/// Vote-Aware Consensus Lens: Returns the assertion with the highest aggregate
/// vote weight.
///
/// # Resolution Strategy
///
/// 1. For each candidate assertion, lookup its vote count and aggregate weight
/// 2. Rank assertions by aggregate weight (sum of all vote weights)
/// 3. Return the assertion with highest aggregate weight
/// 4. Tiebreaker: If weights are equal, prefer most recent timestamp
/// 5. Assertions with no votes are treated as having 0 weight
///
/// The vote count is looked up for diagnostics (it is logged with the winner);
/// only the aggregate weight takes part in the ranking.
///
/// # Confidence Calculation
///
/// Resolution confidence is based on the winning assertion's vote proportion:
/// - `confidence = winner_weight / total_weight_across_all_candidates`
/// - If no votes exist for any candidate, confidence is 0.0
/// - If only one candidate has votes, confidence is 1.0
/// - If there is exactly one candidate, it wins with confidence 1.0 and the
///   vote store is not consulted at all
///
/// # Example
///
/// ```ignore
/// use stemedb_lens::VoteAwareConsensusLens;
/// use stemedb_storage::{HybridStore, GenericVoteStore};
/// use std::sync::Arc;
///
/// let store = HybridStore::open("./data").await?;
/// let vote_store = Arc::new(GenericVoteStore::new(store));
/// let lens = VoteAwareConsensusLens::new(vote_store);
///
/// // `resolve_async` returns a `Resolution` directly (not a `Result`),
/// // so there is nothing to propagate with `?` here.
/// let resolution = lens.resolve_async(&candidates).await;
/// ```
pub struct VoteAwareConsensusLens<V> {
    /// Shared handle to the vote store queried during resolution.
    vote_store: std::sync::Arc<V>,
}
impl<V: VoteStore> VoteAwareConsensusLens<V> {
/// Create a new VoteAwareConsensusLens with the given VoteStore.
///
/// The VoteStore is wrapped in an Arc for shared ownership, allowing
/// the lens to be used in multiple contexts.
pub fn new(vote_store: std::sync::Arc<V>) -> Self {
Self { vote_store }
}
/// Compute the content-addressed hash of an assertion.
///
/// This matches the logic used by the ingestion pipeline to ensure
/// we lookup votes for the correct assertion hash.
///
/// Returns `None` if serialization fails, allowing the caller to skip
/// the candidate rather than using a potentially colliding hash.
fn compute_assertion_hash(assertion: &Assertion) -> Option<Hash> {
// Serialize using the canonical serde module, then hash.
let bytes = match stemedb_core::serde::serialize(assertion) {
Ok(b) => b,
Err(e) => {
tracing::warn!("Failed to serialize assertion for hashing: {}", e);
return None;
}
};
let hash_bytes = blake3::hash(&bytes);
Some(*hash_bytes.as_bytes())
}
}
/// Per-candidate bookkeeping used while ranking: a borrowed assertion paired
/// with the vote figures fetched for it.
#[derive(Debug)]
struct RankedAssertion<'a> {
    /// The candidate itself, borrowed from the caller's slice.
    assertion: &'a Assertion,
    /// Number of votes recorded for the candidate (reported in logs only).
    vote_count: u64,
    /// Sum of vote weights for the candidate; the primary ranking key.
    aggregate_weight: f32,
}
#[async_trait]
impl<V: VoteStore + 'static> AsyncLens for VoteAwareConsensusLens<V> {
    /// Pick the candidate with the highest aggregate vote weight.
    ///
    /// # Behavior
    ///
    /// - Empty input yields `Resolution::empty()`.
    /// - A single candidate wins outright with confidence 1.0 and conflict 0.0;
    ///   the vote store is not queried.
    /// - Otherwise each candidate's vote count and aggregate weight are read
    ///   from the vote store. A failed lookup is logged and counted as 0.
    /// - Candidates whose hash cannot be computed are skipped. If that leaves
    ///   nothing to rank, the result is `Resolution::empty()`.
    /// - A non-finite aggregate weight (NaN or infinity) is counted as 0.0 so
    ///   it can neither win by accident nor turn the confidence into NaN.
    /// - Ranking is by weight (descending), then by timestamp (descending).
    ///
    /// # Returns
    ///
    /// A resolution whose confidence is `winner_weight / total_weight`, limited
    /// to `[0.0, 1.0]`, or 0.0 when the total weight is not positive. The
    /// reported candidate count and conflict score are computed over the full
    /// input slice, including any skipped candidates.
    #[instrument(skip(self, candidates), fields(candidates_count = candidates.len()))]
    async fn resolve_async(&self, candidates: &[Assertion]) -> Resolution {
        if candidates.is_empty() {
            return Resolution::empty();
        }
        if candidates.len() == 1 {
            return Resolution::with_winner(candidates[0].clone(), 1, 1.0, 0.0);
        }

        // Collect vote data for all candidates
        let mut ranked: Vec<RankedAssertion> = Vec::with_capacity(candidates.len());
        let mut total_weight = 0.0_f32;
        for assertion in candidates {
            let assertion_hash = match Self::compute_assertion_hash(assertion) {
                Some(hash) => hash,
                None => {
                    // Serialization failed - skip this candidate
                    debug!("Skipping candidate due to serialization failure");
                    continue;
                }
            };

            // Lookup vote count and aggregate weight from VoteStore
            let vote_count =
                match self.vote_store.get_vote_count(&assertion_hash, &assertion.subject).await {
                    Ok(count) => count,
                    Err(e) => {
                        debug!(
                            assertion_hash = %hex::encode(assertion_hash),
                            error = %e,
                            "Failed to get vote count, treating as 0"
                        );
                        0
                    }
                };
            let stored_weight = match self
                .vote_store
                .get_aggregate_weight(&assertion_hash, &assertion.subject)
                .await
            {
                Ok(weight) => weight,
                Err(e) => {
                    debug!(
                        assertion_hash = %hex::encode(assertion_hash),
                        error = %e,
                        "Failed to get aggregate weight, treating as 0.0"
                    );
                    0.0
                }
            };

            // A NaN/infinite weight would poison `total_weight` (and thus the
            // confidence) and would make the comparator below a non-total
            // order, which `sort_by` does not tolerate.
            let aggregate_weight = if stored_weight.is_finite() {
                stored_weight
            } else {
                debug!(
                    assertion_hash = %hex::encode(assertion_hash),
                    "Non-finite aggregate weight, treating as 0.0"
                );
                0.0
            };

            total_weight += aggregate_weight;
            ranked.push(RankedAssertion { assertion, vote_count, aggregate_weight });
        }

        // Sort by aggregate weight (descending), then by timestamp (descending) for ties.
        // Every stored weight is finite here, so `partial_cmp` always yields an ordering;
        // the `Equal` fallback only exists to avoid an unwrap.
        ranked.sort_by(|a, b| {
            b.aggregate_weight
                .partial_cmp(&a.aggregate_weight)
                .unwrap_or(std::cmp::Ordering::Equal)
                .then_with(|| b.assertion.timestamp.cmp(&a.assertion.timestamp))
        });

        // Select the winner (highest ranked)
        match ranked.first() {
            Some(winner_ranked) => {
                let confidence = if total_weight > 0.0 {
                    // Clamp so that negative weights elsewhere in the set can
                    // never push the reported proportion outside [0, 1].
                    (winner_ranked.aggregate_weight / total_weight).clamp(0.0, 1.0)
                } else {
                    // No (positive) vote weight across the candidates
                    0.0
                };
                let conflict = compute_conflict_score(candidates);
                debug!(
                    winner_subject = %winner_ranked.assertion.subject,
                    vote_count = winner_ranked.vote_count,
                    aggregate_weight = winner_ranked.aggregate_weight,
                    confidence,
                    conflict,
                    "Resolved via vote-aware consensus"
                );
                Resolution::with_winner(
                    winner_ranked.assertion.clone(),
                    candidates.len(),
                    confidence,
                    conflict,
                )
            }
            None => {
                // Reached only when every candidate was skipped because its
                // hash could not be computed.
                debug!("No rankable candidates after hashing, returning empty resolution");
                Resolution::empty()
            }
        }
    }

    /// Name under which this lens identifies itself.
    fn name(&self) -> &'static str {
        "VoteAwareConsensus"
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use stemedb_core::testing::{self, AssertionBuilder};
    use stemedb_core::types::Vote;
    use stemedb_storage::{GenericVoteStore, HybridStore};

    /// Concrete lens type, named only to reach the associated hashing helper.
    type TestLens = VoteAwareConsensusLens<GenericVoteStore<HybridStore>>;

    /// Build an assertion with a numeric object for the given subject/timestamp.
    fn create_assertion(subject: &str, value: f64, timestamp: u64) -> Assertion {
        AssertionBuilder::new().subject(subject).object_number(value).timestamp(timestamp).build()
    }

    /// Build a vote for `assertion_hash` cast by `agent_id`.
    fn create_vote(assertion_hash: Hash, agent_id: [u8; 32], weight: f32, timestamp: u64) -> Vote {
        testing::test_vote(assertion_hash, agent_id, weight, timestamp)
    }

    /// Hash an assertion exactly the way the lens does when it looks up votes.
    fn hash_of(assertion: &Assertion) -> Hash {
        TestLens::compute_assertion_hash(assertion).expect("test assertion must serialize")
    }

    #[tokio::test]
    async fn test_empty_candidates() {
        let store = Arc::new(HybridStore::open_temp().expect("Failed to create store"));
        let vote_store = Arc::new(GenericVoteStore::new(store));
        let lens = VoteAwareConsensusLens::new(vote_store);

        let resolution = lens.resolve_async(&[]).await;

        assert!(resolution.winner.is_none());
        assert_eq!(resolution.candidates_count, 0);
    }

    #[tokio::test]
    async fn test_single_candidate() {
        let store = Arc::new(HybridStore::open_temp().expect("Failed to create store"));
        let vote_store = Arc::new(GenericVoteStore::new(store));
        let lens = VoteAwareConsensusLens::new(vote_store);
        let assertion = create_assertion("Tesla", 100.0, 1000);

        let resolution = lens.resolve_async(std::slice::from_ref(&assertion)).await;

        assert!(resolution.winner.is_some());
        assert_eq!(resolution.winner.as_ref().unwrap().subject, "Tesla");
        assert!((resolution.resolution_confidence - 1.0).abs() < f32::EPSILON);
    }

    #[tokio::test]
    async fn test_selects_highest_vote_weight() {
        let store = Arc::new(HybridStore::open_temp().expect("Failed to create store"));
        let vote_store = Arc::new(GenericVoteStore::new(store));
        let lens = VoteAwareConsensusLens::new(Arc::clone(&vote_store));

        // Create three assertions
        let a1 = create_assertion("Agent1", 100.0, 1000);
        let a2 = create_assertion("Agent2", 200.0, 1100);
        let a3 = create_assertion("Agent3", 300.0, 1200);

        // Add votes: a1 gets 0.5 weight, a2 gets 1.5 weight (winner), a3 gets 0.3 weight
        let hash1 = hash_of(&a1);
        let hash2 = hash_of(&a2);
        let hash3 = hash_of(&a3);
        vote_store
            .put_vote(&create_vote(hash1, [1u8; 32], 0.5, 2000), "Agent1")
            .await
            .expect("put");
        vote_store
            .put_vote(&create_vote(hash2, [2u8; 32], 0.8, 2001), "Agent2")
            .await
            .expect("put");
        vote_store
            .put_vote(&create_vote(hash2, [3u8; 32], 0.7, 2002), "Agent2")
            .await
            .expect("put");
        vote_store
            .put_vote(&create_vote(hash3, [4u8; 32], 0.3, 2003), "Agent3")
            .await
            .expect("put");

        let resolution = lens.resolve_async(&[a1, a2, a3]).await;

        assert!(resolution.winner.is_some());
        assert_eq!(resolution.winner.as_ref().unwrap().subject, "Agent2");
        // Total weight = 0.5 + 0.8 + 0.7 + 0.3 = 2.3
        // Winner weight = 0.8 + 0.7 = 1.5
        // Confidence = 1.5 / 2.3 ≈ 0.652
        let expected_confidence = 1.5 / 2.3;
        assert!((resolution.resolution_confidence - expected_confidence).abs() < 0.01);
    }

    #[tokio::test]
    async fn test_no_votes_returns_most_recent() {
        let store = Arc::new(HybridStore::open_temp().expect("Failed to create store"));
        let vote_store = Arc::new(GenericVoteStore::new(store));
        let lens = VoteAwareConsensusLens::new(vote_store);
        let old = create_assertion("Old", 100.0, 1000);
        let new = create_assertion("New", 200.0, 2000);

        let resolution = lens.resolve_async(&[old, new]).await;

        // When no votes exist, should fall back to timestamp tiebreaker
        assert!(resolution.winner.is_some());
        assert_eq!(resolution.winner.as_ref().unwrap().subject, "New");
        // Confidence should be 0.0 since no votes exist
        assert!((resolution.resolution_confidence - 0.0).abs() < f32::EPSILON);
    }

    #[tokio::test]
    async fn test_tie_breaking_by_timestamp() {
        let store = Arc::new(HybridStore::open_temp().expect("Failed to create store"));
        let vote_store = Arc::new(GenericVoteStore::new(store));
        let lens = VoteAwareConsensusLens::new(Arc::clone(&vote_store));
        let old = create_assertion("Old", 100.0, 1000);
        let new = create_assertion("New", 200.0, 2000);

        // Give both the same vote weight
        let hash_old = hash_of(&old);
        let hash_new = hash_of(&new);
        vote_store
            .put_vote(&create_vote(hash_old, [1u8; 32], 0.5, 3000), "Old")
            .await
            .expect("put");
        vote_store
            .put_vote(&create_vote(hash_new, [2u8; 32], 0.5, 3001), "New")
            .await
            .expect("put");

        let resolution = lens.resolve_async(&[old, new]).await;

        // With equal weights, should pick the newer timestamp
        assert!(resolution.winner.is_some());
        assert_eq!(resolution.winner.as_ref().unwrap().subject, "New");
    }

    #[tokio::test]
    async fn test_mixed_votes_and_no_votes() {
        let store = Arc::new(HybridStore::open_temp().expect("Failed to create store"));
        let vote_store = Arc::new(GenericVoteStore::new(store));
        let lens = VoteAwareConsensusLens::new(Arc::clone(&vote_store));
        let with_votes = create_assertion("WithVotes", 100.0, 1000);
        let without_votes = create_assertion("NoVotes", 200.0, 2000);

        let hash_with = hash_of(&with_votes);
        vote_store
            .put_vote(&create_vote(hash_with, [1u8; 32], 0.8, 3000), "WithVotes")
            .await
            .expect("put");

        let resolution = lens.resolve_async(&[with_votes, without_votes]).await;

        // Assertion with votes should win
        assert!(resolution.winner.is_some());
        assert_eq!(resolution.winner.as_ref().unwrap().subject, "WithVotes");
        // Confidence should be 1.0 (all weight is on winner)
        assert!((resolution.resolution_confidence - 1.0).abs() < f32::EPSILON);
    }

    #[tokio::test]
    async fn test_lens_name() {
        let store = Arc::new(HybridStore::open_temp().expect("Failed to create store"));
        let vote_store = Arc::new(GenericVoteStore::new(store));
        let lens = VoteAwareConsensusLens::new(vote_store);

        assert_eq!(lens.name(), "VoteAwareConsensus");
    }

    #[tokio::test]
    async fn test_many_votes_on_single_assertion() {
        let store = Arc::new(HybridStore::open_temp().expect("Failed to create store"));
        let vote_store = Arc::new(GenericVoteStore::new(store));
        let lens = VoteAwareConsensusLens::new(Arc::clone(&vote_store));
        let popular = create_assertion("Popular", 100.0, 1000);
        let unpopular = create_assertion("Unpopular", 200.0, 1100);
        let hash_popular = hash_of(&popular);
        let hash_unpopular = hash_of(&unpopular);

        // Popular gets 10 votes, each from a distinct agent id
        for i in 0..10u8 {
            let mut agent_id = [0u8; 32];
            agent_id[0] = i;
            vote_store
                .put_vote(
                    &create_vote(hash_popular, agent_id, 0.5, 2000 + u64::from(i)),
                    "Popular",
                )
                .await
                .expect("put");
        }
        // Unpopular gets 1 vote
        vote_store
            .put_vote(&create_vote(hash_unpopular, [99u8; 32], 0.5, 2100), "Unpopular")
            .await
            .expect("put");

        let resolution = lens.resolve_async(&[popular, unpopular]).await;

        assert!(resolution.winner.is_some());
        assert_eq!(resolution.winner.as_ref().unwrap().subject, "Popular");
        // Total weight = 10 * 0.5 + 1 * 0.5 = 5.5
        // Winner weight = 5.0
        // Confidence = 5.0 / 5.5 ≈ 0.909
        let expected_confidence = 5.0 / 5.5;
        assert!((resolution.resolution_confidence - expected_confidence).abs() < 0.01);
    }
}