🦀 Functional Rust

325: Racing Futures with select!

Difficulty: 3 Level: Advanced

`select!` races multiple futures and returns the result of the first one to finish; the others are cancelled (dropped).

The Problem This Solves

You need to fetch data from a slow external API, but you can't let the user wait forever. Without `select!`, you either block until the call returns or hand-roll cancellation logic with shared flags, mutexes, and condition variables, which is surprisingly hard to get right.

`select!` also solves the "try multiple sources" problem: query your primary cache and a fallback simultaneously and return whichever responds first. Or use it for circuit breaking: if a database query takes longer than 100 ms, bail out and serve a cached response rather than holding a connection open.

This is fundamentally different from `join!`, which waits for every future to complete; `select!` races them and discards the losers. The cancelled futures are simply dropped, which in Rust means their destructors run and their resources are released.
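The circuit-breaking idea can be sketched with nothing but the standard library, in the same thread-and-channel style this example uses further down. This is a minimal sketch, not a production pattern: `query_or_cached` is a hypothetical helper name, and the "query" is simulated with `thread::sleep`.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical helper: runs `query` on a worker thread and waits at most
/// `budget` for its result. If the deadline passes first, returns `cached`.
fn query_or_cached<T, F>(query: F, budget: Duration, cached: T) -> T
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // If the receiver has already given up, `send` fails; ignore that.
        let _ = tx.send(query());
    });
    // Either the query's value arrives in time, or we fall back.
    rx.recv_timeout(budget).unwrap_or(cached)
}

fn main() {
    let budget = Duration::from_millis(100);

    // A fast "query" beats the deadline, so its fresh value is used.
    let fresh = query_or_cached(|| "fresh row", budget, "cached row");
    println!("{fresh}");

    // A slow "query" misses the deadline, so the cached value is served.
    let stale = query_or_cached(
        || {
            thread::sleep(Duration::from_millis(500));
            "fresh row"
        },
        budget,
        "cached row",
    );
    println!("{stale}");
}
```

Note that the slow worker thread keeps running after the fallback is returned; only the caller stops waiting. With real futures, dropping the loser would stop the work itself.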

The Intuition

`select!` is like JavaScript's `Promise.race()` or Python's `asyncio.wait(return_when=FIRST_COMPLETED)`: whoever finishes first wins, and the rest are abandoned.
join!:    task1 ──────────────────┐
          task2 ──────────┐       │  → waits for ALL
          task3 ─────┘            ↑ (waits for the slowest)

select!:  task1 ──────────────────
          task2 ──────────┐  ← WINNER (first done)
          task3 ─────────────      → returns immediately, drops others
A note on the "non-determinism" warning in tokio's `select!` docs: if several futures are ready in the same poll, tokio picks among them pseudo-randomly by default, so don't rely on ordering. `select!` is for "I need any one result", not "I need the results in this order."

This example uses `mpsc::channel` plus `recv_timeout` as a synchronous analogy: threads race to send on a channel, and the first message wins.
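The thread analogy hides what "polled together, first one ready wins, loser dropped" looks like at the future level, so here is a toy two-way race built only on `std`. Everything in it is an assumption made for illustration: `Countdown`, `first_of`, and `NoopWake` are hypothetical names, the executor is a naive busy-polling loop, and none of this is how tokio implements `select!`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical toy future: returns `Pending` for `polls_left` polls, then
/// yields `name`. It records its own drop in `dropped` so we can observe it.
struct Countdown {
    name: &'static str,
    polls_left: u32,
    dropped: Arc<AtomicBool>,
}

impl Future for Countdown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.polls_left == 0 {
            return Poll::Ready(self.name);
        }
        self.polls_left -= 1;
        Poll::Pending // the busy loop below will simply poll again
    }
}

impl Drop for Countdown {
    fn drop(&mut self) {
        self.dropped.store(true, Ordering::SeqCst);
    }
}

/// Waker that does nothing; enough for a loop that re-polls unconditionally.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls `a`, then `b`, over and over, and returns the first output.
/// Both futures are dropped when this function returns, finished or not.
fn first_of<A, B, T>(a: A, b: B) -> T
where
    A: Future<Output = T>,
    B: Future<Output = T>,
{
    let mut a = Box::pin(a);
    let mut b = Box::pin(b);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(v) = a.as_mut().poll(&mut cx) {
            return v;
        }
        if let Poll::Ready(v) = b.as_mut().poll(&mut cx) {
            return v;
        }
        std::thread::yield_now();
    }
}

fn main() {
    let slow_dropped = Arc::new(AtomicBool::new(false));
    let fast_dropped = Arc::new(AtomicBool::new(false));
    let slow = Countdown { name: "slow", polls_left: 5, dropped: slow_dropped.clone() };
    let fast = Countdown { name: "fast", polls_left: 2, dropped: fast_dropped.clone() };

    let winner = first_of(slow, fast);
    println!("winner: {winner}"); // prints "winner: fast"
    // The loser never finished, but its destructor has already run.
    println!("slow dropped: {}", slow_dropped.load(Ordering::SeqCst)); // prints "slow dropped: true"
}
```

Because `first_of` always polls `a` before `b`, a tie goes to `a`. That fixed order is a deliberate simplification; it resembles what tokio's `biased;` mode does, whereas the default `select!` randomizes which branch is checked first.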

How It Works in Rust

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Run every task on its own thread; return the label and value of the first to finish.
fn race<T: Send + 'static>(
    tasks: Vec<(Box<dyn FnOnce() -> T + Send>, &'static str)>,
) -> (&'static str, T) {
    let (tx, rx) = mpsc::channel();

    for (f, label) in tasks {
        let tx = tx.clone();
        thread::spawn(move || {
            let _ = tx.send((label, f())); // first to finish sends its result
        });
    }

    rx.recv().unwrap() // returns the first message; others may still be running
}

// Run `f` on a worker thread and wait at most `ms` milliseconds for its result.
fn with_timeout<T: Send + 'static>(f: Box<dyn FnOnce() -> T + Send>, ms: u64) -> Option<T> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let _ = tx.send(f());
    });
    rx.recv_timeout(Duration::from_millis(ms)).ok() // None if the timeout fires first
}
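Note the `let _ = tx.send(...)`: the send error is deliberately ignored. If the receiver has already been dropped (because a winner was returned or the timeout fired), a loser's `send` simply fails, which is fine: no panic, no resource leak. Unlike a dropped future, though, a losing thread is not cancelled; it runs to completion and its result is discarded.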
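If losers doing wasted work matters, the thread version needs the kind of shared flag mentioned earlier. The following is one possible sketch of cooperative cancellation, under stated assumptions: `race_cancellable` and `sliced_work` are hypothetical names, and each task is assumed to check the flag between small slices of work.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::Duration;

/// Hypothetical variant of `race`: every task receives a shared stop flag and
/// is expected to check it, so losers can exit early once a winner is known.
fn race_cancellable<T, F>(tasks: Vec<F>) -> Option<T>
where
    T: Send + 'static,
    F: FnOnce(&AtomicBool) -> Option<T> + Send + 'static,
{
    let stop = Arc::new(AtomicBool::new(false));
    let (tx, rx) = mpsc::channel();
    let mut handles = Vec::new();

    for task in tasks {
        let (tx, stop) = (tx.clone(), Arc::clone(&stop));
        handles.push(thread::spawn(move || {
            if let Some(value) = task(&*stop) {
                let _ = tx.send(value); // losers may fail to send; that is fine
            }
        }));
    }
    drop(tx); // with no senders left, recv() errors instead of hanging

    let winner = rx.recv().ok();
    stop.store(true, Ordering::Relaxed); // ask the losers to give up
    for handle in handles {
        let _ = handle.join(); // returns quickly because losers see the flag
    }
    winner
}

/// Simulated work: `steps` slices of 10 ms, checking the flag before each slice.
fn sliced_work(steps: u32, id: u32) -> impl FnOnce(&AtomicBool) -> Option<u32> + Send + 'static {
    move |stop: &AtomicBool| {
        for _ in 0..steps {
            if stop.load(Ordering::Relaxed) {
                return None; // cancelled: a winner already exists
            }
            thread::sleep(Duration::from_millis(10));
        }
        Some(id)
    }
}

fn main() {
    // Task 1 would need about 500 ms; task 2 needs about 20 ms and wins.
    let winner = race_cancellable(vec![sliced_work(50, 1), sliced_work(2, 2)]);
    println!("{winner:?}"); // prints "Some(2)"
}
```

The cost is visible in the signature: every task has to be written to cooperate. Dropping a future needs no such cooperation, which is much of the appeal of `select!`.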

What This Unlocks

Key Differences

| Concept | OCaml | Rust |
|---|---|---|
| Race futures | no stdlib equivalent | `select!` macro (tokio/futures) |
| First-wins semantics | `Lwt.pick [p1; p2]` | `select!` (first branch wins) |
| Cancellation | exception propagation | losers are `drop`ped (destructors run) |
| Timeout | `Lwt_unix.with_timeout` | `time::timeout(dur, future)` |
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn race<T: Send + 'static>(tasks: Vec<(Box<dyn FnOnce()->T+Send>, &'static str)>) -> (&'static str, T) {
    let (tx, rx) = mpsc::channel();
    for (f, label) in tasks {
        let tx = tx.clone();
        thread::spawn(move || { let _ = tx.send((label, f())); });
    }
    rx.recv().unwrap()
}

fn with_timeout<T: Send + 'static>(f: Box<dyn FnOnce()->T+Send>, ms: u64) -> Option<T> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || { let _ = tx.send(f()); });
    rx.recv_timeout(Duration::from_millis(ms)).ok()
}

fn main() {
    let tasks: Vec<(Box<dyn FnOnce()->i32+Send>, &'static str)> = vec![
        (Box::new(|| {thread::sleep(Duration::from_millis(50)); 1}), "slow"),
        (Box::new(|| {thread::sleep(Duration::from_millis(10)); 2}), "fast"),
    ];
    let (winner, val) = race(tasks);
    println!("Winner: {winner} = {val}");
    println!("Timeout ok: {:?}", with_timeout(Box::new(|| {thread::sleep(Duration::from_millis(5)); 42}), 100));
    println!("Timeout fail: {:?}", with_timeout(Box::new(|| {thread::sleep(Duration::from_millis(200)); 0}), 50));
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn fastest_wins() {
        let tasks: Vec<(Box<dyn FnOnce() -> i32 + Send>, &'static str)> = vec![
            (Box::new(|| { thread::sleep(Duration::from_millis(50)); 1 }), "slow"),
            (Box::new(|| { thread::sleep(Duration::from_millis(5)); 2 }), "fast"),
        ];
        let (_, v) = race(tasks);
        assert_eq!(v, 2);
    }

    #[test]
    fn timeout_succeeds() {
        assert_eq!(
            with_timeout(Box::new(|| { thread::sleep(Duration::from_millis(5)); 99 }), 200),
            Some(99)
        );
    }
}
(* OCaml: racing with threads and a channel *)

let race tasks =
  let ch = Event.new_channel () in
  List.iter (fun f ->
    ignore (Thread.create (fun () -> Event.sync (Event.send ch (f ()))) ())
  ) tasks;
  Event.sync (Event.receive ch)

let () =
  let winner = race [
    (fun () -> Thread.delay 0.05; "slow");
    (fun () -> Thread.delay 0.01; "fast");
    (fun () -> Thread.delay 0.03; "medium");
  ] in
  Printf.printf "Winner: %s\n" winner