๐Ÿฆ€ Functional Rust
๐ŸŽฌ Fearless Concurrency Threads, Arc>, channels โ€” safe parallelism enforced by the compiler.
๐Ÿ“ Text version (for readers / accessibility)

โ€ข std::thread::spawn creates OS threads โ€” closures must be Send + 'static

โ€ข Arc> provides shared mutable state across threads safely

โ€ข Channels (mpsc) enable message passing โ€” multiple producers, single consumer

โ€ข Send and Sync marker traits enforce thread safety at compile time

โ€ข Data races are impossible โ€” the type system prevents them before your code runs

984: Channel Pipeline

Difficulty: Intermediate Category: Async / Concurrency FP Patterns Concept: Unix-pipe style composition โ€” each stage is a thread consuming from one channel and producing to the next Key Insight: `pipeline_stage(rx, f)` returns a new `Receiver` โ€” stages compose like function composition; the channel itself is the glue

Versions

DirectoryDescription
`std/`Standard library version using `std::sync`, `std::thread`
`tokio/`Tokio async runtime version using `tokio::sync`, `tokio::spawn`

Running

# Standard library version
cd std && cargo test

# Tokio version
cd tokio && cargo test
// 984: Channel Pipeline
// Chain of processing stages via mpsc channels

use std::sync::mpsc;
use std::thread;

// --- Build a pipeline stage: read from rx, apply f, send to tx ---
fn pipeline_stage<T, U, F>(rx: mpsc::Receiver<T>, f: F) -> mpsc::Receiver<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + 'static,
{
    let (tx_out, rx_out) = mpsc::channel();
    thread::spawn(move || {
        for item in rx.iter() {      // iter() stops when channel closes
            tx_out.send(f(item)).unwrap();
        }
        // tx_out drops here โ†’ closes next stage
    });
    rx_out
}

// --- Build a full pipeline from a Vec of boxed functions ---
fn run_pipeline(inputs: Vec<i32>) -> Vec<String> {
    let (tx_source, rx0) = mpsc::channel::<i32>();

    // Stage 1: double
    let rx1 = pipeline_stage(rx0, |x| x * 2);
    // Stage 2: add 1
    let rx2 = pipeline_stage(rx1, |x| x + 1);
    // Stage 3: to string
    let rx3 = pipeline_stage(rx2, |x: i32| x.to_string());

    // Producer
    let producer = thread::spawn(move || {
        for v in inputs {
            tx_source.send(v).unwrap();
        }
        // tx_source drops โ†’ closes pipeline
    });

    // Collect results
    let results: Vec<String> = rx3.iter().collect();
    producer.join().unwrap();
    results
}

// --- Parameterised N-stage pipeline ---
fn run_n_stages(inputs: Vec<i32>, stages: Vec<Box<dyn Fn(i32) -> i32 + Send + 'static>>) -> Vec<i32> {
    let (tx_source, mut current_rx) = mpsc::channel::<i32>();

    for f in stages {
        current_rx = pipeline_stage(current_rx, f);
    }

    let producer = thread::spawn(move || {
        for v in inputs {
            tx_source.send(v).unwrap();
        }
    });

    let results: Vec<i32> = current_rx.iter().collect();
    producer.join().unwrap();
    results
}

fn main() {
    let r = run_pipeline(vec![1, 2, 3, 4, 5]);
    println!("3-stage pipeline: {:?}", r);

    let stages: Vec<Box<dyn Fn(i32) -> i32 + Send + 'static>> = vec![
        Box::new(|x| x + 10),
        Box::new(|x| x * 3),
        Box::new(|x| x - 1),
    ];
    let r2 = run_n_stages(vec![1, 2, 3], stages);
    println!("n-stage pipeline: {:?}", r2);
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pipeline_3_stages() {
        let results = run_pipeline(vec![1, 2, 3, 4, 5]);
        // 1->2->3, 2->4->5, 3->6->7, 4->8->9, 5->10->11
        assert_eq!(results, vec!["3", "5", "7", "9", "11"]);
    }

    #[test]
    fn test_pipeline_empty() {
        let results = run_pipeline(vec![]);
        assert!(results.is_empty());
    }

    #[test]
    fn test_pipeline_single_item() {
        let results = run_pipeline(vec![5]);
        assert_eq!(results, vec!["11"]); // 5*2=10, 10+1=11
    }

    #[test]
    fn test_n_stage_pipeline() {
        // +10, *3, -1: 1->11->33->32
        let stages: Vec<Box<dyn Fn(i32) -> i32 + Send + 'static>> = vec![
            Box::new(|x| x + 10),
            Box::new(|x| x * 3),
            Box::new(|x| x - 1),
        ];
        let results = run_n_stages(vec![1], stages);
        assert_eq!(results, vec![32]);
    }

    #[test]
    fn test_stage_closure() {
        let (tx, rx) = mpsc::channel::<i32>();
        let rx_out = pipeline_stage(rx, |x| x * x);

        let h = thread::spawn(move || {
            for v in [2, 3, 4] { tx.send(v).unwrap(); }
        });
        h.join().unwrap();

        let results: Vec<i32> = rx_out.iter().collect();
        assert_eq!(results, vec![4, 9, 16]);
    }
}
(* 984: Channel Pipeline *)
(* Chain of processing stages connected by channels *)
(* Each stage reads from one queue, transforms, writes to next *)

(* --- Simple queue+mutex channel abstraction --- *)

type 'a chan = {
  q: 'a Queue.t;
  m: Mutex.t;
  cond: Condition.t;
  mutable closed: bool;
}

let make_chan () = { q = Queue.create (); m = Mutex.create ();
                     cond = Condition.create (); closed = false }

let send c v =
  Mutex.lock c.m;
  Queue.push v c.q;
  Condition.signal c.cond;
  Mutex.unlock c.m

let close_chan c =
  Mutex.lock c.m;
  c.closed <- true;
  Condition.broadcast c.cond;
  Mutex.unlock c.m

let recv c =
  Mutex.lock c.m;
  while Queue.is_empty c.q && not c.closed do
    Condition.wait c.cond c.m
  done;
  let v = if Queue.is_empty c.q then None else Some (Queue.pop c.q) in
  Mutex.unlock c.m;
  v

(* --- Pipeline: producer -> stage1 -> stage2 -> collector --- *)

(* Stage: reads from in_c, applies f, writes to out_c, then closes out_c *)
let make_stage f in_c out_c =
  Thread.create (fun () ->
    let rec loop () =
      match recv in_c with
      | None -> close_chan out_c
      | Some v -> send out_c (f v); loop ()
    in
    loop ()
  ) ()

let () =
  let c0 = make_chan () in (* source *)
  let c1 = make_chan () in (* after stage1: double *)
  let c2 = make_chan () in (* after stage2: add 1 *)
  let c3 = make_chan () in (* after stage3: to string *)

  let _s1 = make_stage (fun x -> x * 2)        c0 c1 in
  let _s2 = make_stage (fun x -> x + 1)         c1 c2 in
  let _s3 = make_stage (fun x -> string_of_int x) c2 c3 in

  (* Producer: feed 1..5 *)
  let producer = Thread.create (fun () ->
    List.iter (fun i -> send c0 i) [1;2;3;4;5];
    close_chan c0
  ) () in

  (* Collector *)
  let results = ref [] in
  let rec collect () =
    match recv c3 with
    | None -> ()
    | Some v -> results := v :: !results; collect ()
  in
  collect ();
  Thread.join producer;

  let results = List.rev !results in
  (* 1->2->3, 2->4->5, 3->6->7, 4->8->9, 5->10->11 *)
  assert (results = ["3";"5";"7";"9";"11"]);
  Printf.printf "Approach 1 (pipeline): [%s]\n" (String.concat "; " results)

let () = Printf.printf "โœ“ All tests passed\n"

๐Ÿ“Š Detailed Comparison

Channel Pipeline โ€” Comparison

Core Insight

A channel pipeline is function composition in concurrent space: instead of `f โˆ˜ g โˆ˜ h`, you have `stage(f) | stage(g) | stage(h)` where `|` is a channel. This is Unix pipes, CSP, and the actor model all rolled into one pattern.

OCaml Approach

  • No built-in pipeline abstraction โ€” build with `Thread` + `Queue` + `Mutex` + `Condition`
  • Each stage is a thread looping over `recv` calls
  • Close downstream by signalling `closed = true` + broadcasting
  • More boilerplate, but same idea: transform + forward

Rust Approach

  • `pipeline_stage(rx, f)` creates a thread internally, returns new `Receiver`
  • Composable: `let rx2 = pipeline_stage(pipeline_stage(rx0, f), g)`
  • Channel closes automatically when `Sender` drops โ€” propagates through pipeline
  • `rx.iter()` is the idiomatic "read until closed" loop

Comparison Table

ConceptOCamlRust
Stage abstractionManual thread + queue + mutex`fn pipeline_stage(rx, f) -> Receiver`
Close propagationExplicit `closed` flag + broadcastDrop `Sender` โ†’ next stage's `rx.iter()` stops
Back-pressureQueue fills up (manual limit needed)`sync_channel(n)` blocks producer
Compose N stagesCreate N channel/thread pairsChain `pipeline_stage` calls
Collect outputLoop over recv until None`rx.iter().collect()`
ParallelismOne thread per stageOne thread per stage (same)

std vs tokio

Aspectstd versiontokio version
RuntimeOS threads via `std::thread`Async tasks on tokio runtime
Synchronization`std::sync::Mutex`, `Condvar``tokio::sync::Mutex`, channels
Channels`std::sync::mpsc` (unbounded)`tokio::sync::mpsc` (bounded, async)
BlockingThread blocks on lock/recvTask yields, runtime switches tasks
OverheadOne OS thread per taskMany tasks per thread (M:N)
Best forCPU-bound, simple concurrencyI/O-bound, high-concurrency servers