Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/mem_store/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,10 @@ impl ColumnHandle {
self.load_scheduled.load(Ordering::SeqCst)
}

pub fn set_load_scheduled(&self, load_scheduled: bool) {
self.load_scheduled.store(load_scheduled, Ordering::SeqCst);
}

pub fn key(&self) -> &ColumnLocator {
&self.key
}
Expand Down
50 changes: 48 additions & 2 deletions src/scheduler/disk_read_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use std::collections::VecDeque;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std_semaphore::Semaphore;

Expand All @@ -18,6 +19,8 @@ pub struct DiskReadScheduler {
reader_semaphore: Semaphore,
lru: Lru,
lz4_decode: bool,
// Maps (TableName, PartitionID) to whether a load is scheduled for that partition.
load_scheduled: RwLock<HashMap<(String, PartitionID), AtomicBool>>,

background_load_wait_queue: Condvar,
background_load_in_progress: Mutex<bool>,
Expand Down Expand Up @@ -45,6 +48,7 @@ impl DiskReadScheduler {
lz4_decode,
background_load_wait_queue: Condvar::default(),
background_load_in_progress: Mutex::default(),
load_scheduled: RwLock::default(),
}
}

Expand All @@ -56,6 +60,19 @@ impl DiskReadScheduler {
cols: &RwLock<HashMap<String, Arc<ColumnHandle>>>,
perf_counter: &QueryPerfCounter,
) -> Option<Arc<Column>> {
let partition_handle = (handle.table().to_string(), handle.id());
if !self
.load_scheduled
.read()
.unwrap()
.contains_key(&partition_handle)
{
self.load_scheduled
.write()
.unwrap()
.insert(partition_handle.clone(), AtomicBool::new(false));
}

loop {
// Empty marker
if handle.is_empty() {
Expand All @@ -77,9 +94,12 @@ impl DiskReadScheduler {
}
// Load for column is already scheduled, wait for it to complete.
// TODO: this doesn't do anything currently, was only used by sequential disk reads. should check whether relevant subpartition is currently being loaded.
} else if handle.is_load_scheduled() {
} else if self.is_load_scheduled(&partition_handle) {
let mut is_load_in_progress = self.background_load_in_progress.lock().unwrap();
while *is_load_in_progress && !handle.is_resident() && handle.is_load_scheduled() {
while *is_load_in_progress
&& !handle.is_resident()
&& self.is_load_scheduled(&partition_handle)
{
debug!("Queuing for {}.{}", handle.name(), handle.id());
is_load_in_progress = self
.background_load_wait_queue
Expand All @@ -91,6 +111,22 @@ impl DiskReadScheduler {
// TODO: ensure same partition isn't being loaded multiple times
debug!("Point lookup for {}.{}", handle.name(), handle.id());
let columns = {
{
let mut load_scheduled = self.load_scheduled.write().unwrap();
if load_scheduled
.get(&partition_handle)
.unwrap()
.load(Ordering::SeqCst)
{
continue;
} else {
load_scheduled
.get_mut(&partition_handle)
.unwrap()
.store(true, Ordering::SeqCst);
}
}

let _token = self.reader_semaphore.access();
match self.disk_store.load_column(
&handle.key().table,
Expand Down Expand Up @@ -142,6 +178,7 @@ impl DiskReadScheduler {
handle.id(),
handle.name(),
);
self.load_scheduled.read().unwrap().get(&partition_handle).unwrap().store(false, Ordering::SeqCst);
match result {
Some(column) => return Some(column),
None => handle.set_empty(),
Expand Down Expand Up @@ -171,6 +208,15 @@ impl DiskReadScheduler {
}
}

pub fn is_load_scheduled(&self, partition_key: &(String, PartitionID)) -> bool {
self.load_scheduled
.read()
.unwrap()
.get(partition_key)
.unwrap()
.load(Ordering::SeqCst)
}

fn service_sequential_read(&self, run: &DiskRun, ldb: &InnerLocustDB) {
let _token = self.reader_semaphore.access();
debug!("Servicing read: {:?}", &run);
Expand Down
Loading