From ba32acea607064e31d1163bebd4ae3d8f74581f6 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Mon, 11 May 2026 14:34:35 +0200 Subject: [PATCH 1/5] compute: scaffold CollectionEdge for columnar dataflow edges Introduce `CollectionEdge<'scope, T>`, an enum that lets dataflow edges between Plan nodes carry either a row-based `VecCollection` or a columnar `Stream>`. This is the foundation for a consumer-first migration to columnar edges: every Plan-node consumer learns to accept either variant before any producer emits the columnar variant, after which a single switch flips producers end-to-end. This commit is scaffolding only. No producer emits the columnar variant and `CollectionBundle` is unchanged; the columnar arms of `enter_region`, `negate`, and the `columnar_negate` primitive carry `todo!()` bodies that will be filled in alongside the producer switch. The design rule baked into the API is that a columnar-to-row decode at a consumer's input is only acceptable when the consumer would have decoded `Row` to `Datum` anyway; pure passthrough consumers must round-trip columnar without decoding. There is no user-visible change and no behavioural change in this PR. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/compute/src/render.rs | 1 + src/compute/src/render/columnar.rs | 106 +++++++++++++++++++++++++++++ 2 files changed, 107 insertions(+) create mode 100644 src/compute/src/render/columnar.rs diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index 35f96297de254..a7d1a4eb3dba5 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -163,6 +163,7 @@ use crate::render::errors::DataflowErrorSer; use crate::row_spine::{DatumSeq, RowRowBatcher, RowRowBuilder}; use crate::typedefs::{ErrBatcher, ErrBuilder, ErrSpine, KeyBatcher, MzTimestamp}; +mod columnar; pub mod context; pub(crate) mod errors; mod flat_map; diff --git a/src/compute/src/render/columnar.rs b/src/compute/src/render/columnar.rs new file mode 100644 index 0000000000000..ac6d1f571a6f6 --- /dev/null +++ b/src/compute/src/render/columnar.rs @@ -0,0 +1,106 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! Columnar dataflow edge support. +//! +//! Defines [`CollectionEdge`], a wrapper that lets dataflow edges between Plan +//! nodes carry either row-based ([`VecCollection`]) or columnar +//! ([`mz_timely_util::columnar::Column`]) batches of `(Row, T, Diff)` updates. +//! +//! # Migration model +//! +//! The migration is consumer-first: every Plan-node consumer learns to accept +//! both variants before any producer emits the columnar variant. Once all +//! consumers are ready, a single switch flips producers to columnar end-to-end. +//! +//! Within a Plan node, operators may freely materialize Vec collections; only +//! the inter-node edge format is constrained. A decode from columnar to Vec at +//! a consumer's input is acceptable only when the consumer would have decoded +//! `Row` to [`mz_repr::Datum`] anyway. Pure passthrough consumers (Negate, +//! Union) must round-trip the columnar variant without decoding. +//! +//! See `.claude/plans/columnar_consumer_first.md` for the staged plan. + +#![allow(dead_code)] +// Phase A defines the type with `todo!()` arms that will be filled in when +// producers begin emitting columnar batches. +#![allow(clippy::todo)] + +use differential_dataflow::VecCollection; +use mz_repr::{Diff, Row}; +use mz_timely_util::columnar::Column; +use timely::dataflow::{Scope, Stream}; + +use crate::render::RenderTimestamp; + +/// Container for a columnar batch of `(Row, T, Diff)` updates traveling on a +/// compute dataflow edge. +pub type ColumnarBatch = Column<(Row, T, Diff)>; + +/// A dataflow edge carrying records as either a row-based [`VecCollection`] or +/// a columnar [`Stream`] of [`ColumnarBatch`]es. +/// +/// Producers choose a variant; consumers must accept either. Mixing variants +/// across a `concat` is rejected during the transition: see +/// [`CollectionEdge::concat`]. +pub enum CollectionEdge<'scope, T: RenderTimestamp> { + /// Row-formatted collection. Today's default for every producer. + Vec(VecCollection<'scope, T, Row, Diff>), + /// Columnar batch stream. Currently unused by any producer; reserved for + /// the producer flip at the end of the migration. + Columnar(Stream<'scope, T, ColumnarBatch>), +} + +impl<'scope, T: RenderTimestamp> CollectionEdge<'scope, T> { + /// The scope containing this edge. + pub fn scope(&self) -> Scope<'scope, T> { + match self { + CollectionEdge::Vec(c) => c.inner.scope(), + CollectionEdge::Columnar(s) => s.scope(), + } + } + + /// Brings the edge into a sub-region of its current scope. + pub fn enter_region<'inner>(self, region: Scope<'inner, T>) -> CollectionEdge<'inner, T> { + match self { + CollectionEdge::Vec(c) => CollectionEdge::Vec(c.enter_region(region)), + CollectionEdge::Columnar(_) => { + // Sub-region entry for columnar batches will be wired when the + // first producer emits this variant. + todo!("CollectionEdge::Columnar::enter_region") + } + } + } + + /// Negates the diff on every record in this edge. + /// + /// Preserves variant. The columnar arm uses [`columnar_negate`], which + /// flips the diff column without decoding the data column. + pub fn negate(self) -> Self { + match self { + CollectionEdge::Vec(c) => CollectionEdge::Vec(c.negate()), + CollectionEdge::Columnar(s) => CollectionEdge::Columnar(columnar_negate(s)), + } + } +} + +/// Negates the diff column of every batch in a columnar stream without +/// decoding the data column. +pub fn columnar_negate<'scope, T>( + stream: Stream<'scope, T, ColumnarBatch>, +) -> Stream<'scope, T, ColumnarBatch> +where + T: RenderTimestamp, +{ + // Implementation is deferred until the first producer emits the + // [`CollectionEdge::Columnar`] variant. The signature is fixed here so + // consumers can target it during the consumer-first phase. + let _ = stream; + todo!("columnar_negate: flip diff column in place over Column<(Row, T, Diff)>") +} From 359b9a60ff5be9ee289e671bf6e0787c0cf283cd Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Mon, 11 May 2026 15:17:44 +0200 Subject: [PATCH 2/5] compute: shape ColumnarCollection to mirror VecCollection Replace the `ColumnarBatch` type alias with a struct `ColumnarCollection<'scope, T, D, R>` parameterised the same way as differential's `VecCollection<'scope, T, D, R>`, holding a `Stream<'scope, T, Column<(D, T, R)>>` instead of a `Stream<'scope, T, Vec<(D, T, R)>>`. The `CollectionEdge` variants now read symmetrically: `Vec(VecCollection<'scope, T, Row, Diff>)` and `Columnar(ColumnarCollection<'scope, T, Row, Diff>)`. `columnar_negate` is generalised to operate on any `ColumnarCollection<'scope, T, D, R>`. Also drop a doc link to a not-yet-defined `concat` method. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/compute/src/render/columnar.rs | 55 ++++++++++++++++++------------ 1 file changed, 34 insertions(+), 21 deletions(-) diff --git a/src/compute/src/render/columnar.rs b/src/compute/src/render/columnar.rs index ac6d1f571a6f6..4b76e33d5a2a0 100644 --- a/src/compute/src/render/columnar.rs +++ b/src/compute/src/render/columnar.rs @@ -11,7 +11,7 @@ //! //! Defines [`CollectionEdge`], a wrapper that lets dataflow edges between Plan //! nodes carry either row-based ([`VecCollection`]) or columnar -//! ([`mz_timely_util::columnar::Column`]) batches of `(Row, T, Diff)` updates. +//! ([`ColumnarCollection`]) batches of `(D, T, R)` updates. //! //! # Migration model //! @@ -32,29 +32,41 @@ // producers begin emitting columnar batches. #![allow(clippy::todo)] +use columnar::Columnar; use differential_dataflow::VecCollection; use mz_repr::{Diff, Row}; use mz_timely_util::columnar::Column; use timely::dataflow::{Scope, Stream}; +use timely::progress::Timestamp; use crate::render::RenderTimestamp; -/// Container for a columnar batch of `(Row, T, Diff)` updates traveling on a -/// compute dataflow edge. -pub type ColumnarBatch = Column<(Row, T, Diff)>; +/// A columnar collection of `(D, T, R)` updates traveling on a compute +/// dataflow edge. +/// +/// Mirrors differential's [`VecCollection<'scope, T, D, R>`] shape and +/// parameters; the underlying container is [`Column<(D, T, R)>`] instead of +/// `Vec<(D, T, R)>`. +pub struct ColumnarCollection<'scope, T, D, R> +where + T: Timestamp, + (D, T, R): Columnar, +{ + /// The underlying timely stream of columnar batches. + pub inner: Stream<'scope, T, Column<(D, T, R)>>, +} /// A dataflow edge carrying records as either a row-based [`VecCollection`] or -/// a columnar [`Stream`] of [`ColumnarBatch`]es. +/// a [`ColumnarCollection`]. /// /// Producers choose a variant; consumers must accept either. Mixing variants -/// across a `concat` is rejected during the transition: see -/// [`CollectionEdge::concat`]. +/// across a `concat` is rejected during the transition. pub enum CollectionEdge<'scope, T: RenderTimestamp> { /// Row-formatted collection. Today's default for every producer. Vec(VecCollection<'scope, T, Row, Diff>), - /// Columnar batch stream. Currently unused by any producer; reserved for - /// the producer flip at the end of the migration. - Columnar(Stream<'scope, T, ColumnarBatch>), + /// Columnar collection. Currently unused by any producer; reserved for the + /// producer flip at the end of the migration. + Columnar(ColumnarCollection<'scope, T, Row, Diff>), } impl<'scope, T: RenderTimestamp> CollectionEdge<'scope, T> { @@ -62,7 +74,7 @@ impl<'scope, T: RenderTimestamp> CollectionEdge<'scope, T> { pub fn scope(&self) -> Scope<'scope, T> { match self { CollectionEdge::Vec(c) => c.inner.scope(), - CollectionEdge::Columnar(s) => s.scope(), + CollectionEdge::Columnar(c) => c.inner.scope(), } } @@ -71,8 +83,8 @@ impl<'scope, T: RenderTimestamp> CollectionEdge<'scope, T> { match self { CollectionEdge::Vec(c) => CollectionEdge::Vec(c.enter_region(region)), CollectionEdge::Columnar(_) => { - // Sub-region entry for columnar batches will be wired when the - // first producer emits this variant. + // Sub-region entry for columnar collections will be wired when + // the first producer emits this variant. todo!("CollectionEdge::Columnar::enter_region") } } @@ -85,22 +97,23 @@ impl<'scope, T: RenderTimestamp> CollectionEdge<'scope, T> { pub fn negate(self) -> Self { match self { CollectionEdge::Vec(c) => CollectionEdge::Vec(c.negate()), - CollectionEdge::Columnar(s) => CollectionEdge::Columnar(columnar_negate(s)), + CollectionEdge::Columnar(c) => CollectionEdge::Columnar(columnar_negate(c)), } } } -/// Negates the diff column of every batch in a columnar stream without +/// Negates the diff column of every batch in a [`ColumnarCollection`] without /// decoding the data column. -pub fn columnar_negate<'scope, T>( - stream: Stream<'scope, T, ColumnarBatch>, -) -> Stream<'scope, T, ColumnarBatch> +pub fn columnar_negate<'scope, T, D, R>( + collection: ColumnarCollection<'scope, T, D, R>, +) -> ColumnarCollection<'scope, T, D, R> where - T: RenderTimestamp, + T: Timestamp, + (D, T, R): Columnar, { // Implementation is deferred until the first producer emits the // [`CollectionEdge::Columnar`] variant. The signature is fixed here so // consumers can target it during the consumer-first phase. - let _ = stream; - todo!("columnar_negate: flip diff column in place over Column<(Row, T, Diff)>") + let _ = collection; + todo!("columnar_negate: flip diff column in place over Column<(D, T, R)>") } From a237d05bcdbe81ea9a66fea7e032f8adb6a7b633 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Mon, 11 May 2026 15:43:14 +0200 Subject: [PATCH 3/5] compute: swap CollectionBundle to CollectionEdge; convert Negate Replace the oks side of `CollectionBundle.collection` from `VecCollection<'scope, T, Row, Diff>` to `CollectionEdge<'scope, T>`. The bundle now uniformly carries a `CollectionEdge` and exposes a new `from_edge` constructor; `from_collections` is preserved as a thin wrapper that builds a `CollectionEdge::Vec` arm. This is the bundle-shape commit, intentionally mechanical: every producer site still emits the `Vec` arm and every consumer either keeps using `as_specific_collection` (which extracts the Vec arm internally) or unwraps explicitly via the new transitional `CollectionEdge::expect_vec` / `expect_vec_mut` helpers. The fence sites that needed `expect_vec` are `as_specific_collection`, the unkeyed `flat_map` branch, the three take / store points in `ensure_collections`, the two LetRec bindings in `render.rs`, the `log_operator_hydration` path, and the raw collection branch in `sinks.rs`. Each fence is a future Phase-B PR target. Convert `Negate` to use `CollectionEdge::negate` natively rather than going through `as_specific_collection`: it now consumes the edge, flips diffs preserving the variant, and rewraps via `from_edge`. This is the first consumer to handle the edge enum directly. No behavioural change. The `Columnar` arm of `CollectionEdge::negate` is still wired to `columnar_negate`, which carries a `todo!()` body that will land alongside the producer switch. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/compute/src/render.rs | 16 +++++++--- src/compute/src/render/columnar.rs | 49 ++++++++++++++++++++++++++++++ src/compute/src/render/context.rs | 30 +++++++++++++----- src/compute/src/render/sinks.rs | 4 +-- 4 files changed, 84 insertions(+), 15 deletions(-) diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index a7d1a4eb3dba5..50f24eeb134da 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -163,7 +163,7 @@ use crate::render::errors::DataflowErrorSer; use crate::row_spine::{DatumSeq, RowRowBatcher, RowRowBuilder}; use crate::typedefs::{ErrBatcher, ErrBuilder, ErrSpine, KeyBatcher, MzTimestamp}; -mod columnar; +pub(crate) mod columnar; pub mod context; pub(crate) mod errors; mod flat_map; @@ -911,6 +911,7 @@ impl<'scope> Context<'scope, Product>> { // We need to ensure that the raw collection exists, but do not have enough information // here to cause that to happen. let (oks, mut err) = bundle.collection.clone().unwrap(); + let oks = oks.expect_vec(); self.insert_id(Id::Local(id), bundle); let (oks_v, err_v) = variables.remove(&Id::Local(id)).unwrap(); @@ -963,6 +964,7 @@ impl<'scope> Context<'scope, Product>> { for id in rec_ids.into_iter() { let bundle = self.remove_id(Id::Local(id)).unwrap(); let (oks, err) = bundle.collection.unwrap(); + let oks = oks.expect_vec(); self.insert_id( Id::Local(id), CollectionBundle::from_collections( @@ -1276,8 +1278,11 @@ impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> { } Negate { input } => { let input = expect_input(input); - let (oks, errs) = input.as_specific_collection(None, &self.config_set); - CollectionBundle::from_collections(oks.negate(), errs) + let (oks, errs) = input + .collection + .clone() + .expect("Negate input must be an unarranged collection"); + CollectionBundle::from_edge(oks.negate(), errs) } Threshold { input, @@ -1382,8 +1387,9 @@ impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> { .collection .as_mut() .expect("CollectionBundle invariant"); - let stream = self.log_operator_hydration_inner(oks.inner.clone(), lir_id); - *oks = stream.as_collection(); + let oks_vec = oks.expect_vec_mut(); + let stream = self.log_operator_hydration_inner(oks_vec.inner.clone(), lir_id); + *oks_vec = stream.as_collection(); } } } diff --git a/src/compute/src/render/columnar.rs b/src/compute/src/render/columnar.rs index 4b76e33d5a2a0..d64cef9dc84aa 100644 --- a/src/compute/src/render/columnar.rs +++ b/src/compute/src/render/columnar.rs @@ -56,11 +56,24 @@ where pub inner: Stream<'scope, T, Column<(D, T, R)>>, } +impl<'scope, T, D, R> Clone for ColumnarCollection<'scope, T, D, R> +where + T: Timestamp, + (D, T, R): Columnar, +{ + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + /// A dataflow edge carrying records as either a row-based [`VecCollection`] or /// a [`ColumnarCollection`]. /// /// Producers choose a variant; consumers must accept either. Mixing variants /// across a `concat` is rejected during the transition. +#[derive(Clone)] pub enum CollectionEdge<'scope, T: RenderTimestamp> { /// Row-formatted collection. Today's default for every producer. Vec(VecCollection<'scope, T, Row, Diff>), @@ -90,6 +103,42 @@ impl<'scope, T: RenderTimestamp> CollectionEdge<'scope, T> { } } + /// Leaves a sub-region back to the outer scope. + pub fn leave_region<'outer>(self, outer: Scope<'outer, T>) -> CollectionEdge<'outer, T> { + match self { + CollectionEdge::Vec(c) => CollectionEdge::Vec(c.leave_region(outer)), + CollectionEdge::Columnar(_) => { + todo!("CollectionEdge::Columnar::leave_region") + } + } + } + + /// Extracts the [`VecCollection`] arm, panicking on the columnar arm. + /// + /// This is a transitional fence used at consumer sites that have not yet + /// been converted to handle the columnar arm natively. Each Phase-B + /// consumer PR removes one call. + pub fn expect_vec(self) -> VecCollection<'scope, T, Row, Diff> { + match self { + CollectionEdge::Vec(c) => c, + CollectionEdge::Columnar(_) => panic!( + "CollectionEdge::expect_vec called on columnar arm; consumer must convert first" + ), + } + } + + /// Borrows the [`VecCollection`] arm mutably, panicking on the columnar arm. + /// + /// Transitional fence; see [`Self::expect_vec`]. + pub fn expect_vec_mut(&mut self) -> &mut VecCollection<'scope, T, Row, Diff> { + match self { + CollectionEdge::Vec(c) => c, + CollectionEdge::Columnar(_) => panic!( + "CollectionEdge::expect_vec_mut called on columnar arm; consumer must convert first" + ), + } + } + /// Negates the diff on every record in this edge. /// /// Preserves variant. The columnar arm uses [`columnar_negate`], which diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index e9f1c2341bf42..b5ef16af5c6f3 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -44,6 +44,7 @@ use timely::progress::{Antichain, Timestamp}; use crate::compute_state::ComputeState; use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore}; +use crate::render::columnar::CollectionEdge; use crate::render::errors::{DataflowErrorSer, ErrorLogger}; use crate::render::{LinearJoinSpec, MaybeBucketByTime, RenderTimestamp}; use crate::row_spine::{DatumSeq, RowRowBuilder}; @@ -363,7 +364,7 @@ impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { #[derive(Clone)] pub struct CollectionBundle<'scope, T: RenderTimestamp> { pub collection: Option<( - VecCollection<'scope, T, Row, Diff>, + CollectionEdge<'scope, T>, VecCollection<'scope, T, DataflowErrorSer, Diff>, )>, pub arranged: BTreeMap, ArrangementFlavor<'scope, T>>, @@ -374,6 +375,14 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { pub fn from_collections( oks: VecCollection<'scope, T, Row, Diff>, errs: VecCollection<'scope, T, DataflowErrorSer, Diff>, + ) -> Self { + Self::from_edge(CollectionEdge::Vec(oks), errs) + } + + /// Construct a new collection bundle from a [`CollectionEdge`] and an error stream. + pub fn from_edge( + oks: CollectionEdge<'scope, T>, + errs: VecCollection<'scope, T, DataflowErrorSer, Diff>, ) -> Self { Self { collection: Some((oks, errs)), @@ -409,7 +418,7 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { /// The scope containing the collection bundle. pub fn scope(&self) -> Scope<'scope, T> { if let Some((oks, _errs)) = &self.collection { - oks.inner.scope() + oks.scope() } else { self.arranged .values() @@ -483,10 +492,13 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { // // If it doesn't, we panic. match key { - None => self - .collection - .clone() - .expect("The unarranged collection doesn't exist."), + None => { + let (oks, errs) = self + .collection + .clone() + .expect("The unarranged collection doesn't exist."); + (oks.expect_vec(), errs) + } Some(key) => { let arranged = self.arranged.get(key).unwrap_or_else(|| { panic!("The collection arranged by {:?} doesn't exist.", key) @@ -547,6 +559,7 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { .collection .clone() .expect("Invariant violated: CollectionBundle contains no collection."); + let oks = oks.expect_vec(); let mut datums = DatumVec::new(); let oks = oks.inner.flat_map(move |(v, t, d)| { logic(&mut datums.borrow_with_limit(&v, max_demand), t, d) @@ -797,7 +810,7 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { } else { oks }; - self.collection = Some((oks, errs)); + self.collection = Some((CollectionEdge::Vec(oks), errs)); } for (key, _, thinning) in collections.arranged { if !self.arranged.contains_key(&key) { @@ -808,6 +821,7 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { .collection .take() .expect("Collection constructed above"); + let oks = oks.expect_vec(); // Apply temporal bucketing if the collection already existed on // the bundle (e.g., from an upstream temporal Mfp or Get) and we // haven't bucketed yet. This is the common path for temporal-MFP @@ -828,7 +842,7 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { let (oks, errs_keyed, passthrough) = Self::arrange_collection(&name, oks, key.clone(), thinning.clone()); let errs_concat: KeyCollection<_, _, _> = errs.clone().concat(errs_keyed).into(); - self.collection = Some((passthrough, errs)); + self.collection = Some((CollectionEdge::Vec(passthrough), errs)); let errs = errs_concat.mz_arrange::, ErrBuilder<_, _>, ErrSpine<_, _>>( &format!("{}-errors", name), diff --git a/src/compute/src/render/sinks.rs b/src/compute/src/render/sinks.rs index cde32674ab814..2feb3776ec2cf 100644 --- a/src/compute/src/render/sinks.rs +++ b/src/compute/src/render/sinks.rs @@ -66,8 +66,8 @@ impl<'g, T: RenderTimestamp> Context<'g, T> { let bundle = self .lookup_id(mz_expr::Id::Global(sink.from)) .expect("Sink source collection not loaded"); - let (ok_collection, mut err_collection) = if let Some(collection) = &bundle.collection { - collection.clone() + let (ok_collection, mut err_collection) = if let Some((oks, errs)) = &bundle.collection { + (oks.clone().expect_vec(), errs.clone()) } else { let (key, _arrangement) = bundle .arranged From 51b7a2a4ed6378e6f6b285ca45c33f6e67108007 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Mon, 11 May 2026 15:49:59 +0200 Subject: [PATCH 4/5] compute: convert Union to CollectionEdge::concat_many Add `CollectionEdge::concat_many` and `CollectionEdge::consolidate_named`, both preserving variant. The columnar arms are `todo!()` until producers emit the columnar variant. Convert `Union` in `render.rs` to operate on `CollectionEdge` natively: pull each input's `(edge, errs)` directly out of the bundle, concatenate edges via `concat_many`, optionally consolidate via `consolidate_named`, and rewrap via `from_edge`. The error stream still goes through differential's `concatenate` since errors stay on the row-formatted side. Union is the second consumer to handle the edge enum directly, after Negate. No behavioural change. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/compute/src/render.rs | 21 ++++++++--------- src/compute/src/render/columnar.rs | 36 ++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 10 deletions(-) diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index 50f24eeb134da..6d662cc3edb68 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -1298,20 +1298,21 @@ impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> { let mut oks = Vec::new(); let mut errs = Vec::new(); for input in inputs.into_iter() { - let (os, es) = - expect_input(input).as_specific_collection(None, &self.config_set); + let (os, es) = expect_input(input) + .collection + .clone() + .expect("Union input must be an unarranged collection"); oks.push(os); errs.push(es); } - let mut oks = differential_dataflow::collection::concatenate(self.scope, oks); - if consolidate_output { - oks = CollectionExt::consolidate_named::>( - oks, - "UnionConsolidation", - ) - } + let oks = crate::render::columnar::CollectionEdge::concat_many(self.scope, oks); + let oks = if consolidate_output { + oks.consolidate_named("UnionConsolidation") + } else { + oks + }; let errs = differential_dataflow::collection::concatenate(self.scope, errs); - CollectionBundle::from_collections(oks, errs) + CollectionBundle::from_edge(oks, errs) } ArrangeBy { input_key, diff --git a/src/compute/src/render/columnar.rs b/src/compute/src/render/columnar.rs index d64cef9dc84aa..2eb57c0447d3b 100644 --- a/src/compute/src/render/columnar.rs +++ b/src/compute/src/render/columnar.rs @@ -36,10 +36,12 @@ use columnar::Columnar; use differential_dataflow::VecCollection; use mz_repr::{Diff, Row}; use mz_timely_util::columnar::Column; +use mz_timely_util::operator::CollectionExt; use timely::dataflow::{Scope, Stream}; use timely::progress::Timestamp; use crate::render::RenderTimestamp; +use crate::typedefs::KeyBatcher; /// A columnar collection of `(D, T, R)` updates traveling on a compute /// dataflow edge. @@ -149,6 +151,40 @@ impl<'scope, T: RenderTimestamp> CollectionEdge<'scope, T> { CollectionEdge::Columnar(c) => CollectionEdge::Columnar(columnar_negate(c)), } } + + /// Concatenates a collection of edges that all share the same variant. + /// + /// Mixing variants is rejected during the transition. + pub fn concat_many(scope: Scope<'scope, T>, edges: I) -> Self + where + I: IntoIterator, + { + let mut vec_arm = Vec::new(); + for edge in edges { + match edge { + CollectionEdge::Vec(c) => vec_arm.push(c), + CollectionEdge::Columnar(_) => { + // No producer emits the columnar arm yet; once one does, + // this branch must either concatenate columnar streams or + // reject mixed-variant inputs explicitly. + todo!("CollectionEdge::concat_many: columnar arm"); + } + } + } + CollectionEdge::Vec(differential_dataflow::collection::concatenate(scope, vec_arm)) + } + + /// Consolidates updates in the edge, preserving variant. + pub fn consolidate_named(self, name: &str) -> Self { + match self { + CollectionEdge::Vec(c) => { + CollectionEdge::Vec(CollectionExt::consolidate_named::>(c, name)) + } + CollectionEdge::Columnar(_) => { + todo!("CollectionEdge::Columnar::consolidate_named") + } + } + } } /// Negates the diff column of every batch in a [`ColumnarCollection`] without From 6b23b77350f7d35b62730f9ae4cdde40f28b7bd3 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Mon, 11 May 2026 16:03:55 +0200 Subject: [PATCH 5/5] compute: add CollectionEdge::flat_map_datums MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `flat_map_datums(max_demand, logic)` is the canonical unified entry point for "decoding consumers" — operators that read `Datum`s from each row anyway (MFP, FlatMap, joins, TopK, sinks). The closure receives a borrowed `DatumVecBorrow`, identical to the existing `CollectionBundle::flat_map` else-branch contract. The Vec arm reuses `DatumVec::borrow_with_limit` over each `Row` from the timely stream. The Columnar arm is `todo!()`; it will iterate `Column<(Row, T, Diff)>::borrow()` via the `Rows<_>::Index` impl (yielding `&RowRef`) and call `DatumVec::borrow_with_limit` per row, all without materialising an owned `Row`. Route `CollectionBundle::flat_map`'s unkeyed branch through the new method so the per-record decode lives in exactly one place. This deletes one of the transitional `expect_vec` fences. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/compute/src/render/columnar.rs | 44 ++++++++++++++++++++++++++++-- src/compute/src/render/context.rs | 10 ++----- 2 files changed, 43 insertions(+), 11 deletions(-) diff --git a/src/compute/src/render/columnar.rs b/src/compute/src/render/columnar.rs index 2eb57c0447d3b..1d67e523d11dc 100644 --- a/src/compute/src/render/columnar.rs +++ b/src/compute/src/render/columnar.rs @@ -33,11 +33,11 @@ #![allow(clippy::todo)] use columnar::Columnar; -use differential_dataflow::VecCollection; -use mz_repr::{Diff, Row}; +use differential_dataflow::{Data, VecCollection}; +use mz_repr::{DatumVec, DatumVecBorrow, Diff, Row}; use mz_timely_util::columnar::Column; use mz_timely_util::operator::CollectionExt; -use timely::dataflow::{Scope, Stream}; +use timely::dataflow::{Scope, Stream, StreamVec}; use timely::progress::Timestamp; use crate::render::RenderTimestamp; @@ -174,6 +174,44 @@ impl<'scope, T: RenderTimestamp> CollectionEdge<'scope, T> { CollectionEdge::Vec(differential_dataflow::collection::concatenate(scope, vec_arm)) } + /// Applies `logic` to each record in this edge, exposing the record as a + /// borrowed [`DatumVecBorrow`]. + /// + /// `max_demand` bounds the number of columns decoded per row; pass + /// `usize::MAX` to decode all columns. + /// + /// This is the canonical unified entry point for "decoding consumers" + /// (operators that read [`mz_repr::Datum`]s from each row anyway). The + /// Vec arm uses [`DatumVec::borrow_with_limit`] on each [`Row`]; the + /// Columnar arm iterates the columnar batch directly without going + /// through an owned [`Row`]. + pub fn flat_map_datums( + self, + max_demand: usize, + mut logic: L, + ) -> StreamVec<'scope, T, I::Item> + where + I: IntoIterator, + D: Data, + L: for<'a> FnMut(&'a mut DatumVecBorrow<'_>, T, Diff) -> I + 'static, + { + match self { + CollectionEdge::Vec(c) => { + use timely::dataflow::operators::vec::Map; + let mut datums = DatumVec::new(); + c.inner.flat_map(move |(v, t, d)| { + logic(&mut datums.borrow_with_limit(&v, max_demand), t, d) + }) + } + CollectionEdge::Columnar(_) => { + // Implementation will iterate `Column<(Row, T, Diff)>::borrow()` + // via the `Rows<_>::Index` impl (yielding `&RowRef`) and call + // `DatumVec::borrow_with_limit(row_ref, max_demand)` per row. + todo!("CollectionEdge::flat_map_datums: columnar arm") + } + } + } + /// Consolidates updates in the edge, preserving variant. pub fn consolidate_named(self, name: &str) -> Self { match self { diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index b5ef16af5c6f3..9043bb71a57a4 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -536,7 +536,7 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { &self, key_val: Option<(Vec, Option)>, max_demand: usize, - mut logic: L, + logic: L, ) -> ( StreamVec<'scope, T, I::Item>, VecCollection<'scope, T, DataflowErrorSer, Diff>, @@ -554,17 +554,11 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { .expect("Should have ensured during planning that this arrangement exists.") .flat_map(val.as_ref(), max_demand, logic) } else { - use timely::dataflow::operators::vec::Map; let (oks, errs) = self .collection .clone() .expect("Invariant violated: CollectionBundle contains no collection."); - let oks = oks.expect_vec(); - let mut datums = DatumVec::new(); - let oks = oks.inner.flat_map(move |(v, t, d)| { - logic(&mut datums.borrow_with_limit(&v, max_demand), t, d) - }); - (oks, errs) + (oks.flat_map_datums(max_demand, logic), errs) } }