diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBHintIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBHintIT.java new file mode 100644 index 0000000000000..b5c83fc16c630 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBHintIT.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.relational.it.query.recent; + +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TableClusterIT; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; +import org.apache.iotdb.itbase.env.BaseEnv; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Locale; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({TableLocalStandaloneIT.class, TableClusterIT.class}) +public class IoTDBHintIT { + private static final String DATABASE_NAME = "testdb"; + + private static final String[] creationSqls = + new String[] { + "CREATE DATABASE IF NOT EXISTS testdb", + "USE testdb", + "CREATE TABLE IF NOT EXISTS testtb(voltage FLOAT FIELD, manufacturer STRING FIELD, deviceid STRING TAG)", + "INSERT INTO testtb VALUES(1000, 100.0, 'a', 'd1')", + "INSERT INTO testtb VALUES(2000, 200.0, 'b', 'd1')", + "INSERT INTO testtb VALUES(1000, 300.0, 'c', 'd2')", + }; + + private static final String dropDbSqls = "DROP DATABASE IF EXISTS testdb"; + + @BeforeClass + public static void setUpClass() { + Locale.setDefault(Locale.ENGLISH); + + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setPartitionInterval(1000) + .setMemtableSizeThreshold(10000); + EnvFactory.getEnv().initClusterEnvironment(); + } + + @AfterClass + public static void tearDownClass() { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Before + public void setUp() { + prepareData(); + } + + @After + public void tearDown() { + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + statement.execute(dropDbSqls); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testReplicaHintWithInvalidTable() { + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + statement.execute("use " + DATABASE_NAME); + String sql = "SELECT /*+ REPLICA(t1, 2) */ * FROM testtb"; + statement.executeQuery(sql); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testReplicaHintWithSystemTableQuery() { + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + String sql = "SELECT /*+ REPLICA(0) */ * FROM information_schema.tables"; + statement.executeQuery(sql); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testReplicaHintWithCTE() { + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + statement.execute("use " + DATABASE_NAME); + // REPLICA hint in CTE definition + String sql1 = + "WITH cte1 AS (SELECT /*+ REPLICA(testtb,0) */ * FROM testtb) SELECT * FROM cte1"; + statement.executeQuery(sql1); + // REPLICA hint in materialized CTE definition + String sql2 = + "WITH cte1 AS materialized (SELECT /*+ REPLICA(testtb,0) */ * FROM testtb) SELECT * FROM cte1"; + statement.executeQuery(sql2); + // REPLICA hint in main query + String sql3 = + "WITH cte1 AS (SELECT * FROM testtb) SELECT /*+ REPLICA(testtb, 0) */ * FROM cte1"; + statement.executeQuery(sql3); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testReplicaHintWithExplain() { + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + statement.execute("use " + DATABASE_NAME); + String sql = "EXPLAIN SELECT /*+ REPLICA(0) */ * FROM testtb"; + statement.executeQuery(sql); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testReplicaHintWithJoin() { + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + statement.execute("use " + DATABASE_NAME); + String sql = + "SELECT /*+ REPLICA(a, 0) REPLICA(b, 1) */ * FROM testtb as a INNER JOIN testtb as b ON a.voltage = b.voltage"; + ResultSet rs = statement.executeQuery(sql); + int count = 0; + while (rs.next()) { + count++; + } + assertEquals("Expected 3 rows from the join query", 3, count); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + private static void prepareData() { + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + + for (String sql : creationSqls) { + statement.execute(sql); + } + } catch (Exception e) { + fail(e.getMessage()); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java index a507b98008bbd..681233fe97779 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java @@ -151,6 +151,9 @@ public enum ExplainType { // Tables in the subquery private final Map, List> subQueryTables = new HashMap<>(); + // parallel hint + private int parallelism = 0; + @TestOnly public MPPQueryContext(QueryId queryId) { this.queryId = queryId; @@ -674,5 +677,13 @@ public IAuditEntity setSqlString(String sqlString) { return this; } + public int getParallelism() { + return parallelism; + } + + public void setParallelism(int parallelism) { + this.parallelism = parallelism; + } + // ================= Authentication Interfaces ========================= } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index df9ac2d2f936f..314cbb59aebe4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -162,6 +162,9 @@ public class FragmentInstanceContext extends QueryContext { private long closedSeqFileNum = 0; private long closedUnseqFileNum = 0; + // parallel hint + private int parallelism = 0; + public static FragmentInstanceContext createFragmentInstanceContext( FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, @@ -1193,4 +1196,12 @@ public boolean ignoreNotExistsDevice() { public boolean isSingleSourcePath() { return singleSourcePath; } + + public int getParallelism() { + return parallelism; + } + + public void setParallelism(int parallelism) { + this.parallelism = parallelism; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java index 1898cbfe53ccb..3a4241cab3a08 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java @@ -164,6 +164,9 @@ public FragmentInstanceInfo execDataQueryFragmentInstance( instance.isDebug(), instance.isVerbose())); + // set parallelism from fragment instance + context.setParallelism(instance.getParallelism()); + try { List driverFactories = planner.plan( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/TableModelStatementMemorySourceVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/TableModelStatementMemorySourceVisitor.java index 932d941979223..2fcc8a608ada1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/TableModelStatementMemorySourceVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/TableModelStatementMemorySourceVisitor.java @@ -92,7 +92,7 @@ public StatementMemorySource visitExplain( // Generate table model distributed plan final TableDistributedPlanGenerator.PlanContext planContext = - new TableDistributedPlanGenerator.PlanContext(); + new TableDistributedPlanGenerator.PlanContext(context.getAnalysis().getHintMap()); final PlanNode outputNodeWithExchange = new TableDistributedPlanner( context.getAnalysis(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java index d53e29e16924f..3b78c8c34f49b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java @@ -72,6 +72,9 @@ public class FragmentInstance implements IConsensusRequest { private boolean isHighestPriority; + // parallel hint + private int parallelism = 0; + // indicate which index we are retrying private transient int nextRetryIndex = 0; @@ -101,6 +104,7 @@ public FragmentInstance( this.timeOut = timeOut > 0 ? timeOut : CONFIG.getQueryTimeoutThreshold(); this.isRoot = false; this.sessionInfo = sessionInfo; + this.parallelism = 0; this.debug = debug; this.verbose = verbose; } @@ -213,6 +217,14 @@ public void setDataNodeFINum(int dataNodeFINum) { this.dataNodeFINum = dataNodeFINum; } + public int getParallelism() { + return parallelism; + } + + public void setParallelism(int parallelism) { + this.parallelism = parallelism; + } + public boolean isDebug() { return debug; } @@ -250,6 +262,7 @@ public static FragmentInstance deserializeFrom(ByteBuffer buffer) { TimePredicate globalTimePredicate = hasTimePredicate ? TimePredicate.deserialize(buffer) : null; QueryType queryType = QueryType.values()[ReadWriteIOUtils.readInt(buffer)]; int dataNodeFINum = ReadWriteIOUtils.readInt(buffer); + int parallelism = ReadWriteIOUtils.readInt(buffer); boolean debug = ReadWriteIOUtils.readBool(buffer); boolean verbose = ReadWriteIOUtils.readBool(buffer); FragmentInstance fragmentInstance = @@ -263,6 +276,7 @@ public static FragmentInstance deserializeFrom(ByteBuffer buffer) { dataNodeFINum, debug, verbose); + fragmentInstance.setParallelism(parallelism); boolean hasHostDataNode = ReadWriteIOUtils.readBool(buffer); fragmentInstance.hostDataNode = hasHostDataNode ? ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer) : null; @@ -286,6 +300,7 @@ public ByteBuffer serializeToByteBuffer() { } ReadWriteIOUtils.write(type.ordinal(), outputStream); ReadWriteIOUtils.write(dataNodeFINum, outputStream); + ReadWriteIOUtils.write(parallelism, outputStream); ReadWriteIOUtils.write(debug, outputStream); ReadWriteIOUtils.write(verbose, outputStream); ReadWriteIOUtils.write(hostDataNode != null, outputStream); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java index 180b4b9f1be11..6de60bb867c31 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java @@ -664,6 +664,9 @@ public List visitTableScan(TableScanNode node, GraphContext context) { List boxValue = new ArrayList<>(); boxValue.add(node.toString()); boxValue.add(String.format("QualifiedTableName: %s", node.getQualifiedObjectName().toString())); + if (node.getAlias() != null) { + boxValue.add(String.format("Alias: %s", node.getAlias().getValue())); + } boxValue.add(String.format("OutputSymbols: %s", node.getOutputSymbols())); if (deviceTableScanNode != null) { @@ -751,6 +754,9 @@ public List visitAggregationTableScan( List boxValue = new ArrayList<>(); boxValue.add(node.toString()); boxValue.add(String.format("QualifiedTableName: %s", node.getQualifiedObjectName().toString())); + if (node.getAlias() != null) { + boxValue.add(String.format("Alias: %s", node.getAlias().getValue())); + } boxValue.add(String.format("OutputSymbols: %s", node.getOutputSymbols())); int i = 0; for (org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.Aggregation diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java index 5149945374319..f0f48166bcbfa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java @@ -70,6 +70,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WindowFrame; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.With; import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser; +import org.apache.iotdb.db.queryengine.plan.relational.utils.hint.Hint; import org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy; import com.google.common.collect.ArrayListMultimap; @@ -217,7 +218,7 @@ public class Analysis implements IAnalysis { private final Map, QualifiedName> relationNames = new LinkedHashMap<>(); - private final Set> aliasedRelations = new LinkedHashSet<>(); + private final Map, Identifier> aliasedRelations = new LinkedHashMap<>(); private final Map, TableFunctionInvocationAnalysis> tableFunctionAnalyses = new LinkedHashMap<>(); @@ -258,6 +259,9 @@ public class Analysis implements IAnalysis { private boolean isQuery = false; + // Hint map + private Map hintMap = new HashMap<>(); + // SqlParser is needed during query planning phase for executing uncorrelated scalar subqueries // in advance (predicate folding). The planner needs to parse and execute these subqueries // independently to utilize predicate pushdown optimization. @@ -276,6 +280,14 @@ public void updateNeedSetHighestPriority(QualifiedObjectName tableName) { needSetHighestPriority = InformationSchema.QUERIES.equals(tableName.getObjectName()); } + public void setHintMap(Map hintMap) { + this.hintMap = hintMap; + } + + public Map getHintMap() { + return hintMap; + } + public Map, Expression> getParameters() { return parameters; } @@ -858,12 +870,20 @@ public QualifiedName getRelationName(final Relation relation) { return relationNames.get(NodeRef.of(relation)); } - public void addAliased(final Relation relation) { - aliasedRelations.add(NodeRef.of(relation)); + public Map, QualifiedName> getRelationNames() { + return relationNames; + } + + public void addAliased(final Relation relation, Identifier alias) { + aliasedRelations.put(NodeRef.of(relation), alias); + } + + public Identifier getAliased(Relation relation) { + return aliasedRelations.get(NodeRef.of(relation)); } public boolean isAliased(Relation relation) { - return aliasedRelations.contains(NodeRef.of(relation)); + return aliasedRelations.containsKey(NodeRef.of(relation)); } public void addTableSchema( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java index dade85d20b1be..8ee48c066b7c3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java @@ -133,6 +133,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NullIfExpression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Offset; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.OrderBy; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ParallelHintItem; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Parameter; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PatternRecognitionRelation; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PipeEnriched; @@ -141,12 +142,15 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QuantifiedComparisonExpression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QuerySpecification; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RegionRouteHintItem; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Relation; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RenameColumn; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RenameTable; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ReplicaHintItem; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Row; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SearchedCaseExpression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Select; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SelectHint; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SelectItem; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SetOperation; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SetProperties; @@ -195,6 +199,10 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WrappedInsertStatement; import org.apache.iotdb.db.queryengine.plan.relational.type.CompatibleResolver; import org.apache.iotdb.db.queryengine.plan.relational.type.TypeManager; +import org.apache.iotdb.db.queryengine.plan.relational.utils.hint.Hint; +import org.apache.iotdb.db.queryengine.plan.relational.utils.hint.ParallelHint; +import org.apache.iotdb.db.queryengine.plan.relational.utils.hint.RegionRouteHint; +import org.apache.iotdb.db.queryengine.plan.relational.utils.hint.ReplicaHint; import org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; @@ -1248,6 +1256,8 @@ protected Scope visitQuerySpecification(QuerySpecification node, Optional orderByScope.orElseThrow(() -> new NoSuchElementException("No value present"))); } + // select hint + analyzeHint(node, sourceScope); return outputScope; } @@ -1501,6 +1511,72 @@ private void analyzeWhere(Node node, Scope scope, Expression predicate) { analysis.setWhere(node, predicate); } + private void analyzeHint(QuerySpecification node, Scope scope) { + Optional selectHint = node.getSelectHint(); + selectHint.ifPresent(hint -> process(hint, scope)); + } + + @Override + public Scope visitSelectHint(SelectHint node, final Optional context) { + Map hintMap = new HashMap<>(); + for (Node hintItem : node.getHintItems()) { + Hint hint = processHintItem(hintItem); + if (hint != null) { + hintMap.putIfAbsent(hint.getKey(), hint); + } + } + analysis.setHintMap(hintMap); + return createAndAssignScope(node, context); + } + + private Hint processHintItem(Node hintItem) { + if (hintItem instanceof ReplicaHintItem) { + ReplicaHintItem item = (ReplicaHintItem) hintItem; + QualifiedName table = item.getTable(); + if (table == null) { + return new ReplicaHint(null, item.getReplicaIndex()); + } + QualifiedName resolvedTable = resolveTable(table); + if (resolvedTable != null) { + return new ReplicaHint(resolvedTable, item.getReplicaIndex()); + } + } else if (hintItem instanceof RegionRouteHintItem) { + RegionRouteHintItem item = (RegionRouteHintItem) hintItem; + QualifiedName table = item.getTable(); + if (table == null) { + return new RegionRouteHint(null, item.getRegionDatanodeMap()); + } + QualifiedName resolvedTable = resolveTable(table); + if (resolvedTable != null) { + return new RegionRouteHint(resolvedTable, item.getRegionDatanodeMap()); + } + } else if (hintItem instanceof ParallelHintItem) { + ParallelHintItem item = (ParallelHintItem) hintItem; + return new ParallelHint(item.getParallelism()); + } + return null; + } + + private QualifiedName resolveTable(QualifiedName table) { + List existingTables = + analysis.getRelationNames().values().stream().collect(toImmutableList()); + + // Alias + if (existingTables.contains(table)) { + return table; + } + + // Table + QualifiedObjectName tableObjectName = createQualifiedObjectName(sessionContext, table); + QualifiedName tableName = + QualifiedName.of(tableObjectName.getDatabaseName(), tableObjectName.getObjectName()); + + if (!existingTables.contains(tableName)) { + return null; + } + return tableName; + } + private List analyzeSelect(QuerySpecification node, Scope scope) { ImmutableList.Builder outputExpressionBuilder = ImmutableList.builder(); ImmutableList.Builder selectExpressionBuilder = @@ -3602,7 +3678,7 @@ protected Scope visitValues(Values node, Optional scope) { @Override protected Scope visitAliasedRelation(AliasedRelation relation, Optional scope) { analysis.setRelationName(relation, QualifiedName.of(ImmutableList.of(relation.getAlias()))); - analysis.addAliased(relation.getRelation()); + analysis.addAliased(relation.getRelation(), relation.getAlias()); Scope relationScope = process(relation.getRelation(), scope); RelationType relationType = relationScope.getRelationType(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java index 457a03ab4a012..03859d4499da7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java @@ -398,7 +398,8 @@ private RelationPlan processPhysicalTable(Table table, Scope scope) { qualifiedObjectName, outputSymbols, tableColumnSchema, - tagAndAttributeIndexMap); + tagAndAttributeIndexMap, + analysis.getAliased(table)); } return new RelationPlan(tableScanNode, scope, outputSymbols, outerContext); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java index c74f8bd5b4b25..af5b0bf6181b2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.partition.DataPartition; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistribution; @@ -26,6 +28,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.read.TableDeviceSourceNode; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CopyToNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CteScanNode; @@ -36,6 +39,12 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.TableDeviceFetchNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.TableDeviceQueryCountNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.TableDeviceQueryScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.utils.hint.Hint; +import org.apache.iotdb.db.queryengine.plan.relational.utils.hint.RegionRouteHint; +import org.apache.iotdb.db.queryengine.plan.relational.utils.hint.ReplicaHint; + +import java.util.List; +import java.util.Map; import static org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistributionType.DIFFERENT_FROM_ALL_CHILDREN; import static org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistributionType.NO_CHILD; @@ -96,9 +105,43 @@ public PlanNode visitPlan(PlanNode node, TableDistributedPlanGenerator.PlanConte @Override public PlanNode visitTableScan( TableScanNode node, TableDistributedPlanGenerator.PlanContext context) { - context.nodeDistributionMap.put( - node.getPlanNodeId(), - new NodeDistribution(SAME_WITH_ALL_CHILDREN, node.getRegionReplicaSet())); + // Original region replica set + TRegionReplicaSet regionReplicaSet = node.getRegionReplicaSet(); + + // Determine optimized locations based on hint + List optimizedLocations = null; + + // Find region_route hint + RegionRouteHint regionRouteHint = + (RegionRouteHint) findHint(RegionRouteHint.HINT_NAME, node, context.hintMap); + if (regionRouteHint != null) { + optimizedLocations = + regionRouteHint.selectLocations( + regionReplicaSet.getDataNodeLocations(), regionReplicaSet.getRegionId().getId()); + } + + // Find replica hint + if (optimizedLocations == null) { + ReplicaHint replicaHint = + (ReplicaHint) findHint(ReplicaHint.HINT_NAME, node, context.hintMap); + if (replicaHint != null) { + optimizedLocations = replicaHint.selectLocations(regionReplicaSet.getDataNodeLocations()); + } + } + + if (optimizedLocations == null) { + context.nodeDistributionMap.put( + node.getPlanNodeId(), new NodeDistribution(SAME_WITH_ALL_CHILDREN, regionReplicaSet)); + } else { + // Create optimized region replica set + TRegionReplicaSet optimizedRegionReplicaSet = + new TRegionReplicaSet(regionReplicaSet.getRegionId(), optimizedLocations); + + context.nodeDistributionMap.put( + node.getPlanNodeId(), + new NodeDistribution(SAME_WITH_ALL_CHILDREN, optimizedRegionReplicaSet)); + } + return node; } @@ -229,4 +272,28 @@ private PlanNode processTableDeviceSourceNode( new NodeDistribution(SAME_WITH_ALL_CHILDREN, node.getRegionReplicaSet())); return node; } + + /** + * Finds the applicable replica hint for the given table scan node. First checks for + * table-specific hint, then falls back to global hint. + */ + private Hint findHint(String hintName, TableScanNode node, Map hintMap) { + if (hintMap == null || hintMap.isEmpty()) { + return null; + } + + String tableName = getTableName(node); + String tableSpecificKey = hintName + "-" + tableName; + + Hint hint = hintMap.get(tableSpecificKey); + return hint != null ? hint : hintMap.get(hintName); + } + + private String getTableName(TableScanNode node) { + if (node.getAlias() != null) { + return node.getAlias().getValue(); + } + QualifiedObjectName fullName = node.getQualifiedObjectName(); + return fullName.getDatabaseName() + "." + fullName.getObjectName(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index 79ea52597bcee..d66938a85ea5c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@ -109,6 +109,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FunctionCall; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Insert; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference; +import org.apache.iotdb.db.queryengine.plan.relational.utils.hint.Hint; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; import org.apache.iotdb.db.schemaengine.table.DataNodeTreeViewSchemaUtils; @@ -757,7 +758,8 @@ private List constructDeviceTableScanByTags( node.getPushDownLimit(), node.getPushDownOffset(), node.isPushLimitToEachDevice(), - node.containsNonAlignedDevice()); + node.containsNonAlignedDevice(), + node.getAlias()); scanNode.setRegionReplicaSet(regionReplicaSets.get(0)); return scanNode; }); @@ -843,7 +845,8 @@ private List constructDeviceTableScanByRegionReplicaSet( node.getPushDownLimit(), node.getPushDownOffset(), node.isPushLimitToEachDevice(), - node.containsNonAlignedDevice()); + node.containsNonAlignedDevice(), + node.getAlias()); scanNode.setRegionReplicaSet(regionReplicaSet); return scanNode; }); @@ -1724,7 +1727,8 @@ private void buildRegionNodeMap( partialAggTableScanNode.getGroupingSets(), partialAggTableScanNode.getPreGroupedSymbols(), partialAggTableScanNode.getStep(), - partialAggTableScanNode.getGroupIdSymbol()); + partialAggTableScanNode.getGroupIdSymbol(), + partialAggTableScanNode.getAlias()); scanNode.setRegionReplicaSet(regionReplicaSet); return scanNode; }); @@ -2213,6 +2217,7 @@ public List visitUnion(UnionNode node, PlanContext context) { public static class PlanContext { final Map nodeDistributionMap; + final Map hintMap; boolean hasExchangeNode = false; boolean hasSortProperty = false; boolean pushDownGrouping = false; @@ -2222,6 +2227,12 @@ public static class PlanContext { public PlanContext() { this.nodeDistributionMap = new HashMap<>(); + this.hintMap = new HashMap<>(); + } + + public PlanContext(Map hintMap) { + this.nodeDistributionMap = new HashMap<>(); + this.hintMap = hintMap; } public NodeDistribution getNodeDistribution(PlanNodeId nodeId) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java index ff7686fe86891..7ac985cbfbb6c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java @@ -45,6 +45,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.DistributedOptimizeFactory; import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PlanOptimizer; import org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager; +import org.apache.iotdb.db.queryengine.plan.relational.utils.hint.ParallelHint; import java.util.Collections; import java.util.HashMap; @@ -100,7 +101,7 @@ public TableDistributedPlanner( public DistributedQueryPlan plan() { TableDistributedPlanGenerator.PlanContext planContext = - new TableDistributedPlanGenerator.PlanContext(); + new TableDistributedPlanGenerator.PlanContext(analysis.getHintMap()); PlanNode outputNodeWithExchange = generateDistributedPlanWithOptimize(planContext); List planText = null; if (mppQueryContext.isExplain() && mppQueryContext.isInnerTriggeredQuery()) { @@ -130,8 +131,14 @@ public DistributedQueryPlan plan() { public PlanNode generateDistributedPlanWithOptimize( TableDistributedPlanGenerator.PlanContext planContext) { - // generate table model distributed plan + // set parallel hint + ParallelHint parallelHint = + (ParallelHint) planContext.hintMap.getOrDefault(ParallelHint.HINT_NAME, null); + if (parallelHint != null) { + mppQueryContext.setParallelism(parallelHint.getParallelism()); + } + // generate table model distributed plan List distributedPlanResult = new TableDistributedPlanGenerator( mppQueryContext, analysis, symbolAllocator, dataNodeLocationSupplier) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java index 6ad998a5a5335..7de271b4a669e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java @@ -187,6 +187,9 @@ private void produceFragmentInstance( fragment.isRoot(), queryContext.isVerbose()); + // set parallelism from MppQueryContext + fragmentInstance.setParallelism(queryContext.getParallelism()); + selectExecutorAndHost( fragment, fragmentInstance, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java index ffce0b6693e9d..08451ea7edb63 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java @@ -126,7 +126,8 @@ public static Optional pruneColumns(TableScanNode node, Set re deviceTableScanNode.getPushDownLimit(), deviceTableScanNode.getPushDownOffset(), deviceTableScanNode.isPushLimitToEachDevice(), - deviceTableScanNode.containsNonAlignedDevice())); + deviceTableScanNode.containsNonAlignedDevice(), + deviceTableScanNode.getAlias())); } } else if (node instanceof InformationSchemaTableScanNode) { // For the convenience of process in execution stage, column-prune for diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java index 56d39f2f77dcb..1928db3c73120 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Identifier; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; @@ -148,6 +149,50 @@ public AggregationTableScanNode( this.setOutputSymbols(constructOutputSymbols(groupingSets, aggregations)); } + public AggregationTableScanNode( + PlanNodeId id, + QualifiedObjectName qualifiedObjectName, + List outputSymbols, + Map assignments, + List deviceEntries, + Map tagAndAttributeIndexMap, + Ordering scanOrder, + Expression timePredicate, + Expression pushDownPredicate, + long pushDownLimit, + long pushDownOffset, + boolean pushLimitToEachDevice, + boolean containsNonAlignedDevice, + Assignments projection, + Map aggregations, + AggregationNode.GroupingSetDescriptor groupingSets, + List preGroupedSymbols, + AggregationNode.Step step, + Optional groupIdSymbol, + Identifier alias) { + this( + id, + qualifiedObjectName, + outputSymbols, + assignments, + deviceEntries, + tagAndAttributeIndexMap, + scanOrder, + timePredicate, + pushDownPredicate, + pushDownLimit, + pushDownOffset, + pushLimitToEachDevice, + containsNonAlignedDevice, + projection, + aggregations, + groupingSets, + preGroupedSymbols, + step, + groupIdSymbol); + this.alias = alias; + } + protected AggregationTableScanNode() {} private static List constructOutputSymbols( @@ -278,7 +323,8 @@ public AggregationTableScanNode clone() { groupingSets, preGroupedSymbols, step, - groupIdSymbol); + groupIdSymbol, + alias); } @Override @@ -336,7 +382,8 @@ public static AggregationTableScanNode combineAggregationAndTableScan( aggregationNode.getGroupingSets(), aggregationNode.getPreGroupedSymbols(), aggregationNode.getStep(), - aggregationNode.getGroupIdSymbol()); + aggregationNode.getGroupIdSymbol(), + tableScanNode.getAlias()); } public static AggregationTableScanNode combineAggregationAndTableScan( @@ -390,7 +437,8 @@ public static AggregationTableScanNode combineAggregationAndTableScan( aggregationNode.getGroupingSets(), aggregationNode.getPreGroupedSymbols(), step, - aggregationNode.getGroupIdSymbol()); + aggregationNode.getGroupIdSymbol(), + tableScanNode.getAlias()); } public boolean mayUseLastCache() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/DeviceTableScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/DeviceTableScanNode.java index b91737739351a..44eef92451f95 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/DeviceTableScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/DeviceTableScanNode.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Identifier; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.tsfile.read.filter.basic.Filter; @@ -89,6 +90,18 @@ public DeviceTableScanNode( this.tagAndAttributeIndexMap = tagAndAttributeIndexMap; } + public DeviceTableScanNode( + PlanNodeId id, + QualifiedObjectName qualifiedObjectName, + List outputSymbols, + Map assignments, + Map tagAndAttributeIndexMap, + Identifier alias) { + super(id, qualifiedObjectName, outputSymbols, assignments); + this.tagAndAttributeIndexMap = tagAndAttributeIndexMap; + this.alias = alias; + } + public DeviceTableScanNode( PlanNodeId id, QualifiedObjectName qualifiedObjectName, @@ -120,6 +133,38 @@ public DeviceTableScanNode( this.containsNonAlignedDevice = containsNonAlignedDevice; } + public DeviceTableScanNode( + PlanNodeId id, + QualifiedObjectName qualifiedObjectName, + List outputSymbols, + Map assignments, + List deviceEntries, + Map tagAndAttributeIndexMap, + Ordering scanOrder, + Expression timePredicate, + Expression pushDownPredicate, + long pushDownLimit, + long pushDownOffset, + boolean pushLimitToEachDevice, + boolean containsNonAlignedDevice, + Identifier alias) { + this( + id, + qualifiedObjectName, + outputSymbols, + assignments, + deviceEntries, + tagAndAttributeIndexMap, + scanOrder, + timePredicate, + pushDownPredicate, + pushDownLimit, + pushDownOffset, + pushLimitToEachDevice, + containsNonAlignedDevice); + this.alias = alias; + } + @Override public R accept(PlanVisitor visitor, C context) { return visitor.visitDeviceTableScan(this, context); @@ -140,7 +185,8 @@ public DeviceTableScanNode clone() { pushDownLimit, pushDownOffset, pushLimitToEachDevice, - containsNonAlignedDevice); + containsNonAlignedDevice, + alias); } protected static void serializeMemberVariables( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java index 11807d9ceb499..c6bda2232621c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Identifier; import com.google.common.collect.ImmutableList; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -75,6 +76,9 @@ public abstract class TableScanNode extends SourceNode { // For query of schemaInfo, we only need the list of DataNodeLocation protected TRegionReplicaSet regionReplicaSet; + // alias name + protected Identifier alias; + public TableScanNode( PlanNodeId id, QualifiedObjectName qualifiedObjectName, @@ -177,6 +181,10 @@ public void open() throws Exception {} @Override public void close() throws Exception {} + public Identifier getAlias() { + return alias; + } + public QualifiedObjectName getQualifiedObjectName() { return this.qualifiedObjectName; } @@ -255,6 +263,13 @@ protected static void serializeMemberVariables( ReadWriteIOUtils.write(node.pushDownLimit, byteBuffer); ReadWriteIOUtils.write(node.pushDownOffset, byteBuffer); + + if (node.alias != null) { + ReadWriteIOUtils.write(true, byteBuffer); + node.alias.serialize(byteBuffer); + } else { + ReadWriteIOUtils.write(false, byteBuffer); + } } protected static void serializeMemberVariables( @@ -290,6 +305,13 @@ protected static void serializeMemberVariables( ReadWriteIOUtils.write(node.pushDownLimit, stream); ReadWriteIOUtils.write(node.pushDownOffset, stream); + + if (node.alias != null) { + ReadWriteIOUtils.write(true, stream); + node.alias.serialize(stream); + } else { + ReadWriteIOUtils.write(false, stream); + } } protected static void deserializeMemberVariables( @@ -327,6 +349,11 @@ protected static void deserializeMemberVariables( node.pushDownLimit = ReadWriteIOUtils.readLong(byteBuffer); node.pushDownOffset = ReadWriteIOUtils.readLong(byteBuffer); + + boolean hasAlias = ReadWriteIOUtils.readBool(byteBuffer); + if (hasAlias) { + node.alias = new Identifier(byteBuffer); + } } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java index 24adb746df4e1..17ec744f58d8f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java @@ -238,7 +238,8 @@ public PlanAndMappings visitDeviceTableScan(DeviceTableScanNode node, UnaliasCon node.getPushDownLimit(), node.getPushDownOffset(), node.isPushLimitToEachDevice(), - node.containsNonAlignedDevice()), + node.containsNonAlignedDevice(), + node.getAlias()), mapping); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/Util.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/Util.java index 706b24a567b3b..27a8ea66bf67c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/Util.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/Util.java @@ -188,7 +188,8 @@ public static Pair split( node.getGroupingSets(), node.getPreGroupedSymbols(), PARTIAL, - node.getGroupIdSymbol()) + node.getGroupIdSymbol(), + node.getAlias()) : new AggregationTreeDeviceViewScanNode( queryId.genPlanNodeId(), node.getQualifiedObjectName(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java index 93cc5cf95a4d6..50c18668cd032 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java @@ -137,6 +137,22 @@ protected R visitSelect(Select node, C context) { return visitNode(node, context); } + protected R visitSelectHint(SelectHint node, C context) { + return visitNode(node, context); + } + + protected R visitReplicaHintItem(ReplicaHintItem node, C context) { + return visitNode(node, context); + } + + protected R visitRegionRouteHintItem(RegionRouteHintItem node, C context) { + return visitNode(node, context); + } + + protected R visitParallelHintItem(ParallelHintItem node, C context) { + return visitNode(node, context); + } + protected R visitRelation(Relation node, C context) { return visitNode(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ParallelHintItem.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ParallelHintItem.java new file mode 100644 index 0000000000000..3c37c84c1c309 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ParallelHintItem.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; + +import com.google.common.collect.ImmutableList; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.List; +import java.util.Objects; + +public class ParallelHintItem extends Node { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(ParallelHintItem.class); + + private static final String HINT_NAME_ITEM = "parallel"; + private final int parallelism; + + public ParallelHintItem(int parallelism) { + super(null); + this.parallelism = parallelism; + } + + public int getParallelism() { + return parallelism; + } + + @Override + public R accept(AstVisitor visitor, C context) { + return visitor.visitParallelHintItem(this, context); + } + + @Override + public List getChildren() { + return ImmutableList.of(); + } + + @Override + public int hashCode() { + return Objects.hash(parallelism); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + ParallelHintItem other = (ParallelHintItem) obj; + return this.parallelism == other.parallelism; + } + + @Override + public String toString() { + return HINT_NAME_ITEM + "(" + parallelism + ")"; + } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/QuerySpecification.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/QuerySpecification.java index 5de61a2dde2e2..9a4afb6c2a1f5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/QuerySpecification.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/QuerySpecification.java @@ -45,6 +45,8 @@ public class QuerySpecification extends QueryBody { private final Optional offset; private final Optional limit; + private final Optional selectHint; + public QuerySpecification( Select select, Optional from, @@ -55,8 +57,21 @@ public QuerySpecification( List windows, Optional orderBy, Optional offset, - Optional limit) { - this(null, select, from, where, groupBy, having, fill, windows, orderBy, offset, limit); + Optional limit, + Optional selectHint) { + this( + null, + select, + from, + where, + groupBy, + having, + fill, + windows, + orderBy, + offset, + limit, + selectHint); } public QuerySpecification( @@ -70,7 +85,8 @@ public QuerySpecification( List windows, Optional orderBy, Optional offset, - Optional limit) { + Optional limit, + Optional selectHint) { super(location); this.select = requireNonNull(select, "select is null"); @@ -83,6 +99,7 @@ public QuerySpecification( this.orderBy = requireNonNull(orderBy, "orderBy is null"); this.offset = requireNonNull(offset, "offset is null"); this.limit = requireNonNull(limit, "limit is null"); + this.selectHint = requireNonNull(selectHint, "hintMap is null"); } public Select getSelect() { @@ -125,6 +142,10 @@ public Optional getLimit() { return limit; } + public Optional getSelectHint() { + return selectHint; + } + @Override public R accept(AstVisitor visitor, C context) { return visitor.visitQuerySpecification(this, context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RegionRouteHintItem.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RegionRouteHintItem.java new file mode 100644 index 0000000000000..c4c6838c46139 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/RegionRouteHintItem.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; + +import com.google.common.collect.ImmutableList; +import org.apache.tsfile.utils.RamUsageEstimator; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class RegionRouteHintItem extends Node { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(RegionRouteHintItem.class); + private static final String HINT_NAME_ITEM = "region_route"; + + private @Nullable final QualifiedName table; + private final Map regionDatanodeMap; + + public RegionRouteHintItem( + @Nullable QualifiedName table, Map regionDatanodeMap) { + super(null); + this.table = table; + this.regionDatanodeMap = regionDatanodeMap; + } + + public @Nullable QualifiedName getTable() { + return table; + } + + public Map getRegionDatanodeMap() { + return regionDatanodeMap; + } + + @Override + public R accept(AstVisitor visitor, C context) { + return visitor.visitRegionRouteHintItem(this, context); + } + + @Override + public List getChildren() { + return ImmutableList.of(); + } + + @Override + public int hashCode() { + return Objects.hash(table, regionDatanodeMap); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + RegionRouteHintItem other = (RegionRouteHintItem) obj; + return this.table.equals(other.table) && regionDatanodeMap.equals(other.regionDatanodeMap); + } + + @Override + public String toString() { + return HINT_NAME_ITEM + (table == null ? "" : "-" + table) + "(" + regionDatanodeMap + ")"; + } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + AstMemoryEstimationHelper.getEstimatedSizeOfAccountableObject(table) + + RamUsageEstimator.sizeOfMap(regionDatanodeMap); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ReplicaHintItem.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ReplicaHintItem.java new file mode 100644 index 0000000000000..87ff1666c8a81 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ReplicaHintItem.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; + +import com.google.common.collect.ImmutableList; +import org.apache.tsfile.utils.RamUsageEstimator; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Objects; + +public class ReplicaHintItem extends Node { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(ReplicaHintItem.class); + private static final String HINT_NAME_ITEM = "replica"; + + private @Nullable final QualifiedName table; + private final int replicaIndex; + + public ReplicaHintItem(@Nullable QualifiedName table, int replicaIndex) { + super(null); + this.table = table; + this.replicaIndex = replicaIndex; + } + + public @Nullable QualifiedName getTable() { + return table; + } + + public int getReplicaIndex() { + return replicaIndex; + } + + @Override + public R accept(AstVisitor visitor, C context) { + return visitor.visitReplicaHintItem(this, context); + } + + @Override + public List getChildren() { + return ImmutableList.of(); + } + + @Override + public int hashCode() { + return Objects.hash(table, replicaIndex); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + ReplicaHintItem other = (ReplicaHintItem) obj; + return this.table.equals(other.table) && this.replicaIndex == other.replicaIndex; + } + + @Override + public String toString() { + return HINT_NAME_ITEM + (table == null ? "" : "-" + table) + "(" + replicaIndex + ")"; + } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + AstMemoryEstimationHelper.getEstimatedSizeOfAccountableObject(table); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/SelectHint.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/SelectHint.java new file mode 100644 index 0000000000000..03dcc1e979a6d --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/SelectHint.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; + +import com.google.common.collect.ImmutableList; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.List; +import java.util.Objects; + +public class SelectHint extends Node { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(SelectHint.class); + + private final List hintItems; + + public SelectHint(List hintItems) { + super(null); + this.hintItems = ImmutableList.copyOf(hintItems); + } + + public List getHintItems() { + return hintItems; + } + + @Override + public List getChildren() { + return hintItems; + } + + @Override + public R accept(AstVisitor visitor, C context) { + return visitor.visitSelectHint(this, context); + } + + @Override + public int hashCode() { + return Objects.hash(hintItems); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + SelectHint other = (SelectHint) obj; + return Objects.equals(this.hintItems, other.hintItems); + } + + @Override + public String toString() { + if (hintItems == null || hintItems.isEmpty()) { + return ""; + } + + StringBuilder sb = new StringBuilder(); + sb.append("/*+ "); + + for (int i = 0; i < hintItems.size(); i++) { + if (i > 0) { + sb.append(" "); + } + sb.append(hintItems.get(i).toString()); + } + + sb.append(" */"); + return sb.toString(); + } + + @Override + public long ramBytesUsed() { + long size = INSTANCE_SIZE; + size += AstMemoryEstimationHelper.getEstimatedSizeOfNodeLocation(getLocationInternal()); + size += AstMemoryEstimationHelper.getEstimatedSizeOfNodeList(hintItems); + return size; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java index 9bd99771f53ba..10430cc024256 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java @@ -144,6 +144,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Offset; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.OneOrMoreQuantifier; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.OrderBy; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ParallelHintItem; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Parameter; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PatternAlternation; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PatternConcatenation; @@ -163,6 +164,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QuerySpecification; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RangeQuantifier; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ReconstructRegion; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RegionRouteHintItem; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Relation; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RelationalAuthorStatement; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveAINode; @@ -171,10 +173,12 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveRegion; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RenameColumn; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RenameTable; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ReplicaHintItem; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Row; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RowPattern; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SearchedCaseExpression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Select; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SelectHint; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SelectItem; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SetColumnComment; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SetConfiguration; @@ -2339,7 +2343,8 @@ public Node visitQueryNoWith(RelationalSqlParser.QueryNoWithContext ctx) { query.getWindows(), orderBy, offset, - limit), + limit, + query.getSelectHint()), Optional.empty(), Optional.empty(), Optional.empty(), @@ -2448,7 +2453,8 @@ public Node visitQuerySpecification(RelationalSqlParser.QuerySpecificationContex ctx.where, ctx.groupBy(), ctx.having, - ctx.windowDefinition()); + ctx.windowDefinition(), + ctx.selectHint()); } @Override @@ -2463,7 +2469,8 @@ public Node visitFromFirstQuerySpecification( ctx.where, ctx.groupBy(), ctx.having, - ctx.windowDefinition()); + ctx.windowDefinition(), + null); } private Node buildQuerySpecification( @@ -2475,7 +2482,8 @@ private Node buildQuerySpecification( RelationalSqlParser.BooleanExpressionContext where, RelationalSqlParser.GroupByContext groupBy, RelationalSqlParser.BooleanExpressionContext having, - List windowDefinitions) { + List windowDefinitions, + RelationalSqlParser.SelectHintContext selectHintContext) { Optional from = Optional.empty(); List selectItems = visit(selectItemContexts, SelectItem.class); @@ -2503,6 +2511,12 @@ private Node buildQuerySpecification( NodeLocation selectLocation = selectNode != null ? getLocation(selectNode) : getLocation(parserRuleContext); + // Hint Map + Optional selectHint = + selectHintContext != null + ? Optional.of((SelectHint) visitSelectHint(selectHintContext)) + : Optional.empty(); + return new QuerySpecification( getLocation(parserRuleContext), new Select(selectLocation, isDistinct(setQuantifier), selectItems), @@ -2514,7 +2528,8 @@ private Node buildQuerySpecification( visit(windowDefinitions, WindowDefinition.class), Optional.empty(), Optional.empty(), - Optional.empty()); + Optional.empty(), + selectHint); } @Override @@ -2543,6 +2558,50 @@ public Node visitSelectAll(RelationalSqlParser.SelectAllContext ctx) { } } + @Override + public Node visitSelectHint(RelationalSqlParser.SelectHintContext ctx) { + List hintItems = new ArrayList<>(); + for (RelationalSqlParser.HintItemContext hintItemCtx : ctx.hintItem()) { + hintItems.add(visit(hintItemCtx)); + } + return new SelectHint(hintItems); + } + + @Override + public Node visitReplicaHint(RelationalSqlParser.ReplicaHintContext ctx) { + QualifiedName table = ctx.tableName == null ? null : getQualifiedName(ctx.tableName); + int replicaIndex = Integer.parseInt(ctx.INTEGER_VALUE().getText()); + return new ReplicaHintItem(table, replicaIndex); + } + + @Override + public Node visitRegionRouteHint(RelationalSqlParser.RegionRouteHintContext ctx) { + QualifiedName table = ctx.tableName == null ? null : getQualifiedName(ctx.tableName); + Map regionDatanodeMap = new HashMap<>(); + try { + for (RelationalSqlParser.RegionRouteItemContext itemCtx : ctx.regionRoutes) { + int regionId = Integer.parseInt(itemCtx.regionId.getText()); + if (regionId < -1) { + throw parseError("Region ID must not be less than -1", ctx); + } + int datanodeId = Integer.parseInt(itemCtx.datanodeId.getText()); + if (datanodeId < 0) { + throw parseError("DataNode ID must not be less than 0", ctx); + } + regionDatanodeMap.putIfAbsent(regionId, datanodeId); + } + } catch (NumberFormatException e) { + throw parseError("Region ID and DataNode ID must be integers", ctx); + } + return new RegionRouteHintItem(table, regionDatanodeMap); + } + + @Override + public Node visitParallelHint(RelationalSqlParser.ParallelHintContext ctx) { + int parallelism = Integer.parseInt(ctx.INTEGER_VALUE().getText()); + return new ParallelHintItem(parallelism); + } + @Override public Node visitGroupBy(RelationalSqlParser.GroupByContext ctx) { return new GroupBy( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/QueryUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/QueryUtil.java index f0bd840453e2e..c04b0df5ac630 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/QueryUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/util/QueryUtil.java @@ -160,6 +160,7 @@ public static Query simpleQuery(Select select) { ImmutableList.of(), Optional.empty(), Optional.empty(), + Optional.empty(), Optional.empty())); } @@ -214,7 +215,8 @@ public static Query simpleQuery( ImmutableList.of(), orderBy, offset, - limit)); + limit, + Optional.empty())); } public static Query simpleQuery( @@ -239,7 +241,8 @@ public static Query simpleQuery( windows, orderBy, offset, - limit)); + limit, + Optional.empty())); } public static Query query(QueryBody body) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/utils/hint/Hint.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/utils/hint/Hint.java new file mode 100644 index 0000000000000..e64940510e2c8 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/utils/hint/Hint.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.utils.hint; + +import java.util.Objects; + +public abstract class Hint { + protected String hintName; + + protected Hint(String hintName) { + this.hintName = Objects.requireNonNull(hintName, "hintName can not be null"); + } + + public abstract String getKey(); + + public abstract String toString(); +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/utils/hint/ParallelHint.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/utils/hint/ParallelHint.java new file mode 100644 index 0000000000000..b478d887ba3c5 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/utils/hint/ParallelHint.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.utils.hint; + +public class ParallelHint extends Hint { + public static String HINT_NAME = "parallel"; + + private final int parallelism; + + public ParallelHint(int parallelism) { + super(HINT_NAME); + this.parallelism = parallelism; + } + + public int getParallelism() { + return parallelism; + } + + @Override + public String getKey() { + return HINT_NAME; + } + + @Override + public String toString() { + return HINT_NAME + "(" + parallelism + ")"; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/utils/hint/RegionRouteHint.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/utils/hint/RegionRouteHint.java new file mode 100644 index 0000000000000..37cbae4a6a33e --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/utils/hint/RegionRouteHint.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.utils.hint; + +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QualifiedName; + +import com.google.common.collect.ImmutableList; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Map; + +public class RegionRouteHint extends Hint { + public static String HINT_NAME = "region_route"; + public static int ANY_TABLE = -1; + + private @Nullable final QualifiedName table; + private final Map regionDatanodeMap; + + public RegionRouteHint(@Nullable QualifiedName table, Map regionDatanodeMa) { + super(HINT_NAME); + this.table = table; + this.regionDatanodeMap = regionDatanodeMa; + } + + @Override + public String getKey() { + return HINT_NAME + (table == null ? "" : "-" + table); + } + + @Override + public String toString() { + return HINT_NAME + (table == null ? "" : "-" + table) + "(" + regionDatanodeMap + ")"; + } + + /** + * Selects data node locations based on the region_route strategy. + * + * @param dataNodeLocations the available data node locations + * @param regionId the region ID to route + * @return the selected locations based on region_route hint strategy, or null if no match + */ + public List selectLocations( + List dataNodeLocations, int regionId) { + if (dataNodeLocations == null || dataNodeLocations.isEmpty()) { + return null; + } + + Integer datanodeId = regionDatanodeMap.getOrDefault(regionId, regionDatanodeMap.get(ANY_TABLE)); + if (datanodeId == null) { + return null; + } + + return dataNodeLocations.stream() + .filter(location -> location.getDataNodeId() == datanodeId) + .findFirst() + .map(ImmutableList::of) + .orElse(null); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/utils/hint/ReplicaHint.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/utils/hint/ReplicaHint.java new file mode 100644 index 0000000000000..b2b9dfeedaea1 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/utils/hint/ReplicaHint.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.utils.hint; + +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QualifiedName; + +import com.google.common.collect.ImmutableList; + +import javax.annotation.Nullable; + +import java.util.List; + +public class ReplicaHint extends Hint { + public static String HINT_NAME = "replica"; + + private @Nullable final QualifiedName table; + private final int replicaIndex; + + public ReplicaHint(@Nullable QualifiedName table, int replicaIndex) { + super(HINT_NAME); + this.table = table; + this.replicaIndex = replicaIndex; + } + + @Override + public String getKey() { + return HINT_NAME + (table == null ? "" : "-" + table); + } + + @Override + public String toString() { + return HINT_NAME + (table == null ? "" : "-" + table) + "(" + replicaIndex + ")"; + } + + /** + * Selects data node locations based on the replica strategy. + * + * @param dataNodeLocations the available data node locations + * @return the selected locations based on replica hint strategy + */ + public List selectLocations(List dataNodeLocations) { + if (dataNodeLocations == null || dataNodeLocations.isEmpty()) { + return null; + } + return ImmutableList.of(dataNodeLocations.get(replicaIndex % dataNodeLocations.size())); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestPlanBuilder.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestPlanBuilder.java index 4b0136d892327..8422834c5cb3e 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestPlanBuilder.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestPlanBuilder.java @@ -101,7 +101,8 @@ public TestPlanBuilder deviceTableScan( pushDownLimit, pushDownOffset, pushLimitToEachDevice, - containsNonAlignedDevice); + containsNonAlignedDevice, + null); return this; } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/PlanTester.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/PlanTester.java index 968b0d28de9fa..296085bae0046 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/PlanTester.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/PlanTester.java @@ -29,6 +29,7 @@ import org.apache.iotdb.db.queryengine.common.SessionInfo; import org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector; import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan; +import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; @@ -238,4 +239,14 @@ public PlanNode getFragmentPlan(int index) { } return distributedQueryPlan.getFragments().get(index).getPlanNodeTree().getChildren().get(0); } + + public FragmentInstance getFragmentInstance(int index) { + if (distributedQueryPlan == null) { + distributedQueryPlan = + new TableDistributedPlanner( + analysis, symbolAllocator, plan, metadata, dataNodeLocationSupplier) + .plan(); + } + return distributedQueryPlan.getInstances().get(index); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ReplicaHintPlannerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ReplicaHintPlannerTest.java new file mode 100644 index 0000000000000..82261198c1c98 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ReplicaHintPlannerTest.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner; + +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.ParsingException; + +import com.google.common.collect.ImmutableMap; +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class ReplicaHintPlannerTest extends PlanTester { + + @Test + public void testReplicaHint() { + String sql = "SELECT /*+ REPLICA(0) */ * FROM table1"; + createPlan(sql); + for (int fragment = 1; fragment <= 3; fragment++) { + verifyReplica(fragment, ImmutableMap.of("table1", 0)); + } + } + + @Test + public void testReplicaHintWithNegativeIndex() { + try { + String sql = "SELECT /*+ REPLICA(-1) */ * FROM table1"; + createPlan(sql); + } catch (ParsingException ex) { + assertTrue( + ex.getMessage().contains("mismatched input '-'. Expecting: , ")); + } + } + + @Test + public void testReplicaHintWithTable() { + String sql = "SELECT /*+ REPLICA(table1, 1) */ * FROM table1"; + createPlan(sql); + for (int fragment = 1; fragment <= 3; fragment++) { + verifyReplica(fragment, ImmutableMap.of("table1", 1)); + } + } + + @Test + public void testReplicaHintWithDatabasePrefix() { + String sql = "SELECT /*+ REPLICA(testdb.table1, 0) */ * FROM table1"; + createPlan(sql); + for (int fragment = 1; fragment <= 3; fragment++) { + verifyReplica(fragment, ImmutableMap.of("table1", 0)); + } + } + + @Test + public void testReplicaHintWithAlias() { + String sql = "SELECT /*+ REPLICA(t, 1) */ * FROM table1 as t"; + createPlan(sql); + for (int fragment = 1; fragment <= 3; fragment++) { + verifyReplica(fragment, ImmutableMap.of("t", 1)); + } + } + + @Test + public void testReplicaHintWithConflict() { + String sql = "SELECT /*+ REPLICA(1) REPLICA(0) */ * FROM table1"; + createPlan(sql); + for (int fragment = 1; fragment <= 3; fragment++) { + verifyReplica(fragment, ImmutableMap.of("table1", 1)); + } + } + + @Test + public void testReplicaHintWithAgg() { + String sql = "SELECT /*+ REPLICA(0) */ tag1, avg(s1) FROM table1 GROUP BY tag1"; + createPlan(sql); + for (int fragment = 1; fragment <= 3; fragment++) { + verifyReplica(fragment, ImmutableMap.of("table1", 0)); + } + } + + @Test + public void testRegionRouteHintWithNegativeRegion() { + String sql = "SELECT /*+ REGION_ROUTE((-1, 1)) */ * FROM table1"; + createPlan(sql); + for (int fragment = 1; fragment <= 3; fragment++) { + verifyRegionRoute(fragment, ImmutableMap.of(10, 1, 12, 1)); + } + } + + @Test + public void testRegionRouteHintWithConflict() { + String sql = "SELECT /*+ REGION_ROUTE(table1, (10, 1), (10, 2), (11, 3)) */ * FROM table1"; + createPlan(sql); + for (int fragment = 1; fragment <= 3; fragment++) { + verifyRegionRoute(fragment, ImmutableMap.of(10, 1, 11, 3)); + } + } + + @Test + public void testHintPriority1() { + String sql = + "SELECT /*+ REPLICA(2) REPLICA(table1, 2) REGION_ROUTE((-1, 2), (10, 2)) REGION_ROUTE(table1, (-1, 2), (10, 1)) */ * FROM table1"; + createPlan(sql); + for (int fragment = 1; fragment <= 3; fragment++) { + verifyRegionRoute(fragment, ImmutableMap.of(10, 1)); + } + } + + @Test + public void testHintPriority2() { + String sql = + "SELECT /*+ REPLICA(2) REPLICA(table1, 2) REGION_ROUTE((-1, 2), (10, 2)) REGION_ROUTE(table1, (-1, 1)) */ * FROM table1"; + createPlan(sql); + for (int fragment = 1; fragment <= 3; fragment++) { + verifyRegionRoute(fragment, ImmutableMap.of(10, 1)); + } + } + + @Test + public void testHintPriority3() { + String sql = + "SELECT /*+ REPLICA(2) REPLICA(table1, 2) REGION_ROUTE((-1, 2), (10, 1)) */ * FROM table1"; + createPlan(sql); + for (int fragment = 1; fragment <= 3; fragment++) { + verifyRegionRoute(fragment, ImmutableMap.of(10, 1)); + } + } + + @Test + public void testHintPriority4() { + String sql = "SELECT /*+ REPLICA(2) REPLICA(table1, 2) REGION_ROUTE((-1, 1)) */ * FROM table1"; + createPlan(sql); + for (int fragment = 1; fragment <= 3; fragment++) { + verifyRegionRoute(fragment, ImmutableMap.of(10, 1)); + } + } + + @Test + public void testHintPriority5() { + String sql = "SELECT /*+ REPLICA(2) REPLICA(table1, 1) */ * FROM table1"; + createPlan(sql); + for (int fragment = 1; fragment <= 3; fragment++) { + verifyReplica(fragment, ImmutableMap.of("table1", 1)); + } + } + + @Test + public void testHintPriority6() { + String sql = "SELECT /*+ REPLICA(1) */ * FROM table1"; + createPlan(sql); + for (int fragment = 1; fragment <= 3; fragment++) { + verifyReplica(fragment, ImmutableMap.of("table1", 1)); + } + } + + @Test + public void testParallelHint() { + String sql = "SELECT /*+ PARALLEL(5) */ * FROM table1"; + createPlan(sql); + for (int fragment = 1; fragment <= 3; fragment++) { + FragmentInstance fragmentInstance = getFragmentInstance(fragment); + assertEquals(5, fragmentInstance.getParallelism()); + } + } + + private void verifyReplica(int fragment, Map tableToReplica) { + FragmentInstance fragmentInstance = getFragmentInstance(fragment); + PlanNode fragmentPlan = getFragmentPlan(fragment); + + TableScanNode tableScanNode = null; + if (fragmentPlan instanceof AggregationNode) { + tableScanNode = (TableScanNode) fragmentPlan.getChildren().get(0); + } else if (fragmentPlan instanceof TableScanNode) { + tableScanNode = (TableScanNode) fragmentPlan; + } + if (tableScanNode == null) { + fail("tableScanNode must not be null"); + } + + assertEquals( + tableScanNode.getRegionReplicaSet().getRegionId().getId(), + fragmentInstance.getRegionReplicaSet().getRegionId().getId()); + + String tableName = + tableScanNode.getAlias() != null + ? tableScanNode.getAlias().getValue() + : tableScanNode.getQualifiedObjectName().getObjectName(); + + List replicaNodes = + tableScanNode.getRegionReplicaSet().getDataNodeLocations(); + TDataNodeLocation queryNode = fragmentInstance.getHostDataNode(); + + assertEquals(2, replicaNodes.size()); + assertEquals( + queryNode.getDataNodeId(), replicaNodes.get(tableToReplica.get(tableName)).getDataNodeId()); + } + + private void verifyRegionRoute(int fragment, Map regionToDatanode) { + FragmentInstance fragmentInstance = getFragmentInstance(fragment); + TableScanNode fragmentPlan = (TableScanNode) getFragmentPlan(fragment); + + // Verify region IDs match + int regionId = fragmentPlan.getRegionReplicaSet().getRegionId().getId(); + assertEquals(regionId, fragmentInstance.getRegionReplicaSet().getRegionId().getId()); + + List replicaNodes = + fragmentPlan.getRegionReplicaSet().getDataNodeLocations(); + TDataNodeLocation queryNode = fragmentInstance.getHostDataNode(); + + assertEquals(2, replicaNodes.size()); + + if (regionToDatanode.containsKey(regionId)) { + int expectedDatanodeId = regionToDatanode.get(regionId); + assertEquals(expectedDatanodeId, queryNode.getDataNodeId()); + boolean datanodeExists = + replicaNodes.stream().anyMatch(dn -> dn.getDataNodeId() == expectedDatanodeId); + assertTrue(datanodeExists); + } + } +} diff --git a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 index ca76646468677..0bb86f8485cd3 100644 --- a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 +++ b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 @@ -1026,7 +1026,7 @@ sortItem ; querySpecification - : SELECT setQuantifier? selectItem (',' selectItem)* + : SELECT selectHint? setQuantifier? selectItem (',' selectItem)* (FROM relation (',' relation)*)? (WHERE where=booleanExpression)? (GROUP BY groupBy)? @@ -1116,6 +1116,32 @@ joinCriteria | USING '(' identifier (',' identifier)* ')' ; +selectHint + : HINT_START hintItem+ HINT_END + ; + +hintItem + : replicaHint + | regionRouteHint + | parallelHint + ; + +replicaHint + : REPLICA '(' (tableName=qualifiedName ',')? INTEGER_VALUE ')' + ; + +regionRouteHint + : REGION_ROUTE '(' (tableName=qualifiedName ',')? regionRoutes+=regionRouteItem (',' regionRoutes+=regionRouteItem)* ')' + ; + +regionRouteItem + : '(' regionId=number ',' datanodeId=number ')' + ; + +parallelHint + : PARALLEL '(' INTEGER_VALUE ')' + ; + patternRecognition : aliasedRelation ( MATCH_RECOGNIZE '(' @@ -1508,6 +1534,7 @@ nonReserved | WEEK | WHILE | WINDOW | WITHIN | WITHOUT | WORK | WRAPPER | WRITE | YEAR | ZONE + | HINT_START | HINT_END ; ABSENT: 'ABSENT'; @@ -1920,6 +1947,9 @@ WRITE: 'WRITE'; YEAR: 'YEAR' | 'Y'; ZONE: 'ZONE'; AUDIT: 'AUDIT'; +REPLICA: 'REPLICA'; +REGION_ROUTE: 'REGION_ROUTE'; +PARALLEL: 'PARALLEL'; AT_SIGN: '@'; EQ: '='; @@ -1938,6 +1968,8 @@ CONCAT: '||'; QUESTION_MARK: '?'; SEMICOLON: ';'; +HINT_START: '/*+'; +HINT_END: '*/'; STRING : '\'' ( ~'\'' | '\'\'' )* '\'' @@ -2034,9 +2066,12 @@ SIMPLE_COMMENT ; BRACKETED_COMMENT - : '/*' .*? '*/' -> channel(HIDDEN) + : '/*' (~[+] .*?)? '*/' -> channel(HIDDEN) ; +EMPTY_HINT + : '/*+' [ \r\n\t]* '*/' -> channel(HIDDEN) + ; WS : [ \r\n\t]+ -> channel(HIDDEN) ;