tidaldb/docs/planning/milestone-8/phase-2/task-02-in-process-transport.md
jordan f4cfd6c81f feat: complete M8 replication primitives + forage enhancements + docs
Milestone 8 (phases 1-4):
- Shard-aware WAL segment naming, BatchHeader v2, ShardRouter
- Transport trait, InProcessTransport, WalShipper, FollowerDb
- HLC, PNCounter, LWWRegister, CrdtSignalState, ReconciliationEngine
- Session replication bridge with SeqNo/HWM, idempotency store

Forage application:
- Multi-source discovery engine with MAB exploration
- Embedding-based label system, server handlers, UI refresh

Other:
- QUICKSTART.md, README.md, milestone-8 planning docs
- Hard negative union semantics, RLHF export enhancements
- Recovery benchmark and visibility test expansions
- Split 8 oversized source files per CODING_GUIDELINES §9

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 13:17:19 -07:00


Task 02: InProcessTransport

Delivers

Implement InProcessTransport in tidal/src/replication/in_process.rs using bounded tokio::sync::mpsc Sender/Receiver pairs, with one channel per (leader, follower) pair. It is used exclusively in tests and never in production code.
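The design doc comment gives each shard a sender map and a single receiver. When each follower has exactly one leader, giving each shard one inbox works out to one channel per (leader, follower) pair.

The sketch below models that topology with std only. It assumes `std::sync::mpsc::sync_channel` as a synchronous stand-in for tokio's bounded `mpsc::channel`. `Mesh`, the `u32` shard ids, and the `u64` payloads are hypothetical placeholders, not tidal's API.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

/// Hypothetical model of the factory (not tidal's API): one bounded inbox per
/// shard plus a per-shard route table holding clones of peers' inbox senders.
pub struct Mesh {
    capacity: usize,
    inboxes: HashMap<u32, SyncSender<u64>>,
    routes: HashMap<u32, HashMap<u32, SyncSender<u64>>>,
}

impl Mesh {
    pub fn new(capacity: usize) -> Self {
        Mesh {
            capacity,
            inboxes: HashMap::new(),
            routes: HashMap::new(),
        }
    }

    /// Create a shard's single inbox and return its receiving half.
    pub fn create(&mut self, shard: u32) -> Receiver<u64> {
        let (tx, rx) = sync_channel(self.capacity);
        self.inboxes.insert(shard, tx);
        self.routes.entry(shard).or_default();
        rx
    }

    /// One-way edge: `from` gets a clone of `to`'s inbox sender.
    /// Panics if `to` was never created.
    pub fn connect(&mut self, from: u32, to: u32) {
        let tx = self.inboxes[&to].clone();
        self.routes.entry(from).or_default().insert(to, tx);
    }

    /// Send along an existing edge; the two errors model UnknownPeer and Closed.
    pub fn send(&self, from: u32, to: u32, payload: u64) -> Result<(), &'static str> {
        let tx = self
            .routes
            .get(&from)
            .and_then(|routes| routes.get(&to))
            .ok_or("unknown peer")?;
        tx.send(payload).map_err(|_| "closed")
    }

    /// Channels in the mesh: one per created shard, however many edges exist.
    pub fn channel_count(&self) -> usize {
        self.inboxes.len()
    }
}
```

A leader wired to two followers ends up with three channels: one inbox each for shards 0, 1, and 2. A send in the unwired direction (follower to leader) fails as an unknown peer.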

Complexity: S

Dependencies

  • Task 01 (Transport trait, WalSegmentPayload)
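The Technical Design below relies on only a few properties of the Task 01 types. The sketch below lists them std-only and is an assumption, not Task 01's actual definitions:

- `ShardId` must be `Copy + Eq + Hash`, because it is a `HashMap` key and is reused after being passed by value.
- `TransportError` needs `UnknownPeer(ShardId)` and `Closed` variants.
- The `u32` inner type, the `Display` messages, and the hypothetical `route` helper are illustrative only.

```rust
use std::collections::HashMap;
use std::fmt;

/// Assumed shape of Task 01's shard id (the `u32` inner type is a guess).
/// Copy + Eq + Hash: it is a HashMap key and is reused after being moved.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ShardId(pub u32);

/// Assumed subset of Task 01's transport error type.
#[derive(Debug, PartialEq, Eq)]
pub enum TransportError {
    /// No route from the local shard to this peer has been wired.
    UnknownPeer(ShardId),
    /// The peer's receiving half has been dropped.
    Closed,
}

impl fmt::Display for TransportError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            TransportError::UnknownPeer(id) => write!(f, "unknown peer shard {}", id.0),
            TransportError::Closed => write!(f, "transport channel closed"),
        }
    }
}

impl std::error::Error for TransportError {}

/// Hypothetical helper: look up a route, mapping a missing entry to
/// UnknownPeer the same way send_segment does.
pub fn route<T>(routes: &HashMap<ShardId, T>, to: ShardId) -> Result<&T, TransportError> {
    routes.get(&to).ok_or(TransportError::UnknownPeer(to))
}
```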

Technical Design

// tidal/src/replication/in_process.rs

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;

use crate::replication::transport::{Transport, TransportError, WalSegmentPayload};
use crate::replication::ShardId;

/// Bounded channel capacity for in-process segment delivery.
const DEFAULT_CHANNEL_CAPACITY: usize = 256;

/// Outgoing routes of one shard: destination shard -> that shard's inbox Sender.
type PeerMap = HashMap<ShardId, mpsc::Sender<WalSegmentPayload>>;

/// In-process WAL segment transport for testing.
///
/// Each shard owns exactly one bounded mpsc channel (its inbox) and a map of
/// outgoing routes. `connect(from, to)` clones `to`'s inbox Sender into
/// `from`'s routes, so a topology in which each follower has a single leader
/// has exactly one channel per (leader, follower) pair.
///
/// The factory retains one Sender per inbox so `connect` can be called at any
/// time; a shard's `recv_segment` therefore returns `None` only after the
/// factory and every transport connected to that shard have been dropped.
///
/// Usage:
/// ```rust,ignore
/// let factory = InProcessTransportFactory::new();
/// let leader_transport = factory.create(ShardId(0));
/// let follower_transport = factory.create(ShardId(1));
/// factory.connect(ShardId(0), ShardId(1)); // leader can send to follower
/// ```
pub struct InProcessTransportFactory {
    /// Inbox Sender of every created shard, cloned into routes by `connect`.
    inboxes: Mutex<HashMap<ShardId, mpsc::Sender<WalSegmentPayload>>>,
    /// Outgoing route table of every created shard, shared with its transport.
    routes: Mutex<HashMap<ShardId, Arc<Mutex<PeerMap>>>>,
    capacity: usize,
}

// Required by clippy::new_without_default under `-D warnings`.
impl Default for InProcessTransportFactory {
    fn default() -> Self {
        Self::new()
    }
}

impl InProcessTransportFactory {
    pub fn new() -> Self {
        Self {
            inboxes: Mutex::new(HashMap::new()),
            routes: Mutex::new(HashMap::new()),
            capacity: DEFAULT_CHANNEL_CAPACITY,
        }
    }

    pub fn with_capacity(mut self, capacity: usize) -> Self {
        self.capacity = capacity;
        self
    }

    /// Create a transport endpoint for `shard_id`. The endpoint owns the
    /// receiving half of the shard's inbox; the factory keeps the Sender.
    pub fn create(&self, shard_id: ShardId) -> Arc<InProcessTransport> {
        let (tx, rx) = mpsc::channel(self.capacity);
        let peers = Arc::new(Mutex::new(PeerMap::new()));
        self.inboxes.lock().unwrap().insert(shard_id, tx);
        self.routes
            .lock()
            .unwrap()
            .insert(shard_id, Arc::clone(&peers));

        Arc::new(InProcessTransport {
            local: shard_id,
            peers,
            receiver: tokio::sync::Mutex::new(rx),
        })
    }

    /// Wire a one-way connection: `from` can send to `to`.
    ///
    /// Panics if either shard was not created first (acceptable for a test-only helper).
    pub fn connect(&self, from: ShardId, to: ShardId) {
        let tx = self
            .inboxes
            .lock()
            .unwrap()
            .get(&to)
            .cloned()
            .expect("connect: destination shard was never created");
        let peers = self
            .routes
            .lock()
            .unwrap()
            .get(&from)
            .cloned()
            .expect("connect: source shard was never created");
        peers.lock().unwrap().insert(to, tx);
    }
}

pub struct InProcessTransport {
    local: ShardId,
    /// This shard's outgoing routes only; holding no Sender for its own inbox
    /// lets that inbox close once its peers and the factory are gone.
    peers: Arc<Mutex<PeerMap>>,
    /// tokio's Mutex, because the guard is held across `recv().await`;
    /// a std MutexGuard is !Send and would make the future !Send.
    receiver: tokio::sync::Mutex<mpsc::Receiver<WalSegmentPayload>>,
}

#[async_trait::async_trait]
impl Transport for InProcessTransport {
    async fn send_segment(
        &self,
        to_shard: ShardId,
        payload: WalSegmentPayload,
    ) -> Result<(), TransportError> {
        // Clone the Sender out and release the std lock before awaiting.
        let sender = {
            let peers = self.peers.lock().unwrap();
            peers
                .get(&to_shard)
                .cloned()
                .ok_or(TransportError::UnknownPeer(to_shard))?
        };
        // Waits while the destination inbox is full (backpressure).
        sender
            .send(payload)
            .await
            .map_err(|_| TransportError::Closed)
    }

    async fn recv_segment(&self) -> Option<WalSegmentPayload> {
        // None once every Sender for this inbox has been dropped.
        self.receiver.lock().await.recv().await
    }

    fn local_shard(&self) -> ShardId {
        self.local
    }
}
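Two behaviors the transport inherits from its bounded channel are easy to check in isolation:

- A full inbox applies backpressure. tokio's `send` waits; a non-blocking probe reports the channel as full.
- The receiver sees end-of-stream only after every clone of the inbox sender has been dropped.

The sketch below is std-only. It assumes `std::sync::mpsc::sync_channel` as a stand-in for tokio's bounded channel. `try_send` returning `TrySendError::Full` plays the role of a full inbox, and `try_recv` returning `Disconnected` plays the role of `recv_segment` returning `None`. `Probe`, `probe_backpressure`, and `close_sequence` are hypothetical names.

```rust
use std::sync::mpsc::{sync_channel, Receiver, TryRecvError, TrySendError};

/// What a non-blocking send into a bounded inbox observed.
#[derive(Debug, PartialEq, Eq)]
pub enum Probe {
    Accepted,
    Full,
    Closed,
}

/// Attempt `attempts` non-blocking sends into a fresh inbox of `capacity`
/// slots (use capacity >= 1; 0 makes a rendezvous channel) and return the
/// outcomes plus the receiver, which still holds the buffered items.
pub fn probe_backpressure(capacity: usize, attempts: u64) -> (Vec<Probe>, Receiver<u64>) {
    let (tx, rx) = sync_channel(capacity);
    let outcomes = (0..attempts)
        .map(|i| match tx.try_send(i) {
            Ok(()) => Probe::Accepted,
            Err(TrySendError::Full(_)) => Probe::Full,
            Err(TrySendError::Disconnected(_)) => Probe::Closed,
        })
        .collect();
    // `tx` is dropped here, so draining `rx` ends after the buffered items.
    (outcomes, rx)
}

/// Two sender clones (a retained handle and one peer's route entry) share an
/// inbox; the receiver reports disconnection only after both are dropped.
pub fn close_sequence() -> Vec<Result<u64, TryRecvError>> {
    let (retained, rx) = sync_channel::<u64>(8);
    let peer_route = retained.clone();
    let mut seen = Vec::new();
    peer_route.send(7).unwrap();
    drop(peer_route); // one clone gone
    seen.push(rx.try_recv()); // Ok(7): buffered item still delivered
    seen.push(rx.try_recv()); // Err(Empty): `retained` keeps the channel open
    drop(retained); // last clone gone
    seen.push(rx.try_recv()); // Err(Disconnected): end-of-stream
    seen
}
```

This second behavior is why any Sender kept around for later wiring must also be dropped before a test can observe `recv_segment` returning `None`.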

Acceptance Criteria

  • InProcessTransportFactory::create(shard_id) returns a transport endpoint for that shard
  • send_segment delivers the payload to the receiver's channel
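  • recv_segment returns None once every Sender clone for the shard's channel has been dropped (channel closed)
  • send_segment to a peer that was never connected returns TransportError::UnknownPeer(to_shard)
  • Concurrent send_segment calls from multiple tasks are safe: each call clones its Sender out of the routing table, and mpsc supports multiple producers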
  • Unit test: send 100 segments from one transport, receive all 100 on another
  • cargo clippy -- -D warnings and cargo fmt --check pass
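The 100-segment and concurrent-send criteria can be modeled with threads before the async test exists. The sketch below is std-only and assumes the real test will use `#[tokio::test]` with tokio tasks instead.

- Several producers share clones of one bounded inbox sender.
- The original sender is dropped so the channel closes when the producers finish.
- The receiver drains until end-of-stream.
- Sequence numbers stand in for `WalSegmentPayload`, and `deliver_concurrently` is a hypothetical name.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Send `total` numbered payloads from `producers` threads (must be > 0) into
/// one bounded inbox of `capacity`, then return everything the receiver saw.
pub fn deliver_concurrently(total: u64, producers: u64, capacity: usize) -> Vec<u64> {
    let (tx, rx) = sync_channel::<u64>(capacity);
    let handles: Vec<_> = (0..producers)
        .map(|p| {
            let tx = tx.clone();
            thread::spawn(move || {
                // Producer p sends every payload i with i % producers == p.
                for i in (p..total).step_by(producers as usize) {
                    tx.send(i).expect("receiver alive");
                }
            })
        })
        .collect();
    // Drop the original sender so the channel closes when producers finish.
    drop(tx);
    // Drain while producers run; capacity < total exercises backpressure.
    let received: Vec<u64> = rx.iter().collect();
    for h in handles {
        h.join().expect("producer panicked");
    }
    received
}
```

Arrival order interleaves across producers. The check is therefore that all 100 payloads arrive exactly once, after sorting.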