From 932d4591146f24db4320f9e538727ae1b4fe243f Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 18 May 2026 09:34:52 -0400 Subject: [PATCH 1/3] Fix interactive environment clean up failure at atexit. --- .../runners/interactive/cache_manager.py | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager.py b/sdks/python/apache_beam/runners/interactive/cache_manager.py index 0dc79d4001aa..b59f85d9ad52 100644 --- a/sdks/python/apache_beam/runners/interactive/cache_manager.py +++ b/sdks/python/apache_beam/runners/interactive/cache_manager.py @@ -19,6 +19,7 @@ import base64 import collections +import logging import os import tempfile from urllib.parse import quote @@ -29,9 +30,13 @@ from apache_beam.io import filesystems from apache_beam.io import textio from apache_beam.io import tfrecordio +from apache_beam.io.gcp import gcsfilesystem +from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.testing import test_stream from apache_beam.transforms import combiners +_LOGGER = logging.getLogger(__name__) + class CacheManager(object): """Abstract class for caching PCollections. @@ -286,13 +291,14 @@ def raw_source(self, *labels): return self._reader_class(self._glob_path(*labels)) def cleanup(self): - if self._cache_dir.startswith('gs://'): - from apache_beam.io.gcp import gcsfilesystem - from apache_beam.options.pipeline_options import PipelineOptions - fs = gcsfilesystem.GCSFileSystem(PipelineOptions()) - fs.delete([self._cache_dir + '/full/']) - elif filesystems.FileSystems.exists(self._cache_dir): - filesystems.FileSystems.delete([self._cache_dir]) + try: + if self._cache_dir.startswith('gs://'): + fs = gcsfilesystem.GCSFileSystem(PipelineOptions()) + fs.delete([self._cache_dir + '/full/']) + elif filesystems.FileSystems.exists(self._cache_dir): + filesystems.FileSystems.delete([self._cache_dir]) + except Exception as e: + _LOGGER.warning('Failed to clean up cache directory %s: %s', self._cache_dir, e) self._saved_pcoders = {} def _glob_path(self, *labels): From fbe67fb31e5dacb148ae950aa3e11b7870460549 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 18 May 2026 14:03:36 -0400 Subject: [PATCH 2/3] Fix failed tests. --- sdks/python/apache_beam/runners/interactive/cache_manager.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager.py b/sdks/python/apache_beam/runners/interactive/cache_manager.py index b59f85d9ad52..3eb6ad11094e 100644 --- a/sdks/python/apache_beam/runners/interactive/cache_manager.py +++ b/sdks/python/apache_beam/runners/interactive/cache_manager.py @@ -30,7 +30,6 @@ from apache_beam.io import filesystems from apache_beam.io import textio from apache_beam.io import tfrecordio -from apache_beam.io.gcp import gcsfilesystem from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.testing import test_stream from apache_beam.transforms import combiners @@ -293,6 +292,8 @@ def raw_source(self, *labels): def cleanup(self): try: if self._cache_dir.startswith('gs://'): + # Import GCP dependencies only when needed. + from apache_beam.io.gcp import gcsfilesystem fs = gcsfilesystem.GCSFileSystem(PipelineOptions()) fs.delete([self._cache_dir + '/full/']) elif filesystems.FileSystems.exists(self._cache_dir): From bd8fa41b9e7f613caa76cf1b7fb9c062a2642cfd Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 18 May 2026 14:16:47 -0400 Subject: [PATCH 3/3] Formatting. --- sdks/python/apache_beam/runners/interactive/cache_manager.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager.py b/sdks/python/apache_beam/runners/interactive/cache_manager.py index 3eb6ad11094e..0b756a573698 100644 --- a/sdks/python/apache_beam/runners/interactive/cache_manager.py +++ b/sdks/python/apache_beam/runners/interactive/cache_manager.py @@ -299,7 +299,8 @@ def cleanup(self): elif filesystems.FileSystems.exists(self._cache_dir): filesystems.FileSystems.delete([self._cache_dir]) except Exception as e: - _LOGGER.warning('Failed to clean up cache directory %s: %s', self._cache_dir, e) + _LOGGER.warning( + 'Failed to clean up cache directory %s: %s', self._cache_dir, e) self._saved_pcoders = {} def _glob_path(self, *labels):