diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs
index 35f96297de254..7b0797258ad2a 100644
--- a/src/compute/src/render.rs
+++ b/src/compute/src/render.rs
@@ -268,22 +268,25 @@ pub fn build_compute_dataflow(
                 // Note: For correctness, we require that sources only emit times advanced by
                 // `dataflow.as_of`. `persist_source` is documented to provide this guarantee.
-                let (mut ok_stream, err_stream, token) =
-                    persist_source::persist_source::<DataflowErrorSer>(
-                        inner,
-                        *source_id,
-                        Arc::clone(&compute_state.persist_clients),
-                        &compute_state.txns_ctx,
-                        import.desc.storage_metadata.clone(),
-                        read_schema,
-                        dataflow.as_of.clone(),
-                        snapshot_mode,
-                        until.clone(),
-                        mfp.as_mut(),
-                        compute_state.dataflow_max_inflight_bytes(),
-                        start_signal.clone().into_send_future(),
-                        ErrorHandler::Halt("compute_import"),
-                    );
+                let (mut ok_stream, err_stream, token) = persist_source::persist_source::<
+                    DataflowErrorSer,
+                    CapacityContainerBuilder<_>,
+                    CapacityContainerBuilder<_>,
+                >(
+                    inner,
+                    *source_id,
+                    Arc::clone(&compute_state.persist_clients),
+                    &compute_state.txns_ctx,
+                    import.desc.storage_metadata.clone(),
+                    read_schema,
+                    dataflow.as_of.clone(),
+                    snapshot_mode,
+                    until.clone(),
+                    mfp.as_mut(),
+                    compute_state.dataflow_max_inflight_bytes(),
+                    start_signal.clone().into_send_future(),
+                    ErrorHandler::Halt("compute_import"),
+                );
 
                 // If `mfp` is non-identity, we need to apply what remains.
                 // For the moment, assert that it is either trivial or `None`.
diff --git a/src/compute/src/sink/materialized_view.rs b/src/compute/src/sink/materialized_view.rs
index 1364dea7a19ea..5415b912de0be 100644
--- a/src/compute/src/sink/materialized_view.rs
+++ b/src/compute/src/sink/materialized_view.rs
@@ -408,22 +408,25 @@ pub(super) fn persist_source<'s>(
     let until = Antichain::new();
     let map_filter_project = None;
 
-    let (ok_stream, err_stream, token) =
-        mz_storage_operators::persist_source::persist_source::<DataflowErrorSer>(
-            scope,
-            sink_id,
-            Arc::clone(&compute_state.persist_clients),
-            &compute_state.txns_ctx,
-            target,
-            None,
-            as_of,
-            SnapshotMode::Include,
-            until,
-            map_filter_project,
-            compute_state.dataflow_max_inflight_bytes(),
-            start_signal.into_send_future(),
-            ErrorHandler::Halt("compute persist sink"),
-        );
+    let (ok_stream, err_stream, token) = mz_storage_operators::persist_source::persist_source::<
+        DataflowErrorSer,
+        CapacityContainerBuilder<_>,
+        CapacityContainerBuilder<_>,
+    >(
+        scope,
+        sink_id,
+        Arc::clone(&compute_state.persist_clients),
+        &compute_state.txns_ctx,
+        target,
+        None,
+        as_of,
+        SnapshotMode::Include,
+        until,
+        map_filter_project,
+        compute_state.dataflow_max_inflight_bytes(),
+        start_signal.into_send_future(),
+        ErrorHandler::Halt("compute persist sink"),
+    );
 
     let streams = OkErr::new(ok_stream, err_stream);
 
     (streams, token)
diff --git a/src/storage-operators/src/persist_source.rs b/src/storage-operators/src/persist_source.rs
index a4b8f87b6568d..fe831d8fbc887 100644
--- a/src/storage-operators/src/persist_source.rs
+++ b/src/storage-operators/src/persist_source.rs
@@ -45,15 +45,15 @@ use mz_storage_types::stats::RelationPartStats;
 use mz_timely_util::builder_async::{
     Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
 };
+use mz_timely_util::operator::StreamExt as _;
 use mz_timely_util::probe::ProbeNotify;
 use mz_txn_wal::operator::{TxnsContext, txns_progress};
 use serde::{Deserialize, Serialize};
-use timely::PartialOrder;
-use timely::container::CapacityContainerBuilder;
+use timely::container::{CapacityContainerBuilder, PushInto};
 use timely::dataflow::channels::pact::Pipeline;
 use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
 use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
-use timely::dataflow::operators::{Capability, Leave, OkErr};
+use timely::dataflow::operators::{Capability, Leave};
 use timely::dataflow::operators::{CapabilitySet, ConnectLoop, Feedback};
 use timely::dataflow::{Scope, Stream, StreamVec};
 use timely::order::TotalOrder;
@@ -61,6 +61,7 @@ use timely::progress::Antichain;
 use timely::progress::Timestamp as TimelyTimestamp;
 use timely::progress::timestamp::PathSummary;
 use timely::scheduling::Activator;
+use timely::{ContainerBuilder, PartialOrder};
 use tokio::sync::mpsc::UnboundedSender;
 use tracing::{error, trace};
 
@@ -154,11 +155,14 @@ impl Subtime {
 /// flow control upper by an amount that is related to the size of batches.
 ///
 /// If no flow control is desired an empty stream whose frontier immediately advances
-/// to the empty antichain can be used. An easy easy of creating such stream is by
+/// to the empty antichain can be used. An easy way of creating such a stream is by
 /// using [`timely::dataflow::operators::generic::operator::empty`].
 ///
+/// The caller must specify container builders for row and error data (`DCB`, `ECB`),
+/// which the persist source uses to produce finished batches of data.
+///
 /// [advanced by]: differential_dataflow::lattice::Lattice::advance_by
-pub fn persist_source<'scope, E>(
+pub fn persist_source<'scope, E, DCB, ECB>(
     scope: Scope<'scope, mz_repr::Timestamp>,
     source_id: GlobalId,
     persist_clients: Arc<PersistClientCache>,
@@ -173,12 +177,14 @@ pub fn persist_source<'scope, E>(
     start_signal: impl Future<Output = ()> + Send + 'static,
     error_handler: ErrorHandler,
 ) -> (
-    StreamVec<'scope, mz_repr::Timestamp, (Row, Timestamp, Diff)>,
-    StreamVec<'scope, mz_repr::Timestamp, (E, Timestamp, Diff)>,
+    Stream<'scope, Timestamp, DCB::Container>,
+    Stream<'scope, Timestamp, ECB::Container>,
     Vec<PressOnDropButton>,
 )
 where
     E: timely::ExchangeData + Ord + Clone + Debug + From + From,
+    DCB: ContainerBuilder + PushInto<(Row, Timestamp, Diff)>,
+    ECB: ContainerBuilder + PushInto<(E, Timestamp, Diff)>,
 {
     let shard_metrics =
         persist_clients.shard_metrics(&metadata.data_shard, &source_id.to_string());
@@ -279,10 +285,22 @@ where
         None => (stream, vec![]),
     };
     tokens.extend(txns_tokens);
-    let (ok_stream, err_stream) = stream.ok_err(|(d, t, r)| match d {
-        Ok(row) => Ok((row, t.0, r)),
-        Err(err) => Err((err, t.0, r)),
-    });
+
+    let (ok_stream, err_stream) =
+        stream.unary_fallible::<DCB, ECB, _, _>(Pipeline, "persist ok err demux", |_, _| {
+            Box::new(|input, oks, errs| {
+                input.for_each(|time, data| {
+                    let mut oks = oks.session_with_builder(&time);
+                    let mut errs = errs.session_with_builder(&time);
+                    for (d, t, r) in data.drain(..) {
+                        match d {
+                            Ok(row) => oks.give((row, t.0, r)),
+                            Err(err) => errs.give((err, t.0, r)),
+                        }
+                    }
+                });
+            })
+        });
 
     (ok_stream, err_stream, tokens)
 }
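For orientation, here is the demux introduced above reduced to plain Rust: a minimal, self-contained sketch, not part of the patch. `Session` and `demux` are hypothetical stand-ins for the sessions returned by `session_with_builder` and the closure passed to `unary_fallible`; in the real operator, the container builders `DCB` and `ECB` decide how the pushed updates are batched into containers.

    // Illustrative stand-ins only: `Session` and `demux` are hypothetical names,
    // not APIs of timely or mz_timely_util.
    struct Session<T>(Vec<T>);

    impl<T> Session<T> {
        fn give(&mut self, update: T) {
            self.0.push(update);
        }
    }

    // Mirrors the closure passed to `unary_fallible`: route each update by its
    // `Result` payload, leaving the timestamp and diff components untouched.
    fn demux<O, E, T, D>(
        input: &mut Vec<(Result<O, E>, T, D)>,
        oks: &mut Session<(O, T, D)>,
        errs: &mut Session<(E, T, D)>,
    ) {
        for (d, t, r) in input.drain(..) {
            match d {
                Ok(row) => oks.give((row, t, r)),
                Err(err) => errs.give((err, t, r)),
            }
        }
    }

    fn main() {
        let mut oks = Session(Vec::new());
        let mut errs = Session(Vec::new());
        let mut input = vec![(Ok::<u64, String>(7), 0u64, 1i64), (Err("oops".into()), 0, 1)];
        demux(&mut input, &mut oks, &mut errs);
        assert_eq!(oks.0.len(), 1);
        assert_eq!(errs.0.len(), 1);
    }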
diff --git a/src/storage/src/render/sinks.rs b/src/storage/src/render/sinks.rs
index e27212700e331..0a3165b214e50 100644
--- a/src/storage/src/render/sinks.rs
+++ b/src/storage/src/render/sinks.rs
@@ -25,6 +25,7 @@ use mz_storage_types::controller::CollectionMetadata;
 use mz_storage_types::errors::DataflowError;
 use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
 use mz_timely_util::builder_async::PressOnDropButton;
+use timely::container::CapacityContainerBuilder;
 use timely::dataflow::operators::Leave;
 use timely::dataflow::{Scope, StreamVec};
 use tracing::warn;
@@ -74,7 +75,11 @@ pub(crate) fn render_sink<'scope>(
     let mut tokens = vec![];
 
     let sink_render = get_sink_render_for(&sink.connection);
-    let (ok_collection, err_collection, persist_tokens) = persist_source::persist_source(
+    let (ok_collection, err_collection, persist_tokens) = persist_source::persist_source::<
+        DataflowError,
+        CapacityContainerBuilder<_>,
+        CapacityContainerBuilder<_>,
+    >(
         scope,
         sink.from,
         Arc::clone(&storage_state.persist_clients),
diff --git a/src/storage/src/sink/iceberg.rs b/src/storage/src/sink/iceberg.rs
index 0f70b674f94a9..bff9e8cc9e494 100644
--- a/src/storage/src/sink/iceberg.rs
+++ b/src/storage/src/sink/iceberg.rs
@@ -1045,7 +1045,7 @@ where
     let hashed_id = sink_id.hashed();
     let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
     let (_, table_ready_stream) = builder.new_output::>>();
-    let (output, output_stream) = builder.new_output();
+    let (output, output_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
     let (batch_desc_output, batch_desc_stream) = builder.new_output::>>();
 
     let mut input =
diff --git a/src/storage/src/upsert.rs b/src/storage/src/upsert.rs
index cdc583d76b119..026c6935be2ac 100644
--- a/src/storage/src/upsert.rs
+++ b/src/storage/src/upsert.rs
@@ -756,7 +756,7 @@ where
         };
         Some((UpsertKey::from_value(value_ref, &key_indices), value))
     });
-    let (output_handle, output) = builder.new_output();
+    let (output_handle, output) = builder.new_output::<CapacityContainerBuilder<_>>();
 
     // An output that just reports progress of the snapshot consolidation process upstream to the
     // persist source to ensure that backpressure is applied
diff --git a/src/timely-util/src/builder_async.rs b/src/timely-util/src/builder_async.rs
index 51eb84b39d8b1..4ca914803779c 100644
--- a/src/timely-util/src/builder_async.rs
+++ b/src/timely-util/src/builder_async.rs
@@ -265,13 +265,13 @@ pub struct AsyncOutputHandle<T: Timestamp, CB: ContainerBuilder> {
     index: usize,
 }
 
-impl<T, C> AsyncOutputHandle<T, CapacityContainerBuilder<C>>
+impl<T, CB> AsyncOutputHandle<T, CB>
 where
     T: Timestamp,
-    C: Container + Clone + 'static,
+    CB: ContainerBuilder,
 {
     #[inline]
-    pub fn give_container(&self, cap: &Capability<T>, container: &mut C) {
+    pub fn give_container(&self, cap: &Capability<T>, container: &mut CB::Container) {
         let mut inner = self.inner.borrow_mut();
         inner.flush();
         inner.output.give(cap, container);
@@ -499,7 +499,7 @@ impl<'scope, T: Timestamp> OperatorBuilder<'scope, T> {
         pact: P,
    ) -> AsyncInputHandle<T, D, Disconnected>
     where
-        D: Container + Clone + 'static,
+        D: Container,
         P: ParallelizationContract<T, D>,
     {
         self.new_input_connection(stream, pact, Disconnected)
     }
@@ -513,7 +513,7 @@
         connection: C,
     ) -> AsyncInputHandle<T, D, C>
     where
-        D: Container + Clone + 'static,
+        D: Container,
         P: ParallelizationContract<T, D>,
         C: InputConnection<T> + 'static,
     {
diff --git a/src/timely-util/src/containers.rs b/src/timely-util/src/containers.rs
index 00dc8b7aafeaa..9f45e53faa323 100644
--- a/src/timely-util/src/containers.rs
+++ b/src/timely-util/src/containers.rs
@@ -15,4 +15,39 @@
 
 //! Reusable containers.
 
+use std::marker::PhantomData;
+
+use timely::container::ContainerBuilder;
+
 pub mod stack;
+
+/// A no-op [`ContainerBuilder`] that never emits a container.
+///
+/// Useful for outputs that require a `ContainerBuilder` type parameter but only ever push whole
+/// containers (e.g. through a `give_container`-style API), never individual elements. The builder
+/// holds no state and returns `None` from both `extract` and `finish`; there is no [`PushInto`]
+/// impl, so element-wise pushes are a compile error.
+///
+/// [`PushInto`]: timely::container::PushInto
+#[derive(Debug, Copy, Clone)]
+pub struct NoopContainerBuilder<C>(PhantomData<C>);
+
+impl<C> Default for NoopContainerBuilder<C> {
+    fn default() -> Self {
+        Self(PhantomData)
+    }
+}
+
+impl<C: timely::Container + Clone + 'static> ContainerBuilder for NoopContainerBuilder<C> {
+    type Container = C;
+
+    #[inline]
+    fn extract(&mut self) -> Option<&mut Self::Container> {
+        None
+    }
+
+    #[inline]
+    fn finish(&mut self) -> Option<&mut Self::Container> {
+        None
+    }
+}
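To make the `NoopContainerBuilder` contract concrete, here is a minimal, self-contained sketch against a simplified stand-in for timely's `ContainerBuilder` trait (assumed here to be just `Default` plus `extract`/`finish`; the real trait carries additional supertraits and an associated-container bound, so this is illustrative only).

    use std::marker::PhantomData;

    // Simplified stand-in for timely's `ContainerBuilder` trait.
    trait ContainerBuilder: Default {
        type Container;
        fn extract(&mut self) -> Option<&mut Self::Container>;
        fn finish(&mut self) -> Option<&mut Self::Container>;
    }

    struct NoopContainerBuilder<C>(PhantomData<C>);

    impl<C> Default for NoopContainerBuilder<C> {
        fn default() -> Self {
            Self(PhantomData)
        }
    }

    impl<C> ContainerBuilder for NoopContainerBuilder<C> {
        type Container = C;
        // There is never buffered state to hand out, and never anything to flush.
        fn extract(&mut self) -> Option<&mut C> {
            None
        }
        fn finish(&mut self) -> Option<&mut C> {
            None
        }
    }

    fn main() {
        let mut builder = NoopContainerBuilder::<Vec<u64>>::default();
        assert!(builder.extract().is_none());
        assert!(builder.finish().is_none());
        // Element-wise pushes are deliberately impossible: with no `PushInto`
        // impl, an output typed with this builder can only be fed whole
        // containers, as `txns_progress_frontiers` below does via `give_container`.
    }

This is also why the txn-wal change below can relax `P: Debug + Clone + 'static` to `P: Container`: a pure pass-through output never builds containers element by element, so a buffering builder like `CapacityContainerBuilder<Vec<P>>` would only reintroduce element-level bounds and an unused buffer.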
diff --git a/src/txn-wal/src/operator.rs b/src/txn-wal/src/operator.rs
index c6a4d8db4eb4d..76bb6de83a276 100644
--- a/src/txn-wal/src/operator.rs
+++ b/src/txn-wal/src/operator.rs
@@ -34,16 +34,17 @@ use mz_timely_util::builder_async::{
     AsyncInputHandle, Event as AsyncEvent, InputConnection,
     OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
 };
+use mz_timely_util::containers::NoopContainerBuilder;
 use timely::container::CapacityContainerBuilder;
 use timely::dataflow::channels::pact::Pipeline;
 use timely::dataflow::operators::capture::Event;
 use timely::dataflow::operators::vec::{Broadcast, Map};
 use timely::dataflow::operators::{Capture, Leave, Probe};
-use timely::dataflow::{ProbeHandle, Scope, StreamVec};
+use timely::dataflow::{ProbeHandle, Scope, Stream, StreamVec};
 use timely::order::TotalOrder;
 use timely::progress::{Antichain, Timestamp};
 use timely::worker::Worker;
-use timely::{PartialOrder, WorkerConfig};
+use timely::{Container, PartialOrder, WorkerConfig};
 use tracing::debug;
 
 use crate::TxnsCodecDefault;
@@ -93,7 +94,7 @@ use crate::txn_read::{DataRemapEntry, TxnsRead};
 /// on data from its input until the input has progressed to 10, at which
 /// point it can itself downgrade to 10.
 pub fn txns_progress<'scope, K, V, T, D, P, C, F>(
-    passthrough: StreamVec<'scope, T, P>,
+    passthrough: Stream<'scope, T, P>,
     name: &str,
     ctx: &TxnsContext,
     client_fn: impl Fn() -> F,
@@ -103,13 +104,13 @@ pub fn txns_progress<'scope, K, V, T, D, P, C, F>(
     until: Antichain<T>,
     data_key_schema: Arc<K::Schema>,
     data_val_schema: Arc<V::Schema>,
-) -> (StreamVec<'scope, T, P>, Vec<PressOnDropButton>)
+) -> (Stream<'scope, T, P>, Vec<PressOnDropButton>)
 where
     K: Debug + Codec + Send + Sync,
     V: Debug + Codec + Send + Sync,
     T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
     D: Debug + Clone + 'static + Monoid + Ord + Codec64 + Send + Sync,
-    P: Debug + Clone + 'static,
+    P: Container,
     C: TxnsCodec + 'static,
     F: Future<Output = PersistClient> + Send + 'static,
 {
@@ -170,7 +171,7 @@ where
     V: Debug + Codec + Send + Sync,
     T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
     D: Debug + Clone + 'static + Monoid + Ord + Codec64 + Send + Sync,
-    P: Debug + Clone + 'static,
+    P: Container,
     C: TxnsCodec + 'static,
 {
     let worker_idx = scope.index();
@@ -230,18 +231,18 @@ where
 fn txns_progress_frontiers<'scope, K, V, T, D, P, C>(
     remap: StreamVec<'scope, T, DataRemapEntry<T>>,
-    passthrough: StreamVec<'scope, T, P>,
+    passthrough: Stream<'scope, T, P>,
     name: &str,
     data_id: ShardId,
     until: Antichain<T>,
     unique_id: u64,
-) -> (StreamVec<'scope, T, P>, PressOnDropButton)
+) -> (Stream<'scope, T, P>, PressOnDropButton)
 where
     K: Debug + Codec,
     V: Debug + Codec,
     T: Timestamp + Lattice + TotalOrder + StepForward + Codec64,
     D: Clone + 'static + Monoid + Codec64 + Send + Sync,
-    P: Debug + Clone + 'static,
+    P: Container,
     C: TxnsCodec,
 {
     let name = format!("txns_progress_frontiers({})", name);
@@ -254,8 +255,7 @@ where
         passthrough.scope().peers(),
         data_id.to_string(),
     );
-    let (passthrough_output, passthrough_stream) =
-        builder.new_output::<CapacityContainerBuilder<Vec<P>>>();
+    let (passthrough_output, passthrough_stream) = builder.new_output::<NoopContainerBuilder<P>>();
     let mut remap_input = builder.new_disconnected_input(remap, Pipeline);
     let mut passthrough_input = builder.new_disconnected_input(passthrough, Pipeline);
 
@@ -312,7 +312,6 @@ where
                         // `shard_source` (before this operator) and
                         // `mfp_and_decode` (after this operator) do the necessary
                         // filtering.
-                        debug!("{} emitting data {:?}", name, data);
                         passthrough_output.give_container(&cap, &mut data);
                     }
                     AsyncEvent::Progress(new_progress) => {
diff --git a/test/sqllogictest/introspection/relations.slt b/test/sqllogictest/introspection/relations.slt
index 81494faa4000a..9149e29adaf6b 100644
--- a/test/sqllogictest/introspection/relations.slt
+++ b/test/sqllogictest/introspection/relations.slt
@@ -62,23 +62,23 @@
 FlatMap Exchange alloc::vec::Vec<(u64,␠mz_txn_wal::txn_read::DataRemapEntry<mz_repr::timestamp::Timestamp>)>
 FlatMap txns_progress_frontiers(u1) alloc::vec::Vec<mz_txn_wal::txn_read::DataRemapEntry<mz_repr::timestamp::Timestamp>>
 FormArrangementKey ArrangeBy[[Column(0,␠"a"),␠Column(1,␠"b")]] mz_timely_util::columnar::Column<((mz_repr::row::Row,␠mz_repr::row::Row),␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing<i64>)>
 FormArrangementKey Concatenate alloc::vec::Vec<(mz_compute::render::errors::DataflowErrorSer,␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing<i64>)>
-InputRegion:␠materialize.public.test_primary_idx BuildRegion:␠materialize.public.test_primary_idx alloc::vec::Vec<(mz_repr::row::Row,␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing<i64>)>
 InputRegion:␠materialize.public.test_primary_idx BuildRegion:␠materialize.public.test_primary_idx alloc::vec::Vec<(mz_compute::render::errors::DataflowErrorSer,␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing<i64>)>
+InputRegion:␠materialize.public.test_primary_idx BuildRegion:␠materialize.public.test_primary_idx alloc::vec::Vec<(mz_repr::row::Row,␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing<i64>)>
 LimitProgress(Dataflow:␠materialize.public.test_primary_idx) Probe alloc::vec::Vec<(mz_repr::row::Row,␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing<i64>)>
 LogOperatorHydration␠(1) FormArrangementKey alloc::vec::Vec<(mz_repr::row::Row,␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing<i64>)>
-OkErr SuppressEarlyProgress alloc::vec::Vec<(mz_repr::row::Row,␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing<i64>)>
 SuppressEarlyProgress LimitProgress(Dataflow:␠materialize.public.test_primary_idx) alloc::vec::Vec<(mz_repr::row::Row,␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing<i64>)>
 decode_backpressure_probe(u1) Feedback alloc::vec::Vec
 expire_stream_at(materialize.public.test_primary_idx_export_index_errs) LogDataflowErrorsStream alloc::vec::Vec)>>>>
 expire_stream_at(materialize.public.test_primary_idx_export_index_oks) InspectBatch alloc::vec::Vec)>>>>
 granular_backpressure(u1) shard_source_descs_return(u1) alloc::vec::Vec
 granular_backpressure(u1) txns_progress_frontiers(u1) alloc::vec::Vec<(core::result::Result<mz_repr::row::Row,␠mz_storage_types::errors::DataflowError>,␠(mz_repr::timestamp::Timestamp,␠mz_storage_operators::persist_source::Subtime),␠mz_ore::overflowing::Overflowing<i64>)>
+persist␠ok␠err␠demux SuppressEarlyProgress alloc::vec::Vec<(mz_repr::row::Row,␠mz_repr::timestamp::Timestamp,␠mz_ore::overflowing::Overflowing<i64>)>
 persist_source::decode_and_mfp(u1) InspectBatch alloc::vec::Vec<(core::result::Result<mz_repr::row::Row,␠mz_storage_types::errors::DataflowError>,␠(mz_repr::timestamp::Timestamp,␠mz_storage_operators::persist_source::Subtime),␠mz_ore::overflowing::Overflowing<i64>)>
 persist_source_backpressure(backpressure(u1)) shard_source_fetch(u1) alloc::vec::Vec<(usize,␠mz_persist_client::fetch::ExchangeableBatchPart<mz_repr::timestamp::Timestamp>)>
 shard_source_descs(u1) granular_backpressure(u1) alloc::vec::Vec<(usize,␠mz_persist_client::fetch::ExchangeableBatchPart<mz_repr::timestamp::Timestamp>)>
 shard_source_fetch(u1) Feedback alloc::vec::Vec
 shard_source_fetch(u1) persist_source::decode_and_mfp(u1) alloc::vec::Vec>
-txns_progress_frontiers(u1) OkErr alloc::vec::Vec<(core::result::Result<mz_repr::row::Row,␠mz_storage_types::errors::DataflowError>,␠(mz_repr::timestamp::Timestamp,␠mz_storage_operators::persist_source::Subtime),␠mz_ore::overflowing::Overflowing<i64>)>
+txns_progress_frontiers(u1) persist␠ok␠err␠demux alloc::vec::Vec<(core::result::Result<mz_repr::row::Row,␠mz_storage_types::errors::DataflowError>,␠(mz_repr::timestamp::Timestamp,␠mz_storage_operators::persist_source::Subtime),␠mz_ore::overflowing::Overflowing<i64>)>
 txns_progress_source(u1) FlatMap alloc::vec::Vec<mz_txn_wal::txn_read::DataRemapEntry<mz_repr::timestamp::Timestamp>>
 
 query IT rowsort