diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index aefe01364..926e4c70a 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1859,7 +1859,63 @@ def create_custom_component(self, model: Any, config: Config, **kwargs: Any) -> for class_field in component_fields.keys() if class_field in model_args } - return custom_component_class(**kwargs) + + # Propagate the top-level api_budget to custom components that are subclasses of + # HttpRequester (and therefore accept an `api_budget` field), unless the manifest + # or an explicit kwarg has already provided one. Without this, custom requesters + # silently lose the connector-level HTTPAPIBudget and any configured rate-limit + # policies have no effect at runtime. + injected_api_budget = False + if ( + self._api_budget is not None + and "api_budget" in component_fields + and kwargs.get("api_budget") is None + and isinstance(custom_component_class, type) + and issubclass(custom_component_class, HttpRequester) + ): + kwargs["api_budget"] = self._api_budget + injected_api_budget = True + + custom_component = custom_component_class(**kwargs) + if injected_api_budget and isinstance(custom_component, HttpRequester): + self._sync_injected_api_budget_with_http_client(custom_component) + + return custom_component + + @staticmethod + def _sync_injected_api_budget_with_http_client(custom_requester: HttpRequester) -> None: + """Align an injected `api_budget` with the active `HttpClient` on custom requesters. + + Custom requesters can replace `_http_client` in `__post_init__` without forwarding + `api_budget`. If the factory injected a manifest-level budget and the replacement + client kept the default empty `APIBudget`, point both the client and its underlying + `LimiterSession`/`CachedLimiterSession` at the injected budget so rate-limiting is + actually enforced at request time. Non-`APIBudget` implementations (custom + `AbstractAPIBudget` subclasses) are left untouched. + """ + http_client = getattr(custom_requester, "_http_client", None) + http_client_api_budget = getattr(http_client, "_api_budget", None) + injected_api_budget = custom_requester.api_budget + + if ( + http_client is None + or http_client_api_budget is None + or injected_api_budget is None + or http_client_api_budget is injected_api_budget + ): + return + + if ( + isinstance(http_client_api_budget, APIBudget) + and len(http_client_api_budget._policies) == 0 + ): + http_client._api_budget = injected_api_budget + http_client_session = getattr(http_client, "_session", None) + if ( + http_client_session is not None + and getattr(http_client_session, "_api_budget", None) is http_client_api_budget + ): + http_client_session._api_budget = injected_api_budget @staticmethod def _get_class_from_fully_qualified_class_name( diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index add9a1c42..32ab11bf3 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -70,6 +70,7 @@ from airbyte_cdk.sources.declarative.models import ( CustomRecordExtractor as CustomRecordExtractorModel, ) +from airbyte_cdk.sources.declarative.models import CustomRequester as CustomRequesterModel from airbyte_cdk.sources.declarative.models import CustomSchemaLoader as CustomSchemaLoaderModel from airbyte_cdk.sources.declarative.models import DatetimeBasedCursor as DatetimeBasedCursorModel from airbyte_cdk.sources.declarative.models import DeclarativeStream as DeclarativeStreamModel @@ -4312,6 +4313,202 @@ def test_api_budget_fixed_window_policy(): assert matcher._url_path_pattern.pattern == "/v2/data" +def test_api_budget_propagated_to_custom_requester_subclass_of_http_requester(): + """Top-level `api_budget` must be forwarded to custom components that subclass `HttpRequester`. + + Without this propagation, connectors using a `CustomRequester` (i.e., a Python subclass of + `HttpRequester`) silently lose the manifest-level rate-limit policies because + `create_custom_component` does not forward `self._api_budget` the way + `create_http_requester` does. See airbytehq/oncall#12011 for the reproducer. + """ + manifest_api_budget = { + "type": "HTTPAPIBudget", + "policies": [ + { + "type": "MovingWindowCallRatePolicy", + "rates": [ + { + "type": "Rate", + "limit": 60, + "interval": "PT1M", + } + ], + "matchers": [], + } + ], + } + + custom_requester_definition = { + "type": "CustomRequester", + "class_name": "unit_tests.sources.declarative.parsers.testing_components.TestingRequester", + "url_base": "https://example.org", + "path": "/v1/data", + "http_method": "GET", + } + + config: Mapping[str, Any] = {} + local_factory = ModelToComponentFactory() + local_factory.set_api_budget(manifest_api_budget, config) + + custom_requester = local_factory.create_component( + model_type=CustomRequesterModel, + component_definition=custom_requester_definition, + config=config, + name="custom_stream", + ) + + assert isinstance(custom_requester, HttpRequester) + assert custom_requester.api_budget is not None, ( + "Manifest-level api_budget was not propagated to the CustomRequester instance" + ) + assert len(custom_requester.api_budget._policies) == 1 + policy = custom_requester.api_budget._policies[0] + assert isinstance(policy, MovingWindowCallRatePolicy) + # Verify the underlying HttpClient AND its LimiterSession both received the same + # budget: rate-limiting is enforced on the session at send() time, so asserting only + # on the client field is insufficient to prove the policies are actually active. + assert custom_requester._http_client._api_budget is custom_requester.api_budget + assert custom_requester._http_client._session._api_budget is custom_requester.api_budget + + +def test_api_budget_propagated_to_custom_requester_that_replaces_http_client(): + """Injected api budgets must survive requesters that replace `_http_client` in `__post_init__`. + + Some connector requesters call `super().__post_init__()` and then swap in a custom `HttpClient` + implementation without forwarding `api_budget`. The budget should still be applied to the active + client so manifest-level rate limits continue to work. + """ + manifest_api_budget = { + "type": "HTTPAPIBudget", + "policies": [ + { + "type": "MovingWindowCallRatePolicy", + "rates": [ + { + "type": "Rate", + "limit": 60, + "interval": "PT1M", + } + ], + "matchers": [], + } + ], + } + + custom_requester_definition = { + "type": "CustomRequester", + "class_name": "unit_tests.sources.declarative.parsers.testing_components.TestingRequesterWithReplacedHttpClient", + "url_base": "https://example.org", + "path": "/v1/data", + "http_method": "GET", + } + + config: Mapping[str, Any] = {} + local_factory = ModelToComponentFactory() + local_factory.set_api_budget(manifest_api_budget, config) + + custom_requester = local_factory.create_component( + model_type=CustomRequesterModel, + component_definition=custom_requester_definition, + config=config, + name="custom_stream", + ) + + assert isinstance(custom_requester, HttpRequester) + assert custom_requester.api_budget is not None + assert custom_requester._http_client._api_budget is custom_requester.api_budget + # The LimiterSession holds its own reference to the budget (captured at client + # construction time) and is what actually enforces rate limits on send(). Assert + # it was synced too, otherwise the injected budget is effectively inert. + assert custom_requester._http_client._session._api_budget is custom_requester.api_budget + + +def test_api_budget_not_overwriting_non_empty_budget_on_replaced_http_client(): + """A requester that intentionally installs its own budget should keep it.""" + manifest_api_budget = { + "type": "HTTPAPIBudget", + "policies": [ + { + "type": "MovingWindowCallRatePolicy", + "rates": [ + { + "type": "Rate", + "limit": 60, + "interval": "PT1M", + } + ], + "matchers": [], + } + ], + } + + custom_requester_definition = { + "type": "CustomRequester", + "class_name": "unit_tests.sources.declarative.parsers.testing_components.TestingRequesterWithReplacedHttpClientAndOwnBudget", + "url_base": "https://example.org", + "path": "/v1/data", + "http_method": "GET", + } + + config: Mapping[str, Any] = {} + local_factory = ModelToComponentFactory() + local_factory.set_api_budget(manifest_api_budget, config) + + custom_requester = local_factory.create_component( + model_type=CustomRequesterModel, + component_definition=custom_requester_definition, + config=config, + name="custom_stream", + ) + + assert isinstance(custom_requester, HttpRequester) + assert custom_requester.api_budget is not None + assert custom_requester._http_client._api_budget is not custom_requester.api_budget + assert len(custom_requester._http_client._api_budget._policies) == 1 + # The client's own budget must remain wired into its LimiterSession as well, so + # the sync step never silently swaps an intentionally-installed budget out from + # under the active session. + assert ( + custom_requester._http_client._session._api_budget + is custom_requester._http_client._api_budget + ) + + +def test_api_budget_not_propagated_to_non_http_requester_custom_components(): + """Custom components that do NOT subclass `HttpRequester` must not receive `api_budget`. + + This guards against accidentally injecting an `api_budget` kwarg into arbitrary custom + components (e.g., custom error handlers, partition routers) whose constructors would + reject the unexpected keyword. + """ + manifest_api_budget = { + "type": "HTTPAPIBudget", + "policies": [ + { + "type": "MovingWindowCallRatePolicy", + "rates": [{"type": "Rate", "limit": 1, "interval": "PT60S"}], + "matchers": [], + } + ], + } + + custom_error_handler_definition = { + "type": "CustomErrorHandler", + "class_name": "unit_tests.sources.declarative.parsers.testing_components.TestingSomeComponent", + "basic_field": "expected", + } + + config: Mapping[str, Any] = {} + local_factory = ModelToComponentFactory() + local_factory.set_api_budget(manifest_api_budget, config) + + # Must not raise TypeError about an unexpected "api_budget" kwarg. + custom_component = local_factory.create_component( + CustomErrorHandlerModel, custom_error_handler_definition, config + ) + assert custom_component.basic_field == "expected" + + def test_create_grouping_partition_router_with_underlying_router(): content = """ schema_loader: diff --git a/unit_tests/sources/declarative/parsers/testing_components.py b/unit_tests/sources/declarative/parsers/testing_components.py index 6269acb8e..eefd90936 100644 --- a/unit_tests/sources/declarative/parsers/testing_components.py +++ b/unit_tests/sources/declarative/parsers/testing_components.py @@ -3,6 +3,7 @@ # from dataclasses import dataclass, field +from datetime import timedelta from typing import Any, ClassVar, List, Mapping, Optional from airbyte_cdk.sources.declarative.extractors import DpathExtractor @@ -21,6 +22,8 @@ RequestInput, ) from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever +from airbyte_cdk.sources.streams.call_rate import APIBudget, MovingWindowCallRatePolicy, Rate +from airbyte_cdk.sources.streams.http import HttpClient @dataclass @@ -114,3 +117,43 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: parameters=parameters or {}, ) super().__post_init__(parameters) + + +@dataclass +class TestingRequesterWithReplacedHttpClient(HttpRequester): + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + super().__post_init__(parameters) + self._http_client = HttpClient( + name=self.name, + logger=self.logger, + error_handler=self.error_handler, + authenticator=self._authenticator, + use_cache=self.use_cache, + backoff_strategy=None, + disable_retries=self.disable_retries, + message_repository=self.message_repository, + ) + + +@dataclass +class TestingRequesterWithReplacedHttpClientAndOwnBudget(HttpRequester): + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + super().__post_init__(parameters) + self._http_client = HttpClient( + name=self.name, + logger=self.logger, + error_handler=self.error_handler, + api_budget=APIBudget( + policies=[ + MovingWindowCallRatePolicy( + rates=[Rate(limit=1, interval=timedelta(seconds=30))], + matchers=[], + ) + ] + ), + authenticator=self._authenticator, + use_cache=self.use_cache, + backoff_strategy=None, + disable_retries=self.disable_retries, + message_repository=self.message_repository, + )