diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager.py b/sdks/python/apache_beam/runners/interactive/cache_manager.py index 0dc79d4001aa..0b756a573698 100644 --- a/sdks/python/apache_beam/runners/interactive/cache_manager.py +++ b/sdks/python/apache_beam/runners/interactive/cache_manager.py @@ -19,6 +19,7 @@ import base64 import collections +import logging import os import tempfile from urllib.parse import quote @@ -29,9 +30,12 @@ from apache_beam.io import filesystems from apache_beam.io import textio from apache_beam.io import tfrecordio +from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.testing import test_stream from apache_beam.transforms import combiners +_LOGGER = logging.getLogger(__name__) + class CacheManager(object): """Abstract class for caching PCollections. @@ -286,13 +290,17 @@ def raw_source(self, *labels): return self._reader_class(self._glob_path(*labels)) def cleanup(self): - if self._cache_dir.startswith('gs://'): - from apache_beam.io.gcp import gcsfilesystem - from apache_beam.options.pipeline_options import PipelineOptions - fs = gcsfilesystem.GCSFileSystem(PipelineOptions()) - fs.delete([self._cache_dir + '/full/']) - elif filesystems.FileSystems.exists(self._cache_dir): - filesystems.FileSystems.delete([self._cache_dir]) + try: + if self._cache_dir.startswith('gs://'): + # Import GCP dependencies only when needed. + from apache_beam.io.gcp import gcsfilesystem + fs = gcsfilesystem.GCSFileSystem(PipelineOptions()) + fs.delete([self._cache_dir + '/full/']) + elif filesystems.FileSystems.exists(self._cache_dir): + filesystems.FileSystems.delete([self._cache_dir]) + except Exception as e: + _LOGGER.warning( + 'Failed to clean up cache directory %s: %s', self._cache_dir, e) self._saved_pcoders = {} def _glob_path(self, *labels):