🦀 Functional Rust
🎬 Fearless Concurrency: Threads, `Arc<Mutex<T>>`, channels — safe parallelism enforced by the compiler.
📝 Text version (for readers / accessibility)

• std::thread::spawn creates OS threads — closures must be Send + 'static

• `Arc<Mutex<T>>` provides shared mutable state across threads safely

• Channels (mpsc) enable message passing — multiple producers, single consumer

• Send and Sync marker traits enforce thread safety at compile time

• Data races are impossible — the type system prevents them before your code runs
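The channels bullet above can be sketched with a minimal example. The `collect_ids` helper is hypothetical, not part of the lesson's code: three producer threads each send their id over one `mpsc` channel, and the single consumer collects everything once all senders have dropped.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: multiple producers, single consumer over one channel.
fn collect_ids() -> Vec<usize> {
    let (tx, rx) = mpsc::channel();
    for id in 0..3 {
        let tx = tx.clone(); // multiple producers: clone the Sender
        thread::spawn(move || {
            tx.send(id).unwrap(); // clone drops when the closure ends
        });
    }
    drop(tx); // drop the original Sender so the channel can close
    // rx.iter() yields values until every Sender has been dropped
    let mut ids: Vec<usize> = rx.iter().collect();
    ids.sort(); // arrival order is nondeterministic
    ids
}

fn main() {
    assert_eq!(collect_ids(), vec![0, 1, 2]);
}
```

Note the `drop(tx)`: without it, the original `Sender` stays alive and `rx.iter()` would block forever waiting for more messages.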

443: Arc<Mutex<T>> — Shared Mutable State Across Threads

Difficulty: 3 | Level: Intermediate

Share a single mutable value across multiple threads using `Arc` for ownership and `Mutex` for exclusive access — the compile-time enforced lock pattern.

The Problem This Solves

Multiple threads writing to shared state is the source of most concurrency bugs. A counter incremented by 10 threads without coordination will silently produce the wrong total — because increment is three operations (read, add, write) and threads interleave arbitrarily. This is a data race: undefined behavior in C/C++, a runtime check failure in Java, an occasional wrong answer in Python (which only avoids the worst because of the GIL, at the cost of true parallelism).

In languages with locks you write the correct-looking code and hope you remembered to acquire the lock, hope you don't hold it across a function that also acquires it (deadlock), and hope you release it even in error paths.

In Rust, the `Mutex<T>` wraps the data itself — you cannot touch the data without going through the lock. The type system makes this structural: the value is inside the mutex. There's no way to "accidentally forget to lock" because you can't reach the data without calling `.lock()`. The `MutexGuard` that `.lock()` returns implements `Drop` — when it goes out of scope (including on panic, via unwinding), the lock is released. You cannot forget to unlock. The borrow checker ensures the guard's lifetime bounds all access to the inner value.
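A side note on the "increment is three operations" point: for a plain counter, `std::sync::atomic` fuses read-add-write into a single indivisible hardware operation, so no lock is needed at all. A minimal sketch (the `atomic_increment` name is ours, not from the lesson):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

fn atomic_increment(num_threads: usize, per_thread: usize) -> u64 {
    let counter = Arc::new(AtomicU64::new(0));

    let handles: Vec<_> = (0..num_threads)
        .map(|_| {
            let c = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    // fetch_add is one indivisible operation: no thread can
                    // interleave between the read, the add, and the write
                    c.fetch_add(1, Ordering::Relaxed);
                }
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }
    counter.load(Ordering::Relaxed)
}

fn main() {
    assert_eq!(atomic_increment(10, 100), 1000);
}
```

Atomics only cover simple scalar updates, though; once the shared state is a struct or collection, you are back to `Mutex<T>`.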

The Intuition

`Arc` is "Atomically Reference Counted" — like `Rc` but thread-safe. Clone the `Arc` to share ownership across threads; the value is freed when the last clone drops. `Mutex` is the lock: only one thread can hold the `MutexGuard` at a time. Together, `Arc<Mutex<T>>` is Rust's canonical "shared mutable state" pattern. In Python you'd write `lock = threading.Lock(); lock.acquire(); counter += 1; lock.release()` — data and lock are separate, bugs hide in the gap. In Java, `synchronized(obj)` locks on an arbitrary object. In Rust, the data IS inside the lock — there is no gap.

How It Works in Rust

use std::sync::{Arc, Mutex};
use std::thread;

let counter = Arc::new(Mutex::new(0u64)); // data lives inside the Mutex

let handles: Vec<_> = (0..10).map(|_| {
    let c = Arc::clone(&counter); // clone the Arc — increments ref count
    thread::spawn(move || {
        for _ in 0..100 {
            // lock() blocks until we hold the lock, returns MutexGuard
            // *guard dereferences to &mut u64
            *c.lock().unwrap() += 1;
            // guard drops here — lock released automatically
        }
    })
}).collect();

for h in handles { h.join().unwrap(); }
println!("{}", *counter.lock().unwrap()); // 1000 — guaranteed correct

`.unwrap()` on `.lock()` handles "poisoned" mutexes — a mutex is poisoned if a thread panics while holding it. In most cases, propagating the panic is correct; production code may call `.unwrap_or_else(|e| e.into_inner())` to recover the data.
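The poisoning recovery mentioned above can be demonstrated in a few lines. This is a minimal sketch (the `recover_from_poison` helper is hypothetical): one thread panics while holding the lock, then the main thread recovers the guard from the `PoisonError` instead of propagating the panic.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

fn recover_from_poison() -> i32 {
    let m = Arc::new(Mutex::new(1));
    let m2 = Arc::clone(&m);

    // This thread panics while holding the guard, poisoning the mutex.
    let _ = thread::spawn(move || {
        let _guard = m2.lock().unwrap();
        panic!("boom");
    })
    .join(); // join() returns Err because the thread panicked; we ignore it

    // lock() now returns Err(PoisonError); into_inner() hands back the
    // MutexGuard anyway, and the data (still 1) remains accessible.
    let guard = m.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
    *guard
}

fn main() {
    assert_eq!(recover_from_poison(), 1);
}
```

Poisoning is a heuristic, not a safety mechanism: it signals that an invariant may have been broken mid-update, so recovery code should check the data before trusting it.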

What This Unlocks

Key Differences

| Concept | OCaml | Rust |
|---|---|---|
| Shared ownership | GC automatic | `Arc::new(...)` + `Arc::clone(&a)` |
| Lock | `Mutex.lock m` (separate from data) | `m.lock().unwrap()` — data IS in the mutex |
| Unlock | `Mutex.unlock m` — manual | `MutexGuard` drops automatically (RAII) |
| Forget to lock | possible — data and lock separate | impossible — data unreachable without `lock()` |
| Poisoning | N/A | panic while holding = mutex poisoned |
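One more variation worth knowing: when every thread finishes before the owning function returns, `std::thread::scope` (stable since Rust 1.63) lets threads borrow the mutex directly, so no `Arc` is needed at all. A minimal sketch, with a hypothetical `scoped_increment` helper:

```rust
use std::sync::Mutex;
use std::thread;

fn scoped_increment() -> u64 {
    let counter = Mutex::new(0u64); // no Arc: scoped threads may borrow

    thread::scope(|s| {
        for _ in 0..10 {
            // each closure captures &counter; &Mutex<u64> is Send
            s.spawn(|| {
                for _ in 0..100 {
                    *counter.lock().unwrap() += 1;
                }
            });
        }
    }); // scope guarantees every spawned thread is joined here

    // we own the Mutex again, so skip locking and take the value out
    counter.into_inner().unwrap()
}

fn main() {
    assert_eq!(scoped_increment(), 1000);
}
```

`Arc` earns its keep when thread lifetimes are not bounded by a scope, e.g. detached workers or values stored in long-lived structs.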
//! # Arc<Mutex<T>> — Shared Mutable State Across Threads
//!
//! Share a single mutable value across multiple threads using `Arc` for
//! ownership and `Mutex` for exclusive access.

use std::sync::{Arc, Mutex};
use std::thread;

/// Approach 1: Shared counter with multiple threads
pub fn parallel_increment(num_threads: usize, increments_per_thread: usize) -> u64 {
    let counter = Arc::new(Mutex::new(0u64));

    let handles: Vec<_> = (0..num_threads)
        .map(|_| {
            let c = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..increments_per_thread {
                    *c.lock().unwrap() += 1;
                }
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }

    *counter.lock().unwrap()
}

/// Approach 2: Shared collection (Vec)
pub fn parallel_collect<T, F>(num_threads: usize, producer: F) -> Vec<T>
where
    T: Send + std::fmt::Debug + 'static, // Debug is needed by expect() on Arc::try_unwrap below
    F: Fn(usize) -> T + Send + Sync + 'static + Clone,
{
    let results: Arc<Mutex<Vec<T>>> = Arc::new(Mutex::new(Vec::new()));

    let handles: Vec<_> = (0..num_threads)
        .map(|i| {
            let results = Arc::clone(&results);
            let producer = producer.clone();
            thread::spawn(move || {
                let value = producer(i);
                results.lock().unwrap().push(value);
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }

    Arc::try_unwrap(results)
        .expect("all threads joined")
        .into_inner()
        .unwrap()
}

/// Approach 3: try_lock for non-blocking access
pub fn try_lock_demo() -> Option<u64> {
    let data = Arc::new(Mutex::new(42u64));
    let data_clone = Arc::clone(&data);

    // Hold the lock in main thread
    let _guard = data.lock().unwrap();

    // Another thread tries to get it
    let handle = thread::spawn(move || {
        // try_lock returns Err if lock is held
        match data_clone.try_lock() {
            Ok(mut guard) => {
                *guard += 1;
                Some(*guard)
            }
            Err(_) => None, // Lock was held
        }
    });

    handle.join().unwrap()
}

/// Thread-safe accumulator struct
pub struct SharedAccumulator {
    value: Arc<Mutex<i64>>,
}

impl SharedAccumulator {
    pub fn new(initial: i64) -> Self {
        Self {
            value: Arc::new(Mutex::new(initial)),
        }
    }

    pub fn add(&self, amount: i64) {
        *self.value.lock().unwrap() += amount;
    }

    pub fn get(&self) -> i64 {
        *self.value.lock().unwrap()
    }

    pub fn clone_handle(&self) -> Arc<Mutex<i64>> {
        Arc::clone(&self.value)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parallel_increment_10_threads() {
        let result = parallel_increment(10, 100);
        assert_eq!(result, 1000);
    }

    #[test]
    fn test_parallel_increment_single_thread() {
        let result = parallel_increment(1, 500);
        assert_eq!(result, 500);
    }

    #[test]
    fn test_parallel_collect() {
        let mut results = parallel_collect(4, |i| format!("thread-{}", i));
        results.sort();
        assert_eq!(results.len(), 4);
        assert!(results.contains(&String::from("thread-0")));
        assert!(results.contains(&String::from("thread-3")));
    }

    #[test]
    fn test_try_lock_fails_when_held() {
        let result = try_lock_demo();
        assert_eq!(result, None);
    }

    #[test]
    fn test_try_lock_succeeds_when_free() {
        let m = Mutex::new(0);
        {
            let guard = m.try_lock();
            assert!(guard.is_ok());
        }
        // Lock released, try again
        assert!(m.try_lock().is_ok());
    }

    #[test]
    fn test_shared_accumulator() {
        let acc = SharedAccumulator::new(0);
        let handle = acc.clone_handle();

        thread::scope(|s| {
            s.spawn(|| {
                for _ in 0..100 {
                    *handle.lock().unwrap() += 1;
                }
            });
            s.spawn(|| {
                for _ in 0..100 {
                    acc.add(1);
                }
            });
        });

        assert_eq!(acc.get(), 200);
    }

    #[test]
    fn test_mutex_guard_drops_on_scope_exit() {
        let m = Mutex::new(vec![1, 2, 3]);
        {
            let mut guard = m.lock().unwrap();
            guard.push(4);
        } // guard drops here
        assert_eq!(m.lock().unwrap().len(), 4);
    }

    #[test]
    fn test_arc_clone_count() {
        let data = Arc::new(Mutex::new(0));
        assert_eq!(Arc::strong_count(&data), 1);

        let clone1 = Arc::clone(&data);
        assert_eq!(Arc::strong_count(&data), 2);

        drop(clone1);
        assert_eq!(Arc::strong_count(&data), 1);
    }
}
(* 443. Arc<Mutex<T>> – OCaml *)
let counter = ref 0
let mutex   = Mutex.create ()

let () =
  let threads = List.init 10 (fun _ ->
    Thread.create (fun () ->
      for _ = 1 to 100 do
        Mutex.lock mutex; incr counter; Mutex.unlock mutex
      done) ()
  ) in
  List.iter Thread.join threads;
  Printf.printf "Counter = %d (expected 1000)\n" !counter