From b1d1722d29c104233573c4981a30747261589bea Mon Sep 17 00:00:00 2001 From: Weihao Li <60659567+Wei-hao-Li@users.noreply.github.com> Date: Tue, 21 Apr 2026 15:45:20 +0800 Subject: [PATCH 1/2] Enable show queries to be executed immediately when the available memory in the memoryPool is insufficient (#17507) (cherry picked from commit 022e11332e6e5e8e6500e497357b2fd4f5949c7e) --- .../queryengine/common/MPPQueryContext.java | 13 ++ .../exchange/MPPDataExchangeManager.java | 56 ++++++++- .../exchange/SharedTsBlockQueue.java | 57 ++++++--- .../execution/exchange/sink/SinkChannel.java | 62 ++++++++-- .../exchange/source/SourceHandle.java | 90 ++++++++++---- .../fragment/FragmentInstanceContext.java | 13 ++ .../fragment/FragmentInstanceManager.java | 2 + .../execution/memory/MemoryPool.java | 56 +++++++-- .../db/queryengine/plan/analyze/Analysis.java | 2 +- .../plan/execution/QueryExecution.java | 7 +- .../plan/planner/OperatorTreeGenerator.java | 6 +- .../plan/planner/plan/FragmentInstance.java | 2 + .../exchange/LocalSinkChannelTest.java | 10 +- .../exchange/SharedTsBlockQueueTest.java | 11 +- .../exchange/ShuffleSinkHandleTest.java | 5 +- .../execution/exchange/SourceHandleTest.java | 5 +- .../queryengine/execution/exchange/Utils.java | 26 ++-- .../execution/memory/MemoryPoolTest.java | 115 +++++++++++++++--- 18 files changed, 425 insertions(+), 113 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java index 7479e832a9016..ac00b4cc712ae 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java @@ -100,6 +100,11 @@ public class MPPQueryContext { private boolean userQuery = false; + /** + * When true (e.g. 
SHOW QUERIES), operator and exchange memory may use fallback when pool is + * insufficient. Set from analysis via {@link #setNeedSetHighestPriority(boolean)}. + */ + private boolean needSetHighestPriority = false; @TestOnly public MPPQueryContext(QueryId queryId) { this.queryId = queryId; @@ -406,6 +411,14 @@ public void setUserQuery(boolean userQuery) { this.userQuery = userQuery; } + public boolean needSetHighestPriority() { + return needSetHighestPriority; + } + + public void setNeedSetHighestPriority(boolean needSetHighestPriority) { + this.needSetHighestPriority = needSetHighestPriority; + } + public String getClientHostName() { if (session == null || session.getCliHostname() == null) { return "UNKNOWN"; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java index ac0b411109089..61dc05d4bfd34 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java @@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.queryengine.exception.exchange.GetTsBlockFromClosedOrAbortedChannelException; import org.apache.iotdb.db.queryengine.execution.driver.DriverContext; import org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelIndex; @@ -656,7 +657,11 @@ private synchronized ISinkChannel createLocalSinkChannel( } queue = new SharedTsBlockQueue( - localFragmentInstanceId, localPlanNodeId, localMemoryManager, executorService); + localFragmentInstanceId, + 
localPlanNodeId, + localMemoryManager, + executorService, + instanceContext.isHighestPriority()); } return new LocalSinkChannel( @@ -680,7 +685,8 @@ public ISinkChannel createLocalSinkChannelForPipeline( driverContext.getDriverTaskID().getFragmentInstanceId().toThrift(), planNodeId, localMemoryManager, - executorService); + executorService, + driverContext.getFragmentInstanceContext().isHighestPriority()); queue.allowAddingTsBlock(); return new LocalSinkChannel( queue, @@ -718,6 +724,7 @@ private ISinkChannel createSinkChannel( tsBlockSerdeFactory.get(), new ISinkChannelListenerImpl( localFragmentInstanceId, instanceContext, instanceContext::failed, cnt), + instanceContext.isHighestPriority(), mppDataExchangeServiceClientManager); } @@ -802,6 +809,7 @@ public ISourceHandle createLocalSourceHandleForPipeline( context.getDriverTaskID().toString()); } + @TestOnly public synchronized ISourceHandle createLocalSourceHandleForFragment( TFragmentInstanceId localFragmentInstanceId, String localPlanNodeId, @@ -809,6 +817,24 @@ public synchronized ISourceHandle createLocalSourceHandleForFragment( TFragmentInstanceId remoteFragmentInstanceId, int index, IMPPDataExchangeManagerCallback onFailureCallback) { + return createLocalSourceHandleForFragment( + localFragmentInstanceId, + localPlanNodeId, + remotePlanNodeId, + remoteFragmentInstanceId, + index, + onFailureCallback, + false); + } + + public synchronized ISourceHandle createLocalSourceHandleForFragment( + TFragmentInstanceId localFragmentInstanceId, + String localPlanNodeId, + String remotePlanNodeId, + TFragmentInstanceId remoteFragmentInstanceId, + int index, + IMPPDataExchangeManagerCallback onFailureCallback, + boolean isHighestPriority) { if (sourceHandles.containsKey(localFragmentInstanceId) && sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) { throw new IllegalStateException( @@ -840,7 +866,11 @@ public synchronized ISourceHandle createLocalSourceHandleForFragment( } queue = new 
SharedTsBlockQueue( - remoteFragmentInstanceId, remotePlanNodeId, localMemoryManager, executorService); + remoteFragmentInstanceId, + remotePlanNodeId, + localMemoryManager, + executorService, + isHighestPriority); } LocalSourceHandle localSourceHandle = new LocalSourceHandle( @@ -854,6 +884,7 @@ public synchronized ISourceHandle createLocalSourceHandleForFragment( return localSourceHandle; } + @TestOnly @Override public ISourceHandle createSourceHandle( TFragmentInstanceId localFragmentInstanceId, @@ -862,6 +893,24 @@ public ISourceHandle createSourceHandle( TEndPoint remoteEndpoint, TFragmentInstanceId remoteFragmentInstanceId, IMPPDataExchangeManagerCallback onFailureCallback) { + return createSourceHandle( + localFragmentInstanceId, + localPlanNodeId, + indexOfUpstreamSinkHandle, + remoteEndpoint, + remoteFragmentInstanceId, + onFailureCallback, + false); + } + + public ISourceHandle createSourceHandle( + TFragmentInstanceId localFragmentInstanceId, + String localPlanNodeId, + int indexOfUpstreamSinkHandle, + TEndPoint remoteEndpoint, + TFragmentInstanceId remoteFragmentInstanceId, + IMPPDataExchangeManagerCallback onFailureCallback, + boolean isHighestPriority) { Map sourceHandleMap = sourceHandles.get(localFragmentInstanceId); if (sourceHandleMap != null && sourceHandleMap.containsKey(localPlanNodeId)) { throw new IllegalStateException( @@ -891,6 +940,7 @@ public ISourceHandle createSourceHandle( executorService, tsBlockSerdeFactory.get(), new SourceHandleListenerImpl(onFailureCallback), + isHighestPriority, mppDataExchangeServiceClientManager); sourceHandles .computeIfAbsent(localFragmentInstanceId, key -> new ConcurrentHashMap<>()) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java index 555cf9efe5ad9..f4c21e2fdfa91 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java @@ -19,11 +19,13 @@ package org.apache.iotdb.db.queryengine.execution.exchange; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; import org.apache.iotdb.db.queryengine.execution.exchange.sink.LocalSinkChannel; import org.apache.iotdb.db.queryengine.execution.exchange.source.LocalSourceHandle; import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager; +import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; import com.google.common.util.concurrent.ListenableFuture; @@ -62,7 +64,7 @@ public class SharedTsBlockQueue { private long bufferRetainedSizeInBytes = 0L; - private final Queue queue = new LinkedList<>(); + private final Queue> queue = new LinkedList<>(); private SettableFuture blocked = SettableFuture.create(); @@ -82,17 +84,28 @@ public class SharedTsBlockQueue { private long maxBytesCanReserve = IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance(); + private final boolean isHighestPriority; private volatile Throwable abortedCause = null; // used for SharedTsBlockQueue listener private final ExecutorService executorService; + @TestOnly public SharedTsBlockQueue( TFragmentInstanceId fragmentInstanceId, String planNodeId, LocalMemoryManager localMemoryManager, ExecutorService executorService) { + this(fragmentInstanceId, planNodeId, localMemoryManager, executorService, false); + } + + public SharedTsBlockQueue( + TFragmentInstanceId fragmentInstanceId, + String planNodeId, + LocalMemoryManager localMemoryManager, + ExecutorService executorService, + boolean isHighestPriority) { 
this.localFragmentInstanceId = Validate.notNull(fragmentInstanceId, "fragment instance ID cannot be null"); this.fullFragmentInstanceId = @@ -101,6 +114,7 @@ public SharedTsBlockQueue( this.localMemoryManager = Validate.notNull(localMemoryManager, "local memory manager cannot be null"); this.executorService = Validate.notNull(executorService, "ExecutorService can not be null."); + this.isHighestPriority = isHighestPriority; } public boolean hasNoMoreTsBlocks() { @@ -195,15 +209,18 @@ public TsBlock remove() { } throw new IllegalStateException("queue has been destroyed"); } - TsBlock tsBlock = queue.remove(); - localMemoryManager - .getQueryPool() - .free( - localFragmentInstanceId.getQueryId(), - fullFragmentInstanceId, - localPlanNodeId, - tsBlock.getSizeInBytes()); - bufferRetainedSizeInBytes -= tsBlock.getSizeInBytes(); + Pair tsBlockWithReservedBytes = queue.remove(); + long reservedBytes = tsBlockWithReservedBytes.right; + if (reservedBytes > 0) { + localMemoryManager + .getQueryPool() + .free( + localFragmentInstanceId.getQueryId(), + fullFragmentInstanceId, + localPlanNodeId, + reservedBytes); + bufferRetainedSizeInBytes -= reservedBytes; + } // Every time LocalSourceHandle consumes a TsBlock, it needs to send the event // to // corresponding LocalSinkChannel. 
@@ -213,7 +230,7 @@ public TsBlock remove() { if (blocked.isDone() && queue.isEmpty() && !noMoreTsBlocks) { blocked = SettableFuture.create(); } - return tsBlock; + return tsBlockWithReservedBytes.left; } /** @@ -236,20 +253,22 @@ public ListenableFuture add(TsBlock tsBlock) { localFragmentInstanceId.queryId, fullFragmentInstanceId, localPlanNodeId); alreadyRegistered = true; } - Pair, Boolean> pair = + MemoryReservationResult reserveResult = localMemoryManager .getQueryPool() - .reserve( + .reserveWithPriority( localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId, tsBlock.getSizeInBytes(), - maxBytesCanReserve); - blockedOnMemory = pair.left; - bufferRetainedSizeInBytes += tsBlock.getSizeInBytes(); + maxBytesCanReserve, + isHighestPriority); + blockedOnMemory = reserveResult.getFuture(); + long reservedBytes = reserveResult.getReservedBytes(); + bufferRetainedSizeInBytes += reservedBytes; // reserve memory failed, we should wait until there is enough memory - if (!Boolean.TRUE.equals(pair.right)) { + if (!reserveResult.isReserveSuccess()) { SettableFuture channelBlocked = SettableFuture.create(); blockedOnMemory.addListener( () -> { @@ -264,7 +283,7 @@ public ListenableFuture add(TsBlock tsBlock) { channelBlocked.set(null); return; } - queue.add(tsBlock); + queue.add(new Pair<>(tsBlock, reservedBytes)); if (!blocked.isDone()) { blocked.set(null); } @@ -281,7 +300,7 @@ public ListenableFuture add(TsBlock tsBlock) { executorService); return channelBlocked; } else { // reserve memory succeeded, add the TsBlock directly - queue.add(tsBlock); + queue.add(new Pair<>(tsBlock, reservedBytes)); if (!blocked.isDone()) { blocked.set(null); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java index ca6fdadc993e6..47ce6128fb76a 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.queryengine.exception.exchange.GetTsBlockFromClosedOrAbortedChannelException; import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SinkListener; import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager; +import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult; import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet; import org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet; import org.apache.iotdb.db.utils.SetThreadName; @@ -119,6 +120,8 @@ public class SinkChannel implements ISinkChannel { private long maxBytesCanReserve = IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance(); + private final boolean isHighestPriority; + private static final DataExchangeCostMetricSet DATA_EXCHANGE_COST_METRIC_SET = DataExchangeCostMetricSet.getInstance(); private static final DataExchangeCountMetricSet DATA_EXCHANGE_COUNT_METRIC_SET = @@ -128,6 +131,34 @@ public class SinkChannel implements ISinkChannel { RamUsageEstimator.shallowSizeOfInstance(SinkChannel.class) + RamUsageEstimator.shallowSizeOfInstance(TFragmentInstanceId.class) * 2; + @TestOnly + @SuppressWarnings("squid:S107") + public SinkChannel( + TEndPoint remoteEndpoint, + TFragmentInstanceId remoteFragmentInstanceId, + String remotePlanNodeId, + String localPlanNodeId, + TFragmentInstanceId localFragmentInstanceId, + LocalMemoryManager localMemoryManager, + ExecutorService executorService, + TsBlockSerde serde, + SinkListener sinkListener, + IClientManager + mppDataExchangeServiceClientManager) { + this( + remoteEndpoint, + remoteFragmentInstanceId, + remotePlanNodeId, + localPlanNodeId, + localFragmentInstanceId, + 
localMemoryManager, + executorService, + serde, + sinkListener, + false, + mppDataExchangeServiceClientManager); + } + @SuppressWarnings("squid:S107") public SinkChannel( TEndPoint remoteEndpoint, @@ -139,6 +170,7 @@ public SinkChannel( ExecutorService executorService, TsBlockSerde serde, SinkListener sinkListener, + boolean isHighestPriority, IClientManager mppDataExchangeServiceClientManager) { this.remoteEndpoint = Validate.notNull(remoteEndpoint, "remoteEndPoint can not be null."); @@ -155,6 +187,7 @@ public SinkChannel( this.executorService = Validate.notNull(executorService, "executorService can not be null."); this.serde = Validate.notNull(serde, "serde can not be null."); this.sinkListener = Validate.notNull(sinkListener, "sinkListener can not be null."); + this.isHighestPriority = isHighestPriority; this.mppDataExchangeServiceClientManager = mppDataExchangeServiceClientManager; this.retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS; this.threadName = @@ -204,21 +237,22 @@ public synchronized void send(TsBlock tsBlock) { long sizeInBytes = tsBlock.getSizeInBytes(); int startSequenceId; startSequenceId = nextSequenceId; - blocked = + MemoryReservationResult reserveResult = localMemoryManager .getQueryPool() - .reserve( + .reserveWithPriority( localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId, sizeInBytes, - maxBytesCanReserve) - .left; - bufferRetainedSizeInBytes += sizeInBytes; + maxBytesCanReserve, + isHighestPriority); + blocked = reserveResult.getFuture(); + bufferRetainedSizeInBytes += reserveResult.getReservedBytes(); sequenceIdToTsBlock.put(nextSequenceId, new Pair<>(tsBlock, currentTsBlockSize)); nextSequenceId += 1; - currentTsBlockSize = sizeInBytes; + currentTsBlockSize = reserveResult.getReservedBytes(); submitSendNewDataBlockEventTask(startSequenceId, ImmutableList.of(sizeInBytes)); } finally { @@ -433,19 +467,21 @@ public synchronized void open() { return; } // SinkChannel is opened when ShuffleSinkHandle choose it as 
the next channel - this.blocked = + MemoryReservationResult reserveResult = localMemoryManager .getQueryPool() - .reserve( + .reserveWithPriority( localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId, DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, - maxBytesCanReserve) // actually we only know maxBytesCanReserve after - // the handle is created, so we use DEFAULT here. It is ok to use DEFAULT here because - // at first this SinkChannel has not reserved memory. - .left; - this.bufferRetainedSizeInBytes = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + maxBytesCanReserve, + isHighestPriority); // actually we only know maxBytesCanReserve after + // the handle is created, so we use DEFAULT here. It is ok to use DEFAULT here because + // at first this SinkChannel has not reserved memory. + this.blocked = reserveResult.getFuture(); + this.bufferRetainedSizeInBytes = reserveResult.getReservedBytes(); + this.currentTsBlockSize = reserveResult.getReservedBytes(); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java index 2d1a06fcd3217..0e0b94094b566 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/SourceHandle.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SourceHandleListener; import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager; +import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult; import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet; import org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet; 
import org.apache.iotdb.db.utils.SetThreadName; @@ -115,6 +116,8 @@ public class SourceHandle implements ISourceHandle { */ private boolean canGetTsBlockFromRemote = false; + private final boolean isHighestPriority; + private static final DataExchangeCostMetricSet DATA_EXCHANGE_COST_METRIC_SET = DataExchangeCostMetricSet.getInstance(); private static final DataExchangeCountMetricSet DATA_EXCHANGE_COUNT_METRIC_SET = @@ -124,6 +127,7 @@ public class SourceHandle implements ISourceHandle { RamUsageEstimator.shallowSizeOfInstance(SourceHandle.class) + RamUsageEstimator.shallowSizeOfInstance(TFragmentInstanceId.class) * 2; + @TestOnly @SuppressWarnings("squid:S107") public SourceHandle( TEndPoint remoteEndpoint, @@ -137,6 +141,34 @@ public SourceHandle( SourceHandleListener sourceHandleListener, IClientManager mppDataExchangeServiceClientManager) { + this( + remoteEndpoint, + remoteFragmentInstanceId, + localFragmentInstanceId, + localPlanNodeId, + indexOfUpstreamSinkHandle, + localMemoryManager, + executorService, + serde, + sourceHandleListener, + false, + mppDataExchangeServiceClientManager); + } + + @SuppressWarnings("squid:S107") + public SourceHandle( + TEndPoint remoteEndpoint, + TFragmentInstanceId remoteFragmentInstanceId, + TFragmentInstanceId localFragmentInstanceId, + String localPlanNodeId, + int indexOfUpstreamSinkHandle, + LocalMemoryManager localMemoryManager, + ExecutorService executorService, + TsBlockSerde serde, + SourceHandleListener sourceHandleListener, + boolean isHighestPriority, + IClientManager + mppDataExchangeServiceClientManager) { this.remoteEndpoint = Validate.notNull(remoteEndpoint, "remoteEndpoint can not be null."); this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId, "remoteFragmentInstanceId can not be null."); @@ -152,6 +184,7 @@ public SourceHandle( this.serde = Validate.notNull(serde, "serde can not be null."); this.sourceHandleListener = Validate.notNull(sourceHandleListener, "sourceHandleListener can not be 
null."); + this.isHighestPriority = isHighestPriority; this.bufferRetainedSizeInBytes = 0L; this.mppDataExchangeServiceClientManager = mppDataExchangeServiceClientManager; this.retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS; @@ -192,19 +225,24 @@ public synchronized ByteBuffer getSerializedTsBlock() { if (tsBlock == null) { return null; } - long retainedSize = sequenceIdToDataBlockSize.remove(currSequenceId); + Long retainedSize = sequenceIdToDataBlockSize.remove(currSequenceId); + if (retainedSize == null) { + throw new IllegalStateException("Reserved data block size is null."); + } if (LOGGER.isDebugEnabled()) { LOGGER.debug("[GetTsBlockFromBuffer] sequenceId:{}, size:{}", currSequenceId, retainedSize); } currSequenceId += 1; - bufferRetainedSizeInBytes -= retainedSize; - localMemoryManager - .getQueryPool() - .free( - localFragmentInstanceId.getQueryId(), - fullFragmentInstanceId, - localPlanNodeId, - retainedSize); + if (retainedSize > 0) { + bufferRetainedSizeInBytes -= retainedSize; + localMemoryManager + .getQueryPool() + .free( + localFragmentInstanceId.getQueryId(), + fullFragmentInstanceId, + localPlanNodeId, + retainedSize); + } if (sequenceIdToTsBlock.isEmpty() && !isFinished()) { if (LOGGER.isDebugEnabled()) { @@ -241,18 +279,24 @@ private synchronized void trySubmitGetDataBlocksTask() { if (bytesToReserve == null) { throw new IllegalStateException("Data block size is null."); } - pair = + MemoryReservationResult reserveResult = localMemoryManager .getQueryPool() - .reserve( + .reserveWithPriority( localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId, bytesToReserve, - maxBytesCanReserve); - bufferRetainedSizeInBytes += bytesToReserve; + maxBytesCanReserve, + isHighestPriority); + pair = new Pair<>(reserveResult.getFuture(), reserveResult.isReserveSuccess()); + // the reserved size differs from the raw size, so record the reserved size in the map + if (reserveResult.getReservedBytes() != bytesToReserve) { +
sequenceIdToDataBlockSize.put(endSequenceId, reserveResult.getReservedBytes()); + } + bufferRetainedSizeInBytes += reserveResult.getReservedBytes(); endSequenceId += 1; - reservedBytes += bytesToReserve; + reservedBytes += reserveResult.getReservedBytes(); if (!Boolean.TRUE.equals(pair.right)) { blockedSize = bytesToReserve; break; @@ -619,14 +663,16 @@ private void fail(Throwable t) { if (aborted || closed) { return; } - bufferRetainedSizeInBytes -= reservedBytes; - localMemoryManager - .getQueryPool() - .free( - localFragmentInstanceId.getQueryId(), - fullFragmentInstanceId, - localPlanNodeId, - reservedBytes); + if (reservedBytes > 0) { + bufferRetainedSizeInBytes -= reservedBytes; + localMemoryManager + .getQueryPool() + .free( + localFragmentInstanceId.getQueryId(), + fullFragmentInstanceId, + localPlanNodeId, + reservedBytes); + } sourceHandleListener.onFailure(SourceHandle.this, t); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index f655df432bff7..f02b86579b496 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -152,6 +152,7 @@ public class FragmentInstanceContext extends QueryContext { private long unclosedUnseqFileNum = 0; private long closedSeqFileNum = 0; private long closedUnseqFileNum = 0; + private boolean highestPriority = false; public static FragmentInstanceContext createFragmentInstanceContext( FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo) { @@ -1128,6 +1129,18 @@ public boolean ignoreNotExistsDevice() { return ignoreNotExistsDevice; } + /** + * Same flag as {@link + * 
org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis#needSetHighestPriority()}. + */ + public boolean isHighestPriority() { + return highestPriority; + } + + public void setHighestPriority(boolean highestPriority) { + this.highestPriority = highestPriority; + } + public boolean isSingleSourcePath() { return singleSourcePath; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java index e8d0fd8243201..9dacdf44d887d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java @@ -158,6 +158,7 @@ public FragmentInstanceInfo execDataQueryFragmentInstance( dataRegion, instance.getGlobalTimePredicate(), dataNodeQueryContextMap)); + context.setHighestPriority(instance.isHighestPriority()); try { List driverFactories = @@ -259,6 +260,7 @@ public FragmentInstanceInfo execSchemaQueryFragmentInstance( fragmentInstanceId -> createFragmentInstanceContext( fragmentInstanceId, stateMachine, instance.getSessionInfo())); + context.setHighestPriority(instance.isHighestPriority()); try { List driverFactories = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java index a9ab6ed5d8120..9a00d5a87de0d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java @@ -23,10 +23,8 @@ import org.apache.iotdb.db.exception.runtime.MemoryLeakException; import com.google.common.util.concurrent.AbstractFuture; -import 
com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.apache.commons.lang3.Validate; -import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +39,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; +import static com.google.common.util.concurrent.Futures.immediateVoidFuture; + /** A thread-safe memory pool. */ public class MemoryPool { @@ -111,6 +111,31 @@ public boolean set(@Nullable V value) { } } + public static class MemoryReservationResult { + private final ListenableFuture future; + private final boolean reserveSuccess; + private final long reservedBytes; + + public MemoryReservationResult( + ListenableFuture future, boolean reserveSuccess, long reservedBytes) { + this.future = future; + this.reserveSuccess = reserveSuccess; + this.reservedBytes = reservedBytes; + } + + public ListenableFuture getFuture() { + return future; + } + + public boolean isReserveSuccess() { + return reserveSuccess; + } + + public long getReservedBytes() { + return reservedBytes; + } + } + private final String id; private final long maxBytes; private final long maxBytesPerFragmentInstance; @@ -220,18 +245,20 @@ public void deRegisterFragmentInstanceFromQueryMemoryMap( } /** - * Reserve memory with bytesToReserve. + * Reserve memory with bytesToReserve, respecting priority. * - * @return if reserve succeed, pair.right will be true, otherwise false + * @return if reserve succeeds, reservedBytes may be zero or equal to bytesToReserve; if reserve + * failed, reservedBytes must be equal to bytesToReserve * @throws IllegalArgumentException throw exception if current query requests more memory than can * be allocated.
*/ - public Pair, Boolean> reserve( + public MemoryReservationResult reserveWithPriority( String queryId, String fragmentInstanceId, String planNodeId, long bytesToReserve, - long maxBytesCanReserve) { + long maxBytesCanReserve, + boolean isHighestPriority) { Validate.notNull(queryId, "queryId can not be null."); Validate.notNull(fragmentInstanceId, "fragmentInstanceId can not be null."); Validate.notNull(planNodeId, "planNodeId can not be null."); @@ -250,19 +277,21 @@ public Pair, Boolean> reserve( "Query is aborted since it requests more memory than can be allocated."); } - ListenableFuture result; if (tryReserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve, maxBytesCanReserve)) { - result = Futures.immediateFuture(null); - return new Pair<>(result, Boolean.TRUE); + return new MemoryReservationResult(immediateVoidFuture(), true, bytesToReserve); } else { + rollbackReserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve); + if (isHighestPriority) { + // SHOW QUERIES: treat as success with zero bytes reserved from pool when insufficient. + return new MemoryReservationResult(immediateVoidFuture(), true, 0L); + } LOGGER.debug( "Blocked reserve request: {} bytes memory for planNodeId{}", bytesToReserve, planNodeId); - rollbackReserve(queryId, fragmentInstanceId, planNodeId, bytesToReserve); - result = + ListenableFuture result = MemoryReservationFuture.create( queryId, fragmentInstanceId, planNodeId, bytesToReserve, maxBytesCanReserve); memoryReservationFutures.add((MemoryReservationFuture) result); - return new Pair<>(result, Boolean.FALSE); + return new MemoryReservationResult(result, false, bytesToReserve); } } @@ -293,7 +322,8 @@ public boolean tryReserveForTest( /** * Cancel the specified memory reservation. If the reservation has finished, do nothing. 
* - * @param future The future returned from {@link #reserve(String, String, String, long, long)} + * @param future The future returned from {@link #reserveWithPriority(String, String, String, + * long, long, boolean)} * @return If the future has not complete, return the number of bytes being reserved. Otherwise, * return 0. */ diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java index 01d1020934dda..63b2d379ddd93 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java @@ -503,7 +503,7 @@ public boolean isQuery() { public boolean needSetHighestPriority() { // if is this Statement is ShowQueryStatement, set its instances to the highest priority, so // that the sub-tasks of the ShowQueries instances could be executed first. 
- return StatementType.SHOW_QUERIES.equals(statement.getType()); + return statement != null && StatementType.SHOW_QUERIES.equals(statement.getType()); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java index 8de6165525e29..5af143bfca83f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java @@ -125,6 +125,7 @@ public QueryExecution(IPlanner planner, MPPQueryContext context, ExecutorService this.context = context; this.planner = planner; this.analysis = analyze(context); + context.setNeedSetHighestPriority(analysis.needSetHighestPriority()); this.stateMachine = new QueryStateMachine(context.getQueryId(), executor); // We add the abort logic inside the QueryExecution. @@ -587,7 +588,8 @@ private void initResultHandle() { context.getResultNodeContext().getUpStreamPlanNodeId().getId(), context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(), 0, // Upstream of result ExchangeNode will only have one child. 
- stateMachine::transitionToFailed) + stateMachine::transitionToFailed, + context.needSetHighestPriority()) : MPPDataExchangeService.getInstance() .getMPPDataExchangeManager() .createSourceHandle( @@ -596,7 +598,8 @@ private void initResultHandle() { 0, upstreamEndPoint, context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(), - stateMachine::transitionToFailed); + stateMachine::transitionToFailed, + context.needSetHighestPriority()); } @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index e0aeacb7f56b0..0649f808c4ffd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -2624,14 +2624,16 @@ public Operator visitExchange(ExchangeNode node, LocalExecutionPlanContext conte node.getUpstreamPlanNodeId().getId(), remoteInstanceId.toThrift(), node.getIndexOfUpstreamSinkHandle(), - context.getInstanceContext()::failed) + context.getInstanceContext()::failed, + context.getInstanceContext().isHighestPriority()) : MPP_DATA_EXCHANGE_MANAGER.createSourceHandle( localInstanceId.toThrift(), node.getPlanNodeId().getId(), node.getIndexOfUpstreamSinkHandle(), upstreamEndPoint, remoteInstanceId.toThrift(), - context.getInstanceContext()::failed); + context.getInstanceContext()::failed, + context.getInstanceContext().isHighestPriority()); if (!isSameNode) { context.addExchangeSumNum(1); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java index 
d212f4ca72526..b96485199ad4e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java @@ -223,6 +223,7 @@ public static FragmentInstance deserializeFrom(ByteBuffer buffer) { fragmentInstance.hostDataNode = hasHostDataNode ? ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer) : null; fragmentInstance.isExplainAnalyze = ReadWriteIOUtils.readBool(buffer); + fragmentInstance.setHighestPriority(ReadWriteIOUtils.readBool(buffer)); return fragmentInstance; } @@ -247,6 +248,7 @@ public ByteBuffer serializeToByteBuffer() { ThriftCommonsSerDeUtils.serializeTDataNodeLocation(hostDataNode, outputStream); } ReadWriteIOUtils.write(isExplainAnalyze, outputStream); + ReadWriteIOUtils.write(isHighestPriority, outputStream); return ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size()); } catch (IOException e) { LOGGER.error("Unexpected error occurs when serializing this FragmentInstance.", e); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/LocalSinkChannelTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/LocalSinkChannelTest.java index 8d9c4b9cd6569..b3c50aaa517c5 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/LocalSinkChannelTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/LocalSinkChannelTest.java @@ -92,13 +92,14 @@ public void testSend() { Assert.assertFalse(localSinkChannel.isFinished()); Assert.assertEquals(11 * mockTsBlockSize, localSinkChannel.getBufferRetainedSizeInBytes()); Mockito.verify(spyMemoryPool, Mockito.times(11)) - .reserve( + .reserveWithPriority( queryId, FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId( remoteFragmentInstanceId), remotePlanNodeId, 
mockTsBlockSize, - Long.MAX_VALUE); + Long.MAX_VALUE, + false); // Receive TsBlocks. int numOfReceivedTsblocks = 0; @@ -184,13 +185,14 @@ public void testAbort() { Assert.assertFalse(localSinkChannel.isFinished()); Assert.assertEquals(11 * mockTsBlockSize, localSinkChannel.getBufferRetainedSizeInBytes()); Mockito.verify(spyMemoryPool, Mockito.times(11)) - .reserve( + .reserveWithPriority( queryId, FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId( remoteFragmentInstanceId), remotePlanNodeId, mockTsBlockSize, - Long.MAX_VALUE); + Long.MAX_VALUE, + false); // Abort. localSinkChannel.abort(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java index 46196d1c990d1..a95fd35ba76c9 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java @@ -21,13 +21,13 @@ import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager; import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool; +import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import org.apache.commons.lang3.Validate; import org.apache.tsfile.read.common.block.TsBlock; -import org.apache.tsfile.utils.Pair; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; @@ -62,15 +62,16 @@ public void testAsyncListenerAfterAbortDoesNotAddTsBlock() { MemoryPool mockMemoryPool = Mockito.mock(MemoryPool.class); Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool); - 
// reserve() returns (manualFuture, false) — simulating memory blocked + // reserveWithPriority() returns blocked future and reserve failure. Mockito.when( - mockMemoryPool.reserve( + mockMemoryPool.reserveWithPriority( Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), - Mockito.anyLong())) - .thenReturn(new Pair<>(manualFuture, Boolean.FALSE)); + Mockito.anyLong(), + Mockito.anyBoolean())) + .thenReturn(new MemoryReservationResult(manualFuture, false, 1024L)); // tryCancel returns 0 — simulating future already completed (can't cancel) Mockito.when(mockMemoryPool.tryCancel(Mockito.any())).thenReturn(0L); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/ShuffleSinkHandleTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/ShuffleSinkHandleTest.java index 489ec5c9ed607..4190c2fa61abe 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/ShuffleSinkHandleTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/ShuffleSinkHandleTest.java @@ -104,13 +104,14 @@ public void testAbort() { Assert.assertFalse(localSinkChannel.isFinished()); Assert.assertEquals(11 * mockTsBlockSize, localSinkChannel.getBufferRetainedSizeInBytes()); Mockito.verify(spyMemoryPool, Mockito.times(11)) - .reserve( + .reserveWithPriority( queryId, FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId( remoteFragmentInstanceId), remotePlanNodeId, mockTsBlockSize, - Long.MAX_VALUE); + Long.MAX_VALUE, + false); // Abort. 
shuffleSinkHandle.abort(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SourceHandleTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SourceHandleTest.java index ee8493372065a..40721c93db785 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SourceHandleTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SourceHandleTest.java @@ -254,13 +254,14 @@ public void testBlockedOneTimeReceive() { .collect(Collectors.toList())); try { Mockito.verify(spyMemoryPool, Mockito.timeout(10_000).times(6)) - .reserve( + .reserveWithPriority( queryId, FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId( localFragmentInstanceId), localPlanNodeId, MOCK_TSBLOCK_SIZE, - maxBytesCanReserve); + maxBytesCanReserve, + false); Mockito.verify(mockClient, Mockito.timeout(10_0000).times(1)) .getDataBlock( Mockito.argThat( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java index 327d4a34c39e7..b09498ad949dd 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java @@ -20,11 +20,11 @@ package org.apache.iotdb.db.queryengine.execution.exchange; import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool; +import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult; import com.google.common.util.concurrent.SettableFuture; import org.apache.tsfile.read.common.block.TsBlock; import org.apache.tsfile.read.common.block.column.TsBlockSerde; -import org.apache.tsfile.utils.Pair; import org.mockito.Mockito; import org.mockito.stubbing.Answer; @@ -68,21 +68,25 
@@ public static MemoryPool createMockBlockedMemoryPool( settableFuture.get().set(null); AtomicReference reservedBytes = new AtomicReference<>(0L); Mockito.when( - mockMemoryPool.reserve( + mockMemoryPool.reserveWithPriority( Mockito.eq(queryId), Mockito.eq(fragmentInstanceId), Mockito.eq(planNodeId), Mockito.anyLong(), - Mockito.anyLong())) + Mockito.anyLong(), + Mockito.anyBoolean())) .thenAnswer( invocation -> { long bytesToReserve = invocation.getArgument(3); if (reservedBytes.get() + bytesToReserve <= capacityInBytes) { - reservedBytes.updateAndGet(v -> v + (long) invocation.getArgument(3)); - return new Pair<>(settableFuture.get(), true); + reservedBytes.updateAndGet(v -> v + bytesToReserve); + return new MemoryReservationResult(settableFuture.get(), true, bytesToReserve); } else { + if (invocation.getArgument(5)) { + return new MemoryReservationResult(settableFuture.get(), true, 0L); + } settableFuture.set(SettableFuture.create()); - return new Pair<>(settableFuture.get(), false); + return new MemoryReservationResult(settableFuture.get(), false, bytesToReserve); } }); Mockito.doAnswer( @@ -124,13 +128,17 @@ public static MemoryPool createMockBlockedMemoryPool( public static MemoryPool createMockNonBlockedMemoryPool() { MemoryPool mockMemoryPool = Mockito.mock(MemoryPool.class); Mockito.when( - mockMemoryPool.reserve( + mockMemoryPool.reserveWithPriority( Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), - Mockito.anyLong())) - .thenReturn(new Pair<>(immediateFuture(null), true)); + Mockito.anyLong(), + Mockito.anyBoolean())) + .thenAnswer( + invocation -> + new MemoryReservationResult( + immediateFuture(null), true, invocation.getArgument(3))); Mockito.when( mockMemoryPool.tryReserve( Mockito.anyString(), diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPoolTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPoolTest.java 
index bae3c0643f45c..7b2ae8695aa5b 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPoolTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPoolTest.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.execution.memory; +import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult; import com.google.common.util.concurrent.ListenableFuture; import org.junit.Assert; import org.junit.Before; @@ -94,7 +95,9 @@ public void testOverTryReserve() { public void testReserve() { ListenableFuture future = - pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, Long.MAX_VALUE).left; + pool.reserveWithPriority( + QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, Long.MAX_VALUE, false) + .getFuture(); Assert.assertTrue(future.isDone()); Assert.assertEquals(256L, pool.getQueryMemoryReservedBytes(QUERY_ID)); Assert.assertEquals(256L, pool.getReservedBytes()); @@ -104,7 +107,8 @@ public void testReserve() { public void tesReserveZero() { try { - pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 0L, Long.MAX_VALUE); + pool.reserveWithPriority( + QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 0L, Long.MAX_VALUE, false); Assert.fail("Expect IllegalArgumentException"); } catch (IllegalArgumentException ignore) { } @@ -114,7 +118,8 @@ public void tesReserveZero() { public void testReserveNegative() { try { - pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, -1L, Long.MAX_VALUE); + pool.reserveWithPriority( + QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, -1L, Long.MAX_VALUE, false); Assert.fail("Expect IllegalArgumentException"); } catch (IllegalArgumentException ignore) { } @@ -124,7 +129,9 @@ public void testReserveNegative() { public void testReserveAll() { ListenableFuture future = - pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, Long.MAX_VALUE).left; + pool.reserveWithPriority( + QUERY_ID, 
FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, Long.MAX_VALUE, false) + .getFuture(); Assert.assertTrue(future.isDone()); Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(QUERY_ID)); Assert.assertEquals(512L, pool.getReservedBytes()); @@ -134,11 +141,15 @@ public void testReserveAll() { public void testOverReserve() { ListenableFuture future = - pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, Long.MAX_VALUE).left; + pool.reserveWithPriority( + QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, Long.MAX_VALUE, false) + .getFuture(); Assert.assertTrue(future.isDone()); Assert.assertEquals(256L, pool.getQueryMemoryReservedBytes(QUERY_ID)); Assert.assertEquals(256L, pool.getReservedBytes()); - future = pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 513L).left; + future = + pool.reserveWithPriority(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 513L, false) + .getFuture(); Assert.assertFalse(future.isDone()); Assert.assertEquals(256L, pool.getQueryMemoryReservedBytes(QUERY_ID)); Assert.assertEquals(256L, pool.getReservedBytes()); @@ -148,11 +159,13 @@ public void testOverReserve() { public void testReserveAndFree() { Assert.assertTrue( - pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, Long.MAX_VALUE) - .left + pool.reserveWithPriority( + QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, Long.MAX_VALUE, false) + .getFuture() .isDone()); ListenableFuture future = - pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 513L).left; + pool.reserveWithPriority(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 513L, false) + .getFuture(); Assert.assertFalse(future.isDone()); Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(QUERY_ID)); Assert.assertEquals(512L, pool.getReservedBytes()); @@ -166,18 +179,22 @@ public void testReserveAndFree() { public void testMultiReserveAndFree() { Assert.assertTrue( - pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, Long.MAX_VALUE) - .left + 
pool.reserveWithPriority( + QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, Long.MAX_VALUE, false) + .getFuture() .isDone()); Assert.assertEquals(256L, pool.getQueryMemoryReservedBytes(QUERY_ID)); Assert.assertEquals(256L, pool.getReservedBytes()); ListenableFuture future1 = - pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 513L).left; + pool.reserveWithPriority(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 513L, false) + .getFuture(); ListenableFuture future2 = - pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 513L).left; + pool.reserveWithPriority(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 513L, false) + .getFuture(); ListenableFuture future3 = - pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 513L).left; + pool.reserveWithPriority(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, 513L, false) + .getFuture(); Assert.assertFalse(future1.isDone()); Assert.assertFalse(future2.isDone()); Assert.assertFalse(future3.isDone()); @@ -284,7 +301,8 @@ public void testTryCancelBlockedReservation() { pool.tryReserveForTest(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, Long.MAX_VALUE)); ListenableFuture f = - pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, 512L).left; + pool.reserveWithPriority(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, 512L, false) + .getFuture(); Assert.assertFalse(f.isDone()); // Cancel the reservation. Assert.assertEquals(256L, pool.tryCancel(f)); @@ -296,11 +314,76 @@ public void testTryCancelBlockedReservation() { public void testTryCancelCompletedReservation() { ListenableFuture f = - pool.reserve(QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, Long.MAX_VALUE).left; + pool.reserveWithPriority( + QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, Long.MAX_VALUE, false) + .getFuture(); Assert.assertTrue(f.isDone()); // Cancel the reservation. 
Assert.assertEquals(0L, pool.tryCancel(f)); Assert.assertTrue(f.isDone()); Assert.assertFalse(f.isCancelled()); } + + /** + * Normal query: requested bytes exceed what the pool can still provide — reserve fails (blocked + * future, not immediate success). + */ + @Test + public void testReserveWithPriorityNormalQueryExceedsAvailable() { + MemoryReservationResult r1 = + pool.reserveWithPriority( + QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, Long.MAX_VALUE, false); + Assert.assertTrue(r1.isReserveSuccess()); + Assert.assertEquals(512L, r1.getReservedBytes()); + Assert.assertTrue(r1.getFuture().isDone()); + + MemoryReservationResult r2 = + pool.reserveWithPriority( + QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, Long.MAX_VALUE, false); + Assert.assertTrue(r2.isReserveSuccess()); + Assert.assertEquals(512L, r2.getReservedBytes()); + Assert.assertEquals(1024L, pool.getReservedBytes()); + + MemoryReservationResult r3 = + pool.reserveWithPriority( + QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, Long.MAX_VALUE, false); + Assert.assertFalse(r3.isReserveSuccess()); + Assert.assertEquals(256L, r3.getReservedBytes()); + Assert.assertFalse(r3.getFuture().isDone()); + Assert.assertEquals(1024L, pool.getReservedBytes()); + } + + /** SHOW QUERIES path: exceeds pool capacity — treated as success with zero bytes from pool. 
*/ + @Test + public void testReserveWithPriorityShowQueriesExceedsAvailable() { + Assert.assertTrue( + pool.reserveWithPriority( + QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, Long.MAX_VALUE, false) + .isReserveSuccess()); + Assert.assertTrue( + pool.reserveWithPriority( + QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 512L, Long.MAX_VALUE, false) + .isReserveSuccess()); + Assert.assertEquals(1024L, pool.getReservedBytes()); + + MemoryReservationResult r = + pool.reserveWithPriority( + QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, Long.MAX_VALUE, true); + Assert.assertTrue(r.isReserveSuccess()); + Assert.assertEquals(0L, r.getReservedBytes()); + Assert.assertTrue(r.getFuture().isDone()); + Assert.assertEquals(1024L, pool.getReservedBytes()); + } + + /** SHOW QUERIES path: pool has room — same as normal successful reserve. */ + @Test + public void testReserveWithPriorityShowQueriesWithinAvailable() { + MemoryReservationResult r = + pool.reserveWithPriority( + QUERY_ID, FRAGMENT_INSTANCE_ID, PLAN_NODE_ID, 256L, Long.MAX_VALUE, true); + Assert.assertTrue(r.isReserveSuccess()); + Assert.assertEquals(256L, r.getReservedBytes()); + Assert.assertTrue(r.getFuture().isDone()); + Assert.assertEquals(256L, pool.getReservedBytes()); + } } From 50b24215c8e03717479d7c7e1e23d84d66876b73 Mon Sep 17 00:00:00 2001 From: Weihao Li <18110526956@163.com> Date: Tue, 21 Apr 2026 16:48:29 +0800 Subject: [PATCH 2/2] spotless Signed-off-by: Weihao Li <18110526956@163.com> --- .../org/apache/iotdb/db/queryengine/common/MPPQueryContext.java | 1 + .../iotdb/db/queryengine/execution/memory/MemoryPoolTest.java | 1 + 2 files changed, 2 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java index ac00b4cc712ae..13773c150906c 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java @@ -105,6 +105,7 @@ public class MPPQueryContext { * insufficient. Set from analysis via {@link #setNeedSetHighestPriority(boolean)}. */ private boolean needSetHighestPriority = false; + @TestOnly public MPPQueryContext(QueryId queryId) { this.queryId = queryId; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPoolTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPoolTest.java index 7b2ae8695aa5b..cf7e15dc71f46 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPoolTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPoolTest.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.execution.memory; import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult; + import com.google.common.util.concurrent.ListenableFuture; import org.junit.Assert; import org.junit.Before;