feat: add JsonItemsDecoder for streaming large JSON responses#1026
Draft
devin-ai-integration[bot] wants to merge 4 commits into
Draft
feat: add JsonItemsDecoder for streaming large JSON responses#1026devin-ai-integration[bot] wants to merge 4 commits into
devin-ai-integration[bot] wants to merge 4 commits into
Conversation
Adds the ijson streaming JSON parser as a direct dependency so connectors that ship inside the source-declarative-manifest base image can stream-parse very large JSON response bodies without materializing the full document in memory. Motivation: source-amazon-seller-partner currently OOMs while reading GET_BRAND_ANALYTICS_SEARCH_TERMS_REPORT documents that can exceed 3 GB uncompressed. See airbytehq/oncall#12143.
Adds a new declarative decoder, JsonItemsDecoder, that streams elements of a nested array out of a single JSON document one at a time using the ijson library. This lets manifest-only connectors decode multi-GB JSON responses (e.g. Amazon Seller Partner Brand Analytics reports) without loading the full document into memory. - New `JsonItemsParser` in composite_raw_decoder.py (wraps ijson.items) - New `JsonItemsDecoder` schema entry, wired into GzipDecoder / ZipfileDecoder / top-level decoder unions so it composes with the existing decoder hierarchy - Pydantic models regenerated from schema - Factory: create_json_items_decoder + JsonItemsDecoderModel handling in _get_parser - Drop ijson from deptry DEP002 ignore list now that the CDK imports it directly; update pyproject.toml comment to reflect first-class use - Unit tests covering top-level, nested, empty, encoding, gzip composition, missing path validation, and lazy streaming behavior
Contributor
Author
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. 💡 Show Tips and TricksTesting This CDK VersionYou can test this version of the CDK using the following: # Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1778790048-streaming-json-items-decoder#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1778790048-streaming-json-items-decoderPR Slash CommandsAirbyte Maintainers can execute the following slash commands on your PR:
|
…msDecoder The earlier regeneration via `poe assemble` produced datamodel-code-generator drift unrelated to this PR (Optional[conint(ge=1)] instead of Optional[int] + ge=1 kwarg, removed ScopesJoinStrategy, reordered classes, whitespace in descriptions). That drift broke mypy on Python 3.13. Reset the generated file to match main and add only the new `JsonItemsDecoder` Pydantic class manually, mirroring the style of `JsonDecoder` / `JsonlDecoder`.
PyTest Results (Fast)4 074 tests +3 481 4 062 ✅ +3 480 7m 18s ⏱️ + 3m 47s Results for commit fe89f60. ± Comparison against base commit 19a7083. This pull request skips 1 test. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Adds a new declarative
JsonItemsDecoderthat streams elements of a nested array out of a single JSON document one at a time, so manifest-only connectors can decode multi-GB JSON responses without OOMing.Related to https://github.com/airbytehq/oncall/issues/12143:
That issue surfaced 8 GB-cap OOM (
exit code 137) on source-amazon-seller-partner's Brand Analytics streams. Today the only ways to JSON-decode a single large document in the declarative CDK areJsonDecoder(fullresponse.content→orjson.loads) andGzipDecoderwrappingJsonDecoder(full decompress → full parse). Both materialize the entire payload in memory. The closed connector-side fix (airbytehq/airbyte#77709) added a custom Python component for this, but per maintainer feedback we want it as a first-class CDK component so any connector can opt in via YAML.What's in this PR
CDK-side only:
JsonItemsParserinairbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py, alongsideJsonParser/JsonLineParser/CsvParser/GzipParser. Usesijson.items(stream, f"{items_path}.item")to lazily yield each element of the configured array.JsonItemsDecoderindeclarative_component_schema.yamlwithitems_path(required) andencoding(defaultutf-8). Added to theanyOfunions forGzipDecoder.decoder,ZipfileDecoder.decoder, and the top-leveldecoder/download_decoderslots.poe assemble.model_to_component_factory.py:create_json_items_decoder+ a new branch in_get_parserthat buildsJsonItemsParser(items_path=..., encoding=...).ijson = "^3.3.0"added to[tool.poetry.dependencies](this is what was in the now-closed build: add ijson as a runtime dependency #1011). Since the CDK now importsijsondirectly, dropped theDEP002ignore entry that PR added.unit_tests/sources/declarative/decoders/test_composite_decoder.pycovering: top-level / nested / empty array paths, encoding, gzip composition, required-field validation, and a_CountingStream-based test confirming the parser yields the first item before consuming the full document (lazy streaming).Example connector manifest after this lands:
Declarative-First Evaluation
This is the declarative approach. The previous attempt used a custom Python component in the connector; the new component is a generic, reusable building block that any manifest-only connector can opt into via YAML, no custom code required. Existing declarative decoders (
JsonDecoder,GzipDecoderwrappingJsonDecoder) cannot stream a single large document — they buffer the full payload — so neither can solve the OOM on their own.Local verification
poetry run pytest unit_tests/sources/declarative/decoders/ -x→ 60 passedpoetry run pytest unit_tests/sources/declarative/parsers/ -x→ 157 passedpoetry run ruff check+ruff format --checkon changed files → cleanpoetry run mypy --config-file mypy.inion the two modifiedairbyte_cdk/files → cleanReview & Testing Checklist for Human
items_pathsyntax (ijsondotted path with implicit.itemsuffix, not JSONPath) is the right ergonomics for connector authors. The schema description and parser docstring spell this out, but it differs from howDpathExtractorusesfield_path.anyOfwiring:JsonItemsDecoderis now valid whereverJsonDecoderis valid (top-leveldecoder/download_decoder, insideGzipDecoder, insideZipfileDecoder). Confirm that's the desired scope.ijson = "^3.3.0"is acceptable (3.3.0 is the floor whereijson.items(stream, "path.item")behaves consistently across backends; 3.5.0 is what poetry currently resolves to).ijsonships, the follow-up connector PR replacessource-amazon-seller-partner's customGzipJsonStreamingItemsDecoderwith a manifest-onlyJsonItemsDecoder(5 Brand Analytics streams). End-to-end memory metrics will be captured then; the CDK unit test only asserts the parser is lazy (does not read the full document before yielding the first item) and that ijson is wired correctly.Notes
JsonItemsDecoderis a sibling ofJsonDecoderrather than a flag on it — JSON path semantics and encoding are first-class enough that overloadingJsonDecoderwould muddy the schema for the common case.JsonItemsDecoder+items_path. Avoided "streaming" in the name because streaming is implicit forCompositeRawDecoder-backed parsers.ijsonis published, I'll open a connector-side PR that:GzipJsonStreamingItemsDecoderfromsource-amazon-seller-partner/components.pymanifest.yamltoJsonItemsDecoder+DpathExtractorbaseImageand the connector PATCH versionLink to Devin session: https://app.devin.ai/sessions/e31a7df6ebe54ce4a68e0eecc7117555