🦀 Functional Rust
🎬 Fearless Concurrency: threads, `Arc<Mutex<T>>`, channels — safe parallelism enforced by the compiler.
📝 Text version (for readers / accessibility)

• std::thread::spawn creates OS threads — closures must be Send + 'static

• `Arc<Mutex<T>>` provides shared mutable state across threads safely

• Channels (mpsc) enable message passing — multiple producers, single consumer

• Send and Sync marker traits enforce thread safety at compile time

• Data races are impossible — the type system prevents them before your code runs
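The bullet points above can be sketched in a few lines — a minimal example (names are illustrative) of a `Send + 'static` closure sharing a counter through `Arc<Mutex<T>>`:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // Arc gives shared ownership across threads; Mutex gives exclusive access
    let counter = Arc::new(Mutex::new(0u32));

    let handles: Vec<_> = (0..4).map(|_| {
        let counter = Arc::clone(&counter); // each thread owns its own Arc handle
        thread::spawn(move || {
            // the closure is Send + 'static: it owns everything it captures
            for _ in 0..1000 {
                *counter.lock().unwrap() += 1;
            }
        })
    }).collect();

    for h in handles { h.join().unwrap(); }
    println!("total: {}", *counter.lock().unwrap()); // total: 4000
}
```

Forgetting the `Arc::clone` (or the `move`) is a compile error, not a runtime race — this is the "enforced by the compiler" part.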

445: MPSC Channels — Message Passing Between Threads

Difficulty: 3 — Level: Intermediate

Send values across threads with `std::sync::mpsc` — multiple producers, one consumer, with automatic shutdown when all senders drop.

The Problem This Solves

Shared mutable state (`Arc<Mutex<T>>`) is one concurrency model, but it requires every thread to coordinate on access. It scales poorly when threads have different roles: producers that generate work and a consumer that processes it. Shared state forces both sides to synchronise on every operation, creating contention.

The alternative is message passing: producers don't share data with the consumer — they send owned values through a channel. No locks, no shared memory, no coordination beyond the channel itself. The consumer processes messages one at a time in a clean sequential loop. This is the model Go popularised with goroutines and channels, and it maps directly to actor systems (Erlang, Akka).

The critical operational question is: when does the consumer stop? With shared state you need a sentinel value or an external flag. With `mpsc`, the answer is elegant: when all `Sender` clones are dropped, the channel closes and `recv()` returns `Err`. The consumer loop exits naturally. No sentinel, no flag, no race on "was the last message sent?".
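The close-on-drop semantics can be seen in a minimal sketch: messages already buffered are still delivered after the last `Sender` drops, and only then does `recv()` return `Err`:

```rust
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel::<i32>();
    tx.send(1).unwrap();
    drop(tx); // last Sender gone: the channel is now closed

    assert_eq!(rx.recv().unwrap(), 1); // buffered message still delivered
    assert!(rx.recv().is_err());       // then RecvError: no senders remain
    println!("channel closed cleanly");
}
```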

The Intuition

An `mpsc` channel is a thread-safe queue. The `Sender` end can be cloned and given to as many threads as you like — they all push values in. The `Receiver` end is unique — only one consumer. Values arrive in FIFO order (though producers interleave non-deterministically). `recv()` blocks; `try_recv()` and `try_iter()` don't. In Python: `queue.Queue()` with `put`/`get`. In Go: `ch := make(chan T)` with `ch <- v` and `<-ch`. The Rust version gives you type safety (the channel carries a specific `T`) and automatic close signaling via drop.
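As a small illustration of the blocking/non-blocking distinction (a sketch using the standard `TryRecvError` variants):

```rust
use std::sync::mpsc::{self, TryRecvError};

fn main() {
    let (tx, rx) = mpsc::channel::<&str>();

    // try_recv never blocks: an empty but still-open channel yields Empty
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));

    tx.send("hello").unwrap();
    // recv blocks until a value is available; here one is already queued
    assert_eq!(rx.recv().unwrap(), "hello");

    drop(tx);
    // after all senders drop, try_recv reports Disconnected instead of Empty
    assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
    println!("ok");
}
```

The `Empty` vs `Disconnected` split is what lets a polling consumer tell "nothing yet" apart from "nothing ever again".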

How It Works in Rust

use std::sync::mpsc;
use std::thread;

let (tx, rx) = mpsc::channel::<String>();

// Multiple producers — clone the Sender for each thread
let handles: Vec<_> = (0..3).map(|id| {
    let tx = tx.clone(); // clone increments the sender count
    thread::spawn(move || {
        for i in 0..5 {
            tx.send(format!("p{}-msg{}", id, i)).unwrap();
        }
        // tx drops here — sender count decremented
    })
}).collect();

// Drop the original tx — channel closes when ALL clones drop
drop(tx);

// for-loop on Receiver: iterates until channel closes
for msg in rx {
    println!("got: {}", msg);
}
// Loop exits when last Sender drops — no sentinel needed

for h in handles { h.join().unwrap(); }

// Non-blocking drain — collect all buffered messages
let (tx2, rx2) = mpsc::channel::<u32>();
for i in 0..5 { tx2.send(i).unwrap(); }
drop(tx2);
let all: Vec<u32> = rx2.try_iter().collect(); // non-blocking

The crucial line is `drop(tx)` after cloning. If you forget it, the channel never closes — `for msg in rx` blocks forever, waiting on the original sender that will never send.
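When you cannot guarantee that every `Sender` clone gets dropped, one defensive option is `recv_timeout` from `std::sync::mpsc` — a sketch that gives up after a deadline instead of blocking forever:

```rust
use std::sync::mpsc;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel::<i32>();

    // The sender is alive but silent: recv() would block here indefinitely.
    // recv_timeout returns an error once the deadline passes.
    match rx.recv_timeout(Duration::from_millis(50)) {
        Ok(v) => println!("got {}", v),
        Err(e) => println!("timed out: {:?}", e), // RecvTimeoutError::Timeout
    }

    drop(tx);
}
```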

What This Unlocks

Key Differences

Concept | OCaml | Rust
Channel creation | `Queue.create ()` + manual Mutex + Condvar | `let (tx, rx) = mpsc::channel()`
Send | push + signal | `tx.send(v).unwrap()`
Receive (blocking) | `Condition.wait` | `rx.recv().unwrap()`
Receive (non-blocking) | manual `try` | `rx.try_recv()` or `rx.try_iter()`
Shutdown signal | sentinel `None` or external flag | drop all `Sender` clones — `recv()` returns `Err`
Multiple producers | manual clone/synchronise | `tx.clone()` — built-in

// 445. Multi-producer single-consumer channels
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel::<String>();

    // Multiple producers — clone the sender
    let handles: Vec<_> = (0..3).map(|id| {
        let tx = tx.clone();
        thread::spawn(move || {
            for i in 1..=5 { tx.send(format!("p{}-msg{}", id, i)).unwrap(); }
        })
    }).collect();
    drop(tx); // drop original — channel closes when all clones drop

    // Consumer: for-loop exits when channel closes
    for msg in rx { println!("got: {}", msg); }
    for h in handles { h.join().unwrap(); }

    // try_iter — non-blocking drain
    let (tx2, rx2) = mpsc::channel::<u32>();
    for i in 0..5 { tx2.send(i).unwrap(); }
    drop(tx2);
    let v: Vec<u32> = rx2.try_iter().collect();
    println!("drained: {:?}", v);
}

#[cfg(test)]
mod tests {
    use super::*;
    #[test] fn test_send_recv() { let (t,r)=mpsc::channel(); t.send(42u32).unwrap(); assert_eq!(r.recv().unwrap(),42); }
    #[test] fn test_closed()    { let (t,r)=mpsc::channel::<i32>(); drop(t); assert!(r.recv().is_err()); }
    #[test] fn test_multi()     {
        let (t,r)=mpsc::channel::<u32>();
        let hs:Vec<_>=(0..4).map(|i|{let t=t.clone(); thread::spawn(move || t.send(i).unwrap())}).collect();
        drop(t); let mut v:Vec<u32>=r.iter().collect(); v.sort(); assert_eq!(v,vec![0,1,2,3]);
        for h in hs { h.join().unwrap(); }
    }
}

(* 445. MPSC channels – OCaml Queue+Mutex+Condition *)
let q = Queue.create ()
let m = Mutex.create ()
let c = Condition.create ()
let send v = Mutex.lock m; Queue.push v q; Condition.signal c; Mutex.unlock m
let recv () = Mutex.lock m; while Queue.is_empty q do Condition.wait c m done;
  let v=Queue.pop q in Mutex.unlock m; v

let () =
  let producers = List.init 3 (fun id ->
    Thread.create (fun () ->
      for i=1 to 5 do send (Printf.sprintf "p%d-msg%d" id i) done
    ) ()
  ) in
  let consumer = Thread.create (fun () ->
    for _ = 1 to 15 do Printf.printf "got: %s\n%!" (recv ()) done
  ) () in
  List.iter Thread.join producers; Thread.join consumer