Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 140 additions & 2 deletions src/MoqxRelay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,140 @@ class MoqxRelay::TerminationFilter : public TrackConsumerFilter {
FullTrackName ftn_;
};

namespace {

class NullSubgroupConsumer : public SubgroupConsumer {
public:
folly::Expected<folly::Unit, MoQPublishError> object(
uint64_t /*objectID*/,
Payload /*payload*/,
Extensions /*extensions*/ = noExtensions(),
bool /*finSubgroup*/ = false
) override {
return folly::unit;
}

folly::Expected<folly::Unit, MoQPublishError> beginObject(
uint64_t /*objectID*/,
uint64_t length,
Payload initialPayload,
Extensions /*extensions*/ = noExtensions()
) override {
currentLength_ = length;
if (initialPayload) {
const auto initialBytes = initialPayload->computeChainDataLength();
if (initialBytes > currentLength_) {
currentLength_ = 0;
return folly::makeUnexpected(
MoQPublishError(MoQPublishError::API_ERROR, "initial payload exceeds object length")
);
}
currentLength_ -= initialBytes;
}
return folly::unit;
}

folly::Expected<ObjectPublishStatus, MoQPublishError> objectPayload(
Payload payload,
bool /*finSubgroup*/ = false
) override {
const auto payloadBytes = payload ? payload->computeChainDataLength() : 0;
if (payloadBytes > currentLength_) {
currentLength_ = 0;
return folly::makeUnexpected(
MoQPublishError(MoQPublishError::API_ERROR, "object payload exceeds expected length")
);
}
currentLength_ -= payloadBytes;
return currentLength_ == 0 ? ObjectPublishStatus::DONE : ObjectPublishStatus::IN_PROGRESS;
}

folly::Expected<folly::Unit, MoQPublishError> endOfGroup(uint64_t /*endOfGroupObjectID*/
) override {
return folly::unit;
}

folly::Expected<folly::Unit, MoQPublishError> endOfTrackAndGroup(uint64_t /*endOfTrackObjectID*/
) override {
return folly::unit;
}

folly::Expected<folly::Unit, MoQPublishError> endOfSubgroup() override {
return folly::unit;
}

void reset(ResetStreamErrorCode /*error*/) override {}

private:
uint64_t currentLength_{0};
};

class NoSubscriberTolerantConsumer : public TrackConsumer {
public:
explicit NoSubscriberTolerantConsumer(std::shared_ptr<MoQForwarder> downstream)
: downstream_(std::move(downstream)) {}

folly::Expected<folly::Unit, MoQPublishError> setTrackAlias(TrackAlias alias) override {
return downstream_->setTrackAlias(alias);
}

folly::Expected<std::shared_ptr<SubgroupConsumer>, MoQPublishError>
beginSubgroup(
uint64_t groupID,
uint64_t subgroupID,
Priority priority,
bool containsLastInGroup = false
) override {
auto res = downstream_->beginSubgroup(groupID, subgroupID, priority, containsLastInGroup);
if (res.hasValue() || !shouldSink(res.error())) {
return res;
}
return std::static_pointer_cast<SubgroupConsumer>(std::make_shared<NullSubgroupConsumer>());
}

folly::Expected<folly::SemiFuture<folly::Unit>, MoQPublishError> awaitStreamCredit() override {
return downstream_->awaitStreamCredit();
}

folly::Expected<folly::Unit, MoQPublishError> objectStream(
const ObjectHeader& header,
Payload payload,
bool lastInGroup = false
) override {
auto res = downstream_->objectStream(header, std::move(payload), lastInGroup);
if (res.hasValue() || !shouldSink(res.error())) {
return res;
}
return folly::unit;
}

folly::Expected<folly::Unit, MoQPublishError> datagram(
const ObjectHeader& header,
Payload payload,
bool lastInGroup = false
) override {
auto res = downstream_->datagram(header, std::move(payload), lastInGroup);
if (res.hasValue() || !shouldSink(res.error())) {
return res;
}
return folly::unit;
}

folly::Expected<folly::Unit, MoQPublishError> publishDone(PublishDone pubDone) override {
return downstream_->publishDone(std::move(pubDone));
}

private:
bool shouldSink(const MoQPublishError& err) const {
return err.code == MoQPublishError::CANCELLED &&
downstream_->numForwardingSubscribers() == 0;
}

std::shared_ptr<MoQForwarder> downstream_;
};

} // namespace

std::shared_ptr<TrackConsumer> MoqxRelay::getSubscribeWriteback(
const FullTrackName& ftn,
std::shared_ptr<TrackConsumer> consumer
Expand All @@ -718,8 +852,12 @@ MoqxRelay::FilterChainResult
MoqxRelay::buildFilterChain(const FullTrackName& ftn, std::shared_ptr<MoQForwarder> forwarder) {
// Build chain: TopNFilter → TerminationFilter → (cache?) → Forwarder
// This ensures property values are observed in both PUBLISH and SUBSCRIBE paths.
auto baseConsumer = cache_ ? cache_->getSubscribeWriteback(ftn, forwarder)
: std::static_pointer_cast<TrackConsumer>(forwarder);
std::shared_ptr<TrackConsumer> downstreamConsumer = forwarder;
if (cache_ && cachePublishesWithoutSubscribers_) {
downstreamConsumer = std::make_shared<NoSubscriberTolerantConsumer>(forwarder);
}
auto baseConsumer =
cache_ ? cache_->getSubscribeWriteback(ftn, downstreamConsumer) : downstreamConsumer;
auto terminationFilter =
std::make_shared<TerminationFilter>(shared_from_this(), ftn, std::move(baseConsumer));
auto topNFilter =
Expand Down
6 changes: 5 additions & 1 deletion src/MoqxRelay.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ class MoqxRelay : public moxygen::Publisher,
std::chrono::milliseconds idleTimeout = kDefaultIdleTimeout,
std::chrono::milliseconds activityThreshold = kDefaultActivityThreshold
)
: relayID_(std::move(relayID)), maxDeselected_(maxDeselected), idleTimeout_(idleTimeout),
: relayID_(std::move(relayID)),
cachePublishesWithoutSubscribers_(cache.cachePublishesWithoutSubscribers),
maxDeselected_(maxDeselected),
idleTimeout_(idleTimeout),
activityThreshold_(activityThreshold) {
if (cache.maxCachedTracks > 0) {
cache_ =
Expand Down Expand Up @@ -417,6 +420,7 @@ class MoqxRelay : public moxygen::Publisher,
std::shared_ptr<moxygen::TrackConsumer> consumer
);
std::unique_ptr<moxygen::MoQCache> cache_;
bool cachePublishesWithoutSubscribers_{false};
uint64_t maxDeselected_{kDefaultMaxDeselected};

static constexpr std::chrono::milliseconds kDefaultIdleTimeout{10'000};
Expand Down
1 change: 1 addition & 0 deletions src/config/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ struct CacheConfig {
size_t maxCachedGroupsPerTrack;
uint32_t maxCachedMb{16}; // code default: 16 MB; 0 invalid
uint32_t minEvictionKb{64}; // eviction batch floor in KB
bool cachePublishesWithoutSubscribers{false};
std::chrono::milliseconds maxCacheDuration{std::chrono::hours(24)}; // cap on any track duration
std::optional<std::chrono::milliseconds>
defaultMaxCacheDuration; // nullopt = use maxCacheDuration; 0ms = opt-in only
Expand Down
6 changes: 6 additions & 0 deletions src/config/ConfigResolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ mergeCacheConfigs(const ParsedCacheConfig& base, const ParsedCacheConfig& overla
merged.min_eviction_kb = overlay.min_eviction_kb.value().has_value()
? overlay.min_eviction_kb.value()
: base.min_eviction_kb.value();
merged.cache_publishes_without_subscribers =
overlay.cache_publishes_without_subscribers.value().has_value()
? overlay.cache_publishes_without_subscribers.value()
: base.cache_publishes_without_subscribers.value();
merged.max_cache_duration_s = overlay.max_cache_duration_s.value().has_value()
? overlay.max_cache_duration_s.value()
: base.max_cache_duration_s.value();
Expand Down Expand Up @@ -127,6 +131,8 @@ CacheConfig resolveCacheConfig(const ParsedCacheConfig& cache) {
.maxCachedGroupsPerTrack = static_cast<size_t>(*cache.max_groups_per_track.value()),
.maxCachedMb = cache.max_cached_mb.value().value_or(16),
.minEvictionKb = cache.min_eviction_kb.value().value_or(64),
.cachePublishesWithoutSubscribers =
cache.cache_publishes_without_subscribers.value().value_or(false),
.maxCacheDuration = maxCacheDuration,
.defaultMaxCacheDuration = defaultMaxCacheDuration,
};
Expand Down
5 changes: 5 additions & 0 deletions src/config/loader/ParsedConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ struct ParsedCacheConfig {
"Default: 64. Ignored when cache is disabled.",
std::optional<uint32_t>>
min_eviction_kb;
rfl::Description<
"When true, cache objects from incoming PUBLISH streams even if there are no active "
"downstream subscribers at the time the objects arrive. Default: false.",
std::optional<bool>>
cache_publishes_without_subscribers;
rfl::Description<
"Maximum cache duration (seconds) for any track; clamps publisher-set values. "
"Also used as the default for tracks without a publisher-set duration when "
Expand Down
106 changes: 106 additions & 0 deletions test/MoqxRelayTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,31 @@ class MoQRelayTest : public ::testing::Test {
std::shared_ptr<MoqxRelay> relay_;
};

class CacheStatsVisitor : public RelayStateVisitor {
public:
void onPeersBegin() override {}
void onPeer(std::string_view, std::string_view, std::string_view) override {}
void onPeersEnd() override {}
void onSubscriptionsBegin() override {}
void onSubscription(const SubscriptionInfo&) override {}
void onSubscriptionsEnd() override {}
void onNamespaceTreeBegin() override {}
void beginNamespaceNode(std::string_view, const TrackNamespace&, size_t) override {}
void endNamespaceNode() override {}
void onNamespaceTreeEnd() override {}
void onCacheStats(
size_t totalBytesIn,
const std::vector<MoQCache::TrackStats>& tracksIn,
MoQCache::TimePoint /*now*/
) override {
totalBytes = totalBytesIn;
tracks = tracksIn;
}

size_t totalBytes{0};
std::vector<MoQCache::TrackStats> tracks;
};

// Test: Basic relay construction
TEST_F(MoQRelayTest, Construction) {
EXPECT_NE(relay_, nullptr);
Expand Down Expand Up @@ -355,6 +380,87 @@ TEST_F(MoQRelayTest, PublishSuccess) {
removeSession(publisherSession);
}

TEST_F(MoQRelayTest, PublishCanPopulateCacheWithoutSubscribersWhenEnabled) {
relay_ = std::make_shared<MoqxRelay>(config::CacheConfig{
.maxCachedTracks = 8,
.maxCachedGroupsPerTrack = 4,
.cachePublishesWithoutSubscribers = true,
});
relay_->setAllowedNamespacePrefix(kAllowedPrefix);

auto publisherSession = createMockSession();
doPublishNamespace(publisherSession, kTestNamespace);

auto consumer = doPublish(publisherSession, kTestTrackName);
auto subgroup = consumer->beginSubgroup(1, 0, 0);
ASSERT_TRUE(subgroup.hasValue());
EXPECT_TRUE(subgroup.value()->object(0, makeBuf("hello"), noExtensions(), true).hasValue());

CacheStatsVisitor visitor;
relay_->dumpState(visitor);
ASSERT_EQ(visitor.tracks.size(), 1);
EXPECT_EQ(visitor.tracks[0].name, kTestTrackName);
ASSERT_EQ(visitor.tracks[0].groups.size(), 1);
EXPECT_EQ(visitor.tracks[0].groups[0].groupId, 1);
EXPECT_EQ(visitor.tracks[0].groups[0].objects, 1);
EXPECT_EQ(visitor.totalBytes, 5);

removeSession(publisherSession);
}

TEST_F(MoQRelayTest, LateSubscriberGetsLiveObjectsButNoCachedReplay) {
relay_ = std::make_shared<MoqxRelay>(config::CacheConfig{
.maxCachedTracks = 8,
.maxCachedGroupsPerTrack = 4,
.cachePublishesWithoutSubscribers = true,
});
relay_->setAllowedNamespacePrefix(kAllowedPrefix);

auto publisherSession = createMockSession();
doPublishNamespace(publisherSession, kTestNamespace);

auto consumer = doPublish(publisherSession, kTestTrackName);
auto subgroup1 = consumer->beginSubgroup(1, 0, 0);
ASSERT_TRUE(subgroup1.hasValue());
EXPECT_TRUE(
subgroup1.value()->object(0, makeBuf("before"), noExtensions(), true).hasValue()
);

auto subscriberSession = createMockSession();
auto mockConsumer = createMockConsumer();
auto mockSubgroupConsumer = createMockSubgroupConsumer();

EXPECT_CALL(*mockConsumer, beginSubgroup(1, 0, _, _)).Times(0);
EXPECT_CALL(*mockConsumer, beginSubgroup(2, 0, _, _))
.WillOnce(Return(folly::makeExpected<MoQPublishError>(
std::static_pointer_cast<SubgroupConsumer>(mockSubgroupConsumer)
)));
EXPECT_CALL(*mockSubgroupConsumer, object(0, _, _, true))
.WillOnce(Return(folly::makeExpected<MoQPublishError>(folly::unit)));

subscribeToTrack(subscriberSession, kTestTrackName, mockConsumer, RequestID(1));

auto subgroup2 = consumer->beginSubgroup(2, 0, 0);
ASSERT_TRUE(subgroup2.hasValue());
EXPECT_TRUE(
subgroup2.value()->object(0, makeBuf("after"), noExtensions(), true).hasValue()
);

CacheStatsVisitor visitor;
relay_->dumpState(visitor);
ASSERT_EQ(visitor.tracks.size(), 1);
EXPECT_EQ(visitor.tracks[0].name, kTestTrackName);
ASSERT_EQ(visitor.tracks[0].groups.size(), 2);
EXPECT_EQ(visitor.tracks[0].groups[0].groupId, 1);
EXPECT_EQ(visitor.tracks[0].groups[0].objects, 1);
EXPECT_EQ(visitor.tracks[0].groups[1].groupId, 2);
EXPECT_EQ(visitor.tracks[0].groups[1].objects, 1);
EXPECT_EQ(visitor.totalBytes, 11);

removeSession(publisherSession);
removeSession(subscriberSession);
}

// Test: Tree pruning when leaf node is removed
// Scenario: test/A/B/C and test/A/D exist. Remove C should prune B but keep A
// and D
Expand Down
17 changes: 17 additions & 0 deletions test/config/ConfigResolverTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1451,5 +1451,22 @@ TEST(ResolveConfig, CacheByteLimitsMergeWithDefaults) {
EXPECT_EQ(resolved.services.at("overrider").cache.minEvictionKb, 512u);
}

TEST(ResolveConfig, CachePublishesWithoutSubscribersDefaultsFalseAndCanBeEnabled) {
auto cfg = makeMinimalInsecureConfig();

auto defaultResult = resolveConfig(cfg);
ASSERT_TRUE(defaultResult.hasValue());
EXPECT_FALSE(defaultResult.value().config.services.at("default").cache
.cachePublishesWithoutSubscribers);

cfg.services.value().at("default").cache.value()->cache_publishes_without_subscribers =
std::optional<bool>{true};
auto enabledResult = resolveConfig(cfg);
ASSERT_TRUE(enabledResult.hasValue());
EXPECT_TRUE(
enabledResult.value().config.services.at("default").cache.cachePublishesWithoutSubscribers
);
}

} // namespace
} // namespace openmoq::moqx::config
2 changes: 2 additions & 0 deletions test/config/LoaderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ TEST(ConfigLoader, FullConfig) {
enabled: true
max_tracks: 200
max_groups_per_track: 5
cache_publishes_without_subscribers: true
admin:
port: 9669
address: "::1"
Expand Down Expand Up @@ -115,6 +116,7 @@ TEST(ConfigLoader, FullConfig) {
EXPECT_EQ(svc.cache.value()->enabled.value(), true);
EXPECT_EQ(svc.cache.value()->max_tracks.value(), 200);
EXPECT_EQ(svc.cache.value()->max_groups_per_track.value(), 5);
EXPECT_EQ(svc.cache.value()->cache_publishes_without_subscribers.value(), true);
}

TEST(ConfigLoader, ServicesWithAuthorityAndPath) {
Expand Down
Loading