diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index 7b1aabf99154..28338d034b50 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -186,6 +186,7 @@ import org.apache.iceberg.mr.hive.actions.HiveIcebergDeleteOrphanFiles; import org.apache.iceberg.mr.hive.plan.IcebergBucketFunction; import org.apache.iceberg.mr.hive.udf.GenericUDFIcebergZorder; +import org.apache.iceberg.parquet.VariantUtil; import org.apache.iceberg.puffin.Blob; import org.apache.iceberg.puffin.BlobMetadata; import org.apache.iceberg.puffin.Puffin; @@ -1739,7 +1740,8 @@ private void fallbackToNonVectorizedModeBasedOnProperties(Properties tableProps) if (FileFormat.AVRO.name().equalsIgnoreCase(tableProps.getProperty(TableProperties.DEFAULT_FILE_FORMAT)) || isValidMetadataTable(tableProps.getProperty(IcebergAcidUtil.META_TABLE_PROPERTY)) || hasOrcTimeInSchema(tableProps, tableSchema) || - !hasParquetNestedTypeWithinListOrMap(tableProps, tableSchema)) { + !hasParquetNestedTypeWithinListOrMap(tableProps, tableSchema) || + VariantUtil.shouldUseVariantShredding(tableProps::getProperty, tableSchema)) { // disable vectorization SessionStateUtil.getQueryState(conf).ifPresent(queryState -> queryState.getConf().setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false)); diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java index 234cf928432b..049a5a0dfb49 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java @@ -86,7 +86,7 @@ protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) { protected void configureDataWrite(Parquet.DataWriteBuilder builder) { builder.createWriterFunc(GenericParquetWriter::create); // Configure variant shredding if enabled and a sample record is available - if (VariantUtil.shouldUseVariantShredding(properties, dataSchema())) { + if (VariantUtil.shouldUseVariantShredding(properties::get, dataSchema())) { setVariantShreddingFunc(builder, VariantUtil.variantShreddingFunc(sampleRecord, dataSchema())); } } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/VariantUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/VariantUtil.java index 736a39b895c8..d03c4a675fa2 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/VariantUtil.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/VariantUtil.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; +import java.util.function.UnaryOperator; import org.apache.iceberg.Accessor; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; @@ -59,8 +60,8 @@ public record VariantField(int fieldId, Accessor accessor, String[] /** * Check if variant shredding is enabled via table properties. */ - public static boolean isVariantShreddingEnabled(Map properties) { - String shreddingEnabled = properties.get(InputFormatConfig.VARIANT_SHREDDING_ENABLED); + public static boolean isVariantShreddingEnabled(UnaryOperator propertyLookup) { + String shreddingEnabled = propertyLookup.apply(InputFormatConfig.VARIANT_SHREDDING_ENABLED); return Boolean.parseBoolean(shreddingEnabled); } @@ -73,7 +74,7 @@ public static boolean isShreddable(Object value) { public static List variantFieldsForShredding( Map properties, Schema schema) { - if (!isVariantShreddingEnabled(properties)) { + if (!isVariantShreddingEnabled(properties::get)) { return List.of(); } return variantFieldsForShredding(schema); @@ -89,8 +90,8 @@ private static List variantFieldsForShredding(Schema schema) { return results; } - public static boolean shouldUseVariantShredding(Map properties, Schema schema) { - return isVariantShreddingEnabled(properties) && hasVariantFields(schema); + public static boolean shouldUseVariantShredding(UnaryOperator propertyLookup, Schema schema) { + return isVariantShreddingEnabled(propertyLookup) && hasVariantFields(schema); } private static boolean hasVariantFields(Schema schema) { diff --git a/iceberg/iceberg-handler/src/test/queries/positive/variant_type_shredding.q b/iceberg/iceberg-handler/src/test/queries/positive/variant_type_shredding.q index 25d84dd0c0a1..df6794c431ab 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/variant_type_shredding.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/variant_type_shredding.q @@ -27,9 +27,6 @@ INSERT INTO tbl_shredded_variant VALUES (2, parse_json('{"name": "Bill", "active": false}')), (3, parse_json('{"name": "Henry", "age": 20}')); --- Disable vectorized execution until Variant type is supported -set hive.vectorized.execution.enabled=false; - -- Retrieve and verify SELECT id, try_variant_get(data, '$.name') FROM tbl_shredded_variant WHERE variant_get(data, '$.age') > 25; @@ -37,3 +34,23 @@ WHERE variant_get(data, '$.age') > 25; EXPLAIN SELECT id, try_variant_get(data, '$.name') FROM tbl_shredded_variant WHERE variant_get(data, '$.age') > 25; + +CREATE TABLE t ( + id INT, + v VARIANT +) +STORED BY ICEBERG +TBLPROPERTIES ( + 'format-version'='3', + 'variant.shredding.enabled'='true' +); + +INSERT INTO t VALUES +(1, parse_json('{"a": 1}')), +(2, parse_json('{"b": 2}')); + +SELECT + try_variant_get(v, '$.a'), + try_variant_get(v, '$.b') +FROM t +ORDER BY id; diff --git a/iceberg/iceberg-handler/src/test/results/positive/variant_type_shredding.q.out b/iceberg/iceberg-handler/src/test/results/positive/variant_type_shredding.q.out index b51bc7495259..f7c0910a9b89 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/variant_type_shredding.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/variant_type_shredding.q.out @@ -99,3 +99,57 @@ STAGE PLANS: Processor Tree: ListSink +PREHOOK: query: CREATE TABLE t ( + id INT, + v VARIANT +) +STORED BY ICEBERG +TBLPROPERTIES ( + 'format-version'='3', + 'variant.shredding.enabled'='true' +) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@t +POSTHOOK: query: CREATE TABLE t ( + id INT, + v VARIANT +) +STORED BY ICEBERG +TBLPROPERTIES ( + 'format-version'='3', + 'variant.shredding.enabled'='true' +) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@t +PREHOOK: query: INSERT INTO t VALUES +(1, parse_json('{"a": 1}')), +(2, parse_json('{"b": 2}')) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@t +POSTHOOK: query: INSERT INTO t VALUES +(1, parse_json('{"a": 1}')), +(2, parse_json('{"b": 2}')) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@t +PREHOOK: query: SELECT + try_variant_get(v, '$.a'), + try_variant_get(v, '$.b') +FROM t +ORDER BY id +PREHOOK: type: QUERY +PREHOOK: Input: default@t +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: SELECT + try_variant_get(v, '$.a'), + try_variant_get(v, '$.b') +FROM t +ORDER BY id +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t +POSTHOOK: Output: hdfs://### HDFS PATH ### +1 NULL +NULL 2