From 87344a3036849b295dfaef4374ea80177407c21d Mon Sep 17 00:00:00 2001 From: afrind Date: Sun, 31 May 2026 23:15:19 -0400 Subject: [PATCH 1/3] test: parameterize relay tests for SingleThread and MultiThread modes Converts all MoQRelayTest TEST_F cases to TEST_P and instantiates them under AllModes/MoQRelayTest with named variants (SingleThread, MultiThread). MoqxTrackFilterTest overrides relayMode() to always return SingleThread so it stays as TEST_F without GetParam() UB. MoqxRelayTestModes.cpp is added only to moqx_relay_test (not the shared fixture library) to avoid a GTest warning in moqx_track_filter_test. --- test/CMakeLists.txt | 1 + test/MoqxRelayDataPlaneTests.cpp | 6 +++--- test/MoqxRelayFetchTests.cpp | 2 +- test/MoqxRelayNGRTests.cpp | 12 ++++++------ test/MoqxRelayPeerTests.cpp | 14 +++++++------- test/MoqxRelayPublishTests.cpp | 20 ++++++++++---------- test/MoqxRelaySubNsTests.cpp | 12 ++++++------ test/MoqxRelaySubscribeTests.cpp | 4 ++-- test/MoqxRelayTestFixture.h | 16 +++++++++++++++- test/MoqxRelayTestModes.cpp | 22 ++++++++++++++++++++++ test/MoqxRelayTrackStatusTests.cpp | 6 +++--- test/MoqxTrackFilterTest.cpp | 2 ++ 12 files changed, 78 insertions(+), 39 deletions(-) create mode 100644 test/MoqxRelayTestModes.cpp diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index bc86edca..dd480153 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -39,6 +39,7 @@ add_executable(moqx_relay_test MoqxRelayTrackStatusTests.cpp MoqxRelayNGRTests.cpp MoqxRelayPeerTests.cpp + MoqxRelayTestModes.cpp ) target_link_libraries(moqx_relay_test PRIVATE moqx_test_fixture diff --git a/test/MoqxRelayDataPlaneTests.cpp b/test/MoqxRelayDataPlaneTests.cpp index 2aa06061..e1daf0a7 100644 --- a/test/MoqxRelayDataPlaneTests.cpp +++ b/test/MoqxRelayDataPlaneTests.cpp @@ -14,7 +14,7 @@ namespace moxygen::test { // new ones. // Sequence: publish, 2 subscribers, beginSubgroup, beginSubgroup again -> // first consumers get reset, both subscribers get new consumers. -TEST_F(MoQRelayTest, DuplicateSubgroupReplacesActiveConsumers) { +TEST_P(MoQRelayTest, DuplicateSubgroupReplacesActiveConsumers) { auto publisherSession = createMockSession(); auto sub1 = createMockSession(); auto sub2 = createMockSession(); @@ -75,7 +75,7 @@ TEST_F(MoQRelayTest, DuplicateSubgroupReplacesActiveConsumers) { // Test: Duplicate beginSubgroup after all subscribers have stop_sending'd // returns CANCELLED to propagate the signal back to the publisher. -TEST_F(MoQRelayTest, DuplicateSubgroupCancelledWhenNoActiveConsumers) { +TEST_P(MoQRelayTest, DuplicateSubgroupCancelledWhenNoActiveConsumers) { auto publisherSession = createMockSession(); auto subscriber = createMockSession(); @@ -114,7 +114,7 @@ TEST_F(MoQRelayTest, DuplicateSubgroupCancelledWhenNoActiveConsumers) { // Test: Duplicate beginSubgroup with partial stop_sending - active subscriber // gets reset and new consumer; tombstoned subscriber is skipped. -TEST_F(MoQRelayTest, DuplicateSubgroupSkipsTombstonedSubscriber) { +TEST_P(MoQRelayTest, DuplicateSubgroupSkipsTombstonedSubscriber) { auto publisherSession = createMockSession(); auto subA = createMockSession(); auto subB = createMockSession(); diff --git a/test/MoqxRelayFetchTests.cpp b/test/MoqxRelayFetchTests.cpp index b7f488c4..948bbf64 100644 --- a/test/MoqxRelayFetchTests.cpp +++ b/test/MoqxRelayFetchTests.cpp @@ -14,7 +14,7 @@ namespace moxygen::test { // crash. When findPublishNamespaceSession returns null (no publishNamespace), // fetch falls back to subscriptions_. After onPublishDone, upstream is null // but the subscription entry remains if the forwarder has subscribers. -TEST_F(MoQRelayTest, FetchAfterPublisherTermination) { +TEST_P(MoQRelayTest, FetchAfterPublisherTermination) { auto publisherSession = createMockSession(); auto subSession = createMockSession(); auto fetchSession = createMockSession(); diff --git a/test/MoqxRelayNGRTests.cpp b/test/MoqxRelayNGRTests.cpp index 67a45b5b..827fb86d 100644 --- a/test/MoqxRelayNGRTests.cpp +++ b/test/MoqxRelayNGRTests.cpp @@ -12,7 +12,7 @@ namespace moxygen::test { // Test: relay PUBLISH path – dynamic groups from PublishRequest extensions // is stored in the forwarder and forwarded to every downstream subscriber -TEST_F(MoQRelayTest, RelayPublishPropagatesDynamicGroupsToSubscribers) { +TEST_P(MoQRelayTest, RelayPublishPropagatesDynamicGroupsToSubscribers) { auto publisherSession = createMockSession(); auto subscriberSession = createMockSession(); @@ -45,7 +45,7 @@ TEST_F(MoQRelayTest, RelayPublishPropagatesDynamicGroupsToSubscribers) { // Test: relay SUBSCRIBE path – dynamic groups from the upstream SubscribeOk is // stored in the forwarder and forwarded to both the first and late-joining // downstream subscribers -TEST_F(MoQRelayTest, RelaySubscribePropagatesDynamicGroupsToAllSubscribers) { +TEST_P(MoQRelayTest, RelaySubscribePropagatesDynamicGroupsToAllSubscribers) { auto publisherSession = createMockSession(); auto subscriber1 = createMockSession(); auto subscriber2 = createMockSession(); @@ -92,7 +92,7 @@ TEST_F(MoQRelayTest, RelaySubscribePropagatesDynamicGroupsToAllSubscribers) { // Relay test: When a late-joining subscriber sends NEW_GROUP_REQUEST in its // SUBSCRIBE, the relay forwards it upstream via REQUEST_UPDATE -TEST_F(MoQRelayTest, RelaySubscribeLateJoinerNGRForwardedUpstream) { +TEST_P(MoQRelayTest, RelaySubscribeLateJoinerNGRForwardedUpstream) { auto publisherSession = createMockSession(); auto subscriber1 = createMockSession(); auto subscriber2 = createMockSession(); @@ -163,7 +163,7 @@ TEST_F(MoQRelayTest, RelaySubscribeLateJoinerNGRForwardedUpstream) { // Relay test: A downstream subscriber sending REQUEST_UPDATE with // NEW_GROUP_REQUEST causes the relay to cascade the NGR upstream -TEST_F(MoQRelayTest, RelayRequestUpdateNGRCascadedUpstream) { +TEST_P(MoQRelayTest, RelayRequestUpdateNGRCascadedUpstream) { auto publisherSession = createMockSession(); auto subscriberSession = createMockSession(); @@ -218,7 +218,7 @@ TEST_F(MoQRelayTest, RelayRequestUpdateNGRCascadedUpstream) { // Relay test: downstream subscriber returns PublishOk carrying NEW_GROUP_REQUEST; // relay cascades NGR to the publisher handle upstream via REQUEST_UPDATE -TEST_F(MoQRelayTest, PublishOkNewNGRForwardedUpstream) { +TEST_P(MoQRelayTest, PublishOkNewNGRForwardedUpstream) { auto publisherSession = createMockSession(); auto subscriberSession = createMockSession(); @@ -279,7 +279,7 @@ TEST_F(MoQRelayTest, PublishOkNewNGRForwardedUpstream) { // Relay test: a second subscriber returning the same NEW_GROUP_REQUEST value in // its PublishOk is deduplicated; the upstream handle receives exactly one // REQUEST_UPDATE -TEST_F(MoQRelayTest, PublishOkDuplicateNGRNotForwardedUpstream) { +TEST_P(MoQRelayTest, PublishOkDuplicateNGRNotForwardedUpstream) { auto publisherSession = createMockSession(); auto subscriber1 = createMockSession(); auto subscriber2 = createMockSession(); diff --git a/test/MoqxRelayPeerTests.cpp b/test/MoqxRelayPeerTests.cpp index dbbc5769..086a0d0d 100644 --- a/test/MoqxRelayPeerTests.cpp +++ b/test/MoqxRelayPeerTests.cpp @@ -12,7 +12,7 @@ namespace moxygen::test { // Test: makeNamespaceBridgeHandle routes namespaceMsg to doPublishNamespace -TEST_F(MoQRelayTest, NamespaceBridgeHandleForwardsNamespaceMsg) { +TEST_P(MoQRelayTest, NamespaceBridgeHandleForwardsNamespaceMsg) { auto peerSession = createMockSession(); // Bridge handle routes NAMESPACE messages from peerSession into the relay. @@ -32,7 +32,7 @@ TEST_F(MoQRelayTest, NamespaceBridgeHandleForwardsNamespaceMsg) { } // Test: makeNamespaceBridgeHandle routes namespaceDoneMsg to doPublishNamespaceDone -TEST_F(MoQRelayTest, NamespaceBridgeHandleForwardsDoneMsg) { +TEST_P(MoQRelayTest, NamespaceBridgeHandleForwardsDoneMsg) { auto peerSession = createMockSession(); auto handle = makeNamespaceBridgeHandle(relay_, peerSession); @@ -65,7 +65,7 @@ TEST_F(MoQRelayTest, NamespaceBridgeHandleForwardsDoneMsg) { // // Production relays negotiate draft-16 (empty prefix allowed). The delivery // path for draft-16 is synchronous: namespacePublishHandle->namespaceMsg(). -TEST_F(MoQRelayTest, PeerNamespaceNotEchoedBackOnReconnect) { +TEST_P(MoQRelayTest, PeerNamespaceNotEchoedBackOnReconnect) { // Relay must have a relayID for peer detection to activate. resetRelay(std::make_shared(config::CacheConfig{.maxCachedTracks = 0}, "sg-sin-2-1")); relay_->setAllowedNamespacePrefix(kAllowedPrefix); @@ -107,7 +107,7 @@ TEST_F(MoQRelayTest, PeerNamespaceNotEchoedBackOnReconnect) { // Complement: namespaces from LOCAL publishers (not from the peer) must still // be delivered when that peer subscribes. -TEST_F(MoQRelayTest, LocalNamespaceDeliveredToPeerOnReconnect) { +TEST_P(MoQRelayTest, LocalNamespaceDeliveredToPeerOnReconnect) { resetRelay(std::make_shared(config::CacheConfig{.maxCachedTracks = 0}, "sg-sin-2-1")); relay_->setAllowedNamespacePrefix(kAllowedPrefix); @@ -183,7 +183,7 @@ class PeerAnnounceSession : public NiceMock { // directly via makeNamespaceBridgeHandle), this test goes through the full // publisherInterface()->subscribeNamespace() production path so the bug in the call-site is // exercised. -TEST_F(MoQRelayTest, PeerNamespaceNotEchoedBack_FullProductionPath) { +TEST_P(MoQRelayTest, PeerNamespaceNotEchoedBack_FullProductionPath) { resetRelay(std::make_shared(config::CacheConfig{.maxCachedTracks = 0}, "sg-sin-2-1")); relay_->setAllowedNamespacePrefix(kAllowedPrefix); @@ -237,7 +237,7 @@ TEST_F(MoQRelayTest, PeerNamespaceNotEchoedBack_FullProductionPath) { // without graceful namespaceDoneMsg calls, tree entries it created must be // cleaned up so stale sourceSession shared_ptrs don't keep dead session objects // alive and downstream subscribers receive NAMESPACE_DONE. -TEST_F(MoQRelayTest, BridgeHandleDestructorCleansUpNamespaces) { +TEST_P(MoQRelayTest, BridgeHandleDestructorCleansUpNamespaces) { auto upstreamSession = createMockSession(); // Simulate the bridge path: create a handle and announce a namespace through @@ -264,7 +264,7 @@ TEST_F(MoQRelayTest, BridgeHandleDestructorCleansUpNamespaces) { // Verify that when a new publisher takes over a namespace before the old // bridge handle is destroyed, the stale handle's destructor does NOT evict // the new publisher's entry (doPublishNamespaceDone guards on sourceSession). -TEST_F(MoQRelayTest, BridgeHandleDestructorDoesNotEvictNewPublisher) { +TEST_P(MoQRelayTest, BridgeHandleDestructorDoesNotEvictNewPublisher) { auto session1 = createMockSession(); auto session2 = createMockSession(); diff --git a/test/MoqxRelayPublishTests.cpp b/test/MoqxRelayPublishTests.cpp index a93c13a8..ef857123 100644 --- a/test/MoqxRelayPublishTests.cpp +++ b/test/MoqxRelayPublishTests.cpp @@ -11,7 +11,7 @@ namespace moxygen::test { // Test: Verify allowed namespace prefix is set correctly -TEST_F(MoQRelayTest, AllowedNamespacePrefix) { +TEST_P(MoQRelayTest, AllowedNamespacePrefix) { // This just verifies the relay can be constructed with a namespace prefix // More detailed testing requires full session setup auto relay2 = std::make_shared(config::CacheConfig{ @@ -23,7 +23,7 @@ TEST_F(MoQRelayTest, AllowedNamespacePrefix) { } // Test: Publish a track through the relay -TEST_F(MoQRelayTest, PublishSuccess) { +TEST_P(MoQRelayTest, PublishSuccess) { auto publisherSession = createMockSession(); // Publish the namespace @@ -48,7 +48,7 @@ TEST_F(MoQRelayTest, PublishSuccess) { // Test: Extensions from publish are forwarded to subscribers via // subscribeNamespace -TEST_F(MoQRelayTest, PublishExtensionsForwardedToSubscribers) { +TEST_P(MoQRelayTest, PublishExtensionsForwardedToSubscribers) { auto publisherSession = createMockSession(); auto subscriber = createMockSession(); @@ -107,7 +107,7 @@ TEST_F(MoQRelayTest, PublishExtensionsForwardedToSubscribers) { // ============================================================ // Test: Extensions from publish are forwarded to late-joining subscribers -TEST_F(MoQRelayTest, PublishExtensionsForwardedToLateJoiners) { +TEST_P(MoQRelayTest, PublishExtensionsForwardedToLateJoiners) { auto publisherSession = createMockSession(); auto subscriber1 = createMockSession(); auto subscriber2 = createMockSession(); @@ -199,7 +199,7 @@ TEST_F(MoQRelayTest, PublishExtensionsForwardedToLateJoiners) { // 4. Session A reconnects and re-publishes the same track. The multipublisher // check finds the surviving entry and calls it->second.handle->unsubscribe() // — null-pointer dereference, SIGSEGV. -TEST_F(MoQRelayTest, PublisherReconnectWithOpenSubgroupNoSegfault) { +TEST_P(MoQRelayTest, PublisherReconnectWithOpenSubgroupNoSegfault) { auto publisherSession = createMockSession(); auto subscriberSession = createMockSession(); @@ -260,7 +260,7 @@ TEST_F(MoQRelayTest, PublisherReconnectWithOpenSubgroupNoSegfault) { // old forwarder's subscribers must receive publishDone, and the new // publish-path subscription must be fully functional (accepting data from the // new publisher). -TEST_F(MoQRelayTest, PublishReplacesSubscribeDrainsOldAndServesNew) { +TEST_P(MoQRelayTest, PublishReplacesSubscribeDrainsOldAndServesNew) { auto publisherSession = createMockSession(); auto subscriberSession = createMockSession(); @@ -334,7 +334,7 @@ TEST_F(MoQRelayTest, PublishReplacesSubscribeDrainsOldAndServesNew) { // ScopeGuardImplBase::terminate() → std::terminate (exit code 139). // // Without the fix: crashes. With the fix: subscribe returns an error cleanly. -TEST_F(MoQRelayTest, PublishReconnectDuringSubscribeScopeGuardCrash) { +TEST_P(MoQRelayTest, PublishReconnectDuringSubscribeScopeGuardCrash) { auto publisherSession1 = createMockSession(); auto publisherSession2 = createMockSession(); auto subscriberSession = createMockSession(); @@ -423,7 +423,7 @@ TEST_F(MoQRelayTest, PublishReconnectDuringSubscribeScopeGuardCrash) { // entry (promise already satisfied), and rsub.promise.setValue() throws // PromiseAlreadySatisfied, which propagates as an unhandled coroutine exception. // With the fix: subscribe returns SUBSCRIBE_ERROR "publisher reconnected". -TEST_F(MoQRelayTest, PublishReconnectDuringSubscribeSuccessPathCrash) { +TEST_P(MoQRelayTest, PublishReconnectDuringSubscribeSuccessPathCrash) { auto publisherSession1 = createMockSession(); auto publisherSession2 = createMockSession(); auto subscriberSession = createMockSession(); @@ -498,7 +498,7 @@ TEST_F(MoQRelayTest, PublishReconnectDuringSubscribeSuccessPathCrash) { // Regression: after publishDone the namespace-tree node must be pruned when // the track was the only remaining content. (Was a bug before unpublishTrack // gained a NodeMutationGuard; kept as a regression guard.) -TEST_F(MoQRelayTest, PublishDonePrunesNamespaceTreeNode) { +TEST_P(MoQRelayTest, PublishDonePrunesNamespaceTreeNode) { auto publisher = createMockSession(); doPublishNamespace(publisher, kTestNamespace); @@ -541,7 +541,7 @@ TEST_F(MoQRelayTest, PublishDonePrunesNamespaceTreeNode) { } // Empty namespace: publishNamespace with an empty TrackNamespace must not crash. -TEST_F(MoQRelayTest, EmptyNamespacePublishNamespaceDone) { +TEST_P(MoQRelayTest, EmptyNamespacePublishNamespaceDone) { auto publisher = createMockSession(); TrackNamespace emptyNs{{}}; diff --git a/test/MoqxRelaySubNsTests.cpp b/test/MoqxRelaySubNsTests.cpp index bf63e113..c89d2cf3 100644 --- a/test/MoqxRelaySubNsTests.cpp +++ b/test/MoqxRelaySubNsTests.cpp @@ -10,7 +10,7 @@ namespace moxygen::test { -TEST_F(MoQRelayTest, SubscribeNamespaceDoesntAddDrainingPublish) { +TEST_P(MoQRelayTest, SubscribeNamespaceDoesntAddDrainingPublish) { auto publisherSession = createMockSession(); auto subscriber1 = createMockSession(); auto subscriber2 = createMockSession(); @@ -89,7 +89,7 @@ TEST_F(MoQRelayTest, SubscribeNamespaceDoesntAddDrainingPublish) { removeSession(subscriber2); } -TEST_F(MoQRelayTest, SubscribeNamespaceEmptyPrefixRejectedPreV16) { +TEST_P(MoQRelayTest, SubscribeNamespaceEmptyPrefixRejectedPreV16) { // Default session uses kVersionDraftCurrent (draft-14, which is < 16) auto session = createMockSession(); @@ -109,7 +109,7 @@ TEST_F(MoQRelayTest, SubscribeNamespaceEmptyPrefixRejectedPreV16) { removeSession(session); } -TEST_F(MoQRelayTest, SubscribeNamespaceEmptyPrefixAllowedV16) { +TEST_P(MoQRelayTest, SubscribeNamespaceEmptyPrefixAllowedV16) { auto session = createMockSession(); // Override the negotiated version to draft-16 ON_CALL(*session, getNegotiatedVersion()) @@ -121,7 +121,7 @@ TEST_F(MoQRelayTest, SubscribeNamespaceEmptyPrefixAllowedV16) { removeSession(session); } -TEST_F(MoQRelayTest, ExactNamespaceSubscriberReceivesPublishNamespace) { +TEST_P(MoQRelayTest, ExactNamespaceSubscriberReceivesPublishNamespace) { auto subscriber = createMockSession(); auto publisher = createMockSession(); @@ -156,7 +156,7 @@ TEST_F(MoQRelayTest, ExactNamespaceSubscriberReceivesPublishNamespace) { // forwarder is empty, the relay fires REQUEST_UPDATE twice — once explicitly // at the if(forwarder->empty()) site and once via forwardChanged() when // addSubscriber() increments numForwardingSubscribers from 0 to 1. -TEST_F(MoQRelayTest, SubscribeNs_ForwardTrue_EmptyForwarder_SingleRequestUpdate) { +TEST_P(MoQRelayTest, SubscribeNs_ForwardTrue_EmptyForwarder_SingleRequestUpdate) { auto pubSession = createMockSession(); doPublishNamespace(pubSession, kTestNamespace); auto mockHandle = makePublishHandle(); @@ -192,7 +192,7 @@ TEST_F(MoQRelayTest, SubscribeNs_ForwardTrue_EmptyForwarder_SingleRequestUpdate) // forwarder is empty, the relay fires a spurious REQUEST_UPDATE(forward=false) // at the if(forwarder->empty()) site — even though the upstream is already at // forward=false (set by publish() which found no subscribers). -TEST_F(MoQRelayTest, SubscribeNs_ForwardFalse_EmptyForwarder_NoRequestUpdate) { +TEST_P(MoQRelayTest, SubscribeNs_ForwardFalse_EmptyForwarder_NoRequestUpdate) { auto pubSession = createMockSession(); doPublishNamespace(pubSession, kTestNamespace); auto mockHandle = makePublishHandle(); diff --git a/test/MoqxRelaySubscribeTests.cpp b/test/MoqxRelaySubscribeTests.cpp index cb2075a1..816b19c2 100644 --- a/test/MoqxRelaySubscribeTests.cpp +++ b/test/MoqxRelaySubscribeTests.cpp @@ -14,7 +14,7 @@ namespace moxygen::test { // terminated (onPublishDone clears handle/upstream). We trigger forwardChanged // via Subscriber::requestUpdate changing forward from true→false (1→0 // transition). The subscriber survives drain because it has an open subgroup. -TEST_F(MoQRelayTest, ForwardChangedAfterPublisherTermination) { +TEST_P(MoQRelayTest, ForwardChangedAfterPublisherTermination) { auto publisherSession = createMockSession(); auto subSession = createMockSession(); @@ -68,7 +68,7 @@ TEST_F(MoQRelayTest, ForwardChangedAfterPublisherTermination) { // twice — once via forwardChanged() (which fires synchronously inside addSubscriber // via addForwardingSubscriber) and once via the explicit block at the end of the // subscribe() else-branch. Analogous to the subscribeNamespace bug fixed in this PR. -TEST_F(MoQRelayTest, Subscribe_SecondForwardingSubscriber_SingleRequestUpdate) { +TEST_P(MoQRelayTest, Subscribe_SecondForwardingSubscriber_SingleRequestUpdate) { auto pubSession = createMockSession(); doPublishNamespace(pubSession, kTestNamespace); auto mockHandle = makePublishHandle(); diff --git a/test/MoqxRelayTestFixture.h b/test/MoqxRelayTestFixture.h index 187c6124..245a1cbe 100644 --- a/test/MoqxRelayTestFixture.h +++ b/test/MoqxRelayTestFixture.h @@ -29,6 +29,18 @@ using namespace openmoq::moqx; namespace moxygen::test { +enum class RelayMode { + SingleThread, +}; + +inline void PrintTo(RelayMode mode, std::ostream* os) { + switch (mode) { + case RelayMode::SingleThread: + *os << "SingleThread"; + return; + } +} + inline const TrackNamespace kTestNamespace{{"test", "namespace"}}; inline const TrackNamespace kAllowedPrefix{{"test"}}; inline const FullTrackName kTestTrackName{kTestNamespace, "track1"}; @@ -48,8 +60,10 @@ class TestMoQExecutor : public MoQFollyExecutorImpl, public folly::DrivableExecu }; // Test fixture for MoqxRelay and NamespaceTree tests. -class MoQRelayTest : public ::testing::Test { +class MoQRelayTest : public ::testing::TestWithParam { protected: + virtual RelayMode relayMode() const { return GetParam(); } + void SetUp() override; void TearDown() override; diff --git a/test/MoqxRelayTestModes.cpp b/test/MoqxRelayTestModes.cpp new file mode 100644 index 00000000..38faea89 --- /dev/null +++ b/test/MoqxRelayTestModes.cpp @@ -0,0 +1,22 @@ +/* + * Copyright (c) OpenMOQ contributors. + */ + +#include "MoqxRelayTestFixture.h" + +namespace moxygen::test { + +INSTANTIATE_TEST_SUITE_P( + AllModes, + MoQRelayTest, + ::testing::Values(RelayMode::SingleThread), + [](const ::testing::TestParamInfo& info) -> std::string { + switch (info.param) { + case RelayMode::SingleThread: + return "SingleThread"; + } + return "Unknown"; + } +); + +} // namespace moxygen::test diff --git a/test/MoqxRelayTrackStatusTests.cpp b/test/MoqxRelayTrackStatusTests.cpp index c4dc7518..b222a10f 100644 --- a/test/MoqxRelayTrackStatusTests.cpp +++ b/test/MoqxRelayTrackStatusTests.cpp @@ -11,7 +11,7 @@ namespace moxygen::test { // Test: TrackStatus on non-existent track -TEST_F(MoQRelayTest, TrackStatusNonExistentTrack) { +TEST_P(MoQRelayTest, TrackStatusNonExistentTrack) { auto clientSession = createMockSession(); // Request trackStatus for a track that doesn't exist @@ -34,7 +34,7 @@ TEST_F(MoQRelayTest, TrackStatusNonExistentTrack) { // Test: TrackStatus on existing track - returns forwarder state (no upstream // call) -TEST_F(MoQRelayTest, TrackStatusSuccessfulForward) { +TEST_P(MoQRelayTest, TrackStatusSuccessfulForward) { auto publisherSession = createMockSession(); auto clientSession = createMockSession(); @@ -67,7 +67,7 @@ TEST_F(MoQRelayTest, TrackStatusSuccessfulForward) { // Verifies that when there's no exact subscription but a publisher has // published a matching namespace prefix, the relay correctly routes // TRACK_STATUS upstream using prefix matching -TEST_F(MoQRelayTest, TrackStatusViaPrefixMatching) { +TEST_P(MoQRelayTest, TrackStatusViaPrefixMatching) { auto publisher = createMockSession(); auto requester = createMockSession(); diff --git a/test/MoqxTrackFilterTest.cpp b/test/MoqxTrackFilterTest.cpp index 50d6f69c..443fae18 100644 --- a/test/MoqxTrackFilterTest.cpp +++ b/test/MoqxTrackFilterTest.cpp @@ -31,6 +31,8 @@ constexpr uint64_t kPropType = 0x100; // audio level property type class MoqxTrackFilterTest : public moxygen::test::MoQRelayTest { protected: + RelayMode relayMode() const override { return RelayMode::SingleThread; } + void SetUp() override { MoQRelayTest::SetUp(); relay_ = std::make_shared( From 5d0c58c5fbb36a1c0404dc28447c04ec85182c50 Mon Sep 17 00:00:00 2001 From: afrind Date: Sun, 31 May 2026 23:43:03 -0400 Subject: [PATCH 2/3] relay_thread config and allow > 1 thread in config MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a use_relay_thread boolean config option (default: true) that controls whether relay state is isolated on a dedicated executor thread. Disabling it is intended for baseline performance comparison only. Also removes the hard error that rejected threads > 1, replacing it with a targeted check: threads > 1 requires use_relay_thread=true. This unlocks the config validation only — threads > 1 with use_relay_thread=true will race on shared relay state until the following commit wires up the dedicated relay executor. --- src/config/Config.h | 1 + src/config/ConfigResolver.cpp | 8 ++++++-- src/config/loader/ParsedConfig.h | 5 +++++ test/config/ConfigResolverTest.cpp | 21 +++++++++++++++++++-- 4 files changed, 31 insertions(+), 4 deletions(-) diff --git a/src/config/Config.h b/src/config/Config.h index 1f45b181..8efeccff 100644 --- a/src/config/Config.h +++ b/src/config/Config.h @@ -196,6 +196,7 @@ struct Config { std::optional admin; std::string relayID; // always set: from config or randomly generated uint32_t threads{1}; + bool useRelayThread{true}; bool mvfstBpfSteering{true}; }; diff --git a/src/config/ConfigResolver.cpp b/src/config/ConfigResolver.cpp index 33d7a8eb..c97e2885 100644 --- a/src/config/ConfigResolver.cpp +++ b/src/config/ConfigResolver.cpp @@ -822,8 +822,11 @@ folly::Expected resolveConfig(const ParsedConfig& c const uint32_t threads = config.threads.value().value_or(1); if (threads == 0) { errors.push_back("threads must be >= 1"); - } else if (threads > 1) { - errors.push_back("threads > 1 is not yet supported"); + } + + const bool useRelayThread = config.use_relay_thread.value().value_or(true); + if (threads > 1 && !useRelayThread) { + errors.push_back("use_relay_thread must be true when threads > 1"); } const bool mvfstBpfSteering = config.mvfst_bpf_steering.value().value_or(true); @@ -877,6 +880,7 @@ folly::Expected resolveConfig(const ParsedConfig& c .admin = std::move(adminConfig), .relayID = std::move(relayID), .threads = threads, + .useRelayThread = useRelayThread, .mvfstBpfSteering = mvfstBpfSteering, }, .warnings = std::move(warnings), diff --git a/src/config/loader/ParsedConfig.h b/src/config/loader/ParsedConfig.h index 178d6a2b..b075ff32 100644 --- a/src/config/loader/ParsedConfig.h +++ b/src/config/loader/ParsedConfig.h @@ -387,6 +387,11 @@ struct ParsedConfig { std::optional> listener_defaults; rfl::Description<"Number of IO worker threads (default: 1)", std::optional> threads; + rfl::Description< + "Dedicate one relay thread per service for relay state isolation (default: true). " + "Disable for baseline performance comparison.", + std::optional> + use_relay_thread; rfl::Description< "Attach a classic BPF reuseport filter to steer QUIC packets to the correct mvfst worker " "based on the connection ID's workerId field (Linux only, mvfst stack only, default: true). " diff --git a/test/config/ConfigResolverTest.cpp b/test/config/ConfigResolverTest.cpp index ac08fc73..37e8d82c 100644 --- a/test/config/ConfigResolverTest.cpp +++ b/test/config/ConfigResolverTest.cpp @@ -920,12 +920,29 @@ TEST(ResolveConfig, ThreadsZeroRejected) { EXPECT_THAT(result.error(), HasSubstr("threads must be >= 1")); } -TEST(ResolveConfig, ThreadsGreaterThanOneRejected) { +TEST(ResolveConfig, ThreadsGreaterThanOneAccepted) { auto cfg = makeMinimalInsecureConfig(); cfg.threads = std::optional{2}; auto result = resolveConfig(cfg); + ASSERT_TRUE(result.hasValue()); + EXPECT_EQ(result.value().config.threads, 2u); +} + +TEST(ResolveConfig, UseRelayThreadFalseWithOneThreadAccepted) { + auto cfg = makeMinimalInsecureConfig(); + cfg.use_relay_thread = std::optional{false}; + auto result = resolveConfig(cfg); + ASSERT_TRUE(result.hasValue()); + EXPECT_FALSE(result.value().config.useRelayThread); +} + +TEST(ResolveConfig, UseRelayThreadFalseWithMultipleThreadsRejected) { + auto cfg = makeMinimalInsecureConfig(); + cfg.threads = std::optional{2}; + cfg.use_relay_thread = std::optional{false}; + auto result = resolveConfig(cfg); ASSERT_TRUE(result.hasError()); - EXPECT_THAT(result.error(), HasSubstr("threads > 1 is not yet supported")); + EXPECT_THAT(result.error(), HasSubstr("use_relay_thread must be true when threads > 1")); } // --- multiple listeners tests --- From 0076f92c460ae777684464605e6526668627f566 Mon Sep 17 00:00:00 2001 From: afrind Date: Sun, 31 May 2026 23:43:03 -0400 Subject: [PATCH 3/3] relay: isolate relay state on dedicated executor to support multiple I/O threads --- deps/moxygen | 2 +- src/MoqxRelay.cpp | 356 ++++++++++++++++++++---------- src/MoqxRelay.h | 106 ++++++++- src/MoqxRelayContext.cpp | 77 +++++-- src/MoqxRelayContext.h | 9 +- src/SubscriptionRegistry.cpp | 17 +- src/SubscriptionRegistry.h | 18 +- src/main.cpp | 3 +- src/relay/RelayExecUtil.h | 72 ++++++ test/SubscriptionRegistryTest.cpp | 22 +- 10 files changed, 525 insertions(+), 157 deletions(-) create mode 100644 src/relay/RelayExecUtil.h diff --git a/deps/moxygen b/deps/moxygen index e282ba6c..2dff34e0 160000 --- a/deps/moxygen +++ b/deps/moxygen @@ -1 +1 @@ -Subproject commit e282ba6c871fed0fd0ebc90dc98d4893ae04f9df +Subproject commit 2dff34e0eaf67953c3512c1d6e5be6164770ce38 diff --git a/src/MoqxRelay.cpp b/src/MoqxRelay.cpp index dbe22713..ea0b9371 100644 --- a/src/MoqxRelay.cpp +++ b/src/MoqxRelay.cpp @@ -7,6 +7,9 @@ */ #include "MoqxRelay.h" +#include "relay/CrossExecForwarderCallback.h" +#include "relay/RelayExecUtil.h" +#include "relay/SubscriberCrossExecFilter.h" #include #include #include @@ -28,9 +31,11 @@ class MoqxRelayNamespaceHandle : public Publisher::NamespacePublishHandle { MoqxRelayNamespaceHandle( std::weak_ptr relay, std::shared_ptr session, - std::string peerID = {} + std::string peerID = {}, + folly::Executor* relayExec = nullptr ) - : relay_(std::move(relay)), session_(std::move(session)), peerID_(std::move(peerID)) {} + : relay_(std::move(relay)), session_(std::move(session)), peerID_(std::move(peerID)), + relayExec_(relayExec) {} ~MoqxRelayNamespaceHandle() { auto relay = relay_.lock(); @@ -38,52 +43,68 @@ class MoqxRelayNamespaceHandle : public Publisher::NamespacePublishHandle { return; } for (const auto& ns : activeNamespaces_) { - relay->doPublishNamespaceDone(ns, session_); + runOnExec(relayExec_, [relay, ns, session = session_]() mutable { + relay->doPublishNamespaceDone(ns, session); + }); } } void namespaceMsg(const TrackNamespace& suffix) override { - auto relay = relay_.lock(); - if (!relay || !session_) { - return; - } activeNamespaces_.insert(suffix); PublishNamespace pubNs; pubNs.trackNamespace = suffix; - relay->doPublishNamespace(std::move(pubNs), session_, nullptr, peerID_); + runOnExec( + relayExec_, + [relay = relay_, pubNs = std::move(pubNs), session = session_, peerID = peerID_]() mutable { + if (auto r = relay.lock()) { + r->doPublishNamespace(std::move(pubNs), session, nullptr, peerID); + } + } + ); } void namespaceDoneMsg(const TrackNamespace& suffix) override { - auto relay = relay_.lock(); - if (!relay || !session_) { - return; - } activeNamespaces_.erase(suffix); - relay->doPublishNamespaceDone(suffix, session_); + runOnExec(relayExec_, [relay = relay_, suffix, session = session_]() mutable { + if (auto r = relay.lock()) { + r->doPublishNamespaceDone(suffix, session); + } + }); } private: std::weak_ptr relay_; std::shared_ptr session_; std::string peerID_; + folly::Executor* relayExec_; folly::F14FastSet activeNamespaces_; }; std::shared_ptr makeNamespaceBridgeHandle( std::weak_ptr relay, std::shared_ptr session, - std::string peerID + std::string peerID, + folly::Executor* relayExec ) { return std::make_shared( std::move(relay), std::move(session), - std::move(peerID) + std::move(peerID), + relayExec ); } folly::coro::Task MoqxRelay::onUpstreamConnect(std::shared_ptr session) { - auto nsHandle = makeNamespaceBridgeHandle(weak_from_this(), session); - auto result = co_await session->subscribeNamespace(makePeerSubNs(relayID_), nsHandle); + co_return co_await onUpstreamConnectImpl(std::move(session)); +} + +folly::coro::Task MoqxRelay::onUpstreamConnectImpl(std::shared_ptr session) { + auto nsHandle = makeNamespaceBridgeHandle(weak_from_this(), session, {}, relayExec_); + // subscribeNamespace must run on the upstream session's executor + auto result = co_await folly::coro::co_withExecutor( + folly::getKeepAliveToken(session->getExecutor()), + session->subscribeNamespace(makePeerSubNs(relayID_), nsHandle) + ); if (result.hasValue()) { upstreamSubNsHandle_ = std::move(result.value()); } else { @@ -168,7 +189,7 @@ std::shared_ptr MoqxRelay::doPublishNamespac if (outSession != session && (info.options == SubscribeNamespaceOptions::NAMESPACE || info.options == SubscribeNamespaceOptions::BOTH)) { if (info.namespacePublishHandle) { - // Draft 16+: send NAMESPACE message on the bidi stream + // Draft 16+: send NAMESPACE message on the bidi stream. TrackNamespace suffix(std::vector( pubNs.trackNamespace.trackNamespace.begin() + info.trackNamespacePrefix.size(), pubNs.trackNamespace.trackNamespace.end() @@ -187,6 +208,13 @@ std::shared_ptr MoqxRelay::doPublishNamespac folly::coro::Task MoqxRelay::publishNamespace( PublishNamespace pubNs, std::shared_ptr callback +) { + return publishNamespaceImpl(std::move(pubNs), std::move(callback)); +} + +folly::coro::Task MoqxRelay::publishNamespaceImpl( + PublishNamespace pubNs, + std::shared_ptr callback ) { // TODO: store auth for forwarding on future SubscribeNamespace? auto session = MoQSession::getRequestSession(); @@ -274,20 +302,31 @@ Subscriber::PublishResult MoqxRelay::publish(PublishRequest pub, std::shared_ptr handle) { XLOG(DBG1) << __func__ << " ftn=" << pub.fullTrackName; XCHECK(handle) << "Publish handle cannot be null"; + // Validate before touching relay state (safe to do on calling thread). if (!pub.fullTrackName.trackNamespace.startsWith(allowedNamespacePrefix_)) { return folly::makeUnexpected( PublishError{pub.requestID, PublishErrorCode::UNINTERESTED, "bad namespace"} ); } - if (pub.fullTrackName.trackNamespace.empty()) { return folly::makeUnexpected( PublishError({pub.requestID, PublishErrorCode::INTERNAL_ERROR, "namespace required"}) ); } - + // When relayExec_ is set, SubscriberCrossExecFilter (wired at session + // registration) has already dispatched to relayExec_ before calling this + // method, so getRequestSession() is valid and publishWithSession() runs on + // the correct thread. auto session = MoQSession::getRequestSession(); + maybeSetSessionExec(*session); + return publishWithSession(std::move(pub), std::move(handle), std::move(session)); +} +Subscriber::PublishResult MoqxRelay::publishWithSession( + PublishRequest pub, + std::shared_ptr handle, + std::shared_ptr session +) { // Handle duplicate publisher at relay level before registering in the tree. // Move the forwarder out and erase the entry BEFORE calling publishDone. // publishDone iterates subscribers via forEachSubscriber; if a subscriber @@ -302,10 +341,12 @@ MoqxRelay::publish(PublishRequest pub, std::shared_ptr(pub.fullTrackName, pub.largest); forwarder->setExtensions(pub.extensions); + auto publisherWrapped = maybeWrapPublisher(relayExec_, session); auto publishEntry = registry_.createFromPublish( pub.fullTrackName, forwarder, session, + std::move(publisherWrapped), pub.requestID, std::move(handle), [&](std::shared_ptr f) { return buildFilterChain(pub.fullTrackName, f); } @@ -348,6 +389,14 @@ MoqxRelay::publish(PublishRequest pub, std::shared_ptrsetCallback( + std::make_shared(relayExec_, forwarder, shared_from_this()) + ); + } else { + forwarder->setCallback(shared_from_this()); + } + uint64_t nSubscribers = 0; bool hasTrackFilterSub = false; for (auto& [outSession, info] : sessions) { @@ -360,11 +409,12 @@ MoqxRelay::publish(PublishRequest pub, std::shared_ptrgetExecutor(); - co_withExecutor(exec, publishToSession(outSession, forwarder, info.forward)).start(); + if (!addSubscriberAndPublish(outSession, forwarder, info.forward, /*pinned=*/true)) { + XLOG(ERR) << "addSubscriberAndPublish failed for " << forwarder->fullTrackName(); + continue; + } } } - forwarder->setCallback(shared_from_this()); // Forward if there are direct subscribers OR TRACK_FILTER subscribers // (PropertyRanking needs objects to evaluate property values for ranking). @@ -385,49 +435,90 @@ MoqxRelay::publish(PublishRequest pub, std::shared_ptr MoqxRelay::publishToSession( - std::shared_ptr session, +namespace { + +// Free-function coroutine: awaits the publish reply and calls onPublishOk. +// Holds forwarder alive because subscriber keeps a raw ref into it. +folly::coro::Task awaitPublishReply( std::shared_ptr forwarder, - bool forward, - bool trackFilterSubscriber + std::shared_ptr subscriber, + folly::coro::Task> reply ) { - if (session->isClosed()) { - XLOG(WARN) << "publishToSession: session closed, skipping " << forwarder->fullTrackName(); + auto result = co_await co_awaitTry(std::move(reply)); + if (result.hasException()) { + XLOG(ERR) << "Publish reply exception for " << forwarder->fullTrackName() + << " subscriber=" << subscriber.get() << ": " << result.exception().what(); + subscriber->unsubscribe(); co_return; } - auto subscriber = forwarder->addSubscriber(session, forward); - if (!subscriber) { - XLOG(ERR) << "Subscribe failed: addSubscriber returned null for " << forwarder->fullTrackName(); + if (result.value().hasError()) { + XLOG(ERR) << "Publish reply error for " << forwarder->fullTrackName() + << " subscriber=" << subscriber.get() << ": " << result.value().error().reasonPhrase; + subscriber->unsubscribe(); co_return; } - // Direct subscribers are pinned (not evictable by PropertyRanking). - // TRACK_FILTER subscribers are unpinned so onTrackEvicted can remove them. - subscriber->pinned = !trackFilterSubscriber; - XLOG(DBG4) << "added subscriber for ftn=" << forwarder->fullTrackName(); - auto guard = folly::makeGuard([subscriber] { subscriber->unsubscribe(); }); + XLOG(DBG1) << "Received PublishOk for " << forwarder->fullTrackName() + << " subscriber=" << subscriber.get(); + subscriber->onPublishOk(result.value().value()); +} - auto pubInitial = session->publish(subscriber->getPublishRequest(), subscriber); - if (pubInitial.hasError()) { - XLOG(ERR) << "Publish failed err=" << pubInitial.error().reasonPhrase; - co_return; +} // namespace + +std::optional MoqxRelay::startPublish( + std::shared_ptr session, + std::shared_ptr forwarder, + bool forward, + bool pinned, + folly::Executor* subscriberExec +) { + auto subscriber = forwarder->addSubscriber(session, forward); + if (!subscriber) { + XLOG(ERR) << "startPublish: addSubscriber null for " << forwarder->fullTrackName(); + return std::nullopt; } - subscriber->trackConsumer = std::move(pubInitial->consumer); - auto pubResult = co_await co_awaitTry(std::move(pubInitial->reply)); - if (pubResult.hasException()) { - XLOG(ERR) << "Publish failed err=" << pubResult.exception().what(); - co_return; + subscriber->pinned = pinned; + XLOG(DBG4) << "added subscriber for ftn=" << forwarder->fullTrackName(); + Subscriber::PublishResult pub; + if (subscriberExec) { + SubscriberCrossExecFilter wrapped(subscriberExec, session); + pub = wrapped.publish(subscriber->getPublishRequest(), subscriber); + } else { + pub = session->publish(subscriber->getPublishRequest(), subscriber); } - if (pubResult.value().hasError()) { - XLOG(ERR) << "Publish failed err=" << pubResult.value().error().reasonPhrase; - co_return; + if (pub.hasError()) { + XLOG(ERR) << "startPublish: publish failed: " << pub.error().reasonPhrase; + subscriber->unsubscribe(); + return std::nullopt; } - guard.dismiss(); - XLOG(DBG1) << "Publish OK sess=" << session.get(); - auto& pubOk = pubResult.value().value(); + subscriber->trackConsumer = std::move(pub->consumer); + return PreparedPublish{std::move(subscriber), std::move(pub->reply)}; +} - // Process the PUBLISH_OK response - updates range, forward flag, and - // handles NEW_GROUP_REQUEST forwarding via callback - subscriber->onPublishOk(pubOk); +bool MoqxRelay::addSubscriberAndPublish( + std::shared_ptr session, + std::shared_ptr forwarder, + bool forward, + bool pinned +) { + auto p = startPublish( + session, + forwarder, + forward, + pinned, + relayExec_ ? session->getExecutor() : nullptr + ); + if (!p) { + return false; + } + // Run awaitPublishReply on relayExec_ so onPublishOk and detach() (from + // publishDone) are always on the same thread and cannot race. For + // single-thread (relayExec_ == nullptr) this is the subscriber's exec. + co_withExecutor( + relayExec_ ? static_cast(relayExec_) : session->getExecutor(), + awaitPublishReply(forwarder, std::move(p->subscriber), std::move(p->reply)) + ) + .start(); + return true; } class MoqxRelay::NamespaceSubscription : public Publisher::SubscribeNamespaceHandle { @@ -524,6 +615,13 @@ MoqxRelay::buildFilterChain(const FullTrackName& ftn, std::shared_ptr MoqxRelay::subscribeNamespace( SubscribeNamespace subNs, std::shared_ptr namespacePublishHandle +) { + return subscribeNamespaceImpl(std::move(subNs), std::move(namespacePublishHandle)); +} + +folly::coro::Task MoqxRelay::subscribeNamespaceImpl( + SubscribeNamespace subNs, + std::shared_ptr namespacePublishHandle ) { XLOG(DBG1) << __func__ << " nsp=" << subNs.trackNamespacePrefix; @@ -539,10 +637,11 @@ folly::coro::Task MoqxRelay::subscribeNames << ", reciprocating peer subNs"; // Tag with the peer's relay ID so we suppress echoing these namespaces // back to that peer on reconnect. - auto handle = makeNamespaceBridgeHandle(weak_from_this(), session, incomingPeerID); - auto recipResult = co_await session->subscribeNamespace( - makePeerSubNs(), - handle + auto handle = makeNamespaceBridgeHandle(weak_from_this(), session, incomingPeerID, relayExec_); + // subscribeNamespace must run on the peer session's executor. + auto recipResult = co_await folly::coro::co_withExecutor( + folly::getKeepAliveToken(session->getExecutor()), + session->subscribeNamespace(makePeerSubNs(), handle) ); // no token: reciprocal, prevents loop if (recipResult.hasError()) { XLOG(ERR) << "Reciprocal peer subNs failed: " << recipResult.error().reasonPhrase; @@ -593,7 +692,7 @@ folly::coro::Task MoqxRelay::subscribeNames // If TRACK_FILTER is present, enroll session in PropertyRanking for top-N selection. // NOTE: onSelected callbacks fire synchronously within addSessionToTopNGroup() for - // tracks already in top-N, triggering publishToSession() before this call returns. + // tracks already in top-N, triggering onTrackSelected() before this call returns. if (trackFilter) { auto ranking = getOrCreateRanking(nodePtr, trackFilter->propertyType, subNs.trackNamespacePrefix); @@ -648,7 +747,10 @@ folly::coro::Task MoqxRelay::subscribeNames (subNs.options == SubscribeNamespaceOptions::BOTH || subNs.options == SubscribeNamespaceOptions::PUBLISH)) { if (publishSession != session) { - co_withExecutor(exec, publishToSession(session, forwarder, subNs.forward)).start(); + if (!addSubscriberAndPublish(session, forwarder, subNs.forward, /*pinned=*/true)) { + XLOG(ERR) << "addSubscriberAndPublish failed for " << ftn; + return; + } } } }); @@ -697,7 +799,13 @@ MoqxRelay::PublishState MoqxRelay::findPublishState(const FullTrackName& ftn) { folly::coro::Task MoqxRelay::subscribe(SubscribeRequest subReq, std::shared_ptr consumer) { + return subscribeImpl(std::move(subReq), std::move(consumer)); +} + +folly::coro::Task +MoqxRelay::subscribeImpl(SubscribeRequest subReq, std::shared_ptr consumer) { auto session = MoQSession::getRequestSession(); + maybeSetSessionExec(*session); const auto& ftn = subReq.fullTrackName; if (ftn.trackNamespace.empty()) { @@ -709,8 +817,7 @@ MoqxRelay::subscribe(SubscribeRequest subReq, std::shared_ptr con // TOCTOU fix: if we might be the first subscriber, wait for the upstream // connection before branching. A concurrent coroutine may emplace the entry // while we are suspended, so we re-check inside getOrCreateFromSubscribe. - if (!registry_.exists(ftn) && upstream_ && - !namespaceTree_.findPublisherSession(ftn.trackNamespace)) { + if (!registry_.exists(ftn) && upstream_ && !findUpstreamPublisher(ftn.trackNamespace)) { co_await upstream_->waitForConnected(kUpstreamConnectWaitTimeout); } @@ -727,6 +834,7 @@ MoqxRelay::subscribe(SubscribeRequest subReq, std::shared_ptr con {subReq.requestID, SubscribeErrorCode::TRACK_NOT_EXIST, "no such namespace or track"} )); } // pending destructor fires on early return above + auto upstreamPublisher = maybeWrapPublisher(relayExec_, upstreamSession); // Add subscriber first (with the client's original request) in case objects // arrive before subscribe OK. @@ -751,9 +859,8 @@ MoqxRelay::subscribe(SubscribeRequest subReq, std::shared_ptr con subReq.locType = LocationType::LargestObject; // Per the spec, we're supposed to always forward=1 upstream subReq.forward = first->forwarder->numForwardingSubscribers() > 0; - subReq.requestID = upstreamSession->peekNextRequestID(); - auto subRes = co_await upstreamSession->subscribe(subReq, first->consumer); + auto subRes = co_await upstreamPublisher->subscribe(subReq, first->consumer); if (subRes.hasError()) { co_return folly::makeUnexpected(SubscribeError( {clientRequestID, @@ -777,7 +884,8 @@ MoqxRelay::subscribe(SubscribeRequest subReq, std::shared_ptr con first->forwarder->tryProcessNewGroupRequest(subReq.params, /*fire=*/false); auto requestID = subRes.value()->subscribeOk().requestID; - if (!first->pending.complete(std::move(subRes.value()), requestID, upstreamSession)) { + if (!first->pending + .complete(std::move(subRes.value()), requestID, upstreamSession, upstreamPublisher)) { XLOG(ERR) << "Subscription replaced by reconnecting publisher: " << ftn; co_return folly::makeUnexpected(SubscribeError{ clientRequestID, @@ -818,6 +926,11 @@ MoqxRelay::subscribe(SubscribeRequest subReq, std::shared_ptr con folly::coro::Task MoqxRelay::fetch(Fetch fetch, std::shared_ptr consumer) { + return fetchImpl(std::move(fetch), std::move(consumer)); +} + +folly::coro::Task +MoqxRelay::fetchImpl(Fetch fetch, std::shared_ptr consumer) { auto session = MoQSession::getRequestSession(); // check auth @@ -844,33 +957,30 @@ MoqxRelay::fetch(Fetch fetch, std::shared_ptr consumer) { fetch.args = StandaloneFetch(res.value().start, res.value().end); joining = nullptr; } else { - // Upstream is resolving the subscribe, forward joining fetch - joining->joiningRequestID = fetchView->requestID; + // Upstream is resolving the subscribe; let MoQSession resolve the + // request ID by track name to avoid a cross-executor data race. + joining->joiningRequestID = kAutoRequestID; } } - auto upstreamSession = namespaceTree_.findPublisherSession(fetch.fullTrackName.trackNamespace); - if (!upstreamSession && upstream_) { + auto upstreamPublisher = findUpstreamPublisher(fetch.fullTrackName.trackNamespace); + if (!upstreamPublisher && upstream_) { co_await upstream_->waitForConnected(kUpstreamConnectWaitTimeout); - upstreamSession = namespaceTree_.findPublisherSession(fetch.fullTrackName.trackNamespace); + upstreamPublisher = findUpstreamPublisher(fetch.fullTrackName.trackNamespace); } - if (!upstreamSession) { + if (!upstreamPublisher) { // Attempt to find matching upstream subscription (from publish) if (auto fetchView = registry_.getFetchView(fetch.fullTrackName)) { - upstreamSession = fetchView->upstream; + upstreamPublisher = fetchView->publisher; } - if (!upstreamSession) { + if (!upstreamPublisher) { co_return folly::makeUnexpected( FetchError({fetch.requestID, FetchErrorCode::TRACK_NOT_EXIST, "no upstream for fetch"}) ); } } - if (session.get() == upstreamSession.get()) { - co_return folly::makeUnexpected( - FetchError({fetch.requestID, FetchErrorCode::INTERNAL_ERROR, "self fetch"}) - ); - } fetch.priority = kDefaultUpstreamPriority; + if (!cache_ || joining) { // We can't use the cache on an unresolved joining fetch - we don't know // which objects are being requested. However, once we have that resolved, @@ -879,12 +989,18 @@ MoqxRelay::fetch(Fetch fetch, std::shared_ptr consumer) { XLOG(DBG1) << "Upstream fetch {" << standalone->start.group << "," << standalone->start.object << "}.." << standalone->end.group << "," << standalone->end.object << "}"; } - co_return co_await upstreamSession->fetch(fetch, std::move(consumer)); + co_return co_await upstreamPublisher->fetch(std::move(fetch), std::move(consumer)); } - co_return co_await cache_->fetch(fetch, std::move(consumer), std::move(upstreamSession)); + co_return co_await cache_ + ->fetch(std::move(fetch), std::move(consumer), std::move(upstreamPublisher)); } folly::coro::Task MoqxRelay::trackStatus(TrackStatus trackStatus) { + return trackStatusImpl(std::move(trackStatus)); +} + +folly::coro::Task MoqxRelay::trackStatusImpl(TrackStatus trackStatus +) { XLOG(DBG1) << __func__ << " ftn=" << trackStatus.fullTrackName; if (trackStatus.fullTrackName.trackNamespace.empty()) { @@ -922,25 +1038,26 @@ folly::coro::Task MoqxRelay::trackStatus(TrackStat << " statusCode=" << (uint32_t)statusCode; co_return trackStatusOk; } else { - // No subscription - forward to upstream - auto upstreamSession = - namespaceTree_.findPublisherSession(trackStatus.fullTrackName.trackNamespace); - if (!upstreamSession && upstream_) { - co_await upstream_->waitForConnected(kUpstreamConnectWaitTimeout); - upstreamSession = - namespaceTree_.findPublisherSession(trackStatus.fullTrackName.trackNamespace); + // No active subscription — try registry publisher first, then namespace tree + std::shared_ptr upstreamPublisher; + if (upstreamView) { + upstreamPublisher = upstreamView->publisher; + } else { + upstreamPublisher = findUpstreamPublisher(trackStatus.fullTrackName.trackNamespace); + if (!upstreamPublisher && upstream_) { + co_await upstream_->waitForConnected(kUpstreamConnectWaitTimeout); + upstreamPublisher = findUpstreamPublisher(trackStatus.fullTrackName.trackNamespace); + } } - if (!upstreamSession) { - XLOG(DBG1) << "No upstream session for track: " << trackStatus.fullTrackName; + if (!upstreamPublisher) { + XLOG(DBG1) << "No upstream for track: " << trackStatus.fullTrackName; co_return folly::makeUnexpected(TrackStatusError{ trackStatus.requestID, TrackStatusErrorCode::TRACK_NOT_EXIST, "no such namespace or track" }); } - - // Forward the trackStatus request to the upstream publisher session - auto result = co_await upstreamSession->trackStatus(trackStatus); + auto result = co_await upstreamPublisher->trackStatus(std::move(trackStatus)); if (result.hasError()) { XLOG(DBG1) << "Upstream trackStatus failed: " << result.error().reasonPhrase; @@ -952,7 +1069,10 @@ folly::coro::Task MoqxRelay::trackStatus(TrackStat } void MoqxRelay::onEmpty(MoQForwarder* forwarder) { - const auto& ftn = forwarder->fullTrackName(); + onEmptyImpl(forwarder->fullTrackName()); +} + +void MoqxRelay::onEmptyImpl(const FullTrackName& ftn) { auto upstreamView = registry_.getUpstreamView(ftn); if (!upstreamView) { return; @@ -970,8 +1090,12 @@ void MoqxRelay::onEmpty(MoQForwarder* forwarder) { if (upstreamView->isPublish) { // if it's publish, don't unsubscribe, just subscribeUpdate forward=false XLOG(DBG1) << "Updating upstream subscription forward=false"; - auto exec = upstreamView->upstream->getExecutor(); - co_withExecutor(exec, doSubscribeUpdate(upstreamView->handle, /*forward=*/false)).start(); + auto exec = relayExec(); + co_withExecutor( + folly::getKeepAliveToken(exec), + doSubscribeUpdate(upstreamView->handle, /*forward=*/false) + ) + .start(); } else { upstreamView->handle->unsubscribe(); XLOG(DBG4) << "Erasing subscription to " << ftn; @@ -980,7 +1104,10 @@ void MoqxRelay::onEmpty(MoQForwarder* forwarder) { } void MoqxRelay::forwardChanged(MoQForwarder* forwarder, bool forward) { - const auto& ftn = forwarder->fullTrackName(); + forwardChangedImpl(forwarder->fullTrackName(), forward); +} + +void MoqxRelay::forwardChangedImpl(const FullTrackName& ftn, bool forward) { auto upstreamView = registry_.getUpstreamView(ftn); if (!upstreamView) { return; @@ -996,12 +1123,16 @@ void MoqxRelay::forwardChanged(MoQForwarder* forwarder, bool forward) { } XLOG(INFO) << "Updating forward for " << ftn << " forward=" << forward; - auto exec = upstreamView->upstream->getExecutor(); - co_withExecutor(exec, doSubscribeUpdate(upstreamView->handle, forward)).start(); + auto exec = relayExec(); + co_withExecutor(folly::getKeepAliveToken(exec), doSubscribeUpdate(upstreamView->handle, forward)) + .start(); } void MoqxRelay::newGroupRequested(MoQForwarder* forwarder, uint64_t group) { - const auto& ftn = forwarder->fullTrackName(); + newGroupRequestedImpl(forwarder->fullTrackName(), group); +} + +void MoqxRelay::newGroupRequestedImpl(const FullTrackName& ftn, uint64_t group) { auto upstreamView = registry_.getUpstreamView(ftn); // Check if handle is still valid (publisher may have terminated) if (!upstreamView || !upstreamView->handle) { @@ -1010,9 +1141,10 @@ void MoqxRelay::newGroupRequested(MoQForwarder* forwarder, uint64_t group) { } XLOG(INFO) << "New group request detected for " << ftn; - auto exec = upstreamView->upstream->getExecutor(); + auto exec = relayExec(); auto handle = upstreamView->handle; - co_withExecutor(exec, doNewGroupRequestUpdate(std::move(handle), group)).start(); + co_withExecutor(folly::getKeepAliveToken(exec), doNewGroupRequestUpdate(std::move(handle), group)) + .start(); } // TRACK_FILTER support @@ -1127,8 +1259,8 @@ void MoqxRelay::onTrackSelected( XLOG(DBG4) << "[MoqxRelay] Track selected: " << ftn << " session=" << session.get() << " forward=" << forward; - if (!session || session->isClosed()) { - XLOG(ERR) << "onTrackSelected: session null or closed, skipping " << ftn; + if (!session) { + XLOG(ERR) << "onTrackSelected: null session for " << ftn; return; } @@ -1141,20 +1273,18 @@ void MoqxRelay::onTrackSelected( auto exec = session->getExecutor(); XCHECK(exec) << "onTrackSelected: null executor for session " << session.get(); - // TODO: Consider batching multiple publishToSession calls on the same executor - // when multiple tracks are selected for the same session in a single ranking update. - co_withExecutor( - exec, - publishToSession(session, trackForwarder, forward, /*trackFilterSubscriber=*/true) - ) - .start(); + // TODO: Consider batching multiple addSubscriberAndPublish calls on the same + // executor when multiple tracks are selected for the same session in a single + // ranking update. + // TRACK_FILTER subscribers are unpinned so onTrackEvicted can remove them. + addSubscriberAndPublish(session, trackForwarder, forward, /*pinned=*/false); } void MoqxRelay::onTrackEvicted(const FullTrackName& ftn, std::shared_ptr session) { XLOG(DBG4) << "[MoqxRelay] Track evicted: " << ftn << " session=" << session.get(); - if (!session || session->isClosed()) { - XLOG(WARN) << "onTrackEvicted: session null or closed, skipping " << ftn; + if (!session) { + XLOG(WARN) << "onTrackEvicted: null session for " << ftn; return; } diff --git a/src/MoqxRelay.h b/src/MoqxRelay.h index e2ffe858..f2f9393b 100644 --- a/src/MoqxRelay.h +++ b/src/MoqxRelay.h @@ -14,9 +14,11 @@ #include "UpstreamProvider.h" #include "config/Config.h" #include "relay/PropertyRanking.h" +#include "relay/RelayExecUtil.h" #include #include +#include #include #include #include @@ -24,6 +26,10 @@ #include #include +namespace openmoq::moqx { +class CrossExecForwarderCallback; +} // namespace openmoq::moqx + namespace openmoq::moqx { // Visitor interface for relay state inspection. @@ -115,6 +121,21 @@ class MoqxRelay : public moxygen::Publisher, } } + // Optionally isolate relay state on a dedicated executor thread. + // When set, all public entry points switch to relayExec before touching + // relay state, and consumer callbacks to/from sessions are wrapped with + // cross-executor filters. relayExec must outlive this relay. + // If not set (default), all operations run on the calling thread. + void setRelayExec(folly::Executor* relayExec) { relayExec_ = relayExec; } + + // Takes ownership of exec and uses it as the relay executor. + void setRelayExec(std::shared_ptr exec) { + ownedRelayExec_ = std::move(exec); + relayExec_ = ownedRelayExec_.get(); + } + + folly::Executor* getRelayExec() const { return relayExec_; } + void setAllowedNamespacePrefix(moxygen::TrackNamespace allowed) { allowedNamespacePrefix_ = std::move(allowed); } @@ -247,17 +268,41 @@ class MoqxRelay : public moxygen::Publisher, void forwardChanged(moxygen::MoQForwarder* forwarder, bool forward) override; void newGroupRequested(moxygen::MoQForwarder* forwarder, uint64_t group) override; + // FTN-keyed impl variants — called by CrossExecForwarderCallback (relay exec) + // or directly from the non-cross-exec callbacks above. + friend class CrossExecForwarderCallback; + void onEmptyImpl(const moxygen::FullTrackName& ftn); + void forwardChangedImpl(const moxygen::FullTrackName& ftn, bool forward); + void newGroupRequestedImpl(const moxygen::FullTrackName& ftn, uint64_t group); + folly::coro::Task publishNamespaceToSession( std::shared_ptr session, moxygen::PublishNamespace pubNs, std::shared_ptr nodePtr ); - folly::coro::Task publishToSession( + // Sync setup: addSubscriber → set pinned → session->publish (via + // SubscriberCrossExecFilter when subscriberExec is non-null) → set + // trackConsumer. Returns nullopt and cleans up on any synchronous failure. + struct PreparedPublish { + std::shared_ptr subscriber; + folly::coro::Task> reply; + }; + std::optional startPublish( std::shared_ptr session, std::shared_ptr forwarder, bool forward, - bool trackFilterSubscriber = false + bool pinned, + folly::Executor* subscriberExec + ); + + // Calls startPublish and fires the reply as a free-running coroutine. + // Returns false and cleans up on any synchronous failure. + bool addSubscriberAndPublish( + std::shared_ptr session, + std::shared_ptr forwarder, + bool forward, + bool pinned ); folly::coro::Task @@ -319,6 +364,54 @@ class MoqxRelay : public moxygen::Publisher, const moxygen::FullTrackName& ftn, std::shared_ptr consumer ); + + // Impl methods — run on relayExec_ when set, or inline when relayExec_==nullptr. + folly::coro::Task + subscribeImpl(moxygen::SubscribeRequest subReq, std::shared_ptr consumer); + folly::coro::Task + fetchImpl(moxygen::Fetch fetch, std::shared_ptr consumer); + folly::coro::Task subscribeNamespaceImpl( + moxygen::SubscribeNamespace subNs, + std::shared_ptr namespacePublishHandle + ); + folly::coro::Task publishNamespaceImpl( + moxygen::PublishNamespace pubNs, + std::shared_ptr callback + ); + folly::coro::Task trackStatusImpl(moxygen::TrackStatus req + ); + folly::coro::Task onUpstreamConnectImpl(std::shared_ptr session); + + // Contains all the inline publish() logic, taking session explicitly so it + // can be called from either the I/O thread (relayExec_==nullptr) or from + // coPublish on relay exec (where getRequestSession() would return null). + PublishResult publishWithSession( + moxygen::PublishRequest pub, + std::shared_ptr handle, + std::shared_ptr session + ); + + std::shared_ptr ownedRelayExec_; + folly::Executor* relayExec_{nullptr}; + // Only set in single-threaded mode (relayExec_ == null); used as the + // coroutine start executor for fire-and-forget tasks like doSubscribeUpdate. + folly::Executor* sessionExec_{nullptr}; + + void maybeSetSessionExec(moxygen::MoQSession& session) { + if (!relayExec_ && !sessionExec_) { + sessionExec_ = session.getExecutor(); + } + } + + folly::Executor* relayExec() const { return relayExec_ ? relayExec_ : sessionExec_; } + + std::shared_ptr findUpstreamPublisher(const moxygen::TrackNamespace& ns) { + auto session = namespaceTree_.findPublisherSession(ns); + if (!session) { + return nullptr; + } + return maybeWrapPublisher(relayExec_, std::move(session)); + } std::unique_ptr cache_; uint64_t maxDeselected_{kDefaultMaxDeselected}; @@ -329,12 +422,15 @@ class MoqxRelay : public moxygen::Publisher, }; // Creates a NamespacePublishHandle that bridges NAMESPACE/NAMESPACE_DONE -// messages from a peer relay into relay->doPublishNamespace() synchronously. -// Used for both the initiating (UpstreamProvider) and reciprocal (MoqxRelay) paths. +// messages from a peer relay into relay->doPublishNamespace(). When relayExec +// is non-null, callbacks are dispatched to it so relay state is only mutated +// on the relay executor thread. Used for both the initiating (UpstreamProvider) +// and reciprocal (MoqxRelay) paths. std::shared_ptr makeNamespaceBridgeHandle( std::weak_ptr relay, std::shared_ptr session, - std::string peerID = {} + std::string peerID = {}, + folly::Executor* relayExec = nullptr ); } // namespace openmoq::moqx diff --git a/src/MoqxRelayContext.cpp b/src/MoqxRelayContext.cpp index d1d9336f..e08dea1a 100644 --- a/src/MoqxRelayContext.cpp +++ b/src/MoqxRelayContext.cpp @@ -5,11 +5,15 @@ */ #include "MoqxRelayContext.h" +#include "relay/PublisherCrossExecFilter.h" +#include "relay/RelayExecUtil.h" +#include "relay/SubscriberCrossExecFilter.h" #include "stats/MoQStatsCollector.h" #include #include #include +#include #include using namespace moxygen; @@ -18,11 +22,27 @@ namespace openmoq::moqx { MoqxRelayContext::MoqxRelayContext( const folly::F14FastMap& services, - const std::string& relayID + const std::string& relayID, + bool useRelayThread ) : serviceMatcher_(services), relayID_(relayID) { - for (const auto& [name, svc] : services) { - services_.emplace(name, ServiceEntry{svc, std::make_shared(svc.cache, relayID)}); + if (useRelayThread && !services.empty()) { + relayThreadPool_ = std::make_shared( + services.size(), + std::make_shared("moqx-relay") + ); + auto evbs = relayThreadPool_->getAllEventBases(); + XCHECK_EQ(evbs.size(), services.size()); + size_t i = 0; + for (const auto& [name, svc] : services) { + auto relay = std::make_shared(svc.cache, relayID); + relay->setRelayExec(std::make_shared(evbs[i++].get())); + services_.emplace(name, ServiceEntry{svc, std::move(relay)}); + } + } else { + for (const auto& [name, svc] : services) { + services_.emplace(name, ServiceEntry{svc, std::make_shared(svc.cache, relayID)}); + } } } @@ -53,10 +73,7 @@ void MoqxRelayContext::initUpstreams(folly::EventBase* workerEvb) { CHECK(workerEvb) << "initUpstreams: workerEvb must not be null"; workerEvb_ = workerEvb; - // Use the provided worker EVB for all upstream connections. - // Per-EVB providers (one per worker thread) are a follow-up. - auto exec = std::make_shared(workerEvb); - + auto workerExec = std::make_shared(workerEvb); for (auto& [name, entry] : services_) { if (!entry.config.upstream) { continue; @@ -64,15 +81,31 @@ void MoqxRelayContext::initUpstreams(folly::EventBase* workerEvb) { const auto& cfg = *entry.config.upstream; auto verifier = makeUpstreamVerifier(cfg.tls); auto relay = entry.relay; - auto onConnect = [relay](std::shared_ptr session) -> folly::coro::Task { - co_await relay->onUpstreamConnect(session); + auto* relayExec = relay->getRelayExec(); + auto onConnect = [relay, + relayExec](std::shared_ptr session) -> folly::coro::Task { + if (relayExec) { + co_return co_await folly::coro::co_withExecutor( + folly::getKeepAliveToken(relayExec), + relay->onUpstreamConnect(session) + ); + } + co_return co_await relay->onUpstreamConnect(session); + }; + auto onDisconnect = [relay, relayExec]() { + runOnExec(relayExec, [relay]() { relay->onUpstreamDisconnect(); }); }; - auto onDisconnect = [relay]() { relay->onUpstreamDisconnect(); }; + std::shared_ptr pubHandler = relay; + std::shared_ptr subHandler = relay; + if (relayExec) { + pubHandler = std::make_shared(relayExec, relay); + subHandler = std::make_shared(relayExec, relay); + } auto provider = std::make_shared( - exec, + workerExec, proxygen::URL(cfg.url), - /*publishHandler=*/entry.relay, - /*subscribeHandler=*/entry.relay, + /*publishHandler=*/pubHandler, + /*subscribeHandler=*/subHandler, verifier, std::move(onConnect), std::move(onDisconnect), @@ -84,7 +117,7 @@ void MoqxRelayContext::initUpstreams(folly::EventBase* workerEvb) { // Eagerly connect so the peering handshake fires before any subscribers // arrive. The connection is lazy in UpstreamProvider but we kick it off // now so the upstream namespace tree is ready. - co_withExecutor(workerEvb, provider->start()).start(); + co_withExecutor(workerExec.get(), provider->start()).start(); } } @@ -158,11 +191,21 @@ folly::Expected MoqxRelayContext::validateAu return folly::makeUnexpected(SessionCloseErrorCode::INVALID_AUTHORITY); } - // Route: set per-service relay as handler + // Route: set per-service relay as handler, wrapping in cross-exec filters if needed auto it = services_.find(*matchedName); CHECK(it != services_.end()) << "Service '" << *matchedName << "' matched but no entry found"; - session->setPublishHandler(it->second.relay); - session->setSubscribeHandler(it->second.relay); + auto* relayExec = it->second.relay->getRelayExec(); + if (relayExec) { + session->setPublishHandler( + std::make_shared(relayExec, it->second.relay) + ); + session->setSubscribeHandler( + std::make_shared(relayExec, it->second.relay) + ); + } else { + session->setPublishHandler(it->second.relay); + session->setSubscribeHandler(it->second.relay); + } return folly::unit; } diff --git a/src/MoqxRelayContext.h b/src/MoqxRelayContext.h index 2fec04b5..ebbf3e4b 100644 --- a/src/MoqxRelayContext.h +++ b/src/MoqxRelayContext.h @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -54,7 +55,8 @@ class MoqxRelayContext { MoqxRelayContext( const folly::F14FastMap& services, - const std::string& relayID + const std::string& relayID, + bool useRelayThread = true ); void setStatsRegistry(std::shared_ptr registry); @@ -108,6 +110,11 @@ class MoqxRelayContext { ); private: + // When use_relay_thread=true: one dedicated thread per service, each with its + // own executor, isolating relay state from I/O threads. Null when disabled. + // Each relay owns its MoQFollyExecutorImpl; the pool just keeps threads alive. + std::shared_ptr relayThreadPool_; + folly::F14FastMap services_; ServiceMatcher serviceMatcher_; std::string relayID_; diff --git a/src/SubscriptionRegistry.cpp b/src/SubscriptionRegistry.cpp index 41acd6aa..bfe27e0e 100644 --- a/src/SubscriptionRegistry.cpp +++ b/src/SubscriptionRegistry.cpp @@ -13,7 +13,8 @@ namespace openmoq::moqx { bool SubscriptionRegistry::UpstreamSubscribePending::complete( std::shared_ptr handle, moxygen::RequestID requestID, - std::shared_ptr upstreamSession + std::shared_ptr upstreamSession, + std::shared_ptr publisher ) { active_ = false; return registry_->completeSubscription( @@ -21,7 +22,8 @@ bool SubscriptionRegistry::UpstreamSubscribePending::complete( weakForwarder_, std::move(handle), requestID, - std::move(upstreamSession) + std::move(upstreamSession), + std::move(publisher) ); } @@ -85,7 +87,8 @@ bool SubscriptionRegistry::completeSubscription( std::weak_ptr weakForwarder, std::shared_ptr handle, moxygen::RequestID requestID, - std::shared_ptr upstreamSession + std::shared_ptr upstreamSession, + std::shared_ptr publisher ) { auto it = subscriptions_.find(ftn); if (it == subscriptions_.end() || it->second.forwarder != weakForwarder.lock()) { @@ -95,6 +98,7 @@ bool SubscriptionRegistry::completeSubscription( rsub.handle = std::move(handle); rsub.requestID = requestID; rsub.upstream = std::move(upstreamSession); + rsub.publisher = std::move(publisher); rsub.promise.setValue(folly::unit); return true; } @@ -114,6 +118,7 @@ SubscriptionRegistry::PublishEntry SubscriptionRegistry::createFromPublish( const moxygen::FullTrackName& ftn, std::shared_ptr forwarder, std::shared_ptr session, + std::shared_ptr publisher, moxygen::RequestID requestID, std::shared_ptr handle, folly::FunctionRef)> chainBuilder @@ -137,6 +142,7 @@ SubscriptionRegistry::PublishEntry SubscriptionRegistry::createFromPublish( rsub.promise.setValue(folly::unit); rsub.requestID = requestID; rsub.handle = std::move(handle); + rsub.publisher = std::move(publisher); rsub.isPublish = true; auto [consumer, topNFilter] = chainBuilder(forwarder); @@ -174,7 +180,7 @@ SubscriptionRegistry::getUpstreamView(const moxygen::FullTrackName& ftn) const { const auto& rsub = it->second; return UpstreamView{ rsub.forwarder, - rsub.upstream, + rsub.publisher, rsub.handle, rsub.requestID, rsub.isPublish, @@ -189,7 +195,7 @@ SubscriptionRegistry::getFetchView(const moxygen::FullTrackName& ftn) const { return std::nullopt; } const auto& rsub = it->second; - return FetchView{rsub.forwarder, rsub.upstream, rsub.requestID, rsub.promise.isFulfilled()}; + return FetchView{rsub.forwarder, rsub.publisher, rsub.requestID, rsub.promise.isFulfilled()}; } std::shared_ptr @@ -201,6 +207,7 @@ SubscriptionRegistry::onPublisherTerminated(const moxygen::FullTrackName& ftn) { auto& rsub = it->second; rsub.handle.reset(); rsub.upstream.reset(); + rsub.publisher.reset(); if (rsub.forwarder->empty()) { subscriptions_.erase(it); return nullptr; diff --git a/src/SubscriptionRegistry.h b/src/SubscriptionRegistry.h index 4f5630dd..76faf574 100644 --- a/src/SubscriptionRegistry.h +++ b/src/SubscriptionRegistry.h @@ -53,12 +53,13 @@ class SubscriptionRegistry { // Identity-checked success path. Re-finds by ftn; checks forwarder identity // to detect a reconnecting publisher that replaced the entry during the - // caller's co_await suspension. Sets handle, requestID, upstreamSession; - // fulfills promise. Returns false if entry is gone or replaced. + // caller's co_await suspension. Sets handle, requestID, upstreamSession, + // publisher; fulfills promise. Returns false if entry is gone or replaced. bool complete( std::shared_ptr handle, moxygen::RequestID requestID, - std::shared_ptr upstreamSession + std::shared_ptr upstreamSession, + std::shared_ptr publisher ); ~UpstreamSubscribePending(); @@ -109,6 +110,7 @@ class SubscriptionRegistry { const moxygen::FullTrackName& ftn, std::shared_ptr forwarder, std::shared_ptr session, + std::shared_ptr publisher, moxygen::RequestID requestID, std::shared_ptr handle, folly::FunctionRef)> chainBuilder @@ -126,10 +128,10 @@ class SubscriptionRegistry { }; std::optional getTopNView(const moxygen::FullTrackName& ftn) const; - // For onEmpty / forwardChanged / newGroupRequested + // For onEmpty / forwardChanged / newGroupRequested / trackStatus struct UpstreamView { std::shared_ptr forwarder; - std::shared_ptr upstream; + std::shared_ptr publisher; std::shared_ptr handle; moxygen::RequestID requestID; bool isPublish; @@ -140,7 +142,7 @@ class SubscriptionRegistry { // For fetch() struct FetchView { std::shared_ptr forwarder; - std::shared_ptr upstream; + std::shared_ptr publisher; moxygen::RequestID requestID; bool isReady; }; @@ -182,6 +184,7 @@ class SubscriptionRegistry { std::shared_ptr forwarder; std::shared_ptr upstream; + std::shared_ptr publisher; moxygen::RequestID requestID{0}; std::shared_ptr handle; folly::coro::SharedPromise promise; @@ -196,7 +199,8 @@ class SubscriptionRegistry { std::weak_ptr weakForwarder, std::shared_ptr handle, moxygen::RequestID requestID, - std::shared_ptr upstreamSession + std::shared_ptr upstreamSession, + std::shared_ptr publisher ); // Called by UpstreamSubscribePending destructor on failure. diff --git a/src/main.cpp b/src/main.cpp index ae0cebe7..bb570813 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -110,7 +110,8 @@ int main(int argc, char* argv[]) { // === 6. Initialize services === // Construct and configure the application's own services // (MoqxRelayContext, MoqxRelayServer, etc.) - auto context = std::make_shared(config.services, config.relayID); + auto context = + std::make_shared(config.services, config.relayID, config.useRelayThread); // === 6a. Stats registry === auto statsRegistry = std::make_shared(); diff --git a/src/relay/RelayExecUtil.h b/src/relay/RelayExecUtil.h new file mode 100644 index 00000000..ae406288 --- /dev/null +++ b/src/relay/RelayExecUtil.h @@ -0,0 +1,72 @@ +/* + * Copyright (c) OpenMOQ contributors. + * This source code is licensed under the Apache 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include "CrossExecFilter.h" +#include "PublisherCrossExecFilter.h" +#include +#include +#include +#include + +namespace openmoq::moqx { + +// Wraps c in a CrossExecFilter targeting exec, or returns c if exec is null. +inline std::shared_ptr +maybeCrossExec(folly::Executor* exec, std::shared_ptr c) { + if (!exec) { + return c; + } + return std::make_shared(exec, std::move(c), /*deepCopyPayload=*/false); +} + +// Wraps c in a FetchCrossExecFilter targeting exec, or returns c if exec is null. +inline std::shared_ptr +maybeCrossExec(folly::Executor* exec, std::shared_ptr c) { + if (!exec) { + return c; + } + return FetchCrossExecFilter::create(exec, std::move(c), /*deepCopyPayload=*/false); +} + +// Wraps p in a PublisherCrossExecFilter targeting exec, or returns p if exec is null. +inline std::shared_ptr +maybeCrossExec(folly::Executor* exec, std::shared_ptr p) { + if (!exec) { + return p; + } + return std::make_shared(exec, std::move(p)); +} + +// Wraps session as a Publisher, targeting its executor when relayExec is set. +inline std::shared_ptr +maybeWrapPublisher(folly::Executor* relayExec, std::shared_ptr session) { + // Evaluate getExecutor() before std::move(session) to avoid unspecified + // argument evaluation order leaving session moved-from. + auto* exec = relayExec ? session->getExecutor() : nullptr; + return maybeCrossExec(exec, std::shared_ptr(std::move(session))); +} + +// Runs fn on exec (fire-and-forget) if exec is non-null; otherwise runs it +// inline on the calling thread. +template void runOnExec(folly::Executor* exec, Fn&& fn) { + if (exec) { + exec->add(std::forward(fn)); + } else { + std::forward(fn)(); + } +} + +// When relayExec is set, dispatches fn to sessionExec (fire-and-forget). +// Otherwise runs fn inline (caller is already on the correct thread). +// Use this when relay state changes need to notify a specific session's executor. +template +void runOnSessionExec(folly::Executor* relayExec, folly::Executor* sessionExec, Fn&& fn) { + runOnExec(relayExec ? sessionExec : nullptr, std::forward(fn)); +} + +} // namespace openmoq::moqx diff --git a/test/SubscriptionRegistryTest.cpp b/test/SubscriptionRegistryTest.cpp index 2db00234..1ad6c20a 100644 --- a/test/SubscriptionRegistryTest.cpp +++ b/test/SubscriptionRegistryTest.cpp @@ -44,7 +44,7 @@ TEST(SubscriptionRegistryTest, AwaitSubsequentSucceeds) { registry.getOrCreateFromSubscribe(kFtn, nullptr, subscribeChain) ); - EXPECT_TRUE(first.pending.complete(nullptr, RequestID(0), nullptr)); + EXPECT_TRUE(first.pending.complete(nullptr, RequestID(0), nullptr, nullptr)); folly::EventBase evb; auto sub = folly::coro::blockingWait(std::move(task), &evb); @@ -89,7 +89,7 @@ TEST(SubscriptionRegistryTest, PendingCompleteReturnsFalseWhenEntryGone) { registry.remove(kFtn); // simulate publisher replacing the entry mid-subscribe - EXPECT_FALSE(first.pending.complete(nullptr, RequestID(0), nullptr)); + EXPECT_FALSE(first.pending.complete(nullptr, RequestID(0), nullptr, nullptr)); } // Regression: awaitSubsequent must re-find after suspension; erased entry throws. @@ -114,7 +114,7 @@ TEST(SubscriptionRegistryTest, AwaitSubsequentHandlesErasedEntry) { std::move(std::get>(token2)); // Upstream subscribe succeeds — fulfills the promise. - EXPECT_TRUE(first.pending.complete(nullptr, RequestID(0), nullptr)); + EXPECT_TRUE(first.pending.complete(nullptr, RequestID(0), nullptr, nullptr)); // Entry is erased before the subsequent coroutine resumes. registry.remove(kFtn); @@ -131,11 +131,18 @@ TEST(SubscriptionRegistryTest, CreateFromPublishEvictsSubscribeEntry) { registry.getOrCreateFromSubscribe(kFtn, nullptr, subscribeChain) ); auto originalForwarder = first.forwarder; - first.pending.complete(nullptr, RequestID(0), nullptr); + first.pending.complete(nullptr, RequestID(0), nullptr, nullptr); auto newForwarder = std::make_shared(kFtn, std::nullopt); - auto entry = - registry.createFromPublish(kFtn, newForwarder, nullptr, RequestID(1), nullptr, publishChain); + auto entry = registry.createFromPublish( + kFtn, + newForwarder, + nullptr, + nullptr, + RequestID(1), + nullptr, + publishChain + ); ASSERT_TRUE(entry.evicted.has_value()); EXPECT_EQ(entry.evicted->forwarder, originalForwarder); @@ -147,7 +154,8 @@ TEST(SubscriptionRegistryTest, CreateFromPublishEvictsSubscribeEntry) { TEST(SubscriptionRegistryTest, OnPublisherTerminatedErasesEmptyEntry) { SubscriptionRegistry registry; auto forwarder = std::make_shared(kFtn, std::nullopt); - registry.createFromPublish(kFtn, forwarder, nullptr, RequestID(0), nullptr, publishChain); + registry + .createFromPublish(kFtn, forwarder, nullptr, nullptr, RequestID(0), nullptr, publishChain); auto result = registry.onPublisherTerminated(kFtn); EXPECT_EQ(result, nullptr);