Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,23 @@ public class AmoroManagementConf {
.defaultValue(Duration.ofSeconds(3))
.withDescription("Optimizer polling task timeout.");

public static final ConfigOption<Duration> OPTIMIZER_GROUP_MIN_PARALLELISM_CHECK_INTERVAL =
ConfigOptions.key("optimizer-group.min-parallelism-check-interval")
.durationType()
.defaultValue(Duration.ofMinutes(5))
.withDescription(
"The interval for checking and ensuring the optimizer group meets its minimum parallelism requirement. "
+ "When the current parallelism falls below the configured min-parallelism, "
+ "the system will attempt to scale out optimizers at this interval. "
+ "The actual scale-out timing is calculated as: consecutive keeping attempts * this interval.");

public static final ConfigOption<Integer> OPTIMIZER_GROUP_MAX_KEEPING_ATTEMPTS =
ConfigOptions.key("optimizer-group.max-keeping-attempts")
.intType()
.defaultValue(3)
.withDescription(
"The maximum number of consecutive attempts to keep the optimizer group at its current parallelism.");

public static final ConfigOption<Duration> OPTIMIZING_REFRESH_GROUP_INTERVAL =
ConfigOptions.key("self-optimizing.refresh-group-interval")
.durationType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,13 @@
import org.apache.amoro.exception.IllegalTaskStateException;
import org.apache.amoro.exception.ObjectNotExistsException;
import org.apache.amoro.exception.PluginRetryAuthException;
import org.apache.amoro.resource.Resource;
import org.apache.amoro.resource.ResourceContainer;
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.resource.ResourceType;
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.dashboard.model.OptimizerResourceInfo;
import org.apache.amoro.server.manager.AbstractOptimizerContainer;
import org.apache.amoro.server.optimizing.OptimizingProcess;
import org.apache.amoro.server.optimizing.OptimizingQueue;
import org.apache.amoro.server.optimizing.OptimizingStatus;
Expand All @@ -43,6 +48,7 @@
import org.apache.amoro.server.persistence.mapper.ResourceMapper;
import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
import org.apache.amoro.server.process.TableProcessMeta;
import org.apache.amoro.server.resource.Containers;
import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.resource.OptimizerManager;
import org.apache.amoro.server.resource.OptimizerThread;
Expand Down Expand Up @@ -89,6 +95,8 @@ public class DefaultOptimizingService extends StatedPersistentBase

private static final Logger LOG = LoggerFactory.getLogger(DefaultOptimizingService.class);

private final long groupMinParallelismCheckInterval;
private final int groupMaxKeepingAttempts;
private final long optimizerTouchTimeout;
private final long taskAckTimeout;
private final long taskExecuteTimeout;
Expand All @@ -99,7 +107,9 @@ public class DefaultOptimizingService extends StatedPersistentBase
private final Map<String, OptimizingQueue> optimizingQueueByGroup = new ConcurrentHashMap<>();
private final Map<String, OptimizingQueue> optimizingQueueByToken = new ConcurrentHashMap<>();
private final Map<String, OptimizerInstance> authOptimizers = new ConcurrentHashMap<>();
private final OptimizerKeeper optimizerKeeper = new OptimizerKeeper();
private final OptimizerKeeper optimizerKeeper = new OptimizerKeeper("optimizer-keeper-thread");
private final OptimizerGroupKeeper optimizerGroupKeeper =
new OptimizerGroupKeeper("optimizer-group-keeper-thread");
private final OptimizingConfigWatcher optimizingConfigWatcher = new OptimizingConfigWatcher();
private final CatalogManager catalogManager;
private final OptimizerManager optimizerManager;
Expand All @@ -126,6 +136,11 @@ public DefaultOptimizingService(
serviceConfig.getDurationInMillis(AmoroManagementConf.OPTIMIZER_POLLING_TIMEOUT);
this.breakQuotaLimit =
serviceConfig.getBoolean(AmoroManagementConf.OPTIMIZING_BREAK_QUOTA_LIMIT_ENABLED);
this.groupMinParallelismCheckInterval =
serviceConfig.getDurationInMillis(
AmoroManagementConf.OPTIMIZER_GROUP_MIN_PARALLELISM_CHECK_INTERVAL);
this.groupMaxKeepingAttempts =
serviceConfig.getInteger(AmoroManagementConf.OPTIMIZER_GROUP_MAX_KEEPING_ATTEMPTS);
this.tableService = tableService;
this.catalogManager = catalogManager;
this.optimizerManager = optimizerManager;
Expand Down Expand Up @@ -161,6 +176,7 @@ private void loadOptimizingQueues(List<DefaultTableRuntime> tableRuntimeList) {
Optional.ofNullable(tableRuntimes).orElseGet(ArrayList::new),
maxPlanningParallelism);
optimizingQueueByGroup.put(groupName, optimizingQueue);
optimizerGroupKeeper.keepInTouch(groupName, 1);
});
optimizers.forEach(optimizer -> registerOptimizer(optimizer, false));
groupToTableRuntimes
Expand Down Expand Up @@ -329,7 +345,9 @@ public void createResourceGroup(ResourceGroup resourceGroup) {
planExecutor,
new ArrayList<>(),
maxPlanningParallelism);
optimizingQueueByGroup.put(resourceGroup.getName(), optimizingQueue);
String groupName = resourceGroup.getName();
optimizingQueueByGroup.put(groupName, optimizingQueue);
optimizerGroupKeeper.keepInTouch(groupName, 1);
});
}

Expand All @@ -350,6 +368,7 @@ public void dispose() {
// dispose all queues
optimizingQueueByGroup.values().forEach(OptimizingQueue::dispose);
optimizerKeeper.dispose();
optimizerGroupKeeper.dispose();
tableHandlerChain.dispose();
optimizingQueueByGroup.clear();
optimizingQueueByToken.clear();
Expand Down Expand Up @@ -409,6 +428,7 @@ protected void initHandler(List<TableRuntime> tableRuntimeList) {
.map(t -> (DefaultTableRuntime) t)
.collect(Collectors.toList()));
optimizerKeeper.start();
optimizerGroupKeeper.start();
optimizingConfigWatcher.start();
LOG.info("SuspendingDetector for Optimizer has been started.");
LOG.info("OptimizerManagementService initializing has completed");
Expand Down Expand Up @@ -459,21 +479,16 @@ public OptimizerInstance getOptimizer() {
}
}

private class OptimizerKeeper implements Runnable {

private volatile boolean stopped = false;
private final Thread thread = new Thread(this, "optimizer-keeper-thread");
private final DelayQueue<OptimizerKeepingTask> suspendingQueue = new DelayQueue<>();
protected abstract class AbstractKeeper<T extends Delayed> implements Runnable {
protected volatile boolean stopped = false;
protected final Thread thread = new Thread(this);
protected final DelayQueue<T> suspendingQueue = new DelayQueue<>();

public OptimizerKeeper() {
public AbstractKeeper(String threadName) {
thread.setName(threadName);
thread.setDaemon(true);
}

public void keepInTouch(OptimizerInstance optimizerInstance) {
Preconditions.checkNotNull(optimizerInstance, "token can not be null");
suspendingQueue.add(new OptimizerKeepingTask(optimizerInstance));
}

public void start() {
thread.start();
}
Expand All @@ -487,29 +502,48 @@ public void dispose() {
public void run() {
while (!stopped) {
try {
OptimizerKeepingTask keepingTask = suspendingQueue.take();
String token = keepingTask.getToken();
boolean isExpired = !keepingTask.tryKeeping();
Optional.ofNullable(keepingTask.getQueue())
.ifPresent(
queue ->
queue
.collectTasks(buildSuspendingPredication(authOptimizers.keySet()))
.forEach(task -> retryTask(task, queue)));
if (isExpired) {
LOG.info("Optimizer {} has been expired, unregister it", keepingTask.getOptimizer());
unregisterOptimizer(token);
} else {
LOG.debug("Optimizer {} is being touched, keep it", keepingTask.getOptimizer());
keepInTouch(keepingTask.getOptimizer());
}
T keepingTask = suspendingQueue.take();
this.processTask(keepingTask);
} catch (InterruptedException ignored) {
} catch (Throwable t) {
LOG.error("OptimizerKeeper has encountered a problem.", t);
LOG.error("{} has encountered a problem.", this.getClass().getSimpleName(), t);
}
}
}

protected abstract void processTask(T task) throws Exception;
}

private class OptimizerKeeper extends AbstractKeeper<OptimizerKeepingTask> {

public OptimizerKeeper(String threadName) {
super(threadName);
}

public void keepInTouch(OptimizerInstance optimizerInstance) {
Preconditions.checkNotNull(optimizerInstance, "token can not be null");
suspendingQueue.add(new OptimizerKeepingTask(optimizerInstance));
}

@Override
protected void processTask(OptimizerKeepingTask keepingTask) {
String token = keepingTask.getToken();
boolean isExpired = !keepingTask.tryKeeping();
Optional.ofNullable(keepingTask.getQueue())
.ifPresent(
queue ->
queue
.collectTasks(buildSuspendingPredication(authOptimizers.keySet()))
.forEach(task -> retryTask(task, queue)));
if (isExpired) {
LOG.info("Optimizer {} has been expired, unregister it", keepingTask.getOptimizer());
unregisterOptimizer(token);
} else {
LOG.debug("Optimizer {} is being touched, keep it", keepingTask.getOptimizer());
keepInTouch(keepingTask.getOptimizer());
}
}

private void retryTask(TaskRuntime<?> task, OptimizingQueue queue) {
if (isTaskExecTimeout(task)) {
LOG.warn(
Expand Down Expand Up @@ -598,4 +632,148 @@ void dispose() {
scheduler.shutdown();
}
}

private class OptimizerGroupKeepingTask implements Delayed {

private final String groupName;
private final long lastCheckTime;
private final int attempts;

public OptimizerGroupKeepingTask(String groupName, int attempts) {
this.groupName = groupName;
this.lastCheckTime = System.currentTimeMillis();
this.attempts = attempts;
}

@Override
public long getDelay(@NotNull TimeUnit unit) {
return unit.convert(
lastCheckTime + groupMinParallelismCheckInterval * attempts - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
}

@Override
public int compareTo(@NotNull Delayed o) {
OptimizerGroupKeepingTask another = (OptimizerGroupKeepingTask) o;
return Long.compare(lastCheckTime, another.lastCheckTime);
}

public int getMinParallelism(ResourceGroup resourceGroup) {
if (!resourceGroup
.getProperties()
.containsKey(OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM)) {
return 0;
}
String minParallelism =
resourceGroup.getProperties().get(OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM);
try {
return Integer.parseInt(minParallelism);
} catch (Throwable t) {
LOG.warn("Illegal minParallelism : {}, will use default value 0", minParallelism, t);
return 0;
}
}

public int tryKeeping(ResourceGroup resourceGroup) {
List<OptimizerInstance> optimizers = optimizerManager.listOptimizers(groupName);
OptimizerResourceInfo optimizerResourceInfo = new OptimizerResourceInfo();
optimizers.forEach(
e -> {
optimizerResourceInfo.addOccupationCore(e.getThreadCount());
optimizerResourceInfo.addOccupationMemory(e.getMemoryMb());
});
return getMinParallelism(resourceGroup) - optimizerResourceInfo.getOccupationCore();
}

public ResourceGroup getResourceGroup() {
OptimizingQueue optimizingQueue = optimizingQueueByGroup.get(groupName);
if (optimizingQueue == null) {
return null;
}
return optimizingQueue.getOptimizerGroup();
}

public String getGroupName() {
return groupName;
}

public int getAttempts() {
return attempts;
}
}

/**
* Optimizer group keeper thread responsible for monitoring resource group status and
* automatically maintaining optimizer resources.
*/
private class OptimizerGroupKeeper extends AbstractKeeper<OptimizerGroupKeepingTask> {

public OptimizerGroupKeeper(String threadName) {
super(threadName);
}

public void keepInTouch(String groupName, int attempts) {
Preconditions.checkNotNull(groupName, "groupName can not be null");
Preconditions.checkArgument(attempts > 0, "attempts must be greater than 0");
if (this.stopped) {
return;
}
suspendingQueue.add(new OptimizerGroupKeepingTask(groupName, attempts));
}

@Override
protected void processTask(OptimizerGroupKeepingTask keepingTask) {
ResourceGroup resourceGroup = keepingTask.getResourceGroup();
if (resourceGroup == null) {
LOG.warn(
"ResourceGroup:{} may have been deleted, stop keeping it", keepingTask.getGroupName());
return;
}

int requiredCores = keepingTask.tryKeeping(resourceGroup);
if (requiredCores <= 0) {
LOG.debug(
"The Resource Group:{} has sufficient resources, keep it", resourceGroup.getName());
keepInTouch(resourceGroup.getName(), 1);
return;
}

if (keepingTask.getAttempts() > groupMaxKeepingAttempts) {
int minParallelism = keepingTask.getMinParallelism(resourceGroup);
LOG.warn(
"Resource Group:{}, creating optimizer {} times in a row, optimizers still below min-parallel:{}, will reset min-parallel to {}",
resourceGroup.getName(),
keepingTask.getAttempts(),
minParallelism,
minParallelism - requiredCores);
resourceGroup
.getProperties()
.put(
OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM,
String.valueOf(minParallelism - requiredCores));
updateResourceGroup(resourceGroup);
optimizerManager.updateResourceGroup(resourceGroup);
keepInTouch(resourceGroup.getName(), 1);
return;
}

Resource resource =
new Resource.Builder(
resourceGroup.getContainer(), resourceGroup.getName(), ResourceType.OPTIMIZER)
.setProperties(resourceGroup.getProperties())
.setThreadCount(requiredCores)
.build();
ResourceContainer rc = Containers.get(resource.getContainerName());
try {
((AbstractOptimizerContainer) rc).requestResource(resource);
optimizerManager.createResource(resource);
} finally {
keepInTouch(resourceGroup.getName(), keepingTask.getAttempts() + 1);
}
LOG.info(
"Resource Group:{} has insufficient resources, created an optimizer with parallelism of {}",
resourceGroup.getName(),
requiredCores);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public static void initTableService() {
configurations.set(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT, Duration.ofMillis(800L));
configurations.set(
AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT, Duration.ofMillis(30000L));
configurations.set(
AmoroManagementConf.OPTIMIZER_GROUP_MIN_PARALLELISM_CHECK_INTERVAL,
Duration.ofMillis(10L));
TABLE_SERVICE =
new DefaultTableService(
new Configurations(), CATALOG_MANAGER, tableRuntimeFactoryManager);
Expand Down
Loading
Loading