Milestone 8 (phases 1-4): - Shard-aware WAL segment naming, BatchHeader v2, ShardRouter - Transport trait, InProcessTransport, WalShipper, FollowerDb - HLC, PNCounter, LWWRegister, CrdtSignalState, ReconciliationEngine - Session replication bridge with SeqNo/HWM, idempotency store Forage application: - Multi-source discovery engine with MAB exploration - Embedding-based label system, server handlers, UI refresh Other: - QUICKSTART.md, README.md, milestone-8 planning docs - Hard negative union semantics, RLHF export enhancements - Recovery benchmark and visibility test expansions - Split 8 oversized source files per CODING_GUIDELINES §9 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
4.4 KiB
4.4 KiB
Task 02: InProcessTransport
Delivers
Implement InProcessTransport using tokio::sync::mpsc::Sender/Receiver pairs in tidal/src/replication/in_process.rs. One channel per (leader, follower) pair. Used exclusively in tests -- never in production code.
Complexity: S
Dependencies
- Task 01 (Transport trait, WalSegmentPayload)
Technical Design
// tidal/src/replication/in_process.rs
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
use crate::replication::transport::{Transport, TransportError, WalSegmentPayload};
use crate::replication::ShardId;
/// Bounded channel capacity for in-process segment delivery.
const DEFAULT_CHANNEL_CAPACITY: usize = 256;
/// In-process WAL segment transport for testing.
///
/// Creates a mesh of mpsc channels between shards. Each shard has
/// a sender map (shard -> Sender) and a single receiver.
///
/// Usage:
/// ```rust
/// let factory = InProcessTransportFactory::new();
/// let leader_transport = factory.create(ShardId(0));
/// let follower_transport = factory.create(ShardId(1));
/// factory.connect(ShardId(0), ShardId(1)); // leader can send to follower
/// ```
pub struct InProcessTransportFactory {
senders: Arc<Mutex<HashMap<ShardId, HashMap<ShardId, mpsc::Sender<WalSegmentPayload>>>>>,
receivers: Arc<Mutex<HashMap<ShardId, mpsc::Receiver<WalSegmentPayload>>>>,
capacity: usize,
}
impl InProcessTransportFactory {
pub fn new() -> Self {
Self {
senders: Arc::new(Mutex::new(HashMap::new())),
receivers: Arc::new(Mutex::new(HashMap::new())),
capacity: DEFAULT_CHANNEL_CAPACITY,
}
}
pub fn with_capacity(mut self, capacity: usize) -> Self {
self.capacity = capacity;
self
}
/// Create a transport endpoint for `shard_id`.
pub fn create(&self, shard_id: ShardId) -> Arc<InProcessTransport> {
let (tx, rx) = mpsc::channel(self.capacity);
let mut senders = self.senders.lock().unwrap();
let mut receivers = self.receivers.lock().unwrap();
senders.entry(shard_id).or_default();
receivers.insert(shard_id, rx);
Arc::new(InProcessTransport {
local: shard_id,
senders: Arc::clone(&self.senders),
receiver: Mutex::new(Some(rx)),
})
}
/// Wire a one-way connection: `from` can send to `to`.
pub fn connect(&self, from: ShardId, to: ShardId) {
let (tx, rx) = mpsc::channel(self.capacity);
self.senders
.lock()
.unwrap()
.entry(from)
.or_default()
.insert(to, tx);
// Store the receiver in the `to` shard's transport.
// (Implementation detail: injects directly into the transport's receiver field)
}
}
pub struct InProcessTransport {
local: ShardId,
senders: Arc<Mutex<HashMap<ShardId, HashMap<ShardId, mpsc::Sender<WalSegmentPayload>>>>>,
receiver: Mutex<Option<mpsc::Receiver<WalSegmentPayload>>>,
}
#[async_trait::async_trait]
impl Transport for InProcessTransport {
async fn send_segment(
&self,
to_shard: ShardId,
payload: WalSegmentPayload,
) -> Result<(), TransportError> {
let sender = {
let senders = self.senders.lock().unwrap();
senders
.get(&self.local)
.and_then(|map| map.get(&to_shard))
.cloned()
.ok_or(TransportError::UnknownPeer(to_shard))?
};
sender
.send(payload)
.await
.map_err(|_| TransportError::Closed)
}
async fn recv_segment(&self) -> Option<WalSegmentPayload> {
let mut guard = self.receiver.lock().unwrap();
if let Some(rx) = guard.as_mut() {
rx.recv().await
} else {
None
}
}
fn local_shard(&self) -> ShardId {
self.local
}
}
Acceptance Criteria
InProcessTransportFactory::create(shard_id)returns a transport endpoint for that shardsend_segmentdelivers the payload to the receiver's channelrecv_segmentreturnsNonewhen all senders are dropped (channel closed)send_segmentto an unregistered peer returnsTransportError::UnknownPeer- Concurrent sends from multiple tasks are safe (mpsc semantics)
- Unit test: send 100 segments from one transport, receive all 100 on another
cargo clippy -D warningsandcargo fmtpass