From 53d6228b4277b2b1489d893c240a4928121da4d0 Mon Sep 17 00:00:00 2001 From: Rahmeen Habib Date: Wed, 1 Jul 2026 16:56:39 +0000 Subject: [PATCH] Add pause/resume logic in consumers and consumer host health awareness --- src/rmq/rmqa/CMakeLists.txt | 1 - src/rmq/rmqa/rmqa_consumerimpl.cpp | 1 - src/rmq/rmqa/rmqa_rabbitcontextimpl.cpp | 74 +- src/rmq/rmqa/rmqa_rabbitcontextimpl.h | 5 +- src/rmq/rmqa/rmqa_rabbitcontextoptions.cpp | 8 + src/rmq/rmqa/rmqa_rabbitcontextoptions.h | 27 + src/rmq/rmqamqp/CMakeLists.txt | 1 + src/rmq/rmqamqp/rmqamqp_channel.cpp | 4 + src/rmq/rmqamqp/rmqamqp_channelfactory.cpp | 39 +- src/rmq/rmqamqp/rmqamqp_channelfactory.h | 13 + src/rmq/rmqamqp/rmqamqp_channelmap.cpp | 38 + src/rmq/rmqamqp/rmqamqp_channelmap.h | 10 + src/rmq/rmqamqp/rmqamqp_connection.cpp | 31 +- src/rmq/rmqamqp/rmqamqp_connection.h | 14 +- .../rmqamqp/rmqamqp_heartbeatmanagerimpl.cpp | 31 +- .../rmqamqp/rmqamqp_heartbeatmanagerimpl.h | 9 +- src/rmq/rmqamqp/rmqamqp_hosthealthmonitor.cpp | 265 +++++ src/rmq/rmqamqp/rmqamqp_hosthealthmonitor.h | 104 ++ src/rmq/rmqamqp/rmqamqp_metrics.cpp | 34 + src/rmq/rmqamqp/rmqamqp_metrics.h | 23 + src/rmq/rmqamqp/rmqamqp_receivechannel.cpp | 457 +++++++-- src/rmq/rmqamqp/rmqamqp_receivechannel.h | 27 +- src/rmq/rmqp/rmqp_metricpublisher.h | 18 +- src/rmq/rmqt/CMakeLists.txt | 1 + src/rmq/rmqt/rmqt_consumerconfig.cpp | 1 + src/rmq/rmqt/rmqt_consumerconfig.h | 24 + src/rmq/rmqt/rmqt_hosthealthconfig.cpp | 52 + src/rmq/rmqt/rmqt_hosthealthconfig.h | 97 ++ src/tests/rmqa/rmqa_connectionimpl.t.cpp | 4 +- src/tests/rmqa/rmqa_rabbitcontextimpl.t.cpp | 10 +- .../rmqa/rmqa_rabbitcontextoptions.t.cpp | 24 + src/tests/rmqamqp/CMakeLists.txt | 1 + src/tests/rmqamqp/rmqamqp_channelmap.t.cpp | 97 +- src/tests/rmqamqp/rmqamqp_connection.t.cpp | 3 +- .../rmqamqp_heartbeatmanagerimpl.t.cpp | 75 +- .../rmqamqp/rmqamqp_hosthealthmonitor.t.cpp | 684 +++++++++++++ .../rmqamqp/rmqamqp_receivechannel.t.cpp | 902 +++++++++++++++++- 
src/tests/rmqt/rmqt_consumerconfig.t.cpp | 17 + src/tests/rmqtestutil/CMakeLists.txt | 2 +- .../rmqtestutil/rmqtestutil_mockchannel.t.h | 12 +- 40 files changed, 3052 insertions(+), 188 deletions(-) create mode 100644 src/rmq/rmqamqp/rmqamqp_hosthealthmonitor.cpp create mode 100644 src/rmq/rmqamqp/rmqamqp_hosthealthmonitor.h create mode 100644 src/rmq/rmqt/rmqt_hosthealthconfig.cpp create mode 100644 src/rmq/rmqt/rmqt_hosthealthconfig.h create mode 100644 src/tests/rmqamqp/rmqamqp_hosthealthmonitor.t.cpp diff --git a/src/rmq/rmqa/CMakeLists.txt b/src/rmq/rmqa/CMakeLists.txt index bf61b103..98236df5 100644 --- a/src/rmq/rmqa/CMakeLists.txt +++ b/src/rmq/rmqa/CMakeLists.txt @@ -22,7 +22,6 @@ add_library(rmqa OBJECT rmqa_vhostimpl.cpp ) - target_link_libraries(rmqa PUBLIC bsl bdl diff --git a/src/rmq/rmqa/rmqa_consumerimpl.cpp b/src/rmq/rmqa/rmqa_consumerimpl.cpp index afe082d6..5b89f7d3 100644 --- a/src/rmq/rmqa/rmqa_consumerimpl.cpp +++ b/src/rmq/rmqa/rmqa_consumerimpl.cpp @@ -40,7 +40,6 @@ namespace BloombergLP { namespace rmqa { namespace { BALL_LOG_SET_NAMESPACE_CATEGORY("RMQA.CONSUMERIMPL") - } // namespace ConsumerImpl::Factory::~Factory() {} diff --git a/src/rmq/rmqa/rmqa_rabbitcontextimpl.cpp b/src/rmq/rmqa/rmqa_rabbitcontextimpl.cpp index aa918ddf..d84fff9d 100644 --- a/src/rmq/rmqa/rmqa_rabbitcontextimpl.cpp +++ b/src/rmq/rmqa/rmqa_rabbitcontextimpl.cpp @@ -25,12 +25,14 @@ #include #include +#include #include #include #include #include #include #include +#include #include #include @@ -43,6 +45,7 @@ #include #include +#include #include namespace BloombergLP { @@ -70,7 +73,10 @@ void handleErrorCbOnEventLoop(bdlmt::ThreadPool* threadPool, void startFirstConnection( const bsl::weak_ptr& weakConn, - const rmqamqp::Connection::ConnectedCallback& callback) + const rmqamqp::Connection::ConnectedCallback& callback, + const bsl::shared_ptr& hostHealthMonitor, + const bsl::shared_ptr& metricPublisher, + const bsl::shared_ptr& endpoint) { bsl::shared_ptr amqpConn = 
weakConn.lock(); if (!amqpConn) { @@ -79,6 +85,27 @@ void startFirstConnection( } amqpConn->startFirstConnection(callback); + + bsl::vector > vhostTags; + vhostTags.push_back(bsl::pair( + rmqamqp::Metrics::VHOST_TAG, endpoint->vhost())); + + if (hostHealthMonitor) { + BALL_LOG_INFO << "Registering connection '" + << amqpConn->connectionDebugName() + << "' with host health monitor."; + hostHealthMonitor->registerConnection( + bsl::weak_ptr(amqpConn)); + + // Publish health-aware vhost created counter + metricPublisher->publishCounter( + rmqamqp::Metrics::HEALTH_AWARE_VHOST_CREATED, 1.0, vhostTags); + } + else { + // Publish health-unaware vhost created counter + metricPublisher->publishCounter( + rmqamqp::Metrics::HEALTH_UNAWARE_VHOST_CREATED, 1.0, vhostTags); + } } void initiateConnection( @@ -142,7 +169,7 @@ RabbitContextImpl::RabbitContextImpl( bslma::ManagedPtr eventLoop, const rmqa::RabbitContextOptions& options) : d_eventLoop(eventLoop) -, d_watchDog(bsl::make_shared( +, d_connectionWatchDog(bsl::make_shared( bsls::TimeInterval(DEFAULT_WATCHDOG_PERIOD))) , d_threadPool(options.threadpool()) , d_hostedThreadPool() @@ -153,26 +180,42 @@ RabbitContextImpl::RabbitContextImpl( bdlf::PlaceHolders::_2)) , d_connectionMonitor( bsl::make_shared(options.messageProcessingTimeout())) +, d_metricPublisher(options.metricPublisher() + ? 
options.metricPublisher() + : bsl::shared_ptr( + bsl::make_shared())) +, d_hostHealthMonitor() , d_connectionFactory() , d_tunables(options.tunables()) , d_consumerTracing(options.consumerTracing()) , d_producerTracing(options.producerTracing()) { - bsl::shared_ptr metricPublisher = - options.metricPublisher(); - if (!metricPublisher) { - metricPublisher = bsl::make_shared(); + + const bool isHostHealthMonitoringEnabled = + options.hostHealthConfig().has_value(); + + // Host health monitoring enabled + if (isHostHealthMonitoringEnabled) { + const rmqt::HostHealthConfig& hostHealthConfig = + *options.hostHealthConfig(); + + d_hostHealthMonitor = bsl::make_shared( + hostHealthConfig, d_metricPublisher.get()); + d_hostHealthMonitor->start(d_eventLoop->timerFactory()); } + d_connectionFactory = bslma::ManagedPtrUtil::makeManaged( d_eventLoop->resolver( options.shuffleConnectionEndpoints().value_or(false)), d_eventLoop->timerFactory(), d_onError, - metricPublisher, + d_metricPublisher, d_connectionMonitor, options.clientProperties(), - options.connectionErrorThreshold()); + options.connectionErrorThreshold(), + isHostHealthMonitoringEnabled); + if (!d_threadPool) { bslmt::ThreadAttributes attributes; attributes.setThreadName(DEFAULT_THREADPOOL_WORKER_NAME); @@ -188,8 +231,10 @@ RabbitContextImpl::RabbitContextImpl( BSLS_REVIEW(d_threadPool->enabled()); d_eventLoop->start(); - d_watchDog->addTask(bsl::weak_ptr(d_connectionMonitor)); - d_watchDog->start(d_eventLoop->timerFactory()); + + d_connectionWatchDog->addTask( + bsl::weak_ptr(d_connectionMonitor)); + d_connectionWatchDog->start(d_eventLoop->timerFactory()); } RabbitContextImpl::~RabbitContextImpl() @@ -202,9 +247,11 @@ RabbitContextImpl::~RabbitContextImpl() d_connectionFactory.reset(); + d_hostHealthMonitor.reset(); + d_hostedThreadPool.reset(); - d_watchDog.reset(); + d_connectionWatchDog.reset(); const bool eventLoopShutdownResult = d_eventLoop->waitForEventLoopExit(60); @@ -370,7 +417,10 @@ rmqt::Future 
RabbitContextImpl::createNewConnection( d_eventLoop->post( bdlf::BindUtil::bind(&startFirstConnection, bsl::weak_ptr(amqpConn), - cb)); + cb, + d_hostHealthMonitor, + d_metricPublisher, + endpoint)); return futurePair.second; } diff --git a/src/rmq/rmqa/rmqa_rabbitcontextimpl.h b/src/rmq/rmqa/rmqa_rabbitcontextimpl.h index f9271be1..4420118a 100644 --- a/src/rmq/rmqa/rmqa_rabbitcontextimpl.h +++ b/src/rmq/rmqa/rmqa_rabbitcontextimpl.h @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -71,11 +72,13 @@ class RabbitContextImpl : public rmqp::RabbitContext { private: static const int DEFAULT_WATCHDOG_PERIOD = 60; bslma::ManagedPtr d_eventLoop; - bsl::shared_ptr d_watchDog; + bsl::shared_ptr d_connectionWatchDog; bdlmt::ThreadPool* d_threadPool; bslma::ManagedPtr d_hostedThreadPool; rmqt::ErrorCallback d_onError; bsl::shared_ptr d_connectionMonitor; + bsl::shared_ptr d_metricPublisher; + bsl::shared_ptr d_hostHealthMonitor; bslma::ManagedPtr d_connectionFactory; rmqt::Tunables d_tunables; bsl::shared_ptr d_consumerTracing; diff --git a/src/rmq/rmqa/rmqa_rabbitcontextoptions.cpp b/src/rmq/rmqa/rmqa_rabbitcontextoptions.cpp index 457da3e3..14021259 100644 --- a/src/rmq/rmqa/rmqa_rabbitcontextoptions.cpp +++ b/src/rmq/rmqa/rmqa_rabbitcontextoptions.cpp @@ -67,6 +67,7 @@ RabbitContextOptions::RabbitContextOptions() , d_tunables() , d_connectionErrorThreshold() , d_shuffleConnectionEndpoints() +, d_hostHealthConfig() { populateUsefulInformation(&d_clientProperties); } @@ -140,6 +141,13 @@ RabbitContextOptions& RabbitContextOptions::setShuffleConnectionEndpoints( return *this; } +RabbitContextOptions& RabbitContextOptions::setHostHealthConfig( + const rmqt::HostHealthConfig& hostHealthConfig) +{ + d_hostHealthConfig = hostHealthConfig; + return *this; +} + #ifdef USES_LIBRMQ_EXPERIMENTAL_FEATURES RabbitContextOptions& RabbitContextOptions::setTunable(const bsl::string& tunable) diff --git a/src/rmq/rmqa/rmqa_rabbitcontextoptions.h 
b/src/rmq/rmqa/rmqa_rabbitcontextoptions.h index b508997c..4ab70ebb 100644 --- a/src/rmq/rmqa/rmqa_rabbitcontextoptions.h +++ b/src/rmq/rmqa/rmqa_rabbitcontextoptions.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -141,6 +142,23 @@ class RabbitContextOptions { RabbitContextOptions& setShuffleConnectionEndpoints(bool shuffleConnectionEndpoints); + /// \brief Set host health config for connections created by this context to + /// ensure that RabbitMQ consumers are connected from healthy hosts. + /// When host health config is set, the host health monitor is created and + /// consumers with \c consumeOnlyFromHealthyHost enabled (the default) will + /// pause message delivery when the host becomes unhealthy and resume when + /// the host is deemed healthy again. + /// \note By default, \c ConsumerConfig::consumeOnlyFromHealthyHost is true, + /// meaning consumers automatically participate in host health monitoring + /// when host health config is set at the context level. Consumers can + /// opt out by calling \c + /// ConsumerConfig::setConsumeOnlyFromHealthyHost(false). If host health + /// config is not set, \c consumeOnlyFromHealthyHost has no effect. + /// + /// \param hostHealthConfig configuration for host health monitoring + RabbitContextOptions& + setHostHealthConfig(const rmqt::HostHealthConfig& hostHealthConfig); + bdlmt::ThreadPool* threadpool() const { return d_threadpool; } const bsl::shared_ptr& metricPublisher() const @@ -182,6 +200,14 @@ class RabbitContextOptions { return d_shuffleConnectionEndpoints; } + /// \brief Get the host health config. If not set, host health monitoring is + /// disabled. By default, host health monitoring is disabled. 
+ /// \return The host health config + const bsl::optional& hostHealthConfig() const + { + return d_hostHealthConfig; + } + #ifdef USES_LIBRMQ_EXPERIMENTAL_FEATURES RabbitContextOptions& setTunable(const bsl::string& tunable); #endif @@ -198,6 +224,7 @@ class RabbitContextOptions { bsl::shared_ptr d_consumerTracing; bsl::shared_ptr d_producerTracing; bsl::optional d_shuffleConnectionEndpoints; + bsl::optional d_hostHealthConfig; }; } // namespace rmqa diff --git a/src/rmq/rmqamqp/CMakeLists.txt b/src/rmq/rmqamqp/CMakeLists.txt index 2301a639..7b8d48f6 100644 --- a/src/rmq/rmqamqp/CMakeLists.txt +++ b/src/rmq/rmqamqp/CMakeLists.txt @@ -9,6 +9,7 @@ add_library(rmqamqp OBJECT rmqamqp_framer.cpp rmqamqp_heartbeatmanager.cpp rmqamqp_heartbeatmanagerimpl.cpp + rmqamqp_hosthealthmonitor.cpp rmqamqp_message.cpp rmqamqp_messagestore.cpp rmqamqp_messagewithroute.cpp diff --git a/src/rmq/rmqamqp/rmqamqp_channel.cpp b/src/rmq/rmqamqp/rmqamqp_channel.cpp index c23a540b..2c847a6c 100644 --- a/src/rmq/rmqamqp/rmqamqp_channel.cpp +++ b/src/rmq/rmqamqp/rmqamqp_channel.cpp @@ -475,9 +475,13 @@ void Channel::retry(const bsl::weak_ptr& weakSelf) self->open(); } + void Channel::onReset() {} + void Channel::onFlowAllowed() {} + void Channel::onOpen() { ready(); } + void Channel::ready() { BALL_LOG_TRACE << "Channel Ready"; diff --git a/src/rmq/rmqamqp/rmqamqp_channelfactory.cpp b/src/rmq/rmqamqp/rmqamqp_channelfactory.cpp index 78b8ff2d..eb350c46 100644 --- a/src/rmq/rmqamqp/rmqamqp_channelfactory.cpp +++ b/src/rmq/rmqamqp/rmqamqp_channelfactory.cpp @@ -15,9 +15,20 @@ #include +#include + +#include +#include + namespace BloombergLP { namespace rmqamqp { +ChannelFactory::ChannelFactory( + const ChannelOnOpenState receiveChannelOnOpenState) +: d_receiveChannelOnOpenState(receiveChannelOnOpenState) +{ +} + bsl::shared_ptr ChannelFactory::createReceiveChannel( const rmqt::Topology& topology, const Channel::AsyncWriteCallback& onAsyncWrite, @@ -29,6 +40,25 @@ bsl::shared_ptr 
ChannelFactory::createReceiveChannel( const bsl::shared_ptr& hungProgressTimer, const Channel::HungChannelCallback& connErrorCb) { + const bool channelPausedOnOpen = + d_receiveChannelOnOpenState == PAUSED || + (consumerConfig.consumeOnlyFromHealthyHost() && + d_receiveChannelOnOpenState == PAUSED_HOST_HEALTH_AWARE); + + // Publish health-aware or health-unaware consumer created counter + bsl::vector > vhostTags; + vhostTags.push_back( + bsl::pair(Metrics::VHOST_TAG, vhost)); + + if (consumerConfig.consumeOnlyFromHealthyHost()) { + metricPublisher->publishCounter( + Metrics::HEALTH_AWARE_CONSUMER_CREATED, 1.0, vhostTags); + } + else { + metricPublisher->publishCounter( + Metrics::HEALTH_UNAWARE_CONSUMER_CREATED, 1.0, vhostTags); + } + return bsl::make_shared(topology, onAsyncWrite, retryHandler, @@ -37,7 +67,8 @@ bsl::shared_ptr ChannelFactory::createReceiveChannel( vhost, ackQueue, hungProgressTimer, - connErrorCb); + connErrorCb, + channelPausedOnOpen); } bsl::shared_ptr ChannelFactory::createSendChannel( @@ -60,5 +91,11 @@ bsl::shared_ptr ChannelFactory::createSendChannel( connErrorCb); } +void ChannelFactory::setReceiveChannelOnOpenState( + const ChannelOnOpenState channelOnOpenState) +{ + d_receiveChannelOnOpenState = channelOnOpenState; +} + } // namespace rmqamqp } // namespace BloombergLP diff --git a/src/rmq/rmqamqp/rmqamqp_channelfactory.h b/src/rmq/rmqamqp/rmqamqp_channelfactory.h index d0ea3ed7..f97daafe 100644 --- a/src/rmq/rmqamqp/rmqamqp_channelfactory.h +++ b/src/rmq/rmqamqp/rmqamqp_channelfactory.h @@ -36,6 +36,13 @@ namespace rmqamqp { class ChannelFactory { public: + enum ChannelOnOpenState { + PAUSED, + PAUSED_HOST_HEALTH_AWARE, + NOT_PAUSED, + }; + + ChannelFactory(const ChannelOnOpenState channelOnOpenState = NOT_PAUSED); virtual ~ChannelFactory() {}; virtual bsl::shared_ptr createReceiveChannel( @@ -58,6 +65,12 @@ class ChannelFactory { const bsl::string& vhost, const bsl::shared_ptr& hungProgressTimer, const Channel::HungChannelCallback& 
connErrorCb); + + void setReceiveChannelOnOpenState( + const ChannelOnOpenState receiveChannelOnOpenState); + + private: + ChannelOnOpenState d_receiveChannelOnOpenState; }; } // namespace rmqamqp diff --git a/src/rmq/rmqamqp/rmqamqp_channelmap.cpp b/src/rmq/rmqamqp/rmqamqp_channelmap.cpp index ef7c3f82..2fab7f13 100644 --- a/src/rmq/rmqamqp/rmqamqp_channelmap.cpp +++ b/src/rmq/rmqamqp/rmqamqp_channelmap.cpp @@ -114,6 +114,44 @@ void ChannelMap::openAll() } } +void ChannelMap::pauseReceiveChannels(const bool respectHostHealth) +{ + for (ReceiveChannelMap::const_iterator it = d_receiveChannels.cbegin(); + it != d_receiveChannels.cend(); + ++it) { + + bsl::shared_ptr channel = it->second; + + if (respectHostHealth && + !channel->consumerConfig().consumeOnlyFromHealthyHost()) { + BALL_LOG_TRACE << "Channel " << it->first + << " is not host-health aware. Skipping pause."; + } + else { + channel->pause(); + } + } +} + +void ChannelMap::resumeReceiveChannels(const bool respectHostHealth) +{ + for (ReceiveChannelMap::const_iterator it = d_receiveChannels.cbegin(); + it != d_receiveChannels.cend(); + ++it) { + + bsl::shared_ptr channel = it->second; + + if (respectHostHealth && + !channel->consumerConfig().consumeOnlyFromHealthyHost()) { + BALL_LOG_TRACE << "Channel " << it->first + << " is not host-health aware. 
Skipping resume."; + } + else { + channel->resume(); + } + } +} + bool ChannelMap::processReceived(uint16_t channelId, const rmqamqp::Message& message) { diff --git a/src/rmq/rmqamqp/rmqamqp_channelmap.h b/src/rmq/rmqamqp/rmqamqp_channelmap.h index 04f59735..ca0ddae3 100644 --- a/src/rmq/rmqamqp/rmqamqp_channelmap.h +++ b/src/rmq/rmqamqp/rmqamqp_channelmap.h @@ -79,6 +79,16 @@ class ChannelMap { return d_receiveChannels; } + // Calls `resume` on receive channels in the receive channel map + // if `respectHostHealth` is true, only resumes those channels whose + // consumer config has `consumeOnlyFromHealthyHost` set to true + void resumeReceiveChannels(const bool respectHostHealth); + + // Calls `pause` on receive channels in the receive channel map + // if `respectHostHealth` is true, only pauses those channels whose + // consumer config has `consumeOnlyFromHealthyHost` set to true + void pauseReceiveChannels(const bool respectHostHealth); + private: ChannelPtrMap d_channels; SendChannelMap d_sendChannels; diff --git a/src/rmq/rmqamqp/rmqamqp_connection.cpp b/src/rmq/rmqamqp/rmqamqp_connection.cpp index 0d116421..ac3bc46b 100644 --- a/src/rmq/rmqamqp/rmqamqp_connection.cpp +++ b/src/rmq/rmqamqp/rmqamqp_connection.cpp @@ -13,6 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "rmqamqp_channelfactory.h" #include #include @@ -39,7 +40,9 @@ #include #include #include +#include +#include #include #include #include @@ -1031,6 +1034,20 @@ bsl::string Connection::connectionDebugName() const return d_connectionName + ": " + d_endpoint->formatAddress(); } +void Connection::pauseReceiveChannels(const bool respectHostHealth) +{ + d_channelFactory->setReceiveChannelOnOpenState( + respectHostHealth ? 
ChannelFactory::PAUSED_HOST_HEALTH_AWARE + : ChannelFactory::PAUSED); + d_channels.pauseReceiveChannels(respectHostHealth); +} + +void Connection::resumeReceiveChannels(const bool respectHostHealth) +{ + d_channelFactory->setReceiveChannelOnOpenState(ChannelFactory::NOT_PAUSED); + d_channels.resumeReceiveChannels(respectHostHealth); +} + Connection::Factory::Factory( const bsl::shared_ptr& resolver, const bsl::shared_ptr& timerFactory, @@ -1038,7 +1055,8 @@ Connection::Factory::Factory( const bsl::shared_ptr& metricPublisher, const bsl::shared_ptr& connectionMonitor, const rmqt::FieldTable& clientProperties, - const bsl::optional& connectionErrorThreshold) + const bsl::optional& connectionErrorThreshold, + const bool isHostHealthMonitoringEnabled) : d_errorCb(errorCb) , d_clientProperties(clientProperties) , d_metricPublisher(metricPublisher) @@ -1046,6 +1064,7 @@ Connection::Factory::Factory( , d_timerFactory(timerFactory) , d_connectionMonitor(connectionMonitor) , d_connectionErrorThreshold(connectionErrorThreshold) +, d_isHostHealthMonitoringEnabled(isHostHealthMonitoringEnabled) { } @@ -1095,7 +1114,15 @@ Connection::Factory::newHeartBeatManager() bsl::shared_ptr Connection::Factory::newChannelFactory() { - return bsl::make_shared(); + // Set the initial state of receive channels based on whether host health + // monitoring is enabled. This state is later updated when health check has + // run successfully. + ChannelFactory::ChannelOnOpenState receiveChannelOnOpenState = + d_isHostHealthMonitoringEnabled + ? 
ChannelFactory::PAUSED_HOST_HEALTH_AWARE + : ChannelFactory::NOT_PAUSED; + + return bsl::make_shared(receiveChannelOnOpenState); } } // namespace rmqamqp diff --git a/src/rmq/rmqamqp/rmqamqp_connection.h b/src/rmq/rmqamqp/rmqamqp_connection.h index 49d84e66..f5dd8575 100644 --- a/src/rmq/rmqamqp/rmqamqp_connection.h +++ b/src/rmq/rmqamqp/rmqamqp_connection.h @@ -123,6 +123,16 @@ class Connection : public bsl::enable_shared_from_this, const rmqt::ConsumerConfig& config, const bsl::shared_ptr& ackQueue); + // Calls `resume` on receive channels in the channel map + // if `respectHostHealth` is true, only resumes those channels whose + // consumer config has `consumeOnlyFromHealthyHost` set to true + virtual void resumeReceiveChannels(const bool respectHostHealth); + + // Calls `pause` on receive channels in the channel map + // if `respectHostHealth` is true, only pauses those channels whose + // consumer config has `consumeOnlyFromHealthyHost` set to true + virtual void pauseReceiveChannels(const bool respectHostHealth); + /// Declares a new channel which can be used to publish messages /// The function must be called from the Connection's EventLoop thread. 
/// Calling this function when disconnected will successfully return a @@ -314,7 +324,8 @@ class Connection::Factory { const bsl::shared_ptr& metricPublisher, const bsl::shared_ptr& connectionMonitor, const rmqt::FieldTable& clientProperties, - const bsl::optional& connectionErrorThreshold); + const bsl::optional& connectionErrorThreshold, + const bool isHostHealthMonitoringEnabled); virtual ~Factory() {} @@ -339,6 +350,7 @@ class Connection::Factory { const bsl::shared_ptr d_timerFactory; const bsl::shared_ptr d_connectionMonitor; const bsl::optional d_connectionErrorThreshold; + const bool d_isHostHealthMonitoringEnabled; }; // class Connection::Factory } // namespace rmqamqp } // namespace BloombergLP diff --git a/src/rmq/rmqamqp/rmqamqp_heartbeatmanagerimpl.cpp b/src/rmq/rmqamqp/rmqamqp_heartbeatmanagerimpl.cpp index 1cf77b75..ae4616db 100644 --- a/src/rmq/rmqamqp/rmqamqp_heartbeatmanagerimpl.cpp +++ b/src/rmq/rmqamqp/rmqamqp_heartbeatmanagerimpl.cpp @@ -37,10 +37,8 @@ BALL_LOG_SET_NAMESPACE_CATEGORY("RMQAMQP.HEARTBEATMANAGERIMPL") HeartbeatManagerImpl::HeartbeatManagerImpl( const bsl::shared_ptr& timerFactory) : d_timeoutSeconds() -, d_tickTimer(timerFactory->createWithCallback( - bdlf::BindUtil::bind(&HeartbeatManagerImpl::handleTick, - this, - bdlf::PlaceHolders::_1))) +, d_timerFactory(timerFactory) +, d_tickTimer() , d_sendHeartbeat() , d_killConnection() , d_active(false) @@ -70,21 +68,40 @@ void HeartbeatManagerImpl::start( d_totalTicksUntilHeartBeat = d_timeoutSeconds / 2 / TICK_TIME; d_ticksUntilHeartBeat = d_totalTicksUntilHeartBeat; + d_tickTimer = d_timerFactory->createWithCallback( + bdlf::BindUtil::bind(&HeartbeatManagerImpl::handleTick, + weak_from_this(), + bdlf::PlaceHolders::_1)); startTickTimer(); } void HeartbeatManagerImpl::stop() { - d_tickTimer->cancel(); + if (d_tickTimer) { + d_tickTimer->cancel(); + d_tickTimer.reset(); + } d_active = false; } -void HeartbeatManagerImpl::handleTick(rmqio::Timer::InterruptReason reason) +void 
HeartbeatManagerImpl::handleTick( + const bsl::weak_ptr& weakSelf, + rmqio::Timer::InterruptReason reason) { if (reason == rmqio::Timer::CANCEL) { - // Cancelled return; } + + bsl::shared_ptr self = weakSelf.lock(); + if (!self) { + return; + } + + self->processTick(); +} + +void HeartbeatManagerImpl::processTick() +{ startTickTimer(); --d_ticksUntilHeartBeat; --d_ticksUntilDisconnect; diff --git a/src/rmq/rmqamqp/rmqamqp_heartbeatmanagerimpl.h b/src/rmq/rmqamqp/rmqamqp_heartbeatmanagerimpl.h index f127e6c5..b8e2eb5b 100644 --- a/src/rmq/rmqamqp/rmqamqp_heartbeatmanagerimpl.h +++ b/src/rmq/rmqamqp/rmqamqp_heartbeatmanagerimpl.h @@ -38,7 +38,9 @@ class Frame; } namespace rmqamqp { -class HeartbeatManagerImpl : public HeartbeatManager { +class HeartbeatManagerImpl +: public HeartbeatManager, + public bsl::enable_shared_from_this { public: typedef bsl::function HeartbeatCallback; typedef bsl::function ConnectionDeathCallback; @@ -67,13 +69,16 @@ class HeartbeatManagerImpl : public HeartbeatManager { HeartbeatManagerImpl& operator=(const HeartbeatManagerImpl&) BSLS_KEYWORD_DELETED; - void handleTick(rmqio::Timer::InterruptReason reason); + static void handleTick(const bsl::weak_ptr& weakSelf, + rmqio::Timer::InterruptReason reason); + void processTick(); void startTickTimer(); private: uint32_t d_timeoutSeconds; + bsl::shared_ptr d_timerFactory; bsl::shared_ptr d_tickTimer; HeartbeatCallback d_sendHeartbeat; ConnectionDeathCallback d_killConnection; diff --git a/src/rmq/rmqamqp/rmqamqp_hosthealthmonitor.cpp b/src/rmq/rmqamqp/rmqamqp_hosthealthmonitor.cpp new file mode 100644 index 00000000..0c568726 --- /dev/null +++ b/src/rmq/rmqamqp/rmqamqp_hosthealthmonitor.cpp @@ -0,0 +1,265 @@ +// Copyright 2025 Bloomberg Finance L.P. +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include
+
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace BloombergLP {
+namespace rmqamqp {
+
+namespace {
+BALL_LOG_SET_NAMESPACE_CATEGORY("RMQAMQP.HOSTHEALTHMONITOR")
+
+const bool RESPECT_HOST_HEALTH = true;
+
+} // namespace
+
+HostHealthMonitor::HostHealthMonitor(
+    const rmqt::HostHealthConfig& hostHealthConfig,
+    rmqp::MetricPublisher* metricPublisher)
+: d_hostHealthConfig(hostHealthConfig)
+, d_currentTries(0)
+, d_timer()
+, d_metricPublisher(metricPublisher)
+{
+    BSLS_ASSERT(d_metricPublisher);
+    BSLS_ASSERT(d_hostHealthConfig.pollInterval() > 0);
+    BALL_LOG_INFO << "Monitoring host health using config " << hostHealthConfig
+                  << " with health checks on the event loop";
+}
+
+HostHealthMonitor::~HostHealthMonitor()
+{
+    stop();
+    d_connections.clear();
+}
+
+void HostHealthMonitor::start(
+    const bsl::shared_ptr& timerFactory)
+{
+    BSLS_ASSERT_OPT(!d_timer);
+    d_timer = timerFactory->createWithCallback(
+        bdlf::BindUtil::bind(&HostHealthMonitor::handleTimerFired,
+                             weak_from_this(),
+                             bdlf::PlaceHolders::_1));
+    scheduleNextCheck();
+}
+
+void HostHealthMonitor::stop()
+{
+    if (d_timer) {
+        d_timer->cancel();
+        d_timer.reset();
+    }
+}
+
+void HostHealthMonitor::registerConnection(
+    const bsl::weak_ptr& conn)
+{
+    d_connections.push_back(conn);
+
+    d_metricPublisher->publishGauge(Metrics::HEALTH_AWARE_VHOSTS,
+                                    static_cast(d_connections.size()));
+}
+
+void HostHealthMonitor::handleTimerFired(
+    const bsl::weak_ptr& weakSelf,
+    rmqio::Timer::InterruptReason reason)
+{
+    if (reason == rmqio::Timer::CANCEL) {
+        return;
+    }
+
+    bsl::shared_ptr self = weakSelf.lock();
+    if (!self) {
+        BALL_LOG_DEBUG << "HostHealthMonitor destroyed before timer fired";
+        return;
+    }
+
+    self->checkHealth();
+}
+
+void HostHealthMonitor::checkHealth()
+{
+    scheduleNextCheck();
+
+    HostHealth result;
+
+    d_metricPublisher->publishCounter(Metrics::HEALTH_CHECK_TOTAL, 1.0);
+
+    try {
+        const bsls::TimeInterval startTime =
+            bsls::SystemTime::nowMonotonicClock();
+
+        const bool healthCheckerResult = d_hostHealthConfig.healthChecker()();
+        result = healthCheckerResult ? HEALTHY : UNHEALTHY;
+
+        const bsls::TimeInterval endTime =
+            bsls::SystemTime::nowMonotonicClock();
+        const double durationMs = (endTime - startTime).totalMilliseconds();
+
+        BALL_LOG_INFO << "Health check completed in " << durationMs
+                      << " ms with result: " << healthCheckerResult
+                      << ". Set health state to: " << result;
+
+        d_metricPublisher->publishSummary(Metrics::HEALTH_CHECK_DURATION_MS,
+                                          durationMs);
+
+        const double HEALTHCHECK_DURATION_REPORTING_THRESHOLD =
+            bsl::min(d_hostHealthConfig.pollInterval() * 1000.0 * 0.8, 1000.0);
+        if (durationMs > HEALTHCHECK_DURATION_REPORTING_THRESHOLD) {
+            BALL_LOG_WARN << "Host health check took " << durationMs
+                          << " ms which exceeds the threshold of "
+                          << HEALTHCHECK_DURATION_REPORTING_THRESHOLD << " ms.";
+            d_metricPublisher->publishCounter(
+                Metrics::HEALTH_CHECK_BLOCKED_EVENT_LOOP, 1.0);
+        }
+    }
+    catch (const bsl::exception& e) {
+        BALL_LOG_ERROR << "Host health check failed with exception: "
+                       << e.what();
+        result = RETRY;
+
+        d_metricPublisher->publishCounter(Metrics::HEALTH_CHECK_FAILURES_TOTAL,
+                                          1.0);
+    }
+    catch (...) {
+        BALL_LOG_ERROR << "Host health check failed with unknown exception.";
+        result = RETRY;
+
+        d_metricPublisher->publishCounter(Metrics::HEALTH_CHECK_FAILURES_TOTAL,
+                                          1.0);
+    }
+
+    if (result == RETRY) {
+        // Pre-increment so 'maxRetriesOnFailure()' failed checks are tolerated
+        // and the next one marks the host UNHEALTHY, as the log below states.
+        if (++d_currentTries > d_hostHealthConfig.maxRetriesOnFailure()) {
+            BALL_LOG_ERROR << "Exceeded max retries on failure of "
+                           << d_hostHealthConfig.maxRetriesOnFailure()
+                           << ". Marking host as UNHEALTHY.";
+            result = UNHEALTHY;
+
+            d_metricPublisher->publishGauge(
+                Metrics::HEALTH_CHECK_CONSECUTIVE_FAILURES,
+                static_cast(d_currentTries));
+        }
+        else {
+            BALL_LOG_WARN << "Current tries " << d_currentTries
+                          << " do not exceed max retries on failure of "
+                          << d_hostHealthConfig.maxRetriesOnFailure()
+                          << ". Will retry after "
+                          << d_hostHealthConfig.pollInterval()
+                          << " seconds. Will NOT pause the consumers yet.";
+
+            d_metricPublisher->publishGauge(
+                Metrics::HEALTH_CHECK_CONSECUTIVE_FAILURES,
+                static_cast(d_currentTries),
+                bsl::vector >());
+            return;
+        }
+    }
+
+    processHealthResult(result);
+}
+
+void HostHealthMonitor::processHealthResult(HostHealth result)
+{
+    d_currentTries = 0;
+
+    BALL_LOG_DEBUG << (result == HEALTHY ? "Resuming" : "Pausing")
+                   << " host health aware consumers.";
+
+    const double statusValue = (result == HEALTHY) ? 1.0 : 0.0;
+    d_metricPublisher->publishGauge(Metrics::HEALTH_CHECK_STATUS, statusValue);
+
+    if (result == HEALTHY) {
+        d_metricPublisher->publishGauge(
+            Metrics::HEALTH_CHECK_CONSECUTIVE_FAILURES, 0.0);
+        d_metricPublisher->publishCounter(
+            Metrics::HEALTH_TRIGGERED_RESUME_TOTAL, 1.0);
+    }
+    else {
+        d_metricPublisher->publishCounter(Metrics::HEALTH_TRIGGERED_PAUSE_TOTAL,
+                                          1.0);
+    }
+
+    for (bsl::list >::iterator conn =
+             d_connections.begin();
+         conn != d_connections.end();) {
+
+        bsl::shared_ptr connection = conn->lock();
+
+        if (!connection) {
+            BALL_LOG_DEBUG << "Remove destructed connection from host health "
+                              "monitoring list.";
+            conn = d_connections.erase(conn);
+
+            d_metricPublisher->publishGauge(
+                Metrics::HEALTH_AWARE_VHOSTS,
+                static_cast(d_connections.size()));
+
+            continue;
+        }
+
+        if (result == HEALTHY) {
+            connection->resumeReceiveChannels(RESPECT_HOST_HEALTH);
+        }
+        else {
+            connection->pauseReceiveChannels(RESPECT_HOST_HEALTH);
+        }
+
+        ++conn;
+    }
+}
+
+void HostHealthMonitor::scheduleNextCheck()
+{
+    d_timer->reset(bsls::TimeInterval(d_hostHealthConfig.pollInterval()));
+}
+
+bsl::ostream& operator<<(bsl::ostream& os,
+                         HostHealthMonitor::HostHealth hostHealth)
+{
+    switch (hostHealth) {
+        case HostHealthMonitor::HEALTHY:
+            os << "HEALTHY";
+            break;
+        case HostHealthMonitor::UNHEALTHY:
+            os << "UNHEALTHY";
+            break;
+        case HostHealthMonitor::RETRY:
+            os << "RETRY";
+            break;
+        default:
+            os << "UNKNOWN";
+            break; // defensive fallback
+    }
+    return os;
+}
+
+} // namespace rmqamqp
+} // namespace BloombergLP
diff --git a/src/rmq/rmqamqp/rmqamqp_hosthealthmonitor.h b/src/rmq/rmqamqp/rmqamqp_hosthealthmonitor.h
new file mode 100644
index 00000000..d3d06f7f
--- /dev/null
+++ b/src/rmq/rmqamqp/rmqamqp_hosthealthmonitor.h
@@ -0,0 +1,104 @@
+// Copyright 2025 Bloomberg Finance L.P.
+// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef INCLUDED_RMQA_HOSTHEALTHMONITOR +#define INCLUDED_RMQA_HOSTHEALTHMONITOR + +#include +#include +#include +#include + +#include +#include +#include + +namespace BloombergLP { +namespace rmqamqp { + +/// \brief Monitors host health and pauses/resumes consumers accordingly +/// +/// HostHealthMonitor periodically checks host health using a user-provided +/// health checker function. When the host becomes unhealthy, it pauses +/// message delivery on consumers that have \c consumeOnlyFromHealthyHost +/// enabled. When the host recovers, it resumes message delivery. +/// +/// The health checker function runs on the main event loop and should +/// complete promptly, as it will delay AMQP heartbeats and message delivery +/// for all connections by its execution time every \c pollInterval seconds. +/// Each check schedules the next one via a timer (self-rescheduling pattern). +class HostHealthMonitor +: public bsl::enable_shared_from_this { + public: + /// Health state of the host + /// HEALTHY: Host is healthy + /// UNHEALTHY: Host is unhealthy + /// RETRY: Host health checker failed due to some exception. Will retry + /// `HostHealthConfig::maxRetriesOnFailure()` times before + /// marking host as UNHEALTHY + enum HostHealth { HEALTHY = 0, UNHEALTHY = 1, RETRY = 2 }; + + // CREATORS + + /// Construct a HostHealthMonitor with the given configuration. 
+ /// + /// \param hostHealthConfig Configuration for health checking + /// \param metricPublisher Metric publisher for health check metrics. + /// The behavior is undefined unless the supplied + /// metric publisher remains valid for the lifetime + /// of this object. + HostHealthMonitor(const rmqt::HostHealthConfig& hostHealthConfig, + rmqp::MetricPublisher* metricPublisher); + + ~HostHealthMonitor(); + + /// Start the health monitoring timer. The first health check will fire + /// after one poll interval. + void start(const bsl::shared_ptr& timerFactory); + + /// Stop the health monitoring timer. + void stop(); + + /// Register a connection to be notified about host health changes. + /// When the host becomes unhealthy, the connection's receive channels + /// will be paused. When the host recovers, they will be resumed. + void + registerConnection(const bsl::weak_ptr& connection); + + friend bsl::ostream& operator<<(bsl::ostream& os, HostHealth hostHealth); + + private: + static void + handleTimerFired(const bsl::weak_ptr& weakSelf, + rmqio::Timer::InterruptReason reason); + void checkHealth(); + void processHealthResult(HostHealth result); + void scheduleNextCheck(); + + // DATA + rmqt::HostHealthConfig d_hostHealthConfig; + bsl::list > d_connections; + unsigned int d_currentTries; + bsl::shared_ptr d_timer; + rmqp::MetricPublisher* d_metricPublisher; +}; + +bsl::ostream& operator<<(bsl::ostream& os, + HostHealthMonitor::HostHealth hostHealth); + +} // namespace rmqamqp +} // namespace BloombergLP + +#endif diff --git a/src/rmq/rmqamqp/rmqamqp_metrics.cpp b/src/rmq/rmqamqp/rmqamqp_metrics.cpp index b8c0ffe8..646a4261 100644 --- a/src/rmq/rmqamqp/rmqamqp_metrics.cpp +++ b/src/rmq/rmqamqp/rmqamqp_metrics.cpp @@ -20,5 +20,39 @@ namespace rmqamqp { const char* Metrics::VHOST_TAG = "rmqVhostName"; const char* Metrics::CHANNELTYPE_TAG = "rmqChannelType"; + +// Pause/Resume metrics +const char* Metrics::CONSUMER_PAUSE_TOTAL = "consumer_pause_total"; +const char* 
Metrics::CONSUMER_RESUME_TOTAL = "consumer_resume_total"; +const char* Metrics::PAUSE_OPERATION_LATENCY_SECONDS = + "pause_operation_latency_seconds"; +const char* Metrics::RESUME_OPERATION_LATENCY_SECONDS = + "resume_operation_latency_seconds"; + +// Health polling metrics +const char* Metrics::HEALTH_CHECK_STATUS = "health_check_status"; +const char* Metrics::HEALTH_CHECK_TOTAL = "health_check_total"; +const char* Metrics::HEALTH_CHECK_FAILURES_TOTAL = + "health_check_failures_total"; +const char* Metrics::HEALTH_CHECK_CONSECUTIVE_FAILURES = + "health_check_consecutive_failures"; +const char* Metrics::HEALTH_CHECK_DURATION_MS = "health_check_duration_ms"; +const char* Metrics::HEALTH_TRIGGERED_PAUSE_TOTAL = + "health_triggered_pause_total"; +const char* Metrics::HEALTH_TRIGGERED_RESUME_TOTAL = + "health_triggered_resume_total"; + +const char* Metrics::HEALTH_CHECK_BLOCKED_EVENT_LOOP = + "health_check_blocked_event_loop"; + +// Host Health awareness metrics +const char* Metrics::HEALTH_AWARE_VHOST_CREATED = "health_aware_vhost_created"; +const char* Metrics::HEALTH_UNAWARE_VHOST_CREATED = + "health_unaware_vhost_created"; +const char* Metrics::HEALTH_AWARE_CONSUMER_CREATED = + "health_aware_consumer_created"; +const char* Metrics::HEALTH_UNAWARE_CONSUMER_CREATED = + "health_unaware_consumer_created"; +const char* Metrics::HEALTH_AWARE_VHOSTS = "health_aware_vhosts"; } // namespace rmqamqp } // namespace BloombergLP diff --git a/src/rmq/rmqamqp/rmqamqp_metrics.h b/src/rmq/rmqamqp/rmqamqp_metrics.h index e4f7e11d..f4aa658d 100644 --- a/src/rmq/rmqamqp/rmqamqp_metrics.h +++ b/src/rmq/rmqamqp/rmqamqp_metrics.h @@ -30,6 +30,29 @@ class Metrics { static const char* NAMESPACE; static const char* VHOST_TAG; static const char* CHANNELTYPE_TAG; + + // Pause/Resume metrics + static const char* CONSUMER_PAUSE_TOTAL; + static const char* CONSUMER_RESUME_TOTAL; + static const char* PAUSE_OPERATION_LATENCY_SECONDS; + static const char* RESUME_OPERATION_LATENCY_SECONDS; + + // 
Health polling metrics + static const char* HEALTH_CHECK_STATUS; + static const char* HEALTH_CHECK_TOTAL; + static const char* HEALTH_CHECK_FAILURES_TOTAL; + static const char* HEALTH_CHECK_CONSECUTIVE_FAILURES; + static const char* HEALTH_CHECK_DURATION_MS; + static const char* HEALTH_TRIGGERED_PAUSE_TOTAL; + static const char* HEALTH_TRIGGERED_RESUME_TOTAL; + + // Host Health awareness metrics + static const char* HEALTH_AWARE_VHOST_CREATED; + static const char* HEALTH_UNAWARE_VHOST_CREATED; + static const char* HEALTH_AWARE_CONSUMER_CREATED; + static const char* HEALTH_UNAWARE_CONSUMER_CREATED; + static const char* HEALTH_CHECK_BLOCKED_EVENT_LOOP; + static const char* HEALTH_AWARE_VHOSTS; }; } // namespace rmqamqp diff --git a/src/rmq/rmqamqp/rmqamqp_receivechannel.cpp b/src/rmq/rmqamqp/rmqamqp_receivechannel.cpp index aff45011..b61cc05e 100644 --- a/src/rmq/rmqamqp/rmqamqp_receivechannel.cpp +++ b/src/rmq/rmqamqp/rmqamqp_receivechannel.cpp @@ -16,6 +16,7 @@ // rmqamqp_receivechannel.cpp -*-C++-*- #include +#include #include #include #include @@ -29,11 +30,13 @@ #include #include #include +#include #include #include #include #include +#include #include namespace BloombergLP { @@ -44,11 +47,39 @@ BALL_LOG_SET_NAMESPACE_CATEGORY("RMQAMQP.RECEIVECHANNEL") using bdlf::PlaceHolders::_1; using bdlf::PlaceHolders::_2; using bdlf::PlaceHolders::_3; + +/// Publish latency metric if start time is valid (non-zero). +/// Calculates elapsed time from start time to now and publishes as a +/// summary metric, then resets the start time to zero. 
+void publishLatencyIfTracked( + bsls::TimeInterval* startTime, + const bsl::string& metricName, + rmqp::MetricPublisher* metricPublisher, + const bsl::vector >& tags) +{ + if (startTime->totalSecondsAsDouble() > 0.0) { + const bsls::TimeInterval endTime = + bsls::SystemTime::nowMonotonicClock(); + const double latencySeconds = + (endTime - *startTime).totalSecondsAsDouble(); + metricPublisher->publishSummary(metricName, latencySeconds, tags); + *startTime = bsls::TimeInterval(); + } +} + } // namespace class ReceiveChannel::Consumer { public: - enum State { NOT_CONSUMING, STARTING, CONSUMING, CANCELLING, CANCELLED }; + enum State { + NOT_CONSUMING, + STARTING, + CONSUMING, + PAUSING, + PAUSED, + CANCELLING, + CANCELLED, + }; bsl::optional consume(const rmqt::ConsumerConfig& consumerConfig); bsl::optional consumeOk(); @@ -57,7 +88,7 @@ class ReceiveChannel::Consumer { size_t lifetimeId); /// Signal the server to stop delivering - bsl::optional cancel(); + bsl::optional cancel(const bool pause = false); /// Server acknowledged cancel void cancelOk(); @@ -66,11 +97,20 @@ class ReceiveChannel::Consumer { Consumer(const bsl::shared_ptr& queue, const MessageCallback& onNewMessage, - const bsl::string& tag); + const bsl::string& tag, + const bool channelPausedOnOpen); bool isStarted() const; bool isActive() const; + bool isPaused() const; + bool isPausing() const; + bool isCancelling() const; + + void handleCancelAfterPause(); + void setPaused(); + bool shouldRestart() const; + bool canPause() const; void reset(); @@ -86,8 +126,9 @@ class ReceiveChannel::Consumer { ReceiveChannel::Consumer::Consumer(const bsl::shared_ptr& queue, const MessageCallback& onNewMessage, - const bsl::string& consumerTag) -: d_state(NOT_CONSUMING) + const bsl::string& consumerTag, + const bool channelPausedOnOpen) +: d_state(channelPausedOnOpen ? 
PAUSED : NOT_CONSUMING) , d_tag(consumerTag) , d_queue(queue) , d_onNewMessage(onNewMessage) @@ -114,6 +155,11 @@ bsl::optional ReceiveChannel::Consumer::consumeOk() << "Received ConsumeOk for a cancelled consumer, cancelling"; method = rmqamqpt::BasicCancel(d_tag); } + else if (d_state == PAUSING) { + // Consumer has been paused since sending Basic.Consume + BALL_LOG_INFO << "Received ConsumeOk for a paused consumer, pausing"; + method = rmqamqpt::BasicCancel(d_tag); + } else { BALL_LOG_ERROR << "Unexpected ConsumeOK, consumer-tag: " << d_tag << ", state: " << d_state << ", ignoring"; @@ -136,12 +182,48 @@ bool ReceiveChannel::Consumer::isStarted() const { return d_state != NOT_CONSUMING; } + bool ReceiveChannel::Consumer::isActive() const { return d_state == CONSUMING; } + +bool ReceiveChannel::Consumer::isPaused() const { return d_state == PAUSED; } + +bool ReceiveChannel::Consumer::isPausing() const { return d_state == PAUSING; } + +bool ReceiveChannel::Consumer::isCancelling() const +{ + return d_state == CANCELLING; +} + +void ReceiveChannel::Consumer::handleCancelAfterPause() +{ + switch (d_state) { + case PAUSING: + BALL_LOG_TRACE + << "Transitioning consumer from PAUSING to CANCELLING: " + << d_tag; + d_state = CANCELLING; + break; + case PAUSED: + BALL_LOG_TRACE + << "Transitioning consumer from PAUSED to CANCELLED: " << d_tag; + d_state = CANCELLED; + break; + default: + BALL_LOG_WARN << "Cancel not received after pause, ignoring for " + << d_tag << " in state " << d_state; + break; + } +} + +void ReceiveChannel::Consumer::setPaused() { d_state = PAUSED; } + bool ReceiveChannel::Consumer::shouldRestart() const { return d_state < CANCELLING; } +bool ReceiveChannel::Consumer::canPause() const { return d_state <= CONSUMING; } + void ReceiveChannel::Consumer::process(const rmqt::Message& msg, const rmqamqpt::BasicDeliver& deliver, size_t lifetimeId) @@ -177,59 +259,80 @@ ReceiveChannel::Consumer::consume(const rmqt::ConsumerConfig& consumerConfig) { 
bsl::optional method; - if (d_state == NOT_CONSUMING) { - BALL_LOG_INFO << "Starting consumer: " << d_tag - << " for queue: " << d_queue->name(); - d_state = STARTING; - - const bool noLocal = false; - const bool noAck = false; - const bool exclusive = - consumerConfig.exclusiveFlag() == rmqt::Exclusive::ON; - const bool noWait = false; - - method = - rmqamqpt::BasicConsume(d_queue->name(), - d_tag, - getBasicConsumeArguments(consumerConfig), - noLocal, - noAck, - exclusive, - noWait); - } - else { - BALL_LOG_ERROR << "Start called in state " << d_state << ", ignoring"; + if (d_state != NOT_CONSUMING && d_state != PAUSED) { + BALL_LOG_ERROR << "Consume called in state " << d_state << ", ignoring"; + return method; } + BALL_LOG_INFO << (d_state == PAUSED ? "Resuming" : "Starting") + << " consumer: " << d_tag + << " for queue: " << d_queue->name(); + + d_state = STARTING; + + const bool noLocal = false; + const bool noAck = false; + const bool exclusive = + consumerConfig.exclusiveFlag() == rmqt::Exclusive::ON; + const bool noWait = false; + + method = rmqamqpt::BasicConsume(d_queue->name(), + d_tag, + getBasicConsumeArguments(consumerConfig), + noLocal, + noAck, + exclusive, + noWait); + return method; } -bsl::optional ReceiveChannel::Consumer::cancel() +bsl::optional +ReceiveChannel::Consumer::cancel(const bool pause) { bsl::optional method; - if (d_state == CONSUMING) { - BALL_LOG_INFO << "Cancelling consumer: " << d_tag - << " for queue: " << d_queue->name(); - method = rmqamqpt::BasicCancel(d_tag); + if (!pause && d_state == PAUSING) { + BALL_LOG_WARN << "Cancel called in PAUSING state, queue: " + << queueName(); + d_state = CANCELLING; } - else { - BALL_LOG_WARN - << "Cancel called in a state other than CONSUMING. State: " - << d_state << ", queue: " << queueName(); + + if (d_state != CONSUMING) { + BALL_LOG_WARN << (pause ? "Pause" : "Cancel") + << " called in state other than CONSUMING. 
State: " + << d_state << ", queue: " << queueName(); + + // Consumer was cancelled/paused before it was started + // It will be cancelled/paused when consumeOk is received + d_state = pause ? PAUSING : CANCELLING; + return method; } - d_state = CANCELLING; + BALL_LOG_INFO << (pause ? "Pausing" : "Cancelling") + << " consumer: " << d_tag + << " for queue: " << d_queue->name(); + method = rmqamqpt::BasicCancel(d_tag); + + d_state = pause ? PAUSING : CANCELLING; return method; } void ReceiveChannel::Consumer::cancelOk() { - if (d_state != CANCELLING) { - BALL_LOG_ERROR << "Received CancelOk without sending Cancel consumer: [" - << d_tag << "]"; + switch (d_state) { + case CANCELLING: + d_state = CANCELLED; + break; + case PAUSING: + d_state = PAUSED; + break; + default: + BALL_LOG_ERROR + << "Received CancelOk without sending Cancel consumer: [" + << d_tag << "]"; + break; } - d_state = CANCELLED; } ReceiveChannel::Consumer::~Consumer() {} @@ -244,7 +347,8 @@ ReceiveChannel::ReceiveChannel( const bsl::string& vhost, const bsl::shared_ptr& ackQueue, const bsl::shared_ptr& hungProgressTimer, - const HungChannelCallback& connErrorCb) + const HungChannelCallback& connErrorCb, + const bool channelPausedOnOpen) : Channel(topology, messageSender, retryHandler, @@ -260,13 +364,26 @@ ReceiveChannel::ReceiveChannel( , d_multipleAckHandler( bdlf::BindUtil::bind(&ReceiveChannel::sendAck, this, _1, _2), bdlf::BindUtil::bind(&ReceiveChannel::sendNack, this, _1, _2, _3)) +, d_consumeFuturePair() , d_cancelFuturePair() , d_drainFuture() +, d_channelPausedOnOpen(channelPausedOnOpen) +, d_pauseStartTime() +, d_resumeStartTime() { } +const rmqt::ConsumerConfig& ReceiveChannel::consumerConfig() const +{ + return d_consumerConfig; +} + void ReceiveChannel::onOpen() { + if (d_channelPausedOnOpen) { + BALL_LOG_INFO << "Channel will be opened in paused state"; + } + BALL_LOG_TRACE << "Setting Prefetch: " << d_consumerConfig.prefetchCount(); d_hungProgressTimer->reset( 
bsls::TimeInterval(Channel::k_HUNG_CHANNEL_TIMER_SEC)); @@ -297,7 +414,12 @@ void ReceiveChannel::onReset() // processFailures() clears d_messageStore; if (d_consumer) { - if (d_consumer->shouldRestart()) { + if (d_consumer->isPaused()) { + d_channelPausedOnOpen = true; + BALL_LOG_TRACE << "On reset, keeping the consumer in paused state " + "so it can be resumed later "; + } + else if (d_consumer->shouldRestart()) { BALL_LOG_TRACE << "Resetting consumer"; d_consumer->reset(); } @@ -306,6 +428,12 @@ void ReceiveChannel::onReset() d_consumer.reset(); } } + if (d_consumeFuturePair) { + d_consumeFuturePair->first( + rmqt::Result<>("Consume could not be processed by the server, " + "due channel reset")); + d_consumeFuturePair.reset(); + } if (d_cancelFuturePair) { d_cancelFuturePair->first( rmqt::Result<>("Cancel could not be processed by the server, " @@ -332,8 +460,14 @@ rmqt::Result<> ReceiveChannel::consume(const rmqt::QueueHandle& queue, BSLS_ASSERT(!d_consumer); - d_consumer = - bsl::make_shared(queuePtr, onNewMessage, consumerTag); + d_consumer = bsl::make_shared( + queuePtr, onNewMessage, consumerTag, d_channelPausedOnOpen); + + if (d_consumer->isPaused()) { + BALL_LOG_INFO << "Consume called on paused channel, consumerTag: " + << consumerTag << ", awaiting resume"; + return rmqt::Result<>(); + } if (state() == READY || state() == AWAITING_REPLY) { restartConsumers(); @@ -347,8 +481,61 @@ rmqt::Result<> ReceiveChannel::consume(const rmqt::QueueHandle& queue, return rmqt::Result<>(); } +rmqt::Future<> ReceiveChannel::resume() +{ + d_channelPausedOnOpen = false; + + if (d_consumeFuturePair) { + BALL_LOG_WARN << "Resume called with a resume already in flight"; + return d_consumeFuturePair->second; + } + + if (!d_consumer) { + return rmqt::Future<>( + rmqt::Result<>("Resume called with no existing consumer")); + } + + if (d_cancelFuturePair) { + return rmqt::Future<>(rmqt::Result<>( + "Resume called with a " + + bsl::string(d_consumer->isPausing() ? 
"pause" : "cancel") + + " already in flight")); + } + + if (!d_consumer->isPaused()) { + return rmqt::Future<>( + rmqt::Result<>("Resume called on unpaused consumer")); + } + + if (state() == READY || state() == AWAITING_REPLY) { + // Track resume start time and increment counter + d_resumeStartTime = bsls::SystemTime::nowMonotonicClock(); + d_metricPublisher->publishCounter( + Metrics::CONSUMER_RESUME_TOTAL, 1.0, getMetricTags()); + + d_consumeFuturePair = + bslma::ManagedPtrUtil::makeManaged::Pair>( + rmqt::Future<>::make()); + + restartConsumers(); + + return d_consumeFuturePair->second; + } + + BALL_LOG_TRACE << "Resume called before channel ready, consumertag: " + << d_consumer->consumerTag() << " pending"; + + return rmqt::Future<>(rmqt::Result<>("Resume called before channel ready")); +} + ReceiveChannel::~ReceiveChannel() { + if (d_consumeFuturePair) { + d_consumeFuturePair->first( + rmqt::Result<>("ReceiveChannel Shut Down Before ConsumeOk " + "Received, consumer not started")); + d_consumeFuturePair.reset(); + } if (d_cancelFuturePair) { d_cancelFuturePair->first( rmqt::Result<>("ReceiveChannel Shut Down Before CancelOk Received, " @@ -405,31 +592,98 @@ bool ReceiveChannel::consumerIsActive() const return d_consumer && d_consumer->isActive(); } +bool ReceiveChannel::consumerIsPaused() const +{ + return d_consumer && d_consumer->isPaused(); +} + rmqt::Future<> ReceiveChannel::cancel() { if (d_cancelFuturePair) { - BALL_LOG_WARN << "Cancel called with a cancel already in " - "flight"; + if (d_consumer->isPausing()) { + BALL_LOG_WARN << "Cancel called with a pause already in " + "flight"; + + // If consumer is pausing, we can just turn that into a cancel. + // When the pause completes, consumer will be in a cancelled state. + // Consumer will be reset by the cancelOk processing. 
+ d_consumer->handleCancelAfterPause(); + } + else { + BALL_LOG_WARN << "Cancel called with a cancel already in " + "flight"; + } return d_cancelFuturePair->second; } - if (d_consumer) { - if (d_consumer->isStarted()) { - d_cancelFuturePair = - bslma::ManagedPtrUtil::makeManaged::Pair>( - rmqt::Future<>::make()); - bsl::optional method = d_consumer->cancel(); - if (method) { - writeMessage(Message(rmqamqpt::Method(method.value())), - AWAITING_REPLY); - } - return d_cancelFuturePair->second; + + if (!d_consumer) { + return rmqt::Future<>( + rmqt::Result<>("Cancel called with no active consumer")); + } + + if (d_consumer->isPaused()) { + // Consumer is already cancelled, just transition to cancelled state + d_consumer->handleCancelAfterPause(); + } + else if (d_consumer->isStarted()) { + d_cancelFuturePair = + bslma::ManagedPtrUtil::makeManaged::Pair>( + rmqt::Future<>::make()); + + bsl::optional method = d_consumer->cancel(); + + if (method) { + writeMessage(Message(rmqamqpt::Method(method.value())), + AWAITING_REPLY); + } + return d_cancelFuturePair->second; + } + + d_consumer.reset(); + return rmqt::Future<>(rmqt::Result<>()); +} + +rmqt::Future<> ReceiveChannel::pause() +{ + d_channelPausedOnOpen = true; + + if (d_cancelFuturePair) { + if (d_consumer->isCancelling()) { + return rmqt::Future<>( + rmqt::Result<>("Pause called with a cancel already in flight")); + } + + BALL_LOG_WARN << "Pause called with a pause already in flight"; + return d_cancelFuturePair->second; + } + + if (!d_consumer) { + return rmqt::Future<>( + rmqt::Result<>("Pause called with no active consumer")); + } + + if (d_consumer->canPause()) { + // Track pause start time and increment counter + d_pauseStartTime = bsls::SystemTime::nowMonotonicClock(); + d_metricPublisher->publishCounter( + Metrics::CONSUMER_PAUSE_TOTAL, 1.0, getMetricTags()); + + d_cancelFuturePair = + bslma::ManagedPtrUtil::makeManaged::Pair>( + rmqt::Future<>::make()); + + const bool pause = true; + bsl::optional method = 
d_consumer->cancel(pause); + + if (method) { + writeMessage(Message(rmqamqpt::Method(method.value())), + AWAITING_REPLY); } - // Not consuming so we're already done - d_consumer.reset(); - return rmqt::Future<>(rmqt::Result<>()); + return d_cancelFuturePair->second; } - return rmqt::Future<>( - rmqt::Result<>("Cancel called, with no active consumer")); + + BALL_LOG_DEBUG << "Pause called when consumer is already paused"; + return rmqt::Future<>(rmqt::Result<>()); } rmqt::Future<> ReceiveChannel::drain() @@ -499,11 +753,24 @@ void ReceiveChannel::processBasicMethod(const rmqamqpt::BasicMethod& basic) writeMessage(Message(rmqamqpt::Method(method.value())), AWAITING_REPLY); } + + if (d_consumeFuturePair) { + // Publish resume latency + publishLatencyIfTracked( + &d_resumeStartTime, + Metrics::RESUME_OPERATION_LATENCY_SECONDS, + d_metricPublisher.get(), + getMetricTags()); + + d_consumeFuturePair->first(rmqt::Result<>()); + d_consumeFuturePair.reset(); + } } else { invalidConsumerError( basic.the().consumerTag()); } + if (state() != READY) { if (consumerIsActive()) { ready(); @@ -512,12 +779,25 @@ void ReceiveChannel::processBasicMethod(const rmqamqpt::BasicMethod& basic) } break; case rmqamqpt::BasicQoSOk::METHOD_ID: { if (d_consumer) { - // we've restarted, we already have a callback so can declare - // consumer - restartConsumers(); + if (d_channelPausedOnOpen) { + d_consumer->setPaused(); + } + + if (d_consumer->isPaused()) { + // we've paused, so cancel the timer and wait for a resume + // and mark the channel ready + d_hungProgressTimer->cancel(); + ready(); + } + else { + // we've restarted, we already have a callback so can + // declare consumer + restartConsumers(); + } } else { - // This is the first connection, we don't have a callback + // This is the first connection, or recovery from connection + // drop during pause, we don't have a callback // registered so need to wait for the consume() call. 
ready(); } @@ -541,10 +821,33 @@ void ReceiveChannel::processBasicMethod(const rmqamqpt::BasicMethod& basic) d_consumer->consumerTag() == basic.the().consumerTag()) { d_consumer->cancelOk(); - BALL_LOG_INFO << "Stopping Consumer: " - << d_consumer->consumerTag(); - d_consumer.reset(); + + if (d_consumer->isPaused()) { + BALL_LOG_INFO << "Pausing Consumer: " + << d_consumer->consumerTag(); + } + else { + BALL_LOG_INFO << "Stopping Consumer: " + << d_consumer->consumerTag(); + // Reset the consumer if it is not paused + // so it can be restarted with new tag + d_consumer.reset(); + } + + if (d_consumeFuturePair) { + d_consumeFuturePair->first( + rmqt::Result<>("Consumer cancelled before it could be " + "started")); + d_consumeFuturePair.reset(); + } if (d_cancelFuturePair) { + // Publish pause latency + publishLatencyIfTracked( + &d_pauseStartTime, + Metrics::PAUSE_OPERATION_LATENCY_SECONDS, + d_metricPublisher.get(), + getMetricTags()); + d_cancelFuturePair->first(rmqt::Result<>()); d_cancelFuturePair.reset(); } @@ -579,7 +882,7 @@ void ReceiveChannel::processMessage(const rmqt::Message& message) d_consumer->consumerTag() == d_nextMessage->consumerTag()) { d_metricPublisher->publishCounter( - "received_messages", 1, d_vhostTags); + "received_messages", 1, getMetricTags()); d_consumer->process(message, *d_nextMessage, lifetimeId()); d_nextMessage.reset(); @@ -596,9 +899,11 @@ void ReceiveChannel::restartConsumers() if (d_consumer) { bsl::optional method = d_consumer->consume(d_consumerConfig); + if (method) { d_hungProgressTimer->reset( bsls::TimeInterval(Channel::k_HUNG_CHANNEL_TIMER_SEC)); + writeMessage(Message(rmqamqpt::Method(method.value())), AWAITING_REPLY); } @@ -633,7 +938,7 @@ void ReceiveChannel::removeSingleMessageFromStore(uint64_t deliveryTag) d_metricPublisher->publishDistribution( "acknowledge_latency", (bdlt::CurrentTime::utc() - insertTime).totalSecondsAsDouble(), - d_vhostTags); + getMetricTags()); } void 
ReceiveChannel::removeMultipleMessagesFromStore(uint64_t deliveryTag) @@ -649,7 +954,7 @@ void ReceiveChannel::removeMultipleMessagesFromStore(uint64_t deliveryTag) d_metricPublisher->publishDistribution( "acknowledge_latency", (bdlt::CurrentTime::utc() - insertTime).totalSecondsAsDouble(), - d_vhostTags); + getMetricTags()); } } @@ -713,6 +1018,12 @@ void ReceiveChannel::processFailures() const char* ReceiveChannel::channelType() const { return "Consumer"; } +bsl::vector > +ReceiveChannel::getMetricTags() const +{ + return d_vhostTags; +} + bsl::string ReceiveChannel::channelDebugName() const { bsl::string consumerSummary; diff --git a/src/rmq/rmqamqp/rmqamqp_receivechannel.h b/src/rmq/rmqamqp/rmqamqp_receivechannel.h index f3aa0710..6ec2bce4 100644 --- a/src/rmq/rmqamqp/rmqamqp_receivechannel.h +++ b/src/rmq/rmqamqp/rmqamqp_receivechannel.h @@ -46,7 +46,10 @@ #include #include #include +#include +#include #include +#include namespace BloombergLP { namespace rmqamqp { @@ -77,7 +80,10 @@ class ReceiveChannel : public Channel { const bsl::string& vhost, const bsl::shared_ptr& ackQueue, const bsl::shared_ptr& hungProgressTimer, - const HungChannelCallback& connErrorCb); + const HungChannelCallback& connErrorCb, + const bool channelPausedOnOpen); + + const rmqt::ConsumerConfig& consumerConfig() const; /// validates the queue handle references a queue in the channel topology, // starts the consumer (basic.consume) on that queue, with consumer tag @@ -86,16 +92,28 @@ class ReceiveChannel : public Channel { const MessageCallback& onNewMessage, const bsl::string& consumerTag = ""); + /// Resumes the paused consumer on the channel + /// returns Future that will resolve when ConsumeOk received from server + virtual rmqt::Future<> resume(); + virtual ~ReceiveChannel() BSLS_KEYWORD_OVERRIDE; virtual void consumeAckBatchFromQueue(); + /// Returns true if there is an active consumer for the channel virtual bool consumerIsActive() const; + /// Returns true if the consumer is in 
paused state + virtual bool consumerIsPaused() const; + /// Cancels the active consumer on the channel /// returns Future that will resolve when CancelOk received from server virtual rmqt::Future<> cancel(); + /// Pauses the active consumer on the channel + /// returns Future that will resolve when CancelOk received from server + virtual rmqt::Future<> pause(); + /// If the channel is in a cancelled state, waits for number of the /// messages in the message store to reach 0 before resolving the future, /// if the channel is not in a cancelled state then the Future will resolve @@ -146,14 +164,21 @@ class ReceiveChannel : public Channel { ReceiveChannel(const ReceiveChannel& copy) BSLS_KEYWORD_DELETED; ReceiveChannel& operator=(const ReceiveChannel&) BSLS_KEYWORD_DELETED; + /// Get the metric tags for this channel (currently d_vhostTags) + bsl::vector > getMetricTags() const; + rmqt::ConsumerConfig d_consumerConfig; bsl::shared_ptr d_consumer; bslma::ManagedPtr d_nextMessage; rmqamqp::MessageStore d_messageStore; bsl::shared_ptr d_ackQueue; MultipleAckHandler d_multipleAckHandler; + bslma::ManagedPtr::Pair> d_consumeFuturePair; bslma::ManagedPtr::Pair> d_cancelFuturePair; bslma::ManagedPtr::Maker> d_drainFuture; + bool d_channelPausedOnOpen; + bsls::TimeInterval d_pauseStartTime; + bsls::TimeInterval d_resumeStartTime; }; } // namespace rmqamqp diff --git a/src/rmq/rmqp/rmqp_metricpublisher.h b/src/rmq/rmqp/rmqp_metricpublisher.h index 34d49793..600e51e5 100644 --- a/src/rmq/rmqp/rmqp_metricpublisher.h +++ b/src/rmq/rmqp/rmqp_metricpublisher.h @@ -39,29 +39,33 @@ class MetricPublisher { virtual ~MetricPublisher(); /// Publish a gauge - the most recently observed value of a variable. - virtual void publishGauge( - const bsl::string& name, - double value, - const bsl::vector >& tags) = 0; + virtual void + publishGauge(const bsl::string& name, + double value, + const bsl::vector >& tags = + bsl::vector >()) = 0; /// Publish an increment to a counter variable.
virtual void publishCounter( const bsl::string& name, double value, - const bsl::vector >& tags) = 0; + const bsl::vector >& tags = + bsl::vector >()) = 0; /// Publish a value for basic summary statistics. virtual void publishSummary( const bsl::string& name, double value, - const bsl::vector >& tags) = 0; + const bsl::vector >& tags = + bsl::vector >()) = 0; /// Publish a value for distribution statistics. A distribution is similar /// to a summary but also includes quantile statistics. virtual void publishDistribution( const bsl::string& name, double value, - const bsl::vector >& tags) = 0; + const bsl::vector >& tags = + bsl::vector >()) = 0; }; } // namespace rmqp diff --git a/src/rmq/rmqt/CMakeLists.txt b/src/rmq/rmqt/CMakeLists.txt index 610f5273..5513863a 100644 --- a/src/rmq/rmqt/CMakeLists.txt +++ b/src/rmq/rmqt/CMakeLists.txt @@ -12,6 +12,7 @@ add_library(rmqt OBJECT rmqt_exchangetype.cpp rmqt_fieldvalue.cpp rmqt_future.cpp + rmqt_hosthealthconfig.cpp rmqt_message.cpp rmqt_mutualsecurityparameters.cpp rmqt_plaincredentials.cpp diff --git a/src/rmq/rmqt/rmqt_consumerconfig.cpp b/src/rmq/rmqt/rmqt_consumerconfig.cpp index d083ef1a..307a737b 100644 --- a/src/rmq/rmqt/rmqt_consumerconfig.cpp +++ b/src/rmq/rmqt/rmqt_consumerconfig.cpp @@ -34,6 +34,7 @@ ConsumerConfig::ConsumerConfig( , d_exclusiveFlag(exclusiveFlag) , d_consumerPriority(consumerPriority) , d_transformers(bsl::vector >()) +, d_consumeOnlyFromHealthyHost(true) { } diff --git a/src/rmq/rmqt/rmqt_consumerconfig.h b/src/rmq/rmqt/rmqt_consumerconfig.h index b3ef999a..b0ee8702 100644 --- a/src/rmq/rmqt/rmqt_consumerconfig.h +++ b/src/rmq/rmqt/rmqt_consumerconfig.h @@ -94,6 +94,11 @@ class ConsumerConfig { return d_transformers; } + bool consumeOnlyFromHealthyHost() const + { + return d_consumeOnlyFromHealthyHost; + } + // Setters /// \param consumerTag A label for the consumer which is displayed on the /// RabbitMQ Management UI. 
It is useful to give this a meaningful @@ -163,6 +168,24 @@ class ConsumerConfig { return *this; } + /// \param consumeOnlyFromHealthyHost If true, the consumer will only + /// consume messages when connected from a healthy host as determined + /// by the host health monitoring configuration set at the context + /// level. Consumers connecting from an unhealthy host will pause + /// message delivery until the host is deemed healthy. + /// By default, this flag is true but only takes effect if + /// \c RabbitContextOptions::setHostHealthConfig has been called on + /// the context; otherwise this setting has no effect. + /// Set this to false to opt out of host health monitoring for + /// this consumer even when host health monitoring is enabled at + /// the context level. + ConsumerConfig& + setConsumeOnlyFromHealthyHost(const bool consumeOnlyFromHealthyHost) + { + d_consumeOnlyFromHealthyHost = consumeOnlyFromHealthyHost; + return *this; + } + private: bsl::string d_consumerTag; uint16_t d_prefetchCount; @@ -170,6 +193,7 @@ class ConsumerConfig { rmqt::Exclusive::Value d_exclusiveFlag; bsl::optional d_consumerPriority; bsl::vector > d_transformers; + bool d_consumeOnlyFromHealthyHost; }; } // namespace rmqt diff --git a/src/rmq/rmqt/rmqt_hosthealthconfig.cpp b/src/rmq/rmqt/rmqt_hosthealthconfig.cpp new file mode 100644 index 00000000..efc506bb --- /dev/null +++ b/src/rmq/rmqt/rmqt_hosthealthconfig.cpp @@ -0,0 +1,52 @@ +// Copyright 2025 Bloomberg Finance L.P. +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include + +namespace BloombergLP { +namespace rmqt { +namespace { +BALL_LOG_SET_NAMESPACE_CATEGORY("RMQT.HOSTHEALTHCONFIG") +} // namespace + +const uint16_t HostHealthConfig::s_defaultPollInterval = 1; +const uint16_t HostHealthConfig::s_defaultMaxRetriesOnFailure = 5; + +HostHealthConfig::HostHealthConfig(const HealthCheckerFunction& healthChecker, + const uint16_t pollInterval, + const uint16_t maxRetriesOnFailure) +: d_healthChecker(healthChecker) +, d_pollInterval(pollInterval) +, d_maxRetriesOnFailure(maxRetriesOnFailure) +{ +} + +HostHealthConfig::~HostHealthConfig() {} + +bsl::ostream& operator<<(bsl::ostream& os, + const HostHealthConfig& hostHealthConfig) +{ + os << "HostHealthConfig[pollInterval=" << hostHealthConfig.d_pollInterval + << ", maxRetriesOnFailure=" << hostHealthConfig.d_maxRetriesOnFailure + << "]"; + return os; +} + +} // namespace rmqt +} // namespace BloombergLP diff --git a/src/rmq/rmqt/rmqt_hosthealthconfig.h b/src/rmq/rmqt/rmqt_hosthealthconfig.h new file mode 100644 index 00000000..75d90968 --- /dev/null +++ b/src/rmq/rmqt/rmqt_hosthealthconfig.h @@ -0,0 +1,97 @@ +// Copyright 2025 Bloomberg Finance L.P. +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +// rmqt_hosthealthconfig.h +#ifndef INCLUDED_RMQT_HOSTHEALTHCONFIG +#define INCLUDED_RMQT_HOSTHEALTHCONFIG + +#include +#include + +namespace BloombergLP { +namespace rmqt { + +/// \brief Class for passing arguments to HostHealthMonitor +/// +/// This class provides configuration for host health monitoring. When set via +/// \c RabbitContextOptions::setHostHealthConfig, a HostHealthMonitor is created +/// that periodically checks host health. Consumers with +/// \c ConsumerConfig::consumeOnlyFromHealthyHost enabled (the default) will +/// pause message delivery when the host is unhealthy and resume when healthy. +class HostHealthConfig { + public: + typedef bsl::function HealthCheckerFunction; + + // Creators + /// \param healthChecker A function that returns `true` if the host is + /// healthy, `false` if unhealthy. If the function throws, the + /// check is retried up to `maxRetriesOnFailure` times before + /// marking the host as unhealthy. This function runs on the main + /// event loop thread and should complete promptly, as it will + /// delay AMQP heartbeats and message delivery for all connections + /// by its execution time every `pollInterval` seconds. 
+ /// \param pollInterval The interval (in seconds) between each + /// health check + /// \param maxRetriesOnFailure The maximum number of retries on failure + /// (exception) before marking host as unhealthy + explicit HostHealthConfig( + const HealthCheckerFunction& healthChecker, + const uint16_t pollInterval = s_defaultPollInterval, + const uint16_t maxRetriesOnFailure = s_defaultMaxRetriesOnFailure); + + ~HostHealthConfig(); + + // Getters + HealthCheckerFunction healthChecker() const { return d_healthChecker; } + + uint16_t pollInterval() const { return d_pollInterval; } + + uint16_t maxRetriesOnFailure() const { return d_maxRetriesOnFailure; } + + // Setters + /// \param pollInterval The interval (in seconds) between each health check + HostHealthConfig& setPollInterval(const uint16_t pollInterval) + { + d_pollInterval = pollInterval; + return *this; + } + + /// \param maxRetriesOnFailure The maximum number of retries on failure + /// before marking host as unhealthy + HostHealthConfig& setMaxRetriesOnFailure(const uint16_t maxRetriesOnFailure) + { + d_maxRetriesOnFailure = maxRetriesOnFailure; + return *this; + } + + friend bsl::ostream& operator<<(bsl::ostream& os, + const HostHealthConfig& hostHealthConfig); + + private: + static const uint16_t s_defaultPollInterval; + static const uint16_t s_defaultMaxRetriesOnFailure; + + HealthCheckerFunction d_healthChecker; + uint16_t d_pollInterval; + uint16_t d_maxRetriesOnFailure; +}; + +bsl::ostream& operator<<(bsl::ostream& os, + const HostHealthConfig& hostHealthConfig); + +} // namespace rmqt +} // namespace BloombergLP + +#endif diff --git a/src/tests/rmqa/rmqa_connectionimpl.t.cpp b/src/tests/rmqa/rmqa_connectionimpl.t.cpp index 901be131..b9ab6eee 100644 --- a/src/tests/rmqa/rmqa_connectionimpl.t.cpp +++ b/src/tests/rmqa/rmqa_connectionimpl.t.cpp @@ -17,7 +17,9 @@ #include #include +#include #include +#include #include #include @@ -37,8 +39,6 @@ #include #include -#include -#include using namespace 
BloombergLP; using namespace rmqamqp; diff --git a/src/tests/rmqa/rmqa_rabbitcontextimpl.t.cpp b/src/tests/rmqa/rmqa_rabbitcontextimpl.t.cpp index 7e28fcad..3817f5b9 100644 --- a/src/tests/rmqa/rmqa_rabbitcontextimpl.t.cpp +++ b/src/tests/rmqa/rmqa_rabbitcontextimpl.t.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -36,6 +37,10 @@ using namespace BloombergLP; using namespace ::testing; using namespace bdlf::PlaceHolders; +namespace { +bool alwaysHealthy() { return true; } +} // namespace + class RabbitContextImplTests : public Test { protected: bdlmt::ThreadPool* d_pool; @@ -64,7 +69,8 @@ class RabbitContextImplTests : public Test { .setMetricPublisher(d_metricPublisher) .setMessageProcessingTimeout(d_messageProcessingTimeout) .setThreadpool(d_pool) - .setConnectionErrorThreshold(d_connectionErrorThreshold)) + .setConnectionErrorThreshold(d_connectionErrorThreshold) + .setHostHealthConfig(rmqt::HostHealthConfig(alwaysHealthy))) { ON_CALL(*d_mockEventLoop, isStarted()).WillByDefault(Return(false)); ON_CALL(*d_mockEventLoop, resolver(false)) @@ -82,7 +88,7 @@ class RabbitContextImplTests : public Test { { EXPECT_CALL(*d_mockEventLoop, resolver(false)).Times(1); EXPECT_CALL(*d_mockEventLoop, timerFactory()) - .Times(2) + .Times(3) .WillRepeatedly(Return(d_mockTimerFactory)); EXPECT_CALL(*d_mockEventLoop, waitForEventLoopExit(_)).Times(1); diff --git a/src/tests/rmqa/rmqa_rabbitcontextoptions.t.cpp b/src/tests/rmqa/rmqa_rabbitcontextoptions.t.cpp index 0dbb7375..be5a477d 100644 --- a/src/tests/rmqa/rmqa_rabbitcontextoptions.t.cpp +++ b/src/tests/rmqa/rmqa_rabbitcontextoptions.t.cpp @@ -15,17 +15,41 @@ #include +#include + #include using namespace BloombergLP; using namespace rmqa; using namespace ::testing; +namespace { +bool alwaysHealthy() { return true; } +} // namespace + TEST(RabbitContextOptions, Constructs) { rmqa::RabbitContextOptions t; } TEST(RabbitContextOptions, Defaults) { rmqa::RabbitContextOptions t; 
EXPECT_FALSE(t.metricPublisher()); EXPECT_FALSE(t.threadpool()); + EXPECT_FALSE(t.hostHealthConfig().has_value()); t.errorCallback()("heres an error", -1); } + +TEST(RabbitContextOptions, SetHostHealthConfig) +{ + rmqa::RabbitContextOptions options; + + // Initially not set + EXPECT_FALSE(options.hostHealthConfig().has_value()); + + // Create a health checker function + rmqt::HostHealthConfig config(alwaysHealthy); + + // Set host health config + options.setHostHealthConfig(config); + + // Now it should be set + EXPECT_TRUE(options.hostHealthConfig().has_value()); +} diff --git a/src/tests/rmqamqp/CMakeLists.txt b/src/tests/rmqamqp/CMakeLists.txt index f895cb8a..6a5429ff 100644 --- a/src/tests/rmqamqp/CMakeLists.txt +++ b/src/tests/rmqamqp/CMakeLists.txt @@ -7,6 +7,7 @@ add_executable(rmqamqp_tests rmqamqp_contentmaker.t.cpp rmqamqp_framer.t.cpp rmqamqp_heartbeatmanagerimpl.t.cpp + rmqamqp_hosthealthmonitor.t.cpp rmqamqp_messagestore.t.cpp rmqamqp_multipleackhandler.t.cpp rmqamqp_receivechannel.t.cpp diff --git a/src/tests/rmqamqp/rmqamqp_channelmap.t.cpp b/src/tests/rmqamqp/rmqamqp_channelmap.t.cpp index ce53be91..f8f98adc 100644 --- a/src/tests/rmqamqp/rmqamqp_channelmap.t.cpp +++ b/src/tests/rmqamqp/rmqamqp_channelmap.t.cpp @@ -15,10 +15,11 @@ #include -#include - #include #include +#include +#include +#include #include #include @@ -192,3 +193,95 @@ TEST(ChannelMap, GetReceiveChannels) map.associateChannel(1, bsl::shared_ptr(channel)); EXPECT_EQ(map.getReceiveChannels(), m); } + +TEST(ChannelMap, PauseReceiveChannels) +{ + rmqamqp::ChannelMap map; + + rmqt::ConsumerConfig consumerConfig; + consumerConfig.setConsumeOnlyFromHealthyHost(false); + + rmqt::ConsumerConfig consumerConfigHostHealthAware; + + bsl::shared_ptr channel = + bsl::make_shared( + bsl::make_shared(), + bsl::make_shared(), + bsl::make_shared(), + consumerConfig); + + bsl::shared_ptr channelHostHealthAware = + bsl::make_shared( + bsl::make_shared(), + bsl::make_shared(), + bsl::make_shared(), + 
consumerConfigHostHealthAware); + + rmqamqp::ChannelMap::ReceiveChannelMap m; + m[1] = channel; + m[2] = channelHostHealthAware; + map.associateChannel(1, bsl::shared_ptr(channel)); + map.associateChannel( + 2, bsl::shared_ptr(channelHostHealthAware)); + + EXPECT_CALL(*channel, pause()).Times(0); + EXPECT_CALL(*channelHostHealthAware, pause()) + .WillOnce(Return(rmqt::Future<>(rmqt::Result<>()))); + + bool respectHostHealth = true; + map.pauseReceiveChannels(respectHostHealth); + + EXPECT_CALL(*channel, pause()) + .WillOnce(Return(rmqt::Future<>(rmqt::Result<>()))); + EXPECT_CALL(*channelHostHealthAware, pause()) + .WillOnce(Return(rmqt::Future<>(rmqt::Result<>()))); + + respectHostHealth = false; + map.pauseReceiveChannels(respectHostHealth); +} + +TEST(ChannelMap, ResumeReceiveChannels) +{ + rmqamqp::ChannelMap map; + + rmqt::ConsumerConfig consumerConfig; + consumerConfig.setConsumeOnlyFromHealthyHost(false); + + rmqt::ConsumerConfig consumerConfigHostHealthAware; + + bsl::shared_ptr channel = + bsl::make_shared( + bsl::make_shared(), + bsl::make_shared(), + bsl::make_shared(), + consumerConfig); + + bsl::shared_ptr channelHostHealthAware = + bsl::make_shared( + bsl::make_shared(), + bsl::make_shared(), + bsl::make_shared(), + consumerConfigHostHealthAware); + + rmqamqp::ChannelMap::ReceiveChannelMap m; + m[1] = channel; + m[2] = channelHostHealthAware; + map.associateChannel(1, bsl::shared_ptr(channel)); + map.associateChannel( + 2, bsl::shared_ptr(channelHostHealthAware)); + + EXPECT_CALL(*channel, resume()).Times(0); + EXPECT_CALL(*channelHostHealthAware, resume()) + .WillOnce(Return(rmqt::Future<>(rmqt::Result<>()))); + + bool respectHostHealth = true; + map.resumeReceiveChannels(respectHostHealth); + + EXPECT_CALL(*channel, resume()) + .WillOnce(Return(rmqt::Future<>(rmqt::Result<>()))); + EXPECT_CALL(*channelHostHealthAware, resume()) + .WillOnce(Return(rmqt::Future<>(rmqt::Result<>()))); + + respectHostHealth = false; + 
map.resumeReceiveChannels(respectHostHealth); +} diff --git a/src/tests/rmqamqp/rmqamqp_connection.t.cpp b/src/tests/rmqamqp/rmqamqp_connection.t.cpp index 7f76774a..23f75bf7 100644 --- a/src/tests/rmqamqp/rmqamqp_connection.t.cpp +++ b/src/tests/rmqamqp/rmqamqp_connection.t.cpp @@ -340,7 +340,8 @@ class ConnectionFactory : public rmqamqp::Connection::Factory { metricPublisher, bsl::make_shared(), clientProperties, - bsls::TimeInterval()) + bsls::TimeInterval(), + false) , d_retryHandler(retryHandler) , d_hbManager(hbManager) , d_channelFactory(channelFactory) diff --git a/src/tests/rmqamqp/rmqamqp_heartbeatmanagerimpl.t.cpp b/src/tests/rmqamqp/rmqamqp_heartbeatmanagerimpl.t.cpp index 1bbfecc9..1bd0b664 100644 --- a/src/tests/rmqamqp/rmqamqp_heartbeatmanagerimpl.t.cpp +++ b/src/tests/rmqamqp/rmqamqp_heartbeatmanagerimpl.t.cpp @@ -49,17 +49,19 @@ class HeartbeatManager : public Test { TEST_F(HeartbeatManager, Construct) { - rmqamqp::HeartbeatManagerImpl hbManager(d_timerFactory); + bsl::shared_ptr hbManager = + bsl::make_shared(d_timerFactory); } TEST_F(HeartbeatManager, HeartbeatSendTriggersFirst) { const uint32_t TIMEOUT_SEC = 4; - rmqamqp::HeartbeatManagerImpl hbManager(d_timerFactory); - hbManager.start(TIMEOUT_SEC, - rmqtestutil::CallCount(&d_heartbeatCallCount), - rmqtestutil::CallCount(&d_connectionKilledCount)); + bsl::shared_ptr hbManager = + bsl::make_shared(d_timerFactory); + hbManager->start(TIMEOUT_SEC, + rmqtestutil::CallCount(&d_heartbeatCallCount), + rmqtestutil::CallCount(&d_connectionKilledCount)); // Move time 2 seconds into the future tick(2); @@ -72,11 +74,12 @@ TEST_F(HeartbeatManager, StopCancelsTimers) { const uint32_t TIMEOUT_SEC = 4; - rmqamqp::HeartbeatManagerImpl hbManager(d_timerFactory); - hbManager.start(TIMEOUT_SEC, - rmqtestutil::CallCount(&d_heartbeatCallCount), - rmqtestutil::CallCount(&d_connectionKilledCount)); - hbManager.stop(); + bsl::shared_ptr hbManager = + bsl::make_shared(d_timerFactory); + hbManager->start(TIMEOUT_SEC, + 
rmqtestutil::CallCount(&d_heartbeatCallCount), + rmqtestutil::CallCount(&d_connectionKilledCount)); + hbManager->stop(); tick(8); @@ -89,10 +92,11 @@ TEST_F(HeartbeatManager, DisconnectTriggersLateForFirstHeartbeat) { const uint32_t TIMEOUT_SEC = 4; - rmqamqp::HeartbeatManagerImpl hbManager(d_timerFactory); - hbManager.start(TIMEOUT_SEC, - rmqtestutil::CallCount(&d_heartbeatCallCount), - rmqtestutil::CallCount(&d_connectionKilledCount)); + bsl::shared_ptr hbManager = + bsl::make_shared(d_timerFactory); + hbManager->start(TIMEOUT_SEC, + rmqtestutil::CallCount(&d_heartbeatCallCount), + rmqtestutil::CallCount(&d_connectionKilledCount)); // Trigger < 3.7.11 behaviour (disconnect after timeout * 2) @@ -109,11 +113,12 @@ TEST_F(HeartbeatManager, DisconnectTriggersAfterFirstHeartbeat) { const uint32_t TIMEOUT_SEC = 4; - rmqamqp::HeartbeatManagerImpl hbManager(d_timerFactory); - hbManager.start(TIMEOUT_SEC, - rmqtestutil::CallCount(&d_heartbeatCallCount), - rmqtestutil::CallCount(&d_connectionKilledCount)); - hbManager.notifyHeartbeatReceived(); + bsl::shared_ptr hbManager = + bsl::make_shared(d_timerFactory); + hbManager->start(TIMEOUT_SEC, + rmqtestutil::CallCount(&d_heartbeatCallCount), + rmqtestutil::CallCount(&d_connectionKilledCount)); + hbManager->notifyHeartbeatReceived(); // Trigger 3.7.11+ behaviour (disconnect after timeout) tick(4); @@ -131,24 +136,25 @@ TEST_F(HeartbeatManager, NotifySendStopsSendHeartbeat) { const uint32_t TIMEOUT_SEC = 4; - rmqamqp::HeartbeatManagerImpl hbManager(d_timerFactory); + bsl::shared_ptr hbManager = + bsl::make_shared(d_timerFactory); // Trigger 3.7.11+ behaviour (disconnect after timeout*2) - hbManager.start(TIMEOUT_SEC, - rmqtestutil::CallCount(&d_heartbeatCallCount), - rmqtestutil::CallCount(&d_connectionKilledCount)); - hbManager.notifyHeartbeatReceived(); + hbManager->start(TIMEOUT_SEC, + rmqtestutil::CallCount(&d_heartbeatCallCount), + rmqtestutil::CallCount(&d_connectionKilledCount)); + hbManager->notifyHeartbeatReceived(); 
tick(1); - hbManager.notifyMessageSent(); + hbManager->notifyMessageSent(); tick(1); - hbManager.notifyMessageSent(); + hbManager->notifyMessageSent(); // Advance to kill timeout tick(1); - hbManager.notifyMessageSent(); + hbManager->notifyMessageSent(); tick(1); - hbManager.notifyMessageSent(); + hbManager->notifyMessageSent(); // Ensure no callbacks were hit yet EXPECT_THAT(d_heartbeatCallCount, Eq(0)); EXPECT_THAT(d_connectionKilledCount, Eq(0)); @@ -161,21 +167,22 @@ TEST_F(HeartbeatManager, NotifyRecieveStopsDisconnect) { const uint32_t TIMEOUT_SEC = 4; - rmqamqp::HeartbeatManagerImpl hbManager(d_timerFactory); - hbManager.start(TIMEOUT_SEC, - rmqtestutil::CallCount(&d_heartbeatCallCount), - rmqtestutil::CallCount(&d_connectionKilledCount)); + bsl::shared_ptr hbManager = + bsl::make_shared(d_timerFactory); + hbManager->start(TIMEOUT_SEC, + rmqtestutil::CallCount(&d_heartbeatCallCount), + rmqtestutil::CallCount(&d_connectionKilledCount)); tick(2); EXPECT_THAT(d_heartbeatCallCount, Eq(1)); - hbManager.notifyMessageReceived(); + hbManager->notifyMessageReceived(); tick(2); EXPECT_THAT(d_heartbeatCallCount, Eq(2)); - hbManager.notifyMessageReceived(); + hbManager->notifyMessageReceived(); tick(2); EXPECT_THAT(d_heartbeatCallCount, Eq(3)); diff --git a/src/tests/rmqamqp/rmqamqp_hosthealthmonitor.t.cpp b/src/tests/rmqamqp/rmqamqp_hosthealthmonitor.t.cpp new file mode 100644 index 00000000..f8df67d8 --- /dev/null +++ b/src/tests/rmqamqp/rmqamqp_hosthealthmonitor.t.cpp @@ -0,0 +1,684 @@ +// Copyright 2020-2023 Bloomberg Finance L.P. +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include +#include + +using namespace BloombergLP; +using namespace rmqamqp; +using namespace ::testing; + +namespace { +class MockConnection : public rmqamqp::Connection { + public: + MockConnection( + const bsl::shared_ptr& resolver, + const bsl::shared_ptr& retryHandler, + const bsl::shared_ptr& hbManager, + const bsl::shared_ptr& hungTimerFactory, + const bsl::shared_ptr& channelFactory, + const bsl::shared_ptr& metricPublisher, + const bsl::shared_ptr& endpoint, + const bsl::shared_ptr& credentials, + const rmqt::FieldTable& clientProperties, + const bsl::string& connectionName) + : rmqamqp::Connection(resolver, + retryHandler, + hbManager, + hungTimerFactory, + channelFactory, + metricPublisher, + endpoint, + credentials, + clientProperties, + connectionName) + { + } + + MOCK_METHOD1(pauseReceiveChannels, void(bool)); + MOCK_METHOD1(resumeReceiveChannels, void(bool)); +}; +} // namespace + +class HostHealthMonitorTests : public Test { + public: + struct ConfigurableHealthChecker { + bool d_nextResult; + bool d_throwBslException; + bool d_throwUnknown; + int d_sleepMicroseconds; + + ConfigurableHealthChecker() + : d_nextResult(true) + , d_throwBslException(false) + , d_throwUnknown(false) + , d_sleepMicroseconds(0) + { + } + + bool operator()() + { + if (d_sleepMicroseconds > 0) { + bslmt::ThreadUtil::microSleep(d_sleepMicroseconds); + } + if 
(d_throwBslException) { + throw bsl::runtime_error("checker failure"); + } + if (d_throwUnknown) { + throw 42; + } + return d_nextResult; + } + }; + + bsl::shared_ptr d_resolver; + bsl::shared_ptr d_timerFactory; + rmqt::ErrorCallback d_onError; + bsl::shared_ptr d_retryHandler; + bsl::shared_ptr d_hb; + bsl::shared_ptr d_channelFactory; + bsl::shared_ptr d_metricPublisher; + bsl::shared_ptr d_endpoint; + bsl::shared_ptr d_credentials; + + ConfigurableHealthChecker d_configurableHealthChecker; + + rmqt::HostHealthConfig d_config; + bsl::shared_ptr d_monitor; + bsl::shared_ptr d_connection; + + HostHealthMonitorTests() + : d_resolver(bsl::make_shared()) + , d_timerFactory(bsl::make_shared()) + , d_onError() + , d_retryHandler(bsl::make_shared( + d_timerFactory, + d_onError, + bsl::make_shared())) + , d_hb(new rmqamqp::HeartbeatManagerImpl(d_timerFactory)) + , d_channelFactory(bsl::make_shared()) + , d_metricPublisher(bsl::make_shared()) + , d_endpoint(new rmqt::SimpleEndpoint("", "")) + , d_credentials(new rmqt::PlainCredentials("", "")) + , d_config(makeConfig()) + , d_monitor() + , d_connection(makeConnection("host-health-connection")) + { + d_monitor = bsl::make_shared( + d_config, d_metricPublisher.get()); + d_monitor->start(d_timerFactory); + + // Expect gauge metrics (health_aware_vhosts, health_check_status, etc.) 
+ EXPECT_CALL(*d_metricPublisher, publishGauge(_, _, _)) + .Times(AtLeast(0)); + + d_monitor->registerConnection( + bsl::weak_ptr(d_connection)); + } + + ~HostHealthMonitorTests() {} + + void stepOnePollInterval() + { + d_timerFactory->step_time(bsls::TimeInterval(d_config.pollInterval())); + } + + /// Expect metrics for a health check that throws exception (RETRY state) + /// consecutiveFailures is the expected value for the consecutive failures + /// gauge + void expectRetryCheckMetrics(double consecutiveFailures) + { + EXPECT_CALL( + *d_metricPublisher, + publishCounter( + bsl::string(rmqamqp::Metrics::HEALTH_CHECK_TOTAL), 1.0, _)) + .Times(1); + EXPECT_CALL( + *d_metricPublisher, + publishCounter( + bsl::string(rmqamqp::Metrics::HEALTH_CHECK_FAILURES_TOTAL), + 1.0, + _)) + .Times(1); + EXPECT_CALL( + *d_metricPublisher, + publishGauge( + bsl::string( + rmqamqp::Metrics::HEALTH_CHECK_CONSECUTIVE_FAILURES), + consecutiveFailures, + _)) + .Times(1); + EXPECT_CALL(*d_metricPublisher, + publishCounter(bsl::string("disconnect_events"), _, _)) + .Times(AnyNumber()); + } + + void stepAndClear() + { + stepOnePollInterval(); + Mock::VerifyAndClearExpectations(d_connection.get()); + } + + bsl::shared_ptr makeConnection(const bsl::string& name) + { + rmqt::FieldTable clientProps; + clientProps["connection_name"] = rmqt::FieldValue(name); + + bsl::shared_ptr > mc = + bsl::make_shared >(d_resolver, + d_retryHandler, + d_hb, + d_timerFactory, + d_channelFactory, + d_metricPublisher, + d_endpoint, + d_credentials, + clientProps, + name); + + return mc; + } + + rmqt::HostHealthConfig makeConfig() + { + const uint16_t pollIntervalSeconds = 1; + const uint16_t maxRetriesOnFailure = 3; + + return rmqt::HostHealthConfig( + bdlf::BindUtil::bind(&ConfigurableHealthChecker::operator(), + &d_configurableHealthChecker), + pollIntervalSeconds, + maxRetriesOnFailure); + } +}; + +TEST_F(HostHealthMonitorTests, HealthyHostResumesConnections) +{ + d_configurableHealthChecker.d_nextResult = 
true; + + EXPECT_CALL(*d_connection, resumeReceiveChannels(true)).Times(1); + EXPECT_CALL(*d_connection, pauseReceiveChannels(_)).Times(0); + + stepOnePollInterval(); +} + +TEST_F(HostHealthMonitorTests, UnhealthyHostPausesConnections) +{ + d_configurableHealthChecker.d_nextResult = false; + + EXPECT_CALL(*d_connection, pauseReceiveChannels(true)).Times(1); + EXPECT_CALL(*d_connection, resumeReceiveChannels(_)).Times(0); + + stepOnePollInterval(); +} + +TEST_F(HostHealthMonitorTests, ExpiredConnectionIsRemovedAndNotUsed) +{ + bsl::shared_ptr liveConn = + makeConnection("connection-live"); + bsl::shared_ptr deadConn = + makeConnection("connection-dead"); + + bsl::shared_ptr monitor = + bsl::make_shared(d_config, d_metricPublisher.get()); + monitor->start(d_timerFactory); + + // Expect gauge metrics on registration and cleanup + EXPECT_CALL(*d_metricPublisher, publishGauge(_, _, _)).Times(AtLeast(0)); + + monitor->registerConnection(bsl::weak_ptr(liveConn)); + monitor->registerConnection(bsl::weak_ptr(deadConn)); + + bsl::weak_ptr weakDead = deadConn; + deadConn.reset(); + ASSERT_TRUE(weakDead.expired()); + + d_configurableHealthChecker.d_nextResult = true; + + EXPECT_CALL(*liveConn, resumeReceiveChannels(true)).Times(1); + EXPECT_CALL(*liveConn, pauseReceiveChannels(_)).Times(0); + + d_timerFactory->step_time(bsls::TimeInterval(d_config.pollInterval())); +} + +TEST_F(HostHealthMonitorTests, BslExceptionRetriesUntilMaxThenPauses) +{ + d_configurableHealthChecker.d_throwBslException = true; + + // For first attempt and d_maxRetriesOnFailure retries, hostHealth == RETRY. + // No pause/resume expected. + for (unsigned short i = 0; i <= d_config.maxRetriesOnFailure(); ++i) { + EXPECT_CALL(*d_connection, pauseReceiveChannels(_)).Times(0); + EXPECT_CALL(*d_connection, resumeReceiveChannels(_)).Times(0); + + stepAndClear(); + } + + // Next run: getHostHealth should return UNHEALTHY, and run() should pause. 
+ EXPECT_CALL(*d_connection, pauseReceiveChannels(true)).Times(1); + EXPECT_CALL(*d_connection, resumeReceiveChannels(_)).Times(0); + + stepOnePollInterval(); +} + +TEST_F(HostHealthMonitorTests, UnknownExceptionRetriesUntilMaxThenPauses) +{ + d_configurableHealthChecker.d_throwUnknown = true; + + for (unsigned short i = 0; i <= d_config.maxRetriesOnFailure(); ++i) { + EXPECT_CALL(*d_connection, pauseReceiveChannels(_)).Times(0); + EXPECT_CALL(*d_connection, resumeReceiveChannels(_)).Times(0); + + stepAndClear(); + } + + EXPECT_CALL(*d_connection, pauseReceiveChannels(true)).Times(1); + EXPECT_CALL(*d_connection, resumeReceiveChannels(_)).Times(0); + + stepOnePollInterval(); +} + +TEST_F(HostHealthMonitorTests, + BslExceptionThenHealthyMarksHostHealthyAndResumes) +{ + d_configurableHealthChecker.d_throwBslException = true; + + // First two runs: RETRY, no pause/resume. + for (unsigned short i = 0; i < 2; ++i) { + EXPECT_CALL(*d_connection, pauseReceiveChannels(_)).Times(0); + EXPECT_CALL(*d_connection, resumeReceiveChannels(_)).Times(0); + + stepAndClear(); + } + + // Now checker stops throwing and returns healthy. + d_configurableHealthChecker.d_throwBslException = false; + d_configurableHealthChecker.d_nextResult = true; + + EXPECT_CALL(*d_connection, resumeReceiveChannels(true)).Times(1); + EXPECT_CALL(*d_connection, pauseReceiveChannels(_)).Times(0); + + stepOnePollInterval(); +} + +TEST_F(HostHealthMonitorTests, + BslExceptionThenUnhealthyMarksHostUnhealthyAndPauses) +{ + d_configurableHealthChecker.d_throwBslException = true; + + // First run: RETRY, no pause/resume. + EXPECT_CALL(*d_connection, pauseReceiveChannels(_)).Times(0); + EXPECT_CALL(*d_connection, resumeReceiveChannels(_)).Times(0); + + stepAndClear(); + + // Now checker stops throwing and returns false (unhealthy). 
+ d_configurableHealthChecker.d_throwBslException = false; + d_configurableHealthChecker.d_nextResult = false; + + EXPECT_CALL(*d_connection, pauseReceiveChannels(true)).Times(1); + EXPECT_CALL(*d_connection, resumeReceiveChannels(_)).Times(0); + + stepOnePollInterval(); +} + +TEST_F(HostHealthMonitorTests, DestructionWhileTimerPending) +{ + bsl::shared_ptr monitor = + bsl::make_shared(d_config, d_metricPublisher.get()); + monitor->start(d_timerFactory); + + EXPECT_CALL(*d_metricPublisher, publishGauge(_, _, _)).Times(AtLeast(0)); + + bsl::shared_ptr conn = makeConnection("destroy-test"); + monitor->registerConnection(bsl::weak_ptr(conn)); + + monitor.reset(); + + d_timerFactory->step_time(bsls::TimeInterval(d_config.pollInterval())); + d_timerFactory->step_time(bsls::TimeInterval(d_config.pollInterval())); +} + +TEST_F(HostHealthMonitorTests, StopPreventsFurtherCallbacks) +{ + d_configurableHealthChecker.d_nextResult = true; + + EXPECT_CALL(*d_connection, resumeReceiveChannels(true)).Times(1); + stepAndClear(); + + d_monitor->stop(); + + EXPECT_CALL(*d_connection, resumeReceiveChannels(_)).Times(0); + EXPECT_CALL(*d_connection, pauseReceiveChannels(_)).Times(0); + + d_timerFactory->step_time(bsls::TimeInterval(d_config.pollInterval())); + d_timerFactory->step_time(bsls::TimeInterval(d_config.pollInterval())); +} + +TEST_F(HostHealthMonitorTests, StartAfterStopOnSameInstance) +{ + d_configurableHealthChecker.d_nextResult = true; + + EXPECT_CALL(*d_connection, resumeReceiveChannels(true)).Times(1); + stepAndClear(); + + d_monitor->stop(); + + EXPECT_CALL(*d_connection, resumeReceiveChannels(_)).Times(0); + EXPECT_CALL(*d_connection, pauseReceiveChannels(_)).Times(0); + d_timerFactory->step_time(bsls::TimeInterval(d_config.pollInterval())); + Mock::VerifyAndClearExpectations(d_connection.get()); + + d_monitor->start(d_timerFactory); + + EXPECT_CALL(*d_connection, resumeReceiveChannels(true)).Times(1); + EXPECT_CALL(*d_connection, pauseReceiveChannels(_)).Times(0); + 
+ d_timerFactory->step_time(bsls::TimeInterval(d_config.pollInterval())); +} + +TEST_F(HostHealthMonitorTests, HostHealthStreamingOperator) +{ + { + bsl::ostringstream oss; + oss << HostHealthMonitor::HEALTHY; + EXPECT_EQ(oss.str(), "HEALTHY"); + } + { + bsl::ostringstream oss; + oss << HostHealthMonitor::UNHEALTHY; + EXPECT_EQ(oss.str(), "UNHEALTHY"); + } + { + bsl::ostringstream oss; + oss << HostHealthMonitor::RETRY; + EXPECT_EQ(oss.str(), "RETRY"); + } + { + // Test unknown/default case by casting an invalid value + bsl::ostringstream oss; + oss << static_cast(999); + EXPECT_EQ(oss.str(), "UNKNOWN"); + } +} + +TEST_F(HostHealthMonitorTests, PublishesHealthyCheckMetrics) +{ + d_configurableHealthChecker.d_nextResult = true; + + EXPECT_CALL(*d_metricPublisher, + publishCounter( + bsl::string(rmqamqp::Metrics::HEALTH_CHECK_TOTAL), 1.0, _)) + .Times(1); + EXPECT_CALL( + *d_metricPublisher, + publishSummary( + bsl::string(rmqamqp::Metrics::HEALTH_CHECK_DURATION_MS), _, _)) + .Times(1); + EXPECT_CALL(*d_metricPublisher, + publishGauge( + bsl::string(rmqamqp::Metrics::HEALTH_CHECK_STATUS), 1.0, _)) + .Times(1); + EXPECT_CALL( + *d_metricPublisher, + publishGauge( + bsl::string(rmqamqp::Metrics::HEALTH_CHECK_CONSECUTIVE_FAILURES), + 0.0, + _)) + .Times(1); + EXPECT_CALL( + *d_metricPublisher, + publishCounter( + bsl::string(rmqamqp::Metrics::HEALTH_TRIGGERED_RESUME_TOTAL), + 1.0, + _)) + .Times(1); + EXPECT_CALL(*d_metricPublisher, + publishCounter(bsl::string("disconnect_events"), _, _)) + .Times(AnyNumber()); + + stepOnePollInterval(); +} + +TEST_F(HostHealthMonitorTests, PublishesUnhealthyCheckMetrics) +{ + d_configurableHealthChecker.d_nextResult = false; + + EXPECT_CALL(*d_metricPublisher, + publishCounter( + bsl::string(rmqamqp::Metrics::HEALTH_CHECK_TOTAL), 1.0, _)) + .Times(1); + EXPECT_CALL( + *d_metricPublisher, + publishSummary( + bsl::string(rmqamqp::Metrics::HEALTH_CHECK_DURATION_MS), _, _)) + .Times(1); + EXPECT_CALL(*d_metricPublisher, + publishGauge( + 
bsl::string(rmqamqp::Metrics::HEALTH_CHECK_STATUS), 0.0, _)) + .Times(1); + EXPECT_CALL(*d_metricPublisher, + publishCounter( + bsl::string(rmqamqp::Metrics::HEALTH_TRIGGERED_PAUSE_TOTAL), + 1.0, + _)) + .Times(1); + EXPECT_CALL(*d_metricPublisher, + publishCounter(bsl::string("disconnect_events"), _, _)) + .Times(AnyNumber()); + + stepOnePollInterval(); +} + +TEST_F(HostHealthMonitorTests, PublishesRetryCheckMetricsOnBslException) +{ + d_configurableHealthChecker.d_throwBslException = true; + + expectRetryCheckMetrics(1.0); + + stepOnePollInterval(); +} + +TEST_F(HostHealthMonitorTests, PublishesRetryCheckMetricsOnUnknownException) +{ + d_configurableHealthChecker.d_throwUnknown = true; + + expectRetryCheckMetrics(1.0); + + stepOnePollInterval(); +} + +TEST_F(HostHealthMonitorTests, + PublishesConsecutiveFailuresGaugeDuringRetryPeriod) +{ + d_configurableHealthChecker.d_throwBslException = true; + + // First failure: consecutive failures = 1 + expectRetryCheckMetrics(1.0); + stepAndClear(); + + // Second failure: consecutive failures = 2 + expectRetryCheckMetrics(2.0); + stepOnePollInterval(); +} + +TEST_F(HostHealthMonitorTests, + PublishesConsecutiveFailuresGaugeWhenMaxRetriesReached) +{ + d_configurableHealthChecker.d_throwBslException = true; + + // Run checks that don't exceed max retries (RETRY state) + for (unsigned short i = 0; i <= d_config.maxRetriesOnFailure(); ++i) { + stepAndClear(); + } + + // This run will exceed maxRetries and transition to UNHEALTHY + // Note: Due to post-increment, consecutive failures will be + // maxRetriesOnFailure + 2 + const double expectedFailures = + static_cast(d_config.maxRetriesOnFailure() + 2); + EXPECT_CALL(*d_metricPublisher, + publishCounter( + bsl::string(rmqamqp::Metrics::HEALTH_CHECK_TOTAL), 1.0, _)) + .Times(1); + EXPECT_CALL( + *d_metricPublisher, + publishCounter( + bsl::string(rmqamqp::Metrics::HEALTH_CHECK_FAILURES_TOTAL), 1.0, _)) + .Times(1); + EXPECT_CALL( + *d_metricPublisher, + publishGauge( + 
bsl::string(rmqamqp::Metrics::HEALTH_CHECK_CONSECUTIVE_FAILURES), + expectedFailures, + _)) + .Times(1); + EXPECT_CALL(*d_metricPublisher, + publishGauge( + bsl::string(rmqamqp::Metrics::HEALTH_CHECK_STATUS), 0.0, _)) + .Times(1); + EXPECT_CALL(*d_metricPublisher, + publishCounter( + bsl::string(rmqamqp::Metrics::HEALTH_TRIGGERED_PAUSE_TOTAL), + 1.0, + _)) + .Times(1); + EXPECT_CALL(*d_metricPublisher, + publishCounter(bsl::string("disconnect_events"), _, _)) + .Times(AnyNumber()); + + stepOnePollInterval(); +} + +TEST_F(HostHealthMonitorTests, BlockedEventLoopMetricPublished) +{ + // This test takes ~1.1s: exercises real wall-clock duration detection + d_configurableHealthChecker.d_nextResult = true; + d_configurableHealthChecker.d_sleepMicroseconds = 1100000; // 1.1 seconds + + EXPECT_CALL( + *d_metricPublisher, + publishCounter( + bsl::string(rmqamqp::Metrics::HEALTH_CHECK_BLOCKED_EVENT_LOOP), + 1.0, + _)) + .Times(1); + EXPECT_CALL(*d_metricPublisher, + publishCounter( + bsl::string(rmqamqp::Metrics::HEALTH_CHECK_TOTAL), 1.0, _)) + .Times(1); + EXPECT_CALL( + *d_metricPublisher, + publishSummary( + bsl::string(rmqamqp::Metrics::HEALTH_CHECK_DURATION_MS), _, _)) + .Times(1); + EXPECT_CALL( + *d_metricPublisher, + publishCounter( + bsl::string(rmqamqp::Metrics::HEALTH_TRIGGERED_RESUME_TOTAL), + 1.0, + _)) + .Times(1); + EXPECT_CALL(*d_metricPublisher, + publishCounter(bsl::string("disconnect_events"), _, _)) + .Times(AnyNumber()); + + stepOnePollInterval(); +} + +TEST_F(HostHealthMonitorTests, RetrySchedulesNextCheck) +{ + d_configurableHealthChecker.d_throwBslException = true; + + // First step: RETRY path — scheduleNextCheck() runs at top of checkHealth() + EXPECT_CALL(*d_metricPublisher, + publishCounter( + bsl::string(rmqamqp::Metrics::HEALTH_CHECK_TOTAL), 1.0, _)) + .Times(1); + EXPECT_CALL( + *d_metricPublisher, + publishCounter( + bsl::string(rmqamqp::Metrics::HEALTH_CHECK_FAILURES_TOTAL), 1.0, _)) + .Times(1); + EXPECT_CALL(*d_metricPublisher, 
publishGauge(_, _, _)).Times(AtLeast(0)); + EXPECT_CALL(*d_metricPublisher, + publishCounter(bsl::string("disconnect_events"), _, _)) + .Times(AnyNumber()); + EXPECT_CALL(*d_connection, pauseReceiveChannels(_)).Times(0); + EXPECT_CALL(*d_connection, resumeReceiveChannels(_)).Times(0); + + stepOnePollInterval(); + Mock::VerifyAndClearExpectations(d_metricPublisher.get()); + Mock::VerifyAndClearExpectations(d_connection.get()); + + // Second step: timer fires again — proves scheduleNextCheck() was called + // Catch-alls first (lowest priority in gmock reverse-order matching) + EXPECT_CALL(*d_metricPublisher, publishCounter(_, _, _)).Times(AnyNumber()); + EXPECT_CALL(*d_metricPublisher, publishGauge(_, _, _)).Times(AtLeast(0)); + // Specific expectation last (highest priority — checked first by gmock) + EXPECT_CALL(*d_metricPublisher, + publishCounter( + bsl::string(rmqamqp::Metrics::HEALTH_CHECK_TOTAL), 1.0, _)) + .Times(1); + + stepOnePollInterval(); +} + +TEST_F(HostHealthMonitorTests, RegisterConnectionAfterStopIsValid) +{ + d_monitor->stop(); + + bsl::shared_ptr conn = makeConnection("late-register"); + + EXPECT_CALL( + *d_metricPublisher, + publishGauge(bsl::string(rmqamqp::Metrics::HEALTH_AWARE_VHOSTS), _, _)) + .Times(1); + + d_monitor->registerConnection(bsl::weak_ptr(conn)); + + // Restart: late-registered connection should be notified + d_configurableHealthChecker.d_nextResult = true; + d_monitor->start(d_timerFactory); + + EXPECT_CALL(*d_connection, resumeReceiveChannels(true)).Times(1); + EXPECT_CALL(*conn, resumeReceiveChannels(true)).Times(1); + EXPECT_CALL(*d_metricPublisher, publishGauge(_, _, _)).Times(AtLeast(0)); + EXPECT_CALL(*d_metricPublisher, publishCounter(_, _, _)).Times(AtLeast(0)); + EXPECT_CALL(*d_metricPublisher, publishSummary(_, _, _)).Times(AtLeast(0)); + + d_timerFactory->step_time(bsls::TimeInterval(d_config.pollInterval())); +} diff --git a/src/tests/rmqamqp/rmqamqp_receivechannel.t.cpp 
b/src/tests/rmqamqp/rmqamqp_receivechannel.t.cpp index e100f4f5..e7ca8c66 100644 --- a/src/tests/rmqamqp/rmqamqp_receivechannel.t.cpp +++ b/src/tests/rmqamqp/rmqamqp_receivechannel.t.cpp @@ -122,6 +122,7 @@ class ReceiveChannelTests : public rmqamqp::ChannelTests { bsl::shared_ptr d_ackQueue; bsl::shared_ptr d_metricPublisher; bsl::vector > d_vhostTag; + bsl::vector > d_metricTags; ReceiveChannelTests() : rmqamqp::ChannelTests() @@ -134,6 +135,8 @@ class ReceiveChannelTests : public rmqamqp::ChannelTests { { d_vhostTag.push_back(bsl::pair( rmqamqp::Metrics::VHOST_TAG, TEST_VHOST)); + + d_metricTags = d_vhostTag; } void ackOrNackMessage(rmqamqp::ReceiveChannel& rc, @@ -189,13 +192,13 @@ class ReceiveChannelTests : public rmqamqp::ChannelTests { EXPECT_THAT(rc.state(), Eq(rmqamqp::Channel::READY)); } - void qosOkReply(rmqamqp::ReceiveChannel& rc) + void receiveBasicQoSOk(rmqamqp::ReceiveChannel& rc) { rc.processReceived(rmqamqp::Message( rmqamqpt::Method(rmqamqpt::BasicMethod(rmqamqpt::BasicQoSOk())))); } - void qosExpectations() + void expectBasicQoS() { EXPECT_CALL( d_callback, @@ -205,13 +208,41 @@ class ReceiveChannelTests : public rmqamqp::ChannelTests { .WillOnce(InvokeArgument<1>()); // qos } - void cancelOkReply(rmqamqp::ReceiveChannel& rc, const bsl::string& tag) + void receiveBasicConsumeOk(rmqamqp::ReceiveChannel& rc, + const bsl::string& tag) + { + rc.processReceived(rmqamqp::Message(rmqamqpt::Method( + rmqamqpt::BasicMethod(rmqamqpt::BasicConsumeOk(tag))))); + } + + void expectBasicConsume() + { + EXPECT_CALL( + d_callback, + onAsyncWrite(Pointee(MethodMsgTypeEq(rmqamqpt::Method( + rmqamqpt::BasicMethod(rmqamqpt::BasicConsume())))), + _)) + .WillOnce(InvokeArgument<1>()); // consume + } + + void notExpectBasicConsume() + { + EXPECT_CALL( + d_callback, + onAsyncWrite(Pointee(MethodMsgTypeEq(rmqamqpt::Method( + rmqamqpt::BasicMethod(rmqamqpt::BasicConsume())))), + _)) + .Times(0); + } + + void receiveBasicCancelOk(rmqamqp::ReceiveChannel& rc, + const 
bsl::string& tag) { rc.processReceived(rmqamqp::Message(rmqamqpt::Method( rmqamqpt::BasicMethod(rmqamqpt::BasicCancelOk(tag))))); } - void cancelExpectations() + void expectBasicCancel() { EXPECT_CALL( d_callback, @@ -221,15 +252,41 @@ class ReceiveChannelTests : public rmqamqp::ChannelTests { .WillOnce(InvokeArgument<1>()); // cancel } + void notExpectBasicCancel() + { + EXPECT_CALL( + d_callback, + onAsyncWrite(Pointee(MethodMsgTypeEq(rmqamqpt::Method( + rmqamqpt::BasicMethod(rmqamqpt::BasicCancel())))), + _)) + .Times(0); + } + + void expectChannelClose() + { + EXPECT_CALL(d_callback, + onAsyncWrite(Pointee(MethodMsgTypeEq( + rmqamqpt::Method(rmqamqpt::ChannelMethod( + rmqamqpt::ChannelClose())))), + _)) + .WillOnce(InvokeArgument<1>()); // close + } + + void receiveBasicCancel(rmqamqp::ReceiveChannel& rc, const bsl::string& tag) + { + rc.processReceived(rmqamqp::Message(rmqamqpt::Method( + rmqamqpt::BasicMethod(rmqamqpt::BasicCancel(tag))))); + } + void makeReady(rmqamqp::ReceiveChannel& rc) { openAndSendTopology(rc); - qosExpectations(); + expectBasicQoS(); queueDeclareReply(rc); EXPECT_THAT(rc.state(), Eq(rmqamqp::Channel::AWAITING_REPLY)); - qosOkReply(rc); + receiveBasicQoSOk(rc); } void receiveMessage(rmqamqp::ReceiveChannel& rc, @@ -260,7 +317,8 @@ class ReceiveChannelTests : public rmqamqp::ChannelTests { bsl::shared_ptr makeReceiveChannel( size_t prefetchCount = 100, - bsl::optional consumerPriority = bsl::optional()) + bsl::optional consumerPriority = bsl::optional(), + const bool channelPausedOnOpen = false) { rmqt::ConsumerConfig consumerConfig( rmqt::ConsumerConfig::generateConsumerTag(), prefetchCount); @@ -275,7 +333,8 @@ class ReceiveChannelTests : public rmqamqp::ChannelTests { TEST_VHOST, d_ackQueue, d_timerFactory->createWithCallback(&noopHungTimerCallback), - d_connErrorCb); + d_connErrorCb, + channelPausedOnOpen); } }; @@ -352,7 +411,7 @@ TEST_F(ReceiveChannelTests, HandleReopen) EXPECT_TRUE(receiveChannel->consumerIsActive()); 
openExpectations(); - qosExpectations(); + expectBasicQoS(); EXPECT_CALL( d_callback, @@ -368,11 +427,129 @@ TEST_F(ReceiveChannelTests, HandleReopen) openOkReply(*receiveChannel); queueDeclareReply(*receiveChannel); - qosOkReply(*receiveChannel); + receiveBasicQoSOk(*receiveChannel); consumerReply(*receiveChannel); EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); } + +TEST_F(ReceiveChannelTests, OpenPaused) +{ + const bool channelPausedOnOpen = true; + bsl::shared_ptr receiveChannel = + makeReceiveChannel(100, 5, channelPausedOnOpen); + + makeReady(*receiveChannel); + EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); + + notExpectBasicConsume(); + receiveChannel->consume(d_queue, d_onNewMessage, d_consumerTag); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_TRUE(receiveChannel->consumerIsPaused()); +} + +TEST_F(ReceiveChannelTests, OpenPausedBeforeResume) +{ + const bool channelPausedOnOpen = true; + bsl::shared_ptr receiveChannel = + makeReceiveChannel(100, 5, channelPausedOnOpen); + + makeReady(*receiveChannel); + EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); + + // Resume + notExpectBasicConsume(); + rmqt::Future<> wereResumed = receiveChannel->resume(); + EXPECT_FALSE(!!wereResumed.tryResult()); + EXPECT_THAT(wereResumed.tryResult().error(), + Eq("Resume called with no existing consumer")); +} + +TEST_F(ReceiveChannelTests, ReopenPaused) +{ + const bool channelPausedOnOpen = true; + bsl::shared_ptr receiveChannel = + makeReceiveChannel(100, 5, channelPausedOnOpen); + + makeReady(*receiveChannel); + EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); + + openExpectations(); + expectBasicQoS(); + + // Re-open the channel + EXPECT_CALL(*d_retryHandler, retry(_)).WillOnce(InvokeArgument<0>()); + receiveChannel->reset(true); + + openOkReply(*receiveChannel); + queueDeclareReply(*receiveChannel); + receiveBasicQoSOk(*receiveChannel); + + EXPECT_THAT(receiveChannel->state(), 
Eq(rmqamqp::Channel::READY)); +} + +TEST_F(ReceiveChannelTests, ReopenPausedBeforeConsume) +{ + const bool channelPausedOnOpen = true; + bsl::shared_ptr receiveChannel = + makeReceiveChannel(100, 5, channelPausedOnOpen); + + makeReady(*receiveChannel); + EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); + + openExpectations(); + expectBasicQoS(); + + // Re-open the channel + EXPECT_CALL(*d_retryHandler, retry(_)).WillOnce(InvokeArgument<0>()); + receiveChannel->reset(true); + + openOkReply(*receiveChannel); + queueDeclareReply(*receiveChannel); + receiveBasicQoSOk(*receiveChannel); + + EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); + + // Consume + notExpectBasicConsume(); + receiveChannel->consume(d_queue, d_onNewMessage, d_consumerTag); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_TRUE(receiveChannel->consumerIsPaused()); +} + +TEST_F(ReceiveChannelTests, ReopenPausedAfterConsume) +{ + const bool channelPausedOnOpen = true; + bsl::shared_ptr receiveChannel = + makeReceiveChannel(100, 5, channelPausedOnOpen); + + makeReady(*receiveChannel); + EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); + + // Consume + notExpectBasicConsume(); + receiveChannel->consume(d_queue, d_onNewMessage, d_consumerTag); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_TRUE(receiveChannel->consumerIsPaused()); + + openExpectations(); + expectBasicQoS(); + notExpectBasicConsume(); + + // Re-open the channel + EXPECT_CALL(*d_retryHandler, retry(_)).WillOnce(InvokeArgument<0>()); + receiveChannel->reset(true); + + openOkReply(*receiveChannel); + queueDeclareReply(*receiveChannel); + receiveBasicQoSOk(*receiveChannel); + + EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); + + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_TRUE(receiveChannel->consumerIsPaused()); +} + TEST_F(ReceiveChannelTests, BadQueueHandleThrows) { bsl::shared_ptr receiveChannel = makeReceiveChannel(); @@ 
-613,7 +790,7 @@ TEST_F(ReceiveChannelTests, WrongChannelLifetimeId) receiveChannel->reset(true); openOkReply(*receiveChannel); queueDeclareReply(*receiveChannel); - qosOkReply(*receiveChannel); + receiveBasicQoSOk(*receiveChannel); consumerReply(*receiveChannel, "consumer1"); ackMessage(*receiveChannel, id_ack); @@ -662,7 +839,7 @@ TEST_F(ReceiveChannelTests, AckMsgsHavingDiffLifetimeIdSameDeliveryTag) receiveChannel->reset(true); openOkReply(*receiveChannel); queueDeclareReply(*receiveChannel); - qosOkReply(*receiveChannel); + receiveBasicQoSOk(*receiveChannel); consumerReply(*receiveChannel, "consumer1"); EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); @@ -709,9 +886,8 @@ TEST_F(ReceiveChannelTests, AckMessagePublishesMetric) ackExpectations(deliveryTag); - EXPECT_CALL( - *d_metricPublisher, - publishDistribution(bsl::string("acknowledge_latency"), _, d_vhostTag)); + EXPECT_CALL(*d_metricPublisher, + publishDistribution(bsl::string("acknowledge_latency"), _, _)); ackMessage(*receiveChannel, id_ack); } @@ -723,9 +899,8 @@ TEST_F(ReceiveChannelTests, ReceivedMsgPublishesMetric) makeReady(*receiveChannel); setupConsumer(*receiveChannel, "consumer1"); - EXPECT_CALL( - *d_metricPublisher, - publishCounter(bsl::string("received_messages"), 1, d_vhostTag)); + EXPECT_CALL(*d_metricPublisher, + publishCounter(bsl::string("received_messages"), 1, _)); const uint64_t deliveryTag = 40; receiveMessage(*receiveChannel, deliveryTag, "consumer1"); @@ -744,13 +919,13 @@ TEST_F(ReceiveChannelTests, Cancel) EXPECT_THAT(receiveChannel->inFlight(), Eq(1)); size_t lifetimeId = receiveChannel->lifetimeId(); - cancelExpectations(); + expectBasicCancel(); rmqt::Future<> wereCancelled = receiveChannel->cancel(); EXPECT_THAT(lifetimeId, Eq(receiveChannel->lifetimeId())); EXPECT_THAT(!!wereCancelled.tryResult(), Eq(false)); - cancelOkReply(*receiveChannel, "consumer1"); + receiveBasicCancelOk(*receiveChannel, "consumer1"); EXPECT_THAT(!!wereCancelled.tryResult(), 
Eq(true)); EXPECT_THAT(lifetimeId, Eq(receiveChannel->lifetimeId())); @@ -779,7 +954,7 @@ TEST_F(ReceiveChannelTests, CancelCalled2x) EXPECT_THAT(receiveChannel->inFlight(), Eq(1)); size_t lifetimeId = receiveChannel->lifetimeId(); - cancelExpectations(); + expectBasicCancel(); rmqt::Future<> wereCancelled = receiveChannel->cancel(); EXPECT_THAT(lifetimeId, Eq(receiveChannel->lifetimeId())); @@ -787,7 +962,7 @@ TEST_F(ReceiveChannelTests, CancelCalled2x) rmqt::Future<> wereCancelled2x = receiveChannel->cancel(); - cancelOkReply(*receiveChannel, "consumer1"); + receiveBasicCancelOk(*receiveChannel, "consumer1"); EXPECT_THAT(!!wereCancelled.tryResult(), Eq(true)); EXPECT_THAT(!!wereCancelled2x.tryResult(), Eq(true)); @@ -802,6 +977,191 @@ TEST_F(ReceiveChannelTests, CancelCalled2x) EXPECT_THAT(receiveChannel->inFlight(), Eq(0)); } +TEST_F(ReceiveChannelTests, ClientCancelDuringResume) +{ + bsl::shared_ptr receiveChannel = makeReceiveChannel(); + + makeReady(*receiveChannel); + setupConsumer(*receiveChannel, d_consumerTag); + EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); + EXPECT_TRUE(receiveChannel->consumerIsActive()); + + // Pause + expectBasicCancel(); + rmqt::Future<> werePaused = receiveChannel->pause(); + receiveBasicCancelOk(*receiveChannel, d_consumerTag); + EXPECT_TRUE(!!werePaused.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_TRUE(receiveChannel->consumerIsPaused()); + + // Start resuming + expectBasicConsume(); + rmqt::Future<> wereResumed = receiveChannel->resume(); + EXPECT_FALSE(!!wereResumed.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_FALSE(receiveChannel->consumerIsPaused()); + + // Cancel before resume reply + notExpectBasicCancel(); + rmqt::Future<> wereCancelled = receiveChannel->cancel(); + + // Complete resume and expect cancel to be sent afterwards + expectBasicCancel(); + receiveBasicConsumeOk(*receiveChannel, d_consumerTag); + 
EXPECT_TRUE(!!wereResumed.tryResult()); + + receiveBasicCancelOk(*receiveChannel, d_consumerTag); + EXPECT_TRUE(!!wereCancelled.tryResult()); + + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_FALSE(receiveChannel->consumerIsPaused()); +} + +TEST_F(ReceiveChannelTests, ServerCancelDuringResume) +{ + bsl::shared_ptr receiveChannel = makeReceiveChannel(); + + makeReady(*receiveChannel); + setupConsumer(*receiveChannel, d_consumerTag); + EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); + EXPECT_TRUE(receiveChannel->consumerIsActive()); + + // Pause + expectBasicCancel(); + rmqt::Future<> werePaused = receiveChannel->pause(); + receiveBasicCancelOk(*receiveChannel, d_consumerTag); + EXPECT_TRUE(!!werePaused.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_TRUE(receiveChannel->consumerIsPaused()); + + // Start resuming + expectBasicConsume(); + rmqt::Future<> wereResumed = receiveChannel->resume(); + EXPECT_FALSE(!!wereResumed.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_FALSE(receiveChannel->consumerIsPaused()); + + // Server sends Cancel, channel closes, and resets + expectChannelClose(); + receiveBasicCancel(*receiveChannel, d_consumerTag); + receiveChannel->reset(); + + EXPECT_FALSE(!!wereResumed.tryResult()); + EXPECT_THAT(wereResumed.tryResult().error(), + Eq("Consume could not be processed by the server, " + "due channel reset")); + + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_FALSE(receiveChannel->consumerIsPaused()); +} + +TEST_F(ReceiveChannelTests, ClientCancelDuringPause) +{ + bsl::shared_ptr receiveChannel = makeReceiveChannel(1); + + makeReady(*receiveChannel); + setupConsumer(*receiveChannel, d_consumerTag); + EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); + EXPECT_TRUE(receiveChannel->consumerIsActive()); + + // Start pausing + expectBasicCancel(); + rmqt::Future<> werePaused = receiveChannel->pause(); + + // Start cancelling before 
pause reply + notExpectBasicCancel(); + rmqt::Future<> wereCancelled = receiveChannel->cancel(); + EXPECT_FALSE(!!wereCancelled.tryResult()); + + // Get cancel/pause reply - Both have same reply from broker + receiveBasicCancelOk(*receiveChannel, d_consumerTag); + + // Consumer gets cancelled + EXPECT_TRUE(!!wereCancelled.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_FALSE(receiveChannel->consumerIsPaused()); +} + +TEST_F(ReceiveChannelTests, ServerCancelDuringPause) +{ + bsl::shared_ptr receiveChannel = makeReceiveChannel(); + + makeReady(*receiveChannel); + setupConsumer(*receiveChannel, d_consumerTag); + EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); + EXPECT_TRUE(receiveChannel->consumerIsActive()); + + // Start pausing + expectBasicCancel(); + rmqt::Future<> werePaused = receiveChannel->pause(); + + // Server sends Cancel, channel closes, and resets + expectChannelClose(); + receiveBasicCancel(*receiveChannel, d_consumerTag); + receiveChannel->reset(); + + // Pause future fails + EXPECT_FALSE(!!werePaused.tryResult()); + EXPECT_THAT(werePaused.tryResult().error(), + Eq("Cancel could not be processed by the server, due channel " + "reset, however consumer is now in a cancelled state")); + + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_FALSE(receiveChannel->consumerIsPaused()); +} + +TEST_F(ReceiveChannelTests, ClientCancelAfterPause) +{ + bsl::shared_ptr receiveChannel = makeReceiveChannel(1); + + makeReady(*receiveChannel); + setupConsumer(*receiveChannel, d_consumerTag); + EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); + EXPECT_TRUE(receiveChannel->consumerIsActive()); + + // Pause + expectBasicCancel(); + rmqt::Future<> werePaused = receiveChannel->pause(); + receiveBasicCancelOk(*receiveChannel, d_consumerTag); + EXPECT_TRUE(!!werePaused.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_TRUE(receiveChannel->consumerIsPaused()); + + // Cancel after 
pause + notExpectBasicCancel(); + rmqt::Future<> wereCancelled = receiveChannel->cancel(); + EXPECT_TRUE(!!wereCancelled.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_FALSE(receiveChannel->consumerIsPaused()); +} + +TEST_F(ReceiveChannelTests, ServerCancelAfterPause) +{ + bsl::shared_ptr receiveChannel = makeReceiveChannel(); + + makeReady(*receiveChannel); + setupConsumer(*receiveChannel, d_consumerTag); + EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); + EXPECT_TRUE(receiveChannel->consumerIsActive()); + + // Pause + expectBasicCancel(); + rmqt::Future<> werePaused = receiveChannel->pause(); + receiveBasicCancelOk(*receiveChannel, d_consumerTag); + EXPECT_TRUE(!!werePaused.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_TRUE(receiveChannel->consumerIsPaused()); + + // Server sends Cancel, channel closes, and resets + expectChannelClose(); + receiveBasicCancel(*receiveChannel, d_consumerTag); + receiveChannel->reset(); + + // Consumer stays paused + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_TRUE(receiveChannel->consumerIsPaused()); +} + TEST_F(ReceiveChannelTests, CancelFail) { bsl::shared_ptr receiveChannel = makeReceiveChannel(1); @@ -827,9 +1187,9 @@ TEST_F(ReceiveChannelTests, DrainRegular) EXPECT_THAT(receiveChannel->inFlight(), Eq(1)); size_t lifetimeId = receiveChannel->lifetimeId(); - cancelExpectations(); + expectBasicCancel(); rmqt::Future<> wereCancelled = receiveChannel->cancel(); - cancelOkReply(*receiveChannel, "consumer1"); + receiveBasicCancelOk(*receiveChannel, "consumer1"); EXPECT_TRUE(wereCancelled.blockResult()); EXPECT_THAT(receiveChannel->inFlight(), Eq(1)); @@ -882,7 +1242,7 @@ TEST_F(ReceiveChannelTests, CancelFutureCancelledOnReset) EXPECT_THAT(receiveChannel->inFlight(), Eq(1)); - cancelExpectations(); + expectBasicCancel(); rmqt::Future<> wereCancelled = receiveChannel->cancel(); EXPECT_THAT(wereCancelled.tryResult().returnCode(), 
Eq(rmqt::TIMEOUT)); receiveChannel->reset(); @@ -902,9 +1262,9 @@ TEST_F(ReceiveChannelTests, DrainFutureCancelledOnReset) EXPECT_THAT(receiveChannel->inFlight(), Eq(1)); - cancelExpectations(); + expectBasicCancel(); receiveChannel->cancel(); - cancelOkReply(*receiveChannel, "consumer1"); + receiveBasicCancelOk(*receiveChannel, "consumer1"); rmqt::Future<> wereDrained = receiveChannel->drain(); EXPECT_THAT(wereDrained.tryResult().returnCode(), Eq(rmqt::TIMEOUT)); @@ -925,7 +1285,7 @@ TEST_F(ReceiveChannelTests, CancelFutureCancelledOnDestruct) EXPECT_THAT(receiveChannel->inFlight(), Eq(1)); - cancelExpectations(); + expectBasicCancel(); rmqt::Future<> wereCancelled = receiveChannel->cancel(); EXPECT_THAT(wereCancelled.tryResult().returnCode(), Eq(rmqt::TIMEOUT)); receiveChannel->reset(); @@ -945,9 +1305,9 @@ TEST_F(ReceiveChannelTests, DrainFutureCancelledOnDestruct) EXPECT_THAT(receiveChannel->inFlight(), Eq(1)); - cancelExpectations(); + expectBasicCancel(); receiveChannel->cancel(); - cancelOkReply(*receiveChannel, "consumer1"); + receiveBasicCancelOk(*receiveChannel, "consumer1"); rmqt::Future<> wereDrained = receiveChannel->drain(); EXPECT_THAT(wereDrained.tryResult().returnCode(), Eq(rmqt::TIMEOUT)); @@ -990,10 +1350,10 @@ TEST_F(ReceiveChannelTests, CancelConsumerBeforeTopologyConfirmed) // Now, after cancel() has been called, we expect consume() not to be called // when channel becomes ready // The remaining steps of makeReady() - qosExpectations(); + expectBasicQoS(); queueDeclareReply(*receiveChannel); EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::AWAITING_REPLY)); - qosOkReply(*receiveChannel); + receiveBasicQoSOk(*receiveChannel); } TEST_F(ReceiveChannelTests, CancelConsumerBeforeConsumerStarted) @@ -1034,7 +1394,7 @@ TEST_F(ReceiveChannelTests, CancelConsumerBeforeConsumerStarted) receiveChannel->processReceived(rmqamqp::Message(rmqamqpt::Method( rmqamqpt::BasicMethod(rmqamqpt::BasicConsumeOk(d_consumerTag))))); - 
cancelOkReply(*receiveChannel, d_consumerTag); + receiveBasicCancelOk(*receiveChannel, d_consumerTag); EXPECT_FALSE(receiveChannel->consumerIsActive()); } @@ -1099,6 +1459,453 @@ TEST_F(ReceiveChannelTests, AckBatchesInFlightDuringResetGetLocked) EXPECT_THAT(receiveChannel->inFlight(), Eq(0)); } +TEST_F(ReceiveChannelTests, Pause) +{ + bsl::shared_ptr receiveChannel = makeReceiveChannel(1); + + makeReady(*receiveChannel); + setupConsumer(*receiveChannel, d_consumerTag); + EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); + EXPECT_TRUE(receiveChannel->consumerIsActive()); + + // Pause + expectBasicCancel(); + rmqt::Future<> werePaused = receiveChannel->pause(); + EXPECT_FALSE(!!werePaused.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsPaused()); + + receiveBasicCancelOk(*receiveChannel, d_consumerTag); + EXPECT_TRUE(!!werePaused.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_TRUE(receiveChannel->consumerIsPaused()); +} + +TEST_F(ReceiveChannelTests, PauseDuringResume) +{ + bsl::shared_ptr receiveChannel = makeReceiveChannel(); + + makeReady(*receiveChannel); + setupConsumer(*receiveChannel, d_consumerTag); + EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); + EXPECT_TRUE(receiveChannel->consumerIsActive()); + + // Pause before resume + expectBasicCancel(); + rmqt::Future<> werePaused = receiveChannel->pause(); + receiveBasicCancelOk(*receiveChannel, d_consumerTag); + EXPECT_TRUE(!!werePaused.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_TRUE(receiveChannel->consumerIsPaused()); + + // Resume start + expectBasicConsume(); + rmqt::Future<> wereResumed = receiveChannel->resume(); + + // Start pausing + expectBasicCancel(); + werePaused = receiveChannel->pause(); + EXPECT_FALSE(!!werePaused.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsPaused()); + + // Complete resume + receiveBasicConsumeOk(*receiveChannel, d_consumerTag); + 
EXPECT_TRUE(!!wereResumed.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_FALSE(receiveChannel->consumerIsPaused()); + + // Complete pause + receiveBasicCancelOk(*receiveChannel, d_consumerTag); + EXPECT_TRUE(!!werePaused.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_TRUE(receiveChannel->consumerIsPaused()); +} + +TEST_F(ReceiveChannelTests, PauseDuringConsume) +{ + bsl::shared_ptr receiveChannel = makeReceiveChannel(); + + makeReady(*receiveChannel); + EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); + + // Consume start + expectBasicConsume(); + rmqt::Result<> result = + receiveChannel->consume(d_queue, d_onNewMessage, d_consumerTag); + EXPECT_TRUE(result); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + + // Start pausing + expectBasicCancel(); + rmqt::Future<> werePaused = receiveChannel->pause(); + EXPECT_FALSE(!!werePaused.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsPaused()); + + // Complete consume + receiveBasicConsumeOk(*receiveChannel, d_consumerTag); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_FALSE(receiveChannel->consumerIsPaused()); + + // Complete pause + receiveBasicCancelOk(*receiveChannel, d_consumerTag); + EXPECT_TRUE(!!werePaused.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_TRUE(receiveChannel->consumerIsPaused()); +} + +TEST_F(ReceiveChannelTests, PauseDuringPause) +{ + bsl::shared_ptr receiveChannel = makeReceiveChannel(1); + + makeReady(*receiveChannel); + setupConsumer(*receiveChannel, d_consumerTag); + EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); + EXPECT_TRUE(receiveChannel->consumerIsActive()); + + // Start pausing + expectBasicCancel(); + rmqt::Future<> werePaused = receiveChannel->pause(); + EXPECT_FALSE(!!werePaused.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsPaused()); + + // Pause again before first pause completes + notExpectBasicCancel(); + rmqt::Future<> 
werePaused2x = receiveChannel->pause(); + EXPECT_FALSE(!!werePaused2x.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsPaused()); + + // Complete pause + receiveBasicCancelOk(*receiveChannel, d_consumerTag); + EXPECT_TRUE(!!werePaused.tryResult()); + EXPECT_TRUE(!!werePaused2x.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_TRUE(receiveChannel->consumerIsPaused()); +} + +TEST_F(ReceiveChannelTests, PauseAfterPause) +{ + bsl::shared_ptr receiveChannel = makeReceiveChannel(1); + + makeReady(*receiveChannel); + setupConsumer(*receiveChannel, d_consumerTag); + EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); + EXPECT_TRUE(receiveChannel->consumerIsActive()); + + // Pause + expectBasicCancel(); + rmqt::Future<> werePaused = receiveChannel->pause(); + receiveBasicCancelOk(*receiveChannel, d_consumerTag); + EXPECT_TRUE(!!werePaused.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_TRUE(receiveChannel->consumerIsPaused()); + + // Pause again + notExpectBasicCancel(); + rmqt::Future<> werePaused2x = receiveChannel->pause(); + EXPECT_TRUE(!!werePaused2x.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_TRUE(receiveChannel->consumerIsPaused()); +} + +TEST_F(ReceiveChannelTests, PauseDuringCancel) +{ + bsl::shared_ptr receiveChannel = makeReceiveChannel(1); + + makeReady(*receiveChannel); + setupConsumer(*receiveChannel, d_consumerTag); + EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); + EXPECT_TRUE(receiveChannel->consumerIsActive()); + + // Start cancelling + expectBasicCancel(); + rmqt::Future<> wereCancelled = receiveChannel->cancel(); + + // Start pausing before cancel reply + notExpectBasicCancel(); + rmqt::Future<> werePaused = receiveChannel->pause(); + EXPECT_FALSE(!!werePaused.tryResult()); + EXPECT_THAT(werePaused.tryResult().error(), + Eq("Pause called with a cancel already in flight")); + + // Complete cancel + 
receiveBasicCancelOk(*receiveChannel, d_consumerTag); + + // Consumer gets cancelled + EXPECT_TRUE(!!wereCancelled.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_FALSE(receiveChannel->consumerIsPaused()); +} + +TEST_F(ReceiveChannelTests, PauseAfterCancel) +{ + bsl::shared_ptr receiveChannel = makeReceiveChannel(1); + + makeReady(*receiveChannel); + setupConsumer(*receiveChannel, d_consumerTag); + EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); + EXPECT_TRUE(receiveChannel->consumerIsActive()); + + // Cancel + expectBasicCancel(); + rmqt::Future<> wereCancelled = receiveChannel->cancel(); + receiveBasicCancelOk(*receiveChannel, d_consumerTag); + EXPECT_TRUE(!!wereCancelled.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_FALSE(receiveChannel->consumerIsPaused()); + + // Pause after cancel + notExpectBasicCancel(); + rmqt::Future<> werePaused = receiveChannel->pause(); + EXPECT_FALSE(!!werePaused.tryResult()); + EXPECT_THAT(werePaused.tryResult().error(), + Eq("Pause called with no active consumer")); + + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_FALSE(receiveChannel->consumerIsPaused()); +} + +TEST_F(ReceiveChannelTests, PausePersistsOnReset) +{ + bsl::shared_ptr receiveChannel = makeReceiveChannel(1); + + makeReady(*receiveChannel); + setupConsumer(*receiveChannel, d_consumerTag); + EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); + EXPECT_TRUE(receiveChannel->consumerIsActive()); + + // Pause + expectBasicCancel(); + rmqt::Future<> werePaused = receiveChannel->pause(); + receiveBasicCancelOk(*receiveChannel, d_consumerTag); + EXPECT_TRUE(!!werePaused.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_TRUE(receiveChannel->consumerIsPaused()); + + // Disconnect + receiveChannel->reset(); + // Reconnect + makeReady(*receiveChannel); + + // Still paused + EXPECT_FALSE(receiveChannel->consumerIsActive()); + 
EXPECT_TRUE(receiveChannel->consumerIsPaused()); +} + +TEST_F(ReceiveChannelTests, PauseFail) +{ + bsl::shared_ptr receiveChannel = makeReceiveChannel(1); + + makeReady(*receiveChannel); + + rmqt::Future<> werePaused = receiveChannel->pause(); + rmqt::Result<> result = werePaused.tryResult(); + EXPECT_FALSE(result); + EXPECT_THAT(result.returnCode(), Ne(rmqt::TIMEOUT)); +} + +TEST_F(ReceiveChannelTests, Resume) +{ + bsl::shared_ptr receiveChannel = makeReceiveChannel(); + + makeReady(*receiveChannel); + setupConsumer(*receiveChannel, d_consumerTag); + EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); + EXPECT_TRUE(receiveChannel->consumerIsActive()); + + // Pause before resume + expectBasicCancel(); + rmqt::Future<> werePaused = receiveChannel->pause(); + receiveBasicCancelOk(*receiveChannel, d_consumerTag); + EXPECT_TRUE(!!werePaused.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_TRUE(receiveChannel->consumerIsPaused()); + + // Resume + expectBasicConsume(); + rmqt::Future<> wereResumed = receiveChannel->resume(); + receiveBasicConsumeOk(*receiveChannel, d_consumerTag); + EXPECT_TRUE(!!wereResumed.tryResult()); + EXPECT_TRUE(receiveChannel->consumerIsActive()); + EXPECT_FALSE(receiveChannel->consumerIsPaused()); +} + +TEST_F(ReceiveChannelTests, ResumeDuringConsume) +{ + bsl::shared_ptr receiveChannel = makeReceiveChannel(); + + makeReady(*receiveChannel); + setupConsumer(*receiveChannel, d_consumerTag); + EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); + EXPECT_TRUE(receiveChannel->consumerIsActive()); + + // Resume + notExpectBasicConsume(); + rmqt::Future<> wereResumed2x = receiveChannel->resume(); + EXPECT_FALSE(wereResumed2x.tryResult()); + EXPECT_THAT(wereResumed2x.tryResult().error(), + Eq("Resume called on unpaused consumer")); + EXPECT_TRUE(receiveChannel->consumerIsActive()); +} + +TEST_F(ReceiveChannelTests, ResumeDuringResume) +{ + bsl::shared_ptr receiveChannel = makeReceiveChannel(); + + 
makeReady(*receiveChannel); + setupConsumer(*receiveChannel, d_consumerTag); + EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); + EXPECT_TRUE(receiveChannel->consumerIsActive()); + + // Pause before resume + expectBasicCancel(); + rmqt::Future<> werePaused = receiveChannel->pause(); + receiveBasicCancelOk(*receiveChannel, d_consumerTag); + EXPECT_TRUE(!!werePaused.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_TRUE(receiveChannel->consumerIsPaused()); + + // Start resuming + expectBasicConsume(); + rmqt::Future<> wereResumed = receiveChannel->resume(); + EXPECT_FALSE(!!wereResumed.tryResult()); + + // Resume again before first resume completes + notExpectBasicConsume(); + rmqt::Future<> wereResumed2x = receiveChannel->resume(); + EXPECT_FALSE(wereResumed2x.tryResult()); + + // Complete first resume + receiveBasicConsumeOk(*receiveChannel, d_consumerTag); + EXPECT_TRUE(!!wereResumed.tryResult()); + EXPECT_TRUE(!!wereResumed2x.tryResult()); + EXPECT_TRUE(receiveChannel->consumerIsActive()); + EXPECT_FALSE(receiveChannel->consumerIsPaused()); +} + +TEST_F(ReceiveChannelTests, ResumeAfterResume) +{ + bsl::shared_ptr receiveChannel = makeReceiveChannel(); + + makeReady(*receiveChannel); + setupConsumer(*receiveChannel, d_consumerTag); + EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); + EXPECT_TRUE(receiveChannel->consumerIsActive()); + + // Pause before resume + expectBasicCancel(); + rmqt::Future<> werePaused = receiveChannel->pause(); + receiveBasicCancelOk(*receiveChannel, d_consumerTag); + EXPECT_TRUE(!!werePaused.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_TRUE(receiveChannel->consumerIsPaused()); + + // Resume + expectBasicConsume(); + rmqt::Future<> wereResumed = receiveChannel->resume(); + receiveBasicConsumeOk(*receiveChannel, d_consumerTag); + EXPECT_TRUE(!!wereResumed.tryResult()); + EXPECT_TRUE(receiveChannel->consumerIsActive()); + 
EXPECT_FALSE(receiveChannel->consumerIsPaused()); + + // Resume again + notExpectBasicConsume(); + rmqt::Future<> wereResumed2x = receiveChannel->resume(); + EXPECT_FALSE(wereResumed2x.tryResult()); + EXPECT_THAT(wereResumed2x.tryResult().error(), + Eq("Resume called on unpaused consumer")); + EXPECT_TRUE(receiveChannel->consumerIsActive()); +} + +TEST_F(ReceiveChannelTests, ResumeBeforePause) +{ + bsl::shared_ptr receiveChannel = makeReceiveChannel(); + + makeReady(*receiveChannel); + setupConsumer(*receiveChannel, d_consumerTag); + EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); + EXPECT_TRUE(receiveChannel->consumerIsActive()); + + // Resume + notExpectBasicConsume(); + rmqt::Future<> wereResumed = receiveChannel->resume(); + EXPECT_FALSE(!!wereResumed.tryResult()); + EXPECT_THAT(wereResumed.tryResult().error(), + Eq("Resume called on unpaused consumer")); + EXPECT_TRUE(receiveChannel->consumerIsActive()); +} + +TEST_F(ReceiveChannelTests, ResumeDuringPause) +{ + bsl::shared_ptr receiveChannel = makeReceiveChannel(); + + makeReady(*receiveChannel); + setupConsumer(*receiveChannel, d_consumerTag); + EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); + EXPECT_TRUE(receiveChannel->consumerIsActive()); + + // Start pausing + expectBasicCancel(); + rmqt::Future<> werePaused = receiveChannel->pause(); + EXPECT_FALSE(!!werePaused.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_FALSE(receiveChannel->consumerIsPaused()); + + // Start resuming before pause reply + notExpectBasicConsume(); + rmqt::Future<> wereResumed = receiveChannel->resume(); + EXPECT_FALSE(!!wereResumed.tryResult()); + EXPECT_THAT(wereResumed.tryResult().error(), + Eq("Resume called with a pause already in flight")); +} + +TEST_F(ReceiveChannelTests, ResumeDuringCancel) +{ + bsl::shared_ptr receiveChannel = makeReceiveChannel(); + + makeReady(*receiveChannel); + setupConsumer(*receiveChannel, d_consumerTag); + 
EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); + EXPECT_TRUE(receiveChannel->consumerIsActive()); + + // Start cancelling + expectBasicCancel(); + rmqt::Future<> wereCancelled = receiveChannel->cancel(); + EXPECT_FALSE(!!wereCancelled.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_FALSE(receiveChannel->consumerIsPaused()); + + // Resume before cancel reply + notExpectBasicConsume(); + rmqt::Future<> wereResumed = receiveChannel->resume(); + EXPECT_FALSE(!!wereResumed.tryResult()); + EXPECT_THAT(wereResumed.tryResult().error(), + Eq("Resume called with a cancel already in flight")); +} + +TEST_F(ReceiveChannelTests, ResumeAfterCancel) +{ + bsl::shared_ptr receiveChannel = makeReceiveChannel(); + + makeReady(*receiveChannel); + setupConsumer(*receiveChannel, d_consumerTag); + EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); + EXPECT_TRUE(receiveChannel->consumerIsActive()); + + // Cancel + expectBasicCancel(); + rmqt::Future<> wereCancelled = receiveChannel->cancel(); + receiveBasicCancelOk(*receiveChannel, d_consumerTag); + EXPECT_TRUE(!!wereCancelled.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_FALSE(receiveChannel->consumerIsPaused()); + + // Resume + notExpectBasicConsume(); + rmqt::Future<> wereResumed = receiveChannel->resume(); + EXPECT_FALSE(!!wereResumed.tryResult()); + EXPECT_THAT(wereResumed.tryResult().error(), + Eq("Resume called with no existing consumer")); +} + class ReceiveChannelHungTests : public ReceiveChannelTests {}; TEST_F(ReceiveChannelHungTests, HungQoSOk) @@ -1109,7 +1916,7 @@ TEST_F(ReceiveChannelHungTests, HungQoSOk) openAndSendTopology(*rc); // Expect that when we send the queue declare, we get a QoS message. - qosExpectations(); + expectBasicQoS(); queueDeclareReply(*rc); // If the channel is never sent a QoSOk, the hung callback should be called. 
@@ -1119,6 +1926,33 @@ TEST_F(ReceiveChannelHungTests, HungQoSOk) d_timerFactory->step_time(bsls::TimeInterval(HUNG_TIMEOUT_SECONDS)); } +TEST_F(ReceiveChannelTests, NotHungQoSOkOnPause) +{ + bsl::shared_ptr receiveChannel = makeReceiveChannel(); + + makeReady(*receiveChannel); + setupConsumer(*receiveChannel, d_consumerTag); + EXPECT_THAT(receiveChannel->state(), Eq(rmqamqp::Channel::READY)); + EXPECT_TRUE(receiveChannel->consumerIsActive()); + + // Pause + expectBasicCancel(); + rmqt::Future<> werePaused = receiveChannel->pause(); + receiveBasicCancelOk(*receiveChannel, d_consumerTag); + EXPECT_TRUE(!!werePaused.tryResult()); + EXPECT_FALSE(receiveChannel->consumerIsActive()); + EXPECT_TRUE(receiveChannel->consumerIsPaused()); + + // Reset + receiveChannel->reset(); + + // Recovery on pause should not trigger hung timer + makeReady(*receiveChannel); + const int HUNG_TIMEOUT_SECONDS = 65; + EXPECT_CALL(d_callback, onHungTimerCallback()).Times(0); + d_timerFactory->step_time(bsls::TimeInterval(HUNG_TIMEOUT_SECONDS)); +} + TEST_F(ReceiveChannelHungTests, HungConsumeOk) { // Channel is setup diff --git a/src/tests/rmqt/rmqt_consumerconfig.t.cpp b/src/tests/rmqt/rmqt_consumerconfig.t.cpp index d8a38c4f..edc98bb3 100644 --- a/src/tests/rmqt/rmqt_consumerconfig.t.cpp +++ b/src/tests/rmqt/rmqt_consumerconfig.t.cpp @@ -36,6 +36,7 @@ TEST(ConsumerConfig, DefaultConstructorValues) EXPECT_EQ(config.threadpool(), static_cast(NULL)); EXPECT_EQ(config.exclusiveFlag(), rmqt::Exclusive::OFF); EXPECT_FALSE(config.consumerPriority()); + EXPECT_TRUE(config.consumeOnlyFromHealthyHost()); } TEST(ConsumerConfig, Constructor) @@ -63,3 +64,19 @@ TEST(ConsumerConfig, SetConsumerPriority) EXPECT_TRUE(config.consumerPriority()); EXPECT_EQ(config.consumerPriority(), 5); } + +TEST(ConsumerConfig, SetConsumeOnlyFromHealthyHostWorksAndIsTrueByDefault) +{ + rmqt::ConsumerConfig config; + + // Default should be true + EXPECT_TRUE(config.consumeOnlyFromHealthyHost()); + + // User can opt out by 
setting to false + config.setConsumeOnlyFromHealthyHost(false); + EXPECT_FALSE(config.consumeOnlyFromHealthyHost()); + + // Can also set back to true + config.setConsumeOnlyFromHealthyHost(true); + EXPECT_TRUE(config.consumeOnlyFromHealthyHost()); +} diff --git a/src/tests/rmqtestutil/CMakeLists.txt b/src/tests/rmqtestutil/CMakeLists.txt index 783289dc..8112a07a 100644 --- a/src/tests/rmqtestutil/CMakeLists.txt +++ b/src/tests/rmqtestutil/CMakeLists.txt @@ -13,7 +13,7 @@ add_library(rmqtestutil rmqtestutil_timedmetric.cpp ) -target_link_libraries(rmqtestutil PUBLIC +target_link_libraries(rmqtestutil PUBLIC bsl bdl rmqamqpt diff --git a/src/tests/rmqtestutil/rmqtestutil_mockchannel.t.h b/src/tests/rmqtestutil/rmqtestutil_mockchannel.t.h index 3d13c0a8..82164be4 100644 --- a/src/tests/rmqtestutil/rmqtestutil_mockchannel.t.h +++ b/src/tests/rmqtestutil/rmqtestutil_mockchannel.t.h @@ -16,8 +16,8 @@ #ifndef INCLUDED_RMQTESTUTIL_MOCKCHANNEL_T #define INCLUDED_RMQTESTUTIL_MOCKCHANNEL_T -#include #include + #include #include #include @@ -33,6 +33,8 @@ #include #include +#include + namespace BloombergLP { namespace rmqtestutil { @@ -103,7 +105,8 @@ class MockReceiveChannel : public rmqamqp::ReceiveChannel { "blankvhost", ackQueue, timerFactory->createWithCallback(&noopHungTimerCallback), - &noopHungCallback) + &noopHungCallback, + false) , d_timerFactory(timerFactory) { ON_CALL(*this, consume(testing::_, testing::_, testing::_)) @@ -126,6 +129,8 @@ class MockReceiveChannel : public rmqamqp::ReceiveChannel { const MessageCallback& onNewMessage, const bsl::string&)); MOCK_METHOD0(consumeAckBatchFromQueue, void()); + MOCK_METHOD0(resume, rmqt::Future<>()); + MOCK_METHOD0(pause, rmqt::Future<>()); MOCK_METHOD0(cancel, rmqt::Future<>()); MOCK_METHOD0(drain, rmqt::Future<>()); MOCK_CONST_METHOD1(getMessagesOlderThan, @@ -155,7 +160,8 @@ class MockUpdateReceiveChannel : public rmqamqp::ReceiveChannel { "blankvhost", ackQueue, 
timerFactory->createWithCallback(&noopHungTimerCallback), - &noopHungCallback) + &noopHungCallback, + false) , d_timerFactory(timerFactory) { ready();