tidaldb/docs/planning/milestone-7/phase-4/task-08-rlhf-export.md
2026-02-23 22:41:16 -07:00


# Task 08: RLHF Training Data Export
## Delivers
`db.export_signals(ExportRequest)` API that reads signal events from the WAL by time range and signal type, returning a flat `Vec<ExportedSignal>` suitable for RLHF training pipelines. Supports `ExportFormat::JsonLines` for direct consumption by ML tooling.
This is the first step toward closing the RLHF loop: agent sessions write reward signals via the session layer, and `export_signals` extracts them for offline training. The export reads WAL segment files directly -- it does not interfere with the live write path.
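`to_json_line` renders one record without a trailing newline, so a caller writing a `.jsonl` file for a training pipeline has to add the separators itself. Below is a minimal sketch. It assumes the caller has already rendered each `ExportedSignal` to a string. `write_json_lines` is a hypothetical helper, not part of the planned API:

```rust
use std::io::{self, Write};

/// Hypothetical helper: write pre-rendered JSON objects to `out`, one per
/// line. Each record, including the last, is terminated with `\n`, as the
/// JSON Lines format expects. Returns the number of records written.
pub fn write_json_lines<W, I>(out: &mut W, lines: I) -> io::Result<usize>
where
    W: Write,
    I: IntoIterator<Item = String>,
{
    let mut count = 0;
    for line in lines {
        out.write_all(line.as_bytes())?;
        out.write_all(b"\n")?;
        count += 1;
    }
    // Flush so buffered writers (e.g. `BufWriter<File>`) hit disk before returning.
    out.flush()?;
    Ok(count)
}
```

Here is a typical call: `write_json_lines(&mut BufWriter::new(File::create(path)?), signals.iter().map(ExportedSignal::to_json_line))`. Newline-terminating every record keeps the file safe to append to and to split line by line.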
## Complexity: M
## Dependencies
- task-01 complete (establishes instrumentation pattern)
- `tidal/src/wal/mod.rs` -- WAL segment files, `reader::recover()`
- `tidal/src/wal/reader.rs` -- segment scanning and event recovery
- `tidal/src/wal/format.rs` -- `EventRecord` wire format
- `tidal/src/signals/mod.rs` -- `SignalLedger` for signal type name resolution
- `tidal/src/schema/mod.rs` -- `Schema` for signal type definitions
## Technical Design
### 1. Type definitions
Create `tidal/src/db/export.rs`:
```rust
/// Output format for exported signals.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExportFormat {
    /// One JSON object per line, no enclosing array.
    /// Each line: `{"entity_id":42,"signal_type":"view","weight":2.5,"timestamp_ns":1708700000000000000}`
    JsonLines,
}
/// Request parameters for signal export.
///
/// Filters WAL events by time range, signal type, and optionally user ID.
/// At least one of `since` or `until` must be set to prevent unbounded scans.
///
/// # Examples
///
/// ```ignore
/// let req = ExportRequest {
///     user_id: None, // all users
///     signal_types: vec!["view".into(), "like".into()],
///     since: Some(one_hour_ago),
///     until: Some(now),
///     format: ExportFormat::JsonLines,
///     limit: Some(10_000),
/// };
/// let signals = db.export_signals(req)?;
/// ```
#[derive(Debug, Clone)]
pub struct ExportRequest {
    /// Filter to signals for a specific user. `None` means all users.
    ///
    /// Note: the WAL stores `entity_id` (the content item), not `user_id`.
    /// User-scoped export requires a secondary lookup from the session
    /// journal or signal ledger's per-user index. If the `user_id` filter
    /// is set but no user-to-entity mapping exists, an empty result is
    /// returned.
    pub user_id: Option<u64>,
    /// Signal type names to include. Empty means all signal types.
    pub signal_types: Vec<String>,
    /// Start of the time range (inclusive). Nanosecond timestamp.
    /// `None` means "from the beginning of the WAL."
    pub since: Option<u64>,
    /// End of the time range (exclusive). Nanosecond timestamp.
    /// `None` means "up to now."
    pub until: Option<u64>,
    /// Output format.
    pub format: ExportFormat,
    /// Maximum number of events to return. `None` means no limit.
    /// Applied after filtering.
    pub limit: Option<usize>,
}

impl ExportRequest {
    /// Create a request for all signals in a time range.
    #[must_use]
    pub fn time_range(since: u64, until: u64) -> Self {
        Self {
            user_id: None,
            signal_types: Vec::new(),
            since: Some(since),
            until: Some(until),
            format: ExportFormat::JsonLines,
            limit: None,
        }
    }

    /// Create a request for specific signal types in a time range.
    #[must_use]
    pub fn signals_in_range(signal_types: Vec<String>, since: u64, until: u64) -> Self {
        Self {
            user_id: None,
            signal_types,
            since: Some(since),
            until: Some(until),
            format: ExportFormat::JsonLines,
            limit: None,
        }
    }
}
/// A single exported signal event.
///
/// Flat struct with all fields populated. Suitable for serialization
/// to JSON lines or other tabular formats.
#[derive(Debug, Clone, PartialEq)]
pub struct ExportedSignal {
    /// The entity (content item) that received the signal.
    pub entity_id: u64,
    /// The signal type name (e.g., "view", "like", "skip").
    pub signal_type: String,
    /// The signal weight at write time.
    pub weight: f32,
    /// Nanosecond timestamp when the signal was written.
    pub timestamp_ns: u64,
}

impl ExportedSignal {
    /// Render as a JSON line (no trailing newline).
    #[must_use]
    pub fn to_json_line(&self) -> String {
        // `signal_type` is interpolated without JSON escaping, and a
        // non-finite `weight` would render as `NaN` or `inf`, which is not
        // valid JSON. This is only safe if signal type names never contain
        // characters that need escaping and non-finite weights are rejected
        // at write time.
        format!(
            r#"{{"entity_id":{},"signal_type":"{}","weight":{},"timestamp_ns":{}}}"#,
            self.entity_id, self.signal_type, self.weight, self.timestamp_ns
        )
    }
}
```
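`to_json_line` interpolates fields directly. The output stops being valid JSON if a signal type name contains quotes, backslashes, or control characters, or if a weight is non-finite. The sketch below shows an escaping variant. It is written as hypothetical free functions (`json_escape`, `json_number`, `json_line`) so it stands alone. Mapping non-finite weights to `null` is a policy assumed here, not something this plan specifies:

```rust
/// Escape `s` for use inside a JSON string literal (RFC 8259, section 7).
pub fn json_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // Remaining control characters U+0000..U+001F must use \uXXXX.
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Render an `f32` as a JSON value. NaN and infinities have no JSON
/// representation, so they become `null` (an assumed policy).
pub fn json_number(w: f32) -> String {
    if w.is_finite() {
        w.to_string()
    } else {
        "null".to_string()
    }
}

/// Hypothetical escaping counterpart to `ExportedSignal::to_json_line`.
pub fn json_line(entity_id: u64, signal_type: &str, weight: f32, timestamp_ns: u64) -> String {
    format!(
        r#"{{"entity_id":{},"signal_type":"{}","weight":{},"timestamp_ns":{}}}"#,
        entity_id,
        json_escape(signal_type),
        json_number(weight),
        timestamp_ns
    )
}
```

Rust's `Display` for a finite `f32` never uses exponent notation. It always produces a plain decimal such as `1` or `2.5`, which is a valid JSON number.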
### 2. WAL scanning implementation
The export reads WAL segment files using the existing `reader` module. The scan:
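1. Lists all WAL segment files in the WAL directory, in segment (write) order
2. For each segment, reads its events using `reader::read_segment()`
3. Filters by timestamp range (`since <= timestamp_ns < until`; a `None` bound leaves that side open)
4. Filters by signal type: if `signal_types` is non-empty, the names are resolved to type IDs via the schema before any segment is read, so an unknown name fails fast
5. Converts each matching event into an `ExportedSignal`, mapping its type ID back to a name
6. Stops scanning once `limit` events have been collected, if `limit` is set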
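The filtering steps above can be sketched over already-decoded records. `WalSignal` is a hypothetical stand-in for whatever `reader::read_segment()` yields; the real `EventRecord` layout may differ. The type filter is passed as an `Option`, so `None` means "accept all". Because the iterator chain is lazy, `take` stops pulling records as soon as `limit` matches have been collected:

```rust
use std::collections::HashSet;

/// Hypothetical decoded WAL signal event (not the real `EventRecord`).
#[derive(Debug, Clone, Copy)]
pub struct WalSignal {
    pub entity_id: u64,
    pub type_id: u8,
    pub weight: f32,
    pub timestamp_ns: u64,
}

/// Filter records in WAL order and cap the result at `limit`.
pub fn filter_signals<I>(
    records: I,
    since: Option<u64>,
    until: Option<u64>,
    types: Option<&HashSet<u8>>,
    limit: Option<usize>,
) -> Vec<WalSignal>
where
    I: IntoIterator<Item = WalSignal>,
{
    records
        .into_iter()
        // Half-open range: `since` inclusive, `until` exclusive; `None` is unbounded.
        .filter(|r| since.map_or(true, |s| r.timestamp_ns >= s))
        .filter(|r| until.map_or(true, |u| r.timestamp_ns < u))
        // `None` accepts every type; `Some(set)` keeps only the listed type IDs.
        .filter(|r| types.map_or(true, |t| t.contains(&r.type_id)))
        // Lazy iteration: stop once `limit` matching records have been taken.
        .take(limit.unwrap_or(usize::MAX))
        .collect()
}
```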
```rust
impl TidalDb {
    /// Export signal events from the WAL for offline analysis or RLHF training.
    ///
    /// Reads WAL segment files directly and never writes to them, so it does
    /// not interfere with the live write path. Results are collected in
    /// memory; for large WAL backlogs, use narrow time ranges or a `limit`.
    ///
    /// # Errors
    ///
    /// Returns `TidalError::Internal` if WAL segments cannot be read.
    /// Returns `TidalError::Schema(SchemaError::UnknownSignalType(_))` if a
    /// signal type name in the request does not exist in the schema.
    pub fn export_signals(&self, request: ExportRequest) -> crate::Result<Vec<ExportedSignal>> {
        // Implementation: scan WAL segments, filter, collect.
        todo!()
    }
}
```
### 3. Signal type resolution
The WAL stores signal types as `u8` IDs. The export must resolve these back to names using the schema's signal type definitions:
```rust
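use std::collections::{HashMap, HashSet};

// Build a lookup table: signal_type_id -> signal_type_name.
// Assumes a type's ID is its index in schema definition order, which must
// match the mapping `resolve_signal_type` uses. `u8::try_from` skips any
// index above 255 instead of silently truncating it with `as u8`.
let type_names: HashMap<u8, String> = schema
    .signal_types()
    .iter()
    .enumerate()
    .filter_map(|(i, def)| u8::try_from(i).ok().map(|id| (id, def.name.clone())))
    .collect();

// Build a filter set if signal_types is non-empty. An empty set means
// "accept all"; a non-empty request always yields a non-empty set, so the
// two cases cannot be confused.
let type_filter: HashSet<u8> = if request.signal_types.is_empty() {
    HashSet::new()
} else {
    request
        .signal_types
        .iter()
        .map(|name| {
            schema
                .resolve_signal_type(name)
                .ok_or_else(|| TidalError::Schema(SchemaError::UnknownSignalType(name.clone())))
        })
        .collect::<Result<HashSet<u8>, _>>()?
};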
```
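The same resolution logic can be sketched without a live `Schema`. The sketch models the schema as a slice of signal type names whose index is the type ID, as the `enumerate()` lookup above assumes. It uses a hypothetical `UnknownSignalType` error in place of `SchemaError::UnknownSignalType`. Returning `Option<HashSet<u8>>` makes "accept all" explicit instead of relying on an empty set:

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical stand-in for `SchemaError::UnknownSignalType`.
#[derive(Debug, PartialEq)]
pub struct UnknownSignalType(pub String);

/// Build `type_id -> name`, treating each definition's index as its ID.
/// Indices that do not fit in a `u8` are skipped rather than truncated.
pub fn type_name_table(defs: &[&str]) -> HashMap<u8, String> {
    defs.iter()
        .enumerate()
        .filter_map(|(i, name)| u8::try_from(i).ok().map(|id| (id, (*name).to_string())))
        .collect()
}

/// Resolve requested names to IDs. `Ok(None)` means "accept all types";
/// the first unknown name aborts with an error.
pub fn resolve_type_filter(
    defs: &[&str],
    requested: &[String],
) -> Result<Option<HashSet<u8>>, UnknownSignalType> {
    if requested.is_empty() {
        return Ok(None);
    }
    requested
        .iter()
        .map(|want| {
            defs.iter()
                .position(|d| *d == want.as_str())
                .and_then(|i| u8::try_from(i).ok())
                .ok_or_else(|| UnknownSignalType(want.clone()))
        })
        .collect::<Result<HashSet<u8>, _>>()
        .map(Some)
}
```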
### 4. Module wiring
In `tidal/src/db/mod.rs`, add:
```rust
pub(crate) mod export;
```
Re-export from `tidal/src/lib.rs`:
```rust
pub use db::export::{ExportFormat, ExportRequest, ExportedSignal};
```
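Declaring the module `pub(crate)` and re-exporting its items from the crate root means downstream users name `tidal::ExportRequest` and never see the `db::export` path. The following is a minimal single-file sketch of that visibility pattern, with the module contents trimmed to one type:

```rust
mod db {
    // Crate-visible only: external crates cannot write `db::export::...`.
    pub(crate) mod export {
        /// Output format for exported signals.
        #[derive(Debug, Clone, Copy, PartialEq, Eq)]
        pub enum ExportFormat {
            JsonLines,
        }
    }
}

// Public re-export: the item is reachable as `crate::ExportFormat`.
// This is allowed because `ExportFormat` itself is declared `pub`.
pub use db::export::ExportFormat;
```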
## Acceptance Criteria
- [ ] `ExportFormat` enum with `JsonLines` variant
- [ ] `ExportRequest` struct with `user_id`, `signal_types`, `since`, `until`, `format`, `limit` fields
- [ ] `ExportRequest::time_range()` and `ExportRequest::signals_in_range()` constructors
- [ ] `ExportedSignal` struct with `entity_id`, `signal_type`, `weight`, `timestamp_ns` fields
- [ ] `ExportedSignal::to_json_line()` produces valid JSON
- [ ] `db.export_signals(request) -> Result<Vec<ExportedSignal>>` scans WAL segments
- [ ] Time range filtering works correctly (`since` inclusive, `until` exclusive)
- [ ] Signal type filtering works correctly (empty means all types)
- [ ] `limit` parameter caps the number of returned events
- [ ] Unknown signal type names in `signal_types` return `SchemaError::UnknownSignalType`
- [ ] Export does not interfere with live WAL writes (read-only scan)
- [ ] Types re-exported from `lib.rs`
- [ ] `cargo clippy -- -D warnings` and `cargo fmt --check` pass
## Test Strategy
```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn exported_signal_json_line_format() {
        let sig = ExportedSignal {
            entity_id: 42,
            signal_type: "view".to_string(),
            weight: 1.0,
            timestamp_ns: 1_700_000_000_000_000_000,
        };
        let line = sig.to_json_line();
        assert!(line.starts_with('{'));
        assert!(line.ends_with('}'));
        assert!(line.contains("\"entity_id\":42"));
        assert!(line.contains("\"signal_type\":\"view\""));
        assert!(line.contains("\"weight\":1"));
        assert!(line.contains("\"timestamp_ns\":1700000000000000000"));
    }

    #[test]
    fn export_request_time_range_constructor() {
        let req = ExportRequest::time_range(100, 200);
        assert_eq!(req.since, Some(100));
        assert_eq!(req.until, Some(200));
        assert!(req.signal_types.is_empty());
        assert!(req.user_id.is_none());
        assert!(req.limit.is_none());
        assert_eq!(req.format, ExportFormat::JsonLines);
    }

    #[test]
    fn export_request_signals_in_range_constructor() {
        let req = ExportRequest::signals_in_range(
            vec!["view".into(), "like".into()],
            100,
            200,
        );
        assert_eq!(req.signal_types.len(), 2);
        assert_eq!(req.since, Some(100));
    }
}
```
Integration tests:
```rust
#[test]
fn export_signals_returns_written_events() {
    let db = make_test_db_with_schema_and_wal();
    let t1 = Timestamp::now().as_nanos();
    db.signal("view", EntityId::new(1), 1.0, Timestamp::from_nanos(t1)).unwrap();
    db.signal("like", EntityId::new(2), 1.0, Timestamp::from_nanos(t1 + 1000)).unwrap();
    db.signal("view", EntityId::new(3), 1.0, Timestamp::from_nanos(t1 + 2000)).unwrap();

    // Export all
    let req = ExportRequest::time_range(t1, t1 + 10_000);
    let signals = db.export_signals(req).unwrap();
    assert_eq!(signals.len(), 3);

    // `until` is exclusive: the event at t1 + 2000 is not included
    let req = ExportRequest::time_range(t1, t1 + 2000);
    assert_eq!(db.export_signals(req).unwrap().len(), 2);

    // Export views only
    let req = ExportRequest::signals_in_range(vec!["view".into()], t1, t1 + 10_000);
    let signals = db.export_signals(req).unwrap();
    assert_eq!(signals.len(), 2);
    assert!(signals.iter().all(|s| s.signal_type == "view"));

    // Export with limit
    let mut req = ExportRequest::time_range(t1, t1 + 10_000);
    req.limit = Some(1);
    let signals = db.export_signals(req).unwrap();
    assert_eq!(signals.len(), 1);
}

#[test]
fn export_signals_unknown_type_returns_error() {
    let db = make_test_db_with_schema_and_wal();
    let req = ExportRequest::signals_in_range(vec!["nonexistent".into()], 0, u64::MAX);
    let result = db.export_signals(req);
    assert!(matches!(
        result,
        Err(TidalError::Schema(SchemaError::UnknownSignalType(_)))
    ));
}

#[test]
fn export_signals_empty_wal_returns_empty() {
    let db = make_test_db_with_schema_and_wal();
    let req = ExportRequest::time_range(0, u64::MAX);
    let signals = db.export_signals(req).unwrap();
    assert!(signals.is_empty());
}

#[test]
fn exported_signal_json_lines_output() {
    let db = make_test_db_with_schema_and_wal();
    let t1 = 1_700_000_000_000_000_000u64;
    db.signal("view", EntityId::new(42), 2.5, Timestamp::from_nanos(t1)).unwrap();
    let req = ExportRequest::time_range(t1, t1 + 1);
    let signals = db.export_signals(req).unwrap();
    assert_eq!(signals.len(), 1);
    let line = signals[0].to_json_line();
    assert!(line.contains("\"entity_id\":42"));
    assert!(line.contains("\"signal_type\":\"view\""));
    assert!(line.contains("\"weight\":2.5"));
}
```