From fce19eb95b827e04a37918d90f5d2583edd7ed75 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 18 Jul 2025 11:29:33 -0400 Subject: [PATCH 1/5] Fix Cargo for benchmark --- parquet-variant-compute/src/from_json.rs | 7 +- parquet-variant-compute/src/lib.rs | 2 +- .../src/variant_array_builder.rs | 277 +++++++++++++++++- parquet-variant-json/src/from_json.rs | 25 +- parquet-variant/src/builder.rs | 12 +- 5 files changed, 291 insertions(+), 32 deletions(-) diff --git a/parquet-variant-compute/src/from_json.rs b/parquet-variant-compute/src/from_json.rs index df4d7c2753ef..05207d094a25 100644 --- a/parquet-variant-compute/src/from_json.rs +++ b/parquet-variant-compute/src/from_json.rs @@ -21,7 +21,6 @@ use crate::{VariantArray, VariantArrayBuilder}; use arrow::array::{Array, ArrayRef, StringArray}; use arrow_schema::ArrowError; -use parquet_variant::VariantBuilder; use parquet_variant_json::json_to_variant; /// Parse a batch of JSON strings into a batch of Variants represented as @@ -41,10 +40,10 @@ pub fn batch_json_string_to_variant(input: &ArrayRef) -> Result VariantArrayVariantBuilder { + // append directly into the metadata and value buffers + let metadata_buffer = std::mem::take(&mut self.metadata_buffer); + let value_buffer = std::mem::take(&mut self.value_buffer); + VariantArrayVariantBuilder::new(self, metadata_buffer, value_buffer) + } +} + +/// A `VariantBuilder` that writes directly to the buffers of a `VariantArrayBuilder`. +/// +/// Note this struct implements [`VariantBuilderExt`], so it can be used +/// as a drop-in replacement for [`VariantBuilder`] in most cases. +/// +/// If [`Self::finish`] is not called, any changes will be rolled back +/// +/// See [`VariantArrayBuilder::variant_builder`] for an example +pub struct VariantArrayVariantBuilder<'a> { + /// was finish called? + finished: bool, + /// starting offset in the variant_builder's `metadata` buffer + metadata_offset: usize, + /// starting offset in the variant_builder's `value` buffer + value_offset: usize, + /// Parent array builder that this variant builder writes to. Buffers + /// have been moved into the variant builder, and must be returned on + /// drop + array_builder: &'a mut VariantArrayBuilder, + /// Builder for the in progress variant value, temporarily owns the buffers + /// from `array_builder` + variant_builder: VariantBuilder, +} + +impl<'a> VariantBuilderExt for VariantArrayVariantBuilder<'a> { + fn append_value<'m, 'v>(&mut self, value: impl Into>) { + self.variant_builder.append_value(value); + } + + fn new_list(&mut self) -> ListBuilder { + self.variant_builder.new_list() + } + + fn new_object(&mut self) -> ObjectBuilder { + self.variant_builder.new_object() + } +} + +impl<'a> VariantArrayVariantBuilder<'a> { + /// Constructs a new VariantArrayVariantBuilder + /// + /// Note this is not public as this is a structure that is logically + /// part of the [`VariantArrayBuilder`] and relies on its internal structure + fn new( + array_builder: &'a mut VariantArrayBuilder, + metadata_buffer: Vec, + value_buffer: Vec, + ) -> Self { + let metadata_offset = metadata_buffer.len(); + let value_offset = value_buffer.len(); + VariantArrayVariantBuilder { + finished: false, + metadata_offset, + value_offset, + variant_builder: VariantBuilder::new_with_buffers(metadata_buffer, value_buffer), + array_builder, + } + } + + /// Return a reference to the underlying `VariantBuilder` + pub fn inner(&self) -> &VariantBuilder { + &self.variant_builder + } + + /// Return a mutable reference to the underlying `VariantBuilder` + pub fn inner_mut(&mut self) -> &mut VariantBuilder { + &mut self.variant_builder + } + + /// Called to finish the in progress variant and write it to the underlying + /// buffers + /// + /// Note if you do not call finish, on drop any changes made to the + /// underlying buffers will be rolled back. + pub fn finish(mut self) { + self.finished = true; + // Note: buffers are returned and replaced in the drop impl + } +} + +impl<'a> Drop for VariantArrayVariantBuilder<'a> { + /// If the builder was not finished, roll back any changes made to the + /// underlying buffers (by truncating them) + fn drop(&mut self) { + let metadata_offset = self.metadata_offset; + let value_offset = self.value_offset; + + // get the buffers back from the variant builder + let (mut metadata_buffer, mut value_buffer) = + std::mem::take(&mut self.variant_builder).finish(); + + // Sanity Check: if the buffers got smaller, something went wrong (previous data was lost) + let metadata_len = metadata_buffer + .len() + .checked_sub(metadata_offset) + .expect("metadata length decreased unexpectedly"); + let value_len = value_buffer + .len() + .checked_sub(value_offset) + .expect("value length decreased unexpectedly"); + + if self.finished { + // if the object was finished, commit the changes by putting the + // offsets and lengths into the parent array builder. + self.array_builder + .metadata_locations + .push((metadata_offset, metadata_len)); + self.array_builder + .value_locations + .push((value_offset, value_len)); + self.array_builder.nulls.append_non_null(); + } else { + // if the object was not finished, truncate the buffers to the + // original offsets to roll back any changes. Note this is fast + // because truncate doesn't free any memory: it just has to drop + // elements (and u8 doesn't have a destructor) + metadata_buffer.truncate(metadata_offset); + value_buffer.truncate(value_offset); + } + + // put the buffers back into the array builder + self.array_builder.metadata_buffer = metadata_buffer; + self.array_builder.value_buffer = value_buffer; + } } fn binary_view_array_from_buffers( @@ -220,4 +390,91 @@ mod test { ); } } + + /// Test using sub builders to append variants + #[test] + fn test_variant_array_builder_variant_builder() { + let mut builder = VariantArrayBuilder::new(10); + builder.append_null(); // should not panic + builder.append_variant(Variant::from(42i32)); + + // let's make a sub-object in the next row + let mut sub_builder = builder.variant_builder(); + sub_builder + .new_object() + .with_field("foo", "bar") + .finish() + .unwrap(); + sub_builder.finish(); // must call finish to write the variant to the buffers + + // append a new list + let mut sub_builder = builder.variant_builder(); + sub_builder + .new_list() + .with_value(Variant::from(1i32)) + .with_value(Variant::from(2i32)) + .finish(); + sub_builder.finish(); + let variant_array = builder.build(); + + assert_eq!(variant_array.len(), 4); + assert!(variant_array.is_null(0)); + assert!(!variant_array.is_null(1)); + assert_eq!(variant_array.value(1), Variant::from(42i32)); + assert!(!variant_array.is_null(2)); + let variant = variant_array.value(2); + let variant = variant.as_object().expect("variant to be an object"); + assert_eq!(variant.get("foo").unwrap(), Variant::from("bar")); + assert!(!variant_array.is_null(3)); + let variant = variant_array.value(3); + let list = variant.as_list().expect("variant to be a list"); + assert_eq!(list.len(), 2); + } + + /// Test using non-finished sub builders to append variants + #[test] + fn test_variant_array_builder_variant_builder_reset() { + let mut builder = VariantArrayBuilder::new(10); + + // make a sub-object in the first row + let mut sub_builder = builder.variant_builder(); + sub_builder + .new_object() + .with_field("foo", 1i32) + .finish() + .unwrap(); + sub_builder.finish(); // must call finish to write the variant to the buffers + + // start appending an object but don't finish + let mut sub_builder = builder.variant_builder(); + sub_builder + .new_object() + .with_field("bar", 2i32) + .finish() + .unwrap(); + drop(sub_builder); // drop the sub builder without finishing it + + // make a third sub-object (this should reset the previous unfinished object) + let mut sub_builder = builder.variant_builder(); + sub_builder + .new_object() + .with_field("baz", 3i32) + .finish() + .unwrap(); + sub_builder.finish(); // must call finish to write the variant to the buffers + + let variant_array = builder.build(); + + // only the two finished objects should be present + assert_eq!(variant_array.len(), 2); + assert!(!variant_array.is_null(0)); + let variant = variant_array.value(0); + let variant = variant.as_object().expect("variant to be an object"); + assert_eq!(variant.get("foo").unwrap(), Variant::from(1i32)); + + assert!(!variant_array.is_null(1)); + let variant = variant_array.value(1); + let variant = variant.as_object().expect("variant to be an object"); + assert_eq!(variant.get("baz").unwrap(), Variant::from(3i32)); + } } diff --git a/parquet-variant-json/src/from_json.rs b/parquet-variant-json/src/from_json.rs index 3052bc504dee..355bd8008777 100644 --- a/parquet-variant-json/src/from_json.rs +++ b/parquet-variant-json/src/from_json.rs @@ -18,22 +18,28 @@ //! Module for parsing JSON strings as Variant use arrow_schema::ArrowError; -use parquet_variant::{ListBuilder, ObjectBuilder, Variant, VariantBuilder, VariantBuilderExt}; +use parquet_variant::{ListBuilder, ObjectBuilder, Variant, VariantBuilderExt}; use serde_json::{Number, Value}; -/// Converts a JSON string to Variant using [`VariantBuilder`]. The resulting `value` and `metadata` -/// buffers can be extracted using `builder.finish()` +/// Converts a JSON string to Variant to a [`VariantBuilderExt`], such as +/// [`VariantBuilder`]. +/// +/// The resulting `value` and `metadata` buffers can be +/// extracted using `builder.finish()` /// /// # Arguments /// * `json` - The JSON string to parse as Variant. /// * `variant_builder` - Object of type `VariantBuilder` used to build the vatiant from the JSON /// string /// +/// /// # Returns /// /// * `Ok(())` if successful /// * `Err` with error details if the conversion fails /// +/// [`VariantBuilder`]: parquet_variant::VariantBuilder +/// /// ```rust /// # use parquet_variant::VariantBuilder; /// # use parquet_variant_json::{ @@ -62,7 +68,7 @@ use serde_json::{Number, Value}; /// assert_eq!(json_result, serde_json::to_string(&json_value)?); /// # Ok::<(), Box>(()) /// ``` -pub fn json_to_variant(json: &str, builder: &mut VariantBuilder) -> Result<(), ArrowError> { +pub fn json_to_variant(json: &str, builder: &mut impl VariantBuilderExt) -> Result<(), ArrowError> { let json: Value = serde_json::from_str(json) .map_err(|e| ArrowError::InvalidArgumentError(format!("JSON format error: {e}")))?; @@ -70,7 +76,7 @@ pub fn json_to_variant(json: &str, builder: &mut VariantBuilder) -> Result<(), A Ok(()) } -fn build_json(json: &Value, builder: &mut VariantBuilder) -> Result<(), ArrowError> { +fn build_json(json: &Value, builder: &mut impl VariantBuilderExt) -> Result<(), ArrowError> { append_json(json, builder)?; Ok(()) } @@ -99,10 +105,7 @@ fn variant_from_number<'m, 'v>(n: &Number) -> Result, ArrowError } } -fn append_json<'m, 'v>( - json: &'v Value, - builder: &mut impl VariantBuilderExt<'m, 'v>, -) -> Result<(), ArrowError> { +fn append_json(json: &Value, builder: &mut impl VariantBuilderExt) -> Result<(), ArrowError> { match json { Value::Null => builder.append_value(Variant::Null), Value::Bool(b) => builder.append_value(*b), @@ -137,8 +140,8 @@ struct ObjectFieldBuilder<'o, 'v, 's> { builder: &'o mut ObjectBuilder<'v>, } -impl<'m, 'v> VariantBuilderExt<'m, 'v> for ObjectFieldBuilder<'_, '_, '_> { - fn append_value(&mut self, value: impl Into>) { +impl VariantBuilderExt for ObjectFieldBuilder<'_, '_, '_> { + fn append_value<'m, 'v>(&mut self, value: impl Into>) { self.builder.insert(self.key, value); } diff --git a/parquet-variant/src/builder.rs b/parquet-variant/src/builder.rs index d0eb4872e442..800eec2013b8 100644 --- a/parquet-variant/src/builder.rs +++ b/parquet-variant/src/builder.rs @@ -1324,16 +1324,16 @@ impl Drop for ObjectBuilder<'_> { /// /// Allows users to append values to a [`VariantBuilder`], [`ListBuilder`] or /// [`ObjectBuilder`]. using the same interface. -pub trait VariantBuilderExt<'m, 'v> { - fn append_value(&mut self, value: impl Into>); +pub trait VariantBuilderExt { + fn append_value<'m, 'v>(&mut self, value: impl Into>); fn new_list(&mut self) -> ListBuilder; fn new_object(&mut self) -> ObjectBuilder; } -impl<'m, 'v> VariantBuilderExt<'m, 'v> for ListBuilder<'_> { - fn append_value(&mut self, value: impl Into>) { +impl VariantBuilderExt for ListBuilder<'_> { + fn append_value<'m, 'v>(&mut self, value: impl Into>) { self.append_value(value); } @@ -1346,8 +1346,8 @@ impl<'m, 'v> VariantBuilderExt<'m, 'v> for ListBuilder<'_> { } } -impl<'m, 'v> VariantBuilderExt<'m, 'v> for VariantBuilder { - fn append_value(&mut self, value: impl Into>) { +impl VariantBuilderExt for VariantBuilder { + fn append_value<'m, 'v>(&mut self, value: impl Into>) { self.append_value(value); } From 23be4558ab09779352cbd6ee49514c0e929d9f6c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 22 Jul 2025 07:56:21 -0400 Subject: [PATCH 2/5] Update parquet-variant-json/src/from_json.rs Co-authored-by: Congxian Qiu --- parquet-variant-json/src/from_json.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet-variant-json/src/from_json.rs b/parquet-variant-json/src/from_json.rs index 355bd8008777..67b69186068d 100644 --- a/parquet-variant-json/src/from_json.rs +++ b/parquet-variant-json/src/from_json.rs @@ -29,7 +29,7 @@ use serde_json::{Number, Value}; /// /// # Arguments /// * `json` - The JSON string to parse as Variant. -/// * `variant_builder` - Object of type `VariantBuilder` used to build the vatiant from the JSON +/// * `variant_builder` - Object of type `VariantBuilder` used to build the variant from the JSON /// string /// /// From 99ea0c4fab3ad1c1509db2d150487fff2455b4ce Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 22 Jul 2025 18:16:18 -0400 Subject: [PATCH 3/5] Remove append_variant_buffers --- .../src/variant_array_builder.rs | 42 +++++-------------- 1 file changed, 10 insertions(+), 32 deletions(-) diff --git a/parquet-variant-compute/src/variant_array_builder.rs b/parquet-variant-compute/src/variant_array_builder.rs index b4cd222c381f..f1d510f12446 100644 --- a/parquet-variant-compute/src/variant_array_builder.rs +++ b/parquet-variant-compute/src/variant_array_builder.rs @@ -45,24 +45,17 @@ use std::sync::Arc; /// builder.append_variant(Variant::from(42)); /// // append a null row (note not a Variant::Null) /// builder.append_null(); -/// // append a pre-constructed metadata and value buffers -/// let (metadata, value) = { -/// let mut vb = VariantBuilder::new(); -/// let mut obj = vb.new_object(); -/// obj.insert("foo", "bar"); -/// obj.finish().unwrap(); -/// vb.finish() -/// }; -/// builder.append_variant_buffers(&metadata, &value); -/// -/// // Use `variant_builder` method to write values directly to the output array +/// // append an object to the builder /// let mut vb = builder.variant_builder(); -/// vb.append_value("Hello, World!"); -/// vb.finish(); // Note: call finish to write the variant to the buffers +/// vb.new_object() +/// .with_field("foo", "bar") +/// .finish() +/// .unwrap(); +/// vb.finish(); // must call finish to write the variant to the buffers /// /// // create the final VariantArray /// let variant_array = builder.build(); -/// assert_eq!(variant_array.len(), 4); +/// assert_eq!(variant_array.len(), 3); /// // // Access the values /// // row 1 is not null and is an integer /// assert!(!variant_array.is_null(0)); @@ -71,10 +64,9 @@ use std::sync::Arc; /// assert!(variant_array.is_null(1)); /// // row 2 is not null and is an object /// assert!(!variant_array.is_null(2)); -/// assert!(variant_array.value(2).as_object().is_some()); -/// // row 3 is a string -/// assert!(!variant_array.is_null(3)); -/// assert_eq!(variant_array.value(3), Variant::from("Hello, World!")); +/// let value = variant_array.value(2); +/// let obj = value.as_object().expect("expected object"); +/// assert_eq!(obj.get("foo"), Some(Variant::from("bar"))); /// ``` #[derive(Debug)] pub struct VariantArrayBuilder { @@ -160,20 +152,6 @@ impl VariantArrayBuilder { direct_builder.finish() } - /// Append a metadata and values buffer to the builder - pub fn append_variant_buffers(&mut self, metadata: &[u8], value: &[u8]) { - self.nulls.append_non_null(); - let metadata_length = metadata.len(); - let metadata_offset = self.metadata_buffer.len(); - self.metadata_locations - .push((metadata_offset, metadata_length)); - self.metadata_buffer.extend_from_slice(metadata); - let value_length = value.len(); - let value_offset = self.value_buffer.len(); - self.value_locations.push((value_offset, value_length)); - self.value_buffer.extend_from_slice(value); - } - /// Return a `VariantArrayVariantBuilder` that writes directly to the /// buffers of this builder. /// From 99200066346d3d7a2a0473dcb6650dd1811008b7 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 22 Jul 2025 18:22:54 -0400 Subject: [PATCH 4/5] Apply suggestions from code review Co-authored-by: Liang-Chi Hsieh --- parquet-variant-compute/src/variant_array_builder.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/parquet-variant-compute/src/variant_array_builder.rs b/parquet-variant-compute/src/variant_array_builder.rs index f1d510f12446..2950028bf3ec 100644 --- a/parquet-variant-compute/src/variant_array_builder.rs +++ b/parquet-variant-compute/src/variant_array_builder.rs @@ -193,10 +193,10 @@ impl VariantArrayBuilder { } } -/// A `VariantBuilder` that writes directly to the buffers of a `VariantArrayBuilder`. +/// A `VariantBuilderExt` that writes directly to the buffers of a `VariantArrayBuilder`. /// -/// Note this struct implements [`VariantBuilderExt`], so it can be used -/// as a drop-in replacement for [`VariantBuilder`] in most cases. +// This struct implements [`VariantBuilderExt`], so in most cases it can be used as a +// [`VariantBuilder`] to perform variant-related operations for [`VariantArrayBuilder`]. /// /// If [`Self::finish`] is not called, any changes will be rolled back /// From 65714e5250614265ab6b5de2d15095f1b4bed477 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 22 Jul 2025 18:40:44 -0400 Subject: [PATCH 5/5] Do not finish in drop --- .../src/variant_array_builder.rs | 65 ++++++++++++------- parquet-variant/src/builder.rs | 17 +++++ 2 files changed, 59 insertions(+), 23 deletions(-) diff --git a/parquet-variant-compute/src/variant_array_builder.rs b/parquet-variant-compute/src/variant_array_builder.rs index f1d510f12446..207cd3beb713 100644 --- a/parquet-variant-compute/src/variant_array_builder.rs +++ b/parquet-variant-compute/src/variant_array_builder.rs @@ -269,7 +269,34 @@ impl<'a> VariantArrayVariantBuilder<'a> { /// underlying buffers will be rolled back. pub fn finish(mut self) { self.finished = true; - // Note: buffers are returned and replaced in the drop impl + + let metadata_offset = self.metadata_offset; + let value_offset = self.value_offset; + // get the buffers back from the variant builder + let (metadata_buffer, value_buffer) = std::mem::take(&mut self.variant_builder).finish(); + + // Sanity Check: if the buffers got smaller, something went wrong (previous data was lost) + let metadata_len = metadata_buffer + .len() + .checked_sub(metadata_offset) + .expect("metadata length decreased unexpectedly"); + let value_len = value_buffer + .len() + .checked_sub(value_offset) + .expect("value length decreased unexpectedly"); + + // commit the changes by putting the + // offsets and lengths into the parent array builder. + self.array_builder + .metadata_locations + .push((metadata_offset, metadata_len)); + self.array_builder + .value_locations + .push((value_offset, value_len)); + self.array_builder.nulls.append_non_null(); + // put the buffers back into the array builder + self.array_builder.metadata_buffer = metadata_buffer; + self.array_builder.value_buffer = value_buffer; } } @@ -277,41 +304,33 @@ impl<'a> Drop for VariantArrayVariantBuilder<'a> { /// If the builder was not finished, roll back any changes made to the /// underlying buffers (by truncating them) fn drop(&mut self) { + if self.finished { + return; + } + + // if the object was not finished, need to rollback any changes by + // truncating the buffers to the original offsets let metadata_offset = self.metadata_offset; let value_offset = self.value_offset; // get the buffers back from the variant builder let (mut metadata_buffer, mut value_buffer) = - std::mem::take(&mut self.variant_builder).finish(); + std::mem::take(&mut self.variant_builder).into_buffers(); - // Sanity Check: if the buffers got smaller, something went wrong (previous data was lost) - let metadata_len = metadata_buffer + // Sanity Check: if the buffers got smaller, something went wrong (previous data was lost) so panic immediately + metadata_buffer .len() .checked_sub(metadata_offset) .expect("metadata length decreased unexpectedly"); - let value_len = value_buffer + value_buffer .len() .checked_sub(value_offset) .expect("value length decreased unexpectedly"); - if self.finished { - // if the object was finished, commit the changes by putting the - // offsets and lengths into the parent array builder. - self.array_builder - .metadata_locations - .push((metadata_offset, metadata_len)); - self.array_builder - .value_locations - .push((value_offset, value_len)); - self.array_builder.nulls.append_non_null(); - } else { - // if the object was not finished, truncate the buffers to the - // original offsets to roll back any changes. Note this is fast - // because truncate doesn't free any memory: it just has to drop - // elements (and u8 doesn't have a destructor) - metadata_buffer.truncate(metadata_offset); - value_buffer.truncate(value_offset); - } + // Note this truncate is fast because truncate doesn't free any memory: + // it just has to drop elements (and u8 doesn't have a destructor) + metadata_buffer.truncate(metadata_offset); + value_buffer.truncate(value_offset); // put the buffers back into the array builder self.array_builder.metadata_buffer = metadata_buffer; diff --git a/parquet-variant/src/builder.rs b/parquet-variant/src/builder.rs index 800eec2013b8..947a929cf5fd 100644 --- a/parquet-variant/src/builder.rs +++ b/parquet-variant/src/builder.rs @@ -502,6 +502,11 @@ impl MetadataBuilder { metadata_buffer } + + /// Return the inner buffer, without finalizing any in progress metadata. + pub(crate) fn take_buffer(self) -> Vec { + self.metadata_buffer + } } impl> FromIterator for MetadataBuilder { @@ -1002,6 +1007,18 @@ impl VariantBuilder { pub fn finish(self) -> (Vec, Vec) { (self.metadata_builder.finish(), self.buffer.into_inner()) } + + /// Return the inner metadata buffers and value buffer. + /// + /// This can be used to get the underlying buffers provided via + /// [`VariantBuilder::new_with_buffers`] without finalizing the metadata or + /// values (for rolling back changes). + pub fn into_buffers(self) -> (Vec, Vec) { + ( + self.metadata_builder.take_buffer(), + self.buffer.into_inner(), + ) + } } /// A builder for creating [`Variant::List`] values.