diff --git a/include/pingcap/kv/Cluster.h b/include/pingcap/kv/Cluster.h index ec2eac07..69053423 100644 --- a/include/pingcap/kv/Cluster.h +++ b/include/pingcap/kv/Cluster.h @@ -34,8 +34,8 @@ struct Cluster Cluster() : pd_client(std::make_shared()) - , rpc_client(std::make_unique()) - , thread_pool(std::make_unique(1)) + , rpc_client(std::make_unique(pd_client, ClusterConfig{})) + , thread_pool(std::make_unique(2)) , mpp_prober(std::make_unique(this)) { startBackgroundTasks(); @@ -44,11 +44,11 @@ struct Cluster Cluster(const std::vector & pd_addrs, const ClusterConfig & config) : pd_client(std::make_shared(pd_addrs, config)) , region_cache(std::make_unique(pd_client, config)) - , rpc_client(std::make_unique(config)) + , rpc_client(std::make_unique(pd_client, config)) , oracle(std::make_unique(pd_client, std::chrono::milliseconds(oracle_update_interval))) , lock_resolver(std::make_unique(this)) , api_version(config.api_version) - , thread_pool(std::make_unique(2)) + , thread_pool(std::make_unique(3)) , mpp_prober(std::make_unique(this)) { startBackgroundTasks(); @@ -64,6 +64,7 @@ struct Cluster // (e.g. background threads) that cluster object holds so as to exit elegantly. ~Cluster() { + rpc_client->stop(); mpp_prober->stop(); if (region_cache) region_cache->stop(); diff --git a/include/pingcap/kv/RegionClient.h b/include/pingcap/kv/RegionClient.h index 4da18096..4ba701de 100644 --- a/include/pingcap/kv/RegionClient.h +++ b/include/pingcap/kv/RegionClient.h @@ -81,7 +81,7 @@ struct RegionClient } std::string err_msg = rpc.errMsg(status, extra_msg); log->warning(err_msg); - onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx); + onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx, status); continue; } if (resp->has_region_error()) @@ -202,7 +202,7 @@ struct RegionClient } std::string err_msg = rpc.errMsg(status, extra_msg); log->warning(err_msg); - onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx); + onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx, status); } } @@ -210,7 +210,7 @@ struct RegionClient void onRegionError(Backoffer & bo, RPCContextPtr rpc_ctx, const errorpb::Error & err) const; // Normally, it happens when machine down or network partition between tidb and kv or process crash. - void onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx) const; + void onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx, const ::grpc::Status & status) const; }; } // namespace kv diff --git a/include/pingcap/kv/Rpc.h b/include/pingcap/kv/Rpc.h index 7439b2fd..5e0fabd5 100644 --- a/include/pingcap/kv/Rpc.h +++ b/include/pingcap/kv/Rpc.h @@ -5,13 +5,21 @@ #include #include +#include +#include +#include +#include #include +#include #include namespace pingcap { namespace kv { +constexpr auto rpc_conn_check_interval = std::chrono::minutes(10); +constexpr auto rpc_conn_check_interval_jitter = std::chrono::minutes(5); + struct ConnArray { std::mutex mutex; @@ -33,24 +41,47 @@ using GRPCMetaData = std::multimap; struct RpcClient { ClusterConfig config; + pd::ClientPtr pd_client; std::mutex mutex; std::map conns; + Logger * log = &Logger::get("pingcap.RpcClient"); + std::chrono::minutes scan_interval = rpc_conn_check_interval; + std::atomic stopped = false; + std::condition_variable scan_cv; + std::vector invalid_conns; + RpcClient() = default; explicit RpcClient(const ClusterConfig & config_) : config(config_) {} + RpcClient(pd::ClientPtr pd_client_, const ClusterConfig & config_) + : config(config_) + , pd_client(std::move(pd_client_)) + {} + void update(const ClusterConfig & config_) { std::unique_lock lk(mutex); config = config_; conns.clear(); + invalid_conns.clear(); } + void run(); + + void stop(); + + void scanConns(); + + void removeConn(const std::string & addr); + + void removeInvalidConns(); + ConnArrayPtr getConnArray(const std::string & addr); ConnArrayPtr createConnArray(const std::string & addr); @@ -58,6 +89,12 @@ struct RpcClient using RpcClientPtr = std::unique_ptr; +inline void dropConnIfNeeded(const RpcClientPtr & client, const std::string & addr, const ::grpc::Status & status) +{ + if (status.error_code() == grpc::StatusCode::UNAVAILABLE) + client->removeConn(addr); +} + // RpcCall holds the request and response, and delegates RPC calls. template class RpcCall diff --git a/src/kv/Cluster.cc b/src/kv/Cluster.cc index 70b6c903..a5a10eb0 100644 --- a/src/kv/Cluster.cc +++ b/src/kv/Cluster.cc @@ -31,6 +31,9 @@ void Cluster::startBackgroundTasks() { thread_pool->start(); + thread_pool->enqueue([this] { + rpc_client->run(); + }); thread_pool->enqueue([this] { mpp_prober->run(); }); diff --git a/src/kv/RegionClient.cc b/src/kv/RegionClient.cc index 6c369f92..faf4599c 100644 --- a/src/kv/RegionClient.cc +++ b/src/kv/RegionClient.cc @@ -106,8 +106,9 @@ void RegionClient::onRegionError(Backoffer & bo, RPCContextPtr rpc_ctx, const er cluster->region_cache->dropRegion(rpc_ctx->region); } -void RegionClient::onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx) const +void RegionClient::onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx, const ::grpc::Status & status) const { + dropConnIfNeeded(cluster->rpc_client, rpc_ctx->addr, status); cluster->region_cache->onSendReqFail(rpc_ctx, e); // Retry on send request failure when it's not canceled. // When a store is not available, the leader of related region should be elected quickly. diff --git a/src/kv/Rpc.cc b/src/kv/Rpc.cc index dddb4bc9..94c0e3a1 100644 --- a/src/kv/Rpc.cc +++ b/src/kv/Rpc.cc @@ -1,9 +1,40 @@ +#include #include +#include +#include + namespace pingcap { namespace kv { +namespace +{ +std::unordered_set getStoreAddresses(const pd::ClientPtr & pd_client) +{ + std::unordered_set store_addrs; + const auto stores = pd_client->getAllStores(true); + store_addrs.reserve(stores.size()); + for (const auto & store : stores) + { + if (!store.address().empty()) + store_addrs.emplace(store.address()); + } + return store_addrs; +} + +std::chrono::seconds getRandomScanInterval(std::chrono::minutes scan_interval) +{ + const auto min_seconds = std::chrono::duration_cast(scan_interval); + const auto max_seconds = std::chrono::duration_cast( + scan_interval + rpc_conn_check_interval_jitter); + + thread_local std::mt19937_64 generator(std::random_device{}()); + std::uniform_int_distribution distribution(min_seconds.count(), max_seconds.count()); + return std::chrono::seconds(distribution(generator)); +} +} // namespace + ConnArray::ConnArray(size_t max_size, const std::string & addr, const ClusterConfig & config_) : address(addr) , index(0) @@ -22,6 +53,92 @@ std::shared_ptr ConnArray::get() return vec[index]; } +void RpcClient::run() +{ + while (!stopped.load()) + { + { + const auto wait_interval = getRandomScanInterval(scan_interval); + std::unique_lock lock(mutex); + scan_cv.wait_for(lock, wait_interval, [this] { + return stopped.load(); + }); + } + + if (stopped.load()) + return; + + try + { + scanConns(); + removeInvalidConns(); + } + catch (...) + { + log->warning(getCurrentExceptionMsg("RpcClient scan conns failed: ")); + } + } +} + +void RpcClient::stop() +{ + stopped.store(true); + scan_cv.notify_all(); +} + +void RpcClient::scanConns() +{ + std::vector conn_snapshot; + { + std::lock_guard lock(mutex); + conn_snapshot.reserve(conns.size()); + for (const auto & conn : conns) + conn_snapshot.emplace_back(conn.first); + } + + if (conn_snapshot.empty() || !pd_client || pd_client->isMock()) + return; + + const auto store_addrs = getStoreAddresses(pd_client); + std::vector conns_to_remove; + for (const auto & addr : conn_snapshot) + { + if (store_addrs.find(addr) == store_addrs.end()) + conns_to_remove.emplace_back(addr); + } + + if (conns_to_remove.empty()) + return; + + std::lock_guard lock(mutex); + for (const auto & addr : conns_to_remove) + { + invalid_conns.push_back(addr); + } +} + +void RpcClient::removeConn(const std::string & addr) +{ + std::lock_guard lock(mutex); + if (conns.erase(addr)) + log->information("delete invalid addr: " + addr); +} + +void RpcClient::removeInvalidConns() +{ + std::lock_guard lock(mutex); + if (invalid_conns.empty()) + return; + + for (const auto & addr : invalid_conns) + { + if (conns.erase(addr)) + log->information("delete invalid addr: " + addr); + } + + invalid_conns.clear(); +} + ConnArrayPtr RpcClient::getConnArray(const std::string & addr) { std::lock_guard lock(mutex); @@ -39,6 +156,5 @@ ConnArrayPtr RpcClient::createConnArray(const std::string & addr) conns[addr] = conn_array; return conn_array; } - } // namespace kv } // namespace pingcap