stemedb/crates/stemedb-rpc/src/client.rs
jordan b3e8a9a058 feat: Multi-application expansion with chaos testing and community UI
Major additions:
- Community Next.js app (port 18187) for browsing claims with API docs
- stemedb-chaos crate: Fault injection, chaos testing, CRDT properties
- Latent ingestion system: Reddit/FDA ingesters with ADK-Go agents
- Disputed claims handling: Manual review workflows and validation
- Aphoria security scanner: New extractors (SQL injection, command
  injection, weak crypto, TLS version), policy-based ignores, UAT reports
- Docker infrastructure: Dockerfile, docker-compose.yml for full stack
- VulnBank demo: Intentionally vulnerable multi-language test corpus

SDK & API enhancements:
- Source registry handlers for tracking data provenance
- Metrics endpoint
- Skeptic filtering improvements

Code quality:
- Split 14 large files (>500 lines) into focused modules
- All files now under 500-line limit per project guidelines

Documentation:
- Chaos testing guide, circuit breakers, observability docs
- Phase 7 UAT documentation updates
- Martin Kleppmann technical writer agent

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 01:24:14 -07:00

264 lines
9.0 KiB
Rust

//! gRPC client for node-to-node sync operations.
//!
//! Provides a high-level client with exponential backoff retry for transient failures.
//! All operations are async and safe to call concurrently.
//!
//! # Example
//!
//! ```ignore
//! use stemedb_rpc::client::{SyncClient, RetryConfig};
//!
//! let client = SyncClient::connect("http://peer:18182").await?;
//!
//! // Gossip an assertion
//! let resp = client.gossip(GossipRequest { ... }).await?;
//!
//! // Exchange Merkle roots
//! let resp = client.exchange_roots(RootExchangeRequest { ... }).await?;
//! ```
use crate::error::{Result, RpcError};
use crate::proto::sync_service_client::SyncServiceClient;
use crate::proto::{
FetchRequest, FetchResponse, GetLeavesRequest, GetLeavesResponse, GossipRequest,
GossipResponse, PingRequest, PingResponse, RootExchangeRequest, RootExchangeResponse,
};
use backoff::backoff::Backoff;
use backoff::ExponentialBackoff;
use std::time::Duration;
use tonic::transport::Channel;
use tracing::{debug, instrument, warn};
/// Configuration for retry behavior.
#[derive(Debug, Clone)]
pub struct RetryConfig {
/// Maximum number of retry attempts (default: 5).
pub max_retries: u32,
/// Initial backoff duration (default: 1 second).
pub initial_backoff: Duration,
/// Maximum backoff duration (default: 60 seconds).
pub max_backoff: Duration,
}
impl Default for RetryConfig {
fn default() -> Self {
Self {
max_retries: 5,
initial_backoff: Duration::from_secs(1),
max_backoff: Duration::from_secs(60),
}
}
}
/// Client for sync operations with automatic retry.
///
/// Thread-safe and cloneable - can be shared across tasks.
#[derive(Clone)]
pub struct SyncClient {
inner: SyncServiceClient<Channel>,
retry_config: RetryConfig,
peer_addr: String,
}
impl SyncClient {
/// Connect to a sync service endpoint.
///
/// # Arguments
///
/// * `addr` - The endpoint address (e.g., "http://localhost:18182")
///
/// # Errors
///
/// Returns `RpcError::Connection` if the connection fails.
#[instrument(skip_all, fields(addr = %addr))]
pub async fn connect(addr: &str) -> Result<Self> {
debug!("Connecting to sync service");
let channel = Channel::from_shared(addr.to_string())
.map_err(|e| RpcError::InvalidData(e.to_string()))?
.connect()
.await?;
Ok(Self {
inner: SyncServiceClient::new(channel),
retry_config: RetryConfig::default(),
peer_addr: addr.to_string(),
})
}
/// Configure retry behavior.
#[must_use]
pub fn with_retry_config(mut self, config: RetryConfig) -> Self {
self.retry_config = config;
self
}
/// Returns the peer address this client is connected to.
#[must_use]
pub fn peer_addr(&self) -> &str {
&self.peer_addr
}
/// Create an exponential backoff iterator from the config.
///
/// Includes 50% randomization (jitter) to prevent "thundering herd"
/// when multiple clients retry simultaneously after a transient failure.
fn create_backoff(&self) -> ExponentialBackoff {
ExponentialBackoff {
current_interval: self.retry_config.initial_backoff,
initial_interval: self.retry_config.initial_backoff,
max_interval: self.retry_config.max_backoff,
max_elapsed_time: None, // We control max retries ourselves
randomization_factor: 0.5, // ±50% jitter to prevent thundering herd
..Default::default()
}
}
/// Gossip an assertion to the peer.
///
/// Pushes a new assertion immediately after local ingestion.
/// Retries on transient failures with exponential backoff.
#[instrument(skip(self, request), fields(hash_len = request.assertion_hash.len()))]
pub async fn gossip(&self, request: GossipRequest) -> Result<GossipResponse> {
self.with_retry(|mut client| {
let req = request.clone();
async move { client.gossip(tonic::Request::new(req)).await }
})
.await
}
/// Exchange Merkle roots with the peer.
///
/// Used for anti-entropy sync to detect divergence.
#[instrument(skip(self, request), fields(assertion_count = request.assertion_count))]
pub async fn exchange_roots(
&self,
request: RootExchangeRequest,
) -> Result<RootExchangeResponse> {
self.with_retry(|mut client| {
let req = request.clone();
async move { client.exchange_roots(tonic::Request::new(req)).await }
})
.await
}
/// Fetch assertions by hash from the peer.
///
/// Used after ExchangeRoots to pull missing assertions.
#[instrument(skip(self, request), fields(hash_count = request.hashes.len()))]
pub async fn fetch_assertions(&self, request: FetchRequest) -> Result<FetchResponse> {
self.with_retry(|mut client| {
let req = request.clone();
async move { client.fetch_assertions(tonic::Request::new(req)).await }
})
.await
}
/// Ping the peer for health check.
#[instrument(skip(self, request))]
pub async fn ping(&self, request: PingRequest) -> Result<PingResponse> {
self.with_retry(|mut client| {
let req = request.clone();
async move { client.ping(tonic::Request::new(req)).await }
})
.await
}
/// Get all Merkle tree leaf hashes from the peer.
///
/// Used during anti-entropy sync to compute the diff.
#[instrument(skip(self, request), fields(max_leaves = request.max_leaves))]
pub async fn get_leaves(&self, request: GetLeavesRequest) -> Result<GetLeavesResponse> {
self.with_retry(|mut client| {
let req = request; // Copy, no clone needed
async move { client.get_leaves(tonic::Request::new(req)).await }
})
.await
}
/// Execute an operation with retry on transient failures.
async fn with_retry<F, Fut, T>(&self, op: F) -> Result<T>
where
F: Fn(SyncServiceClient<Channel>) -> Fut,
Fut: std::future::Future<Output = std::result::Result<tonic::Response<T>, tonic::Status>>,
{
let mut backoff = self.create_backoff();
let mut attempts = 0u32;
let mut last_error;
loop {
attempts += 1;
let client = self.inner.clone();
match op(client).await {
Ok(response) => return Ok(response.into_inner()),
Err(status) => {
last_error = status.message().to_string();
// Don't retry on permanent errors
if !Self::is_retryable(&status) {
return Err(RpcError::from(status));
}
// Check retry limit
if attempts >= self.retry_config.max_retries {
return Err(RpcError::RetryExhausted { attempts, last_error });
}
// Get next backoff duration
if let Some(duration) = backoff.next_backoff() {
warn!(
attempt = attempts,
max = self.retry_config.max_retries,
delay_ms = duration.as_millis(),
error = %last_error,
"Retrying after transient error"
);
tokio::time::sleep(duration).await;
} else {
return Err(RpcError::RetryExhausted { attempts, last_error });
}
}
}
}
}
/// Determine if a status code is retryable.
fn is_retryable(status: &tonic::Status) -> bool {
matches!(
status.code(),
tonic::Code::Unavailable
| tonic::Code::DeadlineExceeded
| tonic::Code::Aborted
| tonic::Code::ResourceExhausted
| tonic::Code::Unknown
)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_retry_config_default() {
let config = RetryConfig::default();
assert_eq!(config.max_retries, 5);
assert_eq!(config.initial_backoff, Duration::from_secs(1));
assert_eq!(config.max_backoff, Duration::from_secs(60));
}
#[test]
fn test_is_retryable() {
assert!(SyncClient::is_retryable(&tonic::Status::unavailable("test")));
assert!(SyncClient::is_retryable(&tonic::Status::deadline_exceeded("test")));
assert!(SyncClient::is_retryable(&tonic::Status::aborted("test")));
assert!(SyncClient::is_retryable(&tonic::Status::resource_exhausted("test")));
assert!(SyncClient::is_retryable(&tonic::Status::unknown("test")));
// Non-retryable
assert!(!SyncClient::is_retryable(&tonic::Status::invalid_argument("test")));
assert!(!SyncClient::is_retryable(&tonic::Status::not_found("test")));
assert!(!SyncClient::is_retryable(&tonic::Status::permission_denied("test")));
}
}