# Task 03: WalShipper ## Delivers `WalShipper` background task in `tidal/src/replication/shipper.rs`. Watches for newly sealed WAL segments in the data directory, ships them to all registered follower shards via `Transport`, and tracks the last-shipped seqno per follower. ## Complexity: M ## Dependencies - Task 01 (Transport trait) - Task 02 (InProcessTransport, needed for tests) ## Technical Design ```rust // tidal/src/replication/shipper.rs /// Polls for newly sealed WAL segments and ships them to followers. /// /// Runs as a background tokio task. Exits when `shutdown_rx` receives. /// Ships to all registered followers in parallel (join_all). pub struct WalShipper { transport: Arc, followers: Vec, data_dir: PathBuf, shard_id: ShardId, poll_interval: Duration, last_shipped: AtomicU64, } impl WalShipper { pub fn new( transport: Arc, followers: Vec, data_dir: PathBuf, shard_id: ShardId, ) -> Self { Self { transport, followers, data_dir, shard_id, poll_interval: Duration::from_secs(2), last_shipped: AtomicU64::new(0), } } /// Start the shipper as a background task. /// /// Returns a handle that can be used to signal shutdown. pub fn start(self: Arc, shutdown_rx: tokio::sync::watch::Receiver) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { self.run(shutdown_rx).await; }) } async fn run(&self, mut shutdown: tokio::sync::watch::Receiver) { let mut interval = tokio::time::interval(self.poll_interval); loop { tokio::select! { _ = interval.tick() => { if let Err(e) = self.ship_pending_segments().await { tracing::warn!("WalShipper: error shipping segments: {e}"); } } Ok(_) = shutdown.changed() => { if *shutdown.borrow() { // Final ship before shutdown let _ = self.ship_pending_segments().await; break; } } } } } async fn ship_pending_segments(&self) -> Result<(), WalError> { let last = self.last_shipped.load(Ordering::Acquire); let segments = list_sealed_segments_since(&self.data_dir, self.shard_id, last)?; for (seqno, path) in segments { let bytes = tokio::fs::read(&path).await?; let payload = WalSegmentPayload { id: WalSegmentId::new( RegionId::SINGLE, // will be populated from NodeConfig in Phase 8.5 self.shard_id, seqno, ), bytes: bytes::Bytes::from(bytes), event_count: 0, // filled from BatchHeader decode }; // Ship to all followers in parallel. let futs: Vec<_> = self.followers.iter() .map(|&follower| { let transport = Arc::clone(&self.transport); let payload = payload.clone(); async move { transport.send_segment(follower, payload).await } }) .collect(); futures::future::join_all(futs).await; self.last_shipped.store(seqno, Ordering::Release); } Ok(()) } } ``` ## Acceptance Criteria - [ ] `WalShipper::start()` spawns a background tokio task - [ ] Shipper polls `data_dir` for sealed segments with seqno > `last_shipped` - [ ] Segments are shipped to all followers in parallel via `Transport::send_segment` - [ ] `last_shipped` is updated after each segment is shipped to all followers - [ ] Shutdown signal causes the shipper to flush pending segments then exit - [ ] Shipper handles transport errors gracefully (logs warning, does not crash) - [ ] Integration test: leader with 10 segments -> shipper delivers all 10 to follower transport - [ ] `cargo clippy -D warnings` and `cargo fmt` pass