Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions be/src/information_schema/schema_cluster_snapshots_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ std::vector<SchemaScanner::ColumnDesc> SchemaClusterSnapshotsScanner::_s_tbls_co
{"LABEL", TYPE_STRING, sizeof(StringRef), true},
{"MSG", TYPE_STRING, sizeof(StringRef), true},
{"COUNT", TYPE_INT, sizeof(int32_t), true},
{"VAULT_ID", TYPE_STRING, sizeof(StringRef), true},
};

SchemaClusterSnapshotsScanner::SchemaClusterSnapshotsScanner()
Expand Down Expand Up @@ -247,6 +248,19 @@ Status SchemaClusterSnapshotsScanner::_fill_block_impl(Block* block) {
}
RETURN_IF_ERROR(fill_dest_column_for_range(block, 11, datas));
}
// resource_id
{
for (int i = 0; i < row_num; ++i) {
auto& snapshot = _snapshots[i];
if (snapshot.has_resource_id()) {
strs[i] = StringRef(snapshot.resource_id().c_str(), snapshot.resource_id().size());
datas[i] = strs.data() + i;
} else {
datas[i] = nullptr;
}
}
RETURN_IF_ERROR(fill_dest_column_for_range(block, 12, datas));
}
return Status::OK();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ TEST_F(SchemaClusterSnapshotsScannerTest, test_get_next_block_internal) {
snapshot.set_ttl_seconds(3600);
snapshot.set_snapshot_label("label");
snapshot.set_reason("reason");
snapshot.set_resource_id("vault_1");
snapshots.push_back(snapshot);
}

Expand All @@ -62,6 +63,10 @@ TEST_F(SchemaClusterSnapshotsScannerTest, test_get_next_block_internal) {
auto col = data_block->safe_get_by_position(0);
auto v = (*col.column)[1].get<TYPE_STRING>();
EXPECT_EQ(v, "232ds");

auto vault_col = data_block->safe_get_by_position(12);
auto vault_id = (*vault_col.column)[1].get<TYPE_STRING>();
EXPECT_EQ(vault_id, "vault_1");
}

} // namespace doris
14 changes: 14 additions & 0 deletions docker/runtime/doris-compose/Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,20 @@ python docker/runtime/doris-compose/doris-compose.py up <cluster-name> <image
[--fe-id <fd-id> --be-id <be-id>]
...
[ --cloud ]
[ --cluster-snapshot <cluster-snapshot-json> ]
```

if it's a new cluster, must specific the image.

add fe/be nodes with the specific image, or update existing nodes with `--fe-id`, `--be-id`

The `--cluster-snapshot` parameter allows you to provide a cluster snapshot JSON content for FE-1 first startup in cloud mode only. The JSON will be written to FE conf/cluster_snapshot.json and passed to start_fe.sh with --cluster_snapshot parameter. This is only effective on first startup.

Example:
```shell
python docker/runtime/doris-compose/doris-compose.py up my-cluster my-image --cloud --cluster-snapshot '{"instance_id":"instance_id_xxx"}'
```

For create a cloud cluster, steps are as below:

1. Write cloud s3 store config file, its default path is '/tmp/doris/cloud.ini'.
Expand All @@ -142,6 +150,12 @@ The simplest way to create a cloud cluster:
python docker/runtime/doris-compose/doris-compose.py up <cluster-name> <image> --cloud
```

To create a cloud cluster with a custom cluster snapshot:

```shell
python docker/runtime/doris-compose/doris-compose.py up <cluster-name> <image> --cloud --cluster-snapshot '{"instance_id":"instance_id_xxx"}'
```

It will create 1 fdb, 1 meta service server, 1 recycler, 3 fe and 3 be.

### Remove node from the cluster
Expand Down
54 changes: 51 additions & 3 deletions docker/runtime/doris-compose/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,20 @@
LOG = utils.get_logger()


def is_true(value):
return str(value).strip().lower() in ("1", "true", "yes", "y", "on")


def get_env_value(envs, name):
for env in envs or []:
pos = env.find('=')
if pos == -1:
continue
if env[:pos] == name:
return env[pos + 1:]
return None


def get_cluster_path(cluster_name):
return os.path.join(LOCAL_DORIS_PATH, cluster_name)

Expand Down Expand Up @@ -397,6 +411,7 @@ def docker_env(self):
"STOP_GRACE": 1 if enable_coverage else 0,
"IS_CLOUD": 1 if self.cluster.is_cloud else 0,
"SQL_MODE_NODE_MGR": 1 if self.cluster.sql_mode_node_mgr else 0,
"ENABLE_STORAGE_VAULT": 1 if getattr(self.cluster, "enable_storage_vault", False) else 0,
"TDE_AK": self.get_tde_ak(),
"TDE_SK": self.get_tde_sk(),
}
Expand Down Expand Up @@ -528,6 +543,21 @@ def init(self):
"is_cloud_follower"] = self.cluster.is_cloud and self.cluster.fe_follower
super().init()

def init_conf(self):
# Call parent's init_conf first
super().init_conf()

# Write cluster_snapshot.json for FE-1 in cloud mode only
if self.id == 1 and self.cluster.is_cloud and self.cluster.cluster_snapshot:
conf_dir = os.path.join(self.get_path(), "conf")
snapshot_file = os.path.join(conf_dir, "cluster_snapshot.json")
try:
with open(snapshot_file, "w") as f:
f.write(self.cluster.cluster_snapshot)
LOG.info(f"Written cluster snapshot to {snapshot_file}")
except Exception as e:
LOG.warning(f"Failed to write cluster snapshot file: {e}")

def get_add_init_config(self):
cfg = super().get_add_init_config()
if self.cluster.fe_config:
Expand Down Expand Up @@ -578,6 +608,11 @@ def docker_env(self):
envs["CLOUD_UNIQUE_ID"] = self.cloud_unique_id()
if self.meta["is_cloud_follower"]:
envs["IS_FE_FOLLOWER"] = 1
# Add CLUSTER_SNAPSHOT_FILE env var for FE-1 if the file exists
if self.id == 1:
snapshot_file = os.path.join(self.get_path(), "conf", "cluster_snapshot.json")
if os.path.exists(snapshot_file):
envs["CLUSTER_SNAPSHOT_FILE"] = "./conf/cluster_snapshot.json"
envs["MY_QUERY_PORT"] = self.meta["ports"]["query_port"]
envs["MY_EDITLOG_PORT"] = self.meta["ports"]["edit_log_port"]
return envs
Expand Down Expand Up @@ -832,7 +867,9 @@ def __init__(self, name, subnet, image, is_cloud, is_root_user, fe_config,
be_config, ms_config, recycle_config, remote_master_fe,
local_network_ip, fe_follower, be_disks, be_cluster, reg_be,
extra_hosts, coverage_dir, cloud_store_config,
sql_mode_node_mgr, be_metaservice_endpoint, be_cluster_id, tde_ak, tde_sk):
sql_mode_node_mgr, be_metaservice_endpoint, be_cluster_id, tde_ak, tde_sk,
external_ms_cluster, instance_id, cluster_snapshot="",
enable_storage_vault=False):
self.name = name
self.subnet = subnet
self.image = image
Expand All @@ -851,6 +888,14 @@ def __init__(self, name, subnet, image, is_cloud, is_root_user, fe_config,
self.extra_hosts = extra_hosts
self.coverage_dir = coverage_dir
self.cloud_store_config = cloud_store_config
self.external_ms_cluster = external_ms_cluster
self.instance_id = instance_id
if not self.instance_id:
self.instance_id = f"instance_{name}" if self.external_ms_cluster else "default_instance_id"
# cluster_snapshot is not persisted to meta, only used during cluster creation
self.cluster_snapshot = cluster_snapshot
self.enable_storage_vault = is_true(enable_storage_vault)
self.is_rollback = False
self.groups = {
node_type: Group(node_type)
for node_type in Node.TYPE_ALL
Expand All @@ -869,7 +914,9 @@ def new(name, image, is_cloud, is_root_user, fe_config, be_config,
ms_config, recycle_config, remote_master_fe, local_network_ip,
fe_follower, be_disks, be_cluster, reg_be, extra_hosts,
coverage_dir, cloud_store_config, sql_mode_node_mgr,
be_metaservice_endpoint, be_cluster_id, tde_ak, tde_sk):
be_metaservice_endpoint, be_cluster_id, tde_ak, tde_sk,
external_ms_cluster, instance_id, cluster_snapshot="",
enable_storage_vault=False):
if not os.path.exists(LOCAL_DORIS_PATH):
os.makedirs(LOCAL_DORIS_PATH, exist_ok=True)
os.chmod(LOCAL_DORIS_PATH, 0o777)
Expand All @@ -884,7 +931,8 @@ def new(name, image, is_cloud, is_root_user, fe_config, be_config,
be_disks, be_cluster, reg_be, extra_hosts,
coverage_dir, cloud_store_config,
sql_mode_node_mgr, be_metaservice_endpoint,
be_cluster_id, tde_ak, tde_sk)
be_cluster_id, tde_ak, tde_sk, external_ms_cluster,
instance_id, cluster_snapshot, enable_storage_vault)
os.makedirs(cluster.get_path(), exist_ok=True)
os.makedirs(get_status_path(name), exist_ok=True)
cluster._save_meta()
Expand Down
38 changes: 37 additions & 1 deletion docker/runtime/doris-compose/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,35 @@ def add_parser(self, args_parsers):
"Only use when creating new cluster and specify --remote-master-fe."
)

parser.add_argument(
"--external-ms",
type=str,
help=
"Use external meta service cluster (specify cluster name). " \
"This cluster will not create its own MS/FDB/Recycler, but use the specified cluster's services. " \
"The external cluster must be a cloud cluster with MS/FDB already running. " \
"Example: --external-ms shared-meta. Only use when creating new cloud cluster."
)

parser.add_argument(
"--instance-id",
type=str,
help=
"Specify instance ID for cloud mode. If not specified, will auto-generate 'default_instance_id'. " \
"When using external MS with multiple clusters, each cluster should have a unique instance ID. " \
"Example: --instance-id prod_instance_1"
)

parser.add_argument(
"--cluster-snapshot",
type=str,
help=
"Cluster snapshot JSON content for FE-1 first startup in cloud mode only. " \
"The JSON will be written to FE conf/cluster_snapshot.json and passed to start_fe.sh " \
"with --cluster_snapshot parameter. Only effective on first startup. " \
"Example: --cluster-snapshot '{\"instance_id\":\"instance_id_xxx\"}'"
)

if self._support_boolean_action():
parser.add_argument(
"--be-metaservice-endpoint",
Expand Down Expand Up @@ -624,13 +653,20 @@ def run(self, args):
if args.cloud:
args.sql_mode_node_mgr = True

instance_id = getattr(args, 'instance_id', None)
cluster_snapshot = getattr(args, 'cluster_snapshot', '')
enable_storage_vault = CLUSTER.is_true(
CLUSTER.get_env_value(args.env, "ENABLE_STORAGE_VAULT"))

cluster = CLUSTER.Cluster.new(
args.NAME, args.IMAGE, args.cloud, args.root, args.fe_config,
args.be_config, args.ms_config, args.recycle_config,
args.remote_master_fe, args.local_network_ip, args.fe_follower,
args.be_disks, args.be_cluster, args.reg_be, args.extra_hosts,
args.coverage_dir, cloud_store_config, args.sql_mode_node_mgr,
args.be_metaservice_endpoint, args.be_cluster_id, args.tde_ak, args.tde_sk)
args.be_metaservice_endpoint, args.be_cluster_id, args.tde_ak, args.tde_sk,
external_ms_cluster, instance_id, cluster_snapshot,
enable_storage_vault)
LOG.info("Create new cluster {} succ, cluster path is {}".format(
args.NAME, cluster.get_path()))

Expand Down
96 changes: 96 additions & 0 deletions docker/runtime/doris-compose/resource/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,99 @@ wait_pid() {

health_log "wait end"
}

create_doris_instance() {
while true; do

lock_cluster

if [[ "${ENABLE_STORAGE_VAULT}" =~ ^([Tt][Rr][Uu][Ee]|[Yy][Ee][Ss]|[Yy]|[Oo][Nn]|1)$ ]]; then
output=$(curl -s "${META_SERVICE_ENDPOINT}/MetaService/http/create_instance?token=greedisgood9999" \
-d '{"instance_id":"'"${INSTANCE_ID}"'",
"name": "'"${INSTANCE_ID}"'",
"user_id": "'"${DORIS_CLOUD_USER}"'",
"vault": {
"obj_info": {
"ak": "'"${DORIS_CLOUD_AK}"'",
"sk": "'"${DORIS_CLOUD_SK}"'",
"bucket": "'"${DORIS_CLOUD_BUCKET}"'",
"endpoint": "'"${DORIS_CLOUD_ENDPOINT}"'",
"external_endpoint": "'"${DORIS_CLOUD_EXTERNAL_ENDPOINT}"'",
"prefix": "'"${DORIS_CLOUD_PREFIX}"'",
"region": "'"${DORIS_CLOUD_REGION}"'",
"provider": "'"${DORIS_CLOUD_PROVIDER}"'"
}}}')
else
output=$(curl -s "${META_SERVICE_ENDPOINT}/MetaService/http/create_instance?token=greedisgood9999" \
-d '{"instance_id":"'"${INSTANCE_ID}"'",
"name": "'"${INSTANCE_ID}"'",
"user_id": "'"${DORIS_CLOUD_USER}"'",
"obj_info": {
"ak": "'"${DORIS_CLOUD_AK}"'",
"sk": "'"${DORIS_CLOUD_SK}"'",
"bucket": "'"${DORIS_CLOUD_BUCKET}"'",
"endpoint": "'"${DORIS_CLOUD_ENDPOINT}"'",
"external_endpoint": "'"${DORIS_CLOUD_EXTERNAL_ENDPOINT}"'",
"prefix": "'"${DORIS_CLOUD_PREFIX}"'",
"region": "'"${DORIS_CLOUD_REGION}"'",
"provider": "'"${DORIS_CLOUD_PROVIDER}"'"
}}')
fi

unlock_cluster

health_log "create instance output: $output"
code=$(jq -r '.code' <<<$output)

if [ "$code" != "OK" ]; then
health_log "create instance failed"
sleep 1
continue
fi

health_log "create doris instance succ, output: $output"
touch $HAS_CREATE_INSTANCE_FILE
break
done
}

is_doris_instance_exists() {
output=$(curl -s "${META_SERVICE_ENDPOINT}/MetaService/http/get_instance?token=greedisgood9999&instance_id=${INSTANCE_ID}")

health_log "get instance output: $output"
code=$(jq -r '.code' <<<$output)

if [ "$code" != "OK" ]; then
health_log "get instance failed"
return 1
fi

return 0
}

# Like wait_create_instance, but query meta service directly.
wait_doris_instance_ready() {
ok=0
for ((i = 0; i < 30; i++)); do
is_doris_instance_exists
if [ $? -eq 0 ]; then
ok=1
break
fi

health_log "doris instance not exist yet."

sleep 1
done

if [ $ok -eq 0 ]; then
health_log "wait doris instance too long, exit"
exit 1
fi

if [ ! -f $HAS_CREATE_INSTANCE_FILE ]; then
touch $HAS_CREATE_INSTANCE_FILE
fi

health_log "check doris instance ok"
}
Loading
Loading