From 6c6ee04e9c83d10d4cadbe49428cc9c2a3f57867 Mon Sep 17 00:00:00 2001 From: jordan Date: Sat, 7 Mar 2026 15:09:29 -0700 Subject: [PATCH] =?UTF-8?q?feat:=20complete=20cluster=20integration=20?= =?UTF-8?q?=E2=80=94=20SWIM=20gossip,=20gRPC=20server,=20shard=20rebalanci?= =?UTF-8?q?ng,=20single=20binary?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 8-task cluster completion bringing 3-replica StatefulSet from isolated nodes to fully functional cluster: 1. Fix Gateway /metrics 500 (wire PrometheusHandle) 2. gRPC server + SWIM background tasks (probe, suspicion, gossip dissemination) 3. join() registers peers in membership table via PingResponse fields 4. Shard rebalancing on membership changes (deterministic round-robin) 5. API cluster wiring (DNS resolution, Gateway, gRPC, gossip broadcaster) 6. Single binary merge (stemedb-api --features cluster replaces stemedb-node) 7. Auth header forwarding (X-API-Key passed through Gateway to backends) 8. CORS restriction (STEMEDB_ALLOWED_ORIGINS env var, permissive fallback) --- CLAUDE.md | 42 +-- Cargo.lock | 5 + Dockerfile | 5 +- applications/aphoria/src/remote/client.rs | 1 - crates/stemedb-api/Cargo.toml | 7 +- crates/stemedb-api/src/lib.rs | 2 + crates/stemedb-api/src/main.rs | 255 ++++++++++++++---- crates/stemedb-api/src/routers.rs | 36 ++- crates/stemedb-api/src/sync_storage_bridge.rs | 113 ++++++++ crates/stemedb-cluster/src/bin/node.rs | 11 +- .../src/gateway/handlers/query_handlers.rs | 10 +- .../src/gateway/handlers/write_handlers.rs | 19 +- crates/stemedb-cluster/src/gateway/service.rs | 26 +- crates/stemedb-cluster/src/membership/swim.rs | 222 ++++++++++++++- .../stemedb-cluster/src/sharding/manager.rs | 36 +++ crates/stemedb-rpc/proto/sync.proto | 33 +++ crates/stemedb-rpc/src/server.rs | 27 +- docs/operations/README.md | 2 +- .../deployment/k8s-deploy-roadmap.md | 105 ++++---- .../three-node-cluster.md | 67 ++--- scripts/entrypoint.sh | 39 +-- 21 files changed, 831 insertions(+), 232 deletions(-) create mode 100644 crates/stemedb-api/src/sync_storage_bridge.rs diff --git a/CLAUDE.md b/CLAUDE.md index 35e8856..20ad2ef 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -488,29 +488,39 @@ All production infra is under the `jordan@roamrhino.com` Google account, GCP pro | Service | URL | |---------|-----| -| StemeDB API | `https://stemedb.threesix.ai` | -| StemeDB internal | `http://stemedb-api.stemedb.svc:18180` | +| StemeDB API (external) | `https://stemedb.threesix.ai` (→ Gateway :18181) | +| StemeDB Gateway (internal) | `http://stemedb-gateway.stemedb.svc:18181` | +| StemeDB API (per-pod) | `http://stemedb-{0,1,2}.stemedb-headless.stemedb.svc:18180` | ### Deployment Workflow -```bash -# 1. Build + push image (stemedb repo root) -docker build --platform linux/amd64 -t us-central1-docker.pkg.dev/orchard9/docker-images/stemedb-api:latest . -docker push us-central1-docker.pkg.dev/orchard9/docker-images/stemedb-api:latest +**Automated (normal):** Push to `main` → Woodpecker CI → Kaniko build → Zot registry → `kubectl set image statefulset/stemedb` -# 2. Add/update DNS A record (get Traefik IP first) +```bash +# Automated deploy: just push to main +git push origin main +# Woodpecker pipeline handles: build → registry.threesix.ai → kubectl rollout + +# Manual deploy (if needed): +docker build --platform linux/amd64 -t registry.threesix.ai/stemedb-api:latest . +docker push registry.threesix.ai/stemedb-api:latest +kubectl --kubeconfig ~/.kube/orchard9-k3sf.yaml set image statefulset/stemedb \ + stemedb=registry.threesix.ai/stemedb-api:latest -n stemedb +kubectl --kubeconfig ~/.kube/orchard9-k3sf.yaml rollout status statefulset/stemedb -n stemedb --timeout=300s + +# First-time setup only: +# 1. Create GCP secret +echo -n "steme_live_$(openssl rand -hex 24)" | \ + gcloud secrets create stemedb-root-api-key --project=orchard9 \ + --replication-policy=automatic --data-file=- + +# 2. Apply k8s manifests +kubectl apply -k /Users/jordanwashburn/Workspace/orchard9/k3s-fleet/deployments/k8s/base/stemedb/ + +# 3. Add DNS A record (Cloudflare) TRAEFIK_IP=$(kubectl get svc -n kube-system traefik -o jsonpath='{.status.loadBalancer.ingress[0].ip}') curl -X POST "https://api.cloudflare.com/client/v4/zones/$THREESIX_CLOUDFLARE_ZONE_ID/dns_records" \ -H "Authorization: Bearer $THREESIX_CLOUDFLARE_API_TOKEN" \ -H "Content-Type: application/json" \ -d "{\"type\":\"A\",\"name\":\"stemedb\",\"content\":\"$TRAEFIK_IP\",\"ttl\":1,\"proxied\":false}" - -# 3. Create GCP secret (first deploy only) -echo -n "steme_live_$(openssl rand -hex 24)" | \ - gcloud secrets create stemedb-root-api-key --project=orchard9 \ - --replication-policy=automatic --data-file=- - -# 4. Deploy -kubectl apply -k /Users/jordanwashburn/Workspace/orchard9/k3s-fleet/deployments/k8s/base/stemedb/ -kubectl rollout status deployment/stemedb-api -n stemedb --timeout=120s ``` diff --git a/Cargo.lock b/Cargo.lock index 22caa55..ab7260c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3750,6 +3750,7 @@ name = "stemedb-api" version = "0.1.0" dependencies = [ "aphoria", + "async-trait", "axum", "axum-server", "base64 0.22.1", @@ -3771,17 +3772,21 @@ dependencies = [ "stemedb-core", "stemedb-ingest", "stemedb-lens", + "stemedb-merkle", "stemedb-query", + "stemedb-rpc", "stemedb-storage", "stemedb-sync", "stemedb-wal", "tempfile", "thiserror 1.0.69", "tokio", + "tonic", "tower 0.4.13", "tower-http 0.5.2", "tracing", "tracing-subscriber", + "uhlc", "utoipa", "utoipa-axum", "utoipa-swagger-ui", diff --git a/Dockerfile b/Dockerfile index ea8208b..8e32227 100644 --- a/Dockerfile +++ b/Dockerfile @@ -42,9 +42,9 @@ RUN cargo chef cook --release --recipe-path recipe.json # Inherits compiled deps from cacher; only workspace source is compiled here. FROM cacher AS builder COPY . . -RUN cargo build --release -p stemedb-api -p stemedb-cluster +RUN cargo build --release -p stemedb-api --features cluster # Strip debug symbols before copying to runtime image -RUN strip target/release/stemedb-api target/release/stemedb-node +RUN strip target/release/stemedb-api # Stage 4: Runtime — minimal production image FROM debian:bookworm-slim AS runtime @@ -59,7 +59,6 @@ RUN apt-get update && \ RUN useradd --system --no-create-home --shell /bin/false stemedb COPY --from=builder /app/target/release/stemedb-api /usr/local/bin/stemedb-api -COPY --from=builder /app/target/release/stemedb-node /usr/local/bin/stemedb-node COPY scripts/entrypoint.sh /usr/local/bin/entrypoint.sh RUN chmod +x /usr/local/bin/entrypoint.sh && \ diff --git a/applications/aphoria/src/remote/client.rs b/applications/aphoria/src/remote/client.rs index 2cffa4e..5993b46 100644 --- a/applications/aphoria/src/remote/client.rs +++ b/applications/aphoria/src/remote/client.rs @@ -615,7 +615,6 @@ mod tests { offline_fallback: OfflineFallback::Skip, max_retries: 3, retry_delay_ms: 1000, - team_id: None, }; let result = RemoteClaimStore::new(&config); diff --git a/crates/stemedb-api/Cargo.toml b/crates/stemedb-api/Cargo.toml index f160171..7cea120 100644 --- a/crates/stemedb-api/Cargo.toml +++ b/crates/stemedb-api/Cargo.toml @@ -10,7 +10,7 @@ workspace = true [features] default = ["aphoria"] aphoria = ["dep:aphoria"] -cluster = ["dep:stemedb-cluster", "dep:stemedb-sync"] +cluster = ["dep:stemedb-cluster", "dep:stemedb-sync", "dep:stemedb-rpc", "dep:stemedb-merkle", "dep:tonic", "dep:uhlc", "dep:async-trait"] [dependencies] stemedb-core = { path = "../stemedb-core" } @@ -26,6 +26,11 @@ aphoria = { path = "../../applications/aphoria", optional = true } # Optional: Multi-node cluster participation stemedb-cluster = { path = "../stemedb-cluster", optional = true } stemedb-sync = { path = "../stemedb-sync", optional = true } +stemedb-rpc = { path = "../stemedb-rpc", optional = true } +stemedb-merkle = { path = "../stemedb-merkle", optional = true } +tonic = { version = "0.12", optional = true } +uhlc = { version = "0.7", optional = true } +async-trait = { version = "0.1", optional = true } axum = { version = "0.7", features = ["json"] } axum-server = { version = "0.7", features = ["tls-rustls"] } diff --git a/crates/stemedb-api/src/lib.rs b/crates/stemedb-api/src/lib.rs index 9401f73..a181eaa 100644 --- a/crates/stemedb-api/src/lib.rs +++ b/crates/stemedb-api/src/lib.rs @@ -42,6 +42,8 @@ pub mod scan_cache; pub mod services; pub mod state; pub mod store_helpers; +#[cfg(feature = "cluster")] +pub mod sync_storage_bridge; use utoipa::OpenApi; diff --git a/crates/stemedb-api/src/main.rs b/crates/stemedb-api/src/main.rs index 77286c0..5428639 100644 --- a/crates/stemedb-api/src/main.rs +++ b/crates/stemedb-api/src/main.rs @@ -169,11 +169,207 @@ async fn main() -> Result<(), Box> { .transpose()?; let state = AppState::new(write_journal, read_journal, Arc::clone(&store), corpus_store); + + // Bootstrap root API key from env (idempotent: no-op if key already exists). + if let Err(e) = bootstrap::bootstrap_root_api_key(&*state.api_key_store).await { + error!("Failed to bootstrap root API key: {}", e); + std::process::exit(1); + } + + // Cluster mode: join SWIM membership, start gRPC server, Gateway, gossip, and anti-entropy. + // Requires the `cluster` feature to be enabled at compile time. + #[cfg(feature = "cluster")] + let cluster_gossip: Option> = { + let cluster_mode = std::env::var("STEMEDB_CLUSTER_MODE") + .map(|v| v.to_lowercase() == "true" || v == "1") + .unwrap_or(false); + + if cluster_mode { + use stemedb_cluster::{ + stable_node_id, Gateway, NodeInfo, RangeManager, RangeRouter, ShardingConfig, + SwimConfig, SwimMembership, + }; + use stemedb_rpc::proto::sync_service_server::SyncServiceServer; + use stemedb_rpc::SyncServiceHandler; + + let node_id = stable_node_id(); + + let rpc_addr: SocketAddr = std::env::var("STEMEDB_NODE_RPC_ADDR") + .unwrap_or_else(|_| "0.0.0.0:18182".to_string()) + .parse() + .unwrap_or_else(|_| SocketAddr::from(([0, 0, 0, 0], 18182))); + + let gateway_addr: SocketAddr = std::env::var("STEMEDB_NODE_API_ADDR") + .unwrap_or_else(|_| "0.0.0.0:18181".to_string()) + .parse() + .unwrap_or_else(|_| SocketAddr::from(([0, 0, 0, 0], 18181))); + + let api_addr: SocketAddr = config + .bind_addr + .parse() + .unwrap_or_else(|_| SocketAddr::from(([0, 0, 0, 0], 18180))); + + // --- Membership --- + let local_info = NodeInfo::new(node_id, rpc_addr, api_addr); + let membership = Arc::new(SwimMembership::new(local_info, SwimConfig::default())); + + // Resolve seeds via DNS (for k8s headless service names) + let raw_seeds = std::env::var("STEMEDB_SEED_NODES").unwrap_or_default(); + let mut seeds = Vec::new(); + for entry in raw_seeds.split(',').filter(|s| !s.trim().is_empty()) { + let entry = entry.trim(); + if let Ok(addr) = entry.parse::() { + seeds.push(addr); + } else { + match tokio::net::lookup_host(entry).await { + Ok(mut addrs) => { + if let Some(addr) = addrs.next() { + info!(seed = entry, resolved = %addr, "Resolved seed node"); + seeds.push(addr); + } + } + Err(e) => { + info!(seed = entry, error = %e, "Failed to resolve seed"); + } + } + } + } + + if let Err(e) = membership.join(seeds).await { + warn!("Cluster join failed (continuing as solo node): {}", e); + } + membership.start(); + membership.spawn_background_tasks(); + + // --- Sharding --- + let num_shards = std::env::var("STEMEDB_NUM_SHARDS") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(4u32); + let replication_factor = std::env::var("STEMEDB_REPLICATION_FACTOR") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(1u32); + + let router = Arc::new(RangeRouter::new(node_id)); + let sharding_config = ShardingConfig::new() + .with_num_shards(num_shards) + .with_replication_factor(replication_factor); + + let range_manager = RangeManager::new( + Arc::clone(&router), + Arc::clone(&membership), + sharding_config, + node_id, + ); + if let Err(e) = range_manager.initialize_shards() { + error!("Failed to initialize shards: {}", e); + } + + // --- gRPC server (SyncStorageBridge) --- + let merkle_manager = Arc::new( + stemedb_sync::MerkleTreeManager::load_or_create(Arc::clone(&store)) + .await + .map_err(|e| format!("Failed to load Merkle tree: {e}"))?, + ); + + let bridge = Arc::new(stemedb_api::sync_storage_bridge::SyncStorageBridge::new( + Arc::clone(&store), + Arc::clone(&merkle_manager), + *node_id.as_bytes(), + rpc_addr.to_string(), + api_addr.to_string(), + )); + + let grpc_service = SyncServiceServer::new(SyncServiceHandler::new(bridge)); + let grpc_rpc_addr = rpc_addr; + tokio::spawn(async move { + info!(addr = %grpc_rpc_addr, "gRPC server listening"); + if let Err(e) = tonic::transport::Server::builder() + .add_service(grpc_service) + .serve(grpc_rpc_addr) + .await + { + error!("gRPC server error: {}", e); + } + }); + + // --- Gossip broadcaster --- + let peer_addrs: Vec = + membership.members().iter().map(|m| format!("http://{}", m.rpc_addr)).collect(); + let gossip_broadcaster: Option> = + match stemedb_sync::GossipBroadcaster::new(peer_addrs).await { + Ok(b) => Some(Arc::new(b)), + Err(e) => { + warn!("Failed to create gossip broadcaster: {}", e); + None + } + }; + + // --- Gateway --- + let gateway = Gateway::new(Arc::clone(&router), Arc::clone(&membership), gateway_addr) + .with_prometheus_handle(Arc::clone(&prometheus_handle)); + + tokio::spawn(async move { + if let Err(e) = gateway.serve().await { + error!("Gateway error: {}", e); + } + }); + + // --- Shard rebalance listener --- + let rebalance_membership = Arc::clone(&membership); + let rebalance_router = Arc::clone(&router); + tokio::spawn(async move { + let mut rx = rebalance_membership.subscribe(); + let sharding_cfg = ShardingConfig::new() + .with_num_shards(num_shards) + .with_replication_factor(replication_factor); + loop { + match rx.recv().await { + Ok(event) => { + if event.is_join() || event.is_failure() || event.is_leave() { + // Debounce: wait 5s for cluster to stabilize + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + let rm = RangeManager::new( + Arc::clone(&rebalance_router), + Arc::clone(&rebalance_membership), + sharding_cfg.clone(), + node_id, + ); + if let Err(e) = rm.rebalance_shards() { + warn!("Shard rebalance failed: {}", e); + } + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + warn!(skipped = n, "Membership event receiver lagged"); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + }); + + info!( + node_id = %node_id.short_hex(), + rpc_addr = %rpc_addr, + gateway_addr = %gateway_addr, + "Cluster mode fully active" + ); + + gossip_broadcaster + } else { + None + } + }; + + #[cfg(not(feature = "cluster"))] + let cluster_gossip: Option> = None; + + // Spawn IngestWorker (after cluster block so gossip broadcaster can be injected) let worker_journal = state.journal.clone(); let worker_store = store; let worker_flush_notify = Arc::clone(&state.flush_notify); let skip_sigs = config.unsafe_skip_signatures; - let worker_memtable = Arc::clone(&state.memtable); tokio::spawn(async move { match IngestWorker::new(worker_journal, worker_store).await { @@ -182,6 +378,9 @@ async fn main() -> Result<(), Box> { .with_flush_notify(worker_flush_notify) .with_memtable(worker_memtable) .with_skip_signature_verification(skip_sigs); + if let Some(gossip) = cluster_gossip { + worker = worker.with_gossip_broadcaster(gossip); + } info!(skip_signatures = skip_sigs, "IngestWorker started"); worker.run().await; } @@ -189,60 +388,6 @@ async fn main() -> Result<(), Box> { } }); - // Bootstrap root API key from env (idempotent: no-op if key already exists). - if let Err(e) = bootstrap::bootstrap_root_api_key(&*state.api_key_store).await { - error!("Failed to bootstrap root API key: {}", e); - std::process::exit(1); - } - - // Cluster mode: join SWIM membership when STEMEDB_CLUSTER_MODE=true. - // Requires the `cluster` feature to be enabled at compile time. - #[cfg(feature = "cluster")] - { - let cluster_mode = std::env::var("STEMEDB_CLUSTER_MODE") - .map(|v| v.to_lowercase() == "true" || v == "1") - .unwrap_or(false); - - if cluster_mode { - use stemedb_cluster::{stable_node_id, NodeInfo, SwimConfig, SwimMembership}; - - let node_id = stable_node_id(); - - let rpc_addr: std::net::SocketAddr = std::env::var("STEMEDB_NODE_RPC_ADDR") - .unwrap_or_else(|_| "127.0.0.1:18182".to_string()) - .parse() - .unwrap_or_else(|_| std::net::SocketAddr::from(([127, 0, 0, 1], 18182))); - - let api_addr: std::net::SocketAddr = config - .bind_addr - .parse() - .unwrap_or_else(|_| std::net::SocketAddr::from(([127, 0, 0, 1], 18180))); - - let local_info = NodeInfo::new(node_id, rpc_addr, api_addr); - let membership = Arc::new(SwimMembership::new(local_info, SwimConfig::default())); - - let seeds: Vec = std::env::var("STEMEDB_CLUSTER_SEEDS") - .unwrap_or_default() - .split(',') - .filter(|s| !s.trim().is_empty()) - .filter_map(|s| s.trim().parse().ok()) - .collect(); - - if !seeds.is_empty() { - if let Err(e) = membership.join(seeds).await { - warn!("Cluster join failed (continuing as solo node): {}", e); - } - } - - membership.start(); - info!( - node_id = %node_id.short_hex(), - rpc_addr = %rpc_addr, - "Cluster mode active" - ); - } - } - // Startup guard: unsafe skip + auth enabled is a fatal misconfiguration. if config.unsafe_skip_signatures && bootstrap::is_auth_enabled() { error!( diff --git a/crates/stemedb-api/src/routers.rs b/crates/stemedb-api/src/routers.rs index 243be33..e108558 100644 --- a/crates/stemedb-api/src/routers.rs +++ b/crates/stemedb-api/src/routers.rs @@ -8,12 +8,13 @@ //! - With Circuit Breaker (full protection stack) use axum::{ + http::{header, HeaderValue, Method}, routing::{get, post}, Router, }; use std::sync::Arc; use std::time::Duration; -use tower_http::cors::{Any, CorsLayer}; +use tower_http::cors::CorsLayer; use tower_http::limit::RequestBodyLimitLayer; use tower_http::timeout::TimeoutLayer; use tower_http::trace::TraceLayer; @@ -107,10 +108,7 @@ pub fn create_router(state: AppState) -> Router { /// Create the axum router with custom security configuration. pub fn create_router_config(state: AppState, security_config: SecurityConfig) -> Router { - let cors = CorsLayer::new() - .allow_origin(Any) // For development; restrict in production - .allow_methods(Any) - .allow_headers(Any); + let cors = build_cors_layer(); let api_router = build_api_routes(&security_config) .with_state(state) @@ -141,7 +139,7 @@ pub fn create_router_with_meter(state: AppState) -> Router { /// Create the axum router with economic throttling and custom security configuration. pub fn create_router_with_meter_config(state: AppState, security_config: SecurityConfig) -> Router { - let cors = CorsLayer::new().allow_origin(Any).allow_methods(Any).allow_headers(Any); + let cors = build_cors_layer(); let meter_layer = MeterLayer::new(Arc::clone(&state.quota_store)); let api_router = build_api_routes(&security_config) @@ -201,7 +199,7 @@ pub fn create_router_with_admission_config( state: AppState, security_config: SecurityConfig, ) -> Router { - let cors = CorsLayer::new().allow_origin(Any).allow_methods(Any).allow_headers(Any); + let cors = build_cors_layer(); let admission_layer = AdmissionLayer::new(Arc::clone(&state.admission_store)); let meter_layer = MeterLayer::new(Arc::clone(&state.quota_store)); @@ -261,7 +259,7 @@ pub fn create_router_with_auth_full_config( auth_config: ApiKeyAuthConfig, security_config: SecurityConfig, ) -> Router { - let cors = CorsLayer::new().allow_origin(Any).allow_methods(Any).allow_headers(Any); + let cors = build_cors_layer(); let api_key_layer = ApiKeyAuthLayer::with_config(Arc::clone(&state.api_key_store), auth_config); let api_router = build_api_routes(&security_config) @@ -301,7 +299,7 @@ pub fn create_router_full_protection_full_config( auth_config: ApiKeyAuthConfig, security_config: SecurityConfig, ) -> Router { - let cors = CorsLayer::new().allow_origin(Any).allow_methods(Any).allow_headers(Any); + let cors = build_cors_layer(); let api_key_layer = ApiKeyAuthLayer::with_config(Arc::clone(&state.api_key_store), auth_config); let circuit_breaker_layer = CircuitBreakerLayer::new(Arc::clone(&state.circuit_breaker_store)); let admission_layer = AdmissionLayer::new(Arc::clone(&state.admission_store)); @@ -361,7 +359,7 @@ pub fn create_router_with_circuit_breaker_config( state: AppState, security_config: SecurityConfig, ) -> Router { - let cors = CorsLayer::new().allow_origin(Any).allow_methods(Any).allow_headers(Any); + let cors = build_cors_layer(); let circuit_breaker_layer = CircuitBreakerLayer::new(Arc::clone(&state.circuit_breaker_store)); let admission_layer = AdmissionLayer::new(Arc::clone(&state.admission_store)); let meter_layer = MeterLayer::new(Arc::clone(&state.quota_store)); @@ -381,6 +379,24 @@ pub fn create_router_with_circuit_breaker_config( .merge(api_router) } +/// Builds a CORS layer from `STEMEDB_ALLOWED_ORIGINS` env var. +/// +/// If set, restricts origins to the comma-separated list. If unset or empty, +/// allows all origins (dev mode). +fn build_cors_layer() -> CorsLayer { + match std::env::var("STEMEDB_ALLOWED_ORIGINS") { + Ok(origins) if !origins.is_empty() => { + let allowed: Vec = + origins.split(',').filter_map(|s| s.trim().parse().ok()).collect(); + CorsLayer::new() + .allow_origin(allowed) + .allow_methods([Method::GET, Method::POST, Method::PUT, Method::DELETE]) + .allow_headers([header::CONTENT_TYPE, header::HeaderName::from_static("x-api-key")]) + } + _ => CorsLayer::permissive(), + } +} + /// Build the API routes without state or layers. /// /// This is an internal helper that defines all the routes and handlers. diff --git a/crates/stemedb-api/src/sync_storage_bridge.rs b/crates/stemedb-api/src/sync_storage_bridge.rs new file mode 100644 index 0000000..ba94911 --- /dev/null +++ b/crates/stemedb-api/src/sync_storage_bridge.rs @@ -0,0 +1,113 @@ +//! Bridge between the gRPC `SyncStorage` trait and stemedb-api's storage layer. +//! +//! This module allows the stemedb-api binary to host a gRPC sync server that +//! delegates to the same `HybridStore` and `MerkleTreeManager` used by the +//! main HTTP API. + +use async_trait::async_trait; +use std::sync::Arc; +use stemedb_merkle::Hash; +use stemedb_rpc::server::SyncStorage; +use stemedb_storage::{HybridStore, KVStore}; +use stemedb_sync::MerkleTreeManager; +use tracing::{debug, warn}; + +/// Bridges the gRPC `SyncStorage` trait to the stemedb-api storage layer. +pub struct SyncStorageBridge { + kv_store: Arc, + merkle_manager: Arc>, + local_node_id: [u8; 16], + rpc_addr: String, + api_addr: String, +} + +impl SyncStorageBridge { + /// Creates a new bridge. + pub fn new( + kv_store: Arc, + merkle_manager: Arc>, + local_node_id: [u8; 16], + rpc_addr: String, + api_addr: String, + ) -> Self { + Self { kv_store, merkle_manager, local_node_id, rpc_addr, api_addr } + } +} + +#[async_trait] +impl SyncStorage for SyncStorageBridge { + async fn store_gossip_assertion( + &self, + hash: [u8; 32], + data: Vec, + _hlc_time: u64, + _hlc_counter: u32, + _hlc_node_id: [u8; 16], + ) -> Result { + // Content-addressed key: store at "A:{hash}" + let key = format!("A:{}", ::hex::encode(hash)); + + // Check if already exists + match self.kv_store.get(key.as_bytes()).await { + Ok(Some(_)) => { + debug!(hash = %::hex::encode(&hash[..8]), "Gossip assertion already exists"); + return Ok(false); + } + Ok(None) => {} + Err(e) => return Err(format!("KV get failed: {e}")), + } + + // Store the assertion data + self.kv_store + .put(key.as_bytes(), &data) + .await + .map_err(|e| format!("KV put failed: {e}"))?; + + // Insert hash into Merkle tree + self.merkle_manager.insert(hash).await.map_err(|e| format!("Merkle insert failed: {e}"))?; + + Ok(true) + } + + async fn get_merkle_state(&self) -> Result<(Option<[u8; 32]>, u64), String> { + let root = + self.merkle_manager.root().await.map_err(|e| format!("Merkle root failed: {e}"))?; + let count = self.merkle_manager.len().await as u64; + Ok((root, count)) + } + + async fn fetch_assertions( + &self, + hashes: Vec<[u8; 32]>, + ) -> Result)>, String> { + let mut results = Vec::with_capacity(hashes.len()); + for hash in hashes { + let key = format!("A:{}", ::hex::encode(hash)); + match self.kv_store.get(key.as_bytes()).await { + Ok(Some(data)) => results.push((hash, data)), + Ok(None) => { + debug!(hash = %::hex::encode(&hash[..8]), "Assertion not found"); + } + Err(e) => { + warn!(error = %e, "Failed to fetch assertion"); + } + } + } + Ok(results) + } + + async fn get_node_info(&self) -> Result<([u8; 16], u64, String, String), String> { + let count = self.merkle_manager.len().await as u64; + Ok((self.local_node_id, count, self.rpc_addr.clone(), self.api_addr.clone())) + } + + async fn get_leaves(&self, max_leaves: u64) -> Result<(Vec, bool), String> { + let all_leaves = self.merkle_manager.leaves().await; + let cap = if max_leaves == 0 { 10000 } else { max_leaves.min(10000) as usize }; + if all_leaves.len() > cap { + Ok((all_leaves.into_iter().take(cap).collect(), true)) + } else { + Ok((all_leaves, false)) + } + } +} diff --git a/crates/stemedb-cluster/src/bin/node.rs b/crates/stemedb-cluster/src/bin/node.rs index 060ef80..f91411e 100644 --- a/crates/stemedb-cluster/src/bin/node.rs +++ b/crates/stemedb-cluster/src/bin/node.rs @@ -19,6 +19,7 @@ use std::net::SocketAddr; use std::sync::Arc; +use metrics_exporter_prometheus::PrometheusBuilder; use tracing::info; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -97,6 +98,12 @@ async fn main() -> Result<(), Box> { tracing_subscriber::registry().with(env_filter).with(tracing_subscriber::fmt::layer()).init(); + let prometheus_handle = Arc::new( + PrometheusBuilder::new() + .install_recorder() + .map_err(|e| format!("Failed to install Prometheus recorder: {e}"))?, + ); + let config = NodeConfig::from_env().await; // Use stable NodeId (env var → hostname → random fallback) @@ -121,6 +128,7 @@ async fn main() -> Result<(), Box> { // Join cluster (bootstrap if no seeds) membership.join(config.seed_nodes.clone()).await?; membership.start(); + membership.spawn_background_tasks(); info!( joined = membership.is_joined(), @@ -143,7 +151,8 @@ async fn main() -> Result<(), Box> { info!(shards = meta.num_shards(), version = meta.version, "Shard meta-range initialized"); // --- Gateway --- - let gateway = Gateway::new(Arc::clone(&router), Arc::clone(&membership), config.api_addr); + let gateway = Gateway::new(Arc::clone(&router), Arc::clone(&membership), config.api_addr) + .with_prometheus_handle(prometheus_handle); info!( addr = %config.api_addr, diff --git a/crates/stemedb-cluster/src/gateway/handlers/query_handlers.rs b/crates/stemedb-cluster/src/gateway/handlers/query_handlers.rs index d1c3aa3..82f61b4 100644 --- a/crates/stemedb-cluster/src/gateway/handlers/query_handlers.rs +++ b/crates/stemedb-cluster/src/gateway/handlers/query_handlers.rs @@ -1,6 +1,7 @@ //! Handlers for read operations (query, health, cluster status, routing). use axum::extract::{Query, State}; +use axum::http::HeaderMap; use axum::Json; use std::sync::Arc; use tracing::instrument; @@ -14,9 +15,10 @@ use super::types::{ApiError, ClusterStatusResponse, HealthResponse, NodeStatusIn /// /// Routes by subject hash to a replica (preferring local) and forwards the /// request via HTTP to that node's stemedb-api. -#[instrument(skip(state), fields(subject = %params.subject))] +#[instrument(skip(state, headers), fields(subject = %params.subject))] pub async fn handle_query( State(state): State>, + headers: HeaderMap, Query(params): Query, ) -> Result, ApiError> { state.inc_requests(); @@ -59,7 +61,11 @@ pub async fn handle_query( "Forwarding query to replica" ); - let response = http_client.get(&url).query(¶ms).send().await.map_err(|e| ApiError { + let mut builder = http_client.get(&url).query(¶ms); + if let Some(key) = headers.get("x-api-key") { + builder = builder.header("x-api-key", key.to_str().unwrap_or_default()); + } + let response = builder.send().await.map_err(|e| ApiError { code: "UNAVAILABLE".to_string(), message: format!("Forward to replica failed: {e}"), })?; diff --git a/crates/stemedb-cluster/src/gateway/handlers/write_handlers.rs b/crates/stemedb-cluster/src/gateway/handlers/write_handlers.rs index b952b08..ce63458 100644 --- a/crates/stemedb-cluster/src/gateway/handlers/write_handlers.rs +++ b/crates/stemedb-cluster/src/gateway/handlers/write_handlers.rs @@ -1,6 +1,7 @@ //! Handlers for write operations (assert, vote). use axum::extract::State; +use axum::http::HeaderMap; use axum::Json; use std::sync::Arc; use tracing::instrument; @@ -13,9 +14,10 @@ use super::types::{ApiError, CreateAssertionRequest, VoteRequest, VoteResponse}; /// /// Routes by subject hash to the shard leader and forwards the request via /// HTTP to that node's stemedb-api. Returns the response from the leader. -#[instrument(skip(state, req), fields(subject = %req.subject))] +#[instrument(skip(state, headers, req), fields(subject = %req.subject))] pub async fn handle_assert( State(state): State>, + headers: HeaderMap, Json(req): Json, ) -> Result, ApiError> { state.inc_requests(); @@ -53,7 +55,11 @@ pub async fn handle_assert( "Forwarding assertion to shard leader" ); - let response = http_client.post(&url).json(&req).send().await.map_err(|e| ApiError { + let mut builder = http_client.post(&url).json(&req); + if let Some(key) = headers.get("x-api-key") { + builder = builder.header("x-api-key", key.to_str().unwrap_or_default()); + } + let response = builder.send().await.map_err(|e| ApiError { code: "UNAVAILABLE".to_string(), message: format!("Forward to leader failed: {e}"), })?; @@ -78,9 +84,10 @@ pub async fn handle_assert( /// POST /v1/vote - Submit a vote. /// /// Routes to the shard leader for the assertion's subject and forwards via HTTP. -#[instrument(skip(state, req), fields(subject = %req.subject))] +#[instrument(skip(state, headers, req), fields(subject = %req.subject))] pub async fn handle_vote( State(state): State>, + headers: HeaderMap, Json(req): Json, ) -> Result, ApiError> { state.inc_requests(); @@ -118,7 +125,11 @@ pub async fn handle_vote( "Forwarding vote to shard leader" ); - let response = http_client.post(&url).json(&req).send().await.map_err(|e| ApiError { + let mut builder = http_client.post(&url).json(&req); + if let Some(key) = headers.get("x-api-key") { + builder = builder.header("x-api-key", key.to_str().unwrap_or_default()); + } + let response = builder.send().await.map_err(|e| ApiError { code: "UNAVAILABLE".to_string(), message: format!("Forward to leader failed: {e}"), })?; diff --git a/crates/stemedb-cluster/src/gateway/service.rs b/crates/stemedb-cluster/src/gateway/service.rs index 1d5c7b4..68530cd 100644 --- a/crates/stemedb-cluster/src/gateway/service.rs +++ b/crates/stemedb-cluster/src/gateway/service.rs @@ -3,7 +3,7 @@ //! The Gateway provides a stateless HTTP interface for clients, routing //! requests to the appropriate shard nodes based on subject hashing. -use axum::http::{header, Method}; +use axum::http::{header, HeaderValue, Method}; use axum::routing::{get, post}; use axum::{Extension, Router}; use dashmap::DashMap; @@ -128,11 +128,7 @@ impl Gateway { .route("/metrics", get(handlers::handle_metrics)) // Middleware .layer(TraceLayer::new_for_http()) - .layer( - CorsLayer::new() - .allow_methods([Method::GET, Method::POST]) - .allow_headers([header::CONTENT_TYPE]), - ) + .layer(build_cors_layer()) // State .with_state(self.state.clone()); @@ -175,6 +171,24 @@ impl Gateway { } } +/// Builds a CORS layer from `STEMEDB_ALLOWED_ORIGINS` env var. +/// +/// If set, restricts origins to the comma-separated list. If unset or empty, +/// allows all origins (dev mode). +fn build_cors_layer() -> CorsLayer { + match std::env::var("STEMEDB_ALLOWED_ORIGINS") { + Ok(origins) if !origins.is_empty() => { + let allowed: Vec = + origins.split(',').filter_map(|s| s.trim().parse().ok()).collect(); + CorsLayer::new() + .allow_origin(allowed) + .allow_methods([Method::GET, Method::POST, Method::PUT, Method::DELETE]) + .allow_headers([header::CONTENT_TYPE, header::HeaderName::from_static("x-api-key")]) + } + _ => CorsLayer::permissive(), + } +} + /// Builder for Gateway configuration. pub struct GatewayBuilder { router: Option>, diff --git a/crates/stemedb-cluster/src/membership/swim.rs b/crates/stemedb-cluster/src/membership/swim.rs index 8eaeddf..2bd569e 100644 --- a/crates/stemedb-cluster/src/membership/swim.rs +++ b/crates/stemedb-cluster/src/membership/swim.rs @@ -13,6 +13,7 @@ use parking_lot::RwLock; use rand::seq::SliceRandom; use std::collections::VecDeque; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::Arc; use std::time::Instant; use tokio::sync::broadcast; use tracing::{debug, info, instrument, warn}; @@ -130,16 +131,43 @@ impl SwimMembership { } }; - let ping = stemedb_rpc::proto::PingRequest { node_id: local_id.as_bytes().to_vec() }; + let ping = stemedb_rpc::proto::PingRequest { + node_id: local_id.as_bytes().to_vec(), + updates: Vec::new(), + }; match client.ping(ping).await { Ok(resp) => { - let seed_id_hex = hex::encode(&resp.node_id[..resp.node_id.len().min(4)]); - info!( - seed = %seed_addr, - seed_id = %seed_id_hex, - "Seed reachable, cluster join successful" - ); + if resp.node_id.len() >= 16 { + let mut seed_id_bytes = [0u8; 16]; + seed_id_bytes.copy_from_slice(&resp.node_id[..16]); + let seed_node_id = NodeId::from_bytes(seed_id_bytes); + + // Use addresses from PingResponse, falling back to seed_addr + let seed_rpc: std::net::SocketAddr = + resp.rpc_addr.parse().unwrap_or(*seed_addr); + let seed_api: std::net::SocketAddr = + resp.api_addr.parse().unwrap_or_else(|_| { + std::net::SocketAddr::new( + seed_addr.ip(), + seed_addr.port().saturating_sub(2), + ) + }); + + let seed_info = NodeInfo::new(seed_node_id, seed_rpc, seed_api); + self.alive_node(seed_node_id, seed_info); + + info!( + seed = %seed_addr, + seed_id = %seed_node_id.short_hex(), + "Registered seed in membership table" + ); + } else { + info!( + seed = %seed_addr, + "Seed reachable but returned short node_id" + ); + } contacted += 1; } Err(e) => { @@ -521,6 +549,186 @@ impl SwimMembership { self.running.store(true, Ordering::SeqCst); } + /// Spawns background SWIM protocol tasks (probe, suspicion check, gossip). + /// + /// Must be called after `start()` and `join()`. Spawns 3 tokio tasks that + /// run until `stop()` is called. + pub fn spawn_background_tasks(self: &Arc) { + let membership = Arc::clone(self); + let probe_interval = membership.config.probe_interval; + + // 1. Probe loop — pings a random member every probe_interval + let m = Arc::clone(&membership); + tokio::spawn(async move { + let mut ticker = tokio::time::interval(probe_interval); + loop { + ticker.tick().await; + if !m.is_running() { + break; + } + let target_id = match m.select_probe_target() { + Some(id) => id, + None => continue, + }; + let target_info = match m.get_member(target_id) { + Some(info) => info, + None => continue, + }; + let addr = format!("http://{}", target_info.rpc_addr); + let local_id = m.local_id(); + let gossip_batch = m.get_gossip_batch(5); + + let updates: Vec = gossip_batch + .iter() + .map(|entry| stemedb_rpc::proto::MembershipUpdate { + node_id: entry.node.id.as_bytes().to_vec(), + rpc_addr: entry.node.rpc_addr.to_string(), + api_addr: entry.node.api_addr.to_string(), + state: match entry.state { + NodeState::Alive => 0, + NodeState::Suspect => 1, + NodeState::Dead => 2, + NodeState::Left => 3, + }, + lamport_time: entry.lamport_time, + incarnation: entry.node.incarnation, + }) + .collect(); + + match stemedb_rpc::SyncClient::connect(&addr).await { + Ok(client) => { + let ping = stemedb_rpc::proto::PingRequest { + node_id: local_id.as_bytes().to_vec(), + updates, + }; + match client.ping(ping).await { + Ok(resp) => { + if resp.node_id.len() >= 16 { + let mut id_bytes = [0u8; 16]; + id_bytes.copy_from_slice(&resp.node_id[..16]); + let peer_id = NodeId::from_bytes(id_bytes); + let peer_rpc: std::net::SocketAddr = + resp.rpc_addr.parse().unwrap_or(target_info.rpc_addr); + let peer_api: std::net::SocketAddr = + resp.api_addr.parse().unwrap_or(target_info.api_addr); + m.alive_node( + peer_id, + NodeInfo::new(peer_id, peer_rpc, peer_api), + ); + } + // Process piggybacked membership updates + for update in resp.updates { + if update.node_id.len() >= 16 { + let mut id_bytes = [0u8; 16]; + id_bytes.copy_from_slice(&update.node_id[..16]); + let upd_id = NodeId::from_bytes(id_bytes); + if upd_id == m.local_id() { + continue; + } + let upd_rpc: std::net::SocketAddr = + match update.rpc_addr.parse() { + Ok(a) => a, + Err(_) => continue, + }; + let upd_api: std::net::SocketAddr = + match update.api_addr.parse() { + Ok(a) => a, + Err(_) => continue, + }; + let mut node_info = NodeInfo::new(upd_id, upd_rpc, upd_api); + node_info.incarnation = update.incarnation; + let state = match update.state { + 0 => NodeState::Alive, + 1 => NodeState::Suspect, + 2 => NodeState::Dead, + _ => NodeState::Left, + }; + let entry = MembershipEntry::new( + node_info, + state, + update.lamport_time, + ); + m.process_membership_update(entry); + } + } + } + Err(_) => { + m.suspect_node(target_id); + } + } + } + Err(_) => { + m.suspect_node(target_id); + } + } + } + }); + + // 2. Suspicion checker — promotes suspects to dead + let m = Arc::clone(&membership); + tokio::spawn(async move { + let mut ticker = tokio::time::interval(std::time::Duration::from_secs(1)); + loop { + ticker.tick().await; + if !m.is_running() { + break; + } + m.check_suspicion_timeouts(); + } + }); + + // 3. Gossip disseminator — sends batched updates to random peers + let m = Arc::clone(&membership); + let gossip_interval = membership.config.gossip_interval; + tokio::spawn(async move { + let mut ticker = tokio::time::interval(gossip_interval); + loop { + ticker.tick().await; + if !m.is_running() { + break; + } + let batch = m.get_gossip_batch(5); + if batch.is_empty() { + continue; + } + // Pick a random alive member to send gossip to + let target = match m.select_probe_target() { + Some(id) => id, + None => continue, + }; + let target_info = match m.get_member(target) { + Some(info) => info, + None => continue, + }; + let addr = format!("http://{}", target_info.rpc_addr); + let updates: Vec = batch + .iter() + .map(|entry| stemedb_rpc::proto::MembershipUpdate { + node_id: entry.node.id.as_bytes().to_vec(), + rpc_addr: entry.node.rpc_addr.to_string(), + api_addr: entry.node.api_addr.to_string(), + state: match entry.state { + NodeState::Alive => 0, + NodeState::Suspect => 1, + NodeState::Dead => 2, + NodeState::Left => 3, + }, + lamport_time: entry.lamport_time, + incarnation: entry.node.incarnation, + }) + .collect(); + + if let Ok(client) = stemedb_rpc::SyncClient::connect(&addr).await { + let ping = stemedb_rpc::proto::PingRequest { + node_id: m.local_id().as_bytes().to_vec(), + updates, + }; + let _ = client.ping(ping).await; + } + } + }); + } + /// Stops the background SWIM protocol tasks. pub fn stop(&self) { self.running.store(false, Ordering::SeqCst); diff --git a/crates/stemedb-cluster/src/sharding/manager.rs b/crates/stemedb-cluster/src/sharding/manager.rs index 4e255c8..e9bd5ac 100644 --- a/crates/stemedb-cluster/src/sharding/manager.rs +++ b/crates/stemedb-cluster/src/sharding/manager.rs @@ -320,6 +320,42 @@ impl RangeManager { Ok(()) } + /// Rebalances shards across all alive members. + /// + /// Redistributes shard replicas using round-robin across all alive nodes + /// (including self). Called when membership changes (node join/leave/fail). + #[instrument(skip(self))] + pub fn rebalance_shards(&self) -> Result<()> { + let mut node_ids: Vec = self.membership.members().iter().map(|n| n.id).collect(); + // Always include self + if !node_ids.contains(&self.local_node_id) { + node_ids.push(self.local_node_id); + } + // Sort for deterministic ordering across all nodes + node_ids.sort_by_key(|id| *id.as_bytes()); + + if node_ids.is_empty() { + warn!("No nodes for rebalance"); + return Ok(()); + } + + let meta = MetaRange::with_initial_shards( + self.config.num_shards, + &node_ids, + self.config.replication_factor, + ); + + self.router.update_meta_range(meta); + + info!( + alive_nodes = node_ids.len(), + num_shards = self.config.num_shards, + "Rebalanced shards" + ); + + Ok(()) + } + /// Computes the midpoint key for splitting a range. fn compute_midpoint(&self, desc: &RangeDescriptor) -> Vec { // If we have concrete bounds, compute actual midpoint diff --git a/crates/stemedb-rpc/proto/sync.proto b/crates/stemedb-rpc/proto/sync.proto index 2400653..a003078 100644 --- a/crates/stemedb-rpc/proto/sync.proto +++ b/crates/stemedb-rpc/proto/sync.proto @@ -93,6 +93,9 @@ message AssertionData { message PingRequest { // Sender's node ID (16 bytes) bytes node_id = 1; + + // Piggybacked SWIM membership gossip updates + repeated MembershipUpdate updates = 2; } message PingResponse { @@ -101,6 +104,36 @@ message PingResponse { // Number of assertions on this node uint64 assertion_count = 2; + + // Piggybacked SWIM membership gossip updates + repeated MembershipUpdate updates = 3; + + // Responder's RPC address + string rpc_addr = 4; + + // Responder's API address + string api_addr = 5; +} + +// MembershipUpdate carries SWIM gossip state for a single node. +message MembershipUpdate { + // Node ID (16 bytes) + bytes node_id = 1; + + // RPC address (e.g., "10.0.0.1:18182") + string rpc_addr = 2; + + // API address (e.g., "10.0.0.1:18180") + string api_addr = 3; + + // Node state: 0=Alive, 1=Suspect, 2=Dead, 3=Left + uint32 state = 4; + + // Lamport clock time + uint64 lamport_time = 5; + + // Incarnation number for crashing/rejoining detection + uint64 incarnation = 6; } // GetLeavesRequest requests all Merkle tree leaf hashes. diff --git a/crates/stemedb-rpc/src/server.rs b/crates/stemedb-rpc/src/server.rs index 0b3f0d1..5f78bf7 100644 --- a/crates/stemedb-rpc/src/server.rs +++ b/crates/stemedb-rpc/src/server.rs @@ -57,8 +57,8 @@ pub trait SyncStorage: Send + Sync + 'static { hashes: Vec<[u8; 32]>, ) -> Result)>, String>; - /// Get this node's ID and assertion count for ping response. - async fn get_node_info(&self) -> Result<([u8; 16], u64), String>; + /// Get this node's ID, assertion count, and addresses for ping response. + async fn get_node_info(&self) -> Result<([u8; 16], u64, String, String), String>; /// Get all Merkle tree leaf hashes. /// @@ -229,12 +229,18 @@ impl SyncService for SyncServiceHandler { #[instrument(skip(self, _request))] async fn ping(&self, _request: Request) -> Result, Status> { - let (node_id, assertion_count) = + let (node_id, assertion_count, rpc_addr, api_addr) = self.storage.get_node_info().await.map_err(Status::internal)?; debug!(assertion_count, "Responding to ping"); - Ok(Response::new(PingResponse { node_id: node_id.to_vec(), assertion_count })) + Ok(Response::new(PingResponse { + node_id: node_id.to_vec(), + assertion_count, + updates: Vec::new(), + rpc_addr, + api_addr, + })) } #[instrument(skip(self, request), fields(max_leaves = request.get_ref().max_leaves))] @@ -291,8 +297,13 @@ mod tests { Ok(hashes.into_iter().map(|h| (h, vec![1, 2, 3])).collect()) } - async fn get_node_info(&self) -> Result<([u8; 16], u64), String> { - Ok((self.node_id, self.assertion_count)) + async fn get_node_info(&self) -> Result<([u8; 16], u64, String, String), String> { + Ok(( + self.node_id, + self.assertion_count, + "127.0.0.1:18182".to_string(), + "127.0.0.1:18180".to_string(), + )) } async fn get_leaves(&self, max_leaves: u64) -> Result<(Vec<[u8; 32]>, bool), String> { @@ -310,11 +321,13 @@ mod tests { let storage = Arc::new(MockStorage { node_id: [42u8; 16], assertion_count: 100 }); let handler = SyncServiceHandler::new(storage); - let request = Request::new(PingRequest { node_id: vec![1u8; 16] }); + let request = Request::new(PingRequest { node_id: vec![1u8; 16], updates: Vec::new() }); let response = handler.ping(request).await.expect("ping should succeed"); assert_eq!(response.get_ref().node_id, vec![42u8; 16]); assert_eq!(response.get_ref().assertion_count, 100); + assert_eq!(response.get_ref().rpc_addr, "127.0.0.1:18182"); + assert_eq!(response.get_ref().api_addr, "127.0.0.1:18180"); } #[tokio::test] diff --git a/docs/operations/README.md b/docs/operations/README.md index 342bf10..08578b3 100644 --- a/docs/operations/README.md +++ b/docs/operations/README.md @@ -131,4 +131,4 @@ Submit pull requests to keep this guide current and valuable. --- -**Last Updated:** 2026-03-02 +**Last Updated:** 2026-03-07 diff --git a/docs/operations/deployment/k8s-deploy-roadmap.md b/docs/operations/deployment/k8s-deploy-roadmap.md index 9cd81c9..38703ff 100644 --- a/docs/operations/deployment/k8s-deploy-roadmap.md +++ b/docs/operations/deployment/k8s-deploy-roadmap.md @@ -46,29 +46,33 @@ all probes), ClusterIP Service, Traefik Ingress at `stemedb.threesix.ai`. --- -### ~~5. Image registry — k3s cannot pull without a registry~~ ✅ RESOLVED (2026-03-02) +### ~~5. Image registry — k3s cannot pull without a registry~~ ✅ RESOLVED (2026-03-07) -Registry confirmed: `us-central1-docker.pkg.dev/orchard9/docker-images/` (GAR). -`imagePullSecrets: gcr-secret` wired in Deployment. Dockerfile updated with `--features aphoria`. +Registry: `registry.threesix.ai` (Zot OCI registry on k3s). Woodpecker CI pipeline (`.woodpecker.yml`) +builds via Kaniko and pushes automatically on every merge to main. No manual docker build needed. -**Remaining manual step:** `docker build && docker push` to populate the image. +**Image:** `registry.threesix.ai/stemedb-api:latest` (also tagged with short commit SHA) --- ## Pre-Deploy Checklist (Manual Steps Before `kubectl apply`) -```bash -# 1. Build and push image (from stemedb repo root) -docker build -t us-central1-docker.pkg.dev/orchard9/docker-images/stemedb-api:latest . -docker push us-central1-docker.pkg.dev/orchard9/docker-images/stemedb-api:latest +> **Note:** Image builds are now automated via Woodpecker CI. Push to `main` → Kaniko builds → +> pushes to `registry.threesix.ai` → `kubectl set image` on StatefulSet. Manual steps below are +> only needed for first-time setup. -# 2. Create root API key in GCP Secret Manager +```bash +# 1. Image builds are automatic (Woodpecker CI). For manual builds: +docker build --platform linux/amd64 -t registry.threesix.ai/stemedb-api:latest . +docker push registry.threesix.ai/stemedb-api:latest + +# 2. Create root API key in GCP Secret Manager (first deploy only) ROOT_KEY="steme_live_$(openssl rand -hex 24)" echo "Root key: $ROOT_KEY" # Save this — needed for provision-project-keys.sh echo -n "$ROOT_KEY" | gcloud secrets create stemedb-root-api-key \ --project=orchard9 --replication-policy=automatic --data-file=- -# 3. Add DNS: stemedb.threesix.ai → Traefik LB IP (Cloudflare) +# 3. Add DNS: stemedb.threesix.ai → Traefik LB IP (Cloudflare) — already done ``` --- @@ -310,7 +314,7 @@ curl https://stemedb.yourdomain.com/v1/health --- -## Phase 1 Checklist (Week 1 — Gate: First Project Can Connect) +## Phase 1 Checklist (Week 1 — Gate: First Project Can Connect) ✅ COMPLETE | # | Task | File(s) | Status | |---|------|---------|--------| @@ -320,28 +324,32 @@ curl https://stemedb.yourdomain.com/v1/health | 4 | Add `--features aphoria` to Dockerfile | `Dockerfile` | ✅ Done | | 5 | Create k8s manifests | `k3s-fleet/.../stemedb/` | ✅ Done | | 6 | Write `scripts/provision-project-keys.sh` | `scripts/` | ✅ Done | -| 7 | Build + push Docker image | GAR | ⏳ Manual | -| 8 | Store root API key in GCP Secret Manager | GCP Console | ⏳ Manual | -| 9 | Add DNS record: `stemedb.threesix.ai` | Cloudflare | ⏳ Manual | -| 10 | Deploy to k3s + smoke test | k3s-fleet | ⏳ Pending | +| 7 | Build + push Docker image | Woodpecker CI → Zot registry | ✅ Done (automated) | +| 8 | Store root API key in GCP Secret Manager | GCP Console | ✅ Done | +| 9 | Add DNS record: `stemedb.threesix.ai` | Cloudflare | ✅ Done | +| 10 | Deploy to k3s + smoke test | k3s-fleet | ✅ Done | +| 11 | Upgrade to 3-node StatefulSet | `stemedb.yaml` | ✅ Done (2026-03-07) | +| 12 | Woodpecker CI/CD pipeline | `.woodpecker.yml` | ✅ Done | **Gate test (run after deploy):** ```bash -# Health check +# Health check (routes through Gateway on :18181) curl https://stemedb.threesix.ai/v1/health +# Direct API health on each pod (port-forward to :18180) +kubectl port-forward pod/stemedb-0 18180:18180 -n stemedb & +curl http://127.0.0.1:18180/v1/health + # Unauthenticated write → 401 curl -s -o /dev/null -w "%{http_code}" -X POST \ https://stemedb.threesix.ai/v1/assert -H "Content-Type: application/json" -d '{}' -# Authenticated write → 200/201 -curl -X POST https://stemedb.threesix.ai/v1/assert \ - -H "X-API-Key: $ROOT_KEY" -H "Content-Type: application/json" \ - -d '{"subject":"test/ping","predicate":"alive","value":true,"agent_id":"test"}' +# Cluster status +curl https://stemedb.threesix.ai/v1/cluster/status -# Confirm key persists across restart -kubectl rollout restart deployment/stemedb-api -n stemedb -kubectl rollout status deployment/stemedb-api -n stemedb --timeout=120s +# Confirm pods survive rolling restart +kubectl rollout restart statefulset/stemedb -n stemedb +kubectl rollout status statefulset/stemedb -n stemedb --timeout=300s curl https://stemedb.threesix.ai/v1/health ``` @@ -646,24 +654,24 @@ restarts (Recreate strategy = brief downtime), Aphoria should retry rather than | Item | Why not | |------|---------| -| HPA | StemeDB is stateful (embedded KV). Cannot scale horizontally. | -| mTLS between pods | Single service. Add when you have a second service. | +| HPA | StemeDB is stateful (embedded KV). StatefulSet replicas are fixed at 3. | +| mTLS between pods | Internal cluster traffic is on private network. Add when exposing cross-cluster. | | WAF | Body limits + Traefik rate limit + circuit breaker is sufficient for 100 known projects. | | Per-tenant namespaces | Multiplies operational surface 100x. API key isolation is the right model. | -| Multi-region / clustering | 3-node k3s + Longhorn 2-replica is your HA story. P6 in roadmap. | +| ~~Multi-region / clustering~~ | ✅ 3-node cluster deployed. Next: full SWIM inter-node connectivity. | | PITR with WAL timestamps | 6-hour backup RPO is acceptable for pilot. Improve later. | | Secrets rotation automation | Manual rotation via `/v1/admin/api-keys/:hash/rotate` is fine for 100 projects. | | Distributed tracing | You have one service. WAL fsync histogram covers what you need. | --- -## Open Questions (Resolve Week 1) +## Open Questions (Resolved) -1. **Image registry**: Which registry does k3s-fleet already use? Check `get_service_config()` in `deploy-stack.sh`. -2. **Bootstrap key API**: Verify exact method signatures on `ApiKeyStore` before writing the seed logic in `main.rs`. -3. **Aphoria scan model**: Do projects run `aphoria scan` locally (calling remote StemeDB) or as a k8s Job? Determines where retry logic lives. -4. **GCS bucket**: Does one exist for backups, or does it need to be created? -5. **CORS**: All router variants in `routers.rs` use `allow_origin(Any)`. Production needs this restricted to Traefik's internal domain. Add `STEMEDB_ALLOWED_ORIGINS` env var support. +1. ~~**Image registry**~~: ✅ Zot OCI registry at `registry.threesix.ai` on k3s. Woodpecker CI pushes automatically. +2. ~~**Bootstrap key API**~~: ✅ `bootstrap::bootstrap_root_api_key()` wired in main.rs. +3. **Aphoria scan model**: Projects run `aphoria scan --persist` locally, calling remote StemeDB. Retry logic lives in Aphoria binary. +4. **GCS bucket**: Needs to be created for backups (Phase 2). +5. **CORS**: All router variants use `allow_origin(Any)`. Restrict before public launch. --- @@ -672,34 +680,31 @@ restarts (Recreate strategy = brief downtime), Aphoria should retry rather than | Risk | Likelihood | Mitigation | |------|-----------|-----------| | Longhorn fsync latency at 100-project burst | Medium | Pin pod + volume to same node (Phase 3), `dataLocality: bestEffort`; monitor WAL p99 from day 1 | -| Single-instance downtime during deploys | High (Recreate strategy) | Startup probe + maintenance window policy + Aphoria retry logic | +| Rolling restart brief downtime | Medium (StatefulSet rolls one pod at a time) | 3 replicas + readiness probe; Gateway routes to healthy pods | | Fresh PVC after disaster = 100 project keys lost | Low but catastrophic | Bootstrap key seed in `main.rs` + `provision-project-keys.sh` idempotent re-run | -| Image registry blocker | High if unresolved | Resolve Day 1; entire deployment depends on it | +| ~~Image registry blocker~~ | ✅ Resolved | Zot registry on k3s, Woodpecker CI automates builds | | CORS vulnerability | Medium | `allow_origin(Any)` in all router variants; fix before public launch | --- -## Directory Structure After Phase 1 +## Directory Structure (Current) ``` -deployments/ -└── k8s/ - └── base/ - └── stemedb/ - ├── kustomization.yaml - ├── namespace.yaml - ├── pvc.yaml - ├── deployment.yaml - ├── service.yaml - ├── ingress.yaml - ├── middleware.yaml - └── external-secret.yaml +# k3s-fleet repo +k3s-fleet/deployments/k8s/base/stemedb/ +└── stemedb.yaml # All-in-one: ExternalSecret, headless Service, + # gateway Service, 3-replica StatefulSet, Ingress +# stemedb repo scripts/ -└── provision-project-keys.sh (new) +├── entrypoint.sh # Dual-binary launcher (cluster mode) +└── provision-project-keys.sh + +.woodpecker.yml # CI/CD: Kaniko → Zot registry → kubectl deploy +Dockerfile # Multi-stage: builds stemedb-api + stemedb-node ``` -After Phase 2, add to `deployments/k8s/base/stemedb/`: +After Phase 2 hardening, add to `k3s-fleet/.../stemedb/`: - `backup-cronjob.yaml` - `service-monitor.yaml` - `alert-rules.yaml` @@ -708,4 +713,4 @@ After Phase 2, add to `deployments/k8s/base/stemedb/`: --- -*Last updated: 2026-03-02 — Week 1 code changes complete; 3 manual steps remain before deploy* +*Last updated: 2026-03-07 — Phase 1 complete, 3-node StatefulSet deployed with Woodpecker CI/CD* diff --git a/docs/operations/reference-architecture/three-node-cluster.md b/docs/operations/reference-architecture/three-node-cluster.md index 6c12d1f..0b01e36 100644 --- a/docs/operations/reference-architecture/three-node-cluster.md +++ b/docs/operations/reference-architecture/three-node-cluster.md @@ -114,11 +114,11 @@ Internet ──→ LB → │ Cluster Gateway (port 18181) │ ### Node Layout -Each node runs the full stack: -- **stemedb-api** (port 18180) - HTTP API, queries, ingest -- **stemedb-gateway** (port 18181) - Cluster coordination -- **stemedb-rpc** (port 18182) - gRPC replication -- **SWIM gossip** (port 18183) - Membership, failure detection +Each pod runs two binaries via `scripts/entrypoint.sh`: +- **stemedb-api** (port 18180) — HTTP API, WAL, KV store, ingestion, query +- **stemedb-node** (port 18181) — Cluster Gateway HTTP (client-facing, routes by shard) +- **stemedb-node** (port 18182) — gRPC (node-to-node sync) +- **stemedb-node** (port 18183) — SWIM gossip (membership, failure detection) ### Replication @@ -142,39 +142,40 @@ Each node runs the full stack: --- -## k3s Deployment Path (Current — Longhorn + StatefulSet) +## k3s Deployment (Current — StatefulSet + Longhorn) -> This is the **current production deployment path** for k3s-fleet. The bare-metal steps below -> are for non-k8s environments and use a config.toml interface that is not yet wired to the binary. +> This is the **current production deployment** on k3s-fleet. Deployed and running as of 2026-03-07. +> The bare-metal steps below are for non-k8s environments and use a config.toml interface that is +> not yet wired to the binary. -For each cluster node on k3s, deploy a separate StatefulSet with its own Longhorn PVC: +A single 3-replica StatefulSet with `VolumeClaimTemplates` provides each pod its own 50Gi Longhorn PVC: ``` k3s-fleet/deployments/k8s/base/stemedb/ -├── stemedb.yaml # Node A (current single-node — Phase 1) -├── stemedb-b.yaml # Node B (Phase 2 — add when ready to scale reads) -├── stemedb-c.yaml # Node C (Phase 2) -└── kustomization.yaml +└── stemedb.yaml # Everything: ExternalSecret, headless Service, gateway Service, + # 3-replica StatefulSet, Ingress ``` -**Critical k3s constraints:** -- Each node needs its own `ReadWriteOnce` Longhorn PVC — embedded KV (fjall) cannot share a volume -- Use `strategy: Recreate` on each Deployment (not RollingUpdate) — RWO PVC + 2 pods = deadlock -- Cluster gateway (port 18181) must be exposed as a separate Service for inter-node routing -- Use `topologySpreadConstraints` to ensure nodes land on different k3s worker hosts +**Architecture per pod (dual-binary via entrypoint.sh):** +- `stemedb-api` (:18180) — storage engine (WAL + KV) +- `stemedb-node` (:18181) — Gateway HTTP (client-facing, cluster routing) +- `stemedb-node` (:18182) — gRPC (node-to-node sync) +- `stemedb-node` (:18183) — SWIM gossip (membership) -**Phase 2 read-replica k8s addition (when ready):** -```yaml -# Add to stemedb-b.yaml — identical to stemedb.yaml except: -# - Different node ID env var -# - STEMEDB_CLUSTER_SEEDS pointing to Node A's gateway ClusterIP -# - Its own PVC claim -env: - - name: STEMEDB_NODE_ID - value: "node-b" - - name: STEMEDB_CLUSTER_SEEDS - value: "stemedb-api.stemedb.svc:18181" -``` +**Pod DNS:** `stemedb-{0,1,2}.stemedb-headless.stemedb.svc.cluster.local` + +**Key k8s resources:** +- **Headless Service** (`stemedb-headless`) — stable DNS for all 4 ports +- **Gateway Service** (`stemedb-gateway`) — ClusterIP routing to Gateway port 18181 +- **Ingress** — `stemedb.threesix.ai` → `stemedb-gateway:18181` via Traefik + +**Environment variables (set in StatefulSet):** +- `STEMEDB_CLUSTER_MODE=true` — starts both binaries via `entrypoint.sh` +- `STEMEDB_NODE_ID` — from `metadata.name` (stemedb-0, stemedb-1, stemedb-2) → stable NodeId via BLAKE3 +- `STEMEDB_SEED_NODES` — all 3 headless DNS names on port 18182 +- `STEMEDB_NUM_SHARDS=4`, `STEMEDB_REPLICATION_FACTOR=2` + +**CI/CD:** Woodpecker pipeline (`.woodpecker.yml`) → Kaniko builds → Zot registry (`registry.threesix.ai`) → `kubectl set image statefulset/stemedb` **See:** [k8s Deploy Roadmap](../deployment/k8s-deploy-roadmap.md) for the phased rollout plan. @@ -486,8 +487,8 @@ Three nodes on k3s handles the 100-project target. For mass traffic beyond that, | Phase | Target | Work type | What changes | |-------|--------|-----------|-------------| | **Phase 1** | 1 node, 100 projects | ✅ Done | Single Deployment, Longhorn PVC, auth wired | -| **Phase 2** | 3 nodes, read-scaled | Ops-heavy | Add 2 read replicas as separate Deployments; cluster gateway routes reads round-robin | -| **Phase 3** | 3 nodes, write-sharded | Code-heavy | Gateway enforces shard ownership; each node owns ⅓ of subject hash space; reads still any-node | +| **Phase 2** | 3 nodes, cluster mode | ✅ Done | 3-replica StatefulSet, dual-binary, SWIM, Gateway routing, Woodpecker CI | +| **Phase 3** | 3 nodes, full replication | Code-heavy | SWIM inter-node connectivity, Gateway HTTP forwarding, Merkle anti-entropy | | **Phase 4** | N nodes, coordinator | Code-heavy | Designate one node (or small 3-node Raft group) exclusively for mutable admin state; assertion nodes are pure data | **What you do NOT need:** Raft on the assertion write path. The append-only, content-addressed design means there are no write conflicts to serialize. Raft belongs only on the mutable admin state path (Phase 4), which is a small fraction of total traffic. @@ -504,4 +505,4 @@ Three nodes on k3s handles the 100-project target. For mass traffic beyond that, --- -**Last Updated:** 2026-03-02 — Added architectural rationale (gossip vs Raft), k3s deployment path, fixed mutable-state coordination notes, added 4-phase scaling table +**Last Updated:** 2026-03-07 — Updated k3s section to match actual 3-replica StatefulSet deployment, dual-binary architecture, Woodpecker CI pipeline diff --git a/scripts/entrypoint.sh b/scripts/entrypoint.sh index 629aadc..0b045d1 100644 --- a/scripts/entrypoint.sh +++ b/scripts/entrypoint.sh @@ -1,37 +1,6 @@ #!/bin/bash -# StemeDB cluster entrypoint — runs both stemedb-api (storage) and stemedb-node (gateway/SWIM). +# StemeDB entrypoint — single binary handles both standalone and cluster mode. # -# In single-node mode (STEMEDB_CLUSTER_MODE unset or "false"), only stemedb-api runs. -# In cluster mode (STEMEDB_CLUSTER_MODE=true), both binaries run side-by-side. - -set -e - -CLUSTER_MODE="${STEMEDB_CLUSTER_MODE:-false}" - -if [ "$CLUSTER_MODE" = "true" ] || [ "$CLUSTER_MODE" = "1" ]; then - echo "Starting StemeDB in cluster mode" - - # Start stemedb-api in background (storage engine on :18180) - stemedb-api & - API_PID=$! - - # Wait briefly for API to bind before starting the gateway - sleep 1 - - # Start stemedb-node in foreground (gateway :18181, gRPC :18182, SWIM :18183) - stemedb-node & - NODE_PID=$! - - # Trap signals to shut down both processes - trap 'kill $API_PID $NODE_PID 2>/dev/null; wait' TERM INT - - # Wait for either process to exit — if one dies, kill both - wait -n $API_PID $NODE_PID 2>/dev/null || true - EXIT_CODE=$? - kill $API_PID $NODE_PID 2>/dev/null || true - wait - exit $EXIT_CODE -else - echo "Starting StemeDB in single-node mode" - exec stemedb-api "$@" -fi +# In cluster mode (STEMEDB_CLUSTER_MODE=true), stemedb-api starts the Gateway, +# gRPC server, and SWIM membership internally alongside the HTTP API. +exec stemedb-api "$@"