// 990: Semaphore via Mutex<usize> + Condvar
// Counting semaphore: limit N concurrent operations
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
struct Semaphore {
count: Mutex<usize>,
cond: Condvar,
max: usize,
}
impl Semaphore {
fn new(n: usize) -> Self {
Semaphore {
count: Mutex::new(n),
cond: Condvar::new(),
max: n,
}
}
fn acquire(&self) {
let mut count = self.count.lock().unwrap();
while *count == 0 {
count = self.cond.wait(count).unwrap();
}
*count -= 1;
}
fn release(&self) {
let mut count = self.count.lock().unwrap();
if *count < self.max {
*count += 1;
self.cond.notify_one();
}
}
fn with_permit<T, F: FnOnce() -> T>(&self, f: F) -> T {
self.acquire();
let result = f();
self.release();
result
}
}
// --- Approach 1: Limit concurrent workers ---
fn limited_concurrency() -> usize {
let sem = Arc::new(Semaphore::new(3));
let active = Arc::new(Mutex::new(0usize));
let max_active = Arc::new(Mutex::new(0usize));
let handles: Vec<_> = (0..10).map(|_| {
let sem = Arc::clone(&sem);
let active = Arc::clone(&active);
let max_active = Arc::clone(&max_active);
thread::spawn(move || {
sem.with_permit(|| {
{
let mut a = active.lock().unwrap();
*a += 1;
let mut m = max_active.lock().unwrap();
if *a > *m { *m = *a; }
}
thread::sleep(Duration::from_millis(5));
*active.lock().unwrap() -= 1;
});
})
}).collect();
for h in handles { h.join().unwrap(); }
let x = *max_active.lock().unwrap(); x
}
// --- Approach 2: Binary semaphore as mutex ---
fn binary_semaphore_counter() -> u32 {
let sem = Arc::new(Semaphore::new(1));
let counter = Arc::new(Mutex::new(0u32));
let handles: Vec<_> = (0..5).map(|_| {
let sem = Arc::clone(&sem);
let counter = Arc::clone(&counter);
thread::spawn(move || {
for _ in 0..100 {
sem.with_permit(|| {
*counter.lock().unwrap() += 1;
});
}
})
}).collect();
for h in handles { h.join().unwrap(); }
let x = *counter.lock().unwrap(); x
}
// --- Approach 3: Drain a resource pool ---
fn resource_pool_demo() -> Vec<usize> {
const POOL_SIZE: usize = 2;
let sem = Arc::new(Semaphore::new(POOL_SIZE));
let usage_log = Arc::new(Mutex::new(Vec::new()));
let handles: Vec<_> = (0..6).map(|i| {
let sem = Arc::clone(&sem);
let log = Arc::clone(&usage_log);
thread::spawn(move || {
sem.with_permit(|| {
log.lock().unwrap().push(i);
thread::sleep(Duration::from_millis(2));
});
})
}).collect();
for h in handles { h.join().unwrap(); }
let mut log = usage_log.lock().unwrap().clone();
log.sort();
log
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_limited_concurrency() {
let max = limited_concurrency();
assert!(max <= 3, "max concurrent was {}, expected โค 3", max);
assert!(max >= 1);
}
#[test]
fn test_binary_semaphore_correctness() {
assert_eq!(binary_semaphore_counter(), 500);
}
#[test]
fn test_resource_pool() {
let log = resource_pool_demo();
assert_eq!(log.len(), 6);
assert_eq!(log, vec![0, 1, 2, 3, 4, 5]);
}
#[test]
fn test_semaphore_acquire_release() {
let sem = Semaphore::new(2);
sem.acquire();
sem.acquire();
// Can't acquire a third โ release one
sem.release();
sem.acquire(); // should succeed
sem.release();
sem.release();
}
#[test]
fn test_semaphore_permits_count() {
let sem = Semaphore::new(3);
assert_eq!(*sem.count.lock().unwrap(), 3);
sem.acquire();
assert_eq!(*sem.count.lock().unwrap(), 2);
sem.release();
assert_eq!(*sem.count.lock().unwrap(), 3);
}
}
(* 990: Semaphore via Mutex + Condvar *)
(* Counting semaphore: allow at most N concurrent operations *)
type semaphore = {
mutable count: int;
max_count: int;
m: Mutex.t;
cond: Condition.t;
}
let make_semaphore n = {
count = n;
max_count = n;
m = Mutex.create ();
cond = Condition.create ();
}
let acquire sem =
Mutex.lock sem.m;
while sem.count = 0 do
Condition.wait sem.cond sem.m
done;
sem.count <- sem.count - 1;
Mutex.unlock sem.m
let release sem =
Mutex.lock sem.m;
if sem.count < sem.max_count then begin
sem.count <- sem.count + 1;
Condition.signal sem.cond
end;
Mutex.unlock sem.m
let with_semaphore sem f =
acquire sem;
let result = (try f () with e -> release sem; raise e) in
release sem;
result
(* --- Approach 1: Limit concurrent workers to 3 --- *)
let () =
let sem = make_semaphore 3 in
let active = ref 0 in
let max_active = ref 0 in
let m = Mutex.create () in
let threads = List.init 10 (fun i ->
Thread.create (fun () ->
with_semaphore sem (fun () ->
Mutex.lock m;
incr active;
if !active > !max_active then max_active := !active;
Mutex.unlock m;
(* simulate work *)
Unix.sleepf 0.005;
Mutex.lock m;
decr active;
Mutex.unlock m;
Printf.printf "worker %d done\n" i
)
) ()
) in
List.iter Thread.join threads;
assert (!max_active <= 3);
Printf.printf "Approach 1: max concurrent = %d (โค 3)\n" !max_active
(* --- Approach 2: Binary semaphore (mutex-like) --- *)
let () =
let sem = make_semaphore 1 in
let counter = ref 0 in
let threads = List.init 5 (fun _ ->
Thread.create (fun () ->
for _ = 1 to 100 do
with_semaphore sem (fun () -> incr counter)
done
) ()
) in
List.iter Thread.join threads;
assert (!counter = 500);
Printf.printf "Approach 2 (binary semaphore): counter = %d\n" !counter
let () = Printf.printf "โ All tests passed\n"