From 91aa2baea37e3ece1d24de07407693dead85a235 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 20 May 2026 14:41:14 +0800 Subject: [PATCH 1/4] Add new APIs to carry error messages when creating producer, consumer or reader --- include/pulsar/Client.h | 51 ++++++- include/pulsar/Result.h | 7 + lib/Client.cc | 99 +++++++++++++ lib/ClientConnection.cc | 6 +- lib/ClientConnection.h | 3 +- lib/ClientImpl.cc | 251 +++++++++++++++++++++++++++++++-- lib/ClientImpl.h | 29 ++++ lib/ConsumerImpl.cc | 16 ++- lib/ConsumerImpl.h | 5 + lib/ConsumerImplBase.h | 1 + lib/Future.h | 2 + lib/MockServer.h | 28 +++- lib/MultiTopicsConsumerImpl.cc | 18 +++ lib/MultiTopicsConsumerImpl.h | 2 + lib/PartitionedProducerImpl.cc | 10 ++ lib/PartitionedProducerImpl.h | 2 + lib/PendingRequest.h | 5 + lib/ProducerImpl.cc | 6 + lib/ProducerImpl.h | 2 + lib/ProducerImplBase.h | 1 + lib/ReaderImpl.cc | 4 + lib/ReaderImpl.h | 1 + tests/ClientTest.cc | 62 ++++++++ 23 files changed, 589 insertions(+), 22 deletions(-) diff --git a/include/pulsar/Client.h b/include/pulsar/Client.h index e9813e3d..be3f33ec 100644 --- a/include/pulsar/Client.h +++ b/include/pulsar/Client.h @@ -36,11 +36,15 @@ #include #include +#include namespace pulsar { typedef std::function CreateProducerCallback; +typedef std::function)> CreateProducerCallbackV2; typedef std::function SubscribeCallback; +typedef std::function)> SubscribeCallbackV2; typedef std::function ReaderCallback; +typedef std::function)> ReaderCallbackV2; typedef std::function TableViewCallback; typedef std::function&)> GetPartitionsCallback; typedef std::function CloseCallback; @@ -108,7 +112,9 @@ class PULSAR_PUBLIC Client { * @return ResultOk if the producer has been successfully created * @return ResultError if there was an error */ - Result createProducer(const std::string& topic, const ProducerConfiguration& conf, Producer& producer); + [[deprecated("use createProducerV2 instead")]] Result createProducer(const std::string& topic, + const ProducerConfiguration& conf, + Producer& producer); /** * Asynchronously create a producer with the default ProducerConfiguration for publishing on a specific @@ -118,7 +124,18 @@ class PULSAR_PUBLIC Client { * @param callback the callback that is triggered when the producer is created successfully or not * @param callback Callback function that is invoked when the operation is completed */ - void createProducerAsync(const std::string& topic, const CreateProducerCallback& callback); + [[deprecated("use createProducerAsyncV2 instead")]] void createProducerAsync( + const std::string& topic, const CreateProducerCallback& callback); + + std::variant createProducerV2(const std::string& topic); + + std::variant createProducerV2(const std::string& topic, + const ProducerConfiguration& conf); + + void createProducerAsyncV2(const std::string& topic, const CreateProducerCallbackV2& callback); + + void createProducerAsyncV2(const std::string& topic, const ProducerConfiguration& conf, + const CreateProducerCallbackV2& callback); /** * Asynchronously create a producer with the customized ProducerConfiguration for publishing on a specific @@ -151,6 +168,11 @@ class PULSAR_PUBLIC Client { Result subscribe(const std::string& topic, const std::string& subscriptionName, const ConsumerConfiguration& conf, Consumer& consumer); + std::variant subscribeV2(const std::string& topic, const std::string& subscriptionName); + + std::variant subscribeV2(const std::string& topic, const std::string& subscriptionName, + const ConsumerConfiguration& conf); + /** * Asynchronously subscribe to a given topic and subscription combination with the default * ConsumerConfiguration @@ -163,6 +185,9 @@ class PULSAR_PUBLIC Client { void subscribeAsync(const std::string& topic, const std::string& subscriptionName, const SubscribeCallback& callback); + void subscribeAsyncV2(const std::string& topic, const std::string& subscriptionName, + const SubscribeCallbackV2& callback); + /** * Asynchronously subscribe to a given topic and subscription combination with the customized * ConsumerConfiguration @@ -176,6 +201,9 @@ class PULSAR_PUBLIC Client { void subscribeAsync(const std::string& topic, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback); + void subscribeAsyncV2(const std::string& topic, const std::string& subscriptionName, + const ConsumerConfiguration& conf, const SubscribeCallbackV2& callback); + /** * Subscribe to multiple topics under the same namespace. * @@ -197,6 +225,13 @@ class PULSAR_PUBLIC Client { Result subscribe(const std::vector& topics, const std::string& subscriptionName, const ConsumerConfiguration& conf, Consumer& consumer); + std::variant subscribeV2(const std::vector& topics, + const std::string& subscriptionName); + + std::variant subscribeV2(const std::vector& topics, + const std::string& subscriptionName, + const ConsumerConfiguration& conf); + /** * Asynchronously subscribe to a list of topics and subscription combination using the default ConsumerConfiguration @@ -210,6 +245,9 @@ class PULSAR_PUBLIC Client { void subscribeAsync(const std::vector& topics, const std::string& subscriptionName, const SubscribeCallback& callback); + void subscribeAsyncV2(const std::vector& topics, const std::string& subscriptionName, + const SubscribeCallbackV2& callback); + /** * Asynchronously subscribe to a list of topics and subscription combination using the customized * ConsumerConfiguration @@ -223,6 +261,9 @@ class PULSAR_PUBLIC Client { void subscribeAsync(const std::vector& topics, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback); + void subscribeAsyncV2(const std::vector& topics, const std::string& subscriptionName, + const ConsumerConfiguration& conf, const SubscribeCallbackV2& callback); + /** * Subscribe to multiple topics, which match given regexPattern, under the same namespace. */ @@ -291,6 +332,9 @@ class PULSAR_PUBLIC Client { Result createReader(const std::string& topic, const MessageId& startMessageId, const ReaderConfiguration& conf, Reader& reader); + std::variant createReaderV2(const std::string& topic, const MessageId& startMessageId, + const ReaderConfiguration& conf); + /** * Asynchronously create a topic reader with the customized ReaderConfiguration for reading messages from * the specified topic. @@ -320,6 +364,9 @@ class PULSAR_PUBLIC Client { void createReaderAsync(const std::string& topic, const MessageId& startMessageId, const ReaderConfiguration& conf, const ReaderCallback& callback); + void createReaderAsyncV2(const std::string& topic, const MessageId& startMessageId, + const ReaderConfiguration& conf, const ReaderCallbackV2& callback); + /** * Create a table view with given {@code TableViewConfiguration} for specified topic. * diff --git a/include/pulsar/Result.h b/include/pulsar/Result.h index a6c30d4c..7b1bb1a6 100644 --- a/include/pulsar/Result.h +++ b/include/pulsar/Result.h @@ -23,6 +23,7 @@ #include #include +#include namespace pulsar { @@ -101,6 +102,12 @@ enum Result : int8_t PULSAR_PUBLIC const char* strResult(Result result); PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, pulsar::Result result); + +struct Error { + Result result; + std::string message; +}; + } // namespace pulsar #endif /* ERROR_HPP_ */ diff --git a/lib/Client.cc b/lib/Client.cc index 48e92dda..95b71c49 100644 --- a/lib/Client.cc +++ b/lib/Client.cc @@ -63,6 +63,31 @@ void Client::createProducerAsync(const std::string& topic, const CreateProducerC createProducerAsync(topic, ProducerConfiguration(), callback); } +std::variant Client::createProducerV2(const std::string& topic) { + return createProducerV2(topic, ProducerConfiguration()); +} + +std::variant Client::createProducerV2(const std::string& topic, + const ProducerConfiguration& conf) { + Promise > promise; + createProducerAsyncV2(topic, conf, + [promise](std::variant result) { promise.setValue(result); }); + Future > future = promise.getFuture(); + + std::variant result; + future.get(result); + return result; +} + +void Client::createProducerAsyncV2(const std::string& topic, const CreateProducerCallbackV2& callback) { + createProducerAsyncV2(topic, ProducerConfiguration(), callback); +} + +void Client::createProducerAsyncV2(const std::string& topic, const ProducerConfiguration& conf, + const CreateProducerCallbackV2& callback) { + impl_->createProducerAsyncV2(topic, conf, callback); +} + void Client::createProducerAsync(const std::string& topic, const ProducerConfiguration& conf, const CreateProducerCallback& callback) { impl_->createProducerAsync(topic, conf, callback); @@ -81,17 +106,46 @@ Result Client::subscribe(const std::string& topic, const std::string& subscripti return future.get(consumer); } +std::variant Client::subscribeV2(const std::string& topic, + const std::string& subscriptionName) { + return subscribeV2(topic, subscriptionName, ConsumerConfiguration()); +} + +std::variant Client::subscribeV2(const std::string& topic, + const std::string& subscriptionName, + const ConsumerConfiguration& conf) { + Promise > promise; + subscribeAsyncV2(topic, subscriptionName, conf, + [promise](std::variant result) { promise.setValue(result); }); + Future > future = promise.getFuture(); + + std::variant result; + future.get(result); + return result; +} + void Client::subscribeAsync(const std::string& topic, const std::string& subscriptionName, const SubscribeCallback& callback) { subscribeAsync(topic, subscriptionName, ConsumerConfiguration(), callback); } +void Client::subscribeAsyncV2(const std::string& topic, const std::string& subscriptionName, + const SubscribeCallbackV2& callback) { + subscribeAsyncV2(topic, subscriptionName, ConsumerConfiguration(), callback); +} + void Client::subscribeAsync(const std::string& topic, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback) { LOG_INFO("Subscribing on Topic :" << topic); impl_->subscribeAsync(topic, subscriptionName, conf, callback); } +void Client::subscribeAsyncV2(const std::string& topic, const std::string& subscriptionName, + const ConsumerConfiguration& conf, const SubscribeCallbackV2& callback) { + LOG_INFO("Subscribing on Topic :" << topic); + impl_->subscribeAsyncV2(topic, subscriptionName, conf, callback); +} + Result Client::subscribe(const std::vector& topics, const std::string& subscriptionName, Consumer& consumer) { return subscribe(topics, subscriptionName, ConsumerConfiguration(), consumer); @@ -106,16 +160,44 @@ Result Client::subscribe(const std::vector& topics, const std::stri return future.get(consumer); } +std::variant Client::subscribeV2(const std::vector& topics, + const std::string& subscriptionName) { + return subscribeV2(topics, subscriptionName, ConsumerConfiguration()); +} + +std::variant Client::subscribeV2(const std::vector& topics, + const std::string& subscriptionName, + const ConsumerConfiguration& conf) { + Promise > promise; + subscribeAsyncV2(topics, subscriptionName, conf, + [promise](std::variant result) { promise.setValue(result); }); + Future > future = promise.getFuture(); + + std::variant result; + future.get(result); + return result; +} + void Client::subscribeAsync(const std::vector& topics, const std::string& subscriptionName, const SubscribeCallback& callback) { subscribeAsync(topics, subscriptionName, ConsumerConfiguration(), callback); } +void Client::subscribeAsyncV2(const std::vector& topics, const std::string& subscriptionName, + const SubscribeCallbackV2& callback) { + subscribeAsyncV2(topics, subscriptionName, ConsumerConfiguration(), callback); +} + void Client::subscribeAsync(const std::vector& topics, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback) { impl_->subscribeAsync(topics, subscriptionName, conf, callback); } +void Client::subscribeAsyncV2(const std::vector& topics, const std::string& subscriptionName, + const ConsumerConfiguration& conf, const SubscribeCallbackV2& callback) { + impl_->subscribeAsyncV2(topics, subscriptionName, conf, callback); +} + Result Client::subscribeWithRegex(const std::string& regexPattern, const std::string& subscriptionName, Consumer& consumer) { return subscribeWithRegex(regexPattern, subscriptionName, ConsumerConfiguration(), consumer); @@ -149,11 +231,28 @@ Result Client::createReader(const std::string& topic, const MessageId& startMess return future.get(reader); } +std::variant Client::createReaderV2(const std::string& topic, const MessageId& startMessageId, + const ReaderConfiguration& conf) { + Promise > promise; + createReaderAsyncV2(topic, startMessageId, conf, + [promise](std::variant result) { promise.setValue(result); }); + Future > future = promise.getFuture(); + + std::variant result; + future.get(result); + return result; +} + void Client::createReaderAsync(const std::string& topic, const MessageId& startMessageId, const ReaderConfiguration& conf, const ReaderCallback& callback) { impl_->createReaderAsync(topic, startMessageId, conf, callback); } +void Client::createReaderAsyncV2(const std::string& topic, const MessageId& startMessageId, + const ReaderConfiguration& conf, const ReaderCallbackV2& callback) { + impl_->createReaderAsyncV2(topic, startMessageId, conf, callback); +} + Result Client::createTableView(const std::string& topic, const TableViewConfiguration& conf, TableView& tableView) { Promise promise; diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index a135ade7..620ba932 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -1711,7 +1711,11 @@ void ClientConnection::handleError(const proto::CommandError& error) { pendingRequests_.erase(it); lock.unlock(); - request->fail(result); + ResponseData data; + if (error.has_message()) { + data.errorMessage = error.message(); + } + request->fail(result, data); } else { auto it = pendingGetLastMessageIdRequests_.find(error.request_id()); if (it != pendingGetLastMessageIdRequests_.end()) { diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index c8cd86fe..f17354c1 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -113,9 +113,10 @@ class CommandSuccess; // Data returned on the request operation. Mostly used on create-producer command struct ResponseData { std::string producerName; - int64_t lastSequenceId; + int64_t lastSequenceId = -1L; std::string schemaVersion; optional topicEpoch; + std::string errorMessage; }; typedef std::shared_ptr> NamespaceTopicsPtr; diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index b84c14c4..a51dcd87 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -189,6 +189,20 @@ LookupServicePtr ClientImpl::getLookup(const std::string& redirectedClusterURI) void ClientImpl::createProducerAsync(const std::string& topic, const ProducerConfiguration& conf, const CreateProducerCallback& callback, bool autoDownloadSchema) { + createProducerAsyncV2( + topic, conf, + [callback](std::variant result) { + if (auto producer = std::get_if(&result)) { + callback(ResultOk, *producer); + } else { + callback(std::get(result).result, {}); + } + }, + autoDownloadSchema); +} + +void ClientImpl::createProducerAsyncV2(const std::string& topic, const ProducerConfiguration& conf, + const CreateProducerCallbackV2& callback, bool autoDownloadSchema) { if (conf.isChunkingEnabled() && conf.getBatchingEnabled()) { throw std::invalid_argument("Batching and chunking of messages can't be enabled together"); } @@ -197,11 +211,11 @@ void ClientImpl::createProducerAsync(const std::string& topic, const ProducerCon std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); - callback(ResultAlreadyClosed, Producer()); + callback(Error{ResultAlreadyClosed, ""}); return; } else if (!(topicName = TopicName::get(topic))) { lock.unlock(); - callback(ResultInvalidTopicName, Producer()); + callback(Error{ResultInvalidTopicName, ""}); return; } } @@ -211,18 +225,18 @@ void ClientImpl::createProducerAsync(const std::string& topic, const ProducerCon getSchema(topicName).addListener( [self, topicName, callback](Result res, const SchemaInfo& topicSchema) { if (res != ResultOk) { - callback(res, Producer()); + callback(Error{res, ""}); return; } ProducerConfiguration conf; conf.setSchema(topicSchema); self->getPartitionMetadataAsync(topicName).addListener( - std::bind(&ClientImpl::handleCreateProducer, self, std::placeholders::_1, + std::bind(&ClientImpl::handleCreateProducerV2, self, std::placeholders::_1, std::placeholders::_2, topicName, conf, callback)); }); } else { getPartitionMetadataAsync(topicName).addListener( - std::bind(&ClientImpl::handleCreateProducer, shared_from_this(), std::placeholders::_1, + std::bind(&ClientImpl::handleCreateProducerV2, shared_from_this(), std::placeholders::_1, std::placeholders::_2, topicName, conf, callback)); } } @@ -258,6 +272,37 @@ void ClientImpl::handleCreateProducer(Result result, const LookupDataResultPtr& } } +void ClientImpl::handleCreateProducerV2(Result result, const LookupDataResultPtr& partitionMetadata, + const TopicNamePtr& topicName, const ProducerConfiguration& conf, + const CreateProducerCallbackV2& callback) { + if (!result) { + ProducerImplBasePtr producer; + + auto interceptors = std::make_shared(conf.getInterceptors()); + + try { + if (partitionMetadata->getPartitions() > 0) { + producer = std::make_shared( + shared_from_this(), topicName, partitionMetadata->getPartitions(), conf, interceptors); + } else { + producer = std::make_shared(shared_from_this(), *topicName, conf, interceptors); + } + } catch (const std::runtime_error& e) { + LOG_ERROR("Failed to create producer: " << e.what()); + callback(Error{ResultConnectError, e.what()}); + return; + } + producer->getProducerCreatedFuture().addListener( + std::bind(&ClientImpl::handleProducerCreatedV2, shared_from_this(), std::placeholders::_1, + std::placeholders::_2, callback, producer)); + producer->start(); + } else { + LOG_ERROR("Error Checking/Getting Partition Metadata while creating producer on " + << topicName->toString() << " -- " << result); + callback(Error{result, ""}); + } +} + void ClientImpl::handleProducerCreated(Result result, const ProducerImplBaseWeakPtr& producerBaseWeakPtr, const CreateProducerCallback& callback, const ProducerImplBasePtr& producer) { @@ -277,25 +322,55 @@ void ClientImpl::handleProducerCreated(Result result, const ProducerImplBaseWeak } } +void ClientImpl::handleProducerCreatedV2(Result result, const ProducerImplBaseWeakPtr& producerBaseWeakPtr, + const CreateProducerCallbackV2& callback, + const ProducerImplBasePtr& producer) { + if (result == ResultOk) { + auto address = producer.get(); + auto existingProducer = producers_.putIfAbsent(address, producer); + if (existingProducer) { + auto producer = existingProducer.value().lock(); + LOG_ERROR("Unexpected existing producer at the same address: " + << address << ", producer: " << (producer ? producer->getProducerName() : "(null)")); + callback(Error{ResultUnknownError, ""}); + return; + } + callback(Producer(producer)); + } else { + callback(Error{result, producer->getLastErrorMessage()}); + } +} + void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& startMessageId, const ReaderConfiguration& conf, const ReaderCallback& callback) { + createReaderAsyncV2(topic, startMessageId, conf, [callback](std::variant result) { + if (auto reader = std::get_if(&result)) { + callback(ResultOk, *reader); + } else { + callback(std::get(result).result, {}); + } + }); +} + +void ClientImpl::createReaderAsyncV2(const std::string& topic, const MessageId& startMessageId, + const ReaderConfiguration& conf, const ReaderCallbackV2& callback) { TopicNamePtr topicName; { std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); - callback(ResultAlreadyClosed, Reader()); + callback(Error{ResultAlreadyClosed, ""}); return; } else if (!(topicName = TopicName::get(topic))) { lock.unlock(); - callback(ResultInvalidTopicName, Reader()); + callback(Error{ResultInvalidTopicName, ""}); return; } } MessageId msgId(startMessageId); getPartitionMetadataAsync(topicName).addListener( - std::bind(&ClientImpl::handleReaderMetadataLookup, shared_from_this(), std::placeholders::_1, + std::bind(&ClientImpl::handleReaderMetadataLookupV2, shared_from_this(), std::placeholders::_1, std::placeholders::_2, topicName, msgId, conf, callback)); } @@ -364,6 +439,56 @@ void ClientImpl::handleReaderMetadataLookup(Result result, const LookupDataResul }); } +void ClientImpl::handleReaderMetadataLookupV2(Result result, const LookupDataResultPtr& partitionMetadata, + const TopicNamePtr& topicName, const MessageId& startMessageId, + const ReaderConfiguration& conf, + const ReaderCallbackV2& callback) { + if (result != ResultOk) { + LOG_ERROR("Error Checking/Getting Partition Metadata while creating readeron " + << topicName->toString() << " -- " << result); + callback(Error{result, ""}); + return; + } + + auto readerWeak = std::make_shared(); + ReaderCallback readerCallback = [readerWeak, callback](Result result, const Reader& reader) { + if (result == ResultOk) { + callback(reader); + } else { + auto readerImpl = readerWeak->lock(); + callback(Error{result, readerImpl ? readerImpl->getLastErrorMessage() : ""}); + } + }; + + ReaderImplPtr reader; + try { + reader.reset(new ReaderImpl(shared_from_this(), topicName->toString(), + partitionMetadata->getPartitions(), conf, + getListenerExecutorProvider()->get(), readerCallback)); + } catch (const std::runtime_error& e) { + LOG_ERROR("Failed to create reader: " << e.what()); + callback(Error{ResultConnectError, e.what()}); + return; + } + *readerWeak = reader; + ConsumerImplBasePtr consumer = reader->getConsumer(); + auto self = shared_from_this(); + reader->start(startMessageId, [this, self](const ConsumerImplBaseWeakPtr& weakConsumerPtr) { + auto consumer = weakConsumerPtr.lock(); + if (consumer) { + auto address = consumer.get(); + auto existingConsumer = consumers_.putIfAbsent(address, consumer); + if (existingConsumer) { + consumer = existingConsumer.value().lock(); + LOG_ERROR("Unexpected existing consumer at the same address: " + << address << ", consumer: " << (consumer ? consumer->getName() : "(null)")); + } + } else { + LOG_ERROR("Unexpected case: the consumer is somehow expired"); + } + }); +} + void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback) { @@ -445,6 +570,19 @@ void ClientImpl::createPatternMultiTopicsConsumer(Result result, const Namespace void ClientImpl::subscribeAsync(const std::vector& originalTopics, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback) { + subscribeAsyncV2(originalTopics, subscriptionName, conf, + [callback](std::variant result) { + if (auto consumer = std::get_if(&result)) { + callback(ResultOk, *consumer); + } else { + callback(std::get(result).result, {}); + } + }); +} + +void ClientImpl::subscribeAsyncV2(const std::vector& originalTopics, + const std::string& subscriptionName, const ConsumerConfiguration& conf, + const SubscribeCallbackV2& callback) { TopicNamePtr topicNamePtr; // Remove duplicates from the list of topics @@ -456,12 +594,12 @@ void ClientImpl::subscribeAsync(const std::vector& originalTopics, std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); - callback(ResultAlreadyClosed, Consumer()); + callback(Error{ResultAlreadyClosed, ""}); return; } else { if (!topics.empty() && !(topicNamePtr = MultiTopicsConsumerImpl::topicNamesValid(topics))) { lock.unlock(); - callback(ResultInvalidTopicName, Consumer()); + callback(Error{ResultInvalidTopicName, ""}); return; } } @@ -479,7 +617,7 @@ void ClientImpl::subscribeAsync(const std::vector& originalTopics, ConsumerImplBasePtr consumer = std::make_shared( shared_from_this(), topics, subscriptionName, topicNamePtr, conf, interceptors); - consumer->getConsumerCreatedFuture().addListener(std::bind(&ClientImpl::handleConsumerCreated, + consumer->getConsumerCreatedFuture().addListener(std::bind(&ClientImpl::handleConsumerCreatedV2, shared_from_this(), std::placeholders::_1, std::placeholders::_2, callback, consumer)); consumer->start(); @@ -487,28 +625,39 @@ void ClientImpl::subscribeAsync(const std::vector& originalTopics, void ClientImpl::subscribeAsync(const std::string& topic, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback) { + subscribeAsyncV2(topic, subscriptionName, conf, [callback](std::variant result) { + if (auto consumer = std::get_if(&result)) { + callback(ResultOk, *consumer); + } else { + callback(std::get(result).result, {}); + } + }); +} + +void ClientImpl::subscribeAsyncV2(const std::string& topic, const std::string& subscriptionName, + const ConsumerConfiguration& conf, const SubscribeCallbackV2& callback) { TopicNamePtr topicName; { std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); - callback(ResultAlreadyClosed, Consumer()); + callback(Error{ResultAlreadyClosed, ""}); return; } else if (!(topicName = TopicName::get(topic))) { lock.unlock(); - callback(ResultInvalidTopicName, Consumer()); + callback(Error{ResultInvalidTopicName, ""}); return; } else if (conf.isReadCompacted() && (topicName->getDomain().compare("persistent") != 0 || (conf.getConsumerType() != ConsumerExclusive && conf.getConsumerType() != ConsumerFailover))) { lock.unlock(); - callback(ResultInvalidConfiguration, Consumer()); + callback(Error{ResultInvalidConfiguration, ""}); return; } } getPartitionMetadataAsync(topicName).addListener( - std::bind(&ClientImpl::handleSubscribe, shared_from_this(), std::placeholders::_1, + std::bind(&ClientImpl::handleSubscribeV2, shared_from_this(), std::placeholders::_1, std::placeholders::_2, topicName, subscriptionName, conf, callback)); } @@ -556,6 +705,50 @@ void ClientImpl::handleSubscribe(Result result, const LookupDataResultPtr& parti } } +void ClientImpl::handleSubscribeV2(Result result, const LookupDataResultPtr& partitionMetadata, + const TopicNamePtr& topicName, const std::string& subscriptionName, + ConsumerConfiguration conf, const SubscribeCallbackV2& callback) { + if (result == ResultOk) { + // generate random name if not supplied by the customer. + if (conf.getConsumerName().empty()) { + conf.setConsumerName(generateRandomName()); + } + ConsumerImplBasePtr consumer; + auto interceptors = std::make_shared(conf.getInterceptors()); + + try { + if (partitionMetadata->getPartitions() > 0) { + if (conf.getReceiverQueueSize() == 0) { + LOG_ERROR("Can't use partitioned topic if the queue size is 0."); + callback(Error{ResultInvalidConfiguration, ""}); + return; + } + consumer = std::make_shared(shared_from_this(), topicName, + partitionMetadata->getPartitions(), + subscriptionName, conf, interceptors); + } else { + auto consumerImpl = std::make_shared(shared_from_this(), topicName->toString(), + subscriptionName, conf, + topicName->isPersistent(), interceptors); + consumerImpl->setPartitionIndex(topicName->getPartitionIndex()); + consumer = consumerImpl; + } + } catch (const std::runtime_error& e) { + LOG_ERROR("Failed to create consumer: " << e.what()); + callback(Error{ResultConnectError, e.what()}); + return; + } + consumer->getConsumerCreatedFuture().addListener( + std::bind(&ClientImpl::handleConsumerCreatedV2, shared_from_this(), std::placeholders::_1, + std::placeholders::_2, callback, consumer)); + consumer->start(); + } else { + LOG_ERROR("Error Checking/Getting Partition Metadata while Subscribing on " << topicName->toString() + << " -- " << result); + callback(Error{result, ""}); + } +} + void ClientImpl::handleConsumerCreated(Result result, const ConsumerImplBaseWeakPtr& consumerImplBaseWeakPtr, const SubscribeCallback& callback, const ConsumerImplBasePtr& consumer) { @@ -582,6 +775,34 @@ void ClientImpl::handleConsumerCreated(Result result, const ConsumerImplBaseWeak } } +void ClientImpl::handleConsumerCreatedV2(Result result, + const ConsumerImplBaseWeakPtr& consumerImplBaseWeakPtr, + const SubscribeCallbackV2& callback, + const ConsumerImplBasePtr& consumer) { + if (result == ResultOk) { + auto address = consumer.get(); + auto existingConsumer = consumers_.putIfAbsent(address, consumer); + if (existingConsumer) { + auto consumer = existingConsumer.value().lock(); + LOG_ERROR("Unexpected existing consumer at the same address: " + << address << ", consumer: " << (consumer ? consumer->getName() : "(null)")); + callback(Error{ResultUnknownError, ""}); + return; + } + callback(Consumer(consumer)); + } else { + const auto errorMessage = consumer->getLastErrorMessage(); + // In order to be compatible with the current broker error code confusion. + // https://github.com/apache/pulsar/blob/cd2aa550d0fe4e72b5ff88c4f6c1c2795b3ff2cd/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java#L240-L241 + if (result == ResultProducerBusy) { + LOG_ERROR("Failed to create consumer: SubscriptionName cannot be empty."); + callback(Error{ResultInvalidConfiguration, errorMessage}); + } else { + callback(Error{result, errorMessage}); + } + } +} + GetConnectionFuture ClientImpl::getConnection(const std::string& redirectedClusterURI, const std::string& topic, size_t key) { Promise promise; diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index 7772b15b..a96d510d 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -95,18 +95,30 @@ class ClientImpl : public std::enable_shared_from_this { void createProducerAsync(const std::string& topic, const ProducerConfiguration& conf, const CreateProducerCallback& callback, bool autoDownloadSchema = false); + void createProducerAsyncV2(const std::string& topic, const ProducerConfiguration& conf, + const CreateProducerCallbackV2& callback, bool autoDownloadSchema = false); + void subscribeAsync(const std::string& topic, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback); + void subscribeAsyncV2(const std::string& topic, const std::string& subscriptionName, + const ConsumerConfiguration& conf, const SubscribeCallbackV2& callback); + void subscribeAsync(const std::vector& topics, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback); + void subscribeAsyncV2(const std::vector& topics, const std::string& subscriptionName, + const ConsumerConfiguration& conf, const SubscribeCallbackV2& callback); + void subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback); void createReaderAsync(const std::string& topic, const MessageId& startMessageId, const ReaderConfiguration& conf, const ReaderCallback& callback); + void createReaderAsyncV2(const std::string& topic, const MessageId& startMessageId, + const ReaderConfiguration& conf, const ReaderCallbackV2& callback); + void createTableViewAsync(const std::string& topic, const TableViewConfiguration& conf, const TableViewCallback& callback); @@ -176,21 +188,38 @@ class ClientImpl : public std::enable_shared_from_this { const TopicNamePtr& topicName, const ProducerConfiguration& conf, const CreateProducerCallback& callback); + void handleCreateProducerV2(Result result, const LookupDataResultPtr& partitionMetadata, + const TopicNamePtr& topicName, const ProducerConfiguration& conf, + const CreateProducerCallbackV2& callback); + void handleSubscribe(Result result, const LookupDataResultPtr& partitionMetadata, const TopicNamePtr& topicName, const std::string& consumerName, ConsumerConfiguration conf, const SubscribeCallback& callback); + void handleSubscribeV2(Result result, const LookupDataResultPtr& partitionMetadata, + const TopicNamePtr& topicName, const std::string& consumerName, + ConsumerConfiguration conf, const SubscribeCallbackV2& callback); + void handleReaderMetadataLookup(Result result, const LookupDataResultPtr& partitionMetadata, const TopicNamePtr& topicName, const MessageId& startMessageId, const ReaderConfiguration& conf, const ReaderCallback& callback); + void handleReaderMetadataLookupV2(Result result, const LookupDataResultPtr& partitionMetadata, + const TopicNamePtr& topicName, const MessageId& startMessageId, + const ReaderConfiguration& conf, const ReaderCallbackV2& callback); + void handleGetPartitions(Result result, const LookupDataResultPtr& partitionMetadata, const TopicNamePtr& topicName, const GetPartitionsCallback& callback); void handleProducerCreated(Result result, const ProducerImplBaseWeakPtr& producerWeakPtr, const CreateProducerCallback& callback, const ProducerImplBasePtr& producer); + void handleProducerCreatedV2(Result result, const ProducerImplBaseWeakPtr& producerWeakPtr, + const CreateProducerCallbackV2& callback, + const ProducerImplBasePtr& producer); void handleConsumerCreated(Result result, const ConsumerImplBaseWeakPtr& consumerWeakPtr, const SubscribeCallback& callback, const ConsumerImplBasePtr& consumer); + void handleConsumerCreatedV2(Result result, const ConsumerImplBaseWeakPtr& consumerWeakPtr, + const SubscribeCallbackV2& callback, const ConsumerImplBasePtr& consumer); typedef std::shared_ptr SharedInt; diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 85f49946..6d5ff7ca 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -214,6 +214,11 @@ const std::string& ConsumerImpl::getSubscriptionName() const { return originalSu const std::string& ConsumerImpl::getTopic() const { return topic(); } +std::string ConsumerImpl::getLastErrorMessage() const { + Lock lock(mutex_); + return lastErrorMessage_; +} + void ConsumerImpl::start() { HandlerBase::start(); ackGroupingTrackerPtr_->start(get_shared_this_ptr()); @@ -263,7 +268,7 @@ Future ConsumerImpl::connectionOpened(const ClientConnectionPtr& c setFirstRequestIdAfterConnect(requestId); cnx->sendRequestWithId(cmd, requestId, "SUBSCRIBE") .addListener([this, self, cnx, promise](Result result, const ResponseData& responseData) { - Result handleResult = handleCreateConsumer(cnx, result); + Result handleResult = handleCreateConsumer(cnx, result, responseData); if (handleResult != ResultOk) { promise.setFailed(handleResult); return; @@ -301,6 +306,11 @@ void ConsumerImpl::sendFlowPermitsToBroker(const ClientConnectionPtr& cnx, int n } Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result result) { + return handleCreateConsumer(cnx, result, ResponseData{}); +} + +Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result result, + const ResponseData& responseData) { Result handleResult = ResultOk; if (result == ResultOk) { @@ -361,6 +371,10 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result } consumerCreatedPromise_.setValue(get_shared_this_ptr()); } else { + { + Lock lock(mutex_); + lastErrorMessage_ = responseData.errorMessage; + } if (result == ResultTimeout) { // Creating the consumer has timed out. We need to ensure the broker closes the consumer // in case it was indeed created, otherwise it might prevent new subscribe operation, diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 6f287aa2..58bd467d 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -50,6 +50,7 @@ class ExecutorService; class ConsumerImpl; class MessageCrypto; class GetLastMessageIdResponse; +struct ResponseData; typedef std::shared_ptr MessageCryptoPtr; typedef std::shared_ptr BackoffPtr; typedef std::function ProcessDLQCallBack; @@ -113,6 +114,7 @@ class ConsumerImpl : public ConsumerImplBase { Future getConsumerCreatedFuture() override; const std::string& getSubscriptionName() const override; const std::string& getTopic() const override; + std::string getLastErrorMessage() const override; Result receive(Message& msg) override; Result receive(Message& msg, int timeout) override; void receiveAsync(const ReceiveCallback& callback) override; @@ -174,6 +176,8 @@ class ConsumerImpl : public ConsumerImplBase { void notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) override; Result handleCreateConsumer(const ClientConnectionPtr& cnx, Result result); + Result handleCreateConsumer(const ClientConnectionPtr& cnx, Result result, + const ResponseData& responseData); void internalListener(); @@ -250,6 +254,7 @@ class ConsumerImpl : public ConsumerImplBase { const int receiverQueueRefillThreshold_; const uint64_t consumerId_; const std::string consumerStr_; + std::string lastErrorMessage_; int32_t partitionIndex_ = -1; Promise consumerCreatedPromise_; std::atomic_bool messageListenerRunning_; diff --git a/lib/ConsumerImplBase.h b/lib/ConsumerImplBase.h index ffc0e3cb..6b7e2aff 100644 --- a/lib/ConsumerImplBase.h +++ b/lib/ConsumerImplBase.h @@ -53,6 +53,7 @@ class ConsumerImplBase : public HandlerBase { virtual Future getConsumerCreatedFuture() = 0; virtual const std::string& getTopic() const = 0; virtual const std::string& getSubscriptionName() const = 0; + virtual std::string getLastErrorMessage() const = 0; virtual Result receive(Message& msg) = 0; virtual Result receive(Message& msg, int timeout) = 0; virtual void receiveAsync(const ReceiveCallback& callback) = 0; diff --git a/lib/Future.h b/lib/Future.h index 3b2b5246..1ca5cae8 100644 --- a/lib/Future.h +++ b/lib/Future.h @@ -137,6 +137,8 @@ class Promise { bool setFailed(Result result) const { return state_->complete(result, {}); } + bool setFailed(Result result, const Type &value) const { return state_->complete(result, value); } + bool setSuccess() const { return setValue({}); } bool isComplete() const { return state_->completed(); } diff --git a/lib/MockServer.h b/lib/MockServer.h index 6f8d1390..63a11cc9 100644 --- a/lib/MockServer.h +++ b/lib/MockServer.h @@ -36,6 +36,10 @@ namespace pulsar { class MockServer : public std::enable_shared_from_this { public: using RequestDelayType = std::unordered_map; + struct RequestError { + proto::ServerError error; + std::string message; + }; MockServer(const ClientConnectionPtr& connection) : connection_(connection) { requestDelays_["CLOSE_CONSUMER"] = 1; @@ -48,6 +52,11 @@ class MockServer : public std::enable_shared_from_this { } } + void setRequestError(const std::string& request, proto::ServerError error, const std::string& message) { + std::lock_guard lock(mutex_); + requestErrors_[request] = RequestError{error, message}; + } + bool sendRequest(const std::string& request, uint64_t requestId) { auto connection = connection_.lock(); if (!connection) { @@ -75,9 +84,23 @@ class MockServer : public std::enable_shared_from_this { } }); } + bool shouldFail = false; + proto::ServerError error = proto::UnknownError; + std::string message; + if (auto errorIter = requestErrors_.find(request); errorIter != requestErrors_.end()) { + shouldFail = true; + error = errorIter->second.error; + message = errorIter->second.message; + } schedule(connection, request + std::to_string(requestId), iter->second, - [connection, request, requestId] { - if (request == "CONSUMER_STATS") { + [connection, request, requestId, shouldFail, error, message] { + if (shouldFail) { + proto::CommandError response; + response.set_request_id(requestId); + response.set_error(error); + response.set_message(message); + connection->handleError(response); + } else if (request == "CONSUMER_STATS") { proto::CommandConsumerStatsResponse response; response.set_request_id(requestId); connection->handleConsumerStatsResponse(response); @@ -132,6 +155,7 @@ class MockServer : public std::enable_shared_from_this { private: mutable std::mutex mutex_; std::unordered_map requestDelays_; + std::unordered_map requestErrors_; std::unordered_map pendingTimers_; ClientConnectionWeakPtr connection_; diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index a5699781..04d6cf08 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -255,6 +255,10 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions( subscriptionMode_, startMessageId_); } catch (const std::runtime_error& e) { LOG_ERROR("Failed to create ConsumerImpl for " << topicName->toString() << ": " << e.what()); + { + Lock lock(mutex_); + lastErrorMessage_ = e.what(); + } topicSubResultPromise->setFailed(ResultConnectError); return; } @@ -276,6 +280,10 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions( subscriptionMode_, startMessageId_); } catch (const std::runtime_error& e) { LOG_ERROR("Failed to create ConsumerImpl for " << topicPartitionName << ": " << e.what()); + { + Lock lock(mutex_); + lastErrorMessage_ = e.what(); + } topicSubResultPromise->setFailed(ResultConnectError); return; } @@ -310,6 +318,11 @@ void MultiTopicsConsumerImpl::handleSingleConsumerCreated( assert(previous > 0); if (result != ResultOk) { + if (auto consumer = consumerImplBaseWeakPtr.lock()) { + auto lastErrorMessage = consumer->getLastErrorMessage(); + Lock lock(mutex_); + lastErrorMessage_ = lastErrorMessage; + } topicSubResultPromise->setFailed(result); LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << result); return; @@ -763,6 +776,11 @@ const std::string& MultiTopicsConsumerImpl::getSubscriptionName() const { return const std::string& MultiTopicsConsumerImpl::getTopic() const { return topic(); } +std::string MultiTopicsConsumerImpl::getLastErrorMessage() const { + Lock lock(mutex_); + return lastErrorMessage_; +} + const std::string& MultiTopicsConsumerImpl::getName() const { return consumerStr_; } void MultiTopicsConsumerImpl::shutdown() { internalShutdown(); } diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h index 38a44cdf..b49eaed8 100644 --- a/lib/MultiTopicsConsumerImpl.h +++ b/lib/MultiTopicsConsumerImpl.h @@ -67,6 +67,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { Future getConsumerCreatedFuture() override; const std::string& getSubscriptionName() const override; const std::string& getTopic() const override; + std::string getLastErrorMessage() const override; Result receive(Message& msg) override; Result receive(Message& msg, int timeout) override; void receiveAsync(const ReceiveCallback& callback) override; @@ -117,6 +118,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { TimeDuration partitionsUpdateInterval_; std::shared_ptr> numberTopicPartitions_; std::atomic failedResult{ResultOk}; + std::string lastErrorMessage_; Promise multiTopicsConsumerCreatedPromise_; UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_; const std::vector topics_; diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc index 1aa5c87b..dbb170b8 100644 --- a/lib/PartitionedProducerImpl.cc +++ b/lib/PartitionedProducerImpl.cc @@ -81,6 +81,11 @@ PartitionedProducerImpl::~PartitionedProducerImpl() { internalShutdown(); } // override const std::string& PartitionedProducerImpl::getTopic() const { return topic_; } +std::string PartitionedProducerImpl::getLastErrorMessage() const { + Lock lock(producersMutex_); + return lastErrorMessage_; +} + unsigned int PartitionedProducerImpl::getNumPartitions() const { return static_cast(topicMetadata_->getNumPartitions()); } @@ -163,6 +168,11 @@ void PartitionedProducerImpl::handleSinglePartitionProducerCreated( if (result != ResultOk) { LOG_ERROR("Unable to create Producer for partition - " << partitionIndex << " Error - " << result); + if (auto producer = producerWeakPtr.lock()) { + const auto lastErrorMessage = producer->getLastErrorMessage(); + Lock lock(producersMutex_); + lastErrorMessage_ = lastErrorMessage; + } partitionedProducerCreatedPromise_.setFailed(result); state_ = Failed; if (++numProducersCreated_ == numPartitions) { diff --git a/lib/PartitionedProducerImpl.h b/lib/PartitionedProducerImpl.h index 94ba7179..6e1bb9d9 100644 --- a/lib/PartitionedProducerImpl.h +++ b/lib/PartitionedProducerImpl.h @@ -78,6 +78,7 @@ class PartitionedProducerImpl : public ProducerImplBase, void internalShutdown(); bool isClosed() override; const std::string& getTopic() const override; + std::string getLastErrorMessage() const override; Future getProducerCreatedFuture() override; void triggerFlush() override; void flushAsync(FlushCallback callback) override; @@ -101,6 +102,7 @@ class PartitionedProducerImpl : public ProducerImplBase, const TopicNamePtr topicName_; const std::string topic_; + std::string lastErrorMessage_; std::atomic_uint numProducersCreated_{0}; diff --git a/lib/PendingRequest.h b/lib/PendingRequest.h index 465073f6..8a9f9633 100644 --- a/lib/PendingRequest.h +++ b/lib/PendingRequest.h @@ -57,6 +57,11 @@ class PendingRequest : public std::enable_shared_from_this> { cancelTimer(timer_); } + void fail(Result result, const T& value) { + promise_.setFailed(result, value); + cancelTimer(timer_); + } + void disableTimeout() { timeoutDisabled_.store(true, std::memory_order_release); } auto getFuture() const { return promise_.getFuture(); } diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 7632581e..6222e611 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -127,6 +127,11 @@ const std::string& ProducerImpl::getTopic() const { return topic(); } const std::string& ProducerImpl::getProducerName() const { return producerName_; } +std::string ProducerImpl::getLastErrorMessage() const { + Lock lock(mutex_); + return lastErrorMessage_; +} + int64_t ProducerImpl::getLastSequenceId() const { return lastSequenceIdPublished_; } const std::string& ProducerImpl::getSchemaVersion() const { return schemaVersion_; } @@ -261,6 +266,7 @@ Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result } else { // Producer creation failed + lastErrorMessage_ = responseData.errorMessage; if (result == ResultTimeout) { // Creating the producer has timed out. We need to ensure the broker closes the producer // in case it was indeed created, otherwise it might prevent new create producer operation, diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h index 26207f80..78bfa15f 100644 --- a/lib/ProducerImpl.h +++ b/lib/ProducerImpl.h @@ -87,6 +87,7 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase { void internalShutdown(); bool isClosed() override; const std::string& getTopic() const override; + std::string getLastErrorMessage() const override; Future getProducerCreatedFuture() override; void triggerFlush() override; void flushAsync(FlushCallback callback) override; @@ -180,6 +181,7 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase { std::string producerName_; bool userProvidedProducerName_; std::string producerStr_; + std::string lastErrorMessage_; uint64_t producerId_; std::unique_ptr batchMessageContainer_; diff --git a/lib/ProducerImplBase.h b/lib/ProducerImplBase.h index 25a12c8a..f5ee5b81 100644 --- a/lib/ProducerImplBase.h +++ b/lib/ProducerImplBase.h @@ -44,6 +44,7 @@ class ProducerImplBase { virtual bool isClosed() = 0; virtual void shutdown() = 0; virtual const std::string& getTopic() const = 0; + virtual std::string getLastErrorMessage() const = 0; virtual Future getProducerCreatedFuture() = 0; virtual void triggerFlush() = 0; virtual void flushAsync(FlushCallback callback) = 0; diff --git a/lib/ReaderImpl.cc b/lib/ReaderImpl.cc index 754137c5..a1f1a5b9 100644 --- a/lib/ReaderImpl.cc +++ b/lib/ReaderImpl.cc @@ -117,6 +117,10 @@ void ReaderImpl::start(const MessageId& startMessageId, const std::string& ReaderImpl::getTopic() const { return consumer_->getTopic(); } +std::string ReaderImpl::getLastErrorMessage() const { + return consumer_ ? consumer_->getLastErrorMessage() : ""; +} + Result ReaderImpl::readNext(Message& msg) { Result res = consumer_->receive(msg); acknowledgeIfNecessary(res, msg); diff --git a/lib/ReaderImpl.h b/lib/ReaderImpl.h index 020a5037..2c5ed934 100644 --- a/lib/ReaderImpl.h +++ b/lib/ReaderImpl.h @@ -66,6 +66,7 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this const std::function& callback); const std::string& getTopic() const; + std::string getLastErrorMessage() const; Result readNext(Message& msg); Result readNext(Message& msg, int timeoutMs); diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc index 63ac9d16..807b1c7f 100644 --- a/tests/ClientTest.cc +++ b/tests/ClientTest.cc @@ -328,6 +328,68 @@ TEST(ClientTest, testTimedOutPendingRequestsAreErasedFromConnectionMaps) { executorProvider->close(); } +TEST(ClientTest, testRequestErrorMessageIsReturnedInResponseData) { + ClientConfiguration conf; + + auto executorProvider = std::make_shared(1); + AtomicSharedPtr serviceInfo; + serviceInfo.store(std::make_shared(lookupUrl)); + ConnectionPool pool(serviceInfo, conf, executorProvider, ""); + auto connection = std::make_shared(lookupUrl, lookupUrl, *serviceInfo.load(), + executorProvider->get(), conf, "", pool, 0); + PulsarFriend::setServerProtocolVersion(*connection, 8); + + auto mockServer = std::make_shared(connection); + connection->attachMockServer(mockServer); + + const std::string errorMessage = "bad token"; + mockServer->setRequestDelay({{"PRODUCER", 1}}); + mockServer->setRequestError("PRODUCER", proto::AuthenticationError, errorMessage); + + auto future = connection->sendRequestWithId(Commands::newPing(), 0, "PRODUCER"); + + ResponseData responseData; + ASSERT_EQ(ResultAuthenticationError, future.get(responseData)); + ASSERT_EQ(errorMessage, responseData.errorMessage); + ASSERT_EQ(0u, PulsarFriend::getPendingRequests(*connection)); + ASSERT_EQ(0u, mockServer->close()); + + connection->close(ResultDisconnected).wait(); + executorProvider->close(); +} + +TEST(ClientTest, testCreateProducerV2ReturnsError) { + Client client(lookupUrl); + + auto result = client.createProducerV2("persistent://prop//unit/ns1/testCreateProducerV2ReturnsError"); + auto error = std::get_if(&result); + + ASSERT_NE(nullptr, error); + ASSERT_EQ(ResultInvalidTopicName, error->result); +} + +TEST(ClientTest, testSubscribeV2ReturnsError) { + Client client(lookupUrl); + + auto result = client.subscribeV2("persistent://prop//unit/ns1/testSubscribeV2ReturnsError", "sub"); + auto error = std::get_if(&result); + + ASSERT_NE(nullptr, error); + ASSERT_EQ(ResultInvalidTopicName, error->result); +} + +TEST(ClientTest, testCreateReaderV2ReturnsError) { + Client client(lookupUrl); + ReaderConfiguration conf; + + auto result = client.createReaderV2("persistent://prop//unit/ns1/testCreateReaderV2ReturnsError", + MessageId::earliest(), conf); + auto error = std::get_if(&result); + + ASSERT_NE(nullptr, error); + ASSERT_EQ(ResultInvalidTopicName, error->result); +} + TEST(ClientTest, testGetNumberOfReferences) { Client client("pulsar://localhost:6650"); From fc2ee3dc6d61d5012e5caae1524951dac07fe156 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 21 May 2026 10:21:38 +0800 Subject: [PATCH 2/4] fix clang-tidy error --- lib/Client.cc | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/lib/Client.cc b/lib/Client.cc index 95b71c49..dce18ce2 100644 --- a/lib/Client.cc +++ b/lib/Client.cc @@ -70,8 +70,7 @@ std::variant Client::createProducerV2(const std::string& topic) std::variant Client::createProducerV2(const std::string& topic, const ProducerConfiguration& conf) { Promise > promise; - createProducerAsyncV2(topic, conf, - [promise](std::variant result) { promise.setValue(result); }); + createProducerAsyncV2(topic, conf, [promise](const auto& result) { promise.setValue(result); }); Future > future = promise.getFuture(); std::variant result; @@ -116,7 +115,7 @@ std::variant Client::subscribeV2(const std::string& topic, const ConsumerConfiguration& conf) { Promise > promise; subscribeAsyncV2(topic, subscriptionName, conf, - [promise](std::variant result) { promise.setValue(result); }); + [promise](const auto& result) { promise.setValue(result); }); Future > future = promise.getFuture(); std::variant result; @@ -170,7 +169,7 @@ std::variant Client::subscribeV2(const std::vector const ConsumerConfiguration& conf) { Promise > promise; subscribeAsyncV2(topics, subscriptionName, conf, - [promise](std::variant result) { promise.setValue(result); }); + [promise](const auto& result) { promise.setValue(result); }); Future > future = promise.getFuture(); std::variant result; @@ -235,7 +234,7 @@ std::variant Client::createReaderV2(const std::string& topic, con const ReaderConfiguration& conf) { Promise > promise; createReaderAsyncV2(topic, startMessageId, conf, - [promise](std::variant result) { promise.setValue(result); }); + [promise](const auto& result) { promise.setValue(result); }); Future > future = promise.getFuture(); std::variant result; From ac168b8203ad168b95f35e9d7b93653e5f4f333c Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 21 May 2026 11:24:23 +0800 Subject: [PATCH 3/4] remove unused methods --- lib/ClientImpl.cc | 113 ---------------------------------------------- lib/ClientImpl.h | 12 ----- 2 files changed, 125 deletions(-) diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index a51dcd87..f1e41b4b 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -241,37 +241,6 @@ void ClientImpl::createProducerAsyncV2(const std::string& topic, const ProducerC } } -void ClientImpl::handleCreateProducer(Result result, const LookupDataResultPtr& partitionMetadata, - const TopicNamePtr& topicName, const ProducerConfiguration& conf, - const CreateProducerCallback& callback) { - if (!result) { - ProducerImplBasePtr producer; - - auto interceptors = std::make_shared(conf.getInterceptors()); - - try { - if (partitionMetadata->getPartitions() > 0) { - producer = std::make_shared( - shared_from_this(), topicName, partitionMetadata->getPartitions(), conf, interceptors); - } else { - producer = std::make_shared(shared_from_this(), *topicName, conf, interceptors); - } - } catch (const std::runtime_error& e) { - LOG_ERROR("Failed to create producer: " << e.what()); - callback(ResultConnectError, {}); - return; - } - producer->getProducerCreatedFuture().addListener( - std::bind(&ClientImpl::handleProducerCreated, shared_from_this(), std::placeholders::_1, - std::placeholders::_2, callback, producer)); - producer->start(); - } else { - LOG_ERROR("Error Checking/Getting Partition Metadata while creating producer on " - << topicName->toString() << " -- " << result); - callback(result, Producer()); - } -} - void ClientImpl::handleCreateProducerV2(Result result, const LookupDataResultPtr& partitionMetadata, const TopicNamePtr& topicName, const ProducerConfiguration& conf, const CreateProducerCallbackV2& callback) { @@ -401,44 +370,6 @@ void ClientImpl::createTableViewAsync(const std::string& topic, const TableViewC }); } -void ClientImpl::handleReaderMetadataLookup(Result result, const LookupDataResultPtr& partitionMetadata, - const TopicNamePtr& topicName, const MessageId& startMessageId, - const ReaderConfiguration& conf, const ReaderCallback& callback) { - if (result != ResultOk) { - LOG_ERROR("Error Checking/Getting Partition Metadata while creating readeron " - << topicName->toString() << " -- " << result); - callback(result, Reader()); - return; - } - - ReaderImplPtr reader; - try { - reader.reset(new ReaderImpl(shared_from_this(), topicName->toString(), - partitionMetadata->getPartitions(), conf, - getListenerExecutorProvider()->get(), callback)); - } catch (const std::runtime_error& e) { - LOG_ERROR("Failed to create reader: " << e.what()); - callback(ResultConnectError, {}); - return; - } - ConsumerImplBasePtr consumer = reader->getConsumer(); - auto self = shared_from_this(); - reader->start(startMessageId, [this, self](const ConsumerImplBaseWeakPtr& weakConsumerPtr) { - auto consumer = weakConsumerPtr.lock(); - if (consumer) { - auto address = consumer.get(); - auto existingConsumer = consumers_.putIfAbsent(address, consumer); - if (existingConsumer) { - consumer = existingConsumer.value().lock(); - LOG_ERROR("Unexpected existing consumer at the same address: " - << address << ", consumer: " << (consumer ? consumer->getName() : "(null)")); - } - } else { - LOG_ERROR("Unexpected case: the consumer is somehow expired"); - } - }); -} - void ClientImpl::handleReaderMetadataLookupV2(Result result, const LookupDataResultPtr& partitionMetadata, const TopicNamePtr& topicName, const MessageId& startMessageId, const ReaderConfiguration& conf, @@ -661,50 +592,6 @@ void ClientImpl::subscribeAsyncV2(const std::string& topic, const std::string& s std::placeholders::_2, topicName, subscriptionName, conf, callback)); } -void ClientImpl::handleSubscribe(Result result, const LookupDataResultPtr& partitionMetadata, - const TopicNamePtr& topicName, const std::string& subscriptionName, - ConsumerConfiguration conf, const SubscribeCallback& callback) { - if (result == ResultOk) { - // generate random name if not supplied by the customer. - if (conf.getConsumerName().empty()) { - conf.setConsumerName(generateRandomName()); - } - ConsumerImplBasePtr consumer; - auto interceptors = std::make_shared(conf.getInterceptors()); - - try { - if (partitionMetadata->getPartitions() > 0) { - if (conf.getReceiverQueueSize() == 0) { - LOG_ERROR("Can't use partitioned topic if the queue size is 0."); - callback(ResultInvalidConfiguration, Consumer()); - return; - } - consumer = std::make_shared(shared_from_this(), topicName, - partitionMetadata->getPartitions(), - subscriptionName, conf, interceptors); - } else { - auto consumerImpl = std::make_shared(shared_from_this(), topicName->toString(), - subscriptionName, conf, - topicName->isPersistent(), interceptors); - consumerImpl->setPartitionIndex(topicName->getPartitionIndex()); - consumer = consumerImpl; - } - } catch (const std::runtime_error& e) { - LOG_ERROR("Failed to create consumer: " << e.what()); - callback(ResultConnectError, {}); - return; - } - consumer->getConsumerCreatedFuture().addListener( - std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1, - std::placeholders::_2, callback, consumer)); - consumer->start(); - } else { - LOG_ERROR("Error Checking/Getting Partition Metadata while Subscribing on " << topicName->toString() - << " -- " << result); - callback(result, Consumer()); - } -} - void ClientImpl::handleSubscribeV2(Result result, const LookupDataResultPtr& partitionMetadata, const TopicNamePtr& topicName, const std::string& subscriptionName, ConsumerConfiguration conf, const SubscribeCallbackV2& callback) { diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index a96d510d..c0cb1eb8 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -184,26 +184,14 @@ class ClientImpl : public std::enable_shared_from_this { friend class PulsarFriend; private: - void handleCreateProducer(Result result, const LookupDataResultPtr& partitionMetadata, - const TopicNamePtr& topicName, const ProducerConfiguration& conf, - const CreateProducerCallback& callback); - void handleCreateProducerV2(Result result, const LookupDataResultPtr& partitionMetadata, const TopicNamePtr& topicName, const ProducerConfiguration& conf, const CreateProducerCallbackV2& callback); - void handleSubscribe(Result result, const LookupDataResultPtr& partitionMetadata, - const TopicNamePtr& topicName, const std::string& consumerName, - ConsumerConfiguration conf, const SubscribeCallback& callback); - void handleSubscribeV2(Result result, const LookupDataResultPtr& partitionMetadata, const TopicNamePtr& topicName, const std::string& consumerName, ConsumerConfiguration conf, const SubscribeCallbackV2& callback); - void handleReaderMetadataLookup(Result result, const LookupDataResultPtr& partitionMetadata, - const TopicNamePtr& topicName, const MessageId& startMessageId, - const ReaderConfiguration& conf, const ReaderCallback& callback); - void handleReaderMetadataLookupV2(Result result, const LookupDataResultPtr& partitionMetadata, const TopicNamePtr& topicName, const MessageId& startMessageId, const ReaderConfiguration& conf, const ReaderCallbackV2& callback); From 3b54f6bdabbf26b21484e5d529b4c7f0b4e1a69f Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 21 May 2026 13:55:21 +0800 Subject: [PATCH 4/4] add tests to verify message is correctly set --- include/pulsar/Result.h | 9 +++- lib/BinaryProtoLookupService.cc | 27 ++++++----- lib/BinaryProtoLookupService.h | 14 +++--- lib/Client.cc | 19 +++++--- lib/ClientConnection.cc | 81 ++++++++++++++++++++++----------- lib/ClientConnection.h | 26 +++++------ lib/ClientImpl.cc | 25 +++++----- lib/ClientImpl.h | 6 +-- lib/HTTPLookupService.cc | 18 ++++---- lib/HTTPLookupService.h | 12 ++--- lib/LookupDataResult.h | 2 +- lib/LookupService.h | 12 ++--- lib/MockServer.h | 28 +++++++++++- lib/MultiTopicsConsumerImpl.cc | 7 ++- lib/PendingRequest.h | 14 +++--- lib/RetryableLookupService.h | 22 ++++----- lib/RetryableOperation.h | 26 +++++------ lib/RetryableOperationCache.h | 24 +++++----- tests/ClientTest.cc | 78 +++++++++++++++++++++++++++++++ tests/LookupServiceTest.cc | 12 ++--- 20 files changed, 304 insertions(+), 158 deletions(-) diff --git a/include/pulsar/Result.h b/include/pulsar/Result.h index 7b1bb1a6..51ae8746 100644 --- a/include/pulsar/Result.h +++ b/include/pulsar/Result.h @@ -24,6 +24,7 @@ #include #include #include +#include namespace pulsar { @@ -104,7 +105,13 @@ PULSAR_PUBLIC const char* strResult(Result result); PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, pulsar::Result result); struct Error { - Result result; + Error() = default; + Error(Result result) : result(result) {} + Error(Result result, std::string message) : result(result), message(std::move(message)) {} + + operator Result() const { return result; } + + Result result = ResultOk; std::string message; }; diff --git a/lib/BinaryProtoLookupService.cc b/lib/BinaryProtoLookupService.cc index a110e965..7f127126 100644 --- a/lib/BinaryProtoLookupService.cc +++ b/lib/BinaryProtoLookupService.cc @@ -37,7 +37,7 @@ auto BinaryProtoLookupService::findBroker(const std::string& address, bool autho -> LookupResultFuture { LOG_DEBUG("find broker from " << address << ", authoritative: " << authoritative << ", topic: " << topic << ", redirect count: " << redirectCount); - auto promise = std::make_shared>(); + auto promise = std::make_shared>(); if (maxLookupRedirects_ > 0 && redirectCount > maxLookupRedirects_) { LOG_ERROR("Too many lookup request redirects on topic " << topic << ", configured limit is " << maxLookupRedirects_); @@ -62,7 +62,7 @@ auto BinaryProtoLookupService::findBroker(const std::string& address, bool autho auto lookupPromise = std::make_shared(); cnx->newTopicLookup(topic, authoritative, listenerName_, newRequestId(), lookupPromise); lookupPromise->getFuture().addListener([this, cnx, promise, topic, address, redirectCount]( - Result result, const LookupDataResultPtr& data) { + Error result, const LookupDataResultPtr& data) { if (result != ResultOk || !data) { LOG_ERROR("Lookup failed for " << topic << ", result " << result); promise->setFailed(result); @@ -74,7 +74,7 @@ auto BinaryProtoLookupService::findBroker(const std::string& address, bool autho if (data->isRedirect()) { LOG_DEBUG("Lookup request is for " << topic << " redirected to " << responseBrokerAddress); findBroker(responseBrokerAddress, data->isAuthoritative(), topic, redirectCount + 1) - .addListener([promise](Result result, const LookupResult& value) { + .addListener([promise](Error result, const LookupResult& value) { if (result == ResultOk) { promise->setValue(value); } else { @@ -100,7 +100,7 @@ auto BinaryProtoLookupService::findBroker(const std::string& address, bool autho * @param topicName topic to get number of partitions. * */ -Future BinaryProtoLookupService::getPartitionMetadataAsync( +Future BinaryProtoLookupService::getPartitionMetadataAsync( const TopicNamePtr& topicName) { LookupDataResultPromisePtr promise = std::make_shared(); if (!topicName) { @@ -135,7 +135,7 @@ void BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const std::str std::placeholders::_2, clientCnx, promise)); } -void BinaryProtoLookupService::handlePartitionMetadataLookup(const std::string& topicName, Result result, +void BinaryProtoLookupService::handlePartitionMetadataLookup(const std::string& topicName, Error result, const LookupDataResultPtr& data, const ClientConnectionWeakPtr& clientCnx, const LookupDataResultPromisePtr& promise) { @@ -154,9 +154,9 @@ uint64_t BinaryProtoLookupService::newRequestId() { return ++requestIdGenerator_; } -Future BinaryProtoLookupService::getTopicsOfNamespaceAsync( +Future BinaryProtoLookupService::getTopicsOfNamespaceAsync( const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) { - NamespaceTopicsPromisePtr promise = std::make_shared>(); + NamespaceTopicsPromisePtr promise = std::make_shared>(); if (!nsName) { promise->setFailed(ResultInvalidTopicName); return promise->getFuture(); @@ -168,9 +168,9 @@ Future BinaryProtoLookupService::getTopicsOfNamespac return promise->getFuture(); } -Future BinaryProtoLookupService::getSchema(const TopicNamePtr& topicName, - const std::string& version) { - GetSchemaPromisePtr promise = std::make_shared>(); +Future BinaryProtoLookupService::getSchema(const TopicNamePtr& topicName, + const std::string& version) { + GetSchemaPromisePtr promise = std::make_shared>(); if (!topicName) { promise->setFailed(ResultInvalidTopicName); @@ -197,7 +197,7 @@ void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& topicName << " version: " << version); conn->newGetSchema(topicName, version, requestId) - .addListener([promise](Result result, const SchemaInfo& schemaInfo) { + .addListener([promise](Error result, const SchemaInfo& schemaInfo) { if (result != ResultOk) { promise->setFailed(result); return; @@ -228,11 +228,10 @@ void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string std::placeholders::_1, std::placeholders::_2, promise)); } -void BinaryProtoLookupService::getTopicsOfNamespaceListener(Result result, - const NamespaceTopicsPtr& topicsPtr, +void BinaryProtoLookupService::getTopicsOfNamespaceListener(Error result, const NamespaceTopicsPtr& topicsPtr, const NamespaceTopicsPromisePtr& promise) { if (result != ResultOk) { - promise->setFailed(ResultLookupError); + promise->setFailed(Error{ResultLookupError, result.message}); return; } diff --git a/lib/BinaryProtoLookupService.h b/lib/BinaryProtoLookupService.h index 35dcb163..9ee7e4ae 100644 --- a/lib/BinaryProtoLookupService.h +++ b/lib/BinaryProtoLookupService.h @@ -34,8 +34,8 @@ using ClientConnectionWeakPtr = std::weak_ptr; class ConnectionPool; class LookupDataResult; class ServiceNameResolver; -using NamespaceTopicsPromisePtr = std::shared_ptr>; -using GetSchemaPromisePtr = std::shared_ptr>; +using NamespaceTopicsPromisePtr = std::shared_ptr>; +using GetSchemaPromisePtr = std::shared_ptr>; class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { public: @@ -48,12 +48,12 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { LookupResultFuture getBroker(const TopicName& topicName) override; - Future getPartitionMetadataAsync(const TopicNamePtr& topicName) override; + Future getPartitionMetadataAsync(const TopicNamePtr& topicName) override; - Future getTopicsOfNamespaceAsync( + Future getTopicsOfNamespaceAsync( const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override; - Future getSchema(const TopicNamePtr& topicName, const std::string& version) override; + Future getSchema(const TopicNamePtr& topicName, const std::string& version) override; ServiceNameResolver& getServiceNameResolver() override { return serviceNameResolver_; } @@ -75,7 +75,7 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { const ClientConnectionWeakPtr& clientCnx, const LookupDataResultPromisePtr& promise); - void handlePartitionMetadataLookup(const std::string& topicName, Result result, + void handlePartitionMetadataLookup(const std::string& topicName, Error result, const LookupDataResultPtr& data, const ClientConnectionWeakPtr& clientCnx, const LookupDataResultPromisePtr& promise); @@ -87,7 +87,7 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { void sendGetSchemaRequest(const std::string& topicName, const std::string& version, Result result, const ClientConnectionWeakPtr& clientCnx, const GetSchemaPromisePtr& promise); - void getTopicsOfNamespaceListener(Result result, const NamespaceTopicsPtr& topicsPtr, + void getTopicsOfNamespaceListener(Error result, const NamespaceTopicsPtr& topicsPtr, const NamespaceTopicsPromisePtr& promise); uint64_t newRequestId(); diff --git a/lib/Client.cc b/lib/Client.cc index dce18ce2..e0d6d5bc 100644 --- a/lib/Client.cc +++ b/lib/Client.cc @@ -70,7 +70,9 @@ std::variant Client::createProducerV2(const std::string& topic) std::variant Client::createProducerV2(const std::string& topic, const ProducerConfiguration& conf) { Promise > promise; - createProducerAsyncV2(topic, conf, [promise](const auto& result) { promise.setValue(result); }); + createProducerAsyncV2(topic, conf, [promise](std::variant&& result) { + promise.setValue(std::move(result)); + }); Future > future = promise.getFuture(); std::variant result; @@ -114,8 +116,9 @@ std::variant Client::subscribeV2(const std::string& topic, const std::string& subscriptionName, const ConsumerConfiguration& conf) { Promise > promise; - subscribeAsyncV2(topic, subscriptionName, conf, - [promise](const auto& result) { promise.setValue(result); }); + subscribeAsyncV2(topic, subscriptionName, conf, [promise](std::variant&& result) { + promise.setValue(std::move(result)); + }); Future > future = promise.getFuture(); std::variant result; @@ -168,8 +171,9 @@ std::variant Client::subscribeV2(const std::vector const std::string& subscriptionName, const ConsumerConfiguration& conf) { Promise > promise; - subscribeAsyncV2(topics, subscriptionName, conf, - [promise](const auto& result) { promise.setValue(result); }); + subscribeAsyncV2(topics, subscriptionName, conf, [promise](std::variant&& result) { + promise.setValue(std::move(result)); + }); Future > future = promise.getFuture(); std::variant result; @@ -233,8 +237,9 @@ Result Client::createReader(const std::string& topic, const MessageId& startMess std::variant Client::createReaderV2(const std::string& topic, const MessageId& startMessageId, const ReaderConfiguration& conf) { Promise > promise; - createReaderAsyncV2(topic, startMessageId, conf, - [promise](const auto& result) { promise.setValue(result); }); + createReaderAsyncV2(topic, startMessageId, conf, [promise](std::variant&& result) { + promise.setValue(std::move(result)); + }); Future > future = promise.getFuture(); std::variant result; diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 620ba932..36eb9b27 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -1048,7 +1048,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, co self->numOfPendingLookupRequest_--; } }); - request->getFuture().addListener([promise](Result result, const LookupDataResultPtr& lookupDataResult) { + request->getFuture().addListener([promise](Error result, const LookupDataResultPtr& lookupDataResult) { if (result == ResultOk) { promise->setValue(lookupDataResult); } else { @@ -1413,13 +1413,13 @@ Future ClientConnection::newGetLastMessageId(u return request->getFuture(); } -Future ClientConnection::newGetTopicsOfNamespace( +Future ClientConnection::newGetTopicsOfNamespace( const std::string& nsName, CommandGetTopicsOfNamespace_Mode mode, uint64_t requestId) { Lock lock(mutex_); if (isClosed()) { lock.unlock(); LOG_ERROR(cnxString() << "Client is not connected to the broker"); - Promise promise; + Promise promise; promise.setFailed(ResultNotConnected); return promise.getFuture(); } @@ -1437,14 +1437,14 @@ Future ClientConnection::newGetTopicsOfNamespace( return request->getFuture(); } -Future ClientConnection::newGetSchema(const std::string& topicName, - const std::string& version, uint64_t requestId) { +Future ClientConnection::newGetSchema(const std::string& topicName, + const std::string& version, uint64_t requestId) { Lock lock(mutex_); if (isClosed()) { lock.unlock(); LOG_ERROR(cnxString() << "Client is not connected to the broker"); - Promise promise; + Promise promise; promise.setFailed(ResultNotConnected); return promise.getFuture(); } @@ -1556,7 +1556,8 @@ void ClientConnection::handlePartitionedMetadataResponse( << " msg: " << partitionMetadataResponse.message()); checkServerError(partitionMetadataResponse.error(), partitionMetadataResponse.message()); request->fail( - getResult(partitionMetadataResponse.error(), partitionMetadataResponse.message())); + Error{getResult(partitionMetadataResponse.error(), partitionMetadataResponse.message()), + partitionMetadataResponse.message()}); } else { LOG_ERROR(cnxString() << "Failed partition-metadata lookup req_id: " << partitionMetadataResponse.request_id() << " with empty response: "); @@ -1628,7 +1629,8 @@ void ClientConnection::handleLookupTopicRespose( << " error: " << lookupTopicResponse.error() << " msg: " << lookupTopicResponse.message()); checkServerError(lookupTopicResponse.error(), lookupTopicResponse.message()); - request->fail(getResult(lookupTopicResponse.error(), lookupTopicResponse.message())); + request->fail(Error{getResult(lookupTopicResponse.error(), lookupTopicResponse.message()), + lookupTopicResponse.message()}); } else { LOG_ERROR(cnxString() << "Failed lookup req_id: " << lookupTopicResponse.request_id() << " with empty response: "); @@ -1699,6 +1701,7 @@ void ClientConnection::handleProducerSuccess(const proto::CommandProducerSuccess void ClientConnection::handleError(const proto::CommandError& error) { Result result = getResult(error.error(), error.message()); + Error errorResult{result, error.has_message() ? error.message() : ""}; LOG_WARN(cnxString() << "Received error response from server: " << result << (error.has_message() ? (" (" + error.message() + ")") : "") << " -- req_id: " << error.request_id()); @@ -1716,27 +1719,51 @@ void ClientConnection::handleError(const proto::CommandError& error) { data.errorMessage = error.message(); } request->fail(result, data); - } else { - auto it = pendingGetLastMessageIdRequests_.find(error.request_id()); - if (it != pendingGetLastMessageIdRequests_.end()) { - auto request = std::move(it->second); - pendingGetLastMessageIdRequests_.erase(it); - lock.unlock(); + return; + } - request->fail(result); - } else { - auto it = pendingGetNamespaceTopicsRequests_.find(error.request_id()); - if (it != pendingGetNamespaceTopicsRequests_.end()) { - auto request = std::move(it->second); - pendingGetNamespaceTopicsRequests_.erase(it); - lock.unlock(); + auto lookupIt = pendingLookupRequests_.find(error.request_id()); + if (lookupIt != pendingLookupRequests_.end()) { + auto request = std::move(lookupIt->second); + pendingLookupRequests_.erase(lookupIt); + numOfPendingLookupRequest_--; + lock.unlock(); - request->fail(result); - } else { - lock.unlock(); - } - } + request->fail(errorResult); + return; + } + + auto lastMessageIdIt = pendingGetLastMessageIdRequests_.find(error.request_id()); + if (lastMessageIdIt != pendingGetLastMessageIdRequests_.end()) { + auto request = std::move(lastMessageIdIt->second); + pendingGetLastMessageIdRequests_.erase(lastMessageIdIt); + lock.unlock(); + + request->fail(result); + return; + } + + auto topicsIt = pendingGetNamespaceTopicsRequests_.find(error.request_id()); + if (topicsIt != pendingGetNamespaceTopicsRequests_.end()) { + auto request = std::move(topicsIt->second); + pendingGetNamespaceTopicsRequests_.erase(topicsIt); + lock.unlock(); + + request->fail(errorResult); + return; } + + auto schemaIt = pendingGetSchemaRequests_.find(error.request_id()); + if (schemaIt != pendingGetSchemaRequests_.end()) { + auto request = std::move(schemaIt->second); + pendingGetSchemaRequests_.erase(schemaIt); + lock.unlock(); + + request->fail(errorResult); + return; + } + + lock.unlock(); } std::string ClientConnection::getMigratedBrokerServiceUrl( @@ -1959,7 +1986,7 @@ void ClientConnection::handleGetSchemaResponse(const proto::CommandGetSchemaResp : "") << " -- req_id: " << response.request_id()); } - request->fail(result); + request->fail(Error{result, response.has_error_message() ? response.error_message() : ""}); return; } diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index f17354c1..f7e07060 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -209,12 +209,12 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this newGetLastMessageId(uint64_t consumerId, uint64_t requestId); - Future newGetTopicsOfNamespace(const std::string& nsName, - CommandGetTopicsOfNamespace_Mode mode, - uint64_t requestId); + Future newGetTopicsOfNamespace(const std::string& nsName, + CommandGetTopicsOfNamespace_Mode mode, + uint64_t requestId); - Future newGetSchema(const std::string& topicName, const std::string& version, - uint64_t requestId); + Future newGetSchema(const std::string& topicName, const std::string& version, + uint64_t requestId); void attachMockServer(const std::shared_ptr& mockServer) { mockServer_ = mockServer; @@ -339,14 +339,14 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this - using RequestMap = std::unordered_map>; + template + using RequestMap = std::unordered_map>; RequestMap pendingRequests_; - RequestMap pendingLookupRequests_; + RequestMap pendingLookupRequests_; RequestMap pendingGetLastMessageIdRequests_; - RequestMap pendingGetNamespaceTopicsRequests_; - RequestMap pendingGetSchemaRequests_; + RequestMap pendingGetNamespaceTopicsRequests_; + RequestMap pendingGetSchemaRequests_; typedef std::unordered_map ProducersMap; ProducersMap producers_; @@ -361,9 +361,9 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this Lock; // Note: this method must be called when holding `mutex_` - template - auto insertRequest(RequestMap& pendingRequests, uint64_t requestId, OnTimeout onTimeout) { - auto request = std::make_shared>( + template + auto insertRequest(RequestMap& pendingRequests, uint64_t requestId, OnTimeout onTimeout) { + auto request = std::make_shared>( executor_->createTimer(operationsTimeout_), [this, self{shared_from_this()}, requestId, onTimeout{std::move(onTimeout)}, &pendingRequests]() mutable { diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index f1e41b4b..f0e1997d 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -191,7 +191,7 @@ void ClientImpl::createProducerAsync(const std::string& topic, const ProducerCon const CreateProducerCallback& callback, bool autoDownloadSchema) { createProducerAsyncV2( topic, conf, - [callback](std::variant result) { + [callback](std::variant&& result) { if (auto producer = std::get_if(&result)) { callback(ResultOk, *producer); } else { @@ -223,9 +223,9 @@ void ClientImpl::createProducerAsyncV2(const std::string& topic, const ProducerC if (autoDownloadSchema) { auto self = shared_from_this(); getSchema(topicName).addListener( - [self, topicName, callback](Result res, const SchemaInfo& topicSchema) { + [self, topicName, callback](const Error& res, const SchemaInfo& topicSchema) { if (res != ResultOk) { - callback(Error{res, ""}); + callback(Error{res.result, res.message}); return; } ProducerConfiguration conf; @@ -241,7 +241,7 @@ void ClientImpl::createProducerAsyncV2(const std::string& topic, const ProducerC } } -void ClientImpl::handleCreateProducerV2(Result result, const LookupDataResultPtr& partitionMetadata, +void ClientImpl::handleCreateProducerV2(const Error& result, const LookupDataResultPtr& partitionMetadata, const TopicNamePtr& topicName, const ProducerConfiguration& conf, const CreateProducerCallbackV2& callback) { if (!result) { @@ -268,7 +268,7 @@ void ClientImpl::handleCreateProducerV2(Result result, const LookupDataResultPtr } else { LOG_ERROR("Error Checking/Getting Partition Metadata while creating producer on " << topicName->toString() << " -- " << result); - callback(Error{result, ""}); + callback(Error{result.result, result.message}); } } @@ -312,7 +312,7 @@ void ClientImpl::handleProducerCreatedV2(Result result, const ProducerImplBaseWe void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& startMessageId, const ReaderConfiguration& conf, const ReaderCallback& callback) { - createReaderAsyncV2(topic, startMessageId, conf, [callback](std::variant result) { + createReaderAsyncV2(topic, startMessageId, conf, [callback](std::variant&& result) { if (auto reader = std::get_if(&result)) { callback(ResultOk, *reader); } else { @@ -370,14 +370,15 @@ void ClientImpl::createTableViewAsync(const std::string& topic, const TableViewC }); } -void ClientImpl::handleReaderMetadataLookupV2(Result result, const LookupDataResultPtr& partitionMetadata, +void ClientImpl::handleReaderMetadataLookupV2(const Error& result, + const LookupDataResultPtr& partitionMetadata, const TopicNamePtr& topicName, const MessageId& startMessageId, const ReaderConfiguration& conf, const ReaderCallbackV2& callback) { if (result != ResultOk) { LOG_ERROR("Error Checking/Getting Partition Metadata while creating readeron " << topicName->toString() << " -- " << result); - callback(Error{result, ""}); + callback(Error{result.result, result.message}); return; } @@ -502,7 +503,7 @@ void ClientImpl::subscribeAsync(const std::vector& originalTopics, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback) { subscribeAsyncV2(originalTopics, subscriptionName, conf, - [callback](std::variant result) { + [callback](std::variant&& result) { if (auto consumer = std::get_if(&result)) { callback(ResultOk, *consumer); } else { @@ -556,7 +557,7 @@ void ClientImpl::subscribeAsyncV2(const std::vector& originalTopics void ClientImpl::subscribeAsync(const std::string& topic, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback) { - subscribeAsyncV2(topic, subscriptionName, conf, [callback](std::variant result) { + subscribeAsyncV2(topic, subscriptionName, conf, [callback](std::variant&& result) { if (auto consumer = std::get_if(&result)) { callback(ResultOk, *consumer); } else { @@ -592,7 +593,7 @@ void ClientImpl::subscribeAsyncV2(const std::string& topic, const std::string& s std::placeholders::_2, topicName, subscriptionName, conf, callback)); } -void ClientImpl::handleSubscribeV2(Result result, const LookupDataResultPtr& partitionMetadata, +void ClientImpl::handleSubscribeV2(const Error& result, const LookupDataResultPtr& partitionMetadata, const TopicNamePtr& topicName, const std::string& subscriptionName, ConsumerConfiguration conf, const SubscribeCallbackV2& callback) { if (result == ResultOk) { @@ -632,7 +633,7 @@ void ClientImpl::handleSubscribeV2(Result result, const LookupDataResultPtr& par } else { LOG_ERROR("Error Checking/Getting Partition Metadata while Subscribing on " << topicName->toString() << " -- " << result); - callback(Error{result, ""}); + callback(Error{result.result, result.message}); } } diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index c0cb1eb8..680bd8f5 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -184,15 +184,15 @@ class ClientImpl : public std::enable_shared_from_this { friend class PulsarFriend; private: - void handleCreateProducerV2(Result result, const LookupDataResultPtr& partitionMetadata, + void handleCreateProducerV2(const Error& result, const LookupDataResultPtr& partitionMetadata, const TopicNamePtr& topicName, const ProducerConfiguration& conf, const CreateProducerCallbackV2& callback); - void handleSubscribeV2(Result result, const LookupDataResultPtr& partitionMetadata, + void handleSubscribeV2(const Error& result, const LookupDataResultPtr& partitionMetadata, const TopicNamePtr& topicName, const std::string& consumerName, ConsumerConfiguration conf, const SubscribeCallbackV2& callback); - void handleReaderMetadataLookupV2(Result result, const LookupDataResultPtr& partitionMetadata, + void handleReaderMetadataLookupV2(const Error& result, const LookupDataResultPtr& partitionMetadata, const TopicNamePtr& topicName, const MessageId& startMessageId, const ReaderConfiguration& conf, const ReaderCallbackV2& callback); diff --git a/lib/HTTPLookupService.cc b/lib/HTTPLookupService.cc index 0be9713c..8e436521 100644 --- a/lib/HTTPLookupService.cc +++ b/lib/HTTPLookupService.cc @@ -93,7 +93,7 @@ auto HTTPLookupService::getBroker(const TopicName &topicName) -> LookupResultFut return promise.getFuture(); } -Future HTTPLookupService::getPartitionMetadataAsync( +Future HTTPLookupService::getPartitionMetadataAsync( const TopicNamePtr &topicName) { LookupPromise promise; std::stringstream completeUrlStream; @@ -116,7 +116,7 @@ Future HTTPLookupService::getPartitionMetadataAsync return promise.getFuture(); } -Future HTTPLookupService::getTopicsOfNamespaceAsync( +Future HTTPLookupService::getTopicsOfNamespaceAsync( const NamespaceNamePtr &nsName, CommandGetTopicsOfNamespace_Mode mode) { NamespaceTopicsPromise promise; std::stringstream completeUrlStream; @@ -148,9 +148,9 @@ Future HTTPLookupService::getTopicsOfNamespaceAsync( return promise.getFuture(); } -Future HTTPLookupService::getSchema(const TopicNamePtr &topicName, - const std::string &version) { - Promise promise; +Future HTTPLookupService::getSchema(const TopicNamePtr &topicName, + const std::string &version) { + Promise promise; std::stringstream completeUrlStream; const auto &url = serviceNameResolver_.resolveHost(); @@ -178,7 +178,7 @@ void HTTPLookupService::handleNamespaceTopicsHTTPRequest(const NamespaceTopicsPr Result result = sendHTTPRequest(completeUrl, responseData); if (result != ResultOk) { - promise.setFailed(result); + promise.setFailed(Error{result, responseData}); } else { promise.setValue(parseNamespaceTopicsData(responseData)); } @@ -356,7 +356,7 @@ void HTTPLookupService::handleLookupHTTPRequest(const LookupPromise &promise, co Result result = sendHTTPRequest(completeUrl, responseData); if (result != ResultOk) { - promise.setFailed(result); + promise.setFailed(Error{result, responseData}); } else { promise.setValue((requestType == PartitionMetaData) ? parsePartitionData(responseData) : parseLookupData(responseData)); @@ -370,9 +370,9 @@ void HTTPLookupService::handleGetSchemaHTTPRequest(const GetSchemaPromise &promi Result result = sendHTTPRequest(completeUrl, responseData, responseCode); if (responseCode == 404) { - promise.setFailed(ResultTopicNotFound); + promise.setFailed(Error{ResultTopicNotFound, responseData}); } else if (result != ResultOk) { - promise.setFailed(result); + promise.setFailed(Error{result, responseData}); } else { ptree::ptree root; std::stringstream stream(responseData); diff --git a/lib/HTTPLookupService.h b/lib/HTTPLookupService.h index 61a06155..be561c38 100644 --- a/lib/HTTPLookupService.h +++ b/lib/HTTPLookupService.h @@ -30,9 +30,9 @@ namespace pulsar { class ServiceNameResolver; -using NamespaceTopicsPromise = Promise; +using NamespaceTopicsPromise = Promise; using NamespaceTopicsPromisePtr = std::shared_ptr; -using GetSchemaPromise = Promise; +using GetSchemaPromise = Promise; class HTTPLookupService : public LookupService, public std::enable_shared_from_this { enum RequestType : uint8_t @@ -41,7 +41,7 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t PartitionMetaData }; - typedef Promise LookupPromise; + typedef Promise LookupPromise; ExecutorServiceProviderPtr executorProvider_; ServiceNameResolver serviceNameResolver_; @@ -73,11 +73,11 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t LookupResultFuture getBroker(const TopicName& topicName) override; - Future getPartitionMetadataAsync(const TopicNamePtr&) override; + Future getPartitionMetadataAsync(const TopicNamePtr&) override; - Future getSchema(const TopicNamePtr& topicName, const std::string& version) override; + Future getSchema(const TopicNamePtr& topicName, const std::string& version) override; - Future getTopicsOfNamespaceAsync( + Future getTopicsOfNamespaceAsync( const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override; ServiceNameResolver& getServiceNameResolver() override { return serviceNameResolver_; } diff --git a/lib/LookupDataResult.h b/lib/LookupDataResult.h index 81e50ccd..8f6e740e 100644 --- a/lib/LookupDataResult.h +++ b/lib/LookupDataResult.h @@ -29,7 +29,7 @@ namespace pulsar { class LookupDataResult; typedef std::shared_ptr LookupDataResultPtr; -typedef Promise LookupDataResultPromise; +typedef Promise LookupDataResultPromise; typedef std::shared_ptr LookupDataResultPromisePtr; class LookupDataResult { diff --git a/lib/LookupService.h b/lib/LookupService.h index 684984fc..edebe7ec 100644 --- a/lib/LookupService.h +++ b/lib/LookupService.h @@ -50,8 +50,8 @@ class LookupService { << ", physical address: " << lookupResult.physicalAddress; } }; - using LookupResultFuture = Future; - using LookupResultPromise = Promise; + using LookupResultFuture = Future; + using LookupResultPromise = Promise; /** * Call broker lookup-api to get broker which serves namespace bundle that contains the given topic. @@ -67,14 +67,14 @@ class LookupService { * * Gets Partition metadata */ - virtual Future getPartitionMetadataAsync(const TopicNamePtr& topicName) = 0; + virtual Future getPartitionMetadataAsync(const TopicNamePtr& topicName) = 0; /** * @param namespace - namespace-name * * Returns all the topics name for a given namespace. */ - virtual Future getTopicsOfNamespaceAsync( + virtual Future getTopicsOfNamespaceAsync( const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) = 0; /** @@ -84,8 +84,8 @@ class LookupService { * @param version the schema version byte array, if it's empty, use the latest version * @return SchemaInfo */ - virtual Future getSchema(const TopicNamePtr& topicName, - const std::string& version = "") = 0; + virtual Future getSchema(const TopicNamePtr& topicName, + const std::string& version = "") = 0; virtual ServiceNameResolver& getServiceNameResolver() = 0; diff --git a/lib/MockServer.h b/lib/MockServer.h index 63a11cc9..796c8f1e 100644 --- a/lib/MockServer.h +++ b/lib/MockServer.h @@ -94,7 +94,27 @@ class MockServer : public std::enable_shared_from_this { } schedule(connection, request + std::to_string(requestId), iter->second, [connection, request, requestId, shouldFail, error, message] { - if (shouldFail) { + if (shouldFail && request == "PARTITIONED_METADATA") { + proto::CommandPartitionedTopicMetadataResponse response; + response.set_request_id(requestId); + response.set_response(proto::CommandPartitionedTopicMetadataResponse::Failed); + response.set_error(error); + response.set_message(message); + connection->handlePartitionedMetadataResponse(response); + } else if (shouldFail && request == "LOOKUP") { + proto::CommandLookupTopicResponse response; + response.set_request_id(requestId); + response.set_response(proto::CommandLookupTopicResponse::Failed); + response.set_error(error); + response.set_message(message); + connection->handleLookupTopicRespose(response); + } else if (shouldFail && request == "GET_SCHEMA") { + proto::CommandGetSchemaResponse response; + response.set_request_id(requestId); + response.set_error_code(error); + response.set_error_message(message); + connection->handleGetSchemaResponse(response); + } else if (shouldFail) { proto::CommandError response; response.set_request_id(requestId); response.set_error(error); @@ -110,6 +130,12 @@ class MockServer : public std::enable_shared_from_this { response.set_response(proto::CommandLookupTopicResponse_LookupType_Connect); response.set_brokerserviceurl("pulsar://localhost:6650"); connection->handleLookupTopicRespose(response); + } else if (request == "PARTITIONED_METADATA") { + proto::CommandPartitionedTopicMetadataResponse response; + response.set_request_id(requestId); + response.set_response(proto::CommandPartitionedTopicMetadataResponse::Success); + response.set_partitions(0); + connection->handlePartitionedMetadataResponse(response); } else if (request == "GET_LAST_MESSAGE_ID") { proto::CommandGetLastMessageIdResponse response; response.set_request_id(requestId); diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index 04d6cf08..2597688a 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -188,10 +188,13 @@ Future MultiTopicsConsumerImpl::subscribeOneTopicAsync(const s return topicPromise->getFuture(); } client->getPartitionMetadataAsync(topicName).addListener( - [this, topicName, topicPromise](Result result, const LookupDataResultPtr& lookupDataResult) { + [this, topicName, topicPromise](Error result, const LookupDataResultPtr& lookupDataResult) { if (result != ResultOk) { LOG_ERROR("Error Checking/Getting Partition Metadata while MultiTopics Subscribing- " - << consumerStr_ << " result: " << result) + << consumerStr_ << " result: " << result) { + Lock lock(mutex_); + lastErrorMessage_ = result.message; + } topicPromise->setFailed(result); return; } diff --git a/lib/PendingRequest.h b/lib/PendingRequest.h index 8a9f9633..4ee6f963 100644 --- a/lib/PendingRequest.h +++ b/lib/PendingRequest.h @@ -30,8 +30,8 @@ namespace pulsar { -template -class PendingRequest : public std::enable_shared_from_this> { +template +class PendingRequest : public std::enable_shared_from_this> { public: PendingRequest(ASIO::steady_timer timer, std::function timeoutCallback) : timer_(std::move(timer)), timeoutCallback_(std::move(timeoutCallback)) {} @@ -52,12 +52,12 @@ class PendingRequest : public std::enable_shared_from_this> { cancelTimer(timer_); } - void fail(Result result) { + void fail(ResultType result) { promise_.setFailed(result); cancelTimer(timer_); } - void fail(Result result, const T& value) { + void fail(ResultType result, const T& value) { promise_.setFailed(result, value); cancelTimer(timer_); } @@ -70,12 +70,12 @@ class PendingRequest : public std::enable_shared_from_this> { private: ASIO::steady_timer timer_; - Promise promise_; + Promise promise_; std::function timeoutCallback_; std::atomic_bool timeoutDisabled_{false}; }; -template -using PendingRequestPtr = std::shared_ptr>; +template +using PendingRequestPtr = std::shared_ptr>; } // namespace pulsar diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h index 7f50cf12..a16f451d 100644 --- a/lib/RetryableLookupService.h +++ b/lib/RetryableLookupService.h @@ -57,20 +57,20 @@ class RetryableLookupService : public LookupService { [this, topicName] { return lookupService_->getBroker(topicName); }); } - Future getPartitionMetadataAsync(const TopicNamePtr& topicName) override { + Future getPartitionMetadataAsync(const TopicNamePtr& topicName) override { return partitionLookupCache_->run( "get-partition-metadata-" + topicName->toString(), [this, topicName] { return lookupService_->getPartitionMetadataAsync(topicName); }); } - Future getTopicsOfNamespaceAsync( + Future getTopicsOfNamespaceAsync( const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override { return namespaceLookupCache_->run( "get-topics-of-namespace-" + nsName->toString() + "-" + std::to_string(mode), [this, nsName, mode] { return lookupService_->getTopicsOfNamespaceAsync(nsName, mode); }); } - Future getSchema(const TopicNamePtr& topicName, const std::string& version) override { + Future getSchema(const TopicNamePtr& topicName, const std::string& version) override { return getSchemaCache_->run("get-schema" + topicName->toString(), [this, topicName, version] { return lookupService_->getSchema(topicName, version); }); @@ -82,20 +82,20 @@ class RetryableLookupService : public LookupService { private: const std::shared_ptr lookupService_; - RetryableOperationCachePtr lookupCache_; - RetryableOperationCachePtr partitionLookupCache_; - RetryableOperationCachePtr namespaceLookupCache_; - RetryableOperationCachePtr getSchemaCache_; + RetryableOperationCachePtr lookupCache_; + RetryableOperationCachePtr partitionLookupCache_; + RetryableOperationCachePtr namespaceLookupCache_; + RetryableOperationCachePtr getSchemaCache_; RetryableLookupService(std::shared_ptr lookupService, TimeDuration timeout, ExecutorServiceProviderPtr executorProvider) : lookupService_(std::move(lookupService)), - lookupCache_(RetryableOperationCache::create(executorProvider, timeout)), + lookupCache_(RetryableOperationCache::create(executorProvider, timeout)), partitionLookupCache_( - RetryableOperationCache::create(executorProvider, timeout)), + RetryableOperationCache::create(executorProvider, timeout)), namespaceLookupCache_( - RetryableOperationCache::create(executorProvider, timeout)), - getSchemaCache_(RetryableOperationCache::create(executorProvider, timeout)) {} + RetryableOperationCache::create(executorProvider, timeout)), + getSchemaCache_(RetryableOperationCache::create(executorProvider, timeout)) {} }; } // namespace pulsar diff --git a/lib/RetryableOperation.h b/lib/RetryableOperation.h index f4b056ed..db0d89c4 100644 --- a/lib/RetryableOperation.h +++ b/lib/RetryableOperation.h @@ -35,13 +35,13 @@ namespace pulsar { -template -class RetryableOperation : public std::enable_shared_from_this> { +template +class RetryableOperation : public std::enable_shared_from_this> { struct PassKey { explicit PassKey() {} }; - RetryableOperation(const std::string& name, std::function()>&& func, + RetryableOperation(const std::string& name, std::function()>&& func, TimeDuration timeout, DeadlineTimerPtr timer) : name_(name), func_(std::move(func)), @@ -54,11 +54,11 @@ class RetryableOperation : public std::enable_shared_from_this(args)...) {} template - static std::shared_ptr> create(Args&&... args) { - return std::make_shared>(PassKey{}, std::forward(args)...); + static std::shared_ptr> create(Args&&... args) { + return std::make_shared>(PassKey{}, std::forward(args)...); } - Future run() { + Future run() { bool expected = false; if (!started_.compare_exchange_strong(expected, true)) { return promise_.getFuture(); @@ -73,10 +73,10 @@ class RetryableOperation : public std::enable_shared_from_this()> func_; + std::function()> func_; const TimeDuration timeout_; Backoff backoff_; - Promise promise_; + Promise promise_; std::atomic_bool started_{false}; DeadlineTimerPtr timer_; @@ -84,19 +84,19 @@ class RetryableOperation : public std::enable_shared_from_this + Future runImpl(TimeDuration remainingTime) { - std::weak_ptr> weakSelf{this->shared_from_this()}; - func_().addListener([this, weakSelf, remainingTime](Result result, const T& value) { + std::weak_ptr> weakSelf{this->shared_from_this()}; + func_().addListener([this, weakSelf, remainingTime](ResultType result, const T& value) { auto self = weakSelf.lock(); if (!self) { return; } - if (result == ResultOk) { + if (static_cast(result) == ResultOk) { promise_.setValue(value); return; } - if (!isResultRetryable(result)) { + if (!isResultRetryable(static_cast(result))) { promise_.setFailed(result); return; } diff --git a/lib/RetryableOperationCache.h b/lib/RetryableOperationCache.h index fa4c8fc1..ed3082c3 100644 --- a/lib/RetryableOperationCache.h +++ b/lib/RetryableOperationCache.h @@ -27,14 +27,14 @@ namespace pulsar { -template +template class RetryableOperationCache; -template -using RetryableOperationCachePtr = std::shared_ptr>; +template +using RetryableOperationCachePtr = std::shared_ptr>; -template -class RetryableOperationCache : public std::enable_shared_from_this> { +template +class RetryableOperationCache : public std::enable_shared_from_this> { friend class LookupServiceTest; friend class RetryableOperationCacheTest; struct PassKey { @@ -44,7 +44,7 @@ class RetryableOperationCache : public std::enable_shared_from_this; + using Self = RetryableOperationCache; public: template @@ -56,10 +56,10 @@ class RetryableOperationCache : public std::enable_shared_from_this(PassKey{}, std::forward(args)...); } - Future run(const std::string& key, std::function()>&& func) { + Future run(const std::string& key, std::function()>&& func) { std::unique_lock lock{mutex_}; if (closed_) { - Promise promise; + Promise promise; promise.setFailed(ResultAlreadyClosed); return promise.getFuture(); } @@ -70,18 +70,18 @@ class RetryableOperationCache : public std::enable_shared_from_thisget()->createDeadlineTimer(); } catch (const std::runtime_error& e) { LOG_ERROR("Failed to retry lookup for " << key << ": " << e.what()); - Promise promise; + Promise promise; promise.setFailed(ResultConnectError); return promise.getFuture(); } - auto operation = RetryableOperation::create(key, std::move(func), timeout_, timer); + auto operation = RetryableOperation::create(key, std::move(func), timeout_, timer); auto future = operation->run(); operations_[key] = operation; lock.unlock(); auto weakSelf = this->weak_from_this(); - future.addListener([this, weakSelf, key, operation](Result, const T&) { + future.addListener([this, weakSelf, key, operation](ResultType, const T&) { auto self = weakSelf.lock(); if (!self) { return; @@ -118,7 +118,7 @@ class RetryableOperationCache : public std::enable_shared_from_this>> operations_; + std::unordered_map>> operations_; bool closed_{false}; mutable std::mutex mutex_; diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc index 807b1c7f..27008487 100644 --- a/tests/ClientTest.cc +++ b/tests/ClientTest.cc @@ -28,6 +28,7 @@ #include #include #include +#include #include "MockClientImpl.h" #include "PulsarAdminHelper.h" @@ -358,6 +359,40 @@ TEST(ClientTest, testRequestErrorMessageIsReturnedInResponseData) { executorProvider->close(); } +TEST(ClientTest, testPartitionMetadataErrorMessageIsReturnedInError) { + ClientConfiguration conf; + + auto executorProvider = std::make_shared(1); + AtomicSharedPtr serviceInfo; + serviceInfo.store(std::make_shared(lookupUrl)); + ConnectionPool pool(serviceInfo, conf, executorProvider, ""); + auto connection = std::make_shared(lookupUrl, lookupUrl, *serviceInfo.load(), + executorProvider->get(), conf, "", pool, 0); + PulsarFriend::setServerProtocolVersion(*connection, 8); + + auto mockServer = std::make_shared(connection); + connection->attachMockServer(mockServer); + + const std::string errorMessage = "partition metadata auth failed"; + mockServer->setRequestDelay({{"PARTITIONED_METADATA", 1}}); + mockServer->setRequestError("PARTITIONED_METADATA", proto::AuthenticationError, errorMessage); + + auto lookupPromise = std::make_shared(); + auto lookupFuture = lookupPromise->getFuture(); + connection->newPartitionedMetadataLookup("persistent://public/default/metadata-error", 0, lookupPromise); + + LookupDataResultPtr lookupData; + Error error = lookupFuture.get(lookupData); + ASSERT_EQ(ResultAuthenticationError, error.result); + ASSERT_EQ(errorMessage, error.message); + ASSERT_EQ(0u, PulsarFriend::getPendingLookupRequests(*connection)); + ASSERT_EQ(0u, PulsarFriend::getNumOfPendingLookupRequests(*connection)); + ASSERT_EQ(0u, mockServer->close()); + + connection->close(ResultDisconnected).wait(); + executorProvider->close(); +} + TEST(ClientTest, testCreateProducerV2ReturnsError) { Client client(lookupUrl); @@ -764,3 +799,46 @@ TEST(ClientTest, testNoRetry) { ASSERT_TRUE(result.timeMs < 1000) << "consumer: " << result.timeMs << " ms"; } } + +template +struct overloaded : Ts... { + using Ts::operator()...; +}; + +template +overloaded(Ts...) -> overloaded; + +TEST(ClientTest, testV2Creation) { + auto topic = "persistent://private/ns/client-test-v2-creation-" + std::to_string(time(nullptr)); + + Client clientWithoutAuth(lookupUrl); + + std::visit(overloaded{ + [](const auto &) { FAIL() << "Producer creation should fail without auth"; }, + [](const Error &error) { + ASSERT_EQ(ResultAuthorizationError, error.result); + ASSERT_EQ("Client is not authorized to Get Partition Metadata", error.message); + }, + }, + clientWithoutAuth.createProducerV2(topic)); + + std::visit(overloaded{ + [](const auto &) { FAIL() << "Producer creation should fail without auth"; }, + [](const Error &error) { + ASSERT_EQ(ResultAuthorizationError, error.result); + ASSERT_EQ("Client is not authorized to Get Partition Metadata", error.message); + }, + }, + clientWithoutAuth.subscribeV2(topic, "sub")); + + std::visit(overloaded{ + [](const auto &) { FAIL() << "Producer creation should fail without auth"; }, + [](const Error &error) { + ASSERT_EQ(ResultAuthorizationError, error.result); + ASSERT_EQ("Client is not authorized to Get Partition Metadata", error.message); + }, + }, + clientWithoutAuth.createReaderV2(topic, MessageId::earliest(), {})); + + clientWithoutAuth.close(); +} diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc index 53cb76e1..2da254ea 100644 --- a/tests/LookupServiceTest.cc +++ b/tests/LookupServiceTest.cc @@ -59,8 +59,8 @@ class LookupServiceTest : public ::testing::TestWithParam { } void TearDown() override { client_.close(); } - template - static bool isEmpty(const RetryableOperationCache& cache) { + template + static bool isEmpty(const RetryableOperationCache& cache) { std::lock_guard lock{cache.mutex_}; return cache.operations_.empty(); } @@ -91,7 +91,7 @@ TEST(LookupServiceTest, basicLookup) { TopicNamePtr topicName = TopicName::get("topic"); - Future partitionFuture = lookupService.getPartitionMetadataAsync(topicName); + Future partitionFuture = lookupService.getPartitionMetadataAsync(topicName); LookupDataResultPtr lookupData; partitionFuture.get(lookupData); ASSERT_TRUE(lookupData != NULL); @@ -275,7 +275,7 @@ TEST_P(LookupServiceTest, basicGetNamespaceTopics) { auto lookupServicePtr = PulsarFriend::getClientImplPtr(client_); auto verifyGetTopics = [&](CommandGetTopicsOfNamespace_Mode mode, const std::set& expectedTopics) { - Future getTopicsFuture = + Future getTopicsFuture = lookupServicePtr->getTopicsOfNamespaceAsync(nsName, mode); NamespaceTopicsPtr topicsData; result = getTopicsFuture.get(topicsData); @@ -510,12 +510,12 @@ class MockLookupService : public BinaryProtoLookupService { public: using BinaryProtoLookupService::BinaryProtoLookupService; - Future getPartitionMetadataAsync(const TopicNamePtr& topicName) override { + Future getPartitionMetadataAsync(const TopicNamePtr& topicName) override { bool expected = true; if (firstTime_.compare_exchange_strong(expected, false)) { // Trigger the retry LOG_INFO("Fail the lookup for " << topicName->toString() << " intentionally"); - Promise promise; + Promise promise; promise.setFailed(ResultRetryable); return promise.getFuture(); }