From 809f2ffe77c3c88a64141c6337055046760b50ef Mon Sep 17 00:00:00 2001 From: Blake Orth Date: Thu, 6 Nov 2025 15:28:44 -0700 Subject: [PATCH 1/4] WIP: feedback commit --- parquet-geospatial/Cargo.toml | 1 + parquet-geospatial/src/crs.rs | 97 +++++++++++++++++++++++++++ parquet-geospatial/src/lib.rs | 6 ++ parquet-geospatial/src/types.rs | 87 ++++++++++++++++++++++++ parquet/src/arrow/schema/complex.rs | 1 + parquet/src/arrow/schema/extension.rs | 51 ++++++++++++++ parquet/src/arrow/schema/mod.rs | 13 ++-- parquet/src/geospatial/mod.rs | 77 +++++++++++++++++++++ 8 files changed, 329 insertions(+), 4 deletions(-) create mode 100644 parquet-geospatial/src/crs.rs create mode 100644 parquet-geospatial/src/types.rs diff --git a/parquet-geospatial/Cargo.toml b/parquet-geospatial/Cargo.toml index ea19f9c9b49f..774768523bc5 100644 --- a/parquet-geospatial/Cargo.toml +++ b/parquet-geospatial/Cargo.toml @@ -33,6 +33,7 @@ rust-version = { workspace = true } [dependencies] arrow-schema = { workspace = true } geo-traits = { version = "0.3" } +serde_json = { version = "1.0", default-features = false, features = ["std"]} wkb = { version = "0.9.1" } [dev-dependencies] diff --git a/parquet-geospatial/src/crs.rs b/parquet-geospatial/src/crs.rs new file mode 100644 index 000000000000..4712e21d3206 --- /dev/null +++ b/parquet-geospatial/src/crs.rs @@ -0,0 +1,97 @@ +use std::{collections::HashMap, sync::Arc}; + +use arrow_schema::{Schema, SchemaBuilder}; +use serde_json::{Value, json}; + +#[derive(Debug)] +pub enum Crs { + Projjson(serde_json::Value), + Srid(u64), + Other(String), +} + +impl Crs { + // TODO: make fallible + fn try_from_parquet_str(crs: &str, metadata: &HashMap) -> Self { + let de: Value = serde_json::from_str(crs).unwrap(); + + // A CRS that does not exist or is empty defaults to 4326 + // TODO: http link to parquet geospatial doc + let Some(crs) = de["crs"].as_str() else { + return Crs::Srid(4326); + }; + + if let Some(key) = crs.strip_prefix("projjson:") { + let Some(proj_meta) = metadata.get(key) else { + panic!("Failed to find key in meta: {:?}", metadata) + }; + + Self::Projjson(serde_json::from_str(proj_meta).unwrap()) + } else if let Some(srid) = crs.strip_prefix("srid:") { + Self::Srid(srid.parse().unwrap()) + } else { + if crs.is_empty() { + return Self::Srid(4326); + } + + Self::Other(crs.to_owned()) + } + } + + // TODO: make fallible + pub(super) fn try_from_arrow_str(crs: &str) -> Self { + let de: Value = serde_json::from_str(crs).unwrap(); + + let crs = match de["crs"] { + Value::Null => panic!("CRS must be specified. Inputs crs {crs}"), + _ => &de["crs"], + }; + + match de["crs_type"].as_str() { + Some("projjson") => Crs::Projjson(crs.clone()), + Some("srid") => Crs::Srid(crs.as_number().unwrap().as_u64().unwrap()), + _ => Crs::Other(crs.to_string()) + } + } + + pub(super) fn to_arrow_string(&self) -> String { + match &self { + Self::Projjson(pj) => json!({ + "crs": pj, + "crs_type": "projjson", + }) + .to_string(), + Self::Srid(srid) => json!({ + "crs": srid, + "crs_type": "srid", + }) + .to_string(), + Self::Other(s) => json!({ + "crs": s, + }) + .to_string(), + } + } +} + +// TODO: make fallible +pub fn parquet_to_arrow(schema: &mut Schema, metadata: &HashMap) { + let n_fields = schema.flattened_fields().len(); + let mut sb: SchemaBuilder = schema.as_ref().into(); + for i in 0..n_fields { + let mut field = sb.field(i).as_ref().clone(); + // TODO: use consts instead of raw strings + if let Some(ext_name) = field.extension_type_name() + && ext_name == "geoarrow.wkb" + && let Some(ext_meta) = field.metadata_mut().get_mut("ARROW:extension:metadata") + { + let crs = Crs::try_from_parquet_str(ext_meta, metadata); + *ext_meta = crs.to_arrow_string(); + } + + let schema_field = sb.field_mut(i); + *schema_field = Arc::new(field); + } + + *schema = sb.finish() +} diff --git a/parquet-geospatial/src/lib.rs b/parquet-geospatial/src/lib.rs index 2ec60c44cc16..c546d2b0cae9 100644 --- a/parquet-geospatial/src/lib.rs +++ b/parquet-geospatial/src/lib.rs @@ -28,5 +28,11 @@ //! [Geometry issue]: https://github.com/apache/arrow-rs/issues/8373 pub mod bounding; +pub mod crs; pub mod interval; pub mod testing; + +mod types; + +pub use types::Geography as GeographyType; +pub use types::Geometry as GeometryType; diff --git a/parquet-geospatial/src/types.rs b/parquet-geospatial/src/types.rs new file mode 100644 index 000000000000..76feda4e5e67 --- /dev/null +++ b/parquet-geospatial/src/types.rs @@ -0,0 +1,87 @@ +use arrow_schema::{Schema, SchemaBuilder, extension::ExtensionType}; + +use crate::crs::Crs; + +pub struct Geometry { + crs: Crs, +} + +impl Geometry { + pub fn with_parse_crs(crs: &str) -> Self { + Self { + crs: Crs::try_from_arrow_str(crs), + } + } +} + +impl ExtensionType for Geometry { + const NAME: &'static str = "geoarrow.wkb"; + + type Metadata = Crs; + + fn metadata(&self) -> &Self::Metadata { + &self.crs + } + + fn serialize_metadata(&self) -> Option { + Some(self.crs.to_arrow_string()) + } + + fn deserialize_metadata( + metadata: Option<&str>, + ) -> Result { + Ok(Crs::try_from_arrow_str(metadata.unwrap())) + } + + fn supports_data_type( + &self, + data_type: &arrow_schema::DataType, + ) -> Result<(), arrow_schema::ArrowError> { + Ok(()) + } + + fn try_new( + data_type: &arrow_schema::DataType, + metadata: Self::Metadata, + ) -> Result { + let geom = Self { crs: metadata }; + geom.supports_data_type(data_type)?; + Ok(geom) + } +} + +pub struct Geography; + +impl ExtensionType for Geography { + const NAME: &'static str = "geoarrow.wkb"; + + type Metadata = (); + + fn metadata(&self) -> &Self::Metadata { + todo!() + } + + fn serialize_metadata(&self) -> Option { + todo!() + } + + fn deserialize_metadata( + metadata: Option<&str>, + ) -> Result { + todo!() + } + + fn supports_data_type( + &self, + data_type: &arrow_schema::DataType, + ) -> Result<(), arrow_schema::ArrowError> { + todo!() + } + + fn try_new( + data_type: &arrow_schema::DataType, + metadata: Self::Metadata, + ) -> Result { + todo!() + } +} diff --git a/parquet/src/arrow/schema/complex.rs b/parquet/src/arrow/schema/complex.rs index 9622f6270d9e..2fc2d99d0685 100644 --- a/parquet/src/arrow/schema/complex.rs +++ b/parquet/src/arrow/schema/complex.rs @@ -567,6 +567,7 @@ fn convert_field( _ => Field::new(name, data_type, nullable), }; + println!("OINK! hint: {hint:?}"); Ok(field.with_metadata(hint.metadata().clone())) } None => { diff --git a/parquet/src/arrow/schema/extension.rs b/parquet/src/arrow/schema/extension.rs index 9adec8b6c122..789d1135cf1f 100644 --- a/parquet/src/arrow/schema/extension.rs +++ b/parquet/src/arrow/schema/extension.rs @@ -55,6 +55,15 @@ pub(crate) fn try_add_extension_type( LogicalType::Json => { arrow_field.try_with_extension_type(arrow_schema::extension::Json::default())?; } + #[cfg(feature = "geospatial")] + LogicalType::Geometry { crs } => { + let geom = parquet_geospatial::GeometryType::with_parse_crs(&crs.unwrap()); + arrow_field.try_with_extension_type(geom)?; + } + #[cfg(feature = "geospatial")] + LogicalType::Geography { .. } => { + arrow_field.try_with_extension_type(parquet_geospatial::GeographyType)?; + } _ => {} }; Ok(arrow_field) @@ -75,6 +84,10 @@ pub(crate) fn has_extension_type(parquet_type: &Type) -> bool { LogicalType::Uuid => true, #[cfg(feature = "arrow_canonical_extension_types")] LogicalType::Json => true, + #[cfg(feature = "geospatial")] + LogicalType::Geometry { .. } => true, + #[cfg(feature = "geospatial")] + LogicalType::Geography { .. } => true, _ => false, } } @@ -133,3 +146,41 @@ pub(crate) fn logical_type_for_string(field: &Field) -> Option { pub(crate) fn logical_type_for_string(_field: &Field) -> Option { Some(LogicalType::String) } + +#[cfg(feature = "geospatial")] +pub(crate) fn logical_type_for_binary(field: &Field) -> Option { + use parquet_geospatial::{GeographyType, GeometryType}; + + match field.extension_type_name() { + Some(n) if n == GeometryType::NAME => match field.try_extension_type::() { + Ok(GeometryType) => Some(LogicalType::Geometry { crs: todo!() }), + Err(_e) => None, + }, + Some(n) if n == GeographyType::NAME => match field.try_extension_type::() { + Ok(GeographyType) => Some(LogicalType::Geography { + crs: todo!(), + algorithm: todo!(), + }), + Err(_e) => None, + }, + _ => return None, + }; + + None +} + +#[cfg(not(feature = "geospatial"))] +pub(crate) fn logical_type_for_binary(field: &Field) -> Option { + None +} + +#[cfg(feature = "geospatial")] +pub(crate) fn logical_type_for_binary_view(field: &Field) -> Option { + // TODO: make this better-er + logical_type_for_binary(field) +} + +#[cfg(not(feature = "geospatial"))] +pub(crate) fn logical_type_for_binary_view(field: &Field) -> Option { + None +} diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index 75603ac86693..9572cd03477a 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -39,8 +39,9 @@ mod primitive; use super::PARQUET_FIELD_ID_META_KEY; use crate::arrow::ProjectionMask; use crate::arrow::schema::extension::{ - has_extension_type, logical_type_for_fixed_size_binary, logical_type_for_string, - logical_type_for_struct, try_add_extension_type, + has_extension_type, logical_type_for_binary, logical_type_for_binary_view, + logical_type_for_fixed_size_binary, logical_type_for_string, logical_type_for_struct, + try_add_extension_type, }; pub(crate) use complex::{ParquetField, ParquetFieldType}; @@ -76,16 +77,18 @@ pub(crate) fn parquet_to_arrow_schema_and_fields( key_value_metadata: Option<&Vec>, ) -> Result<(Schema, Option)> { let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default(); - let maybe_schema = metadata + let mut maybe_schema = metadata .remove(super::ARROW_SCHEMA_META_KEY) .map(|value| get_arrow_schema_from_metadata(&value)) .transpose()?; // Add the Arrow metadata to the Parquet metadata skipping keys that collide - if let Some(arrow_schema) = &maybe_schema { + if let Some(arrow_schema) = maybe_schema.as_mut() { arrow_schema.metadata().iter().for_each(|(k, v)| { metadata.entry(k.clone()).or_insert_with(|| v.clone()); }); + #[cfg(feature = "geospatial")] + parquet_geospatial::crs::parquet_to_arrow(arrow_schema, &metadata) } let hint = maybe_schema.as_ref().map(|s| s.fields()); @@ -604,6 +607,7 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result { Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY) .with_repetition(repetition) .with_id(id) + .with_logical_type(logical_type_for_binary(field)) .build() } DataType::FixedSizeBinary(length) => { @@ -617,6 +621,7 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result { DataType::BinaryView => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY) .with_repetition(repetition) .with_id(id) + .with_logical_type(logical_type_for_binary_view(field)) .build(), DataType::Decimal32(precision, scale) | DataType::Decimal64(precision, scale) diff --git a/parquet/src/geospatial/mod.rs b/parquet/src/geospatial/mod.rs index 9d55fca89d46..9ba73b7bdd12 100644 --- a/parquet/src/geospatial/mod.rs +++ b/parquet/src/geospatial/mod.rs @@ -49,3 +49,80 @@ pub mod accumulator; pub mod bounding_box; pub mod statistics; + +#[cfg(test)] +mod tests { + use std::path::PathBuf; + + use arrow::util::test_util::parquet_test_data; + use arrow_array::RecordBatch; + use parquet_geospatial::GeometryType; + + use crate::{arrow::arrow_reader::ArrowReaderBuilder, file::reader::ChunkReader}; + + /// Ensure a file with Geometry LogicalType, written by another writer in + /// parquet-testing, can be read as a GeometryArray + #[test] + fn read_geometry_logical_type() { + // Note: case-075 2 columns ("id", "var") + // The variant looks like this: + // "Variant(metadata=VariantMetadata(dict={}), value=Variant(type=STRING, value=iceberg))" + let batch = read_geospatial_test_case("crs-projjson.parquet"); + + assert_geometry_metadata(&batch, "geometry"); + // let var_column = batch.column_by_name("var").expect("expected var column"); + // let var_array = + // VariantArray::try_new(&var_column).expect("expected var column to be a VariantArray"); + + // // verify the value + // assert_eq!(var_array.len(), 1); + // assert!(var_array.is_valid(0)); + // let var_value = var_array.value(0); + // assert_eq!(var_value, Variant::from("iceberg")); + } + + /// Verifies the geospatial metadata is present in the schema for the specified + /// field name. + fn assert_geometry_metadata(batch: &RecordBatch, field_name: &str) { + println!("{batch:?}"); + let schema = batch.schema(); + let field = schema + .field_with_name(field_name) + .expect("could not find expected field"); + + // explicitly check the metadata so it is clear in the tests what the + // names are + let metadata_value = field + .metadata() + .get("ARROW:extension:name") + .expect("metadata does not exist"); + + assert_eq!(metadata_value, "geoarrow.wkb"); + + // verify that `GeometryType` also correctly finds the metadata + field + .try_extension_type::() + .expect("GeometryExtensionType should be readable"); + } + + /// Reads a RecordBatch from a reader (e.g. Vec or File) + fn read_to_batch(reader: T) -> RecordBatch { + let reader = ArrowReaderBuilder::try_new(reader) + .unwrap() + .build() + .unwrap(); + let mut batches: Vec = reader.collect::, _>>().unwrap(); + assert_eq!(batches.len(), 1); + batches.swap_remove(0) + } + + /// Read the specified test case filename from parquet-testing + /// See parquet-testing/geospatial/README.md for more details + fn read_geospatial_test_case(name: &str) -> RecordBatch { + let case_file = PathBuf::from(parquet_test_data()) + .join("geospatial") + .join(name); + let case_file = std::fs::File::open(case_file).unwrap(); + read_to_batch(case_file) + } +} From a40f62c8a504c354fcea65b95ab0f9b07df05fa3 Mon Sep 17 00:00:00 2001 From: Blake Orth Date: Mon, 10 Nov 2025 15:02:27 -0700 Subject: [PATCH 2/4] Removes specialized parsing --- parquet-geospatial/Cargo.toml | 1 + parquet-geospatial/src/crs.rs | 90 ++++++++------------------- parquet-geospatial/src/lib.rs | 3 +- parquet-geospatial/src/types.rs | 79 +++++++---------------- parquet/src/arrow/schema/complex.rs | 1 - parquet/src/arrow/schema/extension.rs | 21 +++---- parquet/src/arrow/schema/mod.rs | 2 +- parquet/src/geospatial/mod.rs | 8 ++- 8 files changed, 64 insertions(+), 141 deletions(-) diff --git a/parquet-geospatial/Cargo.toml b/parquet-geospatial/Cargo.toml index 774768523bc5..9b5624214119 100644 --- a/parquet-geospatial/Cargo.toml +++ b/parquet-geospatial/Cargo.toml @@ -33,6 +33,7 @@ rust-version = { workspace = true } [dependencies] arrow-schema = { workspace = true } geo-traits = { version = "0.3" } +serde = { version = "1.0", default-features = false, features = ["derive"]} serde_json = { version = "1.0", default-features = false, features = ["std"]} wkb = { version = "0.9.1" } diff --git a/parquet-geospatial/src/crs.rs b/parquet-geospatial/src/crs.rs index 4712e21d3206..6f91b29252e8 100644 --- a/parquet-geospatial/src/crs.rs +++ b/parquet-geospatial/src/crs.rs @@ -1,96 +1,58 @@ use std::{collections::HashMap, sync::Arc}; use arrow_schema::{Schema, SchemaBuilder}; -use serde_json::{Value, json}; +use serde::{Deserialize, Serialize}; -#[derive(Debug)] -pub enum Crs { - Projjson(serde_json::Value), - Srid(u64), - Other(String), +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct Crs { + crs: Option, + crs_type: Option, } impl Crs { // TODO: make fallible fn try_from_parquet_str(crs: &str, metadata: &HashMap) -> Self { - let de: Value = serde_json::from_str(crs).unwrap(); + let crs: Crs = serde_json::from_str(crs).unwrap(); - // A CRS that does not exist or is empty defaults to 4326 - // TODO: http link to parquet geospatial doc - let Some(crs) = de["crs"].as_str() else { - return Crs::Srid(4326); + let Some(crs_value) = &crs.crs else { + return crs; }; - if let Some(key) = crs.strip_prefix("projjson:") { - let Some(proj_meta) = metadata.get(key) else { - panic!("Failed to find key in meta: {:?}", metadata) - }; - - Self::Projjson(serde_json::from_str(proj_meta).unwrap()) - } else if let Some(srid) = crs.strip_prefix("srid:") { - Self::Srid(srid.parse().unwrap()) - } else { - if crs.is_empty() { - return Self::Srid(4326); - } - - Self::Other(crs.to_owned()) - } - } - - // TODO: make fallible - pub(super) fn try_from_arrow_str(crs: &str) -> Self { - let de: Value = serde_json::from_str(crs).unwrap(); + let Some(key) = crs_value.strip_prefix("projjson:") else { + return crs; + }; - let crs = match de["crs"] { - Value::Null => panic!("CRS must be specified. Inputs crs {crs}"), - _ => &de["crs"], + let Some(proj_meta) = metadata.get(key) else { + return crs; }; - match de["crs_type"].as_str() { - Some("projjson") => Crs::Projjson(crs.clone()), - Some("srid") => Crs::Srid(crs.as_number().unwrap().as_u64().unwrap()), - _ => Crs::Other(crs.to_string()) + Self { + crs: Some(proj_meta.clone()), + crs_type: Some(String::from("projjson")), } } pub(super) fn to_arrow_string(&self) -> String { - match &self { - Self::Projjson(pj) => json!({ - "crs": pj, - "crs_type": "projjson", - }) - .to_string(), - Self::Srid(srid) => json!({ - "crs": srid, - "crs_type": "srid", - }) - .to_string(), - Self::Other(s) => json!({ - "crs": s, - }) - .to_string(), - } + serde_json::to_string(&self).unwrap_or_default() } } // TODO: make fallible -pub fn parquet_to_arrow(schema: &mut Schema, metadata: &HashMap) { +pub fn replace_keyvalue(schema: &mut Schema, metadata: &HashMap) { let n_fields = schema.flattened_fields().len(); let mut sb: SchemaBuilder = schema.as_ref().into(); for i in 0..n_fields { let mut field = sb.field(i).as_ref().clone(); // TODO: use consts instead of raw strings - if let Some(ext_name) = field.extension_type_name() - && ext_name == "geoarrow.wkb" - && let Some(ext_meta) = field.metadata_mut().get_mut("ARROW:extension:metadata") - { - let crs = Crs::try_from_parquet_str(ext_meta, metadata); - *ext_meta = crs.to_arrow_string(); - } + if let Some("geoarrow.wkb") = field.extension_type_name() { + if let Some(ext_meta) = field.metadata_mut().get_mut("ARROW:extension:metadata") { + let crs = Crs::try_from_parquet_str(ext_meta, metadata); + *ext_meta = crs.to_arrow_string(); - let schema_field = sb.field_mut(i); - *schema_field = Arc::new(field); + let schema_field = sb.field_mut(i); + *schema_field = Arc::new(field); + } + } } *schema = sb.finish() diff --git a/parquet-geospatial/src/lib.rs b/parquet-geospatial/src/lib.rs index c546d2b0cae9..697b25846240 100644 --- a/parquet-geospatial/src/lib.rs +++ b/parquet-geospatial/src/lib.rs @@ -34,5 +34,4 @@ pub mod testing; mod types; -pub use types::Geography as GeographyType; -pub use types::Geometry as GeometryType; +pub use types::WkbType; diff --git a/parquet-geospatial/src/types.rs b/parquet-geospatial/src/types.rs index 76feda4e5e67..03a687de9812 100644 --- a/parquet-geospatial/src/types.rs +++ b/parquet-geospatial/src/types.rs @@ -1,87 +1,52 @@ -use arrow_schema::{Schema, SchemaBuilder, extension::ExtensionType}; +use arrow_schema::{ArrowError, DataType, extension::ExtensionType}; +use serde::{Deserialize, Serialize}; -use crate::crs::Crs; +#[derive(Default)] +pub struct WkbType(Option); -pub struct Geometry { - crs: Crs, +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +pub struct Metadata { + crs: Option, + algorithm: Option, } -impl Geometry { - pub fn with_parse_crs(crs: &str) -> Self { - Self { - crs: Crs::try_from_arrow_str(crs), - } - } -} - -impl ExtensionType for Geometry { +impl ExtensionType for WkbType { const NAME: &'static str = "geoarrow.wkb"; - type Metadata = Crs; + type Metadata = Option; fn metadata(&self) -> &Self::Metadata { - &self.crs + &self.0 } fn serialize_metadata(&self) -> Option { - Some(self.crs.to_arrow_string()) + self.0.clone().map(|md| serde_json::to_string(&md).unwrap()) } fn deserialize_metadata( metadata: Option<&str>, ) -> Result { - Ok(Crs::try_from_arrow_str(metadata.unwrap())) + Ok(metadata.map(|md| serde_json::from_str(md).unwrap())) } fn supports_data_type( &self, data_type: &arrow_schema::DataType, ) -> Result<(), arrow_schema::ArrowError> { - Ok(()) - } - - fn try_new( - data_type: &arrow_schema::DataType, - metadata: Self::Metadata, - ) -> Result { - let geom = Self { crs: metadata }; - geom.supports_data_type(data_type)?; - Ok(geom) - } -} - -pub struct Geography; - -impl ExtensionType for Geography { - const NAME: &'static str = "geoarrow.wkb"; - - type Metadata = (); - - fn metadata(&self) -> &Self::Metadata { - todo!() - } - - fn serialize_metadata(&self) -> Option { - todo!() - } - - fn deserialize_metadata( - metadata: Option<&str>, - ) -> Result { - todo!() - } - - fn supports_data_type( - &self, - data_type: &arrow_schema::DataType, - ) -> Result<(), arrow_schema::ArrowError> { - todo!() + match data_type { + DataType::Binary | DataType::LargeBinary | DataType::BinaryView => Ok(()), + dt => Err(ArrowError::InvalidArgumentError(format!( + "Geometry data type mismatch, expected one of Binary, LargeBinary, BinaryView. Found {dt}" + ))), + } } fn try_new( data_type: &arrow_schema::DataType, metadata: Self::Metadata, ) -> Result { - todo!() + let wkb = Self(metadata); + wkb.supports_data_type(data_type)?; + Ok(wkb) } } diff --git a/parquet/src/arrow/schema/complex.rs b/parquet/src/arrow/schema/complex.rs index 2fc2d99d0685..9622f6270d9e 100644 --- a/parquet/src/arrow/schema/complex.rs +++ b/parquet/src/arrow/schema/complex.rs @@ -567,7 +567,6 @@ fn convert_field( _ => Field::new(name, data_type, nullable), }; - println!("OINK! hint: {hint:?}"); Ok(field.with_metadata(hint.metadata().clone())) } None => { diff --git a/parquet/src/arrow/schema/extension.rs b/parquet/src/arrow/schema/extension.rs index 789d1135cf1f..d56f58942c40 100644 --- a/parquet/src/arrow/schema/extension.rs +++ b/parquet/src/arrow/schema/extension.rs @@ -56,13 +56,12 @@ pub(crate) fn try_add_extension_type( arrow_field.try_with_extension_type(arrow_schema::extension::Json::default())?; } #[cfg(feature = "geospatial")] - LogicalType::Geometry { crs } => { - let geom = parquet_geospatial::GeometryType::with_parse_crs(&crs.unwrap()); - arrow_field.try_with_extension_type(geom)?; + LogicalType::Geometry { .. } => { + arrow_field.try_with_extension_type(parquet_geospatial::WkbType::default())?; } #[cfg(feature = "geospatial")] LogicalType::Geography { .. } => { - arrow_field.try_with_extension_type(parquet_geospatial::GeographyType)?; + arrow_field.try_with_extension_type(parquet_geospatial::WkbType::default())?; } _ => {} }; @@ -149,17 +148,13 @@ pub(crate) fn logical_type_for_string(_field: &Field) -> Option { #[cfg(feature = "geospatial")] pub(crate) fn logical_type_for_binary(field: &Field) -> Option { - use parquet_geospatial::{GeographyType, GeometryType}; + use parquet_geospatial::WkbType; match field.extension_type_name() { - Some(n) if n == GeometryType::NAME => match field.try_extension_type::() { - Ok(GeometryType) => Some(LogicalType::Geometry { crs: todo!() }), - Err(_e) => None, - }, - Some(n) if n == GeographyType::NAME => match field.try_extension_type::() { - Ok(GeographyType) => Some(LogicalType::Geography { - crs: todo!(), - algorithm: todo!(), + Some(n) if n == WkbType::NAME => match field.try_extension_type::() { + // TODO: detect and differentiate between Geometry and Geography + Ok(wkb_type) => Some(LogicalType::Geometry { + crs: wkb_type.serialize_metadata(), }), Err(_e) => None, }, diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index 9572cd03477a..c8b983c4a629 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -88,7 +88,7 @@ pub(crate) fn parquet_to_arrow_schema_and_fields( metadata.entry(k.clone()).or_insert_with(|| v.clone()); }); #[cfg(feature = "geospatial")] - parquet_geospatial::crs::parquet_to_arrow(arrow_schema, &metadata) + parquet_geospatial::crs::replace_keyvalue(arrow_schema, &metadata) } let hint = maybe_schema.as_ref().map(|s| s.fields()); diff --git a/parquet/src/geospatial/mod.rs b/parquet/src/geospatial/mod.rs index 9ba73b7bdd12..3ae48d985ac5 100644 --- a/parquet/src/geospatial/mod.rs +++ b/parquet/src/geospatial/mod.rs @@ -56,7 +56,7 @@ mod tests { use arrow::util::test_util::parquet_test_data; use arrow_array::RecordBatch; - use parquet_geospatial::GeometryType; + use parquet_geospatial::WkbType; use crate::{arrow::arrow_reader::ArrowReaderBuilder, file::reader::ChunkReader}; @@ -70,7 +70,9 @@ mod tests { let batch = read_geospatial_test_case("crs-projjson.parquet"); assert_geometry_metadata(&batch, "geometry"); - // let var_column = batch.column_by_name("var").expect("expected var column"); + let _geom_column = batch + .column_by_name("geometry") + .expect("expected geometry column"); // let var_array = // VariantArray::try_new(&var_column).expect("expected var column to be a VariantArray"); @@ -101,7 +103,7 @@ mod tests { // verify that `GeometryType` also correctly finds the metadata field - .try_extension_type::() + .try_extension_type::() .expect("GeometryExtensionType should be readable"); } From 1ffbe90f0c1e6c30c5ae07c45a476ca50bd2d7fa Mon Sep 17 00:00:00 2001 From: Blake Orth Date: Wed, 12 Nov 2025 17:04:13 -0700 Subject: [PATCH 3/4] Basic round-trip for geospatial types --- parquet-geospatial/Cargo.toml | 1 + parquet-geospatial/src/lib.rs | 2 + parquet-geospatial/src/types.rs | 128 +++++++++++++++++++++++--- parquet/src/arrow/schema/extension.rs | 26 ++++-- parquet/src/arrow/schema/mod.rs | 2 - parquet/src/geospatial/mod.rs | 122 ++++++++++++++++++++---- 6 files changed, 239 insertions(+), 42 deletions(-) diff --git a/parquet-geospatial/Cargo.toml b/parquet-geospatial/Cargo.toml index 9b5624214119..d35e3303dfe0 100644 --- a/parquet-geospatial/Cargo.toml +++ b/parquet-geospatial/Cargo.toml @@ -31,6 +31,7 @@ edition = { workspace = true } rust-version = { workspace = true } [dependencies] +arrow = { workspace = true } arrow-schema = { workspace = true } geo-traits = { version = "0.3" } serde = { version = "1.0", default-features = false, features = ["derive"]} diff --git a/parquet-geospatial/src/lib.rs b/parquet-geospatial/src/lib.rs index 697b25846240..3b692df321d7 100644 --- a/parquet-geospatial/src/lib.rs +++ b/parquet-geospatial/src/lib.rs @@ -34,4 +34,6 @@ pub mod testing; mod types; +pub use types::Metadata as WkbMetadata; +pub use types::WkbArray; pub use types::WkbType; diff --git a/parquet-geospatial/src/types.rs b/parquet-geospatial/src/types.rs index 03a687de9812..8097f2d45cfa 100644 --- a/parquet-geospatial/src/types.rs +++ b/parquet-geospatial/src/types.rs @@ -1,13 +1,27 @@ +use std::sync::Arc; + +use arrow::array::{Array, ArrayRef, BinaryArray, BinaryViewArray, LargeBinaryArray, make_array}; +use arrow::buffer::NullBuffer; +use arrow::error::Result; +use arrow_schema::Field; use arrow_schema::{ArrowError, DataType, extension::ExtensionType}; use serde::{Deserialize, Serialize}; #[derive(Default)] pub struct WkbType(Option); +impl WkbType { + pub fn new(metadata: Option) -> Self { + Self(metadata) + } +} + #[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct Metadata { - crs: Option, - algorithm: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub crs: Option, // TODO: explore when this is valid JSON to avoid double escaping + #[serde(skip_serializing_if = "Option::is_none")] + pub algorithm: Option, } impl ExtensionType for WkbType { @@ -23,16 +37,11 @@ impl ExtensionType for WkbType { self.0.clone().map(|md| serde_json::to_string(&md).unwrap()) } - fn deserialize_metadata( - metadata: Option<&str>, - ) -> Result { + fn deserialize_metadata(metadata: Option<&str>) -> Result { Ok(metadata.map(|md| serde_json::from_str(md).unwrap())) } - fn supports_data_type( - &self, - data_type: &arrow_schema::DataType, - ) -> Result<(), arrow_schema::ArrowError> { + fn supports_data_type(&self, data_type: &arrow_schema::DataType) -> Result<()> { match data_type { DataType::Binary | DataType::LargeBinary | DataType::BinaryView => Ok(()), dt => Err(ArrowError::InvalidArgumentError(format!( @@ -41,12 +50,105 @@ impl ExtensionType for WkbType { } } - fn try_new( - data_type: &arrow_schema::DataType, - metadata: Self::Metadata, - ) -> Result { + fn try_new(data_type: &arrow_schema::DataType, metadata: Self::Metadata) -> Result { let wkb = Self(metadata); wkb.supports_data_type(data_type)?; Ok(wkb) } } + +pub struct WkbArray { + inner: ArrayRef, + metadata: Option, +} + +impl WkbArray { + pub fn try_new(inner: &dyn Array, metadata: Option) -> Result { + // TODO: validate the input array is one of our expected binary types + let inner = make_array(inner.to_data()); + Ok(Self { inner, metadata }) + } + + /// Returns a reference to the underlying [`ArrayRef`] + pub fn inner(&self) -> &ArrayRef { + &self.inner + } + + /// Returns the inner [`ArrayRef`], consuming self + pub fn into_inner(self) -> ArrayRef { + self.inner + } + + /// Returns a reference to the [`Metadata`] associated with this [`WkbArray`] + pub fn metadata(&self) -> &Option { + &self.metadata + } + + /// Return the WKB (Well-Known-Binary) data stored at the given row + pub fn value(&self, index: usize) -> &[u8] { + if let Some(bv) = self.inner.as_any().downcast_ref::() { + bv.value(index) + } else if let Some(b) = self.inner.as_any().downcast_ref::() { + b.value(index) + } else if let Some(lb) = self.inner.as_any().downcast_ref::() { + lb.value(index) + } else { + // Panic safety: Our try_new method ensures our inner array is one of the expected + // binary array types + unreachable!() + } + } + + /// Return a [`Field`] to represent this [`WkbArray`] in an [`arrow_schema::Schema`] with a particular name + pub fn field(&self, name: impl Into) -> Field { + Field::new( + name.into(), + self.data_type().clone(), + self.inner.is_nullable(), + ) + .with_extension_type(WkbType(self.metadata().clone())) + } + + /// Returns a new [`DataType`] representing this [`WkbArray`]'s inner type + pub fn data_type(&self) -> &DataType { + self.inner.data_type() + } + + pub fn slice(&self, offset: usize, length: usize) -> Self { + let inner = self.inner.slice(offset, length); + Self { + inner, + metadata: self.metadata().clone(), + } + } + + pub fn len(&self) -> usize { + self.inner.len() + } + + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + pub fn nulls(&self) -> Option<&NullBuffer> { + self.inner.nulls() + } + + /// Is the element at index null? + pub fn is_null(&self, index: usize) -> bool { + self.nulls().is_some_and(|n| n.is_null(index)) + } + + /// Is the element at index valid (not null)? + pub fn is_valid(&self, index: usize) -> bool { + !self.is_null(index) + } + + // TODO: implement iter() +} + +impl From for ArrayRef { + fn from(wkb_array: WkbArray) -> Self { + Arc::new(wkb_array.into_inner()) + } +} diff --git a/parquet/src/arrow/schema/extension.rs b/parquet/src/arrow/schema/extension.rs index d56f58942c40..e818202c3512 100644 --- a/parquet/src/arrow/schema/extension.rs +++ b/parquet/src/arrow/schema/extension.rs @@ -56,12 +56,24 @@ pub(crate) fn try_add_extension_type( arrow_field.try_with_extension_type(arrow_schema::extension::Json::default())?; } #[cfg(feature = "geospatial")] - LogicalType::Geometry { .. } => { - arrow_field.try_with_extension_type(parquet_geospatial::WkbType::default())?; + LogicalType::Geometry { crs } => { + let md = crs.map(|c| parquet_geospatial::WkbMetadata { + crs: Some(c), + algorithm: None, + }); + arrow_field.try_with_extension_type(parquet_geospatial::WkbType::new(md))?; } #[cfg(feature = "geospatial")] - LogicalType::Geography { .. } => { - arrow_field.try_with_extension_type(parquet_geospatial::WkbType::default())?; + LogicalType::Geography { crs, algorithm } => { + let md = if crs.is_some() || algorithm.is_some() { + Some(parquet_geospatial::WkbMetadata { + crs, + algorithm: algorithm.map(|a| a.to_string()), + }) + } else { + None + }; + arrow_field.try_with_extension_type(parquet_geospatial::WkbType::new(md))?; } _ => {} }; @@ -158,10 +170,8 @@ pub(crate) fn logical_type_for_binary(field: &Field) -> Option { }), Err(_e) => None, }, - _ => return None, - }; - - None + _ => None, + } } #[cfg(not(feature = "geospatial"))] diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index c8b983c4a629..8f1e18298146 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -87,8 +87,6 @@ pub(crate) fn parquet_to_arrow_schema_and_fields( arrow_schema.metadata().iter().for_each(|(k, v)| { metadata.entry(k.clone()).or_insert_with(|| v.clone()); }); - #[cfg(feature = "geospatial")] - parquet_geospatial::crs::replace_keyvalue(arrow_schema, &metadata) } let hint = maybe_schema.as_ref().map(|s| s.fields()); diff --git a/parquet/src/geospatial/mod.rs b/parquet/src/geospatial/mod.rs index 3ae48d985ac5..6a1e7ef959ed 100644 --- a/parquet/src/geospatial/mod.rs +++ b/parquet/src/geospatial/mod.rs @@ -52,40 +52,124 @@ pub mod statistics; #[cfg(test)] mod tests { - use std::path::PathBuf; + use std::{path::PathBuf, sync::Arc}; use arrow::util::test_util::parquet_test_data; - use arrow_array::RecordBatch; - use parquet_geospatial::WkbType; + use arrow_array::{ArrayRef, BinaryArray, RecordBatch}; + use arrow_schema::{Schema, extension::ExtensionType}; + use bytes::Bytes; + use parquet_geospatial::{WkbArray, WkbMetadata, WkbType}; - use crate::{arrow::arrow_reader::ArrowReaderBuilder, file::reader::ChunkReader}; + use crate::{ + arrow::{ArrowWriter, arrow_reader::ArrowReaderBuilder}, + file::{ + metadata::{ParquetMetaData, ParquetMetaDataReader}, + reader::ChunkReader, + }, + }; /// Ensure a file with Geometry LogicalType, written by another writer in /// parquet-testing, can be read as a GeometryArray #[test] fn read_geometry_logical_type() { - // Note: case-075 2 columns ("id", "var") - // The variant looks like this: - // "Variant(metadata=VariantMetadata(dict={}), value=Variant(type=STRING, value=iceberg))" let batch = read_geospatial_test_case("crs-projjson.parquet"); - assert_geometry_metadata(&batch, "geometry"); - let _geom_column = batch + let wkb_type = assert_geometry_metadata(&batch, "geometry"); + let geom_column = batch .column_by_name("geometry") .expect("expected geometry column"); - // let var_array = - // VariantArray::try_new(&var_column).expect("expected var column to be a VariantArray"); - - // // verify the value - // assert_eq!(var_array.len(), 1); - // assert!(var_array.is_valid(0)); - // let var_value = var_array.value(0); - // assert_eq!(var_value, Variant::from("iceberg")); + let wkb_array = WkbArray::try_new(geom_column, wkb_type.metadata().clone()) + .expect("expected geometry column to be a WkbArray"); + + // verify the value + assert_eq!(wkb_array.len(), 1); + assert!(wkb_array.is_valid(0)); + let wkb_value = wkb_array.value(0); + assert_eq!(wkb_value.len(), 3549); + } + + /// Ensure a file with Geography LogicalType, written by another writer in + /// parquet-testing, can be read as a GeometryArray + #[test] + fn read_geography_logical_type() { + let batch = read_geospatial_test_case("crs-geography.parquet"); + + let wkb_type = assert_geometry_metadata(&batch, "geography"); + let geom_column = batch + .column_by_name("geography") + .expect("expected geography column"); + let wkb_array = WkbArray::try_new(geom_column, wkb_type.metadata().clone()) + .expect("expected geometry column to be a WkbArray"); + + // verify the value + assert_eq!(wkb_array.len(), 1); + assert!(wkb_array.is_valid(0)); + let wkb_value = wkb_array.value(0); + assert_eq!(wkb_value.len(), 3549); + } + + /// Writes a wkb array to a parquet file and ensures the parquet logical type + /// annotation is correct + #[test] + fn write_geometry_logical_type() { + let (array, md) = geometry_array(); + let batch = wkb_array_to_batch(array); + let buffer = write_to_buffer(&batch); + + // read the parquet file's metadata and verify the logical type + let metadata = read_metadata(&Bytes::from(buffer)); + let schema = metadata.file_metadata().schema_descr(); + let fields = schema.root_schema().get_fields(); + assert_eq!(fields.len(), 1); + let field = &fields[0]; + assert_eq!(field.name(), "data"); + // data should have been written with the Variant logical type + assert_eq!( + field.get_basic_info().logical_type(), + Some(crate::basic::LogicalType::Geometry { + crs: Some(serde_json::to_string(&md).unwrap()) + }) + ); + } + + /// Return a WkbArray with 3 rows: + fn geometry_array() -> (WkbArray, WkbMetadata) { + let values: Vec<&[u8]> = vec![b"not", b"actually", b"wkb"]; + let inner = BinaryArray::from_vec(values); + let md = WkbMetadata { + crs: Some(String::from("test crs")), + algorithm: None, + }; + + (WkbArray::try_new(&inner, Some(md.clone())).unwrap(), md) + } + + /// creates a RecordBatch with a single column "data" from a WkbArray, + fn wkb_array_to_batch(array: WkbArray) -> RecordBatch { + let field = array.field("data"); + let schema = Schema::new(vec![field]); + RecordBatch::try_new(Arc::new(schema), vec![ArrayRef::from(array)]).unwrap() + } + + /// writes a RecordBatch to memory buffer and returns the buffer + fn write_to_buffer(batch: &RecordBatch) -> Vec { + let mut buffer = vec![]; + let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap(); + writer.write(batch).unwrap(); + writer.close().unwrap(); + buffer + } + + /// Reads the Parquet metadata + fn read_metadata(input: &T) -> ParquetMetaData { + let mut reader = ParquetMetaDataReader::new(); + reader.try_parse(input).unwrap(); + reader.finish().unwrap() } /// Verifies the geospatial metadata is present in the schema for the specified /// field name. - fn assert_geometry_metadata(batch: &RecordBatch, field_name: &str) { + fn assert_geometry_metadata(batch: &RecordBatch, field_name: &str) -> WkbType { println!("{batch:?}"); let schema = batch.schema(); let field = schema @@ -104,7 +188,7 @@ mod tests { // verify that `GeometryType` also correctly finds the metadata field .try_extension_type::() - .expect("GeometryExtensionType should be readable"); + .expect("WkbExtensionType should be readable") } /// Reads a RecordBatch from a reader (e.g. Vec or File) From efe42a2623208816cf69cfda74658fced383abc6 Mon Sep 17 00:00:00 2001 From: Blake Orth Date: Thu, 13 Nov 2025 17:26:21 -0700 Subject: [PATCH 4/4] - Adds WkbArray and metadata type hints - Removes some unused code - Shows current functionality with basic tests for reading and writing both geometry and geography --- parquet-geospatial/src/crs.rs | 59 ------------------- parquet-geospatial/src/lib.rs | 2 +- parquet-geospatial/src/types.rs | 84 ++++++++++++++++++++++----- parquet/src/arrow/schema/extension.rs | 46 +++++++++------ parquet/src/arrow/schema/mod.rs | 4 +- parquet/src/basic.rs | 18 ++++++ parquet/src/geospatial/mod.rs | 56 ++++++++++++++---- 7 files changed, 163 insertions(+), 106 deletions(-) delete mode 100644 parquet-geospatial/src/crs.rs diff --git a/parquet-geospatial/src/crs.rs b/parquet-geospatial/src/crs.rs deleted file mode 100644 index 6f91b29252e8..000000000000 --- a/parquet-geospatial/src/crs.rs +++ /dev/null @@ -1,59 +0,0 @@ -use std::{collections::HashMap, sync::Arc}; - -use arrow_schema::{Schema, SchemaBuilder}; -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Default, Serialize, Deserialize)] -pub struct Crs { - crs: Option, - crs_type: Option, -} - -impl Crs { - // TODO: make fallible - fn try_from_parquet_str(crs: &str, metadata: &HashMap) -> Self { - let crs: Crs = serde_json::from_str(crs).unwrap(); - - let Some(crs_value) = &crs.crs else { - return crs; - }; - - let Some(key) = crs_value.strip_prefix("projjson:") else { - return crs; - }; - - let Some(proj_meta) = metadata.get(key) else { - return crs; - }; - - Self { - crs: Some(proj_meta.clone()), - crs_type: Some(String::from("projjson")), - } - } - - pub(super) fn to_arrow_string(&self) -> String { - serde_json::to_string(&self).unwrap_or_default() - } -} - -// TODO: make fallible -pub fn replace_keyvalue(schema: &mut Schema, metadata: &HashMap) { - let n_fields = schema.flattened_fields().len(); - let mut sb: SchemaBuilder = schema.as_ref().into(); - for i in 0..n_fields { - let mut field = sb.field(i).as_ref().clone(); - // TODO: use consts instead of raw strings - if let Some("geoarrow.wkb") = field.extension_type_name() { - if let Some(ext_meta) = field.metadata_mut().get_mut("ARROW:extension:metadata") { - let crs = Crs::try_from_parquet_str(ext_meta, metadata); - *ext_meta = crs.to_arrow_string(); - - let schema_field = sb.field_mut(i); - *schema_field = Arc::new(field); - } - } - } - - *schema = sb.finish() -} diff --git a/parquet-geospatial/src/lib.rs b/parquet-geospatial/src/lib.rs index 3b692df321d7..7704d623c266 100644 --- a/parquet-geospatial/src/lib.rs +++ b/parquet-geospatial/src/lib.rs @@ -28,12 +28,12 @@ //! [Geometry issue]: https://github.com/apache/arrow-rs/issues/8373 pub mod bounding; -pub mod crs; pub mod interval; pub mod testing; mod types; +pub use types::Hint as WkbTypeHint; pub use types::Metadata as WkbMetadata; pub use types::WkbArray; pub use types::WkbType; diff --git a/parquet-geospatial/src/types.rs b/parquet-geospatial/src/types.rs index 8097f2d45cfa..9ddb2c221f8e 100644 --- a/parquet-geospatial/src/types.rs +++ b/parquet-geospatial/src/types.rs @@ -7,13 +7,10 @@ use arrow_schema::Field; use arrow_schema::{ArrowError, DataType, extension::ExtensionType}; use serde::{Deserialize, Serialize}; -#[derive(Default)] -pub struct WkbType(Option); - -impl WkbType { - pub fn new(metadata: Option) -> Self { - Self(metadata) - } +#[derive(Copy, Clone, Debug, Serialize, Deserialize)] +pub enum Hint { + Geometry, + Geography, } #[derive(Clone, Debug, Default, Serialize, Deserialize)] @@ -22,23 +19,73 @@ pub struct Metadata { pub crs: Option, // TODO: explore when this is valid JSON to avoid double escaping #[serde(skip_serializing_if = "Option::is_none")] pub algorithm: Option, + #[serde(skip_serializing_if = "Option::is_none")] + type_hint: Option, +} + +impl Metadata { + pub fn new(crs: Option, algorithm: Option) -> Self { + Self { + crs, + algorithm, + type_hint: None, + } + } + + pub fn with_type_hint(mut self, type_hint: Hint) -> Self { + self.type_hint = Some(type_hint); + self + } + + pub fn set_type_hint(&mut self, type_hint: Hint) { + self.type_hint = Some(type_hint) + } + + pub fn type_hint(&self) -> Option { + if self.type_hint.is_some() { + return self.type_hint; + } + + match &self.algorithm { + Some(s) if s.to_lowercase() == "planar" => Some(Hint::Geometry), + Some(_) => Some(Hint::Geography), + None => None, + } + } +} + +#[derive(Debug, Default)] +pub struct WkbType(Metadata); + +impl WkbType { + pub fn new_geometry(metadata: Option) -> Self { + Self(metadata.unwrap_or_default().with_type_hint(Hint::Geometry)) + } + + pub fn new_geography(metadata: Option) -> Self { + Self(metadata.unwrap_or_default().with_type_hint(Hint::Geography)) + } } impl ExtensionType for WkbType { const NAME: &'static str = "geoarrow.wkb"; - type Metadata = Option; + type Metadata = Metadata; fn metadata(&self) -> &Self::Metadata { &self.0 } fn serialize_metadata(&self) -> Option { - self.0.clone().map(|md| serde_json::to_string(&md).unwrap()) + serde_json::to_string(&self.0).ok() } fn deserialize_metadata(metadata: Option<&str>) -> Result { - Ok(metadata.map(|md| serde_json::from_str(md).unwrap())) + let Some(metadata) = metadata else { + return Ok(Self::Metadata::default()); + }; + + serde_json::from_str(metadata).map_err(|e| ArrowError::JsonError(e.to_string())) } fn supports_data_type(&self, data_type: &arrow_schema::DataType) -> Result<()> { @@ -57,15 +104,24 @@ impl ExtensionType for WkbType { } } +#[derive(Debug)] pub struct WkbArray { inner: ArrayRef, - metadata: Option, + metadata: Metadata, } impl WkbArray { - pub fn try_new(inner: &dyn Array, metadata: Option) -> Result { - // TODO: validate the input array is one of our expected binary types + pub fn try_new_geometry(inner: &dyn Array, metadata: Metadata) -> Result { + let inner = make_array(inner.to_data()); + let metadata = metadata.with_type_hint(Hint::Geometry); + + Ok(Self { inner, metadata }) + } + + pub fn try_new_geography(inner: &dyn Array, metadata: Metadata) -> Result { let inner = make_array(inner.to_data()); + let metadata = metadata.with_type_hint(Hint::Geography); + Ok(Self { inner, metadata }) } @@ -80,7 +136,7 @@ impl WkbArray { } /// Returns a reference to the [`Metadata`] associated with this [`WkbArray`] - pub fn metadata(&self) -> &Option { + pub fn metadata(&self) -> &Metadata { &self.metadata } diff --git a/parquet/src/arrow/schema/extension.rs b/parquet/src/arrow/schema/extension.rs index e818202c3512..75e058a96d14 100644 --- a/parquet/src/arrow/schema/extension.rs +++ b/parquet/src/arrow/schema/extension.rs @@ -57,23 +57,22 @@ pub(crate) fn try_add_extension_type( } #[cfg(feature = "geospatial")] LogicalType::Geometry { crs } => { - let md = crs.map(|c| parquet_geospatial::WkbMetadata { - crs: Some(c), - algorithm: None, - }); - arrow_field.try_with_extension_type(parquet_geospatial::WkbType::new(md))?; + use parquet_geospatial::WkbTypeHint; + + let md = parquet_geospatial::WkbMetadata::new(crs, None) + .with_type_hint(WkbTypeHint::Geometry); + arrow_field + .try_with_extension_type(parquet_geospatial::WkbType::new_geometry(Some(md)))?; } #[cfg(feature = "geospatial")] LogicalType::Geography { crs, algorithm } => { - let md = if crs.is_some() || algorithm.is_some() { - Some(parquet_geospatial::WkbMetadata { - crs, - algorithm: algorithm.map(|a| a.to_string()), - }) - } else { - None - }; - arrow_field.try_with_extension_type(parquet_geospatial::WkbType::new(md))?; + use parquet_geospatial::WkbTypeHint; + + let algorithm = algorithm.map(|a| a.to_string()); + let md = parquet_geospatial::WkbMetadata::new(crs, algorithm) + .with_type_hint(WkbTypeHint::Geography); + arrow_field + .try_with_extension_type(parquet_geospatial::WkbType::new_geography(Some(md)))?; } _ => {} }; @@ -161,13 +160,24 @@ pub(crate) fn logical_type_for_string(_field: &Field) -> Option { #[cfg(feature = "geospatial")] pub(crate) fn logical_type_for_binary(field: &Field) -> Option { use parquet_geospatial::WkbType; + use parquet_geospatial::WkbTypeHint; match field.extension_type_name() { Some(n) if n == WkbType::NAME => match field.try_extension_type::() { - // TODO: detect and differentiate between Geometry and Geography - Ok(wkb_type) => Some(LogicalType::Geometry { - crs: wkb_type.serialize_metadata(), - }), + Ok(wkb_type) => match wkb_type.metadata().type_hint() { + Some(WkbTypeHint::Geometry) => Some(LogicalType::Geometry { + crs: wkb_type.metadata().crs.clone(), + }), + Some(WkbTypeHint::Geography) => Some(LogicalType::Geography { + crs: wkb_type.metadata().crs.clone(), + algorithm: wkb_type + .metadata() + .algorithm + .clone() + .and_then(|a| a.parse().ok()), + }), + _ => None, + }, Err(_e) => None, }, _ => None, diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index 8f1e18298146..0bab91657b5c 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -77,13 +77,13 @@ pub(crate) fn parquet_to_arrow_schema_and_fields( key_value_metadata: Option<&Vec>, ) -> Result<(Schema, Option)> { let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default(); - let mut maybe_schema = metadata + let maybe_schema = metadata .remove(super::ARROW_SCHEMA_META_KEY) .map(|value| get_arrow_schema_from_metadata(&value)) .transpose()?; // Add the Arrow metadata to the Parquet metadata skipping keys that collide - if let Some(arrow_schema) = maybe_schema.as_mut() { + if let Some(arrow_schema) = &maybe_schema { arrow_schema.metadata().iter().for_each(|(k, v)| { metadata.entry(k.clone()).or_insert_with(|| v.clone()); }); diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs index eaa889bb99f1..6049bf24f5f3 100644 --- a/parquet/src/basic.rs +++ b/parquet/src/basic.rs @@ -1017,6 +1017,24 @@ impl fmt::Display for EdgeInterpolationAlgorithm { } } +impl FromStr for EdgeInterpolationAlgorithm { + type Err = ParquetError; + + fn from_str(s: &str) -> Result { + match s.to_ascii_uppercase().as_str() { + "SPHERICAL" => Ok(EdgeInterpolationAlgorithm::SPHERICAL), + "VINCENTY" => Ok(EdgeInterpolationAlgorithm::VINCENTY), + "THOMAS" => Ok(EdgeInterpolationAlgorithm::THOMAS), + "ANDOYER" => Ok(EdgeInterpolationAlgorithm::ANDOYER), + "KARNEY" => Ok(EdgeInterpolationAlgorithm::KARNEY), + unknown => Err(general_err!( + "Unknown edge interpolation algorithm: {}", + unknown + )), + } + } +} + impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for EdgeInterpolationAlgorithm { fn read_thrift(prot: &mut R) -> Result { let val = prot.read_i32()?; diff --git a/parquet/src/geospatial/mod.rs b/parquet/src/geospatial/mod.rs index 6a1e7ef959ed..408e367ee3b8 100644 --- a/parquet/src/geospatial/mod.rs +++ b/parquet/src/geospatial/mod.rs @@ -62,6 +62,7 @@ mod tests { use crate::{ arrow::{ArrowWriter, arrow_reader::ArrowReaderBuilder}, + basic::EdgeInterpolationAlgorithm, file::{ metadata::{ParquetMetaData, ParquetMetaDataReader}, reader::ChunkReader, @@ -78,7 +79,7 @@ mod tests { let geom_column = batch .column_by_name("geometry") .expect("expected geometry column"); - let wkb_array = WkbArray::try_new(geom_column, wkb_type.metadata().clone()) + let wkb_array = WkbArray::try_new_geometry(geom_column, wkb_type.metadata().clone()) .expect("expected geometry column to be a WkbArray"); // verify the value @@ -98,8 +99,8 @@ mod tests { let geom_column = batch .column_by_name("geography") .expect("expected geography column"); - let wkb_array = WkbArray::try_new(geom_column, wkb_type.metadata().clone()) - .expect("expected geometry column to be a WkbArray"); + let wkb_array = WkbArray::try_new_geography(geom_column, wkb_type.metadata().clone()) + .expect("expected geography column to be a WkbArray"); // verify the value assert_eq!(wkb_array.len(), 1); @@ -108,11 +109,11 @@ mod tests { assert_eq!(wkb_value.len(), 3549); } - /// Writes a wkb array to a parquet file and ensures the parquet logical type + /// Writes a wkb (geometry) array to a parquet file and ensures the parquet logical type /// annotation is correct #[test] fn write_geometry_logical_type() { - let (array, md) = geometry_array(); + let array = geometry_array(); let batch = wkb_array_to_batch(array); let buffer = write_to_buffer(&batch); @@ -127,21 +128,52 @@ mod tests { assert_eq!( field.get_basic_info().logical_type(), Some(crate::basic::LogicalType::Geometry { - crs: Some(serde_json::to_string(&md).unwrap()) + crs: Some(String::from("test crs")) + }) + ); + } + + /// Writes a wkb (geography) array to a parquet file and ensures the parquet logical type + /// annotation is correct + #[test] + fn write_geography_logical_type() { + let array = geography_array(); + let batch = wkb_array_to_batch(array); + let buffer = write_to_buffer(&batch); + + // read the parquet file's metadata and verify the logical type + let metadata = read_metadata(&Bytes::from(buffer)); + let schema = metadata.file_metadata().schema_descr(); + let fields = schema.root_schema().get_fields(); + assert_eq!(fields.len(), 1); + let field = &fields[0]; + assert_eq!(field.name(), "data"); + // data should have been written with the Variant logical type + assert_eq!( + field.get_basic_info().logical_type(), + Some(crate::basic::LogicalType::Geography { + crs: Some(String::from("test crs")), + algorithm: Some(EdgeInterpolationAlgorithm::SPHERICAL), }) ); } /// Return a WkbArray with 3 rows: - fn geometry_array() -> (WkbArray, WkbMetadata) { + fn geometry_array() -> WkbArray { + let values: Vec<&[u8]> = vec![b"not", b"actually", b"wkb"]; + let inner = BinaryArray::from_vec(values); + let md = WkbMetadata::new(Some(String::from("test crs")), None); + + WkbArray::try_new_geometry(&inner, md.clone()).unwrap() + } + + /// Return a WkbArray with 3 rows: + fn geography_array() -> WkbArray { let values: Vec<&[u8]> = vec![b"not", b"actually", b"wkb"]; let inner = BinaryArray::from_vec(values); - let md = WkbMetadata { - crs: Some(String::from("test crs")), - algorithm: None, - }; + let md = WkbMetadata::new(Some(String::from("test crs")), None); - (WkbArray::try_new(&inner, Some(md.clone())).unwrap(), md) + WkbArray::try_new_geography(&inner, md.clone()).unwrap() } /// creates a RecordBatch with a single column "data" from a WkbArray,