diff --git a/be/src/cloud/cloud_backend_service.cpp b/be/src/cloud/cloud_backend_service.cpp index 403da0b76c6ee5..88f88811bb007a 100644 --- a/be/src/cloud/cloud_backend_service.cpp +++ b/be/src/cloud/cloud_backend_service.cpp @@ -224,7 +224,8 @@ void CloudBackendService::_warm_up_cache(TWarmUpCacheAsyncResponse& response, } std::string brpc_addr = get_host_port(host, request.brpc_port); std::shared_ptr brpc_stub = - _exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_addr); + _exec_env->brpc_internal_client_cache()->get_new_client_no_cache( + brpc_addr, "", "", "", request.host); if (!brpc_stub) { LOG(WARNING) << "warm_up_cache_async: failed to get brpc_stub for addr " << brpc_addr; return; diff --git a/be/src/cloud/cloud_warm_up_manager.cpp b/be/src/cloud/cloud_warm_up_manager.cpp index fc3a96f30f351c..b40c2ddfbc5a10 100644 --- a/be/src/cloud/cloud_warm_up_manager.cpp +++ b/be/src/cloud/cloud_warm_up_manager.cpp @@ -651,7 +651,7 @@ Status CloudWarmUpManager::_do_warm_up_rowset(RowsetMeta& rs_meta, Status st = Status::OK(); std::shared_ptr brpc_stub = ExecEnv::GetInstance()->brpc_internal_client_cache()->get_new_client_no_cache( - brpc_addr); + brpc_addr, "", "", "", replica.host); if (!brpc_stub) { st = Status::RpcError("Address {} is wrong", brpc_addr); continue; @@ -789,7 +789,7 @@ void CloudWarmUpManager::_recycle_cache(int64_t tablet_id, Status st = Status::OK(); std::shared_ptr brpc_stub = ExecEnv::GetInstance()->brpc_internal_client_cache()->get_new_client_no_cache( - brpc_addr); + brpc_addr, "", "", "", replica.host); if (!brpc_stub) { st = Status::RpcError("Address {} is wrong", brpc_addr); continue; diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index f1ff09559dd816..a1189260dd99be 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -849,6 +849,8 @@ DEFINE_String(default_rowset_type, "BETA"); DEFINE_Int64(brpc_max_body_size, "3147483648"); DEFINE_Int64(brpc_socket_max_unwritten_bytes, "-1"); DEFINE_mBool(brpc_usercode_in_pthread, "false"); +DEFINE_mBool(enable_brpc_get_client_handshake, "true"); // test it +DEFINE_mInt32(brpc_get_client_handshake_max_retries, "3"); // TODO(zxy): expect to be true in v1.3 // Whether to embed the ProtoBuf Request serialized string together with Tuple/Block data into diff --git a/be/src/common/config.h b/be/src/common/config.h index 6b4e393999fbc7..b1cfda38700e34 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -906,6 +906,10 @@ DECLARE_Int64(brpc_max_body_size); DECLARE_Int64(brpc_socket_max_unwritten_bytes); // Whether to set FLAGS_usercode_in_pthread to true in brpc DECLARE_mBool(brpc_usercode_in_pthread); +// Whether BrpcClientCache validates newly rebuilt clients with a handshake. +DECLARE_mBool(enable_brpc_get_client_handshake); +// Max attempts in get_client when handshake validation is enabled. +DECLARE_mInt32(brpc_get_client_handshake_max_retries); // TODO(zxy): expect to be true in v1.3 // Whether to embed the ProtoBuf Request serialized string together with Tuple/Block data into // Controller Attachment and send it through http brpc when the length of the Tuple/Block data diff --git a/be/src/exec/operator/exchange_sink_buffer.cpp b/be/src/exec/operator/exchange_sink_buffer.cpp index ec032aca17ed26..3f6c823b8c0789 100644 --- a/be/src/exec/operator/exchange_sink_buffer.cpp +++ b/be/src/exec/operator/exchange_sink_buffer.cpp @@ -322,7 +322,7 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { } // attach task for memory tracker and query id when core SCOPED_ATTACH_TASK(_state); - _failed(ins->id, err); + _failed(ins->id, Status::RpcError(err)); }); send_callback->start_rpc_time = GetCurrentTimeNanos(); send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()]( @@ -345,8 +345,9 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { if (s.is()) { _set_receiver_eof(ins); } else if (!s.ok()) { - _failed(ins.id, - fmt::format("exchange req success but status isn't ok: {}", s.to_string())); + _failed(ins.id, Status::Cancelled( + "exchange req success but status isn't ok: {}", + s.to_string())); return; } else if (eos) { _ended(ins); @@ -356,8 +357,9 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { // `_send_rpc` must be the LAST operation in this function, because it may reuse the callback! s = _send_rpc(ins); if (!s) { - _failed(ins.id, - fmt::format("exchange req success but status isn't ok: {}", s.to_string())); + _failed(ins.id, Status::Cancelled( + "exchange req success but status isn't ok: {}", + s.to_string())); } }); { @@ -457,7 +459,7 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { } // attach task for memory tracker and query id when core SCOPED_ATTACH_TASK(_state); - _failed(ins->id, err); + _failed(ins->id, Status::RpcError(err)); }); send_callback->start_rpc_time = GetCurrentTimeNanos(); send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()]( @@ -479,8 +481,9 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { if (s.is()) { _set_receiver_eof(ins); } else if (!s.ok()) { - _failed(ins.id, - fmt::format("exchange req success but status isn't ok: {}", s.to_string())); + _failed(ins.id, Status::Cancelled( + "exchange req success but status isn't ok: {}", + s.to_string())); return; } else if (eos) { _ended(ins); @@ -490,8 +493,9 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { // `_send_rpc` must be the LAST operation in this function, because it may reuse the callback! s = _send_rpc(ins); if (!s) { - _failed(ins.id, - fmt::format("exchange req success but status isn't ok: {}", s.to_string())); + _failed(ins.id, Status::Cancelled( + "exchange req success but status isn't ok: {}", + s.to_string())); } }); { @@ -522,11 +526,11 @@ void ExchangeSinkBuffer::_ended(RpcInstance& ins) { } } -void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) { +void ExchangeSinkBuffer::_failed(InstanceLoId id, Status err) { _is_failed = true; LOG(INFO) << "send rpc failed, instance id: " << id << ", _dest_node_id: " << _dest_node_id << ", node id: " << _node_id << ", err: " << err; - _context->cancel(Status::Cancelled(err)); + _context->cancel(std::move(err)); } void ExchangeSinkBuffer::_set_receiver_eof(RpcInstance& ins) { diff --git a/be/src/exec/operator/exchange_sink_buffer.h b/be/src/exec/operator/exchange_sink_buffer.h index 7478c63f4e8f12..fb95445cca72d4 100644 --- a/be/src/exec/operator/exchange_sink_buffer.h +++ b/be/src/exec/operator/exchange_sink_buffer.h @@ -310,13 +310,13 @@ class ExchangeSinkBuffer : public HasTaskExecutionCtx { #ifndef BE_TEST inline void _ended(RpcInstance& ins); - inline void _failed(InstanceLoId id, const std::string& err); + inline void _failed(InstanceLoId id, Status err); inline void _set_receiver_eof(RpcInstance& ins); inline void _turn_off_channel(RpcInstance& ins, std::unique_lock& with_lock); #else virtual void _ended(RpcInstance& ins); - virtual void _failed(InstanceLoId id, const std::string& err); + virtual void _failed(InstanceLoId id, Status err); virtual void _set_receiver_eof(RpcInstance& ins); virtual void _turn_off_channel(RpcInstance& ins, std::unique_lock& with_lock); #endif diff --git a/be/src/exec/sink/writer/vtablet_writer.cpp b/be/src/exec/sink/writer/vtablet_writer.cpp index c77a94c501585b..5752276356bb1b 100644 --- a/be/src/exec/sink/writer/vtablet_writer.cpp +++ b/be/src/exec/sink/writer/vtablet_writer.cpp @@ -1044,8 +1044,8 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) { //format an ipv6 address std::string brpc_url = get_brpc_http_url(host, _node_info.brpc_port); std::shared_ptr _brpc_http_stub = - _state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url, - "http"); + _state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache( + brpc_url, "http", "", "", _node_info.host); if (_brpc_http_stub == nullptr) { cancel(fmt::format("{}, failed to open brpc http client to {}", channel_info(), brpc_url)); diff --git a/be/src/io/cache/peer_file_cache_reader.cpp b/be/src/io/cache/peer_file_cache_reader.cpp index 4e6ce038177571..119cbbe5fa0b2d 100644 --- a/be/src/io/cache/peer_file_cache_reader.cpp +++ b/be/src/io/cache/peer_file_cache_reader.cpp @@ -106,7 +106,7 @@ Status PeerFileCacheReader::fetch_blocks(const std::vector& block Status st = Status::OK(); std::shared_ptr brpc_stub = ExecEnv::GetInstance()->brpc_internal_client_cache()->get_new_client_no_cache( - brpc_addr); + brpc_addr, "", "", "", _host); if (!brpc_stub) { peer_cache_reader_failed_counter << 1; LOG(WARNING) << "failed to get brpc stub " << brpc_addr; diff --git a/be/src/util/brpc_client_cache.h b/be/src/util/brpc_client_cache.h index e2e77908dabd57..c8e39420cb5447 100644 --- a/be/src/util/brpc_client_cache.h +++ b/be/src/util/brpc_client_cache.h @@ -30,6 +30,7 @@ #include #include +#include #include #include #include @@ -64,25 +65,57 @@ using StubMap = phmap::parallel_flat_hash_map< std::allocator>>, 8, std::mutex>; namespace doris { +inline bool is_brpc_network_fault(int errcode) { + switch (errcode) { + case EHOSTDOWN: + case ETIMEDOUT: + case ECONNREFUSED: + case ECONNRESET: + case EHOSTUNREACH: + case ENETUNREACH: + case ENOTCONN: + case EPIPE: + return true; + default: + return false; + } +} + +inline void on_brpc_network_fault(const std::shared_ptr& channel_st, + const std::string& hostname, brpc::Controller* cntl) { + Status error_st = Status::NetworkError( + "Failed to send brpc, error={}, error_text={}, client: {}, latency = {}", + berror(cntl->ErrorCode()), cntl->ErrorText(), BackendOptions::get_localhost(), + cntl->latency_us()); + LOG(WARNING) << error_st; + channel_st->update(error_st); + + if (!hostname.empty() && !is_valid_ip(hostname)) { + auto* env = ExecEnv::GetInstance(); + auto* dns_cache = (env != nullptr) ? env->dns_cache() : nullptr; + if (dns_cache != nullptr) { + dns_cache->invalidate(hostname); + } + } +} + class FailureDetectClosure : public ::google::protobuf::Closure { public: - FailureDetectClosure(std::shared_ptr& channel_st, + FailureDetectClosure(std::shared_ptr& channel_st, std::string hostname, ::google::protobuf::RpcController* controller, ::google::protobuf::Closure* done) - : _channel_st(channel_st), _controller(controller), _done(done) {} + : _channel_st(channel_st), + _hostname(std::move(hostname)), + _controller(controller), + _done(done) {} void Run() override { Defer defer {[&]() { delete this; }}; // All brpc related API will use brpc::Controller, so that it is safe // to do static cast here. auto* cntl = static_cast(_controller); - if (cntl->Failed() && cntl->ErrorCode() == EHOSTDOWN) { - Status error_st = Status::NetworkError( - "Failed to send brpc, error={}, error_text={}, client: {}, latency = {}", - berror(cntl->ErrorCode()), cntl->ErrorText(), BackendOptions::get_localhost(), - cntl->latency_us()); - LOG(WARNING) << error_st; - _channel_st->update(error_st); + if (cntl->Failed() && is_brpc_network_fault(cntl->ErrorCode())) { + on_brpc_network_fault(_channel_st, _hostname, cntl); } // Sometimes done == nullptr, for example hand_shake API. if (_done != nullptr) { @@ -94,6 +127,7 @@ class FailureDetectClosure : public ::google::protobuf::Closure { private: std::shared_ptr _channel_st; + std::string _hostname; ::google::protobuf::RpcController* _controller; ::google::protobuf::Closure* _done; }; @@ -107,6 +141,8 @@ class FailureDetectChannel : public ::brpc::Channel { FailureDetectChannel() : ::brpc::Channel() { _channel_st = std::make_shared(); // default OK } + void set_hostname(std::string hostname) { _hostname = std::move(hostname); } + void CallMethod(const google::protobuf::MethodDescriptor* method, google::protobuf::RpcController* controller, const google::protobuf::Message* request, google::protobuf::Message* response, @@ -115,19 +151,15 @@ class FailureDetectChannel : public ::brpc::Channel { if (done != nullptr) { // If done == nullptr, then it means the call is sync call, so that should not // gen a failure detect closure for it. Or it will core. - failure_detect_closure = new FailureDetectClosure(_channel_st, controller, done); + failure_detect_closure = + new FailureDetectClosure(_channel_st, _hostname, controller, done); } ::brpc::Channel::CallMethod(method, controller, request, response, failure_detect_closure); // Done == nullptr, it is a sync call, should also deal with the bad channel. if (done == nullptr) { auto* cntl = static_cast(controller); - if (cntl->Failed() && cntl->ErrorCode() == EHOSTDOWN) { - Status error_st = Status::NetworkError( - "Failed to send brpc, error={}, error_text={}, client: {}, latency = {}", - berror(cntl->ErrorCode()), cntl->ErrorText(), - BackendOptions::get_localhost(), cntl->latency_us()); - LOG(WARNING) << error_st; - _channel_st->update(error_st); + if (cntl->Failed() && is_brpc_network_fault(cntl->ErrorCode())) { + on_brpc_network_fault(_channel_st, _hostname, cntl); } } } @@ -136,6 +168,7 @@ class FailureDetectChannel : public ::brpc::Channel { private: std::shared_ptr _channel_st; + std::string _hostname; }; template @@ -165,66 +198,94 @@ class BrpcClientCache { } std::shared_ptr get_client(const std::string& host, int port) { - std::string realhost = host; - auto dns_cache = ExecEnv::GetInstance()->dns_cache(); - if (dns_cache == nullptr) { - LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve"; - } else if (!is_valid_ip(host)) { - Status status = dns_cache->get(host, &realhost); - if (!status.ok()) { - LOG(WARNING) << "failed to get ip from host:" << status.to_string(); - return nullptr; - } - } + return get_client(host, port, config::enable_brpc_get_client_handshake); + } - // Use original host:port as key (like Java's TNetworkAddress address) - // This allows us to detect IP changes when DNS resolution changes + std::shared_ptr get_client(const std::string& host, int port, bool enable_handshake) { + const int max_attempts = enable_handshake + ? std::max(1, config::brpc_get_client_handshake_max_retries) + : 1; + // Use original host:port as key (like Java's TNetworkAddress address). std::string host_port = fmt::format("{}:{}", host, port); + std::string real_host_port; + + for (int attempt = 1; attempt <= max_attempts; ++attempt) { + std::string realhost = host; + auto dns_cache = ExecEnv::GetInstance()->dns_cache(); + if (dns_cache == nullptr) { + LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve"; + } else if (!is_valid_ip(host)) { + Status status = dns_cache->get(host, &realhost); + if (!status.ok()) { + LOG(WARNING) << "failed to get ip from host:" << status.to_string(); + return nullptr; + } + } - std::shared_ptr stub_ptr; - bool need_remove = false; - - auto check_entry = [&](const auto& v) { - const StubEntry& entry = v.second; - // Check if cached IP matches current resolved IP - if (entry.real_ip != realhost) { - // IP changed (DNS resolution changed) - LOG(WARNING) << "Cached ip changed for " << host << ", before ip: " << entry.real_ip - << ", current ip: " << realhost; - need_remove = true; - } else if (!static_cast(entry.stub->channel()) - ->channel_status() - ->ok()) { - // Client is not in normal state, need to recreate - // At this point we cannot judge the progress of reconnecting the underlying channel. - // In the worst case, it may take two minutes. But we can't stand the connection refused - // for two minutes, so rebuild the channel directly. - need_remove = true; - } else { - // Cache hit: IP matches and client is healthy - stub_ptr = entry.stub; + std::shared_ptr stub_ptr; + bool need_remove = false; + + auto check_entry = [&](const auto& v) { + const StubEntry& entry = v.second; + // Check if cached IP matches current resolved IP. + if (entry.real_ip != realhost) { + // IP changed (DNS resolution changed). + LOG(WARNING) << "Cached ip changed for " << host + << ", before ip: " << entry.real_ip + << ", current ip: " << realhost; + need_remove = true; + } else if (!static_cast(entry.stub->channel()) + ->channel_status() + ->ok()) { + // Client is not in normal state, need to recreate. + // At this point we cannot judge the progress of reconnecting the underlying + // channel. In the worst case, it may take two minutes. But we can't stand + // connection refused for two minutes, so rebuild the channel directly. + need_remove = true; + } else { + // Cache hit: IP matches and client is healthy. + stub_ptr = entry.stub; + } + }; + + if (LIKELY(_stub_map.if_contains(host_port, check_entry))) { + if (stub_ptr != nullptr) { + return stub_ptr; + } + if (need_remove) { + _stub_map.erase(host_port); + } } - }; - if (LIKELY(_stub_map.if_contains(host_port, check_entry))) { - if (stub_ptr != nullptr) { - return stub_ptr; + // Create new stub using resolved IP for actual connection. + real_host_port = get_host_port(realhost, port); + auto stub = get_new_client_no_cache(real_host_port, "", "", "", host); + if (stub == nullptr) { + LOG(WARNING) << "failed to build brpc stub to " << real_host_port + << ", attempt=" << attempt << "/" << max_attempts; + if (enable_handshake) { + continue; + } + return nullptr; } - // IP changed or client unhealthy, need to remove old entry - if (need_remove) { - _stub_map.erase(host_port); + + if (enable_handshake && !available(stub, real_host_port)) { + LOG(WARNING) << "handshake failed to " << real_host_port + << ", attempt=" << attempt << "/" << max_attempts; + if (dns_cache != nullptr && !is_valid_ip(host)) { + dns_cache->invalidate(host); + } + continue; } - } - // Create new stub using resolved IP for actual connection - std::string real_host_port = get_host_port(realhost, port); - auto stub = get_new_client_no_cache(real_host_port); - if (stub != nullptr) { StubEntry entry {realhost, stub}; _stub_map.try_emplace_l( host_port, [&stub](const auto& v) { stub = v.second.stub; }, entry); + return stub; } - return stub; + LOG(WARNING) << "get_client gave up after " << max_attempts + << " handshake attempts to " << real_host_port; + return nullptr; } std::shared_ptr get_client(const std::string& host_port) { @@ -243,7 +304,8 @@ class BrpcClientCache { std::shared_ptr get_new_client_no_cache(const std::string& host_port, const std::string& protocol = "", const std::string& connection_type = "", - const std::string& connection_group = "") { + const std::string& connection_group = "", + const std::string& original_hostname = "") { brpc::ChannelOptions options; if (protocol != "") { options.protocol = protocol; @@ -267,6 +329,9 @@ class BrpcClientCache { options.max_retry = 10; std::unique_ptr channel(new FailureDetectChannel()); + if (!original_hostname.empty()) { + channel->set_hostname(original_hostname); + } int ret_code = 0; if (host_port.find("://") == std::string::npos) { ret_code = channel->Init(host_port.c_str(), &options); diff --git a/be/src/util/dns_cache.cpp b/be/src/util/dns_cache.cpp index 0ea794872c1a67..aa5eb0c953ccce 100644 --- a/be/src/util/dns_cache.cpp +++ b/be/src/util/dns_cache.cpp @@ -37,7 +37,7 @@ Status DNSCache::get(const std::string& hostname, std::string* ip) { { std::shared_lock lock(mutex); auto it = cache.find(hostname); - if (it != cache.end()) { + if (it != cache.end() && dirty.find(hostname) == dirty.end()) { *ip = it->second; return Status::OK(); } @@ -46,11 +46,26 @@ Status DNSCache::get(const std::string& hostname, std::string* ip) { RETURN_IF_ERROR(_update(hostname)); { std::shared_lock lock(mutex); - *ip = cache[hostname]; + auto it = cache.find(hostname); + if (it == cache.end()) { + return Status::InternalError("DNSCache update succeeded but hostname {} is missing", + hostname); + } + *ip = it->second; return Status::OK(); } } +void DNSCache::invalidate(const std::string& hostname) { + if (hostname.empty()) { + return; + } + std::unique_lock lock(mutex); + if (dirty.insert(hostname).second) { + LOG(INFO) << "DNSCache: mark hostname dirty, will re-resolve on next get: " << hostname; + } +} + // Resolve hostname to IP address, similar to Java's DNSCache.resolveHostname. // If resolution fails, falls back to cached IP if available. // Returns the resolved IP, or cached IP on failure, or empty string if no cache available. @@ -97,6 +112,7 @@ Status DNSCache::_update(const std::string& hostname) { cache[hostname] = real_ip; LOG(INFO) << "update hostname " << hostname << "'s ip to " << real_ip; } + dirty.erase(hostname); return Status::OK(); } diff --git a/be/src/util/dns_cache.h b/be/src/util/dns_cache.h index 51ffb6567ece68..9102713cb2eb5e 100644 --- a/be/src/util/dns_cache.h +++ b/be/src/util/dns_cache.h @@ -23,6 +23,7 @@ #include #include #include +#include #include "common/status.h" @@ -38,6 +39,10 @@ class DNSCache { // get ip by hostname Status get(const std::string& hostname, std::string* ip); + // Mark a hostname's cached IP as dirty. The next get(hostname) will + // re-resolve instead of returning the cached IP. + void invalidate(const std::string& hostname); + private: // Resolve hostname to IP address. // If resolution fails, falls back to cached IP if available. @@ -54,6 +59,8 @@ class DNSCache { private: // hostname -> ip std::unordered_map cache; + // hostnames marked dirty by invalidate(); next get() must re-resolve. + std::unordered_set dirty; mutable std::shared_mutex mutex; std::thread refresh_thread; bool stop_refresh = false; diff --git a/be/src/util/proto_util.h b/be/src/util/proto_util.h index aec345688a9396..3e89746a2006ac 100644 --- a/be/src/util/proto_util.h +++ b/be/src/util/proto_util.h @@ -87,7 +87,8 @@ Status transmit_block_httpv2(ExecEnv* exec_env, std::unique_ptr closure std::string brpc_url = get_brpc_http_url(brpc_dest_addr.hostname, brpc_dest_addr.port); std::shared_ptr brpc_http_stub = - exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url, "http"); + exec_env->brpc_internal_client_cache()->get_new_client_no_cache( + brpc_url, "http", "", "", brpc_dest_addr.hostname); if (brpc_http_stub == nullptr) { return Status::InternalError("failed to open brpc http client to {}", brpc_url); }