//! StemeDB cluster node binary.
//!
//! Starts a single cluster node with:
//! - SWIM membership protocol for node discovery
//! - Range-based sharding for data distribution
//! - Gateway HTTP API for client requests
//!
//! # Environment Variables
//!
//! | Variable | Default | Description |
//! |----------|---------|-------------|
//! | `STEMEDB_NODE_API_ADDR` | `127.0.0.1:18181` | Gateway HTTP address |
//! | `STEMEDB_NODE_RPC_ADDR` | `127.0.0.1:18182` | gRPC sync address |
//! | `STEMEDB_SEED_NODES` | (empty) | Comma-separated seed node RPC addresses |
//! | `STEMEDB_NUM_SHARDS` | `4` | Number of shards |
//! | `STEMEDB_REPLICATION_FACTOR` | `1` | Replication factor |
//! | `STEMEDB_DATACENTER` | (empty) | Datacenter/region label |

use std::net::SocketAddr;
use std::sync::Arc;

use tracing::{info, warn};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

use stemedb_cluster::{
    stable_node_id, Gateway, NodeInfo, RangeManager, RangeRouter, ShardingConfig, SwimConfig,
    SwimMembership,
};

/// Node configuration loaded from environment variables.
struct NodeConfig { api_addr: SocketAddr, rpc_addr: SocketAddr, seed_nodes: Vec, num_shards: u32, replication_factor: u32, datacenter: Option, } impl NodeConfig { fn from_env() -> Self { let api_addr = std::env::var("STEMEDB_NODE_API_ADDR") .unwrap_or_else(|_| "127.0.0.1:18181".to_string()) .parse() .unwrap_or_else(|_| SocketAddr::from(([127, 0, 0, 1], 18181))); let rpc_addr = std::env::var("STEMEDB_NODE_RPC_ADDR") .unwrap_or_else(|_| "127.0.0.1:18182".to_string()) .parse() .unwrap_or_else(|_| SocketAddr::from(([127, 0, 0, 1], 18182))); let seed_nodes = std::env::var("STEMEDB_SEED_NODES") .unwrap_or_default() .split(',') .filter(|s| !s.trim().is_empty()) .filter_map(|s| s.trim().parse().ok()) .collect(); let num_shards = std::env::var("STEMEDB_NUM_SHARDS").ok().and_then(|s| s.parse().ok()).unwrap_or(4); let replication_factor = std::env::var("STEMEDB_REPLICATION_FACTOR") .ok() .and_then(|s| s.parse().ok()) .unwrap_or(1); let datacenter = std::env::var("STEMEDB_DATACENTER").ok(); Self { api_addr, rpc_addr, seed_nodes, num_shards, replication_factor, datacenter } } } #[tokio::main] async fn main() -> Result<(), Box> { // Initialize tracing let env_filter = match tracing_subscriber::EnvFilter::try_from_default_env() { Ok(filter) => filter, Err(_) => "stemedb_cluster=info,tower_http=debug".into(), }; tracing_subscriber::registry().with(env_filter).with(tracing_subscriber::fmt::layer()).init(); let config = NodeConfig::from_env(); // Use stable NodeId (env var → hostname → random fallback) let node_id = stable_node_id(); info!( node_id = %node_id.short_hex(), api_addr = %config.api_addr, rpc_addr = %config.rpc_addr, num_shards = config.num_shards, replication_factor = config.replication_factor, datacenter = ?config.datacenter, seed_count = config.seed_nodes.len(), "Starting StemeDB cluster node" ); // --- Membership --- let local_info = NodeInfo::new(node_id, config.rpc_addr, config.api_addr); let swim_config = SwimConfig::default(); let membership = 
Arc::new(SwimMembership::new(local_info, swim_config)); // Join cluster (bootstrap if no seeds) membership.join(config.seed_nodes.clone()).await?; membership.start(); info!( joined = membership.is_joined(), members = membership.member_count(), "Membership initialized" ); // --- Sharding --- let router = Arc::new(RangeRouter::new(node_id)); let sharding_config = ShardingConfig::new() .with_num_shards(config.num_shards) .with_replication_factor(config.replication_factor); let range_manager = RangeManager::new(Arc::clone(&router), Arc::clone(&membership), sharding_config, node_id); range_manager.initialize_shards()?; let meta = router.get_meta_range(); info!(shards = meta.num_shards(), version = meta.version, "Shard meta-range initialized"); // --- Gateway --- let gateway = Gateway::new(Arc::clone(&router), Arc::clone(&membership), config.api_addr); info!( addr = %config.api_addr, "Gateway listening — cluster endpoints available:" ); info!(" GET /v1/health - Node health"); info!(" GET /v1/cluster/status - Cluster topology"); info!(" GET /v1/shards/:id - Shard details"); info!(" GET /v1/route?subject=X - Test subject routing"); info!(" POST /v1/assert - Create assertion (routed)"); info!(" GET /v1/query?subject=X - Query assertions (routed)"); gateway.serve().await?; Ok(()) }