compute: dual-output sessions for flat_map#36526
Open
antiguru wants to merge 8 commits into
Open
Conversation
Propagate the dual-output `do_work` callback shape up through the
arrangement flat_map stack so MFP evaluation can write `Ok` and `Err`
results to separate output sessions, eliminating the downstream
`map_fallible("OkErr", ...)` and `flat_map_fallible("OkErrDemux", ...)`
demux operators in `as_collection_core` and the reduce key/value
selector path.
* `PendingWork` now retains a capability per output.
* `flat_map_core` builds a two-output operator via `OperatorBuilder`
instead of `Stream::unary`, and returns `(ok_stream, err_stream)`.
* `ArrangementFlavor::flat_map` and `CollectionBundle::flat_map` take
closures of the form
`(datums, t, diff, &mut ok_session, &mut err_session) -> usize`,
concat any MFP-derived errors with the arrangement's err collection,
and return `(ok_stream, err_collection)`.
* `as_collection_core` pushes `Ok`/`Err` directly into the two sessions
and no longer reconstructs and re-splits a tagged stream.
* `reduce.rs` updates its key/value selector to the new shape and drops
its `OkErrDemux` operator.
Introduce `CollectionBundle::flat_map_core_ok` and `ArrangementFlavor::flat_map_ok`, single-output siblings of the fallible versions for callers that statically never emit `DataflowErrorSer` records. Switch `as_specific_collection`'s fueled path to use it so we no longer build a second output port, retain a per-batch err capability, or wire an empty err stream into the downstream concat. Cursor walking is factored into a free `walk_cursor` helper so the fallible and Ok-only `do_work` methods share the same loop.
Visually flag fallible vs Ok-only paths at call sites so future readers notice which one emits errors. Pure rename, no behavior change.
State explicitly that the closure returns records produced (not input tuples consumed), and note the run-to-completion behavior when emit returns 0 for every input.
Add a Fuel section to the public flat_map doc explaining that the output-produced metric serves two pressures: draining inputs so the operator releases its Batch clone (a filter(false) walks to end-of-batch in one activation), and throttling outputs so a map producing wide rows can't flood the next operator. The refuel constant is documented as a pragmatic compromise; no universal value exists across MFP shapes. Trim the inner helpers' docs to reference the public one.
Let the caller of flat_map/flat_map_ok/as_collection_core/reduce select
the ok-side container builder. Each call site now states explicitly
whether it wants consolidation (`ConsolidatingContainerBuilder`) or
plain accumulation (`CapacityContainerBuilder`); the operator no longer
hardcodes a choice deep in the stack.
* `Session<'a, 'b, T, CB>` is generic over the builder rather than
fixed to consolidating.
* `flat_map_core_fallible`, `flat_map_core_ok`, `flat_map`, `flat_map_ok`,
and `CollectionBundle::flat_map` carry an `OkCB: ContainerBuilder +
PushInto<(D, T, Diff)>` generic and return
`Stream<'scope, T, OkCB::Container>`. The err side is fixed to
`CapacityContainerBuilder<Vec<(DataflowErrorSer, T, Diff)>>` via an
`ErrCB<T>` alias; this restores the pre-refactor behavior where the
removed `map_fallible("OkErr", ...)` demux used `CapacityCB`.
* Call-site choices:
* `as_specific_collection` -> Capacity (cursor walk is 1:1).
* `as_collection_core` -> Consolidating (MFP fan-out can collide).
* `reduce` key/val selector -> Consolidating (multiple inputs can
map to the same `(key, val)` at the same time).
This also opens the door for a columnar ok container in the future:
callers can plug in a builder whose `Container` is not `Vec<...>`.
Lift the repeated `refuel = 1000000` into a module-level `const REFUEL: usize = 1_000_000`. Adds a sentence to `walk_cursor`'s doc clarifying that the fuel check is the only bound on both inner-val and outer-key iteration, so a `filter(false)` runs the whole batch in one activation.
f7f074c to
c91ca7e
Compare
Matches the convention used in src/timely-util/src/operator.rs (DCB for the data side, ECB for the error side). The err type alias becomes ECB even though it's currently fixed to a CapacityContainerBuilder; the shared Session<'_, '_, T, CB> alias keeps the abstract name CB since it is reusable for both sides. Also shortens the turbofish call sites enough to fit under rustfmt's 100-char line limit.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
The arrangement flat_map path threaded results through a single stream of
Resulttuples and re-split them with a downstreammap_fallible("OkErr", ...).The demux is a per-callsite operator we can drop.
The session-based shape is also a prerequisite for moving to columnar dataflow edges: the closure has to write references into the output builder, and iterators can't return references with the right lifetimes.
Description
PendingWorkretains a capability per output;do_workwrites to ok+err sessions and returns work directly.flat_map_core_falliblebuilds a two-output operator and returns(ok_stream, err_stream).Fallible callers (
as_collection_core, the reduce key/val selector) drop theirOkErrdemux.flat_map_core_okis a single-output sibling for callers that statically never emit errors.as_specific_collectionuses it so imported arrangements don't pay for an empty err output and concat.as_specific_collectionopts intoCapacityContainerBuilder(1:1 cursor walk, no duplicates);as_collection_coreand the reduce key/val selector keepConsolidatingContainerBuilder(MFP and key/val fan-out can collide).This also unblocks plugging in a columnar ok container without changing the API.
walk_cursor, shared between the twodo_workimpls.Fuel
The closure returns the number of records produced, not input tuples consumed.
This is intentional and asymmetric: a selective
filter(false)MFP runs the batch to completion in one activation so the operator can drop itsBatchclone and release upstream memory; a widemap(...)MFP is bounded by emit count so a single activation can't flood downstream.REFUEL = 1_000_000is a pragmatic compromise — no universal value exists across MFP shapes.