From 5022c8fafb91bc02b21c2f33107a357311fecfca Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 12 May 2026 15:21:23 +0200 Subject: [PATCH 1/8] compute: thread ok/err sessions through flat_map Propagate the dual-output `do_work` callback shape up through the arrangement flat_map stack so MFP evaluation can write `Ok` and `Err` results to separate output sessions, eliminating the downstream `map_fallible("OkErr", ...)` and `flat_map_fallible("OkErrDemux", ...)` demux operators in `as_collection_core` and the reduce key/value selector path. * `PendingWork` now retains a capability per output. * `flat_map_core` builds a two-output operator via `OperatorBuilder` instead of `Stream::unary`, and returns `(ok_stream, err_stream)`. * `ArrangementFlavor::flat_map` and `CollectionBundle::flat_map` take closures of the form `(datums, t, diff, &mut ok_session, &mut err_session) -> usize`, concat any MFP-derived errors with the arrangement's err collection, and return `(ok_stream, err_collection)`. * `as_collection_core` pushes `Ok`/`Err` directly into the two sessions and no longer reconstructs and re-splits a tagged stream. * `reduce.rs` updates its key/value selector to the new shape and drops its `OkErrDemux` operator. --- src/compute/src/render/context.rs | 370 ++++++++++++++++++++---------- src/compute/src/render/reduce.rs | 32 +-- 2 files changed, 260 insertions(+), 142 deletions(-) diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index e9f1c2341bf42..ec6b5f93adf32 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -32,8 +32,6 @@ use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow use mz_storage_types::controller::CollectionMetadata; use mz_timely_util::columnar::builder::ColumnBuilder; use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange}; -use mz_timely_util::operator::CollectionExt; -use timely::container::CapacityContainerBuilder; use timely::dataflow::channels::pact::{ExchangeCore, Pipeline}; use timely::dataflow::operators::Capability; use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; @@ -263,8 +261,10 @@ impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { /// Constructs and applies logic to elements of `self` and returns the results. /// - /// The `logic` receives a vector of datums, a timestamp, and a diff, and produces - /// an iterator of `(D, T, Diff)` updates. + /// The `logic` callback receives a borrow of the decoded datum vector, a timestamp, a + /// diff, and two output sessions: one for `ok` updates of type `(D, T, Diff)` and one for + /// MFP-style `DataflowErrorSer` updates. It must return the amount of work performed so + /// the underlying operator can budget activations. /// /// If `key` is set, this is a promise that `logic` will produce no results on /// records for which the key does not evaluate to the value. This is used to @@ -273,19 +273,25 @@ impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { /// The `max_demand` parameter limits the number of columns decoded from the /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to /// decode all columns. - pub fn flat_map( + pub fn flat_map( &self, key: Option<&Row>, max_demand: usize, mut logic: L, ) -> ( - StreamVec<'scope, T, I::Item>, + StreamVec<'scope, T, (D, T, Diff)>, VecCollection<'scope, T, DataflowErrorSer, Diff>, ) where - I: IntoIterator, D: Data, - L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, T, Diff) -> I + 'static, + L: for<'a, 'b> FnMut( + &'a mut DatumVecBorrow<'b>, + T, + Diff, + &mut Session, + &mut Session, + ) -> usize + + 'static, { // Set a number of tuples after which the operator should yield. // This allows us to remain responsive even when enumerating a substantial @@ -293,23 +299,33 @@ impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { let refuel = 1000000; let mut datums = DatumVec::new(); - let logic = move |k: DatumSeq, v: DatumSeq, t, d| { - let mut datums_borrow = datums.borrow(); - datums_borrow.extend(k.to_datum_iter().take(max_demand)); - let max_demand = max_demand.saturating_sub(datums_borrow.len()); - datums_borrow.extend(v.to_datum_iter().take(max_demand)); - logic(&mut datums_borrow, t, d) - }; + let logic = + move |k: DatumSeq, + v: DatumSeq, + t, + d, + ok_session: &mut Session, + err_session: &mut Session| { + let mut datums_borrow = datums.borrow(); + datums_borrow.extend(k.to_datum_iter().take(max_demand)); + let max_demand = max_demand.saturating_sub(datums_borrow.len()); + datums_borrow.extend(v.to_datum_iter().take(max_demand)); + logic(&mut datums_borrow, t, d, ok_session, err_session) + }; match &self { ArrangementFlavor::Local(oks, errs) => { - let oks = CollectionBundle::::flat_map_core(oks.clone(), key, logic, refuel); + let (oks, mfp_errs) = + CollectionBundle::::flat_map_core(oks.clone(), key, logic, refuel); let errs = errs.clone().as_collection(|k, &()| k.clone()); + let errs = errs.concat(mfp_errs.as_collection()); (oks, errs) } ArrangementFlavor::Trace(_, oks, errs) => { - let oks = CollectionBundle::::flat_map_core(oks.clone(), key, logic, refuel); + let (oks, mfp_errs) = + CollectionBundle::::flat_map_core(oks.clone(), key, logic, refuel); let errs = errs.clone().as_collection(|k, &()| k.clone()); + let errs = errs.concat(mfp_errs.as_collection()); (oks, errs) } } @@ -493,9 +509,14 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { }); if ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config_set) { // Decode all columns, pass max_demand as usize::MAX. - let (ok, err) = arranged.flat_map(None, usize::MAX, |borrow, t, r| { - Some((SharedRow::pack(borrow.iter()), t, r)) - }); + let (ok, err) = arranged.flat_map( + None, + usize::MAX, + |borrow, t, r, ok_session, _err_session| { + ok_session.give((SharedRow::pack(borrow.iter()), t, r)); + 1 + }, + ); (ok.as_collection(), err) } else { #[allow(deprecated)] @@ -520,19 +541,25 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { /// The `max_demand` parameter limits the number of columns decoded from the /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to /// decode all columns. - pub fn flat_map( + pub fn flat_map( &self, key_val: Option<(Vec, Option)>, max_demand: usize, mut logic: L, ) -> ( - StreamVec<'scope, T, I::Item>, + StreamVec<'scope, T, (D, T, Diff)>, VecCollection<'scope, T, DataflowErrorSer, Diff>, ) where - I: IntoIterator, D: Data, - L: for<'a> FnMut(&'a mut DatumVecBorrow<'_>, T, Diff) -> I + 'static, + L: for<'a> FnMut( + &'a mut DatumVecBorrow<'_>, + T, + Diff, + &mut Session, + &mut Session, + ) -> usize + + 'static, { // If `key_val` is set, we should have to use the corresponding arrangement. // If there isn't one, that implies an error in the contract between @@ -542,16 +569,49 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { .expect("Should have ensured during planning that this arrangement exists.") .flat_map(val.as_ref(), max_demand, logic) } else { - use timely::dataflow::operators::vec::Map; let (oks, errs) = self .collection .clone() .expect("Invariant violated: CollectionBundle contains no collection."); - let mut datums = DatumVec::new(); - let oks = oks.inner.flat_map(move |(v, t, d)| { - logic(&mut datums.borrow_with_limit(&v, max_demand), t, d) + let scope = oks.inner.scope(); + let mut builder = OperatorBuilder::new("CollectionFlatMap".to_string(), scope); + let (ok_output, ok_stream) = builder.new_output(); + let mut ok_output = + OutputBuilder::<_, ConsolidatingContainerBuilder>>::from( + ok_output, + ); + let (err_output, err_stream) = builder.new_output(); + let mut err_output = OutputBuilder::< + _, + ConsolidatingContainerBuilder>, + >::from(err_output); + let mut input = builder.new_input(oks.inner, Pipeline); + builder.build(move |_capabilities| { + let mut datums = DatumVec::new(); + move |_frontiers| { + let mut ok_output = ok_output.activate(); + let mut err_output = err_output.activate(); + input.for_each(|time, data| { + // Retain the input capability to derive a `Capability` for each output; + // the `Session` type alias is fixed to `Capability`. + let ok_cap = time.retain(0); + let err_cap = time.retain(1); + let mut ok_session = ok_output.session_with_builder(&ok_cap); + let mut err_session = err_output.session_with_builder(&err_cap); + for (v, t, d) in data.iter() { + logic( + &mut datums.borrow_with_limit(v, max_demand), + t.clone(), + d.clone(), + &mut ok_session, + &mut err_session, + ); + } + }); + } }); - (oks, errs) + let errs = errs.concat(err_stream.as_collection()); + (ok_stream, errs) } } @@ -561,13 +621,18 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { /// once, and thereby avoid any skew in the two uses of the logic. /// /// The function presents the contents of the trace as `(key, value, time, delta)` tuples, - /// where key and value are potentially specialized, but convertible into rows. - fn flat_map_core( + /// where key and value are potentially specialized, but convertible into rows. The `logic` + /// callback writes ok results into the first session and errors into the second, returning + /// the amount of work performed so the operator can fuel-limit its activations. + fn flat_map_core( trace: Arranged<'scope, Tr>, key: Option<&::Owned>, mut logic: L, refuel: usize, - ) -> StreamVec<'scope, T, I::Item> + ) -> ( + StreamVec<'scope, T, (D, T, Diff)>, + StreamVec<'scope, T, (DataflowErrorSer, T, Diff)>, + ) where Tr: for<'a> TraceReader< Key<'a>: ToDatumIter, @@ -577,11 +642,17 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { > + Clone + 'static, ::Owned: PartialEq, - I: IntoIterator, D: Data, - L: FnMut(Tr::Key<'_>, Tr::Val<'_>, T, mz_repr::Diff) -> I + 'static, + L: FnMut( + Tr::Key<'_>, + Tr::Val<'_>, + T, + mz_repr::Diff, + &mut Session, + &mut Session, + ) -> usize + + 'static, { - use differential_dataflow::consolidation::ConsolidatingContainerBuilder as CB; let scope = trace.stream.scope(); let mut key_con = Tr::KeyContainer::with_capacity(1); @@ -590,48 +661,67 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { } let mode = if key.is_some() { "index" } else { "scan" }; let name = format!("ArrangementFlatMap({})", mode); - use timely::dataflow::operators::Operator; - trace - .stream - .unary::, _, _, _>(Pipeline, &name, move |_, info| { - // Acquire an activator to reschedule the operator when it has unfinished work. - let activator = scope.activator_for(info.address); - // Maintain a list of work to do, cursor to navigate and process. - let mut todo = std::collections::VecDeque::new(); - move |input, output| { - let key = key_con.get(0); - // First, dequeue all batches. - input.for_each(|time, data| { - let capability = time.retain(0); - for batch in data.iter() { - // enqueue a capability, cursor, and batch. - todo.push_back(PendingWork::new( - capability.clone(), - batch.cursor(), - batch.clone(), - )); - } - }); - // Second, make progress on `todo`. - let mut fuel = refuel; - while !todo.is_empty() && fuel > 0 { - todo.front_mut().unwrap().do_work( - key.as_ref(), - &mut logic, - &mut fuel, - output, - ); - if fuel > 0 { - todo.pop_front(); - } + let mut builder = OperatorBuilder::new(name, scope.clone()); + let (ok_output, ok_stream) = builder.new_output(); + let mut ok_output = + OutputBuilder::<_, ConsolidatingContainerBuilder>>::from(ok_output); + let (err_output, err_stream) = builder.new_output(); + let mut err_output = OutputBuilder::< + _, + ConsolidatingContainerBuilder>, + >::from(err_output); + let mut input = builder.new_input(trace.stream.clone(), Pipeline); + let operator_info = builder.operator_info(); + + builder.build(move |_capabilities| { + // Acquire an activator to reschedule the operator when it has unfinished work. + let activator = scope.activator_for(operator_info.address); + // Maintain a list of work to do, cursor to navigate and process. + let mut todo = std::collections::VecDeque::new(); + move |_frontiers| { + let key = key_con.get(0); + let mut ok_output = ok_output.activate(); + let mut err_output = err_output.activate(); + + // First, dequeue all batches. + input.for_each(|time, data| { + // Retain a capability for each output, as the work may complete across + // multiple activations. + let ok_cap = time.retain(0); + let err_cap = time.retain(1); + for batch in data.iter() { + todo.push_back(PendingWork::new( + ok_cap.clone(), + err_cap.clone(), + batch.cursor(), + batch.clone(), + )); } - // If we have not finished all work, re-activate the operator. - if !todo.is_empty() { - activator.activate(); + }); + + // Second, make progress on `todo`. + let mut fuel = refuel; + while !todo.is_empty() && fuel > 0 { + todo.front_mut().unwrap().do_work( + key.as_ref(), + &mut logic, + &mut fuel, + &mut ok_output, + &mut err_output, + ); + if fuel > 0 { + todo.pop_front(); } } - }) + // If we have not finished all work, re-activate the operator. + if !todo.is_empty() { + activator.activate(); + } + } + }); + + (ok_stream, err_stream) } /// Look up an arrangement by the expressions that form the key. @@ -690,49 +780,47 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { // Wrap in an `Rc` so that lifetimes work out. let until = std::rc::Rc::new(until); - let (stream, errors) = self.flat_map(key_val, max_demand, move |row_datums, time, diff| { - let mut row_builder = SharedRow::get(); - let until = std::rc::Rc::clone(&until); - let temp_storage = RowArena::new(); - let row_iter = row_datums.iter(); - let mut datums_local = datum_vec.borrow(); - datums_local.extend(row_iter); - let time = time.clone(); - let event_time = time.event_time(); - mfp_plan - .evaluate( + let (stream, errors) = self.flat_map( + key_val, + max_demand, + move |row_datums, time, diff, ok_session, err_session| { + let mut row_builder = SharedRow::get(); + let until = std::rc::Rc::clone(&until); + let temp_storage = RowArena::new(); + let row_iter = row_datums.iter(); + let mut datums_local = datum_vec.borrow(); + datums_local.extend(row_iter); + let event_time = time.event_time(); + let mut work: usize = 0; + for result in mfp_plan.evaluate( &mut datums_local, &temp_storage, event_time, diff.clone(), move |time| !until.less_equal(time), &mut row_builder, - ) - .map(move |x| match x { - Ok((row, event_time, diff)) => { - // Copy the whole time, and re-populate event time. - let mut time: T = time.clone(); - *time.event_time_mut() = event_time; - (Ok(row), time, diff) - } - Err((e, event_time, diff)) => { - // Copy the whole time, and re-populate event time. - let mut time: T = time.clone(); - *time.event_time_mut() = event_time; - (Err(e), time, diff) + ) { + work += 1; + match result { + Ok((row, event_time, diff)) => { + // Copy the whole time, and re-populate event time. + let mut time: T = time.clone(); + *time.event_time_mut() = event_time; + ok_session.give((row, time, diff)); + } + Err((e, event_time, diff)) => { + // Copy the whole time, and re-populate event time. + let mut time: T = time.clone(); + *time.event_time_mut() = event_time; + err_session.give((e, time, diff)); + } } - }) - }); - - use differential_dataflow::AsCollection; - let (oks, errs) = stream - .as_collection() - .map_fallible::, CapacityContainerBuilder<_>, _, _, _>( - "OkErr", - |x| x, - ); + } + work + }, + ); - (oks, errors.concat(errs)) + (stream.as_collection(), errors) } pub fn ensure_collections( mut self, @@ -926,11 +1014,22 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { } } +type Session<'a, 'b, T, D> = timely::dataflow::operators::generic::Session< + 'a, + 'b, + T, + ConsolidatingContainerBuilder>, + Capability, +>; + struct PendingWork where C: Cursor, { - capability: Capability, + /// Capability for the `ok` output (output port 0). + ok_capability: Capability, + /// Capability for the `err` output (output port 1). + err_capability: Capability, cursor: C, batch: C::Storage, } @@ -939,31 +1038,56 @@ impl PendingWork where C: Cursor>, { - /// Create a new bundle of pending work, from the capability, cursor, and backing storage. - fn new(capability: Capability, cursor: C, batch: C::Storage) -> Self { + /// Create a new bundle of pending work, from a pair of capabilities (one per output), + /// a cursor, and backing storage. + fn new( + ok_capability: Capability, + err_capability: Capability, + cursor: C, + batch: C::Storage, + ) -> Self { Self { - capability, + ok_capability, + err_capability, cursor, batch, } } - /// Perform roughly `fuel` work through the cursor, applying `logic` and sending results to `output`. - fn do_work( + /// Perform roughly `fuel` work through the cursor, applying `logic` and sending results to + /// the two output sessions. + fn do_work( &mut self, key: Option<&C::Key<'_>>, logic: &mut L, fuel: &mut usize, - output: &mut OutputBuilderSession<'_, C::Time, ConsolidatingContainerBuilder>>, + ok_output: &mut OutputBuilderSession< + '_, + C::Time, + ConsolidatingContainerBuilder>, + >, + err_output: &mut OutputBuilderSession< + '_, + C::Time, + ConsolidatingContainerBuilder>, + >, ) where - I: IntoIterator, D: Data, - L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff) -> I + 'static, + L: FnMut( + C::Key<'_>, + C::Val<'_>, + C::Time, + C::Diff, + &mut Session, + &mut Session, + ) -> usize + + 'static, { use differential_dataflow::consolidation::consolidate; // Attempt to make progress on this batch. let mut work: usize = 0; - let mut session = output.session_with_builder(&self.capability); + let mut ok_session = ok_output.session_with_builder(&self.ok_capability); + let mut err_session = err_output.session_with_builder(&self.err_capability); let mut buffer = Vec::new(); if let Some(key) = key { let key = C::KeyContainer::reborrow(*key); @@ -978,10 +1102,7 @@ where }); consolidate(&mut buffer); for (time, diff) in buffer.drain(..) { - for datum in logic(key, val, time, diff) { - session.give(datum); - work += 1; - } + work += logic(key, val, time, diff, &mut ok_session, &mut err_session); } self.cursor.step_val(&self.batch); if work >= *fuel { @@ -998,10 +1119,7 @@ where }); consolidate(&mut buffer); for (time, diff) in buffer.drain(..) { - for datum in logic(key, val, time, diff) { - session.give(datum); - work += 1; - } + work += logic(key, val, time, diff, &mut ok_session, &mut err_session); } self.cursor.step_val(&self.batch); if work >= *fuel { diff --git a/src/compute/src/render/reduce.rs b/src/compute/src/render/reduce.rs index d4f34d2951846..d41e58f889c37 100644 --- a/src/compute/src/render/reduce.rs +++ b/src/compute/src/render/reduce.rs @@ -18,7 +18,6 @@ use columnation::{Columnation, CopyRegion}; use dec::OrderedDecimal; use differential_dataflow::Diff as _; use differential_dataflow::collection::AsCollection; -use differential_dataflow::consolidation::ConsolidatingContainerBuilder; use differential_dataflow::difference::{IsZero, Multiply, Semigroup}; use differential_dataflow::hashable::Hashable; use differential_dataflow::operators::arrange::{Arranged, TraceAgent}; @@ -103,10 +102,10 @@ impl<'scope, T: RenderTimestamp> Context<'scope, T> { let max_demand = demand.iter().max().map(|x| *x + 1).unwrap_or(0); let skips = mz_compute_types::plan::reduce::convert_indexes_to_skips(demand); - let (key_val_input, err_input) = input.enter_region(inner).flat_map( + let (key_val_input, err) = input.enter_region(inner).flat_map( input_key.map(|k| (k, None)), max_demand, - move |row_datums, time, diff| { + move |row_datums, time, diff, ok_session, err_session| { let mut row_builder = SharedRow::get(); let temp_storage = RowArena::new(); @@ -122,7 +121,8 @@ impl<'scope, T: RenderTimestamp> Context<'scope, T> { key_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder); let key = match key { Err(e) => { - return Some((Err(e.into()), time.clone(), diff.clone())); + err_session.give((e.into(), time, diff)); + return 1; } Ok(Some(key)) => key.clone(), Ok(None) => panic!("Row expected as no predicate was used"), @@ -135,27 +135,27 @@ impl<'scope, T: RenderTimestamp> Context<'scope, T> { val_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder); let val = match val { Err(e) => { - return Some((Err(e.into()), time.clone(), diff.clone())); + err_session.give((e.into(), time, diff)); + return 1; } Ok(Some(val)) => val.clone(), Ok(None) => panic!("Row expected as no predicate was used"), }; - Some((Ok((key, val)), time.clone(), diff.clone())) + ok_session.give(((key, val), time, diff)); + 1 }, ); - // Demux out the potential errors from key and value selector evaluation. - type CB = ConsolidatingContainerBuilder; - let (ok, mut err) = key_val_input - .as_collection() - .flat_map_fallible::, CB<_>, _, _, _, _>("OkErrDemux", Some); - - err = err.concat(err_input); - // Render the reduce plan - self.render_reduce_plan(reduce_plan, ok, err, key_arity, mfp_after) - .leave_region(self.scope) + self.render_reduce_plan( + reduce_plan, + key_val_input.as_collection(), + err, + key_arity, + mfp_after, + ) + .leave_region(self.scope) }) } From c16e5371d4717bb8aa12547f2ef14cfc6b649182 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 12 May 2026 12:01:35 +0200 Subject: [PATCH 2/8] compute: add Ok-only flat_map variant Introduce `CollectionBundle::flat_map_core_ok` and `ArrangementFlavor::flat_map_ok`, single-output siblings of the fallible versions for callers that statically never emit `DataflowErrorSer` records. Switch `as_specific_collection`'s fueled path to use it so we no longer build a second output port, retain a per-batch err capability, or wire an empty err stream into the downstream concat. Cursor walking is factored into a free `walk_cursor` helper so the fallible and Ok-only `do_work` methods share the same loop. --- src/compute/src/render/context.rs | 296 +++++++++++++++++++++++++----- 1 file changed, 248 insertions(+), 48 deletions(-) diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index ec6b5f93adf32..7f0ea495578a8 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -330,6 +330,54 @@ impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { } } } + + /// Ok-only variant of [`Self::flat_map`]. The `logic` callback receives a single output + /// session and cannot produce errors. The returned err collection comes solely from the + /// arrangement; no extra operator is built to carry an empty MFP-error stream. + pub fn flat_map_ok( + &self, + key: Option<&Row>, + max_demand: usize, + mut logic: L, + ) -> ( + StreamVec<'scope, T, (D, T, Diff)>, + VecCollection<'scope, T, DataflowErrorSer, Diff>, + ) + where + D: Data, + L: for<'a, 'b> FnMut( + &'a mut DatumVecBorrow<'b>, + T, + Diff, + &mut Session, + ) -> usize + + 'static, + { + let refuel = 1000000; + + let mut datums = DatumVec::new(); + let logic = + move |k: DatumSeq, v: DatumSeq, t, d, ok_session: &mut Session| { + let mut datums_borrow = datums.borrow(); + datums_borrow.extend(k.to_datum_iter().take(max_demand)); + let max_demand = max_demand.saturating_sub(datums_borrow.len()); + datums_borrow.extend(v.to_datum_iter().take(max_demand)); + logic(&mut datums_borrow, t, d, ok_session) + }; + + match &self { + ArrangementFlavor::Local(oks, errs) => { + let oks = CollectionBundle::::flat_map_core_ok(oks.clone(), key, logic, refuel); + let errs = errs.clone().as_collection(|k, &()| k.clone()); + (oks, errs) + } + ArrangementFlavor::Trace(_, oks, errs) => { + let oks = CollectionBundle::::flat_map_core_ok(oks.clone(), key, logic, refuel); + let errs = errs.clone().as_collection(|k, &()| k.clone()); + (oks, errs) + } + } + } } impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { /// The scope containing the collection bundle. @@ -509,14 +557,11 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { }); if ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config_set) { // Decode all columns, pass max_demand as usize::MAX. - let (ok, err) = arranged.flat_map( - None, - usize::MAX, - |borrow, t, r, ok_session, _err_session| { + let (ok, err) = + arranged.flat_map_ok(None, usize::MAX, |borrow, t, r, ok_session| { ok_session.give((SharedRow::pack(borrow.iter()), t, r)); 1 - }, - ); + }); (ok.as_collection(), err) } else { #[allow(deprecated)] @@ -724,6 +769,90 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { (ok_stream, err_stream) } + /// Ok-only variant of [`Self::flat_map_core`]. The `logic` callback writes results into a + /// single output session and returns the amount of work performed. Use this when the caller + /// statically knows it will never produce `DataflowErrorSer` records, to avoid building a + /// second output port and the empty err stream that would follow it. + fn flat_map_core_ok( + trace: Arranged<'scope, Tr>, + key: Option<&::Owned>, + mut logic: L, + refuel: usize, + ) -> StreamVec<'scope, T, (D, T, Diff)> + where + Tr: for<'a> TraceReader< + Key<'a>: ToDatumIter, + Val<'a>: ToDatumIter, + Time = T, + Diff = mz_repr::Diff, + > + Clone + + 'static, + ::Owned: PartialEq, + D: Data, + L: FnMut( + Tr::Key<'_>, + Tr::Val<'_>, + T, + mz_repr::Diff, + &mut Session, + ) -> usize + + 'static, + { + let scope = trace.stream.scope(); + + let mut key_con = Tr::KeyContainer::with_capacity(1); + if let Some(key) = &key { + key_con.push_own(key); + } + let mode = if key.is_some() { "index" } else { "scan" }; + let name = format!("ArrangementFlatMapOk({})", mode); + + let mut builder = OperatorBuilder::new(name, scope.clone()); + let (ok_output, ok_stream) = builder.new_output(); + let mut ok_output = + OutputBuilder::<_, ConsolidatingContainerBuilder>>::from(ok_output); + let mut input = builder.new_input(trace.stream.clone(), Pipeline); + let operator_info = builder.operator_info(); + + builder.build(move |_capabilities| { + let activator = scope.activator_for(operator_info.address); + let mut todo = std::collections::VecDeque::new(); + move |_frontiers| { + let key = key_con.get(0); + let mut ok_output = ok_output.activate(); + + input.for_each(|time, data| { + let cap = time.retain(0); + for batch in data.iter() { + todo.push_back(PendingWorkOk::new( + cap.clone(), + batch.cursor(), + batch.clone(), + )); + } + }); + + let mut fuel = refuel; + while !todo.is_empty() && fuel > 0 { + todo.front_mut().unwrap().do_work( + key.as_ref(), + &mut logic, + &mut fuel, + &mut ok_output, + ); + if fuel > 0 { + todo.pop_front(); + } + } + if !todo.is_empty() { + activator.activate(); + } + } + }); + + ok_stream + } + /// Look up an arrangement by the expressions that form the key. /// /// The result may be `None` if no such arrangement exists, or it may be one of many @@ -1082,54 +1211,125 @@ where ) -> usize + 'static, { - use differential_dataflow::consolidation::consolidate; - - // Attempt to make progress on this batch. - let mut work: usize = 0; let mut ok_session = ok_output.session_with_builder(&self.ok_capability); let mut err_session = err_output.session_with_builder(&self.err_capability); - let mut buffer = Vec::new(); - if let Some(key) = key { - let key = C::KeyContainer::reborrow(*key); - if self.cursor.get_key(&self.batch).map(|k| k == key) != Some(true) { - self.cursor.seek_key(&self.batch, key); - } - if self.cursor.get_key(&self.batch).map(|k| k == key) == Some(true) { - let key = self.cursor.key(&self.batch); - while let Some(val) = self.cursor.get_val(&self.batch) { - self.cursor.map_times(&self.batch, |time, diff| { - buffer.push((C::owned_time(time), C::owned_diff(diff))); - }); - consolidate(&mut buffer); - for (time, diff) in buffer.drain(..) { - work += logic(key, val, time, diff, &mut ok_session, &mut err_session); - } - self.cursor.step_val(&self.batch); - if work >= *fuel { - *fuel = 0; - return; - } + walk_cursor(&mut self.cursor, &self.batch, key, fuel, |k, v, t, d| { + logic(k, v, t, d, &mut ok_session, &mut err_session) + }); + } +} + +/// Pending work for the Ok-only variant of `flat_map_core`. Holds a single capability since +/// the operator has only one output port. +struct PendingWorkOk +where + C: Cursor, +{ + capability: Capability, + cursor: C, + batch: C::Storage, +} + +impl PendingWorkOk +where + C: Cursor>, +{ + fn new(capability: Capability, cursor: C, batch: C::Storage) -> Self { + Self { + capability, + cursor, + batch, + } + } + + /// Perform roughly `fuel` work through the cursor, applying `logic` and sending results to + /// the single output session. + fn do_work( + &mut self, + key: Option<&C::Key<'_>>, + logic: &mut L, + fuel: &mut usize, + ok_output: &mut OutputBuilderSession< + '_, + C::Time, + ConsolidatingContainerBuilder>, + >, + ) where + D: Data, + L: FnMut( + C::Key<'_>, + C::Val<'_>, + C::Time, + C::Diff, + &mut Session, + ) -> usize + + 'static, + { + let mut ok_session = ok_output.session_with_builder(&self.capability); + walk_cursor(&mut self.cursor, &self.batch, key, fuel, |k, v, t, d| { + logic(k, v, t, d, &mut ok_session) + }); + } +} + +/// Walk a cursor, calling `emit` for each consolidated `(key, val, time, diff)` tuple. If `key` +/// is set, the cursor is seeked to it and only values for that key are produced. The cursor +/// stops as soon as `emit` has reported at least `*fuel` units of work, leaving the cursor in +/// place so that work can resume on a later call. +fn walk_cursor( + cursor: &mut C, + batch: &C::Storage, + key: Option<&C::Key<'_>>, + fuel: &mut usize, + mut emit: F, +) where + C: Cursor>, + F: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff) -> usize, +{ + use differential_dataflow::consolidation::consolidate; + + let mut work: usize = 0; + let mut buffer = Vec::new(); + if let Some(key) = key { + let key = C::KeyContainer::reborrow(*key); + if cursor.get_key(batch).map(|k| k == key) != Some(true) { + cursor.seek_key(batch, key); + } + if cursor.get_key(batch).map(|k| k == key) == Some(true) { + let key = cursor.key(batch); + while let Some(val) = cursor.get_val(batch) { + cursor.map_times(batch, |time, diff| { + buffer.push((C::owned_time(time), C::owned_diff(diff))); + }); + consolidate(&mut buffer); + for (time, diff) in buffer.drain(..) { + work += emit(key, val, time, diff); + } + cursor.step_val(batch); + if work >= *fuel { + *fuel = 0; + return; } } - } else { - while let Some(key) = self.cursor.get_key(&self.batch) { - while let Some(val) = self.cursor.get_val(&self.batch) { - self.cursor.map_times(&self.batch, |time, diff| { - buffer.push((C::owned_time(time), C::owned_diff(diff))); - }); - consolidate(&mut buffer); - for (time, diff) in buffer.drain(..) { - work += logic(key, val, time, diff, &mut ok_session, &mut err_session); - } - self.cursor.step_val(&self.batch); - if work >= *fuel { - *fuel = 0; - return; - } + } + } else { + while let Some(key) = cursor.get_key(batch) { + while let Some(val) = cursor.get_val(batch) { + cursor.map_times(batch, |time, diff| { + buffer.push((C::owned_time(time), C::owned_diff(diff))); + }); + consolidate(&mut buffer); + for (time, diff) in buffer.drain(..) { + work += emit(key, val, time, diff); + } + cursor.step_val(batch); + if work >= *fuel { + *fuel = 0; + return; } - self.cursor.step_key(&self.batch); } + cursor.step_key(batch); } - *fuel -= work; } + *fuel -= work; } From b661c2a950e7855dd45bf0af6bba676f08817dd4 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 12 May 2026 12:10:53 +0200 Subject: [PATCH 3/8] compute: rename flat_map_core to flat_map_core_fallible Visually flag fallible vs Ok-only paths at call sites so future readers notice which one emits errors. Pure rename, no behavior change. --- src/compute/src/render/context.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index 7f0ea495578a8..c1a38c3def32e 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -316,14 +316,14 @@ impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { match &self { ArrangementFlavor::Local(oks, errs) => { let (oks, mfp_errs) = - CollectionBundle::::flat_map_core(oks.clone(), key, logic, refuel); + CollectionBundle::::flat_map_core_fallible(oks.clone(), key, logic, refuel); let errs = errs.clone().as_collection(|k, &()| k.clone()); let errs = errs.concat(mfp_errs.as_collection()); (oks, errs) } ArrangementFlavor::Trace(_, oks, errs) => { let (oks, mfp_errs) = - CollectionBundle::::flat_map_core(oks.clone(), key, logic, refuel); + CollectionBundle::::flat_map_core_fallible(oks.clone(), key, logic, refuel); let errs = errs.clone().as_collection(|k, &()| k.clone()); let errs = errs.concat(mfp_errs.as_collection()); (oks, errs) @@ -669,7 +669,7 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { /// where key and value are potentially specialized, but convertible into rows. The `logic` /// callback writes ok results into the first session and errors into the second, returning /// the amount of work performed so the operator can fuel-limit its activations. - fn flat_map_core( + fn flat_map_core_fallible( trace: Arranged<'scope, Tr>, key: Option<&::Owned>, mut logic: L, @@ -769,7 +769,7 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { (ok_stream, err_stream) } - /// Ok-only variant of [`Self::flat_map_core`]. The `logic` callback writes results into a + /// Ok-only variant of [`Self::flat_map_core_fallible`]. The `logic` callback writes results into a /// single output session and returns the amount of work performed. Use this when the caller /// statically knows it will never produce `DataflowErrorSer` records, to avoid building a /// second output port and the empty err stream that would follow it. @@ -1219,7 +1219,7 @@ where } } -/// Pending work for the Ok-only variant of `flat_map_core`. Holds a single capability since +/// Pending work for the Ok-only variant of `flat_map_core_fallible`. Holds a single capability since /// the operator has only one output port. struct PendingWorkOk where From 828c834d3711dab325ff8fdb5b25ca91f206079b Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 12 May 2026 14:29:31 +0200 Subject: [PATCH 4/8] compute: clarify flat_map fuel doc State explicitly that the closure returns records produced (not input tuples consumed), and note the run-to-completion behavior when emit returns 0 for every input. --- src/compute/src/render/context.rs | 35 ++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index c1a38c3def32e..417b47473a3c1 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -263,8 +263,10 @@ impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { /// /// The `logic` callback receives a borrow of the decoded datum vector, a timestamp, a /// diff, and two output sessions: one for `ok` updates of type `(D, T, Diff)` and one for - /// MFP-style `DataflowErrorSer` updates. It must return the amount of work performed so - /// the underlying operator can budget activations. + /// MFP-style `DataflowErrorSer` updates. It must return the number of records *produced* + /// (written to either session), not the number of input tuples consumed. The underlying + /// operator uses this to bound output per activation while running input-to-completion + /// for inputs that produce no output (e.g. filtered-out rows). /// /// If `key` is set, this is a promise that `logic` will produce no results on /// records for which the key does not evaluate to the value. This is used to @@ -332,8 +334,9 @@ impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { } /// Ok-only variant of [`Self::flat_map`]. The `logic` callback receives a single output - /// session and cannot produce errors. The returned err collection comes solely from the - /// arrangement; no extra operator is built to carry an empty MFP-error stream. + /// session, cannot produce errors, and returns the number of records produced (see + /// [`Self::flat_map`] for fuel semantics). The returned err collection comes solely from + /// the arrangement; no extra operator is built to carry an empty MFP-error stream. pub fn flat_map_ok( &self, key: Option<&Row>, @@ -668,7 +671,9 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { /// The function presents the contents of the trace as `(key, value, time, delta)` tuples, /// where key and value are potentially specialized, but convertible into rows. The `logic` /// callback writes ok results into the first session and errors into the second, returning - /// the amount of work performed so the operator can fuel-limit its activations. + /// the number of records produced (not input tuples consumed). The operator uses the sum + /// as fuel to bound output per activation; input tuples that produce no output are + /// walked through to completion within a batch. fn flat_map_core_fallible( trace: Arranged<'scope, Tr>, key: Option<&::Owned>, @@ -769,10 +774,11 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { (ok_stream, err_stream) } - /// Ok-only variant of [`Self::flat_map_core_fallible`]. The `logic` callback writes results into a - /// single output session and returns the amount of work performed. Use this when the caller - /// statically knows it will never produce `DataflowErrorSer` records, to avoid building a - /// second output port and the empty err stream that would follow it. + /// Ok-only variant of [`Self::flat_map_core_fallible`]. The `logic` callback writes results + /// into a single output session and returns the number of records produced (see the + /// fallible variant for fuel semantics). Use this when the caller statically knows it + /// will never produce `DataflowErrorSer` records, to avoid building a second output port + /// and the empty err stream that would follow it. fn flat_map_core_ok( trace: Arranged<'scope, Tr>, key: Option<&::Owned>, @@ -1273,9 +1279,14 @@ where } /// Walk a cursor, calling `emit` for each consolidated `(key, val, time, diff)` tuple. If `key` -/// is set, the cursor is seeked to it and only values for that key are produced. The cursor -/// stops as soon as `emit` has reported at least `*fuel` units of work, leaving the cursor in -/// place so that work can resume on a later call. +/// is set, the cursor is seeked to it and only values for that key are produced. +/// +/// `emit` returns the number of records it produced for the given input tuple (0 if it +/// filtered the tuple out). The cursor stops as soon as the accumulated emit count reaches +/// `*fuel`, leaving the cursor in place so work can resume on a later call. Because the +/// metric is output-produced and not input-consumed, input tuples that produce no output are +/// walked through to completion within a single call; this matches our run-to-completion +/// preference for selective MFPs. fn walk_cursor( cursor: &mut C, batch: &C::Storage, From 17b1c2259ab553e6eb53cacdc37daf7a24f710a1 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 12 May 2026 14:37:59 +0200 Subject: [PATCH 5/8] compute: explain why flat_map fuel counts output Add a Fuel section to the public flat_map doc explaining that the output-produced metric serves two pressures: draining inputs so the operator releases its Batch clone (a filter(false) walks to end-of-batch in one activation), and throttling outputs so a map producing wide rows can't flood the next operator. The refuel constant is documented as a pragmatic compromise; no universal value exists across MFP shapes. Trim the inner helpers' docs to reference the public one. --- src/compute/src/render/context.rs | 48 ++++++++++++++++++++----------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index 417b47473a3c1..946ca86fabfb6 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -264,9 +264,25 @@ impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { /// The `logic` callback receives a borrow of the decoded datum vector, a timestamp, a /// diff, and two output sessions: one for `ok` updates of type `(D, T, Diff)` and one for /// MFP-style `DataflowErrorSer` updates. It must return the number of records *produced* - /// (written to either session), not the number of input tuples consumed. The underlying - /// operator uses this to bound output per activation while running input-to-completion - /// for inputs that produce no output (e.g. filtered-out rows). + /// (written to either session), not the number of input tuples consumed. + /// + /// # Fuel + /// + /// The operator accumulates the returned counts as fuel and yields when the total reaches + /// an internal refuel threshold. The metric is output-produced (not input-consumed) on + /// purpose: it regulates two asymmetric pressures. + /// + /// * **Drain inputs.** The operator holds a clone of each pending `Batch` until its work + /// item pops; we want to release that memory back to the upstream arrangement as soon + /// as possible. A `filter(false)` MFP returns 0 for every tuple, so fuel never trips + /// and the cursor runs to end-of-batch in one activation. + /// * **Throttle outputs.** A `map("1KB-string")` MFP produces large records per input; + /// stopping when emit count hits the threshold caps how much data a single activation + /// dumps on the next operator. + /// + /// The refuel constant is a pragmatic compromise: large enough to be a non-event in + /// steady-state, small enough that one activation can't flood downstream. There is no + /// universal value across MFP shapes. /// /// If `key` is set, this is a promise that `logic` will produce no results on /// records for which the key does not evaluate to the value. This is used to @@ -295,9 +311,9 @@ impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { ) -> usize + 'static, { - // Set a number of tuples after which the operator should yield. - // This allows us to remain responsive even when enumerating a substantial - // arrangement, as well as provides time to accumulate our produced output. + // Number of output records this activation may produce before yielding. See the + // `Fuel` section in the doc comment for the rationale behind both the metric and the + // magnitude. let refuel = 1000000; let mut datums = DatumVec::new(); @@ -356,6 +372,7 @@ impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { ) -> usize + 'static, { + // See [`Self::flat_map`] for the fuel rationale. let refuel = 1000000; let mut datums = DatumVec::new(); @@ -671,9 +688,8 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { /// The function presents the contents of the trace as `(key, value, time, delta)` tuples, /// where key and value are potentially specialized, but convertible into rows. The `logic` /// callback writes ok results into the first session and errors into the second, returning - /// the number of records produced (not input tuples consumed). The operator uses the sum - /// as fuel to bound output per activation; input tuples that produce no output are - /// walked through to completion within a batch. + /// the number of records produced. See [`ArrangementFlavor::flat_map`] for the fuel + /// rationale. fn flat_map_core_fallible( trace: Arranged<'scope, Tr>, key: Option<&::Owned>, @@ -1278,15 +1294,13 @@ where } } -/// Walk a cursor, calling `emit` for each consolidated `(key, val, time, diff)` tuple. If `key` -/// is set, the cursor is seeked to it and only values for that key are produced. +/// Walk a cursor, calling `emit` for each consolidated `(key, val, time, diff)` tuple. If +/// `key` is set, the cursor is seeked to it and only values for that key are produced. /// -/// `emit` returns the number of records it produced for the given input tuple (0 if it -/// filtered the tuple out). The cursor stops as soon as the accumulated emit count reaches -/// `*fuel`, leaving the cursor in place so work can resume on a later call. Because the -/// metric is output-produced and not input-consumed, input tuples that produce no output are -/// walked through to completion within a single call; this matches our run-to-completion -/// preference for selective MFPs. +/// `emit` returns the number of records it produced for the given input tuple. The cursor +/// stops as soon as the accumulated emit count reaches `*fuel`, leaving the cursor in place +/// so work can resume on a later call. See [`ArrangementFlavor::flat_map`] for why fuel +/// counts output rather than input. fn walk_cursor( cursor: &mut C, batch: &C::Storage, From a0e1362600646fbf38c0c7a8fd662f165f4547b7 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 12 May 2026 15:12:29 +0200 Subject: [PATCH 6/8] compute: parameterize ok output container builder Let the caller of flat_map/flat_map_ok/as_collection_core/reduce select the ok-side container builder. Each call site now states explicitly whether it wants consolidation (`ConsolidatingContainerBuilder`) or plain accumulation (`CapacityContainerBuilder`); the operator no longer hardcodes a choice deep in the stack. * `Session<'a, 'b, T, CB>` is generic over the builder rather than fixed to consolidating. * `flat_map_core_fallible`, `flat_map_core_ok`, `flat_map`, `flat_map_ok`, and `CollectionBundle::flat_map` carry an `OkCB: ContainerBuilder + PushInto<(D, T, Diff)>` generic and return `Stream<'scope, T, OkCB::Container>`. The err side is fixed to `CapacityContainerBuilder>` via an `ErrCB` alias; this restores the pre-refactor behavior where the removed `map_fallible("OkErr", ...)` demux used `CapacityCB`. * Call-site choices: * `as_specific_collection` -> Capacity (cursor walk is 1:1). * `as_collection_core` -> Consolidating (MFP fan-out can collide). * `reduce` key/val selector -> Consolidating (multiple inputs can map to the same `(key, val)` at the same time). This also opens the door for a columnar ok container in the future: callers can plug in a builder whose `Container` is not `Vec<...>`. --- src/compute/src/render/context.rs | 297 +++++++++++++++--------------- src/compute/src/render/reduce.rs | 89 +++++---- 2 files changed, 195 insertions(+), 191 deletions(-) diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index 946ca86fabfb6..3bac094e1cb6b 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -32,11 +32,13 @@ use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow use mz_storage_types::controller::CollectionMetadata; use mz_timely_util::columnar::builder::ColumnBuilder; use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange}; +use timely::ContainerBuilder; +use timely::container::{CapacityContainerBuilder, PushInto}; use timely::dataflow::channels::pact::{ExchangeCore, Pipeline}; use timely::dataflow::operators::Capability; use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession}; -use timely::dataflow::{Scope, StreamVec}; +use timely::dataflow::{Scope, Stream}; use timely::progress::operate::FrontierInterest; use timely::progress::{Antichain, Timestamp}; @@ -291,23 +293,24 @@ impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { /// The `max_demand` parameter limits the number of columns decoded from the /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to /// decode all columns. - pub fn flat_map( + pub fn flat_map( &self, key: Option<&Row>, max_demand: usize, mut logic: L, ) -> ( - StreamVec<'scope, T, (D, T, Diff)>, + Stream<'scope, T, OkCB::Container>, VecCollection<'scope, T, DataflowErrorSer, Diff>, ) where D: Data, + OkCB: ContainerBuilder + PushInto<(D, T, Diff)>, L: for<'a, 'b> FnMut( &'a mut DatumVecBorrow<'b>, T, Diff, - &mut Session, - &mut Session, + &mut Session, + &mut Session>, ) -> usize + 'static, { @@ -317,31 +320,38 @@ impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { let refuel = 1000000; let mut datums = DatumVec::new(); - let logic = - move |k: DatumSeq, - v: DatumSeq, - t, - d, - ok_session: &mut Session, - err_session: &mut Session| { - let mut datums_borrow = datums.borrow(); - datums_borrow.extend(k.to_datum_iter().take(max_demand)); - let max_demand = max_demand.saturating_sub(datums_borrow.len()); - datums_borrow.extend(v.to_datum_iter().take(max_demand)); - logic(&mut datums_borrow, t, d, ok_session, err_session) - }; + let logic = move |k: DatumSeq, + v: DatumSeq, + t, + d, + ok_session: &mut Session, + err_session: &mut Session>| { + let mut datums_borrow = datums.borrow(); + datums_borrow.extend(k.to_datum_iter().take(max_demand)); + let max_demand = max_demand.saturating_sub(datums_borrow.len()); + datums_borrow.extend(v.to_datum_iter().take(max_demand)); + logic(&mut datums_borrow, t, d, ok_session, err_session) + }; match &self { ArrangementFlavor::Local(oks, errs) => { - let (oks, mfp_errs) = - CollectionBundle::::flat_map_core_fallible(oks.clone(), key, logic, refuel); + let (oks, mfp_errs) = CollectionBundle::::flat_map_core_fallible::<_, _, OkCB, _>( + oks.clone(), + key, + logic, + refuel, + ); let errs = errs.clone().as_collection(|k, &()| k.clone()); let errs = errs.concat(mfp_errs.as_collection()); (oks, errs) } ArrangementFlavor::Trace(_, oks, errs) => { - let (oks, mfp_errs) = - CollectionBundle::::flat_map_core_fallible(oks.clone(), key, logic, refuel); + let (oks, mfp_errs) = CollectionBundle::::flat_map_core_fallible::<_, _, OkCB, _>( + oks.clone(), + key, + logic, + refuel, + ); let errs = errs.clone().as_collection(|k, &()| k.clone()); let errs = errs.concat(mfp_errs.as_collection()); (oks, errs) @@ -353,46 +363,51 @@ impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { /// session, cannot produce errors, and returns the number of records produced (see /// [`Self::flat_map`] for fuel semantics). The returned err collection comes solely from /// the arrangement; no extra operator is built to carry an empty MFP-error stream. - pub fn flat_map_ok( + pub fn flat_map_ok( &self, key: Option<&Row>, max_demand: usize, mut logic: L, ) -> ( - StreamVec<'scope, T, (D, T, Diff)>, + Stream<'scope, T, OkCB::Container>, VecCollection<'scope, T, DataflowErrorSer, Diff>, ) where D: Data, - L: for<'a, 'b> FnMut( - &'a mut DatumVecBorrow<'b>, - T, - Diff, - &mut Session, - ) -> usize + OkCB: ContainerBuilder + PushInto<(D, T, Diff)>, + L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, T, Diff, &mut Session) -> usize + 'static, { // See [`Self::flat_map`] for the fuel rationale. let refuel = 1000000; let mut datums = DatumVec::new(); - let logic = - move |k: DatumSeq, v: DatumSeq, t, d, ok_session: &mut Session| { - let mut datums_borrow = datums.borrow(); - datums_borrow.extend(k.to_datum_iter().take(max_demand)); - let max_demand = max_demand.saturating_sub(datums_borrow.len()); - datums_borrow.extend(v.to_datum_iter().take(max_demand)); - logic(&mut datums_borrow, t, d, ok_session) - }; + let logic = move |k: DatumSeq, v: DatumSeq, t, d, ok_session: &mut Session| { + let mut datums_borrow = datums.borrow(); + datums_borrow.extend(k.to_datum_iter().take(max_demand)); + let max_demand = max_demand.saturating_sub(datums_borrow.len()); + datums_borrow.extend(v.to_datum_iter().take(max_demand)); + logic(&mut datums_borrow, t, d, ok_session) + }; match &self { ArrangementFlavor::Local(oks, errs) => { - let oks = CollectionBundle::::flat_map_core_ok(oks.clone(), key, logic, refuel); + let oks = CollectionBundle::::flat_map_core_ok::<_, _, OkCB, _>( + oks.clone(), + key, + logic, + refuel, + ); let errs = errs.clone().as_collection(|k, &()| k.clone()); (oks, errs) } ArrangementFlavor::Trace(_, oks, errs) => { - let oks = CollectionBundle::::flat_map_core_ok(oks.clone(), key, logic, refuel); + let oks = CollectionBundle::::flat_map_core_ok::<_, _, OkCB, _>( + oks.clone(), + key, + logic, + refuel, + ); let errs = errs.clone().as_collection(|k, &()| k.clone()); (oks, errs) } @@ -576,12 +591,18 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { panic!("The collection arranged by {:?} doesn't exist.", key) }); if ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config_set) { - // Decode all columns, pass max_demand as usize::MAX. - let (ok, err) = - arranged.flat_map_ok(None, usize::MAX, |borrow, t, r, ok_session| { - ok_session.give((SharedRow::pack(borrow.iter()), t, r)); - 1 - }); + // Decode all columns, pass max_demand as usize::MAX. Output is 1:1 from the + // cursor (no duplicates), so a non-consolidating container builder is the + // right choice. + let (ok, err) = arranged + .flat_map_ok::<_, CapacityContainerBuilder>, _>( + None, + usize::MAX, + |borrow, t, r, ok_session| { + ok_session.give((SharedRow::pack(borrow.iter()), t, r)); + 1 + }, + ); (ok.as_collection(), err) } else { #[allow(deprecated)] @@ -606,23 +627,24 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { /// The `max_demand` parameter limits the number of columns decoded from the /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to /// decode all columns. - pub fn flat_map( + pub fn flat_map( &self, key_val: Option<(Vec, Option)>, max_demand: usize, mut logic: L, ) -> ( - StreamVec<'scope, T, (D, T, Diff)>, + Stream<'scope, T, OkCB::Container>, VecCollection<'scope, T, DataflowErrorSer, Diff>, ) where D: Data, + OkCB: ContainerBuilder + PushInto<(D, T, Diff)>, L: for<'a> FnMut( &'a mut DatumVecBorrow<'_>, T, Diff, - &mut Session, - &mut Session, + &mut Session, + &mut Session>, ) -> usize + 'static, { @@ -632,7 +654,7 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { if let Some((key, val)) = key_val { self.arrangement(&key) .expect("Should have ensured during planning that this arrangement exists.") - .flat_map(val.as_ref(), max_demand, logic) + .flat_map::<_, OkCB, _>(val.as_ref(), max_demand, logic) } else { let (oks, errs) = self .collection @@ -641,15 +663,9 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { let scope = oks.inner.scope(); let mut builder = OperatorBuilder::new("CollectionFlatMap".to_string(), scope); let (ok_output, ok_stream) = builder.new_output(); - let mut ok_output = - OutputBuilder::<_, ConsolidatingContainerBuilder>>::from( - ok_output, - ); + let mut ok_output = OutputBuilder::<_, OkCB>::from(ok_output); let (err_output, err_stream) = builder.new_output(); - let mut err_output = OutputBuilder::< - _, - ConsolidatingContainerBuilder>, - >::from(err_output); + let mut err_output = OutputBuilder::<_, ErrCB>::from(err_output); let mut input = builder.new_input(oks.inner, Pipeline); builder.build(move |_capabilities| { let mut datums = DatumVec::new(); @@ -690,14 +706,14 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { /// callback writes ok results into the first session and errors into the second, returning /// the number of records produced. See [`ArrangementFlavor::flat_map`] for the fuel /// rationale. - fn flat_map_core_fallible( + fn flat_map_core_fallible( trace: Arranged<'scope, Tr>, key: Option<&::Owned>, mut logic: L, refuel: usize, ) -> ( - StreamVec<'scope, T, (D, T, Diff)>, - StreamVec<'scope, T, (DataflowErrorSer, T, Diff)>, + Stream<'scope, T, OkCB::Container>, + Stream<'scope, T, Vec<(DataflowErrorSer, T, Diff)>>, ) where Tr: for<'a> TraceReader< @@ -709,13 +725,14 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { + 'static, ::Owned: PartialEq, D: Data, + OkCB: ContainerBuilder + PushInto<(D, T, Diff)>, L: FnMut( Tr::Key<'_>, Tr::Val<'_>, T, mz_repr::Diff, - &mut Session, - &mut Session, + &mut Session, + &mut Session>, ) -> usize + 'static, { @@ -730,13 +747,9 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { let mut builder = OperatorBuilder::new(name, scope.clone()); let (ok_output, ok_stream) = builder.new_output(); - let mut ok_output = - OutputBuilder::<_, ConsolidatingContainerBuilder>>::from(ok_output); + let mut ok_output = OutputBuilder::<_, OkCB>::from(ok_output); let (err_output, err_stream) = builder.new_output(); - let mut err_output = OutputBuilder::< - _, - ConsolidatingContainerBuilder>, - >::from(err_output); + let mut err_output = OutputBuilder::<_, ErrCB>::from(err_output); let mut input = builder.new_input(trace.stream.clone(), Pipeline); let operator_info = builder.operator_info(); @@ -795,12 +808,12 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { /// fallible variant for fuel semantics). Use this when the caller statically knows it /// will never produce `DataflowErrorSer` records, to avoid building a second output port /// and the empty err stream that would follow it. - fn flat_map_core_ok( + fn flat_map_core_ok( trace: Arranged<'scope, Tr>, key: Option<&::Owned>, mut logic: L, refuel: usize, - ) -> StreamVec<'scope, T, (D, T, Diff)> + ) -> Stream<'scope, T, OkCB::Container> where Tr: for<'a> TraceReader< Key<'a>: ToDatumIter, @@ -811,13 +824,8 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { + 'static, ::Owned: PartialEq, D: Data, - L: FnMut( - Tr::Key<'_>, - Tr::Val<'_>, - T, - mz_repr::Diff, - &mut Session, - ) -> usize + OkCB: ContainerBuilder + PushInto<(D, T, Diff)>, + L: FnMut(Tr::Key<'_>, Tr::Val<'_>, T, mz_repr::Diff, &mut Session) -> usize + 'static, { let scope = trace.stream.scope(); @@ -831,8 +839,7 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { let mut builder = OperatorBuilder::new(name, scope.clone()); let (ok_output, ok_stream) = builder.new_output(); - let mut ok_output = - OutputBuilder::<_, ConsolidatingContainerBuilder>>::from(ok_output); + let mut ok_output = OutputBuilder::<_, OkCB>::from(ok_output); let mut input = builder.new_input(trace.stream.clone(), Pipeline); let operator_info = builder.operator_info(); @@ -931,45 +938,46 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { // Wrap in an `Rc` so that lifetimes work out. let until = std::rc::Rc::new(until); - let (stream, errors) = self.flat_map( - key_val, - max_demand, - move |row_datums, time, diff, ok_session, err_session| { - let mut row_builder = SharedRow::get(); - let until = std::rc::Rc::clone(&until); - let temp_storage = RowArena::new(); - let row_iter = row_datums.iter(); - let mut datums_local = datum_vec.borrow(); - datums_local.extend(row_iter); - let event_time = time.event_time(); - let mut work: usize = 0; - for result in mfp_plan.evaluate( - &mut datums_local, - &temp_storage, - event_time, - diff.clone(), - move |time| !until.less_equal(time), - &mut row_builder, - ) { - work += 1; - match result { - Ok((row, event_time, diff)) => { - // Copy the whole time, and re-populate event time. - let mut time: T = time.clone(); - *time.event_time_mut() = event_time; - ok_session.give((row, time, diff)); - } - Err((e, event_time, diff)) => { - // Copy the whole time, and re-populate event time. - let mut time: T = time.clone(); - *time.event_time_mut() = event_time; - err_session.give((e, time, diff)); + let (stream, errors) = self + .flat_map::<_, ConsolidatingContainerBuilder>, _>( + key_val, + max_demand, + move |row_datums, time, diff, ok_session, err_session| { + let mut row_builder = SharedRow::get(); + let until = std::rc::Rc::clone(&until); + let temp_storage = RowArena::new(); + let row_iter = row_datums.iter(); + let mut datums_local = datum_vec.borrow(); + datums_local.extend(row_iter); + let event_time = time.event_time(); + let mut work: usize = 0; + for result in mfp_plan.evaluate( + &mut datums_local, + &temp_storage, + event_time, + diff.clone(), + move |time| !until.less_equal(time), + &mut row_builder, + ) { + work += 1; + match result { + Ok((row, event_time, diff)) => { + // Copy the whole time, and re-populate event time. + let mut time: T = time.clone(); + *time.event_time_mut() = event_time; + ok_session.give((row, time, diff)); + } + Err((e, event_time, diff)) => { + // Copy the whole time, and re-populate event time. + let mut time: T = time.clone(); + *time.event_time_mut() = event_time; + err_session.give((e, time, diff)); + } } } - } - work - }, - ); + work + }, + ); (stream.as_collection(), errors) } @@ -1165,13 +1173,16 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { } } -type Session<'a, 'b, T, D> = timely::dataflow::operators::generic::Session< - 'a, - 'b, - T, - ConsolidatingContainerBuilder>, - Capability, ->; +/// Type alias for a timely output `Session` whose capability is a `Capability`. The container +/// builder `CB` is left to the caller; sessions can therefore drive consolidating, capacity, or +/// (in the future) columnar output builders without changing call sites. +type Session<'a, 'b, T, CB> = + timely::dataflow::operators::generic::Session<'a, 'b, T, CB, Capability>; + +/// Container builder used for the err output of every flat_map variant. Errors are low volume +/// and we don't expect within-batch duplicates, so [`CapacityContainerBuilder`] is the right +/// default; this matches the pre-refactor behavior of the now-removed `map_fallible` demux. +type ErrCB = CapacityContainerBuilder>; struct PendingWork where @@ -1206,30 +1217,23 @@ where } /// Perform roughly `fuel` work through the cursor, applying `logic` and sending results to /// the two output sessions. - fn do_work( + fn do_work( &mut self, key: Option<&C::Key<'_>>, logic: &mut L, fuel: &mut usize, - ok_output: &mut OutputBuilderSession< - '_, - C::Time, - ConsolidatingContainerBuilder>, - >, - err_output: &mut OutputBuilderSession< - '_, - C::Time, - ConsolidatingContainerBuilder>, - >, + ok_output: &mut OutputBuilderSession<'_, C::Time, OkCB>, + err_output: &mut OutputBuilderSession<'_, C::Time, ErrCB>, ) where D: Data, + OkCB: ContainerBuilder + PushInto<(D, C::Time, C::Diff)>, L: FnMut( C::Key<'_>, C::Val<'_>, C::Time, C::Diff, - &mut Session, - &mut Session, + &mut Session, + &mut Session>, ) -> usize + 'static, { @@ -1266,25 +1270,16 @@ where /// Perform roughly `fuel` work through the cursor, applying `logic` and sending results to /// the single output session. - fn do_work( + fn do_work( &mut self, key: Option<&C::Key<'_>>, logic: &mut L, fuel: &mut usize, - ok_output: &mut OutputBuilderSession< - '_, - C::Time, - ConsolidatingContainerBuilder>, - >, + ok_output: &mut OutputBuilderSession<'_, C::Time, OkCB>, ) where D: Data, - L: FnMut( - C::Key<'_>, - C::Val<'_>, - C::Time, - C::Diff, - &mut Session, - ) -> usize + OkCB: ContainerBuilder + PushInto<(D, C::Time, C::Diff)>, + L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff, &mut Session) -> usize + 'static, { let mut ok_session = ok_output.session_with_builder(&self.capability); diff --git a/src/compute/src/render/reduce.rs b/src/compute/src/render/reduce.rs index d41e58f889c37..e165bd567c5e2 100644 --- a/src/compute/src/render/reduce.rs +++ b/src/compute/src/render/reduce.rs @@ -18,6 +18,7 @@ use columnation::{Columnation, CopyRegion}; use dec::OrderedDecimal; use differential_dataflow::Diff as _; use differential_dataflow::collection::AsCollection; +use differential_dataflow::consolidation::ConsolidatingContainerBuilder; use differential_dataflow::difference::{IsZero, Multiply, Semigroup}; use differential_dataflow::hashable::Hashable; use differential_dataflow::operators::arrange::{Arranged, TraceAgent}; @@ -102,50 +103,58 @@ impl<'scope, T: RenderTimestamp> Context<'scope, T> { let max_demand = demand.iter().max().map(|x| *x + 1).unwrap_or(0); let skips = mz_compute_types::plan::reduce::convert_indexes_to_skips(demand); - let (key_val_input, err) = input.enter_region(inner).flat_map( - input_key.map(|k| (k, None)), - max_demand, - move |row_datums, time, diff, ok_session, err_session| { - let mut row_builder = SharedRow::get(); - let temp_storage = RowArena::new(); - - let mut row_iter = row_datums.drain(..); - let mut datums_local = datums.borrow(); - // Unpack only the demanded columns. - for skip in skips.iter() { - datums_local.push(row_iter.nth(*skip).unwrap()); - } + let (key_val_input, err) = input + .enter_region(inner) + .flat_map::<_, ConsolidatingContainerBuilder>, _>( + input_key.map(|k| (k, None)), + max_demand, + move |row_datums, time, diff, ok_session, err_session| { + let mut row_builder = SharedRow::get(); + let temp_storage = RowArena::new(); - // Evaluate the key expressions. - let key = - key_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder); - let key = match key { - Err(e) => { - err_session.give((e.into(), time, diff)); - return 1; + let mut row_iter = row_datums.drain(..); + let mut datums_local = datums.borrow(); + // Unpack only the demanded columns. + for skip in skips.iter() { + datums_local.push(row_iter.nth(*skip).unwrap()); } - Ok(Some(key)) => key.clone(), - Ok(None) => panic!("Row expected as no predicate was used"), - }; - // Evaluate the value expressions. - // The prior evaluation may have left additional columns we should delete. - datums_local.truncate(skips.len()); - let val = - val_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder); - let val = match val { - Err(e) => { - err_session.give((e.into(), time, diff)); - return 1; - } - Ok(Some(val)) => val.clone(), - Ok(None) => panic!("Row expected as no predicate was used"), - }; + // Evaluate the key expressions. + let key = key_plan.evaluate_into( + &mut datums_local, + &temp_storage, + &mut row_builder, + ); + let key = match key { + Err(e) => { + err_session.give((e.into(), time, diff)); + return 1; + } + Ok(Some(key)) => key.clone(), + Ok(None) => panic!("Row expected as no predicate was used"), + }; + + // Evaluate the value expressions. + // The prior evaluation may have left additional columns we should delete. + datums_local.truncate(skips.len()); + let val = val_plan.evaluate_into( + &mut datums_local, + &temp_storage, + &mut row_builder, + ); + let val = match val { + Err(e) => { + err_session.give((e.into(), time, diff)); + return 1; + } + Ok(Some(val)) => val.clone(), + Ok(None) => panic!("Row expected as no predicate was used"), + }; - ok_session.give(((key, val), time, diff)); - 1 - }, - ); + ok_session.give(((key, val), time, diff)); + 1 + }, + ); // Render the reduce plan self.render_reduce_plan( From c91ca7e81a79765f3ea8dd74b0c33ac4588cddb7 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 12 May 2026 15:17:01 +0200 Subject: [PATCH 7/8] compute: REFUEL const + walk_cursor multi-key doc Lift the repeated `refuel = 1000000` into a module-level `const REFUEL: usize = 1_000_000`. Adds a sentence to `walk_cursor`'s doc clarifying that the fuel check is the only bound on both inner-val and outer-key iteration, so a `filter(false)` runs the whole batch in one activation. --- src/compute/src/render/context.rs | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index 3bac094e1cb6b..594130126cec7 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -314,11 +314,6 @@ impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { ) -> usize + 'static, { - // Number of output records this activation may produce before yielding. See the - // `Fuel` section in the doc comment for the rationale behind both the metric and the - // magnitude. - let refuel = 1000000; - let mut datums = DatumVec::new(); let logic = move |k: DatumSeq, v: DatumSeq, @@ -339,7 +334,7 @@ impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { oks.clone(), key, logic, - refuel, + REFUEL, ); let errs = errs.clone().as_collection(|k, &()| k.clone()); let errs = errs.concat(mfp_errs.as_collection()); @@ -350,7 +345,7 @@ impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { oks.clone(), key, logic, - refuel, + REFUEL, ); let errs = errs.clone().as_collection(|k, &()| k.clone()); let errs = errs.concat(mfp_errs.as_collection()); @@ -378,9 +373,6 @@ impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, T, Diff, &mut Session) -> usize + 'static, { - // See [`Self::flat_map`] for the fuel rationale. - let refuel = 1000000; - let mut datums = DatumVec::new(); let logic = move |k: DatumSeq, v: DatumSeq, t, d, ok_session: &mut Session| { let mut datums_borrow = datums.borrow(); @@ -396,7 +388,7 @@ impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { oks.clone(), key, logic, - refuel, + REFUEL, ); let errs = errs.clone().as_collection(|k, &()| k.clone()); (oks, errs) @@ -406,7 +398,7 @@ impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { oks.clone(), key, logic, - refuel, + REFUEL, ); let errs = errs.clone().as_collection(|k, &()| k.clone()); (oks, errs) @@ -1184,6 +1176,11 @@ type Session<'a, 'b, T, CB> = /// default; this matches the pre-refactor behavior of the now-removed `map_fallible` demux. type ErrCB = CapacityContainerBuilder>; +/// Number of output records the arrangement flat_map operators may produce before yielding. +/// See [`ArrangementFlavor::flat_map`] for the fuel rationale; the constant is a pragmatic +/// compromise and not tuned empirically. +const REFUEL: usize = 1_000_000; + struct PendingWork where C: Cursor, @@ -1294,8 +1291,10 @@ where /// /// `emit` returns the number of records it produced for the given input tuple. The cursor /// stops as soon as the accumulated emit count reaches `*fuel`, leaving the cursor in place -/// so work can resume on a later call. See [`ArrangementFlavor::flat_map`] for why fuel -/// counts output rather than input. +/// so work can resume on a later call. Within a batch, both the inner val loop and the +/// outer key loop are bounded only by emit count, so selective filters (`emit` returns 0) +/// run to batch completion in a single activation — see [`ArrangementFlavor::flat_map`] +/// for why fuel counts output rather than input. fn walk_cursor( cursor: &mut C, batch: &C::Storage, From 279978f51e9921ea6d9e2dc784c8ce346e6b73cb Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 12 May 2026 15:31:40 +0200 Subject: [PATCH 8/8] compute: rename flat_map container builder generic to DCB Matches the convention used in src/timely-util/src/operator.rs (DCB for the data side, ECB for the error side). The err type alias becomes ECB even though it's currently fixed to a CapacityContainerBuilder; the shared Session<'_, '_, T, CB> alias keeps the abstract name CB since it is reusable for both sides. Also shortens the turbofish call sites enough to fit under rustfmt's 100-char line limit. --- src/compute/src/render/context.rs | 94 +++++++++++++++---------------- 1 file changed, 47 insertions(+), 47 deletions(-) diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index 594130126cec7..d9efc203cca07 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -293,24 +293,24 @@ impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { /// The `max_demand` parameter limits the number of columns decoded from the /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to /// decode all columns. - pub fn flat_map( + pub fn flat_map( &self, key: Option<&Row>, max_demand: usize, mut logic: L, ) -> ( - Stream<'scope, T, OkCB::Container>, + Stream<'scope, T, DCB::Container>, VecCollection<'scope, T, DataflowErrorSer, Diff>, ) where D: Data, - OkCB: ContainerBuilder + PushInto<(D, T, Diff)>, + DCB: ContainerBuilder + PushInto<(D, T, Diff)>, L: for<'a, 'b> FnMut( &'a mut DatumVecBorrow<'b>, T, Diff, - &mut Session, - &mut Session>, + &mut Session, + &mut Session>, ) -> usize + 'static, { @@ -319,8 +319,8 @@ impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { v: DatumSeq, t, d, - ok_session: &mut Session, - err_session: &mut Session>| { + ok_session: &mut Session, + err_session: &mut Session>| { let mut datums_borrow = datums.borrow(); datums_borrow.extend(k.to_datum_iter().take(max_demand)); let max_demand = max_demand.saturating_sub(datums_borrow.len()); @@ -330,7 +330,7 @@ impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { match &self { ArrangementFlavor::Local(oks, errs) => { - let (oks, mfp_errs) = CollectionBundle::::flat_map_core_fallible::<_, _, OkCB, _>( + let (oks, mfp_errs) = CollectionBundle::::flat_map_core_fallible::<_, _, DCB, _>( oks.clone(), key, logic, @@ -341,7 +341,7 @@ impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { (oks, errs) } ArrangementFlavor::Trace(_, oks, errs) => { - let (oks, mfp_errs) = CollectionBundle::::flat_map_core_fallible::<_, _, OkCB, _>( + let (oks, mfp_errs) = CollectionBundle::::flat_map_core_fallible::<_, _, DCB, _>( oks.clone(), key, logic, @@ -358,23 +358,23 @@ impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { /// session, cannot produce errors, and returns the number of records produced (see /// [`Self::flat_map`] for fuel semantics). The returned err collection comes solely from /// the arrangement; no extra operator is built to carry an empty MFP-error stream. - pub fn flat_map_ok( + pub fn flat_map_ok( &self, key: Option<&Row>, max_demand: usize, mut logic: L, ) -> ( - Stream<'scope, T, OkCB::Container>, + Stream<'scope, T, DCB::Container>, VecCollection<'scope, T, DataflowErrorSer, Diff>, ) where D: Data, - OkCB: ContainerBuilder + PushInto<(D, T, Diff)>, - L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, T, Diff, &mut Session) -> usize + DCB: ContainerBuilder + PushInto<(D, T, Diff)>, + L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, T, Diff, &mut Session) -> usize + 'static, { let mut datums = DatumVec::new(); - let logic = move |k: DatumSeq, v: DatumSeq, t, d, ok_session: &mut Session| { + let logic = move |k: DatumSeq, v: DatumSeq, t, d, ok_session: &mut Session| { let mut datums_borrow = datums.borrow(); datums_borrow.extend(k.to_datum_iter().take(max_demand)); let max_demand = max_demand.saturating_sub(datums_borrow.len()); @@ -384,7 +384,7 @@ impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { match &self { ArrangementFlavor::Local(oks, errs) => { - let oks = CollectionBundle::::flat_map_core_ok::<_, _, OkCB, _>( + let oks = CollectionBundle::::flat_map_core_ok::<_, _, DCB, _>( oks.clone(), key, logic, @@ -394,7 +394,7 @@ impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { (oks, errs) } ArrangementFlavor::Trace(_, oks, errs) => { - let oks = CollectionBundle::::flat_map_core_ok::<_, _, OkCB, _>( + let oks = CollectionBundle::::flat_map_core_ok::<_, _, DCB, _>( oks.clone(), key, logic, @@ -619,24 +619,24 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { /// The `max_demand` parameter limits the number of columns decoded from the /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to /// decode all columns. - pub fn flat_map( + pub fn flat_map( &self, key_val: Option<(Vec, Option)>, max_demand: usize, mut logic: L, ) -> ( - Stream<'scope, T, OkCB::Container>, + Stream<'scope, T, DCB::Container>, VecCollection<'scope, T, DataflowErrorSer, Diff>, ) where D: Data, - OkCB: ContainerBuilder + PushInto<(D, T, Diff)>, + DCB: ContainerBuilder + PushInto<(D, T, Diff)>, L: for<'a> FnMut( &'a mut DatumVecBorrow<'_>, T, Diff, - &mut Session, - &mut Session>, + &mut Session, + &mut Session>, ) -> usize + 'static, { @@ -646,7 +646,7 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { if let Some((key, val)) = key_val { self.arrangement(&key) .expect("Should have ensured during planning that this arrangement exists.") - .flat_map::<_, OkCB, _>(val.as_ref(), max_demand, logic) + .flat_map::<_, DCB, _>(val.as_ref(), max_demand, logic) } else { let (oks, errs) = self .collection @@ -655,9 +655,9 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { let scope = oks.inner.scope(); let mut builder = OperatorBuilder::new("CollectionFlatMap".to_string(), scope); let (ok_output, ok_stream) = builder.new_output(); - let mut ok_output = OutputBuilder::<_, OkCB>::from(ok_output); + let mut ok_output = OutputBuilder::<_, DCB>::from(ok_output); let (err_output, err_stream) = builder.new_output(); - let mut err_output = OutputBuilder::<_, ErrCB>::from(err_output); + let mut err_output = OutputBuilder::<_, ECB>::from(err_output); let mut input = builder.new_input(oks.inner, Pipeline); builder.build(move |_capabilities| { let mut datums = DatumVec::new(); @@ -698,13 +698,13 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { /// callback writes ok results into the first session and errors into the second, returning /// the number of records produced. See [`ArrangementFlavor::flat_map`] for the fuel /// rationale. - fn flat_map_core_fallible( + fn flat_map_core_fallible( trace: Arranged<'scope, Tr>, key: Option<&::Owned>, mut logic: L, refuel: usize, ) -> ( - Stream<'scope, T, OkCB::Container>, + Stream<'scope, T, DCB::Container>, Stream<'scope, T, Vec<(DataflowErrorSer, T, Diff)>>, ) where @@ -717,14 +717,14 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { + 'static, ::Owned: PartialEq, D: Data, - OkCB: ContainerBuilder + PushInto<(D, T, Diff)>, + DCB: ContainerBuilder + PushInto<(D, T, Diff)>, L: FnMut( Tr::Key<'_>, Tr::Val<'_>, T, mz_repr::Diff, - &mut Session, - &mut Session>, + &mut Session, + &mut Session>, ) -> usize + 'static, { @@ -739,9 +739,9 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { let mut builder = OperatorBuilder::new(name, scope.clone()); let (ok_output, ok_stream) = builder.new_output(); - let mut ok_output = OutputBuilder::<_, OkCB>::from(ok_output); + let mut ok_output = OutputBuilder::<_, DCB>::from(ok_output); let (err_output, err_stream) = builder.new_output(); - let mut err_output = OutputBuilder::<_, ErrCB>::from(err_output); + let mut err_output = OutputBuilder::<_, ECB>::from(err_output); let mut input = builder.new_input(trace.stream.clone(), Pipeline); let operator_info = builder.operator_info(); @@ -800,12 +800,12 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { /// fallible variant for fuel semantics). Use this when the caller statically knows it /// will never produce `DataflowErrorSer` records, to avoid building a second output port /// and the empty err stream that would follow it. - fn flat_map_core_ok( + fn flat_map_core_ok( trace: Arranged<'scope, Tr>, key: Option<&::Owned>, mut logic: L, refuel: usize, - ) -> Stream<'scope, T, OkCB::Container> + ) -> Stream<'scope, T, DCB::Container> where Tr: for<'a> TraceReader< Key<'a>: ToDatumIter, @@ -816,8 +816,8 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { + 'static, ::Owned: PartialEq, D: Data, - OkCB: ContainerBuilder + PushInto<(D, T, Diff)>, - L: FnMut(Tr::Key<'_>, Tr::Val<'_>, T, mz_repr::Diff, &mut Session) -> usize + DCB: ContainerBuilder + PushInto<(D, T, Diff)>, + L: FnMut(Tr::Key<'_>, Tr::Val<'_>, T, mz_repr::Diff, &mut Session) -> usize + 'static, { let scope = trace.stream.scope(); @@ -831,7 +831,7 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { let mut builder = OperatorBuilder::new(name, scope.clone()); let (ok_output, ok_stream) = builder.new_output(); - let mut ok_output = OutputBuilder::<_, OkCB>::from(ok_output); + let mut ok_output = OutputBuilder::<_, DCB>::from(ok_output); let mut input = builder.new_input(trace.stream.clone(), Pipeline); let operator_info = builder.operator_info(); @@ -1174,7 +1174,7 @@ type Session<'a, 'b, T, CB> = /// Container builder used for the err output of every flat_map variant. Errors are low volume /// and we don't expect within-batch duplicates, so [`CapacityContainerBuilder`] is the right /// default; this matches the pre-refactor behavior of the now-removed `map_fallible` demux. -type ErrCB = CapacityContainerBuilder>; +type ECB = CapacityContainerBuilder>; /// Number of output records the arrangement flat_map operators may produce before yielding. /// See [`ArrangementFlavor::flat_map`] for the fuel rationale; the constant is a pragmatic @@ -1214,23 +1214,23 @@ where } /// Perform roughly `fuel` work through the cursor, applying `logic` and sending results to /// the two output sessions. - fn do_work( + fn do_work( &mut self, key: Option<&C::Key<'_>>, logic: &mut L, fuel: &mut usize, - ok_output: &mut OutputBuilderSession<'_, C::Time, OkCB>, - err_output: &mut OutputBuilderSession<'_, C::Time, ErrCB>, + ok_output: &mut OutputBuilderSession<'_, C::Time, DCB>, + err_output: &mut OutputBuilderSession<'_, C::Time, ECB>, ) where D: Data, - OkCB: ContainerBuilder + PushInto<(D, C::Time, C::Diff)>, + DCB: ContainerBuilder + PushInto<(D, C::Time, C::Diff)>, L: FnMut( C::Key<'_>, C::Val<'_>, C::Time, C::Diff, - &mut Session, - &mut Session>, + &mut Session, + &mut Session>, ) -> usize + 'static, { @@ -1267,16 +1267,16 @@ where /// Perform roughly `fuel` work through the cursor, applying `logic` and sending results to /// the single output session. - fn do_work( + fn do_work( &mut self, key: Option<&C::Key<'_>>, logic: &mut L, fuel: &mut usize, - ok_output: &mut OutputBuilderSession<'_, C::Time, OkCB>, + ok_output: &mut OutputBuilderSession<'_, C::Time, DCB>, ) where D: Data, - OkCB: ContainerBuilder + PushInto<(D, C::Time, C::Diff)>, - L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff, &mut Session) -> usize + DCB: ContainerBuilder + PushInto<(D, C::Time, C::Diff)>, + L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff, &mut Session) -> usize + 'static, { let mut ok_session = ok_output.session_with_builder(&self.capability);