diff --git a/airbyte_cdk/sources/declarative/expanders/record_expander.py b/airbyte_cdk/sources/declarative/expanders/record_expander.py index 6b489243d..1d8ec8289 100644 --- a/airbyte_cdk/sources/declarative/expanders/record_expander.py +++ b/airbyte_cdk/sources/declarative/expanders/record_expander.py @@ -2,7 +2,6 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # -import copy from dataclasses import InitVar, dataclass from enum import Enum from typing import Any, Iterable, Mapping, MutableMapping, Sequence @@ -59,7 +58,9 @@ class RecordExpander: Items from this array will be extracted and emitted as separate records. Supports wildcards (*). remain_original_record: If True, each expanded record will include the original - parent record in an "original_record" field. Defaults to False. + parent record in an "original_record" field. The parent record is shared + (not deep-copied) across all expanded siblings, so treat it as read-only. + Defaults to False. on_no_records: Behavior when expansion produces no records. "skip" (default) emits nothing. "emit_parent" emits the original parent record unchanged. config: The user-provided configuration as specified by the source's spec. @@ -111,7 +112,7 @@ def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMap if self.remain_original_record: yield { "value": item, - "original_record": copy.deepcopy(parent_record), + "original_record": parent_record, } else: yield item @@ -125,4 +126,4 @@ def _apply_parent_context( ) -> None: """Apply parent context to a child record.""" if self.remain_original_record: - child_record["original_record"] = copy.deepcopy(parent_record) + child_record["original_record"] = parent_record diff --git a/unit_tests/sources/declarative/extractors/test_dpath_extractor.py b/unit_tests/sources/declarative/extractors/test_dpath_extractor.py index 94fb72743..37f66ed98 100644 --- a/unit_tests/sources/declarative/extractors/test_dpath_extractor.py +++ b/unit_tests/sources/declarative/extractors/test_dpath_extractor.py @@ -416,6 +416,104 @@ def test_dpath_extractor_with_expansion( assert actual_records == expected_records +def test_record_expander_shares_parent_reference_for_dict_items(): + """Expanded dict records share the same parent reference instead of deep-copying. + + This prevents O(N²) memory usage when a parent record contains a large nested + array (e.g., a Stripe invoice event with many line items). Each expanded child + record's `original_record` should be the exact same object as all siblings. + """ + parent = { + "id": "evt_123", + "type": "invoice.updated", + "data": { + "object": { + "id": "in_456", + "lines": { + "data": [ + {"id": "il_1", "amount": 100}, + {"id": "il_2", "amount": 200}, + {"id": "il_3", "amount": 300}, + ] + }, + } + }, + } + + expander = RecordExpander( + expand_records_from_field=["data", "object", "lines", "data"], + config={}, + parameters={}, + remain_original_record=True, + ) + + records = list(expander.expand_record(parent)) + assert len(records) == 3 + + # All expanded records should share the same parent reference (identity, not just equality) + for rec in records: + assert rec["original_record"] is parent + + # Values are still correct + assert records[0]["id"] == "il_1" + assert records[1]["id"] == "il_2" + assert records[2]["id"] == "il_3" + + +def test_record_expander_shares_parent_reference_for_non_dict_items(): + """Non-dict expanded records also share the same parent reference.""" + parent = {"items": [1, "a", 3.14], "meta": "info"} + + expander = RecordExpander( + expand_records_from_field=["items"], + config={}, + parameters={}, + remain_original_record=True, + ) + + records = list(expander.expand_record(parent)) + assert len(records) == 3 + + for rec in records: + assert rec["original_record"] is parent + + +def test_record_expander_large_expansion_memory_efficient(): + """Expanding a record with many children should not multiply parent memory. + + Simulates a Stripe-like scenario: an event with 500 line items. With deep + copy this would create 500 copies of the full event (O(N²) memory). With + shared references, memory usage stays O(N). + """ + n_items = 500 + parent = { + "id": "evt_large", + "type": "invoice.updated", + "data": { + "object": { + "id": "in_big", + "lines": {"data": [{"id": f"il_{i}", "amount": i * 100} for i in range(n_items)]}, + } + }, + } + + expander = RecordExpander( + expand_records_from_field=["data", "object", "lines", "data"], + config={}, + parameters={}, + remain_original_record=True, + ) + + records = list(expander.expand_record(parent)) + assert len(records) == n_items + + # All share the same parent object — no deep copies + for rec in records: + assert rec["original_record"] is parent + assert records[0]["id"] == "il_0" + assert records[-1]["id"] == f"il_{n_items - 1}" + + @pytest.mark.parametrize( "field_path, expand_records_from_field, on_no_records, body, expected_records", [