diff --git a/cmake/libe3Sources.cmake b/cmake/libe3Sources.cmake index 7a7970a..ec37912 100644 --- a/cmake/libe3Sources.cmake +++ b/cmake/libe3Sources.cmake @@ -9,7 +9,7 @@ set(LIBE3_PUBLIC_HEADERS include/libe3/e3_encoder.hpp include/libe3/mpmc_queue.hpp include/libe3/subscription_manager.hpp - include/libe3/response_queue.hpp + include/libe3/lockfree_queue.hpp include/libe3/sm_interface.hpp include/libe3/e3_interface.hpp include/libe3/e3_agent.hpp @@ -23,7 +23,6 @@ set(LIBE3_SOURCES src/core/e3_agent.cpp src/core/e3_interface.cpp src/core/subscription_manager.cpp - src/core/response_queue.cpp src/core/sm_registry.cpp # Encoder diff --git a/cmake/libe3Tests.cmake b/cmake/libe3Tests.cmake index a091569..03e9d10 100644 --- a/cmake/libe3Tests.cmake +++ b/cmake/libe3Tests.cmake @@ -36,6 +36,11 @@ foreach(test_src IN LISTS LIBE3_TEST_SOURCES) libe3_warnings libe3_sanitizers ) + + if(LIBE3_ENABLE_ZMQ AND TARGET PkgConfig::ZMQ) + target_link_libraries(${target_name} PRIVATE PkgConfig::ZMQ) + endif() + add_test(NAME ${target_name} COMMAND ${target_name}) endforeach() diff --git a/include/libe3/e3_interface.hpp b/include/libe3/e3_interface.hpp index 62706da..af00c61 100644 --- a/include/libe3/e3_interface.hpp +++ b/include/libe3/e3_interface.hpp @@ -16,7 +16,7 @@ #include "e3_connector.hpp" #include "e3_encoder.hpp" #include "subscription_manager.hpp" -#include "response_queue.hpp" +#include "lockfree_queue.hpp" #include "sm_interface.hpp" #include "dapp_subscription_state.hpp" #include @@ -123,8 +123,8 @@ class E3Interface { /** * @brief Get the response queue for outbound messages */ - ResponseQueue& response_queue() noexcept { - return *response_queue_; + LockFreeQueue& response_queue() noexcept { + return *response_queue_; } /** @@ -212,7 +212,11 @@ class E3Interface { std::unique_ptr subscription_manager_; // dApp-only state (nullptr when role==RAN). 
std::unique_ptr dapp_state_; - std::unique_ptr response_queue_; + std::unique_ptr> response_queue_; + + // dApp-report queue: subscriber/inbound thread hands reports off to the + // report worker thread so downstream (OAI / iApp) work never blocks reads. + std::unique_ptr> report_queue_; // Threads. setup_thread_ runs the setup loop (RAN: serves; dApp: drives once). // inbound_thread_ / outbound_thread_ replace the old subscriber_thread_ / @@ -222,6 +226,7 @@ class E3Interface { std::unique_ptr inbound_thread_; std::unique_ptr outbound_thread_; std::unique_ptr sm_data_thread_; + std::unique_ptr report_worker_thread_; // RAN-side handlers DAppReportHandler dapp_report_handler_; @@ -283,6 +288,13 @@ class E3Interface { */ void sm_data_handler_loop(); + /** + * @brief Report worker thread - drains report_queue_ and invokes + * handle_dapp_report() off the subscriber thread so ZMQ reads + * are never blocked by downstream (OAI / iApp) work. + */ + void report_worker_loop(); + // ========================================================================= // Message Handlers // ========================================================================= diff --git a/include/libe3/lockfree_queue.hpp b/include/libe3/lockfree_queue.hpp new file mode 100644 index 0000000..fd86fcc --- /dev/null +++ b/include/libe3/lockfree_queue.hpp @@ -0,0 +1,210 @@ +/** + * @file lockfree_queue.hpp + * @brief Lock-free bounded blocking queue for the E3AP hot paths + * + * Wraps the lock-free MPMC ring buffer (see mpmc_queue.hpp) in a small, + * reusable object that adds blocking pop variants and shutdown semantics. + * It eliminates mutex contention on the hot publish path and reduces latency + * jitter in sub-millisecond control loops. + * + * libe3 uses two specialisations: + * - LockFreeQueue — outbound E3AP PDUs (publisher/outbound thread + * consumes; many threads produce). + * - LockFreeQueue — inbound dApp reports handed from the RAN + * inbound thread to the report worker thread. 
+ * + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef LIBE3_LOCKFREE_QUEUE_HPP +#define LIBE3_LOCKFREE_QUEUE_HPP + +#include "types.hpp" +#include "mpmc_queue.hpp" +#include "logger.hpp" +#include +#include +#include +#include + +namespace libe3 { + +/** + * @brief Lock-free bounded blocking queue. + * + * Built on an MPMC lock-free ring buffer. Blocking pop variants use an + * adaptive spin-wait strategy: + * 1. Spin with CPU pause hints (lowest latency, nanoseconds) + * 2. Thread yield (cooperative, microseconds) + * 3. Short sleep (50 µs) (idle wait, avoids busy-loop when quiet) + * + * @tparam T Element type. Must be default-constructible and movable. + */ +template +class LockFreeQueue { +public: + /** + * @brief Construct a LockFreeQueue. + * @param capacity Minimum ring buffer capacity (rounded up to next power + * of two, default 128). + */ + explicit LockFreeQueue(size_t capacity = 128) + : ring_(capacity) + { + E3_LOG_DEBUG(LOG_TAG) << "Lock-free queue created, capacity=" + << ring_.capacity(); + } + + /** + * @brief Destructor – signals shutdown so any blocked pop() returns. + */ + ~LockFreeQueue() { + shutdown(); + E3_LOG_DEBUG(LOG_TAG) << "Lock-free queue destroyed"; + } + + // Non-copyable, non-movable + LockFreeQueue(const LockFreeQueue&) = delete; + LockFreeQueue& operator=(const LockFreeQueue&) = delete; + LockFreeQueue(LockFreeQueue&&) = delete; + LockFreeQueue& operator=(LockFreeQueue&&) = delete; + + /** + * @brief Enqueue an item (non-blocking, lock-free). + * @return ErrorCode::SUCCESS on success. + * @return ErrorCode::BUFFER_TOO_SMALL if the ring buffer is full. + * @return ErrorCode::NOT_INITIALIZED if shutdown() has been called. 
+ */ + ErrorCode push(T item) { + if (shutdown_.load(std::memory_order_relaxed)) { + return ErrorCode::NOT_INITIALIZED; + } + + if (!ring_.try_push(std::move(item))) { + E3_LOG_WARN(LOG_TAG) << "Queue full, dropping message"; + return ErrorCode::BUFFER_TOO_SMALL; + } + + E3_LOG_TRACE(LOG_TAG) << "Pushed item"; + return ErrorCode::SUCCESS; + } + + /** + * @brief Dequeue an item, blocking indefinitely until one is available. + * + * Returns a default-constructed T{} if shutdown() is called while waiting. + */ + T pop() { + T item; + + // Phase 1: CPU-pause spin (nanosecond latency when producer is fast) + for (size_t i = 0; i < SPIN_COUNT; ++i) { + if (ring_.try_pop(item)) return item; + if (shutdown_.load(std::memory_order_relaxed)) return T{}; + cpu_relax(); + } + + // Phase 2: Cooperative yield (microsecond range) + for (size_t i = 0; i < YIELD_COUNT; ++i) { + if (ring_.try_pop(item)) return item; + if (shutdown_.load(std::memory_order_relaxed)) return T{}; + std::this_thread::yield(); + } + + // Phase 3: Short sleep until data arrives or shutdown is signalled + while (true) { + if (ring_.try_pop(item)) return item; + if (shutdown_.load(std::memory_order_relaxed)) return T{}; + std::this_thread::sleep_for(SLEEP_DURATION); + } + } + + /** + * @brief Dequeue an item with a maximum wait duration. + * @return The item on success; std::nullopt on timeout or shutdown. 
+ */ + std::optional pop(std::chrono::milliseconds timeout) { + T item; + auto deadline = std::chrono::steady_clock::now() + timeout; + + // Phase 1: CPU-pause spin + for (size_t i = 0; i < SPIN_COUNT; ++i) { + if (ring_.try_pop(item)) return item; + if (shutdown_.load(std::memory_order_relaxed)) return std::nullopt; + cpu_relax(); + } + + // Phase 2: Cooperative yield + for (size_t i = 0; i < YIELD_COUNT; ++i) { + if (ring_.try_pop(item)) return item; + if (shutdown_.load(std::memory_order_relaxed)) return std::nullopt; + std::this_thread::yield(); + } + + // Phase 3: Timed sleep + while (std::chrono::steady_clock::now() < deadline) { + if (ring_.try_pop(item)) return item; + if (shutdown_.load(std::memory_order_relaxed)) return std::nullopt; + std::this_thread::sleep_for(SLEEP_DURATION); + } + + // One last attempt after deadline + if (ring_.try_pop(item)) return item; + return std::nullopt; + } + + /** + * @brief Try to dequeue without blocking. + * @return The item if one was available; std::nullopt otherwise. + */ + std::optional try_pop() { + T item; + if (ring_.try_pop(item)) return item; + return std::nullopt; + } + + /** @brief Return true if the queue appears empty (approximate). */ + bool empty() const { return ring_.empty_approx(); } + + /** @brief Return the approximate number of items in the queue. */ + size_t size() const { return ring_.size_approx(); } + + /** @brief Return the ring buffer capacity (always a power of two). */ + size_t capacity() const noexcept { return ring_.capacity(); } + + /** @brief Discard all items currently in the queue. */ + void clear() { + T item; + while (ring_.try_pop(item)) {} + E3_LOG_DEBUG(LOG_TAG) << "Queue cleared"; + } + + /** + * @brief Signal shutdown so that all blocked pop() calls return promptly. + */ + void shutdown() { + shutdown_.store(true, std::memory_order_relaxed); + E3_LOG_DEBUG(LOG_TAG) << "Queue shutdown signalled"; + } + + /** @brief Return true if shutdown() has been called. 
*/ + bool is_shutdown() const { + return shutdown_.load(std::memory_order_relaxed); + } + +private: + static constexpr const char* LOG_TAG = "Queue"; + + MpmcQueue ring_; + std::atomic shutdown_{false}; + + // Adaptive spin-wait tuning constants + static constexpr size_t SPIN_COUNT = 40; ///< CPU-pause iterations + static constexpr size_t YIELD_COUNT = 100; ///< thread-yield iterations + /// Sleep duration between attempts in the slow path (µs) + static constexpr auto SLEEP_DURATION = std::chrono::microseconds(50); +}; + +} // namespace libe3 + +#endif // LIBE3_LOCKFREE_QUEUE_HPP diff --git a/include/libe3/mpmc_queue.hpp b/include/libe3/mpmc_queue.hpp index ea3adc8..a0e0a89 100644 --- a/include/libe3/mpmc_queue.hpp +++ b/include/libe3/mpmc_queue.hpp @@ -11,7 +11,7 @@ * - Zero locks / zero condition variables in the fast path * - head_ and tail_ are on separate cache lines to eliminate false sharing * - Capacity is rounded up to the nearest power of two for cheap masking - * - Provides try_push / try_pop (non-blocking) used by ResponseQueue + * - Provides try_push / try_pop (non-blocking) used by LockFreeQueue * * SPDX-License-Identifier: Apache-2.0 */ diff --git a/include/libe3/response_queue.hpp b/include/libe3/response_queue.hpp deleted file mode 100644 index 472f55c..0000000 --- a/include/libe3/response_queue.hpp +++ /dev/null @@ -1,119 +0,0 @@ -/** - * @file response_queue.hpp - * @brief Lock-free bounded queue for E3AP outbound PDUs - * - * Replaces the original mutex + condition-variable implementation with a - * lock-free MPMC ring buffer (see mpmc_queue.hpp) to eliminate mutex - * contention on the hot publish path and to reduce latency jitter in - * sub-millisecond control loops. - * - * Producer/consumer pattern in libe3: - * - Producers: subscriber_loop thread, sm_data_handler_loop thread, and - * any caller of E3Interface::queue_outbound() (MPSC / MPMC). - * - Consumer: publisher_loop thread (single consumer). 
- * - * SPDX-License-Identifier: Apache-2.0 - */ - -#ifndef LIBE3_RESPONSE_QUEUE_HPP -#define LIBE3_RESPONSE_QUEUE_HPP - -#include "types.hpp" -#include "mpmc_queue.hpp" -#include -#include -#include - -namespace libe3 { - -/** - * @brief Lock-free bounded queue for E3AP PDUs. - * - * Provides the same API as the previous mutex-based implementation but uses - * an MPMC lock-free ring buffer internally. Blocking pop variants use an - * adaptive spin-wait strategy: - * 1. Spin with CPU pause hints (lowest latency, nanoseconds) - * 2. Thread yield (cooperative, microseconds) - * 3. Short sleep (50 µs) (idle wait, avoids busy-loop when quiet) - */ -class ResponseQueue { -public: - /** - * @brief Construct a ResponseQueue. - * @param capacity Minimum ring buffer capacity (rounded up to next power - * of two, default 128). - */ - explicit ResponseQueue(size_t capacity = 128); - - /** - * @brief Destructor – signals shutdown so any blocked pop() returns. - */ - ~ResponseQueue(); - - // Non-copyable, non-movable - ResponseQueue(const ResponseQueue&) = delete; - ResponseQueue& operator=(const ResponseQueue&) = delete; - ResponseQueue(ResponseQueue&&) = delete; - ResponseQueue& operator=(ResponseQueue&&) = delete; - - /** - * @brief Enqueue a PDU (non-blocking, lock-free). - * @return ErrorCode::SUCCESS on success. - * @return ErrorCode::BUFFER_TOO_SMALL if the ring buffer is full. - * @return ErrorCode::NOT_INITIALIZED if shutdown() has been called. - */ - ErrorCode push(Pdu pdu); - - /** - * @brief Dequeue a PDU, blocking indefinitely until one is available. - * - * Returns an empty Pdu{} if shutdown() is called while waiting. - */ - Pdu pop(); - - /** - * @brief Dequeue a PDU with a maximum wait duration. - * @return The PDU on success; std::nullopt on timeout or shutdown. - */ - std::optional pop(std::chrono::milliseconds timeout); - - /** - * @brief Try to dequeue without blocking. - * @return The PDU if one was available; std::nullopt otherwise. 
- */ - std::optional try_pop(); - - /** @brief Return true if the queue appears empty (approximate). */ - bool empty() const; - - /** @brief Return the approximate number of items in the queue. */ - size_t size() const; - - /** @brief Return the ring buffer capacity (always a power of two). */ - size_t capacity() const noexcept { return ring_.capacity(); } - - /** @brief Discard all items currently in the queue. */ - void clear(); - - /** - * @brief Signal shutdown so that all blocked pop() calls return promptly. - */ - void shutdown(); - - /** @brief Return true if shutdown() has been called. */ - bool is_shutdown() const; - -private: - MpmcQueue ring_; - std::atomic shutdown_{false}; - - // Adaptive spin-wait tuning constants - static constexpr size_t SPIN_COUNT = 40; ///< CPU-pause iterations - static constexpr size_t YIELD_COUNT = 100; ///< thread-yield iterations - /// Sleep duration between attempts in the slow path (µs) - static constexpr auto SLEEP_DURATION = std::chrono::microseconds(50); -}; - -} // namespace libe3 - -#endif // LIBE3_RESPONSE_QUEUE_HPP diff --git a/src/connector/posix_connector.cpp b/src/connector/posix_connector.cpp index 04a18a0..c749fc1 100644 --- a/src/connector/posix_connector.cpp +++ b/src/connector/posix_connector.cpp @@ -636,7 +636,9 @@ void PosixE3Connector::dispose() { unlink(setup_endpoint_.c_str()); unlink(inbound_endpoint_.c_str()); unlink(outbound_endpoint_.c_str()); - rmdir(IPC_BASE_DIR); + // Deliberately do NOT rmdir(IPC_BASE_DIR): it is shared across agents + // and tests. Removing it races with another agent binding a socket + // inside it (ENOENT). Leaving the empty directory behind is harmless. 
} setup_socket_ = -1; diff --git a/src/connector/zmq_connector.cpp b/src/connector/zmq_connector.cpp index fffcd1c..9d93cc2 100644 --- a/src/connector/zmq_connector.cpp +++ b/src/connector/zmq_connector.cpp @@ -86,7 +86,10 @@ ErrorCode ZmqE3Connector::setup_initial_connection() { if (transport_layer_ == E3TransportLayer::IPC) { struct stat st{}; if (stat(IPC_BASE_DIR, &st) == -1) { - if (mkdir(IPC_BASE_DIR, 0777) == -1) { + // EEXIST: another agent/test process created it between the stat + // and the mkdir (the directory existing is exactly what we want). + // The POSIX connector guards the same race the same way. + if (mkdir(IPC_BASE_DIR, 0777) == -1 && errno != EEXIST) { E3_LOG_ERROR(LOG_TAG) << "Failed to create IPC directory: " << strerror(errno); return ErrorCode::CONNECTION_FAILED; } @@ -454,7 +457,11 @@ void ZmqE3Connector::dispose() { path = extract_path(outbound_endpoint_); if (!path.empty()) unlink(path.c_str()); - rmdir(IPC_BASE_DIR); + // Deliberately do NOT rmdir(IPC_BASE_DIR): it is a shared directory + // (multiple agents/tests use it concurrently). Removing it here races + // with another agent that has already stat()'d it and is about to + // bind a socket inside it, making that bind fail with ENOENT. Leaving + // the empty directory behind is harmless. 
} connected_ = false; diff --git a/src/core/e3_interface.cpp b/src/core/e3_interface.cpp index d3aae34..7f3847e 100644 --- a/src/core/e3_interface.cpp +++ b/src/core/e3_interface.cpp @@ -117,7 +117,10 @@ ErrorCode E3Interface::init() { } // Create response queue - response_queue_ = std::make_unique(); + response_queue_ = std::make_unique>(); + + // Create dApp-report queue (RAN side drains it via the report worker) + report_queue_ = std::make_unique>(1024); // Create connector connector_ = create_connector( @@ -174,6 +177,8 @@ ErrorCode E3Interface::start() { setup_thread_ = std::make_unique(&E3Interface::setup_loop_ran, this); inbound_thread_ = std::make_unique(&E3Interface::inbound_loop_ran, this); outbound_thread_ = std::make_unique(&E3Interface::outbound_loop_ran, this); + // RAN receives dApp reports; drain them off the inbound thread. + report_worker_thread_ = std::make_unique(&E3Interface::report_worker_loop, this); } else { setup_thread_ = std::make_unique(&E3Interface::setup_loop_dapp, this); inbound_thread_ = std::make_unique(&E3Interface::inbound_loop_dapp, this); @@ -200,7 +205,12 @@ void E3Interface::stop() { if (response_queue_) { response_queue_->shutdown(); } - + + // Wake up the report queue so the report worker's blocking pop returns. + if (report_queue_) { + report_queue_->shutdown(); + } + // Interrupt blocking socket operations if (connector_) { connector_->shutdown(); @@ -219,6 +229,9 @@ void E3Interface::stop() { if (sm_data_thread_ && sm_data_thread_->joinable()) { sm_data_thread_->join(); } + if (report_worker_thread_ && report_worker_thread_->joinable()) { + report_worker_thread_->join(); + } // Wake up anyone blocked in wait_for_setup so they don't hang. 
{ @@ -384,7 +397,14 @@ void E3Interface::inbound_loop_ran() { case PduType::DAPP_REPORT: { auto* report = std::get_if(&pdu.choice); if (report) { - handle_dapp_report(*report); + if (report_queue_) { + // Hand off to the report worker so downstream work + // never blocks the inbound read path. The queue logs + // on overflow; surface it here as an error too. + if (report_queue_->push(std::move(*report)) != ErrorCode::SUCCESS) { + E3_LOG_ERROR(LOG_TAG) << "Report queue full — dropping dApp report"; + } + } } break; } @@ -459,6 +479,27 @@ void E3Interface::outbound_loop_ran() { E3_LOG_INFO(LOG_TAG) << "Outbound loop (RAN) stopped"; } +void E3Interface::report_worker_loop() { + apply_thread_config(config_.io_thread_affinity, config_.io_thread_niceness); + E3_LOG_INFO(LOG_TAG) << "Report worker loop started"; + + while (!should_stop_.load()) { + // Blocking pop with the queue's adaptive spin-wait, mirroring + // outbound_loop_ran's use of response_queue_. + auto report_opt = report_queue_->pop(std::chrono::milliseconds(10)); + if (report_opt) { + handle_dapp_report(*report_opt); + } + } + + // Drain anything left after shutdown was signalled. + while (auto report_opt = report_queue_->try_pop()) { + handle_dapp_report(*report_opt); + } + + E3_LOG_INFO(LOG_TAG) << "Report worker loop stopped"; +} + void E3Interface::sm_data_handler_loop() { E3_LOG_INFO(LOG_TAG) << "SM data handler started"; diff --git a/src/core/response_queue.cpp b/src/core/response_queue.cpp deleted file mode 100644 index cf3b9ff..0000000 --- a/src/core/response_queue.cpp +++ /dev/null @@ -1,133 +0,0 @@ -/** - * @file response_queue.cpp - * @brief Lock-free bounded queue for E3AP outbound PDUs - * - * Replaces the original mutex + condition-variable implementation with the - * MPMC lock-free ring buffer from mpmc_queue.hpp. Blocking pop() variants - * use a three-phase adaptive spin-wait to minimise latency while avoiding - * a busy-loop when the queue is idle for longer periods. 
- * - * SPDX-License-Identifier: Apache-2.0 - */ - -#include "libe3/response_queue.hpp" -#include "libe3/logger.hpp" -#include - -namespace libe3 { - -namespace { -constexpr const char* LOG_TAG = "Queue"; -} // anonymous namespace - -ResponseQueue::ResponseQueue(size_t capacity) - : ring_(capacity) -{ - E3_LOG_DEBUG(LOG_TAG) << "Lock-free response queue created, capacity=" - << ring_.capacity(); -} - -ResponseQueue::~ResponseQueue() { - shutdown(); - E3_LOG_DEBUG(LOG_TAG) << "Response queue destroyed"; -} - -ErrorCode ResponseQueue::push(Pdu pdu) { - if (shutdown_.load(std::memory_order_relaxed)) { - return ErrorCode::NOT_INITIALIZED; - } - - if (!ring_.try_push(std::move(pdu))) { - E3_LOG_WARN(LOG_TAG) << "Queue full, dropping message"; - return ErrorCode::BUFFER_TOO_SMALL; - } - - E3_LOG_TRACE(LOG_TAG) << "Pushed PDU"; - return ErrorCode::SUCCESS; -} - -Pdu ResponseQueue::pop() { - Pdu pdu; - - // Phase 1: CPU-pause spin (nanosecond latency when producer is fast) - for (size_t i = 0; i < SPIN_COUNT; ++i) { - if (ring_.try_pop(pdu)) return pdu; - if (shutdown_.load(std::memory_order_relaxed)) return Pdu{}; - cpu_relax(); - } - - // Phase 2: Cooperative yield (microsecond range) - for (size_t i = 0; i < YIELD_COUNT; ++i) { - if (ring_.try_pop(pdu)) return pdu; - if (shutdown_.load(std::memory_order_relaxed)) return Pdu{}; - std::this_thread::yield(); - } - - // Phase 3: Short sleep until data arrives or shutdown is signalled - while (true) { - if (ring_.try_pop(pdu)) return pdu; - if (shutdown_.load(std::memory_order_relaxed)) return Pdu{}; - std::this_thread::sleep_for(SLEEP_DURATION); - } -} - -std::optional ResponseQueue::pop(std::chrono::milliseconds timeout) { - Pdu pdu; - auto deadline = std::chrono::steady_clock::now() + timeout; - - // Phase 1: CPU-pause spin - for (size_t i = 0; i < SPIN_COUNT; ++i) { - if (ring_.try_pop(pdu)) return pdu; - if (shutdown_.load(std::memory_order_relaxed)) return std::nullopt; - cpu_relax(); - } - - // Phase 2: Cooperative 
yield - for (size_t i = 0; i < YIELD_COUNT; ++i) { - if (ring_.try_pop(pdu)) return pdu; - if (shutdown_.load(std::memory_order_relaxed)) return std::nullopt; - std::this_thread::yield(); - } - - // Phase 3: Timed sleep - while (std::chrono::steady_clock::now() < deadline) { - if (ring_.try_pop(pdu)) return pdu; - if (shutdown_.load(std::memory_order_relaxed)) return std::nullopt; - std::this_thread::sleep_for(SLEEP_DURATION); - } - - // One last attempt after deadline - if (ring_.try_pop(pdu)) return pdu; - return std::nullopt; -} - -std::optional ResponseQueue::try_pop() { - Pdu pdu; - if (ring_.try_pop(pdu)) return pdu; - return std::nullopt; -} - -bool ResponseQueue::empty() const { - return ring_.empty_approx(); -} - -size_t ResponseQueue::size() const { - return ring_.size_approx(); -} - -void ResponseQueue::clear() { - Pdu pdu; - while (ring_.try_pop(pdu)) {} - E3_LOG_DEBUG(LOG_TAG) << "Queue cleared"; -} - -void ResponseQueue::shutdown() { - shutdown_.store(true, std::memory_order_relaxed); - E3_LOG_DEBUG(LOG_TAG) << "Queue shutdown signalled"; -} - -bool ResponseQueue::is_shutdown() const { - return shutdown_.load(std::memory_order_relaxed); -} - -} // namespace libe3 diff --git a/tests/bench_mpmc_queue.cpp b/tests/bench_mpmc_queue.cpp index 408d811..b307c08 100644 --- a/tests/bench_mpmc_queue.cpp +++ b/tests/bench_mpmc_queue.cpp @@ -1,6 +1,6 @@ /** * @file bench_mpmc_queue.cpp - * @brief Stress test and latency/throughput benchmark for MpmcQueue and ResponseQueue + * @brief Stress test and latency/throughput benchmark for MpmcQueue and LockFreeQueue * * Measures end-to-end push-to-pop latency at different queue capacities, * throughput under SPSC/MPSC/MPMC producer-consumer configurations, and @@ -14,7 +14,7 @@ */ #include "libe3/mpmc_queue.hpp" -#include "libe3/response_queue.hpp" +#include "libe3/lockfree_queue.hpp" #include #include @@ -117,7 +117,7 @@ static MpmcLatResult mpmc_spsc_latency(size_t queue_cap, int n_items) { } // 
--------------------------------------------------------------------------- -// Section 2 – ResponseQueue end-to-end latency (SPSC) +// Section 2 – LockFreeQueue end-to-end latency (SPSC) // --------------------------------------------------------------------------- struct RQLatResult { @@ -128,13 +128,13 @@ struct RQLatResult { }; /** - * Like the MpmcQueue latency test but uses the full ResponseQueue API. + * Like the MpmcQueue latency test but uses the full LockFreeQueue API. * The push timestamp (nanoseconds) is stored in Pdu::timestamp, which the * Pdu constructor sets to milliseconds; we intentionally override it with * nanoseconds for this measurement. */ static RQLatResult rq_spsc_latency(size_t queue_cap, int n_items) { - ResponseQueue rq(queue_cap); + LockFreeQueue rq(queue_cap); std::vector latencies(static_cast(n_items)); // Warm-up @@ -334,13 +334,13 @@ int main() { } std::cout << "\n"; - // ---- 2. ResponseQueue SPSC latency ------------------------------------- + // ---- 2. LockFreeQueue SPSC latency ------------------------------------- std::vector rlat; rlat.reserve(caps.size()); for (size_t c : caps) rlat.push_back(rq_spsc_latency(c, LAT_ITEMS)); - std::cout << "## ResponseQueue End-to-End Latency (SPSC, " + std::cout << "## LockFreeQueue End-to-End Latency (SPSC, " << LAT_ITEMS << " items per queue size)\n\n"; std::cout << "| Queue Capacity | P50 (ns) | P95 (ns) | P99 (ns) |\n"; std::cout << "|---------------:|---------:|---------:|---------:|\n"; diff --git a/tests/test_e2e_report_path.cpp b/tests/test_e2e_report_path.cpp new file mode 100644 index 0000000..0c4f1e9 --- /dev/null +++ b/tests/test_e2e_report_path.cpp @@ -0,0 +1,336 @@ +/** + * @file test_e2e_report_path.cpp + * @brief End-to-end integration tests for the dApp → libe3 report path. 
+ * + * Drives a real E3Agent and a fake dApp ZMQ peer in the same process to + * exercise the full inbound report pipeline: + * + * fake dApp PUB + * → ZMQ IPC + * → libe3 inbound SUB socket + * → APER decode + * → LockFreeQueue + * → report worker thread + * → handle_dapp_report() + * → user-registered DAppReportHandler + * + * The E3-SETUP handshake is deliberately skipped: libe3's subscriber + * loop dispatches DAPP_REPORT PDUs to the report queue without + * validating that the dApp ID was registered, so the handshake is not + * relevant to the properties being verified here. + * + * Properties verified: + * + * (1) Every PDU sent by the fake dApp is delivered to the registered + * handler exactly once. + * + * (2) FIFO ordering is preserved end-to-end. + * + * (3) The decoded payload bytes match the original payload exactly, + * with no truncation or trailing contamination. + * + * (4) When the producer rate exceeds the handler's processing rate, + * every message still reaches the handler (the inbound pipeline + * does not silently drop messages while the handler is busy). + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "test_framework.hpp" +#include "libe3/libe3.hpp" +#include "libe3/e3_agent.hpp" +#include "libe3/e3_encoder.hpp" +#include "libe3/types.hpp" +#include "libe3/sm_interface.hpp" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include // getpid + +using namespace libe3; +using namespace std::chrono_literals; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +inline int error_to_int(ErrorCode e) { return static_cast(e); } + +/// Counter so each test instance uses distinct IPC namespaces — required +/// when running ctest -j to avoid socket-file collisions. 
+static std::atomic g_e2e_seq{0}; + +static std::string unique_ipc(const char* tag) { + std::ostringstream oss; + oss << "ipc:///tmp/dapps/e2e_test_" << getpid() << "_" + << g_e2e_seq.fetch_add(1) << "_" << tag; + return oss.str(); +} + +/// Minimal ServiceModel so register_sm() succeeds. +class E2ETestSM : public ServiceModel { +public: + explicit E2ETestSM(uint32_t ran_function_id) : id_(ran_function_id) {} + std::string name() const override { return "E2ETestSM"; } + uint32_t version() const override { return 1; } + uint32_t ran_function_id() const override { return id_; } + std::vector telemetry_ids() const override { return {1}; } + std::vector control_ids() const override { return {10}; } + ErrorCode init() override { return ErrorCode::SUCCESS; } + void destroy() override { running_ = false; } + ErrorCode start() override { running_ = true; return ErrorCode::SUCCESS; } + void stop() override { running_ = false; } + bool is_running() const override { return running_; } + ErrorCode handle_control_action(uint32_t, const DAppControlAction&) override { + return ErrorCode::SUCCESS; + } +private: + uint32_t id_; + bool running_ = false; +}; + +/// Tag a 32-bit sequence number into the first 4 payload bytes; remaining +/// bytes use a distinctive filler so the handler can validate the +/// payload arrived intact. +static std::vector tagged_payload(uint32_t seq, size_t bytes = 70) { + std::vector v(bytes, 0xCD); + if (bytes >= 4) { + v[0] = static_cast(seq); + v[1] = static_cast(seq >> 8); + v[2] = static_cast(seq >> 16); + v[3] = static_cast(seq >> 24); + } + return v; +} + +static uint32_t read_seq(const std::vector& v) { + if (v.size() < 4) return 0xFFFFFFFFu; + return static_cast(v[0]) + | (static_cast(v[1]) << 8) + | (static_cast(v[2]) << 16) + | (static_cast(v[3]) << 24); +} + +/** + * Fake dApp ZMQ PUB peer. Owns a ZMQ context and a single PUB socket + * connected to libe3's inbound SUB endpoint. 
Encodes and sends + * DAPP_REPORT PDUs through the same APER encoder libe3 uses, so the + * wire format matches what a real dApp would produce. + */ +class FakeDappPub { +public: + FakeDappPub() + : ctx_(zmq_ctx_new()), + encoder_(create_encoder(EncodingFormat::ASN1)) + {} + + ~FakeDappPub() { + if (pub_) zmq_close(pub_); + if (ctx_) zmq_ctx_destroy(ctx_); + } + + bool connect(const std::string& subscriber_endpoint) { + pub_ = zmq_socket(ctx_, ZMQ_PUB); + if (!pub_) { + std::cerr << "[FakeDappPub] zmq_socket(ZMQ_PUB) failed: " + << zmq_strerror(zmq_errno()) << "\n"; + return false; + } + int linger = 0; + zmq_setsockopt(pub_, ZMQ_LINGER, &linger, sizeof(linger)); + if (zmq_connect(pub_, subscriber_endpoint.c_str()) != 0) { + std::cerr << "[FakeDappPub] zmq_connect(\"" << subscriber_endpoint + << "\") failed: " << zmq_strerror(zmq_errno()) << "\n"; + return false; + } + return true; + } + + bool send_report(uint32_t seq, uint32_t dapp_id, uint32_t ran_function_id, + size_t payload_bytes = 70) { + Pdu pdu(PduType::DAPP_REPORT); + pdu.message_id = (seq % 1000u) + 1u; // E3-MessageID is INTEGER (1..1000) + DAppReport rep; + rep.dapp_identifier = dapp_id; + rep.ran_function_identifier = ran_function_id; + rep.report_data = tagged_payload(seq, payload_bytes); + pdu.choice = rep; + + auto enc = encoder_->encode(pdu); + if (!enc.has_value()) { + std::cerr << "[FakeDappPub] encode(DAPP_REPORT) failed at seq=" << seq << "\n"; + return false; + } + return zmq_send(pub_, enc->buffer.data(), enc->buffer.size(), 0) >= 0; + } + +private: + void* ctx_ = nullptr; + void* pub_ = nullptr; + std::unique_ptr encoder_; +}; + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +/** + * Burst N reports into a real E3Agent's inbound SUB socket at full + * encode-and-send speed and assert that all N reach the handler + * exactly once, in FIFO order, with the original 
payload bytes intact. + */ +TEST(E2EReportPath_burst_AllReportsArriveExactlyOnce) { + constexpr uint32_t N = 1000; + constexpr uint32_t RAN_FUNC_ID = 100; + constexpr uint32_t FAKE_DAPP_ID = 1; + + const std::string setup_ep = unique_ipc("setup"); + const std::string sub_ep = unique_ipc("inbound"); + const std::string pub_ep = unique_ipc("outbound"); + + E3Config cfg; + cfg.link_layer = E3LinkLayer::ZMQ; + cfg.transport_layer = E3TransportLayer::IPC; + cfg.setup_endpoint = setup_ep; + cfg.subscriber_endpoint = sub_ep; + cfg.publisher_endpoint = pub_ep; + cfg.encoding = EncodingFormat::ASN1; + cfg.log_level = 0; + cfg.ran_identifier = "e2e-test"; + + E3Agent agent(std::move(cfg)); + ASSERT_EQ(error_to_int(agent.register_sm(std::make_unique(RAN_FUNC_ID))), + error_to_int(ErrorCode::SUCCESS)); + + std::atomic recv_count{0}; + std::vector seen(N, false); + std::mutex seen_mtx; + std::atomic bad_payload{false}; + std::atomic duplicate{false}; + std::atomic out_of_order{false}; + uint32_t expected_next = 0; + + agent.set_dapp_report_handler([&, N](const DAppReport& r) { + if (r.report_data.size() < 4) { bad_payload.store(true); return; } + for (size_t i = 4; i < r.report_data.size(); ++i) { + if (r.report_data[i] != 0xCD) { bad_payload.store(true); break; } + } + const uint32_t seq = read_seq(r.report_data); + if (seq >= N) { bad_payload.store(true); return; } + + std::lock_guard lk(seen_mtx); + if (seen[seq]) { duplicate.store(true); return; } + seen[seq] = true; + if (seq != expected_next) out_of_order.store(true); + expected_next = seq + 1; + recv_count.fetch_add(1); + }); + + ASSERT_EQ(error_to_int(agent.start()), error_to_int(ErrorCode::SUCCESS)); + ASSERT_TRUE(agent.is_running()); + + // Allow the inbound SUB socket to bind before the fake dApp connects. + std::this_thread::sleep_for(100ms); + + FakeDappPub pub; + ASSERT_TRUE(pub.connect(sub_ep)); + + // ZMQ slow-joiner: SUB needs a moment to register the subscription + // with PUB after connect. 
This is intrinsic to ZMQ PUB/SUB. + std::this_thread::sleep_for(300ms); + + for (uint32_t i = 0; i < N; ++i) { + ASSERT_TRUE(pub.send_report(i, FAKE_DAPP_ID, RAN_FUNC_ID)); + } + + auto deadline = std::chrono::steady_clock::now() + 30s; + while (recv_count.load() < N + && std::chrono::steady_clock::now() < deadline) { + std::this_thread::sleep_for(20ms); + } + + agent.stop(); + + ASSERT_EQ(recv_count.load(), N); + ASSERT_FALSE(bad_payload.load()); + ASSERT_FALSE(duplicate.load()); + ASSERT_FALSE(out_of_order.load()); +} + +/** + * Producer rate (~20 kHz) exceeds the handler's processing rate + * (~5 kHz, simulated by a 200 µs delay per call). Every message must + * still reach the handler — the rate gap is absorbed by the internal + * dispatch queue rather than dropped at the inbound socket. + */ +TEST(E2EReportPath_slowHandler_doesNotBlockSubscriber) { + constexpr uint32_t N = 200; + constexpr uint32_t RAN_FUNC_ID = 100; // E3-RanFunctionIdentifier has a bounded range + constexpr uint32_t FAKE_DAPP_ID = 1; + constexpr auto HANDLER_DELAY = std::chrono::microseconds(200); + + const std::string setup_ep = unique_ipc("setup"); + const std::string sub_ep = unique_ipc("inbound"); + const std::string pub_ep = unique_ipc("outbound"); + + E3Config cfg; + cfg.link_layer = E3LinkLayer::ZMQ; + cfg.transport_layer = E3TransportLayer::IPC; + cfg.setup_endpoint = setup_ep; + cfg.subscriber_endpoint = sub_ep; + cfg.publisher_endpoint = pub_ep; + cfg.encoding = EncodingFormat::ASN1; + cfg.log_level = 0; + cfg.ran_identifier = "e2e-test"; + + E3Agent agent(std::move(cfg)); + agent.register_sm(std::make_unique(RAN_FUNC_ID)); + + std::atomic recv_count{0}; + agent.set_dapp_report_handler([&](const DAppReport&) { + std::this_thread::sleep_for(HANDLER_DELAY); + recv_count.fetch_add(1); + }); + + ASSERT_EQ(error_to_int(agent.start()), error_to_int(ErrorCode::SUCCESS)); + std::this_thread::sleep_for(100ms); + + FakeDappPub pub; + ASSERT_TRUE(pub.connect(sub_ep)); + 
std::this_thread::sleep_for(300ms); + + // Producer at ~20 kHz (50 µs/msg) — 4× the handler's processing rate + // (200 µs/msg). The subscriber thread must drain the inbound ZMQ + // socket at I/O speed and the rate gap must be absorbed by the + // internal dispatch queue, not by silently dropping at the socket. + for (uint32_t i = 0; i < N; ++i) { + ASSERT_TRUE(pub.send_report(i, FAKE_DAPP_ID, RAN_FUNC_ID)); + std::this_thread::sleep_for(std::chrono::microseconds(50)); + } + + auto deadline = std::chrono::steady_clock::now() + 10s; + while (recv_count.load() < N + && std::chrono::steady_clock::now() < deadline) { + std::this_thread::sleep_for(10ms); + } + agent.stop(); + + ASSERT_EQ(recv_count.load(), N); +} + +// --------------------------------------------------------------------------- + +int main() { + return RUN_ALL_TESTS(); +} diff --git a/tests/test_response_queue.cpp b/tests/test_lockfree_queue.cpp similarity index 73% rename from tests/test_response_queue.cpp rename to tests/test_lockfree_queue.cpp index 09dd051..2a9391e 100644 --- a/tests/test_response_queue.cpp +++ b/tests/test_lockfree_queue.cpp @@ -1,83 +1,83 @@ /** - * @file test_response_queue.cpp - * @brief Unit tests for ResponseQueue + * @file test_lockfree_queue.cpp + * @brief Unit tests for LockFreeQueue * * SPDX-License-Identifier: Apache-2.0 */ #include "test_framework.hpp" -#include "libe3/response_queue.hpp" +#include "libe3/lockfree_queue.hpp" #include #include #include using namespace libe3; -TEST(ResponseQueue_initial_state) { - ResponseQueue queue(100); +TEST(LockFreeQueue_initial_state) { + LockFreeQueue queue(100); ASSERT_TRUE(queue.empty()); ASSERT_EQ(queue.size(), 0u); } -TEST(ResponseQueue_push_pop) { - ResponseQueue queue(100); - +TEST(LockFreeQueue_push_pop) { + LockFreeQueue queue(100); + Pdu pdu(PduType::INDICATION_MESSAGE); pdu.message_id = 42; - + auto pushed = queue.push(pdu); ASSERT_TRUE(pushed == ErrorCode::SUCCESS); ASSERT_FALSE(queue.empty()); ASSERT_EQ(queue.size(), 
1u); - + auto popped = queue.pop(); ASSERT_EQ(popped.message_id, 42u); ASSERT_TRUE(queue.empty()); } -TEST(ResponseQueue_try_pop_empty) { - ResponseQueue queue(100); - +TEST(LockFreeQueue_try_pop_empty) { + LockFreeQueue queue(100); + auto result = queue.try_pop(); ASSERT_FALSE(result.has_value()); } -TEST(ResponseQueue_pop_with_timeout) { - ResponseQueue queue(100); - +TEST(LockFreeQueue_pop_with_timeout) { + LockFreeQueue queue(100); + auto start = std::chrono::steady_clock::now(); auto result = queue.pop(std::chrono::milliseconds(50)); auto end = std::chrono::steady_clock::now(); - + ASSERT_FALSE(result.has_value()); auto elapsed = std::chrono::duration_cast(end - start); ASSERT_GE(elapsed.count(), 40); // Allow some slack } -TEST(ResponseQueue_fifo_order) { - ResponseQueue queue(100); - +TEST(LockFreeQueue_fifo_order) { + LockFreeQueue queue(100); + for (uint32_t i = 0; i < 5; ++i) { Pdu pdu(PduType::INDICATION_MESSAGE); pdu.message_id = i; auto result = queue.push(pdu); (void)result; // ignore nodiscard warning } - + ASSERT_EQ(queue.size(), 5u); - + for (uint32_t i = 0; i < 5; ++i) { auto pdu = queue.pop(); ASSERT_EQ(pdu.message_id, i); } - + ASSERT_TRUE(queue.empty()); } -TEST(ResponseQueue_capacity_limit) { +TEST(LockFreeQueue_capacity_limit) { // The ring buffer rounds capacity up to the next power of two; use 4 // (already a power of two) so the logical capacity is well-defined. 
- ResponseQueue queue(4); + LockFreeQueue queue(4); for (int i = 0; i < 4; ++i) { Pdu pdu(PduType::INDICATION_MESSAGE); @@ -93,12 +93,12 @@ TEST(ResponseQueue_capacity_limit) { ASSERT_TRUE(pushed == ErrorCode::BUFFER_TOO_SMALL); } -TEST(ResponseQueue_producer_consumer) { - ResponseQueue queue(100); +TEST(LockFreeQueue_producer_consumer) { + LockFreeQueue queue(100); std::atomic produced{0}; std::atomic consumed{0}; std::atomic done{false}; - + // Producer thread std::thread producer([&]() { for (int i = 0; i < 50; ++i) { @@ -110,7 +110,7 @@ TEST(ResponseQueue_producer_consumer) { } done = true; }); - + // Consumer thread std::thread consumer([&]() { while (!done || !queue.empty()) { @@ -120,20 +120,20 @@ TEST(ResponseQueue_producer_consumer) { } } }); - + producer.join(); consumer.join(); - + ASSERT_EQ(produced.load(), 50); ASSERT_EQ(consumed.load(), 50); } -TEST(ResponseQueue_multiple_producers) { - ResponseQueue queue(1000); +TEST(LockFreeQueue_multiple_producers) { + LockFreeQueue queue(1000); const int items_per_producer = 100; const int num_producers = 4; std::atomic total_produced{0}; - + std::vector producers; for (int p = 0; p < num_producers; ++p) { producers.emplace_back([&, p]() { @@ -145,61 +145,83 @@ TEST(ResponseQueue_multiple_producers) { } }); } - + for (auto& t : producers) { t.join(); } - + ASSERT_EQ(total_produced.load(), num_producers * items_per_producer); ASSERT_EQ(queue.size(), static_cast(num_producers * items_per_producer)); } -TEST(ResponseQueue_clear) { - ResponseQueue queue(100); - +TEST(LockFreeQueue_clear) { + LockFreeQueue queue(100); + for (int i = 0; i < 10; ++i) { Pdu pdu(PduType::INDICATION_MESSAGE); (void)queue.push(pdu); } - + ASSERT_EQ(queue.size(), 10u); - + queue.clear(); - + ASSERT_TRUE(queue.empty()); ASSERT_EQ(queue.size(), 0u); } -TEST(ResponseQueue_capacity) { +TEST(LockFreeQueue_capacity) { // capacity() returns the actual ring-buffer size, which is the next // power of two >= the requested capacity. 
64 is the next power of two // greater than or equal to 42. - ResponseQueue queue(42); + LockFreeQueue queue(42); ASSERT_EQ(queue.capacity(), 64u); } -TEST(ResponseQueue_blocking_pop) { - ResponseQueue queue(100); +TEST(LockFreeQueue_blocking_pop) { + LockFreeQueue queue(100); std::atomic got_item{false}; - + std::thread consumer([&]() { auto pdu = queue.pop(); // Will block got_item = true; (void)pdu; // Use the result }); - + // Give consumer time to block std::this_thread::sleep_for(std::chrono::milliseconds(50)); ASSERT_FALSE(got_item.load()); - + // Push an item Pdu pdu(PduType::INDICATION_MESSAGE); (void)queue.push(pdu); - + consumer.join(); ASSERT_TRUE(got_item.load()); } +// The same wrapper is reused for the inbound dApp-report path +// (LockFreeQueue); exercise that specialisation too so the +// template stays generic. +TEST(LockFreeQueue_dapp_report_specialisation) { + LockFreeQueue queue(8); + + DAppReport r; + r.dapp_identifier = 7; + r.ran_function_identifier = 3; + r.report_data = {1, 2, 3, 4}; + + ASSERT_TRUE(queue.push(std::move(r)) == ErrorCode::SUCCESS); + ASSERT_EQ(queue.size(), 1u); + + auto popped = queue.pop(std::chrono::milliseconds(50)); + ASSERT_TRUE(popped.has_value()); + ASSERT_EQ(popped->dapp_identifier, 7u); + ASSERT_EQ(popped->ran_function_identifier, 3u); + ASSERT_EQ(popped->report_data.size(), 4u); + ASSERT_TRUE(queue.empty()); +} + int main() { return RUN_ALL_TESTS(); } diff --git a/tests/test_report_drop.cpp b/tests/test_report_drop.cpp new file mode 100644 index 0000000..6ba8c85 --- /dev/null +++ b/tests/test_report_drop.cpp @@ -0,0 +1,224 @@ +/** + * @file test_report_drop.cpp + * @brief Tests for the dApp report queue used in the subscriber → worker + * handoff. 
+ * + * Verifies that the bounded MPMC queue used to dispatch DAppReport + * messages from the subscriber thread to the report worker thread: + * + * (1) Delivers every pushed report to the consumer exactly once, in + * FIFO order, when the workload stays within capacity. + * + * (2) Surfaces overflow as an explicit push() != SUCCESS return value + * rather than silently dropping the message. + * + * (3) Loses no messages under a producer that bursts at full speed + * while the consumer simulates slow per-message work. + * + * (4) Loses no messages and produces no duplicates with multiple + * concurrent producers and a single consumer. + * + * The tests use LockFreeQueue<DAppReport>(1024) — the same wrapper type + * and capacity used by E3Interface::report_queue_; any future change to + * that production value must be mirrored here by hand. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "test_framework.hpp" +#include "libe3/libe3.hpp" +#include "libe3/lockfree_queue.hpp" +#include "libe3/types.hpp" + +#include +#include +#include +#include +#include + +using namespace libe3; +using namespace std::chrono_literals; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/// Capacity used by E3Interface::report_queue_ (see e3_interface.cpp). +/// Keeping the constant in sync makes the workloads below representative +/// of real production behaviour and signals if the production capacity +/// is ever shrunk to a value the burst tests would overflow.
+static constexpr size_t REPORT_QUEUE_CAPACITY = 1024; + +static DAppReport make_report(uint32_t seq, size_t payload_bytes = 70) { + DAppReport r; + r.dapp_identifier = 1; + r.ran_function_identifier = 1; + r.report_data.assign(payload_bytes, static_cast(seq & 0xFF)); + if (payload_bytes >= 4) { + r.report_data[0] = static_cast(seq); + r.report_data[1] = static_cast(seq >> 8); + r.report_data[2] = static_cast(seq >> 16); + r.report_data[3] = static_cast(seq >> 24); + } + return r; +} + +static uint32_t read_seq(const DAppReport& r) { + if (r.report_data.size() < 4) return 0; + return static_cast(r.report_data[0]) + | (static_cast(r.report_data[1]) << 8) + | (static_cast(r.report_data[2]) << 16) + | (static_cast(r.report_data[3]) << 24); +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +/** + * Push N reports (well under capacity), pop N, verify every report is + * delivered exactly once in FIFO order. + */ +TEST(ReportQueue_drainsAllMessagesUnderCapacity) { + LockFreeQueue q(REPORT_QUEUE_CAPACITY); + constexpr uint32_t N = 500; + + for (uint32_t i = 0; i < N; ++i) { + ASSERT_TRUE(q.push(make_report(i)) == ErrorCode::SUCCESS); + } + + for (uint32_t i = 0; i < N; ++i) { + auto out = q.try_pop(); + ASSERT_TRUE(out.has_value()); + ASSERT_EQ(read_seq(*out), i); + } + ASSERT_FALSE(q.try_pop().has_value()); // queue drained +} + +/** + * Filling the queue past capacity surfaces as an explicit `push()` + * failure (a non-SUCCESS ErrorCode), and the count of messages successfully popped + * matches the count of successful pushes.
+ */ +TEST(ReportQueue_overflowReturnsFalseInsteadOfSilentDrop) { + LockFreeQueue q(REPORT_QUEUE_CAPACITY); + + size_t pushed = 0; + for (size_t i = 0; i < REPORT_QUEUE_CAPACITY * 2; ++i) { + if (q.push(make_report(static_cast(i))) == ErrorCode::SUCCESS) { + ++pushed; + } + } + + // The queue rounds requested capacity up to the next power of two, + // so `pushed` may be ≥ REPORT_QUEUE_CAPACITY. The invariant we + // assert: at least one push attempt failed (returned non-SUCCESS), + // proving that overflow surfaces explicitly. + ASSERT_GE(pushed, REPORT_QUEUE_CAPACITY); + ASSERT_LT(pushed, REPORT_QUEUE_CAPACITY * 2); + + size_t popped = 0; + while (q.try_pop().has_value()) ++popped; + ASSERT_EQ(popped, pushed); +} + +/** + * Concurrent producer/consumer with a slow consumer (~50 µs per message, + * mimicking realistic downstream work). The producer bursts at full + * speed and retries whenever the queue is full; the consumer drains it. Every produced + * message must be consumed in FIFO order with no loss.
+ */ +TEST(ReportQueue_burstProducerSlowConsumer_noLoss) { + LockFreeQueue q(REPORT_QUEUE_CAPACITY); + constexpr uint32_t N = 5000; + + std::atomic consumed_count{0}; + std::atomic consumer_done{false}; + std::atomic mismatch{false}; + + std::thread consumer([&]() { + uint32_t expected = 0; + while (consumed_count.load() < N) { + if (auto r = q.try_pop()) { + if (read_seq(*r) != expected) mismatch.store(true); + ++expected; + consumed_count.fetch_add(1); + std::this_thread::sleep_for(std::chrono::microseconds(50)); + } else { + std::this_thread::sleep_for(std::chrono::microseconds(10)); + } + } + consumer_done.store(true); + }); + + for (uint32_t i = 0; i < N; ++i) { + while (q.push(make_report(i)) != ErrorCode::SUCCESS) { + std::this_thread::sleep_for(std::chrono::microseconds(20)); + } + } + + auto deadline = std::chrono::steady_clock::now() + 30s; + while (!consumer_done.load() && std::chrono::steady_clock::now() < deadline) { + std::this_thread::sleep_for(10ms); + } + consumer.join(); + + ASSERT_EQ(consumed_count.load(), N); + ASSERT_FALSE(mismatch.load()); +} + +/** + * Multi-producer single-consumer: 4 producers × 1000 messages each, with + * each producer emitting a distinct seq range. The consumer must see + * every seq exactly once with no duplicates and no loss. 
+ */ +TEST(ReportQueue_multiProducerSingleConsumer_noLossOrDuplicates) { + LockFreeQueue q(REPORT_QUEUE_CAPACITY); + constexpr uint32_t N_PER_PRODUCER = 1000; + constexpr uint32_t N_PRODUCERS = 4; + constexpr uint32_t N_TOTAL = N_PER_PRODUCER * N_PRODUCERS; + + std::vector producers; + for (uint32_t p = 0; p < N_PRODUCERS; ++p) { + producers.emplace_back([&q, p]() { + const uint32_t base = p * N_PER_PRODUCER; + for (uint32_t i = 0; i < N_PER_PRODUCER; ++i) { + while (q.push(make_report(base + i)) != ErrorCode::SUCCESS) { + std::this_thread::sleep_for(std::chrono::microseconds(5)); + } + } + }); + } + + std::vector seen(N_TOTAL, false); + std::atomic count{0}; + std::atomic dup{false}; + + std::thread consumer([&]() { + while (count.load() < N_TOTAL) { + if (auto r = q.try_pop()) { + uint32_t s = read_seq(*r); + if (s >= N_TOTAL || seen[s]) dup.store(true); + else seen[s] = true; + count.fetch_add(1); + } else { + std::this_thread::sleep_for(std::chrono::microseconds(2)); + } + } + }); + + for (auto& t : producers) t.join(); + consumer.join(); + + ASSERT_EQ(count.load(), N_TOTAL); + ASSERT_FALSE(dup.load()); + for (uint32_t i = 0; i < N_TOTAL; ++i) { + ASSERT_TRUE(seen[i]); + } +} + +// --------------------------------------------------------------------------- + +int main() { + return RUN_ALL_TESTS(); +}