Skip to content
Open
7 changes: 5 additions & 2 deletions include/bringauto/external_client/ExternalClient.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ class ExternalClient {

ExternalClient(const std::shared_ptr<structures::GlobalContext> &context,
structures::ModuleLibrary &moduleLibrary,
const std::shared_ptr<structures::AtomicQueue<structures::InternalClientMessage>> &toExternalQueue);
const std::shared_ptr<structures::AtomicQueue<structures::InternalClientMessage>> &toExternalQueue,
const std::shared_ptr<structures::AtomicQueue<structures::InternalClientMessage>> &commandForwardingQueue);

/**
* @brief Initialize connections, error aggregators
Expand Down Expand Up @@ -95,10 +96,12 @@ class ExternalClient {
std::unordered_map<unsigned int, std::reference_wrapper<connection::ExternalConnection>> externalConnectionMap_ {};
/// List of external connections, each device can have its own connection or multiple devices can share one connection
std::list<connection::ExternalConnection> externalConnectionsList_ {};
/// Queue for messages from module handler to external client to be sent to external server
/// Queue for messages from module handler to external client to be sent to external server
std::shared_ptr<structures::AtomicQueue<structures::InternalClientMessage>> toExternalQueue_;
/// Queue for device commands received by external client to module handler
std::shared_ptr<structures::AtomicQueue<InternalProtocol::DeviceCommand>> fromExternalQueue_ {};
/// Queue shared with ModuleHandler; used to push command-forward events for immediate dispatch
std::shared_ptr<structures::AtomicQueue<structures::InternalClientMessage>> commandForwardingQueue_ {};

std::shared_ptr<structures::AtomicQueue<structures::ReconnectQueueItem>> reconnectQueue_ {};

Expand Down
12 changes: 12 additions & 0 deletions include/bringauto/modules/IModuleManagerLibraryHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <bringauto/modules/Buffer.hpp>

#include <fleet_protocol/common_headers/general_error_codes.h>
#include <fleet_protocol/common_headers/memory_management.h>

#include <functional>
Expand Down Expand Up @@ -103,6 +104,17 @@ class IModuleManagerLibraryHandler {
*/
virtual int commandDataValid(const Buffer &command, unsigned int device_type) const = 0;

/**
* @brief Determine whether Module Gateway should forward a received command to the device
* immediately upon receipt, without waiting for the next status message.
*
* Optional — modules that do not implement this default to NOT_OK (existing behaviour).
*
* @param device_type device type
* @return OK to forward immediately, NOT_OK to forward on next status
*/
virtual int forwardCommandOnReceive(unsigned int /*device_type*/) { return NOT_OK; }

/**
* @brief Constructs a buffer with the given size
*
Expand Down
25 changes: 21 additions & 4 deletions include/bringauto/modules/ModuleHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ class ModuleHandler {
const std::shared_ptr <structures::GlobalContext> &context,
structures::ModuleLibrary &moduleLibrary,
const std::shared_ptr <structures::AtomicQueue<structures::InternalClientMessage>> &fromInternalQueue,
const std::shared_ptr <structures::AtomicQueue<structures::InternalClientMessage>> &commandForwardingQueue,
const std::shared_ptr <structures::AtomicQueue<structures::ModuleHandlerMessage>> &toInternalQueue,
const std::shared_ptr <structures::AtomicQueue<structures::InternalClientMessage>> &toExternalQueue)
: context_ { context }, moduleLibrary_ { moduleLibrary }, fromInternalQueue_ { fromInternalQueue },
toInternalQueue_ { toInternalQueue },
toExternalQueue_ { toExternalQueue } {}
: context_ { context }, moduleLibrary_ { moduleLibrary },
fromInternalQueue_ { fromInternalQueue }, commandForwardingQueue_ { commandForwardingQueue },
toInternalQueue_ { toInternalQueue }, toExternalQueue_ { toExternalQueue } {}

/**
* @brief Start Module handler
Expand All @@ -47,6 +48,12 @@ class ModuleHandler {
*/
void handleMessages() const;

/**
* @brief Process command-forward events from ExternalClient independently of handleMessages.
* Runs in its own thread started by run().
*/
void handleCommandForwards() const;

/**
* @brief Check if there are any timeouted messages
*
Expand Down Expand Up @@ -91,6 +98,14 @@ class ModuleHandler {
*/
void handleStatus(const InternalProtocol::DeviceStatus &status) const;

/**
* @brief Forward a pending command to a device immediately, using the cached status.
* Triggered when ExternalClient receives a command for a module with forward_command_on_receive enabled.
*
* @param deviceId device that has a command pending in the external command queue
*/
void handleCommandForward(const structures::DeviceIdentification &deviceId) const;

/**
* @brief Throws an error if external queue size is too big
*/
Expand All @@ -99,8 +114,10 @@ class ModuleHandler {
std::shared_ptr <structures::GlobalContext> context_ {};

structures::ModuleLibrary &moduleLibrary_;
/// Queue for incoming messages from internal server to be processed
/// Queue for incoming messages from internal server (connect/status/disconnect)
std::shared_ptr <structures::AtomicQueue<structures::InternalClientMessage>> fromInternalQueue_ {};
/// Queue for command-forward events from external client
std::shared_ptr <structures::AtomicQueue<structures::InternalClientMessage>> commandForwardingQueue_ {};
/// Queue for outgoing messages to internal server to be forwarded to devices
std::shared_ptr <structures::AtomicQueue<structures::ModuleHandlerMessage>> toInternalQueue_ {};
/// Queue for outgoing messages to external server to be forwarded to external server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ class ModuleManagerLibraryHandlerAsync : public IModuleManagerLibraryHandler {

int commandDataValid(const Buffer &command, unsigned int device_type) const override;

int forwardCommandOnReceive(unsigned int device_type) override;

/**
* @brief Constructs a buffer with the given size
*
Expand Down
15 changes: 14 additions & 1 deletion include/bringauto/modules/ModuleManagerLibraryHandlerLocal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

namespace bringauto::modules {

/// Generic function pointer type used as the return type of dlsym lookups.
using FunctionPtr = void(*)();

/**
* @brief Class used to load and handle library created by module maintainer
*/
Expand Down Expand Up @@ -53,6 +56,8 @@ class ModuleManagerLibraryHandlerLocal : public IModuleManagerLibraryHandler {

int commandDataValid(const Buffer &command, unsigned int device_type) const override;

int forwardCommandOnReceive(unsigned int device_type) override;

/**
* @brief Constructs a buffer with the given size
*
Expand All @@ -67,7 +72,13 @@ class ModuleManagerLibraryHandlerLocal : public IModuleManagerLibraryHandler {

void deallocate(struct buffer *buffer) const;

void *checkFunction(const char *functionName) const;
FunctionPtr checkFunction(const char *functionName) const;

/**
* @brief Look up an optional symbol in the loaded library.
* @return function pointer, or nullptr if the symbol is not exported
*/
FunctionPtr checkOptionalFunction(const char *functionName) const;

/**
* @brief Constructs a buffer with the same raw c buffer as provided
Expand All @@ -93,6 +104,8 @@ class ModuleManagerLibraryHandlerLocal : public IModuleManagerLibraryHandler {
std::function<int(struct buffer *, struct buffer, struct buffer, struct buffer, unsigned int)> generateCommand_ {};
std::function<int(struct buffer *, size_t)> allocate_ {};
std::function<void(struct buffer *)> deallocate_ {};
/// Optional — nullptr when the module does not export forward_command_on_receive
std::function<int(unsigned int)> forwardCommandOnReceive_ {};
};

}
48 changes: 48 additions & 0 deletions include/bringauto/modules/StatusAggregator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <unordered_map>
#include <list>
#include <mutex>



Expand Down Expand Up @@ -115,6 +116,16 @@ class StatusAggregator {
int get_command(const Buffer& status, const structures::DeviceIdentification& device,
Buffer &command);

/**
* @brief Get a command for immediate forwarding, using the cached status as input.
* Called when a command arrived and the module requested immediate forwarding.
*
* @param device device identification
* @param command output buffer for the generated command
* @return OK on success, DEVICE_NOT_REGISTERED or COMMAND_INVALID on error
*/
int get_command_for_forwarding(const structures::DeviceIdentification& device, Buffer& command);

/**
* @short Get number of the module
*
Expand Down Expand Up @@ -152,6 +163,40 @@ class StatusAggregator {

private:

/**
* @brief Check if device is valid without acquiring devicesMutex_. Caller must hold the lock.
*
* @param device device identification
* @return OK if device is registered and its type is supported, NOT_OK otherwise
*/
int isDeviceValidUnlocked(const structures::DeviceIdentification& device);

/**
* @brief Force status message aggregation on a device without acquiring devicesMutex_. Caller must hold the lock.
*
* @param device device identification
* @return number of queued aggregated messages on success, DEVICE_NOT_REGISTERED if device is unknown
*/
int forceAggregationOnDeviceUnlocked(const structures::DeviceIdentification& device);

/**
* @brief Clear state and messages for a device without acquiring devicesMutex_. Caller must hold the lock.
*
* @param device device identification
* @return OK on success, DEVICE_NOT_REGISTERED if device is unknown
*/
int clearDeviceUnlocked(const structures::DeviceIdentification& device);

/**
* @brief Get command for a device without acquiring devicesMutex_. Caller must hold the lock.
*
* @param status current status buffer
* @param device device identification
* @param command output buffer for the generated command
* @return OK on success, DEVICE_NOT_SUPPORTED, NO_MESSAGE_AVAILABLE, or COMMAND_INVALID on error
*/
int getCommandUnlocked(const Buffer& status, const structures::DeviceIdentification& device, Buffer& command);

/**
* @brief Aggregate status message
*
Expand Down Expand Up @@ -198,6 +243,9 @@ class StatusAggregator {
*/
std::unordered_map<structures::DeviceIdentification, int> deviceTimeouts_ {};

/// Protects devices and deviceTimeouts_ against concurrent access from ModuleHandler threads and ExternalClient
mutable std::mutex devicesMutex_ {};

std::atomic_bool timeoutedMessageReady_ { false };
};

Expand Down
20 changes: 20 additions & 0 deletions include/bringauto/structures/InternalClientMessage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@ class InternalClientMessage {
deviceId_ { deviceId }
{}

/**
* @brief Create a command-forward event. Used by ExternalClient to wake up ModuleHandler
* for immediate command dispatch when the module requested forward_command_on_receive.
*
* @param deviceId device that has a pending command to forward
* @return InternalClientMessage tagged as command-forward
*/
static InternalClientMessage makeCommandForward(const DeviceIdentification &deviceId);

explicit InternalClientMessage(bool disconnect, const InternalProtocol::InternalClient &message):
message_ { message },
disconnect_ { disconnect }
Expand All @@ -44,6 +53,11 @@ class InternalClientMessage {
*/
bool disconnected() const;

/**
* @brief Returns true if this message is a command-forward event (no protobuf payload).
*/
[[nodiscard]] bool isCommandForward() const noexcept;

/**
* @brief Get device identification struct
*
Expand All @@ -52,10 +66,16 @@ class InternalClientMessage {
const DeviceIdentification &getDeviceId() const;

private:
InternalClientMessage(const DeviceIdentification &deviceId, bool commandForward)
: disconnect_ { false }, commandForward_ { commandForward }, deviceId_ { deviceId }
{}

/// Internal client message
InternalProtocol::InternalClient message_ {};
/// True if device is disconnected otherwise false
bool disconnect_;
/// True if this is a command-forward event (no protobuf payload)
bool commandForward_ { false };
/// Device identification struct
DeviceIdentification deviceId_ {};
};
Expand Down
32 changes: 26 additions & 6 deletions include/bringauto/structures/StatusAggregatorDeviceState.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include <bringauto/structures/DeviceIdentification.hpp>
#include <bringauto/modules/Buffer.hpp>

#include <mutex>
#include <optional>
#include <queue>
#include <memory>

Expand Down Expand Up @@ -54,14 +56,17 @@ class StatusAggregatorDeviceState {
void setDefaultCommand(const modules::Buffer &commandBuffer);

/**
* @brief Gets the most relevant command buffer.
* If there are external commands in the queue, they will be used first.
* Otherwise, the default command buffer will be used.
* Commands received from the queue are removed from it.
* @brief Consumes and returns the most relevant command buffer.
* If there are external commands in the queue, they will be dequeued and returned.
* For push-only devices (forwardCommandOnReceive == OK) with an empty queue,
* returns std::nullopt — the caller must send an empty DeviceCommand to satisfy the
* InternalProtocol response requirement without forwarding anything to the vehicle.
* For pull devices, falls back to the default command buffer when the queue is empty.
* Commands dequeued from the external queue are removed from it.
*
* @return const Buffer&
* @return command buffer, or std::nullopt when push-only device has no pending command
*/
[[nodiscard]] const modules::Buffer &getCommand();
[[nodiscard]] std::optional<modules::Buffer> consumeCommand();

/**
* @brief Get aggregated messages queue
Expand All @@ -79,6 +84,17 @@ class StatusAggregatorDeviceState {
*/
int addExternalCommand(const modules::Buffer &commandBuffer);

/**
* @brief Mark this device as requiring immediate command forwarding.
* Called once at device registration based on the module's forwardCommandOnReceive response.
*/
void enableImmediateCommandForwarding() noexcept;

/**
* @brief Returns true if this device requires commands to be forwarded immediately upon receipt.
*/
[[nodiscard]] bool isForwardCommandImmediately() const noexcept;

private:
std::unique_ptr<ThreadTimer> timer_ {};

Expand All @@ -88,7 +104,11 @@ class StatusAggregatorDeviceState {

modules::Buffer defaultCommand_ {};

std::mutex externalCommandMutex_ {};

std::queue<modules::Buffer> externalCommandQueue_ {};

bool forwardCommandImmediately_ { false };
};

}
7 changes: 4 additions & 3 deletions main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,13 @@ int main(int argc, char **argv) {

auto toInternalQueue = std::make_shared<bas::AtomicQueue<bas::ModuleHandlerMessage >>();
auto fromInternalQueue = std::make_shared<bas::AtomicQueue<bas::InternalClientMessage >>();
auto commandForwardingQueue = std::make_shared<bas::AtomicQueue<bas::InternalClientMessage >>();
auto toExternalQueue = std::make_shared<bas::AtomicQueue<bas::InternalClientMessage >>();

bais::InternalServer internalServer { context, fromInternalQueue, toInternalQueue };
bringauto::modules::ModuleHandler moduleHandler { context, moduleLibrary, fromInternalQueue, toInternalQueue,
toExternalQueue };
bringauto::external_client::ExternalClient externalClient { context, moduleLibrary, toExternalQueue };
bringauto::modules::ModuleHandler moduleHandler { context, moduleLibrary, fromInternalQueue,
commandForwardingQueue, toInternalQueue, toExternalQueue };
bringauto::external_client::ExternalClient externalClient { context, moduleLibrary, toExternalQueue, commandForwardingQueue };

std::jthread moduleHandlerThread([&moduleHandler]() { moduleHandler.run(); });
std::jthread externalClientThread([&externalClient]() { externalClient.run(); });
Expand Down
14 changes: 11 additions & 3 deletions source/bringauto/external_client/ExternalClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <bringauto/settings/LoggerId.hpp>

#include <fleet_protocol/common_headers/general_error_codes.h>
#include <fleet_protocol/module_gateway/error_codes.h>

#include <boost/date_time/posix_time/posix_time.hpp>

Expand All @@ -20,8 +21,10 @@ namespace ip = InternalProtocol;

ExternalClient::ExternalClient(const std::shared_ptr<structures::GlobalContext> &context,
structures::ModuleLibrary &moduleLibrary,
const std::shared_ptr<structures::AtomicQueue<structures::InternalClientMessage>> &toExternalQueue):
const std::shared_ptr<structures::AtomicQueue<structures::InternalClientMessage>> &toExternalQueue,
const std::shared_ptr<structures::AtomicQueue<structures::InternalClientMessage>> &commandForwardingQueue):
toExternalQueue_ { toExternalQueue },
commandForwardingQueue_ { commandForwardingQueue },
context_ { context },
moduleLibrary_ { moduleLibrary },
timer_ { context->ioContext } {
Expand Down Expand Up @@ -65,9 +68,14 @@ void ExternalClient::handleCommand(const InternalProtocol::DeviceCommand &device


const auto deviceId = structures::DeviceIdentification(device);
int ret = it->second->update_command(commandBuffer, deviceId);
const int ret = it->second->update_command(commandBuffer, deviceId);
if (ret == OK) {
settings::Logger::logInfo("Command for device {} was added to queue", device.devicename());
if (moduleLibraryHandler->forwardCommandOnReceive(deviceId.getDeviceType()) == OK) {
settings::Logger::logInfo("Command for device {} was added to queue, forwarding immediately", device.devicename());
commandForwardingQueue_->pushAndNotify(structures::InternalClientMessage::makeCommandForward(deviceId));
} else {
settings::Logger::logInfo("Command for device {} was added to queue", device.devicename());
}
}
}

Expand Down
Loading