🦀 Functional Rust

463: Fan-Out / Fan-In

Difficulty: 3 Level: Intermediate Distribute work to N parallel workers (fan-out) and collect their results into one channel (fan-in) — the map-reduce pattern.

The Problem This Solves

You have a list of slow operations: HTTP requests, database lookups, image resizes. Running them sequentially is wasteful — your CPU and network sit idle most of the time. You want to run them concurrently, but with a bounded number of workers, not one thread per item. The temptation is a `thread::spawn` for every item. That works for 100 items; it crashes or thrashes for 100,000. You need a worker pool: fixed N threads, each pulling items from a shared queue, with results flowing into a single collector. This is fan-out (one task stream → N workers) + fan-in (N result streams → one receiver). The subtle part is fan-in, and `mpsc` is naturally built for it. Each worker gets a clone of the same `Sender` — they all funnel results into one `Receiver`. When all workers finish (all `Sender` clones drop), the receiver's loop ends cleanly.

The Intuition

Fan-out shares work by giving N workers access to the same work queue; fan-in collects results by giving all workers clones of one `Sender` — `mpsc` is naturally multi-producer, so fan-in is free. The trade-off: more workers = more parallelism but more scheduling overhead; tune N to your bottleneck (CPU-bound → N=cores, I/O-bound → N can be much larger).

How It Works in Rust

use std::sync::{mpsc, Arc, Mutex};
use std::thread;

// Twenty work items to be squared by the pool.
let items: Vec<i32> = (0..20).collect();

// Fan-out: wrap the work queue in Arc<Mutex<..>> so the workers compete for items
let work = Arc::new(Mutex::new(items.into_iter()));

// Fan-in: single receiver collects all results
let (tx, rx) = mpsc::channel::<i32>();

// Spawn four workers; each one gets a handle to the shared queue and a Sender.
let handles: Vec<_> = (0..4).map(|_| {
 let work = Arc::clone(&work);
 let tx = tx.clone();          // each worker gets its own Sender clone
 thread::spawn(move || {
     loop {
         // The lock guard is a temporary of this statement, so the mutex is
         // released again before the item is processed.
         let item = work.lock().unwrap().next();
         match item {
             Some(n) => tx.send(n * n).unwrap(),   // process and send result
             None => break,                         // no more work, exit
         }
     }
     // tx drops here — one fewer sender
 })
}).collect();

drop(tx); // drop the original sender so rx ends when all workers finish

// Collect all results; the iterator ends once every Sender clone has been dropped
let results: Vec<i32> = rx.iter().collect();

// Wait for the workers and surface a panic from any of them.
for h in handles { h.join().unwrap(); }

What This Unlocks

Key Differences

| Concept | OCaml | Rust |
|---|---|---|
| Fan-out distribution | Shared `Queue.t` + N threads | `Arc<Mutex<Iterator>>` — workers compete |
| Fan-in collection | N threads → shared result queue | N `Sender` clones → one `Receiver` |
| Result ordering | Manual index tracking | `(id, result)` tuples |
| Worker count | User-managed | `(0..N).map(...)` |
| Shutdown detection | Sentinel value | All `Sender` clones drop → `recv` returns `Err` |
// 463. Fan-out / fan-in
use std::sync::{Arc, Mutex, mpsc};
use std::thread;

/// Applies `f` to every element of `items` on a pool of worker threads and
/// collects the results (fan-out over a shared queue, fan-in over one channel).
///
/// * `items` – the work to distribute; each element is handed to exactly one worker.
/// * `n` – requested number of worker threads. The pool is clamped to at least
///   one worker (so `n == 0` no longer silently discards every item) and to at
///   most `items.len()` (so no idle threads are spawned).
/// * `f` – the mapping function, shared by all workers.
///
/// Returns one result per input item, in the order the workers finished them,
/// which is generally *not* the input order. Sort the output or carry an index
/// through `f` if ordering matters.
///
/// # Panics
///
/// Panics if a worker thread panicked (for example because `f` panicked).
fn fan_map<T, U, F>(items: Vec<T>, n: usize, f: F) -> Vec<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + Sync + 'static,
{
    // At least one worker whenever there is work, never more workers than items.
    let workers = n.max(1).min(items.len());

    let work = Arc::new(Mutex::new(items.into_iter()));
    let f = Arc::new(f);
    let (tx, rx) = mpsc::channel::<U>();

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let (work, f, tx) = (Arc::clone(&work), Arc::clone(&f), tx.clone());
            thread::spawn(move || loop {
                // Keep this as its own statement: the guard is dropped at the
                // end of it, so the queue is unlocked while `f` runs. A
                // `while let` on the same expression would hold the lock for
                // the whole loop body and serialize the workers.
                let item = work.lock().unwrap().next();
                match item {
                    Some(x) => {
                        // A failed send means the receiver is gone; nobody is
                        // listening any more, so stop pulling work.
                        if tx.send(f(x)).is_err() {
                            break;
                        }
                    }
                    None => break,
                }
            })
        })
        .collect();

    // Drop the original sender so the receiver's iterator ends once every
    // worker has finished and dropped its clone.
    drop(tx);

    for handle in handles {
        handle.join().expect("fan_map worker thread panicked");
    }
    rx.iter().collect()
}

/// Demo entry point: squares 1..=12 through the worker pool, then shows plain
/// fan-in with four producer threads writing into one channel.
fn main() {
    // Fan-out / fan-in through `fan_map`; sort because completion order varies.
    let inputs: Vec<u64> = (1..=12).collect();
    let mut squares = fan_map(inputs, 4, |x| x * x);
    squares.sort();
    println!("{:?}", squares);

    // Fan-in demo: 4 producers → 1 channel
    let (sender, receiver) = mpsc::channel::<String>();
    let mut producers = Vec::new();
    for id in 0..4 {
        let sender = sender.clone();
        producers.push(thread::spawn(move || {
            sender.send(format!("from-{}", id)).unwrap()
        }));
    }
    // Only the producers' clones remain, so the receiver ends when they finish.
    drop(sender);
    for producer in producers {
        producer.join().unwrap();
    }

    let mut msgs: Vec<String> = receiver.iter().collect();
    msgs.sort();
    println!("{:?}", msgs);
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Doubling 1..=8 on four workers produces every doubled value once.
    #[test]
    fn test_fan_map() {
        let mut doubled = fan_map((1..=8u32).collect(), 4, |x| x * 2);
        doubled.sort();
        let expected: Vec<u32> = vec![2, 4, 6, 8, 10, 12, 14, 16];
        assert_eq!(doubled, expected);
    }

    /// No item is lost or duplicated when 100 items go through eight workers.
    #[test]
    fn test_all() {
        let passed_through = fan_map((0..100u32).collect(), 8, |x| x);
        assert_eq!(passed_through.len(), 100);
    }
}
(* 463. Fan-out / fan-in – OCaml *)
(* [fan_map n f items] applies [f] to every element of [items] on a pool of
   worker threads and returns the list of results.

   - [n] is the requested number of workers. The pool is clamped to at least
     one worker (so [n <= 0] neither drops the work nor raises from
     [Array.init]) and to at most [List.length items].
   - Results come back in reverse completion order, which in general is not
     the input order; sort them or carry an index through [f] if order matters.

   Each queue is guarded by its own mutex, and [f] runs with no lock held. *)
let fan_map n f items =
  let work = Queue.create () and results = Queue.create () in
  let work_lock = Mutex.create () and results_lock = Mutex.create () in
  List.iter (fun x -> Queue.push x work) items;
  (* Take the next pending item, or [None] once the work queue is drained. *)
  let next_item () =
    Mutex.lock work_lock;
    let item = if Queue.is_empty work then None else Some (Queue.pop work) in
    Mutex.unlock work_lock;
    item
  in
  (* Worker loop: pull, compute outside any lock, publish, repeat. *)
  let rec worker () =
    match next_item () with
    | None -> ()
    | Some x ->
      let r = f x in
      Mutex.lock results_lock;
      Queue.push r results;
      Mutex.unlock results_lock;
      worker ()
  in
  let pool_size = min (max 1 n) (List.length items) in
  let pool = Array.init pool_size (fun _ -> Thread.create worker ()) in
  (* All workers are joined before the result queue is read, so no lock is
     needed below. *)
  Array.iter Thread.join pool;
  (* Consing while walking the queue front-to-back yields reverse completion
     order. *)
  Queue.fold (fun acc r -> r :: acc) [] results

(* Demo: square 1..12 on four workers and print the sorted results on one
   line, separated by spaces. *)
let () =
  List.init 12 (fun i -> i + 1)
  |> fan_map 4 (fun x -> x * x)
  |> List.sort compare
  |> List.map string_of_int
  |> String.concat " "
  |> Printf.printf "%s\n"