// 996: Timeout Pattern
// Rust: mpsc::recv_timeout — like OCaml's Lwt.pick with sleep
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
// --- Approach 1: recv_timeout on a channel ---
/// Spawns a producer thread that sleeps for `delay_ms` milliseconds and then
/// sends `42`, while the caller waits on the channel for at most `timeout_ms`
/// milliseconds.
///
/// Returns `Ok(42)` when the value arrives in time, `Err("timeout")` when the
/// wait expires first, and `Err("disconnected")` when the sending side goes
/// away without delivering a value.
fn channel_with_timeout(delay_ms: u64, timeout_ms: u64) -> Result<i32, &'static str> {
    let (sender, receiver) = mpsc::channel::<i32>();
    let delay = Duration::from_millis(delay_ms);

    thread::spawn(move || {
        thread::sleep(delay);
        // Best effort: the receiver may already have given up and been dropped.
        let _ = sender.send(42);
    });

    receiver
        .recv_timeout(Duration::from_millis(timeout_ms))
        .map_err(|err| match err {
            mpsc::RecvTimeoutError::Timeout => "timeout",
            mpsc::RecvTimeoutError::Disconnected => "disconnected",
        })
}
// --- Approach 2: Run any function with a timeout via thread ---
/// Runs `f` on a freshly spawned thread and waits up to `timeout` for its
/// result.
///
/// Returns `Some(value)` when `f` finishes in time. Returns `None` when the
/// wait expires or when the worker drops its sender without sending. The
/// worker thread is detached: on timeout it keeps running and its eventual
/// result is discarded.
fn with_timeout<T, F>(timeout: Duration, f: F) -> Option<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (sender, receiver) = mpsc::channel::<T>();

    thread::spawn(move || {
        // A failed send only means the caller stopped waiting.
        let _ = sender.send(f());
    });

    match receiver.recv_timeout(timeout) {
        Ok(value) => Some(value),
        Err(_) => None,
    }
}
// --- Approach 3: First-of-N wins (Lwt.pick analogue) ---
/// Runs every task on its own thread and returns the first result to reach
/// the shared channel, waiting at most `timeout`.
///
/// Returns `None` when no result arrives within `timeout`, or when every
/// sender is gone without a value having been sent (for example, when `tasks`
/// is empty). Slower tasks are not cancelled; their results are discarded.
fn race<T: Send + 'static>(
    tasks: Vec<Box<dyn FnOnce() -> T + Send + 'static>>,
    timeout: Duration,
) -> Option<T> {
    let (sender, receiver) = mpsc::channel::<T>();

    tasks.into_iter().for_each(|job| {
        let sender = sender.clone();
        thread::spawn(move || {
            // Whichever send lands first is the one the receiver picks up.
            let _ = sender.send(job());
        });
    });

    // Only the per-task clones remain, so the channel disconnects once all
    // workers are done.
    drop(sender);

    match receiver.recv_timeout(timeout) {
        Ok(first) => Some(first),
        Err(_) => None,
    }
}
// --- Approach 4: Retry with a per-attempt timeout and exponential backoff ---
/// Calls `f` up to `max_attempts` times, giving each call at most
/// `timeout_per_attempt` to finish (each call goes through `with_timeout`).
///
/// Returns `Ok(value)` from the first attempt that succeeds in time. An
/// attempt that returns `Err` or does not finish in time counts as a failure;
/// the error value itself is discarded. Between failed attempts the caller
/// sleeps `2^attempt` milliseconds (1, 2, 4, ...); there is no sleep after the
/// final attempt. Returns `Err("max attempts exceeded")` once every attempt
/// has failed, including when `max_attempts` is 0.
///
/// Note that only individual attempts are time-limited; no overall deadline
/// across attempts is enforced, and the backoff is not capped.
fn retry_with_deadline<T, E, F>(
    max_attempts: usize,
    timeout_per_attempt: Duration,
    f: F,
) -> Result<T, &'static str>
where
    T: Send + 'static,
    E: Send + 'static,
    F: Fn() -> Result<T, E> + Send + Sync + Clone + 'static,
{
    for attempt in 0..max_attempts {
        // Each attempt gets its own clone because the worker thread takes
        // ownership of the callable.
        if let Some(Ok(value)) = with_timeout(timeout_per_attempt, f.clone()) {
            return Ok(value);
        }

        let is_last_attempt = attempt + 1 >= max_attempts;
        if !is_last_attempt {
            thread::sleep(Duration::from_millis(1 << attempt));
        }
    }

    Err("max attempts exceeded")
}
/// Demonstrates the channel timeout, the `with_timeout` helper and `race`,
/// printing each outcome to stdout.
fn main() {
    /// Boxes a task that sleeps for `ms` milliseconds and then yields `label`.
    fn napper(ms: u64, label: &'static str) -> Box<dyn FnOnce() -> &'static str + Send + 'static> {
        Box::new(move || {
            thread::sleep(Duration::from_millis(ms));
            label
        })
    }

    // Producer delay (10 ms) is far below the timeout (500 ms).
    let fast = channel_with_timeout(10, 500);
    println!("fast op: {:?}", fast);

    // Producer delay (200 ms) exceeds the timeout (50 ms).
    let slow = channel_with_timeout(200, 50);
    println!("slow op: {:?}", slow);

    // Arbitrary closure run under a 100 ms budget.
    let helper_outcome = with_timeout(Duration::from_millis(100), || {
        thread::sleep(Duration::from_millis(10));
        "done"
    });
    println!("with_timeout: {:?}", helper_outcome);

    // Two competing tasks; the first result to arrive is reported.
    let contenders = vec![napper(50, "slow"), napper(5, "fast")];
    let winner = race(contenders, Duration::from_millis(200));
    println!("race winner: {:?}", winner);
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The producer replies after 10 ms, well inside the 500 ms timeout.
    #[test]
    fn test_recv_before_timeout() {
        let result = channel_with_timeout(10, 500);
        assert_eq!(result, Ok(42));
    }

    /// The producer needs 200 ms but the receiver only waits 20 ms.
    #[test]
    fn test_recv_after_timeout() {
        let result = channel_with_timeout(200, 20);
        assert_eq!(result, Err("timeout"));
    }

    /// A closure that finishes inside the budget yields its value.
    #[test]
    fn test_with_timeout_succeeds() {
        let result = with_timeout(Duration::from_millis(500), || {
            thread::sleep(Duration::from_millis(5));
            99i32
        });
        assert_eq!(result, Some(99));
    }

    /// A closure that outlives the budget yields `None`.
    #[test]
    fn test_with_timeout_expires() {
        let result = with_timeout(Duration::from_millis(5), || {
            thread::sleep(Duration::from_millis(100));
            99i32
        });
        assert_eq!(result, None);
    }

    /// Among three sleeping tasks, the one with the shortest sleep is returned.
    #[test]
    fn test_race_fastest_wins() {
        let tasks: Vec<Box<dyn FnOnce() -> u32 + Send + 'static>> = vec![
            Box::new(|| { thread::sleep(Duration::from_millis(50)); 1 }),
            Box::new(|| { thread::sleep(Duration::from_millis(5)); 2 }),
            Box::new(|| { thread::sleep(Duration::from_millis(30)); 3 }),
        ];
        let winner = race(tasks, Duration::from_millis(200));
        assert_eq!(winner, Some(2)); // fastest thread wins
    }

    /// Pins down both `RecvTimeoutError` variants instead of merely checking
    /// that an error occurred.
    #[test]
    fn test_recv_timeout_error_types() {
        let (tx, rx) = mpsc::channel::<i32>();

        // Sender is alive but silent: the wait must expire.
        assert_eq!(
            rx.recv_timeout(Duration::from_millis(1)),
            Err(mpsc::RecvTimeoutError::Timeout)
        );

        // No sender left and nothing buffered: the channel is disconnected.
        drop(tx);
        assert_eq!(
            rx.recv_timeout(Duration::from_millis(1)),
            Err(mpsc::RecvTimeoutError::Disconnected)
        );
    }
}
(* 996: Timeout Pattern *)
(* OCaml: Lwt.pick [operation; Lwt_unix.sleep timeout] concept *)
(* Pure Thread version: run operation in separate thread, wait with timeout *)
(* --- Approach 1: Thread + deadline polling --- *)

(* Outcome of a time-limited computation: the computed value, an expired
   deadline, or the printed form of an exception raised by the computation.
   Note: [Ok] and [Error] shadow the standard library's [result]
   constructors for the rest of this file. *)
type 'a timed_result = Ok of 'a | Timeout | Error of string

(* [with_timeout_secs timeout_s f] runs [f ()] in a new thread and waits at
   most [timeout_s] seconds for it to finish.

   Returns [Ok v] when [f] returns [v] in time, [Error msg] when [f] raises
   (with [msg] from [Printexc.to_string]), and [Timeout] when the deadline
   passes first.

   The standard [Condition] module offers no timed wait, so the caller polls
   the shared result slot in short [Thread.delay] steps. OCaml has no thread
   kill: on timeout the worker is left running and its late result is simply
   never read. In real Lwt, [Lwt.pick] would cancel the losing promise. *)
let with_timeout_secs timeout_s f =
  let poll_interval = 0.001 in
  let outcome = ref None in
  let m = Mutex.create () in
  let _worker = Thread.create (fun () ->
    let v = (try Ok (f ()) with e -> Error (Printexc.to_string e)) in
    Mutex.lock m;
    outcome := Some v;
    Mutex.unlock m
  ) () in
  let deadline = Unix.gettimeofday () +. timeout_s in
  let rec wait () =
    Mutex.lock m;
    let current = !outcome in
    Mutex.unlock m;
    match current with
    | Some v -> v
    | None ->
      let remaining = deadline -. Unix.gettimeofday () in
      if remaining <= 0.0 then Timeout
      else begin
        (* Never sleep past the deadline. *)
        Thread.delay (min remaining poll_interval);
        wait ()
      end
  in
  wait ()
(* --- Approach 1: fast operation completes in time --- *)
(* Demo: a computation that sleeps about 10 ms is given a 1 s budget.
   A [Timeout] here is treated as a failed assertion. *)
let () =
  let quick () = (Unix.sleepf 0.01; 42) in
  let outcome = with_timeout_secs 1.0 quick in
  (match outcome with
   | Timeout -> assert false
   | Error e -> Printf.printf "Error: %s\n" e
   | Ok v ->
     assert (v = 42);
     Printf.printf "Approach 1 (ok): %d\n" v)
(* --- Approach 2: Simulated recv_timeout (channel with deadline) --- *)
(* A minimal mutex-protected FIFO channel: [q] holds pending values, [m]
   guards [q], and [cond] is signalled on every [send]. *)
type 'a chan = { q: 'a Queue.t; m: Mutex.t; cond: Condition.t }

(* Creates an empty channel. *)
let make_chan () =
  { q = Queue.create (); m = Mutex.create (); cond = Condition.create () }

(* [send c v] enqueues [v] on [c] and signals [c.cond]. *)
let send c v =
  Mutex.lock c.m;
  Queue.push v c.q;
  Condition.signal c.cond;
  Mutex.unlock c.m

(* [recv_timeout c timeout_s] dequeues the oldest value from [c], waiting at
   most [timeout_s] seconds for one to appear.

   Returns [Some v] when a value is available before the deadline and [None]
   otherwise. The queue is always checked at least once, so an already
   buffered value is returned even when [timeout_s <= 0].

   The standard [Condition] module has no timed wait, and a plain
   [Condition.wait] would block until the next [send] regardless of the
   deadline. The receiver therefore polls the queue in short [Thread.delay]
   steps, releasing the mutex between checks. *)
let recv_timeout c timeout_s =
  let poll_interval = 0.001 in
  let deadline = Unix.gettimeofday () +. timeout_s in
  let rec poll () =
    Mutex.lock c.m;
    let v = if Queue.is_empty c.q then None else Some (Queue.pop c.q) in
    Mutex.unlock c.m;
    match v with
    | Some _ -> v
    | None ->
      let remaining = deadline -. Unix.gettimeofday () in
      if remaining <= 0.0 then None
      else begin
        (* Never sleep past the deadline. *)
        Thread.delay (min remaining poll_interval);
        poll ()
      end
  in
  poll ()
(* Demo: a producer thread sends 99 after about 20 ms while the consumer asks
   [recv_timeout] for a 1 ms wait, a window chosen to be shorter than the
   producer's delay. Either outcome is reported on stdout. *)
let () =
  let c = make_chan () in
  let producer () = (Unix.sleepf 0.02; send c 99) in
  ignore (Thread.create producer ());
  let outcome = recv_timeout c 0.001 in
  begin match outcome with
  | Some v -> Printf.printf "Approach 2 (got): %d\n" v
  | None -> Printf.printf "Approach 2 (timeout): timed out as expected\n"
  end
(* Final status line, printed after the top-level demos above have run. The
   leading character was mis-encoded in the source; it is restored to a check
   mark. *)
let () = Printf.printf "✓ All tests passed\n"