diff --git a/sdks/python/apache_beam/utils/multi_process_shared_test.py b/sdks/python/apache_beam/utils/multi_process_shared_test.py index 18ed49c6fa17..c597e459e1a1 100644 --- a/sdks/python/apache_beam/utils/multi_process_shared_test.py +++ b/sdks/python/apache_beam/utils/multi_process_shared_test.py @@ -25,9 +25,29 @@ from typing import Any from unittest.mock import patch +import pytest + from apache_beam.utils import multi_process_shared +@pytest.fixture(autouse=True) +def isolate_multi_process_shared_tests(tmp_path, monkeypatch): + """Isolates MultiProcessShared tests by using a unique temp directory per test. + + This prevents tests running in parallel (e.g. with pytest-xdist) from + interfering with each other by writing to the same shared default temp directory. + """ + orig_init = multi_process_shared.MultiProcessShared.__init__ + + def mock_init(self, constructor, tag, *args, **kwargs): + if 'path' not in kwargs: + kwargs['path'] = str(tmp_path) + return orig_init(self, constructor, tag, *args, **kwargs) + + monkeypatch.setattr( + multi_process_shared.MultiProcessShared, '__init__', mock_init) + + class CallableCounter(object): def __init__(self, start=0): self.running = start @@ -285,23 +305,6 @@ def test_release_always_proxy(self): class MultiProcessSharedSpawnProcessTest(unittest.TestCase): - def setUp(self): - tempdir = tempfile.gettempdir() - for tag in ['basic', - 'main', - 'to_delete', - 'to_keep', - 'mix1', - 'mix2', - 'test_process_exit', - 'thundering_herd_test', - 'transient_test']: - for ext in ['', '.address', '.address.error']: - try: - os.remove(os.path.join(tempdir, tag + ext)) - except OSError: - pass - def tearDown(self): for p in multiprocessing.active_children(): if p.is_alive():