diff --git a/Cargo.lock b/Cargo.lock index 5b989b4..290521b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4958,6 +4958,7 @@ dependencies = [ "sprs", "thiserror", "tracing", + "uri_encode", "utils", "uuid", ] diff --git a/lib/maplib/src/model.rs b/lib/maplib/src/model.rs index 49f7f62..23e46b5 100644 --- a/lib/maplib/src/model.rs +++ b/lib/maplib/src/model.rs @@ -228,6 +228,13 @@ impl Model { .map_err(MaplibError::TriplestoreError) } + #[instrument(skip_all)] + pub fn map_df(&mut self, df: &DataFrame, graph: &NamedGraph) -> Result<(), MaplibError> { + self.triplestore + .map_df(df, graph) + .map_err(MaplibError::TriplestoreError) + } + #[instrument(skip_all)] pub fn map_xml_path( &mut self, diff --git a/lib/triplestore/Cargo.toml b/lib/triplestore/Cargo.toml index e1b6866..a641779 100644 --- a/lib/triplestore/Cargo.toml +++ b/lib/triplestore/Cargo.toml @@ -38,6 +38,7 @@ simd-json.workspace = true serde_json.workspace = true ordered-float.workspace = true quick-xml.workspace = true +uri_encode.workspace = true pyo3 = { workspace = true, optional = true } diff --git a/lib/triplestore/src/lib.rs b/lib/triplestore/src/lib.rs index 8c02fbb..44e4869 100644 --- a/lib/triplestore/src/lib.rs +++ b/lib/triplestore/src/lib.rs @@ -4,6 +4,7 @@ extern crate core; pub mod cats; mod dblf; pub mod errors; +mod map_df; mod map_json; mod map_xml; pub mod native_parquet_write; diff --git a/lib/triplestore/src/map_df.rs b/lib/triplestore/src/map_df.rs new file mode 100644 index 0000000..9b248bd --- /dev/null +++ b/lib/triplestore/src/map_df.rs @@ -0,0 +1,132 @@ +use crate::errors::TriplestoreError; +use crate::{TriplesToAdd, Triplestore}; +use oxrdf::vocab::{rdf, xsd}; +use oxrdf::NamedNode; +use polars::prelude::{col, lit, DataFrame, IntoLazy}; +use polars_core::prelude::{Column, IntoColumn, NamedFrom, Series}; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; +use representation::constants::MAPLIB_PREFIX_IRI; +use representation::dataset::NamedGraph; +use representation::polars_to_rdf::polars_type_to_literal_type; +use representation::{BaseRDFNodeType, OBJECT_COL_NAME, SUBJECT_COL_NAME}; +use std::collections::HashMap; + +const FACADE_X_ROOT: &str = "http://sparql.xyz/facade-x/ns/root"; +const FACADE_X_CHILD: &str = "http://sparql.xyz/facade-x/ns/child"; +const FACADE_X_CHILD_NUMBER: &str = "http://sparql.xyz/facade-x/ns/childNumber"; +const DEFAULT_FACADE_X_DATA_PREFIX: &str = "http://sparql.xyz/facade-x/data/"; + +impl Triplestore { + pub fn map_df( + &mut self, + df: &DataFrame, + named_graph: &NamedGraph, + ) -> Result<(), TriplestoreError> { + let mut column_types = HashMap::new(); + let col_names: Vec<_> = df.columns().iter().map(|x| x.name().to_string()).collect(); + for c in df.columns() { + //Todo handle + let dt = polars_type_to_literal_type(c.dtype()).unwrap(); + column_types.insert(c.name().to_string(), dt); + } + let id_col = uuid::Uuid::new_v4().to_string(); + let mut df = df.clone(); + let root_node_uuri = format!("{}{}", MAPLIB_PREFIX_IRI, uuid::Uuid::new_v4().to_string()); + let uuids: Vec<_> = (0..df.height()) + .into_par_iter() + .map(|_| format!("{}{}", MAPLIB_PREFIX_IRI, uuid::Uuid::new_v4().to_string())) + .collect(); + df.with_column(Series::new(id_col.as_str().into(), uuids).into_column()) + .unwrap(); + let mut triples_to_add = Vec::new(); + for c in &col_names { + let subj_type = BaseRDFNodeType::IRI; + let obj_type = column_types.get(c).unwrap().clone(); + let subj_state = subj_type.default_input_cat_state(); + + if obj_type.is_multi() { + todo!() + } else { + let base_obj_type = obj_type.get_base_type().unwrap(); + let base_obj_state = obj_type.get_base_state().unwrap(); + let tta = TriplesToAdd { + df: df + .clone() + .lazy() + .select([ + col(id_col.clone()).alias(SUBJECT_COL_NAME), + col(c).alias(OBJECT_COL_NAME), + ]) + .collect() + .unwrap(), + subject_type: subj_type, + object_type: base_obj_type.clone(), + predicate: Some(NamedNode::new_unchecked(format!( + "{}{}", + DEFAULT_FACADE_X_DATA_PREFIX, + uri_encode::encode_uri(c) + ))), + graph: named_graph.clone(), + subject_cat_state: subj_state, + object_cat_state: base_obj_state.clone(), + predicate_cat_state: None, + }; + triples_to_add.push(tta); + } + } + let tta_children = TriplesToAdd { + df: df + .clone() + .lazy() + .select([ + lit(root_node_uuri.as_str()).alias(SUBJECT_COL_NAME), + col(id_col.clone()).alias(OBJECT_COL_NAME), + ]) + .collect() + .unwrap(), + subject_type: BaseRDFNodeType::IRI, + object_type: BaseRDFNodeType::IRI, + predicate: Some(NamedNode::new_unchecked(FACADE_X_CHILD.to_string())), + graph: named_graph.clone(), + subject_cat_state: BaseRDFNodeType::IRI.default_input_cat_state(), + object_cat_state: BaseRDFNodeType::IRI.default_input_cat_state(), + predicate_cat_state: None, + }; + let num_dt = BaseRDFNodeType::Literal(xsd::UNSIGNED_INT.into_owned()); + let tta_child_num = TriplesToAdd { + df: df + .clone() + .lazy() + .select([col(id_col).alias(SUBJECT_COL_NAME)]) + .with_row_index(OBJECT_COL_NAME.to_string(), None) + .collect() + .unwrap(), + subject_type: BaseRDFNodeType::IRI, + object_type: num_dt.clone(), + predicate: Some(NamedNode::new_unchecked(FACADE_X_CHILD_NUMBER.to_string())), + graph: named_graph.clone(), + subject_cat_state: BaseRDFNodeType::IRI.default_input_cat_state(), + object_cat_state: num_dt.default_input_cat_state(), + predicate_cat_state: None, + }; + + let mut root_cols = Vec::new(); + root_cols.push(Column::new(SUBJECT_COL_NAME.into(), vec![root_node_uuri])); + root_cols.push(Column::new(OBJECT_COL_NAME.into(), vec![FACADE_X_ROOT])); + let root = TriplesToAdd { + df: DataFrame::new(1, root_cols).unwrap(), + subject_type: BaseRDFNodeType::IRI, + object_type: BaseRDFNodeType::IRI, + predicate: Some(rdf::TYPE.into_owned()), + graph: named_graph.clone(), + subject_cat_state: BaseRDFNodeType::IRI.default_input_cat_state(), + object_cat_state: BaseRDFNodeType::IRI.default_input_cat_state(), + predicate_cat_state: None, + }; + triples_to_add.push(tta_children); + triples_to_add.push(tta_child_num); + triples_to_add.push(root); + self.add_triples_vec(triples_to_add, false)?; + Ok(()) + } +} diff --git a/py_maplib/maplib/__init__.pyi b/py_maplib/maplib/__init__.pyi index b0b5564..c3ef8bf 100644 --- a/py_maplib/maplib/__init__.pyi +++ b/py_maplib/maplib/__init__.pyi @@ -677,6 +677,24 @@ class Model: :return: The generated template """ + + def map_df( + self, + df: DataFrame, + graph: str = None, + ): + """ + Create a default template and map it based on a dataframe. + Usage: + + >>> df = pl.read_csv("my_csv.csv") + >>> m.map_df(df) + + :param df: DataFrame to map using Facade-X (using approximately the CSV-mapping) + :param graph: The IRI of the graph to add triples to. + :return: None + """ + def explore( self, host: str = "localhost", diff --git a/py_maplib/src/mutexes.rs b/py_maplib/src/mutexes.rs index f53b1da..a9fab78 100644 --- a/py_maplib/src/mutexes.rs +++ b/py_maplib/src/mutexes.rs @@ -310,6 +310,15 @@ pub(crate) fn map_default_mutex( Ok(format!("{tmpl}")) } +pub fn map_df_mutex( + inner: &mut MutexGuard, + df: DataFrame, + graph: NamedGraph, +) -> PyResult<()> { + inner.map_df(&df, &graph).map_err(PyMaplibError::from)?; + Ok(()) +} + pub(crate) fn query_mutex( inner: &mut MutexGuard, query: String, diff --git a/py_maplib/src/py_model.rs b/py_maplib/src/py_model.rs index 285c1f7..3840463 100644 --- a/py_maplib/src/py_model.rs +++ b/py_maplib/src/py_model.rs @@ -5,7 +5,7 @@ use crate::mutexes::{ map_default_mutex, map_json_mutex, map_mutex, map_triples_mutex, map_xml_mutex, query_mutex, read_mutex, read_template_mutex, reads_mutex, size_mutex, truncate_graph_mutex, update_mutex, validate_mutex, write_cim_xml_mutex, write_native_parquet_mutex, write_triples_mutex, - writes_mutex, + writes_mutex, map_df_mutex }; use crate::shacl::{PyValidationReport, SHACL_RESULTS_QUERY}; use crate::{ @@ -241,6 +241,19 @@ impl PyModel { }) } + #[pyo3(signature = (df, graph=None))] + #[instrument(skip_all)] + fn map_df(&self, py: Python<'_>, df: &Bound<'_, PyAny>, graph: Option) -> PyResult<()> { + let (df, _) = data_to_mappings_types(df, py)?; + py.detach(move || -> PyResult<()> { + let mut inner = self.inner.lock().unwrap(); + let graph = parse_optional_named_node(graph)?; + let named_graph = NamedGraph::from_maybe_named_node(graph.as_ref()); + map_df_mutex(&mut inner, df, named_graph) + }) + } + + /// Starts a graph explorer session. /// /// To run from Jupyter Notebook use: diff --git a/py_maplib/tests/test_map_df.py b/py_maplib/tests/test_map_df.py new file mode 100644 index 0000000..d3615de --- /dev/null +++ b/py_maplib/tests/test_map_df.py @@ -0,0 +1,102 @@ +import pytest +from .disk import disk_params +from maplib import ( + Model, + xsd, + RDFType, +) +import polars as pl +import pathlib + +pl.Config.set_fmt_str_lengths(300) + +PATH_HERE = pathlib.Path(__file__).parent +TESTDATA_PATH = PATH_HERE / "testdata" / "jsons" + +@pytest.mark.parametrize("disk", disk_params()) +def test_map_df_easy(disk): + df = pl.DataFrame({ + "a": ["abc", "def"], + "b": [1,2] + }) + m = Model() + m.map_df(df) + out_df = m.query(""" + SELECT * WHERE { + ?root a fx:root . + ?root fx:child ?row . + ?row xyz:a ?a . + ?row fx:childNumber ?ch_num . + BIND(IRI(CONCAT("urn:my_ns:", ?a)) AS ?a_uri) + } + """) + assert out_df.shape == (2,5) + +@pytest.mark.parametrize("disk", disk_params()) +def test_map_df_named_graph_arg(disk): + df = pl.DataFrame({ + "a": ["abc", "def"], + "b": [1,2] + }) + m = Model() + m.map_df(df, "urn:maplib:test") + out_df = m.query(""" + SELECT * WHERE { + ?root a fx:root . + ?root fx:child ?row . + ?row xyz:a ?a . + ?row fx:childNumber ?ch_num . + BIND(IRI(CONCAT("urn:my_ns:", ?a)) AS ?a_uri) + } + """, graph="urn:maplib:test") + assert out_df.height == 2 + +def test_map_df_float(): + df = pl.DataFrame( + { + "a": [ + "abc", + "def", + "ghi", + ], + "b": [ + 1.4, + 4.2, + 7.8, + ], + } + ) + m = Model() + m.map_df(df) + res = m.query( + """ + SELECT ?a ?c WHERE {?a xyz:b ?c} ORDER BY ?a ?c + """, solution_mappings=True + ) + print(res) + assert res.rdf_types["c"] == RDFType.Literal(xsd.double) + +def test_map_df_datetime(): + df = pl.DataFrame( + { + "a": [ + "abc", + "def", + "ghi", + ], + "b": [ + "2019-05-15T00:00:00", + "2020-02-02T00:00:00", + "2007-07-07T00:00:00", + ], + } + ) + df = df.with_columns(pl.col("b").str.to_datetime()) + m = Model() + m.map_df(df) + res = m.query( + """ + SELECT ?a ?c WHERE {?a xyz:b ?c} ORDER BY ?a ?c + """, solution_mappings=True + ) + assert res.rdf_types["c"] == RDFType.Literal(xsd.dateTime) \ No newline at end of file