Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 30 additions & 20 deletions airbyte_cdk/sources/streams/concurrent/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
Union,
)

from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.message import MessageRepository, NoopMessageRepository
from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY
Expand All @@ -30,6 +31,7 @@
AbstractStreamStateConverter,
)
from airbyte_cdk.sources.types import Record, StreamSlice
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

LOGGER = logging.getLogger("airbyte")

Expand Down Expand Up @@ -265,28 +267,36 @@ def _slice_boundary_fields_wrapper(self) -> Tuple[str, str]:
def _get_concurrent_state(
self, state: MutableMapping[str, Any]
) -> Tuple[CursorValueType, MutableMapping[str, Any]]:
if self._connector_state_converter.is_state_message_compatible(state):
partitioned_state = self._connector_state_converter.deserialize(state)
slices_from_partitioned_state = partitioned_state.get("slices", [])

value_from_partitioned_state = None
if slices_from_partitioned_state:
# We assume here that the slices have been already merged
first_slice = slices_from_partitioned_state[0]
value_from_partitioned_state = (
first_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY]
if self._connector_state_converter.MOST_RECENT_RECORD_KEY in first_slice
else first_slice[self._connector_state_converter.END_KEY]
try:
if self._connector_state_converter.is_state_message_compatible(state):
partitioned_state = self._connector_state_converter.deserialize(state)
slices_from_partitioned_state = partitioned_state.get("slices", [])

value_from_partitioned_state = None
if slices_from_partitioned_state:
# We assume here that the slices have been already merged
first_slice = slices_from_partitioned_state[0]
value_from_partitioned_state = (
first_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY]
if self._connector_state_converter.MOST_RECENT_RECORD_KEY in first_slice
else first_slice[self._connector_state_converter.END_KEY]
)
return (
value_from_partitioned_state
or self._start
or self._connector_state_converter.zero_value,
partitioned_state,
)
return (
value_from_partitioned_state
or self._start
or self._connector_state_converter.zero_value,
partitioned_state,
return self._connector_state_converter.convert_from_sequential_state(
self._cursor_field, state, self._start
)
return self._connector_state_converter.convert_from_sequential_state(
self._cursor_field, state, self._start
)
except ValueError as error:
raise AirbyteTracedException(
message=f"State cursor timestamp for stream {self._stream_name} cannot be parsed by this connector version.",
internal_message=f"Failed to parse state cursor field {self._cursor_field.cursor_field_key} for stream {self._stream_name}: {error}",
failure_type=FailureType.system_error,
exception=error,
) from error

def observe(self, record: Record) -> None:
# Because observe writes to the most_recent_cursor_value_per_partition mapping,
Expand Down
11 changes: 9 additions & 2 deletions unit_tests/sources/declarative/test_state_delegating_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
ConcurrentDeclarativeSource,
)
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

_CONFIG = {"start_date": "2024-07-01T00:00:00.000Z"}
_MANIFEST = {
Expand Down Expand Up @@ -591,7 +592,7 @@ def test_cursor_age_validation_raises_error_for_incrementing_count_cursor():


def test_cursor_age_validation_raises_error_for_unparseable_cursor():
"""Test that unparseable cursor datetime raises ValueError when api_retention_period is set."""
"""Test that unparseable cursor datetime raises traced exception when api_retention_period is set."""
manifest = _create_manifest_with_retention_period("P7D")

state = [
Expand All @@ -608,9 +609,15 @@ def test_cursor_age_validation_raises_error_for_unparseable_cursor():
source_config=manifest, config=_CONFIG, catalog=None, state=state
)

with pytest.raises(ValueError, match="not-a-date"):
with pytest.raises(AirbyteTracedException) as exc_info:
source.discover(logger=MagicMock(), config=_CONFIG)

assert (
exc_info.value.message
== "State cursor timestamp for stream TestStream cannot be parsed by this connector version."
)
assert "not-a-date" in (exc_info.value.internal_message or "")


@freezegun.freeze_time("2024-07-15")
def test_final_state_cursor_falls_back_to_full_refresh_when_state_unparseable():
Expand Down
31 changes: 31 additions & 0 deletions unit_tests/sources/streams/concurrent/test_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import freezegun
import pytest

from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY
Expand All @@ -36,6 +37,7 @@
EpochValueConcurrentStreamStateConverter,
)
from airbyte_cdk.sources.types import Record, StreamSlice
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

_A_STREAM_NAME = "a stream name"
_A_STREAM_NAMESPACE = "a stream namespace"
Expand Down Expand Up @@ -122,6 +124,35 @@ def test_given_no_cursor_value_when_observe_then_do_not_raise(self) -> None:

# did not raise

def test_given_unparseable_state_cursor_when_initialize_then_raise_traced_exception(
self,
) -> None:
with pytest.raises(AirbyteTracedException) as exc_info:
ConcurrentCursor(
_A_STREAM_NAME,
_A_STREAM_NAMESPACE,
{_A_CURSOR_FIELD_KEY: "2024-01-15T23:59:57"},
self._message_repository,
self._state_manager,
CustomFormatConcurrentStreamStateConverter(
datetime_format="%Y-%m-%dT00:00:00",
input_datetime_formats=["%Y-%m-%dT%H:%M:%S.%f%z"],
),
CursorField(_A_CURSOR_FIELD_KEY),
_NO_SLICE_BOUNDARIES,
None,
CustomFormatConcurrentStreamStateConverter.get_end_provider(),
_NO_LOOKBACK_WINDOW,
)

assert (
exc_info.value.message
== f"State cursor timestamp for stream {_A_STREAM_NAME} cannot be parsed by this connector version."
)
assert exc_info.value.failure_type == FailureType.system_error
assert "Failed to parse state cursor field" in (exc_info.value.internal_message or "")
assert "No format in" in (exc_info.value.internal_message or "")

def test_given_boundary_fields_when_close_partition_then_emit_state(self) -> None:
cursor = self._cursor_with_slice_boundary_fields()
cursor.close_partition(
Expand Down
Loading