From 23f76d3441321a737b8922148694388a1498b5ae Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sat, 9 May 2026 10:32:41 +0000 Subject: [PATCH] fix(cdk): surface HTTP cache operational errors Co-Authored-By: bot_apk --- .../concurrent_read_processor.py | 22 +++++++++++++--- .../sources/streams/http/http_client.py | 9 +++++++ .../test_concurrent_read_processor.py | 25 ++++++++++++++++++- .../sources/streams/http/test_http_client.py | 18 +++++++++++++ 4 files changed, 70 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py b/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py index a78905e72..9b5d8c77b 100644 --- a/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py +++ b/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py @@ -240,6 +240,23 @@ def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMess def _flag_exception(self, stream_name: str, exception: Exception) -> None: self._exceptions_per_stream_name.setdefault(stream_name, []).append(exception) + def _get_failure_type_for_collected_exceptions(self) -> FailureType: + failure_types = [ + exception.failure_type + for exceptions in self._exceptions_per_stream_name.values() + for exception in exceptions + if isinstance(exception, AirbyteTracedException) + ] + if failure_types and all( + failure_type == FailureType.config_error for failure_type in failure_types + ): + return FailureType.config_error + if failure_types and all( + failure_type == FailureType.transient_error for failure_type in failure_types + ): + return FailureType.transient_error + return FailureType.system_error + def start_next_partition_generator(self) -> Optional[AirbyteMessage]: """ Submits the next partition generator to the thread pool. @@ -382,12 +399,11 @@ def is_done(self) -> bool: error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name) self._logger.info(error_message) # We still raise at least one exception when a stream raises an exception because the platform currently relies - # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error - # type because this combined error isn't actionable, but rather the previously emitted individual errors. + # on a non-zero exit code to determine if a sync attempt has failed. raise AirbyteTracedException( message=error_message, internal_message="Concurrent read failure", - failure_type=FailureType.config_error, + failure_type=self._get_failure_type_for_collected_exceptions(), ) return is_done diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index c1d0eabd6..bb1c52713 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -4,6 +4,7 @@ import logging import os +import sqlite3 import urllib from pathlib import Path from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Union @@ -346,6 +347,14 @@ def _send( try: response = self._session.send(request, **request_kwargs) + except sqlite3.OperationalError as e: + raise AirbyteTracedException( + internal_message=f"HTTP response cache operation failed with OperationalError: {e}", + message="Internal HTTP response cache failed.", + failure_type=FailureType.system_error, + exception=e, + stream_descriptor=StreamDescriptor(name=self._name), + ) except requests.RequestException as e: exc = e diff --git a/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py b/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py index 910111a05..37082058a 100644 --- a/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py +++ b/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py @@ -773,9 +773,32 @@ def test_is_done_is_true_if_all_partitions_are_closed_and_no_streams_are_generat assert handler.is_done() + def test_is_done_uses_failure_type_from_collected_exceptions(self): + handler = ConcurrentReadProcessor( + [], + self._partition_enqueuer, + self._thread_pool_manager, + self._logger, + self._slice_logger, + self._message_repository, + self._partition_reader, + ) + handler._exceptions_per_stream_name = { + _STREAM_NAME: [ + AirbyteTracedException( + message="Internal HTTP response cache failed.", + failure_type=FailureType.system_error, + ) + ] + } + + with pytest.raises(AirbyteTracedException) as exc_info: + handler.is_done() + + assert exc_info.value.failure_type == FailureType.system_error + @freezegun.freeze_time("2020-01-01T00:00:00") def test_on_exception_non_ate_uses_templated_message_with_correct_failure_type(self): - """Regression test: non-ATE exceptions on Path B produce a safe templated message, not the generic fallback.""" stream_instances_to_read_from = [self._stream, self._another_stream] handler = ConcurrentReadProcessor( diff --git a/unit_tests/sources/streams/http/test_http_client.py b/unit_tests/sources/streams/http/test_http_client.py index 48d396cb6..fa564f787 100644 --- a/unit_tests/sources/streams/http/test_http_client.py +++ b/unit_tests/sources/streams/http/test_http_client.py @@ -2,6 +2,7 @@ import logging import os +import sqlite3 from datetime import timedelta from unittest.mock import MagicMock, patch @@ -448,6 +449,23 @@ def test_session_request_exception_raises_backoff_exception(): http_client._send(prepared_request, {}) +def test_session_operational_error_raises_cache_traced_exception(): + mocked_session = MagicMock(spec=requests.Session) + mocked_session.send.side_effect = sqlite3.OperationalError("no more rows available") + http_client = HttpClient(name="test", logger=MagicMock(), session=mocked_session) + prepared_request = requests.PreparedRequest() + + with pytest.raises(AirbyteTracedException) as exc_info: + http_client._send(prepared_request, {}) + + assert exc_info.value.message == "Internal HTTP response cache failed." + assert ( + exc_info.value.internal_message + == "HTTP response cache operation failed with OperationalError: no more rows available" + ) + assert exc_info.value.failure_type == FailureType.system_error + + def test_that_response_was_cached(requests_mock): cached_http_client = test_cache_http_client()