463: Fan-Out / Fan-In Pattern
Tutorial
The Problem
One slow processing step can bottleneck an entire pipeline. Fan-out distributes work items from one source to N parallel workers; fan-in collects the results from all N workers back into one consumer. Together, they parallelize the bottleneck stage without changing the serial stages around it. If one processing step takes 100 ms per item and you have 8 cores, 8 parallel workers cut the stage's effective cost to about 12.5 ms per item, an improvement of up to 8x in throughput. Each individual item still takes 100 ms, and the full 8x assumes the items are independent and CPU-bound.
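The arithmetic above can be checked with a small timing sketch. It is not part of the tutorial's code: `slow_step`, `run_serial`, and `run_parallel` are hypothetical names, and the slow step is simulated with a sleep rather than real CPU work, so it only illustrates the ideal case.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Stand-in for a slow stage (assumption: sleeps instead of doing real work).
fn slow_step(x: u64, cost: Duration) -> u64 {
    thread::sleep(cost);
    x * 2
}

/// Run the step over `items` one after another; returns results and wall time.
fn run_serial(items: &[u64], cost: Duration) -> (Vec<u64>, Duration) {
    let start = Instant::now();
    let out: Vec<u64> = items.iter().map(|&x| slow_step(x, cost)).collect();
    (out, start.elapsed())
}

/// Fan out with one scoped thread per item, then fan in by joining in spawn order.
fn run_parallel(items: &[u64], cost: Duration) -> (Vec<u64>, Duration) {
    let start = Instant::now();
    let out: Vec<u64> = thread::scope(|s| {
        let handles: Vec<_> = items
            .iter()
            .map(|&x| s.spawn(move || slow_step(x, cost)))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });
    (out, start.elapsed())
}
```

With 8 items and a 100 ms step, the serial run should take roughly 800 ms and the parallel run roughly 100 ms of wall time, which averages out to about 12.5 ms per item. Real CPU-bound work will usually fall short of that because of scheduling and memory contention.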
Fan-out/fan-in appears in MapReduce frameworks, parallel database aggregations, web crawler link processing, batch ML inference, and any stage requiring horizontal scaling.
🎯 Learning Outcomes
- How Arc<Mutex<Iterator>> enables work stealing among workers
- How per-worker tx clones feed results into a single mpsc channel (fan-in)
Code Example
//! Fan-out / fan-in: distribute work to N workers, collect results into one channel.

use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Apply `f` to each item using `n` worker threads, returning all results.
///
/// Workers share the input via `Arc<Mutex<Iterator>>` (work stealing, in the
/// loose sense that an idle worker pulls the next item on demand) and
/// send outputs through an `mpsc` channel (fan-in). Result order is
/// non-deterministic since it reflects worker completion order.
/// With `n == 0` no worker runs and the result is empty.
pub fn fan_map<T, U, F>(items: Vec<T>, n: usize, f: F) -> Vec<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + Sync + 'static,
{
    let work = Arc::new(Mutex::new(items.into_iter()));
    let f = Arc::new(f);
    let (tx, rx) = mpsc::channel::<U>();

    // Fan-out: each worker gets its own handle to the work, `f`, and the sender.
    let workers: Vec<_> = (0..n)
        .map(|_| {
            let work = Arc::clone(&work);
            let f = Arc::clone(&f);
            let tx = tx.clone();
            thread::spawn(move || loop {
                // The guard is a temporary, so the lock is released before `f` runs.
                let next = work.lock().unwrap().next();
                match next {
                    Some(x) => {
                        let _ = tx.send(f(x));
                    }
                    None => break,
                }
            })
        })
        .collect();

    // Drop the original sender so `rx.iter()` ends once all worker clones are gone.
    drop(tx);
    for w in workers {
        w.join().unwrap();
    }

    // Fan-in: drain everything the workers sent.
    rx.iter().collect()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn squares_match_serial_map() {
        let mut got = fan_map((1..=12u32).collect(), 4, |x| x * x);
        got.sort();
        let expected: Vec<u32> = (1..=12u32).map(|x| x * x).collect();
        assert_eq!(got, expected);
    }

    #[test]
    fn doubles_eight_items_four_workers() {
        let mut r = fan_map((1..=8u32).collect(), 4, |x| x * 2);
        r.sort();
        assert_eq!(r, vec![2, 4, 6, 8, 10, 12, 14, 16]);
    }

    #[test]
    fn preserves_count_for_large_input() {
        assert_eq!(fan_map((0..100u32).collect(), 8, |x| x).len(), 100);
    }

    #[test]
    fn empty_input_returns_empty() {
        let r: Vec<u32> = fan_map(Vec::<u32>::new(), 4, |x| x + 1);
        assert!(r.is_empty());
    }

    #[test]
    fn more_workers_than_items_is_safe() {
        let mut r = fan_map(vec![1, 2, 3], 16, |x: i32| x * 10);
        r.sort();
        assert_eq!(r, vec![10, 20, 30]);
    }

    #[test]
    fn single_worker_behaves_like_map() {
        let mut r = fan_map((1..=5u32).collect(), 1, |x| x + 100);
        r.sort();
        assert_eq!(r, vec![101, 102, 103, 104, 105]);
    }
}
Key Differences
- The Rust version shares one Arc<Mutex<Iterator>> from which idle workers pull the next item (called work stealing here; strictly, it is a shared queue rather than per-worker deques); OCaml typically pre-distributes work (static partitioning).
- mpsc::channel collects results in completion order (non-deterministic); OCaml's List.map Domain.join collects in spawn order.
- Dynamic distribution (the Arc<Mutex<Iterator>>) handles variable-cost items better than static partitioning.
- With rayon, items.par_iter().map(f).collect() is fan-out/fan-in in one operation; this manual implementation shows the underlying mechanism.
OCaml Approach
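In OCaml 5.x, fan-out can be written as List.map (fun item -> Domain.spawn (fun () -> process item)) items, followed by List.map Domain.join handles for fan-in. This spawns one domain per item, which suits only short lists because domains are heavyweight; Domainslib.Task.parallel_for, which runs on a fixed pool of domains, is the idiomatic OCaml 5.x approach. In OCaml 4.x, Thread.create with channels provides the same structure, although the runtime lock keeps OCaml code from executing in parallel there. OCaml's list map naturally expresses the fan-out structure.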
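For contrast with the shared-iterator version, the static partitioning mentioned under Key Differences can be sketched in Rust as well. This is a minimal illustration, not the tutorial's implementation: `chunked_map` is a hypothetical name, it borrows the input as a slice, and it uses scoped threads so no `'static` bounds are needed.

```rust
use std::thread;

/// Static partitioning: split `items` into at most `n` contiguous chunks up
/// front and give each chunk to its own scoped thread.
/// (Hypothetical helper for illustration only.)
pub fn chunked_map<T, U, F>(items: &[T], n: usize, f: F) -> Vec<U>
where
    T: Sync,
    U: Send,
    F: Fn(&T) -> U + Sync,
{
    if items.is_empty() {
        return Vec::new();
    }
    // Treat n == 0 as one worker; ceiling division so every item lands in a chunk.
    let n = n.max(1);
    let chunk_len = (items.len() + n - 1) / n;
    let f = &f;

    thread::scope(|s| {
        // Fan-out: one thread per pre-assigned chunk.
        let handles: Vec<_> = items
            .chunks(chunk_len)
            .map(|chunk| s.spawn(move || chunk.iter().map(f).collect::<Vec<U>>()))
            .collect();

        // Fan-in: joining in spawn order keeps the output in input order.
        handles
            .into_iter()
            .flat_map(|h| h.join().unwrap())
            .collect()
    })
}
```

The trade-off is the one described above: output order comes for free, but a chunk that happens to contain the expensive items finishes last while the other threads sit idle.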
Exercises
Modify fan_map to return results in the same order as the input. Hint: include an index with each work item and sort results by index after fan-in.
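One possible shape for the hint, shown as a sketch rather than a reference solution: `fan_map_ordered` is a hypothetical name, and the body otherwise mirrors the tutorial's `fan_map`, with `enumerate` supplying the index and a sort after fan-in restoring input order.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Like `fan_map`, but the output follows input order.
/// (Hypothetical variant for the exercise; with `n == 0` it returns an empty Vec.)
pub fn fan_map_ordered<T, U, F>(items: Vec<T>, n: usize, f: F) -> Vec<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + Sync + 'static,
{
    // Tag every item with its position before fanning out.
    let work = Arc::new(Mutex::new(items.into_iter().enumerate()));
    let f = Arc::new(f);
    let (tx, rx) = mpsc::channel::<(usize, U)>();

    let workers: Vec<_> = (0..n)
        .map(|_| {
            let work = Arc::clone(&work);
            let f = Arc::clone(&f);
            let tx = tx.clone();
            thread::spawn(move || loop {
                // The lock guard is a temporary, released before `f` runs.
                let next = work.lock().unwrap().next();
                match next {
                    Some((i, x)) => {
                        let _ = tx.send((i, f(x)));
                    }
                    None => break,
                }
            })
        })
        .collect();

    // Drop the original sender so `rx.iter()` terminates after the workers finish.
    drop(tx);
    for w in workers {
        w.join().unwrap();
    }

    // Fan-in arrives in completion order; sort by index to restore input order.
    let mut tagged: Vec<(usize, U)> = rx.iter().collect();
    tagged.sort_by_key(|pair| pair.0);
    tagged.into_iter().map(|(_, u)| u).collect()
}
```

Sorting costs O(n log n) after the fact; an alternative is to write each result into a pre-sized `Vec<Option<U>>` slot by index, which avoids the sort at the price of a little more bookkeeping.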