// REVIEW NOTES (free text ahead of the first `diff --git` header; not part of any hunk).
//
// This is a unified diff touching two files. Its line breaks are collapsed and generic
// arguments look stripped (`RwLock>,`, `Mutex,`, `Option>`), so it cannot be applied as-is.
// The hunks below are kept byte-for-byte.
//
// What the patch does:
// * partition.rs: adds `ColumnHandle::set_load_scheduled`, which stores the given bool into
//   `self.load_scheduled` with `Ordering::SeqCst`. Nothing in this diff calls it.
// * disk_read_scheduler.rs: adds a `load_scheduled` field to `DiskReadScheduler`. Per its own
//   comment it is keyed by (TableName, PartitionID); presumably it is
//   `RwLock<HashMap<(String, PartitionID), AtomicBool>>` -- TODO confirm, the type is garbled.
//   The method whose signature ends with `cols` / `perf_counter` (name outside the hunk) now
//   builds `partition_handle` from `handle.table()` and `handle.id()`, inserts a `false` flag
//   when the key is absent, consults `self.is_load_scheduled(&partition_handle)` instead of
//   `handle.is_load_scheduled()`, sets the flag before the point lookup, and clears it just
//   before `match result`.
// * disk_read_scheduler.rs: adds the public helper `is_load_scheduled`, which reads the flag
//   for a given (String, PartitionID) key.
//
// NOTE(review), to confirm against the full file:
// 1. Key initialisation checks `contains_key` under a read guard and inserts under a separate
//    write guard. A thread racing in between can have its entry (possibly already set to true)
//    replaced by the fresh `AtomicBool::new(false)`, assuming the map is a `HashMap`. A single
//    write-locked `entry(..).or_insert_with(..)` would avoid that.
// 2. Inside the write-locked block, `get_mut(..).store(true, ..)` does not need `get_mut`
//    (`AtomicBool::store` takes `&self`); `swap(true, ..)` would test and set in one step.
// 3. `continue` restarts the loop when another lookup already holds the flag. The wait loop
//    only blocks while `*is_load_in_progress` is true, and no notification of
//    `background_load_wait_queue` is visible where the flag is cleared -- check that waiters
//    neither busy-spin nor miss a wakeup.
// 4. The flag is cleared at one visible place. The hunks skip the code between setting and
//    clearing it, so verify that every exit path (early return, error, panic) resets it.
// 5. `is_load_scheduled` is `pub` and unwraps the map lookup, so it panics for a key that was
//    never inserted.
// 6. The flag is per (table, partition) while the load shown is `load_column`; it looks like
//    lookups for different columns of one partition exclude each other -- confirm intended.
diff --git a/src/mem_store/partition.rs b/src/mem_store/partition.rs index a7c5779..a0d1da4 100644 --- a/src/mem_store/partition.rs +++ b/src/mem_store/partition.rs @@ -328,6 +328,10 @@ impl ColumnHandle { self.load_scheduled.load(Ordering::SeqCst) } + pub fn set_load_scheduled(&self, load_scheduled: bool) { + self.load_scheduled.store(load_scheduled, Ordering::SeqCst); + } + pub fn key(&self) -> &ColumnLocator { &self.key } diff --git a/src/scheduler/disk_read_scheduler.rs b/src/scheduler/disk_read_scheduler.rs index 8223379..45c0bcf 100644 --- a/src/scheduler/disk_read_scheduler.rs +++ b/src/scheduler/disk_read_scheduler.rs @@ -3,6 +3,7 @@ use std::collections::VecDeque; use std::collections::{HashMap, HashSet}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Condvar, Mutex, RwLock}; use std_semaphore::Semaphore; @@ -18,6 +19,8 @@ pub struct DiskReadScheduler { reader_semaphore: Semaphore, lru: Lru, lz4_decode: bool, + // Maps (TableName, PartitionID) to whether a load is scheduled for that partition. + load_scheduled: RwLock>, background_load_wait_queue: Condvar, background_load_in_progress: Mutex, @@ -45,6 +48,7 @@ impl DiskReadScheduler { lz4_decode, background_load_wait_queue: Condvar::default(), background_load_in_progress: Mutex::default(), + load_scheduled: RwLock::default(), } } @@ -56,6 +60,19 @@ impl DiskReadScheduler { cols: &RwLock>>, perf_counter: &QueryPerfCounter, ) -> Option> { + let partition_handle = (handle.table().to_string(), handle.id()); + if !self + .load_scheduled + .read() + .unwrap() + .contains_key(&partition_handle) + { + self.load_scheduled + .write() + .unwrap() + .insert(partition_handle.clone(), AtomicBool::new(false)); + } + loop { // Empty marker if handle.is_empty() { @@ -77,9 +94,12 @@ impl DiskReadScheduler { } // Load for column is already scheduled, wait for it to complete. // TODO: this doesn't do anything currently, was only used by sequential disk reads. 
should check whether relevant subpartition is currently being loaded. - } else if handle.is_load_scheduled() { + } else if self.is_load_scheduled(&partition_handle) { let mut is_load_in_progress = self.background_load_in_progress.lock().unwrap(); - while *is_load_in_progress && !handle.is_resident() && handle.is_load_scheduled() { + while *is_load_in_progress + && !handle.is_resident() + && self.is_load_scheduled(&partition_handle) + { debug!("Queuing for {}.{}", handle.name(), handle.id()); is_load_in_progress = self .background_load_wait_queue @@ -91,6 +111,22 @@ impl DiskReadScheduler { // TODO: ensure same partition isn't being loaded multiple times debug!("Point lookup for {}.{}", handle.name(), handle.id()); let columns = { + { + let mut load_scheduled = self.load_scheduled.write().unwrap(); + if load_scheduled + .get(&partition_handle) + .unwrap() + .load(Ordering::SeqCst) + { + continue; + } else { + load_scheduled + .get_mut(&partition_handle) + .unwrap() + .store(true, Ordering::SeqCst); + } + } + let _token = self.reader_semaphore.access(); match self.disk_store.load_column( &handle.key().table, @@ -142,6 +178,7 @@ impl DiskReadScheduler { handle.id(), handle.name(), ); + self.load_scheduled.read().unwrap().get(&partition_handle).unwrap().store(false, Ordering::SeqCst); match result { Some(column) => return Some(column), None => handle.set_empty(), @@ -171,6 +208,15 @@ impl DiskReadScheduler { } } + pub fn is_load_scheduled(&self, partition_key: &(String, PartitionID)) -> bool { + self.load_scheduled + .read() + .unwrap() + .get(partition_key) + .unwrap() + .load(Ordering::SeqCst) + } + fn service_sequential_read(&self, run: &DiskRun, ldb: &InnerLocustDB) { let _token = self.reader_semaphore.access(); debug!("Servicing read: {:?}", &run);