diff --git a/parquet-geospatial/Cargo.toml b/parquet-geospatial/Cargo.toml index 36c12edf0b77..471b355dc6eb 100644 --- a/parquet-geospatial/Cargo.toml +++ b/parquet-geospatial/Cargo.toml @@ -31,6 +31,8 @@ rust-version = { workspace = true } [dependencies] arrow-schema = { workspace = true } geo-traits = { version = "0.3" } +serde = { version = "1.0", default-features = false, features = ["derive"]} +serde_json = { version = "1.0", default-features = false, features = ["std"]} wkb = { version = "0.9.1" } [dev-dependencies] diff --git a/parquet-geospatial/src/lib.rs b/parquet-geospatial/src/lib.rs index 2ec60c44cc16..7b2b6166a4b0 100644 --- a/parquet-geospatial/src/lib.rs +++ b/parquet-geospatial/src/lib.rs @@ -30,3 +30,10 @@ pub mod bounding; pub mod interval; pub mod testing; + +mod types; + +pub use types::Edges as WkbEdges; +pub use types::Hint as WkbTypeHint; +pub use types::Metadata as WkbMetadata; +pub use types::WkbType; diff --git a/parquet-geospatial/src/types.rs b/parquet-geospatial/src/types.rs new file mode 100644 index 000000000000..f19911ad055a --- /dev/null +++ b/parquet-geospatial/src/types.rs @@ -0,0 +1,407 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use arrow_schema::{ArrowError, DataType, extension::ExtensionType}; +use serde::{Deserialize, Serialize}; + +/// Hints at the likely Parquet geospatial logical type represented by a [`Metadata`]. +/// +/// Based on the `algorithm` field: +/// - [`Hint::Geometry`]: WKB format with linear/planar edge interpolation +/// - [`Hint::Geography`]: WKB format with explicit non-linear/non-planar edge interpolation +/// +/// See the [Parquet Geospatial specification](https://github.com/apache/parquet-format/blob/master/Geospatial.md) +/// for more details. +#[derive(Copy, Clone, Debug, Serialize, Deserialize)] +pub enum Hint { + /// Geospatial features in WKB format with linear/planar edge interpolation + Geometry, + /// Geospatial features in WKB format with explicit non-linear/non-planar edge interpolation + Geography, +} + +/// The edge interpolation algorithms used with `GEOMETRY` logical types. +#[derive(Default, Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum Edges { + /// Edges are interpolated as geodesics on a sphere. + #[default] + Spherical, + /// + Vincenty, + /// Thomas, Paul D. Spheroidal geodesics, reference systems, & local geometry. US Naval Oceanographic Office, 1970 + Thomas, + /// Thomas, Paul D. Mathematical models for navigation systems. US Naval Oceanographic Office, 1965. + Andoyer, + /// Karney, Charles FF. "Algorithms for geodesics." Journal of Geodesy 87 (2013): 43-55 + Karney, +} + +/// The metadata associated with a [`WkbType`]. +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +pub struct Metadata { + /// The Coordinate Reference System (CRS) of the [`WkbType`], if present. + /// + /// This may be a raw string value (e.g., "EPSG:3857") or a JSON object (e.g., PROJJSON). + /// Note: Common lon/lat CRS representations (EPSG:4326, OGC:CRS84) are canonicalized + /// to `None` during serialization to match Parquet conventions. 
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub crs: Option<serde_json::Value>,
+    /// The edge interpolation algorithm of the [`WkbType`], if present.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub algorithm: Option<Edges>,
+}
+
+impl Metadata {
+    /// Constructs a new [`Metadata`] with the given CRS and algorithm.
+    ///
+    /// If a CRS is provided, and can be parsed as JSON, it will be stored as a JSON object instead
+    /// of its string representation.
+    pub fn new(crs: Option<&str>, algorithm: Option<Edges>) -> Self {
+        let crs = crs.map(|c| match serde_json::from_str(c) {
+            Ok(crs) => crs,
+            Err(_) => serde_json::Value::String(c.to_string()),
+        });
+
+        Self { crs, algorithm }
+    }
+
+    /// Returns a [`Hint`] to the likely underlying Logical Type that this [`Metadata`] represents.
+    pub fn type_hint(&self) -> Hint {
+        match &self.algorithm {
+            Some(_) => Hint::Geography,
+            None => Hint::Geometry,
+        }
+    }
+
+    /// Detect if the CRS is a common representation of lon/lat on the standard WGS84 ellipsoid
+    fn crs_is_lon_lat(&self) -> bool {
+        use serde_json::Value;
+
+        let Some(crs) = &self.crs else {
+            return false;
+        };
+
+        match crs {
+            Value::String(s) if s == "EPSG:4326" || s == "OGC:CRS84" => true,
+            Value::Object(_) => match (&crs["id"]["authority"], &crs["id"]["code"]) {
+                (Value::String(auth), Value::String(code)) if auth == "OGC" && code == "CRS84" => {
+                    true
+                }
+                (Value::String(auth), Value::String(code)) if auth == "EPSG" && code == "4326" => {
+                    true
+                }
+                (Value::String(auth), Value::Number(code))
+                    if auth == "EPSG" && code.as_i64() == Some(4326) =>
+                {
+                    true
+                }
+                _ => false,
+            },
+            _ => false,
+        }
+    }
+}
+
+/// Well-Known Binary (WKB) [`ExtensionType`] for geospatial data.
+///
+/// Represents the canonical Arrow Extension Type for storing
+/// [GeoArrow](https://github.com/geoarrow/geoarrow) data.
+#[derive(Debug, Default)]
+pub struct WkbType(Metadata);
+
+impl WkbType {
+    /// Constructs a new [`WkbType`] with the given [`Metadata`].
+    ///
+    /// If `None` is provided, default (empty) metadata is used.
+    pub fn new(metadata: Option<Metadata>) -> Self {
+        Self(metadata.unwrap_or_default())
+    }
+}
+
+type ArrowResult<T> = Result<T, ArrowError>;
+impl ExtensionType for WkbType {
+    const NAME: &'static str = "geoarrow.wkb";
+
+    type Metadata = Metadata;
+
+    fn metadata(&self) -> &Self::Metadata {
+        &self.0
+    }
+
+    fn serialize_metadata(&self) -> Option<String> {
+        let md = if self.0.crs_is_lon_lat() {
+            &Metadata {
+                crs: None, // lon/lat CRS is canonicalized as omitted (None) for Parquet
+                algorithm: self.0.algorithm,
+            }
+        } else {
+            &self.0
+        };
+
+        serde_json::to_string(md).ok()
+    }
+
+    fn deserialize_metadata(metadata: Option<&str>) -> ArrowResult<Self::Metadata> {
+        let Some(metadata) = metadata else {
+            return Ok(Self::Metadata::default());
+        };
+
+        serde_json::from_str(metadata).map_err(|e| ArrowError::JsonError(e.to_string()))
+    }
+
+    fn supports_data_type(&self, data_type: &arrow_schema::DataType) -> ArrowResult<()> {
+        match data_type {
+            DataType::Binary | DataType::LargeBinary | DataType::BinaryView => Ok(()),
+            dt => Err(ArrowError::InvalidArgumentError(format!(
+                "Geometry data type mismatch, expected one of Binary, LargeBinary, BinaryView. 
Found {dt}" + ))), + } + } + + fn try_new(data_type: &arrow_schema::DataType, metadata: Self::Metadata) -> ArrowResult { + let wkb = Self(metadata); + wkb.supports_data_type(data_type)?; + Ok(wkb) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow_schema::Field; + + /// Test metadata serialization and deserialization with empty/default metadata + #[test] + fn test_metadata_empty_roundtrip() -> ArrowResult<()> { + let metadata = Metadata::default(); + let wkb = WkbType::new(Some(metadata)); + + let serialized = wkb.serialize_metadata().unwrap(); + assert_eq!(serialized, "{}"); + + let deserialized = WkbType::deserialize_metadata(Some(&serialized))?; + assert!(deserialized.crs.is_none()); + assert!(deserialized.algorithm.is_none()); + + Ok(()) + } + + /// Test metadata serialization with CRS as a simple string + #[test] + fn test_metadata_crs_string_roundtrip() -> ArrowResult<()> { + let metadata = Metadata::new(Some("srid:1234"), None); + let wkb = WkbType::new(Some(metadata)); + + let serialized = wkb.serialize_metadata().unwrap(); + assert_eq!(serialized, r#"{"crs":"srid:1234"}"#); + + let deserialized = WkbType::deserialize_metadata(Some(&serialized))?; + assert_eq!( + deserialized.crs.unwrap(), + serde_json::Value::String(String::from("srid:1234")) + ); + assert!(deserialized.algorithm.is_none()); + + Ok(()) + } + + /// Test metadata serialization with CRS as a JSON object + #[test] + fn test_metadata_crs_json_object_roundtrip() -> ArrowResult<()> { + let crs_json = r#"{"type":"custom_json","properties":{"name":"EPSG:4326"}}"#; + let metadata = Metadata::new(Some(crs_json), None); + let wkb = WkbType::new(Some(metadata)); + + let serialized = wkb.serialize_metadata().unwrap(); + // Validate by parsing the JSON and checking structure (field order is not guaranteed) + let parsed: serde_json::Value = serde_json::from_str(&serialized).unwrap(); + assert_eq!(parsed["crs"]["type"], "custom_json"); + assert_eq!(parsed["crs"]["properties"]["name"], 
"EPSG:4326"); + + let deserialized = WkbType::deserialize_metadata(Some(&serialized))?; + + // Verify it's a JSON object with expected structure + let crs = deserialized.crs.unwrap(); + assert!(crs.is_object()); + assert_eq!(crs["type"], "custom_json"); + assert_eq!(crs["properties"]["name"], "EPSG:4326"); + + Ok(()) + } + + /// Test metadata serialization with algorithm field + #[test] + fn test_metadata_algorithm_roundtrip() -> ArrowResult<()> { + let metadata = Metadata::new(None, Some(Edges::Spherical)); + let wkb = WkbType::new(Some(metadata)); + + let serialized = wkb.serialize_metadata().unwrap(); + assert_eq!(serialized, r#"{"algorithm":"spherical"}"#); + + let deserialized = WkbType::deserialize_metadata(Some(&serialized))?; + assert!(deserialized.crs.is_none()); + assert_eq!(deserialized.algorithm, Some(Edges::Spherical)); + + Ok(()) + } + + /// Test metadata serialization with both CRS and algorithm + #[test] + fn test_metadata_full_roundtrip() -> ArrowResult<()> { + let metadata = Metadata::new(Some("srid:1234"), Some(Edges::Spherical)); + let wkb = WkbType::new(Some(metadata)); + + let serialized = wkb.serialize_metadata().unwrap(); + assert_eq!(serialized, r#"{"crs":"srid:1234","algorithm":"spherical"}"#); + + let deserialized = WkbType::deserialize_metadata(Some(&serialized))?; + assert_eq!( + deserialized.crs.unwrap(), + serde_json::Value::String("srid:1234".to_string()) + ); + assert_eq!(deserialized.algorithm, Some(Edges::Spherical)); + + Ok(()) + } + + /// Test deserialization of None metadata + #[test] + fn test_metadata_deserialize_none() -> ArrowResult<()> { + let deserialized = WkbType::deserialize_metadata(None)?; + assert!(deserialized.crs.is_none()); + assert!(deserialized.algorithm.is_none()); + Ok(()) + } + + /// Test deserialization of invalid JSON + #[test] + fn test_metadata_deserialize_invalid_json() { + let result = WkbType::deserialize_metadata(Some("not valid json {")); + assert!(matches!(result, Err(ArrowError::JsonError(_)))); + 
} + + /// Test metadata that results in a Geometry type hint + #[test] + fn test_type_hint_geometry() { + let metadata = Metadata::new(None, None); + assert!(matches!(metadata.type_hint(), Hint::Geometry)); + } + + /// Test metadata that results in a Geography type hint + #[test] + fn test_type_hint_edges_is_geography() { + let algorithms = vec![ + Edges::Spherical, + Edges::Vincenty, + Edges::Thomas, + Edges::Andoyer, + Edges::Karney, + ]; + for algo in algorithms { + let metadata = Metadata::new(None, Some(algo)); + assert!(matches!(metadata.type_hint(), Hint::Geography)); + } + } + + /// Test extension type integration using a Field + #[test] + fn test_extension_type_with_field() -> ArrowResult<()> { + let metadata = Metadata::new(Some("srid:1234"), None); + let wkb_type = WkbType::new(Some(metadata)); + + let mut field = Field::new("geometry", DataType::Binary, false); + field.try_with_extension_type(wkb_type)?; + + // Verify we can extract the extension type back + let extracted = field.try_extension_type::()?; + assert_eq!( + extracted.metadata().crs.as_ref().unwrap(), + &serde_json::Value::String(String::from("srid:1234")) + ); + + Ok(()) + } + + /// Test extension type DataType support + #[test] + fn test_extension_type_support() -> ArrowResult<()> { + let wkb = WkbType::default(); + // supported types + wkb.supports_data_type(&DataType::Binary)?; + wkb.supports_data_type(&DataType::LargeBinary)?; + wkb.supports_data_type(&DataType::BinaryView)?; + + // reject unsupported types with an error + let result = wkb.supports_data_type(&DataType::Utf8); + assert!(matches!(result, Err(ArrowError::InvalidArgumentError(_)))); + + Ok(()) + } + + /// Test CRS canonicalization logic for common lon/lat representations + #[test] + fn test_crs_canonicalization() -> ArrowResult<()> { + // EPSG:4326 as string should be omitted + let metadata = Metadata::new(Some("EPSG:4326"), None); + let wkb = WkbType::new(Some(metadata)); + let serialized = 
wkb.serialize_metadata().unwrap(); + assert_eq!(serialized, "{}"); + + // OGC:CRS84 as string should be omitted + let metadata = Metadata::new(Some("OGC:CRS84"), None); + let wkb = WkbType::new(Some(metadata)); + let serialized = wkb.serialize_metadata().unwrap(); + assert_eq!(serialized, "{}"); + + // A JSON object that reasonably looks like PROJJSON for EPSG:4326 should be omitted + // detect "4326" as a string + let crs_json = r#"{"id":{"authority":"EPSG","code":"4326"}}"#; + let metadata = Metadata::new(Some(crs_json), None); + let wkb = WkbType::new(Some(metadata)); + let serialized = wkb.serialize_metadata().unwrap(); + assert_eq!(serialized, "{}"); + + // detect 4326 as a number + let crs_json = r#"{"id":{"authority":"EPSG","code":4326}}"#; + let metadata = Metadata::new(Some(crs_json), None); + let wkb = WkbType::new(Some(metadata)); + let serialized = wkb.serialize_metadata().unwrap(); + assert_eq!(serialized, "{}"); + + // A JSON object that reasonably looks like PROJJSON for OGC:CRS84 should be omitted + let crs_json = r#"{"id":{"authority":"OGC","code":"CRS84"}}"#; + let metadata = Metadata::new(Some(crs_json), None); + let wkb = WkbType::new(Some(metadata)); + let serialized = wkb.serialize_metadata().unwrap(); + assert_eq!(serialized, "{}"); + + // Other input types should be preserved + let metadata = Metadata::new(Some("srid:1234"), None); + let wkb = WkbType::new(Some(metadata)); + let serialized = wkb.serialize_metadata().unwrap(); + assert_eq!(serialized, r#"{"crs":"srid:1234"}"#); + + // Canonicalization should work with algorithm field + let metadata = Metadata::new(Some("EPSG:4326"), Some(Edges::Spherical)); + let wkb = WkbType::new(Some(metadata)); + let serialized = wkb.serialize_metadata().unwrap(); + assert_eq!(serialized, r#"{"algorithm":"spherical"}"#); + + Ok(()) + } +} diff --git a/parquet/src/arrow/schema/extension.rs b/parquet/src/arrow/schema/extension.rs index fe3e856a6c38..247101964668 100644 --- 
a/parquet/src/arrow/schema/extension.rs +++ b/parquet/src/arrow/schema/extension.rs @@ -55,6 +55,17 @@ pub(crate) fn try_add_extension_type( LogicalType::Json => { arrow_field.try_with_extension_type(arrow_schema::extension::Json::default())?; } + #[cfg(feature = "geospatial")] + LogicalType::Geometry { crs } => { + let md = parquet_geospatial::WkbMetadata::new(crs.as_deref(), None); + arrow_field.try_with_extension_type(parquet_geospatial::WkbType::new(Some(md)))?; + } + #[cfg(feature = "geospatial")] + LogicalType::Geography { crs, algorithm } => { + let algorithm = algorithm.map(|a| a.try_as_edges()).transpose()?; + let md = parquet_geospatial::WkbMetadata::new(crs.as_deref(), algorithm); + arrow_field.try_with_extension_type(parquet_geospatial::WkbType::new(Some(md)))?; + } _ => {} }; Ok(arrow_field) @@ -75,6 +86,10 @@ pub(crate) fn has_extension_type(parquet_type: &Type) -> bool { LogicalType::Uuid => true, #[cfg(feature = "arrow_canonical_extension_types")] LogicalType::Json => true, + #[cfg(feature = "geospatial")] + LogicalType::Geometry { .. } => true, + #[cfg(feature = "geospatial")] + LogicalType::Geography { .. 
} => true, _ => false, } } @@ -133,3 +148,40 @@ pub(crate) fn logical_type_for_string(field: &Field) -> Option { pub(crate) fn logical_type_for_string(_field: &Field) -> Option { Some(LogicalType::String) } + +#[cfg(feature = "geospatial")] +pub(crate) fn logical_type_for_binary(field: &Field) -> Option { + use parquet_geospatial::WkbType; + use parquet_geospatial::WkbTypeHint; + + match field.extension_type_name() { + Some(n) if n == WkbType::NAME => match field.try_extension_type::() { + Ok(wkb_type) => match wkb_type.metadata().type_hint() { + WkbTypeHint::Geometry => Some(LogicalType::Geometry { + crs: wkb_type.metadata().crs.as_ref().map(|c| c.to_string()), + }), + WkbTypeHint::Geography => Some(LogicalType::Geography { + crs: wkb_type.metadata().crs.as_ref().map(|c| c.to_string()), + algorithm: wkb_type.metadata().algorithm.map(|a| a.into()), + }), + }, + Err(_e) => None, + }, + _ => None, + } +} + +#[cfg(not(feature = "geospatial"))] +pub(crate) fn logical_type_for_binary(field: &Field) -> Option { + None +} + +#[cfg(feature = "geospatial")] +pub(crate) fn logical_type_for_binary_view(field: &Field) -> Option { + logical_type_for_binary(field) +} + +#[cfg(not(feature = "geospatial"))] +pub(crate) fn logical_type_for_binary_view(field: &Field) -> Option { + None +} diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index b4a9ba7b7f1d..b33f9c14dde5 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -40,8 +40,9 @@ pub mod virtual_type; use super::PARQUET_FIELD_ID_META_KEY; use crate::arrow::ProjectionMask; use crate::arrow::schema::extension::{ - has_extension_type, logical_type_for_fixed_size_binary, logical_type_for_string, - logical_type_for_struct, try_add_extension_type, + has_extension_type, logical_type_for_binary, logical_type_for_binary_view, + logical_type_for_fixed_size_binary, logical_type_for_string, logical_type_for_struct, + try_add_extension_type, }; pub(crate) use 
complex::{ParquetField, ParquetFieldType, VirtualColumnType}; @@ -712,6 +713,7 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result { Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY) .with_repetition(repetition) .with_id(id) + .with_logical_type(logical_type_for_binary(field)) .build() } DataType::FixedSizeBinary(length) => { @@ -725,6 +727,7 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result { DataType::BinaryView => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY) .with_repetition(repetition) .with_id(id) + .with_logical_type(logical_type_for_binary_view(field)) .build(), DataType::Decimal32(precision, scale) | DataType::Decimal64(precision, scale) diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs index 9566454cb039..cb547ab8d2ed 100644 --- a/parquet/src/basic.rs +++ b/parquet/src/basic.rs @@ -1011,12 +1011,65 @@ pub enum EdgeInterpolationAlgorithm { _Unknown(i32), } +#[cfg(feature = "geospatial")] +impl EdgeInterpolationAlgorithm { + /// Converts an [`EdgeInterpolationAlgorithm`] into its corresponding algorithm defined by + /// [`parquet_geospatial::WkbEdges`]. + /// + /// This method will only return an Err if the [`EdgeInterpolationAlgorithm`] is the `_Unknown` + /// variant. 
+    pub fn try_as_edges(&self) -> Result<parquet_geospatial::WkbEdges> {
+        match &self {
+            Self::SPHERICAL => Ok(parquet_geospatial::WkbEdges::Spherical),
+            Self::VINCENTY => Ok(parquet_geospatial::WkbEdges::Vincenty),
+            Self::THOMAS => Ok(parquet_geospatial::WkbEdges::Thomas),
+            Self::ANDOYER => Ok(parquet_geospatial::WkbEdges::Andoyer),
+            Self::KARNEY => Ok(parquet_geospatial::WkbEdges::Karney),
+            unknown => Err(general_err!(
+                "Unknown edge interpolation algorithm: {}",
+                unknown
+            )),
+        }
+    }
+}
+
 impl fmt::Display for EdgeInterpolationAlgorithm {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         f.write_fmt(format_args!("{0:?}", self))
     }
 }
 
+#[cfg(feature = "geospatial")]
+impl From<parquet_geospatial::WkbEdges> for EdgeInterpolationAlgorithm {
+    fn from(value: parquet_geospatial::WkbEdges) -> Self {
+        match value {
+            parquet_geospatial::WkbEdges::Spherical => Self::SPHERICAL,
+            parquet_geospatial::WkbEdges::Vincenty => Self::VINCENTY,
+            parquet_geospatial::WkbEdges::Thomas => Self::THOMAS,
+            parquet_geospatial::WkbEdges::Andoyer => Self::ANDOYER,
+            parquet_geospatial::WkbEdges::Karney => Self::KARNEY,
+        }
+    }
+}
+
+impl FromStr for EdgeInterpolationAlgorithm {
+    type Err = ParquetError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_ascii_uppercase().as_str() {
+            "SPHERICAL" => Ok(EdgeInterpolationAlgorithm::SPHERICAL),
+            "VINCENTY" => Ok(EdgeInterpolationAlgorithm::VINCENTY),
+            "THOMAS" => Ok(EdgeInterpolationAlgorithm::THOMAS),
+            "ANDOYER" => Ok(EdgeInterpolationAlgorithm::ANDOYER),
+            "KARNEY" => Ok(EdgeInterpolationAlgorithm::KARNEY),
+            unknown => Err(general_err!(
+                "Unknown edge interpolation algorithm: {}",
+                unknown
+            )),
+        }
+    }
+}
+
 impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for EdgeInterpolationAlgorithm {
     fn read_thrift(prot: &mut R) -> Result<Self> {
         let val = prot.read_i32()?;
@@ -2456,6 +2509,35 @@ mod tests {
         assert_eq!(EdgeInterpolationAlgorithm::KARNEY.to_string(), "KARNEY");
     }
 
+    #[test]
+    fn test_from_str_edge_algo() {
+        assert_eq!(
+            "spHErical".parse::<EdgeInterpolationAlgorithm>().unwrap(),
+
EdgeInterpolationAlgorithm::SPHERICAL + ); + assert_eq!( + "vinceNTY".parse::().unwrap(), + EdgeInterpolationAlgorithm::VINCENTY + ); + assert_eq!( + "tHOmas".parse::().unwrap(), + EdgeInterpolationAlgorithm::THOMAS + ); + assert_eq!( + "anDOYEr".parse::().unwrap(), + EdgeInterpolationAlgorithm::ANDOYER + ); + assert_eq!( + "kaRNey".parse::().unwrap(), + EdgeInterpolationAlgorithm::KARNEY + ); + assert!( + "does not exist" + .parse::() + .is_err() + ); + } + fn encodings_roundtrip(mut encodings: Vec) { encodings.sort(); let mask = EncodingMask::new_from_encodings(encodings.iter()); diff --git a/parquet/tests/geospatial.rs b/parquet/tests/geospatial.rs index b8c623a2a844..4f449df920e8 100644 --- a/parquet/tests/geospatial.rs +++ b/parquet/tests/geospatial.rs @@ -15,133 +15,145 @@ // specific language governing permissions and limitations // under the License. -//! Tests for Geometry and Geography logical types - -use parquet::{ - basic::{EdgeInterpolationAlgorithm, LogicalType}, - file::{ - metadata::ParquetMetaData, - reader::{FileReader, SerializedFileReader}, - }, - geospatial::bounding_box::BoundingBox, -}; -use serde_json::Value; -use std::fs::File; - -fn read_metadata(geospatial_test_file: &str) -> ParquetMetaData { - let path = format!( - "{}/geospatial/{geospatial_test_file}", - arrow::util::test_util::parquet_test_data(), - ); - let file = File::open(path).unwrap(); - let reader = SerializedFileReader::try_from(file).unwrap(); - reader.metadata().clone() -} - -#[test] -fn test_read_logical_type() { - // Some crs values are short strings - let expected_logical_type = [ - ("crs-default.parquet", LogicalType::Geometry { crs: None }), - ( - "crs-srid.parquet", - LogicalType::Geometry { - crs: Some("srid:5070".to_string()), - }, - ), - ( - "crs-projjson.parquet", - LogicalType::Geometry { - crs: Some("projjson:projjson_epsg_5070".to_string()), - }, - ), - ( - "crs-geography.parquet", - LogicalType::Geography { - crs: None, - algorithm: 
Some(EdgeInterpolationAlgorithm::SPHERICAL), - }, - ), - ]; - - for (geospatial_file, expected_type) in expected_logical_type { - let metadata = read_metadata(geospatial_file); - let column_descr = metadata.file_metadata().schema_descr().column(1); - let logical_type = column_descr.logical_type_ref().unwrap(); - - assert_eq!(logical_type, &expected_type); - } - - // The crs value may also contain arbitrary values (in this case some JSON - // a bit too lengthy to type out) - let metadata = read_metadata("crs-arbitrary-value.parquet"); - let column_descr = metadata.file_metadata().schema_descr().column(1); - let logical_type = column_descr.logical_type_ref().unwrap(); - - if let LogicalType::Geometry { crs } = logical_type { - let crs = crs.as_ref(); - let crs_parsed: Value = serde_json::from_str(crs.unwrap()).unwrap(); - assert_eq!(crs_parsed.get("id").unwrap().get("code").unwrap(), 5070); - } else { - panic!("Expected geometry type but got {logical_type:?}"); - } -} - -#[test] -fn test_read_geospatial_statistics() { - let metadata = read_metadata("geospatial.parquet"); - - // geospatial.parquet schema: - // optional binary field_id=-1 group (String); - // optional binary field_id=-1 wkt (String); - // optional binary field_id=-1 geometry (Geometry(crs=)); - let fields = metadata.file_metadata().schema().get_fields(); - let logical_type = fields[2].get_basic_info().logical_type_ref().unwrap(); - assert_eq!(logical_type, &LogicalType::Geometry { crs: None }); - - let geo_statistics = metadata.row_group(0).column(2).geo_statistics(); - assert!(geo_statistics.is_some()); - - let expected_bbox = BoundingBox::new(10.0, 40.0, 10.0, 40.0) - .with_zrange(30.0, 80.0) - .with_mrange(200.0, 1600.0); - let expected_geospatial_types = vec![ - 1, 2, 3, 4, 5, 6, 7, 1001, 1002, 1003, 1004, 1005, 1006, 1007, 2001, 2002, 2003, 2004, - 2005, 2006, 2007, 3001, 3002, 3003, 3004, 3005, 3006, 3007, - ]; - assert_eq!( - geo_statistics.unwrap().geospatial_types(), - 
Some(&expected_geospatial_types) - ); - assert_eq!(geo_statistics.unwrap().bounding_box(), Some(&expected_bbox)); -} - #[cfg(all(feature = "arrow", feature = "geospatial"))] mod test { //! Tests for Geometry and Geography logical types that require the arrow //! and/or geospatial features enabled - use super::*; - - use std::{iter::zip, sync::Arc}; + use std::{fs::File, iter::zip, sync::Arc}; use arrow_array::{ArrayRef, BinaryArray, RecordBatch, create_array}; - use arrow_schema::{DataType, Field, Schema}; + use arrow_schema::{DataType, Field, Schema, SchemaRef, extension::ExtensionType as _}; use bytes::Bytes; use parquet::{ - arrow::{ArrowWriter, arrow_writer::ArrowWriterOptions}, + arrow::{ + ArrowSchemaConverter, ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder, + arrow_writer::ArrowWriterOptions, + }, + basic::{EdgeInterpolationAlgorithm, LogicalType}, column::reader::ColumnReader, data_type::{ByteArray, ByteArrayType}, file::{ - metadata::RowGroupMetaData, + metadata::{ParquetMetaData, RowGroupMetaData}, properties::{EnabledStatistics, WriterProperties}, - reader::FileReader, + reader::{FileReader, SerializedFileReader}, writer::SerializedFileWriter, }, - geospatial::statistics::GeospatialStatistics, - schema::types::{SchemaDescriptor, Type}, + geospatial::{bounding_box::BoundingBox, statistics::GeospatialStatistics}, + schema::types::SchemaDescriptor, }; - use parquet_geospatial::testing::wkb_point_xy; + use parquet_geospatial::{WkbEdges, WkbMetadata, WkbType, testing::wkb_point_xy}; + use serde_json::Value; + + fn read_metadata(geospatial_test_file: &str) -> (Arc, SchemaRef) { + let path = format!( + "{}/geospatial/{geospatial_test_file}", + arrow::util::test_util::parquet_test_data(), + ); + let file = File::open(path).unwrap(); + let reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); + + (reader.metadata().clone(), reader.schema().clone()) + } + + #[test] + fn test_read_logical_type() { + // Some crs values are short strings + let 
expected_metadata = [ + ( + "crs-default.parquet", + LogicalType::Geometry { crs: None }, + WkbMetadata::new(None, None), + ), + ( + "crs-srid.parquet", + LogicalType::Geometry { + crs: Some("srid:5070".to_string()), + }, + WkbMetadata::new(Some("srid:5070"), None), + ), + ( + "crs-projjson.parquet", + LogicalType::Geometry { + crs: Some("projjson:projjson_epsg_5070".to_string()), + }, + WkbMetadata::new(Some("projjson:projjson_epsg_5070"), None), + ), + ( + "crs-geography.parquet", + LogicalType::Geography { + crs: None, + algorithm: Some(EdgeInterpolationAlgorithm::SPHERICAL), + }, + WkbMetadata::new(None, Some(WkbEdges::Spherical)), + ), + ]; + + for (geospatial_file, expected_type, expected_field_meta) in expected_metadata { + let (metadata, schema) = read_metadata(geospatial_file); + let column_descr = metadata.file_metadata().schema_descr().column(1); + let logical_type = column_descr.logical_type_ref().unwrap(); + + assert_eq!(logical_type, &expected_type); + + let field = schema.field(1); + let wkb_type = field.try_extension_type::().unwrap(); + + assert_eq!(wkb_type.metadata().crs, expected_field_meta.crs); + assert_eq!(wkb_type.metadata().algorithm, expected_field_meta.algorithm); + } + + // The crs value may also contain arbitrary values (in this case some JSON + // a bit too lengthy to type out) + let (metadata, schema) = read_metadata("crs-arbitrary-value.parquet"); + let column_descr = metadata.file_metadata().schema_descr().column(1); + let logical_type = column_descr.logical_type_ref().unwrap(); + + if let LogicalType::Geometry { crs } = logical_type { + let crs = crs.as_ref(); + let crs_parsed: Value = serde_json::from_str(crs.unwrap()).unwrap(); + assert_eq!(crs_parsed.get("id").unwrap().get("code").unwrap(), 5070); + } else { + panic!("Expected geometry type but got {logical_type:?}"); + } + + let field = schema.field(1); + let wkb_type = field.try_extension_type::().unwrap(); + assert_eq!( + 
wkb_type.metadata().crs.as_ref().unwrap()["id"]["code"], + 5070 + ); + assert_eq!(wkb_type.metadata().algorithm, None); + } + + #[test] + fn test_read_geospatial_statistics() { + let (metadata, _) = read_metadata("geospatial.parquet"); + + // geospatial.parquet schema: + // optional binary field_id=-1 group (String); + // optional binary field_id=-1 wkt (String); + // optional binary field_id=-1 geometry (Geometry(crs=)); + let fields = metadata.file_metadata().schema().get_fields(); + let logical_type = fields[2].get_basic_info().logical_type_ref().unwrap(); + assert_eq!(logical_type, &LogicalType::Geometry { crs: None }); + + let geo_statistics = metadata.row_group(0).column(2).geo_statistics(); + assert!(geo_statistics.is_some()); + + let expected_bbox = BoundingBox::new(10.0, 40.0, 10.0, 40.0) + .with_zrange(30.0, 80.0) + .with_mrange(200.0, 1600.0); + let expected_geospatial_types = vec![ + 1, 2, 3, 4, 5, 6, 7, 1001, 1002, 1003, 1004, 1005, 1006, 1007, 2001, 2002, 2003, 2004, + 2005, 2006, 2007, 3001, 3002, 3003, 3004, 3005, 3006, 3007, + ]; + assert_eq!( + geo_statistics.unwrap().geospatial_types(), + Some(&expected_geospatial_types) + ); + assert_eq!(geo_statistics.unwrap().bounding_box(), Some(&expected_bbox)); + } fn read_row_group_metadata(b: Bytes) -> Vec { let reader = SerializedFileReader::new(b).unwrap(); @@ -177,8 +189,7 @@ mod test { None, ]; - let root = parquet_schema_geometry(); - let schema = SchemaDescriptor::new(root.into()); + let schema = parquet_schema_geometry(); let props = WriterProperties::builder() .set_statistics_enabled(EnabledStatistics::Chunk) .build(); @@ -267,9 +278,7 @@ mod test { None, ]; - let root = parquet_schema_geometry(); - let schema = SchemaDescriptor::new(root.into()); - + let schema = parquet_schema_geometry(); let props = WriterProperties::builder() .set_statistics_enabled(EnabledStatistics::Chunk) .build(); @@ -360,8 +369,7 @@ mod test { let mut values = Vec::new(); let mut def_levels = Vec::new(); - let root = 
parquet_schema_geometry(); - let schema = SchemaDescriptor::new(root.into()); + let schema = parquet_schema_geometry(); let props = WriterProperties::builder() .set_statistics_enabled(EnabledStatistics::Chunk) .build(); @@ -406,17 +414,14 @@ mod test { } } - fn parquet_schema_geometry() -> Type { - Type::group_type_builder("root") - .with_fields(vec![ - Type::primitive_type_builder("geo", parquet::basic::Type::BYTE_ARRAY) - .with_logical_type(Some(LogicalType::Geometry { crs: None })) - .build() - .unwrap() - .into(), - ]) - .build() - .unwrap() + fn parquet_schema_geometry() -> SchemaDescriptor { + let wkb_meta = WkbMetadata::new(None, None); + let wkb_type = WkbType::new(Some(wkb_meta)); + + let field = Field::new("geo", DataType::Binary, true).with_extension_type(wkb_type); + let schema = Schema::new(vec![field]); + + ArrowSchemaConverter::new().convert(&schema).unwrap() } fn wkb_array_xy(coords: impl IntoIterator>) -> ArrayRef {