From 40477e820a516ef6eff284cbe49625105adbbe13 Mon Sep 17 00:00:00 2001 From: Yixuan Wang Date: Sun, 19 Apr 2026 21:51:27 +0800 Subject: [PATCH 1/3] [chore](http) Merge ms and recycler http skeleton (#61502) Problem Summary: Meta service and recycler maintained duplicated HTTP parsing, routing, and config handling logic, which made it harder to reuse handlers and keep versioned endpoints consistent. This change extracts the shared HTTP helper layer into `cloud/src/common/ http_helper.*`, introduces role-based handler registration for MetaService and Recycler, reuses common config show/update helpers, and adds versioned cluster HTTP test coverage. If you want to expand the interface version ``` {"add_cluster", {.handler = [](void* s, brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, .versioned_handlers = {{"v2", [](void* s, brpc::Controller* c) { return process_alter_cluster_v2((MS*)s, c); }}}, .role = HttpRole::META_SERVICE}}, ``` --- cloud/src/common/CMakeLists.txt | 1 + cloud/src/common/configbase.cpp | 57 + cloud/src/common/configbase.h | 5 + cloud/src/common/http_helper.cpp | 1184 ++++++++++++++++++ cloud/src/common/http_helper.h | 206 +++ cloud/src/meta-service/meta_service_http.cpp | 834 +----------- cloud/src/meta-service/meta_service_http.h | 18 + cloud/src/recycler/recycler.h | 4 + cloud/src/recycler/recycler_service.cpp | 218 +--- cloud/src/recycler/recycler_service.h | 30 +- cloud/test/meta_service_http_test.cpp | 169 ++- 11 files changed, 1718 insertions(+), 1008 deletions(-) create mode 100644 cloud/src/common/http_helper.cpp create mode 100644 cloud/src/common/http_helper.h diff --git a/cloud/src/common/CMakeLists.txt b/cloud/src/common/CMakeLists.txt index d3ba8914f9fb3e..c83e11a9eb6ab3 100644 --- a/cloud/src/common/CMakeLists.txt +++ b/cloud/src/common/CMakeLists.txt @@ -10,6 +10,7 @@ set(COMMON_FILES logging.cpp bvars.cpp encryption_util.cpp + http_helper.cpp metric.cpp kms.cpp network_util.cpp diff --git a/cloud/src/common/configbase.cpp b/cloud/src/common/configbase.cpp index ae32ffb5f4da4f..f5e02eac64bee6 100644 --- a/cloud/src/common/configbase.cpp +++ b/cloud/src/common/configbase.cpp @@ -16,6 +16,9 @@ // under the License. #include +#include +#include +#include #include #include @@ -462,4 +465,58 @@ std::pair set_config(std::unordered_mapfind(name); + std::string value = (it != full_conf_map->end()) ? it->second : ""; + + rapidjson::Value item(rapidjson::kArrayType); + item.PushBack(rapidjson::Value(name.data(), name.size(), allocator), allocator); + item.PushBack(rapidjson::Value(field.type, allocator), allocator); + item.PushBack(rapidjson::Value(value.data(), value.size(), allocator), allocator); + item.PushBack(field.valmutable, allocator); + doc.PushBack(item, allocator); + } + + rapidjson::StringBuffer sb; + rapidjson::Writer writer(sb); + doc.Accept(writer); + return sb.GetString(); +} + +std::pair update_config(const std::string& configs, bool persist, + const std::string& custom_conf_path) { + if (configs.empty()) { + return {false, "query param `configs` should not be empty"}; + } + + std::unordered_map conf_map; + std::istringstream ss(configs); + std::string conf; + while (std::getline(ss, conf, ',')) { + auto pos = conf.find('='); + if (pos == std::string::npos) { + return {false, fmt::format("config {} is invalid", conf)}; + } + std::string key = conf.substr(0, pos); + std::string val = conf.substr(pos + 1); + trim(key); + trim(val); + conf_map.emplace(std::move(key), std::move(val)); + } + + return set_config(std::move(conf_map), persist, custom_conf_path); +} + } // namespace doris::cloud::config diff --git a/cloud/src/common/configbase.h b/cloud/src/common/configbase.h index f1cc3716ba320c..b6eff9ae101c06 100644 --- a/cloud/src/common/configbase.h +++ b/cloud/src/common/configbase.h @@ -181,4 +181,9 @@ std::pair set_config(std::unordered_map update_config(const std::string& configs, bool persist, + const std::string& custom_conf_path); + } // namespace doris::cloud::config diff --git a/cloud/src/common/http_helper.cpp b/cloud/src/common/http_helper.cpp new file mode 100644 index 00000000000000..7f6dfd3d932360 --- /dev/null +++ b/cloud/src/common/http_helper.cpp @@ -0,0 +1,1184 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "http_helper.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "common/metric.h" +#include "cpp/s3_rate_limiter.h" +#include "meta-service/meta_service.h" +#include "meta-service/meta_service_helper.h" +#include "meta-service/meta_service_http.h" +#include "recycler/recycler.h" +#include "recycler/recycler_service.h" +namespace doris::cloud { + +template +static google::protobuf::util::Status parse_json_message(const std::string& unresolved_path, + const std::string& body, Message* req) { + static_assert(std::is_base_of_v); + auto st = google::protobuf::util::JsonStringToMessage(body, req); + if (!st.ok()) { + std::string msg = "failed to strictly parse http request for '" + unresolved_path + + "' error: " + st.ToString(); + LOG_WARNING(msg).tag("body", encryt_sk(hide_access_key(body))); + + // ignore unknown fields + google::protobuf::util::JsonParseOptions json_parse_options; + json_parse_options.ignore_unknown_fields = true; + return google::protobuf::util::JsonStringToMessage(body, req, json_parse_options); + } + return {}; +} + +#define PARSE_MESSAGE_OR_RETURN(ctrl, req) \ + do { \ + std::string body = ctrl->request_attachment().to_string(); \ + auto& unresolved_path = ctrl->http_request().unresolved_path(); \ + auto st = parse_json_message(unresolved_path, body, &req); \ + if (!st.ok()) { \ + std::string msg = "parse http request '" + unresolved_path + "': " + st.ToString(); \ + LOG_WARNING(msg).tag("body", encryt_sk(hide_access_key(body))); \ + return http_json_reply(MetaServiceCode::PROTOBUF_PARSE_ERR, msg); \ + } \ + } while (0) + +const std::unordered_map& get_http_handlers() { + using MS = MetaServiceImpl; + using RS = RecyclerServiceImpl; + + static const auto handlers = [] { + return std::unordered_map { + // MetaService APIs + {"add_cluster", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"drop_cluster", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"rename_cluster", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"update_cluster_endpoint", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"update_cluster_mysql_user_name", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"add_node", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"drop_node", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"decommission_node", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"set_cluster_status", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"notify_decommissioned", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"alter_vcluster_info", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + + {"create_instance", + {.handler = [](void* s, + brpc::Controller* c) { return process_create_instance((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"drop_instance", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_instance((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"rename_instance", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_instance((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"enable_instance_sse", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_instance((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"disable_instance_sse", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_instance((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"set_instance_status", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_instance((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + + {"add_obj_info", + {.handler = + [](void* s, brpc::Controller* c) { + return process_alter_obj_store_info((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"legacy_update_ak_sk", + {.handler = + [](void* s, brpc::Controller* c) { + return process_alter_obj_store_info((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"update_ak_sk", + {.handler = [](void* s, + brpc::Controller* c) { return process_update_ak_sk((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"show_storage_vaults", + {.handler = + [](void* s, brpc::Controller* c) { + return process_get_obj_store_info((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"add_hdfs_vault", + {.handler = + [](void* s, brpc::Controller* c) { + return process_alter_storage_vault((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"add_s3_vault", + {.handler = + [](void* s, brpc::Controller* c) { + return process_alter_storage_vault((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"alter_s3_vault", + {.handler = + [](void* s, brpc::Controller* c) { + return process_alter_storage_vault((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"drop_s3_vault", + {.handler = + [](void* s, brpc::Controller* c) { + return process_alter_storage_vault((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"drop_hdfs_vault", + {.handler = + [](void* s, brpc::Controller* c) { + return process_alter_storage_vault((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"alter_obj_info", + {.handler = + [](void* s, brpc::Controller* c) { + return process_alter_obj_store_info((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + + {"decode_key", + {.handler = [](void* s, + brpc::Controller* c) { return process_decode_key((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"encode_key", + {.handler = [](void* s, + brpc::Controller* c) { return process_encode_key((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"get_value", + {.handler = [](void* s, + brpc::Controller* c) { return process_get_value((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"set_value", + {.handler = [](void* s, + brpc::Controller* c) { return process_set_value((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"show_meta_ranges", + {.handler = + [](void* s, brpc::Controller* c) { + return process_show_meta_ranges((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"txn_lazy_commit", + {.handler = [](void* s, + brpc::Controller* c) { return process_txn_lazy_commit((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"injection_point", + {.handler = [](void* s, + brpc::Controller* c) { return process_injection_point((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"fix_tablet_stats", + {.handler = + [](void* s, brpc::Controller* c) { + return process_fix_tablet_stats((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"fix_tablet_db_id", + {.handler = + [](void* s, brpc::Controller* c) { + return process_fix_tablet_db_id((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + + {"get_instance", + {.handler = + [](void* s, brpc::Controller* c) { + return process_get_instance_info((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"get_obj_store_info", + {.handler = + [](void* s, brpc::Controller* c) { + return process_get_obj_store_info((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"get_cluster", + {.handler = [](void* s, + brpc::Controller* c) { return process_get_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"get_tablet_stats", + {.handler = + [](void* s, brpc::Controller* c) { + return process_get_tablet_stats((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"get_stage", + {.handler = [](void* s, + brpc::Controller* c) { return process_get_stage((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"get_cluster_status", + {.handler = + [](void* s, brpc::Controller* c) { + return process_get_cluster_status((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + + {"list_snapshot", + {.handler = [](void* s, + brpc::Controller* c) { return process_list_snapshot((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"drop_snapshot", + {.handler = [](void* s, + brpc::Controller* c) { return process_drop_snapshot((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"set_snapshot_property", + {.handler = + [](void* s, brpc::Controller* c) { + return process_set_snapshot_property((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"get_snapshot_property", + {.handler = + [](void* s, brpc::Controller* c) { + return process_get_snapshot_property((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"set_multi_version_status", + {.handler = + [](void* s, brpc::Controller* c) { + return process_set_multi_version_status((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"abort_txn", + {.handler = [](void* s, + brpc::Controller* c) { return process_abort_txn((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"abort_tablet_job", + {.handler = + [](void* s, brpc::Controller* c) { + return process_abort_tablet_job((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"alter_ram_user", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_ram_user((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"alter_iam", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_iam((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"adjust_rate_limit", + {.handler = + [](void* s, brpc::Controller* c) { + return process_adjust_rate_limit((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"list_rate_limit", + {.handler = + [](void* s, brpc::Controller* c) { + return process_query_rate_limit((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + + // Recycler APIs + {"recycle_instance", + {.handler = + [](void* s, brpc::Controller* c) { + return process_recycle_instance((RS*)s, c); + }, + .role = HttpRole::RECYCLER}}, + {"statistics_recycle", + {.handler = + [](void* s, brpc::Controller* c) { + return process_statistics_recycle((RS*)s, c); + }, + .role = HttpRole::RECYCLER}}, + {"recycle_copy_jobs", + {.handler = + [](void* s, brpc::Controller* c) { + return process_recycle_copy_jobs((RS*)s, c); + }, + .role = HttpRole::RECYCLER}}, + {"recycle_job_info", + {.handler = + [](void* s, brpc::Controller* c) { + return process_recycle_job_info((RS*)s, c); + }, + .role = HttpRole::RECYCLER}}, + {"check_instance", + {.handler = [](void* s, + brpc::Controller* c) { return process_check_instance((RS*)s, c); }, + .role = HttpRole::RECYCLER}}, + {"check_job_info", + {.handler = [](void* s, + brpc::Controller* c) { return process_check_job_info((RS*)s, c); }, + .role = HttpRole::RECYCLER}}, + {"check_meta", + {.handler = [](void* s, + brpc::Controller* c) { return process_check_meta((RS*)s, c); }, + .role = HttpRole::RECYCLER}}, + {"adjust_rate_limiter", + {.handler = + [](void* s, brpc::Controller* c) { + return process_adjust_rate_limiter((RS*)s, c); + }, + .role = HttpRole::RECYCLER}}, + + // Shared APIs + {"show_config", + {.handler = [](void* s, + brpc::Controller* c) { return process_show_config((MS*)s, c); }, + .role = HttpRole::BOTH}}, + {"update_config", + {.handler = [](void* s, + brpc::Controller* c) { return process_update_config((MS*)s, c); }, + .role = HttpRole::BOTH}}, + }; + }(); + + return handlers; +} + +HttpResponse process_alter_cluster(MetaServiceImpl* service, brpc::Controller* ctrl) { + static std::unordered_map operations { + {"add_cluster", AlterClusterRequest::ADD_CLUSTER}, + {"drop_cluster", AlterClusterRequest::DROP_CLUSTER}, + {"rename_cluster", AlterClusterRequest::RENAME_CLUSTER}, + {"update_cluster_endpoint", AlterClusterRequest::UPDATE_CLUSTER_ENDPOINT}, + {"update_cluster_mysql_user_name", AlterClusterRequest::UPDATE_CLUSTER_MYSQL_USER_NAME}, + {"add_node", AlterClusterRequest::ADD_NODE}, + {"drop_node", AlterClusterRequest::DROP_NODE}, + {"decommission_node", AlterClusterRequest::DECOMMISSION_NODE}, + {"set_cluster_status", AlterClusterRequest::SET_CLUSTER_STATUS}, + {"notify_decommissioned", AlterClusterRequest::NOTIFY_DECOMMISSIONED}, + {"alter_vcluster_info", AlterClusterRequest::ALTER_VCLUSTER_INFO}, + }; + + auto& path = ctrl->http_request().unresolved_path(); + auto body = ctrl->request_attachment().to_string(); + auto it = operations.find(http_api_route(path)); + if (it == operations.end()) { + std::string msg = "not supportted alter cluster operation: " + path; + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + + AlterClusterRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + + req.set_op(it->second); + AlterClusterResponse resp; + service->alter_cluster(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_get_obj_store_info(MetaServiceImpl* service, brpc::Controller* ctrl) { + GetObjStoreInfoRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + + GetObjStoreInfoResponse resp; + service->get_obj_store_info(ctrl, &req, &resp, nullptr); + return http_json_reply_message(resp.status(), resp); +} + +HttpResponse process_alter_obj_store_info(MetaServiceImpl* service, brpc::Controller* ctrl) { + static std::unordered_map operations { + {"add_obj_info", AlterObjStoreInfoRequest::ADD_OBJ_INFO}, + {"legacy_update_ak_sk", AlterObjStoreInfoRequest::LEGACY_UPDATE_AK_SK}, + {"alter_obj_info", AlterObjStoreInfoRequest::ALTER_OBJ_INFO}}; + + auto& path = ctrl->http_request().unresolved_path(); + auto it = operations.find(http_api_route(path)); + if (it == operations.end()) { + std::string msg = "not supportted alter obj store info operation: " + path; + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + + AlterObjStoreInfoRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + req.set_op(it->second); + + AlterObjStoreInfoResponse resp; + service->alter_obj_store_info(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_alter_storage_vault(MetaServiceImpl* service, brpc::Controller* ctrl) { + static std::unordered_map operations { + {"drop_s3_vault", AlterObjStoreInfoRequest::DROP_S3_VAULT}, + {"add_s3_vault", AlterObjStoreInfoRequest::ADD_S3_VAULT}, + {"alter_s3_vault", AlterObjStoreInfoRequest::ALTER_S3_VAULT}, + {"drop_hdfs_vault", AlterObjStoreInfoRequest::DROP_HDFS_INFO}, + {"add_hdfs_vault", AlterObjStoreInfoRequest::ADD_HDFS_INFO}}; + + auto& path = ctrl->http_request().unresolved_path(); + auto it = operations.find(http_api_route(path)); + if (it == operations.end()) { + std::string msg = "not supportted alter storage vault operation: " + path; + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + + AlterObjStoreInfoRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + req.set_op(it->second); + + AlterObjStoreInfoResponse resp; + service->alter_storage_vault(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_update_ak_sk(MetaServiceImpl* service, brpc::Controller* ctrl) { + UpdateAkSkRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + UpdateAkSkResponse resp; + service->update_ak_sk(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_create_instance(MetaServiceImpl* service, brpc::Controller* ctrl) { + CreateInstanceRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + CreateInstanceResponse resp; + service->create_instance(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_alter_instance(MetaServiceImpl* service, brpc::Controller* ctrl) { + static std::unordered_map> + operations {{"rename_instance", {AlterInstanceRequest::RENAME}}, + {"enable_instance_sse", {AlterInstanceRequest::ENABLE_SSE}}, + {"disable_instance_sse", {AlterInstanceRequest::DISABLE_SSE}}, + {"drop_instance", {AlterInstanceRequest::DROP}}, + {"set_instance_status", + {AlterInstanceRequest::SET_NORMAL, AlterInstanceRequest::SET_OVERDUE}}}; + + auto& path = ctrl->http_request().unresolved_path(); + auto it = operations.find(http_api_route(path)); + if (it == operations.end()) { + std::string msg = "not supportted alter instance operation: '" + path + + "', route=" + std::string(http_api_route(path)); + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + + AlterInstanceRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + // for unresolved path whose corresponding operation is signal, we need set operation by ourselves. + if ((it->second).size() == 1) { + req.set_op((it->second)[0]); + } + AlterInstanceResponse resp; + service->alter_instance(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_abort_txn(MetaServiceImpl* service, brpc::Controller* ctrl) { + AbortTxnRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + AbortTxnResponse resp; + service->abort_txn(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_abort_tablet_job(MetaServiceImpl* service, brpc::Controller* ctrl) { + FinishTabletJobRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + req.set_action(FinishTabletJobRequest::ABORT); + FinishTabletJobResponse resp; + service->finish_tablet_job(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_alter_ram_user(MetaServiceImpl* service, brpc::Controller* ctrl) { + AlterRamUserRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + AlterRamUserResponse resp; + service->alter_ram_user(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_alter_iam(MetaServiceImpl* service, brpc::Controller* ctrl) { + AlterIamRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + AlterIamResponse resp; + service->alter_iam(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_adjust_rate_limit(MetaServiceImpl* service, brpc::Controller* cntl) { + const auto& uri = cntl->http_request().uri(); + auto qps_limit_str = std::string {http_query(uri, "qps_limit")}; + auto rpc_name = std::string {http_query(uri, "rpc_name")}; + auto instance_id = std::string {http_query(uri, "instance_id")}; + + auto process_set_qps_limit = [&](std::function cb) -> HttpResponse { + DCHECK(!qps_limit_str.empty()); + int64_t qps_limit = -1; + try { + qps_limit = std::stoll(qps_limit_str); + } catch (const std::exception& ex) { + return http_json_reply( + MetaServiceCode::INVALID_ARGUMENT, + fmt::format("param `qps_limit` is not a legal int64 type:{}", ex.what())); + } + if (qps_limit < 0) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + "`qps_limit` should not be less than 0"); + } + if (cb(qps_limit)) { + return http_json_reply(MetaServiceCode::OK, "sucess to adjust rate limit"); + } + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + fmt::format("failed to adjust rate limit for qps_limit={}, " + "rpc_name={}, instance_id={}, plz ensure correct " + "rpc/instance name", + qps_limit_str, rpc_name, instance_id)); + }; + + auto set_global_qps_limit = [process_set_qps_limit, service]() { + return process_set_qps_limit([service](int64_t qps_limit) { + return service->rate_limiter()->set_rate_limit(qps_limit); + }); + }; + + auto set_rpc_qps_limit = [&]() { + return process_set_qps_limit([&](int64_t qps_limit) { + return service->rate_limiter()->set_rate_limit(qps_limit, rpc_name); + }); + }; + + auto set_instance_qps_limit = [&]() { + return process_set_qps_limit([&](int64_t qps_limit) { + return service->rate_limiter()->set_instance_rate_limit(qps_limit, instance_id); + }); + }; + + auto set_instance_rpc_qps_limit = [&]() { + return process_set_qps_limit([&](int64_t qps_limit) { + return service->rate_limiter()->set_rate_limit(qps_limit, rpc_name, instance_id); + }); + }; + + auto process_invalid_arguments = [&]() -> HttpResponse { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + fmt::format("invalid argument: qps_limit(required)={}, " + "rpc_name(optional)={}, instance_id(optional)={}", + qps_limit_str, rpc_name, instance_id)); + }; + + // We have 3 optional params and 2^3 combination, and 4 of them are illegal. + // We register callbacks for them in porcessors accordings to the level, represented by 3 bits. + std::array, 8> processors; + std::fill_n(processors.begin(), 8, std::move(process_invalid_arguments)); + processors[0b001] = std::move(set_global_qps_limit); + processors[0b011] = std::move(set_rpc_qps_limit); + processors[0b101] = std::move(set_instance_qps_limit); + processors[0b111] = std::move(set_instance_rpc_qps_limit); + + uint8_t level = (0x01 & !qps_limit_str.empty()) | ((0x01 & !rpc_name.empty()) << 1) | + ((0x01 & !instance_id.empty()) << 2); + + DCHECK_LT(level, 8); + + return processors[level](); +} + +HttpResponse process_query_rate_limit(MetaServiceImpl* service, brpc::Controller* cntl) { + auto rate_limiter = service->rate_limiter(); + rapidjson::Document d; + d.SetObject(); + auto get_qps_limit = [&d](std::string_view rpc_name, + std::shared_ptr rpc_limiter) { + rapidjson::Document node; + node.SetObject(); + rapidjson::Document sub; + sub.SetObject(); + auto get_qps_token_limit = [&](std::string_view instance_id, + std::shared_ptr qps_token) { + sub.AddMember(rapidjson::StringRef(instance_id.data(), instance_id.size()), + qps_token->max_qps_limit(), d.GetAllocator()); + }; + rpc_limiter->for_each_qps_token(std::move(get_qps_token_limit)); + + node.AddMember("RPC qps limit", rpc_limiter->max_qps_limit(), d.GetAllocator()); + node.AddMember("instance specific qps limit", sub, d.GetAllocator()); + d.AddMember(rapidjson::StringRef(rpc_name.data(), rpc_name.size()), node, d.GetAllocator()); + }; + rate_limiter->for_each_rpc_limiter(std::move(get_qps_limit)); + + rapidjson::StringBuffer sb; + rapidjson::PrettyWriter writer(sb); + d.Accept(writer); + return http_json_reply(MetaServiceCode::OK, "", sb.GetString()); +} + +// Recycler HTTP handlers +HttpResponse process_recycle_instance(RecyclerServiceImpl* service, brpc::Controller* cntl) { + std::string request_body = cntl->request_attachment().to_string(); + RecycleInstanceRequest req; + auto st = google::protobuf::util::JsonStringToMessage(request_body, &req); + if (!st.ok()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + "failed to parse RecycleInstanceRequest"); + } + RecycleInstanceResponse res; + service->recycle_instance(cntl, &req, &res, nullptr); + return http_text_reply(res.status(), res.status().msg()); +} + +HttpResponse process_statistics_recycle(RecyclerServiceImpl* service, brpc::Controller* cntl) { + std::string request_body = cntl->request_attachment().to_string(); + StatisticsRecycleRequest req; + auto st = google::protobuf::util::JsonStringToMessage(request_body, &req); + if (!st.ok()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + "failed to parse StatisticsRecycleRequest"); + } + MetaServiceCode code = MetaServiceCode::OK; + std::string msg = "OK"; + service->statistics_recycle(req, code, msg); + return http_text_reply(code, msg, msg); +} + +HttpResponse process_recycle_copy_jobs(RecyclerServiceImpl* service, brpc::Controller* cntl) { + const auto* instance_id = cntl->http_request().uri().GetQuery("instance_id"); + if (!instance_id || instance_id->empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "no instance id"); + } + MetaServiceCode code = MetaServiceCode::OK; + std::string msg = "OK"; + recycle_copy_jobs(service->txn_kv(), *instance_id, code, msg, + service->recycler()->thread_pool_group(), service->txn_lazy_committer()); + return http_text_reply(code, msg, msg); +} + +HttpResponse process_recycle_job_info(RecyclerServiceImpl* service, brpc::Controller* cntl) { + const auto* instance_id = cntl->http_request().uri().GetQuery("instance_id"); + if (!instance_id || instance_id->empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "no instance id"); + } + MetaServiceCode code = MetaServiceCode::OK; + std::string msg, key; + job_recycle_key({*instance_id}, &key); + recycle_job_info(service->txn_kv(), *instance_id, key, code, msg); + return http_text_reply(code, msg, msg); +} + +HttpResponse process_check_instance(RecyclerServiceImpl* service, brpc::Controller* cntl) { + const auto* instance_id = cntl->http_request().uri().GetQuery("instance_id"); + if (!instance_id || instance_id->empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "no instance id"); + } + if (!service->checker()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "checker not enabled"); + } + MetaServiceCode code = MetaServiceCode::OK; + std::string msg = "OK"; + service->check_instance(*instance_id, code, msg); + return http_text_reply(code, msg, msg); +} + +HttpResponse process_check_job_info(RecyclerServiceImpl* service, brpc::Controller* cntl) { + const auto* instance_id = cntl->http_request().uri().GetQuery("instance_id"); + if (!instance_id || instance_id->empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "no instance id"); + } + MetaServiceCode code = MetaServiceCode::OK; + std::string msg, key; + job_check_key({*instance_id}, &key); + recycle_job_info(service->txn_kv(), *instance_id, key, code, msg); + return http_text_reply(code, msg, msg); +} + +HttpResponse process_check_meta(RecyclerServiceImpl* service, brpc::Controller* cntl) { + const auto& uri = cntl->http_request().uri(); + const auto* instance_id = uri.GetQuery("instance_id"); + const auto* host = uri.GetQuery("host"); + const auto* port = uri.GetQuery("port"); + const auto* user = uri.GetQuery("user"); + const auto* password = uri.GetQuery("password"); + if (!instance_id || instance_id->empty() || !host || host->empty() || !port || port->empty() || + !password || !user || user->empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "missing required parameters"); + } + std::string msg = "OK"; + check_meta(service->txn_kv(), *instance_id, *host, *port, *user, *password, msg); + return http_text_reply(MetaServiceCode::OK, msg, msg); +} + +HttpResponse process_adjust_rate_limiter(RecyclerServiceImpl*, brpc::Controller* cntl) { + const auto& uri = cntl->http_request().uri(); + const auto* type_string = uri.GetQuery("type"); + const auto* speed = uri.GetQuery("speed"); + const auto* burst = uri.GetQuery("burst"); + const auto* limit = uri.GetQuery("limit"); + if (!type_string || type_string->empty() || !speed || !burst || !limit || + (*type_string != "get" && *type_string != "put")) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "invalid arguments"); + } + auto max_speed = speed->empty() ? 0 : std::stoul(*speed); + auto max_burst = burst->empty() ? 0 : std::stoul(*burst); + auto max_limit = limit->empty() ? 0 : std::stoul(*limit); + if (reset_s3_rate_limiter(string_to_s3_rate_limit_type(*type_string), max_speed, max_burst, + max_limit) != 0) { + return http_json_reply(MetaServiceCode::UNDEFINED_ERR, "adjust failed"); + } + return http_json_reply(MetaServiceCode::OK, ""); +} + +HttpResponse process_show_config(MetaServiceImpl*, brpc::Controller* cntl) { + auto& uri = cntl->http_request().uri(); + std::string_view conf_name = http_query(uri, "conf_key"); + + if (config::full_conf_map == nullptr) { + return http_json_reply(MetaServiceCode::UNDEFINED_ERR, "config map not initialized"); + } + + std::string result = config::show_config(std::string(conf_name)); + return http_json_reply(MetaServiceCode::OK, "", result); +} + +HttpResponse process_update_config(MetaServiceImpl* service, brpc::Controller* cntl) { + const auto& uri = cntl->http_request().uri(); + bool persist = (http_query(uri, "persist") == "true"); + auto configs = std::string {http_query(uri, "configs")}; + auto reason = std::string {http_query(uri, "reason")}; + LOG(INFO) << "modify configs for reason=" << reason << ", configs=" << configs + << ", persist=" << http_query(uri, "persist"); + + if (auto [succ, cause] = config::update_config(configs, persist, config::custom_conf_path); + !succ) { + LOG(WARNING) << cause; + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, cause); + } + return http_json_reply(MetaServiceCode::OK, ""); +} + +HttpResponse process_decode_key(MetaServiceImpl*, brpc::Controller* ctrl) { + auto& uri = ctrl->http_request().uri(); + std::string_view key = http_query(uri, "key"); + if (key.empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "no key to decode"); + } + + bool unicode = http_query(uri, "unicode") != "false"; + std::string body = prettify_key(key, unicode); + if (body.empty()) { + std::string msg = "failed to decode key, key=" + std::string(key); + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + return http_text_reply(MetaServiceCode::OK, "", body); +} + +HttpResponse process_encode_key(MetaServiceImpl*, brpc::Controller* ctrl) { + return process_http_encode_key(ctrl->http_request().uri()); +} + +HttpResponse process_get_value(MetaServiceImpl* service, brpc::Controller* ctrl) { + return process_http_get_value(service->txn_kv().get(), ctrl->http_request().uri()); +} + +HttpResponse process_set_value(MetaServiceImpl* service, brpc::Controller* ctrl) { + return process_http_set_value(service->txn_kv().get(), ctrl); +} + +// show all key ranges and their count. +HttpResponse process_show_meta_ranges(MetaServiceImpl* service, brpc::Controller* ctrl) { + auto txn_kv = std::dynamic_pointer_cast(service->txn_kv()); + if (!txn_kv) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + "this method only support fdb txn kv"); + } + + std::vector partition_boundaries; + TxnErrorCode code = txn_kv->get_partition_boundaries(&partition_boundaries); + if (code != TxnErrorCode::TXN_OK) { + auto msg = fmt::format("failed to get boundaries, code={}", code); + return http_json_reply(MetaServiceCode::UNDEFINED_ERR, msg); + } + std::unordered_map partition_count; + get_kv_range_boundaries_count(partition_boundaries, partition_count); + + // sort ranges by count + std::vector> meta_ranges; + meta_ranges.reserve(partition_count.size()); + for (auto&& [key, count] : partition_count) { + meta_ranges.emplace_back(key, count); + } + + std::sort(meta_ranges.begin(), meta_ranges.end(), + [](const auto& lhs, const auto& rhs) { return lhs.second > rhs.second; }); + + std::string body = fmt::format("total meta ranges: {}\n", partition_boundaries.size()); + for (auto&& [key, count] : meta_ranges) { + body += fmt::format("{}: {}\n", key, count); + } + return http_text_reply(MetaServiceCode::OK, "", body); +} + +HttpResponse process_get_instance_info(MetaServiceImpl* service, brpc::Controller* ctrl) { + auto& uri = ctrl->http_request().uri(); + std::string_view instance_id = http_query(uri, "instance_id"); + std::string_view cloud_unique_id = http_query(uri, "cloud_unique_id"); + + InstanceInfoPB instance; + auto [code, msg] = service->get_instance_info(std::string(instance_id), + std::string(cloud_unique_id), &instance); + return http_json_reply_message(code, msg, instance); +} + +HttpResponse process_get_cluster(MetaServiceImpl* service, brpc::Controller* ctrl) { + GetClusterRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + + bool get_all_cluster_info = false; + // if cluster_id、cluster_name、mysql_user_name all empty, get this instance's all cluster info. + if (req.cluster_id().empty() && req.cluster_name().empty() && req.mysql_user_name().empty()) { + get_all_cluster_info = true; + } + + GetClusterResponse resp; + service->get_cluster(ctrl, &req, &resp, nullptr); + + if (resp.status().code() == MetaServiceCode::OK) { + if (get_all_cluster_info) { + return http_json_reply_message(resp.status(), resp); + } else { + // ATTN: only returns the first cluster pb. + return http_json_reply_message(resp.status(), resp.cluster(0)); + } + } else { + return http_json_reply(resp.status()); + } +} + +HttpResponse process_get_tablet_stats(MetaServiceImpl* service, brpc::Controller* ctrl) { + GetTabletStatsRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + GetTabletStatsResponse resp; + service->get_tablet_stats(ctrl, &req, &resp, nullptr); + + std::string body; + if (resp.status().code() == MetaServiceCode::OK) { + body = resp.DebugString(); + } + return http_text_reply(resp.status(), body); +} + +HttpResponse process_fix_tablet_stats(MetaServiceImpl* service, brpc::Controller* ctrl) { + auto& uri = ctrl->http_request().uri(); + std::string_view cloud_unique_id = http_query(uri, "cloud_unique_id"); + std::string_view table_id = http_query(uri, "table_id"); + std::string_view tablet_id = http_query(uri, "tablet_id"); + + MetaServiceResponseStatus st = service->fix_tablet_stats( + std::string(cloud_unique_id), std::string(table_id), std::string(tablet_id)); + return http_text_reply(st, st.DebugString()); +} + +HttpResponse process_fix_tablet_db_id(MetaServiceImpl* service, brpc::Controller* ctrl) { + auto& uri = ctrl->http_request().uri(); + std::string instance_id(http_query(uri, "instance_id")); + std::string tablet_id_str(http_query(uri, "tablet_id")); + std::string db_id_str(http_query(uri, "db_id")); + + int64_t tablet_id = 0, db_id = 0; + try { + db_id = std::stol(db_id_str); + } catch (const std::exception& e) { + auto msg = fmt::format("db_id {} must be a number, meet error={}", db_id_str, e.what()); + LOG(WARNING) << msg; + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + + try { + tablet_id = std::stol(tablet_id_str); + } catch (const std::exception& e) { + auto msg = fmt::format("tablet_id {} must be a number, meet error={}", tablet_id_str, + e.what()); + LOG(WARNING) << msg; + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + + auto [code, msg] = service->fix_tablet_db_id(instance_id, tablet_id, db_id); + return http_text_reply(code, msg, ""); +} + +HttpResponse process_get_stage(MetaServiceImpl* service, brpc::Controller* ctrl) { + GetStageRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + GetStageResponse resp; + service->get_stage(ctrl, &req, &resp, nullptr); + return http_json_reply_message(resp.status(), resp); +} + +HttpResponse process_get_cluster_status(MetaServiceImpl* service, brpc::Controller* ctrl) { + GetClusterStatusRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + GetClusterStatusResponse resp; + service->get_cluster_status(ctrl, &req, &resp, nullptr); + return http_json_reply_message(resp.status(), resp); +} + +HttpResponse process_txn_lazy_commit(MetaServiceImpl* service, brpc::Controller* ctrl) { + auto& uri = ctrl->http_request().uri(); + std::string instance_id(http_query(uri, "instance_id")); + std::string txn_id_str(http_query(uri, "txn_id")); + if (instance_id.empty() || txn_id_str.empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "instance_id or txn_id is empty"); + } + + int64_t txn_id = 0; + try { + txn_id = std::stol(txn_id_str); + } catch (const std::exception& e) { + auto msg = fmt::format("txn_id {} must be a number, meet error={}", txn_id_str, e.what()); + LOG(WARNING) << msg; + return http_json_reply(MetaServiceCode::UNDEFINED_ERR, msg); + } + + DCHECK_GT(txn_id, 0); + + auto txn_lazy_committer = service->txn_lazy_committer(); + if (!txn_lazy_committer) { + return http_json_reply(MetaServiceCode::UNDEFINED_ERR, "txn lazy committer is nullptr"); + } + + std::shared_ptr task = txn_lazy_committer->submit(instance_id, txn_id); + auto [code, msg] = task->wait(); + return http_json_reply(code, msg); +} + +HttpResponse process_list_snapshot(MetaServiceImpl* service, brpc::Controller* ctrl) { + ListSnapshotRequest req; + auto& uri = ctrl->http_request().uri(); + std::string instance_id(http_query(uri, "instance_id")); + if (instance_id.empty()) { + PARSE_MESSAGE_OR_RETURN(ctrl, req); + } else { + req.set_instance_id(instance_id); + } + + ListSnapshotResponse resp; + service->list_snapshot(ctrl, &req, &resp, nullptr); + return http_json_reply_message(resp.status(), resp); +} + +HttpResponse process_drop_snapshot(MetaServiceImpl* service, brpc::Controller* ctrl) { + auto& uri = ctrl->http_request().uri(); + std::string instance_id(http_query(uri, "instance_id")); + std::string snapshot_id(http_query(uri, "snapshot_id")); + if (instance_id.empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "instance_id is empty"); + } + if (snapshot_id.empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "snapshot_id is empty"); + } + DropSnapshotRequest req; + req.set_snapshot_id(snapshot_id); + DropSnapshotResponse resp; + service->snapshot_manager()->drop_snapshot(instance_id, req, &resp); + return http_json_reply(resp.status()); +} + +HttpResponse process_set_snapshot_property(MetaServiceImpl* service, brpc::Controller* ctrl) { + AlterInstanceRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + auto* properties = req.mutable_properties(); + if (properties->contains("status")) { + std::string status = properties->at("status"); + if (status != "ENABLED" && status != "DISABLED") { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + "Invalid value for status property: " + status + + ", expected 'ENABLED' or 'DISABLED' (case insensitive)"); + } + std::string_view is_enable = (status == "ENABLED") ? "true" : "false"; + const std::string& property_name = + AlterInstanceRequest::SnapshotProperty_Name(AlterInstanceRequest::ENABLE_SNAPSHOT); + (*properties)[property_name] = is_enable; + properties->erase("status"); + } + if (properties->contains("max_reserved_snapshots")) { + const std::string& property_name = AlterInstanceRequest::SnapshotProperty_Name( + AlterInstanceRequest::MAX_RESERVED_SNAPSHOTS); + (*properties)[property_name] = properties->at("max_reserved_snapshots"); + properties->erase("max_reserved_snapshots"); + } + if (properties->contains("snapshot_interval_seconds")) { + const std::string& property_name = AlterInstanceRequest::SnapshotProperty_Name( + AlterInstanceRequest::SNAPSHOT_INTERVAL_SECONDS); + (*properties)[property_name] = properties->at("snapshot_interval_seconds"); + properties->erase("snapshot_interval_seconds"); + } + req.set_op(AlterInstanceRequest::SET_SNAPSHOT_PROPERTY); + AlterInstanceResponse resp; + service->alter_instance(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_set_multi_version_status(MetaServiceImpl* service, brpc::Controller* ctrl) { + auto& uri = ctrl->http_request().uri(); + std::string instance_id(http_query(uri, "instance_id")); + std::string cloud_unique_id(http_query(uri, "cloud_unique_id")); + std::string multi_version_status_str(http_query(uri, "multi_version_status")); + + // Prefer instance_id if provided, fallback to cloud_unique_id + if (instance_id.empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "empty instance id"); + } + + if (multi_version_status_str.empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + "multi_version_status is required"); + } + + // Parse multi_version_status from string to enum + MultiVersionStatus multi_version_status; + std::string multi_version_status_upper = multi_version_status_str; + std::ranges::transform(multi_version_status_upper, multi_version_status_upper.begin(), + ::toupper); + + if (multi_version_status_upper == "MULTI_VERSION_DISABLED") { + multi_version_status = MultiVersionStatus::MULTI_VERSION_DISABLED; + } else if (multi_version_status_upper == "MULTI_VERSION_WRITE_ONLY") { + multi_version_status = MultiVersionStatus::MULTI_VERSION_WRITE_ONLY; + } else if (multi_version_status_upper == "MULTI_VERSION_READ_WRITE") { + multi_version_status = MultiVersionStatus::MULTI_VERSION_READ_WRITE; + } else if (multi_version_status_upper == "MULTI_VERSION_ENABLED") { + multi_version_status = MultiVersionStatus::MULTI_VERSION_ENABLED; + } else { + return http_json_reply( + MetaServiceCode::INVALID_ARGUMENT, + "invalid multi_version_status value. Supported values: MULTI_VERSION_DISABLED, " + "MULTI_VERSION_WRITE_ONLY, MULTI_VERSION_READ_WRITE, MULTI_VERSION_ENABLED"); + } + // Call snapshot manager directly + auto [code, msg] = service->snapshot_manager()->set_multi_version_status(instance_id, + multi_version_status); + + return http_json_reply(code, msg); +} + +HttpResponse process_get_snapshot_property(MetaServiceImpl* service, brpc::Controller* ctrl) { + auto& uri = ctrl->http_request().uri(); + std::string_view instance_id = http_query(uri, "instance_id"); + std::string_view cloud_unique_id = http_query(uri, "cloud_unique_id"); + + if (instance_id.empty() && cloud_unique_id.empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + "empty instance_id and cloud_unique_id"); + } + + InstanceInfoPB instance; + auto [code, msg] = service->get_instance_info(std::string(instance_id), + std::string(cloud_unique_id), &instance); + if (code != MetaServiceCode::OK) { + return http_json_reply(code, msg); + } + + // Build snapshot properties response + rapidjson::Document doc; + doc.SetObject(); + + // Snapshot switch status + std::string_view switch_status; + switch (instance.snapshot_switch_status()) { + case SNAPSHOT_SWITCH_DISABLED: + switch_status = "UNSUPPORTED"; + break; + case SNAPSHOT_SWITCH_OFF: + switch_status = "DISABLED"; + break; + case SNAPSHOT_SWITCH_ON: + switch_status = "ENABLED"; + break; + default: + switch_status = "UNKNOWN"; + break; + } + doc.AddMember("status", rapidjson::StringRef(switch_status.data(), switch_status.size()), + doc.GetAllocator()); + + // Max reserved snapshots + if (instance.has_max_reserved_snapshot()) { + doc.AddMember("max_reserved_snapshots", instance.max_reserved_snapshot(), + doc.GetAllocator()); + } + + // Snapshot interval seconds + if (instance.has_snapshot_interval_seconds()) { + doc.AddMember("snapshot_interval_seconds", instance.snapshot_interval_seconds(), + doc.GetAllocator()); + } + + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + doc.Accept(writer); + + return http_json_reply(MetaServiceCode::OK, "", buffer.GetString()); +} + +} // namespace doris::cloud \ No newline at end of file diff --git a/cloud/src/common/http_helper.h b/cloud/src/common/http_helper.h new file mode 100644 index 00000000000000..79a460b49d1a14 --- /dev/null +++ b/cloud/src/common/http_helper.h @@ -0,0 +1,206 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "meta-service/meta_service_http.h" + +namespace doris::cloud { + +inline constexpr std::string_view kDefaultHttpApiVersion = "v1"; + +struct HttpApiPath { + std::string_view version; + std::string_view route; +}; + +[[maybe_unused]] static bool is_http_api_version(std::string_view segment) { + if (segment.size() < 2 || segment.front() != 'v') { + return false; + } + return std::ranges::all_of(segment.substr(1), [](char ch) { return ch >= '0' && ch <= '9'; }); +} + +[[maybe_unused]] static HttpApiPath split_http_api_path(std::string_view path) { + auto separator = path.find('/'); + if (separator == std::string_view::npos) { + return {.version = "", .route = path}; + } + // This helper only splits the version segment from the route. Whether a version is actually + // supported is determined by exact route registration in get_http_handlers(). + auto segment = path.substr(0, separator); + if (!is_http_api_version(segment)) { + return {.version = "", .route = path}; + } + return {.version = segment, .route = path.substr(separator + 1)}; +} + +[[maybe_unused]] static std::string_view http_api_route(std::string_view path) { + return split_http_api_path(path).route; +} + +[[maybe_unused]] static const HttpHandler* resolve_http_handler(const HttpHandlerInfo& handler_info, + std::string_view version) { + if (version.empty() || version == kDefaultHttpApiVersion) { + return &handler_info.handler; + } + auto it = handler_info.versioned_handlers.find(version); + return it == handler_info.versioned_handlers.end() ? nullptr : &it->second; +} + +const std::unordered_map& get_http_handlers(); + +// injection_point_http.cpp +[[maybe_unused]] HttpResponse process_injection_point(MetaServiceImpl* service, + brpc::Controller* ctrl); + +// MetaService Http handlers +[[maybe_unused]] HttpResponse process_alter_cluster(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_get_obj_store_info(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_alter_obj_store_info(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_alter_storage_vault(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_update_ak_sk(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_create_instance(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_alter_instance(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_abort_txn(MetaServiceImpl* service, brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_abort_tablet_job(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_alter_ram_user(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_alter_iam(MetaServiceImpl* service, brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_adjust_rate_limit(MetaServiceImpl* service, + brpc::Controller* cntl); + +[[maybe_unused]] HttpResponse process_query_rate_limit(MetaServiceImpl* service, + brpc::Controller* cntl); + +[[maybe_unused]] HttpResponse process_decode_key(MetaServiceImpl*, brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_encode_key(MetaServiceImpl*, brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_get_value(MetaServiceImpl* service, brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_set_value(MetaServiceImpl* service, brpc::Controller* ctrl); + +// show all key ranges and their count. +[[maybe_unused]] HttpResponse process_show_meta_ranges(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_get_instance_info(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_get_cluster(MetaServiceImpl* service, brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_get_tablet_stats(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_fix_tablet_stats(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_fix_tablet_db_id(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_get_stage(MetaServiceImpl* service, brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_get_cluster_status(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_txn_lazy_commit(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_list_snapshot(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_drop_snapshot(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_set_snapshot_property(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_set_multi_version_status(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_get_snapshot_property(MetaServiceImpl* service, + brpc::Controller* ctrl); + +// Recycler HTTP handlers +[[maybe_unused]] HttpResponse process_recycle_instance(RecyclerServiceImpl* service, + brpc::Controller* cntl); + +[[maybe_unused]] HttpResponse process_statistics_recycle(RecyclerServiceImpl* service, + brpc::Controller* cntl); + +[[maybe_unused]] HttpResponse process_recycle_copy_jobs(RecyclerServiceImpl* service, + brpc::Controller* cntl); + +[[maybe_unused]] HttpResponse process_recycle_job_info(RecyclerServiceImpl* service, + brpc::Controller* cntl); +[[maybe_unused]] HttpResponse process_check_instance(RecyclerServiceImpl* service, + brpc::Controller* cntl); + +[[maybe_unused]] HttpResponse process_check_job_info(RecyclerServiceImpl* service, + brpc::Controller* cntl); + +[[maybe_unused]] HttpResponse process_check_meta(RecyclerServiceImpl* service, + brpc::Controller* cntl); + +[[maybe_unused]] HttpResponse process_adjust_rate_limiter(RecyclerServiceImpl*, + brpc::Controller* cntl); + +// Both http handlers +[[maybe_unused]] HttpResponse process_show_config(MetaServiceImpl*, brpc::Controller* cntl); + +[[maybe_unused]] HttpResponse process_update_config(MetaServiceImpl* service, + brpc::Controller* cntl); + +} // namespace doris::cloud \ No newline at end of file diff --git a/cloud/src/meta-service/meta_service_http.cpp b/cloud/src/meta-service/meta_service_http.cpp index ff2c4ce080d1da..d28a5af6a8eb02 100644 --- a/cloud/src/meta-service/meta_service_http.cpp +++ b/cloud/src/meta-service/meta_service_http.cpp @@ -32,9 +32,7 @@ #include #include -#include #include -#include #include #include #include @@ -43,37 +41,15 @@ #include #include -#include "common/config.h" -#include "common/configbase.h" +#include "common/http_helper.h" #include "common/logging.h" -#include "common/string_util.h" #include "meta-service/meta_service_helper.h" #include "meta-store/txn_kv.h" -#include "meta-store/txn_kv_error.h" #include "meta_service.h" -#include "rate-limiter/rate_limiter.h" +#include "recycler/recycler_service.h" namespace doris::cloud { -#define PARSE_MESSAGE_OR_RETURN(ctrl, req) \ - do { \ - std::string body = ctrl->request_attachment().to_string(); \ - auto& unresolved_path = ctrl->http_request().unresolved_path(); \ - auto st = parse_json_message(unresolved_path, body, &req); \ - if (!st.ok()) { \ - std::string msg = "parse http request '" + unresolved_path + "': " + st.ToString(); \ - LOG_WARNING(msg).tag("body", encryt_sk(body)); \ - return http_json_reply(MetaServiceCode::PROTOBUF_PARSE_ERR, msg); \ - } \ - } while (0) - -extern std::string get_instance_id(const std::shared_ptr& rc_mgr, - const std::string& cloud_unique_id); - -extern int decrypt_instance_info(InstanceInfoPB& instance, const std::string& instance_id, - MetaServiceCode& code, std::string& msg, - std::shared_ptr& txn); - extern void get_kv_range_boundaries_count(std::vector& partition_boundaries, std::unordered_map& partition_count); @@ -130,15 +106,19 @@ std::tuple convert_ms_code_to_http_code(MetaServiceCode r HttpResponse http_json_reply(MetaServiceCode code, const std::string& msg, std::optional body) { auto [status_code, status_msg] = convert_ms_code_to_http_code(code); + std::string_view response_msg = + code == MetaServiceCode::OK && msg.empty() ? status_msg : std::string_view(msg); rapidjson::Document d; d.SetObject(); if (code == MetaServiceCode::OK) { d.AddMember("code", "OK", d.GetAllocator()); - d.AddMember("msg", "", d.GetAllocator()); + d.AddMember("msg", rapidjson::StringRef(response_msg.data(), response_msg.size()), + d.GetAllocator()); } else { d.AddMember("code", rapidjson::StringRef(status_msg.data(), status_msg.size()), d.GetAllocator()); - d.AddMember("msg", rapidjson::StringRef(msg.data(), msg.size()), d.GetAllocator()); + d.AddMember("msg", rapidjson::StringRef(response_msg.data(), response_msg.size()), + d.GetAllocator()); } rapidjson::Document result; @@ -158,7 +138,7 @@ HttpResponse http_json_reply(MetaServiceCode code, const std::string& msg, rapidjson::StringBuffer sb; rapidjson::PrettyWriter writer(sb); d.Accept(writer); - return {status_code, msg, sb.GetString()}; + return {.status_code = status_code, .msg = std::string(response_msg), .body = sb.GetString()}; } static std::string format_http_request(brpc::Controller* cntl) { @@ -182,790 +162,21 @@ static std::string format_http_request(brpc::Controller* cntl) { return ss.str(); } -static std::string_view remove_version_prefix(std::string_view path) { - if (path.size() > 3 && path.substr(0, 3) == "v1/") path.remove_prefix(3); - return path; -} - -HttpResponse process_injection_point(MetaServiceImpl* service, brpc::Controller* ctrl); - -static HttpResponse process_alter_cluster(MetaServiceImpl* service, brpc::Controller* ctrl) { - static std::unordered_map operations { - {"add_cluster", AlterClusterRequest::ADD_CLUSTER}, - {"drop_cluster", AlterClusterRequest::DROP_CLUSTER}, - {"rename_cluster", AlterClusterRequest::RENAME_CLUSTER}, - {"update_cluster_endpoint", AlterClusterRequest::UPDATE_CLUSTER_ENDPOINT}, - {"update_cluster_mysql_user_name", AlterClusterRequest::UPDATE_CLUSTER_MYSQL_USER_NAME}, - {"add_node", AlterClusterRequest::ADD_NODE}, - {"drop_node", AlterClusterRequest::DROP_NODE}, - {"decommission_node", AlterClusterRequest::DECOMMISSION_NODE}, - {"set_cluster_status", AlterClusterRequest::SET_CLUSTER_STATUS}, - {"notify_decommissioned", AlterClusterRequest::NOTIFY_DECOMMISSIONED}, - {"alter_vcluster_info", AlterClusterRequest::ALTER_VCLUSTER_INFO}, - }; - - auto& path = ctrl->http_request().unresolved_path(); - auto body = ctrl->request_attachment().to_string(); - auto it = operations.find(remove_version_prefix(path)); - if (it == operations.end()) { - std::string msg = "not supportted alter cluster operation: " + path; - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); - } - - AlterClusterRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - - req.set_op(it->second); - AlterClusterResponse resp; - service->alter_cluster(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_get_obj_store_info(MetaServiceImpl* service, brpc::Controller* ctrl) { - GetObjStoreInfoRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - - GetObjStoreInfoResponse resp; - service->get_obj_store_info(ctrl, &req, &resp, nullptr); - return http_json_reply_message(resp.status(), resp); -} - -static HttpResponse process_alter_obj_store_info(MetaServiceImpl* service, brpc::Controller* ctrl) { - static std::unordered_map operations { - {"add_obj_info", AlterObjStoreInfoRequest::ADD_OBJ_INFO}, - {"legacy_update_ak_sk", AlterObjStoreInfoRequest::LEGACY_UPDATE_AK_SK}, - {"alter_obj_info", AlterObjStoreInfoRequest::ALTER_OBJ_INFO}}; - - auto& path = ctrl->http_request().unresolved_path(); - auto it = operations.find(remove_version_prefix(path)); - if (it == operations.end()) { - std::string msg = "not supportted alter obj store info operation: " + path; - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); - } - - AlterObjStoreInfoRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - req.set_op(it->second); - - AlterObjStoreInfoResponse resp; - service->alter_obj_store_info(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_alter_storage_vault(MetaServiceImpl* service, brpc::Controller* ctrl) { - static std::unordered_map operations { - {"drop_s3_vault", AlterObjStoreInfoRequest::DROP_S3_VAULT}, - {"add_s3_vault", AlterObjStoreInfoRequest::ADD_S3_VAULT}, - {"alter_s3_vault", AlterObjStoreInfoRequest::ALTER_S3_VAULT}, - {"drop_hdfs_vault", AlterObjStoreInfoRequest::DROP_HDFS_INFO}, - {"add_hdfs_vault", AlterObjStoreInfoRequest::ADD_HDFS_INFO}}; - - auto& path = ctrl->http_request().unresolved_path(); - auto it = operations.find(remove_version_prefix(path)); - if (it == operations.end()) { - std::string msg = "not supportted alter storage vault operation: " + path; - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); - } - - AlterObjStoreInfoRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - req.set_op(it->second); - - AlterObjStoreInfoResponse resp; - service->alter_storage_vault(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_update_ak_sk(MetaServiceImpl* service, brpc::Controller* ctrl) { - UpdateAkSkRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - UpdateAkSkResponse resp; - service->update_ak_sk(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_create_instance(MetaServiceImpl* service, brpc::Controller* ctrl) { - CreateInstanceRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - CreateInstanceResponse resp; - service->create_instance(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_alter_instance(MetaServiceImpl* service, brpc::Controller* ctrl) { - static std::unordered_map> - operations {{"rename_instance", {AlterInstanceRequest::RENAME}}, - {"enable_instance_sse", {AlterInstanceRequest::ENABLE_SSE}}, - {"disable_instance_sse", {AlterInstanceRequest::DISABLE_SSE}}, - {"drop_instance", {AlterInstanceRequest::DROP}}, - {"set_instance_status", - {AlterInstanceRequest::SET_NORMAL, AlterInstanceRequest::SET_OVERDUE}}}; - - auto& path = ctrl->http_request().unresolved_path(); - auto it = operations.find(remove_version_prefix(path)); - if (it == operations.end()) { - std::string msg = "not supportted alter instance operation: '" + path + - "', remove version prefix=" + std::string(remove_version_prefix(path)); - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); - } - - AlterInstanceRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - // for unresolved path whose corresponding operation is signal, we need set operation by ourselves. - if ((it->second).size() == 1) { - req.set_op((it->second)[0]); - } - AlterInstanceResponse resp; - service->alter_instance(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_abort_txn(MetaServiceImpl* service, brpc::Controller* ctrl) { - AbortTxnRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - AbortTxnResponse resp; - service->abort_txn(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_abort_tablet_job(MetaServiceImpl* service, brpc::Controller* ctrl) { - FinishTabletJobRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - req.set_action(FinishTabletJobRequest::ABORT); - FinishTabletJobResponse resp; - service->finish_tablet_job(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_alter_ram_user(MetaServiceImpl* service, brpc::Controller* ctrl) { - AlterRamUserRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - AlterRamUserResponse resp; - service->alter_ram_user(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_alter_iam(MetaServiceImpl* service, brpc::Controller* ctrl) { - AlterIamRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - AlterIamResponse resp; - service->alter_iam(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_adjust_rate_limit(MetaServiceImpl* service, brpc::Controller* cntl) { - const auto& uri = cntl->http_request().uri(); - auto qps_limit_str = std::string {http_query(uri, "qps_limit")}; - auto rpc_name = std::string {http_query(uri, "rpc_name")}; - auto instance_id = std::string {http_query(uri, "instance_id")}; - - auto process_set_qps_limit = [&](std::function cb) -> HttpResponse { - DCHECK(!qps_limit_str.empty()); - int64_t qps_limit = -1; - try { - qps_limit = std::stoll(qps_limit_str); - } catch (const std::exception& ex) { - return http_json_reply( - MetaServiceCode::INVALID_ARGUMENT, - fmt::format("param `qps_limit` is not a legal int64 type:{}", ex.what())); - } - if (qps_limit < 0) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - "`qps_limit` should not be less than 0"); - } - if (cb(qps_limit)) { - return http_json_reply(MetaServiceCode::OK, "sucess to adjust rate limit"); - } - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - fmt::format("failed to adjust rate limit for qps_limit={}, " - "rpc_name={}, instance_id={}, plz ensure correct " - "rpc/instance name", - qps_limit_str, rpc_name, instance_id)); - }; - - auto set_global_qps_limit = [process_set_qps_limit, service]() { - return process_set_qps_limit([service](int64_t qps_limit) { - return service->rate_limiter()->set_rate_limit(qps_limit); - }); - }; - - auto set_rpc_qps_limit = [&]() { - return process_set_qps_limit([&](int64_t qps_limit) { - return service->rate_limiter()->set_rate_limit(qps_limit, rpc_name); - }); - }; - - auto set_instance_qps_limit = [&]() { - return process_set_qps_limit([&](int64_t qps_limit) { - return service->rate_limiter()->set_instance_rate_limit(qps_limit, instance_id); - }); - }; - - auto set_instance_rpc_qps_limit = [&]() { - return process_set_qps_limit([&](int64_t qps_limit) { - return service->rate_limiter()->set_rate_limit(qps_limit, rpc_name, instance_id); - }); - }; - - auto process_invalid_arguments = [&]() -> HttpResponse { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - fmt::format("invalid argument: qps_limit(required)={}, " - "rpc_name(optional)={}, instance_id(optional)={}", - qps_limit_str, rpc_name, instance_id)); - }; - - // We have 3 optional params and 2^3 combination, and 4 of them are illegal. - // We register callbacks for them in porcessors accordings to the level, represented by 3 bits. - std::array, 8> processors; - std::fill_n(processors.begin(), 8, std::move(process_invalid_arguments)); - processors[0b001] = std::move(set_global_qps_limit); - processors[0b011] = std::move(set_rpc_qps_limit); - processors[0b101] = std::move(set_instance_qps_limit); - processors[0b111] = std::move(set_instance_rpc_qps_limit); - - uint8_t level = (0x01 & !qps_limit_str.empty()) | ((0x01 & !rpc_name.empty()) << 1) | - ((0x01 & !instance_id.empty()) << 2); - - DCHECK_LT(level, 8); - - return processors[level](); -} - -static HttpResponse process_query_rate_limit(MetaServiceImpl* service, brpc::Controller* cntl) { - auto rate_limiter = service->rate_limiter(); - rapidjson::Document d; - d.SetObject(); - auto get_qps_limit = [&d](std::string_view rpc_name, - std::shared_ptr rpc_limiter) { - rapidjson::Document node; - node.SetObject(); - rapidjson::Document sub; - sub.SetObject(); - auto get_qps_token_limit = [&](std::string_view instance_id, - std::shared_ptr qps_token) { - sub.AddMember(rapidjson::StringRef(instance_id.data(), instance_id.size()), - qps_token->max_qps_limit(), d.GetAllocator()); - }; - rpc_limiter->for_each_qps_token(std::move(get_qps_token_limit)); - - node.AddMember("RPC qps limit", rpc_limiter->max_qps_limit(), d.GetAllocator()); - node.AddMember("instance specific qps limit", sub, d.GetAllocator()); - d.AddMember(rapidjson::StringRef(rpc_name.data(), rpc_name.size()), node, d.GetAllocator()); - }; - rate_limiter->for_each_rpc_limiter(std::move(get_qps_limit)); - - rapidjson::StringBuffer sb; - rapidjson::PrettyWriter writer(sb); - d.Accept(writer); - return http_json_reply(MetaServiceCode::OK, "", sb.GetString()); -} - -static HttpResponse process_update_config(MetaServiceImpl* service, brpc::Controller* cntl) { - const auto& uri = cntl->http_request().uri(); - bool persist = (http_query(uri, "persist") == "true"); - auto configs = std::string {http_query(uri, "configs")}; - auto reason = std::string {http_query(uri, "reason")}; - LOG(INFO) << "modify configs for reason=" << reason << ", configs=" << configs - << ", persist=" << http_query(uri, "persist"); - if (configs.empty()) [[unlikely]] { - LOG(WARNING) << "query param `config` should not be empty"; - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - "query param `config` should not be empty"); - } - std::unordered_map conf_map; - auto conf_list = split(configs, ','); - for (const auto& conf : conf_list) { - auto conf_pair = split(conf, '='); - if (conf_pair.size() != 2) [[unlikely]] { - LOG(WARNING) << "failed to split config=[{}] from `k=v` pattern" << conf; - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - fmt::format("config {} is invalid", configs)); - } - trim(conf_pair[0]); - trim(conf_pair[1]); - conf_map.emplace(std::move(conf_pair[0]), std::move(conf_pair[1])); - } - if (auto [succ, cause] = - config::set_config(std::move(conf_map), persist, config::custom_conf_path); - !succ) { - LOG(WARNING) << cause; - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, cause); - } - return http_json_reply(MetaServiceCode::OK, ""); -} - -static HttpResponse process_decode_key(MetaServiceImpl*, brpc::Controller* ctrl) { - auto& uri = ctrl->http_request().uri(); - std::string_view key = http_query(uri, "key"); - if (key.empty()) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "no key to decode"); - } - - bool unicode = http_query(uri, "unicode") != "false"; - std::string body = prettify_key(key, unicode); - if (body.empty()) { - std::string msg = "failed to decode key, key=" + std::string(key); - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); - } - return http_text_reply(MetaServiceCode::OK, "", body); -} - -static HttpResponse process_encode_key(MetaServiceImpl*, brpc::Controller* ctrl) { - return process_http_encode_key(ctrl->http_request().uri()); -} - -static HttpResponse process_get_value(MetaServiceImpl* service, brpc::Controller* ctrl) { - return process_http_get_value(service->txn_kv().get(), ctrl->http_request().uri()); -} - -static HttpResponse process_set_value(MetaServiceImpl* service, brpc::Controller* ctrl) { - return process_http_set_value(service->txn_kv().get(), ctrl); -} - -// show all key ranges and their count. -static HttpResponse process_show_meta_ranges(MetaServiceImpl* service, brpc::Controller* ctrl) { - auto txn_kv = std::dynamic_pointer_cast(service->txn_kv()); - if (!txn_kv) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - "this method only support fdb txn kv"); - } - - std::vector partition_boundaries; - TxnErrorCode code = txn_kv->get_partition_boundaries(&partition_boundaries); - if (code != TxnErrorCode::TXN_OK) { - auto msg = fmt::format("failed to get boundaries, code={}", code); - return http_json_reply(MetaServiceCode::UNDEFINED_ERR, msg); - } - std::unordered_map partition_count; - get_kv_range_boundaries_count(partition_boundaries, partition_count); - - // sort ranges by count - std::vector> meta_ranges; - meta_ranges.reserve(partition_count.size()); - for (auto&& [key, count] : partition_count) { - meta_ranges.emplace_back(key, count); - } - - std::sort(meta_ranges.begin(), meta_ranges.end(), - [](const auto& lhs, const auto& rhs) { return lhs.second > rhs.second; }); - - std::string body = fmt::format("total meta ranges: {}\n", partition_boundaries.size()); - for (auto&& [key, count] : meta_ranges) { - body += fmt::format("{}: {}\n", key, count); - } - return http_text_reply(MetaServiceCode::OK, "", body); -} - -static HttpResponse process_get_instance_info(MetaServiceImpl* service, brpc::Controller* ctrl) { - auto& uri = ctrl->http_request().uri(); - std::string_view instance_id = http_query(uri, "instance_id"); - std::string_view cloud_unique_id = http_query(uri, "cloud_unique_id"); - - InstanceInfoPB instance; - auto [code, msg] = service->get_instance_info(std::string(instance_id), - std::string(cloud_unique_id), &instance); - return http_json_reply_message(code, msg, instance); -} - -static HttpResponse process_get_cluster(MetaServiceImpl* service, brpc::Controller* ctrl) { - GetClusterRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - - bool get_all_cluster_info = false; - // if cluster_id、cluster_name、mysql_user_name all empty, get this instance's all cluster info. - if (req.cluster_id().empty() && req.cluster_name().empty() && req.mysql_user_name().empty()) { - get_all_cluster_info = true; - } - - GetClusterResponse resp; - service->get_cluster(ctrl, &req, &resp, nullptr); - - if (resp.status().code() == MetaServiceCode::OK) { - if (get_all_cluster_info) { - return http_json_reply_message(resp.status(), resp); - } else { - // ATTN: only returns the first cluster pb. - return http_json_reply_message(resp.status(), resp.cluster(0)); - } - } else { - return http_json_reply(resp.status()); - } -} - -static HttpResponse process_get_tablet_stats(MetaServiceImpl* service, brpc::Controller* ctrl) { - GetTabletStatsRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - GetTabletStatsResponse resp; - service->get_tablet_stats(ctrl, &req, &resp, nullptr); - - std::string body; - if (resp.status().code() == MetaServiceCode::OK) { - body = resp.DebugString(); - } - return http_text_reply(resp.status(), body); -} - -static HttpResponse process_fix_tablet_stats(MetaServiceImpl* service, brpc::Controller* ctrl) { - auto& uri = ctrl->http_request().uri(); - std::string_view cloud_unique_id = http_query(uri, "cloud_unique_id"); - std::string_view table_id = http_query(uri, "table_id"); - std::string_view tablet_id = http_query(uri, "tablet_id"); - - MetaServiceResponseStatus st = service->fix_tablet_stats( - std::string(cloud_unique_id), std::string(table_id), std::string(tablet_id)); - return http_text_reply(st, st.DebugString()); -} - -static HttpResponse process_fix_tablet_db_id(MetaServiceImpl* service, brpc::Controller* ctrl) { - auto& uri = ctrl->http_request().uri(); - std::string instance_id(http_query(uri, "instance_id")); - std::string tablet_id_str(http_query(uri, "tablet_id")); - std::string db_id_str(http_query(uri, "db_id")); - - int64_t tablet_id = 0, db_id = 0; - try { - db_id = std::stol(db_id_str); - } catch (const std::exception& e) { - auto msg = fmt::format("db_id {} must be a number, meet error={}", db_id_str, e.what()); - LOG(WARNING) << msg; - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); - } - - try { - tablet_id = std::stol(tablet_id_str); - } catch (const std::exception& e) { - auto msg = fmt::format("tablet_id {} must be a number, meet error={}", tablet_id_str, - e.what()); - LOG(WARNING) << msg; - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); - } - - auto [code, msg] = service->fix_tablet_db_id(instance_id, tablet_id, db_id); - return http_text_reply(code, msg, ""); -} - -static HttpResponse process_get_stage(MetaServiceImpl* service, brpc::Controller* ctrl) { - GetStageRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - GetStageResponse resp; - service->get_stage(ctrl, &req, &resp, nullptr); - return http_json_reply_message(resp.status(), resp); -} - -static HttpResponse process_get_cluster_status(MetaServiceImpl* service, brpc::Controller* ctrl) { - GetClusterStatusRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - GetClusterStatusResponse resp; - service->get_cluster_status(ctrl, &req, &resp, nullptr); - return http_json_reply_message(resp.status(), resp); -} - -static HttpResponse process_txn_lazy_commit(MetaServiceImpl* service, brpc::Controller* ctrl) { - auto& uri = ctrl->http_request().uri(); - std::string instance_id(http_query(uri, "instance_id")); - std::string txn_id_str(http_query(uri, "txn_id")); - if (instance_id.empty() || txn_id_str.empty()) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "instance_id or txn_id is empty"); - } - - int64_t txn_id = 0; - try { - txn_id = std::stol(txn_id_str); - } catch (const std::exception& e) { - auto msg = fmt::format("txn_id {} must be a number, meet error={}", txn_id_str, e.what()); - LOG(WARNING) << msg; - return http_json_reply(MetaServiceCode::UNDEFINED_ERR, msg); - } - - DCHECK_GT(txn_id, 0); - - auto txn_lazy_committer = service->txn_lazy_committer(); - if (!txn_lazy_committer) { - return http_json_reply(MetaServiceCode::UNDEFINED_ERR, "txn lazy committer is nullptr"); - } - - std::shared_ptr task = txn_lazy_committer->submit(instance_id, txn_id); - auto [code, msg] = task->wait(); - return http_json_reply(code, msg); -} - -static HttpResponse process_unknown(MetaServiceImpl*, brpc::Controller*) { - // ATTN: To be compatible with cloud manager versions higher than this MS - return http_json_reply(MetaServiceCode::OK, ""); -} - -static HttpResponse process_list_snapshot(MetaServiceImpl* service, brpc::Controller* ctrl) { - ListSnapshotRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - ListSnapshotResponse resp; - service->list_snapshot(ctrl, &req, &resp, nullptr); - return http_json_reply_message(resp.status(), resp); -} - -static HttpResponse process_set_snapshot_property(MetaServiceImpl* service, - brpc::Controller* ctrl) { - AlterInstanceRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - auto* properties = req.mutable_properties(); - if (properties->contains("status")) { - std::string status = properties->at("status"); - if (status != "ENABLED" && status != "DISABLED") { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - "Invalid value for status property: " + status + - ", expected 'ENABLED' or 'DISABLED' (case insensitive)"); - } - std::string_view is_enable = (status == "ENABLED") ? "true" : "false"; - const std::string& property_name = - AlterInstanceRequest::SnapshotProperty_Name(AlterInstanceRequest::ENABLE_SNAPSHOT); - (*properties)[property_name] = is_enable; - properties->erase("status"); - } - if (properties->contains("max_reserved_snapshots")) { - const std::string& property_name = AlterInstanceRequest::SnapshotProperty_Name( - AlterInstanceRequest::MAX_RESERVED_SNAPSHOTS); - (*properties)[property_name] = properties->at("max_reserved_snapshots"); - properties->erase("max_reserved_snapshots"); - } - if (properties->contains("snapshot_interval_seconds")) { - const std::string& property_name = AlterInstanceRequest::SnapshotProperty_Name( - AlterInstanceRequest::SNAPSHOT_INTERVAL_SECONDS); - (*properties)[property_name] = properties->at("snapshot_interval_seconds"); - properties->erase("snapshot_interval_seconds"); - } - req.set_op(AlterInstanceRequest::SET_SNAPSHOT_PROPERTY); - AlterInstanceResponse resp; - service->alter_instance(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_set_multi_version_status(MetaServiceImpl* service, - brpc::Controller* ctrl) { - auto& uri = ctrl->http_request().uri(); - std::string instance_id(http_query(uri, "instance_id")); - std::string cloud_unique_id(http_query(uri, "cloud_unique_id")); - std::string multi_version_status_str(http_query(uri, "multi_version_status")); - - // Prefer instance_id if provided, fallback to cloud_unique_id - if (instance_id.empty()) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "empty instance id"); - } - - if (multi_version_status_str.empty()) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - "multi_version_status is required"); - } - - // Parse multi_version_status from string to enum - MultiVersionStatus multi_version_status; - std::string multi_version_status_upper = multi_version_status_str; - std::ranges::transform(multi_version_status_upper, multi_version_status_upper.begin(), - ::toupper); - - if (multi_version_status_upper == "MULTI_VERSION_DISABLED") { - multi_version_status = MultiVersionStatus::MULTI_VERSION_DISABLED; - } else if (multi_version_status_upper == "MULTI_VERSION_WRITE_ONLY") { - multi_version_status = MultiVersionStatus::MULTI_VERSION_WRITE_ONLY; - } else if (multi_version_status_upper == "MULTI_VERSION_READ_WRITE") { - multi_version_status = MultiVersionStatus::MULTI_VERSION_READ_WRITE; - } else if (multi_version_status_upper == "MULTI_VERSION_ENABLED") { - multi_version_status = MultiVersionStatus::MULTI_VERSION_ENABLED; - } else { - return http_json_reply( - MetaServiceCode::INVALID_ARGUMENT, - "invalid multi_version_status value. Supported values: MULTI_VERSION_DISABLED, " - "MULTI_VERSION_WRITE_ONLY, MULTI_VERSION_READ_WRITE, MULTI_VERSION_ENABLED"); - } - // Call snapshot manager directly - auto [code, msg] = service->snapshot_manager()->set_multi_version_status(instance_id, - multi_version_status); - - return http_json_reply(code, msg); -} - -static HttpResponse process_get_snapshot_property(MetaServiceImpl* service, - brpc::Controller* ctrl) { - auto& uri = ctrl->http_request().uri(); - std::string_view instance_id = http_query(uri, "instance_id"); - std::string_view cloud_unique_id = http_query(uri, "cloud_unique_id"); - - if (instance_id.empty() && cloud_unique_id.empty()) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - "empty instance_id and cloud_unique_id"); - } - - InstanceInfoPB instance; - auto [code, msg] = service->get_instance_info(std::string(instance_id), - std::string(cloud_unique_id), &instance); - if (code != MetaServiceCode::OK) { - return http_json_reply(code, msg); - } - - // Build snapshot properties response - rapidjson::Document doc; - doc.SetObject(); - - // Snapshot switch status - std::string_view switch_status; - switch (instance.snapshot_switch_status()) { - case SNAPSHOT_SWITCH_DISABLED: - switch_status = "UNSUPPORTED"; - break; - case SNAPSHOT_SWITCH_OFF: - switch_status = "DISABLED"; - break; - case SNAPSHOT_SWITCH_ON: - switch_status = "ENABLED"; - break; - default: - switch_status = "UNKNOWN"; - break; - } - doc.AddMember("status", rapidjson::StringRef(switch_status.data(), switch_status.size()), - doc.GetAllocator()); - - // Max reserved snapshots - if (instance.has_max_reserved_snapshot()) { - doc.AddMember("max_reserved_snapshots", instance.max_reserved_snapshot(), - doc.GetAllocator()); - } - - // Snapshot interval seconds - if (instance.has_snapshot_interval_seconds()) { - doc.AddMember("snapshot_interval_seconds", instance.snapshot_interval_seconds(), - doc.GetAllocator()); - } - - rapidjson::StringBuffer buffer; - rapidjson::Writer writer(buffer); - doc.Accept(writer); - - return http_json_reply(MetaServiceCode::OK, "", buffer.GetString()); -} - void MetaServiceImpl::http(::google::protobuf::RpcController* controller, const MetaServiceHttpRequest*, MetaServiceHttpResponse*, ::google::protobuf::Closure* done) { - using HttpHandler = HttpResponse (*)(MetaServiceImpl*, brpc::Controller*); - static std::unordered_map http_handlers { - // for alter cluster. - {"add_cluster", process_alter_cluster}, - {"drop_cluster", process_alter_cluster}, - {"rename_cluster", process_alter_cluster}, - {"update_cluster_endpoint", process_alter_cluster}, - {"update_cluster_mysql_user_name", process_alter_cluster}, - {"add_node", process_alter_cluster}, - {"drop_node", process_alter_cluster}, - {"decommission_node", process_alter_cluster}, - {"set_cluster_status", process_alter_cluster}, - {"notify_decommissioned", process_alter_cluster}, - {"alter_vcluster_info", process_alter_cluster}, - {"v1/add_cluster", process_alter_cluster}, - {"v1/drop_cluster", process_alter_cluster}, - {"v1/rename_cluster", process_alter_cluster}, - {"v1/update_cluster_endpoint", process_alter_cluster}, - {"v1/update_cluster_mysql_user_name", process_alter_cluster}, - {"v1/add_node", process_alter_cluster}, - {"v1/drop_node", process_alter_cluster}, - {"v1/decommission_node", process_alter_cluster}, - {"v1/set_cluster_status", process_alter_cluster}, - {"v1/alter_vcluster_info", process_alter_cluster}, - // for alter instance - {"create_instance", process_create_instance}, - {"drop_instance", process_alter_instance}, - {"rename_instance", process_alter_instance}, - {"enable_instance_sse", process_alter_instance}, - {"disable_instance_sse", process_alter_instance}, - {"set_instance_status", process_alter_instance}, - {"v1/create_instance", process_create_instance}, - {"v1/drop_instance", process_alter_instance}, - {"v1/rename_instance", process_alter_instance}, - {"v1/enable_instance_sse", process_alter_instance}, - {"v1/disable_instance_sse", process_alter_instance}, - {"v1/set_instance_status", process_alter_instance}, - // for alter obj store info - {"add_obj_info", process_alter_obj_store_info}, - {"legacy_update_ak_sk", process_alter_obj_store_info}, - {"update_ak_sk", process_update_ak_sk}, - {"v1/add_obj_info", process_alter_obj_store_info}, - {"v1/legacy_update_ak_sk", process_alter_obj_store_info}, - {"v1/update_ak_sk", process_update_ak_sk}, - {"show_storage_vaults", process_get_obj_store_info}, - {"add_hdfs_vault", process_alter_storage_vault}, - {"add_s3_vault", process_alter_storage_vault}, - {"alter_s3_vault", process_alter_storage_vault}, - {"drop_s3_vault", process_alter_storage_vault}, - {"drop_hdfs_vault", process_alter_storage_vault}, - {"alter_obj_info", process_alter_obj_store_info}, - {"v1/alter_obj_info", process_alter_obj_store_info}, - {"v1/alter_s3_vault", process_alter_storage_vault}, - - // for tools - {"decode_key", process_decode_key}, - {"encode_key", process_encode_key}, - {"get_value", process_get_value}, - {"set_value", process_set_value}, - {"show_meta_ranges", process_show_meta_ranges}, - {"txn_lazy_commit", process_txn_lazy_commit}, - {"injection_point", process_injection_point}, - {"fix_tablet_stats", process_fix_tablet_stats}, - {"fix_tablet_db_id", process_fix_tablet_db_id}, - {"v1/decode_key", process_decode_key}, - {"v1/encode_key", process_encode_key}, - {"v1/get_value", process_get_value}, - {"v1/set_value", process_set_value}, - {"v1/show_meta_ranges", process_show_meta_ranges}, - {"v1/txn_lazy_commit", process_txn_lazy_commit}, - {"v1/injection_point", process_injection_point}, - {"v1/fix_tablet_stats", process_fix_tablet_stats}, - {"v1/fix_tablet_db_id", process_fix_tablet_db_id}, - // for get - {"get_instance", process_get_instance_info}, - {"get_obj_store_info", process_get_obj_store_info}, - {"get_cluster", process_get_cluster}, - {"get_tablet_stats", process_get_tablet_stats}, - {"get_stage", process_get_stage}, - {"get_cluster_status", process_get_cluster_status}, - {"v1/get_instance", process_get_instance_info}, - {"v1/get_obj_store_info", process_get_obj_store_info}, - {"v1/get_cluster", process_get_cluster}, - {"v1/get_tablet_stats", process_get_tablet_stats}, - {"v1/get_stage", process_get_stage}, - {"v1/get_cluster_status", process_get_cluster_status}, - // snapshot related - {"list_snapshot", process_list_snapshot}, - {"set_snapshot_property", process_set_snapshot_property}, - {"get_snapshot_property", process_get_snapshot_property}, - {"set_multi_version_status", process_set_multi_version_status}, - {"v1/list_snapshot", process_list_snapshot}, - {"v1/set_snapshot_property", process_set_snapshot_property}, - {"v1/get_snapshot_property", process_get_snapshot_property}, - {"v1/set_multi_version_status", process_set_multi_version_status}, - // misc - {"abort_txn", process_abort_txn}, - {"abort_tablet_job", process_abort_tablet_job}, - {"alter_ram_user", process_alter_ram_user}, - {"alter_iam", process_alter_iam}, - {"adjust_rate_limit", process_adjust_rate_limit}, - {"list_rate_limit", process_query_rate_limit}, - {"update_config", process_update_config}, - {"v1/abort_txn", process_abort_txn}, - {"v1/abort_tablet_job", process_abort_tablet_job}, - {"v1/alter_ram_user", process_alter_ram_user}, - {"v1/alter_iam", process_alter_iam}, - {"v1/adjust_rate_limit", process_adjust_rate_limit}, - {"v1/list_rate_limit", process_query_rate_limit}, - {"v1/update_config", process_update_config}, - }; - auto* cntl = static_cast(controller); brpc::ClosureGuard closure_guard(done); - // Prepare input request info LOG(INFO) << "rpc from " << cntl->remote_side() << " request: " << cntl->http_request().uri().path(); std::string http_request = format_http_request(cntl); std::string http_request_for_log = encryt_sk(http_request); http_request_for_log = hide_ak(http_request_for_log); + const auto& unresolved_path = cntl->http_request().unresolved_path(); + auto api_path = split_http_api_path(unresolved_path); + const auto& handlers = get_http_handlers(); + auto it = handlers.find(api_path.route); // Auth auto token = http_query(cntl->http_request().uri(), "token"); @@ -980,15 +191,18 @@ void MetaServiceImpl::http(::google::protobuf::RpcController* controller, return; } - // Process http request - auto& unresolved_path = cntl->http_request().unresolved_path(); - HttpHandler handler = process_unknown; - auto it = http_handlers.find(unresolved_path); - if (it != http_handlers.end()) { - handler = it->second; + const auto* handler = + it == handlers.end() ? nullptr : resolve_http_handler(it->second, api_path.version); + if (handler == nullptr || + (it->second.role != HttpRole::META_SERVICE && it->second.role != HttpRole::BOTH)) { + std::string msg = "http path not found or not allowed"; + cntl->http_response().set_status_code(404); + cntl->response_attachment().append(msg); + cntl->response_attachment().append("\n"); + return; } - auto [status_code, msg, body] = handler(this, cntl); + auto [status_code, msg, body] = (*handler)(this, cntl); cntl->http_response().set_status_code(status_code); cntl->response_attachment().append(body); cntl->response_attachment().append("\n"); @@ -999,4 +213,4 @@ void MetaServiceImpl::http(::google::protobuf::RpcController* controller, << http_request_for_log << "\n ret=" << ret << " msg=" << msg; } -} // namespace doris::cloud +} // namespace doris::cloud \ No newline at end of file diff --git a/cloud/src/meta-service/meta_service_http.h b/cloud/src/meta-service/meta_service_http.h index 1dca1d3d64dd33..8e0c9df6cc1f9c 100644 --- a/cloud/src/meta-service/meta_service_http.h +++ b/cloud/src/meta-service/meta_service_http.h @@ -21,9 +21,11 @@ #include #include +#include #include #include #include +#include #include "common/util.h" @@ -77,4 +79,20 @@ inline static HttpResponse http_text_reply(const MetaServiceResponseStatus& stat return http_text_reply(status.code(), status.msg(), body); } +// Forward declarations +class MetaServiceImpl; +class RecyclerServiceImpl; + +// Role-based HTTP handler +enum class HttpRole { META_SERVICE = 1, RECYCLER = 2, BOTH = 3 }; + +using HttpHandler = std::function; + +struct HttpHandlerInfo { + HttpHandler handler; + // Route-local overrides keyed by API version, e.g. {"v2", handler_for_v2}. + std::unordered_map versioned_handlers; + HttpRole role; +}; + } // namespace doris::cloud diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h index bf04f2f575803b..db774017dac212 100644 --- a/cloud/src/recycler/recycler.h +++ b/cloud/src/recycler/recycler.h @@ -88,6 +88,10 @@ class Recycler { bool stopped() const { return stopped_.load(std::memory_order_acquire); } + [[nodiscard]] const RecyclerThreadPoolGroup& thread_pool_group() const { + return _thread_pool_group; + } + private: void recycle_callback(); diff --git a/cloud/src/recycler/recycler_service.cpp b/cloud/src/recycler/recycler_service.cpp index 4b33ef76c9575e..7503f8cc6f8f44 100644 --- a/cloud/src/recycler/recycler_service.cpp +++ b/cloud/src/recycler/recycler_service.cpp @@ -33,6 +33,7 @@ #include "common/config.h" #include "common/defer.h" +#include "common/http_helper.h" #include "common/logging.h" #include "common/util.h" #include "cpp/s3_rate_limiter.h" @@ -507,204 +508,43 @@ void RecyclerServiceImpl::http(::google::protobuf::RpcController* controller, const ::doris::cloud::MetaServiceHttpRequest* request, ::doris::cloud::MetaServiceHttpResponse* response, ::google::protobuf::Closure* done) { - auto cntl = static_cast(controller); - LOG(INFO) << "rpc from " << cntl->remote_side() << " request: " << request->DebugString(); + auto* cntl = static_cast(controller); + LOG(INFO) << "rpc from " << cntl->remote_side() + << " request: " << cntl->http_request().uri().path(); brpc::ClosureGuard closure_guard(done); - MetaServiceCode code = MetaServiceCode::OK; - int status_code = 200; - std::string msg = "OK"; - std::string req; - std::string response_body; - std::string request_body; - DORIS_CLOUD_DEFER { - status_code = std::get<0>(convert_ms_code_to_http_code(code)); - LOG(INFO) << (code == MetaServiceCode::OK ? "succ to " : "failed to ") << "http" - << " " << cntl->remote_side() << " request=\n" - << req << "\n ret=" << code << " msg=" << msg; - cntl->http_response().set_status_code(status_code); - cntl->response_attachment().append(response_body); + const auto& unresolved_path = cntl->http_request().unresolved_path(); + auto api_path = split_http_api_path(unresolved_path); + const auto& handlers = get_http_handlers(); + auto it = handlers.find(api_path.route); + const auto* handler = + it == handlers.end() ? nullptr : resolve_http_handler(it->second, api_path.version); + if (handler == nullptr || + (it->second.role != HttpRole::RECYCLER && it->second.role != HttpRole::BOTH)) { + std::string msg = "http path not found or not allowed"; + cntl->http_response().set_status_code(404); + cntl->response_attachment().append(msg); cntl->response_attachment().append("\n"); - }; - - // Prepare input request info - auto unresolved_path = cntl->http_request().unresolved_path(); - auto uri = cntl->http_request().uri(); - std::stringstream ss; - ss << "\nuri_path=" << uri.path(); - ss << "\nunresolved_path=" << unresolved_path; - ss << "\nmethod=" << brpc::HttpMethod2Str(cntl->http_request().method()); - ss << "\nquery strings:"; - for (auto it = uri.QueryBegin(); it != uri.QueryEnd(); ++it) { - ss << "\n" << it->first << "=" << it->second; - } - ss << "\nheaders:"; - for (auto it = cntl->http_request().HeaderBegin(); it != cntl->http_request().HeaderEnd(); - ++it) { - ss << "\n" << it->first << ":" << it->second; + return; } - req = ss.str(); - ss.clear(); - request_body = cntl->request_attachment().to_string(); // Just copy // Auth - auto token = uri.GetQuery("token"); + const auto* token = cntl->http_request().uri().GetQuery("token"); if (token == nullptr || *token != config::http_token) { - msg = "incorrect token, token=" + (token == nullptr ? std::string("(not given)") : *token); - response_body = "incorrect token"; - status_code = 403; - return; - } - - if (unresolved_path == "recycle_instance") { - RecycleInstanceRequest req; - auto st = google::protobuf::util::JsonStringToMessage(request_body, &req); - if (!st.ok()) { - msg = "failed to RecycleInstanceRequest, error: " + st.message().ToString(); - response_body = msg; - LOG(WARNING) << msg; - return; - } - RecycleInstanceResponse res; - recycle_instance(cntl, &req, &res, nullptr); - code = res.status().code(); - msg = res.status().msg(); - response_body = msg; - return; - } - - if (unresolved_path == "statistics_recycle") { - StatisticsRecycleRequest req; - auto st = google::protobuf::util::JsonStringToMessage(request_body, &req); - if (!st.ok()) { - msg = "failed to StatisticsRecycleRequest, error: " + st.message().ToString(); - response_body = msg; - LOG(WARNING) << msg; - return; - } - statistics_recycle(req, code, msg); - response_body = msg; - return; - } - - if (unresolved_path == "recycle_copy_jobs") { - auto instance_id = uri.GetQuery("instance_id"); - if (instance_id == nullptr || instance_id->empty()) { - msg = "no instance id"; - response_body = msg; - status_code = 400; - return; - } - recycle_copy_jobs(txn_kv_, *instance_id, code, msg, recycler_->_thread_pool_group, - txn_lazy_committer_); - - response_body = msg; - return; - } - - if (unresolved_path == "recycle_job_info") { - auto instance_id = uri.GetQuery("instance_id"); - if (instance_id == nullptr || instance_id->empty()) { - msg = "no instance id"; - response_body = msg; - status_code = 400; - return; - } - std::string key; - job_recycle_key({*instance_id}, &key); - recycle_job_info(txn_kv_, *instance_id, key, code, msg); - response_body = msg; - return; - } - - if (unresolved_path == "check_instance") { - auto instance_id = uri.GetQuery("instance_id"); - if (instance_id == nullptr || instance_id->empty()) { - msg = "no instance id"; - response_body = msg; - status_code = 400; - return; - } - if (!checker_) { - msg = "checker not enabled"; - response_body = msg; - status_code = 400; - return; - } - check_instance(*instance_id, code, msg); - response_body = msg; - return; - } - - if (unresolved_path == "check_job_info") { - auto instance_id = uri.GetQuery("instance_id"); - if (instance_id == nullptr || instance_id->empty()) { - msg = "no instance id"; - response_body = msg; - status_code = 400; - return; - } - std::string key; - job_check_key({*instance_id}, &key); - recycle_job_info(txn_kv_, *instance_id, key, code, msg); - response_body = msg; - return; - } - - if (unresolved_path == "check_meta") { - auto instance_id = uri.GetQuery("instance_id"); - auto host = uri.GetQuery("host"); - auto port = uri.GetQuery("port"); - auto user = uri.GetQuery("user"); - auto password = uri.GetQuery("password"); - if (instance_id == nullptr || instance_id->empty() || host == nullptr || host->empty() || - port == nullptr || port->empty() || password == nullptr || user == nullptr || - user->empty()) { - msg = "no instance id or mysql conn str info"; - response_body = msg; - status_code = 400; - return; - } - LOG(INFO) << " host " << *host; - LOG(INFO) << " port " << *port; - LOG(INFO) << " user " << *user; - LOG(INFO) << " instance " << *instance_id; - check_meta(txn_kv_, *instance_id, *host, *port, *user, *password, msg); - status_code = 200; - response_body = msg; + std::string msg = "incorrect token, token=" + + (token == nullptr ? std::string("(not given)") : *token); + cntl->http_response().set_status_code(403); + cntl->response_attachment().append(msg); + cntl->response_attachment().append("\n"); + LOG(WARNING) << "failed to handle http from " << cntl->remote_side() << " msg: " << msg; return; } - if (unresolved_path == "adjust_rate_limiter") { - auto type_string = uri.GetQuery("type"); - auto speed = uri.GetQuery("speed"); - auto burst = uri.GetQuery("burst"); - auto limit = uri.GetQuery("limit"); - if (type_string->empty() || speed->empty() || burst->empty() || limit->empty() || - (*type_string != "get" && *type_string != "put")) { - msg = "argument not suitable"; - response_body = msg; - status_code = 400; - return; - } - auto max_speed = speed->empty() ? 0 : std::stoul(*speed); - auto max_burst = burst->empty() ? 0 : std::stoul(*burst); - auto max_limit = burst->empty() ? 0 : std::stoul(*limit); - if (0 != reset_s3_rate_limiter(string_to_s3_rate_limit_type(*type_string), max_speed, - max_burst, max_limit)) { - msg = "adjust failed"; - response_body = msg; - status_code = 400; - return; - } - - status_code = 200; - response_body = msg; - return; - } + auto [status_code, msg, body] = (*handler)(this, cntl); + cntl->http_response().set_status_code(status_code); + cntl->response_attachment().append(body); + cntl->response_attachment().append("\n"); - status_code = 404; - msg = "http path " + uri.path() + " not found, it may be not implemented"; - response_body = msg; + LOG(INFO) << (status_code == 200 ? "succ to " : "failed to ") << __PRETTY_FUNCTION__ << " " + << cntl->remote_side() << " ret=" << status_code << " msg=" << msg; } - } // namespace doris::cloud diff --git a/cloud/src/recycler/recycler_service.h b/cloud/src/recycler/recycler_service.h index 6890d7049bd90b..53612568d24959 100644 --- a/cloud/src/recycler/recycler_service.h +++ b/cloud/src/recycler/recycler_service.h @@ -27,6 +27,8 @@ namespace doris::cloud { class Recycler; class Checker; +struct RecyclerThreadPoolGroup; +class TxnLazyCommitter; class RecyclerServiceImpl : public cloud::RecyclerService { public: @@ -34,6 +36,19 @@ class RecyclerServiceImpl : public cloud::RecyclerService { std::shared_ptr txn_lazy_committer); ~RecyclerServiceImpl() override; + [[nodiscard]] const std::shared_ptr& txn_kv() const { return txn_kv_; } + [[nodiscard]] Recycler* recycler() const { return recycler_; } + [[nodiscard]] Checker* checker() const { return checker_; } + [[nodiscard]] const std::shared_ptr& txn_lazy_committer() const { + return txn_lazy_committer_; + } + + void statistics_recycle(StatisticsRecycleRequest& req, MetaServiceCode& code, + std::string& msg); + + void check_instance(const std::string& instance_id, MetaServiceCode& code, + std::string& msg); + void recycle_instance(::google::protobuf::RpcController* controller, const ::doris::cloud::RecycleInstanceRequest* request, ::doris::cloud::RecycleInstanceResponse* response, @@ -44,11 +59,6 @@ class RecyclerServiceImpl : public cloud::RecyclerService { ::doris::cloud::MetaServiceHttpResponse* response, ::google::protobuf::Closure* done) override; -private: - void statistics_recycle(StatisticsRecycleRequest& req, MetaServiceCode& code, std::string& msg); - - void check_instance(const std::string& instance_id, MetaServiceCode& code, std::string& msg); - private: std::shared_ptr txn_kv_; Recycler* recycler_; // Ref @@ -59,4 +69,14 @@ class RecyclerServiceImpl : public cloud::RecyclerService { extern int reset_s3_rate_limiter(S3RateLimitType type, size_t max_speed, size_t max_burst, size_t limit); +void recycle_copy_jobs(const std::shared_ptr& txn_kv, const std::string& instance_id, + MetaServiceCode& code, std::string& msg, + RecyclerThreadPoolGroup thread_pool_group, + std::shared_ptr txn_lazy_committer); +void recycle_job_info(const std::shared_ptr& txn_kv, const std::string& instance_id, + std::string_view key, MetaServiceCode& code, std::string& msg); +void check_meta(const std::shared_ptr& txn_kv, const std::string& instance_id, + const std::string& host, const std::string& port, const std::string& user, + const std::string& password, std::string& msg); + } // namespace doris::cloud diff --git a/cloud/test/meta_service_http_test.cpp b/cloud/test/meta_service_http_test.cpp index abf791649de72a..37cf0ddd421aec 100644 --- a/cloud/test/meta_service_http_test.cpp +++ b/cloud/test/meta_service_http_test.cpp @@ -42,6 +42,7 @@ #include "common/config.h" #include "common/configbase.h" #include "common/defer.h" +#include "common/http_helper.h" #include "common/logging.h" #include "common/util.h" #include "cpp/sync_point.h" @@ -370,7 +371,52 @@ static void insert_rowset(MetaService* meta_service, int64_t db_id, const std::s commit_txn(meta_service, db_id, txn_id, label); } -/// NOTICE: Not ALL `code`, returned by http server, are supported by `MetaServiceCode`. +TEST(MetaServiceHttpTest, SplitHttpApiPath) { + { + auto path = split_http_api_path("v1/create_instance"); + ASSERT_EQ(path.version, "v1"); + ASSERT_EQ(path.route, "create_instance"); + } + { + auto path = split_http_api_path("v2/create_instance"); + ASSERT_EQ(path.version, "v2"); + ASSERT_EQ(path.route, "create_instance"); + } + { + auto path = split_http_api_path("create_instance"); + ASSERT_TRUE(path.version.empty()); + ASSERT_EQ(path.route, "create_instance"); + } + { + auto path = split_http_api_path("version/create_instance"); + ASSERT_TRUE(path.version.empty()); + ASSERT_EQ(path.route, "version/create_instance"); + } +} + +TEST(MetaServiceHttpTest, ResolveHttpHandlerByVersion) { + // clang-format off + HttpHandlerInfo handler_info { + .handler = [](void*, brpc::Controller*) { return HttpResponse {200, "v1", ""}; }, + .versioned_handlers = {{"v2", + [](void*, brpc::Controller*) { + return HttpResponse {200, "v2", ""}; + }}}, + .role = HttpRole::META_SERVICE}; + // clang-format on + + ASSERT_EQ(resolve_http_handler(handler_info, ""), &handler_info.handler); + ASSERT_EQ(resolve_http_handler(handler_info, "v1"), &handler_info.handler); + ASSERT_EQ(resolve_http_handler(handler_info, "v2"), &handler_info.versioned_handlers.at("v2")); + ASSERT_EQ(resolve_http_handler(handler_info, "v3"), nullptr); + + const auto& handlers = get_http_handlers(); + auto it = handlers.find("add_cluster"); + ASSERT_NE(it, handlers.end()); + ASSERT_EQ(resolve_http_handler(it->second, ""), &it->second.handler); + ASSERT_EQ(resolve_http_handler(it->second, "v1"), &it->second.handler); + ASSERT_EQ(resolve_http_handler(it->second, "v2"), nullptr); +} TEST(MetaServiceHttpTest, InstanceTest) { HttpContext ctx; @@ -610,6 +656,74 @@ TEST(MetaServiceHttpTest, InstanceTestWithVersion) { } } +TEST(MetaServiceHttpTest, AlterClusterTestWithVersion) { + config::enable_cluster_name_check = true; + + HttpContext ctx; + { + CreateInstanceRequest req; + req.set_instance_id(mock_instance); + req.set_user_id("test_user"); + req.set_name("test_name"); + ObjectStoreInfoPB obj; + obj.set_ak("123"); + obj.set_sk("321"); + obj.set_bucket("456"); + obj.set_prefix("654"); + obj.set_endpoint("789"); + obj.set_region("987"); + obj.set_external_endpoint("888"); + obj.set_provider(ObjectStoreInfoPB::BOS); + req.mutable_obj_info()->CopyFrom(obj); + + auto [status_code, resp] = + ctx.forward("v1/create_instance", req); + ASSERT_EQ(status_code, 200); + ASSERT_EQ(resp.code(), MetaServiceCode::OK); + } + + { + AlterClusterRequest req; + req.set_instance_id(mock_instance); + req.mutable_cluster()->set_cluster_name(mock_cluster_name); + req.mutable_cluster()->set_type(ClusterPB::COMPUTE); + req.mutable_cluster()->set_cluster_id(mock_cluster_id); + auto [status_code, resp] = ctx.forward("v1/add_cluster", req); + ASSERT_EQ(status_code, 200); + ASSERT_EQ(resp.code(), MetaServiceCode::OK); + } + + { + GetClusterRequest req; + req.set_cloud_unique_id("1:" + mock_instance + ":xxxx"); + req.set_cluster_id(mock_cluster_id); + auto [status_code, resp] = ctx.forward_with_result("v1/get_cluster", req); + ASSERT_EQ(status_code, 200); + ASSERT_EQ(resp.status.code(), MetaServiceCode::OK); + ASSERT_TRUE(resp.result.has_value()); + ASSERT_EQ(resp.result->cluster_name(), mock_cluster_name); + } + + { + AlterClusterRequest req; + req.set_instance_id(mock_instance); + req.mutable_cluster()->set_cluster_id(mock_cluster_id); + req.mutable_cluster()->set_cluster_name("rename_cluster_name"); + auto [status_code, resp] = ctx.forward("v1/rename_cluster", req); + ASSERT_EQ(status_code, 200); + ASSERT_EQ(resp.code(), MetaServiceCode::OK); + } + + { + AlterClusterRequest req; + req.set_instance_id(mock_instance); + req.mutable_cluster()->set_cluster_id(mock_cluster_id); + auto [status_code, resp] = ctx.forward("v1/drop_cluster", req); + ASSERT_EQ(status_code, 200); + ASSERT_EQ(resp.code(), MetaServiceCode::OK); + } +} + TEST(MetaServiceHttpTest, AlterClusterTest) { config::enable_cluster_name_check = true; @@ -1397,8 +1511,8 @@ TEST(MetaServiceHttpTest, GetTabletStatsTest) { TEST(MetaServiceHttpTest, ToUnknownUrlTest) { HttpContext ctx; auto [status_code, content] = ctx.query("unkown_resource_xxxxxx", ""); - ASSERT_EQ(status_code, 200); - ASSERT_EQ(content, "{\n \"code\": \"OK\",\n \"msg\": \"\"\n}\n"); + ASSERT_EQ(status_code, 404); + ASSERT_EQ(content, "http path not found or not allowed\n"); } TEST(MetaServiceHttpTest, UnknownFields) { @@ -1820,7 +1934,7 @@ TEST(MetaServiceHttpTest, UpdateConfig) { { auto [status_code, content] = ctx.query("update_config", ""); ASSERT_EQ(status_code, 400); - std::string msg = "query param `config` should not be empty"; + std::string msg = "query param `configs` should not be empty"; ASSERT_NE(content.find(msg), std::string::npos); } { @@ -2047,6 +2161,53 @@ TEST(MetaServiceHttpTest, UpdateConfig) { } } +TEST(MetaServiceHttpTest, ShowConfigEscapesJsonSpecialCharacters) { + HttpContext ctx; + + const std::string config_key = "idempotent_request_replay_exclusion"; + const std::string old_value = config::idempotent_request_replay_exclusion; + const std::string new_value = R"(Get"Tablet\StatsRequest)"; + DORIS_CLOUD_DEFER_COPY(config_key, old_value) { + auto [succ, cause] = config::set_config({{config_key, old_value}}, false, ""); + ASSERT_TRUE(succ) << cause; + }; + + { + auto [succ, cause] = config::set_config({{config_key, new_value}}, false, ""); + ASSERT_TRUE(succ) << cause; + } + + { + rapidjson::Document d; + rapidjson::ParseResult ps = d.Parse(config::show_config(config_key).c_str()); + ASSERT_TRUE(ps) << rapidjson::GetParseError_En(ps.Code()); + ASSERT_TRUE(d.IsArray()); + ASSERT_EQ(d.Size(), 1); + ASSERT_TRUE(d[0].IsArray()); + ASSERT_EQ(d[0].Size(), 4); + ASSERT_TRUE(d[0][2].IsString()); + ASSERT_EQ(d[0][2].GetString(), new_value); + } + + { + auto [status_code, body] = ctx.query("show_config", "conf_key=" + config_key); + ASSERT_EQ(status_code, 200); + + rapidjson::Document d; + rapidjson::ParseResult ps = d.Parse(body.c_str()); + ASSERT_TRUE(ps) << rapidjson::GetParseError_En(ps.Code()) << ", body: " << body; + ASSERT_TRUE(d.HasMember("code")); + ASSERT_STREQ(d["code"].GetString(), "OK"); + ASSERT_TRUE(d.HasMember("result")); + ASSERT_TRUE(d["result"].IsArray()); + ASSERT_EQ(d["result"].Size(), 1); + ASSERT_TRUE(d["result"][0].IsArray()); + ASSERT_EQ(d["result"][0].Size(), 4); + ASSERT_TRUE(d["result"][0][2].IsString()); + ASSERT_EQ(d["result"][0][2].GetString(), new_value); + } +} + TEST(HttpEncodeKeyTest, ProcessHttpSetValue) { auto txn_kv = std::make_shared(); std::unique_ptr txn; From ebcd6e0e8a25100384210c53309fe5f78bf3c216 Mon Sep 17 00:00:00 2001 From: Yixuan Wang Date: Fri, 15 May 2026 11:10:27 +0800 Subject: [PATCH 2/3] [fix](recycler) Recycler/Ms http api result msg should set ok (#63051) [fix](recycler) Recycler/Ms http api result msg should set ok Set successful Recycler/MS HTTP API messages to OK. The shared helper covers JSON/text success messages. --- cloud/src/meta-service/meta_service_http.h | 6 ++++-- cloud/test/meta_service_http_test.cpp | 6 +++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/cloud/src/meta-service/meta_service_http.h b/cloud/src/meta-service/meta_service_http.h index 8e0c9df6cc1f9c..3c9894839232e3 100644 --- a/cloud/src/meta-service/meta_service_http.h +++ b/cloud/src/meta-service/meta_service_http.h @@ -70,8 +70,10 @@ inline static HttpResponse http_json_reply_message(const MetaServiceResponseStat inline static HttpResponse http_text_reply(MetaServiceCode code, const std::string& msg, const std::string& body) { - auto [status_code, _] = convert_ms_code_to_http_code(code); - return {status_code, msg, body}; + auto [status_code, status_msg] = convert_ms_code_to_http_code(code); + std::string response_msg = + code == MetaServiceCode::OK && msg.empty() ? std::string(status_msg) : msg; + return {.status_code = status_code, .msg = std::move(response_msg), .body = body}; } inline static HttpResponse http_text_reply(const MetaServiceResponseStatus& status, diff --git a/cloud/test/meta_service_http_test.cpp b/cloud/test/meta_service_http_test.cpp index 37cf0ddd421aec..7452fed53ec417 100644 --- a/cloud/test/meta_service_http_test.cpp +++ b/cloud/test/meta_service_http_test.cpp @@ -867,7 +867,7 @@ TEST(MetaServiceHttpTest, AlterClusterTest) { auto [status_code, resp] = ctx.forward("add_cluster", req); ASSERT_EQ(status_code, 200); ASSERT_EQ(resp.code(), MetaServiceCode::OK); - ASSERT_EQ(resp.msg(), ""); + ASSERT_EQ(resp.msg(), "OK"); } // case: request has invalid argument @@ -2848,7 +2848,7 @@ TEST(MetaServiceHttpTest, VirtualClusterTest) { node->set_edit_log_port(9990); node->set_node_type(NodeInfoPB::FE_MASTER); ret = ctx.forward("add_cluster", req_before_fe); - ASSERT_EQ(std::get<1>(ret).msg(), ""); + ASSERT_EQ(std::get<1>(ret).msg(), "OK"); ASSERT_EQ(std::get<0>(ret), 200); ASSERT_EQ(std::get<1>(ret).code(), MetaServiceCode::OK); @@ -2859,7 +2859,7 @@ TEST(MetaServiceHttpTest, VirtualClusterTest) { policy.add_standby_cluster_names(mock_exist_cluster_name1); ret = add_cluster(ctx, mock_vcg_name1, mock_vcg_id1, ClusterPB::VIRTUAL, {mock_exist_cluster_name1, mock_exist_cluster_name2}, &policy); - ASSERT_EQ(std::get<1>(ret).msg(), ""); + ASSERT_EQ(std::get<1>(ret).msg(), "OK"); ASSERT_EQ(std::get<0>(ret), 200); ASSERT_EQ(std::get<1>(ret).code(), MetaServiceCode::OK); From 153d9375d09c856f82e5d41dc8b7cad13adff183 Mon Sep 17 00:00:00 2001 From: Yixuan Wang Date: Fri, 22 May 2026 15:48:38 +0800 Subject: [PATCH 3/3] format --- cloud/src/recycler/recycler_service.h | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/cloud/src/recycler/recycler_service.h b/cloud/src/recycler/recycler_service.h index 53612568d24959..c00cd1e74a7a6c 100644 --- a/cloud/src/recycler/recycler_service.h +++ b/cloud/src/recycler/recycler_service.h @@ -43,11 +43,9 @@ class RecyclerServiceImpl : public cloud::RecyclerService { return txn_lazy_committer_; } - void statistics_recycle(StatisticsRecycleRequest& req, MetaServiceCode& code, - std::string& msg); + void statistics_recycle(StatisticsRecycleRequest& req, MetaServiceCode& code, std::string& msg); - void check_instance(const std::string& instance_id, MetaServiceCode& code, - std::string& msg); + void check_instance(const std::string& instance_id, MetaServiceCode& code, std::string& msg); void recycle_instance(::google::protobuf::RpcController* controller, const ::doris::cloud::RecycleInstanceRequest* request,