From bb5e7018f2e86eed19779bf64e5d652c6875d349 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 4 May 2026 23:23:44 +0000 Subject: [PATCH] feat(low-code-cdk): allow user-configurable check streams via hidden spec field on CheckStream Co-Authored-By: bot_apk --- CHANGELOG.md | 4 + .../declarative/checks/check_stream.py | 68 ++++- .../concurrent_declarative_source.py | 66 +++++ .../declarative_component_schema.yaml | 15 ++ .../models/declarative_component_schema.py | 6 + .../parsers/model_to_component_factory.py | 1 + .../declarative/checks/test_check_stream.py | 147 +++++++++- .../test_concurrent_declarative_source.py | 252 ++++++++++++++++++ 8 files changed, 543 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 31e73e411..2e99efa34 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ Newer updates can be found here: [GitHub Release Notes](https://github.com/airby # Changelog +## Unreleased + +low-code: Add opt-in `config_check_streams_path` to `CheckStream` so connections can supply a hidden array-of-strings config field that overrides the manifest's `stream_names` for the duration of the check, and convert the previous `ValueError` raised on unknown stream names into a `(False, message)` result that lists all unknowns and their source (config path vs. manifest). + ## 6.5.2 bugfix: Ensure that streams with partition router are not executed concurrently diff --git a/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte_cdk/sources/declarative/checks/check_stream.py index 540813a49..9e97733b7 100644 --- a/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -7,6 +7,8 @@ from dataclasses import InitVar, dataclass from typing import Any, Dict, List, Mapping, Optional, Tuple, Union +import dpath + from airbyte_cdk.sources import Source from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream @@ -41,16 +43,21 @@ class DynamicStreamCheckConfig: @dataclass class CheckStream(ConnectionChecker): - """ - Checks the connections by checking availability of one or many streams selected by the developer + """Checks the connection by verifying availability of one or many streams. Attributes: - stream_name (List[str]): names of streams to check + stream_names: Manifest-declared default stream names to check. + dynamic_streams_check_configs: Optional dynamic-stream check configs. + config_check_streams_path: Optional dot-delimited path into the + user-provided config whose value (when present and a non-empty + list) overrides `stream_names` for this check. When empty, + missing, or `None`, the manifest's `stream_names` is used. """ stream_names: List[str] parameters: InitVar[Mapping[str, Any]] dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None + config_check_streams_path: Optional[str] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._parameters = parameters @@ -63,6 +70,32 @@ def _log_error(self, logger: logging.Logger, action: str, error: Exception) -> T logger.error(error_message + f"Error traceback: \n {traceback.format_exc()}", exc_info=True) return False, error_message + def _resolve_effective_stream_names( + self, config: Mapping[str, Any] + ) -> Tuple[Optional[List[str]], Optional[str]]: + """Resolves the list of stream names to check for this connection. + + Returns a `(stream_names, error_message)` tuple. When `error_message` + is set, the caller should short-circuit with `(False, error_message)`. + When `config_check_streams_path` is unset, or the referenced value is + missing or an empty list, falls back to the manifest's `stream_names`. + """ + if not self.config_check_streams_path: + return self.stream_names, None + + configured_value = dpath.get( + dict(config), self.config_check_streams_path, separator=".", default=None + ) + if configured_value is None: + return self.stream_names, None + if not isinstance(configured_value, list): + return None, ( + f"Config field '{self.config_check_streams_path}' must be a list of stream names." + ) + if not configured_value: + return self.stream_names, None + return list(configured_value), None + def check_connection( self, source: Source, @@ -78,12 +111,31 @@ def check_connection( return self._log_error(logger, "discovering streams", error) stream_name_to_stream = {s.name: s for s in streams} - for stream_name in self.stream_names: - if stream_name not in stream_name_to_stream: - raise ValueError( - f"{stream_name} is not part of the catalog. Expected one of {list(stream_name_to_stream.keys())}." - ) + effective_stream_names, override_error = self._resolve_effective_stream_names(config) + if override_error is not None: + return False, override_error + + source_label = ( + f"config path '{self.config_check_streams_path}'" + if self.config_check_streams_path and effective_stream_names is not self.stream_names + else "manifest" + ) + + unknown_stream_names = [ + stream_name + for stream_name in (effective_stream_names or []) + if stream_name not in stream_name_to_stream + ] + if unknown_stream_names: + available = list(stream_name_to_stream.keys()) + message = ( + f"Stream(s) {unknown_stream_names} from {source_label} are not part of " + f"the catalog. Expected one of {available}." + ) + return False, message + + for stream_name in effective_stream_names or []: stream_availability, message = self._check_stream_availability( stream_name_to_stream, stream_name, logger ) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 292615692..1d1310515 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -213,6 +213,8 @@ def __init__( self._validate_source() # apply additional post-processing to the manifest self._post_process_manifest() + # inject the hidden config-check-streams-path property into the spec, if requested + self._inject_config_check_streams_property() spec: Optional[Mapping[str, Any]] = self._source_config.get("spec") self._spec_component: Optional[Spec] = ( @@ -293,6 +295,70 @@ def _post_process_manifest(self) -> None: # apply manifest normalization, if required self._normalize_manifest() + def _inject_config_check_streams_property(self) -> None: + """Injects the hidden `config_check_streams_path` property into the spec. + + When the manifest's `check` block is a `CheckStream` with a + `config_check_streams_path` set, ensure the connector spec exposes a + hidden array-of-strings property at that path so platforms can plumb + the override through. The injected property is not added to + `required`. If the spec already declares a property at that key, its + existing shape is preserved and `airbyte_hidden: true` is forced. + + Manifests without a `spec` block are left unchanged — the runtime + override still works because it reads directly from the user's + config; the injection is only meaningful for platforms that surface + the connector spec. + + v1 only supports single-level paths. Dotted paths raise a clear error + so the manifest author can simplify their config schema or wait for + v2 nesting support. + """ + check = self._source_config.get("check") + if not isinstance(check, Mapping) or check.get("type") != "CheckStream": + return + + path = check.get("config_check_streams_path") + if not isinstance(path, str) or not path: + return + + if "." in path: + raise ValueError( + "CheckStream.config_check_streams_path only supports single-level paths in v1. " + f"Got '{path}'. Declare a top-level config field and reference it without dots." + ) + + spec = self._source_config.get("spec") + if not isinstance(spec, dict): + return + connection_specification = spec.setdefault("connection_specification", {}) + if not isinstance(connection_specification, dict): + raise ValueError( + "Expected 'spec.connection_specification' to be a mapping; got " + f"{type(connection_specification).__name__}." + ) + properties = connection_specification.setdefault("properties", {}) + if not isinstance(properties, dict): + raise ValueError( + "Expected 'spec.connection_specification.properties' to be a mapping; got " + f"{type(properties).__name__}." + ) + + existing = properties.get(path) + if isinstance(existing, dict): + existing["airbyte_hidden"] = True + else: + properties[path] = { + "type": "array", + "items": {"type": "string"}, + "title": "Check Streams Override", + "description": ( + "Internal override for the streams that should be exercised during " + "connection check. Set by the platform; not exposed in the standard UI." + ), + "airbyte_hidden": True, + } + def _migrate_manifest(self) -> None: """ This method is used to migrate the manifest. It should be called after the manifest has been validated. diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 82e345ed2..9a4ccd91a 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -343,6 +343,21 @@ definitions: type: array items: "$ref": "#/definitions/DynamicStreamCheckConfig" + config_check_streams_path: + title: Config Check Streams Path + description: |- + Optional dot-delimited path into the user's config that, when present, + overrides `stream_names` for the duration of the check. The referenced + config value must be a list of stream names. When empty or missing, the + manifest's `stream_names` is used. Setting this also injects a hidden + array-of-strings property at the top level of the connector spec + (single-level paths only in v1) so platforms that allow a connection + to override the check-time streams can plumb the value through without + exposing it in the standard config UI. + type: string + examples: + - "check_streams_override" + - "advanced.check_streams" DynamicStreamCheckConfig: type: object required: diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 8bd7b1146..9fb1483c0 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1609,6 +1609,12 @@ class CheckStream(BaseModel): title="Stream Names", ) dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None + config_check_streams_path: Optional[str] = Field( + None, + description="Optional dot-delimited path into the user's config that, when present,\noverrides `stream_names` for the duration of the check. The referenced\nconfig value must be a list of stream names. When empty or missing, the\nmanifest's `stream_names` is used. Setting this also injects a hidden\narray-of-strings property at the top level of the connector spec\n(single-level paths only in v1) so platforms that allow a connection\nto override the check-time streams can plumb the value through without\nexposing it in the standard config UI.", + examples=["check_streams_override", "advanced.check_streams"], + title="Config Check Streams Path", + ) class IncrementingCountCursor(BaseModel): diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 8b806d2f5..5cfa901d0 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1269,6 +1269,7 @@ def create_check_stream( return CheckStream( stream_names=model.stream_names or [], dynamic_streams_check_configs=dynamic_streams_check_configs, + config_check_streams_path=model.config_check_streams_path, parameters={}, ) diff --git a/unit_tests/sources/declarative/checks/test_check_stream.py b/unit_tests/sources/declarative/checks/test_check_stream.py index 5821ae02e..979dcaada 100644 --- a/unit_tests/sources/declarative/checks/test_check_stream.py +++ b/unit_tests/sources/declarative/checks/test_check_stream.py @@ -40,7 +40,17 @@ (True, None), ), ("test_fail_check", None, stream_names, {}, (True, None)), - ("test_try_to_check_invalid stream", record, ["invalid_stream_name"], {}, None), + ( + "test_try_to_check_invalid_stream", + record, + ["invalid_stream_name"], + {}, + ( + False, + "Stream(s) ['invalid_stream_name'] from manifest are not part of the catalog. " + "Expected one of ['s1'].", + ), + ), ], ) @pytest.mark.parametrize("slices_as_list", [True, False]) @@ -62,12 +72,8 @@ def test_check_stream_with_slices_as_list( check_stream = CheckStream(streams_to_check, parameters={}) - if expectation: - actual = check_stream.check_connection(source, logger, config) - assert actual == expectation - else: - with pytest.raises(ValueError): - check_stream.check_connection(source, logger, config) + actual = check_stream.check_connection(source, logger, config) + assert actual == expectation def mock_read_records(responses, default_response=None, **kwargs): @@ -168,6 +174,131 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp assert message in reason +def _make_available_stream(name: str) -> MagicMock: + stream = MagicMock(spec=Stream) + stream.name = name + stream.availability_strategy = None + stream.stream_slices.return_value = [{}] + stream.read_records.side_effect = mock_read_records({frozenset({}): iter([record])}) + return stream + + +@pytest.mark.parametrize( + "config_value, expected_streams_checked, expected_result", + [ + pytest.param( + None, + ["s1"], + (True, None), + id="path_set_but_config_missing_key_falls_back_to_manifest", + ), + pytest.param( + [], + ["s1"], + (True, None), + id="path_set_but_config_value_empty_falls_back_to_manifest", + ), + pytest.param( + ["s2"], + ["s2"], + (True, None), + id="path_set_with_valid_override_uses_config_value", + ), + ], +) +def test_check_stream_with_config_override_uses_expected_streams( + config_value, expected_streams_checked, expected_result +): + s1 = _make_available_stream("s1") + s2 = _make_available_stream("s2") + source = MagicMock() + source.streams.return_value = [s1, s2] + + check_stream = CheckStream( + stream_names=["s1"], + parameters={}, + config_check_streams_path="check_streams_override", + ) + + user_config = {"check_streams_override": config_value} if config_value is not None else {} + actual = check_stream.check_connection(source, logger, user_config) + assert actual == expected_result + + name_to_stream = {"s1": s1, "s2": s2} + for stream_name in expected_streams_checked: + name_to_stream[stream_name].stream_slices.assert_called() + for stream_name, stream in name_to_stream.items(): + if stream_name not in expected_streams_checked: + stream.stream_slices.assert_not_called() + + +def test_check_stream_with_unconfigured_path_keeps_original_behavior(): + s1 = _make_available_stream("s1") + source = MagicMock() + source.streams.return_value = [s1] + + check_stream = CheckStream(stream_names=["s1"], parameters={}) + actual = check_stream.check_connection(source, logger, {"unrelated": "value"}) + assert actual == (True, None) + + +def test_check_stream_with_config_override_unknown_stream_returns_message(): + s1 = _make_available_stream("s1") + source = MagicMock() + source.streams.return_value = [s1] + + check_stream = CheckStream( + stream_names=["s1"], + parameters={}, + config_check_streams_path="check_streams_override", + ) + ok, msg = check_stream.check_connection( + source, logger, {"check_streams_override": ["does_not_exist"]} + ) + assert ok is False + assert "does_not_exist" in msg + assert "config path 'check_streams_override'" in msg + assert "Expected one of" in msg + + +def test_check_stream_with_config_override_mixed_valid_and_unknown_returns_message(): + s1 = _make_available_stream("s1") + s2 = _make_available_stream("s2") + source = MagicMock() + source.streams.return_value = [s1, s2] + + check_stream = CheckStream( + stream_names=["s1"], + parameters={}, + config_check_streams_path="check_streams_override", + ) + ok, msg = check_stream.check_connection( + source, logger, {"check_streams_override": ["s2", "does_not_exist", "also_missing"]} + ) + assert ok is False + assert "does_not_exist" in msg + assert "also_missing" in msg + assert "config path 'check_streams_override'" in msg + + +def test_check_stream_with_config_override_non_list_returns_must_be_a_list_message(): + s1 = _make_available_stream("s1") + source = MagicMock() + source.streams.return_value = [s1] + + check_stream = CheckStream( + stream_names=["s1"], + parameters={}, + config_check_streams_path="check_streams_override", + ) + ok, msg = check_stream.check_connection( + source, logger, {"check_streams_override": "not-a-list"} + ) + assert ok is False + assert "must be a list" in msg + assert "check_streams_override" in msg + + _CONFIG = { "start_date": "2024-07-01T00:00:00.000Z", "custom_streams": [ @@ -528,7 +659,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp pytest.param( {"check": {"type": "CheckStream", "stream_names": ["non_existent_stream"]}}, Status.FAILED, - True, + False, 200, [], 0, diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index bf1f61610..811c8bb86 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -5518,3 +5518,255 @@ def test_get_partition_router(stream_factory, expected_type): assert isinstance(router, SubstreamPartitionRouter) elif expected_type == "GroupingPartitionRouter": assert isinstance(router, GroupingPartitionRouter) + + +_CONFIG_OVERRIDE_KEY = "check_streams_override" + + +def _build_two_stream_check_manifest( + check_block: Mapping[str, Any], spec_block: Optional[Mapping[str, Any]] = None +) -> Dict[str, Any]: + base_stream = { + "type": "DeclarativeStream", + "primary_key": "id", + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": {"id": {"type": "integer"}}, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.test.com", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "paginator": {"type": "NoPagination"}, + }, + } + s1 = deepcopy(base_stream) + s1["name"] = "s1" + s1["retriever"]["requester"]["path"] = "/s1" + s2 = deepcopy(base_stream) + s2["name"] = "s2" + s2["retriever"]["requester"]["path"] = "/s2" + + manifest: Dict[str, Any] = { + "version": "6.7.0", + "type": "DeclarativeSource", + "streams": [s1, s2], + "check": dict(check_block), + } + if spec_block is not None: + manifest["spec"] = dict(spec_block) + return manifest + + +def test_check_stream_config_override_path_injects_hidden_spec_property(): + manifest = _build_two_stream_check_manifest( + { + "type": "CheckStream", + "stream_names": ["s1"], + "config_check_streams_path": _CONFIG_OVERRIDE_KEY, + }, + { + "type": "Spec", + "connection_specification": { + "title": "Test Spec", + "type": "object", + "required": ["api_key"], + "properties": { + "api_key": { + "type": "string", + "airbyte_secret": True, + "title": "API Key", + } + }, + }, + }, + ) + + source = ConcurrentDeclarativeSource( + source_config=manifest, config={}, catalog=None, state=None + ) + spec = source.spec(logger) + + properties = spec.connectionSpecification["properties"] + assert _CONFIG_OVERRIDE_KEY in properties + injected = properties[_CONFIG_OVERRIDE_KEY] + assert injected["airbyte_hidden"] is True + assert injected["type"] == "array" + assert injected["items"] == {"type": "string"} + assert _CONFIG_OVERRIDE_KEY not in spec.connectionSpecification.get("required", []) + + +def test_check_stream_config_override_path_injects_into_spec_with_additional_properties_false(): + manifest = _build_two_stream_check_manifest( + { + "type": "CheckStream", + "stream_names": ["s1"], + "config_check_streams_path": _CONFIG_OVERRIDE_KEY, + }, + { + "type": "Spec", + "connection_specification": { + "title": "Test Spec", + "type": "object", + "additionalProperties": False, + "required": ["api_key"], + "properties": { + "api_key": { + "type": "string", + "airbyte_secret": True, + "title": "API Key", + } + }, + }, + }, + ) + + source = ConcurrentDeclarativeSource( + source_config=manifest, config={}, catalog=None, state=None + ) + spec = source.spec(logger) + + properties = spec.connectionSpecification["properties"] + assert spec.connectionSpecification["additionalProperties"] is False + assert _CONFIG_OVERRIDE_KEY in properties + assert properties[_CONFIG_OVERRIDE_KEY]["airbyte_hidden"] is True + + +def test_check_stream_config_override_path_preserves_pre_declared_property_and_forces_hidden(): + pre_declared = { + "type": "array", + "items": {"type": "string"}, + "title": "Author-defined override", + "description": "manifest-author description", + } + manifest = _build_two_stream_check_manifest( + { + "type": "CheckStream", + "stream_names": ["s1"], + "config_check_streams_path": _CONFIG_OVERRIDE_KEY, + }, + { + "type": "Spec", + "connection_specification": { + "type": "object", + "properties": {_CONFIG_OVERRIDE_KEY: pre_declared}, + }, + }, + ) + + source = ConcurrentDeclarativeSource( + source_config=manifest, config={}, catalog=None, state=None + ) + spec = source.spec(logger) + + injected = spec.connectionSpecification["properties"][_CONFIG_OVERRIDE_KEY] + assert injected["airbyte_hidden"] is True + assert injected["title"] == "Author-defined override" + assert injected["description"] == "manifest-author description" + + +def test_check_stream_config_override_path_with_dotted_path_raises(): + manifest = _build_two_stream_check_manifest( + { + "type": "CheckStream", + "stream_names": ["s1"], + "config_check_streams_path": "advanced.check_streams", + } + ) + + with pytest.raises(ValueError, match="single-level paths"): + ConcurrentDeclarativeSource(source_config=manifest, config={}, catalog=None, state=None) + + +def test_check_stream_config_override_uses_config_value_to_check_alternate_stream(): + manifest = _build_two_stream_check_manifest( + { + "type": "CheckStream", + "stream_names": ["s1"], + "config_check_streams_path": _CONFIG_OVERRIDE_KEY, + } + ) + + config = {_CONFIG_OVERRIDE_KEY: ["s2"]} + + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url="https://api.test.com/s1"), + HttpResponse(body=json.dumps([]), status_code=500), + ) + http_mocker.get( + HttpRequest(url="https://api.test.com/s2"), + HttpResponse(body=json.dumps([{"id": 1}]), status_code=200), + ) + source = ConcurrentDeclarativeSource( + source_config=manifest, config=config, catalog=None, state=None + ) + connection_status = source.check(logger, config) + + assert connection_status.status == Status.SUCCEEDED + + +def test_check_stream_config_override_unknown_stream_returns_failed_status_without_raising(): + manifest = _build_two_stream_check_manifest( + { + "type": "CheckStream", + "stream_names": ["s1"], + "config_check_streams_path": _CONFIG_OVERRIDE_KEY, + } + ) + + config = {_CONFIG_OVERRIDE_KEY: ["does_not_exist"]} + + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url="https://api.test.com/s1"), + HttpResponse(body=json.dumps([{"id": 1}]), status_code=200), + ) + http_mocker.get( + HttpRequest(url="https://api.test.com/s2"), + HttpResponse(body=json.dumps([{"id": 1}]), status_code=200), + ) + source = ConcurrentDeclarativeSource( + source_config=manifest, config=config, catalog=None, state=None + ) + connection_status = source.check(logger, config) + + assert connection_status.status == Status.FAILED + assert "does_not_exist" in (connection_status.message or "") + + +def test_check_stream_config_override_falls_back_to_manifest_when_config_missing(): + manifest = _build_two_stream_check_manifest( + { + "type": "CheckStream", + "stream_names": ["s1"], + "config_check_streams_path": _CONFIG_OVERRIDE_KEY, + } + ) + + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url="https://api.test.com/s1"), + HttpResponse(body=json.dumps([{"id": 1}]), status_code=200), + ) + http_mocker.get( + HttpRequest(url="https://api.test.com/s2"), + HttpResponse(body=json.dumps([]), status_code=500), + ) + source = ConcurrentDeclarativeSource( + source_config=manifest, config={}, catalog=None, state=None + ) + connection_status = source.check(logger, {}) + + assert connection_status.status == Status.SUCCEEDED