From 20ad70a9e7505ca6dd6fa7d19e55be40dc1ed4e6 Mon Sep 17 00:00:00 2001 From: Raphael Steiner Date: Sun, 8 Mar 2026 19:04:03 +0100 Subject: [PATCH 1/2] fix --- include/ParallelPriotityQueue/SpapQueue.hpp | 29 +++++++++++---------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/include/ParallelPriotityQueue/SpapQueue.hpp b/include/ParallelPriotityQueue/SpapQueue.hpp index 78724a2..cc802d7 100644 --- a/include/ParallelPriotityQueue/SpapQueue.hpp +++ b/include/ParallelPriotityQueue/SpapQueue.hpp @@ -115,9 +115,10 @@ class SpapQueue final { ///< queue. std::barrier<> allocateSignal_{netw.numWorkers_ + 1}; ///< Signals that it is now safe to enqueue ///< tasks. - std::barrier<> safeToDeallocateSignal_{netw.numWorkers_}; ///< Signal that all workers have finished - ///< working and that it is now safe to - ///< deallocate the worker resources. + std::barrier<> safeToDeallocateSignal_{netw.numWorkers_}; ///< Signal that all workers have + ///< finished working and that it is now + ///< safe to deallocate the worker + ///< resources. std::array workers_; ///< Worker threads. 
@@ -183,16 +184,16 @@ bool SpapQueue::initQueue(Args &&...workerA if (queueActive_.exchange(true, std::memory_order_acq_rel)) { std::cerr << "SpapQueue is already active and cannot be initiated again!\n"; return false; + } else { + [this, &workerArgs...](std::index_sequence) { + ((workers_[I] = std::jthread(std::bind_front(&ThisQType::threadWork, this), + std::forward(workerArgs)...)), + ...); + }(std::make_index_sequence{}); + + allocateSignal_.arrive_and_wait(); + return true; } - - [this, &workerArgs...](std::index_sequence) { - ((workers_[I] = std::jthread(std::bind_front(&ThisQType::threadWork, this), - std::forward(workerArgs)...)), - ...); - }(std::make_index_sequence{}); - - allocateSignal_.arrive_and_wait(); - return true; } /** @@ -413,7 +414,7 @@ inline bool SpapQueue::pushDuringProcessing std::size_t prevCount = globalCount_.load(std::memory_order_relaxed); while (prevCount > 0U && (not globalCount_.compare_exchange_weak( - prevCount, prevCount + 1U, std::memory_order_relaxed, std::memory_order_relaxed))) { }; + prevCount, prevCount + 1U, std::memory_order_acquire, std::memory_order_relaxed))) { }; // Only inserts if queue is still running if (prevCount > 0U) { @@ -426,7 +427,7 @@ inline bool SpapQueue::pushDuringProcessing success = std::get(workerResources_)->push(val, port); } - if (not success) { globalCount_.fetch_sub(1U, std::memory_order_relaxed); } + if (not success) { globalCount_.fetch_sub(1U, std::memory_order_release); } } return success; From 753ecf62d69d966810e8077e1cb77a8d8a76feb6 Mon Sep 17 00:00:00 2001 From: Raphael Steiner Date: Mon, 9 Mar 2026 18:27:04 +0100 Subject: [PATCH 2/2] improved memory order --- include/ParallelPriotityQueue/SpapQueue.hpp | 5 +++-- include/ParallelPriotityQueue/SpapQueueWorker.hpp | 14 +++++++++++--- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/include/ParallelPriotityQueue/SpapQueue.hpp b/include/ParallelPriotityQueue/SpapQueue.hpp index cc802d7..e57c073 100644 --- 
a/include/ParallelPriotityQueue/SpapQueue.hpp +++ b/include/ParallelPriotityQueue/SpapQueue.hpp @@ -55,7 +55,8 @@ namespace spapq { * * The SpapQueue class or object itself is generally not considered thread-safe with a few exceptions.\n * (a) pushBeforeProcessing can be called for each worker by at most one thread. Hence, netw.numWorker_ number - * of threads can populate the queue.\n + * of threads can populate the queue. However, additional synchronisation between pushing threads and + * the thread activating the queue is required.\n * (b) pushDuringProcessing can be called for each (self-push) channel by at most one thread. * * @tparam T Type of queue element or task. @@ -384,12 +385,12 @@ inline void SpapQueue::pushBeforeProcessing template class WorkerTemplate, BasicQueue LocalQType> inline void SpapQueue::pushBeforeProcessing( const value_type val, const std::size_t workerId) noexcept { + globalCount_.fetch_add(1U, std::memory_order_relaxed); if constexpr (netw.hasHomogeneousInPorts()) { workerResources_[workerId]->pushUnsafe(val); } else { pushBeforeProcessingHelper(val, workerId); } - globalCount_.fetch_add(1U, std::memory_order_release); } /** diff --git a/include/ParallelPriotityQueue/SpapQueueWorker.hpp b/include/ParallelPriotityQueue/SpapQueueWorker.hpp index 859e590..86122bc 100644 --- a/include/ParallelPriotityQueue/SpapQueueWorker.hpp +++ b/include/ParallelPriotityQueue/SpapQueueWorker.hpp @@ -372,7 +372,10 @@ inline void WorkerResource::incrGlobalCount() const std::size_t diff = localCount_ - newLocalCount; localCount_ = newLocalCount; - globalQueue_.globalCount_.fetch_add(diff, std::memory_order_acquire); + globalQueue_.globalCount_.fetch_add(diff, std::memory_order_relaxed); + // Can be relaxed as this is only ever called during enqueueGlobal which is only ever called during + // the processing of an element. 
This means we can be sure that the queue will not become empty until + the processing has finished by which point the increment must have occurred. } } @@ -384,11 +387,16 @@ inline void WorkerResource::decrGlobalCount() template inline void WorkerResource::decrGlobalCount() noexcept { if (localCount_ == 0) { - const std::size_t newLocalCount = queue_.size() / 2; + const std::size_t queueSize = queue_.size(); + const std::size_t newLocalCount = queueSize / 2; const std::size_t diff = newLocalCount + 1; localCount_ = newLocalCount; - globalQueue_.globalCount_.fetch_sub(diff, std::memory_order_release); + if (queueSize > 0U) [[likely]] { // Release is only needed to communicate completed work + globalQueue_.globalCount_.fetch_sub(diff, std::memory_order_relaxed); + } else { + globalQueue_.globalCount_.fetch_sub(diff, std::memory_order_release); + } } else { --localCount_; }