๐Ÿฆ€ Functional Rust

985: Select Pattern

Difficulty: Intermediate Category: Async / Concurrency FP Patterns Concept: Poll multiple channels non-blocking, handle whichever has data Key Insight: `try_recv()` returns immediately with `Ok(v)` or `Err(Empty|Disconnected)` โ€” loop over all channels with `yield_now()` for a portable select without external crates

Versions

DirectoryDescription
`std/`Standard library version using `std::sync`, `std::thread`
`tokio/`Tokio async runtime version using `tokio::sync`, `tokio::spawn`

Running

# Standard library version
cd std && cargo test

# Tokio version
cd tokio && cargo test
// 985: Select Pattern โ€” Poll Multiple Channels
// Rust: try_recv loop for non-blocking select over multiple channels

use std::sync::mpsc::{self, TryRecvError};
use std::thread;
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum Selected<A, B> {
    Left(A),
    Right(B),
    BothClosed,
}

// --- Non-blocking select over two channels ---
fn select<A, B>(
    rx1: &mpsc::Receiver<A>,
    rx2: &mpsc::Receiver<B>,
) -> Selected<A, B> {
    let mut r1_closed = false;
    let mut r2_closed = false;
    loop {
        if !r1_closed {
            match rx1.try_recv() {
                Ok(v) => return Selected::Left(v),
                Err(TryRecvError::Disconnected) => r1_closed = true,
                Err(TryRecvError::Empty) => {}
            }
        }
        if !r2_closed {
            match rx2.try_recv() {
                Ok(v) => return Selected::Right(v),
                Err(TryRecvError::Disconnected) => r2_closed = true,
                Err(TryRecvError::Empty) => {}
            }
        }
        if r1_closed && r2_closed {
            return Selected::BothClosed;
        }
        thread::yield_now();
    }
}

// --- Drain both channels via select, categorizing messages ---
fn select_drain(rx1: mpsc::Receiver<i32>, rx2: mpsc::Receiver<String>) -> (Vec<i32>, Vec<String>) {
    let mut lefts = Vec::new();
    let mut rights = Vec::new();

    loop {
        match select(&rx1, &rx2) {
            Selected::Left(v) => lefts.push(v),
            Selected::Right(v) => rights.push(v),
            Selected::BothClosed => break,
        }
    }
    (lefts, rights)
}

// --- Priority select: prefer channel 1 when both have data ---
fn priority_recv<T>(high: &mpsc::Receiver<T>, low: &mpsc::Receiver<T>) -> Option<(T, bool)> {
    // true = came from high priority
    match high.try_recv() {
        Ok(v) => Some((v, true)),
        Err(_) => low.try_recv().ok().map(|v| (v, false)),
    }
}


#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_select_drain() {
        let (tx1, rx1) = mpsc::channel::<i32>();
        let (tx2, rx2) = mpsc::channel::<String>();

        for i in [1, 2, 3] { tx1.send(i).unwrap(); }
        for s in ["a", "b", "c"] { tx2.send(s.to_string()).unwrap(); }
        drop(tx1); drop(tx2);

        let (mut lefts, mut rights) = select_drain(rx1, rx2);
        lefts.sort();
        rights.sort();
        assert_eq!(lefts, vec![1, 2, 3]);
        assert_eq!(rights, vec!["a", "b", "c"]);
    }

    #[test]
    fn test_both_closed() {
        let (tx1, rx1) = mpsc::channel::<i32>();
        let (tx2, rx2) = mpsc::channel::<i32>();
        drop(tx1); drop(tx2);
        assert_eq!(select(&rx1, &rx2), Selected::BothClosed);
    }

    #[test]
    fn test_priority_recv() {
        let (htx, hrx) = mpsc::channel::<i32>();
        let (ltx, lrx) = mpsc::channel::<i32>();

        htx.send(10).unwrap();
        ltx.send(20).unwrap();

        // High priority wins
        let result = priority_recv(&hrx, &lrx);
        assert_eq!(result, Some((10, true)));

        // Now only low available
        let result2 = priority_recv(&hrx, &lrx);
        assert_eq!(result2, Some((20, false)));
    }

    #[test]
    fn test_select_empty_left() {
        let (_tx1, rx1) = mpsc::channel::<i32>();
        let (tx2, rx2) = mpsc::channel::<i32>();
        tx2.send(99).unwrap();
        drop(tx2);
        // rx1 never closes so we'll get Right(99) first
        assert_eq!(select(&rx1, &rx2), Selected::Right(99));
    }
}
(* 985: Select Pattern โ€” Poll Multiple Channels *)
(* OCaml: Lwt.pick / try_receive with non-blocking checks *)

type 'a chan = {
  q: 'a Queue.t;
  m: Mutex.t;
  cond: Condition.t;
  mutable closed: bool;
}

let make_chan () = { q = Queue.create (); m = Mutex.create ();
                     cond = Condition.create (); closed = false }

let send c v =
  Mutex.lock c.m;
  Queue.push v c.q;
  Condition.signal c.cond;
  Mutex.unlock c.m

let try_recv c =
  Mutex.lock c.m;
  let v = if Queue.is_empty c.q then None else Some (Queue.pop c.q) in
  Mutex.unlock c.m;
  v

let close_chan c =
  Mutex.lock c.m;
  c.closed <- true;
  Condition.broadcast c.cond;
  Mutex.unlock c.m

let is_closed c =
  Mutex.lock c.m;
  let v = c.closed && Queue.is_empty c.q in
  Mutex.unlock c.m;
  v

(* --- Approach 1: Non-blocking select loop over two channels --- *)

type ('a, 'b) select_result = Left of 'a | Right of 'b | Both_closed

let select c1 c2 =
  let rec loop () =
    match try_recv c1, try_recv c2 with
    | Some v, _ -> Left v
    | None, Some v -> Right v
    | None, None ->
      if is_closed c1 && is_closed c2 then Both_closed
      else (Thread.yield (); loop ())
  in
  loop ()

let () =
  let c1 = make_chan () in
  let c2 = make_chan () in

  (* Producer for c1 *)
  let p1 = Thread.create (fun () ->
    List.iter (fun i -> send c1 i; Unix.sleepf 0.001) [1;2;3];
    close_chan c1
  ) () in

  (* Producer for c2 *)
  let p2 = Thread.create (fun () ->
    List.iter (fun s -> send c2 s; Unix.sleepf 0.001) ["a";"b";"c"];
    close_chan c2
  ) () in

  let lefts = ref [] and rights = ref [] in
  let rec drain () =
    match select c1 c2 with
    | Left v -> lefts := v :: !lefts; drain ()
    | Right v -> rights := v :: !rights; drain ()
    | Both_closed -> ()
  in
  drain ();
  Thread.join p1; Thread.join p2;

  let lefts = List.sort compare !lefts in
  let rights = List.sort compare !rights in
  assert (lefts = [1;2;3]);
  assert (rights = ["a";"b";"c"]);
  Printf.printf "Approach 1 (select): lefts=[%s] rights=[%s]\n"
    (String.concat ";" (List.map string_of_int lefts))
    (String.concat ";" rights)

let () = Printf.printf "โœ“ All tests passed\n"

๐Ÿ“Š Detailed Comparison

Select Pattern โ€” Comparison

Core Insight

`select` picks from whichever channel is ready first. In async runtimes this is a syscall (`epoll`/`kqueue`); in pure std Rust we spin with `try_recv` + `yield_now`. In OCaml, `Lwt.pick` or `Event.select` provides similar semantics.

OCaml Approach

  • `Lwt.pick [p1; p2]` returns the first promise to resolve, cancels others
  • `Event.select [ev1; ev2]` for `Thread`/`Event` module
  • Non-blocking: no built-in `try_receive` โ€” must use `Thread.create` + timeout tricks
  • `try_recv` simulation: poll with `Unix.select` for I/O events

Rust Approach

  • `rx.try_recv()` is non-blocking: `Ok(v)` | `Err(Empty)` | `Err(Disconnected)`
  • Loop over all receivers, `yield_now()` when all empty
  • Priority select: check high-priority channel first
  • For true async select: `crossbeam::select!` macro (external crate)
  • For async/await: `tokio::select!` or `futures::select!`

Comparison Table

ConceptOCamlRust (std)
Select first ready`Lwt.pick [p1; p2]``try_recv` spin loop
Non-blocking recvNo built-in (use timeout)`rx.try_recv()`
Distinguish sourcesPattern match on promise listMatch on `(r1, r2)` tuples
Cancel others`Lwt.pick` cancels losersJust ignore other channels
Priority channelsNot built-inCheck high first in loop
Efficient (no spin)`Lwt.pick` event-driven`crossbeam::select!` (external)

std vs tokio

Aspectstd versiontokio version
RuntimeOS threads via `std::thread`Async tasks on tokio runtime
Synchronization`std::sync::Mutex`, `Condvar``tokio::sync::Mutex`, channels
Channels`std::sync::mpsc` (unbounded)`tokio::sync::mpsc` (bounded, async)
BlockingThread blocks on lock/recvTask yields, runtime switches tasks
OverheadOne OS thread per taskMany tasks per thread (M:N)
Best forCPU-bound, simple concurrencyI/O-bound, high-concurrency servers