Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 31 additions & 41 deletions include/deprecated/dmn-limit-blockingqueue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,89 +62,79 @@ class Dmn_Limit_BlockingQueue : private Dmn_BlockingQueue<T> {
operator=(Dmn_Limit_BlockingQueue<T> &&obj) = delete;

/**
* @brief The method will pop and return front item from the queue or the
* caller is blocked waiting if the queue is empty.
* @brief Pop and return the front item, blocking until one is available.
*
* @return front item of the queue
* @return The front item of the queue.
*/
auto pop() -> T;

/**
* @brief Return true or false if the m_size is zero.
* @brief Return @c true if the queue contains no items.
*
* @return True if m_size is 0 or false otherwise.
* @return @c true when @c m_size is 0, @c false otherwise.
*/
auto empty() -> bool;

/**
* @brief The method will pop and return front item from the queue or the
* std::nullopt if the queue is empty.
* @brief Pop and return the front item without blocking.
*
* @return optional item from the front of the queue
* @return An optional containing the front item, or @c std::nullopt if
* the queue is empty.
*/
auto popNoWait() -> std::optional<T>;

/**
* @brief The method will push the item into queue using move semantics
* unless noexcept is false. The caller is blocked waiting if the
* queue is full.
* @brief Enqueue an rvalue item, preferring move semantics.
*
* @param item The item to be pushed into queue
* Blocks if the queue is at maximum capacity until space becomes available.
*
* @param item The item to enqueue (moved when the move constructor is
* noexcept).
*/
void push(T &&item);

/**
* @brief The method will push the item into queue using move semantics if
* move is true. The caller is blocked waiting if the queue is full.
* @brief Enqueue an lvalue item, optionally using move semantics.
*
* Blocks if the queue is at maximum capacity until space becomes available.
*
* @param item The item to be pushed into queue
* @param move True if use move semantic or false otherwise
* @param item The item to enqueue.
* @param move If @c true, move @p item into the queue; otherwise copy it.
*/
void push(T &item, bool move = true);

/**
* @brief The method returns the number of items held in the queue now.
* @brief Return a snapshot of the current number of items in the queue.
*
* @return The number of items held in the queue now
* @return The number of items currently held in the queue.
*/
auto size() -> size_t;

/**
* @brief The method will put the client on blocking wait until
* the queue is empty, it returns number of items that
* were passed through the queue in total.
* @brief Block until the queue is empty and return the total number of items
* that have passed through it.
*
* @return The number of items that were passed through the queue
* in total
* @return The cumulative number of items that have been pushed and popped.
*/
auto waitForEmpty() -> uint64_t override;

private:
/**
* @brief The method will pop front item from the queue and return it
* or block waiting for item if the queue is empty and wait is
* true.
* @brief Internal pop helper that optionally blocks waiting for an item.
*
* @param wait The caller is blocked waiting for item if queue is empty
* and wait is true, otherwise returning std::nullopt
* @param wait If @c true, block until an item is available; if @c false,
* return @c std::nullopt immediately when the queue is empty.
*
* @return optional value from front item of the queue
* @return An optional containing the front item, or @c std::nullopt.
*/
auto popOptional(bool wait) -> std::optional<T> override;

private:
/**
* data members for constructor to instantiate the object.
*/
size_t m_max_capacity{1};

/**
* data members for internal logic.
*/
size_t m_size{0};
std::mutex m_mutex{};
std::condition_variable m_pop_cond{};
std::condition_variable m_push_cond{};
size_t m_max_capacity{1}; ///< Maximum number of items the queue may hold.
size_t m_size{0}; ///< Current number of items in the queue.
std::mutex m_mutex{}; ///< Protects all access to @c m_size and the underlying storage.
std::condition_variable m_pop_cond{}; ///< Signalled when a new item is available for consumers.
std::condition_variable m_push_cond{}; ///< Signalled when a slot becomes available for producers.
}; // class Dmn_Limit_BlockingQueue

template <typename T>
Expand Down
51 changes: 34 additions & 17 deletions include/dmn-blockingqueue-lf.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,29 +137,42 @@ class Dmn_BlockingQueue_Lf
* sure that we do not have a large number of retired nodes waiting to be
* freed.
*/
static constexpr uint64_t s_epochTimeScale{500};
static constexpr uint64_t s_epochIdScale{2};
static constexpr uint32_t s_epochDataSize{50};

static constexpr uint64_t s_epochTimeScale{
500}; ///< API-call interval between epoch advances.
static constexpr uint64_t s_epochIdScale{
2}; ///< Consecutive epoch IDs mapped to a single bucket.
static constexpr uint32_t s_epochDataSize{
50}; ///< Number of epoch reclamation buckets.

/** @brief Atomically updated epoch metadata (cache-line aligned to avoid
* false sharing). */
struct alignas(16) EpochData {
uint64_t m_in_flight_total{};
uint64_t m_id{};
uint64_t m_in_flight_total{}; ///< Global in-flight API-call count when the
///< current epoch started.
uint64_t m_id{}; ///< Monotonically increasing epoch identifier.
};

/** @brief Internal linked-list node holding a stored item. */
struct Node {
T m_data{};
std::atomic<Node *> m_next{nullptr};
std::atomic<Node *> m_retired_next{nullptr};
T m_data{}; ///< The stored queue element.
std::atomic<Node *> m_next{nullptr}; ///< Next node in the live queue chain.
std::atomic<Node *> m_retired_next{
nullptr}; ///< Next node in the epoch retired list.
};

public:
using Dmn_BlockingQueue<Dmn_BlockingQueue_Lf<T>, T>::isShutdown;
using Dmn_BlockingQueue<Dmn_BlockingQueue_Lf<T>, T>::pop;
using Dmn_BlockingQueue<Dmn_BlockingQueue_Lf<T>, T>::push;

/** @brief Default constructor: initialises the sentinel head/tail node and
* epoch bookkeeping structures. */
Dmn_BlockingQueue_Lf();

/** @brief Construct and populate the queue from an initializer list. */
Dmn_BlockingQueue_Lf(std::initializer_list<T> list);

/** @brief Destroy the queue; triggers shutdown and reclaims all nodes. */
virtual ~Dmn_BlockingQueue_Lf() noexcept;

Dmn_BlockingQueue_Lf(const Dmn_BlockingQueue_Lf<T> &obj) = delete;
Expand Down Expand Up @@ -327,25 +340,29 @@ class Dmn_BlockingQueue_Lf
}

/**
* @brief The method frees a chain of nodes.
* @brief Free all nodes in a forward-linked list via @c m_next pointers.
*
* @param head The head pointer to a chain of nodes.
* @param head Pointer to the first node in the chain to free.
* @return Number of nodes freed.
*/
auto freeNodeList(Node *head) -> uint64_t;

/**
* @brief The method frees a chain of retired nodes.
* @brief Free all nodes in a retired-node list linked via
* @c m_retired_next pointers.
*
* @param head The head pointer to a chain of retired nodes.
* @param head Pointer to the first node in the retired chain to free.
* @return Number of nodes freed.
*/
auto freeRetiredNodeList(Node *head) -> uint64_t;

/**
* @brief The method returns a node into epoch based reclamation
* blocks, and free it later.
* @brief Defer deletion of @p node by placing it on the epoch's retired
* list; the node is freed when the epoch's in-flight count reaches
* zero.
*
* @param epochIndex Index to the epoch block to retire the node.
* @param node Pointer to the node to be free.
* @param epochIndex Index of the epoch bucket to which @p node is retired.
* @param node Pointer to the node to retire.
*/
void retireNode(uint64_t epochIndex, Node *node);

Expand Down
8 changes: 5 additions & 3 deletions include/dmn-blockingqueue-mt.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,12 @@ class Dmn_BlockingQueue_Mt

protected:
/**
* @brief Return true if the queue is stop (m_shutdown_flag is true), or false
* otherwise. The method is delegation of Dmn_Inflight_Guard module.
* @brief Return @c true if the queue has been shut down
* (@c m_shutdown_flag is set), @c false otherwise.
*
* @return true or false that the queue is shutdown.
* Implements the @c Dmn_Inflight_Guard closed-state predicate.
*
* @return @c true when the queue is shut down.
*/
auto isInflightGuardClosed() -> bool override;

Expand Down
2 changes: 1 addition & 1 deletion include/dmn-debug.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright © 2025 Chee Bin HOH. All rights reserved.
* Copyright © 2025 Chee Bin HOH. All rights reserved.
*
* @file include/dmn-debug.hpp
* @brief Lightweight debug-print macro controlled by the preprocessor.
Expand Down
32 changes: 16 additions & 16 deletions include/dmn-dmesg.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,29 +254,29 @@ class Dmn_DMesg : public Dmn_Pub<dmn::DMesgPb> {
Dmn_DMesgHandler &operator=(Dmn_DMesgHandler &&obj) = delete;

/**
* @brief The method returns yes or not if the handler is in conflict
* @brief Check whether this handler is currently in a conflict state.
*
* @param topic the topic to check if it is in conflict, or "" any topic.
* @param topic The topic to check, or an empty string to check any topic.
*
* @return True or False if the handler is in conflict state from last
* written message
* @return @c true if the handler is in conflict for the given topic (or for
* any topic when @p topic is empty), @c false otherwise.
*/
auto isInConflict(std::string_view topic = "") -> bool;

/**
* @brief The method returns running counter of the topic.
* @brief Return the current running counter for the given topic.
*
* @param topic The topic
* @param topic The topic whose running counter is queried.
*
* @return The running counter of the topic
* @return The running counter value for @p topic.
*/
auto getTopicRunningCounter(std::string_view topic) -> uint64_t;

/**
* @brief The method sets running counter of the topic.
* @brief Set the running counter for the given topic.
*
* @param topic The topic
* @param runningCounter The running counter to be set for the topic
* @param topic The topic whose running counter is to be updated.
* @param runningCounter The new counter value to assign.
*/
void setTopicRunningCounter(std::string_view topic,
uint64_t runningCounter);
Expand Down Expand Up @@ -399,19 +399,19 @@ class Dmn_DMesg : public Dmn_Pub<dmn::DMesgPb> {

protected:
/**
* @brief The method returns running counter of the topic.
* @brief Return the running counter for the given topic (internal helper).
*
* @param topic The topic
* @param topic The topic whose running counter is queried.
*
* @return The running counter of the topic
* @return The running counter value for @p topic.
*/
auto getTopicRunningCounterInternal(std::string_view topic) -> uint64_t;

/**
* @brief The method sets running counter of the topic.
* @brief Set the running counter for the given topic (internal helper).
*
* @param topic The topic
* @param runningCounter The running counter to be set for the topic
* @param topic The topic whose running counter is to be updated.
* @param runningCounter The new counter value to assign.
*/
void setTopicRunningCounterInternal(std::string_view topic,
uint64_t runningCounter);
Expand Down
Loading
Loading