Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions include/livekit/room.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,14 +230,13 @@ class Room {

private:
mutable std::mutex lock_;
bool connected_{false};
ConnectionState connection_state_ = ConnectionState::Disconnected;
RoomDelegate *delegate_ = nullptr; // Not owned
RoomInfoData room_info_;
std::shared_ptr<FfiHandle> room_handle_;
std::unique_ptr<LocalParticipant> local_participant_;
std::unordered_map<std::string, std::shared_ptr<RemoteParticipant>>
remote_participants_;
ConnectionState connection_state_ = ConnectionState::Disconnected;
// Data stream
std::unordered_map<std::string, TextStreamHandler> text_stream_handlers_;
std::unordered_map<std::string, ByteStreamHandler> byte_stream_handlers_;
Expand All @@ -248,6 +247,9 @@ class Room {
// E2EE
std::unique_ptr<E2EEManager> e2ee_manager_;

// FfiClient listener ID (0 means no listener registered)
int listener_id_{0};

void OnEvent(const proto::FfiEvent &event);
};
} // namespace livekit
Expand Down
5 changes: 0 additions & 5 deletions src/ffi_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,8 @@ FfiClient::connectAsync(const std::string &url, const std::string &token,
auto *opts = connect->mutable_options();
opts->set_auto_subscribe(options.auto_subscribe);
opts->set_dynacast(options.dynacast);
std::cout << "connectAsync " << std::endl;
// --- E2EE / encryption (optional) ---
if (options.encryption.has_value()) {
std::cout << "connectAsync e2ee " << std::endl;
const E2EEOptions &e2ee = *options.encryption;
const auto &kpo = e2ee.key_provider_options;

Expand Down Expand Up @@ -222,7 +220,6 @@ FfiClient::connectAsync(const std::string &url, const std::string &token,

// --- RTC configuration (optional) ---
if (options.rtc_config.has_value()) {
std::cout << "options.rtc_config.has_value() " << std::endl;
const RtcConfig &rc = *options.rtc_config;
auto *rtc = opts->mutable_rtc_config();

Expand All @@ -248,7 +245,6 @@ FfiClient::connectAsync(const std::string &url, const std::string &token,
}
}
}
std::cout << "connectAsync sendRequest " << std::endl;
proto::FfiResponse resp = sendRequest(req);
if (!resp.has_connect()) {
throw std::runtime_error("FfiResponse missing connect");
Expand All @@ -266,7 +262,6 @@ FfiClient::connectAsync(const std::string &url, const std::string &token,
[](const proto::FfiEvent &event,
std::promise<proto::ConnectCallback> &pr) {
const auto &connectCb = event.connect();
std::cout << "connectAsync e2ee done " << std::endl;
if (!connectCb.error().empty()) {
pr.set_exception(
std::make_exception_ptr(std::runtime_error(connectCb.error())));
Expand Down
121 changes: 101 additions & 20 deletions src/room.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,17 @@ createRemoteParticipant(const proto::OwnedParticipant &owned) {
} // namespace
Room::Room() {}

Room::~Room() {}
Room::~Room() {
int listener_to_remove = 0;
{
std::lock_guard<std::mutex> g(lock_);
listener_to_remove = listener_id_;
listener_id_ = 0;
}
if (listener_to_remove != 0) {
FfiClient::instance().RemoveListener(listener_to_remove);
}
}

void Room::setDelegate(RoomDelegate *delegate) {
std::lock_guard<std::mutex> g(lock_);
Expand All @@ -76,27 +86,25 @@ void Room::setDelegate(RoomDelegate *delegate) {

bool Room::Connect(const std::string &url, const std::string &token,
const RoomOptions &options) {
auto listenerId = FfiClient::instance().AddListener(
std::bind(&Room::OnEvent, this, std::placeholders::_1));
{
std::lock_guard<std::mutex> g(lock_);
if (connected_) {
FfiClient::instance().RemoveListener(listenerId);
if (connection_state_ != ConnectionState::Disconnected) {
throw std::runtime_error("already connected");
}
connection_state_ = ConnectionState::Reconnecting;
}
auto fut = FfiClient::instance().connectAsync(url, token, options);
try {
auto connectCb =
fut.get(); // fut will throw if it fails to connect to the room
{
std::lock_guard<std::mutex> g(lock_);
connected_ = true;
const auto &ownedRoom = connectCb.result().room();
room_handle_ = std::make_shared<FfiHandle>(ownedRoom.handle().id());
room_info_ = fromProto(ownedRoom.info());
}

const auto &owned_room = connectCb.result().room();
auto new_room_handle =
std::make_shared<FfiHandle>(owned_room.handle().id());
auto new_room_info = fromProto(owned_room.info());

// Setup local particpant
std::unique_ptr<LocalParticipant> new_local_participant;
{
const auto &owned_local = connectCb.result().local_participant();
const auto &pinfo = owned_local.info();
Expand All @@ -113,11 +121,13 @@ bool Room::Connect(const std::string &url, const std::string &token,
// Participant base stores a weak_ptr<FfiHandle>, so share the room handle
FfiHandle participant_handle(
static_cast<uintptr_t>(owned_local.handle().id()));
local_participant_ = std::make_unique<LocalParticipant>(
new_local_participant = std::make_unique<LocalParticipant>(
std::move(participant_handle), pinfo.sid(), pinfo.name(),
pinfo.identity(), pinfo.metadata(), std::move(attrs), kind, reason);
}
// Setup remote participants
std::unordered_map<std::string, std::shared_ptr<RemoteParticipant>>
new_remote_participants;
{
const auto &participants = connectCb.result().participants();
std::lock_guard<std::mutex> g(lock_);
Expand All @@ -132,23 +142,41 @@ bool Room::Connect(const std::string &url, const std::string &token,
std::move(publication));
}

remote_participants_.emplace(rp->identity(), std::move(rp));
new_remote_participants.emplace(rp->identity(), std::move(rp));
}
}

// Setup e2eeManager
std::unique_ptr<E2EEManager> new_e2ee_manager;
if (options.encryption) {
std::cout << "creating E2eeManager " << std::endl;
e2ee_manager_ = std::unique_ptr<E2EEManager>(
new E2EEManager(room_handle_->get(), options.encryption.value()));
} else {
e2ee_manager_.reset();
}

// Publish all state atomically under lock
{
std::lock_guard<std::mutex> g(lock_);
room_handle_ = std::move(new_room_handle);
room_info_ = std::move(new_room_info);
local_participant_ = std::move(new_local_participant);
remote_participants_ = std::move(new_remote_participants);
e2ee_manager_ = std::move(new_e2ee_manager);
connection_state_ = ConnectionState::Connected;
}

// Install listener (Room is fully initialized)
auto listenerId = FfiClient::instance().AddListener(
std::bind(&Room::OnEvent, this, std::placeholders::_1));
{
std::lock_guard<std::mutex> g(lock_);
listener_id_ = listenerId;
}

return true;
} catch (const std::exception &e) {
// On error, remove the listener and rethrow
FfiClient::instance().RemoveListener(listenerId);
// On error, set the connection_state_ to Disconnected
connection_state_ = ConnectionState::Disconnected;
std::cerr << "Room::Connect failed: " << e.what() << std::endl;
return false;
}
Expand Down Expand Up @@ -255,6 +283,16 @@ void Room::OnEvent(const FfiEvent &event) {
switch (event.message_case()) {
case FfiEvent::kRoomEvent: {
const proto::RoomEvent &re = event.room_event();

// Check if this event is for our room handle
{
std::lock_guard<std::mutex> guard(lock_);
if (!room_handle_ ||
re.room_handle() != static_cast<std::uint64_t>(room_handle_->get())) {
return;
}
}

switch (re.message_case()) {
case proto::RoomEvent::kParticipantConnected: {
std::shared_ptr<RemoteParticipant> new_participant;
Expand Down Expand Up @@ -920,8 +958,12 @@ void Room::OnEvent(const FfiEvent &event) {
{
std::lock_guard<std::mutex> guard(lock_);
const auto &cs = re.connection_state_changed();
connection_state_ = static_cast<ConnectionState>(cs.state());
ev.state = connection_state_;
// TODO, maybe we should update our |connection_state_|
// correspoindingly, but the this kConnectionStateChanged event is never
// triggered in my local test.
std::cout << "cs.state() is " << cs.state() << " connection_state_ is "
<< (int)connection_state_ << std::endl;
ev.state = static_cast<ConnectionState>(cs.state());
}
if (delegate_snapshot) {
delegate_snapshot->onConnectionStateChanged(*this, ev);
Expand Down Expand Up @@ -951,6 +993,45 @@ void Room::OnEvent(const FfiEvent &event) {
break;
}
case proto::RoomEvent::kEos: {
// Remove listener since no more events will come for this room
int listener_to_remove = 0;

// Move state out of lock scope before destroying to avoid holding lock
// during potentially long destructors
std::unique_ptr<LocalParticipant> old_local_participant;
std::unordered_map<std::string, std::shared_ptr<RemoteParticipant>>
old_remote_participants;
std::shared_ptr<FfiHandle> old_room_handle;
std::unique_ptr<E2EEManager> old_e2ee_manager;
std::unordered_map<std::string, std::shared_ptr<TextStreamReader>>
old_text_readers;
std::unordered_map<std::string, std::shared_ptr<ByteStreamReader>>
old_byte_readers;

{
std::lock_guard<std::mutex> guard(lock_);
listener_to_remove = listener_id_;
listener_id_ = 0;

// Reset connection state
connection_state_ = ConnectionState::Disconnected;

// Move state out for cleanup outside lock
old_local_participant = std::move(local_participant_);
old_remote_participants = std::move(remote_participants_);
old_room_handle = std::move(room_handle_);
old_e2ee_manager = std::move(e2ee_manager_);
old_text_readers = std::move(text_stream_readers_);
old_byte_readers = std::move(byte_stream_readers_);
}

// Remove listener outside lock
if (listener_to_remove != 0) {
FfiClient::instance().RemoveListener(listener_to_remove);
}

// Old state will be destroyed here when going out of scope

RoomEosEvent ev;
if (delegate_snapshot) {
delegate_snapshot->onRoomEos(*this, ev);
Expand Down
Loading