922-cancellation-token — Cancellation Token
Tutorial
The Problem
Long-running operations (large file downloads, iterative computations, background scans) must be stoppable. Forcefully stopping a thread from outside is unsafe: it can leak resources and leave shared state inconsistent. The safe pattern is cooperative cancellation: the running task periodically checks a cancellation flag and exits cleanly once it is set. The same idea appears as CancellationToken in .NET, context.Context in Go, and AbortController in JavaScript. Rust's std::sync::atomic::AtomicBool provides the thread-safe, lock-free flag this pattern needs. The flag itself requires no heap allocation, although sharing it between threads through Arc, as this example does, costs one allocation.
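The bare pattern can be sketched without any wrapper type. The following is a minimal illustration, not the tutorial's implementation: it assumes scoped threads (std::thread::scope) so that a stack-allocated AtomicBool can be borrowed by the worker, and the names run_until_cancelled and iterations are made up for this example.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;

/// Hypothetical worker: does one unit of "work" per loop and
/// re-checks the flag before starting the next one.
fn run_until_cancelled(cancelled: &AtomicBool, counter: &AtomicUsize) {
    while !cancelled.load(Ordering::Acquire) {
        counter.fetch_add(1, Ordering::Relaxed);
        thread::sleep(Duration::from_millis(1));
    }
}

fn main() {
    // Both values live on the stack of `main`; no Arc is involved.
    let cancelled = AtomicBool::new(false);
    let iterations = AtomicUsize::new(0);

    thread::scope(|s| {
        s.spawn(|| run_until_cancelled(&cancelled, &iterations));
        thread::sleep(Duration::from_millis(20));
        // Request cancellation; the worker exits at its next check.
        cancelled.store(true, Ordering::Release);
    });

    println!(
        "worker ran {} iterations before stopping",
        iterations.load(Ordering::Relaxed)
    );
}
```

Scoped threads only fit when the worker cannot outlive the caller. A token that must be handed to detached threads still needs shared ownership, which is why the tutorial wraps the flag in Arc.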
🎯 Learning Outcomes
- CancellationToken using Arc<AtomicBool> for shared cancellation state
- Ordering::Release and Ordering::Acquire for correct cross-thread visibility
- OCaml's Lwt_switch and Fiber.with_cancellation for comparison
Code Example
//! Cooperative cancellation via a shared atomic flag.
//!
//! A [`CancellationToken`] lets one thread signal another to stop work.
//! The worker periodically checks the token and exits early when
//! cancellation has been requested.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// A clonable, thread-safe cooperative cancellation flag.
#[derive(Debug, Clone, Default)]
pub struct CancellationToken {
    flag: Arc<AtomicBool>,
}

impl CancellationToken {
    /// Creates a new token in the non-cancelled state.
    pub fn new() -> Self {
        Self {
            flag: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Signals cancellation. Idempotent.
    pub fn cancel(&self) {
        self.flag.store(true, Ordering::SeqCst);
    }

    /// Returns `true` if cancellation has been requested.
    pub fn is_cancelled(&self) -> bool {
        self.flag.load(Ordering::SeqCst)
    }
}

/// Outcome of a cancellable task run.
#[derive(Debug, PartialEq, Eq)]
pub enum TaskOutcome {
    /// Task finished all requested steps.
    Completed(usize),
    /// Task was cancelled at the given step index.
    Cancelled(usize),
}

/// Runs a cooperative long-running task that checks `token` between steps.
///
/// `on_step` is invoked for each step that actually runs. Between steps the
/// thread sleeps for `step_delay`, mirroring the OCaml `Thread.delay 0.01`.
pub fn long_task<F>(
    steps: usize,
    token: &CancellationToken,
    step_delay: Duration,
    mut on_step: F,
) -> TaskOutcome
where
    F: FnMut(usize),
{
    for i in 0..steps {
        if token.is_cancelled() {
            return TaskOutcome::Cancelled(i);
        }
        on_step(i);
        thread::sleep(step_delay);
    }
    if token.is_cancelled() {
        TaskOutcome::Cancelled(steps)
    } else {
        TaskOutcome::Completed(steps)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn new_token_is_not_cancelled() {
        let token = CancellationToken::new();
        assert!(!token.is_cancelled());
    }

    #[test]
    fn cancel_sets_the_flag() {
        let token = CancellationToken::new();
        token.cancel();
        assert!(token.is_cancelled());
    }

    #[test]
    fn clones_share_state() {
        let token = CancellationToken::new();
        let clone = token.clone();
        token.cancel();
        assert!(clone.is_cancelled());
    }

    #[test]
    fn cancel_is_idempotent() {
        let token = CancellationToken::new();
        token.cancel();
        token.cancel();
        assert!(token.is_cancelled());
    }

    #[test]
    fn task_runs_to_completion_when_not_cancelled() {
        let token = CancellationToken::new();
        let outcome = long_task(5, &token, Duration::from_millis(0), |_| {});
        assert_eq!(outcome, TaskOutcome::Completed(5));
    }

    #[test]
    fn task_stops_immediately_when_pre_cancelled() {
        let token = CancellationToken::new();
        token.cancel();
        let outcome = long_task(5, &token, Duration::from_millis(0), |_| {
            panic!("step should not run");
        });
        assert_eq!(outcome, TaskOutcome::Cancelled(0));
    }

    #[test]
    fn task_stops_when_cancelled_from_another_thread() {
        let token = CancellationToken::new();
        let worker_token = token.clone();
        let handle = thread::spawn(move || {
            long_task(100, &worker_token, Duration::from_millis(10), |_| {})
        });
        thread::sleep(Duration::from_millis(35));
        token.cancel();
        match handle.join().expect("worker panicked") {
            TaskOutcome::Cancelled(step) => assert!(step < 100),
            other => panic!("expected cancellation, got {:?}", other),
        }
    }

    #[test]
    fn on_step_receives_increasing_indices() {
        let token = CancellationToken::new();
        let mut seen = Vec::new();
        let outcome = long_task(3, &token, Duration::from_millis(0), |i| seen.push(i));
        assert_eq!(outcome, TaskOutcome::Completed(3));
        assert_eq!(seen, vec![0, 1, 2]);
    }

    #[test]
    fn zero_step_task_completes_immediately() {
        let token = CancellationToken::new();
        let outcome = long_task(0, &token, Duration::from_millis(0), |_| {
            panic!("no steps expected");
        });
        assert_eq!(outcome, TaskOutcome::Completed(0));
    }
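}
Key Differences
- Rust uses AtomicBool for lock-free cancellation checking; the plain-threads OCaml version uses ref + Mutex for equivalent thread safety.
- Rust's atomic memory orderings (Release/Acquire, or the stronger SeqCst used in the code above) are explicit and precise; OCaml's Mutex provides full mutual exclusion (stronger but more costly).
- A Rust token is shared through Arc::clone (an atomic reference-count increment); OCaml's Mutex-wrapped bool needs a similar shared wrapper (a ref), with its lifetime handled by the garbage collector rather than by a reference count.
- OCaml's Lwt_switch integrates with the Lwt event loop; Rust's AtomicBool is a low-level primitive, and tokio_util::sync::CancellationToken is the high-level async counterpart.
OCaml Approach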
OCaml's Lwt_switch provides cooperative cancellation for Lwt promises. Lwt_switch.create () creates a switch; Lwt_switch.add_hook switch f registers cleanup; Lwt_switch.turn_off switch cancels. OCaml 5 Eio uses Fiber.with_cancellation and Cancel.cancel. For plain threads: let cancelled = ref false with a Mutex for thread safety, equivalent to Rust's AtomicBool. The Go-style context.Context has no direct OCaml equivalent in the standard library.
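For comparison, the mutex-guarded flag described above for plain OCaml threads can be mirrored in Rust. This is only a sketch under assumptions: MutexFlag is a hypothetical type invented here, and the worker loop stands in for real work. It shows what the AtomicBool version avoids, namely taking a lock on every check.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical mutex-guarded flag, analogous to `ref false` plus a
/// `Mutex` in OCaml. Every read and write takes the lock.
#[derive(Clone, Default)]
struct MutexFlag {
    cancelled: Arc<Mutex<bool>>,
}

impl MutexFlag {
    fn cancel(&self) {
        *self.cancelled.lock().unwrap() = true;
    }

    fn is_cancelled(&self) -> bool {
        *self.cancelled.lock().unwrap()
    }
}

fn main() {
    let flag = MutexFlag::default();
    let worker_flag = flag.clone();

    let worker = thread::spawn(move || {
        let mut steps = 0u64;
        // Each check locks and unlocks the mutex.
        while !worker_flag.is_cancelled() {
            steps += 1;
            thread::yield_now();
        }
        steps
    });

    flag.cancel();
    let steps = worker.join().unwrap();
    println!("worker observed cancellation after {steps} steps");
}
```

Both variants are correct for a single boolean. The atomic flag is the lighter choice because a check is a single atomic load rather than a lock acquisition.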
Deep Comparison
922-cancellation-token — Language Comparison
std vs tokio
| Aspect | std version | tokio version |
|---|---|---|
| Runtime | OS threads via std::thread | Async tasks on tokio runtime |
| Synchronization | std::sync::Mutex, Condvar | tokio::sync::Mutex, channels |
| Channels | std::sync::mpsc (unbounded channel, bounded sync_channel) | tokio::sync::mpsc (bounded or unbounded, async) |
| Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |
Exercises
- Implement a timeout_token that automatically sets itself after a specified Duration, combining CancellationToken with thread::sleep.
- Add a cancel_after(n: usize) method to CancellationToken that sets itself after n calls to is_cancelled().
- Write a CancellableIter<I: Iterator> wrapper that checks the token on each .next() call.
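One possible starting point for the CancellableIter exercise is sketched below. It is an assumption-laden outline rather than a reference solution: the trimmed-down CancellationToken is redefined only so the snippet compiles on its own, it uses Release/Acquire instead of the tutorial's SeqCst, and the constructor CancellableIter::new is a name chosen for this example.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Minimal stand-in for the tutorial's token so the sketch is self-contained.
#[derive(Clone, Default)]
struct CancellationToken {
    flag: Arc<AtomicBool>,
}

impl CancellationToken {
    fn cancel(&self) {
        self.flag.store(true, Ordering::Release);
    }

    fn is_cancelled(&self) -> bool {
        self.flag.load(Ordering::Acquire)
    }
}

/// Hypothetical wrapper: yields items from `inner` until the token is cancelled.
struct CancellableIter<I: Iterator> {
    inner: I,
    token: CancellationToken,
}

impl<I: Iterator> CancellableIter<I> {
    fn new(inner: I, token: CancellationToken) -> Self {
        Self { inner, token }
    }
}

impl<I: Iterator> Iterator for CancellableIter<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        // Check the token before pulling the next item from the inner iterator.
        if self.token.is_cancelled() {
            return None;
        }
        self.inner.next()
    }
}

fn main() {
    let token = CancellationToken::default();
    let mut seen = Vec::new();

    // The inner iterator is infinite; only cancellation ends the loop.
    for n in CancellableIter::new(1.., token.clone()) {
        seen.push(n);
        if n == 3 {
            token.cancel();
        }
    }

    println!("{seen:?}"); // prints [1, 2, 3]
}
```

Returning None on cancellation makes the wrapper indistinguishable from a naturally exhausted iterator. A fuller solution might expose a way to tell the two cases apart, for example by checking the token after the loop.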