tidaldb/docs/planning/milestone-7/phase-2/task-04-per-agent-rate-limiter.md
2026-02-23 22:41:16 -07:00

14 KiB

Task 04: Per-Agent Token-Bucket Rate Limiter

Delivers

RateLimiter struct with DashMap<(String, u64), TokenBucket> keyed by (agent_id, session_id). Lazy refill computed at check time (no background timer thread). Configurable refill rate and burst capacity; unlimited by default, with opt-in via RateLimiterConfig::limited(rate, burst). TidalError::RateLimited variant. Wired into session_signal() only -- non-session signal writes (TidalDb::signal()) are NOT rate limited.

Complexity: M

Dependencies

  • task-01 (for module structure: tidal/src/load/rate_limiter.rs)
  • m4 sessions (AgentId, SessionHandle, session_signal())
  • TidalError (tidal/src/schema/error.rs)

Technical Design

1. TokenBucket

A classic token bucket with lazy refill. No background thread. At check time, the bucket is topped up in proportion to the wall-clock time elapsed since the last refill (elapsed seconds × refill rate), capped at its capacity.

// tidal/src/load/rate_limiter.rs

use std::time::Instant;

/// A single token bucket with lazy refill.
///
/// Tokens are added lazily at `try_acquire()` time in proportion to the
/// wall-clock time elapsed since the last refill, capped at `capacity`.
/// No background thread or timer is needed, so an idle bucket costs only
/// its own memory inside the `DashMap` shard.
///
/// `f64::INFINITY` is accepted for both `capacity` and `refill_rate` and
/// means "never limit": an infinite refill rate tops the bucket up to
/// `capacity` on every check.
///
/// Invariant: `tokens <= capacity`. Enforced at construction and after
/// every refill.
#[derive(Debug)]
pub(crate) struct TokenBucket {
    /// Current available tokens.
    tokens: f64,
    /// Maximum tokens (also the initial value).
    capacity: f64,
    /// Tokens added per second.
    refill_rate: f64,
    /// Monotonic instant of the last refill computation.
    last_refill: Instant,
}

impl TokenBucket {
    /// Create a new bucket, full to capacity.
    ///
    /// `capacity` should be at least `1.0`: a smaller bucket can never hold
    /// a whole token, so `try_acquire()` would always fail.
    pub(crate) fn new(capacity: f64, refill_rate: f64) -> Self {
        Self {
            tokens: capacity,
            capacity,
            refill_rate,
            last_refill: Instant::now(),
        }
    }

    /// Try to acquire one token. Returns `true` if the token was consumed,
    /// `false` if the bucket is empty.
    ///
    /// On each call, refills the bucket based on elapsed time since the
    /// last refill. O(1); `Instant::now` is typically served without a
    /// syscall (vDSO on Linux, `mach_absolute_time` on macOS).
    pub(crate) fn try_acquire(&mut self) -> bool {
        self.refill();
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }

    /// Estimated milliseconds until one token is available.
    ///
    /// Returns 0 if the bucket has a whole token. Otherwise returns at least
    /// 1, rounded up so the caller never retries too early. With a zero
    /// refill rate the bucket never refills and this saturates to
    /// `u64::MAX`. Used for the `retry_after_ms` field in
    /// `TidalError::RateLimited`.
    pub(crate) fn retry_after_ms(&self) -> u64 {
        if self.tokens >= 1.0 {
            return 0;
        }
        let deficit = 1.0 - self.tokens;
        let secs = deficit / self.refill_rate;
        // The value is non-negative here, and float -> int `as` casts
        // saturate (inf -> u64::MAX), so truncation/sign loss is intended.
        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
        let ms = (secs * 1000.0).ceil() as u64;
        ms.max(1) // never suggest 0ms retry
    }

    /// Refill tokens based on elapsed time, capped at `capacity`.
    fn refill(&mut self) {
        let now = Instant::now();
        let elapsed_secs = now.duration_since(self.last_refill).as_secs_f64();
        self.last_refill = now;
        if self.refill_rate.is_infinite() && self.refill_rate > 0.0 {
            // An unlimited rate means "always full". Computing
            // `elapsed * INFINITY` would yield NaN when no time has passed
            // (`0.0 * inf`), so handle this case explicitly.
            self.tokens = self.capacity;
        } else {
            self.tokens = (self.tokens + elapsed_secs * self.refill_rate).min(self.capacity);
        }
    }
}

2. RateLimiter

use dashmap::DashMap;

/// Configuration for the per-agent rate limiter.
///
/// The default is unlimited. Use [`RateLimiterConfig::limited`] to opt in
/// to a finite rate.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct RateLimiterConfig {
    /// Maximum sustained signals per second per (agent, session) pair.
    /// Set to `f64::INFINITY` for no cap (unlimited). Default: unlimited.
    pub signals_per_second: f64,
    /// Burst capacity (max tokens in the bucket). Must be at least `1.0`,
    /// otherwise no signal can ever be admitted.
    /// Set to `f64::INFINITY` for no burst cap (unlimited). Default: unlimited.
    pub burst_capacity: f64,
}

impl RateLimiterConfig {
    /// Build a finite rate-limit configuration.
    ///
    /// `signals_per_second` is the sustained refill rate; `burst_capacity`
    /// is the maximum number of signals admitted back-to-back after an
    /// idle period.
    ///
    /// # Panics
    ///
    /// Panics if `signals_per_second` is not strictly positive or
    /// `burst_capacity` is below `1.0` (NaN fails both checks). Such a
    /// bucket would stop admitting signals after the initial burst, or
    /// never admit any at all.
    #[must_use]
    pub fn limited(signals_per_second: f64, burst_capacity: f64) -> Self {
        assert!(
            signals_per_second > 0.0,
            "signals_per_second must be > 0, got {signals_per_second}"
        );
        assert!(
            burst_capacity >= 1.0,
            "burst_capacity must be >= 1.0, got {burst_capacity}"
        );
        Self {
            signals_per_second,
            burst_capacity,
        }
    }
}

impl Default for RateLimiterConfig {
    /// Unlimited by default — callers must opt-in to rate limiting via
    /// `RateLimiterConfig::limited(rate, burst)` passed to `TidalDb::builder()`.
    fn default() -> Self {
        Self {
            signals_per_second: f64::INFINITY,
            burst_capacity: f64::INFINITY,
        }
    }
}

/// Per-agent, per-session token-bucket rate limiter.
///
/// Uses `DashMap` for concurrent access across threads. Each
/// `(agent_id, session_id)` pair gets its own `TokenBucket`, created
/// lazily on first access. The bucket refills at `signals_per_second`
/// tokens per second up to `burst_capacity`.
///
/// Design choice: keyed by `(agent_id, session_id)` rather than just
/// `agent_id` so that a single agent running multiple concurrent sessions
/// gets separate rate limits per session. If the agent opens 10 sessions,
/// it gets 10x the rate budget -- this is intentional because each session
/// represents a separate user interaction context.
pub struct RateLimiter {
    /// Per-(agent, session) token buckets.
    buckets: DashMap<(String, u64), TokenBucket>,
    config: RateLimiterConfig,
}

impl RateLimiter {
    /// Create a new rate limiter with the given configuration.
    #[must_use]
    pub fn new(config: RateLimiterConfig) -> Self {
        Self {
            buckets: DashMap::new(),
            config,
        }
    }

    /// Check and consume one token for the given agent-session pair.
    ///
    /// Returns `Ok(())` if the signal is allowed.
    /// Returns `Err((retry_after_ms, limit))` if rate-limited.
    ///
    /// Thread-safe: concurrent calls for different keys go to different
    /// DashMap shards. Concurrent calls for the SAME key contend on the
    /// shard lock, which serializes the token check. This is correct
    /// because the token bucket is not thread-safe on its own (`&mut self`).
    pub fn check(
        &self,
        agent_id: &str,
        session_id: u64,
    ) -> Result<(), (u64, f64)> {
        let key = (agent_id.to_owned(), session_id);
        let mut entry = self.buckets.entry(key).or_insert_with(|| {
            TokenBucket::new(self.config.burst_capacity, self.config.signals_per_second)
        });
        if entry.try_acquire() {
            Ok(())
        } else {
            let retry_ms = entry.retry_after_ms();
            Err((retry_ms, self.config.signals_per_second))
        }
    }

    /// Remove the bucket for a closed session.
    ///
    /// Called by `close_session()` to prevent unbounded growth of the
    /// bucket map. After a session is closed, its rate limit state is
    /// no longer needed.
    pub fn remove(&self, agent_id: &str, session_id: u64) {
        self.buckets.remove(&(agent_id.to_owned(), session_id));
    }

    /// Number of active buckets (for metrics/diagnostics).
    #[must_use]
    pub fn active_buckets(&self) -> usize {
        self.buckets.len()
    }
}

3. TidalError::RateLimited variant

// In tidal/src/schema/error.rs, add to TidalError:
// NOTE(review): `limit` is an `f64`, so if `TidalError` derives `Eq` or
// `Hash` this variant will not compile — confirm the enum's derives.

/// A session signal write was rejected because the agent exceeded its
/// rate limit. The signal was NOT written. The caller should back off
/// and retry after the suggested delay.
#[error("rate limited: agent '{agent_id}' at {limit} signals/sec, retry after {retry_after_ms}ms")]
RateLimited {
    /// The agent that exceeded its limit.
    agent_id: String,
    /// The configured rate limit (signals/sec).
    limit: f64,
    /// Suggested delay before retrying, in milliseconds.
    retry_after_ms: u64,
},

4. Wire into TidalDb

Add the RateLimiter as a field on TidalDb:

// In tidal/src/db/mod.rs:
// Per-(agent, session) limiter consulted by session_signal().
rate_limiter: Arc<crate::load::RateLimiter>,

// In from_config() and from_parts():
// NOTE(review): this always installs the unlimited default, so no limit is
// ever enforced. The acceptance criteria require an opt-in via
// `RateLimiterConfig::limited(rate, burst)` on the builder; the chosen config
// needs to be threaded through here instead of hard-coding `default()`.
rate_limiter: Arc::new(crate::load::RateLimiter::new(
    crate::load::rate_limiter::RateLimiterConfig::default(),
)),

5. Rate limit check in session_signal()

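The check happens at the top of TidalDb::session_signal(), after the closed-flag check but BEFORE signal_type validation, policy evaluation, and the signal write. As a consequence, a signal that is later rejected by validation or policy still consumes a token: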

// In tidal/src/db/sessions.rs, in session_signal():

impl TidalDb {
    pub fn session_signal(
        &self,
        handle: &SessionHandle,
        signal_type: &str,
        entity_id: EntityId,
        weight: f64,
        ts: Timestamp,
        annotation: Option<String>,
    ) -> crate::Result<()> {
        // Runtime guard: check the closed flag.
        if handle.closed.load(Ordering::Acquire) {
            return Err(TidalError::Internal(format!(
                "session {} is closed", handle.id
            )));
        }

        // Rate limit check (per-agent, per-session). Runs before signal_type
        // validation and policy evaluation, so signals that are later
        // rejected still consume a token.
        let agent_id = handle.agent_id.as_str();
        let session_id = handle.id.as_u64();
        let admitted = self.rate_limiter.check(agent_id, session_id);

        // close_session() may have run between the closed-flag check above
        // and check(). In that case check() just re-created the bucket that
        // close_session() removed, and nothing would ever remove it again.
        // Re-check the flag and drop the bucket if the session is now closed.
        // NOTE(review): this relies on close_session() setting `closed`
        // BEFORE calling `rate_limiter.remove()` — confirm that ordering.
        if handle.closed.load(Ordering::Acquire) {
            self.rate_limiter.remove(agent_id, session_id);
        }

        if let Err((retry_after_ms, limit)) = admitted {
            return Err(TidalError::RateLimited {
                agent_id: agent_id.to_owned(),
                limit,
                retry_after_ms,
            });
        }

        // ... existing signal_type validation, policy evaluation, write ...
    }
}

6. Cleanup on session close

In TidalDb::close_session(), remove the rate limiter bucket:

// After removing from self.sessions and before building the snapshot:
// Drop this session's bucket so the limiter's map does not grow without bound.
// NOTE(review): set the session's `closed` flag before this call. Otherwise a
// session_signal() racing with close can re-create the bucket after removal.
self.rate_limiter.remove(handle.agent_id.as_str(), handle.id.as_u64());

7. Non-session signals are NOT rate limited

TidalDb::signal() (the non-session write path) does NOT check the rate limiter. Only session_signal() is rate limited. This is intentional: non-session signals come from the application's own ingestion pipeline, not from untrusted agents. Backpressure on the non-session path is handled by task-03's WAL queue depth check.

Acceptance Criteria

  • TokenBucket with lazy refill, no background thread
  • TokenBucket::try_acquire() returns true/false
  • TokenBucket::retry_after_ms() returns >0 when empty
  • RateLimiter with DashMap<(String, u64), TokenBucket> keyed by (agent_id, session_id)
  • RateLimiterConfig with unlimited default; opt-in via RateLimiterConfig::limited(rate, burst) in builder
  • RateLimiter::check() creates bucket lazily on first call
  • RateLimiter::remove() cleans up closed sessions
  • TidalError::RateLimited { agent_id, limit, retry_after_ms } variant
  • Rate limit check wired into session_signal() before policy evaluation
  • TidalDb::signal() (non-session path) is NOT rate limited
  • Bucket removed in close_session()
  • All existing session tests pass
  • cargo clippy -D warnings clean

Test Strategy

#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn token_bucket_allows_burst() {
        let mut bucket = TokenBucket::new(5.0, 10.0);
        // Should allow 5 immediate acquires (burst capacity).
        for _ in 0..5 {
            assert!(bucket.try_acquire());
        }
        // 6th should fail (bucket empty, no time has passed).
        assert!(!bucket.try_acquire());
    }

    #[test]
    fn token_bucket_refills_over_time() {
        let mut bucket = TokenBucket::new(5.0, 100.0);
        // Drain all tokens.
        for _ in 0..5 {
            assert!(bucket.try_acquire());
        }
        assert!(!bucket.try_acquire());

        // Wait 50ms -> should refill ~5 tokens at 100/sec.
        std::thread::sleep(std::time::Duration::from_millis(50));
        assert!(bucket.try_acquire(), "bucket should have refilled");
    }

    #[test]
    fn token_bucket_retry_after_when_empty() {
        let mut bucket = TokenBucket::new(1.0, 10.0);
        assert!(bucket.try_acquire());
        assert!(!bucket.try_acquire());
        let retry = bucket.retry_after_ms();
        // At 10 tokens/sec, 1 token deficit = 100ms.
        assert!(retry > 0 && retry <= 100, "retry_after_ms={retry}");
    }

    #[test]
    fn token_bucket_infinite_refill_rate_is_always_full() {
        // An infinite rate must never limit, even when no time elapses
        // between calls (guards against `0.0 * INFINITY = NaN`).
        let mut bucket = TokenBucket::new(2.0, f64::INFINITY);
        for _ in 0..100 {
            assert!(bucket.try_acquire());
        }
        assert_eq!(bucket.retry_after_ms(), 0);
    }

    #[test]
    fn rate_limiter_creates_bucket_lazily() {
        let rl = RateLimiter::new(RateLimiterConfig::default());
        assert_eq!(rl.active_buckets(), 0);
        let _ = rl.check("agent-a", 1);
        assert_eq!(rl.active_buckets(), 1);
    }

    #[test]
    fn rate_limiter_default_config_never_rate_limits() {
        let rl = RateLimiter::new(RateLimiterConfig::default());
        for _ in 0..10_000 {
            assert!(rl.check("agent-a", 1).is_ok());
        }
    }

    #[test]
    fn rate_limiter_separate_buckets_per_session() {
        let config = RateLimiterConfig {
            signals_per_second: 1.0,
            burst_capacity: 1.0,
        };
        let rl = RateLimiter::new(config);

        // Session 1: consume the one token.
        assert!(rl.check("agent-a", 1).is_ok());
        assert!(rl.check("agent-a", 1).is_err());

        // Session 2: separate bucket, still has a token.
        assert!(rl.check("agent-a", 2).is_ok());
    }

    #[test]
    fn rate_limiter_remove_cleans_up() {
        let rl = RateLimiter::new(RateLimiterConfig::default());
        let _ = rl.check("agent-a", 1);
        assert_eq!(rl.active_buckets(), 1);
        rl.remove("agent-a", 1);
        assert_eq!(rl.active_buckets(), 0);
    }

    #[test]
    fn rate_limited_error_display() {
        let err = crate::TidalError::RateLimited {
            agent_id: "planner".to_string(),
            limit: 100.0,
            retry_after_ms: 50,
        };
        let msg = err.to_string();
        assert!(msg.contains("planner"));
        assert!(msg.contains("100"));
        assert!(msg.contains("50"));
    }

    // Property test: a burst of N signals where N <= burst_capacity always succeeds.
    mod proptests {
        use super::*;
        use proptest::prelude::*;

        proptest! {
            #[test]
            fn burst_within_capacity_always_succeeds(
                capacity in 1u32..100,
                rate in 1u32..1000,
            ) {
                let config = RateLimiterConfig {
                    signals_per_second: f64::from(rate),
                    burst_capacity: f64::from(capacity),
                };
                let rl = RateLimiter::new(config);
                for _ in 0..capacity {
                    prop_assert!(rl.check("agent", 1).is_ok());
                }
            }
        }
    }
}