Add stemedb-cluster crate implementing horizontal scaling: - SWIM-based membership protocol for node discovery and failure detection - Consistent hashing (jump hash) for subject-to-shard routing - Range management with dynamic split (>64MB) and merge (<20MB) operations - Stateless HTTP gateway for client request routing via axum - Meta-range gossip merge for cluster-wide metadata propagation Includes restrictive CORS policy, proper error propagation from routing, replica cache invalidation on node failure, and 84 tests (57 unit + 27 integration). Raft MV coordination deferred per design decision. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
240 lines
7.4 KiB
Rust
240 lines
7.4 KiB
Rust
//! Integration tests for gateway routing.
|
|
#![allow(clippy::unwrap_used, clippy::expect_used)]
|
|
|
|
use axum::body::Body;
|
|
use axum::http::{Request, StatusCode};
|
|
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
|
use std::sync::Arc;
|
|
use stemedb_cluster::config::SwimConfig;
|
|
use stemedb_cluster::membership::{NodeId, NodeInfo, SwimMembership};
|
|
use stemedb_cluster::sharding::{MetaRange, RangeRouter};
|
|
use stemedb_cluster::Gateway;
|
|
use tower::ServiceExt;
|
|
|
|
fn test_addr(port: u16) -> SocketAddr {
|
|
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port)
|
|
}
|
|
|
|
fn test_node_id(n: u8) -> NodeId {
|
|
NodeId::from_bytes([n; 16])
|
|
}
|
|
|
|
fn create_test_gateway() -> (Gateway, Arc<RangeRouter>, Arc<SwimMembership>) {
|
|
let local_id = test_node_id(1);
|
|
let local_info = NodeInfo::new(local_id, test_addr(9090), test_addr(8080));
|
|
|
|
let router = Arc::new(RangeRouter::new(local_id));
|
|
let membership = Arc::new(SwimMembership::new(local_info, SwimConfig::default()));
|
|
|
|
// Initialize with some shards
|
|
let nodes = vec![test_node_id(1), test_node_id(2), test_node_id(3)];
|
|
let meta = MetaRange::with_initial_shards(8, &nodes, 2);
|
|
router.update_meta_range(meta);
|
|
|
|
// Add members
|
|
let node2 = NodeInfo::new(test_node_id(2), test_addr(9091), test_addr(8081));
|
|
let node3 = NodeInfo::new(test_node_id(3), test_addr(9092), test_addr(8082));
|
|
membership.alive_node(test_node_id(2), node2);
|
|
membership.alive_node(test_node_id(3), node3);
|
|
|
|
let gateway = Gateway::new(router.clone(), membership.clone(), test_addr(8080));
|
|
(gateway, router, membership)
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_health_endpoint() {
|
|
let (gateway, _router, membership) = create_test_gateway();
|
|
|
|
// Mark as joined
|
|
membership.join(vec![]).await.unwrap();
|
|
|
|
let app = gateway.router();
|
|
|
|
let response = app
|
|
.oneshot(Request::builder().uri("/v1/health").body(Body::empty()).unwrap())
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(response.status(), StatusCode::OK);
|
|
|
|
let body = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap();
|
|
let health: serde_json::Value = serde_json::from_slice(&body).unwrap();
|
|
|
|
assert_eq!(health["healthy"], true);
|
|
assert_eq!(health["reachable_nodes"], 2);
|
|
assert_eq!(health["joined"], true);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_cluster_status_endpoint() {
|
|
let (gateway, _router, _membership) = create_test_gateway();
|
|
let app = gateway.router();
|
|
|
|
let response = app
|
|
.oneshot(Request::builder().uri("/v1/cluster/status").body(Body::empty()).unwrap())
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(response.status(), StatusCode::OK);
|
|
|
|
let body = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap();
|
|
let status: serde_json::Value = serde_json::from_slice(&body).unwrap();
|
|
|
|
assert_eq!(status["node_count"], 2);
|
|
assert_eq!(status["shard_count"], 8);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_route_test_endpoint() {
|
|
let (gateway, _router, _membership) = create_test_gateway();
|
|
let app = gateway.router();
|
|
|
|
let response = app
|
|
.oneshot(
|
|
Request::builder()
|
|
.uri("/v1/route?subject=test:subject:123")
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(response.status(), StatusCode::OK);
|
|
|
|
let body = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap();
|
|
let route: serde_json::Value = serde_json::from_slice(&body).unwrap();
|
|
|
|
assert_eq!(route["subject"], "test:subject:123");
|
|
assert!(route["shard_id"].is_number());
|
|
assert!(route["replicas"].is_array());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_route_endpoint_missing_subject() {
|
|
let (gateway, _router, _membership) = create_test_gateway();
|
|
let app = gateway.router();
|
|
|
|
let response = app
|
|
.oneshot(Request::builder().uri("/v1/route").body(Body::empty()).unwrap())
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_assert_endpoint_routes_to_leader() {
|
|
let (gateway, _router, _membership) = create_test_gateway();
|
|
let app = gateway.router();
|
|
|
|
let body = serde_json::json!({
|
|
"subject": "test:subject",
|
|
"predicate": "schema:name",
|
|
"object": "Test",
|
|
"signature": "sig123",
|
|
"public_key": "pk456"
|
|
});
|
|
|
|
let response = app
|
|
.oneshot(
|
|
Request::builder()
|
|
.method("POST")
|
|
.uri("/v1/assert")
|
|
.header("content-type", "application/json")
|
|
.body(Body::from(serde_json::to_string(&body).unwrap()))
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(response.status(), StatusCode::OK);
|
|
|
|
let body = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap();
|
|
let result: serde_json::Value = serde_json::from_slice(&body).unwrap();
|
|
|
|
assert!(result["shard_id"].is_number());
|
|
assert!(result["leader_node"].is_string());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_query_endpoint_routes_to_replica() {
|
|
let (gateway, _router, _membership) = create_test_gateway();
|
|
let app = gateway.router();
|
|
|
|
let response = app
|
|
.oneshot(
|
|
Request::builder().uri("/v1/query?subject=test:subject").body(Body::empty()).unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(response.status(), StatusCode::OK);
|
|
|
|
let body = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap();
|
|
let result: serde_json::Value = serde_json::from_slice(&body).unwrap();
|
|
|
|
assert!(result["shard_id"].is_number());
|
|
assert!(result["served_by"].is_string());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_gateway_routes_same_subject_consistently() {
|
|
let (gateway, router, _membership) = create_test_gateway();
|
|
|
|
// Route the same subject multiple times
|
|
let subject = "consistency:test:subject";
|
|
let shard1 = router.route_subject(subject).unwrap();
|
|
let shard2 = router.route_subject(subject).unwrap();
|
|
|
|
assert_eq!(shard1, shard2, "Same subject should route to same shard");
|
|
|
|
// Verify via HTTP endpoint too
|
|
let app = gateway.router();
|
|
let response = app
|
|
.oneshot(
|
|
Request::builder()
|
|
.uri(format!("/v1/route?subject={subject}"))
|
|
.body(Body::empty())
|
|
.unwrap(),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
let body = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap();
|
|
let route: serde_json::Value = serde_json::from_slice(&body).unwrap();
|
|
|
|
assert_eq!(route["shard_id"].as_u64().unwrap(), shard1 as u64);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_shard_info_endpoint() {
|
|
let (gateway, _router, _membership) = create_test_gateway();
|
|
let app = gateway.router();
|
|
|
|
let response = app
|
|
.oneshot(Request::builder().uri("/v1/shards/0").body(Body::empty()).unwrap())
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(response.status(), StatusCode::OK);
|
|
|
|
let body = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap();
|
|
let shard: serde_json::Value = serde_json::from_slice(&body).unwrap();
|
|
|
|
assert_eq!(shard["shard_id"], 0);
|
|
assert!(shard["replicas"].is_array());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_shard_info_not_found() {
|
|
let (gateway, _router, _membership) = create_test_gateway();
|
|
let app = gateway.router();
|
|
|
|
let response = app
|
|
.oneshot(Request::builder().uri("/v1/shards/999").body(Body::empty()).unwrap())
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(response.status(), StatusCode::NOT_FOUND);
|
|
}
|