🦀 Functional Rust

343: Cancellation Token

Difficulty: 3 Level: Advanced

Signal running tasks to stop gracefully: a shared atomic flag that tasks check cooperatively.

The Problem This Solves

Long-running tasks need a way to be told "stop now." A web server shutting down needs to cancel in-flight request handlers. A background job needs to stop when the user logs out. A search operation needs to abort when the user types another character. Without a cancellation mechanism, tasks run to completion even when their results are no longer needed, wasting CPU, holding connections, and delaying shutdown.

The cancellation token pattern is simple: a shared `AtomicBool` flag. Tasks check `token.is_cancelled()` at safe points in their loop. The controller calls `token.cancel()`, which sets the flag. Tasks see it on their next check and return early. This is cooperative cancellation: tasks must actively check and respect the signal, so a task that never checks is never interrupted.

In async Rust, `tokio_util::sync::CancellationToken` is the production-ready version. Its `cancelled()` future can be used as a branch of `tokio::select!`, so a task is woken by cancellation instead of having to poll a flag.

The Intuition

Like Python's `threading.Event`:
import threading
stop_event = threading.Event()
def worker():
    while not stop_event.is_set():
        do_work()  # placeholder for one unit of work
stop_event.set()  # signal to stop
Or Go's `context.WithCancel`:
ctx, cancel := context.WithCancel(context.Background())
go func() { work(ctx) }()
cancel()  // signal cancellation
Rust's token is the same concept, made explicit with `Arc<AtomicBool>` for cheap cloning across tasks.

How It Works in Rust

#[derive(Clone)]
struct CancellationToken {
    cancelled: Arc<AtomicBool>,
}

impl CancellationToken {
    fn cancel(&self) {
        self.cancelled.store(true, Ordering::Release);
    }
    fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::Acquire)  // Acquire pairs with Release
    }
}

fn long_task(token: CancellationToken, steps: usize) -> Result<String, String> {
    for i in 0..steps {
        if token.is_cancelled() {
            return Err(format!("cancelled at step {i}"));  // early exit
        }
        thread::sleep(Duration::from_millis(10));
        println!("Step {i} complete");
    }
    Ok(format!("completed all {steps} steps"))
}
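`Ordering::Release` on the store and `Ordering::Acquire` on the load form a pair: once a task observes `true`, every write the controller made before calling `cancel()` is also visible to that task. This is cheaper than `SeqCst`, which additionally imposes a single global order on all sequentially consistent operations, something a lone flag does not need. The token is `Clone` because it wraps `Arc<AtomicBool>`: each clone shares the same flag, so cancelling through any clone cancels all of them.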
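To make the Release/Acquire pairing concrete, here is a minimal sketch using only the standard library. The helper name `cancel_with_reason` and the `reason_code` value are hypothetical, introduced purely for illustration. The controller writes a reason code with `Relaxed`, then sets the flag with `Release`; a worker that sees the flag through an `Acquire` load can rely on the earlier write being visible.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical helper: the controller records a reason code, then cancels.
/// Returns the reason code the worker read after it observed the flag.
fn cancel_with_reason(reason: usize) -> usize {
    let cancelled = Arc::new(AtomicBool::new(false));
    let reason_code = Arc::new(AtomicUsize::new(0));

    let (flag, code) = (Arc::clone(&cancelled), Arc::clone(&reason_code));
    let worker = thread::spawn(move || {
        // Spin until the Acquire load observes the Release store.
        while !flag.load(Ordering::Acquire) {
            thread::yield_now();
        }
        // The Relaxed store to `code` was sequenced before the Release
        // store, so it is guaranteed to be visible at this point.
        code.load(Ordering::Relaxed)
    });

    reason_code.store(reason, Ordering::Relaxed); // written first
    cancelled.store(true, Ordering::Release);     // then published

    worker.join().unwrap()
}
```

If both the store and the load of the flag were `Relaxed`, the worker would still eventually see `true`, but nothing would guarantee that it also sees the reason code written beforehand.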

What This Unlocks

Key Differences

| Concept | OCaml | Rust |
|---|---|---|
| Cancellation signal | `ref bool` or `Lwt.cancel` | `Arc<AtomicBool>` (thread-safe) |
| Cooperative check | `if !cancelled then ...` | `if token.is_cancelled() { return Err(...) }` |
| Token sharing | Passed as argument | `Clone`: each clone shares the same `Arc` |
| Production crate | N/A | `tokio_util::sync::CancellationToken` with `select!` support |
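The "Token sharing" row can be illustrated with a short sketch. The helper `cancel_all_workers` is hypothetical and assumes the same `CancellationToken` shape used in this example. Each worker thread holds its own clone, and a single `cancel()` on the original token stops all of them.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

#[derive(Clone)]
struct CancellationToken {
    cancelled: Arc<AtomicBool>,
}

impl CancellationToken {
    fn new() -> Self {
        Self { cancelled: Arc::new(AtomicBool::new(false)) }
    }
    fn cancel(&self) {
        self.cancelled.store(true, Ordering::Release);
    }
    fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::Acquire)
    }
}

/// Hypothetical helper: spawns `n` workers, each holding its own clone,
/// cancels through the original token, and returns how many workers
/// stopped because they observed the shared flag.
fn cancel_all_workers(n: usize) -> usize {
    let token = CancellationToken::new();
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let worker_token = token.clone(); // shares the same Arc<AtomicBool>
            thread::spawn(move || {
                // The only way out of this loop is seeing the flag set.
                while !worker_token.is_cancelled() {
                    thread::sleep(Duration::from_millis(1));
                }
                true
            })
        })
        .collect();

    token.cancel(); // one call reaches every clone

    handles
        .into_iter()
        .filter_map(|h| h.join().ok())
        .filter(|&stopped| stopped)
        .count()
}
```

Cloning the token only bumps the `Arc` reference count; no new flag is allocated, which is why every clone sees the same state.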

Versions

| Directory | Description |
|---|---|
| `std/` | Standard library version using `std::sync`, `std::thread` |
| `tokio/` | Tokio async runtime version using `tokio::sync`, `tokio::spawn` |

Running

# Standard library version
cd std && cargo test

# Tokio version
cd tokio && cargo test
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

#[derive(Clone)]
struct CancellationToken {
    cancelled: Arc<AtomicBool>,
}

impl CancellationToken {
    fn new() -> Self {
        Self { cancelled: Arc::new(AtomicBool::new(false)) }
    }

    fn cancel(&self) {
        self.cancelled.store(true, Ordering::Release);
    }

    fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::Acquire)
    }
}

fn long_task(token: CancellationToken, steps: usize) -> Result<String, String> {
    for i in 0..steps {
        if token.is_cancelled() {
            return Err(format!("cancelled at step {i}"));
        }
        // Do work
        thread::sleep(Duration::from_millis(10));
        println!("Step {i} complete");
    }
    Ok(format!("completed all {steps} steps"))
}

fn cancellable_sum(token: CancellationToken, data: &[i64]) -> Option<i64> {
    let mut sum = 0i64;
    for (i, &x) in data.iter().enumerate() {
        if i % 1000 == 0 && token.is_cancelled() {
            return None;
        }
        sum = sum.saturating_add(x);
    }
    Some(sum)
}

fn main() {
    let token = CancellationToken::new();
    let task_token = token.clone();

    let handle = thread::spawn(move || long_task(task_token, 10));

    thread::sleep(Duration::from_millis(35));
    println!("Cancelling...");
    token.cancel();

    match handle.join().unwrap() {
        Ok(msg) => println!("Success: {msg}"),
        Err(msg) => println!("Cancelled: {msg}"),
    }

    // Non-cancelled task
    let token2 = CancellationToken::new();
    let t2 = token2.clone();
    let h2 = thread::spawn(move || long_task(t2, 3));
    println!("Result: {:?}", h2.join().unwrap());
}

#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn token_starts_not_cancelled() {
        let t = CancellationToken::new();
        assert!(!t.is_cancelled());
    }
    #[test]
    fn cancel_sets_flag() {
        let t = CancellationToken::new();
        t.cancel();
        assert!(t.is_cancelled());
    }
    #[test]
    fn task_completes_without_cancel() {
        let t = CancellationToken::new();
        let result = long_task(t, 2);
        assert!(result.is_ok());
    }
    #[test]
    fn task_cancelled_immediately() {
        let t = CancellationToken::new();
        t.cancel();
        assert!(long_task(t, 100).is_err());
    }
}
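The `cancellable_sum` function in the listing above is never called from `main`. The following sketch shows one way it might be exercised; the driver `sum_before_and_after_cancel` is a hypothetical name, and the token and function bodies are repeated only so that the block stands alone.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

#[derive(Clone)]
struct CancellationToken {
    cancelled: Arc<AtomicBool>,
}

impl CancellationToken {
    fn new() -> Self {
        Self { cancelled: Arc::new(AtomicBool::new(false)) }
    }
    fn cancel(&self) {
        self.cancelled.store(true, Ordering::Release);
    }
    fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::Acquire)
    }
}

// Same body as the listing: the flag is polled only on every 1000th
// element, so the atomic load is amortised over the inner work.
fn cancellable_sum(token: CancellationToken, data: &[i64]) -> Option<i64> {
    let mut sum = 0i64;
    for (i, &x) in data.iter().enumerate() {
        if i % 1000 == 0 && token.is_cancelled() {
            return None;
        }
        sum = sum.saturating_add(x);
    }
    Some(sum)
}

/// Hypothetical driver: sums the data once with a live token and once
/// with a token that was cancelled beforehand.
fn sum_before_and_after_cancel(data: &[i64]) -> (Option<i64>, Option<i64>) {
    let token = CancellationToken::new();
    let live = cancellable_sum(token.clone(), data);
    token.cancel();
    let cancelled = cancellable_sum(token, data);
    (live, cancelled)
}
```

The trade-off of checking every 1000th element is latency: after `cancel()` is called, up to 999 further elements may be processed before the function returns `None`.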
(* OCaml: cooperative cancellation via flag *)

let cancelled = ref false

let cancel () = cancelled := true

let long_task steps =
  let rec loop i =
    if !cancelled then
      Printf.printf "Task cancelled at step %d\n" i
    else if i >= steps then
      Printf.printf "Task completed all %d steps\n" steps
    else begin
      Printf.printf "Step %d...\n" i;
      Thread.delay 0.01;
      loop (i + 1)
    end
  in loop 0

let () =
  let t = Thread.create (fun () -> long_task 10) () in
  Thread.delay 0.035;
  Printf.printf "Sending cancel signal\n";
  cancel ();
  Thread.join t

📊 Detailed Comparison

922-cancellation-token: Language Comparison

std vs tokio

| Aspect | std version | tokio version |
|---|---|---|
| Runtime | OS threads via `std::thread` | Async tasks on tokio runtime |
| Synchronization | `std::sync::Mutex`, `Condvar` | `tokio::sync::Mutex`, channels |
| Channels | `std::sync::mpsc` (unbounded) | `tokio::sync::mpsc` (bounded, async) |
| Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |
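The "Blocking" row hints at a limitation of the plain `AtomicBool` token on OS threads: a worker inside `thread::sleep` cannot notice cancellation until the sleep ends. One possible workaround, sketched below with the `Mutex` and `Condvar` named in the table, is a token that workers can block on. `WaitableToken`, `wait_cancelled`, and `cancel_during_long_sleep` are hypothetical names and not part of the example's code.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical blocking variant of the token: a flag guarded by a Mutex,
/// plus a Condvar so sleeping workers wake as soon as `cancel` is called.
#[derive(Clone)]
struct WaitableToken {
    inner: Arc<(Mutex<bool>, Condvar)>,
}

impl WaitableToken {
    fn new() -> Self {
        Self { inner: Arc::new((Mutex::new(false), Condvar::new())) }
    }

    fn cancel(&self) {
        let (lock, cvar) = &*self.inner;
        *lock.lock().unwrap() = true; // guard is dropped at end of statement
        cvar.notify_all();
    }

    /// Blocks for at most `timeout`; returns true if the token was cancelled.
    fn wait_cancelled(&self, timeout: Duration) -> bool {
        let (lock, cvar) = &*self.inner;
        let guard = lock.lock().unwrap();
        // wait_timeout_while re-checks the predicate after every wakeup,
        // so spurious wakeups do not end the wait early.
        let (guard, _timed_out) = cvar
            .wait_timeout_while(guard, timeout, |cancelled| !*cancelled)
            .unwrap();
        *guard
    }
}

/// The worker "sleeps" for up to 10 s but returns promptly once cancelled.
fn cancel_during_long_sleep() -> (bool, Duration) {
    let token = WaitableToken::new();
    let worker_token = token.clone();
    let start = Instant::now();
    let worker = thread::spawn(move || worker_token.wait_cancelled(Duration::from_secs(10)));

    thread::sleep(Duration::from_millis(20));
    token.cancel();
    let cancelled = worker.join().unwrap();
    (cancelled, start.elapsed())
}
```

In the tokio version this role is played by awaiting a cancellation future inside `select!`, where the task yields to the runtime rather than blocking an OS thread.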