Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ Newer updates can be found here: [GitHub Release Notes](https://github.com/airby

# Changelog

## 6.5.3

bugfix: Improve low-code async job retry exhaustion error messages and classification.

## 6.5.2

bugfix: Ensure that streams with partition router are not executed concurrently
Expand Down
25 changes: 21 additions & 4 deletions airbyte_cdk/sources/declarative/async_job/job_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
LOGGER = logging.getLogger("airbyte")
_NO_TIMEOUT = timedelta.max
_API_SIDE_RUNNING_STATUS = {AsyncJobStatus.RUNNING, AsyncJobStatus.TIMED_OUT}
_ASYNC_JOB_TERMINAL_FAILURE_MESSAGE = "Async job failed after exhausting all retry attempts."
_ASYNC_JOB_FINAL_FAILURE_MESSAGE = "Async job partition did not complete successfully."


class AsyncPartition:
Expand Down Expand Up @@ -487,11 +489,16 @@ def _process_partitions_with_errors(self, partition: AsyncPartition) -> None:
AirbyteTracedException: If at least one job could not be completed.
"""
status_by_job_id = {job.api_job_id(): job.status() for job in partition.jobs}
failure_type = (
FailureType.system_error
if any(job.is_creation_failure() for job in partition.jobs)
else FailureType.transient_error
)
self._non_breaking_exceptions.append(
AirbyteTracedException(
message="Async job failed after exhausting all retry attempts.",
message=_ASYNC_JOB_TERMINAL_FAILURE_MESSAGE,
internal_message=f"At least one job could not be completed for slice {partition.stream_slice}. Job statuses were: {status_by_job_id}. See warning logs for more information.",
failure_type=FailureType.system_error,
failure_type=failure_type,
)
)

Expand Down Expand Up @@ -536,15 +543,25 @@ def create_and_get_completed_partitions(self) -> Iterable[AsyncPartition]:
if self._non_breaking_exceptions:
# We emitted a traced message but did not break on the non-breaking exceptions. We still need to raise an exception so that the
# caller of `create_and_get_completed_partitions` knows that there was an issue with some partitions and the sync is incomplete.
failure_type = (
FailureType.transient_error
if any(
isinstance(exception, AirbyteTracedException)
and exception.message == _ASYNC_JOB_TERMINAL_FAILURE_MESSAGE
and exception.failure_type == FailureType.transient_error
for exception in self._non_breaking_exceptions
)
else FailureType.system_error
)
raise AirbyteTracedException(
message="One or more async jobs failed after exhausting all retry attempts.",
message=_ASYNC_JOB_FINAL_FAILURE_MESSAGE,
internal_message="\n".join(
[
filter_secrets(exception.__repr__())
for exception in self._non_breaking_exceptions
]
),
failure_type=FailureType.system_error,
failure_type=failure_type,
)

def _handle_non_breaking_error(self, exception: Exception) -> None:
Expand Down
19 changes: 19 additions & 0 deletions unit_tests/sources/declarative/async_job/test_job_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,25 @@ def test_given_jobs_failed_more_than_max_attempts_when_create_and_get_completed_

assert job_tracker.try_to_get_intent()

@mock.patch(sleep_mock_target)
def test_given_jobs_failed_more_than_max_attempts_when_create_and_get_completed_partitions_then_raise_transient_error(
    self, mock_sleep: MagicMock
) -> None:
    """Check the error raised when every attempt for a slice ends in FAILED.

    One job is prepared per allowed attempt and each of them reports a
    single ``FAILED`` status. Draining the orchestrator is then expected to
    raise an ``AirbyteTracedException`` whose message, failure type and
    internal message match the assertions below.

    ``mock_sleep`` is only received because of the ``mock.patch`` decorator;
    the test body does not inspect it.
    """
    # One job per attempt, all bound to the same stream slice.
    failed_jobs = [
        self._an_async_job(str(attempt), _A_STREAM_SLICE)
        for attempt in range(_MAX_NUMBER_OF_ATTEMPTS)
    ]
    self._job_repository.start.side_effect = failed_jobs

    # Each job gets its own status list so no list object is shared between jobs.
    statuses_by_job = {job: [AsyncJobStatus.FAILED] for job in failed_jobs}
    self._job_repository.update_jobs_status.side_effect = _status_update_per_jobs(
        statuses_by_job
    )

    orchestrator = self._orchestrator([_A_STREAM_SLICE], JobTracker(1))

    with pytest.raises(AirbyteTracedException) as raised:
        # The result is a lazy iterable; it must be consumed for the error to surface.
        list(orchestrator.create_and_get_completed_partitions())

    error = raised.value
    assert error.message == "Async job partition did not complete successfully."
    assert error.failure_type == FailureType.transient_error
    assert "At least one job could not be completed" in error.internal_message

def given_budget_already_taken_before_start_when_create_and_get_completed_partitions_then_wait_for_budget_to_be_freed(
self,
) -> None:
Expand Down
Loading