diff --git a/parquet-geospatial/Cargo.toml b/parquet-geospatial/Cargo.toml index ea19f9c9b49f..d35e3303dfe0 100644 --- a/parquet-geospatial/Cargo.toml +++ b/parquet-geospatial/Cargo.toml @@ -31,8 +31,11 @@ edition = { workspace = true } rust-version = { workspace = true } [dependencies] +arrow = { workspace = true } arrow-schema = { workspace = true } geo-traits = { version = "0.3" } +serde = { version = "1.0", default-features = false, features = ["derive"]} +serde_json = { version = "1.0", default-features = false, features = ["std"]} wkb = { version = "0.9.1" } [dev-dependencies] diff --git a/parquet-geospatial/src/lib.rs b/parquet-geospatial/src/lib.rs index 2ec60c44cc16..7704d623c266 100644 --- a/parquet-geospatial/src/lib.rs +++ b/parquet-geospatial/src/lib.rs @@ -30,3 +30,10 @@ pub mod bounding; pub mod interval; pub mod testing; + +mod types; + +pub use types::Hint as WkbTypeHint; +pub use types::Metadata as WkbMetadata; +pub use types::WkbArray; +pub use types::WkbType; diff --git a/parquet-geospatial/src/types.rs b/parquet-geospatial/src/types.rs new file mode 100644 index 000000000000..9ddb2c221f8e --- /dev/null +++ b/parquet-geospatial/src/types.rs @@ -0,0 +1,210 @@ +use std::sync::Arc; + +use arrow::array::{Array, ArrayRef, BinaryArray, BinaryViewArray, LargeBinaryArray, make_array}; +use arrow::buffer::NullBuffer; +use arrow::error::Result; +use arrow_schema::Field; +use arrow_schema::{ArrowError, DataType, extension::ExtensionType}; +use serde::{Deserialize, Serialize}; + +#[derive(Copy, Clone, Debug, Serialize, Deserialize)] +pub enum Hint { + Geometry, + Geography, +} + +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +pub struct Metadata { + #[serde(skip_serializing_if = "Option::is_none")] + pub crs: Option, // TODO: explore when this is valid JSON to avoid double escaping + #[serde(skip_serializing_if = "Option::is_none")] + pub algorithm: Option, + #[serde(skip_serializing_if = "Option::is_none")] + type_hint: Option, +} + +impl Metadata { + pub fn new(crs: Option, algorithm: Option) -> Self { + Self { + crs, + algorithm, + type_hint: None, + } + } + + pub fn with_type_hint(mut self, type_hint: Hint) -> Self { + self.type_hint = Some(type_hint); + self + } + + pub fn set_type_hint(&mut self, type_hint: Hint) { + self.type_hint = Some(type_hint) + } + + pub fn type_hint(&self) -> Option { + if self.type_hint.is_some() { + return self.type_hint; + } + + match &self.algorithm { + Some(s) if s.to_lowercase() == "planar" => Some(Hint::Geometry), + Some(_) => Some(Hint::Geography), + None => None, + } + } +} + +#[derive(Debug, Default)] +pub struct WkbType(Metadata); + +impl WkbType { + pub fn new_geometry(metadata: Option) -> Self { + Self(metadata.unwrap_or_default().with_type_hint(Hint::Geometry)) + } + + pub fn new_geography(metadata: Option) -> Self { + Self(metadata.unwrap_or_default().with_type_hint(Hint::Geography)) + } +} + +impl ExtensionType for WkbType { + const NAME: &'static str = "geoarrow.wkb"; + + type Metadata = Metadata; + + fn metadata(&self) -> &Self::Metadata { + &self.0 + } + + fn serialize_metadata(&self) -> Option { + serde_json::to_string(&self.0).ok() + } + + fn deserialize_metadata(metadata: Option<&str>) -> Result { + let Some(metadata) = metadata else { + return Ok(Self::Metadata::default()); + }; + + serde_json::from_str(metadata).map_err(|e| ArrowError::JsonError(e.to_string())) + } + + fn supports_data_type(&self, data_type: &arrow_schema::DataType) -> Result<()> { + match data_type { + DataType::Binary | DataType::LargeBinary | DataType::BinaryView => Ok(()), + dt => Err(ArrowError::InvalidArgumentError(format!( + "Geometry data type mismatch, expected one of Binary, LargeBinary, BinaryView. Found {dt}" + ))), + } + } + + fn try_new(data_type: &arrow_schema::DataType, metadata: Self::Metadata) -> Result { + let wkb = Self(metadata); + wkb.supports_data_type(data_type)?; + Ok(wkb) + } +} + +#[derive(Debug)] +pub struct WkbArray { + inner: ArrayRef, + metadata: Metadata, +} + +impl WkbArray { + pub fn try_new_geometry(inner: &dyn Array, metadata: Metadata) -> Result { + let inner = make_array(inner.to_data()); + let metadata = metadata.with_type_hint(Hint::Geometry); + + Ok(Self { inner, metadata }) + } + + pub fn try_new_geography(inner: &dyn Array, metadata: Metadata) -> Result { + let inner = make_array(inner.to_data()); + let metadata = metadata.with_type_hint(Hint::Geography); + + Ok(Self { inner, metadata }) + } + + /// Returns a reference to the underlying [`ArrayRef`] + pub fn inner(&self) -> &ArrayRef { + &self.inner + } + + /// Returns the inner [`ArrayRef`], consuming self + pub fn into_inner(self) -> ArrayRef { + self.inner + } + + /// Returns a reference to the [`Metadata`] associated with this [`WkbArray`] + pub fn metadata(&self) -> &Metadata { + &self.metadata + } + + /// Return the WKB (Well-Known-Binary) data stored at the given row + pub fn value(&self, index: usize) -> &[u8] { + if let Some(bv) = self.inner.as_any().downcast_ref::() { + bv.value(index) + } else if let Some(b) = self.inner.as_any().downcast_ref::() { + b.value(index) + } else if let Some(lb) = self.inner.as_any().downcast_ref::() { + lb.value(index) + } else { + // Panic safety: Our try_new method ensures our inner array is one of the expected + // binary array types + unreachable!() + } + } + + /// Return a [`Field`] to represent this [`WkbArray`] in an [`arrow_schema::Schema`] with a particular name + pub fn field(&self, name: impl Into) -> Field { + Field::new( + name.into(), + self.data_type().clone(), + self.inner.is_nullable(), + ) + .with_extension_type(WkbType(self.metadata().clone())) + } + + /// Returns a new [`DataType`] representing this [`WkbArray`]'s inner type + pub fn data_type(&self) -> &DataType { + self.inner.data_type() + } + + pub fn slice(&self, offset: usize, length: usize) -> Self { + let inner = self.inner.slice(offset, length); + Self { + inner, + metadata: self.metadata().clone(), + } + } + + pub fn len(&self) -> usize { + self.inner.len() + } + + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + pub fn nulls(&self) -> Option<&NullBuffer> { + self.inner.nulls() + } + + /// Is the element at index null? + pub fn is_null(&self, index: usize) -> bool { + self.nulls().is_some_and(|n| n.is_null(index)) + } + + /// Is the element at index valid (not null)? + pub fn is_valid(&self, index: usize) -> bool { + !self.is_null(index) + } + + // TODO: implement iter() +} + +impl From for ArrayRef { + fn from(wkb_array: WkbArray) -> Self { + Arc::new(wkb_array.into_inner()) + } +} diff --git a/parquet/src/arrow/schema/extension.rs b/parquet/src/arrow/schema/extension.rs index 9adec8b6c122..75e058a96d14 100644 --- a/parquet/src/arrow/schema/extension.rs +++ b/parquet/src/arrow/schema/extension.rs @@ -55,6 +55,25 @@ pub(crate) fn try_add_extension_type( LogicalType::Json => { arrow_field.try_with_extension_type(arrow_schema::extension::Json::default())?; } + #[cfg(feature = "geospatial")] + LogicalType::Geometry { crs } => { + use parquet_geospatial::WkbTypeHint; + + let md = parquet_geospatial::WkbMetadata::new(crs, None) + .with_type_hint(WkbTypeHint::Geometry); + arrow_field + .try_with_extension_type(parquet_geospatial::WkbType::new_geometry(Some(md)))?; + } + #[cfg(feature = "geospatial")] + LogicalType::Geography { crs, algorithm } => { + use parquet_geospatial::WkbTypeHint; + + let algorithm = algorithm.map(|a| a.to_string()); + let md = parquet_geospatial::WkbMetadata::new(crs, algorithm) + .with_type_hint(WkbTypeHint::Geography); + arrow_field + .try_with_extension_type(parquet_geospatial::WkbType::new_geography(Some(md)))?; + } _ => {} }; Ok(arrow_field) @@ -75,6 +94,10 @@ pub(crate) fn has_extension_type(parquet_type: &Type) -> bool { LogicalType::Uuid => true, #[cfg(feature = "arrow_canonical_extension_types")] LogicalType::Json => true, + #[cfg(feature = "geospatial")] + LogicalType::Geometry { .. } => true, + #[cfg(feature = "geospatial")] + LogicalType::Geography { .. } => true, _ => false, } } @@ -133,3 +156,46 @@ pub(crate) fn logical_type_for_string(field: &Field) -> Option { pub(crate) fn logical_type_for_string(_field: &Field) -> Option { Some(LogicalType::String) } + +#[cfg(feature = "geospatial")] +pub(crate) fn logical_type_for_binary(field: &Field) -> Option { + use parquet_geospatial::WkbType; + use parquet_geospatial::WkbTypeHint; + + match field.extension_type_name() { + Some(n) if n == WkbType::NAME => match field.try_extension_type::() { + Ok(wkb_type) => match wkb_type.metadata().type_hint() { + Some(WkbTypeHint::Geometry) => Some(LogicalType::Geometry { + crs: wkb_type.metadata().crs.clone(), + }), + Some(WkbTypeHint::Geography) => Some(LogicalType::Geography { + crs: wkb_type.metadata().crs.clone(), + algorithm: wkb_type + .metadata() + .algorithm + .clone() + .and_then(|a| a.parse().ok()), + }), + _ => None, + }, + Err(_e) => None, + }, + _ => None, + } +} + +#[cfg(not(feature = "geospatial"))] +pub(crate) fn logical_type_for_binary(field: &Field) -> Option { + None +} + +#[cfg(feature = "geospatial")] +pub(crate) fn logical_type_for_binary_view(field: &Field) -> Option { + // TODO: make this better-er + logical_type_for_binary(field) +} + +#[cfg(not(feature = "geospatial"))] +pub(crate) fn logical_type_for_binary_view(field: &Field) -> Option { + None +} diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index 75603ac86693..0bab91657b5c 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -39,8 +39,9 @@ mod primitive; use super::PARQUET_FIELD_ID_META_KEY; use crate::arrow::ProjectionMask; use crate::arrow::schema::extension::{ - has_extension_type, logical_type_for_fixed_size_binary, logical_type_for_string, - logical_type_for_struct, try_add_extension_type, + has_extension_type, logical_type_for_binary, logical_type_for_binary_view, + logical_type_for_fixed_size_binary, logical_type_for_string, logical_type_for_struct, + try_add_extension_type, }; pub(crate) use complex::{ParquetField, ParquetFieldType}; @@ -604,6 +605,7 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result { Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY) .with_repetition(repetition) .with_id(id) + .with_logical_type(logical_type_for_binary(field)) .build() } DataType::FixedSizeBinary(length) => { @@ -617,6 +619,7 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result { DataType::BinaryView => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY) .with_repetition(repetition) .with_id(id) + .with_logical_type(logical_type_for_binary_view(field)) .build(), DataType::Decimal32(precision, scale) | DataType::Decimal64(precision, scale) diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs index eaa889bb99f1..6049bf24f5f3 100644 --- a/parquet/src/basic.rs +++ b/parquet/src/basic.rs @@ -1017,6 +1017,24 @@ impl fmt::Display for EdgeInterpolationAlgorithm { } } +impl FromStr for EdgeInterpolationAlgorithm { + type Err = ParquetError; + + fn from_str(s: &str) -> Result { + match s.to_ascii_uppercase().as_str() { + "SPHERICAL" => Ok(EdgeInterpolationAlgorithm::SPHERICAL), + "VINCENTY" => Ok(EdgeInterpolationAlgorithm::VINCENTY), + "THOMAS" => Ok(EdgeInterpolationAlgorithm::THOMAS), + "ANDOYER" => Ok(EdgeInterpolationAlgorithm::ANDOYER), + "KARNEY" => Ok(EdgeInterpolationAlgorithm::KARNEY), + unknown => Err(general_err!( + "Unknown edge interpolation algorithm: {}", + unknown + )), + } + } +} + impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for EdgeInterpolationAlgorithm { fn read_thrift(prot: &mut R) -> Result { let val = prot.read_i32()?; diff --git a/parquet/src/geospatial/mod.rs b/parquet/src/geospatial/mod.rs index 9d55fca89d46..408e367ee3b8 100644 --- a/parquet/src/geospatial/mod.rs +++ b/parquet/src/geospatial/mod.rs @@ -49,3 +49,198 @@ pub mod accumulator; pub mod bounding_box; pub mod statistics; + +#[cfg(test)] +mod tests { + use std::{path::PathBuf, sync::Arc}; + + use arrow::util::test_util::parquet_test_data; + use arrow_array::{ArrayRef, BinaryArray, RecordBatch}; + use arrow_schema::{Schema, extension::ExtensionType}; + use bytes::Bytes; + use parquet_geospatial::{WkbArray, WkbMetadata, WkbType}; + + use crate::{ + arrow::{ArrowWriter, arrow_reader::ArrowReaderBuilder}, + basic::EdgeInterpolationAlgorithm, + file::{ + metadata::{ParquetMetaData, ParquetMetaDataReader}, + reader::ChunkReader, + }, + }; + + /// Ensure a file with Geometry LogicalType, written by another writer in + /// parquet-testing, can be read as a GeometryArray + #[test] + fn read_geometry_logical_type() { + let batch = read_geospatial_test_case("crs-projjson.parquet"); + + let wkb_type = assert_geometry_metadata(&batch, "geometry"); + let geom_column = batch + .column_by_name("geometry") + .expect("expected geometry column"); + let wkb_array = WkbArray::try_new_geometry(geom_column, wkb_type.metadata().clone()) + .expect("expected geometry column to be a WkbArray"); + + // verify the value + assert_eq!(wkb_array.len(), 1); + assert!(wkb_array.is_valid(0)); + let wkb_value = wkb_array.value(0); + assert_eq!(wkb_value.len(), 3549); + } + + /// Ensure a file with Geography LogicalType, written by another writer in + /// parquet-testing, can be read as a GeometryArray + #[test] + fn read_geography_logical_type() { + let batch = read_geospatial_test_case("crs-geography.parquet"); + + let wkb_type = assert_geometry_metadata(&batch, "geography"); + let geom_column = batch + .column_by_name("geography") + .expect("expected geography column"); + let wkb_array = WkbArray::try_new_geography(geom_column, wkb_type.metadata().clone()) + .expect("expected geography column to be a WkbArray"); + + // verify the value + assert_eq!(wkb_array.len(), 1); + assert!(wkb_array.is_valid(0)); + let wkb_value = wkb_array.value(0); + assert_eq!(wkb_value.len(), 3549); + } + + /// Writes a wkb (geometry) array to a parquet file and ensures the parquet logical type + /// annotation is correct + #[test] + fn write_geometry_logical_type() { + let array = geometry_array(); + let batch = wkb_array_to_batch(array); + let buffer = write_to_buffer(&batch); + + // read the parquet file's metadata and verify the logical type + let metadata = read_metadata(&Bytes::from(buffer)); + let schema = metadata.file_metadata().schema_descr(); + let fields = schema.root_schema().get_fields(); + assert_eq!(fields.len(), 1); + let field = &fields[0]; + assert_eq!(field.name(), "data"); + // data should have been written with the Variant logical type + assert_eq!( + field.get_basic_info().logical_type(), + Some(crate::basic::LogicalType::Geometry { + crs: Some(String::from("test crs")) + }) + ); + } + + /// Writes a wkb (geography) array to a parquet file and ensures the parquet logical type + /// annotation is correct + #[test] + fn write_geography_logical_type() { + let array = geography_array(); + let batch = wkb_array_to_batch(array); + let buffer = write_to_buffer(&batch); + + // read the parquet file's metadata and verify the logical type + let metadata = read_metadata(&Bytes::from(buffer)); + let schema = metadata.file_metadata().schema_descr(); + let fields = schema.root_schema().get_fields(); + assert_eq!(fields.len(), 1); + let field = &fields[0]; + assert_eq!(field.name(), "data"); + // data should have been written with the Variant logical type + assert_eq!( + field.get_basic_info().logical_type(), + Some(crate::basic::LogicalType::Geography { + crs: Some(String::from("test crs")), + algorithm: Some(EdgeInterpolationAlgorithm::SPHERICAL), + }) + ); + } + + /// Return a WkbArray with 3 rows: + fn geometry_array() -> WkbArray { + let values: Vec<&[u8]> = vec![b"not", b"actually", b"wkb"]; + let inner = BinaryArray::from_vec(values); + let md = WkbMetadata::new(Some(String::from("test crs")), None); + + WkbArray::try_new_geometry(&inner, md.clone()).unwrap() + } + + /// Return a WkbArray with 3 rows: + fn geography_array() -> WkbArray { + let values: Vec<&[u8]> = vec![b"not", b"actually", b"wkb"]; + let inner = BinaryArray::from_vec(values); + let md = WkbMetadata::new(Some(String::from("test crs")), None); + + WkbArray::try_new_geography(&inner, md.clone()).unwrap() + } + + /// creates a RecordBatch with a single column "data" from a WkbArray, + fn wkb_array_to_batch(array: WkbArray) -> RecordBatch { + let field = array.field("data"); + let schema = Schema::new(vec![field]); + RecordBatch::try_new(Arc::new(schema), vec![ArrayRef::from(array)]).unwrap() + } + + /// writes a RecordBatch to memory buffer and returns the buffer + fn write_to_buffer(batch: &RecordBatch) -> Vec { + let mut buffer = vec![]; + let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap(); + writer.write(batch).unwrap(); + writer.close().unwrap(); + buffer + } + + /// Reads the Parquet metadata + fn read_metadata(input: &T) -> ParquetMetaData { + let mut reader = ParquetMetaDataReader::new(); + reader.try_parse(input).unwrap(); + reader.finish().unwrap() + } + + /// Verifies the geospatial metadata is present in the schema for the specified + /// field name. + fn assert_geometry_metadata(batch: &RecordBatch, field_name: &str) -> WkbType { + println!("{batch:?}"); + let schema = batch.schema(); + let field = schema + .field_with_name(field_name) + .expect("could not find expected field"); + + // explicitly check the metadata so it is clear in the tests what the + // names are + let metadata_value = field + .metadata() + .get("ARROW:extension:name") + .expect("metadata does not exist"); + + assert_eq!(metadata_value, "geoarrow.wkb"); + + // verify that `GeometryType` also correctly finds the metadata + field + .try_extension_type::() + .expect("WkbExtensionType should be readable") + } + + /// Reads a RecordBatch from a reader (e.g. Vec or File) + fn read_to_batch(reader: T) -> RecordBatch { + let reader = ArrowReaderBuilder::try_new(reader) + .unwrap() + .build() + .unwrap(); + let mut batches: Vec = reader.collect::, _>>().unwrap(); + assert_eq!(batches.len(), 1); + batches.swap_remove(0) + } + + /// Read the specified test case filename from parquet-testing + /// See parquet-testing/geospatial/README.md for more details + fn read_geospatial_test_case(name: &str) -> RecordBatch { + let case_file = PathBuf::from(parquet_test_data()) + .join("geospatial") + .join(name); + let case_file = std::fs::File::open(case_file).unwrap(); + read_to_batch(case_file) + } +}