From 30126954ced198d756a685e67bd6196fa8aa8d02 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Thu, 7 May 2026 12:00:26 +0300 Subject: [PATCH 1/7] fix(concurrent-cdk): enforce runtime cap on concurrent partition generators to prevent thread pool starvation deadlock When multiple stream completions arrive back-to-back in the queue, on_partition_generation_completed and on_partition_complete_sentinel each call start_next_partition_generator independently, causing the number of concurrent generator tasks to exceed the initial_number_partitions_to_generate bound enforced only at construction time. When all worker slots are occupied by sleeping generate_partitions tasks, no partition readers can run, leaving the main thread blocked on queue.get() indefinitely. The fix adds max_concurrent_partition_generators (Optional[int]) to ConcurrentReadProcessor. When set, start_next_partition_generator returns None immediately if the cap is already reached. Recovery is guaranteed: on_partition_generation_completed decrements _streams_currently_generating_partitions before calling start_next_partition_generator, so the guard always passes there. The default of None preserves existing behaviour for callers that do not use ConcurrentSource (including block_simultaneous_read tests). ConcurrentSource passes initial_number_partitions_to_generate explicitly, tying the runtime cap to the same value as the construction-time assertion. Root cause introduced in PR #870 (feat: Add block_simultaneous_read), which added a second call site to start_next_partition_generator inside on_partition_complete_sentinel without a corresponding runtime bound. Observed in production: source-google-ads 4.2.5, job 82903962, all workers blocked in partition_enqueuer.py:59 time.sleep(), main thread stuck on concurrent_source.py queue.get(). 
Co-Authored-By: Claude Sonnet 4.6 --- .../concurrent_read_processor.py | 16 ++++++ .../concurrent_source/concurrent_source.py | 1 + .../test_concurrent_read_processor.py | 49 +++++++++++++++++++ 3 files changed, 66 insertions(+) diff --git a/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py b/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py index a78905e72..aa112cfe4 100644 --- a/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py +++ b/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py @@ -45,6 +45,7 @@ def __init__( slice_logger: SliceLogger, message_repository: MessageRepository, partition_reader: PartitionReader, + max_concurrent_partition_generators: Optional[int] = None, ): """ This class is responsible for handling items from a concurrent stream read process. @@ -55,6 +56,10 @@ def __init__( :param slice_logger: SliceLogger instance :param message_repository: MessageRepository instance :param partition_reader: PartitionReader instance + :param max_concurrent_partition_generators: Maximum number of partition generators allowed + to run concurrently. None means no limit. When set, must be less than the number of + workers so at least one worker slot is always available for partition reading, + preventing thread pool starvation. ConcurrentSource passes this explicitly. 
""" self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from} self._record_counter = {} @@ -64,6 +69,7 @@ def __init__( self._record_counter[stream.name] = 0 self._thread_pool_manager = thread_pool_manager self._partition_enqueuer = partition_enqueuer + self._max_concurrent_partition_generators = max_concurrent_partition_generators self._stream_instances_to_start_partition_generation = stream_instances_to_read_from self._streams_currently_generating_partitions: List[str] = [] self._logger = logger @@ -255,6 +261,16 @@ def start_next_partition_generator(self) -> Optional[AirbyteMessage]: if not self._stream_instances_to_start_partition_generation: return None + # Enforce the concurrent generator cap so at least one worker slot is always available + # for partition reading. Recovery is guaranteed: on_partition_generation_completed + # decrements the count before calling here, so the guard always passes there. + if ( + self._max_concurrent_partition_generators is not None + and len(self._streams_currently_generating_partitions) + >= self._max_concurrent_partition_generators + ): + return None + # Remember initial queue size to avoid infinite loops if all streams are blocked max_attempts = len(self._stream_instances_to_start_partition_generation) attempts = 0 diff --git a/airbyte_cdk/sources/concurrent_source/concurrent_source.py b/airbyte_cdk/sources/concurrent_source/concurrent_source.py index de2d93523..43eb78ff6 100644 --- a/airbyte_cdk/sources/concurrent_source/concurrent_source.py +++ b/airbyte_cdk/sources/concurrent_source/concurrent_source.py @@ -117,6 +117,7 @@ def read( self._queue, PartitionLogger(self._slice_logger, self._logger, self._message_repository), ), + max_concurrent_partition_generators=self._initial_number_partitions_to_generate, ) # Enqueue initial partition generation tasks diff --git a/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py 
b/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py index 910111a05..877b35416 100644 --- a/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py +++ b/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py @@ -851,6 +851,55 @@ def test_start_next_partition_generator(self): self._partition_enqueuer.generate_partitions, self._stream ) + def test_start_next_partition_generator_respects_concurrent_limit(self): + stream_instances_to_read_from = [self._stream] + handler = ConcurrentReadProcessor( + stream_instances_to_read_from, + self._partition_enqueuer, + self._thread_pool_manager, + self._logger, + self._slice_logger, + self._message_repository, + self._partition_reader, + max_concurrent_partition_generators=1, + ) + handler._streams_currently_generating_partitions.append(_STREAM_NAME) + + status_message = handler.start_next_partition_generator() + + assert status_message is None + assert handler._stream_instances_to_start_partition_generation == stream_instances_to_read_from + self._thread_pool_manager.submit.assert_not_called() + + def test_start_next_partition_generator_starts_when_below_limit(self): + other_stream = Mock(spec=AbstractStream) + other_stream.name = "other_stream" + other_stream.block_simultaneous_read = "" + other_stream.as_airbyte_stream.return_value = AirbyteStream( + name="other_stream", + json_schema={}, + supported_sync_modes=[SyncMode.full_refresh], + ) + handler = ConcurrentReadProcessor( + [other_stream], + self._partition_enqueuer, + self._thread_pool_manager, + self._logger, + self._slice_logger, + self._message_repository, + self._partition_reader, + max_concurrent_partition_generators=2, + ) + handler._streams_currently_generating_partitions.append(_STREAM_NAME) + + status_message = handler.start_next_partition_generator() + + assert status_message is not None + assert "other_stream" in handler._streams_currently_generating_partitions + 
self._thread_pool_manager.submit.assert_called_with( + self._partition_enqueuer.generate_partitions, other_stream + ) + class TestBlockSimultaneousRead(unittest.TestCase): """Tests for block_simultaneous_read functionality""" From 3ce47381cf8491c4cee4ffe3355a25331934ad11 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Thu, 7 May 2026 09:07:41 +0000 Subject: [PATCH 2/7] Auto-fix lint and format issues --- .../streams/concurrent/test_concurrent_read_processor.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py b/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py index 877b35416..dd39b1c0a 100644 --- a/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py +++ b/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py @@ -868,7 +868,9 @@ def test_start_next_partition_generator_respects_concurrent_limit(self): status_message = handler.start_next_partition_generator() assert status_message is None - assert handler._stream_instances_to_start_partition_generation == stream_instances_to_read_from + assert ( + handler._stream_instances_to_start_partition_generation == stream_instances_to_read_from + ) self._thread_pool_manager.submit.assert_not_called() def test_start_next_partition_generator_starts_when_below_limit(self): From 5a1ba16f01a23aa8fb65155419199c5223866980 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Thu, 7 May 2026 12:13:33 +0300 Subject: [PATCH 3/7] fixup: validate max_concurrent_partition_generators and add debug log on cap guard Addresses reviewer feedback: - Raise ValueError when max_concurrent_partition_generators is not None and < 1, preventing silent hangs from zero/negative values - Log at DEBUG when the cap guard fires, so operators can observe deferral Co-Authored-By: Claude Sonnet 4.6 --- .../concurrent_source/concurrent_read_processor.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff 
--git a/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py b/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py index aa112cfe4..ae2f1a13d 100644 --- a/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py +++ b/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py @@ -67,6 +67,10 @@ def __init__( for stream in stream_instances_to_read_from: self._streams_to_running_partitions[stream.name] = set() self._record_counter[stream.name] = 0 + if max_concurrent_partition_generators is not None and max_concurrent_partition_generators < 1: + raise ValueError( + f"max_concurrent_partition_generators must be >= 1 or None, got {max_concurrent_partition_generators}" + ) self._thread_pool_manager = thread_pool_manager self._partition_enqueuer = partition_enqueuer self._max_concurrent_partition_generators = max_concurrent_partition_generators @@ -269,6 +273,10 @@ def start_next_partition_generator(self) -> Optional[AirbyteMessage]: and len(self._streams_currently_generating_partitions) >= self._max_concurrent_partition_generators ): + self._logger.debug( + f"Concurrent partition generator cap ({self._max_concurrent_partition_generators}) reached " + f"({len(self._streams_currently_generating_partitions)} active). Deferring next generator start." 
+ ) return None # Remember initial queue size to avoid infinite loops if all streams are blocked From e8fa4360283faf67c722c94c5d8b7c5d64a9a773 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Thu, 7 May 2026 09:15:21 +0000 Subject: [PATCH 4/7] Auto-fix lint and format issues --- .../sources/concurrent_source/concurrent_read_processor.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py b/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py index ae2f1a13d..489e7f752 100644 --- a/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py +++ b/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py @@ -67,7 +67,10 @@ def __init__( for stream in stream_instances_to_read_from: self._streams_to_running_partitions[stream.name] = set() self._record_counter[stream.name] = 0 - if max_concurrent_partition_generators is not None and max_concurrent_partition_generators < 1: + if ( + max_concurrent_partition_generators is not None + and max_concurrent_partition_generators < 1 + ): raise ValueError( f"max_concurrent_partition_generators must be >= 1 or None, got {max_concurrent_partition_generators}" ) From ab17e2ceb41f2ce17db1146363f843235fe06c5a Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Thu, 7 May 2026 12:35:27 +0300 Subject: [PATCH 5/7] fixup: clarify max_concurrent_partition_generators docstring for single-threaded mode The previous wording said the value "must be less than the number of workers" which is incorrect for the single-threaded case (num_workers=1, initial_number_of_partitions_to_generate=1) that ConcurrentSource.create() explicitly allows. Reword to "should be less than the number of workers in multi-worker mode" and note that ConcurrentSource.create() handles the distinction. 
Co-Authored-By: Claude Sonnet 4.6 --- .../concurrent_source/concurrent_read_processor.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py b/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py index 489e7f752..fa6a568bc 100644 --- a/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py +++ b/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py @@ -57,9 +57,11 @@ def __init__( :param message_repository: MessageRepository instance :param partition_reader: PartitionReader instance :param max_concurrent_partition_generators: Maximum number of partition generators allowed - to run concurrently. None means no limit. When set, must be less than the number of - workers so at least one worker slot is always available for partition reading, - preventing thread pool starvation. ConcurrentSource passes this explicitly. + to run concurrently. None means no limit. When set, should be less than the number of + workers in multi-worker mode so at least one worker slot is always available for + partition reading, preventing thread pool starvation. In single-threaded mode + (num_workers=1) the value may equal num_workers; ConcurrentSource.create() handles + this distinction. ConcurrentSource.read() passes this value explicitly. """ self._stream_name_to_instance = {s.name: s for s in stream_instances_to_read_from} self._record_counter = {} From 4c69e5454a5101d6c54ab3e2593cbf1a7a99cc01 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Thu, 7 May 2026 12:51:11 +0300 Subject: [PATCH 6/7] fixup: validate initial_number_of_partitions_to_generate >= 1 in ConcurrentSource.create() Without this check, passing 0 bypasses the existing too_many_generator assertion (0 < num_workers is always true), then propagates into ConcurrentReadProcessor as max_concurrent_partition_generators=0 and raises ValueError at read() time instead of at construction time. 
Fail fast at create() so the error is raised consistently. Co-Authored-By: Claude Sonnet 4.6 --- airbyte_cdk/sources/concurrent_source/concurrent_source.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/airbyte_cdk/sources/concurrent_source/concurrent_source.py b/airbyte_cdk/sources/concurrent_source/concurrent_source.py index 43eb78ff6..474780bcc 100644 --- a/airbyte_cdk/sources/concurrent_source/concurrent_source.py +++ b/airbyte_cdk/sources/concurrent_source/concurrent_source.py @@ -47,6 +47,10 @@ def create( queue: Optional[Queue[QueueItem]] = None, timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS, ) -> "ConcurrentSource": + if initial_number_of_partitions_to_generate < 1: + raise ValueError( + f"initial_number_of_partitions_to_generate must be >= 1, got {initial_number_of_partitions_to_generate}" + ) is_single_threaded = initial_number_of_partitions_to_generate == 1 and num_workers == 1 too_many_generator = ( not is_single_threaded and initial_number_of_partitions_to_generate >= num_workers From 5990d452f689969e1208d716129a6f7149aab89e Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Thu, 7 May 2026 13:21:12 +0300 Subject: [PATCH 7/7] test: cover ValueError for invalid max_concurrent_partition_generators values Co-Authored-By: Claude Sonnet 4.6 --- .../concurrent/test_concurrent_read_processor.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py b/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py index dd39b1c0a..856bd1156 100644 --- a/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py +++ b/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py @@ -851,6 +851,20 @@ def test_start_next_partition_generator(self): self._partition_enqueuer.generate_partitions, self._stream ) + def test_invalid_max_concurrent_partition_generators_raises(self): + for invalid in (0, -1): + with self.assertRaises(ValueError): + 
ConcurrentReadProcessor( + [self._stream], + self._partition_enqueuer, + self._thread_pool_manager, + self._logger, + self._slice_logger, + self._message_repository, + self._partition_reader, + max_concurrent_partition_generators=invalid, + ) + def test_start_next_partition_generator_respects_concurrent_limit(self): stream_instances_to_read_from = [self._stream] handler = ConcurrentReadProcessor(