Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion be/src/cloud/cloud_backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ void CloudBackendService::_warm_up_cache(TWarmUpCacheAsyncResponse& response,
}
std::string brpc_addr = get_host_port(host, request.brpc_port);
std::shared_ptr<PBackendService_Stub> brpc_stub =
_exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_addr);
_exec_env->brpc_internal_client_cache()->get_new_client_no_cache(
brpc_addr, "", "", "", request.host);
if (!brpc_stub) {
LOG(WARNING) << "warm_up_cache_async: failed to get brpc_stub for addr " << brpc_addr;
return;
Expand Down
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_warm_up_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ Status CloudWarmUpManager::_do_warm_up_rowset(RowsetMeta& rs_meta,
Status st = Status::OK();
std::shared_ptr<PBackendService_Stub> brpc_stub =
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_new_client_no_cache(
brpc_addr);
brpc_addr, "", "", "", replica.host);
if (!brpc_stub) {
st = Status::RpcError("Address {} is wrong", brpc_addr);
continue;
Expand Down Expand Up @@ -789,7 +789,7 @@ void CloudWarmUpManager::_recycle_cache(int64_t tablet_id,
Status st = Status::OK();
std::shared_ptr<PBackendService_Stub> brpc_stub =
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_new_client_no_cache(
brpc_addr);
brpc_addr, "", "", "", replica.host);
if (!brpc_stub) {
st = Status::RpcError("Address {} is wrong", brpc_addr);
continue;
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,8 @@ DEFINE_String(default_rowset_type, "BETA");
DEFINE_Int64(brpc_max_body_size, "3147483648");
DEFINE_Int64(brpc_socket_max_unwritten_bytes, "-1");
DEFINE_mBool(brpc_usercode_in_pthread, "false");
DEFINE_mBool(enable_brpc_get_client_handshake, "true"); // test it
DEFINE_mInt32(brpc_get_client_handshake_max_retries, "3");

// TODO(zxy): expect to be true in v1.3
// Whether to embed the ProtoBuf Request serialized string together with Tuple/Block data into
Expand Down
4 changes: 4 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,10 @@ DECLARE_Int64(brpc_max_body_size);
DECLARE_Int64(brpc_socket_max_unwritten_bytes);
// Whether to set FLAGS_usercode_in_pthread to true in brpc
DECLARE_mBool(brpc_usercode_in_pthread);
// Whether BrpcClientCache validates newly rebuilt clients with a handshake.
DECLARE_mBool(enable_brpc_get_client_handshake);
// Max attempts in get_client when handshake validation is enabled.
DECLARE_mInt32(brpc_get_client_handshake_max_retries);
// TODO(zxy): expect to be true in v1.3
// Whether to embed the ProtoBuf Request serialized string together with Tuple/Block data into
// Controller Attachment and send it through http brpc when the length of the Tuple/Block data
Expand Down
28 changes: 16 additions & 12 deletions be/src/exec/operator/exchange_sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) {
}
// attach task for memory tracker and query id when core
SCOPED_ATTACH_TASK(_state);
_failed(ins->id, err);
_failed(ins->id, Status::RpcError(err));
});
send_callback->start_rpc_time = GetCurrentTimeNanos();
send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()](
Expand All @@ -345,8 +345,9 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) {
if (s.is<ErrorCode::END_OF_FILE>()) {
_set_receiver_eof(ins);
} else if (!s.ok()) {
_failed(ins.id,
fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
_failed(ins.id, Status::Cancelled(
"exchange req success but status isn't ok: {}",
s.to_string()));
return;
} else if (eos) {
_ended(ins);
Expand All @@ -356,8 +357,9 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) {
// `_send_rpc` must be the LAST operation in this function, because it may reuse the callback!
s = _send_rpc(ins);
if (!s) {
_failed(ins.id,
fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
_failed(ins.id, Status::Cancelled(
"exchange req success but status isn't ok: {}",
s.to_string()));
}
});
{
Expand Down Expand Up @@ -457,7 +459,7 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) {
}
// attach task for memory tracker and query id when core
SCOPED_ATTACH_TASK(_state);
_failed(ins->id, err);
_failed(ins->id, Status::RpcError(err));
});
send_callback->start_rpc_time = GetCurrentTimeNanos();
send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()](
Expand All @@ -479,8 +481,9 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) {
if (s.is<ErrorCode::END_OF_FILE>()) {
_set_receiver_eof(ins);
} else if (!s.ok()) {
_failed(ins.id,
fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
_failed(ins.id, Status::Cancelled(
"exchange req success but status isn't ok: {}",
s.to_string()));
return;
} else if (eos) {
_ended(ins);
Expand All @@ -490,8 +493,9 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) {
// `_send_rpc` must be the LAST operation in this function, because it may reuse the callback!
s = _send_rpc(ins);
if (!s) {
_failed(ins.id,
fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
_failed(ins.id, Status::Cancelled(
"exchange req success but status isn't ok: {}",
s.to_string()));
}
});
{
Expand Down Expand Up @@ -522,11 +526,11 @@ void ExchangeSinkBuffer::_ended(RpcInstance& ins) {
}
}

void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
void ExchangeSinkBuffer::_failed(InstanceLoId id, Status err) {
_is_failed = true;
LOG(INFO) << "send rpc failed, instance id: " << id << ", _dest_node_id: " << _dest_node_id
<< ", node id: " << _node_id << ", err: " << err;
_context->cancel(Status::Cancelled(err));
_context->cancel(std::move(err));
}

void ExchangeSinkBuffer::_set_receiver_eof(RpcInstance& ins) {
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/operator/exchange_sink_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -310,13 +310,13 @@ class ExchangeSinkBuffer : public HasTaskExecutionCtx {

#ifndef BE_TEST
inline void _ended(RpcInstance& ins);
inline void _failed(InstanceLoId id, const std::string& err);
inline void _failed(InstanceLoId id, Status err);
inline void _set_receiver_eof(RpcInstance& ins);
inline void _turn_off_channel(RpcInstance& ins, std::unique_lock<std::mutex>& with_lock);

#else
virtual void _ended(RpcInstance& ins);
virtual void _failed(InstanceLoId id, const std::string& err);
virtual void _failed(InstanceLoId id, Status err);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This signature change breaks the BE test mock override. be/test/exec/exchange/exchange_sink_test.h still declares void _failed(InstanceLoId id, const std::string& err) override, so when tests are built with BE_TEST the override no longer matches ExchangeSinkBuffer::_failed(InstanceLoId, Status) and compilation will fail. Please update the mock (and any other BE_TEST overrides) to take Status, or avoid changing the virtual test-only signature.

virtual void _set_receiver_eof(RpcInstance& ins);
virtual void _turn_off_channel(RpcInstance& ins, std::unique_lock<std::mutex>& with_lock);
#endif
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/sink/writer/vtablet_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1044,8 +1044,8 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) {
//format an ipv6 address
std::string brpc_url = get_brpc_http_url(host, _node_info.brpc_port);
std::shared_ptr<PBackendService_Stub> _brpc_http_stub =
_state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url,
"http");
_state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(
brpc_url, "http", "", "", _node_info.host);
if (_brpc_http_stub == nullptr) {
cancel(fmt::format("{}, failed to open brpc http client to {}", channel_info(),
brpc_url));
Expand Down
2 changes: 1 addition & 1 deletion be/src/io/cache/peer_file_cache_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ Status PeerFileCacheReader::fetch_blocks(const std::vector<FileBlockSPtr>& block
Status st = Status::OK();
std::shared_ptr<PBackendService_Stub> brpc_stub =
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_new_client_no_cache(
brpc_addr);
brpc_addr, "", "", "", _host);
if (!brpc_stub) {
peer_cache_reader_failed_counter << 1;
LOG(WARNING) << "failed to get brpc stub " << brpc_addr;
Expand Down
Loading
Loading