Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 72 additions & 5 deletions airbyte_cdk/sources/declarative/checks/check_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy

STRATEGY_ALL = "all"
STRATEGY_ANY_OF = "any_of"
_SUPPORTED_STRATEGIES = (STRATEGY_ALL, STRATEGY_ANY_OF)


def evaluate_availability(
stream: Union[Stream, AbstractStream], logger: logging.Logger
Expand Down Expand Up @@ -42,20 +46,37 @@ class DynamicStreamCheckConfig:
@dataclass
class CheckStream(ConnectionChecker):
"""
Checks the connections by checking availability of one or many streams selected by the developer
Checks the connection by checking availability of one or many streams selected by the developer.

When multiple `stream_names` are provided, the `strategy` attribute controls how
success is evaluated:

- `all` (default): every listed stream must be available for the check to pass.
This preserves the original behavior.
- `any_of`: the check passes as soon as any listed stream is available, and only
fails if every listed stream is unavailable. This is useful for APIs where
different users have scope for different streams (for example permission-scoped
OAuth tokens).

Attributes:
stream_name (List[str]): names of streams to check
stream_names: Names of streams to check.
strategy: Evaluation strategy, either `all` or `any_of`.
dynamic_streams_check_configs: Optional configuration for dynamic stream checks.
"""

stream_names: List[str]
parameters: InitVar[Mapping[str, Any]]
strategy: str = STRATEGY_ALL
dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._parameters = parameters
if self.dynamic_streams_check_configs is None:
self.dynamic_streams_check_configs = []
if self.strategy not in _SUPPORTED_STRATEGIES:
raise ValueError(
f"Unsupported CheckStream strategy {self.strategy!r}. Expected one of {list(_SUPPORTED_STRATEGIES)}."
)

def _log_error(self, logger: logging.Logger, action: str, error: Exception) -> Tuple[bool, str]:
"""Logs an error and returns a formatted error message."""
Expand Down Expand Up @@ -84,9 +105,15 @@ def check_connection(
f"{stream_name} is not part of the catalog. Expected one of {list(stream_name_to_stream.keys())}."
)

stream_availability, message = self._check_stream_availability(
stream_name_to_stream, stream_name, logger
)
if self.stream_names:
if self.strategy == STRATEGY_ANY_OF:
stream_availability, message = self._check_any_of_stream_availability(
stream_name_to_stream, logger
)
else:
stream_availability, message = self._check_all_stream_availability(
stream_name_to_stream, logger
)
if not stream_availability:
return stream_availability, message

Expand All @@ -101,6 +128,46 @@ def check_connection(

return True, None

def _check_all_stream_availability(
self,
stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]],
logger: logging.Logger,
) -> Tuple[bool, Any]:
"""Returns success only if every stream in `stream_names` is available."""
for stream_name in self.stream_names:
stream_availability, message = self._check_stream_availability(
stream_name_to_stream, stream_name, logger
)
if not stream_availability:
return stream_availability, message
return True, None

def _check_any_of_stream_availability(
self,
stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]],
logger: logging.Logger,
) -> Tuple[bool, Any]:
"""Returns success if any stream in `stream_names` is available.

Short-circuits on the first available stream. If every listed stream is
unavailable, returns a single aggregated failure message so the user can
see why each candidate was rejected.
"""
failure_messages: List[str] = []
for stream_name in self.stream_names:
stream_availability, message = self._check_stream_availability(
stream_name_to_stream, stream_name, logger
)
if stream_availability:
return True, None
failure_messages.append(str(message))

aggregated = "; ".join(failure_messages)
return (
False,
f"None of the configured check streams were available. {aggregated}",
)

def _check_stream_availability(
self,
stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]],
Expand Down
16 changes: 16 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,22 @@ definitions:
examples:
- ["users"]
- ["users", "contacts"]
strategy:
title: Check Strategy
description: >-
Strategy used to evaluate success when multiple `stream_names` are
provided. `all` (the default) requires every listed stream to be
available for the check to pass. `any_of` passes as soon as any
listed stream is available, and only fails if every listed stream
is unavailable. `any_of` is useful for APIs where different users
have scope for different streams (for example permission-scoped
OAuth tokens) and a single hardcoded check stream would otherwise
produce misleading failures.
type: string
enum:
- all
- any_of
default: all
dynamic_streams_check_configs:
type: array
items:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1599,6 +1599,11 @@ class AuthFlow(BaseModel):
oauth_config_specification: Optional[OAuthConfigSpecification] = None


class Strategy(Enum):
all = "all"
any_of = "any_of"


class CheckStream(BaseModel):
type: Literal["CheckStream"]
stream_names: Optional[List[str]] = Field(
Expand All @@ -1607,6 +1612,11 @@ class CheckStream(BaseModel):
examples=[["users"], ["users", "contacts"]],
title="Stream Names",
)
strategy: Optional[Strategy] = Field(
Strategy.all,
description="Strategy used to evaluate success when multiple `stream_names` are provided. `all` (the default) requires every listed stream to be available for the check to pass. `any_of` passes as soon as any listed stream is available, and only fails if every listed stream is unavailable. `any_of` is useful for APIs where different users have scope for different streams (for example permission-scoped OAuth tokens) and a single hardcoded check stream would otherwise produce misleading failures.",
title="Check Strategy",
)
dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1268,6 +1268,7 @@ def create_check_stream(

return CheckStream(
stream_names=model.stream_names or [],
strategy=model.strategy.value if model.strategy is not None else "all",
dynamic_streams_check_configs=dynamic_streams_check_configs,
parameters={},
)
Expand Down
100 changes: 100 additions & 0 deletions unit_tests/sources/declarative/checks/test_check_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,106 @@ def test_check_stream_with_no_stream_slices_aborts():
assert "no stream slices were found, likely because the parent stream is empty" in reason


def _build_stream(name: str, *, available: bool, reason: str | None = None) -> MagicMock:
"""Builds a mock `Stream` whose availability is driven by stream slice results.

A stream is considered available when its `stream_slices` yields at least
one slice and `read_records` yields at least one record for that slice.
To simulate an "unavailable" stream, we make `stream_slices` yield an empty
iterator so the availability strategy reports the stream as unavailable.
"""
stream = MagicMock(spec=Stream)
stream.name = name
stream.availability_strategy = None
if available:
stream.stream_slices.return_value = iter([{}])
stream.read_records.return_value = iter([MagicMock()])
else:
stream.stream_slices.return_value = iter([])
return stream


def test_check_stream_strategy_defaults_to_all():
"""The `strategy` attribute defaults to `all` to preserve existing behavior."""
check_stream = CheckStream(["s1"], parameters={})
assert check_stream.strategy == "all"


def test_check_stream_rejects_unsupported_strategy():
with pytest.raises(ValueError, match="Unsupported CheckStream strategy"):
CheckStream(["s1"], parameters={}, strategy="first_available")


def test_check_stream_any_of_returns_success_when_first_stream_is_available():
available = _build_stream("available_stream", available=True)
unavailable = _build_stream("unavailable_stream", available=False)

source = MagicMock()
source.streams.return_value = [available, unavailable]

check_stream = CheckStream(
["available_stream", "unavailable_stream"],
parameters={},
strategy="any_of",
)
stream_is_available, reason = check_stream.check_connection(source, logger, config)

assert stream_is_available
assert reason is None
# any_of should short-circuit on the first success and skip subsequent streams.
unavailable.stream_slices.assert_not_called()


def test_check_stream_any_of_returns_success_when_later_stream_is_available():
unavailable = _build_stream("unavailable_stream", available=False)
available = _build_stream("available_stream", available=True)

source = MagicMock()
source.streams.return_value = [unavailable, available]

check_stream = CheckStream(
["unavailable_stream", "available_stream"],
parameters={},
strategy="any_of",
)
stream_is_available, reason = check_stream.check_connection(source, logger, config)

assert stream_is_available
assert reason is None


def test_check_stream_any_of_fails_only_when_all_streams_are_unavailable():
first_unavailable = _build_stream("s1", available=False)
second_unavailable = _build_stream("s2", available=False)

source = MagicMock()
source.streams.return_value = [first_unavailable, second_unavailable]

check_stream = CheckStream(["s1", "s2"], parameters={}, strategy="any_of")
stream_is_available, reason = check_stream.check_connection(source, logger, config)

assert not stream_is_available
assert "None of the configured check streams were available" in reason
# Aggregated failure message should reference both candidate streams so the
# user can see why each was rejected.
assert "Stream s1 is not available" in reason
assert "Stream s2 is not available" in reason


def test_check_stream_all_strategy_fails_if_any_stream_is_unavailable():
available = _build_stream("s1", available=True)
unavailable = _build_stream("s2", available=False)

source = MagicMock()
source.streams.return_value = [available, unavailable]

check_stream = CheckStream(["s1", "s2"], parameters={}, strategy="all")
stream_is_available, reason = check_stream.check_connection(source, logger, config)

assert not stream_is_available
assert "Stream s2 is not available" in reason


@pytest.mark.parametrize(
"test_name, response_code, available_expectation, expected_messages",
[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,23 @@ def test_create_check_stream():

assert isinstance(check, CheckStream)
assert check.stream_names == ["list_stream"]
assert check.strategy == "all"


def test_create_check_stream_with_any_of_strategy():
manifest = {
"check": {
"type": "CheckStream",
"stream_names": ["users", "contacts"],
"strategy": "any_of",
}
}

check = factory.create_component(CheckStreamModel, manifest["check"], {})

assert isinstance(check, CheckStream)
assert check.stream_names == ["users", "contacts"]
assert check.strategy == "any_of"


def test_create_component_type_mismatch():
Expand Down
Loading