stemedb/crates/stemedb-cluster/tests/membership_test.rs
jordan afed95fe26 feat: Multi-node cluster coordination (Phase 6C)
Add stemedb-cluster crate implementing horizontal scaling:

- SWIM-based membership protocol for node discovery and failure detection
- Consistent hashing (jump hash) for subject-to-shard routing
- Range management with dynamic split (>64MB) and merge (<20MB) operations
- Stateless HTTP gateway for client request routing via axum
- Meta-range gossip merge for cluster-wide metadata propagation

Includes restrictive CORS policy, proper error propagation from routing,
replica cache invalidation on node failure, and 84 tests (57 unit + 27
integration). Raft MV coordination deferred per design decision.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-02 20:57:54 -07:00

261 lines
8.1 KiB
Rust

//! Integration tests for cluster membership.
#![allow(clippy::unwrap_used, clippy::expect_used)]
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use stemedb_cluster::membership::{
MembershipEntry, MembershipEvent, NodeId, NodeInfo, NodeState, SwimMembership,
};
use stemedb_cluster::SwimConfig;
/// Returns the IPv4 loopback socket address `127.0.0.1:<port>`.
fn test_addr(port: u16) -> SocketAddr {
    let loopback = IpAddr::V4(Ipv4Addr::LOCALHOST);
    SocketAddr::new(loopback, port)
}
/// Builds a deterministic `NodeInfo` for test node `n`.
///
/// Every byte of the node id is `n`. The first address passed to
/// `NodeInfo::new` is loopback port `9090 + n`, and the second is loopback
/// port `8080 + n`, so distinct `n` values never collide.
fn test_node_info(n: u8) -> NodeInfo {
    // `u16::from` is a lossless widening; 9090 + 255 still fits in a u16.
    let offset = u16::from(n);
    let id = NodeId::from_bytes([n; 16]);
    NodeInfo::new(id, test_addr(9090 + offset), test_addr(8080 + offset))
}
#[tokio::test]
async fn test_three_node_discovery_via_manual_updates() {
    // Three independent membership views learn about each other through
    // direct `alive_node` calls that stand in for gossip delivery.
    let infos = [test_node_info(1), test_node_info(2), test_node_info(3)];
    let config = SwimConfig::fast();
    let views: Vec<_> =
        infos.iter().map(|info| SwimMembership::new(info.clone(), config.clone())).collect();

    // The first node bootstraps the cluster with no seed peers.
    views[0].join(vec![]).await.unwrap();

    // Every view is told about every other node, never about itself.
    for (i, view) in views.iter().enumerate() {
        for (j, peer) in infos.iter().enumerate() {
            if i != j {
                view.alive_node(peer.id, peer.clone());
            }
        }
    }

    // Each view sees exactly its two peers; `member_count` excludes the local node.
    for (i, view) in views.iter().enumerate() {
        assert_eq!(view.member_count(), 2);
        for (j, peer) in infos.iter().enumerate() {
            if i != j {
                assert!(view.is_member(peer.id));
            }
        }
    }
}
#[tokio::test]
async fn test_node_failure_detection_via_suspicion() {
    let local = test_node_info(1);
    let flaky = test_node_info(2);
    let healthy = test_node_info(3);
    let membership = SwimMembership::new(local, SwimConfig::fast());

    // Seed two live peers.
    membership.alive_node(flaky.id, flaky.clone());
    membership.alive_node(healthy.id, healthy.clone());
    assert_eq!(membership.member_count(), 2);

    // Subscribe only after seeding, so the first event received below is the
    // suspicion transition rather than anything from the seeding step.
    let mut events = membership.subscribe();

    // A failed probe moves the flaky peer to Suspect, which removes it from
    // the set counted as members.
    membership.suspect_node(flaky.id);
    assert_eq!(membership.member_count(), 1);
    assert!(!membership.is_member(flaky.id));
    let suspected = events.try_recv().unwrap();
    assert!(matches!(suspected, MembershipEvent::NodeSuspected(_)));

    // The suspicion timeout has expired: confirm the failure.
    membership.fail_node(flaky.id);
    let failed = events.try_recv().unwrap();
    assert!(matches!(failed, MembershipEvent::NodeFailed(_)));

    // The untouched peer is still alive.
    assert!(membership.is_member(healthy.id));
    assert_eq!(membership.member_count(), 1);
}
#[tokio::test]
async fn test_node_rejoin_after_failure() {
    let m1 = SwimMembership::new(test_node_info(1), SwimConfig::fast());
    let mut node2_info = test_node_info(2);
    let node2_id = node2_info.id;

    // Seed node2 as a live peer.
    m1.alive_node(node2_id, node2_info.clone());
    assert!(m1.is_member(node2_id));

    // Drive node2 through Suspect and then Dead.
    m1.suspect_node(node2_id);
    m1.fail_node(node2_id);
    assert!(!m1.is_member(node2_id));

    // A restarted node2 announces a bumped incarnation, which must revive it.
    node2_info.incarnation = 1;
    m1.alive_node(node2_id, node2_info);
    assert!(m1.is_member(node2_id));
    assert_eq!(m1.member_count(), 1);
}
/// Gossip relayed across two hops (m1 -> m3 -> m2) carries membership knowledge
/// to nodes that never observed the subject directly.
#[tokio::test]
async fn test_membership_gossip_propagation() {
    let node1_info = test_node_info(1);
    let node2_info = test_node_info(2);
    let node3_info = test_node_info(3);
    let config = SwimConfig::fast();
    let m1 = SwimMembership::new(node1_info.clone(), config.clone());
    let m2 = SwimMembership::new(node2_info.clone(), config.clone());
    let m3 = SwimMembership::new(node3_info.clone(), config);

    // Hop 1: node1 learns about node2 directly and relays its gossip to node3.
    m1.alive_node(node2_info.id, node2_info.clone());
    let batch = m1.get_gossip_batch(10);
    assert!(!batch.is_empty(), "node1 should have gossip to share after learning about node2");
    for entry in &batch {
        m3.process_membership_update(entry.clone());
    }

    // Node3 now knows about node2 second-hand.
    assert!(m3.is_member(node2_info.id));

    // Hop 2: node3 learns about node1 directly and relays its gossip to node2.
    m3.alive_node(node1_info.id, node1_info.clone());
    let batch3 = m3.get_gossip_batch(10);
    assert!(!batch3.is_empty(), "node3 should have gossip to share after learning about node1");
    for entry in &batch3 {
        m2.process_membership_update(entry.clone());
    }

    // Node2 now knows about node1 via node3's gossip. Node3 itself is deliberately
    // not asserted: its own entry presumably only appears in a gossip batch once
    // some other node has added it, so node2 is not expected to learn about node3
    // from this exchange.
    assert!(m2.is_member(node1_info.id));
}
/// A suspected node whose suspicion timeout has elapsed is promoted to Dead by
/// `check_suspicion_timeouts`.
#[test]
fn test_suspicion_timeout_check() {
    let node1_info = test_node_info(1);
    let node2_info = test_node_info(2);
    let node2_id = node2_info.id;
    // A 1ms suspicion timeout lets a short sleep expire it.
    let config =
        SwimConfig { suspicion_timeout: std::time::Duration::from_millis(1), ..SwimConfig::fast() };
    let m1 = SwimMembership::new(node1_info, config);
    m1.alive_node(node2_id, node2_info);

    // Suspect the node (simulating a failed probe).
    m1.suspect_node(node2_id);

    // Sleep well past the suspicion timeout.
    std::thread::sleep(std::time::Duration::from_millis(10));

    // Expired suspicions should be promoted to Dead.
    m1.check_suspicion_timeouts();

    // node2 must no longer count as a live member.
    assert!(!m1.is_member(node2_id));
    // Search for the Dead entry instead of assuming node2 is the first (or only)
    // element of `all_members()`. The previous form broke if that list also held
    // the local node or was returned in an unspecified order.
    let has_dead = m1.all_members().into_iter().any(|(_, state)| state == NodeState::Dead);
    assert!(has_dead, "node2 should be marked Dead after its suspicion timed out");
}
#[tokio::test]
async fn test_graceful_leave() {
    let membership = SwimMembership::new(test_node_info(1), SwimConfig::fast());

    // Bootstrapping with no seed peers marks the node as joined...
    membership.join(vec![]).await.unwrap();
    assert!(membership.is_joined());

    // ...and a graceful leave clears that state again.
    membership.leave().await.unwrap();
    assert!(!membership.is_joined());
}
/// Two versions of node2's record arrive one after the other (despite the name,
/// they are applied sequentially). The higher incarnation must win, including
/// the shard assignment it carries.
#[test]
fn test_concurrent_membership_updates() {
    let m1 = SwimMembership::new(test_node_info(1), SwimConfig::default());
    let node2_id = NodeId::from_bytes([2; 16]);

    // Builds node2's record stamped with the given incarnation.
    let versioned = |incarnation| {
        let mut info = test_node_info(2);
        info.incarnation = incarnation;
        info
    };

    let older = versioned(1);
    let mut newer = versioned(2);
    newer.assign_shard(0);

    // Apply the older version first, then the newer one.
    m1.process_membership_update(MembershipEntry::new(older, NodeState::Alive, 1));
    m1.process_membership_update(MembershipEntry::new(newer, NodeState::Alive, 2));

    let member = m1.get_member(node2_id).unwrap();
    assert_eq!(member.incarnation, 2);
    assert!(member.shard_assignments.contains(&0));
}
/// An update carrying an older incarnation, here one claiming the node is Dead,
/// must not override a newer Alive record.
#[test]
fn test_stale_update_ignored() {
    let node1_info = test_node_info(1);
    let config = SwimConfig::default();
    let m1 = SwimMembership::new(node1_info, config);
    let node2_id = NodeId::from_bytes([2; 16]);

    // Add node2 as Alive with incarnation 2.
    let mut node2_new = test_node_info(2);
    node2_new.incarnation = 2;
    let entry_new = MembershipEntry::new(node2_new, NodeState::Alive, 10);
    m1.process_membership_update(entry_new);
    assert!(m1.is_member(node2_id));

    // Deliver a stale update: older incarnation, claiming Dead.
    let mut node2_old = test_node_info(2);
    node2_old.incarnation = 1;
    let entry_old = MembershipEntry::new(node2_old, NodeState::Dead, 5);
    m1.process_membership_update(entry_old);

    // node2 must still be Alive with incarnation 2. Previously only the
    // incarnation was checked, so an applied Dead state would have gone unnoticed.
    let member = m1.get_member(node2_id).unwrap();
    assert_eq!(member.incarnation, 2);
    assert!(m1.is_member(node2_id), "stale Dead update must not override a newer Alive record");
}