Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ polars = { version = "0.52.0", features = ["simd", "nightly", "new_streaming", "
polars-core = { version = "0.52.0" }
rayon = "1.10.0"
regex = "1.11.1"
oxrdf = { version="0.2.4" }
oxrdf = { version = "0.2.4" }
oxrdfio = { version = "0.1.7" }
oxttl = { version="0.1.7" }
oxiri = "0.2.11"
Expand Down
22 changes: 20 additions & 2 deletions lib/maplib/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,16 @@ impl Model {
self.truncate_graph(&graph)
}
self.triplestore
.read_triples_from_path(p, rdf_format, base_iri, transient, parallel, checked, graph)
.read_triples_from_path(
p,
rdf_format,
base_iri,
transient,
parallel,
checked,
graph,
&self.prefixes,
)
.map_err(MaplibError::TriplestoreError)
}

Expand Down Expand Up @@ -217,7 +226,16 @@ impl Model {
self.truncate_graph(&graph)
}
self.triplestore
.read_triples_from_string(s, rdf_format, base_iri, transient, parallel, checked, graph)
.read_triples_from_string(
s,
rdf_format,
base_iri,
transient,
parallel,
checked,
graph,
&self.prefixes,
)
.map_err(MaplibError::TriplestoreError)
}

Expand Down
3 changes: 2 additions & 1 deletion lib/query_processing/src/cats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ pub fn create_compatible_cats(
if need_native_cat_cast {
let locals = find_all_locals(&t, &states);
let mut cats = Cats::new_empty(Some(global_cats.read().unwrap().deref()));
let renc_local = cats.merge(locals);
// We do not store local cats to disk
let renc_local = cats.merge(locals, None);
let renc_local: HashMap<_, _> = renc_local
.into_iter()
.map(|(uu, map)| {
Expand Down
10 changes: 7 additions & 3 deletions lib/query_processing/src/expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ pub fn named_node_local_enc(nn: &NamedNode, global_cats: &Cats) -> (Expr, Option
if let Some(enc) = global_cats.encode_iri_slice(&[nn.as_str()]).pop().unwrap() {
(lit(enc).cast(DataType::UInt32), None)
} else {
let (enc, local) = Cats::new_singular_iri(nn.as_str(), global_cats.get_iri_counter());
// We do not store these local cats to disk
let (enc, local) = Cats::new_singular_iri(nn.as_str(), global_cats.get_iri_counter(), None);
(lit(enc).cast(DataType::UInt32), Some(local))
}
}
Expand All @@ -46,7 +47,9 @@ pub fn blank_node_local_enc(bl: &BlankNode, global_cats: &Cats) -> (Expr, Option
if let Some(enc) = global_cats.encode_blanks(&[bl.as_str()]).pop().unwrap() {
(lit(enc).cast(DataType::UInt32), None)
} else {
let (enc, local) = Cats::new_singular_blank(bl.as_str(), global_cats.get_iri_counter());
// Local cats not stored to disk, hence None path
let (enc, local) =
Cats::new_singular_blank(bl.as_str(), global_cats.get_iri_counter(), None);
(lit(enc).cast(DataType::UInt32), Some(local))
}
}
Expand All @@ -67,7 +70,8 @@ pub fn maybe_literal_enc(l: &Literal, global_cats: &Cats) -> (Expr, BaseRDFNodeT
} else {
let dt = l.datatype().into_owned();
let offset = global_cats.get_literal_counter(&dt);
let (enc, local) = Cats::new_singular_literal(l.value(), dt, offset);
// Local cats are not stored to disk, hence None
let (enc, local) = Cats::new_singular_literal(l.value(), dt, offset, None);
(
lit(enc).cast(DataType::UInt32),
bt,
Expand Down
90 changes: 53 additions & 37 deletions lib/representation/src/cats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod decode;
mod encode;
mod globalize;
mod image;
pub mod maps;
mod re_encode;

pub use decode::*;
Expand All @@ -11,14 +12,14 @@ pub use image::*;
pub use re_encode::*;
use std::cmp;

use crate::cats::maps::CatMaps;
use crate::BaseRDFNodeType;
use nohash_hasher::NoHashHasher;
use oxrdf::vocab::xsd;
use oxrdf::{NamedNode, NamedNodeRef};
use polars::prelude::DataFrame;
use std::collections::{BTreeMap, HashMap};
use std::hash::BuildHasherDefault;
use std::collections::HashMap;
use std::ops::Deref;
use std::path::Path;
use std::sync::Arc;
use std::sync::RwLock;
use uuid::Uuid;
Expand All @@ -29,7 +30,6 @@ pub const SUBJECT_RANK_COL_NAME: &str = "subject_rank";
pub fn literal_is_cat(nn: NamedNodeRef) -> bool {
nn == xsd::STRING
}
type RevMap = HashMap<u32, Arc<String>, BuildHasherDefault<NoHashHasher<u32>>>;

#[derive(Debug)]
pub struct CatTriples {
Expand Down Expand Up @@ -79,9 +79,37 @@ pub struct EncodedTriples {

#[derive(Debug, Clone)]
pub struct CatEncs {
// We use a BTree map to keep strings sorted
pub map: BTreeMap<Arc<String>, u32>,
pub rev_map: RevMap,
pub maps: CatMaps,
}

impl CatEncs {
    /// Builds a new `CatEncs` from `other` by delegating to `CatMaps::new_remap`.
    ///
    /// `path` and the counter `c` are forwarded unchanged to `CatMaps::new_remap`
    /// together with a borrow of `other.maps`. The maps it returns are wrapped in a
    /// fresh `CatEncs`, which is handed back alongside the accompanying `CatReEnc`.
    pub(crate) fn new_remap(
        other: &CatEncs,
        path: Option<&Path>,
        c: &mut u32,
    ) -> (CatEncs, CatReEnc) {
        let (maps, re_enc) = CatMaps::new_remap(&other.maps, path, c);
        (CatEncs { maps }, re_enc)
    }
}

// Thin forwarding methods: each one delegates directly to the inner `CatMaps`
// stored in `self.maps`.
impl CatEncs {
/// Forwards to `CatMaps::contains_str` on `self.maps` and returns its result.
pub fn contains_str(&self, s: &str) -> bool {
self.maps.contains_str(s)
}

/// Forwards to `CatMaps::contains_u32` on `self.maps` and returns its result.
pub fn contains_u32(&self, u: &u32) -> bool {
self.maps.contains_u32(u)
}

/// Forwards to `CatMaps::is_empty` on `self.maps` and returns its result.
pub(crate) fn is_empty(&self) -> bool {
self.maps.is_empty()
}

/// Merges `other.maps` into `self.maps` via `CatMaps::merge`, passing the
/// counter `c` through, and returns the `CatReEnc` produced by that call.
pub fn merge(&mut self, other: &CatEncs, c: &mut u32) -> CatReEnc {
self.maps.merge(&other.maps, c)
}
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -148,23 +176,6 @@ impl From<Arc<RwLock<Cats>>> for LockedCats {
}
}

/// Borrowed view over a `RevMap`, used to look up strings by their `u32` key.
pub struct ReverseLookup<'a> {
    rev_map: &'a RevMap,
}

impl ReverseLookup<'_> {
    /// Wraps a borrowed `RevMap`; the returned value borrows it for `'a`.
    pub fn new<'a>(rev_map: &'a RevMap) -> ReverseLookup<'a> {
        ReverseLookup { rev_map }
    }

    /// Returns the string stored under key `u`, or `None` when the map has no
    /// entry for it.
    pub fn lookup(&self, u: &u32) -> Option<&str> {
        self.rev_map.get(u).map(|entry| entry.as_str())
    }
}

#[derive(Debug, Clone)]
pub struct Cats {
pub cat_map: HashMap<CatType, CatEncs>,
Expand All @@ -175,29 +186,34 @@ pub struct Cats {
}

impl Cats {
pub fn new_singular_literal(l: &str, dt: NamedNode, u: u32) -> (u32, Cats) {
pub fn new_singular_literal(
l: &str,
dt: NamedNode,
u: u32,
path: Option<&Path>,
) -> (u32, Cats) {
let t = CatType::Literal(dt);
let catenc = CatEncs::new_singular(l, u);
let catenc = CatEncs::new_singular(l, u, path);
(u, Cats::from_map(HashMap::from([(t, catenc)])))
}

pub fn new_singular_blank(s: &str, u: u32) -> (u32, Cats) {
pub fn new_singular_blank(s: &str, u: u32, path: Option<&Path>) -> (u32, Cats) {
let t = CatType::Blank;
let catenc = CatEncs::new_singular(s, u);
let catenc = CatEncs::new_singular(s, u, path);
(u, Cats::from_map(HashMap::from([(t, catenc)])))
}

pub fn new_singular_iri(s: &str, u: u32) -> (u32, Cats) {
pub fn new_singular_iri(s: &str, u: u32, path: Option<&Path>) -> (u32, Cats) {
let t = CatType::IRI;
let catenc = CatEncs::new_singular(s, u);
let catenc = CatEncs::new_singular(s, u, path);
(u, Cats::from_map(HashMap::from([(t, catenc)])))
}
}

impl Cats {
pub fn get_reverse_lookup(&self, bt: &BaseRDFNodeType) -> ReverseLookup<'_> {
pub fn get_cat_encs(&self, bt: &BaseRDFNodeType) -> &CatEncs {
let ct = CatType::from_base_rdf_node_type(bt);
ReverseLookup::new(&self.cat_map.get(&ct).unwrap().rev_map)
self.cat_map.get(&ct).unwrap()
}

pub(crate) fn from_map(cat_map: HashMap<CatType, CatEncs>) -> Self {
Expand Down Expand Up @@ -239,15 +255,15 @@ impl Cats {
fn calc_new_blank_counter(&self) -> u32 {
let mut counter = 0;
if let Some(enc) = self.cat_map.get(&CatType::Blank) {
counter = cmp::max(enc.rev_map.keys().max().unwrap() + 1, counter);
counter = cmp::max(enc.counter() + 1, counter);
}
counter
}

fn calc_new_iri_counter(&self) -> u32 {
let mut counter = 0;
if let Some(enc) = self.cat_map.get(&CatType::IRI) {
counter = cmp::max(enc.rev_map.keys().max().unwrap() + 1, counter);
counter = cmp::max(enc.counter() + 1, counter);
}
counter
}
Expand All @@ -256,15 +272,15 @@ impl Cats {
let mut map = HashMap::new();
for (p, cat) in &self.cat_map {
if let CatType::Literal(nn) = p {
let counter = cat.rev_map.keys().max().unwrap() + 1;
let counter = cat.counter() + 1;
map.insert(nn.clone(), counter);
}
}
map
}

fn get_counter(&self, c: &CatType) -> u32 {
match c {
fn get_counter(&self, t: &CatType) -> u32 {
match t {
CatType::IRI => self.get_iri_counter(),
CatType::Blank => self.get_blank_counter(),
CatType::Literal(nn) => self.get_literal_counter(nn),
Expand Down
20 changes: 5 additions & 15 deletions lib/representation/src/cats/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,20 @@ use polars::frame::column::ScalarColumn;
use polars::frame::DataFrame;
use polars::prelude::{col, Column, Expr, IntoColumn, IntoLazy, NamedFrom, SeriesSealed};
use polars::series::Series;
use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use std::borrow::Cow;

impl CatEncs {
pub fn decode(&self, ser: &Series) -> Series {
pub fn decode_series(&self, ser: &Series) -> Series {
let original_name = ser.name().clone();
let uch: Vec<_> = ser.u32().unwrap().iter().collect();
let decoded_vec_iter = uch
.into_par_iter()
.map(|x| x.map(|x| self.rev_map.get(&x).unwrap()));

let decoded_vec: Vec<_> = decoded_vec_iter.map(|x| x.map(|x| x.as_str())).collect();
let decoded_vec = self.maps.decode_batch(uch.as_ref());
let new_ser = Series::new(original_name, decoded_vec);
//
// let mut new_ser = if let CatType::Prefix(pre) = cat_type {
// Series::from_iter(decoded_ser.map(|x| x.map(|x| format!("{}{}", pre.as_str(), x))))
// } else {
// Series::from_iter(decoded_ser.map(|x| x.map(|x| x.as_str())))
// };
new_ser
}

pub fn maybe_decode_string(&self, u: &u32) -> Option<&str> {
self.rev_map.get(u).map(|x| x.as_str())
self.maps.maybe_decode(u)
}
}

Expand Down Expand Up @@ -68,7 +58,7 @@ impl Cats {
pub fn decode_of_type(&self, ser: &Series, bt: &BaseRDFNodeType) -> Series {
let ct = CatType::from_base_rdf_node_type(bt);
if let Some(enc) = self.cat_map.get(&ct) {
enc.decode(ser)
enc.decode_series(ser)
} else {
unreachable!("Should never be called when type does not exist")
}
Expand Down
Loading