diff --git a/include/bringauto/external_client/ExternalClient.hpp b/include/bringauto/external_client/ExternalClient.hpp index 24a2fefd..9f8c92f6 100644 --- a/include/bringauto/external_client/ExternalClient.hpp +++ b/include/bringauto/external_client/ExternalClient.hpp @@ -24,7 +24,8 @@ class ExternalClient { ExternalClient(const std::shared_ptr &context, structures::ModuleLibrary &moduleLibrary, - const std::shared_ptr> &toExternalQueue); + const std::shared_ptr> &toExternalQueue, + const std::shared_ptr> &commandForwardingQueue); /** * @brief Initialize connections, error aggregators @@ -95,10 +96,12 @@ class ExternalClient { std::unordered_map> externalConnectionMap_ {}; /// List of external connections, each device can have its own connection or multiple devices can share one connection std::list externalConnectionsList_ {}; - /// Queue for messages from module handler to external client to be sent to external server + /// Queue for messages from module handler to external client to be sent to external server std::shared_ptr> toExternalQueue_; /// Queue for device commands received by external client to module handler std::shared_ptr> fromExternalQueue_ {}; + /// Queue shared with ModuleHandler; used to push command-forward events for immediate dispatch + std::shared_ptr> commandForwardingQueue_ {}; std::shared_ptr> reconnectQueue_ {}; diff --git a/include/bringauto/modules/IModuleManagerLibraryHandler.hpp b/include/bringauto/modules/IModuleManagerLibraryHandler.hpp index efe94309..f3606a6d 100644 --- a/include/bringauto/modules/IModuleManagerLibraryHandler.hpp +++ b/include/bringauto/modules/IModuleManagerLibraryHandler.hpp @@ -2,6 +2,7 @@ #include +#include #include #include @@ -103,6 +104,17 @@ class IModuleManagerLibraryHandler { */ virtual int commandDataValid(const Buffer &command, unsigned int device_type) const = 0; + /** + * @brief Determine whether Module Gateway should forward a received command to the device + * immediately upon receipt, without waiting for the next status message. + * + * Optional — modules that do not implement this default to NOT_OK (existing behaviour). + * + * @param device_type device type + * @return OK to forward immediately, NOT_OK to forward on next status + */ + virtual int forwardCommandOnReceive(unsigned int /*device_type*/) { return NOT_OK; } + /** * @brief Constructs a buffer with the given size * diff --git a/include/bringauto/modules/ModuleHandler.hpp b/include/bringauto/modules/ModuleHandler.hpp index 1ae711d3..e06403ae 100644 --- a/include/bringauto/modules/ModuleHandler.hpp +++ b/include/bringauto/modules/ModuleHandler.hpp @@ -19,11 +19,12 @@ class ModuleHandler { const std::shared_ptr &context, structures::ModuleLibrary &moduleLibrary, const std::shared_ptr > &fromInternalQueue, + const std::shared_ptr > &commandForwardingQueue, const std::shared_ptr > &toInternalQueue, const std::shared_ptr > &toExternalQueue) - : context_ { context }, moduleLibrary_ { moduleLibrary }, fromInternalQueue_ { fromInternalQueue }, - toInternalQueue_ { toInternalQueue }, - toExternalQueue_ { toExternalQueue } {} + : context_ { context }, moduleLibrary_ { moduleLibrary }, + fromInternalQueue_ { fromInternalQueue }, commandForwardingQueue_ { commandForwardingQueue }, + toInternalQueue_ { toInternalQueue }, toExternalQueue_ { toExternalQueue } {} /** * @brief Start Module handler @@ -47,6 +48,12 @@ class ModuleHandler { */ void handleMessages() const; + /** + * @brief Process command-forward events from ExternalClient independently of handleMessages. + * Runs in its own thread started by run(). + */ + void handleCommandForwards() const; + /** * @brief Check if there are any timeouted messages * @@ -91,6 +98,14 @@ class ModuleHandler { */ void handleStatus(const InternalProtocol::DeviceStatus &status) const; + /** + * @brief Forward a pending command to a device immediately, using the cached status. + * Triggered when ExternalClient receives a command for a module with forward_command_on_receive enabled. + * + * @param deviceId device that has a command pending in the external command queue + */ + void handleCommandForward(const structures::DeviceIdentification &deviceId) const; + /** * @brief Throws an error if external queue size is too big */ @@ -99,8 +114,10 @@ class ModuleHandler { std::shared_ptr context_ {}; structures::ModuleLibrary &moduleLibrary_; - /// Queue for incoming messages from internal server to be processed + /// Queue for incoming messages from internal server (connect/status/disconnect) std::shared_ptr > fromInternalQueue_ {}; + /// Queue for command-forward events from external client + std::shared_ptr > commandForwardingQueue_ {}; /// Queue for outgoing messages to internal server to be forwarded to devices std::shared_ptr > toInternalQueue_ {}; /// Queue for outgoing messages to external server to be forwarded to external server diff --git a/include/bringauto/modules/ModuleManagerLibraryHandlerAsync.hpp b/include/bringauto/modules/ModuleManagerLibraryHandlerAsync.hpp index 4f7f92e2..8855ce6f 100644 --- a/include/bringauto/modules/ModuleManagerLibraryHandlerAsync.hpp +++ b/include/bringauto/modules/ModuleManagerLibraryHandlerAsync.hpp @@ -78,6 +78,8 @@ class ModuleManagerLibraryHandlerAsync : public IModuleManagerLibraryHandler { int commandDataValid(const Buffer &command, unsigned int device_type) const override; + int forwardCommandOnReceive(unsigned int device_type) override; + /** * @brief Constructs a buffer with the given size * diff --git a/include/bringauto/modules/ModuleManagerLibraryHandlerLocal.hpp b/include/bringauto/modules/ModuleManagerLibraryHandlerLocal.hpp index eaa24ed4..f3d1ce44 100644 --- a/include/bringauto/modules/ModuleManagerLibraryHandlerLocal.hpp +++ b/include/bringauto/modules/ModuleManagerLibraryHandlerLocal.hpp @@ -6,6 +6,9 @@ namespace bringauto::modules { +/// Generic function pointer type used as the return type of dlsym lookups. +using FunctionPtr = void(*)(); + /** * @brief Class used to load and handle library created by module maintainer */ @@ -53,6 +56,8 @@ class ModuleManagerLibraryHandlerLocal : public IModuleManagerLibraryHandler { int commandDataValid(const Buffer &command, unsigned int device_type) const override; + int forwardCommandOnReceive(unsigned int device_type) override; + /** * @brief Constructs a buffer with the given size * @@ -67,7 +72,13 @@ class ModuleManagerLibraryHandlerLocal : public IModuleManagerLibraryHandler { void deallocate(struct buffer *buffer) const; - void *checkFunction(const char *functionName) const; + FunctionPtr checkFunction(const char *functionName) const; + + /** + * @brief Look up an optional symbol in the loaded library. + * @return function pointer, or nullptr if the symbol is not exported + */ + FunctionPtr checkOptionalFunction(const char *functionName) const; /** * @brief Constructs a buffer with the same raw c buffer as provided @@ -93,6 +104,8 @@ class ModuleManagerLibraryHandlerLocal : public IModuleManagerLibraryHandler { std::function generateCommand_ {}; std::function allocate_ {}; std::function deallocate_ {}; + /// Optional — nullptr when the module does not export forward_command_on_receive + std::function forwardCommandOnReceive_ {}; }; } \ No newline at end of file diff --git a/include/bringauto/modules/StatusAggregator.hpp b/include/bringauto/modules/StatusAggregator.hpp index 5043df77..3e1e3cd2 100644 --- a/include/bringauto/modules/StatusAggregator.hpp +++ b/include/bringauto/modules/StatusAggregator.hpp @@ -6,6 +6,7 @@ #include #include +#include @@ -115,6 +116,16 @@ class StatusAggregator { int get_command(const Buffer& status, const structures::DeviceIdentification& device, Buffer &command); + /** + * @brief Get a command for immediate forwarding, using the cached status as input. + * Called when a command arrived and the module requested immediate forwarding. + * + * @param device device identification + * @param command output buffer for the generated command + * @return OK on success, DEVICE_NOT_REGISTERED or COMMAND_INVALID on error + */ + int get_command_for_forwarding(const structures::DeviceIdentification& device, Buffer& command); + /** * @short Get number of the module * @@ -152,6 +163,40 @@ class StatusAggregator { private: + /** + * @brief Check if device is valid without acquiring devicesMutex_. Caller must hold the lock. + * + * @param device device identification + * @return OK if device is registered and its type is supported, NOT_OK otherwise + */ + int isDeviceValidUnlocked(const structures::DeviceIdentification& device); + + /** + * @brief Force status message aggregation on a device without acquiring devicesMutex_. Caller must hold the lock. + * + * @param device device identification + * @return number of queued aggregated messages on success, DEVICE_NOT_REGISTERED if device is unknown + */ + int forceAggregationOnDeviceUnlocked(const structures::DeviceIdentification& device); + + /** + * @brief Clear state and messages for a device without acquiring devicesMutex_. Caller must hold the lock. + * + * @param device device identification + * @return OK on success, DEVICE_NOT_REGISTERED if device is unknown + */ + int clearDeviceUnlocked(const structures::DeviceIdentification& device); + + /** + * @brief Get command for a device without acquiring devicesMutex_. Caller must hold the lock. + * + * @param status current status buffer + * @param device device identification + * @param command output buffer for the generated command + * @return OK on success, DEVICE_NOT_SUPPORTED, NO_MESSAGE_AVAILABLE, or COMMAND_INVALID on error + */ + int getCommandUnlocked(const Buffer& status, const structures::DeviceIdentification& device, Buffer& command); + /** * @brief Aggregate status message * @@ -198,6 +243,9 @@ class StatusAggregator { */ std::unordered_map deviceTimeouts_ {}; + /// Protects devices and deviceTimeouts_ against concurrent access from ModuleHandler threads and ExternalClient + mutable std::mutex devicesMutex_ {}; + std::atomic_bool timeoutedMessageReady_ { false }; }; diff --git a/include/bringauto/structures/InternalClientMessage.hpp b/include/bringauto/structures/InternalClientMessage.hpp index 6e58ed5d..9d2f8789 100644 --- a/include/bringauto/structures/InternalClientMessage.hpp +++ b/include/bringauto/structures/InternalClientMessage.hpp @@ -19,6 +19,15 @@ class InternalClientMessage { deviceId_ { deviceId } {} + /** + * @brief Create a command-forward event. Used by ExternalClient to wake up ModuleHandler + * for immediate command dispatch when the module requested forward_command_on_receive. + * + * @param deviceId device that has a pending command to forward + * @return InternalClientMessage tagged as command-forward + */ + static InternalClientMessage makeCommandForward(const DeviceIdentification &deviceId); + explicit InternalClientMessage(bool disconnect, const InternalProtocol::InternalClient &message): message_ { message }, disconnect_ { disconnect } @@ -44,6 +53,11 @@ class InternalClientMessage { */ bool disconnected() const; + /** + * @brief Returns true if this message is a command-forward event (no protobuf payload). + */ + [[nodiscard]] bool isCommandForward() const noexcept; + /** * @brief Get device identification struct * @@ -52,10 +66,16 @@ class InternalClientMessage { const DeviceIdentification &getDeviceId() const; private: + InternalClientMessage(const DeviceIdentification &deviceId, bool commandForward) + : disconnect_ { false }, commandForward_ { commandForward }, deviceId_ { deviceId } + {} + /// Internal client message InternalProtocol::InternalClient message_ {}; /// True if device is disconnected otherwise false bool disconnect_; + /// True if this is a command-forward event (no protobuf payload) + bool commandForward_ { false }; /// Device identification struct DeviceIdentification deviceId_ {}; }; diff --git a/include/bringauto/structures/StatusAggregatorDeviceState.hpp b/include/bringauto/structures/StatusAggregatorDeviceState.hpp index 13c80d2d..8b169799 100644 --- a/include/bringauto/structures/StatusAggregatorDeviceState.hpp +++ b/include/bringauto/structures/StatusAggregatorDeviceState.hpp @@ -5,6 +5,8 @@ #include #include +#include +#include #include #include @@ -54,14 +56,17 @@ class StatusAggregatorDeviceState { void setDefaultCommand(const modules::Buffer &commandBuffer); /** - * @brief Gets the most relevant command buffer. - * If there are external commands in the queue, they will be used first. - * Otherwise, the default command buffer will be used. - * Commands received from the queue are removed from it. + * @brief Consumes and returns the most relevant command buffer. + * If there are external commands in the queue, they will be dequeued and returned. + * For push-only devices (forwardCommandOnReceive == OK) with an empty queue, + * returns std::nullopt — the caller must send an empty DeviceCommand to satisfy the + * InternalProtocol response requirement without forwarding anything to the vehicle. + * For pull devices, falls back to the default command buffer when the queue is empty. + * Commands dequeued from the external queue are removed from it. * - * @return const Buffer& + * @return command buffer, or std::nullopt when push-only device has no pending command */ - [[nodiscard]] const modules::Buffer &getCommand(); + [[nodiscard]] std::optional consumeCommand(); /** * @brief Get aggregated messages queue @@ -79,6 +84,17 @@ class StatusAggregatorDeviceState { */ int addExternalCommand(const modules::Buffer &commandBuffer); + /** + * @brief Mark this device as requiring immediate command forwarding. + * Called once at device registration based on the module's forwardCommandOnReceive response. + */ + void enableImmediateCommandForwarding() noexcept; + + /** + * @brief Returns true if this device requires commands to be forwarded immediately upon receipt. + */ + [[nodiscard]] bool isForwardCommandImmediately() const noexcept; + private: std::unique_ptr timer_ {}; @@ -88,7 +104,11 @@ class StatusAggregatorDeviceState { modules::Buffer defaultCommand_ {}; + std::mutex externalCommandMutex_ {}; + std::queue externalCommandQueue_ {}; + + bool forwardCommandImmediately_ { false }; }; } diff --git a/main.cpp b/main.cpp index 78c6c70e..72a2db83 100644 --- a/main.cpp +++ b/main.cpp @@ -77,12 +77,13 @@ int main(int argc, char **argv) { auto toInternalQueue = std::make_shared>(); auto fromInternalQueue = std::make_shared>(); + auto commandForwardingQueue = std::make_shared>(); auto toExternalQueue = std::make_shared>(); bais::InternalServer internalServer { context, fromInternalQueue, toInternalQueue }; - bringauto::modules::ModuleHandler moduleHandler { context, moduleLibrary, fromInternalQueue, toInternalQueue, - toExternalQueue }; - bringauto::external_client::ExternalClient externalClient { context, moduleLibrary, toExternalQueue }; + bringauto::modules::ModuleHandler moduleHandler { context, moduleLibrary, fromInternalQueue, + commandForwardingQueue, toInternalQueue, toExternalQueue }; + bringauto::external_client::ExternalClient externalClient { context, moduleLibrary, toExternalQueue, commandForwardingQueue }; std::jthread moduleHandlerThread([&moduleHandler]() { moduleHandler.run(); }); std::jthread externalClientThread([&externalClient]() { externalClient.run(); }); diff --git a/source/bringauto/external_client/ExternalClient.cpp b/source/bringauto/external_client/ExternalClient.cpp index f966898d..720b3cd8 100644 --- a/source/bringauto/external_client/ExternalClient.cpp +++ b/source/bringauto/external_client/ExternalClient.cpp @@ -8,6 +8,7 @@ #include #include +#include #include @@ -20,8 +21,10 @@ namespace ip = InternalProtocol; ExternalClient::ExternalClient(const std::shared_ptr &context, structures::ModuleLibrary &moduleLibrary, - const std::shared_ptr> &toExternalQueue): + const std::shared_ptr> &toExternalQueue, + const std::shared_ptr> &commandForwardingQueue): toExternalQueue_ { toExternalQueue }, + commandForwardingQueue_ { commandForwardingQueue }, context_ { context }, moduleLibrary_ { moduleLibrary }, timer_ { context->ioContext } { @@ -65,9 +68,14 @@ void ExternalClient::handleCommand(const InternalProtocol::DeviceCommand &device const auto deviceId = structures::DeviceIdentification(device); - int ret = it->second->update_command(commandBuffer, deviceId); + const int ret = it->second->update_command(commandBuffer, deviceId); if (ret == OK) { - settings::Logger::logInfo("Command for device {} was added to queue", device.devicename()); + if (moduleLibraryHandler->forwardCommandOnReceive(deviceId.getDeviceType()) == OK) { + settings::Logger::logInfo("Command for device {} was added to queue, forwarding immediately", device.devicename()); + commandForwardingQueue_->pushAndNotify(structures::InternalClientMessage::makeCommandForward(deviceId)); + } else { + settings::Logger::logInfo("Command for device {} was added to queue", device.devicename()); + } } } diff --git a/source/bringauto/internal_server/InternalServer.cpp b/source/bringauto/internal_server/InternalServer.cpp index 4f17722c..ed065e72 100644 --- a/source/bringauto/internal_server/InternalServer.cpp +++ b/source/bringauto/internal_server/InternalServer.cpp @@ -102,12 +102,11 @@ void InternalServer::asyncReceiveHandler( bool InternalServer::processBufferData( const std::shared_ptr &connection, std::size_t bytesTransferred, std::size_t bufferOffset) { - if(bytesTransferred < bufferOffset) { + if(bufferOffset + bytesTransferred > connection->connContext.buffer.size()) { log::logError( - "Error in processBufferData(...): bufferOffset: {} is greater than bytesTransferred: {}, " - "Invalid bufferOffset: {} received from Internal Client, " - "connection's ip address is {}", bufferOffset, bytesTransferred, bufferOffset, - connection->remoteEndpointAddress()); + "Error in processBufferData(...): bufferOffset: {} + bytesTransferred: {} exceeds buffer size: {}, " + "connection's ip address is {}", bufferOffset, bytesTransferred, + connection->connContext.buffer.size(), connection->remoteEndpointAddress()); return false; } @@ -170,7 +169,7 @@ bool InternalServer::processBufferData( connection->remoteEndpointAddress()); return false; } - if(bytesLeft && !processBufferData(connection, bytesLeft, bytesTransferred - bytesLeft)) { + if(bytesLeft && !processBufferData(connection, bytesLeft, bufferOffset + (bytesTransferred - bytesLeft))) { log::logError("Error in processBufferData(...): " "Received extra invalid bytes of data: {} from Internal Client, " "connection's ip address is {}", bytesLeft, diff --git a/source/bringauto/modules/ModuleHandler.cpp b/source/bringauto/modules/ModuleHandler.cpp index a05e34a9..32a2ba57 100644 --- a/source/bringauto/modules/ModuleHandler.cpp +++ b/source/bringauto/modules/ModuleHandler.cpp @@ -4,6 +4,9 @@ #include #include +#include + +#include @@ -15,12 +18,16 @@ void ModuleHandler::destroy() const { while(not fromInternalQueue_->empty()) { fromInternalQueue_->pop(); } + while(not commandForwardingQueue_->empty()) { + commandForwardingQueue_->pop(); + } settings::Logger::logInfo("Module handler stopped"); } void ModuleHandler::run() const { settings::Logger::logInfo("Module handler started, constants used: queue_timeout_length: {}, status_aggregation_timeout: {}", settings::queue_timeout_length.count(), settings::status_aggregation_timeout.count()); + std::jthread forwardThread([this]() { handleCommandForwards(); }); handleMessages(); } @@ -44,6 +51,16 @@ void ModuleHandler::handleMessages() const { } } +void ModuleHandler::handleCommandForwards() const { + while(not context_->ioContext.stopped()) { + if(commandForwardingQueue_->waitForValueWithTimeout(settings::queue_timeout_length)) { + continue; + } + handleCommandForward(commandForwardingQueue_->front().getDeviceId()); + commandForwardingQueue_->pop(); + } +} + void ModuleHandler::checkTimeoutedMessages() const { for (const auto& [key, statusAggregator] : moduleLibrary_.statusAggregators) { if(statusAggregator->getTimeoutedMessageReady()){ @@ -156,6 +173,32 @@ ModuleHandler::sendConnectResponse(const ip::Device &device, ip::DeviceConnectRe settings::Logger::logInfo("New device {} is trying to connect, sending response {}", device.devicename(), static_cast(response_type)); } +void ModuleHandler::handleCommandForward(const structures::DeviceIdentification &deviceId) const { + const auto &moduleNumber = deviceId.getModule(); + const auto &statusAggregators = moduleLibrary_.statusAggregators; + if(not statusAggregators.contains(moduleNumber)) { + settings::Logger::logWarning("Module number: {} is not supported in handleCommandForward", moduleNumber); + return; + } + + const auto &statusAggregator = statusAggregators.at(moduleNumber); + Buffer commandBuffer {}; + const int forwardRc = statusAggregator->get_command_for_forwarding(deviceId, commandBuffer); + if(forwardRc == NO_MESSAGE_AVAILABLE) { + settings::Logger::logDebug("Command already consumed by status path for push-only device: {}", deviceId.getDeviceName()); + return; + } + if(forwardRc != OK) { + settings::Logger::logWarning("get_command_for_forwarding failed for device: {}", deviceId.getDeviceName()); + return; + } + + const auto device = deviceId.convertToIPDevice(); + const auto deviceCommandMessage = common_utils::ProtobufUtils::createInternalServerCommandMessage(device, commandBuffer); + toInternalQueue_->pushAndNotify(structures::ModuleHandlerMessage(false, deviceCommandMessage)); + settings::Logger::logDebug("Module handler forwarded command immediately for device: {}", deviceId.getDeviceName()); +} + void ModuleHandler::handleStatus(const ip::DeviceStatus &status) const { const auto &device = status.device(); const auto &moduleNumber = device.module(); @@ -196,6 +239,15 @@ void ModuleHandler::handleStatus(const ip::DeviceStatus &status) const { commandBuffer); toInternalQueue_->pushAndNotify(structures::ModuleHandlerMessage(false, deviceCommandMessage)); settings::Logger::logDebug("Module handler successfully retrieved command and sent it to device: {}", deviceName); + } else if(getCommandRc == NO_MESSAGE_AVAILABLE) { + // Push-only device with no fresh command from ES. Send an empty DeviceCommand to + // satisfy the InternalProtocol 250ms response requirement. The bridge will detect + // empty commanddata and not forward it to the vehicle, letting the vehicle's own + // timeout trigger the safe-stop. + const auto emptyCommandMessage = common_utils::ProtobufUtils::createInternalServerCommandMessage( + device, Buffer{}); + toInternalQueue_->pushAndNotify(structures::ModuleHandlerMessage(false, emptyCommandMessage)); + settings::Logger::logDebug("No fresh command for push-only device {}, sending empty response", deviceName); } else { settings::Logger::logWarning("Retrieving command failed with return code: {}", getCommandRc); return; diff --git a/source/bringauto/modules/ModuleManagerLibraryHandlerAsync.cpp b/source/bringauto/modules/ModuleManagerLibraryHandlerAsync.cpp index 70c1ec3e..9acb81c3 100644 --- a/source/bringauto/modules/ModuleManagerLibraryHandlerAsync.cpp +++ b/source/bringauto/modules/ModuleManagerLibraryHandlerAsync.cpp @@ -210,6 +210,10 @@ int ModuleManagerLibraryHandlerAsync::commandDataValid(const Buffer &command, un return aeronClient_.callFunc(fp_async::commandDataValidAsync, command_raw_buffer, device_type).value_or(NOT_OK); } +int ModuleManagerLibraryHandlerAsync::forwardCommandOnReceive(unsigned int /*device_type*/) { + return NOT_OK; +} + Buffer ModuleManagerLibraryHandlerAsync::constructBuffer(std::size_t size) { if(size == 0) { return Buffer {}; diff --git a/source/bringauto/modules/ModuleManagerLibraryHandlerLocal.cpp b/source/bringauto/modules/ModuleManagerLibraryHandlerLocal.cpp index 2a15a7e8..1e06b985 100644 --- a/source/bringauto/modules/ModuleManagerLibraryHandlerLocal.cpp +++ b/source/bringauto/modules/ModuleManagerLibraryHandlerLocal.cpp @@ -57,17 +57,26 @@ void ModuleManagerLibraryHandlerLocal::loadLibrary(const std::filesystem::path & "allocate")); deallocate_ = reinterpret_cast::fncptr>(checkFunction( "deallocate")); + forwardCommandOnReceive_ = reinterpret_cast::fncptr>( // NOSONAR: dlsym returns void* by POSIX API contract; reinterpret_cast to function pointer is unavoidable here + checkOptionalFunction("forward_command_on_receive")); + if(forwardCommandOnReceive_) { + log::logDebug("Library " + path.string() + " supports forward_command_on_receive"); + } log::logDebug("Library " + path.string() + " was successfully loaded"); } -void *ModuleManagerLibraryHandlerLocal::checkFunction(const char *functionName) const { - const auto function = dlsym(module_, functionName); +FunctionPtr ModuleManagerLibraryHandlerLocal::checkFunction(const char *functionName) const { + const auto function = reinterpret_cast(dlsym(module_, functionName)); // NOSONAR if(not function) { throw std::runtime_error {"Function " + std::string(functionName) + " is not included in library"}; } return function; } +FunctionPtr ModuleManagerLibraryHandlerLocal::checkOptionalFunction(const char *functionName) const { + return reinterpret_cast(dlsym(module_, functionName)); // NOSONAR +} + int ModuleManagerLibraryHandlerLocal::getModuleNumber() const { return getModuleNumber_(); } @@ -196,6 +205,13 @@ int ModuleManagerLibraryHandlerLocal::commandDataValid(const Buffer &command, un return commandDataValid_(raw_buffer, device_type); } +int ModuleManagerLibraryHandlerLocal::forwardCommandOnReceive(unsigned int device_type) { + if(!forwardCommandOnReceive_) { + return NOT_OK; + } + return forwardCommandOnReceive_(device_type); +} + int ModuleManagerLibraryHandlerLocal::allocate(struct buffer *buffer_pointer, size_t size_in_bytes) const { return allocate_(buffer_pointer, size_in_bytes); } diff --git a/source/bringauto/modules/StatusAggregator.cpp b/source/bringauto/modules/StatusAggregator.cpp index 4c56e939..2c4ebd8d 100644 --- a/source/bringauto/modules/StatusAggregator.cpp +++ b/source/bringauto/modules/StatusAggregator.cpp @@ -9,8 +9,8 @@ namespace bringauto::modules { using log = settings::Logger; -int StatusAggregator::clear_device(const structures::DeviceIdentification &device) { - if(is_device_valid(device) == NOT_OK) { +int StatusAggregator::clearDeviceUnlocked(const structures::DeviceIdentification &device) { + if(isDeviceValidUnlocked(device) == NOT_OK) { return DEVICE_NOT_REGISTERED; } auto &deviceState = devices.at(device); @@ -21,6 +21,11 @@ int StatusAggregator::clear_device(const structures::DeviceIdentification &devic return OK; } +int StatusAggregator::clear_device(const structures::DeviceIdentification &device) { + std::lock_guard lock(devicesMutex_); + return clearDeviceUnlocked(device); +} + Buffer StatusAggregator::aggregateStatus(const structures::StatusAggregatorDeviceState &deviceState, const Buffer &status, const unsigned int &device_type) const { @@ -59,25 +64,30 @@ int StatusAggregator::destroy_status_aggregator() { } int StatusAggregator::clear_all_devices() { + std::lock_guard lock(devicesMutex_); for(auto &[key, device]: devices) { - clear_device(key); + clearDeviceUnlocked(key); } return OK; } int StatusAggregator::remove_device(const structures::DeviceIdentification& device) { - if(is_device_valid(device) == NOT_OK) { + std::lock_guard lock(devicesMutex_); + if(isDeviceValidUnlocked(device) == NOT_OK) { return DEVICE_NOT_REGISTERED; } - clear_device(device); + clearDeviceUnlocked(device); - // WUT, maybe becose there can be multiple context running so by this we ensure no rece condition? - boost::asio::post(context_->ioContext, [this, device]() { devices.erase(device); }); + boost::asio::post(context_->ioContext, [this, device]() { + std::lock_guard postLock(devicesMutex_); + devices.erase(device); + }); return OK; } int StatusAggregator::add_status_to_aggregator(const Buffer& status, const structures::DeviceIdentification& device) { + std::lock_guard lock(devicesMutex_); const auto &device_type = device.getDeviceType(); if(is_device_type_supported(device_type) == NOT_OK) { log::logError("Trying to add status to unsupported device type: {}", device_type); @@ -98,10 +108,16 @@ int StatusAggregator::add_status_to_aggregator(const Buffer& status, deviceTimeouts_[deviceId]++; return force_aggregation_on_device(deviceId); }; - devices.emplace( - device, structures::StatusAggregatorDeviceState(context_, timeouted_force_aggregation, device, commandBuffer, status)); + devices.try_emplace(device, context_, timeouted_force_aggregation, device, commandBuffer, status); - force_aggregation_on_device(device); + const int forwardOnReceive = module_->forwardCommandOnReceive(device_type); + log::logInfo("forwardCommandOnReceive for device {} (type={}): rc={}", device.convertToString(), device_type, forwardOnReceive); + if(forwardOnReceive == OK) { + devices.at(device).enableImmediateCommandForwarding(); + log::logInfo("Immediate command forwarding ENABLED for device {}", device.convertToString()); + } + + forceAggregationOnDeviceUnlocked(device); return 1; } @@ -119,7 +135,8 @@ int StatusAggregator::add_status_to_aggregator(const Buffer& status, int StatusAggregator::get_aggregated_status(Buffer &generated_status, const structures::DeviceIdentification& device) { - if(is_device_valid(device) == NOT_OK) { + std::lock_guard lock(devicesMutex_); + if(isDeviceValidUnlocked(device) == NOT_OK) { log::logError("Trying to get aggregated status from unregistered device"); return DEVICE_NOT_REGISTERED; } @@ -135,6 +152,7 @@ int StatusAggregator::get_aggregated_status(Buffer &generated_status, } int StatusAggregator::get_unique_devices(std::list &unique_devices_list) { + std::lock_guard lock(devicesMutex_); const auto devicesSize = devices.size(); if (devicesSize == 0) { return 0; @@ -147,8 +165,8 @@ int StatusAggregator::get_unique_devices(std::listgetModuleNumber(); } int StatusAggregator::update_command(const Buffer& command, const structures::DeviceIdentification& device) { + std::lock_guard lock(devicesMutex_); const auto &device_type = device.getDeviceType(); if(is_device_type_supported(device_type) == NOT_OK) { log::logError("Device type {} is not supported", device_type); @@ -189,11 +218,12 @@ int StatusAggregator::update_command(const Buffer& command, const structures::De if (devices.at(device).addExternalCommand(command) == NOT_OK) { log::logError("External command queue is full for device: {} deleting oldest command", device.convertToString()); } + return OK; } -int StatusAggregator::get_command(const Buffer& status, const structures::DeviceIdentification& device, - Buffer& command) { +int StatusAggregator::getCommandUnlocked(const Buffer& status, const structures::DeviceIdentification& device, + Buffer& command) { const auto &device_type = device.getDeviceType(); if(is_device_type_supported(device_type) == NOT_OK) { log::logError("Device type {} is not supported", device_type); @@ -201,7 +231,11 @@ int StatusAggregator::get_command(const Buffer& status, const structures::Device } auto &deviceState = devices.at(device); - if (module_->generateCommand(command, status, deviceState.getStatus(), deviceState.getCommand(), device_type) != OK) { + auto currentCommand = deviceState.consumeCommand(); + if (!currentCommand.has_value()) { + return NO_MESSAGE_AVAILABLE; + } + if (module_->generateCommand(command, status, deviceState.getStatus(), *currentCommand, device_type) != OK) { log::logError("Error occurred while generating command for device: {}", device.convertToString()); return COMMAND_INVALID; } @@ -210,6 +244,22 @@ int StatusAggregator::get_command(const Buffer& status, const structures::Device return OK; } +int StatusAggregator::get_command(const Buffer& status, const structures::DeviceIdentification& device, + Buffer& command) { + std::lock_guard lock(devicesMutex_); + return getCommandUnlocked(status, device, command); +} + +int StatusAggregator::get_command_for_forwarding(const structures::DeviceIdentification &device, Buffer &command) { + std::lock_guard lock(devicesMutex_); + if(isDeviceValidUnlocked(device) == NOT_OK) { + log::logError("Trying to get forwarding command for unregistered device: {}", device.convertToString()); + return DEVICE_NOT_REGISTERED; + } + const auto &cachedStatus = devices.at(device).getStatus(); + return getCommandUnlocked(cachedStatus, device, command); +} + int StatusAggregator::is_device_type_supported(unsigned int device_type) { return module_->isDeviceTypeSupported(device_type); } @@ -223,6 +273,7 @@ bool StatusAggregator::getTimeoutedMessageReady() const { } int StatusAggregator::getDeviceTimeoutCount(const structures::DeviceIdentification &device) const { + std::lock_guard lock(devicesMutex_); if(const auto it = deviceTimeouts_.find(device); it != deviceTimeouts_.end()) { return it->second; } diff --git a/source/bringauto/structures/InternalClientMessage.cpp b/source/bringauto/structures/InternalClientMessage.cpp index 4c905687..d81f69d0 100644 --- a/source/bringauto/structures/InternalClientMessage.cpp +++ b/source/bringauto/structures/InternalClientMessage.cpp @@ -12,8 +12,16 @@ bool InternalClientMessage::disconnected() const { return disconnect_; } +bool InternalClientMessage::isCommandForward() const noexcept { + return commandForward_; +} + const DeviceIdentification &InternalClientMessage::getDeviceId() const { return deviceId_; } +InternalClientMessage InternalClientMessage::makeCommandForward(const DeviceIdentification &deviceId) { + return InternalClientMessage { deviceId, true }; +} + } diff --git a/source/bringauto/structures/StatusAggregatorDeviceState.cpp b/source/bringauto/structures/StatusAggregatorDeviceState.cpp index 6da352f9..e2a5d015 100644 --- a/source/bringauto/structures/StatusAggregatorDeviceState.cpp +++ b/source/bringauto/structures/StatusAggregatorDeviceState.cpp @@ -33,10 +33,15 @@ void StatusAggregatorDeviceState::setDefaultCommand(const modules::Buffer &comma defaultCommand_ = commandBuffer; } -const modules::Buffer &StatusAggregatorDeviceState::getCommand() { +std::optional StatusAggregatorDeviceState::consumeCommand() { + std::lock_guard lock { externalCommandMutex_ }; if (!externalCommandQueue_.empty()) { defaultCommand_ = externalCommandQueue_.front(); externalCommandQueue_.pop(); + return defaultCommand_; + } + if (forwardCommandImmediately_) { + return std::nullopt; } return defaultCommand_; } @@ -46,6 +51,7 @@ std::queue &StatusAggregatorDeviceState::aggregatedMessa } int StatusAggregatorDeviceState::addExternalCommand(const modules::Buffer &commandBuffer) { + std::lock_guard lock { externalCommandMutex_ }; externalCommandQueue_.push(commandBuffer); if (externalCommandQueue_.size() > settings::max_external_commands) { externalCommandQueue_.pop(); @@ -54,4 +60,12 @@ int StatusAggregatorDeviceState::addExternalCommand(const modules::Buffer &comma return OK; } +void StatusAggregatorDeviceState::enableImmediateCommandForwarding() noexcept { + forwardCommandImmediately_ = true; +} + +bool StatusAggregatorDeviceState::isForwardCommandImmediately() const noexcept { + return forwardCommandImmediately_; +} + }