Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions cmake/libe3Sources.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ set(LIBE3_PUBLIC_HEADERS
include/libe3/e3_encoder.hpp
include/libe3/mpmc_queue.hpp
include/libe3/subscription_manager.hpp
include/libe3/response_queue.hpp
include/libe3/lockfree_queue.hpp
include/libe3/sm_interface.hpp
include/libe3/e3_interface.hpp
include/libe3/e3_agent.hpp
Expand All @@ -23,7 +23,6 @@ set(LIBE3_SOURCES
src/core/e3_agent.cpp
src/core/e3_interface.cpp
src/core/subscription_manager.cpp
src/core/response_queue.cpp
src/core/sm_registry.cpp

# Encoder
Expand Down
5 changes: 5 additions & 0 deletions cmake/libe3Tests.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ foreach(test_src IN LISTS LIBE3_TEST_SOURCES)
libe3_warnings
libe3_sanitizers
)

if(LIBE3_ENABLE_ZMQ AND TARGET PkgConfig::ZMQ)
target_link_libraries(${target_name} PRIVATE PkgConfig::ZMQ)
endif()

add_test(NAME ${target_name} COMMAND ${target_name})
endforeach()

Expand Down
20 changes: 16 additions & 4 deletions include/libe3/e3_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#include "e3_connector.hpp"
#include "e3_encoder.hpp"
#include "subscription_manager.hpp"
#include "response_queue.hpp"
#include "lockfree_queue.hpp"
#include "sm_interface.hpp"
#include "dapp_subscription_state.hpp"
#include <memory>
Expand Down Expand Up @@ -123,8 +123,8 @@ class E3Interface {
/**
* @brief Get the response queue for outbound messages
*/
ResponseQueue& response_queue() noexcept {
return *response_queue_;
LockFreeQueue<Pdu>& response_queue() noexcept {
return *response_queue_;
}

/**
Expand Down Expand Up @@ -212,7 +212,11 @@ class E3Interface {
std::unique_ptr<SubscriptionManager> subscription_manager_;
// dApp-only state (nullptr when role==RAN).
std::unique_ptr<DAppSubscriptionState> dapp_state_;
std::unique_ptr<ResponseQueue> response_queue_;
std::unique_ptr<LockFreeQueue<Pdu>> response_queue_;

// dApp-report queue: subscriber/inbound thread hands reports off to the
// report worker thread so downstream (OAI / iApp) work never blocks reads.
std::unique_ptr<LockFreeQueue<DAppReport>> report_queue_;

// Threads. setup_thread_ runs the setup loop (RAN: serves; dApp: drives once).
// inbound_thread_ / outbound_thread_ replace the old subscriber_thread_ /
Expand All @@ -222,6 +226,7 @@ class E3Interface {
std::unique_ptr<std::thread> inbound_thread_;
std::unique_ptr<std::thread> outbound_thread_;
std::unique_ptr<std::thread> sm_data_thread_;
std::unique_ptr<std::thread> report_worker_thread_;

// RAN-side handlers
DAppReportHandler dapp_report_handler_;
Expand Down Expand Up @@ -283,6 +288,13 @@ class E3Interface {
*/
void sm_data_handler_loop();

/**
* @brief Report worker thread - drains report_queue_ and invokes
* handle_dapp_report() off the subscriber thread so ZMQ reads
* are never blocked by downstream (OAI / iApp) work.
*/
void report_worker_loop();

// =========================================================================
// Message Handlers
// =========================================================================
Expand Down
210 changes: 210 additions & 0 deletions include/libe3/lockfree_queue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
/**
* @file lockfree_queue.hpp
* @brief Lock-free bounded blocking queue for the E3AP hot paths
*
* Wraps the lock-free MPMC ring buffer (see mpmc_queue.hpp) in a small,
* reusable object that adds blocking pop variants and shutdown semantics.
* It eliminates mutex contention on the hot publish path and reduces latency
* jitter in sub-millisecond control loops.
*
* libe3 uses two specialisations:
* - LockFreeQueue<Pdu> — outbound E3AP PDUs (publisher/outbound thread
* consumes; many threads produce).
* - LockFreeQueue<DAppReport> — inbound dApp reports handed from the RAN
* inbound thread to the report worker thread.
*
* SPDX-License-Identifier: Apache-2.0
*/

#ifndef LIBE3_LOCKFREE_QUEUE_HPP
#define LIBE3_LOCKFREE_QUEUE_HPP

#include "types.hpp"
#include "mpmc_queue.hpp"
#include "logger.hpp"
#include <atomic>
#include <optional>
#include <chrono>
#include <thread>

namespace libe3 {

/**
* @brief Lock-free bounded blocking queue.
*
* Built on an MPMC lock-free ring buffer. Blocking pop variants use an
* adaptive spin-wait strategy:
* 1. Spin with CPU pause hints (lowest latency, nanoseconds)
* 2. Thread yield (cooperative, microseconds)
* 3. Short sleep (50 µs) (idle wait, avoids busy-loop when quiet)
*
* @tparam T Element type. Must be default-constructible and movable.
*/
template<typename T>
class LockFreeQueue {
public:
/**
* @brief Construct a LockFreeQueue.
* @param capacity Minimum ring buffer capacity (rounded up to next power
* of two, default 128).
*/
explicit LockFreeQueue(size_t capacity = 128)
: ring_(capacity)
{
E3_LOG_DEBUG(LOG_TAG) << "Lock-free queue created, capacity="
<< ring_.capacity();
}

/**
* @brief Destructor – signals shutdown so any blocked pop() returns.
*/
~LockFreeQueue() {
shutdown();
E3_LOG_DEBUG(LOG_TAG) << "Lock-free queue destroyed";
}

// Non-copyable, non-movable
LockFreeQueue(const LockFreeQueue&) = delete;
LockFreeQueue& operator=(const LockFreeQueue&) = delete;
LockFreeQueue(LockFreeQueue&&) = delete;
LockFreeQueue& operator=(LockFreeQueue&&) = delete;

/**
* @brief Enqueue an item (non-blocking, lock-free).
* @return ErrorCode::SUCCESS on success.
* @return ErrorCode::BUFFER_TOO_SMALL if the ring buffer is full.
* @return ErrorCode::NOT_INITIALIZED if shutdown() has been called.
*/
ErrorCode push(T item) {
if (shutdown_.load(std::memory_order_relaxed)) {
return ErrorCode::NOT_INITIALIZED;
}

if (!ring_.try_push(std::move(item))) {
E3_LOG_WARN(LOG_TAG) << "Queue full, dropping message";
return ErrorCode::BUFFER_TOO_SMALL;
}

E3_LOG_TRACE(LOG_TAG) << "Pushed item";
return ErrorCode::SUCCESS;
}

/**
* @brief Dequeue an item, blocking indefinitely until one is available.
*
* Returns a default-constructed T{} if shutdown() is called while waiting.
*/
T pop() {
T item;

// Phase 1: CPU-pause spin (nanosecond latency when producer is fast)
for (size_t i = 0; i < SPIN_COUNT; ++i) {
if (ring_.try_pop(item)) return item;
if (shutdown_.load(std::memory_order_relaxed)) return T{};
cpu_relax();
}

// Phase 2: Cooperative yield (microsecond range)
for (size_t i = 0; i < YIELD_COUNT; ++i) {
if (ring_.try_pop(item)) return item;
if (shutdown_.load(std::memory_order_relaxed)) return T{};
std::this_thread::yield();
}

// Phase 3: Short sleep until data arrives or shutdown is signalled
while (true) {
if (ring_.try_pop(item)) return item;
if (shutdown_.load(std::memory_order_relaxed)) return T{};
std::this_thread::sleep_for(SLEEP_DURATION);
}
}

/**
* @brief Dequeue an item with a maximum wait duration.
* @return The item on success; std::nullopt on timeout or shutdown.
*/
std::optional<T> pop(std::chrono::milliseconds timeout) {
T item;
auto deadline = std::chrono::steady_clock::now() + timeout;

// Phase 1: CPU-pause spin
for (size_t i = 0; i < SPIN_COUNT; ++i) {
if (ring_.try_pop(item)) return item;
if (shutdown_.load(std::memory_order_relaxed)) return std::nullopt;
cpu_relax();
}

// Phase 2: Cooperative yield
for (size_t i = 0; i < YIELD_COUNT; ++i) {
if (ring_.try_pop(item)) return item;
if (shutdown_.load(std::memory_order_relaxed)) return std::nullopt;
std::this_thread::yield();
}

// Phase 3: Timed sleep
while (std::chrono::steady_clock::now() < deadline) {
if (ring_.try_pop(item)) return item;
if (shutdown_.load(std::memory_order_relaxed)) return std::nullopt;
std::this_thread::sleep_for(SLEEP_DURATION);
}

// One last attempt after deadline
if (ring_.try_pop(item)) return item;
return std::nullopt;
}

/**
* @brief Try to dequeue without blocking.
* @return The item if one was available; std::nullopt otherwise.
*/
std::optional<T> try_pop() {
T item;
if (ring_.try_pop(item)) return item;
return std::nullopt;
}

/** @brief Return true if the queue appears empty (approximate). */
bool empty() const { return ring_.empty_approx(); }

/** @brief Return the approximate number of items in the queue. */
size_t size() const { return ring_.size_approx(); }

/** @brief Return the ring buffer capacity (always a power of two). */
size_t capacity() const noexcept { return ring_.capacity(); }

/** @brief Discard all items currently in the queue. */
void clear() {
T item;
while (ring_.try_pop(item)) {}
E3_LOG_DEBUG(LOG_TAG) << "Queue cleared";
}

/**
* @brief Signal shutdown so that all blocked pop() calls return promptly.
*/
void shutdown() {
shutdown_.store(true, std::memory_order_relaxed);
E3_LOG_DEBUG(LOG_TAG) << "Queue shutdown signalled";
}

/** @brief Return true if shutdown() has been called. */
bool is_shutdown() const {
return shutdown_.load(std::memory_order_relaxed);
}

private:
static constexpr const char* LOG_TAG = "Queue";

MpmcQueue<T> ring_;
std::atomic<bool> shutdown_{false};

// Adaptive spin-wait tuning constants
static constexpr size_t SPIN_COUNT = 40; ///< CPU-pause iterations
static constexpr size_t YIELD_COUNT = 100; ///< thread-yield iterations
/// Sleep duration between attempts in the slow path (µs)
static constexpr auto SLEEP_DURATION = std::chrono::microseconds(50);
};

} // namespace libe3

#endif // LIBE3_LOCKFREE_QUEUE_HPP
2 changes: 1 addition & 1 deletion include/libe3/mpmc_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* - Zero locks / zero condition variables in the fast path
* - head_ and tail_ are on separate cache lines to eliminate false sharing
* - Capacity is rounded up to the nearest power of two for cheap masking
* - Provides try_push / try_pop (non-blocking) used by ResponseQueue
* - Provides try_push / try_pop (non-blocking) used by LockFreeQueue
*
* SPDX-License-Identifier: Apache-2.0
*/
Expand Down
Loading
Loading