From e206e73767e7a68312ea357d48fca411e4c900a7 Mon Sep 17 00:00:00 2001 From: Suhas Nandakumar Date: Mon, 13 Apr 2026 21:44:04 -0700 Subject: [PATCH 1/3] Allow caching publishes without active subscribers --- src/MoqxRelay.cpp | 142 ++++++++++++++++++++++++++++- src/MoqxRelay.h | 6 +- src/config/Config.h | 1 + src/config/ConfigResolver.cpp | 6 ++ src/config/loader/ParsedConfig.h | 5 + test/MoqxRelayTest.cpp | 53 +++++++++++ test/config/ConfigResolverTest.cpp | 17 ++++ test/config/LoaderTest.cpp | 2 + 8 files changed, 229 insertions(+), 3 deletions(-) diff --git a/src/MoqxRelay.cpp b/src/MoqxRelay.cpp index dcaeb837..165401e7 100644 --- a/src/MoqxRelay.cpp +++ b/src/MoqxRelay.cpp @@ -703,6 +703,140 @@ class MoqxRelay::TerminationFilter : public TrackConsumerFilter { FullTrackName ftn_; }; +namespace { + +class NullSubgroupConsumer : public SubgroupConsumer { +public: + folly::Expected object( + uint64_t /*objectID*/, + Payload /*payload*/, + Extensions /*extensions*/ = noExtensions(), + bool /*finSubgroup*/ = false + ) override { + return folly::unit; + } + + folly::Expected beginObject( + uint64_t /*objectID*/, + uint64_t length, + Payload initialPayload, + Extensions /*extensions*/ = noExtensions() + ) override { + currentLength_ = length; + if (initialPayload) { + const auto initialBytes = initialPayload->computeChainDataLength(); + if (initialBytes > currentLength_) { + currentLength_ = 0; + return folly::makeUnexpected( + MoQPublishError(MoQPublishError::API_ERROR, "initial payload exceeds object length") + ); + } + currentLength_ -= initialBytes; + } + return folly::unit; + } + + folly::Expected objectPayload( + Payload payload, + bool /*finSubgroup*/ = false + ) override { + const auto payloadBytes = payload ? payload->computeChainDataLength() : 0; + if (payloadBytes > currentLength_) { + currentLength_ = 0; + return folly::makeUnexpected( + MoQPublishError(MoQPublishError::API_ERROR, "object payload exceeds expected length") + ); + } + currentLength_ -= payloadBytes; + return currentLength_ == 0 ? ObjectPublishStatus::DONE : ObjectPublishStatus::IN_PROGRESS; + } + + folly::Expected endOfGroup(uint64_t /*endOfGroupObjectID*/ + ) override { + return folly::unit; + } + + folly::Expected endOfTrackAndGroup(uint64_t /*endOfTrackObjectID*/ + ) override { + return folly::unit; + } + + folly::Expected endOfSubgroup() override { + return folly::unit; + } + + void reset(ResetStreamErrorCode /*error*/) override {} + +private: + uint64_t currentLength_{0}; +}; + +class NoSubscriberTolerantConsumer : public TrackConsumer { +public: + explicit NoSubscriberTolerantConsumer(std::shared_ptr downstream) + : downstream_(std::move(downstream)) {} + + folly::Expected setTrackAlias(TrackAlias alias) override { + return downstream_->setTrackAlias(alias); + } + + folly::Expected, MoQPublishError> + beginSubgroup( + uint64_t groupID, + uint64_t subgroupID, + Priority priority, + bool containsLastInGroup = false + ) override { + auto res = downstream_->beginSubgroup(groupID, subgroupID, priority, containsLastInGroup); + if (res.hasValue() || !shouldSink(res.error())) { + return res; + } + return std::static_pointer_cast(std::make_shared()); + } + + folly::Expected, MoQPublishError> awaitStreamCredit() override { + return downstream_->awaitStreamCredit(); + } + + folly::Expected objectStream( + const ObjectHeader& header, + Payload payload, + bool lastInGroup = false + ) override { + auto res = downstream_->objectStream(header, std::move(payload), lastInGroup); + if (res.hasValue() || !shouldSink(res.error())) { + return res; + } + return folly::unit; + } + + folly::Expected datagram( + const ObjectHeader& header, + Payload payload, + bool lastInGroup = false + ) override { + auto res = downstream_->datagram(header, std::move(payload), lastInGroup); + if (res.hasValue() || !shouldSink(res.error())) { + return res; + } + return folly::unit; + } + + folly::Expected publishDone(PublishDone pubDone) override { + return downstream_->publishDone(std::move(pubDone)); + } + +private: + bool shouldSink(const MoQPublishError& err) const { + return err.code == MoQPublishError::CANCELLED && + downstream_->numForwardingSubscribers() == 0; + } + + std::shared_ptr downstream_; +}; + +} // namespace + std::shared_ptr MoqxRelay::getSubscribeWriteback( const FullTrackName& ftn, std::shared_ptr consumer @@ -718,8 +852,12 @@ MoqxRelay::FilterChainResult MoqxRelay::buildFilterChain(const FullTrackName& ftn, std::shared_ptr forwarder) { // Build chain: TopNFilter → TerminationFilter → (cache?) → Forwarder // This ensures property values are observed in both PUBLISH and SUBSCRIBE paths. - auto baseConsumer = cache_ ? cache_->getSubscribeWriteback(ftn, forwarder) - : std::static_pointer_cast(forwarder); + std::shared_ptr downstreamConsumer = forwarder; + if (cache_ && cachePublishesWithoutSubscribers_) { + downstreamConsumer = std::make_shared(forwarder); + } + auto baseConsumer = + cache_ ? cache_->getSubscribeWriteback(ftn, downstreamConsumer) : downstreamConsumer; auto terminationFilter = std::make_shared(shared_from_this(), ftn, std::move(baseConsumer)); auto topNFilter = diff --git a/src/MoqxRelay.h b/src/MoqxRelay.h index e63d9f23..966a44fa 100644 --- a/src/MoqxRelay.h +++ b/src/MoqxRelay.h @@ -101,7 +101,10 @@ class MoqxRelay : public moxygen::Publisher, std::chrono::milliseconds idleTimeout = kDefaultIdleTimeout, std::chrono::milliseconds activityThreshold = kDefaultActivityThreshold ) - : relayID_(std::move(relayID)), maxDeselected_(maxDeselected), idleTimeout_(idleTimeout), + : relayID_(std::move(relayID)), + cachePublishesWithoutSubscribers_(cache.cachePublishesWithoutSubscribers), + maxDeselected_(maxDeselected), + idleTimeout_(idleTimeout), activityThreshold_(activityThreshold) { if (cache.maxCachedTracks > 0) { cache_ = @@ -417,6 +420,7 @@ class MoqxRelay : public moxygen::Publisher, std::shared_ptr consumer ); std::unique_ptr cache_; + bool cachePublishesWithoutSubscribers_{false}; uint64_t maxDeselected_{kDefaultMaxDeselected}; static constexpr std::chrono::milliseconds kDefaultIdleTimeout{10'000}; diff --git a/src/config/Config.h b/src/config/Config.h index a05e674b..a312a81c 100644 --- a/src/config/Config.h +++ b/src/config/Config.h @@ -34,6 +34,7 @@ struct CacheConfig { size_t maxCachedGroupsPerTrack; uint32_t maxCachedMb{16}; // code default: 16 MB; 0 invalid uint32_t minEvictionKb{64}; // eviction batch floor in KB + bool cachePublishesWithoutSubscribers{false}; std::chrono::milliseconds maxCacheDuration{std::chrono::hours(24)}; // cap on any track duration std::optional defaultMaxCacheDuration; // nullopt = use maxCacheDuration; 0ms = opt-in only diff --git a/src/config/ConfigResolver.cpp b/src/config/ConfigResolver.cpp index 20c29da6..1d0de3de 100644 --- a/src/config/ConfigResolver.cpp +++ b/src/config/ConfigResolver.cpp @@ -49,6 +49,10 @@ mergeCacheConfigs(const ParsedCacheConfig& base, const ParsedCacheConfig& overla merged.min_eviction_kb = overlay.min_eviction_kb.value().has_value() ? overlay.min_eviction_kb.value() : base.min_eviction_kb.value(); + merged.cache_publishes_without_subscribers = + overlay.cache_publishes_without_subscribers.value().has_value() + ? overlay.cache_publishes_without_subscribers.value() + : base.cache_publishes_without_subscribers.value(); merged.max_cache_duration_s = overlay.max_cache_duration_s.value().has_value() ? overlay.max_cache_duration_s.value() : base.max_cache_duration_s.value(); @@ -127,6 +131,8 @@ CacheConfig resolveCacheConfig(const ParsedCacheConfig& cache) { .maxCachedGroupsPerTrack = static_cast(*cache.max_groups_per_track.value()), .maxCachedMb = cache.max_cached_mb.value().value_or(16), .minEvictionKb = cache.min_eviction_kb.value().value_or(64), + .cachePublishesWithoutSubscribers = + cache.cache_publishes_without_subscribers.value().value_or(false), .maxCacheDuration = maxCacheDuration, .defaultMaxCacheDuration = defaultMaxCacheDuration, }; diff --git a/src/config/loader/ParsedConfig.h b/src/config/loader/ParsedConfig.h index c4a68c57..f48466d3 100644 --- a/src/config/loader/ParsedConfig.h +++ b/src/config/loader/ParsedConfig.h @@ -116,6 +116,11 @@ struct ParsedCacheConfig { "Default: 64. Ignored when cache is disabled.", std::optional> min_eviction_kb; + rfl::Description< + "When true, cache objects from incoming PUBLISH streams even if there are no active " + "downstream subscribers at the time the objects arrive. Default: false.", + std::optional> + cache_publishes_without_subscribers; rfl::Description< "Maximum cache duration (seconds) for any track; clamps publisher-set values. " "Also used as the default for tracks without a publisher-set duration when " diff --git a/test/MoqxRelayTest.cpp b/test/MoqxRelayTest.cpp index 116ebdde..349c038c 100644 --- a/test/MoqxRelayTest.cpp +++ b/test/MoqxRelayTest.cpp @@ -317,6 +317,31 @@ class MoQRelayTest : public ::testing::Test { std::shared_ptr relay_; }; +class CacheStatsVisitor : public RelayStateVisitor { +public: + void onPeersBegin() override {} + void onPeer(std::string_view, std::string_view, std::string_view) override {} + void onPeersEnd() override {} + void onSubscriptionsBegin() override {} + void onSubscription(const SubscriptionInfo&) override {} + void onSubscriptionsEnd() override {} + void onNamespaceTreeBegin() override {} + void beginNamespaceNode(std::string_view, const TrackNamespace&, size_t) override {} + void endNamespaceNode() override {} + void onNamespaceTreeEnd() override {} + void onCacheStats( + size_t totalBytesIn, + const std::vector& tracksIn, + MoQCache::TimePoint /*now*/ + ) override { + totalBytes = totalBytesIn; + tracks = tracksIn; + } + + size_t totalBytes{0}; + std::vector tracks; +}; + // Test: Basic relay construction TEST_F(MoQRelayTest, Construction) { EXPECT_NE(relay_, nullptr); @@ -355,6 +380,34 @@ TEST_F(MoQRelayTest, PublishSuccess) { removeSession(publisherSession); } +TEST_F(MoQRelayTest, PublishCanPopulateCacheWithoutSubscribersWhenEnabled) { + relay_ = std::make_shared(config::CacheConfig{ + .maxCachedTracks = 8, + .maxCachedGroupsPerTrack = 4, + .cachePublishesWithoutSubscribers = true, + }); + relay_->setAllowedNamespacePrefix(kAllowedPrefix); + + auto publisherSession = createMockSession(); + doPublishNamespace(publisherSession, kTestNamespace); + + auto consumer = doPublish(publisherSession, kTestTrackName); + auto subgroup = consumer->beginSubgroup(1, 0, 0); + ASSERT_TRUE(subgroup.hasValue()); + EXPECT_TRUE(subgroup.value()->object(0, makeBuf("hello"), noExtensions(), true).hasValue()); + + CacheStatsVisitor visitor; + relay_->dumpState(visitor); + ASSERT_EQ(visitor.tracks.size(), 1); + EXPECT_EQ(visitor.tracks[0].name, kTestTrackName); + ASSERT_EQ(visitor.tracks[0].groups.size(), 1); + EXPECT_EQ(visitor.tracks[0].groups[0].groupId, 1); + EXPECT_EQ(visitor.tracks[0].groups[0].objects, 1); + EXPECT_EQ(visitor.totalBytes, 5); + + removeSession(publisherSession); +} + // Test: Tree pruning when leaf node is removed // Scenario: test/A/B/C and test/A/D exist. Remove C should prune B but keep A // and D diff --git a/test/config/ConfigResolverTest.cpp b/test/config/ConfigResolverTest.cpp index 1dd3966f..fc99e047 100644 --- a/test/config/ConfigResolverTest.cpp +++ b/test/config/ConfigResolverTest.cpp @@ -1451,5 +1451,22 @@ TEST(ResolveConfig, CacheByteLimitsMergeWithDefaults) { EXPECT_EQ(resolved.services.at("overrider").cache.minEvictionKb, 512u); } +TEST(ResolveConfig, CachePublishesWithoutSubscribersDefaultsFalseAndCanBeEnabled) { + auto cfg = makeMinimalInsecureConfig(); + + auto defaultResult = resolveConfig(cfg); + ASSERT_TRUE(defaultResult.hasValue()); + EXPECT_FALSE(defaultResult.value().config.services.at("default").cache + .cachePublishesWithoutSubscribers); + + cfg.services.value().at("default").cache.value()->cache_publishes_without_subscribers = + std::optional{true}; + auto enabledResult = resolveConfig(cfg); + ASSERT_TRUE(enabledResult.hasValue()); + EXPECT_TRUE( + enabledResult.value().config.services.at("default").cache.cachePublishesWithoutSubscribers + ); +} + } // namespace } // namespace openmoq::moqx::config diff --git a/test/config/LoaderTest.cpp b/test/config/LoaderTest.cpp index 43de4aa0..23aad4f9 100644 --- a/test/config/LoaderTest.cpp +++ b/test/config/LoaderTest.cpp @@ -87,6 +87,7 @@ TEST(ConfigLoader, FullConfig) { enabled: true max_tracks: 200 max_groups_per_track: 5 + cache_publishes_without_subscribers: true admin: port: 9669 address: "::1" @@ -115,6 +116,7 @@ TEST(ConfigLoader, FullConfig) { EXPECT_EQ(svc.cache.value()->enabled.value(), true); EXPECT_EQ(svc.cache.value()->max_tracks.value(), 200); EXPECT_EQ(svc.cache.value()->max_groups_per_track.value(), 5); + EXPECT_EQ(svc.cache.value()->cache_publishes_without_subscribers.value(), true); } TEST(ConfigLoader, ServicesWithAuthorityAndPath) { From 415cd89e9c4de24f3a5effa84a1533dee3402f30 Mon Sep 17 00:00:00 2001 From: Suhas Nandakumar Date: Tue, 14 Apr 2026 11:31:11 -0700 Subject: [PATCH 2/3] test: cover late subscribers with cached publishes --- test/MoqxRelayTest.cpp | 49 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/test/MoqxRelayTest.cpp b/test/MoqxRelayTest.cpp index 349c038c..46c5344d 100644 --- a/test/MoqxRelayTest.cpp +++ b/test/MoqxRelayTest.cpp @@ -408,6 +408,55 @@ TEST_F(MoQRelayTest, PublishCanPopulateCacheWithoutSubscribersWhenEnabled) { removeSession(publisherSession); } +TEST_F(MoQRelayTest, LateSubscriberGetsLiveObjectsButNoCachedReplay) { + relay_ = std::make_shared(config::CacheConfig{ + .maxCachedTracks = 8, + .maxCachedGroupsPerTrack = 4, + .cachePublishesWithoutSubscribers = true, + }); + relay_->setAllowedNamespacePrefix(kAllowedPrefix); + + auto publisherSession = createMockSession(); + doPublishNamespace(publisherSession, kTestNamespace); + + auto consumer = doPublish(publisherSession, kTestTrackName); + auto subgroup1 = consumer->beginSubgroup(1, 0, 0); + ASSERT_TRUE(subgroup1.hasValue()); + EXPECT_TRUE(subgroup1.value()->object(0, makeBuf("before"), noExtensions(), true).hasValue()); + + auto subscriberSession = createMockSession(); + auto mockConsumer = createMockConsumer(); + auto mockSubgroupConsumer = createMockSubgroupConsumer(); + + EXPECT_CALL(*mockConsumer, beginSubgroup(1, 0, _, _)).Times(0); + EXPECT_CALL(*mockConsumer, beginSubgroup(2, 0, _, _)) + .WillOnce(Return(folly::makeExpected( + std::static_pointer_cast(mockSubgroupConsumer) + ))); + EXPECT_CALL(*mockSubgroupConsumer, object(0, _, _, true)) + .WillOnce(Return(folly::makeExpected(folly::unit))); + + subscribeToTrack(subscriberSession, kTestTrackName, mockConsumer, RequestID(1)); + + auto subgroup2 = consumer->beginSubgroup(2, 0, 0); + ASSERT_TRUE(subgroup2.hasValue()); + EXPECT_TRUE(subgroup2.value()->object(0, makeBuf("after"), noExtensions(), true).hasValue()); + + CacheStatsVisitor visitor; + relay_->dumpState(visitor); + ASSERT_EQ(visitor.tracks.size(), 1); + EXPECT_EQ(visitor.tracks[0].name, kTestTrackName); + ASSERT_EQ(visitor.tracks[0].groups.size(), 2); + EXPECT_EQ(visitor.tracks[0].groups[0].groupId, 1); + EXPECT_EQ(visitor.tracks[0].groups[0].objects, 1); + EXPECT_EQ(visitor.tracks[0].groups[1].groupId, 2); + EXPECT_EQ(visitor.tracks[0].groups[1].objects, 1); + EXPECT_EQ(visitor.totalBytes, 11); + + removeSession(publisherSession); + removeSession(subscriberSession); +} + // Test: Tree pruning when leaf node is removed // Scenario: test/A/B/C and test/A/D exist. Remove C should prune B but keep A // and D From d05b145f6355b52a5e1ccf31896b219db4eb9435 Mon Sep 17 00:00:00 2001 From: Suhas Nandakumar Date: Tue, 14 Apr 2026 11:49:46 -0700 Subject: [PATCH 3/3] style: wrap late-subscriber test expectations --- test/MoqxRelayTest.cpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/test/MoqxRelayTest.cpp b/test/MoqxRelayTest.cpp index 46c5344d..fa00dd81 100644 --- a/test/MoqxRelayTest.cpp +++ b/test/MoqxRelayTest.cpp @@ -422,7 +422,9 @@ TEST_F(MoQRelayTest, LateSubscriberGetsLiveObjectsButNoCachedReplay) { auto consumer = doPublish(publisherSession, kTestTrackName); auto subgroup1 = consumer->beginSubgroup(1, 0, 0); ASSERT_TRUE(subgroup1.hasValue()); - EXPECT_TRUE(subgroup1.value()->object(0, makeBuf("before"), noExtensions(), true).hasValue()); + EXPECT_TRUE( + subgroup1.value()->object(0, makeBuf("before"), noExtensions(), true).hasValue() + ); auto subscriberSession = createMockSession(); auto mockConsumer = createMockConsumer(); @@ -440,7 +442,9 @@ TEST_F(MoQRelayTest, LateSubscriberGetsLiveObjectsButNoCachedReplay) { auto subgroup2 = consumer->beginSubgroup(2, 0, 0); ASSERT_TRUE(subgroup2.hasValue()); - EXPECT_TRUE(subgroup2.value()->object(0, makeBuf("after"), noExtensions(), true).hasValue()); + EXPECT_TRUE( + subgroup2.value()->object(0, makeBuf("after"), noExtensions(), true).hasValue() + ); CacheStatsVisitor visitor; relay_->dumpState(visitor);