diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java index 96e54b7df9..e50e18a90a 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java @@ -514,6 +514,23 @@ public class AmoroManagementConf { .defaultValue(Duration.ofSeconds(3)) .withDescription("Optimizer polling task timeout."); + public static final ConfigOption OPTIMIZER_GROUP_MIN_PARALLELISM_CHECK_INTERVAL = + ConfigOptions.key("optimizer-group.min-parallelism-check-interval") + .durationType() + .defaultValue(Duration.ofMinutes(5)) + .withDescription( + "The interval for checking and ensuring the optimizer group meets its minimum parallelism requirement. " + + "When the current parallelism falls below the configured min-parallelism, " + + "the system will attempt to scale out optimizers at this interval. " + + "The actual scale-out timing is calculated as: consecutive keeping attempts * this interval."); + + public static final ConfigOption OPTIMIZER_GROUP_MAX_KEEPING_ATTEMPTS = + ConfigOptions.key("optimizer-group.max-keeping-attempts") + .intType() + .defaultValue(3) + .withDescription( + "The maximum number of consecutive attempts to keep the optimizer group at its current parallelism."); + public static final ConfigOption OPTIMIZING_REFRESH_GROUP_INTERVAL = ConfigOptions.key("self-optimizing.refresh-group-interval") .durationType() diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java index f3b13e69a6..a4550c8b2c 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java @@ -32,8 +32,13 @@ import org.apache.amoro.exception.IllegalTaskStateException; import org.apache.amoro.exception.ObjectNotExistsException; import org.apache.amoro.exception.PluginRetryAuthException; +import org.apache.amoro.resource.Resource; +import org.apache.amoro.resource.ResourceContainer; import org.apache.amoro.resource.ResourceGroup; +import org.apache.amoro.resource.ResourceType; import org.apache.amoro.server.catalog.CatalogManager; +import org.apache.amoro.server.dashboard.model.OptimizerResourceInfo; +import org.apache.amoro.server.manager.AbstractOptimizerContainer; import org.apache.amoro.server.optimizing.OptimizingProcess; import org.apache.amoro.server.optimizing.OptimizingQueue; import org.apache.amoro.server.optimizing.OptimizingStatus; @@ -43,6 +48,7 @@ import org.apache.amoro.server.persistence.mapper.ResourceMapper; import org.apache.amoro.server.persistence.mapper.TableProcessMapper; import org.apache.amoro.server.process.TableProcessMeta; +import org.apache.amoro.server.resource.Containers; import org.apache.amoro.server.resource.OptimizerInstance; import org.apache.amoro.server.resource.OptimizerManager; import org.apache.amoro.server.resource.OptimizerThread; @@ -89,6 +95,8 @@ public class DefaultOptimizingService extends StatedPersistentBase private static final Logger LOG = LoggerFactory.getLogger(DefaultOptimizingService.class); + private final long groupMinParallelismCheckInterval; + private final int groupMaxKeepingAttempts; private final long optimizerTouchTimeout; private final long taskAckTimeout; private final long taskExecuteTimeout; @@ -99,7 +107,9 @@ public class DefaultOptimizingService extends StatedPersistentBase private final Map optimizingQueueByGroup = new ConcurrentHashMap<>(); private final Map optimizingQueueByToken = new ConcurrentHashMap<>(); private final Map authOptimizers = new ConcurrentHashMap<>(); - private final OptimizerKeeper optimizerKeeper = new OptimizerKeeper(); + private final OptimizerKeeper optimizerKeeper = new OptimizerKeeper("optimizer-keeper-thread"); + private final OptimizerGroupKeeper optimizerGroupKeeper = + new OptimizerGroupKeeper("optimizer-group-keeper-thread"); private final OptimizingConfigWatcher optimizingConfigWatcher = new OptimizingConfigWatcher(); private final CatalogManager catalogManager; private final OptimizerManager optimizerManager; @@ -126,6 +136,11 @@ public DefaultOptimizingService( serviceConfig.getDurationInMillis(AmoroManagementConf.OPTIMIZER_POLLING_TIMEOUT); this.breakQuotaLimit = serviceConfig.getBoolean(AmoroManagementConf.OPTIMIZING_BREAK_QUOTA_LIMIT_ENABLED); + this.groupMinParallelismCheckInterval = + serviceConfig.getDurationInMillis( + AmoroManagementConf.OPTIMIZER_GROUP_MIN_PARALLELISM_CHECK_INTERVAL); + this.groupMaxKeepingAttempts = + serviceConfig.getInteger(AmoroManagementConf.OPTIMIZER_GROUP_MAX_KEEPING_ATTEMPTS); this.tableService = tableService; this.catalogManager = catalogManager; this.optimizerManager = optimizerManager; @@ -161,6 +176,7 @@ private void loadOptimizingQueues(List tableRuntimeList) { Optional.ofNullable(tableRuntimes).orElseGet(ArrayList::new), maxPlanningParallelism); optimizingQueueByGroup.put(groupName, optimizingQueue); + optimizerGroupKeeper.keepInTouch(groupName, 1); }); optimizers.forEach(optimizer -> registerOptimizer(optimizer, false)); groupToTableRuntimes @@ -329,7 +345,9 @@ public void createResourceGroup(ResourceGroup resourceGroup) { planExecutor, new ArrayList<>(), maxPlanningParallelism); - optimizingQueueByGroup.put(resourceGroup.getName(), optimizingQueue); + String groupName = resourceGroup.getName(); + optimizingQueueByGroup.put(groupName, optimizingQueue); + optimizerGroupKeeper.keepInTouch(groupName, 1); }); } @@ -350,6 +368,7 @@ public void dispose() { // dispose all queues optimizingQueueByGroup.values().forEach(OptimizingQueue::dispose); optimizerKeeper.dispose(); + optimizerGroupKeeper.dispose(); tableHandlerChain.dispose(); optimizingQueueByGroup.clear(); optimizingQueueByToken.clear(); @@ -409,6 +428,7 @@ protected void initHandler(List tableRuntimeList) { .map(t -> (DefaultTableRuntime) t) .collect(Collectors.toList())); optimizerKeeper.start(); + optimizerGroupKeeper.start(); optimizingConfigWatcher.start(); LOG.info("SuspendingDetector for Optimizer has been started."); LOG.info("OptimizerManagementService initializing has completed"); @@ -459,21 +479,16 @@ public OptimizerInstance getOptimizer() { } } - private class OptimizerKeeper implements Runnable { - - private volatile boolean stopped = false; - private final Thread thread = new Thread(this, "optimizer-keeper-thread"); - private final DelayQueue suspendingQueue = new DelayQueue<>(); + protected abstract class AbstractKeeper implements Runnable { + protected volatile boolean stopped = false; + protected final Thread thread = new Thread(this); + protected final DelayQueue suspendingQueue = new DelayQueue<>(); - public OptimizerKeeper() { + public AbstractKeeper(String threadName) { + thread.setName(threadName); thread.setDaemon(true); } - public void keepInTouch(OptimizerInstance optimizerInstance) { - Preconditions.checkNotNull(optimizerInstance, "token can not be null"); - suspendingQueue.add(new OptimizerKeepingTask(optimizerInstance)); - } - public void start() { thread.start(); } @@ -487,29 +502,48 @@ public void dispose() { public void run() { while (!stopped) { try { - OptimizerKeepingTask keepingTask = suspendingQueue.take(); - String token = keepingTask.getToken(); - boolean isExpired = !keepingTask.tryKeeping(); - Optional.ofNullable(keepingTask.getQueue()) - .ifPresent( - queue -> - queue - .collectTasks(buildSuspendingPredication(authOptimizers.keySet())) - .forEach(task -> retryTask(task, queue))); - if (isExpired) { - LOG.info("Optimizer {} has been expired, unregister it", keepingTask.getOptimizer()); - unregisterOptimizer(token); - } else { - LOG.debug("Optimizer {} is being touched, keep it", keepingTask.getOptimizer()); - keepInTouch(keepingTask.getOptimizer()); - } + T keepingTask = suspendingQueue.take(); + this.processTask(keepingTask); } catch (InterruptedException ignored) { } catch (Throwable t) { - LOG.error("OptimizerKeeper has encountered a problem.", t); + LOG.error("{} has encountered a problem.", this.getClass().getSimpleName(), t); } } } + protected abstract void processTask(T task) throws Exception; + } + + private class OptimizerKeeper extends AbstractKeeper { + + public OptimizerKeeper(String threadName) { + super(threadName); + } + + public void keepInTouch(OptimizerInstance optimizerInstance) { + Preconditions.checkNotNull(optimizerInstance, "token can not be null"); + suspendingQueue.add(new OptimizerKeepingTask(optimizerInstance)); + } + + @Override + protected void processTask(OptimizerKeepingTask keepingTask) { + String token = keepingTask.getToken(); + boolean isExpired = !keepingTask.tryKeeping(); + Optional.ofNullable(keepingTask.getQueue()) + .ifPresent( + queue -> + queue + .collectTasks(buildSuspendingPredication(authOptimizers.keySet())) + .forEach(task -> retryTask(task, queue))); + if (isExpired) { + LOG.info("Optimizer {} has been expired, unregister it", keepingTask.getOptimizer()); + unregisterOptimizer(token); + } else { + LOG.debug("Optimizer {} is being touched, keep it", keepingTask.getOptimizer()); + keepInTouch(keepingTask.getOptimizer()); + } + } + private void retryTask(TaskRuntime task, OptimizingQueue queue) { if (isTaskExecTimeout(task)) { LOG.warn( @@ -598,4 +632,148 @@ void dispose() { scheduler.shutdown(); } } + + private class OptimizerGroupKeepingTask implements Delayed { + + private final String groupName; + private final long lastCheckTime; + private final int attempts; + + public OptimizerGroupKeepingTask(String groupName, int attempts) { + this.groupName = groupName; + this.lastCheckTime = System.currentTimeMillis(); + this.attempts = attempts; + } + + @Override + public long getDelay(@NotNull TimeUnit unit) { + return unit.convert( + lastCheckTime + groupMinParallelismCheckInterval * attempts - System.currentTimeMillis(), + TimeUnit.MILLISECONDS); + } + + @Override + public int compareTo(@NotNull Delayed o) { + OptimizerGroupKeepingTask another = (OptimizerGroupKeepingTask) o; + return Long.compare(lastCheckTime, another.lastCheckTime); + } + + public int getMinParallelism(ResourceGroup resourceGroup) { + if (!resourceGroup + .getProperties() + .containsKey(OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM)) { + return 0; + } + String minParallelism = + resourceGroup.getProperties().get(OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM); + try { + return Integer.parseInt(minParallelism); + } catch (Throwable t) { + LOG.warn("Illegal minParallelism : {}, will use default value 0", minParallelism, t); + return 0; + } + } + + public int tryKeeping(ResourceGroup resourceGroup) { + List optimizers = optimizerManager.listOptimizers(groupName); + OptimizerResourceInfo optimizerResourceInfo = new OptimizerResourceInfo(); + optimizers.forEach( + e -> { + optimizerResourceInfo.addOccupationCore(e.getThreadCount()); + optimizerResourceInfo.addOccupationMemory(e.getMemoryMb()); + }); + return getMinParallelism(resourceGroup) - optimizerResourceInfo.getOccupationCore(); + } + + public ResourceGroup getResourceGroup() { + OptimizingQueue optimizingQueue = optimizingQueueByGroup.get(groupName); + if (optimizingQueue == null) { + return null; + } + return optimizingQueue.getOptimizerGroup(); + } + + public String getGroupName() { + return groupName; + } + + public int getAttempts() { + return attempts; + } + } + + /** + * Optimizer group keeper thread responsible for monitoring resource group status and + * automatically maintaining optimizer resources. + */ + private class OptimizerGroupKeeper extends AbstractKeeper { + + public OptimizerGroupKeeper(String threadName) { + super(threadName); + } + + public void keepInTouch(String groupName, int attempts) { + Preconditions.checkNotNull(groupName, "groupName can not be null"); + Preconditions.checkArgument(attempts > 0, "attempts must be greater than 0"); + if (this.stopped) { + return; + } + suspendingQueue.add(new OptimizerGroupKeepingTask(groupName, attempts)); + } + + @Override + protected void processTask(OptimizerGroupKeepingTask keepingTask) { + ResourceGroup resourceGroup = keepingTask.getResourceGroup(); + if (resourceGroup == null) { + LOG.warn( + "ResourceGroup:{} may have been deleted, stop keeping it", keepingTask.getGroupName()); + return; + } + + int requiredCores = keepingTask.tryKeeping(resourceGroup); + if (requiredCores <= 0) { + LOG.debug( + "The Resource Group:{} has sufficient resources, keep it", resourceGroup.getName()); + keepInTouch(resourceGroup.getName(), 1); + return; + } + + if (keepingTask.getAttempts() > groupMaxKeepingAttempts) { + int minParallelism = keepingTask.getMinParallelism(resourceGroup); + LOG.warn( + "Resource Group:{}, creating optimizer {} times in a row, optimizers still below min-parallel:{}, will reset min-parallel to {}", + resourceGroup.getName(), + keepingTask.getAttempts(), + minParallelism, + minParallelism - requiredCores); + resourceGroup + .getProperties() + .put( + OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM, + String.valueOf(minParallelism - requiredCores)); + updateResourceGroup(resourceGroup); + optimizerManager.updateResourceGroup(resourceGroup); + keepInTouch(resourceGroup.getName(), 1); + return; + } + + Resource resource = + new Resource.Builder( + resourceGroup.getContainer(), resourceGroup.getName(), ResourceType.OPTIMIZER) + .setProperties(resourceGroup.getProperties()) + .setThreadCount(requiredCores) + .build(); + ResourceContainer rc = Containers.get(resource.getContainerName()); + try { + ((AbstractOptimizerContainer) rc).requestResource(resource); + optimizerManager.createResource(resource); + } finally { + keepInTouch(resourceGroup.getName(), keepingTask.getAttempts() + 1); + } + LOG.info( + "Resource Group:{} has insufficient resources, created an optimizer with parallelism of {}", + resourceGroup.getName(), + requiredCores); + } + } } diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java index 9f3d25b3f7..12cb5bce74 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java @@ -53,6 +53,9 @@ public static void initTableService() { configurations.set(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT, Duration.ofMillis(800L)); configurations.set( AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT, Duration.ofMillis(30000L)); + configurations.set( + AmoroManagementConf.OPTIMIZER_GROUP_MIN_PARALLELISM_CHECK_INTERVAL, + Duration.ofMillis(10L)); TABLE_SERVICE = new DefaultTableService( new Configurations(), CATALOG_MANAGER, tableRuntimeFactoryManager); diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/TestOptimizerGroupKeeper.java b/amoro-ams/src/test/java/org/apache/amoro/server/TestOptimizerGroupKeeper.java new file mode 100644 index 0000000000..72ff587530 --- /dev/null +++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestOptimizerGroupKeeper.java @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server; + +import static org.apache.amoro.server.AmoroManagementConf.OPTIMIZER_GROUP_MAX_KEEPING_ATTEMPTS; + +import org.apache.amoro.BasicTableTestHelper; +import org.apache.amoro.OptimizerProperties; +import org.apache.amoro.TableFormat; +import org.apache.amoro.TableTestHelper; +import org.apache.amoro.api.OptimizerRegisterInfo; +import org.apache.amoro.catalog.BasicCatalogTestHelper; +import org.apache.amoro.catalog.CatalogTestHelper; +import org.apache.amoro.resource.Resource; +import org.apache.amoro.resource.ResourceContainer; +import org.apache.amoro.resource.ResourceGroup; +import org.apache.amoro.server.manager.AbstractOptimizerContainer; +import org.apache.amoro.server.resource.ContainerMetadata; +import org.apache.amoro.server.resource.Containers; +import org.apache.amoro.server.resource.OptimizerInstance; +import org.apache.amoro.server.table.AMSTableTestBase; +import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; +import org.apache.iceberg.common.DynFields; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +@RunWith(Parameterized.class) +public class TestOptimizerGroupKeeper extends AMSTableTestBase { + + private static final String TEST_GROUP_NAME = "test-keeper-group"; + private static final String MOCK_CONTAINER_NAME = "mock-container"; + + // Control flags for mock container behavior + private final AtomicBoolean resourceAvailable = new AtomicBoolean(true); + private final AtomicInteger scaleOutCallCount = new AtomicInteger(0); + // Function to register optimizer (will call authenticate) + private Function optimizerRegistrar; + private static boolean originIsInitialized = false; + // Track the current test's group name for cleanup + private String currentGroupName; + + public TestOptimizerGroupKeeper( + CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) { + super(catalogTestHelper, tableTestHelper, false); + } + + @Parameterized.Parameters(name = "{0}, {1}") + public static Object[] parameters() { + return new Object[][] { + {new BasicCatalogTestHelper(TableFormat.ICEBERG), new BasicTableTestHelper(true, false)} + }; + } + + @Before + public void prepare() throws Exception { + optimizerRegistrar = registerInfo -> optimizingService().authenticate(registerInfo); + setupMockContainer(); + } + + @After + public void clear() { + if (currentGroupName == null) { + return; + } + try { + // Clean up optimizers + optimizerManager() + .listOptimizers(currentGroupName) + .forEach( + optimizer -> + optimizingService() + .deleteOptimizer(optimizer.getGroupName(), optimizer.getResourceId())); + // Delete resource group from optimizing service first (this will dispose and unregister + // metrics) + try { + optimizingService().deleteResourceGroup(currentGroupName); + } catch (Exception ignored) { + } + // Then delete from optimizer manager + try { + optimizerManager().deleteResourceGroup(currentGroupName); + } catch (Exception ignored) { + } + } catch (Exception e) { + // ignore + } finally { + currentGroupName = null; + } + } + + @AfterClass + public static void cleanup() { + if (!originIsInitialized) { + DynFields.UnboundField initializedField = + DynFields.builder().hiddenImpl(Containers.class, "isInitialized").build(); + initializedField.asStatic().set(false); + } + } + + /** Setup mock container and inject it into Containers using reflection. */ + private void setupMockContainer() throws Exception { + MockOptimizerContainer mockContainer = + new MockOptimizerContainer(resourceAvailable, scaleOutCallCount, optimizerRegistrar); + + // Use reflection to set isInitialized to true + DynFields.UnboundField initializedField = + DynFields.builder().hiddenImpl(Containers.class, "isInitialized").build(); + if (!initializedField.asStatic().get()) { + originIsInitialized = false; + initializedField.asStatic().set(true); + } + + // Use reflection to inject mock container into Containers + DynFields.UnboundField> containersField = + DynFields.builder().hiddenImpl(Containers.class, "globalContainers").build(); + Map globalContainers = containersField.asStatic().get(); + + // Create ContainerWrapper using reflection + ContainerMetadata metadata = + new ContainerMetadata(MOCK_CONTAINER_NAME, MockOptimizerContainer.class.getName()); + Map properties = Maps.newHashMap(); + properties.put(OptimizerProperties.AMS_HOME, "/tmp"); + properties.put(OptimizerProperties.AMS_OPTIMIZER_URI, "thrift://localhost:1261"); + properties.put("memory", "1024"); + metadata.setProperties(properties); + + // Create ContainerWrapper with pre-initialized container + Class wrapperClass = + Class.forName("org.apache.amoro.server.resource.Containers$ContainerWrapper"); + // Get the two-parameter constructor: ContainerWrapper(ContainerMetadata, ResourceContainer) + java.lang.reflect.Constructor constructor = + wrapperClass.getDeclaredConstructor(ContainerMetadata.class, ResourceContainer.class); + constructor.setAccessible(true); + Object wrapper = constructor.newInstance(metadata, mockContainer); + globalContainers.put(MOCK_CONTAINER_NAME, wrapper); + } + + private ResourceGroup buildTestResourceGroup(String groupName, int minParallelism) { + // Track the group name for cleanup + this.currentGroupName = groupName; + Map properties = Maps.newHashMap(); + properties.put( + OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM, String.valueOf(minParallelism)); + properties.put("memory", "1024"); + return new ResourceGroup.Builder(groupName, MOCK_CONTAINER_NAME) + .addProperties(properties) + .build(); + } + + /** + * Test scenario 1: When resources are available, optimizer will be auto-allocated. + * + *

When min-parallelism > current optimizer cores and resources are available, + * OptimizerGroupKeeper should automatically create new optimizer instances. + */ + @Test + public void testOptimizerAutoAllocatedWhenResourceAvailable() throws InterruptedException { + resourceAvailable.set(true); + scaleOutCallCount.set(0); + ResourceGroup resourceGroup = buildTestResourceGroup(TEST_GROUP_NAME + "-1", 2); + + optimizerManager().createResourceGroup(resourceGroup); + optimizingService().createResourceGroup(resourceGroup); + + // Wait for OptimizerGroupKeeper to detect and create optimizer + // OPTIMIZER_GROUP_MIN_PARALLELISM_CHECK_INTERVAL is set to 10ms, call intervals are 10, 20, 30, + // 40, 10, 200ms can cover abnormal scenarios + Thread.sleep(200); + + int totalCores = + optimizerManager().listOptimizers(resourceGroup.getName()).stream() + .mapToInt(OptimizerInstance::getThreadCount) + .sum(); + + Assertions.assertEquals( + 1, + scaleOutCallCount.get(), + resourceGroup.getName() + + ":One scale-out should be triggered when min-parallelism is not satisfied"); + Assertions.assertEquals( + 2, + totalCores, + resourceGroup.getName() + + ":OptimizerGroupKeeper should attempt to create optimizer when resources are needed"); + } + + /** + * Test scenario 2: When min-parallelism is already satisfied, optimizer will not be allocated. + * + *

When current optimizer cores >= min-parallelism, OptimizerGroupKeeper should not trigger any + * scale-out operation. + */ + @Test + public void testNoAllocationWhenMinParallelismSatisfied() throws InterruptedException { + resourceAvailable.set(true); + scaleOutCallCount.set(0); + ResourceGroup resourceGroup = buildTestResourceGroup(TEST_GROUP_NAME + "-2", 2); + + optimizerManager().createResourceGroup(resourceGroup); + optimizingService().createResourceGroup(resourceGroup); + + // Register an optimizer with 3 threads (exceeds min-parallelism of 2) + OptimizerRegisterInfo registerInfo = buildRegisterInfo(resourceGroup.getName(), 3); + String testToken = optimizingService().authenticate(registerInfo); + Assertions.assertNotNull(testToken, "Optimizer should be registered successfully"); + + Thread.sleep(200); + + // Verify no scale-out was triggered since min-parallelism is satisfied + Assertions.assertEquals( + 0, + scaleOutCallCount.get(), + resourceGroup.getName() + + ":No scale-out should be triggered when min-parallelism is already satisfied"); + } + + /** + * Test scenario 3: When no resources available, min-parallelism will be reset to 0. + * + *

When OptimizerGroupKeeper fails to create optimizer multiple times (exceeds max attempts), + * and there are no existing optimizers, it will reset min-parallelism to 0. + */ + @Test + public void testMinParallelismResetToZeroWhenNoResource() throws InterruptedException { + // Set resource not available - container will throw exception on scaleOut + resourceAvailable.set(false); + scaleOutCallCount.set(0); + ResourceGroup resourceGroup = buildTestResourceGroup(TEST_GROUP_NAME + "-3", 2); + + optimizerManager().createResourceGroup(resourceGroup); + optimizingService().createResourceGroup(resourceGroup); + + Thread.sleep(200); + Assertions.assertEquals( + OPTIMIZER_GROUP_MAX_KEEPING_ATTEMPTS.defaultValue(), + scaleOutCallCount.get(), + resourceGroup.getName() + + ":max scale-out attempts should be exhausted when no resources available"); + ResourceGroup updatedGroup = optimizerManager().getResourceGroup(resourceGroup.getName()); + String minParallelismStr = + updatedGroup.getProperties().get(OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM); + Assertions.assertEquals( + "0", + minParallelismStr, + resourceGroup.getName() + + ":min-parallelism should be reset to 0 when no resources available and no optimizer exists"); + } + + /** + * Test scenario 4: When no resources but has optimizer, min-parallelism will be reset to + * optimizer's executionParallel. + * + *

When OptimizerGroupKeeper fails to create optimizer multiple times and there are existing + * optimizers but not enough to meet min-parallelism, it will reset min-parallelism to the current + * total cores. + */ + @Test + public void testMinParallelismResetToOptimizerParallelWhenNoMoreResource() + throws InterruptedException { + resourceAvailable.set(false); + scaleOutCallCount.set(0); + ResourceGroup resourceGroup = buildTestResourceGroup(TEST_GROUP_NAME + "-4", 2); + + optimizerManager().createResourceGroup(resourceGroup); + optimizingService().createResourceGroup(resourceGroup); + + OptimizerRegisterInfo registerInfo = buildRegisterInfo(resourceGroup.getName(), 1); + String testToken = optimizingService().authenticate(registerInfo); + Assertions.assertNotNull(testToken, "Optimizer should be registered successfully"); + + Thread.sleep(200); + + ResourceGroup updatedGroup = optimizerManager().getResourceGroup(resourceGroup.getName()); + String minParallelismStr = + updatedGroup.getProperties().get(OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM); + Assertions.assertEquals( + OPTIMIZER_GROUP_MAX_KEEPING_ATTEMPTS.defaultValue(), + scaleOutCallCount.get(), + resourceGroup.getName() + + ":max scale-out attempts should be exhausted when no resources available"); + Assertions.assertEquals( + "1", + minParallelismStr, + resourceGroup.getName() + + ":min-parallelism should be reset to optimizer's current total cores (1) when no more resources available"); + } + + private static OptimizerRegisterInfo buildRegisterInfo(String groupName, int threadCount) { + OptimizerRegisterInfo registerInfo = new OptimizerRegisterInfo(); + Map registerProperties = Maps.newHashMap(); + registerProperties.put(OptimizerProperties.OPTIMIZER_HEART_BEAT_INTERVAL, "100"); + registerInfo.setProperties(registerProperties); + registerInfo.setThreadCount(threadCount); + registerInfo.setMemoryMb(1024); + registerInfo.setGroupName(groupName); + registerInfo.setResourceId("test-resource-" + System.currentTimeMillis() + "-" + threadCount); + registerInfo.setStartTime(System.currentTimeMillis()); + return registerInfo; + } + + /** + * Mock optimizer container for testing. + * + *

Simulates resource availability by controlling doScaleOut behavior: + * + *

    + *
  • When resourceAvailable=true: calls authenticate to register optimizer + *
  • When resourceAvailable=false: throw RuntimeException + *
+ */ + public static class MockOptimizerContainer extends AbstractOptimizerContainer { + + private final AtomicBoolean resourceAvailable; + private final AtomicInteger scaleOutCallCount; + private final Function optimizerRegistrar; + + public MockOptimizerContainer( + AtomicBoolean resourceAvailable, + AtomicInteger scaleOutCallCount, + Function optimizerRegistrar) { + this.resourceAvailable = resourceAvailable; + this.scaleOutCallCount = scaleOutCallCount; + this.optimizerRegistrar = optimizerRegistrar; + } + + @Override + public void init(String name, Map containerProperties) {} + + @Override + protected Map doScaleOut(Resource resource) { + scaleOutCallCount.incrementAndGet(); + if (!resourceAvailable.get()) { + throw new RuntimeException("No resources available"); + } + // When resources are available, register optimizer by calling authenticate + // This simulates the real behavior where SparkOptimizerContainer starts SparkOptimizer, + // which uses OptimizerToucher to call authenticate + if (optimizerRegistrar != null) { + OptimizerRegisterInfo registerInfo = + buildRegisterInfo(resource.getGroupName(), resource.getThreadCount()); + registerInfo.setMemoryMb(resource.getMemoryMb()); + registerInfo.setResourceId(resource.getResourceId()); + optimizerRegistrar.apply(registerInfo); + } + return Maps.newHashMap(); + } + + @Override + public void releaseResource(Resource resource) {} + } +} diff --git a/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java b/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java index bea9b90f9f..a8d04a1235 100644 --- a/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java +++ b/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java @@ -32,6 +32,7 @@ public class OptimizerProperties { public static final String OPTIMIZER_EXECUTION_PARALLEL = "execution-parallel"; public static final String OPTIMIZER_MEMORY_SIZE = "memory-size"; public static final String OPTIMIZER_GROUP_NAME = "group-name"; + public static final String OPTIMIZER_GROUP_MIN_PARALLELISM = "min-parallelism"; public static final String OPTIMIZER_HEART_BEAT_INTERVAL = "heart-beat-interval"; public static final String OPTIMIZER_EXTEND_DISK_STORAGE = "extend-disk-storage"; public static final boolean OPTIMIZER_EXTEND_DISK_STORAGE_DEFAULT = false; diff --git a/docs/configuration/ams-config.md b/docs/configuration/ams-config.md index 24b518d0c1..0f5f272da9 100644 --- a/docs/configuration/ams-config.md +++ b/docs/configuration/ams-config.md @@ -90,6 +90,8 @@ table td:last-child, table th:last-child { width: 40%; word-break: break-all; } | http-server.proxy-client-ip-header | X-Real-IP | The HTTP header to record the real client IP address. If your server is behind a load balancer or other proxy, the server will see this load balancer or proxy IP address as the client IP address, to get around this common issue, most load balancers or proxies offer the ability to record the real remote IP address in an HTTP header that will be added to the request for other devices to use. | | http-server.rest-auth-type | token | The authentication used by REST APIs, token (default), basic or jwt. | | http-server.session-timeout | 7 d | Timeout for http session. | +| optimizer-group.max-keeping-attempts | 3 | The maximum number of consecutive attempts to keep the optimizer group at its current parallelism. | +| optimizer-group.min-parallelism-check-interval | 5 min | The interval for checking and ensuring the optimizer group meets its minimum parallelism requirement. When the current parallelism falls below the configured min-parallelism, the system will attempt to scale out optimizers at this interval. The actual scale-out timing is calculated as: consecutive keeping attempts * this interval. | | optimizer.heart-beat-timeout | 1 min | Timeout duration for Optimizer heartbeat. | | optimizer.max-planning-parallelism | 1 | Max planning parallelism in one optimizer group. | | optimizer.polling-timeout | 3 s | Optimizer polling task timeout. |