From 43afc420da68ee85f8f707f3ad496f9b5cf98c29 Mon Sep 17 00:00:00 2001 From: shijing xian Date: Fri, 16 Jan 2026 11:00:40 +0800 Subject: [PATCH] remove the listener and fixed some racing conditions --- include/livekit/room.h | 6 +- src/ffi_client.cpp | 5 -- src/room.cpp | 121 ++++++++++++++++++++++++++++++++++------- 3 files changed, 105 insertions(+), 27 deletions(-) diff --git a/include/livekit/room.h b/include/livekit/room.h index a0d3d3d..2e641ad 100644 --- a/include/livekit/room.h +++ b/include/livekit/room.h @@ -230,14 +230,13 @@ class Room { private: mutable std::mutex lock_; - bool connected_{false}; + ConnectionState connection_state_ = ConnectionState::Disconnected; RoomDelegate *delegate_ = nullptr; // Not owned RoomInfoData room_info_; std::shared_ptr room_handle_; std::unique_ptr local_participant_; std::unordered_map> remote_participants_; - ConnectionState connection_state_ = ConnectionState::Disconnected; // Data stream std::unordered_map text_stream_handlers_; std::unordered_map byte_stream_handlers_; @@ -248,6 +247,9 @@ class Room { // E2EE std::unique_ptr e2ee_manager_; + // FfiClient listener ID (0 means no listener registered) + int listener_id_{0}; + void OnEvent(const proto::FfiEvent &event); }; } // namespace livekit diff --git a/src/ffi_client.cpp b/src/ffi_client.cpp index efb7840..811cebb 100644 --- a/src/ffi_client.cpp +++ b/src/ffi_client.cpp @@ -177,10 +177,8 @@ FfiClient::connectAsync(const std::string &url, const std::string &token, auto *opts = connect->mutable_options(); opts->set_auto_subscribe(options.auto_subscribe); opts->set_dynacast(options.dynacast); - std::cout << "connectAsync " << std::endl; // --- E2EE / encryption (optional) --- if (options.encryption.has_value()) { - std::cout << "connectAsync e2ee " << std::endl; const E2EEOptions &e2ee = *options.encryption; const auto &kpo = e2ee.key_provider_options; @@ -222,7 +220,6 @@ FfiClient::connectAsync(const std::string &url, const std::string &token, // --- RTC configuration (optional) --- if (options.rtc_config.has_value()) { - std::cout << "options.rtc_config.has_value() " << std::endl; const RtcConfig &rc = *options.rtc_config; auto *rtc = opts->mutable_rtc_config(); @@ -248,7 +245,6 @@ FfiClient::connectAsync(const std::string &url, const std::string &token, } } } - std::cout << "connectAsync sendRequest " << std::endl; proto::FfiResponse resp = sendRequest(req); if (!resp.has_connect()) { throw std::runtime_error("FfiResponse missing connect"); @@ -266,7 +262,6 @@ FfiClient::connectAsync(const std::string &url, const std::string &token, [](const proto::FfiEvent &event, std::promise &pr) { const auto &connectCb = event.connect(); - std::cout << "connectAsync e2ee done " << std::endl; if (!connectCb.error().empty()) { pr.set_exception( std::make_exception_ptr(std::runtime_error(connectCb.error()))); diff --git a/src/room.cpp b/src/room.cpp index 91f19a7..174b64b 100644 --- a/src/room.cpp +++ b/src/room.cpp @@ -67,7 +67,17 @@ createRemoteParticipant(const proto::OwnedParticipant &owned) { } // namespace Room::Room() {} -Room::~Room() {} +Room::~Room() { + int listener_to_remove = 0; + { + std::lock_guard g(lock_); + listener_to_remove = listener_id_; + listener_id_ = 0; + } + if (listener_to_remove != 0) { + FfiClient::instance().RemoveListener(listener_to_remove); + } +} void Room::setDelegate(RoomDelegate *delegate) { std::lock_guard g(lock_); @@ -76,27 +86,25 @@ void Room::setDelegate(RoomDelegate *delegate) { bool Room::Connect(const std::string &url, const std::string &token, const RoomOptions &options) { - auto listenerId = FfiClient::instance().AddListener( - std::bind(&Room::OnEvent, this, std::placeholders::_1)); { std::lock_guard g(lock_); - if (connected_) { - FfiClient::instance().RemoveListener(listenerId); + if (connection_state_ != ConnectionState::Disconnected) { throw std::runtime_error("already connected"); } + connection_state_ = ConnectionState::Reconnecting; } auto fut = FfiClient::instance().connectAsync(url, token, options); try { auto connectCb = fut.get(); // fut will throw if it fails to connect to the room - { - std::lock_guard g(lock_); - connected_ = true; - const auto &ownedRoom = connectCb.result().room(); - room_handle_ = std::make_shared(ownedRoom.handle().id()); - room_info_ = fromProto(ownedRoom.info()); - } + + const auto &owned_room = connectCb.result().room(); + auto new_room_handle = + std::make_shared(owned_room.handle().id()); + auto new_room_info = fromProto(owned_room.info()); + // Setup local particpant + std::unique_ptr new_local_participant; { const auto &owned_local = connectCb.result().local_participant(); const auto &pinfo = owned_local.info(); @@ -113,11 +121,13 @@ bool Room::Connect(const std::string &url, const std::string &token, // Participant base stores a weak_ptr, so share the room handle FfiHandle participant_handle( static_cast(owned_local.handle().id())); - local_participant_ = std::make_unique( + new_local_participant = std::make_unique( std::move(participant_handle), pinfo.sid(), pinfo.name(), pinfo.identity(), pinfo.metadata(), std::move(attrs), kind, reason); } // Setup remote participants + std::unordered_map> + new_remote_participants; { const auto &participants = connectCb.result().participants(); std::lock_guard g(lock_); @@ -132,23 +142,41 @@ bool Room::Connect(const std::string &url, const std::string &token, std::move(publication)); } - remote_participants_.emplace(rp->identity(), std::move(rp)); + new_remote_participants.emplace(rp->identity(), std::move(rp)); } } // Setup e2eeManager + std::unique_ptr new_e2ee_manager; if (options.encryption) { std::cout << "creating E2eeManager " << std::endl; e2ee_manager_ = std::unique_ptr( new E2EEManager(room_handle_->get(), options.encryption.value())); - } else { - e2ee_manager_.reset(); + } + + // Publish all state atomically under lock + { + std::lock_guard g(lock_); + room_handle_ = std::move(new_room_handle); + room_info_ = std::move(new_room_info); + local_participant_ = std::move(new_local_participant); + remote_participants_ = std::move(new_remote_participants); + e2ee_manager_ = std::move(new_e2ee_manager); + connection_state_ = ConnectionState::Connected; + } + + // Install listener (Room is fully initialized) + auto listenerId = FfiClient::instance().AddListener( + std::bind(&Room::OnEvent, this, std::placeholders::_1)); + { + std::lock_guard g(lock_); + listener_id_ = listenerId; } return true; } catch (const std::exception &e) { - // On error, remove the listener and rethrow - FfiClient::instance().RemoveListener(listenerId); + // On error, set the connection_state_ to Disconnected + connection_state_ = ConnectionState::Disconnected; std::cerr << "Room::Connect failed: " << e.what() << std::endl; return false; } @@ -255,6 +283,16 @@ void Room::OnEvent(const FfiEvent &event) { switch (event.message_case()) { case FfiEvent::kRoomEvent: { const proto::RoomEvent &re = event.room_event(); + + // Check if this event is for our room handle + { + std::lock_guard guard(lock_); + if (!room_handle_ || + re.room_handle() != static_cast(room_handle_->get())) { + return; + } + } + switch (re.message_case()) { case proto::RoomEvent::kParticipantConnected: { std::shared_ptr new_participant; @@ -920,8 +958,12 @@ void Room::OnEvent(const FfiEvent &event) { { std::lock_guard guard(lock_); const auto &cs = re.connection_state_changed(); - connection_state_ = static_cast(cs.state()); - ev.state = connection_state_; + // TODO, maybe we should update our |connection_state_| + // correspoindingly, but the this kConnectionStateChanged event is never + // triggered in my local test. + std::cout << "cs.state() is " << cs.state() << " connection_state_ is " + << (int)connection_state_ << std::endl; + ev.state = static_cast(cs.state()); } if (delegate_snapshot) { delegate_snapshot->onConnectionStateChanged(*this, ev); @@ -951,6 +993,45 @@ void Room::OnEvent(const FfiEvent &event) { break; } case proto::RoomEvent::kEos: { + // Remove listener since no more events will come for this room + int listener_to_remove = 0; + + // Move state out of lock scope before destroying to avoid holding lock + // during potentially long destructors + std::unique_ptr old_local_participant; + std::unordered_map> + old_remote_participants; + std::shared_ptr old_room_handle; + std::unique_ptr old_e2ee_manager; + std::unordered_map> + old_text_readers; + std::unordered_map> + old_byte_readers; + + { + std::lock_guard guard(lock_); + listener_to_remove = listener_id_; + listener_id_ = 0; + + // Reset connection state + connection_state_ = ConnectionState::Disconnected; + + // Move state out for cleanup outside lock + old_local_participant = std::move(local_participant_); + old_remote_participants = std::move(remote_participants_); + old_room_handle = std::move(room_handle_); + old_e2ee_manager = std::move(e2ee_manager_); + old_text_readers = std::move(text_stream_readers_); + old_byte_readers = std::move(byte_stream_readers_); + } + + // Remove listener outside lock + if (listener_to_remove != 0) { + FfiClient::instance().RemoveListener(listener_to_remove); + } + + // Old state will be destroyed here when going out of scope + RoomEosEvent ev; if (delegate_snapshot) { delegate_snapshot->onRoomEos(*this, ev);