🦀 Functional Rust

455: Lock-Free Stack — Concurrent Push/Pop Without Mutex

Difficulty: 3 · Level: Intermediate

Build a stack where multiple threads push and pop concurrently, using only atomic pointer operations — no mutex, no blocking.

The Problem This Solves

A `Mutex<Vec<T>>` stack is correct but serialises every push and pop: only one thread operates at a time. Under high contention — many threads pushing and popping rapidly — this becomes a bottleneck. Threads spend most of their time waiting for the lock, not doing work.

A lock-free stack replaces the mutex with a CAS loop on the head pointer. Push: create a node, point it at the current head, CAS the head to your node. Pop: read the head, CAS the head to head.next. If the CAS fails (another thread changed the head), retry. Progress is guaranteed: every failure means another thread succeeded, so the system as a whole makes forward progress even if individual threads retry.

This example also demonstrates Rust's approach to unsafe code. The lock-free stack requires raw pointers — operations the borrow checker cannot statically verify as safe. Rust doesn't forbid this; it requires you to put such code in an `unsafe` block, document the invariants you're maintaining, and verify them yourself. The `unsafe` keyword is a signal: "I, the programmer, am responsible for proving this correct."
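For contrast, the mutex-based baseline described above can be sketched in a few lines (a minimal illustration of the serialised design, not part of the lock-free implementation):

```rust
use std::sync::Mutex;

// Baseline: correct but fully serialised. Every push and pop takes the lock,
// so under contention threads queue up behind it.
pub struct MutexStack<T> {
    inner: Mutex<Vec<T>>,
}

impl<T> MutexStack<T> {
    pub fn new() -> Self {
        MutexStack { inner: Mutex::new(Vec::new()) }
    }

    pub fn push(&self, v: T) {
        // Only one thread at a time gets past this lock.
        self.inner.lock().unwrap().push(v);
    }

    pub fn pop(&self) -> Option<T> {
        self.inner.lock().unwrap().pop()
    }
}

fn main() {
    let s = MutexStack::new();
    s.push(1);
    s.push(2);
    println!("{:?}", s.pop()); // Some(2), LIFO order
}
```

This is the version the CAS loop below is competing with: same interface, but every operation holds a lock for its full duration.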

The Intuition

The stack head is an `AtomicPtr<Node<T>>`. Push atomically swings the head to a new node; pop atomically swings it to the current head's next. Each operation is a load-then-CAS loop. Multiple threads can execute their loops simultaneously; at most one succeeds per round and the rest retry with fresh data. In Java: `java.util.concurrent.ConcurrentLinkedQueue` uses the same pattern internally. In Go: you'd build a similar structure with `atomic.Pointer[Node]`. In Rust, you have to go `unsafe` because raw pointer dereferences require it — but the `unsafe` is isolated and the public API is safe.
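The load-then-CAS retry loop can be seen in miniature, with no unsafe code at all, on a plain atomic counter (a sketch of the pattern, not the stack itself):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Lock-free increment: load the current value, try to CAS in current + 1,
// and retry if another thread's CAS won this round.
fn increment(counter: &AtomicUsize) {
    loop {
        let cur = counter.load(Ordering::Relaxed);
        match counter.compare_exchange_weak(cur, cur + 1, Ordering::Relaxed, Ordering::Relaxed) {
            Ok(_) => break,  // our CAS won
            Err(_) => {}     // someone else won; retry with fresh data
        }
    }
}

fn main() {
    let c = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let c = Arc::clone(&c);
            thread::spawn(move || for _ in 0..1000 { increment(&c); })
        })
        .collect();
    for h in handles { h.join().unwrap(); }
    println!("{}", c.load(Ordering::Relaxed)); // 4000
}
```

The stack's push and pop are this same loop, with a pointer swap in place of the addition.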

How It Works in Rust

use std::sync::atomic::{AtomicPtr, Ordering};
use std::ptr;

struct Node<T> { value: T, next: *mut Node<T> }

pub struct Stack<T> { head: AtomicPtr<Node<T>> }

// Safety: we manage ownership via Box; T: Send ensures T is thread-safe
unsafe impl<T: Send> Send for Stack<T> {}
unsafe impl<T: Send> Sync for Stack<T> {}

impl<T> Stack<T> {
    pub fn push(&self, v: T) {
        // Allocate node on heap, leak it to a raw pointer
        let n = Box::into_raw(Box::new(Node { value: v, next: ptr::null_mut() }));
        loop {
            let h = self.head.load(Ordering::Relaxed);
            unsafe { (*n).next = h; } // point new node at current head
            // CAS: if head is still h, replace with n (our new node)
            match self.head.compare_exchange_weak(h, n, Ordering::Release, Ordering::Relaxed) {
                Ok(_) => break,  // published
                Err(_) => {}     // retry — head changed
            }
        }
    }

    pub fn pop(&self) -> Option<T> {
        loop {
            let h = self.head.load(Ordering::Acquire);
            if h.is_null() { return None; }
            let next = unsafe { (*h).next };
            // CAS: if head is still h, replace with h.next
            match self.head.compare_exchange_weak(h, next, Ordering::AcqRel, Ordering::Relaxed) {
                Ok(_) => {
                    // We own h now — no other thread can reach it
                    let v = unsafe { ptr::read(&(*h).value) };
                    unsafe { drop(Box::from_raw(h)); }  // reclaim memory
                    return Some(v);
                }
                Err(_) => {} // retry
            }
        }
    }
}
Important caveat: This implementation has the ABA problem. If thread A reads head = P and saves next = P.next, then thread B pops P, frees it, and pushes a new node that happens to be allocated at the same address P, thread A's CAS will still succeed, installing the stale next pointer and corrupting the stack. (A concurrent pop can also dereference a freed node's `next`, another reason real implementations delay reclamation.) Safe production use requires epoch-based memory reclamation (the `crossbeam-epoch` crate). This example is for educational clarity, not production use.
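One classic ABA defence (besides epoch reclamation) is to pair the pointer with a generation counter, so a value that cycles back to "A" still carries a fresh tag and the stale CAS fails. A minimal sketch of that idea using a plain `AtomicU64`, with hypothetical helpers `pack`, `value_of`, and `tag_of` (value in the low 32 bits, tag in the high 32 bits):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Pack a 32-bit value and a 32-bit generation tag into one atomic word.
fn pack(value: u32, tag: u32) -> u64 { ((tag as u64) << 32) | value as u64 }
fn value_of(word: u64) -> u32 { word as u32 }
fn tag_of(word: u64) -> u32 { (word >> 32) as u32 }

fn main() {
    let slot = AtomicU64::new(pack(7, 0)); // value A = 7, tag 0

    // "Thread A" reads the slot and remembers the whole word.
    let seen = slot.load(Ordering::Acquire);

    // Meanwhile the value goes 7 -> 9 -> 7, bumping the tag each time.
    slot.store(pack(9, 1), Ordering::Release);
    slot.store(pack(7, 2), Ordering::Release);

    // A plain value comparison would say "still 7"; the tagged CAS catches it.
    let res = slot.compare_exchange(seen, pack(8, tag_of(seen) + 1),
                                    Ordering::AcqRel, Ordering::Acquire);
    println!("value = {}, stale CAS ok? {}",
             value_of(slot.load(Ordering::Acquire)), res.is_ok());
    // value = 7, stale CAS ok? false
}
```

Real tagged-pointer schemes pack an actual pointer plus a counter (which needs a double-width CAS or spare pointer bits); this sketch only demonstrates why the tag makes the stale CAS fail.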

What This Unlocks

Key Differences

| Concept | OCaml | Rust |
|---|---|---|
| Head pointer | `Atomic.make head_node` (OCaml 5) | `AtomicPtr<Node<T>>` |
| Allocate node | GC-managed | `Box::into_raw(Box::new(...))` — explicit ownership |
| Push | CAS loop to prepend | `compare_exchange_weak` on head, `Ordering::Release` |
| Pop | CAS loop to remove head | `compare_exchange_weak`, then `Box::from_raw` to reclaim |
| Memory reclamation | GC | manual (ABA risk) or epoch-based (`crossbeam-epoch`) |
| ABA problem | GC prevents it | present in this impl — use epoch reclamation in production |
// 455. Lock-free stack with atomics
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc;
use std::thread;
use std::ptr;

struct Node<T> { value: T, next: *mut Node<T> }
pub struct Stack<T> { head: AtomicPtr<Node<T>> }
// Safety: nodes are owned via Box; only T: Send values cross threads
unsafe impl<T: Send> Send for Stack<T> {}
unsafe impl<T: Send> Sync for Stack<T> {}

impl<T> Stack<T> {
    pub fn new() -> Self { Stack { head: AtomicPtr::new(ptr::null_mut()) } }
    pub fn push(&self, v: T) {
        let n = Box::into_raw(Box::new(Node { value: v, next: ptr::null_mut() }));
        loop {
            let h = self.head.load(Ordering::Relaxed);
            unsafe { (*n).next = h; }
            match self.head.compare_exchange_weak(h, n, Ordering::Release, Ordering::Relaxed) {
                Ok(_) => break, Err(_) => {}
            }
        }
    }
    pub fn pop(&self) -> Option<T> {
        loop {
            let h = self.head.load(Ordering::Acquire);
            if h.is_null() { return None; }
            let next = unsafe { (*h).next };
            match self.head.compare_exchange_weak(h, next, Ordering::AcqRel, Ordering::Relaxed) {
                Ok(_) => {
                    let v = unsafe { ptr::read(&(*h).value) };
                    unsafe { drop(Box::from_raw(h)); }
                    return Some(v);
                }
                Err(_) => {}
            }
        }
    }
}
impl<T> Drop for Stack<T> { fn drop(&mut self) { while self.pop().is_some() {} } }

fn main() {
    let s = Arc::new(Stack::new());
    let hs: Vec<_> = (0..4)
        .map(|id| {
            let s = Arc::clone(&s);
            thread::spawn(move || for i in 0..10u32 { s.push(id * 10 + i); })
        })
        .collect();
    for h in hs { h.join().unwrap(); }
    let mut n = 0;
    while s.pop().is_some() { n += 1; }
    println!("popped {} (expected 40)", n);
}

#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn test_lifo() {
        let s = Stack::new();
        s.push(1);
        s.push(2);
        s.push(3);
        assert_eq!(s.pop(), Some(3));
        assert_eq!(s.pop(), Some(2));
        assert_eq!(s.pop(), Some(1));
        assert_eq!(s.pop(), None);
    }
    #[test] fn test_concurrent() {
        let s = Arc::new(Stack::<u32>::new());
        let hs: Vec<_> = (0..4)
            .map(|_| {
                let s = Arc::clone(&s);
                thread::spawn(move || for i in 0..100 { s.push(i); })
            })
            .collect();
        for h in hs { h.join().unwrap(); }
        let mut c = 0;
        while s.pop().is_some() { c += 1; }
        assert_eq!(c, 400);
    }
}
(* 455. Stack in OCaml – mutex-based baseline using the stdlib; a truly
   lock-free version would use OCaml 5's Atomic module *)
type 'a t = { mutable head : 'a list; m : Mutex.t }

let create () = { head = []; m = Mutex.create () }

let push s v =
  Mutex.lock s.m;
  s.head <- v :: s.head;
  Mutex.unlock s.m

let pop s =
  Mutex.lock s.m;
  let r = match s.head with
    | [] -> None
    | x :: t -> s.head <- t; Some x
  in
  Mutex.unlock s.m;
  r
let () =
  let s = create () in
  let ps = List.init 4 (fun id ->
    Thread.create (fun () -> for i=0 to 9 do push s (id*10+i) done) ()
  ) in
  List.iter Thread.join ps;
  let n = ref 0 in while Option.is_some (pop s) do incr n done;
  Printf.printf "popped %d\n" !n