🦀 Functional Rust

990: Semaphore

Difficulty: Intermediate. Category: Async / Concurrency FP Patterns. Concept: a counting semaphore that limits concurrent access to a resource. Key insight: `Mutex<usize>` + `Condvar` = semaphore; `acquire` decrements and waits when zero, `release` increments and wakes a waiter — the classic Dijkstra P/V pattern in Rust.

Versions

| Directory | Description |
|---|---|
| `std/` | Standard library version using `std::sync`, `std::thread` |
| `tokio/` | Tokio async runtime version using `tokio::sync`, `tokio::spawn` |

Running

# Standard library version
cd std && cargo test

# Tokio version
cd tokio && cargo test
// 990: Semaphore via Mutex<usize> + Condvar
// Counting semaphore: limit N concurrent operations

use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Counting semaphore built from `Mutex<usize>` + `Condvar` — the classic
/// Dijkstra P/V construction. Permits start at `n` and `release` is capped
/// at `max`, so surplus releases can never inflate the permit count.
struct Semaphore {
    count: Mutex<usize>, // currently available permits
    cond: Condvar,       // threads blocked in `acquire` wait here
    max: usize,          // permit ceiling (the initial count)
}

impl Semaphore {
    /// Create a semaphore with `n` permits available.
    fn new(n: usize) -> Self {
        Semaphore {
            count: Mutex::new(n),
            cond: Condvar::new(),
            max: n,
        }
    }

    /// Block until a permit is available, then take it (Dijkstra's P).
    fn acquire(&self) {
        let mut count = self.count.lock().unwrap();
        // A `while` (not `if`) is mandatory: `Condvar::wait` may wake
        // spuriously, and another waiter may grab the permit first.
        while *count == 0 {
            count = self.cond.wait(count).unwrap();
        }
        *count -= 1;
    }

    /// Return a permit and wake one waiter (Dijkstra's V).
    /// Releases beyond `max` are silently ignored so the count stays bounded.
    fn release(&self) {
        let mut count = self.count.lock().unwrap();
        if *count < self.max {
            *count += 1;
            self.cond.notify_one();
        }
    }

    /// Run `f` while holding a permit, returning its result.
    ///
    /// The permit is owned by an RAII guard, so it is returned even when `f`
    /// panics. (The previous version called `release` only on the normal
    /// path, leaking a permit on unwind and eventually deadlocking every
    /// later `acquire`.) This mirrors the exception-safe bracket pattern.
    fn with_permit<T, F: FnOnce() -> T>(&self, f: F) -> T {
        // Guard that gives the permit back on drop — including during unwind.
        struct Permit<'a>(&'a Semaphore);
        impl Drop for Permit<'_> {
            fn drop(&mut self) {
                self.0.release();
            }
        }
        self.acquire();
        let _permit = Permit(self);
        f()
    }
}

// --- Approach 1: Limit concurrent workers ---
/// Spawns 10 workers gated by a 3-permit semaphore and reports the highest
/// number of workers that were ever inside the critical section at once
/// (should never exceed 3).
fn limited_concurrency() -> usize {
    let semaphore = Arc::new(Semaphore::new(3));
    let in_flight = Arc::new(Mutex::new(0usize));
    let peak = Arc::new(Mutex::new(0usize));

    let workers: Vec<_> = (0..10)
        .map(|_| {
            let semaphore = Arc::clone(&semaphore);
            let in_flight = Arc::clone(&in_flight);
            let peak = Arc::clone(&peak);
            thread::spawn(move || {
                semaphore.with_permit(|| {
                    {
                        // Track concurrency level under a single lock scope.
                        let mut current = in_flight.lock().unwrap();
                        *current += 1;
                        let mut high = peak.lock().unwrap();
                        *high = (*high).max(*current);
                    }
                    thread::sleep(Duration::from_millis(5));
                    *in_flight.lock().unwrap() -= 1;
                });
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }
    *peak.lock().unwrap()
}

// --- Approach 2: Binary semaphore as mutex ---
/// Uses a 1-permit semaphore as a mutex: 5 threads each perform 100
/// serialized increments on a shared counter, so the result is always 500.
fn binary_semaphore_counter() -> u32 {
    let gate = Arc::new(Semaphore::new(1));
    let total = Arc::new(Mutex::new(0u32));

    let threads: Vec<_> = (0..5)
        .map(|_| {
            let gate = Arc::clone(&gate);
            let total = Arc::clone(&total);
            thread::spawn(move || {
                for _ in 0..100 {
                    gate.with_permit(|| *total.lock().unwrap() += 1);
                }
            })
        })
        .collect();

    for t in threads {
        t.join().unwrap();
    }
    *total.lock().unwrap()
}

// --- Approach 3: Drain a resource pool ---
/// Simulates a pool of 2 resources shared by 6 tasks: each task records its
/// id while holding a permit. Returns the log sorted by task id, so all six
/// ids appear exactly once.
fn resource_pool_demo() -> Vec<usize> {
    const POOL_SIZE: usize = 2;
    let pool = Arc::new(Semaphore::new(POOL_SIZE));
    let log = Arc::new(Mutex::new(Vec::new()));

    let tasks: Vec<_> = (0..6)
        .map(|task_id| {
            let pool = Arc::clone(&pool);
            let log = Arc::clone(&log);
            thread::spawn(move || {
                pool.with_permit(|| {
                    log.lock().unwrap().push(task_id);
                    thread::sleep(Duration::from_millis(2));
                });
            })
        })
        .collect();

    for task in tasks {
        task.join().unwrap();
    }
    let mut entries = log.lock().unwrap().clone();
    entries.sort();
    entries
}


#[cfg(test)]
mod tests {
    use super::*;

    /// No more than 3 workers may ever run concurrently.
    #[test]
    fn test_limited_concurrency() {
        let peak = limited_concurrency();
        assert!(peak <= 3, "max concurrent was {}, expected ≤ 3", peak);
        assert!(peak >= 1);
    }

    /// A 1-permit semaphore serializes all 5 × 100 increments.
    #[test]
    fn test_binary_semaphore_correctness() {
        assert_eq!(binary_semaphore_counter(), 500);
    }

    /// Every one of the 6 tasks eventually gets a pool resource.
    #[test]
    fn test_resource_pool() {
        let entries = resource_pool_demo();
        assert_eq!(entries.len(), 6);
        assert_eq!(entries, vec![0, 1, 2, 3, 4, 5]);
    }

    /// With 2 permits, a third acquire only succeeds after a release.
    #[test]
    fn test_semaphore_acquire_release() {
        let sem = Semaphore::new(2);
        sem.acquire();
        sem.acquire();
        // A third acquire would block — hand one permit back first.
        sem.release();
        sem.acquire(); // should succeed
        sem.release();
        sem.release();
    }

    /// The internal permit count mirrors acquire/release exactly.
    #[test]
    fn test_semaphore_permits_count() {
        let sem = Semaphore::new(3);
        assert_eq!(*sem.count.lock().unwrap(), 3);
        sem.acquire();
        assert_eq!(*sem.count.lock().unwrap(), 2);
        sem.release();
        assert_eq!(*sem.count.lock().unwrap(), 3);
    }
}
(* 990: Semaphore via Mutex + Condvar *)
(* Counting semaphore: allow at most N concurrent operations *)

(* Counting-semaphore state: an integer permit count protected by mutex [m],
   with [cond] for threads waiting in [acquire]. [max_count] caps the count
   so surplus [release]s cannot inflate it past the initial value. *)
type semaphore = {
  mutable count: int;    (* currently available permits *)
  max_count: int;        (* permit ceiling (the initial count) *)
  m: Mutex.t;            (* guards [count] *)
  cond: Condition.t;     (* signaled when a permit is returned *)
}

(* Create a semaphore holding [n] permits, capped at [n]. *)
let make_semaphore n =
  {
    count = n;
    max_count = n;
    m = Mutex.create ();
    cond = Condition.create ();
  }

(* Dijkstra's P: block until a permit is free, then take it.
   The count is re-checked after every wakeup because [Condition.wait]
   can wake spuriously or lose the race to another waiter. *)
let acquire sem =
  Mutex.lock sem.m;
  let rec wait_for_permit () =
    if sem.count = 0 then begin
      Condition.wait sem.cond sem.m;
      wait_for_permit ()
    end
  in
  wait_for_permit ();
  sem.count <- sem.count - 1;
  Mutex.unlock sem.m

(* Dijkstra's V: hand a permit back and wake one waiter.
   Releases past [max_count] are ignored so the count stays bounded. *)
let release sem =
  Mutex.lock sem.m;
  let has_room = sem.count < sem.max_count in
  if has_room then begin
    sem.count <- sem.count + 1;
    Condition.signal sem.cond
  end;
  Mutex.unlock sem.m

(* Bracket combinator: run [f] while holding a permit, releasing it
   whether [f] returns normally or raises. *)
let with_semaphore sem f =
  acquire sem;
  match f () with
  | value -> release sem; value
  | exception e -> release sem; raise e

(* --- Approach 1: Limit concurrent workers to 3 --- *)

let () =
  let sem = make_semaphore 3 in
  let running = ref 0 in
  let peak = ref 0 in
  let lock = Mutex.create () in

  (* Ten workers, but at most three ever hold a permit at the same time. *)
  let workers =
    List.init 10 (fun i ->
      Thread.create
        (fun () ->
          with_semaphore sem (fun () ->
            Mutex.lock lock;
            incr running;
            if !running > !peak then peak := !running;
            Mutex.unlock lock;

            (* simulate work *)
            Unix.sleepf 0.005;

            Mutex.lock lock;
            decr running;
            Mutex.unlock lock;

            Printf.printf "worker %d done\n" i))
        ())
  in
  List.iter Thread.join workers;
  assert (!peak <= 3);
  Printf.printf "Approach 1: max concurrent = %d (≤ 3)\n" !peak

(* --- Approach 2: Binary semaphore (mutex-like) --- *)

let () =
  (* A 1-permit semaphore acts as a mutex: 5 × 100 serialized increments. *)
  let sem = make_semaphore 1 in
  let total = ref 0 in
  let workers =
    List.init 5 (fun _ ->
      Thread.create
        (fun () ->
          for _ = 1 to 100 do
            with_semaphore sem (fun () -> incr total)
          done)
        ())
  in
  List.iter Thread.join workers;
  assert (!total = 500);
  Printf.printf "Approach 2 (binary semaphore): counter = %d\n" !total

(* Emits the same bytes as the original [Printf.printf "...\n"]. *)
let () = print_endline "✓ All tests passed"

📊 Detailed Comparison

Semaphore โ€” Comparison

Core Insight

A semaphore is a generalized mutex: where a mutex allows 1 concurrent holder, a semaphore allows N. Both the OCaml and Rust versions implement it the same way — an integer protected by a mutex, with threads waiting on a condition variable whenever the count hits zero.

OCaml Approach

  • `count: int` protected by `Mutex.t` + `Condition.t`
  • `acquire`: lock, wait while `count = 0`, decrement, unlock
  • `release`: lock, increment, signal, unlock
  • `with_semaphore` bracket pattern for exception safety
  • No built-in semaphore in OCaml's stdlib โ€” always custom

Rust Approach

  • `Mutex<usize>` for count, `Condvar` for waiting
  • `acquire`: lock guard, `while *count == 0 { count = cond.wait(count) }`, decrement
  • `release`: lock, increment, `notify_one()`
  • `with_permit(f)` RAII-style wrapper
  • External crates (tokio, parking_lot) provide optimized async semaphores

Comparison Table

| Concept | OCaml | Rust |
|---|---|---|
| Count storage | `mutable count: int` | `Mutex<usize>` |
| Wait mechanism | `Condition.wait cond m` | `cond.wait(guard).unwrap()` |
| Signal waiter | `Condition.signal cond` | `cond.notify_one()` |
| Bracket acquire | `with_semaphore sem f` | `sem.with_permit(f)` |
| Binary mode | `make_semaphore 1` | `Semaphore::new(1)` |
| Built into stdlib | No | No (use parking_lot or tokio) |
| Overflow guard | `if count < max_count` | `if *count < self.max` |

std vs tokio

| Aspect | std version | tokio version |
|---|---|---|
| Runtime | OS threads via `std::thread` | Async tasks on the tokio runtime |
| Synchronization | `std::sync::Mutex`, `Condvar` | `tokio::sync::Mutex`, channels |
| Channels | `std::sync::mpsc` (unbounded) | `tokio::sync::mpsc` (bounded, async) |
| Blocking | Thread blocks on lock/recv | Task yields; runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |