diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index 35f96297de254..6d662cc3edb68 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -163,6 +163,7 @@ use crate::render::errors::DataflowErrorSer; use crate::row_spine::{DatumSeq, RowRowBatcher, RowRowBuilder}; use crate::typedefs::{ErrBatcher, ErrBuilder, ErrSpine, KeyBatcher, MzTimestamp}; +pub(crate) mod columnar; pub mod context; pub(crate) mod errors; mod flat_map; @@ -910,6 +911,7 @@ impl<'scope> Context<'scope, Product>> { // We need to ensure that the raw collection exists, but do not have enough information // here to cause that to happen. let (oks, mut err) = bundle.collection.clone().unwrap(); + let oks = oks.expect_vec(); self.insert_id(Id::Local(id), bundle); let (oks_v, err_v) = variables.remove(&Id::Local(id)).unwrap(); @@ -962,6 +964,7 @@ impl<'scope> Context<'scope, Product>> { for id in rec_ids.into_iter() { let bundle = self.remove_id(Id::Local(id)).unwrap(); let (oks, err) = bundle.collection.unwrap(); + let oks = oks.expect_vec(); self.insert_id( Id::Local(id), CollectionBundle::from_collections( @@ -1275,8 +1278,11 @@ impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> { } Negate { input } => { let input = expect_input(input); - let (oks, errs) = input.as_specific_collection(None, &self.config_set); - CollectionBundle::from_collections(oks.negate(), errs) + let (oks, errs) = input + .collection + .clone() + .expect("Negate input must be an unarranged collection"); + CollectionBundle::from_edge(oks.negate(), errs) } Threshold { input, @@ -1292,20 +1298,21 @@ impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> { let mut oks = Vec::new(); let mut errs = Vec::new(); for input in inputs.into_iter() { - let (os, es) = - expect_input(input).as_specific_collection(None, &self.config_set); + let (os, es) = expect_input(input) + .collection + .clone() + .expect("Union input must be an unarranged collection"); oks.push(os); errs.push(es); } - let mut oks = differential_dataflow::collection::concatenate(self.scope, oks); - if consolidate_output { - oks = CollectionExt::consolidate_named::>( - oks, - "UnionConsolidation", - ) - } + let oks = crate::render::columnar::CollectionEdge::concat_many(self.scope, oks); + let oks = if consolidate_output { + oks.consolidate_named("UnionConsolidation") + } else { + oks + }; let errs = differential_dataflow::collection::concatenate(self.scope, errs); - CollectionBundle::from_collections(oks, errs) + CollectionBundle::from_edge(oks, errs) } ArrangeBy { input_key, @@ -1381,8 +1388,9 @@ impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> { .collection .as_mut() .expect("CollectionBundle invariant"); - let stream = self.log_operator_hydration_inner(oks.inner.clone(), lir_id); - *oks = stream.as_collection(); + let oks_vec = oks.expect_vec_mut(); + let stream = self.log_operator_hydration_inner(oks_vec.inner.clone(), lir_id); + *oks_vec = stream.as_collection(); } } } diff --git a/src/compute/src/render/columnar.rs b/src/compute/src/render/columnar.rs new file mode 100644 index 0000000000000..1d67e523d11dc --- /dev/null +++ b/src/compute/src/render/columnar.rs @@ -0,0 +1,242 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! Columnar dataflow edge support. +//! +//! Defines [`CollectionEdge`], a wrapper that lets dataflow edges between Plan +//! nodes carry either row-based ([`VecCollection`]) or columnar +//! ([`ColumnarCollection`]) batches of `(D, T, R)` updates. +//! +//! # Migration model +//! +//! The migration is consumer-first: every Plan-node consumer learns to accept +//! both variants before any producer emits the columnar variant. Once all +//! consumers are ready, a single switch flips producers to columnar end-to-end. +//! +//! Within a Plan node, operators may freely materialize Vec collections; only +//! the inter-node edge format is constrained. A decode from columnar to Vec at +//! a consumer's input is acceptable only when the consumer would have decoded +//! `Row` to [`mz_repr::Datum`] anyway. Pure passthrough consumers (Negate, +//! Union) must round-trip the columnar variant without decoding. +//! +//! See `.claude/plans/columnar_consumer_first.md` for the staged plan. + +#![allow(dead_code)] +// Phase A defines the type with `todo!()` arms that will be filled in when +// producers begin emitting columnar batches. +#![allow(clippy::todo)] + +use columnar::Columnar; +use differential_dataflow::{Data, VecCollection}; +use mz_repr::{DatumVec, DatumVecBorrow, Diff, Row}; +use mz_timely_util::columnar::Column; +use mz_timely_util::operator::CollectionExt; +use timely::dataflow::{Scope, Stream, StreamVec}; +use timely::progress::Timestamp; + +use crate::render::RenderTimestamp; +use crate::typedefs::KeyBatcher; + +/// A columnar collection of `(D, T, R)` updates traveling on a compute +/// dataflow edge. +/// +/// Mirrors differential's [`VecCollection<'scope, T, D, R>`] shape and +/// parameters; the underlying container is [`Column<(D, T, R)>`] instead of +/// `Vec<(D, T, R)>`. +pub struct ColumnarCollection<'scope, T, D, R> +where + T: Timestamp, + (D, T, R): Columnar, +{ + /// The underlying timely stream of columnar batches. + pub inner: Stream<'scope, T, Column<(D, T, R)>>, +} + +impl<'scope, T, D, R> Clone for ColumnarCollection<'scope, T, D, R> +where + T: Timestamp, + (D, T, R): Columnar, +{ + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +/// A dataflow edge carrying records as either a row-based [`VecCollection`] or +/// a [`ColumnarCollection`]. +/// +/// Producers choose a variant; consumers must accept either. Mixing variants +/// across a `concat` is rejected during the transition. +#[derive(Clone)] +pub enum CollectionEdge<'scope, T: RenderTimestamp> { + /// Row-formatted collection. Today's default for every producer. + Vec(VecCollection<'scope, T, Row, Diff>), + /// Columnar collection. Currently unused by any producer; reserved for the + /// producer flip at the end of the migration. + Columnar(ColumnarCollection<'scope, T, Row, Diff>), +} + +impl<'scope, T: RenderTimestamp> CollectionEdge<'scope, T> { + /// The scope containing this edge. + pub fn scope(&self) -> Scope<'scope, T> { + match self { + CollectionEdge::Vec(c) => c.inner.scope(), + CollectionEdge::Columnar(c) => c.inner.scope(), + } + } + + /// Brings the edge into a sub-region of its current scope. + pub fn enter_region<'inner>(self, region: Scope<'inner, T>) -> CollectionEdge<'inner, T> { + match self { + CollectionEdge::Vec(c) => CollectionEdge::Vec(c.enter_region(region)), + CollectionEdge::Columnar(_) => { + // Sub-region entry for columnar collections will be wired when + // the first producer emits this variant. + todo!("CollectionEdge::Columnar::enter_region") + } + } + } + + /// Leaves a sub-region back to the outer scope. + pub fn leave_region<'outer>(self, outer: Scope<'outer, T>) -> CollectionEdge<'outer, T> { + match self { + CollectionEdge::Vec(c) => CollectionEdge::Vec(c.leave_region(outer)), + CollectionEdge::Columnar(_) => { + todo!("CollectionEdge::Columnar::leave_region") + } + } + } + + /// Extracts the [`VecCollection`] arm, panicking on the columnar arm. + /// + /// This is a transitional fence used at consumer sites that have not yet + /// been converted to handle the columnar arm natively. Each Phase-B + /// consumer PR removes one call. + pub fn expect_vec(self) -> VecCollection<'scope, T, Row, Diff> { + match self { + CollectionEdge::Vec(c) => c, + CollectionEdge::Columnar(_) => panic!( + "CollectionEdge::expect_vec called on columnar arm; consumer must convert first" + ), + } + } + + /// Borrows the [`VecCollection`] arm mutably, panicking on the columnar arm. + /// + /// Transitional fence; see [`Self::expect_vec`]. + pub fn expect_vec_mut(&mut self) -> &mut VecCollection<'scope, T, Row, Diff> { + match self { + CollectionEdge::Vec(c) => c, + CollectionEdge::Columnar(_) => panic!( + "CollectionEdge::expect_vec_mut called on columnar arm; consumer must convert first" + ), + } + } + + /// Negates the diff on every record in this edge. + /// + /// Preserves variant. The columnar arm uses [`columnar_negate`], which + /// flips the diff column without decoding the data column. + pub fn negate(self) -> Self { + match self { + CollectionEdge::Vec(c) => CollectionEdge::Vec(c.negate()), + CollectionEdge::Columnar(c) => CollectionEdge::Columnar(columnar_negate(c)), + } + } + + /// Concatenates a collection of edges that all share the same variant. + /// + /// Mixing variants is rejected during the transition. + pub fn concat_many(scope: Scope<'scope, T>, edges: I) -> Self + where + I: IntoIterator, + { + let mut vec_arm = Vec::new(); + for edge in edges { + match edge { + CollectionEdge::Vec(c) => vec_arm.push(c), + CollectionEdge::Columnar(_) => { + // No producer emits the columnar arm yet; once one does, + // this branch must either concatenate columnar streams or + // reject mixed-variant inputs explicitly. + todo!("CollectionEdge::concat_many: columnar arm"); + } + } + } + CollectionEdge::Vec(differential_dataflow::collection::concatenate(scope, vec_arm)) + } + + /// Applies `logic` to each record in this edge, exposing the record as a + /// borrowed [`DatumVecBorrow`]. + /// + /// `max_demand` bounds the number of columns decoded per row; pass + /// `usize::MAX` to decode all columns. + /// + /// This is the canonical unified entry point for "decoding consumers" + /// (operators that read [`mz_repr::Datum`]s from each row anyway). The + /// Vec arm uses [`DatumVec::borrow_with_limit`] on each [`Row`]; the + /// Columnar arm iterates the columnar batch directly without going + /// through an owned [`Row`]. + pub fn flat_map_datums( + self, + max_demand: usize, + mut logic: L, + ) -> StreamVec<'scope, T, I::Item> + where + I: IntoIterator, + D: Data, + L: for<'a> FnMut(&'a mut DatumVecBorrow<'_>, T, Diff) -> I + 'static, + { + match self { + CollectionEdge::Vec(c) => { + use timely::dataflow::operators::vec::Map; + let mut datums = DatumVec::new(); + c.inner.flat_map(move |(v, t, d)| { + logic(&mut datums.borrow_with_limit(&v, max_demand), t, d) + }) + } + CollectionEdge::Columnar(_) => { + // Implementation will iterate `Column<(Row, T, Diff)>::borrow()` + // via the `Rows<_>::Index` impl (yielding `&RowRef`) and call + // `DatumVec::borrow_with_limit(row_ref, max_demand)` per row. + todo!("CollectionEdge::flat_map_datums: columnar arm") + } + } + } + + /// Consolidates updates in the edge, preserving variant. + pub fn consolidate_named(self, name: &str) -> Self { + match self { + CollectionEdge::Vec(c) => { + CollectionEdge::Vec(CollectionExt::consolidate_named::>(c, name)) + } + CollectionEdge::Columnar(_) => { + todo!("CollectionEdge::Columnar::consolidate_named") + } + } + } +} + +/// Negates the diff column of every batch in a [`ColumnarCollection`] without +/// decoding the data column. +pub fn columnar_negate<'scope, T, D, R>( + collection: ColumnarCollection<'scope, T, D, R>, +) -> ColumnarCollection<'scope, T, D, R> +where + T: Timestamp, + (D, T, R): Columnar, +{ + // Implementation is deferred until the first producer emits the + // [`CollectionEdge::Columnar`] variant. The signature is fixed here so + // consumers can target it during the consumer-first phase. + let _ = collection; + todo!("columnar_negate: flip diff column in place over Column<(D, T, R)>") +} diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index e9f1c2341bf42..9043bb71a57a4 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -44,6 +44,7 @@ use timely::progress::{Antichain, Timestamp}; use crate::compute_state::ComputeState; use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore}; +use crate::render::columnar::CollectionEdge; use crate::render::errors::{DataflowErrorSer, ErrorLogger}; use crate::render::{LinearJoinSpec, MaybeBucketByTime, RenderTimestamp}; use crate::row_spine::{DatumSeq, RowRowBuilder}; @@ -363,7 +364,7 @@ impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { #[derive(Clone)] pub struct CollectionBundle<'scope, T: RenderTimestamp> { pub collection: Option<( - VecCollection<'scope, T, Row, Diff>, + CollectionEdge<'scope, T>, VecCollection<'scope, T, DataflowErrorSer, Diff>, )>, pub arranged: BTreeMap, ArrangementFlavor<'scope, T>>, @@ -374,6 +375,14 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { pub fn from_collections( oks: VecCollection<'scope, T, Row, Diff>, errs: VecCollection<'scope, T, DataflowErrorSer, Diff>, + ) -> Self { + Self::from_edge(CollectionEdge::Vec(oks), errs) + } + + /// Construct a new collection bundle from a [`CollectionEdge`] and an error stream. + pub fn from_edge( + oks: CollectionEdge<'scope, T>, + errs: VecCollection<'scope, T, DataflowErrorSer, Diff>, ) -> Self { Self { collection: Some((oks, errs)), @@ -409,7 +418,7 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { /// The scope containing the collection bundle. pub fn scope(&self) -> Scope<'scope, T> { if let Some((oks, _errs)) = &self.collection { - oks.inner.scope() + oks.scope() } else { self.arranged .values() @@ -483,10 +492,13 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { // // If it doesn't, we panic. match key { - None => self - .collection - .clone() - .expect("The unarranged collection doesn't exist."), + None => { + let (oks, errs) = self + .collection + .clone() + .expect("The unarranged collection doesn't exist."); + (oks.expect_vec(), errs) + } Some(key) => { let arranged = self.arranged.get(key).unwrap_or_else(|| { panic!("The collection arranged by {:?} doesn't exist.", key) @@ -524,7 +536,7 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { &self, key_val: Option<(Vec, Option)>, max_demand: usize, - mut logic: L, + logic: L, ) -> ( StreamVec<'scope, T, I::Item>, VecCollection<'scope, T, DataflowErrorSer, Diff>, @@ -542,16 +554,11 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { .expect("Should have ensured during planning that this arrangement exists.") .flat_map(val.as_ref(), max_demand, logic) } else { - use timely::dataflow::operators::vec::Map; let (oks, errs) = self .collection .clone() .expect("Invariant violated: CollectionBundle contains no collection."); - let mut datums = DatumVec::new(); - let oks = oks.inner.flat_map(move |(v, t, d)| { - logic(&mut datums.borrow_with_limit(&v, max_demand), t, d) - }); - (oks, errs) + (oks.flat_map_datums(max_demand, logic), errs) } } @@ -797,7 +804,7 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { } else { oks }; - self.collection = Some((oks, errs)); + self.collection = Some((CollectionEdge::Vec(oks), errs)); } for (key, _, thinning) in collections.arranged { if !self.arranged.contains_key(&key) { @@ -808,6 +815,7 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { .collection .take() .expect("Collection constructed above"); + let oks = oks.expect_vec(); // Apply temporal bucketing if the collection already existed on // the bundle (e.g., from an upstream temporal Mfp or Get) and we // haven't bucketed yet. This is the common path for temporal-MFP @@ -828,7 +836,7 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { let (oks, errs_keyed, passthrough) = Self::arrange_collection(&name, oks, key.clone(), thinning.clone()); let errs_concat: KeyCollection<_, _, _> = errs.clone().concat(errs_keyed).into(); - self.collection = Some((passthrough, errs)); + self.collection = Some((CollectionEdge::Vec(passthrough), errs)); let errs = errs_concat.mz_arrange::, ErrBuilder<_, _>, ErrSpine<_, _>>( &format!("{}-errors", name), diff --git a/src/compute/src/render/sinks.rs b/src/compute/src/render/sinks.rs index cde32674ab814..2feb3776ec2cf 100644 --- a/src/compute/src/render/sinks.rs +++ b/src/compute/src/render/sinks.rs @@ -66,8 +66,8 @@ impl<'g, T: RenderTimestamp> Context<'g, T> { let bundle = self .lookup_id(mz_expr::Id::Global(sink.from)) .expect("Sink source collection not loaded"); - let (ok_collection, mut err_collection) = if let Some(collection) = &bundle.collection { - collection.clone() + let (ok_collection, mut err_collection) = if let Some((oks, errs)) = &bundle.collection { + (oks.clone().expect_vec(), errs.clone()) } else { let (key, _arrangement) = bundle .arranged