diff --git a/airbyte_cdk/sources/streams/concurrent/cursor.py b/airbyte_cdk/sources/streams/concurrent/cursor.py index 11eaad235..7581e36a1 100644 --- a/airbyte_cdk/sources/streams/concurrent/cursor.py +++ b/airbyte_cdk/sources/streams/concurrent/cursor.py @@ -19,6 +19,7 @@ Union, ) +from airbyte_cdk.models import FailureType from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager from airbyte_cdk.sources.message import MessageRepository, NoopMessageRepository from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY @@ -30,6 +31,7 @@ AbstractStreamStateConverter, ) from airbyte_cdk.sources.types import Record, StreamSlice +from airbyte_cdk.utils.traced_exception import AirbyteTracedException LOGGER = logging.getLogger("airbyte") @@ -265,28 +267,36 @@ def _slice_boundary_fields_wrapper(self) -> Tuple[str, str]: def _get_concurrent_state( self, state: MutableMapping[str, Any] ) -> Tuple[CursorValueType, MutableMapping[str, Any]]: - if self._connector_state_converter.is_state_message_compatible(state): - partitioned_state = self._connector_state_converter.deserialize(state) - slices_from_partitioned_state = partitioned_state.get("slices", []) - - value_from_partitioned_state = None - if slices_from_partitioned_state: - # We assume here that the slices have been already merged - first_slice = slices_from_partitioned_state[0] - value_from_partitioned_state = ( - first_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY] - if self._connector_state_converter.MOST_RECENT_RECORD_KEY in first_slice - else first_slice[self._connector_state_converter.END_KEY] + try: + if self._connector_state_converter.is_state_message_compatible(state): + partitioned_state = self._connector_state_converter.deserialize(state) + slices_from_partitioned_state = partitioned_state.get("slices", []) + + value_from_partitioned_state = None + if slices_from_partitioned_state: + # We assume here that the slices have been already merged + first_slice = slices_from_partitioned_state[0] + value_from_partitioned_state = ( + first_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY] + if self._connector_state_converter.MOST_RECENT_RECORD_KEY in first_slice + else first_slice[self._connector_state_converter.END_KEY] + ) + return ( + value_from_partitioned_state + or self._start + or self._connector_state_converter.zero_value, + partitioned_state, ) - return ( - value_from_partitioned_state - or self._start - or self._connector_state_converter.zero_value, - partitioned_state, + return self._connector_state_converter.convert_from_sequential_state( + self._cursor_field, state, self._start ) - return self._connector_state_converter.convert_from_sequential_state( - self._cursor_field, state, self._start - ) + except ValueError as error: + raise AirbyteTracedException( + message=f"State cursor timestamp for stream {self._stream_name} cannot be parsed by this connector version.", + internal_message=f"Failed to parse state cursor field {self._cursor_field.cursor_field_key} for stream {self._stream_name}: {error}", + failure_type=FailureType.system_error, + exception=error, + ) from error def observe(self, record: Record) -> None: # Because observe writes to the most_recent_cursor_value_per_partition mapping, diff --git a/unit_tests/sources/declarative/test_state_delegating_stream.py b/unit_tests/sources/declarative/test_state_delegating_stream.py index 90bd0d76c..9abd5f3ad 100644 --- a/unit_tests/sources/declarative/test_state_delegating_stream.py +++ b/unit_tests/sources/declarative/test_state_delegating_stream.py @@ -25,6 +25,7 @@ ConcurrentDeclarativeSource, ) from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse +from airbyte_cdk.utils.traced_exception import AirbyteTracedException _CONFIG = {"start_date": "2024-07-01T00:00:00.000Z"} _MANIFEST = { @@ -591,7 +592,7 @@ def test_cursor_age_validation_raises_error_for_incrementing_count_cursor(): def test_cursor_age_validation_raises_error_for_unparseable_cursor(): - """Test that unparseable cursor datetime raises ValueError when api_retention_period is set.""" + """Test that unparseable cursor datetime raises traced exception when api_retention_period is set.""" manifest = _create_manifest_with_retention_period("P7D") state = [ @@ -608,9 +609,15 @@ def test_cursor_age_validation_raises_error_for_unparseable_cursor(): source_config=manifest, config=_CONFIG, catalog=None, state=state ) - with pytest.raises(ValueError, match="not-a-date"): + with pytest.raises(AirbyteTracedException) as exc_info: source.discover(logger=MagicMock(), config=_CONFIG) + assert ( + exc_info.value.message + == "State cursor timestamp for stream TestStream cannot be parsed by this connector version." + ) + assert "not-a-date" in (exc_info.value.internal_message or "") + @freezegun.freeze_time("2024-07-15") def test_final_state_cursor_falls_back_to_full_refresh_when_state_unparseable(): diff --git a/unit_tests/sources/streams/concurrent/test_cursor.py b/unit_tests/sources/streams/concurrent/test_cursor.py index 13fe1df87..00b3703be 100644 --- a/unit_tests/sources/streams/concurrent/test_cursor.py +++ b/unit_tests/sources/streams/concurrent/test_cursor.py @@ -11,6 +11,7 @@ import freezegun import pytest +from airbyte_cdk.models import FailureType from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager from airbyte_cdk.sources.message import MessageRepository from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY @@ -36,6 +37,7 @@ EpochValueConcurrentStreamStateConverter, ) from airbyte_cdk.sources.types import Record, StreamSlice +from airbyte_cdk.utils.traced_exception import AirbyteTracedException _A_STREAM_NAME = "a stream name" _A_STREAM_NAMESPACE = "a stream namespace" @@ -122,6 +124,35 @@ def test_given_no_cursor_value_when_observe_then_do_not_raise(self) -> None: # did not raise + def test_given_unparseable_state_cursor_when_initialize_then_raise_traced_exception( + self, + ) -> None: + with pytest.raises(AirbyteTracedException) as exc_info: + ConcurrentCursor( + _A_STREAM_NAME, + _A_STREAM_NAMESPACE, + {_A_CURSOR_FIELD_KEY: "2024-01-15T23:59:57"}, + self._message_repository, + self._state_manager, + CustomFormatConcurrentStreamStateConverter( + datetime_format="%Y-%m-%dT00:00:00", + input_datetime_formats=["%Y-%m-%dT%H:%M:%S.%f%z"], + ), + CursorField(_A_CURSOR_FIELD_KEY), + _NO_SLICE_BOUNDARIES, + None, + CustomFormatConcurrentStreamStateConverter.get_end_provider(), + _NO_LOOKBACK_WINDOW, + ) + + assert ( + exc_info.value.message + == f"State cursor timestamp for stream {_A_STREAM_NAME} cannot be parsed by this connector version." + ) + assert exc_info.value.failure_type == FailureType.system_error + assert "Failed to parse state cursor field" in (exc_info.value.internal_message or "") + assert "No format in" in (exc_info.value.internal_message or "") + def test_given_boundary_fields_when_close_partition_then_emit_state(self) -> None: cursor = self._cursor_with_slice_boundary_fields() cursor.close_partition(