From 30a86770372967c149548dc4c8c94cb6a56eff28 Mon Sep 17 00:00:00 2001 From: TongruiLi Date: Thu, 7 May 2026 22:11:04 +0000 Subject: [PATCH 1/5] Added portable runner options to java runner --- .../org/apache/beam/runners/dataflow/DataflowRunner.java | 9 ++++++--- .../apache/beam/runners/dataflow/DataflowRunnerTest.java | 8 ++++---- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index ecc231ab825e..0a28b0e48d38 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1245,7 +1245,7 @@ public DataflowPipelineJob run(Pipeline pipeline) { // to Runner v2. if (DataflowRunner.isMultiLanguagePipeline(pipeline) || includesTransformUpgrades(pipeline)) { List experiments = firstNonNull(options.getExperiments(), Collections.emptyList()); - if (!experiments.contains("use_runner_v2")) { + if (!useUnifiedWorker(options)) { LOG.info( "Automatically enabling Dataflow Runner v2 since the pipeline used cross-language" + " transforms or pipeline needed a transform upgrade."); @@ -1256,7 +1256,9 @@ public DataflowPipelineJob run(Pipeline pipeline) { if (useUnifiedWorker(options)) { if (hasExperiment(options, "disable_runner_v2") || hasExperiment(options, "disable_runner_v2_until_2023") - || hasExperiment(options, "disable_prime_runner_v2")) { + || hasExperiment(options, "disable_prime_runner_v2") + || hasExperiment(options, "disable_portable_runner") + || hasExperiment(options, "enable_streaming_java_runner")) { throw new IllegalArgumentException( "Runner V2 both disabled and enabled: at least one of ['beam_fn_api', 'use_unified_worker', 'use_runner_v2', 'use_portable_job_submission'] is set and also one of ['disable_runner_v2', 'disable_runner_v2_until_2023', 'disable_prime_runner_v2'] is set."); } @@ -2729,7 +2731,8 @@ static boolean useUnifiedWorker(DataflowPipelineOptions options) { return hasExperiment(options, "beam_fn_api") || hasExperiment(options, "use_runner_v2") || hasExperiment(options, "use_unified_worker") - || hasExperiment(options, "use_portable_job_submission"); + || hasExperiment(options, "use_portable_job_submission") + || hasExperiment(options, "enable_portable_runner"); } static void verifyDoFnSupported( diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 8c33123be6d5..da533252694d 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -1783,7 +1783,7 @@ public void testSdkHarnessConfigurationPrime() throws IOException { public void testSettingAnyFnApiExperimentEnablesUnifiedWorker() throws Exception { for (String experiment : ImmutableList.of( - "beam_fn_api", "use_runner_v2", "use_unified_worker", "use_portable_job_submission")) { + "beam_fn_api", "use_runner_v2", "use_unified_worker", "use_portable_job_submission", "enable_portable_runner")) { DataflowPipelineOptions options = buildPipelineOptions(); ExperimentalOptions.addExperiment(options, experiment); Pipeline p = Pipeline.create(options); @@ -1798,7 +1798,7 @@ public void testSettingAnyFnApiExperimentEnablesUnifiedWorker() throws Exception for (String experiment : ImmutableList.of( - "beam_fn_api", "use_runner_v2", "use_unified_worker", "use_portable_job_submission")) { + "beam_fn_api", "use_runner_v2", "use_unified_worker", "use_portable_job_submission", "enable_portable_runner")) { DataflowPipelineOptions options = buildPipelineOptions(); options.setStreaming(true); ExperimentalOptions.addExperiment(options, experiment); @@ -1822,10 +1822,10 @@ public void testSettingAnyFnApiExperimentEnablesUnifiedWorker() throws Exception public void testSettingConflictingEnableAndDisableExperimentsThrowsException() throws Exception { for (String experiment : ImmutableList.of( - "beam_fn_api", "use_runner_v2", "use_unified_worker", "use_portable_job_submission")) { + "beam_fn_api", "use_runner_v2", "use_unified_worker", "use_portable_job_submission", "enable_portable_runner")) { for (String disabledExperiment : ImmutableList.of( - "disable_runner_v2", "disable_runner_v2_until_2023", "disable_prime_runner_v2")) { + "disable_runner_v2", "disable_runner_v2_until_2023", "disable_prime_runner_v2", "enable_streaming_java_runner", "disable_portable_runner")) { DataflowPipelineOptions options = buildPipelineOptions(); ExperimentalOptions.addExperiment(options, experiment); ExperimentalOptions.addExperiment(options, disabledExperiment); From 2aab1e13616d12b30700cbc1fa9435e93166f81f Mon Sep 17 00:00:00 2001 From: TongruiLi <12992126+TongruiLi@users.noreply.github.com> Date: Fri, 8 May 2026 00:53:44 +0000 Subject: [PATCH 2/5] spotless --- .../runners/dataflow/DataflowRunnerTest.java | 24 +++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index da533252694d..ab3b62a0aa1b 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -1783,7 +1783,11 @@ public void testSdkHarnessConfigurationPrime() throws IOException { public void testSettingAnyFnApiExperimentEnablesUnifiedWorker() throws Exception { for (String experiment : ImmutableList.of( - "beam_fn_api", "use_runner_v2", "use_unified_worker", "use_portable_job_submission", "enable_portable_runner")) { + "beam_fn_api", + "use_runner_v2", + "use_unified_worker", + "use_portable_job_submission", + "enable_portable_runner")) { DataflowPipelineOptions options = buildPipelineOptions(); ExperimentalOptions.addExperiment(options, experiment); Pipeline p = Pipeline.create(options); @@ -1798,7 +1802,11 @@ public void testSettingAnyFnApiExperimentEnablesUnifiedWorker() throws Exception for (String experiment : ImmutableList.of( - "beam_fn_api", "use_runner_v2", "use_unified_worker", "use_portable_job_submission", "enable_portable_runner")) { + "beam_fn_api", + "use_runner_v2", + "use_unified_worker", + "use_portable_job_submission", + "enable_portable_runner")) { DataflowPipelineOptions options = buildPipelineOptions(); options.setStreaming(true); ExperimentalOptions.addExperiment(options, experiment); @@ -1822,10 +1830,18 @@ public void testSettingAnyFnApiExperimentEnablesUnifiedWorker() throws Exception public void testSettingConflictingEnableAndDisableExperimentsThrowsException() throws Exception { for (String experiment : ImmutableList.of( - "beam_fn_api", "use_runner_v2", "use_unified_worker", "use_portable_job_submission", "enable_portable_runner")) { + "beam_fn_api", + "use_runner_v2", + "use_unified_worker", + "use_portable_job_submission", + "enable_portable_runner")) { for (String disabledExperiment : ImmutableList.of( - "disable_runner_v2", "disable_runner_v2_until_2023", "disable_prime_runner_v2", "enable_streaming_java_runner", "disable_portable_runner")) { + "disable_runner_v2", + "disable_runner_v2_until_2023", + "disable_prime_runner_v2", + "enable_streaming_java_runner", + "disable_portable_runner")) { DataflowPipelineOptions options = buildPipelineOptions(); ExperimentalOptions.addExperiment(options, experiment); ExperimentalOptions.addExperiment(options, disabledExperiment); From fbfb7fea81dc9eef11298b645cc81893d5501e63 Mon Sep 17 00:00:00 2001 From: TongruiLi <12992126+TongruiLi@users.noreply.github.com> Date: Fri, 8 May 2026 10:10:38 +0000 Subject: [PATCH 3/5] Added more tolerance to flaky test --- .../beam/sdk/util/UnboundedScheduledExecutorServiceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java index efbb03519789..bba10c84fab6 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java @@ -625,7 +625,7 @@ public void testThreadsAreAddedOnlyAsNeededWithContention() throws Exception { LOG.info("Created {} threads to execute at most 100 parallel tasks", largestPool); // Ideally we would never create more than 100, however with contention it is still possible // some extra threads will be created. - assertTrue(largestPool <= 110); + assertTrue(largestPool <= 120); executorService.shutdown(); } } From f7c8c5639fc7335e4661a99d596f11a5014855aa Mon Sep 17 00:00:00 2001 From: TongruiLi <12992126+TongruiLi@users.noreply.github.com> Date: Fri, 8 May 2026 21:25:36 +0000 Subject: [PATCH 4/5] Removed unused experiments --- .../java/org/apache/beam/runners/dataflow/DataflowRunner.java | 1 - 1 file changed, 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 0a28b0e48d38..bb1fa4d0cbd6 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1244,7 +1244,6 @@ public DataflowPipelineJob run(Pipeline pipeline) { // Multi-language pipelines and pipelines that include upgrades should automatically be upgraded // to Runner v2. if (DataflowRunner.isMultiLanguagePipeline(pipeline) || includesTransformUpgrades(pipeline)) { - List experiments = firstNonNull(options.getExperiments(), Collections.emptyList()); if (!useUnifiedWorker(options)) { LOG.info( "Automatically enabling Dataflow Runner v2 since the pipeline used cross-language" From 60b27f0cf23573ba350eb275cad25c8c67952ae9 Mon Sep 17 00:00:00 2001 From: TongruiLi <12992126+TongruiLi@users.noreply.github.com> Date: Fri, 8 May 2026 21:55:31 +0000 Subject: [PATCH 5/5] Added experiments back in --- .../java/org/apache/beam/runners/dataflow/DataflowRunner.java | 1 + 1 file changed, 1 insertion(+) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index bb1fa4d0cbd6..299e7fa21ed1 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1245,6 +1245,7 @@ public DataflowPipelineJob run(Pipeline pipeline) { // to Runner v2. if (DataflowRunner.isMultiLanguagePipeline(pipeline) || includesTransformUpgrades(pipeline)) { if (!useUnifiedWorker(options)) { + List experiments = firstNonNull(options.getExperiments(), Collections.emptyList()); LOG.info( "Automatically enabling Dataflow Runner v2 since the pipeline used cross-language" + " transforms or pipeline needed a transform upgrade.");