🦀 Functional Rust

338: Async RwLock

Difficulty: 3 Level: Advanced

Multiple concurrent readers, one exclusive writer: the right lock for read-heavy shared state.

The Problem This Solves

A plain `Mutex` allows only one accessor at a time, even if multiple threads only want to read the same data. For read-heavy workloads (caches, configuration, shared databases), this creates unnecessary contention: every read blocks every other read, even though concurrent reads are perfectly safe.

`RwLock` (read-write lock) solves this: any number of readers can hold the lock simultaneously, but a writer gets exclusive access, with no readers or other writers. This is the right primitive for data that's read often and written rarely: in-memory caches, configuration state, route tables, or, as in this example, a shared key-value store.

In async Rust, the same caveat applies as with `Mutex`: a `std::sync` lock blocks the whole thread while it waits, so it is a poor fit for guards that must be held across an `.await`. `tokio::sync::RwLock` provides `read().await` and `write().await`, which suspend the task, not the thread.
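
The difference between the two locks can be observed directly with the non-blocking `try_*` methods in `std::sync`. The sketch below is illustrative only: the function name `compare_locks` is made up for this example and is not part of the original program.

```rust
use std::sync::{Mutex, RwLock};

/// Returns (second_read_ok, write_blocked_by_reader, second_mutex_lock_blocked).
fn compare_locks() -> (bool, bool, bool) {
    let rw = RwLock::new(0_i32);

    // Hold one read guard, then try to take another: readers can coexist.
    let first = rw.try_read().expect("no writer holds the lock");
    let second_read_ok = rw.try_read().is_ok();

    // A writer cannot get in while a read guard is still alive.
    let write_blocked_by_reader = rw.try_write().is_err();
    drop(first);

    // A Mutex has no notion of shared access: the second attempt fails.
    let mu = Mutex::new(0_i32);
    let held = mu.lock().unwrap();
    let second_mutex_lock_blocked = mu.try_lock().is_err();
    drop(held);

    (second_read_ok, write_blocked_by_reader, second_mutex_lock_blocked)
}
```

All three flags come back `true`: a second reader is admitted, a writer is turned away while a reader is active, and the `Mutex` refuses a second lock even though nothing is being modified.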

The Intuition

Think of a library's book catalog: many people can browse it simultaneously (concurrent reads), but when the librarian updates it (write), everyone has to wait. That's an RwLock. Python's `asyncio` has `asyncio.Lock` but no built-in `RwLock`. JavaScript is single-threaded and doesn't need it. In Rust it's a first-class primitive because the ownership system makes the reader/writer distinction valuable enough to enforce at the type level.
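
As a minimal sketch of what "enforced at the type level" means, the catalog analogy can be written with the two guard types from `std::sync`. The helpers `total` and `add_entry` and the `Vec<u32>` catalog are hypothetical names chosen for illustration.

```rust
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};

/// Browsing the catalog: a read guard only dereferences to `&Vec<u32>`.
fn total(catalog: &RwLock<Vec<u32>>) -> u32 {
    let guard: RwLockReadGuard<'_, Vec<u32>> = catalog.read().unwrap();
    // guard.push(1); // would not compile: no mutable access through a read guard
    guard.iter().sum()
}

/// Updating the catalog: a write guard also dereferences to `&mut Vec<u32>`.
fn add_entry(catalog: &RwLock<Vec<u32>>, entry: u32) {
    let mut guard: RwLockWriteGuard<'_, Vec<u32>> = catalog.write().unwrap();
    guard.push(entry);
}
```

The compiler rejects any attempt to mutate through the read guard, so the reader/writer split is checked before the program ever runs.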

How It Works in Rust

struct SharedDb { data: RwLock<HashMap<String, i32>> }

impl SharedDb {
 // Any number of threads can read() simultaneously
 fn read(&self, k: &str) -> Option<i32> {
     self.data.read().unwrap().get(k).copied()
 }

 // write() blocks until all readers are done, then gives exclusive access
 fn write(&self, k: &str, v: i32) {
     self.data.write().unwrap().insert(k.to_string(), v);
 }

 // update() also takes a write lock: read + modify in one lock acquisition
 fn update(&self, k: &str, f: impl Fn(i32) -> i32) {
     if let Some(v) = self.data.write().unwrap().get_mut(k) {
         *v = f(*v);
     }
 }
}
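
Why `update()` holds a single write lock matters under contention: if it called `read()` and then `write()`, another thread could change the value between the two acquisitions and an update would be lost. The sketch below re-declares a cut-down `SharedDb` so it stands alone; `concurrent_increments` and the `"hits"` key are assumptions made for this example.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

struct SharedDb { data: RwLock<HashMap<String, i32>> }

impl SharedDb {
    // Read and modify under one write lock, so no other thread can interleave.
    fn update(&self, k: &str, f: impl Fn(i32) -> i32) {
        if let Some(v) = self.data.write().unwrap().get_mut(k) {
            *v = f(*v);
        }
    }
}

/// Spawns `threads` workers that each increment "hits" `per_thread` times.
fn concurrent_increments(threads: usize, per_thread: usize) -> Option<i32> {
    let db = Arc::new(SharedDb { data: RwLock::new(HashMap::new()) });
    db.data.write().unwrap().insert("hits".to_string(), 0);

    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let db = Arc::clone(&db);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    db.update("hits", |v| v + 1);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }

    // Bind the result first so the read guard is released before `db` is dropped.
    let result = db.data.read().unwrap().get("hits").copied();
    result
}
```

With 8 threads doing 100 increments each, the final value is always 800, because every increment runs entirely inside one exclusive lock.
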
Concurrent reads don't block each other:
// All 5 threads can hold the read lock at once; readers do not block each other
let handles: Vec<_> = (0..5).map(|_| {
 let db = Arc::clone(&db);
 thread::spawn(move || db.read("x"))
}).collect();
For async code, use `tokio::sync::RwLock`: `db.data.read().await` yields to other tasks while waiting for the lock.

What This Unlocks

Key Differences

| Concept | OCaml | Rust |
|---|---|---|
| RwLock | No stdlib RwLock; use `Mutex` or a custom implementation | `std::sync::RwLock` in stdlib |
| Read guard | N/A | `RwLockReadGuard`: many can coexist |
| Write guard | N/A | `RwLockWriteGuard`: exclusive |
| Async version | `Lwt_rwlock` (community library) | `tokio::sync::RwLock` |

use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

struct SharedDb { data: RwLock<HashMap<String,i32>> }

impl SharedDb {
    fn new() -> Arc<Self> { Arc::new(Self{data:RwLock::new(HashMap::new())}) }
    fn read(&self, k: &str) -> Option<i32> { self.data.read().unwrap().get(k).copied() }
    fn write(&self, k: &str, v: i32) { self.data.write().unwrap().insert(k.to_string(),v); }
    fn update(&self, k: &str, f: impl Fn(i32)->i32) {
        if let Some(v) = self.data.write().unwrap().get_mut(k) { *v = f(*v); }
    }
}

fn main() {
    let db = SharedDb::new();
    db.write("x", 10); db.write("y", 20);
    println!("x={:?}, y={:?}", db.read("x"), db.read("y"));
    db.update("x", |v| v*2);
    println!("x after update: {:?}", db.read("x"));
    // Concurrent reads: each thread gets its own Arc handle to the same db
    let handles: Vec<_> = (0..5)
        .map(|_| {
            let db = Arc::clone(&db);
            thread::spawn(move || db.read("x"))
        })
        .collect();
    let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    println!("All concurrent reads: {results:?}");
}

#[cfg(test)]
mod tests {
    use super::*;
    #[test] fn read_write() { let db = SharedDb::new(); db.write("k",99); assert_eq!(db.read("k"),Some(99)); }
    #[test] fn missing_none() { let db = SharedDb::new(); assert_eq!(db.read("nope"),None); }
    #[test] fn concurrent_reads_ok() {
        let db = SharedDb::new(); db.write("k",7);
        let hs: Vec<_> = (0..10).map(|_| { let db = Arc::clone(&db); thread::spawn(move||db.read("k")) }).collect();
        assert!(hs.into_iter().all(|h| h.join().unwrap()==Some(7)));
    }
}
(* OCaml: RwLock pattern via Mutex *)

let db : (string, int) Hashtbl.t = Hashtbl.create 16
let mu = Mutex.create ()

let read k = Mutex.lock mu; let v = Hashtbl.find_opt db k in Mutex.unlock mu; v
let write k v = Mutex.lock mu; Hashtbl.replace db k v; Mutex.unlock mu

let () =
  write "x" 10; write "y" 20;
  Printf.printf "x=%s\n" (Option.fold ~none:"?" ~some:string_of_int (read "x"))