stemedb/crates/stemedb-cluster/tests/sharding_test.rs
jordan 7ae0adaba4 fix: clippy for_kv_map lint in sharding integration test
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-02 20:58:22 -07:00

300 lines
10 KiB
Rust

//! Integration tests for data sharding.
#![allow(clippy::unwrap_used, clippy::expect_used)]
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use stemedb_cluster::config::{ShardingConfig, SwimConfig};
use stemedb_cluster::membership::{NodeId, NodeInfo, SwimMembership};
use stemedb_cluster::sharding::{MetaRange, RangeDescriptor, RangeManager, RangeRouter, ShardId};
use stemedb_core::types::HlcTimestamp;
/// Returns a loopback (`127.0.0.1`) socket address listening on `port`.
fn test_addr(port: u16) -> SocketAddr {
    let loopback = IpAddr::from(Ipv4Addr::LOCALHOST);
    SocketAddr::new(loopback, port)
}
/// Deterministic test node id whose 16 bytes are all equal to `n`.
fn test_node_id(n: u8) -> NodeId {
    let bytes: [u8; 16] = [n; 16];
    NodeId::from_bytes(bytes)
}
/// Builds the `NodeInfo` for test node `n`.
///
/// The two loopback addresses passed to `NodeInfo::new` use ports
/// `9090 + n` and `8080 + n` (in that argument order), so distinct `n`
/// values never collide.
fn test_node_info(n: u8) -> NodeInfo {
    let id = test_node_id(n);
    let port_offset = u16::from(n);
    let first_addr = test_addr(9090 + port_offset);
    let second_addr = test_addr(8080 + port_offset);
    NodeInfo::new(id, first_addr, second_addr)
}
/// Creates a shareable SWIM membership instance for test node `n`,
/// configured with `SwimConfig::default()`.
fn create_test_membership(n: u8) -> Arc<SwimMembership> {
    let membership = SwimMembership::new(test_node_info(n), SwimConfig::default());
    Arc::new(membership)
}
/// Routing is deterministic: looking up the same subject twice must land on
/// the same shard.
#[test]
fn test_subject_routing_consistency() {
    let router = RangeRouter::new(test_node_id(1));

    // 16 shards spread across nodes 1..=3 (replication factor 3).
    let nodes: Vec<NodeId> = (1..=3).map(test_node_id).collect();
    router.update_meta_range(MetaRange::with_initial_shards(16, &nodes, 3));

    let subjects = ["user:alice", "user:bob", "org:acme", "product:widget", "claim:earth-is-round"];
    subjects.iter().for_each(|subject| {
        let first = router.route_subject(subject).unwrap();
        let second = router.route_subject(subject).unwrap();
        assert_eq!(first, second, "Subject '{subject}' routed inconsistently");
    });
}
/// Hash routing should spread subjects roughly evenly over all shards.
#[test]
fn test_subject_routing_distribution() {
    let router = RangeRouter::new(test_node_id(1));
    let nodes = vec![test_node_id(1), test_node_id(2), test_node_id(3)];
    router.update_meta_range(MetaRange::with_initial_shards(8, &nodes, 2));

    // Tally how many of 10 000 synthetic subjects land on each shard.
    let shard_counts = (0..10000).fold(HashMap::<ShardId, usize>::new(), |mut tally, i| {
        let shard = router.route_subject(&format!("test:subject:{i}")).unwrap();
        *tally.entry(shard).or_default() += 1;
        tally
    });

    // Ideal share is 10 000 / 8 = 1250 per shard; tolerate ±40% (750..1750)
    // because the sample is small.
    for &count in shard_counts.values() {
        assert!(count > 750, "Shard has too few subjects: {count} (expected ~1250)");
        assert!(count < 1750, "Shard has too many subjects: {count} (expected ~1250)");
    }

    // Every one of the 8 shards must have received at least one subject.
    assert_eq!(shard_counts.len(), 8, "Not all shards received subjects");
}
/// Routing must not collapse every subject onto a single shard.
#[test]
fn test_different_subjects_can_route_to_different_shards() {
    let router = RangeRouter::new(test_node_id(1));
    let nodes = vec![test_node_id(1), test_node_id(2)];
    router.update_meta_range(MetaRange::with_initial_shards(4, &nodes, 2));

    // Collect the distinct shards hit by 100 different subjects.
    let shards_seen: std::collections::HashSet<_> = (0..100)
        .map(|i| router.route_subject(&format!("subject_{i}")).unwrap())
        .collect();

    assert!(shards_seen.len() >= 2, "Expected multiple shards, got {shards_seen:?}");
}
/// A shard whose reported size exceeds the split threshold is flagged by
/// `check_splits`, and `split_range` turns it into two contiguous shards that
/// share the parent's replica set and divide its size evenly.
#[tokio::test]
async fn test_range_split_at_threshold() {
    let local_id = test_node_id(1);
    let router = Arc::new(RangeRouter::new(local_id));
    let membership = create_test_membership(1);
    // Testing config with a small split threshold (1MB per the original
    // note; the 2MB stats below are meant to exceed it).
    let config = ShardingConfig::testing();
    let manager = RangeManager::new(Arc::clone(&router), membership, config, local_id);

    // Start from a single shard owned by the local node.
    let meta = MetaRange::with_initial_shards(1, &[local_id], 1);
    router.update_meta_range(meta);

    // Simulate the shard growing beyond the threshold.
    manager
        .update_shard_stats(0, 2 * 1024 * 1024, 5000) // 2MB > 1MB threshold
        .unwrap();

    // Exactly shard 0 should be proposed for splitting.
    let splits = manager.check_splits();
    assert_eq!(splits.len(), 1);
    assert_eq!(splits[0], 0);

    let (left, right) = manager.split_range(0).await.unwrap();

    // The split must produce two distinct shards.
    assert_ne!(left, right, "split must yield two distinct shard ids");
    assert_eq!(router.num_shards(), 2);

    // Both shards should exist and have the same replicas.
    let left_desc = router.get_descriptor(left).unwrap();
    let right_desc = router.get_descriptor(right).unwrap();
    assert_eq!(
        left_desc.replicas, right_desc.replicas,
        "split halves must keep the parent's replica set"
    );

    // Left ends where right begins: no gap or overlap in the key space.
    assert_eq!(left_desc.end_key, right_desc.start_key);

    // The 2MB parent size is divided evenly between the halves.
    assert_eq!(left_desc.size_bytes, 1024 * 1024); // 1MB
    assert_eq!(right_desc.size_bytes, 1024 * 1024); // 1MB
}
/// Two adjacent undersized shards are proposed by `check_merges`, and
/// `merge_ranges` combines them into one shard spanning both key ranges with
/// the summed size.
#[tokio::test]
async fn test_range_merge_below_threshold() {
    let local_id = test_node_id(1);
    let router = Arc::new(RangeRouter::new(local_id));
    let manager = RangeManager::new(
        Arc::clone(&router),
        create_test_membership(1),
        ShardingConfig::testing(),
        local_id,
    );

    // Builds a locally-replicated 100KB shard with the given key bounds.
    let small_shard = |id, start: u8, end: u8| {
        let mut desc = RangeDescriptor::new(id, Some(vec![start]), Some(vec![end]), vec![local_id]);
        desc.size_bytes = 100 * 1024; // 100KB
        desc
    };

    // Two adjacent shards: keys 0x00..0x80 and 0x80..0xFF.
    let mut meta = MetaRange::new();
    meta.upsert(small_shard(0, 0x00, 0x80), HlcTimestamp::default());
    meta.upsert(small_shard(1, 0x80, 0xFF), HlcTimestamp::default());
    router.update_meta_range(meta);

    // Combined 200KB < 256KB threshold, so the pair should be a merge candidate.
    let candidates = manager.check_merges();
    assert_eq!(candidates.len(), 1);
    assert_eq!(candidates[0], (0, 1));

    let merged_id = manager.merge_ranges(0, 1).await.unwrap();
    assert_eq!(router.num_shards(), 1);

    // The merged shard covers both original ranges and carries both sizes.
    let merged = router.get_descriptor(merged_id).unwrap();
    assert_eq!(merged.start_key, Some(vec![0x00]));
    assert_eq!(merged.end_key, Some(vec![0xFF]));
    assert_eq!(merged.size_bytes, 200 * 1024);
}
/// A node that merges a peer's gossiped `MetaRange` adopts the peer's updated
/// descriptor statistics (size and generation).
#[test]
fn test_meta_range_gossip_merge() {
    let nodes = vec![test_node_id(1), test_node_id(2), test_node_id(3)];

    // Node1 and Node2 start with the same meta-range.
    let router1 = RangeRouter::new(test_node_id(1));
    let router2 = RangeRouter::new(test_node_id(2));
    let meta = MetaRange::with_initial_shards(4, &nodes, 2);
    router1.update_meta_range(meta.clone());
    router2.update_meta_range(meta);

    // Node1 updates shard 0 statistics. Fail loudly if shard 0 is missing:
    // silently skipping the update would make the assertions below fail with
    // a misleading message (or check nothing meaningful).
    let mut meta1 = router1.get_meta_range();
    let desc = meta1
        .get_mut(0)
        .expect("shard 0 should exist after with_initial_shards(4, ..)");
    desc.size_bytes = 5000;
    desc.generation = 10;
    meta1.version = 10;
    router1.update_meta_range(meta1.clone());

    // Node2 merges Node1's updates via gossip.
    router2.merge_meta_range(&meta1);

    // Node2 should now have the updated stats.
    let desc2 = router2.get_descriptor(0).unwrap();
    assert_eq!(desc2.size_bytes, 5000);
    assert_eq!(desc2.generation, 10);
}
/// With replication factor 3 over exactly 3 nodes, every shard has 3 replicas
/// and therefore every node is assigned every shard.
#[test]
fn test_shard_assignment_to_nodes() {
    let nodes = vec![test_node_id(1), test_node_id(2), test_node_id(3)];
    let meta = MetaRange::with_initial_shards(12, &nodes, 3);

    // Each node should be assigned to all shards (RF=3, 3 nodes). The previous
    // check only required a non-empty assignment, which did not match this
    // stated invariant.
    for node in &nodes {
        let shards = meta.shards_for_node(*node);
        assert_eq!(
            shards.len(),
            12,
            "Node {} is assigned {} shards, expected all 12",
            node.short_hex(),
            shards.len()
        );
    }

    // Each shard should have exactly 3 replicas.
    for shard_id in 0..12 {
        let desc = meta.get(shard_id).unwrap();
        assert_eq!(
            desc.replicas.len(),
            3,
            "Shard {shard_id} has {} replicas, expected 3",
            desc.replicas.len()
        );
    }
}
/// Leadership is spread round-robin: 9 shards over 3 nodes gives each node
/// exactly 3 shards to lead.
#[test]
fn test_leader_assignment_round_robin() {
    let nodes: Vec<NodeId> = (1..=3).map(test_node_id).collect();
    let meta = MetaRange::with_initial_shards(9, &nodes, 3);

    for node in nodes.iter().copied() {
        let led = meta.leader_shards_for_node(node);
        assert_eq!(
            led.len(),
            3,
            "Node {} leads {} shards, expected 3",
            node.short_hex(),
            led.len()
        );
    }
}
#[tokio::test]
async fn test_split_preserves_replicas() {
let local_id = test_node_id(1);
let router = Arc::new(RangeRouter::new(local_id));
let membership = create_test_membership(1);
let config = ShardingConfig::testing();
let manager = RangeManager::new(router.clone(), membership, config, local_id);
// Create a shard with 3 replicas
let replicas = vec![test_node_id(1), test_node_id(2), test_node_id(3)];
let meta = MetaRange::with_initial_shards(1, &replicas, 3);
router.update_meta_range(meta);
// Split it
let (left, right) = manager.split_range(0).await.unwrap();
// Both halves should have the same replicas
let left_desc = router.get_descriptor(left).unwrap();
let right_desc = router.get_descriptor(right).unwrap();
assert_eq!(left_desc.replicas.len(), 3);
assert_eq!(right_desc.replicas.len(), 3);
assert_eq!(left_desc.replicas, right_desc.replicas);
}
/// `merge_ranges` must reject two shards whose key ranges do not touch.
#[tokio::test]
async fn test_non_adjacent_merge_fails() {
    let local_id = test_node_id(1);
    let router = Arc::new(RangeRouter::new(local_id));
    let manager = RangeManager::new(
        Arc::clone(&router),
        create_test_membership(1),
        ShardingConfig::testing(),
        local_id,
    );

    // Shard 0 ends at 0x40 while shard 1 starts at 0x80, leaving a gap.
    let mut meta = MetaRange::new();
    for (id, start, end) in [(0, 0x00, 0x40), (1, 0x80, 0xFF)] {
        let desc = RangeDescriptor::new(id, Some(vec![start]), Some(vec![end]), vec![local_id]);
        meta.upsert(desc, HlcTimestamp::default());
    }
    router.update_meta_range(meta);

    // Merging across the gap must be refused.
    let outcome = manager.merge_ranges(0, 1).await;
    assert!(outcome.is_err());
}