🦀 Functional Rust
🎬 Fearless Concurrency: Threads, Arc<Mutex<T>>, channels — safe parallelism enforced by the compiler.
📝 Text version (for readers / accessibility)

• std::thread::spawn creates OS threads — closures must be Send + 'static

• Arc<Mutex<T>> provides shared mutable state across threads safely

• Channels (mpsc) enable message passing — multiple producers, single consumer

• Send and Sync marker traits enforce thread safety at compile time

• Data races are impossible — the type system prevents them before your code runs

446: Thread Pool — Reuse Threads for Amortised Concurrency

Difficulty: 3 · Level: Intermediate

Pre-spawn N worker threads that drain a shared job queue — eliminate per-task thread creation overhead and cap total concurrency.

The Problem This Solves

Spawning a thread for each unit of work has real cost: OS kernel call, stack allocation (typically 2–8 MB reserved), scheduler registration. For thousands of short tasks, this overhead dominates. A server spawning a thread per HTTP request will exhaust memory and kernel thread limits under load.

The solution is a thread pool: create N threads once at startup, then reuse them. Work items (closures) are sent through a channel; workers dequeue and execute them. When a job finishes, the thread doesn't exit — it loops back and takes the next job. Total thread count is bounded; jobs queue when all workers are busy.

The tricky part in Rust is sharing the `Receiver` among N workers. A `Receiver` is not `Clone` — `mpsc` is single-consumer by design. The solution is `Arc<Mutex<Receiver<Job>>>`: wrap the receiver so workers compete for jobs via a mutex. Exactly one worker dequeues each job; the OS scheduler naturally load-balances.

The Intuition

A thread pool is a restaurant kitchen. Instead of hiring a chef per order (expensive, chaotic), you hire N chefs at opening and give them a ticket system. Orders go on the rail; any free chef takes the next ticket. When all chefs are busy, tickets queue. When the restaurant closes, you wait for current orders to finish, then send the chefs home. In Java: `Executors.newFixedThreadPool(n)`. In Python: `ThreadPoolExecutor(max_workers=n)`. In Go: a buffered channel of goroutines. Rust's standard library doesn't include a thread pool, but building one from `mpsc` + `Arc<Mutex<>>` in ~30 lines illustrates the primitives cleanly. Production code uses `rayon` or `tokio`.

How It Works in Rust

use std::sync::{Arc, Mutex, mpsc};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

pub struct ThreadPool {
    workers: Vec<thread::JoinHandle<()>>,
    tx: Option<mpsc::Sender<Job>>,
}

impl ThreadPool {
    pub fn new(n: usize) -> Self {
        let (tx, rx) = mpsc::channel::<Job>();
        // Wrap Receiver so N workers can share it
        let rx = Arc::new(Mutex::new(rx));

        let workers = (0..n).map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || loop {
                // Bind the result first: the MutexGuard is a temporary in
                // this `let`, so the lock is released before the job runs.
                // (`match rx.lock().unwrap().recv() { ... }` would hold the
                // lock across `job()`, serialising all the workers.)
                let job = rx.lock().unwrap().recv();
                match job {
                    Ok(job) => job(),        // run the closure
                    Err(_)  => break,        // channel closed — exit
                }
            })
        }).collect();

        ThreadPool { workers, tx: Some(tx) }
    }

    pub fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {
        self.tx.as_ref().unwrap().send(Box::new(f)).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.tx.take()); // close channel — workers see Err and exit
        for w in self.workers.drain(..) {
            w.join().unwrap(); // wait for clean shutdown
        }
    }
}
The `Drop` impl is the shutdown protocol: drop the `Sender`, which closes the channel, which causes all `recv()` calls to return `Err`, which causes all workers to `break` their loop. Then `join()` waits for all of them. Dropping the pool blocks until all queued jobs complete.

What This Unlocks

Key Differences

| Concept | OCaml | Rust |
|---|---|---|
| Job type | `unit -> unit` | `Box<dyn FnOnce() + Send + 'static>` |
| Shared queue | `Queue.t` + `Mutex` + `Condvar` | `mpsc::Receiver` in `Arc<Mutex<...>>` |
| Shutdown | sentinel `None` or flag | drop `Sender` — workers see `Err` on `recv` |
| Backpressure | manual queue capacity check | `mpsc::sync_channel(bound)` for bounded queue |
| Production use | `Domainslib` | `rayon`, `tokio::task::spawn_blocking` |
//! # Thread Pool Pattern — Reusable Worker Threads
//!
//! A pool of worker threads that process jobs from a shared queue,
//! avoiding the overhead of spawning threads per task.

use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// A job is a boxed closure that runs once
type Job = Box<dyn FnOnce() + Send + 'static>;

/// Approach 1: Basic thread pool with channel-based job queue
pub struct ThreadPool {
    workers: Vec<JoinHandle<()>>,
    sender: Option<Sender<Job>>,
}

impl ThreadPool {
    /// Create a new thread pool with `size` workers
    pub fn new(size: usize) -> Self {
        assert!(size > 0, "Thread pool must have at least one worker");

        let (sender, receiver) = mpsc::channel::<Job>();
        let receiver = Arc::new(Mutex::new(receiver));

        let workers = (0..size)
            .map(|_id| {
                let rx = Arc::clone(&receiver);
                thread::spawn(move || loop {
                    let job = rx.lock().unwrap().recv();
                    match job {
                        Ok(job) => job(),
                        Err(_) => break, // Channel closed
                    }
                })
            })
            .collect();

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    /// Submit a job to be executed by a worker
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.as_ref().unwrap().send(job).unwrap();
    }

    /// Get the number of workers
    pub fn size(&self) -> usize {
        self.workers.len()
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        // Drop sender to close channel
        drop(self.sender.take());

        // Wait for all workers to finish
        for worker in self.workers.drain(..) {
            worker.join().unwrap();
        }
    }
}

/// Approach 2: Scoped thread pool for borrowed data
pub fn scoped_pool<T, R, F>(data: &[T], num_threads: usize, f: F) -> Vec<R>
where
    T: Sync,
    R: Send + Default + Clone,
    F: Fn(&T) -> R + Sync,
{
    // Ceiling division; clamp to 1 so `chunks` never gets a zero size
    let chunk_size = ((data.len() + num_threads - 1) / num_threads).max(1);
    let mut results = vec![R::default(); data.len()];

    thread::scope(|s| {
        for (chunk_data, chunk_results) in data.chunks(chunk_size).zip(results.chunks_mut(chunk_size))
        {
            s.spawn(|| {
                for (input, output) in chunk_data.iter().zip(chunk_results.iter_mut()) {
                    *output = f(input);
                }
            });
        }
    });

    results
}

/// Approach 3: Simple parallel map using a thread pool
///
/// Takes the pool by value: dropping it is the shutdown protocol, and
/// joining the workers is the only guarantee that every job has finished
/// before the results are read.
pub fn parallel_map<T, U, F>(pool: ThreadPool, data: Vec<T>, f: F) -> Vec<U>
where
    T: Send + 'static,
    U: Send + std::fmt::Debug + 'static,
    F: Fn(T) -> U + Send + Sync + Clone + 'static,
{
    let results: Arc<Mutex<Vec<(usize, U)>>> = Arc::new(Mutex::new(Vec::new()));

    for (i, item) in data.into_iter().enumerate() {
        let f = f.clone();
        let results = Arc::clone(&results);
        pool.execute(move || {
            let result = f(item);
            results.lock().unwrap().push((i, result));
        });
    }

    // Drop the pool: this closes the channel and joins every worker,
    // so all jobs are complete before we collect the results. Note that
    // `drop` on a mere `&ThreadPool` would be a no-op — the pool must be
    // owned here.
    drop(pool);

    // All worker clones of `results` are gone, so try_unwrap succeeds
    let mut collected: Vec<_> = Arc::try_unwrap(results)
        .unwrap()
        .into_inner()
        .unwrap()
        .into_iter()
        .collect();

    collected.sort_by_key(|(i, _)| *i);
    collected.into_iter().map(|(_, v)| v).collect()
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    #[test]
    fn test_pool_executes_all_jobs() {
        let count = Arc::new(AtomicUsize::new(0));

        {
            let pool = ThreadPool::new(4);

            for _ in 0..10 {
                let count = Arc::clone(&count);
                pool.execute(move || {
                    count.fetch_add(1, Ordering::SeqCst);
                });
            }
        } // Pool dropped, all jobs complete

        assert_eq!(count.load(Ordering::SeqCst), 10);
    }

    #[test]
    fn test_pool_size() {
        let pool = ThreadPool::new(4);
        assert_eq!(pool.size(), 4);
    }

    #[test]
    fn test_multiple_pools() {
        let count = Arc::new(AtomicUsize::new(0));

        {
            let pool1 = ThreadPool::new(2);
            let pool2 = ThreadPool::new(2);

            for _ in 0..5 {
                let c = Arc::clone(&count);
                pool1.execute(move || {
                    c.fetch_add(1, Ordering::SeqCst);
                });
            }
            for _ in 0..5 {
                let c = Arc::clone(&count);
                pool2.execute(move || {
                    c.fetch_add(1, Ordering::SeqCst);
                });
            }
        }

        assert_eq!(count.load(Ordering::SeqCst), 10);
    }

    #[test]
    fn test_scoped_pool() {
        let data: Vec<i32> = (1..=10).collect();
        let results = scoped_pool(&data, 4, |x| x * x);
        assert_eq!(results, vec![1, 4, 9, 16, 25, 36, 49, 64, 81, 100]);
    }

    #[test]
    fn test_results_collected() {
        let results = Arc::new(Mutex::new(Vec::new()));

        {
            let pool = ThreadPool::new(2);
            for i in 0..5 {
                let r = Arc::clone(&results);
                pool.execute(move || {
                    r.lock().unwrap().push(i * i);
                });
            }
        }

        let mut collected = results.lock().unwrap().clone();
        collected.sort();
        assert_eq!(collected, vec![0, 1, 4, 9, 16]);
    }

    #[test]
    #[should_panic]
    fn test_zero_workers_panics() {
        let _ = ThreadPool::new(0);
    }
}
(* 446. Thread pool – OCaml *)
let make_pool n =
  let q = Queue.create () in
  let m = Mutex.create () in
  let c = Condition.create () in
  let stop = ref false in
  let workers = Array.init n (fun _ ->
    Thread.create (fun () ->
      let go = ref true in
      while !go do
        Mutex.lock m;
        while Queue.is_empty q && not !stop do Condition.wait c m done;
        if Queue.is_empty q then (Mutex.unlock m; go := false)
        else begin
          (* Pop under the lock, release it, then run the job *)
          let f = Queue.pop q in
          Mutex.unlock m;
          f ()
        end
      done) ()
  ) in
  let submit f = Mutex.lock m; Queue.push f q; Condition.signal c; Mutex.unlock m in
  let shutdown () =
    Mutex.lock m; stop := true; Condition.broadcast c; Mutex.unlock m;
    Array.iter Thread.join workers
  in
  (submit, shutdown)

let () =
  let (submit, shutdown) = make_pool 4 in
  for i = 1 to 8 do
    let n = i in submit (fun () -> Printf.printf "job %d\n%!" n)
  done;
  Thread.delay 0.05; shutdown ()