diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py
index 5752a49dde2e..fed5ee591bcd 100644
--- a/sdks/python/apache_beam/utils/subprocess_server.py
+++ b/sdks/python/apache_beam/utils/subprocess_server.py
@@ -91,7 +91,11 @@ def purge(self, owner):
     to_delete = []
     with self._lock:
       if owner not in self._live_owners:
-        raise ValueError(f"{owner} not in {self._live_owners}")
+        _LOGGER.warning(
+            "Subprocess owner %s already purged. If this occurs during atexit "
+            "shutdown, the subprocess was already cleaned up earlier.",
+            owner)
+        return
       self._live_owners.remove(owner)
       for key, entry in list(self._cache.items()):
         if owner in entry.owners:
diff --git a/sdks/python/apache_beam/utils/subprocess_server_test.py b/sdks/python/apache_beam/utils/subprocess_server_test.py
index 0f25d9904f07..efd357c4c136 100644
--- a/sdks/python/apache_beam/utils/subprocess_server_test.py
+++ b/sdks/python/apache_beam/utils/subprocess_server_test.py
@@ -421,10 +421,31 @@ def purge_worker():
     t1.join()
     t2.join()
 
-    # Exactly one thread should raise the expected ValueError because they are cleanly serialized
-    self.assertEqual(len(exceptions), 1)
-    self.assertIsInstance(exceptions[0], ValueError)
-    self.assertNotIsInstance(exceptions[0], KeyError)
+    # purge() is idempotent, so both threads must finish without raising.
+    self.assertEqual(len(exceptions), 0)
+
+  def test_stop_process_after_cache_purged(self):
+    # Regression test for the ValueError previously raised when stop_process()
+    # (called by atexit) ran after the cache/owner was purged in test teardown.
+    cache = subprocess_server._SharedCache(
+        lambda *args: "dummy_process", lambda obj: None)
+
+    class DummySubprocessServer(subprocess_server.SubprocessServer):
+      _cache = cache
+
+      def __init__(self):
+        super().__init__(lambda channel: None, ["dummy_cmd"], port=12345)
+
+    server = DummySubprocessServer()
+    server.start_process()
+    owner_id = server._owner_id
+
+    # Simulate pipeline context exit or test teardown purging the cache directly
+    cache.purge(owner_id)
+
+    # Calling stop_process() (which happens during atexit) should succeed
+    # cleanly without raising ValueError.
+    server.stop_process()
 
 
 if __name__ == '__main__':