333 lines
11 KiB
Markdown
333 lines
11 KiB
Markdown
# Task 08: RLHF Training Data Export
|
|
|
|
## Delivers
|
|
|
|
`db.export_signals(ExportRequest)` API that reads signal events from the WAL by time range and signal type, returning a flat `Vec<ExportedSignal>` suitable for RLHF training pipelines. Supports `ExportFormat::JsonLines` for direct consumption by ML tooling.
|
|
|
|
This is the first step toward closing the RLHF loop: agent sessions write reward signals via the session layer, and `export_signals` extracts them for offline training. The export reads WAL segment files directly -- it does not interfere with the live write path.
|
|
|
|
## Complexity: M
|
|
|
|
## Dependencies
|
|
|
|
- task-01 complete (establishes instrumentation pattern)
|
|
- `tidal/src/wal/mod.rs` -- WAL segment files, `reader::recover()`
|
|
- `tidal/src/wal/reader.rs` -- segment scanning and event recovery
|
|
- `tidal/src/wal/format.rs` -- `EventRecord` wire format
|
|
- `tidal/src/signals/mod.rs` -- `SignalLedger` for signal type name resolution
|
|
- `tidal/src/schema/mod.rs` -- `Schema` for signal type definitions
|
|
|
|
## Technical Design
|
|
|
|
### 1. Type definitions
|
|
|
|
Create `tidal/src/db/export.rs`:
|
|
|
|
```rust
|
|
use crate::schema::EntityId;
|
|
|
|
/// Output format for exported signals.
///
/// `JsonLines` is the default (and currently the only) format.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ExportFormat {
    /// One JSON object per line, no enclosing array.
    ///
    /// Each line has the shape produced by `ExportedSignal::to_json_line`:
    /// `{"entity_id":42,"signal_type":"view","weight":1,"timestamp_ns":1708700000000000000}`
    ///
    /// There is no `user_id` key: an exported signal carries only the entity,
    /// the signal type name, the weight, and the timestamp.
    #[default]
    JsonLines,
}
|
|
|
|
/// Request parameters for signal export.
|
|
///
|
|
/// Filters WAL events by time range, signal type, and optionally user ID.
|
|
/// At least one of `since` or `until` must be set to prevent unbounded scans.
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```ignore
|
|
/// let req = ExportRequest {
|
|
/// user_id: None, // all users
|
|
/// signal_types: vec!["view".into(), "like".into()],
|
|
/// since: Some(one_hour_ago),
|
|
/// until: Some(now),
|
|
/// format: ExportFormat::JsonLines,
|
|
/// limit: Some(10_000),
|
|
/// };
|
|
/// let signals = db.export_signals(req)?;
|
|
/// ```
|
|
#[derive(Debug, Clone)]
|
|
pub struct ExportRequest {
|
|
/// Filter to signals for a specific user. `None` means all users.
|
|
///
|
|
/// Note: the WAL stores `entity_id` (the content item), not `user_id`.
|
|
/// User-scoped export requires a secondary lookup from the session
|
|
/// journal or signal ledger's per-user index. If the user_id filter
|
|
/// is set but no user-to-entity mapping exists, an empty result is
|
|
/// returned.
|
|
pub user_id: Option<u64>,
|
|
/// Signal type names to include. Empty means all signal types.
|
|
pub signal_types: Vec<String>,
|
|
/// Start of the time range (inclusive). Nanosecond timestamp.
|
|
/// `None` means "from the beginning of the WAL."
|
|
pub since: Option<u64>,
|
|
/// End of the time range (exclusive). Nanosecond timestamp.
|
|
/// `None` means "up to now."
|
|
pub until: Option<u64>,
|
|
/// Output format.
|
|
pub format: ExportFormat,
|
|
/// Maximum number of events to return. `None` means no limit.
|
|
/// Applied after filtering.
|
|
pub limit: Option<usize>,
|
|
}
|
|
|
|
impl ExportRequest {
|
|
/// Create a request for all signals in a time range.
|
|
#[must_use]
|
|
pub fn time_range(since: u64, until: u64) -> Self {
|
|
Self {
|
|
user_id: None,
|
|
signal_types: Vec::new(),
|
|
since: Some(since),
|
|
until: Some(until),
|
|
format: ExportFormat::JsonLines,
|
|
limit: None,
|
|
}
|
|
}
|
|
|
|
/// Create a request for specific signal types in a time range.
|
|
#[must_use]
|
|
pub fn signals_in_range(
|
|
signal_types: Vec<String>,
|
|
since: u64,
|
|
until: u64,
|
|
) -> Self {
|
|
Self {
|
|
user_id: None,
|
|
signal_types,
|
|
since: Some(since),
|
|
until: Some(until),
|
|
format: ExportFormat::JsonLines,
|
|
limit: None,
|
|
}
|
|
}
|
|
}
|
|
|
|
/// A single exported signal event.
///
/// Flat struct with all fields populated. Suitable for serialization
/// to JSON lines or other tabular formats.
#[derive(Debug, Clone, PartialEq)]
pub struct ExportedSignal {
    /// The entity (content item) that received the signal.
    pub entity_id: u64,
    /// The signal type name (e.g., "view", "like", "skip").
    pub signal_type: String,
    /// The signal weight at write time.
    pub weight: f32,
    /// Nanosecond timestamp when the signal was written.
    pub timestamp_ns: u64,
}

impl ExportedSignal {
    /// Render as a JSON line (no trailing newline).
    ///
    /// The output is always a valid JSON object:
    ///
    /// - `signal_type` is escaped as a JSON string, so quotes, backslashes,
    ///   and control characters in a type name cannot break the line.
    /// - A non-finite `weight` (NaN or +/- infinity) is written as `null`,
    ///   because JSON has no literal for those values.
    #[must_use]
    pub fn to_json_line(&self) -> String {
        let weight = if self.weight.is_finite() {
            self.weight.to_string()
        } else {
            String::from("null")
        };
        format!(
            r#"{{"entity_id":{},"signal_type":"{}","weight":{},"timestamp_ns":{}}}"#,
            self.entity_id,
            escape_json_string(&self.signal_type),
            weight,
            self.timestamp_ns
        )
    }
}

/// Escape `raw` for use between the double quotes of a JSON string.
///
/// Escapes `"`, `\`, and every control character below U+0020; all other
/// characters are copied through unchanged.
fn escape_json_string(raw: &str) -> String {
    let mut escaped = String::with_capacity(raw.len());
    for ch in raw.chars() {
        match ch {
            '"' => escaped.push_str("\\\""),
            '\\' => escaped.push_str("\\\\"),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            // Remaining control characters have no short form in JSON.
            c if u32::from(c) < 0x20 => {
                escaped.push_str(&format!("\\u{:04x}", u32::from(c)));
            }
            c => escaped.push(c),
        }
    }
    escaped
}
|
|
```
|
|
|
|
### 2. WAL scanning implementation
|
|
|
|
The export reads WAL segment files using the existing `reader` module. The scan:
|
|
|
|
1. Lists all WAL segment files in the WAL directory
|
|
2. For each segment, reads all events using `reader::read_segment()`
|
|
3. Filters by timestamp range (`since <= timestamp_ns < until`)
|
|
4. Filters by signal type (if `signal_types` is non-empty, resolve names to type IDs via the schema and filter)
|
|
5. Collects into `Vec<ExportedSignal>`
|
|
6. Applies `limit` if set
|
|
|
|
```rust
|
|
impl TidalDb {
|
|
/// Export signal events from the WAL for offline analysis or RLHF training.
|
|
///
|
|
/// Reads WAL segment files directly. Does not interfere with the live
|
|
/// write path. For large WAL backlogs, use narrow time ranges.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Returns `TidalError::Internal` if WAL segments cannot be read.
|
|
/// Returns `TidalError::Schema(UnknownSignalType)` if a signal type
|
|
/// name in the request does not exist in the schema.
|
|
pub fn export_signals(&self, request: ExportRequest) -> crate::Result<Vec<ExportedSignal>> {
|
|
// Implementation: scan WAL segments, filter, collect
|
|
}
|
|
}
|
|
```
|
|
|
|
### 3. Signal type resolution
|
|
|
|
The WAL stores signal types as `u8` IDs. The export must resolve these back to names using the schema's signal type definitions:
|
|
|
|
```rust
|
|
// Build a lookup table: signal_type_id -> signal_type_name
|
|
let type_names: HashMap<u8, String> = schema
|
|
.signal_types()
|
|
.iter()
|
|
.enumerate()
|
|
.map(|(i, def)| (i as u8, def.name.clone()))
|
|
.collect();
|
|
|
|
// Build a filter set if signal_types is non-empty
|
|
let type_filter: HashSet<u8> = if request.signal_types.is_empty() {
|
|
HashSet::new() // empty means "accept all"
|
|
} else {
|
|
request.signal_types.iter()
|
|
.map(|name| schema.resolve_signal_type(name)
|
|
.ok_or_else(|| TidalError::Schema(SchemaError::UnknownSignalType(name.clone()))))
|
|
.collect::<Result<HashSet<u8>, _>>()?
|
|
};
|
|
```
|
|
|
|
### 4. Module wiring
|
|
|
|
In `tidal/src/db/mod.rs`, add:
|
|
|
|
```rust
|
|
pub(crate) mod export;
|
|
```
|
|
|
|
Re-export from `tidal/src/lib.rs`:
|
|
|
|
```rust
|
|
pub use db::export::{ExportFormat, ExportRequest, ExportedSignal};
|
|
```
|
|
|
|
## Acceptance Criteria
|
|
|
|
- [ ] `ExportFormat` enum with `JsonLines` variant
|
|
- [ ] `ExportRequest` struct with `user_id`, `signal_types`, `since`, `until`, `format`, `limit` fields
|
|
- [ ] `ExportRequest::time_range()` and `ExportRequest::signals_in_range()` constructors
|
|
- [ ] `ExportedSignal` struct with `entity_id`, `signal_type`, `weight`, `timestamp_ns` fields
|
|
- [ ] `ExportedSignal::to_json_line()` produces valid JSON
|
|
- [ ] `db.export_signals(request) -> Result<Vec<ExportedSignal>>` scans WAL segments
|
|
- [ ] Time range filtering works correctly (`since` inclusive, `until` exclusive)
|
|
- [ ] Signal type filtering works correctly (empty means all types)
|
|
- [ ] `limit` parameter caps the number of returned events
|
|
- [ ] Unknown signal type names in `signal_types` return `SchemaError::UnknownSignalType`
|
|
- [ ] Export does not interfere with live WAL writes (read-only scan)
|
|
- [ ] Types re-exported from `lib.rs`
|
|
- [ ] `cargo clippy -- -D warnings` and `cargo fmt --check` pass
|
|
|
|
## Test Strategy
|
|
|
|
```rust
|
|
#[cfg(test)]
mod tests {
    use super::*;

    /// A simple signal renders to exactly one JSON object with the four
    /// documented keys. Comparing the whole line (rather than substrings)
    /// also catches stray keys and prefix matches such as `"weight":10`.
    #[test]
    fn exported_signal_json_line_format() {
        let sig = ExportedSignal {
            entity_id: 42,
            signal_type: "view".to_string(),
            weight: 1.0,
            timestamp_ns: 1_700_000_000_000_000_000,
        };
        assert_eq!(
            sig.to_json_line(),
            r#"{"entity_id":42,"signal_type":"view","weight":1,"timestamp_ns":1700000000000000000}"#
        );
    }

    /// `time_range` sets both bounds and leaves every other filter open.
    #[test]
    fn export_request_time_range_constructor() {
        let req = ExportRequest::time_range(100, 200);
        assert_eq!(req.since, Some(100));
        assert_eq!(req.until, Some(200));
        assert!(req.signal_types.is_empty());
        assert!(req.user_id.is_none());
        assert!(req.limit.is_none());
        assert_eq!(req.format, ExportFormat::JsonLines);
    }

    /// `signals_in_range` keeps the given names in order, sets both bounds
    /// (checking `until` guards against swapped arguments), and leaves the
    /// user filter and limit unset.
    #[test]
    fn export_request_signals_in_range_constructor() {
        let req = ExportRequest::signals_in_range(vec!["view".into(), "like".into()], 100, 200);
        assert_eq!(
            req.signal_types,
            vec!["view".to_string(), "like".to_string()]
        );
        assert_eq!(req.since, Some(100));
        assert_eq!(req.until, Some(200));
        assert!(req.user_id.is_none());
        assert!(req.limit.is_none());
        assert_eq!(req.format, ExportFormat::JsonLines);
    }
}
|
|
```
|
|
|
|
Integration test:
|
|
|
|
```rust
|
|
/// Writes three signals, then checks an unfiltered export, a type-filtered
/// export, and a limited export over the same time window.
#[test]
fn export_signals_returns_written_events() {
    let db = make_test_db_with_schema_and_wal();
    let start = Timestamp::now().as_nanos();
    let end = start + 10_000;

    // Two views and one like, spaced 1000 ns apart.
    for (name, entity, offset) in [("view", 1, 0), ("like", 2, 1000), ("view", 3, 2000)] {
        let at = Timestamp::from_nanos(start + offset);
        db.signal(name, EntityId::new(entity), 1.0, at).unwrap();
    }

    // Export all
    let everything = db
        .export_signals(ExportRequest::time_range(start, end))
        .unwrap();
    assert_eq!(everything.len(), 3);

    // Export views only
    let views_req = ExportRequest::signals_in_range(vec!["view".into()], start, end);
    let views = db.export_signals(views_req).unwrap();
    assert_eq!(views.len(), 2);
    assert!(views.iter().all(|s| s.signal_type == "view"));

    // Export with limit
    let mut limited_req = ExportRequest::time_range(start, end);
    limited_req.limit = Some(1);
    let limited = db.export_signals(limited_req).unwrap();
    assert_eq!(limited.len(), 1);
}
|
|
|
|
/// Requesting a signal type name that the schema does not define must fail
/// rather than return an empty result.
#[test]
fn export_signals_unknown_type_returns_error() {
    let db = make_test_db_with_schema_and_wal();
    let unknown = vec!["nonexistent".into()];
    let req = ExportRequest::signals_in_range(unknown, 0, u64::MAX);
    assert!(db.export_signals(req).is_err());
}
|
|
|
|
/// With nothing written, an export over the widest possible range succeeds
/// and yields no signals.
#[test]
fn export_signals_empty_wal_returns_empty() {
    let db = make_test_db_with_schema_and_wal();
    let whole_range = ExportRequest::time_range(0, u64::MAX);
    let exported = db.export_signals(whole_range).unwrap();
    assert!(exported.is_empty());
}
|
|
|
|
/// A signal written through the database round-trips through the export
/// and renders as a JSON line carrying its entity, type, and weight.
#[test]
fn exported_signal_json_lines_output() {
    let db = make_test_db_with_schema_and_wal();
    let t1 = 1_700_000_000_000_000_000u64;
    db.signal("view", EntityId::new(42), 2.5, Timestamp::from_nanos(t1)).unwrap();

    // `until` is exclusive, so `[t1, t1 + 1)` covers exactly the one event.
    let req = ExportRequest::time_range(t1, t1 + 1);
    let signals = db.export_signals(req).unwrap();

    // Check the count first so an empty export fails with a clear message
    // instead of an index-out-of-bounds panic.
    assert_eq!(signals.len(), 1);

    let line = signals[0].to_json_line();
    assert!(line.contains("\"entity_id\":42"));
    assert!(line.contains("\"signal_type\":\"view\""));
    // 2.5 is exactly representable, so the written weight must appear verbatim.
    assert!(line.contains("\"weight\":2.5"));
}
|
|
```
|