Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 18 additions & 16 deletions include/ParallelPriotityQueue/SpapQueue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ namespace spapq {
*
* The SpapQueue class or object itself is generally not considered thread-safe with a few exceptions.\n
* (a) pushBeforeProcessing can be called for each worker by at most one thread. Hence, netw.numWorker_ number
* of threads can populate the queue.\n
* of threads can populate the queue. However, additional synchronisation between pushing threads and
* the thread activating the queue is required.\n
* (b) pushDuringProcessing can be called for each (self-push) channel by at most one thread.
*
* @tparam T Type of queue element or task.
Expand Down Expand Up @@ -115,9 +116,10 @@ class SpapQueue final {
///< queue.
std::barrier<> allocateSignal_{netw.numWorkers_ + 1}; ///< Signals that it is now safe to enqueue
///< tasks.
std::barrier<> safeToDeallocateSignal_{netw.numWorkers_}; ///< Signal that all workers have finished
///< working and that it is now safe to
///< deallocate the worker resources.
std::barrier<> safeToDeallocateSignal_{netw.numWorkers_}; ///< Signal that all workers have
///< finished working and that it is now
///< safe to deallocate the worker
///< resources.

std::array<std::jthread, netw.numWorkers_> workers_; ///< Worker threads.

Expand Down Expand Up @@ -183,16 +185,16 @@ bool SpapQueue<T, netw, WorkerTemplate, LocalQType>::initQueue(Args &&...workerA
if (queueActive_.exchange(true, std::memory_order_acq_rel)) {
std::cerr << "SpapQueue is already active and cannot be initiated again!\n";
return false;
} else {
[this, &workerArgs...]<std::size_t... I>(std::index_sequence<I...>) {
((workers_[I] = std::jthread(std::bind_front(&ThisQType::threadWork<I, Args...>, this),
std::forward<Args>(workerArgs)...)),
...);
}(std::make_index_sequence<netw.numWorkers_>{});

allocateSignal_.arrive_and_wait();
return true;
}

[this, &workerArgs...]<std::size_t... I>(std::index_sequence<I...>) {
((workers_[I] = std::jthread(std::bind_front(&ThisQType::threadWork<I, Args...>, this),
std::forward<Args>(workerArgs)...)),
...);
}(std::make_index_sequence<netw.numWorkers_>{});

allocateSignal_.arrive_and_wait();
return true;
}

/**
Expand Down Expand Up @@ -383,12 +385,12 @@ inline void SpapQueue<T, netw, WorkerTemplate, LocalQType>::pushBeforeProcessing
template <typename T, QNetwork netw, template <class, BasicQueue, std::size_t> class WorkerTemplate, BasicQueue LocalQType>
inline void SpapQueue<T, netw, WorkerTemplate, LocalQType>::pushBeforeProcessing(
const value_type val, const std::size_t workerId) noexcept {
globalCount_.fetch_add(1U, std::memory_order_relaxed);
if constexpr (netw.hasHomogeneousInPorts()) {
workerResources_[workerId]->pushUnsafe(val);
} else {
pushBeforeProcessingHelper<netw.numWorkers_>(val, workerId);
}
globalCount_.fetch_add(1U, std::memory_order_release);
}

/**
Expand All @@ -413,7 +415,7 @@ inline bool SpapQueue<T, netw, WorkerTemplate, LocalQType>::pushDuringProcessing
std::size_t prevCount = globalCount_.load(std::memory_order_relaxed);
while (prevCount > 0U
&& (not globalCount_.compare_exchange_weak(
prevCount, prevCount + 1U, std::memory_order_relaxed, std::memory_order_relaxed))) { };
prevCount, prevCount + 1U, std::memory_order_acquire, std::memory_order_relaxed))) { };

// Only inserts if queue is still running
if (prevCount > 0U) {
Expand All @@ -426,7 +428,7 @@ inline bool SpapQueue<T, netw, WorkerTemplate, LocalQType>::pushDuringProcessing
success = std::get<worker>(workerResources_)->push(val, port);
}

if (not success) { globalCount_.fetch_sub(1U, std::memory_order_relaxed); }
if (not success) { globalCount_.fetch_sub(1U, std::memory_order_release); }
}

return success;
Expand Down
14 changes: 11 additions & 3 deletions include/ParallelPriotityQueue/SpapQueueWorker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,10 @@ inline void WorkerResource<GlobalQType, LocalQType, numPorts>::incrGlobalCount()
const std::size_t diff = localCount_ - newLocalCount;

localCount_ = newLocalCount;
globalQueue_.globalCount_.fetch_add(diff, std::memory_order_acquire);
globalQueue_.globalCount_.fetch_add(diff, std::memory_order_relaxed);
// Can be relaxed as this is only ever called during enqueueGlobal which is only ever called during
// the processing of an element. This means we can be sure that the queue will not become empty until
// the processing has finished by which point the increment must have occured.
}
}

Expand All @@ -384,11 +387,16 @@ inline void WorkerResource<GlobalQType, LocalQType, numPorts>::incrGlobalCount()
template <typename GlobalQType, BasicQueue LocalQType, std::size_t numPorts>
inline void WorkerResource<GlobalQType, LocalQType, numPorts>::decrGlobalCount() noexcept {
if (localCount_ == 0) {
const std::size_t newLocalCount = queue_.size() / 2;
const std::size_t queueSize = queue_.size();
const std::size_t newLocalCount = queueSize / 2;
const std::size_t diff = newLocalCount + 1;

localCount_ = newLocalCount;
globalQueue_.globalCount_.fetch_sub(diff, std::memory_order_release);
if (queueSize > 0U) [[likely]] { // Release is only needed to communicate completed work
globalQueue_.globalCount_.fetch_sub(diff, std::memory_order_relaxed);
} else {
globalQueue_.globalCount_.fetch_sub(diff, std::memory_order_release);
}
} else {
--localCount_;
}
Expand Down