diff --git a/src/metrics.rs b/src/metrics.rs index abacae7..e735b24 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -27,6 +27,17 @@ lazy_static! { ) .unwrap(); + /// The count of active workers. + pub static ref ACTIVE_WORKERS_COUNT: HistogramVec = HistogramVec::new( + histogram_opts!( + "yatp_active_workers_count", + "the count of active workers", + vec![1.0, 2.0, 4.0, 6.0, 8.0, 12.0, 16.0, 24.0, 32.0, 48.0, 64.0, 96.0, 128.0] + ), + &["name"] + ) + .unwrap(); + static ref NAMESPACE: Mutex<Option<String>> = Mutex::new(None); } diff --git a/src/pool/builder.rs b/src/pool/builder.rs index a49c288..e30c6c3 100644 --- a/src/pool/builder.rs +++ b/src/pool/builder.rs @@ -1,5 +1,6 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. +use crate::metrics::*; use crate::pool::spawn::QueueCore; use crate::pool::worker::WorkerThread; use crate::pool::{CloneRunnerBuilder, Local, Remote, Runner, RunnerBuilder, ThreadPool}; @@ -76,6 +77,9 @@ where F::Runner: Runner + Send + 'static, { let mut threads = Vec::with_capacity(self.builder.sched_config.max_thread_count); + let active_count_histogram = ACTIVE_WORKERS_COUNT + .get_metric_with_label_values(&[&self.builder.name_prefix]) + .unwrap(); for (i, local_queue) in self.local_queues.into_iter().enumerate() { let runner = factory.build(); let name = format!("{}-{}", self.builder.name_prefix, i); @@ -83,7 +87,12 @@ where if let Some(size) = self.builder.stack_size { builder = builder.stack_size(size) } - let local = Local::new(i + 1, local_queue, self.core.clone()); + let local = Local::new( + i + 1, + local_queue, + self.core.clone(), + active_count_histogram.clone(), + ); let thd = WorkerThread::new(local, runner); threads.push( builder diff --git a/src/pool/spawn.rs b/src/pool/spawn.rs index 8f06417..bf9bc16 100644 --- a/src/pool/spawn.rs +++ b/src/pool/spawn.rs @@ -8,34 +8,99 @@ use crate::pool::SchedConfig; use crate::queue::{Extras, LocalQueue, Pop, TaskCell, TaskInjector, WithExtras}; use fail::fail_point; use 
parking_lot_core::{ParkResult, ParkToken, UnparkToken}; -use std::sync::atomic::{AtomicUsize, Ordering}; +use prometheus::{Histogram, HistogramOpts}; +use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering}; use std::sync::Arc; -/// An usize is used to trace the threads that are working actively. -/// To save additional memory and atomic operation, the number and -/// shutdown hint are merged into one number in the following format -/// ```text -/// 0...00 -/// ^ ^ -/// | The least significant bit indicates whether the queue is shutting down. -/// Bits represent the thread count -/// ``` -const SHUTDOWN_BIT: usize = 1; -const WORKER_COUNT_SHIFT: usize = 1; -const WORKER_COUNT_BASE: usize = 2; - -/// Checks if shutdown bit is set. -pub fn is_shutdown(cnt: usize) -> bool { - cnt & SHUTDOWN_BIT == SHUTDOWN_BIT +/// A u64 is used to trace the states of the workers. +/// The highest 16 bits represent the running workers count. +/// The 17-32 bits represent the active workers count. +/// The 33-48 bits represent the backup workers count. +/// The lowest 16 bits are used to save other flags, currently only the lowest +/// bit is used to represent whether the pool has been shut down. +trait WorkersInfo { + fn new(workers_count: usize) -> Self; + + fn is_shutdown(self) -> bool; + + fn running_count(self) -> usize; + + fn active_count(self) -> usize; + + fn backup_count(self) -> usize; + + // Change an active worker from not running to running. + fn active_to_running(self) -> Self; + + // Change an running worker to a not running but active worker. + fn running_to_active(self) -> Self; + + // Change an running worker to a backup worker. + fn running_to_backup(self) -> Self; + + // Change a backup worker to a running worker. 
+ fn backup_to_running(self) -> Self; +} + +const SHUTDOWN_BIT: u64 = 1; +const RUNNING_COUNT_SHIFT: u32 = 48; +const ACTIVE_COUNT_SHIFT: u32 = 32; +const BACKUP_COUNT_SHIFT: u32 = 16; +const COUNT_MASK: u64 = 0xFFFF; + +impl WorkersInfo for u64 { + fn new(workers_count: usize) -> u64 { + let workers_count = workers_count as u64; + assert!(workers_count <= COUNT_MASK, "too many workers"); + (workers_count << ACTIVE_COUNT_SHIFT) | (workers_count << RUNNING_COUNT_SHIFT) + } + + fn is_shutdown(self) -> bool { + self & SHUTDOWN_BIT == SHUTDOWN_BIT + } + + fn running_count(self) -> usize { + ((self >> RUNNING_COUNT_SHIFT) & COUNT_MASK) as usize + } + + fn active_count(self) -> usize { + ((self >> ACTIVE_COUNT_SHIFT) & COUNT_MASK) as usize + } + + fn backup_count(self) -> usize { + ((self >> BACKUP_COUNT_SHIFT) & COUNT_MASK) as usize + } + + fn active_to_running(self) -> Self { + debug_assert!(self.is_shutdown() || self.running_count() < self.active_count()); + self + (1 << RUNNING_COUNT_SHIFT) + } + + fn running_to_active(self) -> Self { + debug_assert!(self.is_shutdown() || self.running_count() > 0); + self - (1 << RUNNING_COUNT_SHIFT) + } + + fn running_to_backup(self) -> Self { + self - (1 << RUNNING_COUNT_SHIFT) - (1 << ACTIVE_COUNT_SHIFT) + (1 << BACKUP_COUNT_SHIFT) + } + + fn backup_to_running(self) -> Self { + self + (1 << RUNNING_COUNT_SHIFT) + (1 << ACTIVE_COUNT_SHIFT) - (1 << BACKUP_COUNT_SHIFT) + } } +const BACKUP_THRESHOLD: i64 = 64; + /// The core of queues. /// /// Every thread pool instance should have one and only `QueueCore`. It's /// saved in an `Arc` and shared between all worker threads and remote handles. 
pub(crate) struct QueueCore { global_queue: TaskInjector, - active_workers: AtomicUsize, + workers_info: AtomicU64, + backup_count: AtomicI64, + notified: AtomicBool, config: SchedConfig, } @@ -43,7 +108,9 @@ impl QueueCore { pub fn new(global_queue: TaskInjector, config: SchedConfig) -> QueueCore { QueueCore { global_queue, - active_workers: AtomicUsize::new(config.max_thread_count << WORKER_COUNT_SHIFT), + workers_info: AtomicU64::new(WorkersInfo::new(config.max_thread_count)), + backup_count: AtomicI64::new(0), + notified: AtomicBool::new(false), config, } } @@ -53,14 +120,85 @@ impl QueueCore { /// If the method is going to wake up any threads, source is used to trace who triggers /// the action. pub fn ensure_workers(&self, source: usize) { - let cnt = self.active_workers.load(Ordering::SeqCst); - if (cnt >> WORKER_COUNT_SHIFT) >= self.config.max_thread_count || is_shutdown(cnt) { + let workers_info = self.workers_info.load(Ordering::SeqCst); + if workers_info.is_shutdown() { return; } - let addr = self as *const QueueCore as usize; + if workers_info.active_count() < self.config.min_thread_count { + self.unpark_one(true, source); + } else if workers_info.running_count() < self.config.min_thread_count { + self.unpark_one(false, source); + } else if source != 0 { + if workers_info.running_count() == workers_info.active_count() { + let mut backup_count = self.backup_count.load(Ordering::SeqCst); + if backup_count > 0 { + self.backup_count.store(0, Ordering::SeqCst); + } else { + backup_count = self.backup_count.fetch_sub(1, Ordering::SeqCst) - 1; + while backup_count < -BACKUP_THRESHOLD { + match self.backup_count.compare_exchange_weak( + backup_count, + 0, + Ordering::SeqCst, + Ordering::SeqCst, + ) { + Ok(_) => { + self.unpark_one(true, source); + break; + } + Err(actual) => backup_count = actual, + } + } + } + } else if workers_info.running_count() + 1 < workers_info.active_count() { + let backup_count = self.backup_count.load(Ordering::SeqCst); + if 
backup_count < 0 { + self.backup_count.store(0, Ordering::SeqCst) + } else { + self.backup_count.fetch_add(1, Ordering::SeqCst); + } + self.unpark_one(false, source); + } else if workers_info.running_count() < workers_info.active_count() { + self.unpark_one(false, source); + } + } else if workers_info.running_count() < workers_info.active_count() { + if !self + .notified + .compare_and_swap(false, true, Ordering::SeqCst) + { + self.unpark_one(false, source); + } + } + } + + fn unpark_one(&self, backup: bool, source: usize) { unsafe { - parking_lot_core::unpark_one(addr, |_| UnparkToken(source)); + parking_lot_core::unpark_one(self.park_address(backup), |_| UnparkToken(source)); + } + } + + pub fn park_to_backup(&self) -> bool { + let mut v = self.backup_count.load(Ordering::SeqCst); + loop { + if v < BACKUP_THRESHOLD { + return false; + } + match self + .backup_count + .compare_exchange_weak(v, 0, Ordering::SeqCst, Ordering::SeqCst) + { + Ok(_) => return true, + Err(actual) => v = actual, + } + } + } + + pub fn park_address(&self, backup: bool) -> usize { + if backup { + self as *const QueueCore as usize + 1 + } else { + self as *const QueueCore as usize } } @@ -68,53 +206,87 @@ impl QueueCore { /// /// `source` is used to trace who triggers the action. pub fn mark_shutdown(&self, source: usize) { - self.active_workers.fetch_or(SHUTDOWN_BIT, Ordering::SeqCst); - let addr = self as *const QueueCore as usize; + self.workers_info.fetch_or(1, Ordering::SeqCst); unsafe { - parking_lot_core::unpark_all(addr, UnparkToken(source)); + parking_lot_core::unpark_all(self.park_address(false), UnparkToken(source)); + parking_lot_core::unpark_all(self.park_address(true), UnparkToken(source)); } } /// Checks if the thread pool is shutting down. pub fn is_shutdown(&self) -> bool { - let cnt = self.active_workers.load(Ordering::SeqCst); - is_shutdown(cnt) + self.workers_info.load(Ordering::SeqCst).is_shutdown() } /// Marks the current thread in sleep state. 
/// /// It can be marked as sleep only when the pool is not shutting down. - pub fn mark_sleep(&self) -> bool { - let mut cnt = self.active_workers.load(Ordering::SeqCst); + pub fn mark_sleep(&self, backup: bool, active_count_histogram: &Histogram) -> bool { + if !backup + && self + .notified + .compare_and_swap(true, false, Ordering::SeqCst) + { + return false; + } + let mut workers_info = self.workers_info.load(Ordering::SeqCst); loop { - if is_shutdown(cnt) { + if workers_info.is_shutdown() { return false; } - match self.active_workers.compare_exchange_weak( - cnt, - cnt - WORKER_COUNT_BASE, + let new_info = if backup { + workers_info.running_to_backup() + } else { + workers_info.running_to_active() + }; + match self.workers_info.compare_exchange_weak( + workers_info, + new_info, Ordering::SeqCst, Ordering::SeqCst, ) { - Ok(_) => return true, - Err(n) => cnt = n, + Ok(_) => { + if workers_info.running_count() < workers_info.active_count() { + let backup_count = self.backup_count.load(Ordering::SeqCst); + if backup_count < 0 { + self.backup_count.store(0, Ordering::SeqCst) + } else { + self.backup_count.fetch_add(1, Ordering::SeqCst); + } + } + if backup { + active_count_histogram.observe((workers_info.active_count() - 1) as f64); + } + return true; + } + Err(actual) => workers_info = actual, } } } /// Marks current thread as woken up states. 
- pub fn mark_woken(&self) { - let mut cnt = self.active_workers.load(Ordering::SeqCst); + pub fn mark_woken(&self, backup: bool) { + if !backup { + self.notified.store(false, Ordering::SeqCst); + } + let mut workers_info = self.workers_info.load(Ordering::SeqCst); loop { - match self.active_workers.compare_exchange_weak( - cnt, - cnt + WORKER_COUNT_BASE, + let new_info = if backup { + workers_info.backup_to_running() + } else { + workers_info.active_to_running() + }; + match self.workers_info.compare_exchange_weak( + workers_info, + new_info, Ordering::SeqCst, Ordering::SeqCst, ) { - Ok(_) => return, - Err(n) => cnt = n, + Ok(_) => { + return; + } + Err(actual) => workers_info = actual, } } } @@ -180,17 +352,24 @@ impl AssertSend for Remote {} /// instead of global queue, so new tasks can take advantage of cache /// coherence. pub struct Local { - id: usize, + pub(crate) id: usize, local_queue: LocalQueue, core: Arc>, + active_count_histogram: Histogram, } impl Local { - pub(crate) fn new(id: usize, local_queue: LocalQueue, core: Arc>) -> Local { + pub(crate) fn new( + id: usize, + local_queue: LocalQueue, + core: Arc>, + active_count_histogram: Histogram, + ) -> Local { Local { id, local_queue, core, + active_count_histogram, } } @@ -226,17 +405,19 @@ impl Local { /// If there are no tasks at the moment, it will go to sleep until woken /// up by other threads. 
pub(crate) fn pop_or_sleep(&mut self) -> Option> { - let address = &*self.core as *const QueueCore as usize; + let backup = self.core.park_to_backup(); let mut task = None; let id = self.id; + let mut marked_sleep = false; let res = unsafe { parking_lot_core::park( - address, + self.core.park_address(backup), || { - if !self.core.mark_sleep() { + if !self.core.mark_sleep(backup, &self.active_count_histogram) { return false; } + marked_sleep = true; task = self.local_queue.pop(); task.is_none() }, @@ -248,7 +429,10 @@ impl Local { }; match res { ParkResult::Unparked(_) | ParkResult::Invalid => { - self.core.mark_woken(); + if marked_sleep { + self.core.mark_woken(backup); + } + self.core.ensure_workers(id); task } ParkResult::TimedOut => unreachable!(), @@ -278,10 +462,11 @@ where let queue_type = queue_type.into(); let (global, locals) = crate::queue::build(queue_type, config.max_thread_count); let core = Arc::new(QueueCore::new(global, config)); + let backup_counter = Histogram::with_opts(HistogramOpts::new("_", "_")).unwrap(); let l = locals .into_iter() .enumerate() - .map(|(i, l)| Local::new(i + 1, l, core.clone())) + .map(|(i, l)| Local::new(i + 1, l, core.clone(), backup_counter.clone())) .collect(); let g = Remote::new(core); (g, l) diff --git a/src/pool/worker.rs b/src/pool/worker.rs index 9ccd94e..ec9b72c 100644 --- a/src/pool/worker.rs +++ b/src/pool/worker.rs @@ -2,7 +2,7 @@ use crate::pool::{Local, Runner}; use crate::queue::{Pop, TaskCell}; -use parking_lot_core::SpinWait; +use std::thread; pub(crate) struct WorkerThread { local: Local, @@ -22,14 +22,20 @@ where { #[inline] fn pop(&mut self) -> Option> { - // Wait some time before going to sleep, which is more expensive. 
- let mut spin = SpinWait::new(); - loop { + for counter in 0..10 { if let Some(t) = self.local.pop() { + // if t.schedule_time.elapsed() > Duration::from_millis(1) { + // self.local.core().unpark_one(true, self.local.id); + // } + self.local.core().ensure_workers(self.local.id); return Some(t); } - if !spin.spin() { - break; + if counter < 3 { + for _ in 0..(1 << counter) { + std::sync::atomic::spin_loop_hint(); + } + } else { + thread::yield_now(); } } self.runner.pause(&mut self.local); @@ -58,7 +64,7 @@ mod tests { use crate::queue::QueueType; use crate::task::callback; use std::sync::*; - use std::time::*; + use std::time::Duration; #[derive(Default, PartialEq, Debug)] struct Metrics { @@ -121,7 +127,9 @@ mod tests { }; let metrics = r.metrics.clone(); let mut expected_metrics = Metrics::default(); - let (injector, mut locals) = build_spawn(QueueType::SingleLevel, Default::default()); + let mut config = crate::pool::SchedConfig::default(); + config.max_thread_count = 1; + let (injector, mut locals) = build_spawn(QueueType::SingleLevel, config); let th = WorkerThread::new(locals.remove(0), r); let handle = std::thread::spawn(move || { th.run();