//! Gossip broadcast implementation. //! //! The gossip layer pushes new assertions to peers immediately after //! local ingestion, providing low-latency replication. //! //! # Design //! //! - **Fanout**: Each assertion is sent to N peers (configurable) //! - **Best-effort**: Failures are logged but don't block ingestion //! - **Idempotent**: Receivers handle duplicates gracefully //! //! # Example //! //! ```ignore //! let broadcaster = GossipBroadcaster::new(vec!["http://peer:18182".into()]).await?; //! //! // Called after each successful ingestion //! broadcaster.broadcast(&hash, &data, &hlc).await?; //! ``` use crate::error::Result; use async_trait::async_trait; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::Arc; use std::time::Instant; use stemedb_core::types::HlcTimestamp; use stemedb_rpc::proto::GossipRequest; use stemedb_rpc::SyncClient; use tokio::sync::Mutex; use tracing::{debug, info, instrument, warn}; // Re-export the trait and error from stemedb-ingest for convenience pub use stemedb_ingest::gossip::{GossipBroadcast, GossipError}; /// Token bucket rate limiter for gossip broadcast. /// /// Limits the number of messages that can be sent per second to prevent /// overwhelming peer nodes under high ingestion load. struct RateLimiter { /// Maximum tokens (messages) allowed per second. max_per_second: u32, /// Current token count. tokens: Mutex, /// Last refill time. last_refill: Mutex, } impl RateLimiter { /// Create a new rate limiter with the given messages-per-second limit. fn new(max_per_second: u32) -> Self { Self { max_per_second, tokens: Mutex::new(max_per_second as f64), last_refill: Mutex::new(Instant::now()), } } /// Try to acquire a token. Returns true if allowed, false if rate limited. async fn try_acquire(&self) -> bool { let mut tokens = self.tokens.lock().await; let mut last_refill = self.last_refill.lock().await; // Refill tokens based on elapsed time let now = Instant::now(); let elapsed = now.duration_since(*last_refill); let refill = elapsed.as_secs_f64() * self.max_per_second as f64; *tokens = (*tokens + refill).min(self.max_per_second as f64); *last_refill = now; // Try to consume a token if *tokens >= 1.0 { *tokens -= 1.0; true } else { false } } } /// Gossip broadcaster that sends assertions to peer nodes. pub struct GossipBroadcaster { clients: Vec>, fanout: usize, enabled: AtomicBool, /// Optional rate limiter to prevent overwhelming peers. rate_limiter: Option, // Metrics messages_sent: AtomicU64, send_failures: AtomicU64, rate_limited: AtomicU64, } impl GossipBroadcaster { /// Create a new gossip broadcaster. /// /// # Arguments /// /// * `peer_addrs` - List of peer addresses to connect to /// /// # Returns /// /// A broadcaster connected to all reachable peers. pub async fn new(peer_addrs: Vec) -> Result { Self::with_fanout(peer_addrs, 3).await } /// Create a gossip broadcaster with custom fanout. /// /// # Arguments /// /// * `peer_addrs` - List of peer addresses /// * `fanout` - Number of peers to send each message to pub async fn with_fanout(peer_addrs: Vec, fanout: usize) -> Result { let mut clients = Vec::with_capacity(peer_addrs.len()); for addr in &peer_addrs { match SyncClient::connect(addr).await { Ok(client) => { info!(peer = %addr, "Connected to peer for gossip"); clients.push(Arc::new(client)); } Err(e) => { // Log but don't fail - peer may come online later warn!(peer = %addr, error = %e, "Failed to connect to peer"); } } } if clients.is_empty() && !peer_addrs.is_empty() { warn!("No peers reachable for gossip broadcast"); } Ok(Self { clients, fanout, enabled: AtomicBool::new(true), rate_limiter: None, messages_sent: AtomicU64::new(0), send_failures: AtomicU64::new(0), rate_limited: AtomicU64::new(0), }) } /// Configure rate limiting for gossip broadcast. /// /// # Arguments /// /// * `max_per_second` - Maximum messages to send per second /// /// # Example /// /// ```ignore /// let broadcaster = GossipBroadcaster::new(peers).await? /// .with_rate_limit(1000); // Max 1000 messages/sec /// ``` #[must_use] pub fn with_rate_limit(mut self, max_per_second: u32) -> Self { self.rate_limiter = Some(RateLimiter::new(max_per_second)); self } /// Get the number of messages sent. pub fn messages_sent(&self) -> u64 { self.messages_sent.load(Ordering::Relaxed) } /// Get the number of send failures. pub fn send_failures(&self) -> u64 { self.send_failures.load(Ordering::Relaxed) } /// Get the number of connected clients. pub fn client_count(&self) -> usize { self.clients.len() } /// Get the number of rate-limited messages. pub fn rate_limited(&self) -> u64 { self.rate_limited.load(Ordering::Relaxed) } } #[async_trait] impl GossipBroadcast for GossipBroadcaster { #[instrument(skip(self, hash, data, hlc), fields(hash = %hex::encode(&hash[..8])))] async fn broadcast( &self, hash: &[u8; 32], data: &[u8], hlc: &HlcTimestamp, ) -> std::result::Result<(), GossipError> { if !self.enabled.load(Ordering::Relaxed) { debug!("Gossip disabled, skipping broadcast"); return Ok(()); } if self.clients.is_empty() { debug!("No peers connected, skipping gossip"); return Ok(()); } // Check rate limiter if configured if let Some(ref limiter) = self.rate_limiter { if !limiter.try_acquire().await { self.rate_limited.fetch_add(1, Ordering::Relaxed); debug!("Gossip rate limited, skipping broadcast"); return Ok(()); } } let request = GossipRequest { assertion_hash: hash.to_vec(), assertion_data: data.to_vec(), hlc_time: hlc.time_ntp64, hlc_counter: 0, // Counter is embedded in time_ntp64 hlc_node_id: hlc.node_id.to_vec(), }; // Select peers for fanout (round-robin or random in future) let targets: Vec<_> = self.clients.iter().take(self.fanout).collect(); if targets.is_empty() { return Ok(()); } debug!(peer_count = targets.len(), "Broadcasting to peers"); // Send to all target peers concurrently let mut handles = Vec::with_capacity(targets.len()); for client in targets { let client = client.clone(); let req = request.clone(); handles.push(tokio::spawn(async move { client.gossip(req).await })); } // Collect results let mut success_count = 0u32; let mut failure_count = 0u32; for handle in handles { match handle.await { Ok(Ok(response)) => { if response.accepted { success_count += 1; } else { warn!(error = %response.error, "Peer rejected gossip"); failure_count += 1; } } Ok(Err(e)) => { warn!(error = %e, "Gossip RPC failed"); failure_count += 1; } Err(e) => { warn!(error = %e, "Gossip task panicked"); failure_count += 1; } } } // Update metrics self.messages_sent.fetch_add(u64::from(success_count), Ordering::Relaxed); self.send_failures.fetch_add(u64::from(failure_count), Ordering::Relaxed); // Best-effort: success if at least one peer accepted if success_count > 0 { debug!(success = success_count, failures = failure_count, "Gossip broadcast complete"); Ok(()) } else if failure_count > 0 { // All peers failed, but don't block the caller warn!(failures = failure_count, "All gossip targets failed"); Ok(()) } else { Ok(()) } } fn is_enabled(&self) -> bool { self.enabled.load(Ordering::Relaxed) } fn enable(&self) { self.enabled.store(true, Ordering::Relaxed); info!("Gossip broadcast enabled"); } fn disable(&self) { self.enabled.store(false, Ordering::Relaxed); info!("Gossip broadcast disabled"); } } #[cfg(test)] mod tests { use super::*; use stemedb_ingest::NoOpGossipBroadcast; #[tokio::test] async fn test_noop_broadcaster() { let broadcaster = NoOpGossipBroadcast; let hash = [1u8; 32]; let data = vec![1, 2, 3]; let hlc = HlcTimestamp::new(1000, [1u8; 16]); broadcaster.broadcast(&hash, &data, &hlc).await.expect("should succeed"); assert!(!broadcaster.is_enabled()); } #[tokio::test] async fn test_broadcaster_no_peers() { let broadcaster = GossipBroadcaster::new(vec![]).await.expect("create"); assert_eq!(broadcaster.client_count(), 0); assert!(broadcaster.is_enabled()); let hash = [1u8; 32]; let data = vec![1, 2, 3]; let hlc = HlcTimestamp::new(1000, [1u8; 16]); // Should succeed even with no peers broadcaster.broadcast(&hash, &data, &hlc).await.expect("should succeed"); } #[tokio::test] async fn test_enable_disable() { let broadcaster = GossipBroadcaster::new(vec![]).await.expect("create"); assert!(broadcaster.is_enabled()); broadcaster.disable(); assert!(!broadcaster.is_enabled()); broadcaster.enable(); assert!(broadcaster.is_enabled()); } }