From 7062a57221dcdfacccaf7ea9e83c434ea0633681 Mon Sep 17 00:00:00 2001 From: Christian Bush Date: Mon, 9 Mar 2026 13:29:24 -0700 Subject: [PATCH 1/7] [DataLoader] Add pyiceberg ArrivalOrder support via upstream PR MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Point pyiceberg dependency at sumedhsakdeo/iceberg-python#3046 which adds ArrivalOrder — bounded-memory concurrent record batch streaming — to ArrowScan.to_record_batches(). Co-Authored-By: Sumedh Sakdeo --- integrations/python/dataloader/pyproject.toml | 3 + .../dataloader/tests/test_arrival_order.py | 132 ++++++++++++++++++ integrations/python/dataloader/uv.lock | 21 +-- 3 files changed, 137 insertions(+), 19 deletions(-) create mode 100644 integrations/python/dataloader/tests/test_arrival_order.py diff --git a/integrations/python/dataloader/pyproject.toml b/integrations/python/dataloader/pyproject.toml index 44c8ec0f8..04aa29892 100644 --- a/integrations/python/dataloader/pyproject.toml +++ b/integrations/python/dataloader/pyproject.toml @@ -32,6 +32,9 @@ disallow_untyped_defs = true module = ["datafusion.*", "pyiceberg.*", "pyarrow.*", "tenacity.*"] ignore_missing_imports = true +[tool.uv.sources] +pyiceberg = { git = "https://github.com/sumedhsakdeo/iceberg-python", branch = "fix/arrow-scan-benchmark-3036" } + [tool.ruff] line-length = 120 target-version = "py312" diff --git a/integrations/python/dataloader/tests/test_arrival_order.py b/integrations/python/dataloader/tests/test_arrival_order.py new file mode 100644 index 000000000..17033e22c --- /dev/null +++ b/integrations/python/dataloader/tests/test_arrival_order.py @@ -0,0 +1,132 @@ +"""Tests verifying the ArrivalOrder API from pyiceberg PR #3046 is available and functional. + +These tests confirm that the openhouse dataloader can access the new ScanOrder class hierarchy +added upstream (apache/iceberg-python#3046) and that ArrowScan.to_record_batches accepts the +order parameter. +""" + +import os + +import pyarrow as pa +import pyarrow.parquet as pq +import pytest +from pyiceberg.expressions import AlwaysTrue +from pyiceberg.io import load_file_io +from pyiceberg.io.pyarrow import ArrowScan +from pyiceberg.manifest import DataFile, FileFormat +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC +from pyiceberg.schema import Schema +from pyiceberg.table import ArrivalOrder, FileScanTask, ScanOrder, TaskOrder +from pyiceberg.table.metadata import new_table_metadata +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER +from pyiceberg.types import LongType, NestedField, StringType + +_SCHEMA = Schema( + NestedField(field_id=1, name="id", field_type=LongType(), required=False), + NestedField(field_id=2, name="name", field_type=StringType(), required=False), +) + + +def _write_parquet(tmp_path: object, table: pa.Table) -> str: + """Write a parquet file with Iceberg field IDs and return its path.""" + file_path = str(tmp_path / "test.parquet") # type: ignore[operator] + fields = [field.with_metadata({b"PARQUET:field_id": str(i + 1).encode()}) for i, field in enumerate(table.schema)] + pq.write_table(table.cast(pa.schema(fields)), file_path) + return file_path + + +def _make_arrow_scan_and_task(tmp_path: object, table: pa.Table) -> tuple[ArrowScan, FileScanTask]: + """Create an ArrowScan and FileScanTask from a PyArrow table written to disk.""" + file_path = _write_parquet(tmp_path, table) + + metadata = new_table_metadata( + schema=_SCHEMA, + partition_spec=UNPARTITIONED_PARTITION_SPEC, + sort_order=UNSORTED_SORT_ORDER, + location=str(tmp_path), + properties={}, + ) + + arrow_scan = ArrowScan( + table_metadata=metadata, + io=load_file_io(properties={}, location=file_path), + projected_schema=_SCHEMA, + row_filter=AlwaysTrue(), + ) + + data_file = DataFile.from_args( + file_path=file_path, + file_format=FileFormat.PARQUET, + record_count=table.num_rows, + file_size_in_bytes=os.path.getsize(file_path), + ) + data_file._spec_id = 0 + task = FileScanTask(data_file=data_file) + + return arrow_scan, task + + +def _sample_table() -> pa.Table: + return pa.table( + { + "id": pa.array([1, 2, 3], type=pa.int64()), + "name": pa.array(["alice", "bob", "charlie"], type=pa.string()), + } + ) + + +class TestScanOrderImports: + """Verify the ScanOrder class hierarchy is importable from pyiceberg.table.""" + + def test_scan_order_base_class_exists(self) -> None: + assert ScanOrder is not None + + def test_task_order_is_scan_order(self) -> None: + assert issubclass(TaskOrder, ScanOrder) + + def test_arrival_order_is_scan_order(self) -> None: + assert issubclass(ArrivalOrder, ScanOrder) + + def test_arrival_order_default_params(self) -> None: + ao = ArrivalOrder() + assert ao.concurrent_streams == 8 + assert ao.batch_size is None + assert ao.max_buffered_batches == 16 + + def test_arrival_order_custom_params(self) -> None: + ao = ArrivalOrder(concurrent_streams=4, batch_size=32768, max_buffered_batches=8) + assert ao.concurrent_streams == 4 + assert ao.batch_size == 32768 + assert ao.max_buffered_batches == 8 + + def test_arrival_order_rejects_invalid_concurrent_streams(self) -> None: + with pytest.raises(ValueError, match="concurrent_streams"): + ArrivalOrder(concurrent_streams=0) + + def test_arrival_order_rejects_invalid_max_buffered_batches(self) -> None: + with pytest.raises(ValueError, match="max_buffered_batches"): + ArrivalOrder(max_buffered_batches=0) + + +class TestToRecordBatchesOrder: + """Verify ArrowScan.to_record_batches accepts the order parameter and returns correct data.""" + + def test_default_order_returns_all_rows(self, tmp_path: object) -> None: + """Default (TaskOrder) still works — backward compatible.""" + arrow_scan, task = _make_arrow_scan_and_task(tmp_path, _sample_table()) + batches = list(arrow_scan.to_record_batches([task])) + result = pa.Table.from_batches(batches).sort_by("id") + assert result.column("id").to_pylist() == [1, 2, 3] + + def test_explicit_task_order_returns_all_rows(self, tmp_path: object) -> None: + arrow_scan, task = _make_arrow_scan_and_task(tmp_path, _sample_table()) + batches = list(arrow_scan.to_record_batches([task], order=TaskOrder())) + result = pa.Table.from_batches(batches).sort_by("id") + assert result.column("id").to_pylist() == [1, 2, 3] + + def test_arrival_order_returns_all_rows(self, tmp_path: object) -> None: + arrow_scan, task = _make_arrow_scan_and_task(tmp_path, _sample_table()) + batches = list(arrow_scan.to_record_batches([task], order=ArrivalOrder(concurrent_streams=2))) + result = pa.Table.from_batches(batches).sort_by("id") + assert result.column("id").to_pylist() == [1, 2, 3] + assert result.column("name").to_pylist() == ["alice", "bob", "charlie"] diff --git a/integrations/python/dataloader/uv.lock b/integrations/python/dataloader/uv.lock index b9ad8b32d..5108decf4 100644 --- a/integrations/python/dataloader/uv.lock +++ b/integrations/python/dataloader/uv.lock @@ -571,7 +571,7 @@ dev = [ requires-dist = [ { name = "datafusion", specifier = "==51.0.0" }, { name = "mypy", marker = "extra == 'dev'", specifier = ">=1.14.0" }, - { name = "pyiceberg", specifier = "~=0.11.0" }, + { name = "pyiceberg", git = "https://github.com/sumedhsakdeo/iceberg-python?branch=fix%2Farrow-scan-benchmark-3036" }, { name = "pytest", marker = "extra == 'dev'", specifier = ">=8.0.0" }, { name = "requests", specifier = ">=2.31.0" }, { name = "responses", marker = "extra == 'dev'", specifier = ">=0.25.0" }, @@ -759,7 +759,7 @@ wheels = [ [[package]] name = "pyiceberg" version = "0.11.0" -source = { registry = "https://pypi.org/simple" } +source = { git = "https://github.com/sumedhsakdeo/iceberg-python?branch=fix%2Farrow-scan-benchmark-3036#75ba28bfc6d8bbeac398357c6db80327632a2dc8" } dependencies = [ { name = "cachetools" }, { name = "click" }, @@ -774,23 +774,6 @@ dependencies = [ { name = "tenacity" }, { name = "zstandard" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/bd/22/3d02ad39710bf51834d108e6d548cee9c1916850460ccba80db47a982567/pyiceberg-0.11.0.tar.gz", hash = "sha256:095bbafc87d204cf8d3ffc1c434e07cf9a67a709192ac0b11dcb0f8251f7ad4e", size = 1074873, upload-time = "2026-02-10T02:28:20.762Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/c6/37/b5a818444f5563ee2dacac93cc690e63396ab60308be353502dc7008168b/pyiceberg-0.11.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:6fc89c9581d42ff2383cc9ba3f443ab9f175d8e85216ecbd819e955e9069bc46", size = 532694, upload-time = "2026-02-10T02:28:01.298Z" }, - { url = "https://files.pythonhosted.org/packages/7d/f9/ef76d6cf62a7ba9d61a5e20216000d4b366d8eac3be5c89c2ce5c8eb38f9/pyiceberg-0.11.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:e2dfdf5438cc5ad8eb8b2e3f7a41ab6f286fe8b6fd6f5c1407381f627097e2e0", size = 532901, upload-time = "2026-02-10T02:28:02.517Z" }, - { url = "https://files.pythonhosted.org/packages/15/2a/bcec7d0ca75259cdb83ddceee1c59cdad619d2dfe36cee802c7e7207d96a/pyiceberg-0.11.0-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:4543e93c78bb4fd78da7093c8232d62487a68661ba6bff0bafc0b346b34ca38c", size = 729261, upload-time = "2026-02-10T02:28:03.694Z" }, - { url = "https://files.pythonhosted.org/packages/99/ff/db75a2062a0b4b64ad0a6c677cab5b6e3ac19e0820584c597e1822f2cf7c/pyiceberg-0.11.0-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:8dda2ad8d57e3af743ab67d976a23ca1cd54a4849110b5c2375f5d9466a4ae80", size = 729979, upload-time = "2026-02-10T02:28:04.878Z" }, - { url = "https://files.pythonhosted.org/packages/d8/eb/453e8c4a7e6eb698bf1402337e3cd3516f20c4bbe0f06961d3e6c5031cca/pyiceberg-0.11.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:b5999fb41ea0b4b153a5c80d56512ef0596f95fdd62512d1806b8db89fd4a5f9", size = 723778, upload-time = "2026-02-10T02:28:06.573Z" }, - { url = "https://files.pythonhosted.org/packages/c8/7b/4f38016722ecc04f97000f7b7f80ba1d74e66dcbf630a4c2b620b5393ce0/pyiceberg-0.11.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:63c76f882ad30bda5b5fc685c6ab053e5b5585eadab04d1afc515eec4e272b14", size = 726955, upload-time = "2026-02-10T02:28:08.684Z" }, - { url = "https://files.pythonhosted.org/packages/56/14/dc689c0637d7f6716cae614afcce5782903cc87a781dfd47e6d6e72ce104/pyiceberg-0.11.0-cp312-cp312-win_amd64.whl", hash = "sha256:4bb26a9308e8bb97c1d3518209d221f2a790a37b9806b8b91fee4c47be4919a6", size = 531019, upload-time = "2026-02-10T02:28:10.333Z" }, - { url = "https://files.pythonhosted.org/packages/c6/72/ef1e816d79d703eec1182398947a6b72f502eefeee01c4484bd5e1493b07/pyiceberg-0.11.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:c707f4463dd9c1ca664d41d5ddd38babadf1bf5fa1946cb591c033a6a2827eb4", size = 532359, upload-time = "2026-02-10T02:28:11.473Z" }, - { url = "https://files.pythonhosted.org/packages/1f/41/ec85279b1b8ed57d0d27d4675203d314b8f5d69383e1df68f615f45e9dda/pyiceberg-0.11.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:f1c944969fda799a2d26dc6f57448ace44ee07e334306ba6f5110df1aadeeef1", size = 532496, upload-time = "2026-02-10T02:28:13.19Z" }, - { url = "https://files.pythonhosted.org/packages/b9/b4/02861c450057c9a6e2f2e1eb0ef735c2e28473cff60b2747c50d0427ec1c/pyiceberg-0.11.0-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:1be075b9ecc175b8dd76822b081b379ce33cda33d6403eaf607268f6061f3275", size = 721917, upload-time = "2026-02-10T02:28:14.484Z" }, - { url = "https://files.pythonhosted.org/packages/16/cf/924b7b14267d47f5055bb5d032c7d24eb9542ac3631b460e1398fe9935ea/pyiceberg-0.11.0-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:a3507d079d43d724bffb80e75201f2995822af844b674642dcf73c19d5303994", size = 723754, upload-time = "2026-02-10T02:28:15.77Z" }, - { url = "https://files.pythonhosted.org/packages/24/a1/df2d73af6dc3ee301e727d0bef4421c57de02b5030cf38e39ed25ef36154/pyiceberg-0.11.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:eb3719cd61a0512596b4306283072de443d84ec7b68654f565b0d7c2d7cdeeeb", size = 715749, upload-time = "2026-02-10T02:28:17.034Z" }, - { url = "https://files.pythonhosted.org/packages/8e/0a/c3cdcd5ed417aceb2f73e8463d97e8dd7e3f7021015d0c8d51394a5c5a63/pyiceberg-0.11.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:b9a71fd6b1c3c625ed2a9ca2cecf0dc8713acc5814e78c9becde3b1f42315c35", size = 720600, upload-time = "2026-02-10T02:28:18.275Z" }, - { url = "https://files.pythonhosted.org/packages/01/b8/29ec7281fb831ab983f953b00924c1cc3ebc21e9f67a1466af9b63767ba4/pyiceberg-0.11.0-cp313-cp313-win_amd64.whl", hash = "sha256:bed2df9eb7e1496af22fa2307dbd13f29865b98ba5851695ffd1f4436edc05f9", size = 530631, upload-time = "2026-02-10T02:28:19.561Z" }, -] [[package]] name = "pyparsing" From 9047297a8f2f7448f19534aa69c435e85f17c464 Mon Sep 17 00:00:00 2001 From: Christian Bush Date: Mon, 9 Mar 2026 14:15:59 -0700 Subject: [PATCH 2/7] Pin pyiceberg source to SHA instead of branch Co-Authored-By: Sumedh Sakdeo --- integrations/python/dataloader/pyproject.toml | 2 +- integrations/python/dataloader/uv.lock | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/integrations/python/dataloader/pyproject.toml b/integrations/python/dataloader/pyproject.toml index 04aa29892..efda7fe11 100644 --- a/integrations/python/dataloader/pyproject.toml +++ b/integrations/python/dataloader/pyproject.toml @@ -33,7 +33,7 @@ module = ["datafusion.*", "pyiceberg.*", "pyarrow.*", "tenacity.*"] ignore_missing_imports = true [tool.uv.sources] -pyiceberg = { git = "https://github.com/sumedhsakdeo/iceberg-python", branch = "fix/arrow-scan-benchmark-3036" } +pyiceberg = { git = "https://github.com/sumedhsakdeo/iceberg-python", rev = "75ba28bfc6d8bbeac398357c6db80327632a2dc8" } [tool.ruff] line-length = 120 diff --git a/integrations/python/dataloader/uv.lock b/integrations/python/dataloader/uv.lock index 5108decf4..197e6b204 100644 --- a/integrations/python/dataloader/uv.lock +++ b/integrations/python/dataloader/uv.lock @@ -571,7 +571,7 @@ dev = [ requires-dist = [ { name = "datafusion", specifier = "==51.0.0" }, { name = "mypy", marker = "extra == 'dev'", specifier = ">=1.14.0" }, - { name = "pyiceberg", git = "https://github.com/sumedhsakdeo/iceberg-python?branch=fix%2Farrow-scan-benchmark-3036" }, + { name = "pyiceberg", git = "https://github.com/sumedhsakdeo/iceberg-python?rev=75ba28bfc6d8bbeac398357c6db80327632a2dc8" }, { name = "pytest", marker = "extra == 'dev'", specifier = ">=8.0.0" }, { name = "requests", specifier = ">=2.31.0" }, { name = "responses", marker = "extra == 'dev'", specifier = ">=0.25.0" }, @@ -759,7 +759,7 @@ wheels = [ [[package]] name = "pyiceberg" version = "0.11.0" -source = { git = "https://github.com/sumedhsakdeo/iceberg-python?branch=fix%2Farrow-scan-benchmark-3036#75ba28bfc6d8bbeac398357c6db80327632a2dc8" } +source = { git = "https://github.com/sumedhsakdeo/iceberg-python?rev=75ba28bfc6d8bbeac398357c6db80327632a2dc8#75ba28bfc6d8bbeac398357c6db80327632a2dc8" } dependencies = [ { name = "cachetools" }, { name = "click" }, From 155da8eb04d8004175c5599a25ba6ebfa61c4f04 Mon Sep 17 00:00:00 2001 From: Christian Bush Date: Mon, 9 Mar 2026 18:06:47 -0700 Subject: [PATCH 3/7] Use PEP 508 direct reference for pyiceberg instead of uv.sources Replace [tool.uv.sources] override with a PEP 508 direct reference in dependencies so the fork SHA is baked into the published wheel metadata, not just the dev environment. This ensures both openhouse-dataloader and lipy-openhouse resolve pyiceberg from the same fork commit. Co-Authored-By: Sumedh Sakdeo --- integrations/python/dataloader/pyproject.toml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/integrations/python/dataloader/pyproject.toml b/integrations/python/dataloader/pyproject.toml index efda7fe11..01665135a 100644 --- a/integrations/python/dataloader/pyproject.toml +++ b/integrations/python/dataloader/pyproject.toml @@ -2,6 +2,9 @@ requires = ["hatchling", "hatch-vcs"] build-backend = "hatchling.build" +[tool.hatch.metadata] +allow-direct-references = true + [project] name = "openhouse.dataloader" dynamic = ["version"] @@ -10,7 +13,7 @@ readme = "README.md" requires-python = ">=3.12" license = {text = "BSD-2-Clause"} keywords = ["openhouse", "data-loader", "lakehouse", "iceberg", "datafusion"] -dependencies = ["datafusion==51.0.0", "pyiceberg~=0.11.0", "requests>=2.31.0", "tenacity>=8.0.0"] +dependencies = ["datafusion==51.0.0", "pyiceberg @ git+https://github.com/sumedhsakdeo/iceberg-python@75ba28bfc6d8bbeac398357c6db80327632a2dc8", "requests>=2.31.0", "tenacity>=8.0.0"] [project.optional-dependencies] dev = ["responses>=0.25.0", "ruff>=0.9.0", "pytest>=8.0.0", "twine>=6.0.0", "mypy>=1.14.0", "types-requests>=2.31.0"] @@ -32,9 +35,6 @@ disallow_untyped_defs = true module = ["datafusion.*", "pyiceberg.*", "pyarrow.*", "tenacity.*"] ignore_missing_imports = true -[tool.uv.sources] -pyiceberg = { git = "https://github.com/sumedhsakdeo/iceberg-python", rev = "75ba28bfc6d8bbeac398357c6db80327632a2dc8" } - [tool.ruff] line-length = 120 target-version = "py312" From f079ab0c9a28da1dca973ea2b6a18e9d254c7287 Mon Sep 17 00:00:00 2001 From: Christian Bush Date: Tue, 10 Mar 2026 13:44:43 -0700 Subject: [PATCH 4/7] [DataLoader] Add batch_size parameter for intra-file streaming Use ArrivalOrder(concurrent_streams=1) from pyiceberg PR #3046 to stream RecordBatches incrementally instead of materializing entire files into memory. The new batch_size parameter controls rows per batch, preventing OOM on large files in distributed workers. --- .../src/openhouse/dataloader/data_loader.py | 6 ++ .../openhouse/dataloader/data_loader_split.py | 12 ++- .../dataloader/tests/test_data_loader.py | 73 +++++++++++++++++++ .../tests/test_data_loader_split.py | 46 ++++++++++++ 4 files changed, 134 insertions(+), 3 deletions(-) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py index fdeb2fcde..ad92e6cb9 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py @@ -78,6 +78,7 @@ def __init__( filters: Filter | None = None, context: DataLoaderContext | None = None, max_attempts: int = 3, + batch_size: int | None = None, ): """ Args: @@ -90,6 +91,9 @@ def __init__( filters: Row filter expression, defaults to always_true() (all rows) context: Data loader context max_attempts: Total number of attempts including the initial try (default 3) + batch_size: Number of rows per RecordBatch yielded by each split. + Controls memory usage per worker — smaller values reduce peak memory + but increase per-batch overhead. None uses the PyArrow default (~131K rows). """ self._catalog = catalog self._table_id = TableIdentifier(database, table, branch) @@ -98,6 +102,7 @@ def __init__( self._filters = filters if filters is not None else always_true() self._context = context or DataLoaderContext() self._max_attempts = max_attempts + self._batch_size = batch_size @cached_property def _iceberg_table(self) -> Table: @@ -163,4 +168,5 @@ def __iter__(self) -> Iterator[DataLoaderSplit]: yield DataLoaderSplit( file_scan_task=scan_task, scan_context=scan_context, + batch_size=self._batch_size, ) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py index 5ce1266c7..0bb64b1df 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py @@ -9,7 +9,7 @@ from datafusion.substrait import Producer from pyarrow import RecordBatch from pyiceberg.io.pyarrow import ArrowScan -from pyiceberg.table import FileScanTask +from pyiceberg.table import ArrivalOrder, FileScanTask from openhouse.dataloader._table_scan_context import TableScanContext from openhouse.dataloader.udf_registry import NoOpRegistry, UDFRegistry @@ -25,10 +25,12 @@ def __init__( plan: LogicalPlan | None = None, session_context: SessionContext | None = None, udf_registry: UDFRegistry | None = None, + batch_size: int | None = None, ): self._file_scan_task = file_scan_task self._udf_registry = udf_registry or NoOpRegistry() self._scan_context = scan_context + self._batch_size = batch_size if (plan is None) != (session_context is None): raise ValueError("plan and session_context must both be provided or both be None") @@ -59,7 +61,8 @@ def __iter__(self) -> Iterator[RecordBatch]: """Reads the file scan task and yields Arrow RecordBatches. Uses PyIceberg's ArrowScan to handle format dispatch, schema resolution, - delete files, and partition spec lookups. + delete files, and partition spec lookups. Batches are streamed + incrementally (not materialized into memory) via ArrivalOrder. """ ctx = self._scan_context arrow_scan = ArrowScan( @@ -68,4 +71,7 @@ def __iter__(self) -> Iterator[RecordBatch]: projected_schema=ctx.projected_schema, row_filter=ctx.row_filter, ) - yield from arrow_scan.to_record_batches([self._file_scan_task]) + yield from arrow_scan.to_record_batches( + [self._file_scan_task], + order=ArrivalOrder(concurrent_streams=1, batch_size=self._batch_size), + ) diff --git a/integrations/python/dataloader/tests/test_data_loader.py b/integrations/python/dataloader/tests/test_data_loader.py index b092c1c0d..a7958b706 100644 --- a/integrations/python/dataloader/tests/test_data_loader.py +++ b/integrations/python/dataloader/tests/test_data_loader.py @@ -322,3 +322,76 @@ def test_snapshot_id_with_columns_and_filters(tmp_path): assert scan_kwargs["snapshot_id"] == 99 assert scan_kwargs["selected_fields"] == (COL_ID,) assert "row_filter" in scan_kwargs + + +# --- batch_size tests --- + + +def test_batch_size_default_returns_all_data(tmp_path): + """Without batch_size, all data is returned correctly (backwards compatibility).""" + catalog = _make_real_catalog(tmp_path) + + loader = OpenHouseDataLoader(catalog=catalog, database="db", table="tbl") + result = _materialize(loader) + + assert result.num_rows == 3 + result = result.sort_by(COL_ID) + assert result.column(COL_ID).to_pylist() == TEST_DATA[COL_ID] + + +def test_batch_size_limits_rows_per_batch(tmp_path): + """When batch_size is set, each RecordBatch has at most batch_size rows.""" + many_rows = { + COL_ID: list(range(100)), + COL_NAME: [f"name_{i}" for i in range(100)], + COL_VALUE: [float(i) for i in range(100)], + } + catalog = _make_real_catalog(tmp_path, data=many_rows) + + loader = OpenHouseDataLoader(catalog=catalog, database="db", table="tbl", batch_size=10) + batches = [batch for split in loader for batch in split] + + assert len(batches) >= 2, "Expected multiple batches with batch_size=10 and 100 rows" + for batch in batches: + assert batch.num_rows <= 10, f"Batch has {batch.num_rows} rows, expected at most 10" + + total_rows = sum(b.num_rows for b in batches) + assert total_rows == 100 + + +def test_batch_size_returns_correct_data(tmp_path): + """batch_size controls chunking but doesn't alter the data returned.""" + catalog = _make_real_catalog(tmp_path) + + loader = OpenHouseDataLoader(catalog=catalog, database="db", table="tbl", batch_size=1) + result = _materialize(loader) + + assert result.num_rows == 3 + result = result.sort_by(COL_ID) + assert result.column(COL_ID).to_pylist() == TEST_DATA[COL_ID] + assert result.column(COL_NAME).to_pylist() == TEST_DATA[COL_NAME] + assert result.column(COL_VALUE).to_pylist() == TEST_DATA[COL_VALUE] + + +def test_batch_size_with_columns_and_filters(tmp_path): + """batch_size works alongside column selection and row filters.""" + catalog = _make_real_catalog(tmp_path) + + loader = OpenHouseDataLoader( + catalog=catalog, database="db", table="tbl", columns=[COL_ID], filters=col(COL_ID) == 1, batch_size=1 + ) + result = _materialize(loader) + + assert result.num_rows == 1 + assert set(result.column_names) == {COL_ID} + assert result.column(COL_ID).to_pylist() == [1] + + +def test_batch_size_with_empty_table(tmp_path): + """batch_size on an empty table yields no batches.""" + catalog = _make_real_catalog(tmp_path, data=EMPTY_DATA) + + loader = OpenHouseDataLoader(catalog=catalog, database="db", table="tbl", batch_size=10) + result = _materialize(loader) + + assert result.num_rows == 0 diff --git a/integrations/python/dataloader/tests/test_data_loader_split.py b/integrations/python/dataloader/tests/test_data_loader_split.py index f7b609659..530a616e6 100644 --- a/integrations/python/dataloader/tests/test_data_loader_split.py +++ b/integrations/python/dataloader/tests/test_data_loader_split.py @@ -31,6 +31,7 @@ def _create_test_split( iceberg_schema: Schema, io_properties: dict[str, str] | None = None, filename: str | None = None, + batch_size: int | None = None, ) -> DataLoaderSplit: """Create a DataLoaderSplit for testing by writing data to disk. @@ -91,6 +92,7 @@ def _create_test_split( session_context=ctx, file_scan_task=task, scan_context=scan_context, + batch_size=batch_size, ) @@ -345,3 +347,47 @@ def _to_substrait(plan, ctx): mock_udf_registry.register_udfs.assert_called_once_with(session_context) producer.assert_called_once_with(mock_plan, session_context) assert split._plan_substrait_bytes == b"serialized-plan" + + +# --- batch_size tests --- + +_BATCH_SCHEMA = Schema( + NestedField(field_id=1, name="id", field_type=LongType(), required=False), +) + + +def _make_large_table(num_rows: int) -> pa.Table: + return pa.table({"id": pa.array(list(range(num_rows)), type=pa.int64())}) + + +def test_split_batch_size_limits_rows_per_batch(tmp_path): + """When batch_size is set, each RecordBatch has at most that many rows.""" + table = _make_large_table(100) + split = _create_test_split(tmp_path, table, FileFormat.PARQUET, _BATCH_SCHEMA, batch_size=10) + + batches = list(split) + + assert len(batches) >= 2, "Expected multiple batches with batch_size=10 and 100 rows" + for batch in batches: + assert batch.num_rows <= 10 + assert sum(b.num_rows for b in batches) == 100 + + +def test_split_batch_size_none_returns_all_rows(tmp_path): + """Default batch_size (None) returns all data correctly.""" + table = _make_large_table(50) + split = _create_test_split(tmp_path, table, FileFormat.PARQUET, _BATCH_SCHEMA) + + result = pa.Table.from_batches(list(split)) + assert result.num_rows == 50 + assert sorted(result.column("id").to_pylist()) == list(range(50)) + + +def test_split_batch_size_preserves_data(tmp_path): + """batch_size controls chunking but all data is preserved.""" + table = _make_large_table(25) + split = _create_test_split(tmp_path, table, FileFormat.PARQUET, _BATCH_SCHEMA, batch_size=7) + + result = pa.Table.from_batches(list(split)) + assert result.num_rows == 25 + assert sorted(result.column("id").to_pylist()) == list(range(25)) From cdf6ea7c5b17043b48d524f1d65fe7f1c334069f Mon Sep 17 00:00:00 2001 From: Christian Bush Date: Tue, 10 Mar 2026 14:28:11 -0700 Subject: [PATCH 5/7] Clean up test helpers: rename and split factory functions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rename _make_large_table → _make_table in test_data_loader_split.py - Split _make_arrow_scan_and_task into _make_arrow_scan and _make_file_scan_task in test_arrival_order.py --- .../dataloader/tests/test_arrival_order.py | 29 +++++++++++-------- .../tests/test_data_loader_split.py | 8 ++--- 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/integrations/python/dataloader/tests/test_arrival_order.py b/integrations/python/dataloader/tests/test_arrival_order.py index 17033e22c..21e3870f2 100644 --- a/integrations/python/dataloader/tests/test_arrival_order.py +++ b/integrations/python/dataloader/tests/test_arrival_order.py @@ -35,10 +35,7 @@ def _write_parquet(tmp_path: object, table: pa.Table) -> str: return file_path -def _make_arrow_scan_and_task(tmp_path: object, table: pa.Table) -> tuple[ArrowScan, FileScanTask]: - """Create an ArrowScan and FileScanTask from a PyArrow table written to disk.""" - file_path = _write_parquet(tmp_path, table) - +def _make_arrow_scan(tmp_path: object, file_path: str) -> ArrowScan: metadata = new_table_metadata( schema=_SCHEMA, partition_spec=UNPARTITIONED_PARTITION_SPEC, @@ -46,14 +43,15 @@ def _make_arrow_scan_and_task(tmp_path: object, table: pa.Table) -> tuple[ArrowS location=str(tmp_path), properties={}, ) - - arrow_scan = ArrowScan( + return ArrowScan( table_metadata=metadata, io=load_file_io(properties={}, location=file_path), projected_schema=_SCHEMA, row_filter=AlwaysTrue(), ) + +def _make_file_scan_task(file_path: str, table: pa.Table) -> FileScanTask: data_file = DataFile.from_args( file_path=file_path, file_format=FileFormat.PARQUET, @@ -61,9 +59,7 @@ def _make_arrow_scan_and_task(tmp_path: object, table: pa.Table) -> tuple[ArrowS file_size_in_bytes=os.path.getsize(file_path), ) data_file._spec_id = 0 - task = FileScanTask(data_file=data_file) - - return arrow_scan, task + return FileScanTask(data_file=data_file) def _sample_table() -> pa.Table: @@ -113,19 +109,28 @@ class TestToRecordBatchesOrder: def test_default_order_returns_all_rows(self, tmp_path: object) -> None: """Default (TaskOrder) still works — backward compatible.""" - arrow_scan, task = _make_arrow_scan_and_task(tmp_path, _sample_table()) + table = _sample_table() + file_path = _write_parquet(tmp_path, table) + arrow_scan = _make_arrow_scan(tmp_path, file_path) + task = _make_file_scan_task(file_path, table) batches = list(arrow_scan.to_record_batches([task])) result = pa.Table.from_batches(batches).sort_by("id") assert result.column("id").to_pylist() == [1, 2, 3] def test_explicit_task_order_returns_all_rows(self, tmp_path: object) -> None: - arrow_scan, task = _make_arrow_scan_and_task(tmp_path, _sample_table()) + table = _sample_table() + file_path = _write_parquet(tmp_path, table) + arrow_scan = _make_arrow_scan(tmp_path, file_path) + task = _make_file_scan_task(file_path, table) batches = list(arrow_scan.to_record_batches([task], order=TaskOrder())) result = pa.Table.from_batches(batches).sort_by("id") assert result.column("id").to_pylist() == [1, 2, 3] def test_arrival_order_returns_all_rows(self, tmp_path: object) -> None: - arrow_scan, task = _make_arrow_scan_and_task(tmp_path, _sample_table()) + table = _sample_table() + file_path = _write_parquet(tmp_path, table) + arrow_scan = _make_arrow_scan(tmp_path, file_path) + task = _make_file_scan_task(file_path, table) batches = list(arrow_scan.to_record_batches([task], order=ArrivalOrder(concurrent_streams=2))) result = pa.Table.from_batches(batches).sort_by("id") assert result.column("id").to_pylist() == [1, 2, 3] diff --git a/integrations/python/dataloader/tests/test_data_loader_split.py b/integrations/python/dataloader/tests/test_data_loader_split.py index 530a616e6..bdb7440a9 100644 --- a/integrations/python/dataloader/tests/test_data_loader_split.py +++ b/integrations/python/dataloader/tests/test_data_loader_split.py @@ -356,13 +356,13 @@ def _to_substrait(plan, ctx): ) -def _make_large_table(num_rows: int) -> pa.Table: +def _make_table(num_rows: int) -> pa.Table: return pa.table({"id": pa.array(list(range(num_rows)), type=pa.int64())}) def test_split_batch_size_limits_rows_per_batch(tmp_path): """When batch_size is set, each RecordBatch has at most that many rows.""" - table = _make_large_table(100) + table = _make_table(100) split = _create_test_split(tmp_path, table, FileFormat.PARQUET, _BATCH_SCHEMA, batch_size=10) batches = list(split) @@ -375,7 +375,7 @@ def test_split_batch_size_limits_rows_per_batch(tmp_path): def test_split_batch_size_none_returns_all_rows(tmp_path): """Default batch_size (None) returns all data correctly.""" - table = _make_large_table(50) + table = _make_table(50) split = _create_test_split(tmp_path, table, FileFormat.PARQUET, _BATCH_SCHEMA) result = pa.Table.from_batches(list(split)) @@ -385,7 +385,7 @@ def test_split_batch_size_none_returns_all_rows(tmp_path): def test_split_batch_size_preserves_data(tmp_path): """batch_size controls chunking but all data is preserved.""" - table = _make_large_table(25) + table = _make_table(25) split = _create_test_split(tmp_path, table, FileFormat.PARQUET, _BATCH_SCHEMA, batch_size=7) result = pa.Table.from_batches(list(split)) From ecee2932f8bebe509b26048367f4c12471a70214 Mon Sep 17 00:00:00 2001 From: Christian Bush Date: Tue, 10 Mar 2026 14:47:34 -0700 Subject: [PATCH 6/7] Clarify batch_size docstring with PyArrow Scanner detail --- .../dataloader/src/openhouse/dataloader/data_loader.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py index ad92e6cb9..23dd603df 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py @@ -91,9 +91,10 @@ def __init__( filters: Row filter expression, defaults to always_true() (all rows) context: Data loader context max_attempts: Total number of attempts including the initial try (default 3) - batch_size: Number of rows per RecordBatch yielded by each split. - Controls memory usage per worker — smaller values reduce peak memory - but increase per-batch overhead. None uses the PyArrow default (~131K rows). + batch_size: Maximum number of rows per RecordBatch yielded by each split. + Passed to PyArrow's Scanner which produces batches of at most this many + rows. Smaller values reduce peak memory but increase per-batch overhead. + None uses the PyArrow default (~131K rows). """ self._catalog = catalog self._table_id = TableIdentifier(database, table, branch) From 827f1d9d6d2ded0e58b976f394aa6def318d5677 Mon Sep 17 00:00:00 2001 From: Christian Bush Date: Tue, 10 Mar 2026 20:24:31 -0700 Subject: [PATCH 7/7] Address review feedback on batch_size tests and docstring - Remove ArrivalOrder from __iter__ docstring, use reviewer's suggested wording about bounded memory (data_loader_split.py) - Replace data-verification batch_size tests in test_data_loader.py with passthrough tests that only check batch_size is forwarded to splits - Add batch_size to integration tests with batch count assertions --- .../openhouse/dataloader/data_loader_split.py | 4 +- .../dataloader/tests/integration_tests.py | 12 ++- .../dataloader/tests/test_data_loader.py | 74 ++++--------------- 3 files changed, 24 insertions(+), 66 deletions(-) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py index 0bb64b1df..4a037dc44 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py @@ -61,8 +61,8 @@ def __iter__(self) -> Iterator[RecordBatch]: """Reads the file scan task and yields Arrow RecordBatches. Uses PyIceberg's ArrowScan to handle format dispatch, schema resolution, - delete files, and partition spec lookups. Batches are streamed - incrementally (not materialized into memory) via ArrivalOrder. + delete files, and partition spec lookups. The number of batches loaded + into memory at once is bounded to prevent using too much memory at once. """ ctx = self._scan_context arrow_scan = ArrowScan( diff --git a/integrations/python/dataloader/tests/integration_tests.py b/integrations/python/dataloader/tests/integration_tests.py index 9d859ca32..dca7cba76 100644 --- a/integrations/python/dataloader/tests/integration_tests.py +++ b/integrations/python/dataloader/tests/integration_tests.py @@ -166,14 +166,18 @@ def read_token() -> str: snap1 = OpenHouseDataLoader(catalog=catalog, database=DATABASE_ID, table=TABLE_ID).snapshot_id assert snap1 is not None - # 4. Read all data and verify - loader = OpenHouseDataLoader(catalog=catalog, database=DATABASE_ID, table=TABLE_ID) - result = _read_all(loader) + # 4. Read all data with batch_size and verify batch count + loader = OpenHouseDataLoader(catalog=catalog, database=DATABASE_ID, table=TABLE_ID, batch_size=2) + batches = [batch for split in loader for batch in split] + assert len(batches) == 2, f"Expected 2 batches (3 rows, batch_size=2), got {len(batches)}" + for batch in batches: + assert batch.num_rows <= 2 + result = pa.concat_tables([pa.Table.from_batches([b]) for b in batches]).sort_by(COL_ID) assert result.num_rows == 3 assert result.column(COL_ID).to_pylist() == [1, 2, 3] assert result.column(COL_NAME).to_pylist() == ["alice", "bob", "charlie"] assert result.column(COL_SCORE).to_pylist() == [1.1, 2.2, 3.3] - print(f"PASS: read all {result.num_rows} rows") + print(f"PASS: read all {result.num_rows} rows in {len(batches)} batches (batch_size=2)") # 5a. Row filter loader = OpenHouseDataLoader(catalog=catalog, database=DATABASE_ID, table=TABLE_ID, filters=col(COL_ID) > 1) diff --git a/integrations/python/dataloader/tests/test_data_loader.py b/integrations/python/dataloader/tests/test_data_loader.py index a7958b706..7862a7f35 100644 --- a/integrations/python/dataloader/tests/test_data_loader.py +++ b/integrations/python/dataloader/tests/test_data_loader.py @@ -327,71 +327,25 @@ def test_snapshot_id_with_columns_and_filters(tmp_path): # --- batch_size tests --- -def test_batch_size_default_returns_all_data(tmp_path): - """Without batch_size, all data is returned correctly (backwards compatibility).""" +def test_batch_size_forwarded_to_splits(tmp_path): + """batch_size is correctly passed through to each DataLoaderSplit.""" catalog = _make_real_catalog(tmp_path) - loader = OpenHouseDataLoader(catalog=catalog, database="db", table="tbl") - result = _materialize(loader) - - assert result.num_rows == 3 - result = result.sort_by(COL_ID) - assert result.column(COL_ID).to_pylist() == TEST_DATA[COL_ID] - - -def test_batch_size_limits_rows_per_batch(tmp_path): - """When batch_size is set, each RecordBatch has at most batch_size rows.""" - many_rows = { - COL_ID: list(range(100)), - COL_NAME: [f"name_{i}" for i in range(100)], - COL_VALUE: [float(i) for i in range(100)], - } - catalog = _make_real_catalog(tmp_path, data=many_rows) - - loader = OpenHouseDataLoader(catalog=catalog, database="db", table="tbl", batch_size=10) - batches = [batch for split in loader for batch in split] - - assert len(batches) >= 2, "Expected multiple batches with batch_size=10 and 100 rows" - for batch in batches: - assert batch.num_rows <= 10, f"Batch has {batch.num_rows} rows, expected at most 10" + loader = OpenHouseDataLoader(catalog=catalog, database="db", table="tbl", batch_size=32768) + splits = list(loader) - total_rows = sum(b.num_rows for b in batches) - assert total_rows == 100 + assert len(splits) >= 1 + for split in splits: + assert split._batch_size == 32768 -def test_batch_size_returns_correct_data(tmp_path): - """batch_size controls chunking but doesn't alter the data returned.""" +def test_batch_size_default_is_none(tmp_path): + """Omitting batch_size defaults to None in each split.""" catalog = _make_real_catalog(tmp_path) - loader = OpenHouseDataLoader(catalog=catalog, database="db", table="tbl", batch_size=1) - result = _materialize(loader) - - assert result.num_rows == 3 - result = result.sort_by(COL_ID) - assert result.column(COL_ID).to_pylist() == TEST_DATA[COL_ID] - assert result.column(COL_NAME).to_pylist() == TEST_DATA[COL_NAME] - assert result.column(COL_VALUE).to_pylist() == TEST_DATA[COL_VALUE] - - -def test_batch_size_with_columns_and_filters(tmp_path): - """batch_size works alongside column selection and row filters.""" - catalog = _make_real_catalog(tmp_path) - - loader = OpenHouseDataLoader( - catalog=catalog, database="db", table="tbl", columns=[COL_ID], filters=col(COL_ID) == 1, batch_size=1 - ) - result = _materialize(loader) - - assert result.num_rows == 1 - assert set(result.column_names) == {COL_ID} - assert result.column(COL_ID).to_pylist() == [1] - - -def test_batch_size_with_empty_table(tmp_path): - """batch_size on an empty table yields no batches.""" - catalog = _make_real_catalog(tmp_path, data=EMPTY_DATA) - - loader = OpenHouseDataLoader(catalog=catalog, database="db", table="tbl", batch_size=10) - result = _materialize(loader) + loader = OpenHouseDataLoader(catalog=catalog, database="db", table="tbl") + splits = list(loader) - assert result.num_rows == 0 + assert len(splits) >= 1 + for split in splits: + assert split._batch_size is None