🦀 Functional Rust

754: Testing Async Functions Conceptually

Difficulty: 3 | Level: Advanced

Test concurrent, time-sensitive logic using the same assertion patterns as sync tests — async tests are just `async fn` functions annotated with `#[tokio::test]`.

The Problem This Solves

Async code is hard to test for several reasons. First, you need a runtime to drive futures to completion — you can't just call `my_async_fn()` and get a result; you have to `.await` it inside an async context. Second, tests involving timeouts, background tasks, or channels can be flaky if not structured carefully. Third, error messages from panics inside async tasks are often confusing. Many developers skip async tests entirely and only test the synchronous parts of their application. This leaves the wiring — the actual async coordination — untested, and bugs in that layer only surface in production. The good news: with `#[tokio::test]`, async tests look almost identical to sync tests. The runtime manages the event loop; you write `async fn my_test()` and use `.await` normally.
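To make the first point concrete, here is a minimal hand-rolled executor using only the standard library — a sketch of the job a runtime does for you (poll the future in a loop until it is ready). Real runtimes add functioning wakers, timers, and I/O on top; this one only exists to show why calling an async fn alone produces nothing.

```rust
use std::future::Future;
use std::pin::pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// The smallest possible "runtime": poll the future with a no-op waker
// until it yields a value. Futures do nothing until something polls them.
fn block_on<F: Future>(fut: F) -> F::Output {
    fn raw_waker() -> RawWaker {
        fn no_op(_: *const ()) {}
        fn clone(_: *const ()) -> RawWaker {
            raw_waker()
        }
        static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, no_op, no_op, no_op);
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    let waker = unsafe { Waker::from_raw(raw_waker()) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => std::thread::yield_now(), // busy-wait; real runtimes park
        }
    }
}

fn main() {
    // Calling an async block only builds a future; block_on drives it.
    let answer = block_on(async { 21 * 2 });
    assert_eq!(answer, 42);
    println!("{answer}");
}
```

This is essentially what `#[tokio::test]` automates: it wraps your `async fn` body in a `block_on` call on a properly built runtime.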

The Intuition

In Python's `asyncio`, you'd use `pytest-asyncio` and mark tests with `@pytest.mark.asyncio`. In JavaScript, Jest handles `async` test functions natively — just `return` a Promise or use `await`. In Rust, the `#[tokio::test]` attribute (or `#[async_std::test]`) wraps your test function in a single-threaded or multi-threaded runtime. Inside, everything works exactly like production async code. Channels, timeouts, `spawn` — all available. This example demonstrates the structural pattern using threads as a concrete analog, so the concepts are visible without a runtime dependency. The real production pattern uses `#[tokio::test]` directly.

How It Works in Rust

Real async testing with Tokio:
// In Cargo.toml:
// [dev-dependencies]
// tokio = { version = "1", features = ["full", "test-util"] }

#[tokio::test]
async fn handler_returns_correct_response() {
 let result = my_async_handler("GET", "/health").await;
 assert_eq!(result.status, 200);
}

#[tokio::test]
async fn timeout_returns_error() {
 use tokio::time::{timeout, Duration};
 let result = timeout(
     Duration::from_millis(100),
     slow_operation(),
 ).await;
 assert!(result.is_err(), "should time out");
}

#[tokio::test]
async fn channel_delivers_messages_in_order() {
 let (tx, mut rx) = tokio::sync::mpsc::channel(10);
 for i in 0..5u64 {
     tx.send(i).await.unwrap();
 }
 for expected in 0..5u64 {
     assert_eq!(rx.recv().await.unwrap(), expected);
 }
}
The thread-based analog (this example's approach — no runtime dependency):
#[test]
fn worker_processes_single_request() {
 let worker = Worker::start();          // spawns a background thread
 worker.send(Request { id: 1, body: "hello".into() });
 let resp = worker.recv_timeout(Duration::from_secs(2))
     .expect("timed out waiting for response");
 assert_eq!(resp.id, 1);
 assert_eq!(resp.result, "processed:HELLO");
 worker.shutdown();                     // clean teardown
}
Key points:

- `recv_timeout` bounds every wait, so a stuck worker fails the test instead of hanging the whole test run.
- An explicit `shutdown()` joins the background thread, so a worker panic surfaces in the test rather than being silently lost.
- The assertions themselves (`assert_eq!`, `.expect`) are identical to those in ordinary synchronous tests.

What This Unlocks

Once the coordination layer has tests, the behaviors that usually only break in production — timeouts firing, messages arriving out of order, workers failing to shut down — fail fast on your machine instead. And because the assertion vocabulary is the same as for sync code, there is no separate testing skill to learn.

Key Differences

| Concept | OCaml | Rust |
|---|---|---|
| Async test runner | `Lwt_main.run` in test | `#[tokio::test]` attribute |
| Async channels | `Lwt_mvar`, `Lwt_stream` | `tokio::sync::mpsc`, `watch`, `broadcast` |
| Timeouts in tests | `Lwt_unix.sleep` | `tokio::time::timeout` + `pause`/`advance` |
| Parallel test tasks | `Lwt.both` | `tokio::join!` or `tokio::spawn` |
| Teardown | `Lwt_main.run (cleanup ())` | `worker.shutdown()` or `Drop` impl |
| No-runtime analog | Threads + channels | `std::sync::mpsc` + `thread::spawn` |
/// 754: Testing Async Code — std::thread as async analog
///
/// In real async Rust, use #[tokio::test]:
/// ```ignore
/// #[tokio::test]
/// async fn test_my_handler() {
///     let result = my_async_fn().await;
///     assert_eq!(result, expected);
/// }
/// ```
///
/// This example shows the structural pattern using threads.

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// ── "Async" service modelled with threads ─────────────────────────────────

#[derive(Debug)]
pub struct Request {
    pub id:   u64,
    pub body: String,
}

#[derive(Debug, PartialEq)]
pub struct Response {
    pub id:     u64,
    pub result: String,
}

/// A worker that processes requests — simulates an async handler.
pub struct Worker {
    tx:     mpsc::Sender<Request>,
    rx:     mpsc::Receiver<Response>,
    handle: thread::JoinHandle<()>,
}

impl Worker {
    pub fn start() -> Self {
        let (req_tx, req_rx) = mpsc::channel::<Request>();
        let (res_tx, res_rx) = mpsc::channel::<Response>();

        let handle = thread::spawn(move || {
            while let Ok(req) = req_rx.recv() {
                // Simulate async processing
                thread::sleep(Duration::from_millis(1));
                let result = format!("processed:{}", req.body.to_uppercase());
                let _ = res_tx.send(Response { id: req.id, result });
            }
        });

        Worker { tx: req_tx, rx: res_rx, handle }
    }

    pub fn send(&self, req: Request) {
        self.tx.send(req).expect("worker channel closed");
    }

    pub fn recv_timeout(&self, timeout: Duration) -> Option<Response> {
        self.rx.recv_timeout(timeout).ok()
    }

    pub fn shutdown(self) {
        drop(self.tx);  // close channel → worker exits loop
        self.handle.join().expect("worker panicked");
    }
}

// ── Rate limiter with background cleanup (another async analog) ───────────

use std::sync::{Arc, Mutex};
use std::collections::HashMap;

pub struct RateLimiter {
    counts: Arc<Mutex<HashMap<String, u32>>>,
    limit:  u32,
}

impl RateLimiter {
    pub fn new(limit: u32) -> Self {
        let counts = Arc::new(Mutex::new(HashMap::new()));
        let counts2 = Arc::clone(&counts);

        // Background cleaner — simulates an async periodic task. Each tick
        // wipes all counts (a crude fixed-window limiter). If the map is
        // empty at a tick, the thread exits instead of looping forever.
        thread::spawn(move || {
            loop {
                thread::sleep(Duration::from_millis(100));
                let mut guard = counts2.lock().unwrap();
                if guard.is_empty() { break; }
                guard.clear();
            }
        });

        RateLimiter { counts, limit }
    }

    pub fn allow(&self, key: &str) -> bool {
        let mut guard = self.counts.lock().unwrap();
        let count = guard.entry(key.to_owned()).or_insert(0);
        if *count < self.limit {
            *count += 1;
            true
        } else {
            false
        }
    }
}

fn main() {
    let worker = Worker::start();

    for i in 0..5u64 {
        worker.send(Request { id: i, body: format!("msg-{}", i) });
    }

    let timeout = Duration::from_secs(2);
    for _ in 0..5 {
        if let Some(resp) = worker.recv_timeout(timeout) {
            println!("Response {}: {}", resp.id, resp.result);
        }
    }

    worker.shutdown();
    println!("Worker shutdown cleanly.");
}

#[cfg(test)]
mod tests {
    use super::*;

    fn timeout() -> Duration { Duration::from_secs(2) }

    #[test]
    fn worker_processes_single_request() {
        let worker = Worker::start();
        worker.send(Request { id: 1, body: "hello".into() });
        let resp = worker.recv_timeout(timeout()).expect("timed out");
        assert_eq!(resp.id, 1);
        assert_eq!(resp.result, "processed:HELLO");
        worker.shutdown();
    }

    #[test]
    fn worker_processes_multiple_requests() {
        let worker = Worker::start();
        let n = 10u64;
        for i in 0..n {
            worker.send(Request { id: i, body: format!("item-{}", i) });
        }
        let mut ids: Vec<u64> = (0..n)
            .filter_map(|_| worker.recv_timeout(timeout()))
            .map(|r| r.id)
            .collect();
        ids.sort();
        assert_eq!(ids, (0..n).collect::<Vec<_>>(), "every request should get a response");
        worker.shutdown();
    }

    #[test]
    fn worker_result_contains_uppercase_body() {
        let worker = Worker::start();
        worker.send(Request { id: 99, body: "rust".into() });
        let resp = worker.recv_timeout(timeout()).unwrap();
        assert!(resp.result.contains("RUST"), "got: {}", resp.result);
        worker.shutdown();
    }

    #[test]
    fn rate_limiter_allows_up_to_limit() {
        let rl = RateLimiter::new(3);
        assert!(rl.allow("user:1"));
        assert!(rl.allow("user:1"));
        assert!(rl.allow("user:1"));
        assert!(!rl.allow("user:1"));  // 4th rejected
    }

    #[test]
    fn rate_limiter_independent_per_key() {
        let rl = RateLimiter::new(2);
        assert!(rl.allow("a"));
        assert!(rl.allow("a"));
        assert!(!rl.allow("a"));       // a exhausted
        assert!(rl.allow("b"));        // b independent
        assert!(rl.allow("b"));
    }
}
(* 754: Testing Async Code — OCaml with Lwt-style simulation using threads *)
(* We use Thread + Mutex to model async behavior in stdlib OCaml. The
   stdlib Queue is not thread-safe, so every queue access is guarded. *)

let process_message msg =
  (* Simulate async work *)
  Thread.delay 0.001;  (* 1ms *)
  String.uppercase_ascii msg

let with_lock lock f =
  Mutex.lock lock;
  let result = f () in
  Mutex.unlock lock;
  result

(* Worker: reads from inbox, writes to outbox; [lock] guards both queues *)
let start_worker lock inbox outbox =
  let running = ref true in
  let thread = Thread.create (fun () ->
    while !running do
      match with_lock lock (fun () -> Queue.take_opt inbox) with
      | Some msg ->
        let result = process_message msg in
        with_lock lock (fun () -> Queue.push result outbox)
      | None ->
        Thread.delay 0.0001
    done
  ) () in
  (thread, running)

let () =
  let lock   = Mutex.create () in
  let inbox  = Queue.create () in
  let outbox = Queue.create () in
  let (thread, running) = start_worker lock inbox outbox in

  (* Send messages *)
  with_lock lock (fun () ->
    Queue.push "hello" inbox;
    Queue.push "world" inbox);

  (* Wait for results, with a deadline so a broken worker can't hang us *)
  let deadline = Unix.gettimeofday () +. 1.0 in
  let results = ref [] in
  while List.length !results < 2 && Unix.gettimeofday () < deadline do
    (match with_lock lock (fun () -> Queue.take_opt outbox) with
    | Some r -> results := r :: !results
    | None   -> Thread.delay 0.001)
  done;

  running := false;
  Thread.join thread;

  let sorted = List.sort compare !results in
  List.iter (Printf.printf "Result: %s\n") sorted;
  assert (List.mem "HELLO" sorted);
  assert (List.mem "WORLD" sorted);
  Printf.printf "Async-style test passed!\n"