Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,23 @@ def on_exception(self, exception: StreamThreadException) -> Iterable[AirbyteMess
def _flag_exception(self, stream_name: str, exception: Exception) -> None:
self._exceptions_per_stream_name.setdefault(stream_name, []).append(exception)

def _get_failure_type_for_collected_exceptions(self) -> FailureType:
    """
    Derive a single failure type summarizing every exception collected per stream.

    Returns:
        FailureType.config_error if every collected exception is an AirbyteTracedException
        whose failure type is config_error; FailureType.transient_error if every one is a
        transient_error; FailureType.system_error otherwise (mixed types, any other failure
        type, any untraced exception, or nothing collected).
    """
    # Exceptions that are not AirbyteTracedException carry no failure type. They are counted
    # as system_error instead of being dropped, so that an unexpected error collected next to
    # config/transient errors is not masked by the more benign classification.
    failure_types = {
        exception.failure_type
        if isinstance(exception, AirbyteTracedException)
        else FailureType.system_error
        for exceptions in self._exceptions_per_stream_name.values()
        for exception in exceptions
    }
    # A set holding exactly one member means every collected exception agreed on that type.
    if failure_types == {FailureType.config_error}:
        return FailureType.config_error
    if failure_types == {FailureType.transient_error}:
        return FailureType.transient_error
    return FailureType.system_error

def start_next_partition_generator(self) -> Optional[AirbyteMessage]:
"""
Submits the next partition generator to the thread pool.
Expand Down Expand Up @@ -382,12 +399,11 @@ def is_done(self) -> bool:
error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name)
self._logger.info(error_message)
# We still raise at least one exception when a stream raises an exception because the platform currently relies
# on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
# type because this combined error isn't actionable, but rather the previously emitted individual errors.
# on a non-zero exit code to determine if a sync attempt has failed.
raise AirbyteTracedException(
message=error_message,
internal_message="Concurrent read failure",
failure_type=FailureType.config_error,
failure_type=self._get_failure_type_for_collected_exceptions(),
)
return is_done

Expand Down
9 changes: 9 additions & 0 deletions airbyte_cdk/sources/streams/http/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import logging
import os
import sqlite3
import urllib
from pathlib import Path
from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Union
Expand Down Expand Up @@ -346,6 +347,14 @@ def _send(

try:
response = self._session.send(request, **request_kwargs)
except sqlite3.OperationalError as e:
raise AirbyteTracedException(
internal_message=f"HTTP response cache operation failed with OperationalError: {e}",
message="Internal HTTP response cache failed.",
failure_type=FailureType.system_error,
exception=e,
stream_descriptor=StreamDescriptor(name=self._name),
)
except requests.RequestException as e:
exc = e

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -773,9 +773,32 @@ def test_is_done_is_true_if_all_partitions_are_closed_and_no_streams_are_generat

assert handler.is_done()

def test_is_done_uses_failure_type_from_collected_exceptions(self):
    """The error raised by `is_done` carries the failure type of the collected exceptions."""
    cache_failure = AirbyteTracedException(
        message="Internal HTTP response cache failed.",
        failure_type=FailureType.system_error,
    )
    processor = ConcurrentReadProcessor(
        [],
        self._partition_enqueuer,
        self._thread_pool_manager,
        self._logger,
        self._slice_logger,
        self._message_repository,
        self._partition_reader,
    )
    # Seed a collected exception directly rather than driving a real stream failure.
    processor._exceptions_per_stream_name = {_STREAM_NAME: [cache_failure]}

    with pytest.raises(AirbyteTracedException) as raised:
        processor.is_done()

    assert raised.value.failure_type == FailureType.system_error

@freezegun.freeze_time("2020-01-01T00:00:00")
def test_on_exception_non_ate_uses_templated_message_with_correct_failure_type(self):
"""Regression test: non-ATE exceptions on Path B produce a safe templated message, not the generic fallback."""
stream_instances_to_read_from = [self._stream, self._another_stream]

handler = ConcurrentReadProcessor(
Expand Down
18 changes: 18 additions & 0 deletions unit_tests/sources/streams/http/test_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import logging
import os
import sqlite3
from datetime import timedelta
from unittest.mock import MagicMock, patch

Expand Down Expand Up @@ -448,6 +449,23 @@ def test_session_request_exception_raises_backoff_exception():
http_client._send(prepared_request, {})


def test_session_operational_error_raises_cache_traced_exception():
    """A sqlite3.OperationalError from the session's `send` surfaces as a system_error traced exception."""
    failing_session = MagicMock(spec=requests.Session)
    failing_session.send.side_effect = sqlite3.OperationalError("no more rows available")
    client = HttpClient(name="test", logger=MagicMock(), session=failing_session)
    request = requests.PreparedRequest()

    with pytest.raises(AirbyteTracedException) as raised:
        client._send(request, {})

    error = raised.value
    assert error.message == "Internal HTTP response cache failed."
    expected_internal_message = (
        "HTTP response cache operation failed with OperationalError: no more rows available"
    )
    assert error.internal_message == expected_internal_message
    assert error.failure_type == FailureType.system_error


def test_that_response_was_cached(requests_mock):
cached_http_client = test_cache_http_client()

Expand Down
Loading