//! Integration tests for gateway routing. #![allow(clippy::unwrap_used, clippy::expect_used)] use axum::body::Body; use axum::http::{Request, StatusCode}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::Arc; use stemedb_cluster::config::SwimConfig; use stemedb_cluster::membership::{NodeId, NodeInfo, SwimMembership}; use stemedb_cluster::sharding::{MetaRange, RangeRouter}; use stemedb_cluster::Gateway; use tower::ServiceExt; fn test_addr(port: u16) -> SocketAddr { SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port) } fn test_node_id(n: u8) -> NodeId { NodeId::from_bytes([n; 16]) } fn create_test_gateway() -> (Gateway, Arc, Arc) { let local_id = test_node_id(1); let local_info = NodeInfo::new(local_id, test_addr(9090), test_addr(8080)); let router = Arc::new(RangeRouter::new(local_id)); let membership = Arc::new(SwimMembership::new(local_info, SwimConfig::default())); // Initialize with some shards let nodes = vec![test_node_id(1), test_node_id(2), test_node_id(3)]; let meta = MetaRange::with_initial_shards(8, &nodes, 2); router.update_meta_range(meta); // Add members let node2 = NodeInfo::new(test_node_id(2), test_addr(9091), test_addr(8081)); let node3 = NodeInfo::new(test_node_id(3), test_addr(9092), test_addr(8082)); membership.alive_node(test_node_id(2), node2); membership.alive_node(test_node_id(3), node3); let gateway = Gateway::new(router.clone(), membership.clone(), test_addr(8080)); (gateway, router, membership) } #[tokio::test] async fn test_health_endpoint() { let (gateway, _router, membership) = create_test_gateway(); // Mark as joined membership.join(vec![]).await.unwrap(); let app = gateway.router(); let response = app .oneshot(Request::builder().uri("/v1/health").body(Body::empty()).unwrap()) .await .unwrap(); assert_eq!(response.status(), StatusCode::OK); let body = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap(); let health: serde_json::Value = serde_json::from_slice(&body).unwrap(); assert_eq!(health["healthy"], true); assert_eq!(health["reachable_nodes"], 2); assert_eq!(health["joined"], true); } #[tokio::test] async fn test_cluster_status_endpoint() { let (gateway, _router, _membership) = create_test_gateway(); let app = gateway.router(); let response = app .oneshot(Request::builder().uri("/v1/cluster/status").body(Body::empty()).unwrap()) .await .unwrap(); assert_eq!(response.status(), StatusCode::OK); let body = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap(); let status: serde_json::Value = serde_json::from_slice(&body).unwrap(); assert_eq!(status["node_count"], 2); assert_eq!(status["shard_count"], 8); } #[tokio::test] async fn test_route_test_endpoint() { let (gateway, _router, _membership) = create_test_gateway(); let app = gateway.router(); let response = app .oneshot( Request::builder() .uri("/v1/route?subject=test:subject:123") .body(Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(response.status(), StatusCode::OK); let body = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap(); let route: serde_json::Value = serde_json::from_slice(&body).unwrap(); assert_eq!(route["subject"], "test:subject:123"); assert!(route["shard_id"].is_number()); assert!(route["replicas"].is_array()); } #[tokio::test] async fn test_route_endpoint_missing_subject() { let (gateway, _router, _membership) = create_test_gateway(); let app = gateway.router(); let response = app .oneshot(Request::builder().uri("/v1/route").body(Body::empty()).unwrap()) .await .unwrap(); assert_eq!(response.status(), StatusCode::BAD_REQUEST); } #[tokio::test] async fn test_assert_endpoint_routes_to_leader() { let (gateway, _router, _membership) = create_test_gateway(); let app = gateway.router(); let body = serde_json::json!({ "subject": "test:subject", "predicate": "schema:name", "object": "Test", "signature": "sig123", "public_key": "pk456" }); let response = app .oneshot( Request::builder() .method("POST") .uri("/v1/assert") .header("content-type", "application/json") .body(Body::from(serde_json::to_string(&body).unwrap())) .unwrap(), ) .await .unwrap(); assert_eq!(response.status(), StatusCode::OK); let body = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap(); let result: serde_json::Value = serde_json::from_slice(&body).unwrap(); assert!(result["shard_id"].is_number()); assert!(result["leader_node"].is_string()); } #[tokio::test] async fn test_query_endpoint_routes_to_replica() { let (gateway, _router, _membership) = create_test_gateway(); let app = gateway.router(); let response = app .oneshot( Request::builder().uri("/v1/query?subject=test:subject").body(Body::empty()).unwrap(), ) .await .unwrap(); assert_eq!(response.status(), StatusCode::OK); let body = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap(); let result: serde_json::Value = serde_json::from_slice(&body).unwrap(); assert!(result["shard_id"].is_number()); assert!(result["served_by"].is_string()); } #[tokio::test] async fn test_gateway_routes_same_subject_consistently() { let (gateway, router, _membership) = create_test_gateway(); // Route the same subject multiple times let subject = "consistency:test:subject"; let shard1 = router.route_subject(subject).unwrap(); let shard2 = router.route_subject(subject).unwrap(); assert_eq!(shard1, shard2, "Same subject should route to same shard"); // Verify via HTTP endpoint too let app = gateway.router(); let response = app .oneshot( Request::builder() .uri(format!("/v1/route?subject={subject}")) .body(Body::empty()) .unwrap(), ) .await .unwrap(); let body = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap(); let route: serde_json::Value = serde_json::from_slice(&body).unwrap(); assert_eq!(route["shard_id"].as_u64().unwrap(), shard1 as u64); } #[tokio::test] async fn test_shard_info_endpoint() { let (gateway, _router, _membership) = create_test_gateway(); let app = gateway.router(); let response = app .oneshot(Request::builder().uri("/v1/shards/0").body(Body::empty()).unwrap()) .await .unwrap(); assert_eq!(response.status(), StatusCode::OK); let body = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap(); let shard: serde_json::Value = serde_json::from_slice(&body).unwrap(); assert_eq!(shard["shard_id"], 0); assert!(shard["replicas"].is_array()); } #[tokio::test] async fn test_shard_info_not_found() { let (gateway, _router, _membership) = create_test_gateway(); let app = gateway.router(); let response = app .oneshot(Request::builder().uri("/v1/shards/999").body(Body::empty()).unwrap()) .await .unwrap(); assert_eq!(response.status(), StatusCode::NOT_FOUND); }