diff --git a/src/arrow-util/src/builder.rs b/src/arrow-util/src/builder.rs
index 0934923771bf7..398b48947dacb 100644
--- a/src/arrow-util/src/builder.rs
+++ b/src/arrow-util/src/builder.rs
@@ -671,8 +671,9 @@ fn builder_for_datatype(
                     fields.len()
                 )
             }
-            let key_builder = StringBuilder::with_capacity(item_capacity, data_capacity);
+            let key_field = &fields[0];
             let value_field = &fields[1];
+            let key_builder = StringBuilder::with_capacity(item_capacity, data_capacity);
             let value_builder = ArrowColumn::new(
                 value_field.name().clone(),
                 value_field.is_nullable(),
@@ -681,13 +682,27 @@
                 item_capacity,
                 data_capacity,
             )?;
+            // Use the names from the schema's entries struct rather than
+            // arrow-rs's defaults (`entries`/`keys`/`values`): when the
+            // schema came from Iceberg (`key_value`/`key`/`value`), the
+            // RecordBatch validation rejects the mismatched DataType.
+            let field_names = MapFieldNames {
+                entry: entries_field.name().clone(),
+                key: key_field.name().clone(),
+                value: value_field.name().clone(),
+            };
+            // Forward both inner fields so any metadata the schema set
+            // (e.g. Iceberg's PARQUET:field_id) survives onto the
+            // MapArray's nested fields; otherwise RecordBatch::try_new
+            // rejects the batch as schema-mismatched.
             ColBuilder::MapBuilder(Box::new(
                 MapBuilder::with_capacity(
-                    Some(MapFieldNames::default()),
+                    Some(field_names),
                     key_builder,
                     value_builder,
                     item_capacity,
                 )
+                .with_keys_field(Arc::clone(key_field))
                 .with_values_field(Arc::clone(value_field)),
             ))
         } else {
@@ -937,6 +952,11 @@
             (ColBuilder::Int32Builder(builder), Datum::UInt16(i)) => {
                 builder.append_value(i32::from(i))
             }
+            // Lossless signed-to-signed widening for destinations that don't
+            // support narrow integers (e.g., Iceberg has no smallint).
+            (ColBuilder::Int32Builder(builder), Datum::Int16(i)) => {
+                builder.append_value(i32::from(i))
+            }
             (ColBuilder::Int64Builder(builder), Datum::UInt32(i)) => {
                 builder.append_value(i64::from(i))
             }
diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs
index e33c482907639..2a4ef95807c14 100644
--- a/src/sql/src/plan/statement/ddl.rs
+++ b/src/sql/src/plan/statement/ddl.rs
@@ -3321,8 +3321,6 @@ fn plan_sink(
         | SqlScalarType::MzAclItem
         | SqlScalarType::AclItem
         | SqlScalarType::Int2Vector
-        // ranges
-        | SqlScalarType::Range { .. }
     );
     if !is_valid {
         return Err(PlanError::IcebergSinkUnsupportedKeyType {
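The MapBuilder hunk in src/arrow-util/src/builder.rs is easiest to see in isolation. Below is a minimal standalone sketch (not part of the patch) of the arrow-rs behavior it relies on, using `MapBuilder::new` rather than `with_capacity` and assuming an arrow version that exposes `MapFieldNames` and `with_keys_field` (the diff itself uses the latter, so the version in tree has it):

    use arrow::array::Array;
    use arrow::array::builder::{MapBuilder, MapFieldNames, StringBuilder};
    use arrow::datatypes::DataType;

    fn main() {
        // Iceberg-style names instead of arrow-rs's defaults
        // (`entries`/`keys`/`values`).
        let names = MapFieldNames {
            entry: "key_value".to_string(),
            key: "key".to_string(),
            value: "value".to_string(),
        };
        let mut builder =
            MapBuilder::new(Some(names), StringBuilder::new(), StringBuilder::new());
        builder.keys().append_value("a");
        builder.values().append_value("foo");
        builder.append(true).unwrap();
        let array = builder.finish();

        // The finished MapArray's DataType names the entries struct
        // `key_value`, so it lines up with a schema that came from Iceberg
        // instead of tripping RecordBatch validation.
        match array.data_type() {
            DataType::Map(entries, _) => assert_eq!(entries.name(), "key_value"),
            other => panic!("expected Map, got {other:?}"),
        }
    }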
diff --git a/src/storage/src/sink/iceberg.rs b/src/storage/src/sink/iceberg.rs
index 0f70b674f94a9..149fc48184c05 100644
--- a/src/storage/src/sink/iceberg.rs
+++ b/src/storage/src/sink/iceberg.rs
@@ -651,7 +651,11 @@ fn merge_field_metadata_recursive(
                 },
                 None => None,
             };
-            let new_entries = merge_field_metadata_recursive(iceberg_entries, mz_entries)?;
+            // The Iceberg arrow representation names map fields differently from
+            // Materialize (`key_value`/`key`/`value` vs `entries`/`keys`/`values`),
+            // so name-based matching on the entries struct would drop the value
+            // field's extension metadata. Merge the entries struct positionally.
+            let new_entries = merge_map_entries_metadata(iceberg_entries, mz_entries)?;
             DataType::Map(Arc::new(new_entries), *sorted)
         }
         other => other.clone(),
     };
@@ -665,6 +669,73 @@
     .with_metadata(metadata))
 }
 
+/// Merge metadata into a Map's entries struct, matching key/value positionally.
+///
+/// Iceberg's arrow representation names map fields `key_value`/`key`/`value`,
+/// while Materialize uses `entries`/`keys`/`values`. Name-based matching would
+/// drop the Materialize extension metadata for the value field, which then
+/// causes `ArrowBuilder` to fail with "Field 'value' missing extension metadata".
+///
+/// Positional matching is safe because the Arrow spec defines Map structurally,
+/// not by field name: `List<entries: Struct<key: K, value: V>>` with exactly
+/// two struct children, key first and value second; the names are only
+/// conventional. See `Map` in apache/arrow `format/Schema.fbs`: "The names
+/// of the child fields may be respectively 'entries', 'key', and 'value', but
+/// this is not enforced."
+///
+/// Future cleanup: we could instead align Materialize's arrow map field names
+/// with the Parquet/Iceberg convention (`key_value`/`key`/`value`) in
+/// `mz_arrow_util::builder::scalar_to_arrow_datatype_impl` and drop this
+/// positional helper. That would also affect `COPY TO S3 ... FORMAT = 'parquet'`
+/// output schemas, so we'd need to confirm no downstream consumers depend on
+/// the current `entries`/`keys`/`values` names before flipping.
+fn merge_map_entries_metadata(
+    iceberg_entries: &Field,
+    mz_entries: Option<&Field>,
+) -> anyhow::Result<Field> {
+    let mut metadata = iceberg_entries.metadata().clone();
+    if let Some(mz_f) = mz_entries {
+        if let Some(extension_name) = mz_f.metadata().get(ARROW_EXTENSION_NAME_KEY) {
+            metadata.insert(ARROW_EXTENSION_NAME_KEY.to_string(), extension_name.clone());
+        }
+    }
+
+    let iceberg_fields = match iceberg_entries.data_type() {
+        DataType::Struct(fields) => fields,
+        other => anyhow::bail!(
+            "Iceberg map entries field '{}' is not a Struct: {:?}",
+            iceberg_entries.name(),
+            other
+        ),
+    };
+    let mz_fields = match mz_entries.map(|f| f.data_type()) {
+        Some(DataType::Struct(fields)) => Some(fields),
+        Some(other) => anyhow::bail!(
+            "Materialize map entries field '{}' is not a Struct: {:?}",
+            mz_entries.map(|f| f.name().as_str()).unwrap_or(""),
+            other
+        ),
+        None => None,
+    };
+
+    let new_fields: Vec<Field> = iceberg_fields
+        .iter()
+        .enumerate()
+        .map(|(idx, iceberg_inner)| {
+            let mz_inner = mz_fields.and_then(|fields| fields.get(idx));
+            merge_field_metadata_recursive(iceberg_inner, mz_inner.map(|f| f.as_ref()))
+        })
+        .collect::<anyhow::Result<Vec<_>>>()?;
+
+    Ok(Field::new(
+        iceberg_entries.name(),
+        DataType::Struct(new_fields.into()),
+        iceberg_entries.is_nullable(),
+    )
+    .with_metadata(metadata))
+}
+
 async fn reload_table(
     catalog: &dyn Catalog,
     namespace: String,
@@ -1997,6 +2068,71 @@
         assert_eq!(equality_ids, vec![expected_id]);
         assert_ne!(expected_id, 2);
     }
+
+    /// Regression test: iceberg-rust names map fields `key_value`/`key`/`value`
+    /// while Materialize uses `entries`/`keys`/`values`. The schema merge must
+    /// still copy the value field's extension metadata across so `ArrowBuilder`
+    /// can build the inner builder.
+    #[mz_ore::test]
+    #[allow(clippy::disallowed_types)]
+    fn merge_map_entries_preserves_value_extension_metadata() {
+        use std::collections::HashMap;
+
+        let mz_value_metadata = HashMap::from([(
+            ARROW_EXTENSION_NAME_KEY.to_string(),
+            "materialize.v1.string".to_string(),
+        )]);
+        let mz_entries = Field::new(
+            "entries",
+            DataType::Struct(
+                vec![
+                    Field::new("keys", DataType::Utf8, false),
+                    Field::new("values", DataType::Utf8, true).with_metadata(mz_value_metadata),
+                ]
+                .into(),
+            ),
+            false,
+        );
+        let mz_map = Field::new("m", DataType::Map(Arc::new(mz_entries), false), true)
+            .with_metadata(HashMap::from([(
+                ARROW_EXTENSION_NAME_KEY.to_string(),
+                "materialize.v1.map".to_string(),
+            )]));
+
+        let iceberg_entries = Field::new(
+            "key_value",
+            DataType::Struct(
+                vec![
+                    Field::new("key", DataType::Utf8, false),
+                    Field::new("value", DataType::Utf8, true),
+                ]
+                .into(),
+            ),
+            false,
+        );
+        let iceberg_map = Field::new("m", DataType::Map(Arc::new(iceberg_entries), false), true);
+
+        let merged = merge_field_metadata_recursive(&iceberg_map, Some(&mz_map))
+            .expect("merge should succeed");
+
+        let entries = match merged.data_type() {
+            DataType::Map(entries, _) => entries.as_ref(),
+            other => panic!("expected Map, got {other:?}"),
+        };
+        let entry_fields = match entries.data_type() {
+            DataType::Struct(fields) => fields,
+            other => panic!("expected Struct, got {other:?}"),
+        };
+        // Iceberg naming must be preserved on the merged schema...
+        assert_eq!(entry_fields[0].name(), "key");
+        assert_eq!(entry_fields[1].name(), "value");
+        // ...and the Materialize extension must have been copied positionally
+        // to the value field even though its name didn't match `values`.
+        assert_eq!(
+            entry_fields[1].metadata().get(ARROW_EXTENSION_NAME_KEY),
+            Some(&"materialize.v1.string".to_string()),
+        );
+    }
 }
 
 /// Commit completed batches to Iceberg as snapshots.
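Why the positional merge above is needed can be shown with a few lines of arrow schema manipulation. A minimal sketch, assuming `ARROW_EXTENSION_NAME_KEY` expands to the standard `ARROW:extension:name` metadata key:

    use std::collections::HashMap;

    use arrow::datatypes::{DataType, Field, Fields};

    fn main() {
        let mz_entries = Fields::from(vec![
            Field::new("keys", DataType::Utf8, false),
            Field::new("values", DataType::Utf8, true).with_metadata(HashMap::from([(
                "ARROW:extension:name".to_string(),
                "materialize.v1.string".to_string(),
            )])),
        ]);
        let iceberg_entries = Fields::from(vec![
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Utf8, true),
        ]);

        // Name-based lookup: Iceberg's `value` never matches Materialize's
        // `values`, so the extension metadata would be dropped silently.
        let by_name = mz_entries
            .iter()
            .find(|f| f.name() == iceberg_entries[1].name());
        assert!(by_name.is_none());

        // Positional lookup pairs second child with second child, which the
        // Arrow spec's structural definition of Map permits.
        assert_eq!(
            mz_entries[1].metadata().get("ARROW:extension:name"),
            Some(&"materialize.v1.string".to_string())
        );
    }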
diff --git a/src/testdrive/ci/Dockerfile b/src/testdrive/ci/Dockerfile
index af703116c8d1b..d68d66171b78c 100644
--- a/src/testdrive/ci/Dockerfile
+++ b/src/testdrive/ci/Dockerfile
@@ -17,9 +17,15 @@ COPY protobuf-include /usr/local/include
 RUN chmod +x /usr/local/bin/protoc
 ENV PROTOC=/usr/local/bin/protoc
 ENV PROTOC_INCLUDE=/usr/local/include
-# Install libduckdb.so that was downloaded
-COPY duckdb-download/*/*/libduckdb.so /usr/local/lib/
-RUN ldconfig
+# Install libduckdb.so that was downloaded. The cache contains
+# `<arch>-unknown-linux-gnu/<version>/libduckdb.so` for every architecture
+# we've built against; copy them all in and then pick the one matching this
+# image's arch. (A glob COPY collapses the per-arch files onto the same
+# destination path and silently picks whichever sorts last, which is wrong on aarch64.)
+COPY duckdb-download /tmp/duckdb-download
+RUN cp /tmp/duckdb-download/$(uname -m)-unknown-linux-gnu/*/libduckdb.so /usr/local/lib/ \
+    && rm -rf /tmp/duckdb-download \
+    && ldconfig
 ENV LD_LIBRARY_PATH=/usr/local/lib
 
 WORKDIR /workdir
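The Datum::Int16 -> Int32Builder arm added in builder.rs rests on `i32::from(i16)` being lossless; the smallint testdrive case below then exercises the same conversion end-to-end through a real sink. A tiny standalone check of the invariant:

    fn main() {
        // Every i16, including both extremes, widens to i32 and narrows
        // back unchanged, so the promotion can never corrupt a smallint.
        for v in [i16::MIN, -1, 0, 1, i16::MAX] {
            let widened = i32::from(v);
            assert_eq!(i16::try_from(widened), Ok(v));
        }
    }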
diff --git a/test/iceberg/catalog.td b/test/iceberg/catalog.td
index 2401779ad2684..a05c4a3993c90 100644
--- a/test/iceberg/catalog.td
+++ b/test/iceberg/catalog.td
@@ -270,3 +270,63 @@ SELECT id, r.lower, r.upper, r.lower_inclusive, r.upper_inclusive, r.empty FROM
 2 6 15 true false false
 3 <null> <null> false false true
 4 100 <null> false false false
+
+# Test smallint columns
+# Iceberg has no Int16, so smallint widens to Iceberg int (Int32) in the
+# arrow/parquet schema. The Datum::Int16 -> Int32Builder promotion is what
+# lets that conversion succeed end-to-end.
+> CREATE TABLE smallints(id smallint, val smallint);
+
+> INSERT INTO smallints VALUES (1, -32768), (2, 0), (3, 32767);
+
+> CREATE SINK smallint_demo
+  FROM smallints
+  INTO ICEBERG CATALOG CONNECTION polaris (
+    NAMESPACE 'default_namespace',
+    TABLE 'smallint_table'
+  )
+  USING AWS CONNECTION aws_conn
+  KEY (id) NOT ENFORCED
+  MODE UPSERT
+  WITH (COMMIT INTERVAL '1s');
+
+$ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration=10s
+
+$ duckdb-query name=iceberg
+SELECT id, val FROM iceberg_scan('s3://test-bucket/default_namespace/smallint_table') ORDER BY id
+1 -32768
+2 0
+3 32767
+
+# Test map columns
+# Materialize's arrow schema for maps uses field names entries/keys/values,
+# while iceberg-rust uses key_value/key/value. The metadata merge must align
+# them positionally so the value field's extension metadata survives.
+> CREATE TABLE attrs(id int, props map[text => text]);
+
+> INSERT INTO attrs VALUES
+  (1, '{a=>foo,b=>bar}'),
+  (2, '{}'),
+  (3, '{key=>value}');
+
+> CREATE SINK map_demo
+  FROM attrs
+  INTO ICEBERG CATALOG CONNECTION polaris (
+    NAMESPACE 'default_namespace',
+    TABLE 'map_table'
+  )
+  USING AWS CONNECTION aws_conn
+  KEY (id) NOT ENFORCED
+  MODE UPSERT
+  WITH (COMMIT INTERVAL '1s');
+
+$ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration=10s
+
+# Verify the map round-trips: cardinality plus sorted keys/values. Avoid
+# `props::VARCHAR` because the textual rendering of MAP differs across
+# DuckDB versions.
+$ duckdb-query name=iceberg
+SELECT id, cardinality(props), list_sort(map_keys(props))::VARCHAR, list_sort(map_values(props))::VARCHAR FROM iceberg_scan('s3://test-bucket/default_namespace/map_table') ORDER BY id
+1 2 [a, b] [bar, foo]
+2 0 [] []
+3 1 [key] [value]
diff --git a/test/iceberg/key-validation.td b/test/iceberg/key-validation.td
index 92ad6b3f97a04..5fc3861b86fae 100644
--- a/test/iceberg/key-validation.td
+++ b/test/iceberg/key-validation.td
@@ -87,6 +87,22 @@ contains:cannot be used as an Iceberg equality delete key
   WITH (COMMIT INTERVAL '1s');
 contains:cannot be used as an Iceberg equality delete key
 
+# Range key: ranges lower to Iceberg structs, which iceberg-rust's
+# RecordBatchProjector skips as nested, so they cannot be equality delete keys.
+> CREATE TABLE key_range (k int4range, v text);
+
+! CREATE SINK key_range_sink
+  FROM key_range
+  INTO ICEBERG CATALOG CONNECTION polaris (
+    NAMESPACE 'default_namespace',
+    TABLE 'key_range_table'
+  )
+  USING AWS CONNECTION aws_conn
+  KEY (k) NOT ENFORCED
+  MODE UPSERT
+  WITH (COMMIT INTERVAL '1s');
+contains:cannot be used as an Iceberg equality delete key
+
 # Non-key float column is fine
 > CREATE TABLE val_float (k int, v float);
 
@@ -108,3 +124,4 @@
 > DROP TABLE key_double;
 > DROP TABLE key_map;
 > DROP TABLE key_list;
+> DROP TABLE key_range;
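For intuition about the range-key rejection tested above: once a range lowers to an Iceberg struct, it fails any primitive-only equality-key rule. The helper below is hypothetical (the real checks live in `plan_sink` and in iceberg-rust's RecordBatchProjector); it only sketches the shape of the rule against arrow DataTypes, with a struct layout guessed from the lower/upper/lower_inclusive/upper_inclusive/empty columns selected in catalog.td:

    use arrow::datatypes::{DataType, Field};

    // Hypothetical, non-exhaustive stand-in for the nested-type rule.
    fn is_valid_equality_key(dt: &DataType) -> bool {
        !matches!(
            dt,
            DataType::Struct(_) | DataType::List(_) | DataType::Map(_, _)
        )
    }

    fn main() {
        // Roughly the struct an int4range lowers to.
        let range_as_struct = DataType::Struct(
            vec![
                Field::new("lower", DataType::Int32, true),
                Field::new("upper", DataType::Int32, true),
                Field::new("lower_inclusive", DataType::Boolean, false),
                Field::new("upper_inclusive", DataType::Boolean, false),
                Field::new("empty", DataType::Boolean, false),
            ]
            .into(),
        );
        assert!(!is_valid_equality_key(&range_as_struct));
        assert!(is_valid_equality_key(&DataType::Int32));
    }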