Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions cmake/libe3Tests.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@ target_include_directories(libe3_test_framework INTERFACE

file(GLOB LIBE3_TEST_SOURCES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "tests/*.cpp")

# Tests that hardcode the ASN.1 wire format: test_asn1_size exercises the
# APER encoder directly, and test_e2e_report_path configures its agent and
# fake-dApp peer with EncodingFormat::ASN1. On builds with
# LIBE3_ENABLE_ASN1=OFF the encoder factory cannot create an ASN.1 encoder,
# so these tests can only fail; exclude them instead of weakening them.
set(LIBE3_ASN1_ONLY_TESTS
asn1_size
e2e_report_path
)

foreach(test_src IN LISTS LIBE3_TEST_SOURCES)
# Derive a target name from the source file name: tests/test_foo.cpp -> test_foo
get_filename_component(test_name ${test_src} NAME_WE)
Expand All @@ -27,6 +37,10 @@ foreach(test_src IN LISTS LIBE3_TEST_SOURCES)
message(STATUS "Skipping test_json_encoder: JSON support disabled")
continue()
endif()
if(NOT LIBE3_ENABLE_ASN1 AND simple_name IN_LIST LIBE3_ASN1_ONLY_TESTS)
message(STATUS "Skipping test_${simple_name}: ASN.1 support disabled")
continue()
endif()

add_executable(${target_name} "${CMAKE_CURRENT_SOURCE_DIR}/${test_src}")
target_link_libraries(${target_name}
Expand Down
17 changes: 17 additions & 0 deletions include/libe3/e3_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,12 @@ class E3Interface {
// Core components
std::unique_ptr<E3Connector> connector_;
std::unique_ptr<E3Encoder> encoder_;
// Service Models registered with THIS interface. Owned per interface
// (not process-wide) so multiple agents in one process can host SMs
// with the same RAN function id, and stopping one agent cannot wipe
// another agent's registry. Only the RAN role registers SMs; on the
// dApp role this stays empty.
SmRegistry sm_registry_;
// RAN-only state (nullptr when role==DAPP).
std::unique_ptr<SubscriptionManager> subscription_manager_;
// dApp-only state (nullptr when role==RAN).
Expand Down Expand Up @@ -308,6 +314,17 @@ class E3Interface {
void handle_release_message(const ReleaseMessage &release);
void handle_dapp_disconnection(uint32_t dapp_id);

/**
* @brief Complete a setup REQ/REP exchange with a best-effort empty reply.
*
* Used whenever a received setup message cannot be answered with a real
* SetupResponse (undecodable bytes, wrong PDU type, response encode
* failure). A ZMQ REP socket must send exactly one reply per received
* request before it can receive again; bailing out without replying
* wedges the setup channel for every subsequent dApp until restart.
*/
void send_empty_setup_reply();

// dApp-role handlers
void handle_setup_response(const SetupResponse& response);
void handle_subscription_response(const SubscriptionResponse& response);
Expand Down
20 changes: 9 additions & 11 deletions include/libe3/sm_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,18 @@ using SmFactory = std::function<std::unique_ptr<ServiceModel>()>;
/**
* @brief SM Registry for managing registered Service Models
*
* This class provides a central registry for Service Models.
* It's used by the E3Agent to find and manage SMs.
* Each E3Interface owns one SmRegistry instance: the registry is scoped to
* its owning interface/agent, not to the process. Two agents in the same
* process (e.g. integration tests, multi-RAN deployments) can therefore
* register Service Models with the same RAN function id independently, and
* tearing one agent down only clears that agent's own SMs.
*/
class SmRegistry {
public:
/**
* @brief Get the singleton instance
*/
static SmRegistry& instance();
SmRegistry() = default;
~SmRegistry() = default;
SmRegistry(const SmRegistry&) = delete;
SmRegistry& operator=(const SmRegistry&) = delete;

/**
* @brief Register a Service Model
Expand Down Expand Up @@ -277,11 +280,6 @@ class SmRegistry {
void clear();

private:
SmRegistry() = default;
~SmRegistry() = default;
SmRegistry(const SmRegistry&) = delete;
SmRegistry& operator=(const SmRegistry&) = delete;

mutable std::mutex mutex_;
std::unordered_map<uint32_t, std::unique_ptr<ServiceModel>> sms_;
std::unordered_map<uint32_t, SmFactory> factories_;
Expand Down
72 changes: 48 additions & 24 deletions src/core/e3_interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,13 +240,12 @@ void E3Interface::stop() {
}
setup_complete_cv_.notify_all();

// Clean up SM registry — only the RAN role owns SMs. Doing this
// unconditionally would wipe a sibling RAN-role E3Interface's registered
// SMs in a two-roles-in-one-process scenario (integration tests, the
// latency benchmark, multi-peer dApps colocated with a RAN). The dApp
// role never registers an SM, so there's nothing to clear.
// Clean up this interface's own SM registry — only the RAN role owns
// SMs (the dApp role never registers any, so its registry is empty).
// The registry is per-interface, so this cannot affect a sibling
// E3Interface's SMs in a multi-agent process.
if (config_.role == E3Role::RAN) {
SmRegistry::instance().clear();
sm_registry_.clear();
}

// Dispose connector
Expand All @@ -266,7 +265,7 @@ ErrorCode E3Interface::queue_outbound(Pdu pdu) {
}

std::vector<uint32_t> E3Interface::get_available_ran_functions() const {
return SmRegistry::instance().get_available_ran_functions();
return sm_registry_.get_available_ran_functions();
}

ErrorCode E3Interface::register_sm(std::unique_ptr<ServiceModel> sm) {
Expand All @@ -286,7 +285,7 @@ ErrorCode E3Interface::register_sm(std::unique_ptr<ServiceModel> sm) {
}
return queue_outbound(std::move(pdu));
});
return SmRegistry::instance().register_sm(std::move(sm));
return sm_registry_.register_sm(std::move(sm));
}

void E3Interface::notify_dapp_status_changed() {
Expand All @@ -302,7 +301,7 @@ void E3Interface::notify_dapp_status_changed() {
void E3Interface::setup_loop_ran() {
E3_LOG_INFO(LOG_TAG) << "Setup loop (RAN) started";

auto available_ran_functions = SmRegistry::instance().get_available_ran_functions();
auto available_ran_functions = sm_registry_.get_available_ran_functions();

while (!should_stop_.load()) {
std::vector<uint8_t> buffer;
Expand All @@ -315,26 +314,32 @@ void E3Interface::setup_loop_ran() {

E3_LOG_INFO(LOG_TAG) << "Setup request received: " << ret << " bytes";

// Decode the setup request
// Decode the setup request. On any failure we must still complete
// the REQ/REP exchange (see send_empty_setup_reply) or the setup
// channel wedges for every subsequent dApp.
auto decode_result = encoder_->decode(buffer.data(), static_cast<size_t>(ret));
if (!decode_result) {
E3_LOG_ERROR(LOG_TAG) << "Failed to decode setup request; ret=" << ret;
E3_LOG_WARN(LOG_TAG) << "Undecodable setup message (" << ret
<< " bytes, wrong encoding or garbage); replying empty";
send_empty_setup_reply();
continue;
}

Pdu& pdu = *decode_result;
if (pdu.type != PduType::SETUP_REQUEST) {
E3_LOG_ERROR(LOG_TAG) << "Unexpected PDU type in setup: "
<< pdu_type_to_string(pdu.type);
E3_LOG_WARN(LOG_TAG) << "Unexpected PDU type in setup: "
<< pdu_type_to_string(pdu.type) << "; replying empty";
send_empty_setup_reply();
continue;
}

auto* request = std::get_if<SetupRequest>(&pdu.choice);
if (!request) {
E3_LOG_ERROR(LOG_TAG) << "Failed to get SetupRequest from PDU";
E3_LOG_WARN(LOG_TAG) << "Failed to get SetupRequest from PDU; replying empty";
send_empty_setup_reply();
continue;
}

handle_setup_request(*request, pdu.message_id);
}

Expand Down Expand Up @@ -523,7 +528,7 @@ void E3Interface::sm_data_handler_loop() {
}

// Get SM for this RAN function
ServiceModel* sm = SmRegistry::instance().get_by_ran_function(ran_func);
ServiceModel* sm = sm_registry_.get_by_ran_function(ran_func);

if (!sm || !sm->is_running()) {
continue;
Expand Down Expand Up @@ -568,13 +573,13 @@ void E3Interface::handle_setup_request(const SetupRequest& request, uint32_t req

// Create and send response
// Get available RAN functions and convert to RanFunctionDef list
auto available_ran_function_ids = SmRegistry::instance().get_available_ran_functions();
auto available_ran_function_ids = sm_registry_.get_available_ran_functions();
E3_LOG_DEBUG(LOG_TAG) << "Available RAN function ids count: " << available_ran_function_ids.size();
std::vector<RanFunctionDef> ran_function_list;
for (auto id : available_ran_function_ids) {
RanFunctionDef func;
func.ran_function_identifier = id;
ServiceModel* sm = SmRegistry::instance().get_by_ran_function(id);
ServiceModel* sm = sm_registry_.get_by_ran_function(id);
if (sm) {
func.telemetry_identifier_list = sm->telemetry_ids();
func.control_identifier_list = sm->control_ids();
Expand Down Expand Up @@ -603,9 +608,11 @@ void E3Interface::handle_setup_request(const SetupRequest& request, uint32_t req

if (!encode_result) {
E3_LOG_ERROR(LOG_TAG) << "Failed to encode setup response for request id " << request_message_id;
// Still complete the REQ/REP exchange so the setup channel survives.
send_empty_setup_reply();
return;
}

ErrorCode send_result = connector_->send_response(encode_result->buffer);
if (send_result != ErrorCode::SUCCESS) {
E3_LOG_ERROR(LOG_TAG) << "Failed to send setup response for request id " << request_message_id
Expand All @@ -615,6 +622,23 @@ void E3Interface::handle_setup_request(const SetupRequest& request, uint32_t req
}
}

void E3Interface::send_empty_setup_reply() {
// The RAN side of the setup channel is a ZMQ REP socket: it must send
// exactly one reply per received request before it can receive again.
// Returning without replying (undecodable request, wrong PDU type,
// response-encode failure) leaves the socket in the send state, where
// every later recv fails — all future dApp setups stall until the
// connector resets or the agent restarts. Completing the exchange with
// an empty frame keeps the channel alive; the peer treats the empty
// reply as a failed setup and retries. On connectors without a REP
// state machine (POSIX) the empty send is harmless.
ErrorCode rc = connector_->send_response({});
if (rc != ErrorCode::SUCCESS) {
E3_LOG_WARN(LOG_TAG) << "Failed to send empty setup reply: "
<< error_code_to_string(rc);
}
}

void E3Interface::handle_subscription_request(const SubscriptionRequest& request, uint32_t request_message_id) {
E3_LOG_INFO(LOG_TAG) << "Handling subscription request from dApp " << request.dapp_identifier
<< " for RAN function " << request.ran_function_identifier;
Expand Down Expand Up @@ -706,7 +730,7 @@ void E3Interface::handle_control_action(const DAppControlAction& action, uint32_
<< " control " << action.control_identifier
<< " (" << action.action_data.size() << " bytes)";

ServiceModel* sm = SmRegistry::instance().get_by_ran_function(action.ran_function_identifier);
ServiceModel* sm = sm_registry_.get_by_ran_function(action.ran_function_identifier);

if (sm && sm->is_running()) {
ErrorCode result = sm->handle_control_action(request_message_id, action);
Expand Down Expand Up @@ -1137,13 +1161,13 @@ ErrorCode E3Interface::wait_for_setup(std::chrono::milliseconds timeout) {

void E3Interface::on_sm_lifecycle_change(uint32_t ran_function_id, bool should_start) {
if (should_start) {
ErrorCode result = SmRegistry::instance().start_sm(ran_function_id);
ErrorCode result = sm_registry_.start_sm(ran_function_id);
if (result != ErrorCode::SUCCESS) {
E3_LOG_ERROR(LOG_TAG) << "Failed to start SM for RAN function "
<< ran_function_id << ": " << error_code_to_string(result);
}
} else {
ErrorCode result = SmRegistry::instance().stop_sm(ran_function_id);
ErrorCode result = sm_registry_.stop_sm(ran_function_id);
if (result != ErrorCode::SUCCESS) {
E3_LOG_WARN(LOG_TAG) << "Failed to stop SM for RAN function "
<< ran_function_id << ": " << error_code_to_string(result);
Expand Down
5 changes: 0 additions & 5 deletions src/core/sm_registry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@ namespace {
constexpr const char* LOG_TAG = "SmReg";
}

SmRegistry& SmRegistry::instance() {
static SmRegistry registry;
return registry;
}

ErrorCode SmRegistry::register_sm(std::unique_ptr<ServiceModel> sm) {
if (!sm) {
return ErrorCode::INVALID_PARAM;
Expand Down
22 changes: 19 additions & 3 deletions src/encoder/asn1_encoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,25 @@ E3_PDU* Asn1E3Encoder::pdu_to_asn1(const Pdu& pdu) const {
ASN_SEQUENCE_ADD(&ran_func->controlIdentifierList, id);
}

OCTET_STRING_fromBuf(&ran_func->ranFunctionData,
reinterpret_cast<const char*>(func.ran_function_data.data()),
static_cast<int>(func.ran_function_data.size()));
if (func.ran_function_data.empty()) {
// ranFunctionData is MANDATORY in the E3AP schema and
// constrained to OCTET STRING (SIZE (1..32768))
// (messages/asn1/V1/e3ap-1.0.0.asn1,
// E3-RanFunctionDefinition): it can be neither omitted
// nor empty. An SM that provides no RAN-function data
// (ServiceModel::ran_function_data() defaults to {})
// would otherwise violate the APER size constraint and
// abort the encode of the entire setupResponse, taking
// every other registered RAN function down with it.
// Substitute a 1-byte 0x00 placeholder instead.
static const char kPlaceholder = '\0';
OCTET_STRING_fromBuf(&ran_func->ranFunctionData,
&kPlaceholder, 1);
} else {
OCTET_STRING_fromBuf(&ran_func->ranFunctionData,
reinterpret_cast<const char*>(func.ran_function_data.data()),
static_cast<int>(func.ran_function_data.size()));
}

// Ensure ranFunctionList container is allocated (optional field)
if (!asn1_pdu->msg.choice.setupResponse->ranFunctionList) {
Expand Down
57 changes: 57 additions & 0 deletions tests/test_asn1_size.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,63 @@ TEST(Asn1Size_growsLinearlyWithPayload) {
ASSERT_LE(delta, 2 * naive_payload_delta + ENVELOPE_OVERHEAD_MAX);
}

/**
* A setupResponse that advertises an SM with EMPTY ran_function_data must
* still encode. The schema makes ranFunctionData a mandatory
* OCTET STRING (SIZE (1..32768)), so the encoder substitutes a 1-byte 0x00
* placeholder; without that substitution the APER size constraint fails and
* the whole setupResponse encode aborts. The result must also decode, with
* the placeholder visible to the peer and every other field intact.
*/
TEST(Asn1_setupResponse_emptyRanFunctionData_stillEncodes) {
auto enc = make_encoder();

RanFunctionDef with_data;
with_data.ran_function_identifier = 1;
with_data.telemetry_identifier_list = {1, 2};
with_data.control_identifier_list = {10};
with_data.ran_function_data = {0xAB, 0xCD};

RanFunctionDef without_data;
without_data.ran_function_identifier = 2;
without_data.telemetry_identifier_list = {3};
without_data.control_identifier_list = {20};
// ran_function_data deliberately left empty (ServiceModel default)

auto encoded = enc->encode_setup_response(
7, // message_id
42, // request_id
ResponseCode::POSITIVE,
std::string("1.0.0"), // e3ap_protocol_version
3u, // dapp_identifier
"asn1-test-ran", // ran_identifier
{with_data, without_data});
ASSERT_TRUE(encoded.has_value());

auto decoded = enc->decode(encoded->buffer.data(), encoded->buffer.size());
ASSERT_TRUE(decoded.has_value());
ASSERT_EQ(static_cast<int>(decoded->type),
static_cast<int>(PduType::SETUP_RESPONSE));

auto* resp = std::get_if<SetupResponse>(&decoded->choice);
ASSERT_TRUE(resp != nullptr);
ASSERT_EQ(static_cast<int>(resp->response_code),
static_cast<int>(ResponseCode::POSITIVE));
ASSERT_EQ(resp->request_id, 42u);
ASSERT_EQ(resp->ran_function_list.size(), 2u);

// SM with real data: payload round-trips byte-for-byte.
ASSERT_EQ(resp->ran_function_list[0].ran_function_identifier, 1u);
ASSERT_EQ(resp->ran_function_list[0].ran_function_data.size(), 2u);
ASSERT_EQ(static_cast<int>(resp->ran_function_list[0].ran_function_data[0]), 0xAB);
ASSERT_EQ(static_cast<int>(resp->ran_function_list[0].ran_function_data[1]), 0xCD);

// SM without data: the wire carries the 1-byte 0x00 placeholder.
ASSERT_EQ(resp->ran_function_list[1].ran_function_identifier, 2u);
ASSERT_EQ(resp->ran_function_list[1].ran_function_data.size(), 1u);
ASSERT_EQ(static_cast<int>(resp->ran_function_list[1].ran_function_data[0]), 0x00);
}

// ---------------------------------------------------------------------------

int main() {
Expand Down
Loading
Loading