Split proto serialization to encapsulate private state #21835

@adriangb

Note: this plan / investigation was AI generated based on my human direction and guidance.

Problem

datafusion-proto serializes every built-in PhysicalExpr through a single ~300-line downcast_ref chain in serialize_physical_expr_with_converter, with a symmetric match on ExprType in from_proto.rs. Because the serializer lives outside the crate where each expression is defined, every piece of internal state an expression wants to round-trip has to be pub.

The concrete incident that motivated this issue was #21807, where supporting proto round-tripping of DynamicFilterPhysicalExpr required exposing pub struct Inner, pub fn inner(), pub fn from_parts(), pub fn original_children(), pub fn remapped_children() — all marked "warning: not stable; proto-only" in the docstrings. See discussion at #21807 (comment) / #21807 (comment).

The underlying issue is not specific to DynamicFilterPhysicalExpr. Any stateful expression (one whose round-trip state isn't expressible through its normal public constructor) will hit the same wall. It also means:

  • Built-in expressions use a big central switch; third-party expressions go through PhysicalExtensionCodec::try_encode_expr. Two different shapes for the same thing.
  • Every new built-in PhysicalExpr requires editing datafusion-proto, which many contributors don't touch for their "real" change.
  • The .proto schema for each expression lives far away from the expression itself.

Proposal

Two changes, in sequence.

1. Extract datafusion-proto-models

Mirror the existing datafusion-proto-common split, but for the physical/logical plan schemas. The new crate contains only:

  • the .proto file(s) and the prost-generated Rust types,
  • optional pbjson/serde derives behind a json feature,
  • zero datafusion deps beyond datafusion-proto-common.

datafusion-proto keeps its current public API by re-exporting from datafusion-proto-models. Downstream consumers (datafusion-ffi, datafusion-examples, benchmarks) need no changes.

This is a pure refactor — no semantic change, no behavior change.

2. Add PhysicalExpr::to_proto (feature-gated)

Add a method to the PhysicalExpr trait, gated on a new proto feature:

#[cfg(feature = "proto")]
fn to_proto(
    &self,
    ctx: &dyn PhysicalExprEncoder,
) -> Result<Option<PhysicalExprNode>> {
    Ok(None)
}
  • Default returns Ok(None) → "fall through to the existing codec path" (matches today's behavior for extension expressions).
  • Explicit Ok(Some(...)) → the expression has serialized itself.
  • An expression that should be serializable but has no implementation yet can override the default to return internal_err!("{typename} does not implement to_proto").

PhysicalExprEncoder is a small trait defined in datafusion-proto-models, wrapping the existing PhysicalExtensionCodec + PhysicalProtoConverterExtension plumbing, with helpers for encode_child, encode_udf, encode_udaf, encode_udwf. This keeps physical-expr-common free of datafusion-proto as a dep.

The existing serialize_physical_expr_with_converter tries expr.to_proto(ctx) first; if it returns Ok(None), it falls through to the current downcast chain. This lets expressions migrate one at a time without breaking anything.
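The "hook first, downcast chain second" flow could look roughly like the sketch below. It is deliberately simplified and not DataFusion's real API: Node stands in for the prost-generated PhysicalExprNode, the encoder context argument is omitted, errors are plain Strings, and Opaque is a hypothetical expression with no serialization at all.

```rust
use std::any::Any;

// Stand-in for the prost-generated PhysicalExprNode (hypothetical, simplified).
#[derive(Debug, PartialEq)]
pub enum Node {
    Column(String),
    Literal(i64),
}

pub trait PhysicalExpr: Any {
    fn as_any(&self) -> &dyn Any;

    // Default: Ok(None), meaning "not migrated, use the central fallback".
    fn to_proto(&self) -> Result<Option<Node>, String> {
        Ok(None)
    }
}

// Migrated expression: serializes itself through the hook.
pub struct Column {
    pub name: String,
}

impl PhysicalExpr for Column {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn to_proto(&self) -> Result<Option<Node>, String> {
        Ok(Some(Node::Column(self.name.clone())))
    }
}

// Not migrated yet: still handled by the central downcast chain.
pub struct Literal {
    pub value: i64,
}

impl PhysicalExpr for Literal {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// Hypothetical expression known to neither path.
pub struct Opaque;

impl PhysicalExpr for Opaque {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// Central serializer: try the trait hook, then fall through to downcasts.
pub fn serialize(expr: &dyn PhysicalExpr) -> Result<Node, String> {
    if let Some(node) = expr.to_proto()? {
        return Ok(node);
    }
    if let Some(lit) = expr.as_any().downcast_ref::<Literal>() {
        return Ok(Node::Literal(lit.value));
    }
    Err("unsupported physical expression".to_string())
}
```

In this sketch, migrating Literal later means adding a to_proto impl next to it and deleting its downcast arm; serialize itself does not change.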

What this unlocks

  • Private state stays private. DynamicFilterPhysicalExpr::to_proto reads the RwLock directly, so pub struct Inner is no longer needed. Once migrated, the "pub for proto" scaffolding from #21807 (proto: serialize and dedupe dynamic filters v2), namely Inner, from_parts, inner(), original_children, and remapped_children, can be reverted.
  • One-file changes. Adding serialization for a new expression is a method impl next to the expression, not a round trip through datafusion-proto.
  • Parity between built-in and third-party. Everyone uses the same hook shape.
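To illustrate the first point, here is a minimal sketch of an expression whose state sits behind a private RwLock and is only reachable from its own to_proto. The DynamicFilter type, the DynamicFilterNode message, and the generation / is_complete fields are illustrative assumptions, not the actual DynamicFilterPhysicalExpr layout or the real .proto schema.

```rust
// Stand-in for a prost-generated message (hypothetical field names).
#[derive(Debug, PartialEq, Clone)]
pub struct DynamicFilterNode {
    pub generation: u64,
    pub is_complete: bool,
}

pub mod dynamic_filter {
    use super::DynamicFilterNode;
    use std::sync::RwLock;

    // Private to this module: nothing outside can name or construct it.
    struct Inner {
        generation: u64,
        is_complete: bool,
    }

    pub struct DynamicFilter {
        inner: RwLock<Inner>,
    }

    impl DynamicFilter {
        pub fn new() -> Self {
            Self {
                inner: RwLock::new(Inner {
                    generation: 0,
                    is_complete: false,
                }),
            }
        }

        // Normal public API: bump the generation on each update.
        pub fn update(&self) {
            let mut guard = self.inner.write().unwrap();
            guard.generation += 1;
        }

        // Serialization lives next to the data, so it can read the lock
        // directly without any pub accessor for Inner.
        pub fn to_proto(&self) -> Result<DynamicFilterNode, String> {
            let guard = self.inner.read().map_err(|e| e.to_string())?;
            Ok(DynamicFilterNode {
                generation: guard.generation,
                is_complete: guard.is_complete,
            })
        }
    }
}
```

Code outside the dynamic_filter module only sees new, update, and to_proto; Inner never has to become pub for the serializer's sake.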

Decode side (follow-up)

The encode-side win is clear. The decode side is still a central match on ExprType, which is fine — oneof ExprType gives us an exhaustive Rust enum, a strict improvement over today's runtime downcast chain.

As a follow-up (probably best after the encode migrations land), we can push decoding back to the expressions too, via associated fns like:

impl BinaryExpr {
    fn from_proto(
        proto: &PhysicalBinaryExprNode,
        ctx: &ProtoDecodeCtx<'_>,
    ) -> Result<Arc<Self>> { ... }
}

The central match becomes a dispatch table. The main payoff is that public god-constructors for private state go away: DynamicFilterPhysicalExpr::from_proto reads the proto fields and builds Inner internally, so we can fully delete the from_parts/Inner-as-pub scaffolding.
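A rough sketch of what that dispatch table could look like, using simplified stand-ins: ExprType, ColumnNode, and NotNode mimic prost-generated types, DecodeCtx is a hypothetical placeholder for ProtoDecodeCtx, and describe exists only so the result can be inspected. None of these are the real DataFusion definitions.

```rust
use std::sync::Arc;

// Stand-ins for prost-generated messages (hypothetical shapes).
pub struct ColumnNode {
    pub name: String,
    pub index: u32,
}

pub struct NotNode {
    pub child: Box<ExprType>,
}

// Stand-in for the generated `oneof ExprType` enum.
pub enum ExprType {
    Column(ColumnNode),
    Not(NotNode),
}

pub trait PhysicalExpr {
    fn describe(&self) -> String;
}

pub struct Column {
    name: String, // private: only from_proto (or a constructor) can set it
    index: usize,
}

impl Column {
    pub fn from_proto(proto: &ColumnNode, _ctx: &DecodeCtx) -> Result<Arc<Self>, String> {
        Ok(Arc::new(Self {
            name: proto.name.clone(),
            index: proto.index as usize,
        }))
    }
}

impl PhysicalExpr for Column {
    fn describe(&self) -> String {
        format!("{}@{}", self.name, self.index)
    }
}

pub struct NotExpr {
    arg: Arc<dyn PhysicalExpr>,
}

impl NotExpr {
    pub fn from_proto(proto: &NotNode, ctx: &DecodeCtx) -> Result<Arc<Self>, String> {
        // Children are decoded by recursing through the context.
        let arg = ctx.decode(&proto.child)?;
        Ok(Arc::new(Self { arg }))
    }
}

impl PhysicalExpr for NotExpr {
    fn describe(&self) -> String {
        format!("NOT {}", self.arg.describe())
    }
}

// Hypothetical placeholder for ProtoDecodeCtx.
pub struct DecodeCtx;

impl DecodeCtx {
    // The central match: exhaustive over ExprType, one line of logic per arm.
    pub fn decode(&self, node: &ExprType) -> Result<Arc<dyn PhysicalExpr>, String> {
        match node {
            ExprType::Column(n) => {
                let expr: Arc<dyn PhysicalExpr> = Column::from_proto(n, self)?;
                Ok(expr)
            }
            ExprType::Not(n) => {
                let expr: Arc<dyn PhysicalExpr> = NotExpr::from_proto(n, self)?;
                Ok(expr)
            }
        }
    }
}
```

Because the match has no wildcard arm, adding a variant to ExprType without a matching from_proto arm fails to compile, which is the exhaustiveness property described above.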

Alternatives considered

  1. Bytes-level try_encode_self hook, no prost in physical-expr. Each expression encodes into an opaque Vec<u8>. Preserves privacy without pulling prost into more crates. Downside: you lose the single .proto file as the schema source of truth — cross-language debuggability and interop suffer. Rejected.
  2. Keep the central switch, mark unstable accessors #[doc(hidden)]. Cheap. Doesn't fix the third-party/built-in asymmetry or the central-edit-per-new-expression problem. Reasonable as a stopgap, not a solution.
  3. Separate PhysicalExprProto trait with a blanket fallback impl. Would avoid adding to PhysicalExpr. Requires a runtime "does it implement this" check (narrow Any cast). Slightly more complex than change 2 of the proposal above (a method directly on PhysicalExpr) and delivers the same thing.

Scope

This issue proposes the change only for PhysicalExpr. ExecutionPlan, LogicalPlan, and logical Expr have the same shape and would benefit from the same treatment, but each is its own migration (~40 ExecutionPlan impls alone). Worth tackling after the PhysicalExpr path is proven.

datafusion-substrait is unaffected — it operates independently and does not share code with datafusion-proto.

Open questions

  • Should PhysicalExprEncoder live in datafusion-proto-models or be a narrow sub-trait that PhysicalExtensionCodec re-implements? Leaning toward the former for minimum dep surface.
  • Window / aggregate expressions are serialized through a separate code path in to_proto.rs and are not regular PhysicalExpr impls. Probably get a parallel WindowExpr::to_proto hook, or stay central. Defer until the core PhysicalExpr migration is done.
  • proto feature default. I'd default it off on physical-expr-common/physical-expr/physical-plan (so developers who don't care pay nothing) and flip it on from datafusion-proto. CI would test with the feature on; off-mode is a "it still compiles" smoke test.

Full implementation plan

Long Claude generated plan

Plan: split datafusion-proto and add PhysicalExpr::to_proto

Context

Today, datafusion-proto serializes every built-in PhysicalExpr via a ~300-line downcast_ref chain in datafusion/proto/src/physical_plan/to_proto.rs::serialize_physical_expr_with_converter (and a symmetric ExprType match in from_proto.rs). Because the serializer lives outside the crate where each expression is defined, every piece of internal state an expression wants to round-trip must be pub. The motivating example is the DynamicFilterPhysicalExpr PR (#21807), which had to expose pub struct Inner, pub fn inner(), pub fn from_parts(), pub fn original_children(), pub fn remapped_children() — all "warning: not stable; proto-only". The same shape will recur for every stateful expression.

Two problems to solve together:

  1. Private state stays private. Serialization code should live next to the data it serializes.
  2. Central switch is a maintenance sink. 20 production PhysicalExpr impls already; every new one requires editing proto/src/physical_plan/to_proto.rs + from_proto.rs. Third parties have to use PhysicalExtensionCodec with a different shape than built-ins.

Decision (user-approved):

  • Extract datafusion-proto-models (mirror of datafusion-proto-common): just the prost-generated structs + optional pbjson. No datafusion deps beyond proto-common.
  • Add fn to_proto(&self, ctx: &ProtoEncodeCtx<'_>) -> Result<PhysicalExprNode> directly on the PhysicalExpr trait, #[cfg(feature = "proto")]. Default impl returns internal_err!("<typename> does not implement proto serialization").
  • CI tests with the proto feature on; off-mode is a best-effort "it still compiles" signal.

This plan covers PhysicalExpr only. ExecutionPlan, LogicalPlan, and logical Expr have the same shape and can follow once this lands, but each is its own body of work (~40 ExecutionPlan impls alone).

Scope boundaries

  • In scope: PhysicalExpr encode path, infrastructure for the decode path, migration of all ~18 currently-serialized PhysicalExpr types.
  • Out of scope for this plan: ExecutionPlan and logical-side refactors, changes to PhysicalExtensionCodec shape for third-party codecs, datafusion-substrait (operates independently; unaffected).

Sequencing

Three PRs, each independently mergeable and reviewable.

PR 1 — Extract datafusion-proto-models

Pure refactor: move generated prost code + proto schema into a new crate. No semantic change. datafusion-proto becomes a thin layer on top.

New crate: datafusion/proto-models/

  • Cargo.toml — deps: prost, arrow (workspace), datafusion-proto-common, optional pbjson/serde/serde_json behind json feature. No datafusion-common, no datafusion crates beyond proto-common. Mirror datafusion/proto-common/Cargo.toml.
  • proto/datafusion.proto — moved from datafusion/proto/proto/datafusion.proto.
  • gen/src/main.rs + gen/Cargo.toml — moved from datafusion/proto/gen/.
  • src/lib.rs — re-exports generated types under a protobuf module (same shape as datafusion-proto-common's protobuf_common).
  • src/generated/prost.rs + src/generated/pbjson.rs — moved from datafusion/proto/src/generated/.
  • regen.sh — mirrored from datafusion/proto-common/regen.sh.

Modified:

  • Cargo.toml (workspace root) — register datafusion/proto-models member + workspace dep entry.
  • datafusion/proto/Cargo.toml — depend on datafusion-proto-models. Remove prost direct dep if fully delegated (keep if datafusion-proto itself still uses prost::Message directly — it does, for DataEncoderTuple, so keep).
  • datafusion/proto/src/lib.rs: pub use datafusion_proto_models::protobuf::*; (plus keep existing proto_error, FromProtoError, ToProtoError re-exports from proto-common). Maintain the existing public protobuf module so downstream users don't break.
  • datafusion/proto/regen.sh — update to invoke proto-models regen.
  • datafusion/proto/gen/ — deleted (moved).
  • datafusion/proto/proto/datafusion.proto — deleted (moved).
  • datafusion/proto/src/generated/ — deleted (moved).

Invariants: public API surface of datafusion-proto is unchanged. datafusion/ffi, datafusion-examples, benchmarks — the only three external consumers — keep compiling without any Cargo.toml change.

PR 2 — Add PhysicalExpr::to_proto hook + ProtoEncodeCtx

Introduce the trait method. Keep the existing downcast chain in place as a fallback so this PR is non-breaking — no expression migrates yet.

Modified:

  • datafusion/physical-expr-common/Cargo.toml — add optional datafusion-proto-models dep + proto feature. Feature is enabled in datafusion/physical-expr and datafusion/physical-plan when they want serialization (gated through them as well).
  • datafusion/physical-expr-common/src/physical_expr.rs — inside pub trait PhysicalExpr, add:
    #[cfg(feature = "proto")]
    fn to_proto(
        &self,
        _ctx: &datafusion_proto_models::ProtoEncodeCtx<'_>,
    ) -> Result<datafusion_proto_models::protobuf::PhysicalExprNode> {
        internal_err!(
            "PhysicalExpr `{}` does not implement to_proto; \
             implement it or use PhysicalExtensionCodec",
            std::any::type_name::<Self>(),
        )
    }

New (in datafusion-proto-models or a small sibling crate — TBD during implementation):

  • ProtoEncodeCtx<'a> — thin context type carrying &'a dyn PhysicalExtensionCodec and a reference to the PhysicalProtoConverterExtension converter, exposing helpers:
    • fn encode_child(&self, expr: &Arc<dyn PhysicalExpr>) -> Result<PhysicalExprNode> — recurses through the converter (preserves dedup support).
    • fn encode_udf(&self, udf: &ScalarUDF) -> Result<Vec<u8>> — wraps codec.try_encode_udf.
    • Similar encode_udaf / encode_udwf.

Open question for implementation: ProtoEncodeCtx references PhysicalExtensionCodec, which is defined in datafusion-proto. To avoid a cyclic dep, either:

  • (a) Move PhysicalExtensionCodec trait (or a stripped-down ProtoCodecExpr sub-trait) into proto-models, or
  • (b) Keep ProtoEncodeCtx in datafusion-proto and have PhysicalExpr::to_proto take &dyn PhysicalExprEncoder (a narrow trait defined in proto-models) that the real ProtoEncodeCtx implements.

Leaning toward (b) — cheaper surface change, smaller blast radius on PhysicalExtensionCodec.

Modified:

  • datafusion/proto/src/physical_plan/to_proto.rs::serialize_physical_expr_with_converter — before the downcast chain, try expr.to_proto(&ctx). If it returns the default "not implemented" error (detect via sentinel error kind, or via a Result<Option<PhysicalExprNode>> where Ok(None) means "fall through"), fall through to the existing chain. Going with Ok(None) is cleaner.

    Revised trait signature:

    fn to_proto(&self, ctx: &dyn PhysicalExprEncoder)
        -> Result<Option<PhysicalExprNode>> { Ok(None) }

    Default = fall through. Explicit Ok(Some(...)) = serialized. Err(...) = genuine error.

Decode path (unchanged structurally): keep parse_physical_expr_with_converter as the central match proto.expr_type in from_proto.rs. Per-expression decoders become associated fns fn try_from_proto(node: &X, ctx: &ProtoDecodeCtx<'_>) -> Result<Arc<Self>> colocated with the type (next PR). The central match stays exhaustive on ExprType, which is a strict improvement over today's runtime downcast chain.

Follow-up for decode side (not blocking): once expressions own both ends, consider formalizing the decoder as a sibling trait so the symmetry with to_proto is explicit — e.g.

// On each concrete PhysicalExpr:
impl BinaryExpr {
    fn from_proto(proto: &PhysicalBinaryExprNode, ctx: &ProtoDecodeCtx<'_>) -> Result<Arc<Self>> { ... }
}

and the central match in from_proto.rs becomes a dispatch table: ExprType::BinaryExpr(n) => BinaryExpr::from_proto(n, ctx), etc. The key payoff is that no public god-constructor is needed for private state — e.g. DynamicFilterPhysicalExpr::from_proto reads the proto fields and constructs Inner directly, so we can delete from_parts/Inner-as-pub entirely. Doing this after PR 3+ migrations rather than during keeps each PR small.

PR 3+ — Migrate expressions one at a time

One commit per type (or small group). Each migration:

  1. Implement fn to_proto(&self, ctx) -> Result<Option<PhysicalExprNode>> on the expression.
  2. Implement fn try_from_proto(node, ctx) -> Result<Arc<Self>> as an associated fn.
  3. Remove the corresponding downcast_ref::<T>() arm from to_proto.rs.
  4. Change the corresponding arm in from_proto.rs's central match to call Type::try_from_proto.
  5. Roll back any "pub for proto" scaffolding that is no longer needed. For example, the pub struct Inner, from_parts, and inner() bits added in #21807 (proto: serialize and dedupe dynamic filters v2) can eventually become private, since DynamicFilterPhysicalExpr::to_proto reads the RwLock directly.
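For a trivial expression, steps 1 and 2 plus a roundtrip check might look like the sketch below. It uses simplified stand-ins: ColumnNode is a hypothetical message shape, the context argument and the Option / Arc wrappers from the real signatures are dropped, errors are plain Strings, and roundtrip is a hypothetical helper.

```rust
// Stand-in for a prost-generated message (hypothetical shape).
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnNode {
    pub name: String,
    pub index: u32,
}

#[derive(Debug, Clone, PartialEq)]
pub struct Column {
    name: String,
    index: usize,
}

impl Column {
    pub fn new(name: &str, index: usize) -> Self {
        Self {
            name: name.to_string(),
            index,
        }
    }

    // Step 1: the expression encodes itself.
    pub fn to_proto(&self) -> Result<ColumnNode, String> {
        // The proto field is narrower than usize, so check the conversion.
        let index = u32::try_from(self.index).map_err(|e| e.to_string())?;
        Ok(ColumnNode {
            name: self.name.clone(),
            index,
        })
    }

    // Step 2: the matching decoder is an associated fn on the same type.
    pub fn try_from_proto(node: &ColumnNode) -> Result<Self, String> {
        Ok(Self {
            name: node.name.clone(),
            index: node.index as usize,
        })
    }
}

// Hypothetical helper: encode, decode, and hand back the result for comparison.
pub fn roundtrip(expr: &Column) -> Result<Column, String> {
    let node = expr.to_proto()?;
    Column::try_from_proto(&node)
}
```

With both halves on the type, steps 3 and 4 reduce to deleting the downcast arm and pointing the central match arm at Column::try_from_proto.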

Suggested migration order (easiest → hardest):

  1. Column, UnKnownColumn, Literal, NoOp — trivial fields.
  2. NotExpr, IsNullExpr, IsNotNullExpr, NegativeExpr — single child.
  3. BinaryExpr (the linearization logic moves too), CastExpr, TryCastExpr, LikeExpr, InListExpr, CaseExpr.
  4. ScalarFunctionExpr — involves UDF encoding via ctx.encode_udf(...).
  5. HashExpr, HashTableLookupExpr — lives in datafusion-physical-plan, so enable the proto feature there. HashTableLookupExpr's current "replace with lit(true)" hack (to_proto.rs:277) moves into its own to_proto impl.
  6. DynamicFilterPhysicalExpr: after migration, revert the pub Inner/from_parts scaffolding added in #21807 (proto: serialize and dedupe dynamic filters v2).
  7. Window + aggregate exprs (PlainAggregateWindowExpr, SlidingAggregateWindowExpr, StandardWindowExpr, WindowUDFExpr, AggregateFunctionExpr) — shaped slightly differently (not PhysicalExpr impls in the same way); may need a sibling trait or stay central.

After all in-workspace expressions migrate, the fallback downcast chain in to_proto.rs shrinks to just HashTableLookupExpr's special case (and eventually goes away entirely).

Critical files

Created:

  • datafusion/proto-models/Cargo.toml
  • datafusion/proto-models/proto/datafusion.proto (moved)
  • datafusion/proto-models/gen/{Cargo.toml,src/main.rs} (moved)
  • datafusion/proto-models/src/{lib.rs,generated/*.rs} (moved)
  • datafusion/proto-models/regen.sh

Modified (PR 1):

  • Cargo.toml (workspace root)
  • datafusion/proto/Cargo.toml
  • datafusion/proto/src/lib.rs
  • datafusion/proto/regen.sh

Modified (PR 2):

  • datafusion/physical-expr-common/Cargo.toml
  • datafusion/physical-expr-common/src/physical_expr.rs (trait definition, line 163+)
  • datafusion/physical-expr/Cargo.toml (proto feature passthrough)
  • datafusion/physical-plan/Cargo.toml (proto feature passthrough — needed for HashExpr/HashTableLookupExpr migration in PR 3)
  • datafusion/proto/src/physical_plan/to_proto.rs (serialize_physical_expr_with_converter tries trait method first)
  • New: PhysicalExprEncoder trait + ProtoEncodeCtx (location TBD; see open question above)

Modified (PRs 3+): each expression's source file + corresponding removal from datafusion/proto/src/physical_plan/{to_proto.rs,from_proto.rs}.

Reused existing code

  • datafusion-proto-common organization (proto/, gen/, src/generated/, regen.sh, src/{from_proto,to_proto}/) — mirror for proto-models. Ref: datafusion/proto-common/.
  • PhysicalExtensionCodec::try_encode_udf/udaf/udwf (datafusion/proto/src/physical_plan/mod.rs:3683–3723) — wrapped by ProtoEncodeCtx helpers, not changed.
  • PhysicalProtoConverterExtension (datafusion/proto/src/physical_plan/mod.rs:3751) — stays as-is; its physical_expr_to_proto becomes the entry point that calls into the trait method.
  • DeduplicatingSerializer/DeduplicatingDeserializer (mod.rs:3844+) — unchanged. ProtoEncodeCtx::encode_child routes through them, preserving dedup.
  • serialize_physical_exprs helper (to_proto.rs:220) — stays, becomes a thin wrapper over to_proto.

Verification

PR 1 (proto-models extraction):

cargo check -p datafusion-proto-models
cargo check -p datafusion-proto
cargo test -p datafusion-proto
cargo test -p datafusion-ffi
cargo check -p datafusion-examples
# Regen to confirm reproducibility:
./datafusion/proto-models/regen.sh && git diff --exit-code

Confirm no pub surface of datafusion-proto changed (cargo public-api or manual diff of src/lib.rs).

PR 2 (trait hook added, no migration):

cargo check -p datafusion-physical-expr-common --features proto
cargo check -p datafusion-physical-expr --features proto
cargo test -p datafusion-proto       # all roundtrip tests still pass (fallback path)
cargo test -p datafusion-proto --no-default-features  # smoke: still compiles

Also: verify the proto feature is wired into datafusion-proto's dev build so its roundtrip tests go through the trait hook with Ok(None) fallback — behavior identical to today.

PR 3+ (per-expression migration):
For each migrated expression, the existing roundtrip tests in datafusion/proto/tests/cases/roundtrip_physical_plan.rs and roundtrip_physical_expr.rs must continue to pass. Because the central from_proto.rs match stays exhaustive, missing migration arms cause compile errors, not silent regressions.
After DynamicFilterPhysicalExpr migrates: revert the "pub for proto" scaffolding added in #21807 (Inner becomes private, from_parts/inner()/original_children()/remapped_children() removed) and run the existing dynamic-filter roundtrip tests.

End-to-end signal: workspace CI runs ./dev/rust_lint.sh + cargo test --workspace --features proto green; cargo test -p datafusion-proto green on both feature settings.

Risks / open items

  1. PhysicalExprEncoder trait placement (see open question above) — decided during PR 2 implementation. Pre-decision: put it in datafusion-proto-models so physical-expr-common has one dep under proto feature.
  2. Window / aggregate expressions are not regular PhysicalExpr impls and are serialized through a different code path in to_proto.rs (lines 117–151). Migrating them may need a parallel WindowExpr::to_proto or can stay central — defer decision to PR 4+.
  3. ForeignPhysicalExpr (datafusion-ffi) is a PhysicalExpr impl but not serialized today (not in the downcast chain). The default Ok(None) behavior preserves that — nothing to do.
  4. datafusion-ffi depends on datafusion-proto — the re-export surface has to stay stable through PR 1. Verified above.
  5. Feature-flag propagation is the main source of mechanical churn. Keep proto off by default on physical-expr-common/physical-expr/physical-plan; datafusion-proto flips it on. Developers who run cargo test --workspace without features get the no-proto variant; CI should explicitly --features proto for the proto crate's test job.
