463: Fan-Out / Fan-In
Difficulty: 3 · Level: Intermediate

Distribute work to N parallel workers (fan-out) and collect their results into one channel (fan-in): the map-reduce pattern.

The Problem This Solves
You have a list of slow operations: HTTP requests, database lookups, image resizes. Running them sequentially is wasteful: your CPU and network sit idle most of the time. You want to run them concurrently, but with a bounded number of workers, not one thread per item. The temptation is a `thread::spawn` for every item. That works for 100 items; it crashes or thrashes for 100,000.

You need a worker pool: fixed N threads, each pulling items from a shared queue, with results flowing into a single collector. This is fan-out (one task stream → N workers) plus fan-in (N result streams → one receiver). The subtle part is fan-in, and `mpsc` is built for it: each worker gets a clone of the same `Sender`, so they all funnel results into one `Receiver`. When all workers finish (all `Sender` clones drop), the receiver's loop ends cleanly.

The Intuition
Fan-out shares work by giving N workers access to the same work queue; fan-in collects results by giving all workers clones of one `Sender`. Because `mpsc` is multi-producer by design, fan-in is free. The trade-off: more workers means more parallelism but also more scheduling overhead, so tune N to your bottleneck (CPU-bound → N = core count; I/O-bound → N can be much larger).

How It Works in Rust
```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

fn main() {
    let items: Vec<i32> = (0..20).collect();
    // Fan-out: wrap the work queue in Arc<Mutex<..>> so workers compete for items
    let work = Arc::new(Mutex::new(items.into_iter()));
    // Fan-in: a single receiver collects all results
    let (tx, rx) = mpsc::channel::<i32>();
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let work = Arc::clone(&work);
            let tx = tx.clone(); // each worker gets its own Sender clone
            thread::spawn(move || loop {
                let item = work.lock().unwrap().next(); // guard drops at end of statement
                match item {
                    Some(n) => tx.send(n * n).unwrap(), // process and send result
                    None => break, // no more work, exit
                }
            })
            // tx drops when the worker exits: one fewer sender
        })
        .collect();
    drop(tx); // drop the original sender so rx's iterator ends when all workers finish
    // Collect all results; iteration stops once every Sender is gone
    let results: Vec<i32> = rx.iter().collect();
    for h in handles {
        h.join().unwrap();
    }
    assert_eq!(results.len(), 20);
}
```
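The walkthrough above hard-codes four workers. For CPU-bound work the standard library can report a sensible default; a minimal sketch (the 4x multiplier for I/O-bound work is an illustrative assumption, not a rule):

```rust
use std::thread;

fn main() {
    // available_parallelism() reports how many threads the OS can run
    // concurrently: a reasonable default N for CPU-bound fan-out.
    let n_cpu = thread::available_parallelism().map(|n| n.get()).unwrap_or(4);
    // I/O-bound workers spend most of their time blocked, so N can be
    // larger; the factor here is purely illustrative.
    let n_io = n_cpu * 4;
    println!("cpu-bound workers: {n_cpu}, i/o-bound workers: {n_io}");
}
```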
What This Unlocks
- Parallel HTTP requests: N workers each fetching a URL, results merged into one stream.
- MapReduce: fan-out maps over partitions; fan-in collects partial results for a final reduce.
- Bounded thread pools: process M items with N workers without spawning M threads.
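A common variant of the bounded-pool idea uses a channel as the work queue itself instead of an iterator. `mpsc::Receiver` is single-consumer, so the workers must share it behind `Arc<Mutex<..>>`; a sketch under that assumption:

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

fn main() {
    // The work queue is itself a channel; workers take turns receiving.
    let (work_tx, work_rx) = mpsc::channel::<i32>();
    let work_rx = Arc::new(Mutex::new(work_rx));
    let (res_tx, res_rx) = mpsc::channel::<i32>();

    let handles: Vec<_> = (0..4)
        .map(|_| {
            let rx = Arc::clone(&work_rx);
            let tx = res_tx.clone();
            thread::spawn(move || loop {
                // The guard drops at the end of the statement, before
                // the item is processed, so workers overlap on f(x).
                let item = rx.lock().unwrap().recv();
                match item {
                    Ok(n) => tx.send(n + 1).unwrap(),
                    Err(_) => break, // work_tx dropped and queue drained
                }
            })
        })
        .collect();

    for i in 0..10 {
        work_tx.send(i).unwrap();
    }
    drop(work_tx); // signal "no more work"
    drop(res_tx);  // keep only the workers' Sender clones alive

    let mut out: Vec<i32> = res_rx.iter().collect();
    for h in handles {
        h.join().unwrap();
    }
    out.sort();
    println!("{out:?}"); // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
}
```

The design choice: a channel queue lets the producer keep feeding work while workers run, whereas the `Arc<Mutex<Iterator>>` version requires all items up front.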
Key Differences
| Concept | OCaml | Rust |
|---|---|---|
| Fan-out distribution | Shared `Queue.t` + N threads | `Arc<Mutex<Iterator>>` → workers compete |
| Fan-in collection | N threads → shared result queue | N `Sender` clones → one `Receiver` |
| Result ordering | Manual index tracking | `(id, result)` tuples |
| Worker count | User-managed | `(0..N).map(...)` |
| Shutdown detection | Sentinel value | All `Sender` clones drop → `recv` returns `Err` |
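The `(id, result)` row in the table deserves a concrete sketch: tagging each item with its index lets the collector restore input order even though workers finish in arbitrary order (the names here are illustrative):

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

fn main() {
    // enumerate() attaches each item's original index before fan-out.
    let items = vec!["a", "bb", "ccc", "dddd"];
    let n = items.len();
    let work = Arc::new(Mutex::new(items.into_iter().enumerate()));
    let (tx, rx) = mpsc::channel::<(usize, usize)>();

    let handles: Vec<_> = (0..2)
        .map(|_| {
            let work = Arc::clone(&work);
            let tx = tx.clone();
            thread::spawn(move || loop {
                let next = work.lock().unwrap().next();
                match next {
                    Some((id, s)) => tx.send((id, s.len())).unwrap(),
                    None => break,
                }
            })
        })
        .collect();
    drop(tx);

    // Fan-in: place each result back at its original index.
    let mut results = vec![0usize; n];
    for (id, len) in rx {
        results[id] = len;
    }
    for h in handles {
        h.join().unwrap();
    }
    println!("{results:?}"); // [1, 2, 3, 4]
}
```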
```rust
// 463. Fan-out / fan-in
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

fn fan_map<T, U, F>(items: Vec<T>, n: usize, f: F) -> Vec<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + Sync + 'static,
{
    let work = Arc::new(Mutex::new(items.into_iter()));
    let f = Arc::new(f);
    let (tx, rx) = mpsc::channel::<U>();
    let workers: Vec<_> = (0..n)
        .map(|_| {
            let (work, f, tx) = (Arc::clone(&work), Arc::clone(&f), tx.clone());
            thread::spawn(move || loop {
                let item = work.lock().unwrap().next();
                match item {
                    Some(x) => { let _ = tx.send(f(x)); }
                    None => break,
                }
            })
        })
        .collect();
    drop(tx); // only the workers' Sender clones remain
    for w in workers {
        w.join().unwrap();
    }
    rx.iter().collect() // channel is unbounded, so draining after join is safe
}

fn main() {
    let mut r = fan_map((1..=12u64).collect(), 4, |x| x * x);
    r.sort();
    println!("{:?}", r);

    // Fan-in demo: 4 producers → 1 channel
    let (tx, rx) = mpsc::channel::<String>();
    let producers: Vec<_> = (0..4)
        .map(|id| {
            let tx = tx.clone();
            thread::spawn(move || tx.send(format!("from-{}", id)).unwrap())
        })
        .collect();
    drop(tx);
    for p in producers {
        p.join().unwrap();
    }
    let mut msgs: Vec<String> = rx.iter().collect();
    msgs.sort();
    println!("{:?}", msgs);
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_fan_map() {
        let mut r = fan_map((1..=8u32).collect(), 4, |x| x * 2);
        r.sort();
        assert_eq!(r, vec![2, 4, 6, 8, 10, 12, 14, 16]);
    }

    #[test]
    fn test_all() {
        assert_eq!(fan_map((0..100u32).collect(), 8, |x| x).len(), 100);
    }
}
```
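The `'static` bounds on `fan_map` exist because `thread::spawn` may outlive the caller. Scoped threads (stable since Rust 1.63) remove that requirement: the scope guarantees all workers join before it returns, so workers can borrow the queue and the closure directly. A sketch of the same pattern under that approach:

```rust
use std::sync::{mpsc, Mutex};
use std::thread;

// Same fan-out/fan-in shape, but borrows replace Arc and the 'static bounds.
fn fan_map_scoped<T: Send, U: Send, F: Fn(T) -> U + Sync>(
    items: Vec<T>, n: usize, f: F,
) -> Vec<U> {
    let work = Mutex::new(items.into_iter());
    let (tx, rx) = mpsc::channel::<U>();
    thread::scope(|s| {
        for _ in 0..n {
            let tx = tx.clone();
            let (work, f) = (&work, &f); // workers borrow, no Arc needed
            s.spawn(move || loop {
                let item = work.lock().unwrap().next();
                match item {
                    Some(x) => tx.send(f(x)).unwrap(),
                    None => break,
                }
            });
        }
        drop(tx); // rx's iterator ends once every worker finishes
        rx.iter().collect()
        // the scope joins all workers before returning
    })
}

fn main() {
    let mut r = fan_map_scoped((1..=6u32).collect(), 3, |x| x + 10);
    r.sort();
    println!("{r:?}"); // [11, 12, 13, 14, 15, 16]
}
```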
```ocaml
(* 463. Fan-out / fan-in: OCaml *)
let fan_map n f items =
  let work_q = Queue.create () and result_q = Queue.create () in
  let work_m = Mutex.create () and result_m = Mutex.create () in
  List.iter (fun x -> Queue.push x work_q) items;
  let workers =
    Array.init n (fun _ ->
        Thread.create
          (fun () ->
            let go = ref true in
            while !go do
              Mutex.lock work_m;
              if Queue.is_empty work_q then begin
                Mutex.unlock work_m;
                go := false
              end
              else begin
                let x = Queue.pop work_q in
                Mutex.unlock work_m;
                (* compute outside the work lock so workers overlap *)
                let r = f x in
                Mutex.lock result_m;
                Queue.push r result_q;
                Mutex.unlock result_m
              end
            done)
          ())
  in
  Array.iter Thread.join workers;
  (* all workers have joined, so draining needs no locking *)
  let results = ref [] in
  while not (Queue.is_empty result_q) do
    results := Queue.pop result_q :: !results
  done;
  !results

let () =
  let r = fan_map 4 (fun x -> x * x) (List.init 12 (fun i -> i + 1)) in
  Printf.printf "%s\n"
    (String.concat " " (List.map string_of_int (List.sort compare r)))
```