Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def reduce_slice_range_if_possible(
>= self._allowed_number_of_attempt_with_same_slice
):
raise AirbyteTracedException(
message="Pagination reset cannot narrow the stream slice.",
internal_message=f"There were {self._number_of_attempt_with_same_slice} attempts with the same slice already while the max allowed is {self._allowed_number_of_attempt_with_same_slice}",
failure_type=FailureType.system_error,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import pytest

from airbyte_cdk.sources.declarative.models import FailureType
from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.declarative.retrievers.pagination_tracker import PaginationTracker
from airbyte_cdk.sources.declarative.types import Record, StreamSlice
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor
Expand Down Expand Up @@ -66,9 +66,15 @@ def test_given_no_cursor_when_reduce_slice_range_multiple_times_then_raise(self)
original_slice = StreamSlice(partition={}, cursor_slice={})

tracker.reduce_slice_range_if_possible(original_slice, original_slice)
with pytest.raises(AirbyteTracedException):
with pytest.raises(AirbyteTracedException) as exc_info:
tracker.reduce_slice_range_if_possible(original_slice, original_slice)

assert exc_info.value.message == "Pagination reset cannot narrow the stream slice."
assert exc_info.value.internal_message == (
"There were 2 attempts with the same slice already while the max allowed is 2"
)
assert exc_info.value.failure_type == FailureType.system_error

def test_given_cursor_when_reduce_slice_range_then_return_cursor_stream_slice(self):
tracker = PaginationTracker(cursor=self._cursor)
self._cursor.reduce_slice_range.return_value = _A_STREAM_SLICE
Expand All @@ -84,9 +90,15 @@ def test_given_cursor_cant_reduce_slice_when_reduce_slice_range_then_raise(self)
original_slice = StreamSlice(partition={}, cursor_slice={})
self._cursor.reduce_slice_range.return_value = _A_STREAM_SLICE

with pytest.raises(AirbyteTracedException):
with pytest.raises(AirbyteTracedException) as exc_info:
tracker.reduce_slice_range_if_possible(_A_STREAM_SLICE, original_slice)

assert exc_info.value.message == "Pagination reset cannot narrow the stream slice."
assert exc_info.value.internal_message == (
"There were 1 attempts with the same slice already while the max allowed is 1"
)
assert exc_info.value.failure_type == FailureType.system_error

def test_cursor_called_with_original_slice_when_reduce_slice_range(self):
tracker = PaginationTracker(cursor=self._cursor)
original_slice = StreamSlice(partition={}, cursor_slice={})
Expand Down
Loading