diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index e9f1c2341bf42..d9efc203cca07 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -32,13 +32,13 @@ use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow use mz_storage_types::controller::CollectionMetadata; use mz_timely_util::columnar::builder::ColumnBuilder; use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange}; -use mz_timely_util::operator::CollectionExt; -use timely::container::CapacityContainerBuilder; +use timely::ContainerBuilder; +use timely::container::{CapacityContainerBuilder, PushInto}; use timely::dataflow::channels::pact::{ExchangeCore, Pipeline}; use timely::dataflow::operators::Capability; use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession}; -use timely::dataflow::{Scope, StreamVec}; +use timely::dataflow::{Scope, Stream}; use timely::progress::operate::FrontierInterest; use timely::progress::{Antichain, Timestamp}; @@ -263,8 +263,28 @@ impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { /// Constructs and applies logic to elements of `self` and returns the results. /// - /// The `logic` receives a vector of datums, a timestamp, and a diff, and produces - /// an iterator of `(D, T, Diff)` updates. + /// The `logic` callback receives a borrow of the decoded datum vector, a timestamp, a + /// diff, and two output sessions: one for `ok` updates of type `(D, T, Diff)` and one for + /// MFP-style `DataflowErrorSer` updates. It must return the number of records *produced* + /// (written to either session), not the number of input tuples consumed. + /// + /// # Fuel + /// + /// The operator accumulates the returned counts as fuel and yields when the total reaches + /// an internal refuel threshold. The metric is output-produced (not input-consumed) on + /// purpose: it regulates two asymmetric pressures. + /// + /// * **Drain inputs.** The operator holds a clone of each pending `Batch` until its work + /// item pops; we want to release that memory back to the upstream arrangement as soon + /// as possible. A `filter(false)` MFP returns 0 for every tuple, so fuel never trips + /// and the cursor runs to end-of-batch in one activation. + /// * **Throttle outputs.** A `map("1KB-string")` MFP produces large records per input; + /// stopping when emit count hits the threshold caps how much data a single activation + /// dumps on the next operator. + /// + /// The refuel constant is a pragmatic compromise: large enough to be a non-event in + /// steady-state, small enough that one activation can't flood downstream. There is no + /// universal value across MFP shapes. /// /// If `key` is set, this is a promise that `logic` will produce no results on /// records for which the key does not evaluate to the value. This is used to @@ -273,42 +293,113 @@ impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> { /// The `max_demand` parameter limits the number of columns decoded from the /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to /// decode all columns. - pub fn flat_map( + pub fn flat_map( &self, key: Option<&Row>, max_demand: usize, mut logic: L, ) -> ( - StreamVec<'scope, T, I::Item>, + Stream<'scope, T, DCB::Container>, VecCollection<'scope, T, DataflowErrorSer, Diff>, ) where - I: IntoIterator, D: Data, - L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, T, Diff) -> I + 'static, + DCB: ContainerBuilder + PushInto<(D, T, Diff)>, + L: for<'a, 'b> FnMut( + &'a mut DatumVecBorrow<'b>, + T, + Diff, + &mut Session, + &mut Session>, + ) -> usize + + 'static, { - // Set a number of tuples after which the operator should yield. - // This allows us to remain responsive even when enumerating a substantial - // arrangement, as well as provides time to accumulate our produced output. - let refuel = 1000000; + let mut datums = DatumVec::new(); + let logic = move |k: DatumSeq, + v: DatumSeq, + t, + d, + ok_session: &mut Session, + err_session: &mut Session>| { + let mut datums_borrow = datums.borrow(); + datums_borrow.extend(k.to_datum_iter().take(max_demand)); + let max_demand = max_demand.saturating_sub(datums_borrow.len()); + datums_borrow.extend(v.to_datum_iter().take(max_demand)); + logic(&mut datums_borrow, t, d, ok_session, err_session) + }; + + match &self { + ArrangementFlavor::Local(oks, errs) => { + let (oks, mfp_errs) = CollectionBundle::::flat_map_core_fallible::<_, _, DCB, _>( + oks.clone(), + key, + logic, + REFUEL, + ); + let errs = errs.clone().as_collection(|k, &()| k.clone()); + let errs = errs.concat(mfp_errs.as_collection()); + (oks, errs) + } + ArrangementFlavor::Trace(_, oks, errs) => { + let (oks, mfp_errs) = CollectionBundle::::flat_map_core_fallible::<_, _, DCB, _>( + oks.clone(), + key, + logic, + REFUEL, + ); + let errs = errs.clone().as_collection(|k, &()| k.clone()); + let errs = errs.concat(mfp_errs.as_collection()); + (oks, errs) + } + } + } + /// Ok-only variant of [`Self::flat_map`]. The `logic` callback receives a single output + /// session, cannot produce errors, and returns the number of records produced (see + /// [`Self::flat_map`] for fuel semantics). The returned err collection comes solely from + /// the arrangement; no extra operator is built to carry an empty MFP-error stream. + pub fn flat_map_ok( + &self, + key: Option<&Row>, + max_demand: usize, + mut logic: L, + ) -> ( + Stream<'scope, T, DCB::Container>, + VecCollection<'scope, T, DataflowErrorSer, Diff>, + ) + where + D: Data, + DCB: ContainerBuilder + PushInto<(D, T, Diff)>, + L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, T, Diff, &mut Session) -> usize + + 'static, + { let mut datums = DatumVec::new(); - let logic = move |k: DatumSeq, v: DatumSeq, t, d| { + let logic = move |k: DatumSeq, v: DatumSeq, t, d, ok_session: &mut Session| { let mut datums_borrow = datums.borrow(); datums_borrow.extend(k.to_datum_iter().take(max_demand)); let max_demand = max_demand.saturating_sub(datums_borrow.len()); datums_borrow.extend(v.to_datum_iter().take(max_demand)); - logic(&mut datums_borrow, t, d) + logic(&mut datums_borrow, t, d, ok_session) }; match &self { ArrangementFlavor::Local(oks, errs) => { - let oks = CollectionBundle::::flat_map_core(oks.clone(), key, logic, refuel); + let oks = CollectionBundle::::flat_map_core_ok::<_, _, DCB, _>( + oks.clone(), + key, + logic, + REFUEL, + ); let errs = errs.clone().as_collection(|k, &()| k.clone()); (oks, errs) } ArrangementFlavor::Trace(_, oks, errs) => { - let oks = CollectionBundle::::flat_map_core(oks.clone(), key, logic, refuel); + let oks = CollectionBundle::::flat_map_core_ok::<_, _, DCB, _>( + oks.clone(), + key, + logic, + REFUEL, + ); let errs = errs.clone().as_collection(|k, &()| k.clone()); (oks, errs) } @@ -492,10 +583,18 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { panic!("The collection arranged by {:?} doesn't exist.", key) }); if ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config_set) { - // Decode all columns, pass max_demand as usize::MAX. - let (ok, err) = arranged.flat_map(None, usize::MAX, |borrow, t, r| { - Some((SharedRow::pack(borrow.iter()), t, r)) - }); + // Decode all columns, pass max_demand as usize::MAX. Output is 1:1 from the + // cursor (no duplicates), so a non-consolidating container builder is the + // right choice. + let (ok, err) = arranged + .flat_map_ok::<_, CapacityContainerBuilder>, _>( + None, + usize::MAX, + |borrow, t, r, ok_session| { + ok_session.give((SharedRow::pack(borrow.iter()), t, r)); + 1 + }, + ); (ok.as_collection(), err) } else { #[allow(deprecated)] @@ -520,19 +619,26 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { /// The `max_demand` parameter limits the number of columns decoded from the /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to /// decode all columns. - pub fn flat_map( + pub fn flat_map( &self, key_val: Option<(Vec, Option)>, max_demand: usize, mut logic: L, ) -> ( - StreamVec<'scope, T, I::Item>, + Stream<'scope, T, DCB::Container>, VecCollection<'scope, T, DataflowErrorSer, Diff>, ) where - I: IntoIterator, D: Data, - L: for<'a> FnMut(&'a mut DatumVecBorrow<'_>, T, Diff) -> I + 'static, + DCB: ContainerBuilder + PushInto<(D, T, Diff)>, + L: for<'a> FnMut( + &'a mut DatumVecBorrow<'_>, + T, + Diff, + &mut Session, + &mut Session>, + ) -> usize + + 'static, { // If `key_val` is set, we should have to use the corresponding arrangement. // If there isn't one, that implies an error in the contract between @@ -540,18 +646,45 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { if let Some((key, val)) = key_val { self.arrangement(&key) .expect("Should have ensured during planning that this arrangement exists.") - .flat_map(val.as_ref(), max_demand, logic) + .flat_map::<_, DCB, _>(val.as_ref(), max_demand, logic) } else { - use timely::dataflow::operators::vec::Map; let (oks, errs) = self .collection .clone() .expect("Invariant violated: CollectionBundle contains no collection."); - let mut datums = DatumVec::new(); - let oks = oks.inner.flat_map(move |(v, t, d)| { - logic(&mut datums.borrow_with_limit(&v, max_demand), t, d) + let scope = oks.inner.scope(); + let mut builder = OperatorBuilder::new("CollectionFlatMap".to_string(), scope); + let (ok_output, ok_stream) = builder.new_output(); + let mut ok_output = OutputBuilder::<_, DCB>::from(ok_output); + let (err_output, err_stream) = builder.new_output(); + let mut err_output = OutputBuilder::<_, ECB>::from(err_output); + let mut input = builder.new_input(oks.inner, Pipeline); + builder.build(move |_capabilities| { + let mut datums = DatumVec::new(); + move |_frontiers| { + let mut ok_output = ok_output.activate(); + let mut err_output = err_output.activate(); + input.for_each(|time, data| { + // Retain the input capability to derive a `Capability` for each output; + // the `Session` type alias is fixed to `Capability`. + let ok_cap = time.retain(0); + let err_cap = time.retain(1); + let mut ok_session = ok_output.session_with_builder(&ok_cap); + let mut err_session = err_output.session_with_builder(&err_cap); + for (v, t, d) in data.iter() { + logic( + &mut datums.borrow_with_limit(v, max_demand), + t.clone(), + d.clone(), + &mut ok_session, + &mut err_session, + ); + } + }); + } }); - (oks, errs) + let errs = errs.concat(err_stream.as_collection()); + (ok_stream, errs) } } @@ -561,13 +694,19 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { /// once, and thereby avoid any skew in the two uses of the logic. /// /// The function presents the contents of the trace as `(key, value, time, delta)` tuples, - /// where key and value are potentially specialized, but convertible into rows. - fn flat_map_core( + /// where key and value are potentially specialized, but convertible into rows. The `logic` + /// callback writes ok results into the first session and errors into the second, returning + /// the number of records produced. See [`ArrangementFlavor::flat_map`] for the fuel + /// rationale. + fn flat_map_core_fallible( trace: Arranged<'scope, Tr>, key: Option<&::Owned>, mut logic: L, refuel: usize, - ) -> StreamVec<'scope, T, I::Item> + ) -> ( + Stream<'scope, T, DCB::Container>, + Stream<'scope, T, Vec<(DataflowErrorSer, T, Diff)>>, + ) where Tr: for<'a> TraceReader< Key<'a>: ToDatumIter, @@ -577,11 +716,18 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { > + Clone + 'static, ::Owned: PartialEq, - I: IntoIterator, D: Data, - L: FnMut(Tr::Key<'_>, Tr::Val<'_>, T, mz_repr::Diff) -> I + 'static, + DCB: ContainerBuilder + PushInto<(D, T, Diff)>, + L: FnMut( + Tr::Key<'_>, + Tr::Val<'_>, + T, + mz_repr::Diff, + &mut Session, + &mut Session>, + ) -> usize + + 'static, { - use differential_dataflow::consolidation::ConsolidatingContainerBuilder as CB; let scope = trace.stream.scope(); let mut key_con = Tr::KeyContainer::with_capacity(1); @@ -590,48 +736,142 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { } let mode = if key.is_some() { "index" } else { "scan" }; let name = format!("ArrangementFlatMap({})", mode); - use timely::dataflow::operators::Operator; - trace - .stream - .unary::, _, _, _>(Pipeline, &name, move |_, info| { - // Acquire an activator to reschedule the operator when it has unfinished work. - let activator = scope.activator_for(info.address); - // Maintain a list of work to do, cursor to navigate and process. - let mut todo = std::collections::VecDeque::new(); - move |input, output| { - let key = key_con.get(0); - // First, dequeue all batches. - input.for_each(|time, data| { - let capability = time.retain(0); - for batch in data.iter() { - // enqueue a capability, cursor, and batch. - todo.push_back(PendingWork::new( - capability.clone(), - batch.cursor(), - batch.clone(), - )); - } - }); - // Second, make progress on `todo`. - let mut fuel = refuel; - while !todo.is_empty() && fuel > 0 { - todo.front_mut().unwrap().do_work( - key.as_ref(), - &mut logic, - &mut fuel, - output, - ); - if fuel > 0 { - todo.pop_front(); - } + let mut builder = OperatorBuilder::new(name, scope.clone()); + let (ok_output, ok_stream) = builder.new_output(); + let mut ok_output = OutputBuilder::<_, DCB>::from(ok_output); + let (err_output, err_stream) = builder.new_output(); + let mut err_output = OutputBuilder::<_, ECB>::from(err_output); + let mut input = builder.new_input(trace.stream.clone(), Pipeline); + let operator_info = builder.operator_info(); + + builder.build(move |_capabilities| { + // Acquire an activator to reschedule the operator when it has unfinished work. + let activator = scope.activator_for(operator_info.address); + // Maintain a list of work to do, cursor to navigate and process. + let mut todo = std::collections::VecDeque::new(); + move |_frontiers| { + let key = key_con.get(0); + let mut ok_output = ok_output.activate(); + let mut err_output = err_output.activate(); + + // First, dequeue all batches. + input.for_each(|time, data| { + // Retain a capability for each output, as the work may complete across + // multiple activations. + let ok_cap = time.retain(0); + let err_cap = time.retain(1); + for batch in data.iter() { + todo.push_back(PendingWork::new( + ok_cap.clone(), + err_cap.clone(), + batch.cursor(), + batch.clone(), + )); } - // If we have not finished all work, re-activate the operator. - if !todo.is_empty() { - activator.activate(); + }); + + // Second, make progress on `todo`. + let mut fuel = refuel; + while !todo.is_empty() && fuel > 0 { + todo.front_mut().unwrap().do_work( + key.as_ref(), + &mut logic, + &mut fuel, + &mut ok_output, + &mut err_output, + ); + if fuel > 0 { + todo.pop_front(); } } - }) + // If we have not finished all work, re-activate the operator. + if !todo.is_empty() { + activator.activate(); + } + } + }); + + (ok_stream, err_stream) + } + + /// Ok-only variant of [`Self::flat_map_core_fallible`]. The `logic` callback writes results + /// into a single output session and returns the number of records produced (see the + /// fallible variant for fuel semantics). Use this when the caller statically knows it + /// will never produce `DataflowErrorSer` records, to avoid building a second output port + /// and the empty err stream that would follow it. + fn flat_map_core_ok( + trace: Arranged<'scope, Tr>, + key: Option<&::Owned>, + mut logic: L, + refuel: usize, + ) -> Stream<'scope, T, DCB::Container> + where + Tr: for<'a> TraceReader< + Key<'a>: ToDatumIter, + Val<'a>: ToDatumIter, + Time = T, + Diff = mz_repr::Diff, + > + Clone + + 'static, + ::Owned: PartialEq, + D: Data, + DCB: ContainerBuilder + PushInto<(D, T, Diff)>, + L: FnMut(Tr::Key<'_>, Tr::Val<'_>, T, mz_repr::Diff, &mut Session) -> usize + + 'static, + { + let scope = trace.stream.scope(); + + let mut key_con = Tr::KeyContainer::with_capacity(1); + if let Some(key) = &key { + key_con.push_own(key); + } + let mode = if key.is_some() { "index" } else { "scan" }; + let name = format!("ArrangementFlatMapOk({})", mode); + + let mut builder = OperatorBuilder::new(name, scope.clone()); + let (ok_output, ok_stream) = builder.new_output(); + let mut ok_output = OutputBuilder::<_, DCB>::from(ok_output); + let mut input = builder.new_input(trace.stream.clone(), Pipeline); + let operator_info = builder.operator_info(); + + builder.build(move |_capabilities| { + let activator = scope.activator_for(operator_info.address); + let mut todo = std::collections::VecDeque::new(); + move |_frontiers| { + let key = key_con.get(0); + let mut ok_output = ok_output.activate(); + + input.for_each(|time, data| { + let cap = time.retain(0); + for batch in data.iter() { + todo.push_back(PendingWorkOk::new( + cap.clone(), + batch.cursor(), + batch.clone(), + )); + } + }); + + let mut fuel = refuel; + while !todo.is_empty() && fuel > 0 { + todo.front_mut().unwrap().do_work( + key.as_ref(), + &mut logic, + &mut fuel, + &mut ok_output, + ); + if fuel > 0 { + todo.pop_front(); + } + } + if !todo.is_empty() { + activator.activate(); + } + } + }); + + ok_stream } /// Look up an arrangement by the expressions that form the key. @@ -690,49 +930,48 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { // Wrap in an `Rc` so that lifetimes work out. let until = std::rc::Rc::new(until); - let (stream, errors) = self.flat_map(key_val, max_demand, move |row_datums, time, diff| { - let mut row_builder = SharedRow::get(); - let until = std::rc::Rc::clone(&until); - let temp_storage = RowArena::new(); - let row_iter = row_datums.iter(); - let mut datums_local = datum_vec.borrow(); - datums_local.extend(row_iter); - let time = time.clone(); - let event_time = time.event_time(); - mfp_plan - .evaluate( - &mut datums_local, - &temp_storage, - event_time, - diff.clone(), - move |time| !until.less_equal(time), - &mut row_builder, - ) - .map(move |x| match x { - Ok((row, event_time, diff)) => { - // Copy the whole time, and re-populate event time. - let mut time: T = time.clone(); - *time.event_time_mut() = event_time; - (Ok(row), time, diff) - } - Err((e, event_time, diff)) => { - // Copy the whole time, and re-populate event time. - let mut time: T = time.clone(); - *time.event_time_mut() = event_time; - (Err(e), time, diff) + let (stream, errors) = self + .flat_map::<_, ConsolidatingContainerBuilder>, _>( + key_val, + max_demand, + move |row_datums, time, diff, ok_session, err_session| { + let mut row_builder = SharedRow::get(); + let until = std::rc::Rc::clone(&until); + let temp_storage = RowArena::new(); + let row_iter = row_datums.iter(); + let mut datums_local = datum_vec.borrow(); + datums_local.extend(row_iter); + let event_time = time.event_time(); + let mut work: usize = 0; + for result in mfp_plan.evaluate( + &mut datums_local, + &temp_storage, + event_time, + diff.clone(), + move |time| !until.less_equal(time), + &mut row_builder, + ) { + work += 1; + match result { + Ok((row, event_time, diff)) => { + // Copy the whole time, and re-populate event time. + let mut time: T = time.clone(); + *time.event_time_mut() = event_time; + ok_session.give((row, time, diff)); + } + Err((e, event_time, diff)) => { + // Copy the whole time, and re-populate event time. + let mut time: T = time.clone(); + *time.event_time_mut() = event_time; + err_session.give((e, time, diff)); + } + } } - }) - }); - - use differential_dataflow::AsCollection; - let (oks, errs) = stream - .as_collection() - .map_fallible::, CapacityContainerBuilder<_>, _, _, _>( - "OkErr", - |x| x, + work + }, ); - (oks, errors.concat(errs)) + (stream.as_collection(), errors) } pub fn ensure_collections( mut self, @@ -926,11 +1165,30 @@ impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> { } } +/// Type alias for a timely output `Session` whose capability is a `Capability`. The container +/// builder `CB` is left to the caller; sessions can therefore drive consolidating, capacity, or +/// (in the future) columnar output builders without changing call sites. +type Session<'a, 'b, T, CB> = + timely::dataflow::operators::generic::Session<'a, 'b, T, CB, Capability>; + +/// Container builder used for the err output of every flat_map variant. Errors are low volume +/// and we don't expect within-batch duplicates, so [`CapacityContainerBuilder`] is the right +/// default; this matches the pre-refactor behavior of the now-removed `map_fallible` demux. +type ECB = CapacityContainerBuilder>; + +/// Number of output records the arrangement flat_map operators may produce before yielding. +/// See [`ArrangementFlavor::flat_map`] for the fuel rationale; the constant is a pragmatic +/// compromise and not tuned empirically. +const REFUEL: usize = 1_000_000; + struct PendingWork where C: Cursor, { - capability: Capability, + /// Capability for the `ok` output (output port 0). + ok_capability: Capability, + /// Capability for the `err` output (output port 1). + err_capability: Capability, cursor: C, batch: C::Storage, } @@ -939,7 +1197,66 @@ impl PendingWork where C: Cursor>, { - /// Create a new bundle of pending work, from the capability, cursor, and backing storage. + /// Create a new bundle of pending work, from a pair of capabilities (one per output), + /// a cursor, and backing storage. + fn new( + ok_capability: Capability, + err_capability: Capability, + cursor: C, + batch: C::Storage, + ) -> Self { + Self { + ok_capability, + err_capability, + cursor, + batch, + } + } + /// Perform roughly `fuel` work through the cursor, applying `logic` and sending results to + /// the two output sessions. + fn do_work( + &mut self, + key: Option<&C::Key<'_>>, + logic: &mut L, + fuel: &mut usize, + ok_output: &mut OutputBuilderSession<'_, C::Time, DCB>, + err_output: &mut OutputBuilderSession<'_, C::Time, ECB>, + ) where + D: Data, + DCB: ContainerBuilder + PushInto<(D, C::Time, C::Diff)>, + L: FnMut( + C::Key<'_>, + C::Val<'_>, + C::Time, + C::Diff, + &mut Session, + &mut Session>, + ) -> usize + + 'static, + { + let mut ok_session = ok_output.session_with_builder(&self.ok_capability); + let mut err_session = err_output.session_with_builder(&self.err_capability); + walk_cursor(&mut self.cursor, &self.batch, key, fuel, |k, v, t, d| { + logic(k, v, t, d, &mut ok_session, &mut err_session) + }); + } +} + +/// Pending work for the Ok-only variant of `flat_map_core_fallible`. Holds a single capability since +/// the operator has only one output port. +struct PendingWorkOk +where + C: Cursor, +{ + capability: Capability, + cursor: C, + batch: C::Storage, +} + +impl PendingWorkOk +where + C: Cursor>, +{ fn new(capability: Capability, cursor: C, batch: C::Storage) -> Self { Self { capability, @@ -947,71 +1264,91 @@ where batch, } } - /// Perform roughly `fuel` work through the cursor, applying `logic` and sending results to `output`. - fn do_work( + + /// Perform roughly `fuel` work through the cursor, applying `logic` and sending results to + /// the single output session. + fn do_work( &mut self, key: Option<&C::Key<'_>>, logic: &mut L, fuel: &mut usize, - output: &mut OutputBuilderSession<'_, C::Time, ConsolidatingContainerBuilder>>, + ok_output: &mut OutputBuilderSession<'_, C::Time, DCB>, ) where - I: IntoIterator, D: Data, - L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff) -> I + 'static, + DCB: ContainerBuilder + PushInto<(D, C::Time, C::Diff)>, + L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff, &mut Session) -> usize + + 'static, { - use differential_dataflow::consolidation::consolidate; - - // Attempt to make progress on this batch. - let mut work: usize = 0; - let mut session = output.session_with_builder(&self.capability); - let mut buffer = Vec::new(); - if let Some(key) = key { - let key = C::KeyContainer::reborrow(*key); - if self.cursor.get_key(&self.batch).map(|k| k == key) != Some(true) { - self.cursor.seek_key(&self.batch, key); - } - if self.cursor.get_key(&self.batch).map(|k| k == key) == Some(true) { - let key = self.cursor.key(&self.batch); - while let Some(val) = self.cursor.get_val(&self.batch) { - self.cursor.map_times(&self.batch, |time, diff| { - buffer.push((C::owned_time(time), C::owned_diff(diff))); - }); - consolidate(&mut buffer); - for (time, diff) in buffer.drain(..) { - for datum in logic(key, val, time, diff) { - session.give(datum); - work += 1; - } - } - self.cursor.step_val(&self.batch); - if work >= *fuel { - *fuel = 0; - return; - } + let mut ok_session = ok_output.session_with_builder(&self.capability); + walk_cursor(&mut self.cursor, &self.batch, key, fuel, |k, v, t, d| { + logic(k, v, t, d, &mut ok_session) + }); + } +} + +/// Walk a cursor, calling `emit` for each consolidated `(key, val, time, diff)` tuple. If +/// `key` is set, the cursor is seeked to it and only values for that key are produced. +/// +/// `emit` returns the number of records it produced for the given input tuple. The cursor +/// stops as soon as the accumulated emit count reaches `*fuel`, leaving the cursor in place +/// so work can resume on a later call. Within a batch, both the inner val loop and the +/// outer key loop are bounded only by emit count, so selective filters (`emit` returns 0) +/// run to batch completion in a single activation — see [`ArrangementFlavor::flat_map`] +/// for why fuel counts output rather than input. +fn walk_cursor( + cursor: &mut C, + batch: &C::Storage, + key: Option<&C::Key<'_>>, + fuel: &mut usize, + mut emit: F, +) where + C: Cursor>, + F: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff) -> usize, +{ + use differential_dataflow::consolidation::consolidate; + + let mut work: usize = 0; + let mut buffer = Vec::new(); + if let Some(key) = key { + let key = C::KeyContainer::reborrow(*key); + if cursor.get_key(batch).map(|k| k == key) != Some(true) { + cursor.seek_key(batch, key); + } + if cursor.get_key(batch).map(|k| k == key) == Some(true) { + let key = cursor.key(batch); + while let Some(val) = cursor.get_val(batch) { + cursor.map_times(batch, |time, diff| { + buffer.push((C::owned_time(time), C::owned_diff(diff))); + }); + consolidate(&mut buffer); + for (time, diff) in buffer.drain(..) { + work += emit(key, val, time, diff); + } + cursor.step_val(batch); + if work >= *fuel { + *fuel = 0; + return; } } - } else { - while let Some(key) = self.cursor.get_key(&self.batch) { - while let Some(val) = self.cursor.get_val(&self.batch) { - self.cursor.map_times(&self.batch, |time, diff| { - buffer.push((C::owned_time(time), C::owned_diff(diff))); - }); - consolidate(&mut buffer); - for (time, diff) in buffer.drain(..) { - for datum in logic(key, val, time, diff) { - session.give(datum); - work += 1; - } - } - self.cursor.step_val(&self.batch); - if work >= *fuel { - *fuel = 0; - return; - } + } + } else { + while let Some(key) = cursor.get_key(batch) { + while let Some(val) = cursor.get_val(batch) { + cursor.map_times(batch, |time, diff| { + buffer.push((C::owned_time(time), C::owned_diff(diff))); + }); + consolidate(&mut buffer); + for (time, diff) in buffer.drain(..) { + work += emit(key, val, time, diff); + } + cursor.step_val(batch); + if work >= *fuel { + *fuel = 0; + return; } - self.cursor.step_key(&self.batch); } + cursor.step_key(batch); } - *fuel -= work; } + *fuel -= work; } diff --git a/src/compute/src/render/reduce.rs b/src/compute/src/render/reduce.rs index d4f34d2951846..e165bd567c5e2 100644 --- a/src/compute/src/render/reduce.rs +++ b/src/compute/src/render/reduce.rs @@ -103,59 +103,68 @@ impl<'scope, T: RenderTimestamp> Context<'scope, T> { let max_demand = demand.iter().max().map(|x| *x + 1).unwrap_or(0); let skips = mz_compute_types::plan::reduce::convert_indexes_to_skips(demand); - let (key_val_input, err_input) = input.enter_region(inner).flat_map( - input_key.map(|k| (k, None)), - max_demand, - move |row_datums, time, diff| { - let mut row_builder = SharedRow::get(); - let temp_storage = RowArena::new(); - - let mut row_iter = row_datums.drain(..); - let mut datums_local = datums.borrow(); - // Unpack only the demanded columns. - for skip in skips.iter() { - datums_local.push(row_iter.nth(*skip).unwrap()); - } - - // Evaluate the key expressions. - let key = - key_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder); - let key = match key { - Err(e) => { - return Some((Err(e.into()), time.clone(), diff.clone())); - } - Ok(Some(key)) => key.clone(), - Ok(None) => panic!("Row expected as no predicate was used"), - }; + let (key_val_input, err) = input + .enter_region(inner) + .flat_map::<_, ConsolidatingContainerBuilder>, _>( + input_key.map(|k| (k, None)), + max_demand, + move |row_datums, time, diff, ok_session, err_session| { + let mut row_builder = SharedRow::get(); + let temp_storage = RowArena::new(); - // Evaluate the value expressions. - // The prior evaluation may have left additional columns we should delete. - datums_local.truncate(skips.len()); - let val = - val_plan.evaluate_into(&mut datums_local, &temp_storage, &mut row_builder); - let val = match val { - Err(e) => { - return Some((Err(e.into()), time.clone(), diff.clone())); + let mut row_iter = row_datums.drain(..); + let mut datums_local = datums.borrow(); + // Unpack only the demanded columns. + for skip in skips.iter() { + datums_local.push(row_iter.nth(*skip).unwrap()); } - Ok(Some(val)) => val.clone(), - Ok(None) => panic!("Row expected as no predicate was used"), - }; - - Some((Ok((key, val)), time.clone(), diff.clone())) - }, - ); - // Demux out the potential errors from key and value selector evaluation. - type CB = ConsolidatingContainerBuilder; - let (ok, mut err) = key_val_input - .as_collection() - .flat_map_fallible::, CB<_>, _, _, _, _>("OkErrDemux", Some); + // Evaluate the key expressions. + let key = key_plan.evaluate_into( + &mut datums_local, + &temp_storage, + &mut row_builder, + ); + let key = match key { + Err(e) => { + err_session.give((e.into(), time, diff)); + return 1; + } + Ok(Some(key)) => key.clone(), + Ok(None) => panic!("Row expected as no predicate was used"), + }; + + // Evaluate the value expressions. + // The prior evaluation may have left additional columns we should delete. + datums_local.truncate(skips.len()); + let val = val_plan.evaluate_into( + &mut datums_local, + &temp_storage, + &mut row_builder, + ); + let val = match val { + Err(e) => { + err_session.give((e.into(), time, diff)); + return 1; + } + Ok(Some(val)) => val.clone(), + Ok(None) => panic!("Row expected as no predicate was used"), + }; - err = err.concat(err_input); + ok_session.give(((key, val), time, diff)); + 1 + }, + ); // Render the reduce plan - self.render_reduce_plan(reduce_plan, ok, err, key_arity, mfp_after) - .leave_region(self.scope) + self.render_reduce_plan( + reduce_plan, + key_val_input.as_collection(), + err, + key_arity, + mfp_after, + ) + .leave_region(self.scope) }) }