Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions lib/maplib/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,13 @@ impl Model {
.map_err(MaplibError::TriplestoreError)
}

/// Maps the rows of `df` into `graph` by delegating to the underlying triplestore.
///
/// # Errors
/// Any error reported by the triplestore is returned wrapped in
/// `MaplibError::TriplestoreError`.
#[instrument(skip_all)]
pub fn map_df(&mut self, df: &DataFrame, graph: &NamedGraph) -> Result<(), MaplibError> {
    match self.triplestore.map_df(df, graph) {
        Ok(()) => Ok(()),
        Err(source) => Err(MaplibError::TriplestoreError(source)),
    }
}

#[instrument(skip_all)]
pub fn map_xml_path(
&mut self,
Expand Down
1 change: 1 addition & 0 deletions lib/triplestore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ simd-json.workspace = true
serde_json.workspace = true
ordered-float.workspace = true
quick-xml.workspace = true
uri_encode.workspace = true

pyo3 = { workspace = true, optional = true }

Expand Down
1 change: 1 addition & 0 deletions lib/triplestore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ extern crate core;
pub mod cats;
mod dblf;
pub mod errors;
mod map_df;
mod map_json;
mod map_xml;
pub mod native_parquet_write;
Expand Down
132 changes: 132 additions & 0 deletions lib/triplestore/src/map_df.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
use crate::errors::TriplestoreError;
use crate::{TriplesToAdd, Triplestore};
use oxrdf::vocab::{rdf, xsd};
use oxrdf::NamedNode;
use polars::prelude::{col, lit, DataFrame, IntoLazy};
use polars_core::prelude::{Column, IntoColumn, NamedFrom, Series};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use representation::constants::MAPLIB_PREFIX_IRI;
use representation::dataset::NamedGraph;
use representation::polars_to_rdf::polars_type_to_literal_type;
use representation::{BaseRDFNodeType, OBJECT_COL_NAME, SUBJECT_COL_NAME};
use std::collections::HashMap;

/// Class IRI assigned (via `rdf:type`) to the single root node created for a mapped DataFrame.
const FACADE_X_ROOT: &str = "http://sparql.xyz/facade-x/ns/root";
/// Predicate linking the root node to each row node.
const FACADE_X_CHILD: &str = "http://sparql.xyz/facade-x/ns/child";
/// Predicate linking each row node to its row index.
const FACADE_X_CHILD_NUMBER: &str = "http://sparql.xyz/facade-x/ns/childNumber";
/// Prefix of the per-column predicates; the URI-encoded column name is appended to it.
const DEFAULT_FACADE_X_DATA_PREFIX: &str = "http://sparql.xyz/facade-x/data/";

/// Mints a fresh IRI in the maplib namespace: the prefix followed by a random (v4) UUID.
fn fresh_maplib_iri() -> String {
    // `Uuid` implements `Display`, so formatting it directly avoids an intermediate `String`.
    format!("{}{}", MAPLIB_PREFIX_IRI, uuid::Uuid::new_v4())
}

impl Triplestore {
    /// Maps a DataFrame to triples using a Facade-X style tabular layout and adds them to
    /// `named_graph`.
    ///
    /// The generated triples are:
    /// * one root node typed (`rdf:type`) as `FACADE_X_ROOT`,
    /// * one freshly minted IRI per row, linked from the root via `FACADE_X_CHILD`,
    /// * per row, a `FACADE_X_CHILD_NUMBER` triple holding the row index as `xsd:unsignedInt`
    ///   (produced by `with_row_index` with no offset, i.e. presumably starting at 0),
    /// * per row and column, a triple whose predicate is `DEFAULT_FACADE_X_DATA_PREFIX`
    ///   followed by the URI-encoded column name and whose object is the cell value.
    ///
    /// # Errors
    /// Returns the error produced by `add_triples_vec` if the triples cannot be added.
    ///
    /// # Panics
    /// * if a column's dtype cannot be converted by `polars_type_to_literal_type`,
    /// * if a column resolves to a multi-typed RDF node state (not implemented yet),
    /// * if one of the internal polars operations fails.
    pub fn map_df(
        &mut self,
        df: &DataFrame,
        named_graph: &NamedGraph,
    ) -> Result<(), TriplestoreError> {
        let col_names: Vec<_> = df.columns().iter().map(|c| c.name().to_string()).collect();

        // Resolve every column's RDF type up front, so an unsupported dtype fails before any
        // triples are prepared.
        let column_types: HashMap<_, _> = df
            .columns()
            .iter()
            .map(|c| {
                // TODO: report unsupported dtypes as a `TriplestoreError` instead of panicking.
                let dt = polars_type_to_literal_type(c.dtype())
                    .expect("column dtype should be convertible to an RDF literal type");
                (c.name().to_string(), dt)
            })
            .collect();

        // The row-id column gets a random name; presumably to avoid clashing with user columns.
        let id_col = uuid::Uuid::new_v4().to_string();
        let root_node_iri = fresh_maplib_iri();
        let row_iris: Vec<_> = (0..df.height())
            .into_par_iter()
            .map(|_| fresh_maplib_iri())
            .collect();

        let mut df = df.clone();
        df.with_column(Series::new(id_col.as_str().into(), row_iris).into_column())
            .expect("row IRI column should have exactly one entry per row");

        // One batch per data column plus the three structural batches pushed at the end.
        let mut triples_to_add = Vec::with_capacity(col_names.len() + 3);
        for c in &col_names {
            let obj_type = column_types
                .get(c)
                .expect("every column name was inserted into `column_types` above");
            if obj_type.is_multi() {
                todo!("multi-typed columns are not supported by map_df yet");
            }

            let subj_type = BaseRDFNodeType::IRI;
            let subj_state = subj_type.default_input_cat_state();
            let base_obj_type = obj_type
                .get_base_type()
                .expect("a non-multi column type should have a base type");
            let base_obj_state = obj_type
                .get_base_state()
                .expect("a non-multi column type should have a base state");

            triples_to_add.push(TriplesToAdd {
                // (row IRI, cell value) pairs for this column.
                df: df
                    .clone()
                    .lazy()
                    .select([
                        col(id_col.as_str()).alias(SUBJECT_COL_NAME),
                        col(c).alias(OBJECT_COL_NAME),
                    ])
                    .collect()
                    .expect("selecting existing columns should not fail"),
                subject_type: subj_type,
                object_type: base_obj_type.clone(),
                predicate: Some(NamedNode::new_unchecked(format!(
                    "{}{}",
                    DEFAULT_FACADE_X_DATA_PREFIX,
                    uri_encode::encode_uri(c)
                ))),
                graph: named_graph.clone(),
                subject_cat_state: subj_state,
                object_cat_state: base_obj_state.clone(),
                predicate_cat_state: None,
            });
        }

        // root --fx:child--> row
        let tta_children = TriplesToAdd {
            df: df
                .clone()
                .lazy()
                .select([
                    lit(root_node_iri.as_str()).alias(SUBJECT_COL_NAME),
                    col(id_col.as_str()).alias(OBJECT_COL_NAME),
                ])
                .collect()
                .expect("selecting the row IRI column should not fail"),
            subject_type: BaseRDFNodeType::IRI,
            object_type: BaseRDFNodeType::IRI,
            predicate: Some(NamedNode::new_unchecked(FACADE_X_CHILD)),
            graph: named_graph.clone(),
            subject_cat_state: BaseRDFNodeType::IRI.default_input_cat_state(),
            object_cat_state: BaseRDFNodeType::IRI.default_input_cat_state(),
            predicate_cat_state: None,
        };

        // row --fx:childNumber--> row index
        let num_dt = BaseRDFNodeType::Literal(xsd::UNSIGNED_INT.into_owned());
        let tta_child_num = TriplesToAdd {
            df: df
                .clone()
                .lazy()
                .select([col(id_col.as_str()).alias(SUBJECT_COL_NAME)])
                .with_row_index(OBJECT_COL_NAME.to_string(), None)
                .collect()
                .expect("adding a row index should not fail"),
            subject_type: BaseRDFNodeType::IRI,
            object_type: num_dt.clone(),
            predicate: Some(NamedNode::new_unchecked(FACADE_X_CHILD_NUMBER)),
            graph: named_graph.clone(),
            subject_cat_state: BaseRDFNodeType::IRI.default_input_cat_state(),
            object_cat_state: num_dt.default_input_cat_state(),
            predicate_cat_state: None,
        };

        // root --rdf:type--> fx:root (a single-row frame)
        let root_cols = vec![
            Column::new(SUBJECT_COL_NAME.into(), vec![root_node_iri]),
            Column::new(OBJECT_COL_NAME.into(), vec![FACADE_X_ROOT]),
        ];
        let root = TriplesToAdd {
            df: DataFrame::new(1, root_cols)
                .expect("two single-row columns should form a valid frame"),
            subject_type: BaseRDFNodeType::IRI,
            object_type: BaseRDFNodeType::IRI,
            predicate: Some(rdf::TYPE.into_owned()),
            graph: named_graph.clone(),
            subject_cat_state: BaseRDFNodeType::IRI.default_input_cat_state(),
            object_cat_state: BaseRDFNodeType::IRI.default_input_cat_state(),
            predicate_cat_state: None,
        };

        triples_to_add.extend([tta_children, tta_child_num, root]);
        self.add_triples_vec(triples_to_add, false)?;
        Ok(())
    }
}
18 changes: 18 additions & 0 deletions py_maplib/maplib/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,24 @@ class Model:
:return: The generated template
"""


def map_df(
    self,
    df: DataFrame,
    graph: str = None,
) -> None:
    """
    Map a DataFrame directly to triples, without writing a template first.
    A root node is created for the DataFrame, each row becomes a child node of that root,
    and each column value becomes a triple on the row node (Facade-X style).
    Usage:

    >>> df = pl.read_csv("my_csv.csv")
    >>> m.map_df(df)

    :param df: DataFrame to map using Facade-X (using approximately the CSV-mapping)
    :param graph: The IRI of the graph to add triples to. When None, the default graph is used.
    :return: None
    """

def explore(
self,
host: str = "localhost",
Expand Down
9 changes: 9 additions & 0 deletions py_maplib/src/mutexes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,15 @@ pub(crate) fn map_default_mutex(
Ok(format!("{tmpl}"))
}

/// Maps `df` into `graph` on an already locked model.
///
/// A failure from the model's `map_df` is converted through `PyMaplibError` into the
/// error type of `PyResult`.
pub fn map_df_mutex(
    inner: &mut MutexGuard<InnerModel>,
    df: DataFrame,
    graph: NamedGraph,
) -> PyResult<()> {
    match inner.map_df(&df, &graph) {
        Ok(_) => Ok(()),
        Err(cause) => Err(PyMaplibError::from(cause).into()),
    }
}

pub(crate) fn query_mutex(
inner: &mut MutexGuard<InnerModel>,
query: String,
Expand Down
15 changes: 14 additions & 1 deletion py_maplib/src/py_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::mutexes::{
map_default_mutex, map_json_mutex, map_mutex, map_triples_mutex, map_xml_mutex, query_mutex,
read_mutex, read_template_mutex, reads_mutex, size_mutex, truncate_graph_mutex, update_mutex,
validate_mutex, write_cim_xml_mutex, write_native_parquet_mutex, write_triples_mutex,
writes_mutex,
writes_mutex, map_df_mutex
};
use crate::shacl::{PyValidationReport, SHACL_RESULTS_QUERY};
use crate::{
Expand Down Expand Up @@ -241,6 +241,19 @@ impl PyModel {
})
}

// Python entry point for mapping a DataFrame without a template.
// `graph` is optional on the Python side (defaults to None).
#[pyo3(signature = (df, graph=None))]
#[instrument(skip_all)]
fn map_df(&self, py: Python<'_>, df: &Bound<'_, PyAny>, graph: Option<String>) -> PyResult<()> {
    // Convert the Python object first, while `py` is still usable; the second tuple
    // element returned by the conversion is not needed here.
    let (frame, _) = data_to_mappings_types(df, py)?;
    py.detach(move || -> PyResult<()> {
        let mut guard = self.inner.lock().unwrap();
        let graph_iri = parse_optional_named_node(graph)?;
        let target = NamedGraph::from_maybe_named_node(graph_iri.as_ref());
        map_df_mutex(&mut guard, frame, target)
    })
}


/// Starts a graph explorer session.
///
/// To run from Jupyter Notebook use:
Expand Down
102 changes: 102 additions & 0 deletions py_maplib/tests/test_map_df.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import pytest
from .disk import disk_params
from maplib import (
Model,
xsd,
RDFType,
)
import polars as pl
import pathlib

# Raise polars' display width for string values (presumably so long IRIs are readable in output).
pl.Config.set_fmt_str_lengths(300)

# Directory containing this test module.
PATH_HERE = pathlib.Path(__file__).parent
# NOTE(review): not referenced by any test in this file; the "jsons" directory looks carried
# over from another test module - confirm whether it can be removed.
TESTDATA_PATH = PATH_HERE / "testdata" / "jsons"

@pytest.mark.parametrize("disk", disk_params())
def test_map_df_easy(disk):
    """Mapping a two-row frame yields one root with two child rows that can be queried back.

    The `disk` parameter is supplied by the parametrization; the body does not read it.
    """
    query = """
SELECT * WHERE {
?root a fx:root .
?root fx:child ?row .
?row xyz:a ?a .
?row fx:childNumber ?ch_num .
BIND(IRI(CONCAT("urn:my_ns:", ?a)) AS ?a_uri)
}
"""
    frame = pl.DataFrame({"a": ["abc", "def"], "b": [1, 2]})
    model = Model()
    model.map_df(frame)
    result = model.query(query)
    # Two rows; five bound variables: root, row, a, ch_num, a_uri.
    assert result.shape == (2, 5)

@pytest.mark.parametrize("disk", disk_params())
def test_map_df_named_graph_arg(disk):
    """Triples mapped into a named graph are found when querying that same graph.

    The `disk` parameter is supplied by the parametrization; the body does not read it.
    """
    target_graph = "urn:maplib:test"
    query = """
SELECT * WHERE {
?root a fx:root .
?root fx:child ?row .
?row xyz:a ?a .
?row fx:childNumber ?ch_num .
BIND(IRI(CONCAT("urn:my_ns:", ?a)) AS ?a_uri)
}
"""
    frame = pl.DataFrame({"a": ["abc", "def"], "b": [1, 2]})
    model = Model()
    model.map_df(frame, target_graph)
    result = model.query(query, graph=target_graph)
    assert result.height == 2

def test_map_df_float():
    """A float column is mapped to literals typed as xsd:double."""
    df = pl.DataFrame(
        {
            "a": ["abc", "def", "ghi"],
            "b": [1.4, 4.2, 7.8],
        }
    )
    m = Model()
    m.map_df(df)
    # solution_mappings=True returns the result together with its RDF types.
    res = m.query(
        """
SELECT ?a ?c WHERE {?a xyz:b ?c} ORDER BY ?a ?c
""", solution_mappings=True
    )
    # The leftover debugging print of the result was removed; the assertion is the check.
    assert res.rdf_types["c"] == RDFType.Literal(xsd.double)

def test_map_df_datetime():
    """A datetime column is mapped to literals typed as xsd:dateTime."""
    timestamps = [
        "2019-05-15T00:00:00",
        "2020-02-02T00:00:00",
        "2007-07-07T00:00:00",
    ]
    # Build the frame with string timestamps, then parse them into a polars datetime column.
    frame = pl.DataFrame({"a": ["abc", "def", "ghi"], "b": timestamps}).with_columns(
        pl.col("b").str.to_datetime()
    )
    model = Model()
    model.map_df(frame)
    result = model.query(
        """
SELECT ?a ?c WHERE {?a xyz:b ?c} ORDER BY ?a ?c
""", solution_mappings=True
    )
    assert result.rdf_types["c"] == RDFType.Literal(xsd.dateTime)
Loading