diff --git a/include/transport/event_driven_tcp_transport.h b/include/transport/event_driven_tcp_transport.h new file mode 100644 index 0000000000..82e7026237 --- /dev/null +++ b/include/transport/event_driven_tcp_transport.h @@ -0,0 +1,102 @@ +/******************************************************************************** + * Copyright (c) 2025 Vinicius Tadeu Zein + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#ifndef SOMEIP_TRANSPORT_EVENT_DRIVEN_TCP_TRANSPORT_H +#define SOMEIP_TRANSPORT_EVENT_DRIVEN_TCP_TRANSPORT_H + +#include "transport/transport.h" +#include "transport/tcp_socket_adapter.h" +#include "platform/thread.h" +#include +#include +#include + +namespace someip { +namespace transport { + +/** + * @brief Configuration for event-driven TCP transport (stream reassembly limits). + */ +struct EventDrivenTcpTransportConfig { + size_t max_receive_buffer{65536}; +}; + +/** + * @brief ITransport implementation driven by an ITcpSocketAdapter. + * + * Unlike EventDrivenUdpTransport, TCP requires explicit lifecycle steps + * that are not part of the ITransport interface. Callers must use the + * concrete type to call initialize(), and optionally enable_server_mode() + * / try_accept_connection() for server-side usage, before calling start(). + */ +class EventDrivenTcpTransport : public ITransport { +public: + explicit EventDrivenTcpTransport(ITcpSocketAdapter& adapter, + const EventDrivenTcpTransportConfig& config = EventDrivenTcpTransportConfig()); + + ~EventDrivenTcpTransport() override; + + EventDrivenTcpTransport(const EventDrivenTcpTransport&) = delete; + EventDrivenTcpTransport& operator=(const EventDrivenTcpTransport&) = delete; + + /** @brief Concrete-type-only: bind the adapter to a local endpoint. */ + [[nodiscard]] Result initialize(const Endpoint& local_endpoint); + + /** @brief Concrete-type-only: switch to server mode after initialize(). */ + [[nodiscard]] Result enable_server_mode(int backlog = 5); + + /** + * @brief Concrete-type-only: non-blocking accept (server mode). + * Adapter invokes connected/disconnected callbacks on completion. + */ + [[nodiscard]] Result try_accept_connection(Endpoint& remote_out); + + [[nodiscard]] Result send_message(const Message& message, const Endpoint& endpoint) override; + MessagePtr receive_message() override; + Result connect(const Endpoint& endpoint) override; + Result disconnect() override; + bool is_connected() const override; + Endpoint get_local_endpoint() const override; + void set_listener(ITransportListener* listener) override; + Result start() override; + Result stop() override; + bool is_running() const override; + +private: + void on_adapter_receive(const std::vector& data); + void on_adapter_connected(const Endpoint& remote); + void on_adapter_disconnected(); + bool parse_message_from_buffer(std::vector& buffer, MessagePtr& message); + + ITcpSocketAdapter& adapter_; + EventDrivenTcpTransportConfig config_; + Endpoint local_endpoint_; + Endpoint connection_remote_; + std::atomic listener_{nullptr}; + + std::atomic running_{false}; + std::atomic initialized_{false}; + std::atomic server_mode_{false}; + + std::vector receive_buffer_; + std::queue> message_queue_; + platform::Mutex queue_mutex_; + + static const size_t SOMEIP_HEADER_SIZE; + static const size_t MAX_MESSAGE_SIZE; +}; + +} // namespace transport +} // namespace someip + +#endif // SOMEIP_TRANSPORT_EVENT_DRIVEN_TCP_TRANSPORT_H diff --git a/include/transport/event_driven_udp_transport.h b/include/transport/event_driven_udp_transport.h new file mode 100644 index 0000000000..c584d518e4 --- /dev/null +++ b/include/transport/event_driven_udp_transport.h @@ -0,0 +1,87 @@ +/******************************************************************************** + * Copyright (c) 2025 Vinicius Tadeu Zein + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#ifndef SOMEIP_TRANSPORT_EVENT_DRIVEN_UDP_TRANSPORT_H +#define SOMEIP_TRANSPORT_EVENT_DRIVEN_UDP_TRANSPORT_H + +#include "transport/transport.h" +#include "transport/multicast_transport.h" +#include "transport/udp_socket_adapter.h" +#include "platform/thread.h" +#include +#include +#include + +namespace someip { +namespace transport { + +/** + * @brief Configuration for event-driven UDP transport. + */ +struct EventDrivenUdpTransportConfig { + std::string multicast_interface{}; + size_t max_message_size{1400}; +}; + +/** + * @brief ITransport implementation driven by an IUdpSocketAdapter. + */ +class EventDrivenUdpTransport : public ITransport, public IMulticastTransport { +public: + explicit EventDrivenUdpTransport(IUdpSocketAdapter& adapter, + const Endpoint& local_endpoint, + const EventDrivenUdpTransportConfig& config = EventDrivenUdpTransportConfig()); + + ~EventDrivenUdpTransport() override; + + EventDrivenUdpTransport(const EventDrivenUdpTransport&) = delete; + EventDrivenUdpTransport& operator=(const EventDrivenUdpTransport&) = delete; + + /** @brief Check whether the transport was constructed with a valid endpoint. */ + [[nodiscard]] bool is_valid() const { return local_endpoint_.is_valid(); } + + [[nodiscard]] Result send_message(const Message& message, const Endpoint& endpoint) override; + MessagePtr receive_message() override; + Result connect(const Endpoint& endpoint) override; + Result disconnect() override; + bool is_connected() const override; + Endpoint get_local_endpoint() const override; + void set_listener(ITransportListener* listener) override; + Result start() override; + Result stop() override; + bool is_running() const override; + + Result join_multicast_group(const std::string& multicast_address) override; + Result leave_multicast_group(const std::string& multicast_address) override; + +private: + void on_adapter_receive(const std::vector& data, const Endpoint& sender); + static bool is_multicast_ipv4(const std::string& address); + + IUdpSocketAdapter& adapter_; + Endpoint local_endpoint_; + EventDrivenUdpTransportConfig config_; + std::atomic running_{false}; + std::atomic opened_{false}; + std::atomic listener_{nullptr}; + + std::queue receive_queue_; + platform::Mutex queue_mutex_; + + static constexpr size_t MAX_UDP_PAYLOAD = 65507; +}; + +} // namespace transport +} // namespace someip + +#endif // SOMEIP_TRANSPORT_EVENT_DRIVEN_UDP_TRANSPORT_H diff --git a/include/transport/multicast_transport.h b/include/transport/multicast_transport.h new file mode 100644 index 0000000000..73f603b761 --- /dev/null +++ b/include/transport/multicast_transport.h @@ -0,0 +1,41 @@ +/******************************************************************************** + * Copyright (c) 2025 Vinicius Tadeu Zein + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#ifndef SOMEIP_TRANSPORT_MULTICAST_TRANSPORT_H +#define SOMEIP_TRANSPORT_MULTICAST_TRANSPORT_H + +#include "common/result.h" +#include + +namespace someip { +namespace transport { + +/** + * @brief Interface for transports that support IPv4 multicast group management. + * + * Both UdpTransport (BSD sockets) and EventDrivenUdpTransport (adapter-based) + * implement this so that SD and other callers can join/leave multicast groups + * without knowing the concrete transport type. + */ +class IMulticastTransport { +public: + virtual ~IMulticastTransport() = default; + + [[nodiscard]] virtual Result join_multicast_group(const std::string& multicast_address) = 0; + [[nodiscard]] virtual Result leave_multicast_group(const std::string& multicast_address) = 0; +}; + +} // namespace transport +} // namespace someip + +#endif // SOMEIP_TRANSPORT_MULTICAST_TRANSPORT_H diff --git a/include/transport/tcp_socket_adapter.h b/include/transport/tcp_socket_adapter.h new file mode 100644 index 0000000000..af842777a0 --- /dev/null +++ b/include/transport/tcp_socket_adapter.h @@ -0,0 +1,95 @@ +/******************************************************************************** + * Copyright (c) 2025 Vinicius Tadeu Zein + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#ifndef SOMEIP_TRANSPORT_TCP_SOCKET_ADAPTER_H +#define SOMEIP_TRANSPORT_TCP_SOCKET_ADAPTER_H + +#include "transport/endpoint.h" +#include "common/result.h" +#include +#include + +namespace someip { +namespace transport { + +/** + * @brief Invoked when payload bytes arrive on the established connection. + */ +using TcpReceiveCallback = std::function& data)>; + +/** + * @brief Invoked when the connection is ready (outgoing connect or accepted peer). + */ +using TcpConnectedCallback = std::function; + +/** + * @brief Invoked when the connection is closed or reset. + */ +using TcpDisconnectedCallback = std::function; + +/** + * @brief TCP socket abstraction for event-driven SOME/IP transport. + * + * Implemented by integrators using non-BSD stacks. The adapter owns the + * semantics of connect/accept; it must invoke callbacks from its event context. + */ +class ITcpSocketAdapter { +public: + virtual ~ITcpSocketAdapter() = default; + + /** + * @brief Create socket bound to the local endpoint (listening or pre-connect). + */ + [[nodiscard]] virtual Result open(const Endpoint& local_endpoint) = 0; + + virtual void close() = 0; + + /** + * @brief Start listening after open (server mode). No-op or NOT_IMPLEMENTED for client-only adapters. + */ + [[nodiscard]] virtual Result listen(int backlog) = 0; + + /** + * @brief Connect to a remote endpoint (client mode). + */ + [[nodiscard]] virtual Result connect(const Endpoint& remote_endpoint) = 0; + + /** + * @brief Accept one pending connection if available (non-blocking from SOME/IP's perspective). + * @param remote_out Peer endpoint when Result::SUCCESS + */ + [[nodiscard]] virtual Result accept(Endpoint& remote_out) = 0; + + /** + * @brief Send bytes on the active connection. + */ + [[nodiscard]] virtual Result send(const std::vector& data) = 0; + + /** + * Quiescence guarantee: after any set_*_callback(nullptr) returns, the + * adapter must not invoke that previously registered callback. + * Implementations must ensure no in-flight callback is executing when + * the setter returns. + */ + virtual void set_receive_callback(TcpReceiveCallback callback) = 0; + virtual void set_connected_callback(TcpConnectedCallback callback) = 0; + virtual void set_disconnected_callback(TcpDisconnectedCallback callback) = 0; + + [[nodiscard]] virtual Endpoint get_local_endpoint() const = 0; + [[nodiscard]] virtual bool is_connected() const = 0; +}; + +} // namespace transport +} // namespace someip + +#endif // SOMEIP_TRANSPORT_TCP_SOCKET_ADAPTER_H diff --git a/include/transport/udp_socket_adapter.h b/include/transport/udp_socket_adapter.h new file mode 100644 index 0000000000..94028b7760 --- /dev/null +++ b/include/transport/udp_socket_adapter.h @@ -0,0 +1,94 @@ +/******************************************************************************** + * Copyright (c) 2025 Vinicius Tadeu Zein + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#ifndef SOMEIP_TRANSPORT_UDP_SOCKET_ADAPTER_H +#define SOMEIP_TRANSPORT_UDP_SOCKET_ADAPTER_H + +#include "transport/endpoint.h" +#include "common/result.h" +#include +#include +#include + +namespace someip { +namespace transport { + +/** + * @brief Callback invoked by the adapter when a datagram is received. + * + * Integrators call this from their I/O event path after a packet is available. + * The payload is the raw UDP payload (one datagram). + */ +using UdpReceiveCallback = std::function& data, const Endpoint& sender)>; + +/** + * @brief UDP socket abstraction for event-driven SOME/IP transport. + * + * Implemented by integrators using non-BSD stacks (e.g. custom datagram sockets). + * Must not depend on platform socket headers. + */ +class IUdpSocketAdapter { +public: + virtual ~IUdpSocketAdapter() = default; + + /** + * @brief Open and bind the local endpoint (port 0 selects an ephemeral port). + */ + [[nodiscard]] virtual Result open(const Endpoint& local_endpoint) = 0; + + /** + * @brief Close the socket and release resources. + */ + virtual void close() = 0; + + /** + * @brief Send one datagram to the destination. + */ + [[nodiscard]] virtual Result send(const std::vector& data, const Endpoint& destination) = 0; + + /** + * @brief Join an IPv4 multicast group. + * @param multicast_address Group address (e.g. 224.0.0.1) + * @param interface_address Outgoing interface address; empty uses stack default + */ + [[nodiscard]] virtual Result join_multicast(const std::string& multicast_address, + const std::string& interface_address = {}) = 0; + + /** + * @brief Leave a multicast group previously joined. + */ + [[nodiscard]] virtual Result leave_multicast(const std::string& multicast_address, + const std::string& interface_address = {}) = 0; + + /** + * @brief Register the receive callback (nullptr clears). + * + * The adapter must invoke the callback for each received datagram from the + * integrator's event loop or I/O thread. + * + * Quiescence guarantee: after set_receive_callback(nullptr) returns, the + * adapter must not invoke any previously registered callback. Implementations + * must ensure no in-flight callback is executing when the setter returns. + */ + virtual void set_receive_callback(UdpReceiveCallback callback) = 0; + + /** + * @brief Effective local endpoint after open (required after bind with port 0). + */ + [[nodiscard]] virtual Endpoint get_local_endpoint() const = 0; +}; + +} // namespace transport +} // namespace someip + +#endif // SOMEIP_TRANSPORT_UDP_SOCKET_ADAPTER_H diff --git a/include/transport/udp_transport.h b/include/transport/udp_transport.h index d4b1d1a71e..51b5279fff 100644 --- a/include/transport/udp_transport.h +++ b/include/transport/udp_transport.h @@ -15,6 +15,7 @@ #define SOMEIP_TRANSPORT_UDP_TRANSPORT_H #include "transport/transport.h" +#include "transport/multicast_transport.h" #include "platform/net.h" #include "platform/thread.h" #include @@ -53,7 +54,7 @@ struct UdpTransportConfig { * - Blocking mode (default): More efficient, eliminates busy loops * - Non-blocking mode: Allows integration with event loops/polling */ -class UdpTransport : public ITransport { +class UdpTransport : public ITransport, public IMulticastTransport { public: /** * @brief Constructor @@ -80,9 +81,9 @@ class UdpTransport : public ITransport { Result stop() override; bool is_running() const override; - // Multicast support - Result join_multicast_group(const std::string& multicast_address); - Result leave_multicast_group(const std::string& multicast_address); + // IMulticastTransport + Result join_multicast_group(const std::string& multicast_address) override; + Result leave_multicast_group(const std::string& multicast_address) override; private: Endpoint local_endpoint_; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index fe35dac237..8a52028e29 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -10,6 +10,8 @@ set(OPENSOMEIP_SOURCES transport/endpoint.cpp transport/udp_transport.cpp transport/tcp_transport.cpp + transport/event_driven_udp_transport.cpp + transport/event_driven_tcp_transport.cpp e2e/e2e_protection.cpp e2e/e2e_header.cpp e2e/e2e_crc.cpp diff --git a/src/sd/sd_client.cpp b/src/sd/sd_client.cpp index 7270df4935..b370c62f1c 100644 --- a/src/sd/sd_client.cpp +++ b/src/sd/sd_client.cpp @@ -13,6 +13,7 @@ #include "sd/sd_client.h" #include "sd/sd_message.h" +#include "transport/multicast_transport.h" #include "transport/udp_transport.h" #include "transport/endpoint.h" #include "transport/transport.h" @@ -267,18 +268,18 @@ class SdClientImpl : public transport::ITransportListener { }; bool join_multicast_group() { - auto udp_transport = std::dynamic_pointer_cast(transport_); - if (!udp_transport) { + auto multicast = std::dynamic_pointer_cast(transport_); + if (!multicast) { return false; } - return udp_transport->join_multicast_group(config_.multicast_address) == Result::SUCCESS; + return multicast->join_multicast_group(config_.multicast_address) == Result::SUCCESS; } void leave_multicast_group() { - auto udp_transport = std::dynamic_pointer_cast(transport_); - if (udp_transport) { - udp_transport->leave_multicast_group(config_.multicast_address); + auto multicast = std::dynamic_pointer_cast(transport_); + if (multicast) { + (void)multicast->leave_multicast_group(config_.multicast_address); } } diff --git a/src/sd/sd_server.cpp b/src/sd/sd_server.cpp index 58d68dfac4..90c6c13b12 100644 --- a/src/sd/sd_server.cpp +++ b/src/sd/sd_server.cpp @@ -13,6 +13,7 @@ #include "sd/sd_server.h" #include "sd/sd_message.h" +#include "transport/multicast_transport.h" #include "transport/udp_transport.h" #include "transport/endpoint.h" #include "transport/transport.h" @@ -265,18 +266,18 @@ class SdServerImpl : public transport::ITransportListener { }; bool join_multicast_group() { - auto udp_transport = std::dynamic_pointer_cast(transport_); - if (!udp_transport) { + auto multicast = std::dynamic_pointer_cast(transport_); + if (!multicast) { return false; } - return udp_transport->join_multicast_group(config_.multicast_address) == Result::SUCCESS; + return multicast->join_multicast_group(config_.multicast_address) == Result::SUCCESS; } void leave_multicast_group() { - auto udp_transport = std::dynamic_pointer_cast(transport_); - if (udp_transport) { - udp_transport->leave_multicast_group(config_.multicast_address); + auto multicast = std::dynamic_pointer_cast(transport_); + if (multicast) { + (void)multicast->leave_multicast_group(config_.multicast_address); } } diff --git a/src/transport/event_driven_tcp_transport.cpp b/src/transport/event_driven_tcp_transport.cpp new file mode 100644 index 0000000000..352d5da5c2 --- /dev/null +++ b/src/transport/event_driven_tcp_transport.cpp @@ -0,0 +1,287 @@ +/******************************************************************************** + * Copyright (c) 2025 Vinicius Tadeu Zein + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#include "transport/event_driven_tcp_transport.h" +#include "platform/memory.h" +#include +#include + +namespace someip { +namespace transport { + +const size_t EventDrivenTcpTransport::SOMEIP_HEADER_SIZE = 16; +const size_t EventDrivenTcpTransport::MAX_MESSAGE_SIZE = 65535; + +EventDrivenTcpTransport::EventDrivenTcpTransport(ITcpSocketAdapter& adapter, + const EventDrivenTcpTransportConfig& config) + : adapter_(adapter), + config_(config) {} + +EventDrivenTcpTransport::~EventDrivenTcpTransport() { + // NOLINTNEXTLINE(clang-analyzer-optin.cplusplus.VirtualCall) + stop(); +} + +Result EventDrivenTcpTransport::initialize(const Endpoint& local_endpoint) { + if (initialized_.load()) { + return Result::INVALID_STATE; + } + + local_endpoint_ = + Endpoint(local_endpoint.get_address(), local_endpoint.get_port(), TransportProtocol::TCP); + + Result result = adapter_.open(local_endpoint_); + if (result != Result::SUCCESS) { + return result; + } + + local_endpoint_ = adapter_.get_local_endpoint(); + initialized_ = true; + return Result::SUCCESS; +} + +Result EventDrivenTcpTransport::enable_server_mode(int backlog) { + if (!initialized_.load()) { + return Result::NOT_INITIALIZED; + } + Result result = adapter_.listen(backlog); + if (result != Result::SUCCESS) { + return result; + } + server_mode_ = true; + return Result::SUCCESS; +} + +Result EventDrivenTcpTransport::try_accept_connection(Endpoint& remote_out) { + if (!server_mode_) { + return Result::INVALID_STATE; + } + if (!running_.load()) { + return Result::INVALID_STATE; + } + return adapter_.accept(remote_out); +} + +Result EventDrivenTcpTransport::send_message(const Message& message, const Endpoint& /*endpoint*/) { + if (!is_connected()) { + return Result::NOT_CONNECTED; + } + + std::vector data = message.serialize(); + return adapter_.send(data); +} + +MessagePtr EventDrivenTcpTransport::receive_message() { + platform::ScopedLock lock(queue_mutex_); + if (message_queue_.empty()) { + return nullptr; + } + MessagePtr message = message_queue_.front().first; + message_queue_.pop(); + return message; +} + +Result EventDrivenTcpTransport::connect(const Endpoint& endpoint) { + if (server_mode_) { + return Result::INVALID_STATE; + } + if (is_connected()) { + return Result::SUCCESS; + } + if (!initialized_.load()) { + return Result::NOT_INITIALIZED; + } + if (!running_.load()) { + return Result::INVALID_STATE; + } + return adapter_.connect(endpoint); +} + +Result EventDrivenTcpTransport::disconnect() { + if (!adapter_.is_connected() && !initialized_.load()) { + return Result::SUCCESS; + } + adapter_.close(); + { + platform::ScopedLock lock(queue_mutex_); + receive_buffer_.clear(); + } + initialized_ = false; + return Result::SUCCESS; +} + +bool EventDrivenTcpTransport::is_connected() const { + return adapter_.is_connected(); +} + +Endpoint EventDrivenTcpTransport::get_local_endpoint() const { + if (initialized_.load()) { + return adapter_.get_local_endpoint(); + } + return local_endpoint_; +} + +void EventDrivenTcpTransport::set_listener(ITransportListener* listener) { + listener_.store(listener, std::memory_order_release); +} + +Result EventDrivenTcpTransport::start() { + if (!initialized_.load()) { + return Result::NOT_INITIALIZED; + } + if (running_.load()) { + return Result::SUCCESS; + } + + adapter_.set_receive_callback([this](const std::vector& data) { on_adapter_receive(data); }); + adapter_.set_connected_callback([this](const Endpoint& remote) { on_adapter_connected(remote); }); + adapter_.set_disconnected_callback([this]() { on_adapter_disconnected(); }); + + running_ = true; + return Result::SUCCESS; +} + +Result EventDrivenTcpTransport::stop() { + running_ = false; + + adapter_.set_receive_callback(nullptr); + adapter_.set_connected_callback(nullptr); + adapter_.set_disconnected_callback(nullptr); + + adapter_.close(); + initialized_ = false; + server_mode_ = false; + + { + platform::ScopedLock lock(queue_mutex_); + receive_buffer_.clear(); + while (!message_queue_.empty()) { + message_queue_.pop(); + } + } + + return Result::SUCCESS; +} + +bool EventDrivenTcpTransport::is_running() const { + return running_.load(); +} + +void EventDrivenTcpTransport::on_adapter_receive(const std::vector& data) { + if (!running_.load() || !initialized_.load()) { + return; + } + + std::vector delivered; + { + platform::ScopedLock lock(queue_mutex_); + receive_buffer_.insert(receive_buffer_.end(), data.begin(), data.end()); + MessagePtr message; + while (parse_message_from_buffer(receive_buffer_, message)) { + message_queue_.push({message, connection_remote_}); + delivered.push_back(message); + } + } + + auto* cb = listener_.load(std::memory_order_acquire); + for (const MessagePtr& m : delivered) { + if (cb) { + cb->on_message_received(m, connection_remote_); + } + } +} + +void EventDrivenTcpTransport::on_adapter_connected(const Endpoint& remote) { + if (!running_.load()) { + return; + } + connection_remote_ = remote; + auto* cb = listener_.load(std::memory_order_acquire); + if (cb) { + cb->on_connection_established(remote); + } +} + +void EventDrivenTcpTransport::on_adapter_disconnected() { + if (!running_.load()) { + return; + } + Endpoint lost = connection_remote_; + { + platform::ScopedLock lock(queue_mutex_); + receive_buffer_.clear(); + } + auto* cb = listener_.load(std::memory_order_acquire); + if (cb) { + cb->on_connection_lost(lost); + } +} + +bool EventDrivenTcpTransport::parse_message_from_buffer(std::vector& buffer, MessagePtr& message) { + for (;;) { + if (buffer.size() > config_.max_receive_buffer) { + buffer.clear(); + return false; + } + + if (buffer.size() < SOMEIP_HEADER_SIZE) { + return false; + } + + uint32_t message_length = + (static_cast(buffer[4]) << 24) | (static_cast(buffer[5]) << 16) | + (static_cast(buffer[6]) << 8) | static_cast(buffer[7]); + + if (message_length < 8 || message_length > MAX_MESSAGE_SIZE) { + size_t search_start = 1; + bool found_valid_header = false; + + while (search_start + SOMEIP_HEADER_SIZE <= buffer.size()) { + uint32_t candidate_length = + (static_cast(buffer[search_start + 4]) << 24) | + (static_cast(buffer[search_start + 5]) << 16) | + (static_cast(buffer[search_start + 6]) << 8) | + static_cast(buffer[search_start + 7]); + if (candidate_length >= 8 && candidate_length <= MAX_MESSAGE_SIZE) { + buffer.erase(buffer.begin(), buffer.begin() + static_cast(search_start)); + found_valid_header = true; + break; + } + ++search_start; + } + + if (!found_valid_header) { + buffer.clear(); + return false; + } + continue; + } + + size_t total_message_size = 8 + message_length; + + if (buffer.size() < total_message_size) { + return false; + } + + std::vector message_data(buffer.begin(), buffer.begin() + static_cast(total_message_size)); + buffer.erase(buffer.begin(), buffer.begin() + static_cast(total_message_size)); + + message = platform::allocate_message(); + if (message && message->deserialize(message_data)) { + return true; + } + } +} + +} // namespace transport +} // namespace someip diff --git a/src/transport/event_driven_udp_transport.cpp b/src/transport/event_driven_udp_transport.cpp new file mode 100644 index 0000000000..ed525e6400 --- /dev/null +++ b/src/transport/event_driven_udp_transport.cpp @@ -0,0 +1,195 @@ +/******************************************************************************** + * Copyright (c) 2025 Vinicius Tadeu Zein + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#include "transport/event_driven_udp_transport.h" +#include "platform/memory.h" +#include + +namespace someip { +namespace transport { + +EventDrivenUdpTransport::EventDrivenUdpTransport(IUdpSocketAdapter& adapter, + const Endpoint& local_endpoint, + const EventDrivenUdpTransportConfig& config) + : adapter_(adapter), + local_endpoint_(local_endpoint), + config_(config) { + if (!local_endpoint_.is_valid()) { +#if defined(__cpp_exceptions) || defined(__EXCEPTIONS) + throw std::invalid_argument("Invalid local endpoint"); +#endif + } +} + +EventDrivenUdpTransport::~EventDrivenUdpTransport() { + // NOLINTNEXTLINE(clang-analyzer-optin.cplusplus.VirtualCall) + stop(); +} + +Result EventDrivenUdpTransport::send_message(const Message& message, const Endpoint& endpoint) { + if (!is_running()) { + return Result::NOT_CONNECTED; + } + if (!endpoint.is_valid()) { + return Result::INVALID_ENDPOINT; + } + + std::vector data = message.serialize(); + if (data.size() > MAX_UDP_PAYLOAD) { + return Result::BUFFER_OVERFLOW; + } + if (config_.max_message_size > 0 && data.size() > config_.max_message_size) { + return Result::BUFFER_OVERFLOW; + } + + return adapter_.send(data, endpoint); +} + +MessagePtr EventDrivenUdpTransport::receive_message() { + platform::ScopedLock lock(queue_mutex_); + if (receive_queue_.empty()) { + return nullptr; + } + MessagePtr message = receive_queue_.front(); + receive_queue_.pop(); + return message; +} + +Result EventDrivenUdpTransport::connect(const Endpoint& endpoint) { + if (!endpoint.is_valid()) { + return Result::INVALID_ENDPOINT; + } + if (endpoint.get_protocol() == TransportProtocol::MULTICAST_UDP) { + if (!is_multicast_ipv4(endpoint.get_address())) { + return Result::INVALID_ENDPOINT; + } + return adapter_.join_multicast(endpoint.get_address(), config_.multicast_interface); + } + return Result::SUCCESS; +} + +Result EventDrivenUdpTransport::disconnect() { + return Result::SUCCESS; +} + +bool EventDrivenUdpTransport::is_connected() const { + return is_running() && opened_.load(); +} + +Endpoint EventDrivenUdpTransport::get_local_endpoint() const { + if (opened_.load()) { + return adapter_.get_local_endpoint(); + } + return local_endpoint_; +} + +void EventDrivenUdpTransport::set_listener(ITransportListener* listener) { + listener_.store(listener, std::memory_order_release); +} + +Result EventDrivenUdpTransport::start() { + if (is_running()) { + return Result::SUCCESS; + } + + adapter_.set_receive_callback([this](const std::vector& data, const Endpoint& sender) { + on_adapter_receive(data, sender); + }); + + Result result = adapter_.open(local_endpoint_); + if (result != Result::SUCCESS) { + adapter_.set_receive_callback(nullptr); + return result; + } + + local_endpoint_ = adapter_.get_local_endpoint(); + opened_ = true; + running_ = true; + return Result::SUCCESS; +} + +Result EventDrivenUdpTransport::stop() { + if (!running_.load()) { + return Result::SUCCESS; + } + + running_ = false; + adapter_.set_receive_callback(nullptr); + adapter_.close(); + opened_ = false; + + platform::ScopedLock lock(queue_mutex_); + while (!receive_queue_.empty()) { + receive_queue_.pop(); + } + + return Result::SUCCESS; +} + +bool EventDrivenUdpTransport::is_running() const { + return running_.load(); +} + +Result EventDrivenUdpTransport::join_multicast_group(const std::string& multicast_address) { + if (!is_multicast_ipv4(multicast_address)) { + return Result::INVALID_ENDPOINT; + } + return adapter_.join_multicast(multicast_address, config_.multicast_interface); +} + +Result EventDrivenUdpTransport::leave_multicast_group(const std::string& multicast_address) { + if (!is_multicast_ipv4(multicast_address)) { + return Result::INVALID_ENDPOINT; + } + return adapter_.leave_multicast(multicast_address, config_.multicast_interface); +} + +void EventDrivenUdpTransport::on_adapter_receive(const std::vector& data, const Endpoint& sender) { + if (!running_.load()) { + return; + } + + MessagePtr message = platform::allocate_message(); + if (!message) { + auto* cb = listener_.load(std::memory_order_acquire); + if (cb) { + cb->on_error(Result::OUT_OF_MEMORY); + } + return; + } + if (!message->deserialize(data)) { + auto* cb = listener_.load(std::memory_order_acquire); + if (cb) { + cb->on_error(Result::INVALID_MESSAGE); + } + return; + } + + { + platform::ScopedLock lock(queue_mutex_); + receive_queue_.push(message); + } + + auto* cb = listener_.load(std::memory_order_acquire); + if (cb) { + cb->on_message_received(message, sender); + } +} + +bool EventDrivenUdpTransport::is_multicast_ipv4(const std::string& address) { + const Endpoint ep(address, 0, TransportProtocol::MULTICAST_UDP); + return ep.is_multicast(); +} + +} // namespace transport +} // namespace someip diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index d2ddc3d961..a7b7122d2e 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -34,6 +34,14 @@ target_link_libraries(test_tcp_transport someip-transport gtest_main) add_executable(test_udp_transport test_udp_transport.cpp) target_link_libraries(test_udp_transport someip-transport gtest_main) +# Event-driven UDP transport tests (mock adapter, no real sockets) +add_executable(test_event_driven_udp_transport test_event_driven_udp_transport.cpp) +target_link_libraries(test_event_driven_udp_transport someip-transport gtest_main) + +# Event-driven TCP transport tests (mock adapter, no real sockets) +add_executable(test_event_driven_tcp_transport test_event_driven_tcp_transport.cpp) +target_link_libraries(test_event_driven_tcp_transport someip-transport gtest_main) + # TP tests add_executable(test_tp test_tp.cpp) target_link_libraries(test_tp someip-tp gtest_main) @@ -118,6 +126,8 @@ target_link_libraries(test_e2e someip-core gtest_main) add_test(NAME EventsTest COMMAND test_events) add_test(NAME TcpTransportTest COMMAND test_tcp_transport) add_test(NAME UdpTransportTest COMMAND test_udp_transport) + add_test(NAME EventDrivenUdpTransportTest COMMAND test_event_driven_udp_transport) + add_test(NAME EventDrivenTcpTransportTest COMMAND test_event_driven_tcp_transport) add_test(NAME TpTest COMMAND test_tp) add_test(NAME E2ETest COMMAND test_e2e) @@ -125,7 +135,8 @@ target_link_libraries(test_e2e someip-core gtest_main) set(_ALL_TESTS PlatformThreadingTest SerializationTest MessageTest SessionManagerTest EndpointTest RpcTest SdTest EventsTest - TcpTransportTest UdpTransportTest TpTest E2ETest + TcpTransportTest UdpTransportTest EventDrivenUdpTransportTest + EventDrivenTcpTransportTest TpTest E2ETest ) if(NOT WIN32) list(APPEND _ALL_TESTS diff --git a/tests/test_event_driven_tcp_transport.cpp b/tests/test_event_driven_tcp_transport.cpp new file mode 100644 index 0000000000..5310f520e4 --- /dev/null +++ b/tests/test_event_driven_tcp_transport.cpp @@ -0,0 +1,482 @@ +/******************************************************************************** + * Copyright (c) 2025 Vinicius Tadeu Zein + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace someip; +using namespace someip::transport; + +namespace { + +class TestEventTcpListener : public ITransportListener { +public: + void on_message_received(MessagePtr message, const Endpoint& sender) override { + std::scoped_lock lock(mutex_); + received_messages_.push_back({std::move(message), sender}); + cv_.notify_one(); + } + + void on_connection_lost(const Endpoint& endpoint) override { + std::scoped_lock lock(mutex_); + lost_endpoint_ = endpoint; + connection_lost_count_++; + cv_.notify_one(); + } + + void on_connection_established(const Endpoint& endpoint) override { + std::scoped_lock lock(mutex_); + established_endpoint_ = endpoint; + connection_established_count_++; + cv_.notify_one(); + } + + void on_error(Result error) override { + std::scoped_lock lock(mutex_); + last_error_ = error; + error_count_++; + cv_.notify_one(); + } + + bool wait_for_message(std::chrono::milliseconds timeout = std::chrono::milliseconds(500)) { + std::unique_lock lock(mutex_); + return cv_.wait_for(lock, timeout, [this]() { return !received_messages_.empty(); }); + } + + bool wait_for_connection(std::chrono::milliseconds timeout = std::chrono::milliseconds(500)) { + std::unique_lock lock(mutex_); + return cv_.wait_for(lock, timeout, [this]() { return connection_established_count_ > 0; }); + } + + bool wait_for_disconnect(std::chrono::milliseconds timeout = std::chrono::milliseconds(500)) { + std::unique_lock lock(mutex_); + return cv_.wait_for(lock, timeout, [this]() { return connection_lost_count_ > 0; }); + } + + size_t message_count() const { + std::scoped_lock lock(mutex_); + return received_messages_.size(); + } + + uint16_t message_service_id(size_t index) const { + std::scoped_lock lock(mutex_); + return received_messages_.at(index).first->get_service_id(); + } + + std::atomic connection_established_count_{0}; + std::atomic connection_lost_count_{0}; + std::atomic last_error_{Result::SUCCESS}; + std::atomic error_count_{0}; + Endpoint established_endpoint_{"0.0.0.0", 0}; + Endpoint lost_endpoint_{"0.0.0.0", 0}; + +private: + mutable std::mutex mutex_; + std::condition_variable cv_; + std::vector> received_messages_; +}; + +class MockTcpAdapter : public ITcpSocketAdapter { +public: + Result open(const Endpoint& local_endpoint) override { + if (open_result_ != Result::SUCCESS) { + return open_result_; + } + open_ = true; + local_ = local_endpoint; + if (local_.get_port() == 0) { + local_.set_port(54321); + } + return Result::SUCCESS; + } + + void close() override { + open_ = false; + connected_ = false; + receive_cb_ = nullptr; + connected_cb_ = nullptr; + disconnected_cb_ = nullptr; + } + + Result listen(int /*backlog*/) override { + if (!open_) { + return Result::INVALID_STATE; + } + listening_ = true; + return Result::SUCCESS; + } + + Result connect(const Endpoint& remote_endpoint) override { + if (connect_result_ != Result::SUCCESS) { + return connect_result_; + } + connected_ = true; + remote_ = remote_endpoint; + if (connected_cb_) { + connected_cb_(remote_endpoint); + } + return Result::SUCCESS; + } + + Result accept(Endpoint& remote_out) override { + if (!listening_) { + return Result::INVALID_STATE; + } + if (!pending_connection_) { + return Result::TIMEOUT; + } + remote_out = pending_remote_; + connected_ = true; + pending_connection_ = false; + return Result::SUCCESS; + } + + Result send(const std::vector& data) override { + if (!connected_) { + return Result::NOT_CONNECTED; + } + last_send_data_ = data; + return Result::SUCCESS; + } + + void set_receive_callback(TcpReceiveCallback callback) override { + receive_cb_ = std::move(callback); + } + + void set_connected_callback(TcpConnectedCallback callback) override { + connected_cb_ = std::move(callback); + } + + void set_disconnected_callback(TcpDisconnectedCallback callback) override { + disconnected_cb_ = std::move(callback); + } + + Endpoint get_local_endpoint() const override { return local_; } + bool is_connected() const override { return connected_; } + + void inject_receive(const std::vector& data) { + if (receive_cb_) { + receive_cb_(data); + } + } + + void inject_connected(const Endpoint& remote) { + connected_ = true; + remote_ = remote; + if (connected_cb_) { + connected_cb_(remote); + } + } + + void inject_disconnected() { + connected_ = false; + if (disconnected_cb_) { + disconnected_cb_(); + } + } + + void set_open_result(Result r) { open_result_ = r; } + void set_connect_result(Result r) { connect_result_ = r; } + + void stage_pending_connection(const Endpoint& remote) { + pending_connection_ = true; + pending_remote_ = remote; + } + + bool is_open() const { return open_; } + std::vector last_send_data_; + +private: + bool open_{false}; + bool connected_{false}; + bool listening_{false}; + bool pending_connection_{false}; + Endpoint local_{"127.0.0.1", 0}; + Endpoint remote_{"0.0.0.0", 0}; + Endpoint pending_remote_{"0.0.0.0", 0}; + TcpReceiveCallback receive_cb_; + TcpConnectedCallback connected_cb_; + TcpDisconnectedCallback disconnected_cb_; + Result open_result_{Result::SUCCESS}; + Result connect_result_{Result::SUCCESS}; +}; + +Message make_tcp_sample_message() { + Message message; + message.set_service_id(0x1234); + message.set_method_id(0x5678); + message.set_client_id(0x9ABC); + message.set_session_id(0xDEF0); + message.set_protocol_version(1); + message.set_interface_version(1); + message.set_message_type(MessageType::REQUEST); + message.set_return_code(ReturnCode::E_OK); + message.set_payload({0x01, 0x02, 0x03}); + return message; +} + +} // namespace + +TEST(EventDrivenTcpTransport, ConstructionNotRunning) { + MockTcpAdapter adapter; + EventDrivenTcpTransport transport(adapter); + + EXPECT_FALSE(transport.is_running()); + EXPECT_FALSE(transport.is_connected()); +} + +TEST(EventDrivenTcpTransport, InitializeAndStart) { + MockTcpAdapter adapter; + EventDrivenTcpTransport transport(adapter); + + Endpoint local{"127.0.0.1", 30490, TransportProtocol::TCP}; + EXPECT_EQ(transport.initialize(local), Result::SUCCESS); + EXPECT_EQ(transport.start(), Result::SUCCESS); + EXPECT_TRUE(transport.is_running()); + EXPECT_EQ(transport.get_local_endpoint().get_port(), 30490); + + transport.stop(); + EXPECT_FALSE(transport.is_running()); +} + +TEST(EventDrivenTcpTransport, StartWithoutInitializeFails) { + MockTcpAdapter adapter; + EventDrivenTcpTransport transport(adapter); + + EXPECT_EQ(transport.start(), Result::NOT_INITIALIZED); + EXPECT_FALSE(transport.is_running()); +} + +TEST(EventDrivenTcpTransport, InitializePropagatesOpenFailure) { + MockTcpAdapter adapter; + adapter.set_open_result(Result::NETWORK_ERROR); + EventDrivenTcpTransport transport(adapter); + + EXPECT_EQ(transport.initialize(Endpoint{"127.0.0.1", 30490}), Result::NETWORK_ERROR); +} + +TEST(EventDrivenTcpTransport, DoubleInitializeFails) { + MockTcpAdapter adapter; + EventDrivenTcpTransport transport(adapter); + + EXPECT_EQ(transport.initialize(Endpoint{"127.0.0.1", 0}), Result::SUCCESS); + EXPECT_EQ(transport.initialize(Endpoint{"127.0.0.1", 0}), Result::INVALID_STATE); +} + +TEST(EventDrivenTcpTransport, ConnectCallbackNotifiesListener) { + MockTcpAdapter adapter; + EventDrivenTcpTransport transport(adapter); + TestEventTcpListener listener; + transport.set_listener(&listener); + + ASSERT_EQ(transport.initialize(Endpoint{"127.0.0.1", 0}), Result::SUCCESS); + ASSERT_EQ(transport.start(), Result::SUCCESS); + + Endpoint remote{"10.0.0.1", 5000, TransportProtocol::TCP}; + adapter.inject_connected(remote); + + ASSERT_TRUE(listener.wait_for_connection()); + EXPECT_EQ(listener.connection_established_count_.load(), 1); + + transport.stop(); +} + +TEST(EventDrivenTcpTransport, DisconnectCallbackNotifiesListener) { + MockTcpAdapter adapter; + EventDrivenTcpTransport transport(adapter); + TestEventTcpListener listener; + transport.set_listener(&listener); + + ASSERT_EQ(transport.initialize(Endpoint{"127.0.0.1", 0}), Result::SUCCESS); + ASSERT_EQ(transport.start(), Result::SUCCESS); + + Endpoint remote{"10.0.0.1", 5000, TransportProtocol::TCP}; + adapter.inject_connected(remote); + ASSERT_TRUE(listener.wait_for_connection()); + + adapter.inject_disconnected(); + ASSERT_TRUE(listener.wait_for_disconnect()); + EXPECT_EQ(listener.connection_lost_count_.load(), 1); + + transport.stop(); +} + +TEST(EventDrivenTcpTransport, SendForwardsToAdapter) { + MockTcpAdapter adapter; + EventDrivenTcpTransport transport(adapter); + + ASSERT_EQ(transport.initialize(Endpoint{"127.0.0.1", 0}), Result::SUCCESS); + ASSERT_EQ(transport.start(), Result::SUCCESS); + + adapter.inject_connected(Endpoint{"10.0.0.1", 5000, TransportProtocol::TCP}); + + Message msg = make_tcp_sample_message(); + Endpoint dest{"10.0.0.1", 5000}; + ASSERT_EQ(transport.send_message(msg, dest), Result::SUCCESS); + + std::vector expected = msg.serialize(); + EXPECT_EQ(adapter.last_send_data_, expected); + + transport.stop(); +} + +TEST(EventDrivenTcpTransport, SendWhenNotConnectedFails) { + MockTcpAdapter adapter; + EventDrivenTcpTransport transport(adapter); + + ASSERT_EQ(transport.initialize(Endpoint{"127.0.0.1", 0}), Result::SUCCESS); + ASSERT_EQ(transport.start(), Result::SUCCESS); + + Message msg = make_tcp_sample_message(); + EXPECT_EQ(transport.send_message(msg, Endpoint{"10.0.0.1", 5000}), Result::NOT_CONNECTED); + + transport.stop(); +} + +TEST(EventDrivenTcpTransport, ReceiveCallbackReassemblesMessage) { + MockTcpAdapter adapter; + EventDrivenTcpTransport transport(adapter); + TestEventTcpListener listener; + transport.set_listener(&listener); + + ASSERT_EQ(transport.initialize(Endpoint{"127.0.0.1", 0}), Result::SUCCESS); + ASSERT_EQ(transport.start(), Result::SUCCESS); + + adapter.inject_connected(Endpoint{"10.0.0.1", 5000, TransportProtocol::TCP}); + + Message sent = make_tcp_sample_message(); + std::vector raw = sent.serialize(); + adapter.inject_receive(raw); + + ASSERT_TRUE(listener.wait_for_message()); + ASSERT_EQ(listener.message_count(), 1u); + EXPECT_EQ(listener.message_service_id(0), sent.get_service_id()); + + MessagePtr queued = transport.receive_message(); + ASSERT_NE(queued, nullptr); + EXPECT_EQ(queued->get_method_id(), sent.get_method_id()); + + transport.stop(); +} + +TEST(EventDrivenTcpTransport, ReceiveFragmentedMessage) { + MockTcpAdapter adapter; + EventDrivenTcpTransport transport(adapter); + TestEventTcpListener listener; + transport.set_listener(&listener); + + ASSERT_EQ(transport.initialize(Endpoint{"127.0.0.1", 0}), Result::SUCCESS); + ASSERT_EQ(transport.start(), Result::SUCCESS); + + adapter.inject_connected(Endpoint{"10.0.0.1", 5000, TransportProtocol::TCP}); + + Message sent = make_tcp_sample_message(); + std::vector raw = sent.serialize(); + + size_t half = raw.size() / 2; + std::vector first_half(raw.begin(), raw.begin() + static_cast(half)); + std::vector second_half(raw.begin() + static_cast(half), raw.end()); + + adapter.inject_receive(first_half); + EXPECT_FALSE(listener.wait_for_message(std::chrono::milliseconds(50))); + + adapter.inject_receive(second_half); + ASSERT_TRUE(listener.wait_for_message()); + ASSERT_EQ(listener.message_count(), 1u); + EXPECT_EQ(listener.message_service_id(0), sent.get_service_id()); + + transport.stop(); +} + +TEST(EventDrivenTcpTransport, ServerModeEnableAndAccept) { + MockTcpAdapter adapter; + EventDrivenTcpTransport transport(adapter); + + ASSERT_EQ(transport.initialize(Endpoint{"127.0.0.1", 30490}), Result::SUCCESS); + ASSERT_EQ(transport.enable_server_mode(), Result::SUCCESS); + ASSERT_EQ(transport.start(), Result::SUCCESS); + + Endpoint remote{"10.0.0.2", 40000, TransportProtocol::TCP}; + adapter.stage_pending_connection(remote); + + Endpoint accepted; + EXPECT_EQ(transport.try_accept_connection(accepted), Result::SUCCESS); + EXPECT_EQ(accepted, remote); + + transport.stop(); +} + +TEST(EventDrivenTcpTransport, ServerModeWithoutInitializeFails) { + MockTcpAdapter adapter; + EventDrivenTcpTransport transport(adapter); + + EXPECT_EQ(transport.enable_server_mode(), Result::NOT_INITIALIZED); +} + +TEST(EventDrivenTcpTransport, ConnectInServerModeFails) { + MockTcpAdapter adapter; + EventDrivenTcpTransport transport(adapter); + + ASSERT_EQ(transport.initialize(Endpoint{"127.0.0.1", 0}), Result::SUCCESS); + ASSERT_EQ(transport.enable_server_mode(), Result::SUCCESS); + ASSERT_EQ(transport.start(), Result::SUCCESS); + + EXPECT_EQ(transport.connect(Endpoint{"10.0.0.1", 5000}), Result::INVALID_STATE); + + transport.stop(); +} + +TEST(EventDrivenTcpTransport, ListenerPreservedAcrossStopStart) { + MockTcpAdapter adapter; + EventDrivenTcpTransport transport(adapter); + TestEventTcpListener listener; + transport.set_listener(&listener); + + ASSERT_EQ(transport.initialize(Endpoint{"127.0.0.1", 0}), Result::SUCCESS); + ASSERT_EQ(transport.start(), Result::SUCCESS); + transport.stop(); + + ASSERT_EQ(transport.initialize(Endpoint{"127.0.0.1", 0}), Result::SUCCESS); + ASSERT_EQ(transport.start(), Result::SUCCESS); + + adapter.inject_connected(Endpoint{"10.0.0.1", 5000, TransportProtocol::TCP}); + + Message sent = make_tcp_sample_message(); + adapter.inject_receive(sent.serialize()); + + ASSERT_TRUE(listener.wait_for_message()); + ASSERT_EQ(listener.message_count(), 1u); + + transport.stop(); +} + +TEST(EventDrivenTcpTransport, StopClearsCallbacks) { + MockTcpAdapter adapter; + EventDrivenTcpTransport transport(adapter); + TestEventTcpListener listener; + transport.set_listener(&listener); + + ASSERT_EQ(transport.initialize(Endpoint{"127.0.0.1", 0}), Result::SUCCESS); + ASSERT_EQ(transport.start(), Result::SUCCESS); + transport.stop(); + + adapter.inject_receive({0x00, 0x01, 0x02}); + + EXPECT_FALSE(listener.wait_for_message(std::chrono::milliseconds(50))); +} diff --git a/tests/test_event_driven_udp_transport.cpp b/tests/test_event_driven_udp_transport.cpp new file mode 100644 index 0000000000..3183671c85 --- /dev/null +++ b/tests/test_event_driven_udp_transport.cpp @@ -0,0 +1,297 @@ +/******************************************************************************** + * Copyright (c) 2025 Vinicius Tadeu Zein + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace someip; +using namespace someip::transport; + +namespace { + +class TestEventUdpListener : public ITransportListener { +public: + void on_message_received(MessagePtr message, const Endpoint& sender) override { + std::scoped_lock lock(mutex_); + received_messages_.push_back({std::move(message), sender}); + cv_.notify_one(); + } + + void on_connection_lost(const Endpoint& /*endpoint*/) override {} + + void on_connection_established(const Endpoint& /*endpoint*/) override {} + + void on_error(Result error) override { + std::scoped_lock lock(mutex_); + last_error_ = error; + error_count_++; + cv_.notify_one(); + } + + bool wait_for_message(std::chrono::milliseconds timeout = std::chrono::milliseconds(500)) { + std::unique_lock lock(mutex_); + return cv_.wait_for(lock, timeout, [this]() { return !received_messages_.empty(); }); + } + + bool wait_for_error(std::chrono::milliseconds timeout = std::chrono::milliseconds(500)) { + std::unique_lock lock(mutex_); + return cv_.wait_for(lock, timeout, [this]() { return error_count_ > 0; }); + } + + size_t message_count() const { + std::scoped_lock lock(mutex_); + return received_messages_.size(); + } + + Endpoint message_sender(size_t index) const { + std::scoped_lock lock(mutex_); + return received_messages_.at(index).second; + } + + uint16_t message_service_id(size_t index) const { + std::scoped_lock lock(mutex_); + return received_messages_.at(index).first->get_service_id(); + } + + uint16_t message_method_id(size_t index) const { + std::scoped_lock lock(mutex_); + return received_messages_.at(index).first->get_method_id(); + } + + std::atomic last_error_{Result::SUCCESS}; + std::atomic error_count_{0}; + +private: + mutable std::mutex mutex_; + std::condition_variable cv_; + std::vector> received_messages_; +}; + +class MockUdpAdapter : public IUdpSocketAdapter { +public: + void set_open_result(Result r) { open_result_ = r; } + + Result open(const Endpoint& local_endpoint) override { + if (open_result_ != Result::SUCCESS) { + return open_result_; + } + open_ = true; + local_ = local_endpoint; + if (local_.get_port() == 0) { + local_.set_port(54321); + } + return Result::SUCCESS; + } + + void close() override { + open_ = false; + cb_ = nullptr; + } + + Result send(const std::vector& data, const Endpoint& destination) override { + last_send_data_ = data; + last_send_dest_ = destination; + return Result::SUCCESS; + } + + Result join_multicast(const std::string& multicast_address, + const std::string& /*interface_address*/) override { + joins_.push_back(multicast_address); + return Result::SUCCESS; + } + + Result leave_multicast(const std::string& multicast_address, + const std::string& /*interface_address*/) override { + leaves_.push_back(multicast_address); + return Result::SUCCESS; + } + + void set_receive_callback(UdpReceiveCallback callback) override { cb_ = std::move(callback); } + + Endpoint get_local_endpoint() const override { return local_; } + + void inject_receive(const std::vector& data, const Endpoint& sender) { + if (cb_) { + cb_(data, sender); + } + } + + bool is_open() const { return open_; } + + std::vector last_send_data_; + Endpoint last_send_dest_{"0.0.0.0", 0}; + std::vector joins_; + std::vector leaves_; + +private: + bool open_{false}; + Endpoint local_{"127.0.0.1", 0}; + UdpReceiveCallback cb_; + Result open_result_{Result::SUCCESS}; +}; + +Message make_sample_message() { + Message message; + message.set_service_id(0x1234); + message.set_method_id(0x5678); + message.set_client_id(0x9ABC); + message.set_session_id(0xDEF0); + message.set_protocol_version(1); + message.set_interface_version(1); + message.set_message_type(MessageType::REQUEST); + message.set_return_code(ReturnCode::E_OK); + message.set_payload({0x01, 0x02, 0x03}); + return message; +} + +} // namespace + +TEST(EventDrivenUdpTransport, ConstructionNotRunning) { + MockUdpAdapter adapter; + Endpoint local{"127.0.0.1", 0}; + EventDrivenUdpTransport transport(adapter, local); + + EXPECT_FALSE(transport.is_running()); + EXPECT_FALSE(transport.is_connected()); + EXPECT_EQ(transport.get_local_endpoint().get_address(), "127.0.0.1"); +} + +TEST(EventDrivenUdpTransport, StartStopLifecycle) { + MockUdpAdapter adapter; + EventDrivenUdpTransport transport(adapter, Endpoint{"127.0.0.1", 0}); + + EXPECT_EQ(transport.start(), Result::SUCCESS); + EXPECT_TRUE(transport.is_running()); + EXPECT_TRUE(transport.is_connected()); + EXPECT_TRUE(adapter.is_open()); + EXPECT_EQ(transport.get_local_endpoint().get_port(), 54321); + + EXPECT_EQ(transport.stop(), Result::SUCCESS); + EXPECT_FALSE(transport.is_running()); + EXPECT_FALSE(adapter.is_open()); +} + +TEST(EventDrivenUdpTransport, StartPropagatesOpenFailure) { + MockUdpAdapter adapter; + adapter.set_open_result(Result::NETWORK_ERROR); + EventDrivenUdpTransport transport(adapter, Endpoint{"127.0.0.1", 30490}); + + EXPECT_EQ(transport.start(), Result::NETWORK_ERROR); + EXPECT_FALSE(transport.is_running()); +} + +TEST(EventDrivenUdpTransport, SendForwardsToAdapter) { + MockUdpAdapter adapter; + EventDrivenUdpTransport transport(adapter, Endpoint{"127.0.0.1", 0}); + ASSERT_EQ(transport.start(), Result::SUCCESS); + + Message msg = make_sample_message(); + Endpoint dest{"127.0.0.1", 40000}; + ASSERT_EQ(transport.send_message(msg, dest), Result::SUCCESS); + + EXPECT_EQ(adapter.last_send_dest_, dest); + std::vector expected = msg.serialize(); + EXPECT_EQ(adapter.last_send_data_, expected); + + transport.stop(); +} + +TEST(EventDrivenUdpTransport, ReceiveCallbackNotifiesListener) { + MockUdpAdapter adapter; + EventDrivenUdpTransport transport(adapter, Endpoint{"127.0.0.1", 0}); + TestEventUdpListener listener; + transport.set_listener(&listener); + + ASSERT_EQ(transport.start(), Result::SUCCESS); + + Message sent = make_sample_message(); + std::vector raw = sent.serialize(); + Endpoint sender{"10.0.0.5", 5000, TransportProtocol::UDP}; + adapter.inject_receive(raw, sender); + + ASSERT_TRUE(listener.wait_for_message()); + ASSERT_EQ(listener.message_count(), 1u); + EXPECT_EQ(listener.message_sender(0), sender); + EXPECT_EQ(listener.message_service_id(0), sent.get_service_id()); + + MessagePtr queued = transport.receive_message(); + ASSERT_NE(queued, nullptr); + EXPECT_EQ(queued->get_method_id(), sent.get_method_id()); + + transport.stop(); +} + +TEST(EventDrivenUdpTransport, MalformedDatagramInvokesOnError) { + MockUdpAdapter adapter; + EventDrivenUdpTransport transport(adapter, Endpoint{"127.0.0.1", 0}); + TestEventUdpListener listener; + transport.set_listener(&listener); + + ASSERT_EQ(transport.start(), Result::SUCCESS); + + adapter.inject_receive({0x00, 0x01}, Endpoint{"127.0.0.1", 1234}); + + ASSERT_TRUE(listener.wait_for_error()); + EXPECT_EQ(listener.last_error_.load(), Result::INVALID_MESSAGE); + + transport.stop(); +} + +TEST(EventDrivenUdpTransport, MulticastJoinLeave) { + MockUdpAdapter adapter; + EventDrivenUdpTransport transport(adapter, Endpoint{"127.0.0.1", 0}); + ASSERT_EQ(transport.start(), Result::SUCCESS); + + EXPECT_EQ(transport.join_multicast_group("224.0.0.251"), Result::SUCCESS); + EXPECT_EQ(transport.leave_multicast_group("224.0.0.251"), Result::SUCCESS); + ASSERT_EQ(adapter.joins_.size(), 1u); + ASSERT_EQ(adapter.leaves_.size(), 1u); + EXPECT_EQ(adapter.joins_[0], "224.0.0.251"); + EXPECT_EQ(adapter.leaves_[0], "224.0.0.251"); + + transport.stop(); +} + +TEST(EventDrivenUdpTransport, ConnectMulticastUdpJoinsAdapter) { + MockUdpAdapter adapter; + EventDrivenUdpTransport transport(adapter, Endpoint{"127.0.0.1", 0}); + ASSERT_EQ(transport.start(), Result::SUCCESS); + + Endpoint mc("239.0.0.1", 30490, TransportProtocol::MULTICAST_UDP); + EXPECT_EQ(transport.connect(mc), Result::SUCCESS); + ASSERT_EQ(adapter.joins_.size(), 1u); + EXPECT_EQ(adapter.joins_[0], "239.0.0.1"); + + transport.stop(); +} + +TEST(EventDrivenUdpTransport, StopClearsReceiveCallback) { + MockUdpAdapter adapter; + EventDrivenUdpTransport transport(adapter, Endpoint{"127.0.0.1", 0}); + TestEventUdpListener listener; + transport.set_listener(&listener); + + ASSERT_EQ(transport.start(), Result::SUCCESS); + transport.stop(); + + Message sent = make_sample_message(); + adapter.inject_receive(sent.serialize(), Endpoint{"127.0.0.1", 1}); + + EXPECT_FALSE(listener.wait_for_message(std::chrono::milliseconds(50))); +}