Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/rmq/rmqa/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ add_library(rmqa OBJECT
rmqa_vhostimpl.cpp
)


target_link_libraries(rmqa PUBLIC
bsl
bdl
Expand Down
1 change: 0 additions & 1 deletion src/rmq/rmqa/rmqa_consumerimpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ namespace BloombergLP {
namespace rmqa {
namespace {
BALL_LOG_SET_NAMESPACE_CATEGORY("RMQA.CONSUMERIMPL")

} // namespace

ConsumerImpl::Factory::~Factory() {}
Expand Down
74 changes: 62 additions & 12 deletions src/rmq/rmqa/rmqa_rabbitcontextimpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
#include <rmqa_vhostimpl.h>

#include <rmqamqp_connection.h>
#include <rmqamqp_metrics.h>
#include <rmqio_eventloop.h>
#include <rmqio_timer.h>
#include <rmqio_watchdog.h>
#include <rmqp_connection.h>
#include <rmqt_endpoint.h>
#include <rmqt_future.h>
#include <rmqt_hosthealthconfig.h>
#include <rmqt_vhostinfo.h>

#include <ball_log.h>
Expand All @@ -43,6 +45,7 @@

#include <bsl_sstream.h>
#include <bsl_string.h>
#include <bsl_utility.h>
#include <bsl_vector.h>

namespace BloombergLP {
Expand Down Expand Up @@ -70,7 +73,10 @@ void handleErrorCbOnEventLoop(bdlmt::ThreadPool* threadPool,

void startFirstConnection(
const bsl::weak_ptr<rmqamqp::Connection>& weakConn,
const rmqamqp::Connection::ConnectedCallback& callback)
const rmqamqp::Connection::ConnectedCallback& callback,
const bsl::shared_ptr<rmqamqp::HostHealthMonitor>& hostHealthMonitor,
const bsl::shared_ptr<rmqp::MetricPublisher>& metricPublisher,
const bsl::shared_ptr<rmqt::Endpoint>& endpoint)
{
bsl::shared_ptr<rmqamqp::Connection> amqpConn = weakConn.lock();
if (!amqpConn) {
Expand All @@ -79,6 +85,27 @@ void startFirstConnection(
}

amqpConn->startFirstConnection(callback);

bsl::vector<bsl::pair<bsl::string, bsl::string> > vhostTags;
vhostTags.push_back(bsl::pair<bsl::string, bsl::string>(
rmqamqp::Metrics::VHOST_TAG, endpoint->vhost()));

if (hostHealthMonitor) {
BALL_LOG_INFO << "Registering connection '"
<< amqpConn->connectionDebugName()
<< "' with host health monitor.";
hostHealthMonitor->registerConnection(
bsl::weak_ptr<rmqamqp::Connection>(amqpConn));

// Publish health-aware vhost created counter
metricPublisher->publishCounter(
rmqamqp::Metrics::HEALTH_AWARE_VHOST_CREATED, 1.0, vhostTags);
}
else {
// Publish health-unaware vhost created counter
metricPublisher->publishCounter(
rmqamqp::Metrics::HEALTH_UNAWARE_VHOST_CREATED, 1.0, vhostTags);
}
}

void initiateConnection(
Expand Down Expand Up @@ -142,7 +169,7 @@ RabbitContextImpl::RabbitContextImpl(
bslma::ManagedPtr<rmqio::EventLoop> eventLoop,
const rmqa::RabbitContextOptions& options)
: d_eventLoop(eventLoop)
, d_watchDog(bsl::make_shared<rmqio::WatchDog>(
, d_connectionWatchDog(bsl::make_shared<rmqio::WatchDog>(
bsls::TimeInterval(DEFAULT_WATCHDOG_PERIOD)))
, d_threadPool(options.threadpool())
, d_hostedThreadPool()
Expand All @@ -153,26 +180,42 @@ RabbitContextImpl::RabbitContextImpl(
bdlf::PlaceHolders::_2))
, d_connectionMonitor(
bsl::make_shared<ConnectionMonitor>(options.messageProcessingTimeout()))
, d_metricPublisher(options.metricPublisher()
? options.metricPublisher()
: bsl::shared_ptr<rmqp::MetricPublisher>(
bsl::make_shared<NoOpMetricPublisher>()))
, d_hostHealthMonitor()
, d_connectionFactory()
, d_tunables(options.tunables())
, d_consumerTracing(options.consumerTracing())
, d_producerTracing(options.producerTracing())
{
bsl::shared_ptr<rmqp::MetricPublisher> metricPublisher =
options.metricPublisher();
if (!metricPublisher) {
metricPublisher = bsl::make_shared<NoOpMetricPublisher>();

const bool isHostHealthMonitoringEnabled =
options.hostHealthConfig().has_value();

// Host health monitoring enabled
if (isHostHealthMonitoringEnabled) {
const rmqt::HostHealthConfig& hostHealthConfig =
*options.hostHealthConfig();

d_hostHealthMonitor = bsl::make_shared<rmqamqp::HostHealthMonitor>(
hostHealthConfig, d_metricPublisher.get());
d_hostHealthMonitor->start(d_eventLoop->timerFactory());
}

d_connectionFactory =
bslma::ManagedPtrUtil::makeManaged<rmqamqp::Connection::Factory>(
d_eventLoop->resolver(
options.shuffleConnectionEndpoints().value_or(false)),
d_eventLoop->timerFactory(),
d_onError,
metricPublisher,
d_metricPublisher,
d_connectionMonitor,
options.clientProperties(),
options.connectionErrorThreshold());
options.connectionErrorThreshold(),
isHostHealthMonitoringEnabled);

if (!d_threadPool) {
bslmt::ThreadAttributes attributes;
attributes.setThreadName(DEFAULT_THREADPOOL_WORKER_NAME);
Expand All @@ -188,8 +231,10 @@ RabbitContextImpl::RabbitContextImpl(
BSLS_REVIEW(d_threadPool->enabled());

d_eventLoop->start();
d_watchDog->addTask(bsl::weak_ptr<ConnectionMonitor>(d_connectionMonitor));
d_watchDog->start(d_eventLoop->timerFactory());

d_connectionWatchDog->addTask(
bsl::weak_ptr<ConnectionMonitor>(d_connectionMonitor));
d_connectionWatchDog->start(d_eventLoop->timerFactory());
}

RabbitContextImpl::~RabbitContextImpl()
Expand All @@ -202,9 +247,11 @@ RabbitContextImpl::~RabbitContextImpl()

d_connectionFactory.reset();

d_hostHealthMonitor.reset();

d_hostedThreadPool.reset();

d_watchDog.reset();
d_connectionWatchDog.reset();

const bool eventLoopShutdownResult = d_eventLoop->waitForEventLoopExit(60);

Expand Down Expand Up @@ -370,7 +417,10 @@ rmqt::Future<rmqp::Connection> RabbitContextImpl::createNewConnection(
d_eventLoop->post(
bdlf::BindUtil::bind(&startFirstConnection,
bsl::weak_ptr<rmqamqp::Connection>(amqpConn),
cb));
cb,
d_hostHealthMonitor,
d_metricPublisher,
endpoint));

return futurePair.second;
}
Expand Down
5 changes: 4 additions & 1 deletion src/rmq/rmqa/rmqa_rabbitcontextimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <rmqa_rabbitcontextoptions.h>

#include <rmqamqp_connection.h>
#include <rmqamqp_hosthealthmonitor.h>
#include <rmqio_eventloop.h>
#include <rmqio_watchdog.h>
#include <rmqp_connection.h>
Expand Down Expand Up @@ -71,11 +72,13 @@ class RabbitContextImpl : public rmqp::RabbitContext {
private:
static const int DEFAULT_WATCHDOG_PERIOD = 60;
bslma::ManagedPtr<rmqio::EventLoop> d_eventLoop;
bsl::shared_ptr<rmqio::WatchDog> d_watchDog;
bsl::shared_ptr<rmqio::WatchDog> d_connectionWatchDog;
bdlmt::ThreadPool* d_threadPool;
bslma::ManagedPtr<bdlmt::ThreadPool> d_hostedThreadPool;
rmqt::ErrorCallback d_onError;
bsl::shared_ptr<ConnectionMonitor> d_connectionMonitor;
bsl::shared_ptr<rmqp::MetricPublisher> d_metricPublisher;
bsl::shared_ptr<rmqamqp::HostHealthMonitor> d_hostHealthMonitor;
bslma::ManagedPtr<rmqamqp::Connection::Factory> d_connectionFactory;
rmqt::Tunables d_tunables;
bsl::shared_ptr<rmqp::ConsumerTracing> d_consumerTracing;
Expand Down
8 changes: 8 additions & 0 deletions src/rmq/rmqa/rmqa_rabbitcontextoptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ RabbitContextOptions::RabbitContextOptions()
, d_tunables()
, d_connectionErrorThreshold()
, d_shuffleConnectionEndpoints()
, d_hostHealthConfig()
{
populateUsefulInformation(&d_clientProperties);
}
Expand Down Expand Up @@ -140,6 +141,13 @@ RabbitContextOptions& RabbitContextOptions::setShuffleConnectionEndpoints(
return *this;
}

RabbitContextOptions& RabbitContextOptions::setHostHealthConfig(
const rmqt::HostHealthConfig& hostHealthConfig)
{
d_hostHealthConfig = hostHealthConfig;
return *this;
}

#ifdef USES_LIBRMQ_EXPERIMENTAL_FEATURES
RabbitContextOptions&
RabbitContextOptions::setTunable(const bsl::string& tunable)
Expand Down
27 changes: 27 additions & 0 deletions src/rmq/rmqa/rmqa_rabbitcontextoptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <rmqp_consumertracing.h>
#include <rmqp_producertracing.h>
#include <rmqt_fieldvalue.h>
#include <rmqt_hosthealthconfig.h>
#include <rmqt_properties.h>
#include <rmqt_result.h>

Expand Down Expand Up @@ -141,6 +142,23 @@ class RabbitContextOptions {
RabbitContextOptions&
setShuffleConnectionEndpoints(bool shuffleConnectionEndpoints);

/// \brief Set host health config for connections created by this context to
/// ensure that RabbitMQ consumers are connected from healthy hosts.
/// When host health config is set, the host health monitor is created and
/// consumers with \c consumeOnlyFromHealthyHost enabled (the default) will
/// pause message delivery when the host becomes unhealthy and resume when
/// the host is deemed healthy again.
/// \note By default, \c ConsumerConfig::consumeOnlyFromHealthyHost is true,
/// meaning consumers automatically participate in host health monitoring
/// when host health config is set at the context level. Consumers can
/// opt out by calling \c
/// ConsumerConfig::setConsumeOnlyFromHealthyHost(false). If host health
/// config is not set, \c consumeOnlyFromHealthyHost has no effect.
///
/// \param hostHealthConfig configuration for host health monitoring
RabbitContextOptions&
setHostHealthConfig(const rmqt::HostHealthConfig& hostHealthConfig);

bdlmt::ThreadPool* threadpool() const { return d_threadpool; }

const bsl::shared_ptr<rmqp::MetricPublisher>& metricPublisher() const
Expand Down Expand Up @@ -182,6 +200,14 @@ class RabbitContextOptions {
return d_shuffleConnectionEndpoints;
}

/// \brief Get the host health config. If not set, host health monitoring is
/// disabled. By default, host health monitoring is disabled.
/// \return The host health config
const bsl::optional<rmqt::HostHealthConfig>& hostHealthConfig() const
{
return d_hostHealthConfig;
}

#ifdef USES_LIBRMQ_EXPERIMENTAL_FEATURES
RabbitContextOptions& setTunable(const bsl::string& tunable);
#endif
Expand All @@ -198,6 +224,7 @@ class RabbitContextOptions {
bsl::shared_ptr<rmqp::ConsumerTracing> d_consumerTracing;
bsl::shared_ptr<rmqp::ProducerTracing> d_producerTracing;
bsl::optional<bool> d_shuffleConnectionEndpoints;
bsl::optional<rmqt::HostHealthConfig> d_hostHealthConfig;
};

} // namespace rmqa
Expand Down
1 change: 1 addition & 0 deletions src/rmq/rmqamqp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ add_library(rmqamqp OBJECT
rmqamqp_framer.cpp
rmqamqp_heartbeatmanager.cpp
rmqamqp_heartbeatmanagerimpl.cpp
rmqamqp_hosthealthmonitor.cpp
rmqamqp_message.cpp
rmqamqp_messagestore.cpp
rmqamqp_messagewithroute.cpp
Expand Down
4 changes: 4 additions & 0 deletions src/rmq/rmqamqp/rmqamqp_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -475,9 +475,13 @@ void Channel::retry(const bsl::weak_ptr<Channel>& weakSelf)

self->open();
}

void Channel::onReset() {}

void Channel::onFlowAllowed() {}

void Channel::onOpen() { ready(); }

void Channel::ready()
{
BALL_LOG_TRACE << "Channel Ready";
Expand Down
39 changes: 38 additions & 1 deletion src/rmq/rmqamqp/rmqamqp_channelfactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,20 @@

#include <rmqamqp_channelfactory.h>

#include <rmqamqp_metrics.h>

#include <bsl_utility.h>
#include <bsl_vector.h>

namespace BloombergLP {
namespace rmqamqp {

ChannelFactory::ChannelFactory(
const ChannelOnOpenState receiveChannelOnOpenState)
: d_receiveChannelOnOpenState(receiveChannelOnOpenState)
{
}

bsl::shared_ptr<ReceiveChannel> ChannelFactory::createReceiveChannel(
const rmqt::Topology& topology,
const Channel::AsyncWriteCallback& onAsyncWrite,
Expand All @@ -29,6 +40,25 @@ bsl::shared_ptr<ReceiveChannel> ChannelFactory::createReceiveChannel(
const bsl::shared_ptr<rmqio::Timer>& hungProgressTimer,
const Channel::HungChannelCallback& connErrorCb)
{
const bool channelPausedOnOpen =
d_receiveChannelOnOpenState == PAUSED ||
(consumerConfig.consumeOnlyFromHealthyHost() &&
d_receiveChannelOnOpenState == PAUSED_HOST_HEALTH_AWARE);

// Publish health-aware or health-unaware consumer created counter
bsl::vector<bsl::pair<bsl::string, bsl::string> > vhostTags;
vhostTags.push_back(
bsl::pair<bsl::string, bsl::string>(Metrics::VHOST_TAG, vhost));

if (consumerConfig.consumeOnlyFromHealthyHost()) {
metricPublisher->publishCounter(
Metrics::HEALTH_AWARE_CONSUMER_CREATED, 1.0, vhostTags);
}
else {
metricPublisher->publishCounter(
Metrics::HEALTH_UNAWARE_CONSUMER_CREATED, 1.0, vhostTags);
}

return bsl::make_shared<ReceiveChannel>(topology,
onAsyncWrite,
retryHandler,
Expand All @@ -37,7 +67,8 @@ bsl::shared_ptr<ReceiveChannel> ChannelFactory::createReceiveChannel(
vhost,
ackQueue,
hungProgressTimer,
connErrorCb);
connErrorCb,
channelPausedOnOpen);
}

bsl::shared_ptr<SendChannel> ChannelFactory::createSendChannel(
Expand All @@ -60,5 +91,11 @@ bsl::shared_ptr<SendChannel> ChannelFactory::createSendChannel(
connErrorCb);
}

void ChannelFactory::setReceiveChannelOnOpenState(
const ChannelOnOpenState channelOnOpenState)
{
d_receiveChannelOnOpenState = channelOnOpenState;
}

} // namespace rmqamqp
} // namespace BloombergLP
13 changes: 13 additions & 0 deletions src/rmq/rmqamqp/rmqamqp_channelfactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ namespace rmqamqp {

class ChannelFactory {
public:
enum ChannelOnOpenState {
PAUSED,
PAUSED_HOST_HEALTH_AWARE,
NOT_PAUSED,
};

ChannelFactory(const ChannelOnOpenState channelOnOpenState = NOT_PAUSED);
virtual ~ChannelFactory() {};

virtual bsl::shared_ptr<ReceiveChannel> createReceiveChannel(
Expand All @@ -58,6 +65,12 @@ class ChannelFactory {
const bsl::string& vhost,
const bsl::shared_ptr<rmqio::Timer>& hungProgressTimer,
const Channel::HungChannelCallback& connErrorCb);

void setReceiveChannelOnOpenState(
const ChannelOnOpenState receiveChannelOnOpenState);

private:
ChannelOnOpenState d_receiveChannelOnOpenState;
};

} // namespace rmqamqp
Expand Down
Loading
Loading