diff --git a/be/src/scheduling/cluster-membership-mgr-test.cc b/be/src/scheduling/cluster-membership-mgr-test.cc index f67d508d38..243842ea1e 100644 --- a/be/src/scheduling/cluster-membership-mgr-test.cc +++ b/be/src/scheduling/cluster-membership-mgr-test.cc @@ -707,6 +707,13 @@ TEST(ClusterMembershipMgrUnitTest, TestPopulateExpectedExecGroupSets) { EXPECT_FALSE(status.ok()); EXPECT_EQ(status.msg().GetFullMessageDetails(), "Executor group set prefix specified multiple times: group-prefix1:10\n"); + + // Case 10: Star in FLAGS_expected_executor_group_sets + FLAGS_expected_executor_group_sets = "*"; + expected_exec_group_sets.clear(); + status = ClusterMembershipMgr::PopulateExpectedExecGroupSets(expected_exec_group_sets); + EXPECT_TRUE(status.ok()); + EXPECT_EQ(expected_exec_group_sets.size(), 0); } /// This ensures that all executor group configuration scenarios possible using available @@ -838,6 +845,30 @@ TEST(ClusterMembershipMgrUnitTest, PopulateExecutorMembershipRequest) { EXPECT_EQ(update_req.exec_group_sets[1].exec_group_name_prefix, "bar"); snapshot_ptr->executor_groups.clear(); } + + // Case 3: Using executor groups, expected_exec_group_sets is * + { + FLAGS_expected_executor_group_sets = "*"; + + ExecutorGroup exec_group("foo-group1", 1); + exec_group.AddExecutor(MakeBackendDescriptor(1, exec_group, 0)); + snapshot_ptr->executor_groups.insert({exec_group.name(), exec_group}); + ExecutorGroup exec_group2("bar-group1", 1); + exec_group2.AddExecutor(MakeBackendDescriptor(1, exec_group2, 1)); + exec_group2.AddExecutor(MakeBackendDescriptor(2, exec_group2, 2)); + snapshot_ptr->executor_groups.insert({exec_group2.name(), exec_group2}); + ClusterMembershipMgr::SnapshotPtr ptr = snapshot_ptr; + PopulateExecutorMembershipRequest(ptr, empty_exec_group_sets, update_req); + EXPECT_EQ(update_req.exec_group_sets.size(), 2); + // reverse order is ok + EXPECT_EQ(update_req.exec_group_sets[1].curr_num_executors, 1); + EXPECT_EQ(update_req.exec_group_sets[1].expected_num_executors, -1); + EXPECT_EQ(update_req.exec_group_sets[1].exec_group_name_prefix, "foo-group1"); + EXPECT_EQ(update_req.exec_group_sets[0].curr_num_executors, 2); + EXPECT_EQ(update_req.exec_group_sets[0].expected_num_executors, -1); + EXPECT_EQ(update_req.exec_group_sets[0].exec_group_name_prefix, "bar-group1"); + snapshot_ptr->executor_groups.clear(); + } } template diff --git a/be/src/scheduling/cluster-membership-mgr.cc b/be/src/scheduling/cluster-membership-mgr.cc index ce3c211db4..89266ce242 100644 --- a/be/src/scheduling/cluster-membership-mgr.cc +++ b/be/src/scheduling/cluster-membership-mgr.cc @@ -737,6 +737,17 @@ void PopulateExecutorMembershipRequest(const ClusterMembershipMgr::SnapshotPtr& } } } else { + if (FLAGS_expected_executor_group_sets == "*") { + LOG(INFO) << + "Special case handling for FLAGS_expected_executor_group_sets == \"*\""; + for (const auto& it : snapshot->executor_groups) { + // order does not matter + exec_group_sets.emplace_back(); + exec_group_sets.back().__set_exec_group_name_prefix(it.first); + // We set expected_num_executors to -1 to identify automation from Frontend + exec_group_sets.back().__set_expected_num_executors(-1); + } + } else if (expected_exec_group_sets.empty()) { // Add a default exec group set if no expected group sets were specified. exec_group_sets.emplace_back(); @@ -745,6 +756,7 @@ void PopulateExecutorMembershipRequest(const ClusterMembershipMgr::SnapshotPtr& exec_group_sets.insert(exec_group_sets.begin(), expected_exec_group_sets.begin(), expected_exec_group_sets.end()); } + int matching_exec_groups_found = 0; for (auto& set : exec_group_sets) { int max_num_executors = -1; @@ -788,6 +800,11 @@ Status ClusterMembershipMgr::PopulateExpectedExecGroupSets( expected_exec_group_sets.clear(); std::unordered_set parsed_group_prefixes; vector groups; + + if (FLAGS_expected_executor_group_sets == "*") { + return Status::OK(); + } + groups = strings::Split(FLAGS_expected_executor_group_sets, ",", strings::SkipEmpty()); if (groups.empty()) return Status::OK(); diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index 15d43ebe5a..399301c8c4 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -311,7 +311,8 @@ DEFINE_string(expected_executor_group_sets, "", "prefix1-group1, prefix1-group2, etc. The expected group size (number of executors " "in each group) is used during planning when no healthy executor group is available. " "If this flag is used then any executor groups that do not map to the specified group" - " sets will never be used to schedule queries."); + " sets will never be used to schedule queries. If this flag set to “*“ will populate " + "all healthy resource groups."); // TODO: can we automatically choose a startup grace period based on the max admission // control queue timeout + some margin for error? diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index e677580d3f..16a581ddc9 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -2113,12 +2113,21 @@ public static List setupThresholdsForExecutorGroupSets( // Executor groups exist in the cluster. Identify those that can be used. for (TExecutorGroupSet e : executorGroupSets) { - // If defined, request_pool can be a suffix of the group name prefix. For example - // group_set_prefix = root.queue1 - // request_pool = queue1 - if (StringUtils.isNotEmpty(request_pool) - && !e.getExec_group_name_prefix().endsWith(request_pool)) { - continue; + if (StringUtils.isNotEmpty(request_pool)) { + // If defined, request_pool can be a suffix of the group name prefix. For example + // group_set_prefix = root.queue1 + // request_pool = queue1 + if (!e.getExec_group_name_prefix().endsWith(request_pool) + && e.getExpected_num_executors() >= 0) { + continue; + } + // in case of automation (we set expected_num_executors == -1 in that case) we will have + // group_set_prefix = queue1-1-2-3 + // request_pool = queue1 + if (!e.getExec_group_name_prefix().startsWith(request_pool) + && e.getExpected_num_executors() < 0) { + continue; + } } TExecutorGroupSet new_entry = new TExecutorGroupSet(e); if (poolService != null) {