🦀 Functional Rust

341: Buffered Concurrent Stream Processing

Difficulty: 4 | Level: Expert

Limit concurrent work with a semaphore: process up to N items at once without overwhelming downstream resources.

The Problem This Solves

When processing a large stream of items (API calls, database writes, file transforms), spawning an unbounded number of threads will exhaust memory, saturate connections, and cause cascading failures. You need a bounded pipeline that keeps N workers busy at all times and blocks new work from starting until a slot opens. This is the sliding-window concurrency pattern. It appears everywhere: web scrapers that must respect rate limits, batch upload clients, parallel test runners, and data pipelines with expensive per-item work. The naive solution, `collect`ing all tasks first and then joining, either starves workers waiting on the slow ones or blows up memory. A semaphore solves this: it acts as a counter of available slots, automatically refilling as each task completes.

The Intuition

Think of a bank with exactly N teller windows. Customers (tasks) queue up, and each one waits until a window opens. The moment a teller finishes, the next customer steps forward. You don't need to know how long each transaction takes; the semaphore enforces the limit automatically. In this implementation, the `Semaphore` pairs a `Mutex<usize>` with a `Condvar`: acquiring decrements the counter (blocking at zero); releasing increments it and wakes one waiter.

How It Works in Rust

1. Semaphore: a `Mutex<usize>` holds the permit count; a `Condvar` parks threads waiting for a permit.
2. Spawn all tasks: iterate over the input with `.enumerate()`, moving `(index, item)` into each thread. Every thread is spawned up front; the semaphore bounds how many run `f` at once, not how many threads exist.
3. Each thread acquires a permit before calling `f(item)` and releases it immediately after, rather than waiting for a guard to drop.
4. Collect indexed results into an `Arc<Mutex<Vec<(usize, U)>>>`.
5. Restore order: sort by index after joining all handles.

The semaphore is shared via `Arc::clone`, the closure via `Arc<F>`, and the results vector is also `Arc`-shared. No async runtime is needed; this is pure `std::thread`.

```rust
sem.acquire();       // blocks if N tasks are already running
let result = f(item);
sem.release();       // opens a slot for the next waiter
results.lock().unwrap().push((i, result));
```

What This Unlocks

Key Differences

| Concept | OCaml | Rust |
|---|---|---|
| Concurrency cap | `Semaphore` module / manual counter | `Mutex<usize>` + `Condvar` |
| Result collection | `Array.of_seq` with futures | `Arc<Mutex<Vec<(usize, U)>>>` |
| Ordered output | Future array preserves order | Sort by captured index after join |
| Backpressure | Bounded channel / `Lwt_pool` | Semaphore blocks each worker until a permit frees |

Versions

| Directory | Description |
|---|---|
| `std/` | Standard library version using `std::sync`, `std::thread` |
| `tokio/` | Tokio async runtime version using `tokio::sync`, `tokio::spawn` |

Running

```sh
# Standard library version
cd std && cargo test

# Tokio version
cd tokio && cargo test
```
```rust
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
use std::time::Duration;

/// Counting semaphore: `count` holds the free permits, `cond` parks waiters.
struct Semaphore { count: Mutex<usize>, cond: Condvar }

impl Semaphore {
    fn new(n: usize) -> Arc<Self> {
        Arc::new(Self { count: Mutex::new(n), cond: Condvar::new() })
    }
    fn acquire(&self) {
        let mut c = self.count.lock().unwrap();
        while *c == 0 { c = self.cond.wait(c).unwrap(); }
        *c -= 1;
    }
    fn release(&self) {
        *self.count.lock().unwrap() += 1;
        self.cond.notify_one();
    }
}

/// Map `f` over `items` with at most `concurrency` calls running at once.
/// Results are returned in input order.
fn buffered_map<T, U, F>(items: Vec<T>, concurrency: usize, f: F) -> Vec<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + Sync + 'static,
{
    let sem = Semaphore::new(concurrency);
    let f = Arc::new(f);
    let results: Arc<Mutex<Vec<(usize, U)>>> = Arc::new(Mutex::new(Vec::new()));

    let handles: Vec<_> = items
        .into_iter()
        .enumerate()
        .map(|(i, item)| {
            let sem = Arc::clone(&sem);
            let f = Arc::clone(&f);
            let results = Arc::clone(&results);
            thread::spawn(move || {
                sem.acquire();   // wait for a free slot
                let result = f(item);
                sem.release();   // free the slot before bookkeeping
                results.lock().unwrap().push((i, result));
            })
        })
        .collect();

    for h in handles { h.join().unwrap(); }

    // Results arrive in completion order; restore input order by index.
    let mut res = results.lock().unwrap().drain(..).collect::<Vec<_>>();
    res.sort_by_key(|(i, _)| *i);
    res.into_iter().map(|(_, v)| v).collect()
}

fn main() {
    let items: Vec<u64> = (1..=10).collect();
    let results = buffered_map(items, 3, |x| {
        thread::sleep(Duration::from_millis((11 - x) * 3));
        x * x
    });
    println!("Results: {results:?}");
    println!("Sum of squares 1-10: {}", results.iter().sum::<u64>());
}

#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn buffered_map_all_results() {
        let r = buffered_map(vec![1u64, 2, 3, 4, 5], 2, |x| x * 2);
        assert_eq!(r, vec![2, 4, 6, 8, 10]);
    }
    #[test]
    fn concurrency_1_sequential() {
        let r = buffered_map(vec![1, 2, 3], 1, |x: i32| x + 10);
        assert_eq!(r, vec![11, 12, 13]);
    }
}
```
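To convince yourself the cap is actually enforced, you can track the peak number of simultaneously running jobs with atomics. This is a standalone sketch, not part of the repo; `max_in_flight` and the counters are illustrative names.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// Same Mutex + Condvar semaphore as in the main implementation.
struct Semaphore { count: Mutex<usize>, cond: Condvar }

impl Semaphore {
    fn new(n: usize) -> Arc<Self> {
        Arc::new(Self { count: Mutex::new(n), cond: Condvar::new() })
    }
    fn acquire(&self) {
        let mut c = self.count.lock().unwrap();
        while *c == 0 { c = self.cond.wait(c).unwrap(); }
        *c -= 1;
    }
    fn release(&self) {
        *self.count.lock().unwrap() += 1;
        self.cond.notify_one();
    }
}

/// Run `jobs` sleepy tasks under a cap of `cap` permits and report the
/// highest number observed running at once.
fn max_in_flight(jobs: usize, cap: usize) -> usize {
    let sem = Semaphore::new(cap);
    let in_flight = Arc::new(AtomicUsize::new(0));
    let max_seen = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..jobs)
        .map(|_| {
            let sem = Arc::clone(&sem);
            let in_flight = Arc::clone(&in_flight);
            let max_seen = Arc::clone(&max_seen);
            thread::spawn(move || {
                sem.acquire();
                let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                max_seen.fetch_max(now, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(10)); // simulate work
                in_flight.fetch_sub(1, Ordering::SeqCst);
                sem.release();
            })
        })
        .collect();
    for h in handles { h.join().unwrap(); }
    max_seen.load(Ordering::SeqCst)
}

fn main() {
    let observed = max_in_flight(10, 3);
    assert!(observed <= 3);
    println!("max in flight: {observed}");
}
```

The assertion can only check the upper bound: the peak may briefly read lower than the cap if threads are scheduled unevenly, but it can never exceed it.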
```ocaml
(* OCaml: buffered concurrent processing *)

let process_with_limit n items f =
  let sem = Semaphore.Counting.make n in
  let results = Array.make (List.length items) None in
  let threads = List.mapi (fun i item ->
    Thread.create (fun () ->
      Semaphore.Counting.acquire sem;
      let r = f item in
      results.(i) <- Some r;
      Semaphore.Counting.release sem
    ) ()
  ) items in
  List.iter Thread.join threads;
  Array.to_list results |> List.filter_map Fun.id

(* Simpler version: process items in fixed-size chunks, joining each
   chunk fully before starting the next. *)
let chunked_process chunk_size items f =
  let chunks =
    let rec go acc = function
      | [] -> List.rev acc
      | lst ->
        let chunk = List.filteri (fun i _ -> i < chunk_size) lst in
        let rest  = List.filteri (fun i _ -> i >= chunk_size) lst in
        go (chunk :: acc) rest
    in go [] items
  in
  List.concat_map (fun chunk ->
    (* Thread.join returns unit, so each thread writes its result into a
       dedicated slot instead. *)
    let slots = List.map (fun x -> (ref None, x)) chunk in
    let threads = List.map (fun (slot, x) ->
      Thread.create (fun () -> slot := Some (f x)) ()) slots in
    List.iter Thread.join threads;
    List.filter_map (fun (slot, _) -> !slot) slots
  ) chunks

let () =
  let items = List.init 10 (fun i -> i + 1) in
  let results = chunked_process 3 items (fun x ->
    Thread.delay (float_of_int (11 - x) *. 0.005);
    x * x
  ) in
  List.iter (Printf.printf "%d ") results;
  print_newline ()
```

📊 Detailed Comparison

920-buffered-stream: Language Comparison

std vs tokio

| Aspect | std version | tokio version |
|---|---|---|
| Runtime | OS threads via `std::thread` | Async tasks on the tokio runtime |
| Synchronization | `std::sync::Mutex`, `Condvar` | `tokio::sync::Mutex`, channels |
| Channels | `std::sync::mpsc` (unbounded) | `tokio::sync::mpsc` (bounded, async) |
| Blocking | Thread blocks on lock/recv | Task yields; runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |