From 924fbde14fe5d1870185b9af1577a335e5c3857e Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Mon, 18 May 2020 19:18:28 +0800 Subject: [PATCH 01/12] add backup Signed-off-by: Yilin Chen --- src/pool/spawn.rs | 158 +++++++++++++++++++++++++++++++++++++--------- 1 file changed, 128 insertions(+), 30 deletions(-) diff --git a/src/pool/spawn.rs b/src/pool/spawn.rs index 8f06417..a35cd4e 100644 --- a/src/pool/spawn.rs +++ b/src/pool/spawn.rs @@ -8,7 +8,7 @@ use crate::pool::SchedConfig; use crate::queue::{Extras, LocalQueue, Pop, TaskCell, TaskInjector, WithExtras}; use fail::fail_point; use parking_lot_core::{ParkResult, ParkToken, UnparkToken}; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering}; use std::sync::Arc; /// An usize is used to trace the threads that are working actively. @@ -29,21 +29,53 @@ pub fn is_shutdown(cnt: usize) -> bool { cnt & SHUTDOWN_BIT == SHUTDOWN_BIT } +// 1 bit: shutdown +// 16 bits: running +// 16 bits: active +// 16 bits: backup +trait WorkersInfo { + fn is_shutdown(self) -> bool; + fn running_count(self) -> usize; + fn active_count(self) -> usize; + fn backup_count(self) -> usize; +} + +impl WorkersInfo for u64 { + fn is_shutdown(self) -> bool { + self & 1 == 1 + } + + fn running_count(self) -> usize { + ((self >> 1) & (u16::max_value() as u64)) as usize + } + + fn active_count(self) -> usize { + ((self >> 17) & (u16::max_value() as u64)) as usize + } + + fn backup_count(self) -> usize { + ((self >> 33) & (u16::max_value() as u64)) as usize + } +} + /// The core of queues. /// /// Every thread pool instance should have one and only `QueueCore`. It's /// saved in an `Arc` and shared between all worker threads and remote handles. pub(crate) struct QueueCore { global_queue: TaskInjector, - active_workers: AtomicUsize, + workers_info: AtomicU64, + backup_countdown: AtomicU8, config: SchedConfig, } impl QueueCore { pub fn new(global_queue: TaskInjector, config: SchedConfig) -> QueueCore { + let thread_count = config.max_thread_count as u64; QueueCore { global_queue, - active_workers: AtomicUsize::new(config.max_thread_count << WORKER_COUNT_SHIFT), + workers_info: AtomicU64::new(((thread_count << 17) | (thread_count << 1)) as u64), + backup_countdown: AtomicU8::new(8), config, } } @@ -53,14 +85,48 @@ impl QueueCore { /// If the method is going to wake up any threads, source is used to trace who triggers /// the action. pub fn ensure_workers(&self, source: usize) { - let cnt = self.active_workers.load(Ordering::SeqCst); - if (cnt >> WORKER_COUNT_SHIFT) >= self.config.max_thread_count || is_shutdown(cnt) { + let workers_info = self.workers_info.load(Ordering::SeqCst); + if workers_info.is_shutdown() { return; } + if workers_info.running_count() == workers_info.active_count() + && workers_info.backup_count() > 0 + { + // println!( + // "unpark backup, running: {}, active: {}, backup: {}", + // workers_info.running_count(), + // workers_info.active_count(), + // workers_info.backup_count() + // ); + self.unpark_one(true, source); + } else { + // println!( + // "unpark active, running: {}, active: {}, backup: {}", + // workers_info.running_count(), + // workers_info.active_count(), + // workers_info.backup_count() + // ); + self.unpark_one(false, source); + } + } - let addr = self as *const QueueCore as usize; + fn unpark_one(&self, backup: bool, source: usize) { unsafe { - parking_lot_core::unpark_one(addr, |_| UnparkToken(source)); + parking_lot_core::unpark_one(self.park_address(backup), |_| UnparkToken(source)); + } + } + + pub fn park_to_backup(&self) -> bool { + self.backup_countdown + .compare_and_swap(0, 8, Ordering::SeqCst) + == 0 + } + + pub fn park_address(&self, backup: bool) -> usize { + if backup { + self as *const QueueCore as usize + 1 + } else { + self as *const QueueCore as usize } } @@ -68,53 +134,85 @@ impl QueueCore { /// /// `source` is used to trace who triggers the action. pub fn mark_shutdown(&self, source: usize) { - self.active_workers.fetch_or(SHUTDOWN_BIT, Ordering::SeqCst); - let addr = self as *const QueueCore as usize; + self.workers_info.fetch_or(1, Ordering::SeqCst); unsafe { - parking_lot_core::unpark_all(addr, UnparkToken(source)); + parking_lot_core::unpark_all(self.park_address(false), UnparkToken(source)); + parking_lot_core::unpark_all(self.park_address(true), UnparkToken(source)); } } /// Checks if the thread pool is shutting down. pub fn is_shutdown(&self) -> bool { - let cnt = self.active_workers.load(Ordering::SeqCst); - is_shutdown(cnt) + self.workers_info.load(Ordering::SeqCst).is_shutdown() } /// Marks the current thread in sleep state. /// /// It can be marked as sleep only when the pool is not shutting down. - pub fn mark_sleep(&self) -> bool { - let mut cnt = self.active_workers.load(Ordering::SeqCst); + pub fn mark_sleep(&self, backup: bool) -> bool { + let mut workers_info = self.workers_info.load(Ordering::SeqCst); loop { - if is_shutdown(cnt) { + if workers_info.is_shutdown() { return false; } - match self.active_workers.compare_exchange_weak( - cnt, - cnt - WORKER_COUNT_BASE, + let new_info = if backup { + ((workers_info.backup_count() as u64 + 1) << 33) + | ((workers_info.active_count() as u64 - 1) << 17) + | ((workers_info.running_count() as u64 - 1) << 1) + } else { + ((workers_info.backup_count() as u64) << 33) + | ((workers_info.active_count() as u64) << 17) + | ((workers_info.running_count() as u64 - 1) << 1) + }; + match self.workers_info.compare_exchange_weak( + workers_info, + new_info, Ordering::SeqCst, Ordering::SeqCst, ) { - Ok(_) => return true, - Err(n) => cnt = n, + Ok(_) => { + if workers_info.running_count() + 1 < workers_info.active_count() { + let mut countdown = self.backup_countdown.load(Ordering::Relaxed); + while let Err(actual) = self.backup_countdown.compare_exchange_weak( + countdown, + countdown.saturating_sub(1), + Ordering::SeqCst, + Ordering::SeqCst, + ) { + countdown = actual; + } + } + return true; + } + Err(actual) => workers_info = actual, } } } /// Marks current thread as woken up states. - pub fn mark_woken(&self) { - let mut cnt = self.active_workers.load(Ordering::SeqCst); + pub fn mark_woken(&self, backup: bool) { + let mut workers_info = self.workers_info.load(Ordering::SeqCst); loop { - match self.active_workers.compare_exchange_weak( - cnt, - cnt + WORKER_COUNT_BASE, + let new_info = if backup { + ((workers_info.backup_count() as u64 - 1) << 33) + | ((workers_info.active_count() as u64 + 1) << 17) + | ((workers_info.running_count() as u64 + 1) << 1) + | workers_info.is_shutdown() as u64 + } else { + ((workers_info.backup_count() as u64) << 33) + | ((workers_info.active_count() as u64) << 17) + | ((workers_info.running_count() as u64 + 1) << 1) + | workers_info.is_shutdown() as u64 + }; + match self.workers_info.compare_exchange_weak( + workers_info, + new_info, Ordering::SeqCst, Ordering::SeqCst, ) { Ok(_) => return, - Err(n) => cnt = n, + Err(actual) => workers_info = actual, } } } @@ -226,15 +324,15 @@ impl Local { /// If there are no tasks at the moment, it will go to sleep until woken /// up by other threads. pub(crate) fn pop_or_sleep(&mut self) -> Option> { - let address = &*self.core as *const QueueCore as usize; + let backup = self.core.park_to_backup(); let mut task = None; let id = self.id; let res = unsafe { parking_lot_core::park( - address, + self.core.park_address(backup), || { - if !self.core.mark_sleep() { + if !self.core.mark_sleep(backup) { return false; } task = self.local_queue.pop(); @@ -248,7 +346,7 @@ impl Local { }; match res { ParkResult::Unparked(_) | ParkResult::Invalid => { - self.core.mark_woken(); + self.core.mark_woken(backup); task } ParkResult::TimedOut => unreachable!(), From aec4bedc6b0a4cffe0adfe715ab5b53221ab8238 Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Mon, 18 May 2020 21:30:03 +0800 Subject: [PATCH 02/12] sleep longer Signed-off-by: Yilin Chen --- src/pool/spawn.rs | 17 +++++++++++++++-- src/pool/worker.rs | 17 +++++++++++++---- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/src/pool/spawn.rs b/src/pool/spawn.rs index a35cd4e..7247e91 100644 --- a/src/pool/spawn.rs +++ b/src/pool/spawn.rs @@ -8,7 +8,7 @@ use crate::pool::SchedConfig; use crate::queue::{Extras, LocalQueue, Pop, TaskCell, TaskInjector, WithExtras}; use fail::fail_point; use parking_lot_core::{ParkResult, ParkToken, UnparkToken}; -use std::sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering}; use std::sync::Arc; /// An usize is used to trace the threads that are working actively. @@ -66,6 +66,7 @@ pub(crate) struct QueueCore { global_queue: TaskInjector, workers_info: AtomicU64, backup_countdown: AtomicU8, + idling: AtomicBool, config: SchedConfig, } @@ -76,6 +77,7 @@ impl QueueCore { global_queue, workers_info: AtomicU64::new(((thread_count << 17) | (thread_count << 1)) as u64), backup_countdown: AtomicU8::new(8), + idling: AtomicBool::new(false), config, } } @@ -89,8 +91,10 @@ impl QueueCore { if workers_info.is_shutdown() { return; } + let idling = self.idling.load(Ordering::SeqCst); if workers_info.running_count() == workers_info.active_count() && workers_info.backup_count() > 0 + && !idling { // println!( // "unpark backup, running: {}, active: {}, backup: {}", @@ -98,8 +102,9 @@ impl QueueCore { // workers_info.active_count(), // workers_info.backup_count() // ); + // println!("unpark backup"); self.unpark_one(true, source); - } else { + } else if !idling { // println!( // "unpark active, running: {}, active: {}, backup: {}", // workers_info.running_count(), @@ -146,6 +151,14 @@ impl QueueCore { self.workers_info.load(Ordering::SeqCst).is_shutdown() } + pub fn mark_idling(&self) -> bool { + !self.idling.compare_and_swap(false, true, Ordering::SeqCst) + } + + pub fn unmark_idling(&self) { + self.idling.store(false, Ordering::SeqCst); + } + /// Marks the current thread in sleep state. /// /// It can be marked as sleep only when the pool is not shutting down. diff --git a/src/pool/worker.rs b/src/pool/worker.rs index 9ccd94e..0bcf40f 100644 --- a/src/pool/worker.rs +++ b/src/pool/worker.rs @@ -3,6 +3,8 @@ use crate::pool::{Local, Runner}; use crate::queue::{Pop, TaskCell}; use parking_lot_core::SpinWait; +use std::thread; +use std::time::Duration; pub(crate) struct WorkerThread { local: Local, @@ -22,13 +24,20 @@ where { #[inline] fn pop(&mut self) -> Option> { - // Wait some time before going to sleep, which is more expensive. - let mut spin = SpinWait::new(); - loop { + let idling = self.local.core().mark_idling(); + let mut counter = 0; + while idling { if let Some(t) = self.local.pop() { + self.local.core().unmark_idling(); return Some(t); } - if !spin.spin() { + counter += 1; + if counter < 3 { + thread::yield_now(); + } else if counter < 100 { + thread::sleep(Duration::from_micros(10)); + } else { + self.local.core().unmark_idling(); break; } } From dba1a5106c88aa748d3330206a10b4bbbb95d229 Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Mon, 18 May 2020 21:47:37 +0800 Subject: [PATCH 03/12] add log Signed-off-by: Yilin Chen --- src/pool/spawn.rs | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/src/pool/spawn.rs b/src/pool/spawn.rs index 7247e91..906d879 100644 --- a/src/pool/spawn.rs +++ b/src/pool/spawn.rs @@ -96,21 +96,23 @@ impl QueueCore { && workers_info.backup_count() > 0 && !idling { - // println!( - // "unpark backup, running: {}, active: {}, backup: {}", - // workers_info.running_count(), - // workers_info.active_count(), - // workers_info.backup_count() - // ); + eprintln!( + "{:?} unpark backup, running: {}, active: {}, backup: {}", + std::time::SystemTime::now(), + workers_info.running_count(), + workers_info.active_count(), + workers_info.backup_count() + ); // println!("unpark backup"); self.unpark_one(true, source); } else if !idling { - // println!( - // "unpark active, running: {}, active: {}, backup: {}", - // workers_info.running_count(), - // workers_info.active_count(), - // workers_info.backup_count() - // ); + eprintln!( + "{:?} unpark active, running: {}, active: {}, backup: {}", + std::time::SystemTime::now(), + workers_info.running_count(), + workers_info.active_count(), + workers_info.backup_count() + ); self.unpark_one(false, source); } } @@ -185,7 +187,7 @@ impl QueueCore { Ordering::SeqCst, ) { Ok(_) => { - if workers_info.running_count() + 1 < workers_info.active_count() { + if !backup && workers_info.running_count() + 1 < workers_info.active_count() { let mut countdown = self.backup_countdown.load(Ordering::Relaxed); while let Err(actual) = self.backup_countdown.compare_exchange_weak( countdown, @@ -195,6 +197,11 @@ impl QueueCore { ) { countdown = actual; } + eprintln!( + "{:?} sleep, backup: {}", + std::time::SystemTime::now(), + backup + ); } return true; } From ddd3bed6f2511e9f97b467e663f93ab04637e5d0 Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Mon, 18 May 2020 22:49:01 +0800 Subject: [PATCH 04/12] active countdown Signed-off-by: Yilin Chen --- src/pool/spawn.rs | 123 +++++++++++++++++++++++++++++---------------- src/pool/worker.rs | 1 + 2 files changed, 82 insertions(+), 42 deletions(-) diff --git a/src/pool/spawn.rs b/src/pool/spawn.rs index 906d879..f607047 100644 --- a/src/pool/spawn.rs +++ b/src/pool/spawn.rs @@ -8,7 +8,7 @@ use crate::pool::SchedConfig; use crate::queue::{Extras, LocalQueue, Pop, TaskCell, TaskInjector, WithExtras}; use fail::fail_point; use parking_lot_core::{ParkResult, ParkToken, UnparkToken}; -use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU64, AtomicU8, Ordering}; use std::sync::Arc; /// An usize is used to trace the threads that are working actively. @@ -65,6 +65,7 @@ impl WorkersInfo for u64 { pub(crate) struct QueueCore { global_queue: TaskInjector, workers_info: AtomicU64, + active_countdown: AtomicU8, backup_countdown: AtomicU8, idling: AtomicBool, config: SchedConfig, @@ -76,7 +77,8 @@ impl QueueCore { QueueCore { global_queue, workers_info: AtomicU64::new(((thread_count << 17) | (thread_count << 1)) as u64), - backup_countdown: AtomicU8::new(8), + active_countdown: AtomicU8::new(0), + backup_countdown: AtomicU8::new(0), idling: AtomicBool::new(false), config, } @@ -91,30 +93,56 @@ impl QueueCore { if workers_info.is_shutdown() { return; } - let idling = self.idling.load(Ordering::SeqCst); - if workers_info.running_count() == workers_info.active_count() - && workers_info.backup_count() > 0 - && !idling - { - eprintln!( - "{:?} unpark backup, running: {}, active: {}, backup: {}", - std::time::SystemTime::now(), - workers_info.running_count(), - workers_info.active_count(), - workers_info.backup_count() - ); - // println!("unpark backup"); + if self.idling.load(Ordering::SeqCst) { + return; + } + let mut value = self.active_countdown.load(Ordering::Relaxed); + if workers_info.active_count() < self.config.min_thread_count { self.unpark_one(true, source); - } else if !idling { - eprintln!( - "{:?} unpark active, running: {}, active: {}, backup: {}", - std::time::SystemTime::now(), - workers_info.running_count(), - workers_info.active_count(), - workers_info.backup_count() - ); + } else if source != 0 && workers_info.running_count() == workers_info.active_count() { + loop { + let unpark = value == 0xFF; + let new_value = if unpark { 0 } else { (value << 1) | 1 }; + match self.active_countdown.compare_exchange_weak( + value, + new_value, + Ordering::SeqCst, + Ordering::SeqCst, + ) { + Ok(_) => { + if unpark { + self.unpark_one(true, source); + } + break; + } + Err(actual) => { + value = actual; + } + } + } + } else { self.unpark_one(false, source); } + // if workers_info.running_count() == workers_info.active_count() + // && workers_info.backup_count() > 0 + // { + // // eprintln!( + // // "unpark backup, running: {}, active: {}, backup: {}", + // // workers_info.running_count(), + // // workers_info.active_count(), + // // workers_info.backup_count() + // // ); + // // println!("unpark backup"); + // self.unpark_one(true, source); + // } else { + // // eprintln!( + // // "unpark active, running: {}, active: {}, backup: {}", + // // workers_info.running_count(), + // // workers_info.active_count(), + // // workers_info.backup_count() + // // ); + // self.unpark_one(false, source); + // } } fn unpark_one(&self, backup: bool, source: usize) { @@ -125,8 +153,8 @@ impl QueueCore { pub fn park_to_backup(&self) -> bool { self.backup_countdown - .compare_and_swap(0, 8, Ordering::SeqCst) - == 0 + .compare_and_swap(255, 0, Ordering::SeqCst) + == 255 } pub fn park_address(&self, backup: bool) -> usize { @@ -187,21 +215,23 @@ impl QueueCore { Ordering::SeqCst, ) { Ok(_) => { - if !backup && workers_info.running_count() + 1 < workers_info.active_count() { - let mut countdown = self.backup_countdown.load(Ordering::Relaxed); - while let Err(actual) = self.backup_countdown.compare_exchange_weak( - countdown, - countdown.saturating_sub(1), - Ordering::SeqCst, - Ordering::SeqCst, - ) { - countdown = actual; - } - eprintln!( - "{:?} sleep, backup: {}", - std::time::SystemTime::now(), - backup - ); + let down = + (workers_info.running_count() + 1 < workers_info.active_count()) as u8; + // println!( + // "running: {}, active: {}, backup: {}, down: {}", + // workers_info.running_count(), + // workers_info.active_count(), + // workers_info.backup_count(), + // down + // ); + let mut countdown = self.backup_countdown.load(Ordering::Relaxed); + while let Err(actual) = self.backup_countdown.compare_exchange_weak( + countdown, + (countdown << 1) | down, + Ordering::SeqCst, + Ordering::SeqCst, + ) { + countdown = actual; } return true; } @@ -231,7 +261,16 @@ impl QueueCore { Ordering::SeqCst, Ordering::SeqCst, ) { - Ok(_) => return, + Ok(_) => { + // println!( + // "wake! running: {}, active: {}, backup: {}, backup: {}", + // workers_info.running_count(), + // workers_info.active_count(), + // workers_info.backup_count(), + // backup + // ); + return; + } Err(actual) => workers_info = actual, } } @@ -298,7 +337,7 @@ impl AssertSend for Remote {} /// instead of global queue, so new tasks can take advantage of cache /// coherence. pub struct Local { - id: usize, + pub(crate) id: usize, local_queue: LocalQueue, core: Arc>, } diff --git a/src/pool/worker.rs b/src/pool/worker.rs index 0bcf40f..dadb768 100644 --- a/src/pool/worker.rs +++ b/src/pool/worker.rs @@ -24,6 +24,7 @@ where { #[inline] fn pop(&mut self) -> Option> { + self.local.core().ensure_workers(self.local.id); let idling = self.local.core().mark_idling(); let mut counter = 0; while idling { From 24bc05f700d6f43d81e92936d3febf603519322b Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Mon, 18 May 2020 23:37:21 +0800 Subject: [PATCH 05/12] fix Signed-off-by: Yilin Chen --- src/pool/spawn.rs | 58 +++++++++++++++++++++++++++++++++------------- src/pool/worker.rs | 4 ++-- 2 files changed, 44 insertions(+), 18 deletions(-) diff --git a/src/pool/spawn.rs b/src/pool/spawn.rs index f607047..f36d69d 100644 --- a/src/pool/spawn.rs +++ b/src/pool/spawn.rs @@ -67,6 +67,8 @@ pub(crate) struct QueueCore { workers_info: AtomicU64, active_countdown: AtomicU8, backup_countdown: AtomicU8, + wake_up_backup: AtomicU64, + wake_up_active: AtomicU64, idling: AtomicBool, config: SchedConfig, } @@ -79,6 +81,8 @@ impl QueueCore { workers_info: AtomicU64::new(((thread_count << 17) | (thread_count << 1)) as u64), active_countdown: AtomicU8::new(0), backup_countdown: AtomicU8::new(0), + wake_up_backup: AtomicU64::new(0), + wake_up_active: AtomicU64::new(0), idling: AtomicBool::new(false), config, } @@ -93,16 +97,21 @@ impl QueueCore { if workers_info.is_shutdown() { return; } - if self.idling.load(Ordering::SeqCst) { - return; - } - let mut value = self.active_countdown.load(Ordering::Relaxed); + // if source == 0 { + // println!("backup: {}", workers_info.backup_count()); + // } if workers_info.active_count() < self.config.min_thread_count { self.unpark_one(true, source); - } else if source != 0 && workers_info.running_count() == workers_info.active_count() { + } else if workers_info.running_count() < self.config.min_thread_count { + self.unpark_one(false, source); + } else if source != 0 { + let idling = self.idling.load(Ordering::SeqCst); + let set_bit = + (workers_info.running_count() == workers_info.active_count() && !idling) as u8; + let mut value = self.active_countdown.load(Ordering::Relaxed); loop { let unpark = value == 0xFF; - let new_value = if unpark { 0 } else { (value << 1) | 1 }; + let new_value = if unpark { 0 } else { (value << 1) | set_bit }; match self.active_countdown.compare_exchange_weak( value, new_value, @@ -120,8 +129,9 @@ impl QueueCore { } } } - } else { - self.unpark_one(false, source); + if workers_info.running_count() < workers_info.active_count() && !idling { + self.unpark_one(false, source); + } } // if workers_info.running_count() == workers_info.active_count() // && workers_info.backup_count() > 0 @@ -165,6 +175,14 @@ impl QueueCore { } } + pub fn mark_idling(&self) -> bool { + !self.idling.compare_and_swap(false, true, Ordering::SeqCst) + } + + pub fn unmark_idling(&self) { + self.idling.store(false, Ordering::SeqCst); + } + /// Sets the shutdown bit and notify all threads. /// /// `source` is used to trace who triggers the action. @@ -181,14 +199,6 @@ impl QueueCore { self.workers_info.load(Ordering::SeqCst).is_shutdown() } - pub fn mark_idling(&self) -> bool { - !self.idling.compare_and_swap(false, true, Ordering::SeqCst) - } - - pub fn unmark_idling(&self) { - self.idling.store(false, Ordering::SeqCst); - } - /// Marks the current thread in sleep state. /// /// It can be marked as sleep only when the pool is not shutting down. @@ -217,6 +227,7 @@ impl QueueCore { Ok(_) => { let down = (workers_info.running_count() + 1 < workers_info.active_count()) as u8; + // println!("down: {}", down); // println!( // "running: {}, active: {}, backup: {}, down: {}", // workers_info.running_count(), @@ -242,6 +253,11 @@ impl QueueCore { /// Marks current thread as woken up states. pub fn mark_woken(&self, backup: bool) { + if backup { + self.wake_up_backup.fetch_add(1, Ordering::SeqCst); + } else { + self.wake_up_active.fetch_add(1, Ordering::SeqCst); + } let mut workers_info = self.workers_info.load(Ordering::SeqCst); loop { let new_info = if backup { @@ -291,6 +307,16 @@ impl QueueCore { } } +impl Drop for QueueCore { + fn drop(&mut self) { + println!( + "wake active: {}, wake backup: {}", + self.wake_up_active.load(Ordering::SeqCst), + self.wake_up_backup.load(Ordering::SeqCst) + ); + } +} + /// Submits tasks to associated thread pool. /// /// Note that thread pool can be shutdown and dropped even not all remotes are diff --git a/src/pool/worker.rs b/src/pool/worker.rs index dadb768..95379e6 100644 --- a/src/pool/worker.rs +++ b/src/pool/worker.rs @@ -24,18 +24,18 @@ where { #[inline] fn pop(&mut self) -> Option> { - self.local.core().ensure_workers(self.local.id); let idling = self.local.core().mark_idling(); let mut counter = 0; while idling { if let Some(t) = self.local.pop() { self.local.core().unmark_idling(); + self.local.core().ensure_workers(self.local.id); return Some(t); } counter += 1; if counter < 3 { thread::yield_now(); - } else if counter < 100 { + } else if counter < 10 { thread::sleep(Duration::from_micros(10)); } else { self.local.core().unmark_idling(); From ef5bc387d61960b13e0b2958113721c740e36dcd Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Mon, 18 May 2020 23:45:10 +0800 Subject: [PATCH 06/12] fix Signed-off-by: Yilin Chen --- src/pool/spawn.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/pool/spawn.rs b/src/pool/spawn.rs index f36d69d..66aeb3a 100644 --- a/src/pool/spawn.rs +++ b/src/pool/spawn.rs @@ -432,6 +432,7 @@ impl Local { match res { ParkResult::Unparked(_) | ParkResult::Invalid => { self.core.mark_woken(backup); + self.core.ensure_workers(id); task } ParkResult::TimedOut => unreachable!(), From 8245e8602a5a46766a40fbe46f7549b4fc03295d Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Tue, 19 May 2020 00:28:52 +0800 Subject: [PATCH 07/12] more strict unpark backup Signed-off-by: Yilin Chen --- src/pool/spawn.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/pool/spawn.rs b/src/pool/spawn.rs index 66aeb3a..28fa623 100644 --- a/src/pool/spawn.rs +++ b/src/pool/spawn.rs @@ -65,7 +65,7 @@ impl WorkersInfo for u64 { pub(crate) struct QueueCore { global_queue: TaskInjector, workers_info: AtomicU64, - active_countdown: AtomicU8, + active_countdown: AtomicU64, backup_countdown: AtomicU8, wake_up_backup: AtomicU64, wake_up_active: AtomicU64, @@ -79,7 +79,7 @@ impl QueueCore { QueueCore { global_queue, workers_info: AtomicU64::new(((thread_count << 17) | (thread_count << 1)) as u64), - active_countdown: AtomicU8::new(0), + active_countdown: AtomicU64::new(0), backup_countdown: AtomicU8::new(0), wake_up_backup: AtomicU64::new(0), wake_up_active: AtomicU64::new(0), @@ -107,10 +107,10 @@ impl QueueCore { } else if source != 0 { let idling = self.idling.load(Ordering::SeqCst); let set_bit = - (workers_info.running_count() == workers_info.active_count() && !idling) as u8; + (workers_info.running_count() == workers_info.active_count() && !idling) as u64; let mut value = self.active_countdown.load(Ordering::Relaxed); loop { - let unpark = value == 0xFF; + let unpark = value == u64::max_value(); let new_value = if unpark { 0 } else { (value << 1) | set_bit }; match self.active_countdown.compare_exchange_weak( value, From bdfd5ba562de440e44303b8432e0049dbd4d649d Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Tue, 19 May 2020 12:21:45 +0800 Subject: [PATCH 08/12] small improvement Signed-off-by: Yilin Chen --- src/pool/spawn.rs | 155 +++++++++++++-------------------------------- src/pool/worker.rs | 17 ++--- 2 files changed, 52 insertions(+), 120 deletions(-) diff --git a/src/pool/spawn.rs b/src/pool/spawn.rs index 28fa623..013d6a0 100644 --- a/src/pool/spawn.rs +++ b/src/pool/spawn.rs @@ -37,7 +37,7 @@ trait WorkersInfo { fn is_shutdown(self) -> bool; fn running_count(self) -> usize; fn active_count(self) -> usize; - fn backup_count(self) -> usize; + fn park_backup_counter(self) -> usize; } impl WorkersInfo for u64 { @@ -53,7 +53,7 @@ impl WorkersInfo for u64 { ((self >> 17) & (u16::max_value() as u64)) as usize } - fn backup_count(self) -> usize { + fn park_backup_counter(self) -> usize { ((self >> 33) & (u16::max_value() as u64)) as usize } } @@ -65,11 +65,8 @@ impl WorkersInfo for u64 { pub(crate) struct QueueCore { global_queue: TaskInjector, workers_info: AtomicU64, - active_countdown: AtomicU64, - backup_countdown: AtomicU8, - wake_up_backup: AtomicU64, - wake_up_active: AtomicU64, - idling: AtomicBool, + unpark_backup_counter: AtomicU64, + park_backup_counter: AtomicU64, config: SchedConfig, } @@ -79,11 +76,8 @@ impl QueueCore { QueueCore { global_queue, workers_info: AtomicU64::new(((thread_count << 17) | (thread_count << 1)) as u64), - active_countdown: AtomicU64::new(0), - backup_countdown: AtomicU8::new(0), - wake_up_backup: AtomicU64::new(0), - wake_up_active: AtomicU64::new(0), - idling: AtomicBool::new(false), + unpark_backup_counter: AtomicU64::new(0), + park_backup_counter: AtomicU64::new(0), config, } } @@ -98,61 +92,34 @@ impl QueueCore { return; } // if source == 0 { - // println!("backup: {}", workers_info.backup_count()); + // println!("backup: {}", workers_info.park_backup_counter()); // } if workers_info.active_count() < self.config.min_thread_count { self.unpark_one(true, source); } else if workers_info.running_count() < self.config.min_thread_count { self.unpark_one(false, source); } else if source != 0 { - let idling = self.idling.load(Ordering::SeqCst); - let set_bit = - (workers_info.running_count() == workers_info.active_count() && !idling) as u64; - let mut value = self.active_countdown.load(Ordering::Relaxed); - loop { - let unpark = value == u64::max_value(); - let new_value = if unpark { 0 } else { (value << 1) | set_bit }; - match self.active_countdown.compare_exchange_weak( - value, - new_value, - Ordering::SeqCst, - Ordering::SeqCst, - ) { - Ok(_) => { - if unpark { - self.unpark_one(true, source); - } + if workers_info.running_count() < workers_info.active_count() { + self.unpark_backup_counter.store(0, Ordering::SeqCst); + self.unpark_one(false, source); + } else { + let mut v = self.unpark_backup_counter.fetch_add(1, Ordering::SeqCst) + 1; + loop { + if v < 64 { break; } - Err(actual) => { - value = actual; + match self.unpark_backup_counter.compare_exchange_weak( + v, + 0, + Ordering::SeqCst, + Ordering::SeqCst, + ) { + Ok(_) => self.unpark_one(true, source), + Err(actual) => v = actual, } } } - if workers_info.running_count() < workers_info.active_count() && !idling { - self.unpark_one(false, source); - } } - // if workers_info.running_count() == workers_info.active_count() - // && workers_info.backup_count() > 0 - // { - // // eprintln!( - // // "unpark backup, running: {}, active: {}, backup: {}", - // // workers_info.running_count(), - // // workers_info.active_count(), - // // workers_info.backup_count() - // // ); - // // println!("unpark backup"); - // self.unpark_one(true, source); - // } else { - // // eprintln!( - // // "unpark active, running: {}, active: {}, backup: {}", - // // workers_info.running_count(), - // // workers_info.active_count(), - // // workers_info.backup_count() - // // ); - // self.unpark_one(false, source); - // } } fn unpark_one(&self, backup: bool, source: usize) { @@ -162,9 +129,21 @@ impl QueueCore { } pub fn park_to_backup(&self) -> bool { - self.backup_countdown - .compare_and_swap(255, 0, Ordering::SeqCst) - == 255 + let mut v = self.park_backup_counter.load(Ordering::SeqCst); + loop { + if v < 64 { + return false; + } + match self.park_backup_counter.compare_exchange_weak( + v, + 0, + Ordering::SeqCst, + Ordering::SeqCst, + ) { + Ok(_) => return true, + Err(actual) => v = actual, + } + } } pub fn park_address(&self, backup: bool) -> usize { @@ -175,14 +154,6 @@ impl QueueCore { } } - pub fn mark_idling(&self) -> bool { - !self.idling.compare_and_swap(false, true, Ordering::SeqCst) - } - - pub fn unmark_idling(&self) { - self.idling.store(false, Ordering::SeqCst); - } - /// Sets the shutdown bit and notify all threads. /// /// `source` is used to trace who triggers the action. @@ -210,11 +181,11 @@ impl QueueCore { } let new_info = if backup { - ((workers_info.backup_count() as u64 + 1) << 33) + ((workers_info.park_backup_counter() as u64 + 1) << 33) | ((workers_info.active_count() as u64 - 1) << 17) | ((workers_info.running_count() as u64 - 1) << 1) } else { - ((workers_info.backup_count() as u64) << 33) + ((workers_info.park_backup_counter() as u64) << 33) | ((workers_info.active_count() as u64) << 17) | ((workers_info.running_count() as u64 - 1) << 1) }; @@ -225,24 +196,10 @@ impl QueueCore { Ordering::SeqCst, ) { Ok(_) => { - let down = - (workers_info.running_count() + 1 < workers_info.active_count()) as u8; - // println!("down: {}", down); - // println!( - // "running: {}, active: {}, backup: {}, down: {}", - // workers_info.running_count(), - // workers_info.active_count(), - // workers_info.backup_count(), - // down - // ); - let mut countdown = self.backup_countdown.load(Ordering::Relaxed); - while let Err(actual) = self.backup_countdown.compare_exchange_weak( - countdown, - (countdown << 1) | down, - Ordering::SeqCst, - Ordering::SeqCst, - ) { - countdown = actual; + if workers_info.running_count() < workers_info.active_count() { + self.park_backup_counter.fetch_add(1, Ordering::SeqCst); + } else { + self.park_backup_counter.store(0, Ordering::SeqCst); } return true; } @@ -253,20 +210,15 @@ impl QueueCore { /// Marks current thread as woken up states. pub fn mark_woken(&self, backup: bool) { - if backup { - self.wake_up_backup.fetch_add(1, Ordering::SeqCst); - } else { - self.wake_up_active.fetch_add(1, Ordering::SeqCst); - } let mut workers_info = self.workers_info.load(Ordering::SeqCst); loop { let new_info = if backup { - ((workers_info.backup_count() as u64 - 1) << 33) + ((workers_info.park_backup_counter() as u64 - 1) << 33) | ((workers_info.active_count() as u64 + 1) << 17) | ((workers_info.running_count() as u64 + 1) << 1) | workers_info.is_shutdown() as u64 } else { - ((workers_info.backup_count() as u64) << 33) + ((workers_info.park_backup_counter() as u64) << 33) | ((workers_info.active_count() as u64) << 17) | ((workers_info.running_count() as u64 + 1) << 1) | workers_info.is_shutdown() as u64 @@ -278,13 +230,6 @@ impl QueueCore { Ordering::SeqCst, ) { Ok(_) => { - // println!( - // "wake! running: {}, active: {}, backup: {}, backup: {}", - // workers_info.running_count(), - // workers_info.active_count(), - // workers_info.backup_count(), - // backup - // ); return; } Err(actual) => workers_info = actual, @@ -307,16 +252,6 @@ impl QueueCore { } } -impl Drop for QueueCore { - fn drop(&mut self) { - println!( - "wake active: {}, wake backup: {}", - self.wake_up_active.load(Ordering::SeqCst), - self.wake_up_backup.load(Ordering::SeqCst) - ); - } -} - /// Submits tasks to associated thread pool. /// /// Note that thread pool can be shutdown and dropped even not all remotes are diff --git a/src/pool/worker.rs b/src/pool/worker.rs index 95379e6..54783bd 100644 --- a/src/pool/worker.rs +++ b/src/pool/worker.rs @@ -24,22 +24,19 @@ where { #[inline] fn pop(&mut self) -> Option> { - let idling = self.local.core().mark_idling(); - let mut counter = 0; - while idling { + // let idling = self.local.core().mark_idling(); + for counter in 0..10 { if let Some(t) = self.local.pop() { - self.local.core().unmark_idling(); + // self.local.core().unmark_idling(); self.local.core().ensure_workers(self.local.id); return Some(t); } - counter += 1; if counter < 3 { - thread::yield_now(); - } else if counter < 10 { - thread::sleep(Duration::from_micros(10)); + for _ in 0..(1 << counter) { + std::sync::atomic::spin_loop_hint(); + } } else { - self.local.core().unmark_idling(); - break; + thread::yield_now(); } } self.runner.pause(&mut self.local); From a4d4f07300dfc6f9b55a365d8af20d9ec5a6b424 Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Tue, 19 May 2020 15:38:03 +0800 Subject: [PATCH 09/12] adjust active thread count adaptively Signed-off-by: Yilin Chen --- src/pool/spawn.rs | 198 +++++++++++++++++++++++++++------------------ src/pool/worker.rs | 4 - 2 files changed, 118 insertions(+), 84 deletions(-) diff --git a/src/pool/spawn.rs b/src/pool/spawn.rs index 013d6a0..416cf7e 100644 --- a/src/pool/spawn.rs +++ b/src/pool/spawn.rs @@ -8,56 +8,95 @@ use crate::pool::SchedConfig; use crate::queue::{Extras, LocalQueue, Pop, TaskCell, TaskInjector, WithExtras}; use fail::fail_point; use parking_lot_core::{ParkResult, ParkToken, UnparkToken}; -use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU64, AtomicU8, Ordering}; +use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; use std::sync::Arc; -/// An usize is used to trace the threads that are working actively. -/// To save additional memory and atomic operation, the number and -/// shutdown hint are merged into one number in the following format -/// ```text -/// 0...00 -/// ^ ^ -/// | The least significant bit indicates whether the queue is shutting down. -/// Bits represent the thread count -/// ``` -const SHUTDOWN_BIT: usize = 1; -const WORKER_COUNT_SHIFT: usize = 1; -const WORKER_COUNT_BASE: usize = 2; - -/// Checks if shutdown bit is set. -pub fn is_shutdown(cnt: usize) -> bool { - cnt & SHUTDOWN_BIT == SHUTDOWN_BIT -} - -// 1 bit: shutdown -// 16 bits: running -// 16 bits: active -// 16 bits: backup +/// A u64 is used to trace the states of the workers. +/// The highest 16 bits represent the running workers count. +/// The 17-32 bits represent the active workers count. +/// The 33-48 bits represent the backup workers count. +/// The lowest 16 bits are used to save other flags, currently only the lowest +/// bit is used to represent whether the pool has been shut down. trait WorkersInfo { + fn new(workers_count: usize) -> Self; + fn is_shutdown(self) -> bool; + fn running_count(self) -> usize; + fn active_count(self) -> usize; - fn park_backup_counter(self) -> usize; + + fn backup_count(self) -> usize; + + fn set_shutdown(self) -> Self; + + // Change an active worker from not running to running. + fn active_to_running(self) -> Self; + + // Change an running worker to a not running but active worker. + fn running_to_active(self) -> Self; + + // Change an running worker to a backup worker. + fn running_to_backup(self) -> Self; + + // Change a backup worker to a running worker. + fn backup_to_running(self) -> Self; } +const SHUTDOWN_BIT: u64 = 1; +const RUNNING_COUNT_SHIFT: u32 = 48; +const ACTIVE_COUNT_SHIFT: u32 = 32; +const BACKUP_COUNT_SHIFT: u32 = 16; +const COUNT_MASK: u64 = 0xFFFF; + impl WorkersInfo for u64 { + fn new(workers_count: usize) -> u64 { + let workers_count = workers_count as u64; + assert!(workers_count <= COUNT_MASK, "too many workers"); + (workers_count << ACTIVE_COUNT_SHIFT) | (workers_count << RUNNING_COUNT_SHIFT) + } + fn is_shutdown(self) -> bool { - self & 1 == 1 + self & SHUTDOWN_BIT == SHUTDOWN_BIT } fn running_count(self) -> usize { - ((self >> 1) & (u16::max_value() as u64)) as usize + ((self >> RUNNING_COUNT_SHIFT) & COUNT_MASK) as usize } fn active_count(self) -> usize { - ((self >> 17) & (u16::max_value() as u64)) as usize + ((self >> ACTIVE_COUNT_SHIFT) & COUNT_MASK) as usize + } + + fn backup_count(self) -> usize { + ((self >> BACKUP_COUNT_SHIFT) & COUNT_MASK) as usize + } + + fn set_shutdown(self) -> Self { + self | SHUTDOWN_BIT } - fn park_backup_counter(self) -> usize { - ((self >> 33) & (u16::max_value() as u64)) as usize + fn active_to_running(self) -> Self { + debug_assert!(self.running_count() < self.active_count()); + self + (1 << RUNNING_COUNT_SHIFT) + } + + fn running_to_active(self) -> Self { + debug_assert!(self.running_count() > 0); + self - (1 << RUNNING_COUNT_SHIFT) + } + + fn running_to_backup(self) -> Self { + self - (1 << RUNNING_COUNT_SHIFT) - (1 << ACTIVE_COUNT_SHIFT) + (1 << BACKUP_COUNT_SHIFT) + } + + fn backup_to_running(self) -> Self { + self + (1 << RUNNING_COUNT_SHIFT) + (1 << ACTIVE_COUNT_SHIFT) - (1 << BACKUP_COUNT_SHIFT) } } +const BACKUP_THRESHOLD: i64 = 64; + /// The core of queues. /// /// Every thread pool instance should have one and only `QueueCore`. It's @@ -65,19 +104,16 @@ impl WorkersInfo for u64 { pub(crate) struct QueueCore { global_queue: TaskInjector, workers_info: AtomicU64, - unpark_backup_counter: AtomicU64, - park_backup_counter: AtomicU64, + backup_count: AtomicI64, config: SchedConfig, } impl QueueCore { pub fn new(global_queue: TaskInjector, config: SchedConfig) -> QueueCore { - let thread_count = config.max_thread_count as u64; QueueCore { global_queue, - workers_info: AtomicU64::new(((thread_count << 17) | (thread_count << 1)) as u64), - unpark_backup_counter: AtomicU64::new(0), - park_backup_counter: AtomicU64::new(0), + workers_info: AtomicU64::new(WorkersInfo::new(config.max_thread_count)), + backup_count: AtomicI64::new(0), config, } } @@ -91,33 +127,43 @@ impl QueueCore { if workers_info.is_shutdown() { return; } - // if source == 0 { - // println!("backup: {}", workers_info.park_backup_counter()); - // } + if workers_info.active_count() < self.config.min_thread_count { self.unpark_one(true, source); } else if workers_info.running_count() < self.config.min_thread_count { self.unpark_one(false, source); } else if source != 0 { - if workers_info.running_count() < workers_info.active_count() { - self.unpark_backup_counter.store(0, Ordering::SeqCst); - self.unpark_one(false, source); - } else { - let mut v = self.unpark_backup_counter.fetch_add(1, Ordering::SeqCst) + 1; - loop { - if v < 64 { - break; - } - match self.unpark_backup_counter.compare_exchange_weak( - v, - 0, - Ordering::SeqCst, - Ordering::SeqCst, - ) { - Ok(_) => self.unpark_one(true, source), - Err(actual) => v = actual, + if workers_info.running_count() == workers_info.active_count() { + let mut backup_count = self.backup_count.load(Ordering::SeqCst); + if backup_count > 0 { + self.backup_count.store(0, Ordering::SeqCst); + } else { + backup_count = self.backup_count.fetch_sub(1, Ordering::SeqCst) - 1; + while backup_count < -BACKUP_THRESHOLD { + match self.backup_count.compare_exchange_weak( + backup_count, + 0, + Ordering::SeqCst, + Ordering::SeqCst, + ) { + Ok(_) => { + self.unpark_one(true, source); + break; + } + Err(actual) => backup_count = actual, + } } } + } else if workers_info.running_count() + 1 < workers_info.active_count() { + let backup_count = self.backup_count.load(Ordering::SeqCst); + if backup_count < 0 { + self.backup_count.store(0, Ordering::SeqCst) + } else { + self.backup_count.fetch_add(1, Ordering::SeqCst); + } + self.unpark_one(false, source); + } else if workers_info.running_count() < workers_info.active_count() { + self.unpark_one(false, source); } } } @@ -129,17 +175,15 @@ impl QueueCore { } pub fn park_to_backup(&self) -> bool { - let mut v = self.park_backup_counter.load(Ordering::SeqCst); + let mut v = self.backup_count.load(Ordering::SeqCst); loop { - if v < 64 { + if v < BACKUP_THRESHOLD { return false; } - match self.park_backup_counter.compare_exchange_weak( - v, - 0, - Ordering::SeqCst, - Ordering::SeqCst, - ) { + match self + .backup_count + .compare_exchange_weak(v, 0, Ordering::SeqCst, Ordering::SeqCst) + { Ok(_) => return true, Err(actual) => v = actual, } @@ -181,13 +225,9 @@ impl QueueCore { } let new_info = if backup { - ((workers_info.park_backup_counter() as u64 + 1) << 33) - | ((workers_info.active_count() as u64 - 1) << 17) - | ((workers_info.running_count() as u64 - 1) << 1) + workers_info.running_to_backup() } else { - ((workers_info.park_backup_counter() as u64) << 33) - | ((workers_info.active_count() as u64) << 17) - | ((workers_info.running_count() as u64 - 1) << 1) + workers_info.running_to_active() }; match self.workers_info.compare_exchange_weak( workers_info, @@ -197,9 +237,12 @@ impl QueueCore { ) { Ok(_) => { if workers_info.running_count() < workers_info.active_count() { - self.park_backup_counter.fetch_add(1, Ordering::SeqCst); - } else { - self.park_backup_counter.store(0, Ordering::SeqCst); + let backup_count = self.backup_count.load(Ordering::SeqCst); + if backup_count < 0 { + self.backup_count.store(0, Ordering::SeqCst) + } else { + self.backup_count.fetch_add(1, Ordering::SeqCst); + } } return true; } @@ -210,18 +253,13 @@ impl QueueCore { /// Marks current thread as woken up states. pub fn mark_woken(&self, backup: bool) { + // println!("wake up, backup: {}", backup); let mut workers_info = self.workers_info.load(Ordering::SeqCst); loop { let new_info = if backup { - ((workers_info.park_backup_counter() as u64 - 1) << 33) - | ((workers_info.active_count() as u64 + 1) << 17) - | ((workers_info.running_count() as u64 + 1) << 1) - | workers_info.is_shutdown() as u64 + workers_info.backup_to_running() } else { - ((workers_info.park_backup_counter() as u64) << 33) - | ((workers_info.active_count() as u64) << 17) - | ((workers_info.running_count() as u64 + 1) << 1) - | workers_info.is_shutdown() as u64 + workers_info.active_to_running() }; match self.workers_info.compare_exchange_weak( workers_info, diff --git a/src/pool/worker.rs b/src/pool/worker.rs index 54783bd..a87a14c 100644 --- a/src/pool/worker.rs +++ b/src/pool/worker.rs @@ -2,9 +2,7 @@ use crate::pool::{Local, Runner}; use crate::queue::{Pop, TaskCell}; -use parking_lot_core::SpinWait; use std::thread; -use std::time::Duration; pub(crate) struct WorkerThread { local: Local, @@ -24,10 +22,8 @@ where { #[inline] fn pop(&mut self) -> Option> { - // let idling = self.local.core().mark_idling(); for counter in 0..10 { if let Some(t) = self.local.pop() { - // self.local.core().unmark_idling(); self.local.core().ensure_workers(self.local.id); return Some(t); } From 439012cfeed9d4e855bbef20ca40b0418a00f8cd Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Tue, 19 May 2020 16:51:00 +0800 Subject: [PATCH 10/12] fix tests Signed-off-by: Yilin Chen --- src/pool/spawn.rs | 10 ++-------- src/pool/worker.rs | 4 +++- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/src/pool/spawn.rs b/src/pool/spawn.rs index 416cf7e..89e251e 100644 --- a/src/pool/spawn.rs +++ b/src/pool/spawn.rs @@ -28,8 +28,6 @@ trait WorkersInfo { fn backup_count(self) -> usize; - fn set_shutdown(self) -> Self; - // Change an active worker from not running to running. fn active_to_running(self) -> Self; @@ -72,17 +70,13 @@ impl WorkersInfo for u64 { ((self >> BACKUP_COUNT_SHIFT) & COUNT_MASK) as usize } - fn set_shutdown(self) -> Self { - self | SHUTDOWN_BIT - } - fn active_to_running(self) -> Self { - debug_assert!(self.running_count() < self.active_count()); + debug_assert!(self.is_shutdown() || self.running_count() < self.active_count()); self + (1 << RUNNING_COUNT_SHIFT) } fn running_to_active(self) -> Self { - debug_assert!(self.running_count() > 0); + debug_assert!(self.is_shutdown() || self.running_count() > 0); self - (1 << RUNNING_COUNT_SHIFT) } diff --git a/src/pool/worker.rs b/src/pool/worker.rs index a87a14c..2207897 100644 --- a/src/pool/worker.rs +++ b/src/pool/worker.rs @@ -124,7 +124,9 @@ mod tests { }; let metrics = r.metrics.clone(); let mut expected_metrics = Metrics::default(); - let (injector, mut locals) = build_spawn(QueueType::SingleLevel, Default::default()); + let mut config = crate::pool::SchedConfig::default(); + config.max_thread_count = 1; + let (injector, mut locals) = build_spawn(QueueType::SingleLevel, config); let th = WorkerThread::new(locals.remove(0), r); let handle = std::thread::spawn(move || { th.run(); From 01ddc1549b12c39d61fad7c06530aa09b410d7cb Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Tue, 19 May 2020 19:03:26 +0800 Subject: [PATCH 11/12] add notified Signed-off-by: Yilin Chen --- src/pool/spawn.rs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/pool/spawn.rs b/src/pool/spawn.rs index 89e251e..84e5f4d 100644 --- a/src/pool/spawn.rs +++ b/src/pool/spawn.rs @@ -8,7 +8,7 @@ use crate::pool::SchedConfig; use crate::queue::{Extras, LocalQueue, Pop, TaskCell, TaskInjector, WithExtras}; use fail::fail_point; use parking_lot_core::{ParkResult, ParkToken, UnparkToken}; -use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering}; use std::sync::Arc; /// A u64 is used to trace the states of the workers. @@ -99,6 +99,7 @@ pub(crate) struct QueueCore { global_queue: TaskInjector, workers_info: AtomicU64, backup_count: AtomicI64, + notified: AtomicBool, config: SchedConfig, } @@ -108,6 +109,7 @@ impl QueueCore { global_queue, workers_info: AtomicU64::new(WorkersInfo::new(config.max_thread_count)), backup_count: AtomicI64::new(0), + notified: AtomicBool::new(false), config, } } @@ -159,6 +161,13 @@ impl QueueCore { } else if workers_info.running_count() < workers_info.active_count() { self.unpark_one(false, source); } + } else if workers_info.running_count() < workers_info.active_count() { + if !self + .notified + .compare_and_swap(false, true, Ordering::SeqCst) + { + self.unpark_one(false, source); + } } } @@ -247,7 +256,9 @@ impl QueueCore { /// Marks current thread as woken up states. pub fn mark_woken(&self, backup: bool) { - // println!("wake up, backup: {}", backup); + if !backup { + self.notified.store(false, Ordering::SeqCst); + } let mut workers_info = self.workers_info.load(Ordering::SeqCst); loop { let new_info = if backup { From c8c1aa57c730ce7b13db9fc961b17a1b6a85eed5 Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Wed, 20 May 2020 16:47:36 +0800 Subject: [PATCH 12/12] add metrics for active threads count Signed-off-by: Yilin Chen --- src/metrics.rs | 11 +++++++++++ src/pool/builder.rs | 11 ++++++++++- src/pool/spawn.rs | 33 ++++++++++++++++++++++++++++----- src/pool/worker.rs | 5 ++++- 4 files changed, 53 insertions(+), 7 deletions(-) diff --git a/src/metrics.rs b/src/metrics.rs index abacae7..e735b24 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -27,6 +27,17 @@ lazy_static! { ) .unwrap(); + /// The count of active workers. + pub static ref ACTIVE_WORKERS_COUNT: HistogramVec = HistogramVec::new( + histogram_opts!( + "yatp_active_workers_count", + "the count of backup workers", + vec![1.0, 2.0, 4.0, 6.0, 8.0, 12.0, 16.0, 24.0, 32.0, 48.0, 64.0, 96.0, 128.0] + ), + &["name"] + ) + .unwrap(); + static ref NAMESPACE: Mutex> = Mutex::new(None); } diff --git a/src/pool/builder.rs b/src/pool/builder.rs index a49c288..e30c6c3 100644 --- a/src/pool/builder.rs +++ b/src/pool/builder.rs @@ -1,5 +1,6 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. +use crate::metrics::*; use crate::pool::spawn::QueueCore; use crate::pool::worker::WorkerThread; use crate::pool::{CloneRunnerBuilder, Local, Remote, Runner, RunnerBuilder, ThreadPool}; @@ -76,6 +77,9 @@ where F::Runner: Runner + Send + 'static, { let mut threads = Vec::with_capacity(self.builder.sched_config.max_thread_count); + let active_count_histogram = ACTIVE_WORKERS_COUNT + .get_metric_with_label_values(&[&self.builder.name_prefix]) + .unwrap(); for (i, local_queue) in self.local_queues.into_iter().enumerate() { let runner = factory.build(); let name = format!("{}-{}", self.builder.name_prefix, i); @@ -83,7 +87,12 @@ where if let Some(size) = self.builder.stack_size { builder = builder.stack_size(size) } - let local = Local::new(i + 1, local_queue, self.core.clone()); + let local = Local::new( + i + 1, + local_queue, + self.core.clone(), + active_count_histogram.clone(), + ); let thd = WorkerThread::new(local, runner); threads.push( builder diff --git a/src/pool/spawn.rs b/src/pool/spawn.rs index 84e5f4d..bf9bc16 100644 --- a/src/pool/spawn.rs +++ b/src/pool/spawn.rs @@ -8,6 +8,7 @@ use crate::pool::SchedConfig; use crate::queue::{Extras, LocalQueue, Pop, TaskCell, TaskInjector, WithExtras}; use fail::fail_point; use parking_lot_core::{ParkResult, ParkToken, UnparkToken}; +use prometheus::{Histogram, HistogramOpts}; use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering}; use std::sync::Arc; @@ -220,7 +221,14 @@ impl QueueCore { /// Marks the current thread in sleep state. /// /// It can be marked as sleep only when the pool is not shutting down. - pub fn mark_sleep(&self, backup: bool) -> bool { + pub fn mark_sleep(&self, backup: bool, active_count_histogram: &Histogram) -> bool { + if !backup + && self + .notified + .compare_and_swap(true, false, Ordering::SeqCst) + { + return false; + } let mut workers_info = self.workers_info.load(Ordering::SeqCst); loop { if workers_info.is_shutdown() { @@ -247,6 +255,9 @@ impl QueueCore { self.backup_count.fetch_add(1, Ordering::SeqCst); } } + if backup { + active_count_histogram.observe((workers_info.active_count() - 1) as f64); + } return true; } Err(actual) => workers_info = actual, @@ -344,14 +355,21 @@ pub struct Local { pub(crate) id: usize, local_queue: LocalQueue, core: Arc>, + active_count_histogram: Histogram, } impl Local { - pub(crate) fn new(id: usize, local_queue: LocalQueue, core: Arc>) -> Local { + pub(crate) fn new( + id: usize, + local_queue: LocalQueue, + core: Arc>, + active_count_histogram: Histogram, + ) -> Local { Local { id, local_queue, core, + active_count_histogram, } } @@ -390,14 +408,16 @@ impl Local { let backup = self.core.park_to_backup(); let mut task = None; let id = self.id; + let mut marked_sleep = false; let res = unsafe { parking_lot_core::park( self.core.park_address(backup), || { - if !self.core.mark_sleep(backup) { + if !self.core.mark_sleep(backup, &self.active_count_histogram) { return false; } + marked_sleep = true; task = self.local_queue.pop(); task.is_none() }, @@ -409,7 +429,9 @@ impl Local { }; match res { ParkResult::Unparked(_) | ParkResult::Invalid => { - self.core.mark_woken(backup); + if marked_sleep { + self.core.mark_woken(backup); + } self.core.ensure_workers(id); task } @@ -440,10 +462,11 @@ where let queue_type = queue_type.into(); let (global, locals) = crate::queue::build(queue_type, config.max_thread_count); let core = Arc::new(QueueCore::new(global, config)); + let backup_counter = Histogram::with_opts(HistogramOpts::new("_", "_")).unwrap(); let l = locals .into_iter() .enumerate() - .map(|(i, l)| Local::new(i + 1, l, core.clone())) + .map(|(i, l)| Local::new(i + 1, l, core.clone(), backup_counter.clone())) .collect(); let g = Remote::new(core); (g, l) diff --git a/src/pool/worker.rs b/src/pool/worker.rs index 2207897..ec9b72c 100644 --- a/src/pool/worker.rs +++ b/src/pool/worker.rs @@ -24,6 +24,9 @@ where fn pop(&mut self) -> Option> { for counter in 0..10 { if let Some(t) = self.local.pop() { + // if t.schedule_time.elapsed() > Duration::from_millis(1) { + // self.local.core().unpark_one(true, self.local.id); + // } self.local.core().ensure_workers(self.local.id); return Some(t); } @@ -61,7 +64,7 @@ mod tests { use crate::queue::QueueType; use crate::task::callback; use std::sync::*; - use std::time::*; + use std::time::Duration; #[derive(Default, PartialEq, Debug)] struct Metrics {