From b00c20ca5ed93a4292016f713ef45de43bb8853b Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 21 Apr 2026 13:04:53 +0000 Subject: [PATCH 1/3] fix(async-job): propagate wrapped FailureType on async job aggregation Replace hardcoded system_error in AsyncJobOrchestrator's aggregated failure with the highest-precedence FailureType among wrapped non-breaking exceptions (config_error > transient_error > system_error). The user-facing message is chosen per FailureType to stay deterministic; underlying failure-type counts and exception reprs are moved into internal_message. Co-Authored-By: bot_apk --- .../declarative/async_job/job_orchestrator.py | 70 ++++++++++++++++++- .../async_job/test_job_orchestrator.py | 39 +++++++++++ 2 files changed, 106 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py b/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py index 21bb3b071..8ba02e20e 100644 --- a/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py +++ b/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py @@ -8,6 +8,7 @@ from datetime import timedelta from typing import ( Any, + Dict, Generator, Generic, Iterable, @@ -38,6 +39,29 @@ _NO_TIMEOUT = timedelta.max _API_SIDE_RUNNING_STATUS = {AsyncJobStatus.RUNNING, AsyncJobStatus.TIMED_OUT} +# Precedence used to aggregate the `FailureType` of many non-breaking +# exceptions into a single value. A `config_error` means the user must act +# before retries can succeed, so it dominates. `transient_error` is next +# (retryable). `system_error` is the fallback for genuine internal failures. +_FAILURE_TYPE_PRECEDENCE: Tuple[FailureType, ...] = ( + FailureType.config_error, + FailureType.transient_error, + FailureType.system_error, +) + +# Deterministic, aggregation-friendly user-facing messages per dominant +# `FailureType`. Counts and raw exception reprs go into `internal_message` +# so that the `message` field stays stable as a log aggregation key. +_ASYNC_JOB_FAILURE_MESSAGE_BY_TYPE: Mapping[FailureType, str] = { + FailureType.config_error: ( + "Async jobs failed because the source API rejected the request as unauthorized or forbidden." + ), + FailureType.transient_error: ( + "Async jobs failed after exhausting retries for source API rate limit or transient errors." + ), + FailureType.system_error: "Async jobs failed after exhausting retry attempts.", +} + class AsyncPartition: """ @@ -481,16 +505,56 @@ def create_and_get_completed_partitions(self) -> Iterable[AsyncPartition]: if self._non_breaking_exceptions: # We emitted traced message but we didn't break on non_breaking_exception. We still need to raise an exception so that the # call of `create_and_get_completed_partitions` knows that there was an issue with some partitions and the sync is incomplete. + failure_type = self._aggregate_failure_type(self._non_breaking_exceptions) + failure_counts = self._count_failure_types(self._non_breaking_exceptions) + summary = ", ".join( + f"{ft.value}={failure_counts[ft]}" + for ft in _FAILURE_TYPE_PRECEDENCE + if ft in failure_counts + ) raise AirbyteTracedException( - message="One or more async jobs failed after exhausting all retry attempts.", + message=_ASYNC_JOB_FAILURE_MESSAGE_BY_TYPE[failure_type], internal_message="\n".join( - [ + [f"Underlying failure breakdown: {summary}."] + + [ filter_secrets(exception.__repr__()) for exception in self._non_breaking_exceptions ] ), - failure_type=FailureType.system_error, + failure_type=failure_type, + ) + + @staticmethod + def _aggregate_failure_type(exceptions: List[Exception]) -> FailureType: + """Return the highest-precedence `FailureType` across `exceptions`. + + Non-`AirbyteTracedException` exceptions are treated as `system_error` + (matching `AirbyteTracedException`'s default). The precedence order + is `config_error` > `transient_error` > `system_error`. + """ + types_present: Set[FailureType] = { + exc.failure_type + if isinstance(exc, AirbyteTracedException) and exc.failure_type is not None + else FailureType.system_error + for exc in exceptions + } + for failure_type in _FAILURE_TYPE_PRECEDENCE: + if failure_type in types_present: + return failure_type + return FailureType.system_error + + @staticmethod + def _count_failure_types(exceptions: List[Exception]) -> Dict[FailureType, int]: + """Return a count of each `FailureType` observed in `exceptions`.""" + counts: Dict[FailureType, int] = {} + for exc in exceptions: + failure_type = ( + exc.failure_type + if isinstance(exc, AirbyteTracedException) and exc.failure_type is not None + else FailureType.system_error ) + counts[failure_type] = counts.get(failure_type, 0) + 1 + return counts def _handle_non_breaking_error(self, exception: Exception) -> None: LOGGER.error(f"Failed to start the Job: {exception}, traceback: {traceback.format_exc()}") diff --git a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py index 7fe9bcdf0..4b089dbc6 100644 --- a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py +++ b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py @@ -298,6 +298,45 @@ def test_given_exception_when_start_job_and_skip_this_exception( assert self._message_repository.emit_message.call_count == 3 # one for each traced message assert exception.failure_type == FailureType.system_error # type: ignore # exception should be of type AirbyteTracedException + def test_aggregate_failure_type_gives_config_error_highest_precedence(self) -> None: + exceptions: List[Exception] = [ + AirbyteTracedException("a", failure_type=FailureType.transient_error), + AirbyteTracedException("b", failure_type=FailureType.config_error), + AirbyteTracedException("c"), + ValueError("d"), + ] + assert AsyncJobOrchestrator._aggregate_failure_type(exceptions) == FailureType.config_error + + def test_aggregate_failure_type_prefers_transient_over_system(self) -> None: + exceptions: List[Exception] = [ + AirbyteTracedException("a"), + AirbyteTracedException("b", failure_type=FailureType.transient_error), + ValueError("c"), + ] + assert ( + AsyncJobOrchestrator._aggregate_failure_type(exceptions) == FailureType.transient_error + ) + + def test_aggregate_failure_type_defaults_to_system_error(self) -> None: + exceptions: List[Exception] = [ + ValueError("a"), + AirbyteTracedException("b"), + ] + assert AsyncJobOrchestrator._aggregate_failure_type(exceptions) == FailureType.system_error + + def test_count_failure_types_counts_traced_and_plain_exceptions(self) -> None: + exceptions: List[Exception] = [ + AirbyteTracedException("a", failure_type=FailureType.transient_error), + AirbyteTracedException("b", failure_type=FailureType.transient_error), + AirbyteTracedException("c"), + ValueError("d"), + ] + counts = AsyncJobOrchestrator._count_failure_types(exceptions) + assert counts == { + FailureType.transient_error: 2, + FailureType.system_error: 2, + } + @mock.patch(sleep_mock_target) def test_given_jobs_failed_more_than_max_attempts_when_create_and_get_completed_partitions_then_free_job_budget( self, mock_sleep: MagicMock From 755d775d91570b20c765d606c8a7b870b93fa7fa Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 13 May 2026 15:02:09 +0000 Subject: [PATCH 2/3] fix(async-job): preserve creation failure type --- .../sources/declarative/async_job/job.py | 6 +++ .../declarative/async_job/job_orchestrator.py | 32 ++++++++++-- .../async_job/test_job_orchestrator.py | 49 +++++++++++++++++++ 3 files changed, 83 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/declarative/async_job/job.py b/airbyte_cdk/sources/declarative/async_job/job.py index 597b0d199..29275821f 100644 --- a/airbyte_cdk/sources/declarative/async_job/job.py +++ b/airbyte_cdk/sources/declarative/async_job/job.py @@ -24,12 +24,14 @@ def __init__( job_parameters: StreamSlice, timeout: Optional[timedelta] = None, is_creation_failure: bool = False, + creation_failure_exception: Optional[Exception] = None, ) -> None: self._api_job_id = api_job_id self._job_parameters = job_parameters self._status = AsyncJobStatus.RUNNING self._retry_after: Optional[datetime] = None self._is_creation_failure = is_creation_failure + self._creation_failure_exception = creation_failure_exception timeout = timeout if timeout else timedelta(minutes=60) self._timer = Timer(timeout) @@ -64,6 +66,10 @@ def is_creation_failure(self) -> bool: """Return True if this job was never actually created on the API side.""" return self._is_creation_failure + def creation_failure_exception(self) -> Optional[Exception]: + """Return the exception that prevented API job creation.""" + return self._creation_failure_exception + def set_retry_after(self, retry_after: datetime) -> None: """Set the earliest time this job can be retried.""" self._retry_after = retry_after diff --git a/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py b/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py index fc6862144..306a1eb2d 100644 --- a/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py +++ b/airbyte_cdk/sources/declarative/async_job/job_orchestrator.py @@ -328,16 +328,19 @@ def _keep_api_budget_with_failed_job( # Even though we're not sure this will break the stream, we will emit here for simplicity's sake. If we wanted to be more accurate, # we would keep the exceptions in-memory until we know that we have reached the max attempt. self._message_repository.emit_message(traced_exception.as_airbyte_message()) - job = self._create_failed_job(_slice) + job = self._create_failed_job(_slice, traced_exception) self._job_tracker.add_job(intent, job.api_job_id()) return job - def _create_failed_job(self, stream_slice: StreamSlice) -> AsyncJob: + def _create_failed_job( + self, stream_slice: StreamSlice, exception: Optional[Exception] = None + ) -> AsyncJob: job = AsyncJob( f"{uuid.uuid4()} - Job that could not start", stream_slice, _NO_TIMEOUT, is_creation_failure=True, + creation_failure_exception=exception, ) job.update_status(AsyncJobStatus.FAILED) return job @@ -511,11 +514,32 @@ def _process_partitions_with_errors(self, partition: AsyncPartition) -> None: AirbyteTracedException: If at least one job could not be completed. """ status_by_job_id = {job.api_job_id(): job.status() for job in partition.jobs} + creation_failure_exceptions = [ + exception + for exception in (job.creation_failure_exception() for job in partition.jobs) + if exception is not None + ] + creation_failure_details = ( + "\n".join( + ["Creation failure exceptions:"] + + [ + filter_secrets(exception.__repr__()) + for exception in creation_failure_exceptions + ] + ) + if creation_failure_exceptions + else "See warning logs for more information." + ) + failure_type = ( + self._aggregate_failure_type(creation_failure_exceptions) + if creation_failure_exceptions + else FailureType.system_error + ) self._non_breaking_exceptions.append( AirbyteTracedException( message="Async job failed after exhausting all retry attempts.", - internal_message=f"At least one job could not be completed for slice {partition.stream_slice}. Job statuses were: {status_by_job_id}. See warning logs for more information.", - failure_type=FailureType.system_error, + internal_message=f"At least one job could not be completed for slice {partition.stream_slice}. Job statuses were: {status_by_job_id}. {creation_failure_details}", + failure_type=failure_type, ) ) diff --git a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py index d6c522719..6d1ecccfe 100644 --- a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py +++ b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py @@ -365,6 +365,55 @@ def test_count_failure_types_counts_traced_and_plain_exceptions(self) -> None: FailureType.system_error: 2, } + @mock.patch(sleep_mock_target) + def test_given_job_creation_transient_errors_when_attempts_exhausted_then_aggregate_is_transient( + self, mock_sleep: MagicMock + ) -> None: + self._job_repository.start.side_effect = [ + AirbyteTracedException( + message="API rate limit exceeded.", + internal_message="HTTP 429", + failure_type=FailureType.transient_error, + ), + AirbyteTracedException( + message="API rate limit exceeded.", + internal_message="HTTP 429", + failure_type=FailureType.transient_error, + ), + AirbyteTracedException( + message="API rate limit exceeded.", + internal_message="HTTP 429", + failure_type=FailureType.transient_error, + ), + ] + orchestrator = self._orchestrator([_A_STREAM_SLICE]) + + partitions, exception = self._accumulate_create_and_get_completed_partitions(orchestrator) + + assert len(partitions) == 0 + assert isinstance(exception, AirbyteTracedException) + assert exception.failure_type == FailureType.transient_error + assert exception.internal_message is not None + assert "Underlying failure breakdown: transient_error=1." in exception.internal_message + assert "HTTP 429" in exception.internal_message + + def test_create_failed_job_keeps_creation_failure_exception(self) -> None: + traced_exception = AirbyteTracedException( + message="API rate limit exceeded.", + internal_message="HTTP 429", + failure_type=FailureType.transient_error, + ) + orchestrator = AsyncJobOrchestrator( + self._job_repository, + [], + JobTracker(_NO_JOB_LIMIT), + self._message_repository, + ) + + job = orchestrator._create_failed_job(_A_STREAM_SLICE, traced_exception) + + assert job.creation_failure_exception() is traced_exception + @mock.patch(sleep_mock_target) def test_given_jobs_failed_more_than_max_attempts_when_create_and_get_completed_partitions_then_free_job_budget( self, mock_sleep: MagicMock From 356535fc7ec2804740695de40b198f67ec942e69 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 13 May 2026 15:19:24 +0000 Subject: [PATCH 3/3] fix(image): handle missing shared README symlinks --- airbyte_cdk/utils/docker.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/airbyte_cdk/utils/docker.py b/airbyte_cdk/utils/docker.py index 24467ff3c..2b9098514 100644 --- a/airbyte_cdk/utils/docker.py +++ b/airbyte_cdk/utils/docker.py @@ -213,6 +213,14 @@ def build_connector_image( # ensure the directory exists connector_dockerfile_dir.mkdir(parents=True, exist_ok=True) + if ( + metadata.data.language == ConnectorLanguage.PYTHON + and (connector_directory / "README.md").is_symlink() + ): + dockerfile_text = dockerfile_text.replace( + "COPY . ./\n", + "COPY . ./\nRUN if [ -L README.md ] && [ ! -e README.md ]; then rm README.md && touch README.md; fi\n", + ) dockerfile_path.write_text(dockerfile_text) dockerignore_path.write_text(dockerignore_text)