[X-2935] proto: dedupe DynamicFilterPhysicalExpr across extension codec boundary #60
Merged
…al port of apache#21807)

Adds expression_id to DynamicFilterPhysicalExpr Inner (atomic counter, preserved across update() and with_new_children) and a corresponding PhysicalDynamicFilterNode proto message. The serializer special-cases DynamicFilterPhysicalExpr to skip snapshot() and emit the wrapper in full. The deserializer reconstructs the wrapper.

Combined with the existing DeduplicatingSerializer + DeduplicatingDeserializer pair on branch-53, the wire-level Arc identity dedup now works for dynamic filters too: SortExec.filter and the FileScan predicate it was pushed down to are reconstructed as the same Arc<Inner> on the data server, so heap-max updates propagate end-to-end without the atlas-side share_dynamic_filters walker.

Minimal port: skips upstream's ExpressionPlacement enum, snapshot() refactor, and DedupingSerializer removal. Wire layout (field 23 oneof variant + PhysicalDynamicFilterNode fields) mirrors upstream so a future DF upgrade can drop this patch mechanically.

DeduplicatingSerializer and DeduplicatingDeserializer are now pub so atlas's coordinator-side and data-server-side codecs can opt in.
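The identity scheme described in this commit message can be illustrated with a toy model. This is only a sketch under stated assumptions: `ToyDynamicFilter`, `Inner`, `NEXT_EXPRESSION_ID`, and the `threshold` field are hypothetical stand-ins written for this example, not DataFusion's actual types or fields. It shows an id drawn from an atomic counter that stays the same across `update()` and `with_new_children`, because both operate on the same shared inner state.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

// Process-wide counter; every newly created filter takes the next value.
static NEXT_EXPRESSION_ID: AtomicU64 = AtomicU64::new(1);

/// Hypothetical stand-in for the shared state behind a dynamic filter.
#[derive(Debug)]
struct Inner {
    expression_id: u64,
    /// Current bound, e.g. the heap max of a top-k sort.
    threshold: Option<i64>,
}

/// Hypothetical stand-in for the outer wrapper expression.
#[derive(Debug, Clone)]
struct ToyDynamicFilter {
    /// Column indices this view of the filter refers to.
    children: Vec<usize>,
    inner: Arc<RwLock<Inner>>,
}

impl ToyDynamicFilter {
    fn new(children: Vec<usize>) -> Self {
        let expression_id = NEXT_EXPRESSION_ID.fetch_add(1, Ordering::Relaxed);
        let inner = Inner { expression_id, threshold: None };
        Self { children, inner: Arc::new(RwLock::new(inner)) }
    }

    fn expression_id(&self) -> u64 {
        self.inner.read().unwrap().expression_id
    }

    fn children(&self) -> &[usize] {
        &self.children
    }

    fn threshold(&self) -> Option<i64> {
        self.inner.read().unwrap().threshold
    }

    /// Mutates the shared state in place; the id is left untouched.
    fn update(&self, threshold: i64) {
        self.inner.write().unwrap().threshold = Some(threshold);
    }

    /// Builds a new outer value (remapped columns) over the same inner state.
    fn with_new_children(&self, children: Vec<usize>) -> Self {
        Self { children, inner: Arc::clone(&self.inner) }
    }
}

fn main() {
    let sort_filter = ToyDynamicFilter::new(vec![0]);
    // Pushdown remaps the column but keeps sharing the inner state.
    let scan_predicate = sort_filter.with_new_children(vec![3]);
    sort_filter.update(42);

    assert_eq!(sort_filter.expression_id(), scan_predicate.expression_id());
    assert_eq!(scan_predicate.threshold(), Some(42));
    println!(
        "scan predicate over columns {:?} sees {:?}",
        scan_predicate.children(),
        scan_predicate.threshold()
    );
}
```

Keeping the id inside the shared inner state, rather than on the outer wrapper, is what lets every remapped copy report the same identity in this model.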
Mirrors apache#21807 steps 3-4: switch DeduplicatingSerializer from hashing Arc::as_ptr to hashing the expression's expression_id when available. Falls back to Arc::as_ptr for expressions without a stable id.

This fixes the case FilterPushdown produces in practice: pushing a DynamicFilterPhysicalExpr from a SortExec down to a FileScan creates a new outer Arc (via with_new_children to remap columns) that still shares the same Arc<RwLock<Inner>>. Without this change, the two outer Arcs hash to different addresses and DeduplicatingDeserializer rebuilds them as two distinct Arcs on decode -- exactly the X-2935 walker's bug.

Adds a new test dynamic_filter_dedup_distinct_outer_arcs_same_inner that constructs the with_new_children case and asserts dedup works.

150/150 proto integration tests pass.
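The key selection described above can be sketched as follows. This is a minimal illustration, not the actual DeduplicatingSerializer code: `ToyExpr`, `Column`, `DynFilter`, `DedupKey`, and `dedup_key` are hypothetical names invented for the example. It shows a key that prefers a stable `expression_id` and falls back to the `Arc` address when no stable id exists.

```rust
use std::sync::Arc;

/// Hypothetical, minimal expression trait; only the identity hook is modeled.
trait ToyExpr {
    /// Stable identity, if the expression has one. Most expressions do not.
    fn expression_id(&self) -> Option<u64> {
        None
    }
}

/// An ordinary expression without a stable id.
#[allow(dead_code)]
struct Column {
    index: usize,
}
impl ToyExpr for Column {}

/// A dynamic filter whose id comes from its shared inner state.
struct DynFilter {
    shared_id: u64,
}
impl ToyExpr for DynFilter {
    fn expression_id(&self) -> Option<u64> {
        Some(self.shared_id)
    }
}

/// Key used to decide whether two expressions are "the same" on the wire.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum DedupKey {
    /// Derived from expression_id; survives re-wrapping in a new outer Arc.
    Stable(u64),
    /// Derived from the Arc address; only equal for clones of one Arc.
    Address(usize),
}

fn dedup_key(expr: &Arc<dyn ToyExpr>) -> DedupKey {
    match expr.expression_id() {
        Some(id) => DedupKey::Stable(id),
        // Drop the vtable half of the fat pointer and keep the data address.
        None => DedupKey::Address(Arc::as_ptr(expr) as *const () as usize),
    }
}

fn main() {
    // Two distinct outer Arcs that report the same id, as after pushdown.
    let sort_side: Arc<dyn ToyExpr> = Arc::new(DynFilter { shared_id: 7 });
    let scan_side: Arc<dyn ToyExpr> = Arc::new(DynFilter { shared_id: 7 });
    assert_eq!(dedup_key(&sort_side), dedup_key(&scan_side));

    // Expressions without an id are only deduplicated when the Arc is shared.
    let col_a: Arc<dyn ToyExpr> = Arc::new(Column { index: 0 });
    let col_b: Arc<dyn ToyExpr> = Arc::new(Column { index: 0 });
    assert_ne!(dedup_key(&col_a), dedup_key(&col_b));
    assert_eq!(dedup_key(&col_a), dedup_key(&Arc::clone(&col_a)));
}
```

With address-only keys, `sort_side` and `scan_side` would have received different keys, which is the failure mode this commit addresses.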
Adds the final piece needed to drop the atlas-side share_dynamic_filters walker: SortExecNode now carries its internal DynamicFilterPhysicalExpr on the wire, and try_into_sort_physical_plan re-installs it via SortExec::with_dynamic_filter_expr instead of letting with_fetch(N) mint a brand-new one via create_filter() on decode.

Combined with the prior two commits in this series (apache#21807 minimal port + dedup-by-expression_id), the decode side now reconstructs the SAME Arc<DynamicFilterPhysicalExpr> shared between the SortExec.filter and any pushed-down FileScan predicate that participated in the encode -- because DeduplicatingDeserializer's cache, keyed on the wire expr_id that DeduplicatingSerializer derives from expression_id, matches across plan nodes.

Adds:
- SortExec::dynamic_filter_expr() / with_dynamic_filter_expr() accessors
- dynamic_filter field on SortExecNode proto (field 5)
- to_proto: try_from_sort_exec serializes the dyn filter via proto_converter
- from_proto: try_into_sort_physical_plan deserializes + reinstalls via with_dynamic_filter_expr; legacy fallback wraps a fresh Arc when the encoder snapshotted the wrapper away

Test sort_exec_proto_roundtrip_preserves_dyn_filter_arc asserts end-to-end: serialize a SortExec with a known DynamicFilterPhysicalExpr, roundtrip via DeduplicatingSerializer + DeduplicatingDeserializer, and check that the decoded SortExec's filter carries the same expression_id.

Ported from apache#22011 (minimal subset, adapted to branch-53's PhysicalProtoConverterExtension signatures). 151/151 proto integration tests pass.
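The decode-side behavior can be modeled with a small cache keyed on the wire id. This is a sketch under simplifying assumptions: `WireExpr`, `WirePlan`, `DecodedPlan`, and `DedupDecoder` are hypothetical types made up for the example, a filter is reduced to a shared `Option<i64>`, and the legacy fallback is approximated as "no filter on the wire". It shows why carrying the sort's filter on the wire lets the sort and the scan end up holding the same `Arc`.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical wire form of a dynamic filter expression.
#[derive(Clone)]
struct WireExpr {
    expr_id: Option<u64>,
    threshold: Option<i64>,
}

/// Hypothetical wire form of a tiny plan: a sort on top of a scan.
struct WirePlan {
    /// Absent in payloads written before the sort carried its filter.
    sort_dynamic_filter: Option<WireExpr>,
    scan_predicate: Option<WireExpr>,
}

/// A decoded filter, reduced to its shared mutable bound.
type Filter = Arc<RwLock<Option<i64>>>;

struct DecodedPlan {
    sort_filter: Filter,
    scan_predicate: Option<Filter>,
}

/// Decoder with a cache keyed on the wire expr_id.
#[derive(Default)]
struct DedupDecoder {
    cache: HashMap<u64, Filter>,
}

impl DedupDecoder {
    fn decode_expr(&mut self, wire: &WireExpr) -> Filter {
        match wire.expr_id {
            // Same id seen before: hand back the cached Arc.
            Some(id) => self
                .cache
                .entry(id)
                .or_insert_with(|| Arc::new(RwLock::new(wire.threshold)))
                .clone(),
            // No id: nothing to deduplicate against.
            None => Arc::new(RwLock::new(wire.threshold)),
        }
    }

    fn decode_plan(&mut self, wire: &WirePlan) -> DecodedPlan {
        let scan_predicate = wire.scan_predicate.as_ref().map(|w| self.decode_expr(w));
        let sort_filter = match &wire.sort_dynamic_filter {
            // Re-install the filter that came over the wire.
            Some(w) => self.decode_expr(w),
            // Simplified legacy path: mint a fresh, unshared filter.
            None => Arc::new(RwLock::new(None)),
        };
        DecodedPlan { sort_filter, scan_predicate }
    }
}

fn main() {
    let shared = WireExpr { expr_id: Some(7), threshold: None };
    let plan = WirePlan {
        sort_dynamic_filter: Some(shared.clone()),
        scan_predicate: Some(shared),
    };
    let decoded = DedupDecoder::default().decode_plan(&plan);
    let scan = decoded.scan_predicate.expect("scan predicate was encoded");

    // Both plan nodes hold the same allocation, so updates propagate.
    assert!(Arc::ptr_eq(&decoded.sort_filter, &scan));
    *decoded.sort_filter.write().unwrap() = Some(42);
    assert_eq!(*scan.read().unwrap(), Some(42));
}
```

In this model, a payload without the sort's filter falls into the fresh-filter branch, and the sort and scan no longer share state, which mirrors the situation the walker used to repair.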
Pull request overview
This PR ports upstream DataFusion proto improvements to preserve and deduplicate DynamicFilterPhysicalExpr across protobuf roundtrips (including across extension codec boundaries), and adjusts the extension codec API so custom codecs can participate in the active (de)dup conversion pipeline.
Changes:
- Add a protobuf representation for DynamicFilterPhysicalExpr (so it is no longer snapshotted away) and decode it back into the wrapper.
- Deduplicate physical expressions during decode using expr_id, and switch dedup identity for dynamic filters from Arc::as_ptr to stable expression_id().
- Update PhysicalExtensionCodec encode/decode APIs to accept proto_converter, enabling extension codecs to thread dedup through nested plan/expr fields.
Reviewed changes
Copilot reviewed 9 out of 11 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| datafusion/proto/tests/cases/roundtrip_physical_plan.rs | Adds roundtrip/dedup tests for dynamic filters and SortExec dynamic filter persistence. |
| datafusion/proto/src/physical_plan/to_proto.rs | Adds special-case serialization for DynamicFilterPhysicalExpr (no snapshot) and threads converter into extension expr encoding. |
| datafusion/proto/src/physical_plan/mod.rs | Threads converter into extension plan encoding/decoding, adds SortExec dynamic filter proto handling, and changes PhysicalExtensionCodec trait signatures. |
| datafusion/proto/src/physical_plan/from_proto.rs | Adds converter-aware extension expr decoding and a DynamicFilterPhysicalExpr decode path. |
| datafusion/proto/src/generated/prost.rs | Updates generated prost types for new dynamic filter expr node and SortExec dynamic filter field. |
| datafusion/proto/src/generated/pbjson.rs | Updates generated JSON (de)serialization for the new messages/fields. |
| datafusion/proto/proto/datafusion.proto | Adds PhysicalDynamicFilterNode and a dynamic_filter field on SortExecNode. |
| datafusion/physical-plan/src/sorts/sort.rs | Exposes SortExec dynamic filter getter/setter for proto roundtrip. |
| datafusion/physical-expr/src/expressions/mod.rs | Re-exports DynamicFilterInner for proto reconstruction. |
| datafusion/physical-expr/src/expressions/dynamic_filters.rs | Introduces stable expression_id, exposes inner parts for proto, and adds reconstruction helpers. |
| datafusion/physical-expr-common/src/physical_expr.rs | Adds expression_id() hook on PhysicalExpr to support identity-based dedup. |
1ea3955 to 48e4075
Modifies the PhysicalExtensionCodec trait so that try_encode, try_decode, try_encode_expr, and try_decode_expr receive the active PhysicalProtoConverterExtension. Codecs that embed nested PhysicalExprNode fields inside their custom proto (e.g. atlas's ReverseParquetNode.predicate) must now route those through proto_converter.proto_to_physical_expr / proto_converter.physical_expr_to_proto so the dedup cache extends across the extension boundary.

Without this plumbing, an outer DeduplicatingSerializer can't reach into extension nodes -- the dedup cache stops at the extension boundary and Arc<Inner> sharing breaks for any expression carried inside an extension codec, even though the wire format already supports it via the expr_id field. With this change, atlas can finally drop the share_dynamic_filters walker once its codecs override the new methods.

This follows the breaking-change pattern of the apache/datafusion upstream design (issue apache#22920) instead of adding additive _with_converter variants -- branch-53 is internal-only so callers can be updated in lockstep.
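The effect of passing the active converter into an extension codec can be sketched with toy traits. Everything here is an assumption made for illustration: `ToyConverter`, `DedupConverter`, `PlainConverter`, `ToyExtensionCodec`, `ReverseScanCodec`, and the wire structs are hypothetical and much simpler than the real PhysicalExtensionCodec and PhysicalProtoConverterExtension signatures. The point is only that a nested predicate decoded through the caller's converter lands in the same cache as expressions decoded outside the extension node.

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical wire form of an expression carrying a dedup id.
struct WireExpr {
    expr_id: u64,
    text: String,
}

/// Hypothetical wire form of an extension node with a nested predicate.
struct WireExtensionNode {
    predicate: WireExpr,
}

/// A decoded expression, reduced to shared text.
type Expr = Arc<String>;

/// Stand-in for the converter that the codec now receives.
trait ToyConverter {
    fn proto_to_expr(&self, wire: &WireExpr) -> Expr;
}

/// Converter with a decode cache keyed on expr_id.
#[derive(Default)]
struct DedupConverter {
    cache: RefCell<HashMap<u64, Expr>>,
}

impl ToyConverter for DedupConverter {
    fn proto_to_expr(&self, wire: &WireExpr) -> Expr {
        let expr = self
            .cache
            .borrow_mut()
            .entry(wire.expr_id)
            .or_insert_with(|| Arc::new(wire.text.clone()))
            .clone();
        expr
    }
}

/// Converter without a cache: every decode allocates a new Arc.
struct PlainConverter;

impl ToyConverter for PlainConverter {
    fn proto_to_expr(&self, wire: &WireExpr) -> Expr {
        Arc::new(wire.text.clone())
    }
}

/// Stand-in for an extension codec whose decode method takes the converter.
trait ToyExtensionCodec {
    fn try_decode(&self, node: &WireExtensionNode, converter: &dyn ToyConverter) -> Expr;
}

struct ReverseScanCodec;

impl ToyExtensionCodec for ReverseScanCodec {
    fn try_decode(&self, node: &WireExtensionNode, converter: &dyn ToyConverter) -> Expr {
        // Route the nested predicate through the caller's converter
        // instead of decoding it privately inside the codec.
        converter.proto_to_expr(&node.predicate)
    }
}

fn main() {
    let node = WireExtensionNode {
        predicate: WireExpr { expr_id: 7, text: "x < 10".to_string() },
    };

    // The sort's filter is decoded outside the extension node.
    let converter = DedupConverter::default();
    let sort_filter =
        converter.proto_to_expr(&WireExpr { expr_id: 7, text: "x < 10".to_string() });
    let scan_predicate = ReverseScanCodec.try_decode(&node, &converter);
    assert!(Arc::ptr_eq(&sort_filter, &scan_predicate));

    // Without a shared cache the two decodes yield separate allocations.
    let plain = PlainConverter;
    let a = plain.proto_to_expr(&node.predicate);
    let b = ReverseScanCodec.try_decode(&node, &plain);
    assert!(!Arc::ptr_eq(&a, &b));
}
```

Taking the converter as a parameter, rather than having the codec own its own cache, keeps a single cache per roundtrip in this model, which is the property the commit message relies on.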
48e4075 to 42342fb
zhuqi-lucas added a commit to massive-com/datafusion-materialized-views that referenced this pull request Jun 12, 2026
Picks up the X-2935 PhysicalExtensionCodec breaking change (massive-com/arrow-datafusion#60) on branch-53. Wire format unchanged; trait signature change is the only API impact. MV doesn't implement PhysicalExtensionCodec so this bump is a pure dependency move.
Summary
Minimal port of apache#21807 + apache#22011 onto branch-53, plus a breaking-change update to the PhysicalExtensionCodec trait so it matches the shape upstream main has today (and adds expr-level converter plumbing in advance of apache#22922). With this, atlas can drop the share_dynamic_filters walker because the proto roundtrip itself re-shares the Arc<DynamicFilterPhysicalExpr> allocation between SortExec.filter and the FileScan predicate.

Why

X-2935 reverse-prefix needs the post-FilterPushdown invariant that SortExec.filter and ReverseParquetSource.predicate point at the same DynamicFilterPhysicalExpr::Inner to survive proto roundtrip. Today atlas runs a post-decode walker (share_dynamic_filters) to re-establish that invariant. The walker is fragile and hides the real gap: branch-53 was missing the apache#21807 dedup pipeline entirely (PhysicalExprNode.expr_id, DeduplicatingSerializer / DeduplicatingDeserializer, expression_id-based identity for DynamicFilterPhysicalExpr).

Commits

- 47f263d1f: port apache/datafusion#21807 (proto: serialize and dedupe dynamic filters v2). Stamps PhysicalExprNode.expr_id on the wire and dedupes DynamicFilterPhysicalExpr references during decode via DeduplicatingSerializer / DeduplicatingDeserializer.
- 3eed0c360: switch the dedup cache key from Arc::as_ptr to expression_id, mirroring upstream's stable identity.
- 0e6d98858: port apache/datafusion#22011 (proto: serialize dynamic filters on Sort, Aggregate, HashJoin plan nodes). SortExec / AggregateExec / HashJoinExec plan codecs route their dynamic filter expr through the converter pipeline.
- 42342fbc1: breaking change to PhysicalExtensionCodec trait. try_encode, try_decode, try_encode_expr, try_decode_expr now all take proto_converter: &dyn PhysicalProtoConverterExtension. The plan-level half (try_encode / try_decode) is what atlas's MaybeReverseParquet codec actually needs to route the nested predicate through proto_converter.physical_expr_to_proto (this is the bit that makes X-2935 dedup work). The expr-level half (try_encode_expr / try_decode_expr) is NOT strictly required for X-2935 (atlas's predicate is DynamicFilterPhysicalExpr, which takes the try_to_proto fast-path and never reaches try_encode_expr), but it is included here so the trait shape matches upstream apache/datafusion#22922 (feat(proto): plumb PhysicalProtoConverterExtension into try_encode_expr / try_decode_expr) in advance, avoiding a second breaking change when we upgrade DF past 53.

Empirical verification of the expr-level scope

To verify the expr-level half is forward-compat rather than load-bearing for X-2935, I cut a test branch (qizhu/x-2935-test-without-expr-level) that reverts only the try_encode_expr / try_decode_expr portion of commit 4 (plus internal call sites and overrides). Result: the atlas e2e test test_dedup_converter_preserves_shared_dynamic_filter_arc_e2e still passes, all 427 atlas lib tests still pass, and atlas can still delete the walker. The expr-level plumbing is kept in this PR purely for upstream alignment.

Affects

- PhysicalExtensionCodec trait. Downstream impls in atlas codecs (AtlasPhysicalCodecExt, SimPhysicalCodec, NoopPhysicalExtensionCodec) are updated in the matching atlas PR. In-repo impls in datafusion-ffi, datafusion-examples, and datafusion-proto integration tests are updated in this PR.
- PhysicalExprNode.expr_id (field 30) is the only addition and is fully optional, so old payloads decode unchanged.

Test

- test_dedup_converter_preserves_shared_dynamic_filter_arc_e2e verifies SortExec.filter + ReverseParquetSource.predicate share the same Arc after roundtrip without the walker.
- datafusion-proto tests still pass; full workspace + tests + examples build clean.

Upstream tracking

- PhysicalProtoConverterExtension into PhysicalExtensionCodec expression-level methods apache/datafusion#22920