From ff0f0ab21bd607d48db34be754522ce7b7295a6b Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 20 May 2026 21:56:58 -0400 Subject: [PATCH 1/4] Fix silent test hang in subprocess expansion service on port bind failure --- .../runners/portability/expansion_service_main.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/expansion_service_main.py b/sdks/python/apache_beam/runners/portability/expansion_service_main.py index 269d02b3efbd..ac2ba71f6775 100644 --- a/sdks/python/apache_beam/runners/portability/expansion_service_main.py +++ b/sdks/python/apache_beam/runners/portability/expansion_service_main.py @@ -55,7 +55,9 @@ def main(argv): with fully_qualified_named_transform.FullyQualifiedNamedTransform.with_filter( known_args.fully_qualified_name_glob): - address = '0.0.0.0:{}'.format(known_args.port) + # Bind to localhost instead of 0.0.0.0 to ensure compatibility with loopback + # connections on dual-stack (IPv4/IPv6) systems. + address = 'localhost:{}'.format(known_args.port) server = grpc.server(thread_pool_executor.shared_unbounded_instance()) if known_args.serve_loopback_worker: beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server( @@ -71,9 +73,14 @@ def main(argv): artifact_service.ArtifactRetrievalService( artifact_service.BeamFilesystemHandler(None).file_reader), server) - server.add_insecure_port(address) + # Ensure gRPC server successfully binds. If this fails (e.g., due to port collision), + # add_insecure_port returns 0. We raise an error to crash the subprocess immediately, + # allowing the parent process to detect it and fail fast rather than hanging. + bound_port = server.add_insecure_port(address) + if not bound_port: + raise RuntimeError("Failed to bind expansion service to {}".format(address)) server.start() - _LOGGER.info('Listening for expansion requests at %d', known_args.port) + _LOGGER.info('Listening for expansion requests at %d', bound_port) def cleanup(unused_signum, unused_frame): _LOGGER.info('Shutting down expansion service.') From eb62fce917075b75455e611563486b3def4854df Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 20 May 2026 22:28:57 -0400 Subject: [PATCH 2/4] Formatting --- .../apache_beam/runners/portability/expansion_service_main.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/portability/expansion_service_main.py b/sdks/python/apache_beam/runners/portability/expansion_service_main.py index ac2ba71f6775..f2d03e0e898c 100644 --- a/sdks/python/apache_beam/runners/portability/expansion_service_main.py +++ b/sdks/python/apache_beam/runners/portability/expansion_service_main.py @@ -78,7 +78,8 @@ def main(argv): # allowing the parent process to detect it and fail fast rather than hanging. bound_port = server.add_insecure_port(address) if not bound_port: - raise RuntimeError("Failed to bind expansion service to {}".format(address)) + raise RuntimeError( + "Failed to bind expansion service to {}".format(address)) server.start() _LOGGER.info('Listening for expansion requests at %d', bound_port) From f6252eff0a15b282e0ea71c837f029ca52f000cb Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 20 May 2026 22:37:08 -0400 Subject: [PATCH 3/4] Add retry when starting subprocess server. --- .../apache_beam/utils/subprocess_server.py | 85 ++++++++++--------- 1 file changed, 46 insertions(+), 39 deletions(-) diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py index d21cb486b8f4..462047c730ea 100644 --- a/sdks/python/apache_beam/utils/subprocess_server.py +++ b/sdks/python/apache_beam/utils/subprocess_server.py @@ -186,45 +186,52 @@ def __exit__(self, *unused_args): self.stop() def start(self): - try: - process, endpoint = self.start_process() - wait_secs = .1 - channel_options = [ - ("grpc.max_receive_message_length", -1), - ("grpc.max_send_message_length", -1), - # Default: 20000ms (20s), increased to 10 minutes for stability - ("grpc.keepalive_timeout_ms", 600_000), - # Default: 2, set to 0 to allow unlimited pings without data - ("grpc.http2.max_pings_without_data", 0), - # Default: False, set to True to allow keepalive pings when no calls - ("grpc.keepalive_permit_without_calls", True), - # Default: 2, set to 0 to allow unlimited ping strikes - ("grpc.http2.max_ping_strikes", 0), - # Default: 0 (disabled), enable socket reuse for better handling - ("grpc.so_reuseport", 1), - ] - self._grpc_channel = grpc.insecure_channel( - endpoint, options=channel_options) - channel_ready = grpc.channel_ready_future(self._grpc_channel) - while True: - if process is not None and process.poll() is not None: - _LOGGER.error("Started job service with %s", process.args) - raise RuntimeError( - 'Service failed to start up with error %s' % process.poll()) - try: - channel_ready.result(timeout=wait_secs) - break - except (grpc.FutureTimeoutError, grpc.RpcError): - wait_secs *= 1.2 - logging.log( - logging.WARNING if wait_secs > 1 else logging.DEBUG, - 'Waiting for grpc channel to be ready at %s.', - endpoint) - return self._stub_class(self._grpc_channel) - except: # pylint: disable=bare-except - _LOGGER.exception("Error bringing up service") - self.stop() - raise + max_attempts = 3 + for attempt in range(max_attempts): + try: + process, endpoint = self.start_process() + wait_secs = .1 + channel_options = [ + ("grpc.max_receive_message_length", -1), + ("grpc.max_send_message_length", -1), + # Default: 20000ms (20s), increased to 10 minutes for stability + ("grpc.keepalive_timeout_ms", 600_000), + # Default: 2, set to 0 to allow unlimited pings without data + ("grpc.http2.max_pings_without_data", 0), + # Default: False, set to True to allow keepalive pings when no calls + ("grpc.keepalive_permit_without_calls", True), + # Default: 2, set to 0 to allow unlimited ping strikes + ("grpc.http2.max_ping_strikes", 0), + # Default: 0 (disabled), enable socket reuse for better handling + ("grpc.so_reuseport", 1), + ] + self._grpc_channel = grpc.insecure_channel( + endpoint, options=channel_options) + channel_ready = grpc.channel_ready_future(self._grpc_channel) + while True: + if process is not None and process.poll() is not None: + _LOGGER.error("Started job service with %s", process.args) + raise RuntimeError( + 'Service failed to start up with error %s' % process.poll()) + try: + channel_ready.result(timeout=wait_secs) + break + except (grpc.FutureTimeoutError, grpc.RpcError): + wait_secs *= 1.2 + logging.log( + logging.WARNING if wait_secs > 1 else logging.DEBUG, + 'Waiting for grpc channel to be ready at %s.', + endpoint) + return self._stub_class(self._grpc_channel) + except Exception as e: + _LOGGER.warning( + "Error bringing up service on attempt %d: %s", + attempt + 1, + e, + exc_info=True) + self.stop() + if attempt == max_attempts - 1: + raise def start_process(self): if self._owner_id is not None: From 002d50bb07f0abf87fa05a8e8abf3ee10dee15d7 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 20 May 2026 22:47:07 -0400 Subject: [PATCH 4/4] Add sleep before retrying. --- sdks/python/apache_beam/utils/subprocess_server.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py index 462047c730ea..b22e6badb5e7 100644 --- a/sdks/python/apache_beam/utils/subprocess_server.py +++ b/sdks/python/apache_beam/utils/subprocess_server.py @@ -232,6 +232,7 @@ def start(self): self.stop() if attempt == max_attempts - 1: raise + time.sleep(1) def start_process(self): if self._owner_id is not None: