From 47f263d1fda8d50c69aab97b2925ed4dc7d6e042 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Thu, 11 Jun 2026 21:17:00 +0800 Subject: [PATCH 1/4] [X-2935] proto: serialize and dedupe DynamicFilterPhysicalExpr (minimal port of apache/datafusion#21807) Adds expression_id to DynamicFilterPhysicalExpr Inner (atomic counter, preserved across update() and with_new_children) and a corresponding PhysicalDynamicFilterNode proto message. Serializer special-cases DynamicFilterPhysicalExpr to skip snapshot() and emit the wrapper in full. Deserializer reconstructs the wrapper. Combined with the existing DeduplicatingSerializer + DeduplicatingDeserializer pair on branch-53, the wire-level Arc identity dedup now works for dynamic filters too: SortExec.filter and the FileScan predicate it was pushed down to reconstruct to the same Arc on the data server, so heap-max updates propagate end-to-end without the atlas-side share_dynamic_filters walker. Minimal port: skips upstream's ExpressionPlacement enum, snapshot() refactor, and DedupingSerializer removal. Wire layout (field 23 oneof variant + PhysicalDynamicFilterNode fields) mirrors upstream so a future DF upgrade can drop this patch mechanically. DeduplicatingSerializer and DeduplicatingDeserializer are now pub so atlas's coordinator-side and data-server-side codecs can opt in. 
--- .../physical-expr-common/src/physical_expr.rs | 18 ++ .../src/expressions/dynamic_filters.rs | 136 +++++++++++- .../physical-expr/src/expressions/mod.rs | 4 +- datafusion/proto/proto/datafusion.proto | 24 +++ datafusion/proto/src/generated/pbjson.rs | 202 ++++++++++++++++++ datafusion/proto/src/generated/prost.rs | 32 ++- .../proto/src/physical_plan/from_proto.rs | 59 +++++ datafusion/proto/src/physical_plan/mod.rs | 35 ++- .../proto/src/physical_plan/to_proto.rs | 66 ++++++ .../tests/cases/roundtrip_physical_plan.rs | 94 ++++++++ 10 files changed, 651 insertions(+), 19 deletions(-) diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 7107b0a9004d3..a760409038cf4 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -441,6 +441,24 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { fn placement(&self) -> ExpressionPlacement { ExpressionPlacement::KeepInPlace } + + /// Stable, globally-unique identifier for this [`PhysicalExpr`], if any. + /// + /// Connected expressions (e.g. `DynamicFilterPhysicalExpr`s where two + /// outer Arc instances share the same mutable inner state via + /// [`PhysicalExpr::with_new_children`]) must report the same id. Proto + /// (de)serialization uses this to dedupe across the wire so the + /// reconstructed plan keeps `Arc` shared. + /// + /// Default is `None`: no identity worth preserving across a + /// serialization boundary. + /// + /// Ported from upstream apache/datafusion#21807 to make + /// `DynamicFilterPhysicalExpr` dedup robust to pushdown rewriting on + /// branch-53. 
+ fn expression_id(&self) -> Option { + None + } } #[deprecated( diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index d285f8b377eca..e7e183d7ff778 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -16,6 +16,7 @@ // under the License. use parking_lot::RwLock; +use std::sync::atomic::{AtomicU64, Ordering}; use std::{any::Any, fmt::Display, hash::Hash, sync::Arc}; use tokio::sync::watch; @@ -76,21 +77,50 @@ pub struct DynamicFilterPhysicalExpr { nullable: Arc>>, } -#[derive(Debug)] -struct Inner { - /// A counter that gets incremented every time the expression is updated so that we can track changes cheaply. - /// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes. - generation: u64, - expr: Arc, - /// Flag for quick synchronous check if filter is complete. - /// This is redundant with the watch channel state, but allows us to return immediately - /// from `wait_complete()` without subscribing if already complete. - is_complete: bool, +/// Atomic internal state of a [`DynamicFilterPhysicalExpr`]. +/// +/// `expression_id` identifies the actual filter expression. Derived +/// `DynamicFilterPhysicalExpr`s (e.g. via [`PhysicalExpr::with_new_children`]) +/// are the same logical filter and must report the same id. Proto +/// (de)serialization uses this id to dedupe references on the wire so the +/// reconstructed plan keeps `Arc` shared between e.g. SortExec.filter +/// and the FileScan predicate it was pushed down to. +/// +/// **Warning:** exposed publicly only so that `datafusion-proto` can read +/// and rebuild this state. Not a stable API. +/// +/// Ported from upstream apache/datafusion#21807 (minimal subset) so +/// branch-53 can drop the atlas-side post-deserialize walker. 
+#[derive(Clone)] +pub struct Inner { + /// Stable id, preserved across `update()` and `with_new_children`. + pub expression_id: u64, + /// Incremented on every `update()` so that [`PhysicalExpr::snapshot_generation`] + /// is a cheap "did anything change?" probe. + pub generation: u64, + pub expr: Arc, + /// Synchronous check for completion; redundant with the watch channel + /// but lets `wait_complete()` return without subscribing. + pub is_complete: bool, +} + +// Exclude `expression_id` from Debug so existing roundtrip Debug comparisons +// in tests still pass even when only some plan-node decoders carry the +// dynamic filter through proto. +impl std::fmt::Debug for Inner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Inner") + .field("generation", &self.generation) + .field("expr", &self.expr) + .field("is_complete", &self.is_complete) + .finish() + } } impl Inner { fn new(expr: Arc) -> Self { Self { + expression_id: EXPR_ID_SOURCE.next(), // Start with generation 1 which gives us a different result for [`PhysicalExpr::generation`] than the default 0. // This is not currently used anywhere but it seems useful to have this simple distinction. generation: 1, @@ -105,6 +135,26 @@ impl Inner { } } +/// An atomic counter used to generate monotonic u64 ids for +/// `DynamicFilterPhysicalExpr` instances. +struct ExpressionIdAtomicCounter { + inner: AtomicU64, +} + +impl ExpressionIdAtomicCounter { + const fn new() -> Self { + Self { + inner: AtomicU64::new(0), + } + } + + fn next(&self) -> u64 { + self.inner.fetch_add(1, Ordering::Relaxed) + } +} + +static EXPR_ID_SOURCE: ExpressionIdAtomicCounter = ExpressionIdAtomicCounter::new(); + impl Hash for DynamicFilterPhysicalExpr { fn hash(&self, state: &mut H) { // Use pointer identity of the inner Arc for stable hashing. 
@@ -243,6 +293,10 @@ impl DynamicFilterPhysicalExpr { let mut current = self.inner.write(); let new_generation = current.generation + 1; *current = Inner { + // Preserve the expression id across updates so all references + // (e.g. SortExec.filter + a pushed-down FileScan predicate) + // continue to identify the same logical filter. + expression_id: current.expression_id, generation: new_generation, expr: new_expr, is_complete: current.is_complete, @@ -346,6 +400,64 @@ impl DynamicFilterPhysicalExpr { write!(f, " ]") } + + /// Return the filter's original children (before any remapping). + /// + /// **Warning:** intended only for `datafusion-proto` (de)serialization. + /// Not a stable API. + pub fn original_children(&self) -> &[Arc] { + &self.children + } + + /// Return the filter's remapped children, if any have been set via + /// [`PhysicalExpr::with_new_children`]. + /// + /// **Warning:** intended only for `datafusion-proto` (de)serialization. + /// Not a stable API. + pub fn remapped_children(&self) -> Option<&[Arc]> { + self.remapped_children.as_deref() + } + + /// Clone the atomically-captured `Inner` state. + /// + /// **Warning:** intended only for `datafusion-proto` (de)serialization. + /// Not a stable API. + pub fn inner(&self) -> Inner { + self.inner.read().clone() + } + + /// Rebuild a `DynamicFilterPhysicalExpr` from its stored parts. Used by + /// proto deserialization to reconstruct the filter; when paired with the + /// existing `DeduplicatingDeserializer` it restores shared `Arc` + /// identity across all references in the decoded plan. + /// + /// **Warning:** intended only for `datafusion-proto` (de)serialization. + /// Not a stable API. 
+ pub fn from_parts( + children: Vec>, + remapped_children: Option>>, + inner: Inner, + ) -> Self { + let state = if inner.is_complete { + FilterState::Complete { + generation: inner.generation, + } + } else { + FilterState::InProgress { + generation: inner.generation, + } + }; + let (state_watch, _) = watch::channel(state); + + Self { + children, + remapped_children, + inner: Arc::new(RwLock::new(inner)), + state_watch, + data_type: Arc::new(RwLock::new(None)), + nullable: Arc::new(RwLock::new(None)), + } + } } impl PhysicalExpr for DynamicFilterPhysicalExpr { @@ -448,6 +560,10 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { // Return the current generation of the expression. self.inner.read().generation } + + fn expression_id(&self) -> Option { + Some(self.inner.read().expression_id) + } } #[cfg(test)] diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index c9e02708d6c28..b4f319cd61009 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -45,7 +45,9 @@ pub use cast::{CastExpr, cast}; pub use cast_column::CastColumnExpr; pub use column::{Column, col, with_new_schema}; pub use datafusion_expr::utils::format_state_name; -pub use dynamic_filters::DynamicFilterPhysicalExpr; +pub use dynamic_filters::{ + DynamicFilterPhysicalExpr, Inner as DynamicFilterInner, +}; pub use in_list::{InListExpr, in_list}; pub use is_not_null::{IsNotNullExpr, is_not_null}; pub use is_null::{IsNullExpr, is_null}; diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 37b31a84deab1..353ca9bbffc9f 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -897,9 +897,33 @@ message PhysicalExprNode { UnknownColumn unknown_column = 20; PhysicalHashExprNode hash_expr = 21; + + // Ported from apache/datafusion#21807 (minimal subset). 
Allows a + // DynamicFilterPhysicalExpr to roundtrip through proto with its wrapper + // intact, so the receiver (combined with DeduplicatingDeserializer) + // can rebuild a shared Arc across e.g. SortExec.filter and the + // FileScan predicate pushed down from it. + // + // Field 22 is intentionally skipped to match upstream's field number + // for a future scalar_subquery variant atlas doesn't need yet. + PhysicalDynamicFilterNode dynamic_filter = 23; } } +// Ported from apache/datafusion#21807. See `dynamic_filter = 23` above. +message PhysicalDynamicFilterNode { + repeated PhysicalExprNode children = 1; + repeated PhysicalExprNode remapped_children = 2; + uint64 generation = 3; + PhysicalExprNode inner_expr = 4; + bool is_complete = 5; + // Stable identity of this dynamic filter. References with the same id + // MUST deserialize to the same Arc so heap-max + // updates propagate from SortExec to the FileScan predicate it was + // pushed down to. + uint64 expression_id = 6; +} + message PhysicalScalarUdfNode { string name = 1; repeated PhysicalExprNode args = 2; diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 419105c40c792..cdc0095be78a5 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -16244,6 +16244,194 @@ impl<'de> serde::Deserialize<'de> for PhysicalDateTimeIntervalExprNode { deserializer.deserialize_struct("datafusion.PhysicalDateTimeIntervalExprNode", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for PhysicalDynamicFilterNode { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if !self.children.is_empty() { + len += 1; + } + if !self.remapped_children.is_empty() { + len += 1; + } + if self.generation != 0 { + len += 1; + } + if self.inner_expr.is_some() { + len += 1; + } + if self.is_complete { + len += 1; + } + if 
self.expression_id != 0 { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalDynamicFilterNode", len)?; + if !self.children.is_empty() { + struct_ser.serialize_field("children", &self.children)?; + } + if !self.remapped_children.is_empty() { + struct_ser.serialize_field("remappedChildren", &self.remapped_children)?; + } + if self.generation != 0 { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("generation", ToString::to_string(&self.generation).as_str())?; + } + if let Some(v) = self.inner_expr.as_ref() { + struct_ser.serialize_field("innerExpr", v)?; + } + if self.is_complete { + struct_ser.serialize_field("isComplete", &self.is_complete)?; + } + if self.expression_id != 0 { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("expressionId", ToString::to_string(&self.expression_id).as_str())?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for PhysicalDynamicFilterNode { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "children", + "remapped_children", + "remappedChildren", + "generation", + "inner_expr", + "innerExpr", + "is_complete", + "isComplete", + "expression_id", + "expressionId", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Children, + RemappedChildren, + Generation, + InnerExpr, + IsComplete, + ExpressionId, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl serde::de::Visitor<'_> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + 
#[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "children" => Ok(GeneratedField::Children), + "remappedChildren" | "remapped_children" => Ok(GeneratedField::RemappedChildren), + "generation" => Ok(GeneratedField::Generation), + "innerExpr" | "inner_expr" => Ok(GeneratedField::InnerExpr), + "isComplete" | "is_complete" => Ok(GeneratedField::IsComplete), + "expressionId" | "expression_id" => Ok(GeneratedField::ExpressionId), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = PhysicalDynamicFilterNode; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.PhysicalDynamicFilterNode") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut children__ = None; + let mut remapped_children__ = None; + let mut generation__ = None; + let mut inner_expr__ = None; + let mut is_complete__ = None; + let mut expression_id__ = None; + while let Some(k) = map_.next_key()? 
{ + match k { + GeneratedField::Children => { + if children__.is_some() { + return Err(serde::de::Error::duplicate_field("children")); + } + children__ = Some(map_.next_value()?); + } + GeneratedField::RemappedChildren => { + if remapped_children__.is_some() { + return Err(serde::de::Error::duplicate_field("remappedChildren")); + } + remapped_children__ = Some(map_.next_value()?); + } + GeneratedField::Generation => { + if generation__.is_some() { + return Err(serde::de::Error::duplicate_field("generation")); + } + generation__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::InnerExpr => { + if inner_expr__.is_some() { + return Err(serde::de::Error::duplicate_field("innerExpr")); + } + inner_expr__ = map_.next_value()?; + } + GeneratedField::IsComplete => { + if is_complete__.is_some() { + return Err(serde::de::Error::duplicate_field("isComplete")); + } + is_complete__ = Some(map_.next_value()?); + } + GeneratedField::ExpressionId => { + if expression_id__.is_some() { + return Err(serde::de::Error::duplicate_field("expressionId")); + } + expression_id__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + } + } + Ok(PhysicalDynamicFilterNode { + children: children__.unwrap_or_default(), + remapped_children: remapped_children__.unwrap_or_default(), + generation: generation__.unwrap_or_default(), + inner_expr: inner_expr__, + is_complete: is_complete__.unwrap_or_default(), + expression_id: expression_id__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.PhysicalDynamicFilterNode", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for PhysicalExprNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -16323,6 +16511,9 @@ impl serde::Serialize for PhysicalExprNode { physical_expr_node::ExprType::HashExpr(v) => { struct_ser.serialize_field("hashExpr", v)?; } + physical_expr_node::ExprType::DynamicFilter(v) => { + 
struct_ser.serialize_field("dynamicFilter", v)?; + } } } struct_ser.end() @@ -16369,6 +16560,8 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { "unknownColumn", "hash_expr", "hashExpr", + "dynamic_filter", + "dynamicFilter", ]; #[allow(clippy::enum_variant_names)] @@ -16393,6 +16586,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { Extension, UnknownColumn, HashExpr, + DynamicFilter, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -16434,6 +16628,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { "extension" => Ok(GeneratedField::Extension), "unknownColumn" | "unknown_column" => Ok(GeneratedField::UnknownColumn), "hashExpr" | "hash_expr" => Ok(GeneratedField::HashExpr), + "dynamicFilter" | "dynamic_filter" => Ok(GeneratedField::DynamicFilter), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -16596,6 +16791,13 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { return Err(serde::de::Error::duplicate_field("hashExpr")); } expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::HashExpr) +; + } + GeneratedField::DynamicFilter => { + if expr_type__.is_some() { + return Err(serde::de::Error::duplicate_field("dynamicFilter")); + } + expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::DynamicFilter) ; } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index a0d4ef9e973c4..9a512232f06fc 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1294,7 +1294,7 @@ pub struct PhysicalExprNode { pub expr_id: ::core::option::Option, #[prost( oneof = "physical_expr_node::ExprType", - tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20, 21" + tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20, 21, 23" )] pub expr_type: ::core::option::Option, } @@ 
-1347,8 +1347,38 @@ pub mod physical_expr_node { UnknownColumn(super::UnknownColumn), #[prost(message, tag = "21")] HashExpr(super::PhysicalHashExprNode), + /// Ported from apache/datafusion#21807 (minimal subset). Allows a + /// DynamicFilterPhysicalExpr to roundtrip through proto with its wrapper + /// intact, so the receiver (combined with DeduplicatingDeserializer) + /// can rebuild a shared Arc across e.g. SortExec.filter and the + /// FileScan predicate pushed down from it. + /// + /// Field 22 is intentionally skipped to match upstream's field number + /// for a future scalar_subquery variant atlas doesn't need yet. + #[prost(message, tag = "23")] + DynamicFilter(::prost::alloc::boxed::Box), } } +/// Ported from apache/datafusion#21807. See `dynamic_filter = 23` above. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PhysicalDynamicFilterNode { + #[prost(message, repeated, tag = "1")] + pub children: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "2")] + pub remapped_children: ::prost::alloc::vec::Vec, + #[prost(uint64, tag = "3")] + pub generation: u64, + #[prost(message, optional, boxed, tag = "4")] + pub inner_expr: ::core::option::Option<::prost::alloc::boxed::Box>, + #[prost(bool, tag = "5")] + pub is_complete: bool, + /// Stable identity of this dynamic filter. References with the same id + /// MUST deserialize to the same Arc so heap-max + /// updates propagate from SortExec to the FileScan predicate it was + /// pushed down to. 
+ #[prost(uint64, tag = "6")] + pub expression_id: u64, +} #[derive(Clone, PartialEq, ::prost::Message)] pub struct PhysicalScalarUdfNode { #[prost(string, tag = "1")] diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index e424be162648b..b3c0e26060963 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -38,6 +38,9 @@ use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_execution::{FunctionRegistry, TaskContext}; use datafusion_expr::WindowFunctionDefinition; use datafusion_expr::dml::InsertOp; +use datafusion_physical_expr::expressions::{ + DynamicFilterInner, DynamicFilterPhysicalExpr, +}; use datafusion_physical_expr::projection::{ProjectionExpr, ProjectionExprs}; use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr}; use datafusion_physical_plan::expressions::{ @@ -505,6 +508,62 @@ pub fn parse_physical_expr_with_converter( .collect::>()?; codec.try_decode_expr(extension.expr.as_slice(), &inputs)? as _ } + ExprType::DynamicFilter(df_proto) => { + // Reconstruct the DynamicFilterPhysicalExpr wrapper. When this + // path is reached through a `DeduplicatingDeserializer`, + // identical expression_id references in the same plan share + // the resulting Arc via the deserializer's cache, so + // SortExec heap-max updates propagate to the FileScan predicate + // it was pushed down to. + // + // Ported from upstream apache/datafusion#21807 (minimal subset). 
+ let inner_expr = match df_proto.inner_expr.as_deref() { + Some(p) => proto_converter + .proto_to_physical_expr(p, ctx, input_schema, codec)?, + None => { + return Err(proto_error( + "PhysicalDynamicFilterNode missing inner_expr", + )); + } + }; + let children = df_proto + .children + .iter() + .map(|c| { + proto_converter + .proto_to_physical_expr(c, ctx, input_schema, codec) + }) + .collect::>>()?; + let remapped_children = if df_proto.remapped_children.is_empty() { + None + } else { + Some( + df_proto + .remapped_children + .iter() + .map(|c| { + proto_converter.proto_to_physical_expr( + c, + ctx, + input_schema, + codec, + ) + }) + .collect::>>()?, + ) + }; + let inner = DynamicFilterInner { + expression_id: df_proto.expression_id, + generation: df_proto.generation, + expr: inner_expr, + is_complete: df_proto.is_complete, + }; + Arc::new(DynamicFilterPhysicalExpr::from_parts( + children, + remapped_children, + inner, + )) + } }; Ok(pexpr) diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 85406e31da614..0aa57be8bc99d 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -3821,21 +3821,32 @@ impl PhysicalProtoConverterExtension for DefaultPhysicalProtoConverter { } } -/// Internal serializer that adds expr_id to expressions. -/// Created fresh for each serialization operation. -struct DeduplicatingSerializer { +/// Serializer that adds an Arc-identity-derived expr_id to every emitted +/// `PhysicalExprNode`. Created fresh for each serialization operation. +/// +/// Made `pub` so atlas's coordinator-side codec can opt into Arc-identity +/// dedup. Combined with the apache/datafusion#21807 minimal port that +/// keeps `DynamicFilterPhysicalExpr` alive across the wire, this is what +/// lets atlas retire the post-deserialize walker (X-2935). 
+pub struct DeduplicatingSerializer { /// Random salt combined with pointer addresses and process ID to create globally unique expr_ids. session_id: u64, } impl DeduplicatingSerializer { - fn new() -> Self { + pub fn new() -> Self { Self { session_id: rand::random(), } } } +impl Default for DeduplicatingSerializer { + fn default() -> Self { + Self::new() + } +} + impl PhysicalProtoConverterExtension for DeduplicatingSerializer { fn proto_to_execution_plan( &self, @@ -3895,14 +3906,24 @@ impl PhysicalProtoConverterExtension for DeduplicatingSerializer { } } -/// Internal deserializer that caches expressions by expr_id. -/// Created fresh for each deserialization operation. +/// Deserializer that caches expressions by `expr_id` so multiple references +/// in a plan reconstruct to the same `Arc`. Created fresh for each +/// deserialization operation. +/// +/// Made `pub` for atlas's data-server-side codec; see +/// [`DeduplicatingSerializer`]. #[derive(Default)] -struct DeduplicatingDeserializer { +pub struct DeduplicatingDeserializer { /// Cache mapping expr_id to deserialized expressions. 
cache: RefCell>>, } +impl DeduplicatingDeserializer { + pub fn new() -> Self { + Self::default() + } +} + impl PhysicalProtoConverterExtension for DeduplicatingDeserializer { fn proto_to_execution_plan( &self, diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index de2f36e81e3ba..e722a8c72dcb9 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -32,6 +32,7 @@ use datafusion_datasource_json::file_format::JsonSink; use datafusion_datasource_parquet::file_format::ParquetSink; use datafusion_expr::WindowFrame; use datafusion_physical_expr::ScalarFunctionExpr; +use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr}; use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; @@ -252,11 +253,76 @@ pub fn serialize_physical_expr( /// serialization of udfs requiring specialized serialization (see [`PhysicalExtensionCodec::try_encode_udf`]). /// A [`PhysicalProtoConverterExtension`] can be provided to handle the /// conversion process (see [`PhysicalProtoConverterExtension::physical_expr_to_proto`]). +/// Serialize a `DynamicFilterPhysicalExpr` as a `PhysicalDynamicFilterNode` +/// without snapshotting it away. Children are routed through `proto_converter` +/// so a `DeduplicatingSerializer` wrapping this call captures Arc identity +/// for nested expressions too. +/// +/// Ported from upstream apache/datafusion#21807 (minimal subset). 
+fn serialize_dynamic_filter( + value: &Arc, + codec: &dyn PhysicalExtensionCodec, + proto_converter: &dyn PhysicalProtoConverterExtension, +) -> Result { + let df = value + .as_any() + .downcast_ref::() + .expect("caller already checked downcast"); + + let children = df + .original_children() + .iter() + .map(|child| proto_converter.physical_expr_to_proto(child, codec)) + .collect::>>()?; + let remapped_children = if let Some(remapped) = df.remapped_children() { + remapped + .iter() + .map(|child| proto_converter.physical_expr_to_proto(child, codec)) + .collect::>>()? + } else { + vec![] + }; + // Atomic snapshot of inner state; carry the raw expr (not its snapshot) + // so the receiver gets the same logical filter we hold. + let inner = df.inner(); + let inner_expr = + Box::new(proto_converter.physical_expr_to_proto(&inner.expr, codec)?); + + Ok(protobuf::PhysicalExprNode { + expr_id: None, + expr_type: Some(protobuf::physical_expr_node::ExprType::DynamicFilter( + Box::new(protobuf::PhysicalDynamicFilterNode { + children, + remapped_children, + generation: inner.generation, + inner_expr: Some(inner_expr), + is_complete: inner.is_complete, + expression_id: inner.expression_id, + }), + )), + }) +} + pub fn serialize_physical_expr_with_converter( value: &Arc, codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result { + // Special case ported from upstream apache/datafusion#21807: a + // DynamicFilterPhysicalExpr must NOT be snapshotted before + // serialization. Snapshotting collapses it to its current inner + // expression (typically lit(true) before TopK fills its heap) and + // discards the wrapper. Without the wrapper the receiver can't + // reconstruct shared Arc identity across SortExec.filter and + // the FileScan predicate it was pushed down into. 
+ if value + .as_any() + .downcast_ref::() + .is_some() + { + return serialize_dynamic_filter(value, codec, proto_converter); + } + // Snapshot the expr in case it has dynamic predicate state so // it can be serialized let value = snapshot_physical_expr(Arc::clone(value))?; diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 66ca903e4ec8a..9a67650deb5e2 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -3167,3 +3167,97 @@ fn roundtrip_lead_with_default_value() -> Result<()> { true, )?)) } + +/// Validates the apache/datafusion#21807 minimal port: a +/// `DynamicFilterPhysicalExpr` survives proto roundtrip (the wrapper is no +/// longer snapshotted away), and the existing `DeduplicatingSerializer` + +/// `DeduplicatingDeserializer` pair share a single `Arc` for two refs of +/// the same logical filter. This is what lets atlas drop the post-decode +/// walker (X-2935). 
+#[test] +fn dynamic_filter_dedup_with_deduplicating_codec() -> Result<()> { + use datafusion::physical_plan::expressions::Column; + use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; + use datafusion_proto::physical_plan::{ + DeduplicatingDeserializer, DeduplicatingSerializer, + PhysicalProtoConverterExtension, + }; + + let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]); + let initial: Arc = lit(true); + let children: Vec> = vec![Arc::new(Column::new("a", 0))]; + let df = Arc::new(DynamicFilterPhysicalExpr::new(children, Arc::clone(&initial))); + let id_before = df.inner().expression_id; + + let codec = DefaultPhysicalExtensionCodec {}; + let serializer = DeduplicatingSerializer::new(); + let proto1 = + serializer.physical_expr_to_proto(&(df.clone() as Arc), &codec)?; + let proto2 = + serializer.physical_expr_to_proto(&(df.clone() as Arc), &codec)?; + + assert_eq!( + proto1.expr_id, proto2.expr_id, + "two refs to the same Arc must get the same expr_id" + ); + + let ctx = SessionContext::new().task_ctx(); + let deserializer = DeduplicatingDeserializer::new(); + let d1 = deserializer.proto_to_physical_expr(&proto1, &ctx, &schema, &codec)?; + let d2 = deserializer.proto_to_physical_expr(&proto2, &ctx, &schema, &codec)?; + + assert!( + Arc::ptr_eq(&d1, &d2), + "DeduplicatingDeserializer must return the same Arc for two refs with the same expr_id" + ); + + let d1_df = d1 + .as_any() + .downcast_ref::() + .expect("decoded expr must be DynamicFilterPhysicalExpr; snapshot path is bypassed"); + assert_eq!( + d1_df.inner().expression_id, + id_before, + "expression_id must survive proto roundtrip" + ); + + Ok(()) +} + +/// Without the deduplicating codec, two decodes still both reconstruct the +/// wrapper (no snapshotting) but get distinct Arcs. Guards the invariant +/// that the wire format itself is dedup-agnostic; dedup is the codec's +/// job, not the proto layer's. 
+#[test] +fn dynamic_filter_roundtrip_without_dedup() -> Result<()> { + use datafusion::physical_plan::expressions::Column; + use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; + use datafusion_proto::physical_plan::from_proto::parse_physical_expr; + use datafusion_proto::physical_plan::to_proto::serialize_physical_expr; + + let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]); + let initial: Arc<dyn PhysicalExpr> = lit(true); + let children: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("a", 0))]; + let df = Arc::new(DynamicFilterPhysicalExpr::new(children, Arc::clone(&initial))); + let id_before = df.inner().expression_id; + + let codec = DefaultPhysicalExtensionCodec {}; + let proto = serialize_physical_expr(&(df.clone() as Arc<dyn PhysicalExpr>), &codec)?; + let ctx = SessionContext::new().task_ctx(); + + let d1 = parse_physical_expr(&proto, &ctx, &schema, &codec)?; + let d2 = parse_physical_expr(&proto, &ctx, &schema, &codec)?; + + // Default codec produces distinct Arcs, but each is still a + // DynamicFilterPhysicalExpr (the wrapper survived). + assert!(!Arc::ptr_eq(&d1, &d2)); + for d in [&d1, &d2] { + let inner = d + .as_any() + .downcast_ref::<DynamicFilterPhysicalExpr>() + .expect("decoded expr must be DynamicFilterPhysicalExpr") + .inner(); + assert_eq!(inner.expression_id, id_before); + } + Ok(()) +} From 3eed0c360b685aef5a7aa35273da6c4ada6a92aa Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Thu, 11 Jun 2026 21:51:42 +0800 Subject: [PATCH 2/4] [X-2935] dedupe DynamicFilter by expression_id, not Arc::as_ptr Mirrors apache/datafusion#21807 step 3-4: switch DeduplicatingSerializer from hashing Arc::as_ptr to hashing the expression's expression_id when available. Falls back to Arc::as_ptr for expressions without a stable id. This fixes the case FilterPushdown produces in practice: pushing a DynamicFilterPhysicalExpr from a SortExec down to a FileScan creates a new outer Arc (via with_new_children to remap columns) that still shares the same Arc<RwLock<Inner>>. 
Without this change, the two outer Arcs hash to different addresses and DeduplicatingDeserializer rebuilds them as two distinct Arcs on decode -- exactly the X-2935 walker's bug. Adds a new test dynamic_filter_dedup_distinct_outer_arcs_same_inner that constructs the with_new_children case and asserts dedup works. 150/150 proto integration tests pass. --- datafusion/proto/src/physical_plan/mod.rs | 32 +++++++-- .../tests/cases/roundtrip_physical_plan.rs | 72 +++++++++++++++++++ 2 files changed, 99 insertions(+), 5 deletions(-) diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 0aa57be8bc99d..833ce01e19826 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -3892,13 +3892,35 @@ impl PhysicalProtoConverterExtension for DeduplicatingSerializer { ) -> Result { let mut proto = serialize_physical_expr_with_converter(expr, codec, self)?; - // Hash session_id, pointer address, and process ID together to create expr_id. - // - session_id: random per serializer, prevents collisions when merging serializations - // - ptr: unique address per Arc within a process - // - pid: prevents collisions if serializer is shared across processes + // Pick the dedup identity for this expression. + // + // 1. If the expression reports a stable `expression_id()` (currently + // `DynamicFilterPhysicalExpr` does), use that. This is preserved + // across `with_new_children`, so two outer Arcs that share the + // same `Inner` (e.g. SortExec.filter and the FileScan predicate + // that `FilterPushdown` clones from it) still hash to the same + // expr_id and reconstruct to one Arc on decode. + // Mirrors upstream apache/datafusion#21807 behavior. + // 2. Otherwise fall back to `Arc::as_ptr` for plain expressions + // where Arc identity is the only sharing signal we have. 
+ // + // session_id + pid are mixed in either way to avoid collisions if + // serializations from different processes are concatenated. let mut hasher = DefaultHasher::new(); self.session_id.hash(&mut hasher); - (Arc::as_ptr(expr) as *const () as u64).hash(&mut hasher); + match expr.expression_id() { + Some(eid) => { + // Tag the identity space so a 0 expression_id can't collide + // with the 0 address fallback. Tag value is arbitrary, just + // must differ between the two branches. + 0u8.hash(&mut hasher); + eid.hash(&mut hasher); + } + None => { + 1u8.hash(&mut hasher); + (Arc::as_ptr(expr) as *const () as u64).hash(&mut hasher); + } + } std::process::id().hash(&mut hasher); proto.expr_id = Some(hasher.finish()); diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 9a67650deb5e2..c134284de3a35 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -3224,6 +3224,78 @@ fn dynamic_filter_dedup_with_deduplicating_codec() -> Result<()> { Ok(()) } +/// Two distinct outer Arcs that share the same `Inner` (e.g. via +/// `with_new_children`) must still dedup to the same decoded Arc, because +/// `DeduplicatingSerializer` now hashes on `expression_id` (which is +/// preserved across `with_new_children`) rather than `Arc::as_ptr`. +/// +/// This is the case `FilterPushdown` produces: it clones the SortExec's +/// dyn filter and pushes it down to the FileScan; the FileScan's outer +/// Arc may differ from the SortExec's, but they share the same Inner. 
+#[test] +fn dynamic_filter_dedup_distinct_outer_arcs_same_inner() -> Result<()> { + use datafusion::physical_plan::expressions::Column; + use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; + use datafusion_proto::physical_plan::{ + DeduplicatingDeserializer, DeduplicatingSerializer, + PhysicalProtoConverterExtension, + }; + + let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]); + let initial: Arc<dyn PhysicalExpr> = lit(true); + let col_a: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0)); + let df_arc1: Arc<dyn PhysicalExpr> = Arc::new( + DynamicFilterPhysicalExpr::new(vec![Arc::clone(&col_a)], Arc::clone(&initial)), + ); + let expected_id = df_arc1 + .as_any() + .downcast_ref::<DynamicFilterPhysicalExpr>() + .unwrap() + .inner() + .expression_id; + + // Build a second outer Arc that shares df_arc1's Inner via with_new_children. + // This mimics what FilterPushdown does when pushing a dyn filter through a + // ProjectionExec: it preserves the inner state Arc, only the outer wrapper + // is new. + let df_arc2 = Arc::clone(&df_arc1).with_new_children(vec![col_a])?; + assert!( + !Arc::ptr_eq(&df_arc1, &df_arc2), + "with_new_children must yield a distinct outer Arc; otherwise this test is trivial" + ); + assert_eq!( + df_arc2 + .as_any() + .downcast_ref::<DynamicFilterPhysicalExpr>() + .unwrap() + .inner() + .expression_id, + expected_id, + "expression_id must be preserved across with_new_children", + ); + + let codec = DefaultPhysicalExtensionCodec {}; + let serializer = DeduplicatingSerializer::new(); + let proto1 = serializer.physical_expr_to_proto(&df_arc1, &codec)?; + let proto2 = serializer.physical_expr_to_proto(&df_arc2, &codec)?; + assert_eq!( + proto1.expr_id, proto2.expr_id, + "DeduplicatingSerializer must hash on expression_id, NOT Arc::as_ptr, \ + so two outer Arcs sharing the same Inner get the same wire expr_id", + ); + + let ctx = SessionContext::new().task_ctx(); + let deserializer = DeduplicatingDeserializer::new(); + let d1 = deserializer.proto_to_physical_expr(&proto1, &ctx, &schema, &codec)?; + let d2 = 
deserializer.proto_to_physical_expr(&proto2, &ctx, &schema, &codec)?; + assert!( + Arc::ptr_eq(&d1, &d2), + "Distinct-outer same-Inner refs must reconstruct to one Arc" + ); + + Ok(()) +} + /// Without the deduplicating codec, two decodes still both reconstruct the /// wrapper (no snapshotting) but get distinct Arcs. Guards the invariant /// that the wire format itself is dedup-agnostic; dedup is the codec's From 0e6d98858065b4328ab146668c7d5eeb0ed6abe7 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Thu, 11 Jun 2026 22:02:44 +0800 Subject: [PATCH 3/4] [X-2935] proto: serialize SortExec's dynamic filter end-to-end Adds the final piece needed to drop the atlas-side share_dynamic_filters walker: SortExecNode now carries its internal DynamicFilterPhysicalExpr on the wire, and try_into_sort_physical_plan re-installs it via SortExec::with_dynamic_filter_expr instead of letting with_fetch(N) mint a brand-new one via create_filter() on decode. Combined with the prior two commits in this series (#21807 minimal port + dedup-by-expression_id), the decode side now reconstructs the SAME Arc shared between the SortExec.filter and any pushed-down FileScan predicate that participated in the encode -- because DeduplicatingDeserializer's cache, keyed on the wire expr_id that DeduplicatingSerializer derives from expression_id, matches across plan nodes. 
Adds: - SortExec::dynamic_filter_expr() / with_dynamic_filter_expr() accessors - dynamic_filter field on SortExecNode proto (field 5) - to_proto: try_from_sort_exec serializes the dyn filter via proto_converter - from_proto: try_into_sort_physical_plan deserializes + reinstalls via with_dynamic_filter_expr; legacy fallback wraps a fresh Arc when the encoder snapshotted the wrapper away Test sort_exec_proto_roundtrip_preserves_dyn_filter_arc asserts end-to-end: serialize a SortExec with a known DynamicFilterPhysicalExpr, roundtrip via DeduplicatingSerializer + DeduplicatingDeserializer, the decoded SortExec's filter must carry the same expression_id. Ported from apache/datafusion#22011 (minimal subset, adapted to branch-53's PhysicalProtoConverterExtension signatures). 151/151 proto integration tests pass. --- datafusion/physical-plan/src/sorts/sort.rs | 27 +++++++ datafusion/proto/proto/datafusion.proto | 9 +++ datafusion/proto/src/generated/pbjson.rs | 18 +++++ datafusion/proto/src/generated/prost.rs | 9 +++ datafusion/proto/src/physical_plan/mod.rs | 64 ++++++++++++++++ .../tests/cases/roundtrip_physical_plan.rs | 74 +++++++++++++++++++ 6 files changed, 201 insertions(+) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 4ac501781db57..a61fd5c4f5f5a 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1036,6 +1036,33 @@ impl SortExec { self.fetch } + /// Returns the dynamic filter expression for this sort (TopK), if set. + /// + /// Used by `datafusion-proto` to roundtrip the SortExec's internal + /// `DynamicFilterPhysicalExpr` so the decode side can re-share its + /// `Arc` with the pushed-down FileScan predicate + /// (apache/datafusion#22011 minimal port for branch-53). 
+ pub fn dynamic_filter_expr(&self) -> Option<Arc<DynamicFilterPhysicalExpr>> { + self.filter.as_ref().map(|f| f.read().expr()) + } + + /// Install a `DynamicFilterPhysicalExpr` as this sort's TopK dyn filter, + /// validating that its children reference columns in the input schema. + /// + /// Used by proto deserialization to restore Arc identity across the wire; + /// see [`SortExec::dynamic_filter_expr`]. + pub fn with_dynamic_filter_expr( + mut self, + filter: Arc<DynamicFilterPhysicalExpr>, + ) -> Result<Self> { + let input_schema = self.input.schema(); + for child in filter.children() { + child.data_type(&input_schema)?; + } + self.filter = Some(Arc::new(RwLock::new(TopKDynamicFilters::new(filter)))); + Ok(self) + } + fn output_partitioning_helper( input: &Arc<dyn ExecutionPlan>, preserve_partitioning: bool, diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 353ca9bbffc9f..de88c5e648575 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1313,6 +1313,15 @@ message SortExecNode { // Maximum number of highest/lowest rows to fetch; negative means no limit int64 fetch = 3; bool preserve_partitioning = 4; + + // Ported from apache/datafusion#22011 (minimal subset). Carries the + // SortExec's internal `DynamicFilterPhysicalExpr` so the decode side + // can install it via `with_dynamic_filter_expr` instead of letting + // `with_fetch(...).create_filter()` mint a brand-new one. Combined with + // the #21807 PhysicalDynamicFilterNode + dedup-by-expression_id, the + // resulting decoded plan re-shares the Arc with the pushed-down + // FileScan predicate (X-2935 walker becomes redundant). 
+ PhysicalExprNode dynamic_filter = 5; } message SortPreservingMergeExecNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index cdc0095be78a5..2f2958d98487a 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -21796,6 +21796,9 @@ impl serde::Serialize for SortExecNode { if self.preserve_partitioning { len += 1; } + if self.dynamic_filter.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.SortExecNode", len)?; if let Some(v) = self.input.as_ref() { struct_ser.serialize_field("input", v)?; @@ -21811,6 +21814,9 @@ impl serde::Serialize for SortExecNode { if self.preserve_partitioning { struct_ser.serialize_field("preservePartitioning", &self.preserve_partitioning)?; } + if let Some(v) = self.dynamic_filter.as_ref() { + struct_ser.serialize_field("dynamicFilter", v)?; + } struct_ser.end() } } @@ -21826,6 +21832,8 @@ impl<'de> serde::Deserialize<'de> for SortExecNode { "fetch", "preserve_partitioning", "preservePartitioning", + "dynamic_filter", + "dynamicFilter", ]; #[allow(clippy::enum_variant_names)] @@ -21834,6 +21842,7 @@ impl<'de> serde::Deserialize<'de> for SortExecNode { Expr, Fetch, PreservePartitioning, + DynamicFilter, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -21859,6 +21868,7 @@ impl<'de> serde::Deserialize<'de> for SortExecNode { "expr" => Ok(GeneratedField::Expr), "fetch" => Ok(GeneratedField::Fetch), "preservePartitioning" | "preserve_partitioning" => Ok(GeneratedField::PreservePartitioning), + "dynamicFilter" | "dynamic_filter" => Ok(GeneratedField::DynamicFilter), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -21882,6 +21892,7 @@ impl<'de> serde::Deserialize<'de> for SortExecNode { let mut expr__ = None; let mut fetch__ = None; let mut preserve_partitioning__ = None; + let mut dynamic_filter__ = None; while let Some(k) = 
map_.next_key()? { match k { GeneratedField::Input => { @@ -21910,6 +21921,12 @@ impl<'de> serde::Deserialize<'de> for SortExecNode { } preserve_partitioning__ = Some(map_.next_value()?); } + GeneratedField::DynamicFilter => { + if dynamic_filter__.is_some() { + return Err(serde::de::Error::duplicate_field("dynamicFilter")); + } + dynamic_filter__ = map_.next_value()?; + } } } Ok(SortExecNode { @@ -21917,6 +21934,7 @@ impl<'de> serde::Deserialize<'de> for SortExecNode { expr: expr__.unwrap_or_default(), fetch: fetch__.unwrap_or_default(), preserve_partitioning: preserve_partitioning__.unwrap_or_default(), + dynamic_filter: dynamic_filter__, }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 9a512232f06fc..7ab59f69cc983 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1950,6 +1950,15 @@ pub struct SortExecNode { pub fetch: i64, #[prost(bool, tag = "4")] pub preserve_partitioning: bool, + /// Ported from apache/datafusion#22011 (minimal subset). Carries the + /// SortExec's internal `DynamicFilterPhysicalExpr` so the decode side + /// can install it via `with_dynamic_filter_expr` instead of letting + /// `with_fetch(...).create_filter()` mint a brand-new one. Combined with + /// the #21807 PhysicalDynamicFilterNode + dedup-by-expression_id, the + /// resulting decoded plan re-shares the Arc with the pushed-down + /// FileScan predicate (X-2935 walker becomes redundant). 
+ #[prost(message, optional, tag = "5")] + pub dynamic_filter: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct SortPreservingMergeExecNode { diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 833ce01e19826..3c97411b861c1 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::any::Any; use std::cell::RefCell; use std::collections::HashMap; use std::fmt::Debug; @@ -56,6 +57,8 @@ use datafusion_functions_table::generate_series::{ }; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; use datafusion_physical_expr::async_scalar_function::AsyncFuncExpr; +use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; +use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef}; use datafusion_physical_plan::aggregates::{ AggregateExec, AggregateMode, LimitOptions, PhysicalGroupBy, @@ -1668,6 +1671,51 @@ impl protobuf::PhysicalPlanNode { .with_fetch(fetch) .with_preserve_partitioning(sort.preserve_partitioning); + // If the encoder carried the SortExec's internal dynamic filter, + // install it via `with_dynamic_filter_expr` to replace the fresh + // one minted by `with_fetch(...).create_filter()`. This is what + // makes the SortExec's filter Arc re-share its `Inner` with the + // pushed-down FileScan predicate on the decode side (via the + // `DeduplicatingDeserializer` cache keyed on `expression_id`). + // + // Ported from apache/datafusion#22011 (minimal subset). 
+ let new_sort = if let Some(dynamic_filter_proto) = &sort.dynamic_filter { + let dynamic_filter_expr = proto_converter.proto_to_physical_expr( + dynamic_filter_proto, + ctx, + new_sort.input().schema().as_ref(), + codec, + )?; + // After the #21807 port the decoded expression IS a + // `DynamicFilterPhysicalExpr` (since serialize skips snapshot + // for this type), so the downcast succeeds and Arc identity + // is preserved across the call. + let df = match (dynamic_filter_expr.clone() as Arc<dyn Any + Send + Sync>) + .downcast::<DynamicFilterPhysicalExpr>() + { + Ok(df) => df, + Err(_) => { + // Legacy fallback: an encoder that snapshotted the + // wrapper away leaves us with a raw expression. Wrap + // it in a fresh DynamicFilterPhysicalExpr so the + // SortExec can still install it -- Arc sharing won't + // be preserved in this branch, matching pre-#21807 + // behavior. + let children = collect_columns(&dynamic_filter_expr) + .into_iter() + .map(|c| Arc::new(c) as Arc<dyn PhysicalExpr>) + .collect::<Vec<_>>(); + Arc::new(DynamicFilterPhysicalExpr::new( + children, + dynamic_filter_expr, + )) + } + }; + new_sort.with_dynamic_filter_expr(df)? + } else { + new_sort + }; + Ok(Arc::new(new_sort)) } @@ -3084,6 +3132,21 @@ impl protobuf::PhysicalPlanNode { }) }) .collect::<Result<Vec<_>>>()?; + // Carry the SortExec's internal TopK dynamic filter so the decoder + // can re-install it via `with_dynamic_filter_expr` instead of letting + // `with_fetch(...).create_filter()` mint a brand-new one. Combined + // with the #21807 dedup-by-expression_id machinery, this is what + // re-shares `Arc` with the pushed-down FileScan predicate + // across the proto roundtrip (X-2935). + // + // Ported from apache/datafusion#22011 (minimal subset). 
+ let dynamic_filter = exec + .dynamic_filter_expr() + .map(|df| { + let df_expr: Arc<dyn PhysicalExpr> = df as Arc<dyn PhysicalExpr>; + proto_converter.physical_expr_to_proto(&df_expr, codec) + }) + .transpose()?; Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Sort(Box::new( protobuf::SortExecNode { @@ -3094,6 +3157,7 @@ impl protobuf::PhysicalPlanNode { _ => -1, }, preserve_partitioning: exec.preserve_partitioning(), + dynamic_filter, }, ))), }) diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index c134284de3a35..27e2f4a402108 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -3333,3 +3333,77 @@ fn dynamic_filter_roundtrip_without_dedup() -> Result<()> { } Ok(()) } + +/// Validates the apache/datafusion#22011 minimal port: +/// `SortExecNode` now carries its internal `DynamicFilterPhysicalExpr` on +/// the wire, and `try_into_sort_physical_plan` re-installs it via +/// `with_dynamic_filter_expr` instead of letting `with_fetch(...).create_filter()` +/// mint a fresh one. +/// +/// With this in place, the SortExec's filter and any other reference to the +/// same logical filter (e.g. a FileScan predicate after FilterPushdown) +/// share the SAME `Arc` post-decode via the +/// `DeduplicatingDeserializer` cache keyed on `expression_id`. 
+#[test] +fn sort_exec_proto_roundtrip_preserves_dyn_filter_arc() -> Result<()> { + use datafusion::physical_plan::expressions::Column; + use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; + use datafusion_proto::physical_plan::{ + DeduplicatingDeserializer, DeduplicatingSerializer, + DefaultPhysicalExtensionCodec, PhysicalProtoConverterExtension, + }; + use datafusion_proto::protobuf::PhysicalPlanNode; + use prost::Message; + + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let initial: Arc<dyn PhysicalExpr> = lit(true); + let children: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("a", 0))]; + let df = Arc::new(DynamicFilterPhysicalExpr::new( + children, + Arc::clone(&initial), + )); + let expected_id = df.inner().expression_id; + + let ordering = LexOrdering::new(vec![PhysicalSortExpr { + expr: Arc::new(Column::new("a", 0)), + options: SortOptions { + descending: false, + nulls_first: false, + }, + }]) + .unwrap(); + let input: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(Arc::clone(&schema))); + let sort = Arc::new( + SortExec::new(ordering, input) + .with_fetch(Some(10)) + .with_dynamic_filter_expr(Arc::clone(&df))?, + ) as Arc<dyn ExecutionPlan>; + + let codec = DefaultPhysicalExtensionCodec {}; + let serializer = DeduplicatingSerializer::new(); + let proto = serializer.execution_plan_to_proto(&sort, &codec)?; + let buf = proto.encode_to_vec(); + let decoded_proto = PhysicalPlanNode::decode(buf.as_slice()).map_err(|e| { + datafusion_common::DataFusionError::Internal(format!("decode failed: {e}")) + })?; + + let ctx = SessionContext::new().task_ctx(); + let deserializer = DeduplicatingDeserializer::new(); + let decoded = deserializer.proto_to_execution_plan(&ctx, &codec, &decoded_proto)?; + + let decoded_sort = decoded + .as_any() + .downcast_ref::<SortExec>() + .expect("decoded plan must be SortExec"); + let decoded_df = decoded_sort + .dynamic_filter_expr() + .expect("decoded SortExec must have a dynamic filter installed via with_dynamic_filter_expr"); + + assert_eq!( + 
decoded_df.inner().expression_id, + expected_id, + "SortExec proto must carry expression_id end-to-end so the decoder \ + can re-share Arc with the FileScan predicate via the dedup cache", + ); + Ok(()) +} From 42342fbc1027121d99af89fdc56961f6cdcf5668 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Thu, 11 Jun 2026 22:29:29 +0800 Subject: [PATCH 4/4] [X-2935] PhysicalExtensionCodec: thread proto_converter through methods Modifies the PhysicalExtensionCodec trait so that try_encode, try_decode, try_encode_expr, and try_decode_expr receive the active PhysicalProtoConverterExtension. Codecs that embed nested PhysicalExprNode fields inside their custom proto (e.g. atlas's ReverseParquetNode.predicate) must now route those through proto_converter.proto_to_physical_expr / proto_converter.physical_expr_to_proto so the dedup cache extends across the extension boundary. Without this plumbing, an outer DeduplicatingSerializer can't reach into extension nodes -- the dedup cache stops at the extension boundary and Arc sharing breaks for any expression carried inside an extension codec, even though the wire format already supports it via the expr_id field. With this change, atlas can finally drop the share_dynamic_filters walker once its codecs override the new methods. This is the breaking-change pattern matching the apache/datafusion upstream design (issue #22920) instead of additive _with_converter variants -- branch-53 is internal-only so callers can be updated in lockstep. 
--- .../adapter_serialization.rs | 2 + .../proto/composed_extension_codec.rs | 17 +++++- .../proto/expression_deduplication.rs | 2 + .../ffi/src/proto/physical_extension_codec.rs | 42 +++++++++++--- .../physical-expr/src/expressions/mod.rs | 4 +- datafusion/proto/proto/datafusion.proto | 6 +- datafusion/proto/src/generated/prost.rs | 6 +- .../proto/src/physical_plan/from_proto.rs | 15 +++-- datafusion/proto/src/physical_plan/mod.rs | 55 ++++++++++++++++--- .../proto/src/physical_plan/to_proto.rs | 7 ++- .../tests/cases/roundtrip_physical_plan.rs | 35 ++++++++---- 11 files changed, 149 insertions(+), 42 deletions(-) diff --git a/datafusion-examples/examples/custom_data_source/adapter_serialization.rs b/datafusion-examples/examples/custom_data_source/adapter_serialization.rs index a2cd187fee067..80a10ecf65f13 100644 --- a/datafusion-examples/examples/custom_data_source/adapter_serialization.rs +++ b/datafusion-examples/examples/custom_data_source/adapter_serialization.rs @@ -274,6 +274,7 @@ impl PhysicalExtensionCodec for AdapterPreservingCodec { buf: &[u8], inputs: &[Arc], _ctx: &TaskContext, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { // Try to parse as our extension payload if let Ok(payload) = serde_json::from_slice::(buf) @@ -302,6 +303,7 @@ impl PhysicalExtensionCodec for AdapterPreservingCodec { &self, _node: Arc, _buf: &mut Vec, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result<()> { // We don't need this for the example - we use serialize_physical_plan instead not_impl_err!( diff --git a/datafusion-examples/examples/proto/composed_extension_codec.rs b/datafusion-examples/examples/proto/composed_extension_codec.rs index b4f3d4f098996..cd577d209d471 100644 --- a/datafusion-examples/examples/proto/composed_extension_codec.rs +++ b/datafusion-examples/examples/proto/composed_extension_codec.rs @@ -43,6 +43,7 @@ use datafusion::physical_plan::{DisplayAs, ExecutionPlan}; use datafusion::prelude::SessionContext; use 
datafusion_proto::physical_plan::{ AsExecutionPlan, ComposedPhysicalExtensionCodec, PhysicalExtensionCodec, + PhysicalProtoConverterExtension, }; use datafusion_proto::protobuf; @@ -140,6 +141,7 @@ impl PhysicalExtensionCodec for ParentPhysicalExtensionCodec { buf: &[u8], inputs: &[Arc], _ctx: &TaskContext, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { if buf == "ParentExec".as_bytes() { Ok(Arc::new(ParentExec { @@ -150,7 +152,12 @@ impl PhysicalExtensionCodec for ParentPhysicalExtensionCodec { } } - fn try_encode(&self, node: Arc, buf: &mut Vec) -> Result<()> { + fn try_encode( + &self, + node: Arc, + buf: &mut Vec, + _proto_converter: &dyn PhysicalProtoConverterExtension, + ) -> Result<()> { if node.as_any().downcast_ref::().is_some() { buf.extend_from_slice("ParentExec".as_bytes()); Ok(()) @@ -216,6 +223,7 @@ impl PhysicalExtensionCodec for ChildPhysicalExtensionCodec { buf: &[u8], _inputs: &[Arc], _ctx: &TaskContext, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { if buf == "ChildExec".as_bytes() { Ok(Arc::new(ChildExec {})) @@ -224,7 +232,12 @@ impl PhysicalExtensionCodec for ChildPhysicalExtensionCodec { } } - fn try_encode(&self, node: Arc, buf: &mut Vec) -> Result<()> { + fn try_encode( + &self, + node: Arc, + buf: &mut Vec, + _proto_converter: &dyn PhysicalProtoConverterExtension, + ) -> Result<()> { if node.as_any().downcast_ref::().is_some() { buf.extend_from_slice("ChildExec".as_bytes()); Ok(()) diff --git a/datafusion-examples/examples/proto/expression_deduplication.rs b/datafusion-examples/examples/proto/expression_deduplication.rs index 0dec807f8043a..a591f41d8682a 100644 --- a/datafusion-examples/examples/proto/expression_deduplication.rs +++ b/datafusion-examples/examples/proto/expression_deduplication.rs @@ -187,6 +187,7 @@ impl PhysicalExtensionCodec for CachingCodec { _buf: &[u8], _inputs: &[Arc], _ctx: &TaskContext, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { 
datafusion::common::not_impl_err!("No custom extension nodes") } @@ -196,6 +197,7 @@ impl PhysicalExtensionCodec for CachingCodec { &self, _node: Arc, _buf: &mut Vec, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result<()> { datafusion::common::not_impl_err!("No custom extension nodes") } diff --git a/datafusion/ffi/src/proto/physical_extension_codec.rs b/datafusion/ffi/src/proto/physical_extension_codec.rs index 0577e72366478..80565562e292c 100644 --- a/datafusion/ffi/src/proto/physical_extension_codec.rs +++ b/datafusion/ffi/src/proto/physical_extension_codec.rs @@ -141,8 +141,16 @@ unsafe extern "C" fn try_decode_fn_wrapper( .collect::>>(); let inputs = rresult_return!(inputs); - let plan = - rresult_return!(codec.try_decode(buf.as_ref(), &inputs, task_ctx.as_ref())); + // FFI boundary doesn't propagate the active PhysicalProtoConverterExtension, + // so dedup of nested expressions stops here (matches pre-#21807 behavior). + let proto_converter = + datafusion_proto::physical_plan::DefaultPhysicalProtoConverter {}; + let plan = rresult_return!(codec.try_decode( + buf.as_ref(), + &inputs, + task_ctx.as_ref(), + &proto_converter, + )); RResult::ROk(FFI_ExecutionPlan::new(plan, None)) } @@ -156,7 +164,11 @@ unsafe extern "C" fn try_encode_fn_wrapper( let plan: Arc = rresult_return!((&node).try_into()); let mut bytes = Vec::new(); - rresult_return!(codec.try_encode(plan, &mut bytes)); + // FFI boundary doesn't propagate the active PhysicalProtoConverterExtension, + // so dedup of nested expressions stops here (matches pre-#21807 behavior). 
+ let proto_converter = + datafusion_proto::physical_plan::DefaultPhysicalProtoConverter {}; + rresult_return!(codec.try_encode(plan, &mut bytes, &proto_converter)); RResult::ROk(bytes.into()) } @@ -327,6 +339,7 @@ impl PhysicalExtensionCodec for ForeignPhysicalExtensionCodec { buf: &[u8], inputs: &[Arc], _ctx: &TaskContext, + _proto_converter: &dyn datafusion_proto::physical_plan::PhysicalProtoConverterExtension, ) -> Result> { let inputs = inputs .iter() @@ -340,7 +353,12 @@ impl PhysicalExtensionCodec for ForeignPhysicalExtensionCodec { Ok(plan) } - fn try_encode(&self, node: Arc, buf: &mut Vec) -> Result<()> { + fn try_encode( + &self, + node: Arc, + buf: &mut Vec, + _proto_converter: &dyn datafusion_proto::physical_plan::PhysicalProtoConverterExtension, + ) -> Result<()> { let plan = FFI_ExecutionPlan::new(node, None); let bytes = df_result!(unsafe { (self.0.try_encode)(&self.0, plan) })?; @@ -441,6 +459,7 @@ pub(crate) mod tests { buf: &[u8], _inputs: &[Arc], _ctx: &TaskContext, + _proto_converter: &dyn datafusion_proto::physical_plan::PhysicalProtoConverterExtension, ) -> Result> { if buf[0] != Self::MAGIC_NUMBER { return exec_err!( @@ -459,6 +478,7 @@ pub(crate) mod tests { &self, node: Arc, buf: &mut Vec, + _proto_converter: &dyn datafusion_proto::physical_plan::PhysicalProtoConverterExtension, ) -> Result<()> { buf.push(Self::MAGIC_NUMBER); @@ -579,10 +599,16 @@ pub(crate) mod tests { let exec = create_test_exec(); let input_execs = [create_test_exec()]; let mut bytes = Vec::new(); - foreign_codec.try_encode(Arc::clone(&exec), &mut bytes)?; - - let returned_exec = - foreign_codec.try_decode(&bytes, &input_execs, ctx.task_ctx().as_ref())?; + let proto_converter = + datafusion_proto::physical_plan::DefaultPhysicalProtoConverter {}; + foreign_codec.try_encode(Arc::clone(&exec), &mut bytes, &proto_converter)?; + + let returned_exec = foreign_codec.try_decode( + &bytes, + &input_execs, + ctx.task_ctx().as_ref(), + &proto_converter, + )?; 
assert!(returned_exec.as_any().is::()); diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index b4f319cd61009..684874acc308e 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -45,9 +45,7 @@ pub use cast::{CastExpr, cast}; pub use cast_column::CastColumnExpr; pub use column::{Column, col, with_new_schema}; pub use datafusion_expr::utils::format_state_name; -pub use dynamic_filters::{ - DynamicFilterPhysicalExpr, Inner as DynamicFilterInner, -}; +pub use dynamic_filters::{DynamicFilterPhysicalExpr, Inner as DynamicFilterInner}; pub use in_list::{InListExpr, in_list}; pub use is_not_null::{IsNotNullExpr, is_not_null}; pub use is_null::{IsNullExpr, is_null}; diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index de88c5e648575..99c4205375468 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -901,7 +901,7 @@ message PhysicalExprNode { // Ported from apache/datafusion#21807 (minimal subset). Allows a // DynamicFilterPhysicalExpr to roundtrip through proto with its wrapper // intact, so the receiver (combined with DeduplicatingDeserializer) - // can rebuild a shared Arc across e.g. SortExec.filter and the + // can rebuild a shared `Arc` across e.g. SortExec.filter and the // FileScan predicate pushed down from it. // // Field 22 is intentionally skipped to match upstream's field number @@ -918,7 +918,7 @@ message PhysicalDynamicFilterNode { PhysicalExprNode inner_expr = 4; bool is_complete = 5; // Stable identity of this dynamic filter. References with the same id - // MUST deserialize to the same Arc so heap-max + // MUST deserialize to the same `Arc` so heap-max // updates propagate from SortExec to the FileScan predicate it was // pushed down to. 
uint64 expression_id = 6; @@ -1319,7 +1319,7 @@ message SortExecNode { // can install it via `with_dynamic_filter_expr` instead of letting // `with_fetch(...).create_filter()` mint a brand-new one. Combined with // the #21807 PhysicalDynamicFilterNode + dedup-by-expression_id, the - // resulting decoded plan re-shares the Arc with the pushed-down + // resulting decoded plan re-shares the `Arc` with the pushed-down // FileScan predicate (X-2935 walker becomes redundant). PhysicalExprNode dynamic_filter = 5; } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 7ab59f69cc983..3ff96dc8dcdcf 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1350,7 +1350,7 @@ pub mod physical_expr_node { /// Ported from apache/datafusion#21807 (minimal subset). Allows a /// DynamicFilterPhysicalExpr to roundtrip through proto with its wrapper /// intact, so the receiver (combined with DeduplicatingDeserializer) - /// can rebuild a shared Arc across e.g. SortExec.filter and the + /// can rebuild a shared `Arc` across e.g. SortExec.filter and the /// FileScan predicate pushed down from it. /// /// Field 22 is intentionally skipped to match upstream's field number @@ -1373,7 +1373,7 @@ pub struct PhysicalDynamicFilterNode { #[prost(bool, tag = "5")] pub is_complete: bool, /// Stable identity of this dynamic filter. References with the same id - /// MUST deserialize to the same Arc so heap-max + /// MUST deserialize to the same `Arc` so heap-max /// updates propagate from SortExec to the FileScan predicate it was /// pushed down to. #[prost(uint64, tag = "6")] @@ -1955,7 +1955,7 @@ pub struct SortExecNode { /// can install it via `with_dynamic_filter_expr` instead of letting /// `with_fetch(...).create_filter()` mint a brand-new one. 
Combined with /// the #21807 PhysicalDynamicFilterNode + dedup-by-expression_id, the - /// resulting decoded plan re-shares the Arc with the pushed-down + /// resulting decoded plan re-shares the `Arc` with the pushed-down /// FileScan predicate (X-2935 walker becomes redundant). #[prost(message, optional, tag = "5")] pub dynamic_filter: ::core::option::Option, diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index b3c0e26060963..6e6ab84307191 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -506,7 +506,12 @@ pub fn parse_physical_expr_with_converter( proto_converter.proto_to_physical_expr(e, ctx, input_schema, codec) }) .collect::>()?; - codec.try_decode_expr(extension.expr.as_slice(), &inputs)? as _ + // Use the converter-aware decode entry point so extension codecs + // that embed nested `PhysicalExprNode` fields can thread the + // active `proto_converter` into their own parsing of those + // nested expressions, sharing the dedup cache. + codec.try_decode_expr(extension.expr.as_slice(), &inputs, proto_converter)? + as _ } ExprType::DynamicFilter(df_proto) => { // Reconstruct the DynamicFilterPhysicalExpr wrapper. When this @@ -518,8 +523,9 @@ pub fn parse_physical_expr_with_converter( // // Ported from upstream apache/datafusion#21807 (minimal subset). let inner_expr = match df_proto.inner_expr.as_deref() { - Some(p) => proto_converter - .proto_to_physical_expr(p, ctx, input_schema, codec)?, + Some(p) => { + proto_converter.proto_to_physical_expr(p, ctx, input_schema, codec)? 
+ } None => { return Err(proto_error( "PhysicalDynamicFilterNode missing inner_expr", @@ -530,8 +536,7 @@ pub fn parse_physical_expr_with_converter( .children .iter() .map(|c| { - proto_converter - .proto_to_physical_expr(c, ctx, input_schema, codec) + proto_converter.proto_to_physical_expr(c, ctx, input_schema, codec) }) .collect::>>()?; let remapped_children = if df_proto.remapped_children.is_empty() { diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 3c97411b861c1..19546d3150039 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -572,7 +572,7 @@ impl protobuf::PhysicalPlanNode { } let mut buf: Vec = vec![]; - match codec.try_encode(Arc::clone(&plan_clone), &mut buf) { + match codec.try_encode(Arc::clone(&plan_clone), &mut buf, proto_converter) { Ok(_) => { let inputs: Vec = plan_clone .children() @@ -1690,7 +1690,8 @@ impl protobuf::PhysicalPlanNode { // `DynamicFilterPhysicalExpr` (since serialize skips snapshot // for this type), so the downcast succeeds and Arc identity // is preserved across the call. - let df = match (dynamic_filter_expr.clone() as Arc) + let df = match (Arc::clone(&dynamic_filter_expr) + as Arc) .downcast::() { Ok(df) => df, @@ -1787,7 +1788,8 @@ impl protobuf::PhysicalPlanNode { .map(|i| proto_converter.proto_to_execution_plan(ctx, codec, i)) .collect::>()?; - let extension_node = codec.try_decode(extension.node.as_slice(), &inputs, ctx)?; + let extension_node = + codec.try_decode(extension.node.as_slice(), &inputs, ctx, proto_converter)?; Ok(extension_node) } @@ -3717,14 +3719,33 @@ pub trait AsExecutionPlan: Debug + Send + Sync + Clone { } pub trait PhysicalExtensionCodec: Debug + Send + Sync { + /// Decode an extension execution plan. + /// + /// `proto_converter` is the active conversion strategy (e.g. + /// `DeduplicatingDeserializer`). Codecs whose custom proto embeds + /// nested `PhysicalExprNode` fields (e.g. 
a predicate on a custom + /// file source) should route those through + /// `proto_converter.proto_to_physical_expr` so the dedup cache + /// extends across the extension boundary. + /// + /// Mirrors the upstream apache/datafusion pattern (#22920). fn try_decode( &self, buf: &[u8], inputs: &[Arc], ctx: &TaskContext, + proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result>; - fn try_encode(&self, node: Arc, buf: &mut Vec) -> Result<()>; + /// Encode an extension execution plan. + /// + /// See [`Self::try_decode`] for the role of `proto_converter`. + fn try_encode( + &self, + node: Arc, + buf: &mut Vec, + proto_converter: &dyn PhysicalProtoConverterExtension, + ) -> Result<()>; fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result> { not_impl_err!("PhysicalExtensionCodec is not provided for scalar function {name}") @@ -3734,18 +3755,26 @@ pub trait PhysicalExtensionCodec: Debug + Send + Sync { Ok(()) } + /// Decode an extension expression. + /// + /// See [`Self::try_decode`] for the role of `proto_converter`. fn try_decode_expr( &self, _buf: &[u8], _inputs: &[Arc], + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { not_impl_err!("PhysicalExtensionCodec is not provided") } + /// Encode an extension expression. + /// + /// See [`Self::try_decode`] for the role of `proto_converter`. 
fn try_encode_expr( &self, _node: &Arc, _buf: &mut Vec, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result<()> { not_impl_err!("PhysicalExtensionCodec is not provided") } @@ -3778,6 +3807,7 @@ impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec { _buf: &[u8], _inputs: &[Arc], _ctx: &TaskContext, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { not_impl_err!("PhysicalExtensionCodec is not provided") } @@ -3786,6 +3816,7 @@ impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec { &self, _node: Arc, _buf: &mut Vec, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result<()> { not_impl_err!("PhysicalExtensionCodec is not provided") } @@ -4213,12 +4244,22 @@ impl PhysicalExtensionCodec for ComposedPhysicalExtensionCodec { buf: &[u8], inputs: &[Arc], ctx: &TaskContext, + proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { - self.decode_protobuf(buf, |codec, data| codec.try_decode(data, inputs, ctx)) + self.decode_protobuf(buf, |codec, data| { + codec.try_decode(data, inputs, ctx, proto_converter) + }) } - fn try_encode(&self, node: Arc, buf: &mut Vec) -> Result<()> { - self.encode_protobuf(buf, |codec, data| codec.try_encode(Arc::clone(&node), data)) + fn try_encode( + &self, + node: Arc, + buf: &mut Vec, + proto_converter: &dyn PhysicalProtoConverterExtension, + ) -> Result<()> { + self.encode_protobuf(buf, |codec, data| { + codec.try_encode(Arc::clone(&node), data, proto_converter) + }) } fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result> { diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index e722a8c72dcb9..d9905954678fa 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -576,7 +576,12 @@ pub fn serialize_physical_expr_with_converter( }) } else { let mut buf: Vec = vec![]; - match codec.try_encode_expr(&value, &mut buf) { + // Use the 
converter-aware encode entry point so extension codecs + // that embed nested `PhysicalExprNode` fields can thread the + // active `proto_converter` (e.g. `DeduplicatingSerializer`) into + // their own serialization of those nested expressions, preserving + // Arc-identity dedup end-to-end. + match codec.try_encode_expr(&value, &mut buf, proto_converter) { Ok(_) => { let inputs: Vec = value .children() diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 27e2f4a402108..b3e0a3ebcf88b 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -1141,6 +1141,7 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> { _buf: &[u8], _inputs: &[Arc], _ctx: &TaskContext, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { unreachable!() } @@ -1149,6 +1150,7 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> { &self, _node: Arc, _buf: &mut Vec, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result<()> { unreachable!() } @@ -1157,6 +1159,7 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> { &self, buf: &[u8], inputs: &[Arc], + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { if buf == "CustomPredicateExpr".as_bytes() { Ok(Arc::new(CustomPredicateExpr { @@ -1171,6 +1174,7 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> { &self, node: &Arc, buf: &mut Vec, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result<()> { if node .as_any() @@ -1254,6 +1258,7 @@ impl PhysicalExtensionCodec for UDFExtensionCodec { _buf: &[u8], _inputs: &[Arc], _ctx: &TaskContext, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { not_impl_err!("No extension codec provided") } @@ -1262,6 +1267,7 @@ impl PhysicalExtensionCodec for UDFExtensionCodec { &self, _node: Arc, _buf: 
&mut Vec, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result<()> { not_impl_err!("No extension codec provided") } @@ -3186,15 +3192,18 @@ fn dynamic_filter_dedup_with_deduplicating_codec() -> Result<()> { let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]); let initial: Arc = lit(true); let children: Vec> = vec![Arc::new(Column::new("a", 0))]; - let df = Arc::new(DynamicFilterPhysicalExpr::new(children, Arc::clone(&initial))); + let df = Arc::new(DynamicFilterPhysicalExpr::new( + children, + Arc::clone(&initial), + )); let id_before = df.inner().expression_id; let codec = DefaultPhysicalExtensionCodec {}; let serializer = DeduplicatingSerializer::new(); - let proto1 = - serializer.physical_expr_to_proto(&(df.clone() as Arc), &codec)?; - let proto2 = - serializer.physical_expr_to_proto(&(df.clone() as Arc), &codec)?; + let proto1 = serializer + .physical_expr_to_proto(&(df.clone() as Arc), &codec)?; + let proto2 = serializer + .physical_expr_to_proto(&(df.clone() as Arc), &codec)?; assert_eq!( proto1.expr_id, proto2.expr_id, @@ -3214,7 +3223,9 @@ fn dynamic_filter_dedup_with_deduplicating_codec() -> Result<()> { let d1_df = d1 .as_any() .downcast_ref::() - .expect("decoded expr must be DynamicFilterPhysicalExpr; snapshot path is bypassed"); + .expect( + "decoded expr must be DynamicFilterPhysicalExpr; snapshot path is bypassed", + ); assert_eq!( d1_df.inner().expression_id, id_before, @@ -3244,9 +3255,10 @@ fn dynamic_filter_dedup_distinct_outer_arcs_same_inner() -> Result<()> { let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]); let initial: Arc = lit(true); let col_a: Arc = Arc::new(Column::new("a", 0)); - let df_arc1: Arc = Arc::new( - DynamicFilterPhysicalExpr::new(vec![Arc::clone(&col_a)], Arc::clone(&initial)), - ); + let df_arc1: Arc = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::clone(&col_a)], + Arc::clone(&initial), + )); let expected_id = df_arc1 .as_any() .downcast_ref::() @@ -3310,7 
+3322,10 @@ fn dynamic_filter_roundtrip_without_dedup() -> Result<()> { let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]); let initial: Arc = lit(true); let children: Vec> = vec![Arc::new(Column::new("a", 0))]; - let df = Arc::new(DynamicFilterPhysicalExpr::new(children, Arc::clone(&initial))); + let df = Arc::new(DynamicFilterPhysicalExpr::new( + children, + Arc::clone(&initial), + )); let id_before = df.inner().expression_id; let codec = DefaultPhysicalExtensionCodec {};