🦀 Functional Rust

999: Rate Limiter — Token Bucket

Difficulty: Intermediate
Category: Async / Concurrency FP Patterns
Concept: Token bucket algorithm — refill N tokens/sec, consume one per request
Key Insight: `last_refill.elapsed().as_secs_f64() * rate` gives the tokens earned since the last refill; `min(tokens + earned, capacity)` prevents overflow; `Mutex<State>` makes it thread-safe

Versions

| Directory | Description |
| --- | --- |
| `std/` | Standard library version using `std::sync`, `std::thread` |
| `tokio/` | Tokio async runtime version using `tokio::sync`, `tokio::spawn` |

Running

```sh
# Standard library version
cd std && cargo test

# Tokio version
cd tokio && cargo test
```
// 999: Rate Limiter — Token Bucket
// Tokens refill over time; consume one per request. Uses std::time::Instant.

use std::sync::Mutex;
use std::time::{Duration, Instant};
use std::thread;

struct TokenBucket {
    state: Mutex<BucketState>,
    capacity: f64,
    refill_rate: f64, // tokens per second
}

struct BucketState {
    tokens: f64,
    last_refill: Instant,
}

impl TokenBucket {
    fn new(capacity: f64, refill_rate: f64) -> Self {
        TokenBucket {
            state: Mutex::new(BucketState {
                tokens: capacity,
                last_refill: Instant::now(),
            }),
            capacity,
            refill_rate,
        }
    }

    fn refill(state: &mut BucketState, capacity: f64, refill_rate: f64) {
        let elapsed = state.last_refill.elapsed().as_secs_f64();
        let new_tokens = elapsed * refill_rate;
        state.tokens = (state.tokens + new_tokens).min(capacity);
        state.last_refill = Instant::now();
    }

    fn try_acquire(&self, cost: f64) -> bool {
        let mut state = self.state.lock().unwrap();
        Self::refill(&mut state, self.capacity, self.refill_rate);
        if state.tokens >= cost {
            state.tokens -= cost;
            true
        } else {
            false
        }
    }

    fn acquire(&self, cost: f64) {
        while !self.try_acquire(cost) {
            thread::sleep(Duration::from_millis(1));
        }
    }

    fn available_tokens(&self) -> f64 {
        let mut state = self.state.lock().unwrap();
        Self::refill(&mut state, self.capacity, self.refill_rate);
        state.tokens
    }
}

// --- Approach 1: Burst then deny ---
fn burst_then_deny() -> (usize, usize) {
    let bucket = TokenBucket::new(5.0, 1.0); // 5 capacity, 1 token/sec
    let mut allowed = 0;
    let mut denied = 0;

    for _ in 0..10 {
        if bucket.try_acquire(1.0) { allowed += 1; } else { denied += 1; }
    }
    (allowed, denied)
}

// --- Approach 2: Refill over time ---
fn refill_over_time() -> usize {
    let bucket = TokenBucket::new(3.0, 1000.0); // 1000 tokens/sec

    // Drain all 3 tokens
    for _ in 0..3 { assert!(bucket.try_acquire(1.0)); }
    assert!(!bucket.try_acquire(1.0)); // empty

    // Wait 15ms → ~15 tokens earned at 1000/sec, capped at capacity (3)
    thread::sleep(Duration::from_millis(15));

    let mut refilled = 0;
    for _ in 0..5 {
        if bucket.try_acquire(1.0) { refilled += 1; }
    }
    refilled
}

// --- Approach 3: Rate-limited batch processing ---
fn rate_limited_processing(items: Vec<i32>, rps: f64) -> Vec<i32> {
    let bucket = TokenBucket::new(rps, rps); // capacity = rps allows up to one second's burst
    items.into_iter().map(|item| {
        bucket.acquire(1.0); // wait for token
        item * 2
    }).collect()
}

fn main() {
    let (allowed, denied) = burst_then_deny();
    println!("burst: {} allowed, {} denied", allowed, denied);

    let refilled = refill_over_time();
    println!("refilled: {} tokens", refilled);

    let results = rate_limited_processing(vec![1, 2, 3], 1000.0);
    println!("rate-limited results: {:?}", results);
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_burst_allows_up_to_capacity() {
        let (allowed, denied) = burst_then_deny();
        assert_eq!(allowed, 5);
        assert_eq!(denied, 5);
    }

    #[test]
    fn test_tokens_refill() {
        let refilled = refill_over_time();
        assert!(refilled >= 3, "expected at least 3 tokens refilled, got {}", refilled);
    }

    #[test]
    fn test_try_acquire_returns_false_when_empty() {
        let bucket = TokenBucket::new(2.0, 0.001); // very slow refill
        bucket.try_acquire(1.0);
        bucket.try_acquire(1.0);
        assert!(!bucket.try_acquire(1.0));
    }

    #[test]
    fn test_available_tokens_starts_at_capacity() {
        let bucket = TokenBucket::new(10.0, 1.0);
        let tokens = bucket.available_tokens();
        assert!((tokens - 10.0).abs() < 0.1, "expected ~10, got {}", tokens);
    }

    #[test]
    fn test_cost_greater_than_one() {
        let bucket = TokenBucket::new(10.0, 1.0);
        // Acquire 5 tokens at once (one heavy request)
        assert!(bucket.try_acquire(5.0));
        // Now only 5 left — can't take 6
        assert!(!bucket.try_acquire(6.0));
        // Can take 5
        assert!(bucket.try_acquire(5.0));
    }

    #[test]
    fn test_rate_limited_processing() {
        let results = rate_limited_processing(vec![1, 2, 3], 1000.0);
        assert_eq!(results, vec![2, 4, 6]);
    }
}
(* 999: Rate Limiter — Token Bucket *)
(* Refill tokens at a fixed rate; consume one per request *)

type token_bucket = {
  mutable tokens: float;
  capacity: float;
  refill_rate: float;  (* tokens per second *)
  mutable last_refill: float;
  m: Mutex.t;
}

let make_bucket ?(capacity=10.0) ?(refill_rate=5.0) () = {
  tokens = capacity;
  capacity;
  refill_rate;
  last_refill = Unix.gettimeofday ();
  m = Mutex.create ();
}

let refill bucket =
  let now = Unix.gettimeofday () in
  let elapsed = now -. bucket.last_refill in
  let new_tokens = elapsed *. bucket.refill_rate in
  bucket.tokens <- Float.min bucket.capacity (bucket.tokens +. new_tokens);
  bucket.last_refill <- now

let try_acquire ?(cost=1.0) bucket =
  Mutex.lock bucket.m;
  refill bucket;
  let allowed = bucket.tokens >= cost in
  if allowed then bucket.tokens <- bucket.tokens -. cost;
  Mutex.unlock bucket.m;
  allowed

let acquire ?(cost=1.0) bucket =
  let rec wait () =
    if try_acquire ~cost bucket then ()
    else (Unix.sleepf 0.001; wait ())
  in
  wait ()

(* --- Approach 1: Burst then throttle --- *)

let () =
  let bucket = make_bucket ~capacity:5.0 ~refill_rate:100.0 () in
  let allowed = ref 0 in
  let denied = ref 0 in

  (* Try 10 requests immediately — only 5 should pass (capacity=5) *)
  for _ = 1 to 10 do
    if try_acquire bucket then incr allowed
    else incr denied
  done;

  assert (!allowed = 5);
  assert (!denied = 5);
  Printf.printf "Approach 1 (burst): %d allowed, %d denied\n" !allowed !denied

(* --- Approach 2: Refill over time --- *)

let () =
  let bucket = make_bucket ~capacity:3.0 ~refill_rate:1000.0 () in
  (* Drain it *)
  for _ = 1 to 3 do assert (try_acquire bucket) done;
  assert (not (try_acquire bucket));  (* empty *)

  (* Wait for refill *)
  Unix.sleepf 0.01;  (* 10ms * 1000 tokens/s = 10 tokens *)

  let allowed = ref 0 in
  for _ = 1 to 5 do
    if try_acquire bucket then incr allowed
  done;
  assert (!allowed >= 3);  (* at least 3 tokens refilled *)
  Printf.printf "Approach 2 (refill): %d tokens refilled\n" !allowed

let () = Printf.printf "✓ All tests passed\n"

📊 Detailed Comparison

Rate Limiter — Token Bucket — Comparison

Core Insight

The token bucket algorithm is continuous-time rate limiting: tokens accumulate at a fixed rate up to capacity, and each request costs tokens. This allows bursts (up to capacity) while enforcing average rate.
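The refill arithmetic at the heart of this insight can be isolated as a pure function (a minimal sketch; `refilled` is a standalone helper introduced here for illustration, not a method from the implementations above):

```rust
// Pure refill step: given the current token count, seconds elapsed since
// the last refill, the refill rate, and the bucket capacity, compute the
// new token count. Tokens accrue continuously; the cap preserves the
// burst limit no matter how long the bucket sits idle.
fn refilled(tokens: f64, elapsed_secs: f64, rate: f64, capacity: f64) -> f64 {
    (tokens + elapsed_secs * rate).min(capacity)
}

fn main() {
    // Empty bucket, 2 tokens/sec, capacity 5: after 1.5s we have 3 tokens.
    assert_eq!(refilled(0.0, 1.5, 2.0, 5.0), 3.0);
    // A long idle period still caps at capacity — bursts stay bounded.
    assert_eq!(refilled(0.0, 100.0, 2.0, 5.0), 5.0);
    println!("ok");
}
```

Keeping this step pure (time passed in as a value, not read inside) is what makes the algorithm easy to test without sleeping.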

OCaml Approach

  • `Unix.gettimeofday ()` returns a `float` (seconds since epoch)
  • `elapsed = now - last_refill` gives time delta
  • `new_tokens = elapsed * refill_rate` — continuous refill
  • `Float.min capacity (tokens + new_tokens)` — cap at capacity
  • `Mutex` for thread safety; `try_acquire` is non-blocking

Rust Approach

  • `Instant::now()` — monotonic clock, immune to system time changes
  • `.elapsed().as_secs_f64()` for fractional seconds
  • `Mutex<BucketState>` wraps mutable state (tokens + last_refill)
  • `acquire` spins with `thread::sleep(1ms)` when empty
  • `try_acquire(cost)` for variable-cost requests (e.g., large queries cost more)
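The sleep-spin in `acquire` is the simplest blocking strategy. An alternative (a hedged sketch, not part of the code above — `CondBucket` and its fields are illustrative names) parks on a `Condvar` with a timeout sized to when enough tokens should have accrued, avoiding the fixed 1ms polling interval:

```rust
use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};

// Sketch: block on a Condvar timeout instead of sleep-spinning.
struct CondBucket {
    state: Mutex<(f64, Instant)>, // (tokens, last_refill)
    cv: Condvar,
    capacity: f64,
    rate: f64, // tokens per second
}

impl CondBucket {
    fn new(capacity: f64, rate: f64) -> Self {
        CondBucket {
            state: Mutex::new((capacity, Instant::now())),
            cv: Condvar::new(),
            capacity,
            rate,
        }
    }

    fn acquire(&self, cost: f64) {
        let mut st = self.state.lock().unwrap();
        loop {
            // Same continuous-refill step as the main implementation.
            let elapsed = st.1.elapsed().as_secs_f64();
            st.0 = (st.0 + elapsed * self.rate).min(self.capacity);
            st.1 = Instant::now();
            if st.0 >= cost {
                st.0 -= cost;
                return;
            }
            // Park until roughly enough tokens should have accrued, then
            // recheck (the loop also absorbs spurious wakeups).
            let wait = Duration::from_secs_f64((cost - st.0) / self.rate);
            let (guard, _timed_out) = self.cv.wait_timeout(st, wait).unwrap();
            st = guard;
        }
    }
}

fn main() {
    let bucket = CondBucket::new(2.0, 1000.0);
    bucket.acquire(1.0);
    bucket.acquire(1.0); // bucket now empty
    let t = Instant::now();
    bucket.acquire(1.0); // waits roughly 1ms for a token to accrue
    assert!(t.elapsed() >= Duration::from_micros(500));
    println!("ok");
}
```

Since no thread ever signals the condvar here, this is really a precise timed sleep under the lock; a condvar earns its keep once other operations (e.g. returning unused tokens) can `notify` waiters early.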

Comparison Table

| Concept | OCaml | Rust |
| --- | --- | --- |
| Time primitive | `Unix.gettimeofday ()` (wall clock) | `Instant::now()` (monotonic) |
| Elapsed time | `now -. last_refill` (float secs) | `.elapsed().as_secs_f64()` |
| Refill formula | `Float.min capacity (tokens +. dt *. rate)` | `(tokens + dt * rate).min(capacity)` |
| Non-blocking check | `try_acquire` | `try_acquire(cost) -> bool` |
| Blocking acquire | Spin with `sleepf 0.001` | Spin with `thread::sleep(1ms)` |
| Variable cost | `~cost` parameter | `cost: f64` parameter |
| Thread safety | `Mutex.t` + explicit lock/unlock | `Mutex<BucketState>` RAII |

std vs tokio

| Aspect | std version | tokio version |
| --- | --- | --- |
| Runtime | OS threads via `std::thread` | Async tasks on the tokio runtime |
| Synchronization | `std::sync::Mutex`, `Condvar` | `tokio::sync::Mutex`, channels |
| Channels | `std::sync::mpsc` (unbounded) | `tokio::sync::mpsc` (bounded, async) |
| Blocking | Thread blocks on lock/recv | Task yields; runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |