diff --git a/cloud/src/common/CMakeLists.txt b/cloud/src/common/CMakeLists.txt index d3ba8914f9fb3e..c83e11a9eb6ab3 100644 --- a/cloud/src/common/CMakeLists.txt +++ b/cloud/src/common/CMakeLists.txt @@ -10,6 +10,7 @@ set(COMMON_FILES logging.cpp bvars.cpp encryption_util.cpp + http_helper.cpp metric.cpp kms.cpp network_util.cpp diff --git a/cloud/src/common/configbase.cpp b/cloud/src/common/configbase.cpp index ae32ffb5f4da4f..f5e02eac64bee6 100644 --- a/cloud/src/common/configbase.cpp +++ b/cloud/src/common/configbase.cpp @@ -16,6 +16,9 @@ // under the License. #include +#include +#include +#include #include #include @@ -462,4 +465,58 @@ std::pair set_config(std::unordered_mapfind(name); + std::string value = (it != full_conf_map->end()) ? it->second : ""; + + rapidjson::Value item(rapidjson::kArrayType); + item.PushBack(rapidjson::Value(name.data(), name.size(), allocator), allocator); + item.PushBack(rapidjson::Value(field.type, allocator), allocator); + item.PushBack(rapidjson::Value(value.data(), value.size(), allocator), allocator); + item.PushBack(field.valmutable, allocator); + doc.PushBack(item, allocator); + } + + rapidjson::StringBuffer sb; + rapidjson::Writer writer(sb); + doc.Accept(writer); + return sb.GetString(); +} + +std::pair update_config(const std::string& configs, bool persist, + const std::string& custom_conf_path) { + if (configs.empty()) { + return {false, "query param `configs` should not be empty"}; + } + + std::unordered_map conf_map; + std::istringstream ss(configs); + std::string conf; + while (std::getline(ss, conf, ',')) { + auto pos = conf.find('='); + if (pos == std::string::npos) { + return {false, fmt::format("config {} is invalid", conf)}; + } + std::string key = conf.substr(0, pos); + std::string val = conf.substr(pos + 1); + trim(key); + trim(val); + conf_map.emplace(std::move(key), std::move(val)); + } + + return set_config(std::move(conf_map), persist, custom_conf_path); +} + } // namespace doris::cloud::config diff --git a/cloud/src/common/configbase.h b/cloud/src/common/configbase.h index f1cc3716ba320c..b6eff9ae101c06 100644 --- a/cloud/src/common/configbase.h +++ b/cloud/src/common/configbase.h @@ -181,4 +181,9 @@ std::pair set_config(std::unordered_map update_config(const std::string& configs, bool persist, + const std::string& custom_conf_path); + } // namespace doris::cloud::config diff --git a/cloud/src/common/http_helper.cpp b/cloud/src/common/http_helper.cpp new file mode 100644 index 00000000000000..7f6dfd3d932360 --- /dev/null +++ b/cloud/src/common/http_helper.cpp @@ -0,0 +1,1184 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "http_helper.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "common/metric.h" +#include "cpp/s3_rate_limiter.h" +#include "meta-service/meta_service.h" +#include "meta-service/meta_service_helper.h" +#include "meta-service/meta_service_http.h" +#include "recycler/recycler.h" +#include "recycler/recycler_service.h" +namespace doris::cloud { + +template +static google::protobuf::util::Status parse_json_message(const std::string& unresolved_path, + const std::string& body, Message* req) { + static_assert(std::is_base_of_v); + auto st = google::protobuf::util::JsonStringToMessage(body, req); + if (!st.ok()) { + std::string msg = "failed to strictly parse http request for '" + unresolved_path + + "' error: " + st.ToString(); + LOG_WARNING(msg).tag("body", encryt_sk(hide_access_key(body))); + + // ignore unknown fields + google::protobuf::util::JsonParseOptions json_parse_options; + json_parse_options.ignore_unknown_fields = true; + return google::protobuf::util::JsonStringToMessage(body, req, json_parse_options); + } + return {}; +} + +#define PARSE_MESSAGE_OR_RETURN(ctrl, req) \ + do { \ + std::string body = ctrl->request_attachment().to_string(); \ + auto& unresolved_path = ctrl->http_request().unresolved_path(); \ + auto st = parse_json_message(unresolved_path, body, &req); \ + if (!st.ok()) { \ + std::string msg = "parse http request '" + unresolved_path + "': " + st.ToString(); \ + LOG_WARNING(msg).tag("body", encryt_sk(hide_access_key(body))); \ + return http_json_reply(MetaServiceCode::PROTOBUF_PARSE_ERR, msg); \ + } \ + } while (0) + +const std::unordered_map& get_http_handlers() { + using MS = MetaServiceImpl; + using RS = RecyclerServiceImpl; + + static const auto handlers = [] { + return std::unordered_map { + // MetaService APIs + {"add_cluster", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"drop_cluster", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"rename_cluster", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"update_cluster_endpoint", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"update_cluster_mysql_user_name", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"add_node", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"drop_node", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"decommission_node", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"set_cluster_status", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"notify_decommissioned", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"alter_vcluster_info", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + + {"create_instance", + {.handler = [](void* s, + brpc::Controller* c) { return process_create_instance((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"drop_instance", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_instance((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"rename_instance", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_instance((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"enable_instance_sse", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_instance((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"disable_instance_sse", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_instance((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"set_instance_status", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_instance((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + + {"add_obj_info", + {.handler = + [](void* s, brpc::Controller* c) { + return process_alter_obj_store_info((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"legacy_update_ak_sk", + {.handler = + [](void* s, brpc::Controller* c) { + return process_alter_obj_store_info((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"update_ak_sk", + {.handler = [](void* s, + brpc::Controller* c) { return process_update_ak_sk((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"show_storage_vaults", + {.handler = + [](void* s, brpc::Controller* c) { + return process_get_obj_store_info((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"add_hdfs_vault", + {.handler = + [](void* s, brpc::Controller* c) { + return process_alter_storage_vault((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"add_s3_vault", + {.handler = + [](void* s, brpc::Controller* c) { + return process_alter_storage_vault((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"alter_s3_vault", + {.handler = + [](void* s, brpc::Controller* c) { + return process_alter_storage_vault((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"drop_s3_vault", + {.handler = + [](void* s, brpc::Controller* c) { + return process_alter_storage_vault((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"drop_hdfs_vault", + {.handler = + [](void* s, brpc::Controller* c) { + return process_alter_storage_vault((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"alter_obj_info", + {.handler = + [](void* s, brpc::Controller* c) { + return process_alter_obj_store_info((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + + {"decode_key", + {.handler = [](void* s, + brpc::Controller* c) { return process_decode_key((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"encode_key", + {.handler = [](void* s, + brpc::Controller* c) { return process_encode_key((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"get_value", + {.handler = [](void* s, + brpc::Controller* c) { return process_get_value((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"set_value", + {.handler = [](void* s, + brpc::Controller* c) { return process_set_value((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"show_meta_ranges", + {.handler = + [](void* s, brpc::Controller* c) { + return process_show_meta_ranges((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"txn_lazy_commit", + {.handler = [](void* s, + brpc::Controller* c) { return process_txn_lazy_commit((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"injection_point", + {.handler = [](void* s, + brpc::Controller* c) { return process_injection_point((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"fix_tablet_stats", + {.handler = + [](void* s, brpc::Controller* c) { + return process_fix_tablet_stats((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"fix_tablet_db_id", + {.handler = + [](void* s, brpc::Controller* c) { + return process_fix_tablet_db_id((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + + {"get_instance", + {.handler = + [](void* s, brpc::Controller* c) { + return process_get_instance_info((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"get_obj_store_info", + {.handler = + [](void* s, brpc::Controller* c) { + return process_get_obj_store_info((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"get_cluster", + {.handler = [](void* s, + brpc::Controller* c) { return process_get_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"get_tablet_stats", + {.handler = + [](void* s, brpc::Controller* c) { + return process_get_tablet_stats((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"get_stage", + {.handler = [](void* s, + brpc::Controller* c) { return process_get_stage((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"get_cluster_status", + {.handler = + [](void* s, brpc::Controller* c) { + return process_get_cluster_status((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + + {"list_snapshot", + {.handler = [](void* s, + brpc::Controller* c) { return process_list_snapshot((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"drop_snapshot", + {.handler = [](void* s, + brpc::Controller* c) { return process_drop_snapshot((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"set_snapshot_property", + {.handler = + [](void* s, brpc::Controller* c) { + return process_set_snapshot_property((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"get_snapshot_property", + {.handler = + [](void* s, brpc::Controller* c) { + return process_get_snapshot_property((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"set_multi_version_status", + {.handler = + [](void* s, brpc::Controller* c) { + return process_set_multi_version_status((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"abort_txn", + {.handler = [](void* s, + brpc::Controller* c) { return process_abort_txn((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"abort_tablet_job", + {.handler = + [](void* s, brpc::Controller* c) { + return process_abort_tablet_job((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"alter_ram_user", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_ram_user((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"alter_iam", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_iam((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"adjust_rate_limit", + {.handler = + [](void* s, brpc::Controller* c) { + return process_adjust_rate_limit((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"list_rate_limit", + {.handler = + [](void* s, brpc::Controller* c) { + return process_query_rate_limit((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + + // Recycler APIs + {"recycle_instance", + {.handler = + [](void* s, brpc::Controller* c) { + return process_recycle_instance((RS*)s, c); + }, + .role = HttpRole::RECYCLER}}, + {"statistics_recycle", + {.handler = + [](void* s, brpc::Controller* c) { + return process_statistics_recycle((RS*)s, c); + }, + .role = HttpRole::RECYCLER}}, + {"recycle_copy_jobs", + {.handler = + [](void* s, brpc::Controller* c) { + return process_recycle_copy_jobs((RS*)s, c); + }, + .role = HttpRole::RECYCLER}}, + {"recycle_job_info", + {.handler = + [](void* s, brpc::Controller* c) { + return process_recycle_job_info((RS*)s, c); + }, + .role = HttpRole::RECYCLER}}, + {"check_instance", + {.handler = [](void* s, + brpc::Controller* c) { return process_check_instance((RS*)s, c); }, + .role = HttpRole::RECYCLER}}, + {"check_job_info", + {.handler = [](void* s, + brpc::Controller* c) { return process_check_job_info((RS*)s, c); }, + .role = HttpRole::RECYCLER}}, + {"check_meta", + {.handler = [](void* s, + brpc::Controller* c) { return process_check_meta((RS*)s, c); }, + .role = HttpRole::RECYCLER}}, + {"adjust_rate_limiter", + {.handler = + [](void* s, brpc::Controller* c) { + return process_adjust_rate_limiter((RS*)s, c); + }, + .role = HttpRole::RECYCLER}}, + + // Shared APIs + {"show_config", + {.handler = [](void* s, + brpc::Controller* c) { return process_show_config((MS*)s, c); }, + .role = HttpRole::BOTH}}, + {"update_config", + {.handler = [](void* s, + brpc::Controller* c) { return process_update_config((MS*)s, c); }, + .role = HttpRole::BOTH}}, + }; + }(); + + return handlers; +} + +HttpResponse process_alter_cluster(MetaServiceImpl* service, brpc::Controller* ctrl) { + static std::unordered_map operations { + {"add_cluster", AlterClusterRequest::ADD_CLUSTER}, + {"drop_cluster", AlterClusterRequest::DROP_CLUSTER}, + {"rename_cluster", AlterClusterRequest::RENAME_CLUSTER}, + {"update_cluster_endpoint", AlterClusterRequest::UPDATE_CLUSTER_ENDPOINT}, + {"update_cluster_mysql_user_name", AlterClusterRequest::UPDATE_CLUSTER_MYSQL_USER_NAME}, + {"add_node", AlterClusterRequest::ADD_NODE}, + {"drop_node", AlterClusterRequest::DROP_NODE}, + {"decommission_node", AlterClusterRequest::DECOMMISSION_NODE}, + {"set_cluster_status", AlterClusterRequest::SET_CLUSTER_STATUS}, + {"notify_decommissioned", AlterClusterRequest::NOTIFY_DECOMMISSIONED}, + {"alter_vcluster_info", AlterClusterRequest::ALTER_VCLUSTER_INFO}, + }; + + auto& path = ctrl->http_request().unresolved_path(); + auto body = ctrl->request_attachment().to_string(); + auto it = operations.find(http_api_route(path)); + if (it == operations.end()) { + std::string msg = "not supportted alter cluster operation: " + path; + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + + AlterClusterRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + + req.set_op(it->second); + AlterClusterResponse resp; + service->alter_cluster(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_get_obj_store_info(MetaServiceImpl* service, brpc::Controller* ctrl) { + GetObjStoreInfoRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + + GetObjStoreInfoResponse resp; + service->get_obj_store_info(ctrl, &req, &resp, nullptr); + return http_json_reply_message(resp.status(), resp); +} + +HttpResponse process_alter_obj_store_info(MetaServiceImpl* service, brpc::Controller* ctrl) { + static std::unordered_map operations { + {"add_obj_info", AlterObjStoreInfoRequest::ADD_OBJ_INFO}, + {"legacy_update_ak_sk", AlterObjStoreInfoRequest::LEGACY_UPDATE_AK_SK}, + {"alter_obj_info", AlterObjStoreInfoRequest::ALTER_OBJ_INFO}}; + + auto& path = ctrl->http_request().unresolved_path(); + auto it = operations.find(http_api_route(path)); + if (it == operations.end()) { + std::string msg = "not supportted alter obj store info operation: " + path; + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + + AlterObjStoreInfoRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + req.set_op(it->second); + + AlterObjStoreInfoResponse resp; + service->alter_obj_store_info(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_alter_storage_vault(MetaServiceImpl* service, brpc::Controller* ctrl) { + static std::unordered_map operations { + {"drop_s3_vault", AlterObjStoreInfoRequest::DROP_S3_VAULT}, + {"add_s3_vault", AlterObjStoreInfoRequest::ADD_S3_VAULT}, + {"alter_s3_vault", AlterObjStoreInfoRequest::ALTER_S3_VAULT}, + {"drop_hdfs_vault", AlterObjStoreInfoRequest::DROP_HDFS_INFO}, + {"add_hdfs_vault", AlterObjStoreInfoRequest::ADD_HDFS_INFO}}; + + auto& path = ctrl->http_request().unresolved_path(); + auto it = operations.find(http_api_route(path)); + if (it == operations.end()) { + std::string msg = "not supportted alter storage vault operation: " + path; + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + + AlterObjStoreInfoRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + req.set_op(it->second); + + AlterObjStoreInfoResponse resp; + service->alter_storage_vault(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_update_ak_sk(MetaServiceImpl* service, brpc::Controller* ctrl) { + UpdateAkSkRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + UpdateAkSkResponse resp; + service->update_ak_sk(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_create_instance(MetaServiceImpl* service, brpc::Controller* ctrl) { + CreateInstanceRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + CreateInstanceResponse resp; + service->create_instance(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_alter_instance(MetaServiceImpl* service, brpc::Controller* ctrl) { + static std::unordered_map> + operations {{"rename_instance", {AlterInstanceRequest::RENAME}}, + {"enable_instance_sse", {AlterInstanceRequest::ENABLE_SSE}}, + {"disable_instance_sse", {AlterInstanceRequest::DISABLE_SSE}}, + {"drop_instance", {AlterInstanceRequest::DROP}}, + {"set_instance_status", + {AlterInstanceRequest::SET_NORMAL, AlterInstanceRequest::SET_OVERDUE}}}; + + auto& path = ctrl->http_request().unresolved_path(); + auto it = operations.find(http_api_route(path)); + if (it == operations.end()) { + std::string msg = "not supportted alter instance operation: '" + path + + "', route=" + std::string(http_api_route(path)); + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + + AlterInstanceRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + // for unresolved path whose corresponding operation is signal, we need set operation by ourselves. + if ((it->second).size() == 1) { + req.set_op((it->second)[0]); + } + AlterInstanceResponse resp; + service->alter_instance(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_abort_txn(MetaServiceImpl* service, brpc::Controller* ctrl) { + AbortTxnRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + AbortTxnResponse resp; + service->abort_txn(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_abort_tablet_job(MetaServiceImpl* service, brpc::Controller* ctrl) { + FinishTabletJobRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + req.set_action(FinishTabletJobRequest::ABORT); + FinishTabletJobResponse resp; + service->finish_tablet_job(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_alter_ram_user(MetaServiceImpl* service, brpc::Controller* ctrl) { + AlterRamUserRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + AlterRamUserResponse resp; + service->alter_ram_user(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_alter_iam(MetaServiceImpl* service, brpc::Controller* ctrl) { + AlterIamRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + AlterIamResponse resp; + service->alter_iam(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_adjust_rate_limit(MetaServiceImpl* service, brpc::Controller* cntl) { + const auto& uri = cntl->http_request().uri(); + auto qps_limit_str = std::string {http_query(uri, "qps_limit")}; + auto rpc_name = std::string {http_query(uri, "rpc_name")}; + auto instance_id = std::string {http_query(uri, "instance_id")}; + + auto process_set_qps_limit = [&](std::function cb) -> HttpResponse { + DCHECK(!qps_limit_str.empty()); + int64_t qps_limit = -1; + try { + qps_limit = std::stoll(qps_limit_str); + } catch (const std::exception& ex) { + return http_json_reply( + MetaServiceCode::INVALID_ARGUMENT, + fmt::format("param `qps_limit` is not a legal int64 type:{}", ex.what())); + } + if (qps_limit < 0) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + "`qps_limit` should not be less than 0"); + } + if (cb(qps_limit)) { + return http_json_reply(MetaServiceCode::OK, "sucess to adjust rate limit"); + } + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + fmt::format("failed to adjust rate limit for qps_limit={}, " + "rpc_name={}, instance_id={}, plz ensure correct " + "rpc/instance name", + qps_limit_str, rpc_name, instance_id)); + }; + + auto set_global_qps_limit = [process_set_qps_limit, service]() { + return process_set_qps_limit([service](int64_t qps_limit) { + return service->rate_limiter()->set_rate_limit(qps_limit); + }); + }; + + auto set_rpc_qps_limit = [&]() { + return process_set_qps_limit([&](int64_t qps_limit) { + return service->rate_limiter()->set_rate_limit(qps_limit, rpc_name); + }); + }; + + auto set_instance_qps_limit = [&]() { + return process_set_qps_limit([&](int64_t qps_limit) { + return service->rate_limiter()->set_instance_rate_limit(qps_limit, instance_id); + }); + }; + + auto set_instance_rpc_qps_limit = [&]() { + return process_set_qps_limit([&](int64_t qps_limit) { + return service->rate_limiter()->set_rate_limit(qps_limit, rpc_name, instance_id); + }); + }; + + auto process_invalid_arguments = [&]() -> HttpResponse { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + fmt::format("invalid argument: qps_limit(required)={}, " + "rpc_name(optional)={}, instance_id(optional)={}", + qps_limit_str, rpc_name, instance_id)); + }; + + // We have 3 optional params and 2^3 combination, and 4 of them are illegal. + // We register callbacks for them in porcessors accordings to the level, represented by 3 bits. + std::array, 8> processors; + std::fill_n(processors.begin(), 8, std::move(process_invalid_arguments)); + processors[0b001] = std::move(set_global_qps_limit); + processors[0b011] = std::move(set_rpc_qps_limit); + processors[0b101] = std::move(set_instance_qps_limit); + processors[0b111] = std::move(set_instance_rpc_qps_limit); + + uint8_t level = (0x01 & !qps_limit_str.empty()) | ((0x01 & !rpc_name.empty()) << 1) | + ((0x01 & !instance_id.empty()) << 2); + + DCHECK_LT(level, 8); + + return processors[level](); +} + +HttpResponse process_query_rate_limit(MetaServiceImpl* service, brpc::Controller* cntl) { + auto rate_limiter = service->rate_limiter(); + rapidjson::Document d; + d.SetObject(); + auto get_qps_limit = [&d](std::string_view rpc_name, + std::shared_ptr rpc_limiter) { + rapidjson::Document node; + node.SetObject(); + rapidjson::Document sub; + sub.SetObject(); + auto get_qps_token_limit = [&](std::string_view instance_id, + std::shared_ptr qps_token) { + sub.AddMember(rapidjson::StringRef(instance_id.data(), instance_id.size()), + qps_token->max_qps_limit(), d.GetAllocator()); + }; + rpc_limiter->for_each_qps_token(std::move(get_qps_token_limit)); + + node.AddMember("RPC qps limit", rpc_limiter->max_qps_limit(), d.GetAllocator()); + node.AddMember("instance specific qps limit", sub, d.GetAllocator()); + d.AddMember(rapidjson::StringRef(rpc_name.data(), rpc_name.size()), node, d.GetAllocator()); + }; + rate_limiter->for_each_rpc_limiter(std::move(get_qps_limit)); + + rapidjson::StringBuffer sb; + rapidjson::PrettyWriter writer(sb); + d.Accept(writer); + return http_json_reply(MetaServiceCode::OK, "", sb.GetString()); +} + +// Recycler HTTP handlers +HttpResponse process_recycle_instance(RecyclerServiceImpl* service, brpc::Controller* cntl) { + std::string request_body = cntl->request_attachment().to_string(); + RecycleInstanceRequest req; + auto st = google::protobuf::util::JsonStringToMessage(request_body, &req); + if (!st.ok()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + "failed to parse RecycleInstanceRequest"); + } + RecycleInstanceResponse res; + service->recycle_instance(cntl, &req, &res, nullptr); + return http_text_reply(res.status(), res.status().msg()); +} + +HttpResponse process_statistics_recycle(RecyclerServiceImpl* service, brpc::Controller* cntl) { + std::string request_body = cntl->request_attachment().to_string(); + StatisticsRecycleRequest req; + auto st = google::protobuf::util::JsonStringToMessage(request_body, &req); + if (!st.ok()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + "failed to parse StatisticsRecycleRequest"); + } + MetaServiceCode code = MetaServiceCode::OK; + std::string msg = "OK"; + service->statistics_recycle(req, code, msg); + return http_text_reply(code, msg, msg); +} + +HttpResponse process_recycle_copy_jobs(RecyclerServiceImpl* service, brpc::Controller* cntl) { + const auto* instance_id = cntl->http_request().uri().GetQuery("instance_id"); + if (!instance_id || instance_id->empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "no instance id"); + } + MetaServiceCode code = MetaServiceCode::OK; + std::string msg = "OK"; + recycle_copy_jobs(service->txn_kv(), *instance_id, code, msg, + service->recycler()->thread_pool_group(), service->txn_lazy_committer()); + return http_text_reply(code, msg, msg); +} + +HttpResponse process_recycle_job_info(RecyclerServiceImpl* service, brpc::Controller* cntl) { + const auto* instance_id = cntl->http_request().uri().GetQuery("instance_id"); + if (!instance_id || instance_id->empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "no instance id"); + } + MetaServiceCode code = MetaServiceCode::OK; + std::string msg, key; + job_recycle_key({*instance_id}, &key); + recycle_job_info(service->txn_kv(), *instance_id, key, code, msg); + return http_text_reply(code, msg, msg); +} + +HttpResponse process_check_instance(RecyclerServiceImpl* service, brpc::Controller* cntl) { + const auto* instance_id = cntl->http_request().uri().GetQuery("instance_id"); + if (!instance_id || instance_id->empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "no instance id"); + } + if (!service->checker()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "checker not enabled"); + } + MetaServiceCode code = MetaServiceCode::OK; + std::string msg = "OK"; + service->check_instance(*instance_id, code, msg); + return http_text_reply(code, msg, msg); +} + +HttpResponse process_check_job_info(RecyclerServiceImpl* service, brpc::Controller* cntl) { + const auto* instance_id = cntl->http_request().uri().GetQuery("instance_id"); + if (!instance_id || instance_id->empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "no instance id"); + } + MetaServiceCode code = MetaServiceCode::OK; + std::string msg, key; + job_check_key({*instance_id}, &key); + recycle_job_info(service->txn_kv(), *instance_id, key, code, msg); + return http_text_reply(code, msg, msg); +} + +HttpResponse process_check_meta(RecyclerServiceImpl* service, brpc::Controller* cntl) { + const auto& uri = cntl->http_request().uri(); + const auto* instance_id = uri.GetQuery("instance_id"); + const auto* host = uri.GetQuery("host"); + const auto* port = uri.GetQuery("port"); + const auto* user = uri.GetQuery("user"); + const auto* password = uri.GetQuery("password"); + if (!instance_id || instance_id->empty() || !host || host->empty() || !port || port->empty() || + !password || !user || user->empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "missing required parameters"); + } + std::string msg = "OK"; + check_meta(service->txn_kv(), *instance_id, *host, *port, *user, *password, msg); + return http_text_reply(MetaServiceCode::OK, msg, msg); +} + +HttpResponse process_adjust_rate_limiter(RecyclerServiceImpl*, brpc::Controller* cntl) { + const auto& uri = cntl->http_request().uri(); + const auto* type_string = uri.GetQuery("type"); + const auto* speed = uri.GetQuery("speed"); + const auto* burst = uri.GetQuery("burst"); + const auto* limit = uri.GetQuery("limit"); + if (!type_string || type_string->empty() || !speed || !burst || !limit || + (*type_string != "get" && *type_string != "put")) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "invalid arguments"); + } + auto max_speed = speed->empty() ? 0 : std::stoul(*speed); + auto max_burst = burst->empty() ? 0 : std::stoul(*burst); + auto max_limit = limit->empty() ? 0 : std::stoul(*limit); + if (reset_s3_rate_limiter(string_to_s3_rate_limit_type(*type_string), max_speed, max_burst, + max_limit) != 0) { + return http_json_reply(MetaServiceCode::UNDEFINED_ERR, "adjust failed"); + } + return http_json_reply(MetaServiceCode::OK, ""); +} + +HttpResponse process_show_config(MetaServiceImpl*, brpc::Controller* cntl) { + auto& uri = cntl->http_request().uri(); + std::string_view conf_name = http_query(uri, "conf_key"); + + if (config::full_conf_map == nullptr) { + return http_json_reply(MetaServiceCode::UNDEFINED_ERR, "config map not initialized"); + } + + std::string result = config::show_config(std::string(conf_name)); + return http_json_reply(MetaServiceCode::OK, "", result); +} + +HttpResponse process_update_config(MetaServiceImpl* service, brpc::Controller* cntl) { + const auto& uri = cntl->http_request().uri(); + bool persist = (http_query(uri, "persist") == "true"); + auto configs = std::string {http_query(uri, "configs")}; + auto reason = std::string {http_query(uri, "reason")}; + LOG(INFO) << "modify configs for reason=" << reason << ", configs=" << configs + << ", persist=" << http_query(uri, "persist"); + + if (auto [succ, cause] = config::update_config(configs, persist, config::custom_conf_path); + !succ) { + LOG(WARNING) << cause; + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, cause); + } + return http_json_reply(MetaServiceCode::OK, ""); +} + +HttpResponse process_decode_key(MetaServiceImpl*, brpc::Controller* ctrl) { + auto& uri = ctrl->http_request().uri(); + std::string_view key = http_query(uri, "key"); + if (key.empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "no key to decode"); + } + + bool unicode = http_query(uri, "unicode") != "false"; + std::string body = prettify_key(key, unicode); + if (body.empty()) { + std::string msg = "failed to decode key, key=" + std::string(key); + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + return http_text_reply(MetaServiceCode::OK, "", body); +} + +HttpResponse process_encode_key(MetaServiceImpl*, brpc::Controller* ctrl) { + return process_http_encode_key(ctrl->http_request().uri()); +} + +HttpResponse process_get_value(MetaServiceImpl* service, brpc::Controller* ctrl) { + return process_http_get_value(service->txn_kv().get(), ctrl->http_request().uri()); +} + +HttpResponse process_set_value(MetaServiceImpl* service, brpc::Controller* ctrl) { + return process_http_set_value(service->txn_kv().get(), ctrl); +} + +// show all key ranges and their count. +HttpResponse process_show_meta_ranges(MetaServiceImpl* service, brpc::Controller* ctrl) { + auto txn_kv = std::dynamic_pointer_cast(service->txn_kv()); + if (!txn_kv) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + "this method only support fdb txn kv"); + } + + std::vector partition_boundaries; + TxnErrorCode code = txn_kv->get_partition_boundaries(&partition_boundaries); + if (code != TxnErrorCode::TXN_OK) { + auto msg = fmt::format("failed to get boundaries, code={}", code); + return http_json_reply(MetaServiceCode::UNDEFINED_ERR, msg); + } + std::unordered_map partition_count; + get_kv_range_boundaries_count(partition_boundaries, partition_count); + + // sort ranges by count + std::vector> meta_ranges; + meta_ranges.reserve(partition_count.size()); + for (auto&& [key, count] : partition_count) { + meta_ranges.emplace_back(key, count); + } + + std::sort(meta_ranges.begin(), meta_ranges.end(), + [](const auto& lhs, const auto& rhs) { return lhs.second > rhs.second; }); + + std::string body = fmt::format("total meta ranges: {}\n", partition_boundaries.size()); + for (auto&& [key, count] : meta_ranges) { + body += fmt::format("{}: {}\n", key, count); + } + return http_text_reply(MetaServiceCode::OK, "", body); +} + +HttpResponse process_get_instance_info(MetaServiceImpl* service, brpc::Controller* ctrl) { + auto& uri = ctrl->http_request().uri(); + std::string_view instance_id = http_query(uri, "instance_id"); + std::string_view cloud_unique_id = http_query(uri, "cloud_unique_id"); + + InstanceInfoPB instance; + auto [code, msg] = service->get_instance_info(std::string(instance_id), + std::string(cloud_unique_id), &instance); + return http_json_reply_message(code, msg, instance); +} + +HttpResponse process_get_cluster(MetaServiceImpl* service, brpc::Controller* ctrl) { + GetClusterRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + + bool get_all_cluster_info = false; + // if cluster_id、cluster_name、mysql_user_name all empty, get this instance's all cluster info. + if (req.cluster_id().empty() && req.cluster_name().empty() && req.mysql_user_name().empty()) { + get_all_cluster_info = true; + } + + GetClusterResponse resp; + service->get_cluster(ctrl, &req, &resp, nullptr); + + if (resp.status().code() == MetaServiceCode::OK) { + if (get_all_cluster_info) { + return http_json_reply_message(resp.status(), resp); + } else { + // ATTN: only returns the first cluster pb. + return http_json_reply_message(resp.status(), resp.cluster(0)); + } + } else { + return http_json_reply(resp.status()); + } +} + +HttpResponse process_get_tablet_stats(MetaServiceImpl* service, brpc::Controller* ctrl) { + GetTabletStatsRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + GetTabletStatsResponse resp; + service->get_tablet_stats(ctrl, &req, &resp, nullptr); + + std::string body; + if (resp.status().code() == MetaServiceCode::OK) { + body = resp.DebugString(); + } + return http_text_reply(resp.status(), body); +} + +HttpResponse process_fix_tablet_stats(MetaServiceImpl* service, brpc::Controller* ctrl) { + auto& uri = ctrl->http_request().uri(); + std::string_view cloud_unique_id = http_query(uri, "cloud_unique_id"); + std::string_view table_id = http_query(uri, "table_id"); + std::string_view tablet_id = http_query(uri, "tablet_id"); + + MetaServiceResponseStatus st = service->fix_tablet_stats( + std::string(cloud_unique_id), std::string(table_id), std::string(tablet_id)); + return http_text_reply(st, st.DebugString()); +} + +HttpResponse process_fix_tablet_db_id(MetaServiceImpl* service, brpc::Controller* ctrl) { + auto& uri = ctrl->http_request().uri(); + std::string instance_id(http_query(uri, "instance_id")); + std::string tablet_id_str(http_query(uri, "tablet_id")); + std::string db_id_str(http_query(uri, "db_id")); + + int64_t tablet_id = 0, db_id = 0; + try { + db_id = std::stol(db_id_str); + } catch (const std::exception& e) { + auto msg = fmt::format("db_id {} must be a number, meet error={}", db_id_str, e.what()); + LOG(WARNING) << msg; + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + + try { + tablet_id = std::stol(tablet_id_str); + } catch (const std::exception& e) { + auto msg = fmt::format("tablet_id {} must be a number, meet error={}", tablet_id_str, + e.what()); + LOG(WARNING) << msg; + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + + auto [code, msg] = service->fix_tablet_db_id(instance_id, tablet_id, db_id); + return http_text_reply(code, msg, ""); +} + +HttpResponse process_get_stage(MetaServiceImpl* service, brpc::Controller* ctrl) { + GetStageRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + GetStageResponse resp; + service->get_stage(ctrl, &req, &resp, nullptr); + return http_json_reply_message(resp.status(), resp); +} + +HttpResponse process_get_cluster_status(MetaServiceImpl* service, brpc::Controller* ctrl) { + GetClusterStatusRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + GetClusterStatusResponse resp; + service->get_cluster_status(ctrl, &req, &resp, nullptr); + return http_json_reply_message(resp.status(), resp); +} + +HttpResponse process_txn_lazy_commit(MetaServiceImpl* service, brpc::Controller* ctrl) { + auto& uri = ctrl->http_request().uri(); + std::string instance_id(http_query(uri, "instance_id")); + std::string txn_id_str(http_query(uri, "txn_id")); + if (instance_id.empty() || txn_id_str.empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "instance_id or txn_id is empty"); + } + + int64_t txn_id = 0; + try { + txn_id = std::stol(txn_id_str); + } catch (const std::exception& e) { + auto msg = fmt::format("txn_id {} must be a number, meet error={}", txn_id_str, e.what()); + LOG(WARNING) << msg; + return http_json_reply(MetaServiceCode::UNDEFINED_ERR, msg); + } + + DCHECK_GT(txn_id, 0); + + auto txn_lazy_committer = service->txn_lazy_committer(); + if (!txn_lazy_committer) { + return http_json_reply(MetaServiceCode::UNDEFINED_ERR, "txn lazy committer is nullptr"); + } + + std::shared_ptr task = txn_lazy_committer->submit(instance_id, txn_id); + auto [code, msg] = task->wait(); + return http_json_reply(code, msg); +} + +HttpResponse process_list_snapshot(MetaServiceImpl* service, brpc::Controller* ctrl) { + ListSnapshotRequest req; + auto& uri = ctrl->http_request().uri(); + std::string instance_id(http_query(uri, "instance_id")); + if (instance_id.empty()) { + PARSE_MESSAGE_OR_RETURN(ctrl, req); + } else { + req.set_instance_id(instance_id); + } + + ListSnapshotResponse resp; + service->list_snapshot(ctrl, &req, &resp, nullptr); + return http_json_reply_message(resp.status(), resp); +} + +HttpResponse process_drop_snapshot(MetaServiceImpl* service, brpc::Controller* ctrl) { + auto& uri = ctrl->http_request().uri(); + std::string instance_id(http_query(uri, "instance_id")); + std::string snapshot_id(http_query(uri, "snapshot_id")); + if (instance_id.empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "instance_id is empty"); + } + if (snapshot_id.empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "snapshot_id is empty"); + } + DropSnapshotRequest req; + req.set_snapshot_id(snapshot_id); + DropSnapshotResponse resp; + service->snapshot_manager()->drop_snapshot(instance_id, req, &resp); + return http_json_reply(resp.status()); +} + +HttpResponse process_set_snapshot_property(MetaServiceImpl* service, brpc::Controller* ctrl) { + AlterInstanceRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + auto* properties = req.mutable_properties(); + if (properties->contains("status")) { + std::string status = properties->at("status"); + if (status != "ENABLED" && status != "DISABLED") { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + "Invalid value for status property: " + status + + ", expected 'ENABLED' or 'DISABLED' (case insensitive)"); + } + std::string_view is_enable = (status == "ENABLED") ? "true" : "false"; + const std::string& property_name = + AlterInstanceRequest::SnapshotProperty_Name(AlterInstanceRequest::ENABLE_SNAPSHOT); + (*properties)[property_name] = is_enable; + properties->erase("status"); + } + if (properties->contains("max_reserved_snapshots")) { + const std::string& property_name = AlterInstanceRequest::SnapshotProperty_Name( + AlterInstanceRequest::MAX_RESERVED_SNAPSHOTS); + (*properties)[property_name] = properties->at("max_reserved_snapshots"); + properties->erase("max_reserved_snapshots"); + } + if (properties->contains("snapshot_interval_seconds")) { + const std::string& property_name = AlterInstanceRequest::SnapshotProperty_Name( + AlterInstanceRequest::SNAPSHOT_INTERVAL_SECONDS); + (*properties)[property_name] = properties->at("snapshot_interval_seconds"); + properties->erase("snapshot_interval_seconds"); + } + req.set_op(AlterInstanceRequest::SET_SNAPSHOT_PROPERTY); + AlterInstanceResponse resp; + service->alter_instance(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_set_multi_version_status(MetaServiceImpl* service, brpc::Controller* ctrl) { + auto& uri = ctrl->http_request().uri(); + std::string instance_id(http_query(uri, "instance_id")); + std::string cloud_unique_id(http_query(uri, "cloud_unique_id")); + std::string multi_version_status_str(http_query(uri, "multi_version_status")); + + // Prefer instance_id if provided, fallback to cloud_unique_id + if (instance_id.empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "empty instance id"); + } + + if (multi_version_status_str.empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + "multi_version_status is required"); + } + + // Parse multi_version_status from string to enum + MultiVersionStatus multi_version_status; + std::string multi_version_status_upper = multi_version_status_str; + std::ranges::transform(multi_version_status_upper, multi_version_status_upper.begin(), + ::toupper); + + if (multi_version_status_upper == "MULTI_VERSION_DISABLED") { + multi_version_status = MultiVersionStatus::MULTI_VERSION_DISABLED; + } else if (multi_version_status_upper == "MULTI_VERSION_WRITE_ONLY") { + multi_version_status = MultiVersionStatus::MULTI_VERSION_WRITE_ONLY; + } else if (multi_version_status_upper == "MULTI_VERSION_READ_WRITE") { + multi_version_status = MultiVersionStatus::MULTI_VERSION_READ_WRITE; + } else if (multi_version_status_upper == "MULTI_VERSION_ENABLED") { + multi_version_status = MultiVersionStatus::MULTI_VERSION_ENABLED; + } else { + return http_json_reply( + MetaServiceCode::INVALID_ARGUMENT, + "invalid multi_version_status value. Supported values: MULTI_VERSION_DISABLED, " + "MULTI_VERSION_WRITE_ONLY, MULTI_VERSION_READ_WRITE, MULTI_VERSION_ENABLED"); + } + // Call snapshot manager directly + auto [code, msg] = service->snapshot_manager()->set_multi_version_status(instance_id, + multi_version_status); + + return http_json_reply(code, msg); +} + +HttpResponse process_get_snapshot_property(MetaServiceImpl* service, brpc::Controller* ctrl) { + auto& uri = ctrl->http_request().uri(); + std::string_view instance_id = http_query(uri, "instance_id"); + std::string_view cloud_unique_id = http_query(uri, "cloud_unique_id"); + + if (instance_id.empty() && cloud_unique_id.empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + "empty instance_id and cloud_unique_id"); + } + + InstanceInfoPB instance; + auto [code, msg] = service->get_instance_info(std::string(instance_id), + std::string(cloud_unique_id), &instance); + if (code != MetaServiceCode::OK) { + return http_json_reply(code, msg); + } + + // Build snapshot properties response + rapidjson::Document doc; + doc.SetObject(); + + // Snapshot switch status + std::string_view switch_status; + switch (instance.snapshot_switch_status()) { + case SNAPSHOT_SWITCH_DISABLED: + switch_status = "UNSUPPORTED"; + break; + case SNAPSHOT_SWITCH_OFF: + switch_status = "DISABLED"; + break; + case SNAPSHOT_SWITCH_ON: + switch_status = "ENABLED"; + break; + default: + switch_status = "UNKNOWN"; + break; + } + doc.AddMember("status", rapidjson::StringRef(switch_status.data(), switch_status.size()), + doc.GetAllocator()); + + // Max reserved snapshots + if (instance.has_max_reserved_snapshot()) { + doc.AddMember("max_reserved_snapshots", instance.max_reserved_snapshot(), + doc.GetAllocator()); + } + + // Snapshot interval seconds + if (instance.has_snapshot_interval_seconds()) { + doc.AddMember("snapshot_interval_seconds", instance.snapshot_interval_seconds(), + doc.GetAllocator()); + } + + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + doc.Accept(writer); + + return http_json_reply(MetaServiceCode::OK, "", buffer.GetString()); +} + +} // namespace doris::cloud \ No newline at end of file diff --git a/cloud/src/common/http_helper.h b/cloud/src/common/http_helper.h new file mode 100644 index 00000000000000..79a460b49d1a14 --- /dev/null +++ b/cloud/src/common/http_helper.h @@ -0,0 +1,206 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "meta-service/meta_service_http.h" + +namespace doris::cloud { + +inline constexpr std::string_view kDefaultHttpApiVersion = "v1"; + +struct HttpApiPath { + std::string_view version; + std::string_view route; +}; + +[[maybe_unused]] static bool is_http_api_version(std::string_view segment) { + if (segment.size() < 2 || segment.front() != 'v') { + return false; + } + return std::ranges::all_of(segment.substr(1), [](char ch) { return ch >= '0' && ch <= '9'; }); +} + +[[maybe_unused]] static HttpApiPath split_http_api_path(std::string_view path) { + auto separator = path.find('/'); + if (separator == std::string_view::npos) { + return {.version = "", .route = path}; + } + // This helper only splits the version segment from the route. Whether a version is actually + // supported is determined by exact route registration in get_http_handlers(). + auto segment = path.substr(0, separator); + if (!is_http_api_version(segment)) { + return {.version = "", .route = path}; + } + return {.version = segment, .route = path.substr(separator + 1)}; +} + +[[maybe_unused]] static std::string_view http_api_route(std::string_view path) { + return split_http_api_path(path).route; +} + +[[maybe_unused]] static const HttpHandler* resolve_http_handler(const HttpHandlerInfo& handler_info, + std::string_view version) { + if (version.empty() || version == kDefaultHttpApiVersion) { + return &handler_info.handler; + } + auto it = handler_info.versioned_handlers.find(version); + return it == handler_info.versioned_handlers.end() ? nullptr : &it->second; +} + +const std::unordered_map& get_http_handlers(); + +// injection_point_http.cpp +[[maybe_unused]] HttpResponse process_injection_point(MetaServiceImpl* service, + brpc::Controller* ctrl); + +// MetaService Http handlers +[[maybe_unused]] HttpResponse process_alter_cluster(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_get_obj_store_info(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_alter_obj_store_info(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_alter_storage_vault(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_update_ak_sk(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_create_instance(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_alter_instance(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_abort_txn(MetaServiceImpl* service, brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_abort_tablet_job(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_alter_ram_user(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_alter_iam(MetaServiceImpl* service, brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_adjust_rate_limit(MetaServiceImpl* service, + brpc::Controller* cntl); + +[[maybe_unused]] HttpResponse process_query_rate_limit(MetaServiceImpl* service, + brpc::Controller* cntl); + +[[maybe_unused]] HttpResponse process_decode_key(MetaServiceImpl*, brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_encode_key(MetaServiceImpl*, brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_get_value(MetaServiceImpl* service, brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_set_value(MetaServiceImpl* service, brpc::Controller* ctrl); + +// show all key ranges and their count. +[[maybe_unused]] HttpResponse process_show_meta_ranges(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_get_instance_info(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_get_cluster(MetaServiceImpl* service, brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_get_tablet_stats(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_fix_tablet_stats(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_fix_tablet_db_id(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_get_stage(MetaServiceImpl* service, brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_get_cluster_status(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_txn_lazy_commit(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_list_snapshot(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_drop_snapshot(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_set_snapshot_property(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_set_multi_version_status(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_get_snapshot_property(MetaServiceImpl* service, + brpc::Controller* ctrl); + +// Recycler HTTP handlers +[[maybe_unused]] HttpResponse process_recycle_instance(RecyclerServiceImpl* service, + brpc::Controller* cntl); + +[[maybe_unused]] HttpResponse process_statistics_recycle(RecyclerServiceImpl* service, + brpc::Controller* cntl); + +[[maybe_unused]] HttpResponse process_recycle_copy_jobs(RecyclerServiceImpl* service, + brpc::Controller* cntl); + +[[maybe_unused]] HttpResponse process_recycle_job_info(RecyclerServiceImpl* service, + brpc::Controller* cntl); +[[maybe_unused]] HttpResponse process_check_instance(RecyclerServiceImpl* service, + brpc::Controller* cntl); + +[[maybe_unused]] HttpResponse process_check_job_info(RecyclerServiceImpl* service, + brpc::Controller* cntl); + +[[maybe_unused]] HttpResponse process_check_meta(RecyclerServiceImpl* service, + brpc::Controller* cntl); + +[[maybe_unused]] HttpResponse process_adjust_rate_limiter(RecyclerServiceImpl*, + brpc::Controller* cntl); + +// Both http handlers +[[maybe_unused]] HttpResponse process_show_config(MetaServiceImpl*, brpc::Controller* cntl); + +[[maybe_unused]] HttpResponse process_update_config(MetaServiceImpl* service, + brpc::Controller* cntl); + +} // namespace doris::cloud \ No newline at end of file diff --git a/cloud/src/meta-service/meta_service_http.cpp b/cloud/src/meta-service/meta_service_http.cpp index ff2c4ce080d1da..d28a5af6a8eb02 100644 --- a/cloud/src/meta-service/meta_service_http.cpp +++ b/cloud/src/meta-service/meta_service_http.cpp @@ -32,9 +32,7 @@ #include #include -#include #include -#include #include #include #include @@ -43,37 +41,15 @@ #include #include -#include "common/config.h" -#include "common/configbase.h" +#include "common/http_helper.h" #include "common/logging.h" -#include "common/string_util.h" #include "meta-service/meta_service_helper.h" #include "meta-store/txn_kv.h" -#include "meta-store/txn_kv_error.h" #include "meta_service.h" -#include "rate-limiter/rate_limiter.h" +#include "recycler/recycler_service.h" namespace doris::cloud { -#define PARSE_MESSAGE_OR_RETURN(ctrl, req) \ - do { \ - std::string body = ctrl->request_attachment().to_string(); \ - auto& unresolved_path = ctrl->http_request().unresolved_path(); \ - auto st = parse_json_message(unresolved_path, body, &req); \ - if (!st.ok()) { \ - std::string msg = "parse http request '" + unresolved_path + "': " + st.ToString(); \ - LOG_WARNING(msg).tag("body", encryt_sk(body)); \ - return http_json_reply(MetaServiceCode::PROTOBUF_PARSE_ERR, msg); \ - } \ - } while (0) - -extern std::string get_instance_id(const std::shared_ptr& rc_mgr, - const std::string& cloud_unique_id); - -extern int decrypt_instance_info(InstanceInfoPB& instance, const std::string& instance_id, - MetaServiceCode& code, std::string& msg, - std::shared_ptr& txn); - extern void get_kv_range_boundaries_count(std::vector& partition_boundaries, std::unordered_map& partition_count); @@ -130,15 +106,19 @@ std::tuple convert_ms_code_to_http_code(MetaServiceCode r HttpResponse http_json_reply(MetaServiceCode code, const std::string& msg, std::optional body) { auto [status_code, status_msg] = convert_ms_code_to_http_code(code); + std::string_view response_msg = + code == MetaServiceCode::OK && msg.empty() ? status_msg : std::string_view(msg); rapidjson::Document d; d.SetObject(); if (code == MetaServiceCode::OK) { d.AddMember("code", "OK", d.GetAllocator()); - d.AddMember("msg", "", d.GetAllocator()); + d.AddMember("msg", rapidjson::StringRef(response_msg.data(), response_msg.size()), + d.GetAllocator()); } else { d.AddMember("code", rapidjson::StringRef(status_msg.data(), status_msg.size()), d.GetAllocator()); - d.AddMember("msg", rapidjson::StringRef(msg.data(), msg.size()), d.GetAllocator()); + d.AddMember("msg", rapidjson::StringRef(response_msg.data(), response_msg.size()), + d.GetAllocator()); } rapidjson::Document result; @@ -158,7 +138,7 @@ HttpResponse http_json_reply(MetaServiceCode code, const std::string& msg, rapidjson::StringBuffer sb; rapidjson::PrettyWriter writer(sb); d.Accept(writer); - return {status_code, msg, sb.GetString()}; + return {.status_code = status_code, .msg = std::string(response_msg), .body = sb.GetString()}; } static std::string format_http_request(brpc::Controller* cntl) { @@ -182,790 +162,21 @@ static std::string format_http_request(brpc::Controller* cntl) { return ss.str(); } -static std::string_view remove_version_prefix(std::string_view path) { - if (path.size() > 3 && path.substr(0, 3) == "v1/") path.remove_prefix(3); - return path; -} - -HttpResponse process_injection_point(MetaServiceImpl* service, brpc::Controller* ctrl); - -static HttpResponse process_alter_cluster(MetaServiceImpl* service, brpc::Controller* ctrl) { - static std::unordered_map operations { - {"add_cluster", AlterClusterRequest::ADD_CLUSTER}, - {"drop_cluster", AlterClusterRequest::DROP_CLUSTER}, - {"rename_cluster", AlterClusterRequest::RENAME_CLUSTER}, - {"update_cluster_endpoint", AlterClusterRequest::UPDATE_CLUSTER_ENDPOINT}, - {"update_cluster_mysql_user_name", AlterClusterRequest::UPDATE_CLUSTER_MYSQL_USER_NAME}, - {"add_node", AlterClusterRequest::ADD_NODE}, - {"drop_node", AlterClusterRequest::DROP_NODE}, - {"decommission_node", AlterClusterRequest::DECOMMISSION_NODE}, - {"set_cluster_status", AlterClusterRequest::SET_CLUSTER_STATUS}, - {"notify_decommissioned", AlterClusterRequest::NOTIFY_DECOMMISSIONED}, - {"alter_vcluster_info", AlterClusterRequest::ALTER_VCLUSTER_INFO}, - }; - - auto& path = ctrl->http_request().unresolved_path(); - auto body = ctrl->request_attachment().to_string(); - auto it = operations.find(remove_version_prefix(path)); - if (it == operations.end()) { - std::string msg = "not supportted alter cluster operation: " + path; - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); - } - - AlterClusterRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - - req.set_op(it->second); - AlterClusterResponse resp; - service->alter_cluster(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_get_obj_store_info(MetaServiceImpl* service, brpc::Controller* ctrl) { - GetObjStoreInfoRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - - GetObjStoreInfoResponse resp; - service->get_obj_store_info(ctrl, &req, &resp, nullptr); - return http_json_reply_message(resp.status(), resp); -} - -static HttpResponse process_alter_obj_store_info(MetaServiceImpl* service, brpc::Controller* ctrl) { - static std::unordered_map operations { - {"add_obj_info", AlterObjStoreInfoRequest::ADD_OBJ_INFO}, - {"legacy_update_ak_sk", AlterObjStoreInfoRequest::LEGACY_UPDATE_AK_SK}, - {"alter_obj_info", AlterObjStoreInfoRequest::ALTER_OBJ_INFO}}; - - auto& path = ctrl->http_request().unresolved_path(); - auto it = operations.find(remove_version_prefix(path)); - if (it == operations.end()) { - std::string msg = "not supportted alter obj store info operation: " + path; - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); - } - - AlterObjStoreInfoRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - req.set_op(it->second); - - AlterObjStoreInfoResponse resp; - service->alter_obj_store_info(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_alter_storage_vault(MetaServiceImpl* service, brpc::Controller* ctrl) { - static std::unordered_map operations { - {"drop_s3_vault", AlterObjStoreInfoRequest::DROP_S3_VAULT}, - {"add_s3_vault", AlterObjStoreInfoRequest::ADD_S3_VAULT}, - {"alter_s3_vault", AlterObjStoreInfoRequest::ALTER_S3_VAULT}, - {"drop_hdfs_vault", AlterObjStoreInfoRequest::DROP_HDFS_INFO}, - {"add_hdfs_vault", AlterObjStoreInfoRequest::ADD_HDFS_INFO}}; - - auto& path = ctrl->http_request().unresolved_path(); - auto it = operations.find(remove_version_prefix(path)); - if (it == operations.end()) { - std::string msg = "not supportted alter storage vault operation: " + path; - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); - } - - AlterObjStoreInfoRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - req.set_op(it->second); - - AlterObjStoreInfoResponse resp; - service->alter_storage_vault(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_update_ak_sk(MetaServiceImpl* service, brpc::Controller* ctrl) { - UpdateAkSkRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - UpdateAkSkResponse resp; - service->update_ak_sk(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_create_instance(MetaServiceImpl* service, brpc::Controller* ctrl) { - CreateInstanceRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - CreateInstanceResponse resp; - service->create_instance(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_alter_instance(MetaServiceImpl* service, brpc::Controller* ctrl) { - static std::unordered_map> - operations {{"rename_instance", {AlterInstanceRequest::RENAME}}, - {"enable_instance_sse", {AlterInstanceRequest::ENABLE_SSE}}, - {"disable_instance_sse", {AlterInstanceRequest::DISABLE_SSE}}, - {"drop_instance", {AlterInstanceRequest::DROP}}, - {"set_instance_status", - {AlterInstanceRequest::SET_NORMAL, AlterInstanceRequest::SET_OVERDUE}}}; - - auto& path = ctrl->http_request().unresolved_path(); - auto it = operations.find(remove_version_prefix(path)); - if (it == operations.end()) { - std::string msg = "not supportted alter instance operation: '" + path + - "', remove version prefix=" + std::string(remove_version_prefix(path)); - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); - } - - AlterInstanceRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - // for unresolved path whose corresponding operation is signal, we need set operation by ourselves. - if ((it->second).size() == 1) { - req.set_op((it->second)[0]); - } - AlterInstanceResponse resp; - service->alter_instance(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_abort_txn(MetaServiceImpl* service, brpc::Controller* ctrl) { - AbortTxnRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - AbortTxnResponse resp; - service->abort_txn(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_abort_tablet_job(MetaServiceImpl* service, brpc::Controller* ctrl) { - FinishTabletJobRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - req.set_action(FinishTabletJobRequest::ABORT); - FinishTabletJobResponse resp; - service->finish_tablet_job(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_alter_ram_user(MetaServiceImpl* service, brpc::Controller* ctrl) { - AlterRamUserRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - AlterRamUserResponse resp; - service->alter_ram_user(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_alter_iam(MetaServiceImpl* service, brpc::Controller* ctrl) { - AlterIamRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - AlterIamResponse resp; - service->alter_iam(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_adjust_rate_limit(MetaServiceImpl* service, brpc::Controller* cntl) { - const auto& uri = cntl->http_request().uri(); - auto qps_limit_str = std::string {http_query(uri, "qps_limit")}; - auto rpc_name = std::string {http_query(uri, "rpc_name")}; - auto instance_id = std::string {http_query(uri, "instance_id")}; - - auto process_set_qps_limit = [&](std::function cb) -> HttpResponse { - DCHECK(!qps_limit_str.empty()); - int64_t qps_limit = -1; - try { - qps_limit = std::stoll(qps_limit_str); - } catch (const std::exception& ex) { - return http_json_reply( - MetaServiceCode::INVALID_ARGUMENT, - fmt::format("param `qps_limit` is not a legal int64 type:{}", ex.what())); - } - if (qps_limit < 0) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - "`qps_limit` should not be less than 0"); - } - if (cb(qps_limit)) { - return http_json_reply(MetaServiceCode::OK, "sucess to adjust rate limit"); - } - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - fmt::format("failed to adjust rate limit for qps_limit={}, " - "rpc_name={}, instance_id={}, plz ensure correct " - "rpc/instance name", - qps_limit_str, rpc_name, instance_id)); - }; - - auto set_global_qps_limit = [process_set_qps_limit, service]() { - return process_set_qps_limit([service](int64_t qps_limit) { - return service->rate_limiter()->set_rate_limit(qps_limit); - }); - }; - - auto set_rpc_qps_limit = [&]() { - return process_set_qps_limit([&](int64_t qps_limit) { - return service->rate_limiter()->set_rate_limit(qps_limit, rpc_name); - }); - }; - - auto set_instance_qps_limit = [&]() { - return process_set_qps_limit([&](int64_t qps_limit) { - return service->rate_limiter()->set_instance_rate_limit(qps_limit, instance_id); - }); - }; - - auto set_instance_rpc_qps_limit = [&]() { - return process_set_qps_limit([&](int64_t qps_limit) { - return service->rate_limiter()->set_rate_limit(qps_limit, rpc_name, instance_id); - }); - }; - - auto process_invalid_arguments = [&]() -> HttpResponse { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - fmt::format("invalid argument: qps_limit(required)={}, " - "rpc_name(optional)={}, instance_id(optional)={}", - qps_limit_str, rpc_name, instance_id)); - }; - - // We have 3 optional params and 2^3 combination, and 4 of them are illegal. - // We register callbacks for them in porcessors accordings to the level, represented by 3 bits. - std::array, 8> processors; - std::fill_n(processors.begin(), 8, std::move(process_invalid_arguments)); - processors[0b001] = std::move(set_global_qps_limit); - processors[0b011] = std::move(set_rpc_qps_limit); - processors[0b101] = std::move(set_instance_qps_limit); - processors[0b111] = std::move(set_instance_rpc_qps_limit); - - uint8_t level = (0x01 & !qps_limit_str.empty()) | ((0x01 & !rpc_name.empty()) << 1) | - ((0x01 & !instance_id.empty()) << 2); - - DCHECK_LT(level, 8); - - return processors[level](); -} - -static HttpResponse process_query_rate_limit(MetaServiceImpl* service, brpc::Controller* cntl) { - auto rate_limiter = service->rate_limiter(); - rapidjson::Document d; - d.SetObject(); - auto get_qps_limit = [&d](std::string_view rpc_name, - std::shared_ptr rpc_limiter) { - rapidjson::Document node; - node.SetObject(); - rapidjson::Document sub; - sub.SetObject(); - auto get_qps_token_limit = [&](std::string_view instance_id, - std::shared_ptr qps_token) { - sub.AddMember(rapidjson::StringRef(instance_id.data(), instance_id.size()), - qps_token->max_qps_limit(), d.GetAllocator()); - }; - rpc_limiter->for_each_qps_token(std::move(get_qps_token_limit)); - - node.AddMember("RPC qps limit", rpc_limiter->max_qps_limit(), d.GetAllocator()); - node.AddMember("instance specific qps limit", sub, d.GetAllocator()); - d.AddMember(rapidjson::StringRef(rpc_name.data(), rpc_name.size()), node, d.GetAllocator()); - }; - rate_limiter->for_each_rpc_limiter(std::move(get_qps_limit)); - - rapidjson::StringBuffer sb; - rapidjson::PrettyWriter writer(sb); - d.Accept(writer); - return http_json_reply(MetaServiceCode::OK, "", sb.GetString()); -} - -static HttpResponse process_update_config(MetaServiceImpl* service, brpc::Controller* cntl) { - const auto& uri = cntl->http_request().uri(); - bool persist = (http_query(uri, "persist") == "true"); - auto configs = std::string {http_query(uri, "configs")}; - auto reason = std::string {http_query(uri, "reason")}; - LOG(INFO) << "modify configs for reason=" << reason << ", configs=" << configs - << ", persist=" << http_query(uri, "persist"); - if (configs.empty()) [[unlikely]] { - LOG(WARNING) << "query param `config` should not be empty"; - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - "query param `config` should not be empty"); - } - std::unordered_map conf_map; - auto conf_list = split(configs, ','); - for (const auto& conf : conf_list) { - auto conf_pair = split(conf, '='); - if (conf_pair.size() != 2) [[unlikely]] { - LOG(WARNING) << "failed to split config=[{}] from `k=v` pattern" << conf; - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - fmt::format("config {} is invalid", configs)); - } - trim(conf_pair[0]); - trim(conf_pair[1]); - conf_map.emplace(std::move(conf_pair[0]), std::move(conf_pair[1])); - } - if (auto [succ, cause] = - config::set_config(std::move(conf_map), persist, config::custom_conf_path); - !succ) { - LOG(WARNING) << cause; - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, cause); - } - return http_json_reply(MetaServiceCode::OK, ""); -} - -static HttpResponse process_decode_key(MetaServiceImpl*, brpc::Controller* ctrl) { - auto& uri = ctrl->http_request().uri(); - std::string_view key = http_query(uri, "key"); - if (key.empty()) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "no key to decode"); - } - - bool unicode = http_query(uri, "unicode") != "false"; - std::string body = prettify_key(key, unicode); - if (body.empty()) { - std::string msg = "failed to decode key, key=" + std::string(key); - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); - } - return http_text_reply(MetaServiceCode::OK, "", body); -} - -static HttpResponse process_encode_key(MetaServiceImpl*, brpc::Controller* ctrl) { - return process_http_encode_key(ctrl->http_request().uri()); -} - -static HttpResponse process_get_value(MetaServiceImpl* service, brpc::Controller* ctrl) { - return process_http_get_value(service->txn_kv().get(), ctrl->http_request().uri()); -} - -static HttpResponse process_set_value(MetaServiceImpl* service, brpc::Controller* ctrl) { - return process_http_set_value(service->txn_kv().get(), ctrl); -} - -// show all key ranges and their count. -static HttpResponse process_show_meta_ranges(MetaServiceImpl* service, brpc::Controller* ctrl) { - auto txn_kv = std::dynamic_pointer_cast(service->txn_kv()); - if (!txn_kv) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - "this method only support fdb txn kv"); - } - - std::vector partition_boundaries; - TxnErrorCode code = txn_kv->get_partition_boundaries(&partition_boundaries); - if (code != TxnErrorCode::TXN_OK) { - auto msg = fmt::format("failed to get boundaries, code={}", code); - return http_json_reply(MetaServiceCode::UNDEFINED_ERR, msg); - } - std::unordered_map partition_count; - get_kv_range_boundaries_count(partition_boundaries, partition_count); - - // sort ranges by count - std::vector> meta_ranges; - meta_ranges.reserve(partition_count.size()); - for (auto&& [key, count] : partition_count) { - meta_ranges.emplace_back(key, count); - } - - std::sort(meta_ranges.begin(), meta_ranges.end(), - [](const auto& lhs, const auto& rhs) { return lhs.second > rhs.second; }); - - std::string body = fmt::format("total meta ranges: {}\n", partition_boundaries.size()); - for (auto&& [key, count] : meta_ranges) { - body += fmt::format("{}: {}\n", key, count); - } - return http_text_reply(MetaServiceCode::OK, "", body); -} - -static HttpResponse process_get_instance_info(MetaServiceImpl* service, brpc::Controller* ctrl) { - auto& uri = ctrl->http_request().uri(); - std::string_view instance_id = http_query(uri, "instance_id"); - std::string_view cloud_unique_id = http_query(uri, "cloud_unique_id"); - - InstanceInfoPB instance; - auto [code, msg] = service->get_instance_info(std::string(instance_id), - std::string(cloud_unique_id), &instance); - return http_json_reply_message(code, msg, instance); -} - -static HttpResponse process_get_cluster(MetaServiceImpl* service, brpc::Controller* ctrl) { - GetClusterRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - - bool get_all_cluster_info = false; - // if cluster_id、cluster_name、mysql_user_name all empty, get this instance's all cluster info. - if (req.cluster_id().empty() && req.cluster_name().empty() && req.mysql_user_name().empty()) { - get_all_cluster_info = true; - } - - GetClusterResponse resp; - service->get_cluster(ctrl, &req, &resp, nullptr); - - if (resp.status().code() == MetaServiceCode::OK) { - if (get_all_cluster_info) { - return http_json_reply_message(resp.status(), resp); - } else { - // ATTN: only returns the first cluster pb. - return http_json_reply_message(resp.status(), resp.cluster(0)); - } - } else { - return http_json_reply(resp.status()); - } -} - -static HttpResponse process_get_tablet_stats(MetaServiceImpl* service, brpc::Controller* ctrl) { - GetTabletStatsRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - GetTabletStatsResponse resp; - service->get_tablet_stats(ctrl, &req, &resp, nullptr); - - std::string body; - if (resp.status().code() == MetaServiceCode::OK) { - body = resp.DebugString(); - } - return http_text_reply(resp.status(), body); -} - -static HttpResponse process_fix_tablet_stats(MetaServiceImpl* service, brpc::Controller* ctrl) { - auto& uri = ctrl->http_request().uri(); - std::string_view cloud_unique_id = http_query(uri, "cloud_unique_id"); - std::string_view table_id = http_query(uri, "table_id"); - std::string_view tablet_id = http_query(uri, "tablet_id"); - - MetaServiceResponseStatus st = service->fix_tablet_stats( - std::string(cloud_unique_id), std::string(table_id), std::string(tablet_id)); - return http_text_reply(st, st.DebugString()); -} - -static HttpResponse process_fix_tablet_db_id(MetaServiceImpl* service, brpc::Controller* ctrl) { - auto& uri = ctrl->http_request().uri(); - std::string instance_id(http_query(uri, "instance_id")); - std::string tablet_id_str(http_query(uri, "tablet_id")); - std::string db_id_str(http_query(uri, "db_id")); - - int64_t tablet_id = 0, db_id = 0; - try { - db_id = std::stol(db_id_str); - } catch (const std::exception& e) { - auto msg = fmt::format("db_id {} must be a number, meet error={}", db_id_str, e.what()); - LOG(WARNING) << msg; - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); - } - - try { - tablet_id = std::stol(tablet_id_str); - } catch (const std::exception& e) { - auto msg = fmt::format("tablet_id {} must be a number, meet error={}", tablet_id_str, - e.what()); - LOG(WARNING) << msg; - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); - } - - auto [code, msg] = service->fix_tablet_db_id(instance_id, tablet_id, db_id); - return http_text_reply(code, msg, ""); -} - -static HttpResponse process_get_stage(MetaServiceImpl* service, brpc::Controller* ctrl) { - GetStageRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - GetStageResponse resp; - service->get_stage(ctrl, &req, &resp, nullptr); - return http_json_reply_message(resp.status(), resp); -} - -static HttpResponse process_get_cluster_status(MetaServiceImpl* service, brpc::Controller* ctrl) { - GetClusterStatusRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - GetClusterStatusResponse resp; - service->get_cluster_status(ctrl, &req, &resp, nullptr); - return http_json_reply_message(resp.status(), resp); -} - -static HttpResponse process_txn_lazy_commit(MetaServiceImpl* service, brpc::Controller* ctrl) { - auto& uri = ctrl->http_request().uri(); - std::string instance_id(http_query(uri, "instance_id")); - std::string txn_id_str(http_query(uri, "txn_id")); - if (instance_id.empty() || txn_id_str.empty()) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "instance_id or txn_id is empty"); - } - - int64_t txn_id = 0; - try { - txn_id = std::stol(txn_id_str); - } catch (const std::exception& e) { - auto msg = fmt::format("txn_id {} must be a number, meet error={}", txn_id_str, e.what()); - LOG(WARNING) << msg; - return http_json_reply(MetaServiceCode::UNDEFINED_ERR, msg); - } - - DCHECK_GT(txn_id, 0); - - auto txn_lazy_committer = service->txn_lazy_committer(); - if (!txn_lazy_committer) { - return http_json_reply(MetaServiceCode::UNDEFINED_ERR, "txn lazy committer is nullptr"); - } - - std::shared_ptr task = txn_lazy_committer->submit(instance_id, txn_id); - auto [code, msg] = task->wait(); - return http_json_reply(code, msg); -} - -static HttpResponse process_unknown(MetaServiceImpl*, brpc::Controller*) { - // ATTN: To be compatible with cloud manager versions higher than this MS - return http_json_reply(MetaServiceCode::OK, ""); -} - -static HttpResponse process_list_snapshot(MetaServiceImpl* service, brpc::Controller* ctrl) { - ListSnapshotRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - ListSnapshotResponse resp; - service->list_snapshot(ctrl, &req, &resp, nullptr); - return http_json_reply_message(resp.status(), resp); -} - -static HttpResponse process_set_snapshot_property(MetaServiceImpl* service, - brpc::Controller* ctrl) { - AlterInstanceRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - auto* properties = req.mutable_properties(); - if (properties->contains("status")) { - std::string status = properties->at("status"); - if (status != "ENABLED" && status != "DISABLED") { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - "Invalid value for status property: " + status + - ", expected 'ENABLED' or 'DISABLED' (case insensitive)"); - } - std::string_view is_enable = (status == "ENABLED") ? "true" : "false"; - const std::string& property_name = - AlterInstanceRequest::SnapshotProperty_Name(AlterInstanceRequest::ENABLE_SNAPSHOT); - (*properties)[property_name] = is_enable; - properties->erase("status"); - } - if (properties->contains("max_reserved_snapshots")) { - const std::string& property_name = AlterInstanceRequest::SnapshotProperty_Name( - AlterInstanceRequest::MAX_RESERVED_SNAPSHOTS); - (*properties)[property_name] = properties->at("max_reserved_snapshots"); - properties->erase("max_reserved_snapshots"); - } - if (properties->contains("snapshot_interval_seconds")) { - const std::string& property_name = AlterInstanceRequest::SnapshotProperty_Name( - AlterInstanceRequest::SNAPSHOT_INTERVAL_SECONDS); - (*properties)[property_name] = properties->at("snapshot_interval_seconds"); - properties->erase("snapshot_interval_seconds"); - } - req.set_op(AlterInstanceRequest::SET_SNAPSHOT_PROPERTY); - AlterInstanceResponse resp; - service->alter_instance(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_set_multi_version_status(MetaServiceImpl* service, - brpc::Controller* ctrl) { - auto& uri = ctrl->http_request().uri(); - std::string instance_id(http_query(uri, "instance_id")); - std::string cloud_unique_id(http_query(uri, "cloud_unique_id")); - std::string multi_version_status_str(http_query(uri, "multi_version_status")); - - // Prefer instance_id if provided, fallback to cloud_unique_id - if (instance_id.empty()) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "empty instance id"); - } - - if (multi_version_status_str.empty()) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - "multi_version_status is required"); - } - - // Parse multi_version_status from string to enum - MultiVersionStatus multi_version_status; - std::string multi_version_status_upper = multi_version_status_str; - std::ranges::transform(multi_version_status_upper, multi_version_status_upper.begin(), - ::toupper); - - if (multi_version_status_upper == "MULTI_VERSION_DISABLED") { - multi_version_status = MultiVersionStatus::MULTI_VERSION_DISABLED; - } else if (multi_version_status_upper == "MULTI_VERSION_WRITE_ONLY") { - multi_version_status = MultiVersionStatus::MULTI_VERSION_WRITE_ONLY; - } else if (multi_version_status_upper == "MULTI_VERSION_READ_WRITE") { - multi_version_status = MultiVersionStatus::MULTI_VERSION_READ_WRITE; - } else if (multi_version_status_upper == "MULTI_VERSION_ENABLED") { - multi_version_status = MultiVersionStatus::MULTI_VERSION_ENABLED; - } else { - return http_json_reply( - MetaServiceCode::INVALID_ARGUMENT, - "invalid multi_version_status value. Supported values: MULTI_VERSION_DISABLED, " - "MULTI_VERSION_WRITE_ONLY, MULTI_VERSION_READ_WRITE, MULTI_VERSION_ENABLED"); - } - // Call snapshot manager directly - auto [code, msg] = service->snapshot_manager()->set_multi_version_status(instance_id, - multi_version_status); - - return http_json_reply(code, msg); -} - -static HttpResponse process_get_snapshot_property(MetaServiceImpl* service, - brpc::Controller* ctrl) { - auto& uri = ctrl->http_request().uri(); - std::string_view instance_id = http_query(uri, "instance_id"); - std::string_view cloud_unique_id = http_query(uri, "cloud_unique_id"); - - if (instance_id.empty() && cloud_unique_id.empty()) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - "empty instance_id and cloud_unique_id"); - } - - InstanceInfoPB instance; - auto [code, msg] = service->get_instance_info(std::string(instance_id), - std::string(cloud_unique_id), &instance); - if (code != MetaServiceCode::OK) { - return http_json_reply(code, msg); - } - - // Build snapshot properties response - rapidjson::Document doc; - doc.SetObject(); - - // Snapshot switch status - std::string_view switch_status; - switch (instance.snapshot_switch_status()) { - case SNAPSHOT_SWITCH_DISABLED: - switch_status = "UNSUPPORTED"; - break; - case SNAPSHOT_SWITCH_OFF: - switch_status = "DISABLED"; - break; - case SNAPSHOT_SWITCH_ON: - switch_status = "ENABLED"; - break; - default: - switch_status = "UNKNOWN"; - break; - } - doc.AddMember("status", rapidjson::StringRef(switch_status.data(), switch_status.size()), - doc.GetAllocator()); - - // Max reserved snapshots - if (instance.has_max_reserved_snapshot()) { - doc.AddMember("max_reserved_snapshots", instance.max_reserved_snapshot(), - doc.GetAllocator()); - } - - // Snapshot interval seconds - if (instance.has_snapshot_interval_seconds()) { - doc.AddMember("snapshot_interval_seconds", instance.snapshot_interval_seconds(), - doc.GetAllocator()); - } - - rapidjson::StringBuffer buffer; - rapidjson::Writer writer(buffer); - doc.Accept(writer); - - return http_json_reply(MetaServiceCode::OK, "", buffer.GetString()); -} - void MetaServiceImpl::http(::google::protobuf::RpcController* controller, const MetaServiceHttpRequest*, MetaServiceHttpResponse*, ::google::protobuf::Closure* done) { - using HttpHandler = HttpResponse (*)(MetaServiceImpl*, brpc::Controller*); - static std::unordered_map http_handlers { - // for alter cluster. - {"add_cluster", process_alter_cluster}, - {"drop_cluster", process_alter_cluster}, - {"rename_cluster", process_alter_cluster}, - {"update_cluster_endpoint", process_alter_cluster}, - {"update_cluster_mysql_user_name", process_alter_cluster}, - {"add_node", process_alter_cluster}, - {"drop_node", process_alter_cluster}, - {"decommission_node", process_alter_cluster}, - {"set_cluster_status", process_alter_cluster}, - {"notify_decommissioned", process_alter_cluster}, - {"alter_vcluster_info", process_alter_cluster}, - {"v1/add_cluster", process_alter_cluster}, - {"v1/drop_cluster", process_alter_cluster}, - {"v1/rename_cluster", process_alter_cluster}, - {"v1/update_cluster_endpoint", process_alter_cluster}, - {"v1/update_cluster_mysql_user_name", process_alter_cluster}, - {"v1/add_node", process_alter_cluster}, - {"v1/drop_node", process_alter_cluster}, - {"v1/decommission_node", process_alter_cluster}, - {"v1/set_cluster_status", process_alter_cluster}, - {"v1/alter_vcluster_info", process_alter_cluster}, - // for alter instance - {"create_instance", process_create_instance}, - {"drop_instance", process_alter_instance}, - {"rename_instance", process_alter_instance}, - {"enable_instance_sse", process_alter_instance}, - {"disable_instance_sse", process_alter_instance}, - {"set_instance_status", process_alter_instance}, - {"v1/create_instance", process_create_instance}, - {"v1/drop_instance", process_alter_instance}, - {"v1/rename_instance", process_alter_instance}, - {"v1/enable_instance_sse", process_alter_instance}, - {"v1/disable_instance_sse", process_alter_instance}, - {"v1/set_instance_status", process_alter_instance}, - // for alter obj store info - {"add_obj_info", process_alter_obj_store_info}, - {"legacy_update_ak_sk", process_alter_obj_store_info}, - {"update_ak_sk", process_update_ak_sk}, - {"v1/add_obj_info", process_alter_obj_store_info}, - {"v1/legacy_update_ak_sk", process_alter_obj_store_info}, - {"v1/update_ak_sk", process_update_ak_sk}, - {"show_storage_vaults", process_get_obj_store_info}, - {"add_hdfs_vault", process_alter_storage_vault}, - {"add_s3_vault", process_alter_storage_vault}, - {"alter_s3_vault", process_alter_storage_vault}, - {"drop_s3_vault", process_alter_storage_vault}, - {"drop_hdfs_vault", process_alter_storage_vault}, - {"alter_obj_info", process_alter_obj_store_info}, - {"v1/alter_obj_info", process_alter_obj_store_info}, - {"v1/alter_s3_vault", process_alter_storage_vault}, - - // for tools - {"decode_key", process_decode_key}, - {"encode_key", process_encode_key}, - {"get_value", process_get_value}, - {"set_value", process_set_value}, - {"show_meta_ranges", process_show_meta_ranges}, - {"txn_lazy_commit", process_txn_lazy_commit}, - {"injection_point", process_injection_point}, - {"fix_tablet_stats", process_fix_tablet_stats}, - {"fix_tablet_db_id", process_fix_tablet_db_id}, - {"v1/decode_key", process_decode_key}, - {"v1/encode_key", process_encode_key}, - {"v1/get_value", process_get_value}, - {"v1/set_value", process_set_value}, - {"v1/show_meta_ranges", process_show_meta_ranges}, - {"v1/txn_lazy_commit", process_txn_lazy_commit}, - {"v1/injection_point", process_injection_point}, - {"v1/fix_tablet_stats", process_fix_tablet_stats}, - {"v1/fix_tablet_db_id", process_fix_tablet_db_id}, - // for get - {"get_instance", process_get_instance_info}, - {"get_obj_store_info", process_get_obj_store_info}, - {"get_cluster", process_get_cluster}, - {"get_tablet_stats", process_get_tablet_stats}, - {"get_stage", process_get_stage}, - {"get_cluster_status", process_get_cluster_status}, - {"v1/get_instance", process_get_instance_info}, - {"v1/get_obj_store_info", process_get_obj_store_info}, - {"v1/get_cluster", process_get_cluster}, - {"v1/get_tablet_stats", process_get_tablet_stats}, - {"v1/get_stage", process_get_stage}, - {"v1/get_cluster_status", process_get_cluster_status}, - // snapshot related - {"list_snapshot", process_list_snapshot}, - {"set_snapshot_property", process_set_snapshot_property}, - {"get_snapshot_property", process_get_snapshot_property}, - {"set_multi_version_status", process_set_multi_version_status}, - {"v1/list_snapshot", process_list_snapshot}, - {"v1/set_snapshot_property", process_set_snapshot_property}, - {"v1/get_snapshot_property", process_get_snapshot_property}, - {"v1/set_multi_version_status", process_set_multi_version_status}, - // misc - {"abort_txn", process_abort_txn}, - {"abort_tablet_job", process_abort_tablet_job}, - {"alter_ram_user", process_alter_ram_user}, - {"alter_iam", process_alter_iam}, - {"adjust_rate_limit", process_adjust_rate_limit}, - {"list_rate_limit", process_query_rate_limit}, - {"update_config", process_update_config}, - {"v1/abort_txn", process_abort_txn}, - {"v1/abort_tablet_job", process_abort_tablet_job}, - {"v1/alter_ram_user", process_alter_ram_user}, - {"v1/alter_iam", process_alter_iam}, - {"v1/adjust_rate_limit", process_adjust_rate_limit}, - {"v1/list_rate_limit", process_query_rate_limit}, - {"v1/update_config", process_update_config}, - }; - auto* cntl = static_cast(controller); brpc::ClosureGuard closure_guard(done); - // Prepare input request info LOG(INFO) << "rpc from " << cntl->remote_side() << " request: " << cntl->http_request().uri().path(); std::string http_request = format_http_request(cntl); std::string http_request_for_log = encryt_sk(http_request); http_request_for_log = hide_ak(http_request_for_log); + const auto& unresolved_path = cntl->http_request().unresolved_path(); + auto api_path = split_http_api_path(unresolved_path); + const auto& handlers = get_http_handlers(); + auto it = handlers.find(api_path.route); // Auth auto token = http_query(cntl->http_request().uri(), "token"); @@ -980,15 +191,18 @@ void MetaServiceImpl::http(::google::protobuf::RpcController* controller, return; } - // Process http request - auto& unresolved_path = cntl->http_request().unresolved_path(); - HttpHandler handler = process_unknown; - auto it = http_handlers.find(unresolved_path); - if (it != http_handlers.end()) { - handler = it->second; + const auto* handler = + it == handlers.end() ? nullptr : resolve_http_handler(it->second, api_path.version); + if (handler == nullptr || + (it->second.role != HttpRole::META_SERVICE && it->second.role != HttpRole::BOTH)) { + std::string msg = "http path not found or not allowed"; + cntl->http_response().set_status_code(404); + cntl->response_attachment().append(msg); + cntl->response_attachment().append("\n"); + return; } - auto [status_code, msg, body] = handler(this, cntl); + auto [status_code, msg, body] = (*handler)(this, cntl); cntl->http_response().set_status_code(status_code); cntl->response_attachment().append(body); cntl->response_attachment().append("\n"); @@ -999,4 +213,4 @@ void MetaServiceImpl::http(::google::protobuf::RpcController* controller, << http_request_for_log << "\n ret=" << ret << " msg=" << msg; } -} // namespace doris::cloud +} // namespace doris::cloud \ No newline at end of file diff --git a/cloud/src/meta-service/meta_service_http.h b/cloud/src/meta-service/meta_service_http.h index 1dca1d3d64dd33..3c9894839232e3 100644 --- a/cloud/src/meta-service/meta_service_http.h +++ b/cloud/src/meta-service/meta_service_http.h @@ -21,9 +21,11 @@ #include #include +#include #include #include #include +#include #include "common/util.h" @@ -68,8 +70,10 @@ inline static HttpResponse http_json_reply_message(const MetaServiceResponseStat inline static HttpResponse http_text_reply(MetaServiceCode code, const std::string& msg, const std::string& body) { - auto [status_code, _] = convert_ms_code_to_http_code(code); - return {status_code, msg, body}; + auto [status_code, status_msg] = convert_ms_code_to_http_code(code); + std::string response_msg = + code == MetaServiceCode::OK && msg.empty() ? std::string(status_msg) : msg; + return {.status_code = status_code, .msg = std::move(response_msg), .body = body}; } inline static HttpResponse http_text_reply(const MetaServiceResponseStatus& status, @@ -77,4 +81,20 @@ inline static HttpResponse http_text_reply(const MetaServiceResponseStatus& stat return http_text_reply(status.code(), status.msg(), body); } +// Forward declarations +class MetaServiceImpl; +class RecyclerServiceImpl; + +// Role-based HTTP handler +enum class HttpRole { META_SERVICE = 1, RECYCLER = 2, BOTH = 3 }; + +using HttpHandler = std::function; + +struct HttpHandlerInfo { + HttpHandler handler; + // Route-local overrides keyed by API version, e.g. {"v2", handler_for_v2}. + std::unordered_map versioned_handlers; + HttpRole role; +}; + } // namespace doris::cloud diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h index bf04f2f575803b..db774017dac212 100644 --- a/cloud/src/recycler/recycler.h +++ b/cloud/src/recycler/recycler.h @@ -88,6 +88,10 @@ class Recycler { bool stopped() const { return stopped_.load(std::memory_order_acquire); } + [[nodiscard]] const RecyclerThreadPoolGroup& thread_pool_group() const { + return _thread_pool_group; + } + private: void recycle_callback(); diff --git a/cloud/src/recycler/recycler_service.cpp b/cloud/src/recycler/recycler_service.cpp index 4b33ef76c9575e..7503f8cc6f8f44 100644 --- a/cloud/src/recycler/recycler_service.cpp +++ b/cloud/src/recycler/recycler_service.cpp @@ -33,6 +33,7 @@ #include "common/config.h" #include "common/defer.h" +#include "common/http_helper.h" #include "common/logging.h" #include "common/util.h" #include "cpp/s3_rate_limiter.h" @@ -507,204 +508,43 @@ void RecyclerServiceImpl::http(::google::protobuf::RpcController* controller, const ::doris::cloud::MetaServiceHttpRequest* request, ::doris::cloud::MetaServiceHttpResponse* response, ::google::protobuf::Closure* done) { - auto cntl = static_cast(controller); - LOG(INFO) << "rpc from " << cntl->remote_side() << " request: " << request->DebugString(); + auto* cntl = static_cast(controller); + LOG(INFO) << "rpc from " << cntl->remote_side() + << " request: " << cntl->http_request().uri().path(); brpc::ClosureGuard closure_guard(done); - MetaServiceCode code = MetaServiceCode::OK; - int status_code = 200; - std::string msg = "OK"; - std::string req; - std::string response_body; - std::string request_body; - DORIS_CLOUD_DEFER { - status_code = std::get<0>(convert_ms_code_to_http_code(code)); - LOG(INFO) << (code == MetaServiceCode::OK ? "succ to " : "failed to ") << "http" - << " " << cntl->remote_side() << " request=\n" - << req << "\n ret=" << code << " msg=" << msg; - cntl->http_response().set_status_code(status_code); - cntl->response_attachment().append(response_body); + const auto& unresolved_path = cntl->http_request().unresolved_path(); + auto api_path = split_http_api_path(unresolved_path); + const auto& handlers = get_http_handlers(); + auto it = handlers.find(api_path.route); + const auto* handler = + it == handlers.end() ? nullptr : resolve_http_handler(it->second, api_path.version); + if (handler == nullptr || + (it->second.role != HttpRole::RECYCLER && it->second.role != HttpRole::BOTH)) { + std::string msg = "http path not found or not allowed"; + cntl->http_response().set_status_code(404); + cntl->response_attachment().append(msg); cntl->response_attachment().append("\n"); - }; - - // Prepare input request info - auto unresolved_path = cntl->http_request().unresolved_path(); - auto uri = cntl->http_request().uri(); - std::stringstream ss; - ss << "\nuri_path=" << uri.path(); - ss << "\nunresolved_path=" << unresolved_path; - ss << "\nmethod=" << brpc::HttpMethod2Str(cntl->http_request().method()); - ss << "\nquery strings:"; - for (auto it = uri.QueryBegin(); it != uri.QueryEnd(); ++it) { - ss << "\n" << it->first << "=" << it->second; - } - ss << "\nheaders:"; - for (auto it = cntl->http_request().HeaderBegin(); it != cntl->http_request().HeaderEnd(); - ++it) { - ss << "\n" << it->first << ":" << it->second; + return; } - req = ss.str(); - ss.clear(); - request_body = cntl->request_attachment().to_string(); // Just copy // Auth - auto token = uri.GetQuery("token"); + const auto* token = cntl->http_request().uri().GetQuery("token"); if (token == nullptr || *token != config::http_token) { - msg = "incorrect token, token=" + (token == nullptr ? std::string("(not given)") : *token); - response_body = "incorrect token"; - status_code = 403; - return; - } - - if (unresolved_path == "recycle_instance") { - RecycleInstanceRequest req; - auto st = google::protobuf::util::JsonStringToMessage(request_body, &req); - if (!st.ok()) { - msg = "failed to RecycleInstanceRequest, error: " + st.message().ToString(); - response_body = msg; - LOG(WARNING) << msg; - return; - } - RecycleInstanceResponse res; - recycle_instance(cntl, &req, &res, nullptr); - code = res.status().code(); - msg = res.status().msg(); - response_body = msg; - return; - } - - if (unresolved_path == "statistics_recycle") { - StatisticsRecycleRequest req; - auto st = google::protobuf::util::JsonStringToMessage(request_body, &req); - if (!st.ok()) { - msg = "failed to StatisticsRecycleRequest, error: " + st.message().ToString(); - response_body = msg; - LOG(WARNING) << msg; - return; - } - statistics_recycle(req, code, msg); - response_body = msg; - return; - } - - if (unresolved_path == "recycle_copy_jobs") { - auto instance_id = uri.GetQuery("instance_id"); - if (instance_id == nullptr || instance_id->empty()) { - msg = "no instance id"; - response_body = msg; - status_code = 400; - return; - } - recycle_copy_jobs(txn_kv_, *instance_id, code, msg, recycler_->_thread_pool_group, - txn_lazy_committer_); - - response_body = msg; - return; - } - - if (unresolved_path == "recycle_job_info") { - auto instance_id = uri.GetQuery("instance_id"); - if (instance_id == nullptr || instance_id->empty()) { - msg = "no instance id"; - response_body = msg; - status_code = 400; - return; - } - std::string key; - job_recycle_key({*instance_id}, &key); - recycle_job_info(txn_kv_, *instance_id, key, code, msg); - response_body = msg; - return; - } - - if (unresolved_path == "check_instance") { - auto instance_id = uri.GetQuery("instance_id"); - if (instance_id == nullptr || instance_id->empty()) { - msg = "no instance id"; - response_body = msg; - status_code = 400; - return; - } - if (!checker_) { - msg = "checker not enabled"; - response_body = msg; - status_code = 400; - return; - } - check_instance(*instance_id, code, msg); - response_body = msg; - return; - } - - if (unresolved_path == "check_job_info") { - auto instance_id = uri.GetQuery("instance_id"); - if (instance_id == nullptr || instance_id->empty()) { - msg = "no instance id"; - response_body = msg; - status_code = 400; - return; - } - std::string key; - job_check_key({*instance_id}, &key); - recycle_job_info(txn_kv_, *instance_id, key, code, msg); - response_body = msg; - return; - } - - if (unresolved_path == "check_meta") { - auto instance_id = uri.GetQuery("instance_id"); - auto host = uri.GetQuery("host"); - auto port = uri.GetQuery("port"); - auto user = uri.GetQuery("user"); - auto password = uri.GetQuery("password"); - if (instance_id == nullptr || instance_id->empty() || host == nullptr || host->empty() || - port == nullptr || port->empty() || password == nullptr || user == nullptr || - user->empty()) { - msg = "no instance id or mysql conn str info"; - response_body = msg; - status_code = 400; - return; - } - LOG(INFO) << " host " << *host; - LOG(INFO) << " port " << *port; - LOG(INFO) << " user " << *user; - LOG(INFO) << " instance " << *instance_id; - check_meta(txn_kv_, *instance_id, *host, *port, *user, *password, msg); - status_code = 200; - response_body = msg; + std::string msg = "incorrect token, token=" + + (token == nullptr ? std::string("(not given)") : *token); + cntl->http_response().set_status_code(403); + cntl->response_attachment().append(msg); + cntl->response_attachment().append("\n"); + LOG(WARNING) << "failed to handle http from " << cntl->remote_side() << " msg: " << msg; return; } - if (unresolved_path == "adjust_rate_limiter") { - auto type_string = uri.GetQuery("type"); - auto speed = uri.GetQuery("speed"); - auto burst = uri.GetQuery("burst"); - auto limit = uri.GetQuery("limit"); - if (type_string->empty() || speed->empty() || burst->empty() || limit->empty() || - (*type_string != "get" && *type_string != "put")) { - msg = "argument not suitable"; - response_body = msg; - status_code = 400; - return; - } - auto max_speed = speed->empty() ? 0 : std::stoul(*speed); - auto max_burst = burst->empty() ? 0 : std::stoul(*burst); - auto max_limit = burst->empty() ? 0 : std::stoul(*limit); - if (0 != reset_s3_rate_limiter(string_to_s3_rate_limit_type(*type_string), max_speed, - max_burst, max_limit)) { - msg = "adjust failed"; - response_body = msg; - status_code = 400; - return; - } - - status_code = 200; - response_body = msg; - return; - } + auto [status_code, msg, body] = (*handler)(this, cntl); + cntl->http_response().set_status_code(status_code); + cntl->response_attachment().append(body); + cntl->response_attachment().append("\n"); - status_code = 404; - msg = "http path " + uri.path() + " not found, it may be not implemented"; - response_body = msg; + LOG(INFO) << (status_code == 200 ? "succ to " : "failed to ") << __PRETTY_FUNCTION__ << " " + << cntl->remote_side() << " ret=" << status_code << " msg=" << msg; } - } // namespace doris::cloud diff --git a/cloud/src/recycler/recycler_service.h b/cloud/src/recycler/recycler_service.h index 6890d7049bd90b..c00cd1e74a7a6c 100644 --- a/cloud/src/recycler/recycler_service.h +++ b/cloud/src/recycler/recycler_service.h @@ -27,6 +27,8 @@ namespace doris::cloud { class Recycler; class Checker; +struct RecyclerThreadPoolGroup; +class TxnLazyCommitter; class RecyclerServiceImpl : public cloud::RecyclerService { public: @@ -34,6 +36,17 @@ class RecyclerServiceImpl : public cloud::RecyclerService { std::shared_ptr txn_lazy_committer); ~RecyclerServiceImpl() override; + [[nodiscard]] const std::shared_ptr& txn_kv() const { return txn_kv_; } + [[nodiscard]] Recycler* recycler() const { return recycler_; } + [[nodiscard]] Checker* checker() const { return checker_; } + [[nodiscard]] const std::shared_ptr& txn_lazy_committer() const { + return txn_lazy_committer_; + } + + void statistics_recycle(StatisticsRecycleRequest& req, MetaServiceCode& code, std::string& msg); + + void check_instance(const std::string& instance_id, MetaServiceCode& code, std::string& msg); + void recycle_instance(::google::protobuf::RpcController* controller, const ::doris::cloud::RecycleInstanceRequest* request, ::doris::cloud::RecycleInstanceResponse* response, @@ -44,11 +57,6 @@ class RecyclerServiceImpl : public cloud::RecyclerService { ::doris::cloud::MetaServiceHttpResponse* response, ::google::protobuf::Closure* done) override; -private: - void statistics_recycle(StatisticsRecycleRequest& req, MetaServiceCode& code, std::string& msg); - - void check_instance(const std::string& instance_id, MetaServiceCode& code, std::string& msg); - private: std::shared_ptr txn_kv_; Recycler* recycler_; // Ref @@ -59,4 +67,14 @@ class RecyclerServiceImpl : public cloud::RecyclerService { extern int reset_s3_rate_limiter(S3RateLimitType type, size_t max_speed, size_t max_burst, size_t limit); +void recycle_copy_jobs(const std::shared_ptr& txn_kv, const std::string& instance_id, + MetaServiceCode& code, std::string& msg, + RecyclerThreadPoolGroup thread_pool_group, + std::shared_ptr txn_lazy_committer); +void recycle_job_info(const std::shared_ptr& txn_kv, const std::string& instance_id, + std::string_view key, MetaServiceCode& code, std::string& msg); +void check_meta(const std::shared_ptr& txn_kv, const std::string& instance_id, + const std::string& host, const std::string& port, const std::string& user, + const std::string& password, std::string& msg); + } // namespace doris::cloud diff --git a/cloud/test/meta_service_http_test.cpp b/cloud/test/meta_service_http_test.cpp index abf791649de72a..7452fed53ec417 100644 --- a/cloud/test/meta_service_http_test.cpp +++ b/cloud/test/meta_service_http_test.cpp @@ -42,6 +42,7 @@ #include "common/config.h" #include "common/configbase.h" #include "common/defer.h" +#include "common/http_helper.h" #include "common/logging.h" #include "common/util.h" #include "cpp/sync_point.h" @@ -370,7 +371,52 @@ static void insert_rowset(MetaService* meta_service, int64_t db_id, const std::s commit_txn(meta_service, db_id, txn_id, label); } -/// NOTICE: Not ALL `code`, returned by http server, are supported by `MetaServiceCode`. +TEST(MetaServiceHttpTest, SplitHttpApiPath) { + { + auto path = split_http_api_path("v1/create_instance"); + ASSERT_EQ(path.version, "v1"); + ASSERT_EQ(path.route, "create_instance"); + } + { + auto path = split_http_api_path("v2/create_instance"); + ASSERT_EQ(path.version, "v2"); + ASSERT_EQ(path.route, "create_instance"); + } + { + auto path = split_http_api_path("create_instance"); + ASSERT_TRUE(path.version.empty()); + ASSERT_EQ(path.route, "create_instance"); + } + { + auto path = split_http_api_path("version/create_instance"); + ASSERT_TRUE(path.version.empty()); + ASSERT_EQ(path.route, "version/create_instance"); + } +} + +TEST(MetaServiceHttpTest, ResolveHttpHandlerByVersion) { + // clang-format off + HttpHandlerInfo handler_info { + .handler = [](void*, brpc::Controller*) { return HttpResponse {200, "v1", ""}; }, + .versioned_handlers = {{"v2", + [](void*, brpc::Controller*) { + return HttpResponse {200, "v2", ""}; + }}}, + .role = HttpRole::META_SERVICE}; + // clang-format on + + ASSERT_EQ(resolve_http_handler(handler_info, ""), &handler_info.handler); + ASSERT_EQ(resolve_http_handler(handler_info, "v1"), &handler_info.handler); + ASSERT_EQ(resolve_http_handler(handler_info, "v2"), &handler_info.versioned_handlers.at("v2")); + ASSERT_EQ(resolve_http_handler(handler_info, "v3"), nullptr); + + const auto& handlers = get_http_handlers(); + auto it = handlers.find("add_cluster"); + ASSERT_NE(it, handlers.end()); + ASSERT_EQ(resolve_http_handler(it->second, ""), &it->second.handler); + ASSERT_EQ(resolve_http_handler(it->second, "v1"), &it->second.handler); + ASSERT_EQ(resolve_http_handler(it->second, "v2"), nullptr); +} TEST(MetaServiceHttpTest, InstanceTest) { HttpContext ctx; @@ -610,6 +656,74 @@ TEST(MetaServiceHttpTest, InstanceTestWithVersion) { } } +TEST(MetaServiceHttpTest, AlterClusterTestWithVersion) { + config::enable_cluster_name_check = true; + + HttpContext ctx; + { + CreateInstanceRequest req; + req.set_instance_id(mock_instance); + req.set_user_id("test_user"); + req.set_name("test_name"); + ObjectStoreInfoPB obj; + obj.set_ak("123"); + obj.set_sk("321"); + obj.set_bucket("456"); + obj.set_prefix("654"); + obj.set_endpoint("789"); + obj.set_region("987"); + obj.set_external_endpoint("888"); + obj.set_provider(ObjectStoreInfoPB::BOS); + req.mutable_obj_info()->CopyFrom(obj); + + auto [status_code, resp] = + ctx.forward("v1/create_instance", req); + ASSERT_EQ(status_code, 200); + ASSERT_EQ(resp.code(), MetaServiceCode::OK); + } + + { + AlterClusterRequest req; + req.set_instance_id(mock_instance); + req.mutable_cluster()->set_cluster_name(mock_cluster_name); + req.mutable_cluster()->set_type(ClusterPB::COMPUTE); + req.mutable_cluster()->set_cluster_id(mock_cluster_id); + auto [status_code, resp] = ctx.forward("v1/add_cluster", req); + ASSERT_EQ(status_code, 200); + ASSERT_EQ(resp.code(), MetaServiceCode::OK); + } + + { + GetClusterRequest req; + req.set_cloud_unique_id("1:" + mock_instance + ":xxxx"); + req.set_cluster_id(mock_cluster_id); + auto [status_code, resp] = ctx.forward_with_result("v1/get_cluster", req); + ASSERT_EQ(status_code, 200); + ASSERT_EQ(resp.status.code(), MetaServiceCode::OK); + ASSERT_TRUE(resp.result.has_value()); + ASSERT_EQ(resp.result->cluster_name(), mock_cluster_name); + } + + { + AlterClusterRequest req; + req.set_instance_id(mock_instance); + req.mutable_cluster()->set_cluster_id(mock_cluster_id); + req.mutable_cluster()->set_cluster_name("rename_cluster_name"); + auto [status_code, resp] = ctx.forward("v1/rename_cluster", req); + ASSERT_EQ(status_code, 200); + ASSERT_EQ(resp.code(), MetaServiceCode::OK); + } + + { + AlterClusterRequest req; + req.set_instance_id(mock_instance); + req.mutable_cluster()->set_cluster_id(mock_cluster_id); + auto [status_code, resp] = ctx.forward("v1/drop_cluster", req); + ASSERT_EQ(status_code, 200); + ASSERT_EQ(resp.code(), MetaServiceCode::OK); + } +} + TEST(MetaServiceHttpTest, AlterClusterTest) { config::enable_cluster_name_check = true; @@ -753,7 +867,7 @@ TEST(MetaServiceHttpTest, AlterClusterTest) { auto [status_code, resp] = ctx.forward("add_cluster", req); ASSERT_EQ(status_code, 200); ASSERT_EQ(resp.code(), MetaServiceCode::OK); - ASSERT_EQ(resp.msg(), ""); + ASSERT_EQ(resp.msg(), "OK"); } // case: request has invalid argument @@ -1397,8 +1511,8 @@ TEST(MetaServiceHttpTest, GetTabletStatsTest) { TEST(MetaServiceHttpTest, ToUnknownUrlTest) { HttpContext ctx; auto [status_code, content] = ctx.query("unkown_resource_xxxxxx", ""); - ASSERT_EQ(status_code, 200); - ASSERT_EQ(content, "{\n \"code\": \"OK\",\n \"msg\": \"\"\n}\n"); + ASSERT_EQ(status_code, 404); + ASSERT_EQ(content, "http path not found or not allowed\n"); } TEST(MetaServiceHttpTest, UnknownFields) { @@ -1820,7 +1934,7 @@ TEST(MetaServiceHttpTest, UpdateConfig) { { auto [status_code, content] = ctx.query("update_config", ""); ASSERT_EQ(status_code, 400); - std::string msg = "query param `config` should not be empty"; + std::string msg = "query param `configs` should not be empty"; ASSERT_NE(content.find(msg), std::string::npos); } { @@ -2047,6 +2161,53 @@ TEST(MetaServiceHttpTest, UpdateConfig) { } } +TEST(MetaServiceHttpTest, ShowConfigEscapesJsonSpecialCharacters) { + HttpContext ctx; + + const std::string config_key = "idempotent_request_replay_exclusion"; + const std::string old_value = config::idempotent_request_replay_exclusion; + const std::string new_value = R"(Get"Tablet\StatsRequest)"; + DORIS_CLOUD_DEFER_COPY(config_key, old_value) { + auto [succ, cause] = config::set_config({{config_key, old_value}}, false, ""); + ASSERT_TRUE(succ) << cause; + }; + + { + auto [succ, cause] = config::set_config({{config_key, new_value}}, false, ""); + ASSERT_TRUE(succ) << cause; + } + + { + rapidjson::Document d; + rapidjson::ParseResult ps = d.Parse(config::show_config(config_key).c_str()); + ASSERT_TRUE(ps) << rapidjson::GetParseError_En(ps.Code()); + ASSERT_TRUE(d.IsArray()); + ASSERT_EQ(d.Size(), 1); + ASSERT_TRUE(d[0].IsArray()); + ASSERT_EQ(d[0].Size(), 4); + ASSERT_TRUE(d[0][2].IsString()); + ASSERT_EQ(d[0][2].GetString(), new_value); + } + + { + auto [status_code, body] = ctx.query("show_config", "conf_key=" + config_key); + ASSERT_EQ(status_code, 200); + + rapidjson::Document d; + rapidjson::ParseResult ps = d.Parse(body.c_str()); + ASSERT_TRUE(ps) << rapidjson::GetParseError_En(ps.Code()) << ", body: " << body; + ASSERT_TRUE(d.HasMember("code")); + ASSERT_STREQ(d["code"].GetString(), "OK"); + ASSERT_TRUE(d.HasMember("result")); + ASSERT_TRUE(d["result"].IsArray()); + ASSERT_EQ(d["result"].Size(), 1); + ASSERT_TRUE(d["result"][0].IsArray()); + ASSERT_EQ(d["result"][0].Size(), 4); + ASSERT_TRUE(d["result"][0][2].IsString()); + ASSERT_EQ(d["result"][0][2].GetString(), new_value); + } +} + TEST(HttpEncodeKeyTest, ProcessHttpSetValue) { auto txn_kv = std::make_shared(); std::unique_ptr txn; @@ -2687,7 +2848,7 @@ TEST(MetaServiceHttpTest, VirtualClusterTest) { node->set_edit_log_port(9990); node->set_node_type(NodeInfoPB::FE_MASTER); ret = ctx.forward("add_cluster", req_before_fe); - ASSERT_EQ(std::get<1>(ret).msg(), ""); + ASSERT_EQ(std::get<1>(ret).msg(), "OK"); ASSERT_EQ(std::get<0>(ret), 200); ASSERT_EQ(std::get<1>(ret).code(), MetaServiceCode::OK); @@ -2698,7 +2859,7 @@ TEST(MetaServiceHttpTest, VirtualClusterTest) { policy.add_standby_cluster_names(mock_exist_cluster_name1); ret = add_cluster(ctx, mock_vcg_name1, mock_vcg_id1, ClusterPB::VIRTUAL, {mock_exist_cluster_name1, mock_exist_cluster_name2}, &policy); - ASSERT_EQ(std::get<1>(ret).msg(), ""); + ASSERT_EQ(std::get<1>(ret).msg(), "OK"); ASSERT_EQ(std::get<0>(ret), 200); ASSERT_EQ(std::get<1>(ret).code(), MetaServiceCode::OK);