diff --git a/CHANGELOG.md b/CHANGELOG.md index 31e73e411..03f5bdc92 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ Newer updates can be found here: [GitHub Release Notes](https://github.com/airby # Changelog +## 6.5.3 + +bugfix: Improve low-code async job retry exhaustion error messages and classification. + ## 6.5.2 bugfix: Ensure that streams with partition router are not executed concurrently diff --git a/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py b/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py index 3607ad026..a928aa75d 100644 --- a/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py +++ b/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py @@ -37,6 +37,8 @@ LOGGER = logging.getLogger("airbyte") _NO_TIMEOUT = timedelta.max _API_SIDE_RUNNING_STATUS = {AsyncJobStatus.RUNNING, AsyncJobStatus.TIMED_OUT} +_ASYNC_JOB_TERMINAL_FAILURE_MESSAGE = "Async job failed after exhausting all retry attempts." +_ASYNC_JOB_FINAL_FAILURE_MESSAGE = "Async job partition did not complete successfully." class AsyncPartition: @@ -487,11 +489,16 @@ def _process_partitions_with_errors(self, partition: AsyncPartition) -> None: AirbyteTracedException: If at least one job could not be completed. """ status_by_job_id = {job.api_job_id(): job.status() for job in partition.jobs} + failure_type = ( + FailureType.system_error + if any(job.is_creation_failure() for job in partition.jobs) + else FailureType.transient_error + ) self._non_breaking_exceptions.append( AirbyteTracedException( - message="Async job failed after exhausting all retry attempts.", + message=_ASYNC_JOB_TERMINAL_FAILURE_MESSAGE, internal_message=f"At least one job could not be completed for slice {partition.stream_slice}. Job statuses were: {status_by_job_id}. See warning logs for more information.", - failure_type=FailureType.system_error, + failure_type=failure_type, ) ) @@ -536,15 +543,25 @@ def create_and_get_completed_partitions(self) -> Iterable[AsyncPartition]: if self._non_breaking_exceptions: # We emitted traced message but we didn't break on non_breaking_exception. We still need to raise an exception so that the # call of `create_and_get_completed_partitions` knows that there was an issue with some partitions and the sync is incomplete. + failure_type = ( + FailureType.transient_error + if any( + isinstance(exception, AirbyteTracedException) + and exception.message == _ASYNC_JOB_TERMINAL_FAILURE_MESSAGE + and exception.failure_type == FailureType.transient_error + for exception in self._non_breaking_exceptions + ) + else FailureType.system_error + ) raise AirbyteTracedException( - message="One or more async jobs failed after exhausting all retry attempts.", + message=_ASYNC_JOB_FINAL_FAILURE_MESSAGE, internal_message="\n".join( [ filter_secrets(exception.__repr__()) for exception in self._non_breaking_exceptions ] ), - failure_type=FailureType.system_error, + failure_type=failure_type, ) def _handle_non_breaking_error(self, exception: Exception) -> None: diff --git a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py index dce40e624..1f8593cfd 100644 --- a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py +++ b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py @@ -344,6 +344,25 @@ def test_given_jobs_failed_more_than_max_attempts_when_create_and_get_completed_ assert job_tracker.try_to_get_intent() + @mock.patch(sleep_mock_target) + def test_given_jobs_failed_more_than_max_attempts_when_create_and_get_completed_partitions_then_raise_transient_error( + self, mock_sleep: MagicMock + ) -> None: + jobs = [self._an_async_job(str(i), _A_STREAM_SLICE) for i in range(_MAX_NUMBER_OF_ATTEMPTS)] + self._job_repository.start.side_effect = jobs + self._job_repository.update_jobs_status.side_effect = _status_update_per_jobs( + {job: [AsyncJobStatus.FAILED] for job in jobs} + ) + + orchestrator = self._orchestrator([_A_STREAM_SLICE], JobTracker(1)) + + with pytest.raises(AirbyteTracedException) as exc_info: + list(orchestrator.create_and_get_completed_partitions()) + + assert exc_info.value.message == "Async job partition did not complete successfully." + assert exc_info.value.failure_type == FailureType.transient_error + assert "At least one job could not be completed" in exc_info.value.internal_message + def given_budget_already_taken_before_start_when_create_and_get_completed_partitions_then_wait_for_budget_to_be_freed( self, ) -> None: