Skip to content

compute: dual-output sessions for flat_map#36526

Open
antiguru wants to merge 8 commits into
MaterializeInc:mainfrom
antiguru:flat-map-ok-err-sessions
Open

compute: dual-output sessions for flat_map#36526
antiguru wants to merge 8 commits into
MaterializeInc:mainfrom
antiguru:flat-map-ok-err-sessions

Conversation

@antiguru
Copy link
Copy Markdown
Member

@antiguru antiguru commented May 12, 2026

Motivation

The arrangement flat_map path threaded results through a single stream of Result tuples and re-split them with a downstream map_fallible("OkErr", ...).
The demux is a per-callsite operator we can drop.
The session-based shape is also a prerequisite for moving to columnar dataflow edges: the closure has to write references into the output builder, and iterators can't return references with the right lifetimes.

Description

  • PendingWork retains a capability per output; do_work writes to ok+err sessions and returns work directly.
  • flat_map_core_fallible builds a two-output operator and returns (ok_stream, err_stream).
    Fallible callers (as_collection_core, the reduce key/val selector) drop their OkErr demux.
  • flat_map_core_ok is a single-output sibling for callers that statically never emit errors.
    as_specific_collection uses it so imported arrangements don't pay for an empty err output and concat.
  • The ok output container builder is generic at each call site rather than hardcoded.
    as_specific_collection opts into CapacityContainerBuilder (1:1 cursor walk, no duplicates); as_collection_core and the reduce key/val selector keep ConsolidatingContainerBuilder (MFP and key/val fan-out can collide).
    This also unblocks plugging in a columnar ok container without changing the API.
  • Cursor walking is factored into walk_cursor, shared between the two do_work impls.

Fuel

The closure returns the number of records produced, not input tuples consumed.
This is intentional and asymmetric: a selective filter(false) MFP runs the batch to completion in one activation so the operator can drop its Batch clone and release upstream memory; a wide map(...) MFP is bounded by emit count so a single activation can't flood downstream.
REFUEL = 1_000_000 is a pragmatic compromise — no universal value exists across MFP shapes.

antiguru added 7 commits May 12, 2026 15:21
Propagate the dual-output `do_work` callback shape up through the
arrangement flat_map stack so MFP evaluation can write `Ok` and `Err`
results to separate output sessions, eliminating the downstream
`map_fallible("OkErr", ...)` and `flat_map_fallible("OkErrDemux", ...)`
demux operators in `as_collection_core` and the reduce key/value
selector path.

* `PendingWork` now retains a capability per output.
* `flat_map_core` builds a two-output operator via `OperatorBuilder`
  instead of `Stream::unary`, and returns `(ok_stream, err_stream)`.
* `ArrangementFlavor::flat_map` and `CollectionBundle::flat_map` take
  closures of the form
  `(datums, t, diff, &mut ok_session, &mut err_session) -> usize`,
  concat any MFP-derived errors with the arrangement's err collection,
  and return `(ok_stream, err_collection)`.
* `as_collection_core` pushes `Ok`/`Err` directly into the two sessions
  and no longer reconstructs and re-splits a tagged stream.
* `reduce.rs` updates its key/value selector to the new shape and drops
  its `OkErrDemux` operator.
Introduce `CollectionBundle::flat_map_core_ok` and
`ArrangementFlavor::flat_map_ok`, single-output siblings of the fallible
versions for callers that statically never emit `DataflowErrorSer`
records. Switch `as_specific_collection`'s fueled path to use it so we
no longer build a second output port, retain a per-batch err
capability, or wire an empty err stream into the downstream concat.

Cursor walking is factored into a free `walk_cursor` helper so the
fallible and Ok-only `do_work` methods share the same loop.
Visually flag fallible vs Ok-only paths at call sites so future readers
notice which one emits errors. Pure rename, no behavior change.
State explicitly that the closure returns records produced (not input
tuples consumed), and note the run-to-completion behavior when emit
returns 0 for every input.
Add a Fuel section to the public flat_map doc explaining that the
output-produced metric serves two pressures: draining inputs so the
operator releases its Batch clone (a filter(false) walks to end-of-batch
in one activation), and throttling outputs so a map producing wide rows
can't flood the next operator. The refuel constant is documented as a
pragmatic compromise; no universal value exists across MFP shapes.

Trim the inner helpers' docs to reference the public one.
Let the caller of flat_map/flat_map_ok/as_collection_core/reduce select
the ok-side container builder. Each call site now states explicitly
whether it wants consolidation (`ConsolidatingContainerBuilder`) or
plain accumulation (`CapacityContainerBuilder`); the operator no longer
hardcodes a choice deep in the stack.

* `Session<'a, 'b, T, CB>` is generic over the builder rather than
  fixed to consolidating.
* `flat_map_core_fallible`, `flat_map_core_ok`, `flat_map`, `flat_map_ok`,
  and `CollectionBundle::flat_map` carry an `OkCB: ContainerBuilder +
  PushInto<(D, T, Diff)>` generic and return
  `Stream<'scope, T, OkCB::Container>`. The err side is fixed to
  `CapacityContainerBuilder<Vec<(DataflowErrorSer, T, Diff)>>` via an
  `ErrCB<T>` alias; this restores the pre-refactor behavior where the
  removed `map_fallible("OkErr", ...)` demux used `CapacityCB`.
* Call-site choices:
  * `as_specific_collection` -> Capacity (cursor walk is 1:1).
  * `as_collection_core` -> Consolidating (MFP fan-out can collide).
  * `reduce` key/val selector -> Consolidating (multiple inputs can
    map to the same `(key, val)` at the same time).

This also opens the door for a columnar ok container in the future:
callers can plug in a builder whose `Container` is not `Vec<...>`.
Lift the repeated `refuel = 1000000` into a module-level
`const REFUEL: usize = 1_000_000`. Adds a sentence to `walk_cursor`'s
doc clarifying that the fuel check is the only bound on both inner-val
and outer-key iteration, so a `filter(false)` runs the whole batch in
one activation.
@antiguru antiguru force-pushed the flat-map-ok-err-sessions branch from f7f074c to c91ca7e Compare May 12, 2026 13:22
@antiguru antiguru marked this pull request as ready for review May 12, 2026 13:25
@antiguru antiguru requested a review from a team as a code owner May 12, 2026 13:25
Matches the convention used in src/timely-util/src/operator.rs (DCB for
the data side, ECB for the error side). The err type alias becomes ECB
even though it's currently fixed to a CapacityContainerBuilder; the
shared Session<'_, '_, T, CB> alias keeps the abstract name CB since it
is reusable for both sides.

Also shortens the turbofish call sites enough to fit under rustfmt's
100-char line limit.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant