325: Racing Futures with select!
Difficulty: 3 Level: Advanced `select!` races multiple futures and returns the first one to finish โ the others are cancelled (dropped).The Problem This Solves
You need to fetch data from a slow external API, but you can't let the user wait forever. Without `select!`, you either block forever or implement complex cancellation logic with shared flags, mutexes, and condition variables. Getting that right is surprisingly hard. `select!` also solves the "try multiple sources" problem: hit your primary cache and a fallback simultaneously, return whichever responds first. Or implement circuit breaking โ if a database query takes longer than 100ms, bail out and serve a cached response rather than holding a connection open. This is fundamentally different from `join!` โ `join!` requires all tasks to succeed; `select!` races them and discards losers. The cancelled futures are simply dropped, which in Rust means their destructors run and resources are cleaned up safely.The Intuition
`select!` is like JavaScript's `Promise.race()` or Python's `asyncio.wait(return_when=FIRST_COMPLETED)` โ whoever finishes first wins, the rest are abandoned.join!: task1 โโโโโโโโโโโโโโโโโโโ
task2 โโโโโโโโโโโ โ โ waits for BOTH
task3 โโโโโโ โ (waits for slowest)
select!: task1 โโโโโโโโโโโโโโโโโโ
task2 โโโโโโโโโโโ โ WINNER (first done)
task3 โโโโโโโโโโโโโ โ returns immediately, drops others
The "non-determinism" warning you'll see in tokio's `select!` docs: if multiple futures complete in the same poll cycle, one is chosen arbitrarily. Don't rely on ordering โ `select!` is for "I need any one result", not "I need the results in this order."
This example uses `mpsc::channel` + `recv_timeout` as the synchronous analogy: threads race to send on a channel, first message wins.
How It Works in Rust
fn race<T: Send + 'static>(
tasks: Vec<(Box<dyn FnOnce()->T+Send>, &'static str)>
) -> (&'static str, T) {
let (tx, rx) = mpsc::channel();
for (f, label) in tasks {
let tx = tx.clone();
thread::spawn(move || {
let _ = tx.send((label, f())); // first to finish sends its result
});
}
rx.recv().unwrap() // returns the first message โ others may still be running
}
fn with_timeout<T: Send + 'static>(f: Box<dyn FnOnce()->T+Send>, ms: u64) -> Option<T> {
let (tx, rx) = mpsc::channel();
thread::spawn(move || { let _ = tx.send(f()); });
rx.recv_timeout(Duration::from_millis(ms)).ok() // None if timeout fires first
}
Note `let _ = tx.send(...)` โ we ignore the error because if the receiver was dropped (timeout fired), the losers will just fail to send, which is fine. No panic, no resource leak.
What This Unlocks
- Timeouts: Wrap any async operation with a deadline โ return an error if it doesn't complete in time.
- Fallback sources: Race primary and secondary data sources, use whichever responds first.
- Cancellation: `select!` with a cancellation channel lets you cleanly abort long-running tasks on demand.
Key Differences
| Concept | OCaml | Rust |
|---|---|---|
| Race futures | no stdlib equivalent | `select!` macro (tokio/futures) |
| First-wins semantics | `Lwt.pick [p1; p2]` | `select!` โ first branch wins |
| Cancellation | exception propagation | losers are `drop`ped (destructors run) |
| Timeout | `Lwt_unix.with_timeout` | `time::timeout(dur, future)` |
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn race<T: Send + 'static>(tasks: Vec<(Box<dyn FnOnce()->T+Send>, &'static str)>) -> (&'static str, T) {
let (tx, rx) = mpsc::channel();
for (f, label) in tasks {
let tx = tx.clone();
thread::spawn(move || { let _ = tx.send((label, f())); });
}
rx.recv().unwrap()
}
fn with_timeout<T: Send + 'static>(f: Box<dyn FnOnce()->T+Send>, ms: u64) -> Option<T> {
let (tx, rx) = mpsc::channel();
thread::spawn(move || { let _ = tx.send(f()); });
rx.recv_timeout(Duration::from_millis(ms)).ok()
}
fn main() {
let tasks: Vec<(Box<dyn FnOnce()->i32+Send>, &'static str)> = vec![
(Box::new(|| {thread::sleep(Duration::from_millis(50)); 1}), "slow"),
(Box::new(|| {thread::sleep(Duration::from_millis(10)); 2}), "fast"),
];
let (winner, val) = race(tasks);
println!("Winner: {winner} = {val}");
println!("Timeout ok: {:?}", with_timeout(Box::new(|| {thread::sleep(Duration::from_millis(5)); 42}), 100));
println!("Timeout fail: {:?}", with_timeout(Box::new(|| {thread::sleep(Duration::from_millis(200)); 0}), 50));
}
#[cfg(test)]
mod tests {
use super::*;
#[test] fn fastest_wins() {
let tasks: Vec<(Box<dyn FnOnce()->i32+Send>, &'static str)> = vec![
(Box::new(||{thread::sleep(Duration::from_millis(50));1}), "slow"),
(Box::new(||{thread::sleep(Duration::from_millis(5));2}), "fast"),
];
let (_, v) = race(tasks);
assert_eq!(v, 2);
}
#[test] fn timeout_succeeds() { assert_eq!(with_timeout(Box::new(||{thread::sleep(Duration::from_millis(5));99}), 200), Some(99)); }
}
(* OCaml: racing with threads and a channel *)
let race tasks =
let ch = Event.new_channel () in
List.iter (fun f ->
ignore (Thread.create (fun () -> Event.sync (Event.send ch (f ()))) ())
) tasks;
Event.sync (Event.receive ch)
let () =
let winner = race [
(fun () -> Thread.delay 0.05; "slow");
(fun () -> Thread.delay 0.01; "fast");
(fun () -> Thread.delay 0.03; "medium");
] in
Printf.printf "Winner: %s\n" winner