11 KiB
Task 08: RLHF Training Data Export
Delivers
`db.export_signals(ExportRequest)` API that reads signal events from the WAL by time range and signal type, returning a flat `Vec<ExportedSignal>` suitable for RLHF training pipelines. Supports `ExportFormat::JsonLines` for direct consumption by ML tooling.
This is the first step toward closing the RLHF loop: agent sessions write reward signals via the session layer, and export_signals extracts them for offline training. The export reads WAL segment files directly -- it does not interfere with the live write path.
Complexity: M
Dependencies
- task-01 complete (establishes instrumentation pattern)
- `tidal/src/wal/mod.rs` -- WAL segment files, `reader::recover()`
- `tidal/src/wal/reader.rs` -- segment scanning and event recovery
- `tidal/src/wal/format.rs` -- `EventRecord` wire format
- `tidal/src/signals/mod.rs` -- `SignalLedger` for signal type name resolution
- `tidal/src/schema/mod.rs` -- `Schema` for signal type definitions
Technical Design
1. Type definitions
Create tidal/src/db/export.rs:
use crate::schema::EntityId;
/// Output format for exported signals.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExportFormat {
    /// One JSON object per line, no enclosing array.
    ///
    /// Each line carries exactly the four fields of `ExportedSignal`
    /// (there is no `user_id` field), for example:
    /// `{"entity_id":42,"signal_type":"view","weight":1,"timestamp_ns":1708700000000000000}`
    ///
    /// A whole-number weight such as `1.0` is rendered as `1`.
    JsonLines,
}
/// Request parameters for signal export.
///
/// Filters WAL events by time range, signal type, and optionally user ID.
/// Build one with [`ExportRequest::time_range`] or
/// [`ExportRequest::signals_in_range`], then adjust individual fields as needed.
///
/// At least one of `since` or `until` must be set to prevent unbounded scans.
///
/// NOTE(review): the field docs below give a meaning to `None` for both
/// bounds, and `export_signals` documents no error for a request with
/// neither bound set. Confirm whether such a request is rejected or
/// scans the whole WAL.
///
/// # Examples
///
/// ```ignore
/// let req = ExportRequest {
///     user_id: None, // all users
///     signal_types: vec!["view".into(), "like".into()],
///     since: Some(one_hour_ago),
///     until: Some(now),
///     format: ExportFormat::JsonLines,
///     limit: Some(10_000),
/// };
/// let signals = db.export_signals(req)?;
/// ```
#[derive(Debug, Clone)]
pub struct ExportRequest {
    /// Filter to signals for a specific user. `None` means all users.
    ///
    /// Note: the WAL stores `entity_id` (the content item), not `user_id`.
    /// User-scoped export requires a secondary lookup from the session
    /// journal or signal ledger's per-user index. If the user_id filter
    /// is set but no user-to-entity mapping exists, an empty result is
    /// returned.
    pub user_id: Option<u64>,
    /// Signal type names to include. Empty means all signal types.
    ///
    /// Names are resolved against the schema at export time. An unknown
    /// name makes the export fail rather than being silently ignored.
    pub signal_types: Vec<String>,
    /// Start of the time range (inclusive). Nanosecond timestamp.
    /// `None` means "from the beginning of the WAL."
    pub since: Option<u64>,
    /// End of the time range (exclusive). Nanosecond timestamp.
    /// `None` means "up to now."
    pub until: Option<u64>,
    /// Output format.
    pub format: ExportFormat,
    /// Maximum number of events to return. `None` means no limit.
    /// Applied after filtering, so it caps matching events rather than
    /// events scanned.
    pub limit: Option<usize>,
}
impl ExportRequest {
    /// Build a request covering every signal type in `[since, until)`.
    ///
    /// No user filter, no limit, JSON Lines output.
    #[must_use]
    pub fn time_range(since: u64, until: u64) -> Self {
        // An empty type list is the "all signal types" sentinel.
        Self::signals_in_range(Vec::new(), since, until)
    }

    /// Build a request restricted to the named `signal_types` in `[since, until)`.
    ///
    /// No user filter, no limit, JSON Lines output. Names are not validated
    /// here; an unknown name is reported by the export call itself.
    #[must_use]
    pub fn signals_in_range(signal_types: Vec<String>, since: u64, until: u64) -> Self {
        let (since, until) = (Some(since), Some(until));
        Self {
            signal_types,
            since,
            until,
            user_id: None,
            limit: None,
            format: ExportFormat::JsonLines,
        }
    }
}
/// A single exported signal event.
///
/// Flat struct with all fields populated. Suitable for serialization
/// to JSON lines or other tabular formats.
#[derive(Debug, Clone, PartialEq)]
pub struct ExportedSignal {
    /// The entity (content item) that received the signal.
    pub entity_id: u64,
    /// The signal type name (e.g., "view", "like", "skip").
    pub signal_type: String,
    /// The signal weight at write time.
    pub weight: f32,
    /// Nanosecond timestamp when the signal was written.
    pub timestamp_ns: u64,
}

impl ExportedSignal {
    /// Render as a single JSON object on one line (no trailing newline).
    ///
    /// Field order is `entity_id`, `signal_type`, `weight`, `timestamp_ns`.
    /// `signal_type` is JSON-escaped, so names containing quotes,
    /// backslashes, or control characters still yield valid JSON.
    /// JSON cannot represent NaN or infinity, so a non-finite `weight`
    /// is emitted as `null`.
    #[must_use]
    pub fn to_json_line(&self) -> String {
        let weight = if self.weight.is_finite() {
            // Float `Display` never uses exponent notation, so this is always
            // a valid JSON number (a weight of `1.0` renders as `1`).
            self.weight.to_string()
        } else {
            "null".to_owned()
        };
        format!(
            r#"{{"entity_id":{},"signal_type":"{}","weight":{},"timestamp_ns":{}}}"#,
            self.entity_id,
            escape_json_string(&self.signal_type),
            weight,
            self.timestamp_ns
        )
    }
}

/// Escape `raw` for embedding inside a JSON string literal (RFC 8259, section 7).
fn escape_json_string(raw: &str) -> String {
    use std::fmt::Write as _;

    let mut escaped = String::with_capacity(raw.len());
    for ch in raw.chars() {
        match ch {
            '"' => escaped.push_str("\\\""),
            '\\' => escaped.push_str("\\\\"),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            // Remaining C0 control characters must use the \uXXXX form.
            c if c < '\u{20}' => {
                write!(escaped, "\\u{:04x}", u32::from(c))
                    .expect("writing to a String cannot fail");
            }
            c => escaped.push(c),
        }
    }
    escaped
}
2. WAL scanning implementation
The export reads WAL segment files using the existing reader module. The scan:
- Lists all WAL segment files in the WAL directory.
- For each segment, reads all events using `reader::read_segment()`.
- Filters by timestamp range (`since <= timestamp_ns < until`).
- Filters by signal type: if `signal_types` is non-empty, resolves the names to type IDs via the schema and keeps only matching events.
- Collects the survivors into `Vec<ExportedSignal>`.
- Applies `limit` if set.
impl TidalDb {
    /// Export signal events from the WAL for offline analysis or RLHF training.
    ///
    /// Reads WAL segment files directly. Does not interfere with the live
    /// write path. For large WAL backlogs, use narrow time ranges.
    ///
    /// Events are kept when `since <= timestamp_ns < until` and, if
    /// `request.signal_types` is non-empty, when their type is in that set.
    /// `request.limit` caps the number of matching events returned.
    ///
    /// # Errors
    ///
    /// Returns `TidalError::Internal` if WAL segments cannot be read.
    /// Returns `TidalError::Schema(UnknownSignalType)` if a signal type
    /// name in the request does not exist in the schema.
    pub fn export_signals(&self, request: ExportRequest) -> crate::Result<Vec<ExportedSignal>> {
        // Intended implementation order:
        // 1. Resolve `request.signal_types` to u8 IDs before touching the WAL,
        //    so an unknown name is reported even when the WAL is empty.
        // 2. List WAL segment files and read each with `reader::read_segment()`.
        // 3. Keep events inside `[since, until)` that pass the type filter.
        // 4. Map type IDs back to names; stop once `limit` events are collected.
        let _ = &request;
        todo!("export_signals: scan WAL segments, filter, collect")
    }
}
3. Signal type resolution
The WAL stores signal types as u8 IDs. The export must resolve these back to names using the schema's signal type definitions:
// Build a lookup table: signal_type_id -> signal_type_name.
// The WAL stores type IDs as `u8`, so only the first 256 schema entries are
// addressable. `u8::try_from` skips anything beyond that. An `as u8` cast
// would instead wrap index 256 to 0 and overwrite the name recorded for ID 0.
// NOTE(review): assumes `resolve_signal_type` returns this same positional
// index; confirm against the schema implementation.
let type_names: HashMap<u8, String> = schema
    .signal_types()
    .iter()
    .enumerate()
    .filter_map(|(i, def)| u8::try_from(i).ok().map(|id| (id, def.name.clone())))
    .collect();
// Build a filter set if signal_types is non-empty. An empty set means
// "accept every type". Collecting into `Result` stops at the first unknown name.
let type_filter: HashSet<u8> = if request.signal_types.is_empty() {
    HashSet::new()
} else {
    request
        .signal_types
        .iter()
        .map(|name| {
            schema.resolve_signal_type(name).ok_or_else(|| {
                TidalError::Schema(SchemaError::UnknownSignalType(name.clone()))
            })
        })
        .collect::<Result<HashSet<u8>, _>>()?
};
4. Module wiring
In tidal/src/db/mod.rs, add:
// Crate-private module; its public types are exposed through the
// `pub use db::export::{...}` re-export in `tidal/src/lib.rs`.
pub(crate) mod export;
Re-export from tidal/src/lib.rs:
pub use db::export::{ExportFormat, ExportRequest, ExportedSignal};
Acceptance Criteria
- `ExportFormat` enum with `JsonLines` variant
- `ExportRequest` struct with `user_id`, `signal_types`, `since`, `until`, `format`, `limit` fields
- `ExportRequest::time_range()` and `ExportRequest::signals_in_range()` constructors
- `ExportedSignal` struct with `entity_id`, `signal_type`, `weight`, `timestamp_ns` fields
- `ExportedSignal::to_json_line()` produces valid JSON
- `db.export_signals(request) -> Result<Vec<ExportedSignal>>` scans WAL segments
- Time range filtering works correctly (`since` inclusive, `until` exclusive)
- Signal type filtering works correctly (empty means all types)
- `limit` parameter caps the number of returned events
- Unknown signal type names in `signal_types` return `SchemaError::UnknownSignalType`
- Export does not interfere with live WAL writes (read-only scan)
- Types re-exported from `lib.rs`
- `cargo clippy -D warnings` and `cargo fmt --check` pass
Test Strategy
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn exported_signal_json_line_format() {
        let sig = ExportedSignal {
            entity_id: 42,
            signal_type: "view".to_string(),
            weight: 1.0,
            timestamp_ns: 1_700_000_000_000_000_000,
        };
        let line = sig.to_json_line();
        assert!(line.starts_with('{'));
        assert!(line.ends_with('}'));
        // Documented as a single line with no trailing newline.
        assert!(!line.contains('\n'));
        assert!(line.contains("\"entity_id\":42"));
        assert!(line.contains("\"signal_type\":\"view\""));
        assert!(line.contains("\"weight\":1"));
        assert!(line.contains("\"timestamp_ns\":1700000000000000000"));
    }

    #[test]
    fn export_request_time_range_constructor() {
        let req = ExportRequest::time_range(100, 200);
        assert_eq!(req.since, Some(100));
        assert_eq!(req.until, Some(200));
        assert!(req.signal_types.is_empty());
        assert!(req.user_id.is_none());
        assert!(req.limit.is_none());
        assert_eq!(req.format, ExportFormat::JsonLines);
    }

    #[test]
    fn export_request_signals_in_range_constructor() {
        let req = ExportRequest::signals_in_range(vec!["view".into(), "like".into()], 100, 200);
        // Names are stored verbatim and in order.
        assert_eq!(req.signal_types, vec!["view".to_string(), "like".to_string()]);
        assert_eq!(req.since, Some(100));
        assert_eq!(req.until, Some(200));
        assert!(req.user_id.is_none());
        assert!(req.limit.is_none());
        assert_eq!(req.format, ExportFormat::JsonLines);
    }
}
Integration test:
#[test]
fn export_signals_returns_written_events() {
    let db = make_test_db_with_schema_and_wal();
    let start = Timestamp::now().as_nanos();
    let end = start + 10_000;
    let export = |req: ExportRequest| db.export_signals(req).unwrap();

    // Two "view" writes and one "like" write, spaced 1000 ns apart.
    db.signal("view", EntityId::new(1), 1.0, Timestamp::from_nanos(start)).unwrap();
    db.signal("like", EntityId::new(2), 1.0, Timestamp::from_nanos(start + 1000)).unwrap();
    db.signal("view", EntityId::new(3), 1.0, Timestamp::from_nanos(start + 2000)).unwrap();

    // No type filter: every write inside the window is returned.
    assert_eq!(export(ExportRequest::time_range(start, end)).len(), 3);

    // Type filter: only the two "view" writes survive.
    let views = export(ExportRequest::signals_in_range(vec!["view".into()], start, end));
    assert_eq!(views.len(), 2);
    assert!(views.iter().all(|s| s.signal_type == "view"));

    // Limit caps the result even though three events match.
    let capped = export(ExportRequest {
        limit: Some(1),
        ..ExportRequest::time_range(start, end)
    });
    assert_eq!(capped.len(), 1);
}
#[test]
fn export_signals_unknown_type_returns_error() {
    let db = make_test_db_with_schema_and_wal();
    // "nonexistent" is expected to be absent from the test schema; resolution
    // must fail even though the widest possible window is requested.
    let bogus = vec!["nonexistent".into()];
    let outcome = db.export_signals(ExportRequest::signals_in_range(bogus, 0, u64::MAX));
    assert!(outcome.is_err());
}
#[test]
fn export_signals_empty_wal_returns_empty() {
    let db = make_test_db_with_schema_and_wal();
    // Nothing has been written yet, so even the full u64 window is empty.
    let everything = ExportRequest::time_range(0, u64::MAX);
    let exported = db.export_signals(everything).unwrap();
    assert!(exported.is_empty());
}
#[test]
fn exported_signal_json_lines_output() {
    let db = make_test_db_with_schema_and_wal();
    let t1 = 1_700_000_000_000_000_000u64;
    db.signal("view", EntityId::new(42), 2.5, Timestamp::from_nanos(t1)).unwrap();
    // `until` is exclusive, so [t1, t1 + 1) matches exactly the one write at t1.
    let req = ExportRequest::time_range(t1, t1 + 1);
    let signals = db.export_signals(req).unwrap();
    // Check the count first so a missing event fails with a clear assertion
    // instead of an index-out-of-bounds panic on `signals[0]`.
    assert_eq!(signals.len(), 1);
    let line = signals[0].to_json_line();
    assert!(line.contains("\"entity_id\":42"));
    assert!(line.contains("\"signal_type\":\"view\""));
    assert!(line.contains("\"weight\":2.5"));
    assert!(line.contains("\"timestamp_ns\":1700000000000000000"));
}