diff --git a/include/thread_pool/queue_props.h b/include/thread_pool/queue_props.h new file mode 100644 index 0000000..5db2548 --- /dev/null +++ b/include/thread_pool/queue_props.h @@ -0,0 +1,38 @@ +#pragma once + +#include + +namespace dp { + +struct queue_props final{ + enum class overflow_action { DISCARD_OLDEST, DISCARD_NEWEST }; +private: + size_t queue_size_; + overflow_action overflow_action_; + bool notify_; +public: + queue_props(): + queue_size_{0}, + overflow_action_{overflow_action::DISCARD_NEWEST}, + notify_{false} + {} + queue_props(size_t queueSize, bool notify=false): + queue_size_{queueSize}, + overflow_action_{overflow_action::DISCARD_NEWEST}, + notify_{notify} + {} + queue_props(size_t queueSize, overflow_action overflowAction, bool notify=false): + queue_size_{queueSize}, + overflow_action_{overflowAction}, + notify_{notify} + {} + + bool will_notify() const { return notify_; } + bool is_infinite() const { return queue_size_ == 0; } + bool no_queue() const { return queue_size_ == 1; } + size_t get_queue_size() const { return queue_size_; } + overflow_action get_overflow_action() const { return overflow_action_; } + size_t num_threads(size_t requested) const { return overflow_action_ == overflow_action::DISCARD_OLDEST ? std::min((size_t)1,requested) : requested; } +}; + +} // namespace dp diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index 5817c44..3a2d7fa 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -18,6 +18,7 @@ #endif #include "thread_safe_queue.h" +#include "queue_props.h" namespace dp { namespace details { @@ -114,13 +115,38 @@ namespace dp { } } - ~thread_pool() { - wait_for_tasks(); - + /** + * @brief stops the thread pool with a non-blocking request. + * @details This does not clear queued tasks. But it does prevent + * new tasks from being generated queued. + */ + void stop_non_blocking() { + // if stopped_ return, otherwise set to true + bool FALSE = false; + if(!stopped_.compare_exchange_strong(FALSE, true)) return; // stop all threads for (std::size_t i = 0; i < threads_.size(); ++i) { threads_[i].request_stop(); tasks_[i].signal.release(); + } + } + + /** + * @brief stops the thread pool and waits for all queued tasks to complete. + * @details This does not clear queued tasks. But it does prevent + * new tasks from being generated queued. + */ + void stop() { + stop_non_blocking(); + wait_for_tasks(); + } + + /** + * @brief Destroy the thread pool object, calling stop() in the process. + */ + ~thread_pool() { + stop(); + for (std::size_t i = 0; i < threads_.size(); ++i) { threads_[i].join(); } } @@ -143,6 +169,7 @@ namespace dp { typename ReturnType = std::invoke_result_t> requires std::invocable [[nodiscard]] std::future enqueue(Function f, Args... args) { + if(stopped_.load()) throw std::runtime_error("Attempted to enqueue a new task to a stopped thread pool"); #ifdef __cpp_lib_move_only_function // we can do this in C++23 because we now have support for move only functions std::promise promise; @@ -208,6 +235,7 @@ namespace dp { template requires std::invocable void enqueue_detach(Function&& func, Args&&... args) { + if(stopped_.load()) throw std::runtime_error("Attempted to enqueue a new task to a stopped thread pool"); enqueue_task( std::move([f = std::forward(func), ... largs = std::forward(args)]() mutable -> decltype(auto) { @@ -262,6 +290,15 @@ namespace dp { return removed_task_count; } + /** + * @brief Check if this threadpool has been requested to stop + * + * @return true if stopped, false otherwise + */ + bool stop_requested() const { + return stopped_; + } + private: template void enqueue_task(Function&& f) { @@ -291,13 +328,14 @@ namespace dp { dp::thread_safe_queue tasks{}; std::binary_semaphore signal{0}; }; - + std::vector threads_; std::deque tasks_; dp::thread_safe_queue priority_queue_; // guarantee these get zero-initialized std::atomic_int_fast64_t unassigned_tasks_{0}, in_flight_tasks_{0}; std::atomic_bool threads_complete_signal_{false}; + std::atomic_bool stopped_{false}; }; /**