🦀 Functional Rust

458: Barrier — Rendezvous All Threads Before Proceeding

Difficulty: 3 · Level: Intermediate

Synchronise N threads at a checkpoint — all must arrive before any can proceed, with one elected leader per phase for coordination work.

The Problem This Solves

Parallel algorithms often have phases: initialise in parallel, then all threads must finish before computing in parallel, then all must finish again before aggregating. If thread 3 starts phase 2 before thread 1 finishes phase 1, it operates on incomplete data — a race condition with no obvious error message, just silently wrong results.

The naive fix is for the main thread to `join` all workers, then re-spawn them for the next phase. This works but is expensive: thread-creation cost multiplied by the number of phases.

A `Barrier` solves this cleanly: threads reach the barrier and block until the last one arrives, then all proceed simultaneously. No re-spawning, no separate coordination thread.

The leader mechanism addresses a common need: one thread should print phase headers, accumulate partial results, or reset state between phases. Electing a leader via a mutex or channel is boilerplate. `Barrier::wait()` returns a `BarrierWaitResult` where exactly one thread gets `is_leader() == true` per phase — built-in, no extra code.
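The join-and-re-spawn workaround can be sketched as follows (a minimal toy: the per-phase "work" is just a multiplication, and the `totals` accumulator is illustrative):

```rust
use std::thread;

fn main() {
    let n = 4;
    let mut totals = vec![0u64; n];
    for phase in 1..=3u64 {
        // Spawn a fresh set of workers for every phase...
        let handles: Vec<_> = (0..n)
            .map(|id| thread::spawn(move || (id as u64) * phase))
            .collect();
        // ...and join them all: an implicit one-shot "barrier" between phases.
        for (id, h) in handles.into_iter().enumerate() {
            totals[id] += h.join().unwrap();
        }
    }
    // 3 phases x 4 workers = 12 thread creations, instead of 4 with a Barrier.
    assert_eq!(totals, vec![0, 6, 12, 18]);
    println!("totals = {:?}", totals);
}
```

Each phase pays the spawn cost again; with a `Barrier`, the four threads would be created once and rendezvous three times.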

The Intuition

A `Barrier::new(n)` is a gate that only opens when `n` threads are waiting at it. The last thread to arrive swings the gate open and all threads proceed. Then the barrier resets for the next use. It's the parallel equivalent of waiting for everyone to sit down before starting dinner — you can use the same dinner table (barrier) for multiple meals (phases). In Java: `CyclicBarrier`. In Python: `threading.Barrier`. In Go: no built-in — you'd build one from a `WaitGroup` per phase (not reusable like `Barrier`). Rust's `std::sync::Barrier` is reusable across phases automatically.
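A minimal sketch of that reuse, assuming just two threads and two phases: the same `Barrier` instance serves both rendezvous points, with no reset call in between.

```rust
use std::sync::{Arc, Barrier};
use std::thread;

fn main() {
    // One gate, used twice: the Barrier resets itself after each cycle.
    let barrier = Arc::new(Barrier::new(2));
    let b = Arc::clone(&barrier);
    let t = thread::spawn(move || {
        b.wait(); // phase 1 rendezvous
        b.wait(); // phase 2 rendezvous, same instance
    });
    barrier.wait();
    barrier.wait();
    t.join().unwrap();
    println!("two phases synchronised with a single Barrier");
}
```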

How It Works in Rust

use std::sync::{Arc, Barrier};
use std::thread;

// Placeholder for the per-thread, per-phase workload
fn do_phase_work(id: usize, phase: u32) {
    println!("thread {} working on phase {}", id, phase);
}

fn main() {
    let n = 4;
    let barrier = Arc::new(Barrier::new(n)); // all n threads must wait

    let handles: Vec<_> = (0..n).map(|id| {
        let b = Arc::clone(&barrier);
        thread::spawn(move || {
            for phase in 1..=3 {
                // Do phase work (different amounts per thread — fast or slow)
                do_phase_work(id, phase);

                // Block here until all n threads arrive
                let result = b.wait();

                // Exactly one thread per phase gets is_leader() == true
                if result.is_leader() {
                    println!("=== phase {} complete — all threads ready ===", phase);
                    // Safe to aggregate results, log, reset shared state, etc.
                }
                // All threads continue here simultaneously
            }
        })
    }).collect();

    for h in handles { h.join().unwrap(); }
}
The barrier automatically resets after each `wait()` cycle — no `reset()` call needed. This makes it directly reusable for multi-phase algorithms without rebuilding it.
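One sketch of the "aggregate in the leader" pattern, assuming partial results are deposited into a shared `Mutex<Vec<_>>` before the barrier (the `partials` name and the squared values are illustrative): every deposit happens before `wait()`, so the leader is guaranteed to see a complete set.

```rust
use std::sync::{Arc, Barrier, Mutex};
use std::thread;

fn main() {
    let n = 4;
    let barrier = Arc::new(Barrier::new(n));
    let partials = Arc::new(Mutex::new(Vec::new()));
    let handles: Vec<_> = (0..n).map(|id| {
        let b = Arc::clone(&barrier);
        let p = Arc::clone(&partials);
        thread::spawn(move || {
            // Each thread deposits its partial result *before* the barrier...
            p.lock().unwrap().push(id * id);
            // ...so when the leader wakes, all n partials are present.
            if b.wait().is_leader() {
                let total: usize = p.lock().unwrap().iter().sum();
                assert_eq!(total, 0 + 1 + 4 + 9);
                println!("leader aggregated total = {}", total);
            }
        })
    }).collect();
    for h in handles { h.join().unwrap(); }
}
```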

Key Differences

| Concept | OCaml | Rust |
| --- | --- | --- |
| Barrier | manual `Mutex` + `Condition` + counter | `Barrier::new(n)` |
| Wait | block until count == n, then reset | `barrier.wait()` → `BarrierWaitResult` |
| Leader election | manual (first to unlock, or external) | `result.is_leader()` — exactly one per phase, built-in |
| Reuse across phases | new barrier or manual reset | automatic — same `Barrier` reused across all phases |
| Java equivalent | `CyclicBarrier` | `std::sync::Barrier` |
// 458. Barrier for thread synchronization
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Duration;

fn main() {
    let n = 4;
    let b = Arc::new(Barrier::new(n));
    let hs: Vec<_> = (0..n).map(|id| {
        let b = Arc::clone(&b);
        thread::spawn(move || {
            for ph in 1..=3 {
                // Uneven workloads: thread 0 finishes first, thread 3 last
                thread::sleep(Duration::from_millis(5 * (id as u64 + 1)));
                let r = b.wait();
                if r.is_leader() {
                    println!("=== phase {} ===", ph);
                }
            }
            println!("thread {} done", id);
        })
    }).collect();
    for h in hs { h.join().unwrap(); }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    #[test]
    fn test_all_arrive() {
        let n = 4;
        let b = Arc::new(Barrier::new(n));
        let c = Arc::new(AtomicUsize::new(0));
        thread::scope(|s| {
            for _ in 0..n {
                let b = Arc::clone(&b);
                let c = Arc::clone(&c);
                s.spawn(move || {
                    c.fetch_add(1, Ordering::SeqCst);
                    b.wait();
                    // All n increments happen before any thread passes the barrier
                    assert_eq!(c.load(Ordering::SeqCst), n);
                });
            }
        });
    }

    #[test]
    fn test_one_leader() {
        let n = 5;
        let b = Arc::new(Barrier::new(n));
        let leaders = Arc::new(AtomicUsize::new(0));
        thread::scope(|s| {
            for _ in 0..n {
                let b = Arc::clone(&b);
                let l = Arc::clone(&leaders);
                s.spawn(move || {
                    if b.wait().is_leader() {
                        l.fetch_add(1, Ordering::SeqCst);
                    }
                });
            }
        });
        assert_eq!(leaders.load(Ordering::SeqCst), 1);
    }
}
(* 458. Barrier – OCaml manual *)
let make_barrier n =
  let cnt = ref 0 in
  let m = Mutex.create () in
  let c = Condition.create () in
  let gen = ref 0 in
  (fun () ->
    Mutex.lock m;
    incr cnt;
    let g = !gen in
    if !cnt = n then (cnt := 0; incr gen; Condition.broadcast c)
    else (while !gen = g do Condition.wait c m done);
    Mutex.unlock m)

let () =
  let n = 4 in
  let b = make_barrier n in
  let ts = Array.init n (fun id ->
    Thread.create (fun () ->
      for ph = 1 to 3 do
        Thread.delay (0.005 *. float_of_int (id + 1));
        b ();
        (* no built-in leader election: thread 0 reports by convention *)
        if id = 0 then Printf.printf "phase %d done\n%!" ph
      done) ())
  in
  Array.iter Thread.join ts