-
Notifications
You must be signed in to change notification settings - Fork 52
Remove useless grpc connections #233
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
ccc48d4
ee16376
80a14ea
a3ecf4d
6badc3b
7a5e808
a07dd67
9451c37
6ff1ddc
7264109
148caa0
ed20a9c
93e6e1c
cd31303
88f5a7f
1530578
6969757
f27b1e5
f627bc2
798b04a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -106,8 +106,9 @@ void RegionClient::onRegionError(Backoffer & bo, RPCContextPtr rpc_ctx, const er | |
| cluster->region_cache->dropRegion(rpc_ctx->region); | ||
| } | ||
|
|
||
| void RegionClient::onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx) const | ||
| void RegionClient::onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx, const ::grpc::Status & status) const | ||
| { | ||
| dropConnIfNeeded(cluster->rpc_client, rpc_ctx->addr, status); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. better to call it after |
||
| cluster->region_cache->onSendReqFail(rpc_ctx, e); | ||
| // Retry on send request failure when it's not canceled. | ||
| // When a store is not available, the leader of related region should be elected quickly. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,9 +1,40 @@ | ||
| #include <pingcap/Exception.h> | ||
| #include <pingcap/kv/Rpc.h> | ||
|
|
||
| #include <random> | ||
| #include <unordered_set> | ||
|
|
||
| namespace pingcap | ||
| { | ||
| namespace kv | ||
| { | ||
| namespace | ||
| { | ||
| std::unordered_set<std::string> getStoreAddresses(const pd::ClientPtr & pd_client) | ||
| { | ||
| std::unordered_set<std::string> store_addrs; | ||
| const auto stores = pd_client->getAllStores(true); | ||
| store_addrs.reserve(stores.size()); | ||
| for (const auto & store : stores) | ||
| { | ||
| if (!store.address().empty()) | ||
| store_addrs.emplace(store.address()); | ||
| } | ||
| return store_addrs; | ||
| } | ||
|
|
||
| std::chrono::seconds getRandomScanInterval(std::chrono::minutes scan_interval) | ||
| { | ||
| const auto min_seconds = std::chrono::duration_cast<std::chrono::seconds>(scan_interval); | ||
| const auto max_seconds = std::chrono::duration_cast<std::chrono::seconds>( | ||
| scan_interval + rpc_conn_check_interval_jitter); | ||
|
|
||
| thread_local std::mt19937_64 generator(std::random_device{}()); | ||
| std::uniform_int_distribution<std::chrono::seconds::rep> distribution(min_seconds.count(), max_seconds.count()); | ||
| return std::chrono::seconds(distribution(generator)); | ||
| } | ||
| } // namespace | ||
|
|
||
| ConnArray::ConnArray(size_t max_size, const std::string & addr, const ClusterConfig & config_) | ||
| : address(addr) | ||
| , index(0) | ||
|
|
@@ -22,6 +53,93 @@ std::shared_ptr<KvConnClient> ConnArray::get() | |
| return vec[index]; | ||
| } | ||
|
|
||
| void RpcClient::run() | ||
| { | ||
| while (!stopped.load()) | ||
| { | ||
| { | ||
| const auto wait_interval = getRandomScanInterval(scan_interval); | ||
| std::unique_lock lock(mutex); | ||
| scan_cv.wait_for(lock, wait_interval, [this] { | ||
| return stopped.load(); | ||
| }); | ||
| } | ||
|
|
||
| if (stopped.load()) | ||
| return; | ||
|
|
||
| try | ||
| { | ||
| scanConns(); | ||
| removeInvalidConns(); | ||
| } | ||
| catch (...) | ||
| { | ||
| log->warning(getCurrentExceptionMsg("RpcClient scan conns failed: ")); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| void RpcClient::stop() | ||
| { | ||
| stopped.store(true); | ||
| scan_cv.notify_all(); | ||
| } | ||
|
|
||
| void RpcClient::scanConns() | ||
| { | ||
| std::vector<std::string> conn_snapshot; | ||
| { | ||
| std::lock_guard<std::mutex> lock(mutex); | ||
| conn_snapshot.reserve(conns.size()); | ||
| for (const auto & conn : conns) | ||
| conn_snapshot.emplace_back(conn.first); | ||
| } | ||
|
|
||
| if (conn_snapshot.empty() || !pd_client || pd_client->isMock()) | ||
| return; | ||
|
|
||
| const auto store_addrs = getStoreAddresses(pd_client); | ||
| std::vector<std::string> conns_to_remove; | ||
| for (const auto & addr : conn_snapshot) | ||
| { | ||
| if (store_addrs.find(addr) == store_addrs.end()) | ||
| conns_to_remove.emplace_back(addr); | ||
| } | ||
|
|
||
| if (conns_to_remove.empty()) | ||
| return; | ||
|
|
||
| std::lock_guard<std::mutex> lock(mutex); | ||
| for (const auto & addr : conns_to_remove) | ||
| { | ||
| if (conns.find(addr) != conns.end()) | ||
| invalid_conns.push_back(addr); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it looks to me that |
||
| } | ||
| } | ||
|
|
||
| void RpcClient::removeConn(const std::string & addr) | ||
| { | ||
| std::lock_guard<std::mutex> lock(mutex); | ||
| if (conns.erase(addr)) | ||
| log->information("delete invalid addr: " + addr); | ||
| } | ||
|
|
||
| void RpcClient::removeInvalidConns() | ||
| { | ||
| std::lock_guard<std::mutex> lock(mutex); | ||
| if (invalid_conns.empty()) | ||
| return; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add lock after |
||
|
|
||
| for (const auto & addr : invalid_conns) | ||
| { | ||
| if (conns.erase(addr)) | ||
| log->information("delete invalid addr: " + addr); | ||
| } | ||
|
|
||
| invalid_conns.clear(); | ||
|
Comment on lines
+89
to
+140
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don’t erase newly recreated connection pools from stale scan results.
One possible fix using the existing conditional remover void RpcClient::scanConns()
{
std::vector<std::pair<std::string, ConnArrayPtr>> conn_snapshot;
@@
for (const auto & [addr, conn_array] : conn_snapshot)
{
if (!isConnArrayValid(conn_array, detect_rpc_timeout))
{
- std::lock_guard<std::mutex> lock(mutex);
- invalid_conns.push_back(addr);
+ removeConn(addr, conn_array);
}
}
}If the scan path still needs batched logging/removal, store 🧰 Tools🪛 Clang (14.0.6)[error] 95-95: method 'scanConns' can be made static (readability-convert-member-functions-to-static,-warnings-as-errors) [error] 122-122: method 'removeInvalidConns' can be made static (readability-convert-member-functions-to-static,-warnings-as-errors) 🤖 Prompt for AI Agents |
||
| } | ||
|
|
||
| ConnArrayPtr RpcClient::getConnArray(const std::string & addr) | ||
| { | ||
| std::lock_guard<std::mutex> lock(mutex); | ||
|
|
@@ -39,6 +157,5 @@ ConnArrayPtr RpcClient::createConnArray(const std::string & addr) | |
| conns[addr] = conn_array; | ||
| return conn_array; | ||
| } | ||
|
|
||
| } // namespace kv | ||
| } // namespace pingcap | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Align `dropConnIfNeeded()` with the shared removal predicate.

`shouldRemoveConnOnStatus()` treats both `UNAVAILABLE` and `CANCELLED` as removable, but `dropConnIfNeeded()` only handles `UNAVAILABLE`. Since `onSendFail()` uses this helper, unary failures and stream-setup failures with `CANCELLED` won't invalidate the connection, while `StreamReader::finish()` will.
Proposed fix:

```diff
 inline void dropConnIfNeeded(const RpcClientPtr & client, const std::string & addr, const ::grpc::Status & status)
 {
-    if (status.error_code() == grpc::StatusCode::UNAVAILABLE)
+    if (shouldRemoveConnOnStatus(status))
         client->markConnInvalid(addr);
 }
```
🤖 Prompt for AI Agents