# 341: Buffered Concurrent Stream Processing
Difficulty: 4 | Level: Expert

Limit concurrent work with a semaphore: process up to N items at once without overwhelming downstream resources.

## The Problem This Solves
When processing a large stream of items (API calls, database writes, file transforms), spawning an unbounded number of threads will exhaust memory, saturate connections, and cause cascading failures. You need a bounded pipeline that keeps N workers busy at all times and blocks new work from starting until a slot opens. This is the sliding-window concurrency pattern, and it appears everywhere: web scrapers that must respect rate limits, batch upload clients, parallel test runners, and data pipelines with expensive per-item work. The naive solution, collecting all tasks first and then joining, either starves workers waiting on the slow ones or blows up memory. A semaphore solves this: it acts as a counter of available slots, automatically refilling as each task completes.

## The Intuition
Think of a bank with exactly N teller windows. Customers (tasks) queue up, and each one waits until a window opens. The moment a teller finishes, the next customer steps forward. You don't need to know how long each transaction takes; the semaphore enforces the limit automatically. In this implementation, the `Semaphore` is built from a `Mutex<usize>` plus a `Condvar`: acquiring decrements the counter (blocking while it is zero), and releasing increments it and wakes one waiter.

## How It Works in Rust
1. Semaphore: a `Mutex<usize>` holds the permit count, a `Condvar` parks threads waiting for a permit.
2. Spawn all tasks: iterate over the input with `.enumerate()`, capturing `(index, item)` per thread.
3. Each thread acquires before calling `f(item)` and releases immediately after, not at drop.
4. Collect indexed results into an `Arc<Mutex<Vec<(usize, U)>>>`.
5. Restore order: sort by index after joining all handles.

The semaphore is shared via `Arc::clone`, the closure via `Arc<F>`, and the results vec is also `Arc`-shared. No async runtime is needed; this is pure `std::thread`.

```rust
sem.acquire(); // blocks if N tasks already running
let result = f(item);
sem.release(); // opens a slot for the next waiter
results.lock().unwrap().push((i, result));
```
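Step 3 releases the permit manually rather than at drop, which means a panic inside `f(item)` would leak the slot. A minimal sketch of the RAII alternative: `acquire` hands back a hypothetical `Permit` guard (not part of the code below) whose `Drop` impl returns the slot, so release happens even on unwind.

```rust
use std::sync::{Arc, Condvar, Mutex};

// Same counting semaphore as the main example, plus an RAII permit.
struct Semaphore { count: Mutex<usize>, cond: Condvar }

// Hypothetical guard type: holding one means holding a slot.
struct Permit { sem: Arc<Semaphore> }

impl Semaphore {
    fn new(n: usize) -> Arc<Self> {
        Arc::new(Self { count: Mutex::new(n), cond: Condvar::new() })
    }

    // Blocks until a slot is free, then returns a guard that owns it.
    fn acquire(sem: &Arc<Semaphore>) -> Permit {
        let mut c = sem.count.lock().unwrap();
        while *c == 0 { c = sem.cond.wait(c).unwrap(); }
        *c -= 1;
        Permit { sem: Arc::clone(sem) }
    }
}

impl Drop for Permit {
    // Runs on normal scope exit *and* on panic unwind, so the slot
    // cannot leak the way a forgotten manual release() can.
    fn drop(&mut self) {
        *self.sem.count.lock().unwrap() += 1;
        self.sem.cond.notify_one();
    }
}

fn main() {
    let sem = Semaphore::new(1);
    {
        let _permit = Semaphore::acquire(&sem); // slot held until end of scope
    } // _permit dropped here: slot released
    let _again = Semaphore::acquire(&sem); // would deadlock if the permit had leaked
    println!("permit recycled");
}
```

Tokio's `Semaphore` uses exactly this guard pattern: `acquire` returns a `SemaphorePermit` that releases on drop.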
## What This Unlocks
- Backpressure without async: rate-limit concurrent work using only `std` primitives.
- Ordered output from unordered execution: index-tagged results survive any completion order.
- Drop-in parallel map: `buffered_map(items, 4, heavy_fn)` replaces a sequential loop with bounded concurrency.
## Key Differences
| Concept | OCaml | Rust |
|---|---|---|
| Concurrency cap | `Semaphore` module / manual counter | `Mutex<usize>` + `Condvar` |
| Result collection | `Array.of_seq` with futures | `Arc<Mutex<Vec<(usize, U)>>>` |
| Ordered output | Future array preserves order | Sort by captured index after join |
| Backpressure | Bounded channel / `Lwt_pool` | Semaphore blocks the spawning thread |
## Versions
| Directory | Description |
|---|---|
| `std/` | Standard library version using `std::sync`, `std::thread` |
| `tokio/` | Tokio async runtime version using `tokio::sync`, `tokio::spawn` |
## Running
```sh
# Standard library version
cd std && cargo test

# Tokio version
cd tokio && cargo test
```
```rust
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
use std::time::Duration;

/// Counting semaphore built from a mutex-guarded counter and a condvar.
struct Semaphore { count: Mutex<usize>, cond: Condvar }

impl Semaphore {
    fn new(n: usize) -> Arc<Self> {
        Arc::new(Self { count: Mutex::new(n), cond: Condvar::new() })
    }

    fn acquire(&self) {
        let mut c = self.count.lock().unwrap();
        while *c == 0 { c = self.cond.wait(c).unwrap(); }
        *c -= 1;
    }

    fn release(&self) {
        *self.count.lock().unwrap() += 1;
        self.cond.notify_one();
    }
}

/// Applies `f` to every item with at most `concurrency` invocations running
/// at once, returning results in input order.
fn buffered_map<T, U, F>(items: Vec<T>, concurrency: usize, f: F) -> Vec<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + Sync + 'static,
{
    let sem = Semaphore::new(concurrency);
    let f = Arc::new(f);
    let results: Arc<Mutex<Vec<(usize, U)>>> = Arc::new(Mutex::new(Vec::new()));
    let handles: Vec<_> = items
        .into_iter()
        .enumerate()
        .map(|(i, item)| {
            let sem = Arc::clone(&sem);
            let f = Arc::clone(&f);
            let results = Arc::clone(&results);
            thread::spawn(move || {
                sem.acquire();
                let result = f(item);
                sem.release();
                results.lock().unwrap().push((i, result));
            })
        })
        .collect();
    for h in handles { h.join().unwrap(); }
    let mut res = results.lock().unwrap().drain(..).collect::<Vec<_>>();
    res.sort_by_key(|(i, _)| *i);
    res.into_iter().map(|(_, v)| v).collect()
}

fn main() {
    let items: Vec<u64> = (1..=10).collect();
    let results = buffered_map(items, 3, |x| {
        thread::sleep(Duration::from_millis((11 - x) * 3));
        x * x
    });
    println!("Results: {results:?}");
    println!("Sum of squares 1-10: {}", results.iter().sum::<u64>());
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn buffered_map_all_results() {
        let r = buffered_map(vec![1u64, 2, 3, 4, 5], 2, |x| x * 2);
        assert_eq!(r, vec![2, 4, 6, 8, 10]);
    }

    #[test]
    fn concurrency_1_sequential() {
        let r = buffered_map(vec![1, 2, 3], 1, |x: i32| x + 10);
        assert_eq!(r, vec![11, 12, 13]);
    }
}
```
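The tests above check results, not the concurrency cap itself. A self-contained sketch of how the cap could be verified: an `AtomicUsize` counts tasks currently inside the critical region, and `fetch_max` records the peak, which can never exceed the semaphore's limit (the `Semaphore` is repeated here so the file compiles on its own; the helper name `max_observed_concurrency` is ours, not part of the exercise).

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// Same counting semaphore as in the exercise, repeated for independence.
struct Semaphore { count: Mutex<usize>, cond: Condvar }

impl Semaphore {
    fn new(n: usize) -> Arc<Self> {
        Arc::new(Self { count: Mutex::new(n), cond: Condvar::new() })
    }
    fn acquire(&self) {
        let mut c = self.count.lock().unwrap();
        while *c == 0 { c = self.cond.wait(c).unwrap(); }
        *c -= 1;
    }
    fn release(&self) {
        *self.count.lock().unwrap() += 1;
        self.cond.notify_one();
    }
}

/// Runs 16 sleepy tasks under `cap` permits and reports the highest number
/// of tasks ever observed running at the same time.
fn max_observed_concurrency(cap: usize) -> usize {
    let sem = Semaphore::new(cap);
    let running = Arc::new(AtomicUsize::new(0)); // tasks inside the region now
    let peak = Arc::new(AtomicUsize::new(0));    // highest value `running` reached
    let handles: Vec<_> = (0..16)
        .map(|_| {
            let (sem, running, peak) =
                (Arc::clone(&sem), Arc::clone(&running), Arc::clone(&peak));
            thread::spawn(move || {
                sem.acquire();
                // increment happens only after acquire, decrement before
                // release, so `running` can never exceed the permit count
                let now = running.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(10));
                running.fetch_sub(1, Ordering::SeqCst);
                sem.release();
            })
        })
        .collect();
    for h in handles { h.join().unwrap(); }
    peak.load(Ordering::SeqCst)
}

fn main() {
    let peak = max_observed_concurrency(3);
    println!("peak concurrency: {peak}");
    assert!(peak <= 3);
}
```

The invariant holds because the counter is only incremented while a permit is held, so the observed peak is bounded by the cap regardless of scheduling.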
```ocaml
(* OCaml: buffered concurrent processing.
   Uses the stdlib's Semaphore.Counting (OCaml >= 4.12) with threads. *)
let process_with_limit n items f =
  let sem = Semaphore.Counting.make n in
  let results = Array.make (List.length items) None in
  let threads = List.mapi (fun i item ->
    Thread.create (fun () ->
      Semaphore.Counting.acquire sem;
      let r = f item in
      results.(i) <- Some r;
      Semaphore.Counting.release sem
    ) ()
  ) items in
  List.iter Thread.join threads;
  Array.to_list results |> List.filter_map Fun.id

(* Simpler version using a chunked thread-pool concept.
   Thread.join returns unit, so each thread writes its result into a
   ref cell that is read back after the join. *)
let chunked_process chunk_size items f =
  let chunks =
    let rec go acc = function
      | [] -> List.rev acc
      | lst ->
        let chunk = List.filteri (fun i _ -> i < chunk_size) lst in
        let rest = List.filteri (fun i _ -> i >= chunk_size) lst in
        go (chunk :: acc) rest
    in go [] items
  in
  List.concat_map (fun chunk ->
    let cells = List.map (fun x ->
      let cell = ref None in
      let t = Thread.create (fun () -> cell := Some (f x)) () in
      (t, cell)
    ) chunk in
    List.filter_map (fun (t, cell) -> Thread.join t; !cell) cells
  ) chunks

let () =
  let items = List.init 10 (fun i -> i + 1) in
  let results = chunked_process 3 items (fun x ->
    Thread.delay (float_of_int (11 - x) *. 0.005);
    x * x
  ) in
  List.iter (Printf.printf "%d ") results;
  print_newline ()
```
## Detailed Comparison

### 920-buffered-stream: Language Comparison

#### std vs tokio
| Aspect | std version | tokio version |
|---|---|---|
| Runtime | OS threads via `std::thread` | Async tasks on tokio runtime |
| Synchronization | `std::sync::Mutex`, `Condvar` | `tokio::sync::Mutex`, channels |
| Channels | `std::sync::mpsc` (unbounded) | `tokio::sync::mpsc` (bounded, async) |
| Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |