342: Async File and Network I/O
Difficulty: 3 Level: Advanced
Perform I/O without blocking your thread: offload blocking work to threads, and use non-blocking sockets for network code.

The Problem This Solves
Standard file I/O in Rust (`std::fs`) is synchronous: your thread sits idle while the kernel copies bytes from disk. In a server or pipeline that handles many concurrent operations, this idle time compounds into added latency and lost throughput. The fix is either a full async runtime (Tokio or async-std) or async-style behavior built from threads and channels.

Network I/O has the same problem but a different solution: `TcpListener` and `TcpStream` support `set_nonblocking(true)`, which makes `accept()` and `read()` return immediately with `WouldBlock` instead of sleeping. This lets you multiplex many connections on one thread using an event loop or selector.

Understanding both approaches, thread-offloaded blocking I/O and non-blocking socket I/O, gives you a mental model of what async runtimes do under the hood.

The Intuition
Blocking I/O is like asking a question and then freezing until you get the answer. Non-blocking I/O is like asking, being told "not yet", and going off to do other work until you check back. Thread offload is the middle ground: you hand the blocking call to another thread so your own thread stays free.

`mpsc::channel` is the glue: the worker thread sends its result down the channel when done, and your thread calls `recv()` at the moment it needs the value.

How It Works in Rust
1. `spawn_io_task`: creates an `mpsc::channel`, spawns a thread that runs the closure and sends its result, and returns the `Receiver`.
2. Do other work while the task runs: the main thread isn't blocked.
3. `rx.recv()`: block only when you actually need the result.
4. Non-blocking listener: call `TcpListener::bind()`, then `set_nonblocking(true)`. `accept()` returns `Err(WouldBlock)` immediately when no connection is waiting.
5. Parallel I/O: collect multiple `Receiver`s, then drain them in order.

let rx = spawn_io_task(|| expensive_read(path));
do_other_work();
let result = rx.recv().unwrap(); // only blocks here
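
The snippet above leaves `expensive_read` and `path` undefined. As a minimal sketch of steps 1 to 3 with a real file, the example below offloads `std::fs::read_to_string` to a worker thread and sends the `io::Result` back through the channel. The helper name `read_file_offloaded` and the scratch file name are assumptions made for illustration; they are not part of the original example.

```rust
use std::fs;
use std::io;
use std::path::PathBuf;
use std::sync::mpsc;
use std::thread;

/// Runs `f` on a new thread and returns a receiver for its result.
fn spawn_io_task<T: Send + 'static>(
    f: impl FnOnce() -> T + Send + 'static,
) -> mpsc::Receiver<T> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // A send error only means the receiver was dropped, so ignore it.
        let _ = tx.send(f());
    });
    rx
}

/// Hypothetical helper: reads a whole file on a worker thread.
/// The I/O error, if any, travels through the channel with the result.
fn read_file_offloaded(path: PathBuf) -> mpsc::Receiver<io::Result<String>> {
    spawn_io_task(move || fs::read_to_string(path))
}

fn main() -> io::Result<()> {
    // Assumed scratch file in the system temp directory.
    let path = std::env::temp_dir().join("async_io_sketch.txt");
    fs::write(&path, "Hello world\nFoo bar baz\n")?;

    let rx = read_file_offloaded(path.clone());
    println!("Doing other work while the read runs...");

    // recv() fails only if the worker panicked before sending.
    let text = rx.recv().expect("worker thread panicked")?;
    println!("Read {} bytes", text.len());

    fs::remove_file(&path)?;
    Ok(())
}
```

Sending an `io::Result` instead of unwrapping inside the worker keeps the error handling on the thread that asked for the data, which is usually where the decision about retrying or reporting belongs.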
What This Unlocks
- Async-style concurrency without a runtime: plain `std::thread` + `mpsc` handles many real-world cases.
- Non-blocking network primitives: understanding `WouldBlock` demystifies how `epoll`/`kqueue`-based runtimes work.
- Parallel pipelines: fan out N independent I/O tasks, collect results in order.
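
To make the `WouldBlock` point concrete, here is a rough sketch of a polling loop over a non-blocking listener. The helper `poll_accept` and its retry parameters are hypothetical, chosen only for this illustration. A real runtime would wait on `epoll` or `kqueue` readiness events rather than sleeping between attempts.

```rust
use std::io;
use std::net::{TcpListener, TcpStream};
use std::thread;
use std::time::Duration;

/// Hypothetical helper: polls a non-blocking listener up to `attempts` times.
/// Returns Ok(None) if every attempt reported WouldBlock.
fn poll_accept(
    listener: &TcpListener,
    attempts: u32,
    pause: Duration,
) -> io::Result<Option<TcpStream>> {
    for _ in 0..attempts {
        match listener.accept() {
            Ok((stream, _peer)) => return Ok(Some(stream)),
            // Nothing queued yet: this is where other work could run.
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => thread::sleep(pause),
            Err(e) => return Err(e),
        }
    }
    Ok(None)
}

fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    listener.set_nonblocking(true)?;
    let addr = listener.local_addr()?;

    // Nobody has connected yet, so polling gives up with None.
    let before = poll_accept(&listener, 3, Duration::from_millis(5))?;
    println!("before connect: accepted = {}", before.is_some());

    // A client connects; the kernel queues it until accept() picks it up.
    let _client = TcpStream::connect(addr)?;
    let after = poll_accept(&listener, 100, Duration::from_millis(10))?;
    println!("after connect: accepted = {}", after.is_some());
    Ok(())
}
```

The `WouldBlock` arm is the interesting one: it is not a failure, only a signal that the thread is free to do something else before trying again.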
Key Differences
| Concept | OCaml | Rust |
|---|---|---|
| Async file read | `Lwt_io.with_file` + `Lwt_io.read` | `thread::spawn` + `mpsc::channel` |
| Non-blocking socket | `Unix.set_nonblock` | `listener.set_nonblocking(true)` |
| `WouldBlock` signal | `Unix.EAGAIN` | `io::ErrorKind::WouldBlock` |
| Parallel tasks | `Lwt.all` / `Lwt_list.map_p` | `Vec<Receiver<T>>` drain |
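
The `Vec<Receiver<T>>` drain in the last row plays roughly the role of `Lwt_list.map_p`: results come back in input order even when the tasks finish out of order. The sketch below illustrates this under the assumption of a hypothetical helper named `map_parallel`. The sleeps stand in for I/O latency.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;
use std::time::Duration;

/// Runs `f` on a new thread and returns a receiver for its result.
fn spawn_io_task<T: Send + 'static>(f: impl FnOnce() -> T + Send + 'static) -> Receiver<T> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let _ = tx.send(f());
    });
    rx
}

/// Hypothetical helper: fan out one task per input, then drain the
/// receivers in submission order.
fn map_parallel<I, T, F>(inputs: Vec<I>, f: F) -> Vec<T>
where
    I: Send + 'static,
    T: Send + 'static,
    F: Fn(I) -> T + Send + Clone + 'static,
{
    // Spawn everything first so the tasks overlap in time.
    let receivers: Vec<Receiver<T>> = inputs
        .into_iter()
        .map(|input| {
            let f = f.clone();
            spawn_io_task(move || f(input))
        })
        .collect();
    // Then wait on each receiver in turn; order follows the inputs.
    receivers
        .into_iter()
        .map(|rx| rx.recv().expect("worker thread panicked"))
        .collect()
}

fn main() {
    // Later inputs sleep less and finish first, yet the output keeps input order.
    let results = map_parallel(vec![30u64, 20, 10], |ms| {
        thread::sleep(Duration::from_millis(ms)); // simulated I/O latency
        ms * 2
    });
    println!("{results:?}"); // prints [60, 40, 20]
}
```

Collecting the receivers into a `Vec` before the first `recv()` matters: a single lazy iterator chain would spawn one task, wait for it, and only then spawn the next.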
Versions
| Directory | Description |
|---|---|
| `std/` | Standard library version using `std::sync`, `std::thread` |
| `tokio/` | Tokio async runtime version using `tokio::sync`, `tokio::spawn` |
Running
# Standard library version
cd std && cargo test
# Tokio version
cd tokio && cargo test
use std::io::{self, Write};
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Offload blocking I/O to a thread (async-style).
// Returns a closure that stands in for a blocking read of `content`.
fn read_string_async(content: String) -> impl FnOnce() -> String {
    move || {
        thread::sleep(Duration::from_millis(1)); // simulate I/O latency
        content
    }
}

// Runs `f` on a new thread and returns the receiving end for its result.
fn spawn_io_task<T: Send + 'static>(f: impl FnOnce() -> T + Send + 'static) -> mpsc::Receiver<T> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // A send error only means the receiver was dropped, so ignore it.
        let _ = tx.send(f());
    });
    rx
}

// Returns (lines, words, chars) for `text`.
fn process_text(text: &str) -> (usize, usize, usize) {
    let lines = text.lines().count();
    let words = text.split_whitespace().count();
    let chars = text.chars().count();
    (lines, words, chars)
}

// Writes `data` into an in-memory buffer; not called from `main`.
fn write_to_buf(buf: &mut Vec<u8>, data: &[u8]) -> io::Result<usize> {
    buf.write(data)
}

fn main() {
    // Simulate async file read
    let content = "Hello world\nFoo bar baz\nOne two three four".to_string();
    let rx = spawn_io_task(read_string_async(content));
    // Do other work while "reading"...
    println!("Doing other work...");
    let text = rx.recv().unwrap();
    let (lines, words, chars) = process_text(&text);
    println!("Lines: {lines}, Words: {words}, Chars: {chars}");

    // Non-blocking socket demo (concept)
    use std::net::TcpListener;
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    listener.set_nonblocking(true).unwrap();
    let addr = listener.local_addr().unwrap();
    println!("Listening on {addr} (non-blocking)");
    // accept() returns WouldBlock immediately: no connections are waiting
    match listener.accept() {
        Ok(_) => println!("Got connection"),
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
            println!("No connections yet (WouldBlock)")
        }
        Err(e) => println!("Error: {e}"),
    }

    // Parallel I/O tasks
    let tasks: Vec<_> = vec![
        "first line\nsecond line",
        "alpha beta gamma delta",
        "one\ntwo\nthree\nfour\nfive",
    ]
    .into_iter()
    .map(|s| {
        let s = s.to_string();
        spawn_io_task(move || process_text(&s))
    })
    .collect();
    for (i, rx) in tasks.into_iter().enumerate() {
        let (l, w, c) = rx.recv().unwrap();
        println!("Task {i}: lines={l} words={w} chars={c}");
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn process_text_counts() {
        let (l, w, c) = process_text("hello world\nfoo");
        assert_eq!(l, 2);
        assert_eq!(w, 3);
        assert_eq!(c, 15);
    }

    #[test]
    fn spawn_io_returns_value() {
        let rx = spawn_io_task(|| 42i32);
        assert_eq!(rx.recv().unwrap(), 42);
    }

    #[test]
    fn nonblocking_listener() {
        let l = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
        l.set_nonblocking(true).unwrap();
        assert!(l.accept().is_err()); // WouldBlock or similar
    }
}

(* OCaml: file and network I/O patterns *)

let read_file_lines filename =
  let ic = open_in filename in
  let rec loop acc =
    match input_line ic with
    | line -> loop (line :: acc)
    | exception End_of_file -> close_in ic; List.rev acc
  in
  loop []

(* Count words separated by spaces or newlines, so that the result agrees
   with the Rust version on the sample text. *)
let count_words text =
  String.split_on_char '\n' text
  |> List.concat_map (String.split_on_char ' ')
  |> List.filter (fun s -> String.length s > 0)
  |> List.length

let process_text text =
  let lines = String.split_on_char '\n' text in
  let words = count_words text in
  (* String.length counts bytes, which equals characters for ASCII input *)
  let chars = String.length text in
  (List.length lines, words, chars)

let () =
  let text = "Hello world\nFoo bar baz\nOne two three four" in
  let (lines, words, chars) = process_text text in
  Printf.printf "Lines: %d, Words: %d, Chars: %d\n" lines words chars

Detailed Comparison
921-async-io: Language Comparison
std vs tokio
| Aspect | std version | tokio version |
|---|---|---|
| Runtime | OS threads via `std::thread` | Async tasks on tokio runtime |
| Synchronization | `std::sync::Mutex`, `Condvar` | `tokio::sync::Mutex`, channels |
| Channels | `std::sync::mpsc::channel` (unbounded; `sync_channel` is bounded) | `tokio::sync::mpsc::channel` (bounded, async; `unbounded_channel` is also available) |
| Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |
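
The "Channels" row is easiest to see in code. As a small std-only sketch (the Tokio side is left out so the example needs no external crate), the following contrasts the unbounded `mpsc::channel` with the bounded `mpsc::sync_channel`. The helper `items_until_full` is a hypothetical name used only here.

```rust
use std::sync::mpsc::{self, TrySendError};

/// Hypothetical helper: counts how many items a bounded std channel of the
/// given capacity accepts before `try_send` reports that it is full.
fn items_until_full(capacity: usize) -> usize {
    // `_rx` must stay alive, otherwise try_send reports Disconnected instead.
    let (tx, _rx) = mpsc::sync_channel::<u32>(capacity);
    let mut accepted = 0;
    loop {
        match tx.try_send(0) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => return accepted,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is still alive"),
        }
    }
}

fn main() {
    // Unbounded: every send succeeds immediately and the queue grows.
    let (tx, rx) = mpsc::channel::<u32>();
    for i in 0..1_000 {
        tx.send(i).expect("receiver is still alive");
    }
    drop(tx); // close the channel so the iterator below terminates
    println!("unbounded channel queued {} items", rx.iter().count());

    // Bounded: the producer is pushed back once the buffer is full.
    println!(
        "bounded channel (capacity 4) accepted {} items",
        items_until_full(4)
    );
}
```

A bounded channel gives back-pressure: with the blocking `send`, a fast producer waits for the consumer instead of growing the queue without limit.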