330: Async Sink
Difficulty: 4 · Level: Expert

A destination that accepts values and flushes them in batches: the write side of a stream.

The Problem This Solves
If a `Stream` is a source you pull from, a `Sink` is a destination you push into. In real async systems you constantly need to write to something that can't accept items one-by-one at full speed: a database that prefers bulk inserts, a log aggregator that batches messages, a network socket that benefits from coalescing writes, or a metrics collector that flushes every N samples. Writing directly to these sinks on every item is expensive. The `Sink` pattern buffers incoming values and flushes when the buffer is full (or on explicit flush), absorbing bursts without hammering the downstream. In async Rust, `futures::Sink` gives you `send(item).await` and `flush().await`; the sink decides internally when to actually write. This example implements the same pattern synchronously, using `VecDeque` as a buffer, showing the core logic without a runtime dependency.

The Intuition
Think of it like a batched database writer in Node.js:

// Don't do one INSERT per row - buffer and flush
buffer.push(item);
if (buffer.length >= BATCH_SIZE) await db.insertMany(buffer.splice(0));
Rust's `BatchSink` is the same idea: `send()` buffers, and a flush triggers when capacity is reached. The async version (from `futures::SinkExt`) wraps `send` in an async call so the flush, which might wait for I/O, doesn't block the executor.
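The JS snippet above translates almost line-for-line into safe Rust. A minimal sketch; `bulk_write` and `BATCH_SIZE` are illustrative stand-ins (the role `db.insertMany` plays above), not part of any library:

```rust
// Hypothetical downstream call - pretend this hits the database.
// Returns how many rows were "written".
fn bulk_write(batch: Vec<i32>) -> usize {
    batch.len()
}

const BATCH_SIZE: usize = 3;

fn send(buffer: &mut Vec<i32>, item: i32, written: &mut usize) {
    buffer.push(item);
    if buffer.len() >= BATCH_SIZE {
        // std::mem::take swaps in an empty Vec, moving the batch out without cloning
        *written += bulk_write(std::mem::take(buffer));
    }
}

fn main() {
    let mut buffer = Vec::new();
    let mut written = 0;
    for i in 1..=7 {
        send(&mut buffer, i, &mut written);
    }
    // Two full batches (6 items) flushed; item 7 is still buffered
    assert_eq!(written, 6);
    assert_eq!(buffer.len(), 1);
}
```

`std::mem::take` does for a `Vec` what `buffer.splice(0)` does in JS: hand the contents downstream while leaving an empty buffer behind.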
How It Works in Rust
struct BatchSink<T> {
    buffer: VecDeque<T>,
    capacity: usize,
    flushed_batches: Vec<Vec<T>>,
}

impl<T> BatchSink<T> {
    fn send(&mut self, item: T) -> Result<(), String> {
        self.buffer.push_back(item);
        // Auto-flush when buffer reaches capacity
        if self.buffer.len() >= self.capacity { self.flush()?; }
        Ok(())
    }

    fn flush(&mut self) -> Result<(), String> {
        if !self.buffer.is_empty() {
            // drain() moves items out without cloning, so no Clone bound is needed
            let batch: Vec<T> = self.buffer.drain(..).collect();
            self.flushed_batches.push(batch);
        }
        Ok(())
    }
}
Key: `VecDeque::drain(..)` empties the buffer in one move. Always call `flush()` explicitly at the end: the last partial batch won't auto-flush while `len < capacity`.
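The partial-batch pitfall is easy to demonstrate. A stripped-down sketch (`Sink` here is a throwaway illustration, not the `BatchSink` from the example): with capacity 3, sending 8 items auto-flushes only the two full batches, and the tail sits in the buffer until an explicit flush.

```rust
use std::collections::VecDeque;

struct Sink {
    buffer: VecDeque<i32>,
    capacity: usize,
    flushed: usize, // total items handed downstream
}

impl Sink {
    fn send(&mut self, x: i32) {
        self.buffer.push_back(x);
        if self.buffer.len() >= self.capacity {
            // drain(..) yields every buffered item and leaves the buffer empty
            self.flushed += self.buffer.drain(..).count();
        }
    }
}

fn main() {
    let mut s = Sink { buffer: VecDeque::new(), capacity: 3, flushed: 0 };
    for i in 1..=8 {
        s.send(i);
    }
    assert_eq!(s.flushed, 6);      // batches [1,2,3] and [4,5,6] auto-flushed
    assert_eq!(s.buffer.len(), 2); // 7 and 8 would be lost without a final flush
}
```

Dropping the sink at this point silently discards items 7 and 8, which is exactly why the explicit trailing `flush()` matters.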
What This Unlocks
- Batched database writes: collect rows and `INSERT` in bulk, cutting round-trips by 10-100x.
- Log aggregation: buffer log lines and send them to Elasticsearch / Loki in batches.
- Backpressure integration: pair with a `Stream` to build a full push-through pipeline with flow control.
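All three use cases share one shape: the buffering logic stays fixed while the flush action varies. One way to sketch that in Rust is to parameterize the sink over a closure (the names `CallbackSink` and `flush_fn` are illustrative, not from any library), mirroring the `flush_fn` field in the OCaml version below:

```rust
use std::collections::VecDeque;

// Generic over the item type T and the flush action F, so the same
// buffering code can target a database, a log shipper, or a socket.
struct CallbackSink<T, F: FnMut(Vec<T>)> {
    buffer: VecDeque<T>,
    capacity: usize,
    flush_fn: F,
}

impl<T, F: FnMut(Vec<T>)> CallbackSink<T, F> {
    fn new(capacity: usize, flush_fn: F) -> Self {
        Self { buffer: VecDeque::new(), capacity, flush_fn }
    }

    fn send(&mut self, item: T) {
        self.buffer.push_back(item);
        if self.buffer.len() >= self.capacity {
            self.flush();
        }
    }

    fn flush(&mut self) {
        if !self.buffer.is_empty() {
            // Hand the whole batch to the caller-supplied action
            (self.flush_fn)(self.buffer.drain(..).collect());
        }
    }
}

fn main() {
    let mut batches = 0;
    {
        let mut sink = CallbackSink::new(3, |rows: Vec<&str>| {
            batches += 1; // stand-in for a bulk INSERT
            println!("wrote {} rows", rows.len());
        });
        for row in ["a", "b", "c", "d", "e"] {
            sink.send(row);
        }
        sink.flush(); // final partial batch
    }
    assert_eq!(batches, 2); // one full batch of 3, one partial batch of 2
}
```

Swapping the closure for one that does real I/O turns this sketch into a batched writer without touching `send` or `flush`.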
Key Differences
| Concept | OCaml | Rust |
|---|---|---|
| Mutable buffer | `mutable buf: 'a list` (reversed for O(1) prepend) | `VecDeque<T>` (O(1) both ends) |
| Flush trigger | Manual check on `List.length` | `buffer.len() >= capacity` |
| Drain | `s.buf <- []` (GC frees old list) | `buffer.drain(..)` (moves into new `Vec`) |
| Error handling | Unit / exception | `Result<(), String>` |
use std::collections::VecDeque;

struct BatchSink<T> {
    buffer: VecDeque<T>,
    capacity: usize,
    flushed_batches: Vec<Vec<T>>,
}

impl<T> BatchSink<T> {
    fn new(capacity: usize) -> Self {
        Self { buffer: VecDeque::new(), capacity, flushed_batches: Vec::new() }
    }

    fn send(&mut self, item: T) -> Result<(), String> {
        self.buffer.push_back(item);
        if self.buffer.len() >= self.capacity { self.flush()?; }
        Ok(())
    }

    fn flush(&mut self) -> Result<(), String> {
        if !self.buffer.is_empty() {
            let b: Vec<T> = self.buffer.drain(..).collect();
            println!("Flushing {} items", b.len());
            self.flushed_batches.push(b);
        }
        Ok(())
    }

    fn into_batches(self) -> Vec<Vec<T>> { self.flushed_batches }
}

fn main() {
    let mut s: BatchSink<i32> = BatchSink::new(3);
    for i in 1..=8 { s.send(i).unwrap(); }
    s.flush().unwrap();
    println!("Batches: {}", s.into_batches().len());
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn flushes_in_batches() {
        let mut s: BatchSink<i32> = BatchSink::new(3);
        for i in 1..=9 { s.send(i).unwrap(); }
        let b = s.into_batches();
        assert_eq!(b.len(), 3);
        assert_eq!(b[0], vec![1, 2, 3]);
    }

    #[test]
    fn partial_flush() {
        let mut s: BatchSink<i32> = BatchSink::new(5);
        for i in 1..=3 { s.send(i).unwrap(); }
        s.flush().unwrap();
        let b = s.into_batches();
        assert_eq!(b[0], vec![1, 2, 3]);
    }
}
(* OCaml: sink-like accumulator with flush *)
type 'a sink = { mutable buf: 'a list; cap: int; flush_fn: 'a list -> unit }

let make_sink cap f = { buf = []; cap; flush_fn = f }

let send s x =
  s.buf <- x :: s.buf;
  if List.length s.buf >= s.cap then (s.flush_fn (List.rev s.buf); s.buf <- [])

let flush s = if s.buf <> [] then (s.flush_fn (List.rev s.buf); s.buf <- [])

let () =
  let out = ref 0 in
  let s = make_sink 3 (fun b -> out := !out + 1; Printf.printf "Flush %d items\n" (List.length b)) in
  List.iter (send s) [1; 2; 3; 4; 5; 6; 7; 8];
  flush s;
  Printf.printf "Batches: %d\n" !out