13 KiB
Task 09: Cross-Session Aggregation Query
Delivers
db.user_session_summary(user_id, since) API that scans closed session archives and returns an aggregate view of a user's session history: session count, total signals, total rejections, top signal types, and preference drift (cosine distance between earliest and latest preference vectors). This enables agent orchestrators to assess how a user's taste profile has evolved across sessions.
Complexity: M
Dependencies
- task-01 complete (establishes instrumentation pattern)
- tidal/src/session/types.rs -- SessionId, SessionSummary
- tidal/src/session/snapshot.rs -- SessionSnapshot, SessionContext
- tidal/src/session/signal_state.rs -- SessionSignalState per-signal data
- tidal/src/entities/preference.rs -- PreferenceVectors for drift computation
- tidal/src/db/mod.rs -- TidalDb.closed_sessions DashMap
Technical Design
1. Type definitions
Add to tidal/src/db/export.rs (or a new tidal/src/db/aggregation.rs):
use std::collections::HashMap;
/// Aggregate summary of a user's session history.
///
/// Computed by scanning `closed_sessions` in memory. Only sessions
/// that have been closed during the current process lifetime are visible.
/// Persistent session archive scanning is deferred to M8.
///
/// A session contributes to the summary when it belongs to the requested
/// user and its start timestamp is at or after the `since` cutoff.
///
/// # Examples
///
/// ```ignore
/// let summary = db.user_session_summary(user_id, one_week_ago_ns)?;
/// println!("Sessions: {}", summary.sessions_count);
/// println!("Top signal: {:?}", summary.top_signal_types.first());
/// if let Some(drift) = summary.preference_drift {
///     println!("Preference drift: {drift:.4}");
/// }
/// ```
#[derive(Debug, Clone)]
pub struct UserSessionSummary {
/// Number of closed sessions for this user in the time range.
pub sessions_count: u64,
/// Total signal writes across all matching sessions
/// (sum of each session's `signals_written`).
pub total_signals: u64,
/// Total policy rejections across all matching sessions
/// (sum of each session's `rejections`).
pub total_rejections: u64,
/// Top signal types by frequency, sorted descending by count.
/// Each entry is `(signal_type_name, count)`, where `count` is summed
/// over all matching sessions. Limited to the top 10 entries.
pub top_signal_types: Vec<(String, u64)>,
/// Cosine distance between the preference vector snapshots of the
/// earliest and the latest matching sessions.
///
/// `None` if no preference vector data is available, i.e. when either
/// of the two snapshots carries no preference vector.
/// NOTE(review): the original spec also lists "user has no
/// embedding-based interactions" and "fewer than 2 sessions exist" as
/// `None` cases -- confirm against the final implementation.
///
/// Range: `[0.0, 2.0]` where 0.0 = identical, 2.0 = opposite.
/// Computed as `1.0 - cosine_similarity`.
pub preference_drift: Option<f64>,
/// The user ID this summary is for.
pub user_id: u64,
/// Nanosecond timestamp of the `since` filter applied. Sessions that
/// started before this instant are excluded.
pub since_ns: u64,
/// Nanosecond timestamp of the earliest matching session start
/// (minimum `started_at_ns`). `None` if no sessions matched.
pub earliest_session_ns: Option<u64>,
/// Nanosecond timestamp of the latest matching session close
/// (maximum `closed_at_ns`). `None` if no sessions matched.
pub latest_session_ns: Option<u64>,
}
2. Implementation
impl TidalDb {
/// Compute an aggregate summary of a user's closed session history.
///
/// Scans `closed_sessions` (in-memory DashMap) for sessions belonging
/// to `user_id` that started at or after `since_ns`. Returns aggregate
/// counts and preference drift.
///
/// # Arguments
///
/// * `user_id` - user whose closed sessions are aggregated.
/// * `since_ns` - inclusive lower bound, in nanoseconds, on a session's
///   start timestamp; sessions that started earlier are ignored.
///
/// # Current Limitation
///
/// Only sessions closed during the current process lifetime are visible.
/// Sessions from previous runs that were archived to persistent storage
/// are not scanned. This will be addressed in M8 when cross-node session
/// aggregation requires persistent archive reads.
///
/// # Errors
///
/// Returns `TidalError::NotFound` if no closed session for `user_id`
/// started at or after `since_ns` -- this includes users that have closed
/// sessions, but only ones older than the cutoff.
pub fn user_session_summary(
&self,
user_id: u64,
since_ns: u64,
) -> crate::Result<UserSessionSummary> {
// Implementation outline (each step is detailed in the sections below):
// 1. Iterate closed_sessions DashMap
// 2. Filter by user_id and started_at_ns >= since_ns
// 3. Accumulate totals and signal type frequencies
// 4. Compute preference drift via cosine distance
// 5. Return UserSessionSummary
}
}
3. Session scanning logic
// Running aggregates over every closed session that passes both filters.
let (mut sessions_count, mut total_signals, mut total_rejections) = (0_u64, 0_u64, 0_u64);
let mut signal_freq: HashMap<String, u64> = HashMap::new();
let mut earliest_ns: Option<u64> = None;
let mut latest_ns: Option<u64> = None;

// NOTE(review): the snapshot-based drift computation in section 5 reads
// `earliest_snapshot` / `latest_snapshot`, which this loop does not produce;
// only the timestamps are tracked here.
for entry in self.closed_sessions.iter() {
    let snap = entry.value();

    // Ignore other users' sessions and sessions that began before the cutoff.
    if snap.user_id != user_id || snap.started_at_ns < since_ns {
        continue;
    }

    sessions_count += 1;
    total_signals += snap.signals_written;
    total_rejections += snap.rejections;

    // Fold this session's per-signal counts into the cross-session histogram.
    for (name, state) in &snap.signal_states {
        *signal_freq.entry(name.clone()).or_default() += state.count;
    }

    // Widen the observed time window: earliest start, latest close.
    earliest_ns = Some(match earliest_ns {
        Some(seen) => seen.min(snap.started_at_ns),
        None => snap.started_at_ns,
    });
    latest_ns = Some(match latest_ns {
        Some(seen) => seen.max(snap.closed_at_ns),
        None => snap.closed_at_ns,
    });
}

// Nothing matched: report the user as not found rather than an empty summary.
if sessions_count == 0 {
    return Err(TidalError::NotFound {
        kind: EntityKind::User,
        id: EntityId::new(user_id),
    });
}
4. Top signal types
// Rank signal types by total count, highest first. Equal counts are ordered
// by name so that the ranking -- and which entries survive the cut at 10 --
// does not depend on `HashMap` iteration order, which varies between runs.
let mut top_signal_types: Vec<(String, u64)> = signal_freq.into_iter().collect();
top_signal_types.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
top_signal_types.truncate(10);
5. Preference drift computation
Drift is the cosine distance between a baseline preference vector and a later one. The intended baseline is the preference vector at the time of the earliest matching session. Historical preference vectors are not persisted, and comparing the current vector against a zero vector (fresh user) is not meaningful, so without per-session snapshots the drift is reported as `None`:
// Historical preference vectors are not persisted, so there is no baseline
// to compare the user's current vector against. A zero-vector baseline
// (fresh user) is not meaningful: cosine similarity with the origin is
// undefined, and the vector's magnitude alone says nothing about direction
// change. Drift is therefore reported as `None` until per-session preference
// snapshots are stored (M8).
let preference_drift: Option<f64> = None;
If SessionSnapshot already stores a preference vector snapshot (from the close_session hook that calls apply_session_preference_update), use that for drift computation:
// Drift is only defined when both boundary sessions exist and each of them
// captured a preference vector; every other combination yields `None`.
let preference_drift = match (earliest_snapshot, latest_snapshot) {
    (Some(earliest_snap), Some(latest_snap)) => {
        match (&earliest_snap.preference_vector, &latest_snap.preference_vector) {
            (Some(early_pref), Some(late_pref)) => Some(cosine_distance(early_pref, late_pref)),
            _ => None,
        }
    }
    _ => None,
};
/// Cosine distance between two vectors: `1.0 - cosine_similarity`.
///
/// Returns `0.0` for vectors pointing in the same direction, `1.0` for
/// orthogonal vectors and `2.0` for opposite vectors. The result is clamped
/// to `[0.0, 2.0]` so floating-point rounding can never push it outside the
/// documented range.
///
/// By convention the distance is `0.0` when either vector has zero magnitude,
/// since direction is undefined there. The check is on an exactly-zero norm
/// rather than a small threshold: cosine distance is scale-invariant, so tiny
/// but non-zero vectors must still be compared by direction.
///
/// Both slices are expected to have the same length (checked in debug builds
/// only); in release builds the extra trailing elements of the longer slice
/// are ignored.
fn cosine_distance(a: &[f32], b: &[f32]) -> f64 {
    debug_assert_eq!(a.len(), b.len());

    // Accumulate in f64 to limit rounding error on long vectors.
    let mut dot = 0.0_f64;
    let mut norm_a = 0.0_f64;
    let mut norm_b = 0.0_f64;
    for (&x, &y) in a.iter().zip(b) {
        let (x, y) = (f64::from(x), f64::from(y));
        dot += x * y;
        norm_a += x * x;
        norm_b += y * y;
    }

    // A sum of squares is zero only when every component is zero.
    if norm_a <= 0.0 || norm_b <= 0.0 {
        return 0.0;
    }

    let similarity = dot / (norm_a.sqrt() * norm_b.sqrt());
    (1.0 - similarity).clamp(0.0, 2.0)
}
6. Module wiring
In tidal/src/db/mod.rs, if using a separate file:
pub(crate) mod aggregation;
Re-export from tidal/src/lib.rs:
pub use db::aggregation::UserSessionSummary;
// or from db::export if co-located:
pub use db::export::UserSessionSummary;
Acceptance Criteria
- UserSessionSummary struct with sessions_count, total_signals, total_rejections, top_signal_types, preference_drift, user_id, since_ns, earliest_session_ns, latest_session_ns
- db.user_session_summary(user_id, since_ns) -> Result<UserSessionSummary>
- Scans closed_sessions DashMap filtered by user_id and started_at_ns
- top_signal_types sorted descending by count, limited to top 10
- preference_drift computed as cosine distance when preference vector snapshots are available
- Returns TidalError::NotFound when no matching sessions exist
- sessions_count matches the number of closed sessions for the user in range
- total_signals and total_rejections are correct sums
- earliest_session_ns and latest_session_ns correctly track time bounds
- Type re-exported from lib.rs
- cargo clippy -D warnings and cargo fmt --check pass
Test Strategy
#[cfg(test)]
mod tests {
    use super::*;

    /// Tolerance used when comparing a computed distance to its exact value.
    const TOL: f64 = 1e-10;

    /// A vector compared with an equal vector has distance 0.
    #[test]
    fn cosine_distance_identical_vectors() {
        let lhs = [1.0_f32, 0.0, 0.0];
        let rhs = [1.0_f32, 0.0, 0.0];
        assert!(cosine_distance(&lhs, &rhs).abs() < TOL);
    }

    /// Perpendicular unit vectors have distance 1.
    #[test]
    fn cosine_distance_orthogonal_vectors() {
        let x_axis = [1.0_f32, 0.0];
        let y_axis = [0.0_f32, 1.0];
        assert!((cosine_distance(&x_axis, &y_axis) - 1.0).abs() < TOL);
    }

    /// Vectors pointing in opposite directions have the maximum distance, 2.
    #[test]
    fn cosine_distance_opposite_vectors() {
        let forward = [1.0_f32, 0.0];
        let backward = [-1.0_f32, 0.0];
        assert!((cosine_distance(&forward, &backward) - 2.0).abs() < TOL);
    }

    /// Convention: a zero vector is at distance 0 from anything.
    #[test]
    fn cosine_distance_zero_vector() {
        let origin = [0.0_f32, 0.0];
        let unit = [1.0_f32, 0.0];
        assert!(cosine_distance(&origin, &unit).abs() < TOL);
    }
}
Integration test:
/// Three closed sessions for one user roll up into a single summary with
/// summed signal counts and a frequency-ordered signal type list.
#[test]
fn user_session_summary_aggregates_correctly() {
    let db = make_test_db_with_sessions_schema();
    let user_id = 42u64;
    let agent = AgentId::new("test").unwrap();

    // Each session records two "view" signals and one "like", all on distinct entities.
    for i in 0..3 {
        let sid = db.start_session(user_id, &agent, "default").unwrap();
        for (signal, offset) in [("view", 1), ("view", 2), ("like", 3)] {
            db.session_signal(sid, signal, EntityId::new(i * 10 + offset), 1.0, Timestamp::now())
                .unwrap();
        }
        db.close_session(sid).unwrap();
    }

    let summary = db.user_session_summary(user_id, 0).unwrap();
    assert_eq!(summary.sessions_count, 3);
    assert_eq!(summary.total_signals, 9); // 3 signals in each of 3 sessions
    assert_eq!(summary.total_rejections, 0);
    assert_eq!(summary.user_id, user_id);

    // "view" leads with 6 occurrences, followed by "like" with 3.
    let top = &summary.top_signal_types;
    assert_eq!(top[0].0, "view");
    assert_eq!(top[0].1, 6);
    assert_eq!(top[1].0, "like");
    assert_eq!(top[1].1, 3);
}
/// Sessions that started before the `since` cutoff are left out of the summary.
#[test]
fn user_session_summary_since_filter() {
    let db = make_test_db_with_sessions_schema();
    let user_id = 42u64;
    let agent = AgentId::new("test").unwrap();

    // Session 1: old, no signals.
    let old_sid = db.start_session(user_id, &agent, "default").unwrap();
    db.close_session(old_sid).unwrap();

    // The filter keeps sessions with `started_at_ns >= since_ns`, so the cutoff
    // must be strictly later than session 1's start. Pause briefly so the test
    // does not depend on the clock advancing between two back-to-back reads.
    std::thread::sleep(std::time::Duration::from_millis(2));
    let midpoint = Timestamp::now().as_nanos();

    // Session 2: new, one signal.
    let new_sid = db.start_session(user_id, &agent, "default").unwrap();
    db.session_signal(new_sid, "view", EntityId::new(1), 1.0, Timestamp::now()).unwrap();
    db.close_session(new_sid).unwrap();

    let summary = db.user_session_summary(user_id, midpoint).unwrap();
    assert_eq!(summary.sessions_count, 1); // only the session after midpoint
    assert_eq!(summary.total_signals, 1); // and it is session 2, not the empty session 1
}
/// Asking for a user with no closed sessions yields `TidalError::NotFound`.
#[test]
fn user_session_summary_no_sessions_returns_not_found() {
    let db = make_test_db_with_sessions_schema();
    assert!(matches!(
        db.user_session_summary(999, 0),
        Err(TidalError::NotFound { .. })
    ));
}
/// Sessions belonging to another user never leak into the summary.
#[test]
fn user_session_summary_different_user_excluded() {
    let db = make_test_db_with_sessions_schema();
    let agent = AgentId::new("test").unwrap();

    // One single-signal session per user: user 1 views entity 1, user 2 likes entity 2.
    for (user, signal, entity) in [(1, "view", 1), (2, "like", 2)] {
        let sid = db.start_session(user, &agent, "default").unwrap();
        db.session_signal(sid, signal, EntityId::new(entity), 1.0, Timestamp::now()).unwrap();
        db.close_session(sid).unwrap();
    }

    let summary = db.user_session_summary(1, 0).unwrap();
    assert_eq!(summary.sessions_count, 1);
    assert_eq!(summary.total_signals, 1);
    assert_eq!(summary.top_signal_types[0].0, "view");
}