diff --git a/include/ParallelPriotityQueue/SpapQueueWorker.hpp b/include/ParallelPriotityQueue/SpapQueueWorker.hpp index f6f7916..859e590 100644 --- a/include/ParallelPriotityQueue/SpapQueueWorker.hpp +++ b/include/ParallelPriotityQueue/SpapQueueWorker.hpp @@ -223,15 +223,13 @@ inline void WorkerResource::enqueueGlobal(con */ template inline bool WorkerResource::pushOutBuffer() noexcept { - bool successfulPush; - const std::size_t batch = GlobalQType::netw_.batchSize_[*channelPointer_]; assert(batch <= bufferHead_ - bufferTail_); const std::size_t targetWorker = GlobalQType::netw_.edgeTargets_[*channelPointer_]; if (targetWorker == GlobalQType::netw_.numWorkers_) { // netw.numWorkers_ is reserved for self-push pushOutBufferSelf(batch); - successfulPush = true; + return true; } else { const std::size_t reducedTail = bufferTail_ % outBuffer_.size(); const std::size_t numElementsFirstPush = std::min(outBuffer_.size() - reducedTail, batch); @@ -246,18 +244,18 @@ inline bool WorkerResource::pushOutBuffer() n static_cast(numElementsSecondPush)); const std::size_t port = GlobalQType::netw_.targetPort_[*channelPointer_]; - successfulPush = globalQueue_.pushInternal(itBeginFirst, itEndFirst, targetWorker, port); - if (successfulPush) { bufferTail_ += numElementsFirstPush; } - - if (numElementsSecondPush > 0U) { - const bool successfulSecondPush - = globalQueue_.pushInternal(outBuffer_.begin(), itEndSecond, targetWorker, port); - successfulPush |= successfulSecondPush; - if (successfulSecondPush) { bufferTail_ += numElementsSecondPush; }; + const bool successfulPush = globalQueue_.pushInternal(itBeginFirst, itEndFirst, targetWorker, port); + if (successfulPush) { + bufferTail_ += numElementsFirstPush; + + if (numElementsSecondPush > 0U) { + const bool successfulSecondPush + = globalQueue_.pushInternal(outBuffer_.begin(), itEndSecond, targetWorker, port); + if (successfulSecondPush) { bufferTail_ += numElementsSecondPush; }; + } } + return successfulPush; } - - return successfulPush; } /** @@ -374,7 +372,7 @@ inline void WorkerResource::incrGlobalCount() const std::size_t diff = localCount_ - newLocalCount; localCount_ = newLocalCount; - globalQueue_.globalCount_.fetch_add(diff, std::memory_order_relaxed); + globalQueue_.globalCount_.fetch_add(diff, std::memory_order_acquire); } } @@ -390,7 +388,7 @@ inline void WorkerResource::decrGlobalCount() const std::size_t diff = newLocalCount + 1; localCount_ = newLocalCount; - globalQueue_.globalCount_.fetch_sub(diff, std::memory_order_relaxed); + globalQueue_.globalCount_.fetch_sub(diff, std::memory_order_release); } else { --localCount_; } diff --git a/tests/SpapQueue.cpp b/tests/SpapQueue.cpp index 29fd71c..645ae72 100644 --- a/tests/SpapQueue.cpp +++ b/tests/SpapQueue.cpp @@ -58,7 +58,7 @@ class DivisorWorker final : public WorkerResource> &ansCounter) : WorkerResource(globalQueue, channelIndices, workerId), - locAnsCounter_(ansCounter[workerId]){} + locAnsCounter_(ansCounter[workerId]) { } DivisorWorker(const DivisorWorker &other) = delete; DivisorWorker(DivisorWorker &&other) = delete; @@ -116,7 +116,7 @@ class FibonacciWorker final : public WorkerResource> &ansCounter) : WorkerResource(globalQueue, channelIndices, workerId), - locAnsCounter_(ansCounter[workerId]){} + locAnsCounter_(ansCounter[workerId]) { } FibonacciWorker(const FibonacciWorker &other) = delete; FibonacciWorker(FibonacciWorker &&other) = delete; @@ -267,6 +267,28 @@ TEST(SpapQueueTest, DivisorsSingleWorker) { for (std::size_t i = 0; i < divisorTestMaxSize; ++i) { EXPECT_EQ(ansCounter[0][i], solution[i]); } } +TEST(SpapQueueTest, DivisorsSingleWorkerBatch) { + constexpr QNetwork<1, 1> netw({0, 1}, {0}, {0}, {1}, {8}); + + std::vector> ansCounter(netw.numWorkers_, + std::vector(divisorTestMaxSize, 0)); + + SpapQueue globalQ; + EXPECT_TRUE(globalQ.initQueue(std::ref(ansCounter))); + globalQ.pushBeforeProcessing(1U, 0U); + globalQ.processQueue(); + globalQ.waitProcessFinish(); + + std::vector solution = computeAnswerDivisors(divisorTestMaxSize); + + // Tallying up from all workers + for (std::size_t i = 1; i < netw.numWorkers_; ++i) { + for (std::size_t j = 0; j < divisorTestMaxSize; ++j) { ansCounter[0][j] += ansCounter[i][j]; } + } + + for (std::size_t i = 0; i < divisorTestMaxSize; ++i) { EXPECT_EQ(ansCounter[0][i], solution[i]); } +} + TEST(SpapQueueTest, DivisorsHomogeneousWorkers) { constexpr QNetwork<4, 16> netw = FULLY_CONNECTED_GRAPH<4U>(); @@ -289,6 +311,32 @@ TEST(SpapQueueTest, DivisorsHomogeneousWorkers) { for (std::size_t i = 0; i < divisorTestMaxSize; ++i) { EXPECT_EQ(ansCounter[0][i], solution[i]); } } +TEST(SpapQueueTest, DivisorsHomogeneousWorkersBatch) { + constexpr QNetwork<4, 16> netw({0, 4, 8, 12, 16}, + {0, 1, 2, 3, 1, 2, 3, 0, 2, 3, 0, 1, 3, 0, 1, 2}, + {0, 1, 2, 3}, + {2, 1, 1, 1, 2, 1, 1, 1, 2, 1, 1, 1, 2, 1, 1, 1}, + {4, 8, 8, 8, 4, 8, 8, 8, 4, 8, 8, 8, 4, 8, 8, 8}); + + std::vector> ansCounter(netw.numWorkers_, + std::vector(divisorTestMaxSize, 0)); + + SpapQueue globalQ; + EXPECT_TRUE(globalQ.initQueue(std::ref(ansCounter))); + globalQ.pushBeforeProcessing(1U, 0U); + globalQ.processQueue(); + globalQ.waitProcessFinish(); + + std::vector solution = computeAnswerDivisors(divisorTestMaxSize); + + // Tallying up from all workers + for (std::size_t i = 1; i < netw.numWorkers_; ++i) { + for (std::size_t j = 0; j < divisorTestMaxSize; ++j) { ansCounter[0][j] += ansCounter[i][j]; } + } + + for (std::size_t i = 0; i < divisorTestMaxSize; ++i) { EXPECT_EQ(ansCounter[0][i], solution[i]); } +} + TEST(SpapQueueTest, DivisorsHeterogeneousWorkers) { constexpr QNetwork<2, 3> netw({0, 1, 3}, {1, 0, 1}); @@ -311,6 +359,28 @@ TEST(SpapQueueTest, DivisorsHeterogeneousWorkers) { for (std::size_t i = 0; i < divisorTestMaxSize; ++i) { EXPECT_EQ(ansCounter[0][i], solution[i]); } } +TEST(SpapQueueTest, DivisorsHeterogeneousWorkersBatch) { + constexpr QNetwork<2, 3> netw({0, 1, 3}, {1, 0, 1}, {0, 1}, {1, 2, 1}, {4, 8, 4}); + + std::vector> ansCounter(netw.numWorkers_, + std::vector(divisorTestMaxSize, 0)); + + SpapQueue globalQ; + EXPECT_TRUE(globalQ.initQueue(std::ref(ansCounter))); + globalQ.pushBeforeProcessing(1U, 0U); + globalQ.processQueue(); + globalQ.waitProcessFinish(); + + std::vector solution = computeAnswerDivisors(divisorTestMaxSize); + + // Tallying up from all workers + for (std::size_t i = 1; i < netw.numWorkers_; ++i) { + for (std::size_t j = 0; j < divisorTestMaxSize; ++j) { ansCounter[0][j] += ansCounter[i][j]; } + } + + for (std::size_t i = 0; i < divisorTestMaxSize; ++i) { EXPECT_EQ(ansCounter[0][i], solution[i]); } +} + TEST(SpapQueueTest, DivisorsPushSafeHomogeneousWorkers) { constexpr QNetwork<4, 16> netw = FULLY_CONNECTED_GRAPH<4U>(); @@ -477,6 +547,28 @@ TEST(SpapQueueTest, FibonacciSingleWorker) { for (std::size_t i = 0; i < fibonacciTestSize + 1; ++i) { EXPECT_EQ(ansCounter[0][i], solution[i]); } } +TEST(SpapQueueTest, FibonacciSingleWorkerBatch) { + constexpr QNetwork<1, 1> netw({0, 1}, {0}, {0}, {1}, {8}); + + std::vector> ansCounter(netw.numWorkers_, + std::vector(fibonacciTestSize + 1, 0)); + + SpapQueue globalQ; + EXPECT_TRUE(globalQ.initQueue(std::ref(ansCounter))); + globalQ.pushBeforeProcessing(fibonacciTestSize, 0U); + globalQ.processQueue(); + globalQ.waitProcessFinish(); + + std::vector solution = computeAnswerFibonacci(fibonacciTestSize + 1); + + // Tallying up from all workers + for (std::size_t i = 1; i < netw.numWorkers_; ++i) { + for (std::size_t j = 0; j < fibonacciTestSize + 1; ++j) { ansCounter[0][j] += ansCounter[i][j]; } + } + + for (std::size_t i = 0; i < fibonacciTestSize + 1; ++i) { EXPECT_EQ(ansCounter[0][i], solution[i]); } +} + TEST(SpapQueueTest, FibonacciHomogeneousWorkers) { constexpr QNetwork<4, 16> netw = FULLY_CONNECTED_GRAPH<4U>(); @@ -499,6 +591,32 @@ TEST(SpapQueueTest, FibonacciHomogeneousWorkers) { for (std::size_t i = 0; i < fibonacciTestSize + 1; ++i) { EXPECT_EQ(ansCounter[0][i], solution[i]); } } +TEST(SpapQueueTest, FibonacciHomogeneousWorkersBatch) { + constexpr QNetwork<4, 16> netw({0, 4, 8, 12, 16}, + {0, 1, 2, 3, 1, 2, 3, 0, 2, 3, 0, 1, 3, 0, 1, 2}, + {0, 1, 2, 3}, + {2, 1, 1, 1, 2, 1, 1, 1, 2, 1, 1, 1, 2, 1, 1, 1}, + {4, 8, 8, 8, 4, 8, 8, 8, 4, 8, 8, 8, 4, 8, 8, 8}); + + std::vector> ansCounter(netw.numWorkers_, + std::vector(fibonacciTestSize + 1, 0)); + + SpapQueue globalQ; + EXPECT_TRUE(globalQ.initQueue(std::ref(ansCounter))); + globalQ.pushBeforeProcessing(fibonacciTestSize, 0U); + globalQ.processQueue(); + globalQ.waitProcessFinish(); + + std::vector solution = computeAnswerFibonacci(fibonacciTestSize + 1); + + // Tallying up from all workers + for (std::size_t i = 1; i < netw.numWorkers_; ++i) { + for (std::size_t j = 0; j < fibonacciTestSize + 1; ++j) { ansCounter[0][j] += ansCounter[i][j]; } + } + + for (std::size_t i = 0; i < fibonacciTestSize + 1; ++i) { EXPECT_EQ(ansCounter[0][i], solution[i]); } +} + TEST(SpapQueueTest, FibonacciHeterogeneousWorkers) { constexpr QNetwork<2, 3> netw({0, 1, 3}, {1, 0, 1}); @@ -521,6 +639,28 @@ TEST(SpapQueueTest, FibonacciHeterogeneousWorkers) { for (std::size_t i = 0; i < fibonacciTestSize + 1; ++i) { EXPECT_EQ(ansCounter[0][i], solution[i]); } } +TEST(SpapQueueTest, FibonacciHeterogeneousWorkersBatch) { + constexpr QNetwork<2, 3> netw({0, 1, 3}, {1, 0, 1}, {0, 1}, {1, 2, 1}, {4, 8, 4}); + + std::vector> ansCounter(netw.numWorkers_, + std::vector(fibonacciTestSize + 1, 0)); + + SpapQueue globalQ; + EXPECT_TRUE(globalQ.initQueue(std::ref(ansCounter))); + globalQ.pushBeforeProcessing(fibonacciTestSize, 0U); + globalQ.processQueue(); + globalQ.waitProcessFinish(); + + std::vector solution = computeAnswerFibonacci(fibonacciTestSize + 1); + + // Tallying up from all workers + for (std::size_t i = 1; i < netw.numWorkers_; ++i) { + for (std::size_t j = 0; j < fibonacciTestSize + 1; ++j) { ansCounter[0][j] += ansCounter[i][j]; } + } + + for (std::size_t i = 0; i < fibonacciTestSize + 1; ++i) { EXPECT_EQ(ansCounter[0][i], solution[i]); } +} + TEST(SpapQueueTest, SSSPSingleWorker) { constexpr QNetwork<1, 1> netw = FULLY_CONNECTED_GRAPH<1U>(); @@ -563,6 +703,48 @@ TEST(SpapQueueTest, SSSPSingleWorker) { } } +TEST(SpapQueueTest, SSSPSingleWorkerBatch) { + constexpr QNetwork<1, 1> netw({0, 1}, {0}, {0}, {1}, {8}); + + SpapQueue, + netw, + SSSPWorker, + std::priority_queue, + std::vector>, + std::greater>>> + globalQ; + + const CSRGraph graph = make3DTorus(SSSPTorusSideLength); + const unsigned nVerts = SSSPTorusSideLength * SSSPTorusSideLength * SSSPTorusSideLength; + + std::vector> distances(nVerts); + for (auto &dist : distances) { + dist.store(std::numeric_limits::max(), std::memory_order_relaxed); + } + distances[0].store(0U, std::memory_order_relaxed); + + EXPECT_TRUE(globalQ.initQueue(std::cref(graph), std::ref(distances))); + globalQ.pushBeforeProcessing({0U, 0U}, 0U); + globalQ.processQueue(); + globalQ.waitProcessFinish(); + + const unsigned sideLengthSqr = SSSPTorusSideLength * SSSPTorusSideLength; + + for (unsigned i = 0U; i < SSSPTorusSideLength; ++i) { + for (unsigned j = 0U; j < SSSPTorusSideLength; ++j) { + for (unsigned k = 0U; k < SSSPTorusSideLength; ++k) { + const unsigned vert = k + (j * SSSPTorusSideLength) + (i * sideLengthSqr); + + const unsigned dist = std::min(k, SSSPTorusSideLength - k) + + std::min(j, SSSPTorusSideLength - j) + + std::min(i, SSSPTorusSideLength - i); + + EXPECT_EQ(distances[vert].load(std::memory_order_relaxed), dist); + } + } + } +} + TEST(SpapQueueTest, SSSPHomogeneousWorkers) { constexpr QNetwork<4, 16> netw = FULLY_CONNECTED_GRAPH<4U>(); @@ -605,6 +787,52 @@ TEST(SpapQueueTest, SSSPHomogeneousWorkers) { } } +TEST(SpapQueueTest, SSSPHomogeneousWorkersBatch) { + constexpr QNetwork<4, 16> netw({0, 4, 8, 12, 16}, + {0, 1, 2, 3, 1, 2, 3, 0, 2, 3, 0, 1, 3, 0, 1, 2}, + {0, 1, 2, 3}, + {2, 1, 1, 1, 2, 1, 1, 1, 2, 1, 1, 1, 2, 1, 1, 1}, + {4, 8, 8, 8, 4, 8, 8, 8, 4, 8, 8, 8, 4, 8, 8, 8}); + + SpapQueue, + netw, + SSSPWorker, + std::priority_queue, + std::vector>, + std::greater>>> + globalQ; + + const CSRGraph graph = make3DTorus(SSSPTorusSideLength); + const unsigned nVerts = SSSPTorusSideLength * SSSPTorusSideLength * SSSPTorusSideLength; + + std::vector> distances(nVerts); + for (auto &dist : distances) { + dist.store(std::numeric_limits::max(), std::memory_order_relaxed); + } + distances[0].store(0U, std::memory_order_relaxed); + + EXPECT_TRUE(globalQ.initQueue(std::cref(graph), std::ref(distances))); + globalQ.pushBeforeProcessing({0U, 0U}, 0U); + globalQ.processQueue(); + globalQ.waitProcessFinish(); + + const unsigned sideLengthSqr = SSSPTorusSideLength * SSSPTorusSideLength; + + for (unsigned i = 0U; i < SSSPTorusSideLength; ++i) { + for (unsigned j = 0U; j < SSSPTorusSideLength; ++j) { + for (unsigned k = 0U; k < SSSPTorusSideLength; ++k) { + const unsigned vert = k + (j * SSSPTorusSideLength) + (i * sideLengthSqr); + + const unsigned dist = std::min(k, SSSPTorusSideLength - k) + + std::min(j, SSSPTorusSideLength - j) + + std::min(i, SSSPTorusSideLength - i); + + EXPECT_EQ(distances[vert].load(std::memory_order_relaxed), dist); + } + } + } +} + TEST(SpapQueueTest, SSSPHeterogeneousWorkers) { constexpr QNetwork<2, 3> netw({0, 1, 3}, {1, 0, 1}); @@ -646,3 +874,45 @@ TEST(SpapQueueTest, SSSPHeterogeneousWorkers) { } } } + +TEST(SpapQueueTest, SSSPHeterogeneousWorkersBatch) { + constexpr QNetwork<2, 3> netw({0, 1, 3}, {1, 0, 1}, {0, 1}, {1, 2, 1}, {4, 8, 4}); + + SpapQueue, + netw, + SSSPWorker, + std::priority_queue, + std::vector>, + std::greater>>> + globalQ; + + const CSRGraph graph = make3DTorus(SSSPTorusSideLength); + const unsigned nVerts = SSSPTorusSideLength * SSSPTorusSideLength * SSSPTorusSideLength; + + std::vector> distances(nVerts); + for (auto &dist : distances) { + dist.store(std::numeric_limits::max(), std::memory_order_relaxed); + } + distances[0].store(0U, std::memory_order_relaxed); + + EXPECT_TRUE(globalQ.initQueue(std::cref(graph), std::ref(distances))); + globalQ.pushBeforeProcessing({0U, 0U}, 0U); + globalQ.processQueue(); + globalQ.waitProcessFinish(); + + const unsigned sideLengthSqr = SSSPTorusSideLength * SSSPTorusSideLength; + + for (unsigned i = 0U; i < SSSPTorusSideLength; ++i) { + for (unsigned j = 0U; j < SSSPTorusSideLength; ++j) { + for (unsigned k = 0U; k < SSSPTorusSideLength; ++k) { + const unsigned vert = k + (j * SSSPTorusSideLength) + (i * sideLengthSqr); + + const unsigned dist = std::min(k, SSSPTorusSideLength - k) + + std::min(j, SSSPTorusSideLength - j) + + std::min(i, SSSPTorusSideLength - i); + + EXPECT_EQ(distances[vert].load(std::memory_order_relaxed), dist); + } + } + } +}