tidaldb/docs/planning/milestone-7/phase-4/task-09-cross-session-aggregation.md
2026-02-23 22:41:16 -07:00

13 KiB

Task 09: Cross-Session Aggregation Query

Delivers

db.user_session_summary(user_id, since) API that scans closed session archives and returns an aggregate view of a user's session history: session count, total signals, total rejections, top signal types, and preference drift (cosine distance between earliest and latest preference vectors). This enables agent orchestrators to assess how a user's taste profile has evolved across sessions.

Complexity: M

Dependencies

  • task-01 complete (establishes instrumentation pattern)
  • tidal/src/session/types.rs -- SessionId, SessionSummary
  • tidal/src/session/snapshot.rs -- SessionSnapshot, SessionContext
  • tidal/src/session/signal_state.rs -- SessionSignalState per-signal data
  • tidal/src/entities/preference.rs -- PreferenceVectors for drift computation
  • tidal/src/db/mod.rs -- TidalDb.closed_sessions DashMap

Technical Design

1. Type definitions

Add to tidal/src/db/export.rs (or a new tidal/src/db/aggregation.rs):

use std::collections::HashMap;

/// Aggregate summary of a user's session history.
///
/// Computed by scanning `closed_sessions` in memory. Only sessions
/// that have been closed during the current process lifetime are visible.
/// Persistent session archive scanning is deferred to M8.
///
/// All timestamps are nanosecond values; every count is a sum over the
/// sessions that matched both the user and the `since` filter.
///
/// # Examples
///
/// ```ignore
/// let summary = db.user_session_summary(user_id, one_week_ago_ns)?;
/// println!("Sessions: {}", summary.sessions_count);
/// println!("Top signal: {:?}", summary.top_signal_types.first());
/// if let Some(drift) = summary.preference_drift {
///     println!("Preference drift: {drift:.4}");
/// }
/// ```
#[derive(Debug, Clone)]
pub struct UserSessionSummary {
    /// Number of closed sessions for this user in the time range.
    pub sessions_count: u64,
    /// Total signal writes across all matching sessions.
    pub total_signals: u64,
    /// Total policy rejections across all matching sessions.
    pub total_rejections: u64,
    /// Top signal types by frequency, sorted descending.
    /// Each entry is `(signal_type_name, count)`. Limited to top 10.
    pub top_signal_types: Vec<(String, u64)>,
    /// Cosine distance between the preference vector snapshot of the
    /// earliest matching session and that of the latest matching session.
    ///
    /// `None` if no preference vector data is available (user has no
    /// embedding-based interactions, or fewer than 2 sessions exist).
    ///
    /// Range: `[0.0, 2.0]` where 0.0 = identical, 2.0 = opposite.
    /// Computed as `1.0 - cosine_similarity`.
    pub preference_drift: Option<f64>,
    /// The user ID this summary is for.
    pub user_id: u64,
    /// Nanosecond timestamp of the `since` filter applied.
    pub since_ns: u64,
    /// Nanosecond timestamp of the earliest matching session start.
    /// `None` if no sessions matched.
    pub earliest_session_ns: Option<u64>,
    /// Nanosecond timestamp of the latest matching session close.
    /// `None` if no sessions matched.
    pub latest_session_ns: Option<u64>,
}

2. Implementation

impl TidalDb {
    /// Compute an aggregate summary of a user's closed session history.
    ///
    /// Scans `closed_sessions` (in-memory DashMap) for sessions belonging
    /// to `user_id` that started at or after `since_ns`. Returns aggregate
    /// counts and preference drift.
    ///
    /// # Arguments
    ///
    /// * `user_id` - the user whose closed sessions are aggregated.
    /// * `since_ns` - inclusive lower bound, in nanoseconds, on a session's
    ///   start timestamp; pass `0` to include every closed session.
    ///
    /// # Current Limitation
    ///
    /// Only sessions closed during the current process lifetime are visible.
    /// Sessions from previous runs that were archived to persistent storage
    /// are not scanned. This will be addressed in M8 when cross-node session
    /// aggregation requires persistent archive reads.
    ///
    /// # Errors
    ///
    /// Returns `TidalError::NotFound` if no closed session for `user_id`
    /// started at or after `since_ns` (this includes a user with no closed
    /// sessions at all).
    pub fn user_session_summary(
        &self,
        user_id: u64,
        since_ns: u64,
    ) -> crate::Result<UserSessionSummary> {
        // Implementation outline (each step is detailed in the sections below):
        // 1. Iterate closed_sessions DashMap
        // 2. Filter by user_id and started_at_ns >= since_ns
        // 3. Accumulate totals and signal type frequencies
        // 4. Compute preference drift via cosine distance
        // 5. Return UserSessionSummary
    }
}

3. Session scanning logic

// Running aggregates over every closed session that passes the filter.
let mut sessions_count: u64 = 0;
let mut total_signals: u64 = 0;
let mut total_rejections: u64 = 0;
let mut signal_freq: HashMap<String, u64> = HashMap::new();
let mut earliest_ns: Option<u64> = None;
let mut latest_ns: Option<u64> = None;

for archived in self.closed_sessions.iter() {
    let snap = archived.value();

    // Keep only this user's sessions that started at or after `since_ns`.
    let in_scope = snap.user_id == user_id && snap.started_at_ns >= since_ns;
    if !in_scope {
        continue;
    }

    // Widen the observed time range: earliest start, latest close.
    let (opened_ns, closed_ns) = (snap.started_at_ns, snap.closed_at_ns);
    earliest_ns = Some(match earliest_ns {
        Some(seen) => seen.min(opened_ns),
        None => opened_ns,
    });
    latest_ns = Some(match latest_ns {
        Some(seen) => seen.max(closed_ns),
        None => closed_ns,
    });

    // Fold this session's per-signal counts into the cross-session tally.
    for (name, state) in &snap.signal_states {
        let tally = signal_freq.entry(name.clone()).or_insert(0);
        *tally += state.count;
    }

    sessions_count += 1;
    total_signals += snap.signals_written;
    total_rejections += snap.rejections;
}

// Nothing matched: report the user as not found instead of an empty summary.
if sessions_count == 0 {
    return Err(TidalError::NotFound {
        kind: EntityKind::User,
        id: EntityId::new(user_id),
    });
}

4. Top signal types

// Rank signal types by descending count. Equal counts are ordered by name so
// that neither the ordering nor the set of entries surviving `truncate(10)`
// depends on `HashMap` iteration order, which varies between runs.
let mut top_signal_types: Vec<(String, u64)> = signal_freq.into_iter().collect();
top_signal_types.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
top_signal_types.truncate(10);

5. Preference drift computation

Drift is the cosine distance between a baseline -- the preference vector at the time of the earliest matching session -- and the user's most recent preference vector. We do not persist historical preference vectors, and a zero vector (fresh user) cannot stand in for the missing baseline: its norm is zero, so cosine similarity against it is undefined and carries no information. In this fallback the drift is therefore reported as None:

// Fallback when no per-session preference snapshot is available.
//
// The only vector at hand would be the user's current preference vector, and
// there is no historical baseline to compare it with. A zero vector is not a
// usable baseline (its norm is zero, so cosine similarity is undefined), so
// drift is reported as `None` rather than a fabricated value.
// M8 will store per-session preference snapshots.
let preference_drift: Option<f64> = None;

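If SessionSnapshot already stores a preference vector snapshot (from the close_session hook that calls apply_session_preference_update), use that for drift computation. This requires the scanning loop in step 3 to also record the matching snapshots with the smallest started_at_ns and the largest closed_at_ns as earliest_snapshot and latest_snapshot: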

// Drift needs two distinct sessions: with a single matching session the
// earliest and latest snapshots are the same one, and the distance would be a
// meaningless 0.0. `UserSessionSummary::preference_drift` documents `None`
// for fewer than 2 sessions, so that case is excluded by the guard.
let preference_drift = match (earliest_snapshot, latest_snapshot) {
    (Some(earliest_snap), Some(latest_snap)) if sessions_count >= 2 => {
        // Both endpoints must carry a preference vector snapshot.
        match (&earliest_snap.preference_vector, &latest_snap.preference_vector) {
            (Some(early_pref), Some(late_pref)) => Some(cosine_distance(early_pref, late_pref)),
            _ => None,
        }
    }
    _ => None,
};

/// Cosine distance between two vectors: `1.0 - cosine_similarity`.
///
/// Returns a value in `[0.0, 2.0]`: 0.0 for vectors pointing the same way,
/// 1.0 for orthogonal vectors, 2.0 for opposite vectors. Accumulation is done
/// in `f64` to limit rounding error on long vectors.
///
/// By convention the distance is 0.0 when either vector has zero magnitude,
/// because cosine similarity is undefined there.
///
/// Both slices are expected to have the same length; this is checked only in
/// debug builds. In release builds the extra trailing elements of the longer
/// slice are ignored.
fn cosine_distance(a: &[f32], b: &[f32]) -> f64 {
    debug_assert_eq!(a.len(), b.len());

    let (dot, norm_a, norm_b) = a.iter().zip(b).fold(
        (0.0_f64, 0.0_f64, 0.0_f64),
        |(dot, norm_a, norm_b), (&x, &y)| {
            let (x, y) = (f64::from(x), f64::from(y));
            (dot + x * y, norm_a + x * x, norm_b + y * y)
        },
    );

    // Test for an exactly-zero magnitude rather than `denom < f64::EPSILON`:
    // small but non-zero vectors (components around 1e-9) have a well-defined
    // direction and must not be reported as identical.
    if norm_a == 0.0 || norm_b == 0.0 {
        return 0.0;
    }

    let similarity = dot / (norm_a.sqrt() * norm_b.sqrt());
    // Rounding can push the similarity marginally outside [-1, 1]; clamp so the
    // documented range holds.
    (1.0 - similarity).clamp(0.0, 2.0)
}

6. Module wiring

In tidal/src/db/mod.rs, if using a separate file:

pub(crate) mod aggregation;

Re-export from tidal/src/lib.rs:

pub use db::aggregation::UserSessionSummary;
// or from db::export if co-located:
pub use db::export::UserSessionSummary;

Acceptance Criteria

  • UserSessionSummary struct with sessions_count, total_signals, total_rejections, top_signal_types, preference_drift, user_id, since_ns, earliest_session_ns, latest_session_ns
  • db.user_session_summary(user_id, since_ns) -> Result<UserSessionSummary>
  • Scans closed_sessions DashMap filtered by user_id and started_at_ns
  • top_signal_types sorted descending by count, limited to top 10
  • preference_drift computed as cosine distance when preference vector snapshots are available
  • Returns TidalError::NotFound when no matching sessions exist
  • sessions_count matches the number of closed sessions for the user in range
  • total_signals and total_rejections are correct sums
  • earliest_session_ns and latest_session_ns correctly track time bounds
  • Type re-exported from lib.rs
  • cargo clippy -D warnings and cargo fmt --check pass

Test Strategy

#[cfg(test)]
mod tests {
    use super::*;

    /// Absolute tolerance for comparing computed distances with exact values.
    const TOLERANCE: f64 = 1e-10;

    // Inputs are fixed-size arrays rather than `vec![..]`: the vectors are only
    // ever borrowed as slices, so a heap allocation is unnecessary and would
    // trip clippy's `useless_vec` lint under `-D warnings`.

    /// Vectors pointing the same way have distance 0.
    #[test]
    fn cosine_distance_identical_vectors() {
        let a = [1.0_f32, 0.0, 0.0];
        let b = [1.0_f32, 0.0, 0.0];
        let dist = cosine_distance(&a, &b);
        assert!(dist.abs() < TOLERANCE);
    }

    /// Perpendicular vectors have distance 1.
    #[test]
    fn cosine_distance_orthogonal_vectors() {
        let a = [1.0_f32, 0.0];
        let b = [0.0_f32, 1.0];
        let dist = cosine_distance(&a, &b);
        assert!((dist - 1.0).abs() < TOLERANCE);
    }

    /// Vectors pointing in opposite directions have the maximum distance, 2.
    #[test]
    fn cosine_distance_opposite_vectors() {
        let a = [1.0_f32, 0.0];
        let b = [-1.0_f32, 0.0];
        let dist = cosine_distance(&a, &b);
        assert!((dist - 2.0).abs() < TOLERANCE);
    }

    /// A zero-magnitude input yields 0 by convention.
    #[test]
    fn cosine_distance_zero_vector() {
        let a = [0.0_f32, 0.0];
        let b = [1.0_f32, 0.0];
        let dist = cosine_distance(&a, &b);
        assert!(dist.abs() < TOLERANCE); // Convention: zero distance for zero vector
    }
}

Integration test:

/// Three closed sessions for one user, each with two "view" signals and one
/// "like" signal, must be summed into a single summary.
#[test]
fn user_session_summary_aggregates_correctly() {
    let db = make_test_db_with_sessions_schema();
    let user_id = 42u64;

    // Create and close 3 sessions with signals
    for i in 0..3 {
        let base = i * 10;
        let session = db
            .start_session(user_id, &AgentId::new("test").unwrap(), "default")
            .unwrap();
        db.session_signal(session, "view", EntityId::new(base + 1), 1.0, Timestamp::now())
            .unwrap();
        db.session_signal(session, "view", EntityId::new(base + 2), 1.0, Timestamp::now())
            .unwrap();
        db.session_signal(session, "like", EntityId::new(base + 3), 1.0, Timestamp::now())
            .unwrap();
        db.close_session(session).unwrap();
    }

    let summary = db.user_session_summary(user_id, 0).unwrap();
    assert_eq!(summary.sessions_count, 3);
    assert_eq!(summary.total_signals, 9); // 3 per session
    assert_eq!(summary.total_rejections, 0);
    assert_eq!(summary.user_id, user_id);

    // Top signal types: "view" should be first (6 total), then "like" (3 total)
    let first = &summary.top_signal_types[0];
    assert_eq!(first.0, "view");
    assert_eq!(first.1, 6);
    let second = &summary.top_signal_types[1];
    assert_eq!(second.0, "like");
    assert_eq!(second.1, 3);
}

/// Only sessions that started at or after the `since` cut-off are counted.
#[test]
fn user_session_summary_since_filter() {
    let db = make_test_db_with_sessions_schema();
    let user_id = 42u64;

    // Session 1: old
    let sid = db.start_session(user_id, &AgentId::new("test").unwrap(), "default").unwrap();
    db.close_session(sid).unwrap();

    // Separate the cut-off from both sessions by real elapsed time. On a clock
    // with coarse resolution, back-to-back reads can return the same value, and
    // an old session starting exactly at `midpoint` would pass the `>=` filter.
    std::thread::sleep(std::time::Duration::from_millis(2));
    let midpoint = Timestamp::now().as_nanos();
    std::thread::sleep(std::time::Duration::from_millis(2));

    // Session 2: new
    let sid = db.start_session(user_id, &AgentId::new("test").unwrap(), "default").unwrap();
    db.session_signal(sid, "view", EntityId::new(1), 1.0, Timestamp::now()).unwrap();
    db.close_session(sid).unwrap();

    let summary = db.user_session_summary(user_id, midpoint).unwrap();
    assert_eq!(summary.sessions_count, 1); // only the session after midpoint
    // The counted session must be the new one, which carries the single signal.
    assert_eq!(summary.total_signals, 1);
}

/// A user with no closed sessions yields `TidalError::NotFound`, not an empty
/// summary.
#[test]
fn user_session_summary_no_sessions_returns_not_found() {
    let db = make_test_db_with_sessions_schema();
    // User 999 never started a session on this fresh database.
    assert!(matches!(
        db.user_session_summary(999, 0),
        Err(TidalError::NotFound { .. })
    ));
}

/// Sessions belonging to another user must not leak into a user's summary.
#[test]
fn user_session_summary_different_user_excluded() {
    let db = make_test_db_with_sessions_schema();
    let agent = AgentId::new("test").unwrap();

    // User A session
    let session_a = db.start_session(1, &agent, "default").unwrap();
    db.session_signal(session_a, "view", EntityId::new(1), 1.0, Timestamp::now())
        .unwrap();
    db.close_session(session_a).unwrap();

    // User B session
    let session_b = db.start_session(2, &agent, "default").unwrap();
    db.session_signal(session_b, "like", EntityId::new(2), 1.0, Timestamp::now())
        .unwrap();
    db.close_session(session_b).unwrap();

    // Only user A's single "view" signal may appear.
    let summary = db.user_session_summary(1, 0).unwrap();
    assert_eq!(summary.sessions_count, 1);
    assert_eq!(summary.total_signals, 1);
    assert_eq!(summary.top_signal_types[0].0, "view");
}