๐Ÿฆ€ Functional Rust

995: N-Stage Streaming Pipeline

Difficulty: Intermediate Category: Async / Concurrency FP Patterns Concept: Stream processing: map/filter/flat_map stages connected by channels Key Insight: `map_stage(rx, f)` and `filter_stage(rx, pred)` return a new `Receiver` โ€” pipeline stages are composable like iterators, but run concurrently with true parallelism between stages

Versions

| Directory | Description |
|-----------|-------------|
| `std/`    | Standard library version using `std::sync`, `std::thread` |
| `tokio/`  | Tokio async runtime version using `tokio::sync`, `tokio::spawn` |

Running

# Standard library version
cd std && cargo test

# Tokio version
cd tokio && cargo test
// 995: N-Stage Streaming Pipeline
// Each stage is a thread + channel โ€” filter/map/transform stages

use std::sync::mpsc;
use std::thread;

// --- Map stage: applies f to each item ---
/// Map stage: spawns a worker thread that applies `f` to every item
/// received on `rx` and forwards the result on the returned channel.
///
/// The output channel closes automatically when the worker exits
/// (either because `rx` closed or the downstream receiver was dropped).
fn map_stage<T, U, F>(rx: mpsc::Receiver<T>, f: F) -> mpsc::Receiver<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + 'static,
{
    let (tx, out) = mpsc::channel();
    thread::spawn(move || {
        for item in rx.iter() {
            // If the downstream receiver has been dropped, stop quietly
            // instead of panicking the stage thread on a failed send.
            if tx.send(f(item)).is_err() {
                break;
            }
        }
    });
    out
}

// --- Filter stage: only forward items where pred is true ---
/// Filter stage: spawns a worker thread that forwards only the items
/// for which `pred` returns `true`.
///
/// The output channel closes automatically when the worker exits
/// (either because `rx` closed or the downstream receiver was dropped).
fn filter_stage<T, F>(rx: mpsc::Receiver<T>, pred: F) -> mpsc::Receiver<T>
where
    T: Send + 'static,
    F: Fn(&T) -> bool + Send + 'static,
{
    let (tx, out) = mpsc::channel();
    thread::spawn(move || {
        for item in rx.iter() {
            if pred(&item) {
                // Stop quietly if the downstream receiver was dropped,
                // instead of panicking the stage thread on a failed send.
                if tx.send(item).is_err() {
                    break;
                }
            }
        }
    });
    out
}

// --- Flat-map stage: one item โ†’ multiple outputs ---
/// Flat-map stage: each input item expands into zero or more outputs
/// via `f`; all of them are forwarded in order on the returned channel.
///
/// The output channel closes automatically when the worker exits
/// (either because `rx` closed or the downstream receiver was dropped).
fn flat_map_stage<T, U, F>(rx: mpsc::Receiver<T>, f: F) -> mpsc::Receiver<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> Vec<U> + Send + 'static,
{
    let (tx, out) = mpsc::channel();
    thread::spawn(move || {
        'input: for item in rx.iter() {
            for v in f(item) {
                // Stop quietly if the downstream receiver was dropped,
                // instead of panicking the stage thread on a failed send.
                if tx.send(v).is_err() {
                    break 'input;
                }
            }
        }
    });
    out
}

// --- Build a multi-stage pipeline ---
// Build a three-stage pipeline over 1..=10: square each number, keep the
// even squares, and render them as strings.
fn pipeline_even_squares() -> Vec<String> {
    let (tx, source) = mpsc::channel::<i32>();

    // Compose the stages as one chained expression: stages nest just like
    // iterator adaptors, but each runs on its own thread.
    let strings = map_stage(
        filter_stage(map_stage(source, |x| x * x), |x| x % 2 == 0),
        |x: i32| x.to_string(),
    );

    // Producer thread: dropping `tx` when the closure ends closes the
    // whole pipeline from the front.
    let producer = thread::spawn(move || {
        for i in 1..=10 {
            tx.send(i).unwrap();
        }
    });

    let collected: Vec<String> = strings.iter().collect();
    producer.join().unwrap();
    collected
}

// --- More complex: tokenize โ†’ filter stop words โ†’ count ---
/// Count the non-stop-words in `text` by streaming each lowercased word
/// through a filter/filter/map pipeline and summing at the sink.
fn word_count_pipeline(text: &str) -> usize {
    // Words excluded from the count. A `const` array avoids a heap
    // allocation for a fixed list.
    const STOP_WORDS: [&str; 7] = ["the", "a", "an", "is", "in", "of", "to"];

    let words: Vec<String> = text
        .split_whitespace()
        .map(|w| w.to_lowercase())
        .collect();

    let (tx, rx) = mpsc::channel::<String>();

    // Stage 1: drop stop words. (Emission itself happens in the producer
    // thread below.)
    let rx1 = filter_stage(rx, |w: &String| !STOP_WORDS.contains(&w.as_str()));
    // Stage 2: drop empty strings. split_whitespace never yields them,
    // but this keeps the stage robust if the producer changes.
    let rx2 = filter_stage(rx1, |w: &String| !w.is_empty());
    // Stage 3: map every surviving word to 1 so the sink can sum.
    let rx3 = map_stage(rx2, |_: String| 1usize);

    // Producer: emit each word; dropping `tx` closes the pipeline.
    let producer = thread::spawn(move || {
        for w in words {
            tx.send(w).unwrap();
        }
    });

    let count: usize = rx3.iter().sum();
    producer.join().unwrap();
    count
}

fn main() {
    // Demo 1: numbers -> squares -> evens -> strings.
    println!("even squares: {:?}", pipeline_even_squares());

    // Demo 2: words -> drop stop words -> count.
    println!(
        "word count (no stop words): {}",
        word_count_pipeline("the quick brown fox jumps over the lazy dog")
    );
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pipeline_even_squares() {
        let results = pipeline_even_squares();
        // Squares of 1..10: 1,4,9,16,25,36,49,64,81,100
        // Even: 4,16,36,64,100
        assert_eq!(results, vec!["4", "16", "36", "64", "100"]);
    }

    #[test]
    fn test_map_stage() {
        let (tx, rx) = mpsc::channel::<i32>();
        let out = map_stage(rx, |x| x * 2);
        for i in [1, 2, 3] { tx.send(i).unwrap(); }
        drop(tx);
        let results: Vec<i32> = out.iter().collect();
        assert_eq!(results, vec![2, 4, 6]);
    }

    #[test]
    fn test_filter_stage() {
        let (tx, rx) = mpsc::channel::<i32>();
        let out = filter_stage(rx, |x| x % 2 == 0);
        for i in 1..=6 { tx.send(i).unwrap(); }
        drop(tx);
        let results: Vec<i32> = out.iter().collect();
        assert_eq!(results, vec![2, 4, 6]);
    }

    #[test]
    fn test_flat_map_stage() {
        let (tx, rx) = mpsc::channel::<i32>();
        let out = flat_map_stage(rx, |x| vec![x, x * 10]);
        for i in [1, 2, 3] { tx.send(i).unwrap(); }
        drop(tx);
        let results: Vec<i32> = out.iter().collect();
        assert_eq!(results, vec![1, 10, 2, 20, 3, 30]);
    }

    #[test]
    fn test_word_count_pipeline() {
        let count = word_count_pipeline("the quick brown fox jumps over the lazy dog");
        // 9 words minus the stop word "the" (x2) -> 7 content words.
        // ("over" is NOT in the stop-word list, so it is counted.)
        assert_eq!(count, 7);
    }
}
(* 995: N-Stage Streaming Pipeline *)
(* Each stage is a thread + channel. Data flows item by item *)

type 'a chan = { q: 'a Queue.t; m: Mutex.t; cond: Condition.t; mutable closed: bool }

(* Create a fresh, open channel with an empty queue. *)
let make_chan () =
  {
    q = Queue.create ();
    m = Mutex.create ();
    cond = Condition.create ();
    closed = false;
  }

(* Enqueue [v] and wake one receiver blocked in [recv].
   The signal is issued while the mutex is still held. *)
let send c v =
  Mutex.lock c.m; Queue.push v c.q;
  Condition.signal c.cond; Mutex.unlock c.m

(* Mark the channel closed and wake ALL blocked receivers (broadcast,
   not signal) so each can observe end-of-stream and return [None]. *)
let close_chan c =
  Mutex.lock c.m; c.closed <- true;
  Condition.broadcast c.cond; Mutex.unlock c.m

(* Block until an item is available or the channel is closed.
   Returns [Some v] for an item, [None] once the channel is closed AND
   drained (buffered items are still delivered after close). The [while]
   loop re-checks the predicate after every wakeup before proceeding. *)
let recv c =
  Mutex.lock c.m;
  while Queue.is_empty c.q && not c.closed do
    Condition.wait c.cond c.m done;
  let v = if Queue.is_empty c.q then None else Some (Queue.pop c.q) in
  Mutex.unlock c.m; v

(* Stage: thread that maps f over incoming items *)
(* Map stage: spawn a worker thread that applies [f] to every item from
   [in_c] and forwards the result; closes its output when input ends. *)
let make_stage in_c f =
  let out = make_chan () in
  let worker () =
    let rec pump () =
      match recv in_c with
      | Some v -> send out (f v); pump ()
      | None -> close_chan out
    in
    pump ()
  in
  ignore (Thread.create worker ());
  out

(* Filter stage *)
(* Filter stage: spawn a worker thread that forwards only items for
   which [pred] is true; closes its output when input ends. *)
let make_filter in_c pred =
  let out = make_chan () in
  let worker () =
    let rec pump () =
      match recv in_c with
      | Some v ->
          if pred v then send out v;
          pump ()
      | None -> close_chan out
    in
    pump ()
  in
  ignore (Thread.create worker ());
  out

(* --- Build a pipeline: source -> stages -> sink --- *)

(* Driver: wire source -> square -> keep-even -> stringify, feed 1..10
   from a producer thread, collect at the sink, and verify the result. *)
let () =
  let source = make_chan () in

  (* Pipeline: parse int -> square -> filter even -> to string *)
  let c1 = make_stage source (fun x -> x * x) in
  let c2 = make_filter c1 (fun x -> x mod 2 = 0) in
  let c3 = make_stage c2 string_of_int in

  (* Feed data; closing the source propagates end-of-stream downstream *)
  let producer = Thread.create (fun () ->
    List.iter (send source) [1;2;3;4;5;6;7;8;9;10];
    close_chan source
  ) () in

  (* Collect until the last stage signals end-of-stream with [None] *)
  let results = ref [] in
  let rec collect () = match recv c3 with
    | None -> ()
    | Some v -> results := v :: !results; collect ()
  in
  collect ();
  Thread.join producer;

  (* Items were accumulated in reverse; restore arrival order *)
  let results = List.rev !results in
  (* Even squares of 1..10: 4,16,36,64,100 *)
  assert (results = ["4";"16";"36";"64";"100"]);
  Printf.printf "Approach 1 (filter pipeline): [%s]\n"
    (String.concat "; " results)

let () = Printf.printf "โœ“ All tests passed\n"

๐Ÿ“Š Detailed Comparison

N-Stage Streaming Pipeline โ€” Comparison

Core Insight

Channel pipelines are lazy iterators that run in parallel: while stage N processes item K, stage N+1 can process item K-1. This is the concurrent equivalent of `Seq.map f |> Seq.filter p |> Seq.map g`.

OCaml Approach

  • `make_stage in_c f` creates an output channel and spawns a thread
  • `make_filter in_c pred` filters items โ€” only passes matching ones
  • Chained: `let c2 = make_stage (make_stage c0 f) g`
  • Backpressure: natural if using bounded channels
  • Each stage closes its output when input is exhausted

Rust Approach

  • `map_stage(rx, f)` and `filter_stage(rx, pred)` return `Receiver<T>`
  • Composable by chaining: `map_stage(filter_stage(rx, pred), f)`
  • `flat_map_stage` for one-to-many expansion
  • Thread runs `for item in rx.iter()` โ€” stops when channel closes
  • Stages run truly in parallel โ€” all cores can be utilized

Comparison Table

| Stage type | OCaml | Rust |
|------------|-------|------|
| Map | `make_stage in_c f` | `map_stage(rx, f)` |
| Filter | `make_filter in_c pred` | `filter_stage(rx, pred)` |
| Flat-map | Custom stage with multiple sends | `flat_map_stage(rx, f)` |
| Compose 2 stages | `make_stage (make_stage c0 f) g` | `map_stage(map_stage(rx, f), g)` |
| Backpressure | Bounded queue (manual) | `sync_channel(n)` for stage tx |
| Stage count | Any N | Any N |
| Channel close | Explicit `close_chan out_c` | Drop `tx` (RAII) |

std vs tokio

| Aspect | std version | tokio version |
|--------|-------------|---------------|
| Runtime | OS threads via `std::thread` | Async tasks on tokio runtime |
| Synchronization | `std::sync::Mutex`, `Condvar` | `tokio::sync::Mutex`, channels |
| Channels | `std::sync::mpsc` (unbounded) | `tokio::sync::mpsc` (bounded, async) |
| Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |