Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/moxygen
356 changes: 243 additions & 113 deletions src/MoqxRelay.cpp

Large diffs are not rendered by default.

106 changes: 101 additions & 5 deletions src/MoqxRelay.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,22 @@
#include "UpstreamProvider.h"
#include "config/Config.h"
#include "relay/PropertyRanking.h"
#include "relay/RelayExecUtil.h"
#include <moxygen/MoQSession.h>
#include <moxygen/relay/MoQForwarder.h>

#include <folly/Executor.h>
#include <folly/container/F14Map.h>
#include <folly/container/F14Set.h>
#include <optional>
#include <string>
#include <string_view>
#include <vector>

namespace openmoq::moqx {
class CrossExecForwarderCallback;
} // namespace openmoq::moqx

namespace openmoq::moqx {

// Visitor interface for relay state inspection.
Expand Down Expand Up @@ -115,6 +121,21 @@ class MoqxRelay : public moxygen::Publisher,
}
}

// Optionally isolate relay state on a dedicated executor thread.
// When set, all public entry points switch to relayExec before touching
// relay state, and consumer callbacks to/from sessions are wrapped with
// cross-executor filters. relayExec must outlive this relay.
// If not set (default), all operations run on the calling thread.
void setRelayExec(folly::Executor* relayExec) { relayExec_ = relayExec; }

// Takes ownership of exec and uses it as the relay executor.
void setRelayExec(std::shared_ptr<folly::Executor> exec) {
ownedRelayExec_ = std::move(exec);
relayExec_ = ownedRelayExec_.get();
}

folly::Executor* getRelayExec() const { return relayExec_; }

void setAllowedNamespacePrefix(moxygen::TrackNamespace allowed) {
allowedNamespacePrefix_ = std::move(allowed);
}
Expand Down Expand Up @@ -247,17 +268,41 @@ class MoqxRelay : public moxygen::Publisher,
void forwardChanged(moxygen::MoQForwarder* forwarder, bool forward) override;
void newGroupRequested(moxygen::MoQForwarder* forwarder, uint64_t group) override;

// FTN-keyed impl variants — called by CrossExecForwarderCallback (relay exec)
// or directly from the non-cross-exec callbacks above.
friend class CrossExecForwarderCallback;
void onEmptyImpl(const moxygen::FullTrackName& ftn);
void forwardChangedImpl(const moxygen::FullTrackName& ftn, bool forward);
void newGroupRequestedImpl(const moxygen::FullTrackName& ftn, uint64_t group);

folly::coro::Task<void> publishNamespaceToSession(
std::shared_ptr<moxygen::MoQSession> session,
moxygen::PublishNamespace pubNs,
std::shared_ptr<NamespaceTree::NamespaceNode> nodePtr
);

folly::coro::Task<void> publishToSession(
// Sync setup: addSubscriber → set pinned → session->publish (via
// SubscriberCrossExecFilter when subscriberExec is non-null) → set
// trackConsumer. Returns nullopt and cleans up on any synchronous failure.
struct PreparedPublish {
std::shared_ptr<moxygen::MoQForwarder::Subscriber> subscriber;
folly::coro::Task<folly::Expected<moxygen::PublishOk, moxygen::PublishError>> reply;
};
std::optional<PreparedPublish> startPublish(
std::shared_ptr<moxygen::MoQSession> session,
std::shared_ptr<moxygen::MoQForwarder> forwarder,
bool forward,
bool trackFilterSubscriber = false
bool pinned,
folly::Executor* subscriberExec
);

// Calls startPublish and fires the reply as a free-running coroutine.
// Returns false and cleans up on any synchronous failure.
bool addSubscriberAndPublish(
std::shared_ptr<moxygen::MoQSession> session,
std::shared_ptr<moxygen::MoQForwarder> forwarder,
bool forward,
bool pinned
);

folly::coro::Task<void>
Expand Down Expand Up @@ -319,6 +364,54 @@ class MoqxRelay : public moxygen::Publisher,
const moxygen::FullTrackName& ftn,
std::shared_ptr<moxygen::TrackConsumer> consumer
);

// Impl methods — run on relayExec_ when set, or inline when relayExec_==nullptr.
folly::coro::Task<SubscribeResult>
subscribeImpl(moxygen::SubscribeRequest subReq, std::shared_ptr<moxygen::TrackConsumer> consumer);
folly::coro::Task<FetchResult>
fetchImpl(moxygen::Fetch fetch, std::shared_ptr<moxygen::FetchConsumer> consumer);
folly::coro::Task<SubscribeNamespaceResult> subscribeNamespaceImpl(
moxygen::SubscribeNamespace subNs,
std::shared_ptr<NamespacePublishHandle> namespacePublishHandle
);
folly::coro::Task<moxygen::Subscriber::PublishNamespaceResult> publishNamespaceImpl(
moxygen::PublishNamespace pubNs,
std::shared_ptr<moxygen::Subscriber::PublishNamespaceCallback> callback
);
folly::coro::Task<moxygen::Publisher::TrackStatusResult> trackStatusImpl(moxygen::TrackStatus req
);
folly::coro::Task<void> onUpstreamConnectImpl(std::shared_ptr<moxygen::MoQSession> session);

// Contains all the inline publish() logic, taking session explicitly so it
// can be called from either the I/O thread (relayExec_==nullptr) or from
// coPublish on relay exec (where getRequestSession() would return null).
PublishResult publishWithSession(
moxygen::PublishRequest pub,
std::shared_ptr<moxygen::Publisher::SubscriptionHandle> handle,
std::shared_ptr<moxygen::MoQSession> session
);

std::shared_ptr<folly::Executor> ownedRelayExec_;
folly::Executor* relayExec_{nullptr};
// Only set in single-threaded mode (relayExec_ == null); used as the
// coroutine start executor for fire-and-forget tasks like doSubscribeUpdate.
folly::Executor* sessionExec_{nullptr};

void maybeSetSessionExec(moxygen::MoQSession& session) {
if (!relayExec_ && !sessionExec_) {
sessionExec_ = session.getExecutor();
}
}

folly::Executor* relayExec() const { return relayExec_ ? relayExec_ : sessionExec_; }

std::shared_ptr<moxygen::Publisher> findUpstreamPublisher(const moxygen::TrackNamespace& ns) {
auto session = namespaceTree_.findPublisherSession(ns);
if (!session) {
return nullptr;
}
return maybeWrapPublisher(relayExec_, std::move(session));
}
std::unique_ptr<MoqxCache> cache_;
uint64_t maxDeselected_{kDefaultMaxDeselected};

Expand All @@ -329,12 +422,15 @@ class MoqxRelay : public moxygen::Publisher,
};

// Creates a NamespacePublishHandle that bridges NAMESPACE/NAMESPACE_DONE
// messages from a peer relay into relay->doPublishNamespace() synchronously.
// Used for both the initiating (UpstreamProvider) and reciprocal (MoqxRelay) paths.
// messages from a peer relay into relay->doPublishNamespace(). When relayExec
// is non-null, callbacks are dispatched to it so relay state is only mutated
// on the relay executor thread. Used for both the initiating (UpstreamProvider)
// and reciprocal (MoqxRelay) paths.
std::shared_ptr<moxygen::Publisher::NamespacePublishHandle> makeNamespaceBridgeHandle(
std::weak_ptr<MoqxRelay> relay,
std::shared_ptr<moxygen::MoQSession> session,
std::string peerID = {}
std::string peerID = {},
folly::Executor* relayExec = nullptr
);

} // namespace openmoq::moqx
77 changes: 60 additions & 17 deletions src/MoqxRelayContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@
*/

#include "MoqxRelayContext.h"
#include "relay/PublisherCrossExecFilter.h"
#include "relay/RelayExecUtil.h"
#include "relay/SubscriberCrossExecFilter.h"
#include "stats/MoQStatsCollector.h"
#include <moxygen/events/MoQFollyExecutorImpl.h>
#include <moxygen/util/InsecureVerifierDangerousDoNotUseInProduction.h>

#include <folly/coro/Task.h>
#include <folly/executors/thread_factory/NamedThreadFactory.h>
#include <folly/logging/xlog.h>

using namespace moxygen;
Expand All @@ -18,11 +22,27 @@ namespace openmoq::moqx {

MoqxRelayContext::MoqxRelayContext(
const folly::F14FastMap<std::string, config::ServiceConfig>& services,
const std::string& relayID
const std::string& relayID,
bool useRelayThread
)
: serviceMatcher_(services), relayID_(relayID) {
for (const auto& [name, svc] : services) {
services_.emplace(name, ServiceEntry{svc, std::make_shared<MoqxRelay>(svc.cache, relayID)});
if (useRelayThread && !services.empty()) {
relayThreadPool_ = std::make_shared<folly::IOThreadPoolExecutor>(
services.size(),
std::make_shared<folly::NamedThreadFactory>("moqx-relay")
);
auto evbs = relayThreadPool_->getAllEventBases();
XCHECK_EQ(evbs.size(), services.size());
size_t i = 0;
for (const auto& [name, svc] : services) {
auto relay = std::make_shared<MoqxRelay>(svc.cache, relayID);
relay->setRelayExec(std::make_shared<moxygen::MoQFollyExecutorImpl>(evbs[i++].get()));
services_.emplace(name, ServiceEntry{svc, std::move(relay)});
}
} else {
for (const auto& [name, svc] : services) {
services_.emplace(name, ServiceEntry{svc, std::make_shared<MoqxRelay>(svc.cache, relayID)});
}
}
}

Expand Down Expand Up @@ -53,26 +73,39 @@ void MoqxRelayContext::initUpstreams(folly::EventBase* workerEvb) {
CHECK(workerEvb) << "initUpstreams: workerEvb must not be null";
workerEvb_ = workerEvb;

// Use the provided worker EVB for all upstream connections.
// Per-EVB providers (one per worker thread) are a follow-up.
auto exec = std::make_shared<MoQFollyExecutorImpl>(workerEvb);

auto workerExec = std::make_shared<moxygen::MoQFollyExecutorImpl>(workerEvb);
for (auto& [name, entry] : services_) {
if (!entry.config.upstream) {
continue;
}
const auto& cfg = *entry.config.upstream;
auto verifier = makeUpstreamVerifier(cfg.tls);
auto relay = entry.relay;
auto onConnect = [relay](std::shared_ptr<MoQSession> session) -> folly::coro::Task<void> {
co_await relay->onUpstreamConnect(session);
auto* relayExec = relay->getRelayExec();
auto onConnect = [relay,
relayExec](std::shared_ptr<MoQSession> session) -> folly::coro::Task<void> {
if (relayExec) {
co_return co_await folly::coro::co_withExecutor(
folly::getKeepAliveToken(relayExec),
relay->onUpstreamConnect(session)
);
}
co_return co_await relay->onUpstreamConnect(session);
};
auto onDisconnect = [relay, relayExec]() {
runOnExec(relayExec, [relay]() { relay->onUpstreamDisconnect(); });
};
auto onDisconnect = [relay]() { relay->onUpstreamDisconnect(); };
std::shared_ptr<moxygen::Publisher> pubHandler = relay;
std::shared_ptr<moxygen::Subscriber> subHandler = relay;
if (relayExec) {
pubHandler = std::make_shared<PublisherCrossExecFilter>(relayExec, relay);
subHandler = std::make_shared<SubscriberCrossExecFilter>(relayExec, relay);
}
auto provider = std::make_shared<UpstreamProvider>(
exec,
workerExec,
proxygen::URL(cfg.url),
/*publishHandler=*/entry.relay,
/*subscribeHandler=*/entry.relay,
/*publishHandler=*/pubHandler,
/*subscribeHandler=*/subHandler,
verifier,
std::move(onConnect),
std::move(onDisconnect),
Expand All @@ -84,7 +117,7 @@ void MoqxRelayContext::initUpstreams(folly::EventBase* workerEvb) {
// Eagerly connect so the peering handshake fires before any subscribers
// arrive. The connection is lazy in UpstreamProvider but we kick it off
// now so the upstream namespace tree is ready.
co_withExecutor(workerEvb, provider->start()).start();
co_withExecutor(workerExec.get(), provider->start()).start();
}
}

Expand Down Expand Up @@ -158,11 +191,21 @@ folly::Expected<folly::Unit, SessionCloseErrorCode> MoqxRelayContext::validateAu
return folly::makeUnexpected(SessionCloseErrorCode::INVALID_AUTHORITY);
}

// Route: set per-service relay as handler
// Route: set per-service relay as handler, wrapping in cross-exec filters if needed
auto it = services_.find(*matchedName);
CHECK(it != services_.end()) << "Service '" << *matchedName << "' matched but no entry found";
session->setPublishHandler(it->second.relay);
session->setSubscribeHandler(it->second.relay);
auto* relayExec = it->second.relay->getRelayExec();
if (relayExec) {
session->setPublishHandler(
std::make_shared<PublisherCrossExecFilter>(relayExec, it->second.relay)
);
session->setSubscribeHandler(
std::make_shared<SubscriberCrossExecFilter>(relayExec, it->second.relay)
);
} else {
session->setPublishHandler(it->second.relay);
session->setSubscribeHandler(it->second.relay);
}
return folly::unit;
}

Expand Down
9 changes: 8 additions & 1 deletion src/MoqxRelayContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <folly/Expected.h>
#include <folly/container/F14Map.h>
#include <folly/coro/Task.h>
#include <folly/executors/IOThreadPoolExecutor.h>
#include <folly/io/async/EventBase.h>

#include <optional>
Expand Down Expand Up @@ -54,7 +55,8 @@ class MoqxRelayContext {

MoqxRelayContext(
const folly::F14FastMap<std::string, config::ServiceConfig>& services,
const std::string& relayID
const std::string& relayID,
bool useRelayThread = true
);

void setStatsRegistry(std::shared_ptr<stats::StatsRegistry> registry);
Expand Down Expand Up @@ -108,6 +110,11 @@ class MoqxRelayContext {
);

private:
// When use_relay_thread=true: one dedicated thread per service, each with its
// own executor, isolating relay state from I/O threads. Null when disabled.
// Each relay owns its MoQFollyExecutorImpl; the pool just keeps threads alive.
std::shared_ptr<folly::IOThreadPoolExecutor> relayThreadPool_;

folly::F14FastMap<std::string, ServiceEntry> services_;
ServiceMatcher serviceMatcher_;
std::string relayID_;
Expand Down
Loading
Loading