🦀 Functional Rust

342: Async File and Network I/O

Difficulty: 3 Level: Advanced

Perform I/O without blocking your thread: offload blocking work to threads, and use non-blocking sockets for network code.

The Problem This Solves

Standard file I/O in Rust (`std::fs`) is synchronous: the calling thread sits idle until the kernel has finished the read or write. In a server or pipeline that handles many concurrent operations, that idle time adds up to higher latency and lower throughput. The fix is either a full async runtime (Tokio, async-std) or an approximation of async behavior built from threads and channels.

Network I/O has the same problem but a different solution: `TcpListener` and `TcpStream` support `set_nonblocking(true)`, which makes `accept()` and `read()` return immediately with a `WouldBlock` error instead of sleeping when nothing is ready. This lets you multiplex many connections on one thread using an event loop or selector.

Understanding both approaches, thread-offloaded blocking I/O and non-blocking socket I/O, gives you a mental model of what async runtimes do under the hood.
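To make the multiplexing idea concrete, here is a minimal sketch of one pass of a hand-rolled event loop using only the standard library. The helper name `poll_once` and the `inbox` buffer are assumptions made for illustration; they are not part of the original example. The sketch also ignores closed connections (a read of 0 bytes), which a real loop would remove.

```rust
use std::io::{self, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;
use std::time::Duration;

/// Hypothetical helper: one pass over a non-blocking listener and its connections.
/// Accepts every pending connection, then appends whatever bytes each connection
/// has ready to `inbox`. Returns the number of bytes read in this pass.
fn poll_once(
    listener: &TcpListener,
    conns: &mut Vec<TcpStream>,
    inbox: &mut Vec<u8>,
) -> io::Result<usize> {
    // Accept until the kernel reports that nothing else is queued.
    loop {
        match listener.accept() {
            Ok((stream, _addr)) => {
                // Set the flag explicitly; inheritance from the listener is platform-dependent.
                stream.set_nonblocking(true)?;
                conns.push(stream);
            }
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
            Err(e) => return Err(e),
        }
    }

    let mut total = 0;
    let mut buf = [0u8; 1024];
    for stream in conns.iter_mut() {
        match stream.read(&mut buf) {
            Ok(n) => {
                inbox.extend_from_slice(&buf[..n]);
                total += n;
            }
            // No data yet on this connection: move on instead of sleeping.
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {}
            Err(e) => return Err(e),
        }
    }
    Ok(total)
}

fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    listener.set_nonblocking(true)?;
    let addr = listener.local_addr()?;

    // Two loopback clients, each sending a short message.
    let mut a = TcpStream::connect(addr)?;
    let mut b = TcpStream::connect(addr)?;
    a.write_all(b"ping")?;
    b.write_all(b"pong")?;

    let mut conns = Vec::new();
    let mut inbox = Vec::new();
    // Bounded polling with a short sleep; a real loop would wait on a selector instead.
    for _ in 0..100 {
        poll_once(&listener, &mut conns, &mut inbox)?;
        if inbox.len() >= 8 {
            break;
        }
        thread::sleep(Duration::from_millis(5));
    }
    println!("connections: {}, bytes: {}", conns.len(), inbox.len());
    Ok(())
}
```

Sleeping between passes keeps the sketch short but wastes time; runtimes typically replace it with an OS readiness API such as epoll or kqueue.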

The Intuition

Blocking I/O is like asking a question and then freezing until you get the answer. Non-blocking I/O is like posting a sticky note ("call me when ready") and going off to do other work. Thread offload is the middle ground: you hand the blocking call to another thread so your thread stays free. `mpsc::channel` is the perfect glue: the worker thread sends its result down the channel when done; your thread `recv()`s at exactly the moment it needs the value.
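As a rough illustration of the channel acting as glue, the sketch below hands a job to a worker thread and keeps doing small units of other work, checking the channel with `try_recv()` between units. The helper name `overlap` and the unit counter are assumptions for illustration only; the sleep stands in for useful work.

```rust
use std::sync::mpsc::{self, TryRecvError};
use std::thread;
use std::time::Duration;

/// Hypothetical helper: run `job` on a worker thread and keep working until its
/// result arrives. Returns the result plus how many units of other work were done.
fn overlap<T: Send + 'static>(job: impl FnOnce() -> T + Send + 'static) -> (T, u32) {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // Ignore the send error: it only occurs if the receiver was dropped.
        let _ = tx.send(job());
    });

    let mut units = 0;
    loop {
        match rx.try_recv() {
            // The worker has finished: hand back its value.
            Ok(value) => return (value, units),
            // Not ready yet: do a unit of other work and check again.
            Err(TryRecvError::Empty) => {
                units += 1;
                thread::sleep(Duration::from_millis(1)); // stand-in for useful work
            }
            // The sender was dropped without sending, so the job must have panicked.
            Err(TryRecvError::Disconnected) => panic!("worker exited without sending"),
        }
    }
}

fn main() {
    let (answer, units) = overlap(|| {
        thread::sleep(Duration::from_millis(20)); // simulated slow I/O
        "done"
    });
    println!("result = {answer}, units of other work = {units}");
}
```

When there is no other work to interleave, a plain `rx.recv()` is simpler and avoids the polling loop.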

How It Works in Rust

1. `spawn_io_task`: creates an `mpsc::channel`, spawns a thread that runs the closure and sends its result, and returns the `Receiver`.
2. Do other work while the task runs: the main thread isn't blocked.
3. `rx.recv()`: block only when you actually need the result.
4. Non-blocking listener: call `TcpListener::bind()`, then `set_nonblocking(true)`. `accept()` returns `Err(WouldBlock)` immediately when no connection is waiting.
5. Parallel I/O: collect multiple `Receiver`s, then drain them in order.
let rx = spawn_io_task(|| expensive_read(path));
do_other_work();
let result = rx.recv().unwrap(); // only blocks here
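The snippet above leaves `expensive_read` undefined. One way to fill it in, sketched here under the assumption that the blocking call is `std::fs::read_to_string`, is to send the whole `io::Result` through the channel so the caller sees read errors. `read_all` and the temp-file names are hypothetical and exist only for this illustration; `spawn_io_task` is repeated so the block stands alone.

```rust
use std::fs;
use std::io;
use std::path::PathBuf;
use std::sync::mpsc;
use std::thread;

fn spawn_io_task<T: Send + 'static>(
    f: impl FnOnce() -> T + Send + 'static,
) -> mpsc::Receiver<T> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let _ = tx.send(f());
    });
    rx
}

/// Hypothetical helper: read several files concurrently, one thread per file,
/// and return the results in the same order as the input paths.
fn read_all(paths: Vec<PathBuf>) -> Vec<io::Result<String>> {
    // Collect first so every thread is started before any result is awaited.
    let receivers: Vec<_> = paths
        .into_iter()
        .map(|p| spawn_io_task(move || fs::read_to_string(p)))
        .collect();

    receivers
        .into_iter()
        .map(|rx| {
            // recv() fails only if the worker dropped its sender without sending.
            rx.recv().unwrap_or_else(|_| {
                Err(io::Error::new(io::ErrorKind::Other, "worker thread panicked"))
            })
        })
        .collect()
}

fn main() {
    let dir = std::env::temp_dir();
    let a = dir.join("async_io_sketch_a.txt");
    let b = dir.join("async_io_sketch_b.txt");
    let missing = dir.join("async_io_sketch_missing.txt");
    fs::write(&a, "alpha").unwrap();
    fs::write(&b, "beta").unwrap();

    let results = read_all(vec![a.clone(), missing, b.clone()]);
    for (i, result) in results.into_iter().enumerate() {
        match result {
            Ok(text) => println!("{i}: {} bytes", text.len()),
            Err(e) => println!("{i}: error: {e}"),
        }
    }

    let _ = fs::remove_file(a);
    let _ = fs::remove_file(b);
}
```

Draining in input order keeps results aligned with their paths, at the cost of waiting on a slow early file even when later ones have already finished.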

What This Unlocks

Key Differences

| Concept | OCaml | Rust |
|---|---|---|
| Async file read | `Lwt_io.read_file` | `thread::spawn` + `mpsc::channel` |
| Non-blocking socket | `Unix.set_nonblock` | `listener.set_nonblocking(true)` |
| `WouldBlock` signal | `Unix.EAGAIN` | `io::ErrorKind::WouldBlock` |
| Parallel tasks | `Lwt.all` / `Lwt_list.map_p` | `Vec<Receiver<T>>` drain |

Versions

| Directory | Description |
|---|---|
| `std/` | Standard library version using `std::sync`, `std::thread` |
| `tokio/` | Tokio async runtime version using `tokio::sync`, `tokio::spawn` |

Running

# Standard library version
cd std && cargo test

# Tokio version
cd tokio && cargo test

use std::io::{self, Write};
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Offload blocking I/O to a thread (async-style)
fn read_string_async(content: String) -> impl FnOnce() -> String {
    move || {
        thread::sleep(Duration::from_millis(1)); // simulate I/O latency
        content
    }
}

fn spawn_io_task<T: Send + 'static>(f: impl FnOnce() -> T + Send + 'static)
    -> mpsc::Receiver<T>
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || { let _ = tx.send(f()); });
    rx
}

fn process_text(text: &str) -> (usize, usize, usize) {
    let lines = text.lines().count();
    let words = text.split_whitespace().count();
    let chars = text.chars().count();
    (lines, words, chars)
}

fn write_to_buf(buf: &mut Vec<u8>, data: &[u8]) -> io::Result<usize> {
    buf.write(data)
}

fn main() {
    // Simulate async file read
    let content = "Hello world\nFoo bar baz\nOne two three four".to_string();
    let rx = spawn_io_task(read_string_async(content));

    // Do other work while "reading"...
    println!("Doing other work...");

    let text = rx.recv().unwrap();
    let (lines, words, chars) = process_text(&text);
    println!("Lines: {lines}, Words: {words}, Chars: {chars}");

    // Non-blocking socket demo (concept)
    use std::net::TcpListener;
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    listener.set_nonblocking(true).unwrap();
    let addr = listener.local_addr().unwrap();
    println!("Listening on {addr} (non-blocking)");

    // accept() returns WouldBlock immediately: no connections are waiting
    match listener.accept() {
        Ok(_) => println!("Got connection"),
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
            println!("No connections yet (WouldBlock) ✓")
        }
        Err(e) => println!("Error: {e}"),
    }

    // Parallel I/O tasks
    let tasks: Vec<_> = vec![
        "first line\nsecond line",
        "alpha beta gamma delta",
        "one\ntwo\nthree\nfour\nfive",
    ]
    .into_iter()
    .map(|s| spawn_io_task({ let s = s.to_string(); move || process_text(&s) }))
    .collect();

    for (i, rx) in tasks.into_iter().enumerate() {
        let (l, w, c) = rx.recv().unwrap();
        println!("Task {i}: lines={l} words={w} chars={c}");
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn process_text_counts() {
        let (l, w, c) = process_text("hello world\nfoo");
        assert_eq!(l, 2);
        assert_eq!(w, 3);
        assert_eq!(c, 15);
    }
    #[test]
    fn spawn_io_returns_value() {
        let rx = spawn_io_task(|| 42i32);
        assert_eq!(rx.recv().unwrap(), 42);
    }
    #[test]
    fn nonblocking_listener() {
        let l = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
        l.set_nonblocking(true).unwrap();
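        // With no pending connection, a non-blocking accept() fails with WouldBlock
        let err = l.accept().unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::WouldBlock);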
    }
}

(* OCaml: file and network I/O patterns *)

let read_file_lines filename =
  let ic = open_in filename in
  let rec loop acc =
    match input_line ic with
    | line -> loop (line :: acc)
    | exception End_of_file -> close_in ic; List.rev acc
  in loop []

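let count_words text =
  (* Treat newlines, carriage returns and tabs as separators too,
     so the count matches the whitespace splitting of the Rust version *)
  String.map (fun c -> if c = '\n' || c = '\r' || c = '\t' then ' ' else c) text
  |> String.split_on_char ' '
  |> List.filter (fun s -> String.length s > 0)
  |> List.length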

let process_text text =
  let lines = String.split_on_char '\n' text in
  let words = count_words text in
  let chars = String.length text in
  (List.length lines, words, chars)

let () =
  let text = "Hello world\nFoo bar baz\nOne two three four" in
  let (lines, words, chars) = process_text text in
  Printf.printf "Lines: %d, Words: %d, Chars: %d\n" lines words chars

📊 Detailed Comparison

921-async-io: Language Comparison

std vs tokio

| Aspect | std version | tokio version |
|---|---|---|
| Runtime | OS threads via `std::thread` | Async tasks on tokio runtime |
| Synchronization | `std::sync::Mutex`, `Condvar` | `tokio::sync::Mutex`, channels |
| Channels | `std::sync::mpsc` (unbounded) | `tokio::sync::mpsc` (bounded, async) |
| Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |
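
A note on the Channels row: "unbounded" describes `mpsc::channel`. The standard library also provides `mpsc::sync_channel`, which takes a capacity and blocks the sender while the buffer is full. The sketch below shows that backpressure with plain threads; `bounded_sum` is a hypothetical helper introduced only for this illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: a producer thread sends 1..=count through a channel that
/// buffers at most `cap` items, and the caller sums whatever arrives.
fn bounded_sum(count: u64, cap: usize) -> u64 {
    let (tx, rx) = mpsc::sync_channel::<u64>(cap);

    let producer = thread::spawn(move || {
        for i in 1..=count {
            // send() blocks while the buffer is full, which throttles the producer.
            tx.send(i).expect("receiver hung up");
        }
        // tx is dropped here, which ends the receiver's iteration.
    });

    let total: u64 = rx.iter().sum();
    producer.join().expect("producer panicked");
    total
}

fn main() {
    println!("sum = {}", bounded_sum(100, 2));

    // try_send() reports a full buffer instead of blocking.
    let (tx, _rx) = mpsc::sync_channel::<u8>(1);
    tx.try_send(1).unwrap();
    assert!(matches!(tx.try_send(2), Err(mpsc::TrySendError::Full(2))));
}
```

The thread still blocks when the buffer is full, so this bounds memory use but does not free the thread the way an async `send().await` would.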