🦀 Functional Rust

996: Timeout Pattern

Difficulty: Intermediate
Category: Async / Concurrency FP Patterns
Concept: Bound operation time; fail fast if no result arrives within the deadline
Key Insight: `rx.recv_timeout(duration)` is the std primitive; `with_timeout(dur, f)` wraps any `FnOnce` in a thread and waits with `recv_timeout`, roughly analogous to `Lwt.pick [f (); Lwt_unix.sleep t]` (except that the losing thread is not cancelled)

Versions

| Directory | Description |
|---|---|
| `std/` | Standard library version using `std::sync`, `std::thread` |
| `tokio/` | Tokio async runtime version using `tokio::sync`, `tokio::spawn` |

Running

# Standard library version
cd std && cargo test

# Tokio version
cd tokio && cargo test
// 996: Timeout Pattern
// Rust: mpsc::recv_timeout, like OCaml's Lwt.pick with sleep

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// --- Approach 1: recv_timeout on a channel ---
fn channel_with_timeout(delay_ms: u64, timeout_ms: u64) -> Result<i32, &'static str> {
    let (tx, rx) = mpsc::channel::<i32>();

    thread::spawn(move || {
        thread::sleep(Duration::from_millis(delay_ms));
        tx.send(42).ok(); // may fail if receiver timed out and was dropped
    });

    match rx.recv_timeout(Duration::from_millis(timeout_ms)) {
        Ok(v) => Ok(v),
        Err(mpsc::RecvTimeoutError::Timeout) => Err("timeout"),
        Err(mpsc::RecvTimeoutError::Disconnected) => Err("disconnected"),
    }
}

// --- Approach 2: Run any function with a timeout via thread ---
fn with_timeout<T, F>(timeout: Duration, f: F) -> Option<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel::<T>();
    thread::spawn(move || {
        let result = f();
        tx.send(result).ok();
    });
    rx.recv_timeout(timeout).ok()
}

// --- Approach 3: First-of-N wins (Lwt.pick analogue) ---
fn race<T: Send + 'static>(
    tasks: Vec<Box<dyn FnOnce() -> T + Send + 'static>>,
    timeout: Duration,
) -> Option<T> {
    let (tx, rx) = mpsc::channel::<T>();

    for task in tasks {
        let tx = tx.clone();
        thread::spawn(move || {
            let result = task();
            tx.send(result).ok(); // first to arrive wins
        });
    }
    drop(tx); // close original sender

    rx.recv_timeout(timeout).ok()
}

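// --- Approach 4: Retry with a per-attempt timeout and exponential backoff ---
// Note: each attempt gets its own timeout; total time is not capped by one overall deadline.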
fn retry_with_deadline<T, E, F>(
    max_attempts: usize,
    timeout_per_attempt: Duration,
    f: F,
) -> Result<T, &'static str>
where
    T: Send + 'static,
    E: Send + 'static,
    F: Fn() -> Result<T, E> + Send + Sync + Clone + 'static,
{
    for attempt in 0..max_attempts {
        let f = f.clone();
        let result = with_timeout(timeout_per_attempt, move || f());
        match result {
            Some(Ok(v)) => return Ok(v),
            Some(Err(_)) | None => {
                if attempt + 1 < max_attempts {
                    thread::sleep(Duration::from_millis(1 << attempt));
                }
            }
        }
    }
    Err("max attempts exceeded")
}

fn main() {
    // Fast operation: succeeds
    let r = channel_with_timeout(10, 500);
    println!("fast op: {:?}", r);

    // Slow operation: times out
    let r = channel_with_timeout(200, 50);
    println!("slow op: {:?}", r);

    // with_timeout helper
    let r = with_timeout(Duration::from_millis(100), || {
        thread::sleep(Duration::from_millis(10));
        "done"
    });
    println!("with_timeout: {:?}", r);

    // Race: fastest thread wins
    let tasks: Vec<Box<dyn FnOnce() -> &'static str + Send + 'static>> = vec![
        Box::new(|| { thread::sleep(Duration::from_millis(50)); "slow" }),
        Box::new(|| { thread::sleep(Duration::from_millis(5));  "fast" }),
    ];
    let winner = race(tasks, Duration::from_millis(200));
    println!("race winner: {:?}", winner);
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_recv_before_timeout() {
        let result = channel_with_timeout(10, 500);
        assert_eq!(result, Ok(42));
    }

    #[test]
    fn test_recv_after_timeout() {
        let result = channel_with_timeout(200, 20);
        assert_eq!(result, Err("timeout"));
    }

    #[test]
    fn test_with_timeout_succeeds() {
        let result = with_timeout(Duration::from_millis(500), || {
            thread::sleep(Duration::from_millis(5));
            99i32
        });
        assert_eq!(result, Some(99));
    }

    #[test]
    fn test_with_timeout_expires() {
        let result = with_timeout(Duration::from_millis(5), || {
            thread::sleep(Duration::from_millis(100));
            99i32
        });
        assert_eq!(result, None);
    }

    #[test]
    fn test_race_fastest_wins() {
        let tasks: Vec<Box<dyn FnOnce() -> u32 + Send + 'static>> = vec![
            Box::new(|| { thread::sleep(Duration::from_millis(50)); 1 }),
            Box::new(|| { thread::sleep(Duration::from_millis(5)); 2 }),
            Box::new(|| { thread::sleep(Duration::from_millis(30)); 3 }),
        ];
        let winner = race(tasks, Duration::from_millis(200));
        assert_eq!(winner, Some(2)); // fastest thread wins
    }

    #[test]
    fn test_recv_timeout_error_types() {
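        // `_` drops the sender at the end of this statement, so the channel
        // is already disconnected before we try to receive
        let (_, rx) = mpsc::channel::<i32>();
        let err = rx.recv_timeout(Duration::from_millis(1));
        assert_eq!(err, Err(mpsc::RecvTimeoutError::Disconnected));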
    }
}
(* 996: Timeout Pattern *)
(* OCaml: Lwt.pick [operation; Lwt_unix.sleep timeout] concept *)
(* Pure Thread version: run operation in separate thread, wait with timeout *)

(* --- Approach 1: Thread + timed wait via Condition --- *)

type 'a timed_result = Ok of 'a | Timeout | Error of string

let with_timeout_secs timeout_s f =
  let result = ref None in
  let m = Mutex.create () in
  let cond = Condition.create () in
  let deadline = Unix.gettimeofday () +. timeout_s in

  (* Worker: run f, turn exceptions into Error, publish the outcome. *)
  let _worker = Thread.create (fun () ->
    let v = (try Ok (f ()) with e -> Error (Printexc.to_string e)) in
    Mutex.lock m;
    result := Some v;
    Condition.broadcast cond;
    Mutex.unlock m
  ) () in

  (* Timer: the stdlib Condition.wait has no timeout, so a second thread
     wakes the waiter once the wall-clock deadline has passed. *)
  let _timer = Thread.create (fun () ->
    let rec sleep_until () =
      let remaining = deadline -. Unix.gettimeofday () in
      if remaining > 0.0 then (Thread.delay remaining; sleep_until ())
    in
    sleep_until ();
    Mutex.lock m;
    Condition.broadcast cond;
    Mutex.unlock m
  ) () in

  Mutex.lock m;
  while Option.is_none !result && Unix.gettimeofday () < deadline do
    Condition.wait cond m
  done;
  let r = !result in
  Mutex.unlock m;
  (* Note: OCaml has no thread kill, so a late worker keeps running and its
     result is ignored. In real Lwt: Lwt.pick cancels the losing promise. *)
  match r with
  | Some v -> v
  | None -> Timeout

(* --- Approach 1: fast operation completes in time --- *)

let () =
  let r = with_timeout_secs 1.0 (fun () ->
    Unix.sleepf 0.01;
    42
  ) in
  (match r with
  | Ok v -> assert (v = 42); Printf.printf "Approach 1 (ok): %d\n" v
  | Timeout -> assert false
  | Error e -> Printf.printf "Error: %s\n" e)

(* --- Approach 2: Simulated recv_timeout (channel with deadline) --- *)

type 'a chan = { q: 'a Queue.t; m: Mutex.t; cond: Condition.t }

let make_chan () = { q = Queue.create (); m = Mutex.create (); cond = Condition.create () }

let send c v =
  Mutex.lock c.m; Queue.push v c.q;
  Condition.signal c.cond; Mutex.unlock c.m

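let recv_timeout c timeout_s =
  let deadline = Unix.gettimeofday () +. timeout_s in
  (* Condition.wait has no timeout in the stdlib, so a timer thread
     wakes the receiver once the deadline has passed. *)
  let _timer = Thread.create (fun () ->
    let rec sleep_until () =
      let remaining = deadline -. Unix.gettimeofday () in
      if remaining > 0.0 then (Thread.delay remaining; sleep_until ())
    in
    sleep_until ();
    Mutex.lock c.m; Condition.broadcast c.cond; Mutex.unlock c.m
  ) () in
  Mutex.lock c.m;
  while Queue.is_empty c.q && Unix.gettimeofday () < deadline do
    Condition.wait c.cond c.m  (* note the argument order: condition, then mutex *)
  done;
  let v = if Queue.is_empty c.q then None else Some (Queue.pop c.q) in
  Mutex.unlock c.m;
  v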

let () =
  let c = make_chan () in
  let _ = Thread.create (fun () ->
    Unix.sleepf 0.02;
    send c 99
  ) () in
  (* Very short timeout: will miss the send *)
  match recv_timeout c 0.001 with
  | None -> Printf.printf "Approach 2 (timeout): timed out as expected\n"
  | Some v -> Printf.printf "Approach 2 (got): %d\n" v

let () = Printf.printf "✓ All tests passed\n"

📊 Detailed Comparison

Timeout Pattern: Comparison

Core Insight

Timeouts express "I'd rather fail fast than wait forever." In OCaml's Lwt, `Lwt.pick [op; sleep]` races two promises, takes whichever resolves first, and cancels the other. In Rust std, `recv_timeout` is the primitive for channels; for arbitrary operations, run the work in a thread and wait on its channel with `recv_timeout`.
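One detail the `Option`-returning wrapper hides is *why* no value arrived: `recv_timeout` reports `Timeout` and `Disconnected` separately, and the second usually means the worker panicked before sending. The sketch below keeps that distinction. The names `TimeoutError` and `try_with_timeout` are hypothetical and do not appear in the original code; treat this as one possible shape, not the repository's implementation.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Why a bounded call produced no value (hypothetical type for illustration).
#[derive(Debug, PartialEq)]
enum TimeoutError {
    /// The deadline passed before the worker sent a result.
    Elapsed,
    /// The worker dropped its sender without sending, e.g. because it panicked.
    WorkerDied,
}

/// Run `f` on a new thread and wait at most `timeout` for its result.
fn try_with_timeout<T, F>(timeout: Duration, f: F) -> Result<T, TimeoutError>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel::<T>();
    thread::spawn(move || {
        // If the receiver already gave up, the send fails and the value is dropped.
        let _ = tx.send(f());
    });
    rx.recv_timeout(timeout).map_err(|e| match e {
        RecvTimeoutError::Timeout => TimeoutError::Elapsed,
        RecvTimeoutError::Disconnected => TimeoutError::WorkerDied,
    })
}
```

A caller can then retry on `Elapsed` but treat `WorkerDied` as a bug to report, which the `Option` version cannot express.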

OCaml Approach

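  • `Lwt.pick [operation; Lwt_unix.sleep timeout]`: resolves with the first promise to finish and cancels the loser
  • Thread-based: spawn a worker, then `Condition.wait` against a deadline (the stdlib `Condition` has no timed wait, so something else, such as a timer thread, has to wake the waiter)
  • OCaml cannot kill threads, so the worker keeps running after a "timeout"
  • `Unix.gettimeofday` for wall-clock deadline tracking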

Rust Approach

  • `rx.recv_timeout(Duration)` → `Result<T, RecvTimeoutError>`
  • `RecvTimeoutError::Timeout` (deadline passed) vs `RecvTimeoutError::Disconnected` (all senders dropped)
  • `with_timeout(dur, f)` pattern: spawn thread, `recv_timeout`, discard the join handle
  • The "lost" thread keeps running until `f` returns; its receiver is gone by then, so the final `send` fails and is ignored
  • `race(tasks, timeout)` for "first-of-N", the `Lwt.pick` analogue over multiple computations

Comparison Table

| Concept | OCaml (Lwt) | Rust |
|---|---|---|
| Timeout primitive | `Lwt_unix.sleep t` | `rx.recv_timeout(Duration::from_millis(t))` |
| Race two futures | `Lwt.pick [f; sleep t]` | `race(tasks, timeout)` |
| Timeout result | `exception` or `None` | `Err(RecvTimeoutError::Timeout)` |
| Cancellation | Lwt cancels the losing promise | Thread keeps running (can't kill) |
| Timed channel recv | Manual `Condition.wait` with deadline | `rx.recv_timeout(dur)` |
| Wrap arbitrary work | `Lwt.wrap (fun () -> ...)` | `with_timeout(dur, f)` |
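The hand-written deadline loop from the OCaml side has a Rust counterpart when several receives must share one overall deadline, since a fixed `recv_timeout(dur)` per call would restart the clock each time. A minimal sketch, assuming the monotonic `Instant` is used for the deadline; `recv_n_before` is a hypothetical helper, not part of the original code.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Collect up to `n` messages that all arrive within one shared time budget.
/// Returns whatever arrived in time (possibly fewer than `n`).
fn recv_n_before<T>(rx: &Receiver<T>, n: usize, budget: Duration) -> Vec<T> {
    let deadline = Instant::now() + budget;
    let mut out = Vec::with_capacity(n);
    while out.len() < n {
        // Time left until the deadline; zero once it has passed.
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(v) => out.push(v),
            // Stop on either error: the budget is spent or no sender is left.
            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    out
}
```

Recomputing `remaining` before every receive is what keeps the total wait bounded by `budget`, regardless of how many messages arrive.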

std vs tokio

| Aspect | std version | tokio version |
|---|---|---|
| Runtime | OS threads via `std::thread` | Async tasks on tokio runtime |
| Synchronization | `std::sync::Mutex`, `Condvar` | `tokio::sync::Mutex`, channels |
| Channels | `std::sync::mpsc` (`channel` is unbounded; `sync_channel` is bounded) | `tokio::sync::mpsc` (bounded, async) |
| Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |