Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2628,6 +2628,40 @@ definitions:
type:
type: string
enum: [JsonDecoder]
JsonItemsDecoder:
title: JSON Items (Streaming)
description: >-
Select 'JSON Items (Streaming)' to stream-decode a single JSON document
by yielding each element of a nested array, one at a time. Use this for
very large single-document JSON responses (e.g. a wrapping object
containing a multi-GB array) where buffering the whole document into
memory would cause out-of-memory errors. Powered by the `ijson`
streaming parser.
type: object
required:
- type
- items_path
properties:
type:
type: string
enum: [JsonItemsDecoder]
items_path:
title: Items Path
description: >-
Dot-separated path to the JSON array whose elements should be
yielded as records. Uses `ijson` path syntax (e.g. `data.users`),
not JSONPath syntax — do not include leading `$.` or trailing
`[*]`.
type: string
examples:
- dataByDepartmentAndSearchTerm
- dataByAsin
- data.users
encoding:
title: Encoding
description: Text encoding used to decode the streamed bytes before JSON parsing.
type: string
default: utf-8
JsonlDecoder:
title: JSON Lines
description: Select 'JSON Lines' if the response consists of JSON objects separated by new lines ('\n') in JSONL format.
Expand Down Expand Up @@ -2869,6 +2903,7 @@ definitions:
- "$ref": "#/definitions/CsvDecoder"
- "$ref": "#/definitions/GzipDecoder"
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonItemsDecoder"
- "$ref": "#/definitions/JsonlDecoder"
ListPartitionRouter:
title: List Partition Router
Expand Down Expand Up @@ -3909,6 +3944,7 @@ definitions:
description: Component decoding the response so records can be extracted.
anyOf:
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonItemsDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/CsvDecoder"
- "$ref": "#/definitions/JsonlDecoder"
Expand Down Expand Up @@ -3997,6 +4033,7 @@ definitions:
- "$ref": "#/definitions/CsvDecoder"
- "$ref": "#/definitions/GzipDecoder"
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonItemsDecoder"
- "$ref": "#/definitions/JsonlDecoder"
CsvDecoder:
title: CSV
Expand Down Expand Up @@ -4163,6 +4200,7 @@ definitions:
- "$ref": "#/definitions/CsvDecoder"
- "$ref": "#/definitions/GzipDecoder"
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonItemsDecoder"
- "$ref": "#/definitions/JsonlDecoder"
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
Expand All @@ -4175,6 +4213,7 @@ definitions:
- "$ref": "#/definitions/CsvDecoder"
- "$ref": "#/definitions/GzipDecoder"
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonItemsDecoder"
- "$ref": "#/definitions/JsonlDecoder"
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
Expand Down
28 changes: 28 additions & 0 deletions airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from io import BufferedIOBase, TextIOWrapper
from typing import Any, List, Optional

import ijson
import orjson
import requests

Expand Down Expand Up @@ -98,6 +99,33 @@ def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
logger.warning(f"Cannot decode/parse line {line!r} as JSON, error: {e}")


class _Utf8TranscodingReader:
    """Read-only file-like wrapper that re-encodes a byte stream as UTF-8.

    Only `read` is exposed on purpose: the consumer can reach the underlying
    stream solely through this method, so every byte it sees has been
    transcoded. Decoding is incremental, so a multi-byte character split
    across two reads is handled correctly and the whole payload is never
    buffered.
    """

    def __init__(self, stream: BufferedIOBase, encoding: str) -> None:
        import codecs  # local import keeps this block self-contained

        self._stream = stream
        self._decoder = codecs.getincrementaldecoder(encoding)()

    def read(self, size: int = -1) -> bytes:
        """Return up to roughly `size` source bytes re-encoded as UTF-8.

        An empty result is returned only at end of stream: if a chunk holds
        nothing but a partial character, keep reading instead of signalling
        a premature EOF to the caller.
        """
        while True:
            chunk = self._stream.read(size)
            # `final=True` on the empty EOF chunk makes the decoder raise on
            # a truncated trailing character instead of silently dropping it.
            text = self._decoder.decode(chunk, final=not chunk)
            if text or not chunk:
                return text.encode("utf-8")


@dataclass
class JsonItemsParser(Parser):
    """Streaming JSON parser that yields each element of a nested array.

    Use this for very large single-document JSON responses where the records
    of interest live under a nested array (e.g. `dataByDepartmentAndSearchTerm`,
    `data.users`). Powered by `ijson`, this parser does not materialize the
    full document — peak memory is bounded by a single record plus ijson's
    internal parse buffers, regardless of document size.

    `items_path` uses `ijson` dotted path syntax (e.g. `data.users`), not
    JSONPath syntax (`$.data.users[*]`). Internally we append `.item`, which
    is the `ijson` convention for "iterate elements of this array".

    `encoding` names the text encoding of the incoming bytes. `None` or an
    empty value falls back to UTF-8. Any non-UTF-8 encoding is transcoded to
    UTF-8 on the fly before the bytes reach `ijson`.
    """

    items_path: str = ""
    encoding: Optional[str] = "utf-8"

    def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
        """Lazily yield every element of the array located at `items_path`.

        Args:
            data: Binary stream containing a single JSON document.

        Yields:
            Each element of the target array, in document order.

        Raises:
            ValueError: If `items_path` is empty.
            LookupError: If `encoding` is not a codec known to Python.
        """
        import codecs  # local import keeps this block self-contained

        if not self.items_path:
            raise ValueError("JsonItemsParser requires a non-empty items_path.")

        # Normalize aliases ("UTF8", "utf_8", ...) so the common case keeps
        # the zero-overhead path of handing `data` to ijson untouched.
        encoding = self.encoding or "utf-8"
        source: Any = data
        if codecs.lookup(encoding).name != "utf-8":
            source = _Utf8TranscodingReader(data, encoding)

        # ijson auto-selects the best available backend (yajl2_c when present)
        # and reads from the stream lazily — it does not call `.read()` on the
        # whole stream up front.
        # `use_float=True`: by default ijson returns `decimal.Decimal` for
        # non-integer numbers, which is not JSON-serializable by orjson;
        # plain floats match what a regular JSON load produces.
        yield from ijson.items(source, f"{self.items_path}.item", use_float=True)


@dataclass
class CsvParser(Parser):
# TODO: migrate implementation to re-use file-base classes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,21 @@ class JsonDecoder(BaseModel):
type: Literal["JsonDecoder"]


class JsonItemsDecoder(BaseModel):
    # Manifest model for the streaming "JSON Items" decoder component.

    # Discriminator: must be the literal component name.
    type: Literal["JsonItemsDecoder"]

    # Required: location of the array whose elements become records.
    items_path: str = Field(
        ...,
        title="Items Path",
        description=(
            "Dot-separated path to the JSON array whose elements should be yielded as records. "
            "Uses `ijson` path syntax (e.g. `data.users`), not JSONPath syntax — "
            "do not include leading `$.` or trailing `[*]`."
        ),
        examples=["dataByDepartmentAndSearchTerm", "dataByAsin", "data.users"],
    )

    # Optional: text encoding of the response bytes; defaults to UTF-8.
    encoding: Optional[str] = Field(
        "utf-8",
        title="Encoding",
        description="Text encoding used to decode the streamed bytes before JSON parsing.",
    )


class JsonlDecoder(BaseModel):
type: Literal["JsonlDecoder"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
CompositeRawDecoder,
CsvParser,
GzipParser,
JsonItemsParser,
JsonLineParser,
JsonParser,
Parser,
Expand Down Expand Up @@ -321,6 +322,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JsonFileSchemaLoader as JsonFileSchemaLoaderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JsonItemsDecoder as JsonItemsDecoderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JsonlDecoder as JsonlDecoderModel,
)
Expand Down Expand Up @@ -763,6 +767,7 @@ def _init_mappings(self) -> None:
HttpResponseFilterModel: self.create_http_response_filter,
InlineSchemaLoaderModel: self.create_inline_schema_loader,
JsonDecoderModel: self.create_json_decoder,
JsonItemsDecoderModel: self.create_json_items_decoder,
JsonlDecoderModel: self.create_jsonl_decoder,
JsonSchemaPropertySelectorModel: self.create_json_schema_property_selector,
GzipDecoderModel: self.create_gzip_decoder,
Expand Down Expand Up @@ -2656,6 +2661,14 @@ def create_jsonl_decoder(
stream_response=False if self._emit_connector_builder_messages else True,
)

def create_json_items_decoder(
    self, model: JsonItemsDecoderModel, config: Config, **kwargs: Any
) -> Decoder:
    """Build the decoder component for a `JsonItemsDecoder` manifest model.

    The parser is resolved through `_get_parser` and wrapped in a
    `CompositeRawDecoder`. Response streaming is switched off whenever
    `_emit_connector_builder_messages` is truthy and on otherwise.
    """
    items_parser = ModelToComponentFactory._get_parser(model, config)
    # NOTE(review): streaming is presumably disabled for the Connector
    # Builder so it can inspect the full response — confirm.
    should_stream = not self._emit_connector_builder_messages
    return CompositeRawDecoder(parser=items_parser, stream_response=should_stream)

def create_gzip_decoder(
self, model: GzipDecoderModel, config: Config, **kwargs: Any
) -> Decoder:
Expand Down Expand Up @@ -2704,6 +2717,11 @@ def _get_parser(model: BaseModel, config: Config) -> Parser:
if isinstance(model, JsonDecoderModel):
# Note that the logic is a bit different from the JsonDecoder as there is some legacy that is maintained to return {} on error cases
return JsonParser()
elif isinstance(model, JsonItemsDecoderModel):
return JsonItemsParser(
items_path=model.items_path,
encoding=model.encoding or "utf-8",
)
elif isinstance(model, JsonlDecoderModel):
return JsonLineParser()
elif isinstance(model, CsvDecoderModel):
Expand Down
Loading
Loading