🦀 Functional Rust

991: Barrier Synchronization

**Difficulty:** Intermediate · **Category:** Async / Concurrency · **FP Patterns Concept:** Rendezvous point — all N threads must arrive before any can continue · **Key Insight:** `std::sync::Barrier` is reusable (it resets after each rendezvous); `BarrierWaitResult::is_leader()` identifies the last thread to arrive — useful for post-barrier coordination

Versions

| Directory | Description |
|-----------|-------------|
| `std/`    | Standard library version using `std::sync`, `std::thread` |
| `tokio/`  | Tokio async runtime version using `tokio::sync`, `tokio::spawn` |

Running

# Standard library version
cd std && cargo test

# Tokio version
cd tokio && cargo test
// 991: Barrier Synchronization
// Rust: std::sync::Barrier — wait until N threads all arrive

use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Duration;

// --- Approach 1: Simple barrier โ€” all threads synchronize at one point ---
// --- Approach 1: Simple barrier — all threads synchronize at one point ---
/// Spawns five threads that each record a phase-1 entry at their own pace,
/// rendezvous at a `Barrier`, then record a phase-2 entry.
///
/// Returns `(phase1_log, phase2_log)`; each log contains exactly one entry
/// per thread (`"p1:<i>"` / `"p2:<i>"`), in arrival order.
fn barrier_demo() -> (Vec<String>, Vec<String>) {
    const N: usize = 5;
    let gate = Arc::new(Barrier::new(N));
    let before = Arc::new(std::sync::Mutex::new(Vec::new()));
    let after = Arc::new(std::sync::Mutex::new(Vec::new()));

    let workers: Vec<_> = (0..N)
        .map(|id| {
            let gate = Arc::clone(&gate);
            let before = Arc::clone(&before);
            let after = Arc::clone(&after);
            thread::spawn(move || {
                // Phase 1: staggered, independent work per thread.
                thread::sleep(Duration::from_millis(2 * id as u64));
                before.lock().unwrap().push(format!("p1:{}", id));

                // Rendezvous: no thread proceeds until all N have arrived.
                gate.wait();

                // Phase 2: everyone is released together.
                after.lock().unwrap().push(format!("p2:{}", id));
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }

    let phase1 = before.lock().unwrap().clone();
    let phase2 = after.lock().unwrap().clone();
    (phase1, phase2)
}

// --- Approach 2: Detect the "leader" (the last thread to arrive) ---
// --- Approach 2: Detect the "leader" (the last thread to arrive) ---
/// Four threads meet at a barrier; each records whether `wait()` named it
/// the leader. Exactly one `true` appears in the returned flags.
fn barrier_with_leader() -> Vec<bool> {
    const N: usize = 4;
    let gate = Arc::new(Barrier::new(N));
    let flags = Arc::new(std::sync::Mutex::new(Vec::new()));

    let workers: Vec<_> = (0..N)
        .map(|_| {
            let gate = Arc::clone(&gate);
            let flags = Arc::clone(&flags);
            thread::spawn(move || {
                // BarrierWaitResult::is_leader() is true for exactly one
                // of the N waiters per rendezvous.
                let outcome = gate.wait();
                flags.lock().unwrap().push(outcome.is_leader());
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }

    // Bind before returning: a tail-expression guard temporary would be
    // dropped after the local `flags`, which the borrow checker rejects.
    let collected = flags.lock().unwrap().clone();
    collected
}

// --- Approach 3: Reusable barrier across multiple rounds ---
// --- Approach 3: Reusable barrier across multiple rounds ---
/// Runs three threads through two rounds of work, reusing one `Barrier`
/// as the gate between rounds. Returns the per-round check-in counts
/// (all threads must check in each round, so the expected result is `[3, 3]`).
fn multi_round_barrier() -> Vec<usize> {
    const N: usize = 3;
    const ROUNDS: usize = 2;
    let gate = Arc::new(Barrier::new(N));
    let tally = Arc::new(std::sync::Mutex::new(vec![0usize; ROUNDS]));

    let workers: Vec<_> = (0..N)
        .map(|_| {
            let gate = Arc::clone(&gate);
            let tally = Arc::clone(&tally);
            thread::spawn(move || {
                for round in 0..ROUNDS {
                    tally.lock().unwrap()[round] += 1;
                    // A std Barrier resets once all N pass, so the same
                    // instance gates every round.
                    gate.wait();
                }
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }

    // Bind before returning so the guard temporary drops before `tally`.
    let snapshot = tally.lock().unwrap().clone();
    snapshot
}


#[cfg(test)]
mod tests {
    use super::*;

    /// Both phase logs must end up with one entry per thread.
    #[test]
    fn test_barrier_both_phases_complete() {
        let (phase1, phase2) = barrier_demo();
        assert_eq!((phase1.len(), phase2.len()), (5, 5));
    }

    /// `wait()` reports exactly one leader per rendezvous.
    #[test]
    fn test_exactly_one_leader() {
        let flags = barrier_with_leader();
        assert_eq!(flags.len(), 4);
        let leader_count = flags.into_iter().filter(|&f| f).count();
        assert_eq!(leader_count, 1);
    }

    /// The same barrier instance gates every round: all 3 threads are
    /// counted in each of the 2 rounds.
    #[test]
    fn test_reusable_barrier() {
        assert_eq!(multi_round_barrier(), vec![3, 3]);
    }

    /// A barrier of one never blocks, and its sole caller is the leader.
    #[test]
    fn test_barrier_new() {
        assert!(Barrier::new(1).wait().is_leader());
    }

    /// No thread may reach phase 2 before every thread finished phase 1.
    #[test]
    fn test_barrier_synchronizes_ordering() {
        let n = 4;
        let gate = Arc::new(Barrier::new(n));
        let finished = Arc::new(std::sync::Mutex::new(0usize));
        let violated = Arc::new(std::sync::Mutex::new(false));

        let workers: Vec<_> = (0..n)
            .map(|_| {
                let gate = Arc::clone(&gate);
                let finished = Arc::clone(&finished);
                let violated = Arc::clone(&violated);
                thread::spawn(move || {
                    *finished.lock().unwrap() += 1;
                    gate.wait();
                    // Post-barrier invariant: all n phase-1 increments done.
                    if *finished.lock().unwrap() != n {
                        *violated.lock().unwrap() = true;
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }
        assert!(!*violated.lock().unwrap());
    }
}
(* 991: Barrier Synchronization *)
(* Wait until N threads all reach the same point, then continue together *)

(* A reusable cyclic barrier for [total] threads, built from a mutex and a
   condition variable.  [generation] distinguishes successive rounds so a
   woken thread can tell whether its own round has completed; the
   generation check also absorbs spurious wakeups. *)
type barrier = {
  mutable count: int;        (* threads arrived in the current round *)
  total: int;                (* threads required to release the barrier *)
  mutable generation: int;   (* round number; bumped each time the barrier trips *)
  m: Mutex.t;
  cond: Condition.t;
}

(* Build a barrier that releases once [n] threads have arrived. *)
let make_barrier n =
  {
    count = 0;
    total = n;
    generation = 0;
    m = Mutex.create ();
    cond = Condition.create ();
  }

(* Block until [b.total] threads have called [barrier_wait b].  The last
   arrival resets the counter and bumps the generation, so the same
   barrier value can be reused for subsequent rounds. *)
let barrier_wait b =
  Mutex.lock b.m;
  (* Remember which round we joined; it is over once the generation moves. *)
  let gen = b.generation in
  b.count <- b.count + 1;
  if b.count = b.total then begin
    (* Last thread to arrive — reset for reuse, then wake all waiters. *)
    b.count <- 0;
    b.generation <- b.generation + 1;
    Condition.broadcast b.cond;
    Mutex.unlock b.m
  end else begin
    (* Wait until the generation changes.  The loop also guards against
       spurious wakeups, since [Condition.wait] may return early. *)
    while b.generation = gen do
      Condition.wait b.cond b.m
    done;
    Mutex.unlock b.m
  end

(* --- Approach 1: 5 threads synchronize at a barrier --- *)

(* Demonstration: each of 5 threads logs "p1:<i>" after a staggered sleep,
   waits at the barrier, then logs "p2:<i>".  Both phase logs must end up
   with exactly 5 entries. *)
let () =
  let n = 5 in
  let b = make_barrier n in
  let results = ref [] in
  let m = Mutex.create () in

  let threads = List.init n (fun i ->
    Thread.create (fun () ->
      (* Phase 1: each thread does independent work *)
      Unix.sleepf (float_of_int i *. 0.002);
      Mutex.lock m; results := (Printf.sprintf "p1:%d" i) :: !results; Mutex.unlock m;

      (* BARRIER: wait for all threads *)
      barrier_wait b;

      (* Phase 2: all threads start together *)
      Mutex.lock m; results := (Printf.sprintf "p2:%d" i) :: !results; Mutex.unlock m
    ) ()
  ) in
  List.iter Thread.join threads;

  (* Entries look like "p1:3" / "p2:0", so the character at index 1
     identifies the phase. *)
  let p1 = List.filter (fun s -> String.length s > 2 && s.[1] = '1') !results in
  let p2 = List.filter (fun s -> String.length s > 2 && s.[1] = '2') !results in
  assert (List.length p1 = 5);
  assert (List.length p2 = 5);
  Printf.printf "Approach 1 (barrier): %d phase1, %d phase2 items\n"
    (List.length p1) (List.length p2)
(* --- Approach 2: Reusable barrier (multiple rounds) --- *)

(* Demonstration: 3 threads each check in per round, then meet at the
   barrier; the same barrier value gates both rounds. *)
let () =
  let workers = 3 in
  let b = make_barrier workers in
  let round_results = Array.make 2 [] in
  let m = Mutex.create () in

  let spawn id =
    Thread.create
      (fun () ->
        for round = 0 to 1 do
          Mutex.lock m;
          round_results.(round) <- id :: round_results.(round);
          Mutex.unlock m;
          (* Barrier resets itself, so one instance serves every round. *)
          barrier_wait b
        done)
      ()
  in
  let threads = List.init workers spawn in
  List.iter Thread.join threads;
  assert (List.length round_results.(0) = 3);
  assert (List.length round_results.(1) = 3);
  Printf.printf "Approach 2 (reusable barrier): 2 rounds OK\n"

(* Reached only if every assert above succeeded. *)
let () = Printf.printf "โœ“ All tests passed\n"

๐Ÿ“Š Detailed Comparison

Barrier Synchronization โ€” Comparison

Core Insight

A barrier is a collective synchronization point โ€” like a countdown latch where N threads each decrement, and all are released when it hits zero. Used in parallel algorithms where phases must complete before the next begins.

OCaml Approach

  • No built-in barrier โ€” simulate with `Mutex` + `Condition` + generation counter
  • `generation` counter prevents spurious wakeup confusion across rounds
  • `Condition.broadcast` wakes all waiting threads simultaneously
  • Reusable: increment `generation` and reset `count` atomically

Rust Approach

  • `std::sync::Barrier::new(n)` โ€” built-in, no boilerplate
  • `barrier.wait()` blocks until `n` threads have called it
  • Returns `BarrierWaitResult` โ€” `.is_leader()` is true for exactly one thread
  • Automatically resets โ€” reusable for multiple rounds
  • Thread-safe by design; panic-safe

Comparison Table

| Concept | OCaml (simulated) | Rust |
|---------|-------------------|------|
| Create | Manual struct with mutex+condvar | `Barrier::new(n)` |
| Wait at barrier | `barrier_wait b` | `barrier.wait()` |
| Leader detection | Not built-in | `result.is_leader()` |
| Reuse after trigger | Manual generation counter | Automatic |
| Prevent spurious wake | `while b.generation = gen` | Handled internally |
| Wake mechanism | `Condition.broadcast` | Internal (implementation-defined) |
| Stdlib | No | Yes (`std::sync::Barrier`) |

std vs tokio

| Aspect | std version | tokio version |
|--------|-------------|---------------|
| Runtime | OS threads via `std::thread` | Async tasks on tokio runtime |
| Synchronization | `std::sync::Mutex`, `Condvar` | `tokio::sync::Mutex`, channels |
| Channels | `std::sync::mpsc` (unbounded) | `tokio::sync::mpsc` (bounded, async) |
| Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |