Conversation
Converts all MoQRelayTest TEST_F cases to TEST_P and instantiates them under AllModes/MoQRelayTest with named variants (SingleThread, MultiThread). MoqxTrackFilterTest overrides relayMode() to always return SingleThread so it stays as TEST_F without GetParam() UB. MoqxRelayTestModes.cpp is added only to moqx_relay_test (not the shared fixture library) to avoid a GTest warning in moqx_track_filter_test.
Adds a use_relay_thread boolean config option (default: true) that controls whether relay state is isolated on a dedicated executor thread. Disabling it is intended for baseline performance comparison only. Also removes the hard error that rejected threads > 1, replacing it with a targeted check: threads > 1 requires use_relay_thread=true. This unlocks the config validation only — threads > 1 with use_relay_thread=true will race on shared relay state until the following commit wires up the dedicated relay executor.
Add RelayMode::MultiThread to the MoQRelayTest parameterized suite so the relay is exercised across an executor boundary, alongside the existing SingleThread mode. Fixture (MoqxRelayTestFixture.h/.cpp, MoqxRelayTestModes.cpp): - New RelayMode::MultiThread enum value, instantiated in the AllModes suite. - SetUp() starts a ScopedEventBaseThread, wires the relay onto it via setRelayExec, and wraps it in PublisherCrossExecFilter / SubscriberCrossExecFilter. TearDown() drains pending relay-exec tasks and tears down in order to avoid use-after-free on NamespaceTree. - TestMoQExecutor::drive() does a loopOnce(), then in MT mode round-trips through the relay EVB twice to flush task->relay and relay-created tasks. - Add driveIfMultiThread(), make verifyOnRelayExec() hop to the relay EVB, and add driveUntil(pred) to advance async cascades deterministically instead of using a fixed drive count (capped at one iter in ST mode). Tests (DataPlane, NGR, Peer, Publish, SubNs, Subscribe): - Insert driveIfMultiThread() where work crosses executors. - Replace fixed-drive-then-read with driveUntil() plus std::atomic flags. - DuplicateSubgroupCancelledWhenNoActiveConsumers splits ST/MT expectations: in MT the CrossExecFilter defers the CANCELLED error to the next op, so it probes via endOfSubgroup(). - Add reset() calls to simulate the publisher/QUIC resetting open streams. Test Plan:
Remove the cache from the inline filter chain and attach it as a passive
subscriber of the primary MoQForwarder instead.
Previous chain: TopNFilter → TerminationFilter → SubscribeWriteback(cache) → Forwarder
New chain: TopNFilter → TerminationFilter → Forwarder
+ forwarder.addSubscriber(passive, NullTrackConsumer via SubscribeWriteback)
MoqxCache::makePassiveConsumer(): returns a SubscribeWriteback wrapping
NullTrackConsumer, suitable for use as a passive forwarder subscriber.
Behavior is identical to before — cache writes remain synchronous on the
publisher's thread and the full MoqxRelayTest suite passes unchanged.
The passive subscriber does not affect forwardingSubscribers_, so removing
all real subscribers still fires onEmpty and tears down the upstream
subscription correctly.
Bumps moxygen submodule to the step-1 commit (NullConsumers + passive flag).
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Adds an optional data-plane mode in which each I/O thread owns its own MoQForwarder per track, so fanning objects out to subscribers on the same thread needs no cross-thread hop. Gated by the new use_local_forwarders option (which requires use_relay_thread); off by default. Theory of operation -------------------- The relay now runs in one of three modes, chosen per session by createPublisherFilter()/createSubscriberFilter(): 1. Single-threaded (relayExec_ unset): relay state and all forwarders live on the session I/O thread; MoqxRelay is the session handler directly. 2. Cross-exec (relayExec_ set, local forwarders off): relay state is isolated on a dedicated relay executor. Session handlers are Publisher/SubscriberCrossExecFilter, which hop to the relay exec before touching state. One primary forwarder per track lives on the relay exec and fans out to every subscriber via a per-callback cross-exec hop. 3. Local forwarders (relayExec_ set, local forwarders on): the primary forwarder for a track lives on its publisher's I/O thread. Each subscriber I/O thread keeps a thread-local forwarder (LocalForwarderRegistry in tlForwarders_) that attaches to the primary as a single channel subscriber. The primary thus hops once per subscriber *thread*; each local forwarder then fans out to its same-thread subscribers with no further hop. Subscribe (local mode) is split so the relay-exec critical section stays small: subscribeFromSubscriberThread orchestrates on the subscriber thread, dispatching subscribeStatefulWork (registry lookup, first-vs-subsequent decision) to the relay exec, then wiring the local->primary channel; for the first subscriber, completeFirstSubscriber issues the upstream subscribe and applies the SubscribeOk on the primary's exec. Publish (local mode) makes the publisher's local forwarder the primary and registers it on the relay exec via setupPublisherPrimary; the relay chain (TopNFilter -> termination -> cache) attaches to it as a passive channel subscriber so it observes all objects without counting as a forwarding subscriber. Lifecycle callbacks flow LocalForwarderCallback -> CrossExecForwarderCallback -> ChannelForwarderCallback (on the primary's exec), which propagates forward and new-group-request changes upstream via requestUpdate and removes the channel subscriber on onEmpty. LocalForwarderRegistry removal is identity-checked so teardown is order-independent. PendingForwarderCallback captures forwardChanged/newGroupRequested/onEmpty during the setup window for replay once the real callback is installed. WeakRelayForwarderCallback breaks the registry -> forwarder -> callback -> relay reference cycle and is now also used by the cross-exec (non-local) mode. Config and tests ---------------- - use_local_forwarders config field with ConfigResolver validation (requires use_relay_thread), threaded through MoqxRelayContext; --local-forwarders flag added to scripts/perf-test.sh. - The relay test suite gains a third parameterized mode, LocalForwarderMT, run alongside SingleThread and MultiThread.
Exposes udpSendBufferBytes/udpRecvBufferBytes via MoQServer::Options, plumbed through ParsedConfig → ConfigResolver → MoqxRelayServer. 0 (default) keeps the existing MoQServer built-in 1 MB behaviour. Also comments out reorderingThreshold (removed from moxygen API).
This was referenced Jun 1, 2026
Contributor
Author
|
Replaced by #373 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This change is