diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 6fbda8f00..6df8092be 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2628,6 +2628,40 @@ definitions: type: type: string enum: [JsonDecoder] + JsonItemsDecoder: + title: JSON Items (Streaming) + description: >- + Select 'JSON Items (Streaming)' to stream-decode a single JSON document + by yielding each element of a nested array, one at a time. Use this for + very large single-document JSON responses (e.g. a wrapping object + containing a multi-GB array) where buffering the whole document into + memory would cause out-of-memory errors. Powered by the `ijson` + streaming parser. + type: object + required: + - type + - items_path + properties: + type: + type: string + enum: [JsonItemsDecoder] + items_path: + title: Items Path + description: >- + Dot-separated path to the JSON array whose elements should be + yielded as records. Uses `ijson` path syntax (e.g. `data.users`), + not JSONPath syntax — do not include leading `$.` or trailing + `[*]`. + type: string + examples: + - dataByDepartmentAndSearchTerm + - dataByAsin + - data.users + encoding: + title: Encoding + description: Text encoding used to decode the streamed bytes before JSON parsing. + type: string + default: utf-8 JsonlDecoder: title: JSON Lines description: Select 'JSON Lines' if the response consists of JSON objects separated by new lines ('\n') in JSONL format. @@ -2869,6 +2903,7 @@ definitions: - "$ref": "#/definitions/CsvDecoder" - "$ref": "#/definitions/GzipDecoder" - "$ref": "#/definitions/JsonDecoder" + - "$ref": "#/definitions/JsonItemsDecoder" - "$ref": "#/definitions/JsonlDecoder" ListPartitionRouter: title: List Partition Router @@ -3909,6 +3944,7 @@ definitions: description: Component decoding the response so records can be extracted. 
class _Utf8TranscodingReader:
    """Read-only binary adapter that re-encodes a byte stream to UTF-8 lazily.

    `ijson` consumes bytes and expects them to be UTF-8. This adapter lets a
    stream in another text encoding be fed to it without first buffering the
    whole document: each `read()` pulls one chunk from the wrapped stream,
    decodes it incrementally (so multi-byte sequences split across chunk
    boundaries are handled), and returns the UTF-8 encoding of that text.

    Only `read` is implemented on purpose. Exposing `readinto` (or delegating
    unknown attributes to the wrapped stream) would let a caller bypass the
    transcoding.
    """

    def __init__(self, raw: BufferedIOBase, decoder: Any) -> None:
        # `decoder` is an incremental decoder instance for the source encoding
        # (e.g. `codecs.lookup(name).incrementaldecoder()`).
        self._raw = raw
        self._decoder = decoder
        self._exhausted = False

    def read(self, size: int = -1) -> bytes:
        """Return the next UTF-8 chunk, or `b""` once the stream is exhausted.

        `size` is forwarded to the wrapped stream as a read hint; the returned
        chunk may be longer than `size` because UTF-8 can need more bytes per
        character than the source encoding.
        """
        if size == 0:
            # Callers may probe with read(0) to learn the stream's data type.
            return b""
        while not self._exhausted:
            chunk = self._raw.read(size)
            if not chunk:
                self._exhausted = True
                # Flush the decoder; raises if the input ended mid-character.
                return self._decoder.decode(b"", final=True).encode("utf-8")
            text = self._decoder.decode(chunk)
            if text:
                return text.encode("utf-8")
            # The chunk held only part of a multi-byte character. Returning
            # b"" here would look like end-of-file, so read more instead.
        return b""


@dataclass
class JsonItemsParser(Parser):
    """Streaming JSON parser that yields each element of a nested array.

    Use this for very large single-document JSON responses where the records
    of interest live under a nested array (e.g. `dataByDepartmentAndSearchTerm`,
    `data.users`). Parsing is delegated to `ijson`, which reads the input
    incrementally instead of materializing the full document.

    `items_path` uses `ijson` dotted path syntax (e.g. `data.users`), not
    JSONPath syntax (`$.data.users[*]`). Internally we append `.item`, which
    is the `ijson` convention for "iterate elements of this array".

    `encoding` names the text encoding of the incoming bytes. UTF-8 input
    (the default, also used when `encoding` is None) is handed to `ijson`
    untouched; any other encoding is transcoded to UTF-8 on the fly.
    """

    items_path: str = ""
    encoding: Optional[str] = "utf-8"

    def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
        """Lazily yield every element of the array located at `items_path`.

        Raises (on first iteration, since this is a generator):
            ValueError: if `items_path` is empty.
            LookupError: if `encoding` is not a known codec name.
            UnicodeDecodeError: if the bytes are not valid in `encoding`.
        Malformed JSON surfaces as whatever error `ijson` raises.
        """
        if not self.items_path:
            raise ValueError("JsonItemsParser requires a non-empty items_path.")
        # `use_float=True`: by default ijson returns `decimal.Decimal` for
        # non-integer numbers. The orjson-based parsers in this module produce
        # plain floats, so request the same to keep records consistent and
        # JSON-serializable downstream.
        yield from ijson.items(
            self._as_utf8_stream(data), f"{self.items_path}.item", use_float=True
        )

    def _as_utf8_stream(self, data: BufferedIOBase) -> Any:
        """Return `data` itself for UTF-8 input, else a UTF-8 transcoding view of it."""
        import codecs  # local import: only needed to resolve the configured codec

        codec = codecs.lookup(self.encoding or "utf-8")
        if codec.name == "utf-8":
            # Fast path: no wrapper, so ijson can read straight from the stream.
            return data
        return _Utf8TranscodingReader(data, codec.incrementaldecoder())
def create_json_items_decoder(
    self, model: JsonItemsDecoderModel, config: Config, **kwargs: Any
) -> Decoder:
    """Build the decoder for a `JsonItemsDecoder` manifest component.

    The parser is resolved through `_get_parser`, then wrapped in a
    `CompositeRawDecoder`. The response is streamed unless
    `self._emit_connector_builder_messages` is truthy.
    """
    items_parser = ModelToComponentFactory._get_parser(model, config)
    # NOTE(review): presumably the Connector Builder needs a re-readable,
    # fully buffered response, hence no streaming in that mode; confirm.
    should_stream = not self._emit_connector_builder_messages
    return CompositeRawDecoder(parser=items_parser, stream_response=should_stream)
"ijson-3.5.0-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:9a70b575be8e57a28c80e90ed349ad3a851c3478524c70e36e07d6092ecd12c9"}, + {file = "ijson-3.5.0-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:2adeecd45830bfd5580ca79a584154713aabef0b9607e16249133df5d2859813"}, + {file = "ijson-3.5.0-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:d873e72889e7fc5962ab58909f1adff338d7c2f49e450e5b5fe844eff8155a14"}, + {file = "ijson-3.5.0-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:9a88c559456a79708592234d697645d92b599718f4cbbeaa6515f83ac63ca0ae"}, + {file = "ijson-3.5.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:cf83f58ad50dc0d39a2105cb26d4f359b38f42cef68b913170d4d47d97d97ba5"}, + {file = "ijson-3.5.0-cp310-cp310-win32.whl", hash = "sha256:aec4580a7712a19b1f95cd41bed260fc6a31266d37ef941827772a4c199e8143"}, + {file = "ijson-3.5.0-cp310-cp310-win_amd64.whl", hash = "sha256:9a9c4c70501e23e8eb1675330686d1598eebfa14b6f0dbc8f00c2e081cc628fa"}, + {file = "ijson-3.5.0-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:5616311404b858d32740b7ad8b9a799c62165f5ecb85d0a8ed16c21665a90533"}, + {file = "ijson-3.5.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:e9733f94029dd41702d573ef64752e2556e72aea14623d6dbb7a44ca1ccf30fd"}, + {file = "ijson-3.5.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:db8398c6721b98412a4f618da8022550c8b9c5d9214040646071b5deb4d4a393"}, + {file = "ijson-3.5.0-cp311-cp311-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:c061314845c08163b1784b6076ea5f075372461a32e6916f4e5f211fd4130b64"}, + {file = "ijson-3.5.0-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:1111a1c5ac79119c5d6e836f900c1a53844b50a18af38311baa6bb61e2645aca"}, + {file = "ijson-3.5.0-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = 
"sha256:1e74aff8c681c24002b61b1822f9511d4c384f324f7dbc08c78538e01fdc9fcb"}, + {file = "ijson-3.5.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:739a7229b1b0cc5f7e2785a6e7a5fc915e850d3fed9588d0e89a09f88a417253"}, + {file = "ijson-3.5.0-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:ef88712160360cab3ca6471a4e5418243f8b267cf1fe1620879d1b5558babc71"}, + {file = "ijson-3.5.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:6ca0d1b6b5f8166a6248f4309497585fb8553b04bc8179a0260fad636cfdb798"}, + {file = "ijson-3.5.0-cp311-cp311-win32.whl", hash = "sha256:966039cf9047c7967febf7b9a52ec6f38f5464a4c7fbb5565e0224b7376fefff"}, + {file = "ijson-3.5.0-cp311-cp311-win_amd64.whl", hash = "sha256:6bad6a1634cb7c9f3f4c7e52325283b35b565f5b6cc27d42660c6912ce883422"}, + {file = "ijson-3.5.0-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:1ebefbe149a6106cc848a3eaf536af51a9b5ccc9082de801389f152dba6ab755"}, + {file = "ijson-3.5.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:19e30d9f00f82e64de689c0b8651b9cfed879c184b139d7e1ea5030cec401c21"}, + {file = "ijson-3.5.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:a04a33ee78a6f27b9b8528c1ca3c207b1df3b8b867a4cf2fcc4109986f35c227"}, + {file = "ijson-3.5.0-cp312-cp312-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:7d48dc2984af02eb3c56edfb3f13b3f62f2f3e4fe36f058c8cfc75d93adf4fed"}, + {file = "ijson-3.5.0-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:f1e73a44844d9adbca9cf2c4132cd875933e83f3d4b23881fcaf82be83644c7d"}, + {file = "ijson-3.5.0-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:7389a56b8562a19948bdf1d7bae3a2edc8c7f86fb59834dcb1c4c722818e645a"}, + {file = "ijson-3.5.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:3176f23f8ebec83f374ed0c3b4e5a0c4db7ede54c005864efebbed46da123608"}, + {file = "ijson-3.5.0-cp312-cp312-musllinux_1_2_i686.whl", hash = 
"sha256:6babd88e508630c6ef86c9bebaaf13bb2fb8ec1d8f8868773a03c20253f599bc"}, + {file = "ijson-3.5.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:dc1b3836b174b6db2fa8319f1926fb5445abd195dc963368092103f8579cb8ed"}, + {file = "ijson-3.5.0-cp312-cp312-win32.whl", hash = "sha256:6673de9395fb9893c1c79a43becd8c8fbee0a250be6ea324bfd1487bb5e9ee4c"}, + {file = "ijson-3.5.0-cp312-cp312-win_amd64.whl", hash = "sha256:f4f7fabd653459dcb004175235f310435959b1bb5dfa8878578391c6cc9ad944"}, + {file = "ijson-3.5.0-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:e9cedc10e40dd6023c351ed8bfc7dcfce58204f15c321c3c1546b9c7b12562a4"}, + {file = "ijson-3.5.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:3647649f782ee06c97490b43680371186651f3f69bebe64c6083ee7615d185e5"}, + {file = "ijson-3.5.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:90e74be1dce05fce73451c62d1118671f78f47c9f6be3991c82b91063bf01fc9"}, + {file = "ijson-3.5.0-cp313-cp313-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:78e9ad73e7be2dd80627504bd5cbf512348c55ce2c06e362ed7683b5220e8568"}, + {file = "ijson-3.5.0-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:9577449313cc94be89a4fe4b3e716c65f09cc19636d5a6b2861c4e80dddebd58"}, + {file = "ijson-3.5.0-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:3e4c1178fb50aff5f5701a30a5152ead82a14e189ce0f6102fa1b5f10b2f54ff"}, + {file = "ijson-3.5.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:0eb402ab026ffb37a918d75af2b7260fe6cfbce13232cc83728a714dd30bd81d"}, + {file = "ijson-3.5.0-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:5b08ee08355f9f729612a8eb9bf69cc14f9310c3b2a487c6f1c3c65d85216ec4"}, + {file = "ijson-3.5.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:bda62b6d48442903e7bf56152108afb7f0f1293c2b9bef2f2c369defea76ab18"}, + {file = "ijson-3.5.0-cp313-cp313-win32.whl", hash = 
"sha256:8d073d9b13574cfa11083cc7267c238b7a6ed563c2661e79192da4a25f09c82c"}, + {file = "ijson-3.5.0-cp313-cp313-win_amd64.whl", hash = "sha256:2419f9e32e0968a876b04d8f26aeac042abd16f582810b576936bbc4c6015069"}, + {file = "ijson-3.5.0-cp313-cp313t-macosx_10_13_universal2.whl", hash = "sha256:4d4b0cd676b8c842f7648c1a783448fac5cd3b98289abd83711b3e275e143524"}, + {file = "ijson-3.5.0-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:252dec3680a48bb82d475e36b4ae1b3a9d7eb690b951bb98a76c5fe519e30188"}, + {file = "ijson-3.5.0-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:aa1b5dca97d323931fde2501172337384c958914d81a9dac7f00f0d4bfc76bc7"}, + {file = "ijson-3.5.0-cp313-cp313t-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:7a5ec7fd86d606094bba6f6f8f87494897102fa4584ef653f3005c51a784c320"}, + {file = "ijson-3.5.0-cp313-cp313t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:009f41443e1521847701c6d87fa3923c0b1961be3c7e7de90947c8cb92ea7c44"}, + {file = "ijson-3.5.0-cp313-cp313t-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:e4c3651d1f9fe2839a93fdf8fd1d5ca3a54975349894249f3b1b572bcc4bd577"}, + {file = "ijson-3.5.0-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:945b7abcfcfeae2cde17d8d900870f03536494245dda7ad4f8d056faa303256c"}, + {file = "ijson-3.5.0-cp313-cp313t-musllinux_1_2_i686.whl", hash = "sha256:0574b0a841ff97495c13e9d7260fbf3d85358b061f540c52a123db9dbbaa2ed6"}, + {file = "ijson-3.5.0-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:f969ffb2b89c5cdf686652d7fb66252bc72126fa54d416317411497276056a18"}, + {file = "ijson-3.5.0-cp313-cp313t-win32.whl", hash = "sha256:59d3f9f46deed1332ad669518b8099920512a78bda64c1f021fcd2aff2b36693"}, + {file = "ijson-3.5.0-cp313-cp313t-win_amd64.whl", hash = "sha256:5c2839fa233746d8aad3b8cd2354e441613f5df66d721d59da4a09394bd1db2b"}, + {file = "ijson-3.5.0-cp314-cp314-macosx_10_15_universal2.whl", hash = 
"sha256:25a5a6b2045c90bb83061df27cfa43572afa43ba9408611d7bfe237c20a731a9"}, + {file = "ijson-3.5.0-cp314-cp314-macosx_10_15_x86_64.whl", hash = "sha256:8976c54c0b864bc82b951bae06567566ac77ef63b90a773a69cd73aab47f4f4f"}, + {file = "ijson-3.5.0-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:859eb2038f7f1b0664df4241957694cc35e6295992d71c98659b22c69b3cbc10"}, + {file = "ijson-3.5.0-cp314-cp314-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:c911aa02991c7c0d3639b6619b93a93210ff1e7f58bf7225d613abea10adc78e"}, + {file = "ijson-3.5.0-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:903cbdc350173605220edc19796fbea9b2203c8b3951fb7335abfa8ed37afda8"}, + {file = "ijson-3.5.0-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:a4549d96ded5b8efa71639b2160235415f6bdb8c83367615e2dbabcb72755c33"}, + {file = "ijson-3.5.0-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:6b2dcf6349e6042d83f3f8c39ce84823cf7577eba25bac5aae5e39bbbbbe9c1c"}, + {file = "ijson-3.5.0-cp314-cp314-musllinux_1_2_i686.whl", hash = "sha256:e44af39e6f8a17e5627dcd89715d8279bf3474153ff99aae031a936e5c5572e5"}, + {file = "ijson-3.5.0-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:9260332304b7e7828db56d43f08fc970a3ab741bf84ff10189361ea1b60c395b"}, + {file = "ijson-3.5.0-cp314-cp314-win32.whl", hash = "sha256:63bc8121bb422f6969ced270173a3fa692c29d4ae30c860a2309941abd81012a"}, + {file = "ijson-3.5.0-cp314-cp314-win_amd64.whl", hash = "sha256:01b6dad72b7b7df225ef970d334556dfad46c696a2c6767fb5d9ed8889728bca"}, + {file = "ijson-3.5.0-cp314-cp314t-macosx_10_15_universal2.whl", hash = "sha256:2ea4b676ec98e374c1df400a47929859e4fa1239274339024df4716e802aa7e4"}, + {file = "ijson-3.5.0-cp314-cp314t-macosx_10_15_x86_64.whl", hash = "sha256:014586eec043e23c80be9a923c56c3a0920a0f1f7d17478ce7bc20ba443968ef"}, + {file = "ijson-3.5.0-cp314-cp314t-macosx_11_0_arm64.whl", hash = 
"sha256:d5b8b886b0248652d437f66e7c5ac318bbdcb2c7137a7e5327a68ca00b286f5f"}, + {file = "ijson-3.5.0-cp314-cp314t-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:498fd46ae2349297e43acf97cdc421e711dbd7198418677259393d2acdc62d78"}, + {file = "ijson-3.5.0-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:22a51b4f9b81f12793731cf226266d1de2112c3c04ba4a04117ad4e466897e05"}, + {file = "ijson-3.5.0-cp314-cp314t-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:9636c710dc4ac4a281baa266a64f323b4cc165cec26836af702c44328b59a515"}, + {file = "ijson-3.5.0-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:f7168a39e8211107666d71b25693fd1b2bac0b33735ef744114c403c6cac21e1"}, + {file = "ijson-3.5.0-cp314-cp314t-musllinux_1_2_i686.whl", hash = "sha256:8696454245415bc617ab03b0dc3ae4c86987df5dc6a90bad378fe72c5409d89e"}, + {file = "ijson-3.5.0-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:c21bfb61f71f191565885bf1bc29e0a186292d866b4880637b833848360bdc1b"}, + {file = "ijson-3.5.0-cp314-cp314t-win32.whl", hash = "sha256:a2619460d6795b70d0155e5bf016200ac8a63ab5397aa33588bb02b6c21759e6"}, + {file = "ijson-3.5.0-cp314-cp314t-win_amd64.whl", hash = "sha256:4f24b78d4ef028d17eb57ad1b16c0aed4a17bdd9badbf232dc5d9305b7e13854"}, + {file = "ijson-3.5.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:0ec62d397447cbe4941818c53e22b054e03250ff9cdbaea75144b11bc6db44ed"}, + {file = "ijson-3.5.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:75980237a16e5e36ad46fbdd33e3f3d817c187624974c48947df0a2bfa104b9e"}, + {file = "ijson-3.5.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:a9c321e8e1cdeac8aac698d09a90d98a049c9be8e8330c89cf2fcc517c96d51d"}, + {file = "ijson-3.5.0-cp39-cp39-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:92878b130d7ad71919c70b4f50ad23ec7fbf2d09a9c675f9179d49c4be869a63"}, + {file = 
"ijson-3.5.0-cp39-cp39-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:a1ab890d43656c1d12c4a8dafb7fac5a2278ed3e4408102e0971f48b6ed4583d"}, + {file = "ijson-3.5.0-cp39-cp39-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:a55185e8983fef0b21abc1a0bbaa11eeb2fabdc651e2167f23defa9fe4eb999b"}, + {file = "ijson-3.5.0-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:5a3af031e30751164c3289294f249f942535fbe7e8f35eb3ecc374247449214e"}, + {file = "ijson-3.5.0-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:f4c8f5ccf7230a9a94c1d836322783ed0c0ec2a151f3d53b2e0a67c89ad66970"}, + {file = "ijson-3.5.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:6e249796d2090afc1c42d2458ab0dbf0072a30ffa246b5683e3f7b9dc9b1b7f9"}, + {file = "ijson-3.5.0-cp39-cp39-win32.whl", hash = "sha256:1b2cf2c0c79313fbc607a0d90788ffb4f4614872983af4aa85c5b92533ec4da2"}, + {file = "ijson-3.5.0-cp39-cp39-win_amd64.whl", hash = "sha256:d38cb03f6b7cc26d542ff710adfe98e5f6d53878461c45456c97d3668297ec0d"}, + {file = "ijson-3.5.0-pp311-pypy311_pp73-macosx_10_15_x86_64.whl", hash = "sha256:d64c624da0e9d692d6eb0ff63a79656b59d76bf80773a17c5b0f835e4e8ef627"}, + {file = "ijson-3.5.0-pp311-pypy311_pp73-macosx_11_0_arm64.whl", hash = "sha256:876f7df73b7e0d6474f9caa729b9cdbfc8e76de9075a4887dfd689e29e85c4ca"}, + {file = "ijson-3.5.0-pp311-pypy311_pp73-manylinux1_i686.manylinux_2_28_i686.manylinux_2_5_i686.whl", hash = "sha256:e7dbff2c8d9027809b0cde663df44f3210da10ea377121d42896fb6ee405dd31"}, + {file = "ijson-3.5.0-pp311-pypy311_pp73-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:4217a1edc278660679e1197c83a1a2a2d367792bfbb2a3279577f4b59b93730d"}, + {file = "ijson-3.5.0-pp311-pypy311_pp73-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:04f0fc740311388ee745ba55a12292b722d6f52000b11acbb913982ba5fbdf87"}, + {file = "ijson-3.5.0-pp311-pypy311_pp73-win_amd64.whl", hash 
= "sha256:fdeee6957f92e0c114f65c55cf8fe7eabb80cfacab64eea6864060913173f66d"}, + {file = "ijson-3.5.0.tar.gz", hash = "sha256:94688760720e3f5212731b3cb8d30267f9a045fb38fb3870254e7b9504246f31"}, +] + [[package]] name = "importlib-metadata" version = "8.7.0" @@ -7045,4 +7151,4 @@ vector-db-based = ["cohere", "langchain_community", "langchain_core", "langchain [metadata] lock-version = "2.1" python-versions = ">=3.10,<3.14" -content-hash = "b785d39f246498c8facd7854999dbdbfb78808489a09922dd3a1551be331ea7d" +content-hash = "50fd249190f0cfd134efa5df94efe41beddbf57ceb880babc02059a8451ac5ea" diff --git a/pyproject.toml b/pyproject.toml index bcdab217b..ab6975e50 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -84,6 +84,7 @@ cryptography = ">=44.0.0,<45.0.0" # Constrained as transitive dependency due to pytz = "2024.2" pytest = {version = "^7", optional = true } orjson = "^3.10.7" +ijson = "^3.3.0" # Streaming JSON parser used by `JsonItemsParser` to handle very large response bodies without OOMing. 
# ---------------------------------------------------------------------------
# JsonItemsParser
# ---------------------------------------------------------------------------


def _make_records(count: int) -> List[dict]:
    """Return `count` distinct records of the form {"id": i, "name": "name-i"}."""
    records = []
    for index in range(count):
        records.append({"id": index, "name": f"name-{index}"})
    return records


@pytest.mark.parametrize(
    "payload, items_path, expected_count",
    [
        pytest.param(
            {"dataByDepartmentAndSearchTerm": _make_records(5)},
            "dataByDepartmentAndSearchTerm",
            5,
            id="top_level_array",
        ),
        pytest.param(
            {"data": {"users": _make_records(3)}},
            "data.users",
            3,
            id="nested_array",
        ),
        pytest.param(
            {"dataByAsin": []},
            "dataByAsin",
            0,
            id="empty_array",
        ),
    ],
)
def test_json_items_parser_yields_each_item(
    requests_mock, payload: dict, items_path: str, expected_count: int
) -> None:
    """Every element under `items_path` is emitted once, in document order."""
    body = json.dumps(payload).encode("utf-8")
    requests_mock.register_uri("GET", "https://airbyte.io/", content=body)
    response = requests.get("https://airbyte.io/", stream=True)

    parser = JsonItemsParser(items_path=items_path)
    records = list(CompositeRawDecoder(parser=parser).decode(response))

    assert len(records) == expected_count
    if not expected_count:
        return
    # Walk the payload along the dotted path to find the array we expect back.
    node = payload
    for segment in items_path.split("."):
        node = node[segment]
    assert records == node


@pytest.mark.parametrize("encoding", ["utf-8", "iso-8859-1"])
def test_json_items_parser_honors_encoding(requests_mock, encoding: str) -> None:
    """The parser accepts an `encoding` argument and still returns the records.

    NOTE(review): `json.dumps` escapes non-ASCII characters by default
    (`ensure_ascii=True`), so the body below is pure ASCII and byte-identical
    for both parametrized encodings. This test therefore does not prove that
    non-UTF-8 bytes are decoded correctly; consider `ensure_ascii=False`.
    """
    payload = {"data": [{"name": "Hé"} for _ in range(3)]}
    body = json.dumps(payload).encode(encoding)
    requests_mock.register_uri("GET", "https://airbyte.io/", content=body)
    response = requests.get("https://airbyte.io/", stream=True)

    parser = JsonItemsParser(items_path="data", encoding=encoding)
    records = list(CompositeRawDecoder(parser=parser).decode(response))

    assert records == payload["data"]


def test_json_items_parser_composes_with_gzip(requests_mock) -> None:
    """`JsonItemsParser` works as the inner parser of a `GzipParser`."""
    payload = {"dataByAsin": _make_records(4)}
    requests_mock.register_uri(
        "GET",
        "https://airbyte.io/",
        content=compress_with_gzip(json.dumps(payload)),
        headers={"Content-Encoding": "gzip"},
    )
    response = requests.get("https://airbyte.io/", stream=True)

    gzip_parser = GzipParser(inner_parser=JsonItemsParser(items_path="dataByAsin"))
    decoded = list(CompositeRawDecoder(parser=gzip_parser).decode(response))

    assert decoded == payload["dataByAsin"]


def test_json_items_parser_requires_items_path() -> None:
    """Parsing with the default (empty) `items_path` raises `ValueError`."""
    parser = JsonItemsParser()
    document = BytesIO(b'{"data": []}')
    with pytest.raises(ValueError, match="items_path"):
        # `parse` is a generator, so the check only fires once it is consumed.
        list(parser.parse(document))


def test_json_items_parser_is_lazy() -> None:
    """The parser should yield the first record before reading the entire stream."""

    class _CountingStream:
        """A file-like wrapper that counts how many bytes have been read so far."""

        def __init__(self, content: bytes) -> None:
            self._buffer = BytesIO(content)
            self.total_size = len(content)
            self.bytes_read = 0

        def read(self, size: int = -1) -> bytes:
            data = self._buffer.read(size)
            self.bytes_read = self.bytes_read + len(data)
            return data

        def readable(self) -> bool:  # pragma: no cover - interface compliance
            return True

    # Large enough that pulling one record cannot require reading the whole document.
    raw = json.dumps({"data": _make_records(10_000)}).encode("utf-8")
    stream = _CountingStream(raw)

    records = JsonItemsParser(items_path="data").parse(stream)  # type: ignore[arg-type]

    # Pull one item — we should not have consumed the entire document by this point.
    first_record = next(records)
    assert first_record == {"id": 0, "name": "name-0"}
    assert stream.bytes_read < stream.total_size