14 KiB
Task 04: Per-Agent Token-Bucket Rate Limiter
Delivers
RateLimiter struct with DashMap<(String, u64), TokenBucket> keyed by (agent_id, session_id). Lazy refill computed at check time (no background timer thread). Configurable refill rate. TidalError::RateLimited variant. Wired into session_signal() only -- non-session signal writes (TidalDb::signal()) are NOT rate limited.
Complexity: M
Dependencies
- task-01 (for module structure: `tidal/src/load/rate_limiter.rs`)
- m4 sessions (`AgentId`, `SessionHandle`, `session_signal()`)
- `TidalError` (`tidal/src/schema/error.rs`)
Technical Design
1. TokenBucket
A classic token bucket with lazy refill. No background thread. At check time the bucket is topped up in proportion to the time elapsed since the last refill (measured with the monotonic `Instant` clock), and the result is capped at capacity.
// tidal/src/load/rate_limiter.rs
use std::time::Instant;
/// A single token bucket with lazy refill.
///
/// Tokens are credited lazily inside [`TokenBucket::try_acquire`], in
/// proportion to the time elapsed on the monotonic [`Instant`] clock since
/// the previous refill. No background thread or timer is involved, so an
/// idle bucket costs nothing beyond its own storage.
///
/// Invariant: `0.0 <= tokens <= capacity`. Established by `new()` (which
/// sanitizes its inputs) and re-established after every refill.
pub(crate) struct TokenBucket {
    /// Current available tokens.
    tokens: f64,
    /// Maximum tokens (also the initial value). Never NaN, never negative.
    capacity: f64,
    /// Tokens added per second. Never NaN, never negative; may be
    /// `f64::INFINITY` to mean "always full".
    refill_rate: f64,
    /// Instant of the last refill computation.
    last_refill: Instant,
}

impl TokenBucket {
    /// Create a new bucket, full to capacity.
    ///
    /// `capacity` is the burst size; `refill_rate` is tokens per second.
    /// NaN or negative values for either argument are treated as `0.0`, so a
    /// malformed configuration fails closed (no tokens / no refill) instead
    /// of poisoning the arithmetic. `f64::INFINITY` is accepted for both and
    /// means "unlimited".
    pub(crate) fn new(capacity: f64, refill_rate: f64) -> Self {
        // `f64::max` returns the non-NaN operand, so this single call maps
        // both NaN and negative inputs to 0.0.
        let capacity = capacity.max(0.0);
        let refill_rate = refill_rate.max(0.0);
        Self {
            tokens: capacity,
            capacity,
            refill_rate,
            last_refill: Instant::now(),
        }
    }

    /// Try to acquire one token.
    ///
    /// Refills the bucket for the time elapsed since the last call, then
    /// returns `true` and consumes a token if at least one whole token is
    /// available, or `false` (consuming nothing) otherwise.
    pub(crate) fn try_acquire(&mut self) -> bool {
        self.refill();
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }

    /// Estimated milliseconds until one token is available.
    ///
    /// Returns `0` if the bucket currently holds a token, and `u64::MAX` if
    /// a token can never become available (zero refill rate, or a capacity
    /// below one token). Otherwise returns at least `1`. Used for the
    /// `retry_after_ms` field in `TidalError::RateLimited`.
    ///
    /// The estimate is based on the token count as of the last refill; it
    /// does not itself refill the bucket.
    pub(crate) fn retry_after_ms(&self) -> u64 {
        if self.tokens >= 1.0 {
            return 0;
        }
        if self.refill_rate <= 0.0 || self.capacity < 1.0 {
            // No amount of waiting produces a whole token.
            return u64::MAX;
        }
        let deficit = 1.0 - self.tokens;
        // Ceil to the next millisecond so the caller never retries too early.
        let millis = (deficit / self.refill_rate * 1000.0).ceil();
        // `millis` is non-negative here, and float-to-int `as` saturates,
        // so neither lint describes a real hazard.
        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
        let millis = millis as u64;
        millis.max(1) // never suggest 0ms retry
    }

    /// Credit tokens for the time elapsed since the last refill, capped at
    /// `capacity`.
    fn refill(&mut self) {
        let now = Instant::now();
        let elapsed = now
            .saturating_duration_since(self.last_refill)
            .as_secs_f64();
        self.last_refill = now;

        if self.refill_rate.is_infinite() {
            // Unlimited rate: the bucket is always full. Handled explicitly
            // because `0.0 * INFINITY` is NaN when the clock has not advanced.
            self.tokens = self.capacity;
            return;
        }
        // Both factors are finite-or-overflowing and non-negative, so the
        // product is never NaN.
        self.tokens = (self.tokens + elapsed * self.refill_rate).min(self.capacity);
    }
}
2. RateLimiter
use dashmap::DashMap;
/// Configuration for the per-agent rate limiter.
///
/// The [`Default`] configuration is unlimited; use
/// [`RateLimiterConfig::limited`] to opt in to rate limiting.
#[derive(Debug, Clone, Copy)]
pub struct RateLimiterConfig {
    /// Maximum sustained signals per second per (agent, session) pair.
    /// Set to `f64::INFINITY` for no cap (unlimited). Default: unlimited.
    pub signals_per_second: f64,
    /// Burst capacity (max tokens in the bucket).
    /// Set to `f64::INFINITY` for no burst cap (unlimited). Default: unlimited.
    pub burst_capacity: f64,
}

impl RateLimiterConfig {
    /// Build a configuration that enforces a limit.
    ///
    /// `signals_per_second` is the sustained refill rate and
    /// `burst_capacity` is the maximum number of signals that may be sent
    /// back-to-back before the sustained rate applies. The values are stored
    /// as given; no validation is performed here.
    #[must_use]
    pub const fn limited(signals_per_second: f64, burst_capacity: f64) -> Self {
        Self {
            signals_per_second,
            burst_capacity,
        }
    }
}

impl Default for RateLimiterConfig {
    /// Unlimited by default — callers must opt-in to rate limiting via
    /// `RateLimiterConfig::limited(rate, burst)` passed to `TidalDb::builder()`.
    fn default() -> Self {
        Self {
            signals_per_second: f64::INFINITY,
            burst_capacity: f64::INFINITY,
        }
    }
}
/// Per-agent, per-session token-bucket rate limiter.
///
/// Uses `DashMap` for concurrent access across threads. Each
/// `(agent_id, session_id)` pair gets its own `TokenBucket`, created
/// lazily on first access. The bucket refills at `signals_per_second`
/// tokens per second up to `burst_capacity`.
///
/// Design choice: keyed by `(agent_id, session_id)` rather than just
/// `agent_id` so that a single agent running multiple concurrent sessions
/// gets separate rate limits per session. If the agent opens 10 sessions,
/// it gets 10x the rate budget -- this is intentional because each session
/// represents a separate user interaction context.
pub struct RateLimiter {
/// Per-(agent, session) token buckets.
buckets: DashMap<(String, u64), TokenBucket>,
config: RateLimiterConfig,
}
impl RateLimiter {
/// Create a new rate limiter with the given configuration.
#[must_use]
pub fn new(config: RateLimiterConfig) -> Self {
Self {
buckets: DashMap::new(),
config,
}
}
/// Check and consume one token for the given agent-session pair.
///
/// Returns `Ok(())` if the signal is allowed.
/// Returns `Err((retry_after_ms, limit))` if rate-limited.
///
/// Thread-safe: concurrent calls for different keys go to different
/// DashMap shards. Concurrent calls for the SAME key contend on the
/// shard lock, which serializes the token check. This is correct
/// because the token bucket is not thread-safe on its own (`&mut self`).
pub fn check(
&self,
agent_id: &str,
session_id: u64,
) -> Result<(), (u64, f64)> {
let key = (agent_id.to_owned(), session_id);
let mut entry = self.buckets.entry(key).or_insert_with(|| {
TokenBucket::new(self.config.burst_capacity, self.config.signals_per_second)
});
if entry.try_acquire() {
Ok(())
} else {
let retry_ms = entry.retry_after_ms();
Err((retry_ms, self.config.signals_per_second))
}
}
/// Remove the bucket for a closed session.
///
/// Called by `close_session()` to prevent unbounded growth of the
/// bucket map. After a session is closed, its rate limit state is
/// no longer needed.
pub fn remove(&self, agent_id: &str, session_id: u64) {
self.buckets.remove(&(agent_id.to_owned(), session_id));
}
/// Number of active buckets (for metrics/diagnostics).
#[must_use]
pub fn active_buckets(&self) -> usize {
self.buckets.len()
}
}
3. TidalError::RateLimited variant
// In tidal/src/schema/error.rs, add to TidalError:
/// A session signal write was rejected because the agent exceeded its
/// rate limit. The signal was NOT written. The caller should back off
/// and retry after the suggested delay.
///
/// Produced by `session_signal()` when the rate limiter's `check()` fails.
#[error("rate limited: agent '{agent_id}' at {limit} signals/sec, retry after {retry_after_ms}ms")]
RateLimited {
/// The agent that exceeded its limit.
agent_id: String,
/// The configured rate limit (signals/sec), i.e. the limiter's
/// `signals_per_second` setting — not a measured rate.
limit: f64,
/// Suggested delay before retrying, in milliseconds, as estimated by
/// the token bucket at the moment of rejection.
retry_after_ms: u64,
},
4. Wire into TidalDb
Add the RateLimiter as a field on TidalDb:
// In tidal/src/db/mod.rs:
// Shared limiter handle; `session_signal()` and `close_session()` use it.
rate_limiter: Arc<crate::load::RateLimiter>,
// In from_config() and from_parts():
// NOTE(review): `RateLimiterConfig::default()` is unlimited (both fields are
// `f64::INFINITY`), so as written no session is ever throttled. A config
// supplied via the builder presumably needs to be threaded through here —
// confirm.
rate_limiter: Arc::new(crate::load::RateLimiter::new(
crate::load::rate_limiter::RateLimiterConfig::default(),
)),
5. Rate limit check in session_signal()
The check happens at the top of TidalDb::session_signal(), after the closed-flag check but BEFORE policy evaluation and signal write:
// In tidal/src/db/sessions.rs, in session_signal():
impl TidalDb {
/// Write a signal on behalf of an open agent session.
///
/// Guards run in this order before any existing logic:
/// 1. closed-flag check on the handle,
/// 2. per-(agent, session) rate-limit check,
/// 3. the pre-existing signal_type validation, policy evaluation and write
///    (elided in this snippet).
///
/// # Errors
///
/// - `TidalError::Internal` if the session handle is already closed.
/// - `TidalError::RateLimited` if the session's token bucket is empty; the
///   signal is not written in that case.
pub fn session_signal(
&self,
handle: &SessionHandle,
signal_type: &str,
entity_id: EntityId,
weight: f64,
ts: Timestamp,
annotation: Option<String>,
) -> crate::Result<()> {
// Runtime guard: check the closed flag.
if handle.closed.load(Ordering::Acquire) {
return Err(TidalError::Internal(format!(
"session {} is closed", handle.id
)));
}
// Rate limit check (per-agent, per-session). A token is consumed here,
// before signal_type validation and policy evaluation, so calls that are
// rejected later still count against the budget.
// NOTE(review): if `close_session()` removes the bucket between the closed
// check above and this call, `check()` would lazily re-create a bucket that
// nothing removes afterwards — confirm against `close_session()`.
if let Err((retry_after_ms, limit)) =
self.rate_limiter.check(handle.agent_id.as_str(), handle.id.as_u64())
{
return Err(TidalError::RateLimited {
agent_id: handle.agent_id.as_str().to_owned(),
limit,
retry_after_ms,
});
}
// ... existing signal_type validation, policy evaluation, write ...
}
}
6. Cleanup on session close
In TidalDb::close_session(), remove the rate limiter bucket:
// After removing from self.sessions and before building the snapshot:
self.rate_limiter.remove(handle.agent_id.as_str(), handle.id.as_u64());
7. Non-session signals are NOT rate limited
TidalDb::signal() (the non-session write path) does NOT check the rate limiter. Only session_signal() is rate limited. This is intentional: non-session signals come from the application's own ingestion pipeline, not from untrusted agents. Backpressure on the non-session path is handled by task-03's WAL queue depth check.
Acceptance Criteria
- `TokenBucket` with lazy refill, no background thread
- `TokenBucket::try_acquire()` returns true/false
- `TokenBucket::retry_after_ms()` returns >0 when empty
- `RateLimiter` with `DashMap<(String, u64), TokenBucket>` keyed by (agent_id, session_id)
- `RateLimiterConfig` with unlimited default; opt-in via `RateLimiterConfig::limited(rate, burst)` in builder
- `RateLimiter::check()` creates bucket lazily on first call
- `RateLimiter::remove()` cleans up closed sessions
- `TidalError::RateLimited { agent_id, limit, retry_after_ms }` variant
- Rate limit check wired into `session_signal()` before policy evaluation
- `TidalDb::signal()` (non-session path) is NOT rate limited
- Bucket removed in `close_session()`
- All existing session tests pass
- `cargo clippy -- -D warnings` clean
Test Strategy
/// Unit and property tests for the token bucket and the rate limiter.
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// A full bucket grants exactly `capacity` immediate acquires.
    #[test]
    fn token_bucket_allows_burst() {
        // Zero refill rate: the outcome must not depend on how much real
        // time elapses between the calls below (a scheduler stall would
        // otherwise let a non-zero rate refill a token and flip the last
        // assertion).
        let mut bucket = TokenBucket::new(5.0, 0.0);
        // Should allow 5 immediate acquires (burst capacity).
        for _ in 0..5 {
            assert!(bucket.try_acquire());
        }
        // 6th should fail: the bucket is empty and never refills.
        assert!(!bucket.try_acquire());
    }

    /// A drained bucket grants tokens again once enough time has passed.
    #[test]
    fn token_bucket_refills_over_time() {
        let mut bucket = TokenBucket::new(5.0, 100.0);
        // Drain all tokens.
        for _ in 0..5 {
            assert!(bucket.try_acquire());
        }
        assert!(!bucket.try_acquire());
        // Wait 50ms -> should refill ~5 tokens at 100/sec. `sleep` waits at
        // least the requested duration, so at least one token is available.
        std::thread::sleep(std::time::Duration::from_millis(50));
        assert!(bucket.try_acquire(), "bucket should have refilled");
    }

    /// An empty bucket reports a positive, bounded retry hint.
    #[test]
    fn token_bucket_retry_after_when_empty() {
        let mut bucket = TokenBucket::new(1.0, 10.0);
        assert!(bucket.try_acquire());
        assert!(!bucket.try_acquire());
        let retry = bucket.retry_after_ms();
        // At 10 tokens/sec, 1 token deficit = 100ms.
        assert!(retry > 0 && retry <= 100, "retry_after_ms={retry}");
    }

    /// No bucket exists until the first `check()` for a key.
    #[test]
    fn rate_limiter_creates_bucket_lazily() {
        let rl = RateLimiter::new(RateLimiterConfig::default());
        assert_eq!(rl.active_buckets(), 0);
        let _ = rl.check("agent-a", 1);
        assert_eq!(rl.active_buckets(), 1);
    }

    /// Two sessions of the same agent do not share a budget.
    #[test]
    fn rate_limiter_separate_buckets_per_session() {
        let config = RateLimiterConfig {
            signals_per_second: 1.0,
            burst_capacity: 1.0,
        };
        let rl = RateLimiter::new(config);
        // Session 1: consume the one token.
        assert!(rl.check("agent-a", 1).is_ok());
        assert!(rl.check("agent-a", 1).is_err());
        // Session 2: separate bucket, still has a token.
        assert!(rl.check("agent-a", 2).is_ok());
    }

    /// `remove()` drops the bucket created by an earlier `check()`.
    #[test]
    fn rate_limiter_remove_cleans_up() {
        let rl = RateLimiter::new(RateLimiterConfig::default());
        let _ = rl.check("agent-a", 1);
        assert_eq!(rl.active_buckets(), 1);
        rl.remove("agent-a", 1);
        assert_eq!(rl.active_buckets(), 0);
    }

    /// The error message mentions the agent, the limit and the retry delay.
    #[test]
    fn rate_limited_error_display() {
        let err = crate::TidalError::RateLimited {
            agent_id: "planner".to_string(),
            limit: 100.0,
            retry_after_ms: 50,
        };
        let msg = err.to_string();
        assert!(msg.contains("planner"));
        assert!(msg.contains("100"));
        assert!(msg.contains("50"));
    }

    // Property test: a burst of N signals where N <= burst_capacity always succeeds.
    mod proptests {
        use super::*;
        use proptest::prelude::*;

        proptest! {
            #[test]
            fn burst_within_capacity_always_succeeds(
                capacity in 1u32..100,
                rate in 1u32..1000,
            ) {
                let config = RateLimiterConfig {
                    signals_per_second: f64::from(rate),
                    burst_capacity: f64::from(capacity),
                };
                let rl = RateLimiter::new(config);
                for _ in 0..capacity {
                    prop_assert!(rl.check("agent", 1).is_ok());
                }
            }
        }
    }
}