// 462. Pipeline with channels
use std::sync::mpsc;
use std::thread;
fn pipe<T,U,F>(rx: mpsc::Receiver<T>, f: F) -> mpsc::Receiver<U>
where T:Send+'static, U:Send+'static, F:Fn(T)->U+Send+'static {
let (tx,next)=mpsc::channel();
thread::spawn(move || { for v in rx { if tx.send(f(v)).is_err() { break; } } });
next
}
fn main() {
let (src, rx) = mpsc::channel::<i64>();
let rx = pipe(rx, |x| x * 2);
let rx = pipe(rx, |x| x + 1);
let rx = pipe(rx, |x: i64| format!("val={}", x));
for i in 1..=5 { src.send(i).unwrap(); }
drop(src);
let results: Vec<String> = rx.iter().collect();
println!("{:?}", results);
// ["val=3","val=5","val=7","val=9","val=11"]
}
#[cfg(test)]
mod tests {
use super::*;
#[test] fn test_pipeline() {
let (tx,rx)=mpsc::channel::<i64>();
let rx=pipe(rx,|x| x*3);
let rx=pipe(rx,|x| x-1);
for i in 1..=4 { tx.send(i).unwrap(); } drop(tx);
let r:Vec<i64>=rx.iter().collect();
assert_eq!(r,vec![2,5,8,11]);
}
}
(* 462. Pipeline โ OCaml *)
let make_chan () =
let q=Queue.create () let m=Mutex.create () let c=Condition.create () in
let send v=Mutex.lock m; Queue.push (Some v) q; Condition.signal c; Mutex.unlock m in
let close ()=Mutex.lock m; Queue.push None q; Condition.signal c; Mutex.unlock m in
let recv ()=Mutex.lock m; while Queue.is_empty q do Condition.wait c m done;
let v=Queue.pop q in Mutex.unlock m; v in
(send,close,recv)
let stage f send2 close2 recv =
Thread.create (fun () ->
let rec loop () = match recv () with
| None -> close2 ()
| Some v -> send2 (f v); loop ()
in loop ()
) ()
let () =
let (s1,c1,r1)=make_chan () in
let (s2,c2,r2)=make_chan () in
let (s3,c3,r3)=make_chan () in
let t1=stage (fun x->x*2) s2 c2 r1 in
let t2=stage (fun x->x+1) s3 c3 r2 in
for i=1 to 5 do s1 i done; c1 ();
Thread.join t1; Thread.join t2;
let rec collect () = match r3 () with None->[] | Some v->v::collect () in
Printf.printf "%s\n" (String.concat " " (List.map string_of_int (collect ())))