diff --git a/include/ParallelPriotityQueue/SpapQueue.hpp b/include/ParallelPriotityQueue/SpapQueue.hpp index 78724a2..e57c073 100644 --- a/include/ParallelPriotityQueue/SpapQueue.hpp +++ b/include/ParallelPriotityQueue/SpapQueue.hpp @@ -55,7 +55,8 @@ namespace spapq { * * The SpapQueue class or object itself is generally not considered thread-safe with a few exceptions.\n * (a) pushBeforeProcessing can be called for each worker by at most one thread. Hence, netw.numWorker_ number - * of threads can populate the queue.\n + * of threads can populate the queue. However, additional synchronisation between pushing threads and + * the thread activating the queue is required.\n * (b) pushDuringProcessing can be called for each (self-push) channel by at most one thread. * * @tparam T Type of queue element or task. @@ -115,9 +116,10 @@ class SpapQueue final { ///< queue. std::barrier<> allocateSignal_{netw.numWorkers_ + 1}; ///< Signals that it is now safe to enqueue ///< tasks. - std::barrier<> safeToDeallocateSignal_{netw.numWorkers_}; ///< Signal that all workers have finished - ///< working and that it is now safe to - ///< deallocate the worker resources. + std::barrier<> safeToDeallocateSignal_{netw.numWorkers_}; ///< Signal that all workers have + ///< finished working and that it is now + ///< safe to deallocate the worker + ///< resources. std::array workers_; ///< Worker threads. 
@@ -183,16 +185,16 @@ bool SpapQueue::initQueue(Args &&...workerA if (queueActive_.exchange(true, std::memory_order_acq_rel)) { std::cerr << "SpapQueue is already active and cannot be initiated again!\n"; return false; + } else { + [this, &workerArgs...](std::index_sequence) { + ((workers_[I] = std::jthread(std::bind_front(&ThisQType::threadWork, this), + std::forward(workerArgs)...)), + ...); + }(std::make_index_sequence{}); + + allocateSignal_.arrive_and_wait(); + return true; } - - [this, &workerArgs...](std::index_sequence) { - ((workers_[I] = std::jthread(std::bind_front(&ThisQType::threadWork, this), - std::forward(workerArgs)...)), - ...); - }(std::make_index_sequence{}); - - allocateSignal_.arrive_and_wait(); - return true; } /** @@ -383,12 +385,12 @@ inline void SpapQueue::pushBeforeProcessing template class WorkerTemplate, BasicQueue LocalQType> inline void SpapQueue::pushBeforeProcessing( const value_type val, const std::size_t workerId) noexcept { + globalCount_.fetch_add(1U, std::memory_order_relaxed); if constexpr (netw.hasHomogeneousInPorts()) { workerResources_[workerId]->pushUnsafe(val); } else { pushBeforeProcessingHelper(val, workerId); } - globalCount_.fetch_add(1U, std::memory_order_release); } /** @@ -413,7 +415,7 @@ inline bool SpapQueue::pushDuringProcessing std::size_t prevCount = globalCount_.load(std::memory_order_relaxed); while (prevCount > 0U && (not globalCount_.compare_exchange_weak( - prevCount, prevCount + 1U, std::memory_order_relaxed, std::memory_order_relaxed))) { }; + prevCount, prevCount + 1U, std::memory_order_acquire, std::memory_order_relaxed))) { }; // Only inserts if queue is still running if (prevCount > 0U) { @@ -426,7 +428,7 @@ inline bool SpapQueue::pushDuringProcessing success = std::get(workerResources_)->push(val, port); } - if (not success) { globalCount_.fetch_sub(1U, std::memory_order_relaxed); } + if (not success) { globalCount_.fetch_sub(1U, std::memory_order_release); } } return success; diff --git 
a/include/ParallelPriotityQueue/SpapQueueWorker.hpp b/include/ParallelPriotityQueue/SpapQueueWorker.hpp index 859e590..86122bc 100644 --- a/include/ParallelPriotityQueue/SpapQueueWorker.hpp +++ b/include/ParallelPriotityQueue/SpapQueueWorker.hpp @@ -372,7 +372,10 @@ inline void WorkerResource::incrGlobalCount() const std::size_t diff = localCount_ - newLocalCount; localCount_ = newLocalCount; - globalQueue_.globalCount_.fetch_add(diff, std::memory_order_acquire); + globalQueue_.globalCount_.fetch_add(diff, std::memory_order_relaxed); + // Can be relaxed as this is only ever called during enqueueGlobal which is only ever called during + // the processing of an element. This means we can be sure that the queue will not become empty until + // the processing has finished by which point the increment must have occurred. } } @@ -384,11 +387,16 @@ inline void WorkerResource::decrGlobalCount() template inline void WorkerResource::decrGlobalCount() noexcept { if (localCount_ == 0) { - const std::size_t newLocalCount = queue_.size() / 2; + const std::size_t queueSize = queue_.size(); + const std::size_t newLocalCount = queueSize / 2; const std::size_t diff = newLocalCount + 1; localCount_ = newLocalCount; - globalQueue_.globalCount_.fetch_sub(diff, std::memory_order_release); + if (queueSize > 0U) [[likely]] { // Release is only needed to communicate completed work + globalQueue_.globalCount_.fetch_sub(diff, std::memory_order_relaxed); + } else { + globalQueue_.globalCount_.fetch_sub(diff, std::memory_order_release); + } } else { --localCount_; }