From 3f4813996f98fa0d037b103c1bd4460841f661ae Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 11 May 2026 01:41:35 +0000 Subject: [PATCH] arrow-util: canonicalize ranges decoded from Parquet MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The reader for range columns called `push_range_with`, which writes the range to the row verbatim. For discrete range types (int4/int8/date), Parquet files authored by external engines may encode ranges with non-canonical bounds — e.g. `[1,10]` for `int4range`, which MZ stores internally as `[1,11)`. Without canonicalization those rows do not compare or hash equal to logically-identical values constructed inside MZ, so `COPY FROM PARQUET` rows mismatch their pure-SQL counterparts. Switch the decode path to `push_range`, which canonicalizes the range before packing. Add a unit test that decodes a hand-built non-canonical arrow StructArray and a testdrive regression test that round-trips a DuckDB-authored Parquet file through `COPY FROM ... (FORMAT PARQUET)`. Fixes DB-55. https://claude.ai/code/session_01KXioUsT2r5o2tRwrsHm4iR --- src/arrow-util/src/reader.rs | 150 ++++++++++++++++++++++++----- test/iceberg/mzcompose.py | 16 +++ test/iceberg/range-noncanonical.td | 52 ++++++++++ 3 files changed, 193 insertions(+), 25 deletions(-) create mode 100644 test/iceberg/range-noncanonical.td diff --git a/src/arrow-util/src/reader.rs b/src/arrow-util/src/reader.rs index 95fe2017201ab..66cb381d7618a 100644 --- a/src/arrow-util/src/reader.rs +++ b/src/arrow-util/src/reader.rs @@ -948,43 +948,41 @@ impl ColReader { .map(|n| !n.is_valid(idx)) .unwrap_or(false); - // Read finite bounds into owned Rows so the packing closures - // handed to `push_range_with` can capture them by move. - let lower_bound = if lower_is_infinite { + // Read finite bounds into owned Rows that live for the rest of + // this block, so the Datums we borrow out of them stay valid + // for the `push_range` call below. + let lower_row = if lower_is_infinite { None } else { let mut temp = SharedRow::get(); lower.read(idx, &mut temp.packer())?; - let row = temp.clone(); - Some(move |packer: &mut RowPacker| -> Result<(), anyhow::Error> { - packer.push(row.unpack_first()); - Ok(()) - }) + Some(temp.clone()) }; - - let upper_bound = if upper_is_infinite { + let upper_row = if upper_is_infinite { None } else { let mut temp = SharedRow::get(); upper.read(idx, &mut temp.packer())?; - let row = temp.clone(); - Some(move |packer: &mut RowPacker| -> Result<(), anyhow::Error> { - packer.push(row.unpack_first()); - Ok(()) - }) + Some(temp.clone()) }; + let lower_bound = RangeLowerBound { + inclusive: lower_inclusive.value(idx), + bound: lower_row.as_ref().map(|row| row.unpack_first()), + }; + let upper_bound = RangeUpperBound { + inclusive: upper_inclusive.value(idx), + bound: upper_row.as_ref().map(|row| row.unpack_first()), + }; + + // Use `push_range` (not `push_range_with`) so the range is + // canonicalized before being packed. Parquet files authored by + // external engines may encode discrete ranges in non-canonical + // form (e.g. `[1,10]` for int4range, which MZ stores as + // `[1,11)`); without canonicalization those rows would not + // compare or hash equal to MZ-constructed values. packer - .push_range_with( - RangeLowerBound { - inclusive: lower_inclusive.value(idx), - bound: lower_bound, - }, - RangeUpperBound { - inclusive: upper_inclusive.value(idx), - bound: upper_bound, - }, - ) + .push_range(Range::new(Some((lower_bound, upper_bound)))) .context("pack range")?; return Ok(()); @@ -1184,4 +1182,106 @@ mod tests { let num = rnd_row.into_element().unwrap_numeric(); assert_eq!(num.0, Numeric::from(100000000.009f64)); } + + /// Regression test for database-issues#11330: when a Parquet file authored + /// by an external engine encodes a discrete range in non-canonical form + /// (e.g. `[1,10]` for `int4range`), the reader must canonicalize it to MZ's + /// internal form (`[1,11)`). Otherwise rows ingested via `COPY FROM PARQUET` + /// don't compare or hash equal to logically-identical rows constructed + /// inside MZ. + #[mz_ore::test] + #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux` + fn range_canonicalizes_noncanonical_input() { + use arrow::array::ArrayRef; + use arrow::datatypes::DataType; + use mz_repr::adt::range::{Range, RangeLowerBound, RangeUpperBound}; + + let desc = RelationDesc::builder() + .with_column( + "r", + SqlScalarType::Range { + element_type: Box::new(SqlScalarType::Int32), + } + .nullable(true), + ) + .finish(); + + // Build a range StructArray containing two non-canonical encodings: + // row 0: `[1,10]` -> canonicalizes to `[1,11)` + // row 1: `(5,15)` -> canonicalizes to `[6,15)` + let lower = Int32Array::from(vec![Some(1), Some(5)]); + let upper = Int32Array::from(vec![Some(10), Some(15)]); + let lower_inclusive = BooleanArray::from(vec![true, false]); + let upper_inclusive = BooleanArray::from(vec![true, false]); + let empty = BooleanArray::from(vec![false, false]); + + #[allow(clippy::as_conversions)] + let range_fields: Vec<(Arc, ArrayRef)> = vec![ + ( + Arc::new(Field::new("lower", DataType::Int32, true)), + Arc::new(lower) as ArrayRef, + ), + ( + Arc::new(Field::new("upper", DataType::Int32, true)), + Arc::new(upper) as ArrayRef, + ), + ( + Arc::new(Field::new("lower_inclusive", DataType::Boolean, false)), + Arc::new(lower_inclusive) as ArrayRef, + ), + ( + Arc::new(Field::new("upper_inclusive", DataType::Boolean, false)), + Arc::new(upper_inclusive) as ArrayRef, + ), + ( + Arc::new(Field::new("empty", DataType::Boolean, false)), + Arc::new(empty) as ArrayRef, + ), + ]; + let range_struct = StructArray::from(range_fields); + + #[allow(clippy::as_conversions)] + let batch = StructArray::from(vec![( + Arc::new(Field::new("r", range_struct.data_type().clone(), true)), + Arc::new(range_struct) as ArrayRef, + )]); + + let reader = ArrowReader::new(&desc, batch).unwrap(); + + // Row 0: `[1,10]` -> `[1,11)` + let mut got = Row::default(); + reader.read(0, &mut got).unwrap(); + let mut want = Row::default(); + want.packer() + .push_range(Range::new(Some(( + RangeLowerBound { + inclusive: true, + bound: Some(Datum::Int32(1)), + }, + RangeUpperBound { + inclusive: true, + bound: Some(Datum::Int32(10)), + }, + )))) + .unwrap(); + assert_eq!(got, want, "row 0: [1,10] should canonicalize to [1,11)"); + + // Row 1: `(5,15)` -> `[6,15)` + let mut got = Row::default(); + reader.read(1, &mut got).unwrap(); + let mut want = Row::default(); + want.packer() + .push_range(Range::new(Some(( + RangeLowerBound { + inclusive: false, + bound: Some(Datum::Int32(5)), + }, + RangeUpperBound { + inclusive: false, + bound: Some(Datum::Int32(15)), + }, + )))) + .unwrap(); + assert_eq!(got, want, "row 1: (5,15) should canonicalize to [6,15)"); + } } diff --git a/test/iceberg/mzcompose.py b/test/iceberg/mzcompose.py index 78aa241741c5e..b631866a472ad 100644 --- a/test/iceberg/mzcompose.py +++ b/test/iceberg/mzcompose.py @@ -275,3 +275,19 @@ def workflow_large_upsert_batch(c: Composition) -> None: "--var=aws-endpoint=minio:9000", "large-upsert-batch.td", ) + + +def workflow_range_noncanonical(c: Composition) -> None: + """Regression test for database-issues#11330: COPY FROM PARQUET must + canonicalize range values reconstructed from external Parquet, otherwise + non-canonical encodings written by other engines land verbatim in MZ rows + and break equality against values constructed inside MZ. DuckDB authors the + Parquet file directly in minio so the bytes are not something MZ's sink + produced.""" + key = _setup(c) + + c.run_testdrive_files( + f"--var=s3-access-key={key}", + "--var=aws-endpoint=minio:9000", + "range-noncanonical.td", + ) diff --git a/test/iceberg/range-noncanonical.td b/test/iceberg/range-noncanonical.td new file mode 100644 index 0000000000000..da47d369f07ea --- /dev/null +++ b/test/iceberg/range-noncanonical.td @@ -0,0 +1,52 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# Regression test for database-issues#11330: COPY FROM PARQUET must canonicalize +# range values. External engines (here, DuckDB) author the Parquet file with +# non-canonical bounds for discrete ranges (e.g. `[1,10]` and `(5,15)` for +# int4range). MZ must rewrite these into its canonical half-open form on +# ingest, otherwise the resulting rows do not compare or hash equal to ranges +# constructed inside MZ. + +> CREATE SECRET access_key_secret AS '${arg.s3-access-key}' + +> CREATE CONNECTION aws_conn TO AWS ( + ACCESS KEY ID = 'tduser', + SECRET ACCESS KEY = SECRET access_key_secret, + ENDPOINT = 'http://${arg.aws-endpoint}', + REGION = 'us-east-1' + ); + +$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +ALTER SYSTEM SET enable_copy_from_remote = true; + +# Author the Parquet file directly in minio with DuckDB, so the bytes are not +# something MZ's own sink produced. The struct shape mirrors MZ's range +# encoding (lower/upper/lower_inclusive/upper_inclusive/empty). +$ duckdb-execute name=ranges +CREATE SECRET s3_secret (TYPE S3, KEY_ID 'tduser', SECRET '${arg.s3-access-key}', ENDPOINT '${arg.aws-endpoint}', URL_STYLE 'path', USE_SSL false, REGION 'minio'); +COPY (SELECT * FROM (VALUES (1, {'lower': 1::INTEGER, 'upper': 10::INTEGER, 'lower_inclusive': true, 'upper_inclusive': true, 'empty': false}), (2, {'lower': 5, 'upper': 15, 'lower_inclusive': false, 'upper_inclusive': false, 'empty': false})) t(id, r) ORDER BY id) TO 's3://test-bucket/noncanon_ranges/data.parquet' (FORMAT PARQUET); + +> CREATE TABLE ranges(id int, r int4range); + +> COPY INTO ranges FROM 's3://test-bucket/noncanon_ranges' (FORMAT PARQUET, AWS CONNECTION = aws_conn); + +# DuckDB's `[1,10]` and `(5,15)` must arrive as MZ's canonical `[1,11)` and +# `[6,15)`. +> SELECT id, r::text FROM ranges ORDER BY id; +1 [1,11) +2 [6,15) + +# Once canonicalized, the rows must compare equal to logically-identical +# values constructed inside MZ. +> SELECT r = '[1,10]'::int4range FROM ranges WHERE id = 1; +true + +> SELECT r = '(5,15)'::int4range FROM ranges WHERE id = 2; +true