From f068b2135d09cebef6a50f828c8673207ba98fdc Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 11 May 2026 21:52:47 -0400 Subject: [PATCH 1/2] Make SubprocessServer shared cache purging idempotent --- .../apache_beam/utils/subprocess_server.py | 6 +++- .../utils/subprocess_server_test.py | 28 ++++++++++++++++--- 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py index 5752a49dde2e..fed5ee591bcd 100644 --- a/sdks/python/apache_beam/utils/subprocess_server.py +++ b/sdks/python/apache_beam/utils/subprocess_server.py @@ -91,7 +91,11 @@ def purge(self, owner): to_delete = [] with self._lock: if owner not in self._live_owners: - raise ValueError(f"{owner} not in {self._live_owners}") + _LOGGER.warning( + "Subprocess owner %s already purged. If this occurs during atexit " + "shutdown, the subprocess was already cleaned up earlier.", + owner) + return self._live_owners.remove(owner) for key, entry in list(self._cache.items()): if owner in entry.owners: diff --git a/sdks/python/apache_beam/utils/subprocess_server_test.py b/sdks/python/apache_beam/utils/subprocess_server_test.py index 0f25d9904f07..74489cca6b45 100644 --- a/sdks/python/apache_beam/utils/subprocess_server_test.py +++ b/sdks/python/apache_beam/utils/subprocess_server_test.py @@ -421,10 +421,30 @@ def purge_worker(): t1.join() t2.join() - # Exactly one thread should raise the expected ValueError because they are cleanly serialized - self.assertEqual(len(exceptions), 1) - self.assertIsInstance(exceptions[0], ValueError) - self.assertNotIsInstance(exceptions[0], KeyError) + # Both threads should succeed cleanly without raising an exception under idempotent purging. + self.assertEqual(len(exceptions), 0) + + def test_stop_process_after_cache_purged(self): + # Reproduce the ValueError when stop_process() (called by atexit) + # runs after the cache/owner was already purged during test teardown. + cache = subprocess_server._SharedCache( + lambda *args: "dummy_process", lambda obj: None) + + class DummySubprocessServer(subprocess_server.SubprocessServer): + _cache = cache + def __init__(self): + super().__init__(lambda channel: None, ["dummy_cmd"], port=12345) + + server = DummySubprocessServer() + server.start_process() + owner_id = server._owner_id + + # Simulate pipeline context exit or test teardown purging the cache directly + cache.purge(owner_id) + + # Calling stop_process() (which happens during atexit) should succeed cleanly + # without raising ValueError. + server.stop_process() if __name__ == '__main__': From cbf5a8af093bbcc58a2fb32e4f54b35fd9f7664d Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 11 May 2026 22:25:44 -0400 Subject: [PATCH 2/2] Reformat --- sdks/python/apache_beam/utils/subprocess_server_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/utils/subprocess_server_test.py b/sdks/python/apache_beam/utils/subprocess_server_test.py index 74489cca6b45..efd357c4c136 100644 --- a/sdks/python/apache_beam/utils/subprocess_server_test.py +++ b/sdks/python/apache_beam/utils/subprocess_server_test.py @@ -432,6 +432,7 @@ def test_stop_process_after_cache_purged(self): class DummySubprocessServer(subprocess_server.SubprocessServer): _cache = cache + def __init__(self): super().__init__(lambda channel: None, ["dummy_cmd"], port=12345)