From ccc48d474b1353f71be561fc24d77aff13e07bf0 Mon Sep 17 00:00:00 2001 From: Liqi Geng Date: Fri, 19 Sep 2025 10:30:11 +0800 Subject: [PATCH 01/20] Downgrade some error logs (#214) Signed-off-by: gengliqi Signed-off-by: xzhangxian1008 --- src/coprocessor/Client.cc | 2 +- src/kv/LockResolver.cc | 2 +- src/kv/Snapshot.cc | 2 +- src/pd/Client.cc | 30 +++++++++++++++--------------- 4 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/coprocessor/Client.cc b/src/coprocessor/Client.cc index 481ecc96..cd2ea11a 100644 --- a/src/coprocessor/Client.cc +++ b/src/coprocessor/Client.cc @@ -861,7 +861,7 @@ void ResponseIter::handleTask(const CopTask & task) } catch (const pingcap::Exception & e) { - log->error("coprocessor meets error, error_message=" + e.displayText() + " error_code=" + std::to_string(e.code()) + " region_id=" + current_task.region_id.toString()); + log->warning("coprocessor meets error, error_message=" + e.displayText() + " error_code=" + std::to_string(e.code()) + " region_id=" + current_task.region_id.toString()); queue->push(Result(e)); meet_error = true; break; diff --git a/src/kv/LockResolver.cc b/src/kv/LockResolver.cc index 0496136c..40dc6ef9 100644 --- a/src/kv/LockResolver.cc +++ b/src/kv/LockResolver.cc @@ -295,7 +295,7 @@ void LockResolver::resolvePessimisticLock(Backoffer & bo, LockPtr lock, std::uno const auto & key_errors = response.errors(); if (!key_errors.empty()) { - log->error("unexpected resolve pessimistic lock err: " + key_errors[0].ShortDebugString()); + log->warning("unexpected resolve pessimistic lock err: " + key_errors[0].ShortDebugString()); throw Exception("unexpected err :" + key_errors[0].ShortDebugString(), ErrorCodes::UnknownError); } diff --git a/src/kv/Snapshot.cc b/src/kv/Snapshot.cc index de4e5cb5..85067f2f 100644 --- a/src/kv/Snapshot.cc +++ b/src/kv/Snapshot.cc @@ -48,7 +48,7 @@ kvrpcpb::MvccInfo Snapshot::mvccGet(Backoffer & bo, const std::string & key) if (!response.error().empty()) { Logger * log(&Logger::get("Snapshot::mvccGet")); - log->error("reponse error is " + response.error()); + log->warning("reponse error is " + response.error()); continue; } return response.info(); diff --git a/src/pd/Client.cc b/src/pd/Client.cc index 9fa7fda6..6632048f 100644 --- a/src/pd/Client.cc +++ b/src/pd/Client.cc @@ -119,7 +119,7 @@ pdpb::GetMembersResponse Client::getMembers(const std::string & url) if (!status.ok()) { std::string err_msg = "get member failed: " + std::to_string(status.error_code()) + ": " + status.error_message(); - log->error(err_msg); + log->warning(err_msg); return {}; } return resp; @@ -176,7 +176,7 @@ void Client::initLeader() } else { - log->error("failed to update leader, stop retrying"); + log->warning("failed to update leader, stop retrying"); throw e; } } @@ -276,7 +276,7 @@ void Client::leaderLoop() } catch (Exception & e) { - log->error(e.displayText()); + log->warning(e.displayText()); } } } @@ -305,14 +305,14 @@ uint64_t Client::getTS() if (!stream->Write(request)) { std::string err_msg = ("Send TsoRequest failed"); - log->error(err_msg); + log->warning(err_msg); check_leader.store(true); throw Exception(err_msg, GRPCErrorCode); } if (!stream->Read(&response)) { std::string err_msg = ("Receive TsoResponse failed"); - log->error(err_msg); + log->warning(err_msg); check_leader.store(true); throw Exception(err_msg, GRPCErrorCode); } @@ -336,7 +336,7 @@ uint64_t Client::getGCSafePoint() if (!status.ok()) { err_msg = "get safe point failed: " + std::to_string(status.error_code()) + ": " + status.error_message(); - log->error(err_msg); + log->warning(err_msg); check_leader.store(true); throw Exception(err_msg, status.error_code()); } @@ -360,7 +360,7 @@ uint64_t Client::getGCSafePointV2(KeyspaceID keyspace_id) if (!status.ok()) { err_msg = "get keyspace_id:" + std::to_string(keyspace_id) + " safe point failed: " + std::to_string(status.error_code()) + ": " + status.error_message(); - log->error(err_msg); + log->warning(err_msg); check_leader.store(true); throw Exception(err_msg, status.error_code()); } @@ -384,7 +384,7 @@ pdpb::GetRegionResponse Client::getRegionByKey(const std::string & key) if (!status.ok()) { std::string err_msg = ("get region failed: " + std::to_string(status.error_code()) + " : " + status.error_message()); - log->error(err_msg); + log->warning(err_msg); check_leader.store(true); throw Exception(err_msg, GRPCErrorCode); } @@ -409,7 +409,7 @@ pdpb::GetRegionResponse Client::getRegionByID(uint64_t region_id) if (!status.ok()) { std::string err_msg = ("get region by id failed: " + std::to_string(status.error_code()) + ": " + status.error_message()); - log->error(err_msg); + log->warning(err_msg); check_leader.store(true); throw Exception(err_msg, GRPCErrorCode); } @@ -434,7 +434,7 @@ std::vector Client::getAllStores(bool exclude_tombstone) if (!status.ok()) { std::string err_msg = ("get all stores failed: " + std::to_string(status.error_code()) + ": " + status.error_message()); - log->error(err_msg); + log->warning(err_msg); check_leader.store(true); throw Exception(err_msg, GRPCErrorCode); } @@ -463,7 +463,7 @@ metapb::Store Client::getStore(uint64_t store_id) if (!status.ok()) { std::string err_msg = ("get store failed: " + std::to_string(status.error_code()) + ": " + status.error_message()); - log->error(err_msg); + log->warning(err_msg); check_leader.store(true); throw Exception(err_msg, GRPCErrorCode); } @@ -487,7 +487,7 @@ KeyspaceID Client::getKeyspaceID(const std::string & keyspace_name) if (!status.ok()) { std::string err_msg = ("get keyspace id failed: " + std::to_string(status.error_code()) + ": " + status.error_message()); - log->error(err_msg); + log->warning(err_msg); check_leader.store(true); throw Exception(err_msg, GRPCErrorCode); } @@ -495,14 +495,14 @@ KeyspaceID Client::getKeyspaceID(const std::string & keyspace_name) if (response.header().has_error()) { std::string err_msg = ("get keyspace id failed: " + response.header().error().message()); - log->error(err_msg); + log->warning(err_msg); throw Exception(err_msg, InternalError); } if (response.keyspace().state() != keyspacepb::KeyspaceState::ENABLED) { std::string err_msg = ("keyspace " + keyspace_name + " is not enabled"); - log->error(err_msg); + log->warning(err_msg); throw Exception(err_msg, KeyspaceNotEnabled); } return response.keyspace().id(); @@ -556,7 +556,7 @@ bool Client::isClusterBootstrapped() if (!status.ok()) \ { \ std::string err_msg = ("resource manager grpc call failed: " #GRPC_METHOD ". " + std::to_string(status.error_code()) + ": " + status.error_message()); \ - log->error(err_msg); \ + log->warning(err_msg); \ check_leader.store(true); \ throw Exception(err_msg, GRPCErrorCode); \ } \ From ee163762667c717c7c4231c76454bbfdc4e303b2 Mon Sep 17 00:00:00 2001 From: JaySon Date: Tue, 4 Nov 2025 21:02:02 +0800 Subject: [PATCH 02/20] pd: Support new API: GetGCState (#217) Signed-off-by: JaySon-Huang Signed-off-by: xzhangxian1008 --- include/pingcap/pd/Client.h | 12 ++++--- include/pingcap/pd/IClient.h | 15 +++++---- include/pingcap/pd/MockPDClient.h | 45 ++++++++++++++++++++------ src/pd/Client.cc | 54 +++++++++++++++++++++++++++---- 4 files changed, 99 insertions(+), 27 deletions(-) diff --git a/include/pingcap/pd/Client.h b/include/pingcap/pd/Client.h index 0f624780..6b946008 100644 --- a/include/pingcap/pd/Client.h +++ b/include/pingcap/pd/Client.h @@ -17,10 +17,9 @@ #include #include -namespace pingcap -{ -namespace pd +namespace pingcap::pd { + class Client : public IClient { const int max_init_cluster_retries; @@ -59,6 +58,10 @@ class Client : public IClient uint64_t getGCSafePointV2(KeyspaceID keyspace_id) override; + pdpb::GetGCStateResponse getGCState(KeyspaceID keyspace_id) override; + + pdpb::GetAllKeyspacesGCStatesResponse getAllKeyspacesGCStates() override; + KeyspaceID getKeyspaceID(const std::string & keyspace_name) override; bool isMock() override; @@ -151,5 +154,4 @@ class Client : public IClient }; -} // namespace pd -} // namespace pingcap +} // namespace pingcap::pd diff --git a/include/pingcap/pd/IClient.h b/include/pingcap/pd/IClient.h index 13001c87..f2142b39 100644 --- a/include/pingcap/pd/IClient.h +++ b/include/pingcap/pd/IClient.h @@ -13,9 +13,7 @@ #include #pragma GCC diagnostic pop -namespace pingcap -{ -namespace pd +namespace pingcap::pd { class IClient @@ -35,10 +33,14 @@ class IClient virtual std::vector getAllStores(bool exclude_tombstone) = 0; - virtual uint64_t getGCSafePoint() = 0; + [[deprecated("Use getGCState instead")]] virtual uint64_t getGCSafePoint() = 0; // Return the gc safe point of given keyspace_id. - virtual uint64_t getGCSafePointV2(KeyspaceID keyspace_id) = 0; + [[deprecated("Use getGCState instead")]] virtual uint64_t getGCSafePointV2(KeyspaceID keyspace_id) = 0; + + virtual pdpb::GetGCStateResponse getGCState(KeyspaceID keyspace_id) = 0; + + virtual pdpb::GetAllKeyspacesGCStatesResponse getAllKeyspacesGCStates() = 0; virtual KeyspaceID getKeyspaceID(const std::string & keyspace_name) = 0; @@ -64,5 +66,4 @@ class IClient using ClientPtr = std::shared_ptr; -} // namespace pd -} // namespace pingcap +} // namespace pingcap::pd diff --git a/include/pingcap/pd/MockPDClient.h b/include/pingcap/pd/MockPDClient.h index 7ed89916..d03c2cd9 100644 --- a/include/pingcap/pd/MockPDClient.h +++ b/include/pingcap/pd/MockPDClient.h @@ -1,27 +1,55 @@ #pragma once +#include #include #include #include -#include - -namespace pingcap -{ -namespace pd +namespace pingcap::pd { using Clock = std::chrono::system_clock; class MockPDClient : public IClient { +public: + static constexpr uint64_t MOCKED_GC_SAFE_POINT = 10000000; + public: MockPDClient() = default; ~MockPDClient() override = default; - uint64_t getGCSafePoint() override { return 10000000; } + uint64_t getGCSafePoint() override { return MOCKED_GC_SAFE_POINT; } + + uint64_t getGCSafePointV2(KeyspaceID) override { return MOCKED_GC_SAFE_POINT; } - uint64_t getGCSafePointV2(KeyspaceID) override { return 10000000; } + pdpb::GetGCStateResponse getGCState(KeyspaceID keyspace_id) override + { + pdpb::GetGCStateResponse gc_state; + auto * hdr = gc_state.mutable_header(); + hdr->set_cluster_id(1); + hdr->mutable_error()->set_type(pdpb::ErrorType::OK); + auto * state = gc_state.mutable_gc_state(); + state->mutable_keyspace_scope()->set_keyspace_id(keyspace_id); + state->set_is_keyspace_level_gc(true); + state->set_txn_safe_point(MOCKED_GC_SAFE_POINT); + state->set_gc_safe_point(MOCKED_GC_SAFE_POINT); + return gc_state; + } + + pdpb::GetAllKeyspacesGCStatesResponse getAllKeyspacesGCStates() override + { + pdpb::GetAllKeyspacesGCStatesResponse all_states; + auto * hdr = all_states.mutable_header(); + hdr->set_cluster_id(1); + hdr->mutable_error()->set_type(pdpb::ErrorType::OK); + auto * state = all_states.add_gc_states(); + state->mutable_keyspace_scope()->set_keyspace_id(1); + state->set_is_keyspace_level_gc(true); + state->set_txn_safe_point(MOCKED_GC_SAFE_POINT); + state->set_gc_safe_point(MOCKED_GC_SAFE_POINT); + return all_states; + } uint64_t getTS() override { return Clock::now().time_since_epoch().count(); } @@ -73,5 +101,4 @@ class MockPDClient : public IClient } }; -} // namespace pd -} // namespace pingcap +} // namespace pingcap::pd diff --git a/src/pd/Client.cc b/src/pd/Client.cc index 6632048f..ef8ad707 100644 --- a/src/pd/Client.cc +++ b/src/pd/Client.cc @@ -8,9 +8,7 @@ #include #include -namespace pingcap -{ -namespace pd +namespace pingcap::pd { inline std::vector addrsToUrls(const std::vector & addrs, const ClusterConfig & config) { @@ -367,6 +365,51 @@ uint64_t Client::getGCSafePointV2(KeyspaceID keyspace_id) return response.safe_point(); } +pdpb::GetGCStateResponse Client::getGCState(KeyspaceID keyspace_id) +{ + pdpb::GetGCStateRequest request{}; + request.set_allocated_header(requestHeader()); + request.mutable_keyspace_scope()->set_keyspace_id(keyspace_id); + + auto leader_client = leaderClient(); + grpc::ClientContext context; + context.set_deadline(std::chrono::system_clock::now() + pd_timeout); + + pdpb::GetGCStateResponse response{}; + auto status = leader_client->stub->GetGCState(&context, request, &response); + if (!status.ok()) + { + std::string err_msg = "GetGCState failed, keyspace_id=" + std::to_string(keyspace_id) + ": " + std::to_string(status.error_code()) + ": " + status.error_message(); + log->warning(err_msg); + check_leader.store(true); + throw Exception(err_msg, status.error_code()); + } + + return response; +} + +pdpb::GetAllKeyspacesGCStatesResponse Client::getAllKeyspacesGCStates() +{ + pdpb::GetAllKeyspacesGCStatesRequest request{}; + request.set_allocated_header(requestHeader()); + + auto leader_client = leaderClient(); + grpc::ClientContext context; + context.set_deadline(std::chrono::system_clock::now() + pd_timeout); + + pdpb::GetAllKeyspacesGCStatesResponse response{}; + auto status = leader_client->stub->GetAllKeyspacesGCStates(&context, request, &response); + if (!status.ok()) + { + std::string err_msg = "GetAllKeyspacesGCStates failed: " + std::to_string(status.error_code()) + ": " + status.error_message(); + log->warning(err_msg); + check_leader.store(true); + throw Exception(err_msg, status.error_code()); + } + + return response; +} + pdpb::GetRegionResponse Client::getRegionByKey(const std::string & key) { pdpb::GetRegionRequest request{}; @@ -556,7 +599,7 @@ bool Client::isClusterBootstrapped() if (!status.ok()) \ { \ std::string err_msg = ("resource manager grpc call failed: " #GRPC_METHOD ". " + std::to_string(status.error_code()) + ": " + status.error_message()); \ - log->warning(err_msg); \ + log->warning(err_msg); \ check_leader.store(true); \ throw Exception(err_msg, GRPCErrorCode); \ } \ @@ -594,5 +637,4 @@ resource_manager::TokenBucketsResponse Client::acquireTokenBuckets(const resourc return resp; } -} // namespace pd -} // namespace pingcap +} // namespace pingcap::pd From 80a14ead676f687ca32082140066c9f4caca8117 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 19 Dec 2025 12:02:05 +0800 Subject: [PATCH 03/20] OWNERS: Auto Sync OWNERS files from community membership (#222) Signed-off-by: Ti Chi Robot Signed-off-by: xzhangxian1008 --- OWNERS | 88 +++++--------------------------------------------- OWNERS_ALIASES | 84 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 80 deletions(-) create mode 100644 OWNERS_ALIASES diff --git a/OWNERS b/OWNERS index af3aca17..be8ab04d 100644 --- a/OWNERS +++ b/OWNERS @@ -1,80 +1,8 @@ -# See the OWNERS docs at https://go.k8s.io/owners -approvers: - - 5kbpers - - AndreMouche - - andylokandy - - breezewish - - brson - - bufferflies - - BusyJay - - cfzjywxk - - Connor1996 - - coocood - - crazycs520 - - disksing - - ekexium - - gengliqi - - glorv - - hicqu - - hunterlxt - - imtbkcat - - innerr - - iosmanthus - - jackysp - - kennytm - - Little-Wallace - - liuzix - - lonng - - LykxSassinator - - lysu - - marsishandsome - - MyonKeminta - - niedhui - - NingLin-P - - nrc - - overvenus - - pingyu - - skyzh - - SpadeA-Tang - - sticnarf - - sunxiaoguang - - tabokie - - TennyZhuang - - tonyxuqqi - - v01dstar - - yiwu-arbug - - you06 - - youjiali1995 - - YuJuncen - - zhangjinpeng87 - - zhongzc - - zhouqiang-cl - - zyguan -reviewers: - - 3AceShowHand - - 3pointer - - CalvinNeo - - ethercflow - - fredchenbj - - Fullstop000 - - gozssky - - haojinming - - hbisheng - - hhwyt - - HuSharp - - jayzhan211 - - Jibbow - - JmPotato - - Leavrth - - lhy1024 - - longfangsong - - lzmhhh123 - - Mossaka - - MrCroxx - - nolouch - - rleungx - - Rustin170506 - - tier-cap - - wjhuang2016 - - wshwsh12 - - Xuanwo +# See the OWNERS docs at https://www.kubernetes.dev/docs/guide/owners/#owners +# The members of 'sig-community-*' are synced from memberships defined in repository: https://github.com/tikv/community. +filters: + .*: + approvers: + - sig-community-approvers + reviewers: + - sig-community-reviewers diff --git a/OWNERS_ALIASES b/OWNERS_ALIASES new file mode 100644 index 00000000..538aa528 --- /dev/null +++ b/OWNERS_ALIASES @@ -0,0 +1,84 @@ +# See the OWNERS docs at https://www.kubernetes.dev/docs/guide/owners/#owners_aliases +# The members of 'sig-community-*' are synced from memberships defined in repository: https://github.com/tikv/community. +aliases: + sig-community-reviewers: + - 3AceShowHand + - 3pointer + - CalvinNeo + - Fullstop000 + - HuSharp + - Jibbow + - JmPotato + - Leavrth + - Mossaka + - MrCroxx + - Rustin170506 + - Xuanwo + - ethercflow + - fredchenbj + - gozssky + - haojinming + - hbisheng + - hhwyt + - jayzhan211 + - lcwangchao + - lhy1024 + - longfangsong + - lzmhhh123 + - mittalrishabh + - nolouch + - rleungx + - tier-cap + - wjhuang2016 + - wshwsh12 + sig-community-approvers: + - 5kbpers + - AndreMouche + - BusyJay + - Connor1996 + - Little-Wallace + - LykxSassinator + - MyonKeminta + - NingLin-P + - SpadeA-Tang + - TennyZhuang + - YuJuncen + - andylokandy + - breezewish + - brson + - bufferflies + - cfzjywxk + - coocood + - crazycs520 + - disksing + - ekexium + - gengliqi + - glorv + - hicqu + - hunterlxt + - imtbkcat + - innerr + - iosmanthus + - jackysp + - kennytm + - liuzix + - lonng + - lysu + - marsishandsome + - niedhui + - nrc + - overvenus + - pingyu + - skyzh + - sticnarf + - sunxiaoguang + - tabokie + - tonyxuqqi + - v01dstar + - yiwu-arbug + - you06 + - youjiali1995 + - zhangjinpeng87 + - zhongzc + - zhouqiang-cl + - zyguan From a3ecf4dfc665d0770df96be205edbd3c0e600b3d Mon Sep 17 00:00:00 2001 From: JaySon Date: Mon, 22 Dec 2025 10:08:22 +0800 Subject: [PATCH 04/20] *: Add more context about the region/address when rpc fails (#221) Signed-off-by: JaySon-Huang Signed-off-by: xzhangxian1008 --- include/pingcap/common/MPPProber.h | 4 +- include/pingcap/kv/RegionCache.h | 52 ++++++------------ include/pingcap/kv/RegionClient.h | 21 ++++---- include/pingcap/kv/Rpc.h | 10 ++-- src/common/MPPProber.cc | 8 ++- src/coprocessor/Client.cc | 12 ++--- src/kv/RegionCache.cc | 84 +++++++++++++++++++----------- src/kv/RegionClient.cc | 6 +-- src/kv/Snapshot.cc | 2 +- src/pd/Client.cc | 14 ++--- 10 files changed, 111 insertions(+), 102 deletions(-) diff --git a/include/pingcap/common/MPPProber.h b/include/pingcap/common/MPPProber.h index 6e7694ab..d8617022 100644 --- a/include/pingcap/common/MPPProber.h +++ b/include/pingcap/common/MPPProber.h @@ -21,7 +21,7 @@ namespace common using TimePoint = std::chrono::time_point; static constexpr TimePoint INVALID_TIME_POINT = std::chrono::steady_clock::time_point::max(); static constexpr auto MAX_RECOVERY_TIME_LIMIT = std::chrono::minutes(15); -static constexpr auto MAX_OBSOLET_TIME_LIMIT = std::chrono::hours(1); +static constexpr auto MAX_OBSOLETE_TIME_LIMIT = std::chrono::hours(1); static constexpr auto SCAN_INTERVAL = std::chrono::seconds(1); // scan per 1s. static constexpr auto DETECT_PERIOD = std::chrono::seconds(3); // do real alive rpc per 3s. static constexpr size_t DETECT_RPC_TIMEOUT = 2; @@ -66,7 +66,7 @@ struct ProbeState // If the probe fails again, the store will be re-added to failed_stores; otherwise, it can be used directly. // // If a store was previously attempted to be used (last_lookup_time exists) but couldn't actually be used, -// and this state persists beyond MAX_OBSOLETE_TIME, it will be removed from failed_stores to avoid being continuously probed in the background. +// and this state persists beyond MAX_OBSOLETE_TIME_LIMIT, it will be removed from failed_stores to avoid being continuously probed in the background. class MPPProber { public: diff --git a/include/pingcap/kv/RegionCache.h b/include/pingcap/kv/RegionCache.h index f1dc4d88..eeda4904 100644 --- a/include/pingcap/kv/RegionCache.h +++ b/include/pingcap/kv/RegionCache.h @@ -29,9 +29,7 @@ struct Store StoreType store_type; ::metapb::StoreState state; - Store(uint64_t id_, const std::string & addr_, const std::string & peer_addr_, - const std::map & labels_, StoreType store_type_, - const ::metapb::StoreState state_) + Store(uint64_t id_, const std::string & addr_, const std::string & peer_addr_, const std::map & labels_, StoreType store_type_, const ::metapb::StoreState state_) : id(id_) , addr(addr_) , peer_addr(peer_addr_) @@ -188,12 +186,12 @@ class RegionCache {} RPCContextPtr getRPCContext(Backoffer & bo, - const RegionVerID & id, - StoreType store_type, - bool load_balance, - const LabelFilter & tiflash_label_filter, - const std::unordered_set * store_id_blocklist = nullptr, - uint64_t prefer_store_id = 0); + const RegionVerID & id, + StoreType store_type, + bool load_balance, + const LabelFilter & tiflash_label_filter, + const std::unordered_set * store_id_blocklist = nullptr, + uint64_t prefer_store_id = 0); bool updateLeader(const RegionVerID & region_id, const metapb::Peer & leader); @@ -217,11 +215,12 @@ class RegionCache // Return values: // 1. all stores of this region. // 2. stores of non pending peers of this region. - std::pair, std::vector> getAllValidTiFlashStores(Backoffer & bo, - const RegionVerID & region_id, - const Store & current_store, - const LabelFilter & label_filter, - const std::unordered_set * store_id_blocklist = nullptr); + std::pair, std::vector> getAllValidTiFlashStores( + Backoffer & bo, + const RegionVerID & region_id, + const Store & current_store, + const LabelFilter & label_filter, + const std::unordered_set * store_id_blocklist = nullptr); std::pair>, RegionVerID> groupKeysByRegion(Backoffer & bo, @@ -229,29 +228,7 @@ class RegionCache std::map getAllTiFlashStores(const LabelFilter & label_filter, bool exclude_tombstone); - void updateCachePeriodically() - { - while (!stopped.load()) - { - // TODO: Also update region cache periodically. - try - { - forceReloadAllStores(); - } - catch (...) - { - log->warning(getCurrentExceptionMsg("failed to reload all stores periodically: ")); - } - - { - std::unique_lock lock(update_cache_mu); - // Update store cache every 2 mins. - update_cache_cv.wait_for(lock, std::chrono::minutes(2), [this]() { - return stopped.load(); - }); - } - } - } + void updateCachePeriodically(); void stop() { @@ -259,6 +236,7 @@ class RegionCache std::lock_guard lock(update_cache_mu); update_cache_cv.notify_all(); } + private: RegionPtr loadRegionByKey(Backoffer & bo, const std::string & key); diff --git a/include/pingcap/kv/RegionClient.h b/include/pingcap/kv/RegionClient.h index e985b72a..4da18096 100644 --- a/include/pingcap/kv/RegionClient.h +++ b/include/pingcap/kv/RegionClient.h @@ -73,23 +73,24 @@ struct RegionClient auto status = rpc.call(&context, req, resp); if (!status.ok()) { + auto extra_msg = "region_id: " + region_id.toString() + ", addr: " + ctx->addr; if (status.error_code() == ::grpc::StatusCode::UNIMPLEMENTED) { // The rpc is not implemented on this service. - throw Exception("rpc is not implemented: " + rpc.errMsg(status), GRPCNotImplemented); + throw Exception("rpc is not implemented: " + rpc.errMsg(status, extra_msg), GRPCNotImplemented); } - std::string err_msg = rpc.errMsg(status); + std::string err_msg = rpc.errMsg(status, extra_msg); log->warning(err_msg); onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx); continue; } if (resp->has_region_error()) { - log->warning("region " + region_id.toString() + " find error: " + resp->region_error().DebugString()); + log->warning("region_id " + region_id.toString() + " find error: " + resp->region_error().DebugString()); onRegionError(bo, ctx, resp->region_error()); continue; } - if (same_zone_flag && source_zone_label != "") { + if (same_zone_flag && !source_zone_label.empty()) { auto iter = ctx->store.labels.find(DCLabelKey); if (iter != ctx->store.labels.end()) { *same_zone_flag = iter->second == source_zone_label; @@ -173,7 +174,7 @@ struct RegionClient { if (stream_reader->first_resp.has_region_error()) { - log->warning("region " + region_id.toString() + " find error: " + stream_reader->first_resp.region_error().message()); + log->warning("region_id " + region_id.toString() + " find error: " + stream_reader->first_resp.region_error().message()); onRegionError(bo, ctx, stream_reader->first_resp.region_error()); continue; } @@ -182,7 +183,7 @@ struct RegionClient auto status = stream_reader->reader->Finish(); if (status.ok()) { - if (same_zone_flag && source_zone_label != "") { + if (same_zone_flag && !source_zone_label.empty()) { auto iter = ctx->store.labels.find(DCLabelKey); if (iter != ctx->store.labels.end()) { *same_zone_flag = iter->second == source_zone_label; @@ -192,12 +193,14 @@ struct RegionClient stream_reader->no_resp = true; return stream_reader; } - else if (status.error_code() == ::grpc::StatusCode::UNIMPLEMENTED) + auto extra_msg = "region_id: " + region_id.toString() + ", addr: " + ctx->addr; + if (status.error_code() == ::grpc::StatusCode::UNIMPLEMENTED) { + // The rpc is not implemented on this service. - throw Exception("rpc is not implemented: " + rpc.errMsg(status), GRPCNotImplemented); + throw Exception("rpc is not implemented: " + rpc.errMsg(status, extra_msg), GRPCNotImplemented); } - std::string err_msg = rpc.errMsg(status); + std::string err_msg = rpc.errMsg(status, extra_msg); log->warning(err_msg); onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx); } diff --git a/include/pingcap/kv/Rpc.h b/include/pingcap/kv/Rpc.h index 3410edde..b3a32cb3 100644 --- a/include/pingcap/kv/Rpc.h +++ b/include/pingcap/kv/Rpc.h @@ -6,7 +6,6 @@ #include #include -#include #include namespace pingcap @@ -97,9 +96,14 @@ class RpcCall return T::call(conn_client, std::forward(args)...); } - std::string errMsg(const ::grpc::Status & status) + std::string errMsg(const ::grpc::Status & status, const std::string & extra_msg) { - return std::string(T::errMsg()) + std::to_string(status.error_code()) + ": " + status.error_message(); + auto msg = std::string(T::errMsg()) + " " + std::to_string(status.error_code()) + ": " + status.error_message(); + if (!extra_msg.empty()) + { + msg += " " + extra_msg; + } + return msg; } private: diff --git a/src/common/MPPProber.cc b/src/common/MPPProber.cc index 503d9b0f..8823d54f 100644 --- a/src/common/MPPProber.cc +++ b/src/common/MPPProber.cc @@ -3,8 +3,6 @@ #include #include -#include - namespace pingcap { namespace common @@ -101,8 +99,8 @@ void MPPProber::scan() } else { - // Store is dead, we want to check if this store has not used for MAX_OBSOLET_TIME. - if (ele.second->last_lookup_timepoint != INVALID_TIME_POINT && getElapsed(ele.second->last_lookup_timepoint) > std::chrono::duration_cast(MAX_OBSOLET_TIME_LIMIT)) + // Store is dead, we want to check if this store has not used for MAX_OBSOLETE_TIME. + if (ele.second->last_lookup_timepoint != INVALID_TIME_POINT && getElapsed(ele.second->last_lookup_timepoint) > std::chrono::duration_cast(MAX_OBSOLETE_TIME_LIMIT)) recovery_stores.push_back(ele.first); } ele.second->state_lock.unlock(); @@ -146,7 +144,7 @@ bool detectStore(kv::RpcClientPtr & rpc_client, const std::string & store_addr, auto status = rpc.call(&context, req, &resp); if (!status.ok()) { - log->warning("detect failed: " + store_addr + " error: ", rpc.errMsg(status)); + log->warning("detect failed: " + store_addr + " error: " + rpc.errMsg(status, "")); return false; } diff --git a/src/coprocessor/Client.cc b/src/coprocessor/Client.cc index cd2ea11a..92419005 100644 --- a/src/coprocessor/Client.cc +++ b/src/coprocessor/Client.cc @@ -166,7 +166,7 @@ std::vector balanceBatchCopTasks( if (task.region_infos[0].all_stores.empty()) { // Only true when all stores are blocked, in which case will not call balanceBatchCopTasks though. - throw Exception("no region in batch cop task, region_id=" + task.region_infos[0].region_id.toString(), ErrorCodes::CoprocessorError); + throw Exception("no region in batch cop task, region_ver_id=" + task.region_infos[0].region_id.toString(), ErrorCodes::CoprocessorError); } auto task_store_id = task.region_infos[0].all_stores[0]; BatchCopTask new_batch_task; @@ -537,7 +537,7 @@ std::vector buildBatchCopTasks( if (rpc_context == nullptr) { need_retry = true; - log->information("retry for TiFlash peer with region missing, region=" + cop_task.region_id.toString()); + log->information("retry for TiFlash peer with region missing, region_ver_id=" + cop_task.region_id.toString()); // Probably all the regions are invalid. Make the loop continue and mark all the regions invalid. // Then `splitRegion` will reloads these regions. continue; @@ -717,7 +717,7 @@ std::vector ResponseIter::handleTaskImpl(kv::Backoffer & bo, const CopT auto handle_locked_resp = [&](const ::kvrpcpb::LockInfo & locked) -> std::vector { kv::LockPtr lock = std::make_shared(locked); - log->debug("region " + task.region_id.toString() + " encounter lock problem: " + locked.DebugString()); + log->debug("region_id " + task.region_id.toString() + " encounter lock problem: " + locked.DebugString()); std::vector pushed; std::vector locks{lock}; auto before_expired = cluster->lock_resolver->resolveLocks(bo, task.req->start_ts, locks, pushed); @@ -727,10 +727,10 @@ std::vector ResponseIter::handleTaskImpl(kv::Backoffer & bo, const CopT } if (before_expired > 0) { - log->information("encounter lock and sleep for a while, region_id=" + task.region_id.toString() + // + log->information("encounter lock and sleep for a while, region_ver_id=" + task.region_id.toString() + // " req_start_ts=" + std::to_string(task.req->start_ts) + " lock_version=" + std::to_string(lock->txn_id) + // " sleep time is " + std::to_string(before_expired) + "ms."); - bo.backoffWithMaxSleep(kv::boTxnLockFast, before_expired, Exception("encounter lock, region_id=" + task.region_id.toString() + " " + locked.DebugString(), ErrorCodes::LockError)); + bo.backoffWithMaxSleep(kv::boTxnLockFast, before_expired, Exception("encounter lock, region_ver_id=" + task.region_id.toString() + " " + locked.DebugString(), ErrorCodes::LockError)); } return buildCopTasks(bo, cluster, task.ranges, task.req, task.store_type, task.keyspace_id, task.connection_id, task.connection_alias, log, task.meta_data, task.before_send); }; @@ -861,7 +861,7 @@ void ResponseIter::handleTask(const CopTask & task) } catch (const pingcap::Exception & e) { - log->warning("coprocessor meets error, error_message=" + e.displayText() + " error_code=" + std::to_string(e.code()) + " region_id=" + current_task.region_id.toString()); + log->warning("coprocessor meets error, error_message=" + e.displayText() + " error_code=" + std::to_string(e.code()) + " region_ver_id=" + current_task.region_id.toString()); queue->push(Result(e)); meet_error = true; break; diff --git a/src/kv/RegionCache.cc b/src/kv/RegionCache.cc index e955caa8..c5575f46 100644 --- a/src/kv/RegionCache.cc +++ b/src/kv/RegionCache.cc @@ -7,15 +7,16 @@ namespace pingcap { namespace kv { -// load_balance is an option, becase if store fail, it may cause batchCop fail. +// load_balance is an option, because if store fail, it may cause batchCop fail. // For now, label_filter only works for tiflash. -RPCContextPtr RegionCache::getRPCContext(Backoffer & bo, - const RegionVerID & id, - const StoreType store_type, - bool load_balance, - const LabelFilter & tiflash_label_filter, - const std::unordered_set * store_id_blocklist, - uint64_t prefer_store_id) +RPCContextPtr RegionCache::getRPCContext( // + Backoffer & bo, + const RegionVerID & id, + const StoreType store_type, + bool load_balance, + const LabelFilter & tiflash_label_filter, + const std::unordered_set * store_id_blocklist, + uint64_t prefer_store_id) { for (;;) { @@ -76,13 +77,13 @@ RPCContextPtr RegionCache::getRPCContext(Backoffer & bo, { dropStore(peer.store_id()); bo.backoff(boRegionMiss, - Exception("miss store, region id is: " + std::to_string(id.id) + " store id is: " + std::to_string(peer.store_id()), + Exception("miss store, region_id is: " + std::to_string(id.id) + " store_id is: " + std::to_string(peer.store_id()), StoreNotReady)); continue; } if (store_id_blocklist && store_id_blocklist->count(store.id) > 0) { - log->warning("blocklist remove peer for region: " + id.toString() + std::string(", store: ") + std::to_string(store.id)); + log->warning("blocklist remove peer for region_id: " + id.toString() + std::string(", store_id= ") + std::to_string(store.id)); continue; } if (store_type == StoreType::TiFlash) @@ -93,7 +94,7 @@ RPCContextPtr RegionCache::getRPCContext(Backoffer & bo, return std::make_shared(id, meta, peer, store, store.addr); } dropRegion(id); - bo.backoff(boRegionMiss, Exception("region miss, region id is: " + std::to_string(id.id), RegionUnavailable)); + bo.backoff(boRegionMiss, Exception("region miss, region_id is: " + std::to_string(id.id), RegionUnavailable)); } } @@ -179,7 +180,7 @@ RegionPtr RegionCache::loadRegionByID(Backoffer & bo, uint64_t region_id) { region->switchPeer(leader.id()); } - log->debug("load region id: " + std::to_string(region->meta.id()) + " leader peer id: " + std::to_string(leader.id()) + " leader store id: " + std::to_string(leader.store_id())); + log->debug("load region_id: " + std::to_string(region->meta.id()) + " leader peer id: " + std::to_string(leader.id()) + " leader store_id: " + std::to_string(leader.store_id())); return region; } catch (const Exception & e) @@ -228,7 +229,7 @@ metapb::Store RegionCache::loadStore(Backoffer & bo, uint64_t id) { // TODO:: The store may be not ready, it's better to check store's state. const auto & store = pd_client->getStore(id); - log->information("load store id " + std::to_string(id) + " address " + store.address()); + log->information("load store_id " + std::to_string(id) + " address " + store.address()); return store; } catch (Exception & e) @@ -277,16 +278,17 @@ void RegionCache::forceReloadAllStores() reloadStoreWithoutLock(store_pb); } -std::pair, std::vector> RegionCache::getAllValidTiFlashStores(Backoffer & bo, - const RegionVerID & region_id, - const Store & current_store, - const LabelFilter & label_filter, - const std::unordered_set * store_id_blocklist) +std::pair, std::vector> RegionCache::getAllValidTiFlashStores( // + Backoffer & bo, + const RegionVerID & region_id, + const Store & current_store, + const LabelFilter & label_filter, + const std::unordered_set * store_id_blocklist) { auto remove_blocklist = [](const std::unordered_set * store_id_blocklist, - std::vector & stores, - const RegionVerID & region_id, - Logger * log) { + std::vector & stores, + const RegionVerID & region_id, + Logger * log) { if (store_id_blocklist != nullptr) { auto origin_size = stores.size(); @@ -296,7 +298,7 @@ std::pair, std::vector> RegionCache::getAllValid stores.end()); if (log != nullptr && origin_size != stores.size()) { - auto s = "blocklist peer removed, region=" + region_id.toString() + ", origin_store_size=" + std::to_string(origin_size) + ", current_store_size=" + std::to_string(stores.size()); + auto s = "blocklist peer removed, region_ver_id=" + region_id.toString() + ", origin_store_size=" + std::to_string(origin_size) + ", current_store_size=" + std::to_string(stores.size()); log->information(s); } } @@ -367,7 +369,7 @@ void RegionCache::insertRegionToCache(RegionPtr region) void RegionCache::dropRegion(const RegionVerID & region_id) { std::unique_lock lock(region_mutex); - log->information("try drop region " + region_id.toString()); + log->information("try drop region, region_ver_id=" + region_id.toString()); auto iter_by_id = regions.find(region_id); if (iter_by_id != regions.end()) { @@ -379,7 +381,7 @@ void RegionCache::dropRegion(const RegionVerID & region_id) /// record the work flash index when drop region region_last_work_flash_index[region_id.id] = iter_by_id->second->work_tiflash_peer_idx.load(); regions.erase(iter_by_id); - log->information("drop region " + std::to_string(region_id.id) + " because of send failure"); + log->information("drop region because of send failure, region_ver_id=" + std::to_string(region_id.id)); } } @@ -388,11 +390,11 @@ void RegionCache::dropStore(uint64_t failed_store_id) std::lock_guard lock(store_mutex); if (stores.erase(failed_store_id)) { - log->information("drop store " + std::to_string(failed_store_id) + " because of send failure"); + log->information("drop store because of send failure, store_id=" + std::to_string(failed_store_id)); } } -void RegionCache::onSendReqFail(RPCContextPtr & ctx, const Exception & exc) +void RegionCache::onSendReqFail(RPCContextPtr & ctx, const Exception & /*exc*/) { const auto & failed_region_id = ctx->region; uint64_t failed_store_id = ctx->peer.store_id(); @@ -420,8 +422,8 @@ bool RegionCache::updateLeader(const RegionVerID & region_id, const metapb::Peer if (!it->second->switchPeer(leader.id())) { lock.unlock(); - log->warning("failed to update leader, region " + region_id.toString() + ", new leader {" + std::to_string(leader.id()) - + "," + std::to_string(leader.store_id()) + "}"); + log->warning("failed to update leader, region_ver_id=" + region_id.toString() + ", new leader peer_id=" + std::to_string(leader.id()) + + ", store_id=" + std::to_string(leader.store_id())); dropRegion(region_id); return false; } @@ -430,7 +432,7 @@ bool RegionCache::updateLeader(const RegionVerID & region_id, const metapb::Peer void RegionCache::onRegionStale(Backoffer & /*bo*/, RPCContextPtr ctx, const errorpb::EpochNotMatch & stale_epoch) { - log->information("region stale for region " + ctx->region.toString() + "."); + log->information("region stale for region_ver_id=" + ctx->region.toString()); dropRegion(ctx->region); @@ -524,5 +526,29 @@ bool labelFilterInvalid(const std::map &) { throw Exception("invalid label_filter", ErrorCodes::LogicalError); } + +void RegionCache::updateCachePeriodically() +{ + while (!stopped.load()) + { + // TODO: Also update region cache periodically. + try + { + forceReloadAllStores(); + } + catch (...) + { + log->warning(getCurrentExceptionMsg("failed to reload all stores periodically: ")); + } + + { + std::unique_lock lock(update_cache_mu); + // Update store cache every 2 mins. + update_cache_cv.wait_for(lock, std::chrono::minutes(2), [this]() { + return stopped.load(); + }); + } + } +} } // namespace kv } // namespace pingcap diff --git a/src/kv/RegionClient.cc b/src/kv/RegionClient.cc index b6cb6ab3..6c369f92 100644 --- a/src/kv/RegionClient.cc +++ b/src/kv/RegionClient.cc @@ -12,8 +12,8 @@ void RegionClient::onRegionError(Backoffer & bo, RPCContextPtr rpc_ctx, const er if (not_leader.has_leader()) { // don't backoff if a new leader is returned. - log->information("not leader but has leader, region " + rpc_ctx->region.toString() + ", new leader {" + std::to_string(not_leader.leader().id()) - + "," + std::to_string(not_leader.leader().store_id()) + "}"); + log->information("not leader but has leader, region_ver_id=" + rpc_ctx->region.toString() + ", new leader peer_id=" + std::to_string(not_leader.leader().id()) + + ", store_id=" + std::to_string(not_leader.leader().store_id())); if (!cluster->region_cache->updateLeader(rpc_ctx->region, not_leader.leader())) { bo.backoff(boRegionScheduling, Exception("not leader, ctx: " + rpc_ctx->toString(), NotLeader)); @@ -25,7 +25,7 @@ void RegionClient::onRegionError(Backoffer & bo, RPCContextPtr rpc_ctx, const er // the Raft group is in an election, but it's possible that the peer is // isolated and removed from the Raft group. So it's necessary to reload // the region from PD. - log->information("not leader but doesn't have new leader, region: " + rpc_ctx->region.toString()); + log->information("not leader but doesn't have new leader, region_ver_id=" + rpc_ctx->region.toString()); cluster->region_cache->dropRegion(rpc_ctx->region); bo.backoff(boRegionScheduling, Exception("not leader, ctx: " + rpc_ctx->toString(), NotLeader)); } diff --git a/src/kv/Snapshot.cc b/src/kv/Snapshot.cc index 85067f2f..f9e67c5e 100644 --- a/src/kv/Snapshot.cc +++ b/src/kv/Snapshot.cc @@ -48,7 +48,7 @@ kvrpcpb::MvccInfo Snapshot::mvccGet(Backoffer & bo, const std::string & key) if (!response.error().empty()) { Logger * log(&Logger::get("Snapshot::mvccGet")); - log->warning("reponse error is " + response.error()); + log->warning("response error is " + response.error()); continue; } return response.info(); diff --git a/src/pd/Client.cc b/src/pd/Client.cc index ef8ad707..86179c7a 100644 --- a/src/pd/Client.cc +++ b/src/pd/Client.cc @@ -139,21 +139,21 @@ void Client::initClusterID() auto resp = getMembers(url); if (!resp.has_header()) { - log->warning("failed to get cluster id by :" + url + " retrying"); + log->warning("failed to get cluster_id by :" + url + " retrying"); continue; } if (resp.header().has_error()) { - log->warning("failed to init cluster id: " + resp.header().error().message()); + log->warning("failed to init cluster_id: " + resp.header().error().message()); continue; } cluster_id = resp.header().cluster_id(); - log->information("init cluster id done: " + std::to_string(cluster_id)); + log->information("init cluster_id done: " + std::to_string(cluster_id)); return; } std::this_thread::sleep_for(std::chrono::seconds(1)); } - throw Exception("failed to init cluster id", InitClusterIDFailed); + throw Exception("failed to init cluster_id", InitClusterIDFailed); } void Client::initLeader() @@ -189,7 +189,7 @@ void Client::updateLeader() auto resp = getMembers(url); if (!resp.has_header() || resp.leader().client_urls_size() == 0) { - log->warning("failed to get cluster id by :" + url); + log->warning("failed to get members by :" + url); failed_urls.insert(url); continue; } @@ -529,7 +529,7 @@ KeyspaceID Client::getKeyspaceID(const std::string & keyspace_name) auto status = leader_client->keyspace_stub->LoadKeyspace(&context, request, &response); if (!status.ok()) { - std::string err_msg = ("get keyspace id failed: " + std::to_string(status.error_code()) + ": " + status.error_message()); + std::string err_msg = ("get keyspace_id failed: " + std::to_string(status.error_code()) + ": " + status.error_message()); log->warning(err_msg); check_leader.store(true); throw Exception(err_msg, GRPCErrorCode); @@ -537,7 +537,7 @@ KeyspaceID Client::getKeyspaceID(const std::string & keyspace_name) if (response.header().has_error()) { - std::string err_msg = ("get keyspace id failed: " + response.header().error().message()); + std::string err_msg = ("get keyspace_id failed: " + response.header().error().message()); log->warning(err_msg); throw Exception(err_msg, InternalError); } From 6badc3bb0d4c6b37e70cd8fb4966ac5de58c943e Mon Sep 17 00:00:00 2001 From: Liqi Geng Date: Mon, 2 Feb 2026 17:58:48 +0800 Subject: [PATCH 05/20] Add TTL mechanism for region cache (#225) close tikv/client-c#171, close tikv/client-c#223 Signed-off-by: gengliqi Signed-off-by: xzhangxian1008 --- include/pingcap/kv/RegionCache.h | 39 ++++++++++++-- include/pingcap/kv/Rpc.h | 1 + include/pingcap/pd/Client.h | 2 + include/pingcap/pd/IClient.h | 2 + include/pingcap/pd/MockPDClient.h | 2 + src/kv/RegionCache.cc | 87 +++++++++++++++++++++++++++++-- src/test/CMakeLists.txt | 1 + src/test/region_cache_test.cc | 67 ++++++++++++++++++++++++ 8 files changed, 194 insertions(+), 7 deletions(-) create mode 100644 src/test/region_cache_test.cc diff --git a/include/pingcap/kv/RegionCache.h b/include/pingcap/kv/RegionCache.h index eeda4904..10d33e57 100644 --- a/include/pingcap/kv/RegionCache.h +++ b/include/pingcap/kv/RegionCache.h @@ -7,6 +7,8 @@ #include #include +#include +#include #include #include @@ -87,19 +89,26 @@ struct Region metapb::Peer leader_peer; std::vector pending_peers; std::atomic_uint work_tiflash_peer_idx; + std::atomic_int64_t ttl; Region(const metapb::Region & meta_, const metapb::Peer & peer_) : meta(meta_) , leader_peer(peer_) , work_tiflash_peer_idx(0) - {} + , ttl(0) + { + initTTL(); + } Region(const metapb::Region & meta_, const metapb::Peer & peer_, const std::vector & pending_peers_) : meta(meta_) , leader_peer(peer_) , pending_peers(pending_peers_) , work_tiflash_peer_idx(0) - {} + , ttl(0) + { + initTTL(); + } const std::string & startKey() const { return meta.start_key(); } @@ -128,6 +137,24 @@ struct Region } return false; } + + inline void initTTL() + { + int64_t now = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + ttl = nextTTL(now); + } + + // nextTTL returns a random TTL in range [ts+base, ts+base+jitter). The input ts should be an epoch timestamp in seconds. + static int64_t nextTTL(int64_t ts); + + // checkRegionCacheTTL returns false means the region cache is expired. + bool checkRegionCacheTTL(int64_t ts); + + // setRegionCacheTTL configures region cache TTL and jitter (seconds). + static void setRegionCacheTTL(int64_t base_sec, int64_t jitter_sec); + + // setRegionCacheTTLEnabled enables or disables region cache TTL check. + static void setRegionCacheTTLEnabled(bool enable); }; using RegionPtr = std::shared_ptr; @@ -152,14 +179,16 @@ struct KeyLocation struct RPCContext { + uint64_t cluster_id; RegionVerID region; metapb::Region meta; metapb::Peer peer; Store store; std::string addr; - RPCContext(const RegionVerID & region_, const metapb::Region & meta_, const metapb::Peer & peer_, const Store & store_, const std::string & addr_) - : region(region_) + RPCContext(uint64_t cluster_id_, const RegionVerID & region_, const metapb::Region & meta_, const metapb::Peer & peer_, const Store & store_, const std::string & addr_) + : cluster_id(cluster_id_) + , region(region_) , meta(meta_) , peer(peer_) , store(store_) @@ -180,6 +209,7 @@ class RegionCache public: RegionCache(pd::ClientPtr pdClient_, const ClusterConfig & config) : pd_client(pdClient_) + , cluster_id(pd_client ? pd_client->getClusterID() : 0) , tiflash_engine_key(config.tiflash_engine_key) , tiflash_engine_value(config.tiflash_engine_value) , log(&Logger::get("pingcap.tikv")) @@ -266,6 +296,7 @@ class RegionCache std::map stores; pd::ClientPtr pd_client; + const uint64_t cluster_id; std::shared_mutex region_mutex; diff --git a/include/pingcap/kv/Rpc.h b/include/pingcap/kv/Rpc.h index b3a32cb3..7439b2fd 100644 --- a/include/pingcap/kv/Rpc.h +++ b/include/pingcap/kv/Rpc.h @@ -79,6 +79,7 @@ class RpcCall context->set_region_id(rpc_ctx->region.id); context->set_allocated_region_epoch(new metapb::RegionEpoch(rpc_ctx->meta.region_epoch())); context->set_allocated_peer(new metapb::Peer(rpc_ctx->peer)); + context->set_cluster_id(rpc_ctx->cluster_id); } void setClientContext(::grpc::ClientContext & context, int timeout, const GRPCMetaData & meta_data = {}) diff --git a/include/pingcap/pd/Client.h b/include/pingcap/pd/Client.h index 6b946008..6a52ed24 100644 --- a/include/pingcap/pd/Client.h +++ b/include/pingcap/pd/Client.h @@ -39,6 +39,8 @@ class Client : public IClient ~Client() override; + uint64_t getClusterID() override { return cluster_id; } + void update(const std::vector & addrs, const ClusterConfig & config) override; // only implement a weak get ts. diff --git a/include/pingcap/pd/IClient.h b/include/pingcap/pd/IClient.h index f2142b39..cfccdbf4 100644 --- a/include/pingcap/pd/IClient.h +++ b/include/pingcap/pd/IClient.h @@ -21,6 +21,8 @@ class IClient public: virtual ~IClient() = default; + virtual uint64_t getClusterID() = 0; + virtual uint64_t getTS() = 0; virtual pdpb::GetRegionResponse getRegionByKey(const std::string & key) = 0; diff --git a/include/pingcap/pd/MockPDClient.h b/include/pingcap/pd/MockPDClient.h index d03c2cd9..a04c4a6a 100644 --- a/include/pingcap/pd/MockPDClient.h +++ b/include/pingcap/pd/MockPDClient.h @@ -19,6 +19,8 @@ class MockPDClient : public IClient ~MockPDClient() override = default; + uint64_t getClusterID() override { return 1; } + uint64_t getGCSafePoint() override { return MOCKED_GC_SAFE_POINT; } uint64_t getGCSafePointV2(KeyspaceID) override { return MOCKED_GC_SAFE_POINT; } diff --git a/src/kv/RegionCache.cc b/src/kv/RegionCache.cc index c5575f46..c31f209e 100644 --- a/src/kv/RegionCache.cc +++ b/src/kv/RegionCache.cc @@ -3,10 +3,82 @@ #include #include +#include + namespace pingcap { namespace kv { + +std::atomic regionCacheTTLSec{600}; +std::atomic regionCacheTTLJitterSec{60}; +std::atomic enableRegionCacheTTL{false}; + +int64_t Region::nextTTL(int64_t ts) +{ + int64_t base = regionCacheTTLSec.load(std::memory_order_relaxed); + if (base < 0) + base = 0; + int64_t ttl = ts + base; + + static thread_local std::mt19937 generator{std::random_device{}()}; + + int64_t jitter_sec = regionCacheTTLJitterSec.load(std::memory_order_relaxed); + if (jitter_sec <= 0) + return ttl; + + std::uniform_int_distribution distribution(0, jitter_sec - 1); + int64_t jitter = distribution(generator); + + return ttl + jitter; +} + +bool Region::checkRegionCacheTTL(int64_t ts) +{ + if (!enableRegionCacheTTL.load(std::memory_order_relaxed)) + return true; + int64_t base = regionCacheTTLSec.load(std::memory_order_relaxed); + if (base < 0) + base = 0; + int64_t new_ttl = 0; + int64_t current_ttl = ttl.load(std::memory_order_relaxed); + while (true) + { + if (ts > current_ttl) + { + return false; + } + + // skip updating TTL when the TTL is far away from ts (still within jitter time) + if (current_ttl > ts + base) + { + return true; + } + + if (new_ttl == 0) + { + new_ttl = nextTTL(ts); + } + + // now we have ts <= ttl <= ts+regionCacheTTLSec <= newTTL = ts+regionCacheTTLSec+randomJitter + if (ttl.compare_exchange_weak(current_ttl, new_ttl, std::memory_order_relaxed)) + { + return true; + } + } +} + +void Region::setRegionCacheTTL(int64_t base_sec, int64_t jitter_sec) +{ + regionCacheTTLSec.store(base_sec, std::memory_order_relaxed); + regionCacheTTLJitterSec.store(jitter_sec, std::memory_order_relaxed); +} + +void Region::setRegionCacheTTLEnabled(bool enable) +{ + enableRegionCacheTTL.store(enable, std::memory_order_relaxed); +} + // load_balance is an option, because if store fail, it may cause batchCop fail. // For now, label_filter only works for tiflash. RPCContextPtr RegionCache::getRPCContext( // @@ -53,7 +125,7 @@ RPCContextPtr RegionCache::getRPCContext( // break; if (store_id_blocklist && store_id_blocklist->count(store.id) > 0) break; - return std::make_shared(id, meta, peer, store, store.addr); + return std::make_shared(cluster_id, id, meta, peer, store, store.addr); } } if (store_type == StoreType::TiFlash) @@ -91,7 +163,7 @@ RPCContextPtr RegionCache::getRPCContext( // // set the index for next access in order to balance the workload among all tiflash peers region->work_tiflash_peer_idx.store(peer_index); } - return std::make_shared(id, meta, peer, store, store.addr); + return std::make_shared(cluster_id, id, meta, peer, store, store.addr); } dropRegion(id); bo.backoff(boRegionMiss, Exception("region miss, region_id is: " + std::to_string(id.id), RegionUnavailable)); @@ -129,7 +201,16 @@ KeyLocation RegionCache::locateKey(Backoffer & bo, const std::string & key) RegionPtr region = searchCachedRegion(key); if (region != nullptr) { - return KeyLocation(region->verID(), region->startKey(), region->endKey()); + int64_t now = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + if (region->checkRegionCacheTTL(now)) + { + return KeyLocation(region->verID(), region->startKey(), region->endKey()); + } + else + { + log->debug("region cache ttl expired for region_id: " + std::to_string(region->meta.id())); + dropRegion(region->verID()); + } } region = loadRegionByKey(bo, key); diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt index 8fcb3e7b..afad481c 100644 --- a/src/test/CMakeLists.txt +++ b/src/test/CMakeLists.txt @@ -13,6 +13,7 @@ list(APPEND test_libs add_executable(kv_client_ut io_or_region_error_get_test.cc region_split_test.cc + region_cache_test.cc lock_resolve_test.cc coprocessor_test.cc batch_coprocessor_test.cc diff --git a/src/test/region_cache_test.cc b/src/test/region_cache_test.cc new file mode 100644 index 00000000..35e1f31c --- /dev/null +++ b/src/test/region_cache_test.cc @@ -0,0 +1,67 @@ +#include +#include + +#include "test_helper.h" + +namespace pingcap::tests +{ +using namespace pingcap::kv; + +namespace +{ +Region makeRegion() +{ + metapb::Region meta; + meta.set_id(1); + meta.set_start_key("a"); + meta.set_end_key("b"); + auto * epoch = meta.mutable_region_epoch(); + epoch->set_conf_ver(1); + epoch->set_version(1); + + metapb::Peer leader; + leader.set_id(1); + leader.set_store_id(1); + return Region(meta, leader); +} +} // namespace + +class RegionCacheTest : public testing::Test +{ +}; + +TEST_F(RegionCacheTest, CheckRegionCacheTTL) +{ + Region::setRegionCacheTTL(/*base_sec=*/2, /*jitter_sec=*/0); + Region::setRegionCacheTTLEnabled(true); + + int64_t ts = 1000; + + // expired + { + Region region = makeRegion(); + region.ttl.store(ts - 1); + EXPECT_FALSE(region.checkRegionCacheTTL(ts)); + } + + // refresh on access + { + Region region = makeRegion(); + region.ttl.store(ts); + EXPECT_TRUE(region.checkRegionCacheTTL(ts)); + EXPECT_EQ(region.ttl.load(), ts + 2); + } + + // skip refresh when far away + { + Region region = makeRegion(); + region.ttl.store(ts + 10); + EXPECT_TRUE(region.checkRegionCacheTTL(ts)); + EXPECT_EQ(region.ttl.load(), ts + 10); + } + + Region::setRegionCacheTTLEnabled(false); + Region::setRegionCacheTTL(/*base_sec=*/600, /*jitter_sec=*/60); +} + +} // namespace pingcap::tests From 7a5e80828fce6e70a3ec22b2a06e16c3c6db2a10 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 10 Mar 2026 11:58:08 +0800 Subject: [PATCH 06/20] OWNERS: Auto Sync OWNERS files from community membership (#229) Signed-off-by: Ti Chi Robot Signed-off-by: xzhangxian1008 --- OWNERS_ALIASES | 1 + 1 file changed, 1 insertion(+) diff --git a/OWNERS_ALIASES b/OWNERS_ALIASES index 538aa528..725a4b7f 100644 --- a/OWNERS_ALIASES +++ b/OWNERS_ALIASES @@ -21,6 +21,7 @@ aliases: - hbisheng - hhwyt - jayzhan211 + - jiadebin - lcwangchao - lhy1024 - longfangsong From a07dd677698d85cf5f6075b0ebc0d067a7a0ce9e Mon Sep 17 00:00:00 2001 From: yebin <50281525+lybcodes@users.noreply.github.com> Date: Thu, 9 Apr 2026 15:47:57 +0800 Subject: [PATCH 07/20] ci: add GitHub Actions pull-build-test workflow (#231) Signed-off-by: lyb Signed-off-by: xzhangxian1008 --- .github/workflows/pull-build-test.yml | 45 +++++++++++++++ ci/Dockerfile | 80 ++++++++++++++++++--------- cmake/Modules/FindgRPC.cmake | 6 +- 3 files changed, 104 insertions(+), 27 deletions(-) create mode 100644 .github/workflows/pull-build-test.yml diff --git a/.github/workflows/pull-build-test.yml b/.github/workflows/pull-build-test.yml new file mode 100644 index 00000000..db1b75e7 --- /dev/null +++ b/.github/workflows/pull-build-test.yml @@ -0,0 +1,45 @@ +name: pull-build-test + +on: + pull_request: + branches: + - master + - feature/** + - release-fts-* + - "!feature/release-*" + +permissions: + contents: read + +jobs: + build-test: + runs-on: ubuntu-latest + timeout-minutes: 120 + + steps: + - name: Checkout + uses: actions/checkout@v4 + with: + fetch-depth: 0 + submodules: recursive + + - name: Build Builder Image + run: docker build -f ci/Dockerfile -t local/tikv-client-c-builder:latest . + + - name: Build And Test + run: | + docker run --rm \ + --user "$(id -u):$(id -g)" \ + -e HOME=/tmp \ + -v "${GITHUB_WORKSPACE}:/client-c" \ + -w /client-c \ + local/tikv-client-c-builder:latest \ + /bin/bash -c "/client-c/ci/build-test.sh" + + - name: Upload Logs + if: always() + uses: actions/upload-artifact@v4 + with: + name: build-logs + path: build/**/*.log + if-no-files-found: ignore diff --git a/ci/Dockerfile b/ci/Dockerfile index 74a1b1ee..a83008bd 100644 --- a/ci/Dockerfile +++ b/ci/Dockerfile @@ -1,26 +1,54 @@ -FROM ubuntu:23.04 - -RUN apt update -y \ - && apt install -y cmake build-essential \ - wget git \ - protobuf-compiler libprotobuf-dev libgrpc-dev libgrpc++-dev libc-ares-dev protobuf-compiler-grpc libpoco-dev - -RUN rm -rf /var/lib/apt/lists/* - -#back to root dir and download golang -RUN cd / - -ENV GOLANG_VERSION 1.13.3 - -RUN wget -O go.tgz "https://dl.google.com/go/go$GOLANG_VERSION.linux-amd64.tar.gz"; \ -tar -C /usr/local -xzf go.tgz; \ -rm go.tgz; \ -export PATH="/usr/local/go/bin:$PATH"; \ -go version - -ENV GOPATH /go -ENV PATH $GOPATH/bin:/usr/local/go/bin:$PATH - -RUN cd / - -RUN git clone https://github.com/tikv/mock-tikv.git && cd mock-tikv && git checkout 60d5921028afd72e1aeba880b9052c40e932eef3 && make failpoint-enable && make +FROM ubuntu:22.04 + +ARG DEBIAN_FRONTEND=noninteractive +ARG GOLANG_VERSION=1.13.3 +ARG TARGETARCH +ARG GO_SHA256_AMD64=0804bf02020dceaa8a7d7275ee79f7a142f1996bfd0c39216ccb405f93f994c0 +ARG GO_SHA256_ARM64=9fa65ae42665baff53802091b49b83af6f2e397986b6cbea2ae30e2c7ee0f2f2 + +SHELL ["/bin/bash", "-o", "pipefail", "-c"] + +RUN apt-get update \ + && apt-get install -y --no-install-recommends \ + build-essential \ + ca-certificates \ + cmake \ + git \ + protobuf-compiler \ + libprotobuf-dev \ + libgrpc-dev \ + libgrpc++-dev \ + libc-ares-dev \ + protobuf-compiler-grpc \ + libpoco-dev \ + wget \ + && rm -rf /var/lib/apt/lists/* + +RUN case "${TARGETARCH}" in \ + amd64) GO_ARCH="${TARGETARCH}"; GO_SHA256="${GO_SHA256_AMD64}" ;; \ + arm64) GO_ARCH="${TARGETARCH}"; GO_SHA256="${GO_SHA256_ARM64}" ;; \ + "") GO_ARCH="amd64"; GO_SHA256="${GO_SHA256_AMD64}" ;; \ + *) echo "unsupported TARGETARCH: ${TARGETARCH}" >&2; exit 1 ;; \ + esac \ + && wget -q -O /tmp/go.tgz "https://dl.google.com/go/go${GOLANG_VERSION}.linux-${GO_ARCH}.tar.gz" \ + && echo "${GO_SHA256} /tmp/go.tgz" | sha256sum -c - \ + && tar -C /usr/local -xzf /tmp/go.tgz \ + && rm -f /tmp/go.tgz + +ENV GOPATH=/go +ENV PATH=/usr/local/go/bin:${GOPATH}/bin:${PATH} +ENV HOME=/home/ciuser + +RUN groupadd --system ciuser \ + && useradd --system --create-home --gid ciuser ciuser \ + && mkdir -p /go /mock-tikv \ + && chown -R ciuser:ciuser /go /mock-tikv + +USER ciuser + +RUN git clone --depth 1 https://github.com/tikv/mock-tikv.git /mock-tikv \ + && cd /mock-tikv \ + && git fetch --depth 1 origin 60d5921028afd72e1aeba880b9052c40e932eef3 \ + && git checkout 60d5921028afd72e1aeba880b9052c40e932eef3 \ + && make failpoint-enable \ + && make diff --git a/cmake/Modules/FindgRPC.cmake b/cmake/Modules/FindgRPC.cmake index 62605b94..668aa42d 100644 --- a/cmake/Modules/FindgRPC.cmake +++ b/cmake/Modules/FindgRPC.cmake @@ -268,7 +268,11 @@ else() set(gRPC_LIBRARIES ${gRPC_LIBRARIES} ${gRPC_LIBRARY}) endif() endif() -set(gRPC_LIBRARIES ${gRPC_LIBRARIES} ${gRPC_CARES_LIBRARY} ${gRPC_GPR_LIBRARY} ${gRPC_ADDRESS_SORTING_LIBRARY} ${gRPC_UPB_LIBRARY}) +foreach(_dep IN ITEMS gRPC_CARES_LIBRARY gRPC_GPR_LIBRARY gRPC_ADDRESS_SORTING_LIBRARY gRPC_UPB_LIBRARY) + if(${_dep}) + list(APPEND gRPC_LIBRARIES "${${_dep}}") + endif() +endforeach() # Restore the original find library ordering. if(gRPC_USE_STATIC_LIBS) From 9451c37521077837466a7a674d1dabfa5895b9c5 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Mon, 20 Apr 2026 11:42:38 +0800 Subject: [PATCH 08/20] add function to remove useless grpc conn Signed-off-by: xzhangxian1008 --- include/pingcap/kv/RegionClient.h | 6 +++++- include/pingcap/kv/Rpc.h | 20 ++++++++++++++++++++ src/common/MPPProber.cc | 1 + src/kv/Rpc.cc | 6 ++++++ 4 files changed, 32 insertions(+), 1 deletion(-) diff --git a/include/pingcap/kv/RegionClient.h b/include/pingcap/kv/RegionClient.h index 4da18096..62e5037a 100644 --- a/include/pingcap/kv/RegionClient.h +++ b/include/pingcap/kv/RegionClient.h @@ -74,6 +74,7 @@ struct RegionClient if (!status.ok()) { auto extra_msg = "region_id: " + region_id.toString() + ", addr: " + ctx->addr; + rpc.dropConnIfNeeded(status); if (status.error_code() == ::grpc::StatusCode::UNIMPLEMENTED) { // The rpc is not implemented on this service. @@ -125,7 +126,10 @@ struct RegionClient { if (no_resp) return ::grpc::Status::OK; - return reader->Finish(); + auto status = reader->Finish(); + if (client != nullptr && shouldRemoveConnOnStatus(status)) + (*client)->removeConn(addr); + return status; } private: diff --git a/include/pingcap/kv/Rpc.h b/include/pingcap/kv/Rpc.h index 7439b2fd..f8f20243 100644 --- a/include/pingcap/kv/Rpc.h +++ b/include/pingcap/kv/Rpc.h @@ -27,6 +27,18 @@ struct ConnArray std::shared_ptr get(); }; +inline bool shouldRemoveConnOnStatus(const ::grpc::Status & status) +{ + switch (status.error_code()) + { + case ::grpc::StatusCode::UNAVAILABLE: + case ::grpc::StatusCode::CANCELLED: + return true; + default: + return false; + } +} + using ConnArrayPtr = std::shared_ptr; using GRPCMetaData = std::multimap; @@ -54,6 +66,8 @@ struct RpcClient ConnArrayPtr getConnArray(const std::string & addr); ConnArrayPtr createConnArray(const std::string & addr); + + void removeConn(const std::string & addr); }; using RpcClientPtr = std::unique_ptr; @@ -107,6 +121,12 @@ class RpcCall return msg; } + void dropConnIfNeeded(const ::grpc::Status & status) + { + if (shouldRemoveConnOnStatus(status)) + client->removeConn(addr); + } + private: const RpcClientPtr & client; const std::string & addr; diff --git a/src/common/MPPProber.cc b/src/common/MPPProber.cc index 8823d54f..4010573d 100644 --- a/src/common/MPPProber.cc +++ b/src/common/MPPProber.cc @@ -144,6 +144,7 @@ bool detectStore(kv::RpcClientPtr & rpc_client, const std::string & store_addr, auto status = rpc.call(&context, req, &resp); if (!status.ok()) { + rpc.dropConnIfNeeded(status); log->warning("detect failed: " + store_addr + " error: " + rpc.errMsg(status, "")); return false; } diff --git a/src/kv/Rpc.cc b/src/kv/Rpc.cc index dddb4bc9..42f9bed3 100644 --- a/src/kv/Rpc.cc +++ b/src/kv/Rpc.cc @@ -40,5 +40,11 @@ ConnArrayPtr RpcClient::createConnArray(const std::string & addr) return conn_array; } +void RpcClient::removeConn(const std::string & addr) +{ + std::lock_guard lock(mutex); + conns.erase(addr); +} + } // namespace kv } // namespace pingcap From 6ff1ddce70d9695e4db9a8dc5bd399525e722350 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Mon, 20 Apr 2026 12:37:54 +0800 Subject: [PATCH 09/20] tweaking Signed-off-by: xzhangxian1008 --- include/pingcap/kv/RegionClient.h | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/include/pingcap/kv/RegionClient.h b/include/pingcap/kv/RegionClient.h index 62e5037a..83f2b9b3 100644 --- a/include/pingcap/kv/RegionClient.h +++ b/include/pingcap/kv/RegionClient.h @@ -128,12 +128,14 @@ struct RegionClient return ::grpc::Status::OK; auto status = reader->Finish(); if (client != nullptr && shouldRemoveConnOnStatus(status)) - (*client)->removeConn(addr); + client->removeConn(addr); return status; } private: friend struct RegionClient; + RpcClient * client = nullptr; + std::string addr; ::grpc::ClientContext context; std::unique_ptr<::grpc::ClientReader> reader; bool no_resp = false; @@ -169,6 +171,8 @@ struct RegionClient } auto stream_reader = std::make_unique>(); + stream_reader->client = cluster->rpc_client.get(); + stream_reader->addr = ctx->addr; RpcCall rpc(cluster->rpc_client, ctx->addr); rpc.setRequestCtx(req, ctx, cluster->api_version); rpc.setClientContext(stream_reader->context, timeout, meta_data); @@ -198,6 +202,7 @@ struct RegionClient return stream_reader; } auto extra_msg = "region_id: " + region_id.toString() + ", addr: " + ctx->addr; + rpc.dropConnIfNeeded(status); if (status.error_code() == ::grpc::StatusCode::UNIMPLEMENTED) { From 7264109608a481d070f0404b3e0c5931f49e3411 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Mon, 20 Apr 2026 16:20:10 +0800 Subject: [PATCH 10/20] tweaking Signed-off-by: xzhangxian1008 --- include/pingcap/kv/RegionClient.h | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/include/pingcap/kv/RegionClient.h b/include/pingcap/kv/RegionClient.h index 83f2b9b3..e908c8e6 100644 --- a/include/pingcap/kv/RegionClient.h +++ b/include/pingcap/kv/RegionClient.h @@ -74,12 +74,12 @@ struct RegionClient if (!status.ok()) { auto extra_msg = "region_id: " + region_id.toString() + ", addr: " + ctx->addr; - rpc.dropConnIfNeeded(status); if (status.error_code() == ::grpc::StatusCode::UNIMPLEMENTED) { // The rpc is not implemented on this service. throw Exception("rpc is not implemented: " + rpc.errMsg(status, extra_msg), GRPCNotImplemented); } + rpc.dropConnIfNeeded(status); std::string err_msg = rpc.errMsg(status, extra_msg); log->warning(err_msg); onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx); @@ -202,13 +202,12 @@ struct RegionClient return stream_reader; } auto extra_msg = "region_id: " + region_id.toString() + ", addr: " + ctx->addr; - rpc.dropConnIfNeeded(status); if (status.error_code() == ::grpc::StatusCode::UNIMPLEMENTED) { - // The rpc is not implemented on this service. throw Exception("rpc is not implemented: " + rpc.errMsg(status, extra_msg), GRPCNotImplemented); } + rpc.dropConnIfNeeded(status); std::string err_msg = rpc.errMsg(status, extra_msg); log->warning(err_msg); onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx); From 148caa0e6fcc3fa0746c4fa85367f9382a3a6d6c Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Tue, 21 Apr 2026 16:37:10 +0800 Subject: [PATCH 11/20] remove needless call Signed-off-by: xzhangxian1008 --- include/pingcap/kv/RegionClient.h | 17 ++++------------- include/pingcap/kv/Rpc.h | 12 ++++++------ src/common/MPPProber.cc | 1 - src/kv/RegionClient.cc | 3 ++- 4 files changed, 12 insertions(+), 21 deletions(-) diff --git a/include/pingcap/kv/RegionClient.h b/include/pingcap/kv/RegionClient.h index e908c8e6..cdc573d0 100644 --- a/include/pingcap/kv/RegionClient.h +++ b/include/pingcap/kv/RegionClient.h @@ -79,10 +79,9 @@ struct RegionClient // The rpc is not implemented on this service. throw Exception("rpc is not implemented: " + rpc.errMsg(status, extra_msg), GRPCNotImplemented); } - rpc.dropConnIfNeeded(status); std::string err_msg = rpc.errMsg(status, extra_msg); log->warning(err_msg); - onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx); + onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx, status); continue; } if (resp->has_region_error()) @@ -126,16 +125,11 @@ struct RegionClient { if (no_resp) return ::grpc::Status::OK; - auto status = reader->Finish(); - if (client != nullptr && shouldRemoveConnOnStatus(status)) - client->removeConn(addr); - return status; + return reader->Finish(); } private: friend struct RegionClient; - RpcClient * client = nullptr; - std::string addr; ::grpc::ClientContext context; std::unique_ptr<::grpc::ClientReader> reader; bool no_resp = false; @@ -171,8 +165,6 @@ struct RegionClient } auto stream_reader = std::make_unique>(); - stream_reader->client = cluster->rpc_client.get(); - stream_reader->addr = ctx->addr; RpcCall rpc(cluster->rpc_client, ctx->addr); rpc.setRequestCtx(req, ctx, cluster->api_version); rpc.setClientContext(stream_reader->context, timeout, meta_data); @@ -207,10 +199,9 @@ struct RegionClient // The rpc is not implemented on this service. throw Exception("rpc is not implemented: " + rpc.errMsg(status, extra_msg), GRPCNotImplemented); } - rpc.dropConnIfNeeded(status); std::string err_msg = rpc.errMsg(status, extra_msg); log->warning(err_msg); - onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx); + onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx, status); } } @@ -218,7 +209,7 @@ struct RegionClient void onRegionError(Backoffer & bo, RPCContextPtr rpc_ctx, const errorpb::Error & err) const; // Normally, it happens when machine down or network partition between tidb and kv or process crash. - void onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx) const; + void onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx, const ::grpc::Status & status) const; }; } // namespace kv diff --git a/include/pingcap/kv/Rpc.h b/include/pingcap/kv/Rpc.h index f8f20243..02a68101 100644 --- a/include/pingcap/kv/Rpc.h +++ b/include/pingcap/kv/Rpc.h @@ -39,6 +39,12 @@ inline bool shouldRemoveConnOnStatus(const ::grpc::Status & status) } } +inline void dropConnIfNeeded(const RpcClientPtr & client, const std::string & addr, const ::grpc::Status & status) +{ + if (shouldRemoveConnOnStatus(status)) + client->removeConn(addr); +} + using ConnArrayPtr = std::shared_ptr; using GRPCMetaData = std::multimap; @@ -121,12 +127,6 @@ class RpcCall return msg; } - void dropConnIfNeeded(const ::grpc::Status & status) - { - if (shouldRemoveConnOnStatus(status)) - client->removeConn(addr); - } - private: const RpcClientPtr & client; const std::string & addr; diff --git a/src/common/MPPProber.cc b/src/common/MPPProber.cc index 4010573d..8823d54f 100644 --- a/src/common/MPPProber.cc +++ b/src/common/MPPProber.cc @@ -144,7 +144,6 @@ bool detectStore(kv::RpcClientPtr & rpc_client, const std::string & store_addr, auto status = rpc.call(&context, req, &resp); if (!status.ok()) { - rpc.dropConnIfNeeded(status); log->warning("detect failed: " + store_addr + " error: " + rpc.errMsg(status, "")); return false; } diff --git a/src/kv/RegionClient.cc b/src/kv/RegionClient.cc index 6c369f92..faf4599c 100644 --- a/src/kv/RegionClient.cc +++ b/src/kv/RegionClient.cc @@ -106,8 +106,9 @@ void RegionClient::onRegionError(Backoffer & bo, RPCContextPtr rpc_ctx, const er cluster->region_cache->dropRegion(rpc_ctx->region); } -void RegionClient::onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx) const +void RegionClient::onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx, const ::grpc::Status & status) const { + dropConnIfNeeded(cluster->rpc_client, rpc_ctx->addr, status); cluster->region_cache->onSendReqFail(rpc_ctx, e); // Retry on send request failure when it's not canceled. // When a store is not available, the leader of related region should be elected quickly. From ed20a9ca4806b6cbe26d3c8fa50f58f8cb858fce Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Tue, 21 Apr 2026 16:40:06 +0800 Subject: [PATCH 12/20] tweaking Signed-off-by: xzhangxian1008 --- include/pingcap/kv/Rpc.h | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/include/pingcap/kv/Rpc.h b/include/pingcap/kv/Rpc.h index 02a68101..92ecc736 100644 --- a/include/pingcap/kv/Rpc.h +++ b/include/pingcap/kv/Rpc.h @@ -29,14 +29,7 @@ struct ConnArray inline bool shouldRemoveConnOnStatus(const ::grpc::Status & status) { - switch (status.error_code()) - { - case ::grpc::StatusCode::UNAVAILABLE: - case ::grpc::StatusCode::CANCELLED: - return true; - default: - return false; - } + return status.error_code() == grpc::StatusCode::UNAVAILABLE; } inline void dropConnIfNeeded(const RpcClientPtr & client, const std::string & addr, const ::grpc::Status & status) From 93e6e1c20551a2fbcdf3df8bd274018941e94cfd Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Tue, 21 Apr 2026 16:41:17 +0800 Subject: [PATCH 13/20] tweaking Signed-off-by: xzhangxian1008 --- include/pingcap/kv/Rpc.h | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/include/pingcap/kv/Rpc.h b/include/pingcap/kv/Rpc.h index 92ecc736..6c4121dd 100644 --- a/include/pingcap/kv/Rpc.h +++ b/include/pingcap/kv/Rpc.h @@ -27,14 +27,9 @@ struct ConnArray std::shared_ptr get(); }; -inline bool shouldRemoveConnOnStatus(const ::grpc::Status & status) -{ - return status.error_code() == grpc::StatusCode::UNAVAILABLE; -} - inline void dropConnIfNeeded(const RpcClientPtr & client, const std::string & addr, const ::grpc::Status & status) { - if (shouldRemoveConnOnStatus(status)) + if (status.error_code() == grpc::StatusCode::UNAVAILABLE) client->removeConn(addr); } From cd31303f118062862249d6059f763aa003d61819 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Tue, 21 Apr 2026 17:29:27 +0800 Subject: [PATCH 14/20] add background thread; need more refine Signed-off-by: xzhangxian1008 --- include/pingcap/kv/Cluster.h | 5 +- include/pingcap/kv/Rpc.h | 34 ++++++++++--- src/kv/Cluster.cc | 3 ++ src/kv/Rpc.cc | 95 ++++++++++++++++++++++++++++++++++++ 4 files changed, 129 insertions(+), 8 deletions(-) diff --git a/include/pingcap/kv/Cluster.h b/include/pingcap/kv/Cluster.h index ec2eac07..6561f79f 100644 --- a/include/pingcap/kv/Cluster.h +++ b/include/pingcap/kv/Cluster.h @@ -35,7 +35,7 @@ struct Cluster Cluster() : pd_client(std::make_shared()) , rpc_client(std::make_unique()) - , thread_pool(std::make_unique(1)) + , thread_pool(std::make_unique(2)) , mpp_prober(std::make_unique(this)) { startBackgroundTasks(); @@ -48,7 +48,7 @@ struct Cluster , oracle(std::make_unique(pd_client, std::chrono::milliseconds(oracle_update_interval))) , lock_resolver(std::make_unique(this)) , api_version(config.api_version) - , thread_pool(std::make_unique(2)) + , thread_pool(std::make_unique(3)) , mpp_prober(std::make_unique(this)) { startBackgroundTasks(); @@ -64,6 +64,7 @@ struct Cluster // (e.g. background threads) that cluster object holds so as to exit elegantly. ~Cluster() { + rpc_client->stop(); mpp_prober->stop(); if (region_cache) region_cache->stop(); diff --git a/include/pingcap/kv/Rpc.h b/include/pingcap/kv/Rpc.h index 6c4121dd..51a102d2 100644 --- a/include/pingcap/kv/Rpc.h +++ b/include/pingcap/kv/Rpc.h @@ -1,10 +1,14 @@ #pragma once +#include +#include +#include #include #include #include #include +#include #include #include @@ -12,6 +16,9 @@ namespace pingcap { namespace kv { +constexpr auto rpc_conn_check_interval = std::chrono::minutes(5); +constexpr size_t rpc_conn_check_timeout = 2; + struct ConnArray { std::mutex mutex; @@ -27,12 +34,6 @@ struct ConnArray std::shared_ptr get(); }; -inline void dropConnIfNeeded(const RpcClientPtr & client, const std::string & addr, const ::grpc::Status & status) -{ - if (status.error_code() == grpc::StatusCode::UNAVAILABLE) - client->removeConn(addr); -} - using ConnArrayPtr = std::shared_ptr; using GRPCMetaData = std::multimap; @@ -44,6 +45,13 @@ struct RpcClient std::map conns; + Logger * log = &Logger::get("pingcap.RpcClient"); + std::chrono::minutes scan_interval = rpc_conn_check_interval; + size_t detect_rpc_timeout = rpc_conn_check_timeout; + std::atomic stopped = false; + std::mutex scan_mu; + std::condition_variable scan_cv; + RpcClient() = default; explicit RpcClient(const ClusterConfig & config_) @@ -57,15 +65,29 @@ struct RpcClient conns.clear(); } + void run(); + + void stop(); + + void scanConns(); + ConnArrayPtr getConnArray(const std::string & addr); ConnArrayPtr createConnArray(const std::string & addr); void removeConn(const std::string & addr); + + void removeConn(const std::string & addr, const ConnArrayPtr & expected); }; using RpcClientPtr = std::unique_ptr; +inline void dropConnIfNeeded(const RpcClientPtr & client, const std::string & addr, const ::grpc::Status & status) +{ + if (status.error_code() == grpc::StatusCode::UNAVAILABLE) + client->removeConn(addr); +} + // RpcCall holds the request and response, and delegates RPC calls. template class RpcCall diff --git a/src/kv/Cluster.cc b/src/kv/Cluster.cc index 70b6c903..a5a10eb0 100644 --- a/src/kv/Cluster.cc +++ b/src/kv/Cluster.cc @@ -31,6 +31,9 @@ void Cluster::startBackgroundTasks() { thread_pool->start(); + thread_pool->enqueue([this] { + rpc_client->run(); + }); thread_pool->enqueue([this] { mpp_prober->run(); }); diff --git a/src/kv/Rpc.cc b/src/kv/Rpc.cc index 42f9bed3..c616bbe9 100644 --- a/src/kv/Rpc.cc +++ b/src/kv/Rpc.cc @@ -1,9 +1,42 @@ +#include #include namespace pingcap { namespace kv { +namespace +{ +bool isConnValid(const std::shared_ptr & conn_client, size_t rpc_timeout) +{ + auto state = conn_client->channel->GetState(false); + if (state == GRPC_CHANNEL_READY) + return true; + + auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(rpc_timeout); + if (conn_client->channel->WaitForConnected(deadline)) + return true; + + return false; +} + +bool isConnArrayValid(const ConnArrayPtr & conn_array, size_t rpc_timeout) +{ + std::vector> conn_snapshot; + { + std::lock_guard lock(conn_array->mutex); + conn_snapshot = conn_array->vec; + } + + for (const auto & conn_client : conn_snapshot) + { + if (!isConnValid(conn_client, rpc_timeout)) + return false; + } + return true; +} +} // namespace + ConnArray::ConnArray(size_t max_size, const std::string & addr, const ClusterConfig & config_) : address(addr) , index(0) @@ -22,6 +55,60 @@ std::shared_ptr ConnArray::get() return vec[index]; } +void RpcClient::run() +{ + while (!stopped.load()) + { + { + std::unique_lock lock(scan_mu); + scan_cv.wait_for(lock, scan_interval, [this] { + return stopped.load(); + }); + } + + if (stopped.load()) + return; + + try + { + scanConns(); + } + catch (...) + { + log->warning(getCurrentExceptionMsg("RpcClient scan conns failed: ")); + } + } +} + +void RpcClient::stop() +{ + stopped.store(true); + std::lock_guard lock(scan_mu); + scan_cv.notify_all(); +} + +void RpcClient::scanConns() +{ + std::vector> conn_snapshot; + { + std::lock_guard lock(mutex); + conn_snapshot.reserve(conns.size()); + for (const auto & [addr, conn_array] : conns) + conn_snapshot.emplace_back(addr, conn_array); + } + + for (const auto & [addr, conn_array] : conn_snapshot) + { + if (!isConnArrayValid(conn_array, detect_rpc_timeout)) + { + log->information("delete unavailable addr: " + addr); + // RpcClient caches a connection pool per address, so drop the whole pool + // and let subsequent requests recreate fresh underlying channels lazily. + removeConn(addr, conn_array); + } + } +} + ConnArrayPtr RpcClient::getConnArray(const std::string & addr) { std::lock_guard lock(mutex); @@ -46,5 +133,13 @@ void RpcClient::removeConn(const std::string & addr) conns.erase(addr); } +void RpcClient::removeConn(const std::string & addr, const ConnArrayPtr & expected) +{ + std::lock_guard lock(mutex); + auto it = conns.find(addr); + if (it != conns.end() && it->second == expected) + conns.erase(it); +} + } // namespace kv } // namespace pingcap From 88f5a7fcae0b2f7fda9d351599b397f3530d991a Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Tue, 21 Apr 2026 19:18:59 +0800 Subject: [PATCH 15/20] refine Signed-off-by: xzhangxian1008 --- include/pingcap/kv/Rpc.h | 11 +++++++--- src/kv/Rpc.cc | 47 ++++++++++++++++++++++++++++++---------- 2 files changed, 44 insertions(+), 14 deletions(-) diff --git a/include/pingcap/kv/Rpc.h b/include/pingcap/kv/Rpc.h index 51a102d2..0d1b64e4 100644 --- a/include/pingcap/kv/Rpc.h +++ b/include/pingcap/kv/Rpc.h @@ -25,7 +25,7 @@ struct ConnArray std::string address; size_t index = 0; - std::vector> vec; + std::vector> vec; // TODO(xzx) why this is a vector? ConnArray() = default; @@ -49,8 +49,8 @@ struct RpcClient std::chrono::minutes scan_interval = rpc_conn_check_interval; size_t detect_rpc_timeout = rpc_conn_check_timeout; std::atomic stopped = false; - std::mutex scan_mu; std::condition_variable scan_cv; + std::vector invalid_conns; RpcClient() = default; @@ -63,6 +63,7 @@ struct RpcClient std::unique_lock lk(mutex); config = config_; conns.clear(); + invalid_conns.clear(); } void run(); @@ -71,6 +72,10 @@ struct RpcClient void scanConns(); + void markConnInvalid(const std::string & addr); + + void removeInvalidConns(); + ConnArrayPtr getConnArray(const std::string & addr); ConnArrayPtr createConnArray(const std::string & addr); @@ -85,7 +90,7 @@ using RpcClientPtr = std::unique_ptr; inline void dropConnIfNeeded(const RpcClientPtr & client, const std::string & addr, const ::grpc::Status & status) { if (status.error_code() == grpc::StatusCode::UNAVAILABLE) - client->removeConn(addr); + client->markConnInvalid(addr); } // RpcCall holds the request and response, and delegates RPC calls. diff --git a/src/kv/Rpc.cc b/src/kv/Rpc.cc index c616bbe9..73ca0de7 100644 --- a/src/kv/Rpc.cc +++ b/src/kv/Rpc.cc @@ -14,10 +14,7 @@ bool isConnValid(const std::shared_ptr & conn_client, size_t rpc_t return true; auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(rpc_timeout); - if (conn_client->channel->WaitForConnected(deadline)) - return true; - - return false; + return conn_client->channel->WaitForConnected(deadline); } bool isConnArrayValid(const ConnArrayPtr & conn_array, size_t rpc_timeout) @@ -59,19 +56,28 @@ void RpcClient::run() { while (!stopped.load()) { + bool has_invalid_conns = false; { - std::unique_lock lock(scan_mu); + std::unique_lock lock(mutex); scan_cv.wait_for(lock, scan_interval, [this] { - return stopped.load(); + return stopped.load() || !invalid_conns.empty(); }); + has_invalid_conns = !invalid_conns.empty(); } if (stopped.load()) return; + if (has_invalid_conns) + { + removeInvalidConns(); + continue; + } + try { scanConns(); + removeInvalidConns(); } catch (...) { @@ -83,7 +89,6 @@ void RpcClient::run() void RpcClient::stop() { stopped.store(true); - std::lock_guard lock(scan_mu); scan_cv.notify_all(); } @@ -101,14 +106,34 @@ void RpcClient::scanConns() { if (!isConnArrayValid(conn_array, detect_rpc_timeout)) { - log->information("delete unavailable addr: " + addr); - // RpcClient caches a connection pool per address, so drop the whole pool - // and let subsequent requests recreate fresh underlying channels lazily. - removeConn(addr, conn_array); + std::lock_guard lock(mutex); + invalid_conns.push_back(addr); } } } +void RpcClient::markConnInvalid(const std::string & addr) +{ + std::lock_guard lock(mutex); + invalid_conns.push_back(addr); + scan_cv.notify_all(); +} + +void RpcClient::removeInvalidConns() +{ + std::lock_guard lock(mutex); + if (invalid_conns.empty()) + return; + + for (const auto & addr : invalid_conns) + { + log->information("delete unavailable addr: " + addr); + conns.erase(addr); + } + + invalid_conns.clear(); +} + ConnArrayPtr RpcClient::getConnArray(const std::string & addr) { std::lock_guard lock(mutex); From 1530578e47a933aa252338d3bef1e4d59b1b819f Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Wed, 22 Apr 2026 10:05:23 +0800 Subject: [PATCH 16/20] refine Signed-off-by: xzhangxian1008 --- include/pingcap/kv/Rpc.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/pingcap/kv/Rpc.h b/include/pingcap/kv/Rpc.h index 0d1b64e4..994dfb5c 100644 --- a/include/pingcap/kv/Rpc.h +++ b/include/pingcap/kv/Rpc.h @@ -25,7 +25,7 @@ struct ConnArray std::string address; size_t index = 0; - std::vector> vec; // TODO(xzx) why this is a vector? + std::vector> vec; ConnArray() = default; From 696975707e1b31c7059d3bd2f1e96814e2d5226c Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Wed, 22 Apr 2026 11:00:15 +0800 Subject: [PATCH 17/20] remove useless codes Signed-off-by: xzhangxian1008 --- include/pingcap/kv/Rpc.h | 4 ---- src/kv/Rpc.cc | 15 --------------- 2 files changed, 19 deletions(-) diff --git a/include/pingcap/kv/Rpc.h b/include/pingcap/kv/Rpc.h index 994dfb5c..b0f98078 100644 --- a/include/pingcap/kv/Rpc.h +++ b/include/pingcap/kv/Rpc.h @@ -79,10 +79,6 @@ struct RpcClient ConnArrayPtr getConnArray(const std::string & addr); ConnArrayPtr createConnArray(const std::string & addr); - - void removeConn(const std::string & addr); - - void removeConn(const std::string & addr, const ConnArrayPtr & expected); }; using RpcClientPtr = std::unique_ptr; diff --git a/src/kv/Rpc.cc b/src/kv/Rpc.cc index 73ca0de7..f419995c 100644 --- a/src/kv/Rpc.cc +++ b/src/kv/Rpc.cc @@ -151,20 +151,5 @@ ConnArrayPtr RpcClient::createConnArray(const std::string & addr) conns[addr] = conn_array; return conn_array; } - -void RpcClient::removeConn(const std::string & addr) -{ - std::lock_guard lock(mutex); - conns.erase(addr); -} - -void RpcClient::removeConn(const std::string & addr, const ConnArrayPtr & expected) -{ - std::lock_guard lock(mutex); - auto it = conns.find(addr); - if (it != conns.end() && it->second == expected) - conns.erase(it); -} - } // namespace kv } // namespace pingcap From f27b1e5f0c70a3a34eab1ec6eba21192617e1633 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Fri, 24 Apr 2026 17:41:17 +0800 Subject: [PATCH 18/20] delete invalid conns by pd info Signed-off-by: xzhangxian1008 --- include/pingcap/kv/Cluster.h | 4 +- include/pingcap/kv/Rpc.h | 18 ++++++--- src/kv/Rpc.cc | 74 +++++++++++++++++++++--------------- 3 files changed, 58 insertions(+), 38 deletions(-) diff --git a/include/pingcap/kv/Cluster.h b/include/pingcap/kv/Cluster.h index 6561f79f..69053423 100644 --- a/include/pingcap/kv/Cluster.h +++ b/include/pingcap/kv/Cluster.h @@ -34,7 +34,7 @@ struct Cluster Cluster() : pd_client(std::make_shared()) - , rpc_client(std::make_unique()) + , rpc_client(std::make_unique(pd_client, ClusterConfig{})) , thread_pool(std::make_unique(2)) , mpp_prober(std::make_unique(this)) { @@ -44,7 +44,7 @@ struct Cluster Cluster(const std::vector & pd_addrs, const ClusterConfig & config) : pd_client(std::make_shared(pd_addrs, config)) , region_cache(std::make_unique(pd_client, config)) - , rpc_client(std::make_unique(config)) + , rpc_client(std::make_unique(pd_client, config)) , oracle(std::make_unique(pd_client, std::chrono::milliseconds(oracle_update_interval))) , lock_resolver(std::make_unique(this)) , api_version(config.api_version) diff --git a/include/pingcap/kv/Rpc.h b/include/pingcap/kv/Rpc.h index b0f98078..a6a2453a 100644 --- a/include/pingcap/kv/Rpc.h +++ b/include/pingcap/kv/Rpc.h @@ -1,23 +1,24 @@ #pragma once -#include -#include -#include #include #include #include #include +#include +#include +#include #include #include +#include #include namespace pingcap { namespace kv { -constexpr auto rpc_conn_check_interval = std::chrono::minutes(5); -constexpr size_t rpc_conn_check_timeout = 2; +constexpr auto rpc_conn_check_interval = std::chrono::minutes(10); +constexpr auto rpc_conn_check_interval_jitter = std::chrono::minutes(5); struct ConnArray { @@ -40,6 +41,7 @@ using GRPCMetaData = std::multimap; struct RpcClient { ClusterConfig config; + pd::ClientPtr pd_client; std::mutex mutex; @@ -47,7 +49,6 @@ struct RpcClient Logger * log = &Logger::get("pingcap.RpcClient"); std::chrono::minutes scan_interval = rpc_conn_check_interval; - size_t detect_rpc_timeout = rpc_conn_check_timeout; std::atomic stopped = false; std::condition_variable scan_cv; std::vector invalid_conns; @@ -58,6 +59,11 @@ struct RpcClient : config(config_) {} + RpcClient(pd::ClientPtr pd_client_, const ClusterConfig & config_) + : config(config_) + , pd_client(std::move(pd_client_)) + {} + void update(const ClusterConfig & config_) { std::unique_lock lk(mutex); diff --git a/src/kv/Rpc.cc b/src/kv/Rpc.cc index f419995c..147b9d86 100644 --- a/src/kv/Rpc.cc +++ b/src/kv/Rpc.cc @@ -1,36 +1,37 @@ #include #include +#include +#include + namespace pingcap { namespace kv { namespace { -bool isConnValid(const std::shared_ptr & conn_client, size_t rpc_timeout) +std::unordered_set getStoreAddresses(const pd::ClientPtr & pd_client) { - auto state = conn_client->channel->GetState(false); - if (state == GRPC_CHANNEL_READY) - return true; - - auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(rpc_timeout); - return conn_client->channel->WaitForConnected(deadline); + std::unordered_set store_addrs; + const auto stores = pd_client->getAllStores(true); + store_addrs.reserve(stores.size()); + for (const auto & store : stores) + { + if (!store.address().empty()) + store_addrs.emplace(store.address()); + } + return store_addrs; } -bool isConnArrayValid(const ConnArrayPtr & conn_array, size_t rpc_timeout) +std::chrono::seconds getRandomScanInterval(std::chrono::minutes scan_interval) { - std::vector> conn_snapshot; - { - std::lock_guard lock(conn_array->mutex); - conn_snapshot = conn_array->vec; - } + const auto min_seconds = std::chrono::duration_cast(scan_interval); + const auto max_seconds = std::chrono::duration_cast( + scan_interval + rpc_conn_check_interval_jitter); - for (const auto & conn_client : conn_snapshot) - { - if (!isConnValid(conn_client, rpc_timeout)) - return false; - } - return true; + thread_local std::mt19937_64 generator(std::random_device{}()); + std::uniform_int_distribution distribution(min_seconds.count(), max_seconds.count()); + return std::chrono::seconds(distribution(generator)); } } // namespace @@ -59,7 +60,8 @@ void RpcClient::run() bool has_invalid_conns = false; { std::unique_lock lock(mutex); - scan_cv.wait_for(lock, scan_interval, [this] { + const auto wait_interval = getRandomScanInterval(scan_interval); + scan_cv.wait_for(lock, wait_interval, [this] { return stopped.load() || !invalid_conns.empty(); }); has_invalid_conns = !invalid_conns.empty(); @@ -94,21 +96,33 @@ void RpcClient::stop() void RpcClient::scanConns() { - std::vector> conn_snapshot; + std::vector conn_snapshot; { std::lock_guard lock(mutex); conn_snapshot.reserve(conns.size()); - for (const auto & [addr, conn_array] : conns) - conn_snapshot.emplace_back(addr, conn_array); + for (const auto & conn : conns) + conn_snapshot.emplace_back(conn.first); + } + + if (conn_snapshot.empty() || !pd_client || pd_client->isMock()) + return; + + const auto store_addrs = getStoreAddresses(pd_client); + std::vector conns_to_remove; + for (const auto & addr : conn_snapshot) + { + if (store_addrs.find(addr) == store_addrs.end()) + conns_to_remove.emplace_back(addr); } - for (const auto & [addr, conn_array] : conn_snapshot) + if (conns_to_remove.empty()) + return; + + std::lock_guard lock(mutex); + for (const auto & addr : conns_to_remove) { - if (!isConnArrayValid(conn_array, detect_rpc_timeout)) - { - std::lock_guard lock(mutex); + if (conns.find(addr) != conns.end()) invalid_conns.push_back(addr); - } } } @@ -127,8 +141,8 @@ void RpcClient::removeInvalidConns() for (const auto & addr : invalid_conns) { - log->information("delete unavailable addr: " + addr); - conns.erase(addr); + if (conns.erase(addr)) + log->information("delete invalid addr: " + addr); } invalid_conns.clear(); From 798b04a77344c259a64dd7f2c7612c36157305af Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Wed, 29 Apr 2026 10:54:21 +0800 Subject: [PATCH 19/20] tweaking Signed-off-by: xzhangxian1008 --- include/pingcap/kv/Rpc.h | 4 ++-- src/kv/Rpc.cc | 18 +++++------------- 2 files changed, 7 insertions(+), 15 deletions(-) diff --git a/include/pingcap/kv/Rpc.h b/include/pingcap/kv/Rpc.h index a6a2453a..5e0fabd5 100644 --- a/include/pingcap/kv/Rpc.h +++ b/include/pingcap/kv/Rpc.h @@ -78,7 +78,7 @@ struct RpcClient void scanConns(); - void markConnInvalid(const std::string & addr); + void removeConn(const std::string & addr); void removeInvalidConns(); @@ -92,7 +92,7 @@ using RpcClientPtr = std::unique_ptr; inline void dropConnIfNeeded(const RpcClientPtr & client, const std::string & addr, const ::grpc::Status & status) { if (status.error_code() == grpc::StatusCode::UNAVAILABLE) - client->markConnInvalid(addr); + client->removeConn(addr); } // RpcCall holds the request and response, and delegates RPC calls. diff --git a/src/kv/Rpc.cc b/src/kv/Rpc.cc index 147b9d86..6a409bd1 100644 --- a/src/kv/Rpc.cc +++ b/src/kv/Rpc.cc @@ -57,25 +57,17 @@ void RpcClient::run() { while (!stopped.load()) { - bool has_invalid_conns = false; { - std::unique_lock lock(mutex); const auto wait_interval = getRandomScanInterval(scan_interval); + std::unique_lock lock(mutex); scan_cv.wait_for(lock, wait_interval, [this] { - return stopped.load() || !invalid_conns.empty(); + return stopped.load(); }); - has_invalid_conns = !invalid_conns.empty(); } if (stopped.load()) return; - if (has_invalid_conns) - { - removeInvalidConns(); - continue; - } - try { scanConns(); @@ -126,11 +118,11 @@ void RpcClient::scanConns() } } -void RpcClient::markConnInvalid(const std::string & addr) +void RpcClient::removeConn(const std::string & addr) { std::lock_guard lock(mutex); - invalid_conns.push_back(addr); - scan_cv.notify_all(); + if (conns.erase(addr)) + log->information("delete invalid addr: " + addr); } void RpcClient::removeInvalidConns() From 637d6196a773992832d167410df7a8e06940ed2f Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Wed, 6 May 2026 14:28:09 +0800 Subject: [PATCH 20/20] address comment Signed-off-by: xzhangxian1008 --- src/kv/Rpc.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/kv/Rpc.cc b/src/kv/Rpc.cc index 6a409bd1..94c0e3a1 100644 --- a/src/kv/Rpc.cc +++ b/src/kv/Rpc.cc @@ -113,8 +113,7 @@ void RpcClient::scanConns() std::lock_guard lock(mutex); for (const auto & addr : conns_to_remove) { - if (conns.find(addr) != conns.end()) - invalid_conns.push_back(addr); + invalid_conns.push_back(addr); } }