Tracking issue: #18479
Depends on: #38743 / #38764 (ExecutableStage / stateless ParDo translator, merged).
Summary
Direct follow-up to #38764, as agreed with @je-ik on Slack:
- Implement `Redistribute.arbitrarily()` as a runner-native passthrough so that the `GreedyPipelineFuser` splits stages at the Redistribute boundary, enabling a chained `ExecutableStage` end-to-end test.
- Drop the `byte[]` type bound on `ExecutableStageProcessor` / `KStreamsPayload` at the processor edge so the runtime value type isn't silently corrupted for non-`byte[]` outputs (e.g. `Integer` from `MapElements`).
- Add a chained `Impulse -> MapElements<Integer> -> Redistribute.arbitrarily() -> ParDo(record)` test under the EMBEDDED environment to prove that non-`byte[]` values flow from stage to stage.
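One way to see why the `byte[]` bound is a problem: Java generics are erased, so an `Integer` forced into a `byte[]`-typed payload is accepted at the edge and only fails later, when a consumer reads it as `byte[]`. The stdlib-only sketch below models this; `Payload`, `Edge`, and `TypeBoundDemo` are hypothetical stand-ins, not the runner's actual `KStreamsPayload` / `ExecutableStageProcessor` classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the payload wrapper passed between processors.
final class Payload<V> {
    private final V value;

    Payload(V value) {
        this.value = value;
    }

    V value() {
        return value;
    }
}

// Hypothetical model of a stage edge, parameterized by the value type it carries.
final class Edge<V> {
    private final List<Payload<V>> received = new ArrayList<>();

    void forward(Payload<V> payload) {
        received.add(payload);
    }

    List<Payload<V>> received() {
        return received;
    }
}

class TypeBoundDemo {
    // Old shape: the edge is pinned to byte[], but upstream MapElements emits
    // Integer. Because generics are erased, the unchecked cast and forward()
    // both succeed; the mismatch only surfaces when a consumer reads byte[].
    @SuppressWarnings("unchecked")
    static boolean boundedEdgeFailsOnlyOnRead() {
        Edge<byte[]> edge = new Edge<>();
        Payload<?> fromMapElements = new Payload<Integer>(42);
        edge.forward((Payload<byte[]>) fromMapElements); // accepted silently
        try {
            byte[] bytes = edge.received().get(0).value(); // implicit checkcast to byte[] fails here
            return false; // not reached for an Integer payload
        } catch (ClassCastException e) {
            return true;
        }
    }

    // New shape: the edge is typed by the actual output type, so producer and
    // consumer are checked against the same V at compile time.
    static int genericEdgeRoundTrip(int value) {
        Edge<Integer> edge = new Edge<>();
        edge.forward(new Payload<>(value));
        return edge.received().get(0).value();
    }

    public static void main(String[] args) {
        System.out.println(boundedEdgeFailsOnlyOnRead()); // true
        System.out.println(genericEdgeRoundTrip(42));     // 42
    }
}
```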
Scope
- `RedistributeTranslator` registering `beam:transform:redistribute_arbitrarily:v1` as a passthrough (no GBK, no state; a single-instance topology has no actual redistribution to do).
- Wire `TrivialNativeTransformExpander.forKnownUrns(pipeline, knownUrns())` into `prepareForTranslation` so the fuser respects runner-native URNs (matches the Spark/Flink pattern).
- Type generalization on the `ExecutableStageProcessor` edge.
- High-level chained test using `SharedTestCollector<Integer>` to assert that the side effects of the final `ParDo(record)` arrive.
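A stdlib-only model of how the Scope items fit together, under simplifying assumptions: the runner keeps a URN-to-translator map whose keys are what `knownUrns()` returns, the `redistribute_arbitrarily` translator is the identity, and a transform with a known URN is never fused into a neighbouring stage. `NativeUrnModel`, `TRANSLATORS`, and `fuse` are hypothetical; the real `GreedyPipelineFuser` works on the pipeline graph rather than a flat list, so treat this as an illustration of the stage boundary, not of the fuser's algorithm.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.UnaryOperator;

class NativeUrnModel {
    static final String REDISTRIBUTE_ARBITRARILY = "beam:transform:redistribute_arbitrarily:v1";

    // Hypothetical registry: URN -> translator. The Redistribute translator is the
    // identity, since a single-instance topology has nothing to redistribute.
    static final Map<String, UnaryOperator<List<Object>>> TRANSLATORS = new LinkedHashMap<>();

    static {
        TRANSLATORS.put(REDISTRIBUTE_ARBITRARILY, elements -> elements);
    }

    // Hypothetical knownUrns(): every URN the runner translates natively.
    static Set<String> knownUrns() {
        return TRANSLATORS.keySet();
    }

    // Simplified fusion over a linear chain: consecutive non-native transforms
    // share a stage; a runner-native URN closes the current stage and stands alone.
    static List<List<String>> fuse(List<String> urns, Set<String> nativeUrns) {
        List<List<String>> stages = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (String urn : urns) {
            if (nativeUrns.contains(urn)) {
                if (!current.isEmpty()) {
                    stages.add(current);
                    current = new ArrayList<>();
                }
                stages.add(List.of(urn));
            } else {
                current.add(urn);
            }
        }
        if (!current.isEmpty()) {
            stages.add(current);
        }
        return stages;
    }

    public static void main(String[] args) {
        // Impulse omitted for brevity; MapElements is a ParDo under the hood.
        List<String> chain = List.of(
                "beam:transform:pardo:v1", // MapElements<Integer>
                REDISTRIBUTE_ARBITRARILY,
                "beam:transform:pardo:v1"); // ParDo(record)
        System.out.println(fuse(chain, knownUrns()).size()); // 3 stages
        System.out.println(fuse(chain, Set.of()).size());    // 1 stage
        System.out.println(TRANSLATORS.get(REDISTRIBUTE_ARBITRARILY).apply(List.of(1, 2, 3)));
    }
}
```

In this model, passing an empty set instead of `knownUrns()` collapses the three transforms into a single stage, so the two ParDos are never exercised as separate, chained stages.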
Out of scope
- `Redistribute.byKey()` URN (rehashing semantics; punt to GBK sub-issue).
- `WatermarkManager` (separate sub-issue).
cc @je-ik