diff --git a/be/src/information_schema/schema_cluster_snapshots_scanner.cpp b/be/src/information_schema/schema_cluster_snapshots_scanner.cpp index ba92ff896f3901..19e1b3e10da9fb 100644 --- a/be/src/information_schema/schema_cluster_snapshots_scanner.cpp +++ b/be/src/information_schema/schema_cluster_snapshots_scanner.cpp @@ -47,6 +47,7 @@ std::vector SchemaClusterSnapshotsScanner::_s_tbls_co {"LABEL", TYPE_STRING, sizeof(StringRef), true}, {"MSG", TYPE_STRING, sizeof(StringRef), true}, {"COUNT", TYPE_INT, sizeof(int32_t), true}, + {"VAULT_ID", TYPE_STRING, sizeof(StringRef), true}, }; SchemaClusterSnapshotsScanner::SchemaClusterSnapshotsScanner() @@ -247,6 +248,19 @@ Status SchemaClusterSnapshotsScanner::_fill_block_impl(Block* block) { } RETURN_IF_ERROR(fill_dest_column_for_range(block, 11, datas)); } + // resource_id + { + for (int i = 0; i < row_num; ++i) { + auto& snapshot = _snapshots[i]; + if (snapshot.has_resource_id()) { + strs[i] = StringRef(snapshot.resource_id().c_str(), snapshot.resource_id().size()); + datas[i] = strs.data() + i; + } else { + datas[i] = nullptr; + } + } + RETURN_IF_ERROR(fill_dest_column_for_range(block, 12, datas)); + } return Status::OK(); } diff --git a/be/test/exec/schema_scanner/schema_cluster_snapshots_scanner_test.cpp b/be/test/exec/schema_scanner/schema_cluster_snapshots_scanner_test.cpp index 6415f837a7c2e0..f1392dcb831a3f 100644 --- a/be/test/exec/schema_scanner/schema_cluster_snapshots_scanner_test.cpp +++ b/be/test/exec/schema_scanner/schema_cluster_snapshots_scanner_test.cpp @@ -49,6 +49,7 @@ TEST_F(SchemaClusterSnapshotsScannerTest, test_get_next_block_internal) { snapshot.set_ttl_seconds(3600); snapshot.set_snapshot_label("label"); snapshot.set_reason("reason"); + snapshot.set_resource_id("vault_1"); snapshots.push_back(snapshot); } @@ -62,6 +63,10 @@ TEST_F(SchemaClusterSnapshotsScannerTest, test_get_next_block_internal) { auto col = data_block->safe_get_by_position(0); auto v = (*col.column)[1].get(); EXPECT_EQ(v, 
"232ds"); + + auto vault_col = data_block->safe_get_by_position(12); + auto vault_id = (*vault_col.column)[1].get(); + EXPECT_EQ(vault_id, "vault_1"); } } // namespace doris diff --git a/docker/runtime/doris-compose/Readme.md b/docker/runtime/doris-compose/Readme.md index c4b60460e49b8a..a92a1fff76480c 100644 --- a/docker/runtime/doris-compose/Readme.md +++ b/docker/runtime/doris-compose/Readme.md @@ -122,12 +122,20 @@ python docker/runtime/doris-compose/doris-compose.py up --be-id ] ... [ --cloud ] + [ --cluster-snapshot ] ``` if it's a new cluster, must specific the image. add fe/be nodes with the specific image, or update existing nodes with `--fe-id`, `--be-id` +The `--cluster-snapshot` parameter allows you to provide a cluster snapshot JSON content for FE-1 first startup in cloud mode only. The JSON will be written to FE conf/cluster_snapshot.json and passed to start_fe.sh with --cluster_snapshot parameter. This is only effective on first startup. + +Example: +```shell +python docker/runtime/doris-compose/doris-compose.py up my-cluster my-image --cloud --cluster-snapshot '{"instance_id":"instance_id_xxx"}' +``` + For create a cloud cluster, steps are as below: 1. Write cloud s3 store config file, its default path is '/tmp/doris/cloud.ini'. @@ -142,6 +150,12 @@ The simplest way to create a cloud cluster: python docker/runtime/doris-compose/doris-compose.py up --cloud ``` +To create a cloud cluster with a custom cluster snapshot: + +```shell +python docker/runtime/doris-compose/doris-compose.py up --cloud --cluster-snapshot '{"instance_id":"instance_id_xxx"}' +``` + It will create 1 fdb, 1 meta service server, 1 recycler, 3 fe and 3 be. 
### Remove node from the cluster diff --git a/docker/runtime/doris-compose/cluster.py b/docker/runtime/doris-compose/cluster.py index 05529e2bf999a7..4345021a662933 100644 --- a/docker/runtime/doris-compose/cluster.py +++ b/docker/runtime/doris-compose/cluster.py @@ -64,6 +64,20 @@ LOG = utils.get_logger() +def is_true(value): + return str(value).strip().lower() in ("1", "true", "yes", "y", "on") + + +def get_env_value(envs, name): + for env in envs or []: + pos = env.find('=') + if pos == -1: + continue + if env[:pos] == name: + return env[pos + 1:] + return None + + def get_cluster_path(cluster_name): return os.path.join(LOCAL_DORIS_PATH, cluster_name) @@ -397,6 +411,7 @@ def docker_env(self): "STOP_GRACE": 1 if enable_coverage else 0, "IS_CLOUD": 1 if self.cluster.is_cloud else 0, "SQL_MODE_NODE_MGR": 1 if self.cluster.sql_mode_node_mgr else 0, + "ENABLE_STORAGE_VAULT": 1 if getattr(self.cluster, "enable_storage_vault", False) else 0, "TDE_AK": self.get_tde_ak(), "TDE_SK": self.get_tde_sk(), } @@ -528,6 +543,21 @@ def init(self): "is_cloud_follower"] = self.cluster.is_cloud and self.cluster.fe_follower super().init() + def init_conf(self): + # Call parent's init_conf first + super().init_conf() + + # Write cluster_snapshot.json for FE-1 in cloud mode only + if self.id == 1 and self.cluster.is_cloud and self.cluster.cluster_snapshot: + conf_dir = os.path.join(self.get_path(), "conf") + snapshot_file = os.path.join(conf_dir, "cluster_snapshot.json") + try: + with open(snapshot_file, "w") as f: + f.write(self.cluster.cluster_snapshot) + LOG.info(f"Written cluster snapshot to {snapshot_file}") + except Exception as e: + LOG.warning(f"Failed to write cluster snapshot file: {e}") + def get_add_init_config(self): cfg = super().get_add_init_config() if self.cluster.fe_config: @@ -578,6 +608,11 @@ def docker_env(self): envs["CLOUD_UNIQUE_ID"] = self.cloud_unique_id() if self.meta["is_cloud_follower"]: envs["IS_FE_FOLLOWER"] = 1 + # Add CLUSTER_SNAPSHOT_FILE env var for 
FE-1 if the file exists + if self.id == 1: + snapshot_file = os.path.join(self.get_path(), "conf", "cluster_snapshot.json") + if os.path.exists(snapshot_file): + envs["CLUSTER_SNAPSHOT_FILE"] = "./conf/cluster_snapshot.json" envs["MY_QUERY_PORT"] = self.meta["ports"]["query_port"] envs["MY_EDITLOG_PORT"] = self.meta["ports"]["edit_log_port"] return envs @@ -832,7 +867,9 @@ def __init__(self, name, subnet, image, is_cloud, is_root_user, fe_config, be_config, ms_config, recycle_config, remote_master_fe, local_network_ip, fe_follower, be_disks, be_cluster, reg_be, extra_hosts, coverage_dir, cloud_store_config, - sql_mode_node_mgr, be_metaservice_endpoint, be_cluster_id, tde_ak, tde_sk): + sql_mode_node_mgr, be_metaservice_endpoint, be_cluster_id, tde_ak, tde_sk, + external_ms_cluster, instance_id, cluster_snapshot="", + enable_storage_vault=False): self.name = name self.subnet = subnet self.image = image @@ -851,6 +888,14 @@ def __init__(self, name, subnet, image, is_cloud, is_root_user, fe_config, self.extra_hosts = extra_hosts self.coverage_dir = coverage_dir self.cloud_store_config = cloud_store_config + self.external_ms_cluster = external_ms_cluster + self.instance_id = instance_id + if not self.instance_id: + self.instance_id = f"instance_{name}" if self.external_ms_cluster else "default_instance_id" + # cluster_snapshot is not persisted to meta, only used during cluster creation + self.cluster_snapshot = cluster_snapshot + self.enable_storage_vault = is_true(enable_storage_vault) + self.is_rollback = False self.groups = { node_type: Group(node_type) for node_type in Node.TYPE_ALL @@ -869,7 +914,9 @@ def new(name, image, is_cloud, is_root_user, fe_config, be_config, ms_config, recycle_config, remote_master_fe, local_network_ip, fe_follower, be_disks, be_cluster, reg_be, extra_hosts, coverage_dir, cloud_store_config, sql_mode_node_mgr, - be_metaservice_endpoint, be_cluster_id, tde_ak, tde_sk): + be_metaservice_endpoint, be_cluster_id, tde_ak, tde_sk, + 
external_ms_cluster, instance_id, cluster_snapshot="", + enable_storage_vault=False): if not os.path.exists(LOCAL_DORIS_PATH): os.makedirs(LOCAL_DORIS_PATH, exist_ok=True) os.chmod(LOCAL_DORIS_PATH, 0o777) @@ -884,7 +931,8 @@ def new(name, image, is_cloud, is_root_user, fe_config, be_config, be_disks, be_cluster, reg_be, extra_hosts, coverage_dir, cloud_store_config, sql_mode_node_mgr, be_metaservice_endpoint, - be_cluster_id, tde_ak, tde_sk) + be_cluster_id, tde_ak, tde_sk, external_ms_cluster, + instance_id, cluster_snapshot, enable_storage_vault) os.makedirs(cluster.get_path(), exist_ok=True) os.makedirs(get_status_path(name), exist_ok=True) cluster._save_meta() diff --git a/docker/runtime/doris-compose/command.py b/docker/runtime/doris-compose/command.py index 6acbe4795321f4..c1d3f8b3d56551 100644 --- a/docker/runtime/doris-compose/command.py +++ b/docker/runtime/doris-compose/command.py @@ -470,6 +470,35 @@ def add_parser(self, args_parsers): "Only use when creating new cluster and specify --remote-master-fe." ) + parser.add_argument( + "--external-ms", + type=str, + help= + "Use external meta service cluster (specify cluster name). " \ + "This cluster will not create its own MS/FDB/Recycler, but use the specified cluster's services. " \ + "The external cluster must be a cloud cluster with MS/FDB already running. " \ + "Example: --external-ms shared-meta. Only use when creating new cloud cluster." + ) + + parser.add_argument( + "--instance-id", + type=str, + help= + "Specify instance ID for cloud mode. If not specified, defaults to 'default_instance_id', or to 'instance_' plus the cluster name when --external-ms is set. " \ + "When using external MS with multiple clusters, each cluster should have a unique instance ID. " \ + "Example: --instance-id prod_instance_1" + ) + + parser.add_argument( + "--cluster-snapshot", + type=str, + help= + "Cluster snapshot JSON content for FE-1 first startup in cloud mode only.
" \ + "The JSON will be written to FE conf/cluster_snapshot.json and passed to start_fe.sh " \ + "with --cluster_snapshot parameter. Only effective on first startup. " \ + "Example: --cluster-snapshot '{\"instance_id\":\"instance_id_xxx\"}'" + ) + if self._support_boolean_action(): parser.add_argument( "--be-metaservice-endpoint", @@ -624,13 +653,22 @@ def run(self, args): if args.cloud: args.sql_mode_node_mgr = True + # --external-ms is registered in add_parser (dest: external_ms); resolve it here so the name passed to Cluster.new below is defined. + external_ms_cluster = getattr(args, 'external_ms', None) + instance_id = getattr(args, 'instance_id', None) + cluster_snapshot = getattr(args, 'cluster_snapshot', '') + enable_storage_vault = CLUSTER.is_true( + CLUSTER.get_env_value(args.env, "ENABLE_STORAGE_VAULT")) + cluster = CLUSTER.Cluster.new( args.NAME, args.IMAGE, args.cloud, args.root, args.fe_config, args.be_config, args.ms_config, args.recycle_config, args.remote_master_fe, args.local_network_ip, args.fe_follower, args.be_disks, args.be_cluster, args.reg_be, args.extra_hosts, args.coverage_dir, cloud_store_config, args.sql_mode_node_mgr, - args.be_metaservice_endpoint, args.be_cluster_id, args.tde_ak, args.tde_sk) + args.be_metaservice_endpoint, args.be_cluster_id, args.tde_ak, args.tde_sk, + external_ms_cluster, instance_id, cluster_snapshot, + enable_storage_vault) LOG.info("Create new cluster {} succ, cluster path is {}".format( args.NAME, cluster.get_path())) diff --git a/docker/runtime/doris-compose/resource/common.sh b/docker/runtime/doris-compose/resource/common.sh index 2c53ca587a5019..cc1d43eb806569 100644 --- a/docker/runtime/doris-compose/resource/common.sh +++ b/docker/runtime/doris-compose/resource/common.sh @@ -147,3 +147,99 @@ wait_pid() { health_log "wait end" } + +create_doris_instance() { + while true; do + + lock_cluster + + if [[ "${ENABLE_STORAGE_VAULT}" =~ ^([Tt][Rr][Uu][Ee]|[Yy][Ee][Ss]|[Yy]|[Oo][Nn]|1)$ ]]; then + output=$(curl -s "${META_SERVICE_ENDPOINT}/MetaService/http/create_instance?token=greedisgood9999" \ + -d '{"instance_id":"'"${INSTANCE_ID}"'", + "name": "'"${INSTANCE_ID}"'", + "user_id": 
"'"${DORIS_CLOUD_USER}"'", + "vault": { + "obj_info": { + "ak": "'"${DORIS_CLOUD_AK}"'", + "sk": "'"${DORIS_CLOUD_SK}"'", + "bucket": "'"${DORIS_CLOUD_BUCKET}"'", + "endpoint": "'"${DORIS_CLOUD_ENDPOINT}"'", + "external_endpoint": "'"${DORIS_CLOUD_EXTERNAL_ENDPOINT}"'", + "prefix": "'"${DORIS_CLOUD_PREFIX}"'", + "region": "'"${DORIS_CLOUD_REGION}"'", + "provider": "'"${DORIS_CLOUD_PROVIDER}"'" + }}}') + else + output=$(curl -s "${META_SERVICE_ENDPOINT}/MetaService/http/create_instance?token=greedisgood9999" \ + -d '{"instance_id":"'"${INSTANCE_ID}"'", + "name": "'"${INSTANCE_ID}"'", + "user_id": "'"${DORIS_CLOUD_USER}"'", + "obj_info": { + "ak": "'"${DORIS_CLOUD_AK}"'", + "sk": "'"${DORIS_CLOUD_SK}"'", + "bucket": "'"${DORIS_CLOUD_BUCKET}"'", + "endpoint": "'"${DORIS_CLOUD_ENDPOINT}"'", + "external_endpoint": "'"${DORIS_CLOUD_EXTERNAL_ENDPOINT}"'", + "prefix": "'"${DORIS_CLOUD_PREFIX}"'", + "region": "'"${DORIS_CLOUD_REGION}"'", + "provider": "'"${DORIS_CLOUD_PROVIDER}"'" + }}') + fi + + unlock_cluster + + health_log "create instance output: $output" + code=$(jq -r '.code' <<<$output) + + if [ "$code" != "OK" ]; then + health_log "create instance failed" + sleep 1 + continue + fi + + health_log "create doris instance succ, output: $output" + touch $HAS_CREATE_INSTANCE_FILE + break + done +} + +is_doris_instance_exists() { + output=$(curl -s "${META_SERVICE_ENDPOINT}/MetaService/http/get_instance?token=greedisgood9999&instance_id=${INSTANCE_ID}") + + health_log "get instance output: $output" + code=$(jq -r '.code' <<<$output) + + if [ "$code" != "OK" ]; then + health_log "get instance failed" + return 1 + fi + + return 0 +} + +# Like wait_create_instance, but query meta service directly. +wait_doris_instance_ready() { + ok=0 + for ((i = 0; i < 30; i++)); do + is_doris_instance_exists + if [ $? -eq 0 ]; then + ok=1 + break + fi + + health_log "doris instance not exist yet." 
+ + sleep 1 + done + + if [ $ok -eq 0 ]; then + health_log "wait doris instance too long, exit" + exit 1 + fi + + if [ ! -f $HAS_CREATE_INSTANCE_FILE ]; then + touch $HAS_CREATE_INSTANCE_FILE + fi + + health_log "check doris instance ok" +} diff --git a/docker/runtime/doris-compose/resource/init_fe.sh b/docker/runtime/doris-compose/resource/init_fe.sh index 051711c80043fc..ad93b65f343cc7 100755 --- a/docker/runtime/doris-compose/resource/init_fe.sh +++ b/docker/runtime/doris-compose/resource/init_fe.sh @@ -93,7 +93,15 @@ run_fe() { export DORIS_TDE_AK=${TDE_AK} export DORIS_TDE_SK=${TDE_SK} health_log "run start_fe.sh" - bash $DORIS_HOME/bin/start_fe.sh --daemon $@ | tee -a $DORIS_HOME/log/fe.out + + # Add cluster_snapshot parameter for first startup only (when REGISTER_FILE does not exist) + EXTRA_ARGS="" + if [ -n "$CLUSTER_SNAPSHOT_FILE" ] && [ ! -f "$REGISTER_FILE" ]; then + EXTRA_ARGS="--cluster_snapshot $CLUSTER_SNAPSHOT_FILE" + health_log "Using cluster snapshot: $CLUSTER_SNAPSHOT_FILE" + fi + + bash $DORIS_HOME/bin/start_fe.sh --daemon $EXTRA_ARGS $@ | tee -a $DORIS_HOME/log/fe.out } start_cloud_fe() { @@ -142,6 +150,9 @@ start_cloud_fe() { wait_create_instance +} + +register_sql_server_cluster() { action=add_cluster node_type=FE_MASTER if [ "$MY_ID" != "1" ]; then @@ -197,6 +208,48 @@ start_cloud_fe() { fi touch $REGISTER_FILE +} + +start_cloud_fe() { + if [ -f "$REGISTER_FILE" ] || [ -n "${CLUSTER_SNAPSHOT_FILE}" ]; then + fe_daemon & + run_fe + + # Cluster snapshot is provided and this FE has not registered yet (REGISTER_FILE is absent, the same first-startup test run_fe uses): register the cluster after FE is started. Restarts skip this so the cluster is not added again. + if [ -n "${CLUSTER_SNAPSHOT_FILE}" ] && [ ! -f "$REGISTER_FILE" ]; then + wait_doris_instance_ready + register_sql_server_cluster + fi + + return + fi + + # Check if SQL_MODE_NODE_MGR is set to 1 + if [ "${SQL_MODE_NODE_MGR}" = "1" ]; then + health_log "SQL_MODE_NODE_MGR is set to 1. Skipping add FE." + + touch $REGISTER_FILE + + fe_daemon & + run_fe + + return + fi + + # Support to create instance in FE startup. 
+ AUTO_CREATE_INSTANCE=${AUTO_CREATE_INSTANCE:-"0"} + if [ "a$MY_ID" == "a1" ] && [ "a$AUTO_CREATE_INSTANCE" == "a1" ]; then + health_log "auto create instance is enabled, trying to create instance" + if [ -f $HAS_CREATE_INSTANCE_FILE ]; then + health_log "instance has been created before, skip create instance" + else + create_doris_instance + fi + else + wait_create_instance + fi + + register_sql_server_cluster fe_daemon & run_fe diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java index a54c707e4e428b..ca1b2d3ad82a10 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java @@ -815,6 +815,7 @@ public class SchemaTable extends Table { .column("LABEL", ScalarType.createStringType()) .column("MSG", ScalarType.createStringType()) .column("COUNT", ScalarType.createType(PrimitiveType.INT)) + .column("VAULT_ID", ScalarType.createStringType()) .build())) .put("cluster_snapshot_properties", new SchemaTable(SystemIdGenerator.getNextId(), "cluster_snapshot_properties", TableType.SCHEMA, diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/snapshot/CloudSnapshotHandler.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/snapshot/CloudSnapshotHandler.java index dad391cc948fa5..0b6dc996df5f6a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/snapshot/CloudSnapshotHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/snapshot/CloudSnapshotHandler.java @@ -62,7 +62,7 @@ protected void runAfterCatalogReady() { // do nothing } - public void submitJob(long ttl, String label) throws Exception { + public void submitJob(long ttl, String label, String vaultName) throws Exception { throw new NotImplementedException("submitJob is not implemented"); } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AdminCreateClusterSnapshotCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AdminCreateClusterSnapshotCommand.java index 425b6bb21765d2..c7b9cff0cd0e05 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AdminCreateClusterSnapshotCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AdminCreateClusterSnapshotCommand.java @@ -45,11 +45,13 @@ public class AdminCreateClusterSnapshotCommand extends Command implements Forwar public static final String PROP_TTL = "ttl"; public static final String PROP_LABEL = "label"; + public static final String PROP_VAULT_NAME = "vault_name"; private static final Logger LOG = LogManager.getLogger(AdminCreateClusterSnapshotCommand.class); private Map properties; private long ttl; private String label = null; + private String vaultName = null; /** * AdminCreateClusterSnapshotCommand @@ -64,7 +66,7 @@ public AdminCreateClusterSnapshotCommand(Map properties) { public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { validate(ctx); CloudSnapshotHandler cloudSnapshotHandler = ((CloudEnv) ctx.getEnv()).getCloudSnapshotHandler(); - cloudSnapshotHandler.submitJob(ttl, label); + cloudSnapshotHandler.submitJob(ttl, label, vaultName); } /** @@ -106,6 +108,11 @@ public void validate(ConnectContext ctx) throws AnalysisException { if (label == null || label.isEmpty()) { throw new AnalysisException("Property 'label' cannot be empty"); } + } else if (entry.getKey().equalsIgnoreCase(PROP_VAULT_NAME)) { + vaultName = entry.getValue(); + if (vaultName == null || vaultName.isEmpty()) { + throw new AnalysisException("Property 'vault_name' cannot be empty"); + } } else { throw new AnalysisException("Unknown property: " + entry.getKey()); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AdminCreateClusterSnapshotCommandTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AdminCreateClusterSnapshotCommandTest.java index c0ecf74a7632c3..8d4a4dd8105a5e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AdminCreateClusterSnapshotCommandTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AdminCreateClusterSnapshotCommandTest.java @@ -100,10 +100,13 @@ public void testValidateNormal() throws Exception { properties.add(Pair.of(ImmutableMap.of("ttl", "a", "label", "a"), "Invalid value")); properties.add(Pair.of(ImmutableMap.of("ttl", "0", "label", "a"), "Property 'ttl' must be positive")); properties.add(Pair.of(ImmutableMap.of("ttl", "3600", "label", ""), "Property 'label' cannot be empty")); + properties.add(Pair.of(ImmutableMap.of("ttl", "3600", "label", "a", "vault_name", ""), + "Property 'vault_name' cannot be empty")); // unknown property properties.add(Pair.of(ImmutableMap.of("ttl", "0", "a", "b"), "Unknown property")); // normal case properties.add(Pair.of(ImmutableMap.of("ttl", "3600", "label", "abc"), "")); + properties.add(Pair.of(ImmutableMap.of("ttl", "3600", "label", "abc", "vault_name", "vault_1"), "")); for (Pair, String> entry : properties) { AdminCreateClusterSnapshotCommand command0 = new AdminCreateClusterSnapshotCommand(entry.first); diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index 710a0b0dd796e0..e4cea4d2c2ffcc 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -2105,6 +2105,7 @@ message BeginSnapshotRequest { optional int64 timeout_seconds = 4; optional int64 ttl_seconds = 5; optional string request_ip = 6; + optional string vault_name = 7; } message BeginSnapshotResponse { @@ -2173,6 +2174,7 @@ message SnapshotInfoPB { optional int64 snapshot_logical_data_size = 17; optional int64 snapshot_retained_data_size = 18; optional int64 snapshot_billable_data_size = 19; + optional string resource_id = 20; } message ListSnapshotRequest { diff 
--git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index 01ed92f3e70115..3c14e06eeb2d79 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -384,6 +384,156 @@ class Suite implements GroovyInterceptable { } } + /** + * Create and manage multiple Docker clusters for multi-cluster test scenarios. + * + * Usage example: + * dockers([ + * "cluster_1": new ClusterOptions(cloudMode: true, feNum: 1, beNum: 1, msNum: 1), + * "cluster_2": new ClusterOptions(cloudMode: true, feNum: 1, beNum: 1, msNum: 0, externalMsCluster: "cluster_1") + * ]) { clusters -> + * connectWithDockerCluster(clusters.cluster_1) { sql "..." } + * connectWithDockerCluster(clusters.cluster_2) { sql "..." } + * } + * + * Important: + * - Must use LinkedHashMap to preserve insertion order + * - Clusters are created in map insertion order + * - Clusters are destroyed in reverse order (dependent clusters first) + * - If using externalMsCluster, the referenced cluster must appear earlier in the map + * + * @param clusterConfigs LinkedHashMap of cluster name to ClusterOptions + * @param manual_init_clusters Set of cluster names to skip automatic initialization + * @param actionSupplier Closure receiving Map for test execution + */ + void dockers(LinkedHashMap clusterConfigs, + Set manual_init_clusters = new HashSet<>(), Closure actionSupplier) throws Exception { + if (context.config.excludeDockerTest) { + logger.info("do not run the docker suite {}, because regression config excludeDockerTest=true", name) + return + } + + if (RegressionTest.getGroupExecType(group) != RegressionTest.GroupExecType.DOCKER) { + throw new Exception("Need to add 'docker' to docker suite's belong groups, " + + "see example 
demo_p0/docker_action.groovy") + } + + if (context.isMultiDockerClusterRunning) { + throw new Exception("Nested dockers() calls are not supported") + } + + // Validate cluster configs + Set clusterNames = new HashSet<>() + for (def entry : clusterConfigs.entrySet()) { + String clusterName = entry.key + ClusterOptions options = entry.value + + if (clusterNames.contains(clusterName)) { + throw new Exception("Duplicate cluster name: ${clusterName}") + } + clusterNames.add(clusterName) + + // Validate externalMsCluster reference + if (options.externalMsCluster != null && !options.externalMsCluster.isEmpty()) { + if (!clusterNames.contains(options.externalMsCluster)) { + throw new Exception("Cluster ${clusterName} references non-existent external MS cluster: ${options.externalMsCluster}") + } + if (options.msNum > 0) { + throw new Exception("Cluster ${clusterName} cannot have its own MS when using external MS cluster") + } + } + } + + List clusterNamesReversed = new ArrayList<>(clusterConfigs.keySet()) + Collections.reverse(clusterNamesReversed) + + // Use LinkedHashMap to preserve order + Map clusters = new LinkedHashMap<>() + + try { + // Create and initialize clusters in order + for (def entry : clusterConfigs.entrySet()) { + String clusterName = entry.key + ClusterOptions options = entry.value + + logger.info("Creating cluster: ${clusterName}") + SuiteCluster cluster = new SuiteCluster(clusterName, context.config) + + clusters.put(clusterName, cluster) + } + + for (String clusterName : clusterNamesReversed) { + clusters.get(clusterName).destroy(true) + } + + for (def entry : clusterConfigs.entrySet()) { + String clusterName = entry.key + ClusterOptions options = entry.value + SuiteCluster cluster = clusters.get(clusterName) + + if (manual_init_clusters.contains(clusterName)) { + logger.info("Skipping initialization of cluster: ${clusterName}") + continue + } + + // Determine cloud mode + boolean isCloud = false + if (options.cloudMode == null) { + // If not 
specified, use config default or run both modes + if (context.config.runMode == RunMode.CLOUD) { + isCloud = true + } else if (context.config.runMode == RunMode.NOT_CLOUD) { + isCloud = false + } else { + throw new Exception("cloudMode must be specified when runMode is UNKNOWN for multi-cluster setup") + } + } else { + if (options.cloudMode == true && context.config.runMode == RunMode.NOT_CLOUD) { + logger.info("Skip cluster ${clusterName} because cloudMode=true but regression test is in local mode") + continue + } + if (options.cloudMode == false && context.config.runMode == RunMode.CLOUD) { + logger.info("Skip cluster ${clusterName} because cloudMode=false but regression test is in cloud mode") + continue + } + isCloud = options.cloudMode + } + logger.info("Initializing cluster ${cluster.name} in ${isCloud ? 'cloud' : 'not_cloud'} mode") + cluster.init(options, isCloud) + logger.info("Cluster ${clusterName} initialized successfully") + } + + // Wait for BE to report + Thread.sleep(5000) + + Connection originConnection = context.threadLocalConn.get() + context.threadLocalConn.remove() + context.isMultiDockerClusterRunning = true + try { + actionSupplier.call(clusters) + } finally { + context.isMultiDockerClusterRunning = false + if (originConnection == null) { + context.threadLocalConn.remove() + } else { + context.threadLocalConn.set(originConnection) + } + } + } finally { + // Destroy clusters in reverse order + if (!context.config.dockerEndNoKill) { + for (String clusterName : clusterNamesReversed) { + try { + logger.info("Destroying cluster: ${clusterName}") + clusters.get(clusterName).destroy(context.config.dockerEndDeleteFiles) + } catch (Throwable t) { + logger.warn("Failed to destroy cluster ${clusterName}", t) + } + } + } + } + } + String get_ccr_body(String table, String db = null) { if (db == null) { db = context.dbName diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy index dd1c2b8f2fddbe..83c6f0cb60aa90 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy @@ -98,6 +98,26 @@ class ClusterOptions { String tdeAk = ""; String tdeSk = ""; + // Use external meta service cluster (shared MS/FDB) + // Specify the cluster name that provides MS/FDB services + // When set, this cluster will not create its own MS/FDB/Recycler + // Example: externalMsCluster = "shared-meta" (Cloud mode only) + String externalMsCluster = null + + // Specify the instance id. + // When not set, "default_instance_id" will be used. (Cloud mode only) + String instanceId = null; + + // Cluster snapshot JSON content for FE-1 first startup in cloud mode only. + // The JSON will be written to FE conf/cluster_snapshot.json and passed to start_fe.sh + // with --cluster_snapshot parameter. Only effective on first startup. + // Example: clusterSnapshot = '{"cloud_unique_id":"1:instance_id:xxx"}' + String clusterSnapshot = null; + + // Create cloud instance in storage-vault mode instead of legacy obj_info mode. + // Docker framework will also create a default storage vault automatically for new clusters. 
+ Boolean enableStorageVault = false; + void enableDebugPoints() { feConfigs.add('enable_debug_points=true') beConfigs.add('enable_debug_points=true') @@ -356,6 +376,14 @@ class SuiteCluster { cmd += ['--extra-hosts'] cmd += options.extraHosts } + def envs = new ArrayList(options.environments) + if (options.enableStorageVault) { + envs.add('ENABLE_STORAGE_VAULT=1') + } + if (!envs.isEmpty()) { + cmd += ['--env'] + cmd += envs + } if (!options.cloudStoreConfigs.isEmpty()) { cmd += ['--cloud-config'] cmd += options.cloudStoreConfigs @@ -391,6 +419,23 @@ class SuiteCluster { cmd += options.tdeSk } + if (options.externalMsCluster != null && options.externalMsCluster != "") { + cmd += ['--external-ms', options.externalMsCluster] + } + + if (options.instanceId != null && options.instanceId != "") { + cmd += ['--instance-id', options.instanceId] + } + + if (options.clusterSnapshot != null && options.clusterSnapshot != "") { + // Remove newlines and extra whitespace to make it a compact JSON string + def compactJson = options.clusterSnapshot + .replaceAll(/\s+/, ' ') // Replace all whitespace sequences with single space + .trim() // Remove leading/trailing spaces + // No need to escape when using list-based execution + cmd += ['--cluster-snapshot', compactJson] + } + cmd += ['--wait-timeout', String.valueOf(options.waitTimeout)] sqlModeNodeMgr = options.sqlModeNodeMgr