Skip to content

feat: add JsonItemsDecoder for streaming large JSON responses#1026

Draft
devin-ai-integration[bot] wants to merge 4 commits into
mainfrom
devin/1778790048-streaming-json-items-decoder
Draft

feat: add JsonItemsDecoder for streaming large JSON responses#1026
devin-ai-integration[bot] wants to merge 4 commits into
mainfrom
devin/1778790048-streaming-json-items-decoder

Conversation

@devin-ai-integration
Copy link
Copy Markdown
Contributor

Summary

Adds a new declarative JsonItemsDecoder that streams elements of a nested array out of a single JSON document one at a time, so manifest-only connectors can decode multi-GB JSON responses without OOMing.

Related to https://github.com/airbytehq/oncall/issues/12143:

That issue surfaced 8 GB-cap OOM (exit code 137) on source-amazon-seller-partner's Brand Analytics streams. Today the only ways to JSON-decode a single large document in the declarative CDK are JsonDecoder (full response.contentorjson.loads) and GzipDecoder wrapping JsonDecoder (full decompress → full parse). Both materialize the entire payload in memory. The closed connector-side fix (airbytehq/airbyte#77709) added a custom Python component for this, but per maintainer feedback we want it as a first-class CDK component so any connector can opt in via YAML.

What's in this PR

CDK-side only:

  • New parser: JsonItemsParser in airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py, alongside JsonParser / JsonLineParser / CsvParser / GzipParser. Uses ijson.items(stream, f"{items_path}.item") to lazily yield each element of the configured array.
  • New schema entry: JsonItemsDecoder in declarative_component_schema.yaml with items_path (required) and encoding (default utf-8). Added to the anyOf unions for GzipDecoder.decoder, ZipfileDecoder.decoder, and the top-level decoder / download_decoder slots.
  • Pydantic models regenerated via poe assemble.
  • Factory wiring in model_to_component_factory.py: create_json_items_decoder + a new branch in _get_parser that builds JsonItemsParser(items_path=..., encoding=...).
  • Runtime dep: ijson = "^3.3.0" added to [tool.poetry.dependencies] (this is what was in the now-closed build: add ijson as a runtime dependency #1011). Since the CDK now imports ijson directly, dropped the DEP002 ignore entry that PR added.
  • Unit tests in unit_tests/sources/declarative/decoders/test_composite_decoder.py covering: top-level / nested / empty array paths, encoding, gzip composition, required-field validation, and a _CountingStream-based test confirming the parser yields the first item before consuming the full document (lazy streaming).

Example connector manifest after this lands:

download_decoder:
  type: GzipDecoder
  decoder:
    type: JsonItemsDecoder
    items_path: dataByDepartmentAndSearchTerm
download_extractor:
  type: DpathExtractor
  field_path: []

Declarative-First Evaluation

This is the declarative approach. The previous attempt used a custom Python component in the connector; the new component is a generic, reusable building block that any manifest-only connector can opt into via YAML, no custom code required. Existing declarative decoders (JsonDecoder, GzipDecoder wrapping JsonDecoder) cannot stream a single large document — they buffer the full payload — so neither can solve the OOM on their own.

Local verification

  • poetry run pytest unit_tests/sources/declarative/decoders/ -x → 60 passed
  • poetry run pytest unit_tests/sources/declarative/parsers/ -x → 157 passed
  • poetry run ruff check + ruff format --check on changed files → clean
  • poetry run mypy --config-file mypy.ini on the two modified airbyte_cdk/ files → clean

Review & Testing Checklist for Human

  • Confirm the items_path syntax (ijson dotted path with implicit .item suffix, not JSONPath) is the right ergonomics for connector authors. The schema description and parser docstring spell this out, but it differs from how DpathExtractor uses field_path.
  • Sanity-check the anyOf wiring: JsonItemsDecoder is now valid wherever JsonDecoder is valid (top-level decoder / download_decoder, inside GzipDecoder, inside ZipfileDecoder). Confirm that's the desired scope.
  • Confirm pinning to ijson = "^3.3.0" is acceptable (3.3.0 is the floor where ijson.items(stream, "path.item") behaves consistently across backends; 3.5.0 is what poetry currently resolves to).
  • Test plan: once this lands and an SDM image with ijson ships, the follow-up connector PR replaces source-amazon-seller-partner's custom GzipJsonStreamingItemsDecoder with a manifest-only JsonItemsDecoder (5 Brand Analytics streams). End-to-end memory metrics will be captured then; the CDK unit test only asserts the parser is lazy (does not read the full document before yielding the first item) and that ijson is wired correctly.

Notes

  • JsonItemsDecoder is a sibling of JsonDecoder rather than a flag on it — JSON path semantics and encoding are first-class enough that overloading JsonDecoder would muddy the schema for the common case.
  • Naming was discussed and confirmed: JsonItemsDecoder + items_path. Avoided "streaming" in the name because streaming is implicit for CompositeRawDecoder-backed parsers.
  • Follow-up: once this lands and a new SDM image with ijson is published, I'll open a connector-side PR that:
    • Deletes GzipJsonStreamingItemsDecoder from source-amazon-seller-partner/components.py
    • Switches the 5 Brand Analytics streams in manifest.yaml to JsonItemsDecoder + DpathExtractor
    • Bumps baseImage and the connector PATCH version

Link to Devin session: https://app.devin.ai/sessions/e31a7df6ebe54ce4a68e0eecc7117555

devin-ai-integration Bot and others added 2 commits May 14, 2026 20:20
Adds the ijson streaming JSON parser as a direct dependency so connectors that
ship inside the source-declarative-manifest base image can stream-parse very
large JSON response bodies without materializing the full document in memory.

Motivation: source-amazon-seller-partner currently OOMs while reading
GET_BRAND_ANALYTICS_SEARCH_TERMS_REPORT documents that can exceed 3 GB
uncompressed. See airbytehq/oncall#12143.
Adds a new declarative decoder, JsonItemsDecoder, that streams elements
of a nested array out of a single JSON document one at a time using the
ijson library. This lets manifest-only connectors decode multi-GB JSON
responses (e.g. Amazon Seller Partner Brand Analytics reports) without
loading the full document into memory.

- New `JsonItemsParser` in composite_raw_decoder.py (wraps ijson.items)
- New `JsonItemsDecoder` schema entry, wired into GzipDecoder /
  ZipfileDecoder / top-level decoder unions so it composes with the
  existing decoder hierarchy
- Pydantic models regenerated from schema
- Factory: create_json_items_decoder + JsonItemsDecoderModel handling
  in _get_parser
- Drop ijson from deptry DEP002 ignore list now that the CDK imports it
  directly; update pyproject.toml comment to reflect first-class use
- Unit tests covering top-level, nested, empty, encoding, gzip
  composition, missing path validation, and lazy streaming behavior
@devin-ai-integration
Copy link
Copy Markdown
Contributor Author

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@github-actions
Copy link
Copy Markdown

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

💡 Show Tips and Tricks

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1778790048-streaming-json-items-decoder#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1778790048-streaming-json-items-decoder

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment
📚 Show Repo Guidance

Helpful Resources

📝 Edit this welcome message.

…msDecoder

The earlier regeneration via `poe assemble` produced datamodel-code-generator drift
unrelated to this PR (Optional[conint(ge=1)] instead of Optional[int] + ge=1 kwarg,
removed ScopesJoinStrategy, reordered classes, whitespace in descriptions). That
drift broke mypy on Python 3.13. Reset the generated file to match main and add
only the new `JsonItemsDecoder` Pydantic class manually, mirroring the style of
`JsonDecoder` / `JsonlDecoder`.
@github-actions
Copy link
Copy Markdown

PyTest Results (Fast)

4 074 tests  +3 481   4 062 ✅ +3 480   7m 18s ⏱️ + 3m 47s
    1 suites ±    0      12 💤 +    2 
    1 files   ±    0       0 ❌  -     1 

Results for commit fe89f60. ± Comparison against base commit 19a7083.

This pull request skips 1 test.
unit_tests.sources.declarative.test_concurrent_declarative_source ‑ test_read_with_concurrent_and_synchronous_streams

@github-actions
Copy link
Copy Markdown

PyTest Results (Full)

4 077 tests  +8   4 065 ✅ +8   11m 13s ⏱️ +23s
    1 suites ±0      12 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit fe89f60. ± Comparison against base commit 19a7083.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

0 participants