343: Cancellation Token
Difficulty: 3 Level: Advanced Signal running tasks to stop gracefully โ a shared atomic flag that tasks check cooperatively.The Problem This Solves
Long-running async tasks need a way to be told "stop now." A web server shutting down needs to cancel in-flight request handlers. A background job needs to stop when the user logs out. A search operation needs to abort when the user types another character. Without a cancellation mechanism, tasks run to completion even when their results are no longer needed โ wasting CPU, holding connections, and delaying shutdown. The cancellation token pattern is simple: a shared `AtomicBool` flag. Tasks check `token.is_cancelled()` at safe points in their loop. The controller calls `token.cancel()`, which sets the flag. Tasks see it on their next check and return early. This is cooperative cancellation โ tasks must actively check and respect the signal. In async Rust, `tokio_util::CancellationToken` is the production-ready version, integrating with `select!` for zero-cost cancellation points.The Intuition
Like Python's `threading.Event`:stop_event = threading.Event()
def worker():
while not stop_event.is_set():
do_work()
stop_event.set() # signal to stop
Or Go's `context.WithCancel`:
ctx, cancel := context.WithCancel(context.Background())
go func() { work(ctx) }()
cancel() // signal cancellation
Rust's token is the same concept, made explicit with `Arc<AtomicBool>` for cheap cloning across tasks.
How It Works in Rust
#[derive(Clone)]
struct CancellationToken {
cancelled: Arc<AtomicBool>,
}
impl CancellationToken {
fn cancel(&self) {
self.cancelled.store(true, Ordering::Release);
}
fn is_cancelled(&self) -> bool {
self.cancelled.load(Ordering::Acquire) // Acquire pairs with Release
}
}
fn long_task(token: CancellationToken, steps: usize) -> Result<String, String> {
for i in 0..steps {
if token.is_cancelled() {
return Err(format!("cancelled at step {i}")); // early exit
}
thread::sleep(Duration::from_millis(10));
println!("Step {i} complete");
}
Ok(format!("completed all {steps} steps"))
}
`Ordering::Release` / `Ordering::Acquire` ensure the cancellation signal is visible across threads without a full memory barrier โ more efficient than `SeqCst` for this use case.
Token is `Clone` because it wraps `Arc<AtomicBool>` โ each clone shares the same flag. Cancelling one cancels all.
What This Unlocks
- Graceful shutdown โ cancel all background tasks on `SIGTERM`; each checks the token and cleans up.
- Request scoping โ cancel all work spawned by a request when the client disconnects.
- Timeout composition โ pair with `tokio::time::timeout` or check the token inside a `select!` for deadline-based cancellation.
Key Differences
| Concept | OCaml | Rust |
|---|---|---|
| Cancellation signal | `ref bool` or `Lwt.cancel` | `Arc<AtomicBool>` (thread-safe) |
| Cooperative check | `if !cancelled then ...` | `if token.is_cancelled() { return Err(...) }` |
| Token sharing | Passed as argument | `Clone` โ each clone shares the same `Arc` |
| Production crate | N/A | `tokio_util::CancellationToken` with `select!` support |
Versions
| Directory | Description |
|---|---|
| `std/` | Standard library version using `std::sync`, `std::thread` |
| `tokio/` | Tokio async runtime version using `tokio::sync`, `tokio::spawn` |
Running
# Standard library version
cd std && cargo test
# Tokio version
cd tokio && cargo test
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
#[derive(Clone)]
struct CancellationToken {
cancelled: Arc<AtomicBool>,
}
impl CancellationToken {
fn new() -> Self {
Self { cancelled: Arc::new(AtomicBool::new(false)) }
}
fn cancel(&self) {
self.cancelled.store(true, Ordering::Release);
}
fn is_cancelled(&self) -> bool {
self.cancelled.load(Ordering::Acquire)
}
}
fn long_task(token: CancellationToken, steps: usize) -> Result<String, String> {
for i in 0..steps {
if token.is_cancelled() {
return Err(format!("cancelled at step {i}"));
}
// Do work
thread::sleep(Duration::from_millis(10));
println!("Step {i} complete");
}
Ok(format!("completed all {steps} steps"))
}
fn cancellable_sum(token: CancellationToken, data: &[i64]) -> Option<i64> {
let mut sum = 0i64;
for (i, &x) in data.iter().enumerate() {
if i % 1000 == 0 && token.is_cancelled() {
return None;
}
sum = sum.saturating_add(x);
}
Some(sum)
}
fn main() {
let token = CancellationToken::new();
let task_token = token.clone();
let handle = thread::spawn(move || long_task(task_token, 10));
thread::sleep(Duration::from_millis(35));
println!("Cancelling...");
token.cancel();
match handle.join().unwrap() {
Ok(msg) => println!("Success: {msg}"),
Err(msg) => println!("Cancelled: {msg}"),
}
// Non-cancelled task
let token2 = CancellationToken::new();
let t2 = token2.clone();
let h2 = thread::spawn(move || long_task(t2, 3));
println!("Result: {:?}", h2.join().unwrap());
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn token_starts_not_cancelled() {
let t = CancellationToken::new();
assert!(!t.is_cancelled());
}
#[test]
fn cancel_sets_flag() {
let t = CancellationToken::new();
t.cancel();
assert!(t.is_cancelled());
}
#[test]
fn task_completes_without_cancel() {
let t = CancellationToken::new();
let result = long_task(t, 2);
assert!(result.is_ok());
}
#[test]
fn task_cancelled_immediately() {
let t = CancellationToken::new();
t.cancel();
assert!(long_task(t, 100).is_err());
}
}
(* OCaml: cooperative cancellation via flag *)
let cancelled = ref false
let cancel () = cancelled := true
let long_task steps =
let rec loop i =
if !cancelled then
Printf.printf "Task cancelled at step %d\n" i
else if i >= steps then
Printf.printf "Task completed all %d steps\n" steps
else begin
Printf.printf "Step %d...\n" i;
Thread.delay 0.01;
loop (i + 1)
end
in loop 0
let () =
let t = Thread.create (fun () -> long_task 10) () in
Thread.delay 0.035;
Printf.printf "Sending cancel signal\n";
cancel ();
Thread.join t
๐ Detailed Comparison
922-cancellation-token โ Language Comparison
std vs tokio
| Aspect | std version | tokio version |
|---|---|---|
| Runtime | OS threads via `std::thread` | Async tasks on tokio runtime |
| Synchronization | `std::sync::Mutex`, `Condvar` | `tokio::sync::Mutex`, channels |
| Channels | `std::sync::mpsc` (unbounded) | `tokio::sync::mpsc` (bounded, async) |
| Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |