diff --git a/Cargo.lock b/Cargo.lock index f37e40aa..595a1297 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2451,7 +2451,7 @@ dependencies = [ [[package]] name = "py_maplib" -version = "0.18.18" +version = "0.18.17" dependencies = [ "chrono", "cimxml", diff --git a/Cargo.toml b/Cargo.toml index f7d746eb..d9b01642 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ polars = { version = "0.52.0", features = ["simd", "nightly", "new_streaming", " polars-core = { version = "0.52.0" } rayon = "1.10.0" regex = "1.11.1" -oxrdf = { version="0.2.4" } +oxrdf = { version = "0.2.4" } oxrdfio = { version = "0.1.7" } oxttl = { version="0.1.7" } oxiri = "0.2.11" diff --git a/lib/maplib/src/model.rs b/lib/maplib/src/model.rs index 4ef2cfcb..4938182f 100644 --- a/lib/maplib/src/model.rs +++ b/lib/maplib/src/model.rs @@ -184,7 +184,16 @@ impl Model { self.truncate_graph(&graph) } self.triplestore - .read_triples_from_path(p, rdf_format, base_iri, transient, parallel, checked, graph) + .read_triples_from_path( + p, + rdf_format, + base_iri, + transient, + parallel, + checked, + graph, + &self.prefixes, + ) .map_err(MaplibError::TriplestoreError) } @@ -217,7 +226,16 @@ impl Model { self.truncate_graph(&graph) } self.triplestore - .read_triples_from_string(s, rdf_format, base_iri, transient, parallel, checked, graph) + .read_triples_from_string( + s, + rdf_format, + base_iri, + transient, + parallel, + checked, + graph, + &self.prefixes, + ) .map_err(MaplibError::TriplestoreError) } diff --git a/lib/query_processing/src/cats.rs b/lib/query_processing/src/cats.rs index 160eaaa4..ef52d8e4 100644 --- a/lib/query_processing/src/cats.rs +++ b/lib/query_processing/src/cats.rs @@ -29,7 +29,8 @@ pub fn create_compatible_cats( if need_native_cat_cast { let locals = find_all_locals(&t, &states); let mut cats = Cats::new_empty(Some(global_cats.read().unwrap().deref())); - let renc_local = cats.merge(locals); + // We do not store local cats to disk + let renc_local = cats.merge(locals, None); let renc_local: HashMap<_, _> = renc_local .into_iter() .map(|(uu, map)| { diff --git a/lib/query_processing/src/expressions.rs b/lib/query_processing/src/expressions.rs index fa8332a9..1578e26f 100644 --- a/lib/query_processing/src/expressions.rs +++ b/lib/query_processing/src/expressions.rs @@ -32,7 +32,8 @@ pub fn named_node_local_enc(nn: &NamedNode, global_cats: &Cats) -> (Expr, Option if let Some(enc) = global_cats.encode_iri_slice(&[nn.as_str()]).pop().unwrap() { (lit(enc).cast(DataType::UInt32), None) } else { - let (enc, local) = Cats::new_singular_iri(nn.as_str(), global_cats.get_iri_counter()); + // We do not store these local cats to disk + let (enc, local) = Cats::new_singular_iri(nn.as_str(), global_cats.get_iri_counter(), None); (lit(enc).cast(DataType::UInt32), Some(local)) } } @@ -46,7 +47,9 @@ pub fn blank_node_local_enc(bl: &BlankNode, global_cats: &Cats) -> (Expr, Option if let Some(enc) = global_cats.encode_blanks(&[bl.as_str()]).pop().unwrap() { (lit(enc).cast(DataType::UInt32), None) } else { - let (enc, local) = Cats::new_singular_blank(bl.as_str(), global_cats.get_iri_counter()); + // Local cats not stored to disk, hence None path + let (enc, local) = + Cats::new_singular_blank(bl.as_str(), global_cats.get_iri_counter(), None); (lit(enc).cast(DataType::UInt32), Some(local)) } } @@ -67,7 +70,8 @@ pub fn maybe_literal_enc(l: &Literal, global_cats: &Cats) -> (Expr, BaseRDFNodeT } else { let dt = l.datatype().into_owned(); let offset = global_cats.get_literal_counter(&dt); - let (enc, local) = Cats::new_singular_literal(l.value(), dt, offset); + // Local cats are not stored to disk, hence None + let (enc, local) = Cats::new_singular_literal(l.value(), dt, offset, None); ( lit(enc).cast(DataType::UInt32), bt, diff --git a/lib/representation/src/cats.rs b/lib/representation/src/cats.rs index b884358d..0df737cc 100644 --- a/lib/representation/src/cats.rs +++ b/lib/representation/src/cats.rs @@ -2,6 +2,7 @@ mod decode; mod encode; mod globalize; mod image; +pub mod maps; mod re_encode; pub use decode::*; @@ -11,14 +12,14 @@ pub use image::*; pub use re_encode::*; use std::cmp; +use crate::cats::maps::CatMaps; use crate::BaseRDFNodeType; -use nohash_hasher::NoHashHasher; use oxrdf::vocab::xsd; use oxrdf::{NamedNode, NamedNodeRef}; use polars::prelude::DataFrame; -use std::collections::{BTreeMap, HashMap}; -use std::hash::BuildHasherDefault; +use std::collections::HashMap; use std::ops::Deref; +use std::path::Path; use std::sync::Arc; use std::sync::RwLock; use uuid::Uuid; @@ -29,7 +30,6 @@ pub const SUBJECT_RANK_COL_NAME: &str = "subject_rank"; pub fn literal_is_cat(nn: NamedNodeRef) -> bool { nn == xsd::STRING } -type RevMap = HashMap, BuildHasherDefault>>; #[derive(Debug)] pub struct CatTriples { @@ -79,9 +79,37 @@ pub struct EncodedTriples { #[derive(Debug, Clone)] pub struct CatEncs { - // We use a BTree map to keep strings sorted - pub map: BTreeMap, u32>, - pub rev_map: RevMap, + pub maps: CatMaps, +} + +impl CatEncs { + pub(crate) fn new_remap( + other: &CatEncs, + path: Option<&Path>, + c: &mut u32, + ) -> (CatEncs, CatReEnc) { + let (maps, remap) = CatMaps::new_remap(&other.maps, path, c); + let encs = CatEncs { maps }; + (encs, remap) + } +} + +impl CatEncs { + pub fn contains_str(&self, s: &str) -> bool { + self.maps.contains_str(s) + } + + pub fn contains_u32(&self, u: &u32) -> bool { + self.maps.contains_u32(u) + } + + pub(crate) fn is_empty(&self) -> bool { + self.maps.is_empty() + } + + pub fn merge(&mut self, other: &CatEncs, c: &mut u32) -> CatReEnc { + self.maps.merge(&other.maps, c) + } } #[derive(Debug, Clone)] @@ -148,23 +176,6 @@ impl From>> for LockedCats { } } -pub struct ReverseLookup<'a> { - rev_map: &'a RevMap, -} - -impl<'a> ReverseLookup<'a> {} - -impl ReverseLookup<'_> { - pub fn new<'a>(rev_map: &'a RevMap) -> ReverseLookup<'a> { - ReverseLookup { rev_map } - } - - pub fn lookup(&self, u: &u32) -> Option<&str> { - let s = self.rev_map.get(u); - s.map(|x| x.as_str()) - } -} - #[derive(Debug, Clone)] pub struct Cats { pub cat_map: HashMap, @@ -175,29 +186,34 @@ pub struct Cats { } impl Cats { - pub fn new_singular_literal(l: &str, dt: NamedNode, u: u32) -> (u32, Cats) { + pub fn new_singular_literal( + l: &str, + dt: NamedNode, + u: u32, + path: Option<&Path>, + ) -> (u32, Cats) { let t = CatType::Literal(dt); - let catenc = CatEncs::new_singular(l, u); + let catenc = CatEncs::new_singular(l, u, path); (u, Cats::from_map(HashMap::from([(t, catenc)]))) } - pub fn new_singular_blank(s: &str, u: u32) -> (u32, Cats) { + pub fn new_singular_blank(s: &str, u: u32, path: Option<&Path>) -> (u32, Cats) { let t = CatType::Blank; - let catenc = CatEncs::new_singular(s, u); + let catenc = CatEncs::new_singular(s, u, path); (u, Cats::from_map(HashMap::from([(t, catenc)]))) } - pub fn new_singular_iri(s: &str, u: u32) -> (u32, Cats) { + pub fn new_singular_iri(s: &str, u: u32, path: Option<&Path>) -> (u32, Cats) { let t = CatType::IRI; - let catenc = CatEncs::new_singular(s, u); + let catenc = CatEncs::new_singular(s, u, path); (u, Cats::from_map(HashMap::from([(t, catenc)]))) } } impl Cats { - pub fn get_reverse_lookup(&self, bt: &BaseRDFNodeType) -> ReverseLookup<'_> { + pub fn get_cat_encs(&self, bt: &BaseRDFNodeType) -> &CatEncs { let ct = CatType::from_base_rdf_node_type(bt); - ReverseLookup::new(&self.cat_map.get(&ct).unwrap().rev_map) + self.cat_map.get(&ct).unwrap() } pub(crate) fn from_map(cat_map: HashMap) -> Self { @@ -239,7 +255,7 @@ impl Cats { fn calc_new_blank_counter(&self) -> u32 { let mut counter = 0; if let Some(enc) = self.cat_map.get(&CatType::Blank) { - counter = cmp::max(enc.rev_map.keys().max().unwrap() + 1, counter); + counter = cmp::max(enc.counter() + 1, counter); } counter } @@ -247,7 +263,7 @@ impl Cats { fn calc_new_iri_counter(&self) -> u32 { let mut counter = 0; if let Some(enc) = self.cat_map.get(&CatType::IRI) { - counter = cmp::max(enc.rev_map.keys().max().unwrap() + 1, counter); + counter = cmp::max(enc.counter() + 1, counter); } counter } @@ -256,15 +272,15 @@ impl Cats { let mut map = HashMap::new(); for (p, cat) in &self.cat_map { if let CatType::Literal(nn) = p { - let counter = cat.rev_map.keys().max().unwrap() + 1; + let counter = cat.counter() + 1; map.insert(nn.clone(), counter); } } map } - fn get_counter(&self, c: &CatType) -> u32 { - match c { + fn get_counter(&self, t: &CatType) -> u32 { + match t { CatType::IRI => self.get_iri_counter(), CatType::Blank => self.get_blank_counter(), CatType::Literal(nn) => self.get_literal_counter(nn), diff --git a/lib/representation/src/cats/decode.rs b/lib/representation/src/cats/decode.rs index b773ffc6..431145ae 100644 --- a/lib/representation/src/cats/decode.rs +++ b/lib/representation/src/cats/decode.rs @@ -8,30 +8,20 @@ use polars::frame::column::ScalarColumn; use polars::frame::DataFrame; use polars::prelude::{col, Column, Expr, IntoColumn, IntoLazy, NamedFrom, SeriesSealed}; use polars::series::Series; -use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator}; +use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use std::borrow::Cow; impl CatEncs { - pub fn decode(&self, ser: &Series) -> Series { + pub fn decode_series(&self, ser: &Series) -> Series { let original_name = ser.name().clone(); let uch: Vec<_> = ser.u32().unwrap().iter().collect(); - let decoded_vec_iter = uch - .into_par_iter() - .map(|x| x.map(|x| self.rev_map.get(&x).unwrap())); - - let decoded_vec: Vec<_> = decoded_vec_iter.map(|x| x.map(|x| x.as_str())).collect(); + let decoded_vec = self.maps.decode_batch(uch.as_ref()); let new_ser = Series::new(original_name, decoded_vec); - // - // let mut new_ser = if let CatType::Prefix(pre) = cat_type { - // Series::from_iter(decoded_ser.map(|x| x.map(|x| format!("{}{}", pre.as_str(), x)))) - // } else { - // Series::from_iter(decoded_ser.map(|x| x.map(|x| x.as_str()))) - // }; new_ser } pub fn maybe_decode_string(&self, u: &u32) -> Option<&str> { - self.rev_map.get(u).map(|x| x.as_str()) + self.maps.maybe_decode(u) } } @@ -68,7 +58,7 @@ impl Cats { pub fn decode_of_type(&self, ser: &Series, bt: &BaseRDFNodeType) -> Series { let ct = CatType::from_base_rdf_node_type(bt); if let Some(enc) = self.cat_map.get(&ct) { - enc.decode(ser) + enc.decode_series(ser) } else { unreachable!("Should never be called when type does not exist") } diff --git a/lib/representation/src/cats/encode.rs b/lib/representation/src/cats/encode.rs index 9298c6ee..67f3b398 100644 --- a/lib/representation/src/cats/encode.rs +++ b/lib/representation/src/cats/encode.rs @@ -1,4 +1,5 @@ use super::{CatEncs, CatType, Cats, EncodedTriples}; +use crate::cats::maps::CatMaps; use crate::cats::LockedCats; use crate::solution_mapping::{BaseCatState, EagerSolutionMappings}; use crate::{BaseRDFNodeType, RDFNodeState, OBJECT_COL_NAME, SUBJECT_COL_NAME}; @@ -6,34 +7,22 @@ use oxrdf::NamedNode; use polars::frame::DataFrame; use polars::prelude::{col, lit, IntoLazy, Series}; use std::collections::HashMap; -use std::hash::BuildHasherDefault; -use std::sync::Arc; +use std::path::Path; impl CatEncs { - pub fn new_empty() -> CatEncs { - let rev_map = HashMap::with_capacity_and_hasher(2, BuildHasherDefault::default()); + pub fn new_empty(path: Option<&Path>) -> CatEncs { CatEncs { - map: Default::default(), - rev_map, + maps: CatMaps::new_empty(path), } } - pub fn contains_key(&self, s: &str) -> bool { - let s = s.to_string(); - self.map.contains_key(&s) - } - - pub fn new_singular(value: &str, u: u32) -> CatEncs { - let mut sing = Self::new_empty(); - let s = Arc::new(value.to_string()); - sing.map.insert(s.clone(), u); - sing.rev_map.insert(u, s); - sing + pub fn new_singular(value: &str, u: u32, path: Option<&Path>) -> CatEncs { + let maps = CatMaps::new_singular(value, u, path); + CatEncs { maps } } pub fn maybe_encode_str(&self, s: &str) -> Option<&u32> { - let s = Arc::new(s.to_string()); - self.map.get(&s) + self.maps.maybe_encode_string(s) } pub fn encode_new_str(&mut self, s: &str, u: u32) { @@ -41,23 +30,20 @@ impl CatEncs { } pub fn encode_new_string(&mut self, s: String, u: u32) { - let s = Arc::new(s.clone()); - self.map.insert(s.clone(), u); - self.rev_map.insert(u, s); - } - - pub fn encode_new_arc_string(&mut self, s: Arc, u: u32) { - self.map.insert(s.clone(), u); - self.rev_map.insert(u, s); + self.maps.encode_new_string(s, u); } - pub fn height(&self) -> u32 { - self.map.len() as u32 + pub fn counter(&self) -> u32 { + self.maps.counter() } } impl Cats { - pub fn encode_solution_mappings(&self, sm: EagerSolutionMappings) -> EagerSolutionMappings { + pub fn encode_solution_mappings( + &self, + sm: EagerSolutionMappings, + path: Option<&Path>, + ) -> EagerSolutionMappings { let EagerSolutionMappings { mappings, mut rdf_node_types, @@ -87,7 +73,7 @@ impl Cats { let encoded: Vec<_> = to_encode .into_iter() .map(|(c, t, ser)| { - let (enc, local) = self.encode_series(&ser, &t); + let (enc, local) = self.encode_series(&ser, &t, path); (c, t, enc, local) }) .collect(); @@ -109,7 +95,12 @@ impl Cats { EagerSolutionMappings::new(mappings.collect().unwrap(), rdf_node_types) } - pub fn encode_series(&self, series: &Series, t: &BaseRDFNodeType) -> (Series, Option) { + pub fn encode_series( + &self, + series: &Series, + t: &BaseRDFNodeType, + path: Option<&Path>, + ) -> (Series, Option) { let original_name = series.name().clone(); let mut use_height = match t { BaseRDFNodeType::IRI => self.iri_counter, @@ -121,7 +112,7 @@ impl Cats { }; let enc = self.get_encs(t).pop(); - let mut new_enc = CatEncs::new_empty(); + let mut new_enc = CatEncs::new_empty(path); let strch = series.str().unwrap(); let encoded_global: Vec<_> = strch .iter() @@ -158,7 +149,7 @@ impl Cats { }; encoded_global_local.push(encoded); } - let local = if !new_enc.map.is_empty() { + let local = if !new_enc.is_empty() { let cat_type = CatType::from_base_rdf_node_type(t); let mut map = HashMap::new(); map.insert(cat_type, new_enc); @@ -212,7 +203,7 @@ impl Cats { u32s } - pub fn encode_iri_or_local_cat(&self, iri: &str) -> (u32, RDFNodeState) { + pub fn encode_iri_or_local_cat(&self, iri: &str, path: Option<&Path>) -> (u32, RDFNodeState) { let mut v = self.encode_iri_slice(&[iri]); if let Some(u) = v.pop().unwrap() { ( @@ -223,7 +214,7 @@ impl Cats { ), ) } else { - let (u, l) = Cats::new_singular_iri(iri, self.iri_counter); + let (u, l) = Cats::new_singular_iri(iri, self.iri_counter, path); ( u, RDFNodeState::from_bases( @@ -234,7 +225,11 @@ impl Cats { } } - pub fn encode_blank_or_local_cat(&self, blank: &str) -> (u32, RDFNodeState) { + pub fn encode_blank_or_local_cat( + &self, + blank: &str, + path: Option<&Path>, + ) -> (u32, RDFNodeState) { let mut v = self.encode_blanks(&[blank]); if let Some(u) = v.pop().unwrap() { ( @@ -245,7 +240,7 @@ impl Cats { ), ) } else { - let (u, l) = Cats::new_singular_blank(blank, self.blank_counter); + let (u, l) = Cats::new_singular_blank(blank, self.blank_counter, path); ( u, RDFNodeState::from_bases( @@ -264,6 +259,7 @@ pub fn encode_triples( subject_cat_state: BaseCatState, object_cat_state: BaseCatState, global_cats: &Cats, + path: Option<&Path>, ) -> (Vec, EncodedTriples) { let mut map = HashMap::new(); map.insert( @@ -274,7 +270,7 @@ pub fn encode_triples( OBJECT_COL_NAME.to_string(), RDFNodeState::from_bases(object_type.clone(), object_cat_state), ); - let mut sm = global_cats.encode_solution_mappings(EagerSolutionMappings::new(df, map)); + let mut sm = global_cats.encode_solution_mappings(EagerSolutionMappings::new(df, map), path); let mut subject_state = sm.rdf_node_types.remove(SUBJECT_COL_NAME).unwrap(); let mut object_state = sm.rdf_node_types.remove(OBJECT_COL_NAME).unwrap(); diff --git a/lib/representation/src/cats/globalize.rs b/lib/representation/src/cats/globalize.rs index 0bccd89a..e2e8cdbb 100644 --- a/lib/representation/src/cats/globalize.rs +++ b/lib/representation/src/cats/globalize.rs @@ -3,32 +3,35 @@ use crate::solution_mapping::BaseCatState; use crate::BaseRDFNodeType; use oxrdf::NamedNode; use polars::frame::DataFrame; -use std::sync::Arc; +use std::path::Path; use tracing::instrument; impl Cats { - pub fn globalize(&mut self, mut cat_triples: Vec) -> Vec { + pub fn globalize( + &mut self, + mut cat_triples: Vec, + path: Option<&Path>, + ) -> Vec { let local_cats: Vec<_> = cat_triples .iter_mut() .map(|x| x.local_cats.drain(..)) .flatten() .collect(); - let re_enc_map = self.merge(local_cats); + let re_enc_map = self.merge(local_cats, path); let global_cat_triples = re_encode(cat_triples, re_enc_map); global_cat_triples } - pub fn encode_predicates(&mut self, cat_triples: &Vec) { + pub fn encode_predicates(&mut self, cat_triples: &Vec, path: Option<&Path>) { for ct in cat_triples { let t = CatType::IRI; if !self.cat_map.contains_key(&t) { - self.cat_map.insert(t.clone(), CatEncs::new_empty()); + self.cat_map.insert(t.clone(), CatEncs::new_empty(path)); } let enc = self.cat_map.get_mut(&t).unwrap(); let pred = ct.predicate.as_str().to_string(); - if !enc.map.contains_key(&pred) { - let arc_pred = Arc::new(pred); - enc.encode_new_arc_string(arc_pred.clone(), self.iri_counter); + if !enc.maps.contains_str(&pred) { + enc.encode_new_string(pred, self.iri_counter); self.iri_counter += 1; } } @@ -44,6 +47,7 @@ pub fn cat_encode_triples( subject_cat_state: BaseCatState, object_cat_state: BaseCatState, global_cats: &Cats, + path: Option<&Path>, ) -> CatTriples { let (local_cats, encoded_triples) = encode_triples( df, @@ -52,6 +56,7 @@ pub fn cat_encode_triples( subject_cat_state, object_cat_state, global_cats, + path, ); CatTriples { diff --git a/lib/representation/src/cats/image.rs b/lib/representation/src/cats/image.rs index 7d7811c0..833735bd 100644 --- a/lib/representation/src/cats/image.rs +++ b/lib/representation/src/cats/image.rs @@ -4,10 +4,10 @@ use crate::cats::LockedCats; use crate::solution_mapping::{BaseCatState, EagerSolutionMappings}; use crate::{BaseRDFNodeType, RDFNodeState}; use nohash_hasher::NoHashHasher; -use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{HashMap, HashSet}; use std::hash::BuildHasherDefault; use std::ops::Deref; +use std::path::Path; use std::sync::Arc; impl Cats { @@ -43,7 +43,7 @@ impl Cats { let local_read_ref = local_read.as_ref().map(|x| x.deref()); let local_rev_map = if let Some(local) = local_read_ref { - Some(local.get_reverse_lookup(bt)) + Some(local.get_cat_encs(bt)) } else { None }; @@ -55,7 +55,7 @@ impl Cats { .filter(|x| x.is_some()) .map(|x| x.unwrap()); if let Some(local_rev_map) = local_rev_map { - ss.extend(new_ss.filter(|x| !local_rev_map.rev_map.contains_key(x))); + ss.extend(new_ss.filter(|x| !local_rev_map.contains_u32(x))); } else { ss.extend(new_ss); } @@ -70,6 +70,7 @@ impl Cats { pub fn merge_solution_mappings_locals( &mut self, sms: &Vec<&EagerSolutionMappings>, + path: Option<&Path>, ) -> HashMap> { let mut local_cats = vec![]; @@ -86,7 +87,7 @@ impl Cats { } } } - let remap = self.merge(local_cats); + let remap = self.merge(local_cats, path); let mut concat_reenc: HashMap< String, HashMap>>>, @@ -145,27 +146,8 @@ impl Cats { impl CatEncs { pub fn image(&self, s: &HashSet) -> Option { - let new_map: BTreeMap<_, _> = s - .par_iter() - .map(|x| { - if let Some(s) = self.rev_map.get(x) { - Some((*x, s.clone())) - } else { - None - } - }) - .filter(|x| x.is_some()) - .map(|x| x.unwrap()) - .map(|(x, y)| (y, x)) - .collect(); - let new_rev_map: HashMap<_, _, BuildHasherDefault>> = - new_map.iter().map(|(x, y)| (*y, x.clone())).collect(); - - if new_map.len() > 0 { - Some(CatEncs { - map: new_map, - rev_map: new_rev_map, - }) + if let Some(maps) = self.maps.image(s) { + Some(CatEncs { maps }) } else { None } @@ -175,10 +157,11 @@ impl CatEncs { pub fn new_solution_mapping_cats( sms: Vec, global_cats: &Cats, + path: Option<&Path>, ) -> (Vec, Cats) { let smsref: Vec<_> = sms.iter().collect(); let mut cats = global_cats.mappings_cat_image(&smsref); - let reenc = cats.merge_solution_mappings_locals(&smsref); + let reenc = cats.merge_solution_mappings_locals(&smsref, path); let new_sms: Vec<_> = sms .into_iter() .map(|sm| reencode_solution_mappings(sm, &reenc)) diff --git a/lib/representation/src/cats/maps.rs b/lib/representation/src/cats/maps.rs new file mode 100644 index 00000000..f98ae264 --- /dev/null +++ b/lib/representation/src/cats/maps.rs @@ -0,0 +1,172 @@ +pub mod in_memory; +pub mod on_disk; + +use crate::cats::maps::in_memory::CatMapsInMemory; +use crate::cats::maps::on_disk::CatMapsOnDisk; +use crate::cats::CatReEnc; +use std::collections::HashSet; +use std::path::Path; +use std::sync::Arc; + +#[derive(Debug, Clone)] +pub enum CatMaps { + InMemory(CatMapsInMemory), + OnDisk(CatMapsOnDisk), +} + +impl CatMaps { + pub(crate) fn new_remap( + maps: &CatMaps, + path: Option<&Path>, + c: &mut u32, + ) -> (CatMaps, CatReEnc) { + if let Some(path) = path { + todo!() + } else { + if let CatMaps::InMemory(mem) = maps { + let (mem, re_enc) = CatMapsInMemory::new_remap(mem, c); + (CatMaps::InMemory(mem), re_enc) + } else { + unreachable!("Should never happen") + } + } + } + + pub(crate) fn inner_join_re_enc(&self, other: &CatMaps) -> Vec<(u32, u32)> { + match self { + CatMaps::InMemory(mem) => { + if let CatMaps::InMemory(other_mem) = other { + mem.inner_join_re_enc(other_mem) + } else { + unreachable!("Should never happen") + } + } + CatMaps::OnDisk(_) => { + todo!() + } + } + } + + pub fn merge(&mut self, other: &CatMaps, c: &mut u32) -> CatReEnc { + match self { + CatMaps::InMemory(mem) => { + if let CatMaps::InMemory(other_mem) = other { + mem.merge(other_mem, c) + } else { + unreachable!("Should never happen") + } + } + CatMaps::OnDisk(_) => { + todo!() + } + } + } + + pub fn contains_str(&self, s: &str) -> bool { + match self { + CatMaps::InMemory(mem) => mem.contains_str(s), + CatMaps::OnDisk(disk) => { + todo!() + } + } + } + + pub fn contains_u32(&self, u: &u32) -> bool { + match self { + CatMaps::InMemory(mem) => mem.contains_u32(u), + CatMaps::OnDisk(disk) => { + todo!() + } + } + } + + pub fn is_empty(&self) -> bool { + match self { + CatMaps::InMemory(mem) => mem.is_empty(), + CatMaps::OnDisk(_) => { + todo!() + } + } + } + + pub fn encode_new_string(&mut self, s: String, u: u32) { + match self { + CatMaps::InMemory(mem) => mem.encode_new_string(s, u), + CatMaps::OnDisk(disk) => { + todo!() + } + } + } + + pub fn maybe_encode_string(&self, s: &str) -> Option<&u32> { + match self { + CatMaps::InMemory(mem) => mem.maybe_encode_string(s), + CatMaps::OnDisk(_) => { + todo!() + } + } + } + + pub fn new_singular(value: &str, u: u32, path: Option<&Path>) -> CatMaps { + if let Some(path) = path { + todo!() + } else { + CatMaps::InMemory(CatMapsInMemory::new_singular(value, u)) + } + } + + pub fn new_empty(path: Option<&Path>) -> CatMaps { + if let Some(path) = path { + todo!() + } else { + CatMaps::InMemory(CatMapsInMemory::new_empty()) + } + } + + pub fn encode(&self, key: &Arc) -> Option<&u32> { + match self { + CatMaps::InMemory(mem) => mem.get(key), + CatMaps::OnDisk(disk) => todo!(), + } + } + + pub fn counter(&self) -> u32 { + match self { + CatMaps::InMemory(m) => m.counter(), + CatMaps::OnDisk(d) => todo!(), + } + } + + pub fn decode_batch(&self, v: &[Option]) -> Vec> { + match self { + CatMaps::InMemory(mem) => mem.decode_batch(v), + CatMaps::OnDisk(disk) => disk.decode_batch(v), + } + } + + pub fn maybe_decode(&self, u: &u32) -> Option<&str> { + match self { + CatMaps::InMemory(mem) => mem.maybe_decode(u), + CatMaps::OnDisk(disk) => disk.maybe_decode_string(u), + } + } + + pub fn image(&self, s: &HashSet) -> Option { + match self { + CatMaps::InMemory(m) => { + if let Some(m) = m.image(s) { + Some(CatMaps::InMemory(m)) + } else { + None + } + } + CatMaps::OnDisk(d) => { + if let Some(m) = d.image(s) { + Some(CatMaps::OnDisk(m)) + } else { + None + } + } + } + } +} diff --git a/lib/representation/src/cats/maps/in_memory.rs b/lib/representation/src/cats/maps/in_memory.rs new file mode 100644 index 00000000..37e94f21 --- /dev/null +++ b/lib/representation/src/cats/maps/in_memory.rs @@ -0,0 +1,185 @@ +use crate::cats::CatReEnc; +use nohash_hasher::NoHashHasher; +use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator}; +use std::collections::{BTreeMap, HashMap, HashSet}; +use std::hash::BuildHasherDefault; +use std::sync::Arc; + +#[derive(Debug, Clone)] +pub struct CatMapsInMemory { + map: BTreeMap, u32>, + rev_map: HashMap, BuildHasherDefault>>, +} + +impl CatMapsInMemory { + pub fn new_remap(maps: &CatMapsInMemory, c: &mut u32) -> (CatMapsInMemory, CatReEnc) { + let mut remap = vec![]; + let mut new_maps = CatMapsInMemory::new_empty(); + for (s, v) in maps.map.iter() { + remap.push((*v, *c)); + new_maps.encode_new_arc_string(s.clone(), *c); + *c += 1; + } + let remap: HashMap<_, _, BuildHasherDefault>> = + remap.into_iter().collect(); + ( + new_maps, + CatReEnc { + cat_map: Arc::new(remap), + }, + ) + } + + fn encode_new_arc_string(&mut self, s: Arc, u: u32) { + self.map.insert(s.clone(), u); + self.rev_map.insert(u, s); + } + + pub(crate) fn contains_str(&self, s: &str) -> bool { + let arcs = Arc::new(s.to_string()); + self.map.contains_key(&arcs) + } + + pub(crate) fn contains_u32(&self, u: &u32) -> bool { + self.rev_map.contains_key(u) + } + + pub fn is_empty(&self) -> bool { + self.map.is_empty() + } + + pub fn height(&self) -> u32 { + self.map.len() as u32 + } + + pub fn encode_new_string(&mut self, s: String, u: u32) { + let s = Arc::new(s.clone()); + self.map.insert(s.clone(), u); + self.rev_map.insert(u, s); + } + + pub fn maybe_encode_string(&self, s: &str) -> Option<&u32> { + let s = Arc::new(s.to_string()); + self.map.get(&s) + } + + pub fn new_singular(value: &str, u: u32) -> Self { + let mut sing = CatMapsInMemory::new_empty(); + let s = Arc::new(value.to_string()); + sing.map.insert(s.clone(), u); + sing.rev_map.insert(u, s); + sing + } + + pub fn new_empty() -> CatMapsInMemory { + CatMapsInMemory { + map: Default::default(), + rev_map: Default::default(), + } + } + + pub fn get(&self, key: &Arc) -> Option<&u32> { + self.map.get(key) + } + + pub fn counter(&self) -> u32 { + self.rev_map.keys().max().unwrap().clone() + 1 + } + + pub fn get_rev(&self, key: &u32) -> Option<&Arc> { + self.rev_map.get(key) + } + + pub fn decode_batch(&self, v: &[Option]) -> Vec> { + let decoded_vec_iter = v + .into_par_iter() + .map(|x| x.map(|x| self.rev_map.get(&x).unwrap().as_str())); + decoded_vec_iter.collect() + } + + pub fn maybe_decode(&self, u: &u32) -> Option<&str> { + self.rev_map.get(u).map(|x| x.as_str()) + } + + pub fn image(&self, s: &HashSet) -> Option { + let new_map: HashMap<_, _, BuildHasherDefault>> = s + .par_iter() + .map(|x| { + if let Some(s) = self.rev_map.get(x) { + Some((s.clone(), x)) + } else { + None + } + }) + .filter(|x| x.is_some()) + .map(|x| x.unwrap()) + .map(|(x, y)| (*y, x)) + .collect(); + if new_map.is_empty() { + None + } else { + let map = new_map.iter().map(|(x, y)| (y.clone(), *x)).collect(); + Some(Self { + map, + rev_map: new_map, + }) + } + } + + pub fn inner_join_re_enc(&self, other: &CatMapsInMemory) -> Vec<(u32, u32)> { + let renc: Vec<_> = self + .map + .iter() + .map(|(x, l)| { + if let Some(r) = other.maybe_encode_string(x.as_str()) { + if l != r { + Some((*l, *r)) + } else { + None + } + } else { + None + } + }) + .filter(|x| x.is_some()) + .map(|x| x.unwrap()) + .collect(); + renc + } + + pub fn merge(&mut self, other: &CatMapsInMemory, c: &mut u32) -> CatReEnc { + let (remap, insert): (Vec<_>, Vec<_>) = other + .map + .iter() + .map(|(s, u)| { + if let Some(e) = self.map.get(s) { + (Some((*u, *e)), None) + } else { + (None, Some((s.clone(), u))) + } + }) + .unzip(); + let mut numbered_insert = Vec::new(); + let mut new_remap = Vec::new(); + for k in insert { + if let Some((s, u)) = k { + numbered_insert.push((s, *c)); + new_remap.push((*u, *c)); + *c += 1; + } + } + for (s, u) in numbered_insert { + self.encode_new_arc_string(s.clone(), u); + } + let remap: HashMap<_, _, BuildHasherDefault>> = remap + .into_iter() + .filter(|x| x.is_some()) + .map(|x| x.unwrap()) + .chain(new_remap.into_iter()) + .collect(); + let reenc = CatReEnc { + cat_map: Arc::new(remap), + }; + reenc + } +} diff --git a/lib/representation/src/cats/maps/on_disk.rs b/lib/representation/src/cats/maps/on_disk.rs new file mode 100644 index 00000000..59c707a6 --- /dev/null +++ b/lib/representation/src/cats/maps/on_disk.rs @@ -0,0 +1,25 @@ +use std::collections::HashSet; + +#[derive(Debug, Clone)] +pub struct CatMapsOnDisk { + map: (), + rev_map: (), +} + +impl CatMapsOnDisk { + pub(crate) fn decode_batch(&self, us: &[Option]) -> Vec> { + todo!() + } +} + +impl CatMapsOnDisk { + pub(crate) fn image(&self, s: &HashSet) -> Option { + todo!() + } +} + +impl CatMapsOnDisk { + pub(crate) fn maybe_decode_string(&self, p0: &u32) -> Option<&str> { + todo!() + } +} diff --git a/lib/representation/src/cats/re_encode.rs b/lib/representation/src/cats/re_encode.rs index 9e086510..d842f28c 100644 --- a/lib/representation/src/cats/re_encode.rs +++ b/lib/representation/src/cats/re_encode.rs @@ -6,9 +6,10 @@ use nohash_hasher::NoHashHasher; use polars::datatypes::PlSmallStr; use polars::error::PolarsResult; use polars::prelude::{col, Column, IntoColumn, IntoLazy, LazyFrame, Series}; -use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator}; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; use std::collections::HashMap; use std::hash::BuildHasherDefault; +use std::path::Path; use std::sync::Arc; #[derive(Debug, Clone)] @@ -53,24 +54,7 @@ impl CatReEnc { impl CatEncs { pub fn inner_join_re_enc(&self, other: &CatEncs) -> Vec<(u32, u32)> { - let renc: Vec<_> = self - .map - .iter() - .map(|(x, l)| { - if let Some(r) = other.maybe_encode_str(x) { - if l != r { - Some((*l, *r)) - } else { - None - } - } else { - None - } - }) - .filter(|x| x.is_some()) - .map(|x| x.unwrap()) - .collect(); - renc + self.maps.inner_join_re_enc(&other.maps) } } @@ -104,66 +88,25 @@ impl Cats { pub fn merge( &mut self, other_cats: Vec, + path: Option<&Path>, ) -> HashMap> { let mut map = HashMap::new(); for c in other_cats { let c = c.read().unwrap(); let mut other_map = HashMap::new(); for (t, other_enc) in c.cat_map.iter() { - let mut c = self.get_counter(&t); + let mut counter = self.get_counter(t); if let Some(enc) = self.cat_map.get_mut(t) { - let (remap, insert): (Vec<_>, Vec<_>) = other_enc - .map - .par_iter() - .map(|(s, u)| { - if let Some(e) = enc.map.get(s) { - (Some((*u, *e)), None) - } else { - (None, Some((s.clone(), *u))) - } - }) - .unzip(); - let mut numbered_insert = Vec::new(); - let mut new_remap = Vec::new(); - for k in insert { - if let Some((s, u)) = k { - numbered_insert.push((s, c)); - new_remap.push((u, c)); - c += 1; - } - } - for (s, u) in numbered_insert { - enc.encode_new_arc_string(s.clone(), u); - } - let remap: HashMap<_, _, BuildHasherDefault>> = remap - .into_iter() - .filter(|x| x.is_some()) - .map(|x| x.unwrap()) - .chain(new_remap.into_iter()) - .collect(); - let reenc = CatReEnc { - cat_map: Arc::new(remap), - }; - other_map.insert(t.clone(), reenc); + let re_enc = enc.merge(other_enc, &mut counter); + other_map.insert(t.clone(), re_enc); } else { - let mut remap = Vec::with_capacity(other_enc.map.len()); - let mut new_enc = CatEncs::new_empty(); - for (s, v) in other_enc.map.iter() { - remap.push((*v, c)); - new_enc.encode_new_str(s, c); - c += 1; - } + let (new_enc, re_enc) = CatEncs::new_remap(other_enc, path, &mut counter); + self.cat_map.insert(t.clone(), new_enc); - let remap: HashMap<_, _, BuildHasherDefault>> = - remap.into_iter().collect(); - other_map.insert( - t.clone(), - CatReEnc { - cat_map: Arc::new(remap), - }, - ); + + other_map.insert(t.clone(), re_enc); } - self.set_counter(c, t); + self.set_counter(counter, t); } if !other_map.is_empty() { map.insert(c.uuid.clone(), other_map); diff --git a/lib/triplestore/src/cats.rs b/lib/triplestore/src/cats.rs index f345ece6..b0b402a3 100644 --- a/lib/triplestore/src/cats.rs +++ b/lib/triplestore/src/cats.rs @@ -5,8 +5,14 @@ impl Triplestore { pub fn globalize(&mut self, cat_triples: Vec) -> Vec { let mut mutcat = self.global_cats.write().unwrap(); let cat_triples = { - let cat_triples = mutcat.globalize(cat_triples); - mutcat.encode_predicates(&cat_triples); + let cat_triples = mutcat.globalize( + cat_triples, + self.storage_folder.as_ref().map(|x| x.as_ref()), + ); + mutcat.encode_predicates( + &cat_triples, + self.storage_folder.as_ref().map(|x| x.as_ref()), + ); cat_triples }; cat_triples diff --git a/lib/triplestore/src/lib.rs b/lib/triplestore/src/lib.rs index 61a47d4b..7f25954d 100644 --- a/lib/triplestore/src/lib.rs +++ b/lib/triplestore/src/lib.rs @@ -254,7 +254,11 @@ impl Triplestore { graph: &NamedGraph, ) -> Result, TriplestoreError> { let prepare_triples_now = Instant::now(); - let dfs_to_add = prepare_add_triples_par(ts, self.global_cats.clone()); + let dfs_to_add = prepare_add_triples_par( + ts, + self.global_cats.clone(), + self.storage_folder.as_ref().map(|x| x.as_ref()), + ); trace!( "Preparing triples took {} seconds", prepare_triples_now.elapsed().as_secs_f32() @@ -454,6 +458,7 @@ struct TriplesToAddPartitionedPredicate { pub fn prepare_add_triples_par( mut ts: Vec, global_cats: LockedCats, + path: Option<&Path>, ) -> Vec { let mut all_partitioned: Vec = ts .par_drain(..) @@ -528,6 +533,7 @@ pub fn prepare_add_triples_par( subject_cat_state, object_cat_state, &cats, + path, ) }, ) diff --git a/lib/triplestore/src/sparql.rs b/lib/triplestore/src/sparql.rs index 0cd44822..f84aaedb 100644 --- a/lib/triplestore/src/sparql.rs +++ b/lib/triplestore/src/sparql.rs @@ -27,6 +27,7 @@ use query_processing::graph_patterns::unique_workaround; use query_processing::pushdowns::Pushdowns; use representation::cats::{Cats, LockedCats}; use representation::dataset::{NamedGraph, QueryGraph}; +use representation::debug::DebugOutput; use representation::polars_to_rdf::{df_as_result, QuerySolutions}; use representation::query_context::Context; use representation::rdf_to_polars::{ @@ -44,7 +45,6 @@ use spargebra::term::{ }; use spargebra::{GraphUpdateOperation, Query, Update}; use std::collections::{HashMap, HashSet}; -use representation::debug::DebugOutput; pub struct QueryResult { pub kind: QueryResultKind, @@ -682,7 +682,8 @@ fn named_node_pattern_expr( } fn named_node_u32_lit(nn: &NamedNode, name: &str, global_cats: &Cats) -> (Expr, RDFNodeState) { - let (u, state) = global_cats.encode_iri_or_local_cat(nn.as_str()); + // No disk based storage of local cat, hence path is None + let (u, state) = global_cats.encode_iri_or_local_cat(nn.as_str(), None); (lit(u).cast(DataType::UInt32).alias(name), state) } diff --git a/lib/triplestore/src/sparql/delete.rs b/lib/triplestore/src/sparql/delete.rs index 0ccc8c49..70b0a094 100644 --- a/lib/triplestore/src/sparql/delete.rs +++ b/lib/triplestore/src/sparql/delete.rs @@ -13,6 +13,7 @@ use representation::multitype::{set_struct_all_null_to_null_row, split_df_multic use representation::solution_mapping::EagerSolutionMappings; use representation::{BaseRDFNodeType, OBJECT_COL_NAME, PREDICATE_COL_NAME, SUBJECT_COL_NAME}; use std::collections::HashMap; +use std::path::Path; use std::time::Instant; use tracing::trace; @@ -28,7 +29,11 @@ impl Triplestore { .map(|(x, y)| partition_by_global_predicate_col(x, y, &cats)) .flatten() .collect(); - let global_cat_triples = triples_solution_mappings_to_global_cat_triples(sms, &cats); + let global_cat_triples = triples_solution_mappings_to_global_cat_triples( + sms, + &cats, + self.storage_folder.as_ref().map(|x| x.as_ref()), + ); drop(cats); if !global_cat_triples.is_empty() { self.delete_triples_vec(global_cat_triples, graph) @@ -169,6 +174,7 @@ fn partition_by_global_predicate_col( fn triples_solution_mappings_to_global_cat_triples( sm_preds: Vec<(EagerSolutionMappings, NamedNode)>, global_cats: &Cats, + storage_path: Option<&Path>, ) -> Vec { let mappings_maps_preds: Vec<_> = sm_preds .into_par_iter() @@ -221,6 +227,7 @@ fn triples_solution_mappings_to_global_cat_triples( subject_state, object_state, global_cats, + storage_path, ); e }) diff --git a/lib/triplestore/src/sparql/lazy_graph_patterns/pvalues.rs b/lib/triplestore/src/sparql/lazy_graph_patterns/pvalues.rs index 5417c5a0..6bd8ec45 100644 --- a/lib/triplestore/src/sparql/lazy_graph_patterns/pvalues.rs +++ b/lib/triplestore/src/sparql/lazy_graph_patterns/pvalues.rs @@ -46,7 +46,8 @@ impl Triplestore { todo!("Handle this error") }; let cats = self.global_cats.read().unwrap(); - let mut sm = cats.encode_solution_mappings(sm); + // No disk based storage of local cats if created + let mut sm = cats.encode_solution_mappings(sm, None); sm.mappings.as_single_chunk_par(); if let Some(mut mappings) = solution_mappings { //TODO: Remove this workaround diff --git a/lib/triplestore/src/sparql/lazy_graph_patterns/values.rs b/lib/triplestore/src/sparql/lazy_graph_patterns/values.rs index 9c72a1dc..a44c50c2 100644 --- a/lib/triplestore/src/sparql/lazy_graph_patterns/values.rs +++ b/lib/triplestore/src/sparql/lazy_graph_patterns/values.rs @@ -20,9 +20,10 @@ impl Triplestore { query_settings: &QuerySettings, ) -> Result { let sm = values_pattern(variables, bindings); + // No disk based storage of local cats if they are created let sm = { let cats = self.global_cats.read()?; - cats.encode_solution_mappings(sm) + cats.encode_solution_mappings(sm, None) }; if let Some(mut mappings) = solution_mappings { mappings = join( diff --git a/lib/triplestore/src/storage.rs b/lib/triplestore/src/storage.rs index 2f8796e8..c6383af7 100644 --- a/lib/triplestore/src/storage.rs +++ b/lib/triplestore/src/storage.rs @@ -13,7 +13,7 @@ use polars_core::series::SeriesIter; use polars_core::utils::Container; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use representation::cats::{ - Cats, LockedCats, ReverseLookup, OBJECT_RANK_COL_NAME, SUBJECT_RANK_COL_NAME, + CatEncs, Cats, LockedCats, OBJECT_RANK_COL_NAME, SUBJECT_RANK_COL_NAME, }; use representation::{ BaseRDFNodeType, LANG_STRING_LANG_FIELD, LANG_STRING_VALUE_FIELD, OBJECT_COL_NAME, @@ -134,7 +134,7 @@ impl Triples { let mut i = 0; let now_subjects = Instant::now(); - let maybe_subject_encs = get_rev_map(global_cats.deref(), &self.subject_type); + let maybe_subject_encs = maybe_get_cat_encs(global_cats.deref(), &self.subject_type); let subjects_str = if OFFSET_STEP / 10 * incoming_df.height() < remaining_height { let new_subjects_col = incoming_df.column(SUBJECT_COL_NAME).unwrap(); @@ -430,13 +430,13 @@ fn create_indices( ) -> Result { assert!(df.height() > 0); let now = Instant::now(); - let subject_encs = get_rev_map(cats, subj_type); + let subject_encs = maybe_get_cat_encs(cats, subj_type); //Should already be sorted by subject, object let subject_sparse_index = create_sparse_index( df.column(SUBJECT_COL_NAME) .unwrap() .as_materialized_series(), - subject_encs.as_ref(), + subject_encs, ); trace!( "Creating subject sparse map took {} seconds", @@ -466,10 +466,10 @@ fn create_indices( ) .collect() .unwrap(); - let object_encs = get_rev_map(cats, obj_type); + let object_encs = maybe_get_cat_encs(cats, obj_type); let sparse = create_sparse_index( df.column(OBJECT_COL_NAME).unwrap().as_materialized_series(), - object_encs.as_ref(), + object_encs, ); object_sort = Some(StoredTriples::new( @@ -493,9 +493,9 @@ fn create_indices( }) } -fn get_rev_map<'a>(c: &'a Cats, t: &BaseRDFNodeType) -> Option> { +fn maybe_get_cat_encs<'a>(c: &'a Cats, t: &BaseRDFNodeType) -> Option<&'a CatEncs> { if t.stored_cat() { - Some(c.get_reverse_lookup(t)) + Some(c.get_cat_encs(t)) } else { None } @@ -753,11 +753,11 @@ fn update_index_at_offset( u32_chunked: &UInt32Chunked, offset: usize, sparse_map: &mut BTreeMap, usize>, - rev_map: &ReverseLookup, + cat_encs: &CatEncs, ) { let u = u32_chunked.get(offset); if let Some(u) = u { - let s = rev_map.lookup(&u).unwrap(); + let s = cat_encs.maybe_decode_string(&u).unwrap(); let e = sparse_map.entry(Arc::new(s.to_string())); e.or_insert(offset); } @@ -782,14 +782,14 @@ pub(crate) fn repeated_from_last_row_expr(c: &str) -> Expr { .and(col(c).shift(lit(1)).eq(col(c))) } -fn create_sparse_index(ser: &Series, rev_map: Option<&ReverseLookup>) -> SparseIndex { - match rev_map { +fn create_sparse_index(ser: &Series, cat_encs: Option<&CatEncs>) -> SparseIndex { + match cat_encs { None => create_sparse_string_index(ser), Some(rev_map) => create_sparse_cat_index(ser, rev_map), } } -fn create_sparse_cat_index(ser: &Series, encs: &ReverseLookup) -> SparseIndex { +fn create_sparse_cat_index(ser: &Series, encs: &CatEncs) -> SparseIndex { assert!(!ser.is_empty()); let strch = ser.u32().unwrap(); let mut sparse_map = BTreeMap::new(); @@ -823,7 +823,7 @@ fn create_sparse_string_index(ser: &Series) -> SparseIndex { } //Assumes sorted. -fn create_deduplicated_string_vec<'a>(ser: &Series, rev_map: &'a ReverseLookup) -> Vec<&'a str> { +fn create_deduplicated_string_vec<'a>(ser: &Series, cat_encs: &'a CatEncs) -> Vec<&'a str> { let mut v = Vec::with_capacity(ser.len()); let mut last = None; for any in ser.iter() { @@ -835,7 +835,7 @@ fn create_deduplicated_string_vec<'a>(ser: &Series, rev_map: &'a ReverseLookup) } } last = Some(u); - let s = rev_map.lookup(&u).unwrap(); + let s = cat_encs.maybe_decode_string(&u).unwrap(); v.push(s); } _ => unreachable!("Should never happen"), @@ -872,8 +872,8 @@ fn compact_segments( } } } - let subject_encs = get_rev_map(cats, subj_type); - let object_encs = get_rev_map(cats, obj_type); + let subject_encs = maybe_get_cat_encs(cats, subj_type); + let object_encs = maybe_get_cat_encs(cats, obj_type); let fields = if let BaseRDFNodeType::Literal(nn) = obj_type { if nn.as_ref() == rdf::LANG_STRING { Some((LANG_STRING_VALUE_FIELD, LANG_STRING_LANG_FIELD)) @@ -888,8 +888,8 @@ fn compact_segments( SUBJECT_COL_NAME, OBJECT_COL_NAME, fields, - subject_encs.as_ref(), - object_encs.as_ref(), + subject_encs, + object_encs, )?; let height = compact_subjects.height(); @@ -898,7 +898,7 @@ fn compact_segments( .column(SUBJECT_COL_NAME) .unwrap() .as_materialized_series(), - subject_encs.as_ref(), + subject_encs, ); let stored_subjects = StoredTriples::new(compact_subjects, subj_type, obj_type, storage_folder)?; @@ -909,15 +909,15 @@ fn compact_segments( OBJECT_COL_NAME, SUBJECT_COL_NAME, None, - object_encs.as_ref(), - subject_encs.as_ref(), + object_encs, + subject_encs, )?; let object_index = create_sparse_index( compact_objects .column(OBJECT_COL_NAME) .unwrap() .as_materialized_series(), - object_encs.as_ref(), + object_encs, ); let stored_objects = StoredTriples::new(compact_objects, subj_type, obj_type, storage_folder)?; @@ -942,8 +942,8 @@ fn compact_dataframe_segments( col_a: &str, col_b: &str, b_fields: Option<(&str, &str)>, - a_cat_encs: Option<&ReverseLookup>, - b_cat_encs: Option<&ReverseLookup>, + a_cat_encs: Option<&CatEncs>, + b_cat_encs: Option<&CatEncs>, ) -> Result<(DataFrame, Option), TriplestoreError> { let mut new_segments = vec![]; for (mut df, new) in segments { @@ -1131,8 +1131,8 @@ impl<'a> TripleIterator<'a> { mut a_iterator: SeriesIter<'a>, mut b_1_iterator: SeriesIter<'a>, mut b_2_iterator: Option>, - a_cat_rev_map: Option<&'a ReverseLookup<'a>>, - b_cat_rev_map: Option<&'a ReverseLookup<'a>>, + a_cat_rev_map: Option<&'a CatEncs>, + b_cat_rev_map: Option<&'a CatEncs>, new: bool, ) -> Option { if let Some(a) = a_iterator.next() { @@ -1167,8 +1167,8 @@ impl<'a> TripleIterator<'a> { pub fn next( self, - a_cat_rev_map: Option<&'a ReverseLookup>, - b_cat_rev_map: Option<&'a ReverseLookup>, + a_cat_rev_map: Option<&'a CatEncs>, + b_cat_rev_map: Option<&'a CatEncs>, ) -> Option { let Self { mut a_iterator, @@ -1211,11 +1211,13 @@ impl<'a> TripleIterator<'a> { fn decode_elem<'a>( any_value: &AnyValue<'a>, - reverse_lookup: Option<&'a ReverseLookup>, + reverse_lookup: Option<&'a CatEncs>, ) -> Option> { if let Some(reverse_lookup) = reverse_lookup { if let AnyValue::UInt32(u) = any_value { - Some(AnyValue::String(reverse_lookup.lookup(u).unwrap())) + Some(AnyValue::String( + reverse_lookup.maybe_decode_string(u).unwrap(), + )) } else { unreachable!("Should never happen") } diff --git a/lib/triplestore/src/triples_read.rs b/lib/triplestore/src/triples_read.rs index 4685fd7c..bb85ddef 100644 --- a/lib/triplestore/src/triples_read.rs +++ b/lib/triplestore/src/triples_read.rs @@ -43,6 +43,7 @@ impl Triplestore { parallel: Option, checked: bool, graph: &NamedGraph, + prefixes: &HashMap, ) -> Result<(), TriplestoreError> { let now = Instant::now(); let rdf_format = if let Some(rdf_format) = rdf_format { @@ -68,6 +69,7 @@ impl Triplestore { parallel, checked, graph, + prefixes, )?; drop(map); @@ -88,6 +90,7 @@ impl Triplestore { parallel: Option, checked: bool, graph: &NamedGraph, + prefixes: &HashMap, ) -> Result<(), TriplestoreError> { self.read_triples( s.as_bytes(), @@ -97,6 +100,7 @@ impl Triplestore { parallel, checked, graph, + prefixes, ) } @@ -111,6 +115,7 @@ impl Triplestore { parallel: Option, checked: bool, graph: &NamedGraph, + prefixes: &HashMap, ) -> Result<(), TriplestoreError> { let start_quadproc_now = Instant::now(); let parallel = if let Some(parallel) = parallel { @@ -129,6 +134,9 @@ impl Triplestore { let mut readers = vec![]; if rdf_format == RdfFormat::Turtle { let mut parser = TurtleParser::new(); + for (k, v) in prefixes { + parser = parser.with_prefix(k, v.as_str()).unwrap(); + } if !checked { parser = parser.unchecked(); } diff --git a/py_maplib/maplib/__init__.pyi b/py_maplib/maplib/__init__.pyi index 2a2026c9..007bb9f6 100644 --- a/py_maplib/maplib/__init__.pyi +++ b/py_maplib/maplib/__init__.pyi @@ -356,7 +356,7 @@ class Model: :return: """ - def add_prefixes(self, template: Dict[str, str]): + def add_prefixes(self, prefixes: Dict[str, str]): """ Add prefixes that will be used in parsing of SPARQL, Datalog and OTTR. diff --git a/py_maplib/src/lib.rs b/py_maplib/src/lib.rs index 38f85538..9fda8a26 100644 --- a/py_maplib/src/lib.rs +++ b/py_maplib/src/lib.rs @@ -1285,7 +1285,15 @@ fn insert_sprout_mutex( let global_cats = &inner.triplestore.global_cats; let (mut sms, cats) = { let guard = global_cats.read().unwrap(); - new_solution_mapping_cats(sms, &guard) + new_solution_mapping_cats( + sms, + &guard, + inner + .triplestore + .storage_folder + .as_ref() + .map(|x| x.as_ref()), + ) }; let locked_cats = LockedCats::new(cats); for sm in &mut sms {