Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions arrow-json/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,18 @@ bench = false
[package.metadata.docs.rs]
all-features = true

[features]
default = []
variant_experimental = []

[dependencies]
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
arrow-cast = { workspace = true }
arrow-data = { workspace = true }
arrow-schema = { workspace = true }
parquet-variant-compute = { workspace = true }
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah this is kind of wacky -- that arrow now depends on some sub part of parquet

parquet-variant = { workspace = true }
half = { version = "2.1", default-features = false }
indexmap = { version = "2.0", default-features = false, features = ["std"] }
num-traits = { version = "0.2.19", default-features = false, features = ["std"] }
Expand Down
1 change: 1 addition & 0 deletions arrow-json/src/reader/list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ impl<O: OffsetSizeTrait> ListArrayDecoder<O> {
_ => unreachable!(),
};
let decoder = make_decoder(
Some(field.clone()),
field.data_type().clone(),
coerce_primitive,
strict_mode,
Expand Down
2 changes: 2 additions & 0 deletions arrow-json/src/reader/map_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,15 @@ impl MapArrayDecoder {
};

let keys = make_decoder(
Some(fields[0].clone()),
fields[0].data_type().clone(),
coerce_primitive,
strict_mode,
fields[0].is_nullable(),
struct_mode,
)?;
let values = make_decoder(
Some(fields[1].clone()),
fields[1].data_type().clone(),
coerce_primitive,
strict_mode,
Expand Down
115 changes: 115 additions & 0 deletions arrow-json/src/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ use std::io::BufRead;
use std::sync::Arc;

use chrono::Utc;
#[cfg(feature = "variant_experimental")]
use parquet_variant_compute::VariantType;
use serde_core::Serialize;

use arrow_array::timezone::Tz;
Expand All @@ -162,6 +164,9 @@ use crate::reader::struct_array::StructArrayDecoder;
use crate::reader::tape::{Tape, TapeDecoder};
use crate::reader::timestamp_array::TimestampArrayDecoder;

#[cfg(feature = "variant_experimental")]
use crate::reader::variant_array::VariantArrayDecoder;

mod binary_array;
mod boolean_array;
mod decimal_array;
Expand All @@ -176,6 +181,8 @@ mod string_view_array;
mod struct_array;
mod tape;
mod timestamp_array;
#[cfg(feature = "variant_experimental")]
mod variant_array;

/// A builder for [`Reader`] and [`Decoder`]
pub struct ReaderBuilder {
Expand Down Expand Up @@ -304,6 +311,7 @@ impl ReaderBuilder {
};

let decoder = make_decoder(
None,
data_type,
self.coerce_primitive,
self.strict_mode,
Expand Down Expand Up @@ -686,12 +694,18 @@ macro_rules! primitive_decoder {
}

fn make_decoder(
field: Option<FieldRef>,
data_type: DataType,
coerce_primitive: bool,
strict_mode: bool,
is_nullable: bool,
Comment on lines +697 to 701
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems very awkward, to take a Field but still require both data_type and is_nullable?
Should we just admit that the decoder officially uses fields now, and rely on Field::is_nullable and Field::data_type?

Especially when ReaderBuilder::build_decoder (L280/305 above) is anyway deriving those two value from fields it was already working with?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it looks like nullability in particular is not always directly taken from a field (nested struct case below), and the data type is not always from a field either (decoder builder above). So maybe we should consider passing the field's metadata instead? But I don't know if we have extension type support directly on metadata (or if it requires a Field instance to work with)?

struct_mode: StructMode,
) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
#[cfg(feature = "variant_experimental")]
if let Some(field) = field && field.try_extension_type::<VariantType>().is_ok() {
return Ok(Box::new(VariantArrayDecoder{}));
}

downcast_integer! {
data_type => (primitive_decoder, data_type),
DataType::Null => Ok(Box::<NullArrayDecoder>::default()),
Expand Down Expand Up @@ -758,6 +772,7 @@ fn make_decoder(

#[cfg(test)]
mod tests {
use parquet_variant::VariantBuilder;
use serde_json::json;
use std::fs::File;
use std::io::{BufReader, Cursor, Seek};
Expand Down Expand Up @@ -2771,6 +2786,106 @@ mod tests {
);
}

// Round-trip test: JSON rows containing a variant-typed column decode into a
// RecordBatch whose "var" column is a VariantArray (stored as a StructArray).
#[cfg(feature = "variant_experimental")]
#[test]
fn test_variant() {
use parquet_variant::Variant;
use parquet_variant_compute::VariantArrayBuilder;

// Decodes `json_input` against a schema of (id: Int32, var: Variant) and
// asserts the decoded batch equals one built from `ids` and `variants`.
let do_test = |json_input: &str, ids: Vec<i32>, variants: Vec<Variant>| {
// Empty array, used only to obtain a correctly-annotated variant Field.
let variant_array = VariantArrayBuilder::new(0).build();

let struct_field = Schema::new(vec![
Field::new("id", DataType::Int32, false),
// call VariantArray::field to get the correct Field
variant_array.field("var"),
]);

// Decode a single batch from the JSON input.
let builder = ReaderBuilder::new(Arc::new(struct_field.clone()));
let result = builder
.with_struct_mode(StructMode::ObjectOnly)
.build(Cursor::new(json_input.as_bytes()))
.unwrap()
.next()
.unwrap()
.unwrap();

let int_array = arrow_array::array::Int32Array::from(ids);

// Build the expected variant column from the provided variants.
let variant_array = {
let mut variant_builder = VariantArrayBuilder::new(variants.len());
for v in variants {
variant_builder.append_variant(v);
}
variant_builder.build()
};

let variant_struct_array: StructArray = variant_array.into();

let expected = RecordBatch::try_new(
struct_field.into(),
vec![Arc::new(int_array), Arc::new(variant_struct_array)],
)
.unwrap();

assert_eq!(result, expected);
};

// Case 1: scalar string values.
do_test(
"{\"id\": 1, \"var\": \"a\"}\n{\"id\": 2, \"var\": \"b\"}",
vec![1, 2],
vec![Variant::from("a"), Variant::from("b")],
);

// Case 2: flat object covering each scalar JSON type.
let mut builder = VariantBuilder::new();
let mut object_builder = builder.new_object();
object_builder.insert("int64", Variant::Int64(1));
object_builder.insert("double", Variant::Double(1.0));
object_builder.insert("null", Variant::Null);
object_builder.insert("true", Variant::BooleanTrue);
object_builder.insert("false", Variant::BooleanFalse);
object_builder.insert("string", Variant::from("a"));
object_builder.finish();
let (metadata, value) = builder.finish();
let variant = Variant::try_new(&metadata, &value).unwrap();

do_test(
"{\"id\": 1, \"var\": {\"int64\": 1, \"double\": 1.0, \"null\": null, \"true\": true, \"false\": false, \"string\": \"a\"}}",
vec![1],
vec![variant],
);

// nested structs
// Case 3: an object containing a list of objects plus a scalar field.
let mut builder = VariantBuilder::new();
let mut object_builder = builder.new_object();
{
let mut list_builder = object_builder.new_list("somelist");
{
let mut nested_object_builder = list_builder.new_object();
nested_object_builder.insert("num", Variant::Int64(2));
nested_object_builder.finish();
}
{
let mut nested_object_builder = list_builder.new_object();
nested_object_builder.insert("num", Variant::Int64(3));
nested_object_builder.finish();
}
list_builder.finish();
object_builder.insert("scalar", Variant::from("a"));
}
object_builder.finish();

let (metadata, value) = builder.finish();
let variant = Variant::try_new(&metadata, &value).unwrap();

do_test(
"{\"id\": 1, \"var\": {\"somelist\": [{\"num\": 2}, {\"num\": 3}], \"scalar\": \"a\"}}",
vec![1],
vec![variant],
);
}


#[test]
fn test_decode_list_struct_with_wrong_types() {
let int_field = Field::new("a", DataType::Int32, true);
Expand Down
1 change: 1 addition & 0 deletions arrow-json/src/reader/struct_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ impl StructArrayDecoder {
// it doesn't contain any nulls not masked by its parent
let nullable = f.is_nullable() || is_nullable;
make_decoder(
Some(f.clone()),
f.data_type().clone(),
coerce_primitive,
strict_mode,
Expand Down
99 changes: 99 additions & 0 deletions arrow-json/src/reader/variant_array.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::{Array, StructArray};
use parquet_variant::{ObjectFieldBuilder, Variant, VariantBuilder, VariantBuilderExt};
use parquet_variant_compute::VariantArrayBuilder;
use arrow_data::ArrayData;
use arrow_schema::ArrowError;

use crate::reader::ArrayDecoder;
use crate::reader::tape::{Tape, TapeElement};

#[derive(Default)]
pub struct VariantArrayDecoder {}

impl ArrayDecoder for VariantArrayDecoder {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is so great

fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
let mut array_builder = VariantArrayBuilder::new(pos.len());
for p in pos {
let mut builder = VariantBuilder::new();
variant_from_tape_element(&mut builder, *p, tape)?;
let (metadata, value) = builder.finish();
array_builder.append_value(Variant::new(&metadata, &value));
}
let variant_struct_array: StructArray = array_builder.build().into();
Ok(variant_struct_array.into_data())
}
}

fn variant_from_tape_element(builder: &mut impl VariantBuilderExt, mut p: u32, tape: &Tape) -> Result<u32, ArrowError> {
match tape.get(p) {
TapeElement::StartObject(end_idx) => {
let mut object_builder = builder.try_new_object()?;
p += 1;
while p < end_idx {
// Read field name
let field_name = match tape.get(p) {
TapeElement::String(s) => tape.get_string(s),
_ => return Err(tape.error(p, "field name")),
};

let mut field_builder = ObjectFieldBuilder::new(field_name, &mut object_builder);
p = tape.next(p, "field value")?;
p = variant_from_tape_element(&mut field_builder, p, tape)?;
}
object_builder.finish();
}
TapeElement::EndObject(_u32) => unreachable!(),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it truly unreachable, even for invalid input JSON? Does the tape decoder have enough invariant checking to prove that this can never arise? What about bugs in the decoding process or tape navigation?

(perhaps it's better to just return an Err here instead of panicking)

TapeElement::StartList(end_idx) => {
let mut list_builder = builder.try_new_list()?;
p+= 1;
while p < end_idx {
p = variant_from_tape_element(&mut list_builder, p, tape)?;
}
list_builder.finish();
}
TapeElement::EndList(_u32) => unreachable!(),
TapeElement::String(idx) => builder.append_value(tape.get_string(idx)),
TapeElement::Number(idx) => {
let s = tape.get_string(idx);
builder.append_value(parse_number(s)?)
},
TapeElement::I64(i) => builder.append_value(i),
TapeElement::I32(i) => builder.append_value(i),
TapeElement::F64(f) => builder.append_value(f),
TapeElement::F32(f) => builder.append_value(f),
TapeElement::True => builder.append_value(true),
TapeElement::False => builder.append_value(false),
TapeElement::Null => builder.append_value(Variant::Null),
}
p += 1;
Ok(p)
}

/// Parses a JSON number string into a [`Variant`], preferring `i64` and
/// falling back to `f64` when the value is not a valid integer.
fn parse_number<'a, 'b>(s: &'a str) -> Result<Variant<'a, 'b>, ArrowError> {
    if let Ok(v) = lexical_core::parse::<i64>(s.as_bytes()) {
        return Ok(Variant::from(v));
    }
    lexical_core::parse::<f64>(s.as_bytes())
        .map(Variant::from)
        .map_err(|_| ArrowError::JsonError(format!("failed to parse {s} as number")))
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we care about missing newline at EOF?

1 change: 1 addition & 0 deletions arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ force_validate = ["arrow-array/force_validate", "arrow-data/force_validate"]
ffi = ["arrow-schema/ffi", "arrow-data/ffi", "arrow-array/ffi"]
chrono-tz = ["arrow-array/chrono-tz"]
canonical_extension_types = ["arrow-schema/canonical_extension_types"]
variant_experimental = ["arrow-json?/variant_experimental"]

[dev-dependencies]
chrono = { workspace = true }
Expand Down
6 changes: 4 additions & 2 deletions parquet-variant-compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ rust-version = { workspace = true }


[dependencies]
arrow = { workspace = true , features = ["canonical_extension_types"]}
arrow-schema = { workspace = true }
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
arrow-cast = { workspace = true }
arrow-schema = { workspace = true, features = ["canonical_extension_types"] }
half = { version = "2.1", default-features = false }
indexmap = "2.10.0"
parquet-variant = { workspace = true }
Expand Down
Loading