35 changes: 19 additions & 16 deletions src/compute/src/render.rs
@@ -268,22 +268,25 @@ pub fn build_compute_dataflow(

// Note: For correctness, we require that sources only emit times advanced by
// `dataflow.as_of`. `persist_source` is documented to provide this guarantee.
let (mut ok_stream, err_stream, token) =
persist_source::persist_source::<DataflowErrorSer>(
inner,
*source_id,
Arc::clone(&compute_state.persist_clients),
&compute_state.txns_ctx,
import.desc.storage_metadata.clone(),
read_schema,
dataflow.as_of.clone(),
snapshot_mode,
until.clone(),
mfp.as_mut(),
compute_state.dataflow_max_inflight_bytes(),
start_signal.clone().into_send_future(),
ErrorHandler::Halt("compute_import"),
);
let (mut ok_stream, err_stream, token) = persist_source::persist_source::<
DataflowErrorSer,
CapacityContainerBuilder<_>,
CapacityContainerBuilder<_>,
>(
inner,
*source_id,
Arc::clone(&compute_state.persist_clients),
&compute_state.txns_ctx,
import.desc.storage_metadata.clone(),
read_schema,
dataflow.as_of.clone(),
snapshot_mode,
until.clone(),
mfp.as_mut(),
compute_state.dataflow_max_inflight_bytes(),
start_signal.clone().into_send_future(),
ErrorHandler::Halt("compute_import"),
);

// If `mfp` is non-identity, we need to apply what remains.
// For the moment, assert that it is either trivial or `None`.
35 changes: 19 additions & 16 deletions src/compute/src/sink/materialized_view.rs
@@ -408,22 +408,25 @@ pub(super) fn persist_source<'s>(
let until = Antichain::new();
let map_filter_project = None;

let (ok_stream, err_stream, token) =
mz_storage_operators::persist_source::persist_source::<DataflowErrorSer>(
scope,
sink_id,
Arc::clone(&compute_state.persist_clients),
&compute_state.txns_ctx,
target,
None,
as_of,
SnapshotMode::Include,
until,
map_filter_project,
compute_state.dataflow_max_inflight_bytes(),
start_signal.into_send_future(),
ErrorHandler::Halt("compute persist sink"),
);
let (ok_stream, err_stream, token) = mz_storage_operators::persist_source::persist_source::<
DataflowErrorSer,
CapacityContainerBuilder<_>,
CapacityContainerBuilder<_>,
>(
scope,
sink_id,
Arc::clone(&compute_state.persist_clients),
&compute_state.txns_ctx,
target,
None,
as_of,
SnapshotMode::Include,
until,
map_filter_project,
compute_state.dataflow_max_inflight_bytes(),
start_signal.into_send_future(),
ErrorHandler::Halt("compute persist sink"),
);

let streams = OkErr::new(ok_stream, err_stream);
(streams, token)
40 changes: 29 additions & 11 deletions src/storage-operators/src/persist_source.rs
@@ -45,22 +45,23 @@ use mz_storage_types::stats::RelationPartStats;
use mz_timely_util::builder_async::{
Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::operator::StreamExt as _;
use mz_timely_util::probe::ProbeNotify;
use mz_txn_wal::operator::{TxnsContext, txns_progress};
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::container::{CapacityContainerBuilder, PushInto};
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
use timely::dataflow::operators::{Capability, Leave, OkErr};
use timely::dataflow::operators::{Capability, Leave};
use timely::dataflow::operators::{CapabilitySet, ConnectLoop, Feedback};
use timely::dataflow::{Scope, Stream, StreamVec};
use timely::order::TotalOrder;
use timely::progress::Antichain;
use timely::progress::Timestamp as TimelyTimestamp;
use timely::progress::timestamp::PathSummary;
use timely::scheduling::Activator;
use timely::{ContainerBuilder, PartialOrder};
use tokio::sync::mpsc::UnboundedSender;
use tracing::{error, trace};

@@ -154,11 +155,14 @@ impl Subtime {
/// flow control upper by an amount that is related to the size of batches.
///
/// If no flow control is desired an empty stream whose frontier immediately advances
/// to the empty antichain can be used. An easy easy of creating such stream is by
to the empty antichain can be used. An easy way of creating such a stream is by
/// using [`timely::dataflow::operators::generic::operator::empty`].
///
/// The caller must specify container builders for row and error data (`DCB`, `ECB`),
/// which the persist source uses to produce finished batches of data.
///
/// [advanced by]: differential_dataflow::lattice::Lattice::advance_by
pub fn persist_source<'scope, E>(
pub fn persist_source<'scope, E, DCB, ECB>(
scope: Scope<'scope, mz_repr::Timestamp>,
source_id: GlobalId,
persist_clients: Arc<PersistClientCache>,
@@ -173,12 +177,14 @@
start_signal: impl Future<Output = ()> + Send + 'static,
error_handler: ErrorHandler,
) -> (
StreamVec<'scope, mz_repr::Timestamp, (Row, Timestamp, Diff)>,
StreamVec<'scope, mz_repr::Timestamp, (E, Timestamp, Diff)>,
Stream<'scope, Timestamp, DCB::Container>,
Stream<'scope, Timestamp, ECB::Container>,
Vec<PressOnDropButton>,
)
where
E: timely::ExchangeData + Ord + Clone + Debug + From<DataflowError> + From<EvalError>,
DCB: ContainerBuilder + PushInto<(Row, Timestamp, Diff)>,
ECB: ContainerBuilder + PushInto<(E, Timestamp, Diff)>,
{
let shard_metrics = persist_clients.shard_metrics(&metadata.data_shard, &source_id.to_string());

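For readers new to timely's container-builder machinery, here is a minimal standalone sketch of how the new `DCB`/`ECB` bounds compose (written against stock timely's `ContainerBuilder`/`PushInto` traits; it is not code from this PR): any builder implementing `PushInto` for the element type can stand in for `DCB`/`ECB`, and `CapacityContainerBuilder<Vec<_>>`, which the updated call sites pass, is the stock choice.

```rust
use timely::container::{CapacityContainerBuilder, ContainerBuilder, PushInto};

// Generic over any builder that accepts elements of type `D`, mirroring the
// `DCB: ContainerBuilder + PushInto<(Row, Timestamp, Diff)>` bound above.
fn push_all<CB, D>(builder: &mut CB, data: impl IntoIterator<Item = D>)
where
    CB: ContainerBuilder + PushInto<D>,
{
    for d in data {
        builder.push_into(d);
    }
}

fn main() {
    // `(String, u64, i64)` stands in for `(Row, Timestamp, Diff)` here.
    let mut builder: CapacityContainerBuilder<Vec<(String, u64, i64)>> = Default::default();
    push_all(&mut builder, [("data".to_string(), 1u64, 1i64)]);
    // `finish` flushes any partially filled container; call until it returns `None`.
    while let Some(container) = builder.finish() {
        println!("{container:?}");
    }
}
```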
@@ -279,10 +285,22 @@ where
None => (stream, vec![]),
};
tokens.extend(txns_tokens);
let (ok_stream, err_stream) = stream.ok_err(|(d, t, r)| match d {
Ok(row) => Ok((row, t.0, r)),
Err(err) => Err((err, t.0, r)),
});

let (ok_stream, err_stream) =
stream.unary_fallible::<DCB, ECB, _, _>(Pipeline, "persist ok err demux", |_, _| {
Box::new(|input, oks, errs| {
input.for_each(|time, data| {
let mut oks = oks.session_with_builder(&time);
let mut errs = errs.session_with_builder(&time);
for (d, t, r) in data.drain(..) {
match d {
Ok(row) => oks.give((row, t.0, r)),
Err(err) => errs.give((err, t.0, r)),
}
}
});
})
});
(ok_stream, err_stream, tokens)
}

7 changes: 6 additions & 1 deletion src/storage/src/render/sinks.rs
@@ -25,6 +25,7 @@ use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
use mz_timely_util::builder_async::PressOnDropButton;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::operators::Leave;
use timely::dataflow::{Scope, StreamVec};
use tracing::warn;
@@ -74,7 +75,11 @@ pub(crate) fn render_sink<'scope>(
let mut tokens = vec![];
let sink_render = get_sink_render_for(&sink.connection);

let (ok_collection, err_collection, persist_tokens) = persist_source::persist_source(
let (ok_collection, err_collection, persist_tokens) = persist_source::persist_source::<
DataflowError,
CapacityContainerBuilder<_>,
CapacityContainerBuilder<_>,
>(
scope,
sink.from,
Arc::clone(&storage_state.persist_clients),
2 changes: 1 addition & 1 deletion src/storage/src/sink/iceberg.rs
@@ -1045,7 +1045,7 @@ where
let hashed_id = sink_id.hashed();
let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
let (_, table_ready_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
let (output, output_stream) = builder.new_output();
let (output, output_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
let (batch_desc_output, batch_desc_stream) =
builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
let mut input =
2 changes: 1 addition & 1 deletion src/storage/src/upsert.rs
@@ -756,7 +756,7 @@ where
};
Some((UpsertKey::from_value(value_ref, &key_indices), value))
});
let (output_handle, output) = builder.new_output();
let (output_handle, output) = builder.new_output::<CapacityContainerBuilder<_>>();

// An output that just reports progress of the snapshot consolidation process upstream to the
// persist source to ensure that backpressure is applied
10 changes: 5 additions & 5 deletions src/timely-util/src/builder_async.rs
@@ -265,13 +265,13 @@ pub struct AsyncOutputHandle<T: Timestamp, CB: ContainerBuilder> {
index: usize,
}

impl<T, C> AsyncOutputHandle<T, CapacityContainerBuilder<C>>
impl<T, CB> AsyncOutputHandle<T, CB>
where
T: Timestamp,
C: Container + Clone + 'static,
CB: ContainerBuilder,
{
#[inline]
pub fn give_container(&self, cap: &Capability<T>, container: &mut C) {
pub fn give_container(&self, cap: &Capability<T>, container: &mut CB::Container) {
let mut inner = self.inner.borrow_mut();
inner.flush();
inner.output.give(cap, container);
@@ -499,7 +499,7 @@ impl<'scope, T: Timestamp> OperatorBuilder<'scope, T> {
pact: P,
) -> AsyncInputHandle<T, D, Disconnected>
where
D: Container + Clone + 'static,
D: Container,
P: ParallelizationContract<T, D>,
{
self.new_input_connection(stream, pact, Disconnected)
@@ -513,7 +513,7 @@ impl<'scope, T: Timestamp> OperatorBuilder<'scope, T> {
connection: C,
) -> AsyncInputHandle<T, D, C>
where
D: Container + Clone + 'static,
D: Container,
P: ParallelizationContract<T, D>,
C: InputConnection<T> + 'static,
{
35 changes: 35 additions & 0 deletions src/timely-util/src/containers.rs
@@ -15,4 +15,39 @@

//! Reusable containers.

use std::marker::PhantomData;

use timely::container::ContainerBuilder;

pub mod stack;

/// A no-op [`ContainerBuilder`] that never emits a container.
///
/// Useful for outputs that require a `ContainerBuilder` type parameter but only ever push whole
/// containers (e.g. through a `give_container`-style API), never individual elements. The builder
/// holds no state and returns `None` from both `extract` and `finish`; there is no [`PushInto`]
/// impl, so element-wise pushes are a compile error.
///
/// [`PushInto`]: timely::container::PushInto
#[derive(Debug, Copy, Clone)]
pub struct NoopContainerBuilder<C>(PhantomData<C>);

impl<C> Default for NoopContainerBuilder<C> {
fn default() -> Self {
Self(PhantomData)
}
}

impl<C> ContainerBuilder for NoopContainerBuilder<C> {
type Container = C;

#[inline]
fn extract(&mut self) -> Option<&mut Self::Container> {
None
}

#[inline]
fn finish(&mut self) -> Option<&mut Self::Container> {
None
}
}
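Since `NoopContainerBuilder` is easiest to understand by its (non-)behavior, here is a brief standalone check of the contract, plus an illustrative fragment patterned on `txns_progress_frontiers` below (the fragment is a sketch, not code from this PR):

```rust
use mz_timely_util::containers::NoopContainerBuilder;
use timely::container::ContainerBuilder;

fn main() {
    // The builder holds no state and never yields a container.
    let mut noop: NoopContainerBuilder<Vec<u64>> = Default::default();
    assert!(noop.extract().is_none());
    assert!(noop.finish().is_none());

    // In an operator, the payoff is an output that can only forward whole
    // containers; `give_container` works because the `builder_async` change
    // above lifts it to any `ContainerBuilder`:
    //
    //     let (output, stream) =
    //         builder.new_output::<NoopContainerBuilder<Vec<(Row, Timestamp, Diff)>>>();
    //     output.give_container(&cap, &mut data);   // ok: whole container
    //     output.give(&cap, (row, ts, diff));       // no `PushInto` impl: compile error
}
```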
23 changes: 11 additions & 12 deletions src/txn-wal/src/operator.rs
@@ -34,16 +34,17 @@ use mz_timely_util::builder_async::{
AsyncInputHandle, Event as AsyncEvent, InputConnection,
OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::containers::NoopContainerBuilder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::capture::Event;
use timely::dataflow::operators::vec::{Broadcast, Map};
use timely::dataflow::operators::{Capture, Leave, Probe};
use timely::dataflow::{ProbeHandle, Scope, StreamVec};
use timely::dataflow::{ProbeHandle, Scope, Stream, StreamVec};
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use timely::worker::Worker;
use timely::{PartialOrder, WorkerConfig};
use timely::{Container, PartialOrder, WorkerConfig};
use tracing::debug;

use crate::TxnsCodecDefault;
@@ -93,7 +94,7 @@ use crate::txn_read::{DataRemapEntry, TxnsRead};
/// on data from its input until the input has progressed to 10, at which
/// point it can itself downgrade to 10.
pub fn txns_progress<'scope, K, V, T, D, P, C, F>(
passthrough: StreamVec<'scope, T, P>,
passthrough: Stream<'scope, T, P>,
name: &str,
ctx: &TxnsContext,
client_fn: impl Fn() -> F,
@@ -103,13 +104,13 @@
until: Antichain<T>,
data_key_schema: Arc<K::Schema>,
data_val_schema: Arc<V::Schema>,
) -> (StreamVec<'scope, T, P>, Vec<PressOnDropButton>)
) -> (Stream<'scope, T, P>, Vec<PressOnDropButton>)
where
K: Debug + Codec + Send + Sync,
V: Debug + Codec + Send + Sync,
T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
D: Debug + Clone + 'static + Monoid + Ord + Codec64 + Send + Sync,
P: Debug + Clone + 'static,
P: Container,
C: TxnsCodec + 'static,
F: Future<Output = PersistClient> + Send + 'static,
{
@@ -170,7 +171,7 @@ where
V: Debug + Codec + Send + Sync,
T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
D: Debug + Clone + 'static + Monoid + Ord + Codec64 + Send + Sync,
P: Debug + Clone + 'static,
P: Container,
C: TxnsCodec + 'static,
{
let worker_idx = scope.index();
@@ -230,18 +231,18 @@ where

fn txns_progress_frontiers<'scope, K, V, T, D, P, C>(
remap: StreamVec<'scope, T, DataRemapEntry<T>>,
passthrough: StreamVec<'scope, T, P>,
passthrough: Stream<'scope, T, P>,
name: &str,
data_id: ShardId,
until: Antichain<T>,
unique_id: u64,
) -> (StreamVec<'scope, T, P>, PressOnDropButton)
) -> (Stream<'scope, T, P>, PressOnDropButton)
where
K: Debug + Codec,
V: Debug + Codec,
T: Timestamp + Lattice + TotalOrder + StepForward + Codec64,
D: Clone + 'static + Monoid + Codec64 + Send + Sync,
P: Debug + Clone + 'static,
P: Container,
C: TxnsCodec,
{
let name = format!("txns_progress_frontiers({})", name);
Expand All @@ -254,8 +255,7 @@ where
passthrough.scope().peers(),
data_id.to_string(),
);
let (passthrough_output, passthrough_stream) =
builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
let (passthrough_output, passthrough_stream) = builder.new_output::<NoopContainerBuilder<_>>();
let mut remap_input = builder.new_disconnected_input(remap, Pipeline);
let mut passthrough_input = builder.new_disconnected_input(passthrough, Pipeline);

@@ -312,7 +312,6 @@ where
// `shard_source` (before this operator) and
// `mfp_and_decode` (after this operator) do the necessary
// filtering.
debug!("{} emitting data {:?}", name, data);
passthrough_output.give_container(&cap, &mut data);
}
AsyncEvent::Progress(new_progress) => {
Expand Down