🦀 Functional Rust
🎬 Fearless Concurrency — Threads, Arc<Mutex<T>>, channels — safe parallelism enforced by the compiler.
๐Ÿ“ Text version (for readers / accessibility)

• std::thread::spawn creates OS threads — closures must be Send + 'static

• Arc<Mutex<T>> provides shared mutable state across threads safely

• Channels (mpsc) enable message passing — multiple producers, single consumer

โ€ข Send and Sync marker traits enforce thread safety at compile time

• Data races are impossible — the type system prevents them before your code runs

462: Pipeline Concurrency

Difficulty: 3 · Level: Intermediate
Chain N processing stages with channels so each stage runs concurrently in its own thread.

The Problem This Solves

Multi-step data transformation is common: parse raw bytes, validate, enrich from a database, serialize to JSON. If you run these steps sequentially on each item, you're wasting CPU — stage 2 sits idle while stage 1 processes the next item. You want all stages running simultaneously, with each working on a different item in the stream.

A naive solution spawns a thread pool and runs all stages in it. But that loses ordering guarantees and makes error propagation complicated. What you want is a pipeline: each stage has a dedicated thread, reads from an input channel, does its work, and writes to an output channel.

The elegant property of this design is automatic shutdown propagation. When the source closes, stage 1 sees `Err` on `recv` and drops its output sender. Stage 2's `recv` then fails. The shutdown signal travels through the entire pipeline without any extra signaling code.

The Intuition

A pipeline is N threads, each owning an `rx`/`tx` pair, chained so one thread's output is the next thread's input — the channels are the pipeline, and dropping propagates end-of-stream automatically. The trade-off: each stage adds latency (buffering), but the pipeline runs all stages in parallel, so throughput matches your slowest stage.

How It Works in Rust

use std::sync::mpsc;
use std::thread;

fn main() {
    // Stage 1: produce numbers
    let (tx1, rx1) = mpsc::channel::<i32>();
    thread::spawn(move || {
        for i in 0..10 { tx1.send(i).unwrap(); }
        // dropping tx1 signals end of stream to stage 2
    });

    // Stage 2: double each value
    let (tx2, rx2) = mpsc::channel::<i32>();
    thread::spawn(move || {
        for item in rx1 {          // ends when tx1 drops
            tx2.send(item * 2).unwrap();
        }
        // dropping tx2 propagates end-of-stream to stage 3
    });

    // Stage 3: consume and print
    let handle = thread::spawn(move || {
        for item in rx2 {          // ends when tx2 drops
            println!("{}", item);
        }
    });

    handle.join().unwrap();
}
Use `sync_channel(N)` instead of `channel()` for bounded stages with backpressure — stage 3 being slow will throttle stage 2, which throttles stage 1.

What This Unlocks

Key Differences

Concept | OCaml | Rust
Stage thread | Thread + two shared queues | `thread::spawn` with owned `rx` + `tx`
Stage connection | Shared `Queue.t` ref | `mpsc::channel` pair between stages
Backpressure | Manual capacity check | `sync_channel(N)` blocks automatically
Shutdown propagation | Sentinel value or flag | Drop final `Sender` → `Err` cascades
Ownership | Shared by reference | Each stage owns its channels exclusively
// 462. Pipeline with channels
use std::sync::mpsc;
use std::thread;

fn pipe<T, U, F>(rx: mpsc::Receiver<T>, f: F) -> mpsc::Receiver<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + 'static,
{
    let (tx, next) = mpsc::channel();
    // Forward each item through f; exit early if the downstream end hung up.
    thread::spawn(move || {
        for v in rx {
            if tx.send(f(v)).is_err() {
                break;
            }
        }
    });
    next
}

fn main() {
    let (src, rx) = mpsc::channel::<i64>();
    let rx = pipe(rx, |x| x * 2);
    let rx = pipe(rx, |x| x + 1);
    let rx = pipe(rx, |x: i64| format!("val={}", x));

    for i in 1..=5 { src.send(i).unwrap(); }
    drop(src); // close the source; end-of-stream cascades through every stage

    let results: Vec<String> = rx.iter().collect();
    println!("{:?}", results);
    // ["val=3", "val=5", "val=7", "val=9", "val=11"]
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pipeline() {
        let (tx, rx) = mpsc::channel::<i64>();
        let rx = pipe(rx, |x| x * 3);
        let rx = pipe(rx, |x| x - 1);
        for i in 1..=4 { tx.send(i).unwrap(); }
        drop(tx);
        let r: Vec<i64> = rx.iter().collect();
        assert_eq!(r, vec![2, 5, 8, 11]);
    }
}
(* 462. Pipeline – OCaml *)
let make_chan () =
  let q = Queue.create () in
  let m = Mutex.create () in
  let c = Condition.create () in
  let send v = Mutex.lock m; Queue.push (Some v) q; Condition.signal c; Mutex.unlock m in
  let close () = Mutex.lock m; Queue.push None q; Condition.signal c; Mutex.unlock m in
  let recv () =
    Mutex.lock m;
    while Queue.is_empty q do Condition.wait c m done;
    let v = Queue.pop q in
    Mutex.unlock m; v
  in
  (send, close, recv)

let stage f send2 close2 recv =
  Thread.create (fun () ->
    let rec loop () = match recv () with
      | None -> close2 ()
      | Some v -> send2 (f v); loop ()
    in loop ()
  ) ()

let () =
  let (s1,c1,r1)=make_chan () in
  let (s2,c2,r2)=make_chan () in
  let (s3,c3,r3)=make_chan () in
  let t1=stage (fun x->x*2) s2 c2 r1 in
  let t2=stage (fun x->x+1) s3 c3 r2 in
  for i=1 to 5 do s1 i done; c1 ();
  Thread.join t1; Thread.join t2;
  let rec collect () = match r3 () with None->[] | Some v->v::collect () in
  Printf.printf "%s\n" (String.concat " " (List.map string_of_int (collect ())))