diff --git a/Cargo.lock b/Cargo.lock index aa5abc6c..bc7a82b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4525,6 +4525,7 @@ dependencies = [ "polars-core", "pyo3", "query_processing", + "quick-xml 0.39.2", "rayon", "representation", "ryu", diff --git a/Cargo.toml b/Cargo.toml index ba5548bf..77717016 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,6 +71,7 @@ md-5 = "0.11.0" sha1 = "0.11.0" hex = "0.4.3" memchr = "2.8.0" +quick-xml = "0.39.2" tracing = { version = "0.1", features = [ "log" ] } tracing-subscriber = { version = "0.3.19", features = [ "env-filter" ] } diff --git a/lib/cimxml_import/src/lib.rs b/lib/cimxml_import/src/lib.rs index 51d3d142..a922eec9 100644 --- a/lib/cimxml_import/src/lib.rs +++ b/lib/cimxml_import/src/lib.rs @@ -1,11 +1,25 @@ -use oxrdf::{GraphName, NamedOrBlankNode, Quad, Term}; +use oxrdf::{GraphName, NamedNode, NamedOrBlankNode, Quad, Term}; use std::collections::HashMap; +use representation::BaseRDFNodeTypeRef; + type MapType = HashMap, Vec)>>; -pub fn remap_predicate_datatype( - _predicate_map: HashMap>, -) -> HashMap> { - unimplemented!("Contact Data Treehouse to try!") +pub struct Remapper { +} + +impl Remapper { + pub fn new() -> Self { + Remapper { + } + } + + pub fn remap_predicate_datatype<'a>( + &'a self, + _predicate: &NamedNode, + _data_type: &BaseRDFNodeTypeRef<'a>, + ) -> BaseRDFNodeTypeRef<'a> { + unimplemented!("Contact Data Treehouse to try!") + } } pub fn fix_cim_quad(_quad: Quad, _base_iri: Option<&String>) -> Quad { diff --git a/lib/in_memory/src/lib.rs b/lib/in_memory/src/lib.rs index 60113aa0..fb7c2b8b 100644 --- a/lib/in_memory/src/lib.rs +++ b/lib/in_memory/src/lib.rs @@ -1,5 +1,4 @@ #![feature(pattern)] -#![feature(str_as_str)] use arrow::array::{Array, RecordBatch, StringArray, UInt32Array}; use arrow::datatypes::{Field, Schema}; diff --git a/lib/maplib/src/model.rs b/lib/maplib/src/model.rs index dba0e711..27e7024c 100644 --- a/lib/maplib/src/model.rs +++ b/lib/maplib/src/model.rs @@ -225,6 +225,35 @@ impl 
Model { .map_err(MaplibError::TriplestoreError) } + #[instrument(skip_all)] + pub fn map_xml_path( + &mut self, + path: &Path, + graph: &NamedGraph, + transient: bool, + ) -> Result<(), MaplibError> { + self.add_json_prefixes(); + let mut u8s = fs::read(path).map_err(|x| TriplestoreError::XMLError(x.to_string()))?; + self.triplestore + .map_xml(&mut u8s, graph, transient) + .map_err(MaplibError::TriplestoreError) + } + + #[instrument(skip_all)] + pub fn map_xml_string( + &mut self, + mut p: String, + graph: &NamedGraph, + transient: bool, + ) -> Result<(), MaplibError> { + self.add_json_prefixes(); + //Safety: we are never reading this vec back to a string + let u8s = unsafe { p.as_mut_vec() }; + self.triplestore + .map_xml(u8s, graph, transient) + .map_err(MaplibError::TriplestoreError) + } + #[allow(clippy::too_many_arguments)] #[instrument(skip_all)] pub fn read_triples( diff --git a/lib/query_processing/src/expressions.rs b/lib/query_processing/src/expressions.rs index 3704a93d..00156fb0 100644 --- a/lib/query_processing/src/expressions.rs +++ b/lib/query_processing/src/expressions.rs @@ -10,7 +10,7 @@ use oxrdf::vocab::{rdf, xsd}; use oxrdf::{BlankNode, Literal, NamedNode, Variable}; use polars::frame::UniqueKeepStrategy; use polars::prelude::{ - as_struct, by_name, coalesce, col, lit, DataType, Expr, IntoLazy, JoinArgs, JoinType, + as_struct, by_name, coalesce, col, lit, DataType, Expr, JoinArgs, JoinType, LazyFrame, LiteralValue, Operator, Scalar, }; use representation::cats::{maybe_decode_expr, Cats, LockedCats}; diff --git a/lib/representation/src/cats/decode.rs b/lib/representation/src/cats/decode.rs index 92781d2f..ff1734cc 100644 --- a/lib/representation/src/cats/decode.rs +++ b/lib/representation/src/cats/decode.rs @@ -24,6 +24,7 @@ impl CatEncs { let mut uch: Vec<_> = ser.u32()?.iter().collect(); let mut need_global = true; //Just a trick for the unlocked local to live long enough + #[allow(unused_assignments)] let mut unlocked_local = None; let 
decoded_local_vec = if let Some(local_cats) = &local_cats { unlocked_local = Some(local_cats.read().unwrap()); diff --git a/lib/representation/src/errors.rs b/lib/representation/src/errors.rs index d0b7dbe2..817a5816 100644 --- a/lib/representation/src/errors.rs +++ b/lib/representation/src/errors.rs @@ -6,4 +6,6 @@ pub enum RepresentationError { DatatypeError(String), #[error("Invalid literal `{0}`")] InvalidLiteralError(String), + #[error("Literal parse error `{0}`")] + LiteralParseError(String), } diff --git a/lib/representation/src/lib.rs b/lib/representation/src/lib.rs index af51e57c..f439f632 100644 --- a/lib/representation/src/lib.rs +++ b/lib/representation/src/lib.rs @@ -20,6 +20,8 @@ pub mod rdf_to_polars; mod rdf_type; pub mod solution_mapping; pub mod subtypes; +pub mod series_builder; +pub use series_builder::SeriesBuilder; pub use base_rdf_type::*; pub use rdf_state::*; diff --git a/lib/representation/src/series_builder.rs b/lib/representation/src/series_builder.rs new file mode 100644 index 00000000..02f8cf51 --- /dev/null +++ b/lib/representation/src/series_builder.rs @@ -0,0 +1,370 @@ +use crate::errors::RepresentationError; +use crate::rdf_to_polars::{ + default_decimal_precision, default_decimal_scale, default_time_unit, default_time_zone, +}; +use crate::{ + BaseRDFNodeType, LANG_STRING_LANG_FIELD, LANG_STRING_VALUE_FIELD, OBJECT_COL_NAME, +}; +use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc}; +use memchr::memchr; +use oxrdf::vocab::{rdf, xsd}; +use oxrdf::{NamedOrBlankNode, Term}; +use polars::prelude::{as_struct, col, DataType, IntoLazy, NamedFrom, PlSmallStr, Series}; +use polars_core::frame::DataFrame; +use polars_core::prelude::{Int128Chunked, IntoSeries, NamedFromOwned, StringChunkedBuilder}; +use rust_decimal::Decimal; +use std::collections::HashMap; +use std::str::FromStr; + +pub type SubjectObjectBuilders = (SeriesBuilder, SeriesBuilder); +pub type ByObjectType = HashMap; +pub type BySubjectType = HashMap; +pub type PredMap = 
HashMap; + +pub enum SeriesBuilder { + String(StringChunkedBuilder, usize), + Bool(Vec), + U8(Vec), + U16(Vec), + U32(Vec), + U64(Vec), + I8(Vec), + I16(Vec), + I32(Vec), + I64(Vec), + F32(Vec), + F64(Vec), + Date(Vec), + Datetime(Vec), + Decimal(Vec), + LangString { + values: StringChunkedBuilder, + langs: StringChunkedBuilder, + len: usize, + }, +} + +impl SeriesBuilder { + pub fn new(dt: &BaseRDFNodeType) -> Self { + Self::with_capacity(dt, 0) + } + + pub fn with_capacity(dt: &BaseRDFNodeType, cap: usize) -> Self { + match dt { + BaseRDFNodeType::IRI | BaseRDFNodeType::BlankNode => { + SeriesBuilder::String(StringChunkedBuilder::new("s".into(), cap), 0) + } + BaseRDFNodeType::Literal(l) => match l.as_ref() { + xsd::BOOLEAN => SeriesBuilder::Bool(Vec::with_capacity(cap)), + xsd::UNSIGNED_BYTE => SeriesBuilder::U8(Vec::with_capacity(cap)), + xsd::UNSIGNED_SHORT => SeriesBuilder::U16(Vec::with_capacity(cap)), + xsd::UNSIGNED_INT => SeriesBuilder::U32(Vec::with_capacity(cap)), + xsd::UNSIGNED_LONG => SeriesBuilder::U64(Vec::with_capacity(cap)), + xsd::BYTE => SeriesBuilder::I8(Vec::with_capacity(cap)), + xsd::SHORT => SeriesBuilder::I16(Vec::with_capacity(cap)), + xsd::INT => SeriesBuilder::I32(Vec::with_capacity(cap)), + xsd::INTEGER | xsd::LONG => SeriesBuilder::I64(Vec::with_capacity(cap)), + xsd::FLOAT => SeriesBuilder::F32(Vec::with_capacity(cap)), + xsd::DOUBLE => SeriesBuilder::F64(Vec::with_capacity(cap)), + xsd::DATE => SeriesBuilder::Date(Vec::with_capacity(cap)), + xsd::DATE_TIME => SeriesBuilder::Datetime(Vec::with_capacity(cap)), + xsd::DATE_TIME_STAMP => SeriesBuilder::Datetime(Vec::with_capacity(cap)), + xsd::DECIMAL => SeriesBuilder::Decimal(Vec::with_capacity(cap)), + rdf::LANG_STRING => SeriesBuilder::LangString { + values: StringChunkedBuilder::new(LANG_STRING_VALUE_FIELD.into(), cap), + langs: StringChunkedBuilder::new(LANG_STRING_LANG_FIELD.into(), cap), + len: 0, + }, + _ => SeriesBuilder::String(StringChunkedBuilder::new("s".into(), cap), 0), 
+ }, + BaseRDFNodeType::None => { + unreachable!("Should never happen") + } + } + } + + pub fn len(&self) -> usize { + match self { + SeriesBuilder::String(_, l) => *l, + SeriesBuilder::Bool(v) => v.len(), + SeriesBuilder::U8(v) => v.len(), + SeriesBuilder::U16(v) => v.len(), + SeriesBuilder::U32(v) => v.len(), + SeriesBuilder::U64(v) => v.len(), + SeriesBuilder::I8(v) => v.len(), + SeriesBuilder::I16(v) => v.len(), + SeriesBuilder::I32(v) => v.len(), + SeriesBuilder::I64(v) => v.len(), + SeriesBuilder::F32(v) => v.len(), + SeriesBuilder::F64(v) => v.len(), + SeriesBuilder::Date(v) => v.len(), + SeriesBuilder::Datetime(v) => v.len(), + SeriesBuilder::Decimal(v) => v.len(), + SeriesBuilder::LangString { len, .. } => *len, + } + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub fn push_str(&mut self, s: &str) { + if let SeriesBuilder::String(v, l) = self { + *l = *l + 1; + v.append_value(s); + } else { + panic!("Should never be used for non string builder") + } + } + + pub fn push_named_or_blank(&mut self, named_or_blank_node: &NamedOrBlankNode) { + match named_or_blank_node { + NamedOrBlankNode::NamedNode(v) => { + self.push_str(v.as_str()); + } + NamedOrBlankNode::BlankNode(v) => { + self.push_str(v.as_str()) + } + } + } + + pub fn parse_term(&mut self, term: &Term) -> Result<(), RepresentationError> { + match term { + Term::NamedNode(nn) => { + self.push_str(nn.as_str()); + Ok(()) + } + Term::BlankNode(bl) => { + self.push_str(bl.as_str()); + Ok(()) + } + Term::Literal(l) => { + self.parse_literal(l.value(), l.language()) + } + } + } + + pub fn parse_literal( + &mut self, + lex: &str, + lang: Option<&str>, + ) -> Result<(), RepresentationError> { + match self { + SeriesBuilder::String(v, l) => { + *l = *l + 1; + v.append_value(lex); + } + SeriesBuilder::Bool(v) => { + let b = bool::from_str(&lex) + .map_err(|x| RepresentationError::LiteralParseError(x.to_string()))?; + v.push(b); + } + SeriesBuilder::U8(v) => { + let u = u8::from_str(&lex) + 
.map_err(|x| RepresentationError::LiteralParseError(x.to_string()))?; + v.push(u); + } + SeriesBuilder::U16(v) => { + let u = u16::from_str(&lex) + .map_err(|x| RepresentationError::LiteralParseError(x.to_string()))?; + v.push(u); + } + SeriesBuilder::U32(v) => { + let u = u32::from_str(&lex) + .map_err(|x| RepresentationError::LiteralParseError(x.to_string()))?; + v.push(u); + } + SeriesBuilder::U64(v) => { + let u = u64::from_str(&lex) + .map_err(|x| RepresentationError::LiteralParseError(x.to_string()))?; + v.push(u); + } + SeriesBuilder::I8(v) => { + let i = i8::from_str(&lex) + .map_err(|x| RepresentationError::LiteralParseError(x.to_string()))?; + v.push(i); + } + SeriesBuilder::I16(v) => { + let i = i16::from_str(&lex) + .map_err(|x| RepresentationError::LiteralParseError(x.to_string()))?; + v.push(i); + } + SeriesBuilder::I32(v) => { + let i = i32::from_str(&lex) + .map_err(|x| RepresentationError::LiteralParseError(x.to_string()))?; + v.push(i); + } + SeriesBuilder::I64(v) => { + let i = i64::from_str(&lex) + .map_err(|x| RepresentationError::LiteralParseError(x.to_string()))?; + v.push(i); + } + SeriesBuilder::F32(v) => { + let f = f32::from_str(&lex) + .map_err(|x| RepresentationError::LiteralParseError(x.to_string()))?; + v.push(f); + } + SeriesBuilder::F64(v) => { + let f = f64::from_str(&lex) + .map_err(|x| RepresentationError::LiteralParseError(x.to_string()))?; + v.push(f); + } + SeriesBuilder::Date(v) => { + let d = parse_xsd_date(&lex)?; + v.push(d); + } + SeriesBuilder::Datetime(v) => { + let i = parse_xsd_datetime_micros(&lex)?; + v.push(i); + } + SeriesBuilder::Decimal(v) => { + let i = parse_xsd_decimal(&lex)?; + v.push(i); + } + SeriesBuilder::LangString { values, langs, len } => { + if let Some(lang) = lang { + values.append_value(lex.to_string()); + langs.append_value(lang.to_string()); + *len = *len + 1; + } else { + return Err(RepresentationError::LiteralParseError( + "Lang string missing language tag".to_string(), + )); + } + } + } + 
Ok(()) + } + + pub fn into_series(self, name: &str) -> Series { + match self { + SeriesBuilder::String(v, ..) => { + let s = v.finish(); + let mut s = s.into_series(); + s.rename(name.into()); + s + } + SeriesBuilder::Bool(v) => Series::new(name.into(), v), + SeriesBuilder::U8(v) => Series::from_vec(name.into(), v), + SeriesBuilder::U16(v) => Series::from_vec(name.into(), v), + SeriesBuilder::U32(v) => Series::from_vec(name.into(), v), + SeriesBuilder::U64(v) => Series::from_vec(name.into(), v), + SeriesBuilder::I8(v) => Series::from_vec(name.into(), v), + SeriesBuilder::I16(v) => Series::from_vec(name.into(), v), + SeriesBuilder::I32(v) => Series::from_vec(name.into(), v), + SeriesBuilder::I64(v) => Series::from_vec(name.into(), v), + SeriesBuilder::F32(v) => Series::from_vec(name.into(), v), + SeriesBuilder::F64(v) => Series::from_vec(name.into(), v), + SeriesBuilder::Date(v) => Series::from_vec(name.into(), v) + .cast(&DataType::Date) + .unwrap(), + SeriesBuilder::Datetime(v) => Series::from_vec(name.into(), v) + .cast(&DataType::Datetime( + default_time_unit(), + Some(default_time_zone()), + )) + .unwrap(), + SeriesBuilder::Decimal(v) => { + let i128ch = Int128Chunked::from_vec(name.into(), v); + let dec = i128ch + .into_decimal(default_decimal_precision(), default_decimal_scale()) + .unwrap(); + let mut ser = dec.into_series(); + ser.rename(PlSmallStr::from_str(name)); + ser + } + SeriesBuilder::LangString { values, langs, len } => { + let mut val_ser = values.finish().into_series(); + val_ser.rename(LANG_STRING_VALUE_FIELD.into()); + let mut lang_ser = langs.finish().into_series(); + lang_ser.rename(LANG_STRING_LANG_FIELD.into()); + let mut df = DataFrame::new(len, vec![val_ser.into(), lang_ser.into()]) + .unwrap() + .lazy() + .with_column( + as_struct(vec![ + col(LANG_STRING_VALUE_FIELD), + col(LANG_STRING_LANG_FIELD), + ]) + .alias(OBJECT_COL_NAME), + ) + .collect() + .unwrap(); + let mut ser = df + .drop_in_place(OBJECT_COL_NAME) + .unwrap() + 
.take_materialized_series(); + ser.rename(PlSmallStr::from_str(name)); + ser + } + } + } +} + +fn parse_xsd_date(value: &str) -> Result { + let use_value = if let Some(spot) = memchr(b'+', value.as_bytes()) { + &value[0..spot] + } else { + value + }; + match NaiveDate::parse_from_str(use_value, "%Y-%m-%d") { + Ok(parsed) => { + let dur = parsed.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()); + Ok(dur.num_days() as i32) + } + Err(x) => Err(RepresentationError::LiteralParseError(x.to_string())), + } +} + +fn parse_xsd_datetime_micros(value: &str) -> Result { + if let Ok(dt) = value.parse::>() { + Ok(dt.naive_utc().and_utc().timestamp_micros()) + } else if let Ok(dt) = value.parse::() { + Ok(dt.and_utc().timestamp_micros()) + } else { + Err(RepresentationError::LiteralParseError(format!( + "Could not parse datetime: {}", + value + ))) + } +} + +fn parse_xsd_decimal(value: &str) -> Result { + match Decimal::from_str(value) { + Ok(mut d) => { + d.rescale(default_decimal_scale() as u32); + Ok(d.mantissa()) + } + Err(x) => Err(RepresentationError::LiteralParseError(x.to_string())), + } +} + +pub fn ensure_pair<'a>( + pred_map: &'a mut PredMap, + predicate: &str, + subject_dt: &BaseRDFNodeType, + object_dt: &BaseRDFNodeType, +) -> &'a mut SubjectObjectBuilders { + if !pred_map.contains_key(predicate) { + pred_map.insert(predicate.to_string(), BySubjectType::new()); + }; + let bst = pred_map.get_mut(predicate).unwrap(); + + if !bst.contains_key(subject_dt) { + bst.insert(subject_dt.clone(), ByObjectType::new()); + } + + let bot = bst.get_mut(subject_dt).unwrap(); + + if !bot.contains_key(object_dt) { + bot.insert( + object_dt.clone(), + ( + SeriesBuilder::new(subject_dt), + SeriesBuilder::new(object_dt), + ), + ); + } + let pair = bot.get_mut(object_dt).unwrap(); + pair +} diff --git a/lib/triplestore/Cargo.toml b/lib/triplestore/Cargo.toml index bf97249d..2147c9b8 100644 --- a/lib/triplestore/Cargo.toml +++ b/lib/triplestore/Cargo.toml @@ -36,6 +36,7 
@@ aho-corasick.workspace = true simd-json.workspace = true serde_json.workspace = true ordered-float.workspace = true +quick-xml.workspace = true pyo3 = { workspace = true, optional = true } diff --git a/lib/triplestore/src/errors.rs b/lib/triplestore/src/errors.rs index 620c6e7d..5c92775f 100644 --- a/lib/triplestore/src/errors.rs +++ b/lib/triplestore/src/errors.rs @@ -50,6 +50,8 @@ pub enum TriplestoreError { BadListError(String), #[error("Error reading JSON file: {0}")] ReadJSONFileError(String), + #[error("XML error: {0}")] + XMLError(String), #[error("Prefix IRI is invalid: {0}")] InvalidPrefixIRI(String), #[error("Please add the document string corresponding to the url to known_contexts: {0}")] diff --git a/lib/triplestore/src/lib.rs b/lib/triplestore/src/lib.rs index c93311bd..95cef833 100644 --- a/lib/triplestore/src/lib.rs +++ b/lib/triplestore/src/lib.rs @@ -5,6 +5,7 @@ pub mod cats; mod dblf; pub mod errors; mod map_json; +mod map_xml; pub mod native_parquet_write; pub mod query_solutions; pub mod rdfs_inferencing; diff --git a/lib/triplestore/src/map_xml.rs b/lib/triplestore/src/map_xml.rs new file mode 100644 index 00000000..3bc76918 --- /dev/null +++ b/lib/triplestore/src/map_xml.rs @@ -0,0 +1,373 @@ +use crate::errors::TriplestoreError; +use crate::{TriplesToAdd, Triplestore}; +use oxrdf::vocab::{rdf, xsd}; +use oxrdf::NamedNode; +use polars::prelude::{DataFrame, PlSmallStr}; +use polars_core::prelude::IntoColumn; +use quick_xml::escape::unescape; +use quick_xml::events::Event; +use quick_xml::Reader; +use representation::constants::RDF_PREFIX_IRI; +use representation::dataset::NamedGraph; +use representation::{BaseRDFNodeType, OBJECT_COL_NAME, SUBJECT_COL_NAME}; +use std::collections::HashMap; +use std::io::Cursor; +use std::sync::Arc; +use representation::series_builder::{ensure_pair, PredMap}; + +const XML_ROOT: &str = "http://sparql.xyz/facade-x/ns/root"; +const DEFAULT_XML_DATA_PREFIX: &str = "http://sparql.xyz/facade-x/data/"; + +struct Frame 
{ + subject: String, + next_child: usize, + data_type: Option>, + previous_base_prefix: Option, + introduced_prefixed_namespaces: Vec<(String, Option)>, +} + +impl Triplestore { + pub fn map_xml( + &mut self, + u8s: &mut Vec, + named_graph: &NamedGraph, + transient: bool, + ) -> Result<(), TriplestoreError> { + let mut reader = Reader::from_reader(Cursor::new(u8s.as_slice())); + reader.config_mut().trim_text(true); + + let mut pred_map: PredMap = HashMap::new(); + let mut datatypes_map = HashMap::new(); + let string_type = Arc::new(BaseRDFNodeType::Literal(xsd::STRING.into_owned())); + datatypes_map.insert(xsd::STRING.as_str().to_string(), string_type.clone()); + let mut prefix_map: HashMap = HashMap::new(); + let mut base_prefix = NamedNode::new_unchecked(DEFAULT_XML_DATA_PREFIX); + let mut stack: Vec = Vec::new(); + let mut buf = Vec::new(); + loop { + match reader + .read_event_into(&mut buf) + .map_err(|e| TriplestoreError::XMLError(e.to_string()))? + { + Event::Eof => break, + Event::Start(e) => { + let (subj, text_dt, + new_base_prefix, + introduced_prefixed_namespaces) = open_element( + e.name().as_ref(), + e.attributes(), + &mut stack, + &mut pred_map, + &mut prefix_map, + &mut datatypes_map, + &base_prefix + )?; + let previous_base_prefix = if let Some(new_base_prefix) = new_base_prefix { + let previous_base_prefix = base_prefix; + base_prefix = new_base_prefix; + Some(previous_base_prefix) + } else { + None + }; + stack.push(Frame { + subject: subj, + next_child: 1, + data_type: text_dt, + previous_base_prefix, + introduced_prefixed_namespaces, + }); + } + Event::Empty(e) => { + let _ = open_element( + e.name().as_ref(), + e.attributes(), + &mut stack, + &mut pred_map, + &mut prefix_map, + &mut datatypes_map, + &base_prefix + )?; + } + Event::End(_) => { + if let Some(Frame { + previous_base_prefix, + introduced_prefixed_namespaces, + .. 
+ }) = stack.pop() + { + for (pre, overridden) in introduced_prefixed_namespaces { + if let Some(overridden) = overridden { + prefix_map.insert(pre, overridden); + } else { + prefix_map.remove(&pre); + } + } + if let Some(old) = previous_base_prefix { + base_prefix = old; + } + } + } + Event::Text(t) => { + let raw = t + .decode() + .map_err(|e| TriplestoreError::XMLError(e.to_string()))?; + let s = unescape(&raw) + .map_err(|e| TriplestoreError::XMLError(e.to_string()))? + .into_owned(); + if !s.is_empty() { + push_text_child(&s, &mut stack, &mut pred_map, string_type.as_ref())?; + } + } + Event::CData(t) => { + let s = std::str::from_utf8(t.as_ref()) + .map_err(|e| TriplestoreError::XMLError(e.to_string()))? + .to_string(); + if !s.is_empty() { + push_text_child(&s, &mut stack, &mut pred_map, string_type.as_ref())?; + } + } + _ => {} + } + buf.clear(); + } + + flush_pred_map(self, pred_map, named_graph, transient) + } +} + +fn open_element( + name: &[u8], + attrs: quick_xml::events::attributes::Attributes, + stack: &mut Vec, + pred_map: &mut PredMap, + prefix_map: &mut HashMap, + datatypes_map: &mut HashMap>, + base_prefix: &NamedNode, +) -> Result< + ( + String, + Option>, + Option, + Vec<(String, Option)>, + ), + TriplestoreError, +> { + let name = std::str::from_utf8(name).map_err(|e| TriplestoreError::XMLError(e.to_string()))?; + let subject = new_iri_subject(); + if let Some(parent) = stack.last_mut() { + let parent_subject = parent.subject.clone(); + let n = parent.next_child; + parent.next_child += 1; + let rdfi = rdf_n_property(n); + push_iri_object(pred_map, rdfi.as_str(), &parent_subject, &subject); + } else { + push_iri_object(pred_map, rdf::TYPE.as_str(), &subject, XML_ROOT); + } + let mut new_base_prefix = None; + let mut datatype = None; + let mut introduced_prefixed_namespaces = Vec::new(); + let mut attrs_vec = Vec::new(); + for attr in attrs { + let attr = attr.map_err(|e| TriplestoreError::XMLError(e.to_string()))?; + attrs_vec.push(attr); + } + 
let mut non_xmlns_attrs = Vec::new(); + for attr in &attrs_vec { + let key = attr.key.as_ref(); + let key = + std::str::from_utf8(key).map_err(|x| TriplestoreError::XMLError(x.to_string()))?; + let raw = std::str::from_utf8(&attr.value) + .map_err(|e| TriplestoreError::XMLError(e.to_string()))?; + let value = unescape(raw) + .map_err(|e| TriplestoreError::XMLError(e.to_string()))? + .into_owned(); + if key == "xmlns" { + let nn = NamedNode::new(value.clone()).map_err(|e| { + TriplestoreError::XMLError(format!("Error parsing {}: {}", value, e.to_string())) + })?; + new_base_prefix = Some(nn); + } else if key.starts_with("xmlns:") { + let pre = key.strip_prefix("xmlns:").unwrap(); + let nn = NamedNode::new(value.clone()).map_err(|e| { + TriplestoreError::XMLError(format!("Error parsing {}: {}", value, e.to_string())) + })?; + + let previous = prefix_map.insert(pre.to_string(), nn); + introduced_prefixed_namespaces.push((pre.to_string(), previous)); + } else { + non_xmlns_attrs.push(attr); + } + } + let use_base_prefix= new_base_prefix.as_ref().unwrap_or(base_prefix); + + for attr in non_xmlns_attrs { + let key = attr.key.as_ref(); + let key = + std::str::from_utf8(key).map_err(|x| TriplestoreError::XMLError(x.to_string()))?; + let raw = std::str::from_utf8(&attr.value) + .map_err(|e| TriplestoreError::XMLError(e.to_string()))?; + let value = unescape(raw) + .map_err(|e| TriplestoreError::XMLError(e.to_string()))? 
+ .into_owned(); + if key == "type" { + let pred = qname_to_iri(&value, prefix_map, use_base_prefix)?; + + if let Some(dt) = datatypes_map.get(value.as_str()) { + datatype = Some(dt.clone()); + } else { + let dt = Arc::new(BaseRDFNodeType::Literal(pred.clone())); + datatypes_map.insert(value.as_str().to_string(), dt.clone()); + datatype = Some(dt); + } + } else { + let pred = qname_to_iri(key, prefix_map, use_base_prefix)?; + + push_typed_text( + pred_map, + pred.as_str(), + &subject, + &value, + datatypes_map.get(xsd::STRING.as_str()).unwrap(), + )?; + } + } + let type_iri = qname_to_iri( + name, + prefix_map, + use_base_prefix, + )?; + push_iri_object(pred_map, rdf::TYPE.as_str(), &subject, type_iri.as_str()); + + Ok(( + subject, + datatype, + new_base_prefix, + introduced_prefixed_namespaces, + )) +} + +fn push_text_child( + text: &str, + stack: &mut Vec, + pred_map: &mut PredMap, + string_type: &BaseRDFNodeType, +) -> Result<(), TriplestoreError> { + if text.is_empty() { + return Ok(()); + } + let Some(frame) = stack.last_mut() else { + return Ok(()); + }; + let n = frame.next_child; + frame.next_child += 1; + let subject = frame.subject.clone(); + let rdfi = rdf_n_property(n); + let t = frame + .data_type + .as_ref() + .map(|x| x.as_ref()) + .unwrap_or(string_type); + push_typed_text(pred_map, rdfi.as_str(), &subject, text, t)?; + Ok(()) +} + +fn push_typed_text( + pred_map: &mut PredMap, + predicate: &str, + subject: &str, + text: &str, + data_type: &BaseRDFNodeType, +) -> Result<(), TriplestoreError> { + let pair = ensure_pair(pred_map, predicate, &BaseRDFNodeType::IRI, &data_type); + match pair.1.parse_literal(text.trim(), None) { + Ok(()) => { + pair.0.push_str(subject); + Ok(()) + } + Err(e) => Err(TriplestoreError::XMLError(e.to_string())), + } +} + + +fn flush_pred_map( + triples: &mut Triplestore, + pred_map: PredMap, + named_graph: &NamedGraph, + transient: bool, +) -> Result<(), TriplestoreError> { + let mut tta_vec = Vec::new(); + for (pred_iri, 
sub_map) in pred_map { + let predicate = NamedNode::new_unchecked(pred_iri); + for (subject_type, obj_map) in sub_map { + for (object_type, (sb, ob)) in obj_map { + if sb.is_empty() { + continue; + } + let mut subject_ser = sb.into_series(SUBJECT_COL_NAME); + subject_ser.rename(PlSmallStr::from_str(SUBJECT_COL_NAME)); + let mut object_ser = ob.into_series(OBJECT_COL_NAME); + object_ser.rename(PlSmallStr::from_str(OBJECT_COL_NAME)); + let len = subject_ser.len(); + let df = DataFrame::new( + len, + vec![subject_ser.into_column(), object_ser.into_column()], + ) + .unwrap(); + let subject_state = subject_type.default_input_cat_state(); + let object_state = object_type.default_input_cat_state(); + tta_vec.push(TriplesToAdd { + df, + subject_type: subject_type.clone(), + object_type: object_type.clone(), + predicate: Some(predicate.clone()), + graph: named_graph.clone(), + subject_cat_state: subject_state, + object_cat_state: object_state, + predicate_cat_state: None, + }); + } + } + } + triples.add_triples_vec(tta_vec, transient)?; + Ok(()) +} + + +fn push_iri_object(pred_map: &mut PredMap, predicate: &str, subject: &str, object: &str) { + let pair = ensure_pair( + pred_map, + predicate, + &BaseRDFNodeType::IRI, + &BaseRDFNodeType::IRI, + ); + pair.0.push_str(subject); + pair.1.push_str(object); +} + +fn new_iri_subject() -> String { + format!("urn:maplib:{}", uuid::Uuid::new_v4()) +} + +fn rdf_n_property(n: usize) -> NamedNode { + NamedNode::new_unchecked(format!("{}_{}", RDF_PREFIX_IRI, n)) +} + +fn qname_to_iri( + name: &str, + prefix_map: &mut HashMap, + base_prefix: &NamedNode, +) -> Result { + let nn = if let Some((pre, suf)) = name.split_once(':') { + if let Some(pre_nn) = prefix_map.get(pre) { + format!("{}{}", pre_nn.as_str(), suf) + } else { + return Err(TriplestoreError::XMLError(format!( + "Could not find prefix {pre}" + ))); + } + } else { + format!("{}{name}", base_prefix.as_str()) + }; + NamedNode::new(&nn) + .map_err(|e| 
TriplestoreError::XMLError(format!("Error parsing IRI {}: {}", nn, e))) +} diff --git a/lib/triplestore/src/sparql/lazy_graph_patterns.rs b/lib/triplestore/src/sparql/lazy_graph_patterns.rs index 406a7773..f1936d8f 100644 --- a/lib/triplestore/src/sparql/lazy_graph_patterns.rs +++ b/lib/triplestore/src/sparql/lazy_graph_patterns.rs @@ -20,14 +20,13 @@ use crate::sparql::errors::SparqlError; use tracing::{instrument, trace, warn}; use crate::sparql::lazy_graph_patterns::triples_ordering::order_triple_patterns; -use polars::prelude::{lit, IntoLazy, JoinType}; +use polars::prelude::{IntoLazy, JoinType}; use polars_core::frame::DataFrame; use query_processing::graph_patterns::join; use query_processing::pushdowns::Pushdowns; use representation::dataset::QueryGraph; use representation::query_context::{Context, PathEntry}; use representation::solution_mapping::{EagerSolutionMappings, SolutionMappings}; -use representation::BaseRDFNodeType; use spargebra::algebra::GraphPattern; use std::collections::HashMap; diff --git a/lib/triplestore/src/sparql/lazy_graph_patterns/join.rs b/lib/triplestore/src/sparql/lazy_graph_patterns/join.rs index 85ba07c3..9f705ee2 100644 --- a/lib/triplestore/src/sparql/lazy_graph_patterns/join.rs +++ b/lib/triplestore/src/sparql/lazy_graph_patterns/join.rs @@ -275,45 +275,6 @@ pub fn order_graph_patterns<'a>( ordering } -// Metaphor here is that quantity is cost to include, so less is better. 
-fn strictly_before( - gp1: &usize, - gp2: &usize, - visited: &HashSet, - candidate_gps: &HashMap, - candidate_bad_properties: &HashMap, -) -> Ordering { - let t1_connected = is_connected(candidate_gps.get(gp1).unwrap(), visited); - let t2_connected = is_connected(candidate_gps.get(gp2).unwrap(), visited); - if t1_connected && !t2_connected { - return Ordering::Less; - } - if !t1_connected && t2_connected { - return Ordering::Greater; - } - let bp1 = candidate_bad_properties.get(gp1).unwrap(); - let bp2 = candidate_bad_properties.get(gp2).unwrap(); - bp1.partial_cmp(bp2).unwrap_or(Ordering::Equal) -} - -fn is_connected(gp: &GraphPattern, visited: &HashSet) -> bool { - let gp_vars = variables(gp); - for v in &gp_vars { - if visited.contains(v.as_str()) { - return true; - } - } - false -} - -fn variables(gp: &GraphPattern) -> HashSet { - let mut vs = HashSet::new(); - gp.on_in_scope_variable(|x| { - vs.insert(x.as_str().to_string()); - }); - vs -} - fn bad_properties( gp: &GraphPattern, incoming_cols: Option<&HashSet>, diff --git a/lib/triplestore/src/triples_read.rs b/lib/triplestore/src/triples_read.rs index 70944efb..4d1de986 100644 --- a/lib/triplestore/src/triples_read.rs +++ b/lib/triplestore/src/triples_read.rs @@ -3,27 +3,24 @@ use crate::errors::TriplestoreError; use crate::TriplesToAdd; use std::cmp; -use cimxml_import::fix_cim_quad; +use cimxml_import::{fix_cim_quad, Remapper}; use memmap2::MmapOptions; -use oxrdf::{BlankNode, GraphName, NamedNode, NamedOrBlankNode, Quad, Term}; +use oxrdf::{BlankNode, GraphName, NamedNode, NamedOrBlankNode, Quad, Term, Triple}; use oxrdfio::{ JsonLdProfileSet, LoadedDocument, RdfFormat, RdfParser, RdfSyntaxError, SliceQuadParser, }; use oxttl::ntriples::SliceNTriplesParser; use oxttl::turtle::SliceTurtleParser; use oxttl::{NTriplesParser, TurtleParser}; -use polars::prelude::{as_struct, col, DataFrame, IntoLazy, LiteralValue, PlSmallStr, Series}; -use polars_core::prelude::{IntoColumn, Scalar}; +use 
polars::prelude::{concat, DataFrame, IntoLazy, UnionArgs}; +use polars_core::prelude::{IntoColumn}; use rayon::iter::ParallelIterator; -use rayon::prelude::{IntoParallelIterator, IntoParallelRefIterator}; +use rayon::prelude::{IntoParallelIterator}; use representation::dataset::NamedGraph; -use representation::rdf_to_polars::{ - polars_literal_values_to_series, rdf_literal_to_polars_literal_value, - rdf_owned_blank_node_to_polars_literal_value, rdf_owned_named_node_to_polars_literal_value, -}; +use representation::series_builder::{BySubjectType, PredMap}; use representation::{ - get_subject_datatype_ref, get_term_datatype_ref, BaseRDFNodeType, LANG_STRING_LANG_FIELD, - LANG_STRING_VALUE_FIELD, + get_subject_datatype_ref, get_term_datatype_ref, BaseRDFNodeType, BaseRDFNodeTypeRef, + SeriesBuilder }; use representation::{OBJECT_COL_NAME, SUBJECT_COL_NAME}; use std::collections::HashMap; @@ -34,8 +31,6 @@ use tracing::{debug, instrument}; const UTF8_BOM: [u8; 3] = [0xEF, 0xBB, 0xBF]; -type MapType = HashMap, Vec)>>; - #[derive(Eq, PartialEq, Debug, Clone)] pub enum ExtendedRdfFormat { Normal(RdfFormat), @@ -155,11 +150,16 @@ impl Triplestore { let parallel = if let Some(parallel) = parallel { parallel } else { - rdf_format == ExtendedRdfFormat::Normal(RdfFormat::NTriples) + matches!( + rdf_format, + ExtendedRdfFormat::Normal(RdfFormat::NTriples) + ) }; - let mut readers = if (rdf_format == ExtendedRdfFormat::Normal(RdfFormat::Turtle) - || rdf_format == ExtendedRdfFormat::Normal(RdfFormat::NTriples)) - && parallel + let mut readers = if matches!( + rdf_format, + ExtendedRdfFormat::Normal(RdfFormat::NTriples) + | ExtendedRdfFormat::Normal(RdfFormat::Turtle) + ) && parallel { let threads = if let Ok(threads) = std::thread::available_parallelism() { threads.get() @@ -241,7 +241,7 @@ impl Triplestore { let reader_batch_size = triples_batch_size / cmp::max(1, readers.len()); let readers_predicate_maps: Vec<_> = readers .into_par_iter() - .map(|r| create_predicate_map(r, 
&parser_call, reader_batch_size)) + .map(|r| create_predicate_map(r, &rdf_format, &parser_call, reader_batch_size)) .collect(); let mut updated_readers = vec![]; @@ -254,120 +254,122 @@ impl Triplestore { predicate_maps.push(predicate_map); } readers = updated_readers; - // We are remapping the datatypes here - if rdf_format == ExtendedRdfFormat::CIMXML { - predicate_maps = predicate_maps - .into_iter() - .map(|x| cimxml_import::remap_predicate_datatype(x)) - .collect(); - } - let mut par_graph_predicate_map: HashMap<(GraphName, String), Vec> = - HashMap::new(); - for m in predicate_maps { - for (graph_name, map) in m { - for (predicate, map) in map { - let key = (graph_name.clone(), predicate); - if let Some(v) = par_graph_predicate_map.get_mut(&key) { - v.push(map); - } else { - par_graph_predicate_map.insert(key, vec![map]); - } - } - } - } + let mut all_builders: Vec<( + NamedGraph, + NamedNode, + BaseRDFNodeType, + BaseRDFNodeType, + SeriesBuilder, + SeriesBuilder, + )> = Vec::new(); - let predicate_map: HashMap<(GraphName, String), MapType> = par_graph_predicate_map - .into_par_iter() - .map(|((graph_name, predicate), maps)| { - let mut subject_map: MapType = HashMap::new(); - for new_subject_map in maps { - for (subject_dt, new_object_map) in new_subject_map { - if let Some(object_map) = subject_map.get_mut(&subject_dt) { - for (object_dt, (new_subjects, new_objects)) in new_object_map { - if let Some((subjects, objects)) = - object_map.get_mut(&object_dt) - { - subjects.extend(new_subjects); - objects.extend(new_objects); - } else { - object_map.insert(object_dt, (new_subjects, new_objects)); - } - } - } else { - subject_map.insert(subject_dt, new_object_map); + for map in predicate_maps.into_iter() { + for (gr, pred_map) in map { + let use_graph = if matches!(graph, NamedGraph::DefaultGraph) { + NamedGraph::from(&gr) + } else { + graph.clone() + }; + for (pred, bst) in pred_map.into_iter() { + let pred_nn = NamedNode::new_unchecked(pred); + for (subj, bot) 
in bst { + for (obj, (subjects, objects)) in bot { + all_builders.push(( + use_graph.clone(), + pred_nn.clone(), + subj.clone(), + obj.clone(), + subjects, + objects, + )); } } } - ((graph_name, predicate), subject_map) - }) - .collect(); - + } + } debug!( "Processing quads took {} seconds", start_quadproc_now.elapsed().as_secs_f64() ); let start_tripleproc_now = Instant::now(); - let triples_to_add: Vec<_> = predicate_map + let triples_to_add: Vec<_> = all_builders .into_par_iter() - .map(|((graph_name, predicate), map)| { - let mut triples_to_add = vec![]; - for (subject_dt, obj_map) in map { - let subject_dt = BaseRDFNodeType::from_string(subject_dt); - for (object_dt, (subjects, objects)) in obj_map { - let object_dt = BaseRDFNodeType::from_string(object_dt); - let strings_iter = subjects.into_iter().map(|s| match s { - NamedOrBlankNode::NamedNode(nn) => nn.into_string(), - NamedOrBlankNode::BlankNode(bl) => bl.into_string(), - }); - let mut subjects_ser = Series::from_iter(strings_iter); - subjects_ser.rename(SUBJECT_COL_NAME.into()); - let len = subjects_ser.len(); - - let objects_ser = - particular_term_vec_to_series(objects, object_dt.clone()); + .map( + |(graph, predicate, subject_type, object_type, subjects, objects)| { + let l = subjects.len(); + let df = DataFrame::new( + l, + vec![ + subjects.into_series(SUBJECT_COL_NAME).into_column(), + objects.into_series(OBJECT_COL_NAME).into_column(), + ], + ) + .unwrap(); + TriplesToAdd { + df, + subject_type: subject_type.clone(), + object_type: object_type.clone(), + predicate: Some(predicate), + graph, + subject_cat_state: subject_type.default_input_cat_state(), + predicate_cat_state: None, + object_cat_state: object_type.default_input_cat_state(), + } + }, + ) + .collect(); - let all_series = - vec![subjects_ser.into_column(), objects_ser.into_column()]; - let mut df = DataFrame::new(len, all_series).unwrap(); - // TODO: Include bad data also - df = df - .drop_nulls(Some(&[ - SUBJECT_COL_NAME.to_string(), -
OBJECT_COL_NAME.to_string(), - ])) - .unwrap(); - // Are we overriding the default graph? - let use_graph = if matches!(graph, NamedGraph::DefaultGraph) { - NamedGraph::from(&graph_name) - } else { - graph.clone() - }; + let mut tta_map = HashMap::new(); + for triple in triples_to_add.into_iter() { + let k = (triple.graph.clone(), triple.predicate.clone(), triple.subject_type.clone(), triple.object_type.clone()); + if !tta_map.contains_key(&k) { + tta_map.insert(k.clone(), Vec::new()); + } + tta_map.get_mut(&k).unwrap().push(triple); + } - triples_to_add.push(TriplesToAdd { - df, - subject_type: subject_dt.clone(), - object_type: object_dt.clone(), - predicate: Some(NamedNode::new_unchecked(predicate.clone())), - graph: use_graph, - subject_cat_state: subject_dt.default_input_cat_state(), - predicate_cat_state: None, - object_cat_state: object_dt.default_input_cat_state(), - }); - } + let ttas:Vec<_> = tta_map.into_par_iter().map(|(k,mut ttas)| { + if ttas.len() == 1 { + ttas.pop().unwrap() + } else { + let (graph, predicate, subject_type, object_type) = k; + let mut lfs = Vec::with_capacity(ttas.len()); + for tta in ttas { + lfs.push(tta.df.lazy()); } - triples_to_add - }) - .collect(); - let triples_to_add: Vec<_> = triples_to_add.into_iter().flatten().collect(); + let df = concat(lfs, UnionArgs { + parallel: true, + rechunk: false, + to_supertypes: false, + diagonal: false, + strict: false, + from_partitioned_ds: false, + maintain_order: false, + }).unwrap().collect().unwrap(); + let subject_cat_state = subject_type.default_input_cat_state(); + let object_cat_state = object_type.default_input_cat_state(); + TriplesToAdd { + df, + subject_type, + object_type, + predicate, + graph, + subject_cat_state, + object_cat_state, + predicate_cat_state: None, + } + } + }).collect(); + debug!( "Creating the triples to add as DFs took {} seconds", start_tripleproc_now.elapsed().as_secs_f64() ); let start_add_triples_vec = Instant::now(); self.parser_call += 1; - 
self.add_triples_vec(triples_to_add, transient)?; + self.add_triples_vec(ttas, transient)?; debug!( "Adding triples vec took {} seconds", start_add_triples_vec.elapsed().as_secs_f64() @@ -399,16 +401,25 @@ fn blank_node_to_oxrdf_blank_node(bn: BlankNode, parser_call: &str) -> BlankNode fn create_predicate_map<'a>( mut r: MyFromSliceQuadReader<'a>, + rdf_format: &ExtendedRdfFormat, parser_call: &str, max_iterations: usize, ) -> Result< ( Option>, - HashMap>, + HashMap, ), TriplestoreError, > { + let cim_remapper = if matches!(rdf_format, ExtendedRdfFormat::CIMXML) { + Some(Remapper::new()) + } else { + None + }; let mut graph_predicate_map = HashMap::new(); + let mut subj_type_map: HashMap = HashMap::new(); + let mut obj_type_map: HashMap = HashMap::new(); + let mut unparseable = Vec::new(); let mut empty_iter = false; let mut reached_max = false; let mut i = 0usize; @@ -427,37 +438,49 @@ fn create_predicate_map<'a>( graph_predicate_map.insert(graph_name.clone(), HashMap::new()); graph_predicate_map.get_mut(&graph_name).unwrap() }; - let type_map: &mut HashMap<_, HashMap<_, (Vec, Vec)>> = + let type_map: &mut BySubjectType = if let Some(type_map) = predicate_map.get_mut(predicate.as_str()) { type_map } else { - let predicate_key = predicate.into_string(); + let predicate_key = predicate.as_str().to_string(); predicate_map.insert(predicate_key.clone(), HashMap::new()); predicate_map.get_mut(&predicate_key).unwrap() }; let subject_to_insert = subject_to_oxrdf_subject(subject, parser_call); let object_to_insert = term_to_oxrdf_term(object, parser_call); - let subject_datatype = get_subject_datatype_ref(&subject_to_insert); - let object_datatype = get_term_datatype_ref(&object_to_insert); - if let Some(obj_type_map) = type_map.get_mut(subject_datatype.as_str()) { - if let Some((subjects, objects)) = obj_type_map.get_mut(object_datatype.as_str()) { - subjects.push(subject_to_insert); - objects.push(object_to_insert); - } else { - obj_type_map.insert( - 
object_datatype.as_str().to_string(), - (vec![subject_to_insert], vec![object_to_insert]), - ); - } + let subject_ref_datatype = get_subject_datatype_ref(&subject_to_insert); + let object_ref_datatype = get_term_datatype_ref(&object_to_insert); + //Remap cim here + let object_ref_datatype = if let Some(remapper) = &cim_remapper { + remapper.remap_predicate_datatype(&predicate, &object_ref_datatype) } else { - let mut obj_type_map = HashMap::new(); - let subject_datatype_string = subject_datatype.as_str().to_string(); + object_ref_datatype + }; + let subject_type = get_or_insert_dt(subject_ref_datatype, &mut subj_type_map); + let object_type = get_or_insert_dt(object_ref_datatype, &mut obj_type_map); + if !type_map.contains_key(&subject_type) { + type_map.insert(subject_type.clone(), HashMap::new()); + } + + let obj_type_map = type_map.get_mut(&subject_type).unwrap(); + if !obj_type_map.contains_key(&object_type) { obj_type_map.insert( - object_datatype.as_str().to_string(), - (vec![subject_to_insert], vec![object_to_insert]), + object_type.clone(), + ( + SeriesBuilder::new(&subject_type), + SeriesBuilder::new(&object_type), + ), ); - type_map.insert(subject_datatype_string, obj_type_map); + } + let (subjects, objects) = obj_type_map.get_mut(&object_type).unwrap(); + match objects.parse_term(&object_to_insert) { + Ok(()) => { + subjects.push_named_or_blank(&subject_to_insert); + } + Err(e) => { + unparseable.push((Triple::new(subject_to_insert, predicate, object_to_insert), e)); + } } } else { empty_iter = true; @@ -512,60 +535,15 @@ impl Iterator for MyFromSliceQuadReader<'_> { } } -// These terms must be of the given dt -fn particular_term_vec_to_series(term_vec: Vec, dt: BaseRDFNodeType) -> Series { - if dt.is_lang_string() { - let langs = term_vec - .par_iter() - .map(|t| match t { - Term::Literal(l) => LiteralValue::Scalar(Scalar::from(PlSmallStr::from_string( - l.language().unwrap().to_string(), - ))), - _ => panic!("Should never happen"), - }) - .collect(); - 
let vals = term_vec - .into_par_iter() - .map(|t| match t { - Term::Literal(l) => { - let (s, _, _) = l.destruct(); - LiteralValue::Scalar(Scalar::from(PlSmallStr::from_string(s))) - } - _ => panic!("Should never happen"), - }) - .collect(); - - let val_ser = polars_literal_values_to_series(vals, LANG_STRING_VALUE_FIELD); - let lang_ser = polars_literal_values_to_series(langs, LANG_STRING_LANG_FIELD); - let mut df = DataFrame::new(val_ser.len(), vec![val_ser.into(), lang_ser.into()]) - .unwrap() - .lazy() - .with_column( - as_struct(vec![ - col(LANG_STRING_VALUE_FIELD), - col(LANG_STRING_LANG_FIELD), - ]) - .alias(OBJECT_COL_NAME), - ) - .collect() - .unwrap(); - df.drop_in_place(OBJECT_COL_NAME) - .unwrap() - .take_materialized_series() +fn get_or_insert_dt( + base_rdfnode_type_ref: BaseRDFNodeTypeRef, + type_map: &mut HashMap, +) -> BaseRDFNodeType { + if let Some(t) = type_map.get(base_rdfnode_type_ref.as_str()) { + t.clone() } else { - let use_dt = if let BaseRDFNodeType::Literal(l) = &dt { - Some(l.as_ref()) - } else { - None - }; - let any_iter: Vec<_> = term_vec - .into_par_iter() - .map(|t| match t { - Term::NamedNode(nn) => rdf_owned_named_node_to_polars_literal_value(nn), - Term::BlankNode(bb) => rdf_owned_blank_node_to_polars_literal_value(bb), - Term::Literal(l) => rdf_literal_to_polars_literal_value(&l, use_dt.clone()), - }) - .collect(); - polars_literal_values_to_series(any_iter, OBJECT_COL_NAME) + let owned = base_rdfnode_type_ref.clone().to_owned(); + type_map.insert(base_rdfnode_type_ref.as_str().to_string(), owned.into_owned()); + type_map.get(base_rdfnode_type_ref.as_str()).unwrap().clone() } } diff --git a/licensing/SPARQL_ANYTHING_LICENSE b/licensing/SPARQL_ANYTHING_LICENSE new file mode 100644 index 00000000..ec7f9ee5 --- /dev/null +++ b/licensing/SPARQL_ANYTHING_LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. 
Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. 
+ + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of 
the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2024 SPARQL Anything Contributors @ http://github.com/sparql-anything + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/py_maplib/maplib/__init__.pyi b/py_maplib/maplib/__init__.pyi index 6d683a27..0af09a48 100644 --- a/py_maplib/maplib/__init__.pyi +++ b/py_maplib/maplib/__init__.pyi @@ -577,6 +577,27 @@ class Model: :param transient: Should the triples be included when serializing the graph? """ + def map_xml( + self, + path_or_string: str, + graph: str = None, + transient: bool = True, + ) -> None: + """ + Map an XML file or string to triples. + Usage: + + >>> m.map_xml("my_doc.xml") + + or: + + >>> m.map_xml('value') + + :param path_or_string: Path to an XML document or an XML string. + :param graph: The IRI of the graph to add triples to. None is the default graph. + :param transient: Should the triples be included when serializing the graph? 
+ """ + def map_triples( self, data: Union[DataFrame, "SolutionMappings"] = None, @@ -981,6 +1002,14 @@ class Model: :param graph: The IRI of the graph to write. """ + def truncate_graph(self, graph: str = None) -> None: + """ + Removes all triples associated with the given graph from the triplestore, + includes transient triples and full-text search entries. + + :param graph: The IRI of the graph to truncate. + """ + def detach_graph(self, graph:str=None, preserve_name:bool=False) -> "Model": """ Detaches and returns a named graph as their own Model object. diff --git a/py_maplib/src/lib.rs b/py_maplib/src/lib.rs index 4308c1ed..4cca8107 100644 --- a/py_maplib/src/lib.rs +++ b/py_maplib/src/lib.rs @@ -275,6 +275,21 @@ impl PyModel { }) } + #[pyo3(signature = (path_or_string, graph=None, transient=None))] + #[instrument(skip_all)] + fn map_xml( + &self, + py: Python<'_>, + path_or_string: String, + graph: Option, + transient: Option, + ) -> PyResult<()> { + py.detach(move || { + let mut inner = self.inner.lock().unwrap(); + map_xml_mutex(&mut inner, path_or_string, graph, transient) + }) + } + #[pyo3(signature = (data, predicate=None, graph=None, validate_iris=None))] #[instrument(skip_all)] fn map_triples( @@ -1140,6 +1155,37 @@ fn map_json_mutex( Ok(()) } +fn map_xml_mutex( + inner: &mut MutexGuard, + string_or_path: String, + graph: Option, + transient: Option, +) -> PyResult<()> { + let graph = parse_optional_named_node(graph)?; + let named_graph = NamedGraph::from_maybe_named_node(graph.as_ref()); + + let is_xml_string = string_or_path.trim_start().starts_with('<'); + if is_xml_string { + inner + .map_xml_string( + string_or_path, + &named_graph, + transient.unwrap_or(DEFAULT_MAP_TO_TRANSIENT), + ) + .map_err(PyMaplibError::from)?; + } else { + let p = PathBuf::from(string_or_path); + inner + .map_xml_path( + p.as_ref(), + &named_graph, + transient.unwrap_or(DEFAULT_MAP_TO_TRANSIENT), + ) + .map_err(PyMaplibError::from)?; + } + Ok(()) +} + fn 
map_triples_mutex( inner: &mut MutexGuard, df: DataFrame, @@ -1943,3 +1989,7 @@ pub fn data_to_mappings_types( Ok((df, None)) } } + +fn truncate_graph(graph: &NamedNode) { + +} \ No newline at end of file diff --git a/py_maplib/tests/test_multi_expressions.py b/py_maplib/tests/test_multi_expressions.py index 060b67b0..943695e7 100644 --- a/py_maplib/tests/test_multi_expressions.py +++ b/py_maplib/tests/test_multi_expressions.py @@ -663,6 +663,27 @@ def test_generate_uuids(streaming): assert df.get_column("uuid").unique().len() == 3 + +@pytest.mark.parametrize("streaming", [True, False]) +def test_now(streaming): + m = Model() + sm = m.query( + """ + PREFIX : + PREFIX xsd: + SELECT ?now WHERE { + BIND(now() as ?now) + } + """, + solution_mappings=True, + streaming=streaming, + ) + assert sm.rdf_types == { + "now": RDFType.Literal("http://www.w3.org/2001/XMLSchema#dateTime"), + } + assert sm.mappings.height == 1 + + @pytest.mark.parametrize("streaming", [True, False]) def test_generate_iri_all_strings(streaming): m = Model() diff --git a/py_maplib/tests/test_xml.py b/py_maplib/tests/test_xml.py new file mode 100644 index 00000000..26d3067e --- /dev/null +++ b/py_maplib/tests/test_xml.py @@ -0,0 +1,122 @@ +import pathlib + +import pytest + +from .disk import disk_params +from maplib import Model + +PATH_HERE = pathlib.Path(__file__).parent +TESTDATA_PATH = PATH_HERE / "testdata" / "xmls" + + +@pytest.mark.parametrize("disk", disk_params()) +def test_map_xml_1(disk): + xml_1 = TESTDATA_PATH / "1.xml" + m = Model(storage_folder=disk) + m.map_xml(str(xml_1)) + df = m.query("""SELECT * WHERE {?s ?p ?o}""") + assert df.height > 0 + + df2 = m.query( + """ + PREFIX fx: + PREFIX xyz: + PREFIX rdf: + + SELECT ?title WHERE { + ?root a fx:root . + ?root rdf:_1 ?glossary . + ?glossary a xyz:glossary . + ?glossary rdf:_1 ?title_node . + ?title_node a xyz:title . + ?title_node rdf:_1 ?title . 
+ } + """ + ) + assert df2.height == 1 + assert df2.get_column("title")[0] == "example glossary" + + +@pytest.mark.parametrize("disk", disk_params()) +def test_map_xml_2_repeated_children(disk): + xml_2 = TESTDATA_PATH / "2.xml" + m = Model(storage_folder=disk) + m.map_xml(str(xml_2)) + + df_items = m.query( + """ + PREFIX fx: + PREFIX xyz: + + SELECT ?items WHERE { + ?root a fx:root . + ?root ?p1 ?menu . + ?menu a xyz:menu . + ?menu ?p2 ?items . + ?items a xyz:items . + } + """ + ) + assert df_items.height == 22 + + +def test_map_xml_3_inline_string(): +#From https://github.com/SPARQL-Anything/sparql.anything/tree/v1.2-DEV/sparql-anything-xml/src/test/java/io/github/sparqlanything/xml +#Apache licensed, see licensing folder. + xml = """ + + abc + def +""" + m = Model() + m.map_xml(xml) + + df = m.query( + """ + PREFIX fx: + PREFIX ex: + PREFIX rdf: + + SELECT ?val WHERE { + ?root a fx:root . + ?root ?p ?name . + ?name a ex:name . + ?name rdf:_1 ?val . + } + ORDER BY ?val + """ + ) + assert df.height == 2 + values = sorted(df.get_column("val").to_list()) + assert values == ["abc", "def"] + + +def test_map_xml_attributes(): + # From https://github.com/SPARQL-Anything/sparql.anything/tree/v1.2-DEV/sparql-anything-xml/src/test/java/io/github/sparqlanything/xml + # Apache licensed, see licensing folder + xml = ( + '' + '' + '' + "" + ) + m = Model() + m.map_xml(xml) + df = m.query( + """ + PREFIX fx: + PREFIX xyz: + PREFIX ex: + + SELECT ?id ?name WHERE { + ?root a fx:root . + ?root ?p ?item . + ?item a xyz:item . + ?item ex:id ?id . + ?item xyz:name ?name . 
+ } + """ + ) + assert df.height == 1 + assert df.get_column("id")[0] == "42" + assert df.get_column("name")[0] == "hello" diff --git a/py_maplib/tests/testdata/read_ntriples_prefix_escaped.nt b/py_maplib/tests/testdata/read_ntriples_prefix_escaped.nt index f46d456f..78a47b67 100644 --- a/py_maplib/tests/testdata/read_ntriples_prefix_escaped.nt +++ b/py_maplib/tests/testdata/read_ntriples_prefix_escaped.nt @@ -6,5 +6,6 @@ _:person_l0_p0_r0 "Ann" . _:person_l0_p0_r1 "Bob" . _:person_l0_p0_r0 . _:person_l0_p0_r1 . -_:person_l0_p0_r1 . - +_:person_l0_p0_r1 . +_:person_l0_p0_r1 . +_:person_l0_p0_r1 . diff --git a/py_maplib/tests/testdata/xmls/1.xml b/py_maplib/tests/testdata/xmls/1.xml new file mode 100644 index 00000000..b1bc6c20 --- /dev/null +++ b/py_maplib/tests/testdata/xmls/1.xml @@ -0,0 +1,24 @@ + + + + example glossary + + S + + + SGML + SGML + Standard Generalized Markup Language + SGML + ISO 8879:1986 + + A meta-markup language, used to create markup languages such as DocBook. + GML + XML + + markup + + + + + diff --git a/py_maplib/tests/testdata/xmls/2.xml b/py_maplib/tests/testdata/xmls/2.xml new file mode 100644 index 00000000..b4cf43b6 --- /dev/null +++ b/py_maplib/tests/testdata/xmls/2.xml @@ -0,0 +1,76 @@ + + + +
SVG Viewer
+ + Open + + + OpenNew + + + + + ZoomIn + + + + ZoomOut + + + + OriginalView + + + + + Quality + + + Pause + + + Mute + + + + Find + + + + FindAgain + + + + Copy + + + CopyAgain + + + + CopySVG + + + + ViewSVG + + + + ViewSource + + + + SaveAs + + + + + Help + + + About + + +
+
diff --git a/py_maplib/tests/testdata/xmls/3.xml b/py_maplib/tests/testdata/xmls/3.xml new file mode 100644 index 00000000..30be33b0 --- /dev/null +++ b/py_maplib/tests/testdata/xmls/3.xml @@ -0,0 +1,98 @@ + + + + cofaxCDS + org.cofax.cds.CDSServlet + + Philadelphia, PA + ksm@pobox.com + Cofax + /images/cofax.gif + /content/static + org.cofax.WysiwygTemplate + org.cofax.FilesTemplateLoader + templates + + listTemplate.htm + articleTemplate.htm + false + listTemplate.jsp + articleTemplate.jsp + 200 + 200 + 60 + 100 + 50 + 15 + 200 + 100 + 10 + 10 + forSearchEnginesList.htm + forSearchEngines.htm + WEB-INF/robots.db + true + org.cofax.SqlDataStore + org.cofax.SqlRedirection + cofax + com.microsoft.jdbc.sqlserver.SQLServerDriver + jdbc:microsoft:sqlserver://LOCALHOST:1433;DatabaseName=goon + sa + dataStoreTestQuery + SET NOCOUNT ON;select test='test'; + /usr/local/tomcat/logs/datastore.log + 10 + 100 + 100 + debug + 500 + + + + cofaxEmail + org.cofax.cds.EmailServlet + + mail1 + mail2 + + + + cofaxAdmin + org.cofax.cds.AdminServlet + + + fileServlet + org.cofax.cds.FileServlet + + + cofaxTools + org.cofax.cms.CofaxToolsServlet + + toolstemplates/ + 1 + 3.5 + /usr/local/tomcat/logs/CofaxTools.log + + 1 + /usr/local/tomcat/logs/dataLog.log + + /content/admin/remove?cache=pages&id= + /content/admin/remove?cache=templates&id= + /usr/local/tomcat/webapps/content/fileTransferFolder + 1 + 4 + true + + + + / + /cofaxutil/aemail/* + /admin/* + /static/* + /tools/* + + + cofax.tld + /WEB-INF/tlds/cofax.tld + + \ No newline at end of file