diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py b/sdks/python/apache_beam/runners/interactive/interactive_runner.py index 241dcf388dd0..737a06c49b3b 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py @@ -93,6 +93,12 @@ def is_fnapi_compatible(self): # return self._underlying_runner.is_fnapi_compatible() return False + def default_pickle_library_override(self): + """Delegates pickler override to the underlying runner.""" + if hasattr(self._underlying_runner, 'default_pickle_library_override'): + return self._underlying_runner.default_pickle_library_override() + return super().default_pickle_library_override() + def set_render_option(self, render_option): """Sets the rendering option. diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py index ed27d9e55e06..dc30155d5997 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py @@ -43,6 +43,7 @@ from apache_beam.runners.interactive.dataproc.types import ClusterMetadata from apache_beam.runners.interactive.testing.mock_env import isolated_env from apache_beam.runners.portability.flink_runner import FlinkRunner +from apache_beam.runners.runner import PipelineRunner from apache_beam.testing.test_stream import TestStream from apache_beam.transforms.window import GlobalWindow from apache_beam.transforms.window import IntervalWindow @@ -532,6 +533,25 @@ def test_defaults_to_efficient_cache(self): # Despite (highly redundant) windowing information, the cache is small. self.assertLess(size, sum(inputs)) + def test_default_pickle_library_override_delegates(self): + mock_underlying = unittest.mock.MagicMock(spec=PipelineRunner) + mock_underlying.default_pickle_library_override.return_value = 'cloudpickle' + + runner = interactive_runner.InteractiveRunner( + underlying_runner=mock_underlying) + + self.assertEqual(runner.default_pickle_library_override(), 'cloudpickle') + + def test_default_pickle_library_override_fallback(self): + mock_underlying = unittest.mock.MagicMock(spec=PipelineRunner) + del mock_underlying.default_pickle_library_override + + runner = interactive_runner.InteractiveRunner( + underlying_runner=mock_underlying) + + # Should fallback to the base class implementation without crashing + self.assertIsNone(runner.default_pickle_library_override()) + @unittest.skipIf( not ie.current_env().is_interactive_ready,