468: Lock-Free Queue
Difficulty: 3 | Level: Intermediate
Enqueue and dequeue without mutexes, using compare-and-swap loops on atomic pointers.

The Problem This Solves
A `Mutex<VecDeque>` queue serialises every enqueue and dequeue. Under high throughput (millions of messages per second) threads spend more time waiting for the lock than doing real work, and the kernel context switch to park a blocked thread costs microseconds. A lock-free queue avoids that entirely: multiple threads enqueue and dequeue concurrently using the CPU's atomic compare-and-swap (CAS) instruction. If two threads race, one wins and the other retries; no thread ever blocks. In the uncontended case (the common case) the fast path is a handful of atomic operations. This is the pattern behind `std::sync::mpsc` and the core data path in async runtimes.

The Intuition
Imagine two people sharing a whiteboard. Instead of taking turns with a lock on the whiteboard, each one writes their message in a specific spot and uses a sticky note to say "next message is over there." If two people try to update the sticky note simultaneously, only one succeeds; the other looks again and tries a different spot. Nobody waits; they just retry.

How It Works in Rust
The Michael-Scott queue is the classic algorithm:

1. Node structure: each node holds a value and an `AtomicPtr` to the next node.
2. Head and tail: the queue maintains an `AtomicPtr<Node>` for each. Head points at a sentinel (dummy) node.
3. Enqueue: allocate a new node, then CAS the current tail's `next` pointer from null to the new node, and finally swing the tail pointer forward:

loop {
    let tail = self.tail.load(Acquire);
    let next = (*tail).next.load(Acquire);
    if next.is_null() {
        if (*tail).next.compare_exchange(null_mut(), node, Release, Relaxed).is_ok() {
            // best effort: a racing thread may swing the tail for us
            self.tail.compare_exchange(tail, node, Release, Relaxed).ok();
            return;
        }
    } else {
        // tail lags behind; help advance it before retrying
        self.tail.compare_exchange(tail, next, Release, Relaxed).ok();
    }
}
4. Dequeue: CAS the head sentinel forward; the old sentinel's value becomes the returned item.
5. Memory safety: use `crossbeam-epoch` to defer-free unlinked nodes safely (see example 467).
What This Unlocks
- High-throughput messaging: uncontended paths hit L1 cache; no system calls, no context switches.
- Progress guarantees: lock-free means at least one thread always makes forward progress even if others stall.
- Foundation for channels: production MPSC queues (like the internals of `tokio::sync::mpsc`) build on this.
Key Differences
| Concept | OCaml | Rust |
|---|---|---|
| Lock-free queue | Not available in stdlib | `AtomicPtr` + CAS loop |
| Memory reclamation | GC | Epoch-based (`crossbeam-epoch`) |
| Ordering guarantees | Sequential consistency (GC barrier) | Explicit `Acquire`/`Release` |
| Practical crate | (none in stdlib) | `crossbeam-queue::SegQueue` |
// 468. Lock-free queue basics (Michael-Scott, simplified)
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc;
use std::thread;

// A node owns its value (None only for the sentinel) and an atomic link.
struct Node<T> { value: Option<T>, next: AtomicPtr<Node<T>> }

impl<T> Node<T> {
    fn new(v: Option<T>) -> *mut Self {
        Box::into_raw(Box::new(Node { value: v, next: AtomicPtr::new(ptr::null_mut()) }))
    }
}

pub struct Queue<T> { head: AtomicPtr<Node<T>>, tail: AtomicPtr<Node<T>> }

// Raw pointers are not Send/Sync; assert thread safety manually.
unsafe impl<T: Send> Send for Queue<T> {}
unsafe impl<T: Send> Sync for Queue<T> {}

impl<T> Queue<T> {
    pub fn new() -> Self {
        // Head and tail both start at a shared sentinel node.
        let dummy = Node::new(None);
        Queue { head: AtomicPtr::new(dummy), tail: AtomicPtr::new(dummy) }
    }

    pub fn enqueue(&self, v: T) {
        let n = Node::new(Some(v));
        loop {
            let t = self.tail.load(Ordering::Acquire);
            let next = unsafe { (*t).next.load(Ordering::Acquire) };
            if next.is_null() {
                // Tail really is last: try to link the new node after it.
                if unsafe { (*t).next.compare_exchange_weak(ptr::null_mut(), n, Ordering::Release, Ordering::Relaxed) }.is_ok() {
                    // Best effort: a racing thread may have swung tail already.
                    let _ = self.tail.compare_exchange(t, n, Ordering::Release, Ordering::Relaxed);
                    return;
                }
            } else {
                // Tail lags behind; help advance it, then retry.
                let _ = self.tail.compare_exchange(t, next, Ordering::Release, Ordering::Relaxed);
            }
        }
    }

    pub fn dequeue(&self) -> Option<T> {
        loop {
            let h = self.head.load(Ordering::Acquire);
            let t = self.tail.load(Ordering::Acquire);
            let next = unsafe { (*h).next.load(Ordering::Acquire) };
            if h == t {
                if next.is_null() { return None; } // empty
                // Tail lags behind; help advance it.
                let _ = self.tail.compare_exchange(t, next, Ordering::Release, Ordering::Relaxed);
            } else if self.head.compare_exchange_weak(h, next, Ordering::AcqRel, Ordering::Relaxed).is_ok() {
                // We won the CAS: `next` is the new sentinel. Take the value
                // out so freeing the node later cannot double-drop it.
                let v = unsafe { (*next).value.take() };
                // Freeing `h` immediately is only safe because this demo never
                // dequeues concurrently; real code must defer reclamation
                // (crossbeam-epoch, see example 467).
                unsafe { drop(Box::from_raw(h)) };
                return v;
            }
        }
    }
}

impl<T> Drop for Queue<T> {
    fn drop(&mut self) {
        while self.dequeue().is_some() {}
        // Free the remaining sentinel.
        unsafe { drop(Box::from_raw(self.head.load(Ordering::Relaxed))) };
    }
}

fn main() {
    let q = Arc::new(Queue::new());
    let hs: Vec<_> = (0..4).map(|id| {
        let q = Arc::clone(&q);
        thread::spawn(move || for i in 0..10u32 { q.enqueue(id * 10 + i); })
    }).collect();
    for h in hs { h.join().unwrap(); }
    let mut n = 0;
    while q.dequeue().is_some() { n += 1; }
    println!("dequeued {} (expected 40)", n);
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_fifo() {
        let q = Queue::new();
        for i in 1..=5u32 { q.enqueue(i); }
        for i in 1..=5 { assert_eq!(q.dequeue(), Some(i)); }
        assert_eq!(q.dequeue(), None);
    }

    #[test]
    fn test_concurrent() {
        // Four scoped threads enqueue 25 items each; drain and count.
        let q = Arc::new(Queue::<u32>::new());
        thread::scope(|s| for i in 0..4u32 {
            let q = Arc::clone(&q);
            s.spawn(move || for j in 0..25 { q.enqueue(i * 25 + j); });
        });
        let mut c = 0;
        while q.dequeue().is_some() { c += 1; }
        assert_eq!(c, 100);
    }
}
(* 468. Lock-free queue concept: OCaml with Mutex *)
type 'a node = { v: 'a option; mutable next: 'a node option }
type 'a q = { mutable head: 'a node; mutable tail: 'a node; m: Mutex.t }

let create () =
  let dummy = { v = None; next = None } in
  { head = dummy; tail = dummy; m = Mutex.create () }

let enqueue q v =
  let n = { v = Some v; next = None } in
  Mutex.lock q.m;
  q.tail.next <- Some n;
  q.tail <- n;
  Mutex.unlock q.m

let dequeue q =
  Mutex.lock q.m;
  let r =
    match q.head.next with
    | None -> None
    | Some n -> q.head <- n; n.v
  in
  Mutex.unlock q.m;
  r

let () =
  let q = create () in
  List.iter (enqueue q) [1; 2; 3; 4; 5];
  let rec drain () =
    match dequeue q with
    | None -> ()
    | Some v -> Printf.printf "%d " v; drain ()
  in
  drain ();
  print_newline ()