Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions include/pingcap/kv/Cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ struct Cluster

Cluster()
: pd_client(std::make_shared<pd::MockPDClient>())
, rpc_client(std::make_unique<RpcClient>())
, thread_pool(std::make_unique<pingcap::common::FixedThreadPool>(1))
, rpc_client(std::make_unique<RpcClient>(pd_client, ClusterConfig{}))
, thread_pool(std::make_unique<pingcap::common::FixedThreadPool>(2))
, mpp_prober(std::make_unique<common::MPPProber>(this))
{
startBackgroundTasks();
Expand All @@ -44,11 +44,11 @@ struct Cluster
Cluster(const std::vector<std::string> & pd_addrs, const ClusterConfig & config)
: pd_client(std::make_shared<pd::CodecClient>(pd_addrs, config))
, region_cache(std::make_unique<RegionCache>(pd_client, config))
, rpc_client(std::make_unique<RpcClient>(config))
, rpc_client(std::make_unique<RpcClient>(pd_client, config))
, oracle(std::make_unique<pd::Oracle>(pd_client, std::chrono::milliseconds(oracle_update_interval)))
, lock_resolver(std::make_unique<LockResolver>(this))
, api_version(config.api_version)
, thread_pool(std::make_unique<pingcap::common::FixedThreadPool>(2))
, thread_pool(std::make_unique<pingcap::common::FixedThreadPool>(3))
, mpp_prober(std::make_unique<common::MPPProber>(this))
{
startBackgroundTasks();
Expand All @@ -64,6 +64,7 @@ struct Cluster
// (e.g. background threads) that cluster object holds so as to exit elegantly.
~Cluster()
{
rpc_client->stop();
mpp_prober->stop();
if (region_cache)
region_cache->stop();
Expand Down
6 changes: 3 additions & 3 deletions include/pingcap/kv/RegionClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ struct RegionClient
}
std::string err_msg = rpc.errMsg(status, extra_msg);
log->warning(err_msg);
onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx);
onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx, status);
continue;
}
if (resp->has_region_error())
Expand Down Expand Up @@ -202,15 +202,15 @@ struct RegionClient
}
std::string err_msg = rpc.errMsg(status, extra_msg);
log->warning(err_msg);
onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx);
onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx, status);
}
}

protected:
void onRegionError(Backoffer & bo, RPCContextPtr rpc_ctx, const errorpb::Error & err) const;

// Normally, it happens when machine down or network partition between tidb and kv or process crash.
void onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx) const;
void onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx, const ::grpc::Status & status) const;
};

} // namespace kv
Expand Down
37 changes: 37 additions & 0 deletions include/pingcap/kv/Rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,21 @@
#include <pingcap/kv/RegionCache.h>
#include <pingcap/kv/internal/type_traits.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <map>
#include <mutex>
#include <utility>
#include <vector>

namespace pingcap
{
namespace kv
{
constexpr auto rpc_conn_check_interval = std::chrono::minutes(10);
constexpr auto rpc_conn_check_interval_jitter = std::chrono::minutes(5);

struct ConnArray
{
std::mutex mutex;
Expand All @@ -33,31 +41,60 @@ using GRPCMetaData = std::multimap<std::string, std::string>;
// Maintains one gRPC connection pool (ConnArray) per store address, plus a
// background scan (run()) that drops pools for stores PD no longer reports.
struct RpcClient
{
    ClusterConfig config;
    // PD client used by the background scan to list live stores; may be
    // null or a mock (default-constructed client), in which case the scan
    // is a no-op (see scanConns()).
    pd::ClientPtr pd_client;

    // Guards `config`, `conns` and `invalid_conns`; also pairs with
    // `scan_cv` for the background scan's interruptible sleep.
    std::mutex mutex;

    // Connection pools keyed by store address.
    std::map<std::string, ConnArrayPtr> conns;

    Logger * log = &Logger::get("pingcap.RpcClient");
    // Base period between background connection scans (per-cycle jitter is
    // added on top, see rpc_conn_check_interval_jitter).
    std::chrono::minutes scan_interval = rpc_conn_check_interval;
    // Set by stop(); polled by the scan loop in run().
    std::atomic<bool> stopped = false;
    // Signalled by stop() to wake the scan loop early.
    std::condition_variable scan_cv;
    // Addresses queued by scanConns() for removal in removeInvalidConns().
    std::vector<std::string> invalid_conns;

    RpcClient() = default;

    explicit RpcClient(const ClusterConfig & config_)
        : config(config_)
    {}

    RpcClient(pd::ClientPtr pd_client_, const ClusterConfig & config_)
        : config(config_)
        , pd_client(std::move(pd_client_))
    {}

    // Replace the cluster config and drop every cached pool so future
    // connections are created with the new settings.
    void update(const ClusterConfig & config_)
    {
        std::unique_lock lk(mutex);
        config = config_;
        conns.clear();
        invalid_conns.clear();
    }

    // Background scan loop; blocks until stop() is called.
    void run();

    // Requests run() to exit; called from ~Cluster().
    void stop();

    // Queues pools whose address is absent from PD's store list.
    void scanConns();

    // Erases the pool for `addr`, if present.
    void removeConn(const std::string & addr);

    // Erases every pool queued in `invalid_conns`.
    void removeInvalidConns();

    ConnArrayPtr getConnArray(const std::string & addr);

    ConnArrayPtr createConnArray(const std::string & addr);
};

using RpcClientPtr = std::unique_ptr<RpcClient>;

// Drop the connection pool for `addr` when the RPC status indicates the
// endpoint itself is unreachable (gRPC UNAVAILABLE); other failures leave
// the pool intact.
inline void dropConnIfNeeded(const RpcClientPtr & client, const std::string & addr, const ::grpc::Status & status)
{
    if (status.error_code() != grpc::StatusCode::UNAVAILABLE)
        return;
    client->removeConn(addr);
}
Comment on lines +92 to +96
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Align dropConnIfNeeded() with the shared removal predicate.

shouldRemoveConnOnStatus() treats both UNAVAILABLE and CANCELLED as removable, but dropConnIfNeeded() only handles UNAVAILABLE. Since onSendFail() uses this helper, unary failures and stream setup failures with CANCELLED won’t invalidate the connection, while StreamReader::finish() will.

Proposed fix
 inline void dropConnIfNeeded(const RpcClientPtr & client, const std::string & addr, const ::grpc::Status & status)
 {
-    if (status.error_code() == grpc::StatusCode::UNAVAILABLE)
+    if (shouldRemoveConnOnStatus(status))
         client->markConnInvalid(addr);
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
inline void dropConnIfNeeded(const RpcClientPtr & client, const std::string & addr, const ::grpc::Status & status)
{
if (status.error_code() == grpc::StatusCode::UNAVAILABLE)
client->markConnInvalid(addr);
}
inline void dropConnIfNeeded(const RpcClientPtr & client, const std::string & addr, const ::grpc::Status & status)
{
if (shouldRemoveConnOnStatus(status))
client->markConnInvalid(addr);
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@include/pingcap/kv/Rpc.h` around lines 102 - 106, The helper dropConnIfNeeded
currently only treats grpc::StatusCode::UNAVAILABLE as removable while
shouldRemoveConnOnStatus also considers CANCELLED; update dropConnIfNeeded to
mark the connection invalid for both UNAVAILABLE and CANCELLED (i.e., check for
status.error_code() == UNAVAILABLE || status.error_code() == CANCELLED) so that
callers like onSendFail and stream setup failures behave consistently with
StreamReader::finish and shouldRemoveConnOnStatus, still calling
client->markConnInvalid(addr) when either code is seen.


// RpcCall holds the request and response, and delegates RPC calls.
template <typename T>
class RpcCall
Expand Down
3 changes: 3 additions & 0 deletions src/kv/Cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ void Cluster::startBackgroundTasks()
{
thread_pool->start();

thread_pool->enqueue([this] {
rpc_client->run();
});
thread_pool->enqueue([this] {
mpp_prober->run();
});
Expand Down
3 changes: 2 additions & 1 deletion src/kv/RegionClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,9 @@ void RegionClient::onRegionError(Backoffer & bo, RPCContextPtr rpc_ctx, const er
cluster->region_cache->dropRegion(rpc_ctx->region);
}

void RegionClient::onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx) const
void RegionClient::onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx, const ::grpc::Status & status) const
{
dropConnIfNeeded(cluster->rpc_client, rpc_ctx->addr, status);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to call it after cluster->region_cache->onSendReqFail(rpc_ctx, e);

cluster->region_cache->onSendReqFail(rpc_ctx, e);
// Retry on send request failure when it's not canceled.
// When a store is not available, the leader of related region should be elected quickly.
Expand Down
119 changes: 118 additions & 1 deletion src/kv/Rpc.cc
Original file line number Diff line number Diff line change
@@ -1,9 +1,40 @@
#include <pingcap/Exception.h>
#include <pingcap/kv/Rpc.h>

#include <random>
#include <unordered_set>

namespace pingcap
{
namespace kv
{
namespace
{
// Collect the non-empty addresses of every store currently known to PD.
std::unordered_set<std::string> getStoreAddresses(const pd::ClientPtr & pd_client)
{
    const auto all_stores = pd_client->getAllStores(true);
    std::unordered_set<std::string> addresses;
    addresses.reserve(all_stores.size());
    for (const auto & store : all_stores)
    {
        const auto & addr = store.address();
        if (!addr.empty())
            addresses.emplace(addr);
    }
    return addresses;
}

// Pick a uniformly random whole-second duration in
// [scan_interval, scan_interval + rpc_conn_check_interval_jitter].
std::chrono::seconds getRandomScanInterval(std::chrono::minutes scan_interval)
{
    using Rep = std::chrono::seconds::rep;
    const Rep lo = std::chrono::duration_cast<std::chrono::seconds>(scan_interval).count();
    const Rep hi = std::chrono::duration_cast<std::chrono::seconds>(
                       scan_interval + rpc_conn_check_interval_jitter)
                       .count();

    // One PRNG per thread: cheap, and avoids sharing generator state.
    thread_local std::mt19937_64 rng(std::random_device{}());
    std::uniform_int_distribution<Rep> dist(lo, hi);
    return std::chrono::seconds(dist(rng));
}
} // namespace

ConnArray::ConnArray(size_t max_size, const std::string & addr, const ClusterConfig & config_)
: address(addr)
, index(0)
Expand All @@ -22,6 +53,93 @@ std::shared_ptr<KvConnClient> ConnArray::get()
return vec[index];
}

// Background loop: periodically validates cached store connections until
// stop() is called. Enqueued on the cluster's thread pool (see
// Cluster::startBackgroundTasks).
void RpcClient::run()
{
    while (!stopped.load())
    {
        {
            // Randomized interval (base + jitter) so multiple clients do not
            // query PD's store list at the same moment.
            const auto wait_interval = getRandomScanInterval(scan_interval);
            std::unique_lock lock(mutex);
            // Sleeps up to wait_interval; wakes early once stop() sets
            // `stopped` and signals scan_cv.
            scan_cv.wait_for(lock, wait_interval, [this] {
                return stopped.load();
            });
        }

        if (stopped.load())
            return;

        try
        {
            // Queue pools for stores PD no longer reports, then erase them.
            scanConns();
            removeInvalidConns();
        }
        catch (...)
        {
            // Best-effort maintenance: a failed scan (e.g. PD unreachable)
            // must not kill the background thread.
            log->warning(getCurrentExceptionMsg("RpcClient scan conns failed: "));
        }
    }
}

void RpcClient::stop()
{
stopped.store(true);
scan_cv.notify_all();
}

// Compare the cached connection addresses against the store list reported
// by PD and queue any address PD no longer knows into `invalid_conns`;
// the actual erase happens in removeInvalidConns().
void RpcClient::scanConns()
{
    // Snapshot the addresses under the lock so the PD RPC below is issued
    // without holding the mutex.
    std::vector<std::string> conn_snapshot;
    {
        std::lock_guard<std::mutex> lock(mutex);
        conn_snapshot.reserve(conns.size());
        for (const auto & conn : conns)
            conn_snapshot.emplace_back(conn.first);
    }

    // Nothing to validate, or no real PD to ask (mock clusters).
    if (conn_snapshot.empty() || !pd_client || pd_client->isMock())
        return;

    const auto store_addrs = getStoreAddresses(pd_client);
    std::vector<std::string> conns_to_remove;
    for (const auto & addr : conn_snapshot)
    {
        if (store_addrs.find(addr) == store_addrs.end())
            conns_to_remove.emplace_back(addr);
    }

    if (conns_to_remove.empty())
        return;

    // Re-check membership under the lock: the pool set may have changed
    // while the PD call was in flight.
    std::lock_guard<std::mutex> lock(mutex);
    for (const auto & addr : conns_to_remove)
    {
        if (conns.find(addr) != conns.end())
            invalid_conns.push_back(addr);
    }
}

// Erase the connection pool for `addr`, if one exists, logging the removal.
void RpcClient::removeConn(const std::string & addr)
{
    std::lock_guard<std::mutex> guard(mutex);
    const auto erased = conns.erase(addr);
    if (erased != 0)
        log->information("delete invalid addr: " + addr);
}

void RpcClient::removeInvalidConns()
{
std::lock_guard<std::mutex> lock(mutex);
if (invalid_conns.empty())
return;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add lock after invalid_conns.empty() check?


for (const auto & addr : invalid_conns)
{
if (conns.erase(addr))
log->information("delete invalid addr: " + addr);
}

invalid_conns.clear();
Comment on lines +89 to +140
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don’t erase newly recreated connection pools from stale scan results.

scanConns() validates a snapshot, then queues only addr. If another thread recreates conns[addr] before removeInvalidConns(), the later address-only erase can delete the fresh healthy pool. Use the captured ConnArrayPtr as the expected value when removing scan-detected failures.

One possible fix using the existing conditional remover
 void RpcClient::scanConns()
 {
     std::vector<std::pair<std::string, ConnArrayPtr>> conn_snapshot;
@@
     for (const auto & [addr, conn_array] : conn_snapshot)
     {
         if (!isConnArrayValid(conn_array, detect_rpc_timeout))
         {
-            std::lock_guard<std::mutex> lock(mutex);
-            invalid_conns.push_back(addr);
+            removeConn(addr, conn_array);
         }
     }
 }

If the scan path still needs batched logging/removal, store (addr, expected_conn_array) in invalid_conns and compare the pointer before erasing.

🧰 Tools
🪛 Clang (14.0.6)

[error] 95-95: method 'scanConns' can be made static

(readability-convert-member-functions-to-static,-warnings-as-errors)


[error] 122-122: method 'removeInvalidConns' can be made static

(readability-convert-member-functions-to-static,-warnings-as-errors)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/kv/Rpc.cc` around lines 95 - 134, scanConns currently snapshots (addr,
ConnArrayPtr) but only pushes addr into invalid_conns, which can cause
removeInvalidConns to erase a newly recreated pool; change the invalid_conns
container to hold pairs of (std::string addr, ConnArrayPtr expected) and in
scanConns push the captured ConnArrayPtr, update markConnInvalid to push a pair
with the current conns[addr] (or nullptr if called externally), and modify
removeInvalidConns to check that conns[addr] exists and equals the expected
ConnArrayPtr before erasing (skip if different), leaving semantics of logging
and clearing the invalid_conns intact.

}

ConnArrayPtr RpcClient::getConnArray(const std::string & addr)
{
std::lock_guard<std::mutex> lock(mutex);
Expand All @@ -39,6 +157,5 @@ ConnArrayPtr RpcClient::createConnArray(const std::string & addr)
conns[addr] = conn_array;
return conn_array;
}

} // namespace kv
} // namespace pingcap
Loading