Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions bench/simple_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,12 @@ struct alignas(hardware_destructive_interference_size) AlignedData {
operator int() const { return value; }
};

template <typename T, size_t min_size, class PutWaitStrategy = YieldWaitStrategy, class GetWaitStrategy = YieldWaitStrategy>
template <typename T, size_t min_size, class PutWaitStrategy = YieldWaitStrategy, class GetWaitStrategy = YieldWaitStrategy,
ReturnMode put_mode = ReturnMode::Blocking, ReturnMode get_mode = ReturnMode::Blocking>
class RigtorpSPSC {
public:
using put_t = typename std::conditional<!std::is_same<PutWaitStrategy, ReturnImmediateStrategy>::value, void, bool>::type;
using get_t = typename std::conditional<!std::is_same<GetWaitStrategy, ReturnImmediateStrategy>::value, T, std::optional<T>>::type;
using put_t = typename std::conditional<put_mode == fastchan::ReturnMode::Blocking, void, bool>::type;
using get_t = typename std::conditional<get_mode == fastchan::ReturnMode::Blocking, T, std::optional<T>>::type;

RigtorpSPSC() = default;

Expand Down
46 changes: 30 additions & 16 deletions include/mpsc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@
#include <mutex>
#include <optional>
#include <thread>
#include <variant>

#include "common.hpp"
#include "wait_strategy.hpp"

namespace fastchan {

template <typename T, size_t min_size, class PutWaitStrategy = YieldWaitStrategy, class GetWaitStrategy = YieldWaitStrategy>
template <typename T, size_t min_size, class PutWaitStrategy = YieldWaitStrategy, class GetWaitStrategy = YieldWaitStrategy,
ReturnMode put_mode = ReturnMode::Blocking, ReturnMode get_mode = ReturnMode::Blocking>
class MPSC {
public:
using put_t = typename std::conditional<!std::is_same<PutWaitStrategy, ReturnImmediateStrategy>::value, void, bool>::type;
using get_t = typename std::conditional<!std::is_same<GetWaitStrategy, ReturnImmediateStrategy>::value, T, std::optional<T>>::type;
using put_t = typename std::conditional<put_mode == ReturnMode::Blocking, void, bool>::type;
using get_t = typename std::conditional<get_mode == ReturnMode::Blocking, T, std::optional<T>>::type;

MPSC() = default;

Expand All @@ -24,7 +26,7 @@ class MPSC {
while (p.write_index_cache_ > (p.reader_index_cache_ + common_.index_mask_)) {
p.write_index_cache_ = next_free_index_.load(std::memory_order_acquire);
p.reader_index_cache_ = consumer_.reader_index_.load(std::memory_order_relaxed);
if constexpr (std::is_same<PutWaitStrategy, ReturnImmediateStrategy>::value) {
if constexpr (put_mode == ReturnMode::NonBlocking) {
return false;
} else {
common_.put_wait_.wait(
Expand All @@ -36,26 +38,33 @@ class MPSC {

contents_[p.write_index_cache_ & common_.index_mask_] = value;

// commit in the correct order to avoid problems
while (last_committed_index_.load(std::memory_order_relaxed) != p.write_index_cache_) {
// we don't return at this point even in case of ReturnImmediatelyStrategy as we've already taken the token
common_.put_wait_.wait([this] { return last_committed_index_.load(std::memory_order_relaxed) == p.write_index_cache_; });
if constexpr (put_mode == ReturnMode::Blocking) {
auto expected_index = p.write_index_cache_;
common_.put_wait_.wait([this, expected_index] { return last_committed_index_.load(std::memory_order_relaxed) == expected_index; });
}
}

last_committed_index_.store(++p.write_index_cache_, std::memory_order_release);

common_.get_wait_.notify();
common_.put_wait_.notify();
if constexpr (put_mode == ReturnMode::Blocking || get_mode == ReturnMode::Blocking) {
if constexpr (get_mode == ReturnMode::Blocking) {
common_.get_wait_.notify();
}
if constexpr (put_mode == ReturnMode::Blocking) {
common_.put_wait_.notify();
}
}

if constexpr (std::is_same<PutWaitStrategy, ReturnImmediateStrategy>::value) {
if constexpr (put_mode == ReturnMode::NonBlocking) {
return true;
}
}

get_t get() noexcept {
while (consumer_.reader_index_2_ >= consumer_.last_committed_index_cache_) {
consumer_.last_committed_index_cache_ = last_committed_index_.load(std::memory_order_relaxed);
if constexpr (std::is_same<GetWaitStrategy, ReturnImmediateStrategy>::value) {
if constexpr (get_mode == ReturnMode::NonBlocking) {
return std::nullopt;
} else {
common_.get_wait_.wait([this] { return consumer_.reader_index_2_ < last_committed_index_.load(std::memory_order_relaxed); });
Expand All @@ -65,7 +74,11 @@ class MPSC {
auto contents = contents_[consumer_.reader_index_2_ & common_.index_mask_];
consumer_.reader_index_.store(++consumer_.reader_index_2_, std::memory_order_release);

common_.put_wait_.notify();
if constexpr (put_mode == ReturnMode::Blocking || get_mode == ReturnMode::Blocking) {
if constexpr (put_mode == ReturnMode::Blocking) {
common_.put_wait_.notify();
}
}

return contents;
}
Expand All @@ -77,8 +90,6 @@ class MPSC {
bool isEmpty() const noexcept { return consumer_.reader_index_.load(std::memory_order_acquire) >= last_committed_index_.load(std::memory_order_acquire); }

bool isFull() const noexcept {
// this isFull is about whether there's all writer slots to the buffer are taken rather than whether those
// changes have actually been committed
return next_free_index_.load(std::memory_order_acquire) > (consumer_.reader_index_.load(std::memory_order_acquire) + common_.index_mask_);
}

Expand All @@ -89,8 +100,11 @@ class MPSC {
alignas(hardware_destructive_interference_size) std::atomic<std::size_t> last_committed_index_{0};

struct alignas(hardware_destructive_interference_size) Common {
GetWaitStrategy get_wait_{};
PutWaitStrategy put_wait_{};
static_assert(put_mode == ReturnMode::NonBlocking || std::is_base_of_v<WaitStrategyInterface<PutWaitStrategy>, PutWaitStrategy>);
static_assert(get_mode == ReturnMode::NonBlocking || std::is_base_of_v<WaitStrategyInterface<GetWaitStrategy>, GetWaitStrategy>);

typename std::conditional<get_mode == ReturnMode::Blocking, GetWaitStrategy, std::monostate>::type get_wait_{};
typename std::conditional<put_mode == ReturnMode::Blocking, PutWaitStrategy, std::monostate>::type put_wait_{};
const std::size_t index_mask_ = roundUpNextPowerOfTwo(min_size) - 1;
};

Expand Down
37 changes: 24 additions & 13 deletions include/spsc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,55 +6,63 @@
#include <optional>
#include <thread>
#include <type_traits>
#include <variant>

#include "common.hpp"
#include "wait_strategy.hpp"

namespace fastchan {

template <typename T, size_t min_size, class PutWaitStrategy = YieldWaitStrategy, class GetWaitStrategy = YieldWaitStrategy>
template <typename T, size_t min_size, class PutWaitStrategy = YieldWaitStrategy, class GetWaitStrategy = YieldWaitStrategy,
ReturnMode put_mode = ReturnMode::Blocking, ReturnMode get_mode = ReturnMode::Blocking>
class SPSC {
public:
using put_t = typename std::conditional<!std::is_same<PutWaitStrategy, ReturnImmediateStrategy>::value, void, bool>::type;
using get_t = typename std::conditional<!std::is_same<GetWaitStrategy, ReturnImmediateStrategy>::value, T, std::optional<T>>::type;
using put_t = typename std::conditional<put_mode == ReturnMode::Blocking, void, bool>::type;
using get_t = typename std::conditional<get_mode == ReturnMode::Blocking, T, std::optional<T>>::type;

SPSC() = default;

put_t put(const T &value) noexcept {
while (producer_.next_free_index_2_ > (producer_.reader_index_cache_ + common_.index_mask_)) {
producer_.reader_index_cache_ = consumer_.reader_index_.load(std::memory_order_acquire);
if constexpr (std::is_same<PutWaitStrategy, ReturnImmediateStrategy>::value) {
if constexpr (put_mode == ReturnMode::NonBlocking) {
return false;
} else {
common_.put_wait_.wait(
[this] { return producer_.next_free_index_2_ <= (consumer_.reader_index_.load(std::memory_order_acquire) + common_.index_mask_); });
}
}

contents_[producer_.next_free_index_2_ & common_.index_mask_] = value;
producer_.next_free_index_.store(++producer_.next_free_index_2_, std::memory_order_release);

common_.get_wait_.notify();
if constexpr (put_mode == ReturnMode::Blocking || get_mode == ReturnMode::Blocking) {
if constexpr (get_mode == ReturnMode::Blocking) {
common_.get_wait_.notify();
}
}

if constexpr (std::is_same<PutWaitStrategy, ReturnImmediateStrategy>::value) {
if constexpr (put_mode == ReturnMode::NonBlocking) {
return true;
}
}

get_t get() noexcept {
while (consumer_.reader_index_2_ >= consumer_.next_free_index_cache_) {
consumer_.next_free_index_cache_ = producer_.next_free_index_.load(std::memory_order_acquire);
if constexpr (std::is_same<GetWaitStrategy, ReturnImmediateStrategy>::value) {
if constexpr (get_mode == ReturnMode::NonBlocking) {
return std::nullopt;
} else {
common_.get_wait_.wait([this] { return consumer_.reader_index_2_ < producer_.next_free_index_.load(std::memory_order_acquire); });
}
}

auto contents = contents_[consumer_.reader_index_2_ & common_.index_mask_];
consumer_.reader_index_.store(++consumer_.reader_index_2_, std::memory_order_release);

common_.put_wait_.notify();
if constexpr (put_mode == ReturnMode::Blocking || get_mode == ReturnMode::Blocking) {
if constexpr (put_mode == ReturnMode::Blocking) {
common_.put_wait_.notify();
}
}

return contents;
}
Expand All @@ -75,8 +83,11 @@ class SPSC {
std::array<T, roundUpNextPowerOfTwo(min_size)> contents_;

struct alignas(hardware_destructive_interference_size) Common {
GetWaitStrategy get_wait_{};
PutWaitStrategy put_wait_{};
static_assert(put_mode == ReturnMode::NonBlocking || std::is_base_of_v<WaitStrategyInterface<PutWaitStrategy>, PutWaitStrategy>);
static_assert(get_mode == ReturnMode::NonBlocking || std::is_base_of_v<WaitStrategyInterface<GetWaitStrategy>, GetWaitStrategy>);

typename std::conditional<put_mode == ReturnMode::Blocking, PutWaitStrategy, std::monostate>::type put_wait_{};
typename std::conditional<get_mode == ReturnMode::Blocking, GetWaitStrategy, std::monostate>::type get_wait_{};
const std::size_t index_mask_ = roundUpNextPowerOfTwo(min_size) - 1;
};

Expand All @@ -96,5 +107,5 @@ class SPSC {
Producer producer_;
Consumer consumer_;
};
} // namespace fastchan

} // namespace fastchan
18 changes: 7 additions & 11 deletions include/wait_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@
#include <condition_variable>
#include <ratio>
#include <thread>

#include "common.hpp"

#ifndef FASTCHANWAIT_HPP
#define FASTCHANWAIT_HPP

namespace fastchan {

// Return mode enum to distinguish between blocking and non-blocking behavior
enum class ReturnMode {
Blocking,
NonBlocking
};

// WaitStrategyInterface is the interface for actual implementation of a wait strategy handler
template <typename Implementation>
class WaitStrategyInterface {
Expand All @@ -19,13 +24,6 @@ class WaitStrategyInterface {
inline void notify() {}
};

class ReturnImmediateStrategy : public WaitStrategyInterface<ReturnImmediateStrategy> {
public:
template <class Predicate>
inline void wait(Predicate p) {}
inline void notify() {}
};

class NoOpWaitStrategy : public WaitStrategyInterface<NoOpWaitStrategy> {
public:
template <class Predicate>
Expand All @@ -51,16 +49,14 @@ class YieldWaitStrategy : public WaitStrategyInterface<YieldWaitStrategy> {
inline void notify() {}
};

class CVWaitStrategy : public WaitStrategyInterface<PauseWaitStrategy> {
class CVWaitStrategy : public WaitStrategyInterface<CVWaitStrategy> {
public:
template <class Predicate>
inline void wait(Predicate p) {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait_for(lock, std::chrono::nanoseconds(100), p);
}

inline void notify() { cv_.notify_all(); }

private:
std::condition_variable cv_;
std::mutex mutex_;
Expand Down
Loading