From f9e3ca429f916f0ce3ec4d28055917a49b72c633 Mon Sep 17 00:00:00 2001 From: Vinicius Zein Date: Sat, 28 Mar 2026 12:32:03 -0400 Subject: [PATCH 1/7] feat(transport): add event-driven transport interfaces (#172) Made-with: Cursor --- .../transport/event_driven_tcp_transport.h | 95 ++++++ .../transport/event_driven_udp_transport.h | 83 ++++++ include/transport/tcp_socket_adapter.h | 89 ++++++ include/transport/udp_socket_adapter.h | 90 ++++++ src/CMakeLists.txt | 2 + src/transport/event_driven_tcp_transport.cpp | 275 +++++++++++++++++ src/transport/event_driven_udp_transport.cpp | 187 ++++++++++++ tests/CMakeLists.txt | 7 +- tests/test_event_driven_udp_transport.cpp | 277 ++++++++++++++++++ 9 files changed, 1104 insertions(+), 1 deletion(-) create mode 100644 include/transport/event_driven_tcp_transport.h create mode 100644 include/transport/event_driven_udp_transport.h create mode 100644 include/transport/tcp_socket_adapter.h create mode 100644 include/transport/udp_socket_adapter.h create mode 100644 src/transport/event_driven_tcp_transport.cpp create mode 100644 src/transport/event_driven_udp_transport.cpp create mode 100644 tests/test_event_driven_udp_transport.cpp diff --git a/include/transport/event_driven_tcp_transport.h b/include/transport/event_driven_tcp_transport.h new file mode 100644 index 0000000000..b7e8814ceb --- /dev/null +++ b/include/transport/event_driven_tcp_transport.h @@ -0,0 +1,95 @@ +/******************************************************************************** + * Copyright (c) 2025 Vinicius Tadeu Zein + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#ifndef SOMEIP_TRANSPORT_EVENT_DRIVEN_TCP_TRANSPORT_H +#define SOMEIP_TRANSPORT_EVENT_DRIVEN_TCP_TRANSPORT_H + +#include "transport/transport.h" +#include "transport/tcp_socket_adapter.h" +#include "platform/thread.h" +#include +#include +#include + +namespace someip { +namespace transport { + +/** + * @brief Configuration for event-driven TCP transport (stream reassembly limits). + */ +struct EventDrivenTcpTransportConfig { + size_t max_receive_buffer{65536}; +}; + +/** + * @brief ITransport implementation driven by an ITcpSocketAdapter. + */ +class EventDrivenTcpTransport : public ITransport { +public: + explicit EventDrivenTcpTransport(ITcpSocketAdapter& adapter, + const EventDrivenTcpTransportConfig& config = EventDrivenTcpTransportConfig()); + + ~EventDrivenTcpTransport() override; + + EventDrivenTcpTransport(const EventDrivenTcpTransport&) = delete; + EventDrivenTcpTransport& operator=(const EventDrivenTcpTransport&) = delete; + + [[nodiscard]] Result initialize(const Endpoint& local_endpoint); + + [[nodiscard]] Result enable_server_mode(int backlog = 5); + + /** + * @brief Non-blocking accept attempt (server mode). Adapter should invoke + * connected/disconnected callbacks when the connection is ready or lost. + */ + [[nodiscard]] Result try_accept_connection(Endpoint& remote_out); + + [[nodiscard]] Result send_message(const Message& message, const Endpoint& endpoint) override; + MessagePtr receive_message() override; + Result connect(const Endpoint& endpoint) override; + Result disconnect() override; + bool is_connected() const override; + Endpoint get_local_endpoint() const override; + void set_listener(ITransportListener* listener) override; + Result start() override; + Result stop() override; + bool is_running() const override; + +private: + void on_adapter_receive(const std::vector& data); + void on_adapter_connected(const Endpoint& remote); + void on_adapter_disconnected(); + bool parse_message_from_buffer(std::vector& buffer, MessagePtr& message); + + ITcpSocketAdapter& adapter_; + EventDrivenTcpTransportConfig config_; + Endpoint local_endpoint_; + Endpoint connection_remote_; + ITransportListener* listener_{nullptr}; + + std::atomic running_{false}; + std::atomic initialized_{false}; + bool server_mode_{false}; + + std::vector receive_buffer_; + std::queue> message_queue_; + platform::Mutex queue_mutex_; + + static const size_t SOMEIP_HEADER_SIZE; + static const size_t MAX_MESSAGE_SIZE; +}; + +} // namespace transport +} // namespace someip + +#endif // SOMEIP_TRANSPORT_EVENT_DRIVEN_TCP_TRANSPORT_H diff --git a/include/transport/event_driven_udp_transport.h b/include/transport/event_driven_udp_transport.h new file mode 100644 index 0000000000..10c4089152 --- /dev/null +++ b/include/transport/event_driven_udp_transport.h @@ -0,0 +1,83 @@ +/******************************************************************************** + * Copyright (c) 2025 Vinicius Tadeu Zein + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#ifndef SOMEIP_TRANSPORT_EVENT_DRIVEN_UDP_TRANSPORT_H +#define SOMEIP_TRANSPORT_EVENT_DRIVEN_UDP_TRANSPORT_H + +#include "transport/transport.h" +#include "transport/udp_socket_adapter.h" +#include "platform/thread.h" +#include +#include +#include + +namespace someip { +namespace transport { + +/** + * @brief Configuration for event-driven UDP transport. + */ +struct EventDrivenUdpTransportConfig { + std::string multicast_interface{}; + size_t max_message_size{1400}; +}; + +/** + * @brief ITransport implementation driven by an IUdpSocketAdapter. + */ +class EventDrivenUdpTransport : public ITransport { +public: + explicit EventDrivenUdpTransport(IUdpSocketAdapter& adapter, + const Endpoint& local_endpoint, + const EventDrivenUdpTransportConfig& config = EventDrivenUdpTransportConfig()); + + ~EventDrivenUdpTransport() override; + + EventDrivenUdpTransport(const EventDrivenUdpTransport&) = delete; + EventDrivenUdpTransport& operator=(const EventDrivenUdpTransport&) = delete; + + [[nodiscard]] Result send_message(const Message& message, const Endpoint& endpoint) override; + MessagePtr receive_message() override; + Result connect(const Endpoint& endpoint) override; + Result disconnect() override; + bool is_connected() const override; + Endpoint get_local_endpoint() const override; + void set_listener(ITransportListener* listener) override; + Result start() override; + Result stop() override; + bool is_running() const override; + + Result join_multicast_group(const std::string& multicast_address); + Result leave_multicast_group(const std::string& multicast_address); + +private: + void on_adapter_receive(const std::vector& data, const Endpoint& sender); + static bool is_multicast_ipv4(const std::string& address); + + IUdpSocketAdapter& adapter_; + Endpoint local_endpoint_; + EventDrivenUdpTransportConfig config_; + std::atomic running_{false}; + std::atomic opened_{false}; + ITransportListener* listener_{nullptr}; + + std::queue receive_queue_; + platform::Mutex queue_mutex_; + + static constexpr size_t MAX_UDP_PAYLOAD = 65507; +}; + +} // namespace transport +} // namespace someip + +#endif // SOMEIP_TRANSPORT_EVENT_DRIVEN_UDP_TRANSPORT_H diff --git a/include/transport/tcp_socket_adapter.h b/include/transport/tcp_socket_adapter.h new file mode 100644 index 0000000000..6d35c49ec7 --- /dev/null +++ b/include/transport/tcp_socket_adapter.h @@ -0,0 +1,89 @@ +/******************************************************************************** + * Copyright (c) 2025 Vinicius Tadeu Zein + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#ifndef SOMEIP_TRANSPORT_TCP_SOCKET_ADAPTER_H +#define SOMEIP_TRANSPORT_TCP_SOCKET_ADAPTER_H + +#include "transport/endpoint.h" +#include "common/result.h" +#include +#include + +namespace someip { +namespace transport { + +/** + * @brief Invoked when payload bytes arrive on the established connection. + */ +using TcpReceiveCallback = std::function& data)>; + +/** + * @brief Invoked when the connection is ready (outgoing connect or accepted peer). + */ +using TcpConnectedCallback = std::function; + +/** + * @brief Invoked when the connection is closed or reset. + */ +using TcpDisconnectedCallback = std::function; + +/** + * @brief TCP socket abstraction for event-driven SOME/IP transport. + * + * Implemented by integrators using non-BSD stacks. The adapter owns the + * semantics of connect/accept; it must invoke callbacks from its event context. + */ +class ITcpSocketAdapter { +public: + virtual ~ITcpSocketAdapter() = default; + + /** + * @brief Create socket bound to the local endpoint (listening or pre-connect). + */ + [[nodiscard]] virtual Result open(const Endpoint& local_endpoint) = 0; + + virtual void close() = 0; + + /** + * @brief Start listening after open (server mode). No-op or NOT_IMPLEMENTED for client-only adapters. + */ + [[nodiscard]] virtual Result listen(int backlog) = 0; + + /** + * @brief Connect to a remote endpoint (client mode). + */ + [[nodiscard]] virtual Result connect(const Endpoint& remote_endpoint) = 0; + + /** + * @brief Accept one pending connection if available (non-blocking from SOME/IP's perspective). + * @param remote_out Peer endpoint when Result::SUCCESS + */ + [[nodiscard]] virtual Result accept(Endpoint& remote_out) = 0; + + /** + * @brief Send bytes on the active connection. + */ + [[nodiscard]] virtual Result send(const std::vector& data) = 0; + + virtual void set_receive_callback(TcpReceiveCallback callback) = 0; + virtual void set_connected_callback(TcpConnectedCallback callback) = 0; + virtual void set_disconnected_callback(TcpDisconnectedCallback callback) = 0; + + [[nodiscard]] virtual Endpoint get_local_endpoint() const = 0; + [[nodiscard]] virtual bool is_connected() const = 0; +}; + +} // namespace transport +} // namespace someip + +#endif // SOMEIP_TRANSPORT_TCP_SOCKET_ADAPTER_H diff --git a/include/transport/udp_socket_adapter.h b/include/transport/udp_socket_adapter.h new file mode 100644 index 0000000000..36b285314e --- /dev/null +++ b/include/transport/udp_socket_adapter.h @@ -0,0 +1,90 @@ +/******************************************************************************** + * Copyright (c) 2025 Vinicius Tadeu Zein + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#ifndef SOMEIP_TRANSPORT_UDP_SOCKET_ADAPTER_H +#define SOMEIP_TRANSPORT_UDP_SOCKET_ADAPTER_H + +#include "transport/endpoint.h" +#include "common/result.h" +#include +#include +#include + +namespace someip { +namespace transport { + +/** + * @brief Callback invoked by the adapter when a datagram is received. + * + * Integrators call this from their I/O event path after a packet is available. + * The payload is the raw UDP payload (one datagram). + */ +using UdpReceiveCallback = std::function& data, const Endpoint& sender)>; + +/** + * @brief UDP socket abstraction for event-driven SOME/IP transport. + * + * Implemented by integrators using non-BSD stacks (e.g. custom datagram sockets). + * Must not depend on platform socket headers. + */ +class IUdpSocketAdapter { +public: + virtual ~IUdpSocketAdapter() = default; + + /** + * @brief Open and bind the local endpoint (port 0 selects an ephemeral port). + */ + [[nodiscard]] virtual Result open(const Endpoint& local_endpoint) = 0; + + /** + * @brief Close the socket and release resources. + */ + virtual void close() = 0; + + /** + * @brief Send one datagram to the destination. + */ + [[nodiscard]] virtual Result send(const std::vector& data, const Endpoint& destination) = 0; + + /** + * @brief Join an IPv4 multicast group. + * @param multicast_address Group address (e.g. 224.0.0.1) + * @param interface_address Outgoing interface address; empty uses stack default + */ + [[nodiscard]] virtual Result join_multicast(const std::string& multicast_address, + const std::string& interface_address = {}) = 0; + + /** + * @brief Leave a multicast group previously joined. + */ + [[nodiscard]] virtual Result leave_multicast(const std::string& multicast_address, + const std::string& interface_address = {}) = 0; + + /** + * @brief Register the receive callback (nullptr clears). + * + * The adapter must invoke the callback for each received datagram from the + * integrator's event loop or I/O thread. + */ + virtual void set_receive_callback(UdpReceiveCallback callback) = 0; + + /** + * @brief Effective local endpoint after open (required after bind with port 0). + */ + [[nodiscard]] virtual Endpoint get_local_endpoint() const = 0; +}; + +} // namespace transport +} // namespace someip + +#endif // SOMEIP_TRANSPORT_UDP_SOCKET_ADAPTER_H diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index fe35dac237..8a52028e29 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -10,6 +10,8 @@ set(OPENSOMEIP_SOURCES transport/endpoint.cpp transport/udp_transport.cpp transport/tcp_transport.cpp + transport/event_driven_udp_transport.cpp + transport/event_driven_tcp_transport.cpp e2e/e2e_protection.cpp e2e/e2e_header.cpp e2e/e2e_crc.cpp diff --git a/src/transport/event_driven_tcp_transport.cpp b/src/transport/event_driven_tcp_transport.cpp new file mode 100644 index 0000000000..a6b84c299f --- /dev/null +++ b/src/transport/event_driven_tcp_transport.cpp @@ -0,0 +1,275 @@ +/******************************************************************************** + * Copyright (c) 2025 Vinicius Tadeu Zein + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#include "transport/event_driven_tcp_transport.h" +#include "platform/memory.h" +#include +#include + +namespace someip { +namespace transport { + +const size_t EventDrivenTcpTransport::SOMEIP_HEADER_SIZE = 16; +const size_t EventDrivenTcpTransport::MAX_MESSAGE_SIZE = 65535; + +EventDrivenTcpTransport::EventDrivenTcpTransport(ITcpSocketAdapter& adapter, + const EventDrivenTcpTransportConfig& config) + : adapter_(adapter), + config_(config) {} + +EventDrivenTcpTransport::~EventDrivenTcpTransport() { + // NOLINTNEXTLINE(clang-analyzer-optin.cplusplus.VirtualCall) + stop(); +} + +Result EventDrivenTcpTransport::initialize(const Endpoint& local_endpoint) { + if (initialized_.load()) { + return Result::INVALID_STATE; + } + + local_endpoint_ = + Endpoint(local_endpoint.get_address(), local_endpoint.get_port(), TransportProtocol::TCP); + + Result result = adapter_.open(local_endpoint_); + if (result != Result::SUCCESS) { + return result; + } + + local_endpoint_ = adapter_.get_local_endpoint(); + initialized_ = true; + return Result::SUCCESS; +} + +Result EventDrivenTcpTransport::enable_server_mode(int backlog) { + if (!initialized_.load()) { + return Result::NOT_INITIALIZED; + } + Result result = adapter_.listen(backlog); + if (result != Result::SUCCESS) { + return result; + } + server_mode_ = true; + return Result::SUCCESS; +} + +Result EventDrivenTcpTransport::try_accept_connection(Endpoint& remote_out) { + if (!server_mode_) { + return Result::INVALID_STATE; + } + if (!running_.load()) { + return Result::INVALID_STATE; + } + return adapter_.accept(remote_out); +} + +Result EventDrivenTcpTransport::send_message(const Message& message, const Endpoint& /*endpoint*/) { + if (!is_connected()) { + return Result::NOT_CONNECTED; + } + + std::vector data = message.serialize(); + return adapter_.send(data); +} + +MessagePtr EventDrivenTcpTransport::receive_message() { + platform::ScopedLock lock(queue_mutex_); + if (message_queue_.empty()) { + return nullptr; + } + MessagePtr message = message_queue_.front().first; + message_queue_.pop(); + return message; +} + +Result EventDrivenTcpTransport::connect(const Endpoint& endpoint) { + if (server_mode_) { + return Result::INVALID_STATE; + } + if (is_connected()) { + return Result::SUCCESS; + } + if (!initialized_.load()) { + return Result::NOT_INITIALIZED; + } + if (!running_.load()) { + return Result::INVALID_STATE; + } + return adapter_.connect(endpoint); +} + +Result EventDrivenTcpTransport::disconnect() { + if (!adapter_.is_connected() && !initialized_.load()) { + return Result::SUCCESS; + } + adapter_.close(); + receive_buffer_.clear(); + initialized_ = false; + return Result::SUCCESS; +} + +bool EventDrivenTcpTransport::is_connected() const { + return adapter_.is_connected(); +} + +Endpoint EventDrivenTcpTransport::get_local_endpoint() const { + if (initialized_.load()) { + return adapter_.get_local_endpoint(); + } + return local_endpoint_; +} + +void EventDrivenTcpTransport::set_listener(ITransportListener* listener) { + listener_ = listener; +} + +Result EventDrivenTcpTransport::start() { + if (!initialized_.load()) { + return Result::NOT_INITIALIZED; + } + if (running_.load()) { + return Result::SUCCESS; + } + + adapter_.set_receive_callback([this](const std::vector& data) { on_adapter_receive(data); }); + adapter_.set_connected_callback([this](const Endpoint& remote) { on_adapter_connected(remote); }); + adapter_.set_disconnected_callback([this]() { on_adapter_disconnected(); }); + + running_ = true; + return Result::SUCCESS; +} + +Result EventDrivenTcpTransport::stop() { + listener_ = nullptr; + + adapter_.set_receive_callback(nullptr); + adapter_.set_connected_callback(nullptr); + adapter_.set_disconnected_callback(nullptr); + + adapter_.close(); + initialized_ = false; + server_mode_ = false; + receive_buffer_.clear(); + + { + platform::ScopedLock lock(queue_mutex_); + while (!message_queue_.empty()) { + message_queue_.pop(); + } + } + + running_ = false; + return Result::SUCCESS; +} + +bool EventDrivenTcpTransport::is_running() const { + return running_; +} + +void EventDrivenTcpTransport::on_adapter_receive(const std::vector& data) { + if (!running_.load() || !initialized_.load()) { + return; + } + + std::vector delivered; + { + platform::ScopedLock lock(queue_mutex_); + receive_buffer_.insert(receive_buffer_.end(), data.begin(), data.end()); + MessagePtr message; + while (parse_message_from_buffer(receive_buffer_, message)) { + message_queue_.push({message, connection_remote_}); + delivered.push_back(message); + } + } + + for (const MessagePtr& m : delivered) { + if (listener_) { + listener_->on_message_received(m, connection_remote_); + } + } +} + +void EventDrivenTcpTransport::on_adapter_connected(const Endpoint& remote) { + if (!running_.load()) { + return; + } + connection_remote_ = remote; + if (listener_) { + listener_->on_connection_established(remote); + } +} + +void EventDrivenTcpTransport::on_adapter_disconnected() { + receive_buffer_.clear(); + Endpoint lost = connection_remote_; + initialized_ = false; + if (listener_) { + listener_->on_connection_lost(lost); + } +} + +bool EventDrivenTcpTransport::parse_message_from_buffer(std::vector& buffer, MessagePtr& message) { + if (buffer.size() > config_.max_receive_buffer) { + buffer.clear(); + return false; + } + + if (buffer.size() < SOMEIP_HEADER_SIZE) { + return false; + } + + uint32_t length_from_client_id = + (static_cast(buffer[4]) << 24) | (static_cast(buffer[5]) << 16) | + (static_cast(buffer[6]) << 8) | static_cast(buffer[7]); + + if (length_from_client_id < 8 || length_from_client_id > MAX_MESSAGE_SIZE) { + size_t search_start = SOMEIP_HEADER_SIZE; + bool found_valid_header = false; + + while (search_start + SOMEIP_HEADER_SIZE <= buffer.size()) { + uint32_t potential_msg_id = (static_cast(buffer[search_start]) << 24) | + (static_cast(buffer[search_start + 1]) << 16) | + (static_cast(buffer[search_start + 2]) << 8) | + static_cast(buffer[search_start + 3]); + if (potential_msg_id != 0) { + buffer.erase(buffer.begin(), buffer.begin() + static_cast(search_start)); + found_valid_header = true; + break; + } + search_start++; + } + + if (!found_valid_header) { + buffer.clear(); + } + return false; + } + + size_t total_message_size = 8 + length_from_client_id; + + if (buffer.size() < total_message_size) { + return false; + } + + std::vector message_data(buffer.begin(), buffer.begin() + static_cast(total_message_size)); + buffer.erase(buffer.begin(), buffer.begin() + static_cast(total_message_size)); + + message = platform::allocate_message(); + if (message && message->deserialize(message_data)) { + return true; + } + + return false; +} + +} // namespace transport +} // namespace someip diff --git a/src/transport/event_driven_udp_transport.cpp b/src/transport/event_driven_udp_transport.cpp new file mode 100644 index 0000000000..1f54f5e37c --- /dev/null +++ b/src/transport/event_driven_udp_transport.cpp @@ -0,0 +1,187 @@ +/******************************************************************************** + * Copyright (c) 2025 Vinicius Tadeu Zein + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#include "transport/event_driven_udp_transport.h" +#include "platform/memory.h" +#include + +namespace someip { +namespace transport { + +EventDrivenUdpTransport::EventDrivenUdpTransport(IUdpSocketAdapter& adapter, + const Endpoint& local_endpoint, + const EventDrivenUdpTransportConfig& config) + : adapter_(adapter), + local_endpoint_(local_endpoint), + config_(config) { + if (!local_endpoint_.is_valid()) { +#if defined(__cpp_exceptions) || defined(__EXCEPTIONS) + throw std::invalid_argument("Invalid local endpoint"); +#endif + } +} + +EventDrivenUdpTransport::~EventDrivenUdpTransport() { + // NOLINTNEXTLINE(clang-analyzer-optin.cplusplus.VirtualCall) + stop(); +} + +Result EventDrivenUdpTransport::send_message(const Message& message, const Endpoint& endpoint) { + if (!is_running()) { + return Result::NOT_CONNECTED; + } + if (!endpoint.is_valid()) { + return Result::INVALID_ENDPOINT; + } + + std::vector data = message.serialize(); + if (data.size() > MAX_UDP_PAYLOAD) { + return Result::BUFFER_OVERFLOW; + } + if (config_.max_message_size > 0 && data.size() > config_.max_message_size) { + // Same policy as UdpTransport: warn path reserved for TP segmentation + } + + return adapter_.send(data, endpoint); +} + +MessagePtr EventDrivenUdpTransport::receive_message() { + platform::ScopedLock lock(queue_mutex_); + if (receive_queue_.empty()) { + return nullptr; + } + MessagePtr message = receive_queue_.front(); + receive_queue_.pop(); + return message; +} + +Result EventDrivenUdpTransport::connect(const Endpoint& endpoint) { + if (!endpoint.is_valid()) { + return Result::INVALID_ENDPOINT; + } + if (endpoint.get_protocol() == TransportProtocol::MULTICAST_UDP) { + if (!is_multicast_ipv4(endpoint.get_address())) { + return Result::INVALID_ENDPOINT; + } + return adapter_.join_multicast(endpoint.get_address(), config_.multicast_interface); + } + return Result::SUCCESS; +} + +Result EventDrivenUdpTransport::disconnect() { + return Result::SUCCESS; +} + +bool EventDrivenUdpTransport::is_connected() const { + return is_running() && opened_.load(); +} + +Endpoint EventDrivenUdpTransport::get_local_endpoint() const { + if (opened_.load()) { + return adapter_.get_local_endpoint(); + } + return local_endpoint_; +} + +void EventDrivenUdpTransport::set_listener(ITransportListener* listener) { + listener_ = listener; +} + +Result EventDrivenUdpTransport::start() { + if (is_running()) { + return Result::SUCCESS; + } + + adapter_.set_receive_callback([this](const std::vector& data, const Endpoint& sender) { + on_adapter_receive(data, sender); + }); + + Result result = adapter_.open(local_endpoint_); + if (result != Result::SUCCESS) { + adapter_.set_receive_callback(nullptr); + return result; + } + + local_endpoint_ = adapter_.get_local_endpoint(); + opened_ = true; + running_ = true; + return Result::SUCCESS; +} + +Result EventDrivenUdpTransport::stop() { + if (!running_.load()) { + return Result::SUCCESS; + } + + running_ = false; + listener_ = nullptr; + adapter_.set_receive_callback(nullptr); + adapter_.close(); + opened_ = false; + + platform::ScopedLock lock(queue_mutex_); + while (!receive_queue_.empty()) { + receive_queue_.pop(); + } + + return Result::SUCCESS; +} + +bool EventDrivenUdpTransport::is_running() const { + return running_; +} + +Result EventDrivenUdpTransport::join_multicast_group(const std::string& multicast_address) { + if (!is_multicast_ipv4(multicast_address)) { + return Result::INVALID_ENDPOINT; + } + return adapter_.join_multicast(multicast_address, config_.multicast_interface); +} + +Result EventDrivenUdpTransport::leave_multicast_group(const std::string& multicast_address) { + if (!is_multicast_ipv4(multicast_address)) { + return Result::INVALID_ENDPOINT; + } + return adapter_.leave_multicast(multicast_address, config_.multicast_interface); +} + +void EventDrivenUdpTransport::on_adapter_receive(const std::vector& data, const Endpoint& sender) { + if (!running_.load()) { + return; + } + + MessagePtr message = platform::allocate_message(); + if (!message->deserialize(data)) { + if (listener_) { + listener_->on_error(Result::INVALID_MESSAGE); + } + return; + } + + { + platform::ScopedLock lock(queue_mutex_); + receive_queue_.push(message); + } + + if (listener_) { + listener_->on_message_received(message, sender); + } +} + +bool EventDrivenUdpTransport::is_multicast_ipv4(const std::string& address) { + const Endpoint ep(address, 0, TransportProtocol::MULTICAST_UDP); + return ep.is_multicast(); +} + +} // namespace transport +} // namespace someip diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index d2ddc3d961..786eb9357d 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -34,6 +34,10 @@ target_link_libraries(test_tcp_transport someip-transport gtest_main) add_executable(test_udp_transport test_udp_transport.cpp) target_link_libraries(test_udp_transport someip-transport gtest_main) +# Event-driven UDP transport tests (mock adapter, no real sockets) +add_executable(test_event_driven_udp_transport test_event_driven_udp_transport.cpp) +target_link_libraries(test_event_driven_udp_transport someip-transport gtest_main) + # TP tests add_executable(test_tp test_tp.cpp) target_link_libraries(test_tp someip-tp gtest_main) @@ -118,6 +122,7 @@ target_link_libraries(test_e2e someip-core gtest_main) add_test(NAME EventsTest COMMAND test_events) add_test(NAME TcpTransportTest COMMAND test_tcp_transport) add_test(NAME UdpTransportTest COMMAND test_udp_transport) + add_test(NAME EventDrivenUdpTransportTest COMMAND test_event_driven_udp_transport) add_test(NAME TpTest COMMAND test_tp) add_test(NAME E2ETest COMMAND test_e2e) @@ -125,7 +130,7 @@ target_link_libraries(test_e2e someip-core gtest_main) set(_ALL_TESTS PlatformThreadingTest SerializationTest MessageTest SessionManagerTest EndpointTest RpcTest SdTest EventsTest - TcpTransportTest UdpTransportTest TpTest E2ETest + TcpTransportTest UdpTransportTest EventDrivenUdpTransportTest TpTest E2ETest ) if(NOT WIN32) list(APPEND _ALL_TESTS diff --git a/tests/test_event_driven_udp_transport.cpp b/tests/test_event_driven_udp_transport.cpp new file mode 100644 index 0000000000..bffc558988 --- /dev/null +++ b/tests/test_event_driven_udp_transport.cpp @@ -0,0 +1,277 @@ +/******************************************************************************** + * Copyright (c) 2025 Vinicius Tadeu Zein + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace someip; +using namespace someip::transport; + +namespace { + +class TestEventUdpListener : public ITransportListener { +public: + void on_message_received(MessagePtr message, const Endpoint& sender) override { + std::scoped_lock lock(mutex_); + received_messages_.push_back({std::move(message), sender}); + cv_.notify_one(); + } + + void on_connection_lost(const Endpoint& /*endpoint*/) override {} + + void on_connection_established(const Endpoint& /*endpoint*/) override {} + + void on_error(Result error) override { + std::scoped_lock lock(mutex_); + last_error_ = error; + error_count_++; + cv_.notify_one(); + } + + bool wait_for_message(std::chrono::milliseconds timeout = std::chrono::milliseconds(500)) { + std::unique_lock lock(mutex_); + return cv_.wait_for(lock, timeout, [this]() { return !received_messages_.empty(); }); + } + + bool wait_for_error(std::chrono::milliseconds timeout = std::chrono::milliseconds(500)) { + std::unique_lock lock(mutex_); + return cv_.wait_for(lock, timeout, [this]() { return error_count_ > 0; }); + } + + std::vector> received_messages_; + std::atomic last_error_{Result::SUCCESS}; + std::atomic error_count_{0}; + +private: + std::mutex mutex_; + std::condition_variable cv_; +}; + +class MockUdpAdapter : public IUdpSocketAdapter { +public: + void set_open_result(Result r) { open_result_ = r; } + + Result open(const Endpoint& local_endpoint) override { + if (open_result_ != Result::SUCCESS) { + return open_result_; + } + open_ = true; + local_ = local_endpoint; + if (local_.get_port() == 0) { + local_.set_port(54321); + } + return Result::SUCCESS; + } + + void close() override { + open_ = false; + cb_ = nullptr; + } + + Result send(const std::vector& data, const Endpoint& destination) override { + last_send_data_ = data; + last_send_dest_ = destination; + return Result::SUCCESS; + } + + Result join_multicast(const std::string& multicast_address, + const std::string& /*interface_address*/) override { + joins_.push_back(multicast_address); + return Result::SUCCESS; + } + + Result leave_multicast(const std::string& multicast_address, + const std::string& /*interface_address*/) override { + leaves_.push_back(multicast_address); + return Result::SUCCESS; + } + + void set_receive_callback(UdpReceiveCallback callback) override { cb_ = std::move(callback); } + + Endpoint get_local_endpoint() const override { return local_; } + + void inject_receive(const std::vector& data, const Endpoint& sender) { + if (cb_) { + cb_(data, sender); + } + } + + bool is_open() const { return open_; } + + std::vector last_send_data_; + Endpoint last_send_dest_{"0.0.0.0", 0}; + std::vector joins_; + std::vector leaves_; + +private: + bool open_{false}; + Endpoint local_{"127.0.0.1", 0}; + UdpReceiveCallback cb_; + Result open_result_{Result::SUCCESS}; +}; + +Message make_sample_message() { + Message message; + message.set_service_id(0x1234); + message.set_method_id(0x5678); + message.set_client_id(0x9ABC); + message.set_session_id(0xDEF0); + message.set_protocol_version(1); + message.set_interface_version(1); + message.set_message_type(MessageType::REQUEST); + message.set_return_code(ReturnCode::E_OK); + message.set_payload({0x01, 0x02, 0x03}); + return message; +} + +} // namespace + +TEST(EventDrivenUdpTransport, ConstructionNotRunning) { + MockUdpAdapter adapter; + Endpoint local{"127.0.0.1", 0}; + EventDrivenUdpTransport transport(adapter, local); + + EXPECT_FALSE(transport.is_running()); + EXPECT_FALSE(transport.is_connected()); + EXPECT_EQ(transport.get_local_endpoint().get_address(), "127.0.0.1"); +} + +TEST(EventDrivenUdpTransport, StartStopLifecycle) { + MockUdpAdapter adapter; + EventDrivenUdpTransport transport(adapter, Endpoint{"127.0.0.1", 0}); + + EXPECT_EQ(transport.start(), Result::SUCCESS); + EXPECT_TRUE(transport.is_running()); + EXPECT_TRUE(transport.is_connected()); + EXPECT_TRUE(adapter.is_open()); + EXPECT_EQ(transport.get_local_endpoint().get_port(), 54321); + + EXPECT_EQ(transport.stop(), Result::SUCCESS); + EXPECT_FALSE(transport.is_running()); + EXPECT_FALSE(adapter.is_open()); +} + +TEST(EventDrivenUdpTransport, StartPropagatesOpenFailure) { + MockUdpAdapter adapter; + adapter.set_open_result(Result::NETWORK_ERROR); + EventDrivenUdpTransport transport(adapter, Endpoint{"127.0.0.1", 30490}); + + EXPECT_EQ(transport.start(), Result::NETWORK_ERROR); + EXPECT_FALSE(transport.is_running()); +} + +TEST(EventDrivenUdpTransport, SendForwardsToAdapter) { + MockUdpAdapter adapter; + EventDrivenUdpTransport transport(adapter, Endpoint{"127.0.0.1", 0}); + ASSERT_EQ(transport.start(), Result::SUCCESS); + + Message msg = make_sample_message(); + Endpoint dest{"127.0.0.1", 40000}; + ASSERT_EQ(transport.send_message(msg, dest), Result::SUCCESS); + + EXPECT_EQ(adapter.last_send_dest_, dest); + std::vector expected = msg.serialize(); + EXPECT_EQ(adapter.last_send_data_, expected); + + transport.stop(); +} + +TEST(EventDrivenUdpTransport, ReceiveCallbackNotifiesListener) { + MockUdpAdapter adapter; + EventDrivenUdpTransport transport(adapter, Endpoint{"127.0.0.1", 0}); + TestEventUdpListener listener; + transport.set_listener(&listener); + + ASSERT_EQ(transport.start(), Result::SUCCESS); + + Message sent = make_sample_message(); + std::vector raw = sent.serialize(); + Endpoint sender{"10.0.0.5", 5000, TransportProtocol::UDP}; + adapter.inject_receive(raw, sender); + + ASSERT_TRUE(listener.wait_for_message()); + ASSERT_EQ(listener.received_messages_.size(), 1u); + EXPECT_EQ(listener.received_messages_[0].second, sender); + EXPECT_EQ(listener.received_messages_[0].first->get_service_id(), sent.get_service_id()); + + MessagePtr queued = transport.receive_message(); + ASSERT_NE(queued, nullptr); + EXPECT_EQ(queued->get_method_id(), sent.get_method_id()); + + transport.stop(); +} + +TEST(EventDrivenUdpTransport, MalformedDatagramInvokesOnError) { + MockUdpAdapter adapter; + EventDrivenUdpTransport transport(adapter, Endpoint{"127.0.0.1", 0}); + TestEventUdpListener listener; + transport.set_listener(&listener); + + ASSERT_EQ(transport.start(), Result::SUCCESS); + + adapter.inject_receive({0x00, 0x01}, Endpoint{"127.0.0.1", 1234}); + + ASSERT_TRUE(listener.wait_for_error()); + EXPECT_EQ(listener.last_error_.load(), Result::INVALID_MESSAGE); + + transport.stop(); +} + +TEST(EventDrivenUdpTransport, MulticastJoinLeave) { + MockUdpAdapter adapter; + EventDrivenUdpTransport transport(adapter, Endpoint{"127.0.0.1", 0}); + ASSERT_EQ(transport.start(), Result::SUCCESS); + + EXPECT_EQ(transport.join_multicast_group("224.0.0.251"), Result::SUCCESS); + EXPECT_EQ(transport.leave_multicast_group("224.0.0.251"), Result::SUCCESS); + ASSERT_EQ(adapter.joins_.size(), 1u); + ASSERT_EQ(adapter.leaves_.size(), 1u); + EXPECT_EQ(adapter.joins_[0], "224.0.0.251"); + EXPECT_EQ(adapter.leaves_[0], "224.0.0.251"); + + transport.stop(); +} + +TEST(EventDrivenUdpTransport, ConnectMulticastUdpJoinsAdapter) { + MockUdpAdapter adapter; + EventDrivenUdpTransport transport(adapter, Endpoint{"127.0.0.1", 0}); + ASSERT_EQ(transport.start(), Result::SUCCESS); + + Endpoint mc("239.0.0.1", 30490, TransportProtocol::MULTICAST_UDP); + EXPECT_EQ(transport.connect(mc), Result::SUCCESS); + ASSERT_EQ(adapter.joins_.size(), 1u); + EXPECT_EQ(adapter.joins_[0], "239.0.0.1"); + + transport.stop(); +} + +TEST(EventDrivenUdpTransport, StopClearsReceiveCallback) { + MockUdpAdapter adapter; + EventDrivenUdpTransport transport(adapter, Endpoint{"127.0.0.1", 0}); + TestEventUdpListener listener; + transport.set_listener(&listener); + + ASSERT_EQ(transport.start(), Result::SUCCESS); + transport.stop(); + + Message sent = make_sample_message(); + adapter.inject_receive(sent.serialize(), Endpoint{"127.0.0.1", 1}); + + EXPECT_FALSE(listener.wait_for_message(std::chrono::milliseconds(50))); +} From 758d08f93003f0e27fed10444e621ef9fa2b73be Mon Sep 17 00:00:00 2001 From: Vinicius Zein Date: Sat, 28 Mar 2026 20:32:50 -0400 Subject: [PATCH 2/7] fix(transport): address CodeRabbit review findings on thread safety - Make listener_ atomic in both UDP and TCP event-driven transports to prevent data races between set_listener() and callback invocations - Make server_mode_ atomic in TCP transport for thread-safe access - Move running_ = false to start of stop() in TCP transport to avoid inconsistent state window where is_running() returns true during teardown - Guard receive_buffer_.clear() in TCP disconnect handler with queue_mutex_ to prevent concurrent access with on_adapter_receive() - Add null check for allocate_message() in UDP transport to handle memory-constrained platforms gracefully (reports OUT_OF_MEMORY) - Add documentation comments for TCP transport's non-polymorphic setup methods (initialize, enable_server_mode, try_accept_connection) Made-with: Cursor --- .../transport/event_driven_tcp_transport.h | 15 +++++++--- .../transport/event_driven_udp_transport.h | 2 +- src/transport/event_driven_tcp_transport.cpp | 28 +++++++++++-------- src/transport/event_driven_udp_transport.cpp | 21 ++++++++++---- 4 files changed, 44 insertions(+), 22 deletions(-) diff --git a/include/transport/event_driven_tcp_transport.h b/include/transport/event_driven_tcp_transport.h index b7e8814ceb..82e7026237 100644 --- a/include/transport/event_driven_tcp_transport.h +++ b/include/transport/event_driven_tcp_transport.h @@ -33,6 +33,11 @@ struct EventDrivenTcpTransportConfig { /** * @brief ITransport implementation driven by an ITcpSocketAdapter. + * + * Unlike EventDrivenUdpTransport, TCP requires explicit lifecycle steps + * that are not part of the ITransport interface. Callers must use the + * concrete type to call initialize(), and optionally enable_server_mode() + * / try_accept_connection() for server-side usage, before calling start(). */ class EventDrivenTcpTransport : public ITransport { public: @@ -44,13 +49,15 @@ class EventDrivenTcpTransport : public ITransport { EventDrivenTcpTransport(const EventDrivenTcpTransport&) = delete; EventDrivenTcpTransport& operator=(const EventDrivenTcpTransport&) = delete; + /** @brief Concrete-type-only: bind the adapter to a local endpoint. */ [[nodiscard]] Result initialize(const Endpoint& local_endpoint); + /** @brief Concrete-type-only: switch to server mode after initialize(). */ [[nodiscard]] Result enable_server_mode(int backlog = 5); /** - * @brief Non-blocking accept attempt (server mode). Adapter should invoke - * connected/disconnected callbacks when the connection is ready or lost. + * @brief Concrete-type-only: non-blocking accept (server mode). + * Adapter invokes connected/disconnected callbacks on completion. */ [[nodiscard]] Result try_accept_connection(Endpoint& remote_out); @@ -75,11 +82,11 @@ class EventDrivenTcpTransport : public ITransport { EventDrivenTcpTransportConfig config_; Endpoint local_endpoint_; Endpoint connection_remote_; - ITransportListener* listener_{nullptr}; + std::atomic listener_{nullptr}; std::atomic running_{false}; std::atomic initialized_{false}; - bool server_mode_{false}; + std::atomic server_mode_{false}; std::vector receive_buffer_; std::queue> message_queue_; diff --git a/include/transport/event_driven_udp_transport.h b/include/transport/event_driven_udp_transport.h index 10c4089152..c538e2a1b6 100644 --- a/include/transport/event_driven_udp_transport.h +++ b/include/transport/event_driven_udp_transport.h @@ -69,7 +69,7 @@ class EventDrivenUdpTransport : public ITransport { EventDrivenUdpTransportConfig config_; std::atomic running_{false}; std::atomic opened_{false}; - ITransportListener* listener_{nullptr}; + std::atomic listener_{nullptr}; std::queue receive_queue_; platform::Mutex queue_mutex_; diff --git a/src/transport/event_driven_tcp_transport.cpp b/src/transport/event_driven_tcp_transport.cpp index a6b84c299f..7657b684e0 100644 --- a/src/transport/event_driven_tcp_transport.cpp +++ b/src/transport/event_driven_tcp_transport.cpp @@ -129,7 +129,7 @@ Endpoint EventDrivenTcpTransport::get_local_endpoint() const { } void EventDrivenTcpTransport::set_listener(ITransportListener* listener) { - listener_ = listener; + listener_.store(listener, std::memory_order_release); } Result EventDrivenTcpTransport::start() { @@ -149,7 +149,8 @@ Result EventDrivenTcpTransport::start() { } Result EventDrivenTcpTransport::stop() { - listener_ = nullptr; + running_ = false; + listener_.store(nullptr, std::memory_order_release); adapter_.set_receive_callback(nullptr); adapter_.set_connected_callback(nullptr); @@ -158,16 +159,15 @@ Result EventDrivenTcpTransport::stop() { adapter_.close(); initialized_ = false; server_mode_ = false; - receive_buffer_.clear(); { platform::ScopedLock lock(queue_mutex_); + receive_buffer_.clear(); while (!message_queue_.empty()) { message_queue_.pop(); } } - running_ = false; return Result::SUCCESS; } @@ -191,9 +191,10 @@ void EventDrivenTcpTransport::on_adapter_receive(const std::vector& dat } } + auto* cb = listener_.load(std::memory_order_acquire); for (const MessagePtr& m : delivered) { - if (listener_) { - listener_->on_message_received(m, connection_remote_); + if (cb) { + cb->on_message_received(m, connection_remote_); } } } @@ -203,17 +204,22 @@ void EventDrivenTcpTransport::on_adapter_connected(const Endpoint& remote) { return; } connection_remote_ = remote; - if (listener_) { - listener_->on_connection_established(remote); + auto* cb = listener_.load(std::memory_order_acquire); + if (cb) { + cb->on_connection_established(remote); } } void EventDrivenTcpTransport::on_adapter_disconnected() { - receive_buffer_.clear(); Endpoint lost = connection_remote_; initialized_ = false; - if (listener_) { - listener_->on_connection_lost(lost); + { + platform::ScopedLock lock(queue_mutex_); + receive_buffer_.clear(); + } + auto* cb = listener_.load(std::memory_order_acquire); + if (cb) { + cb->on_connection_lost(lost); } } diff --git a/src/transport/event_driven_udp_transport.cpp b/src/transport/event_driven_udp_transport.cpp index 1f54f5e37c..9876db5e68 100644 --- a/src/transport/event_driven_udp_transport.cpp +++ b/src/transport/event_driven_udp_transport.cpp @@ -94,7 +94,7 @@ Endpoint EventDrivenUdpTransport::get_local_endpoint() const { } void EventDrivenUdpTransport::set_listener(ITransportListener* listener) { - listener_ = listener; + listener_.store(listener, std::memory_order_release); } Result EventDrivenUdpTransport::start() { @@ -124,7 +124,7 @@ Result EventDrivenUdpTransport::stop() { } running_ = false; - listener_ = nullptr; + listener_.store(nullptr, std::memory_order_release); adapter_.set_receive_callback(nullptr); adapter_.close(); opened_ = false; @@ -161,9 +161,17 @@ void EventDrivenUdpTransport::on_adapter_receive(const std::vector& dat } MessagePtr message = platform::allocate_message(); + if (!message) { + auto* cb = listener_.load(std::memory_order_acquire); + if (cb) { + cb->on_error(Result::OUT_OF_MEMORY); + } + return; + } if (!message->deserialize(data)) { - if (listener_) { - listener_->on_error(Result::INVALID_MESSAGE); + auto* cb = listener_.load(std::memory_order_acquire); + if (cb) { + cb->on_error(Result::INVALID_MESSAGE); } return; } @@ -173,8 +181,9 @@ void EventDrivenUdpTransport::on_adapter_receive(const std::vector& dat receive_queue_.push(message); } - if (listener_) { - listener_->on_message_received(message, sender); + auto* cb = listener_.load(std::memory_order_acquire); + if (cb) { + cb->on_message_received(message, sender); } } From b77786e07fee3fff89ad55beb9b4ccf294e11e97 Mon Sep 17 00:00:00 2001 From: Vinicius Zein Date: Sat, 28 Mar 2026 22:24:08 -0400 Subject: [PATCH 3/7] fix(transport): address second round of CodeRabbit review findings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Preserve listener_ across stop()/start() cycles in both UDP and TCP transports — running_ = false already suppresses post-stop delivery - Guard receive_buffer_.clear() in disconnect() with queue_mutex_ to prevent concurrent access with on_adapter_receive() - Don't reset initialized_ in on_adapter_disconnected() — initialized_ reflects local socket state, not peer connection; allows reconnect without requiring re-initialize() - Fix TCP stream resync heuristic: start scanning at offset 1 (not SOMEIP_HEADER_SIZE) and validate the candidate length field instead of accepting any non-zero service ID Made-with: Cursor --- src/transport/event_driven_tcp_transport.cpp | 22 +++++++++++--------- src/transport/event_driven_udp_transport.cpp | 1 - 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/transport/event_driven_tcp_transport.cpp b/src/transport/event_driven_tcp_transport.cpp index 7657b684e0..86f6c9996a 100644 --- a/src/transport/event_driven_tcp_transport.cpp +++ b/src/transport/event_driven_tcp_transport.cpp @@ -112,7 +112,10 @@ Result EventDrivenTcpTransport::disconnect() { return Result::SUCCESS; } adapter_.close(); - receive_buffer_.clear(); + { + platform::ScopedLock lock(queue_mutex_); + receive_buffer_.clear(); + } initialized_ = false; return Result::SUCCESS; } @@ -150,7 +153,6 @@ Result EventDrivenTcpTransport::start() { Result EventDrivenTcpTransport::stop() { running_ = false; - listener_.store(nullptr, std::memory_order_release); adapter_.set_receive_callback(nullptr); adapter_.set_connected_callback(nullptr); @@ -212,7 +214,6 @@ void EventDrivenTcpTransport::on_adapter_connected(const Endpoint& remote) { void EventDrivenTcpTransport::on_adapter_disconnected() { Endpoint lost = connection_remote_; - initialized_ = false; { platform::ScopedLock lock(queue_mutex_); receive_buffer_.clear(); @@ -238,20 +239,21 @@ bool EventDrivenTcpTransport::parse_message_from_buffer(std::vector& bu (static_cast(buffer[6]) << 8) | static_cast(buffer[7]); if (length_from_client_id < 8 || length_from_client_id > MAX_MESSAGE_SIZE) { - size_t search_start = SOMEIP_HEADER_SIZE; + size_t search_start = 1; bool found_valid_header = false; while (search_start + SOMEIP_HEADER_SIZE <= buffer.size()) { - uint32_t potential_msg_id = (static_cast(buffer[search_start]) << 24) | - (static_cast(buffer[search_start + 1]) << 16) | - (static_cast(buffer[search_start + 2]) << 8) | - static_cast(buffer[search_start + 3]); - if (potential_msg_id != 0) { + uint32_t candidate_length = + (static_cast(buffer[search_start + 4]) << 24) | + (static_cast(buffer[search_start + 5]) << 16) | + (static_cast(buffer[search_start + 6]) << 8) | + static_cast(buffer[search_start + 7]); + if (candidate_length >= 8 && candidate_length <= MAX_MESSAGE_SIZE) { buffer.erase(buffer.begin(), buffer.begin() + static_cast(search_start)); found_valid_header = true; break; } - search_start++; + ++search_start; } if (!found_valid_header) { diff --git a/src/transport/event_driven_udp_transport.cpp b/src/transport/event_driven_udp_transport.cpp index 9876db5e68..4eda96ddfd 100644 --- a/src/transport/event_driven_udp_transport.cpp +++ b/src/transport/event_driven_udp_transport.cpp @@ -124,7 +124,6 @@ Result EventDrivenUdpTransport::stop() { } running_ = false; - listener_.store(nullptr, std::memory_order_release); adapter_.set_receive_callback(nullptr); adapter_.close(); opened_ = false; From c715a487ca93078f67e3c27faa220ffe42522fcf Mon Sep 17 00:00:00 2001 From: Vinicius Zein Date: Sat, 28 Mar 2026 22:40:42 -0400 Subject: [PATCH 4/7] fix(transport): address third round of CodeRabbit review findings - Document callback quiescence guarantee on adapter interfaces: after set_*_callback(nullptr) returns, no in-flight callbacks may execute - Add running_ guard to on_adapter_disconnected() in TCP transport - Make parse_message_from_buffer() loop internally after resync so valid frames buffered behind discarded bytes are not deferred - Enforce max_message_size limit in UDP send_message() instead of leaving an empty conditional block - Use explicit .load() on atomic reads in is_running() for consistency Made-with: Cursor --- include/transport/tcp_socket_adapter.h | 6 ++ include/transport/udp_socket_adapter.h | 4 + src/transport/event_driven_tcp_transport.cpp | 90 ++++++++++---------- src/transport/event_driven_udp_transport.cpp | 4 +- 4 files changed, 59 insertions(+), 45 deletions(-) diff --git a/include/transport/tcp_socket_adapter.h b/include/transport/tcp_socket_adapter.h index 6d35c49ec7..af842777a0 100644 --- a/include/transport/tcp_socket_adapter.h +++ b/include/transport/tcp_socket_adapter.h @@ -75,6 +75,12 @@ class ITcpSocketAdapter { */ [[nodiscard]] virtual Result send(const std::vector& data) = 0; + /** + * Quiescence guarantee: after any set_*_callback(nullptr) returns, the + * adapter must not invoke that previously registered callback. + * Implementations must ensure no in-flight callback is executing when + * the setter returns. + */ virtual void set_receive_callback(TcpReceiveCallback callback) = 0; virtual void set_connected_callback(TcpConnectedCallback callback) = 0; virtual void set_disconnected_callback(TcpDisconnectedCallback callback) = 0; diff --git a/include/transport/udp_socket_adapter.h b/include/transport/udp_socket_adapter.h index 36b285314e..94028b7760 100644 --- a/include/transport/udp_socket_adapter.h +++ b/include/transport/udp_socket_adapter.h @@ -75,6 +75,10 @@ class IUdpSocketAdapter { * * The adapter must invoke the callback for each received datagram from the * integrator's event loop or I/O thread. + * + * Quiescence guarantee: after set_receive_callback(nullptr) returns, the + * adapter must not invoke any previously registered callback. Implementations + * must ensure no in-flight callback is executing when the setter returns. */ virtual void set_receive_callback(UdpReceiveCallback callback) = 0; diff --git a/src/transport/event_driven_tcp_transport.cpp b/src/transport/event_driven_tcp_transport.cpp index 86f6c9996a..e5e6239515 100644 --- a/src/transport/event_driven_tcp_transport.cpp +++ b/src/transport/event_driven_tcp_transport.cpp @@ -174,7 +174,7 @@ Result EventDrivenTcpTransport::stop() { } bool EventDrivenTcpTransport::is_running() const { - return running_; + return running_.load(); } void EventDrivenTcpTransport::on_adapter_receive(const std::vector& data) { @@ -213,6 +213,9 @@ void EventDrivenTcpTransport::on_adapter_connected(const Endpoint& remote) { } void EventDrivenTcpTransport::on_adapter_disconnected() { + if (!running_.load()) { + return; + } Endpoint lost = connection_remote_; { platform::ScopedLock lock(queue_mutex_); @@ -225,58 +228,59 @@ void EventDrivenTcpTransport::on_adapter_disconnected() { } bool EventDrivenTcpTransport::parse_message_from_buffer(std::vector& buffer, MessagePtr& message) { - if (buffer.size() > config_.max_receive_buffer) { - buffer.clear(); - return false; - } + for (;;) { + if (buffer.size() > config_.max_receive_buffer) { + buffer.clear(); + return false; + } - if (buffer.size() < SOMEIP_HEADER_SIZE) { - return false; - } + if (buffer.size() < SOMEIP_HEADER_SIZE) { + return false; + } - uint32_t length_from_client_id = - (static_cast(buffer[4]) << 24) | (static_cast(buffer[5]) << 16) | - (static_cast(buffer[6]) << 8) | static_cast(buffer[7]); - - if (length_from_client_id < 8 || length_from_client_id > MAX_MESSAGE_SIZE) { - size_t search_start = 1; - bool found_valid_header = false; - - while (search_start + SOMEIP_HEADER_SIZE <= buffer.size()) { - uint32_t candidate_length = - (static_cast(buffer[search_start + 4]) << 24) | - (static_cast(buffer[search_start + 5]) << 16) | - (static_cast(buffer[search_start + 6]) << 8) | - static_cast(buffer[search_start + 7]); - if (candidate_length >= 8 && candidate_length <= MAX_MESSAGE_SIZE) { - buffer.erase(buffer.begin(), buffer.begin() + static_cast(search_start)); - found_valid_header = true; - break; + uint32_t length_from_client_id = + (static_cast(buffer[4]) << 24) | (static_cast(buffer[5]) << 16) | + (static_cast(buffer[6]) << 8) | static_cast(buffer[7]); + + if (length_from_client_id < 8 || length_from_client_id > MAX_MESSAGE_SIZE) { + size_t search_start = 1; + bool found_valid_header = false; + + while (search_start + SOMEIP_HEADER_SIZE <= buffer.size()) { + uint32_t candidate_length = + (static_cast(buffer[search_start + 4]) << 24) | + (static_cast(buffer[search_start + 5]) << 16) | + (static_cast(buffer[search_start + 6]) << 8) | + static_cast(buffer[search_start + 7]); + if (candidate_length >= 8 && candidate_length <= MAX_MESSAGE_SIZE) { + buffer.erase(buffer.begin(), buffer.begin() + static_cast(search_start)); + found_valid_header = true; + break; + } + ++search_start; } - ++search_start; - } - if (!found_valid_header) { - buffer.clear(); + if (!found_valid_header) { + buffer.clear(); + return false; + } + continue; } - return false; - } - size_t total_message_size = 8 + length_from_client_id; + size_t total_message_size = 8 + length_from_client_id; - if (buffer.size() < total_message_size) { - return false; - } + if (buffer.size() < total_message_size) { + return false; + } - std::vector message_data(buffer.begin(), buffer.begin() + static_cast(total_message_size)); - buffer.erase(buffer.begin(), buffer.begin() + static_cast(total_message_size)); + std::vector message_data(buffer.begin(), buffer.begin() + static_cast(total_message_size)); + buffer.erase(buffer.begin(), buffer.begin() + static_cast(total_message_size)); - message = platform::allocate_message(); - if (message && message->deserialize(message_data)) { - return true; + message = platform::allocate_message(); + if (message && message->deserialize(message_data)) { + return true; + } } - - return false; } } // namespace transport diff --git a/src/transport/event_driven_udp_transport.cpp b/src/transport/event_driven_udp_transport.cpp index 4eda96ddfd..ed525e6400 100644 --- a/src/transport/event_driven_udp_transport.cpp +++ b/src/transport/event_driven_udp_transport.cpp @@ -49,7 +49,7 @@ Result EventDrivenUdpTransport::send_message(const Message& message, const Endpo return Result::BUFFER_OVERFLOW; } if (config_.max_message_size > 0 && data.size() > config_.max_message_size) { - // Same policy as UdpTransport: warn path reserved for TP segmentation + return Result::BUFFER_OVERFLOW; } return adapter_.send(data, endpoint); @@ -137,7 +137,7 @@ Result EventDrivenUdpTransport::stop() { } bool EventDrivenUdpTransport::is_running() const { - return running_; + return running_.load(); } Result EventDrivenUdpTransport::join_multicast_group(const std::string& multicast_address) { From e539a78f5f186b10d5703ca7caabdb0380cc4119 Mon Sep 17 00:00:00 2001 From: Vinicius Zein Date: Sat, 28 Mar 2026 22:49:45 -0400 Subject: [PATCH 5/7] refactor(transport): rename misleading length_from_client_id to message_length The variable reads the SOME/IP Length field at header offset 4-7, which is unrelated to Client ID. Rename for clarity. Made-with: Cursor --- src/transport/event_driven_tcp_transport.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/transport/event_driven_tcp_transport.cpp b/src/transport/event_driven_tcp_transport.cpp index e5e6239515..352d5da5c2 100644 --- a/src/transport/event_driven_tcp_transport.cpp +++ b/src/transport/event_driven_tcp_transport.cpp @@ -238,11 +238,11 @@ bool EventDrivenTcpTransport::parse_message_from_buffer(std::vector& bu return false; } - uint32_t length_from_client_id = + uint32_t message_length = (static_cast(buffer[4]) << 24) | (static_cast(buffer[5]) << 16) | (static_cast(buffer[6]) << 8) | static_cast(buffer[7]); - if (length_from_client_id < 8 || length_from_client_id > MAX_MESSAGE_SIZE) { + if (message_length < 8 || message_length > MAX_MESSAGE_SIZE) { size_t search_start = 1; bool found_valid_header = false; @@ -267,7 +267,7 @@ bool EventDrivenTcpTransport::parse_message_from_buffer(std::vector& bu continue; } - size_t total_message_size = 8 + length_from_client_id; + size_t total_message_size = 8 + message_length; if (buffer.size() < total_message_size) { return false; From 8f6da44d5bc885e0197c9d6fb4b80f1467a3cd06 Mon Sep 17 00:00:00 2001 From: Vinicius Zein Date: Sun, 29 Mar 2026 12:31:34 -0400 Subject: [PATCH 6/7] fix(transport): address remaining CodeRabbit review findings - Add IMulticastTransport interface so SD code can work with both UdpTransport and EventDrivenUdpTransport via a common abstraction instead of downcasting to concrete UdpTransport - Add is_valid() to EventDrivenUdpTransport for no-exception builds where constructor cannot signal invalid endpoint - Add mutex-guarded accessors to test listener to prevent future flaky tests from direct unlocked access to shared state - Add comprehensive EventDrivenTcpTransport test suite with 15 tests covering lifecycle, send/receive, fragmented messages, server mode, connection callbacks, and listener persistence across stop/start Made-with: Cursor --- .../transport/event_driven_udp_transport.h | 10 +- include/transport/multicast_transport.h | 41 ++ include/transport/udp_transport.h | 9 +- src/sd/sd_client.cpp | 13 +- src/sd/sd_server.cpp | 13 +- tests/CMakeLists.txt | 8 +- tests/test_event_driven_tcp_transport.cpp | 482 ++++++++++++++++++ tests/test_event_driven_udp_transport.cpp | 30 +- 8 files changed, 581 insertions(+), 25 deletions(-) create mode 100644 include/transport/multicast_transport.h create mode 100644 tests/test_event_driven_tcp_transport.cpp diff --git a/include/transport/event_driven_udp_transport.h b/include/transport/event_driven_udp_transport.h index c538e2a1b6..c584d518e4 100644 --- a/include/transport/event_driven_udp_transport.h +++ b/include/transport/event_driven_udp_transport.h @@ -15,6 +15,7 @@ #define SOMEIP_TRANSPORT_EVENT_DRIVEN_UDP_TRANSPORT_H #include "transport/transport.h" +#include "transport/multicast_transport.h" #include "transport/udp_socket_adapter.h" #include "platform/thread.h" #include @@ -35,7 +36,7 @@ struct EventDrivenUdpTransportConfig { /** * @brief ITransport implementation driven by an IUdpSocketAdapter. */ -class EventDrivenUdpTransport : public ITransport { +class EventDrivenUdpTransport : public ITransport, public IMulticastTransport { public: explicit EventDrivenUdpTransport(IUdpSocketAdapter& adapter, const Endpoint& local_endpoint, @@ -46,6 +47,9 @@ class EventDrivenUdpTransport : public ITransport { EventDrivenUdpTransport(const EventDrivenUdpTransport&) = delete; EventDrivenUdpTransport& operator=(const EventDrivenUdpTransport&) = delete; + /** @brief Check whether the transport was constructed with a valid endpoint. */ + [[nodiscard]] bool is_valid() const { return local_endpoint_.is_valid(); } + [[nodiscard]] Result send_message(const Message& message, const Endpoint& endpoint) override; MessagePtr receive_message() override; Result connect(const Endpoint& endpoint) override; @@ -57,8 +61,8 @@ class EventDrivenUdpTransport : public ITransport { Result stop() override; bool is_running() const override; - Result join_multicast_group(const std::string& multicast_address); - Result leave_multicast_group(const std::string& multicast_address); + Result join_multicast_group(const std::string& multicast_address) override; + Result leave_multicast_group(const std::string& multicast_address) override; private: void on_adapter_receive(const std::vector& data, const Endpoint& sender); diff --git a/include/transport/multicast_transport.h b/include/transport/multicast_transport.h new file mode 100644 index 0000000000..73f603b761 --- /dev/null +++ b/include/transport/multicast_transport.h @@ -0,0 +1,41 @@ +/******************************************************************************** + * Copyright (c) 2025 Vinicius Tadeu Zein + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#ifndef SOMEIP_TRANSPORT_MULTICAST_TRANSPORT_H +#define SOMEIP_TRANSPORT_MULTICAST_TRANSPORT_H + +#include "common/result.h" +#include + +namespace someip { +namespace transport { + +/** + * @brief Interface for transports that support IPv4 multicast group management. + * + * Both UdpTransport (BSD sockets) and EventDrivenUdpTransport (adapter-based) + * implement this so that SD and other callers can join/leave multicast groups + * without knowing the concrete transport type. + */ +class IMulticastTransport { +public: + virtual ~IMulticastTransport() = default; + + [[nodiscard]] virtual Result join_multicast_group(const std::string& multicast_address) = 0; + [[nodiscard]] virtual Result leave_multicast_group(const std::string& multicast_address) = 0; +}; + +} // namespace transport +} // namespace someip + +#endif // SOMEIP_TRANSPORT_MULTICAST_TRANSPORT_H diff --git a/include/transport/udp_transport.h b/include/transport/udp_transport.h index d4b1d1a71e..51b5279fff 100644 --- a/include/transport/udp_transport.h +++ b/include/transport/udp_transport.h @@ -15,6 +15,7 @@ #define SOMEIP_TRANSPORT_UDP_TRANSPORT_H #include "transport/transport.h" +#include "transport/multicast_transport.h" #include "platform/net.h" #include "platform/thread.h" #include @@ -53,7 +54,7 @@ struct UdpTransportConfig { * - Blocking mode (default): More efficient, eliminates busy loops * - Non-blocking mode: Allows integration with event loops/polling */ -class UdpTransport : public ITransport { +class UdpTransport : public ITransport, public IMulticastTransport { public: /** * @brief Constructor @@ -80,9 +81,9 @@ class UdpTransport : public ITransport { Result stop() override; bool is_running() const override; - // Multicast support - Result join_multicast_group(const std::string& multicast_address); - Result leave_multicast_group(const std::string& multicast_address); + // IMulticastTransport + Result join_multicast_group(const std::string& multicast_address) override; + Result leave_multicast_group(const std::string& multicast_address) override; private: Endpoint local_endpoint_; diff --git a/src/sd/sd_client.cpp b/src/sd/sd_client.cpp index 7270df4935..6c627cbd39 100644 --- a/src/sd/sd_client.cpp +++ b/src/sd/sd_client.cpp @@ -13,6 +13,7 @@ #include "sd/sd_client.h" #include "sd/sd_message.h" +#include "transport/multicast_transport.h" #include "transport/udp_transport.h" #include "transport/endpoint.h" #include "transport/transport.h" @@ -267,18 +268,18 @@ class SdClientImpl : public transport::ITransportListener { }; bool join_multicast_group() { - auto udp_transport = std::dynamic_pointer_cast(transport_); - if (!udp_transport) { + auto multicast = std::dynamic_pointer_cast(transport_); + if (!multicast) { return false; } - return udp_transport->join_multicast_group(config_.multicast_address) == Result::SUCCESS; + return multicast->join_multicast_group(config_.multicast_address) == Result::SUCCESS; } void leave_multicast_group() { - auto udp_transport = std::dynamic_pointer_cast(transport_); - if (udp_transport) { - udp_transport->leave_multicast_group(config_.multicast_address); + auto multicast = std::dynamic_pointer_cast(transport_); + if (multicast) { + multicast->leave_multicast_group(config_.multicast_address); } } diff --git a/src/sd/sd_server.cpp b/src/sd/sd_server.cpp index 58d68dfac4..ba550e9d1b 100644 --- a/src/sd/sd_server.cpp +++ b/src/sd/sd_server.cpp @@ -13,6 +13,7 @@ #include "sd/sd_server.h" #include "sd/sd_message.h" +#include "transport/multicast_transport.h" #include "transport/udp_transport.h" #include "transport/endpoint.h" #include "transport/transport.h" @@ -265,18 +266,18 @@ class SdServerImpl : public transport::ITransportListener { }; bool join_multicast_group() { - auto udp_transport = std::dynamic_pointer_cast(transport_); - if (!udp_transport) { + auto multicast = std::dynamic_pointer_cast(transport_); + if (!multicast) { return false; } - return udp_transport->join_multicast_group(config_.multicast_address) == Result::SUCCESS; + return multicast->join_multicast_group(config_.multicast_address) == Result::SUCCESS; } void leave_multicast_group() { - auto udp_transport = std::dynamic_pointer_cast(transport_); - if (udp_transport) { - udp_transport->leave_multicast_group(config_.multicast_address); + auto multicast = std::dynamic_pointer_cast(transport_); + if (multicast) { + multicast->leave_multicast_group(config_.multicast_address); } } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 786eb9357d..a7b7122d2e 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -38,6 +38,10 @@ target_link_libraries(test_udp_transport someip-transport gtest_main) add_executable(test_event_driven_udp_transport test_event_driven_udp_transport.cpp) target_link_libraries(test_event_driven_udp_transport someip-transport gtest_main) +# Event-driven TCP transport tests (mock adapter, no real sockets) +add_executable(test_event_driven_tcp_transport test_event_driven_tcp_transport.cpp) +target_link_libraries(test_event_driven_tcp_transport someip-transport gtest_main) + # TP tests add_executable(test_tp test_tp.cpp) target_link_libraries(test_tp someip-tp gtest_main) @@ -123,6 +127,7 @@ target_link_libraries(test_e2e someip-core gtest_main) add_test(NAME TcpTransportTest COMMAND test_tcp_transport) add_test(NAME UdpTransportTest COMMAND test_udp_transport) add_test(NAME EventDrivenUdpTransportTest COMMAND test_event_driven_udp_transport) + add_test(NAME EventDrivenTcpTransportTest COMMAND test_event_driven_tcp_transport) add_test(NAME TpTest COMMAND test_tp) add_test(NAME E2ETest COMMAND test_e2e) @@ -130,7 +135,8 @@ target_link_libraries(test_e2e someip-core gtest_main) set(_ALL_TESTS PlatformThreadingTest SerializationTest MessageTest SessionManagerTest EndpointTest RpcTest SdTest EventsTest - TcpTransportTest UdpTransportTest EventDrivenUdpTransportTest TpTest E2ETest + TcpTransportTest UdpTransportTest EventDrivenUdpTransportTest + EventDrivenTcpTransportTest TpTest E2ETest ) if(NOT WIN32) list(APPEND _ALL_TESTS diff --git a/tests/test_event_driven_tcp_transport.cpp b/tests/test_event_driven_tcp_transport.cpp new file mode 100644 index 0000000000..5310f520e4 --- /dev/null +++ b/tests/test_event_driven_tcp_transport.cpp @@ -0,0 +1,482 @@ +/******************************************************************************** + * Copyright (c) 2025 Vinicius Tadeu Zein + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace someip; +using namespace someip::transport; + +namespace { + +class TestEventTcpListener : public ITransportListener { +public: + void on_message_received(MessagePtr message, const Endpoint& sender) override { + std::scoped_lock lock(mutex_); + received_messages_.push_back({std::move(message), sender}); + cv_.notify_one(); + } + + void on_connection_lost(const Endpoint& endpoint) override { + std::scoped_lock lock(mutex_); + lost_endpoint_ = endpoint; + connection_lost_count_++; + cv_.notify_one(); + } + + void on_connection_established(const Endpoint& endpoint) override { + std::scoped_lock lock(mutex_); + established_endpoint_ = endpoint; + connection_established_count_++; + cv_.notify_one(); + } + + void on_error(Result error) override { + std::scoped_lock lock(mutex_); + last_error_ = error; + error_count_++; + cv_.notify_one(); + } + + bool wait_for_message(std::chrono::milliseconds timeout = std::chrono::milliseconds(500)) { + std::unique_lock lock(mutex_); + return cv_.wait_for(lock, timeout, [this]() { return !received_messages_.empty(); }); + } + + bool wait_for_connection(std::chrono::milliseconds timeout = std::chrono::milliseconds(500)) { + std::unique_lock lock(mutex_); + return cv_.wait_for(lock, timeout, [this]() { return connection_established_count_ > 0; }); + } + + bool wait_for_disconnect(std::chrono::milliseconds timeout = std::chrono::milliseconds(500)) { + std::unique_lock lock(mutex_); + return cv_.wait_for(lock, timeout, [this]() { return connection_lost_count_ > 0; }); + } + + size_t message_count() const { + std::scoped_lock lock(mutex_); + return received_messages_.size(); + } + + uint16_t message_service_id(size_t index) const { + std::scoped_lock lock(mutex_); + return received_messages_.at(index).first->get_service_id(); + } + + std::atomic connection_established_count_{0}; + std::atomic connection_lost_count_{0}; + std::atomic last_error_{Result::SUCCESS}; + std::atomic error_count_{0}; + Endpoint established_endpoint_{"0.0.0.0", 0}; + Endpoint lost_endpoint_{"0.0.0.0", 0}; + +private: + mutable std::mutex mutex_; + std::condition_variable cv_; + std::vector> received_messages_; +}; + +class MockTcpAdapter : public ITcpSocketAdapter { +public: + Result open(const Endpoint& local_endpoint) override { + if (open_result_ != Result::SUCCESS) { + return open_result_; + } + open_ = true; + local_ = local_endpoint; + if (local_.get_port() == 0) { + local_.set_port(54321); + } + return Result::SUCCESS; + } + + void close() override { + open_ = false; + connected_ = false; + receive_cb_ = nullptr; + connected_cb_ = nullptr; + disconnected_cb_ = nullptr; + } + + Result listen(int /*backlog*/) override { + if (!open_) { + return Result::INVALID_STATE; + } + listening_ = true; + return Result::SUCCESS; + } + + Result connect(const Endpoint& remote_endpoint) override { + if (connect_result_ != Result::SUCCESS) { + return connect_result_; + } + connected_ = true; + remote_ = remote_endpoint; + if (connected_cb_) { + connected_cb_(remote_endpoint); + } + return Result::SUCCESS; + } + + Result accept(Endpoint& remote_out) override { + if (!listening_) { + return Result::INVALID_STATE; + } + if (!pending_connection_) { + return Result::TIMEOUT; + } + remote_out = pending_remote_; + connected_ = true; + pending_connection_ = false; + return Result::SUCCESS; + } + + Result send(const std::vector& data) override { + if (!connected_) { + return Result::NOT_CONNECTED; + } + last_send_data_ = data; + return Result::SUCCESS; + } + + void set_receive_callback(TcpReceiveCallback callback) override { + receive_cb_ = std::move(callback); + } + + void set_connected_callback(TcpConnectedCallback callback) override { + connected_cb_ = std::move(callback); + } + + void set_disconnected_callback(TcpDisconnectedCallback callback) override { + disconnected_cb_ = std::move(callback); + } + + Endpoint get_local_endpoint() const override { return local_; } + bool is_connected() const override { return connected_; } + + void inject_receive(const std::vector& data) { + if (receive_cb_) { + receive_cb_(data); + } + } + + void inject_connected(const Endpoint& remote) { + connected_ = true; + remote_ = remote; + if (connected_cb_) { + connected_cb_(remote); + } + } + + void inject_disconnected() { + connected_ = false; + if (disconnected_cb_) { + disconnected_cb_(); + } + } + + void set_open_result(Result r) { open_result_ = r; } + void set_connect_result(Result r) { connect_result_ = r; } + + void stage_pending_connection(const Endpoint& remote) { + pending_connection_ = true; + pending_remote_ = remote; + } + + bool is_open() const { return open_; } + std::vector last_send_data_; + +private: + bool open_{false}; + bool connected_{false}; + bool listening_{false}; + bool pending_connection_{false}; + Endpoint local_{"127.0.0.1", 0}; + Endpoint remote_{"0.0.0.0", 0}; + Endpoint pending_remote_{"0.0.0.0", 0}; + TcpReceiveCallback receive_cb_; + TcpConnectedCallback connected_cb_; + TcpDisconnectedCallback disconnected_cb_; + Result open_result_{Result::SUCCESS}; + Result connect_result_{Result::SUCCESS}; +}; + +Message make_tcp_sample_message() { + Message message; + message.set_service_id(0x1234); + message.set_method_id(0x5678); + message.set_client_id(0x9ABC); + message.set_session_id(0xDEF0); + message.set_protocol_version(1); + message.set_interface_version(1); + message.set_message_type(MessageType::REQUEST); + message.set_return_code(ReturnCode::E_OK); + message.set_payload({0x01, 0x02, 0x03}); + return message; +} + +} // namespace + +TEST(EventDrivenTcpTransport, ConstructionNotRunning) { + MockTcpAdapter adapter; + EventDrivenTcpTransport transport(adapter); + + EXPECT_FALSE(transport.is_running()); + EXPECT_FALSE(transport.is_connected()); +} + +TEST(EventDrivenTcpTransport, InitializeAndStart) { + MockTcpAdapter adapter; + EventDrivenTcpTransport transport(adapter); + + Endpoint local{"127.0.0.1", 30490, TransportProtocol::TCP}; + EXPECT_EQ(transport.initialize(local), Result::SUCCESS); + EXPECT_EQ(transport.start(), Result::SUCCESS); + EXPECT_TRUE(transport.is_running()); + EXPECT_EQ(transport.get_local_endpoint().get_port(), 30490); + + transport.stop(); + EXPECT_FALSE(transport.is_running()); +} + +TEST(EventDrivenTcpTransport, StartWithoutInitializeFails) { + MockTcpAdapter adapter; + EventDrivenTcpTransport transport(adapter); + + EXPECT_EQ(transport.start(), Result::NOT_INITIALIZED); + EXPECT_FALSE(transport.is_running()); +} + +TEST(EventDrivenTcpTransport, InitializePropagatesOpenFailure) { + MockTcpAdapter adapter; + adapter.set_open_result(Result::NETWORK_ERROR); + EventDrivenTcpTransport transport(adapter); + + EXPECT_EQ(transport.initialize(Endpoint{"127.0.0.1", 30490}), Result::NETWORK_ERROR); +} + +TEST(EventDrivenTcpTransport, DoubleInitializeFails) { + MockTcpAdapter adapter; + EventDrivenTcpTransport transport(adapter); + + EXPECT_EQ(transport.initialize(Endpoint{"127.0.0.1", 0}), Result::SUCCESS); + EXPECT_EQ(transport.initialize(Endpoint{"127.0.0.1", 0}), Result::INVALID_STATE); +} + +TEST(EventDrivenTcpTransport, ConnectCallbackNotifiesListener) { + MockTcpAdapter adapter; + EventDrivenTcpTransport transport(adapter); + TestEventTcpListener listener; + transport.set_listener(&listener); + + ASSERT_EQ(transport.initialize(Endpoint{"127.0.0.1", 0}), Result::SUCCESS); + ASSERT_EQ(transport.start(), Result::SUCCESS); + + Endpoint remote{"10.0.0.1", 5000, TransportProtocol::TCP}; + adapter.inject_connected(remote); + + ASSERT_TRUE(listener.wait_for_connection()); + EXPECT_EQ(listener.connection_established_count_.load(), 1); + + transport.stop(); +} + +TEST(EventDrivenTcpTransport, DisconnectCallbackNotifiesListener) { + MockTcpAdapter adapter; + EventDrivenTcpTransport transport(adapter); + TestEventTcpListener listener; + transport.set_listener(&listener); + + ASSERT_EQ(transport.initialize(Endpoint{"127.0.0.1", 0}), Result::SUCCESS); + ASSERT_EQ(transport.start(), Result::SUCCESS); + + Endpoint remote{"10.0.0.1", 5000, TransportProtocol::TCP}; + adapter.inject_connected(remote); + ASSERT_TRUE(listener.wait_for_connection()); + + adapter.inject_disconnected(); + ASSERT_TRUE(listener.wait_for_disconnect()); + EXPECT_EQ(listener.connection_lost_count_.load(), 1); + + transport.stop(); +} + +TEST(EventDrivenTcpTransport, SendForwardsToAdapter) { + MockTcpAdapter adapter; + EventDrivenTcpTransport transport(adapter); + + ASSERT_EQ(transport.initialize(Endpoint{"127.0.0.1", 0}), Result::SUCCESS); + ASSERT_EQ(transport.start(), Result::SUCCESS); + + adapter.inject_connected(Endpoint{"10.0.0.1", 5000, TransportProtocol::TCP}); + + Message msg = make_tcp_sample_message(); + Endpoint dest{"10.0.0.1", 5000}; + ASSERT_EQ(transport.send_message(msg, dest), Result::SUCCESS); + + std::vector expected = msg.serialize(); + EXPECT_EQ(adapter.last_send_data_, expected); + + transport.stop(); +} + +TEST(EventDrivenTcpTransport, SendWhenNotConnectedFails) { + MockTcpAdapter adapter; + EventDrivenTcpTransport transport(adapter); + + ASSERT_EQ(transport.initialize(Endpoint{"127.0.0.1", 0}), Result::SUCCESS); + ASSERT_EQ(transport.start(), Result::SUCCESS); + + Message msg = make_tcp_sample_message(); + EXPECT_EQ(transport.send_message(msg, Endpoint{"10.0.0.1", 5000}), Result::NOT_CONNECTED); + + transport.stop(); +} + +TEST(EventDrivenTcpTransport, ReceiveCallbackReassemblesMessage) { + MockTcpAdapter adapter; + EventDrivenTcpTransport transport(adapter); + TestEventTcpListener listener; + transport.set_listener(&listener); + + ASSERT_EQ(transport.initialize(Endpoint{"127.0.0.1", 0}), Result::SUCCESS); + ASSERT_EQ(transport.start(), Result::SUCCESS); + + adapter.inject_connected(Endpoint{"10.0.0.1", 5000, TransportProtocol::TCP}); + + Message sent = make_tcp_sample_message(); + std::vector raw = sent.serialize(); + adapter.inject_receive(raw); + + ASSERT_TRUE(listener.wait_for_message()); + ASSERT_EQ(listener.message_count(), 1u); + EXPECT_EQ(listener.message_service_id(0), sent.get_service_id()); + + MessagePtr queued = transport.receive_message(); + ASSERT_NE(queued, nullptr); + EXPECT_EQ(queued->get_method_id(), sent.get_method_id()); + + transport.stop(); +} + +TEST(EventDrivenTcpTransport, ReceiveFragmentedMessage) { + MockTcpAdapter adapter; + EventDrivenTcpTransport transport(adapter); + TestEventTcpListener listener; + transport.set_listener(&listener); + + ASSERT_EQ(transport.initialize(Endpoint{"127.0.0.1", 0}), Result::SUCCESS); + ASSERT_EQ(transport.start(), Result::SUCCESS); + + adapter.inject_connected(Endpoint{"10.0.0.1", 5000, TransportProtocol::TCP}); + + Message sent = make_tcp_sample_message(); + std::vector raw = sent.serialize(); + + size_t half = raw.size() / 2; + std::vector first_half(raw.begin(), raw.begin() + static_cast(half)); + std::vector second_half(raw.begin() + static_cast(half), raw.end()); + + adapter.inject_receive(first_half); + EXPECT_FALSE(listener.wait_for_message(std::chrono::milliseconds(50))); + + adapter.inject_receive(second_half); + ASSERT_TRUE(listener.wait_for_message()); + ASSERT_EQ(listener.message_count(), 1u); + EXPECT_EQ(listener.message_service_id(0), sent.get_service_id()); + + transport.stop(); +} + +TEST(EventDrivenTcpTransport, ServerModeEnableAndAccept) { + MockTcpAdapter adapter; + EventDrivenTcpTransport transport(adapter); + + ASSERT_EQ(transport.initialize(Endpoint{"127.0.0.1", 30490}), Result::SUCCESS); + ASSERT_EQ(transport.enable_server_mode(), Result::SUCCESS); + ASSERT_EQ(transport.start(), Result::SUCCESS); + + Endpoint remote{"10.0.0.2", 40000, TransportProtocol::TCP}; + adapter.stage_pending_connection(remote); + + Endpoint accepted; + EXPECT_EQ(transport.try_accept_connection(accepted), Result::SUCCESS); + EXPECT_EQ(accepted, remote); + + transport.stop(); +} + +TEST(EventDrivenTcpTransport, ServerModeWithoutInitializeFails) { + MockTcpAdapter adapter; + EventDrivenTcpTransport transport(adapter); + + EXPECT_EQ(transport.enable_server_mode(), Result::NOT_INITIALIZED); +} + +TEST(EventDrivenTcpTransport, ConnectInServerModeFails) { + MockTcpAdapter adapter; + EventDrivenTcpTransport transport(adapter); + + ASSERT_EQ(transport.initialize(Endpoint{"127.0.0.1", 0}), Result::SUCCESS); + ASSERT_EQ(transport.enable_server_mode(), Result::SUCCESS); + ASSERT_EQ(transport.start(), Result::SUCCESS); + + EXPECT_EQ(transport.connect(Endpoint{"10.0.0.1", 5000}), Result::INVALID_STATE); + + transport.stop(); +} + +TEST(EventDrivenTcpTransport, ListenerPreservedAcrossStopStart) { + MockTcpAdapter adapter; + EventDrivenTcpTransport transport(adapter); + TestEventTcpListener listener; + transport.set_listener(&listener); + + ASSERT_EQ(transport.initialize(Endpoint{"127.0.0.1", 0}), Result::SUCCESS); + ASSERT_EQ(transport.start(), Result::SUCCESS); + transport.stop(); + + ASSERT_EQ(transport.initialize(Endpoint{"127.0.0.1", 0}), Result::SUCCESS); + ASSERT_EQ(transport.start(), Result::SUCCESS); + + adapter.inject_connected(Endpoint{"10.0.0.1", 5000, TransportProtocol::TCP}); + + Message sent = make_tcp_sample_message(); + adapter.inject_receive(sent.serialize()); + + ASSERT_TRUE(listener.wait_for_message()); + ASSERT_EQ(listener.message_count(), 1u); + + transport.stop(); +} + +TEST(EventDrivenTcpTransport, StopClearsCallbacks) { + MockTcpAdapter adapter; + EventDrivenTcpTransport transport(adapter); + TestEventTcpListener listener; + transport.set_listener(&listener); + + ASSERT_EQ(transport.initialize(Endpoint{"127.0.0.1", 0}), Result::SUCCESS); + ASSERT_EQ(transport.start(), Result::SUCCESS); + transport.stop(); + + adapter.inject_receive({0x00, 0x01, 0x02}); + + EXPECT_FALSE(listener.wait_for_message(std::chrono::milliseconds(50))); +} diff --git a/tests/test_event_driven_udp_transport.cpp b/tests/test_event_driven_udp_transport.cpp index bffc558988..3183671c85 100644 --- a/tests/test_event_driven_udp_transport.cpp +++ b/tests/test_event_driven_udp_transport.cpp @@ -54,13 +54,33 @@ class TestEventUdpListener : public ITransportListener { return cv_.wait_for(lock, timeout, [this]() { return error_count_ > 0; }); } - std::vector> received_messages_; + size_t message_count() const { + std::scoped_lock lock(mutex_); + return received_messages_.size(); + } + + Endpoint message_sender(size_t index) const { + std::scoped_lock lock(mutex_); + return received_messages_.at(index).second; + } + + uint16_t message_service_id(size_t index) const { + std::scoped_lock lock(mutex_); + return received_messages_.at(index).first->get_service_id(); + } + + uint16_t message_method_id(size_t index) const { + std::scoped_lock lock(mutex_); + return received_messages_.at(index).first->get_method_id(); + } + std::atomic last_error_{Result::SUCCESS}; std::atomic error_count_{0}; private: - std::mutex mutex_; + mutable std::mutex mutex_; std::condition_variable cv_; + std::vector> received_messages_; }; class MockUdpAdapter : public IUdpSocketAdapter { @@ -206,9 +226,9 @@ TEST(EventDrivenUdpTransport, ReceiveCallbackNotifiesListener) { adapter.inject_receive(raw, sender); ASSERT_TRUE(listener.wait_for_message()); - ASSERT_EQ(listener.received_messages_.size(), 1u); - EXPECT_EQ(listener.received_messages_[0].second, sender); - EXPECT_EQ(listener.received_messages_[0].first->get_service_id(), sent.get_service_id()); + ASSERT_EQ(listener.message_count(), 1u); + EXPECT_EQ(listener.message_sender(0), sender); + EXPECT_EQ(listener.message_service_id(0), sent.get_service_id()); MessagePtr queued = transport.receive_message(); ASSERT_NE(queued, nullptr); From 021a4803ac59b0d64396aaa39c4272794b3e42ae Mon Sep 17 00:00:00 2001 From: Vinicius Zein Date: Sun, 29 Mar 2026 12:41:02 -0400 Subject: [PATCH 7/7] fix(sd): suppress nodiscard warning on leave_multicast_group in teardown MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The leave result is intentionally ignored during shutdown — the transport is about to be stopped regardless. Made-with: Cursor --- src/sd/sd_client.cpp | 2 +- src/sd/sd_server.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sd/sd_client.cpp b/src/sd/sd_client.cpp index 6c627cbd39..b370c62f1c 100644 --- a/src/sd/sd_client.cpp +++ b/src/sd/sd_client.cpp @@ -279,7 +279,7 @@ class SdClientImpl : public transport::ITransportListener { void leave_multicast_group() { auto multicast = std::dynamic_pointer_cast(transport_); if (multicast) { - multicast->leave_multicast_group(config_.multicast_address); + (void)multicast->leave_multicast_group(config_.multicast_address); } } diff --git a/src/sd/sd_server.cpp b/src/sd/sd_server.cpp index ba550e9d1b..90c6c13b12 100644 --- a/src/sd/sd_server.cpp +++ b/src/sd/sd_server.cpp @@ -277,7 +277,7 @@ class SdServerImpl : public transport::ITransportListener { void leave_multicast_group() { auto multicast = std::dynamic_pointer_cast(transport_); if (multicast) { - multicast->leave_multicast_group(config_.multicast_address); + (void)multicast->leave_multicast_group(config_.multicast_address); } }