461: Producer-Consumer Pattern
Difficulty: 3 · Level: Intermediate

Decouple data production from consumption using a bounded channel with automatic backpressure.

The Problem This Solves
In many real systems (log ingestion, image processing pipelines, network packet handlers), data arrives faster than it can be processed. Without coordination, producers flood memory with unprocessed work until the system crashes or OOMs. You need a way for producers to slow down automatically when consumers fall behind.

The naive fix is an unbounded queue. It "works" until your server's RAM fills up under load spikes. What you really want is backpressure: the producer blocks until there's room, naturally throttling the overall system to the consumer's speed.

Beyond throttling, you also need clean shutdown. A producer that finishes its work needs to signal "no more items" without a sentinel value that every consumer must check for. In Rust, dropping all senders is that signal: the channel itself becomes the lifecycle manager.

The Intuition
A bounded `sync_channel(N)` is a concurrent, backpressure-aware queue: when it's full, `send` blocks the producer; when it's empty, `recv` blocks the consumer; and when all senders drop, `recv` returns `Err`, giving clean shutdown with no extra code. The core trade-off is throughput vs. memory: a smaller buffer means less memory but more blocking; a larger buffer means smoother throughput but more latency under load.

How It Works in Rust
```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    // Bounded channel: at most 4 items in flight at once
    let (tx, rx) = mpsc::sync_channel::<i32>(4);

    // Producer thread: blocks on send when the buffer is full
    let producer = thread::spawn(move || {
        for i in 0..20 {
            tx.send(i).unwrap(); // blocks if channel is full
        }
        // tx drops here, the channel closes, and consumers see Err
    });

    // Consumer thread: blocks on recv when the buffer is empty
    let consumer = thread::spawn(move || {
        for item in rx { // iterator ends when all senders drop
            println!("consumed {}", item);
        }
    });

    producer.join().unwrap();
    consumer.join().unwrap();
}
```
For multiple consumers, wrap `rx` in `Arc<Mutex<Receiver>>` so threads compete for items.
What This Unlocks
- Log/event pipelines: collectors produce at burst speed; processors consume at sustained speed, and the bounded channel absorbs the spikes.
- Work-stealing thread pools: distribute tasks to N workers without a dispatcher loop.
- Rate-limited writes: a disk/network writer consumes at its max speed; the producer auto-throttles.
Key Differences
| Concept | OCaml | Rust |
|---|---|---|
| Bounded queue | `Queue.t` + `Mutex` + `Condition` | `mpsc::sync_channel(cap)` |
| Backpressure | Manual capacity check + wait | `SyncSender::send` blocks automatically |
| Multi-consumer | `Arc<Mutex<Queue>>` | `Arc<Mutex<Receiver>>` |
| Shutdown signal | Sentinel `None` value | Drop all `Sender`s → `recv` returns `Err` |
| Channel type | User-built | `std::sync::mpsc` (stdlib) |
```rust
// 461. Producer-consumer pattern
use std::sync::{Arc, Mutex, mpsc};
use std::thread;

fn main() {
    let (tx, rx) = mpsc::sync_channel::<u32>(5); // bounded = backpressure
    let rx = Arc::new(Mutex::new(rx)); // consumers share the one Receiver

    let producers: Vec<_> = (0..3).map(|id| {
        let tx = tx.clone();
        thread::spawn(move || {
            for i in 0..8u32 { tx.send(id * 100 + i).unwrap(); }
            println!("p{} done", id);
        })
    }).collect();
    drop(tx); // main's clone must go, or recv never returns Err

    let consumers: Vec<_> = (0..2).map(|id| {
        let rx = Arc::clone(&rx);
        thread::spawn(move || {
            let mut n = 0;
            loop {
                // Statement-scoped lock: the guard drops before we handle msg,
                // so the other consumer isn't blocked while we print.
                let msg = rx.lock().unwrap().recv();
                match msg {
                    Ok(v) => { println!("c{} got {}", id, v); n += 1; }
                    Err(_) => break, // all senders dropped
                }
            }
            n
        })
    }).collect();

    for p in producers { p.join().unwrap(); }
    let total: u32 = consumers.into_iter().map(|c| c.join().unwrap()).sum();
    println!("total consumed: {} (expected 24)", total);
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_all_consumed() {
        let (tx, rx) = mpsc::sync_channel::<u32>(4);
        let rx = Arc::new(Mutex::new(rx));
        let ps: Vec<_> = (0..2).map(|id| {
            let tx = tx.clone();
            thread::spawn(move || for i in 0..5u32 { tx.send(id * 10 + i).unwrap(); })
        }).collect();
        drop(tx);
        let c = thread::spawn(move || rx.lock().unwrap().iter().count());
        for p in ps { p.join().unwrap(); }
        assert_eq!(c.join().unwrap(), 10);
    }
}
```
```ocaml
(* 461. Producer-consumer: OCaml builds the channel by hand *)
let q = Queue.create ()
let m = Mutex.create ()
let ne = Condition.create () (* "not empty": signaled after a push *)
let nf = Condition.create () (* "not full": signaled after a pop *)
let cap = 5
let done_ = ref false

let produce v =
  Mutex.lock m;
  while Queue.length q >= cap do Condition.wait nf m done;
  Queue.push v q; Condition.signal ne; Mutex.unlock m

let consume () =
  Mutex.lock m;
  while Queue.is_empty q && not !done_ do Condition.wait ne m done;
  let r =
    if Queue.is_empty q then None
    else (let v = Queue.pop q in Condition.signal nf; Some v)
  in
  Mutex.unlock m; r

let () =
  let p = Thread.create (fun () ->
    for i = 1 to 10 do produce i done;
    (* sentinel-style shutdown: set the flag, wake every waiter *)
    Mutex.lock m; done_ := true; Condition.broadcast ne; Mutex.unlock m) ()
  in
  let rec loop () =
    match consume () with
    | None -> ()
    | Some v -> Printf.printf "got %d\n%!" v; loop ()
  in
  Thread.create loop () |> Thread.join;
  Thread.join p
```