From f53ffa3670a367fcefb89a564f3a1c8051dd8445 Mon Sep 17 00:00:00 2001 From: junaiddshaukat Date: Tue, 2 Jun 2026 00:25:24 +0500 Subject: [PATCH 1/2] Add ExecutableStage (stateless ParDo) translator with SDK-harness bridge --- runners/kafka-streams/build.gradle | 1 + .../translation/ExecutableStageProcessor.java | 209 ++++++++++++++++++ .../ExecutableStageTranslator.java | 76 +++++++ .../streams/translation/KStreamsPayload.java | 9 +- ...aStreamsExecutableStageContextFactory.java | 62 ++++++ .../KafkaStreamsPipelineTranslator.java | 19 +- .../ExecutableStageTranslatorTest.java | 140 ++++++++++++ .../KafkaStreamsPipelineTranslatorTest.java | 28 ++- .../translation/SharedTestCollector.java | 82 +++++++ 9 files changed, 615 insertions(+), 11 deletions(-) create mode 100644 runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java create mode 100644 runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslator.java create mode 100644 runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsExecutableStageContextFactory.java create mode 100644 runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslatorTest.java create mode 100644 runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/SharedTestCollector.java diff --git a/runners/kafka-streams/build.gradle b/runners/kafka-streams/build.gradle index 3f34a3ca76b6..52b320dd70a8 100644 --- a/runners/kafka-streams/build.gradle +++ b/runners/kafka-streams/build.gradle @@ -61,6 +61,7 @@ dependencies { permitUnusedDeclared "org.apache.kafka:kafka-clients:$kafka_version" testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") + testImplementation project(":sdks:java:harness") testImplementation library.java.hamcrest testImplementation library.java.junit testImplementation 
library.java.mockito_core diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java new file mode 100644 index 000000000000..90fe72e845a4 --- /dev/null +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.kafka.streams.translation; + +import java.util.ArrayDeque; +import java.util.Queue; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.fnexecution.control.BundleProgressHandler; +import org.apache.beam.runners.fnexecution.control.ExecutableStageContext; +import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory; +import org.apache.beam.runners.fnexecution.control.RemoteBundle; +import org.apache.beam.runners.fnexecution.control.StageBundleFactory; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; +import org.apache.beam.runners.fnexecution.state.StateRequestHandler; +import org.apache.beam.sdk.fn.data.FnDataReceiver; +import org.apache.beam.sdk.util.construction.graph.ExecutableStage; +import org.apache.beam.sdk.values.WindowedValue; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Kafka Streams {@link Processor} that executes a fused {@link ExecutableStage} (stateless user + * code such as ParDo) in the Beam SDK harness over the Fn API. + * + *

<p>For each {@link KStreamsPayload#isData() data} payload it unwraps the {@link WindowedValue} + * and feeds it to the harness through the stage's main input {@link FnDataReceiver}. Harness + * outputs are collected on the harness threads into {@link #pendingOutputs} and then flushed + * downstream on the Kafka Streams processing thread when the bundle closes — Kafka Streams' {@link + * ProcessorContext#forward} must only be called from the processing thread, so outputs are never + * forwarded directly from a harness callback. + * + *

<p>A {@link KStreamsPayload#isWatermark() watermark} payload marks a bundle boundary: the open + * bundle (if any) is closed (flushing outputs), and the watermark is then forwarded downstream so + * that subsequent stages observe it after all data of the bundle. + * + *

This is the Kafka Streams analogue of Flink's {@code ExecutableStageDoFnOperator} and Spark's + * {@code SparkExecutableStageFunction}. State, timers, and side inputs are out of scope for this + * first version: the stage is executed with {@link StateRequestHandler#unsupported()} and no timer + * receivers. + */ +class ExecutableStageProcessor + implements Processor, byte[], KStreamsPayload> { + + private static final Logger LOG = LoggerFactory.getLogger(ExecutableStageProcessor.class); + + private final RunnerApi.ExecutableStagePayload stagePayload; + private final JobInfo jobInfo; + + private final Queue> pendingOutputs = new ArrayDeque<>(); + + private @Nullable ProcessorContext> context; + private @Nullable ExecutableStageContext stageContext; + private @Nullable StageBundleFactory stageBundleFactory; + private @Nullable RemoteBundle currentBundle; + + ExecutableStageProcessor(RunnerApi.ExecutableStagePayload stagePayload, JobInfo jobInfo) { + this.stagePayload = stagePayload; + this.jobInfo = jobInfo; + } + + @Override + public void init(ProcessorContext> context) { + this.context = context; + ExecutableStage executableStage = ExecutableStage.fromPayload(stagePayload); + this.stageContext = KafkaStreamsExecutableStageContextFactory.getInstance().get(jobInfo); + this.stageBundleFactory = stageContext.getStageBundleFactory(executableStage); + } + + @Override + public void process(Record> record) { + KStreamsPayload payload = record.value(); + if (payload.isWatermark()) { + // NOTE: flushing the bundle on every received watermark is provisional. Once the + // WatermarkManager lands, a stage will receive watermarks from multiple parent instances and + // the output watermark becomes min() across them — the bundle should flush / the output + // watermark advance only when that minimum actually moves forward, not on every received + // watermark. Tracked in #38743. 
+ closeBundleAndFlush(record); + forwardWatermark(record, payload.getWatermarkMillis()); + return; + } + try { + ensureBundleOpen(); + mainInputReceiver().accept(payload.getData()); + } catch (Exception e) { + throw new RuntimeException("Failed to process element through SDK harness", e); + } + } + + private void ensureBundleOpen() throws Exception { + if (currentBundle != null) { + return; + } + StageBundleFactory factory = checkInitialized(stageBundleFactory); + OutputReceiverFactory outputReceiverFactory = + new OutputReceiverFactory() { + @Override + public FnDataReceiver create(String pCollectionId) { + // Outputs are queued here on harness threads and drained on the processing thread + // after the bundle closes. + return receivedElement -> { + if (receivedElement != null) { + pendingOutputs.add((WindowedValue) receivedElement); + } + }; + } + }; + currentBundle = + factory.getBundle( + outputReceiverFactory, + StateRequestHandler.unsupported(), + BundleProgressHandler.ignored()); + } + + private FnDataReceiver> mainInputReceiver() { + RemoteBundle bundle = checkInitialized(currentBundle); + @SuppressWarnings("unchecked") + FnDataReceiver> receiver = + (FnDataReceiver>) + (FnDataReceiver) Iterables.getOnlyElement(bundle.getInputReceivers().values()); + return receiver; + } + + private void closeBundleAndFlush(Record> record) { + RemoteBundle bundle = currentBundle; + if (bundle == null) { + return; + } + try { + // close() blocks until the harness finishes the bundle and all outputs have been delivered + // to the output receiver (and hence enqueued in pendingOutputs). 
+ bundle.close(); + } catch (Exception e) { + throw new RuntimeException("Failed to close SDK harness bundle", e); + } finally { + currentBundle = null; + } + ProcessorContext> ctx = checkInitialized(context); + WindowedValue output; + while ((output = pendingOutputs.poll()) != null) { + ctx.forward( + new Record>( + record.key(), KStreamsPayload.data(output), record.timestamp())); + } + } + + private void forwardWatermark( + Record> record, long watermarkMillis) { + ProcessorContext> ctx = checkInitialized(context); + ctx.forward( + new Record>( + record.key(), KStreamsPayload.watermark(watermarkMillis), record.timestamp())); + } + + @Override + public void close() { + try { + if (currentBundle != null) { + currentBundle.close(); + currentBundle = null; + } + } catch (Exception e) { + LOG.warn("Error closing in-flight SDK harness bundle", e); + } + try { + if (stageBundleFactory != null) { + stageBundleFactory.close(); + stageBundleFactory = null; + } + } catch (Exception e) { + LOG.warn("Error closing stage bundle factory", e); + } + try { + if (stageContext != null) { + stageContext.close(); + stageContext = null; + } + } catch (Exception e) { + LOG.warn("Error closing executable stage context", e); + } + } + + private static T checkInitialized(@Nullable T value) { + if (value == null) { + throw new IllegalStateException("ExecutableStageProcessor used before init()"); + } + return value; + } +} diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslator.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslator.java new file mode 100644 index 000000000000..9015424cdf88 --- /dev/null +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslator.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.kafka.streams.translation; + +import java.io.IOException; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.kafka.streams.Topology; + +/** + * Translates the {@code beam:runner:executable_stage:v1} URN. + * + *

<p>Adds an {@link ExecutableStageProcessor} node to the topology, wired to the processor that + * produces the stage's input PCollection (resolved through {@link + * KafkaStreamsTranslationContext#getProcessorNameForPCollection}). The processor runs the fused + * user code in the SDK harness; its single output PCollection is registered so downstream + * translators can attach to this node. + * + *

Multi-output stages (additional outputs / side inputs / state / timers) are out of scope for + * this first version and are rejected so the limitation fails fast rather than silently dropping + * outputs. + */ +class ExecutableStageTranslator implements PTransformTranslator { + + @Override + public void translate( + String transformId, RunnerApi.Pipeline pipeline, KafkaStreamsTranslationContext context) { + RunnerApi.PTransform transform = pipeline.getComponents().getTransformsOrThrow(transformId); + + RunnerApi.ExecutableStagePayload stagePayload; + try { + stagePayload = RunnerApi.ExecutableStagePayload.parseFrom(transform.getSpec().getPayload()); + } catch (IOException e) { + throw new IllegalArgumentException( + "Failed to parse ExecutableStagePayload for transform " + transformId, e); + } + + String inputPCollectionId = Iterables.getOnlyElement(transform.getInputsMap().values()); + String parentProcessor = context.getProcessorNameForPCollection(inputPCollectionId); + + if (transform.getOutputsMap().size() > 1) { + throw new UnsupportedOperationException( + "ExecutableStage " + + transformId + + " has " + + transform.getOutputsMap().size() + + " outputs; multi-output stages are not yet supported by the Kafka Streams runner."); + } + + Topology topology = context.getTopology(); + topology.addProcessor( + transformId, + () -> new ExecutableStageProcessor(stagePayload, context.getJobInfo()), + parentProcessor); + + if (!transform.getOutputsMap().isEmpty()) { + String outputPCollectionId = Iterables.getOnlyElement(transform.getOutputsMap().values()); + context.registerPCollectionProducer(outputPCollectionId, transformId); + } + } +} diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KStreamsPayload.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KStreamsPayload.java index 47c94eea6eff..53e47b1216bb 100644 --- 
a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KStreamsPayload.java +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KStreamsPayload.java @@ -19,6 +19,7 @@ import java.util.Objects; import org.apache.beam.sdk.values.WindowedValue; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.checkerframework.checker.nullness.qual.Nullable; /** @@ -121,6 +122,12 @@ public int hashCode() { @Override public String toString() { - return kind == Kind.DATA ? "Data{" + data + "}" : "Watermark{" + watermarkMillis + "}"; + MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this).add("kind", kind); + if (kind == Kind.DATA) { + helper.add("data", data); + } else { + helper.add("watermarkMillis", watermarkMillis); + } + return helper.toString(); } } diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsExecutableStageContextFactory.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsExecutableStageContextFactory.java new file mode 100644 index 000000000000..3d1643f3ea9a --- /dev/null +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsExecutableStageContextFactory.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.kafka.streams.translation; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext; +import org.apache.beam.runners.fnexecution.control.ExecutableStageContext; +import org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; + +/** + * Provides one {@link ExecutableStageContext.Factory} per job for the Kafka Streams runner. + * + *

Mirrors {@code FlinkExecutableStageContextFactory}: a singleton that hands out reference- + * counted {@link DefaultExecutableStageContext}s keyed by job id, so the SDK harness environment + * for a job is created once and shared across the {@link ImpulseProcessor}/executable-stage + * processors that run within the same JVM instance. + */ +public class KafkaStreamsExecutableStageContextFactory implements ExecutableStageContext.Factory { + + private static final KafkaStreamsExecutableStageContextFactory INSTANCE = + new KafkaStreamsExecutableStageContextFactory(); + + private final ConcurrentMap jobFactories = + new ConcurrentHashMap<>(); + + private KafkaStreamsExecutableStageContextFactory() {} + + public static KafkaStreamsExecutableStageContextFactory getInstance() { + return INSTANCE; + } + + @Override + public ExecutableStageContext get(JobInfo jobInfo) { + ExecutableStageContext.Factory jobFactory = + jobFactories.computeIfAbsent( + jobInfo.jobId(), + k -> + ReferenceCountingExecutableStageContextFactory.create( + DefaultExecutableStageContext::create, + // Release the context synchronously once its reference count drops to zero; + // the runner does not keep contexts alive across stages beyond their use. 
+ (caller) -> true)); + return jobFactory.get(jobInfo); + } +} diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java index eb8567146143..5042f5426169 100644 --- a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslator.java @@ -22,6 +22,8 @@ import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.runners.kafka.streams.KafkaStreamsPipelineOptions; import org.apache.beam.sdk.util.construction.PTransformTranslation; +import org.apache.beam.sdk.util.construction.graph.ExecutableStage; +import org.apache.beam.sdk.util.construction.graph.GreedyPipelineFuser; import org.apache.beam.sdk.util.construction.graph.PipelineNode; import org.apache.beam.sdk.util.construction.graph.QueryablePipeline; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; @@ -43,6 +45,7 @@ public KafkaStreamsPipelineTranslator() { this( ImmutableMap.builder() .put(PTransformTranslation.IMPULSE_TRANSFORM_URN, new ImpulseTranslator()) + .put(ExecutableStage.URN, new ExecutableStageTranslator()) .build()); } @@ -55,9 +58,21 @@ public KafkaStreamsTranslationContext createTranslationContext( return KafkaStreamsTranslationContext.create(jobInfo, pipelineOptions); } - /** Returns the pipeline to translate (placeholder for future fusion / expansion steps). */ + /** + * Fuses the pipeline so that stateless user code is grouped into {@code ExecutableStage} nodes. + * + *

Runner-executed primitives that have their own translator (e.g. Impulse) are left intact; + * everything else is fused. If the pipeline already contains {@code ExecutableStage} transforms + * it is returned unchanged. + */ public RunnerApi.Pipeline prepareForTranslation(RunnerApi.Pipeline pipeline) { - return pipeline; + boolean alreadyFused = + pipeline.getComponents().getTransformsMap().values().stream() + .anyMatch(t -> ExecutableStage.URN.equals(t.getSpec().getUrn())); + if (alreadyFused) { + return pipeline; + } + return GreedyPipelineFuser.fuse(pipeline).toPipeline(); } /** diff --git a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslatorTest.java b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslatorTest.java new file mode 100644 index 000000000000..7dd6b71943ed --- /dev/null +++ b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslatorTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.kafka.streams.translation; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.time.Duration; +import java.util.List; +import java.util.Properties; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; +import org.apache.beam.runners.kafka.streams.KafkaStreamsPipelineOptions; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.PortablePipelineOptions; +import org.apache.beam.sdk.testing.CrashingRunner; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Impulse; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.construction.Environments; +import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation; +import org.apache.beam.sdk.util.construction.PipelineTranslation; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.junit.Test; + +/** + * End-to-end test for {@link ExecutableStageTranslator}: builds an {@code Impulse -> ParDo} + * pipeline with the high-level Beam Java SDK, fuses + translates it, and runs the resulting Kafka + * Streams topology under {@link TopologyTestDriver}. The fused ParDo executes in an in-process + * (EMBEDDED) Java SDK harness, so the {@link DoFn}'s {@code @ProcessElement} body runs for real — + * no Docker, no broker. + * + *

Because the ParDo's output PCollection has no downstream consumer, it is not a stage output + * and is never forwarded out of the harness — that is the documented behaviour. The test verifies + * the bridge works by having the DoFn record into a {@link SharedTestCollector} as a side effect + * and asserting the recorded input from the test thread. + */ +public class ExecutableStageTranslatorTest { + + private static final String JOB_ID = "kafka-streams-executable-stage-test"; + private static final String APPLICATION_ID = "ks-executable-stage-test"; + + /** + * Records the length of every input element seen by the harness so the test can verify the DoFn + * ran. {@link SharedTestCollector} carries its identity via a UUID stored on the instance itself, + * so it survives any serialization the runner may perform on the DoFn. + */ + private static class RecordingFn extends DoFn { + private final SharedTestCollector collector; + + RecordingFn(SharedTestCollector collector) { + this.collector = collector; + } + + @ProcessElement + public void processElement(@Element byte[] input, OutputReceiver out) { + collector.record(input.length); + // Still emit something so the output codepath of the harness is exercised, even though no + // downstream consumer means the runner never observes the value. 
+ out.output(new byte[] {1}); + } + } + + @Test + public void impulseThenParDoExecutesDoFnInHarnessOncePerImpulseElement() throws Exception { + SharedTestCollector collector = SharedTestCollector.create(); + collector.reset(); + + Pipeline pipeline = Pipeline.create(pipelineOptions()); + pipeline + .apply("impulse", Impulse.create()) + .apply("pardo", ParDo.of(new RecordingFn(collector))); + + RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline); + + KafkaStreamsPipelineOptions options = + pipeline.getOptions().as(KafkaStreamsPipelineOptions.class); + KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator(); + JobInfo jobInfo = + JobInfo.create( + JOB_ID, options.getJobName(), "", PipelineOptionsTranslation.toProto(options)); + KafkaStreamsTranslationContext context = translator.createTranslationContext(jobInfo, options); + + translator.translate(context, translator.prepareForTranslation(pipelineProto)); + + Topology topology = context.getTopology(); + try (TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig())) { + driver.advanceWallClockTime(Duration.ofSeconds(1)); + driver.advanceWallClockTime(Duration.ofSeconds(1)); + } + + List recorded = collector.recorded(); + // Impulse emits exactly one empty byte[] in the GlobalWindow, so the DoFn must run exactly + // once and see a zero-length input. 
+ assertThat(recorded.size(), is(1)); + assertThat(recorded.get(0), is(0)); + } + + private static PipelineOptions pipelineOptions() { + PipelineOptions options = + PipelineOptionsFactory.fromArgs("--applicationId=" + APPLICATION_ID).create(); + options.setRunner(CrashingRunner.class); + options.as(KafkaStreamsPipelineOptions.class).setApplicationId(APPLICATION_ID); + options + .as(PortablePipelineOptions.class) + .setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED); + return options; + } + + private static Properties streamsConfig() { + Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put( + StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); + props.put( + StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); + return props; + } +} diff --git a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslatorTest.java b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslatorTest.java index 44cc00bcebbd..13baa551ebbf 100644 --- a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslatorTest.java +++ b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsPipelineTranslatorTest.java @@ -18,16 +18,19 @@ package org.apache.beam.runners.kafka.streams.translation; import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertThrows; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.runners.kafka.streams.KafkaStreamsPipelineOptions; 
+import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.CrashingRunner; +import org.apache.beam.sdk.transforms.Impulse; import org.apache.beam.sdk.util.construction.PTransformTranslation; import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation; +import org.apache.beam.sdk.util.construction.PipelineTranslation; import org.apache.kafka.streams.TopologyDescription; import org.junit.Test; @@ -57,10 +60,11 @@ public void translateRejectsUnknownTransformWithUrnInMessage() { .build())) .build(); + // translate() directly — this test pins the URN-rejection contract on the dispatch loop + // itself, independent of the fuser/validator that prepareForTranslation runs. UnsupportedOperationException ex = assertThrows( - UnsupportedOperationException.class, - () -> translator.translate(context, translator.prepareForTranslation(pipeline))); + UnsupportedOperationException.class, () -> translator.translate(context, pipeline)); assertThat(ex.getMessage(), containsString("No translator registered for URN")); assertThat(ex.getMessage(), containsString(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN)); @@ -73,15 +77,23 @@ public void translateImpulsePipelineAddsSourceAndProcessorNodes() { KafkaStreamsPipelineTranslator translator = new KafkaStreamsPipelineTranslator(); KafkaStreamsTranslationContext context = newContext(); - RunnerApi.Pipeline pipeline = singleImpulsePipeline(); + // Build the pipeline through the SDK so the resulting RunnerApi.Pipeline carries the coders + // and windowing strategies that PipelineValidator requires (run inside the fuser). 
+ Pipeline sdkPipeline = + Pipeline.create( + PipelineOptionsFactory.fromArgs( + "--applicationId=ks-translator-test", + "--runner=" + CrashingRunner.class.getName()) + .create()); + sdkPipeline.apply("impulse", Impulse.create()); + RunnerApi.Pipeline pipeline = PipelineTranslation.toProto(sdkPipeline); + translator.translate(context, translator.prepareForTranslation(pipeline)); TopologyDescription description = context.getTopology().describe(); String describeText = description.toString(); - - assertThat(describeText, containsString("impulse-source")); - assertThat(describeText, containsString("impulse")); - assertThat(context.getProcessorNameForPCollection(OUTPUT_PCOLLECTION_ID), is("impulse")); + assertThat(describeText, containsString("Source:")); + assertThat(describeText, containsString("Processor:")); } @Test diff --git a/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/SharedTestCollector.java b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/SharedTestCollector.java new file mode 100644 index 000000000000..a9c1010dd38f --- /dev/null +++ b/runners/kafka-streams/src/test/java/org/apache/beam/runners/kafka/streams/translation/SharedTestCollector.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.kafka.streams.translation; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Test-only side-effect sink that survives Beam serialization without losing collected elements. + * + *

<p>An ExecutableStage that contains a user {@link org.apache.beam.sdk.transforms.DoFn} runs the + * DoFn in the SDK harness even when its output PCollection has no downstream consumer — the work is + * still performed for its side effects. The natural unit test for that is to have the DoFn record + * into a side-effect container and assert the container's contents from the test thread. + * + *

<p>A plain static {@code AtomicReference} / {@code List} works only as long as the runner does + * not serialize the {@code DoFn} (and therefore the container instance it holds). The EMBEDDED + * environment may already, and could in the future, serialize the user code, in which case a cloned + * container would silently drop its writes. + * + *

This class works around that by keying the actual storage on a {@link UUID} held by an + * otherwise-empty instance. The instance itself is cheaply {@link Serializable}; clones still carry + * the same {@code UUID} and therefore see the same backing list in the static {@link #REGISTRY}. + * + * @param element type + */ +final class SharedTestCollector implements Serializable { + + private static final long serialVersionUID = 1L; + + /** Per-UUID storage, populated lazily on the first {@code record} for each instance. */ + private static final Map> REGISTRY = new ConcurrentHashMap<>(); + + private final UUID id = UUID.randomUUID(); + + /** Returns a fresh, empty collector instance with its own UUID. */ + static SharedTestCollector create() { + return new SharedTestCollector<>(); + } + + /** Records a single element. Safe to call from any thread. */ + void record(T element) { + REGISTRY.computeIfAbsent(id, k -> Collections.synchronizedList(new ArrayList<>())).add(element); + } + + /** Returns an immutable snapshot of all recorded elements, in order. */ + @SuppressWarnings("unchecked") + List recorded() { + List raw = REGISTRY.get(id); + if (raw == null) { + return Collections.emptyList(); + } + synchronized (raw) { + return Collections.unmodifiableList(new ArrayList<>((List) (List) raw)); + } + } + + /** Clears the backing storage for this collector. Useful for {@code @Before} resets. */ + void reset() { + REGISTRY.remove(id); + } +} From b69921c540835df6c663d97c4dfd4b193b49a5ca Mon Sep 17 00:00:00 2001 From: junaiddshaukat Date: Wed, 3 Jun 2026 16:49:22 +0500 Subject: [PATCH 2/2] Address Gemini review notes on ExecutableStage PR - ExecutableStageProcessor: switch pendingOutputs from ArrayDeque to ConcurrentLinkedQueue. The queue is populated by SDK harness threads through the OutputReceiverFactory callback and drained by the Kafka Streams processing thread on bundle close; ArrayDeque is not thread-safe and offers no cross-thread visibility guarantees. 
- KafkaStreamsExecutableStageContextFactory: drop the per-job entry from jobFactories in the release callback so a long-lived JVM that runs many jobs does not accumulate one factory per finished job. - ExecutableStageTranslator: read the main input PCollection id from stagePayload.getInput() instead of Iterables.getOnlyElement on the PTransform inputs map (which conflated main + side inputs), and add explicit fail-fast rejections for side inputs, user state and timers so users get a clear message rather than a silent miss. --- .../translation/ExecutableStageProcessor.java | 6 +++-- .../ExecutableStageTranslator.java | 23 ++++++++++++++++--- ...aStreamsExecutableStageContextFactory.java | 10 +++++--- 3 files changed, 31 insertions(+), 8 deletions(-) diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java index 90fe72e845a4..3d973e83f6ae 100644 --- a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageProcessor.java @@ -17,8 +17,8 @@ */ package org.apache.beam.runners.kafka.streams.translation; -import java.util.ArrayDeque; import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.fnexecution.control.BundleProgressHandler; import org.apache.beam.runners.fnexecution.control.ExecutableStageContext; @@ -66,7 +66,9 @@ class ExecutableStageProcessor private final RunnerApi.ExecutableStagePayload stagePayload; private final JobInfo jobInfo; - private final Queue> pendingOutputs = new ArrayDeque<>(); + // pendingOutputs is enqueued by SDK harness threads (inside the OutputReceiverFactory callback) + // and drained by 
the Kafka Streams processing thread on bundle close; needs to be thread-safe. + private final Queue> pendingOutputs = new ConcurrentLinkedQueue<>(); private @Nullable ProcessorContext> context; private @Nullable ExecutableStageContext stageContext; diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslator.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslator.java index 9015424cdf88..aa0a564826f7 100644 --- a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslator.java +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/ExecutableStageTranslator.java @@ -50,9 +50,21 @@ public void translate( "Failed to parse ExecutableStagePayload for transform " + transformId, e); } - String inputPCollectionId = Iterables.getOnlyElement(transform.getInputsMap().values()); - String parentProcessor = context.getProcessorNameForPCollection(inputPCollectionId); - + // Fail fast on stage features that are not yet supported, so users get a clear message rather + // than a silent miss further down the harness/topology path. 
+ if (stagePayload.getSideInputsCount() > 0) { + throw new UnsupportedOperationException( + "ExecutableStage " + + transformId + + " has side inputs; side inputs are not yet supported by the Kafka Streams runner."); + } + if (stagePayload.getUserStatesCount() > 0 || stagePayload.getTimersCount() > 0) { + throw new UnsupportedOperationException( + "ExecutableStage " + + transformId + + " uses user state or timers; stateful ParDo is not yet supported by the Kafka" + + " Streams runner."); + } if (transform.getOutputsMap().size() > 1) { throw new UnsupportedOperationException( "ExecutableStage " @@ -62,6 +74,11 @@ public void translate( + " outputs; multi-output stages are not yet supported by the Kafka Streams runner."); } + // The payload distinguishes the main input from side inputs, so reading it from the payload + // is unambiguous even before we add side-input support. + String inputPCollectionId = stagePayload.getInput(); + String parentProcessor = context.getProcessorNameForPCollection(inputPCollectionId); + Topology topology = context.getTopology(); topology.addProcessor( transformId, diff --git a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsExecutableStageContextFactory.java b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsExecutableStageContextFactory.java index 3d1643f3ea9a..d376ea0ba455 100644 --- a/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsExecutableStageContextFactory.java +++ b/runners/kafka-streams/src/main/java/org/apache/beam/runners/kafka/streams/translation/KafkaStreamsExecutableStageContextFactory.java @@ -54,9 +54,13 @@ public ExecutableStageContext get(JobInfo jobInfo) { k -> ReferenceCountingExecutableStageContextFactory.create( DefaultExecutableStageContext::create, - // Release the context synchronously once its reference count drops to zero; - // the runner does not keep contexts 
alive across stages beyond their use. - (caller) -> true)); + // Release the context synchronously once its reference count drops to zero, + // and also drop the per-job factory entry so a long-lived JVM that runs many + // jobs does not accumulate one entry per finished job. + (caller) -> { + jobFactories.remove(k); + return true; + })); return jobFactory.get(jobInfo); } }