150 changes: 125 additions & 25 deletions src/arrow-util/src/reader.rs
@@ -948,43 +948,41 @@ impl ColReader {
.map(|n| !n.is_valid(idx))
.unwrap_or(false);

// Read finite bounds into owned Rows so the packing closures
// handed to `push_range_with` can capture them by move.
let lower_bound = if lower_is_infinite {
// Read finite bounds into owned Rows that live for the rest of
// this block, so the Datums we borrow out of them stay valid
// for the `push_range` call below.
let lower_row = if lower_is_infinite {
None
} else {
let mut temp = SharedRow::get();
lower.read(idx, &mut temp.packer())?;
let row = temp.clone();
Some(move |packer: &mut RowPacker| -> Result<(), anyhow::Error> {
packer.push(row.unpack_first());
Ok(())
})
Some(temp.clone())
};

let upper_bound = if upper_is_infinite {
let upper_row = if upper_is_infinite {
None
} else {
let mut temp = SharedRow::get();
upper.read(idx, &mut temp.packer())?;
let row = temp.clone();
Some(move |packer: &mut RowPacker| -> Result<(), anyhow::Error> {
packer.push(row.unpack_first());
Ok(())
})
Some(temp.clone())
};

let lower_bound = RangeLowerBound {
inclusive: lower_inclusive.value(idx),
bound: lower_row.as_ref().map(|row| row.unpack_first()),
};
let upper_bound = RangeUpperBound {
inclusive: upper_inclusive.value(idx),
bound: upper_row.as_ref().map(|row| row.unpack_first()),
};

// Use `push_range` (not `push_range_with`) so the range is
// canonicalized before being packed. Parquet files authored by
// external engines may encode discrete ranges in non-canonical
// form (e.g. `[1,10]` for int4range, which MZ stores as
// `[1,11)`); without canonicalization those rows would not
// compare or hash equal to MZ-constructed values.
packer
.push_range_with(
RangeLowerBound {
inclusive: lower_inclusive.value(idx),
bound: lower_bound,
},
RangeUpperBound {
inclusive: upper_inclusive.value(idx),
bound: upper_bound,
},
)
.push_range(Range::new(Some((lower_bound, upper_bound))))
.context("pack range")?;

return Ok(());
@@ -1184,4 +1182,106 @@ mod tests {
let num = rnd_row.into_element().unwrap_numeric();
assert_eq!(num.0, Numeric::from(100000000.009f64));
}

/// Regression test for database-issues#11330: when a Parquet file authored
/// by an external engine encodes a discrete range in non-canonical form
/// (e.g. `[1,10]` for `int4range`), the reader must canonicalize it to MZ's
/// internal form (`[1,11)`). Otherwise rows ingested via `COPY FROM PARQUET`
/// don't compare or hash equal to logically-identical rows constructed
/// inside MZ.
#[mz_ore::test]
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
fn range_canonicalizes_noncanonical_input() {
use arrow::array::ArrayRef;
use arrow::datatypes::DataType;
use mz_repr::adt::range::{Range, RangeLowerBound, RangeUpperBound};

let desc = RelationDesc::builder()
.with_column(
"r",
SqlScalarType::Range {
element_type: Box::new(SqlScalarType::Int32),
}
.nullable(true),
)
.finish();

// Build a range StructArray containing two non-canonical encodings:
// row 0: `[1,10]` -> canonicalizes to `[1,11)`
// row 1: `(5,15)` -> canonicalizes to `[6,15)`
let lower = Int32Array::from(vec![Some(1), Some(5)]);
let upper = Int32Array::from(vec![Some(10), Some(15)]);
let lower_inclusive = BooleanArray::from(vec![true, false]);
let upper_inclusive = BooleanArray::from(vec![true, false]);
let empty = BooleanArray::from(vec![false, false]);

#[allow(clippy::as_conversions)]
let range_fields: Vec<(Arc<Field>, ArrayRef)> = vec![
(
Arc::new(Field::new("lower", DataType::Int32, true)),
Arc::new(lower) as ArrayRef,
),
(
Arc::new(Field::new("upper", DataType::Int32, true)),
Arc::new(upper) as ArrayRef,
),
(
Arc::new(Field::new("lower_inclusive", DataType::Boolean, false)),
Arc::new(lower_inclusive) as ArrayRef,
),
(
Arc::new(Field::new("upper_inclusive", DataType::Boolean, false)),
Arc::new(upper_inclusive) as ArrayRef,
),
(
Arc::new(Field::new("empty", DataType::Boolean, false)),
Arc::new(empty) as ArrayRef,
),
];
let range_struct = StructArray::from(range_fields);

#[allow(clippy::as_conversions)]
let batch = StructArray::from(vec![(
Arc::new(Field::new("r", range_struct.data_type().clone(), true)),
Arc::new(range_struct) as ArrayRef,
)]);

let reader = ArrowReader::new(&desc, batch).unwrap();

// Row 0: `[1,10]` -> `[1,11)`
let mut got = Row::default();
reader.read(0, &mut got).unwrap();
let mut want = Row::default();
want.packer()
.push_range(Range::new(Some((
RangeLowerBound {
inclusive: true,
bound: Some(Datum::Int32(1)),
},
RangeUpperBound {
inclusive: true,
bound: Some(Datum::Int32(10)),
},
))))
.unwrap();
assert_eq!(got, want, "row 0: [1,10] should canonicalize to [1,11)");

// Row 1: `(5,15)` -> `[6,15)`
let mut got = Row::default();
reader.read(1, &mut got).unwrap();
let mut want = Row::default();
want.packer()
.push_range(Range::new(Some((
RangeLowerBound {
inclusive: false,
bound: Some(Datum::Int32(5)),
},
RangeUpperBound {
inclusive: false,
bound: Some(Datum::Int32(15)),
},
))))
.unwrap();
assert_eq!(got, want, "row 1: (5,15) should canonicalize to [6,15)");
}
}
16 changes: 16 additions & 0 deletions test/iceberg/mzcompose.py
@@ -275,3 +275,19 @@ def workflow_large_upsert_batch(c: Composition) -> None:
"--var=aws-endpoint=minio:9000",
"large-upsert-batch.td",
)


def workflow_range_noncanonical(c: Composition) -> None:
"""Regression test for database-issues#11330: COPY FROM PARQUET must
canonicalize range values reconstructed from external Parquet, otherwise
non-canonical encodings written by other engines land verbatim in MZ rows
and break equality against values constructed inside MZ. DuckDB authors the
Parquet file directly in minio so the bytes are not something MZ's sink
produced."""
key = _setup(c)

c.run_testdrive_files(
f"--var=s3-access-key={key}",
"--var=aws-endpoint=minio:9000",
"range-noncanonical.td",
)
Comment on lines +287 to +293 (Contributor):
Many workflows in test/iceberg do just this; we could put them all into a single workflow_cdc that iterates over the files. But that's more of a test cleanup, not that important for this PR itself.
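
A rough sketch of that suggested cleanup, assuming the existing `_setup` helper and the `Composition.run_testdrive_files` call pattern used by the workflows above; the consolidated workflow name and the file list are illustrative only, not part of this PR:

def workflow_cdc(c: Composition) -> None:
    """Hypothetical consolidation: run each standalone .td regression file
    with the shared minio/S3 setup, instead of defining one near-identical
    workflow per file."""
    key = _setup(c)
    for td_file in [
        "large-upsert-batch.td",
        "range-noncanonical.td",
    ]:
        c.run_testdrive_files(
            f"--var=s3-access-key={key}",
            "--var=aws-endpoint=minio:9000",
            td_file,
        )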

52 changes: 52 additions & 0 deletions test/iceberg/range-noncanonical.td
@@ -0,0 +1,52 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

# Regression test for database-issues#11330: COPY FROM PARQUET must canonicalize
# range values. External engines (here, DuckDB) author the Parquet file with
# non-canonical bounds for discrete ranges (e.g. `[1,10]` and `(5,15)` for
# int4range). MZ must rewrite these into its canonical half-open form on
# ingest, otherwise the resulting rows do not compare or hash equal to ranges
# constructed inside MZ.

> CREATE SECRET access_key_secret AS '${arg.s3-access-key}'

> CREATE CONNECTION aws_conn TO AWS (
ACCESS KEY ID = 'tduser',
SECRET ACCESS KEY = SECRET access_key_secret,
ENDPOINT = 'http://${arg.aws-endpoint}',
REGION = 'us-east-1'
);

$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_copy_from_remote = true;

# Author the Parquet file directly in minio with DuckDB, so the bytes are not
# something MZ's own sink produced. The struct shape mirrors MZ's range
# encoding (lower/upper/lower_inclusive/upper_inclusive/empty).
$ duckdb-execute name=ranges
CREATE SECRET s3_secret (TYPE S3, KEY_ID 'tduser', SECRET '${arg.s3-access-key}', ENDPOINT '${arg.aws-endpoint}', URL_STYLE 'path', USE_SSL false, REGION 'minio');
COPY (SELECT * FROM (VALUES (1, {'lower': 1::INTEGER, 'upper': 10::INTEGER, 'lower_inclusive': true, 'upper_inclusive': true, 'empty': false}), (2, {'lower': 5, 'upper': 15, 'lower_inclusive': false, 'upper_inclusive': false, 'empty': false})) t(id, r) ORDER BY id) TO 's3://test-bucket/noncanon_ranges/data.parquet' (FORMAT PARQUET);

> CREATE TABLE ranges(id int, r int4range);

> COPY INTO ranges FROM 's3://test-bucket/noncanon_ranges' (FORMAT PARQUET, AWS CONNECTION = aws_conn);

# DuckDB's `[1,10]` and `(5,15)` must arrive as MZ's canonical `[1,11)` and
# `[6,15)`.
> SELECT id, r::text FROM ranges ORDER BY id;
1 [1,11)
2 [6,15)

# Once canonicalized, the rows must compare equal to logically-identical
# values constructed inside MZ.
> SELECT r = '[1,10]'::int4range FROM ranges WHERE id = 1;
true

> SELECT r = '(5,15)'::int4range FROM ranges WHERE id = 2;
true