From efd3e67e5d967daea15ac76eacd389d7244dd8ee Mon Sep 17 00:00:00 2001 From: Daniel Prudky Date: Mon, 25 May 2026 09:55:08 +0200 Subject: [PATCH 1/9] fix(StatusAggregatorDeviceState): add mutex to protect externalCommandQueue_ - getCommand and addExternalCommand can be called from different threads; the queue was unprotected - added externalCommandMutex_ as a unique_ptr (keeps the type movable) and a lock_guard in both methods --- include/bringauto/structures/StatusAggregatorDeviceState.hpp | 3 +++ source/bringauto/structures/StatusAggregatorDeviceState.cpp | 2 ++ 2 files changed, 5 insertions(+) diff --git a/include/bringauto/structures/StatusAggregatorDeviceState.hpp b/include/bringauto/structures/StatusAggregatorDeviceState.hpp index 13c80d2d..976f5ac0 100644 --- a/include/bringauto/structures/StatusAggregatorDeviceState.hpp +++ b/include/bringauto/structures/StatusAggregatorDeviceState.hpp @@ -5,6 +5,7 @@ #include #include +#include #include #include @@ -88,6 +89,8 @@ class StatusAggregatorDeviceState { modules::Buffer defaultCommand_ {}; + std::unique_ptr externalCommandMutex_ { std::make_unique() }; + std::queue externalCommandQueue_ {}; }; diff --git a/source/bringauto/structures/StatusAggregatorDeviceState.cpp b/source/bringauto/structures/StatusAggregatorDeviceState.cpp index 6da352f9..6412baff 100644 --- a/source/bringauto/structures/StatusAggregatorDeviceState.cpp +++ b/source/bringauto/structures/StatusAggregatorDeviceState.cpp @@ -34,6 +34,7 @@ void StatusAggregatorDeviceState::setDefaultCommand(const modules::Buffer &comma } const modules::Buffer &StatusAggregatorDeviceState::getCommand() { + std::lock_guard lock { *externalCommandMutex_ }; if (!externalCommandQueue_.empty()) { defaultCommand_ = externalCommandQueue_.front(); externalCommandQueue_.pop(); @@ -46,6 +47,7 @@ std::queue &StatusAggregatorDeviceState::aggregatedMessa } int StatusAggregatorDeviceState::addExternalCommand(const modules::Buffer &commandBuffer) { + std::lock_guard lock { *externalCommandMutex_ }; externalCommandQueue_.push(commandBuffer); if (externalCommandQueue_.size() > settings::max_external_commands) { externalCommandQueue_.pop(); From 3c6c27d621977e823692d11a5be286dff0a2f23a Mon Sep 17 00:00:00 2001 From: Daniel Prudky Date: Wed, 27 May 2026 15:01:59 +0200 Subject: [PATCH 2/9] feat(modules): add module-controlled immediate command forwarding - Add optional forward_command_on_receive(device_type) to IModuleManagerLibraryHandler; modules returning OK opt into immediate dispatch, others keep status-triggered behaviour - ModuleManagerLibraryHandlerLocal loads the symbol via checkOptionalFunction (no throw if absent); Async handler always returns NOT_OK - StatusAggregatorDeviceState caches the forwarding flag at device registration; update_command() returns new FORWARD_IMMEDIATELY code when the flag is set - ExternalClient detects FORWARD_IMMEDIATELY and pushes a makeCommandForward event to a dedicated commandForwardingQueue separate from fromInternalQueue - ModuleHandler spawns a second jthread (handleCommandForwards) that drains commandForwardingQueue independently of the status-driven handleMessages loop - StatusAggregator gains a recursive_mutex protecting devices and deviceTimeouts_ against concurrent access from both ModuleHandler threads and ExternalClient --- .../external_client/ExternalClient.hpp | 7 +++- .../modules/IModuleManagerLibraryHandler.hpp | 12 ++++++ include/bringauto/modules/ModuleHandler.hpp | 25 ++++++++++-- .../ModuleManagerLibraryHandlerAsync.hpp | 2 + .../ModuleManagerLibraryHandlerLocal.hpp | 10 +++++ .../bringauto/modules/StatusAggregator.hpp | 14 +++++++ .../structures/InternalClientMessage.hpp | 20 ++++++++++ .../StatusAggregatorDeviceState.hpp | 13 ++++++ main.cpp | 7 ++-- .../external_client/ExternalClient.cpp | 12 ++++-- source/bringauto/modules/ModuleHandler.cpp | 37 +++++++++++++++++ .../ModuleManagerLibraryHandlerAsync.cpp | 4 ++ .../ModuleManagerLibraryHandlerLocal.cpp | 16 ++++++++ source/bringauto/modules/StatusAggregator.cpp | 40 ++++++++++++++++++- .../structures/InternalClientMessage.cpp | 8 ++++ .../StatusAggregatorDeviceState.cpp | 8 ++++ 16 files changed, 221 insertions(+), 14 deletions(-) diff --git a/include/bringauto/external_client/ExternalClient.hpp b/include/bringauto/external_client/ExternalClient.hpp index 24a2fefd..9f8c92f6 100644 --- a/include/bringauto/external_client/ExternalClient.hpp +++ b/include/bringauto/external_client/ExternalClient.hpp @@ -24,7 +24,8 @@ class ExternalClient { ExternalClient(const std::shared_ptr &context, structures::ModuleLibrary &moduleLibrary, - const std::shared_ptr> &toExternalQueue); + const std::shared_ptr> &toExternalQueue, + const std::shared_ptr> &commandForwardingQueue); /** * @brief Initialize connections, error aggregators @@ -95,10 +96,12 @@ class ExternalClient { std::unordered_map> externalConnectionMap_ {}; /// List of external connections, each device can have its own connection or multiple devices can share one connection std::list externalConnectionsList_ {}; - /// Queue for messages from module handler to external client to be sent to external server + /// Queue for messages from module handler to external client to be sent to external server std::shared_ptr> toExternalQueue_; /// Queue for device commands received by external client to module handler std::shared_ptr> fromExternalQueue_ {}; + /// Queue shared with ModuleHandler; used to push command-forward events for immediate dispatch + std::shared_ptr> commandForwardingQueue_ {}; std::shared_ptr> reconnectQueue_ {}; diff --git a/include/bringauto/modules/IModuleManagerLibraryHandler.hpp b/include/bringauto/modules/IModuleManagerLibraryHandler.hpp index efe94309..f3606a6d 100644 --- a/include/bringauto/modules/IModuleManagerLibraryHandler.hpp +++ b/include/bringauto/modules/IModuleManagerLibraryHandler.hpp @@ -2,6 +2,7 @@ #include +#include #include #include @@ -103,6 +104,17 @@ class IModuleManagerLibraryHandler { */ virtual int commandDataValid(const Buffer &command, unsigned int device_type) const = 0; + /** + * @brief Determine whether Module Gateway should forward a received command to the device + * immediately upon receipt, without waiting for the next status message. + * + * Optional — modules that do not implement this default to NOT_OK (existing behaviour). + * + * @param device_type device type + * @return OK to forward immediately, NOT_OK to forward on next status + */ + virtual int forwardCommandOnReceive(unsigned int /*device_type*/) { return NOT_OK; } + /** * @brief Constructs a buffer with the given size * diff --git a/include/bringauto/modules/ModuleHandler.hpp b/include/bringauto/modules/ModuleHandler.hpp index 1ae711d3..e06403ae 100644 --- a/include/bringauto/modules/ModuleHandler.hpp +++ b/include/bringauto/modules/ModuleHandler.hpp @@ -19,11 +19,12 @@ class ModuleHandler { const std::shared_ptr &context, structures::ModuleLibrary &moduleLibrary, const std::shared_ptr > &fromInternalQueue, + const std::shared_ptr > &commandForwardingQueue, const std::shared_ptr > &toInternalQueue, const std::shared_ptr > &toExternalQueue) - : context_ { context }, moduleLibrary_ { moduleLibrary }, fromInternalQueue_ { fromInternalQueue }, - toInternalQueue_ { toInternalQueue }, - toExternalQueue_ { toExternalQueue } {} + : context_ { context }, moduleLibrary_ { moduleLibrary }, + fromInternalQueue_ { fromInternalQueue }, commandForwardingQueue_ { commandForwardingQueue }, + toInternalQueue_ { toInternalQueue }, toExternalQueue_ { toExternalQueue } {} /** * @brief Start Module handler @@ -47,6 +48,12 @@ class ModuleHandler { */ void handleMessages() const; + /** + * @brief Process command-forward events from ExternalClient independently of handleMessages. + * Runs in its own thread started by run(). + */ + void handleCommandForwards() const; + /** * @brief Check if there are any timeouted messages * @@ -91,6 +98,14 @@ class ModuleHandler { */ void handleStatus(const InternalProtocol::DeviceStatus &status) const; + /** + * @brief Forward a pending command to a device immediately, using the cached status. + * Triggered when ExternalClient receives a command for a module with forward_command_on_receive enabled. + * + * @param deviceId device that has a command pending in the external command queue + */ + void handleCommandForward(const structures::DeviceIdentification &deviceId) const; + /** * @brief Throws an error if external queue size is too big */ @@ -99,8 +114,10 @@ class ModuleHandler { std::shared_ptr context_ {}; structures::ModuleLibrary &moduleLibrary_; - /// Queue for incoming messages from internal server to be processed + /// Queue for incoming messages from internal server (connect/status/disconnect) std::shared_ptr > fromInternalQueue_ {}; + /// Queue for command-forward events from external client + std::shared_ptr > commandForwardingQueue_ {}; /// Queue for outgoing messages to internal server to be forwarded to devices std::shared_ptr > toInternalQueue_ {}; /// Queue for outgoing messages to external server to be forwarded to external server diff --git a/include/bringauto/modules/ModuleManagerLibraryHandlerAsync.hpp b/include/bringauto/modules/ModuleManagerLibraryHandlerAsync.hpp index 4f7f92e2..8855ce6f 100644 --- a/include/bringauto/modules/ModuleManagerLibraryHandlerAsync.hpp +++ b/include/bringauto/modules/ModuleManagerLibraryHandlerAsync.hpp @@ -78,6 +78,8 @@ class ModuleManagerLibraryHandlerAsync : public IModuleManagerLibraryHandler { int commandDataValid(const Buffer &command, unsigned int device_type) const override; + int forwardCommandOnReceive(unsigned int device_type) override; + /** * @brief Constructs a buffer with the given size * diff --git a/include/bringauto/modules/ModuleManagerLibraryHandlerLocal.hpp b/include/bringauto/modules/ModuleManagerLibraryHandlerLocal.hpp index eaa24ed4..10220040 100644 --- a/include/bringauto/modules/ModuleManagerLibraryHandlerLocal.hpp +++ b/include/bringauto/modules/ModuleManagerLibraryHandlerLocal.hpp @@ -53,6 +53,8 @@ class ModuleManagerLibraryHandlerLocal : public IModuleManagerLibraryHandler { int commandDataValid(const Buffer &command, unsigned int device_type) const override; + int forwardCommandOnReceive(unsigned int device_type) override; + /** * @brief Constructs a buffer with the given size * @@ -69,6 +71,12 @@ class ModuleManagerLibraryHandlerLocal : public IModuleManagerLibraryHandler { void *checkFunction(const char *functionName) const; + /** + * @brief Look up an optional symbol in the loaded library. + * @return function pointer, or nullptr if the symbol is not exported + */ + void *checkOptionalFunction(const char *functionName) const; + /** * @brief Constructs a buffer with the same raw c buffer as provided * @@ -93,6 +101,8 @@ class ModuleManagerLibraryHandlerLocal : public IModuleManagerLibraryHandler { std::function generateCommand_ {}; std::function allocate_ {}; std::function deallocate_ {}; + /// Optional — nullptr when the module does not export forward_command_on_receive + std::function forwardCommandOnReceive_ {}; }; } \ No newline at end of file diff --git a/include/bringauto/modules/StatusAggregator.hpp b/include/bringauto/modules/StatusAggregator.hpp index 5043df77..fef2474c 100644 --- a/include/bringauto/modules/StatusAggregator.hpp +++ b/include/bringauto/modules/StatusAggregator.hpp @@ -6,6 +6,7 @@ #include #include +#include @@ -115,6 +116,16 @@ class StatusAggregator { int get_command(const Buffer& status, const structures::DeviceIdentification& device, Buffer &command); + /** + * @brief Get a command for immediate forwarding, using the cached status as input. + * Called when a command arrived and the module requested immediate forwarding. + * + * @param device device identification + * @param command output buffer for the generated command + * @return OK on success, DEVICE_NOT_REGISTERED or COMMAND_INVALID on error + */ + int get_command_for_forwarding(const structures::DeviceIdentification& device, Buffer& command); + /** * @short Get number of the module * @@ -198,6 +209,9 @@ class StatusAggregator { */ std::unordered_map deviceTimeouts_ {}; + /// Protects devices and deviceTimeouts_ against concurrent access from ModuleHandler threads and ExternalClient + mutable std::recursive_mutex devicesMutex_ {}; + std::atomic_bool timeoutedMessageReady_ { false }; }; diff --git a/include/bringauto/structures/InternalClientMessage.hpp b/include/bringauto/structures/InternalClientMessage.hpp index 6e58ed5d..9d2f8789 100644 --- a/include/bringauto/structures/InternalClientMessage.hpp +++ b/include/bringauto/structures/InternalClientMessage.hpp @@ -19,6 +19,15 @@ class InternalClientMessage { deviceId_ { deviceId } {} + /** + * @brief Create a command-forward event. Used by ExternalClient to wake up ModuleHandler + * for immediate command dispatch when the module requested forward_command_on_receive. + * + * @param deviceId device that has a pending command to forward + * @return InternalClientMessage tagged as command-forward + */ + static InternalClientMessage makeCommandForward(const DeviceIdentification &deviceId); + explicit InternalClientMessage(bool disconnect, const InternalProtocol::InternalClient &message): message_ { message }, disconnect_ { disconnect } @@ -44,6 +53,11 @@ class InternalClientMessage { */ bool disconnected() const; + /** + * @brief Returns true if this message is a command-forward event (no protobuf payload). + */ + [[nodiscard]] bool isCommandForward() const noexcept; + /** * @brief Get device identification struct * @@ -52,10 +66,16 @@ class InternalClientMessage { const DeviceIdentification &getDeviceId() const; private: + InternalClientMessage(const DeviceIdentification &deviceId, bool commandForward) + : disconnect_ { false }, commandForward_ { commandForward }, deviceId_ { deviceId } + {} + /// Internal client message InternalProtocol::InternalClient message_ {}; /// True if device is disconnected otherwise false bool disconnect_; + /// True if this is a command-forward event (no protobuf payload) + bool commandForward_ { false }; /// Device identification struct DeviceIdentification deviceId_ {}; }; diff --git a/include/bringauto/structures/StatusAggregatorDeviceState.hpp b/include/bringauto/structures/StatusAggregatorDeviceState.hpp index 976f5ac0..c58fc664 100644 --- a/include/bringauto/structures/StatusAggregatorDeviceState.hpp +++ b/include/bringauto/structures/StatusAggregatorDeviceState.hpp @@ -80,6 +80,17 @@ class StatusAggregatorDeviceState { */ int addExternalCommand(const modules::Buffer &commandBuffer); + /** + * @brief Mark this device as requiring immediate command forwarding. + * Called once at device registration based on the module's forwardCommandOnReceive response. + */ + void enableImmediateCommandForwarding() noexcept; + + /** + * @brief Returns true if this device requires commands to be forwarded immediately upon receipt. + */ + [[nodiscard]] bool isForwardCommandImmediately() const noexcept; + private: std::unique_ptr timer_ {}; @@ -92,6 +103,8 @@ class StatusAggregatorDeviceState { std::unique_ptr externalCommandMutex_ { std::make_unique() }; std::queue externalCommandQueue_ {}; + + bool forwardCommandImmediately_ { false }; }; } diff --git a/main.cpp b/main.cpp index 78c6c70e..72a2db83 100644 --- a/main.cpp +++ b/main.cpp @@ -77,12 +77,13 @@ int main(int argc, char **argv) { auto toInternalQueue = std::make_shared>(); auto fromInternalQueue = std::make_shared>(); + auto commandForwardingQueue = std::make_shared>(); auto toExternalQueue = std::make_shared>(); bais::InternalServer internalServer { context, fromInternalQueue, toInternalQueue }; - bringauto::modules::ModuleHandler moduleHandler { context, moduleLibrary, fromInternalQueue, toInternalQueue, - toExternalQueue }; - bringauto::external_client::ExternalClient externalClient { context, moduleLibrary, toExternalQueue }; + bringauto::modules::ModuleHandler moduleHandler { context, moduleLibrary, fromInternalQueue, + commandForwardingQueue, toInternalQueue, toExternalQueue }; + bringauto::external_client::ExternalClient externalClient { context, moduleLibrary, toExternalQueue, commandForwardingQueue }; std::jthread moduleHandlerThread([&moduleHandler]() { moduleHandler.run(); }); std::jthread externalClientThread([&externalClient]() { externalClient.run(); }); diff --git a/source/bringauto/external_client/ExternalClient.cpp b/source/bringauto/external_client/ExternalClient.cpp index f966898d..460da5fb 100644 --- a/source/bringauto/external_client/ExternalClient.cpp +++ b/source/bringauto/external_client/ExternalClient.cpp @@ -8,6 +8,7 @@ #include #include +#include #include @@ -20,8 +21,10 @@ namespace ip = InternalProtocol; ExternalClient::ExternalClient(const std::shared_ptr &context, structures::ModuleLibrary &moduleLibrary, - const std::shared_ptr> &toExternalQueue): + const std::shared_ptr> &toExternalQueue, + const std::shared_ptr> &commandForwardingQueue): toExternalQueue_ { toExternalQueue }, + commandForwardingQueue_ { commandForwardingQueue }, context_ { context }, moduleLibrary_ { moduleLibrary }, timer_ { context->ioContext } { @@ -65,8 +68,11 @@ void ExternalClient::handleCommand(const InternalProtocol::DeviceCommand &device const auto deviceId = structures::DeviceIdentification(device); - int ret = it->second->update_command(commandBuffer, deviceId); - if (ret == OK) { + const int ret = it->second->update_command(commandBuffer, deviceId); + if (ret == FORWARD_IMMEDIATELY) { + settings::Logger::logInfo("Command for device {} was added to queue, forwarding immediately", device.devicename()); + commandForwardingQueue_->pushAndNotify(structures::InternalClientMessage::makeCommandForward(deviceId)); + } else if (ret == OK) { settings::Logger::logInfo("Command for device {} was added to queue", device.devicename()); } } diff --git a/source/bringauto/modules/ModuleHandler.cpp b/source/bringauto/modules/ModuleHandler.cpp index a05e34a9..4eeb108f 100644 --- a/source/bringauto/modules/ModuleHandler.cpp +++ b/source/bringauto/modules/ModuleHandler.cpp @@ -5,6 +5,8 @@ #include +#include + namespace bringauto::modules { @@ -15,12 +17,16 @@ void ModuleHandler::destroy() const { while(not fromInternalQueue_->empty()) { fromInternalQueue_->pop(); } + while(not commandForwardingQueue_->empty()) { + commandForwardingQueue_->pop(); + } settings::Logger::logInfo("Module handler stopped"); } void ModuleHandler::run() const { settings::Logger::logInfo("Module handler started, constants used: queue_timeout_length: {}, status_aggregation_timeout: {}", settings::queue_timeout_length.count(), settings::status_aggregation_timeout.count()); + std::jthread forwardThread([this]() { handleCommandForwards(); }); handleMessages(); } @@ -44,6 +50,16 @@ void ModuleHandler::handleMessages() const { } } +void ModuleHandler::handleCommandForwards() const { + while(not context_->ioContext.stopped()) { + if(commandForwardingQueue_->waitForValueWithTimeout(settings::queue_timeout_length)) { + continue; + } + handleCommandForward(commandForwardingQueue_->front().getDeviceId()); + commandForwardingQueue_->pop(); + } +} + void ModuleHandler::checkTimeoutedMessages() const { for (const auto& [key, statusAggregator] : moduleLibrary_.statusAggregators) { if(statusAggregator->getTimeoutedMessageReady()){ @@ -156,6 +172,27 @@ ModuleHandler::sendConnectResponse(const ip::Device &device, ip::DeviceConnectRe settings::Logger::logInfo("New device {} is trying to connect, sending response {}", device.devicename(), static_cast(response_type)); } +void ModuleHandler::handleCommandForward(const structures::DeviceIdentification &deviceId) const { + const auto &moduleNumber = deviceId.getModule(); + const auto &statusAggregators = moduleLibrary_.statusAggregators; + if(not statusAggregators.contains(moduleNumber)) { + settings::Logger::logWarning("Module number: {} is not supported in handleCommandForward", moduleNumber); + return; + } + + const auto &statusAggregator = statusAggregators.at(moduleNumber); + Buffer commandBuffer {}; + if(statusAggregator->get_command_for_forwarding(deviceId, commandBuffer) != OK) { + settings::Logger::logWarning("get_command_for_forwarding failed for device: {}", deviceId.getDeviceName()); + return; + } + + const auto device = deviceId.convertToIPDevice(); + const auto deviceCommandMessage = common_utils::ProtobufUtils::createInternalServerCommandMessage(device, commandBuffer); + toInternalQueue_->pushAndNotify(structures::ModuleHandlerMessage(false, deviceCommandMessage)); + settings::Logger::logDebug("Module handler forwarded command immediately for device: {}", deviceId.getDeviceName()); +} + void ModuleHandler::handleStatus(const ip::DeviceStatus &status) const { const auto &device = status.device(); const auto &moduleNumber = device.module(); diff --git a/source/bringauto/modules/ModuleManagerLibraryHandlerAsync.cpp b/source/bringauto/modules/ModuleManagerLibraryHandlerAsync.cpp index 70c1ec3e..9acb81c3 100644 --- a/source/bringauto/modules/ModuleManagerLibraryHandlerAsync.cpp +++ b/source/bringauto/modules/ModuleManagerLibraryHandlerAsync.cpp @@ -210,6 +210,10 @@ int ModuleManagerLibraryHandlerAsync::commandDataValid(const Buffer &command, un return aeronClient_.callFunc(fp_async::commandDataValidAsync, command_raw_buffer, device_type).value_or(NOT_OK); } +int ModuleManagerLibraryHandlerAsync::forwardCommandOnReceive(unsigned int /*device_type*/) { + return NOT_OK; +} + Buffer ModuleManagerLibraryHandlerAsync::constructBuffer(std::size_t size) { if(size == 0) { return Buffer {}; diff --git a/source/bringauto/modules/ModuleManagerLibraryHandlerLocal.cpp b/source/bringauto/modules/ModuleManagerLibraryHandlerLocal.cpp index 2a15a7e8..37007bac 100644 --- a/source/bringauto/modules/ModuleManagerLibraryHandlerLocal.cpp +++ b/source/bringauto/modules/ModuleManagerLibraryHandlerLocal.cpp @@ -57,6 +57,11 @@ void ModuleManagerLibraryHandlerLocal::loadLibrary(const std::filesystem::path & "allocate")); deallocate_ = reinterpret_cast::fncptr>(checkFunction( "deallocate")); + forwardCommandOnReceive_ = reinterpret_cast::fncptr>( + checkOptionalFunction("forward_command_on_receive")); + if(forwardCommandOnReceive_) { + log::logDebug("Library " + path.string() + " supports forward_command_on_receive"); + } log::logDebug("Library " + path.string() + " was successfully loaded"); } @@ -68,6 +73,10 @@ void *ModuleManagerLibraryHandlerLocal::checkFunction(const char *functionName) return function; } +void *ModuleManagerLibraryHandlerLocal::checkOptionalFunction(const char *functionName) const { + return dlsym(module_, functionName); +} + int ModuleManagerLibraryHandlerLocal::getModuleNumber() const { return getModuleNumber_(); } @@ -196,6 +205,13 @@ int ModuleManagerLibraryHandlerLocal::commandDataValid(const Buffer &command, un return commandDataValid_(raw_buffer, device_type); } +int ModuleManagerLibraryHandlerLocal::forwardCommandOnReceive(unsigned int device_type) { + if(!forwardCommandOnReceive_) { + return NOT_OK; + } + return forwardCommandOnReceive_(device_type); +} + int ModuleManagerLibraryHandlerLocal::allocate(struct buffer *buffer_pointer, size_t size_in_bytes) const { return allocate_(buffer_pointer, size_in_bytes); } diff --git a/source/bringauto/modules/StatusAggregator.cpp b/source/bringauto/modules/StatusAggregator.cpp index 4c56e939..bb320d2e 100644 --- a/source/bringauto/modules/StatusAggregator.cpp +++ b/source/bringauto/modules/StatusAggregator.cpp @@ -10,6 +10,7 @@ namespace bringauto::modules { using log = settings::Logger; int StatusAggregator::clear_device(const structures::DeviceIdentification &device) { + std::lock_guard lock(devicesMutex_); if(is_device_valid(device) == NOT_OK) { return DEVICE_NOT_REGISTERED; } @@ -59,6 +60,7 @@ int StatusAggregator::destroy_status_aggregator() { } int StatusAggregator::clear_all_devices() { + std::lock_guard lock(devicesMutex_); for(auto &[key, device]: devices) { clear_device(key); } @@ -66,18 +68,22 @@ int StatusAggregator::clear_all_devices() { } int StatusAggregator::remove_device(const structures::DeviceIdentification& device) { + std::lock_guard lock(devicesMutex_); if(is_device_valid(device) == NOT_OK) { return DEVICE_NOT_REGISTERED; } clear_device(device); - // WUT, maybe becose there can be multiple context running so by this we ensure no rece condition? - boost::asio::post(context_->ioContext, [this, device]() { devices.erase(device); }); + boost::asio::post(context_->ioContext, [this, device]() { + std::lock_guard postLock(devicesMutex_); + devices.erase(device); + }); return OK; } int StatusAggregator::add_status_to_aggregator(const Buffer& status, const structures::DeviceIdentification& device) { + std::lock_guard lock(devicesMutex_); const auto &device_type = device.getDeviceType(); if(is_device_type_supported(device_type) == NOT_OK) { log::logError("Trying to add status to unsupported device type: {}", device_type); @@ -101,6 +107,13 @@ int StatusAggregator::add_status_to_aggregator(const Buffer& status, devices.emplace( device, structures::StatusAggregatorDeviceState(context_, timeouted_force_aggregation, device, commandBuffer, status)); + const int forwardOnReceive = module_->forwardCommandOnReceive(device_type); + log::logInfo("forwardCommandOnReceive for device {} (type={}): rc={}", device.convertToString(), device_type, forwardOnReceive); + if(forwardOnReceive == OK) { + devices.at(device).enableImmediateCommandForwarding(); + log::logInfo("Immediate command forwarding ENABLED for device {}", device.convertToString()); + } + force_aggregation_on_device(device); return 1; } @@ -119,6 +132,7 @@ int StatusAggregator::add_status_to_aggregator(const Buffer& status, int StatusAggregator::get_aggregated_status(Buffer &generated_status, const structures::DeviceIdentification& device) { + std::lock_guard lock(devicesMutex_); if(is_device_valid(device) == NOT_OK) { log::logError("Trying to get aggregated status from unregistered device"); return DEVICE_NOT_REGISTERED; @@ -135,6 +149,7 @@ int StatusAggregator::get_aggregated_status(Buffer &generated_status, } int StatusAggregator::get_unique_devices(std::list &unique_devices_list) { + std::lock_guard lock(devicesMutex_); const auto devicesSize = devices.size(); if (devicesSize == 0) { return 0; @@ -148,6 +163,7 @@ int StatusAggregator::get_unique_devices(std::listgetModuleNumber(); } int StatusAggregator::update_command(const Buffer& command, const structures::DeviceIdentification& device) { + std::lock_guard lock(devicesMutex_); const auto &device_type = device.getDeviceType(); if(is_device_type_supported(device_type) == NOT_OK) { log::logError("Device type {} is not supported", device_type); @@ -189,11 +207,18 @@ int StatusAggregator::update_command(const Buffer& command, const structures::De if (devices.at(device).addExternalCommand(command) == NOT_OK) { log::logError("External command queue is full for device: {} deleting oldest command", device.convertToString()); } + + const bool forwarding = devices.at(device).isForwardCommandImmediately(); + log::logDebug("update_command for device {}: isForwardCommandImmediately={}", device.convertToString(), forwarding); + if(forwarding) { + return FORWARD_IMMEDIATELY; + } return OK; } int StatusAggregator::get_command(const Buffer& status, const structures::DeviceIdentification& device, Buffer& command) { + std::lock_guard lock(devicesMutex_); const auto &device_type = device.getDeviceType(); if(is_device_type_supported(device_type) == NOT_OK) { log::logError("Device type {} is not supported", device_type); @@ -210,6 +235,16 @@ int StatusAggregator::get_command(const Buffer& status, const structures::Device return OK; } +int StatusAggregator::get_command_for_forwarding(const structures::DeviceIdentification &device, Buffer &command) { + std::lock_guard lock(devicesMutex_); + if(is_device_valid(device) == NOT_OK) { + log::logError("Trying to get forwarding command for unregistered device: {}", device.convertToString()); + return DEVICE_NOT_REGISTERED; + } + const auto &cachedStatus = devices.at(device).getStatus(); + return get_command(cachedStatus, device, command); +} + int StatusAggregator::is_device_type_supported(unsigned int device_type) { return module_->isDeviceTypeSupported(device_type); } @@ -223,6 +258,7 @@ bool StatusAggregator::getTimeoutedMessageReady() const { } int StatusAggregator::getDeviceTimeoutCount(const structures::DeviceIdentification &device) const { + std::lock_guard lock(devicesMutex_); if(const auto it = deviceTimeouts_.find(device); it != deviceTimeouts_.end()) { return it->second; } diff --git a/source/bringauto/structures/InternalClientMessage.cpp b/source/bringauto/structures/InternalClientMessage.cpp index 4c905687..d81f69d0 100644 --- a/source/bringauto/structures/InternalClientMessage.cpp +++ b/source/bringauto/structures/InternalClientMessage.cpp @@ -12,8 +12,16 @@ bool InternalClientMessage::disconnected() const { return disconnect_; } +bool InternalClientMessage::isCommandForward() const noexcept { + return commandForward_; +} + const DeviceIdentification &InternalClientMessage::getDeviceId() const { return deviceId_; } +InternalClientMessage InternalClientMessage::makeCommandForward(const DeviceIdentification &deviceId) { + return InternalClientMessage { deviceId, true }; +} + } diff --git a/source/bringauto/structures/StatusAggregatorDeviceState.cpp b/source/bringauto/structures/StatusAggregatorDeviceState.cpp index 6412baff..16d35a85 100644 --- a/source/bringauto/structures/StatusAggregatorDeviceState.cpp +++ b/source/bringauto/structures/StatusAggregatorDeviceState.cpp @@ -56,4 +56,12 @@ int StatusAggregatorDeviceState::addExternalCommand(const modules::Buffer &comma return OK; } +void StatusAggregatorDeviceState::enableImmediateCommandForwarding() noexcept { + forwardCommandImmediately_ = true; +} + +bool StatusAggregatorDeviceState::isForwardCommandImmediately() const noexcept { + return forwardCommandImmediately_; +} + } From d4e3de0f0f30db847c066cfd87d6f7e9270058bd Mon Sep 17 00:00:00 2001 From: Daniel Prudky Date: Wed, 27 May 2026 15:22:47 +0200 Subject: [PATCH 3/9] refactor(external_client): move forward-on-receive decision to ExternalClient - Remove FORWARD_IMMEDIATELY return code from StatusAggregator::update_command; the forwarding flag no longer leaks module policy into the aggregator layer - ExternalClient::handleCommand now calls forwardCommandOnReceive() directly after a successful update_command to decide whether to push to commandForwardingQueue, keeping the dispatch logic co-located with the queue it writes to --- source/bringauto/external_client/ExternalClient.cpp | 12 +++++++----- source/bringauto/modules/StatusAggregator.cpp | 5 ----- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/source/bringauto/external_client/ExternalClient.cpp b/source/bringauto/external_client/ExternalClient.cpp index 460da5fb..720b3cd8 100644 --- a/source/bringauto/external_client/ExternalClient.cpp +++ b/source/bringauto/external_client/ExternalClient.cpp @@ -69,11 +69,13 @@ void ExternalClient::handleCommand(const InternalProtocol::DeviceCommand &device const auto deviceId = structures::DeviceIdentification(device); const int ret = it->second->update_command(commandBuffer, deviceId); - if (ret == FORWARD_IMMEDIATELY) { - settings::Logger::logInfo("Command for device {} was added to queue, forwarding immediately", device.devicename()); - commandForwardingQueue_->pushAndNotify(structures::InternalClientMessage::makeCommandForward(deviceId)); - } else if (ret == OK) { - settings::Logger::logInfo("Command for device {} was added to queue", device.devicename()); + if (ret == OK) { + if (moduleLibraryHandler->forwardCommandOnReceive(deviceId.getDeviceType()) == OK) { + settings::Logger::logInfo("Command for device {} was added to queue, forwarding immediately", device.devicename()); + commandForwardingQueue_->pushAndNotify(structures::InternalClientMessage::makeCommandForward(deviceId)); + } else { + settings::Logger::logInfo("Command for device {} was added to queue", device.devicename()); + } } } diff --git a/source/bringauto/modules/StatusAggregator.cpp b/source/bringauto/modules/StatusAggregator.cpp index bb320d2e..e5996d7a 100644 --- a/source/bringauto/modules/StatusAggregator.cpp +++ b/source/bringauto/modules/StatusAggregator.cpp @@ -208,11 +208,6 @@ int StatusAggregator::update_command(const Buffer& command, const structures::De log::logError("External command queue is full for device: {} deleting oldest command", device.convertToString()); } - const bool forwarding = devices.at(device).isForwardCommandImmediately(); - log::logDebug("update_command for device {}: isForwardCommandImmediately={}", device.convertToString(), forwarding); - if(forwarding) { - return FORWARD_IMMEDIATELY; - } return OK; } From 758805cae0b1a7beb96229afec4a53cd7c899173 Mon Sep 17 00:00:00 2001 From: Daniel Prudky Date: Wed, 27 May 2026 15:33:13 +0200 Subject: [PATCH 4/9] fix(internal_server): correct buffer bounds check in processBufferData - Previous condition bytesTransferred < bufferOffset was logically inverted; replace with bufferOffset + bytesTransferred > buffer.size() to catch actual overflows - Fix recursive call offset: pass bufferOffset + (bytesTransferred - bytesLeft) instead of bytesTransferred - bytesLeft --- source/bringauto/internal_server/InternalServer.cpp | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/source/bringauto/internal_server/InternalServer.cpp b/source/bringauto/internal_server/InternalServer.cpp index 4f17722c..ed065e72 100644 --- a/source/bringauto/internal_server/InternalServer.cpp +++ b/source/bringauto/internal_server/InternalServer.cpp @@ -102,12 +102,11 @@ void InternalServer::asyncReceiveHandler( bool InternalServer::processBufferData( const std::shared_ptr &connection, std::size_t bytesTransferred, std::size_t bufferOffset) { - if(bytesTransferred < bufferOffset) { + if(bufferOffset + bytesTransferred > connection->connContext.buffer.size()) { log::logError( - "Error in processBufferData(...): bufferOffset: {} is greater than bytesTransferred: {}, " - "Invalid bufferOffset: {} received from Internal Client, " - "connection's ip address is {}", bufferOffset, bytesTransferred, bufferOffset, - connection->remoteEndpointAddress()); + "Error in processBufferData(...): bufferOffset: {} + bytesTransferred: {} exceeds buffer size: {}, " + "connection's ip address is {}", bufferOffset, bytesTransferred, + connection->connContext.buffer.size(), connection->remoteEndpointAddress()); return false; } @@ -170,7 +169,7 @@ bool InternalServer::processBufferData( connection->remoteEndpointAddress()); return false; } - if(bytesLeft && !processBufferData(connection, bytesLeft, bytesTransferred - bytesLeft)) { + if(bytesLeft && !processBufferData(connection, bytesLeft, bufferOffset + (bytesTransferred - bytesLeft))) { log::logError("Error in processBufferData(...): " "Received extra invalid bytes of data: {} from Internal Client, " "connection's ip address is {}", bytesLeft, From 78e078a0405f59daa8a2e05ed1aa1cb5a3c38029 Mon Sep 17 00:00:00 2001 From: Daniel Prudky Date: Mon, 1 Jun 2026 10:53:11 +0200 Subject: [PATCH 5/9] refactor(modules): fix push-only command handling and rename getCommand to consumeCommand MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - rename getCommand() → consumeCommand() to signal side-effectful dequeue semantics, distinguishing it from pure const getters like getStatus() - return std::nullopt from consumeCommand() for push-only devices with empty external command queue instead of falling through to default command - handle NO_MESSAGE_AVAILABLE in handleCommandForward: log at debug level when command was already consumed by the status path, avoiding a spurious warning - send empty DeviceCommand in handleStatus when push-only device has no pending command, satisfying the InternalProtocol 250 ms response requirement without forwarding to the vehicle - use *currentCommand instead of .value() after explicit has_value() guard in get_command - remove redundant #include from StatusAggregatorDeviceState.cpp --- .../structures/StatusAggregatorDeviceState.hpp | 16 ++++++++++------ source/bringauto/modules/ModuleHandler.cpp | 17 ++++++++++++++++- source/bringauto/modules/StatusAggregator.cpp | 6 +++++- .../structures/StatusAggregatorDeviceState.cpp | 6 +++++- 4 files changed, 36 insertions(+), 9 deletions(-) diff --git a/include/bringauto/structures/StatusAggregatorDeviceState.hpp b/include/bringauto/structures/StatusAggregatorDeviceState.hpp index c58fc664..e3758575 100644 --- a/include/bringauto/structures/StatusAggregatorDeviceState.hpp +++ b/include/bringauto/structures/StatusAggregatorDeviceState.hpp @@ -6,6 +6,7 @@ #include #include +#include #include #include @@ -55,14 +56,17 @@ class StatusAggregatorDeviceState { void setDefaultCommand(const modules::Buffer &commandBuffer); /** - * @brief Gets the most relevant command buffer. - * If there are external commands in the queue, they will be used first. - * Otherwise, the default command buffer will be used. - * Commands received from the queue are removed from it. + * @brief Consumes and returns the most relevant command buffer. + * If there are external commands in the queue, they will be dequeued and returned. + * For push-only devices (forwardCommandOnReceive == OK) with an empty queue, + * returns std::nullopt — the caller must send an empty DeviceCommand to satisfy the + * InternalProtocol response requirement without forwarding anything to the vehicle. + * For pull devices, falls back to the default command buffer when the queue is empty. + * Commands dequeued from the external queue are removed from it. * - * @return const Buffer& + * @return command buffer, or std::nullopt when push-only device has no pending command */ - [[nodiscard]] const modules::Buffer &getCommand(); + [[nodiscard]] std::optional consumeCommand(); /** * @brief Get aggregated messages queue diff --git a/source/bringauto/modules/ModuleHandler.cpp b/source/bringauto/modules/ModuleHandler.cpp index 4eeb108f..32a2ba57 100644 --- a/source/bringauto/modules/ModuleHandler.cpp +++ b/source/bringauto/modules/ModuleHandler.cpp @@ -4,6 +4,7 @@ #include #include +#include #include @@ -182,7 +183,12 @@ void ModuleHandler::handleCommandForward(const structures::DeviceIdentification const auto &statusAggregator = statusAggregators.at(moduleNumber); Buffer commandBuffer {}; - if(statusAggregator->get_command_for_forwarding(deviceId, commandBuffer) != OK) { + const int forwardRc = statusAggregator->get_command_for_forwarding(deviceId, commandBuffer); + if(forwardRc == NO_MESSAGE_AVAILABLE) { + settings::Logger::logDebug("Command already consumed by status path for push-only device: {}", deviceId.getDeviceName()); + return; + } + if(forwardRc != OK) { settings::Logger::logWarning("get_command_for_forwarding failed for device: {}", deviceId.getDeviceName()); return; } @@ -233,6 +239,15 @@ void ModuleHandler::handleStatus(const ip::DeviceStatus &status) const { commandBuffer); toInternalQueue_->pushAndNotify(structures::ModuleHandlerMessage(false, deviceCommandMessage)); settings::Logger::logDebug("Module handler successfully retrieved command and sent it to device: {}", deviceName); + } else if(getCommandRc == NO_MESSAGE_AVAILABLE) { + // Push-only device with no fresh command from ES. Send an empty DeviceCommand to + // satisfy the InternalProtocol 250ms response requirement. The bridge will detect + // empty commanddata and not forward it to the vehicle, letting the vehicle's own + // timeout trigger the safe-stop. + const auto emptyCommandMessage = common_utils::ProtobufUtils::createInternalServerCommandMessage( + device, Buffer{}); + toInternalQueue_->pushAndNotify(structures::ModuleHandlerMessage(false, emptyCommandMessage)); + settings::Logger::logDebug("No fresh command for push-only device {}, sending empty response", deviceName); } else { settings::Logger::logWarning("Retrieving command failed with return code: {}", getCommandRc); return; diff --git a/source/bringauto/modules/StatusAggregator.cpp b/source/bringauto/modules/StatusAggregator.cpp index e5996d7a..aeb62f58 100644 --- a/source/bringauto/modules/StatusAggregator.cpp +++ b/source/bringauto/modules/StatusAggregator.cpp @@ -221,7 +221,11 @@ int StatusAggregator::get_command(const Buffer& status, const structures::Device } auto &deviceState = devices.at(device); - if (module_->generateCommand(command, status, deviceState.getStatus(), deviceState.getCommand(), device_type) != OK) { + auto currentCommand = deviceState.consumeCommand(); + if (!currentCommand.has_value()) { + return NO_MESSAGE_AVAILABLE; + } + if (module_->generateCommand(command, status, deviceState.getStatus(), *currentCommand, device_type) != OK) { log::logError("Error occurred while generating command for device: {}", device.convertToString()); return COMMAND_INVALID; } diff --git a/source/bringauto/structures/StatusAggregatorDeviceState.cpp b/source/bringauto/structures/StatusAggregatorDeviceState.cpp index 16d35a85..cade4b66 100644 --- a/source/bringauto/structures/StatusAggregatorDeviceState.cpp +++ b/source/bringauto/structures/StatusAggregatorDeviceState.cpp @@ -33,11 +33,15 @@ void StatusAggregatorDeviceState::setDefaultCommand(const modules::Buffer &comma defaultCommand_ = commandBuffer; } -const modules::Buffer &StatusAggregatorDeviceState::getCommand() { +std::optional StatusAggregatorDeviceState::consumeCommand() { std::lock_guard lock { *externalCommandMutex_ }; if (!externalCommandQueue_.empty()) { defaultCommand_ = externalCommandQueue_.front(); externalCommandQueue_.pop(); + return defaultCommand_; + } + if (forwardCommandImmediately_) { + return std::nullopt; } return defaultCommand_; } From d28bd575d8d8b173025b2062aa2bd078a0a41c4f Mon Sep 17 00:00:00 2001 From: Daniel Prudky Date: Wed, 3 Jun 2026 08:02:19 +0200 Subject: [PATCH 6/9] refactor(modules): replace recursive_mutex with mutex in StatusAggregator - introduce private unlocked helpers (isDeviceValidUnlocked, clearDeviceUnlocked, forceAggregationOnDeviceUnlocked, getCommandUnlocked) that operate under an assumed held lock - update all callers that already hold devicesMutex_ to use the unlocked variants, eliminating recursive lock acquisition - replace mutable std::recursive_mutex devicesMutex_ with std::mutex - replace std::unique_ptr externalCommandMutex_ in StatusAggregatorDeviceState with a plain std::mutex member - switch devices.emplace() to try_emplace() to construct StatusAggregatorDeviceState in-place, avoiding a move constructor that std::mutex would have deleted --- .../bringauto/modules/StatusAggregator.hpp | 36 ++++++++++++- .../StatusAggregatorDeviceState.hpp | 2 +- source/bringauto/modules/StatusAggregator.cpp | 54 ++++++++++++------- .../StatusAggregatorDeviceState.cpp | 4 +- 4 files changed, 73 insertions(+), 23 deletions(-) diff --git a/include/bringauto/modules/StatusAggregator.hpp b/include/bringauto/modules/StatusAggregator.hpp index fef2474c..3e1e3cd2 100644 --- a/include/bringauto/modules/StatusAggregator.hpp +++ b/include/bringauto/modules/StatusAggregator.hpp @@ -163,6 +163,40 @@ class StatusAggregator { private: + /** + * @brief Check if device is valid without acquiring devicesMutex_. Caller must hold the lock. + * + * @param device device identification + * @return OK if device is registered and its type is supported, NOT_OK otherwise + */ + int isDeviceValidUnlocked(const structures::DeviceIdentification& device); + + /** + * @brief Force status message aggregation on a device without acquiring devicesMutex_. Caller must hold the lock. + * + * @param device device identification + * @return number of queued aggregated messages on success, DEVICE_NOT_REGISTERED if device is unknown + */ + int forceAggregationOnDeviceUnlocked(const structures::DeviceIdentification& device); + + /** + * @brief Clear state and messages for a device without acquiring devicesMutex_. Caller must hold the lock. + * + * @param device device identification + * @return OK on success, DEVICE_NOT_REGISTERED if device is unknown + */ + int clearDeviceUnlocked(const structures::DeviceIdentification& device); + + /** + * @brief Get command for a device without acquiring devicesMutex_. Caller must hold the lock. + * + * @param status current status buffer + * @param device device identification + * @param command output buffer for the generated command + * @return OK on success, DEVICE_NOT_SUPPORTED, NO_MESSAGE_AVAILABLE, or COMMAND_INVALID on error + */ + int getCommandUnlocked(const Buffer& status, const structures::DeviceIdentification& device, Buffer& command); + /** * @brief Aggregate status message * @@ -210,7 +244,7 @@ class StatusAggregator { std::unordered_map deviceTimeouts_ {}; /// Protects devices and deviceTimeouts_ against concurrent access from ModuleHandler threads and ExternalClient - mutable std::recursive_mutex devicesMutex_ {}; + mutable std::mutex devicesMutex_ {}; std::atomic_bool timeoutedMessageReady_ { false }; }; diff --git a/include/bringauto/structures/StatusAggregatorDeviceState.hpp b/include/bringauto/structures/StatusAggregatorDeviceState.hpp index e3758575..8b169799 100644 --- a/include/bringauto/structures/StatusAggregatorDeviceState.hpp +++ b/include/bringauto/structures/StatusAggregatorDeviceState.hpp @@ -104,7 +104,7 @@ class StatusAggregatorDeviceState { modules::Buffer defaultCommand_ {}; - std::unique_ptr externalCommandMutex_ { std::make_unique() }; + std::mutex externalCommandMutex_ {}; std::queue externalCommandQueue_ {}; diff --git a/source/bringauto/modules/StatusAggregator.cpp b/source/bringauto/modules/StatusAggregator.cpp index aeb62f58..2c4ebd8d 100644 --- a/source/bringauto/modules/StatusAggregator.cpp +++ b/source/bringauto/modules/StatusAggregator.cpp @@ -9,9 +9,8 @@ namespace bringauto::modules { using log = settings::Logger; -int StatusAggregator::clear_device(const structures::DeviceIdentification &device) { - std::lock_guard lock(devicesMutex_); - if(is_device_valid(device) == NOT_OK) { +int StatusAggregator::clearDeviceUnlocked(const structures::DeviceIdentification &device) { + if(isDeviceValidUnlocked(device) == NOT_OK) { return DEVICE_NOT_REGISTERED; } auto &deviceState = devices.at(device); @@ -22,6 +21,11 @@ int StatusAggregator::clear_device(const structures::DeviceIdentification &devic return OK; } +int StatusAggregator::clear_device(const structures::DeviceIdentification &device) { + std::lock_guard lock(devicesMutex_); + return clearDeviceUnlocked(device); +} + Buffer StatusAggregator::aggregateStatus(const structures::StatusAggregatorDeviceState &deviceState, const Buffer &status, const unsigned int &device_type) const { @@ -62,17 +66,17 @@ int StatusAggregator::destroy_status_aggregator() { int StatusAggregator::clear_all_devices() { std::lock_guard lock(devicesMutex_); for(auto &[key, device]: devices) { - clear_device(key); + clearDeviceUnlocked(key); } return OK; } int StatusAggregator::remove_device(const structures::DeviceIdentification& device) { std::lock_guard lock(devicesMutex_); - if(is_device_valid(device) == NOT_OK) { + if(isDeviceValidUnlocked(device) == NOT_OK) { return DEVICE_NOT_REGISTERED; } - clear_device(device); + clearDeviceUnlocked(device); boost::asio::post(context_->ioContext, [this, device]() { std::lock_guard postLock(devicesMutex_); @@ -104,8 +108,7 @@ int StatusAggregator::add_status_to_aggregator(const Buffer& status, deviceTimeouts_[deviceId]++; return force_aggregation_on_device(deviceId); }; - devices.emplace( - device, structures::StatusAggregatorDeviceState(context_, timeouted_force_aggregation, device, commandBuffer, status)); + devices.try_emplace(device, context_, timeouted_force_aggregation, device, commandBuffer, status); const int forwardOnReceive = module_->forwardCommandOnReceive(device_type); log::logInfo("forwardCommandOnReceive for device {} (type={}): rc={}", device.convertToString(), device_type, forwardOnReceive); @@ -114,7 +117,7 @@ int StatusAggregator::add_status_to_aggregator(const Buffer& status, log::logInfo("Immediate command forwarding ENABLED for device {}", device.convertToString()); } - force_aggregation_on_device(device); + forceAggregationOnDeviceUnlocked(device); return 1; } @@ -133,7 +136,7 @@ int StatusAggregator::add_status_to_aggregator(const Buffer& status, int StatusAggregator::get_aggregated_status(Buffer &generated_status, const structures::DeviceIdentification& device) { std::lock_guard lock(devicesMutex_); - if(is_device_valid(device) == NOT_OK) { + if(isDeviceValidUnlocked(device) == NOT_OK) { log::logError("Trying to get aggregated status from unregistered device"); return DEVICE_NOT_REGISTERED; } @@ -162,9 +165,8 @@ int StatusAggregator::get_unique_devices(std::listgetModuleNumber(); } int StatusAggregator::update_command(const Buffer& command, const structures::DeviceIdentification& device) { @@ -211,9 +222,8 @@ int StatusAggregator::update_command(const Buffer& command, const structures::De return OK; } -int StatusAggregator::get_command(const Buffer& status, const structures::DeviceIdentification& device, - Buffer& command) { - std::lock_guard lock(devicesMutex_); +int StatusAggregator::getCommandUnlocked(const Buffer& status, const structures::DeviceIdentification& device, + Buffer& command) { const auto &device_type = device.getDeviceType(); if(is_device_type_supported(device_type) == NOT_OK) { log::logError("Device type {} is not supported", device_type); @@ -234,14 +244,20 @@ int StatusAggregator::get_command(const Buffer& status, const structures::Device return OK; } +int StatusAggregator::get_command(const Buffer& status, const structures::DeviceIdentification& device, + Buffer& command) { + std::lock_guard lock(devicesMutex_); + return getCommandUnlocked(status, device, command); +} + int StatusAggregator::get_command_for_forwarding(const structures::DeviceIdentification &device, Buffer &command) { std::lock_guard lock(devicesMutex_); - if(is_device_valid(device) == NOT_OK) { + if(isDeviceValidUnlocked(device) == NOT_OK) { log::logError("Trying to get forwarding command for unregistered device: {}", device.convertToString()); return DEVICE_NOT_REGISTERED; } const auto &cachedStatus = devices.at(device).getStatus(); - return get_command(cachedStatus, device, command); + return getCommandUnlocked(cachedStatus, device, command); } int StatusAggregator::is_device_type_supported(unsigned int device_type) { diff --git a/source/bringauto/structures/StatusAggregatorDeviceState.cpp b/source/bringauto/structures/StatusAggregatorDeviceState.cpp index cade4b66..e2a5d015 100644 --- a/source/bringauto/structures/StatusAggregatorDeviceState.cpp +++ b/source/bringauto/structures/StatusAggregatorDeviceState.cpp @@ -34,7 +34,7 @@ void StatusAggregatorDeviceState::setDefaultCommand(const modules::Buffer &comma } std::optional StatusAggregatorDeviceState::consumeCommand() { - std::lock_guard lock { *externalCommandMutex_ }; + std::lock_guard lock { externalCommandMutex_ }; if (!externalCommandQueue_.empty()) { defaultCommand_ = externalCommandQueue_.front(); externalCommandQueue_.pop(); @@ -51,7 +51,7 @@ std::queue &StatusAggregatorDeviceState::aggregatedMessa } int StatusAggregatorDeviceState::addExternalCommand(const modules::Buffer &commandBuffer) { - std::lock_guard lock { *externalCommandMutex_ }; + std::lock_guard lock { externalCommandMutex_ }; externalCommandQueue_.push(commandBuffer); if (externalCommandQueue_.size() > settings::max_external_commands) { externalCommandQueue_.pop(); From 2cdb253fd6bebb6217e1b30470ee2ad1434553ef Mon Sep 17 00:00:00 2001 From: Daniel Prudky Date: Wed, 3 Jun 2026 08:24:35 +0200 Subject: [PATCH 7/9] refactor(modules): replace void* with FunctionPtr in dlsym helpers - introduce FunctionPtr = void(*)() alias to express that checkFunction and checkOptionalFunction return function pointers, not data pointers --- .../modules/ModuleManagerLibraryHandlerLocal.hpp | 7 +++++-- .../modules/ModuleManagerLibraryHandlerLocal.cpp | 8 ++++---- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/include/bringauto/modules/ModuleManagerLibraryHandlerLocal.hpp b/include/bringauto/modules/ModuleManagerLibraryHandlerLocal.hpp index 10220040..f3d1ce44 100644 --- a/include/bringauto/modules/ModuleManagerLibraryHandlerLocal.hpp +++ b/include/bringauto/modules/ModuleManagerLibraryHandlerLocal.hpp @@ -6,6 +6,9 @@ namespace bringauto::modules { +/// Generic function pointer type used as the return type of dlsym lookups. +using FunctionPtr = void(*)(); + /** * @brief Class used to load and handle library created by module maintainer */ @@ -69,13 +72,13 @@ class ModuleManagerLibraryHandlerLocal : public IModuleManagerLibraryHandler { void deallocate(struct buffer *buffer) const; - void *checkFunction(const char *functionName) const; + FunctionPtr checkFunction(const char *functionName) const; /** * @brief Look up an optional symbol in the loaded library. * @return function pointer, or nullptr if the symbol is not exported */ - void *checkOptionalFunction(const char *functionName) const; + FunctionPtr checkOptionalFunction(const char *functionName) const; /** * @brief Constructs a buffer with the same raw c buffer as provided diff --git a/source/bringauto/modules/ModuleManagerLibraryHandlerLocal.cpp b/source/bringauto/modules/ModuleManagerLibraryHandlerLocal.cpp index 37007bac..cf927dce 100644 --- a/source/bringauto/modules/ModuleManagerLibraryHandlerLocal.cpp +++ b/source/bringauto/modules/ModuleManagerLibraryHandlerLocal.cpp @@ -65,16 +65,16 @@ void ModuleManagerLibraryHandlerLocal::loadLibrary(const std::filesystem::path & log::logDebug("Library " + path.string() + " was successfully loaded"); } -void *ModuleManagerLibraryHandlerLocal::checkFunction(const char *functionName) const { - const auto function = dlsym(module_, functionName); +FunctionPtr ModuleManagerLibraryHandlerLocal::checkFunction(const char *functionName) const { + const auto function = reinterpret_cast(dlsym(module_, functionName)); if(not function) { throw std::runtime_error {"Function " + std::string(functionName) + " is not included in library"}; } return function; } -void *ModuleManagerLibraryHandlerLocal::checkOptionalFunction(const char *functionName) const { - return dlsym(module_, functionName); +FunctionPtr ModuleManagerLibraryHandlerLocal::checkOptionalFunction(const char *functionName) const { + return reinterpret_cast(dlsym(module_, functionName)); } int ModuleManagerLibraryHandlerLocal::getModuleNumber() const { From 5e14e0c4df281b04c952a2c411e065594470addd Mon Sep 17 00:00:00 2001 From: Daniel Prudky Date: Wed, 3 Jun 2026 08:36:51 +0200 Subject: [PATCH 8/9] refactor(modules): suppress Sonar false positive on dlsym reinterpret_cast --- source/bringauto/modules/ModuleManagerLibraryHandlerLocal.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/bringauto/modules/ModuleManagerLibraryHandlerLocal.cpp b/source/bringauto/modules/ModuleManagerLibraryHandlerLocal.cpp index cf927dce..da473f57 100644 --- a/source/bringauto/modules/ModuleManagerLibraryHandlerLocal.cpp +++ b/source/bringauto/modules/ModuleManagerLibraryHandlerLocal.cpp @@ -66,7 +66,7 @@ void ModuleManagerLibraryHandlerLocal::loadLibrary(const std::filesystem::path & } FunctionPtr ModuleManagerLibraryHandlerLocal::checkFunction(const char *functionName) const { - const auto function = reinterpret_cast(dlsym(module_, functionName)); + const auto function = reinterpret_cast(dlsym(module_, functionName)); // NOSONAR if(not function) { throw std::runtime_error {"Function " + std::string(functionName) + " is not included in library"}; } @@ -74,7 +74,7 @@ FunctionPtr ModuleManagerLibraryHandlerLocal::checkFunction(const char *function } FunctionPtr ModuleManagerLibraryHandlerLocal::checkOptionalFunction(const char *functionName) const { - return reinterpret_cast(dlsym(module_, functionName)); + return reinterpret_cast(dlsym(module_, functionName)); // NOSONAR } int ModuleManagerLibraryHandlerLocal::getModuleNumber() const { From 109c8c05c7d4798613e6953f582241f7c6de93e7 Mon Sep 17 00:00:00 2001 From: Daniel Prudky Date: Wed, 3 Jun 2026 08:55:51 +0200 Subject: [PATCH 9/9] refactor(modules): suppress Sonar false positive on dlsym reinterpret_cast Co-Authored-By: Claude Sonnet 4.6 --- source/bringauto/modules/ModuleManagerLibraryHandlerLocal.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/bringauto/modules/ModuleManagerLibraryHandlerLocal.cpp b/source/bringauto/modules/ModuleManagerLibraryHandlerLocal.cpp index da473f57..1e06b985 100644 --- a/source/bringauto/modules/ModuleManagerLibraryHandlerLocal.cpp +++ b/source/bringauto/modules/ModuleManagerLibraryHandlerLocal.cpp @@ -57,7 +57,7 @@ void ModuleManagerLibraryHandlerLocal::loadLibrary(const std::filesystem::path & "allocate")); deallocate_ = reinterpret_cast::fncptr>(checkFunction( "deallocate")); - forwardCommandOnReceive_ = reinterpret_cast::fncptr>( + forwardCommandOnReceive_ = reinterpret_cast::fncptr>( // NOSONAR: dlsym returns void* by POSIX API contract; reinterpret_cast to function pointer is unavoidable here checkOptionalFunction("forward_command_on_receive")); if(forwardCommandOnReceive_) { log::logDebug("Library " + path.string() + " supports forward_command_on_receive");