Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ add_library(moqx_core STATIC
src/relay/PropertyRanking.cpp
src/relay/CrossExecFilter.cpp
src/relay/CrossExecForwarderCallback.cpp
src/relay/WeakRelayForwarderCallback.cpp
src/relay/PublisherCrossExecFilter.cpp
src/relay/SubscriberCrossExecFilter.cpp
)
Expand Down
17 changes: 15 additions & 2 deletions scripts/perf-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# --delivery-timeout N Delivery timeout in ms (default: 500)
# --transport TYPE quic or webtransport (default: quic)
# --no-relay-thread Disable relay exec thread (use_relay_thread: false)
# --local-forwarders Enable per-subscriber-thread local forwarders
# (use_local_forwarders: true; requires relay thread)
# --io-threads N Number of relay IO threads (default: 1)
# --threads N Number of perf client threads (default: 2)
# --relay-log SPEC folly XLOG config passed as --logging=SPEC to relay
Expand All @@ -34,6 +36,9 @@
# locally. The binary is expected at
# /tmp/moqperf_test_client on the remote host.
# HOST may include a user prefix (user@host).
# --udp-socket-buffer-bytes N
# UDP socket send/recv buffer size in bytes for the
# relay listener (defaults to net.core.wmem_max).
#
# Linux-only options (not supported on macOS):
# --metrics, --perf-duration, --perf-events, --perf-stat, --jemalloc,
Expand All @@ -52,6 +57,7 @@ DURATION=30
DELIVERY_TIMEOUT=500
TRANSPORT="quic"
USE_RELAY_THREAD="true"
USE_LOCAL_FORWARDERS="false"
IO_THREADS=1
CLIENT_THREADS=2
RELAY_LOG_SPEC=""
Expand All @@ -62,6 +68,7 @@ PERF_DURATION=0
PERF_EVENTS=""
RUN_PERF_STAT=false
REMOTE_CLIENT_HOST=""
UDP_SOCKET_BUFFER_BYTES=$(cat /proc/sys/net/core/wmem_max 2>/dev/null || echo 1048576)
TRACE_SCRIPT=""
CLIENT_EXTRA_ARGS=()

Expand All @@ -80,6 +87,7 @@ while [[ $# -gt 0 ]]; do
--delivery-timeout) DELIVERY_TIMEOUT="$2"; shift 2 ;;
--transport) TRANSPORT="$2"; shift 2 ;;
--no-relay-thread) USE_RELAY_THREAD="false"; shift ;;
--local-forwarders) USE_LOCAL_FORWARDERS="true"; shift ;;
--io-threads) IO_THREADS="$2"; shift 2 ;;
--threads) CLIENT_THREADS="$2"; shift 2 ;;
--relay-log) RELAY_LOG_SPEC="$2"; shift 2 ;;
Expand All @@ -93,6 +101,7 @@ while [[ $# -gt 0 ]]; do
--trace-script) TRACE_SCRIPT="$2"; shift 2 ;;
--client-args) read -ra CLIENT_EXTRA_ARGS <<< "$2"; shift 2 ;;
--remote-client) REMOTE_CLIENT_HOST="$2"; shift 2 ;;
--udp-socket-buffer-bytes) UDP_SOCKET_BUFFER_BYTES="$2"; shift 2 ;;
*) echo "Unknown option: $1" >&2; exit 1 ;;
esac
done
Expand Down Expand Up @@ -206,6 +215,7 @@ cat >"$RELAY_CFG" <<EOF
relay_id: "perf-test-relay"
threads: $IO_THREADS
use_relay_thread: $USE_RELAY_THREAD
use_local_forwarders: $USE_LOCAL_FORWARDERS
mvfst_bpf_steering: $BPF_STEERING
listeners:
- name: perf
Expand All @@ -219,7 +229,8 @@ listeners:
mvfst:
enable_gso: true
max_conn_packets_sent_per_loop: 16
max_server_recv_packets_per_loop: 32
max_server_recv_packets_per_loop: 256
udp_socket_buffer_bytes: $UDP_SOCKET_BUFFER_BYTES
bbr2:
exit_startup_on_loss: true
enable_recovery_in_startup: true
Expand Down Expand Up @@ -254,12 +265,14 @@ cp "$RELAY_CFG" "$LOG_DIR/relay.yaml"
echo "transport: $TRANSPORT"
echo "io_threads: $IO_THREADS"
echo "use_relay_thread: $USE_RELAY_THREAD"
echo "local_forwarders: $USE_LOCAL_FORWARDERS"
echo "subscriber_max: $SUBSCRIBER_MAX"
echo "ramp: $RAMP"
echo "duration: $DURATION"
echo "delivery_timeout: $DELIVERY_TIMEOUT"
echo "client_threads: $CLIENT_THREADS"
echo "jemalloc: ${JEMALLOC:-none}"
echo "udp_socket_buf: $UDP_SOCKET_BUFFER_BYTES"
echo "mvfst_bpf_steering: $BPF_STEERING"
[[ "$PERF_DURATION" -gt 0 ]] && echo "perf_duration: ${PERF_DURATION}s (delay=$(( 3 * SUBSCRIBER_MAX / RAMP ))s)" || true
[[ -n "$PERF_EVENTS" ]] && echo "perf_events: $PERF_EVENTS" || true
Expand All @@ -269,7 +282,7 @@ echo ""

# ── Start relay ───────────────────────────────────────────────────────────────
ulimit -n 65536 2>/dev/null || true
echo "Starting relay (use_relay_thread=$USE_RELAY_THREAD, io_threads=$IO_THREADS, transport=$TRANSPORT, mvfst_bpf_steering=$BPF_STEERING)..."
echo "Starting relay (use_relay_thread=$USE_RELAY_THREAD, local_forwarders=$USE_LOCAL_FORWARDERS, io_threads=$IO_THREADS, transport=$TRANSPORT, mvfst_bpf_steering=$BPF_STEERING)..."
RELAY_LOGGING_ARG=()
[[ -n "$RELAY_LOG_SPEC" ]] && RELAY_LOGGING_ARG=("--logging=$RELAY_LOG_SPEC")
if [[ -n "$JEMALLOC" ]]; then
Expand Down
5 changes: 5 additions & 0 deletions src/MoqxCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/

#include "MoqxCache.h"
#include "relay/NullConsumers.h"
#include <folly/logging/xlog.h>
#include <moxygen/MoQTrackProperties.h>

Expand Down Expand Up @@ -1292,6 +1293,10 @@ class MoqxCache::FetchWriteback : public FetchConsumer {
}
};

std::shared_ptr<TrackConsumer> MoqxCache::makePassiveConsumer(const FullTrackName& ftn) {
return getSubscribeWriteback(ftn, std::make_shared<moxygen::NullTrackConsumer>());
}

std::shared_ptr<TrackConsumer> MoqxCache::getSubscribeWriteback(
const FullTrackName& ftn,
std::shared_ptr<TrackConsumer> consumer
Expand Down
5 changes: 5 additions & 0 deletions src/MoqxCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ class MoqxCache {
std::shared_ptr<moxygen::TrackConsumer> consumer
);

// Returns a TrackConsumer suitable for use as a passive forwarder subscriber.
// Objects delivered to it are written to the cache; nothing is forwarded
// downstream (the inner consumer is a NullTrackConsumer).
std::shared_ptr<moxygen::TrackConsumer> makePassiveConsumer(const moxygen::FullTrackName& ftn);

// Serves objects from the cache to the consumer. If objects in the range are
// not in cache, issue one-or-more FETCH'es upstream. Objects fetched from
// upstream are written back to the cache and passed to the consumer.
Expand Down
Loading
Loading