From 324d55f19db1302ba391835fc2966a58538a12e6 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 12 May 2026 11:36:53 -0400 Subject: [PATCH 1/7] Add go test to reproduce the deadline exceeded errors when a dofn fails --- .../runners/prism/internal/execute_test.go | 22 +++++++++++++++++++ .../runners/prism/internal/testdofns_test.go | 5 +++++ 2 files changed, 27 insertions(+) diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go index 29fccaeb238e..2bb73f20e200 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go @@ -519,6 +519,28 @@ func TestFailure(t *testing.T) { } } +func TestFailureHang(t *testing.T) { + initRunner(t) + + p, s := beam.NewPipelineWithRoot() + imp := beam.Impulse(s) + col1 := beam.ParDo(s, doFnBlock, imp) + col2 := beam.ParDo(s, doFnFail, imp) + beam.ParDo(s, &int64Check{Name: "block", Want: []int{}}, col1) + beam.ParDo(s, &int64Check{Name: "fail", Want: []int{}}, col2) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + _, err := executeWithT(ctx, t, p) + if err == nil { + t.Fatalf("expected pipeline failure, but got a success") + } + if want := "doFnFail: failing as intended"; !strings.Contains(err.Error(), want) { + t.Fatalf("expected pipeline failure with %q, but was %v", want, err) + } +} + func TestRunner_Passert(t *testing.T) { initRunner(t) tests := []struct { diff --git a/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go b/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go index 334d74fcae1d..d21ccd53afd0 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go @@ -59,6 +59,7 @@ func init() { register.Function3x0(dofn1Counter) register.Function2x0(dofnSink) register.Function3x1(doFnFail) + register.Function3x0(doFnBlock) 
register.Function2x1(combineIntSum) @@ -283,6 +284,10 @@ func doFnFail(ctx context.Context, _ []byte, emit func(int64)) error { return fmt.Errorf("doFnFail: failing as intended") } +func doFnBlock(ctx context.Context, _ []byte, emit func(int64)) { + <-ctx.Done() +} + func combineIntSum(a, b int64) int64 { return a + b } From 26e8e66a18f6817353f746c86fc9c459788187f3 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 12 May 2026 11:43:45 -0400 Subject: [PATCH 2/7] Add python unit test to reproduce it. --- .../runners/portability/prism_runner_test.py | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index a65f9a9960b4..124514c8a824 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -457,6 +457,30 @@ def test_with_remote_path(self, has_cache_bin, has_cache_zip, ignore_cache): mock_zipfile_init.assert_called_once() +class PrismRunnerExecutionTest(unittest.TestCase): + def test_dofn_failure_clean_exit(self): + import time + class FailDoFn(beam.DoFn): + def process(self, element): + raise ValueError("Failing as intended") + + class BlockDoFn(beam.DoFn): + def process(self, element): + time.sleep(10) + yield element + + from apache_beam.options.pipeline_options import PrismRunnerOptions + options = PortableOptions() + options.view_as(StandardOptions).runner = 'PrismRunner' + options.view_as(PortableOptions).job_server_timeout = 5 + + with self.assertRaisesRegex(Exception, "Failing as intended"): + with beam.Pipeline(options=options) as p: + imp = p | beam.Create([1, 2]) + _ = imp | 'Block' >> beam.ParDo(BlockDoFn()) + _ = imp | 'Fail' >> beam.ParDo(FailDoFn()) + + class PrismRunnerSingletonTest(unittest.TestCase): @parameterized.expand([True, False]) def test_singleton(self, enable_singleton): From 
cd3d79a39e11262e0514deb3fe16181feac3ecd3 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 12 May 2026 11:48:14 -0400 Subject: [PATCH 3/7] Change the context to egctx so a bundle failure will cancel other bundle execution. --- sdks/go/pkg/beam/runners/prism/internal/execute.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index 853b7974479d..f6e148f9f3f6 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -376,7 +376,11 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic eg.Go(func() error { s := stages[rb.StageID] wk := wks[s.envID] - if err := s.Execute(ctx, j, wk, comps, em, rb); err != nil { + // Pass egctx instead of the parent ctx so that when any bundle fails, + // the errgroup cancels egctx and all other concurrent bundle execution + // goroutines immediately detect cancellation and abort. This prevents + // eg.Wait() from blocking indefinitely and allows prompt error reporting. + if err := s.Execute(egctx, j, wk, comps, em, rb); err != nil { // Ensure we clean up on bundle failure j.Logger.Error("Bundle Failed.", slog.Any("error", err)) em.FailBundle(rb) From ba059e8265911223d20d921f01f6070ba25cb6a1 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 12 May 2026 12:25:02 -0400 Subject: [PATCH 4/7] Fix lints. 
--- .../python/apache_beam/runners/portability/prism_runner_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index 124514c8a824..bbc9f4d96094 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -20,6 +20,7 @@ import logging import os.path import shlex +import time import typing import unittest import zipfile @@ -459,7 +460,6 @@ def test_with_remote_path(self, has_cache_bin, has_cache_zip, ignore_cache): class PrismRunnerExecutionTest(unittest.TestCase): def test_dofn_failure_clean_exit(self): - import time class FailDoFn(beam.DoFn): def process(self, element): raise ValueError("Failing as intended") From 143785db9d95f5529f22c90b4d5406ce7c63ba8f Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 12 May 2026 12:31:01 -0400 Subject: [PATCH 5/7] Remove unused import. --- sdks/python/apache_beam/runners/portability/prism_runner_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index 01580ca2cd52..1217d6677017 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -472,7 +472,6 @@ def process(self, element): time.sleep(10) yield element - from apache_beam.options.pipeline_options import PrismRunnerOptions options = PortableOptions() options.view_as(StandardOptions).runner = 'PrismRunner' options.view_as(PortableOptions).job_server_timeout = 5 From 48c09139b2fdaa1583555e4908375a12b5bc2985 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 12 May 2026 14:18:17 -0400 Subject: [PATCH 6/7] Move test to a test class that uses the built prism during VR tests. 
--- .../runners/portability/prism_runner_test.py | 40 ++++++++----------- 1 file changed, 17 insertions(+), 23 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index 1217d6677017..4c06c3db6164 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -188,6 +188,7 @@ def create_options(self): options.view_as(StandardOptions).streaming = self.streaming options.view_as( TypeOptions).allow_unsafe_triggers = self.allow_unsafe_triggers + options.view_as(PortableOptions).job_server_timeout = 60 return options # Can't read host files from within docker, read a "local" file there. @@ -299,6 +300,22 @@ def test_after_count_trigger_streaming(self): ('B-3', {10, 15, 16}), ]))) + def test_dofn_failure_clean_exit(self): + class FailDoFn(beam.DoFn): + def process(self, element): + raise ValueError("Failing as intended") + + class BlockDoFn(beam.DoFn): + def process(self, element): + time.sleep(1000) + yield element + + with self.assertRaisesRegex(Exception, "Failing as intended"): + with self.create_pipeline() as p: + imp = p | beam.Create([1, 2]) + _ = imp | 'Block' >> beam.ParDo(BlockDoFn()) + _ = imp | 'Fail' >> beam.ParDo(FailDoFn()) + class PrismJobServerTest(unittest.TestCase): def setUp(self) -> None: @@ -460,29 +477,6 @@ def test_with_remote_path(self, has_cache_bin, has_cache_zip, ignore_cache): else: mock_zipfile_init.assert_called_once() - -class PrismRunnerExecutionTest(unittest.TestCase): - def test_dofn_failure_clean_exit(self): - class FailDoFn(beam.DoFn): - def process(self, element): - raise ValueError("Failing as intended") - - class BlockDoFn(beam.DoFn): - def process(self, element): - time.sleep(10) - yield element - - options = PortableOptions() - options.view_as(StandardOptions).runner = 'PrismRunner' - options.view_as(PortableOptions).job_server_timeout = 5 
- - with self.assertRaisesRegex(Exception, "Failing as intended"): - with beam.Pipeline(options=options) as p: - imp = p | beam.Create([1, 2]) - _ = imp | 'Block' >> beam.ParDo(BlockDoFn()) - _ = imp | 'Fail' >> beam.ParDo(FailDoFn()) - - class PrismRunnerSingletonTest(unittest.TestCase): @parameterized.expand([True, False]) def test_singleton(self, enable_singleton): From 210971c5716315ae8b2cfa901bcea0355d96cc9d Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 12 May 2026 15:11:37 -0400 Subject: [PATCH 7/7] Remove the new python test due to flakiness. --- .../runners/portability/prism_runner_test.py | 19 ++----------------- 1 file changed, 2 insertions(+), 17 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index 4c06c3db6164..9c1620603fd3 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -21,6 +21,7 @@ import os.path import queue import shlex +import threading import time import typing import unittest @@ -188,7 +189,6 @@ def create_options(self): options.view_as(StandardOptions).streaming = self.streaming options.view_as( TypeOptions).allow_unsafe_triggers = self.allow_unsafe_triggers - options.view_as(PortableOptions).job_server_timeout = 60 return options # Can't read host files from within docker, read a "local" file there. 
@@ -300,22 +300,6 @@ def test_after_count_trigger_streaming(self): ('B-3', {10, 15, 16}), ]))) - def test_dofn_failure_clean_exit(self): - class FailDoFn(beam.DoFn): - def process(self, element): - raise ValueError("Failing as intended") - - class BlockDoFn(beam.DoFn): - def process(self, element): - time.sleep(1000) - yield element - - with self.assertRaisesRegex(Exception, "Failing as intended"): - with self.create_pipeline() as p: - imp = p | beam.Create([1, 2]) - _ = imp | 'Block' >> beam.ParDo(BlockDoFn()) - _ = imp | 'Fail' >> beam.ParDo(FailDoFn()) - class PrismJobServerTest(unittest.TestCase): def setUp(self) -> None: @@ -477,6 +461,7 @@ def test_with_remote_path(self, has_cache_bin, has_cache_zip, ignore_cache): else: mock_zipfile_init.assert_called_once() + class PrismRunnerSingletonTest(unittest.TestCase): @parameterized.expand([True, False]) def test_singleton(self, enable_singleton):