From c924739371b029009fcfd0b8dfb13ea95fc5b059 Mon Sep 17 00:00:00 2001 From: Magnus Bakken <10287813+magbak@users.noreply.github.com> Date: Sun, 16 Nov 2025 14:29:49 +0100 Subject: [PATCH 1/5] Disk storage skunkworks --- lib/representation/src/cats.rs | 22 ++++++----- lib/representation/src/cats/decode.rs | 4 +- lib/representation/src/cats/encode.rs | 26 ++++++------- lib/representation/src/cats/forwards.rs | 34 +++++++++++++++++ .../src/cats/forwards/in_memory.rs | 19 ++++++++++ .../src/cats/forwards/on_disk.rs | 4 ++ lib/representation/src/cats/image.rs | 27 ++----------- lib/representation/src/cats/re_encode.rs | 10 ++--- lib/representation/src/cats/reverse.rs | 34 +++++++++++++++++ .../src/cats/reverse/in_memory.rs | 38 +++++++++++++++++++ .../src/cats/reverse/on_disk.rs | 19 ++++++++++ 11 files changed, 185 insertions(+), 52 deletions(-) create mode 100644 lib/representation/src/cats/forwards.rs create mode 100644 lib/representation/src/cats/forwards/in_memory.rs create mode 100644 lib/representation/src/cats/forwards/on_disk.rs create mode 100644 lib/representation/src/cats/reverse.rs create mode 100644 lib/representation/src/cats/reverse/in_memory.rs create mode 100644 lib/representation/src/cats/reverse/on_disk.rs diff --git a/lib/representation/src/cats.rs b/lib/representation/src/cats.rs index ac2ba27b..d687bf59 100644 --- a/lib/representation/src/cats.rs +++ b/lib/representation/src/cats.rs @@ -4,6 +4,8 @@ mod globalize; mod image; mod re_encode; mod split; +pub mod forwards; +pub mod reverse; pub use decode::*; pub use encode::*; @@ -18,12 +20,14 @@ use nohash_hasher::NoHashHasher; use oxrdf::vocab::xsd; use oxrdf::{NamedNode, NamedNodeRef}; use polars::prelude::DataFrame; -use std::collections::{BTreeMap, HashMap}; +use std::collections::{HashMap}; use std::hash::BuildHasherDefault; use std::ops::Deref; use std::sync::Arc; use std::sync::RwLock; use uuid::Uuid; +use crate::cats::forwards::ForwardsCat; +use crate::cats::reverse::ReverseCat; const SUBJECT_PREFIX_COL_NAME: &str = "subject_prefix"; const OBJECT_PREFIX_COL_NAME: &str = "object_prefix"; @@ -73,8 +77,8 @@ pub struct EncodedTriples { #[derive(Debug, Clone)] pub struct CatEncs { // We use a BTree map to keep strings sorted - pub map: BTreeMap, u32>, - pub rev_map: Option, BuildHasherDefault>>>, + pub forward: ForwardsCat, + pub reverse: Option, } #[derive(Debug, Clone)] @@ -148,7 +152,7 @@ pub struct Cats { blank_counter: u32, literal_counter_map: HashMap, pub uuid: String, - pub rev_iri_suffix_map: HashMap, BuildHasherDefault>>, + pub rev_iri_suffix_map: ForwardsCat, belongs_prefix_map: HashMap>>, prefix_map: HashMap, prefix_rev_map: HashMap, @@ -190,7 +194,7 @@ impl Cats { } else { panic!(); }; - self.cat_map.get(&ct).unwrap().rev_map.as_ref().unwrap() + self.cat_map.get(&ct).unwrap().reverse.as_ref().unwrap() } } @@ -213,9 +217,9 @@ impl Cats { cats.prefix_map.insert(i, nn.clone()); cats.prefix_rev_map.insert(nn.clone(), i); cats.rev_iri_suffix_map - .extend(cat_enc.map.iter().map(|(x, y)| (*y, x.clone()))); + .extend(cat_enc.forward.iter().map(|(x, y)| (*y, x.clone()))); cats.belongs_prefix_map - .extend(cat_enc.map.values().map(|x| (*x, i))); + .extend(cat_enc.forward.values().map(|x| (*x, i))); i += 1; } } @@ -254,7 +258,7 @@ impl Cats { let mut counter = 0; if let Some(enc) = self.cat_map.get(&CatType::Blank) { counter = cmp::max( - enc.rev_map.as_ref().unwrap().keys().max().unwrap() + 1, + enc.reverse.as_ref().unwrap().keys().max().unwrap() + 1, counter, ); } @@ -270,7 +274,7 @@ impl Cats { let mut map = HashMap::new(); for (p, cat) in &self.cat_map { if let CatType::Literal(nn) = p { - let counter = cat.rev_map.as_ref().unwrap().keys().max().unwrap() + 1; + let counter = cat.reverse.as_ref().unwrap().keys().max().unwrap() + 1; map.insert(nn.clone(), counter); } } diff --git a/lib/representation/src/cats/decode.rs b/lib/representation/src/cats/decode.rs index 41fff692..cb3d1c65 100644 --- a/lib/representation/src/cats/decode.rs +++ b/lib/representation/src/cats/decode.rs @@ -20,7 +20,7 @@ impl CatEncs { let uch: Vec<_> = ser.u32().unwrap().iter().collect(); let decoded_vec_iter = uch .into_par_iter() - .map(|x| x.map(|x| self.rev_map.as_ref().unwrap().get(&x).unwrap())); + .map(|x| x.map(|x| self.reverse.as_ref().unwrap().get(&x).unwrap())); let decoded_vec: Vec<_> = decoded_vec_iter.map(|x| x.map(|x| x.as_str())).collect(); let new_ser = Series::new(original_name, decoded_vec); @@ -34,7 +34,7 @@ impl CatEncs { } pub fn maybe_decode_non_iri_string(&self, u: &u32) -> Option<&str> { - self.rev_map.as_ref().unwrap().get(u).map(|x| x.as_str()) + self.reverse.as_ref().unwrap().get(u).map(|x| x.as_str()) } } diff --git a/lib/representation/src/cats/encode.rs b/lib/representation/src/cats/encode.rs index da5c596f..ea4d8b20 100644 --- a/lib/representation/src/cats/encode.rs +++ b/lib/representation/src/cats/encode.rs @@ -23,21 +23,21 @@ impl CatEncs { )) }; CatEncs { - map: Default::default(), - rev_map, + forward: Default::default(), + reverse: rev_map, } } pub fn contains_key(&self, s: &str) -> bool { let s = s.to_string(); - self.map.contains_key(&s) + self.forward.contains_key(&s) } pub fn new_singular(value: &str, u: u32, is_iri: bool) -> CatEncs { let mut sing = Self::new_empty(is_iri); let s = Arc::new(value.to_string()); - sing.map.insert(s.clone(), u); - if let Some(rev_map) = &mut sing.rev_map { + sing.forward.insert(s.clone(), u); + if let Some(rev_map) = &mut sing.reverse { rev_map.insert(u, s); } sing @@ -45,7 +45,7 @@ impl CatEncs { pub fn maybe_encode_str(&self, s: &str) -> Option<&u32> { let s = Arc::new(s.to_string()); - self.map.get(&s) + self.forward.get(&s) } pub fn encode_new_str(&mut self, s: &str, u: u32) { @@ -54,21 +54,21 @@ impl CatEncs { pub fn encode_new_string(&mut self, s: String, u: u32) { let s = Arc::new(s.clone()); - self.map.insert(s.clone(), u); - if let Some(rev_map) = &mut self.rev_map { + self.forward.insert(s.clone(), u); + if let Some(rev_map) = &mut self.reverse { rev_map.insert(u, s); } } pub fn encode_new_arc_string(&mut self, s: Arc, u: u32) { - self.map.insert(s.clone(), u); - if let Some(rev_map) = &mut self.rev_map { + self.forward.insert(s.clone(), u); + if let Some(rev_map) = &mut self.reverse { rev_map.insert(u, s); } } pub fn height(&self) -> u32 { - self.map.len() as u32 + self.forward.len() as u32 } } @@ -256,7 +256,7 @@ impl Cats { let cats = if !new_enc_map.is_empty() { let mut keep_new_enc_map = HashMap::new(); for (u, enc) in new_enc_map { - if !enc.map.is_empty() { + if !enc.forward.is_empty() { keep_new_enc_map.insert( CatType::Prefix(NamedNode::new_unchecked(rev_map.get(&u).unwrap())), enc, @@ -313,7 +313,7 @@ impl Cats { }; encoded_global_local.push(encoded); } - let local = if !new_enc.map.is_empty() { + let local = if !new_enc.forward.is_empty() { let cat_type = if let BaseRDFNodeType::Literal(nn) = t { CatType::Literal(nn.clone()) } else { diff --git a/lib/representation/src/cats/forwards.rs b/lib/representation/src/cats/forwards.rs new file mode 100644 index 00000000..b3e381d0 --- /dev/null +++ b/lib/representation/src/cats/forwards.rs @@ -0,0 +1,34 @@ +mod in_memory; +mod on_disk; + +use std::collections::HashSet; +use std::sync::Arc; +use crate::cats::forwards::in_memory::ForwardsCatInMemory; +use crate::cats::forwards::on_disk::ForwardsCatOnDisk; +use crate::cats::reverse::ReverseCat; + +#[derive(Debug, Clone)] +pub enum ForwardsCat { + ForwardCatInMemory(ForwardsCatInMemory), + ForwardCatOnDisk(ForwardsCatOnDisk), +} + +impl ForwardsCat { + pub fn image(&self, rev_img: &ReverseCat) -> Self { + match self { + ForwardsCat::ForwardCatInMemory(m) => { + ForwardsCat::ForwardCatInMemory(m.image(rev_img)) + } + ForwardsCat::ForwardCatOnDisk(d) => {} + } + } + fn lookup(&self, key:&u32) -> Option<&str> { + todo!() + } + fn batch_lookup(&self, keys: &[u32]) -> Vec> { + todo!() + } + fn batch_insert(&self, kvs: &[(u32, Arc)]) -> u32 { + todo!() + } +} \ No newline at end of file diff --git a/lib/representation/src/cats/forwards/in_memory.rs b/lib/representation/src/cats/forwards/in_memory.rs new file mode 100644 index 00000000..a7dca24b --- /dev/null +++ b/lib/representation/src/cats/forwards/in_memory.rs @@ -0,0 +1,19 @@ +use std::collections::{BTreeMap, HashMap, HashSet}; +use std::hash::BuildHasherDefault; +use std::sync::Arc; +use nohash_hasher::NoHashHasher; +use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; +use crate::cats::forwards::ForwardsCat; +use crate::cats::reverse::ReverseCat; + +#[derive(Debug, Clone)] +pub struct ForwardsCatInMemory { + map: HashMap, BuildHasherDefault>> +} + +impl ForwardsCatInMemory { + pub fn image(&self, rev_img: &ReverseCat) -> ForwardsCatInMemory { + let fw = Self::from(rev_img.iter().map(|(x, y)| (y,x))); + fw + } +} \ No newline at end of file diff --git a/lib/representation/src/cats/forwards/on_disk.rs b/lib/representation/src/cats/forwards/on_disk.rs new file mode 100644 index 00000000..8860960e --- /dev/null +++ b/lib/representation/src/cats/forwards/on_disk.rs @@ -0,0 +1,4 @@ +#[derive(Debug, Clone)] +pub struct ForwardsCatOnDisk { + +} \ No newline at end of file diff --git a/lib/representation/src/cats/image.rs b/lib/representation/src/cats/image.rs index 0242f525..49e40562 100644 --- a/lib/representation/src/cats/image.rs +++ b/lib/representation/src/cats/image.rs @@ -152,7 +152,7 @@ impl Cats { for (k, map) in mmap { let prefix = self.prefix_map.get(&k).unwrap(); let ct = CatType::Prefix(prefix.clone()); - encs.insert(ct, CatEncs { map, rev_map: None }); + encs.insert(ct, CatEncs { forward: map, reverse: None }); } } else { let ct = if let BaseRDFNodeType::Literal(nn) = t { @@ -175,28 +175,9 @@ impl Cats { impl CatEncs { pub fn image_of_non_iri(&self, s: &HashSet) -> Option { - let rev_map_ref = self.rev_map.as_ref().unwrap(); - let new_map: BTreeMap<_, _> = s - .par_iter() - .map(|x| { - if let Some(s) = rev_map_ref.get(x) { - Some((*x, s.clone())) - } else { - None - } - }) - .filter(|x| x.is_some()) - .map(|x| x.unwrap()) - .map(|(x, y)| (y, x)) - .collect(); - let new_rev_map: HashMap<_, _, BuildHasherDefault>> = - new_map.iter().map(|(x, y)| (*y, x.clone())).collect(); - - if new_map.len() > 0 { - Some(CatEncs { - map: new_map, - rev_map: Some(new_rev_map), - }) + if let Some(new_reverse) = self.reverse.as_ref().unwrap().image(s) { + let forward = self.forward.image(&new_reverse); + Some(Self { forward, reverse: Some(new_reverse) }) } else { None } diff --git a/lib/representation/src/cats/re_encode.rs b/lib/representation/src/cats/re_encode.rs index 6ea59c71..81f09289 100644 --- a/lib/representation/src/cats/re_encode.rs +++ b/lib/representation/src/cats/re_encode.rs @@ -53,7 +53,7 @@ impl CatReEnc { impl CatEncs { pub fn inner_join_re_enc(&self, other: &CatEncs) -> Vec<(u32, u32)> { let renc: Vec<_> = self - .map + .forward .iter() .map(|(x, l)| { if let Some(r) = other.maybe_encode_str(x) { @@ -125,10 +125,10 @@ impl Cats { let mut c = self.get_counter(&t); if let Some(enc) = self.cat_map.get_mut(t) { let (remap, insert): (Vec<_>, Vec<_>) = other_enc - .map + .forward .iter() .map(|(s, u)| { - if let Some(e) = enc.map.get(s) { + if let Some(e) = enc.forward.get(s) { (Some((*u, *e)), None) } else { (None, Some((s.clone(), *u))) @@ -162,9 +162,9 @@ impl Cats { }; other_map.insert(t.clone(), reenc); } else { - let mut remap = Vec::with_capacity(other_enc.map.len()); + let mut remap = Vec::with_capacity(other_enc.forward.len()); let mut new_enc = CatEncs::new_empty(matches!(t, CatType::Prefix(..))); - for (s, v) in other_enc.map.iter() { + for (s, v) in other_enc.forward.iter() { remap.push((*v, c)); new_enc.encode_new_str(s, c); if let Some(prefix_u) = &prefix_u { diff --git a/lib/representation/src/cats/reverse.rs b/lib/representation/src/cats/reverse.rs new file mode 100644 index 00000000..04954d37 --- /dev/null +++ b/lib/representation/src/cats/reverse.rs @@ -0,0 +1,34 @@ +mod in_memory; +mod on_disk; + +use std::collections::HashSet; +use std::sync::Arc; +use crate::cats::reverse::in_memory::ReverseCatInMemory; +use crate::cats::reverse::on_disk::ReverseCatOnDisk; + +#[derive(Debug, Clone)] +pub enum ReverseCat { + ReverseCatInMemory(ReverseCatInMemory), + ReverseCatOnDisk(ReverseCatOnDisk), +} + +impl ReverseCat { + pub fn from)>>(iter:T, on_disk:bool) -> Self { + if on_disk { + ReverseCat::ReverseCatOnDisk(ReverseCatOnDisk::from(iter)) + } else { + ReverseCat::ReverseCatInMemory(ReverseCatInMemory::from(iter)) + } + } + + pub fn image(&self, s: &HashSet) -> Option { + match self { + ReverseCat::ReverseCatInMemory(m) => { + m.image(s).map(|x|ReverseCat::ReverseCatInMemory(x)) + } + ReverseCat::ReverseCatOnDisk(d) => { + d.image(s).map(|x|ReverseCat::ReverseCatOnDisk(x)) + } + } + } +} \ No newline at end of file diff --git a/lib/representation/src/cats/reverse/in_memory.rs b/lib/representation/src/cats/reverse/in_memory.rs new file mode 100644 index 00000000..a4811138 --- /dev/null +++ b/lib/representation/src/cats/reverse/in_memory.rs @@ -0,0 +1,38 @@ +use nohash_hasher::NoHashHasher; +use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; +use std::collections::{BTreeMap, HashMap, HashSet}; +use std::hash::BuildHasherDefault; +use std::sync::Arc; + +#[derive(Debug, Clone)] +pub struct ReverseCatInMemory { + rev_map: HashMap, BuildHasherDefault>>, +} + +impl ReverseCatInMemory { + pub(crate) fn image(&self, s: &HashSet) -> Option { + let new_map: HashMap<_, _, BuildHasherDefault>> = s + .par_iter() + .map(|x| { + if let Some(s) = self.rev_map.get(x) { + Some((s.clone(), x)) + } else { + None + } + }) + .filter(|x| x.is_some()) + .map(|x| x.unwrap()) + .map(|(x, y)| (*y, x)) + .collect(); + if new_map.is_empty() { + None + } else { + Some(Self { rev_map: new_map }) + } + } + + pub fn from)>>(iter: T) -> Self { + let rev_map: HashMap<_, _, BuildHasherDefault>> = iter.collect(); + ReverseCatInMemory { rev_map } + } +} diff --git a/lib/representation/src/cats/reverse/on_disk.rs b/lib/representation/src/cats/reverse/on_disk.rs new file mode 100644 index 00000000..d5a4df6f --- /dev/null +++ b/lib/representation/src/cats/reverse/on_disk.rs @@ -0,0 +1,19 @@ +use std::collections::HashSet; +use std::sync::Arc; + +#[derive(Debug, Clone)] +pub struct ReverseCatOnDisk { + +} + +impl ReverseCatOnDisk { + pub(crate) fn image(&self, p0: &HashSet) -> Option { + todo!() + } +} + +impl ReverseCatOnDisk { + pub fn from)>>(iter:T) -> Self { + todo!() + } +} \ No newline at end of file From 0c3f5240657fda395804505c048bd17c73dea785 Mon Sep 17 00:00:00 2001 From: Magnus Bakken <10287813+magbak@users.noreply.github.com> Date: Mon, 1 Dec 2025 19:39:41 +0100 Subject: [PATCH 2/5] Further on disk --- lib/representation/src/cats.rs | 9 ++- lib/representation/src/cats/forwards.rs | 48 ++++++++++++++-- .../src/cats/forwards/in_memory.rs | 31 ++++++++-- .../src/cats/forwards/on_disk.rs | 36 +++++++++++- lib/representation/src/cats/image.rs | 2 +- lib/representation/src/cats/re_encode.rs | 24 ++++---- lib/representation/src/cats/reverse.rs | 56 ++++++++++++++++++- .../src/cats/reverse/in_memory.rs | 29 ++++++++++ .../src/cats/reverse/on_disk.rs | 31 +++++++++- 9 files changed, 235 insertions(+), 31 deletions(-) diff --git a/lib/representation/src/cats.rs b/lib/representation/src/cats.rs index e3c3738b..852c868b 100644 --- a/lib/representation/src/cats.rs +++ b/lib/representation/src/cats.rs @@ -3,7 +3,6 @@ mod encode; mod globalize; mod image; mod re_encode; -mod split; pub mod forwards; pub mod reverse; @@ -86,7 +85,7 @@ pub struct EncodedTriples { pub struct CatEncs { // We use a BTree map to keep strings sorted pub forward: ForwardsCat, - pub reverse: Option, + pub reverse: ReverseCat, } #[derive(Debug, Clone)] @@ -244,7 +243,7 @@ impl Cats { fn calc_new_blank_counter(&self) -> u32 { let mut counter = 0; if let Some(enc) = self.cat_map.get(&CatType::Blank) { - counter = cmp::max(enc.rev_map.keys().max().unwrap() + 1, counter); + counter = cmp::max(enc.reverse.counter() + 1, counter); } counter } @@ -252,7 +251,7 @@ impl Cats { fn calc_new_iri_counter(&self) -> u32 { let mut counter = 0; if let Some(enc) = self.cat_map.get(&CatType::IRI) { - counter = cmp::max(enc.rev_map.keys().max().unwrap() + 1, counter); + counter = cmp::max(enc.reverse.counter() + 1, counter); } counter } @@ -261,7 +260,7 @@ impl Cats { let mut map = HashMap::new(); for (p, cat) in &self.cat_map { if let CatType::Literal(nn) = p { - let counter = cat.rev_map.keys().max().unwrap() + 1; + let counter = cat.reverse.counter() + 1; map.insert(nn.clone(), counter); } } diff --git a/lib/representation/src/cats/forwards.rs b/lib/representation/src/cats/forwards.rs index b3e381d0..6521426e 100644 --- a/lib/representation/src/cats/forwards.rs +++ b/lib/representation/src/cats/forwards.rs @@ -1,10 +1,9 @@ mod in_memory; mod on_disk; -use std::collections::HashSet; use std::sync::Arc; -use crate::cats::forwards::in_memory::ForwardsCatInMemory; -use crate::cats::forwards::on_disk::ForwardsCatOnDisk; +use crate::cats::forwards::in_memory::{ForwardsCatInMemory, ForwardsCatInMemoryIterator}; +use crate::cats::forwards::on_disk::{ForwardsCatOnDisk, ForwardsCatOnDiskIterator}; use crate::cats::reverse::ReverseCat; #[derive(Debug, Clone)] @@ -19,7 +18,9 @@ impl ForwardsCat { ForwardsCat::ForwardCatInMemory(m) => { ForwardsCat::ForwardCatInMemory(m.image(rev_img)) } - ForwardsCat::ForwardCatOnDisk(d) => {} + ForwardsCat::ForwardCatOnDisk(d) => { + ForwardsCat::ForwardCatOnDisk(d.image(rev_img)) + } } } fn lookup(&self, key:&u32) -> Option<&str> { @@ -31,4 +32,43 @@ impl ForwardsCat { fn batch_insert(&self, kvs: &[(u32, Arc)]) -> u32 { todo!() } + + pub(crate) fn get(&self, key:&Arc) -> Option<&u32> { + match self { + ForwardsCat::ForwardCatInMemory(mem) => { + mem.get(key) + } + ForwardsCat::ForwardCatOnDisk(disk) => { + disk.get(key) + } + } + } + + pub fn iter(&self) -> ForwardsCatIterator { + match self { + ForwardsCat::ForwardCatInMemory(mem) => { ForwardsCatIterator::InMemory(mem.iter())} + ForwardsCat::ForwardCatOnDisk(disk) => { ForwardsCatIterator::OnDisk(disk.iter())} + } + } +} + +enum ForwardsCatIterator<'a> { + InMemory(ForwardsCatInMemoryIterator<'a>), + OnDisk(ForwardsCatOnDiskIterator<'a>), +} + +impl Iterator for ForwardsCatIterator<'_> { + type Item = (Arc, u32); + + + fn next(&mut self) -> Option { + match self { + ForwardsCatIterator::InMemory(inmem) => { + inmem.next() + } + ForwardsCatIterator::OnDisk(ondisk) => { + ondisk.next() + } + } + } } \ No newline at end of file diff --git a/lib/representation/src/cats/forwards/in_memory.rs b/lib/representation/src/cats/forwards/in_memory.rs index a7dca24b..3e71eddc 100644 --- a/lib/representation/src/cats/forwards/in_memory.rs +++ b/lib/representation/src/cats/forwards/in_memory.rs @@ -1,14 +1,12 @@ -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{HashMap}; use std::hash::BuildHasherDefault; use std::sync::Arc; use nohash_hasher::NoHashHasher; -use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; -use crate::cats::forwards::ForwardsCat; use crate::cats::reverse::ReverseCat; #[derive(Debug, Clone)] pub struct ForwardsCatInMemory { - map: HashMap, BuildHasherDefault>> + map: HashMap,u32, BuildHasherDefault>> } impl ForwardsCatInMemory { @@ -16,4 +14,27 @@ impl ForwardsCatInMemory { let fw = Self::from(rev_img.iter().map(|(x, y)| (y,x))); fw } -} \ No newline at end of file + + pub fn iter(&self) -> ForwardsCatInMemoryIterator { + ForwardsCatInMemoryIterator { + fwc: self.map.iter() + } + } + + pub fn get(&self, key: &Arc) -> Option<&u32> { + self.map.get(key) + } +} + +pub struct ForwardsCatInMemoryIterator<'a> { + fwc: std::collections::hash_map::Iter<'a, Arc, u32> +} + +impl ForwardsCatInMemoryIterator<'_> { + pub fn next(&mut self) -> Option<(Arc, u32)> { + match self.fwc.next() { + None => {None} + Some((s,u)) => {Some((s.clone(), *u))} + } + } +} diff --git a/lib/representation/src/cats/forwards/on_disk.rs b/lib/representation/src/cats/forwards/on_disk.rs index 8860960e..a5b19b5f 100644 --- a/lib/representation/src/cats/forwards/on_disk.rs +++ b/lib/representation/src/cats/forwards/on_disk.rs @@ -1,4 +1,38 @@ +use std::sync::Arc; +use crate::cats::reverse::ReverseCat; + #[derive(Debug, Clone)] pub struct ForwardsCatOnDisk { - + map: () +} + +impl ForwardsCatOnDisk { + pub(crate) fn image(&self, rev: &ReverseCat) -> ForwardsCatOnDisk { + todo!() + } +} + +impl ForwardsCatOnDisk { + pub fn get(&self, key: &Arc) -> Option<&u32> { + self.map.get(key) + } + + pub fn iter(&self) -> ForwardsCatOnDiskIterator { + ForwardsCatOnDiskIterator{ + fwc: &() + } + } +} + +pub struct ForwardsCatOnDiskIterator<'a> { + fwc: &'a () +} + +impl ForwardsCatOnDiskIterator<'_> { + pub fn next(&mut self) -> Option<(Arc, u32)> { + match self.fwc.next() { + None => {None} + Some((s,u)) => {Some((s.clone(), *u))} + } + } } \ No newline at end of file diff --git a/lib/representation/src/cats/image.rs b/lib/representation/src/cats/image.rs index 7d7811c0..321c2ea0 100644 --- a/lib/representation/src/cats/image.rs +++ b/lib/representation/src/cats/image.rs @@ -148,7 +148,7 @@ impl CatEncs { let new_map: BTreeMap<_, _> = s .par_iter() .map(|x| { - if let Some(s) = self.rev_map.get(x) { + if let Some(s) = self.reverse.get(x) { Some((*x, s.clone())) } else { None diff --git a/lib/representation/src/cats/re_encode.rs b/lib/representation/src/cats/re_encode.rs index 4afcfc48..10bdae67 100644 --- a/lib/representation/src/cats/re_encode.rs +++ b/lib/representation/src/cats/re_encode.rs @@ -57,9 +57,9 @@ impl CatEncs { .forward .iter() .map(|(x, l)| { - if let Some(r) = other.maybe_encode_str(x) { - if l != r { - Some((*l, *r)) + if let Some(r) = other.maybe_encode_str(x.as_str()) { + if &l != r { + Some((l, *r)) } else { None } @@ -113,13 +113,13 @@ impl Cats { let mut c = self.get_counter(&t); if let Some(enc) = self.cat_map.get_mut(t) { let (remap, insert): (Vec<_>, Vec<_>) = other_enc - .map - .par_iter() + .forward + .iter() .map(|(s, u)| { - if let Some(e) = enc.forward.get(s) { - (Some((*u, *e)), None) + if let Some(e) = enc.forward.get(&s) { + (Some((u, *e)), None) } else { - (None, Some((s.clone(), *u))) + (None, Some((s.clone(), u))) } }) .unzip(); @@ -146,11 +146,11 @@ impl Cats { }; other_map.insert(t.clone(), reenc); } else { - let mut remap = Vec::with_capacity(other_enc.map.len()); + let mut remap = vec![]; let mut new_enc = CatEncs::new_empty(); - for (s, v) in other_enc.map.iter() { - remap.push((*v, c)); - new_enc.encode_new_str(s, c); + for (s, v) in other_enc.forward.iter() { + remap.push((v, c)); + new_enc.encode_new_arc_string(s, c); c += 1; } self.cat_map.insert(t.clone(), new_enc); diff --git a/lib/representation/src/cats/reverse.rs b/lib/representation/src/cats/reverse.rs index 04954d37..32f2fbc8 100644 --- a/lib/representation/src/cats/reverse.rs +++ b/lib/representation/src/cats/reverse.rs @@ -3,8 +3,8 @@ mod on_disk; use std::collections::HashSet; use std::sync::Arc; -use crate::cats::reverse::in_memory::ReverseCatInMemory; -use crate::cats::reverse::on_disk::ReverseCatOnDisk; +use crate::cats::reverse::in_memory::{ReverseCatInMemory, ReverseCatInMemoryIterator}; +use crate::cats::reverse::on_disk::{ReverseCatOnDisk, ReverseCatOnDiskIterator}; #[derive(Debug, Clone)] pub enum ReverseCat { @@ -12,6 +12,30 @@ pub enum ReverseCat { ReverseCatOnDisk(ReverseCatOnDisk), } +impl ReverseCat { + pub(crate) fn counter(&self) -> u32 { + match self { + ReverseCat::ReverseCatInMemory(m) => { + m.counter() + } + ReverseCat::ReverseCatOnDisk(d) => { + d.counter() + } + } + } + + pub fn get(&self, key:&u32) -> Option<&Arc> { + match self { + ReverseCat::ReverseCatInMemory(mem) => { + mem.get(key) + } + ReverseCat::ReverseCatOnDisk(disk) => { + disk.get(key) + } + } + } +} + impl ReverseCat { pub fn from)>>(iter:T, on_disk:bool) -> Self { if on_disk { @@ -31,4 +55,32 @@ impl ReverseCat { } } } + + pub fn iter(&self) -> ReverseCatIterator { + match self { + ReverseCat::ReverseCatInMemory(mem) => {ReverseCatIterator::InMemory(mem.iter())} + ReverseCat::ReverseCatOnDisk(disk) => {ReverseCatIterator::OnDisk(disk.iter())} + } + } +} + +enum ReverseCatIterator<'a> { + InMemory(ReverseCatInMemoryIterator<'a>), + OnDisk(ReverseCatOnDiskIterator<'a>), +} + +impl Iterator for ReverseCatIterator<'_> { + type Item = (u32, Arc); + + + fn next(&mut self) -> Option { + match self { + ReverseCatIterator::InMemory(inmem) => { + inmem.next() + } + ReverseCatIterator::OnDisk(ondisk) => { + ondisk.next() + } + } + } } \ No newline at end of file diff --git a/lib/representation/src/cats/reverse/in_memory.rs b/lib/representation/src/cats/reverse/in_memory.rs index a4811138..6ae63013 100644 --- a/lib/representation/src/cats/reverse/in_memory.rs +++ b/lib/representation/src/cats/reverse/in_memory.rs @@ -9,6 +9,16 @@ pub struct ReverseCatInMemory { rev_map: HashMap, BuildHasherDefault>>, } +impl ReverseCatInMemory { + pub(crate) fn counter(&self) -> u32 { + self.rev_map.keys().max().unwrap().clone() + } + + pub fn get(&self, key: &u32) -> Option<&Arc> { + self.rev_map.get(key) + } +} + impl ReverseCatInMemory { pub(crate) fn image(&self, s: &HashSet) -> Option { let new_map: HashMap<_, _, BuildHasherDefault>> = s @@ -35,4 +45,23 @@ impl ReverseCatInMemory { let rev_map: HashMap<_, _, BuildHasherDefault>> = iter.collect(); ReverseCatInMemory { rev_map } } + + pub fn iter(&self) -> ReverseCatInMemoryIterator { + ReverseCatInMemoryIterator{ + rwc:self.rev_map.iter() + } + } +} + +pub struct ReverseCatInMemoryIterator<'a> { + pub(crate) rwc: std::collections::hash_map::Iter<'a, u32, Arc> } + +impl ReverseCatInMemoryIterator<'_> { + pub fn next(&mut self) -> Option<(u32, Arc)> { + match self.rwc.next() { + None => {None} + Some((u,s)) => {Some((*u, s.clone()))} + } + } +} \ No newline at end of file diff --git a/lib/representation/src/cats/reverse/on_disk.rs b/lib/representation/src/cats/reverse/on_disk.rs index d5a4df6f..95211b31 100644 --- a/lib/representation/src/cats/reverse/on_disk.rs +++ b/lib/representation/src/cats/reverse/on_disk.rs @@ -7,7 +7,13 @@ pub struct ReverseCatOnDisk { } impl ReverseCatOnDisk { - pub(crate) fn image(&self, p0: &HashSet) -> Option { + pub(crate) fn counter(&self) -> u32 { + todo!() + } +} + +impl ReverseCatOnDisk { + pub(crate) fn image(&self, us: &HashSet) -> Option { todo!() } } @@ -16,4 +22,27 @@ impl ReverseCatOnDisk { pub fn from)>>(iter:T) -> Self { todo!() } + + pub fn iter(&self) -> ReverseCatOnDiskIterator { + ReverseCatOnDiskIterator{ + rwc:&() + } + } + + pub fn get(&self, key: &u32) -> Option<&Arc> { + todo!() + } +} + +pub struct ReverseCatOnDiskIterator<'a> { + rwc: &'a () +} + +impl ReverseCatOnDiskIterator<'_> { + pub fn next(&mut self) -> Option<(u32, Arc)> { + match self.rwc.next() { + None => {None} + Some((u,s)) => {Some((*u, s.clone()))} + } + } } \ No newline at end of file From 902816d21100b751129301608657a10eae1701ff Mon Sep 17 00:00:00 2001 From: Magnus Bakken <10287813+magbak@users.noreply.github.com> Date: Sun, 28 Dec 2025 15:58:53 +0100 Subject: [PATCH 3/5] Further on disk --- .gitignore | 1 + lib/query_processing/src/cats.rs | 4 +- lib/query_processing/src/expressions.rs | 10 +- lib/representation/src/cats.rs | 83 ++++---- lib/representation/src/cats/decode.rs | 20 +- lib/representation/src/cats/encode.rs | 74 ++++--- lib/representation/src/cats/forwards.rs | 74 ------- .../src/cats/forwards/in_memory.rs | 40 ---- .../src/cats/forwards/on_disk.rs | 38 ---- lib/representation/src/cats/globalize.rs | 21 +- lib/representation/src/cats/image.rs | 38 +--- lib/representation/src/cats/maps.rs | 175 ++++++++++++++++ lib/representation/src/cats/maps/in_memory.rs | 189 ++++++++++++++++++ lib/representation/src/cats/maps/on_disk.rs | 25 +++ lib/representation/src/cats/re_encode.rs | 81 ++------ lib/representation/src/cats/reverse.rs | 86 -------- .../src/cats/reverse/in_memory.rs | 67 ------- .../src/cats/reverse/on_disk.rs | 48 ----- lib/triplestore/src/cats.rs | 10 +- lib/triplestore/src/lib.rs | 8 +- lib/triplestore/src/sparql.rs | 3 +- lib/triplestore/src/sparql/delete.rs | 9 +- .../src/sparql/lazy_graph_patterns/pvalues.rs | 3 +- .../src/sparql/lazy_graph_patterns/values.rs | 3 +- lib/triplestore/src/storage.rs | 64 +++--- 25 files changed, 585 insertions(+), 589 deletions(-) delete mode 100644 lib/representation/src/cats/forwards.rs delete mode 100644 lib/representation/src/cats/forwards/in_memory.rs delete mode 100644 lib/representation/src/cats/forwards/on_disk.rs create mode 100644 lib/representation/src/cats/maps.rs create mode 100644 lib/representation/src/cats/maps/in_memory.rs create mode 100644 lib/representation/src/cats/maps/on_disk.rs delete mode 100644 lib/representation/src/cats/reverse.rs delete mode 100644 lib/representation/src/cats/reverse/in_memory.rs delete mode 100644 lib/representation/src/cats/reverse/on_disk.rs diff --git a/.gitignore b/.gitignore index e06bf7cd..6ad4f6b0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +.DS_Store .ipynb_checkpoints /old_fun __pycache__ diff --git a/lib/query_processing/src/cats.rs b/lib/query_processing/src/cats.rs index 160eaaa4..cf6bb469 100644 --- a/lib/query_processing/src/cats.rs +++ b/lib/query_processing/src/cats.rs @@ -5,6 +5,7 @@ use representation::{BaseRDFNodeType, RDFNodeState}; use std::collections::{HashMap, HashSet}; use std::hash::BuildHasherDefault; use std::ops::Deref; +use std::path::Path; use std::sync::Arc; pub fn create_compatible_cats( @@ -29,7 +30,8 @@ pub fn create_compatible_cats( if need_native_cat_cast { let locals = find_all_locals(&t, &states); let mut cats = Cats::new_empty(Some(global_cats.read().unwrap().deref())); - let renc_local = cats.merge(locals); + // We do not store local cats to disk + let renc_local = cats.merge(locals, None); let renc_local: HashMap<_, _> = renc_local .into_iter() .map(|(uu, map)| { diff --git a/lib/query_processing/src/expressions.rs b/lib/query_processing/src/expressions.rs index fa8332a9..1578e26f 100644 --- a/lib/query_processing/src/expressions.rs +++ b/lib/query_processing/src/expressions.rs @@ -32,7 +32,8 @@ pub fn named_node_local_enc(nn: &NamedNode, global_cats: &Cats) -> (Expr, Option if let Some(enc) = global_cats.encode_iri_slice(&[nn.as_str()]).pop().unwrap() { (lit(enc).cast(DataType::UInt32), None) } else { - let (enc, local) = Cats::new_singular_iri(nn.as_str(), global_cats.get_iri_counter()); + // We do not store these local cats to disk + let (enc, local) = Cats::new_singular_iri(nn.as_str(), global_cats.get_iri_counter(), None); (lit(enc).cast(DataType::UInt32), Some(local)) } } @@ -46,7 +47,9 @@ pub fn blank_node_local_enc(bl: &BlankNode, global_cats: &Cats) -> (Expr, Option if let Some(enc) = global_cats.encode_blanks(&[bl.as_str()]).pop().unwrap() { (lit(enc).cast(DataType::UInt32), None) } else { - let (enc, local) = Cats::new_singular_blank(bl.as_str(), global_cats.get_iri_counter()); + // Local cats not stored to disk, hence None path + let (enc, local) = + Cats::new_singular_blank(bl.as_str(), global_cats.get_iri_counter(), None); (lit(enc).cast(DataType::UInt32), Some(local)) } } @@ -67,7 +70,8 @@ pub fn maybe_literal_enc(l: &Literal, global_cats: &Cats) -> (Expr, BaseRDFNodeT } else { let dt = l.datatype().into_owned(); let offset = global_cats.get_literal_counter(&dt); - let (enc, local) = Cats::new_singular_literal(l.value(), dt, offset); + // Local cats are not stored to disk, hence None + let (enc, local) = Cats::new_singular_literal(l.value(), dt, offset, None); ( lit(enc).cast(DataType::UInt32), bt, diff --git a/lib/representation/src/cats.rs b/lib/representation/src/cats.rs index 852c868b..292419d5 100644 --- a/lib/representation/src/cats.rs +++ b/lib/representation/src/cats.rs @@ -2,9 +2,8 @@ mod decode; mod encode; mod globalize; mod image; +pub mod maps; mod re_encode; -pub mod forwards; -pub mod reverse; pub use decode::*; pub use encode::*; @@ -13,19 +12,19 @@ pub use image::*; pub use re_encode::*; use std::cmp; +use crate::cats::maps::CatMaps; use crate::BaseRDFNodeType; use nohash_hasher::NoHashHasher; use oxrdf::vocab::xsd; use oxrdf::{NamedNode, NamedNodeRef}; use polars::prelude::DataFrame; -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use std::hash::BuildHasherDefault; use std::ops::Deref; +use std::path::Path; use std::sync::Arc; use std::sync::RwLock; use uuid::Uuid; -use crate::cats::forwards::ForwardsCat; -use crate::cats::reverse::ReverseCat; pub const OBJECT_RANK_COL_NAME: &str = "object_rank"; pub const SUBJECT_RANK_COL_NAME: &str = "subject_rank"; @@ -83,9 +82,33 @@ pub struct EncodedTriples { #[derive(Debug, Clone)] pub struct CatEncs { - // We use a BTree map to keep strings sorted - pub forward: ForwardsCat, - pub reverse: ReverseCat, + pub maps: CatMaps, +} + +impl CatEncs { + pub(crate) fn new_remap(other: &CatEncs, path: Option<&Path>) -> (CatEncs, CatReEnc) { + let (maps, remap) = CatMaps::new_remap(&other.maps, path); + let encs = CatEncs { maps }; + (encs, remap) + } +} + +impl CatEncs { + pub fn contains_str(&self, s: &str) -> bool { + self.maps.contains_str(s) + } + + pub fn contains_u32(&self, u: &u32) -> bool { + self.maps.contains_u32(u) + } + + pub(crate) fn is_empty(&self) -> bool { + self.maps.is_empty() + } + + pub fn merge(&mut self, other: &CatEncs) -> CatReEnc { + self.maps.merge(&other.maps) + } } #[derive(Debug, Clone)] @@ -152,23 +175,6 @@ impl From>> for LockedCats { } } -pub struct ReverseLookup<'a> { - rev_map: &'a RevMap, -} - -impl<'a> ReverseLookup<'a> {} - -impl ReverseLookup<'_> { - pub fn new<'a>(rev_map: &'a RevMap) -> ReverseLookup<'a> { - ReverseLookup { rev_map } - } - - pub fn lookup(&self, u: &u32) -> Option<&str> { - let s = self.rev_map.get(u); - s.map(|x| x.as_str()) - } -} - #[derive(Debug, Clone)] pub struct Cats { pub cat_map: HashMap, @@ -179,29 +185,34 @@ pub struct Cats { } impl Cats { - pub fn new_singular_literal(l: &str, dt: NamedNode, u: u32) -> (u32, Cats) { + pub fn new_singular_literal( + l: &str, + dt: NamedNode, + u: u32, + path: Option<&Path>, + ) -> (u32, Cats) { let t = CatType::Literal(dt); - let catenc = CatEncs::new_singular(l, u); + let catenc = CatEncs::new_singular(l, u, path); (u, Cats::from_map(HashMap::from([(t, catenc)]))) } - pub fn new_singular_blank(s: &str, u: u32) -> (u32, Cats) { + pub fn new_singular_blank(s: &str, u: u32, path: Option<&Path>) -> (u32, Cats) { let t = CatType::Blank; - let catenc = CatEncs::new_singular(s, u); + let catenc = CatEncs::new_singular(s, u, path); (u, Cats::from_map(HashMap::from([(t, catenc)]))) } - pub fn new_singular_iri(s: &str, u: u32) -> (u32, Cats) { + pub fn new_singular_iri(s: &str, u: u32, path: Option<&Path>) -> (u32, Cats) { let t = CatType::IRI; - let catenc = CatEncs::new_singular(s, u); + let catenc = CatEncs::new_singular(s, u, path); (u, Cats::from_map(HashMap::from([(t, catenc)]))) } } impl Cats { - pub fn get_reverse_lookup(&self, bt: &BaseRDFNodeType) -> ReverseLookup<'_> { + pub fn get_cat_encs(&self, bt: &BaseRDFNodeType) -> &CatEncs { let ct = CatType::from_base_rdf_node_type(bt); - ReverseLookup::new(&self.cat_map.get(&ct).unwrap().rev_map) + self.cat_map.get(&ct).unwrap() } pub(crate) fn from_map(cat_map: HashMap) -> Self { @@ -243,7 +254,7 @@ impl Cats { fn calc_new_blank_counter(&self) -> u32 { let mut counter = 0; if let Some(enc) = self.cat_map.get(&CatType::Blank) { - counter = cmp::max(enc.reverse.counter() + 1, counter); + counter = cmp::max(enc.counter() + 1, counter); } counter } @@ -251,7 +262,7 @@ impl Cats { fn calc_new_iri_counter(&self) -> u32 { let mut counter = 0; if let Some(enc) = self.cat_map.get(&CatType::IRI) { - counter = cmp::max(enc.reverse.counter() + 1, counter); + counter = cmp::max(enc.counter() + 1, counter); } counter } @@ -260,7 +271,7 @@ impl Cats { let mut map = HashMap::new(); for (p, cat) in &self.cat_map { if let CatType::Literal(nn) = p { - let counter = cat.reverse.counter() + 1; + let counter = cat.counter() + 1; map.insert(nn.clone(), counter); } } diff --git a/lib/representation/src/cats/decode.rs b/lib/representation/src/cats/decode.rs index b773ffc6..431145ae 100644 --- a/lib/representation/src/cats/decode.rs +++ b/lib/representation/src/cats/decode.rs @@ -8,30 +8,20 @@ use polars::frame::column::ScalarColumn; use polars::frame::DataFrame; use polars::prelude::{col, Column, Expr, IntoColumn, IntoLazy, NamedFrom, SeriesSealed}; use polars::series::Series; -use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator}; +use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use std::borrow::Cow; impl CatEncs { - pub fn decode(&self, ser: &Series) -> Series { + pub fn decode_series(&self, ser: &Series) -> Series { let original_name = ser.name().clone(); let uch: Vec<_> = ser.u32().unwrap().iter().collect(); - let decoded_vec_iter = uch - .into_par_iter() - .map(|x| x.map(|x| self.rev_map.get(&x).unwrap())); - - let decoded_vec: Vec<_> = decoded_vec_iter.map(|x| x.map(|x| x.as_str())).collect(); + let decoded_vec = self.maps.decode_batch(uch.as_ref()); let new_ser = Series::new(original_name, decoded_vec); - // - // let mut new_ser = if let CatType::Prefix(pre) = cat_type { - // Series::from_iter(decoded_ser.map(|x| x.map(|x| format!("{}{}", pre.as_str(), x)))) - // } else { - // Series::from_iter(decoded_ser.map(|x| x.map(|x| x.as_str()))) - // }; new_ser } pub fn maybe_decode_string(&self, u: &u32) -> Option<&str> { - self.rev_map.get(u).map(|x| x.as_str()) + self.maps.maybe_decode(u) } } @@ -68,7 +58,7 @@ impl Cats { pub fn decode_of_type(&self, ser: &Series, bt: &BaseRDFNodeType) -> Series { let ct = CatType::from_base_rdf_node_type(bt); if let Some(enc) = self.cat_map.get(&ct) { - enc.decode(ser) + enc.decode_series(ser) } else { unreachable!("Should never be called when type does not exist") } diff --git a/lib/representation/src/cats/encode.rs b/lib/representation/src/cats/encode.rs index 80344e01..67f3b398 100644 --- a/lib/representation/src/cats/encode.rs +++ b/lib/representation/src/cats/encode.rs @@ -1,4 +1,5 @@ use super::{CatEncs, CatType, Cats, EncodedTriples}; +use crate::cats::maps::CatMaps; use crate::cats::LockedCats; use crate::solution_mapping::{BaseCatState, EagerSolutionMappings}; use crate::{BaseRDFNodeType, RDFNodeState, OBJECT_COL_NAME, SUBJECT_COL_NAME}; @@ -6,34 +7,22 @@ use oxrdf::NamedNode; use polars::frame::DataFrame; use polars::prelude::{col, lit, IntoLazy, Series}; use std::collections::HashMap; -use std::hash::BuildHasherDefault; -use std::sync::Arc; +use std::path::Path; impl CatEncs { - pub fn new_empty() -> CatEncs { - let rev_map = HashMap::with_capacity_and_hasher(2, BuildHasherDefault::default()); + pub fn new_empty(path: Option<&Path>) -> CatEncs { CatEncs { - forward: Default::default(), - reverse: rev_map, + maps: CatMaps::new_empty(path), } } - pub fn contains_key(&self, s: &str) -> bool { - let s = s.to_string(); - self.forward.contains_key(&s) - } - - pub fn new_singular(value: &str, u: u32) -> CatEncs { - let mut sing = Self::new_empty(); - let s = Arc::new(value.to_string()); - sing.map.insert(s.clone(), u); - sing.rev_map.insert(u, s); - sing + pub fn new_singular(value: &str, u: u32, path: Option<&Path>) -> CatEncs { + let maps = CatMaps::new_singular(value, u, path); + CatEncs { maps } } pub fn maybe_encode_str(&self, s: &str) -> Option<&u32> { - let s = Arc::new(s.to_string()); - self.forward.get(&s) + self.maps.maybe_encode_string(s) } pub fn encode_new_str(&mut self, s: &str, u: u32) { @@ -41,23 +30,20 @@ impl CatEncs { } pub fn encode_new_string(&mut self, s: String, u: u32) { - let s = Arc::new(s.clone()); - self.map.insert(s.clone(), u); - self.rev_map.insert(u, s); - } - - pub fn encode_new_arc_string(&mut self, s: Arc, u: u32) { - self.map.insert(s.clone(), u); - self.rev_map.insert(u, s); + self.maps.encode_new_string(s, u); } - pub fn height(&self) -> u32 { - self.forward.len() as u32 + pub fn counter(&self) -> u32 { + self.maps.counter() } } impl Cats { - pub fn encode_solution_mappings(&self, sm: EagerSolutionMappings) -> EagerSolutionMappings { + pub fn encode_solution_mappings( + &self, + sm: EagerSolutionMappings, + path: Option<&Path>, + ) -> EagerSolutionMappings { let EagerSolutionMappings { mappings, mut rdf_node_types, @@ -87,7 +73,7 @@ impl Cats { let encoded: Vec<_> = to_encode .into_iter() .map(|(c, t, ser)| { - let (enc, local) = self.encode_series(&ser, &t); + let (enc, local) = self.encode_series(&ser, &t, path); (c, t, enc, local) }) .collect(); @@ -109,7 +95,12 @@ impl Cats { EagerSolutionMappings::new(mappings.collect().unwrap(), rdf_node_types) } - pub fn encode_series(&self, series: &Series, t: &BaseRDFNodeType) -> (Series, Option) { + pub fn encode_series( + &self, + series: &Series, + t: &BaseRDFNodeType, + path: Option<&Path>, + ) -> (Series, Option) { let original_name = series.name().clone(); let mut use_height = match t { BaseRDFNodeType::IRI => self.iri_counter, @@ -121,7 +112,7 @@ impl Cats { }; let enc = self.get_encs(t).pop(); - let mut new_enc = CatEncs::new_empty(); + let mut new_enc = CatEncs::new_empty(path); let strch = series.str().unwrap(); let encoded_global: Vec<_> = strch .iter() @@ -158,7 +149,7 @@ impl Cats { }; encoded_global_local.push(encoded); } - let local = if !new_enc.map.is_empty() { + let local = if !new_enc.is_empty() { let cat_type = CatType::from_base_rdf_node_type(t); let mut map = HashMap::new(); map.insert(cat_type, new_enc); @@ -212,7 +203,7 @@ impl Cats { u32s } - pub fn encode_iri_or_local_cat(&self, iri: &str) -> (u32, RDFNodeState) { + pub fn encode_iri_or_local_cat(&self, iri: &str, path: Option<&Path>) -> (u32, RDFNodeState) { let mut v = self.encode_iri_slice(&[iri]); if let Some(u) = v.pop().unwrap() { ( @@ -223,7 +214,7 @@ impl Cats { ), ) } else { - let (u, l) = Cats::new_singular_iri(iri, self.iri_counter); + let (u, l) = Cats::new_singular_iri(iri, self.iri_counter, path); ( u, RDFNodeState::from_bases( @@ -234,7 +225,11 @@ impl Cats { } } - pub fn encode_blank_or_local_cat(&self, blank: &str) -> (u32, RDFNodeState) { + pub fn encode_blank_or_local_cat( + &self, + blank: &str, + path: Option<&Path>, + ) -> (u32, RDFNodeState) { let mut v = self.encode_blanks(&[blank]); if let Some(u) = v.pop().unwrap() { ( @@ -245,7 +240,7 @@ impl Cats { ), ) } else { - let (u, l) = Cats::new_singular_blank(blank, self.blank_counter); + let (u, l) = Cats::new_singular_blank(blank, self.blank_counter, path); ( u, RDFNodeState::from_bases( @@ -264,6 +259,7 @@ pub fn encode_triples( subject_cat_state: BaseCatState, object_cat_state: BaseCatState, global_cats: &Cats, + path: Option<&Path>, ) -> (Vec, EncodedTriples) { let mut map = HashMap::new(); map.insert( @@ -274,7 +270,7 @@ pub fn encode_triples( OBJECT_COL_NAME.to_string(), RDFNodeState::from_bases(object_type.clone(), object_cat_state), ); - let mut sm = global_cats.encode_solution_mappings(EagerSolutionMappings::new(df, map)); + let mut sm = global_cats.encode_solution_mappings(EagerSolutionMappings::new(df, map), path); let mut subject_state = sm.rdf_node_types.remove(SUBJECT_COL_NAME).unwrap(); let mut object_state = sm.rdf_node_types.remove(OBJECT_COL_NAME).unwrap(); diff --git a/lib/representation/src/cats/forwards.rs b/lib/representation/src/cats/forwards.rs deleted file mode 100644 index 6521426e..00000000 --- a/lib/representation/src/cats/forwards.rs +++ /dev/null @@ -1,74 +0,0 @@ -mod in_memory; -mod on_disk; - -use std::sync::Arc; -use crate::cats::forwards::in_memory::{ForwardsCatInMemory, ForwardsCatInMemoryIterator}; -use crate::cats::forwards::on_disk::{ForwardsCatOnDisk, ForwardsCatOnDiskIterator}; -use crate::cats::reverse::ReverseCat; - -#[derive(Debug, Clone)] -pub enum ForwardsCat { - ForwardCatInMemory(ForwardsCatInMemory), - ForwardCatOnDisk(ForwardsCatOnDisk), -} - -impl ForwardsCat { - pub fn image(&self, rev_img: &ReverseCat) -> Self { - match self { - ForwardsCat::ForwardCatInMemory(m) => { - ForwardsCat::ForwardCatInMemory(m.image(rev_img)) - } - ForwardsCat::ForwardCatOnDisk(d) => { - ForwardsCat::ForwardCatOnDisk(d.image(rev_img)) - } - } - } - fn lookup(&self, key:&u32) -> Option<&str> { - todo!() - } - fn batch_lookup(&self, keys: &[u32]) -> Vec> { - todo!() - } - fn batch_insert(&self, kvs: &[(u32, Arc)]) -> u32 { - todo!() - } - - pub(crate) fn get(&self, key:&Arc) -> Option<&u32> { - match self { - ForwardsCat::ForwardCatInMemory(mem) => { - mem.get(key) - } - ForwardsCat::ForwardCatOnDisk(disk) => { - disk.get(key) - } - } - } - - pub fn iter(&self) -> ForwardsCatIterator { - match self { - ForwardsCat::ForwardCatInMemory(mem) => { ForwardsCatIterator::InMemory(mem.iter())} - ForwardsCat::ForwardCatOnDisk(disk) => { ForwardsCatIterator::OnDisk(disk.iter())} - } - } -} - -enum ForwardsCatIterator<'a> { - InMemory(ForwardsCatInMemoryIterator<'a>), - OnDisk(ForwardsCatOnDiskIterator<'a>), -} - -impl Iterator for ForwardsCatIterator<'_> { - type Item = (Arc, u32); - - - fn next(&mut self) -> Option { - match self { - ForwardsCatIterator::InMemory(inmem) => { - inmem.next() - } - ForwardsCatIterator::OnDisk(ondisk) => { - ondisk.next() - } - } - } -} \ No newline at end of file diff --git a/lib/representation/src/cats/forwards/in_memory.rs b/lib/representation/src/cats/forwards/in_memory.rs deleted file mode 100644 index 3e71eddc..00000000 --- a/lib/representation/src/cats/forwards/in_memory.rs +++ /dev/null @@ -1,40 +0,0 @@ -use std::collections::{HashMap}; -use std::hash::BuildHasherDefault; -use std::sync::Arc; -use nohash_hasher::NoHashHasher; -use crate::cats::reverse::ReverseCat; - -#[derive(Debug, Clone)] -pub struct ForwardsCatInMemory { - map: HashMap,u32, BuildHasherDefault>> -} - -impl ForwardsCatInMemory { - pub fn image(&self, rev_img: &ReverseCat) -> ForwardsCatInMemory { - let fw = Self::from(rev_img.iter().map(|(x, y)| (y,x))); - fw - } - - pub fn iter(&self) -> ForwardsCatInMemoryIterator { - ForwardsCatInMemoryIterator { - fwc: self.map.iter() - } - } - - pub fn get(&self, key: &Arc) -> Option<&u32> { - self.map.get(key) - } -} - -pub struct ForwardsCatInMemoryIterator<'a> { - fwc: std::collections::hash_map::Iter<'a, Arc, u32> -} - -impl ForwardsCatInMemoryIterator<'_> { - pub fn next(&mut self) -> Option<(Arc, u32)> { - match self.fwc.next() { - None => {None} - Some((s,u)) => {Some((s.clone(), *u))} - } - } -} diff --git a/lib/representation/src/cats/forwards/on_disk.rs b/lib/representation/src/cats/forwards/on_disk.rs deleted file mode 100644 index a5b19b5f..00000000 --- a/lib/representation/src/cats/forwards/on_disk.rs +++ /dev/null @@ -1,38 +0,0 @@ -use std::sync::Arc; -use crate::cats::reverse::ReverseCat; - -#[derive(Debug, Clone)] -pub struct ForwardsCatOnDisk { - map: () -} - -impl ForwardsCatOnDisk { - pub(crate) fn image(&self, rev: &ReverseCat) -> ForwardsCatOnDisk { - todo!() - } -} - -impl ForwardsCatOnDisk { - pub fn get(&self, key: &Arc) -> Option<&u32> { - self.map.get(key) - } - - pub fn iter(&self) -> ForwardsCatOnDiskIterator { - ForwardsCatOnDiskIterator{ - fwc: &() - } - } -} - -pub struct ForwardsCatOnDiskIterator<'a> { - fwc: &'a () -} - -impl ForwardsCatOnDiskIterator<'_> { - pub fn next(&mut self) -> Option<(Arc, u32)> { - match self.fwc.next() { - None => {None} - Some((s,u)) => {Some((s.clone(), *u))} - } - } -} \ No newline at end of file diff --git a/lib/representation/src/cats/globalize.rs b/lib/representation/src/cats/globalize.rs index 0bccd89a..e2e8cdbb 100644 --- a/lib/representation/src/cats/globalize.rs +++ b/lib/representation/src/cats/globalize.rs @@ -3,32 +3,35 @@ use crate::solution_mapping::BaseCatState; use crate::BaseRDFNodeType; use oxrdf::NamedNode; use polars::frame::DataFrame; -use std::sync::Arc; +use std::path::Path; use tracing::instrument; impl Cats { - pub fn globalize(&mut self, mut cat_triples: Vec) -> Vec { + pub fn globalize( + &mut self, + mut cat_triples: Vec, + path: Option<&Path>, + ) -> Vec { let local_cats: Vec<_> = cat_triples .iter_mut() .map(|x| x.local_cats.drain(..)) .flatten() .collect(); - let re_enc_map = self.merge(local_cats); + let re_enc_map = self.merge(local_cats, path); let global_cat_triples = re_encode(cat_triples, re_enc_map); global_cat_triples } - pub fn encode_predicates(&mut self, cat_triples: &Vec) { + pub fn encode_predicates(&mut self, cat_triples: &Vec, path: Option<&Path>) { for ct in cat_triples { let t = CatType::IRI; if !self.cat_map.contains_key(&t) { - self.cat_map.insert(t.clone(), CatEncs::new_empty()); + self.cat_map.insert(t.clone(), CatEncs::new_empty(path)); } let enc = self.cat_map.get_mut(&t).unwrap(); let pred = ct.predicate.as_str().to_string(); - if !enc.map.contains_key(&pred) { - let arc_pred = Arc::new(pred); - enc.encode_new_arc_string(arc_pred.clone(), self.iri_counter); + if !enc.maps.contains_str(&pred) { + enc.encode_new_string(pred, self.iri_counter); self.iri_counter += 1; } } @@ -44,6 +47,7 @@ pub fn cat_encode_triples( subject_cat_state: BaseCatState, object_cat_state: BaseCatState, global_cats: &Cats, + path: Option<&Path>, ) -> CatTriples { let (local_cats, encoded_triples) = encode_triples( df, @@ -52,6 +56,7 @@ pub fn cat_encode_triples( subject_cat_state, object_cat_state, global_cats, + path, ); CatTriples { diff --git a/lib/representation/src/cats/image.rs b/lib/representation/src/cats/image.rs index 321c2ea0..314e1211 100644 --- a/lib/representation/src/cats/image.rs +++ b/lib/representation/src/cats/image.rs @@ -4,10 +4,11 @@ use crate::cats::LockedCats; use crate::solution_mapping::{BaseCatState, EagerSolutionMappings}; use crate::{BaseRDFNodeType, RDFNodeState}; use nohash_hasher::NoHashHasher; -use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; -use std::collections::{BTreeMap, HashMap, HashSet}; +use rayon::iter::ParallelIterator; +use std::collections::{HashMap, HashSet}; use std::hash::BuildHasherDefault; use std::ops::Deref; +use std::path::{Path, PathBuf}; use std::sync::Arc; impl Cats { @@ -43,7 +44,7 @@ impl Cats { let local_read_ref = local_read.as_ref().map(|x| x.deref()); let local_rev_map = if let Some(local) = local_read_ref { - Some(local.get_reverse_lookup(bt)) + Some(local.get_cat_encs(bt)) } else { None }; @@ -55,7 +56,7 @@ impl Cats { .filter(|x| x.is_some()) .map(|x| x.unwrap()); if let Some(local_rev_map) = local_rev_map { - ss.extend(new_ss.filter(|x| !local_rev_map.rev_map.contains_key(x))); + ss.extend(new_ss.filter(|x| !local_rev_map.contains_u32(x))); } else { ss.extend(new_ss); } @@ -70,6 +71,7 @@ impl Cats { pub fn merge_solution_mappings_locals( &mut self, sms: &Vec<&EagerSolutionMappings>, + path: Option<&Path>, ) -> HashMap> { let mut local_cats = vec![]; @@ -86,7 +88,7 @@ impl Cats { } } } - let remap = self.merge(local_cats); + let remap = self.merge(local_cats, path); let mut concat_reenc: HashMap< String, HashMap>>>, @@ -145,27 +147,8 @@ impl Cats { impl CatEncs { pub fn image(&self, s: &HashSet) -> Option { - let new_map: BTreeMap<_, _> = s - .par_iter() - .map(|x| { - if let Some(s) = self.reverse.get(x) { - Some((*x, s.clone())) - } else { - None - } - }) - .filter(|x| x.is_some()) - .map(|x| x.unwrap()) - .map(|(x, y)| (y, x)) - .collect(); - let new_rev_map: HashMap<_, _, BuildHasherDefault>> = - new_map.iter().map(|(x, y)| (*y, x.clone())).collect(); - - if new_map.len() > 0 { - Some(CatEncs { - map: new_map, - rev_map: new_rev_map, - }) + if let Some(maps) = self.maps.image(s) { + Some(CatEncs { maps }) } else { None } @@ -175,10 +158,11 @@ impl CatEncs { pub fn new_solution_mapping_cats( sms: Vec, global_cats: &Cats, + path: Option<&Path>, ) -> (Vec, Cats) { let smsref: Vec<_> = sms.iter().collect(); let mut cats = global_cats.mappings_cat_image(&smsref); - let reenc = cats.merge_solution_mappings_locals(&smsref); + let reenc = cats.merge_solution_mappings_locals(&smsref, path); let new_sms: Vec<_> = sms .into_iter() .map(|sm| reencode_solution_mappings(sm, &reenc)) diff --git a/lib/representation/src/cats/maps.rs b/lib/representation/src/cats/maps.rs new file mode 100644 index 00000000..cd3e3be1 --- /dev/null +++ b/lib/representation/src/cats/maps.rs @@ -0,0 +1,175 @@ +pub mod in_memory; +pub mod on_disk; + +use crate::cats::maps::in_memory::CatMapsInMemory; +use crate::cats::maps::on_disk::CatMapsOnDisk; +use crate::cats::CatReEnc; +use std::collections::HashSet; +use std::path::Path; +use std::sync::Arc; + +#[derive(Debug, Clone)] +pub enum CatMaps { + InMemory(CatMapsInMemory), + OnDisk(CatMapsOnDisk), +} + +impl CatMaps { + pub(crate) fn new_remap(maps: &CatMaps, path: Option<&Path>) -> (CatMaps, CatReEnc) { + if let Some(path) = path { + todo!() + } else { + if let CatMaps::InMemory(mem) = maps { + let (mem, re_enc) = CatMapsInMemory::new_remap(mem); + (CatMaps::InMemory(mem), re_enc) + } else { + unreachable!("Should never happen") + } + } + } + + pub(crate) fn inner_join_re_enc(&self, other: &CatMaps) -> Vec<(u32, u32)> { + match self { + CatMaps::InMemory(mem) => { + if let CatMaps::InMemory(other_mem) = other { + mem.inner_join_re_enc(other_mem) + } else { + unreachable!("Should never happen") + } + } + CatMaps::OnDisk(_) => { + todo!() + } + } + } + + pub fn merge(&mut self, other: &CatMaps) -> CatReEnc { + match self { + CatMaps::InMemory(mem) => { + if let CatMaps::InMemory(other_mem) = other { + mem.merge(other_mem) + } else { + unreachable!("Should never happen") + } + } + CatMaps::OnDisk(_) => { + todo!() + } + } + } + + pub fn contains_str(&self, s: &str) -> bool { + match self { + CatMaps::InMemory(mem) => mem.contains_str(s), + CatMaps::OnDisk(disk) => { + todo!() + } + } + } + + pub fn contains_u32(&self, u: &u32) -> bool { + match self { + CatMaps::InMemory(mem) => mem.contains_u32(u), + CatMaps::OnDisk(disk) => { + todo!() + } + } + } + + pub fn is_empty(&self) -> bool { + match self { + CatMaps::InMemory(mem) => mem.is_empty(), + CatMaps::OnDisk(_) => { + todo!() + } + } + } + + pub fn encode_new_string(&mut self, s: String, u: u32) { + match self { + CatMaps::InMemory(mem) => mem.encode_new_string(s, u), + CatMaps::OnDisk(disk) => { + todo!() + } + } + } + + pub fn maybe_encode_string(&self, s: &str) -> Option<&u32> { + match self { + CatMaps::InMemory(mem) => mem.maybe_encode_string(s), + CatMaps::OnDisk(_) => { + todo!() + } + } + } + + pub fn new_singular(value: &str, u: u32, path: Option<&Path>) -> CatMaps { + if let Some(path) = path { + todo!() + } else { + CatMaps::InMemory(CatMapsInMemory::new_singular(value, u)) + } + } + + pub fn new_empty(path: Option<&Path>) -> CatMaps { + if let Some(path) = path { + todo!() + } else { + CatMaps::InMemory(CatMapsInMemory::new_empty()) + } + } + + fn batch_decode(&self, keys: &[u32]) -> Vec> { + todo!() + } + fn batch_encode(&self, kvs: &[(u32, Arc)]) -> u32 { + todo!() + } + + pub fn encode(&self, key: &Arc) -> Option<&u32> { + match self { + CatMaps::InMemory(mem) => mem.get(key), + CatMaps::OnDisk(disk) => todo!(), + } + } + + pub fn counter(&self) -> u32 { + match self { + CatMaps::InMemory(m) => m.counter(), + CatMaps::OnDisk(d) => todo!(), + } + } + + pub fn decode_batch(&self, v: &[Option]) -> Vec> { + match self { + CatMaps::InMemory(mem) => mem.decode_batch(v), + CatMaps::OnDisk(disk) => disk.decode_batch(v), + } + } + + pub fn maybe_decode(&self, u: &u32) -> Option<&str> { + match self { + CatMaps::InMemory(mem) => mem.maybe_decode(u), + CatMaps::OnDisk(disk) => disk.maybe_decode_string(u), + } + } + + pub fn image(&self, s: &HashSet) -> Option { + match self { + CatMaps::InMemory(m) => { + if let Some(m) = m.image(s) { + Some(CatMaps::InMemory(m)) + } else { + None + } + } + CatMaps::OnDisk(d) => { + if let Some(m) = d.image(s) { + Some(CatMaps::OnDisk(m)) + } else { + None + } + } + } + } +} diff --git a/lib/representation/src/cats/maps/in_memory.rs b/lib/representation/src/cats/maps/in_memory.rs new file mode 100644 index 00000000..4bbf33f0 --- /dev/null +++ b/lib/representation/src/cats/maps/in_memory.rs @@ -0,0 +1,189 @@ +use crate::cats::maps::CatMaps; +use crate::cats::{CatEncs, CatReEnc}; +use nohash_hasher::NoHashHasher; +use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator}; +use std::collections::{BTreeMap, HashMap, HashSet}; +use std::hash::BuildHasherDefault; +use std::sync::Arc; + +#[derive(Debug, Clone)] +pub struct CatMapsInMemory { + map: BTreeMap, u32>, + rev_map: HashMap, BuildHasherDefault>>, +} + +impl CatMapsInMemory { + pub(crate) fn new_remap(maps: &CatMapsInMemory) -> (CatMapsInMemory, CatReEnc) { + let mut c = 0; + let mut remap = vec![]; + let mut new_maps = CatMapsInMemory::new_empty(); + for (s, v) in maps.map.iter() { + remap.push((*v, c)); + new_maps.encode_new_arc_string(s.clone(), c); + c += 1; + } + let remap: HashMap<_, _, BuildHasherDefault>> = + remap.into_iter().collect(); + ( + new_maps, + CatReEnc { + cat_map: Arc::new(remap), + }, + ) + } + + fn encode_new_arc_string(&mut self, s: Arc, u: u32) { + self.map.insert(s.clone(), u); + self.rev_map.insert(u, s); + } +} + +impl CatMapsInMemory { + pub(crate) fn inner_join_re_enc(&self, other: &CatMapsInMemory) -> Vec<(u32, u32)> { + let renc: Vec<_> = self + .map + .iter() + .map(|(x, l)| { + if let Some(r) = other.maybe_encode_string(x.as_str()) { + if l != r { + Some((*l, *r)) + } else { + None + } + } else { + None + } + }) + .filter(|x| x.is_some()) + .map(|x| x.unwrap()) + .collect(); + renc + } +} + +impl CatMapsInMemory { + pub fn merge(&mut self, other: &CatMapsInMemory) -> CatReEnc { + let mut c = self.counter(); + let (remap, insert): (Vec<_>, Vec<_>) = other + .map + .iter() + .map(|(s, u)| { + if let Some(e) = self.map.get(s) { + (Some((*u, *e)), None) + } else { + (None, Some((s.clone(), u))) + } + }) + .unzip(); + let mut numbered_insert = Vec::new(); + let mut new_remap = Vec::new(); + for k in insert { + if let Some((s, u)) = k { + numbered_insert.push((s, c)); + new_remap.push((*u, c)); + c += 1; + } + } + for (s, u) in numbered_insert { + self.encode_new_arc_string(s.clone(), u); + } + let remap: HashMap<_, _, BuildHasherDefault>> = remap + .into_iter() + .filter(|x| x.is_some()) + .map(|x| x.unwrap()) + .chain(new_remap.into_iter()) + .collect(); + let reenc = CatReEnc { + cat_map: Arc::new(remap), + }; + reenc + } + + pub(crate) fn contains_str(&self, s: &str) -> bool { + let arcs = Arc::new(s.to_string()); + self.map.contains_key(&arcs) + } + + pub(crate) fn contains_u32(&self, u: &u32) -> bool { + self.rev_map.contains_key(u) + } + + pub fn is_empty(&self) -> bool { + self.map.is_empty() + } + + pub fn height(&self) -> u32 { + self.map.len() as u32 + } + + pub fn encode_new_string(&mut self, s: String, u: u32) { + let s = Arc::new(s.clone()); + self.map.insert(s.clone(), u); + self.rev_map.insert(u, s); + } + + pub fn maybe_encode_string(&self, s: &str) -> Option<&u32> { + let s = Arc::new(s.to_string()); + self.map.get(&s) + } + + pub fn new_singular(value: &str, u: u32) -> Self { + let mut sing = CatMapsInMemory::new_empty(); + let s = Arc::new(value.to_string()); + sing.map.insert(s.clone(), u); + sing.rev_map.insert(u, s); + sing + } + + pub(crate) fn new_empty() -> CatMapsInMemory { + todo!() + } + + pub fn get(&self, key: &Arc) -> Option<&u32> { + self.map.get(key) + } + + pub(crate) fn counter(&self) -> u32 { + self.rev_map.keys().max().unwrap().clone() + } + + pub fn get_rev(&self, key: &u32) -> Option<&Arc> { + self.rev_map.get(key) + } + + pub fn decode_batch(&self, v: &[Option]) -> Vec> { + let decoded_vec_iter = v + .into_par_iter() + .map(|x| x.map(|x| self.rev_map.get(&x).unwrap().as_str())); + decoded_vec_iter.collect() + } + + pub fn maybe_decode(&self, u: &u32) -> Option<&str> { + self.rev_map.get(u).map(|x| x.as_str()) + } + + pub(crate) fn image(&self, s: &HashSet) -> Option { + let new_map: HashMap<_, _, BuildHasherDefault>> = s + .par_iter() + .map(|x| { + if let Some(s) = self.rev_map.get(x) { + Some((s.clone(), x)) + } else { + None + } + }) + .filter(|x| x.is_some()) + .map(|x| x.unwrap()) + .map(|(x, y)| (*y, x)) + .collect(); + if new_map.is_empty() { + None + } else { + let map = new_map.iter().map(|(x, y)| (y.clone(), *x)).collect(); + Some(Self { + map, + rev_map: new_map, + }) + } + } +} diff --git a/lib/representation/src/cats/maps/on_disk.rs b/lib/representation/src/cats/maps/on_disk.rs new file mode 100644 index 00000000..d8675e46 --- /dev/null +++ b/lib/representation/src/cats/maps/on_disk.rs @@ -0,0 +1,25 @@ +use std::collections::HashSet; + +#[derive(Debug, Clone)] +pub struct CatMapsOnDisk { + map: (), + rev_map: (), +} + +impl CatMapsOnDisk { + pub(crate) fn decode_batch(&self, p0: &[Option]) -> Vec> { + todo!() + } +} + +impl CatMapsOnDisk { + pub(crate) fn image(&self, p0: &HashSet) -> Option { + todo!() + } +} + +impl CatMapsOnDisk { + pub(crate) fn maybe_decode_string(&self, p0: &u32) -> Option<&str> { + todo!() + } +} diff --git a/lib/representation/src/cats/re_encode.rs b/lib/representation/src/cats/re_encode.rs index 10bdae67..6842e9d9 100644 --- a/lib/representation/src/cats/re_encode.rs +++ b/lib/representation/src/cats/re_encode.rs @@ -6,9 +6,10 @@ use nohash_hasher::NoHashHasher; use polars::datatypes::PlSmallStr; use polars::error::PolarsResult; use polars::prelude::{col, Column, IntoColumn, IntoLazy, LazyFrame, Series}; -use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator}; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; use std::collections::HashMap; use std::hash::BuildHasherDefault; +use std::path::Path; use std::sync::Arc; #[derive(Debug, Clone)] @@ -53,24 +54,7 @@ impl CatReEnc { impl CatEncs { pub fn inner_join_re_enc(&self, other: &CatEncs) -> Vec<(u32, u32)> { - let renc: Vec<_> = self - .forward - .iter() - .map(|(x, l)| { - if let Some(r) = other.maybe_encode_str(x.as_str()) { - if &l != r { - Some((l, *r)) - } else { - None - } - } else { - None - } - }) - .filter(|x| x.is_some()) - .map(|x| x.unwrap()) - .collect(); - renc + self.maps.inner_join_re_enc(&other.maps) } } @@ -104,66 +88,25 @@ impl Cats { pub fn merge( &mut self, other_cats: Vec, + path: Option<&Path>, ) -> HashMap> { let mut map = HashMap::new(); for c in other_cats { let c = c.read().unwrap(); let mut other_map = HashMap::new(); for (t, other_enc) in c.cat_map.iter() { - let mut c = self.get_counter(&t); if let Some(enc) = self.cat_map.get_mut(t) { - let (remap, insert): (Vec<_>, Vec<_>) = other_enc - .forward - .iter() - .map(|(s, u)| { - if let Some(e) = enc.forward.get(&s) { - (Some((u, *e)), None) - } else { - (None, Some((s.clone(), u))) - } - }) - .unzip(); - let mut numbered_insert = Vec::new(); - let mut new_remap = Vec::new(); - for k in insert { - if let Some((s, u)) = k { - numbered_insert.push((s, c)); - new_remap.push((u, c)); - c += 1; - } - } - for (s, u) in numbered_insert { - enc.encode_new_arc_string(s.clone(), u); - } - let remap: HashMap<_, _, BuildHasherDefault>> = remap - .into_iter() - .filter(|x| x.is_some()) - .map(|x| x.unwrap()) - .chain(new_remap.into_iter()) - .collect(); - let reenc = CatReEnc { - cat_map: Arc::new(remap), - }; - other_map.insert(t.clone(), reenc); + let re_enc = enc.merge(other_enc); + other_map.insert(t.clone(), re_enc); } else { - let mut remap = vec![]; - let mut new_enc = CatEncs::new_empty(); - for (s, v) in other_enc.forward.iter() { - remap.push((v, c)); - new_enc.encode_new_arc_string(s, c); - c += 1; - } + let (new_enc, re_enc) = CatEncs::new_remap(other_enc, path); + self.cat_map.insert(t.clone(), new_enc); - let remap: HashMap<_, _, BuildHasherDefault>> = - remap.into_iter().collect(); - other_map.insert( - t.clone(), - CatReEnc { - cat_map: Arc::new(remap), - }, - ); + + other_map.insert(t.clone(), re_enc); } - self.set_counter(c, t); + let counter = self.cat_map.get(t).unwrap().counter(); + self.set_counter(counter, t); } if !other_map.is_empty() { map.insert(c.uuid.clone(), other_map); diff --git a/lib/representation/src/cats/reverse.rs b/lib/representation/src/cats/reverse.rs deleted file mode 100644 index 32f2fbc8..00000000 --- a/lib/representation/src/cats/reverse.rs +++ /dev/null @@ -1,86 +0,0 @@ -mod in_memory; -mod on_disk; - -use std::collections::HashSet; -use std::sync::Arc; -use crate::cats::reverse::in_memory::{ReverseCatInMemory, ReverseCatInMemoryIterator}; -use crate::cats::reverse::on_disk::{ReverseCatOnDisk, ReverseCatOnDiskIterator}; - -#[derive(Debug, Clone)] -pub enum ReverseCat { - ReverseCatInMemory(ReverseCatInMemory), - ReverseCatOnDisk(ReverseCatOnDisk), -} - -impl ReverseCat { - pub(crate) fn counter(&self) -> u32 { - match self { - ReverseCat::ReverseCatInMemory(m) => { - m.counter() - } - ReverseCat::ReverseCatOnDisk(d) => { - d.counter() - } - } - } - - pub fn get(&self, key:&u32) -> Option<&Arc> { - match self { - ReverseCat::ReverseCatInMemory(mem) => { - mem.get(key) - } - ReverseCat::ReverseCatOnDisk(disk) => { - disk.get(key) - } - } - } -} - -impl ReverseCat { - pub fn from)>>(iter:T, on_disk:bool) -> Self { - if on_disk { - ReverseCat::ReverseCatOnDisk(ReverseCatOnDisk::from(iter)) - } else { - ReverseCat::ReverseCatInMemory(ReverseCatInMemory::from(iter)) - } - } - - pub fn image(&self, s: &HashSet) -> Option { - match self { - ReverseCat::ReverseCatInMemory(m) => { - m.image(s).map(|x|ReverseCat::ReverseCatInMemory(x)) - } - ReverseCat::ReverseCatOnDisk(d) => { - d.image(s).map(|x|ReverseCat::ReverseCatOnDisk(x)) - } - } - } - - pub fn iter(&self) -> ReverseCatIterator { - match self { - ReverseCat::ReverseCatInMemory(mem) => {ReverseCatIterator::InMemory(mem.iter())} - ReverseCat::ReverseCatOnDisk(disk) => {ReverseCatIterator::OnDisk(disk.iter())} - } - } -} - -enum ReverseCatIterator<'a> { - InMemory(ReverseCatInMemoryIterator<'a>), - OnDisk(ReverseCatOnDiskIterator<'a>), -} - -impl Iterator for ReverseCatIterator<'_> { - type Item = (u32, Arc); - - - fn next(&mut self) -> Option { - match self { - ReverseCatIterator::InMemory(inmem) => { - inmem.next() - } - ReverseCatIterator::OnDisk(ondisk) => { - ondisk.next() - } - } - } -} \ No newline at end of file diff --git a/lib/representation/src/cats/reverse/in_memory.rs b/lib/representation/src/cats/reverse/in_memory.rs deleted file mode 100644 index 6ae63013..00000000 --- a/lib/representation/src/cats/reverse/in_memory.rs +++ /dev/null @@ -1,67 +0,0 @@ -use nohash_hasher::NoHashHasher; -use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; -use std::collections::{BTreeMap, HashMap, HashSet}; -use std::hash::BuildHasherDefault; -use std::sync::Arc; - -#[derive(Debug, Clone)] -pub struct ReverseCatInMemory { - rev_map: HashMap, BuildHasherDefault>>, -} - -impl ReverseCatInMemory { - pub(crate) fn counter(&self) -> u32 { - self.rev_map.keys().max().unwrap().clone() - } - - pub fn get(&self, key: &u32) -> Option<&Arc> { - self.rev_map.get(key) - } -} - -impl ReverseCatInMemory { - pub(crate) fn image(&self, s: &HashSet) -> Option { - let new_map: HashMap<_, _, BuildHasherDefault>> = s - .par_iter() - .map(|x| { - if let Some(s) = self.rev_map.get(x) { - Some((s.clone(), x)) - } else { - None - } - }) - .filter(|x| x.is_some()) - .map(|x| x.unwrap()) - .map(|(x, y)| (*y, x)) - .collect(); - if new_map.is_empty() { - None - } else { - Some(Self { rev_map: new_map }) - } - } - - pub fn from)>>(iter: T) -> Self { - let rev_map: HashMap<_, _, BuildHasherDefault>> = iter.collect(); - ReverseCatInMemory { rev_map } - } - - pub fn iter(&self) -> ReverseCatInMemoryIterator { - ReverseCatInMemoryIterator{ - rwc:self.rev_map.iter() - } - } -} - -pub struct ReverseCatInMemoryIterator<'a> { - pub(crate) rwc: std::collections::hash_map::Iter<'a, u32, Arc> -} - -impl ReverseCatInMemoryIterator<'_> { - pub fn next(&mut self) -> Option<(u32, Arc)> { - match self.rwc.next() { - None => {None} - Some((u,s)) => {Some((*u, s.clone()))} - } - } -} \ No newline at end of file diff --git a/lib/representation/src/cats/reverse/on_disk.rs b/lib/representation/src/cats/reverse/on_disk.rs deleted file mode 100644 index 95211b31..00000000 --- a/lib/representation/src/cats/reverse/on_disk.rs +++ /dev/null @@ -1,48 +0,0 @@ -use std::collections::HashSet; -use std::sync::Arc; - -#[derive(Debug, Clone)] -pub struct ReverseCatOnDisk { - -} - -impl ReverseCatOnDisk { - pub(crate) fn counter(&self) -> u32 { - todo!() - } -} - -impl ReverseCatOnDisk { - pub(crate) fn image(&self, us: &HashSet) -> Option { - todo!() - } -} - -impl ReverseCatOnDisk { - pub fn from)>>(iter:T) -> Self { - todo!() - } - - pub fn iter(&self) -> ReverseCatOnDiskIterator { - ReverseCatOnDiskIterator{ - rwc:&() - } - } - - pub fn get(&self, key: &u32) -> Option<&Arc> { - todo!() - } -} - -pub struct ReverseCatOnDiskIterator<'a> { - rwc: &'a () -} - -impl ReverseCatOnDiskIterator<'_> { - pub fn next(&mut self) -> Option<(u32, Arc)> { - match self.rwc.next() { - None => {None} - Some((u,s)) => {Some((*u, s.clone()))} - } - } -} \ No newline at end of file diff --git a/lib/triplestore/src/cats.rs b/lib/triplestore/src/cats.rs index f345ece6..b0b402a3 100644 --- a/lib/triplestore/src/cats.rs +++ b/lib/triplestore/src/cats.rs @@ -5,8 +5,14 @@ impl Triplestore { pub fn globalize(&mut self, cat_triples: Vec) -> Vec { let mut mutcat = self.global_cats.write().unwrap(); let cat_triples = { - let cat_triples = mutcat.globalize(cat_triples); - mutcat.encode_predicates(&cat_triples); + let cat_triples = mutcat.globalize( + cat_triples, + self.storage_folder.as_ref().map(|x| x.as_ref()), + ); + mutcat.encode_predicates( + &cat_triples, + self.storage_folder.as_ref().map(|x| x.as_ref()), + ); cat_triples }; cat_triples diff --git a/lib/triplestore/src/lib.rs b/lib/triplestore/src/lib.rs index 61a47d4b..7f25954d 100644 --- a/lib/triplestore/src/lib.rs +++ b/lib/triplestore/src/lib.rs @@ -254,7 +254,11 @@ impl Triplestore { graph: &NamedGraph, ) -> Result, TriplestoreError> { let prepare_triples_now = Instant::now(); - let dfs_to_add = prepare_add_triples_par(ts, self.global_cats.clone()); + let dfs_to_add = prepare_add_triples_par( + ts, + self.global_cats.clone(), + self.storage_folder.as_ref().map(|x| x.as_ref()), + ); trace!( "Preparing triples took {} seconds", prepare_triples_now.elapsed().as_secs_f32() @@ -454,6 +458,7 @@ struct TriplesToAddPartitionedPredicate { pub fn prepare_add_triples_par( mut ts: Vec, global_cats: LockedCats, + path: Option<&Path>, ) -> Vec { let mut all_partitioned: Vec = ts .par_drain(..) @@ -528,6 +533,7 @@ pub fn prepare_add_triples_par( subject_cat_state, object_cat_state, &cats, + path, ) }, ) diff --git a/lib/triplestore/src/sparql.rs b/lib/triplestore/src/sparql.rs index ff89b8cc..446936e4 100644 --- a/lib/triplestore/src/sparql.rs +++ b/lib/triplestore/src/sparql.rs @@ -682,7 +682,8 @@ fn named_node_pattern_expr( } fn named_node_u32_lit(nn: &NamedNode, name: &str, global_cats: &Cats) -> (Expr, RDFNodeState) { - let (u, state) = global_cats.encode_iri_or_local_cat(nn.as_str()); + // No disk based storage of local cat, hence path is None + let (u, state) = global_cats.encode_iri_or_local_cat(nn.as_str(), None); (lit(u).cast(DataType::UInt32).alias(name), state) } diff --git a/lib/triplestore/src/sparql/delete.rs b/lib/triplestore/src/sparql/delete.rs index 0ccc8c49..bfc2a047 100644 --- a/lib/triplestore/src/sparql/delete.rs +++ b/lib/triplestore/src/sparql/delete.rs @@ -13,6 +13,7 @@ use representation::multitype::{set_struct_all_null_to_null_row, split_df_multic use representation::solution_mapping::EagerSolutionMappings; use representation::{BaseRDFNodeType, OBJECT_COL_NAME, PREDICATE_COL_NAME, SUBJECT_COL_NAME}; use std::collections::HashMap; +use std::path::{Path, PathBuf}; use std::time::Instant; use tracing::trace; @@ -28,7 +29,11 @@ impl Triplestore { .map(|(x, y)| partition_by_global_predicate_col(x, y, &cats)) .flatten() .collect(); - let global_cat_triples = triples_solution_mappings_to_global_cat_triples(sms, &cats); + let global_cat_triples = triples_solution_mappings_to_global_cat_triples( + sms, + &cats, + self.storage_folder.as_ref().map(|x| x.as_ref()), + ); drop(cats); if !global_cat_triples.is_empty() { self.delete_triples_vec(global_cat_triples, graph) @@ -169,6 +174,7 @@ fn partition_by_global_predicate_col( fn triples_solution_mappings_to_global_cat_triples( sm_preds: Vec<(EagerSolutionMappings, NamedNode)>, global_cats: &Cats, + storage_path: Option<&Path>, ) -> Vec { let mappings_maps_preds: Vec<_> = sm_preds .into_par_iter() @@ -221,6 +227,7 @@ fn triples_solution_mappings_to_global_cat_triples( subject_state, object_state, global_cats, + storage_path, ); e }) diff --git a/lib/triplestore/src/sparql/lazy_graph_patterns/pvalues.rs b/lib/triplestore/src/sparql/lazy_graph_patterns/pvalues.rs index 5417c5a0..6bd8ec45 100644 --- a/lib/triplestore/src/sparql/lazy_graph_patterns/pvalues.rs +++ b/lib/triplestore/src/sparql/lazy_graph_patterns/pvalues.rs @@ -46,7 +46,8 @@ impl Triplestore { todo!("Handle this error") }; let cats = self.global_cats.read().unwrap(); - let mut sm = cats.encode_solution_mappings(sm); + // No disk based storage of local cats if created + let mut sm = cats.encode_solution_mappings(sm, None); sm.mappings.as_single_chunk_par(); if let Some(mut mappings) = solution_mappings { //TODO: Remove this workaround diff --git a/lib/triplestore/src/sparql/lazy_graph_patterns/values.rs b/lib/triplestore/src/sparql/lazy_graph_patterns/values.rs index 9c72a1dc..a44c50c2 100644 --- a/lib/triplestore/src/sparql/lazy_graph_patterns/values.rs +++ b/lib/triplestore/src/sparql/lazy_graph_patterns/values.rs @@ -20,9 +20,10 @@ impl Triplestore { query_settings: &QuerySettings, ) -> Result { let sm = values_pattern(variables, bindings); + // No disk based storage of local cats if they are created let sm = { let cats = self.global_cats.read()?; - cats.encode_solution_mappings(sm) + cats.encode_solution_mappings(sm, None) }; if let Some(mut mappings) = solution_mappings { mappings = join( diff --git a/lib/triplestore/src/storage.rs b/lib/triplestore/src/storage.rs index 2f8796e8..61b8446f 100644 --- a/lib/triplestore/src/storage.rs +++ b/lib/triplestore/src/storage.rs @@ -13,7 +13,7 @@ use polars_core::series::SeriesIter; use polars_core::utils::Container; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use representation::cats::{ - Cats, LockedCats, ReverseLookup, OBJECT_RANK_COL_NAME, SUBJECT_RANK_COL_NAME, + CatEncs, Cats, LockedCats, OBJECT_RANK_COL_NAME, SUBJECT_RANK_COL_NAME, }; use representation::{ BaseRDFNodeType, LANG_STRING_LANG_FIELD, LANG_STRING_VALUE_FIELD, OBJECT_COL_NAME, @@ -134,7 +134,7 @@ impl Triples { let mut i = 0; let now_subjects = Instant::now(); - let maybe_subject_encs = get_rev_map(global_cats.deref(), &self.subject_type); + let maybe_subject_encs = maybe_get_cat_encs(global_cats.deref(), &self.subject_type); let subjects_str = if OFFSET_STEP / 10 * incoming_df.height() < remaining_height { let new_subjects_col = incoming_df.column(SUBJECT_COL_NAME).unwrap(); @@ -430,13 +430,13 @@ fn create_indices( ) -> Result { assert!(df.height() > 0); let now = Instant::now(); - let subject_encs = get_rev_map(cats, subj_type); + let subject_encs = maybe_get_cat_encs(cats, subj_type); //Should already be sorted by subject, object let subject_sparse_index = create_sparse_index( df.column(SUBJECT_COL_NAME) .unwrap() .as_materialized_series(), - subject_encs.as_ref(), + subject_encs, ); trace!( "Creating subject sparse map took {} seconds", @@ -466,10 +466,10 @@ fn create_indices( ) .collect() .unwrap(); - let object_encs = get_rev_map(cats, obj_type); + let object_encs = maybe_get_cat_encs(cats, obj_type); let sparse = create_sparse_index( df.column(OBJECT_COL_NAME).unwrap().as_materialized_series(), - object_encs.as_ref(), + object_encs, ); object_sort = Some(StoredTriples::new( @@ -493,9 +493,9 @@ fn create_indices( }) } -fn get_rev_map<'a>(c: &'a Cats, t: &BaseRDFNodeType) -> Option> { +fn maybe_get_cat_encs<'a>(c: &'a Cats, t: &BaseRDFNodeType) -> Option<&'a CatEncs> { if t.stored_cat() { - Some(c.get_reverse_lookup(t)) + Some(c.get_cat_encs(t)) } else { None } @@ -753,11 +753,11 @@ fn update_index_at_offset( u32_chunked: &UInt32Chunked, offset: usize, sparse_map: &mut BTreeMap, usize>, - rev_map: &ReverseLookup, + cat_encs: &CatEncs, ) { let u = u32_chunked.get(offset); if let Some(u) = u { - let s = rev_map.lookup(&u).unwrap(); + let s = cat_encs.maybe_decode_string(&u).unwrap(); let e = sparse_map.entry(Arc::new(s.to_string())); e.or_insert(offset); } @@ -782,14 +782,14 @@ pub(crate) fn repeated_from_last_row_expr(c: &str) -> Expr { .and(col(c).shift(lit(1)).eq(col(c))) } -fn create_sparse_index(ser: &Series, rev_map: Option<&ReverseLookup>) -> SparseIndex { - match rev_map { +fn create_sparse_index(ser: &Series, cat_encs: Option<&CatEncs>) -> SparseIndex { + match cat_encs { None => create_sparse_string_index(ser), Some(rev_map) => create_sparse_cat_index(ser, rev_map), } } -fn create_sparse_cat_index(ser: &Series, encs: &ReverseLookup) -> SparseIndex { +fn create_sparse_cat_index(ser: &Series, encs: &CatEncs) -> SparseIndex { assert!(!ser.is_empty()); let strch = ser.u32().unwrap(); let mut sparse_map = BTreeMap::new(); @@ -823,7 +823,7 @@ fn create_sparse_string_index(ser: &Series) -> SparseIndex { } //Assumes sorted. -fn create_deduplicated_string_vec<'a>(ser: &Series, rev_map: &'a ReverseLookup) -> Vec<&'a str> { +fn create_deduplicated_string_vec<'a>(ser: &Series, CatEncs: &'a CatEncs) -> Vec<&'a str> { let mut v = Vec::with_capacity(ser.len()); let mut last = None; for any in ser.iter() { @@ -835,7 +835,7 @@ fn create_deduplicated_string_vec<'a>(ser: &Series, rev_map: &'a ReverseLookup) } } last = Some(u); - let s = rev_map.lookup(&u).unwrap(); + let s = CatEncs.maybe_decode_string(&u).unwrap(); v.push(s); } _ => unreachable!("Should never happen"), @@ -872,8 +872,8 @@ fn compact_segments( } } } - let subject_encs = get_rev_map(cats, subj_type); - let object_encs = get_rev_map(cats, obj_type); + let subject_encs = maybe_get_cat_encs(cats, subj_type); + let object_encs = maybe_get_cat_encs(cats, obj_type); let fields = if let BaseRDFNodeType::Literal(nn) = obj_type { if nn.as_ref() == rdf::LANG_STRING { Some((LANG_STRING_VALUE_FIELD, LANG_STRING_LANG_FIELD)) @@ -888,8 +888,8 @@ fn compact_segments( SUBJECT_COL_NAME, OBJECT_COL_NAME, fields, - subject_encs.as_ref(), - object_encs.as_ref(), + subject_encs, + object_encs, )?; let height = compact_subjects.height(); @@ -898,7 +898,7 @@ fn compact_segments( .column(SUBJECT_COL_NAME) .unwrap() .as_materialized_series(), - subject_encs.as_ref(), + subject_encs, ); let stored_subjects = StoredTriples::new(compact_subjects, subj_type, obj_type, storage_folder)?; @@ -909,15 +909,15 @@ fn compact_segments( OBJECT_COL_NAME, SUBJECT_COL_NAME, None, - object_encs.as_ref(), - subject_encs.as_ref(), + object_encs, + subject_encs, )?; let object_index = create_sparse_index( compact_objects .column(OBJECT_COL_NAME) .unwrap() .as_materialized_series(), - object_encs.as_ref(), + object_encs, ); let stored_objects = StoredTriples::new(compact_objects, subj_type, obj_type, storage_folder)?; @@ -942,8 +942,8 @@ fn compact_dataframe_segments( col_a: &str, col_b: &str, b_fields: Option<(&str, &str)>, - a_cat_encs: Option<&ReverseLookup>, - b_cat_encs: Option<&ReverseLookup>, + a_cat_encs: Option<&CatEncs>, + b_cat_encs: Option<&CatEncs>, ) -> Result<(DataFrame, Option), TriplestoreError> { let mut new_segments = vec![]; for (mut df, new) in segments { @@ -1131,8 +1131,8 @@ impl<'a> TripleIterator<'a> { mut a_iterator: SeriesIter<'a>, mut b_1_iterator: SeriesIter<'a>, mut b_2_iterator: Option>, - a_cat_rev_map: Option<&'a ReverseLookup<'a>>, - b_cat_rev_map: Option<&'a ReverseLookup<'a>>, + a_cat_rev_map: Option<&'a CatEncs>, + b_cat_rev_map: Option<&'a CatEncs>, new: bool, ) -> Option { if let Some(a) = a_iterator.next() { @@ -1167,8 +1167,8 @@ impl<'a> TripleIterator<'a> { pub fn next( self, - a_cat_rev_map: Option<&'a ReverseLookup>, - b_cat_rev_map: Option<&'a ReverseLookup>, + a_cat_rev_map: Option<&'a CatEncs>, + b_cat_rev_map: Option<&'a CatEncs>, ) -> Option { let Self { mut a_iterator, @@ -1211,11 +1211,13 @@ impl<'a> TripleIterator<'a> { fn decode_elem<'a>( any_value: &AnyValue<'a>, - reverse_lookup: Option<&'a ReverseLookup>, + reverse_lookup: Option<&'a CatEncs>, ) -> Option> { if let Some(reverse_lookup) = reverse_lookup { if let AnyValue::UInt32(u) = any_value { - Some(AnyValue::String(reverse_lookup.lookup(u).unwrap())) + Some(AnyValue::String( + reverse_lookup.maybe_decode_string(u).unwrap(), + )) } else { unreachable!("Should never happen") } From 74925dae5e8efc5072861695ab762add985446c7 Mon Sep 17 00:00:00 2001 From: Magnus Bakken <10287813+magbak@users.noreply.github.com> Date: Mon, 29 Dec 2025 10:12:33 +0100 Subject: [PATCH 4/5] Prepare for disk based storage of cats --- Cargo.toml | 2 +- lib/maplib/src/model.rs | 22 ++- lib/query_processing/src/cats.rs | 1 - lib/representation/src/cats.rs | 19 +-- lib/representation/src/cats/image.rs | 3 +- lib/representation/src/cats/maps.rs | 19 +-- lib/representation/src/cats/maps/in_memory.rs | 144 +++++++++--------- lib/representation/src/cats/maps/on_disk.rs | 4 +- lib/representation/src/cats/re_encode.rs | 6 +- lib/triplestore/src/sparql.rs | 2 +- lib/triplestore/src/sparql/delete.rs | 2 +- lib/triplestore/src/storage.rs | 4 +- lib/triplestore/src/triples_read.rs | 8 + 13 files changed, 127 insertions(+), 109 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f7d746eb..d9b01642 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ polars = { version = "0.52.0", features = ["simd", "nightly", "new_streaming", " polars-core = { version = "0.52.0" } rayon = "1.10.0" regex = "1.11.1" -oxrdf = { version="0.2.4" } +oxrdf = { version = "0.2.4" } oxrdfio = { version = "0.1.7" } oxttl = { version="0.1.7" } oxiri = "0.2.11" diff --git a/lib/maplib/src/model.rs b/lib/maplib/src/model.rs index 4ef2cfcb..4938182f 100644 --- a/lib/maplib/src/model.rs +++ b/lib/maplib/src/model.rs @@ -184,7 +184,16 @@ impl Model { self.truncate_graph(&graph) } self.triplestore - .read_triples_from_path(p, rdf_format, base_iri, transient, parallel, checked, graph) + .read_triples_from_path( + p, + rdf_format, + base_iri, + transient, + parallel, + checked, + graph, + &self.prefixes, + ) .map_err(MaplibError::TriplestoreError) } @@ -217,7 +226,16 @@ impl Model { self.truncate_graph(&graph) } self.triplestore - .read_triples_from_string(s, rdf_format, base_iri, transient, parallel, checked, graph) + .read_triples_from_string( + s, + rdf_format, + base_iri, + transient, + parallel, + checked, + graph, + &self.prefixes, + ) .map_err(MaplibError::TriplestoreError) } diff --git a/lib/query_processing/src/cats.rs b/lib/query_processing/src/cats.rs index cf6bb469..ef52d8e4 100644 --- a/lib/query_processing/src/cats.rs +++ b/lib/query_processing/src/cats.rs @@ -5,7 +5,6 @@ use representation::{BaseRDFNodeType, RDFNodeState}; use std::collections::{HashMap, HashSet}; use std::hash::BuildHasherDefault; use std::ops::Deref; -use std::path::Path; use std::sync::Arc; pub fn create_compatible_cats( diff --git a/lib/representation/src/cats.rs b/lib/representation/src/cats.rs index 292419d5..0df737cc 100644 --- a/lib/representation/src/cats.rs +++ b/lib/representation/src/cats.rs @@ -14,12 +14,10 @@ use std::cmp; use crate::cats::maps::CatMaps; use crate::BaseRDFNodeType; -use nohash_hasher::NoHashHasher; use oxrdf::vocab::xsd; use oxrdf::{NamedNode, NamedNodeRef}; use polars::prelude::DataFrame; use std::collections::HashMap; -use std::hash::BuildHasherDefault; use std::ops::Deref; use std::path::Path; use std::sync::Arc; @@ -32,7 +30,6 @@ pub const SUBJECT_RANK_COL_NAME: &str = "subject_rank"; pub fn literal_is_cat(nn: NamedNodeRef) -> bool { nn == xsd::STRING } -type RevMap = HashMap, BuildHasherDefault>>; #[derive(Debug)] pub struct CatTriples { @@ -86,8 +83,12 @@ pub struct CatEncs { } impl CatEncs { - pub(crate) fn new_remap(other: &CatEncs, path: Option<&Path>) -> (CatEncs, CatReEnc) { - let (maps, remap) = CatMaps::new_remap(&other.maps, path); + pub(crate) fn new_remap( + other: &CatEncs, + path: Option<&Path>, + c: &mut u32, + ) -> (CatEncs, CatReEnc) { + let (maps, remap) = CatMaps::new_remap(&other.maps, path, c); let encs = CatEncs { maps }; (encs, remap) } @@ -106,8 +107,8 @@ impl CatEncs { self.maps.is_empty() } - pub fn merge(&mut self, other: &CatEncs) -> CatReEnc { - self.maps.merge(&other.maps) + pub fn merge(&mut self, other: &CatEncs, c: &mut u32) -> CatReEnc { + self.maps.merge(&other.maps, c) } } @@ -278,8 +279,8 @@ impl Cats { map } - fn get_counter(&self, c: &CatType) -> u32 { - match c { + fn get_counter(&self, t: &CatType) -> u32 { + match t { CatType::IRI => self.get_iri_counter(), CatType::Blank => self.get_blank_counter(), CatType::Literal(nn) => self.get_literal_counter(nn), diff --git a/lib/representation/src/cats/image.rs b/lib/representation/src/cats/image.rs index 314e1211..833735bd 100644 --- a/lib/representation/src/cats/image.rs +++ b/lib/representation/src/cats/image.rs @@ -4,11 +4,10 @@ use crate::cats::LockedCats; use crate::solution_mapping::{BaseCatState, EagerSolutionMappings}; use crate::{BaseRDFNodeType, RDFNodeState}; use nohash_hasher::NoHashHasher; -use rayon::iter::ParallelIterator; use std::collections::{HashMap, HashSet}; use std::hash::BuildHasherDefault; use std::ops::Deref; -use std::path::{Path, PathBuf}; +use std::path::Path; use std::sync::Arc; impl Cats { diff --git a/lib/representation/src/cats/maps.rs b/lib/representation/src/cats/maps.rs index cd3e3be1..f98ae264 100644 --- a/lib/representation/src/cats/maps.rs +++ b/lib/representation/src/cats/maps.rs @@ -15,12 +15,16 @@ pub enum CatMaps { } impl CatMaps { - pub(crate) fn new_remap(maps: &CatMaps, path: Option<&Path>) -> (CatMaps, CatReEnc) { + pub(crate) fn new_remap( + maps: &CatMaps, + path: Option<&Path>, + c: &mut u32, + ) -> (CatMaps, CatReEnc) { if let Some(path) = path { todo!() } else { if let CatMaps::InMemory(mem) = maps { - let (mem, re_enc) = CatMapsInMemory::new_remap(mem); + let (mem, re_enc) = CatMapsInMemory::new_remap(mem, c); (CatMaps::InMemory(mem), re_enc) } else { unreachable!("Should never happen") @@ -43,11 +47,11 @@ impl CatMaps { } } - pub fn merge(&mut self, other: &CatMaps) -> CatReEnc { + pub fn merge(&mut self, other: &CatMaps, c: &mut u32) -> CatReEnc { match self { CatMaps::InMemory(mem) => { if let CatMaps::InMemory(other_mem) = other { - mem.merge(other_mem) + mem.merge(other_mem, c) } else { unreachable!("Should never happen") } @@ -119,13 +123,6 @@ impl CatMaps { } } - fn batch_decode(&self, keys: &[u32]) -> Vec> { - todo!() - } - fn batch_encode(&self, kvs: &[(u32, Arc)]) -> u32 { - todo!() - } - pub fn encode(&self, key: &Arc) -> Option<&u32> { match self { CatMaps::InMemory(mem) => mem.get(key), diff --git a/lib/representation/src/cats/maps/in_memory.rs b/lib/representation/src/cats/maps/in_memory.rs index 4bbf33f0..37e94f21 100644 --- a/lib/representation/src/cats/maps/in_memory.rs +++ b/lib/representation/src/cats/maps/in_memory.rs @@ -1,5 +1,4 @@ -use crate::cats::maps::CatMaps; -use crate::cats::{CatEncs, CatReEnc}; +use crate::cats::CatReEnc; use nohash_hasher::NoHashHasher; use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator}; use std::collections::{BTreeMap, HashMap, HashSet}; @@ -13,14 +12,13 @@ pub struct CatMapsInMemory { } impl CatMapsInMemory { - pub(crate) fn new_remap(maps: &CatMapsInMemory) -> (CatMapsInMemory, CatReEnc) { - let mut c = 0; + pub fn new_remap(maps: &CatMapsInMemory, c: &mut u32) -> (CatMapsInMemory, CatReEnc) { let mut remap = vec![]; let mut new_maps = CatMapsInMemory::new_empty(); for (s, v) in maps.map.iter() { - remap.push((*v, c)); - new_maps.encode_new_arc_string(s.clone(), c); - c += 1; + remap.push((*v, *c)); + new_maps.encode_new_arc_string(s.clone(), *c); + *c += 1; } let remap: HashMap<_, _, BuildHasherDefault>> = remap.into_iter().collect(); @@ -36,68 +34,6 @@ impl CatMapsInMemory { self.map.insert(s.clone(), u); self.rev_map.insert(u, s); } -} - -impl CatMapsInMemory { - pub(crate) fn inner_join_re_enc(&self, other: &CatMapsInMemory) -> Vec<(u32, u32)> { - let renc: Vec<_> = self - .map - .iter() - .map(|(x, l)| { - if let Some(r) = other.maybe_encode_string(x.as_str()) { - if l != r { - Some((*l, *r)) - } else { - None - } - } else { - None - } - }) - .filter(|x| x.is_some()) - .map(|x| x.unwrap()) - .collect(); - renc - } -} - -impl CatMapsInMemory { - pub fn merge(&mut self, other: &CatMapsInMemory) -> CatReEnc { - let mut c = self.counter(); - let (remap, insert): (Vec<_>, Vec<_>) = other - .map - .iter() - .map(|(s, u)| { - if let Some(e) = self.map.get(s) { - (Some((*u, *e)), None) - } else { - (None, Some((s.clone(), u))) - } - }) - .unzip(); - let mut numbered_insert = Vec::new(); - let mut new_remap = Vec::new(); - for k in insert { - if let Some((s, u)) = k { - numbered_insert.push((s, c)); - new_remap.push((*u, c)); - c += 1; - } - } - for (s, u) in numbered_insert { - self.encode_new_arc_string(s.clone(), u); - } - let remap: HashMap<_, _, BuildHasherDefault>> = remap - .into_iter() - .filter(|x| x.is_some()) - .map(|x| x.unwrap()) - .chain(new_remap.into_iter()) - .collect(); - let reenc = CatReEnc { - cat_map: Arc::new(remap), - }; - reenc - } pub(crate) fn contains_str(&self, s: &str) -> bool { let arcs = Arc::new(s.to_string()); @@ -135,16 +71,19 @@ impl CatMapsInMemory { sing } - pub(crate) fn new_empty() -> CatMapsInMemory { - todo!() + pub fn new_empty() -> CatMapsInMemory { + CatMapsInMemory { + map: Default::default(), + rev_map: Default::default(), + } } pub fn get(&self, key: &Arc) -> Option<&u32> { self.map.get(key) } - pub(crate) fn counter(&self) -> u32 { - self.rev_map.keys().max().unwrap().clone() + pub fn counter(&self) -> u32 { + self.rev_map.keys().max().unwrap().clone() + 1 } pub fn get_rev(&self, key: &u32) -> Option<&Arc> { @@ -162,7 +101,7 @@ impl CatMapsInMemory { self.rev_map.get(u).map(|x| x.as_str()) } - pub(crate) fn image(&self, s: &HashSet) -> Option { + pub fn image(&self, s: &HashSet) -> Option { let new_map: HashMap<_, _, BuildHasherDefault>> = s .par_iter() .map(|x| { @@ -186,4 +125,61 @@ impl CatMapsInMemory { }) } } + + pub fn inner_join_re_enc(&self, other: &CatMapsInMemory) -> Vec<(u32, u32)> { + let renc: Vec<_> = self + .map + .iter() + .map(|(x, l)| { + if let Some(r) = other.maybe_encode_string(x.as_str()) { + if l != r { + Some((*l, *r)) + } else { + None + } + } else { + None + } + }) + .filter(|x| x.is_some()) + .map(|x| x.unwrap()) + .collect(); + renc + } + + pub fn merge(&mut self, other: &CatMapsInMemory, c: &mut u32) -> CatReEnc { + let (remap, insert): (Vec<_>, Vec<_>) = other + .map + .iter() + .map(|(s, u)| { + if let Some(e) = self.map.get(s) { + (Some((*u, *e)), None) + } else { + (None, Some((s.clone(), u))) + } + }) + .unzip(); + let mut numbered_insert = Vec::new(); + let mut new_remap = Vec::new(); + for k in insert { + if let Some((s, u)) = k { + numbered_insert.push((s, *c)); + new_remap.push((*u, *c)); + *c += 1; + } + } + for (s, u) in numbered_insert { + self.encode_new_arc_string(s.clone(), u); + } + let remap: HashMap<_, _, BuildHasherDefault>> = remap + .into_iter() + .filter(|x| x.is_some()) + .map(|x| x.unwrap()) + .chain(new_remap.into_iter()) + .collect(); + let reenc = CatReEnc { + cat_map: Arc::new(remap), + }; + reenc + } } diff --git a/lib/representation/src/cats/maps/on_disk.rs b/lib/representation/src/cats/maps/on_disk.rs index d8675e46..59c707a6 100644 --- a/lib/representation/src/cats/maps/on_disk.rs +++ b/lib/representation/src/cats/maps/on_disk.rs @@ -7,13 +7,13 @@ pub struct CatMapsOnDisk { } impl CatMapsOnDisk { - pub(crate) fn decode_batch(&self, p0: &[Option]) -> Vec> { + pub(crate) fn decode_batch(&self, us: &[Option]) -> Vec> { todo!() } } impl CatMapsOnDisk { - pub(crate) fn image(&self, p0: &HashSet) -> Option { + pub(crate) fn image(&self, s: &HashSet) -> Option { todo!() } } diff --git a/lib/representation/src/cats/re_encode.rs b/lib/representation/src/cats/re_encode.rs index 6842e9d9..d842f28c 100644 --- a/lib/representation/src/cats/re_encode.rs +++ b/lib/representation/src/cats/re_encode.rs @@ -95,17 +95,17 @@ impl Cats { let c = c.read().unwrap(); let mut other_map = HashMap::new(); for (t, other_enc) in c.cat_map.iter() { + let mut counter = self.get_counter(t); if let Some(enc) = self.cat_map.get_mut(t) { - let re_enc = enc.merge(other_enc); + let re_enc = enc.merge(other_enc, &mut counter); other_map.insert(t.clone(), re_enc); } else { - let (new_enc, re_enc) = CatEncs::new_remap(other_enc, path); + let (new_enc, re_enc) = CatEncs::new_remap(other_enc, path, &mut counter); self.cat_map.insert(t.clone(), new_enc); other_map.insert(t.clone(), re_enc); } - let counter = self.cat_map.get(t).unwrap().counter(); self.set_counter(counter, t); } if !other_map.is_empty() { diff --git a/lib/triplestore/src/sparql.rs b/lib/triplestore/src/sparql.rs index 6020f3ec..f84aaedb 100644 --- a/lib/triplestore/src/sparql.rs +++ b/lib/triplestore/src/sparql.rs @@ -27,6 +27,7 @@ use query_processing::graph_patterns::unique_workaround; use query_processing::pushdowns::Pushdowns; use representation::cats::{Cats, LockedCats}; use representation::dataset::{NamedGraph, QueryGraph}; +use representation::debug::DebugOutput; use representation::polars_to_rdf::{df_as_result, QuerySolutions}; use representation::query_context::Context; use representation::rdf_to_polars::{ @@ -44,7 +45,6 @@ use spargebra::term::{ }; use spargebra::{GraphUpdateOperation, Query, Update}; use std::collections::{HashMap, HashSet}; -use representation::debug::DebugOutput; pub struct QueryResult { pub kind: QueryResultKind, diff --git a/lib/triplestore/src/sparql/delete.rs b/lib/triplestore/src/sparql/delete.rs index bfc2a047..70b0a094 100644 --- a/lib/triplestore/src/sparql/delete.rs +++ b/lib/triplestore/src/sparql/delete.rs @@ -13,7 +13,7 @@ use representation::multitype::{set_struct_all_null_to_null_row, split_df_multic use representation::solution_mapping::EagerSolutionMappings; use representation::{BaseRDFNodeType, OBJECT_COL_NAME, PREDICATE_COL_NAME, SUBJECT_COL_NAME}; use std::collections::HashMap; -use std::path::{Path, PathBuf}; +use std::path::Path; use std::time::Instant; use tracing::trace; diff --git a/lib/triplestore/src/storage.rs b/lib/triplestore/src/storage.rs index 61b8446f..c6383af7 100644 --- a/lib/triplestore/src/storage.rs +++ b/lib/triplestore/src/storage.rs @@ -823,7 +823,7 @@ fn create_sparse_string_index(ser: &Series) -> SparseIndex { } //Assumes sorted. -fn create_deduplicated_string_vec<'a>(ser: &Series, CatEncs: &'a CatEncs) -> Vec<&'a str> { +fn create_deduplicated_string_vec<'a>(ser: &Series, cat_encs: &'a CatEncs) -> Vec<&'a str> { let mut v = Vec::with_capacity(ser.len()); let mut last = None; for any in ser.iter() { @@ -835,7 +835,7 @@ fn create_deduplicated_string_vec<'a>(ser: &Series, CatEncs: &'a CatEncs) -> Vec } } last = Some(u); - let s = CatEncs.maybe_decode_string(&u).unwrap(); + let s = cat_encs.maybe_decode_string(&u).unwrap(); v.push(s); } _ => unreachable!("Should never happen"), diff --git a/lib/triplestore/src/triples_read.rs b/lib/triplestore/src/triples_read.rs index 4685fd7c..bb85ddef 100644 --- a/lib/triplestore/src/triples_read.rs +++ b/lib/triplestore/src/triples_read.rs @@ -43,6 +43,7 @@ impl Triplestore { parallel: Option, checked: bool, graph: &NamedGraph, + prefixes: &HashMap, ) -> Result<(), TriplestoreError> { let now = Instant::now(); let rdf_format = if let Some(rdf_format) = rdf_format { @@ -68,6 +69,7 @@ impl Triplestore { parallel, checked, graph, + prefixes, )?; drop(map); @@ -88,6 +90,7 @@ impl Triplestore { parallel: Option, checked: bool, graph: &NamedGraph, + prefixes: &HashMap, ) -> Result<(), TriplestoreError> { self.read_triples( s.as_bytes(), @@ -97,6 +100,7 @@ impl Triplestore { parallel, checked, graph, + prefixes, ) } @@ -111,6 +115,7 @@ impl Triplestore { parallel: Option, checked: bool, graph: &NamedGraph, + prefixes: &HashMap, ) -> Result<(), TriplestoreError> { let start_quadproc_now = Instant::now(); let parallel = if let Some(parallel) = parallel { @@ -129,6 +134,9 @@ impl Triplestore { let mut readers = vec![]; if rdf_format == RdfFormat::Turtle { let mut parser = TurtleParser::new(); + for (k, v) in prefixes { + parser = parser.with_prefix(k, v.as_str()).unwrap(); + } if !checked { parser = parser.unchecked(); } From 6d33c87b38d33eaa872dd539a55c299035f97aaf Mon Sep 17 00:00:00 2001 From: Magnus Bakken <10287813+magbak@users.noreply.github.com> Date: Mon, 29 Dec 2025 10:20:15 +0100 Subject: [PATCH 5/5] Further on disk --- Cargo.lock | 2 +- py_maplib/maplib/__init__.pyi | 2 +- py_maplib/src/lib.rs | 10 +++++++++- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f37e40aa..595a1297 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2451,7 +2451,7 @@ dependencies = [ [[package]] name = "py_maplib" -version = "0.18.18" +version = "0.18.17" dependencies = [ "chrono", "cimxml", diff --git a/py_maplib/maplib/__init__.pyi b/py_maplib/maplib/__init__.pyi index 2a2026c9..007bb9f6 100644 --- a/py_maplib/maplib/__init__.pyi +++ b/py_maplib/maplib/__init__.pyi @@ -356,7 +356,7 @@ class Model: :return: """ - def add_prefixes(self, template: Dict[str, str]): + def add_prefixes(self, prefixes: Dict[str, str]): """ Add prefixes that will be used in parsing of SPARQL, Datalog and OTTR. diff --git a/py_maplib/src/lib.rs b/py_maplib/src/lib.rs index 38f85538..9fda8a26 100644 --- a/py_maplib/src/lib.rs +++ b/py_maplib/src/lib.rs @@ -1285,7 +1285,15 @@ fn insert_sprout_mutex( let global_cats = &inner.triplestore.global_cats; let (mut sms, cats) = { let guard = global_cats.read().unwrap(); - new_solution_mapping_cats(sms, &guard) + new_solution_mapping_cats( + sms, + &guard, + inner + .triplestore + .storage_folder + .as_ref() + .map(|x| x.as_ref()), + ) }; let locked_cats = LockedCats::new(cats); for sm in &mut sms {