diff --git a/Cargo.toml b/Cargo.toml index 5f6861518e14..aab2ab8f7bc5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ members = [ "arrow-string", "parquet", "parquet-variant", + "parquet-variant-compute", "parquet-variant-json", "parquet_derive", "parquet_derive_test", @@ -103,6 +104,7 @@ parquet = { version = "55.2.0", path = "./parquet", default-features = false } # These crates have not yet been released and thus do not use the workspace version parquet-variant = { version = "0.1.0", path = "./parquet-variant"} parquet-variant-json = { version = "0.1.0", path = "./parquet-variant-json" } +parquet-variant-compute = { version = "0.1.0", path = "./parquet-variant-compute" } chrono = { version = "0.4.40", default-features = false, features = ["clock"] } diff --git a/parquet-variant-compute/Cargo.toml b/parquet-variant-compute/Cargo.toml new file mode 100644 index 000000000000..a053803c5551 --- /dev/null +++ b/parquet-variant-compute/Cargo.toml @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "parquet-variant-compute" +# This package is still in development and thus the version does +# not follow the versions of the rest of the crates in this repo. 
+version = "0.1.0" +license = { workspace = true } +description = "Apache Parquet Variant Batch Processing" +homepage = { workspace = true } +repository = { workspace = true } +authors = { workspace = true } +keywords = ["arrow", "parquet", "variant"] +edition = { workspace = true } +# parquet-variant needs newer version than workspace +rust-version = "1.83" + + +[dependencies] +arrow = { workspace = true } +arrow-schema = { workspace = true } +parquet-variant = { workspace = true } +parquet-variant-json = { workspace = true } + +[lib] +name = "parquet_variant_compute" +bench = false + +[dev-dependencies] diff --git a/parquet-variant-compute/src/from_json.rs b/parquet-variant-compute/src/from_json.rs new file mode 100644 index 000000000000..85777c6af25f --- /dev/null +++ b/parquet-variant-compute/src/from_json.rs @@ -0,0 +1,181 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Module for transforming a batch of JSON strings into a batch of Variants represented as +//! 
STRUCT<metadata: BINARY, value: BINARY> + +use std::sync::Arc; + +use arrow::array::{Array, ArrayRef, BinaryArray, BooleanBufferBuilder, StringArray, StructArray}; +use arrow::buffer::{Buffer, NullBuffer, OffsetBuffer, ScalarBuffer}; +use arrow::datatypes::{DataType, Field}; +use arrow_schema::ArrowError; +use parquet_variant::VariantBuilder; +use parquet_variant_json::json_to_variant; + +fn variant_arrow_repr() -> DataType { + // The subfields are expected to be non-nullable according to the parquet variant spec. + let metadata_field = Field::new("metadata", DataType::Binary, false); + let value_field = Field::new("value", DataType::Binary, false); + let fields = vec![metadata_field, value_field]; + DataType::Struct(fields.into()) +} + +/// Parse a batch of JSON strings into a batch of Variants represented as +/// STRUCT<metadata: BINARY, value: BINARY> where nulls are preserved. The JSON strings in the input +/// must be valid. +pub fn batch_json_string_to_variant(input: &ArrayRef) -> Result<StructArray, ArrowError> { + let input_string_array = match input.as_any().downcast_ref::<StringArray>() { + Some(string_array) => Ok(string_array), + None => Err(ArrowError::CastError( + "Expected reference to StringArray as input".into(), + )), + }?; + + // Zero-copy builders + let mut metadata_buffer: Vec<u8> = Vec::with_capacity(input.len() * 128); + let mut metadata_offsets: Vec<i32> = Vec::with_capacity(input.len() + 1); + let mut metadata_validity = BooleanBufferBuilder::new(input.len()); + let mut metadata_current_offset: i32 = 0; + metadata_offsets.push(metadata_current_offset); + + let mut value_buffer: Vec<u8> = Vec::with_capacity(input.len() * 128); + let mut value_offsets: Vec<i32> = Vec::with_capacity(input.len() + 1); + let mut value_validity = BooleanBufferBuilder::new(input.len()); + let mut value_current_offset: i32 = 0; + value_offsets.push(value_current_offset); + + let mut validity = BooleanBufferBuilder::new(input.len()); + for i in 0..input.len() { + if input.is_null(i) { + // The subfields are expected to be non-nullable according to the parquet variant spec. 
+ metadata_validity.append(true); + value_validity.append(true); + metadata_offsets.push(metadata_current_offset); + value_offsets.push(value_current_offset); + validity.append(false); + } else { + let mut vb = VariantBuilder::new(); + json_to_variant(input_string_array.value(i), &mut vb)?; + let (metadata, value) = vb.finish(); + validity.append(true); + + metadata_current_offset += metadata.len() as i32; + metadata_buffer.extend(metadata); + metadata_offsets.push(metadata_current_offset); + metadata_validity.append(true); + + value_current_offset += value.len() as i32; + value_buffer.extend(value); + value_offsets.push(value_current_offset); + value_validity.append(true); + } + } + let metadata_offsets_buffer = OffsetBuffer::new(ScalarBuffer::from(metadata_offsets)); + let metadata_data_buffer = Buffer::from_vec(metadata_buffer); + let metadata_null_buffer = NullBuffer::new(metadata_validity.finish()); + + let value_offsets_buffer = OffsetBuffer::new(ScalarBuffer::from(value_offsets)); + let value_data_buffer = Buffer::from_vec(value_buffer); + let value_null_buffer = NullBuffer::new(value_validity.finish()); + + let metadata_array = BinaryArray::new( + metadata_offsets_buffer, + metadata_data_buffer, + Some(metadata_null_buffer), + ); + let value_array = BinaryArray::new( + value_offsets_buffer, + value_data_buffer, + Some(value_null_buffer), + ); + + let struct_fields: Vec<ArrayRef> = vec![Arc::new(metadata_array), Arc::new(value_array)]; + let variant_fields = match variant_arrow_repr() { + DataType::Struct(fields) => fields, + _ => unreachable!("variant_arrow_repr is hard-coded and must match the expected schema"), + }; + let null_buffer = NullBuffer::new(validity.finish()); + Ok(StructArray::new( + variant_fields, + struct_fields, + Some(null_buffer), + )) +} + +#[cfg(test)] +mod test { + use crate::batch_json_string_to_variant; + use arrow::array::{Array, ArrayRef, BinaryArray, StringArray}; + use arrow_schema::ArrowError; + use parquet_variant::{Variant, 
 VariantBuilder}; + use std::sync::Arc; + + #[test] + fn test_batch_json_string_to_variant() -> Result<(), ArrowError> { + let input = StringArray::from(vec![ + Some("1"), + None, + Some("{\"a\": 32}"), + Some("null"), + None, + ]); + let array_ref: ArrayRef = Arc::new(input); + let output = batch_json_string_to_variant(&array_ref).unwrap(); + + let struct_array = &output; + let metadata_array = struct_array + .column(0) + .as_any() + .downcast_ref::<BinaryArray>() + .unwrap(); + let value_array = struct_array + .column(1) + .as_any() + .downcast_ref::<BinaryArray>() + .unwrap(); + + assert!(!struct_array.is_null(0)); + assert!(struct_array.is_null(1)); + assert!(!struct_array.is_null(2)); + assert!(!struct_array.is_null(3)); + assert!(struct_array.is_null(4)); + + assert_eq!(metadata_array.value(0), &[1, 0, 0]); + assert_eq!(value_array.value(0), &[12, 1]); + + { + let mut vb = VariantBuilder::new(); + let mut ob = vb.new_object(); + ob.insert("a", Variant::Int8(32)); + ob.finish()?; + let (object_metadata, object_value) = vb.finish(); + assert_eq!(metadata_array.value(2), &object_metadata); + assert_eq!(value_array.value(2), &object_value); + } + + assert_eq!(metadata_array.value(3), &[1, 0, 0]); + assert_eq!(value_array.value(3), &[0]); + + // Ensure that the subfields are not actually nullable + assert!(!metadata_array.is_null(1)); + assert!(!value_array.is_null(1)); + assert!(!metadata_array.is_null(4)); + assert!(!value_array.is_null(4)); + Ok(()) + } +} diff --git a/parquet-variant-compute/src/lib.rs b/parquet-variant-compute/src/lib.rs new file mode 100644 index 000000000000..599ba328146e --- /dev/null +++ b/parquet-variant-compute/src/lib.rs @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod from_json; +mod to_json; + +pub use from_json::batch_json_string_to_variant; +pub use to_json::batch_variant_to_json_string; diff --git a/parquet-variant-compute/src/to_json.rs b/parquet-variant-compute/src/to_json.rs new file mode 100644 index 000000000000..c7c4653ac780 --- /dev/null +++ b/parquet-variant-compute/src/to_json.rs @@ -0,0 +1,181 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Module for transforming a batch of Variants represented as +//! STRUCT<metadata: BINARY, value: BINARY> into a batch of JSON strings. 
 + +use arrow::array::{Array, ArrayRef, BinaryArray, BooleanBufferBuilder, StringArray, StructArray}; +use arrow::buffer::{Buffer, NullBuffer, OffsetBuffer, ScalarBuffer}; +use arrow::datatypes::DataType; +use arrow_schema::ArrowError; +use parquet_variant::Variant; +use parquet_variant_json::variant_to_json; + +/// Transform a batch of Variant represented as STRUCT<metadata: BINARY, value: BINARY> to a batch +/// of JSON strings where nulls are preserved. The JSON strings in the input must be valid. +pub fn batch_variant_to_json_string(input: &ArrayRef) -> Result<StringArray, ArrowError> { + let struct_array = input + .as_any() + .downcast_ref::<StructArray>() + .ok_or_else(|| ArrowError::CastError("Expected StructArray as input".into()))?; + + // Validate field types + let data_type = struct_array.data_type(); + match data_type { + DataType::Struct(inner_fields) => { + if inner_fields.len() != 2 + || inner_fields[0].data_type() != &DataType::Binary + || inner_fields[1].data_type() != &DataType::Binary + { + return Err(ArrowError::CastError( + "Expected struct with two binary fields".into(), + )); + } + } + _ => { + return Err(ArrowError::CastError( + "Expected StructArray with known fields".into(), + )) + } + } + + let metadata_array = struct_array + .column(0) + .as_any() + .downcast_ref::<BinaryArray>() + .ok_or_else(|| ArrowError::CastError("Expected BinaryArray for 'metadata'".into()))?; + + let value_array = struct_array + .column(1) + .as_any() + .downcast_ref::<BinaryArray>() + .ok_or_else(|| ArrowError::CastError("Expected BinaryArray for 'value'".into()))?; + + // Zero-copy builder + // The size per JSON string is assumed to be 128 bytes. If this holds true, resizing could be + // minimized for performance. 
+ let mut json_buffer: Vec<u8> = Vec::with_capacity(struct_array.len() * 128); + let mut offsets: Vec<i32> = Vec::with_capacity(struct_array.len() + 1); + let mut validity = BooleanBufferBuilder::new(struct_array.len()); + let mut current_offset: i32 = 0; + offsets.push(current_offset); + + for i in 0..struct_array.len() { + if struct_array.is_null(i) { + validity.append(false); + offsets.push(current_offset); + } else { + let metadata = metadata_array.value(i); + let value = value_array.value(i); + let variant = Variant::new(metadata, value); + let start_len = json_buffer.len(); + variant_to_json(&mut json_buffer, &variant)?; + let written = (json_buffer.len() - start_len) as i32; + current_offset += written; + offsets.push(current_offset); + validity.append(true); + } + } + + let offsets_buffer = OffsetBuffer::new(ScalarBuffer::from(offsets)); + let value_buffer = Buffer::from_vec(json_buffer); + let null_buffer = NullBuffer::new(validity.finish()); + + Ok(StringArray::new( + offsets_buffer, + value_buffer, + Some(null_buffer), + )) +} + +#[cfg(test)] +mod test { + use crate::batch_variant_to_json_string; + use arrow::array::{Array, ArrayRef, BinaryBuilder, BooleanBufferBuilder, StructArray}; + use arrow::buffer::NullBuffer; + use arrow::datatypes::DataType; + use arrow::datatypes::Field; + use arrow_schema::Fields; + use std::sync::Arc; + + #[test] + fn test_batch_variant_to_json_string() { + let mut metadata_builder = BinaryBuilder::new(); + let mut value_builder = BinaryBuilder::new(); + + // Row 0: [1, 0, 0], [12, 0] + metadata_builder.append_value([1, 0, 0]); + value_builder.append_value([12, 0]); + + // Row 1: null + metadata_builder.append_null(); + value_builder.append_null(); + + // Row 2: [1, 1, 0, 1, 97], [2, 1, 0, 0, 2, 12, 32] + metadata_builder.append_value([1, 1, 0, 1, 97]); + value_builder.append_value([2, 1, 0, 0, 2, 12, 32]); + + // Row 3: [1, 0, 0], [0] + metadata_builder.append_value([1, 0, 0]); + value_builder.append_value([0]); + + // Row 4: null + 
 metadata_builder.append_null(); + value_builder.append_null(); + + let metadata_array = Arc::new(metadata_builder.finish()) as ArrayRef; + let value_array = Arc::new(value_builder.finish()) as ArrayRef; + + let fields: Fields = vec![ + Field::new("metadata", DataType::Binary, true), + Field::new("value", DataType::Binary, true), + ] + .into(); + + let mut validity = BooleanBufferBuilder::new(value_array.len()); + for i in 0..value_array.len() { + let is_valid = value_array.is_valid(i) && metadata_array.is_valid(i); + validity.append(is_valid); + } + let null_buffer = NullBuffer::new(validity.finish()); + + let struct_array = StructArray::new( + fields, + vec![metadata_array.clone(), value_array.clone()], + Some(null_buffer), // Null bitmap computed from the children's validity above + ); + + let input = Arc::new(struct_array) as ArrayRef; + + let result = batch_variant_to_json_string(&input).unwrap(); + + // Expected output: ["0", null, "{\"a\":32}", "null", null] + let expected = vec![Some("0"), None, Some("{\"a\":32}"), Some("null"), None]; + + let result_vec: Vec<Option<&str>> = (0..result.len()) + .map(|i| { + if result.is_null(i) { + None + } else { + Some(result.value(i)) + } + }) + .collect(); + + assert_eq!(result_vec, expected); + } +} diff --git a/parquet-variant-json/Cargo.toml b/parquet-variant-json/Cargo.toml index 86281e4ae98e..fed480afb4f3 100644 --- a/parquet-variant-json/Cargo.toml +++ b/parquet-variant-json/Cargo.toml @@ -28,8 +28,7 @@ authors = { workspace = true } keywords = ["arrow", "parquet", "variant"] readme = "README.md" edition = { workspace = true } -# needs a newer version than workspace due to -# rror: `Option::::unwrap` is not yet stable as a const fn +# parquet-variant needs newer version than workspace rust-version = "1.83"