Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ impl PhysicalExtensionCodec for AdapterPreservingCodec {
buf: &[u8],
inputs: &[Arc<dyn ExecutionPlan>],
_ctx: &TaskContext,
_proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<Arc<dyn ExecutionPlan>> {
// Try to parse as our extension payload
if let Ok(payload) = serde_json::from_slice::<ExtensionPayload>(buf)
Expand Down Expand Up @@ -302,6 +303,7 @@ impl PhysicalExtensionCodec for AdapterPreservingCodec {
&self,
_node: Arc<dyn ExecutionPlan>,
_buf: &mut Vec<u8>,
_proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<()> {
// We don't need this for the example - we use serialize_physical_plan instead
not_impl_err!(
Expand Down
17 changes: 15 additions & 2 deletions datafusion-examples/examples/proto/composed_extension_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use datafusion::physical_plan::{DisplayAs, ExecutionPlan};
use datafusion::prelude::SessionContext;
use datafusion_proto::physical_plan::{
AsExecutionPlan, ComposedPhysicalExtensionCodec, PhysicalExtensionCodec,
PhysicalProtoConverterExtension,
};
use datafusion_proto::protobuf;

Expand Down Expand Up @@ -140,6 +141,7 @@ impl PhysicalExtensionCodec for ParentPhysicalExtensionCodec {
buf: &[u8],
inputs: &[Arc<dyn ExecutionPlan>],
_ctx: &TaskContext,
_proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<Arc<dyn ExecutionPlan>> {
if buf == "ParentExec".as_bytes() {
Ok(Arc::new(ParentExec {
Expand All @@ -150,7 +152,12 @@ impl PhysicalExtensionCodec for ParentPhysicalExtensionCodec {
}
}

fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
fn try_encode(
&self,
node: Arc<dyn ExecutionPlan>,
buf: &mut Vec<u8>,
_proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<()> {
if node.as_any().downcast_ref::<ParentExec>().is_some() {
buf.extend_from_slice("ParentExec".as_bytes());
Ok(())
Expand Down Expand Up @@ -216,6 +223,7 @@ impl PhysicalExtensionCodec for ChildPhysicalExtensionCodec {
buf: &[u8],
_inputs: &[Arc<dyn ExecutionPlan>],
_ctx: &TaskContext,
_proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<Arc<dyn ExecutionPlan>> {
if buf == "ChildExec".as_bytes() {
Ok(Arc::new(ChildExec {}))
Expand All @@ -224,7 +232,12 @@ impl PhysicalExtensionCodec for ChildPhysicalExtensionCodec {
}
}

fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
fn try_encode(
&self,
node: Arc<dyn ExecutionPlan>,
buf: &mut Vec<u8>,
_proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<()> {
if node.as_any().downcast_ref::<ChildExec>().is_some() {
buf.extend_from_slice("ChildExec".as_bytes());
Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ impl PhysicalExtensionCodec for CachingCodec {
_buf: &[u8],
_inputs: &[Arc<dyn ExecutionPlan>],
_ctx: &TaskContext,
_proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<Arc<dyn ExecutionPlan>> {
datafusion::common::not_impl_err!("No custom extension nodes")
}
Expand All @@ -196,6 +197,7 @@ impl PhysicalExtensionCodec for CachingCodec {
&self,
_node: Arc<dyn ExecutionPlan>,
_buf: &mut Vec<u8>,
_proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<()> {
datafusion::common::not_impl_err!("No custom extension nodes")
}
Expand Down
42 changes: 34 additions & 8 deletions datafusion/ffi/src/proto/physical_extension_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,16 @@ unsafe extern "C" fn try_decode_fn_wrapper(
.collect::<Result<Vec<_>>>();
let inputs = rresult_return!(inputs);

let plan =
rresult_return!(codec.try_decode(buf.as_ref(), &inputs, task_ctx.as_ref()));
// FFI boundary doesn't propagate the active PhysicalProtoConverterExtension,
// so dedup of nested expressions stops here (matches pre-#21807 behavior).
let proto_converter =
datafusion_proto::physical_plan::DefaultPhysicalProtoConverter {};
let plan = rresult_return!(codec.try_decode(
buf.as_ref(),
&inputs,
task_ctx.as_ref(),
&proto_converter,
));

RResult::ROk(FFI_ExecutionPlan::new(plan, None))
}
Expand All @@ -156,7 +164,11 @@ unsafe extern "C" fn try_encode_fn_wrapper(
let plan: Arc<dyn ExecutionPlan> = rresult_return!((&node).try_into());

let mut bytes = Vec::new();
rresult_return!(codec.try_encode(plan, &mut bytes));
// FFI boundary doesn't propagate the active PhysicalProtoConverterExtension,
// so dedup of nested expressions stops here (matches pre-#21807 behavior).
let proto_converter =
datafusion_proto::physical_plan::DefaultPhysicalProtoConverter {};
rresult_return!(codec.try_encode(plan, &mut bytes, &proto_converter));

RResult::ROk(bytes.into())
}
Expand Down Expand Up @@ -327,6 +339,7 @@ impl PhysicalExtensionCodec for ForeignPhysicalExtensionCodec {
buf: &[u8],
inputs: &[Arc<dyn ExecutionPlan>],
_ctx: &TaskContext,
_proto_converter: &dyn datafusion_proto::physical_plan::PhysicalProtoConverterExtension,
) -> Result<Arc<dyn ExecutionPlan>> {
let inputs = inputs
.iter()
Expand All @@ -340,7 +353,12 @@ impl PhysicalExtensionCodec for ForeignPhysicalExtensionCodec {
Ok(plan)
}

fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
fn try_encode(
&self,
node: Arc<dyn ExecutionPlan>,
buf: &mut Vec<u8>,
_proto_converter: &dyn datafusion_proto::physical_plan::PhysicalProtoConverterExtension,
) -> Result<()> {
let plan = FFI_ExecutionPlan::new(node, None);
let bytes = df_result!(unsafe { (self.0.try_encode)(&self.0, plan) })?;

Expand Down Expand Up @@ -441,6 +459,7 @@ pub(crate) mod tests {
buf: &[u8],
_inputs: &[Arc<dyn ExecutionPlan>],
_ctx: &TaskContext,
_proto_converter: &dyn datafusion_proto::physical_plan::PhysicalProtoConverterExtension,
) -> Result<Arc<dyn ExecutionPlan>> {
if buf[0] != Self::MAGIC_NUMBER {
return exec_err!(
Expand All @@ -459,6 +478,7 @@ pub(crate) mod tests {
&self,
node: Arc<dyn ExecutionPlan>,
buf: &mut Vec<u8>,
_proto_converter: &dyn datafusion_proto::physical_plan::PhysicalProtoConverterExtension,
) -> Result<()> {
buf.push(Self::MAGIC_NUMBER);

Expand Down Expand Up @@ -579,10 +599,16 @@ pub(crate) mod tests {
let exec = create_test_exec();
let input_execs = [create_test_exec()];
let mut bytes = Vec::new();
foreign_codec.try_encode(Arc::clone(&exec), &mut bytes)?;

let returned_exec =
foreign_codec.try_decode(&bytes, &input_execs, ctx.task_ctx().as_ref())?;
let proto_converter =
datafusion_proto::physical_plan::DefaultPhysicalProtoConverter {};
foreign_codec.try_encode(Arc::clone(&exec), &mut bytes, &proto_converter)?;

let returned_exec = foreign_codec.try_decode(
&bytes,
&input_execs,
ctx.task_ctx().as_ref(),
&proto_converter,
)?;

assert!(returned_exec.as_any().is::<EmptyExec>());

Expand Down
18 changes: 18 additions & 0 deletions datafusion/physical-expr-common/src/physical_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,24 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
fn placement(&self) -> ExpressionPlacement {
ExpressionPlacement::KeepInPlace
}

/// Stable, globally-unique identifier for this [`PhysicalExpr`], if any.
///
/// Connected expressions (e.g. `DynamicFilterPhysicalExpr`s where two
/// outer Arc instances share the same mutable inner state via
/// [`PhysicalExpr::with_new_children`]) must report the same id. Proto
/// (de)serialization uses this to dedupe across the wire so the
/// reconstructed plan keeps `Arc<Inner>` shared.
///
/// Default is `None`: no identity worth preserving across a
/// serialization boundary.
///
/// Ported from upstream apache/datafusion#21807 to make
/// `DynamicFilterPhysicalExpr` dedup robust to pushdown rewriting on
/// branch-53.
fn expression_id(&self) -> Option<u64> {
None
}
}

#[deprecated(
Expand Down
136 changes: 126 additions & 10 deletions datafusion/physical-expr/src/expressions/dynamic_filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use parking_lot::RwLock;
use std::sync::atomic::{AtomicU64, Ordering};
use std::{any::Any, fmt::Display, hash::Hash, sync::Arc};
use tokio::sync::watch;

Expand Down Expand Up @@ -76,21 +77,50 @@ pub struct DynamicFilterPhysicalExpr {
nullable: Arc<RwLock<Option<bool>>>,
}

#[derive(Debug)]
struct Inner {
/// A counter that gets incremented every time the expression is updated so that we can track changes cheaply.
/// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes.
generation: u64,
expr: Arc<dyn PhysicalExpr>,
/// Flag for quick synchronous check if filter is complete.
/// This is redundant with the watch channel state, but allows us to return immediately
/// from `wait_complete()` without subscribing if already complete.
is_complete: bool,
/// Atomic internal state of a [`DynamicFilterPhysicalExpr`].
///
/// `expression_id` identifies the actual filter expression. Derived
/// `DynamicFilterPhysicalExpr`s (e.g. via [`PhysicalExpr::with_new_children`])
/// are the same logical filter and must report the same id. Proto
/// (de)serialization uses this id to dedupe references on the wire so the
/// reconstructed plan keeps `Arc<Inner>` shared between e.g. SortExec.filter
/// and the FileScan predicate it was pushed down to.
///
/// **Warning:** exposed publicly only so that `datafusion-proto` can read
/// and rebuild this state. Not a stable API.
///
/// Ported from upstream apache/datafusion#21807 (minimal subset) so
/// branch-53 can drop the atlas-side post-deserialize walker.
#[derive(Clone)]
pub struct Inner {
/// Stable id, preserved across `update()` and `with_new_children`.
pub expression_id: u64,
/// Incremented on every `update()` so that [`PhysicalExpr::snapshot_generation`]
/// is a cheap "did anything change?" probe.
pub generation: u64,
pub expr: Arc<dyn PhysicalExpr>,
/// Synchronous check for completion; redundant with the watch channel
/// but lets `wait_complete()` return without subscribing.
pub is_complete: bool,
}

// Exclude `expression_id` from Debug so existing roundtrip Debug comparisons
// in tests still pass even when only some plan-node decoders carry the
// dynamic filter through proto.
impl std::fmt::Debug for Inner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Inner")
.field("generation", &self.generation)
.field("expr", &self.expr)
.field("is_complete", &self.is_complete)
.finish()
}
}

impl Inner {
fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
Self {
expression_id: EXPR_ID_SOURCE.next(),
// Start with generation 1 which gives us a different result for [`PhysicalExpr::generation`] than the default 0.
// This is not currently used anywhere but it seems useful to have this simple distinction.
generation: 1,
Expand All @@ -105,6 +135,26 @@ impl Inner {
}
}

/// An atomic counter used to generate monotonic u64 ids for
/// `DynamicFilterPhysicalExpr` instances.
struct ExpressionIdAtomicCounter {
inner: AtomicU64,
}

impl ExpressionIdAtomicCounter {
const fn new() -> Self {
Self {
inner: AtomicU64::new(0),
}
}

fn next(&self) -> u64 {
self.inner.fetch_add(1, Ordering::Relaxed)
}
}

static EXPR_ID_SOURCE: ExpressionIdAtomicCounter = ExpressionIdAtomicCounter::new();
Comment thread
zhuqi-lucas marked this conversation as resolved.

impl Hash for DynamicFilterPhysicalExpr {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
// Use pointer identity of the inner Arc for stable hashing.
Expand Down Expand Up @@ -243,6 +293,10 @@ impl DynamicFilterPhysicalExpr {
let mut current = self.inner.write();
let new_generation = current.generation + 1;
*current = Inner {
// Preserve the expression id across updates so all references
// (e.g. SortExec.filter + a pushed-down FileScan predicate)
// continue to identify the same logical filter.
expression_id: current.expression_id,
generation: new_generation,
expr: new_expr,
is_complete: current.is_complete,
Expand Down Expand Up @@ -346,6 +400,64 @@ impl DynamicFilterPhysicalExpr {

write!(f, " ]")
}

/// Return the filter's original children (before any remapping).
///
/// **Warning:** intended only for `datafusion-proto` (de)serialization.
/// Not a stable API.
pub fn original_children(&self) -> &[Arc<dyn PhysicalExpr>] {
&self.children
}

/// Return the filter's remapped children, if any have been set via
/// [`PhysicalExpr::with_new_children`].
///
/// **Warning:** intended only for `datafusion-proto` (de)serialization.
/// Not a stable API.
pub fn remapped_children(&self) -> Option<&[Arc<dyn PhysicalExpr>]> {
self.remapped_children.as_deref()
}

/// Clone the atomically-captured `Inner` state.
///
/// **Warning:** intended only for `datafusion-proto` (de)serialization.
/// Not a stable API.
pub fn inner(&self) -> Inner {
self.inner.read().clone()
}

/// Rebuild a `DynamicFilterPhysicalExpr` from its stored parts. Used by
/// proto deserialization to reconstruct the filter; when paired with the
/// existing `DeduplicatingDeserializer` it restores shared `Arc<Inner>`
/// identity across all references in the decoded plan.
///
/// **Warning:** intended only for `datafusion-proto` (de)serialization.
/// Not a stable API.
pub fn from_parts(
children: Vec<Arc<dyn PhysicalExpr>>,
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
inner: Inner,
) -> Self {
let state = if inner.is_complete {
FilterState::Complete {
generation: inner.generation,
}
} else {
FilterState::InProgress {
generation: inner.generation,
}
};
let (state_watch, _) = watch::channel(state);

Self {
children,
remapped_children,
inner: Arc::new(RwLock::new(inner)),
state_watch,
data_type: Arc::new(RwLock::new(None)),
nullable: Arc::new(RwLock::new(None)),
}
}
}

impl PhysicalExpr for DynamicFilterPhysicalExpr {
Expand Down Expand Up @@ -448,6 +560,10 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
// Return the current generation of the expression.
self.inner.read().generation
}

fn expression_id(&self) -> Option<u64> {
Some(self.inner.read().expression_id)
}
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub use cast::{CastExpr, cast};
pub use cast_column::CastColumnExpr;
pub use column::{Column, col, with_new_schema};
pub use datafusion_expr::utils::format_state_name;
pub use dynamic_filters::DynamicFilterPhysicalExpr;
pub use dynamic_filters::{DynamicFilterPhysicalExpr, Inner as DynamicFilterInner};
pub use in_list::{InListExpr, in_list};
pub use is_not_null::{IsNotNullExpr, is_not_null};
pub use is_null::{IsNullExpr, is_null};
Expand Down
Loading
Loading