🦀 Functional Rust

982: Join Parallel Async

Difficulty: Beginner
Category: Async / Concurrency FP Patterns
Concept: Run computations in parallel and wait for all to complete
Key Insight: The `thread::spawn` + `.join()` pair is the std equivalent of `Lwt.both`; the combined result is available once the last task finishes

Versions

| Directory | Description |
|-----------|-------------|
| `std/` | Standard library version using `std::sync`, `std::thread` |
| `tokio/` | Tokio async runtime version using `tokio::sync`, `tokio::spawn` |

Running

# Standard library version
cd std && cargo test

# Tokio version
cd tokio && cargo test
// 982: Join Parallel Async
// Rust: std::thread::spawn + join() — like OCaml's Lwt.both

use std::thread;

// --- Approach 1: Join two threads (Lwt.both analogue) ---
/// Runs `f1` and `f2` on two freshly spawned threads and returns both results.
///
/// Both threads are started before either is joined, so the closures can
/// execute at the same time. The results are returned as `(a, b)` in
/// argument order.
///
/// # Panics
///
/// Panics with `"thread 1 panicked"` if the thread running `f1` panicked,
/// or with `"thread 2 panicked"` if the thread running `f2` panicked. The
/// first thread is checked before the second one is joined.
fn parallel_both<A, B, F1, F2>(f1: F1, f2: F2) -> (A, B)
where
    A: Send + 'static,
    B: Send + 'static,
    F1: FnOnce() -> A + Send + 'static,
    F2: FnOnce() -> B + Send + 'static,
{
    // Start both workers up front; joining happens only afterwards.
    let (first, second) = (thread::spawn(f1), thread::spawn(f2));

    // Tuple fields are evaluated left to right, so `first` is joined
    // (and checked) before `second`.
    (
        first.join().expect("thread 1 panicked"),
        second.join().expect("thread 2 panicked"),
    )
}

// --- Approach 2: Join N tasks and collect results ---
/// Spawns one thread per task and collects every task's return value.
///
/// All tasks are spawned before any of them is joined. Handles are joined
/// in the order the tasks appear in `tasks`, so the returned vector lines up
/// with the input. An empty `tasks` yields an empty vector.
///
/// # Panics
///
/// Panics with `"task panicked"` if any spawned task panicked.
fn parallel_map<T, F>(tasks: Vec<F>) -> Vec<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    // Phase 1: launch every task.
    let mut workers = Vec::with_capacity(tasks.len());
    for task in tasks {
        workers.push(thread::spawn(task));
    }

    // Phase 2: wait for each worker in launch order.
    let mut outputs = Vec::with_capacity(workers.len());
    for worker in workers {
        outputs.push(worker.join().expect("task panicked"));
    }
    outputs
}

// --- Approach 3: Parallel sum of squares ---
/// Squares every element of `ns` on its own thread and returns the sum of
/// the squares.
///
/// One thread is spawned per element; all threads are started before any is
/// joined. An empty input returns `0`.
///
/// # Panics
///
/// Panics if any worker thread panicked (the join result is unwrapped).
fn parallel_sum(ns: Vec<i32>) -> i32 {
    // Launch one squaring worker per input value.
    let mut workers = Vec::with_capacity(ns.len());
    for n in ns {
        workers.push(thread::spawn(move || n * n));
    }

    // Join in launch order and accumulate the squares.
    workers
        .into_iter()
        .fold(0, |total, worker| total + worker.join().unwrap())
}


#[cfg(test)]
mod tests {
    use super::*;

    /// Two integer computations come back as a pair in argument order.
    #[test]
    fn test_parallel_both() {
        let (a, b) = parallel_both(|| 6 * 7, || 10 + 20);
        assert_eq!(a, 42);
        assert_eq!(b, 30);
    }

    /// Results of `parallel_map` line up with the order of the input tasks.
    #[test]
    fn test_parallel_map() {
        let results = parallel_map(vec![
            Box::new(|| 2 + 2) as Box<dyn FnOnce() -> i32 + Send>,
            Box::new(|| 3 * 3),
            Box::new(|| 10 - 1),
        ]);
        // Handles are joined in spawn order, so the output order is
        // deterministic; no sorting is needed (and sorting would hide an
        // ordering regression).
        assert_eq!(results, vec![4, 9, 9]);
    }

    /// The "sum" is the sum of the squares of the inputs.
    #[test]
    fn test_parallel_sum() {
        // 1+4+9+16 = 30
        assert_eq!(parallel_sum(vec![1, 2, 3, 4]), 30);
    }

    /// The two closures may return different, unrelated types.
    #[test]
    fn test_both_independent() {
        let (x, y) = parallel_both(|| "hello", || 42u32);
        assert_eq!(x, "hello");
        assert_eq!(y, 42);
    }

    /// No tasks means no threads and an empty result.
    #[test]
    fn test_empty_parallel_map() {
        let results: Vec<i32> = parallel_map::<i32, fn() -> i32>(vec![]);
        assert!(results.is_empty());
    }
}
(* 982: Join Parallel Async *)
(* OCaml: Lwt.both p1 p2 runs them "concurrently" and waits for both *)

(* --- Approach 1: Simulate Lwt.both with threads --- *)

(* Start [f1] and [f2] on two threads, then block until both have finished.
   Returns unit; the thunks communicate their results through side effects.
   With Lwt, [Lwt.both] would instead return the pair [(v1, v2)]. *)
let parallel_both f1 f2 =
  (* Sequential lets keep the creation order explicit: f1 first, then f2. *)
  let first = Thread.create f1 () in
  let second = Thread.create f2 () in
  List.iter Thread.join [first; second]

(* Shared cells that the two worker threads write their results into. *)
let result1 = ref 0
let result2 = ref 0

(* Approach 1 check: each thunk stores its value, and both are visible
   once parallel_both has joined the threads. *)
let () =
  let first_job () = result1 := 6 * 7 in
  let second_job () = result2 := 10 + 20 in
  parallel_both first_job second_job;
  assert (!result1 = 42);
  assert (!result2 = 30);
  Printf.printf "Approach 1 (parallel threads): %d, %d\n" !result1 !result2

(* --- Approach 2: Lwt.both concept — collect results via mutex --- *)

(* Run every thunk in [tasks] on its own thread and return the int results
   as a list in task order. Each thread writes its value into the array slot
   matching its position in [tasks], under a shared mutex; the list is built
   only after all threads have been joined. *)
let compute_parallel tasks =
  let lock = Mutex.create () in
  let slots = Array.make (List.length tasks) 0 in
  (* Store one finished result; the mutex serialises the array writes. *)
  let store idx value =
    Mutex.lock lock;
    slots.(idx) <- value;
    Mutex.unlock lock
  in
  (* The task runs outside the lock; only the store is guarded. *)
  let spawn idx task =
    Thread.create (fun () ->
      let value = task () in
      store idx value
    ) ()
  in
  let workers = List.mapi spawn tasks in
  List.iter Thread.join workers;
  Array.to_list slots

(* Approach 2 check: three small computations, results collected in task order. *)
let () =
  let results =
    compute_parallel [
      (fun () -> 2 + 2);
      (fun () -> 3 * 3);
      (fun () -> 10 - 1);
    ]
  in
  assert (results = [4; 9; 9]);
  let rendered = String.concat "; " (List.map string_of_int results) in
  Printf.printf "Approach 2 (parallel collect): [%s]\n" rendered

(* --- Approach 3: Join all, sum results --- *)

(* Square each element of [ns] on its own thread and return the sum of the
   squares. Every thread adds its square to a shared accumulator while
   holding a mutex; the total is read after all threads have been joined. *)
let parallel_sum ns =
  let total = ref 0 in
  let lock = Mutex.create () in
  (* Worker body: compute the square first, then update the total under the lock. *)
  let add_square n () =
    let sq = n * n in
    Mutex.lock lock;
    total := !total + sq;
    Mutex.unlock lock
  in
  let workers = List.map (fun n -> Thread.create (add_square n) ()) ns in
  List.iter Thread.join workers;
  !total

(* Approach 3 check: 1^2 + 2^2 + 3^2 + 4^2 = 1 + 4 + 9 + 16 = 30 *)
let () =
  let sum_of_squares = parallel_sum [1; 2; 3; 4] in
  assert (sum_of_squares = 30);
  Printf.printf "Approach 3 (parallel sum): %d\n" sum_of_squares

(* Final confirmation; a failed assertion earlier in the file raises before
   this line runs. The check mark was previously mis-encoded in the literal. *)
let () = Printf.printf "✓ All tests passed\n"

📊 Detailed Comparison

Join Parallel Async — Comparison

Core Insight

`Lwt.both` and `thread::spawn + join` both express "run two things concurrently, wait for both". The key difference: Lwt uses cooperative concurrency on one thread; Rust `std::thread` uses OS threads with true parallelism.

OCaml Approach

  • `Lwt.both p1 p2` runs both promises on the event loop concurrently
  • Returns `(v1, v2)` when both resolve
  • `Lwt.all [p1; p2; p3]` for N promises
  • Cooperative — yields at I/O points, single-threaded
  • For true parallelism: OCaml 5 Domains or `Thread` + mutexes

Rust Approach

  • `thread::spawn(f)` starts a real OS thread, returns `JoinHandle<T>`
  • `handle.join()` blocks until the thread completes and returns `std::thread::Result<T>` (`Err` if the thread panicked)
  • True parallelism — all cores can be used simultaneously
  • `Vec<JoinHandle>` pattern for N parallel tasks
  • `Send + 'static` bounds ensure data is safe to transfer

Comparison Table

| Concept | OCaml (Lwt) | Rust |
|---------|-------------|------|
| Run two in parallel | `Lwt.both p1 p2` | `spawn(f1); spawn(f2); join both` |
| Run N in parallel | `Lwt.all [p1; p2; ...]` | `tasks.map(spawn).map(join)` |
| Wait for result | `let* (a,b) = Lwt.both ...` | `h.join().unwrap()` |
| Concurrency model | Cooperative / event loop | True parallelism (OS threads) |
| Error propagation | `Lwt_result.both` | `h.join()` returns `Result` |
| Data sharing | Shared heap (GC) | `Send + 'static` + `Arc` |

std vs tokio

| Aspect | std version | tokio version |
|--------|-------------|---------------|
| Runtime | OS threads via `std::thread` | Async tasks on tokio runtime |
| Synchronization | `std::sync::Mutex`, `Condvar` | `tokio::sync::Mutex`, channels |
| Channels | `std::sync::mpsc` (unbounded) | `tokio::sync::mpsc` (bounded, async) |
| Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |