๐Ÿฆ€ Functional Rust
๐ŸŽฌ Fearless Concurrency Threads, Arc>, channels โ€” safe parallelism enforced by the compiler.
๐Ÿ“ Text version (for readers / accessibility)

โ€ข std::thread::spawn creates OS threads โ€” closures must be Send + 'static

โ€ข Arc> provides shared mutable state across threads safely

โ€ข Channels (mpsc) enable message passing โ€” multiple producers, single consumer

โ€ข Send and Sync marker traits enforce thread safety at compile time

โ€ข Data races are impossible โ€” the type system prevents them before your code runs

328: Async Channels (mpsc)

Difficulty: 3 Level: Advanced Multi-producer, single-consumer channels let multiple tasks send messages to one receiver โ€” the safe, idiomatic way to communicate between concurrent workers.

The Problem This Solves

You have multiple threads (or async tasks) producing data โ€” log entries, events, computation results โ€” and you need to funnel them all into one place for processing. Shared mutable state (`Arc<Mutex<Vec<T>>>`) works but requires locking on every access, which creates contention. Channels are the alternative: no shared state, no locks, just message passing. The "multi-producer, single-consumer" design reflects real usage: many workers generate results, one aggregator collects them. The `Sender` is cheap to clone, so any number of producers can hold one. The `Receiver` is not cloneable โ€” exactly one place in your code processes incoming messages. When all `Sender`s are dropped, the channel closes and `recv()` returns an error, giving the consumer a clean signal to stop. This makes teardown easy and correct.

The Intuition

Go channels (`chan T`) are the most famous version of this pattern. The Rust `mpsc` is similar but more explicit about roles: `Sender<T>` and `Receiver<T>` are distinct types, and the asymmetry (many senders, one receiver) is baked into the API.
// Go: bidirectional channel, anyone can send or receive
ch := make(chan int)
go func() { ch <- 42 }()
val := <-ch
// Rust: explicit producer/consumer split
let (tx, rx) = mpsc::channel::<i32>();
let tx2 = tx.clone();  // clone for second producer
thread::spawn(move || tx.send(42).unwrap());
thread::spawn(move || tx2.send(99).unwrap());
let val = rx.recv().unwrap();  // only one receiver
In async Rust (tokio), `tokio::sync::mpsc` provides the awaitable version โ€” same concept, non-blocking.

How It Works in Rust

fn producer(tx: mpsc::Sender<String>, label: &'static str, n: usize, delay_ms: u64) {
 thread::spawn(move || {
     for i in 1..=n {
         thread::sleep(Duration::from_millis(delay_ms));
         tx.send(format!("{label}-{i}")).unwrap();  // send returns Err if receiver dropped
     }
     // tx dropped here โ€” this producer is done
 });
}

fn main() {
 let (tx, rx) = mpsc::channel::<String>();
 producer(tx.clone(), "A", 3, 10);   // clone tx for each producer
 producer(tx.clone(), "B", 3, 15);
 drop(tx);  // drop the original โ€” without this, rx.into_iter() would hang forever

 let msgs: Vec<String> = rx.into_iter().collect();  // blocks until all senders dropped
}
`drop(tx)` after cloning is the pattern. You clone before spawning each producer, then drop the original. When the last producer finishes and its cloned `tx` drops, the channel closes and `rx.into_iter()` completes.

What This Unlocks

Key Differences

ConceptOCamlRust
Create channel`Event.new_channel ()``mpsc::channel()` returns `(Sender, Receiver)`
Send`Event.send chan x``tx.send(val)` โ†’ `Result` (Err if receiver gone)
Receive`Event.receive chan``rx.recv()` โ†’ `Result` (Err if all senders dropped)
Multiple producersmanual synchronization`tx.clone()` โ€” `Sender` is `Clone`
Channel close signalN/Aall senders dropped โ†’ `recv()` returns `Err`
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn producer(tx: mpsc::Sender<String>, label: &'static str, n: usize, delay_ms: u64) {
    thread::spawn(move || {
        for i in 1..=n {
            thread::sleep(Duration::from_millis(delay_ms));
            tx.send(format!("{label}-{i}")).unwrap();
        }
    });
}

fn main() {
    let (tx, rx) = mpsc::channel::<String>();
    producer(tx.clone(), "A", 3, 10);
    producer(tx.clone(), "B", 3, 15);
    drop(tx);
    let mut msgs: Vec<String> = rx.into_iter().collect();
    msgs.sort();
    for m in &msgs { println!("Recv: {m}"); }
    println!("Total: {}", msgs.len());
}

#[cfg(test)]
mod tests {
    use super::*;
    #[test] fn collects_all() {
        let (tx, rx) = mpsc::channel::<i32>();
        let tx2 = tx.clone();
        thread::spawn(move || { for i in 0..5 { tx.send(i).unwrap(); } });
        thread::spawn(move || { for i in 5..10 { tx2.send(i).unwrap(); } });
        let mut msgs: Vec<i32> = rx.into_iter().collect();
        msgs.sort();
        assert_eq!(msgs, (0..10).collect::<Vec<_>>());
    }
    #[test] fn closes_on_drop() {
        let (tx, rx) = mpsc::channel::<i32>();
        drop(tx);
        assert!(rx.recv().is_err());
    }
}
(* OCaml: message passing with channels *)

let () =
  let ch = Event.new_channel () in
  let prod label n = Thread.create (fun () ->
    for i = 1 to n do Event.sync (Event.send ch (Printf.sprintf "%s-%d" label i)) done
  ) () in
  let _t1 = prod "A" 3 in
  let _t2 = prod "B" 3 in
  for _ = 1 to 6 do Printf.printf "Recv: %s\n" (Event.sync (Event.receive ch)) done