Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ public class MPPQueryContext {

private boolean userQuery = false;

/**
* When true (e.g. SHOW QUERIES), operator and exchange memory may use fallback when pool is
* insufficient. Set from analysis via {@link #setNeedSetHighestPriority(boolean)}.
*/
private boolean needSetHighestPriority = false;

@TestOnly
public MPPQueryContext(QueryId queryId) {
this.queryId = queryId;
Expand Down Expand Up @@ -406,6 +412,14 @@ public void setUserQuery(boolean userQuery) {
this.userQuery = userQuery;
}

public boolean needSetHighestPriority() {
return needSetHighestPriority;
}

public void setNeedSetHighestPriority(boolean needSetHighestPriority) {
this.needSetHighestPriority = needSetHighestPriority;
}

public String getClientHostName() {
if (session == null || session.getCliHostname() == null) {
return "UNKNOWN";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.queryengine.exception.exchange.GetTsBlockFromClosedOrAbortedChannelException;
import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelIndex;
Expand Down Expand Up @@ -656,7 +657,11 @@ private synchronized ISinkChannel createLocalSinkChannel(
}
queue =
new SharedTsBlockQueue(
localFragmentInstanceId, localPlanNodeId, localMemoryManager, executorService);
localFragmentInstanceId,
localPlanNodeId,
localMemoryManager,
executorService,
instanceContext.isHighestPriority());
}

return new LocalSinkChannel(
Expand All @@ -680,7 +685,8 @@ public ISinkChannel createLocalSinkChannelForPipeline(
driverContext.getDriverTaskID().getFragmentInstanceId().toThrift(),
planNodeId,
localMemoryManager,
executorService);
executorService,
driverContext.getFragmentInstanceContext().isHighestPriority());
queue.allowAddingTsBlock();
return new LocalSinkChannel(
queue,
Expand Down Expand Up @@ -718,6 +724,7 @@ private ISinkChannel createSinkChannel(
tsBlockSerdeFactory.get(),
new ISinkChannelListenerImpl(
localFragmentInstanceId, instanceContext, instanceContext::failed, cnt),
instanceContext.isHighestPriority(),
mppDataExchangeServiceClientManager);
}

Expand Down Expand Up @@ -802,13 +809,32 @@ public ISourceHandle createLocalSourceHandleForPipeline(
context.getDriverTaskID().toString());
}

@TestOnly
public synchronized ISourceHandle createLocalSourceHandleForFragment(
TFragmentInstanceId localFragmentInstanceId,
String localPlanNodeId,
String remotePlanNodeId,
TFragmentInstanceId remoteFragmentInstanceId,
int index,
IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
return createLocalSourceHandleForFragment(
localFragmentInstanceId,
localPlanNodeId,
remotePlanNodeId,
remoteFragmentInstanceId,
index,
onFailureCallback,
false);
}

public synchronized ISourceHandle createLocalSourceHandleForFragment(
TFragmentInstanceId localFragmentInstanceId,
String localPlanNodeId,
String remotePlanNodeId,
TFragmentInstanceId remoteFragmentInstanceId,
int index,
IMPPDataExchangeManagerCallback<Throwable> onFailureCallback,
boolean isHighestPriority) {
if (sourceHandles.containsKey(localFragmentInstanceId)
&& sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) {
throw new IllegalStateException(
Expand Down Expand Up @@ -840,7 +866,11 @@ public synchronized ISourceHandle createLocalSourceHandleForFragment(
}
queue =
new SharedTsBlockQueue(
remoteFragmentInstanceId, remotePlanNodeId, localMemoryManager, executorService);
remoteFragmentInstanceId,
remotePlanNodeId,
localMemoryManager,
executorService,
isHighestPriority);
}
LocalSourceHandle localSourceHandle =
new LocalSourceHandle(
Expand All @@ -854,6 +884,7 @@ public synchronized ISourceHandle createLocalSourceHandleForFragment(
return localSourceHandle;
}

@TestOnly
@Override
public ISourceHandle createSourceHandle(
TFragmentInstanceId localFragmentInstanceId,
Expand All @@ -862,6 +893,24 @@ public ISourceHandle createSourceHandle(
TEndPoint remoteEndpoint,
TFragmentInstanceId remoteFragmentInstanceId,
IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
return createSourceHandle(
localFragmentInstanceId,
localPlanNodeId,
indexOfUpstreamSinkHandle,
remoteEndpoint,
remoteFragmentInstanceId,
onFailureCallback,
false);
}

public ISourceHandle createSourceHandle(
TFragmentInstanceId localFragmentInstanceId,
String localPlanNodeId,
int indexOfUpstreamSinkHandle,
TEndPoint remoteEndpoint,
TFragmentInstanceId remoteFragmentInstanceId,
IMPPDataExchangeManagerCallback<Throwable> onFailureCallback,
boolean isHighestPriority) {
Map<String, ISourceHandle> sourceHandleMap = sourceHandles.get(localFragmentInstanceId);
if (sourceHandleMap != null && sourceHandleMap.containsKey(localPlanNodeId)) {
throw new IllegalStateException(
Expand Down Expand Up @@ -891,6 +940,7 @@ public ISourceHandle createSourceHandle(
executorService,
tsBlockSerdeFactory.get(),
new SourceHandleListenerImpl(onFailureCallback),
isHighestPriority,
mppDataExchangeServiceClientManager);
sourceHandles
.computeIfAbsent(localFragmentInstanceId, key -> new ConcurrentHashMap<>())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

package org.apache.iotdb.db.queryengine.execution.exchange;

import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.LocalSinkChannel;
import org.apache.iotdb.db.queryengine.execution.exchange.source.LocalSourceHandle;
import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool.MemoryReservationResult;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;

import com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -62,7 +64,7 @@ public class SharedTsBlockQueue {

private long bufferRetainedSizeInBytes = 0L;

private final Queue<TsBlock> queue = new LinkedList<>();
private final Queue<Pair<TsBlock, Long>> queue = new LinkedList<>();

private SettableFuture<Void> blocked = SettableFuture.create();

Expand All @@ -82,17 +84,28 @@ public class SharedTsBlockQueue {

private long maxBytesCanReserve =
IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance();
private final boolean isHighestPriority;

private volatile Throwable abortedCause = null;

// used for SharedTsBlockQueue listener
private final ExecutorService executorService;

@TestOnly
public SharedTsBlockQueue(
TFragmentInstanceId fragmentInstanceId,
String planNodeId,
LocalMemoryManager localMemoryManager,
ExecutorService executorService) {
this(fragmentInstanceId, planNodeId, localMemoryManager, executorService, false);
}

public SharedTsBlockQueue(
TFragmentInstanceId fragmentInstanceId,
String planNodeId,
LocalMemoryManager localMemoryManager,
ExecutorService executorService,
boolean isHighestPriority) {
this.localFragmentInstanceId =
Validate.notNull(fragmentInstanceId, "fragment instance ID cannot be null");
this.fullFragmentInstanceId =
Expand All @@ -101,6 +114,7 @@ public SharedTsBlockQueue(
this.localMemoryManager =
Validate.notNull(localMemoryManager, "local memory manager cannot be null");
this.executorService = Validate.notNull(executorService, "ExecutorService can not be null.");
this.isHighestPriority = isHighestPriority;
}

public boolean hasNoMoreTsBlocks() {
Expand Down Expand Up @@ -195,15 +209,18 @@ public TsBlock remove() {
}
throw new IllegalStateException("queue has been destroyed");
}
TsBlock tsBlock = queue.remove();
localMemoryManager
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
fullFragmentInstanceId,
localPlanNodeId,
tsBlock.getSizeInBytes());
bufferRetainedSizeInBytes -= tsBlock.getSizeInBytes();
Pair<TsBlock, Long> tsBlockWithReservedBytes = queue.remove();
long reservedBytes = tsBlockWithReservedBytes.right;
if (reservedBytes > 0) {
localMemoryManager
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
fullFragmentInstanceId,
localPlanNodeId,
reservedBytes);
bufferRetainedSizeInBytes -= reservedBytes;
}
// Every time LocalSourceHandle consumes a TsBlock, it needs to send the event
// to
// corresponding LocalSinkChannel.
Expand All @@ -213,7 +230,7 @@ public TsBlock remove() {
if (blocked.isDone() && queue.isEmpty() && !noMoreTsBlocks) {
blocked = SettableFuture.create();
}
return tsBlock;
return tsBlockWithReservedBytes.left;
}

/**
Expand All @@ -236,20 +253,22 @@ public ListenableFuture<Void> add(TsBlock tsBlock) {
localFragmentInstanceId.queryId, fullFragmentInstanceId, localPlanNodeId);
alreadyRegistered = true;
}
Pair<ListenableFuture<Void>, Boolean> pair =
MemoryReservationResult reserveResult =
localMemoryManager
.getQueryPool()
.reserve(
.reserveWithPriority(
localFragmentInstanceId.getQueryId(),
fullFragmentInstanceId,
localPlanNodeId,
tsBlock.getSizeInBytes(),
maxBytesCanReserve);
blockedOnMemory = pair.left;
bufferRetainedSizeInBytes += tsBlock.getSizeInBytes();
maxBytesCanReserve,
isHighestPriority);
blockedOnMemory = reserveResult.getFuture();
long reservedBytes = reserveResult.getReservedBytes();
bufferRetainedSizeInBytes += reservedBytes;

// reserve memory failed, we should wait until there is enough memory
if (!Boolean.TRUE.equals(pair.right)) {
if (!reserveResult.isReserveSuccess()) {
SettableFuture<Void> channelBlocked = SettableFuture.create();
blockedOnMemory.addListener(
() -> {
Expand All @@ -264,7 +283,7 @@ public ListenableFuture<Void> add(TsBlock tsBlock) {
channelBlocked.set(null);
return;
}
queue.add(tsBlock);
queue.add(new Pair<>(tsBlock, reservedBytes));
if (!blocked.isDone()) {
blocked.set(null);
}
Expand All @@ -281,7 +300,7 @@ public ListenableFuture<Void> add(TsBlock tsBlock) {
executorService);
return channelBlocked;
} else { // reserve memory succeeded, add the TsBlock directly
queue.add(tsBlock);
queue.add(new Pair<>(tsBlock, reservedBytes));
if (!blocked.isDone()) {
blocked.set(null);
}
Expand Down
Loading
Loading