From 4a1628238fa721e841905ed18a1a2e27b82a2ddf Mon Sep 17 00:00:00 2001 From: E Sequeira <5458743+geseq@users.noreply.github.com> Date: Mon, 26 May 2025 11:03:23 +0100 Subject: [PATCH] add return mode --- bench/simple_bench.cpp | 7 +-- include/mpsc.hpp | 46 +++++++++++------- include/spsc.hpp | 37 ++++++++++----- include/wait_strategy.hpp | 18 +++---- test/fastchan_mpsc_test.cpp | 91 ++++++++++++++++++----------------- test/fastchan_spsc_test.cpp | 94 ++++++++++++++++++++----------------- 6 files changed, 164 insertions(+), 129 deletions(-) diff --git a/bench/simple_bench.cpp b/bench/simple_bench.cpp index feb4534..c6a318a 100644 --- a/bench/simple_bench.cpp +++ b/bench/simple_bench.cpp @@ -181,11 +181,12 @@ struct alignas(hardware_destructive_interference_size) AlignedData { operator int() const { return value; } }; -template +template class RigtorpSPSC { public: - using put_t = typename std::conditional::value, void, bool>::type; - using get_t = typename std::conditional::value, T, std::optional>::type; + using put_t = typename std::conditional::type; + using get_t = typename std::conditional>::type; RigtorpSPSC() = default; diff --git a/include/mpsc.hpp b/include/mpsc.hpp index cebaac4..1a5e649 100644 --- a/include/mpsc.hpp +++ b/include/mpsc.hpp @@ -4,17 +4,19 @@ #include #include #include +#include #include "common.hpp" #include "wait_strategy.hpp" namespace fastchan { -template +template class MPSC { public: - using put_t = typename std::conditional::value, void, bool>::type; - using get_t = typename std::conditional::value, T, std::optional>::type; + using put_t = typename std::conditional::type; + using get_t = typename std::conditional>::type; MPSC() = default; @@ -24,7 +26,7 @@ class MPSC { while (p.write_index_cache_ > (p.reader_index_cache_ + common_.index_mask_)) { p.write_index_cache_ = next_free_index_.load(std::memory_order_acquire); p.reader_index_cache_ = consumer_.reader_index_.load(std::memory_order_relaxed); - if constexpr (std::is_same::value) { + if constexpr (put_mode == ReturnMode::NonBlocking) { return false; } else { common_.put_wait_.wait( @@ -36,18 +38,25 @@ class MPSC { contents_[p.write_index_cache_ & common_.index_mask_] = value; - // commit in the correct order to avoid problems while (last_committed_index_.load(std::memory_order_relaxed) != p.write_index_cache_) { - // we don't return at this point even in case of ReturnImmediatelyStrategy as we've already taken the token - common_.put_wait_.wait([this] { return last_committed_index_.load(std::memory_order_relaxed) == p.write_index_cache_; }); + if constexpr (put_mode == ReturnMode::Blocking) { + auto expected_index = p.write_index_cache_; + common_.put_wait_.wait([this, expected_index] { return last_committed_index_.load(std::memory_order_relaxed) == expected_index; }); + } } last_committed_index_.store(++p.write_index_cache_, std::memory_order_release); - common_.get_wait_.notify(); - common_.put_wait_.notify(); + if constexpr (put_mode == ReturnMode::Blocking || get_mode == ReturnMode::Blocking) { + if constexpr (get_mode == ReturnMode::Blocking) { + common_.get_wait_.notify(); + } + if constexpr (put_mode == ReturnMode::Blocking) { + common_.put_wait_.notify(); + } + } - if constexpr (std::is_same::value) { + if constexpr (put_mode == ReturnMode::NonBlocking) { return true; } } @@ -55,7 +64,7 @@ class MPSC { get_t get() noexcept { while (consumer_.reader_index_2_ >= consumer_.last_committed_index_cache_) { consumer_.last_committed_index_cache_ = last_committed_index_.load(std::memory_order_relaxed); - if constexpr (std::is_same::value) { + if constexpr (get_mode == ReturnMode::NonBlocking) { return std::nullopt; } else { common_.get_wait_.wait([this] { return consumer_.reader_index_2_ < last_committed_index_.load(std::memory_order_relaxed); }); @@ -65,7 +74,11 @@ class MPSC { auto contents = contents_[consumer_.reader_index_2_ & common_.index_mask_]; consumer_.reader_index_.store(++consumer_.reader_index_2_, std::memory_order_release); - common_.put_wait_.notify(); + if constexpr (put_mode == ReturnMode::Blocking || get_mode == ReturnMode::Blocking) { + if constexpr (put_mode == ReturnMode::Blocking) { + common_.put_wait_.notify(); + } + } return contents; } @@ -77,8 +90,6 @@ class MPSC { bool isEmpty() const noexcept { return consumer_.reader_index_.load(std::memory_order_acquire) >= last_committed_index_.load(std::memory_order_acquire); } bool isFull() const noexcept { - // this isFull is about whether there's all writer slots to the buffer are taken rather than whether those - // changes have actually been committed return next_free_index_.load(std::memory_order_acquire) > (consumer_.reader_index_.load(std::memory_order_acquire) + common_.index_mask_); } @@ -89,8 +100,11 @@ class MPSC { alignas(hardware_destructive_interference_size) std::atomic last_committed_index_{0}; struct alignas(hardware_destructive_interference_size) Common { - GetWaitStrategy get_wait_{}; - PutWaitStrategy put_wait_{}; + static_assert(put_mode == ReturnMode::NonBlocking || std::is_base_of_v, PutWaitStrategy>); + static_assert(get_mode == ReturnMode::NonBlocking || std::is_base_of_v, GetWaitStrategy>); + + typename std::conditional::type get_wait_{}; + typename std::conditional::type put_wait_{}; const std::size_t index_mask_ = roundUpNextPowerOfTwo(min_size) - 1; }; diff --git a/include/spsc.hpp b/include/spsc.hpp index e83bfaf..c384edc 100644 --- a/include/spsc.hpp +++ b/include/spsc.hpp @@ -6,37 +6,42 @@ #include #include #include +#include #include "common.hpp" #include "wait_strategy.hpp" namespace fastchan { -template +template class SPSC { public: - using put_t = typename std::conditional::value, void, bool>::type; - using get_t = typename std::conditional::value, T, std::optional>::type; + using put_t = typename std::conditional::type; + using get_t = typename std::conditional>::type; SPSC() = default; put_t put(const T &value) noexcept { while (producer_.next_free_index_2_ > (producer_.reader_index_cache_ + common_.index_mask_)) { producer_.reader_index_cache_ = consumer_.reader_index_.load(std::memory_order_acquire); - if constexpr (std::is_same::value) { + if constexpr (put_mode == ReturnMode::NonBlocking) { return false; } else { common_.put_wait_.wait( [this] { return producer_.next_free_index_2_ <= (consumer_.reader_index_.load(std::memory_order_acquire) + common_.index_mask_); }); } } - contents_[producer_.next_free_index_2_ & common_.index_mask_] = value; producer_.next_free_index_.store(++producer_.next_free_index_2_, std::memory_order_release); - common_.get_wait_.notify(); + if constexpr (put_mode == ReturnMode::Blocking || get_mode == ReturnMode::Blocking) { + if constexpr (get_mode == ReturnMode::Blocking) { + common_.get_wait_.notify(); + } + } - if constexpr (std::is_same::value) { + if constexpr (put_mode == ReturnMode::NonBlocking) { return true; } } @@ -44,17 +49,20 @@ class SPSC { get_t get() noexcept { while (consumer_.reader_index_2_ >= consumer_.next_free_index_cache_) { consumer_.next_free_index_cache_ = producer_.next_free_index_.load(std::memory_order_acquire); - if constexpr (std::is_same::value) { + if constexpr (get_mode == ReturnMode::NonBlocking) { return std::nullopt; } else { common_.get_wait_.wait([this] { return consumer_.reader_index_2_ < producer_.next_free_index_.load(std::memory_order_acquire); }); } } - auto contents = contents_[consumer_.reader_index_2_ & common_.index_mask_]; consumer_.reader_index_.store(++consumer_.reader_index_2_, std::memory_order_release); - common_.put_wait_.notify(); + if constexpr (put_mode == ReturnMode::Blocking || get_mode == ReturnMode::Blocking) { + if constexpr (put_mode == ReturnMode::Blocking) { + common_.put_wait_.notify(); + } + } return contents; } @@ -75,8 +83,11 @@ class SPSC { std::array contents_; struct alignas(hardware_destructive_interference_size) Common { - GetWaitStrategy get_wait_{}; - PutWaitStrategy put_wait_{}; + static_assert(put_mode == ReturnMode::NonBlocking || std::is_base_of_v, PutWaitStrategy>); + static_assert(get_mode == ReturnMode::NonBlocking || std::is_base_of_v, GetWaitStrategy>); + + typename std::conditional::type put_wait_{}; + typename std::conditional::type get_wait_{}; const std::size_t index_mask_ = roundUpNextPowerOfTwo(min_size) - 1; }; @@ -96,5 +107,5 @@ class SPSC { Producer producer_; Consumer consumer_; }; -} // namespace fastchan +} // namespace fastchan diff --git a/include/wait_strategy.hpp b/include/wait_strategy.hpp index 959ec99..bf585de 100644 --- a/include/wait_strategy.hpp +++ b/include/wait_strategy.hpp @@ -2,7 +2,6 @@ #include #include #include - #include "common.hpp" #ifndef FASTCHANWAIT_HPP @@ -10,6 +9,12 @@ namespace fastchan { +// Return mode enum to distinguish between blocking and non-blocking behavior +enum class ReturnMode { + Blocking, + NonBlocking +}; + // WaitStrategyInterface is the interface for actual implementation of a wait strategy handler template class WaitStrategyInterface { @@ -19,13 +24,6 @@ class WaitStrategyInterface { inline void notify() {} }; -class ReturnImmediateStrategy : public WaitStrategyInterface { - public: - template - inline void wait(Predicate p) {} - inline void notify() {} -}; - class NoOpWaitStrategy : public WaitStrategyInterface { public: template @@ -51,16 +49,14 @@ class YieldWaitStrategy : public WaitStrategyInterface { inline void notify() {} }; -class CVWaitStrategy : public WaitStrategyInterface { +class CVWaitStrategy : public WaitStrategyInterface { public: template inline void wait(Predicate p) { std::unique_lock lock(mutex_); cv_.wait_for(lock, std::chrono::nanoseconds(100), p); } - inline void notify() { cv_.notify_all(); } - private: std::condition_variable cv_; std::mutex mutex_; diff --git a/test/fastchan_mpsc_test.cpp b/test/fastchan_mpsc_test.cpp index b768f5e..2368338 100644 --- a/test/fastchan_mpsc_test.cpp +++ b/test/fastchan_mpsc_test.cpp @@ -11,16 +11,17 @@ using namespace std::chrono_literals; const auto IterationsMultiplier = 100; -template +template void testMPSCSingleThreaded_Fill() { constexpr std::size_t chan_size = (iterations / 2) + 1; - fastchan::MPSC chan; + fastchan::MPSC chan; assert(chan.size() == 0); assert(chan.isEmpty() == true); // Test filling up with a single thread for (int i = 0; i < iterations; ++i) { - if constexpr (std::is_same::value) { + if constexpr (put_mode == fastchan::ReturnMode::NonBlocking) { auto result = false; do { result = chan.put(i); @@ -44,10 +45,11 @@ void testMPSCSingleThreaded_Fill() { assert(chan.isEmpty() == false); } -template +template void testMPSCSingleThreaded_PutGet() { constexpr std::size_t chan_size = (iterations / 2) + 1; - fastchan::MPSC chan; + fastchan::MPSC chan; assert(chan.size() == 0); assert(chan.isEmpty() == true); @@ -55,7 +57,7 @@ void testMPSCSingleThreaded_PutGet() { // Test put and get with a single thread for (int i = 0; i < iterations; ++i) { - if constexpr (std::is_same::value) { + if constexpr (put_mode == fastchan::ReturnMode::NonBlocking) { auto result = false; do { result = chan.put(i); @@ -68,7 +70,7 @@ void testMPSCSingleThreaded_PutGet() { } for (int i = 0; i < iterations; ++i) { - if constexpr (std::is_same::value) { + if constexpr (get_mode == fastchan::ReturnMode::NonBlocking) { auto&& val = chan.get(); while (!val) { val = chan.get(); @@ -85,16 +87,17 @@ void testMPSCSingleThreaded_PutGet() { assert(chan.size() == 0); } -template +template void testMPSCMultiThreadedSingleProducer() { constexpr std::size_t chan_size = (iterations / 2) + 1; - fastchan::MPSC chan; + fastchan::MPSC chan; auto total_iterations = IterationsMultiplier * iterations; // Test put and get with multiple threads std::thread producer([&] { for (int i = 1; i <= total_iterations; ++i) { - if constexpr (std::is_same::value) { + if constexpr (put_mode == fastchan::ReturnMode::NonBlocking) { auto result = false; do { result = chan.put(i); @@ -107,7 +110,7 @@ void testMPSCMultiThreadedSingleProducer() { std::thread consumer([&] { for (int i = 1; i <= total_iterations;) { - if constexpr (std::is_same::value) { + if constexpr (get_mode == fastchan::ReturnMode::NonBlocking) { auto&& val = chan.get(); while (!val) { val = chan.get(); @@ -129,10 +132,11 @@ void testMPSCMultiThreadedSingleProducer() { assert(chan.size() == 0); } -template +template void testMPSCMultiThreadedMultiProducer() { constexpr std::size_t chan_size = (iterations / 2) + 1; - fastchan::MPSC chan; + fastchan::MPSC chan; size_t total_iterations = IterationsMultiplier * iterations; size_t total = num_threads * (total_iterations * (total_iterations + 1) / 2); @@ -143,7 +147,7 @@ void testMPSCMultiThreadedMultiProducer() { // Test put and get with multiple threads producers[i] = std::thread([&] { for (int i = 1; i <= total_iterations; ++i) { - if constexpr (std::is_same::value) { + if constexpr (put_mode == fastchan::ReturnMode::NonBlocking) { auto result = false; do { result = chan.put(i); @@ -157,7 +161,7 @@ void testMPSCMultiThreadedMultiProducer() { std::thread consumer([&] { for (int i = 1; i <= total_iterations * num_threads;) { - if constexpr (std::is_same::value) { + if constexpr (get_mode == fastchan::ReturnMode::NonBlocking) { auto&& val = chan.get(); while (!val) { val = chan.get(); @@ -182,44 +186,47 @@ void testMPSCMultiThreadedMultiProducer() { assert(chan.size() == 0); } -template +template void testMPSC() { - testMPSCSingleThreaded_Fill<4, put_wait_type, get_wait_type>(); - testMPSCSingleThreaded_PutGet<4, put_wait_type, get_wait_type>(); - testMPSCMultiThreadedSingleProducer<4, put_wait_type, get_wait_type>(); + testMPSCSingleThreaded_Fill<4, put_mode, get_mode, put_wait_type, get_wait_type>(); + testMPSCSingleThreaded_PutGet<4, put_mode, get_mode, put_wait_type, get_wait_type>(); + testMPSCMultiThreadedSingleProducer<4, put_mode, get_mode, put_wait_type, get_wait_type>(); if (std::thread::hardware_concurrency() > 5) { - testMPSCMultiThreadedMultiProducer<4, 3, put_wait_type, get_wait_type>(); - testMPSCMultiThreadedMultiProducer<4, 5, put_wait_type, get_wait_type>(); + testMPSCMultiThreadedMultiProducer<4, 3, put_mode, get_mode, put_wait_type, get_wait_type>(); + testMPSCMultiThreadedMultiProducer<4, 5, put_mode, get_mode, put_wait_type, get_wait_type>(); } else { - testMPSCMultiThreadedMultiProducer<4, 2, put_wait_type, get_wait_type>(); + testMPSCMultiThreadedMultiProducer<4, 2, put_mode, get_mode, put_wait_type, get_wait_type>(); } - testMPSCSingleThreaded_Fill<4096, put_wait_type, get_wait_type>(); - testMPSCSingleThreaded_PutGet<4096, put_wait_type, get_wait_type>(); - testMPSCMultiThreadedSingleProducer<4096, put_wait_type, get_wait_type>(); + testMPSCSingleThreaded_Fill<4096, put_mode, get_mode, put_wait_type, get_wait_type>(); + testMPSCSingleThreaded_PutGet<4096, put_mode, get_mode, put_wait_type, get_wait_type>(); + testMPSCMultiThreadedSingleProducer<4096, put_mode, get_mode, put_wait_type, get_wait_type>(); if (std::thread::hardware_concurrency() > 5) { - testMPSCMultiThreadedMultiProducer<4096, 3, put_wait_type, get_wait_type>(); - testMPSCMultiThreadedMultiProducer<4096, 5, put_wait_type, get_wait_type>(); + testMPSCMultiThreadedMultiProducer<4096, 3, put_mode, get_mode, put_wait_type, get_wait_type>(); + testMPSCMultiThreadedMultiProducer<4096, 5, put_mode, get_mode, put_wait_type, get_wait_type>(); } else { - testMPSCMultiThreadedMultiProducer<4096, 2, put_wait_type, get_wait_type>(); + testMPSCMultiThreadedMultiProducer<4096, 2, put_mode, get_mode, put_wait_type, get_wait_type>(); } } int main() { - testMPSC(); - testMPSC(); - testMPSC(); - testMPSC(); - - testMPSC(); - testMPSC(); - testMPSC(); - testMPSC(); - - testMPSC(); - testMPSC(); - testMPSC(); - testMPSC(); + using namespace fastchan; + + testMPSC(); + testMPSC(); + testMPSC(); + testMPSC(); + + testMPSC(); + testMPSC(); + testMPSC(); + testMPSC(); + + testMPSC(); + testMPSC(); + testMPSC(); + testMPSC(); return 0; } diff --git a/test/fastchan_spsc_test.cpp b/test/fastchan_spsc_test.cpp index 7496504..23b7ff4 100644 --- a/test/fastchan_spsc_test.cpp +++ b/test/fastchan_spsc_test.cpp @@ -8,15 +8,16 @@ using namespace std::chrono_literals; const auto IterationsMultiplier = 100; -template +template void testSPSCSingleThreaded_Fill() { constexpr std::size_t chan_size = (iterations / 2) + 1; - fastchan::SPSC chan; + fastchan::SPSC chan; assert(chan.size() == 0); assert(chan.isEmpty() == true); for (int i = 0; i < iterations; ++i) { - if constexpr (std::is_same::value) { + if constexpr (put_mode == fastchan::ReturnMode::NonBlocking) { assert(chan.put(i)); } else { chan.put(i); @@ -35,10 +36,11 @@ void testSPSCSingleThreaded_Fill() { assert(chan.isEmpty() == false); } -template +template void testSPSCSingleThreaded_PutGet() { constexpr std::size_t chan_size = (iterations / 2) + 1; - fastchan::SPSC chan; + fastchan::SPSC chan; assert(chan.size() == 0); assert(chan.isEmpty() == true); @@ -46,7 +48,7 @@ void testSPSCSingleThreaded_PutGet() { // Test put and get with a single thread for (int i = 0; i < iterations; ++i) { - if constexpr (std::is_same::value) { + if constexpr (put_mode == fastchan::ReturnMode::NonBlocking) { auto result = false; do { result = chan.put(i); @@ -57,7 +59,7 @@ void testSPSCSingleThreaded_PutGet() { } for (int i = 0; i < iterations; ++i) { - if constexpr (std::is_same::value) { + if constexpr (get_mode == fastchan::ReturnMode::NonBlocking) { auto val = chan.get(); while (val == std::nullopt) val = chan.get(); assert(val == i); @@ -70,17 +72,18 @@ void testSPSCSingleThreaded_PutGet() { assert(chan.size() == 0); } -template +template void testSPSCMultiThreaded() { constexpr std::size_t chan_size = (iterations / 2) + 1; - fastchan::SPSC chan; + fastchan::SPSC chan; auto total_iterations = IterationsMultiplier * iterations; // Test put and get with multiple threads std::thread producer([&] { for (int i = 1; i <= total_iterations; ++i) { - if constexpr (std::is_same::value) { + if constexpr (put_mode == fastchan::ReturnMode::NonBlocking) { auto result = false; do { result = chan.put(i); @@ -94,7 +97,7 @@ void testSPSCMultiThreaded() { std::thread consumer([&] { for (int i = 1; i <= total_iterations;) { - if constexpr (std::is_same::value) { + if constexpr (get_mode == fastchan::ReturnMode::NonBlocking) { auto&& val = chan.get(); while (!val) { val = chan.get(); @@ -117,43 +120,46 @@ void testSPSCMultiThreaded() { assert(chan.size() == 0); } -template +template void testSPSC() { - testSPSCSingleThreaded_Fill<4096, put_wait_type, get_wait_type>(); - testSPSCSingleThreaded_PutGet<4096, put_wait_type, get_wait_type>(); - testSPSCMultiThreaded<4096, put_wait_type, get_wait_type>(); + testSPSCSingleThreaded_Fill<4096, put_mode, get_mode, put_wait_type, get_wait_type>(); + testSPSCSingleThreaded_PutGet<4096, put_mode, get_mode, put_wait_type, get_wait_type>(); + testSPSCMultiThreaded<4096, put_mode, get_mode, put_wait_type, get_wait_type>(); } int main() { - testSPSC(); - testSPSC(); - testSPSC(); - testSPSC(); - testSPSC(); - - testSPSC(); - testSPSC(); - testSPSC(); - testSPSC(); - testSPSC(); - - testSPSC(); - testSPSC(); - testSPSC(); - testSPSC(); - testSPSC(); - - testSPSC(); - testSPSC(); - testSPSC(); - testSPSC(); - testSPSC(); - - testSPSC(); - testSPSC(); - testSPSC(); - testSPSC(); - testSPSC(); + using namespace fastchan; + + testSPSC(); + testSPSC(); + testSPSC(); + testSPSC(); + testSPSC(); + + testSPSC(); + testSPSC(); + testSPSC(); + testSPSC(); + testSPSC(); + + testSPSC(); + testSPSC(); + testSPSC(); + testSPSC(); + testSPSC(); + + testSPSC(); + testSPSC(); + testSPSC(); + testSPSC(); + testSPSC(); + + testSPSC(); + testSPSC(); + testSPSC(); + testSPSC(); + testSPSC(); return 0; }