stemedb/crates/stemedb-cluster/tests/gateway_test.rs
jordan afed95fe26 feat: Multi-node cluster coordination (Phase 6C)
Add stemedb-cluster crate implementing horizontal scaling:

- SWIM-based membership protocol for node discovery and failure detection
- Consistent hashing (jump hash) for subject-to-shard routing
- Range management with dynamic split (>64MB) and merge (<20MB) operations
- Stateless HTTP gateway for client request routing via axum
- Meta-range gossip merge for cluster-wide metadata propagation

Includes restrictive CORS policy, proper error propagation from routing,
replica cache invalidation on node failure, and 84 tests (57 unit + 27
integration). Raft MV coordination deferred per design decision.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-02 20:57:54 -07:00

240 lines
7.4 KiB
Rust

//! Integration tests for gateway routing.
#![allow(clippy::unwrap_used, clippy::expect_used)]
use axum::body::Body;
use axum::http::{Request, StatusCode};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use stemedb_cluster::config::SwimConfig;
use stemedb_cluster::membership::{NodeId, NodeInfo, SwimMembership};
use stemedb_cluster::sharding::{MetaRange, RangeRouter};
use stemedb_cluster::Gateway;
use tower::ServiceExt;
/// Returns an IPv4 loopback (`127.0.0.1`) socket address on the given `port`.
fn test_addr(port: u16) -> SocketAddr {
    let loopback = IpAddr::V4(Ipv4Addr::LOCALHOST);
    SocketAddr::new(loopback, port)
}
/// Produces a deterministic [`NodeId`] built from 16 bytes that all equal `n`.
fn test_node_id(n: u8) -> NodeId {
    let repeated = [n; 16];
    NodeId::from_bytes(repeated)
}
fn create_test_gateway() -> (Gateway, Arc<RangeRouter>, Arc<SwimMembership>) {
let local_id = test_node_id(1);
let local_info = NodeInfo::new(local_id, test_addr(9090), test_addr(8080));
let router = Arc::new(RangeRouter::new(local_id));
let membership = Arc::new(SwimMembership::new(local_info, SwimConfig::default()));
// Initialize with some shards
let nodes = vec![test_node_id(1), test_node_id(2), test_node_id(3)];
let meta = MetaRange::with_initial_shards(8, &nodes, 2);
router.update_meta_range(meta);
// Add members
let node2 = NodeInfo::new(test_node_id(2), test_addr(9091), test_addr(8081));
let node3 = NodeInfo::new(test_node_id(3), test_addr(9092), test_addr(8082));
membership.alive_node(test_node_id(2), node2);
membership.alive_node(test_node_id(3), node3);
let gateway = Gateway::new(router.clone(), membership.clone(), test_addr(8080));
(gateway, router, membership)
}
/// `GET /v1/health` answers 200 with `healthy: true`, `reachable_nodes: 2`
/// and `joined: true` once the membership has joined.
#[tokio::test]
async fn test_health_endpoint() {
    let (gateway, _router, membership) = create_test_gateway();

    // Join with no peers supplied; the assertions below expect `joined` to be
    // reported as true afterwards.
    membership.join(vec![]).await.unwrap();

    let request = Request::builder().uri("/v1/health").body(Body::empty()).unwrap();
    let response = gateway.router().oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);

    let raw = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap();
    let payload: serde_json::Value = serde_json::from_slice(&raw).unwrap();
    assert_eq!(payload["healthy"], true);
    assert_eq!(payload["reachable_nodes"], 2);
    assert_eq!(payload["joined"], true);
}
/// `GET /v1/cluster/status` answers 200 and reports `node_count: 2` and
/// `shard_count: 8` for the shared fixture.
#[tokio::test]
async fn test_cluster_status_endpoint() {
    let (gateway, _router, _membership) = create_test_gateway();

    let request = Request::builder().uri("/v1/cluster/status").body(Body::empty()).unwrap();
    let response = gateway.router().oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);

    let raw = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap();
    let payload: serde_json::Value = serde_json::from_slice(&raw).unwrap();
    assert_eq!(payload["node_count"], 2);
    // Matches the 8 initial shards the fixture seeds into the router.
    assert_eq!(payload["shard_count"], 8);
}
/// `GET /v1/route?subject=…` answers 200, echoes the subject back, and returns
/// a numeric `shard_id` together with a `replicas` array.
#[tokio::test]
async fn test_route_test_endpoint() {
    let (gateway, _router, _membership) = create_test_gateway();

    let request = Request::builder()
        .uri("/v1/route?subject=test:subject:123")
        .body(Body::empty())
        .unwrap();
    let response = gateway.router().oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);

    let raw = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap();
    let decoded: serde_json::Value = serde_json::from_slice(&raw).unwrap();
    assert_eq!(decoded["subject"], "test:subject:123");
    assert!(decoded["shard_id"].is_number());
    assert!(decoded["replicas"].is_array());
}
/// `GET /v1/route` without a `subject` query parameter is rejected with 400.
#[tokio::test]
async fn test_route_endpoint_missing_subject() {
    let (gateway, _router, _membership) = create_test_gateway();

    let request = Request::builder().uri("/v1/route").body(Body::empty()).unwrap();
    let response = gateway.router().oneshot(request).await.unwrap();

    assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
/// `POST /v1/assert` with a JSON assertion answers 200 and names the shard
/// (numeric `shard_id`) and the leader (`leader_node` string) it was routed to.
#[tokio::test]
async fn test_assert_endpoint_routes_to_leader() {
    let (gateway, _router, _membership) = create_test_gateway();

    let assertion = serde_json::json!({
        "subject": "test:subject",
        "predicate": "schema:name",
        "object": "Test",
        "signature": "sig123",
        "public_key": "pk456"
    });
    let encoded = serde_json::to_string(&assertion).unwrap();

    let request = Request::builder()
        .method("POST")
        .uri("/v1/assert")
        .header("content-type", "application/json")
        .body(Body::from(encoded))
        .unwrap();
    let response = gateway.router().oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);

    let raw = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap();
    let outcome: serde_json::Value = serde_json::from_slice(&raw).unwrap();
    assert!(outcome["shard_id"].is_number());
    assert!(outcome["leader_node"].is_string());
}
/// `GET /v1/query?subject=…` answers 200 and names the shard (numeric
/// `shard_id`) and the node that served it (`served_by` string).
#[tokio::test]
async fn test_query_endpoint_routes_to_replica() {
    let (gateway, _router, _membership) = create_test_gateway();

    let request = Request::builder()
        .uri("/v1/query?subject=test:subject")
        .body(Body::empty())
        .unwrap();
    let response = gateway.router().oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);

    let raw = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap();
    let outcome: serde_json::Value = serde_json::from_slice(&raw).unwrap();
    assert!(outcome["shard_id"].is_number());
    assert!(outcome["served_by"].is_string());
}
#[tokio::test]
async fn test_gateway_routes_same_subject_consistently() {
let (gateway, router, _membership) = create_test_gateway();
// Route the same subject multiple times
let subject = "consistency:test:subject";
let shard1 = router.route_subject(subject).unwrap();
let shard2 = router.route_subject(subject).unwrap();
assert_eq!(shard1, shard2, "Same subject should route to same shard");
// Verify via HTTP endpoint too
let app = gateway.router();
let response = app
.oneshot(
Request::builder()
.uri(format!("/v1/route?subject={subject}"))
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
let body = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap();
let route: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert_eq!(route["shard_id"].as_u64().unwrap(), shard1 as u64);
}
/// `GET /v1/shards/0` answers 200 with `shard_id: 0` and a `replicas` array.
#[tokio::test]
async fn test_shard_info_endpoint() {
    let (gateway, _router, _membership) = create_test_gateway();

    let request = Request::builder().uri("/v1/shards/0").body(Body::empty()).unwrap();
    let response = gateway.router().oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);

    let raw = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap();
    let info: serde_json::Value = serde_json::from_slice(&raw).unwrap();
    assert_eq!(info["shard_id"], 0);
    assert!(info["replicas"].is_array());
}
/// `GET /v1/shards/999` answers 404; the fixture only seeds 8 initial shards.
#[tokio::test]
async fn test_shard_info_not_found() {
    let (gateway, _router, _membership) = create_test_gateway();

    let request = Request::builder().uri("/v1/shards/999").body(Body::empty()).unwrap();
    let response = gateway.router().oneshot(request).await.unwrap();

    assert_eq!(response.status(), StatusCode::NOT_FOUND);
}