import StdlibStatus from '@site/src/components/StdlibStatus';
core.async — Asynchronous execution
<StdlibStatus
status="partial"
detail="12 of 23 async modules carry full conformance suites; 7 fundamental compiler/stdlib defects pinned. Variant-algebra + construction-surface backbone is interpreter-green; runtime poll-path coverage waits on the executor test-bed."
defects={[
{area: 'global AOT', summary: 'task #10 — compiler.phase.generate_native SIGABRT (LLVM SmallVector hang) — blocks AOT coverage across every async test'},
{area: 'panic_fence', summary: 'task #11 — Maybe.take() mutation through &mut self does not flow back to a generic record field — gates the fence lifecycle invariant'},
{area: 'semaphore', summary: 'task #12 — AsyncSemaphore.new null-derefs through AtomicInt.swap in the Mutex/AtomicBool init chain; task #13 — is-operator returns false for the only variant of a single-variant sum'},
{area: 'timer', summary: 'task #14 — timeout_ms cross-module name collision; task #15 — Duration.from_millis dispatch routes from_nanos to an Int receiver; task #16 — Timeout<F> field-layout write OOB; task #17 — TimerInterval.period() recursion'},
]}
sweepDate="2026-05-14"
/>
Full async toolkit: Future protocol, executors, channels, async
streams, timers, structured concurrency (nursery), racing (select),
circuit breakers, retry policies, parallel helpers.
Module status
Each core.async.* module carries an explicit conformance status so
you know what you can rely on today versus what is still in flight.
The status is the truth-table over the module's API surface as
exercised by core-tests/async/<module>/ under both verum test --interp (Tier 0 VBC interpreter) and verum test --aot (Tier 2 LLVM
AOT, --test-threads 1).
| Status | Meaning |
|---|---|
| stable | Every public method is conformance-tested. Algebraic laws are pinned by exhaustive or large-domain property tests. Cross-stdlib integration is verified. Interpreter and AOT agree on every test. Safe to depend on in production. |
| complete | Synonym for stable used by the conformance suite: 100 % public API coverage, every algebraic law pinned, every regression guarded forever. |
| partial | Subset of the public API is conformance-tested and stable. The rest is exercised in regression_test.vr via @ignored tests pinning the specific defects that block coverage. The non-ignored API surface is safe; everything else is documented per-module under "Open defects". |
| regression-only | Module is gated by upstream stdlib / language-level defects. Public-API tests do not pass yet — only @ignored regressions exist to lock the bug shapes. Avoid in production until promoted. |
| undocumented | Documentation in this reference is authoritative, but the module has not yet been routed through the core-tests/ conformance suite. The current page is a best-effort snapshot of the source; it may drift from runtime behaviour. |
| Module | Status | Conformance suite |
|---|---|---|
poll.vr | complete | core-tests/async/poll — 42 unit + 21 property + 14 integration + 7 regression-as-guardrail (79 working, 0 pinned) |
waker.vr | partial | core-tests/async/waker — 9 working + 2 pinned regressions (inline-vtable redesign + record-literal Clone-corruption fix + Waker construction inlining; waker construction + clone + wake_by_ref + will_wake all now green; 2 residual pins: fn_ref-as-Int identity stability and Debug auto-derive precedence for record types) |
future.vr | partial | core-tests/async/future — 17 working (construction + SelectResult variant algebra + Maybe interop) + 14 pinned regressions (the Future.poll / FutureExt.block surface is gated by the same &self auto-deref defect as waker §C) |
backoff.vr | partial | core-tests/async/backoff — 14 working (BackoffStrategy variant + match-coverage) + 7 pinned regressions (Backoff. |
task.vr | partial | core-tests/async/task — 16 working (JoinError variant algebra + TaskId record construction + List |
diagnostics.vr | partial | core-tests/async/diagnostics — 15 working (TaskLifecycleState 6-variant lifecycle + partition + terminal-state classification + List-histogram). Pure data-type module — no runtime dependency. |
cancellation.vr | partial | core-tests/async/cancellation — 11 working (CancelReason 4-variant + Aborted(Text) payload + List bookkeeping). Timeout{deadline} arm deferred to integration once Instant works under interp. |
channel.vr | partial | core-tests/async/channel — 12 working (TrySendError + TryRecvError variant algebra + payload recovery + retry-signal classification). |
broadcast.vr | partial | core-tests/async/broadcast — 18 working (BroadcastRecvError + TryRecvResult |
select.vr | partial | core-tests/async/select — 12 working (Either<A,B> + SelectError + race-outcome bookkeeping). |
nursery.vr | partial | core-tests/async/nursery — 8 working (NurseryErrorBehavior 3-policy + priority/severity ordering). |
spawn_config.vr | partial | core-tests/async/spawn_config — 21 working (RestartPolicy + IsolationLevel + Priority 4-rank ordering + will-restart classification). |
spawn_with.vr | partial | core-tests/async/spawn_with — 10 working (CircuitState 3-variant breaker lifecycle Closed → Open → HalfOpen → Closed + can-attempt classification). |
executor.vr | unaudited | — depends on extern FFI symbols not callable under interp; deferred pending executor test-bed |
stream.vr | unaudited | — StreamExt depends on Future protocol; deferred pending the executor test-bed and tasks #11 + #25 |
generator.vr | unaudited | — runtime-bound; deferred pending the executor test-bed |
timer.vr | partial | core-tests/async/timer — 29 working (Sleep/SleepUntil/Delay construction surface + TimerInterval new/immediate next_tick partition + Debounce/Throttle state-machine round-trip + monotonic refusal + reset-then-acquire across 4 representative intervals + TimeoutError Eq reflexivity) + 6 pinned regressions for tasks #14 / #15 / #16 / #17. Pre-fix landed in this branch: pub async fn acquire → public async fn acquire (line 535). |
parallel.vr | complete (interp) | core-tests/async/parallel — 38 working covering parallel_map, parallel_filter_map, parallel_for_each, parallel_reduce, and the Blelloch parallel_scan_exclusive. Pinned properties: worker-count invariance over {1,2,4,8,16}, Blelloch-vs-reference exclusive-prefix-scan identity for + and max, parallel_reduce ≡ left-fold₁, filter_map index-subset-of-map. AOT validation gated by task #10. |
panic_fence.vr | partial | core-tests/async/panic_fence — 12 working (panic_safe factory + record-literal Some/None inner + Ready(Ok) Int/Text round-trip + fence outcome→tag classification + ListMaybe.take() mutation through &mut self on a generic record field; gates the fence's documented "inner=None after Ready" lifecycle invariant). Panic-arm coverage deferred pending a panicking-Future test bed. |
semaphore.vr | regression-only outside variant algebra | core-tests/async/semaphore — 7 working (SemaphoreError single-variant algebra + Result/Maybe wrapping integration) + 10 pinned regressions (9 lifecycle tests blocked by task #12: AsyncSemaphore.new null-derefs through AtomicInt.swap in Mutex/AtomicBool init; 1 single-variant is-operator test blocked by task #13). |
async_iterator.vr | unaudited | — protocol-only; the testable surface is the blanket IntoAsyncIterator for A, exercised transitively through stream/channel/broadcast concrete impls (deferred pending those test beds) |
intrinsics.vr | partial | core-tests/async/intrinsics — 19 working (Executor.current/in_async_context coherence + future_poll_sync ReadyFuture round-trip across Int/Text/Bool payloads + IntrinsicsYieldNow two-state lifecycle Pending→Ready with exactly-one-Pending tightness). Spawn family + sleep family @intrinsics deferred pending the live-executor test-bed. |
Cross-module language defects
Multiple interpreter / codegen defects are shared across the partial async modules above; closing any unblocks coverage in every module that depends on it.
Active (2026-05-14)
-
Task #10 — global AOT
generate_nativeSIGABRT. Every test under--aotcrashes with__pthread_cond_wait→llvm::SmallVectorBase::grow_podat the native-gen worker pool. Affects every async test (and every base/maybe test too); not async-specific. Blocks the AOT half of the cross-tier conformance contract. Repro:verum test --aot --filter test_none_constructionfromcore/. -
Task #11 —
Maybe.take()mutation through&mut selfdoes not flow back to a generic record field. Repro:PanicFence<F>::polldoesself.inner.take()(whereinner: Maybe<F>); after a Ready poll,fence.inneris stillSome(f). The fence's documented "inner=None after Ready" lifecycle invariant is observably broken, so the "polled after completion" panic guard cannot fire. Defect lives in the CBGR-ref writeback path for*self = Noneinside a generic-typed Maybe field. Pinned incore-tests/async/panic_fence/regression_test.vr §A. -
Task #12 —
AsyncSemaphore.newnull-derefs throughAtomicInt.swap. EveryAsyncSemaphore.new(N)for any N panics withNullPointerAt { op: "opcode 0xe5", site: "AtomicInt.swap" }. Construction chain:AsyncSemaphore.new→Shared.new(Mutex.new(...))→AtomicBool.new(false)→AtomicInt.swapNULL. Defect class: VBC interpreter atomic-primitive dispatch on a freshly-allocated atomic cell. Pinned incore-tests/async/semaphore/regression_test.vr §A. -
Task #13 —
is-operator returns false on a single-variant sum type.let e: SemaphoreError = SemaphoreError.Closed; e is SemaphoreError.Closedevaluates tofalse, even though the same value routes correctly throughmatch e { SemaphoreError.Closed => ... }and througha.eq(&a). Likely shares root cause with the task #22 variant-tag stability cluster for the degenerate single-variant case. Pinned incore-tests/async/semaphore/regression_test.vr §B. -
Task #14 —
timeout_mscross-module name collision. Selective mountmount core.async.timer.{timeout_ms}plustimeout_ms(500, ready(7))fails to compile withWrongArgumentCount expected:1 found:2— the codegen routes to one of the same-named symbols incore.net.dns(instance method),core.runtime.supervisor(instance method), orcore.meta.contexts(protocol method). Defect class: free-function vs method-with-self name collision in the dispatch resolution table. Pinned incore-tests/async/timer/regression_test.vr §D. -
Task #15 —
Duration.from_millisdispatch routesfrom_nanosto an Int receiver.sleep(Duration.from_millis(N))panics: "method 'from_nanos' not found on receiver of runtime kind Int". ChainDuration.from_millis → Duration.from_nanos(int*1_000_000)is dispatchingfrom_nanosas a method on the Int multiplied result instead of as a static Duration constructor. Also surfaces throughsleep(Duration.from_secs(1))via the@inline(always)factory expansion, but NOT throughSleep.new(Duration.from_secs(1)). Pinned incore-tests/async/timer/regression_test.vr §A. -
Task #16 —
Timeout<F>field-layout write out of bounds.Timeout.new(Duration, future)panics: "field write out of bounds: field index 5 (offset 40+8=48) exceeds object data size 8". TheTimeout<F>record declares 3 fields {future, sleep, completed} but codegen writes to field index 5 (off by 2). Defect class: same family as task #9 (field-layout cross-mount race) but for a generic wrapper carrying an inner Sleep field. Pinned incore-tests/async/timer/regression_test.vr §C. -
Task #17 —
TimerInterval.period()field-vs-method-name shadowing causes StackOverflow. The getter method bodyself.perioddispatches asself.period()recursively. Workaround: read the field directly (it.period) — which works. Cleanest fix: stdlib renameTimerInterval.period()getter toTimerInterval.duration(), OR fix the language to give field access precedence over method-name resolution for bareself.Xsyntax. Pinned incore-tests/async/timer/regression_test.vr §B.
Closed
Five interpreter / codegen defects previously gated the partial async modules; closing them unblocked coverage:
-
Protocol default-method dispatch via blanket impl→ CLOSED 2026-05-12 (task #11) by a focused blanket-impl pre-pass incollect_all_declarations+ a generic-param materialisation skip in the maincollect_declarationsarm. Stdlib pattern:implement<F: Future> FutureExt for F {}declared AFTER concreteimplement Future for ReadyFuture<T>(source-order incore/async/future.vr). Single-pass collection observedself.blanket_impls = [Future→IntoFuture]only at ReadyFuture's collection point —Future→FutureExthad not yet been visited, so FutureExt's default bodies (block/map/and_then) never monomorphised onto ReadyFuture and runtimer.block()panicked. Fix: pre-pass populatesblanket_implsfrom a single linear scan; the main pass'salready_presentguard short-circuits duplicate registration. Generic-param skip suppresses spuriousF.block/F.map/F.and_thenregistration when the blanket impl itself is observed (the generic-param's bare name was leaking phantom FunctionIds via the bare-name fanout). Critical invariant: the pre-pass NEVER callsgenerate_default_protocol_methods— only seedsblanket_impls— so the Poll-suite invariant (protocol-registry empty-entry guard at line 1455) stays intact and the default-method materialisation runs exactly once per (concrete impl × derived protocol) pair. Repro fixed atcore-tests/async/future/regression_test.vr §A— 6 newly-passing tests acrossblock/ payload round-trip /lazy.invokes/map/map_composes/and_then. §B (4 tests on Join2/Join3/Select2 combinator receivers) remains pinned as task #24 — separate dispatch defect surfaces only when the receiver itself is a generic combinator wrapping inner Futures. -
Stdlib precompile divergence for record methods→ CLOSED 2026-05-12 ascompile_recordClone-Unit-corruption. Investigation traced the root cause tocompile_record's field-init Clone-before-SetF step, which wrote Unit into record fields whose AST declared noCloneimpl. Combined with aWaker.from_raw(raw)call-arg passing failure in theunsafe { ... }wrapper at the call site, the construction chain materialised Wakers whoserawfield was Unit; every downstreamself.raw.<vtable.slot>GetF then null-derefed at the first chain step. Fix: removed the synthetic Clone incompile_record(records live in heap-allocated NaN-boxed objects, so the field-write copies the pointer into the parent record's field slot — aliasing isn't possible through this path), AND inlinednoop_waker()'s body to a single nested record literal that side-steps the call-arg indirection. See commit5129d8b1a. -
→ CLOSED 2026-05-13 (task #9) by extendingtype_field_layoutscross-mount registration raceregister_archive_type(crates/verum_vbc/src/codegen/mod.rs:3944) to unconditionally populatetype_field_type_namesfor every archive-loaded record type — mirrors theregister_record_fieldsinvariant established by commitab768e5d8for user-phase declarations. Pre-fix the archive-side path only populatedtype_field_layoutsand lefttype_field_type_namesempty; downstreamfield_type_namereturnedNoneandresolve_field_indexfell through to the "pick the type with the most fields" global-scan heuristic — silently routing record-construction field writes to wrong offsets when a sibling type with same-named field was in scope. Addedtype_ref_to_field_namehelper that mirrorsextract_type_name_from_ast's prefix-preservation invariants (&unsafe T/*const T/*mut Tkeep their prefix so the raw-pointer marker atcompile_field_accessline 14372 still fires); pinned the built-in-TypeId → namediscipline via the newprimitive_type_id_to_namesource-of-truth (third site consistent withtype_ref_to_namecodegen-side andprimitive_typeid_namearchive_ctx_loader-side). 4 newly-passing regression tests atcore-tests/async/future/regression_test.vr §C(ReadyFuture.value / Join2.{fut1,fut2,result1,result2} / Select2.{fut1,fut2} / Lazy.f field-access underListmount). -
Free-function name collision in mount resolution→ CLOSED 2026-05-12 byregister_function_authoritative+ archive cross-pollination guard + qualified-key path-doubling fix inregister_module_filtered. When multiple stdlib modules exported free functions with the same simple name (e.g.selectappears in 7 modules;joinin 10), the call-sitemount path.{name}previously bound to whichever overload won the bootstrap-order first-wins race. Three layered defects, all closed:-
Codegen first-wins discipline swallowed explicit user mounts:
register_function'sentry().or_insert()underprefer_existing_functions=truerejected the user's authoritative binding when a passive archive-load had already claimed the bare-name slot. Lifted viaregister_function_authoritative(writes bothnameandname#aritykeys, no first-wins gate, no arity-collision shadowing) — invoked exclusively at the explicit-mountprocess_import_tree::Pathbranch. Glob mounts (mount X.*) keep first-wins to protect the FFI-raw / safe-wrapper precedence rule. -
Archive cross-pollination guard matched only the first path segment, so every stdlib
core.X.Y.<leaf>path passed the gate against every othercore.A.B.<leaf>(thew_prefixwas alwayscorefor stdlib keys). Two unrelated functions sharing a simple name collapsed onto the sameFunctionInfo, leaking the wrong FunctionId through the canonical qualified key. Tightened: when both keys are qualified, the FULL path-before-leaf must match. -
Path-doubling in
register_module_filteredbuilt the qualified key asformat!("{}.{}", module_name, simple_name_str)without checking whethersimple_name_stralready carried the module path (the precompiler's descriptor-name promotion setsfn_desc.nameto the full source-module-qualified form). Result:core.async.future.readygot registered ascore.async.core.async.future.ready; the canonical-form probe missed, and the user-side mount's lookup fell through to the cross-pollinated entry under the un-doubled key. Mirrors the detection rule frompopulate_ctx_from_archiveline ~326: whensimple_namealready contains a dot, treat it as the qualified shape directly.
Repro pinned at
core-tests/async/future/unit_test.vr §4-5:mount core.async.future.{join, select, join3}; let _ = join(f1, f2);now dispatches tocore.async.future.join(was:core.io.path.join/core.security.labels.join/ etc.). -
-
Variant-tag stability under per-file test compilation→ CLOSED 2026-05-13 (task #22) across 4 architectural defect classes via commits90b94e68b+3f14510b8+485a230c6+f1dd6fd19. (1) Nested-variant destructure scrutinee-type leak — stash i-thvariant_payload_typesintomatch_scrutinee_typebefore recursing. (2) Flat-variant tag drift — scrutinee-aware lookup tier BEFORE barelookup_function(name).variant_tagat both construction (compile_variant_constructor_hinted) and destructure (compile_pattern_test) sites. (3) Nested construction payload propagation — symmetric to (1) at construction site via new helperfind_variant_payload_types_by_type_and_name. (4) Generic-param substitution — when payload type is bare generic-param shape ("T"/"E"/"Self"), substitute the i-th generic arg from the outerreceiver_type<...>instantiation using TypeDescriptor'stype_paramsindex. Closes round-trip forPoll.Ready(Err(7)) → match → 2007deterministically across precompile cycles. Mirror of tasks #9 / #11 / #21 discipline: context-aware canonical resolution wins over passive bare-name race. The constrained- implement-block dispatch issue (closure body inimplement<T,E> Poll<Result<T,E>> { fn map_err(...) { ... } }not invoking the closure arg) is a DISTINCT defect tracked separately as task #25. -
Closure dispatch in constrained-implement-block bodies (task #25, distinct from #22). Methods defined inside
implement<T, E> Poll<Result<T, E>>(atcore/async/poll.vr:100onward — includingmap_ok/map_err/ready_ok/ready_err) are dispatched but the closure argument is bound incorrectly. Concretely:Poll.Ready(Err(7)).map_err(|e| e+1)returnsPoll.Ready(Err(7))rather thanPoll.Ready(Err(8))— the closure body is never invoked, producing a structural no-op. The dispatcher resolves the method (no panic, no argument-count mismatch) but either: (a) closure ends up in the wrong slot; (b) dispatch falls through to the genericimplement<T> Poll<T>block'smapand elides the inner transformation; or (c) the Ready(Err()) destructuring recurses back through Ready() without extracting the Err payload. The_preserves_ok/_preserves_pendingtest cases pass despite the bug, because their pinned outcomes (Ok / Pending arms) are preserved trivially by the no-op fallback. Worked around incore-tests/async/poll/{unit,property,integration}_test.vrvia directmatch q { Poll.Ready(Err(e)) => e, _ => ... }projection — the Poll/Result algebraic identity is pinned without crossing the broken dispatcher.
| File | Purpose |
|---|---|
poll.vr | Poll<T> — the three async states |
waker.vr | Waker, Context, RawWaker, RawWakerVTable |
future.vr | Future protocol + ReadyFuture, PendingFuture, Lazy, Join*, Select2, FutureExt |
task.vr | Task<T>, JoinHandle<T>, TaskId, JoinError, JoinSet<T>, YieldNow |
channel.vr | Channel<T>, Sender/Receiver, OneshotSender/OneshotReceiver, send/try errors |
broadcast.vr | BroadcastSender<T>, BroadcastReceiver<T>, broadcast_channel |
executor.vr | Runtime, RuntimeConfig, block_on, Timeout, LocalExecutor |
select.vr | Either<A,B>, select_either, race, select_all, join_all, try_first |
stream.vr | Stream, StreamExt, 30+ adapters, factories (iter, unfold, interval, from_fn) |
generator.vr | Generator<T>, AsyncGenerator<T> |
nursery.vr | Nursery, NurseryOptions, NurseryError, NurseryErrorBehavior, TaskHandle |
timer.vr | Sleep, SleepUntil, Interval, Delay, Timeout, Debounce, Throttle |
spawn_config.vr | SpawnConfig, RetryConfig, CircuitBreakerConfig, RecoveryStrategy, Priority |
spawn_with.vr | CircuitBreaker, CircuitState, execute_with_retry* |
parallel.vr | parallel_map, parallel_filter_map, parallel_for_each, parallel_reduce |
intrinsics.vr | runtime hooks: spawn_with_env, executor_spawn, future_poll_sync, async_sleep_* |
Poll<T> — the two-state algebra
Status: complete. Full public-API conformance under Tier 0 (interpreter) and Tier 2 (LLVM AOT,
--test-threads 1). 42 unit + 21 property + 14 integration + 7 regression-as-guardrail tests; every algebraic law on theFunctor-shape (identity, composition, Pending-as-absorbing- element), the Maybe-isomorphism round-trip, and theEq/Clone/Defaultprotocols are exhaustively pinned over a representativeIntpayload domain. Conformance suite: core-tests/async/poll.
The foundational poll-state algebra. Every async surface in the stdlib
(Future.poll, Stream.poll_next, AsyncIterator.poll_next, cancellation
checks, …) returns Poll<T> as its hot-path completion signal.
public type Poll<T> is
| Ready(T) // computation completed with value
| Pending; // not yet complete — the caller must re-poll
Predicates
p.is_ready() -> Bool // O(1), inlined
p.is_pending() -> Bool // O(1), inlined; is_ready ⊕ is_pending = true ∀ p
Functorial map
p.map<U, F: fn(T) -> U>(self, f: F) -> Poll<U>
Pending is the absorbing element: Pending.map(f) == Pending for any
f; the closure is never invoked. Ready(t).map(f) materialises
Ready(f(t)).
let p: Poll<Int> = Poll.Ready(3);
let q: Poll<Int> = p.map(|x| x * 2); // Poll.Ready(6)
let r: Poll<Text> = p.map(|x| f"{x}"); // Poll.Ready("3")
Extraction
p.unwrap(self) -> T // panics if Pending
p.unwrap_or(self, default: T) -> T // Pending → default
p.ready(self) -> Maybe<T> // Pending → None
p.ready() is the canonical bridge to Maybe<T>: Ready(t) ↔ Some(t)
and Pending ↔ None. The reverse direction is the From<Maybe<T>>
impl below; the two together form a structural isomorphism between
Poll<T> and Maybe<T> on the observable surface.
Conversions
From<Maybe<T>> for Poll<T> // Some(t) -> Ready(t), None -> Pending
Removed blanket impl. An earlier
From<T> for Poll<T>blanket impl was deleted: under any substitutionT = Maybe<U>it overlappedFrom<Maybe<T>>and the impl-coherence selector silently routedPoll.from(non_maybe_value)through the Maybe arm — collapsing the result toPoll.Pendingregardless of input. WritePoll.Ready(value)directly; it is the explicit, unambiguous form and produces byte-identical code.
Poll<Result<T, E>> — async-result composition
map_ok / map_err live on the Poll<Result<T, E>> shape so async
functions whose Output is Result<T, E> compose without
intermediate destructuring.
Poll.ready_ok(t: T) -> Poll<Result<T, E>> // shorthand for Ready(Ok(t))
Poll.ready_err(e: E) -> Poll<Result<T, E>> // shorthand for Ready(Err(e))
p.map_ok<U, F>(self, f: F) -> Poll<Result<U, E>> // F: fn(T) -> U
p.map_err<U, F>(self, f: F) -> Poll<Result<T, U>> // F: fn(E) -> U
Pending remains the absorbing element on both arms:
Pending.map_ok(f) == Pending and Pending.map_err(f) == Pending.
Ready(Err(e)).map_ok(f) == Ready(Err(e)) (no re-wrapping); symmetric
for map_err.
let p: Poll<Result<Int, Text>> = Poll.ready_ok(10);
let mid: Poll<Result<Int, Text>> = p.map_ok(|x| x + 1);
let after: Poll<Result<Int, Text>> = mid.map_ok(|x| x * 2); // Ready(Ok(22))
Protocol implementations
implement<T: Eq> Eq for Poll<T>; // structural on Ready/Pending
implement<T: Clone> Clone for Poll<T>;
implement<T> Default for Poll<T>; // = Pending
implement<T: Debug> Debug for Poll<T>; // "Ready(<t>)" | "Pending"
Algebraic laws
All pinned by core-tests/async/poll/property_test.vr:
| Law | Statement |
|---|---|
| Functor identity | p.map(|x| x) == p |
| Functor composition | p.map(g).map(f) == p.map(|x| f(g(x))) |
| Pending absorbs map | Pending.map(f) == Pending |
| Maybe round-trip | Poll.from(p.ready()) == p |
| Result arm preservation | Ready(Err(e)).map_ok(f) == Ready(Err(e)); symmetric for map_err |
| Eq reflexive / symmetric / transitive | standard equality laws on the two-variant carrier |
| Default invariance | Poll.default() = Pending for every payload type |
Pattern matching
Poll<T> is exhaustive on two arms; nest with Maybe or Result for
the common composite patterns:
fn classify(p: Poll<Result<Int, Text>>) -> Text {
match p {
Poll.Ready(Ok(_)) => "ready-ok",
Poll.Ready(Err(_)) => "ready-err",
Poll.Pending => "pending",
}
}
Future protocol
type Future is protocol {
type Output;
fn poll(&mut self, cx: &mut Context) -> Poll<Self.Output>;
}
type IntoFuture is protocol {
type Future: Future;
fn into_future(self) -> Self.Future;
}
type FutureExt is protocol extends Future {
fn map<U, F>(self, f: F) -> MapFuture<Self, F>
where F: fn(Self.Output) -> U;
fn and_then<U, F, Fut2>(self, f: F) -> AndThenFuture<Self, F, Fut2>
where F: fn(Self.Output) -> Fut2, Fut2: Future<Output = U>;
fn block(self) -> Self.Output; // block current thread
}
Factories
ready(value) -> ReadyFuture<T> // immediately completes
pending<T>() -> PendingFuture<T> // never completes
lazy(|| compute()) -> Lazy<F, T> // deferred closure
Combinators (also available on FutureExt)
join(fut1, fut2) -> Join2<Fut1, Fut2> // (O1, O2)
join3(fut1, fut2, fut3) -> Join3 // (O1, O2, O3)
join_all(futures) -> List<Output> // for List<F>
try_join(fut1, fut2) -> Result<(O1, O2), E> // fail-fast on Err
select(fut1, fut2) -> Select2 // first to complete wins
select_either(fut1, fut2) -> Either<A, B>
race(fut1, fut2) -> T // winner; loser cancelled
select_all(futures) -> SelectAllResult<T> // first + index
try_first(futures) -> SelectAllResult<Output> // first Ok
timeout(fut, duration) -> Result<T, TimeoutError>
Tasks
type TaskId is { id: UInt64 };
type Task<T> is { ... };
type JoinHandle<T> is { ... };
type JoinError is Cancelled | Panicked(PanicInfo);
spawn(future) -> JoinHandle<T> // shorthand
spawn_blocking(f) -> JoinHandle<T> // on thread pool
spawn_detached(future) -> () // fire-and-forget
yield_now() -> YieldNow // cooperate
h.abort() // cancel
h.is_finished() -> Bool
h.id() -> TaskId
h.await -> Result<T, JoinError> // via Future
JoinSet<T> — dynamic task collection
let mut set: JoinSet<Int> = JoinSet.new();
set.spawn(task_a());
set.spawn(task_b());
while let Maybe.Some(res) = set.join_next().await {
match res {
Result.Ok(value) => ...,
Result.Err(JoinError.Cancelled) => ...,
Result.Err(JoinError.Panicked(info)) => ...,
}
}
Channels
MPSC — Sender<T> / Receiver<T>
channel<T>() -> (Sender<T>, Receiver<T>) // unbounded
bounded<T>(capacity) -> (Sender<T>, Receiver<T>)
unbounded_channel<T>() // alias for channel()
bounded_channel<T>(cap) // alias for bounded()
// Sync API
tx.send(value) -> Result<(), SendError<T>> // blocks (futex) if bounded full
tx.try_send(value) -> Result<(), TrySendError<T>> // Full | Disconnected
tx.send_timeout(value, d) -> Result<(), SendError<T>> // with deadline
// Async API — awaitable with waker-based backpressure
tx.send_async(value) -> SendFut<T> // Future<Result<(), SendError<T>>>
tx.send_cancellable(value, &token) // Future; Err(Cancelled) on token fire
tx.closed() -> ChannelClosed<T> // Future<()>; completes on close
// Inspection
tx.is_closed() -> Bool // == is_disconnected
tx.is_disconnected() -> Bool
tx.capacity() -> Maybe<Int>
tx.len() -> Int
// Receiver
rx.recv() -> Maybe<T> // sync blocking
rx.recv_fut() -> RecvFut<T> // explicit async Future
rx.recv_cancellable(&token) // Future; Err(Cancelled) on token fire
rx.recv_many(&mut buf, max) -> Int // batch-drain in one lock
rx.try_recv() -> Result<T, TryRecvError> // Empty | Disconnected
// Receiver implements Stream<Item = T> — works with `for await msg in rx { ... }`
// and all StreamExt combinators (map, filter, take, etc.).
Async backpressure
For bounded channels, send_async(value).await is the idiomatic way to
apply backpressure:
- If the channel has slack — push and resolve
Ok(())immediately. - If full — register the caller's waker in
sender_wakers, returnPoll.Pending. When the receiver pops, the sender's waker fires and the future re-polls, now with space.
The blocking send() method uses the same notification path but via
futex, for non-async callers. Both paths share state; a bounded
channel is safe to use from a mix of async and blocking senders.
Cancellation integration
Both async variants accept a &CancellationToken parameter via
*_cancellable — on token fire the future resolves immediately
with Err(CancellationError) or Err(CancellableSendError.Cancelled(..)),
deregisters its waker, and returns control. Pattern:
select {
msg = rx.recv_cancellable(&shutdown).await => match msg {
Ok(Some(v)) => handle(v),
Ok(None) => return, // channel closed
Err(_) => return, // shutdown fired
},
_ = idle_timeout.await => return,
}
One-shot — oneshot<T>()
let (tx, rx) = oneshot<Result<Data, Error>>();
spawn async move { tx.send(compute()); };
let result = rx.await;
Broadcast (MPMC) — broadcast_channel
broadcast_channel<T>(capacity) -> (BroadcastSender<T>, BroadcastReceiver<T>)
broadcast_channel_with<T>(capacity, policy) -> (Sender, Receiver)
// Sender
tx.send(value) -> Result<Int, SendError<T>> // returns listener count
tx.clone() -> BroadcastSender<T> // multi-producer
tx.subscribe() -> BroadcastReceiver<T> // new receiver starting now
tx.receiver_count() -> Int
tx.sender_count() -> Int
tx.is_closed() -> Bool
tx.close()
// Receiver — implements Future AND Stream
rx.recv() -> BroadcastRecv<T> // awaitable future
rx.recv_cancellable(&token) -> Result<Result<T, RecvError>, CancellationError>
rx.try_recv() -> TryRecvResult<T> // Value/Empty/Closed/Lagged
rx.len() -> Int
rx.is_empty() -> Bool
Lag policies — LagPolicy
Controls behavior when a receiver falls behind the ring capacity:
| Variant | Semantics |
|---|---|
LagTolerant (default) | Return RecvError.Lagged(n); advance to oldest available message. |
DropOldest | Advance silently; never return Lagged. Senders never block. |
DropSlowReceiver | Unsubscribe the slow receiver entirely. For strict keep-up SLAs. |
Broadcast receivers observe every value sent after subscription; they do not see historic values.
BroadcastReceiver<T> implements both Future<Output = Result<T, RecvError>>
(direct .await) and Stream<Item = Result<T, RecvError>> (for-await loops
and combinators). Sender and receiver counts are maintained atomically;
last-sender-drop closes the channel and wakes all receivers.
Streams
type Stream is protocol {
type Item;
fn poll_next(&mut self, cx: &mut Context) -> Poll<Maybe<Self.Item>>;
}
type IntoStream is protocol { ... };
type StreamExt is protocol extends Stream { ... };
Factories
iter(iterable) -> Iter<I> // any IntoIterator
once(item) -> StreamOnce<T>
once_future(fut) -> StreamOnce<Output>
empty<T>() -> StreamEmpty
repeat(item) -> StreamRepeat<T> // infinite (T: Clone)
repeat_n(item, n) -> StreamRepeatN<T>
from_fn(|| produce_next()) -> StreamFromFn
poll_fn(|cx| ...) -> StreamFromFn
unfold(state, |s| (item, new_state)) -> StreamUnfold
interval(duration) -> Interval
Adapters (return new streams)
s.map(|x| f(x)) s.filter(|x| pred(x)) s.filter_map(|x| ...)
s.take(n) s.skip(n)
s.take_while(|x| pred) s.skip_while(|x| pred)
s.chain(other) s.zip(other) s.enumerate()
s.peekable() s.flatten() s.fuse()
s.throttle(rate) s.debounce(duration) s.chunks(n)
s.buffer_unordered(n) s.buffered(n)
s.timeout_each(duration)
Consumers (terminal)
s.next() -> Poll<Maybe<Item>>
s.try_next() -> Poll<Maybe<Result<T, E>>>
s.for_each(|x| side_effect(x))
s.fold(init, |acc, x| ...) -> B
s.reduce(|a, b| ...) -> Maybe<Item>
s.collect<C>() -> C
s.find(|x| pred) -> Maybe<Item>
s.any(|x| pred) / s.all(|x| pred)
s.count() / s.last() / s.nth(n)
s.position(|x| pred) -> Maybe<Int>
Example
async fn monitor(sensor: &Sensor) using [Logger] {
let mut s = interval(1.seconds())
.map(|_| sensor.read())
.filter(|r| r.is_ok())
.map(|r| r.unwrap())
.throttle_filter(|v| v.delta() > 0.01);
while let Maybe.Some(reading) = s.next().await {
Logger.info(&f"reading: {reading}");
}
}
Generators
fn* fibonacci() -> Int {
let (mut a, mut b) = (0, 1);
loop {
yield a;
(a, b) = (b, a + b);
}
}
for n in fibonacci().take(10) {
print(f"{n}");
}
fn* name(...) -> T— synchronous generator; returnsIterator<Item=T>.async fn* name(...) -> T— async generator; returnsAsyncIterator<Item=T>(aStream<T>).
Inside a generator:
yield value; // emit
// `return` (no value) ends the generator
Async generators support .await:
async fn* stream_events(url: &Text) -> Event using [Http] {
let mut body = Http.get_streaming(url).await?;
loop {
let chunk = body.next_chunk().await?;
for e in parse_chunk(chunk) { yield e; }
}
}
for await event in stream_events("wss://…") using [Http] {
handle(event);
}
Nursery — structured concurrency
public type NurseryOptions is {
timeout: Maybe<Duration>,
max_tasks: Maybe<Int>,
on_error: NurseryErrorBehavior, // field name matches the surface syntax
};
public type NurseryErrorBehavior is CancelAll | WaitAll | FailFast;
public type NurseryError is
| Single(Heap<Error>) // single-task failure
| Multiple(List<Heap<Error>>) // WaitAll collected multiple failures
| Timeout
| Cancelled
| Panic(PanicInfo)
| TaskLimitExceeded(Int);
// Builder
implement NurseryOptions {
public fn new() -> Self;
public fn default() -> Self; // alias for new
public fn with_timeout(self, timeout: Duration) -> Self;
public fn with_timeout_ms(self, ms: Int) -> Self;
public fn with_max_tasks(self, max: Int) -> Self;
public fn with_error_behavior(self, behavior: NurseryErrorBehavior) -> Self;
}
// Underlying async functions the nursery { ... } block lowers to:
public async fn with_nursery<T>(
options: NurseryOptions,
f: fn(&mut Nursery) -> T,
) -> Result<T, NurseryError>;
public async fn with_nursery_timeout<T>(
duration: Duration,
f: fn(&mut Nursery) -> T,
) -> Result<T, NurseryError>;
Usage
async fn fetch_batch(urls: &List<Text>) -> List<Bytes> using [Http] {
nursery(
timeout: 10.seconds(),
on_error: cancel_all,
max_tasks: 100,
) {
let handles: List<JoinHandle<Bytes>> = urls.iter()
.map(|u| spawn Http.get(u.clone()))
.collect();
try_join_all(handles).await?
} on_cancel {
metrics.increment("fetch_batch.cancelled");
} recover(e: NurseryError) {
log_error(&e);
List.new()
}
}
Guarantees
- Every spawned task completes, fails, or is cancelled before the nursery scope exits.
on_errorpolicies:cancel_all— one failure cancels all siblings.wait_all— collect all results including errors.fail_fast— return the first error immediately.
- Context stacks are inherited by spawned tasks.
Cancellation
Cooperative cancellation is implemented by core.async.cancellation.
A cancelled task continues running until it hits a cancel point — an
.await on a cancellation-aware future, an explicit throw_if_cancelled
check, or a CancelScope exit.
Core types
// Owner — only holder can cancel.
CancellationTokenSource.new()
.token() -> CancellationToken // observer handle (clone-cheap)
.cancel() // with default CancelReason.Cancelled
.cancel_with(reason)
.is_cancelled() -> Bool
.reason() -> Maybe<CancelReason>
.linked_to(&parent) // child source, propagates from parent
// Observer — read-only view.
token: CancellationToken
.is_cancelled() -> Bool
.reason() -> Maybe<CancelReason>
.throw_if_cancelled() -> Result<(), CancellationError>
.cancelled() -> CancelledFuture // Future<Output = CancelReason>
.register(fn()) -> Registration // sync callback; RAII-deregister
.child_source() -> CancellationTokenSource // propagation tree
.combine(&[t1, t2, ...]) -> CancellationToken // any-of aggregation (static fn)
.with_timeout(Duration) -> CancellationToken // auto-cancel (static fn)
.with_deadline(Instant) -> CancellationToken
.never() -> CancellationToken // sentinel; never fires
Structured reasons
type CancelReason is
| Cancelled
| Timeout { deadline: Instant }
| ParentCancelled
| Aborted(Text)
Children of a cancelled parent see ParentCancelled — not the parent's
own reason. This makes structured traceability explicit.
Awaitable integration
token.cancelled() returns a CancelledFuture that completes with the
token's CancelReason. Compose with select:
select {
r = work().await => Ok(r),
res = token.cancelled().await => Err(res),
}
The future deregisters its waker on drop; no stale wake-ups.
Sync callback bridge — Registration
let reg = token.register(|| close_file_handle(fd));
// ... do work ...
// Drop of `reg` deregisters BEFORE cancel fires. If cancel already
// happened, the callback fired synchronously inside `register()`.
Scoped cancellation — CancelScope
let scope = CancelScope.new();
let token = scope.token();
spawn worker(token.clone());
// ... work ...
// Dropping `scope` cancels `token` (unless `scope.dismiss()` was called).
Scope variants:
CancelScope.new() // auto-cancel on drop
CancelScope.linked_to(&parent_token) // child-source pattern
CancelScope.with_timeout(Duration) // auto-cancel + timeout
scope.dismiss() // opt-out of drop-cancel
scope.cancel() // explicit
Propagation rules (normative)
- Parent → child:
source.cancel()propagates to every token linked viachild_source()/linked_to(). Children fire withCancelReason.ParentCancelled. - Combine: tokens from
CancellationToken.combine(&inputs)fire when any input fires. - Idempotent: subsequent calls to
cancel_with()after a cancel are no-ops; the first call's reason wins. - Registered callbacks and wakers are drained under lock, then invoked without the lock held (no re-entrancy).
- Dropped registrations / futures deregister automatically.
Async signal subscription — core.signal
Async-aware wrapper around core.sys.signal — exposes OS signals as
awaitable futures and AsyncIterator streams for ergonomic composition
with select, nursery, and cancellation tokens.
// Wait for a single Ctrl-C
ctrl_c().await; // -> ()
// Wait for a single SIGTERM (K8s pod-eviction trigger)
terminate().await; // -> ()
// Wait for SIGHUP (reload-config convention)
hup().await; // -> ()
// Wait for any shutdown signal; returns which one fired
let sig: Signal = shutdown_signals().await; // Int | Term | Hup
// Arbitrary signal set as a Stream of arrivals
let mut stream = signal_stream(&[Signal.Usr1, Signal.Usr2]);
for await sig in stream {
handle(sig);
}
Idiomatic shutdown — race server work vs signal:
select {
_ = ctrl_c().await => drain_and_exit(),
_ = shutdown_signals().await => drain_and_exit(),
r = run_server().await => handle_result(r),
}
Architecture
Invoking any allocating or lock-taking operation from inside a POSIX
signal handler is undefined — the handler may preempt the mainline
thread mid-malloc, mid-mutex-unlock, etc. core.signal uses the
standard self-pipe / atomic-flag pattern:
- The OS-level signal handler (registered once per subscribed signal
via
core.sys.signal.on_signal) does only an async-signal-safe atomic store into aSignalFlag(set bit). - A single background poller task polls these flags every ~20 ms,
clears any set flags, and fans out to subscribers via a
BroadcastSender<Signal>. The poller runs in normal runtime context, so broadcasting and waker operations are safe.
Trade-off: up to ~20 ms signal-to-subscriber latency — acceptable for
shutdown, reload, and heartbeat use cases. A future upgrade to Linux
signalfd(2), kqueue EVFILT_SIGNAL, or Windows APC delivery will
collapse latency to zero behind the same public API.
Timers
sleep(duration) -> Sleep // await to suspend
sleep_ms(ms) sleep_secs(secs)
sleep_until(deadline: Instant) -> SleepUntil
delay(future) -> Delay<F> // delay future by a duration
timeout(future, duration) -> Timeout<F> // -> Result<T, TimeoutError>
debounce(future) -> Debounce<F> // suppress rapid calls
throttle(future) -> Throttle<F> // rate-limit
interval(duration) -> Interval // stream firing on schedule
let mut ticker = Interval.new(500.ms());
loop {
ticker.tick().await;
update_ui();
}
Runtime and executor
type RuntimeConfig is { ... };
type Runtime is { ... };
type LocalExecutor is { ... };
type TimeoutError is ();
type ExecutionEnv is { ... }; // θ+ context
Runtime.new() -> RuntimeBuilder
builder.worker_threads(n).stack_size(bytes)
.io_engine(IoEngineKind.IoUring)
.max_tasks(n)
.build() -> Runtime
rt.block_on(future) -> Output
rt.spawn(future) -> JoinHandle<T>
rt.shutdown() / rt.shutdown_timeout(duration)
rt.enter() // set current runtime for this thread
Global helpers
block_on(future) -> Output // uses default runtime
spawn(future) -> JoinHandle<T>
current_runtime() -> Maybe<&Runtime>
LocalExecutor
Single-threaded executor for !Send futures:
let exec = LocalExecutor.new();
exec.spawn_local(future);
exec.run_until(main_future);
Spawn configuration
type SpawnConfig is { ... }; // builder
type RecoveryStrategy is
| None
| Retry(RetryConfig)
| CircuitBreaker(CircuitBreakerConfig)
| Fallback(fn() -> T)
| Supervised;
type RestartPolicy is Permanent | Transient | Temporary;
type IsolationLevel is Shared | SendOnly | Full;
type Priority is Low | Normal | High | Critical;
let cfg = SpawnConfig.new()
.with_priority(Priority.High)
.with_isolation(IsolationLevel.Full)
.with_recovery(RecoveryStrategy.Retry(RetryConfig.exponential(3, 100.ms())))
.with_timeout_ms(5000)
.with_name("worker-42");
let handle = spawn_with(cfg, task());
Retry and circuit breaker
type RetryConfig is {
max_attempts: Int,
initial_backoff_ms: Int,
max_backoff_ms: Int,
backoff_factor: Float,
jitter: Bool,
};
RetryConfig.fixed(attempts, delay_ms)
RetryConfig.exponential(attempts, initial_ms)
execute_with_retry(|| call_api(), max_attempts = 3, backoff_ms = 100)
execute_with_retry_config(|| call_api(), config)
Circuit breaker
type CircuitBreakerConfig is {
failure_threshold: Int,
reset_timeout_ms: Int,
half_open_max_calls: Int,
};
type CircuitState is Closed | Open | HalfOpen;
let breaker = CircuitBreaker.new(CircuitBreakerConfig {
failure_threshold: 5,
reset_timeout_ms: 30_000,
half_open_max_calls: 1,
});
if breaker.is_call_allowed() {
match call_remote().await {
Result.Ok(v) => { breaker.record_success(); Result.Ok(v) }
Result.Err(e) => { breaker.record_failure(); Result.Err(e) }
}
} else {
Result.Err(Error.new("circuit open"))
}
Parallel helpers
Data-parallel patterns, implemented in a portable way that the work- stealing runtime can pick up.
parallel_map(items, worker_count, |x| f(x)) -> List<U>
parallel_filter_map(items, worker_count, |x| maybe_transform(x)) -> List<U>
parallel_for_each(items, worker_count, |x| side_effect(x))
parallel_reduce(items, worker_count, |a, b| combine(a, b)) -> Maybe<T>
worker_count = 0 means "default to num_cpus()".
Low-level intrinsics (intrinsics.vr)
type Executor is { ... }; // opaque handle
Executor.current() -> Maybe<Executor>
Executor.in_async_context() -> Bool
spawn_with_env(future) -> JoinHandle<T>
executor_spawn(&exec, future) -> JoinHandle<T>
executor_block_on(future) -> Output
future_poll_sync(&mut future) -> Maybe<Output> // single poll, sync
async_sleep_ms(ms) / async_sleep_ns(ns)
User code rarely touches these; they exist for runtime authors.
Waker and Context
type Waker is { ... };
type Context<'a> is { waker: &'a Waker };
type RawWaker is { ... };
type RawWakerVTable is { ... };
noop_waker() -> Waker
Context.from_waker(&waker) -> Context<'a>
waker.wake() // consume, enqueue task
waker.wake_by_ref() // enqueue without consuming
waker.clone()
Context inheritance across .await and spawn
.awaitpreserves the current context stack verbatim.spawnsnapshots the parent's context stack at spawn time.nursery { spawn ... }— tasks inherit the nursery's contexts.- Channels do not propagate contexts (they're pure data pipes).
See Language → context system for the rules.
semaphore — cooperative task limiter
mount core.async.semaphore.{AsyncSemaphore, SemaphorePermit};
let sem = AsyncSemaphore.new(10); // cap at 10 concurrent ops
for url in urls {
let permit = sem.acquire().await?;
spawn(async move {
let _p = permit; // held for task lifetime (RAII)
fetch(&url).await;
});
}
Async-task counting semaphore — waiters park via Future /
Waker instead of blocking an OS thread (unlike
core.sync.Semaphore which futex-blocks). FIFO waiter fairness;
try_acquire() non-blocking fast path; add_permits(n)
runtime resize; close() causes pending + future acquire
calls to fail with SemaphoreError.Closed.
Typical deployments:
- bounded outbound fan-out (N concurrent HTTP fetches)
- DB connection-pool checkout
- rate-limit async CPU-bound tasks (N parallel inferences)
- producer/consumer backpressure without a channel
backoff — retry delay policies
mount core.async.backoff.{Backoff, BackoffStrategy};
let mut bo = Backoff.exponential_full_jitter(
Duration.from_millis(100),
Duration.from_secs(30),
).with_max_attempts(5);
loop {
match try_operation() {
Ok(r) => return r,
Err(_) => match bo.next_delay() {
Some(d) => async_sleep(d).await,
None => return Err(MaxAttemptsReached),
},
}
}
Four industry-standard strategies:
| Strategy | Formula | Notes |
|---|---|---|
ExponentialNoJitter | base × 2^n | deterministic |
ExponentialFullJitter | rand(0, base × 2^n) | AWS default |
ExponentialDecorrelated | rand(base, prev × 3) | AWS whitepaper; best for large fleets |
FibonacciFullJitter | base × F(n+1) jittered | gentler ramp |
Overflow-guarded integer arithmetic over microseconds. Once
base × 2^attempt would overflow UInt64, the raw value
saturates at cap_us — pathological max_attempts values
plateau at the configured ceiling instead of wrapping.
See also
- sync — atomics, mutexes, condvars used by async code.
- runtime —
ExecutionEnv, supervision. - time —
Duration,Instant, time intrinsics. - Language → async & concurrency — surface syntax (
async fn,.await,spawn,nursery,select). - Architecture → runtime tiers — executor internals.