/// 754: Testing Async Code — std::thread as async analog
///
/// In real async Rust, use #[tokio::test]:
/// ```ignore
/// #[tokio::test]
/// async fn test_my_handler() {
/// let result = my_async_fn().await;
/// assert_eq!(result, expected);
/// }
/// ```
///
/// This example shows the structural pattern using threads.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
// ── "Async" service modelled with threads ─────────────────────────────────────
/// A unit of work submitted to a [`Worker`] — the analog of an inbound
/// async request.
///
/// Derives `Clone, PartialEq, Eq` (in addition to the original `Debug`)
/// so requests can be duplicated and compared in tests, matching the
/// derive set on [`Response`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Request {
    /// Correlation id; echoed back in the matching [`Response`].
    pub id: u64,
    /// Payload to process.
    pub body: String,
}
/// The worker's answer to a [`Request`], carrying the same `id` so the
/// caller can correlate out-of-order replies.
///
/// Adds `Clone` and `Eq` to the original `Debug, PartialEq` derives so
/// responses can be duplicated and used as hash/set keys in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Response {
    /// Copied from the originating request's `id`.
    pub id: u64,
    /// Processed output.
    pub result: String,
}
/// A worker that processes requests — simulates an async handler.
pub struct Worker {
    // Caller-side end for submitting requests.
    tx: mpsc::Sender<Request>,
    // Caller-side end for collecting responses.
    rx: mpsc::Receiver<Response>,
    // Joined on shutdown so worker panics are surfaced.
    handle: thread::JoinHandle<()>,
}

impl Worker {
    /// Spawn the background thread and wire up both channels.
    pub fn start() -> Self {
        let (request_tx, request_rx) = mpsc::channel::<Request>();
        let (response_tx, response_rx) = mpsc::channel::<Response>();

        // The thread exits on its own once every `Sender<Request>` is
        // dropped: the iterator ends when `recv` would return `Err`.
        let handle = thread::spawn(move || {
            for request in request_rx.iter() {
                // Simulate async processing latency.
                thread::sleep(Duration::from_millis(1));
                let response = Response {
                    id: request.id,
                    result: format!("processed:{}", request.body.to_uppercase()),
                };
                // A send failure just means the receiver hung up; ignore it.
                let _ = response_tx.send(response);
            }
        });

        Worker {
            tx: request_tx,
            rx: response_rx,
            handle,
        }
    }

    /// Submit a request for processing.
    ///
    /// # Panics
    /// Panics if the worker thread has already shut down.
    pub fn send(&self, req: Request) {
        self.tx.send(req).expect("worker channel closed");
    }

    /// Wait up to `timeout` for the next response.
    ///
    /// Returns `None` on timeout or if the worker has disconnected.
    pub fn recv_timeout(&self, timeout: Duration) -> Option<Response> {
        self.rx.recv_timeout(timeout).ok()
    }

    /// Close the request channel and wait for the worker to finish.
    ///
    /// # Panics
    /// Panics if the worker thread itself panicked.
    pub fn shutdown(self) {
        drop(self.tx); // close channel — worker's loop ends
        self.handle.join().expect("worker panicked");
    }
}
// ── Rate limiter with background cleanup (another async analog) ───────────────
use std::sync::{Arc, Mutex};
use std::collections::HashMap;
/// A per-key call counter with a background thread that periodically
/// resets all counts — simulates an async periodic task.
pub struct RateLimiter {
    // Shared with the cleaner thread (which holds only a `Weak`).
    counts: Arc<Mutex<HashMap<String, u32>>>,
    // Maximum allowed calls per key per cleanup window.
    limit: u32,
}

impl RateLimiter {
    /// Create a limiter allowing `limit` calls per key per ~100 ms window.
    ///
    /// Spawns a background cleaner that clears all counts every 100 ms
    /// and exits once this `RateLimiter` has been dropped.
    pub fn new(limit: u32) -> Self {
        let counts = Arc::new(Mutex::new(HashMap::new()));

        // BUG FIX: the previous cleaner exited (`break`) as soon as the
        // map was empty — but the map IS empty at startup and immediately
        // after every `clear()`, so the "periodic" cleanup ran at most
        // once and counts were never reset afterwards. Holding a `Weak`
        // instead ties the cleaner's lifetime to the limiter: it clears
        // every tick and stops when `upgrade()` fails.
        let weak = Arc::downgrade(&counts);
        thread::spawn(move || loop {
            thread::sleep(Duration::from_millis(100));
            match weak.upgrade() {
                Some(counts) => counts.lock().unwrap().clear(),
                None => break, // limiter dropped — stop cleaning
            }
        });

        RateLimiter { counts, limit }
    }

    /// Record one call for `key`.
    ///
    /// Returns `true` while the key is under the limit for the current
    /// window, `false` once the limit is exhausted.
    pub fn allow(&self, key: &str) -> bool {
        let mut guard = self.counts.lock().unwrap();
        let count = guard.entry(key.to_owned()).or_insert(0);
        if *count < self.limit {
            *count += 1;
            true
        } else {
            false
        }
    }
}
/// Demo driver: push a batch of requests through the worker, print the
/// responses, then shut down cleanly.
fn main() {
    let worker = Worker::start();

    // Fire off a small batch of requests.
    for id in 0..5u64 {
        worker.send(Request {
            id,
            body: format!("msg-{}", id),
        });
    }

    // Drain the responses, allowing each up to two seconds to arrive.
    let wait = Duration::from_secs(2);
    for _ in 0..5 {
        if let Some(resp) = worker.recv_timeout(wait) {
            println!("Response {}: {}", resp.id, resp.result);
        }
    }

    worker.shutdown();
    println!("Worker shutdown cleanly.");
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Generous per-receive timeout so slow CI machines don't flake.
    fn timeout() -> Duration {
        Duration::from_secs(2)
    }

    #[test]
    fn worker_processes_single_request() {
        let worker = Worker::start();
        worker.send(Request { id: 1, body: "hello".into() });
        let resp = worker.recv_timeout(timeout()).expect("timed out");
        assert_eq!(resp.id, 1);
        assert_eq!(resp.result, "processed:HELLO");
        worker.shutdown();
    }

    #[test]
    fn worker_processes_multiple_requests() {
        let worker = Worker::start();
        let n = 10u64;
        for i in 0..n {
            worker.send(Request { id: i, body: format!("item-{}", i) });
        }
        let mut ids: Vec<u64> = (0..n)
            .filter_map(|_| worker.recv_timeout(timeout()))
            .map(|r| r.id)
            .collect();
        ids.sort();
        // Stronger than the old length-only check: every id must come
        // back exactly once (arrival order may vary, hence the sort).
        assert_eq!(ids, (0..n).collect::<Vec<u64>>());
        worker.shutdown();
    }

    #[test]
    fn worker_result_contains_uppercase_body() {
        let worker = Worker::start();
        worker.send(Request { id: 99, body: "rust".into() });
        let resp = worker.recv_timeout(timeout()).unwrap();
        assert!(resp.result.contains("RUST"), "got: {}", resp.result);
        worker.shutdown();
    }

    #[test]
    fn rate_limiter_allows_up_to_limit() {
        let rl = RateLimiter::new(3);
        assert!(rl.allow("user:1"));
        assert!(rl.allow("user:1"));
        assert!(rl.allow("user:1"));
        assert!(!rl.allow("user:1")); // 4th rejected
    }

    #[test]
    fn rate_limiter_independent_per_key() {
        let rl = RateLimiter::new(2);
        assert!(rl.allow("a"));
        assert!(rl.allow("a"));
        assert!(!rl.allow("a")); // a exhausted
        assert!(rl.allow("b")); // b independent
        assert!(rl.allow("b"));
    }
}
(* 754: Testing Async Code — OCaml with Lwt-style simulation using threads *)
(* We use Thread + Mutex to model async behavior in stdlib OCaml *)
(* Simulated async handler: pause briefly (as if awaiting I/O), then
   return the message uppercased. *)
let process_message msg =
  let simulated_latency = 0.001 (* seconds *) in
  Thread.delay simulated_latency;
  String.uppercase_ascii msg
(* Worker: reads from inbox, writes to outbox *)
(* Spawns a thread that polls [inbox], runs [process_message] on each
   message, and pushes the result onto [outbox].  Returns
   [(thread, running)]: set [running := false] and [Thread.join thread]
   to stop the worker.
   NOTE(review): [Queue] is not thread-safe and [running] is a plain
   [bool ref]; this appears to rely on the OCaml runtime lock serialising
   access — confirm that is acceptable here, or protect both queues and
   the flag with a [Mutex]. *)
let start_worker inbox outbox =
let running = ref true in
let thread = Thread.create (fun () ->
(* Poll until the caller clears [running]. *)
while !running do
match Queue.take_opt inbox with
| Some msg ->
let result = process_message msg in
Queue.push result outbox
| None ->
(* Inbox empty: back off briefly (0.1 ms) instead of busy-spinning. *)
Thread.delay 0.0001
done
) () in
(thread, running)
(* Driver: push two messages through the worker, collect the results
   within a one-second deadline, and assert both came back uppercased. *)
let () =
let inbox = Queue.create () in
let outbox = Queue.create () in
let (thread, running) = start_worker inbox outbox in
(* Send messages *)
Queue.push "hello" inbox;
Queue.push "world" inbox;
(* Wait for results *)
(* Poll the outbox until both results arrive or the deadline passes;
   sleeping 1 ms between polls keeps the wait loop cheap. *)
let deadline = Unix.gettimeofday () +. 1.0 in
let results = ref [] in
while List.length !results < 2 && Unix.gettimeofday () < deadline do
(match Queue.take_opt outbox with
| Some r -> results := r :: !results
| None -> Thread.delay 0.001)
done;
(* Signal the worker to stop, then join so the process exits cleanly. *)
running := false;
Thread.join thread;
(* Results may arrive in either order; sort before printing/asserting. *)
let sorted = List.sort compare !results in
List.iter (Printf.printf "Result: %s\n") sorted;
assert (List.mem "HELLO" sorted);
assert (List.mem "WORLD" sorted);
Printf.printf "Async-style test passed!\n"