// 984: Channel Pipeline
// Chain of processing stages via mpsc channels
use std::sync::mpsc;
use std::thread;
// --- Build a pipeline stage: read from rx, apply f, send to tx ---
fn pipeline_stage<T, U, F>(rx: mpsc::Receiver<T>, f: F) -> mpsc::Receiver<U>
where
T: Send + 'static,
U: Send + 'static,
F: Fn(T) -> U + Send + 'static,
{
let (tx_out, rx_out) = mpsc::channel();
thread::spawn(move || {
for item in rx.iter() { // iter() stops when channel closes
tx_out.send(f(item)).unwrap();
}
// tx_out drops here โ closes next stage
});
rx_out
}
// --- Build a full pipeline from a Vec of boxed functions ---
fn run_pipeline(inputs: Vec<i32>) -> Vec<String> {
let (tx_source, rx0) = mpsc::channel::<i32>();
// Stage 1: double
let rx1 = pipeline_stage(rx0, |x| x * 2);
// Stage 2: add 1
let rx2 = pipeline_stage(rx1, |x| x + 1);
// Stage 3: to string
let rx3 = pipeline_stage(rx2, |x: i32| x.to_string());
// Producer
let producer = thread::spawn(move || {
for v in inputs {
tx_source.send(v).unwrap();
}
// tx_source drops โ closes pipeline
});
// Collect results
let results: Vec<String> = rx3.iter().collect();
producer.join().unwrap();
results
}
// --- Parameterised N-stage pipeline ---
fn run_n_stages(inputs: Vec<i32>, stages: Vec<Box<dyn Fn(i32) -> i32 + Send + 'static>>) -> Vec<i32> {
let (tx_source, mut current_rx) = mpsc::channel::<i32>();
for f in stages {
current_rx = pipeline_stage(current_rx, f);
}
let producer = thread::spawn(move || {
for v in inputs {
tx_source.send(v).unwrap();
}
});
let results: Vec<i32> = current_rx.iter().collect();
producer.join().unwrap();
results
}
fn main() {
let r = run_pipeline(vec![1, 2, 3, 4, 5]);
println!("3-stage pipeline: {:?}", r);
let stages: Vec<Box<dyn Fn(i32) -> i32 + Send + 'static>> = vec![
Box::new(|x| x + 10),
Box::new(|x| x * 3),
Box::new(|x| x - 1),
];
let r2 = run_n_stages(vec![1, 2, 3], stages);
println!("n-stage pipeline: {:?}", r2);
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_pipeline_3_stages() {
let results = run_pipeline(vec![1, 2, 3, 4, 5]);
// 1->2->3, 2->4->5, 3->6->7, 4->8->9, 5->10->11
assert_eq!(results, vec!["3", "5", "7", "9", "11"]);
}
#[test]
fn test_pipeline_empty() {
let results = run_pipeline(vec![]);
assert!(results.is_empty());
}
#[test]
fn test_pipeline_single_item() {
let results = run_pipeline(vec![5]);
assert_eq!(results, vec!["11"]); // 5*2=10, 10+1=11
}
#[test]
fn test_n_stage_pipeline() {
// +10, *3, -1: 1->11->33->32
let stages: Vec<Box<dyn Fn(i32) -> i32 + Send + 'static>> = vec![
Box::new(|x| x + 10),
Box::new(|x| x * 3),
Box::new(|x| x - 1),
];
let results = run_n_stages(vec![1], stages);
assert_eq!(results, vec![32]);
}
#[test]
fn test_stage_closure() {
let (tx, rx) = mpsc::channel::<i32>();
let rx_out = pipeline_stage(rx, |x| x * x);
let h = thread::spawn(move || {
for v in [2, 3, 4] { tx.send(v).unwrap(); }
});
h.join().unwrap();
let results: Vec<i32> = rx_out.iter().collect();
assert_eq!(results, vec![4, 9, 16]);
}
}
(* 984: Channel Pipeline *)
(* Chain of processing stages connected by channels *)
(* Each stage reads from one queue, transforms, writes to next *)
(* --- Simple queue+mutex channel abstraction --- *)
type 'a chan = {
q: 'a Queue.t;
m: Mutex.t;
cond: Condition.t;
mutable closed: bool;
}
let make_chan () = { q = Queue.create (); m = Mutex.create ();
cond = Condition.create (); closed = false }
let send c v =
Mutex.lock c.m;
Queue.push v c.q;
Condition.signal c.cond;
Mutex.unlock c.m
let close_chan c =
Mutex.lock c.m;
c.closed <- true;
Condition.broadcast c.cond;
Mutex.unlock c.m
let recv c =
Mutex.lock c.m;
while Queue.is_empty c.q && not c.closed do
Condition.wait c.cond c.m
done;
let v = if Queue.is_empty c.q then None else Some (Queue.pop c.q) in
Mutex.unlock c.m;
v
(* --- Pipeline: producer -> stage1 -> stage2 -> collector --- *)
(* Stage: reads from in_c, applies f, writes to out_c, then closes out_c *)
let make_stage f in_c out_c =
Thread.create (fun () ->
let rec loop () =
match recv in_c with
| None -> close_chan out_c
| Some v -> send out_c (f v); loop ()
in
loop ()
) ()
let () =
let c0 = make_chan () in (* source *)
let c1 = make_chan () in (* after stage1: double *)
let c2 = make_chan () in (* after stage2: add 1 *)
let c3 = make_chan () in (* after stage3: to string *)
let _s1 = make_stage (fun x -> x * 2) c0 c1 in
let _s2 = make_stage (fun x -> x + 1) c1 c2 in
let _s3 = make_stage (fun x -> string_of_int x) c2 c3 in
(* Producer: feed 1..5 *)
let producer = Thread.create (fun () ->
List.iter (fun i -> send c0 i) [1;2;3;4;5];
close_chan c0
) () in
(* Collector *)
let results = ref [] in
let rec collect () =
match recv c3 with
| None -> ()
| Some v -> results := v :: !results; collect ()
in
collect ();
Thread.join producer;
let results = List.rev !results in
(* 1->2->3, 2->4->5, 3->6->7, 4->8->9, 5->10->11 *)
assert (results = ["3";"5";"7";"9";"11"]);
Printf.printf "Approach 1 (pipeline): [%s]\n" (String.concat "; " results)
let () = Printf.printf "โ All tests passed\n"