🦀 Functional Rust
🎬 Fearless Concurrency — Threads, Arc<Mutex<T>>, channels — safe parallelism enforced by the compiler.
๐Ÿ“ Text version (for readers / accessibility)

• std::thread::spawn creates OS threads — closures must be Send + 'static

• Arc<Mutex<T>> provides shared mutable state across threads safely

• Channels (mpsc) enable message passing — multiple producers, single consumer

• Send and Sync marker traits enforce thread safety at compile time

• Data races are impossible — the type system prevents them before your code runs
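The bullets above fit in a few lines of code. A minimal sketch (`fan_out_sum` is an illustrative name, not part of the lesson's code): each spawned closure is `Send + 'static`, every producer gets its own cloned `Sender`, and a single consumer drains the channel.

```rust
use std::sync::mpsc;
use std::thread;

// Fan work out to several threads and collect results over a channel.
fn fan_out_sum(n: i32) -> i32 {
    let (tx, rx) = mpsc::channel();

    let handles: Vec<_> = (0..n)
        .map(|i| {
            let tx = tx.clone(); // multiple producers: clone the sender
            thread::spawn(move || {
                tx.send(i * 10).unwrap(); // closure is Send + 'static
            })
        })
        .collect();
    drop(tx); // drop the original sender so rx.iter() terminates

    for h in handles {
        h.join().unwrap();
    }

    rx.iter().sum() // single consumer drains the channel
}

fn main() {
    println!("total = {}", fan_out_sum(4)); // prints "total = 60"
}
```

If any closure tried to capture a non-`Send` value, or if two threads shared data without `Arc`/`Mutex`, this would be a compile error rather than a runtime race.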

337: Async Mutex

Difficulty: 3 Level: Advanced Lock shared state safely across async tasks — holding a `std::sync::Mutex` guard across an `.await` risks deadlock.

The Problem This Solves

In synchronous code, `std::sync::Mutex` is the right tool for protecting shared mutable state across threads. But in async code, holding a `std::sync::Mutex` guard across an `.await` point causes a problem: the executor may suspend your task at the `.await` and run a different task on the same thread while you still hold the lock. If that other task tries to lock the same mutex, it blocks the thread, and nothing can ever release the lock — deadlock. The guard is also not `Send`, so a future that holds one across an `.await` is rejected by `tokio::spawn` at compile time. The rule is: never hold a `std::sync::Mutex` guard across an `.await`. If you need to hold a lock across an async operation, you need an async-aware mutex (like `tokio::sync::Mutex`) whose `lock().await` suspends the task rather than blocking the thread. This example demonstrates the correct patterns: release the lock before awaiting, use `std::sync::Mutex` safely with short critical sections, and recover from mutex poisoning.

The Intuition

Compare to JavaScript: there's no mutex in JS because it's single-threaded. But if Rust's async executor is also single-threaded (tokio's `current_thread` runtime), holding a sync lock across an await is a logic error — your task suspends while holding the lock, and when the next task on that one thread tries to acquire it, the thread blocks forever. Nothing can release it. Python's `asyncio.Lock` is the async equivalent: `async with lock: await something()` — it yields to the event loop while waiting to acquire, rather than blocking the thread.

How It Works in Rust

use std::sync::{Arc, Mutex};

// CORRECT: release the lock before awaiting
fn correct_pattern() {
    let shared = Arc::new(Mutex::new(vec![1i32, 2, 3]));

    // The braces create a scope — the guard drops when the scope exits
    let sum = { shared.lock().unwrap().iter().sum::<i32>() };
    //                                                      ^-- guard drops here, BEFORE any .await
    println!("Sum: {sum}");

    // WRONG (commented out):
    // let guard = shared.lock().unwrap();
    // some_async_fn().await;   // guard still held — DEADLOCK
}

// Poison recovery: if a thread panics while holding the lock, it "poisons" it
let m = Mutex::new(5);
match m.lock() {
    Ok(v)  => println!("Ok: {v}"),
    Err(p) => println!("Recovered: {}", p.into_inner()), // access the data anyway
}
For cross-await locking in real async code, use `tokio::sync::Mutex`:
let mutex = Arc::new(tokio::sync::Mutex::new(0));
let mut guard = mutex.lock().await;  // suspends task, not thread
*guard += 1;
// guard can safely be held across other .await points here

What This Unlocks

Key Differences

| Concept | OCaml | Rust |
| --- | --- | --- |
| Sync mutex | `Mutex.create ()` / `Mutex.lock` (blocks thread) | `std::sync::Mutex` (blocks thread) |
| Async mutex | `Lwt_mutex.create ()` / `Lwt_mutex.lock m >>= ...` | `tokio::sync::Mutex::lock().await` |
| Poison | Doesn't exist — an exception doesn't poison the mutex | `PoisonError` if a thread panics while locked |
| Guard scope | `Mutex.unlock m` (explicit) | RAII: guard drops at end of scope |
//! # Async Mutex
//!
//! Lock shared state safely across async tasks — demonstrates correct patterns
//! for using `std::sync::Mutex` and avoiding deadlocks across await points.

use std::sync::{Arc, Mutex};
use std::thread;

/// Demonstrates concurrent increments with a mutex.
pub fn concurrent_increment(num_threads: usize) -> i32 {
    let counter = Arc::new(Mutex::new(0));

    let handles: Vec<_> = (0..num_threads)
        .map(|_| {
            let c = Arc::clone(&counter);
            thread::spawn(move || {
                *c.lock().unwrap() += 1;
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }

    *counter.lock().unwrap()
}

/// Demonstrates the correct pattern: release lock before doing other work.
pub fn correct_lock_pattern(data: Vec<i32>) -> i32 {
    let shared = Arc::new(Mutex::new(data));

    // CORRECT: compute value inside a scope, guard drops at scope end
    let sum = {
        let guard = shared.lock().unwrap();
        guard.iter().sum::<i32>()
    }; // guard drops here, lock released BEFORE any other work

    sum
}

/// Demonstrates safe read-modify-write pattern.
pub fn safe_update<F>(mutex: &Mutex<i32>, f: F) -> i32
where
    F: FnOnce(i32) -> i32,
{
    let mut guard = mutex.lock().unwrap();
    *guard = f(*guard);
    *guard
}

/// Demonstrates poison recovery after a panic.
pub fn with_poison_recovery(mutex: &Mutex<i32>) -> Result<i32, i32> {
    match mutex.lock() {
        Ok(guard) => Ok(*guard),
        Err(poisoned) => {
            // Recover by accessing the data anyway
            let recovered = poisoned.into_inner();
            Err(*recovered)
        }
    }
}

/// A thread-safe counter using Mutex.
pub struct Counter {
    value: Mutex<i32>,
}

impl Counter {
    pub fn new(initial: i32) -> Self {
        Self {
            value: Mutex::new(initial),
        }
    }

    pub fn increment(&self) -> i32 {
        let mut guard = self.value.lock().unwrap();
        *guard += 1;
        *guard
    }

    pub fn get(&self) -> i32 {
        *self.value.lock().unwrap()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_concurrent_increment() {
        assert_eq!(concurrent_increment(10), 10);
    }

    #[test]
    fn test_high_contention() {
        let counter = Arc::new(Mutex::new(0));
        let handles: Vec<_> = (0..100)
            .map(|_| {
                let c = Arc::clone(&counter);
                thread::spawn(move || {
                    *c.lock().unwrap() += 1;
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        assert_eq!(*counter.lock().unwrap(), 100);
    }

    #[test]
    fn test_correct_lock_pattern() {
        let sum = correct_lock_pattern(vec![1, 2, 3, 4, 5]);
        assert_eq!(sum, 15);
    }

    #[test]
    fn test_safe_update() {
        let m = Mutex::new(10);
        let result = safe_update(&m, |x| x * 2);
        assert_eq!(result, 20);
    }

    #[test]
    fn test_counter() {
        let counter = Counter::new(0);
        assert_eq!(counter.increment(), 1);
        assert_eq!(counter.increment(), 2);
        assert_eq!(counter.get(), 2);
    }
}
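The `Counter` from the listing above is usable from many threads via `Arc`, since its `&self` methods lock internally. A usage sketch (the type is repeated here so the snippet compiles on its own; `shared_counter_demo` is an illustrative name):

```rust
use std::sync::{Arc, Mutex};
use std::thread;

pub struct Counter {
    value: Mutex<i32>,
}

impl Counter {
    pub fn new(initial: i32) -> Self {
        Self { value: Mutex::new(initial) }
    }
    pub fn increment(&self) -> i32 {
        let mut guard = self.value.lock().unwrap();
        *guard += 1;
        *guard
    }
    pub fn get(&self) -> i32 {
        *self.value.lock().unwrap()
    }
}

fn shared_counter_demo(threads: usize, per_thread: usize) -> i32 {
    let counter = Arc::new(Counter::new(0));

    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let c = Arc::clone(&counter); // each thread gets its own handle
            thread::spawn(move || {
                for _ in 0..per_thread {
                    c.increment(); // lock is held only inside increment()
                }
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }
    counter.get()
}

fn main() {
    println!("count = {}", shared_counter_demo(8, 1000)); // prints "count = 8000"
}
```

Note that `increment` takes `&self`, not `&mut self`: the `Mutex` provides interior mutability, which is exactly what lets the shared `Arc<Counter>` be mutated from every thread.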
(* OCaml: Mutex usage *)

let counter = ref 0
let mutex = Mutex.create ()

let increment () =
  Mutex.lock mutex;
  let v = !counter in
  counter := v+1;
  Mutex.unlock mutex

let () =
  let ts = List.init 10 (fun _ -> Thread.create (fun () -> increment ()) ()) in
  List.iter Thread.join ts;
  Printf.printf "Counter: %d\n" !counter