//! gRPC client for node-to-node sync operations. //! //! Provides a high-level client with exponential backoff retry for transient failures. //! All operations are async and safe to call concurrently. //! //! # Example //! //! ```ignore //! use stemedb_rpc::client::{SyncClient, RetryConfig}; //! //! let client = SyncClient::connect("http://peer:18182").await?; //! //! // Gossip an assertion //! let resp = client.gossip(GossipRequest { ... }).await?; //! //! // Exchange Merkle roots //! let resp = client.exchange_roots(RootExchangeRequest { ... }).await?; //! ``` use crate::error::{Result, RpcError}; use crate::proto::sync_service_client::SyncServiceClient; use crate::proto::{ FetchRequest, FetchResponse, GetLeavesRequest, GetLeavesResponse, GossipRequest, GossipResponse, PingRequest, PingResponse, RootExchangeRequest, RootExchangeResponse, }; use backoff::backoff::Backoff; use backoff::ExponentialBackoff; use std::time::Duration; use tonic::transport::Channel; use tracing::{debug, instrument, warn}; /// Configuration for retry behavior. #[derive(Debug, Clone)] pub struct RetryConfig { /// Maximum number of retry attempts (default: 5). pub max_retries: u32, /// Initial backoff duration (default: 1 second). pub initial_backoff: Duration, /// Maximum backoff duration (default: 60 seconds). pub max_backoff: Duration, } impl Default for RetryConfig { fn default() -> Self { Self { max_retries: 5, initial_backoff: Duration::from_secs(1), max_backoff: Duration::from_secs(60), } } } /// Client for sync operations with automatic retry. /// /// Thread-safe and cloneable - can be shared across tasks. #[derive(Clone)] pub struct SyncClient { inner: SyncServiceClient, retry_config: RetryConfig, peer_addr: String, } impl SyncClient { /// Connect to a sync service endpoint. /// /// # Arguments /// /// * `addr` - The endpoint address (e.g., "http://localhost:18182") /// /// # Errors /// /// Returns `RpcError::Connection` if the connection fails. #[instrument(skip_all, fields(addr = %addr))] pub async fn connect(addr: &str) -> Result { debug!("Connecting to sync service"); let channel = Channel::from_shared(addr.to_string()) .map_err(|e| RpcError::InvalidData(e.to_string()))? .connect() .await?; Ok(Self { inner: SyncServiceClient::new(channel), retry_config: RetryConfig::default(), peer_addr: addr.to_string(), }) } /// Configure retry behavior. #[must_use] pub fn with_retry_config(mut self, config: RetryConfig) -> Self { self.retry_config = config; self } /// Returns the peer address this client is connected to. #[must_use] pub fn peer_addr(&self) -> &str { &self.peer_addr } /// Create an exponential backoff iterator from the config. /// /// Includes 50% randomization (jitter) to prevent "thundering herd" /// when multiple clients retry simultaneously after a transient failure. fn create_backoff(&self) -> ExponentialBackoff { ExponentialBackoff { current_interval: self.retry_config.initial_backoff, initial_interval: self.retry_config.initial_backoff, max_interval: self.retry_config.max_backoff, max_elapsed_time: None, // We control max retries ourselves randomization_factor: 0.5, // ±50% jitter to prevent thundering herd ..Default::default() } } /// Gossip an assertion to the peer. /// /// Pushes a new assertion immediately after local ingestion. /// Retries on transient failures with exponential backoff. #[instrument(skip(self, request), fields(hash_len = request.assertion_hash.len()))] pub async fn gossip(&self, request: GossipRequest) -> Result { self.with_retry(|mut client| { let req = request.clone(); async move { client.gossip(tonic::Request::new(req)).await } }) .await } /// Exchange Merkle roots with the peer. /// /// Used for anti-entropy sync to detect divergence. #[instrument(skip(self, request), fields(assertion_count = request.assertion_count))] pub async fn exchange_roots( &self, request: RootExchangeRequest, ) -> Result { self.with_retry(|mut client| { let req = request.clone(); async move { client.exchange_roots(tonic::Request::new(req)).await } }) .await } /// Fetch assertions by hash from the peer. /// /// Used after ExchangeRoots to pull missing assertions. #[instrument(skip(self, request), fields(hash_count = request.hashes.len()))] pub async fn fetch_assertions(&self, request: FetchRequest) -> Result { self.with_retry(|mut client| { let req = request.clone(); async move { client.fetch_assertions(tonic::Request::new(req)).await } }) .await } /// Ping the peer for health check. #[instrument(skip(self, request))] pub async fn ping(&self, request: PingRequest) -> Result { self.with_retry(|mut client| { let req = request.clone(); async move { client.ping(tonic::Request::new(req)).await } }) .await } /// Get all Merkle tree leaf hashes from the peer. /// /// Used during anti-entropy sync to compute the diff. #[instrument(skip(self, request), fields(max_leaves = request.max_leaves))] pub async fn get_leaves(&self, request: GetLeavesRequest) -> Result { self.with_retry(|mut client| { let req = request; // Copy, no clone needed async move { client.get_leaves(tonic::Request::new(req)).await } }) .await } /// Execute an operation with retry on transient failures. async fn with_retry(&self, op: F) -> Result where F: Fn(SyncServiceClient) -> Fut, Fut: std::future::Future, tonic::Status>>, { let mut backoff = self.create_backoff(); let mut attempts = 0u32; let mut last_error; loop { attempts += 1; let client = self.inner.clone(); match op(client).await { Ok(response) => return Ok(response.into_inner()), Err(status) => { last_error = status.message().to_string(); // Don't retry on permanent errors if !Self::is_retryable(&status) { return Err(RpcError::from(status)); } // Check retry limit if attempts >= self.retry_config.max_retries { return Err(RpcError::RetryExhausted { attempts, last_error }); } // Get next backoff duration if let Some(duration) = backoff.next_backoff() { warn!( attempt = attempts, max = self.retry_config.max_retries, delay_ms = duration.as_millis(), error = %last_error, "Retrying after transient error" ); tokio::time::sleep(duration).await; } else { return Err(RpcError::RetryExhausted { attempts, last_error }); } } } } } /// Determine if a status code is retryable. fn is_retryable(status: &tonic::Status) -> bool { matches!( status.code(), tonic::Code::Unavailable | tonic::Code::DeadlineExceeded | tonic::Code::Aborted | tonic::Code::ResourceExhausted | tonic::Code::Unknown ) } } #[cfg(test)] mod tests { use super::*; #[test] fn test_retry_config_default() { let config = RetryConfig::default(); assert_eq!(config.max_retries, 5); assert_eq!(config.initial_backoff, Duration::from_secs(1)); assert_eq!(config.max_backoff, Duration::from_secs(60)); } #[test] fn test_is_retryable() { assert!(SyncClient::is_retryable(&tonic::Status::unavailable("test"))); assert!(SyncClient::is_retryable(&tonic::Status::deadline_exceeded("test"))); assert!(SyncClient::is_retryable(&tonic::Status::aborted("test"))); assert!(SyncClient::is_retryable(&tonic::Status::resource_exhausted("test"))); assert!(SyncClient::is_retryable(&tonic::Status::unknown("test"))); // Non-retryable assert!(!SyncClient::is_retryable(&tonic::Status::invalid_argument("test"))); assert!(!SyncClient::is_retryable(&tonic::Status::not_found("test"))); assert!(!SyncClient::is_retryable(&tonic::Status::permission_denied("test"))); } }