From 4d4e6e28477b4c91dd55350feec02f67dc5ceb9d Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sun, 26 Apr 2026 06:08:31 +0000 Subject: [PATCH 01/11] feat: add partitioned_stream_status tracking for progress estimation Adds partition lifecycle tracking to ConcurrentPerPartitionCursor state: - num_partitions_started: count of partitions that have begun processing - num_partitions_completed: count of partitions fully processed and cleaned up - num_partitions_expected: total partitions discovered (grows until discovery complete) - is_partition_discovery_complete: flag indicating parent stream finished reading These fields enable cursor-based % completion estimation for partitioned streams by providing completed/expected ratios in emitted state messages. --- .../concurrent_partition_cursor.py | 15 +++++ .../test_concurrent_perpartitioncursor.py | 66 +++++++++++++++---- 2 files changed, 70 insertions(+), 11 deletions(-) diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index d1f2ca41e..2d0769adc 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -185,6 +185,11 @@ def __init__( self._last_emission_time: float = 0.0 self._timer = Timer() + # Partitioned stream status tracking for progress estimation + self._num_partitions_started: int = 0 + self._num_partitions_completed: int = 0 + self._is_partition_discovery_complete: bool = False + self._set_initial_state(stream_state) # FIXME this is a temporary field the time of the migration from declarative cursors to concurrent ones @@ -217,6 +222,12 @@ def state(self) -> MutableMapping[str, Any]: state["lookback_window"] = self._lookback_window if self._parent_state is not None: state["parent_state"] = self._parent_state + state["partitioned_stream_status"] = { + "num_partitions_started": self._num_partitions_started, + "num_partitions_completed": self._num_partitions_completed, + "num_partitions_expected": self._generated_partitions_count, + "is_partition_discovery_complete": self._is_partition_discovery_complete, + } return state def close_partition(self, partition: Partition) -> None: @@ -322,6 +333,8 @@ def stream_slices(self) -> Iterable[StreamSlice]: slices, self._partition_router.get_stream_state ): yield from self._generate_slices_from_partition(partition, parent_state) + with self._lock: + self._is_partition_discovery_complete = True def _generate_slices_from_partition( self, partition: StreamSlice, parent_state: Mapping[str, Any] @@ -352,6 +365,7 @@ def _generate_slices_from_partition( with self._lock: seq = self._generated_partitions_count self._generated_partitions_count += 1 + self._num_partitions_started += 1 self._processing_partitions_indexes.append(seq) self._partition_key_to_index[partition_key] = seq @@ -566,6 +580,7 @@ def _cleanup_if_done(self, partition_key: str) -> None: seq = self._partition_key_to_index.pop(partition_key) self._processing_partitions_indexes.remove(seq) + self._num_partitions_completed += 1 logger.debug(f"Partition {partition_key} fully processed and cleaned up.") diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index 28b9b8460..8037599bb 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -328,6 +328,16 @@ import requests_mock +def _strip_partitioned_stream_status(state_dict: dict) -> dict: + """Recursively strip partitioned_stream_status from state dicts, including nested parent_state.""" + state_dict.pop("partitioned_stream_status", None) + if "parent_state" in state_dict and isinstance(state_dict["parent_state"], dict): + for key, value in state_dict["parent_state"].items(): + if isinstance(value, dict): + _strip_partitioned_stream_status(value) + return state_dict + + def run_mocked_test( mock_requests, manifest, @@ -380,7 +390,18 @@ def run_mocked_test( # Verify state final_state = output.state_messages[-1].state.stream.stream_state - assert final_state.__dict__ == expected_state + final_state_dict = final_state.__dict__ + # Validate partitioned_stream_status exists and has correct shape, then remove for comparison + partitioned_status = final_state_dict.get("partitioned_stream_status") + if partitioned_status is not None: + assert "num_partitions_started" in partitioned_status + assert "num_partitions_completed" in partitioned_status + assert "num_partitions_expected" in partitioned_status + assert "is_partition_discovery_complete" in partitioned_status + assert partitioned_status["num_partitions_started"] >= partitioned_status["num_partitions_completed"] + assert partitioned_status["num_partitions_expected"] >= partitioned_status["num_partitions_started"] + _strip_partitioned_stream_status(final_state_dict) + assert final_state_dict == expected_state # Verify that each request was made exactly once for url, _ in mock_requests: @@ -1107,7 +1128,9 @@ def run_incremental_parent_state_test( final_states = [] # To store the final state after each read # Store the final state after the initial read - final_states.append(output.state_messages[-1].state.stream.stream_state.__dict__) + initial_final_state = output.state_messages[-1].state.stream.stream_state.__dict__ + _strip_partitioned_stream_status(initial_final_state) + final_states.append(initial_final_state) for message in output.records_and_state_messages: if message.type.value == "RECORD": @@ -1122,10 +1145,11 @@ def run_incremental_parent_state_test( # Assert that the number of intermediate states is as expected assert len(intermediate_states) - 1 == num_intermediate_states # Assert that ensure_at_least_one_state_emitted is called before yielding the last record from the last slice - assert ( - intermediate_states[-1][0].stream.stream_state.__dict__["parent_state"] - == intermediate_states[-2][0].stream.stream_state.__dict__["parent_state"] - ) + last_state_dict = intermediate_states[-1][0].stream.stream_state.__dict__.copy() + _strip_partitioned_stream_status(last_state_dict) + prev_state_dict = intermediate_states[-2][0].stream.stream_state.__dict__.copy() + _strip_partitioned_stream_status(prev_state_dict) + assert last_state_dict["parent_state"] == prev_state_dict["parent_state"] # For each intermediate state, perform another read starting from that state for state, records_before_state in intermediate_states[:-1]: @@ -1151,10 +1175,11 @@ def run_incremental_parent_state_test( ) # Store the final state after each intermediate read - final_state_intermediate = [ - message.state.stream.stream_state.__dict__ - for message in output_intermediate.state_messages - ] + final_state_intermediate = [] + for message in output_intermediate.state_messages: + state_dict = message.state.stream.stream_state.__dict__.copy() + _strip_partitioned_stream_status(state_dict) + final_state_intermediate.append(state_dict) final_states.append(final_state_intermediate[-1]) # Assert that the final state matches the expected state for all runs @@ -3654,7 +3679,14 @@ def test_given_no_partitions_processed_when_close_partition_then_no_state_update ) ) - assert cursor.state == { + state = cursor.state + partitioned_status = state.pop("partitioned_stream_status", None) + assert partitioned_status is not None + assert partitioned_status["num_partitions_started"] == 0 + assert partitioned_status["num_partitions_completed"] == 0 + assert partitioned_status["num_partitions_expected"] == 0 + assert partitioned_status["is_partition_discovery_complete"] is True + assert state == { "use_global_cursor": False, "lookback_window": 0, "states": [], @@ -3742,6 +3774,12 @@ def test_given_unfinished_first_parent_partition_no_parent_state_update(): cursor.ensure_at_least_one_state_emitted() state = cursor.state + partitioned_status = state.pop("partitioned_stream_status", None) + assert partitioned_status is not None + assert partitioned_status["num_partitions_started"] == 2 + assert partitioned_status["num_partitions_completed"] == 1 + assert partitioned_status["num_partitions_expected"] == 2 + assert partitioned_status["is_partition_discovery_complete"] is True assert state == { "use_global_cursor": False, "states": [ @@ -3838,6 +3876,12 @@ def test_given_unfinished_last_parent_partition_with_partial_parent_state_update cursor.ensure_at_least_one_state_emitted() state = cursor.state + partitioned_status = state.pop("partitioned_stream_status", None) + assert partitioned_status is not None + assert partitioned_status["num_partitions_started"] == 2 + assert partitioned_status["num_partitions_completed"] == 1 + assert partitioned_status["num_partitions_expected"] == 2 + assert partitioned_status["is_partition_discovery_complete"] is True assert state == { "use_global_cursor": False, "states": [ From ffc22c2202f925cf30e613eaaacb3963be22477f Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sun, 26 Apr 2026 06:10:24 +0000 Subject: [PATCH 02/11] style: fix ruff formatting in test file --- .../incremental/test_concurrent_perpartitioncursor.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index 8037599bb..f0a1d5591 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -398,8 +398,14 @@ def run_mocked_test( assert "num_partitions_completed" in partitioned_status assert "num_partitions_expected" in partitioned_status assert "is_partition_discovery_complete" in partitioned_status - assert partitioned_status["num_partitions_started"] >= partitioned_status["num_partitions_completed"] - assert partitioned_status["num_partitions_expected"] >= partitioned_status["num_partitions_started"] + assert ( + partitioned_status["num_partitions_started"] + >= partitioned_status["num_partitions_completed"] + ) + assert ( + partitioned_status["num_partitions_expected"] + >= partitioned_status["num_partitions_started"] + ) _strip_partitioned_stream_status(final_state_dict) assert final_state_dict == expected_state From 134a78f3c6f21da73534f9670b565ac015f2a424 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sun, 26 Apr 2026 06:22:59 +0000 Subject: [PATCH 03/11] fix: update test assertions to include partitioned_stream_status field --- .../test_concurrent_perpartitioncursor.py | 26 +++++++++---------- .../test_per_partition_cursor_integration.py | 18 +++++++++++++ 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index f0a1d5591..8be450e35 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -393,19 +393,19 @@ def run_mocked_test( final_state_dict = final_state.__dict__ # Validate partitioned_stream_status exists and has correct shape, then remove for comparison partitioned_status = final_state_dict.get("partitioned_stream_status") - if partitioned_status is not None: - assert "num_partitions_started" in partitioned_status - assert "num_partitions_completed" in partitioned_status - assert "num_partitions_expected" in partitioned_status - assert "is_partition_discovery_complete" in partitioned_status - assert ( - partitioned_status["num_partitions_started"] - >= partitioned_status["num_partitions_completed"] - ) - assert ( - partitioned_status["num_partitions_expected"] - >= partitioned_status["num_partitions_started"] - ) + assert partitioned_status is not None, "partitioned_stream_status must always be present in state" + assert "num_partitions_started" in partitioned_status + assert "num_partitions_completed" in partitioned_status + assert "num_partitions_expected" in partitioned_status + assert "is_partition_discovery_complete" in partitioned_status + assert ( + partitioned_status["num_partitions_started"] + >= partitioned_status["num_partitions_completed"] + ) + assert ( + partitioned_status["num_partitions_expected"] + >= partitioned_status["num_partitions_started"] + ) _strip_partitioned_stream_status(final_state_dict) assert final_state_dict == expected_state diff --git a/unit_tests/sources/declarative/incremental/test_per_partition_cursor_integration.py b/unit_tests/sources/declarative/incremental/test_per_partition_cursor_integration.py index 2f2b6b2bd..852f5f77a 100644 --- a/unit_tests/sources/declarative/incremental/test_per_partition_cursor_integration.py +++ b/unit_tests/sources/declarative/incremental/test_per_partition_cursor_integration.py @@ -343,6 +343,12 @@ def test_given_record_for_partition_when_read_then_update_state(caplog): "cursor": {CURSOR_FIELD: "2022-01-01"}, }, ], + "partitioned_stream_status": { + "num_partitions_started": 2, + "num_partitions_completed": 2, + "num_partitions_expected": 2, + "is_partition_discovery_complete": True, + }, } @@ -581,6 +587,12 @@ def test_perpartition_with_fallback(caplog): "use_global_cursor": True, "state": {"cursor_field": "2022-02-19"}, "lookback_window": 1, + "partitioned_stream_status": { + "num_partitions_started": 6, + "num_partitions_completed": 6, + "num_partitions_expected": 6, + "is_partition_discovery_complete": True, + }, } @@ -763,6 +775,12 @@ def test_per_partition_cursor_within_limit(caplog): "cursor": {CURSOR_FIELD: "2022-03-29"}, }, ], + "partitioned_stream_status": { + "num_partitions_started": 3, + "num_partitions_completed": 3, + "num_partitions_expected": 3, + "is_partition_discovery_complete": True, + }, } From 66399ab6cbe3f8d9482084542fe9d481131b5b48 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sun, 26 Apr 2026 06:24:09 +0000 Subject: [PATCH 04/11] style: fix ruff formatting in test file --- .../incremental/test_concurrent_perpartitioncursor.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index 8be450e35..6da7ff6c0 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -393,7 +393,9 @@ def run_mocked_test( final_state_dict = final_state.__dict__ # Validate partitioned_stream_status exists and has correct shape, then remove for comparison partitioned_status = final_state_dict.get("partitioned_stream_status") - assert partitioned_status is not None, "partitioned_stream_status must always be present in state" + assert partitioned_status is not None, ( + "partitioned_stream_status must always be present in state" + ) assert "num_partitions_started" in partitioned_status assert "num_partitions_completed" in partitioned_status assert "num_partitions_expected" in partitioned_status From 2bc0024c810d54aa5d52c32cc79e85fe7fc7a60e Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sun, 26 Apr 2026 06:26:51 +0000 Subject: [PATCH 05/11] refactor: use .values() and .copy() per review nitpicks --- .../incremental/test_concurrent_perpartitioncursor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index 6da7ff6c0..a59d2a138 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -332,7 +332,7 @@ def _strip_partitioned_stream_status(state_dict: dict) -> dict: """Recursively strip partitioned_stream_status from state dicts, including nested parent_state.""" state_dict.pop("partitioned_stream_status", None) if "parent_state" in state_dict and isinstance(state_dict["parent_state"], dict): - for key, value in state_dict["parent_state"].items(): + for value in state_dict["parent_state"].values(): if isinstance(value, dict): _strip_partitioned_stream_status(value) return state_dict @@ -1136,7 +1136,7 @@ def run_incremental_parent_state_test( final_states = [] # To store the final state after each read # Store the final state after the initial read - initial_final_state = output.state_messages[-1].state.stream.stream_state.__dict__ + initial_final_state = output.state_messages[-1].state.stream.stream_state.__dict__.copy() _strip_partitioned_stream_status(initial_final_state) final_states.append(initial_final_state) From 41429a25625ac9f18fd9f1fbd3ebf774013933b7 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sun, 26 Apr 2026 06:28:19 +0000 Subject: [PATCH 06/11] docs: add per-sync counter comments and improve helper docstring --- .../declarative/incremental/concurrent_partition_cursor.py | 4 +++- .../incremental/test_concurrent_perpartitioncursor.py | 6 +++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index 2d0769adc..a5011aa46 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -185,7 +185,9 @@ def __init__( self._last_emission_time: float = 0.0 self._timer = Timer() - # Partitioned stream status tracking for progress estimation + # Partitioned stream status tracking for progress estimation. + # These counters are per-sync only and intentionally NOT restored from persisted state + # (_set_initial_state does not read them back). On resume, they reset to 0. self._num_partitions_started: int = 0 self._num_partitions_completed: int = 0 self._is_partition_discovery_complete: bool = False diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index a59d2a138..c4e2163d2 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -329,7 +329,11 @@ def _strip_partitioned_stream_status(state_dict: dict) -> dict: - """Recursively strip partitioned_stream_status from state dicts, including nested parent_state.""" + """Recursively strip partitioned_stream_status from state dicts in-place, including nested parent_state. + + Mutates and returns the same dict. Callers that need to preserve the original should pass a copy. + Only traverses parent_state nesting; extend if the emitted shape gains more nesting layers. + """ state_dict.pop("partitioned_stream_status", None) if "parent_state" in state_dict and isinstance(state_dict["parent_state"], dict): for value in state_dict["parent_state"].values(): From 98f37108198e8b22b4c23d07824b26c33cd9e9ae Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sun, 26 Apr 2026 06:31:09 +0000 Subject: [PATCH 07/11] refactor: rename num_partitions_started to num_partitions_in_progress (computed) --- .../incremental/concurrent_partition_cursor.py | 5 ++--- .../test_concurrent_perpartitioncursor.py | 15 ++++++--------- .../test_per_partition_cursor_integration.py | 6 +++--- 3 files changed, 11 insertions(+), 15 deletions(-) diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index a5011aa46..1fe7bbbef 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -188,7 +188,6 @@ def __init__( # Partitioned stream status tracking for progress estimation. # These counters are per-sync only and intentionally NOT restored from persisted state # (_set_initial_state does not read them back). On resume, they reset to 0. - self._num_partitions_started: int = 0 self._num_partitions_completed: int = 0 self._is_partition_discovery_complete: bool = False @@ -225,7 +224,8 @@ def state(self) -> MutableMapping[str, Any]: if self._parent_state is not None: state["parent_state"] = self._parent_state state["partitioned_stream_status"] = { - "num_partitions_started": self._num_partitions_started, + "num_partitions_in_progress": self._generated_partitions_count + - self._num_partitions_completed, "num_partitions_completed": self._num_partitions_completed, "num_partitions_expected": self._generated_partitions_count, "is_partition_discovery_complete": self._is_partition_discovery_complete, @@ -367,7 +367,6 @@ def _generate_slices_from_partition( with self._lock: seq = self._generated_partitions_count self._generated_partitions_count += 1 - self._num_partitions_started += 1 self._processing_partitions_indexes.append(seq) self._partition_key_to_index[partition_key] = seq diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index c4e2163d2..4174880b3 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -400,17 +400,14 @@ def run_mocked_test( assert partitioned_status is not None, ( "partitioned_stream_status must always be present in state" ) - assert "num_partitions_started" in partitioned_status + assert "num_partitions_in_progress" in partitioned_status assert "num_partitions_completed" in partitioned_status assert "num_partitions_expected" in partitioned_status assert "is_partition_discovery_complete" in partitioned_status - assert ( - partitioned_status["num_partitions_started"] - >= partitioned_status["num_partitions_completed"] - ) + assert partitioned_status["num_partitions_in_progress"] >= 0 assert ( partitioned_status["num_partitions_expected"] - >= partitioned_status["num_partitions_started"] + >= partitioned_status["num_partitions_completed"] ) _strip_partitioned_stream_status(final_state_dict) assert final_state_dict == expected_state @@ -3694,7 +3691,7 @@ def test_given_no_partitions_processed_when_close_partition_then_no_state_update state = cursor.state partitioned_status = state.pop("partitioned_stream_status", None) assert partitioned_status is not None - assert partitioned_status["num_partitions_started"] == 0 + assert partitioned_status["num_partitions_in_progress"] == 0 assert partitioned_status["num_partitions_completed"] == 0 assert partitioned_status["num_partitions_expected"] == 0 assert partitioned_status["is_partition_discovery_complete"] is True @@ -3788,7 +3785,7 @@ def test_given_unfinished_first_parent_partition_no_parent_state_update(): state = cursor.state partitioned_status = state.pop("partitioned_stream_status", None) assert partitioned_status is not None - assert partitioned_status["num_partitions_started"] == 2 + assert partitioned_status["num_partitions_in_progress"] == 1 assert partitioned_status["num_partitions_completed"] == 1 assert partitioned_status["num_partitions_expected"] == 2 assert partitioned_status["is_partition_discovery_complete"] is True @@ -3890,7 +3887,7 @@ def test_given_unfinished_last_parent_partition_with_partial_parent_state_update state = cursor.state partitioned_status = state.pop("partitioned_stream_status", None) assert partitioned_status is not None - assert partitioned_status["num_partitions_started"] == 2 + assert partitioned_status["num_partitions_in_progress"] == 1 assert partitioned_status["num_partitions_completed"] == 1 assert partitioned_status["num_partitions_expected"] == 2 assert partitioned_status["is_partition_discovery_complete"] is True diff --git a/unit_tests/sources/declarative/incremental/test_per_partition_cursor_integration.py b/unit_tests/sources/declarative/incremental/test_per_partition_cursor_integration.py index 852f5f77a..dbfcffb90 100644 --- a/unit_tests/sources/declarative/incremental/test_per_partition_cursor_integration.py +++ b/unit_tests/sources/declarative/incremental/test_per_partition_cursor_integration.py @@ -344,7 +344,7 @@ def test_given_record_for_partition_when_read_then_update_state(caplog): }, ], "partitioned_stream_status": { - "num_partitions_started": 2, + "num_partitions_in_progress": 0, "num_partitions_completed": 2, "num_partitions_expected": 2, "is_partition_discovery_complete": True, @@ -588,7 +588,7 @@ def test_perpartition_with_fallback(caplog): "state": {"cursor_field": "2022-02-19"}, "lookback_window": 1, "partitioned_stream_status": { - "num_partitions_started": 6, + "num_partitions_in_progress": 0, "num_partitions_completed": 6, "num_partitions_expected": 6, "is_partition_discovery_complete": True, @@ -776,7 +776,7 @@ def test_per_partition_cursor_within_limit(caplog): }, ], "partitioned_stream_status": { - "num_partitions_started": 3, + "num_partitions_in_progress": 0, "num_partitions_completed": 3, "num_partitions_expected": 3, "is_partition_discovery_complete": True, From cfb4543d80b5c59fd415f7ca804757b3ce13876f Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sun, 26 Apr 2026 06:39:49 +0000 Subject: [PATCH 08/11] fix: strip partitioned_stream_status in model_to_component_factory test assertion --- .../declarative/parsers/test_model_to_component_factory.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index add9a1c42..b8dc15ec1 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -1327,7 +1327,9 @@ def test_stream_with_incremental_and_async_retriever_with_partition_router(use_l assert isinstance(retriever, AsyncRetriever) stream_slicer = retriever.stream_slicer.stream_slicer assert isinstance(stream_slicer, ConcurrentPerPartitionCursor) - assert stream_slicer.state == stream_state + actual_state = stream_slicer.state + actual_state.pop("partitioned_stream_status", None) + assert actual_state == stream_state import json cursor_perpartition = stream_slicer._cursor_per_partition From bf2963c50547ecefd75965a148f759dd3a4d0356 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sun, 26 Apr 2026 06:48:47 +0000 Subject: [PATCH 09/11] feat: add _partitions_observed tracking via observe() for in_progress/pending distinction - Add _partitions_observed: set[str] to track partitions where observe() has been called - Update state property to emit 4 fields: in_progress, completed, expected, is_partition_discovery_complete - num_partitions_in_progress = len(observed) - completed (worker started but not finished) - Dropped num_partitions_not_started (derivable from expected - in_progress - completed) - _cleanup_if_done adds partition to observed set to prevent negative in_progress - Updated all test assertions for new semantics --- .../incremental/concurrent_partition_cursor.py | 15 ++++++++++----- .../test_concurrent_perpartitioncursor.py | 16 +++++++++++++--- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index 1fe7bbbef..0c64a4c53 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -190,6 +190,9 @@ def __init__( # (_set_initial_state does not read them back). On resume, they reset to 0. self._num_partitions_completed: int = 0 self._is_partition_discovery_complete: bool = False + # Tracks partition keys for which observe() has been called (worker produced at least one record). + # Only len() is used in state emission; the set itself is never serialized. + self._partitions_observed: set[str] = set() self._set_initial_state(stream_state) @@ -223,9 +226,9 @@ def state(self) -> MutableMapping[str, Any]: state["lookback_window"] = self._lookback_window if self._parent_state is not None: state["parent_state"] = self._parent_state + num_observed = len(self._partitions_observed) state["partitioned_stream_status"] = { - "num_partitions_in_progress": self._generated_partitions_count - - self._num_partitions_completed, + "num_partitions_in_progress": num_observed - self._num_partitions_completed, "num_partitions_completed": self._num_partitions_completed, "num_partitions_expected": self._generated_partitions_count, "is_partition_discovery_complete": self._is_partition_discovery_complete, @@ -552,11 +555,11 @@ def observe(self, record: Record) -> None: return self._synced_some_data = True + partition_key = self._to_partition_key(record.associated_slice.partition) + self._partitions_observed.add(partition_key) self._update_global_cursor(record_cursor) if not self._use_global_cursor: - self._cursor_per_partition[ - self._to_partition_key(record.associated_slice.partition) - ].observe(record) + self._cursor_per_partition[partition_key].observe(record) def _update_global_cursor(self, value: Any) -> None: if ( @@ -581,6 +584,8 @@ def _cleanup_if_done(self, partition_key: str) -> None: seq = self._partition_key_to_index.pop(partition_key) self._processing_partitions_indexes.remove(seq) + # Ensure completed partitions are counted as observed (handles partitions with no records) + self._partitions_observed.add(partition_key) self._num_partitions_completed += 1 logger.debug(f"Partition {partition_key} fully processed and cleaned up.") diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index 4174880b3..a72a3363f 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -405,9 +405,11 @@ def run_mocked_test( assert "num_partitions_expected" in partitioned_status assert "is_partition_discovery_complete" in partitioned_status assert partitioned_status["num_partitions_in_progress"] >= 0 + assert partitioned_status["num_partitions_completed"] >= 0 assert ( partitioned_status["num_partitions_expected"] - >= partitioned_status["num_partitions_completed"] + >= partitioned_status["num_partitions_in_progress"] + + partitioned_status["num_partitions_completed"] ) _strip_partitioned_stream_status(final_state_dict) assert final_state_dict == expected_state @@ -3695,6 +3697,12 @@ def test_given_no_partitions_processed_when_close_partition_then_no_state_update assert partitioned_status["num_partitions_completed"] == 0 assert partitioned_status["num_partitions_expected"] == 0 assert partitioned_status["is_partition_discovery_complete"] is True + # Invariant: in_progress + completed <= expected + assert ( + partitioned_status["num_partitions_in_progress"] + + partitioned_status["num_partitions_completed"] + <= partitioned_status["num_partitions_expected"] + ) assert state == { "use_global_cursor": False, "lookback_window": 0, @@ -3785,7 +3793,8 @@ def test_given_unfinished_first_parent_partition_no_parent_state_update(): state = cursor.state partitioned_status = state.pop("partitioned_stream_status", None) assert partitioned_status is not None - assert partitioned_status["num_partitions_in_progress"] == 1 + # observe() not called in this test, so in_progress comes only from _cleanup_if_done adding to observed + assert partitioned_status["num_partitions_in_progress"] == 0 assert partitioned_status["num_partitions_completed"] == 1 assert partitioned_status["num_partitions_expected"] == 2 assert partitioned_status["is_partition_discovery_complete"] is True @@ -3887,7 +3896,8 @@ def test_given_unfinished_last_parent_partition_with_partial_parent_state_update state = cursor.state partitioned_status = state.pop("partitioned_stream_status", None) assert partitioned_status is not None - assert partitioned_status["num_partitions_in_progress"] == 1 + # observe() not called in this test, so in_progress comes only from _cleanup_if_done adding to observed + assert partitioned_status["num_partitions_in_progress"] == 0 assert partitioned_status["num_partitions_completed"] == 1 assert partitioned_status["num_partitions_expected"] == 2 assert partitioned_status["is_partition_discovery_complete"] is True From 86d975d54cc3dbc0d10e2364755827d44ccb3634 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sun, 26 Apr 2026 06:58:02 +0000 Subject: [PATCH 10/11] fix: strip partitioned_stream_status in test_substream_partition_router assertions --- .../test_substream_partition_router.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py b/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py index b69849ebe..f7a88c855 100644 --- a/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py +++ b/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py @@ -44,6 +44,16 @@ InMemoryPartition, ) + +def _strip_partitioned_stream_status(state_dict: dict) -> dict: + """Recursively strip partitioned_stream_status from state dicts (mutates in place).""" + state_dict.pop("partitioned_stream_status", None) + for value in state_dict.values(): + if isinstance(value, dict): + _strip_partitioned_stream_status(value) + return state_dict + + parent_records = [{"id": 1, "data": "data1"}, {"id": 2, "data": "data2"}] more_records = [ {"id": 10, "data": "data10", "slice": "second_parent"}, @@ -639,6 +649,7 @@ def test_substream_slicer_parent_state_update_with_cursor(parent_stream_config, # Check if the parent state has been updated correctly parent_state = partition_router.get_stream_state() + _strip_partitioned_stream_status(parent_state) assert parent_state == expected_state From 4c3f25c9370e85f1e8dc2aef1a225335697a350d Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sun, 26 Apr 2026 07:09:49 +0000 Subject: [PATCH 11/11] fix: clamp num_partitions_in_progress to non-negative (defensive guard for duplicate partition keys) --- .../declarative/incremental/concurrent_partition_cursor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index 0c64a4c53..7d7611097 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -228,7 +228,7 @@ def state(self) -> MutableMapping[str, Any]: state["parent_state"] = self._parent_state num_observed = len(self._partitions_observed) state["partitioned_stream_status"] = { - "num_partitions_in_progress": num_observed - self._num_partitions_completed, + "num_partitions_in_progress": max(0, num_observed - self._num_partitions_completed), "num_partitions_completed": self._num_partitions_completed, "num_partitions_expected": self._generated_partitions_count, "is_partition_discovery_complete": self._is_partition_discovery_complete,