Skip to main content

An async pipeline with backpressure

Time: 55 minutes. Prerequisites: Channels cookbook, Nursery cookbook, Resilience cookbook.

We'll build ingest — a typical streaming ETL pipeline:

Each stage is a task. Channels connect them. The consumer's rate throttles the producer; failures trigger retry with exponential backoff; Ctrl+C drains the pipeline gracefully.

1. The domain

src/types.vr:

pub type RawRecord is {
line_no: Int,
text: Text,
};

pub type ParsedRecord is {
line_no: Int,
timestamp: Instant,
severity: Severity,
message: Text,
};

pub type Severity is Debug | Info | Warn | Error;

pub type ValidRecord is ParsedRecord { self.message.len() > 0 };

The refinement on ValidRecord makes the validator's contract load-bearing — a ValidRecord at the writer is guaranteed non-empty.

2. Stage 1: Reader

src/reader.vr:

use core.async.*;
use core.io.*;
use .self.types::RawRecord;

pub async fn read_from_stdin(tx: Sender<RawRecord>)
using [IO, Logger]
-> Result<Int, Error>
{
let stdin = stdin();
let mut reader = BufReader.new(stdin.lock());
let mut line_no = 0;

loop {
let mut line = Text.new();
match reader.read_line_async(&mut line).await? {
0 => break, // EOF
_ => {
line_no += 1;
let rec = RawRecord {
line_no,
text: line.trim_end().to_string(),
};
// If the queue is full, suspend — producer waits for consumer.
if tx.send(rec).await.is_err() { break; } // pipeline closed
}
}
}
Logger.info(&f"reader finished: {line_no} lines");
Result.Ok(line_no)
}

tx.send suspends when the channel is full — that's backpressure.

3. Stage 2: Parser (N workers)

src/parser.vr:

use core.async.*;
use core.text.*;
use .self.types::*;

pub async fn parse_loop(
mut rx: Receiver<RawRecord>,
tx: Sender<ParsedRecord>,
) using [Logger] -> Result<Int, Error> {
let mut processed = 0;

while let Maybe.Some(raw) = rx.recv().await {
match parse_one(&raw) {
Result.Ok(parsed) => {
if tx.send(parsed).await.is_err() { break; }
processed += 1;
}
Result.Err(e) => {
Logger.warn(&f"parse error at line {raw.line_no}: {e}");
}
}
}
Logger.info(&f"parser worker finished: {processed}");
Result.Ok(processed)
}

fn parse_one(raw: &RawRecord) -> Result<ParsedRecord, Error> {
// Format: "TIMESTAMP SEVERITY MESSAGE..."
let parts: List<&Text> = raw.text.splitn(3, &" ").collect();
if parts.len() < 3 {
return Result.Err(Error.new(&"not enough fields"));
}
let timestamp = parse_timestamp(parts[0])?;
let severity = match parts[1].as_str() {
"DEBUG" => Severity.Debug,
"INFO" => Severity.Info,
"WARN" => Severity.Warn,
"ERROR" => Severity.Error,
s => return Result.Err(Error.new(&f"bad severity: {s}")),
};
Result.Ok(ParsedRecord {
line_no: raw.line_no,
timestamp,
severity,
message: parts[2].to_string(),
})
}

fn parse_timestamp(s: &Text) -> Result<Instant, Error> {
// Stub — real parser would handle ISO 8601 etc.
Result.Ok(Instant.now())
}

4. Stage 3: Validator

src/validator.vr:

use core.async.*;
use .self.types::*;

pub async fn validate_loop(
mut rx: Receiver<ParsedRecord>,
tx: Sender<ValidRecord>,
) using [Logger] -> Result<Int, Error> {
let mut valid = 0;
let mut invalid = 0;

while let Maybe.Some(rec) = rx.recv().await {
if rec.message.is_empty() {
invalid += 1;
Logger.debug(&f"dropping empty message line {rec.line_no}");
continue;
}
// Promote ParsedRecord -> ValidRecord (refinement checked).
let vr: ValidRecord = rec;
if tx.send(vr).await.is_err() { break; }
valid += 1;
}
Logger.info(&f"validator finished: {valid} valid, {invalid} dropped");
Result.Ok(valid)
}

5. Stage 4: Writer with retry

src/writer.vr:

use core.async.*;
use core.io.*;
use .self.types::ValidRecord;

pub async fn write_loop(
mut rx: Receiver<ValidRecord>,
path: &Path,
) using [IO, Logger] -> Result<Int, Error> {
let file = OpenOptions.new()
.create(true).append(true)
.open_async(path).await?;
let mut writer = BufWriter.new(file);
let mut written = 0;

while let Maybe.Some(rec) = rx.recv().await {
let line = format_record(&rec);

// Retry with exponential backoff on transient write failures.
let result = execute_with_retry_config(
|| writer.write_all_async(line.as_bytes()),
RetryConfig {
max_attempts: 3,
initial_backoff_ms: 50,
max_backoff_ms: 500,
backoff_factor: 2.0,
jitter: true,
}
).await;

match result {
Result.Ok(_) => { written += 1; }
Result.Err(e) => {
Logger.error(&f"write failed after retries: {e}");
break; // propagate failure to the nursery
}
}
}

writer.flush_async().await?;
Logger.info(&f"writer finished: {written} records");
Result.Ok(written)
}

fn format_record(rec: &ValidRecord) -> Text {
let sev = match rec.severity {
Severity.Debug => "DEBUG",
Severity.Info => "INFO",
Severity.Warn => "WARN",
Severity.Error => "ERROR",
};
f"{rec.line_no:>6} {sev:>5} {rec.message}\n"
}

6. Orchestration — the nursery

src/main.vr:

use core.async.*;
use core.io.*;
use .self.reader::read_from_stdin;
use .self.parser::parse_loop;
use .self.validator::validate_loop;
use .self.writer::write_loop;
use .self.types::*;

const PARSER_WORKERS: Int = 4;
const CHANNEL_CAPACITY: Int = 1024;

async fn run_pipeline(output_path: &Path) using [IO, Logger] -> Result<(), Error> {
// Wire the stages.
let (raw_tx, raw_rx) = channel::<RawRecord>(CHANNEL_CAPACITY);
let (parsed_tx, parsed_rx) = channel::<ParsedRecord>(CHANNEL_CAPACITY);
let (valid_tx, valid_rx) = channel::<ValidRecord>(CHANNEL_CAPACITY);

nursery(on_error: cancel_all) {
// Stage 1: reader (single)
spawn read_from_stdin(raw_tx);

// Stage 2: parser workers (N)
for worker_id in 0..PARSER_WORKERS {
let rx = raw_rx.clone();
let tx = parsed_tx.clone();
spawn parse_loop(rx, tx);
}
drop(parsed_tx); // last clone released; EOF propagates

// Stage 3: validator (single)
spawn validate_loop(parsed_rx, valid_tx);

// Stage 4: writer (single)
spawn write_loop(valid_rx, output_path);

// Signal handler: Ctrl+C triggers a graceful drain.
spawn async move {
wait_for_signal(Signal::Interrupt).await;
Logger.info(&"Ctrl+C received; draining pipeline…");
// Dropping all senders triggers graceful EOF through each stage.
drop(raw_tx);
};

// Nursery waits for every child.
} recover (e: NurseryError) {
Logger.error(&f"pipeline error: {e:?}");
return Result.Err(Error.new(&"pipeline failed"));
}

Result.Ok(())
}

fn main() using [IO] {
let args = env::args();
let out_path = args.get(1)
.map(|s| Path.from(s.as_str()))
.unwrap_or(Path.from(&"ingest.log"));

let rt = Runtime.new(RuntimeConfig.default()
.worker_threads(PARSER_WORKERS + 4)
.io_engine(IoEngineKind::IoUring))
.expect("runtime");

rt.block_on(async {
provide Logger = ConsoleLogger.new(LogLevel.Info) in {
run_pipeline(&out_path).await.expect("pipeline")
}
});
}

7. What happens at each interesting moment

Producer-faster-than-consumer

Reader sends at 10 K/sec; parser can handle 4 K/sec; writer 2 K/sec.

  • Writer's channel (capacity 1024) fills.
  • Validator suspends on tx.send — backpressure.
  • Parser's output channel fills.
  • Parsers suspend on their tx.send.
  • Raw channel fills.
  • Reader suspends on tx.send.
  • Input rate throttles itself to writer rate.

No memory blows up; no messages are dropped.

A parser worker panics

  • The panicked task's future returns Err(JoinError::Panicked).
  • Nursery's on_error: cancel_all cancels all sibling tasks.
  • Each stage observes a closed channel and exits cleanly.
  • Nursery's recover block runs; we return an error from run_pipeline.

Ctrl+C

  • wait_for_signal(Signal::Interrupt) wakes up.
  • The signal task drops raw_tx. Since the reader holds the other sender clone, we actually drop both: our local raw_tx and after the signal task drops its clone, the reader's own sender is still alive — but the reader is told via a separate cancel mechanism...

Let me correct the shutdown:

// Shutdown via shared flag
let stopping = Shared.new(AtomicBool.new(false));
let s_clone = stopping.clone();
spawn async move {
wait_for_signal(Signal::Interrupt).await;
s_clone.store(true, MemoryOrdering.Release);
};

// In reader:
while !stopping.load(MemoryOrdering.Acquire) && /* ... */ {
// read next line
}

Or more simply: use a broadcast channel for shutdown notifications and select! in each stage.

8. Tests

@cfg(test)
module tests {
use .super.*;

@test
async fn pipeline_preserves_count() using [IO] {
// Inject fake input via an in-memory Reader.
let input: List<Text> = (0..100).map(|i|
f"2026-04-15T00:00:{i:02} INFO Message {i}"
).collect();

let (raw_tx, raw_rx) = channel::<RawRecord>(16);
let (parsed_tx, parsed_rx) = channel::<ParsedRecord>(16);
let (valid_tx, mut valid_rx) = channel::<ValidRecord>(16);

nursery {
spawn async move {
for (i, line) in input.iter().enumerate() {
raw_tx.send(RawRecord { line_no: i + 1, text: line.clone() })
.await.unwrap();
}
};
spawn parse_loop(raw_rx, parsed_tx);
spawn validate_loop(parsed_rx, valid_tx);

let mut count = 0;
while let Maybe.Some(_r) = valid_rx.recv().await { count += 1; }
assert_eq(count, 100);
}
}

@test
async fn backpressure_throttles_sender() using [IO, Clock] {
// Slow consumer: sleep after each recv.
let (tx, mut rx) = channel::<Int>(4);

let sender_start = Clock.now();
spawn async move {
for i in 0..10 {
tx.send(i).await.unwrap();
}
};

for _ in 0..10 {
let _ = rx.recv().await.unwrap();
sleep(50.ms()).await;
}

let elapsed = Clock.now() - sender_start;
assert(elapsed.as_millis() >= 450); // 9 sleeps × 50 ms
}
}

9. Run

$ verum build --release
$ cat logs.txt | ./target/release/ingest output.log
[info] parser worker finished: 2500
[info] parser worker finished: 2503
[info] parser worker finished: 2497
[info] parser worker finished: 2500
[info] reader finished: 10000 lines
[info] validator finished: 9998 valid, 2 dropped
[info] writer finished: 9998 records

What you learned

  • Channels as pipeline glue. Each stage is a task; bounded channels provide automatic backpressure.
  • Nursery for lifetime management. Every spawned task outlives or is cancelled with the pipeline.
  • Retry + circuit breaker composes naturally around I/O-touching stages.
  • Graceful shutdown via flag + signal task — no Ctrl+C-induced data loss.
  • Refinement types at stage boundaries. ValidRecord encodes "validator has done its job" in the type.

Next

  • Distributed pipeline: replace channels with network protocols (net::tcp) and serialise with @derive(Serialize).
  • Exactly-once processing: add an on-disk journal + idempotent writers.
  • Metrics: add a Metrics context; each stage records throughput, lag, error counts.

See also