-
Notifications
You must be signed in to change notification settings - Fork 1.2k
[arrow-json] support deserializing JSON to variant #8998
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -141,6 +141,8 @@ use std::io::BufRead; | |
| use std::sync::Arc; | ||
|
|
||
| use chrono::Utc; | ||
| #[cfg(feature = "variant_experimental")] | ||
| use parquet_variant_compute::VariantType; | ||
| use serde_core::Serialize; | ||
|
|
||
| use arrow_array::timezone::Tz; | ||
|
|
@@ -162,6 +164,9 @@ use crate::reader::struct_array::StructArrayDecoder; | |
| use crate::reader::tape::{Tape, TapeDecoder}; | ||
| use crate::reader::timestamp_array::TimestampArrayDecoder; | ||
|
|
||
| #[cfg(feature = "variant_experimental")] | ||
| use crate::reader::variant_array::VariantArrayDecoder; | ||
|
|
||
| mod binary_array; | ||
| mod boolean_array; | ||
| mod decimal_array; | ||
|
|
@@ -176,6 +181,8 @@ mod string_view_array; | |
| mod struct_array; | ||
| mod tape; | ||
| mod timestamp_array; | ||
| #[cfg(feature = "variant_experimental")] | ||
| mod variant_array; | ||
|
|
||
| /// A builder for [`Reader`] and [`Decoder`] | ||
| pub struct ReaderBuilder { | ||
|
|
@@ -304,6 +311,7 @@ impl ReaderBuilder { | |
| }; | ||
|
|
||
| let decoder = make_decoder( | ||
| None, | ||
| data_type, | ||
| self.coerce_primitive, | ||
| self.strict_mode, | ||
|
|
@@ -686,12 +694,18 @@ macro_rules! primitive_decoder { | |
| } | ||
|
|
||
| fn make_decoder( | ||
| field: Option<FieldRef>, | ||
| data_type: DataType, | ||
| coerce_primitive: bool, | ||
| strict_mode: bool, | ||
| is_nullable: bool, | ||
|
Comment on lines
+697
to
701
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems very awkward, to take a Especially when
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But it looks like nullability in particular is not always directly taken from a field (nested struct case below), and the data type is not always from a field either (decoder builder above). So maybe we should consider passing the field's metadata instead? But I don't know if we have extension type support directly on metadata (or if it requires a Field instance to work with)? |
||
| struct_mode: StructMode, | ||
| ) -> Result<Box<dyn ArrayDecoder>, ArrowError> { | ||
| #[cfg(feature = "variant_experimental")] | ||
| if let Some(field) = field && field.try_extension_type::<VariantType>().is_ok() { | ||
| return Ok(Box::new(VariantArrayDecoder{})); | ||
| } | ||
|
|
||
| downcast_integer! { | ||
| data_type => (primitive_decoder, data_type), | ||
| DataType::Null => Ok(Box::<NullArrayDecoder>::default()), | ||
|
|
@@ -758,6 +772,7 @@ fn make_decoder( | |
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use parquet_variant::VariantBuilder; | ||
| use serde_json::json; | ||
| use std::fs::File; | ||
| use std::io::{BufReader, Cursor, Seek}; | ||
|
|
@@ -2771,6 +2786,106 @@ mod tests { | |
| ); | ||
| } | ||
|
|
||
| #[cfg(feature = "variant_experimental")] | ||
| #[test] | ||
| fn test_variant() { | ||
| use parquet_variant::Variant; | ||
| use parquet_variant_compute::VariantArrayBuilder; | ||
|
|
||
| let do_test = |json_input: &str, ids: Vec<i32>, variants: Vec<Variant>| { | ||
| let variant_array = VariantArrayBuilder::new(0).build(); | ||
|
|
||
| let struct_field = Schema::new(vec![ | ||
| Field::new("id", DataType::Int32, false), | ||
| // call VariantArray::field to get the correct Field | ||
| variant_array.field("var"), | ||
| ]); | ||
|
|
||
| let builder = ReaderBuilder::new(Arc::new(struct_field.clone())); | ||
| let result = builder | ||
| .with_struct_mode(StructMode::ObjectOnly) | ||
| .build(Cursor::new(json_input.as_bytes())) | ||
| .unwrap() | ||
| .next() | ||
| .unwrap() | ||
| .unwrap(); | ||
|
|
||
| let int_array = arrow_array::array::Int32Array::from(ids); | ||
|
|
||
| let variant_array = { | ||
| let mut variant_builder = VariantArrayBuilder::new(variants.len()); | ||
| for v in variants { | ||
| variant_builder.append_variant(v); | ||
| } | ||
| variant_builder.build() | ||
| }; | ||
|
|
||
| let variant_struct_array: StructArray = variant_array.into(); | ||
|
|
||
| let expected = RecordBatch::try_new( | ||
| struct_field.into(), | ||
| vec![Arc::new(int_array), Arc::new(variant_struct_array)], | ||
| ) | ||
| .unwrap(); | ||
|
|
||
| assert_eq!(result, expected); | ||
| }; | ||
|
|
||
| do_test( | ||
| "{\"id\": 1, \"var\": \"a\"}\n{\"id\": 2, \"var\": \"b\"}", | ||
| vec![1, 2], | ||
| vec![Variant::from("a"), Variant::from("b")], | ||
| ); | ||
|
|
||
| let mut builder = VariantBuilder::new(); | ||
| let mut object_builder = builder.new_object(); | ||
| object_builder.insert("int64", Variant::Int64(1)); | ||
| object_builder.insert("double", Variant::Double(1.0)); | ||
| object_builder.insert("null", Variant::Null); | ||
| object_builder.insert("true", Variant::BooleanTrue); | ||
| object_builder.insert("false", Variant::BooleanFalse); | ||
| object_builder.insert("string", Variant::from("a")); | ||
| object_builder.finish(); | ||
| let (metadata, value) = builder.finish(); | ||
| let variant = Variant::try_new(&metadata, &value).unwrap(); | ||
|
|
||
| do_test( | ||
| "{\"id\": 1, \"var\": {\"int64\": 1, \"double\": 1.0, \"null\": null, \"true\": true, \"false\": false, \"string\": \"a\"}}", | ||
| vec![1], | ||
| vec![variant], | ||
| ); | ||
|
|
||
| // nested structs | ||
| let mut builder = VariantBuilder::new(); | ||
| let mut object_builder = builder.new_object(); | ||
| { | ||
| let mut list_builder = object_builder.new_list("somelist"); | ||
| { | ||
| let mut nested_object_builder = list_builder.new_object(); | ||
| nested_object_builder.insert("num", Variant::Int64(2)); | ||
| nested_object_builder.finish(); | ||
| } | ||
| { | ||
| let mut nested_object_builder = list_builder.new_object(); | ||
| nested_object_builder.insert("num", Variant::Int64(3)); | ||
| nested_object_builder.finish(); | ||
| } | ||
| list_builder.finish(); | ||
| object_builder.insert("scalar", Variant::from("a")); | ||
| } | ||
| object_builder.finish(); | ||
|
|
||
| let (metadata, value) = builder.finish(); | ||
| let variant = Variant::try_new(&metadata, &value).unwrap(); | ||
|
|
||
| do_test( | ||
| "{\"id\": 1, \"var\": {\"somelist\": [{\"num\": 2}, {\"num\": 3}], \"scalar\": \"a\"}}", | ||
| vec![1], | ||
| vec![variant], | ||
| ); | ||
| } | ||
|
|
||
|
|
||
| #[test] | ||
| fn test_decode_list_struct_with_wrong_types() { | ||
| let int_field = Field::new("a", DataType::Int32, true); | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,99 @@ | ||||||||||||||||||||||||||||||||||||
| // Licensed to the Apache Software Foundation (ASF) under one | ||||||||||||||||||||||||||||||||||||
| // or more contributor license agreements. See the NOTICE file | ||||||||||||||||||||||||||||||||||||
| // distributed with this work for additional information | ||||||||||||||||||||||||||||||||||||
| // regarding copyright ownership. The ASF licenses this file | ||||||||||||||||||||||||||||||||||||
| // to you under the Apache License, Version 2.0 (the | ||||||||||||||||||||||||||||||||||||
| // "License"); you may not use this file except in compliance | ||||||||||||||||||||||||||||||||||||
| // with the License. You may obtain a copy of the License at | ||||||||||||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||||||||||||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||||||||||||
| // Unless required by applicable law or agreed to in writing, | ||||||||||||||||||||||||||||||||||||
| // software distributed under the License is distributed on an | ||||||||||||||||||||||||||||||||||||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||||||||||||||||||||||||||||||||||||
| // KIND, either express or implied. See the License for the | ||||||||||||||||||||||||||||||||||||
| // specific language governing permissions and limitations | ||||||||||||||||||||||||||||||||||||
| // under the License. | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| use arrow_array::{Array, StructArray}; | ||||||||||||||||||||||||||||||||||||
| use parquet_variant::{ObjectFieldBuilder, Variant, VariantBuilder, VariantBuilderExt}; | ||||||||||||||||||||||||||||||||||||
| use parquet_variant_compute::VariantArrayBuilder; | ||||||||||||||||||||||||||||||||||||
| use arrow_data::ArrayData; | ||||||||||||||||||||||||||||||||||||
| use arrow_schema::ArrowError; | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| use crate::reader::ArrayDecoder; | ||||||||||||||||||||||||||||||||||||
| use crate::reader::tape::{Tape, TapeElement}; | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| #[derive(Default)] | ||||||||||||||||||||||||||||||||||||
| pub struct VariantArrayDecoder {} | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| impl ArrayDecoder for VariantArrayDecoder { | ||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is so great |
||||||||||||||||||||||||||||||||||||
| fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> { | ||||||||||||||||||||||||||||||||||||
| let mut array_builder = VariantArrayBuilder::new(pos.len()); | ||||||||||||||||||||||||||||||||||||
| for p in pos { | ||||||||||||||||||||||||||||||||||||
| let mut builder = VariantBuilder::new(); | ||||||||||||||||||||||||||||||||||||
| variant_from_tape_element(&mut builder, *p, tape)?; | ||||||||||||||||||||||||||||||||||||
| let (metadata, value) = builder.finish(); | ||||||||||||||||||||||||||||||||||||
| array_builder.append_value(Variant::new(&metadata, &value)); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| let variant_struct_array: StructArray = array_builder.build().into(); | ||||||||||||||||||||||||||||||||||||
| Ok(variant_struct_array.into_data()) | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| fn variant_from_tape_element(builder: &mut impl VariantBuilderExt, mut p: u32, tape: &Tape) -> Result<u32, ArrowError> { | ||||||||||||||||||||||||||||||||||||
| match tape.get(p) { | ||||||||||||||||||||||||||||||||||||
| TapeElement::StartObject(end_idx) => { | ||||||||||||||||||||||||||||||||||||
| let mut object_builder = builder.try_new_object()?; | ||||||||||||||||||||||||||||||||||||
| p += 1; | ||||||||||||||||||||||||||||||||||||
| while p < end_idx { | ||||||||||||||||||||||||||||||||||||
| // Read field name | ||||||||||||||||||||||||||||||||||||
| let field_name = match tape.get(p) { | ||||||||||||||||||||||||||||||||||||
| TapeElement::String(s) => tape.get_string(s), | ||||||||||||||||||||||||||||||||||||
| _ => return Err(tape.error(p, "field name")), | ||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| let mut field_builder = ObjectFieldBuilder::new(field_name, &mut object_builder); | ||||||||||||||||||||||||||||||||||||
| p = tape.next(p, "field value")?; | ||||||||||||||||||||||||||||||||||||
| p = variant_from_tape_element(&mut field_builder, p, tape)?; | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| object_builder.finish(); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| TapeElement::EndObject(_u32) => unreachable!(), | ||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it truly unreachable, even for invalid input JSON? Does the tape decoder have enough invariant checking to prove that this can never arise? What about bugs in the decoding process or tape navigation? (perhaps it's better to just return an Err here instead of panicking) |
||||||||||||||||||||||||||||||||||||
| TapeElement::StartList(end_idx) => { | ||||||||||||||||||||||||||||||||||||
| let mut list_builder = builder.try_new_list()?; | ||||||||||||||||||||||||||||||||||||
| p+= 1; | ||||||||||||||||||||||||||||||||||||
| while p < end_idx { | ||||||||||||||||||||||||||||||||||||
| p = variant_from_tape_element(&mut list_builder, p, tape)?; | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| list_builder.finish(); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| TapeElement::EndList(_u32) => unreachable!(), | ||||||||||||||||||||||||||||||||||||
| TapeElement::String(idx) => builder.append_value(tape.get_string(idx)), | ||||||||||||||||||||||||||||||||||||
| TapeElement::Number(idx) => { | ||||||||||||||||||||||||||||||||||||
| let s = tape.get_string(idx); | ||||||||||||||||||||||||||||||||||||
| builder.append_value(parse_number(s)?) | ||||||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||||||
| TapeElement::I64(i) => builder.append_value(i), | ||||||||||||||||||||||||||||||||||||
| TapeElement::I32(i) => builder.append_value(i), | ||||||||||||||||||||||||||||||||||||
| TapeElement::F64(f) => builder.append_value(f), | ||||||||||||||||||||||||||||||||||||
| TapeElement::F32(f) => builder.append_value(f), | ||||||||||||||||||||||||||||||||||||
| TapeElement::True => builder.append_value(true), | ||||||||||||||||||||||||||||||||||||
| TapeElement::False => builder.append_value(false), | ||||||||||||||||||||||||||||||||||||
| TapeElement::Null => builder.append_value(Variant::Null), | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| p += 1; | ||||||||||||||||||||||||||||||||||||
| Ok(p) | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| fn parse_number<'a, 'b>(s: &'a str) -> Result<Variant<'a, 'b>, ArrowError> { | ||||||||||||||||||||||||||||||||||||
| match lexical_core::parse::<i64>(s.as_bytes()) { | ||||||||||||||||||||||||||||||||||||
| Ok(v) => Ok(Variant::from(v)), | ||||||||||||||||||||||||||||||||||||
| Err(_) => { | ||||||||||||||||||||||||||||||||||||
| match lexical_core::parse::<f64>(s.as_bytes()) { | ||||||||||||||||||||||||||||||||||||
| Ok(v) => Ok(Variant::from(v)), | ||||||||||||||||||||||||||||||||||||
| Err(_) => Err(ArrowError::JsonError(format!("failed to parse {s} as number"))), | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
Comment on lines
+90
to
+98
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: a few ideas to simplify the code
Suggested change
|
||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we care about missing newline at EOF? |
||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah this is kind of wacky -- that arrow now depends on some sub part of parquet