24 changes: 22 additions & 2 deletions src/arrow-util/src/builder.rs
@@ -671,8 +671,9 @@ fn builder_for_datatype(
fields.len()
)
}
let key_field = &fields[0];
let value_field = &fields[1];
let key_builder = StringBuilder::with_capacity(item_capacity, data_capacity);
let value_builder = ArrowColumn::new(
value_field.name().clone(),
value_field.is_nullable(),
@@ -681,13 +682,27 @@
item_capacity,
data_capacity,
)?;
// Use the names from the schema's entries struct rather than
// arrow-rs's defaults (`entries`/`keys`/`values`). When the
// schema came from Iceberg (`key_value`/`key`/`value`),
// RecordBatch validation would reject the mismatched DataType.
let field_names = MapFieldNames {
entry: entries_field.name().clone(),
key: key_field.name().clone(),
value: value_field.name().clone(),
};
// Forward both inner fields so any metadata the schema set
// (e.g. Iceberg's PARQUET:field_id) survives onto the
// MapArray's nested fields; otherwise RecordBatch::try_new
// rejects the batch as schema-mismatched.
ColBuilder::MapBuilder(Box::new(
MapBuilder::with_capacity(
Some(field_names),
key_builder,
value_builder,
item_capacity,
)
.with_keys_field(Arc::clone(key_field))
.with_values_field(Arc::clone(value_field)),
))
} else {
@@ -937,6 +952,11 @@ impl ArrowColumn {
(ColBuilder::Int32Builder(builder), Datum::UInt16(i)) => {
builder.append_value(i32::from(i))
}
// Lossless signed-to-signed widening for destinations that don't
// support narrow integers (e.g., Iceberg has no smallint).
(ColBuilder::Int32Builder(builder), Datum::Int16(i)) => {
builder.append_value(i32::from(i))
}
(ColBuilder::Int64Builder(builder), Datum::UInt32(i)) => {
builder.append_value(i64::from(i))
}
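To make the failure mode concrete, here is a standalone sketch (not part of the patch) against arrow-rs's public builder API, with names mirroring the Iceberg case described in the comments above: a MapArray built with the default `entries`/`keys`/`values` names fails `RecordBatch::try_new` against a schema whose entries struct is named `key_value`, while passing the schema's own `MapFieldNames` makes the DataTypes line up.

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, MapBuilder, MapFieldNames, StringBuilder};
use arrow::datatypes::{DataType, Field, Fields, Schema};
use arrow::record_batch::RecordBatch;

fn main() {
    // Iceberg-style map schema: the entries struct is named `key_value`.
    let entries = Field::new(
        "key_value",
        DataType::Struct(Fields::from(vec![
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Utf8, true),
        ])),
        false,
    );
    let schema = Arc::new(Schema::new(vec![Field::new(
        "m",
        DataType::Map(Arc::new(entries), false),
        true,
    )]));

    // Default names produce Map<entries: Struct<keys, values>>, a different
    // DataType from the schema above, so validation rejects the batch.
    let mut b = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new());
    b.keys().append_value("a");
    b.values().append_value("b");
    b.append(true).unwrap();
    let col: ArrayRef = Arc::new(b.finish());
    assert!(RecordBatch::try_new(Arc::clone(&schema), vec![col]).is_err());

    // Passing the schema's names, as the patch does, makes the types match.
    let names = MapFieldNames {
        entry: "key_value".to_string(),
        key: "key".to_string(),
        value: "value".to_string(),
    };
    let mut b = MapBuilder::new(Some(names), StringBuilder::new(), StringBuilder::new());
    b.keys().append_value("a");
    b.values().append_value("b");
    b.append(true).unwrap();
    let col: ArrayRef = Arc::new(b.finish());
    assert!(RecordBatch::try_new(schema, vec![col]).is_ok());
}
```

The patch's `.with_keys_field`/`.with_values_field` calls go one step further and carry the schema's field metadata (e.g. Iceberg's `PARQUET:field_id`) onto the finished MapArray, which `MapFieldNames` alone does not.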
2 changes: 0 additions & 2 deletions src/sql/src/plan/statement/ddl.rs
@@ -3321,8 +3321,6 @@ fn plan_sink(
| SqlScalarType::MzAclItem
| SqlScalarType::AclItem
| SqlScalarType::Int2Vector
);
if !is_valid {
return Err(PlanError::IcebergSinkUnsupportedKeyType {
138 changes: 137 additions & 1 deletion src/storage/src/sink/iceberg.rs
@@ -651,7 +651,11 @@ fn merge_field_metadata_recursive(
},
None => None,
};
// The Iceberg arrow representation names map fields differently from
// Materialize (`key_value`/`key`/`value` vs `entries`/`keys`/`values`),
// so name-based matching on the entries struct would drop the value
// field's extension metadata. Merge the entries struct positionally.
let new_entries = merge_map_entries_metadata(iceberg_entries, mz_entries)?;
DataType::Map(Arc::new(new_entries), *sorted)
}
other => other.clone(),
@@ -665,6 +669,73 @@
.with_metadata(metadata))
}

/// Merge metadata into a Map's entries struct, matching key/value positionally.
///
/// Iceberg's arrow representation names map fields `key_value`/`key`/`value`,
/// while Materialize uses `entries`/`keys`/`values`. Name-based matching would
/// drop the materialize extension metadata for the value field, which then
/// causes `ArrowBuilder` to fail with "Field 'value' missing extension metadata".
///
/// Positional matching is safe because the Arrow spec defines Map structurally,
/// not by field name: `List<entries: Struct<key: K, value: V>>` with exactly
/// two struct children — key first, value second — and the names are only
/// conventional. See `Map` in apache/arrow `format/Schema.fbs`:
/// <https://github.com/apache/arrow/blob/main/format/Schema.fbs> — "The names
/// of the child fields may be respectively 'entries', 'key', and 'value', but
/// this is not enforced."
///
/// Future cleanup: we could instead align Materialize's arrow map field names
/// with the Parquet/Iceberg convention (`key_value`/`key`/`value`) in
/// `mz_arrow_util::builder::scalar_to_arrow_datatype_impl` and drop this
/// positional helper. That would also affect `COPY TO S3 ... FORMAT = 'parquet'`
/// output schemas, so we'd need to confirm no downstream consumers depend on
/// the current `entries`/`keys`/`values` names before flipping.
fn merge_map_entries_metadata(
iceberg_entries: &Field,
mz_entries: Option<&Field>,
) -> anyhow::Result<Field> {
let mut metadata = iceberg_entries.metadata().clone();
if let Some(mz_f) = mz_entries {
if let Some(extension_name) = mz_f.metadata().get(ARROW_EXTENSION_NAME_KEY) {
metadata.insert(ARROW_EXTENSION_NAME_KEY.to_string(), extension_name.clone());
}
}

let iceberg_fields = match iceberg_entries.data_type() {
DataType::Struct(fields) => fields,
other => anyhow::bail!(
"Iceberg map entries field '{}' is not a Struct: {:?}",
iceberg_entries.name(),
other
),
};
let mz_fields = match mz_entries.map(|f| f.data_type()) {
Some(DataType::Struct(fields)) => Some(fields),
Some(other) => anyhow::bail!(
"Materialize map entries field '{}' is not a Struct: {:?}",
mz_entries.map(|f| f.name().as_str()).unwrap_or(""),
other
),
None => None,
};

let new_fields: Vec<Field> = iceberg_fields
.iter()
.enumerate()
.map(|(idx, iceberg_inner)| {
let mz_inner = mz_fields.and_then(|fields| fields.get(idx));
merge_field_metadata_recursive(iceberg_inner, mz_inner.map(|f| f.as_ref()))
})
.collect::<anyhow::Result<Vec<_>>>()?;

Ok(Field::new(
iceberg_entries.name(),
DataType::Struct(new_fields.into()),
iceberg_entries.is_nullable(),
)
.with_metadata(metadata))
}

async fn reload_table(
catalog: &dyn Catalog,
namespace: String,
@@ -1997,6 +2068,71 @@ mod tests {
assert_eq!(equality_ids, vec![expected_id]);
assert_ne!(expected_id, 2);
}

/// Regression test: iceberg-rust names map fields `key_value`/`key`/`value`
/// while Materialize uses `entries`/`keys`/`values`. The schema merge must
/// still copy the value field's extension metadata across so ArrowBuilder
/// can build the inner builder.
#[mz_ore::test]
#[allow(clippy::disallowed_types)]
fn merge_map_entries_preserves_value_extension_metadata() {
use std::collections::HashMap;

let mz_value_metadata = HashMap::from([(
ARROW_EXTENSION_NAME_KEY.to_string(),
"materialize.v1.string".to_string(),
)]);
let mz_entries = Field::new(
"entries",
DataType::Struct(
vec![
Field::new("keys", DataType::Utf8, false),
Field::new("values", DataType::Utf8, true).with_metadata(mz_value_metadata),
]
.into(),
),
false,
);
let mz_map = Field::new("m", DataType::Map(Arc::new(mz_entries), false), true)
.with_metadata(HashMap::from([(
ARROW_EXTENSION_NAME_KEY.to_string(),
"materialize.v1.map".to_string(),
)]));

let iceberg_entries = Field::new(
"key_value",
DataType::Struct(
vec![
Field::new("key", DataType::Utf8, false),
Field::new("value", DataType::Utf8, true),
]
.into(),
),
false,
);
let iceberg_map = Field::new("m", DataType::Map(Arc::new(iceberg_entries), false), true);

let merged = merge_field_metadata_recursive(&iceberg_map, Some(&mz_map))
.expect("merge should succeed");

let entries = match merged.data_type() {
DataType::Map(entries, _) => entries.as_ref(),
other => panic!("expected Map, got {other:?}"),
};
let entry_fields = match entries.data_type() {
DataType::Struct(fields) => fields,
other => panic!("expected Struct, got {other:?}"),
};
// Iceberg naming must be preserved on the merged schema...
assert_eq!(entry_fields[0].name(), "key");
assert_eq!(entry_fields[1].name(), "value");
// ...and the materialize extension must have been copied positionally
// to the value field even though its name didn't match `values`.
assert_eq!(
entry_fields[1].metadata().get(ARROW_EXTENSION_NAME_KEY),
Some(&"materialize.v1.string".to_string()),
);
}
}

/// Commit completed batches to Iceberg as snapshots.
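To isolate the name mismatch that motivates the positional merge, a small standalone sketch (not part of the patch; the field names come from the doc comment above): name-based lookup finds no Materialize counterpart for Iceberg's `value` field, while positional indexing pairs the fields correctly.

```rust
use arrow::datatypes::{DataType, Field, Fields};

fn main() {
    // Entries-struct children as iceberg-rust and Materialize name them.
    let iceberg_entries = Fields::from(vec![
        Field::new("key", DataType::Utf8, false),
        Field::new("value", DataType::Utf8, true),
    ]);
    let mz_entries = Fields::from(vec![
        Field::new("keys", DataType::Utf8, false),
        Field::new("values", DataType::Utf8, true),
    ]);

    // Name-based matching finds no Materialize field named `value`, so the
    // extension metadata for Iceberg's value field would be dropped.
    assert!(mz_entries.iter().find(|f| f.name() == "value").is_none());

    // Positional matching pairs index 1 with index 1 (`value` <-> `values`),
    // which is what merge_map_entries_metadata relies on.
    assert_eq!(iceberg_entries[1].name(), "value");
    assert_eq!(mz_entries[1].name(), "values");
}
```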
12 changes: 9 additions & 3 deletions src/testdrive/ci/Dockerfile
@@ -17,9 +17,15 @@ COPY protobuf-include /usr/local/include
RUN chmod +x /usr/local/bin/protoc
ENV PROTOC=/usr/local/bin/protoc
ENV PROTOC_INCLUDE=/usr/local/include
# Install libduckdb.so that was downloaded. The cache contains
# `<arch>-unknown-linux-gnu/<version>/libduckdb.so` for every architecture
# we've built against; copy them all in and then pick the one matching this
# image's arch. (A glob COPY collapses both files onto the same destination
# path and silently picks whichever sorts last, which is wrong on aarch64.)
COPY duckdb-download /tmp/duckdb-download
RUN cp /tmp/duckdb-download/$(uname -m)-unknown-linux-gnu/*/libduckdb.so /usr/local/lib/ \
&& rm -rf /tmp/duckdb-download \
&& ldconfig
ENV LD_LIBRARY_PATH=/usr/local/lib

WORKDIR /workdir
60 changes: 60 additions & 0 deletions test/iceberg/catalog.td
@@ -270,3 +270,63 @@ SELECT id, r.lower, r.upper, r.lower_inclusive, r.upper_inclusive, r.empty FROM
2 6 15 true false false
3 <null> <null> false false true
4 <null> 100 false false false

# Test smallint columns
# Iceberg has no Int16, so smallint widens to Iceberg int (Int32) in the
# arrow/parquet schema. The Datum::Int16 -> Int32Builder promotion is what
# lets that conversion succeed end-to-end.
> CREATE TABLE smallints(id smallint, val smallint);

> INSERT INTO smallints VALUES (1, -32768), (2, 0), (3, 32767);

> CREATE SINK smallint_demo
FROM smallints
INTO ICEBERG CATALOG CONNECTION polaris (
NAMESPACE 'default_namespace',
TABLE 'smallint_table'
)
USING AWS CONNECTION aws_conn
KEY (id) NOT ENFORCED
MODE UPSERT
WITH (COMMIT INTERVAL '1s');

$ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration=10s

$ duckdb-query name=iceberg
SELECT id, val FROM iceberg_scan('s3://test-bucket/default_namespace/smallint_table') ORDER BY id
1 -32768
2 0
3 32767

# Test map columns
# Materialize's arrow schema for maps uses field names entries/keys/values,
# while iceberg-rust uses key_value/key/value. The metadata merge must align
# them positionally so the value field's extension metadata survives.
> CREATE TABLE attrs(id int, props map[text => text]);

> INSERT INTO attrs VALUES
(1, '{a=>foo,b=>bar}'),
(2, '{}'),
(3, '{key=>value}');

> CREATE SINK map_demo
FROM attrs
INTO ICEBERG CATALOG CONNECTION polaris (
NAMESPACE 'default_namespace',
TABLE 'map_table'
)
USING AWS CONNECTION aws_conn
KEY (id) NOT ENFORCED
MODE UPSERT
WITH (COMMIT INTERVAL '1s');

$ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration=10s

# Verify the map round-trips: cardinality + sorted keys/values. Avoid
# `props::VARCHAR` because the textual rendering of MAP differs across
# DuckDB versions.
$ duckdb-query name=iceberg
SELECT id, cardinality(props), list_sort(map_keys(props))::VARCHAR, list_sort(map_values(props))::VARCHAR FROM iceberg_scan('s3://test-bucket/default_namespace/map_table') ORDER BY id
1 2 [a, b] [bar, foo]
2 0 [] []
3 1 [key] [value]
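The Datum::Int16 promotion the smallint test exercises is an ordinary lossless widening; a minimal sketch using arrow's `Int32Builder` (standalone, not part of the patch):

```rust
use arrow::array::Int32Builder;

fn main() {
    let mut builder = Int32Builder::new();
    // i32::from(i16) is lossless over the whole i16 range, so no bounds
    // checks are needed when a smallint lands in an Iceberg int column.
    for v in [i16::MIN, 0, i16::MAX] {
        builder.append_value(i32::from(v));
    }
    let array = builder.finish();
    assert_eq!(array.value(0), -32768);
    assert_eq!(array.value(2), 32767);
}
```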
17 changes: 17 additions & 0 deletions test/iceberg/key-validation.td
@@ -87,6 +87,22 @@ contains:cannot be used as an Iceberg equality delete key
WITH (COMMIT INTERVAL '1s');
contains:cannot be used as an Iceberg equality delete key

# Range key: ranges lower to Iceberg structs, which iceberg-rust's
# RecordBatchProjector skips as nested, so they cannot be equality delete keys.
> CREATE TABLE key_range (k int4range, v text);

! CREATE SINK key_range_sink
FROM key_range
INTO ICEBERG CATALOG CONNECTION polaris (
NAMESPACE 'default_namespace',
TABLE 'key_range_table'
)
USING AWS CONNECTION aws_conn
KEY (k) NOT ENFORCED
MODE UPSERT
WITH (COMMIT INTERVAL '1s');
contains:cannot be used as an Iceberg equality delete key

# Non-key float column is fine
> CREATE TABLE val_float (k int, v float);

@@ -108,3 +124,4 @@
> DROP TABLE key_double;
> DROP TABLE key_map;
> DROP TABLE key_list;
> DROP TABLE key_range;