From e3a94628ad64d84c48584cc1b491f2c8f4e554bb Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 11 May 2026 22:19:06 -0400 Subject: [PATCH 1/4] Add retry in connecting manager in MultiProcessShared. --- .../apache_beam/utils/multi_process_shared.py | 15 +++++++-- .../utils/multi_process_shared_test.py | 31 ++++++++++++++++++- 2 files changed, 43 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/utils/multi_process_shared.py b/sdks/python/apache_beam/utils/multi_process_shared.py index b05fdd305a60..7004b481967f 100644 --- a/sdks/python/apache_beam/utils/multi_process_shared.py +++ b/sdks/python/apache_beam/utils/multi_process_shared.py @@ -38,6 +38,8 @@ import fasteners +from apache_beam.utils import retry + # In some python versions, there is a bug where AutoProxy doesn't handle # the kwarg 'manager_owned'. We implement our own backup here to make sure # we avoid this problem. More info here: @@ -391,10 +393,19 @@ def _get_manager(self): manager = _SingletonRegistrar( address=(host, int(port)), authkey=AUTH_KEY) multiprocessing.current_process().authkey = AUTH_KEY - try: + + @retry.with_exponential_backoff( + num_retries=5, + initial_delay_secs=0.1, + retry_filter=lambda exn: isinstance( + exn, (ConnectionError, EOFError))) + def connect_manager(): manager.connect() + + try: + connect_manager() self._manager = manager - except ConnectionError: + except (ConnectionError, EOFError): # The server is no longer good, assume it died. os.unlink(address_file) diff --git a/sdks/python/apache_beam/utils/multi_process_shared_test.py b/sdks/python/apache_beam/utils/multi_process_shared_test.py index 3c74903b8d99..2dc5d1fe5d16 100644 --- a/sdks/python/apache_beam/utils/multi_process_shared_test.py +++ b/sdks/python/apache_beam/utils/multi_process_shared_test.py @@ -22,6 +22,7 @@ import tempfile import threading import unittest +from unittest.mock import patch from typing import Any from apache_beam.utils import multi_process_shared @@ -293,7 +294,8 @@ def setUp(self): 'mix1', 'mix2', 'test_process_exit', - 'thundering_herd_test']: + 'thundering_herd_test', + 'transient_test']: for ext in ['', '.address', '.address.error']: try: os.remove(os.path.join(tempdir, tag + ext)) @@ -461,6 +463,33 @@ def test_zombie_reaping_on_acquire(self): except Exception: pass + def test_transient_connection_error_recovery(self): + shared1 = multi_process_shared.MultiProcessShared( + Counter, tag='transient_test', always_proxy=True, spawn_process=True) + shared2 = multi_process_shared.MultiProcessShared( + Counter, tag='transient_test', always_proxy=True, spawn_process=True) + + counter1 = shared1.acquire() + + orig_connect = multi_process_shared._SingletonRegistrar.connect + connect_calls = [0] + + def side_effect_connect(self_mgr, *args, **kwargs): + connect_calls[0] += 1 + if connect_calls[0] == 1: + raise ConnectionError("Simulated transient connection failure") + return orig_connect(self_mgr, *args, **kwargs) + + with patch.object( + multi_process_shared._SingletonRegistrar, + 'connect', + autospec=True, + side_effect=side_effect_connect): + counter2 = shared2.acquire() + + self.assertEqual(counter1.increment(), 1) + self.assertEqual(counter2.increment(), 2) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) From 7dca5ca5a2592deb1c1da8b98de8b83b2b42cdc4 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 11 May 2026 22:24:07 -0400 Subject: [PATCH 2/4] Reformat --- .../apache_beam/utils/multi_process_shared_test.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/utils/multi_process_shared_test.py b/sdks/python/apache_beam/utils/multi_process_shared_test.py index 2dc5d1fe5d16..d15da9471b59 100644 --- a/sdks/python/apache_beam/utils/multi_process_shared_test.py +++ b/sdks/python/apache_beam/utils/multi_process_shared_test.py @@ -480,11 +480,10 @@ def side_effect_connect(self_mgr, *args, **kwargs): raise ConnectionError("Simulated transient connection failure") return orig_connect(self_mgr, *args, **kwargs) - with patch.object( - multi_process_shared._SingletonRegistrar, - 'connect', - autospec=True, - side_effect=side_effect_connect): + with patch.object(multi_process_shared._SingletonRegistrar, + 'connect', + autospec=True, + side_effect=side_effect_connect): counter2 = shared2.acquire() self.assertEqual(counter1.increment(), 1) From b9d5c046eff0f74ddb50490437169cd298c51b73 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 11 May 2026 22:36:51 -0400 Subject: [PATCH 3/4] Fix lints --- sdks/python/apache_beam/utils/multi_process_shared_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/utils/multi_process_shared_test.py b/sdks/python/apache_beam/utils/multi_process_shared_test.py index d15da9471b59..18ed49c6fa17 100644 --- a/sdks/python/apache_beam/utils/multi_process_shared_test.py +++ b/sdks/python/apache_beam/utils/multi_process_shared_test.py @@ -22,8 +22,8 @@ import tempfile import threading import unittest -from unittest.mock import patch from typing import Any +from unittest.mock import patch from apache_beam.utils import multi_process_shared From 33fd6499a611dfc012665a6a5499f800edec5bc1 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 12 May 2026 10:25:55 -0400 Subject: [PATCH 4/4] Address comments. --- sdks/python/apache_beam/utils/multi_process_shared.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/utils/multi_process_shared.py b/sdks/python/apache_beam/utils/multi_process_shared.py index 7004b481967f..13a0f13e617b 100644 --- a/sdks/python/apache_beam/utils/multi_process_shared.py +++ b/sdks/python/apache_beam/utils/multi_process_shared.py @@ -394,18 +394,20 @@ def _get_manager(self): address=(host, int(port)), authkey=AUTH_KEY) multiprocessing.current_process().authkey = AUTH_KEY + retryable_exceptions = (ConnectionError, EOFError) + @retry.with_exponential_backoff( num_retries=5, initial_delay_secs=0.1, retry_filter=lambda exn: isinstance( - exn, (ConnectionError, EOFError))) + exn, retryable_exceptions)) def connect_manager(): manager.connect() try: connect_manager() self._manager = manager - except (ConnectionError, EOFError): + except retryable_exceptions: # The server is no longer good, assume it died. os.unlink(address_file)