Add stemedb-cluster crate implementing horizontal scaling:

- SWIM-based membership protocol for node discovery and failure detection
- Consistent hashing (jump hash) for subject-to-shard routing
- Range management with dynamic split (>64MB) and merge (<20MB) operations
- Stateless HTTP gateway for client request routing via axum
- Meta-range gossip merge for cluster-wide metadata propagation

Includes restrictive CORS policy, proper error propagation from routing, replica cache invalidation on node failure, and 84 tests (57 unit + 27 integration). Raft MV coordination deferred per design decision.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
261 lines
8.1 KiB
Rust
261 lines
8.1 KiB
Rust
//! Integration tests for cluster membership.
|
|
#![allow(clippy::unwrap_used, clippy::expect_used)]
|
|
|
|
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
|
use stemedb_cluster::membership::{
|
|
MembershipEntry, MembershipEvent, NodeId, NodeInfo, NodeState, SwimMembership,
|
|
};
|
|
use stemedb_cluster::SwimConfig;
|
|
|
|
/// Builds an IPv4 loopback (`127.0.0.1`) socket address on the given `port`.
fn test_addr(port: u16) -> SocketAddr {
    let loopback = IpAddr::V4(Ipv4Addr::LOCALHOST);
    SocketAddr::new(loopback, port)
}
|
|
|
|
fn test_node_info(n: u8) -> NodeInfo {
|
|
let id = NodeId::from_bytes([n; 16]);
|
|
NodeInfo::new(id, test_addr(9090 + n as u16), test_addr(8080 + n as u16))
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_three_node_discovery_via_manual_updates() {
|
|
// Simulate 3 nodes discovering each other via gossip updates
|
|
let node1_info = test_node_info(1);
|
|
let node2_info = test_node_info(2);
|
|
let node3_info = test_node_info(3);
|
|
|
|
let config = SwimConfig::fast();
|
|
|
|
// Create 3 membership instances
|
|
let m1 = SwimMembership::new(node1_info.clone(), config.clone());
|
|
let m2 = SwimMembership::new(node2_info.clone(), config.clone());
|
|
let m3 = SwimMembership::new(node3_info.clone(), config.clone());
|
|
|
|
// Bootstrap node1 (first node)
|
|
m1.join(vec![]).await.unwrap();
|
|
|
|
// Node2 joins, discovers node1
|
|
m2.alive_node(node1_info.id, node1_info.clone());
|
|
|
|
// Node3 joins, discovers node1 and node2
|
|
m3.alive_node(node1_info.id, node1_info.clone());
|
|
m3.alive_node(node2_info.id, node2_info.clone());
|
|
|
|
// Node1 discovers node2 and node3
|
|
m1.alive_node(node2_info.id, node2_info.clone());
|
|
m1.alive_node(node3_info.id, node3_info.clone());
|
|
|
|
// Node2 discovers node3
|
|
m2.alive_node(node3_info.id, node3_info.clone());
|
|
|
|
// All nodes should see 2 members (excluding self)
|
|
assert_eq!(m1.member_count(), 2);
|
|
assert_eq!(m2.member_count(), 2);
|
|
assert_eq!(m3.member_count(), 2);
|
|
|
|
// Verify specific members
|
|
assert!(m1.is_member(node2_info.id));
|
|
assert!(m1.is_member(node3_info.id));
|
|
assert!(m2.is_member(node1_info.id));
|
|
assert!(m2.is_member(node3_info.id));
|
|
assert!(m3.is_member(node1_info.id));
|
|
assert!(m3.is_member(node2_info.id));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_node_failure_detection_via_suspicion() {
|
|
let node1_info = test_node_info(1);
|
|
let node2_info = test_node_info(2);
|
|
let node3_info = test_node_info(3);
|
|
|
|
let config = SwimConfig::fast();
|
|
let m1 = SwimMembership::new(node1_info.clone(), config);
|
|
|
|
// Add node2 and node3 as alive members
|
|
m1.alive_node(node2_info.id, node2_info.clone());
|
|
m1.alive_node(node3_info.id, node3_info.clone());
|
|
|
|
assert_eq!(m1.member_count(), 2);
|
|
|
|
// Subscribe to events
|
|
let mut events = m1.subscribe();
|
|
|
|
// Suspect node2 (simulating failed probe)
|
|
m1.suspect_node(node2_info.id);
|
|
|
|
// Node2 should be suspect, not counted as alive
|
|
assert_eq!(m1.member_count(), 1);
|
|
assert!(!m1.is_member(node2_info.id)); // Suspect nodes are not "members"
|
|
|
|
// Verify event was emitted
|
|
let event = events.try_recv().unwrap();
|
|
assert!(matches!(event, MembershipEvent::NodeSuspected(_)));
|
|
|
|
// Confirm failure (suspicion timeout expired)
|
|
m1.fail_node(node2_info.id);
|
|
|
|
let event = events.try_recv().unwrap();
|
|
assert!(matches!(event, MembershipEvent::NodeFailed(_)));
|
|
|
|
// Node3 should still be alive
|
|
assert!(m1.is_member(node3_info.id));
|
|
assert_eq!(m1.member_count(), 1);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_node_rejoin_after_failure() {
|
|
let node1_info = test_node_info(1);
|
|
let mut node2_info = test_node_info(2);
|
|
|
|
let config = SwimConfig::fast();
|
|
let m1 = SwimMembership::new(node1_info.clone(), config);
|
|
|
|
// Add node2
|
|
m1.alive_node(node2_info.id, node2_info.clone());
|
|
assert!(m1.is_member(node2_info.id));
|
|
|
|
// Node2 fails
|
|
m1.suspect_node(node2_info.id);
|
|
m1.fail_node(node2_info.id);
|
|
assert!(!m1.is_member(node2_info.id));
|
|
|
|
// Node2 restarts with higher incarnation
|
|
node2_info.incarnation = 1;
|
|
m1.alive_node(node2_info.id, node2_info.clone());
|
|
|
|
// Node2 should be alive again
|
|
assert!(m1.is_member(node2_info.id));
|
|
assert_eq!(m1.member_count(), 1);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_membership_gossip_propagation() {
|
|
// Simulate gossip propagation across 3 nodes
|
|
let node1_info = test_node_info(1);
|
|
let node2_info = test_node_info(2);
|
|
let node3_info = test_node_info(3);
|
|
|
|
let config = SwimConfig::fast();
|
|
let m1 = SwimMembership::new(node1_info.clone(), config.clone());
|
|
let m2 = SwimMembership::new(node2_info.clone(), config.clone());
|
|
let m3 = SwimMembership::new(node3_info.clone(), config);
|
|
|
|
// Node1 learns about node2
|
|
m1.alive_node(node2_info.id, node2_info.clone());
|
|
|
|
// Node1 gets gossip batch and forwards to node3
|
|
let batch = m1.get_gossip_batch(10);
|
|
assert!(!batch.is_empty());
|
|
|
|
// Forward gossip to node3
|
|
for entry in &batch {
|
|
m3.process_membership_update(entry.clone());
|
|
}
|
|
|
|
// Node3 should now know about node2
|
|
assert!(m3.is_member(node2_info.id));
|
|
|
|
// Node3 learns about node1
|
|
m3.alive_node(node1_info.id, node1_info.clone());
|
|
|
|
// Get node3's gossip and forward to node2
|
|
let batch3 = m3.get_gossip_batch(10);
|
|
for entry in &batch3 {
|
|
m2.process_membership_update(entry.clone());
|
|
}
|
|
|
|
// Node2 should now know about node1 and node3
|
|
assert!(m2.is_member(node1_info.id));
|
|
// node3 is in m3's gossip batch because m3 called alive_node on node1
|
|
// but node3 itself wouldn't be in the batch unless someone else added it
|
|
}
|
|
|
|
/// With a 1 ms suspicion timeout, a suspected peer is expected to be reported
/// as `NodeState::Dead` once the timeout has elapsed and
/// `check_suspicion_timeouts` has been run.
#[test]
fn test_suspicion_timeout_check() {
    let local = test_node_info(1);
    let peer = test_node_info(2);

    // Start from the fast preset and shrink only the suspicion timeout.
    let short_timeout = std::time::Duration::from_millis(1);
    let config = SwimConfig { suspicion_timeout: short_timeout, ..SwimConfig::fast() };

    let membership = SwimMembership::new(local, config);
    membership.alive_node(peer.id, peer);

    // Put the peer under suspicion.
    membership.suspect_node(NodeId::from_bytes([2; 16]));

    // Sleep well past the 1 ms timeout before checking.
    std::thread::sleep(std::time::Duration::from_millis(10));

    // The sweep should promote the expired suspicion to dead.
    membership.check_suspicion_timeouts();

    // The first entry returned by `all_members` must carry the dead state.
    let first_entry = membership.all_members().into_iter().next().unwrap();
    assert_eq!(first_entry.1, NodeState::Dead);
}
|
|
|
|
#[tokio::test]
|
|
async fn test_graceful_leave() {
|
|
let node1_info = test_node_info(1);
|
|
let config = SwimConfig::fast();
|
|
let m1 = SwimMembership::new(node1_info, config);
|
|
|
|
// Join and leave
|
|
m1.join(vec![]).await.unwrap();
|
|
assert!(m1.is_joined());
|
|
|
|
m1.leave().await.unwrap();
|
|
assert!(!m1.is_joined());
|
|
}
|
|
|
|
/// Applies two `Alive` updates for the same peer, older incarnation first,
/// and checks that the stored record reflects the later one.
///
/// Despite the name, both updates are applied sequentially on one thread.
#[test]
fn test_concurrent_membership_updates() {
    let membership = SwimMembership::new(test_node_info(1), SwimConfig::default());

    // Older description of node 2: incarnation 1, no shard assigned here.
    let mut older = test_node_info(2);
    older.incarnation = 1;

    // Newer description of node 2: incarnation 2 with shard 0 assigned.
    let mut newer = test_node_info(2);
    newer.incarnation = 2;
    newer.assign_shard(0);

    // Feed the older update in first, then the newer one.
    membership.process_membership_update(MembershipEntry::new(older, NodeState::Alive, 1));
    membership.process_membership_update(MembershipEntry::new(newer, NodeState::Alive, 2));

    // The stored record must be the newer one, shard assignment included.
    let stored = membership.get_member(NodeId::from_bytes([2; 16])).unwrap();
    assert_eq!(stored.incarnation, 2);
    assert!(stored.shard_assignments.contains(&0));
}
|
|
|
|
/// An update carrying an older incarnation must not overwrite a newer record.
///
/// Node 2 is first recorded as `Alive` at incarnation 2; a `Dead` update at
/// incarnation 1 is then replayed. The node is expected to keep incarnation 2
/// and to still be reported as a member.
#[test]
fn test_stale_update_ignored() {
    let node1_info = test_node_info(1);
    let config = SwimConfig::default();
    let m1 = SwimMembership::new(node1_info, config);
    let node2_id = NodeId::from_bytes([2; 16]);

    // Record node2 as alive with incarnation 2.
    let mut node2_new = test_node_info(2);
    node2_new.incarnation = 2;
    let entry_new = MembershipEntry::new(node2_new, NodeState::Alive, 10);
    m1.process_membership_update(entry_new);

    // Replay an update with an older incarnation that claims the node is dead.
    let mut node2_old = test_node_info(2);
    node2_old.incarnation = 1;
    let entry_old = MembershipEntry::new(node2_old, NodeState::Dead, 5);
    m1.process_membership_update(entry_old);

    // The stored record must still carry incarnation 2 ...
    let member = m1.get_member(node2_id).unwrap();
    assert_eq!(member.incarnation, 2);

    // ... and the node must still be alive. Checking only the incarnation
    // would miss a stale `Dead` state that leaked through.
    assert!(m1.is_member(node2_id));
}
|