From 1265beb11bc3701ec1cff33a0bd39e919142489c Mon Sep 17 00:00:00 2001 From: Raphael Steiner Date: Sat, 7 Mar 2026 21:57:08 +0100 Subject: [PATCH 1/4] partial fix --- .../ParallelPriotityQueue/SpapQueueWorker.hpp | 4 +- tests/SpapQueue.cpp | 144 +++++++++++++++++- 2 files changed, 144 insertions(+), 4 deletions(-) diff --git a/include/ParallelPriotityQueue/SpapQueueWorker.hpp b/include/ParallelPriotityQueue/SpapQueueWorker.hpp index f6f7916..c97bcb9 100644 --- a/include/ParallelPriotityQueue/SpapQueueWorker.hpp +++ b/include/ParallelPriotityQueue/SpapQueueWorker.hpp @@ -374,7 +374,7 @@ inline void WorkerResource::incrGlobalCount() const std::size_t diff = localCount_ - newLocalCount; localCount_ = newLocalCount; - globalQueue_.globalCount_.fetch_add(diff, std::memory_order_relaxed); + globalQueue_.globalCount_.fetch_add(diff, std::memory_order_acquire); } } @@ -390,7 +390,7 @@ inline void WorkerResource::decrGlobalCount() const std::size_t diff = newLocalCount + 1; localCount_ = newLocalCount; - globalQueue_.globalCount_.fetch_sub(diff, std::memory_order_relaxed); + globalQueue_.globalCount_.fetch_sub(diff, std::memory_order_release); } else { --localCount_; } diff --git a/tests/SpapQueue.cpp b/tests/SpapQueue.cpp index 29fd71c..c67ac83 100644 --- a/tests/SpapQueue.cpp +++ b/tests/SpapQueue.cpp @@ -58,7 +58,7 @@ class DivisorWorker final : public WorkerResource> &ansCounter) : WorkerResource(globalQueue, channelIndices, workerId), - locAnsCounter_(ansCounter[workerId]){} + locAnsCounter_(ansCounter[workerId]) { } DivisorWorker(const DivisorWorker &other) = delete; DivisorWorker(DivisorWorker &&other) = delete; @@ -116,7 +116,7 @@ class FibonacciWorker final : public WorkerResource> &ansCounter) : WorkerResource(globalQueue, channelIndices, workerId), - locAnsCounter_(ansCounter[workerId]){} + locAnsCounter_(ansCounter[workerId]) { } FibonacciWorker(const FibonacciWorker &other) = delete; FibonacciWorker(FibonacciWorker &&other) = delete; @@ -267,6 +267,28 @@ TEST(SpapQueueTest, DivisorsSingleWorker) { for (std::size_t i = 0; i < divisorTestMaxSize; ++i) { EXPECT_EQ(ansCounter[0][i], solution[i]); } } +TEST(SpapQueueTest, DivisorsSingleWorkerBatch) { + constexpr QNetwork<1, 1> netw({0, 1}, {0}, {0}, {1}, {8}); + + std::vector> ansCounter(netw.numWorkers_, + std::vector(divisorTestMaxSize, 0)); + + SpapQueue globalQ; + EXPECT_TRUE(globalQ.initQueue(std::ref(ansCounter))); + globalQ.pushBeforeProcessing(1U, 0U); + globalQ.processQueue(); + globalQ.waitProcessFinish(); + + std::vector solution = computeAnswerDivisors(divisorTestMaxSize); + + // Tallying up from all workers + for (std::size_t i = 1; i < netw.numWorkers_; ++i) { + for (std::size_t j = 0; j < divisorTestMaxSize; ++j) { ansCounter[0][j] += ansCounter[i][j]; } + } + + for (std::size_t i = 0; i < divisorTestMaxSize; ++i) { EXPECT_EQ(ansCounter[0][i], solution[i]); } +} + TEST(SpapQueueTest, DivisorsHomogeneousWorkers) { constexpr QNetwork<4, 16> netw = FULLY_CONNECTED_GRAPH<4U>(); @@ -289,6 +311,32 @@ TEST(SpapQueueTest, DivisorsHomogeneousWorkers) { for (std::size_t i = 0; i < divisorTestMaxSize; ++i) { EXPECT_EQ(ansCounter[0][i], solution[i]); } } +TEST(SpapQueueTest, DivisorsHomogeneousWorkersBatch) { + constexpr QNetwork<4, 16> netw({0, 4, 8, 12, 16}, + {0, 1, 2, 3, 1, 2, 3, 0, 2, 3, 0, 1, 3, 0, 1, 2}, + {0, 1, 2, 3}, + {2, 1, 1, 1, 2, 1, 1, 1, 2, 1, 1, 1, 2, 1, 1, 1}, + {4, 8, 8, 8, 4, 8, 8, 8, 4, 8, 8, 8, 4, 8, 8, 8}); + + std::vector> ansCounter(netw.numWorkers_, + std::vector(divisorTestMaxSize, 0)); + + SpapQueue globalQ; + EXPECT_TRUE(globalQ.initQueue(std::ref(ansCounter))); + globalQ.pushBeforeProcessing(1U, 0U); + globalQ.processQueue(); + globalQ.waitProcessFinish(); + + std::vector solution = computeAnswerDivisors(divisorTestMaxSize); + + // Tallying up from all workers + for (std::size_t i = 1; i < netw.numWorkers_; ++i) { + for (std::size_t j = 0; j < divisorTestMaxSize; ++j) { ansCounter[0][j] += ansCounter[i][j]; } + } + + for (std::size_t i = 0; i < divisorTestMaxSize; ++i) { EXPECT_EQ(ansCounter[0][i], solution[i]); } +} + TEST(SpapQueueTest, DivisorsHeterogeneousWorkers) { constexpr QNetwork<2, 3> netw({0, 1, 3}, {1, 0, 1}); @@ -311,6 +359,28 @@ TEST(SpapQueueTest, DivisorsHeterogeneousWorkers) { for (std::size_t i = 0; i < divisorTestMaxSize; ++i) { EXPECT_EQ(ansCounter[0][i], solution[i]); } } +TEST(SpapQueueTest, DivisorsHeterogeneousWorkersBatch) { + constexpr QNetwork<2, 3> netw({0, 1, 3}, {1, 0, 1}, {0, 1}, {1, 2, 1}, {4, 8, 4}); + + std::vector> ansCounter(netw.numWorkers_, + std::vector(divisorTestMaxSize, 0)); + + SpapQueue globalQ; + EXPECT_TRUE(globalQ.initQueue(std::ref(ansCounter))); + globalQ.pushBeforeProcessing(1U, 0U); + globalQ.processQueue(); + globalQ.waitProcessFinish(); + + std::vector solution = computeAnswerDivisors(divisorTestMaxSize); + + // Tallying up from all workers + for (std::size_t i = 1; i < netw.numWorkers_; ++i) { + for (std::size_t j = 0; j < divisorTestMaxSize; ++j) { ansCounter[0][j] += ansCounter[i][j]; } + } + + for (std::size_t i = 0; i < divisorTestMaxSize; ++i) { EXPECT_EQ(ansCounter[0][i], solution[i]); } +} + TEST(SpapQueueTest, DivisorsPushSafeHomogeneousWorkers) { constexpr QNetwork<4, 16> netw = FULLY_CONNECTED_GRAPH<4U>(); @@ -477,6 +547,28 @@ TEST(SpapQueueTest, FibonacciSingleWorker) { for (std::size_t i = 0; i < fibonacciTestSize + 1; ++i) { EXPECT_EQ(ansCounter[0][i], solution[i]); } } +TEST(SpapQueueTest, FibonacciSingleWorkerBatch) { + constexpr QNetwork<1, 1> netw({0, 1}, {0}, {0}, {1}, {8}); + + std::vector> ansCounter(netw.numWorkers_, + std::vector(fibonacciTestSize + 1, 0)); + + SpapQueue globalQ; + EXPECT_TRUE(globalQ.initQueue(std::ref(ansCounter))); + globalQ.pushBeforeProcessing(fibonacciTestSize, 0U); + globalQ.processQueue(); + globalQ.waitProcessFinish(); + + std::vector solution = computeAnswerFibonacci(fibonacciTestSize + 1); + + // Tallying up from all workers + for (std::size_t i = 1; i < netw.numWorkers_; ++i) { + for (std::size_t j = 0; j < fibonacciTestSize + 1; ++j) { ansCounter[0][j] += ansCounter[i][j]; } + } + + for (std::size_t i = 0; i < fibonacciTestSize + 1; ++i) { EXPECT_EQ(ansCounter[0][i], solution[i]); } +} + TEST(SpapQueueTest, FibonacciHomogeneousWorkers) { constexpr QNetwork<4, 16> netw = FULLY_CONNECTED_GRAPH<4U>(); @@ -499,6 +591,32 @@ TEST(SpapQueueTest, FibonacciHomogeneousWorkers) { for (std::size_t i = 0; i < fibonacciTestSize + 1; ++i) { EXPECT_EQ(ansCounter[0][i], solution[i]); } } +TEST(SpapQueueTest, FibonacciHomogeneousWorkersBatch) { + constexpr QNetwork<4, 16> netw({0, 4, 8, 12, 16}, + {0, 1, 2, 3, 1, 2, 3, 0, 2, 3, 0, 1, 3, 0, 1, 2}, + {0, 1, 2, 3}, + {2, 1, 1, 1, 2, 1, 1, 1, 2, 1, 1, 1, 2, 1, 1, 1}, + {4, 8, 8, 8, 4, 8, 8, 8, 4, 8, 8, 8, 4, 8, 8, 8}); + + std::vector> ansCounter(netw.numWorkers_, + std::vector(fibonacciTestSize + 1, 0)); + + SpapQueue globalQ; + EXPECT_TRUE(globalQ.initQueue(std::ref(ansCounter))); + globalQ.pushBeforeProcessing(fibonacciTestSize, 0U); + globalQ.processQueue(); + globalQ.waitProcessFinish(); + + std::vector solution = computeAnswerFibonacci(fibonacciTestSize + 1); + + // Tallying up from all workers + for (std::size_t i = 1; i < netw.numWorkers_; ++i) { + for (std::size_t j = 0; j < fibonacciTestSize + 1; ++j) { ansCounter[0][j] += ansCounter[i][j]; } + } + + for (std::size_t i = 0; i < fibonacciTestSize + 1; ++i) { EXPECT_EQ(ansCounter[0][i], solution[i]); } +} + TEST(SpapQueueTest, FibonacciHeterogeneousWorkers) { constexpr QNetwork<2, 3> netw({0, 1, 3}, {1, 0, 1}); @@ -521,6 +639,28 @@ TEST(SpapQueueTest, FibonacciHeterogeneousWorkers) { for (std::size_t i = 0; i < fibonacciTestSize + 1; ++i) { EXPECT_EQ(ansCounter[0][i], solution[i]); } } +TEST(SpapQueueTest, FibonacciHeterogeneousWorkersBatch) { + constexpr QNetwork<2, 3> netw({0, 1, 3}, {1, 0, 1}, {0, 1}, {1, 2, 1}, {4, 8, 8}); + + std::vector> ansCounter(netw.numWorkers_, + std::vector(fibonacciTestSize + 1, 0)); + + SpapQueue globalQ; + EXPECT_TRUE(globalQ.initQueue(std::ref(ansCounter))); + globalQ.pushBeforeProcessing(fibonacciTestSize, 0U); + globalQ.processQueue(); + globalQ.waitProcessFinish(); + + std::vector solution = computeAnswerFibonacci(fibonacciTestSize + 1); + + // Tallying up from all workers + for (std::size_t i = 1; i < netw.numWorkers_; ++i) { + for (std::size_t j = 0; j < fibonacciTestSize + 1; ++j) { ansCounter[0][j] += ansCounter[i][j]; } + } + + for (std::size_t i = 0; i < fibonacciTestSize + 1; ++i) { EXPECT_EQ(ansCounter[0][i], solution[i]); } +} + TEST(SpapQueueTest, SSSPSingleWorker) { constexpr QNetwork<1, 1> netw = FULLY_CONNECTED_GRAPH<1U>(); From 9e47a1728f19c1f2b20c7fa8c1753775a5b854f3 Mon Sep 17 00:00:00 2001 From: Raphael Steiner Date: Sat, 7 Mar 2026 22:06:47 +0100 Subject: [PATCH 2/4] more tests --- tests/SpapQueue.cpp | 132 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 131 insertions(+), 1 deletion(-) diff --git a/tests/SpapQueue.cpp b/tests/SpapQueue.cpp index c67ac83..645ae72 100644 --- a/tests/SpapQueue.cpp +++ b/tests/SpapQueue.cpp @@ -640,7 +640,7 @@ TEST(SpapQueueTest, FibonacciHeterogeneousWorkers) { } TEST(SpapQueueTest, FibonacciHeterogeneousWorkersBatch) { - constexpr QNetwork<2, 3> netw({0, 1, 3}, {1, 0, 1}, {0, 1}, {1, 2, 1}, {4, 8, 8}); + constexpr QNetwork<2, 3> netw({0, 1, 3}, {1, 0, 1}, {0, 1}, {1, 2, 1}, {4, 8, 4}); std::vector> ansCounter(netw.numWorkers_, std::vector(fibonacciTestSize + 1, 0)); @@ -703,6 +703,48 @@ TEST(SpapQueueTest, SSSPSingleWorker) { } } +TEST(SpapQueueTest, SSSPSingleWorkerBatch) { + constexpr QNetwork<1, 1> netw({0, 1}, {0}, {0}, {1}, {8}); + + SpapQueue, + netw, + SSSPWorker, + std::priority_queue, + std::vector>, + std::greater>>> + globalQ; + + const CSRGraph graph = make3DTorus(SSSPTorusSideLength); + const unsigned nVerts = SSSPTorusSideLength * SSSPTorusSideLength * SSSPTorusSideLength; + + std::vector> distances(nVerts); + for (auto &dist : distances) { + dist.store(std::numeric_limits::max(), std::memory_order_relaxed); + } + distances[0].store(0U, std::memory_order_relaxed); + + EXPECT_TRUE(globalQ.initQueue(std::cref(graph), std::ref(distances))); + globalQ.pushBeforeProcessing({0U, 0U}, 0U); + globalQ.processQueue(); + globalQ.waitProcessFinish(); + + const unsigned sideLengthSqr = SSSPTorusSideLength * SSSPTorusSideLength; + + for (unsigned i = 0U; i < SSSPTorusSideLength; ++i) { + for (unsigned j = 0U; j < SSSPTorusSideLength; ++j) { + for (unsigned k = 0U; k < SSSPTorusSideLength; ++k) { + const unsigned vert = k + (j * SSSPTorusSideLength) + (i * sideLengthSqr); + + const unsigned dist = std::min(k, SSSPTorusSideLength - k) + + std::min(j, SSSPTorusSideLength - j) + + std::min(i, SSSPTorusSideLength - i); + + EXPECT_EQ(distances[vert].load(std::memory_order_relaxed), dist); + } + } + } +} + TEST(SpapQueueTest, SSSPHomogeneousWorkers) { constexpr QNetwork<4, 16> netw = FULLY_CONNECTED_GRAPH<4U>(); @@ -745,6 +787,52 @@ TEST(SpapQueueTest, SSSPHomogeneousWorkers) { } } +TEST(SpapQueueTest, SSSPHomogeneousWorkersBatch) { + constexpr QNetwork<4, 16> netw({0, 4, 8, 12, 16}, + {0, 1, 2, 3, 1, 2, 3, 0, 2, 3, 0, 1, 3, 0, 1, 2}, + {0, 1, 2, 3}, + {2, 1, 1, 1, 2, 1, 1, 1, 2, 1, 1, 1, 2, 1, 1, 1}, + {4, 8, 8, 8, 4, 8, 8, 8, 4, 8, 8, 8, 4, 8, 8, 8}); + + SpapQueue, + netw, + SSSPWorker, + std::priority_queue, + std::vector>, + std::greater>>> + globalQ; + + const CSRGraph graph = make3DTorus(SSSPTorusSideLength); + const unsigned nVerts = SSSPTorusSideLength * SSSPTorusSideLength * SSSPTorusSideLength; + + std::vector> distances(nVerts); + for (auto &dist : distances) { + dist.store(std::numeric_limits::max(), std::memory_order_relaxed); + } + distances[0].store(0U, std::memory_order_relaxed); + + EXPECT_TRUE(globalQ.initQueue(std::cref(graph), std::ref(distances))); + globalQ.pushBeforeProcessing({0U, 0U}, 0U); + globalQ.processQueue(); + globalQ.waitProcessFinish(); + + const unsigned sideLengthSqr = SSSPTorusSideLength * SSSPTorusSideLength; + + for (unsigned i = 0U; i < SSSPTorusSideLength; ++i) { + for (unsigned j = 0U; j < SSSPTorusSideLength; ++j) { + for (unsigned k = 0U; k < SSSPTorusSideLength; ++k) { + const unsigned vert = k + (j * SSSPTorusSideLength) + (i * sideLengthSqr); + + const unsigned dist = std::min(k, SSSPTorusSideLength - k) + + std::min(j, SSSPTorusSideLength - j) + + std::min(i, SSSPTorusSideLength - i); + + EXPECT_EQ(distances[vert].load(std::memory_order_relaxed), dist); + } + } + } +} + TEST(SpapQueueTest, SSSPHeterogeneousWorkers) { constexpr QNetwork<2, 3> netw({0, 1, 3}, {1, 0, 1}); @@ -786,3 +874,45 @@ TEST(SpapQueueTest, SSSPHeterogeneousWorkers) { } } } + +TEST(SpapQueueTest, SSSPHeterogeneousWorkersBatch) { + constexpr QNetwork<2, 3> netw({0, 1, 3}, {1, 0, 1}, {0, 1}, {1, 2, 1}, {4, 8, 4}); + + SpapQueue, + netw, + SSSPWorker, + std::priority_queue, + std::vector>, + std::greater>>> + globalQ; + + const CSRGraph graph = make3DTorus(SSSPTorusSideLength); + const unsigned nVerts = SSSPTorusSideLength * SSSPTorusSideLength * SSSPTorusSideLength; + + std::vector> distances(nVerts); + for (auto &dist : distances) { + dist.store(std::numeric_limits::max(), std::memory_order_relaxed); + } + distances[0].store(0U, std::memory_order_relaxed); + + EXPECT_TRUE(globalQ.initQueue(std::cref(graph), std::ref(distances))); + globalQ.pushBeforeProcessing({0U, 0U}, 0U); + globalQ.processQueue(); + globalQ.waitProcessFinish(); + + const unsigned sideLengthSqr = SSSPTorusSideLength * SSSPTorusSideLength; + + for (unsigned i = 0U; i < SSSPTorusSideLength; ++i) { + for (unsigned j = 0U; j < SSSPTorusSideLength; ++j) { + for (unsigned k = 0U; k < SSSPTorusSideLength; ++k) { + const unsigned vert = k + (j * SSSPTorusSideLength) + (i * sideLengthSqr); + + const unsigned dist = std::min(k, SSSPTorusSideLength - k) + + std::min(j, SSSPTorusSideLength - j) + + std::min(i, SSSPTorusSideLength - i); + + EXPECT_EQ(distances[vert].load(std::memory_order_relaxed), dist); + } + } + } +} From 608e35a97b33797444005f7ff97161d121b6d5a9 Mon Sep 17 00:00:00 2001 From: Raphael Steiner Date: Sat, 7 Mar 2026 22:26:06 +0100 Subject: [PATCH 3/4] fixed bug --- .../ParallelPriotityQueue/SpapQueueWorker.hpp | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/include/ParallelPriotityQueue/SpapQueueWorker.hpp b/include/ParallelPriotityQueue/SpapQueueWorker.hpp index c97bcb9..b955c14 100644 --- a/include/ParallelPriotityQueue/SpapQueueWorker.hpp +++ b/include/ParallelPriotityQueue/SpapQueueWorker.hpp @@ -247,13 +247,15 @@ inline bool WorkerResource::pushOutBuffer() n const std::size_t port = GlobalQType::netw_.targetPort_[*channelPointer_]; successfulPush = globalQueue_.pushInternal(itBeginFirst, itEndFirst, targetWorker, port); - if (successfulPush) { bufferTail_ += numElementsFirstPush; } - - if (numElementsSecondPush > 0U) { - const bool successfulSecondPush - = globalQueue_.pushInternal(outBuffer_.begin(), itEndSecond, targetWorker, port); - successfulPush |= successfulSecondPush; - if (successfulSecondPush) { bufferTail_ += numElementsSecondPush; }; + if (successfulPush) { + bufferTail_ += numElementsFirstPush; + + if (numElementsSecondPush > 0U) { + const bool successfulSecondPush + = globalQueue_.pushInternal(outBuffer_.begin(), itEndSecond, targetWorker, port); + successfulPush |= successfulSecondPush; + if (successfulSecondPush) { bufferTail_ += numElementsSecondPush; }; + } } } From c245c4ff31a0c58e6c9f33dc29cded75272035bb Mon Sep 17 00:00:00 2001 From: Raphael Steiner Date: Sun, 8 Mar 2026 18:33:41 +0100 Subject: [PATCH 4/4] clean up --- include/ParallelPriotityQueue/SpapQueueWorker.hpp | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/include/ParallelPriotityQueue/SpapQueueWorker.hpp b/include/ParallelPriotityQueue/SpapQueueWorker.hpp index b955c14..859e590 100644 --- a/include/ParallelPriotityQueue/SpapQueueWorker.hpp +++ b/include/ParallelPriotityQueue/SpapQueueWorker.hpp @@ -223,15 +223,13 @@ inline void WorkerResource::enqueueGlobal(con */ template inline bool WorkerResource::pushOutBuffer() noexcept { - bool successfulPush; - const std::size_t batch = GlobalQType::netw_.batchSize_[*channelPointer_]; assert(batch <= bufferHead_ - bufferTail_); const std::size_t targetWorker = GlobalQType::netw_.edgeTargets_[*channelPointer_]; if (targetWorker == GlobalQType::netw_.numWorkers_) { // netw.numWorkers_ is reserved for self-push pushOutBufferSelf(batch); - successfulPush = true; + return true; } else { const std::size_t reducedTail = bufferTail_ % outBuffer_.size(); const std::size_t numElementsFirstPush = std::min(outBuffer_.size() - reducedTail, batch); @@ -246,20 +244,18 @@ inline bool WorkerResource::pushOutBuffer() n static_cast(numElementsSecondPush)); const std::size_t port = GlobalQType::netw_.targetPort_[*channelPointer_]; - successfulPush = globalQueue_.pushInternal(itBeginFirst, itEndFirst, targetWorker, port); + const bool successfulPush = globalQueue_.pushInternal(itBeginFirst, itEndFirst, targetWorker, port); if (successfulPush) { bufferTail_ += numElementsFirstPush; if (numElementsSecondPush > 0U) { const bool successfulSecondPush = globalQueue_.pushInternal(outBuffer_.begin(), itEndSecond, targetWorker, port); - successfulPush |= successfulSecondPush; if (successfulSecondPush) { bufferTail_ += numElementsSecondPush; }; } } + return successfulPush; } - - return successfulPush; } /**