Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions airbyte_cdk/sources/declarative/async_job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ def __init__(
job_parameters: StreamSlice,
timeout: Optional[timedelta] = None,
is_creation_failure: bool = False,
creation_failure_exception: Optional[Exception] = None,
) -> None:
self._api_job_id = api_job_id
self._job_parameters = job_parameters
self._status = AsyncJobStatus.RUNNING
self._retry_after: Optional[datetime] = None
self._is_creation_failure = is_creation_failure
self._creation_failure_exception = creation_failure_exception

timeout = timeout if timeout else timedelta(minutes=60)
self._timer = Timer(timeout)
Expand Down Expand Up @@ -64,6 +66,10 @@ def is_creation_failure(self) -> bool:
"""Return True if this job was never actually created on the API side."""
return self._is_creation_failure

def creation_failure_exception(self) -> Optional[Exception]:
    """Expose the exception captured when API-side job creation failed.

    Returns ``None`` when the job was created normally or no exception
    was recorded at construction time.
    """
    recorded = self._creation_failure_exception
    return recorded

def set_retry_after(self, retry_after: datetime) -> None:
    """Record the earliest moment at which this job may be retried again."""
    self._retry_after = retry_after
Expand Down
102 changes: 95 additions & 7 deletions airbyte_cdk/sources/declarative/async_job/job_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from datetime import datetime, timedelta, timezone
from typing import (
Any,
Dict,
Generator,
Generic,
Iterable,
Expand Down Expand Up @@ -38,6 +39,29 @@
_NO_TIMEOUT = timedelta.max
_API_SIDE_RUNNING_STATUS = {AsyncJobStatus.RUNNING, AsyncJobStatus.TIMED_OUT}

# Precedence used to aggregate the `FailureType` of many non-breaking
# exceptions into a single value. A `config_error` means the user must act
# before retries can succeed, so it dominates. `transient_error` is next
# (retryable). `system_error` is the fallback for genuine internal failures.
# NOTE: `_aggregate_failure_type` iterates this tuple in order, so ordering
# here IS the precedence.
_FAILURE_TYPE_PRECEDENCE: Tuple[FailureType, ...] = (
    FailureType.config_error,
    FailureType.transient_error,
    FailureType.system_error,
)

# Deterministic, aggregation-friendly user-facing messages per dominant
# `FailureType`. Counts and raw exception reprs go into `internal_message`
# so that the `message` field stays stable as a log aggregation key.
# This mapping is indexed directly by the aggregated failure type, so it
# must have an entry for every member of `_FAILURE_TYPE_PRECEDENCE`
# (a missing entry would raise `KeyError` when the sync fails).
_ASYNC_JOB_FAILURE_MESSAGE_BY_TYPE: Mapping[FailureType, str] = {
    FailureType.config_error: (
        "Async jobs failed because the source API rejected the request as unauthorized or forbidden."
    ),
    FailureType.transient_error: (
        "Async jobs failed after exhausting retries for source API rate limit or transient errors."
    ),
    FailureType.system_error: "Async jobs failed after exhausting retry attempts.",
}


class AsyncPartition:
"""
Expand Down Expand Up @@ -304,16 +328,19 @@ def _keep_api_budget_with_failed_job(
# Even though we're not sure this will break the stream, we will emit here for simplicity's sake. If we wanted to be more accurate,
# we would keep the exceptions in-memory until we know that we have reached the max attempt.
self._message_repository.emit_message(traced_exception.as_airbyte_message())
job = self._create_failed_job(_slice)
job = self._create_failed_job(_slice, traced_exception)
self._job_tracker.add_job(intent, job.api_job_id())
return job

def _create_failed_job(
    self, stream_slice: StreamSlice, exception: Optional[Exception] = None
) -> AsyncJob:
    """Build a terminal placeholder job for a creation that never succeeded API-side.

    The pasted diff left the pre-change signature line dangling above the new
    one, which made this region syntactically invalid; only the new signature
    (with the backward-compatible `exception` default) is kept.

    Args:
        stream_slice: The slice this job was supposed to cover.
        exception: The exception that prevented creation, if any; stored on
            the job so later error reporting can surface it.

    Returns:
        An `AsyncJob` flagged with `is_creation_failure=True`, carrying
        `_NO_TIMEOUT` as its timeout, and already moved to `FAILED` status.
    """
    job = AsyncJob(
        # Synthetic id: the job never existed on the API, so there is no real id.
        f"{uuid.uuid4()} - Job that could not start",
        stream_slice,
        _NO_TIMEOUT,
        is_creation_failure=True,
        creation_failure_exception=exception,
    )
    job.update_status(AsyncJobStatus.FAILED)
    return job
Expand Down Expand Up @@ -487,11 +514,32 @@ def _process_partitions_with_errors(self, partition: AsyncPartition) -> None:
AirbyteTracedException: If at least one job could not be completed.
"""
status_by_job_id = {job.api_job_id(): job.status() for job in partition.jobs}
creation_failure_exceptions = [
exception
for exception in (job.creation_failure_exception() for job in partition.jobs)
if exception is not None
]
creation_failure_details = (
"\n".join(
["Creation failure exceptions:"]
+ [
filter_secrets(exception.__repr__())
for exception in creation_failure_exceptions
]
)
if creation_failure_exceptions
else "See warning logs for more information."
)
failure_type = (
self._aggregate_failure_type(creation_failure_exceptions)
if creation_failure_exceptions
else FailureType.system_error
)
self._non_breaking_exceptions.append(
AirbyteTracedException(
message="Async job failed after exhausting all retry attempts.",
internal_message=f"At least one job could not be completed for slice {partition.stream_slice}. Job statuses were: {status_by_job_id}. See warning logs for more information.",
failure_type=FailureType.system_error,
internal_message=f"At least one job could not be completed for slice {partition.stream_slice}. Job statuses were: {status_by_job_id}. {creation_failure_details}",
failure_type=failure_type,
)
)

Expand Down Expand Up @@ -536,16 +584,56 @@ def create_and_get_completed_partitions(self) -> Iterable[AsyncPartition]:
if self._non_breaking_exceptions:
# We emitted traced message but we didn't break on non_breaking_exception. We still need to raise an exception so that the
# call of `create_and_get_completed_partitions` knows that there was an issue with some partitions and the sync is incomplete.
failure_type = self._aggregate_failure_type(self._non_breaking_exceptions)
failure_counts = self._count_failure_types(self._non_breaking_exceptions)
summary = ", ".join(
f"{ft.value}={failure_counts[ft]}"
for ft in _FAILURE_TYPE_PRECEDENCE
if ft in failure_counts
)
raise AirbyteTracedException(
message="One or more async jobs failed after exhausting all retry attempts.",
message=_ASYNC_JOB_FAILURE_MESSAGE_BY_TYPE[failure_type],
internal_message="\n".join(
[
[f"Underlying failure breakdown: {summary}."]
+ [
filter_secrets(exception.__repr__())
for exception in self._non_breaking_exceptions
]
),
failure_type=FailureType.system_error,
failure_type=failure_type,
)

@staticmethod
def _aggregate_failure_type(exceptions: List[Exception]) -> FailureType:
    """Pick the dominant `FailureType` among `exceptions`.

    Anything that is not an `AirbyteTracedException`, or a traced exception
    carrying no explicit failure type, counts as `system_error` (matching
    `AirbyteTracedException`'s default). Precedence is
    `config_error` > `transient_error` > `system_error`.
    """

    def _classify(exc: Exception) -> FailureType:
        # Plain exceptions and untyped traced exceptions fall back to system_error.
        if isinstance(exc, AirbyteTracedException) and exc.failure_type is not None:
            return exc.failure_type
        return FailureType.system_error

    observed: Set[FailureType] = {_classify(exc) for exc in exceptions}
    return next(
        (candidate for candidate in _FAILURE_TYPE_PRECEDENCE if candidate in observed),
        FailureType.system_error,
    )

@staticmethod
def _count_failure_types(exceptions: List[Exception]) -> Dict[FailureType, int]:
    """Tally how many of `exceptions` fall under each `FailureType`.

    Plain exceptions and traced exceptions without an explicit type are
    counted as `system_error`, consistent with `_aggregate_failure_type`.
    """
    tally: Dict[FailureType, int] = {}
    for exception in exceptions:
        if isinstance(exception, AirbyteTracedException) and exception.failure_type is not None:
            bucket = exception.failure_type
        else:
            bucket = FailureType.system_error
        tally[bucket] = tally.get(bucket, 0) + 1
    return tally

def _handle_non_breaking_error(self, exception: Exception) -> None:
LOGGER.error(f"Failed to start the Job: {exception}, traceback: {traceback.format_exc()}")
Expand Down
8 changes: 8 additions & 0 deletions airbyte_cdk/utils/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,14 @@ def build_connector_image(

# ensure the directory exists
connector_dockerfile_dir.mkdir(parents=True, exist_ok=True)
if (
metadata.data.language == ConnectorLanguage.PYTHON
and (connector_directory / "README.md").is_symlink()
):
dockerfile_text = dockerfile_text.replace(
"COPY . ./\n",
"COPY . ./\nRUN if [ -L README.md ] && [ ! -e README.md ]; then rm README.md && touch README.md; fi\n",
)
dockerfile_path.write_text(dockerfile_text)
dockerignore_path.write_text(dockerignore_text)

Expand Down
88 changes: 88 additions & 0 deletions unit_tests/sources/declarative/async_job/test_job_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,94 @@ def test_given_exception_when_start_job_and_skip_this_exception(
assert self._message_repository.emit_message.call_count == 3 # one for each traced message
assert exception.failure_type == FailureType.system_error # type: ignore # exception should be of type AirbyteTracedException

def test_aggregate_failure_type_gives_config_error_highest_precedence(self) -> None:
    """`config_error` must outrank transient and system errors during aggregation."""
    failures: List[Exception] = [
        AirbyteTracedException("a", failure_type=FailureType.transient_error),
        AirbyteTracedException("b", failure_type=FailureType.config_error),
        AirbyteTracedException("c"),
        ValueError("d"),
    ]
    dominant = AsyncJobOrchestrator._aggregate_failure_type(failures)
    assert dominant == FailureType.config_error

def test_aggregate_failure_type_prefers_transient_over_system(self) -> None:
    """Without any config_error present, transient_error wins over system_error."""
    failures: List[Exception] = [
        AirbyteTracedException("a"),
        AirbyteTracedException("b", failure_type=FailureType.transient_error),
        ValueError("c"),
    ]
    dominant = AsyncJobOrchestrator._aggregate_failure_type(failures)
    assert dominant == FailureType.transient_error

def test_aggregate_failure_type_defaults_to_system_error(self) -> None:
    """When no exception carries an explicit type, system_error is the fallback."""
    failures: List[Exception] = [
        ValueError("a"),
        AirbyteTracedException("b"),
    ]
    dominant = AsyncJobOrchestrator._aggregate_failure_type(failures)
    assert dominant == FailureType.system_error

def test_count_failure_types_counts_traced_and_plain_exceptions(self) -> None:
    """Traced and plain exceptions are all tallied; plain ones count as system_error."""
    failures: List[Exception] = [
        AirbyteTracedException("a", failure_type=FailureType.transient_error),
        AirbyteTracedException("b", failure_type=FailureType.transient_error),
        AirbyteTracedException("c"),
        ValueError("d"),
    ]
    observed = AsyncJobOrchestrator._count_failure_types(failures)
    expected = {
        FailureType.transient_error: 2,
        FailureType.system_error: 2,
    }
    assert observed == expected

@mock.patch(sleep_mock_target)
def test_given_job_creation_transient_errors_when_attempts_exhausted_then_aggregate_is_transient(
    self, mock_sleep: MagicMock
) -> None:
    # Every start attempt raises a transient (HTTP 429) traced exception.
    # Three entries presumably match the orchestrator's maximum number of
    # creation attempts — TODO confirm against the fixture set up outside
    # this excerpt.
    self._job_repository.start.side_effect = [
        AirbyteTracedException(
            message="API rate limit exceeded.",
            internal_message="HTTP 429",
            failure_type=FailureType.transient_error,
        ),
        AirbyteTracedException(
            message="API rate limit exceeded.",
            internal_message="HTTP 429",
            failure_type=FailureType.transient_error,
        ),
        AirbyteTracedException(
            message="API rate limit exceeded.",
            internal_message="HTTP 429",
            failure_type=FailureType.transient_error,
        ),
    ]
    orchestrator = self._orchestrator([_A_STREAM_SLICE])

    partitions, exception = self._accumulate_create_and_get_completed_partitions(orchestrator)

    # No partition completes; the raised exception must aggregate to
    # transient_error and its internal_message must contain both the
    # failure-type breakdown line and the underlying exception detail.
    assert len(partitions) == 0
    assert isinstance(exception, AirbyteTracedException)
    assert exception.failure_type == FailureType.transient_error
    assert exception.internal_message is not None
    assert "Underlying failure breakdown: transient_error=1." in exception.internal_message
    assert "HTTP 429" in exception.internal_message

def test_create_failed_job_keeps_creation_failure_exception(self) -> None:
    """The placeholder failed job must retain the exact exception that blocked creation."""
    cause = AirbyteTracedException(
        message="API rate limit exceeded.",
        internal_message="HTTP 429",
        failure_type=FailureType.transient_error,
    )
    orchestrator = AsyncJobOrchestrator(
        self._job_repository,
        [],
        JobTracker(_NO_JOB_LIMIT),
        self._message_repository,
    )

    failed_job = orchestrator._create_failed_job(_A_STREAM_SLICE, cause)

    # Identity check: the same object must be stored, not a copy or repr.
    assert failed_job.creation_failure_exception() is cause

@mock.patch(sleep_mock_target)
def test_given_jobs_failed_more_than_max_attempts_when_create_and_get_completed_partitions_then_free_job_budget(
self, mock_sleep: MagicMock
Expand Down
Loading