Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
a148dbb
simple Leader & Follower hint
shizy818 Nov 12, 2025
f5a07fe
Leader & Follower hint improvement
shizy818 Nov 12, 2025
42ff254
follower & leader refactor
shizy818 Nov 13, 2025
892afc0
add ReplicaHint
shizy818 Nov 18, 2025
553d16a
add alias support
shizy818 Nov 18, 2025
a9ad438
fix alias serialize
shizy818 Nov 18, 2025
ee57020
Leader & Follower Hint without parameters
shizy818 Nov 19, 2025
c9e930c
hintParameters
shizy818 Nov 19, 2025
b0630fb
fix: rebase master
shizy818 Dec 23, 2025
26d9da8
leading hint
shizy818 Jan 9, 2026
90f82c3
EquiJoinClause part 1
shizy818 Jan 21, 2026
812570e
left & right tables for JoinNode
shizy818 Jan 21, 2026
e0380a5
Update PushPredicateIntoTableScan
shizy818 Jan 21, 2026
97f06c9
simple inner join reorder
shizy818 Jan 22, 2026
5728188
left & right outer join
shizy818 Feb 9, 2026
136e04d
fix leading hint
shizy818 Feb 10, 2026
1f63575
fix leader/follower bug
shizy818 Feb 24, 2026
573f8b4
handle filter & sort & mergesort node for leading hint
shizy818 Feb 25, 2026
78c4726
EquiJoinClause left & right table
shizy818 Feb 25, 2026
b63c2d1
leading hint for asof join
shizy818 Feb 26, 2026
295606f
fix some issues
shizy818 Mar 5, 2026
54c1acc
code change
shizy818 Mar 6, 2026
8cef1d4
revert conditionJoinType
shizy818 Mar 6, 2026
bb03219
remove leading hint
shizy818 Mar 14, 2026
cec41ca
remove leading hint 2
shizy818 Mar 14, 2026
8c1d37c
refactor leader/follower hint
shizy818 Mar 19, 2026
035d010
parallel hint implementation
shizy818 Mar 19, 2026
06db278
rebase master
shizy818 Apr 17, 2026
9a2a665
change to replica & region_route hint
shizy818 Apr 20, 2026
fd8dc66
add test cases
shizy818 Apr 21, 2026
bd1dc03
invalid table does not throw SemanticException
shizy818 Apr 21, 2026
55f9384
add nullable annotation
shizy818 Apr 21, 2026
2bccb24
add aggregation test
shizy818 Apr 21, 2026
4bc8764
revert some code
shizy818 Apr 21, 2026
c7c76a7
code format issue
shizy818 Apr 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.relational.it.query.recent;

import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.TableClusterIT;
import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
import org.apache.iotdb.itbase.env.BaseEnv;

import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Locale;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

@RunWith(IoTDBTestRunner.class)
@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
public class IoTDBHintIT {

  private static final String DATABASE_NAME = "testdb";

  // Schema and seed data. Voltages are pairwise distinct so the self-join in
  // testReplicaHintWithJoin matches every row exactly once (3 result rows).
  private static final String[] creationSqls =
      new String[] {
        "CREATE DATABASE IF NOT EXISTS testdb",
        "USE testdb",
        "CREATE TABLE IF NOT EXISTS testtb(voltage FLOAT FIELD, manufacturer STRING FIELD, deviceid STRING TAG)",
        "INSERT INTO testtb VALUES(1000, 100.0, 'a', 'd1')",
        "INSERT INTO testtb VALUES(2000, 200.0, 'b', 'd1')",
        "INSERT INTO testtb VALUES(1000, 300.0, 'c', 'd2')",
      };

  private static final String dropDbSqls = "DROP DATABASE IF EXISTS testdb";

  @BeforeClass
  public static void setUpClass() {
    // Force deterministic message/number formatting regardless of the host locale.
    Locale.setDefault(Locale.ENGLISH);

    EnvFactory.getEnv()
        .getConfig()
        .getCommonConfig()
        .setPartitionInterval(1000)
        .setMemtableSizeThreshold(10000);
    EnvFactory.getEnv().initClusterEnvironment();
  }

  @AfterClass
  public static void tearDownClass() {
    EnvFactory.getEnv().cleanClusterEnvironment();
  }

  @Before
  public void setUp() {
    prepareData();
  }

  @After
  public void tearDown() {
    try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
        Statement statement = connection.createStatement()) {
      statement.execute(dropDbSqls);
    } catch (Exception e) {
      // Fail with the whole exception: e.getMessage() may be null and hides the type.
      fail("tearDown failed: " + e);
    }
  }

  /** A REPLICA hint naming a table absent from the query must be ignored, not rejected. */
  @Test
  public void testReplicaHintWithInvalidTable() {
    assertQuerySucceeds("SELECT /*+ REPLICA(t1, 2) */ * FROM testtb", true);
  }

  /** REPLICA hints must also be accepted on system-table (information_schema) queries. */
  @Test
  public void testReplicaHintWithSystemTableQuery() {
    assertQuerySucceeds("SELECT /*+ REPLICA(0) */ * FROM information_schema.tables", false);
  }

  /** REPLICA hints must work inside plain and materialized CTEs as well as in the main query. */
  @Test
  public void testReplicaHintWithCTE() {
    // REPLICA hint in CTE definition
    assertQuerySucceeds(
        "WITH cte1 AS (SELECT /*+ REPLICA(testtb,0) */ * FROM testtb) SELECT * FROM cte1", true);
    // REPLICA hint in materialized CTE definition
    assertQuerySucceeds(
        "WITH cte1 AS materialized (SELECT /*+ REPLICA(testtb,0) */ * FROM testtb) SELECT * FROM cte1",
        true);
    // REPLICA hint in main query
    assertQuerySucceeds(
        "WITH cte1 AS (SELECT * FROM testtb) SELECT /*+ REPLICA(testtb, 0) */ * FROM cte1", true);
  }

  /** EXPLAIN of a hinted query must not fail. */
  @Test
  public void testReplicaHintWithExplain() {
    assertQuerySucceeds("EXPLAIN SELECT /*+ REPLICA(0) */ * FROM testtb", true);
  }

  /** Per-alias REPLICA hints on a self-join must still return the correct row count. */
  @Test
  public void testReplicaHintWithJoin() {
    try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
        Statement statement = connection.createStatement()) {
      statement.execute("use " + DATABASE_NAME);
      String sql =
          "SELECT /*+ REPLICA(a, 0) REPLICA(b, 1) */ * FROM testtb as a INNER JOIN testtb as b ON a.voltage = b.voltage";
      // Close the ResultSet deterministically instead of relying on Statement cleanup.
      try (ResultSet rs = statement.executeQuery(sql)) {
        int count = 0;
        while (rs.next()) {
          count++;
        }
        assertEquals("Expected 3 rows from the join query", 3, count);
      }
    } catch (Exception e) {
      fail("testReplicaHintWithJoin failed: " + e);
    }
  }

  /**
   * Executes {@code sql} and fails the test (with the full exception) if execution throws. The
   * result content is not inspected — the assertion is only that the hint does not break query
   * execution.
   *
   * @param sql the query to run against the table SQL dialect
   * @param useDatabase whether to issue {@code USE testdb} first (system-table queries do not
   *     require a current database)
   */
  private static void assertQuerySucceeds(String sql, boolean useDatabase) {
    try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
        Statement statement = connection.createStatement()) {
      if (useDatabase) {
        statement.execute("use " + DATABASE_NAME);
      }
      try (ResultSet ignored = statement.executeQuery(sql)) {
        // Intentionally not consumed; see method Javadoc.
      }
    } catch (Exception e) {
      fail("Query failed: " + sql + " -> " + e);
    }
  }

  /** Creates the database, table and seed rows used by every test. */
  private static void prepareData() {
    try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
        Statement statement = connection.createStatement()) {
      for (String sql : creationSqls) {
        statement.execute(sql);
      }
    } catch (Exception e) {
      fail("prepareData failed: " + e);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ public enum ExplainType {
// Tables in the subquery
private final Map<NodeRef<Query>, List<Identifier>> subQueryTables = new HashMap<>();

// parallel hint
private int parallelism = 0;

@TestOnly
public MPPQueryContext(QueryId queryId) {
this.queryId = queryId;
Expand Down Expand Up @@ -674,5 +677,13 @@ public IAuditEntity setSqlString(String sqlString) {
return this;
}

// Degree of parallelism carried by this query context; 0 means no parallel hint was supplied
// (see the field default above).
public int getParallelism() {
return parallelism;
}

// Records the parallelism extracted from the query's parallel hint.
public void setParallelism(int parallelism) {
this.parallelism = parallelism;
}

// ================= Authentication Interfaces =========================
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ public class FragmentInstanceContext extends QueryContext {
private long closedSeqFileNum = 0;
private long closedUnseqFileNum = 0;

// parallel hint
private int parallelism = 0;

public static FragmentInstanceContext createFragmentInstanceContext(
FragmentInstanceId id,
FragmentInstanceStateMachine stateMachine,
Expand Down Expand Up @@ -1193,4 +1196,12 @@ public boolean ignoreNotExistsDevice() {
public boolean isSingleSourcePath() {
return singleSourcePath;
}

// Parallelism propagated from the fragment instance; 0 means no parallel hint was set.
public int getParallelism() {
return parallelism;
}

// Set when the fragment instance is executed (see execDataQueryFragmentInstance).
public void setParallelism(int parallelism) {
this.parallelism = parallelism;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ public FragmentInstanceInfo execDataQueryFragmentInstance(
instance.isDebug(),
instance.isVerbose()));

// set parallelism from fragment instance
context.setParallelism(instance.getParallelism());

try {
List<PipelineDriverFactory> driverFactories =
planner.plan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public StatementMemorySource visitExplain(

// Generate table model distributed plan
final TableDistributedPlanGenerator.PlanContext planContext =
new TableDistributedPlanGenerator.PlanContext();
new TableDistributedPlanGenerator.PlanContext(context.getAnalysis().getHintMap());
final PlanNode outputNodeWithExchange =
new TableDistributedPlanner(
context.getAnalysis(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ public class FragmentInstance implements IConsensusRequest {

private boolean isHighestPriority;

// parallel hint
private int parallelism = 0;

// indicate which index we are retrying
private transient int nextRetryIndex = 0;

Expand Down Expand Up @@ -101,6 +104,7 @@ public FragmentInstance(
this.timeOut = timeOut > 0 ? timeOut : CONFIG.getQueryTimeoutThreshold();
this.isRoot = false;
this.sessionInfo = sessionInfo;
this.parallelism = 0;
this.debug = debug;
this.verbose = verbose;
}
Expand Down Expand Up @@ -213,6 +217,14 @@ public void setDataNodeFINum(int dataNodeFINum) {
this.dataNodeFINum = dataNodeFINum;
}

// Parallelism requested via the parallel hint; serialized with the instance so the value
// survives the trip to the data node (see serializeToByteBuffer/deserializeFrom).
public int getParallelism() {
return parallelism;
}

// Also invoked by deserializeFrom to restore the serialized value.
public void setParallelism(int parallelism) {
this.parallelism = parallelism;
}

public boolean isDebug() {
return debug;
}
Expand Down Expand Up @@ -250,6 +262,7 @@ public static FragmentInstance deserializeFrom(ByteBuffer buffer) {
TimePredicate globalTimePredicate = hasTimePredicate ? TimePredicate.deserialize(buffer) : null;
QueryType queryType = QueryType.values()[ReadWriteIOUtils.readInt(buffer)];
int dataNodeFINum = ReadWriteIOUtils.readInt(buffer);
int parallelism = ReadWriteIOUtils.readInt(buffer);
boolean debug = ReadWriteIOUtils.readBool(buffer);
boolean verbose = ReadWriteIOUtils.readBool(buffer);
FragmentInstance fragmentInstance =
Expand All @@ -263,6 +276,7 @@ public static FragmentInstance deserializeFrom(ByteBuffer buffer) {
dataNodeFINum,
debug,
verbose);
fragmentInstance.setParallelism(parallelism);
boolean hasHostDataNode = ReadWriteIOUtils.readBool(buffer);
fragmentInstance.hostDataNode =
hasHostDataNode ? ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer) : null;
Expand All @@ -286,6 +300,7 @@ public ByteBuffer serializeToByteBuffer() {
}
ReadWriteIOUtils.write(type.ordinal(), outputStream);
ReadWriteIOUtils.write(dataNodeFINum, outputStream);
ReadWriteIOUtils.write(parallelism, outputStream);
ReadWriteIOUtils.write(debug, outputStream);
ReadWriteIOUtils.write(verbose, outputStream);
ReadWriteIOUtils.write(hostDataNode != null, outputStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,9 @@ public List<String> visitTableScan(TableScanNode node, GraphContext context) {
List<String> boxValue = new ArrayList<>();
boxValue.add(node.toString());
boxValue.add(String.format("QualifiedTableName: %s", node.getQualifiedObjectName().toString()));
if (node.getAlias() != null) {
boxValue.add(String.format("Alias: %s", node.getAlias().getValue()));
}
boxValue.add(String.format("OutputSymbols: %s", node.getOutputSymbols()));

if (deviceTableScanNode != null) {
Expand Down Expand Up @@ -751,6 +754,9 @@ public List<String> visitAggregationTableScan(
List<String> boxValue = new ArrayList<>();
boxValue.add(node.toString());
boxValue.add(String.format("QualifiedTableName: %s", node.getQualifiedObjectName().toString()));
if (node.getAlias() != null) {
boxValue.add(String.format("Alias: %s", node.getAlias().getValue()));
}
boxValue.add(String.format("OutputSymbols: %s", node.getOutputSymbols()));
int i = 0;
for (org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.Aggregation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WindowFrame;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.With;
import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
import org.apache.iotdb.db.queryengine.plan.relational.utils.hint.Hint;
import org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy;

import com.google.common.collect.ArrayListMultimap;
Expand Down Expand Up @@ -217,7 +218,7 @@ public class Analysis implements IAnalysis {

private final Map<NodeRef<Relation>, QualifiedName> relationNames = new LinkedHashMap<>();

private final Set<NodeRef<Relation>> aliasedRelations = new LinkedHashSet<>();
private final Map<NodeRef<Relation>, Identifier> aliasedRelations = new LinkedHashMap<>();

private final Map<NodeRef<TableFunctionInvocation>, TableFunctionInvocationAnalysis>
tableFunctionAnalyses = new LinkedHashMap<>();
Expand Down Expand Up @@ -258,6 +259,9 @@ public class Analysis implements IAnalysis {

private boolean isQuery = false;

// Hint map
private Map<String, Hint> hintMap = new HashMap<>();

// SqlParser is needed during query planning phase for executing uncorrelated scalar subqueries
// in advance (predicate folding). The planner needs to parse and execute these subqueries
// independently to utilize predicate pushdown optimization.
Expand All @@ -276,6 +280,14 @@ public void updateNeedSetHighestPriority(QualifiedObjectName tableName) {
needSetHighestPriority = InformationSchema.QUERIES.equals(tableName.getObjectName());
}

// Replaces this analysis' hint map. NOTE(review): keys are presumably hint names — confirm
// against the producer before relying on that.
public void setHintMap(Map<String, Hint> hintMap) {
this.hintMap = hintMap;
}

// Returns the live (mutable) hint map; callers must not assume a defensive copy.
public Map<String, Hint> getHintMap() {
return hintMap;
}

public Map<NodeRef<Parameter>, Expression> getParameters() {
return parameters;
}
Expand Down Expand Up @@ -858,12 +870,20 @@ public QualifiedName getRelationName(final Relation relation) {
return relationNames.get(NodeRef.of(relation));
}

public void addAliased(final Relation relation) {
aliasedRelations.add(NodeRef.of(relation));
// Exposes the relation -> qualified-name mapping collected during analysis.
// NOTE(review): returns the internal mutable map directly; consider an unmodifiable view.
public Map<NodeRef<Relation>, QualifiedName> getRelationNames() {
return relationNames;
}

// Registers the alias under which this relation appears in the query text.
public void addAliased(final Relation relation, Identifier alias) {
aliasedRelations.put(NodeRef.of(relation), alias);
}

// Returns the recorded alias for this relation, or null if it was never aliased.
public Identifier getAliased(Relation relation) {
return aliasedRelations.get(NodeRef.of(relation));
}

public boolean isAliased(Relation relation) {
return aliasedRelations.contains(NodeRef.of(relation));
return aliasedRelations.containsKey(NodeRef.of(relation));
}

public void addTableSchema(
Expand Down
Loading