// 991: Barrier Synchronization
// Rust: std::sync::Barrier โ wait until N threads all arrive
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Duration;
// --- Approach 1: Simple barrier โ all threads synchronize at one point ---
fn barrier_demo() -> (Vec<String>, Vec<String>) {
let n = 5;
let barrier = Arc::new(Barrier::new(n));
let phase1_log = Arc::new(std::sync::Mutex::new(Vec::new()));
let phase2_log = Arc::new(std::sync::Mutex::new(Vec::new()));
let handles: Vec<_> = (0..n).map(|i| {
let barrier = Arc::clone(&barrier);
let p1 = Arc::clone(&phase1_log);
let p2 = Arc::clone(&phase2_log);
thread::spawn(move || {
// Phase 1: independent work
thread::sleep(Duration::from_millis(i as u64 * 2));
p1.lock().unwrap().push(format!("p1:{}", i));
// BARRIER โ blocks until all N threads arrive
barrier.wait();
// Phase 2: all start together after barrier
p2.lock().unwrap().push(format!("p2:{}", i));
})
}).collect();
for h in handles { h.join().unwrap(); }
let p1 = phase1_log.lock().unwrap().clone();
let p2 = phase2_log.lock().unwrap().clone();
(p1, p2)
}
// --- Approach 2: Detect the "leader" (the last thread to arrive) ---
fn barrier_with_leader() -> Vec<bool> {
let n = 4;
let barrier = Arc::new(Barrier::new(n));
let is_leader = Arc::new(std::sync::Mutex::new(Vec::new()));
let handles: Vec<_> = (0..n).map(|_| {
let barrier = Arc::clone(&barrier);
let leaders = Arc::clone(&is_leader);
thread::spawn(move || {
let result = barrier.wait();
// BarrierWaitResult::is_leader() is true for exactly one thread
leaders.lock().unwrap().push(result.is_leader());
})
}).collect();
for h in handles { h.join().unwrap(); }
let x = is_leader.lock().unwrap().clone(); x
}
// --- Approach 3: Reusable barrier across multiple rounds ---
fn multi_round_barrier() -> Vec<usize> {
let n = 3;
let barrier = Arc::new(Barrier::new(n));
let counts = Arc::new(std::sync::Mutex::new(vec![0usize; 2]));
let handles: Vec<_> = (0..n).map(|_| {
let barrier = Arc::clone(&barrier);
let counts = Arc::clone(&counts);
thread::spawn(move || {
for round in 0..2 {
counts.lock().unwrap()[round] += 1;
barrier.wait(); // resets automatically after all arrive
}
})
}).collect();
for h in handles { h.join().unwrap(); }
let x = counts.lock().unwrap().clone(); x
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_barrier_both_phases_complete() {
let (p1, p2) = barrier_demo();
assert_eq!(p1.len(), 5);
assert_eq!(p2.len(), 5);
}
#[test]
fn test_exactly_one_leader() {
let leaders = barrier_with_leader();
assert_eq!(leaders.len(), 4);
assert_eq!(leaders.iter().filter(|&&b| b).count(), 1);
}
#[test]
fn test_reusable_barrier() {
let rounds = multi_round_barrier();
assert_eq!(rounds, vec![3, 3]); // all 3 threads counted in each round
}
#[test]
fn test_barrier_new() {
// Barrier of 1 passes immediately
let b = Barrier::new(1);
let result = b.wait();
assert!(result.is_leader());
}
#[test]
fn test_barrier_synchronizes_ordering() {
// Ensure no thread reaches phase2 before all finish phase1
let n = 4;
let barrier = Arc::new(Barrier::new(n));
let phase1_done = Arc::new(std::sync::Mutex::new(0usize));
let error = Arc::new(std::sync::Mutex::new(false));
let handles: Vec<_> = (0..n).map(|_| {
let b = Arc::clone(&barrier);
let done = Arc::clone(&phase1_done);
let err = Arc::clone(&error);
thread::spawn(move || {
*done.lock().unwrap() += 1;
b.wait();
// After barrier, all must have finished phase1
if *done.lock().unwrap() != n {
*err.lock().unwrap() = true;
}
})
}).collect();
for h in handles { h.join().unwrap(); }
assert!(!*error.lock().unwrap());
}
}
(* 991: Barrier Synchronization *)
(* Wait until N threads all reach the same point, then continue together *)
type barrier = {
mutable count: int;
total: int;
mutable generation: int;
m: Mutex.t;
cond: Condition.t;
}
let make_barrier n = {
count = 0;
total = n;
generation = 0;
m = Mutex.create ();
cond = Condition.create ();
}
let barrier_wait b =
Mutex.lock b.m;
let gen = b.generation in
b.count <- b.count + 1;
if b.count = b.total then begin
(* Last thread to arrive โ wake all waiting *)
b.count <- 0;
b.generation <- b.generation + 1;
Condition.broadcast b.cond;
Mutex.unlock b.m
end else begin
(* Wait until generation changes *)
while b.generation = gen do
Condition.wait b.cond b.m
done;
Mutex.unlock b.m
end
(* --- Approach 1: 5 threads synchronize at a barrier --- *)
let () =
let n = 5 in
let b = make_barrier n in
let results = ref [] in
let m = Mutex.create () in
let threads = List.init n (fun i ->
Thread.create (fun () ->
(* Phase 1: each thread does independent work *)
Unix.sleepf (float_of_int i *. 0.002);
Mutex.lock m; results := (Printf.sprintf "p1:%d" i) :: !results; Mutex.unlock m;
(* BARRIER: wait for all threads *)
barrier_wait b;
(* Phase 2: all threads start together *)
Mutex.lock m; results := (Printf.sprintf "p2:%d" i) :: !results; Mutex.unlock m
) ()
) in
List.iter Thread.join threads;
let p1 = List.filter (fun s -> String.length s > 2 && s.[1] = '1') !results in
let p2 = List.filter (fun s -> String.length s > 2 && s.[1] = '2') !results in
assert (List.length p1 = 5);
assert (List.length p2 = 5);
Printf.printf "Approach 1 (barrier): %d phase1, %d phase2 items\n"
(List.length p1) (List.length p2)
(* --- Approach 2: Reusable barrier (multiple rounds) --- *)
let () =
let n = 3 in
let b = make_barrier n in
let round_results = Array.make 2 [] in
let m = Mutex.create () in
let threads = List.init n (fun i ->
Thread.create (fun () ->
for round = 0 to 1 do
Mutex.lock m;
round_results.(round) <- i :: round_results.(round);
Mutex.unlock m;
barrier_wait b
done
) ()
) in
List.iter Thread.join threads;
assert (List.length round_results.(0) = 3);
assert (List.length round_results.(1) = 3);
Printf.printf "Approach 2 (reusable barrier): 2 rounds OK\n"
let () = Printf.printf "โ All tests passed\n"