From 2c4c3bff2f5ff1840be7da7286f5a9819faab86c Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 21 Apr 2026 23:25:33 +0000 Subject: [PATCH] feat: add any_of strategy to CheckStream for dynamic check stream selection Adds a new optional 'strategy' field on the low-code CheckStream component. - strategy: all (default) preserves existing behavior: every listed stream must be available for the check to pass. - strategy: any_of passes as soon as any listed stream is available, and only fails if every listed stream is unavailable. Short-circuits on the first success, aggregates failure reasons otherwise. This unblocks connectors where different users have scope for different streams (e.g. permission-scoped OAuth tokens) and a single hardcoded check stream would otherwise produce misleading failures. Resolves airbytehq/airbyte-internal-issues#16231 Co-Authored-By: bot_apk --- .../declarative/checks/check_stream.py | 77 +++++++++++++- .../declarative_component_schema.yaml | 16 +++ .../models/declarative_component_schema.py | 10 ++ .../parsers/model_to_component_factory.py | 1 + .../declarative/checks/test_check_stream.py | 100 ++++++++++++++++++ .../test_model_to_component_factory.py | 17 +++ 6 files changed, 216 insertions(+), 5 deletions(-) diff --git a/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte_cdk/sources/declarative/checks/check_stream.py index 540813a49..d8608aaae 100644 --- a/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -13,6 +13,10 @@ from airbyte_cdk.sources.streams.core import Stream from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy +STRATEGY_ALL = "all" +STRATEGY_ANY_OF = "any_of" +_SUPPORTED_STRATEGIES = (STRATEGY_ALL, STRATEGY_ANY_OF) + def evaluate_availability( stream: Union[Stream, AbstractStream], logger: logging.Logger @@ -42,20 +46,37 @@ class 
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
    """Complete dataclass initialisation and validate the configured strategy.

    Stores `parameters` on `self._parameters`, replaces a missing
    `dynamic_streams_check_configs` with an empty list, and rejects any
    `strategy` value that is not listed in `_SUPPORTED_STRATEGIES`.

    Args:
        parameters: The `InitVar` mapping handed to the dataclass constructor.

    Raises:
        ValueError: If `strategy` is not one of `_SUPPORTED_STRATEGIES`.
    """
    self._parameters = parameters

    if self.dynamic_streams_check_configs is None:
        self.dynamic_streams_check_configs = []

    # Guard clause: nothing more to do once the strategy is known to be valid.
    if self.strategy in _SUPPORTED_STRATEGIES:
        return

    supported = list(_SUPPORTED_STRATEGIES)
    raise ValueError(
        f"Unsupported CheckStream strategy {self.strategy!r}. Expected one of {supported}."
    )
def _check_all_stream_availability(
    self,
    stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]],
    logger: logging.Logger,
) -> Tuple[bool, Any]:
    """Require every stream in `stream_names` to be available.

    Streams are checked in the order they are listed; the first unavailable
    stream stops the loop and its result is returned unchanged.

    Returns:
        `(True, None)` when no listed stream is reported unavailable, otherwise
        the availability flag and message of the first unavailable stream.
    """
    for name in self.stream_names:
        is_available, reason = self._check_stream_availability(
            stream_name_to_stream, name, logger
        )
        if is_available:
            continue
        return is_available, reason
    return True, None


def _check_any_of_stream_availability(
    self,
    stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]],
    logger: logging.Logger,
) -> Tuple[bool, Any]:
    """Accept the check as soon as one stream in `stream_names` is available.

    Streams are checked in the order they are listed and the loop stops at the
    first available one, so later streams are never queried. When every listed
    stream is unavailable, the individual failure messages are joined with
    `"; "` into a single message.

    Returns:
        `(True, None)` on the first available stream, otherwise `False` and the
        aggregated failure message.
    """
    rejections: List[str] = []
    for name in self.stream_names:
        is_available, reason = self._check_stream_availability(
            stream_name_to_stream, name, logger
        )
        if not is_available:
            # Keep the reason so it can be reported if no stream succeeds.
            rejections.append(str(reason))
            continue
        return True, None

    details = "; ".join(rejections)
    return False, f"None of the configured check streams were available. {details}"
# Closed set of values accepted by the `strategy` field of the `CheckStream` model.
class Strategy(Enum):
    # Every listed stream must be available for the check to pass (the field's default).
    all = "all"
    # The check passes as soon as any listed stream is available and fails only
    # if every listed stream is unavailable.
    any_of = "any_of"
def _build_stream(name: str, *, available: bool, reason: str | None = None) -> MagicMock:
    """Create a `MagicMock(spec=Stream)` called `name` for the strategy tests.

    The mock has `availability_strategy` set to `None`. When `available` is
    true, `stream_slices` returns a one-item iterator (a single empty slice)
    and `read_records` returns a one-item iterator. Otherwise `stream_slices`
    returns an empty iterator and `read_records` is left unconfigured.

    Args:
        name: Value assigned to the mock's `name` attribute.
        available: Whether the mock should yield a slice and a record.
        reason: Accepted for call-site compatibility; this helper does not read it.

    Returns:
        The configured mock stream.
    """
    mocked = MagicMock(spec=Stream)
    mocked.name = name
    mocked.availability_strategy = None

    slices = [{}] if available else []
    mocked.stream_slices.return_value = iter(slices)
    if available:
        mocked.read_records.return_value = iter([MagicMock()])
    return mocked
def test_check_stream_any_of_returns_success_when_later_stream_is_available():
    """`any_of` passes when the first listed stream is unavailable but a later one is available."""
    candidate_names = ["unavailable_stream", "available_stream"]
    failing_stream = _build_stream(candidate_names[0], available=False)
    passing_stream = _build_stream(candidate_names[1], available=True)

    mock_source = MagicMock()
    mock_source.streams.return_value = [failing_stream, passing_stream]

    checker = CheckStream(candidate_names, parameters={}, strategy="any_of")
    succeeded, failure_reason = checker.check_connection(mock_source, logger, config)

    assert succeeded
    assert failure_reason is None
def test_check_stream_all_strategy_fails_if_any_stream_is_unavailable():
    """With `strategy="all"`, one unavailable stream fails the check and is named in the reason."""
    mock_streams = [
        _build_stream("s1", available=True),
        _build_stream("s2", available=False),
    ]

    mock_source = MagicMock()
    mock_source.streams.return_value = mock_streams

    checker = CheckStream(["s1", "s2"], parameters={}, strategy="all")
    succeeded, failure_reason = checker.check_connection(mock_source, logger, config)

    assert not succeeded
    assert "Stream s2 is not available" in failure_reason