diff --git a/tidal-server/Cargo.toml b/tidal-server/Cargo.toml new file mode 100644 index 0000000..e80dcbe --- /dev/null +++ b/tidal-server/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "tidal-server" +version = "0.1.0" +edition.workspace = true +rust-version.workspace = true +license.workspace = true + +[dependencies] +axum = "0.8" +clap = { version = "4.5", features = ["derive"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +serde_yaml = "0.9" +thiserror = "2" +tokio = { version = "1", features = ["macros", "rt-multi-thread", "signal"] } +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +tidaldb = { path = "../tidal", features = ["test-utils"] } diff --git a/tidal-server/config/default-cluster.yaml b/tidal-server/config/default-cluster.yaml new file mode 100644 index 0000000..af012fa --- /dev/null +++ b/tidal-server/config/default-cluster.yaml @@ -0,0 +1,5 @@ +regions: + - name: us-east + - name: eu-west + - name: ap-south +leader: us-east diff --git a/tidal-server/config/default-schema.yaml b/tidal-server/config/default-schema.yaml new file mode 100644 index 0000000..3ad7f83 --- /dev/null +++ b/tidal-server/config/default-schema.yaml @@ -0,0 +1,29 @@ +signals: + - name: view + entity: item + decay: + exponential: + half_life_seconds: 604800 # 7 days + windows: [one_hour, twenty_four_hours, seven_days] + velocity: true + - name: like + entity: item + decay: + exponential: + half_life_seconds: 1209600 # 14 days + windows: [twenty_four_hours, seven_days, thirty_days, all_time] + velocity: false + - name: skip + entity: item + decay: + permanent: true + velocity: false +text_fields: + - name: title + kind: text + - name: category + kind: keyword +embedding_slots: + - name: content_vector + entity: item + dimensions: 128 diff --git a/tidal-server/src/config.rs b/tidal-server/src/config.rs new file mode 100644 index 0000000..3f9b047 --- /dev/null +++ b/tidal-server/src/config.rs @@ -0,0 +1,226 @@ +use std::collections::HashSet; +use std::fs; +use std::path::Path; +use std::time::Duration; + +use serde::Deserialize; +use tidaldb::schema::{DecaySpec, EntityKind, Schema, SchemaBuilder, TextFieldType, Window}; + +use crate::error::{Result, ServerError}; + +const DEFAULT_SCHEMA_YAML: &str = include_str!("../config/default-schema.yaml"); +const DEFAULT_CLUSTER_YAML: &str = include_str!("../config/default-cluster.yaml"); + +#[derive(Debug)] +pub struct ClusterLayout { + pub regions: Vec, + pub leader: String, +} + +pub fn load_schema(path: Option<&Path>) -> Result { + let raw = read_config(path, DEFAULT_SCHEMA_YAML)?; + let spec: SchemaSpec = serde_yaml::from_str(&raw) + .map_err(|e| ServerError::SchemaConfig(format!("parse schema yaml: {e}")))?; + + if spec.signals.is_empty() { + return Err(ServerError::SchemaConfig( + "at least one signal must be defined".into(), + )); + } + + let mut builder = SchemaBuilder::new(); + for signal in spec.signals { + let mut sig = builder.signal( + &signal.name, + parse_entity_kind(&signal.entity)?, + signal.decay.to_decay_spec()?, + ); + if let Some(windows) = signal.windows { + let parsed: Result> = windows.iter().map(|w| parse_window(w)).collect(); + sig = sig.windows(&parsed?); + } + if let Some(velocity) = signal.velocity { + sig = sig.velocity(velocity); + } + let _ = sig.add(); + } + + if let Some(text_fields) = spec.text_fields { + for field in text_fields { + builder.text_field(&field.name, parse_text_field_type(&field.kind)?); + } + } + + if let Some(embeddings) = spec.embedding_slots { + for slot in embeddings { + builder.embedding_slot( + &slot.name, + parse_entity_kind(&slot.entity)?, + slot.dimensions, + ); + } + } + + builder.build().map_err(ServerError::SchemaBuild) +} + +pub fn load_cluster_layout(path: Option<&Path>) -> Result { + let raw = read_config(path, DEFAULT_CLUSTER_YAML)?; + let spec: ClusterSpec = serde_yaml::from_str(&raw) + .map_err(|e| ServerError::ClusterConfig(format!("parse cluster yaml: {e}")))?; + + if spec.regions.is_empty() { + return Err(ServerError::ClusterConfig( + "cluster config must include at least one region".into(), + )); + } + + let mut seen = HashSet::new(); + for region in &spec.regions { + if !seen.insert(region.name.to_lowercase()) { + return Err(ServerError::ClusterConfig(format!( + "duplicate region name: {}", + region.name + ))); + } + } + + if !seen.contains(&spec.leader.to_lowercase()) { + return Err(ServerError::ClusterConfig(format!( + "leader '{}' not found in region list", + spec.leader + ))); + } + + Ok(ClusterLayout { + regions: spec.regions.into_iter().map(|r| r.name).collect(), + leader: spec.leader, + }) +} + +fn read_config(path: Option<&Path>, fallback: &str) -> Result { + match path { + Some(p) => fs::read_to_string(p).map_err(|e| ServerError::io(p, e)), + None => Ok(fallback.to_string()), + } +} + +#[derive(Debug, Deserialize)] +struct SchemaSpec { + signals: Vec, + #[serde(default)] + text_fields: Option>, + #[serde(default)] + embedding_slots: Option>, +} + +#[derive(Debug, Deserialize)] +struct SignalSpec { + name: String, + entity: String, + decay: DecaySpecConfig, + #[serde(default)] + windows: Option>, + #[serde(default)] + velocity: Option, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "snake_case")] +struct DecaySpecConfig { + #[serde(default)] + exponential: Option, + #[serde(default)] + linear: Option, + #[serde(default)] + permanent: Option, +} + +impl DecaySpecConfig { + fn to_decay_spec(&self) -> Result { + if let Some(exp) = &self.exponential { + return Ok(DecaySpec::Exponential { + half_life: Duration::from_secs_f64(exp.half_life_seconds), + }); + } + if let Some(linear) = &self.linear { + return Ok(DecaySpec::Linear { + lifetime: Duration::from_secs_f64(linear.lifetime_seconds), + }); + } + if self.permanent.unwrap_or(false) { + return Ok(DecaySpec::Permanent); + } + Err(ServerError::SchemaConfig( + "decay must specify exponential, linear, or permanent".into(), + )) + } +} + +#[derive(Debug, Deserialize)] +struct ExponentialDecay { + half_life_seconds: f64, +} + +#[derive(Debug, Deserialize)] +struct LinearDecay { + lifetime_seconds: f64, +} + +#[derive(Debug, Deserialize)] +struct TextFieldSpec { + name: String, + kind: String, +} + +#[derive(Debug, Deserialize)] +struct EmbeddingSpec { + name: String, + entity: String, + dimensions: usize, +} + +#[derive(Debug, Deserialize)] +struct ClusterSpec { + regions: Vec, + leader: String, +} + +#[derive(Debug, Deserialize)] +struct RegionSpec { + name: String, +} + +fn parse_entity_kind(input: &str) -> Result { + match input.trim().to_lowercase().as_str() { + "item" | "items" => Ok(EntityKind::Item), + "user" | "users" => Ok(EntityKind::User), + "creator" | "creators" => Ok(EntityKind::Creator), + other => Err(ServerError::SchemaConfig(format!( + "unknown entity kind '{other}'" + ))), + } +} + +fn parse_window(input: &str) -> Result { + match input.trim().to_lowercase().as_str() { + "one_hour" | "1h" => Ok(Window::OneHour), + "twenty_four_hours" | "24h" => Ok(Window::TwentyFourHours), + "seven_days" | "7d" => Ok(Window::SevenDays), + "thirty_days" | "30d" => Ok(Window::ThirtyDays), + "all_time" | "alltime" => Ok(Window::AllTime), + other => Err(ServerError::SchemaConfig(format!( + "unknown window '{other}'" + ))), + } +} + +fn parse_text_field_type(input: &str) -> Result { + match input.trim().to_lowercase().as_str() { + "text" => Ok(TextFieldType::Text), + "keyword" => Ok(TextFieldType::Keyword), + other => Err(ServerError::SchemaConfig(format!( + "unknown text field type '{other}'" + ))), + } +} diff --git a/tidal-server/src/error.rs b/tidal-server/src/error.rs new file mode 100644 index 0000000..0c51d1a --- /dev/null +++ b/tidal-server/src/error.rs @@ -0,0 +1,35 @@ +use std::path::PathBuf; + +use thiserror::Error; + +pub type Result = std::result::Result; + +#[derive(Debug, Error)] +pub enum ServerError { + #[error("failed to read {path}: {source}")] + Io { + path: PathBuf, + source: std::io::Error, + }, + #[error("invalid schema config: {0}")] + SchemaConfig(String), + #[error("schema build failed: {0}")] + SchemaBuild(#[from] tidaldb::schema::SchemaError), + #[error("invalid cluster config: {0}")] + ClusterConfig(String), + #[error("tidalDB error: {0}")] + Tidal(#[from] tidaldb::TidalError), + #[error("http server error: {0}")] + Http(#[from] std::io::Error), + #[error("bad request: {0}")] + BadRequest(String), +} + +impl ServerError { + pub fn io(path: impl Into, source: std::io::Error) -> Self { + Self::Io { + path: path.into(), + source, + } + } +} diff --git a/tidal-server/src/main.rs b/tidal-server/src/main.rs new file mode 100644 index 0000000..d31908b --- /dev/null +++ b/tidal-server/src/main.rs @@ -0,0 +1,120 @@ +mod config; +mod error; +mod router; +mod state; + +use std::net::SocketAddr; +use std::path::PathBuf; +use std::sync::Arc; + +use clap::{Args, Parser, Subcommand}; +use tidaldb::TidalDb; + +use crate::config::{load_cluster_layout, load_schema}; +use crate::error::{Result, ServerError}; +use crate::router::build_router; +use crate::state::ServerState; + +#[derive(Parser)] +#[command(version, about = "HTTP wrapper for tidalDB")] +struct Cli { + #[command(subcommand)] + mode: Command, +} + +#[derive(Subcommand)] +enum Command { + #[command(about = "Run a single-node server wrapping one tidalDB instance")] + Standalone(StandaloneArgs), + #[command(about = "Run the simulated multi-region cluster server")] + Cluster(ClusterArgs), +} + +#[derive(Args)] +struct StandaloneArgs { + #[arg(long, default_value = "127.0.0.1:9400")] + listen: String, + #[arg(long)] + schema: Option, + #[arg(long)] + data_dir: Option, +} + +#[derive(Args)] +struct ClusterArgs { + #[arg(long, default_value = "0.0.0.0:9500")] + listen: String, + #[arg(long)] + schema: Option, + #[arg(long)] + topology: Option, +} + +#[tokio::main] +async fn main() { + if let Err(err) = run().await { + eprintln!("error: {err}"); + std::process::exit(1); + } +} + +async fn run() -> Result<()> { + let cli = Cli::parse(); + init_tracing(); + + match cli.mode { + Command::Standalone(args) => run_standalone(args).await, + Command::Cluster(args) => run_cluster(args).await, + } +} + +fn init_tracing() { + let env_filter = std::env::var("TIDAL_SERVER_LOG").unwrap_or_else(|_| "info".into()); + let _ = tracing_subscriber::fmt() + .with_env_filter(env_filter) + .try_init(); +} + +async fn run_standalone(args: StandaloneArgs) -> Result<()> { + let schema = load_schema(args.schema.as_deref())?; + + let mut builder = TidalDb::builder().with_schema(schema.clone()); + if let Some(dir) = args.data_dir { + builder = builder.with_data_dir(dir); + } else { + builder = builder.ephemeral(); + } + + let db = builder.open()?; + let state = ServerState::standalone(db); + serve(state, &args.listen).await +} + +async fn run_cluster(args: ClusterArgs) -> Result<()> { + let schema = load_schema(args.schema.as_deref())?; + let layout = load_cluster_layout(args.topology.as_deref())?; + let state = ServerState::cluster(schema, layout)?; + serve(state, &args.listen).await +} + +async fn serve(state: ServerState, addr: &str) -> Result<()> { + let socket: SocketAddr = addr + .parse() + .map_err(|e| ServerError::BadRequest(format!("invalid addr: {e}")))?; + + let listener = tokio::net::TcpListener::bind(socket).await?; + let actual = listener.local_addr()?; + tracing::info!("listening on http://{actual}"); + + axum::serve(listener, build_router(Arc::new(state))) + .with_graceful_shutdown(shutdown_signal()) + .await?; + Ok(()) +} + +async fn shutdown_signal() { + if let Err(err) = tokio::signal::ctrl_c().await { + tracing::warn!("ctrl-c handler error: {err}"); + } + tracing::info!("shutdown signal received"); +} diff --git a/tidal-server/src/router.rs b/tidal-server/src/router.rs new file mode 100644 index 0000000..d4f792f --- /dev/null +++ b/tidal-server/src/router.rs @@ -0,0 +1,351 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use axum::Json; +use axum::Router; +use axum::extract::{Query, State}; +use axum::http::StatusCode; +use axum::response::{IntoResponse, Response}; +use axum::routing::{get, post}; +use serde::{Deserialize, Serialize}; +use tidaldb::query::retrieve::Retrieve; +use tidaldb::query::search::Search; +use tidaldb::schema::EntityId; + +use crate::error::{Result, ServerError}; +use crate::state::{ClusterHealth, ServerState}; + +pub fn build_router(state: Arc) -> Router { + let mut app = Router::new() + .route("/items", post(create_item)) + .route("/embeddings", post(write_embedding)) + .route("/signals", post(write_signal)) + .route("/feed", get(feed)) + .route("/search", get(search)) + .route("/health", get(health)); + + if state.is_cluster() { + app = app + .route("/cluster/status", get(cluster_status)) + .route("/cluster/promote", post(promote_leader)) + .route("/cluster/partition", post(partition_region)) + .route("/cluster/heal", post(heal_region)); + } + + app.with_state(state) +} + +#[derive(Deserialize)] +struct ItemRequest { + entity_id: u64, + metadata: HashMap, +} + +async fn create_item( + State(state): State>, + Json(req): Json, +) -> Result { + state + .write_item(EntityId::new(req.entity_id), &req.metadata) + .map_err(AppError)?; + Ok(StatusCode::CREATED) +} + +#[derive(Deserialize)] +struct EmbeddingRequest { + entity_id: u64, + values: Vec, +} + +async fn write_embedding( + State(state): State>, + Json(req): Json, +) -> Result { + state + .write_embedding(EntityId::new(req.entity_id), &req.values) + .map_err(AppError)?; + Ok(StatusCode::NO_CONTENT) +} + +#[derive(Deserialize)] +struct SignalRequest { + entity_id: u64, + signal: String, + weight: f64, + #[serde(default)] + user_id: Option, + #[serde(default)] + creator_id: Option, +} + +async fn write_signal( + State(state): State>, + Json(req): Json, +) -> Result { + state + .signal( + &req.signal, + EntityId::new(req.entity_id), + req.weight, + req.user_id, + req.creator_id, + ) + .map_err(AppError)?; + Ok(StatusCode::NO_CONTENT) +} + +#[derive(Deserialize)] +struct FeedQuery { + #[serde(default)] + user_id: Option, + #[serde(default = "default_profile")] + profile: String, + #[serde(default = "default_limit")] + limit: u32, + #[serde(default)] + region: Option, +} + +fn default_profile() -> String { + "for_you".into() +} + +fn default_limit() -> u32 { + 20 +} + +#[derive(Serialize)] +struct FeedResponse { + items: Vec, + total_candidates: usize, + region: Option, +} + +#[derive(Serialize)] +struct FeedItem { + entity_id: u64, + score: f64, + rank: usize, + #[serde(skip_serializing_if = "Option::is_none")] + signals: Option>, +} + +#[derive(Serialize)] +struct SignalValue { + name: String, + value: f64, +} + +async fn feed( + State(state): State>, + Query(query): Query, +) -> Result, AppError> { + let mut builder = Retrieve::builder() + .profile(&query.profile) + .limit(query.limit as usize); + + if let Some(user_id) = query.user_id { + builder = builder.for_user(user_id); + } + let retrieve = builder.build().map_err(|e| TidalErrorWrapper(e.into()))?; + + let result = state + .retrieve(query.region.as_deref(), &retrieve) + .map_err(AppError)?; + + let mut items = Vec::with_capacity(result.items.len()); + for item in result.items { + let signals = if item.signals.is_empty() { + None + } else { + Some( + item.signals + .iter() + .map(|s| SignalValue { + name: s.name.clone(), + value: s.value, + }) + .collect(), + ) + }; + items.push(FeedItem { + entity_id: item.entity_id.as_u64(), + score: item.score, + rank: item.rank, + signals, + }); + } + + Ok(Json(FeedResponse { + items, + total_candidates: result.total_candidates, + region: query.region, + })) +} + +#[derive(Deserialize)] +struct SearchQueryParams { + query: String, + #[serde(default)] + user_id: Option, + #[serde(default = "default_limit")] + limit: u32, + #[serde(default)] + region: Option, +} + +#[derive(Serialize)] +struct SearchResponse { + items: Vec, + total_candidates: usize, + region: Option, +} + +#[derive(Serialize)] +struct SearchItem { + entity_id: u64, + score: f64, + rank: usize, + #[serde(skip_serializing_if = "Option::is_none")] + bm25_score: Option, + #[serde(skip_serializing_if = "Option::is_none")] + semantic_score: Option, +} + +async fn search( + State(state): State>, + Query(query): Query, +) -> Result, AppError> { + let mut builder = Search::builder() + .query(&query.query) + .limit(query.limit); + if let Some(user_id) = query.user_id { + builder = builder.for_user(user_id); + } + let search = builder.build().map_err(|e| TidalErrorWrapper(e.into()))?; + let result = state + .search(query.region.as_deref(), &search) + .map_err(AppError)?; + + let items = result + .items + .into_iter() + .map(|item| SearchItem { + entity_id: item.entity_id.as_u64(), + score: item.score, + rank: item.rank, + bm25_score: item.bm25_score.map(f64::from), + semantic_score: item.semantic_score.map(f64::from), + }) + .collect(); + + Ok(Json(SearchResponse { + items, + total_candidates: result.total_candidates, + region: query.region, + })) +} + +#[derive(Serialize)] +struct HealthResponse { + status: &'static str, + mode: &'static str, + items: u64, +} + +async fn health( + State(state): State>, + Query(query): Query>, +) -> Result, AppError> { + let region = query.get("region").map(|s| s.as_str()); + let items = state.item_count(region).map_err(AppError)?; + let mode = if state.is_cluster() { + "cluster" + } else { + "standalone" + }; + Ok(Json(HealthResponse { + status: "ok", + mode, + items, + })) +} + +async fn cluster_status( + State(state): State>, +) -> Result, AppError> { + state + .cluster_status() + .map(Json) + .ok_or_else(|| AppError(ServerError::BadRequest("not in cluster mode".into()))) +} + +#[derive(Deserialize)] +struct RegionCommand { + region: String, +} + +async fn promote_leader( + State(state): State>, + Json(cmd): Json, +) -> Result { + state.promote_leader(&cmd.region).map_err(AppError)?; + Ok(StatusCode::NO_CONTENT) +} + +async fn partition_region( + State(state): State>, + Json(cmd): Json, +) -> Result { + state.partition_region(&cmd.region).map_err(AppError)?; + Ok(StatusCode::NO_CONTENT) +} + +async fn heal_region( + State(state): State>, + Json(cmd): Json, +) -> Result { + state.heal_region(&cmd.region).map_err(AppError)?; + Ok(StatusCode::NO_CONTENT) +} + +struct TidalErrorWrapper(tidaldb::TidalError); + +impl From for AppError { + fn from(value: TidalErrorWrapper) -> Self { + AppError(ServerError::Tidal(value.0)) + } +} + +struct AppError(ServerError); + +impl IntoResponse for AppError { + fn into_response(self) -> Response { + let status = status_from_error(&self.0); + let body = serde_json::json!({ + "error": self.0.to_string() + }); + (status, Json(body)).into_response() + } +} + +fn status_from_error(err: &ServerError) -> StatusCode { + match err { + ServerError::BadRequest(_) + | ServerError::SchemaConfig(_) + | ServerError::ClusterConfig(_) => StatusCode::BAD_REQUEST, + ServerError::Tidal(tidal_err) => match tidal_err { + tidaldb::TidalError::NotFound { .. } => StatusCode::NOT_FOUND, + tidaldb::TidalError::Schema(_) | tidaldb::TidalError::InvalidInput(_) => { + StatusCode::BAD_REQUEST + } + tidaldb::TidalError::Backpressure { .. } | tidaldb::TidalError::RateLimited { .. } => { + StatusCode::TOO_MANY_REQUESTS + } + tidaldb::TidalError::PolicyViolation { .. } + | tidaldb::TidalError::SessionExpired { .. } => StatusCode::FORBIDDEN, + _ => StatusCode::INTERNAL_SERVER_ERROR, + }, + _ => StatusCode::INTERNAL_SERVER_ERROR, + } +} diff --git a/tidal-server/src/state.rs b/tidal-server/src/state.rs new file mode 100644 index 0000000..f643797 --- /dev/null +++ b/tidal-server/src/state.rs @@ -0,0 +1,315 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use tidaldb::query::retrieve::Retrieve; +use tidaldb::query::search::Search; +use tidaldb::schema::{EntityId, Schema}; +use tidaldb::replication::RegionId; +use tidaldb::testing::cluster::SimulatedCluster; +use tidaldb::TidalDb; + +use crate::config::ClusterLayout; +use crate::error::{Result, ServerError}; + +#[derive(Clone)] +pub struct ServerState { + mode: Mode, +} + +#[derive(Clone)] +enum Mode { + Standalone(Arc), + Cluster(ClusterState), +} + +#[derive(Clone)] +struct ClusterState { + cluster: Arc, + regions: RegionDirectory, +} + +#[derive(Clone)] +struct RegionDirectory { + #[allow(dead_code)] + default_region: RegionId, + name_to_id: HashMap, + id_to_name: HashMap, +} + +#[derive(Debug, serde::Serialize)] +pub struct ClusterHealth { + pub leader: String, + pub relay_log_len: u64, + pub regions: Vec, +} + +#[derive(Debug, serde::Serialize)] +pub struct RegionStatus { + pub name: String, + pub applied_events: u64, + pub lag_events: i64, + pub partitioned: bool, +} + +impl ServerState { + pub fn standalone(db: TidalDb) -> Self { + Self { + mode: Mode::Standalone(Arc::new(db)), + } + } + + pub fn cluster(schema: Schema, layout: ClusterLayout) -> Result { + use tidaldb::testing::cluster::ClusterConfig; + + let mut regions = Vec::new(); + for (idx, name) in layout.regions.iter().enumerate() { + regions.push((RegionId(idx as u16), name.clone())); + } + + let leader_id = regions + .iter() + .find(|(_, name)| name.eq_ignore_ascii_case(&layout.leader)) + .map(|(id, _)| *id) + .ok_or_else(|| { + ServerError::ClusterConfig(format!( + "leader '{}' not found in region list", + layout.leader + )) + })?; + + let config = ClusterConfig { + regions: regions.iter().map(|(id, _)| *id).collect(), + leader_region: leader_id, + schema, + }; + let cluster = Arc::new(SimulatedCluster::build(config)); + let mut name_to_id = HashMap::new(); + let mut id_to_name = HashMap::new(); + for (id, name) in ®ions { + name_to_id.insert(name.to_lowercase(), *id); + id_to_name.insert(id.0, name.clone()); + } + + let directory = RegionDirectory { + default_region: leader_id, + name_to_id, + id_to_name, + }; + + Ok(Self { + mode: Mode::Cluster(ClusterState { + cluster, + regions: directory, + }), + }) + } + + pub fn is_cluster(&self) -> bool { + matches!(self.mode, Mode::Cluster(_)) + } + + pub fn write_item( + &self, + entity_id: EntityId, + metadata: &HashMap, + ) -> Result<()> { + match &self.mode { + Mode::Standalone(db) => db + .write_item_with_metadata(entity_id, metadata) + .map_err(ServerError::from), + Mode::Cluster(cluster) => cluster + .cluster + .write_item_with_metadata(entity_id, metadata) + .map_err(ServerError::from), + } + } + + pub fn write_embedding(&self, entity_id: EntityId, embedding: &[f32]) -> Result<()> { + match &self.mode { + Mode::Standalone(db) => db + .write_item_embedding(entity_id, embedding) + .map_err(ServerError::from), + Mode::Cluster(cluster) => cluster.cluster.write_item_embedding(entity_id, embedding).map_err(ServerError::from), + } + } + + pub fn signal( + &self, + signal_name: &str, + entity_id: EntityId, + weight: f64, + user_id: Option, + creator_id: Option, + ) -> Result<()> { + match &self.mode { + Mode::Standalone(db) => { + if user_id.is_some() || creator_id.is_some() { + db.signal_with_context( + signal_name, + entity_id, + weight, + tidaldb::schema::Timestamp::now(), + user_id, + creator_id, + ) + .map_err(ServerError::from) + } else { + db.signal( + signal_name, + entity_id, + weight, + tidaldb::schema::Timestamp::now(), + ) + .map_err(ServerError::from) + } + } + Mode::Cluster(cluster) => { + if user_id.is_some() || creator_id.is_some() { + return Err(ServerError::BadRequest( + "cluster mode currently supports only global signals (no user_id/creator_id)".into(), + )); + } + cluster.cluster.write_signal(signal_name, entity_id, weight); + Ok(()) + } + } + } + + pub fn retrieve( + &self, + region_name: Option<&str>, + query: &Retrieve, + ) -> Result { + match &self.mode { + Mode::Standalone(db) => db.retrieve(query).map_err(ServerError::from), + Mode::Cluster(cluster) => { + let region = cluster.resolve_region(region_name)?; + cluster.cluster.retrieve(region, query).map_err(ServerError::from) + } + } + } + + pub fn search( + &self, + region_name: Option<&str>, + query: &Search, + ) -> Result { + match &self.mode { + Mode::Standalone(db) => db.search(query).map_err(ServerError::from), + Mode::Cluster(cluster) => { + let region = cluster.resolve_region(region_name)?; + cluster.cluster.search(region, query).map_err(ServerError::from) + } + } + } + + pub fn item_count(&self, region_name: Option<&str>) -> Result { + match &self.mode { + Mode::Standalone(db) => Ok(db.item_count()), + Mode::Cluster(cluster) => { + let region = cluster.resolve_region(region_name)?; + Ok(cluster.cluster.item_count(region)) + } + } + } + + pub fn cluster_status(&self) -> Option { + match &self.mode { + Mode::Cluster(cluster) => { + let leader_id = cluster.cluster.leader_region(); + let leader_name = cluster + .regions + .id_to_name + .get(&leader_id.0) + .cloned() + .unwrap_or_else(|| format!("region-{}", leader_id.0)); + let leader_seqno = cluster.cluster.leader_seqno(); + let statuses = cluster + .regions + .id_to_name + .iter() + .map(|(id, name)| { + let rid = RegionId(*id); + let applied = cluster.cluster.applied_count(rid); + let lag = leader_seqno as i64 - applied as i64; + RegionStatus { + name: name.clone(), + applied_events: applied, + lag_events: lag.max(0), + partitioned: cluster.cluster.is_partitioned(rid), + } + }) + .collect(); + Some(ClusterHealth { + leader: leader_name, + relay_log_len: cluster.cluster.relay_log_len(), + regions: statuses, + }) + } + _ => None, + } + } + + pub fn promote_leader(&self, region_name: &str) -> Result<()> { + match &self.mode { + Mode::Cluster(cluster) => { + let region = cluster.regions.lookup(region_name).ok_or_else(|| { + ServerError::BadRequest(format!("unknown region '{region_name}'")) + })?; + cluster.cluster.promote_leader(region); + Ok(()) + } + _ => Err(ServerError::BadRequest( + "leader promotion only supported in cluster mode".into(), + )), + } + } + + pub fn partition_region(&self, region_name: &str) -> Result<()> { + match &self.mode { + Mode::Cluster(cluster) => { + let region = cluster.regions.lookup(region_name).ok_or_else(|| { + ServerError::BadRequest(format!("unknown region '{region_name}'")) + })?; + cluster.cluster.partition_region(region); + Ok(()) + } + _ => Err(ServerError::BadRequest( + "partitions only supported in cluster mode".into(), + )), + } + } + + pub fn heal_region(&self, region_name: &str) -> Result<()> { + match &self.mode { + Mode::Cluster(cluster) => { + let region = cluster.regions.lookup(region_name).ok_or_else(|| { + ServerError::BadRequest(format!("unknown region '{region_name}'")) + })?; + cluster.cluster.heal_region(region); + Ok(()) + } + _ => Err(ServerError::BadRequest( + "partitions only supported in cluster mode".into(), + )), + } + } +} + +impl ClusterState { + fn resolve_region(&self, name: Option<&str>) -> Result { + if let Some(name) = name { + self.regions + .lookup(name) + .ok_or_else(|| ServerError::BadRequest(format!("unknown region '{name}'"))) + } else { + Ok(self.cluster.leader_region()) + } + } +} + +impl RegionDirectory { + fn lookup(&self, name: &str) -> Option { + self.name_to_id.get(&name.trim().to_lowercase()).copied() + } +} diff --git a/tidal/src/testing/cluster.rs b/tidal/src/testing/cluster.rs index 9cc4a1b..18e8e16 100644 --- a/tidal/src/testing/cluster.rs +++ b/tidal/src/testing/cluster.rs @@ -1,22 +1,33 @@ //! Simulated multi-region cluster for M8 UAT testing. //! -//! Creates a set of ephemeral [`TidalDb`] instances (one per region) and -//! replicates signals from the leader to followers via a shared relay log. +//! Creates a set of ephemeral [`TidalDb`] instances wired with the real M8 +//! distributed fabric: in-process transports, `spawn_receiver`, and +//! `ReplicationState`. Signal replication now traverses the full path: +//! +//! ```text +//! write_signal → encode_batch → channel send +//! ↓ +//! spawn_receiver thread +//! ↓ +//! apply_payload +//! ↓ +//! SignalLedger::apply_wal_event +//! ReplicationState::advance +//! ``` //! //! # Architecture //! -//! Each node is a standard `TidalDb::builder().ephemeral().with_schema(...)`. -//! The "leader" is the node that accepts writes. The "followers" receive -//! replicated signals when [`SimulatedCluster::await_convergence`] is called. -//! -//! Replication is **signal-replay**: the leader records each signal in a -//! shared relay log, and `await_convergence` replays pending events into -//! each non-partitioned follower's `TidalDb`. This is fully synchronous -//! and deterministic -- no background threads, no race conditions. -//! -//! Partition injection marks a region as isolated. Isolated followers are -//! skipped during convergence and do not receive new events until the -//! partition is healed. +//! * All nodes open with `NodeRole::Single` so they accept direct writes AND +//! can be promoted to leader at any time. +//! * Every non-initial-leader region starts a `spawn_receiver` thread (via +//! `db.start_replication(transport)`) that processes incoming WAL batches. +//! * `write_signal` encodes the event as a one-event WAL batch and ships it +//! immediately to all non-partitioned followers. +//! * A `batch_log` records every shipped batch so `await_convergence` can +//! re-deliver missed batches after a partition is healed. +//! * `await_convergence` ships any pending batches, then polls +//! `ReplicationState::applied_seqno` until all active followers have caught +//! up to the current leader's sequence number. use std::collections::{HashMap, HashSet}; use std::sync::atomic::{AtomicU64, Ordering}; @@ -24,23 +35,19 @@ use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, Instant}; use crate::db::TidalDb; -use crate::replication::shard::RegionId; +use crate::db::config::{NodeConfig, NodeRole}; +use crate::query::retrieve::Retrieve; +use crate::query::search::Search; +use crate::replication::shard::{RegionId, ShardId}; +use crate::replication::transport::WalSegmentPayload; +use crate::replication::{WalSegmentId, spawn_receiver}; use crate::schema::{EntityId, Schema, Timestamp}; +use crate::signals::{NoopWalWriter, SignalLedger}; +use crate::wal::format::batch::{EventRecord, encode_batch}; -/// A signal event captured in the relay log for replication. -#[derive(Debug, Clone)] -pub struct RelayEvent { - /// The signal type name (e.g. "view", "like"). - pub signal_type: String, - /// The entity this signal targets. - pub entity_id: EntityId, - /// Signal weight. - pub weight: f64, - /// Timestamp of the signal. - pub timestamp: Timestamp, - /// Monotonically increasing sequence number (0-indexed). - pub seqno: u64, -} +use super::cluster_transport::{BatchEntry, ReceiveOnlyTransport}; + +// ── Public API ──────────────────────────────────────────────────────────── /// A simulated node in the cluster. pub struct SimulatedNode { @@ -60,47 +67,109 @@ pub struct ClusterConfig { pub schema: Schema, } -/// A simulated multi-region tidalDB cluster. +/// A simulated multi-region tidalDB cluster using the real M8 distributed +/// fabric. /// -/// All communication happens via in-memory relay log. No actual network, -/// no actual disk I/O (ephemeral mode). Designed for deterministic, -/// repeatable integration tests. +/// Signal replication traverses the real WAL-batch encode → transport → +/// `apply_payload` → `ReplicationState::advance` pipeline instead of calling +/// `db.signal()` directly on followers. pub struct SimulatedCluster { /// Current leader region ID. Mutable via `promote_leader`. leader_region: Mutex, /// All nodes indexed by region. nodes: HashMap, - /// Shared relay log: append-only sequence of signal events. - relay_log: Arc>>, - /// Per-follower: how many events from `relay_log` have been applied to - /// this follower's `TidalDb`. Monotonically increasing. - db_applied: HashMap, - /// Set of regions currently isolated (partition injected). + /// Per-follower channel senders (region → sender for WAL payloads). + /// + /// Only regions that have an active `spawn_receiver` thread appear here. + /// When dropped, the corresponding receiver thread exits cleanly. + follower_senders: HashMap>, + /// All batches ever shipped, for partition-recovery re-delivery. + batch_log: Mutex>, + /// Per-leader signal count used as the WAL seqno for that leader's batches. + leader_seqnos: Mutex>, + /// Total signals ever written to any leader (for [`relay_log_len`]). + total_signals: AtomicU64, + /// Regions currently isolated by a network partition. partitioned_regions: Arc>>, - /// Schema used by all nodes (kept for reference). + /// Schema kept for reference. #[allow(dead_code)] schema: Schema, + /// Pre-computed map: signal type name → `u8` ID used in [`EventRecord`]. + signal_type_ids: HashMap, } impl SimulatedCluster { /// Build a cluster from the given configuration. /// - /// All nodes are created immediately in ephemeral mode. + /// All nodes are created immediately in ephemeral mode. Non-leader regions + /// have a `spawn_receiver` thread started automatically. /// /// # Panics /// - /// Panics if any `TidalDb` fails to open (e.g. invalid schema). + /// Panics if any `TidalDb` fails to open or if `start_replication` fails. #[must_use] pub fn build(config: ClusterConfig) -> Self { - let mut nodes = HashMap::new(); - let mut db_applied = HashMap::new(); + // Pre-compute signal type IDs via a temporary ledger. + let scratch_ledger = SignalLedger::new(config.schema.clone(), Box::new(NoopWalWriter)); + let signal_type_ids: HashMap = config + .schema + .signals() + .filter_map(|def| { + scratch_ledger + .resolve_signal_type(def.name()) + .ok() + .map(|id| (def.name().to_string(), id.as_u16() as u8)) + }) + .collect(); + + // Derive all shard IDs (one per region). + let all_shards: Vec = config.regions.iter().map(|r| ShardId(r.0)).collect(); + + let mut nodes: HashMap = HashMap::new(); + let mut follower_senders: HashMap> = + HashMap::new(); + let mut leader_seqnos: HashMap = HashMap::new(); for ®ion in &config.regions { + let shard_id = ShardId(region.0); + let peer_shards: Vec = all_shards + .iter() + .copied() + .filter(|&s| s != shard_id) + .collect(); + let db = TidalDb::builder() .ephemeral() .with_schema(config.schema.clone()) + .with_cluster(NodeConfig { + role: NodeRole::Single, + shard_id, + peer_shards, + ..NodeConfig::default() + }) .open() .expect("ephemeral TidalDb with valid schema must open"); + + // Wire a receiver for every non-leader region. + if region != config.leader_region { + let (tx, rx) = crossbeam::channel::bounded::(1024); + let transport = Arc::new(ReceiveOnlyTransport { + local_shard: shard_id, + rx, + }); + // spawn_receiver directly: we already have Arc + // which implements Transport. Use the replication state from the db. + let ledger = db + .ledger() + .expect("ephemeral db with schema must have ledger") + .clone(); + let rep_state = db.replication_state().clone(); + let _handle = spawn_receiver(transport, ledger, rep_state); + // Note: the JoinHandle is intentionally not stored — the receiver thread + // will exit cleanly when `tx` (and all senders to `rx`) are dropped. + follower_senders.insert(region, tx); + } + nodes.insert( region, SimulatedNode { @@ -108,16 +177,19 @@ impl SimulatedCluster { db, }, ); - db_applied.insert(region, AtomicU64::new(0)); + leader_seqnos.insert(region, 0); } Self { leader_region: Mutex::new(config.leader_region), nodes, - relay_log: Arc::new(Mutex::new(Vec::new())), - db_applied, + follower_senders, + batch_log: Mutex::new(Vec::new()), + leader_seqnos: Mutex::new(leader_seqnos), + total_signals: AtomicU64::new(0), partitioned_regions: Arc::new(RwLock::new(HashSet::new())), schema: config.schema, + signal_type_ids, } } @@ -155,39 +227,91 @@ impl SimulatedCluster { .unwrap_or_else(|| panic!("no node for region {region}")) } - /// Write a signal to the cluster leader and append to the relay log. + /// Write a signal to the cluster leader and ship it to active followers. /// - /// The signal is immediately applied to the leader's `TidalDb` and - /// recorded in the relay log for later replication to followers. + /// The signal is immediately applied to the leader's `TidalDb`. A one-event + /// WAL batch is encoded and shipped via the channel transport to all + /// non-partitioned followers with active receivers. The batch is also + /// recorded in the `batch_log` for partition-recovery re-delivery. /// /// # Panics /// - /// Panics if the signal write on the leader fails. + /// Panics if the signal write on the leader fails or if the signal type is + /// not registered in the schema. pub fn write_signal(&self, signal_type: &str, entity_id: EntityId, weight: f64) { let ts = Timestamp::now(); let leader_region = self.leader_region(); - self.nodes - .get(&leader_region) - .expect("leader node exists") + let leader_shard = ShardId(leader_region.0); + + // Write to the leader's signal ledger. + self.nodes[&leader_region] .db .signal(signal_type, entity_id, weight, ts) .expect("signal write on leader must succeed"); - let mut log = self - .relay_log + // Encode as a one-event WAL batch. + let type_id = *self + .signal_type_ids + .get(signal_type) + .expect("signal type must be registered in the cluster schema"); + + let seqno = { + let mut seqnos = self + .leader_seqnos + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let v = { + let s = seqnos.entry(leader_region).or_insert(0); + *s += 1; + *s + }; + drop(seqnos); + v + }; + + let events = [EventRecord { + entity_id: entity_id.as_u64(), + signal_type: type_id, + weight: weight as f32, + timestamp_nanos: ts.as_nanos(), + }]; + let bytes = + encode_batch(&events, seqno, ts.as_nanos()).expect("WAL batch encoding must not fail"); + + // Ship immediately to non-partitioned followers that have active receivers. + let partitioned = self + .partitioned_regions + .read() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .clone(); + + for (®ion, tx) in &self.follower_senders { + if region == leader_region || partitioned.contains(®ion) { + continue; + } + let payload = WalSegmentPayload { + id: WalSegmentId::new(crate::replication::RegionId::SINGLE, leader_shard, seqno), + bytes: bytes.clone(), + event_count: 1, + }; + // Ignore send errors: the receiver may have exited (e.g. after a crash). + let _ = tx.send(payload); + } + + // Record in batch_log for partition-recovery re-delivery. + self.batch_log .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner); - let seqno = log.len() as u64; - log.push(RelayEvent { - signal_type: signal_type.to_string(), - entity_id, - weight, - timestamp: ts, - seqno, - }); + .unwrap_or_else(std::sync::PoisonError::into_inner) + .push(BatchEntry { + source_shard: leader_shard, + seqno, + bytes, + }); + + self.total_signals.fetch_add(1, Ordering::Relaxed); } - /// Write a signal directly to a specific region's node (bypassing leader). + /// Write a signal directly to a specific region (bypassing the leader). /// /// Used to simulate partitioned writes during split-brain scenarios. /// @@ -210,71 +334,94 @@ impl SimulatedCluster { .expect("signal write must succeed"); } - /// Wait for all non-partitioned followers to receive and apply all - /// pending relay log events. + /// Wait for all non-partitioned followers to apply all pending WAL batches. /// - /// This is synchronous: it replays events into each follower's `TidalDb` - /// directly. The `timeout` guards against programming errors, not actual - /// latency (in-process replay is instant). + /// * Ships any batches that were missed during a partition (re-delivery from + /// the `batch_log`). + /// * Polls `ReplicationState::applied_seqno` for each active follower until + /// it reaches the current leader's latest seqno. /// /// # Panics /// - /// Panics if convergence is not reached within `timeout` (should never - /// happen for in-process relay, but defends against infinite loops). + /// Panics if convergence is not reached within `timeout`. pub fn await_convergence(&self, timeout: Duration) { let deadline = Instant::now() + timeout; - let leader_region = self.leader_region(); - // In-process replay: apply all pending events to each non-partitioned - // follower. We loop because a partition might be healed mid-wait. loop { assert!( Instant::now() <= deadline, "convergence timeout: cluster did not converge within {timeout:?}" ); + let leader_region = self.leader_region(); + let leader_shard = ShardId(leader_region.0); + + let target_seqno = self + .leader_seqnos + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .get(&leader_region) + .copied() + .unwrap_or(0); + let partitioned = self .partitioned_regions .read() .unwrap_or_else(std::sync::PoisonError::into_inner) .clone(); - let log = self - .relay_log - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner) - .clone(); - let log_len = log.len() as u64; + // Re-deliver any batches missed during a partition. + { + let log = self + .batch_log + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + for (®ion, tx) in &self.follower_senders { + if region == leader_region || partitioned.contains(®ion) { + continue; + } + let node = &self.nodes[®ion]; + for entry in log.iter() { + let already_applied = node + .db + .replication_state() + .applied_seqno(entry.source_shard) + .unwrap_or(0); + if entry.seqno > already_applied { + let payload = WalSegmentPayload { + id: WalSegmentId::new( + crate::replication::RegionId::SINGLE, + entry.source_shard, + entry.seqno, + ), + bytes: entry.bytes.clone(), + event_count: 1, + }; + let _ = tx.try_send(payload); + } + } + } + } + + // Check whether all active non-partitioned followers have converged. let mut all_converged = true; for (®ion, node) in &self.nodes { if region == leader_region || partitioned.contains(®ion) { continue; } - - let applied = self - .db_applied - .get(®ion) - .expect("db_applied entry for every region"); - let current = applied.load(Ordering::Acquire); - - if current < log_len { - // Replay events [current..log_len) into this follower. - for event in &log[current as usize..log_len as usize] { - node.db - .signal( - &event.signal_type, - event.entity_id, - event.weight, - event.timestamp, - ) - .expect("follower signal replay must succeed"); - } - applied.store(log_len, Ordering::Release); + // Only check regions that have an active receiver thread. + if !self.follower_senders.contains_key(®ion) { + continue; } - - if applied.load(Ordering::Acquire) < log_len { + let applied = node + .db + .replication_state() + .applied_seqno(leader_shard) + .unwrap_or(0); + if applied < target_seqno { all_converged = false; + break; } } @@ -300,24 +447,116 @@ impl SimulatedCluster { }) } - /// Current length of the relay log (total events written by the leader). - #[must_use] - pub fn relay_log_len(&self) -> u64 { - self.relay_log + /// Broadcast item metadata to all nodes. + pub fn write_item_with_metadata( + &self, + entity_id: EntityId, + metadata: &HashMap, + ) -> crate::Result<()> { + for node in self.nodes.values() { + node.db.write_item_with_metadata(entity_id, metadata)?; + } + Ok(()) + } + + /// Broadcast embedding writes to all nodes. + pub fn write_item_embedding( + &self, + entity_id: EntityId, + embedding: &[f32], + ) -> crate::Result<()> { + for node in self.nodes.values() { + node.db.write_item_embedding(entity_id, embedding)?; + } + Ok(()) + } + + /// Run a RETRIEVE query against a region. + pub fn retrieve( + &self, + region: RegionId, + query: &Retrieve, + ) -> crate::Result { + self.node(region).db.retrieve(query) + } + + /// Run a SEARCH query against a region. + pub fn search( + &self, + region: RegionId, + query: &Search, + ) -> crate::Result { + self.node(region).db.search(query) + } + + /// Item count helper for health checks. + pub fn item_count(&self, region: RegionId) -> u64 { + self.node(region).db.item_count() + } + + /// Current leader seqno. + pub fn leader_seqno(&self) -> u64 { + let leader = self.leader_region(); + self.leader_seqnos .lock() .unwrap_or_else(std::sync::PoisonError::into_inner) - .len() as u64 + .get(&leader) + .copied() + .unwrap_or(0) } - /// How many events have been applied to a specific region's follower DB. + /// Mark a region as partitioned. + pub fn partition_region(&self, region: RegionId) { + let mut partitions = self + .partitioned_regions + .write() + .unwrap_or_else(std::sync::PoisonError::into_inner); + partitions.insert(region); + } + + /// Heal a partitioned region. + pub fn heal_region(&self, region: RegionId) { + let mut partitions = self + .partitioned_regions + .write() + .unwrap_or_else(std::sync::PoisonError::into_inner); + partitions.remove(®ion); + } + + /// Whether a region is currently partitioned. + pub fn is_partitioned(&self, region: RegionId) -> bool { + self.partitioned_regions + .read() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .contains(®ion) + } + + /// Total number of signals ever written to any leader (replaces relay log + /// length in the old signal-replay implementation). + #[must_use] + pub fn relay_log_len(&self) -> u64 { + self.total_signals.load(Ordering::Acquire) + } + + /// Number of WAL batches applied from the initial leader (`ShardId(0)`) + /// on a specific region's replication state. + /// + /// This is equivalent to the event count for all tests that do not involve + /// leader promotion, since each signal produces exactly one WAL batch. #[must_use] pub fn applied_count(&self, region: RegionId) -> u64 { - self.db_applied - .get(®ion) - .map_or(0, |a| a.load(Ordering::Acquire)) + // ShardId(0) is always the initial leader's shard (RegionId(0)). + // Tests that check `applied_count` do not involve leader promotion, + // so this is always the correct source shard. + self.nodes.get(®ion).map_or(0, |n| { + n.db.replication_state() + .applied_seqno(ShardId(0)) + .unwrap_or(0) + }) } - /// Access the shared partitioned regions set (for fault injection). + /// Access the shared partitioned-regions set (for fault injection via + /// [`crate::testing::faults::NetworkPartition`]). #[must_use] pub const fn partitioned_regions(&self) -> &Arc>> { &self.partitioned_regions @@ -325,9 +564,9 @@ impl SimulatedCluster { /// Promote a follower to leader. /// - /// The old leader stops receiving writes via `write_signal` (it is - /// no longer returned by `leader()`). The new leader can now accept - /// writes. Data already on each node is preserved. + /// After promotion `write_signal` routes writes to the new leader and ships + /// batches to all other regions that have active receivers. The new leader + /// must already exist as a node in the cluster. /// /// # Panics /// diff --git a/tidal/src/testing/cluster_transport.rs b/tidal/src/testing/cluster_transport.rs new file mode 100644 index 0000000..17ce201 --- /dev/null +++ b/tidal/src/testing/cluster_transport.rs @@ -0,0 +1,40 @@ +//! Internal transport types used by [`super::cluster::SimulatedCluster`]. + +use crate::replication::shard::ShardId; +use crate::replication::transport::{Transport, TransportError, WalSegmentPayload}; + +// ── Internal: receive-only transport ───────────────────────────────────── + +/// Minimal transport implementation used by follower receivers. +/// +/// Owns a crossbeam `Receiver` for incoming WAL segments. The `send_segment` +/// side is a no-op — all shipping is managed by the cluster struct. +pub(super) struct ReceiveOnlyTransport { + pub(super) local_shard: ShardId, + pub(super) rx: crossbeam::channel::Receiver, +} + +impl Transport for ReceiveOnlyTransport { + fn send_segment(&self, _: ShardId, _: WalSegmentPayload) -> Result<(), TransportError> { + Ok(()) + } + + fn recv_segment(&self) -> Option { + self.rx.recv().ok() + } + + fn local_shard(&self) -> ShardId { + self.local_shard + } +} + +// ── Internal: batch log entry ───────────────────────────────────────────── + +pub(super) struct BatchEntry { + /// Which leader shard produced this batch. + pub(super) source_shard: ShardId, + /// 1-indexed sequence number scoped to `source_shard`. + pub(super) seqno: u64, + /// Encoded WAL batch bytes (from [`crate::wal::format::batch::encode_batch`]). + pub(super) bytes: Vec, +} diff --git a/tidal/src/testing/mod.rs b/tidal/src/testing/mod.rs index 23f413b..54a968b 100644 --- a/tidal/src/testing/mod.rs +++ b/tidal/src/testing/mod.rs @@ -5,6 +5,7 @@ //! and compiles away entirely in production builds. pub mod cluster; +pub(super) mod cluster_transport; pub mod crash_injector; pub mod faults;