Skip to content

[X-2935] proto: dedupe DynamicFilterPhysicalExpr across extension codec boundary#60

Merged
zhuqi-lucas merged 4 commits into
branch-53from
qizhu/x-2935-port-21807
Jun 12, 2026
Merged

[X-2935] proto: dedupe DynamicFilterPhysicalExpr across extension codec boundary#60
zhuqi-lucas merged 4 commits into
branch-53from
qizhu/x-2935-port-21807

Conversation

@zhuqi-lucas

@zhuqi-lucas zhuqi-lucas commented Jun 12, 2026

Copy link
Copy Markdown
Collaborator

Summary

Minimal port of apache#21807 + apache#22011 onto branch-53, plus a breaking-change update to the PhysicalExtensionCodec trait so it matches the shape upstream main has today (and adds expr-level converter plumbing in advance of apache#22922). With this, atlas can drop the share_dynamic_filters walker because the proto roundtrip itself re-shares the Arc<DynamicFilterPhysicalExpr> allocation between SortExec.filter and the FileScan predicate.

Why

X-2935 reverse-prefix needs the post-FilterPushdown invariant that SortExec.filter and ReverseParquetSource.predicate point at the same DynamicFilterPhysicalExpr::Inner to survive proto roundtrip. Today atlas runs a post-decode walker (share_dynamic_filters) to re-establish that invariant. The walker is fragile and hides the real gap: branch-53 was missing the apache#21807 dedup pipeline entirely (PhysicalExprNode.expr_id, DeduplicatingSerializer / DeduplicatingDeserializer, expression_id-based identity for DynamicFilterPhysicalExpr).

Commits

  1. 47f263d1f: port proto: serialize and dedupe dynamic filters v2 apache/datafusion#21807. Stamps PhysicalExprNode.expr_id on the wire and dedupes DynamicFilterPhysicalExpr references during decode via DeduplicatingSerializer / DeduplicatingDeserializer.
  2. 3eed0c360: switch the dedup cache key from Arc::as_ptr to expression_id, mirroring upstream's stable identity.
  3. 0e6d98858: port proto: serialize dynamic filters on Sort, Aggregate, HashJoin plan nodes apache/datafusion#22011. SortExec / AggregateExec / HashJoinExec plan codecs route their dynamic filter expr through the converter pipeline.
  4. 42342fbc1: breaking change to PhysicalExtensionCodec trait. try_encode, try_decode, try_encode_expr, try_decode_expr now all take proto_converter: &dyn PhysicalProtoConverterExtension. The plan-level half (try_encode / try_decode) is what atlas's MaybeReverseParquet codec actually needs to route the nested predicate through proto_converter.physical_expr_to_proto (this is the bit that makes X-2935 dedup work). The expr-level half (try_encode_expr / try_decode_expr) is NOT strictly required for X-2935 (atlas's predicate is DynamicFilterPhysicalExpr, which takes the try_to_proto fast-path and never reaches try_encode_expr), but it is included here so the trait shape matches upstream feat(proto): plumb PhysicalProtoConverterExtension into try_encode_expr / try_decode_expr apache/datafusion#22922 in advance, avoiding a second breaking change when we upgrade DF past 53.

Empirical verification of the expr-level scope

To verify the expr-level half is forward-compat rather than load-bearing for X-2935, I cut a test branch (qizhu/x-2935-test-without-expr-level) that reverts only the try_encode_expr / try_decode_expr portion of commit 4 (plus internal call sites and overrides). Result: the atlas e2e test test_dedup_converter_preserves_shared_dynamic_filter_arc_e2e still passes, all 427 atlas lib tests still pass, and atlas can still delete the walker. The expr-level plumbing is kept in this PR purely for upstream alignment.

Affects

  • Atlas internal build: required dependency for the X-2935 walker removal in atlas (separate PR in rust-app-atlas).
  • Branch-53 fork API: breaking change on PhysicalExtensionCodec trait. Downstream impls in atlas codecs (AtlasPhysicalCodecExt, SimPhysicalCodec, NoopPhysicalExtensionCodec) are updated in the matching atlas PR. In-repo impls in datafusion-ffi, datafusion-examples, and datafusion-proto integration tests are updated in this PR.
  • Wire format: unchanged. PhysicalExprNode.expr_id (field 30) is the only addition and is fully optional, so old payloads decode unchanged.
  • Staging / prod blast radius: none until the atlas PR lands and pins to this SHA.

Test

  • atlas e2e: test_dedup_converter_preserves_shared_dynamic_filter_arc_e2e verifies SortExec.filter + ReverseParquetSource.predicate share the same Arc after roundtrip without the walker.
  • atlas lib: 427/427 pass with the walker deleted.
  • datafusion-proto tests still pass; full workspace + tests + examples build clean.

Upstream tracking

…al port of apache#21807)

Adds expression_id to DynamicFilterPhysicalExpr Inner (atomic counter,
preserved across update() and with_new_children) and a corresponding
PhysicalDynamicFilterNode proto message. Serializer special-cases
DynamicFilterPhysicalExpr to skip snapshot() and emit the wrapper full.
Deserializer reconstructs the wrapper.

Combined with the existing DeduplicatingSerializer + DeduplicatingDeserializer
pair on branch-53, the wire-level Arc identity dedup now works for dynamic
filters too: SortExec.filter and the FileScan predicate it was pushed down
to reconstruct to the same Arc<Inner> on the data server, so heap-max
updates propagate end-to-end without the atlas-side share_dynamic_filters
walker.

Minimal port: skips upstream's ExpressionPlacement enum, snapshot()
refactor, and DedupingSerializer removal. Wire layout (field 23 oneof
variant + PhysicalDynamicFilterNode fields) mirrors upstream so a future
DF upgrade can drop this patch mechanically.

DeduplicatingSerializer and DeduplicatingDeserializer are now pub so
atlas's coordinator-side and data-server-side codecs can opt in.
Mirrors apache#21807 step 3-4: switch DeduplicatingSerializer
from hashing Arc::as_ptr to hashing the expression's expression_id when
available. Falls back to Arc::as_ptr for expressions without a stable id.

This fixes the case FilterPushdown produces in practice: pushing a
DynamicFilterPhysicalExpr from a SortExec down to a FileScan creates a
new outer Arc (via with_new_children to remap columns) that still shares
the same Arc<RwLock<Inner>>. Without this change, the two outer Arcs hash
to different addresses and DeduplicatingDeserializer rebuilds them as
two distinct Arcs on decode -- exactly the X-2935 walker's bug.

Adds a new test dynamic_filter_dedup_distinct_outer_arcs_same_inner that
constructs the with_new_children case and asserts dedup works.

150/150 proto integration tests pass.
Adds the final piece needed to drop the atlas-side share_dynamic_filters
walker: SortExecNode now carries its internal DynamicFilterPhysicalExpr
on the wire, and try_into_sort_physical_plan re-installs it via
SortExec::with_dynamic_filter_expr instead of letting with_fetch(N)
mint a brand-new one via create_filter() on decode.

Combined with the prior two commits in this series (apache#21807 minimal port
+ dedup-by-expression_id), the decode side now reconstructs the SAME
Arc<DynamicFilterPhysicalExpr> shared between the SortExec.filter and
any pushed-down FileScan predicate that participated in the encode --
because DeduplicatingDeserializer's cache, keyed on the wire expr_id
that DeduplicatingSerializer derives from expression_id, matches across
plan nodes.

Adds:
- SortExec::dynamic_filter_expr() / with_dynamic_filter_expr() accessors
- dynamic_filter field on SortExecNode proto (field 5)
- to_proto: try_from_sort_exec serializes the dyn filter via proto_converter
- from_proto: try_into_sort_physical_plan deserializes + reinstalls via
  with_dynamic_filter_expr; legacy fallback wraps a fresh Arc when the
  encoder snapshotted the wrapper away

Test sort_exec_proto_roundtrip_preserves_dyn_filter_arc asserts
end-to-end: serialize a SortExec with a known DynamicFilterPhysicalExpr,
roundtrip via DeduplicatingSerializer + DeduplicatingDeserializer, the
decoded SortExec's filter must carry the same expression_id.

Ported from apache#22011 (minimal subset, adapted to
branch-53's PhysicalProtoConverterExtension signatures).

151/151 proto integration tests pass.

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR ports upstream DataFusion proto improvements to preserve and deduplicate DynamicFilterPhysicalExpr across protobuf roundtrips (including across extension codec boundaries), and adjusts the extension codec API so custom codecs can participate in the active (de)dup conversion pipeline.

Changes:

  • Add a protobuf representation for DynamicFilterPhysicalExpr (so it is no longer snapshotted away) and decode it back into the wrapper.
  • Deduplicate physical expressions during decode using expr_id, and switch dedup identity for dynamic filters from Arc::as_ptr to stable expression_id().
  • Update PhysicalExtensionCodec encode/decode APIs to accept proto_converter, enabling extension codecs to thread dedup through nested plan/expr fields.

Reviewed changes

Copilot reviewed 9 out of 11 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
datafusion/proto/tests/cases/roundtrip_physical_plan.rs Adds roundtrip/dedup tests for dynamic filters and SortExec dynamic filter persistence.
datafusion/proto/src/physical_plan/to_proto.rs Adds special-case serialization for DynamicFilterPhysicalExpr (no snapshot) and threads converter into extension expr encoding.
datafusion/proto/src/physical_plan/mod.rs Threads converter into extension plan encoding/decoding, adds SortExec dynamic filter proto handling, and changes PhysicalExtensionCodec trait signatures.
datafusion/proto/src/physical_plan/from_proto.rs Adds converter-aware extension expr decoding and a DynamicFilterPhysicalExpr decode path.
datafusion/proto/src/generated/prost.rs Updates generated prost types for new dynamic filter expr node and SortExec dynamic filter field.
datafusion/proto/src/generated/pbjson.rs Updates generated JSON (de)serialization for the new messages/fields.
datafusion/proto/proto/datafusion.proto Adds PhysicalDynamicFilterNode and a dynamic_filter field on SortExecNode.
datafusion/physical-plan/src/sorts/sort.rs Exposes SortExec dynamic filter getter/setter for proto roundtrip.
datafusion/physical-expr/src/expressions/mod.rs Re-exports DynamicFilterInner for proto reconstruction.
datafusion/physical-expr/src/expressions/dynamic_filters.rs Introduces stable expression_id, exposes inner parts for proto, and adds reconstruction helpers.
datafusion/physical-expr-common/src/physical_expr.rs Adds expression_id() hook on PhysicalExpr to support identity-based dedup.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread datafusion/physical-expr/src/expressions/dynamic_filters.rs
Comment thread datafusion/proto/src/physical_plan/mod.rs
@zhuqi-lucas zhuqi-lucas force-pushed the qizhu/x-2935-port-21807 branch 2 times, most recently from 1ea3955 to 48e4075 Compare June 12, 2026 07:22
@github-actions github-actions Bot added the ffi label Jun 12, 2026
Modifies the PhysicalExtensionCodec trait so that try_encode, try_decode,
try_encode_expr, and try_decode_expr receive the active
PhysicalProtoConverterExtension. Codecs that embed nested PhysicalExprNode
fields inside their custom proto (e.g. atlas's ReverseParquetNode.predicate)
must now route those through proto_converter.proto_to_physical_expr /
proto_converter.physical_expr_to_proto so the dedup cache extends across
the extension boundary.

Without this plumbing, an outer DeduplicatingSerializer can't reach
into extension nodes -- the dedup cache stops at the extension boundary
and Arc<Inner> sharing breaks for any expression carried inside an
extension codec, even though the wire format already supports it via
the expr_id field. With this change, atlas can finally drop the
share_dynamic_filters walker once its codecs override the new methods.

This is the breaking-change pattern matching the apache/datafusion
upstream design (issue apache#22920) instead of additive _with_converter
variants -- branch-53 is internal-only so callers can be updated in
lockstep.
@zhuqi-lucas zhuqi-lucas force-pushed the qizhu/x-2935-port-21807 branch from 48e4075 to 42342fb Compare June 12, 2026 07:59
@zhuqi-lucas zhuqi-lucas merged commit 641f176 into branch-53 Jun 12, 2026
59 checks passed
@zhuqi-lucas zhuqi-lucas deleted the qizhu/x-2935-port-21807 branch June 12, 2026 08:32
zhuqi-lucas added a commit to massive-com/datafusion-materialized-views that referenced this pull request Jun 12, 2026
Picks up the X-2935 PhysicalExtensionCodec breaking change
(massive-com/arrow-datafusion#60) on branch-53.

Wire format unchanged; trait signature change is the only API
impact. MV doesn't implement PhysicalExtensionCodec so this
bump is a pure dependency move.
zhuqi-lucas added a commit to massive-com/datafusion-materialized-views that referenced this pull request Jun 12, 2026
Picks up the X-2935 PhysicalExtensionCodec breaking change
(massive-com/arrow-datafusion#60) on branch-53.

Wire format unchanged; trait signature change is the only API
impact. MV doesn't implement PhysicalExtensionCodec so this
bump is a pure dependency move.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants