feat: complete cluster integration — SWIM gossip, gRPC server, shard rebalancing, single binary
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed

8-task cluster completion bringing 3-replica StatefulSet from isolated
nodes to fully functional cluster:

1. Fix Gateway /metrics 500 (wire PrometheusHandle)
2. gRPC server + SWIM background tasks (probe, suspicion, gossip dissemination)
3. join() registers peers in membership table via PingResponse fields
4. Shard rebalancing on membership changes (deterministic round-robin)
5. API cluster wiring (DNS resolution, Gateway, gRPC, gossip broadcaster)
6. Single binary merge (stemedb-api --features cluster replaces stemedb-node)
7. Auth header forwarding (X-API-Key passed through Gateway to backends)
8. CORS restriction (STEMEDB_ALLOWED_ORIGINS env var, permissive fallback)
This commit is contained in:
jordan 2026-03-07 15:09:29 -07:00
parent 9afb36078c
commit 6c6ee04e9c
21 changed files with 831 additions and 232 deletions

View File

@ -488,29 +488,39 @@ All production infra is under the `jordan@roamrhino.com` Google account, GCP pro
| Service | URL |
|---------|-----|
| StemeDB API | `https://stemedb.threesix.ai` |
| StemeDB internal | `http://stemedb-api.stemedb.svc:18180` |
| StemeDB API (external) | `https://stemedb.threesix.ai` (→ Gateway :18181) |
| StemeDB Gateway (internal) | `http://stemedb-gateway.stemedb.svc:18181` |
| StemeDB API (per-pod) | `http://stemedb-{0,1,2}.stemedb-headless.stemedb.svc:18180` |
### Deployment Workflow
```bash
# 1. Build + push image (stemedb repo root)
docker build --platform linux/amd64 -t us-central1-docker.pkg.dev/orchard9/docker-images/stemedb-api:latest .
docker push us-central1-docker.pkg.dev/orchard9/docker-images/stemedb-api:latest
**Automated (normal):** Push to `main` → Woodpecker CI → Kaniko build → Zot registry → `kubectl set image statefulset/stemedb`
# 2. Add/update DNS A record (get Traefik IP first)
```bash
# Automated deploy: just push to main
git push origin main
# Woodpecker pipeline handles: build → registry.threesix.ai → kubectl rollout
# Manual deploy (if needed):
docker build --platform linux/amd64 -t registry.threesix.ai/stemedb-api:latest .
docker push registry.threesix.ai/stemedb-api:latest
kubectl --kubeconfig ~/.kube/orchard9-k3sf.yaml set image statefulset/stemedb \
stemedb=registry.threesix.ai/stemedb-api:latest -n stemedb
kubectl --kubeconfig ~/.kube/orchard9-k3sf.yaml rollout status statefulset/stemedb -n stemedb --timeout=300s
# First-time setup only:
# 1. Create GCP secret
echo -n "steme_live_$(openssl rand -hex 24)" | \
gcloud secrets create stemedb-root-api-key --project=orchard9 \
--replication-policy=automatic --data-file=-
# 2. Apply k8s manifests
kubectl apply -k /Users/jordanwashburn/Workspace/orchard9/k3s-fleet/deployments/k8s/base/stemedb/
# 3. Add DNS A record (Cloudflare)
TRAEFIK_IP=$(kubectl get svc -n kube-system traefik -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
curl -X POST "https://api.cloudflare.com/client/v4/zones/$THREESIX_CLOUDFLARE_ZONE_ID/dns_records" \
-H "Authorization: Bearer $THREESIX_CLOUDFLARE_API_TOKEN" \
-H "Content-Type: application/json" \
-d "{\"type\":\"A\",\"name\":\"stemedb\",\"content\":\"$TRAEFIK_IP\",\"ttl\":1,\"proxied\":false}"
# 3. Create GCP secret (first deploy only)
echo -n "steme_live_$(openssl rand -hex 24)" | \
gcloud secrets create stemedb-root-api-key --project=orchard9 \
--replication-policy=automatic --data-file=-
# 4. Deploy
kubectl apply -k /Users/jordanwashburn/Workspace/orchard9/k3s-fleet/deployments/k8s/base/stemedb/
kubectl rollout status deployment/stemedb-api -n stemedb --timeout=120s
```

5
Cargo.lock generated
View File

@ -3750,6 +3750,7 @@ name = "stemedb-api"
version = "0.1.0"
dependencies = [
"aphoria",
"async-trait",
"axum",
"axum-server",
"base64 0.22.1",
@ -3771,17 +3772,21 @@ dependencies = [
"stemedb-core",
"stemedb-ingest",
"stemedb-lens",
"stemedb-merkle",
"stemedb-query",
"stemedb-rpc",
"stemedb-storage",
"stemedb-sync",
"stemedb-wal",
"tempfile",
"thiserror 1.0.69",
"tokio",
"tonic",
"tower 0.4.13",
"tower-http 0.5.2",
"tracing",
"tracing-subscriber",
"uhlc",
"utoipa",
"utoipa-axum",
"utoipa-swagger-ui",

View File

@ -42,9 +42,9 @@ RUN cargo chef cook --release --recipe-path recipe.json
# Inherits compiled deps from cacher; only workspace source is compiled here.
FROM cacher AS builder
COPY . .
RUN cargo build --release -p stemedb-api -p stemedb-cluster
RUN cargo build --release -p stemedb-api --features cluster
# Strip debug symbols before copying to runtime image
RUN strip target/release/stemedb-api target/release/stemedb-node
RUN strip target/release/stemedb-api
# Stage 4: Runtime — minimal production image
FROM debian:bookworm-slim AS runtime
@ -59,7 +59,6 @@ RUN apt-get update && \
RUN useradd --system --no-create-home --shell /bin/false stemedb
COPY --from=builder /app/target/release/stemedb-api /usr/local/bin/stemedb-api
COPY --from=builder /app/target/release/stemedb-node /usr/local/bin/stemedb-node
COPY scripts/entrypoint.sh /usr/local/bin/entrypoint.sh
RUN chmod +x /usr/local/bin/entrypoint.sh && \

View File

@ -615,7 +615,6 @@ mod tests {
offline_fallback: OfflineFallback::Skip,
max_retries: 3,
retry_delay_ms: 1000,
team_id: None,
};
let result = RemoteClaimStore::new(&config);

View File

@ -10,7 +10,7 @@ workspace = true
[features]
default = ["aphoria"]
aphoria = ["dep:aphoria"]
cluster = ["dep:stemedb-cluster", "dep:stemedb-sync"]
cluster = ["dep:stemedb-cluster", "dep:stemedb-sync", "dep:stemedb-rpc", "dep:stemedb-merkle", "dep:tonic", "dep:uhlc", "dep:async-trait"]
[dependencies]
stemedb-core = { path = "../stemedb-core" }
@ -26,6 +26,11 @@ aphoria = { path = "../../applications/aphoria", optional = true }
# Optional: Multi-node cluster participation
stemedb-cluster = { path = "../stemedb-cluster", optional = true }
stemedb-sync = { path = "../stemedb-sync", optional = true }
stemedb-rpc = { path = "../stemedb-rpc", optional = true }
stemedb-merkle = { path = "../stemedb-merkle", optional = true }
tonic = { version = "0.12", optional = true }
uhlc = { version = "0.7", optional = true }
async-trait = { version = "0.1", optional = true }
axum = { version = "0.7", features = ["json"] }
axum-server = { version = "0.7", features = ["tls-rustls"] }

View File

@ -42,6 +42,8 @@ pub mod scan_cache;
pub mod services;
pub mod state;
pub mod store_helpers;
#[cfg(feature = "cluster")]
pub mod sync_storage_bridge;
use utoipa::OpenApi;

View File

@ -169,11 +169,207 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.transpose()?;
let state = AppState::new(write_journal, read_journal, Arc::clone(&store), corpus_store);
// Bootstrap root API key from env (idempotent: no-op if key already exists).
if let Err(e) = bootstrap::bootstrap_root_api_key(&*state.api_key_store).await {
error!("Failed to bootstrap root API key: {}", e);
std::process::exit(1);
}
// Cluster mode: join SWIM membership, start gRPC server, Gateway, gossip, and anti-entropy.
// Requires the `cluster` feature to be enabled at compile time.
#[cfg(feature = "cluster")]
let cluster_gossip: Option<Arc<dyn stemedb_ingest::gossip::GossipBroadcast>> = {
let cluster_mode = std::env::var("STEMEDB_CLUSTER_MODE")
.map(|v| v.to_lowercase() == "true" || v == "1")
.unwrap_or(false);
if cluster_mode {
use stemedb_cluster::{
stable_node_id, Gateway, NodeInfo, RangeManager, RangeRouter, ShardingConfig,
SwimConfig, SwimMembership,
};
use stemedb_rpc::proto::sync_service_server::SyncServiceServer;
use stemedb_rpc::SyncServiceHandler;
let node_id = stable_node_id();
let rpc_addr: SocketAddr = std::env::var("STEMEDB_NODE_RPC_ADDR")
.unwrap_or_else(|_| "0.0.0.0:18182".to_string())
.parse()
.unwrap_or_else(|_| SocketAddr::from(([0, 0, 0, 0], 18182)));
let gateway_addr: SocketAddr = std::env::var("STEMEDB_NODE_API_ADDR")
.unwrap_or_else(|_| "0.0.0.0:18181".to_string())
.parse()
.unwrap_or_else(|_| SocketAddr::from(([0, 0, 0, 0], 18181)));
let api_addr: SocketAddr = config
.bind_addr
.parse()
.unwrap_or_else(|_| SocketAddr::from(([0, 0, 0, 0], 18180)));
// --- Membership ---
let local_info = NodeInfo::new(node_id, rpc_addr, api_addr);
let membership = Arc::new(SwimMembership::new(local_info, SwimConfig::default()));
// Resolve seeds via DNS (for k8s headless service names)
let raw_seeds = std::env::var("STEMEDB_SEED_NODES").unwrap_or_default();
let mut seeds = Vec::new();
for entry in raw_seeds.split(',').filter(|s| !s.trim().is_empty()) {
let entry = entry.trim();
if let Ok(addr) = entry.parse::<SocketAddr>() {
seeds.push(addr);
} else {
match tokio::net::lookup_host(entry).await {
Ok(mut addrs) => {
if let Some(addr) = addrs.next() {
info!(seed = entry, resolved = %addr, "Resolved seed node");
seeds.push(addr);
}
}
Err(e) => {
info!(seed = entry, error = %e, "Failed to resolve seed");
}
}
}
}
if let Err(e) = membership.join(seeds).await {
warn!("Cluster join failed (continuing as solo node): {}", e);
}
membership.start();
membership.spawn_background_tasks();
// --- Sharding ---
let num_shards = std::env::var("STEMEDB_NUM_SHARDS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(4u32);
let replication_factor = std::env::var("STEMEDB_REPLICATION_FACTOR")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(1u32);
let router = Arc::new(RangeRouter::new(node_id));
let sharding_config = ShardingConfig::new()
.with_num_shards(num_shards)
.with_replication_factor(replication_factor);
let range_manager = RangeManager::new(
Arc::clone(&router),
Arc::clone(&membership),
sharding_config,
node_id,
);
if let Err(e) = range_manager.initialize_shards() {
error!("Failed to initialize shards: {}", e);
}
// --- gRPC server (SyncStorageBridge) ---
let merkle_manager = Arc::new(
stemedb_sync::MerkleTreeManager::load_or_create(Arc::clone(&store))
.await
.map_err(|e| format!("Failed to load Merkle tree: {e}"))?,
);
let bridge = Arc::new(stemedb_api::sync_storage_bridge::SyncStorageBridge::new(
Arc::clone(&store),
Arc::clone(&merkle_manager),
*node_id.as_bytes(),
rpc_addr.to_string(),
api_addr.to_string(),
));
let grpc_service = SyncServiceServer::new(SyncServiceHandler::new(bridge));
let grpc_rpc_addr = rpc_addr;
tokio::spawn(async move {
info!(addr = %grpc_rpc_addr, "gRPC server listening");
if let Err(e) = tonic::transport::Server::builder()
.add_service(grpc_service)
.serve(grpc_rpc_addr)
.await
{
error!("gRPC server error: {}", e);
}
});
// --- Gossip broadcaster ---
let peer_addrs: Vec<String> =
membership.members().iter().map(|m| format!("http://{}", m.rpc_addr)).collect();
let gossip_broadcaster: Option<Arc<dyn stemedb_ingest::gossip::GossipBroadcast>> =
match stemedb_sync::GossipBroadcaster::new(peer_addrs).await {
Ok(b) => Some(Arc::new(b)),
Err(e) => {
warn!("Failed to create gossip broadcaster: {}", e);
None
}
};
// --- Gateway ---
let gateway = Gateway::new(Arc::clone(&router), Arc::clone(&membership), gateway_addr)
.with_prometheus_handle(Arc::clone(&prometheus_handle));
tokio::spawn(async move {
if let Err(e) = gateway.serve().await {
error!("Gateway error: {}", e);
}
});
// --- Shard rebalance listener ---
let rebalance_membership = Arc::clone(&membership);
let rebalance_router = Arc::clone(&router);
tokio::spawn(async move {
let mut rx = rebalance_membership.subscribe();
let sharding_cfg = ShardingConfig::new()
.with_num_shards(num_shards)
.with_replication_factor(replication_factor);
loop {
match rx.recv().await {
Ok(event) => {
if event.is_join() || event.is_failure() || event.is_leave() {
// Debounce: wait 5s for cluster to stabilize
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
let rm = RangeManager::new(
Arc::clone(&rebalance_router),
Arc::clone(&rebalance_membership),
sharding_cfg.clone(),
node_id,
);
if let Err(e) = rm.rebalance_shards() {
warn!("Shard rebalance failed: {}", e);
}
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
warn!(skipped = n, "Membership event receiver lagged");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
});
info!(
node_id = %node_id.short_hex(),
rpc_addr = %rpc_addr,
gateway_addr = %gateway_addr,
"Cluster mode fully active"
);
gossip_broadcaster
} else {
None
}
};
#[cfg(not(feature = "cluster"))]
let cluster_gossip: Option<Arc<dyn stemedb_ingest::gossip::GossipBroadcast>> = None;
// Spawn IngestWorker (after cluster block so gossip broadcaster can be injected)
let worker_journal = state.journal.clone();
let worker_store = store;
let worker_flush_notify = Arc::clone(&state.flush_notify);
let skip_sigs = config.unsafe_skip_signatures;
let worker_memtable = Arc::clone(&state.memtable);
tokio::spawn(async move {
match IngestWorker::new(worker_journal, worker_store).await {
@ -182,6 +378,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_flush_notify(worker_flush_notify)
.with_memtable(worker_memtable)
.with_skip_signature_verification(skip_sigs);
if let Some(gossip) = cluster_gossip {
worker = worker.with_gossip_broadcaster(gossip);
}
info!(skip_signatures = skip_sigs, "IngestWorker started");
worker.run().await;
}
@ -189,60 +388,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
});
// Bootstrap root API key from env (idempotent: no-op if key already exists).
if let Err(e) = bootstrap::bootstrap_root_api_key(&*state.api_key_store).await {
error!("Failed to bootstrap root API key: {}", e);
std::process::exit(1);
}
// Cluster mode: join SWIM membership when STEMEDB_CLUSTER_MODE=true.
// Requires the `cluster` feature to be enabled at compile time.
#[cfg(feature = "cluster")]
{
let cluster_mode = std::env::var("STEMEDB_CLUSTER_MODE")
.map(|v| v.to_lowercase() == "true" || v == "1")
.unwrap_or(false);
if cluster_mode {
use stemedb_cluster::{stable_node_id, NodeInfo, SwimConfig, SwimMembership};
let node_id = stable_node_id();
let rpc_addr: std::net::SocketAddr = std::env::var("STEMEDB_NODE_RPC_ADDR")
.unwrap_or_else(|_| "127.0.0.1:18182".to_string())
.parse()
.unwrap_or_else(|_| std::net::SocketAddr::from(([127, 0, 0, 1], 18182)));
let api_addr: std::net::SocketAddr = config
.bind_addr
.parse()
.unwrap_or_else(|_| std::net::SocketAddr::from(([127, 0, 0, 1], 18180)));
let local_info = NodeInfo::new(node_id, rpc_addr, api_addr);
let membership = Arc::new(SwimMembership::new(local_info, SwimConfig::default()));
let seeds: Vec<std::net::SocketAddr> = std::env::var("STEMEDB_CLUSTER_SEEDS")
.unwrap_or_default()
.split(',')
.filter(|s| !s.trim().is_empty())
.filter_map(|s| s.trim().parse().ok())
.collect();
if !seeds.is_empty() {
if let Err(e) = membership.join(seeds).await {
warn!("Cluster join failed (continuing as solo node): {}", e);
}
}
membership.start();
info!(
node_id = %node_id.short_hex(),
rpc_addr = %rpc_addr,
"Cluster mode active"
);
}
}
// Startup guard: unsafe skip + auth enabled is a fatal misconfiguration.
if config.unsafe_skip_signatures && bootstrap::is_auth_enabled() {
error!(

View File

@ -8,12 +8,13 @@
//! - With Circuit Breaker (full protection stack)
use axum::{
http::{header, HeaderValue, Method},
routing::{get, post},
Router,
};
use std::sync::Arc;
use std::time::Duration;
use tower_http::cors::{Any, CorsLayer};
use tower_http::cors::CorsLayer;
use tower_http::limit::RequestBodyLimitLayer;
use tower_http::timeout::TimeoutLayer;
use tower_http::trace::TraceLayer;
@ -107,10 +108,7 @@ pub fn create_router(state: AppState) -> Router {
/// Create the axum router with custom security configuration.
pub fn create_router_config(state: AppState, security_config: SecurityConfig) -> Router {
let cors = CorsLayer::new()
.allow_origin(Any) // For development; restrict in production
.allow_methods(Any)
.allow_headers(Any);
let cors = build_cors_layer();
let api_router = build_api_routes(&security_config)
.with_state(state)
@ -141,7 +139,7 @@ pub fn create_router_with_meter(state: AppState) -> Router {
/// Create the axum router with economic throttling and custom security configuration.
pub fn create_router_with_meter_config(state: AppState, security_config: SecurityConfig) -> Router {
let cors = CorsLayer::new().allow_origin(Any).allow_methods(Any).allow_headers(Any);
let cors = build_cors_layer();
let meter_layer = MeterLayer::new(Arc::clone(&state.quota_store));
let api_router = build_api_routes(&security_config)
@ -201,7 +199,7 @@ pub fn create_router_with_admission_config(
state: AppState,
security_config: SecurityConfig,
) -> Router {
let cors = CorsLayer::new().allow_origin(Any).allow_methods(Any).allow_headers(Any);
let cors = build_cors_layer();
let admission_layer = AdmissionLayer::new(Arc::clone(&state.admission_store));
let meter_layer = MeterLayer::new(Arc::clone(&state.quota_store));
@ -261,7 +259,7 @@ pub fn create_router_with_auth_full_config(
auth_config: ApiKeyAuthConfig,
security_config: SecurityConfig,
) -> Router {
let cors = CorsLayer::new().allow_origin(Any).allow_methods(Any).allow_headers(Any);
let cors = build_cors_layer();
let api_key_layer = ApiKeyAuthLayer::with_config(Arc::clone(&state.api_key_store), auth_config);
let api_router = build_api_routes(&security_config)
@ -301,7 +299,7 @@ pub fn create_router_full_protection_full_config(
auth_config: ApiKeyAuthConfig,
security_config: SecurityConfig,
) -> Router {
let cors = CorsLayer::new().allow_origin(Any).allow_methods(Any).allow_headers(Any);
let cors = build_cors_layer();
let api_key_layer = ApiKeyAuthLayer::with_config(Arc::clone(&state.api_key_store), auth_config);
let circuit_breaker_layer = CircuitBreakerLayer::new(Arc::clone(&state.circuit_breaker_store));
let admission_layer = AdmissionLayer::new(Arc::clone(&state.admission_store));
@ -361,7 +359,7 @@ pub fn create_router_with_circuit_breaker_config(
state: AppState,
security_config: SecurityConfig,
) -> Router {
let cors = CorsLayer::new().allow_origin(Any).allow_methods(Any).allow_headers(Any);
let cors = build_cors_layer();
let circuit_breaker_layer = CircuitBreakerLayer::new(Arc::clone(&state.circuit_breaker_store));
let admission_layer = AdmissionLayer::new(Arc::clone(&state.admission_store));
let meter_layer = MeterLayer::new(Arc::clone(&state.quota_store));
@ -381,6 +379,24 @@ pub fn create_router_with_circuit_breaker_config(
.merge(api_router)
}
/// Builds a CORS layer from `STEMEDB_ALLOWED_ORIGINS` env var.
///
/// If set, restricts origins to the comma-separated list. If unset or empty,
/// allows all origins (dev mode).
fn build_cors_layer() -> CorsLayer {
match std::env::var("STEMEDB_ALLOWED_ORIGINS") {
Ok(origins) if !origins.is_empty() => {
let allowed: Vec<HeaderValue> =
origins.split(',').filter_map(|s| s.trim().parse().ok()).collect();
CorsLayer::new()
.allow_origin(allowed)
.allow_methods([Method::GET, Method::POST, Method::PUT, Method::DELETE])
.allow_headers([header::CONTENT_TYPE, header::HeaderName::from_static("x-api-key")])
}
_ => CorsLayer::permissive(),
}
}
/// Build the API routes without state or layers.
///
/// This is an internal helper that defines all the routes and handlers.

View File

@ -0,0 +1,113 @@
//! Bridge between the gRPC `SyncStorage` trait and stemedb-api's storage layer.
//!
//! This module allows the stemedb-api binary to host a gRPC sync server that
//! delegates to the same `HybridStore` and `MerkleTreeManager` used by the
//! main HTTP API.
use async_trait::async_trait;
use std::sync::Arc;
use stemedb_merkle::Hash;
use stemedb_rpc::server::SyncStorage;
use stemedb_storage::{HybridStore, KVStore};
use stemedb_sync::MerkleTreeManager;
use tracing::{debug, warn};
/// Bridges the gRPC `SyncStorage` trait to the stemedb-api storage layer.
pub struct SyncStorageBridge {
kv_store: Arc<HybridStore>,
merkle_manager: Arc<MerkleTreeManager<HybridStore>>,
local_node_id: [u8; 16],
rpc_addr: String,
api_addr: String,
}
impl SyncStorageBridge {
/// Creates a new bridge.
pub fn new(
kv_store: Arc<HybridStore>,
merkle_manager: Arc<MerkleTreeManager<HybridStore>>,
local_node_id: [u8; 16],
rpc_addr: String,
api_addr: String,
) -> Self {
Self { kv_store, merkle_manager, local_node_id, rpc_addr, api_addr }
}
}
#[async_trait]
impl SyncStorage for SyncStorageBridge {
async fn store_gossip_assertion(
&self,
hash: [u8; 32],
data: Vec<u8>,
_hlc_time: u64,
_hlc_counter: u32,
_hlc_node_id: [u8; 16],
) -> Result<bool, String> {
// Content-addressed key: store at "A:{hash}"
let key = format!("A:{}", ::hex::encode(hash));
// Check if already exists
match self.kv_store.get(key.as_bytes()).await {
Ok(Some(_)) => {
debug!(hash = %::hex::encode(&hash[..8]), "Gossip assertion already exists");
return Ok(false);
}
Ok(None) => {}
Err(e) => return Err(format!("KV get failed: {e}")),
}
// Store the assertion data
self.kv_store
.put(key.as_bytes(), &data)
.await
.map_err(|e| format!("KV put failed: {e}"))?;
// Insert hash into Merkle tree
self.merkle_manager.insert(hash).await.map_err(|e| format!("Merkle insert failed: {e}"))?;
Ok(true)
}
async fn get_merkle_state(&self) -> Result<(Option<[u8; 32]>, u64), String> {
let root =
self.merkle_manager.root().await.map_err(|e| format!("Merkle root failed: {e}"))?;
let count = self.merkle_manager.len().await as u64;
Ok((root, count))
}
async fn fetch_assertions(
&self,
hashes: Vec<[u8; 32]>,
) -> Result<Vec<([u8; 32], Vec<u8>)>, String> {
let mut results = Vec::with_capacity(hashes.len());
for hash in hashes {
let key = format!("A:{}", ::hex::encode(hash));
match self.kv_store.get(key.as_bytes()).await {
Ok(Some(data)) => results.push((hash, data)),
Ok(None) => {
debug!(hash = %::hex::encode(&hash[..8]), "Assertion not found");
}
Err(e) => {
warn!(error = %e, "Failed to fetch assertion");
}
}
}
Ok(results)
}
async fn get_node_info(&self) -> Result<([u8; 16], u64, String, String), String> {
let count = self.merkle_manager.len().await as u64;
Ok((self.local_node_id, count, self.rpc_addr.clone(), self.api_addr.clone()))
}
async fn get_leaves(&self, max_leaves: u64) -> Result<(Vec<Hash>, bool), String> {
let all_leaves = self.merkle_manager.leaves().await;
let cap = if max_leaves == 0 { 10000 } else { max_leaves.min(10000) as usize };
if all_leaves.len() > cap {
Ok((all_leaves.into_iter().take(cap).collect(), true))
} else {
Ok((all_leaves, false))
}
}
}

View File

@ -19,6 +19,7 @@
use std::net::SocketAddr;
use std::sync::Arc;
use metrics_exporter_prometheus::PrometheusBuilder;
use tracing::info;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
@ -97,6 +98,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::registry().with(env_filter).with(tracing_subscriber::fmt::layer()).init();
let prometheus_handle = Arc::new(
PrometheusBuilder::new()
.install_recorder()
.map_err(|e| format!("Failed to install Prometheus recorder: {e}"))?,
);
let config = NodeConfig::from_env().await;
// Use stable NodeId (env var → hostname → random fallback)
@ -121,6 +128,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Join cluster (bootstrap if no seeds)
membership.join(config.seed_nodes.clone()).await?;
membership.start();
membership.spawn_background_tasks();
info!(
joined = membership.is_joined(),
@ -143,7 +151,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
info!(shards = meta.num_shards(), version = meta.version, "Shard meta-range initialized");
// --- Gateway ---
let gateway = Gateway::new(Arc::clone(&router), Arc::clone(&membership), config.api_addr);
let gateway = Gateway::new(Arc::clone(&router), Arc::clone(&membership), config.api_addr)
.with_prometheus_handle(prometheus_handle);
info!(
addr = %config.api_addr,

View File

@ -1,6 +1,7 @@
//! Handlers for read operations (query, health, cluster status, routing).
use axum::extract::{Query, State};
use axum::http::HeaderMap;
use axum::Json;
use std::sync::Arc;
use tracing::instrument;
@ -14,9 +15,10 @@ use super::types::{ApiError, ClusterStatusResponse, HealthResponse, NodeStatusIn
///
/// Routes by subject hash to a replica (preferring local) and forwards the
/// request via HTTP to that node's stemedb-api.
#[instrument(skip(state), fields(subject = %params.subject))]
#[instrument(skip(state, headers), fields(subject = %params.subject))]
pub async fn handle_query(
State(state): State<Arc<GatewayState>>,
headers: HeaderMap,
Query(params): Query<QueryParams>,
) -> Result<Json<serde_json::Value>, ApiError> {
state.inc_requests();
@ -59,7 +61,11 @@ pub async fn handle_query(
"Forwarding query to replica"
);
let response = http_client.get(&url).query(&params).send().await.map_err(|e| ApiError {
let mut builder = http_client.get(&url).query(&params);
if let Some(key) = headers.get("x-api-key") {
builder = builder.header("x-api-key", key.to_str().unwrap_or_default());
}
let response = builder.send().await.map_err(|e| ApiError {
code: "UNAVAILABLE".to_string(),
message: format!("Forward to replica failed: {e}"),
})?;

View File

@ -1,6 +1,7 @@
//! Handlers for write operations (assert, vote).
use axum::extract::State;
use axum::http::HeaderMap;
use axum::Json;
use std::sync::Arc;
use tracing::instrument;
@ -13,9 +14,10 @@ use super::types::{ApiError, CreateAssertionRequest, VoteRequest, VoteResponse};
///
/// Routes by subject hash to the shard leader and forwards the request via
/// HTTP to that node's stemedb-api. Returns the response from the leader.
#[instrument(skip(state, req), fields(subject = %req.subject))]
#[instrument(skip(state, headers, req), fields(subject = %req.subject))]
pub async fn handle_assert(
State(state): State<Arc<GatewayState>>,
headers: HeaderMap,
Json(req): Json<CreateAssertionRequest>,
) -> Result<Json<serde_json::Value>, ApiError> {
state.inc_requests();
@ -53,7 +55,11 @@ pub async fn handle_assert(
"Forwarding assertion to shard leader"
);
let response = http_client.post(&url).json(&req).send().await.map_err(|e| ApiError {
let mut builder = http_client.post(&url).json(&req);
if let Some(key) = headers.get("x-api-key") {
builder = builder.header("x-api-key", key.to_str().unwrap_or_default());
}
let response = builder.send().await.map_err(|e| ApiError {
code: "UNAVAILABLE".to_string(),
message: format!("Forward to leader failed: {e}"),
})?;
@ -78,9 +84,10 @@ pub async fn handle_assert(
/// POST /v1/vote - Submit a vote.
///
/// Routes to the shard leader for the assertion's subject and forwards via HTTP.
#[instrument(skip(state, req), fields(subject = %req.subject))]
#[instrument(skip(state, headers, req), fields(subject = %req.subject))]
pub async fn handle_vote(
State(state): State<Arc<GatewayState>>,
headers: HeaderMap,
Json(req): Json<VoteRequest>,
) -> Result<Json<VoteResponse>, ApiError> {
state.inc_requests();
@ -118,7 +125,11 @@ pub async fn handle_vote(
"Forwarding vote to shard leader"
);
let response = http_client.post(&url).json(&req).send().await.map_err(|e| ApiError {
let mut builder = http_client.post(&url).json(&req);
if let Some(key) = headers.get("x-api-key") {
builder = builder.header("x-api-key", key.to_str().unwrap_or_default());
}
let response = builder.send().await.map_err(|e| ApiError {
code: "UNAVAILABLE".to_string(),
message: format!("Forward to leader failed: {e}"),
})?;

View File

@ -3,7 +3,7 @@
//! The Gateway provides a stateless HTTP interface for clients, routing
//! requests to the appropriate shard nodes based on subject hashing.
use axum::http::{header, Method};
use axum::http::{header, HeaderValue, Method};
use axum::routing::{get, post};
use axum::{Extension, Router};
use dashmap::DashMap;
@ -128,11 +128,7 @@ impl Gateway {
.route("/metrics", get(handlers::handle_metrics))
// Middleware
.layer(TraceLayer::new_for_http())
.layer(
CorsLayer::new()
.allow_methods([Method::GET, Method::POST])
.allow_headers([header::CONTENT_TYPE]),
)
.layer(build_cors_layer())
// State
.with_state(self.state.clone());
@ -175,6 +171,24 @@ impl Gateway {
}
}
/// Builds a CORS layer from `STEMEDB_ALLOWED_ORIGINS` env var.
///
/// If set, restricts origins to the comma-separated list. If unset or empty,
/// allows all origins (dev mode).
fn build_cors_layer() -> CorsLayer {
match std::env::var("STEMEDB_ALLOWED_ORIGINS") {
Ok(origins) if !origins.is_empty() => {
let allowed: Vec<HeaderValue> =
origins.split(',').filter_map(|s| s.trim().parse().ok()).collect();
CorsLayer::new()
.allow_origin(allowed)
.allow_methods([Method::GET, Method::POST, Method::PUT, Method::DELETE])
.allow_headers([header::CONTENT_TYPE, header::HeaderName::from_static("x-api-key")])
}
_ => CorsLayer::permissive(),
}
}
/// Builder for Gateway configuration.
pub struct GatewayBuilder {
router: Option<Arc<RangeRouter>>,

View File

@ -13,6 +13,7 @@ use parking_lot::RwLock;
use rand::seq::SliceRandom;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::broadcast;
use tracing::{debug, info, instrument, warn};
@ -130,16 +131,43 @@ impl SwimMembership {
}
};
let ping = stemedb_rpc::proto::PingRequest { node_id: local_id.as_bytes().to_vec() };
let ping = stemedb_rpc::proto::PingRequest {
node_id: local_id.as_bytes().to_vec(),
updates: Vec::new(),
};
match client.ping(ping).await {
Ok(resp) => {
let seed_id_hex = hex::encode(&resp.node_id[..resp.node_id.len().min(4)]);
if resp.node_id.len() >= 16 {
let mut seed_id_bytes = [0u8; 16];
seed_id_bytes.copy_from_slice(&resp.node_id[..16]);
let seed_node_id = NodeId::from_bytes(seed_id_bytes);
// Use addresses from PingResponse, falling back to seed_addr
let seed_rpc: std::net::SocketAddr =
resp.rpc_addr.parse().unwrap_or(*seed_addr);
let seed_api: std::net::SocketAddr =
resp.api_addr.parse().unwrap_or_else(|_| {
std::net::SocketAddr::new(
seed_addr.ip(),
seed_addr.port().saturating_sub(2),
)
});
let seed_info = NodeInfo::new(seed_node_id, seed_rpc, seed_api);
self.alive_node(seed_node_id, seed_info);
info!(
seed = %seed_addr,
seed_id = %seed_id_hex,
"Seed reachable, cluster join successful"
seed_id = %seed_node_id.short_hex(),
"Registered seed in membership table"
);
} else {
info!(
seed = %seed_addr,
"Seed reachable but returned short node_id"
);
}
contacted += 1;
}
Err(e) => {
@ -521,6 +549,186 @@ impl SwimMembership {
self.running.store(true, Ordering::SeqCst);
}
/// Spawns background SWIM protocol tasks (probe, suspicion check, gossip).
///
/// Must be called after `start()` and `join()`. Spawns 3 tokio tasks that
/// run until `stop()` is called.
pub fn spawn_background_tasks(self: &Arc<Self>) {
let membership = Arc::clone(self);
let probe_interval = membership.config.probe_interval;
// 1. Probe loop — pings a random member every probe_interval
let m = Arc::clone(&membership);
tokio::spawn(async move {
let mut ticker = tokio::time::interval(probe_interval);
loop {
ticker.tick().await;
if !m.is_running() {
break;
}
let target_id = match m.select_probe_target() {
Some(id) => id,
None => continue,
};
let target_info = match m.get_member(target_id) {
Some(info) => info,
None => continue,
};
let addr = format!("http://{}", target_info.rpc_addr);
let local_id = m.local_id();
let gossip_batch = m.get_gossip_batch(5);
let updates: Vec<stemedb_rpc::proto::MembershipUpdate> = gossip_batch
.iter()
.map(|entry| stemedb_rpc::proto::MembershipUpdate {
node_id: entry.node.id.as_bytes().to_vec(),
rpc_addr: entry.node.rpc_addr.to_string(),
api_addr: entry.node.api_addr.to_string(),
state: match entry.state {
NodeState::Alive => 0,
NodeState::Suspect => 1,
NodeState::Dead => 2,
NodeState::Left => 3,
},
lamport_time: entry.lamport_time,
incarnation: entry.node.incarnation,
})
.collect();
match stemedb_rpc::SyncClient::connect(&addr).await {
Ok(client) => {
let ping = stemedb_rpc::proto::PingRequest {
node_id: local_id.as_bytes().to_vec(),
updates,
};
match client.ping(ping).await {
Ok(resp) => {
if resp.node_id.len() >= 16 {
let mut id_bytes = [0u8; 16];
id_bytes.copy_from_slice(&resp.node_id[..16]);
let peer_id = NodeId::from_bytes(id_bytes);
let peer_rpc: std::net::SocketAddr =
resp.rpc_addr.parse().unwrap_or(target_info.rpc_addr);
let peer_api: std::net::SocketAddr =
resp.api_addr.parse().unwrap_or(target_info.api_addr);
m.alive_node(
peer_id,
NodeInfo::new(peer_id, peer_rpc, peer_api),
);
}
// Process piggybacked membership updates
for update in resp.updates {
if update.node_id.len() >= 16 {
let mut id_bytes = [0u8; 16];
id_bytes.copy_from_slice(&update.node_id[..16]);
let upd_id = NodeId::from_bytes(id_bytes);
if upd_id == m.local_id() {
continue;
}
let upd_rpc: std::net::SocketAddr =
match update.rpc_addr.parse() {
Ok(a) => a,
Err(_) => continue,
};
let upd_api: std::net::SocketAddr =
match update.api_addr.parse() {
Ok(a) => a,
Err(_) => continue,
};
let mut node_info = NodeInfo::new(upd_id, upd_rpc, upd_api);
node_info.incarnation = update.incarnation;
let state = match update.state {
0 => NodeState::Alive,
1 => NodeState::Suspect,
2 => NodeState::Dead,
_ => NodeState::Left,
};
let entry = MembershipEntry::new(
node_info,
state,
update.lamport_time,
);
m.process_membership_update(entry);
}
}
}
Err(_) => {
m.suspect_node(target_id);
}
}
}
Err(_) => {
m.suspect_node(target_id);
}
}
}
});
// 2. Suspicion checker — promotes suspects to dead
let m = Arc::clone(&membership);
tokio::spawn(async move {
let mut ticker = tokio::time::interval(std::time::Duration::from_secs(1));
loop {
ticker.tick().await;
if !m.is_running() {
break;
}
m.check_suspicion_timeouts();
}
});
// 3. Gossip disseminator — sends batched updates to random peers
let m = Arc::clone(&membership);
let gossip_interval = membership.config.gossip_interval;
tokio::spawn(async move {
let mut ticker = tokio::time::interval(gossip_interval);
loop {
ticker.tick().await;
if !m.is_running() {
break;
}
let batch = m.get_gossip_batch(5);
if batch.is_empty() {
continue;
}
// Pick a random alive member to send gossip to
let target = match m.select_probe_target() {
Some(id) => id,
None => continue,
};
let target_info = match m.get_member(target) {
Some(info) => info,
None => continue,
};
let addr = format!("http://{}", target_info.rpc_addr);
let updates: Vec<stemedb_rpc::proto::MembershipUpdate> = batch
.iter()
.map(|entry| stemedb_rpc::proto::MembershipUpdate {
node_id: entry.node.id.as_bytes().to_vec(),
rpc_addr: entry.node.rpc_addr.to_string(),
api_addr: entry.node.api_addr.to_string(),
state: match entry.state {
NodeState::Alive => 0,
NodeState::Suspect => 1,
NodeState::Dead => 2,
NodeState::Left => 3,
},
lamport_time: entry.lamport_time,
incarnation: entry.node.incarnation,
})
.collect();
if let Ok(client) = stemedb_rpc::SyncClient::connect(&addr).await {
let ping = stemedb_rpc::proto::PingRequest {
node_id: m.local_id().as_bytes().to_vec(),
updates,
};
let _ = client.ping(ping).await;
}
}
});
}
/// Stops the background SWIM protocol tasks.
pub fn stop(&self) {
self.running.store(false, Ordering::SeqCst);

View File

@ -320,6 +320,42 @@ impl RangeManager {
Ok(())
}
/// Rebalances shards across all alive members.
///
/// Redistributes shard replicas using round-robin across all alive nodes
/// (including self). Called when membership changes (node join/leave/fail).
#[instrument(skip(self))]
pub fn rebalance_shards(&self) -> Result<()> {
let mut node_ids: Vec<NodeId> = self.membership.members().iter().map(|n| n.id).collect();
// Always include self
if !node_ids.contains(&self.local_node_id) {
node_ids.push(self.local_node_id);
}
// Sort for deterministic ordering across all nodes
node_ids.sort_by_key(|id| *id.as_bytes());
if node_ids.is_empty() {
warn!("No nodes for rebalance");
return Ok(());
}
let meta = MetaRange::with_initial_shards(
self.config.num_shards,
&node_ids,
self.config.replication_factor,
);
self.router.update_meta_range(meta);
info!(
alive_nodes = node_ids.len(),
num_shards = self.config.num_shards,
"Rebalanced shards"
);
Ok(())
}
/// Computes the midpoint key for splitting a range.
fn compute_midpoint(&self, desc: &RangeDescriptor) -> Vec<u8> {
// If we have concrete bounds, compute actual midpoint

View File

@ -93,6 +93,9 @@ message AssertionData {
message PingRequest {
// Sender's node ID (16 bytes)
bytes node_id = 1;
// Piggybacked SWIM membership gossip updates
repeated MembershipUpdate updates = 2;
}
message PingResponse {
@ -101,6 +104,36 @@ message PingResponse {
// Number of assertions on this node
uint64 assertion_count = 2;
// Piggybacked SWIM membership gossip updates
repeated MembershipUpdate updates = 3;
// Responder's RPC address
string rpc_addr = 4;
// Responder's API address
string api_addr = 5;
}
// MembershipUpdate carries SWIM gossip state for a single node.
message MembershipUpdate {
// Node ID (16 bytes)
bytes node_id = 1;
// RPC address (e.g., "10.0.0.1:18182")
string rpc_addr = 2;
// API address (e.g., "10.0.0.1:18180")
string api_addr = 3;
// Node state: 0=Alive, 1=Suspect, 2=Dead, 3=Left
uint32 state = 4;
// Lamport clock time
uint64 lamport_time = 5;
// Incarnation number for crashing/rejoining detection
uint64 incarnation = 6;
}
// GetLeavesRequest requests all Merkle tree leaf hashes.

View File

@ -57,8 +57,8 @@ pub trait SyncStorage: Send + Sync + 'static {
hashes: Vec<[u8; 32]>,
) -> Result<Vec<([u8; 32], Vec<u8>)>, String>;
/// Get this node's ID and assertion count for ping response.
async fn get_node_info(&self) -> Result<([u8; 16], u64), String>;
/// Get this node's ID, assertion count, and addresses for ping response.
async fn get_node_info(&self) -> Result<([u8; 16], u64, String, String), String>;
/// Get all Merkle tree leaf hashes.
///
@ -229,12 +229,18 @@ impl<S: SyncStorage> SyncService for SyncServiceHandler<S> {
#[instrument(skip(self, _request))]
async fn ping(&self, _request: Request<PingRequest>) -> Result<Response<PingResponse>, Status> {
let (node_id, assertion_count) =
let (node_id, assertion_count, rpc_addr, api_addr) =
self.storage.get_node_info().await.map_err(Status::internal)?;
debug!(assertion_count, "Responding to ping");
Ok(Response::new(PingResponse { node_id: node_id.to_vec(), assertion_count }))
Ok(Response::new(PingResponse {
node_id: node_id.to_vec(),
assertion_count,
updates: Vec::new(),
rpc_addr,
api_addr,
}))
}
#[instrument(skip(self, request), fields(max_leaves = request.get_ref().max_leaves))]
@ -291,8 +297,13 @@ mod tests {
Ok(hashes.into_iter().map(|h| (h, vec![1, 2, 3])).collect())
}
async fn get_node_info(&self) -> Result<([u8; 16], u64), String> {
Ok((self.node_id, self.assertion_count))
async fn get_node_info(&self) -> Result<([u8; 16], u64, String, String), String> {
Ok((
self.node_id,
self.assertion_count,
"127.0.0.1:18182".to_string(),
"127.0.0.1:18180".to_string(),
))
}
async fn get_leaves(&self, max_leaves: u64) -> Result<(Vec<[u8; 32]>, bool), String> {
@ -310,11 +321,13 @@ mod tests {
let storage = Arc::new(MockStorage { node_id: [42u8; 16], assertion_count: 100 });
let handler = SyncServiceHandler::new(storage);
let request = Request::new(PingRequest { node_id: vec![1u8; 16] });
let request = Request::new(PingRequest { node_id: vec![1u8; 16], updates: Vec::new() });
let response = handler.ping(request).await.expect("ping should succeed");
assert_eq!(response.get_ref().node_id, vec![42u8; 16]);
assert_eq!(response.get_ref().assertion_count, 100);
assert_eq!(response.get_ref().rpc_addr, "127.0.0.1:18182");
assert_eq!(response.get_ref().api_addr, "127.0.0.1:18180");
}
#[tokio::test]

View File

@ -131,4 +131,4 @@ Submit pull requests to keep this guide current and valuable.
---
**Last Updated:** 2026-03-02
**Last Updated:** 2026-03-07

View File

@ -46,29 +46,33 @@ all probes), ClusterIP Service, Traefik Ingress at `stemedb.threesix.ai`.
---
### ~~5. Image registry — k3s cannot pull without a registry~~ ✅ RESOLVED (2026-03-02)
### ~~5. Image registry — k3s cannot pull without a registry~~ ✅ RESOLVED (2026-03-07)
Registry confirmed: `us-central1-docker.pkg.dev/orchard9/docker-images/` (GAR).
`imagePullSecrets: gcr-secret` wired in Deployment. Dockerfile updated with `--features aphoria`.
Registry: `registry.threesix.ai` (Zot OCI registry on k3s). Woodpecker CI pipeline (`.woodpecker.yml`)
builds via Kaniko and pushes automatically on every merge to main. No manual docker build needed.
**Remaining manual step:** `docker build && docker push` to populate the image.
**Image:** `registry.threesix.ai/stemedb-api:latest` (also tagged with short commit SHA)
---
## Pre-Deploy Checklist (Manual Steps Before `kubectl apply`)
```bash
# 1. Build and push image (from stemedb repo root)
docker build -t us-central1-docker.pkg.dev/orchard9/docker-images/stemedb-api:latest .
docker push us-central1-docker.pkg.dev/orchard9/docker-images/stemedb-api:latest
> **Note:** Image builds are now automated via Woodpecker CI. Push to `main` → Kaniko builds →
> pushes to `registry.threesix.ai``kubectl set image` on StatefulSet. Manual steps below are
> only needed for first-time setup.
# 2. Create root API key in GCP Secret Manager
```bash
# 1. Image builds are automatic (Woodpecker CI). For manual builds:
docker build --platform linux/amd64 -t registry.threesix.ai/stemedb-api:latest .
docker push registry.threesix.ai/stemedb-api:latest
# 2. Create root API key in GCP Secret Manager (first deploy only)
ROOT_KEY="steme_live_$(openssl rand -hex 24)"
echo "Root key: $ROOT_KEY" # Save this — needed for provision-project-keys.sh
echo -n "$ROOT_KEY" | gcloud secrets create stemedb-root-api-key \
--project=orchard9 --replication-policy=automatic --data-file=-
# 3. Add DNS: stemedb.threesix.ai → Traefik LB IP (Cloudflare)
# 3. Add DNS: stemedb.threesix.ai → Traefik LB IP (Cloudflare) — already done
```
---
@ -310,7 +314,7 @@ curl https://stemedb.yourdomain.com/v1/health
---
## Phase 1 Checklist (Week 1 — Gate: First Project Can Connect)
## Phase 1 Checklist (Week 1 — Gate: First Project Can Connect) ✅ COMPLETE
| # | Task | File(s) | Status |
|---|------|---------|--------|
@ -320,28 +324,32 @@ curl https://stemedb.yourdomain.com/v1/health
| 4 | Add `--features aphoria` to Dockerfile | `Dockerfile` | ✅ Done |
| 5 | Create k8s manifests | `k3s-fleet/.../stemedb/` | ✅ Done |
| 6 | Write `scripts/provision-project-keys.sh` | `scripts/` | ✅ Done |
| 7 | Build + push Docker image | GAR | ⏳ Manual |
| 8 | Store root API key in GCP Secret Manager | GCP Console | ⏳ Manual |
| 9 | Add DNS record: `stemedb.threesix.ai` | Cloudflare | ⏳ Manual |
| 10 | Deploy to k3s + smoke test | k3s-fleet | ⏳ Pending |
| 7 | Build + push Docker image | Woodpecker CI → Zot registry | ✅ Done (automated) |
| 8 | Store root API key in GCP Secret Manager | GCP Console | ✅ Done |
| 9 | Add DNS record: `stemedb.threesix.ai` | Cloudflare | ✅ Done |
| 10 | Deploy to k3s + smoke test | k3s-fleet | ✅ Done |
| 11 | Upgrade to 3-node StatefulSet | `stemedb.yaml` | ✅ Done (2026-03-07) |
| 12 | Woodpecker CI/CD pipeline | `.woodpecker.yml` | ✅ Done |
**Gate test (run after deploy):**
```bash
# Health check
# Health check (routes through Gateway on :18181)
curl https://stemedb.threesix.ai/v1/health
# Direct API health on each pod (port-forward to :18180)
kubectl port-forward pod/stemedb-0 18180:18180 -n stemedb &
curl http://127.0.0.1:18180/v1/health
# Unauthenticated write → 401
curl -s -o /dev/null -w "%{http_code}" -X POST \
https://stemedb.threesix.ai/v1/assert -H "Content-Type: application/json" -d '{}'
# Authenticated write → 200/201
curl -X POST https://stemedb.threesix.ai/v1/assert \
-H "X-API-Key: $ROOT_KEY" -H "Content-Type: application/json" \
-d '{"subject":"test/ping","predicate":"alive","value":true,"agent_id":"test"}'
# Cluster status
curl https://stemedb.threesix.ai/v1/cluster/status
# Confirm key persists across restart
kubectl rollout restart deployment/stemedb-api -n stemedb
kubectl rollout status deployment/stemedb-api -n stemedb --timeout=120s
# Confirm pods survive rolling restart
kubectl rollout restart statefulset/stemedb -n stemedb
kubectl rollout status statefulset/stemedb -n stemedb --timeout=300s
curl https://stemedb.threesix.ai/v1/health
```
@ -646,24 +654,24 @@ restarts (Recreate strategy = brief downtime), Aphoria should retry rather than
| Item | Why not |
|------|---------|
| HPA | StemeDB is stateful (embedded KV). Cannot scale horizontally. |
| mTLS between pods | Single service. Add when you have a second service. |
| HPA | StemeDB is stateful (embedded KV). StatefulSet replicas are fixed at 3. |
| mTLS between pods | Internal cluster traffic is on private network. Add when exposing cross-cluster. |
| WAF | Body limits + Traefik rate limit + circuit breaker is sufficient for 100 known projects. |
| Per-tenant namespaces | Multiplies operational surface 100x. API key isolation is the right model. |
| Multi-region / clustering | 3-node k3s + Longhorn 2-replica is your HA story. P6 in roadmap. |
| ~~Multi-region / clustering~~ | ✅ 3-node cluster deployed. Next: full SWIM inter-node connectivity. |
| PITR with WAL timestamps | 6-hour backup RPO is acceptable for pilot. Improve later. |
| Secrets rotation automation | Manual rotation via `/v1/admin/api-keys/:hash/rotate` is fine for 100 projects. |
| Distributed tracing | You have one service. WAL fsync histogram covers what you need. |
---
## Open Questions (Resolve Week 1)
## Open Questions (Resolved)
1. **Image registry**: Which registry does k3s-fleet already use? Check `get_service_config()` in `deploy-stack.sh`.
2. **Bootstrap key API**: Verify exact method signatures on `ApiKeyStore` before writing the seed logic in `main.rs`.
3. **Aphoria scan model**: Do projects run `aphoria scan` locally (calling remote StemeDB) or as a k8s Job? Determines where retry logic lives.
4. **GCS bucket**: Does one exist for backups, or does it need to be created?
5. **CORS**: All router variants in `routers.rs` use `allow_origin(Any)`. Production needs this restricted to Traefik's internal domain. Add `STEMEDB_ALLOWED_ORIGINS` env var support.
1. ~~**Image registry**~~: ✅ Zot OCI registry at `registry.threesix.ai` on k3s. Woodpecker CI pushes automatically.
2. ~~**Bootstrap key API**~~: ✅ `bootstrap::bootstrap_root_api_key()` wired in main.rs.
3. **Aphoria scan model**: Projects run `aphoria scan --persist` locally, calling remote StemeDB. Retry logic lives in Aphoria binary.
4. **GCS bucket**: Needs to be created for backups (Phase 2).
5. **CORS**: All router variants use `allow_origin(Any)`. Restrict before public launch.
---
@ -672,34 +680,31 @@ restarts (Recreate strategy = brief downtime), Aphoria should retry rather than
| Risk | Likelihood | Mitigation |
|------|-----------|-----------|
| Longhorn fsync latency at 100-project burst | Medium | Pin pod + volume to same node (Phase 3), `dataLocality: bestEffort`; monitor WAL p99 from day 1 |
| Single-instance downtime during deploys | High (Recreate strategy) | Startup probe + maintenance window policy + Aphoria retry logic |
| Rolling restart brief downtime | Medium (StatefulSet rolls one pod at a time) | 3 replicas + readiness probe; Gateway routes to healthy pods |
| Fresh PVC after disaster = 100 project keys lost | Low but catastrophic | Bootstrap key seed in `main.rs` + `provision-project-keys.sh` idempotent re-run |
| Image registry blocker | High if unresolved | Resolve Day 1; entire deployment depends on it |
| ~~Image registry blocker~~ | ✅ Resolved | Zot registry on k3s, Woodpecker CI automates builds |
| CORS vulnerability | Medium | `allow_origin(Any)` in all router variants; fix before public launch |
---
## Directory Structure After Phase 1
## Directory Structure (Current)
```
deployments/
└── k8s/
└── base/
└── stemedb/
├── kustomization.yaml
├── namespace.yaml
├── pvc.yaml
├── deployment.yaml
├── service.yaml
├── ingress.yaml
├── middleware.yaml
└── external-secret.yaml
# k3s-fleet repo
k3s-fleet/deployments/k8s/base/stemedb/
└── stemedb.yaml # All-in-one: ExternalSecret, headless Service,
# gateway Service, 3-replica StatefulSet, Ingress
# stemedb repo
scripts/
└── provision-project-keys.sh (new)
├── entrypoint.sh # Dual-binary launcher (cluster mode)
└── provision-project-keys.sh
.woodpecker.yml # CI/CD: Kaniko → Zot registry → kubectl deploy
Dockerfile # Multi-stage: builds stemedb-api + stemedb-node
```
After Phase 2, add to `deployments/k8s/base/stemedb/`:
After Phase 2 hardening, add to `k3s-fleet/.../stemedb/`:
- `backup-cronjob.yaml`
- `service-monitor.yaml`
- `alert-rules.yaml`
@ -708,4 +713,4 @@ After Phase 2, add to `deployments/k8s/base/stemedb/`:
---
*Last updated: 2026-03-02 — Week 1 code changes complete; 3 manual steps remain before deploy*
*Last updated: 2026-03-07 — Phase 1 complete, 3-node StatefulSet deployed with Woodpecker CI/CD*

View File

@ -114,11 +114,11 @@ Internet ──→ LB → │ Cluster Gateway (port 18181) │
### Node Layout
Each node runs the full stack:
- **stemedb-api** (port 18180) - HTTP API, queries, ingest
- **stemedb-gateway** (port 18181) - Cluster coordination
- **stemedb-rpc** (port 18182) - gRPC replication
- **SWIM gossip** (port 18183) - Membership, failure detection
Each pod runs two binaries via `scripts/entrypoint.sh`:
- **stemedb-api** (port 18180) — HTTP API, WAL, KV store, ingestion, query
- **stemedb-node** (port 18181) — Cluster Gateway HTTP (client-facing, routes by shard)
- **stemedb-node** (port 18182) — gRPC (node-to-node sync)
- **stemedb-node** (port 18183) — SWIM gossip (membership, failure detection)
### Replication
@ -142,39 +142,40 @@ Each node runs the full stack:
---
## k3s Deployment Path (Current — Longhorn + StatefulSet)
## k3s Deployment (Current — StatefulSet + Longhorn)
> This is the **current production deployment path** for k3s-fleet. The bare-metal steps below
> are for non-k8s environments and use a config.toml interface that is not yet wired to the binary.
> This is the **current production deployment** on k3s-fleet. Deployed and running as of 2026-03-07.
> The bare-metal steps below are for non-k8s environments and use a config.toml interface that is
> not yet wired to the binary.
For each cluster node on k3s, deploy a separate StatefulSet with its own Longhorn PVC:
A single 3-replica StatefulSet with `VolumeClaimTemplates` provides each pod its own 50Gi Longhorn PVC:
```
k3s-fleet/deployments/k8s/base/stemedb/
├── stemedb.yaml # Node A (current single-node — Phase 1)
├── stemedb-b.yaml # Node B (Phase 2 — add when ready to scale reads)
├── stemedb-c.yaml # Node C (Phase 2)
└── kustomization.yaml
└── stemedb.yaml # Everything: ExternalSecret, headless Service, gateway Service,
# 3-replica StatefulSet, Ingress
```
**Critical k3s constraints:**
- Each node needs its own `ReadWriteOnce` Longhorn PVC — embedded KV (fjall) cannot share a volume
- Use `strategy: Recreate` on each Deployment (not RollingUpdate) — RWO PVC + 2 pods = deadlock
- Cluster gateway (port 18181) must be exposed as a separate Service for inter-node routing
- Use `topologySpreadConstraints` to ensure nodes land on different k3s worker hosts
**Architecture per pod (dual-binary via entrypoint.sh):**
- `stemedb-api` (:18180) — storage engine (WAL + KV)
- `stemedb-node` (:18181) — Gateway HTTP (client-facing, cluster routing)
- `stemedb-node` (:18182) — gRPC (node-to-node sync)
- `stemedb-node` (:18183) — SWIM gossip (membership)
**Phase 2 read-replica k8s addition (when ready):**
```yaml
# Add to stemedb-b.yaml — identical to stemedb.yaml except:
# - Different node ID env var
# - STEMEDB_CLUSTER_SEEDS pointing to Node A's gateway ClusterIP
# - Its own PVC claim
env:
- name: STEMEDB_NODE_ID
value: "node-b"
- name: STEMEDB_CLUSTER_SEEDS
value: "stemedb-api.stemedb.svc:18181"
```
**Pod DNS:** `stemedb-{0,1,2}.stemedb-headless.stemedb.svc.cluster.local`
**Key k8s resources:**
- **Headless Service** (`stemedb-headless`) — stable DNS for all 4 ports
- **Gateway Service** (`stemedb-gateway`) — ClusterIP routing to Gateway port 18181
- **Ingress**`stemedb.threesix.ai``stemedb-gateway:18181` via Traefik
**Environment variables (set in StatefulSet):**
- `STEMEDB_CLUSTER_MODE=true` — starts both binaries via `entrypoint.sh`
- `STEMEDB_NODE_ID` — from `metadata.name` (stemedb-0, stemedb-1, stemedb-2) → stable NodeId via BLAKE3
- `STEMEDB_SEED_NODES` — all 3 headless DNS names on port 18182
- `STEMEDB_NUM_SHARDS=4`, `STEMEDB_REPLICATION_FACTOR=2`
**CI/CD:** Woodpecker pipeline (`.woodpecker.yml`) → Kaniko builds → Zot registry (`registry.threesix.ai`) → `kubectl set image statefulset/stemedb`
**See:** [k8s Deploy Roadmap](../deployment/k8s-deploy-roadmap.md) for the phased rollout plan.
@ -486,8 +487,8 @@ Three nodes on k3s handles the 100-project target. For mass traffic beyond that,
| Phase | Target | Work type | What changes |
|-------|--------|-----------|-------------|
| **Phase 1** | 1 node, 100 projects | ✅ Done | Single Deployment, Longhorn PVC, auth wired |
| **Phase 2** | 3 nodes, read-scaled | Ops-heavy | Add 2 read replicas as separate Deployments; cluster gateway routes reads round-robin |
| **Phase 3** | 3 nodes, write-sharded | Code-heavy | Gateway enforces shard ownership; each node owns ⅓ of subject hash space; reads still any-node |
| **Phase 2** | 3 nodes, cluster mode | ✅ Done | 3-replica StatefulSet, dual-binary, SWIM, Gateway routing, Woodpecker CI |
| **Phase 3** | 3 nodes, full replication | Code-heavy | SWIM inter-node connectivity, Gateway HTTP forwarding, Merkle anti-entropy |
| **Phase 4** | N nodes, coordinator | Code-heavy | Designate one node (or small 3-node Raft group) exclusively for mutable admin state; assertion nodes are pure data |
**What you do NOT need:** Raft on the assertion write path. The append-only, content-addressed design means there are no write conflicts to serialize. Raft belongs only on the mutable admin state path (Phase 4), which is a small fraction of total traffic.
@ -504,4 +505,4 @@ Three nodes on k3s handles the 100-project target. For mass traffic beyond that,
---
**Last Updated:** 2026-03-02 — Added architectural rationale (gossip vs Raft), k3s deployment path, fixed mutable-state coordination notes, added 4-phase scaling table
**Last Updated:** 2026-03-07 — Updated k3s section to match actual 3-replica StatefulSet deployment, dual-binary architecture, Woodpecker CI pipeline

View File

@ -1,37 +1,6 @@
#!/bin/bash
# StemeDB cluster entrypoint — runs both stemedb-api (storage) and stemedb-node (gateway/SWIM).
# StemeDB entrypoint — single binary handles both standalone and cluster mode.
#
# In single-node mode (STEMEDB_CLUSTER_MODE unset or "false"), only stemedb-api runs.
# In cluster mode (STEMEDB_CLUSTER_MODE=true), both binaries run side-by-side.
set -e
CLUSTER_MODE="${STEMEDB_CLUSTER_MODE:-false}"
if [ "$CLUSTER_MODE" = "true" ] || [ "$CLUSTER_MODE" = "1" ]; then
echo "Starting StemeDB in cluster mode"
# Start stemedb-api in background (storage engine on :18180)
stemedb-api &
API_PID=$!
# Wait briefly for API to bind before starting the gateway
sleep 1
# Start stemedb-node in foreground (gateway :18181, gRPC :18182, SWIM :18183)
stemedb-node &
NODE_PID=$!
# Trap signals to shut down both processes
trap 'kill $API_PID $NODE_PID 2>/dev/null; wait' TERM INT
# Wait for either process to exit — if one dies, kill both
wait -n $API_PID $NODE_PID 2>/dev/null || true
EXIT_CODE=$?
kill $API_PID $NODE_PID 2>/dev/null || true
wait
exit $EXIT_CODE
else
echo "Starting StemeDB in single-node mode"
# In cluster mode (STEMEDB_CLUSTER_MODE=true), stemedb-api starts the Gateway,
# gRPC server, and SWIM membership internally alongside the HTTP API.
exec stemedb-api "$@"
fi