use std::path::PathBuf;
use std::time::{Duration, Instant};

use crossbeam::channel::Receiver;

use super::dedup::DedupWindow;
use super::error::WalError;
use super::format::{self, EventRecord};
use super::segment::{self, SegmentWriter};

/// Commands sent from `WalHandle` to the writer thread.
pub enum WalCommand {
    /// Append a signal event. The reply channel receives the assigned
    /// sequence number (or an error) once the batch containing this
    /// event has been durably fsynced.
    ///
    /// Events rejected by the dedup window are answered with the sentinel
    /// sequence number `0` and are not written.
    Append {
        /// The event to persist.
        event: EventRecord,
        /// Where the writer reports the assigned sequence number or the failure.
        reply: crossbeam::channel::Sender<Result<u64, WalError>>,
    },
    /// Delete segments whose first sequence number is less than `before_seq`.
    /// Runs inside the writer thread to avoid racing with concurrent writes.
    TruncateBefore {
        /// Exclusive upper bound handed to `segment::delete_segments_before`.
        before_seq: u64,
        /// Where the writer reports whether the deletion succeeded.
        // NOTE(review): error type assumed to be `WalError` (the result of
        // `segment::delete_segments_before` is forwarded as-is) — confirm.
        reply: crossbeam::channel::Sender<Result<(), WalError>>,
    },
    /// Graceful shutdown: flush remaining events and exit.
    Shutdown,
}

/// Configuration for the group commit writer.
#[derive(Debug, Clone)]
pub struct WriterConfig {
    /// Directory in which segments are looked up when truncating.
    pub dir: PathBuf,
    /// Segment size limit. Not read by the writer loop itself; presumably
    /// consumed when the `SegmentWriter` is opened — confirm against the caller.
    pub segment_size: u64,
    /// Maximum number of append commands gathered into one batch.
    pub batch_size: usize,
    /// How long the writer keeps gathering commands after the first one of a
    /// batch has arrived.
    pub batch_timeout: Duration,
    /// Deduplication window. Not read by the writer loop itself; presumably
    /// used to construct the `DedupWindow` — confirm against the caller.
    pub dedup_window: Duration,
}

/// The group commit writer loop.
///
/// Runs on a dedicated thread. Receives events via crossbeam channel,
/// accumulates them into batches, writes batches to the WAL segment,
/// and fsyncs once per batch. Callers are notified of their sequence
/// numbers via per-event reply channels.
///
/// # Batch formation
///
/// 1. Block until the first event arrives.
/// 2. Drain additional events from the channel up to `batch_size` or
///    until `batch_timeout` elapses (whichever comes first).
/// 3. Deduplicate events, encode the batch, write to segment, fsync.
/// 4. Send sequence numbers back to all waiting callers.
///
/// # Errors
///
/// Returns `WalError::Io` on filesystem failure during batch writes or fsync.
/// Returns `WalError::Corruption` if batch encoding fails (should not happen
/// under normal operation).
///
/// # Panics
///
/// Panics if the system clock is before the Unix epoch (same as `Timestamp::now()`).
// The function exceeds 100 lines due to the shutdown-drain path (B-3 fix). // Extracting a helper would require restructuring the module, which is outside // the scope of these targeted fixes. #[allow(clippy::too_many_lines)] pub fn run_writer( rx: &Receiver, config: &WriterConfig, mut segment: SegmentWriter, start_seq: u64, mut dedup: DedupWindow, ) -> Result<(), WalError> { let mut next_seq = start_seq; let mut batch: Vec<( EventRecord, crossbeam::channel::Sender>, )> = Vec::with_capacity(config.batch_size); let mut shutdown_requested = false; loop { // Block until the first event arrives (or shutdown/disconnect) match rx.recv() { Ok(WalCommand::Append { event, reply }) => { batch.push((event, reply)); } Ok(WalCommand::TruncateBefore { before_seq, reply }) => { let result = segment::delete_segments_before(&config.dir, before_seq); let _ = reply.send(result.map(|_| ())); continue; } Ok(WalCommand::Shutdown) | Err(_) => { break; } } // Drain up to batch_size with deadline let deadline = Instant::now() + config.batch_timeout; while batch.len() < config.batch_size { match rx.recv_deadline(deadline) { Ok(WalCommand::Append { event, reply }) => { batch.push((event, reply)); } Ok(WalCommand::TruncateBefore { before_seq, reply }) => { let result = segment::delete_segments_before(&config.dir, before_seq); let _ = reply.send(result.map(|_| ())); // Continue draining the batch; truncation is a side-effect, // not a batch-terminating event. } Ok(WalCommand::Shutdown) | Err(crossbeam::channel::RecvTimeoutError::Disconnected) => { shutdown_requested = true; break; } Err(crossbeam::channel::RecvTimeoutError::Timeout) => break, } } // Deduplicate and separate into kept events and duplicate replies let mut kept_events: Vec = Vec::with_capacity(batch.len()); let mut kept_replies: Vec>> = Vec::with_capacity(batch.len()); let mut dup_replies: Vec>> = Vec::new(); // drain(..) is intentional: we reuse batch's heap allocation across loop iterations. 
#[allow(clippy::iter_with_drain)] for (event, reply) in batch.drain(..) { if dedup.is_duplicate(&event) { dup_replies.push(reply); } else { kept_events.push(event); kept_replies.push(reply); } } // Notify duplicate senders with seq=0 (sentinel for dedup). for reply in dup_replies { let _ = reply.send(Ok(0)); } // Write the batch if there are any non-duplicate events if !kept_events.is_empty() { let batch_seq = next_seq; let batch_ts = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .expect("system clock is before Unix epoch") .as_nanos(); #[allow(clippy::cast_possible_truncation)] let batch_ts_u64 = batch_ts as u64; // Wrap the write path in a closure so we can notify callers of // the specific error before propagating it. Without this, an // early `?` return would drop pending reply channels, leaving // callers blocked forever (or receiving a generic Closed error // instead of the real I/O error). let write_result = (|| -> Result { let encoded = format::encode_batch(&kept_events, batch_seq, batch_ts_u64)?; if segment.needs_rotation() { segment.rotate(batch_seq)?; } segment.write_batch_bytes(&encoded)?; segment.sync()?; Ok(batch_seq) })(); match write_result { Ok(_) => { let event_count = kept_events.len() as u64; segment.set_last_seq(batch_seq + event_count - 1); for (i, reply) in kept_replies.into_iter().enumerate() { let _ = reply.send(Ok(batch_seq + i as u64)); } next_seq = batch_seq + event_count; } Err(ref err) => { // Notify all waiting callers with the actual error before // propagating. We cannot clone WalError, so we send a // synthetic I/O error with the same description. let err_msg = err.to_string(); for reply in kept_replies { let _ = reply.send(Err(WalError::Io(std::io::Error::other(err_msg.clone())))); } // write_result is known to be Err here; the Ok branch is // handled above, so this else-branch is unreachable. 
return Err(write_result .expect_err("write_result is Err in this branch; Ok is handled above")); } } } if shutdown_requested { break; } } // Drain any remaining commands that arrived before senders observed // the shutdown. This ensures in-flight append() calls are not silently // dropped, which would cause callers to block forever or receive // WalError::Closed instead of a real sequence number. let mut final_batch: Vec<( EventRecord, crossbeam::channel::Sender>, )> = Vec::new(); loop { match rx.try_recv() { Ok(WalCommand::Append { event, reply }) => { final_batch.push((event, reply)); } Ok(WalCommand::TruncateBefore { before_seq, reply }) => { let result = segment::delete_segments_before(&config.dir, before_seq); let _ = reply.send(result.map(|_| ())); } Ok(WalCommand::Shutdown) => { // Ignore duplicate shutdown commands } Err( crossbeam::channel::TryRecvError::Empty | crossbeam::channel::TryRecvError::Disconnected, ) => break, } } // Flush the final drain batch if non-empty if !final_batch.is_empty() { let mut kept_events: Vec = Vec::with_capacity(final_batch.len()); let mut kept_replies: Vec>> = Vec::with_capacity(final_batch.len()); let mut dup_replies: Vec>> = Vec::new(); for (event, reply) in final_batch { if dedup.is_duplicate(&event) { dup_replies.push(reply); } else { kept_events.push(event); kept_replies.push(reply); } } for reply in dup_replies { let _ = reply.send(Ok(0)); } if !kept_events.is_empty() { let batch_seq = next_seq; let batch_ts = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .expect("system clock is before Unix epoch") .as_nanos(); #[allow(clippy::cast_possible_truncation)] let batch_ts_u64 = batch_ts as u64; let encoded = format::encode_batch(&kept_events, batch_seq, batch_ts_u64)?; if segment.needs_rotation() { segment.rotate(batch_seq)?; } segment.write_batch_bytes(&encoded)?; segment.sync()?; let event_count = kept_events.len() as u64; segment.set_last_seq(batch_seq + event_count - 1); for (i, reply) in 
kept_replies.into_iter().enumerate() { let _ = reply.send(Ok(batch_seq + i as u64)); } } } // Final sync before exit segment.sync()?; Ok(()) } #[cfg(test)] mod tests { use super::*; use crossbeam::channel::bounded; fn make_event(id: u64) -> EventRecord { EventRecord { entity_id: id, signal_type: 1, weight: 1.0, timestamp_nanos: 1_000_000_000, } } #[test] fn writer_processes_single_event() { let dir = tempfile::tempdir().expect("tempdir creation should succeed"); let (tx, rx) = bounded(100); let segment = SegmentWriter::open(dir.path(), 1, 16 * 1024 * 1024).expect("open should succeed"); let dedup = DedupWindow::new(Duration::from_secs(30)); let config = WriterConfig { dir: dir.path().to_path_buf(), segment_size: 16 * 1024 * 1024, batch_size: 100, batch_timeout: Duration::from_millis(10), dedup_window: Duration::from_secs(30), }; let (reply_tx, reply_rx) = bounded(1); tx.send(WalCommand::Append { event: make_event(42), reply: reply_tx, }) .expect("send should succeed"); tx.send(WalCommand::Shutdown).expect("send should succeed"); let handle = std::thread::spawn(move || run_writer(&rx, &config, segment, 1, dedup)); let seq = reply_rx .recv() .expect("should receive reply") .expect("should be ok"); assert_eq!(seq, 1); handle .join() .expect("thread should join") .expect("writer should succeed"); } #[test] fn writer_deduplicates_events() { let dir = tempfile::tempdir().expect("tempdir creation should succeed"); let (tx, rx) = bounded(100); let segment = SegmentWriter::open(dir.path(), 1, 16 * 1024 * 1024).expect("open should succeed"); let dedup = DedupWindow::new(Duration::from_secs(30)); let config = WriterConfig { dir: dir.path().to_path_buf(), segment_size: 16 * 1024 * 1024, batch_size: 100, batch_timeout: Duration::from_millis(10), dedup_window: Duration::from_secs(30), }; let event = make_event(42); let (reply_tx1, reply_rx1) = bounded(1); let (reply_tx2, reply_rx2) = bounded(1); tx.send(WalCommand::Append { event: event.clone(), reply: reply_tx1, }) 
.expect("send should succeed"); tx.send(WalCommand::Append { event, reply: reply_tx2, }) .expect("send should succeed"); tx.send(WalCommand::Shutdown).expect("send should succeed"); let handle = std::thread::spawn(move || run_writer(&rx, &config, segment, 1, dedup)); let seq1 = reply_rx1 .recv() .expect("should receive") .expect("should be ok"); let seq2 = reply_rx2 .recv() .expect("should receive") .expect("should be ok"); assert_eq!(seq1, 1); assert_eq!(seq2, 0); // deduplicated handle .join() .expect("thread should join") .expect("writer should succeed"); } #[test] fn writer_handles_channel_disconnect() { let dir = tempfile::tempdir().expect("tempdir creation should succeed"); let (tx, rx) = bounded(100); let segment = SegmentWriter::open(dir.path(), 1, 16 * 1024 * 1024).expect("open should succeed"); let dedup = DedupWindow::new(Duration::from_secs(30)); let config = WriterConfig { dir: dir.path().to_path_buf(), segment_size: 16 * 1024 * 1024, batch_size: 100, batch_timeout: Duration::from_millis(10), dedup_window: Duration::from_secs(30), }; drop(tx); // Disconnect immediately let result = run_writer(&rx, &config, segment, 1, dedup); assert!(result.is_ok()); } #[test] fn writer_assigns_monotonic_sequences() { let dir = tempfile::tempdir().expect("tempdir creation should succeed"); let (tx, rx) = bounded(100); let segment = SegmentWriter::open(dir.path(), 1, 16 * 1024 * 1024).expect("open should succeed"); let dedup = DedupWindow::new(Duration::from_secs(30)); let config = WriterConfig { dir: dir.path().to_path_buf(), segment_size: 16 * 1024 * 1024, batch_size: 100, batch_timeout: Duration::from_millis(10), dedup_window: Duration::from_secs(30), }; let mut reply_rxs = Vec::new(); for i in 0..5 { let (reply_tx, reply_rx) = bounded(1); tx.send(WalCommand::Append { event: make_event(i), reply: reply_tx, }) .expect("send should succeed"); reply_rxs.push(reply_rx); } tx.send(WalCommand::Shutdown).expect("send should succeed"); let handle = std::thread::spawn(move || 
run_writer(&rx, &config, segment, 1, dedup)); let mut seqs = Vec::new(); for reply_rx in reply_rxs { let seq = reply_rx .recv() .expect("should receive") .expect("should be ok"); seqs.push(seq); } // Verify monotonically increasing for window in seqs.windows(2) { assert!(window[0] < window[1], "seqs not monotonic: {seqs:?}"); } assert_eq!(seqs[0], 1); handle .join() .expect("thread should join") .expect("writer should succeed"); } }