diff --git a/airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py b/airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py index 4987ea38c..dedf2fa7b 100644 --- a/airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py +++ b/airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py @@ -67,6 +67,7 @@ def reduce_slice_range_if_possible( >= self._allowed_number_of_attempt_with_same_slice ): raise AirbyteTracedException( + message="Pagination reset cannot narrow the stream slice.", internal_message=f"There were {self._number_of_attempt_with_same_slice} attempts with the same slice already while the max allowed is {self._allowed_number_of_attempt_with_same_slice}", failure_type=FailureType.system_error, ) diff --git a/unit_tests/sources/declarative/retrievers/test_pagination_tracker.py b/unit_tests/sources/declarative/retrievers/test_pagination_tracker.py index 6b8207cc1..1e817b16d 100644 --- a/unit_tests/sources/declarative/retrievers/test_pagination_tracker.py +++ b/unit_tests/sources/declarative/retrievers/test_pagination_tracker.py @@ -6,7 +6,7 @@ import pytest -from airbyte_cdk.sources.declarative.models import FailureType +from airbyte_cdk.models import FailureType from airbyte_cdk.sources.declarative.retrievers.pagination_tracker import PaginationTracker from airbyte_cdk.sources.declarative.types import Record, StreamSlice from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor @@ -66,9 +66,15 @@ def test_given_no_cursor_when_reduce_slice_range_multiple_times_then_raise(self) original_slice = StreamSlice(partition={}, cursor_slice={}) tracker.reduce_slice_range_if_possible(original_slice, original_slice) - with pytest.raises(AirbyteTracedException): + with pytest.raises(AirbyteTracedException) as exc_info: tracker.reduce_slice_range_if_possible(original_slice, original_slice) + assert exc_info.value.message == "Pagination reset cannot narrow the stream slice." + assert exc_info.value.internal_message == ( + "There were 2 attempts with the same slice already while the max allowed is 2" + ) + assert exc_info.value.failure_type == FailureType.system_error + def test_given_cursor_when_reduce_slice_range_then_return_cursor_stream_slice(self): tracker = PaginationTracker(cursor=self._cursor) self._cursor.reduce_slice_range.return_value = _A_STREAM_SLICE @@ -84,9 +90,15 @@ def test_given_cursor_cant_reduce_slice_when_reduce_slice_range_then_raise(self) original_slice = StreamSlice(partition={}, cursor_slice={}) self._cursor.reduce_slice_range.return_value = _A_STREAM_SLICE - with pytest.raises(AirbyteTracedException): + with pytest.raises(AirbyteTracedException) as exc_info: tracker.reduce_slice_range_if_possible(_A_STREAM_SLICE, original_slice) + assert exc_info.value.message == "Pagination reset cannot narrow the stream slice." + assert exc_info.value.internal_message == ( + "There were 1 attempts with the same slice already while the max allowed is 1" + ) + assert exc_info.value.failure_type == FailureType.system_error + def test_cursor_called_with_original_slice_when_reduce_slice_range(self): tracker = PaginationTracker(cursor=self._cursor) original_slice = StreamSlice(partition={}, cursor_slice={})