Skip to content

compute: scaffold CollectionEdge for columnar dataflow edges#36507

Draft
antiguru wants to merge 5 commits into
MaterializeInc:mainfrom
antiguru:columnar-consumer-first
Draft

compute: scaffold CollectionEdge for columnar dataflow edges#36507
antiguru wants to merge 5 commits into
MaterializeInc:mainfrom
antiguru:columnar-consumer-first

Conversation

@antiguru
Copy link
Copy Markdown
Member

Motivation

Today's row-based dataflow edges in compute force per-record consumer handling.
Switching producers to columnar first imposes an unpack tax on every consumer that is not yet columnar-aware; this PR begins a consumer-first migration that avoids the tax by making every consumer ready to absorb columnar batches before any producer emits them.

Description

Adds CollectionEdge<'scope, T> with Vec and Columnar arms in src/compute/src/render/columnar.rs, plus a columnar_negate stub.
Scaffolding only: no producer emits the columnar variant, CollectionBundle is unchanged, and columnar arms carry todo!() bodies that land alongside the producer switch.
The design rule encoded in the API is that a columnar-to-row decode at a consumer's input is only acceptable when the consumer would have decoded Row to Datum anyway; passthrough consumers must round-trip columnar without decoding.

Introduce `CollectionEdge<'scope, T>`, an enum that lets dataflow
edges between Plan nodes carry either a row-based `VecCollection` or
a columnar `Stream<Column<(Row, T, Diff)>>`.
This is the foundation for a consumer-first migration to columnar
edges: every Plan-node consumer learns to accept either variant
before any producer emits the columnar variant, after which a single
switch flips producers end-to-end.
This commit is scaffolding only.
No producer emits the columnar variant and `CollectionBundle` is
unchanged; the columnar arms of `enter_region`, `negate`, and the
`columnar_negate` primitive carry `todo!()` bodies that will be
filled in alongside the producer switch.
The design rule baked into the API is that a columnar-to-row decode
at a consumer's input is only acceptable when the consumer would have
decoded `Row` to `Datum` anyway; pure passthrough consumers must
round-trip columnar without decoding.
There is no user-visible change and no behavioural change in this PR.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@antiguru antiguru requested a review from a team as a code owner May 11, 2026 13:11
@antiguru antiguru marked this pull request as draft May 11, 2026 13:12
antiguru and others added 4 commits May 11, 2026 15:17
Replace the `ColumnarBatch<T>` type alias with a struct
`ColumnarCollection<'scope, T, D, R>` parameterised the same way as
differential's `VecCollection<'scope, T, D, R>`, holding a
`Stream<'scope, T, Column<(D, T, R)>>` instead of a
`Stream<'scope, T, Vec<(D, T, R)>>`.
The `CollectionEdge` variants now read symmetrically:
`Vec(VecCollection<'scope, T, Row, Diff>)` and
`Columnar(ColumnarCollection<'scope, T, Row, Diff>)`.
`columnar_negate` is generalised to operate on any
`ColumnarCollection<'scope, T, D, R>`.

Also drop a doc link to a not-yet-defined `concat` method.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replace the oks side of `CollectionBundle.collection` from
`VecCollection<'scope, T, Row, Diff>` to `CollectionEdge<'scope, T>`.
The bundle now uniformly carries a `CollectionEdge` and exposes a new
`from_edge` constructor; `from_collections` is preserved as a thin
wrapper that builds a `CollectionEdge::Vec` arm.
This is the bundle-shape commit, intentionally mechanical: every
producer site still emits the `Vec` arm and every consumer either
keeps using `as_specific_collection` (which extracts the Vec arm
internally) or unwraps explicitly via the new transitional
`CollectionEdge::expect_vec` / `expect_vec_mut` helpers.
The fence sites that needed `expect_vec` are
`as_specific_collection`, the unkeyed `flat_map` branch, the three
take / store points in `ensure_collections`, the two LetRec bindings
in `render.rs`, the `log_operator_hydration` path, and the raw
collection branch in `sinks.rs`.
Each fence is a future Phase-B PR target.
Convert `Negate` to use `CollectionEdge::negate` natively rather than
going through `as_specific_collection`: it now consumes the edge,
flips diffs preserving the variant, and rewraps via `from_edge`.
This is the first consumer to handle the edge enum directly.
No behavioural change.
The `Columnar` arm of `CollectionEdge::negate` is still wired to
`columnar_negate`, which carries a `todo!()` body that will land
alongside the producer switch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add `CollectionEdge::concat_many` and `CollectionEdge::consolidate_named`,
both preserving variant.
The columnar arms are `todo!()` until producers emit the columnar
variant.

Convert `Union` in `render.rs` to operate on `CollectionEdge` natively:
pull each input's `(edge, errs)` directly out of the bundle, concatenate
edges via `concat_many`, optionally consolidate via
`consolidate_named`, and rewrap via `from_edge`.
The error stream still goes through differential's `concatenate` since
errors stay on the row-formatted side.

Union is the second consumer to handle the edge enum directly, after
Negate.
No behavioural change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`flat_map_datums(max_demand, logic)` is the canonical unified entry
point for "decoding consumers" — operators that read `Datum`s from
each row anyway (MFP, FlatMap, joins, TopK, sinks).
The closure receives a borrowed `DatumVecBorrow`, identical to the
existing `CollectionBundle::flat_map` else-branch contract.

The Vec arm reuses `DatumVec::borrow_with_limit` over each `Row` from
the timely stream.
The Columnar arm is `todo!()`; it will iterate
`Column<(Row, T, Diff)>::borrow()` via the `Rows<_>::Index` impl
(yielding `&RowRef`) and call `DatumVec::borrow_with_limit` per row,
all without materialising an owned `Row`.

Route `CollectionBundle::flat_map`'s unkeyed branch through the new
method so the per-record decode lives in exactly one place.
This deletes one of the transitional `expect_vec` fences.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant