🦀 Functional Rust
🎬 Fearless Concurrency — Threads, Arc<Mutex<T>>, channels — safe parallelism enforced by the compiler.
📝 Text version (for readers / accessibility)

• std::thread::spawn creates OS threads — closures must be Send + 'static

• Arc<Mutex<T>> provides shared mutable state across threads safely

• Channels (mpsc) enable message passing — multiple producers, single consumer

• Send and Sync marker traits enforce thread safety at compile time

• Data races are impossible — the type system prevents them before your code runs
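These pieces compose in a few lines. A minimal sketch (the helper name `shared_count` is illustrative, not from any library): a handful of threads bump a shared counter behind `Arc<Mutex<_>>` and report back over an `mpsc` channel.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Spawn `n` threads that each bump a shared counter and send one message.
/// Returns (final counter value, number of messages received).
fn shared_count(n: usize) -> (i32, usize) {
    let counter = Arc::new(Mutex::new(0));
    let (tx, rx) = mpsc::channel();
    let handles: Vec<_> = (0..n)
        .map(|i| {
            let counter = Arc::clone(&counter); // shared ownership across threads
            let tx = tx.clone();                // multiple producers
            thread::spawn(move || {             // `move` makes the closure Send + 'static
                *counter.lock().unwrap() += 1;  // Mutex serialises the mutation
                tx.send(i).unwrap();
            })
        })
        .collect();
    drop(tx); // drop the last sender so the receiver's iterator terminates
    let received = rx.iter().count(); // single consumer drains the channel
    for h in handles {
        h.join().unwrap();
    }
    let total = *counter.lock().unwrap();
    (total, received)
}

fn main() {
    let (total, received) = shared_count(4);
    assert_eq!((total, received), (4, 4));
    println!("counter = {total}, messages = {received}");
}
```

Try removing the `move` or the `Arc::clone` and the compiler rejects the program: that is the "enforced by the compiler" part in practice.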

469: Parallel Fold/Reduce

Difficulty: 3 · Level: Intermediate

Split data across threads, reduce each chunk independently, then combine — the MapReduce pattern in miniature.

The Problem This Solves

A sequential fold over a million-element list visits every item one after another. No matter how fast your closure is, you're using one core; on a ten-core machine that leaves 90% of your CPU idle. Parallel reduce exploits the associativity of the combining operation: if `f(a, f(b, c)) == f(f(a, b), c)`, then it doesn't matter in what order you combine partial results. Split into N chunks, reduce each in parallel, and combine the partial results at the end. Sum, product, maximum, minimum, string concatenation (with careful ordering), and histogram counting are all associative. If your operation qualifies, parallelising is straightforward.
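The associativity condition is easy to check concretely. A small sketch (the `split_reduce` helper is an illustrative name, mimicking what a two-chunk parallel reduce does): addition regroups freely, subtraction does not.

```rust
// An associative op can be regrouped freely; a non-associative one cannot.
fn add(a: i64, b: i64) -> i64 {
    a + b
}
fn sub(a: i64, b: i64) -> i64 {
    a - b
}

/// Reduce by splitting into two halves and combining partial results,
/// mimicking what a parallel reduce does with two chunks.
fn split_reduce(data: &[i64], identity: i64, f: fn(i64, i64) -> i64) -> i64 {
    let (l, r) = data.split_at(data.len() / 2);
    let left = l.iter().copied().fold(identity, f);
    let right = r.iter().copied().fold(identity, f);
    f(left, right)
}

fn main() {
    // Addition: regrouping is safe.
    assert_eq!(add(1, add(2, 3)), add(add(1, 2), 3));
    // Subtraction: regrouping changes the answer, so it must not be
    // parallelised this way.
    assert_ne!(sub(1, sub(2, 3)), sub(sub(1, 2), 3));

    // Chunked reduction matches the sequential fold for an associative op.
    let data: Vec<i64> = (1..=100).collect();
    let sequential: i64 = data.iter().copied().fold(0, add);
    assert_eq!(split_reduce(&data, 0, add), sequential);
    println!("sum = {sequential}");
}
```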

The Intuition

Think of counting votes in an election. You don't have one person read every ballot. You divide the ballots into piles, give each pile to a counter, and when all counters are done, add the totals. The final sum is identical regardless of how you divided the piles — because addition is associative.

How It Works in Rust

With Rayon (recommended):
use rayon::prelude::*;

let sum: i64 = data.par_iter()
    .map(|x| expensive_hash(x))
    .reduce(|| 0, |a, b| a + b);
Manual parallel reduce (for understanding the pattern):

1. Split — divide the slice into `num_cpus` chunks.
2. Spawn threads — each thread reduces its chunk to a partial result.
3. Collect and combine — join all threads, fold the partial results.
// Assumes `data: Vec<i64>` and a `chunk_size` computed from the core count.
let chunks = data.chunks(chunk_size);
let handles: Vec<_> = chunks.map(|chunk| {
    // Copy the chunk so the spawned closure can be 'static.
    let chunk = chunk.to_vec();
    thread::spawn(move || chunk.iter().sum::<i64>())
}).collect();
let total: i64 = handles.into_iter().map(|h| h.join().unwrap()).sum();
Key constraint: the combining function must be associative. Rayon's `.reduce(identity, f)` takes a closure producing the identity element (e.g. `|| 0` for sum, `|| 1` for product) and an associative `f`.
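The same identity rule applies if you hand-roll the pattern with std threads. A std-only sketch of a chunked max (the helper name `chunked_max` and the chunk size are illustrative choices, not from any library): `i64::MIN` is the identity because folding it with any value is a no-op.

```rust
use std::thread;

/// Chunked max using scoped threads. The identity for max is i64::MIN:
/// combining it with any value returns that value, so every partial
/// result can safely be seeded with it.
fn chunked_max(data: &[i64], chunk_size: usize) -> i64 {
    thread::scope(|s| {
        let handles: Vec<_> = data
            .chunks(chunk_size)
            .map(|chunk| s.spawn(move || chunk.iter().copied().fold(i64::MIN, i64::max)))
            .collect();
        // Combine the partial maxima with the same associative operation.
        handles
            .into_iter()
            .map(|h| h.join().unwrap())
            .fold(i64::MIN, i64::max)
    })
}

fn main() {
    let data = vec![5, 2, 8, 1, 9, 3, 7, 4];
    assert_eq!(chunked_max(&data, 3), 9);
    println!("max = {}", chunked_max(&data, 3));
}
```

`thread::scope` (Rust 1.63+) lets the threads borrow `data` directly, so no `to_vec()` copies are needed here.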

What This Unlocks

Multi-core speedup for any associative aggregation (sums, products, min/max, histogram counts) without locks or shared mutable state: each thread owns its chunk, and results meet only at the combine step.

Key Differences

| Concept | OCaml | Rust |
|---|---|---|
| Parallel reduce | `Domainslib.Task.parallel_for` + shared ref | Rayon `.par_iter().reduce()` |
| Safety | Mutable shared state, careful coding | Ownership prevents races |
| Identity element | Explicit | Explicit (Rayon requires it) |
| Chunk management | Manual or library | Rayon adaptive work-stealing |
//! # Parallel Reduce — Concurrent Aggregation
//!
//! Reduce collections in parallel for faster aggregation.

use std::thread;

/// Parallel reduce with custom operation
pub fn parallel_reduce<T>(data: &[T], identity: T, op: &(dyn Fn(T, T) -> T + Sync)) -> T
where
    T: Send + Sync + Clone,
{
    const THRESHOLD: usize = 100;

    if data.is_empty() {
        return identity;
    }

    if data.len() <= THRESHOLD {
        return data.iter().cloned().fold(identity, |a, b| op(a, b));
    }

    let mid = data.len() / 2;
    let (left, right) = data.split_at(mid);

    thread::scope(|s| {
        let id_left = identity.clone();
        let left_handle = s.spawn(|| parallel_reduce(left, id_left, op));
        let right_result = parallel_reduce(right, identity, op);
        op(left_handle.join().unwrap(), right_result)
    })
}

/// Parallel sum
pub fn parallel_sum<T>(data: &[T]) -> T
where
    T: Send + Sync + Clone + std::ops::Add<Output = T> + Default,
{
    parallel_reduce(data, T::default(), &|a, b| a + b)
}

/// Parallel product
pub fn parallel_product<T>(data: &[T]) -> T
where
    T: Send + Sync + Clone + std::ops::Mul<Output = T> + From<u8>,
{
    parallel_reduce(data, T::from(1), &|a, b| a * b)
}

/// Parallel min
pub fn parallel_min<T: Send + Sync + Clone + Ord>(data: &[T]) -> Option<T> {
    if data.is_empty() {
        return None;
    }
    Some(parallel_reduce(data, data[0].clone(), &|a, b| {
        if a < b { a } else { b }
    }))
}

/// Parallel max
pub fn parallel_max<T: Send + Sync + Clone + Ord>(data: &[T]) -> Option<T> {
    if data.is_empty() {
        return None;
    }
    Some(parallel_reduce(data, data[0].clone(), &|a, b| {
        if a > b { a } else { b }
    }))
}

/// Parallel all (conjunction)
pub fn parallel_all<T>(data: &[T], predicate: &(dyn Fn(&T) -> bool + Sync)) -> bool
where
    T: Sync,
{
    const THRESHOLD: usize = 100;

    if data.is_empty() {
        return true;
    }

    if data.len() <= THRESHOLD {
        return data.iter().all(predicate);
    }

    let mid = data.len() / 2;
    let (left, right) = data.split_at(mid);

    thread::scope(|s| {
        let left_handle = s.spawn(|| parallel_all(left, predicate));
        let right_result = parallel_all(right, predicate);
        left_handle.join().unwrap() && right_result
    })
}

/// Parallel any (disjunction)
pub fn parallel_any<T>(data: &[T], predicate: &(dyn Fn(&T) -> bool + Sync)) -> bool
where
    T: Sync,
{
    const THRESHOLD: usize = 100;

    if data.is_empty() {
        return false;
    }

    if data.len() <= THRESHOLD {
        return data.iter().any(predicate);
    }

    let mid = data.len() / 2;
    let (left, right) = data.split_at(mid);

    thread::scope(|s| {
        let left_handle = s.spawn(|| parallel_any(left, predicate));
        let right_result = parallel_any(right, predicate);
        left_handle.join().unwrap() || right_result
    })
}

/// Parallel count matching predicate
pub fn parallel_count<T>(data: &[T], predicate: &(dyn Fn(&T) -> bool + Sync)) -> usize
where
    T: Sync,
{
    const THRESHOLD: usize = 100;

    if data.len() <= THRESHOLD {
        return data.iter().filter(|x| predicate(x)).count();
    }

    let mid = data.len() / 2;
    let (left, right) = data.split_at(mid);

    thread::scope(|s| {
        let left_handle = s.spawn(|| parallel_count(left, predicate));
        let right_result = parallel_count(right, predicate);
        left_handle.join().unwrap() + right_result
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parallel_reduce_sum() {
        let data: Vec<i64> = (1..=1000).collect();
        let sum = parallel_reduce(&data, 0, &|a, b| a + b);
        assert_eq!(sum, 500500);
    }

    #[test]
    fn test_parallel_sum() {
        let data: Vec<i32> = (1..=100).collect();
        assert_eq!(parallel_sum(&data), 5050);
    }

    #[test]
    fn test_parallel_product() {
        let data: Vec<i64> = (1..=10).collect();
        assert_eq!(parallel_product(&data), 3628800); // 10!
    }

    #[test]
    fn test_parallel_min() {
        let data = vec![5, 2, 8, 1, 9, 3];
        assert_eq!(parallel_min(&data), Some(1));
    }

    #[test]
    fn test_parallel_max() {
        let data = vec![5, 2, 8, 1, 9, 3];
        assert_eq!(parallel_max(&data), Some(9));
    }

    #[test]
    fn test_parallel_all() {
        let data: Vec<i32> = (1..=100).collect();
        assert!(parallel_all(&data, &|x| *x > 0));
        assert!(!parallel_all(&data, &|x| *x > 50));
    }

    #[test]
    fn test_parallel_any() {
        let data: Vec<i32> = (1..=100).collect();
        assert!(parallel_any(&data, &|x| *x == 50));
        assert!(!parallel_any(&data, &|x| *x > 100));
    }

    #[test]
    fn test_parallel_count() {
        let data: Vec<i32> = (1..=100).collect();
        assert_eq!(parallel_count(&data, &|x| x % 2 == 0), 50);
    }

    #[test]
    fn test_empty_cases() {
        let empty: Vec<i32> = vec![];
        assert_eq!(parallel_sum(&empty), 0);
        assert_eq!(parallel_min(&empty), None);
        assert_eq!(parallel_max(&empty), None);
        assert!(parallel_all(&empty, &|_: &i32| false));
        assert!(!parallel_any(&empty, &|_: &i32| true));
    }
}
(* 469. Parallel reduce – OCaml 5. Domain gives true parallelism;
   the Thread module would interleave chunks on a single core instead. *)
let parallel_reduce arr f id =
  let n = Array.length arr and nd = 4 in
  let chunk = (n + nd - 1) / nd in
  let ds =
    Array.init nd (fun t ->
      Domain.spawn (fun () ->
        (* clamp both bounds so short arrays don't underflow Array.sub *)
        let lo = min n (t * chunk) and hi = min n ((t + 1) * chunk) in
        Array.fold_left f id (Array.sub arr lo (hi - lo))))
  in
  Array.fold_left (fun acc d -> f acc (Domain.join d)) id ds

let () =
  let a = Array.init 10000 (fun i -> i + 1) in
  Printf.printf "sum=%d max=%d\n"
    (parallel_reduce a ( + ) 0)
    (parallel_reduce a max min_int)