// 985: Select Pattern — Poll Multiple Channels
// Rust: try_recv loop for non-blocking select over multiple channels
use std::sync::mpsc::{self, TryRecvError};
use std::thread;
use std::time::Duration;
/// Outcome of one `select` call over two receivers.
///
/// `A` is the message type of the first receiver and `B` the message type of
/// the second, so the two channels may carry unrelated types.
#[derive(Debug, PartialEq)]
enum Selected<A, B> {
/// A message taken from the first receiver (`rx1`).
Left(A),
/// A message taken from the second receiver (`rx2`).
Right(B),
/// Both receivers reported `Disconnected`; no further message can arrive.
BothClosed,
}
// --- Non-blocking select over two channels ---
/// Polls two receivers without blocking until one yields a message or both
/// are disconnected.
///
/// `rx1` is always polled before `rx2`, so when both hold data the first
/// channel wins. Once a receiver reports `Disconnected` it is not polled
/// again for the rest of this call. Between unsuccessful rounds the thread
/// yields to the scheduler rather than sleeping.
///
/// Returns `Selected::Left` / `Selected::Right` carrying the received value,
/// or `Selected::BothClosed` when both receivers are disconnected.
fn select<A, B>(
    rx1: &mpsc::Receiver<A>,
    rx2: &mpsc::Receiver<B>,
) -> Selected<A, B> {
    // Non-blocking poll that latches `open` to false on disconnect.
    fn poll<T>(rx: &mpsc::Receiver<T>, open: &mut bool) -> Option<T> {
        if !*open {
            return None;
        }
        match rx.try_recv() {
            Ok(value) => Some(value),
            Err(TryRecvError::Empty) => None,
            Err(TryRecvError::Disconnected) => {
                *open = false;
                None
            }
        }
    }

    let (mut left_open, mut right_open) = (true, true);
    loop {
        if let Some(value) = poll(rx1, &mut left_open) {
            return Selected::Left(value);
        }
        if let Some(value) = poll(rx2, &mut right_open) {
            return Selected::Right(value);
        }
        if !(left_open || right_open) {
            return Selected::BothClosed;
        }
        // Nothing ready yet, but at least one side is still live.
        thread::yield_now();
    }
}
// --- Drain both channels via select, categorizing messages ---
/// Consumes both receivers, repeatedly calling `select` until it reports
/// `Selected::BothClosed`, and sorts the received messages by origin.
///
/// Returns `(from_rx1, from_rx2)`, each in the order the messages were
/// received. The call only returns once every sender of both channels has
/// been dropped.
fn select_drain(rx1: mpsc::Receiver<i32>, rx2: mpsc::Receiver<String>) -> (Vec<i32>, Vec<String>) {
    let mut numbers: Vec<i32> = Vec::new();
    let mut texts: Vec<String> = Vec::new();
    loop {
        match select(&rx1, &rx2) {
            Selected::BothClosed => return (numbers, texts),
            Selected::Left(n) => numbers.push(n),
            Selected::Right(s) => texts.push(s),
        }
    }
}
// --- Priority select: prefer channel 1 when both have data ---
/// Non-blocking receive that prefers `high` over `low`.
///
/// Returns `Some((value, true))` when `high` had a message ready. If `high`
/// yields nothing (empty or disconnected), `low` is tried and a hit is
/// returned as `Some((value, false))`. Returns `None` when neither receiver
/// produced a message; the reason (empty vs. disconnected) is not reported.
fn priority_recv<T>(high: &mpsc::Receiver<T>, low: &mpsc::Receiver<T>) -> Option<(T, bool)> {
    if let Ok(urgent) = high.try_recv() {
        return Some((urgent, true));
    }
    // High-priority side had nothing to offer; fall back to the other one.
    let routine = low.try_recv().ok()?;
    Some((routine, false))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Pre-filled, already-closed channels are drained completely and each
    /// message lands in the vector matching its channel.
    #[test]
    fn test_select_drain() {
        let (int_tx, int_rx) = mpsc::channel::<i32>();
        let (str_tx, str_rx) = mpsc::channel::<String>();
        (1..=3).for_each(|n| int_tx.send(n).unwrap());
        for text in ["a", "b", "c"] {
            str_tx.send(String::from(text)).unwrap();
        }
        drop(int_tx);
        drop(str_tx);

        let (mut ints, mut strings) = select_drain(int_rx, str_rx);
        ints.sort();
        strings.sort();
        assert_eq!(ints, vec![1, 2, 3]);
        assert_eq!(strings, vec!["a", "b", "c"]);
    }

    /// With every sender dropped and nothing queued, `select` reports closure.
    #[test]
    fn test_both_closed() {
        let (tx_a, rx_a) = mpsc::channel::<i32>();
        let (tx_b, rx_b) = mpsc::channel::<i32>();
        drop(tx_a);
        drop(tx_b);

        let outcome = select(&rx_a, &rx_b);
        assert_eq!(outcome, Selected::BothClosed);
    }

    /// The high-priority channel is served first; the low one only afterwards.
    #[test]
    fn test_priority_recv() {
        let (high_tx, high_rx) = mpsc::channel::<i32>();
        let (low_tx, low_rx) = mpsc::channel::<i32>();
        high_tx.send(10).unwrap();
        low_tx.send(20).unwrap();

        // Both have data: the high-priority message comes out first.
        assert_eq!(priority_recv(&high_rx, &low_rx), Some((10, true)));
        // High is now empty, so the low-priority message is delivered.
        assert_eq!(priority_recv(&high_rx, &low_rx), Some((20, false)));
    }

    /// An open-but-empty first channel does not stop the second from being served.
    #[test]
    fn test_select_empty_left() {
        // `_idle_tx` stays alive, so the first receiver is empty yet never disconnected.
        let (_idle_tx, idle_rx) = mpsc::channel::<i32>();
        let (busy_tx, busy_rx) = mpsc::channel::<i32>();
        busy_tx.send(99).unwrap();
        drop(busy_tx);

        assert_eq!(select(&idle_rx, &busy_rx), Selected::Right(99));
    }
}
(* 985: Select Pattern — Poll Multiple Channels *)
(* OCaml: Lwt.pick / try_receive with non-blocking checks *)
(* A closable message channel: a queue plus a closed flag, accessed under a
   mutex by the functions in this file. *)
type 'a chan = {
(* Pending messages; [send] pushes and [try_recv] pops. *)
q: 'a Queue.t;
(* Locked by send, try_recv, close_chan and is_closed around every access. *)
m: Mutex.t;
(* Signalled on every send and broadcast on close. Nothing in the visible
   code waits on it. *)
cond: Condition.t;
(* Set to true by close_chan; never reset in the visible code. *)
mutable closed: bool;
}
(* Build a fresh, open channel with an empty queue and its own mutex and
   condition variable. *)
let make_chan () =
  let q = Queue.create () in
  let m = Mutex.create () in
  let cond = Condition.create () in
  { q; m; cond; closed = false }
(* Enqueue [v] on channel [c] while holding the channel mutex, and signal the
   channel's condition variable. The closed flag is not consulted. *)
let send c v =
  let { q; m; cond; _ } = c in
  Mutex.lock m;
  Queue.add v q;
  Condition.signal cond;
  Mutex.unlock m
(* Non-blocking receive: returns [Some v] with the oldest queued message, or
   [None] when the queue is currently empty. Never waits. *)
let try_recv c =
  Mutex.lock c.m;
  let item =
    match Queue.pop c.q with
    | x -> Some x
    | exception Queue.Empty -> None
  in
  Mutex.unlock c.m;
  item
(* Mark channel [c] as closed and broadcast on its condition variable.
   Messages already queued stay available to [try_recv]. *)
let close_chan c =
  let guard = c.m in
  Mutex.lock guard;
  c.closed <- true;
  Condition.broadcast c.cond;
  Mutex.unlock guard
(* True only when [c] has been closed AND its queue is empty, i.e. nothing
   more can ever be received from it. A closed channel that still holds
   messages reports false. *)
let is_closed c =
  Mutex.lock c.m;
  let exhausted = if c.closed then Queue.is_empty c.q else false in
  Mutex.unlock c.m;
  exhausted
(* --- Approach 1: Non-blocking select loop over two channels --- *)
(* Result of [select] over two channels: [Left v] is a message from the first
   channel, [Right v] a message from the second, and [Both_closed] means both
   channels are closed with nothing left to receive. *)
type ('a, 'b) select_result = Left of 'a | Right of 'b | Both_closed
(* Poll two channels without blocking until one yields a message or both are
   closed and drained.

   [c1] is polled first and [c2] is polled only when [c1] had nothing. The
   two polls must not be combined into one tuple match: that dequeues from
   both channels and silently drops the [c2] message whenever [c1] also had
   one, and the evaluation order of tuple components is unspecified.

   Returns [Left v] for a message from [c1], [Right v] for a message from
   [c2], or [Both_closed] once [is_closed] holds for both channels. While
   neither channel has data and at least one is still open, the thread yields
   and retries. *)
let select c1 c2 =
  let rec loop () =
    match try_recv c1 with
    | Some v -> Left v
    | None ->
      (* Touch c2 only now, so a message is never popped and then discarded. *)
      match try_recv c2 with
      | Some v -> Right v
      | None ->
        if is_closed c1 && is_closed c2 then Both_closed
        else (Thread.yield (); loop ())
  in
  loop ()
(* Self-checking demo: two producer threads feed an int channel and a string
   channel, the main thread drains both through [select], then verifies that
   every message arrived and prints a summary. *)
let () =
  let ints = make_chan () in
  let strs = make_chan () in
  (* Send each item with a 1 ms pause after it, then close the channel. *)
  let produce chan items () =
    List.iter (fun item -> send chan item; Unix.sleepf 0.001) items;
    close_chan chan
  in
  let p1 = Thread.create (produce ints [1;2;3]) () in
  let p2 = Thread.create (produce strs ["a";"b";"c"]) () in
  (* Accumulate messages per origin until both channels are exhausted. *)
  let rec drain from_left from_right =
    match select ints strs with
    | Left v -> drain (v :: from_left) from_right
    | Right v -> drain from_left (v :: from_right)
    | Both_closed -> (from_left, from_right)
  in
  let got_ints, got_strs = drain [] [] in
  Thread.join p1;
  Thread.join p2;
  (* Arrival order is timing-dependent, so compare sorted contents. *)
  let lefts = List.sort compare got_ints in
  let rights = List.sort compare got_strs in
  assert (lefts = [1;2;3]);
  assert (rights = ["a";"b";"c"]);
  Printf.printf "Approach 1 (select): lefts=[%s] rights=[%s]\n"
    (String.concat ";" (List.map string_of_int lefts))
    (String.concat ";" rights)
(* Final banner, printed only if the assertions in the preceding top-level
   block did not raise. The leading glyph was mis-encoded (a UTF-8 check mark
   decoded in the wrong code page) and is restored here. *)
let () = Printf.printf "✓ All tests passed\n"