Skip to content

perf-test: increase packets read per loop to 256#367

Closed
afrind wants to merge 8 commits into
mainfrom
pr367
Closed

perf-test: increase packets read per loop to 256#367
afrind wants to merge 8 commits into
mainfrom
pr367

Conversation

afrind and others added 8 commits May 31, 2026 23:36
Converts all MoQRelayTest TEST_F cases to TEST_P and instantiates them
under AllModes/MoQRelayTest with named variants (SingleThread, MultiThread).

MoqxTrackFilterTest overrides relayMode() to always return SingleThread
so it stays as TEST_F without GetParam() UB.

MoqxRelayTestModes.cpp is added only to moqx_relay_test (not the shared
fixture library) to avoid a GTest warning in moqx_track_filter_test.
Adds a use_relay_thread boolean config option (default: true) that
controls whether relay state is isolated on a dedicated executor thread.
Disabling it is intended for baseline performance comparison only.

Also removes the hard error that rejected threads > 1, replacing it
with a targeted check: threads > 1 requires use_relay_thread=true.
This unlocks the config validation only — threads > 1 with
use_relay_thread=true will race on shared relay state until the
following commit wires up the dedicated relay executor.
Add RelayMode::MultiThread to the MoQRelayTest parameterized suite so the
relay is exercised across an executor boundary, alongside the existing
SingleThread mode.

Fixture (MoqxRelayTestFixture.h/.cpp, MoqxRelayTestModes.cpp):
- New RelayMode::MultiThread enum value, instantiated in the AllModes suite.
- SetUp() starts a ScopedEventBaseThread, wires the relay onto it via
  setRelayExec, and wraps it in PublisherCrossExecFilter /
  SubscriberCrossExecFilter. TearDown() drains pending relay-exec tasks and
  tears down in order to avoid use-after-free on NamespaceTree.
- TestMoQExecutor::drive() does a loopOnce(), then in MT mode round-trips
  through the relay EVB twice to flush task->relay and relay-created tasks.
- Add driveIfMultiThread(), make verifyOnRelayExec() hop to the relay EVB,
  and add driveUntil(pred) to advance async cascades deterministically
  instead of using a fixed drive count (capped at one iter in ST mode).

Tests (DataPlane, NGR, Peer, Publish, SubNs, Subscribe):
- Insert driveIfMultiThread() where work crosses executors.
- Replace fixed-drive-then-read with driveUntil() plus std::atomic flags.
- DuplicateSubgroupCancelledWhenNoActiveConsumers splits ST/MT expectations:
  in MT the CrossExecFilter defers the CANCELLED error to the next op, so it
  probes via endOfSubgroup().
- Add reset() calls to simulate the publisher/QUIC resetting open streams.

Test Plan:
Remove the cache from the inline filter chain and attach it as a passive
subscriber of the primary MoQForwarder instead.

Previous chain: TopNFilter → TerminationFilter → SubscribeWriteback(cache) → Forwarder
New chain:      TopNFilter → TerminationFilter → Forwarder
               + forwarder.addSubscriber(passive, NullTrackConsumer via SubscribeWriteback)

MoqxCache::makePassiveConsumer(): returns a SubscribeWriteback wrapping
NullTrackConsumer, suitable for use as a passive forwarder subscriber.

Behavior is identical to before — cache writes remain synchronous on the
publisher's thread and the full MoqxRelayTest suite passes unchanged.
The passive subscriber does not affect forwardingSubscribers_, so removing
all real subscribers still fires onEmpty and tears down the upstream
subscription correctly.

Bumps moxygen submodule to the step-1 commit (NullConsumers + passive flag).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Adds an optional data-plane mode in which each I/O thread owns its own
MoQForwarder per track, so fanning objects out to subscribers on the same
thread needs no cross-thread hop. Gated by the new use_local_forwarders
option (which requires use_relay_thread); off by default.

Theory of operation
--------------------
The relay now runs in one of three modes, chosen per session by
createPublisherFilter()/createSubscriberFilter():

1. Single-threaded (relayExec_ unset): relay state and all forwarders live
   on the session I/O thread; MoqxRelay is the session handler directly.

2. Cross-exec (relayExec_ set, local forwarders off): relay state is
   isolated on a dedicated relay executor. Session handlers are
   Publisher/SubscriberCrossExecFilter, which hop to the relay exec before
   touching state. One primary forwarder per track lives on the relay exec
   and fans out to every subscriber via a per-callback cross-exec hop.

3. Local forwarders (relayExec_ set, local forwarders on): the primary
   forwarder for a track lives on its publisher's I/O thread. Each
   subscriber I/O thread keeps a thread-local forwarder (LocalForwarderRegistry
   in tlForwarders_) that attaches to the primary as a single channel
   subscriber. The primary thus hops once per subscriber *thread*; each local
   forwarder then fans out to its same-thread subscribers with no further hop.

Subscribe (local mode) is split so the relay-exec critical section stays
small: subscribeFromSubscriberThread orchestrates on the subscriber thread,
dispatching subscribeStatefulWork (registry lookup, first-vs-subsequent
decision) to the relay exec, then wiring the local->primary channel; for the
first subscriber, completeFirstSubscriber issues the upstream subscribe and
applies the SubscribeOk on the primary's exec. Publish (local mode) makes the
publisher's local forwarder the primary and registers it on the relay exec via
setupPublisherPrimary; the relay chain (TopNFilter -> termination -> cache)
attaches to it as a passive channel subscriber so it observes all objects
without counting as a forwarding subscriber.

Lifecycle callbacks flow LocalForwarderCallback -> CrossExecForwarderCallback
-> ChannelForwarderCallback (on the primary's exec), which propagates forward
and new-group-request changes upstream via requestUpdate and removes the
channel subscriber on onEmpty. LocalForwarderRegistry removal is
identity-checked so teardown is order-independent. PendingForwarderCallback
captures forwardChanged/newGroupRequested/onEmpty during the setup window for
replay once the real callback is installed. WeakRelayForwarderCallback breaks
the registry -> forwarder -> callback -> relay reference cycle and is now also
used by the cross-exec (non-local) mode.

Config and tests
----------------
- use_local_forwarders config field with ConfigResolver validation (requires
  use_relay_thread), threaded through MoqxRelayContext; --local-forwarders
  flag added to scripts/perf-test.sh.
- The relay test suite gains a third parameterized mode, LocalForwarderMT,
  run alongside SingleThread and MultiThread.
Exposes udpSendBufferBytes/udpRecvBufferBytes via MoQServer::Options,
plumbed through ParsedConfig → ConfigResolver → MoqxRelayServer.
0 (default) keeps the existing MoQServer built-in 1 MB behaviour.
Also comments out reorderingThreshold (removed from moxygen API).
@afrind
Copy link
Copy Markdown
Contributor Author

afrind commented Jun 1, 2026

Replaced by #373

@afrind afrind closed this Jun 1, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant