Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 13 additions & 15 deletions include/ParallelPriotityQueue/SpapQueueWorker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,15 +223,13 @@ inline void WorkerResource<GlobalQType, LocalQType, numPorts>::enqueueGlobal(con
*/
template <typename GlobalQType, BasicQueue LocalQType, std::size_t numPorts>
inline bool WorkerResource<GlobalQType, LocalQType, numPorts>::pushOutBuffer() noexcept {
bool successfulPush;

const std::size_t batch = GlobalQType::netw_.batchSize_[*channelPointer_];
assert(batch <= bufferHead_ - bufferTail_);

const std::size_t targetWorker = GlobalQType::netw_.edgeTargets_[*channelPointer_];
if (targetWorker == GlobalQType::netw_.numWorkers_) { // netw.numWorkers_ is reserved for self-push
pushOutBufferSelf(batch);
successfulPush = true;
return true;
} else {
const std::size_t reducedTail = bufferTail_ % outBuffer_.size();
const std::size_t numElementsFirstPush = std::min(outBuffer_.size() - reducedTail, batch);
Expand All @@ -246,18 +244,18 @@ inline bool WorkerResource<GlobalQType, LocalQType, numPorts>::pushOutBuffer() n
static_cast<typename decltype(outBuffer_)::difference_type>(numElementsSecondPush));

const std::size_t port = GlobalQType::netw_.targetPort_[*channelPointer_];
successfulPush = globalQueue_.pushInternal(itBeginFirst, itEndFirst, targetWorker, port);
if (successfulPush) { bufferTail_ += numElementsFirstPush; }

if (numElementsSecondPush > 0U) {
const bool successfulSecondPush
= globalQueue_.pushInternal(outBuffer_.begin(), itEndSecond, targetWorker, port);
successfulPush |= successfulSecondPush;
if (successfulSecondPush) { bufferTail_ += numElementsSecondPush; };
const bool successfulPush = globalQueue_.pushInternal(itBeginFirst, itEndFirst, targetWorker, port);
if (successfulPush) {
bufferTail_ += numElementsFirstPush;

if (numElementsSecondPush > 0U) {
const bool successfulSecondPush
= globalQueue_.pushInternal(outBuffer_.begin(), itEndSecond, targetWorker, port);
if (successfulSecondPush) { bufferTail_ += numElementsSecondPush; };
}
}
return successfulPush;
}

return successfulPush;
}

/**
Expand Down Expand Up @@ -374,7 +372,7 @@ inline void WorkerResource<GlobalQType, LocalQType, numPorts>::incrGlobalCount()
const std::size_t diff = localCount_ - newLocalCount;

localCount_ = newLocalCount;
globalQueue_.globalCount_.fetch_add(diff, std::memory_order_relaxed);
globalQueue_.globalCount_.fetch_add(diff, std::memory_order_acquire);
}
}

Expand All @@ -390,7 +388,7 @@ inline void WorkerResource<GlobalQType, LocalQType, numPorts>::decrGlobalCount()
const std::size_t diff = newLocalCount + 1;

localCount_ = newLocalCount;
globalQueue_.globalCount_.fetch_sub(diff, std::memory_order_relaxed);
globalQueue_.globalCount_.fetch_sub(diff, std::memory_order_release);
} else {
--localCount_;
}
Expand Down
Loading