From ef1d8a6ba1f9794fd83771de3c8edc7e75b0cca0 Mon Sep 17 00:00:00 2001 From: Harsh Motwani Date: Mon, 7 Jul 2025 16:48:14 -0700 Subject: [PATCH 01/11] Added utility functions to convert batches of variants to and from json strings --- parquet-variant-compute/Cargo.toml | 49 ++++++++ parquet-variant-compute/src/from_json.rs | 119 ++++++++++++++++++++ parquet-variant-compute/src/lib.rs | 27 +++++ parquet-variant-compute/src/to_json.rs | 137 +++++++++++++++++++++++ 4 files changed, 332 insertions(+) create mode 100644 parquet-variant-compute/Cargo.toml create mode 100644 parquet-variant-compute/src/from_json.rs create mode 100644 parquet-variant-compute/src/lib.rs create mode 100644 parquet-variant-compute/src/to_json.rs diff --git a/parquet-variant-compute/Cargo.toml b/parquet-variant-compute/Cargo.toml new file mode 100644 index 000000000000..b743ca6aa4f5 --- /dev/null +++ b/parquet-variant-compute/Cargo.toml @@ -0,0 +1,49 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "parquet-variant-compute" +# This package is still in development and thus the version does +# not follow the versions of the rest of the crates in this repo. +version = "0.1.0" +license = { workspace = true } +description = "Apache Parquet Variant Batch Processing" +homepage = { workspace = true } +repository = { workspace = true } +authors = { workspace = true } +keywords = ["arrow", "parquet", "variant"] +readme = "README.md" +edition = { workspace = true } +# needs a newer version than workspace due to +# rror: `Option::::unwrap` is not yet stable as a const fn +rust-version = "1.83" + + +[dependencies] +arrow = { workspace = true } +arrow-schema = { workspace = true } +parquet-variant = { path = "../parquet-variant" } +chrono = { workspace = true } +serde_json = "1.0" +base64 = "0.22" + + +[lib] +name = "parquet_variant_compute" +bench = false + +[dev-dependencies] diff --git a/parquet-variant-compute/src/from_json.rs b/parquet-variant-compute/src/from_json.rs new file mode 100644 index 000000000000..1769aafd768d --- /dev/null +++ b/parquet-variant-compute/src/from_json.rs @@ -0,0 +1,119 @@ +use std::sync::Arc; + +use arrow::array::{ + Array, ArrayRef, BinaryBuilder, BooleanBufferBuilder, StringArray, StructArray, +}; +use arrow::buffer::NullBuffer; +use arrow::datatypes::{DataType, Field}; +use arrow_schema::ArrowError; +use parquet_variant::{json_to_variant, VariantBuilder}; + +fn variant_arrow_repr() -> DataType { + let metadata_field = Field::new("metadata", DataType::Binary, true); + let value_field = Field::new("value", DataType::Binary, true); + let fields = vec![metadata_field, value_field]; + DataType::Struct(fields.into()) +} + +pub fn batch_json_string_to_variant(input: &ArrayRef) -> Result { + let input_string_array = match input.as_any().downcast_ref::() { + Some(string_array) => Ok(string_array), + None => Err(ArrowError::CastError( + "Expected reference to StringArray as input".into(), + )), + }?; + + let mut metadata_builder = BinaryBuilder::new(); + let mut value_builder = BinaryBuilder::new(); + let mut validity = BooleanBufferBuilder::new(input.len()); + for i in 0..input.len() { + if input.is_null(i) { + metadata_builder.append_null(); + value_builder.append_null(); + validity.append(false); + } else { + let mut vb = VariantBuilder::new(); + json_to_variant(input_string_array.value(i), &mut vb)?; + let (metadata, value) = vb.finish(); + metadata_builder.append_value(&metadata); + value_builder.append_value(&value); + validity.append(true); + } + } + let struct_fields: Vec = vec![ + Arc::new(metadata_builder.finish()), + Arc::new(value_builder.finish()), + ]; + let variant_fields = match variant_arrow_repr() { + DataType::Struct(fields) => fields, + _ => unreachable!("variant_arrow_repr is hard-coded and must match the expected schema"), + }; + let null_buffer = NullBuffer::new(validity.finish()); + Ok(StructArray::new( + variant_fields, + struct_fields, + Some(null_buffer), + )) +} + +#[cfg(test)] +mod test { + use crate::batch_json_string_to_variant; + use arrow::array::{Array, ArrayRef, BinaryArray, StringArray}; + use arrow_schema::ArrowError; + use parquet_variant::{Variant, VariantBuilder}; + use std::sync::Arc; + + #[test] + fn test_batch_json_string_to_variant() -> Result<(), ArrowError> { + let input = StringArray::from(vec![ + Some("1"), + None, + Some("{\"a\": 32}"), + Some("null"), + None, + ]); + let array_ref: ArrayRef = Arc::new(input); + let output = batch_json_string_to_variant(&array_ref).unwrap(); + + let struct_array = &output; + let metadata_array = struct_array + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let value_array = struct_array + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!(struct_array.is_null(0), false); + assert_eq!(struct_array.is_null(1), true); + assert_eq!(struct_array.is_null(2), false); + assert_eq!(struct_array.is_null(3), false); + assert_eq!(struct_array.is_null(4), true); + + assert_eq!(metadata_array.value(0), &[1, 0, 0]); + assert_eq!(value_array.value(0), &[12, 1]); + + { + let mut vb = VariantBuilder::new(); + let mut ob = vb.new_object(); + ob.insert("a", Variant::Int8(32)); + ob.finish()?; + let (object_metadata, object_value) = vb.finish(); + assert_eq!(metadata_array.value(2), &object_metadata); + assert_eq!(value_array.value(2), &object_value); + } + + assert_eq!(metadata_array.value(3), &[1, 0, 0]); + assert_eq!(value_array.value(3), &[0]); + + assert!(metadata_array.is_null(1)); + assert!(value_array.is_null(1)); + assert!(metadata_array.is_null(4)); + assert!(value_array.is_null(4)); + Ok(()) + } +} diff --git a/parquet-variant-compute/src/lib.rs b/parquet-variant-compute/src/lib.rs new file mode 100644 index 000000000000..97431448a2ec --- /dev/null +++ b/parquet-variant-compute/src/lib.rs @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod from_json; +mod to_json; + +/// Parse a batch of JSON strings into a batch of Variants represented as +/// STRUCT where nulls are preserved. The JSON strings in the input +/// must be valid. +pub use from_json::batch_json_string_to_variant; +/// Transform a batch of Variant represented as STRUCT to a batch +/// of JSON strings where nulls are preserved. The JSON strings in the input must be valid. +pub use to_json::batch_variant_to_json_string; diff --git a/parquet-variant-compute/src/to_json.rs b/parquet-variant-compute/src/to_json.rs new file mode 100644 index 000000000000..8fc30ce039a3 --- /dev/null +++ b/parquet-variant-compute/src/to_json.rs @@ -0,0 +1,137 @@ +use arrow::array::{Array, ArrayRef, BinaryArray, StringArray, StringBuilder, StructArray}; +use arrow::datatypes::DataType; +use arrow_schema::ArrowError; +use parquet_variant::{variant_to_json_string, Variant}; + +pub fn batch_variant_to_json_string(input: &ArrayRef) -> Result { + let struct_array = input + .as_any() + .downcast_ref::() + .ok_or_else(|| ArrowError::CastError("Expected StructArray as input".into()))?; + + // Validate field types + let data_type = struct_array.data_type(); + match data_type { + DataType::Struct(inner_fields) => { + if inner_fields.len() != 2 + || inner_fields[0].data_type() != &DataType::Binary + || inner_fields[1].data_type() != &DataType::Binary + { + return Err(ArrowError::CastError( + "Expected struct with two binary fields".into(), + )); + } + } + _ => { + return Err(ArrowError::CastError( + "Expected StructArray with known fields".into(), + )) + } + } + + let value_array = struct_array + .column(0) + .as_any() + .downcast_ref::() + .ok_or_else(|| ArrowError::CastError("Expected BinaryArray for 'value'".into()))?; + + let metadata_array = struct_array + .column(1) + .as_any() + .downcast_ref::() + .ok_or_else(|| ArrowError::CastError("Expected BinaryArray for 'metadata'".into()))?; + + let mut builder = StringBuilder::new(); + + for i in 0..struct_array.len() { + if struct_array.is_null(i) { + builder.append_null(); + } else { + let metadata = metadata_array.value(i); + let value = value_array.value(i); + let variant = Variant::new(metadata, value); + let json_string = variant_to_json_string(&variant)?; + builder.append_value(&json_string); + } + } + + Ok(builder.finish()) +} + +#[cfg(test)] +mod test { + use crate::batch_variant_to_json_string; + use arrow::array::{Array, ArrayRef, BinaryBuilder, BooleanBufferBuilder, StructArray}; + use arrow::buffer::NullBuffer; + use arrow::datatypes::DataType; + use arrow::datatypes::Field; + use arrow_schema::Fields; + use std::sync::Arc; + + #[test] + fn test_batch_variant_to_json_string() { + let mut metadata_builder = BinaryBuilder::new(); + let mut value_builder = BinaryBuilder::new(); + + // Row 0: [1, 0, 0], [12, 0] + metadata_builder.append_value(&[1, 0, 0]); + value_builder.append_value(&[12, 0]); + + // Row 1: null + metadata_builder.append_null(); + value_builder.append_null(); + + // Row 2: [1, 1, 0, 1, 97], [2, 1, 0, 0, 1, 32] + metadata_builder.append_value(&[1, 1, 0, 1, 97]); + value_builder.append_value(&[2, 1, 0, 0, 2, 12, 32]); + + // Row 3: [1, 0, 0], [0] + metadata_builder.append_value(&[1, 0, 0]); + value_builder.append_value(&[0]); + + // Row 4: null + metadata_builder.append_null(); + value_builder.append_null(); + + let metadata_array = Arc::new(metadata_builder.finish()) as ArrayRef; + let value_array = Arc::new(value_builder.finish()) as ArrayRef; + + let fields: Fields = vec![ + Field::new("value", DataType::Binary, true), + Field::new("metadata", DataType::Binary, true), + ] + .into(); + + let mut validity = BooleanBufferBuilder::new(value_array.len()); + for i in 0..value_array.len() { + let is_valid = value_array.is_valid(i) && metadata_array.is_valid(i); + validity.append(is_valid); + } + let null_buffer = NullBuffer::new(validity.finish()); + + let struct_array = StructArray::new( + fields, + vec![value_array.clone(), metadata_array.clone()], + Some(null_buffer), // Null bitmap (let Arrow infer from children) + ); + + let input = Arc::new(struct_array) as ArrayRef; + + let result = batch_variant_to_json_string(&input).unwrap(); + + // Expected output: ["0", null, "{\"a\":32}", "null", null] + let expected = vec![Some("0"), None, Some("{\"a\":32}"), Some("null"), None]; + + let result_vec: Vec> = (0..result.len()) + .map(|i| { + if result.is_null(i) { + None + } else { + Some(result.value(i)) + } + }) + .collect(); + + assert_eq!(result_vec, expected); + } +} From a2b656547bb1956fb875318411a15fdbeb4e8c77 Mon Sep 17 00:00:00 2001 From: Harsh Motwani Date: Tue, 8 Jul 2025 12:59:34 -0700 Subject: [PATCH 02/11] made to_json zero-copy --- parquet-variant-compute/src/to_json.rs | 45 ++++++++++++++++++-------- 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/parquet-variant-compute/src/to_json.rs b/parquet-variant-compute/src/to_json.rs index 8fc30ce039a3..2c867a31635f 100644 --- a/parquet-variant-compute/src/to_json.rs +++ b/parquet-variant-compute/src/to_json.rs @@ -1,7 +1,8 @@ -use arrow::array::{Array, ArrayRef, BinaryArray, StringArray, StringBuilder, StructArray}; +use arrow::array::{Array, ArrayRef, BinaryArray, BooleanBufferBuilder, StringArray, StructArray}; +use arrow::buffer::{Buffer, NullBuffer, OffsetBuffer, ScalarBuffer}; use arrow::datatypes::DataType; use arrow_schema::ArrowError; -use parquet_variant::{variant_to_json_string, Variant}; +use parquet_variant::{variant_to_json, Variant}; pub fn batch_variant_to_json_string(input: &ArrayRef) -> Result { let struct_array = input @@ -29,33 +30,51 @@ pub fn batch_variant_to_json_string(input: &ArrayRef) -> Result() - .ok_or_else(|| ArrowError::CastError("Expected BinaryArray for 'value'".into()))?; + .ok_or_else(|| ArrowError::CastError("Expected BinaryArray for 'metadata'".into()))?; - let metadata_array = struct_array + let value_array = struct_array .column(1) .as_any() .downcast_ref::() - .ok_or_else(|| ArrowError::CastError("Expected BinaryArray for 'metadata'".into()))?; + .ok_or_else(|| ArrowError::CastError("Expected BinaryArray for 'value'".into()))?; - let mut builder = StringBuilder::new(); + // Zero-copy builder + let mut json_buffer: Vec = Vec::with_capacity(1024); + let mut offsets: Vec = Vec::with_capacity(struct_array.len() + 1); + let mut validity = BooleanBufferBuilder::new(struct_array.len()); + let mut current_offset: i32 = 0; + offsets.push(current_offset); for i in 0..struct_array.len() { if struct_array.is_null(i) { - builder.append_null(); + validity.append(false); + offsets.push(current_offset); } else { let metadata = metadata_array.value(i); let value = value_array.value(i); let variant = Variant::new(metadata, value); - let json_string = variant_to_json_string(&variant)?; - builder.append_value(&json_string); + let start_len = json_buffer.len(); + variant_to_json(&mut json_buffer, &variant)?; + let written = (json_buffer.len() - start_len) as i32; + current_offset += written; + offsets.push(current_offset); + validity.append(true); } } - Ok(builder.finish()) + let offsets_buffer = OffsetBuffer::new(ScalarBuffer::from(offsets)); + let value_buffer = Buffer::from_vec(json_buffer); + let null_buffer = NullBuffer::new(validity.finish()); + + Ok(StringArray::new( + offsets_buffer, + value_buffer, + Some(null_buffer), + )) } #[cfg(test)] @@ -97,8 +116,8 @@ mod test { let value_array = Arc::new(value_builder.finish()) as ArrayRef; let fields: Fields = vec![ - Field::new("value", DataType::Binary, true), Field::new("metadata", DataType::Binary, true), + Field::new("value", DataType::Binary, true), ] .into(); @@ -111,7 +130,7 @@ mod test { let struct_array = StructArray::new( fields, - vec![value_array.clone(), metadata_array.clone()], + vec![metadata_array.clone(), value_array.clone()], Some(null_buffer), // Null bitmap (let Arrow infer from children) ); From 824f0f0ec7dba8179a0e9294dca988b804ab99bc Mon Sep 17 00:00:00 2001 From: Harsh Motwani Date: Tue, 8 Jul 2025 13:16:24 -0700 Subject: [PATCH 03/11] removed unnecessary dependencies --- parquet-variant-compute/Cargo.toml | 3 --- parquet-variant-compute/src/to_json.rs | 4 +++- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/parquet-variant-compute/Cargo.toml b/parquet-variant-compute/Cargo.toml index b743ca6aa4f5..2615988a02d9 100644 --- a/parquet-variant-compute/Cargo.toml +++ b/parquet-variant-compute/Cargo.toml @@ -37,9 +37,6 @@ rust-version = "1.83" arrow = { workspace = true } arrow-schema = { workspace = true } parquet-variant = { path = "../parquet-variant" } -chrono = { workspace = true } -serde_json = "1.0" -base64 = "0.22" [lib] diff --git a/parquet-variant-compute/src/to_json.rs b/parquet-variant-compute/src/to_json.rs index 2c867a31635f..187bf090308b 100644 --- a/parquet-variant-compute/src/to_json.rs +++ b/parquet-variant-compute/src/to_json.rs @@ -43,7 +43,9 @@ pub fn batch_variant_to_json_string(input: &ArrayRef) -> Result = Vec::with_capacity(1024); + // The size per JSON string is assumed to be 128 bytes. If this holds true, resizing could be + // minimized to improve performance. + let mut json_buffer: Vec = Vec::with_capacity(struct_array.len() * 128); let mut offsets: Vec = Vec::with_capacity(struct_array.len() + 1); let mut validity = BooleanBufferBuilder::new(struct_array.len()); let mut current_offset: i32 = 0; From da1cc318b7c1a5b04a5c805a0d18fa54bfd98dc0 Mon Sep 17 00:00:00 2001 From: Harsh Motwani Date: Tue, 8 Jul 2025 13:16:32 -0700 Subject: [PATCH 04/11] added docs --- parquet-variant-compute/src/from_json.rs | 17 +++++++++++++++++ parquet-variant-compute/src/to_json.rs | 20 ++++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/parquet-variant-compute/src/from_json.rs b/parquet-variant-compute/src/from_json.rs index 1769aafd768d..c5783eabef32 100644 --- a/parquet-variant-compute/src/from_json.rs +++ b/parquet-variant-compute/src/from_json.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use std::sync::Arc; use arrow::array::{ diff --git a/parquet-variant-compute/src/to_json.rs b/parquet-variant-compute/src/to_json.rs index 187bf090308b..ed02367dc5d3 100644 --- a/parquet-variant-compute/src/to_json.rs +++ b/parquet-variant-compute/src/to_json.rs @@ -1,3 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Module for transforming a batch of Variants represented as +//! STRUCT into a batch of JSON strings. + use arrow::array::{Array, ArrayRef, BinaryArray, BooleanBufferBuilder, StringArray, StructArray}; use arrow::buffer::{Buffer, NullBuffer, OffsetBuffer, ScalarBuffer}; use arrow::datatypes::DataType; From d0eaa72406399b43a28f291c030db7d2f1f04a86 Mon Sep 17 00:00:00 2001 From: Harsh Motwani Date: Tue, 8 Jul 2025 15:58:13 -0700 Subject: [PATCH 05/11] minor fixes --- parquet-variant-compute/Cargo.toml | 4 +--- parquet-variant-compute/src/from_json.rs | 3 +++ 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/parquet-variant-compute/Cargo.toml b/parquet-variant-compute/Cargo.toml index 2615988a02d9..6f7d198e5cb0 100644 --- a/parquet-variant-compute/Cargo.toml +++ b/parquet-variant-compute/Cargo.toml @@ -26,17 +26,15 @@ homepage = { workspace = true } repository = { workspace = true } authors = { workspace = true } keywords = ["arrow", "parquet", "variant"] -readme = "README.md" edition = { workspace = true } # needs a newer version than workspace due to # rror: `Option::::unwrap` is not yet stable as a const fn -rust-version = "1.83" [dependencies] arrow = { workspace = true } arrow-schema = { workspace = true } -parquet-variant = { path = "../parquet-variant" } +parquet-variant = { workspace = true } [lib] diff --git a/parquet-variant-compute/src/from_json.rs b/parquet-variant-compute/src/from_json.rs index c5783eabef32..83af77b53ef1 100644 --- a/parquet-variant-compute/src/from_json.rs +++ b/parquet-variant-compute/src/from_json.rs @@ -15,6 +15,9 @@ // specific language governing permissions and limitations // under the License. +//! Module for transforming a batch of JSON strings into a batch of Variants represented as +//! STRUCT + use std::sync::Arc; use arrow::array::{ From d11290d16d5e4ee403e2deb17c4c843fdce7d06d Mon Sep 17 00:00:00 2001 From: Harsh Motwani Date: Tue, 8 Jul 2025 16:45:37 -0700 Subject: [PATCH 06/11] reduced copying in json_to_variant --- parquet-variant-compute/src/from_json.rs | 67 +++++++++++++++++++----- parquet-variant-compute/src/lib.rs | 5 -- parquet-variant-compute/src/to_json.rs | 4 +- 3 files changed, 56 insertions(+), 20 deletions(-) diff --git a/parquet-variant-compute/src/from_json.rs b/parquet-variant-compute/src/from_json.rs index 83af77b53ef1..ebc93ca9387d 100644 --- a/parquet-variant-compute/src/from_json.rs +++ b/parquet-variant-compute/src/from_json.rs @@ -20,10 +20,8 @@ use std::sync::Arc; -use arrow::array::{ - Array, ArrayRef, BinaryBuilder, BooleanBufferBuilder, StringArray, StructArray, -}; -use arrow::buffer::NullBuffer; +use arrow::array::{Array, ArrayRef, BinaryArray, BooleanBufferBuilder, StringArray, StructArray}; +use arrow::buffer::{Buffer, NullBuffer, OffsetBuffer, ScalarBuffer}; use arrow::datatypes::{DataType, Field}; use arrow_schema::ArrowError; use parquet_variant::{json_to_variant, VariantBuilder}; @@ -35,6 +33,9 @@ fn variant_arrow_repr() -> DataType { DataType::Struct(fields.into()) } +/// Parse a batch of JSON strings into a batch of Variants represented as +/// STRUCT where nulls are preserved. The JSON strings in the input +/// must be valid. pub fn batch_json_string_to_variant(input: &ArrayRef) -> Result { let input_string_array = match input.as_any().downcast_ref::() { Some(string_array) => Ok(string_array), @@ -43,27 +44,65 @@ pub fn batch_json_string_to_variant(input: &ArrayRef) -> Result = Vec::with_capacity(input.len() * 128); + let mut metadata_offsets: Vec = Vec::with_capacity(input.len() + 1); + let mut metadata_validity = BooleanBufferBuilder::new(input.len()); + let mut metadata_current_offset: i32 = 0; + metadata_offsets.push(metadata_current_offset); + + let mut value_buffer: Vec = Vec::with_capacity(input.len() * 128); + let mut value_offsets: Vec = Vec::with_capacity(input.len() + 1); + let mut value_validity = BooleanBufferBuilder::new(input.len()); + let mut value_current_offset: i32 = 0; + value_offsets.push(value_current_offset); + let mut validity = BooleanBufferBuilder::new(input.len()); for i in 0..input.len() { if input.is_null(i) { - metadata_builder.append_null(); - value_builder.append_null(); + metadata_validity.append(false); + value_validity.append(false); + metadata_offsets.push(metadata_current_offset); + value_offsets.push(value_current_offset); validity.append(false); } else { let mut vb = VariantBuilder::new(); json_to_variant(input_string_array.value(i), &mut vb)?; let (metadata, value) = vb.finish(); - metadata_builder.append_value(&metadata); - value_builder.append_value(&value); validity.append(true); + + metadata_current_offset += metadata.len() as i32; + metadata_buffer.extend(metadata); + metadata_offsets.push(metadata_current_offset); + metadata_validity.append(true); + + value_current_offset += value.len() as i32; + value_buffer.extend(value); + value_offsets.push(value_current_offset); + value_validity.append(true); + println!("{value_current_offset} {metadata_current_offset}"); } } - let struct_fields: Vec = vec![ - Arc::new(metadata_builder.finish()), - Arc::new(value_builder.finish()), - ]; + let metadata_offsets_buffer = OffsetBuffer::new(ScalarBuffer::from(metadata_offsets)); + let metadata_data_buffer = Buffer::from_vec(metadata_buffer); + let metadata_null_buffer = NullBuffer::new(metadata_validity.finish()); + + let value_offsets_buffer = OffsetBuffer::new(ScalarBuffer::from(value_offsets)); + let value_data_buffer = Buffer::from_vec(value_buffer); + let value_null_buffer = NullBuffer::new(value_validity.finish()); + + let metadata_array = BinaryArray::new( + metadata_offsets_buffer, + metadata_data_buffer, + Some(metadata_null_buffer), + ); + let value_array = BinaryArray::new( + value_offsets_buffer, + value_data_buffer, + Some(value_null_buffer), + ); + + let struct_fields: Vec = vec![Arc::new(metadata_array), Arc::new(value_array)]; let variant_fields = match variant_arrow_repr() { DataType::Struct(fields) => fields, _ => unreachable!("variant_arrow_repr is hard-coded and must match the expected schema"), diff --git a/parquet-variant-compute/src/lib.rs b/parquet-variant-compute/src/lib.rs index 97431448a2ec..599ba328146e 100644 --- a/parquet-variant-compute/src/lib.rs +++ b/parquet-variant-compute/src/lib.rs @@ -18,10 +18,5 @@ mod from_json; mod to_json; -/// Parse a batch of JSON strings into a batch of Variants represented as -/// STRUCT where nulls are preserved. The JSON strings in the input -/// must be valid. pub use from_json::batch_json_string_to_variant; -/// Transform a batch of Variant represented as STRUCT to a batch -/// of JSON strings where nulls are preserved. The JSON strings in the input must be valid. pub use to_json::batch_variant_to_json_string; diff --git a/parquet-variant-compute/src/to_json.rs b/parquet-variant-compute/src/to_json.rs index ed02367dc5d3..005cab06df7d 100644 --- a/parquet-variant-compute/src/to_json.rs +++ b/parquet-variant-compute/src/to_json.rs @@ -24,6 +24,8 @@ use arrow::datatypes::DataType; use arrow_schema::ArrowError; use parquet_variant::{variant_to_json, Variant}; +/// Transform a batch of Variant represented as STRUCT to a batch +/// of JSON strings where nulls are preserved. The JSON strings in the input must be valid. pub fn batch_variant_to_json_string(input: &ArrayRef) -> Result { let struct_array = input .as_any() @@ -64,7 +66,7 @@ pub fn batch_variant_to_json_string(input: &ArrayRef) -> Result = Vec::with_capacity(struct_array.len() * 128); let mut offsets: Vec = Vec::with_capacity(struct_array.len() + 1); let mut validity = BooleanBufferBuilder::new(struct_array.len()); From e8206f2d2e5e2b945589bd1a397678d95a1ec03b Mon Sep 17 00:00:00 2001 From: Harsh Motwani Date: Tue, 8 Jul 2025 17:34:15 -0700 Subject: [PATCH 07/11] nullability fix --- parquet-variant-compute/src/from_json.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/parquet-variant-compute/src/from_json.rs b/parquet-variant-compute/src/from_json.rs index ebc93ca9387d..32ab0ee79464 100644 --- a/parquet-variant-compute/src/from_json.rs +++ b/parquet-variant-compute/src/from_json.rs @@ -27,8 +27,9 @@ use arrow_schema::ArrowError; use parquet_variant::{json_to_variant, VariantBuilder}; fn variant_arrow_repr() -> DataType { - let metadata_field = Field::new("metadata", DataType::Binary, true); - let value_field = Field::new("value", DataType::Binary, true); + // The subfields are expected to be non-nullable according to the parquet variant spec. + let metadata_field = Field::new("metadata", DataType::Binary, false); + let value_field = Field::new("value", DataType::Binary, false); let fields = vec![metadata_field, value_field]; DataType::Struct(fields.into()) } @@ -60,8 +61,9 @@ pub fn batch_json_string_to_variant(input: &ArrayRef) -> Result Date: Tue, 8 Jul 2025 23:56:41 -0700 Subject: [PATCH 08/11] pushing from top level repo --- Cargo.toml | 2 ++ parquet-variant/src/variant.rs | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a9b00f9537dc..1bb1db7194f9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ members = [ "arrow-string", "parquet", "parquet-variant", + "parquet-variant-compute", "parquet_derive", "parquet_derive_test", ] @@ -95,6 +96,7 @@ arrow-schema = { version = "55.2.0", path = "./arrow-schema" } arrow-select = { version = "55.2.0", path = "./arrow-select" } arrow-string = { version = "55.2.0", path = "./arrow-string" } parquet = { version = "55.2.0", path = "./parquet", default-features = false } +parquet-variant = { path = "./parquet-variant" } chrono = { version = "0.4.40", default-features = false, features = ["clock"] } diff --git a/parquet-variant/src/variant.rs b/parquet-variant/src/variant.rs index 96cdb53c15e8..db401d9f1e16 100644 --- a/parquet-variant/src/variant.rs +++ b/parquet-variant/src/variant.rs @@ -275,8 +275,8 @@ impl<'m, 'v> Variant<'m, 'v> { /// /// [validated]: Self#Validation pub fn try_new(metadata: &'m [u8], value: &'v [u8]) -> Result { - let metadata = VariantMetadata::try_new(metadata)?; - Self::try_new_with_metadata(metadata, value) + // let metadata = VariantMetadata::try_new(metadata)?; + Self::try_new_with_metadata(VariantMetadata::try_new(metadata)?, value) } /// Attempts to interpret a metadata and value buffer pair as a new `Variant`. From 4f406edfc7d3751d5332b9ad3b9077eeb01fb07b Mon Sep 17 00:00:00 2001 From: Harsh Motwani Date: Tue, 8 Jul 2025 23:57:42 -0700 Subject: [PATCH 09/11] removed unnecessary change --- parquet-variant/src/variant.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet-variant/src/variant.rs b/parquet-variant/src/variant.rs index db401d9f1e16..96cdb53c15e8 100644 --- a/parquet-variant/src/variant.rs +++ b/parquet-variant/src/variant.rs @@ -275,8 +275,8 @@ impl<'m, 'v> Variant<'m, 'v> { /// /// [validated]: Self#Validation pub fn try_new(metadata: &'m [u8], value: &'v [u8]) -> Result { - // let metadata = VariantMetadata::try_new(metadata)?; - Self::try_new_with_metadata(VariantMetadata::try_new(metadata)?, value) + let metadata = VariantMetadata::try_new(metadata)?; + Self::try_new_with_metadata(metadata, value) } /// Attempts to interpret a metadata and value buffer pair as a new `Variant`. From a4da974358e676c8fd9bdcf8220d9beadbda5afa Mon Sep 17 00:00:00 2001 From: Harsh Motwani Date: Wed, 9 Jul 2025 10:25:08 -0700 Subject: [PATCH 10/11] remove debugging code --- parquet-variant-compute/src/from_json.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/parquet-variant-compute/src/from_json.rs b/parquet-variant-compute/src/from_json.rs index 32ab0ee79464..43aee7de8036 100644 --- a/parquet-variant-compute/src/from_json.rs +++ b/parquet-variant-compute/src/from_json.rs @@ -82,7 +82,6 @@ pub fn batch_json_string_to_variant(input: &ArrayRef) -> Result Date: Fri, 11 Jul 2025 09:21:16 -0400 Subject: [PATCH 11/11] Fix msrv --- parquet-variant-compute/Cargo.toml | 4 ++-- parquet-variant-json/Cargo.toml | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/parquet-variant-compute/Cargo.toml b/parquet-variant-compute/Cargo.toml index 15ea2f6002cf..a053803c5551 100644 --- a/parquet-variant-compute/Cargo.toml +++ b/parquet-variant-compute/Cargo.toml @@ -27,8 +27,8 @@ repository = { workspace = true } authors = { workspace = true } keywords = ["arrow", "parquet", "variant"] edition = { workspace = true } -# needs a newer version than workspace due to -# rror: `Option::::unwrap` is not yet stable as a const fn +# parquet-variant needs newer version than workspace +rust-version = "1.83" [dependencies] diff --git a/parquet-variant-json/Cargo.toml b/parquet-variant-json/Cargo.toml index 86281e4ae98e..fed480afb4f3 100644 --- a/parquet-variant-json/Cargo.toml +++ b/parquet-variant-json/Cargo.toml @@ -28,8 +28,7 @@ authors = { workspace = true } keywords = ["arrow", "parquet", "variant"] readme = "README.md" edition = { workspace = true } -# needs a newer version than workspace due to -# rror: `Option::::unwrap` is not yet stable as a const fn +# parquet-variant needs newer version than workspace rust-version = "1.83"