🦀 Functional Rust

468: Lock-Free Queue

Difficulty: 3 | Level: Intermediate

Enqueue and dequeue without mutexes, using compare-and-swap loops on atomic pointers.

The Problem This Solves

A `Mutex<VecDeque>` queue serialises every enqueue and dequeue. Under high throughput (millions of messages per second), threads can spend more time waiting for the lock than doing real work. When contention forces the kernel to park a waiting thread, the context switch costs microseconds. A lock-free queue avoids parking entirely.

A lock-free queue lets multiple threads enqueue and dequeue concurrently using CPU atomic instructions, chiefly CAS (compare-and-swap). If two threads race, one wins and the other retries. No thread ever blocks on a lock. Some thread always completes its operation, although an individual thread may retry several times under heavy contention. In the uncontended case (the common case), the fast path is a handful of atomic operations. Closely related lock-free linked-list designs appear inside channel implementations such as `std::sync::mpsc` and in the core data paths of async runtimes.
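The "one wins, the other retries" rule is the whole trick. Here is a minimal sketch of it, using a shared counter rather than a queue. The function name `cas_add` is illustrative. Each thread reads the current value and tries to CAS in the new one. On failure, it retries from the value the CAS hands back. For a plain counter, `fetch_add` would do the same job in one call; the explicit loop is only there to show the pattern.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Add `delta` with an explicit CAS retry loop; returns how many attempts failed.
fn cas_add(counter: &AtomicUsize, delta: usize) -> usize {
    let mut failed = 0;
    let mut current = counter.load(Ordering::Relaxed);
    loop {
        // Relaxed suffices here: the counter does not publish any other data.
        match counter.compare_exchange_weak(current, current + delta, Ordering::Relaxed, Ordering::Relaxed) {
            Ok(_) => return failed,
            Err(actual) => {
                // Another thread won the race (or the weak CAS failed spuriously):
                // retry from the value that is actually there now.
                current = actual;
                failed += 1;
            }
        }
    }
}

fn main() {
    let counter = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let c = Arc::clone(&counter);
            thread::spawn(move || (0..10_000).map(|_| cas_add(&c, 1)).sum::<usize>())
        })
        .collect();
    let failed: usize = handles.into_iter().map(|h| h.join().unwrap()).sum();
    // Losing a race costs a retry, never a lost update.
    assert_eq!(counter.load(Ordering::Relaxed), 40_000);
    println!("final = {}, failed CAS attempts = {}", counter.load(Ordering::Relaxed), failed);
}
```

The number of failed attempts varies from run to run. The final total does not.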

The Intuition

Two people share a whiteboard. Instead of taking turns with a lock on the whiteboard, each writes a message in a free spot and uses a sticky note to say "the next message is over there." If both try to update the sticky note at the same moment, only one succeeds. The other looks at the note again and retries from what it now says. Nobody waits; they just retry.
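In code, the sticky note is an atomic word, and "updating it" is a `compare_exchange`. The update succeeds only if the note still holds the value the writer last read. Otherwise it returns the current value so the writer can look again. This sketch replays one interleaving on a single thread so the outcome is deterministic. The function name `replay` and the slot numbers are illustrative.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

type Cas = Result<usize, usize>;

/// Replays one interleaving of two writers, A and B, racing to update the "sticky note".
fn replay() -> (Cas, Cas, Cas, usize) {
    let note = AtomicUsize::new(0); // the note currently says "slot 0"
    // Both writers read the note before either updates it.
    let seen_by_a = note.load(Ordering::Acquire);
    let seen_by_b = note.load(Ordering::Acquire);
    // A goes first: the note still holds 0, so A's CAS succeeds and returns the old value.
    let a = note.compare_exchange(seen_by_a, 1, Ordering::AcqRel, Ordering::Acquire);
    // B's view is stale: the CAS fails and hands back what the note says now.
    let b = note.compare_exchange(seen_by_b, 1, Ordering::AcqRel, Ordering::Acquire);
    // B looks again and retries from the fresh value, pointing at a different slot.
    let fresh = b.unwrap_err();
    let b_retry = note.compare_exchange(fresh, 2, Ordering::AcqRel, Ordering::Acquire);
    (a, b, b_retry, note.load(Ordering::Relaxed))
}

fn main() {
    let (a, b, b_retry, last) = replay();
    println!("A: {:?}, B: {:?}, B retry: {:?}, note = {}", a, b, b_retry, last);
}
```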

How It Works in Rust

The Michael-Scott queue is the classic algorithm:

1. Node structure: each node holds a value and an `AtomicPtr` to the next node.
2. Head and tail: the queue keeps an `AtomicPtr<Node>` for each. Head always points at a sentinel (dummy) node whose value is unused.
3. Enqueue: allocate a new node, then CAS the current tail's `next` pointer from null to the new node. Finally, swing the tail pointer forward. If the tail's `next` is already non-null, the tail is lagging, so help advance it and retry:
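// Body of `enqueue`, after allocating `node`. Assumes
// `use std::ptr::null_mut;` and `use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};`
loop {
    let tail = self.tail.load(Acquire);
    let next = unsafe { (*tail).next.load(Acquire) };
    if next.is_null() {
        // Try to link the new node after the last node.
        if unsafe { (*tail).next.compare_exchange(null_mut(), node, Release, Relaxed) }.is_ok() {
            // Swing the tail; if this CAS fails, another thread already helped.
            let _ = self.tail.compare_exchange(tail, node, Release, Relaxed);
            return;
        }
    } else {
        // The tail is lagging: help advance it, then retry.
        let _ = self.tail.compare_exchange(tail, next, Release, Relaxed);
    }
}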
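4. Dequeue: CAS the head forward from the sentinel to its successor `next`. The returned item is the value stored in `next`, which becomes the new sentinel. The old sentinel is unlinked and can be freed.
5. Memory safety: another thread may still be reading an unlinked node, so it must not be freed immediately. Use `crossbeam-epoch` to defer freeing unlinked nodes until no thread can still hold a pointer to them (see example 467).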
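When no epoch library is available, one sound but memory-hungry alternative is to never free a node while the queue is shared. Dequeued nodes simply stay in memory behind the head, and `Drop` frees the whole chain starting from the original sentinel. This sketch assumes that trade-off is acceptable, since memory grows with the total number of items ever enqueued. `LeakyQueue` and `first` are hypothetical names, not a library API.

Because node addresses are never reused, the design also sidesteps the ABA problem. Only the thread whose head CAS succeeded takes the value out of `next`. `take()` leaves `None` behind, so nothing is dropped twice.

```rust
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::thread;

struct Node<T> {
    value: Option<T>,
    next: AtomicPtr<Node<T>>,
}
fn new_node<T>(value: Option<T>) -> *mut Node<T> {
    Box::into_raw(Box::new(Node { value, next: AtomicPtr::new(ptr::null_mut()) }))
}

/// Hypothetical Michael-Scott variant that frees nothing until `Drop`.
pub struct LeakyQueue<T> {
    first: *mut Node<T>, // original sentinel: start of the chain freed in `Drop`
    head: AtomicPtr<Node<T>>,
    tail: AtomicPtr<Node<T>>,
}
// Sound here because no node is freed while other threads can reach it.
unsafe impl<T: Send> Send for LeakyQueue<T> {}
unsafe impl<T: Send> Sync for LeakyQueue<T> {}

impl<T> LeakyQueue<T> {
    pub fn new() -> Self {
        let dummy = new_node(None);
        LeakyQueue { first: dummy, head: AtomicPtr::new(dummy), tail: AtomicPtr::new(dummy) }
    }

    pub fn enqueue(&self, v: T) {
        let n = new_node(Some(v));
        loop {
            let t = self.tail.load(Ordering::Acquire);
            let next = unsafe { (*t).next.load(Ordering::Acquire) }; // `t` is never freed early
            if next.is_null() {
                let linked = unsafe {
                    (*t).next.compare_exchange(ptr::null_mut(), n, Ordering::Release, Ordering::Relaxed)
                };
                if linked.is_ok() {
                    let _ = self.tail.compare_exchange(t, n, Ordering::Release, Ordering::Relaxed);
                    return;
                }
            } else {
                // Help a lagging tail forward, then retry.
                let _ = self.tail.compare_exchange(t, next, Ordering::Release, Ordering::Relaxed);
            }
        }
    }

    pub fn dequeue(&self) -> Option<T> {
        loop {
            let h = self.head.load(Ordering::Acquire);
            let t = self.tail.load(Ordering::Acquire);
            let next = unsafe { (*h).next.load(Ordering::Acquire) };
            if next.is_null() {
                return None; // only the sentinel is left
            }
            if h == t {
                let _ = self.tail.compare_exchange(t, next, Ordering::Release, Ordering::Relaxed);
            } else if self.head.compare_exchange(h, next, Ordering::AcqRel, Ordering::Relaxed).is_ok() {
                // `next` is the new sentinel; only the CAS winner takes its value.
                return unsafe { (*next).value.take() };
            }
        }
    }
}

impl<T> Drop for LeakyQueue<T> {
    fn drop(&mut self) {
        // Exclusive access: free every node from the original sentinel onwards.
        let mut p = self.first;
        while !p.is_null() {
            let node = unsafe { Box::from_raw(p) };
            p = node.next.load(Ordering::Relaxed);
        }
    }
}

fn main() {
    let q = LeakyQueue::new();
    let popped: usize = thread::scope(|s| {
        let q = &q;
        for p in 0..2u32 {
            s.spawn(move || (0..1000).for_each(|i| q.enqueue(p * 1000 + i)));
        }
        let consumers: Vec<_> = (0..2)
            .map(move |_| s.spawn(move || (0..1000).filter(|_| q.dequeue().is_some()).count()))
            .collect();
        consumers.into_iter().map(|h| h.join().unwrap()).sum()
    });
    // Anything the consumers missed is still queued.
    let mut rest = 0;
    while q.dequeue().is_some() {
        rest += 1;
    }
    assert_eq!(popped + rest, 2000);
}
```

Here two producers and two consumers run at the same time, which the immediate-free version of `dequeue` cannot safely allow.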

What This Unlocks

Key Differences

| Concept | OCaml | Rust |
|---|---|---|
| Lock-free queue | Not available in stdlib | `AtomicPtr` + CAS loop |
| Memory reclamation | GC | Epoch-based (`crossbeam-epoch`) |
| Ordering guarantees | Sequentially consistent (`Atomic` module, OCaml 5) | Explicit `Acquire`/`Release` |
| Practical crate | n/a | `crossbeam-queue` (`crossbeam_queue::SegQueue`) |
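The "Ordering guarantees" row is easiest to see in the publish pattern that `enqueue` relies on. First write the data, then make it reachable with a `Release` operation. A reader that observes it with `Acquire` is then guaranteed to see the data too. This is a generic message-passing sketch with illustrative names, not part of the queue. In the queue, the role of the flag is played by the `next` pointer set by the `Release` CAS.

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Publish a payload with a Release store and read it back after an Acquire load.
fn message_pass() -> u64 {
    let data = Arc::new(AtomicU64::new(0));
    let ready = Arc::new(AtomicBool::new(false));

    let (d, r) = (Arc::clone(&data), Arc::clone(&ready));
    let producer = thread::spawn(move || {
        d.store(42, Ordering::Relaxed); // write the payload first
        r.store(true, Ordering::Release); // then publish the flag
    });

    // An Acquire load that sees `true` synchronizes with the Release store,
    // so the payload written before it is guaranteed to be visible here.
    while !ready.load(Ordering::Acquire) {
        std::hint::spin_loop();
    }
    let seen = data.load(Ordering::Relaxed);
    producer.join().unwrap();
    seen
}

fn main() {
    println!("consumer saw {}", message_pass()); // always 42
}
```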
// 468. Lock-free queue basics (Michael-Scott, simplified)
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc;
use std::thread;
use std::ptr;

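// A node owns an optional value and an atomic link to its successor.
// The sentinel node's value is always `None`.
struct Node<T> {
    value: Option<T>,
    next: AtomicPtr<Node<T>>,
}
impl<T> Node<T> {
    // Heap-allocate a node and hand ownership to the queue as a raw pointer.
    fn new(v: Option<T>) -> *mut Self {
        Box::into_raw(Box::new(Node { value: v, next: AtomicPtr::new(ptr::null_mut()) }))
    }
}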

pub struct Queue<T> {
    head: AtomicPtr<Node<T>>, // always points at the current sentinel node
    tail: AtomicPtr<Node<T>>, // the last node, or one node behind it mid-enqueue
}
unsafe impl<T:Send> Send for Queue<T> {}
unsafe impl<T:Send> Sync for Queue<T> {}

impl<T> Queue<T> {
    pub fn new() -> Self {
        // Head and tail both start at one shared sentinel node.
        let dummy = Node::new(None);
        Queue { head: AtomicPtr::new(dummy), tail: AtomicPtr::new(dummy) }
    }
    pub fn enqueue(&self, v: T) {
        let n = Node::new(Some(v));
        loop {
            let t = self.tail.load(Ordering::Acquire);
            let next = unsafe { (*t).next.load(Ordering::Acquire) };
            if next.is_null() {
                // `t` looks like the last node: try to link `n` after it.
                let linked = unsafe {
                    (*t).next.compare_exchange_weak(ptr::null_mut(), n, Ordering::Release, Ordering::Relaxed)
                };
                if linked.is_ok() {
                    // Swing the tail to `n`; failure means another thread already helped.
                    let _ = self.tail.compare_exchange(t, n, Ordering::Release, Ordering::Relaxed);
                    return;
                }
            } else {
                // The tail is lagging: help advance it, then retry.
                let _ = self.tail.compare_exchange(t, next, Ordering::Release, Ordering::Relaxed);
            }
        }
    }
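    pub fn dequeue(&self) -> Option<T> {
        loop {
            let h = self.head.load(Ordering::Acquire);
            let t = self.tail.load(Ordering::Acquire);
            let next = unsafe { (*h).next.load(Ordering::Acquire) };
            if h == t {
                // Empty, or the tail lags behind a freshly linked node.
                if next.is_null() {
                    return None;
                }
                let _ = self.tail.compare_exchange(t, next, Ordering::Release, Ordering::Relaxed);
            } else {
                match self.head.compare_exchange_weak(h, next, Ordering::AcqRel, Ordering::Relaxed) {
                    Ok(_) => {
                        // `next` is the new sentinel. `take()` moves its value out and
                        // leaves `None`, so the value is not dropped again when that
                        // node is later freed (a plain `ptr::read` would double-drop).
                        let v = unsafe { (*next).value.take() };
                        // Simplified: freeing `h` at once is only safe if no other thread
                        // is still reading it; real code defers this (crossbeam-epoch).
                        unsafe { drop(Box::from_raw(h)) };
                        return v;
                    }
                    Err(_) => {} // lost the race: retry
                }
            }
        }
    }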
}
impl<T> Drop for Queue<T> {
    fn drop(&mut self) {
        // `&mut self` rules out other threads: drain the items, then free the sentinel.
        while self.dequeue().is_some() {}
        unsafe { drop(Box::from_raw(self.head.load(Ordering::Relaxed))) };
    }
}

fn main() {
    let q = Arc::new(Queue::new());
    // Four producer threads enqueue 10 items each.
    let hs: Vec<_> = (0..4)
        .map(|id| {
            let q = Arc::clone(&q);
            thread::spawn(move || (0..10u32).for_each(|i| q.enqueue(id * 10 + i)))
        })
        .collect();
    for h in hs {
        h.join().unwrap();
    }
    // Drain on the main thread once every producer has finished.
    let mut n = 0;
    while q.dequeue().is_some() {
        n += 1;
    }
    println!("dequeued {} (expected 40)", n);
}

#[cfg(test)]
mod tests {
    use super::*;
    #[test] fn test_fifo() { let q=Queue::new(); for i in 1..=5u32 { q.enqueue(i); } for i in 1..=5 { assert_eq!(q.dequeue(),Some(i)); } assert_eq!(q.dequeue(),None); }
    #[test] fn test_concurrent() {
        let q=Arc::new(Queue::<u32>::new());
        thread::scope(|s|{ for i in 0..4u32 { let q=Arc::clone(&q); s.spawn(move || { for j in 0..25 { q.enqueue(i*25+j); } }); } });
        let mut c=0; while q.dequeue().is_some(){c+=1;} assert_eq!(c,100);
    }
}
(* 468. Lock-free queue concept: OCaml with Mutex *)
type 'a node = { v: 'a option; mutable next: 'a node option }
type 'a q = { mutable head: 'a node; mutable tail: 'a node; m: Mutex.t }

let create () =
  let dummy={v=None;next=None} in
  {head=dummy;tail=dummy;m=Mutex.create ()}

let enqueue q v =
  let n={v=Some v;next=None} in
  Mutex.lock q.m; q.tail.next<-Some n; q.tail<-n; Mutex.unlock q.m

let dequeue q =
  Mutex.lock q.m;
  let r = match q.head.next with
    | None -> None
    | Some n -> q.head<-n; n.v
  in Mutex.unlock q.m; r

let () =
  let q=create () in
  List.iter (enqueue q) [1;2;3;4;5];
  let rec drain () = match dequeue q with None->() | Some v->Printf.printf "%d " v; drain ()
  in drain (); print_newline ()
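A Rust counterpart of the OCaml mutex version is handy for benchmarking against the lock-free `Queue`. This sketch replaces the OCaml sentinel-linked list with a `VecDeque`, giving the `Mutex<VecDeque>` baseline from the start of this example. `LockedQueue` is a hypothetical name, not a library type.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Lock-based baseline: every operation takes the same mutex, so all
/// enqueues and dequeues are serialised.
pub struct LockedQueue<T> {
    inner: Mutex<VecDeque<T>>,
}

impl<T> LockedQueue<T> {
    pub fn new() -> Self {
        LockedQueue { inner: Mutex::new(VecDeque::new()) }
    }

    pub fn enqueue(&self, v: T) {
        // The guard is a temporary, so the lock is released at the end of the statement.
        self.inner.lock().unwrap().push_back(v);
    }

    pub fn dequeue(&self) -> Option<T> {
        self.inner.lock().unwrap().pop_front()
    }
}

fn main() {
    // Same workload as the lock-free example: 4 producers x 10 items.
    let q = Arc::new(LockedQueue::new());
    let hs: Vec<_> = (0..4u32)
        .map(|id| {
            let q = Arc::clone(&q);
            thread::spawn(move || (0..10u32).for_each(|i| q.enqueue(id * 10 + i)))
        })
        .collect();
    for h in hs {
        h.join().unwrap();
    }
    let mut n = 0;
    while q.dequeue().is_some() {
        n += 1;
    }
    println!("dequeued {} (expected 40)", n);
}
```

An uncontended `Mutex` lock is itself only a few atomic operations. Comparing the two queues under realistic contention is therefore more informative than comparing them single-threaded.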