diff --git a/datafusion-examples/examples/custom_data_source/adapter_serialization.rs b/datafusion-examples/examples/custom_data_source/adapter_serialization.rs index a2cd187fee067..80a10ecf65f13 100644 --- a/datafusion-examples/examples/custom_data_source/adapter_serialization.rs +++ b/datafusion-examples/examples/custom_data_source/adapter_serialization.rs @@ -274,6 +274,7 @@ impl PhysicalExtensionCodec for AdapterPreservingCodec { buf: &[u8], inputs: &[Arc], _ctx: &TaskContext, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { // Try to parse as our extension payload if let Ok(payload) = serde_json::from_slice::(buf) @@ -302,6 +303,7 @@ impl PhysicalExtensionCodec for AdapterPreservingCodec { &self, _node: Arc, _buf: &mut Vec, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result<()> { // We don't need this for the example - we use serialize_physical_plan instead not_impl_err!( diff --git a/datafusion-examples/examples/proto/composed_extension_codec.rs b/datafusion-examples/examples/proto/composed_extension_codec.rs index b4f3d4f098996..cd577d209d471 100644 --- a/datafusion-examples/examples/proto/composed_extension_codec.rs +++ b/datafusion-examples/examples/proto/composed_extension_codec.rs @@ -43,6 +43,7 @@ use datafusion::physical_plan::{DisplayAs, ExecutionPlan}; use datafusion::prelude::SessionContext; use datafusion_proto::physical_plan::{ AsExecutionPlan, ComposedPhysicalExtensionCodec, PhysicalExtensionCodec, + PhysicalProtoConverterExtension, }; use datafusion_proto::protobuf; @@ -140,6 +141,7 @@ impl PhysicalExtensionCodec for ParentPhysicalExtensionCodec { buf: &[u8], inputs: &[Arc], _ctx: &TaskContext, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { if buf == "ParentExec".as_bytes() { Ok(Arc::new(ParentExec { @@ -150,7 +152,12 @@ impl PhysicalExtensionCodec for ParentPhysicalExtensionCodec { } } - fn try_encode(&self, node: Arc, buf: &mut Vec) -> Result<()> { + fn try_encode( + &self, + node: Arc, + buf: &mut Vec, + _proto_converter: &dyn PhysicalProtoConverterExtension, + ) -> Result<()> { if node.as_any().downcast_ref::().is_some() { buf.extend_from_slice("ParentExec".as_bytes()); Ok(()) @@ -216,6 +223,7 @@ impl PhysicalExtensionCodec for ChildPhysicalExtensionCodec { buf: &[u8], _inputs: &[Arc], _ctx: &TaskContext, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { if buf == "ChildExec".as_bytes() { Ok(Arc::new(ChildExec {})) @@ -224,7 +232,12 @@ impl PhysicalExtensionCodec for ChildPhysicalExtensionCodec { } } - fn try_encode(&self, node: Arc, buf: &mut Vec) -> Result<()> { + fn try_encode( + &self, + node: Arc, + buf: &mut Vec, + _proto_converter: &dyn PhysicalProtoConverterExtension, + ) -> Result<()> { if node.as_any().downcast_ref::().is_some() { buf.extend_from_slice("ChildExec".as_bytes()); Ok(()) diff --git a/datafusion-examples/examples/proto/expression_deduplication.rs b/datafusion-examples/examples/proto/expression_deduplication.rs index 0dec807f8043a..a591f41d8682a 100644 --- a/datafusion-examples/examples/proto/expression_deduplication.rs +++ b/datafusion-examples/examples/proto/expression_deduplication.rs @@ -187,6 +187,7 @@ impl PhysicalExtensionCodec for CachingCodec { _buf: &[u8], _inputs: &[Arc], _ctx: &TaskContext, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { datafusion::common::not_impl_err!("No custom extension nodes") } @@ -196,6 +197,7 @@ impl PhysicalExtensionCodec for CachingCodec { &self, _node: Arc, _buf: &mut Vec, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result<()> { datafusion::common::not_impl_err!("No custom extension nodes") } diff --git a/datafusion/ffi/src/proto/physical_extension_codec.rs b/datafusion/ffi/src/proto/physical_extension_codec.rs index 0577e72366478..80565562e292c 100644 --- a/datafusion/ffi/src/proto/physical_extension_codec.rs +++ b/datafusion/ffi/src/proto/physical_extension_codec.rs @@ -141,8 +141,16 @@ unsafe extern "C" fn try_decode_fn_wrapper( .collect::>>(); let inputs = rresult_return!(inputs); - let plan = - rresult_return!(codec.try_decode(buf.as_ref(), &inputs, task_ctx.as_ref())); + // FFI boundary doesn't propagate the active PhysicalProtoConverterExtension, + // so dedup of nested expressions stops here (matches pre-#21807 behavior). + let proto_converter = + datafusion_proto::physical_plan::DefaultPhysicalProtoConverter {}; + let plan = rresult_return!(codec.try_decode( + buf.as_ref(), + &inputs, + task_ctx.as_ref(), + &proto_converter, + )); RResult::ROk(FFI_ExecutionPlan::new(plan, None)) } @@ -156,7 +164,11 @@ unsafe extern "C" fn try_encode_fn_wrapper( let plan: Arc = rresult_return!((&node).try_into()); let mut bytes = Vec::new(); - rresult_return!(codec.try_encode(plan, &mut bytes)); + // FFI boundary doesn't propagate the active PhysicalProtoConverterExtension, + // so dedup of nested expressions stops here (matches pre-#21807 behavior). + let proto_converter = + datafusion_proto::physical_plan::DefaultPhysicalProtoConverter {}; + rresult_return!(codec.try_encode(plan, &mut bytes, &proto_converter)); RResult::ROk(bytes.into()) } @@ -327,6 +339,7 @@ impl PhysicalExtensionCodec for ForeignPhysicalExtensionCodec { buf: &[u8], inputs: &[Arc], _ctx: &TaskContext, + _proto_converter: &dyn datafusion_proto::physical_plan::PhysicalProtoConverterExtension, ) -> Result> { let inputs = inputs .iter() @@ -340,7 +353,12 @@ impl PhysicalExtensionCodec for ForeignPhysicalExtensionCodec { Ok(plan) } - fn try_encode(&self, node: Arc, buf: &mut Vec) -> Result<()> { + fn try_encode( + &self, + node: Arc, + buf: &mut Vec, + _proto_converter: &dyn datafusion_proto::physical_plan::PhysicalProtoConverterExtension, + ) -> Result<()> { let plan = FFI_ExecutionPlan::new(node, None); let bytes = df_result!(unsafe { (self.0.try_encode)(&self.0, plan) })?; @@ -441,6 +459,7 @@ pub(crate) mod tests { buf: &[u8], _inputs: &[Arc], _ctx: &TaskContext, + _proto_converter: &dyn datafusion_proto::physical_plan::PhysicalProtoConverterExtension, ) -> Result> { if buf[0] != Self::MAGIC_NUMBER { return exec_err!( @@ -459,6 +478,7 @@ pub(crate) mod tests { &self, node: Arc, buf: &mut Vec, + _proto_converter: &dyn datafusion_proto::physical_plan::PhysicalProtoConverterExtension, ) -> Result<()> { buf.push(Self::MAGIC_NUMBER); @@ -579,10 +599,16 @@ pub(crate) mod tests { let exec = create_test_exec(); let input_execs = [create_test_exec()]; let mut bytes = Vec::new(); - foreign_codec.try_encode(Arc::clone(&exec), &mut bytes)?; - - let returned_exec = - foreign_codec.try_decode(&bytes, &input_execs, ctx.task_ctx().as_ref())?; + let proto_converter = + datafusion_proto::physical_plan::DefaultPhysicalProtoConverter {}; + foreign_codec.try_encode(Arc::clone(&exec), &mut bytes, &proto_converter)?; + + let returned_exec = foreign_codec.try_decode( + &bytes, + &input_execs, + ctx.task_ctx().as_ref(), + &proto_converter, + )?; assert!(returned_exec.as_any().is::()); diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 7107b0a9004d3..a760409038cf4 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -441,6 +441,24 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { fn placement(&self) -> ExpressionPlacement { ExpressionPlacement::KeepInPlace } + + /// Stable, globally-unique identifier for this [`PhysicalExpr`], if any. + /// + /// Connected expressions (e.g. `DynamicFilterPhysicalExpr`s where two + /// outer Arc instances share the same mutable inner state via + /// [`PhysicalExpr::with_new_children`]) must report the same id. Proto + /// (de)serialization uses this to dedupe across the wire so the + /// reconstructed plan keeps `Arc` shared. + /// + /// Default is `None`: no identity worth preserving across a + /// serialization boundary. + /// + /// Ported from upstream apache/datafusion#21807 to make + /// `DynamicFilterPhysicalExpr` dedup robust to pushdown rewriting on + /// branch-53. + fn expression_id(&self) -> Option { + None + } } #[deprecated( diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index d285f8b377eca..e7e183d7ff778 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -16,6 +16,7 @@ // under the License. use parking_lot::RwLock; +use std::sync::atomic::{AtomicU64, Ordering}; use std::{any::Any, fmt::Display, hash::Hash, sync::Arc}; use tokio::sync::watch; @@ -76,21 +77,50 @@ pub struct DynamicFilterPhysicalExpr { nullable: Arc>>, } -#[derive(Debug)] -struct Inner { - /// A counter that gets incremented every time the expression is updated so that we can track changes cheaply. - /// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes. - generation: u64, - expr: Arc, - /// Flag for quick synchronous check if filter is complete. - /// This is redundant with the watch channel state, but allows us to return immediately - /// from `wait_complete()` without subscribing if already complete. - is_complete: bool, +/// Atomic internal state of a [`DynamicFilterPhysicalExpr`]. +/// +/// `expression_id` identifies the actual filter expression. Derived +/// `DynamicFilterPhysicalExpr`s (e.g. via [`PhysicalExpr::with_new_children`]) +/// are the same logical filter and must report the same id. Proto +/// (de)serialization uses this id to dedupe references on the wire so the +/// reconstructed plan keeps `Arc` shared between e.g. SortExec.filter +/// and the FileScan predicate it was pushed down to. +/// +/// **Warning:** exposed publicly only so that `datafusion-proto` can read +/// and rebuild this state. Not a stable API. +/// +/// Ported from upstream apache/datafusion#21807 (minimal subset) so +/// branch-53 can drop the atlas-side post-deserialize walker. +#[derive(Clone)] +pub struct Inner { + /// Stable id, preserved across `update()` and `with_new_children`. + pub expression_id: u64, + /// Incremented on every `update()` so that [`PhysicalExpr::snapshot_generation`] + /// is a cheap "did anything change?" probe. + pub generation: u64, + pub expr: Arc, + /// Synchronous check for completion; redundant with the watch channel + /// but lets `wait_complete()` return without subscribing. + pub is_complete: bool, +} + +// Exclude `expression_id` from Debug so existing roundtrip Debug comparisons +// in tests still pass even when only some plan-node decoders carry the +// dynamic filter through proto. +impl std::fmt::Debug for Inner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Inner") + .field("generation", &self.generation) + .field("expr", &self.expr) + .field("is_complete", &self.is_complete) + .finish() + } } impl Inner { fn new(expr: Arc) -> Self { Self { + expression_id: EXPR_ID_SOURCE.next(), // Start with generation 1 which gives us a different result for [`PhysicalExpr::generation`] than the default 0. // This is not currently used anywhere but it seems useful to have this simple distinction. generation: 1, @@ -105,6 +135,26 @@ impl Inner { } } +/// An atomic counter used to generate monotonic u64 ids for +/// `DynamicFilterPhysicalExpr` instances. +struct ExpressionIdAtomicCounter { + inner: AtomicU64, +} + +impl ExpressionIdAtomicCounter { + const fn new() -> Self { + Self { + inner: AtomicU64::new(0), + } + } + + fn next(&self) -> u64 { + self.inner.fetch_add(1, Ordering::Relaxed) + } +} + +static EXPR_ID_SOURCE: ExpressionIdAtomicCounter = ExpressionIdAtomicCounter::new(); + impl Hash for DynamicFilterPhysicalExpr { fn hash(&self, state: &mut H) { // Use pointer identity of the inner Arc for stable hashing. @@ -243,6 +293,10 @@ impl DynamicFilterPhysicalExpr { let mut current = self.inner.write(); let new_generation = current.generation + 1; *current = Inner { + // Preserve the expression id across updates so all references + // (e.g. SortExec.filter + a pushed-down FileScan predicate) + // continue to identify the same logical filter. + expression_id: current.expression_id, generation: new_generation, expr: new_expr, is_complete: current.is_complete, @@ -346,6 +400,64 @@ impl DynamicFilterPhysicalExpr { write!(f, " ]") } + + /// Return the filter's original children (before any remapping). + /// + /// **Warning:** intended only for `datafusion-proto` (de)serialization. + /// Not a stable API. + pub fn original_children(&self) -> &[Arc] { + &self.children + } + + /// Return the filter's remapped children, if any have been set via + /// [`PhysicalExpr::with_new_children`]. + /// + /// **Warning:** intended only for `datafusion-proto` (de)serialization. + /// Not a stable API. + pub fn remapped_children(&self) -> Option<&[Arc]> { + self.remapped_children.as_deref() + } + + /// Clone the atomically-captured `Inner` state. + /// + /// **Warning:** intended only for `datafusion-proto` (de)serialization. + /// Not a stable API. + pub fn inner(&self) -> Inner { + self.inner.read().clone() + } + + /// Rebuild a `DynamicFilterPhysicalExpr` from its stored parts. Used by + /// proto deserialization to reconstruct the filter; when paired with the + /// existing `DeduplicatingDeserializer` it restores shared `Arc` + /// identity across all references in the decoded plan. + /// + /// **Warning:** intended only for `datafusion-proto` (de)serialization. + /// Not a stable API. + pub fn from_parts( + children: Vec>, + remapped_children: Option>>, + inner: Inner, + ) -> Self { + let state = if inner.is_complete { + FilterState::Complete { + generation: inner.generation, + } + } else { + FilterState::InProgress { + generation: inner.generation, + } + }; + let (state_watch, _) = watch::channel(state); + + Self { + children, + remapped_children, + inner: Arc::new(RwLock::new(inner)), + state_watch, + data_type: Arc::new(RwLock::new(None)), + nullable: Arc::new(RwLock::new(None)), + } + } } impl PhysicalExpr for DynamicFilterPhysicalExpr { @@ -448,6 +560,10 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { // Return the current generation of the expression. self.inner.read().generation } + + fn expression_id(&self) -> Option { + Some(self.inner.read().expression_id) + } } #[cfg(test)] diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index c9e02708d6c28..684874acc308e 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -45,7 +45,7 @@ pub use cast::{CastExpr, cast}; pub use cast_column::CastColumnExpr; pub use column::{Column, col, with_new_schema}; pub use datafusion_expr::utils::format_state_name; -pub use dynamic_filters::DynamicFilterPhysicalExpr; +pub use dynamic_filters::{DynamicFilterPhysicalExpr, Inner as DynamicFilterInner}; pub use in_list::{InListExpr, in_list}; pub use is_not_null::{IsNotNullExpr, is_not_null}; pub use is_null::{IsNullExpr, is_null}; diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 4ac501781db57..a61fd5c4f5f5a 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1036,6 +1036,33 @@ impl SortExec { self.fetch } + /// Returns the dynamic filter expression for this sort (TopK), if set. + /// + /// Used by `datafusion-proto` to roundtrip the SortExec's internal + /// `DynamicFilterPhysicalExpr` so the decode side can re-share its + /// `Arc` with the pushed-down FileScan predicate + /// (apache/datafusion#22011 minimal port for branch-53). + pub fn dynamic_filter_expr(&self) -> Option> { + self.filter.as_ref().map(|f| f.read().expr()) + } + + /// Install a `DynamicFilterPhysicalExpr` as this sort's TopK dyn filter, + /// validating that its children reference columns in the input schema. + /// + /// Used by proto deserialization to restore Arc identity across the wire; + /// see [`SortExec::dynamic_filter_expr`]. + pub fn with_dynamic_filter_expr( + mut self, + filter: Arc, + ) -> Result { + let input_schema = self.input.schema(); + for child in filter.children() { + child.data_type(&input_schema)?; + } + self.filter = Some(Arc::new(RwLock::new(TopKDynamicFilters::new(filter)))); + Ok(self) + } + fn output_partitioning_helper( input: &Arc, preserve_partitioning: bool, diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 37b31a84deab1..99c4205375468 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -897,9 +897,33 @@ message PhysicalExprNode { UnknownColumn unknown_column = 20; PhysicalHashExprNode hash_expr = 21; + + // Ported from apache/datafusion#21807 (minimal subset). Allows a + // DynamicFilterPhysicalExpr to roundtrip through proto with its wrapper + // intact, so the receiver (combined with DeduplicatingDeserializer) + // can rebuild a shared `Arc` across e.g. SortExec.filter and the + // FileScan predicate pushed down from it. + // + // Field 22 is intentionally skipped to match upstream's field number + // for a future scalar_subquery variant atlas doesn't need yet. + PhysicalDynamicFilterNode dynamic_filter = 23; } } +// Ported from apache/datafusion#21807. See `dynamic_filter = 23` above. +message PhysicalDynamicFilterNode { + repeated PhysicalExprNode children = 1; + repeated PhysicalExprNode remapped_children = 2; + uint64 generation = 3; + PhysicalExprNode inner_expr = 4; + bool is_complete = 5; + // Stable identity of this dynamic filter. References with the same id + // MUST deserialize to the same `Arc` so heap-max + // updates propagate from SortExec to the FileScan predicate it was + // pushed down to. + uint64 expression_id = 6; +} + message PhysicalScalarUdfNode { string name = 1; repeated PhysicalExprNode args = 2; @@ -1289,6 +1313,15 @@ message SortExecNode { // Maximum number of highest/lowest rows to fetch; negative means no limit int64 fetch = 3; bool preserve_partitioning = 4; + + // Ported from apache/datafusion#22011 (minimal subset). Carries the + // SortExec's internal `DynamicFilterPhysicalExpr` so the decode side + // can install it via `with_dynamic_filter_expr` instead of letting + // `with_fetch(...).create_filter()` mint a brand-new one. Combined with + // the #21807 PhysicalDynamicFilterNode + dedup-by-expression_id, the + // resulting decoded plan re-shares the `Arc` with the pushed-down + // FileScan predicate (X-2935 walker becomes redundant). + PhysicalExprNode dynamic_filter = 5; } message SortPreservingMergeExecNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 419105c40c792..2f2958d98487a 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -16244,6 +16244,194 @@ impl<'de> serde::Deserialize<'de> for PhysicalDateTimeIntervalExprNode { deserializer.deserialize_struct("datafusion.PhysicalDateTimeIntervalExprNode", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for PhysicalDynamicFilterNode { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if !self.children.is_empty() { + len += 1; + } + if !self.remapped_children.is_empty() { + len += 1; + } + if self.generation != 0 { + len += 1; + } + if self.inner_expr.is_some() { + len += 1; + } + if self.is_complete { + len += 1; + } + if self.expression_id != 0 { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalDynamicFilterNode", len)?; + if !self.children.is_empty() { + struct_ser.serialize_field("children", &self.children)?; + } + if !self.remapped_children.is_empty() { + struct_ser.serialize_field("remappedChildren", &self.remapped_children)?; + } + if self.generation != 0 { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("generation", ToString::to_string(&self.generation).as_str())?; + } + if let Some(v) = self.inner_expr.as_ref() { + struct_ser.serialize_field("innerExpr", v)?; + } + if self.is_complete { + struct_ser.serialize_field("isComplete", &self.is_complete)?; + } + if self.expression_id != 0 { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("expressionId", ToString::to_string(&self.expression_id).as_str())?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for PhysicalDynamicFilterNode { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "children", + "remapped_children", + "remappedChildren", + "generation", + "inner_expr", + "innerExpr", + "is_complete", + "isComplete", + "expression_id", + "expressionId", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Children, + RemappedChildren, + Generation, + InnerExpr, + IsComplete, + ExpressionId, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl serde::de::Visitor<'_> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "children" => Ok(GeneratedField::Children), + "remappedChildren" | "remapped_children" => Ok(GeneratedField::RemappedChildren), + "generation" => Ok(GeneratedField::Generation), + "innerExpr" | "inner_expr" => Ok(GeneratedField::InnerExpr), + "isComplete" | "is_complete" => Ok(GeneratedField::IsComplete), + "expressionId" | "expression_id" => Ok(GeneratedField::ExpressionId), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = PhysicalDynamicFilterNode; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.PhysicalDynamicFilterNode") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut children__ = None; + let mut remapped_children__ = None; + let mut generation__ = None; + let mut inner_expr__ = None; + let mut is_complete__ = None; + let mut expression_id__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Children => { + if children__.is_some() { + return Err(serde::de::Error::duplicate_field("children")); + } + children__ = Some(map_.next_value()?); + } + GeneratedField::RemappedChildren => { + if remapped_children__.is_some() { + return Err(serde::de::Error::duplicate_field("remappedChildren")); + } + remapped_children__ = Some(map_.next_value()?); + } + GeneratedField::Generation => { + if generation__.is_some() { + return Err(serde::de::Error::duplicate_field("generation")); + } + generation__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::InnerExpr => { + if inner_expr__.is_some() { + return Err(serde::de::Error::duplicate_field("innerExpr")); + } + inner_expr__ = map_.next_value()?; + } + GeneratedField::IsComplete => { + if is_complete__.is_some() { + return Err(serde::de::Error::duplicate_field("isComplete")); + } + is_complete__ = Some(map_.next_value()?); + } + GeneratedField::ExpressionId => { + if expression_id__.is_some() { + return Err(serde::de::Error::duplicate_field("expressionId")); + } + expression_id__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + } + } + Ok(PhysicalDynamicFilterNode { + children: children__.unwrap_or_default(), + remapped_children: remapped_children__.unwrap_or_default(), + generation: generation__.unwrap_or_default(), + inner_expr: inner_expr__, + is_complete: is_complete__.unwrap_or_default(), + expression_id: expression_id__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.PhysicalDynamicFilterNode", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for PhysicalExprNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -16323,6 +16511,9 @@ impl serde::Serialize for PhysicalExprNode { physical_expr_node::ExprType::HashExpr(v) => { struct_ser.serialize_field("hashExpr", v)?; } + physical_expr_node::ExprType::DynamicFilter(v) => { + struct_ser.serialize_field("dynamicFilter", v)?; + } } } struct_ser.end() @@ -16369,6 +16560,8 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { "unknownColumn", "hash_expr", "hashExpr", + "dynamic_filter", + "dynamicFilter", ]; #[allow(clippy::enum_variant_names)] @@ -16393,6 +16586,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { Extension, UnknownColumn, HashExpr, + DynamicFilter, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -16434,6 +16628,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { "extension" => Ok(GeneratedField::Extension), "unknownColumn" | "unknown_column" => Ok(GeneratedField::UnknownColumn), "hashExpr" | "hash_expr" => Ok(GeneratedField::HashExpr), + "dynamicFilter" | "dynamic_filter" => Ok(GeneratedField::DynamicFilter), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -16596,6 +16791,13 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { return Err(serde::de::Error::duplicate_field("hashExpr")); } expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::HashExpr) +; + } + GeneratedField::DynamicFilter => { + if expr_type__.is_some() { + return Err(serde::de::Error::duplicate_field("dynamicFilter")); + } + expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::DynamicFilter) ; } } @@ -21594,6 +21796,9 @@ impl serde::Serialize for SortExecNode { if self.preserve_partitioning { len += 1; } + if self.dynamic_filter.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.SortExecNode", len)?; if let Some(v) = self.input.as_ref() { struct_ser.serialize_field("input", v)?; @@ -21609,6 +21814,9 @@ impl serde::Serialize for SortExecNode { if self.preserve_partitioning { struct_ser.serialize_field("preservePartitioning", &self.preserve_partitioning)?; } + if let Some(v) = self.dynamic_filter.as_ref() { + struct_ser.serialize_field("dynamicFilter", v)?; + } struct_ser.end() } } @@ -21624,6 +21832,8 @@ impl<'de> serde::Deserialize<'de> for SortExecNode { "fetch", "preserve_partitioning", "preservePartitioning", + "dynamic_filter", + "dynamicFilter", ]; #[allow(clippy::enum_variant_names)] @@ -21632,6 +21842,7 @@ impl<'de> serde::Deserialize<'de> for SortExecNode { Expr, Fetch, PreservePartitioning, + DynamicFilter, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -21657,6 +21868,7 @@ impl<'de> serde::Deserialize<'de> for SortExecNode { "expr" => Ok(GeneratedField::Expr), "fetch" => Ok(GeneratedField::Fetch), "preservePartitioning" | "preserve_partitioning" => Ok(GeneratedField::PreservePartitioning), + "dynamicFilter" | "dynamic_filter" => Ok(GeneratedField::DynamicFilter), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -21680,6 +21892,7 @@ impl<'de> serde::Deserialize<'de> for SortExecNode { let mut expr__ = None; let mut fetch__ = None; let mut preserve_partitioning__ = None; + let mut dynamic_filter__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::Input => { @@ -21708,6 +21921,12 @@ impl<'de> serde::Deserialize<'de> for SortExecNode { } preserve_partitioning__ = Some(map_.next_value()?); } + GeneratedField::DynamicFilter => { + if dynamic_filter__.is_some() { + return Err(serde::de::Error::duplicate_field("dynamicFilter")); + } + dynamic_filter__ = map_.next_value()?; + } } } Ok(SortExecNode { @@ -21715,6 +21934,7 @@ impl<'de> serde::Deserialize<'de> for SortExecNode { expr: expr__.unwrap_or_default(), fetch: fetch__.unwrap_or_default(), preserve_partitioning: preserve_partitioning__.unwrap_or_default(), + dynamic_filter: dynamic_filter__, }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index a0d4ef9e973c4..3ff96dc8dcdcf 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1294,7 +1294,7 @@ pub struct PhysicalExprNode { pub expr_id: ::core::option::Option, #[prost( oneof = "physical_expr_node::ExprType", - tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20, 21" + tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20, 21, 23" )] pub expr_type: ::core::option::Option, } @@ -1347,8 +1347,38 @@ pub mod physical_expr_node { UnknownColumn(super::UnknownColumn), #[prost(message, tag = "21")] HashExpr(super::PhysicalHashExprNode), + /// Ported from apache/datafusion#21807 (minimal subset). Allows a + /// DynamicFilterPhysicalExpr to roundtrip through proto with its wrapper + /// intact, so the receiver (combined with DeduplicatingDeserializer) + /// can rebuild a shared `Arc` across e.g. SortExec.filter and the + /// FileScan predicate pushed down from it. + /// + /// Field 22 is intentionally skipped to match upstream's field number + /// for a future scalar_subquery variant atlas doesn't need yet. + #[prost(message, tag = "23")] + DynamicFilter(::prost::alloc::boxed::Box), } } +/// Ported from apache/datafusion#21807. See `dynamic_filter = 23` above. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PhysicalDynamicFilterNode { + #[prost(message, repeated, tag = "1")] + pub children: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "2")] + pub remapped_children: ::prost::alloc::vec::Vec, + #[prost(uint64, tag = "3")] + pub generation: u64, + #[prost(message, optional, boxed, tag = "4")] + pub inner_expr: ::core::option::Option<::prost::alloc::boxed::Box>, + #[prost(bool, tag = "5")] + pub is_complete: bool, + /// Stable identity of this dynamic filter. References with the same id + /// MUST deserialize to the same `Arc` so heap-max + /// updates propagate from SortExec to the FileScan predicate it was + /// pushed down to. + #[prost(uint64, tag = "6")] + pub expression_id: u64, +} #[derive(Clone, PartialEq, ::prost::Message)] pub struct PhysicalScalarUdfNode { #[prost(string, tag = "1")] @@ -1920,6 +1950,15 @@ pub struct SortExecNode { pub fetch: i64, #[prost(bool, tag = "4")] pub preserve_partitioning: bool, + /// Ported from apache/datafusion#22011 (minimal subset). Carries the + /// SortExec's internal `DynamicFilterPhysicalExpr` so the decode side + /// can install it via `with_dynamic_filter_expr` instead of letting + /// `with_fetch(...).create_filter()` mint a brand-new one. Combined with + /// the #21807 PhysicalDynamicFilterNode + dedup-by-expression_id, the + /// resulting decoded plan re-shares the `Arc` with the pushed-down + /// FileScan predicate (X-2935 walker becomes redundant). + #[prost(message, optional, tag = "5")] + pub dynamic_filter: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct SortPreservingMergeExecNode { diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index e424be162648b..6e6ab84307191 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -38,6 +38,9 @@ use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_execution::{FunctionRegistry, TaskContext}; use datafusion_expr::WindowFunctionDefinition; use datafusion_expr::dml::InsertOp; +use datafusion_physical_expr::expressions::{ + DynamicFilterInner, DynamicFilterPhysicalExpr, +}; use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs}; use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr}; use datafusion_physical_plan::expressions::{ @@ -503,7 +506,68 @@ pub fn parse_physical_expr_with_converter( proto_converter.proto_to_physical_expr(e, ctx, input_schema, codec) }) .collect::>()?; - codec.try_decode_expr(extension.expr.as_slice(), &inputs)? as _ + // Use the converter-aware decode entry point so extension codecs + // that embed nested `PhysicalExprNode` fields can thread the + // active `proto_converter` into their own parsing of those + // nested expressions, sharing the dedup cache. + codec.try_decode_expr(extension.expr.as_slice(), &inputs, proto_converter)? + as _ + } + ExprType::DynamicFilter(df_proto) => { + // Reconstruct the DynamicFilterPhysicalExpr wrapper. When this + // path is reached through a `DeduplicatingDeserializer`, + // identical expression_id references in the same plan share + // the resulting Arc via the deserializer's cache, so + // SortExec heap-max updates propagate to the FileScan predicate + // it was pushed down to. + // + // Ported from upstream apache/datafusion#21807 (minimal subset). + let inner_expr = match df_proto.inner_expr.as_deref() { + Some(p) => { + proto_converter.proto_to_physical_expr(p, ctx, input_schema, codec)? + } + None => { + return Err(proto_error( + "PhysicalDynamicFilterNode missing inner_expr", + )); + } + }; + let children = df_proto + .children + .iter() + .map(|c| { + proto_converter.proto_to_physical_expr(c, ctx, input_schema, codec) + }) + .collect::>>()?; + let remapped_children = if df_proto.remapped_children.is_empty() { + None + } else { + Some( + df_proto + .remapped_children + .iter() + .map(|c| { + proto_converter.proto_to_physical_expr( + c, + ctx, + input_schema, + codec, + ) + }) + .collect::>>()?, + ) + }; + let inner = DynamicFilterInner { + expression_id: df_proto.expression_id, + generation: df_proto.generation, + expr: inner_expr, + is_complete: df_proto.is_complete, + }; + Arc::new(DynamicFilterPhysicalExpr::from_parts( + children, + remapped_children, + inner, + )) } }; diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 85406e31da614..19546d3150039 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::any::Any; use std::cell::RefCell; use std::collections::HashMap; use std::fmt::Debug; @@ -56,6 +57,8 @@ use datafusion_functions_table::generate_series::{ }; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; use datafusion_physical_expr::async_scalar_function::AsyncFuncExpr; +use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; +use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef}; use datafusion_physical_plan::aggregates::{ AggregateExec, AggregateMode, LimitOptions, PhysicalGroupBy, @@ -569,7 +572,7 @@ impl protobuf::PhysicalPlanNode { } let mut buf: Vec = vec![]; - match codec.try_encode(Arc::clone(&plan_clone), &mut buf) { + match codec.try_encode(Arc::clone(&plan_clone), &mut buf, proto_converter) { Ok(_) => { let inputs: Vec = plan_clone .children() @@ -1668,6 +1671,52 @@ impl protobuf::PhysicalPlanNode { .with_fetch(fetch) .with_preserve_partitioning(sort.preserve_partitioning); + // If the encoder carried the SortExec's internal dynamic filter, + // install it via `with_dynamic_filter_expr` to replace the fresh + // one minted by `with_fetch(...).create_filter()`. This is what + // makes the SortExec's filter Arc re-share its `Inner` with the + // pushed-down FileScan predicate on the decode side (via the + // `DeduplicatingDeserializer` cache keyed on `expression_id`). + // + // Ported from apache/datafusion#22011 (minimal subset). + let new_sort = if let Some(dynamic_filter_proto) = &sort.dynamic_filter { + let dynamic_filter_expr = proto_converter.proto_to_physical_expr( + dynamic_filter_proto, + ctx, + new_sort.input().schema().as_ref(), + codec, + )?; + // After the #21807 port the decoded expression IS a + // `DynamicFilterPhysicalExpr` (since serialize skips snapshot + // for this type), so the downcast succeeds and Arc identity + // is preserved across the call. + let df = match (Arc::clone(&dynamic_filter_expr) + as Arc) + .downcast::() + { + Ok(df) => df, + Err(_) => { + // Legacy fallback: an encoder that snapshotted the + // wrapper away leaves us with a raw expression. Wrap + // it in a fresh DynamicFilterPhysicalExpr so the + // SortExec can still install it -- Arc sharing won't + // be preserved in this branch, matching pre-#21807 + // behavior. + let children = collect_columns(&dynamic_filter_expr) + .into_iter() + .map(|c| Arc::new(c) as Arc) + .collect::>(); + Arc::new(DynamicFilterPhysicalExpr::new( + children, + dynamic_filter_expr, + )) + } + }; + new_sort.with_dynamic_filter_expr(df)? + } else { + new_sort + }; + Ok(Arc::new(new_sort)) } @@ -1739,7 +1788,8 @@ impl protobuf::PhysicalPlanNode { .map(|i| proto_converter.proto_to_execution_plan(ctx, codec, i)) .collect::>()?; - let extension_node = codec.try_decode(extension.node.as_slice(), &inputs, ctx)?; + let extension_node = + codec.try_decode(extension.node.as_slice(), &inputs, ctx, proto_converter)?; Ok(extension_node) } @@ -3084,6 +3134,21 @@ impl protobuf::PhysicalPlanNode { }) }) .collect::>>()?; + // Carry the SortExec's internal TopK dynamic filter so the decoder + // can re-install it via `with_dynamic_filter_expr` instead of letting + // `with_fetch(...).create_filter()` mint a brand-new one. Combined + // with the #21807 dedup-by-expression_id machinery, this is what + // re-shares `Arc` with the pushed-down FileScan predicate + // across the proto roundtrip (X-2935). + // + // Ported from apache/datafusion#22011 (minimal subset). + let dynamic_filter = exec + .dynamic_filter_expr() + .map(|df| { + let df_expr: Arc = df as Arc; + proto_converter.physical_expr_to_proto(&df_expr, codec) + }) + .transpose()?; Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Sort(Box::new( protobuf::SortExecNode { @@ -3094,6 +3159,7 @@ impl protobuf::PhysicalPlanNode { _ => -1, }, preserve_partitioning: exec.preserve_partitioning(), + dynamic_filter, }, ))), }) @@ -3653,14 +3719,33 @@ pub trait AsExecutionPlan: Debug + Send + Sync + Clone { } pub trait PhysicalExtensionCodec: Debug + Send + Sync { + /// Decode an extension execution plan. + /// + /// `proto_converter` is the active conversion strategy (e.g. + /// `DeduplicatingDeserializer`). Codecs whose custom proto embeds + /// nested `PhysicalExprNode` fields (e.g. a predicate on a custom + /// file source) should route those through + /// `proto_converter.proto_to_physical_expr` so the dedup cache + /// extends across the extension boundary. + /// + /// Mirrors the upstream apache/datafusion pattern (#22920). fn try_decode( &self, buf: &[u8], inputs: &[Arc], ctx: &TaskContext, + proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result>; - fn try_encode(&self, node: Arc, buf: &mut Vec) -> Result<()>; + /// Encode an extension execution plan. + /// + /// See [`Self::try_decode`] for the role of `proto_converter`. + fn try_encode( + &self, + node: Arc, + buf: &mut Vec, + proto_converter: &dyn PhysicalProtoConverterExtension, + ) -> Result<()>; fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result> { not_impl_err!("PhysicalExtensionCodec is not provided for scalar function {name}") @@ -3670,18 +3755,26 @@ pub trait PhysicalExtensionCodec: Debug + Send + Sync { Ok(()) } + /// Decode an extension expression. + /// + /// See [`Self::try_decode`] for the role of `proto_converter`. fn try_decode_expr( &self, _buf: &[u8], _inputs: &[Arc], + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { not_impl_err!("PhysicalExtensionCodec is not provided") } + /// Encode an extension expression. + /// + /// See [`Self::try_decode`] for the role of `proto_converter`. fn try_encode_expr( &self, _node: &Arc, _buf: &mut Vec, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result<()> { not_impl_err!("PhysicalExtensionCodec is not provided") } @@ -3714,6 +3807,7 @@ impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec { _buf: &[u8], _inputs: &[Arc], _ctx: &TaskContext, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { not_impl_err!("PhysicalExtensionCodec is not provided") } @@ -3722,6 +3816,7 @@ impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec { &self, _node: Arc, _buf: &mut Vec, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result<()> { not_impl_err!("PhysicalExtensionCodec is not provided") } @@ -3821,21 +3916,32 @@ impl PhysicalProtoConverterExtension for DefaultPhysicalProtoConverter { } } -/// Internal serializer that adds expr_id to expressions. -/// Created fresh for each serialization operation. -struct DeduplicatingSerializer { +/// Serializer that adds an Arc-identity-derived expr_id to every emitted +/// `PhysicalExprNode`. Created fresh for each serialization operation. +/// +/// Made `pub` so atlas's coordinator-side codec can opt into Arc-identity +/// dedup. Combined with the apache/datafusion#21807 minimal port that +/// keeps `DynamicFilterPhysicalExpr` alive across the wire, this is what +/// lets atlas retire the post-deserialize walker (X-2935). +pub struct DeduplicatingSerializer { /// Random salt combined with pointer addresses and process ID to create globally unique expr_ids. session_id: u64, } impl DeduplicatingSerializer { - fn new() -> Self { + pub fn new() -> Self { Self { session_id: rand::random(), } } } +impl Default for DeduplicatingSerializer { + fn default() -> Self { + Self::new() + } +} + impl PhysicalProtoConverterExtension for DeduplicatingSerializer { fn proto_to_execution_plan( &self, @@ -3881,13 +3987,35 @@ impl PhysicalProtoConverterExtension for DeduplicatingSerializer { ) -> Result { let mut proto = serialize_physical_expr_with_converter(expr, codec, self)?; - // Hash session_id, pointer address, and process ID together to create expr_id. - // - session_id: random per serializer, prevents collisions when merging serializations - // - ptr: unique address per Arc within a process - // - pid: prevents collisions if serializer is shared across processes + // Pick the dedup identity for this expression. + // + // 1. If the expression reports a stable `expression_id()` (currently + // `DynamicFilterPhysicalExpr` does), use that. This is preserved + // across `with_new_children`, so two outer Arcs that share the + // same `Inner` (e.g. SortExec.filter and the FileScan predicate + // that `FilterPushdown` clones from it) still hash to the same + // expr_id and reconstruct to one Arc on decode. + // Mirrors upstream apache/datafusion#21807 behavior. + // 2. Otherwise fall back to `Arc::as_ptr` for plain expressions + // where Arc identity is the only sharing signal we have. + // + // session_id + pid are mixed in either way to avoid collisions if + // serializations from different processes are concatenated. let mut hasher = DefaultHasher::new(); self.session_id.hash(&mut hasher); - (Arc::as_ptr(expr) as *const () as u64).hash(&mut hasher); + match expr.expression_id() { + Some(eid) => { + // Tag the identity space so a 0 expression_id can't collide + // with the 0 address fallback. Tag value is arbitrary, just + // must differ between the two branches. + 0u8.hash(&mut hasher); + eid.hash(&mut hasher); + } + None => { + 1u8.hash(&mut hasher); + (Arc::as_ptr(expr) as *const () as u64).hash(&mut hasher); + } + } std::process::id().hash(&mut hasher); proto.expr_id = Some(hasher.finish()); @@ -3895,14 +4023,24 @@ impl PhysicalProtoConverterExtension for DeduplicatingSerializer { } } -/// Internal deserializer that caches expressions by expr_id. -/// Created fresh for each deserialization operation. +/// Deserializer that caches expressions by `expr_id` so multiple references +/// in a plan reconstruct to the same `Arc`. Created fresh for each +/// deserialization operation. +/// +/// Made `pub` for atlas's data-server-side codec; see +/// [`DeduplicatingSerializer`]. #[derive(Default)] -struct DeduplicatingDeserializer { +pub struct DeduplicatingDeserializer { /// Cache mapping expr_id to deserialized expressions. cache: RefCell>>, } +impl DeduplicatingDeserializer { + pub fn new() -> Self { + Self::default() + } +} + impl PhysicalProtoConverterExtension for DeduplicatingDeserializer { fn proto_to_execution_plan( &self, @@ -4106,12 +4244,22 @@ impl PhysicalExtensionCodec for ComposedPhysicalExtensionCodec { buf: &[u8], inputs: &[Arc], ctx: &TaskContext, + proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { - self.decode_protobuf(buf, |codec, data| codec.try_decode(data, inputs, ctx)) + self.decode_protobuf(buf, |codec, data| { + codec.try_decode(data, inputs, ctx, proto_converter) + }) } - fn try_encode(&self, node: Arc, buf: &mut Vec) -> Result<()> { - self.encode_protobuf(buf, |codec, data| codec.try_encode(Arc::clone(&node), data)) + fn try_encode( + &self, + node: Arc, + buf: &mut Vec, + proto_converter: &dyn PhysicalProtoConverterExtension, + ) -> Result<()> { + self.encode_protobuf(buf, |codec, data| { + codec.try_encode(Arc::clone(&node), data, proto_converter) + }) } fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result> { diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index de2f36e81e3ba..d9905954678fa 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -32,6 +32,7 @@ use datafusion_datasource_json::file_format::JsonSink; use datafusion_datasource_parquet::file_format::ParquetSink; use datafusion_expr::WindowFrame; use datafusion_physical_expr::ScalarFunctionExpr; +use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr}; use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; @@ -252,11 +253,76 @@ pub fn serialize_physical_expr( /// serialization of udfs requiring specialized serialization (see [`PhysicalExtensionCodec::try_encode_udf`]). /// A [`PhysicalProtoConverterExtension`] can be provided to handle the /// conversion process (see [`PhysicalProtoConverterExtension::physical_expr_to_proto`]). +/// Serialize a `DynamicFilterPhysicalExpr` as a `PhysicalDynamicFilterNode` +/// without snapshotting it away. Children are routed through `proto_converter` +/// so a `DeduplicatingSerializer` wrapping this call captures Arc identity +/// for nested expressions too. +/// +/// Ported from upstream apache/datafusion#21807 (minimal subset). +fn serialize_dynamic_filter( + value: &Arc, + codec: &dyn PhysicalExtensionCodec, + proto_converter: &dyn PhysicalProtoConverterExtension, +) -> Result { + let df = value + .as_any() + .downcast_ref::() + .expect("caller already checked downcast"); + + let children = df + .original_children() + .iter() + .map(|child| proto_converter.physical_expr_to_proto(child, codec)) + .collect::>>()?; + let remapped_children = if let Some(remapped) = df.remapped_children() { + remapped + .iter() + .map(|child| proto_converter.physical_expr_to_proto(child, codec)) + .collect::>>()? + } else { + vec![] + }; + // Atomic snapshot of inner state; carry the raw expr (not its snapshot) + // so the receiver gets the same logical filter we hold. + let inner = df.inner(); + let inner_expr = + Box::new(proto_converter.physical_expr_to_proto(&inner.expr, codec)?); + + Ok(protobuf::PhysicalExprNode { + expr_id: None, + expr_type: Some(protobuf::physical_expr_node::ExprType::DynamicFilter( + Box::new(protobuf::PhysicalDynamicFilterNode { + children, + remapped_children, + generation: inner.generation, + inner_expr: Some(inner_expr), + is_complete: inner.is_complete, + expression_id: inner.expression_id, + }), + )), + }) +} + pub fn serialize_physical_expr_with_converter( value: &Arc, codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result { + // Special case ported from upstream apache/datafusion#21807: a + // DynamicFilterPhysicalExpr must NOT be snapshotted before + // serialization. Snapshotting collapses it to its current inner + // expression (typically lit(true) before TopK fills its heap) and + // discards the wrapper. Without the wrapper the receiver can't + // reconstruct shared Arc identity across SortExec.filter and + // the FileScan predicate it was pushed down into. + if value + .as_any() + .downcast_ref::() + .is_some() + { + return serialize_dynamic_filter(value, codec, proto_converter); + } + // Snapshot the expr in case it has dynamic predicate state so // it can be serialized let value = snapshot_physical_expr(Arc::clone(value))?; @@ -510,7 +576,12 @@ pub fn serialize_physical_expr_with_converter( }) } else { let mut buf: Vec = vec![]; - match codec.try_encode_expr(&value, &mut buf) { + // Use the converter-aware encode entry point so extension codecs + // that embed nested `PhysicalExprNode` fields can thread the + // active `proto_converter` (e.g. `DeduplicatingSerializer`) into + // their own serialization of those nested expressions, preserving + // Arc-identity dedup end-to-end. + match codec.try_encode_expr(&value, &mut buf, proto_converter) { Ok(_) => { let inputs: Vec = value .children() diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 66ca903e4ec8a..b3e0a3ebcf88b 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -1141,6 +1141,7 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> { _buf: &[u8], _inputs: &[Arc], _ctx: &TaskContext, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { unreachable!() } @@ -1149,6 +1150,7 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> { &self, _node: Arc, _buf: &mut Vec, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result<()> { unreachable!() } @@ -1157,6 +1159,7 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> { &self, buf: &[u8], inputs: &[Arc], + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { if buf == "CustomPredicateExpr".as_bytes() { Ok(Arc::new(CustomPredicateExpr { @@ -1171,6 +1174,7 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> { &self, node: &Arc, buf: &mut Vec, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result<()> { if node .as_any() @@ -1254,6 +1258,7 @@ impl PhysicalExtensionCodec for UDFExtensionCodec { _buf: &[u8], _inputs: &[Arc], _ctx: &TaskContext, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { not_impl_err!("No extension codec provided") } @@ -1262,6 +1267,7 @@ impl PhysicalExtensionCodec for UDFExtensionCodec { &self, _node: Arc, _buf: &mut Vec, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result<()> { not_impl_err!("No extension codec provided") } @@ -3167,3 +3173,252 @@ fn roundtrip_lead_with_default_value() -> Result<()> { true, )?)) } + +/// Validates the apache/datafusion#21807 minimal port: a +/// `DynamicFilterPhysicalExpr` survives proto roundtrip (the wrapper is no +/// longer snapshotted away), and the existing `DeduplicatingSerializer` + +/// `DeduplicatingDeserializer` pair share a single `Arc` for two refs of +/// the same logical filter. This is what lets atlas drop the post-decode +/// walker (X-2935). +#[test] +fn dynamic_filter_dedup_with_deduplicating_codec() -> Result<()> { + use datafusion::physical_plan::expressions::Column; + use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; + use datafusion_proto::physical_plan::{ + DeduplicatingDeserializer, DeduplicatingSerializer, + PhysicalProtoConverterExtension, + }; + + let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]); + let initial: Arc = lit(true); + let children: Vec> = vec![Arc::new(Column::new("a", 0))]; + let df = Arc::new(DynamicFilterPhysicalExpr::new( + children, + Arc::clone(&initial), + )); + let id_before = df.inner().expression_id; + + let codec = DefaultPhysicalExtensionCodec {}; + let serializer = DeduplicatingSerializer::new(); + let proto1 = serializer + .physical_expr_to_proto(&(df.clone() as Arc), &codec)?; + let proto2 = serializer + .physical_expr_to_proto(&(df.clone() as Arc), &codec)?; + + assert_eq!( + proto1.expr_id, proto2.expr_id, + "two refs to the same Arc must get the same expr_id" + ); + + let ctx = SessionContext::new().task_ctx(); + let deserializer = DeduplicatingDeserializer::new(); + let d1 = deserializer.proto_to_physical_expr(&proto1, &ctx, &schema, &codec)?; + let d2 = deserializer.proto_to_physical_expr(&proto2, &ctx, &schema, &codec)?; + + assert!( + Arc::ptr_eq(&d1, &d2), + "DeduplicatingDeserializer must return the same Arc for two refs with the same expr_id" + ); + + let d1_df = d1 + .as_any() + .downcast_ref::() + .expect( + "decoded expr must be DynamicFilterPhysicalExpr; snapshot path is bypassed", + ); + assert_eq!( + d1_df.inner().expression_id, + id_before, + "expression_id must survive proto roundtrip" + ); + + Ok(()) +} + +/// Two distinct outer Arcs that share the same `Inner` (e.g. via +/// `with_new_children`) must still dedup to the same decoded Arc, because +/// `DeduplicatingSerializer` now hashes on `expression_id` (which is +/// preserved across `with_new_children`) rather than `Arc::as_ptr`. +/// +/// This is the case `FilterPushdown` produces: it clones the SortExec's +/// dyn filter and pushes it down to the FileScan; the FileScan's outer +/// Arc may differ from the SortExec's, but they share the same Inner. +#[test] +fn dynamic_filter_dedup_distinct_outer_arcs_same_inner() -> Result<()> { + use datafusion::physical_plan::expressions::Column; + use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; + use datafusion_proto::physical_plan::{ + DeduplicatingDeserializer, DeduplicatingSerializer, + PhysicalProtoConverterExtension, + }; + + let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]); + let initial: Arc = lit(true); + let col_a: Arc = Arc::new(Column::new("a", 0)); + let df_arc1: Arc = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::clone(&col_a)], + Arc::clone(&initial), + )); + let expected_id = df_arc1 + .as_any() + .downcast_ref::() + .unwrap() + .inner() + .expression_id; + + // Build a second outer Arc that shares df_arc1's Inner via with_new_children. + // This mimics what FilterPushdown does when pushing a dyn filter through a + // ProjectionExec: it preserves the inner state Arc, only the outer wrapper + // is new. + let df_arc2 = Arc::clone(&df_arc1).with_new_children(vec![col_a])?; + assert!( + !Arc::ptr_eq(&df_arc1, &df_arc2), + "with_new_children must yield a distinct outer Arc; otherwise this test is trivial" + ); + assert_eq!( + df_arc2 + .as_any() + .downcast_ref::() + .unwrap() + .inner() + .expression_id, + expected_id, + "expression_id must be preserved across with_new_children", + ); + + let codec = DefaultPhysicalExtensionCodec {}; + let serializer = DeduplicatingSerializer::new(); + let proto1 = serializer.physical_expr_to_proto(&df_arc1, &codec)?; + let proto2 = serializer.physical_expr_to_proto(&df_arc2, &codec)?; + assert_eq!( + proto1.expr_id, proto2.expr_id, + "DeduplicatingSerializer must hash on expression_id, NOT Arc::as_ptr, \ + so two outer Arcs sharing the same Inner get the same wire expr_id", + ); + + let ctx = SessionContext::new().task_ctx(); + let deserializer = DeduplicatingDeserializer::new(); + let d1 = deserializer.proto_to_physical_expr(&proto1, &ctx, &schema, &codec)?; + let d2 = deserializer.proto_to_physical_expr(&proto2, &ctx, &schema, &codec)?; + assert!( + Arc::ptr_eq(&d1, &d2), + "Distinct-outer same-Inner refs must reconstruct to one Arc" + ); + + Ok(()) +} + +/// Without the deduplicating codec, two decodes still both reconstruct the +/// wrapper (no snapshotting) but get distinct Arcs. Guards the invariant +/// that the wire format itself is dedup-agnostic; dedup is the codec's +/// job, not the proto layer's. +#[test] +fn dynamic_filter_roundtrip_without_dedup() -> Result<()> { + use datafusion::physical_plan::expressions::Column; + use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; + use datafusion_proto::physical_plan::from_proto::parse_physical_expr; + use datafusion_proto::physical_plan::to_proto::serialize_physical_expr; + + let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]); + let initial: Arc = lit(true); + let children: Vec> = vec![Arc::new(Column::new("a", 0))]; + let df = Arc::new(DynamicFilterPhysicalExpr::new( + children, + Arc::clone(&initial), + )); + let id_before = df.inner().expression_id; + + let codec = DefaultPhysicalExtensionCodec {}; + let proto = serialize_physical_expr(&(df.clone() as Arc), &codec)?; + let ctx = SessionContext::new().task_ctx(); + + let d1 = parse_physical_expr(&proto, &ctx, &schema, &codec)?; + let d2 = parse_physical_expr(&proto, &ctx, &schema, &codec)?; + + // Default codec produces distinct Arcs, but each is still a + // DynamicFilterPhysicalExpr (the wrapper survived). + assert!(!Arc::ptr_eq(&d1, &d2)); + for d in [&d1, &d2] { + let inner = d + .as_any() + .downcast_ref::() + .expect("decoded expr must be DynamicFilterPhysicalExpr") + .inner(); + assert_eq!(inner.expression_id, id_before); + } + Ok(()) +} + +/// Validates the apache/datafusion#22011 minimal port: +/// `SortExecNode` now carries its internal `DynamicFilterPhysicalExpr` on +/// the wire, and `try_into_sort_physical_plan` re-installs it via +/// `with_dynamic_filter_expr` instead of letting `with_fetch(...).create_filter()` +/// mint a fresh one. +/// +/// With this in place, the SortExec's filter and any other reference to the +/// same logical filter (e.g. a FileScan predicate after FilterPushdown) +/// share the SAME `Arc` post-decode via the +/// `DeduplicatingDeserializer` cache keyed on `expression_id`. +#[test] +fn sort_exec_proto_roundtrip_preserves_dyn_filter_arc() -> Result<()> { + use datafusion::physical_plan::expressions::Column; + use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; + use datafusion_proto::physical_plan::{ + DeduplicatingDeserializer, DeduplicatingSerializer, + DefaultPhysicalExtensionCodec, PhysicalProtoConverterExtension, + }; + use datafusion_proto::protobuf::PhysicalPlanNode; + use prost::Message; + + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let initial: Arc = lit(true); + let children: Vec> = vec![Arc::new(Column::new("a", 0))]; + let df = Arc::new(DynamicFilterPhysicalExpr::new( + children, + Arc::clone(&initial), + )); + let expected_id = df.inner().expression_id; + + let ordering = LexOrdering::new(vec![PhysicalSortExpr { + expr: Arc::new(Column::new("a", 0)), + options: SortOptions { + descending: false, + nulls_first: false, + }, + }]) + .unwrap(); + let input: Arc = Arc::new(EmptyExec::new(Arc::clone(&schema))); + let sort = Arc::new( + SortExec::new(ordering, input) + .with_fetch(Some(10)) + .with_dynamic_filter_expr(Arc::clone(&df))?, + ) as Arc; + + let codec = DefaultPhysicalExtensionCodec {}; + let serializer = DeduplicatingSerializer::new(); + let proto = serializer.execution_plan_to_proto(&sort, &codec)?; + let buf = proto.encode_to_vec(); + let decoded_proto = PhysicalPlanNode::decode(buf.as_slice()).map_err(|e| { + datafusion_common::DataFusionError::Internal(format!("decode failed: {e}")) + })?; + + let ctx = SessionContext::new().task_ctx(); + let deserializer = DeduplicatingDeserializer::new(); + let decoded = deserializer.proto_to_execution_plan(&ctx, &codec, &decoded_proto)?; + + let decoded_sort = decoded + .as_any() + .downcast_ref::() + .expect("decoded plan must be SortExec"); + let decoded_df = decoded_sort + .dynamic_filter_expr() + .expect("decoded SortExec must have a dynamic filter installed via with_dynamic_filter_expr"); + + assert_eq!( + decoded_df.inner().expression_id, + expected_id, + "SortExec proto must carry expression_id end-to-end so the decoder \ + can re-share Arc with the FileScan predicate via the dedup cache", + ); + Ok(()) +}