From 558a2fcc48bc2690d3bfac314da6c4429fc747ca Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Wed, 11 Feb 2026 20:19:15 +0800 Subject: [PATCH 1/2] Provide a simpler way to extend process of formats. --- .../amoro/server/AmoroServiceContainer.java | 12 +- .../mapper/ProcessStateMapper.java | 87 ------ .../process/ActionCoordinatorScheduler.java | 11 +- .../amoro/server/process/ProcessService.java | 56 +--- .../server/process/TableProcessMeta.java | 20 -- .../executor/ExecuteEngineManager.java | 28 ++ .../scheduler/PeriodicExternalScheduler.java | 278 ------------------ .../server/table/AbstractTableRuntime.java | 62 +++- .../server/table/DefaultTableRuntime.java | 72 +---- .../table/DefaultTableRuntimeFactory.java | 7 + .../table/simple/SimpleActionCoordinator.java | 108 +++++++ .../table/simple/SimpleTableRuntime.java | 35 +++ .../simple/SimpleTableRuntimeFactory.java | 173 +++++++++++ .../main/resources/mysql/ams-mysql-init.sql | 14 - .../src/main/resources/mysql/upgrade.sql | 12 - .../amoro/server/AMSServiceTestBase.java | 12 +- .../server/process/MockActionCoordinator.java | 22 +- .../main/java/org/apache/amoro/Action.java | 41 ++- .../java/org/apache/amoro/IcebergActions.java | 10 +- .../java/org/apache/amoro/TableRuntime.java | 4 + .../amoro}/process/ActionCoordinator.java | 39 +-- .../apache/amoro/process/AmoroProcess.java | 2 +- .../apache/amoro/process/OptimizingState.java | 80 ----- .../apache/amoro/process/ProcessFactory.java | 14 +- .../apache/amoro/process/ProcessState.java | 77 ----- .../amoro/process/TableProcessState.java | 224 -------------- .../amoro/table/TableRuntimeFactory.java | 3 + .../conf/plugins/table-runtime-factories.yaml | 12 + 28 files changed, 511 insertions(+), 1004 deletions(-) delete mode 100644 amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/ProcessStateMapper.java create mode 100644 amoro-ams/src/main/java/org/apache/amoro/server/process/executor/ExecuteEngineManager.java delete mode 100644 amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicExternalScheduler.java create mode 100644 amoro-ams/src/main/java/org/apache/amoro/server/table/simple/SimpleActionCoordinator.java create mode 100644 amoro-ams/src/main/java/org/apache/amoro/server/table/simple/SimpleTableRuntime.java create mode 100644 amoro-ams/src/main/java/org/apache/amoro/server/table/simple/SimpleTableRuntimeFactory.java rename {amoro-ams/src/main/java/org/apache/amoro/server => amoro-common/src/main/java/org/apache/amoro}/process/ActionCoordinator.java (72%) delete mode 100644 amoro-common/src/main/java/org/apache/amoro/process/OptimizingState.java delete mode 100644 amoro-common/src/main/java/org/apache/amoro/process/ProcessState.java delete mode 100644 amoro-common/src/main/java/org/apache/amoro/process/TableProcessState.java diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index 4fd3e5e9af..a5c798bf89 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -32,6 +32,7 @@ import org.apache.amoro.config.Configurations; import org.apache.amoro.config.shade.utils.ConfigShadeUtils; import org.apache.amoro.exception.AmoroRuntimeException; +import org.apache.amoro.process.ActionCoordinator; import org.apache.amoro.server.catalog.CatalogManager; import org.apache.amoro.server.catalog.DefaultCatalogManager; import org.apache.amoro.server.dashboard.DashboardServer; @@ -47,6 +48,7 @@ import org.apache.amoro.server.persistence.HttpSessionHandlerFactory; import org.apache.amoro.server.persistence.SqlSessionFactoryProvider; import org.apache.amoro.server.process.ProcessService; +import org.apache.amoro.server.process.executor.ExecuteEngineManager; import org.apache.amoro.server.resource.ContainerMetadata; import org.apache.amoro.server.resource.Containers; import org.apache.amoro.server.resource.DefaultOptimizerManager; @@ -96,6 +98,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.stream.Collectors; public class AmoroServiceContainer { @@ -231,6 +234,12 @@ public void transitionToFollower() { public void startOptimizingService() throws Exception { TableRuntimeFactoryManager tableRuntimeFactoryManager = new TableRuntimeFactoryManager(); tableRuntimeFactoryManager.initialize(); + List actionCoordinators = + tableRuntimeFactoryManager.installedPlugins().stream() + .flatMap(f -> f.supportedCoordinators().stream()) + .collect(Collectors.toList()); + + ExecuteEngineManager executeEngineManager = new ExecuteEngineManager(); tableService = new DefaultTableService(serviceConfig, catalogManager, tableRuntimeFactoryManager); @@ -238,7 +247,8 @@ public void startOptimizingService() throws Exception { optimizingService = new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService); - processService = new ProcessService(serviceConfig, tableService); + processService = + new ProcessService(serviceConfig, tableService, actionCoordinators, executeEngineManager); LOG.info("Setting up AMS table executors..."); InlineTableExecutors.getInstance().setup(tableService, serviceConfig); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/ProcessStateMapper.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/ProcessStateMapper.java deleted file mode 100644 index f7b7557819..0000000000 --- a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/ProcessStateMapper.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.server.persistence.mapper; - -import org.apache.amoro.process.TableProcessState; -import org.apache.ibatis.annotations.Insert; -import org.apache.ibatis.annotations.Options; -import org.apache.ibatis.annotations.Param; -import org.apache.ibatis.annotations.Result; -import org.apache.ibatis.annotations.ResultMap; -import org.apache.ibatis.annotations.Results; -import org.apache.ibatis.annotations.Select; -import org.apache.ibatis.annotations.Update; - -import java.util.Map; - -public interface ProcessStateMapper { - - @Insert( - "INSERT INTO table_process_state " - + "(process_id, action, table_id, retry_num, status, start_time, end_time, fail_reason, summary) " - + "VALUES " - + "(#{id}, #{action}, #{tableId}, #{retryNumber}, #{status}, #{startTime}, #{endTime}, #{failedReason}, #{summary})") - @Options(useGeneratedKeys = true, keyProperty = "id") - void createProcessState(TableProcessState state); - - @Update( - "UPDATE table_process_state " - + "SET status = #{status}, start_time = #{startTime} " - + "WHERE process_id = #{id} and retry_num = #{retryNumber}") - void updateProcessRunning(TableProcessState state); - - @Update( - "UPDATE table_process_state " - + "SET status = #{status}, end_time = #{endTime} " - + "WHERE process_id = #{id} and retry_num = #{retryNumber}") - void updateProcessCompleted(TableProcessState state); - - @Update( - "UPDATE table_process_state " - + "SET status = #{status}, end_time = #{endTime}, fail_reason = #{failedReason} " - + "WHERE process_id = #{id} and retry_num = #{retryNumber}") - void updateProcessFailed(TableProcessState state); - - @Select( - "SELECT process_id, action, table_id, retry_num, status, start_time, end_time, fail_reason, summary " - + "FROM table_process_state " - + "WHERE process_id = #{processId}") - @Results( - id = "TableProcessStateResultMap", - value = { - @Result(property = "id", column = "process_id"), - @Result(property = "action", column = "action"), - @Result(property = "tableId", column = "table_id"), - @Result(property = "retryNumber", column = "retry_num"), - @Result(property = "status", column = "status"), - @Result(property = "startTime", column = "start_time"), - @Result(property = "endTime", column = "end_time"), - @Result(property = "failedReason", column = "fail_reason"), - @Result(property = "summary", column = "summary", javaType = Map.class) - }) - TableProcessState getProcessStateById(@Param("processId") long processId); - - /** Query TableProcessState by table_id */ - @Select( - "SELECT process_id, action, table_id, retry_num, status, start_time, end_time, fail_reason, summary " - + "FROM table_process_state " - + "WHERE table_id = #{tableId}") - @ResultMap("TableProcessStateResultMap") - TableProcessState getProcessStateByTableId(@Param("tableId") long tableId); -} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java index 83574c6636..7b6159e640 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java @@ -20,6 +20,7 @@ import org.apache.amoro.TableFormat; import org.apache.amoro.TableRuntime; +import org.apache.amoro.process.ActionCoordinator; import org.apache.amoro.process.TableProcess; import org.apache.amoro.process.TableProcessStore; import org.apache.amoro.server.scheduler.PeriodicTableScheduler; @@ -110,16 +111,6 @@ protected void recover(TableRuntime tableRuntime, TableProcessStore processStore processService.recover(tableRuntime, process); } - /** - * Retry a failed table process. - * - * @param process process to retry - */ - protected void retry(TableProcess process) { - process = coordinator.retryTableProcess(process); - processService.retry(process); - } - /** * Get executor delay from coordinator. * diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java index ee9f136dc6..78a5ac7c77 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java @@ -25,15 +25,16 @@ import org.apache.amoro.TableRuntime; import org.apache.amoro.config.Configurations; import org.apache.amoro.config.TableConfiguration; +import org.apache.amoro.process.ActionCoordinator; import org.apache.amoro.process.ProcessEvent; import org.apache.amoro.process.ProcessStatus; import org.apache.amoro.process.TableProcess; -import org.apache.amoro.server.manager.AbstractPluginManager; import org.apache.amoro.server.optimizing.OptimizingStatus; import org.apache.amoro.server.persistence.PersistentBase; import org.apache.amoro.server.persistence.mapper.TableProcessMapper; import org.apache.amoro.server.process.executor.EngineType; import org.apache.amoro.server.process.executor.ExecuteEngine; +import org.apache.amoro.server.process.executor.ExecuteEngineManager; import org.apache.amoro.server.process.executor.TableProcessExecutor; import org.apache.amoro.server.table.RuntimeHandlerChain; import org.apache.amoro.server.table.TableService; @@ -59,11 +60,11 @@ public class ProcessService extends PersistentBase { private static final Logger LOG = LoggerFactory.getLogger(ProcessService.class); private final TableService tableService; + private final List actionCoordinatorLists; private final Map actionCoordinators = new ConcurrentHashMap<>(); private final Map executeEngines = new ConcurrentHashMap<>(); - private final ActionCoordinatorManager actionCoordinatorManager; private final ExecuteEngineManager executeEngineManager; private final ProcessRuntimeHandler tableRuntimeHandler = new ProcessRuntimeHandler(); private final ThreadPoolExecutor processExecutionPool = @@ -72,17 +73,13 @@ public class ProcessService extends PersistentBase { private final Map> activeTableProcess = new ConcurrentHashMap<>(); - public ProcessService(Configurations serviceConfig, TableService tableService) { - this(serviceConfig, tableService, new ActionCoordinatorManager(), new ExecuteEngineManager()); - } - public ProcessService( Configurations serviceConfig, TableService tableService, - ActionCoordinatorManager actionCoordinatorManager, + List actionCoordinators, ExecuteEngineManager executeEngineManager) { this.tableService = tableService; - this.actionCoordinatorManager = actionCoordinatorManager; + this.actionCoordinatorLists = actionCoordinators; this.executeEngineManager = executeEngineManager; } @@ -126,15 +123,6 @@ public void recover(TableRuntime tableRuntime, TableProcess process) { executeOrTraceProcess(process); } - /** - * Retry a failed table process. - * - * @param process process to retry - */ - public void retry(TableProcess process) { - executeOrTraceProcess(process); - } - /** * Cancel a table process and release related resources. * @@ -147,7 +135,7 @@ public void cancel(TableProcess process) { /** Dispose the service, shutdown engines and clear active processes. */ public void dispose() { - actionCoordinatorManager.close(); + this.actionCoordinators.values().forEach(RuntimeHandlerChain::dispose); executeEngineManager.close(); processExecutionPool.shutdown(); activeTableProcess.clear(); @@ -155,16 +143,12 @@ public void dispose() { private void initialize(List tableRuntimes) { LOG.info("Initializing process service"); - actionCoordinatorManager.initialize(); - actionCoordinatorManager - .installedPlugins() - .forEach( - actionCoordinator -> { - actionCoordinators.put( - actionCoordinator.action().getName(), - new ActionCoordinatorScheduler( - actionCoordinator, tableService, ProcessService.this)); - }); + actionCoordinatorLists.forEach( + actionCoordinator -> { + actionCoordinators.put( + actionCoordinator.action().getName(), + new ActionCoordinatorScheduler(actionCoordinator, tableService, ProcessService.this)); + }); executeEngineManager.initialize(); executeEngineManager .installedPlugins() @@ -242,7 +226,7 @@ private void executeOrTraceProcess(TableProcess process) { "Regular Retry.", process.getProcessParameters(), process.getSummary()); - scheduler.retry(process); + executeOrTraceProcess(process); } else { untrackTableProcessInstance( process.getTableRuntime().getTableIdentifier(), process.getId()); @@ -551,18 +535,4 @@ protected void doDispose() { // TODO: dispose } } - - /** Manager for {@link ActionCoordinator} plugins. */ - public static class ActionCoordinatorManager extends AbstractPluginManager { - public ActionCoordinatorManager() { - super("action-coordinators"); - } - } - - /** Manager for {@link ExecuteEngine} plugins. */ - public static class ExecuteEngineManager extends AbstractPluginManager { - public ExecuteEngineManager() { - super("execute-engines"); - } - } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java index 311b60e035..6890c25c10 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java @@ -19,7 +19,6 @@ package org.apache.amoro.server.process; import org.apache.amoro.process.ProcessStatus; -import org.apache.amoro.process.TableProcessState; import org.apache.amoro.process.TableProcessStore; import java.util.HashMap; @@ -189,25 +188,6 @@ public static TableProcessMeta fromTableProcessStore(TableProcessStore tableProc return tableProcessMeta; } - @Deprecated - public static TableProcessMeta fromTableProcessState(TableProcessState tableProcessState) { - TableProcessMeta tableProcessMeta = new TableProcessMeta(); - tableProcessMeta.setProcessId(tableProcessState.getId()); - tableProcessMeta.setTableId(tableProcessState.getTableIdentifier().getId()); - tableProcessMeta.setExternalProcessIdentifier(tableProcessState.getExternalProcessIdentifier()); - tableProcessMeta.setStatus(tableProcessState.getStatus()); - tableProcessMeta.setProcessType(tableProcessState.getAction().getName()); - tableProcessMeta.setProcessStage(tableProcessState.getStage().getDesc()); - tableProcessMeta.setExecutionEngine(tableProcessState.getExecutionEngine()); - tableProcessMeta.setRetryNumber(tableProcessState.getRetryNumber()); - tableProcessMeta.setCreateTime(tableProcessState.getStartTime()); - tableProcessMeta.setFinishTime(tableProcessState.getEndTime()); - tableProcessMeta.setFailMessage(tableProcessState.getFailedReason()); - tableProcessMeta.setProcessParameters(tableProcessState.getProcessParameters()); - tableProcessMeta.setSummary(tableProcessState.getSummary()); - return tableProcessMeta; - } - public static TableProcessMeta of( long processId, long tableId, diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/executor/ExecuteEngineManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/executor/ExecuteEngineManager.java new file mode 100644 index 0000000000..77921b882a --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/executor/ExecuteEngineManager.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.process.executor; + +import org.apache.amoro.server.manager.AbstractPluginManager; + +/** Manager for {@link org.apache.amoro.server.process.executor.ExecuteEngine} plugins. */ +public class ExecuteEngineManager extends AbstractPluginManager { + public ExecuteEngineManager() { + super("execute-engines"); + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicExternalScheduler.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicExternalScheduler.java deleted file mode 100644 index 346f02772b..0000000000 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicExternalScheduler.java +++ /dev/null @@ -1,278 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.server.scheduler; - -import org.apache.amoro.Action; -import org.apache.amoro.SupportsProcessPlugins; -import org.apache.amoro.TableRuntime; -import org.apache.amoro.process.AmoroProcess; -import org.apache.amoro.process.ManagedProcess; -import org.apache.amoro.process.ProcessFactory; -import org.apache.amoro.process.ProcessStatus; -import org.apache.amoro.process.SimpleFuture; -import org.apache.amoro.process.TableProcess; -import org.apache.amoro.process.TableProcessState; -import org.apache.amoro.process.TableProcessStore; -import org.apache.amoro.resource.ExternalResourceContainer; -import org.apache.amoro.resource.Resource; -import org.apache.amoro.resource.ResourceManager; -import org.apache.amoro.server.persistence.PersistentBase; -import org.apache.amoro.server.persistence.mapper.TableProcessMapper; -import org.apache.amoro.server.process.DefaultTableProcessStore; -import org.apache.amoro.server.process.TableProcessMeta; -import org.apache.amoro.server.table.TableService; -import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; - -import java.util.List; -import java.util.Optional; - -public abstract class PeriodicExternalScheduler extends PeriodicTableScheduler { - - private final ExternalResourceContainer resourceContainer; - private final ResourceManager resourceManager; - private final ProcessFactory processFactory; - - public PeriodicExternalScheduler( - ResourceManager resourceManager, - ExternalResourceContainer resourceContainer, - Action action, - TableService tableService, - int poolSize) { - super(action, tableService, poolSize); - this.resourceContainer = resourceContainer; - this.resourceManager = resourceManager; - this.processFactory = generateProcessFactory(); - } - - @Override - protected void initHandler(List tableRuntimeList) { - tableRuntimeList.stream() - .filter(t -> t instanceof SupportsProcessPlugins) - .map(t -> (SupportsProcessPlugins) t) - .forEach(tableRuntime -> tableRuntime.install(getAction(), processFactory)); - super.initHandler(tableRuntimeList); - } - - @Override - protected boolean enabled(TableRuntime tableRuntime) { - return Optional.of(tableRuntime) - .filter(t -> t instanceof SupportsProcessPlugins) - .map(t -> (SupportsProcessPlugins) t) - .map(t -> t.enabled(getAction())) - .orElse(false); - } - - @Override - protected void execute(TableRuntime tableRuntime) { - Preconditions.checkArgument(tableRuntime instanceof SupportsProcessPlugins); - SupportsProcessPlugins runtimeSupportProcessPlugin = (SupportsProcessPlugins) tableRuntime; - // Trigger a table process and check conflicts by table runtime - // Update process state after process completed, the callback must be register first - AmoroProcess process = runtimeSupportProcessPlugin.trigger(getAction()); - process.getCompleteFuture().whenCompleted(() -> persistTableProcess(process)); - ManagedProcess managedProcess = new ManagedTableProcess(process); - - // Submit the table process to resource manager, this is a sync operation - // update process completed and delete related resources - managedProcess.submit(); - - // Trace the table process by async framework so that process can be called back when completed - trace(process); - } - - protected int getMaxRetryNumber() { - return 1; - } - - protected abstract void trace(AmoroProcess process); - - protected ProcessFactory generateProcessFactory() { - return new ExternalProcessFactory(); - } - - protected void persistTableProcess(AmoroProcess process) { - TableProcessStore store = process.store(); - if (store.getStatus() == ProcessStatus.SUBMITTED) { - new PersistencyHelper().createProcessState(store); - } else { - new PersistencyHelper().updateProcessStatus(store); - } - } - - private static class PersistencyHelper extends PersistentBase { - - void createProcessState(TableProcessStore store) { - TableProcessMeta meta = TableProcessMeta.fromTableProcessStore(store); - doAs( - TableProcessMapper.class, - mapper -> - mapper.updateProcess( - meta.getTableId(), - meta.getProcessId(), - meta.getExternalProcessIdentifier(), - meta.getStatus(), - meta.getProcessStage(), - meta.getRetryNumber(), - meta.getFinishTime(), - meta.getFailMessage(), - meta.getProcessParameters(), - meta.getSummary())); - } - - void updateProcessStatus(TableProcessStore store) { - TableProcessMeta meta = TableProcessMeta.fromTableProcessStore(store); - doAs( - TableProcessMapper.class, - mapper -> - mapper.updateProcess( - meta.getTableId(), - meta.getProcessId(), - meta.getExternalProcessIdentifier(), - meta.getStatus(), - meta.getProcessStage(), - meta.getRetryNumber(), - meta.getFinishTime(), - meta.getFailMessage(), - meta.getProcessParameters(), - meta.getSummary())); - } - } - - private class ExternalTableProcess extends TableProcess { - - ExternalTableProcess(TableRuntime tableRuntime) { - super( - tableRuntime, - new DefaultTableProcessStore( - tableRuntime, new TableProcessMeta(), PeriodicExternalScheduler.this.getAction())); - } - - ExternalTableProcess(TableRuntime tableRuntime, TableProcessState state) { - super( - tableRuntime, - new DefaultTableProcessStore( - tableRuntime, - TableProcessMeta.fromTableProcessState(state), - PeriodicExternalScheduler.this.getAction())); - } - - @Override - protected void closeInternal() {} - } - - private class ExternalProcessFactory implements ProcessFactory { - - @Override - public AmoroProcess create(TableRuntime tableRuntime, Action action) { - return new ExternalTableProcess(tableRuntime); - } - - @Override - public AmoroProcess recover(TableRuntime tableRuntime, TableProcessState state) { - return new ExternalTableProcess(tableRuntime, state); - } - } - - protected class ManagedTableProcess implements ManagedProcess { - - private final AmoroProcess process; - - ManagedTableProcess(AmoroProcess process) { - this.process = process; - } - - @Override - public void submit() { - Resource resource = resourceContainer.submit(this); - if (resource == null) { - throw new IllegalStateException("Submit table process can not return null resource"); - } - persistTableProcess(this); - resourceManager.createResource(resource); - getCompleteFuture() - .whenCompleted( - () -> { - resourceManager.deleteResource(resource.getResourceId()); - if (store().getStatus() == ProcessStatus.FAILED - && store().getRetryNumber() < getMaxRetryNumber()) { - retry(); - } - }); - store().begin().updateTableProcessStatus(ProcessStatus.SUBMITTED).commit(); - getSubmitFuture().complete(); - } - - @Override - public void complete() { - store() - .begin() - .updateTableProcessStatus(ProcessStatus.SUCCESS) - .updateFinishTime(System.currentTimeMillis()) - .commit(); - process.getCompleteFuture().complete(); - } - - @Override - public void complete(String failedReason) { - store() - .begin() - .updateTableProcessStatus(ProcessStatus.FAILED) - .updateTableProcessFailMessage(failedReason) - .updateFinishTime(System.currentTimeMillis()) - .commit(); - process.getCompleteFuture().complete(); - } - - @Override - public void retry() { - store() - .begin() - .updateTableProcessStatus(ProcessStatus.PENDING) - .updateRetryNumber(store().getRetryNumber() + 1) - .updateExternalProcessIdentifier("") - .commit(); - submit(); - } - - @Override - public void kill() { - store() - .begin() - .updateTableProcessStatus(ProcessStatus.KILLED) - .updateFinishTime(System.currentTimeMillis()) - .commit(); - process.getCompleteFuture().complete(); - } - - @Override - public SimpleFuture getSubmitFuture() { - return process.getSubmitFuture(); - } - - @Override - public SimpleFuture getCompleteFuture() { - return process.getCompleteFuture(); - } - - @Override - public TableProcessStore store() { - return process.store(); - } - } -} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java index e07cd4432d..68b4e1bd4c 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java @@ -18,17 +18,26 @@ package org.apache.amoro.server.table; +import org.apache.amoro.Action; import org.apache.amoro.ServerTableIdentifier; -import org.apache.amoro.SupportsProcessPlugins; import org.apache.amoro.TableRuntime; import org.apache.amoro.config.TableConfiguration; +import org.apache.amoro.process.TableProcessStore; import org.apache.amoro.server.persistence.PersistentBase; +import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; +import org.apache.amoro.shade.zookeeper3.org.apache.curator.shaded.com.google.common.collect.Maps; import org.apache.amoro.table.TableRuntimeStore; -public abstract class AbstractTableRuntime extends PersistentBase - implements TableRuntime, SupportsProcessPlugins { +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; + +public abstract class AbstractTableRuntime extends PersistentBase implements TableRuntime { private final TableRuntimeStore store; + private final Map processContainerMap = Maps.newConcurrentMap(); protected AbstractTableRuntime(TableRuntimeStore store) { this.store = store; @@ -48,6 +57,44 @@ public TableConfiguration getTableConfiguration() { return TableConfigurations.parseTableConfig(store().getTableConfig()); } + @Override + public List getProcessStates() { + return processContainerMap.values().stream() + .flatMap(container -> container.getProcessStates().stream()) + .collect(Collectors.toList()); + } + + @Override + public List getProcessStates(Action action) { + return processContainerMap.get(action).getProcessStates(); + } + + @Override + public void registerProcess(TableProcessStore processStore) { + processContainerMap + .computeIfAbsent(processStore.getAction(), k -> new TableProcessContainer()) + .processLock + .lock(); + try { + processContainerMap + .get(processStore.getAction()) + .processMap + .put(processStore.getProcessId(), processStore); + } finally { + processContainerMap.get(processStore.getAction()).processLock.unlock(); + } + } + + @Override + public void removeProcess(TableProcessStore processStore) { + processContainerMap.computeIfPresent( + processStore.getAction(), + (action, container) -> { + container.processMap.remove(processStore.getProcessId()); + return container; + }); + } + @Override public String getGroupName() { return store().getGroupName(); @@ -61,4 +108,13 @@ public int getStatusCode() { public void dispose() { store().dispose(); } + + private static class TableProcessContainer { + private final Lock processLock = new ReentrantLock(); + private final Map processMap = Maps.newConcurrentMap(); + + public List getProcessStates() { + return Lists.newArrayList(processMap.values()); + } + } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java index 6cf566eef8..17ec8b257d 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java @@ -18,9 +18,7 @@ package org.apache.amoro.server.table; -import org.apache.amoro.Action; import org.apache.amoro.AmoroTable; -import org.apache.amoro.SupportsProcessPlugins; import org.apache.amoro.TableRuntime; import org.apache.amoro.api.BlockableOperation; import org.apache.amoro.config.OptimizingConfig; @@ -30,9 +28,6 @@ import org.apache.amoro.optimizing.OptimizingType; import org.apache.amoro.optimizing.TableRuntimeOptimizingState; import org.apache.amoro.optimizing.plan.AbstractOptimizingEvaluator; -import org.apache.amoro.process.AmoroProcess; -import org.apache.amoro.process.ProcessFactory; -import org.apache.amoro.process.TableProcessStore; import org.apache.amoro.server.AmoroServiceConstants; import org.apache.amoro.server.optimizing.OptimizingProcess; import org.apache.amoro.server.optimizing.OptimizingStatus; @@ -47,7 +42,6 @@ import org.apache.amoro.server.utils.IcebergTableUtil; import org.apache.amoro.server.utils.SnowflakeIdGenerator; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; -import org.apache.amoro.shade.zookeeper3.org.apache.curator.shaded.com.google.common.collect.Maps; import org.apache.amoro.table.BaseTable; import org.apache.amoro.table.ChangeTable; import org.apache.amoro.table.MixedTable; @@ -63,13 +57,9 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; /** Default table runtime implementation. */ -public class DefaultTableRuntime extends AbstractTableRuntime - implements TableRuntime, SupportsProcessPlugins { +public class DefaultTableRuntime extends AbstractTableRuntime implements TableRuntime { private static final Logger LOG = LoggerFactory.getLogger(DefaultTableRuntime.class); @@ -95,7 +85,6 @@ public class DefaultTableRuntime extends AbstractTableRuntime Lists.newArrayList( OPTIMIZING_STATE_KEY, PENDING_INPUT_KEY, PROCESS_ID_KEY, CLEANUP_STATE_KEY); - private final Map processContainerMap = Maps.newConcurrentMap(); private final TableOptimizingMetrics optimizingMetrics; private final TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics; private final TableSummaryMetrics tableSummaryMetrics; @@ -128,39 +117,6 @@ public void registerMetric(MetricRegistry metricRegistry) { this.tableSummaryMetrics.register(metricRegistry); } - @Override - public AmoroProcess trigger(Action action) { - return Optional.ofNullable(processContainerMap.get(action)) - .map(container -> container.trigger(action)) - // Define a related exception - .orElseThrow(() -> new IllegalArgumentException("No ProcessFactory for action " + action)); - } - - @Override - public void install(Action action, ProcessFactory processFactory) { - if (processContainerMap.putIfAbsent(action, new TableProcessContainer(processFactory)) - != null) { - throw new IllegalStateException("ProcessFactory for action " + action + " already exists"); - } - } - - @Override - public boolean enabled(Action action) { - return processContainerMap.get(action) != null; - } - - @Override - public List getProcessStates() { - return processContainerMap.values().stream() - .flatMap(container -> container.getProcessStates().stream()) - .collect(Collectors.toList()); - } - - @Override - public List getProcessStates(Action action) { - return processContainerMap.get(action).getProcessStates(); - } - public TableOrphanFilesCleaningMetrics getOrphanFilesCleaningMetrics() { return orphanFilesCleaningMetrics; } @@ -582,30 +538,4 @@ private long doRefreshSnapshots(UnkeyedTable table) { return currentSnapshotId; } - - private class TableProcessContainer { - private final Lock processLock = new ReentrantLock(); - private final ProcessFactory processFactory; - private final Map processMap = Maps.newConcurrentMap(); - - TableProcessContainer(ProcessFactory processFactory) { - this.processFactory = processFactory; - } - - public AmoroProcess trigger(Action action) { - processLock.lock(); - try { - AmoroProcess process = processFactory.create(DefaultTableRuntime.this, action); - process.getCompleteFuture().whenCompleted(() -> processMap.remove(process.getId())); - processMap.put(process.getId(), process); - return process; - } finally { - processLock.unlock(); - } - } - - public List getProcessStates() { - return processMap.values().stream().map(AmoroProcess::store).collect(Collectors.toList()); - } - } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java index 86e748cc54..5d43c5578b 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java @@ -21,6 +21,8 @@ import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableFormat; import org.apache.amoro.TableRuntime; +import org.apache.amoro.process.ActionCoordinator; +import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.table.StateKey; import org.apache.amoro.table.TableRuntimeFactory; import org.apache.amoro.table.TableRuntimeStore; @@ -41,6 +43,11 @@ public String name() { return "default"; } + @Override + public List supportedCoordinators() { + return Lists.newArrayList(); + } + @Override public Optional accept( ServerTableIdentifier tableIdentifier, Map tableProperties) { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/simple/SimpleActionCoordinator.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/simple/SimpleActionCoordinator.java new file mode 100644 index 0000000000..63374fe058 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/simple/SimpleActionCoordinator.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.table.simple; + +import org.apache.amoro.Action; +import org.apache.amoro.TableFormat; +import org.apache.amoro.TableRuntime; +import org.apache.amoro.process.ActionCoordinator; +import org.apache.amoro.process.ProcessFactory; +import org.apache.amoro.process.TableProcess; +import org.apache.amoro.process.TableProcessStore; + +import java.time.Duration; +import java.util.Map; +import java.util.Set; + +public class SimpleActionCoordinator implements ActionCoordinator { + + private static final String PARALLELISM = "parallelism"; + private static final int PARALLELISM_DEFAULT = 1; + private static final String CHECK_INTERVAL = "check-interval"; + private static final long CHECK_INTERVAL_DEFAULT = Duration.ofMinutes(5).toMillis(); + + private final Action action; + private final ProcessFactory factory; + private final Set supportedFormats; + + private final int parallelism; + private final long checkInterval; + + public SimpleActionCoordinator( + Action action, + ProcessFactory factory, + Set supportedFormats, + Map configuration) { + this.action = action; + this.factory = factory; + this.supportedFormats = supportedFormats; + this.parallelism = + Integer.parseInt( + configuration.getOrDefault(PARALLELISM, Integer.toString(PARALLELISM_DEFAULT))); + this.checkInterval = + Long.parseLong( + configuration.getOrDefault(CHECK_INTERVAL, Long.toString(CHECK_INTERVAL_DEFAULT))); + } + + @Override + public boolean formatSupported(TableFormat format) { + return supportedFormats.contains(format); + } + + @Override + public int parallelism() { + return parallelism; + } + + @Override + public Action action() { + return action; + } + + @Override + public long getNextExecutingTime(TableRuntime tableRuntime) { + return checkInterval; + } + + @Override + public boolean enabled(TableRuntime tableRuntime) { + return formatSupported(tableRuntime.getFormat()); + } + + @Override + public long getExecutorDelay() { + return checkInterval; + } + + @Override + public boolean isReady(TableRuntime tableRuntime) { + return factory.readyForAction(tableRuntime, action); + } + + @Override + public TableProcess createTableProcess(TableRuntime tableRuntime) { + return factory.create(tableRuntime, action); + } + + @Override + public TableProcess recoverTableProcess( + TableRuntime tableRuntime, TableProcessStore processStore) { + return factory.recover(tableRuntime, processStore); + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/simple/SimpleTableRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/simple/SimpleTableRuntime.java new file mode 100644 index 0000000000..602a4c009b --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/simple/SimpleTableRuntime.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.table.simple; + +import org.apache.amoro.metrics.MetricRegistry; +import org.apache.amoro.server.table.AbstractTableRuntime; +import org.apache.amoro.table.TableRuntimeStore; + +public class SimpleTableRuntime extends AbstractTableRuntime { + protected SimpleTableRuntime(TableRuntimeStore store) { + super(store); + } + + @Override + public void registerMetric(MetricRegistry metricRegistry) {} + + @Override + public void unregisterMetric() {} +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/simple/SimpleTableRuntimeFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/simple/SimpleTableRuntimeFactory.java new file mode 100644 index 0000000000..9b602ee665 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/simple/SimpleTableRuntimeFactory.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.table.simple; + +import org.apache.amoro.Action; +import org.apache.amoro.ServerTableIdentifier; +import org.apache.amoro.TableFormat; +import org.apache.amoro.TableRuntime; +import org.apache.amoro.process.ActionCoordinator; +import org.apache.amoro.process.ProcessFactory; +import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; +import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; +import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; +import org.apache.amoro.shade.guava32.com.google.common.collect.Sets; +import org.apache.amoro.table.StateKey; +import org.apache.amoro.table.TableRuntimeFactory; +import org.apache.amoro.table.TableRuntimeStore; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +public class SimpleTableRuntimeFactory implements TableRuntimeFactory { + private static final String SUPPORT_FORMATS = "support-formats"; + private static final String SUPPORT_ACTIONS = "support-actions"; + private static final String SUPPORT_FORMAT_ACTIONS_PREFIX = "support-format-actions."; + private static final String PROCESS_FACTORY_IMPL = "process-factory-impl"; + + private final Set supportFormats = Sets.newHashSet(); + private final Map> supportedActions = Maps.newHashMap(); + private final Map> actionConfigs = Maps.newHashMap(); + private ProcessFactory processFactory; + private final List supportedCoordinators = Lists.newArrayList(); + + @Override + public void open(Map properties) { + Preconditions.checkArgument( + properties.containsKey(SUPPORT_FORMATS), SUPPORT_FORMATS + " is required"); + String supportFormatsStr = properties.get(SUPPORT_FORMATS); + for (String format : supportFormatsStr.split(",")) { + supportFormats.add(TableFormat.register(format.trim().toUpperCase())); + } + initializeProcessFactory(properties); + initializeActions(properties); + } + + @Override + public void close() {} + + @Override + public List supportedCoordinators() { + return supportedCoordinators; + } + + @Override + public Optional accept( + ServerTableIdentifier tableIdentifier, Map tableProperties) { + if (!supportFormats.contains(tableIdentifier.getFormat())) { + return Optional.empty(); + } + return Optional.of(new TableRuntimeCreatorImpl()); + } + + private void initializeActions(Map properties) { + if (properties.containsKey(SUPPORT_ACTIONS)) { + String supportActionsStr = properties.get(SUPPORT_ACTIONS); + for (String action : supportActionsStr.split(",")) { + for (TableFormat format : supportFormats) { + supportedActions + .computeIfAbsent(format, k -> Sets.newHashSet()) + .add(Action.register(action.trim().toUpperCase())); + } + } + } + properties.forEach( + (key, value) -> { + if (key.startsWith(SUPPORT_FORMAT_ACTIONS_PREFIX)) { + TableFormat format = + TableFormat.valueOf( + key.substring(SUPPORT_FORMAT_ACTIONS_PREFIX.length()).trim().toUpperCase()); + Preconditions.checkArgument( + format == null || !supportFormats.contains(format), + "Unsupported format: " + format); + for (String action : value.split(",")) { + supportedActions + .computeIfAbsent(format, k -> Sets.newHashSet()) + .add(Action.register(action.trim().toUpperCase())); + } + } + }); + Preconditions.checkArgument( + !supportedActions.isEmpty(), + SUPPORT_ACTIONS + " or " + SUPPORT_FORMAT_ACTIONS_PREFIX + " is required"); + supportedActions.forEach( + (f, actions) -> + actions.forEach(a -> actionConfigs.computeIfAbsent(a, k -> Maps.newHashMap()))); + + properties.forEach( + (key, value) -> { + if (key.startsWith("action.")) { + String actionConfigKey = key.substring("action.".length()).trim().toUpperCase(); + int pos = actionConfigKey.indexOf("."); + String actionName = actionConfigKey.substring(0, pos); + Action action = Action.valueOf(actionName); + if (action != null && actionConfigs.containsKey(action)) { + String configKey = actionConfigKey.substring(pos + 1).trim(); + actionConfigs.get(action).put(configKey, value); + } + } + }); + + for (Action action : actionConfigs.keySet()) { + Set formatSet = Sets.newHashSet(); + supportedActions.forEach( + (f, actions) -> { + if (actions.contains(action)) { + formatSet.add(f); + } + }); + supportedCoordinators.add( + new SimpleActionCoordinator( + action, processFactory, formatSet, actionConfigs.get(action))); + } + } + + private void initializeProcessFactory(Map properties) { + Preconditions.checkArgument( + properties.containsKey(PROCESS_FACTORY_IMPL), PROCESS_FACTORY_IMPL + " is required"); + try { + processFactory = + (ProcessFactory) + Class.forName(properties.get(PROCESS_FACTORY_IMPL)) + .getDeclaredConstructor() + .newInstance(); + } catch (Exception e) { + throw new RuntimeException("Failed to create process factory", e); + } + } + + @Override + public String name() { + return "simple"; + } + + private class TableRuntimeCreatorImpl implements TableRuntimeFactory.TableRuntimeCreator { + @Override + public List> requiredStateKeys() { + return processFactory.requiredStates(); + } + + @Override + public TableRuntime create(TableRuntimeStore store) { + return new SimpleTableRuntime(store); + } + } +} diff --git a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql index f1f91e3a32..e142a2ce95 100644 --- a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql +++ b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql @@ -189,20 +189,6 @@ CREATE TABLE `task_runtime` KEY `table_index` (`table_id`, `process_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'Optimize task basic information'; -CREATE TABLE `table_process_state` -( - `process_id` bigint(20) NOT NULL COMMENT 'optimizing_procedure UUID', - `action` varchar(16) NOT NULL COMMENT 'process action', - `table_id` bigint(20) NOT NULL, - `retry_num` int(11) DEFAULT NULL COMMENT 'Retry times', - `status` varchar(10) NOT NULL COMMENT 'Direct to TableOptimizingStatus', - `start_time` timestamp DEFAULT CURRENT_TIMESTAMP COMMENT 'First plan time', - `end_time` timestamp NULL DEFAULT NULL COMMENT 'finish time or failed time', - `fail_reason` varchar(4096) DEFAULT NULL COMMENT 'Error message after task failed', - `summary` mediumtext COMMENT 'state summary, usually a map', - PRIMARY KEY (`process_id`), - KEY `table_index` (`table_id`, `start_time`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'History of optimizing after each commit'; CREATE TABLE `optimizing_task_quota` ( diff --git a/amoro-ams/src/main/resources/mysql/upgrade.sql b/amoro-ams/src/main/resources/mysql/upgrade.sql index b36064109f..dca7839efc 100644 --- a/amoro-ams/src/main/resources/mysql/upgrade.sql +++ b/amoro-ams/src/main/resources/mysql/upgrade.sql @@ -107,18 +107,6 @@ CREATE TABLE `table_runtime` ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'Table running information of each table' ROW_FORMAT=DYNAMIC; -CREATE TABLE `table_runtime_state` ( - `state_id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'Primary key', - `table_id` bigint unsigned NOT NULL COMMENT 'Table identifier id', - `state_key` varchar(256) NOT NULL COMMENT 'Table Runtime state key', - `state_value` mediumtext COMMENT 'Table Runtime state value, string type', - `state_version` bigint NOT NULL DEFAULT '0' COMMENT 'Table runtime state version, auto inc when update', - `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time', - `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time', - PRIMARY KEY (`state_id`), - UNIQUE KEY `uniq_table_state_key` (`table_id`,`state_key`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='State of Table Runtimes'; - INSERT INTO table_runtime (`table_id`, `group_name`, `status_code`, `status_code_update_time`, `table_config`, `table_summary`) SELECT `table_id`, `optimizer_group`, `optimizing_status_code`, `optimizing_status_start_time`, diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java index 9f3d25b3f7..7cc2312900 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java @@ -19,10 +19,12 @@ package org.apache.amoro.server; import org.apache.amoro.config.Configurations; +import org.apache.amoro.process.ActionCoordinator; import org.apache.amoro.resource.ResourceGroup; import org.apache.amoro.server.manager.EventsManager; import org.apache.amoro.server.manager.MetricManager; import org.apache.amoro.server.process.ProcessService; +import org.apache.amoro.server.process.executor.ExecuteEngineManager; import org.apache.amoro.server.table.DefaultTableRuntime; import org.apache.amoro.server.table.DefaultTableRuntimeFactory; import org.apache.amoro.server.table.DefaultTableService; @@ -34,6 +36,8 @@ import org.mockito.Mockito; import java.time.Duration; +import java.util.List; +import java.util.stream.Collectors; public abstract class AMSServiceTestBase extends AMSManagerTestBase { private static DefaultTableService TABLE_SERVICE = null; @@ -48,6 +52,10 @@ public static void initTableService() { tableRuntimeFactoryManager = Mockito.mock(TableRuntimeFactoryManager.class); Mockito.when(tableRuntimeFactoryManager.installedPlugins()) .thenReturn(Lists.newArrayList(runtimeFactory)); + List actionCoordinators = + tableRuntimeFactoryManager.installedPlugins().stream() + .flatMap(factory -> factory.supportedCoordinators().stream()) + .collect(Collectors.toList()); try { Configurations configurations = new Configurations(); configurations.set(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT, Duration.ofMillis(800L)); @@ -59,7 +67,9 @@ public static void initTableService() { OPTIMIZING_SERVICE = new DefaultOptimizingService( configurations, CATALOG_MANAGER, OPTIMIZER_MANAGER, TABLE_SERVICE); - PROCESS_SERVICE = new ProcessService(configurations, TABLE_SERVICE); + PROCESS_SERVICE = + new ProcessService( + configurations, TABLE_SERVICE, actionCoordinators, new ExecuteEngineManager()); TABLE_SERVICE.addHandlerChain(OPTIMIZING_SERVICE.getTableRuntimeHandler()); TABLE_SERVICE.addHandlerChain(PROCESS_SERVICE.getTableHandlerChain()); diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java b/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java index da3617a148..7027f606ef 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java @@ -21,18 +21,20 @@ import org.apache.amoro.Action; import org.apache.amoro.TableFormat; import org.apache.amoro.TableRuntime; +import org.apache.amoro.process.ActionCoordinator; import org.apache.amoro.process.TableProcess; import org.apache.amoro.process.TableProcessStore; +import org.apache.amoro.server.utils.SnowflakeIdGenerator; import java.util.HashMap; -import java.util.Map; /** Mock implementation of {@link ActionCoordinator} used in tests. */ public class MockActionCoordinator implements ActionCoordinator { public static final int PROCESS_MAX_POOL_SIZE = 1000; private static final TableFormat[] DEFAULT_FORMATS = new TableFormat[] {TableFormat.PAIMON}; + SnowflakeIdGenerator SNOWFLAKE_ID_GENERATOR = new SnowflakeIdGenerator(); - public static final Action DEFAULT_ACTION = new Action(DEFAULT_FORMATS, 0, "default_action"); + public static final Action DEFAULT_ACTION = Action.register("default_action"); /** * Whether the format is supported. @@ -98,7 +100,7 @@ public TableProcess createTableProcess(TableRuntime tableRuntime) { SNOWFLAKE_ID_GENERATOR.generateId(), tableRuntime.getTableIdentifier().getId(), action().getName(), - executionEngine(), + "default", new HashMap<>()); TableProcessStore tableProcessStore = new DefaultTableProcessStore( @@ -131,18 +133,4 @@ public TableProcess cancelTableProcess(TableRuntime tableRuntime, TableProcess p public TableProcess retryTableProcess(TableProcess process) { return process; } - - /** Open plugin. */ - @Override - public void open(Map properties) {} - - /** Close plugin. */ - @Override - public void close() {} - - /** Plugin name. */ - @Override - public String name() { - return "mock_action_coordinator"; - } } diff --git a/amoro-common/src/main/java/org/apache/amoro/Action.java b/amoro-common/src/main/java/org/apache/amoro/Action.java index 42671a6e1d..1d5f81b3bf 100644 --- a/amoro-common/src/main/java/org/apache/amoro/Action.java +++ b/amoro-common/src/main/java/org/apache/amoro/Action.java @@ -20,38 +20,33 @@ import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; -import java.util.Arrays; +import java.util.Locale; +import java.util.Map; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; public final class Action { private static final int MAX_NAME_LENGTH = 16; - - /** supported table formats of this action */ - private final TableFormat[] formats; + private static final Map registeredActions = new ConcurrentHashMap<>(); private final String name; - /** - * the weight number of this action, the bigger the weight number, the higher positions of - * schedulers or front pages - */ - private final int weight; - public Action(TableFormat[] formats, int weight, String name) { - Preconditions.checkArgument( - name.length() <= MAX_NAME_LENGTH, - "Action name length should be less than " + MAX_NAME_LENGTH); - this.formats = formats; - this.name = name; - this.weight = weight; + public static Action register(String name) { + final String regularName = name.trim().toUpperCase(Locale.ROOT); + return registeredActions.computeIfAbsent(regularName, s -> new Action(regularName)); } - public int getWeight() { - return weight; + public static Action valueOf(String name) { + final String regularName = name.trim().toUpperCase(Locale.ROOT); + return registeredActions.get(regularName); } - public TableFormat[] supportedFormats() { - return formats; + private Action(String name) { + Preconditions.checkArgument( + name.length() <= MAX_NAME_LENGTH, + "Action name length should be less than " + MAX_NAME_LENGTH); + this.name = name; } public String getName() { @@ -67,13 +62,11 @@ public boolean equals(Object o) { return false; } Action action = (Action) o; - return Objects.equals(name, action.name) && Arrays.equals(formats, action.formats); + return Objects.equals(name, action.name); } @Override public int hashCode() { - int result = Objects.hash(name); - result = 31 * result + Arrays.hashCode(formats); - return result; + return Objects.hash(name); } } diff --git a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java index 76f470d98c..c75c5ac8d0 100644 --- a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java +++ b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java @@ -23,9 +23,9 @@ public class IcebergActions { private static final TableFormat[] DEFAULT_FORMATS = new TableFormat[] {TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE}; - public static final Action SYSTEM = new Action(DEFAULT_FORMATS, 0, "system"); - public static final Action REWRITE = new Action(DEFAULT_FORMATS, 10, "rewrite"); - public static final Action DELETE_ORPHANS = new Action(DEFAULT_FORMATS, 2, "delete-orphans"); - public static final Action SYNC_HIVE = new Action(DEFAULT_FORMATS, 3, "sync-hive"); - public static final Action EXPIRE_DATA = new Action(DEFAULT_FORMATS, 1, "expire-data"); + public static final Action SYSTEM = Action.register("system"); + public static final Action REWRITE = Action.register("rewrite"); + public static final Action DELETE_ORPHANS = Action.register("delete-orphans"); + public static final Action SYNC_HIVE = Action.register("sync-hive"); + public static final Action EXPIRE_DATA = Action.register("expire-data"); } diff --git a/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java b/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java index 31ea74f1fe..9fa9a29e47 100644 --- a/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java +++ b/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java @@ -48,6 +48,10 @@ public interface TableRuntime { */ List getProcessStates(Action action); + void registerProcess(TableProcessStore processStore); + + void removeProcess(TableProcessStore processStore); + /** Get the group name of the table runtime. */ String getGroupName(); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinator.java b/amoro-common/src/main/java/org/apache/amoro/process/ActionCoordinator.java similarity index 72% rename from amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinator.java rename to amoro-common/src/main/java/org/apache/amoro/process/ActionCoordinator.java index 5e55154cbd..9c97f11742 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinator.java +++ b/amoro-common/src/main/java/org/apache/amoro/process/ActionCoordinator.java @@ -16,25 +16,17 @@ * limitations under the License. */ -package org.apache.amoro.server.process; +package org.apache.amoro.process; import org.apache.amoro.Action; -import org.apache.amoro.ActivePlugin; import org.apache.amoro.TableFormat; import org.apache.amoro.TableRuntime; -import org.apache.amoro.process.TableProcess; -import org.apache.amoro.process.TableProcessStore; -import org.apache.amoro.server.utils.SnowflakeIdGenerator; /** * Coordinator for a specific {@link org.apache.amoro.Action} to manage table processes. Provides * scheduling parameters and lifecycle hooks to create/recover/cancel/retry table processes. */ -public interface ActionCoordinator extends ActivePlugin { - - String PROPERTY_PARALLELISM = "parallelism"; - - SnowflakeIdGenerator SNOWFLAKE_ID_GENERATOR = new SnowflakeIdGenerator(); +public interface ActionCoordinator { /** * Check whether the given table format is supported by this coordinator. @@ -58,13 +50,6 @@ public interface ActionCoordinator extends ActivePlugin { */ Action action(); - /** - * Get the execution engine name used by this coordinator. - * - * @return execution engine name - */ - String executionEngine(); - /** * Calculate the next executing time for the given table runtime. * @@ -88,6 +73,9 @@ public interface ActionCoordinator extends ActivePlugin { */ long getExecutorDelay(); + /** Check whether the given table runtime is ready to create a process. */ + boolean isReady(TableRuntime tableRuntime); + /** * Create a new {@link TableProcess} instance for the given table runtime. * @@ -104,21 +92,4 @@ public interface ActionCoordinator extends ActivePlugin { * @return recovered table process */ TableProcess recoverTableProcess(TableRuntime tableRuntime, TableProcessStore processStore); - - /** - * Prepare a {@link TableProcess} for cancellation. - * - * @param tableRuntime table runtime - * @param process table process to cancel - * @return the process instance to be canceled - */ - TableProcess cancelTableProcess(TableRuntime tableRuntime, TableProcess process); - - /** - * Prepare a {@link TableProcess} for retrying. - * - * @param process table process to retry - * @return the process instance to be retried - */ - TableProcess retryTableProcess(TableProcess process); } diff --git a/amoro-common/src/main/java/org/apache/amoro/process/AmoroProcess.java b/amoro-common/src/main/java/org/apache/amoro/process/AmoroProcess.java index cf0ffd50bf..6ff5b6cfb6 100644 --- a/amoro-common/src/main/java/org/apache/amoro/process/AmoroProcess.java +++ b/amoro-common/src/main/java/org/apache/amoro/process/AmoroProcess.java @@ -47,7 +47,7 @@ public interface AmoroProcess { SimpleFuture getCompleteFuture(); /** - * Get {@link ProcessState} of the process + * Get {@link TableProcessStore} of the process * * @return the state of the process */ diff --git a/amoro-common/src/main/java/org/apache/amoro/process/OptimizingState.java b/amoro-common/src/main/java/org/apache/amoro/process/OptimizingState.java deleted file mode 100644 index c4a900dcac..0000000000 --- a/amoro-common/src/main/java/org/apache/amoro/process/OptimizingState.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.process; - -import org.apache.amoro.Action; -import org.apache.amoro.ServerTableIdentifier; -import org.apache.amoro.StateField; - -/** The state of the optimizing process. */ -public abstract class OptimizingState extends TableProcessState { - - @StateField private volatile long targetSnapshotId; - @StateField private volatile long watermark; - @StateField private volatile ProcessStage stage; - @StateField private volatile long currentStageStartTime; - - public OptimizingState(Action action, ServerTableIdentifier tableIdentifier) { - super(action, tableIdentifier); - } - - public OptimizingState(long id, Action action, ServerTableIdentifier tableIdentifier) { - super(id, action, tableIdentifier); - } - - protected void setStage(ProcessStage stage) { - this.stage = stage; - this.currentStageStartTime = System.currentTimeMillis(); - } - - protected void setStage(ProcessStage stage, long stageStartTime) { - this.stage = stage; - this.currentStageStartTime = stageStartTime; - } - - protected void setTargetSnapshotId(long targetSnapshotId) { - this.targetSnapshotId = targetSnapshotId; - } - - protected void setWatermark(long watermark) { - this.watermark = watermark; - } - - public long getWatermark() { - return watermark; - } - - @Override - public ProcessStage getStage() { - return stage; - } - - public long getTargetSnapshotId() { - return targetSnapshotId; - } - - public long getCurrentStageStartTime() { - return currentStageStartTime; - } - - @Override - public String getName() { - return stage.getDesc(); - } -} diff --git a/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java b/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java index a2df0a99f3..28d31c61f7 100644 --- a/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java +++ b/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java @@ -20,6 +20,10 @@ import org.apache.amoro.Action; import org.apache.amoro.TableRuntime; +import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; +import org.apache.amoro.table.StateKey; + +import java.util.List; /** * A factory to create a process. Normally, There will be default ProcessFactories for each action @@ -28,6 +32,12 @@ */ public interface ProcessFactory { + default List> requiredStates() { + return Lists.newArrayList(); + } + + boolean readyForAction(TableRuntime tableRuntime, Action action); + /** * Create a process for the action. * @@ -35,7 +45,7 @@ public interface ProcessFactory { * @param action action type * @return target process which has not been submitted yet. */ - AmoroProcess create(TableRuntime tableRuntime, Action action); + TableProcess create(TableRuntime tableRuntime, Action action); /** * Recover a process for the action from a state. @@ -44,5 +54,5 @@ public interface ProcessFactory { * @param state state of the process * @return target process which has not been submitted yet. */ - AmoroProcess recover(TableRuntime tableRuntime, TableProcessState state); + TableProcess recover(TableRuntime tableRuntime, TableProcessStore state); } diff --git a/amoro-common/src/main/java/org/apache/amoro/process/ProcessState.java b/amoro-common/src/main/java/org/apache/amoro/process/ProcessState.java deleted file mode 100644 index 91f76fcaef..0000000000 --- a/amoro-common/src/main/java/org/apache/amoro/process/ProcessState.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.process; - -import org.apache.amoro.Action; - -import java.util.Map; - -/** - * ProcessState contains information in any {@link AmoroProcess} which must be persistent and {@link - * ProcessFactory} will use to recover {@link AmoroProcess}. - */ -public interface ProcessState { - - /** @return unique identifier of the process. */ - long getId(); - - /** - * @return the name of the state. If multiple stages are involved, it should be the name of the - * current stage. - */ - String getName(); - - /** @return start time of the process. */ - long getStartTime(); - - /** @return the action of the process. */ - Action getAction(); - - /** @return the status of the process. */ - ProcessStatus getStatus(); - - /** - * Get the string encoded summary of the process, this could be a simple description or a POJO - * encoded by JSON - * - * @return the summary of the process - */ - Map getSummary(); - - /** @return the reason of process failure, null if the process has not failed yet. */ - String getFailedReason(); - - /** - * Total millisecond running time of all tasks in the process. - * - * @return actual quota runtime of the process. - */ - long getQuotaRuntime(); - - /** - * Quota value is calculated by the total millisecond running time of all tasks in the process - * divided by the total millisecond from the start time to the current time. It is used to - * evaluate the actual runtime concurrence of the process. - * - * @return the quota value of the process. - */ - default double getQuotaValue() { - return (double) getQuotaRuntime() / (System.currentTimeMillis() - getStartTime()); - } -} diff --git a/amoro-common/src/main/java/org/apache/amoro/process/TableProcessState.java b/amoro-common/src/main/java/org/apache/amoro/process/TableProcessState.java deleted file mode 100644 index c4e800ed1c..0000000000 --- a/amoro-common/src/main/java/org/apache/amoro/process/TableProcessState.java +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.process; - -import org.apache.amoro.Action; -import org.apache.amoro.ServerTableIdentifier; -import org.apache.amoro.StateField; - -import java.util.Map; - -/** A common state of a table process. */ -public class TableProcessState implements ProcessState { - - @StateField private volatile long id; - @StateField private volatile String externalProcessIdentifier; - private final Action action; - private final ServerTableIdentifier tableIdentifier; - private String executionEngine; - @StateField private int retryNumber = 0; - @StateField private long startTime = -1L; - @StateField private long endTime = -1L; - @StateField private ProcessStatus status = ProcessStatus.PENDING; - @StateField private volatile String failedReason; - private volatile Map processParameters; - private volatile Map summary; - - public TableProcessState(Action action, ServerTableIdentifier tableIdentifier) { - this.action = action; - this.tableIdentifier = tableIdentifier; - } - - public TableProcessState(long id, Action action, ServerTableIdentifier tableIdentifier) { - this.id = id; - this.action = action; - this.tableIdentifier = tableIdentifier; - } - - public TableProcessState( - long id, Action action, ServerTableIdentifier tableIdentifier, String executionEngine) { - this.id = id; - this.action = action; - this.tableIdentifier = tableIdentifier; - this.executionEngine = executionEngine; - } - - @Override - public long getId() { - return id; - } - - public String getExternalProcessIdentifier() { - return externalProcessIdentifier; - } - - public String getName() { - return action.getName(); - } - - public Action getAction() { - return action; - } - - public long getStartTime() { - return startTime; - } - - public long getEndTime() { - return endTime; - } - - public ProcessStatus getStatus() { - return status; - } - - public String getExecutionEngine() { - return executionEngine; - } - - @Override - public Map getSummary() { - return summary; - } - - @Override - public long getQuotaRuntime() { - return getDuration(); - } - - @Override - public double getQuotaValue() { - return 1; - } - - public long getDuration() { - return endTime > 0 ? endTime - startTime : System.currentTimeMillis() - startTime; - } - - public ServerTableIdentifier getTableIdentifier() { - return tableIdentifier; - } - - public void setExternalProcessIdentifier(String externalProcessIdentifier) { - this.externalProcessIdentifier = externalProcessIdentifier; - } - - public void setExecutionEngine(String executionEngine) { - this.executionEngine = executionEngine; - } - - protected void setSummary(Map summary) { - this.summary = summary; - } - - protected void setStartTime(long startTime) { - this.startTime = startTime; - } - - public void setStatus(ProcessStatus status) { - if (status == ProcessStatus.SUCCESS - || status == ProcessStatus.FAILED - || status == ProcessStatus.KILLED) { - endTime = System.currentTimeMillis(); - } else if (this.status != ProcessStatus.SUBMITTED && status == ProcessStatus.SUBMITTED) { - endTime = -1L; - failedReason = null; - summary = null; - } - this.status = status; - } - - public String getFailedReason() { - return failedReason; - } - - public ProcessStage getStage() { - return status.toStage(); - } - - protected void setId(long processId) { - this.id = processId; - } - - public Map getProcessParameters() { - return processParameters; - } - - public void setProcessParameters(Map processParameters) { - this.processParameters = processParameters; - } - - public void setSubmitted(String externalProcessIdentifier) { - this.status = ProcessStatus.SUBMITTED; - setExternalProcessIdentifier(externalProcessIdentifier); - this.startTime = System.currentTimeMillis(); - } - - public void setSubmitted() { - this.status = ProcessStatus.SUBMITTED; - this.startTime = System.currentTimeMillis(); - } - - public void setRunning() { - this.status = ProcessStatus.RUNNING; - } - - public void setCanceling() { - this.status = ProcessStatus.CANCELING; - } - - public void addRetryNumber() { - this.retryNumber += 1; - this.status = ProcessStatus.PENDING; - this.externalProcessIdentifier = ""; - this.failedReason = null; - } - - public void resetRetryNumber() { - this.retryNumber = 0; - } - - public void setCanceled() { - this.status = ProcessStatus.CANCELED; - } - - public void setCompleted() { - this.status = ProcessStatus.SUCCESS; - this.endTime = System.currentTimeMillis(); - } - - public void setKilled() { - this.status = ProcessStatus.KILLED; - this.endTime = System.currentTimeMillis(); - } - - public void setCompleted(String failedReason) { - this.status = ProcessStatus.FAILED; - this.failedReason = failedReason; - this.endTime = System.currentTimeMillis(); - } - - public int getRetryNumber() { - return retryNumber; - } - - public void setRetryNumber(int retryNumber) { - this.retryNumber = retryNumber; - } -} diff --git a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java b/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java index 7654af7104..3022bb517b 100644 --- a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java +++ b/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java @@ -21,6 +21,7 @@ import org.apache.amoro.ActivePlugin; import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableRuntime; +import org.apache.amoro.process.ActionCoordinator; import java.util.List; import java.util.Map; @@ -29,6 +30,8 @@ /** Table runtime factory. */ public interface TableRuntimeFactory extends ActivePlugin { + List supportedCoordinators(); + Optional accept( ServerTableIdentifier tableIdentifier, Map tableProperties); diff --git a/dist/src/main/amoro-bin/conf/plugins/table-runtime-factories.yaml b/dist/src/main/amoro-bin/conf/plugins/table-runtime-factories.yaml index 34cf7c94d3..a7f2e8a382 100644 --- a/dist/src/main/amoro-bin/conf/plugins/table-runtime-factories.yaml +++ b/dist/src/main/amoro-bin/conf/plugins/table-runtime-factories.yaml @@ -23,3 +23,15 @@ table-runtime-factories: enabled: true priority: 100 + - name: simple + enabled: true + priority: 1 + properties: + process-factory-impl: org.apache.amoro.server.table.simple.SimpleProcessFactory + support-formats: paimon, lance + support-actions: snapshot-expires, compact + support-format-actions.lance: optimizing-index + action.compact.check-interval: 5min + action.snapshot-expires.check-interval: 5min + action.training.check-interval: 10min + From 32bbf87928ff62f959fcb50723ed8d6c3c93079d Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Fri, 13 Feb 2026 17:03:18 +0800 Subject: [PATCH 2/2] Provide a simpler way to extend process of formats. --- .../amoro/server/AmoroServiceContainer.java | 19 ++- .../process/ActionCoordinatorScheduler.java | 6 +- .../process/TableProcessFactoryManager.java | 28 ++++ .../table/DefaultTableRuntimeFactory.java | 4 + .../server/table/DefaultTableService.java | 9 +- .../table/simple/SimpleActionCoordinator.java | 51 ++---- .../simple/SimpleTableRuntimeFactory.java | 150 +++++++----------- .../amoro/server/AMSServiceTestBase.java | 2 +- .../server/process/MockActionCoordinator.java | 5 +- .../amoro/server/table/AMSTableTestBase.java | 4 + .../table/TestDefaultTableRuntimeHandler.java | 15 +- .../amoro/process/ActionCoordinator.java | 7 +- .../apache/amoro/process/ProcessFactory.java | 19 ++- .../amoro/process/ProcessTriggerStrategy.java | 54 +++++++ .../amoro/table/TableRuntimeFactory.java | 3 + 15 files changed, 216 insertions(+), 160 deletions(-) create mode 100644 amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessFactoryManager.java create mode 100644 amoro-common/src/main/java/org/apache/amoro/process/ProcessTriggerStrategy.java diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index a5c798bf89..2cc5dc3983 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -33,6 +33,7 @@ import org.apache.amoro.config.shade.utils.ConfigShadeUtils; import org.apache.amoro.exception.AmoroRuntimeException; import org.apache.amoro.process.ActionCoordinator; +import org.apache.amoro.process.ProcessFactory; import org.apache.amoro.server.catalog.CatalogManager; import org.apache.amoro.server.catalog.DefaultCatalogManager; import org.apache.amoro.server.dashboard.DashboardServer; @@ -48,6 +49,7 @@ import org.apache.amoro.server.persistence.HttpSessionHandlerFactory; import org.apache.amoro.server.persistence.SqlSessionFactoryProvider; import org.apache.amoro.server.process.ProcessService; +import org.apache.amoro.server.process.TableProcessFactoryManager; import org.apache.amoro.server.process.executor.ExecuteEngineManager; import org.apache.amoro.server.resource.ContainerMetadata; import org.apache.amoro.server.resource.Containers; @@ -60,9 +62,12 @@ import org.apache.amoro.server.table.TableManager; import org.apache.amoro.server.table.TableRuntimeFactoryManager; import org.apache.amoro.server.table.TableService; +import org.apache.amoro.server.table.simple.SimpleTableRuntimeFactory; import org.apache.amoro.server.terminal.TerminalManager; import org.apache.amoro.server.utils.ThriftServiceProxy; import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; +import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; +import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.amoro.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference; @@ -77,6 +82,7 @@ import org.apache.amoro.shade.thrift.org.apache.thrift.transport.TTransportException; import org.apache.amoro.shade.thrift.org.apache.thrift.transport.TTransportFactory; import org.apache.amoro.shade.thrift.org.apache.thrift.transport.layered.TFramedTransport; +import org.apache.amoro.table.TableRuntimeFactory; import org.apache.amoro.utils.IcebergThreadPools; import org.apache.amoro.utils.JacksonUtil; import org.apache.commons.lang3.StringUtils; @@ -234,6 +240,16 @@ public void transitionToFollower() { public void startOptimizingService() throws Exception { TableRuntimeFactoryManager tableRuntimeFactoryManager = new TableRuntimeFactoryManager(); tableRuntimeFactoryManager.initialize(); + List tableRuntimeFactories = tableRuntimeFactoryManager.installedPlugins(); + Preconditions.checkArgument( + tableRuntimeFactories.size() == 1, "Only one table runtime factory is supported"); + TableRuntimeFactory tableRuntimeFactory = new SimpleTableRuntimeFactory(); + + TableProcessFactoryManager tableProcessFactoryManager = new TableProcessFactoryManager(); + tableProcessFactoryManager.initialize(); + List processFactories = tableProcessFactoryManager.installedPlugins(); + tableRuntimeFactory.initialize(processFactories); + List actionCoordinators = tableRuntimeFactoryManager.installedPlugins().stream() .flatMap(f -> f.supportedCoordinators().stream()) @@ -242,7 +258,8 @@ public void startOptimizingService() throws Exception { ExecuteEngineManager executeEngineManager = new ExecuteEngineManager(); tableService = - new DefaultTableService(serviceConfig, catalogManager, tableRuntimeFactoryManager); + new DefaultTableService( + serviceConfig, catalogManager, Lists.newArrayList(tableRuntimeFactory)); optimizingService = new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java index 7b6159e640..daf5869413 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java @@ -28,6 +28,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Optional; + /** * Periodic scheduler that delegates scheduling decisions to an {@link ActionCoordinator}. It * creates, recovers and retries table processes via {@link ProcessService}. @@ -96,8 +98,8 @@ protected boolean enabled(TableRuntime tableRuntime) { */ @Override protected void execute(TableRuntime tableRuntime) { - TableProcess process = coordinator.createTableProcess(tableRuntime); - processService.register(tableRuntime, process); + Optional process = coordinator.trigger(tableRuntime); + process.ifPresent(p -> processService.register(tableRuntime, p)); } /** diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessFactoryManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessFactoryManager.java new file mode 100644 index 0000000000..c04bde9e3b --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessFactoryManager.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.process; + +import org.apache.amoro.process.ProcessFactory; +import org.apache.amoro.server.manager.AbstractPluginManager; + +public class TableProcessFactoryManager extends AbstractPluginManager { + public TableProcessFactoryManager() { + super("process-factories"); + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java index 5d43c5578b..41c8041f22 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java @@ -22,6 +22,7 @@ import org.apache.amoro.TableFormat; import org.apache.amoro.TableRuntime; import org.apache.amoro.process.ActionCoordinator; +import org.apache.amoro.process.ProcessFactory; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.table.StateKey; import org.apache.amoro.table.TableRuntimeFactory; @@ -48,6 +49,9 @@ public List supportedCoordinators() { return Lists.newArrayList(); } + @Override + public void initialize(List factories) {} + @Override public Optional accept( ServerTableIdentifier tableIdentifier, Map tableProperties) { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java index 464cd5601d..6deeb340d9 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java @@ -44,6 +44,7 @@ import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.shade.guava32.com.google.common.collect.Sets; import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.amoro.table.TableRuntimeFactory; import org.apache.amoro.table.TableSummary; import org.apache.amoro.utils.TablePropertyUtil; import org.slf4j.Logger; @@ -85,19 +86,19 @@ public class DefaultTableService extends PersistentBase implements TableService private final CompletableFuture initialized = new CompletableFuture<>(); private final Configurations serverConfiguration; private final CatalogManager catalogManager; - private final TableRuntimeFactoryManager tableRuntimeFactoryManager; + private final List tableRuntimeFactoryList; private RuntimeHandlerChain headHandler; private ExecutorService tableExplorerExecutors; public DefaultTableService( Configurations configuration, CatalogManager catalogManager, - TableRuntimeFactoryManager tableRuntimeFactoryManager) { + List tableRuntimeFactoryList) { this.catalogManager = catalogManager; this.externalCatalogRefreshingInterval = configuration.get(AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL).toMillis(); this.serverConfiguration = configuration; - this.tableRuntimeFactoryManager = tableRuntimeFactoryManager; + this.tableRuntimeFactoryList = tableRuntimeFactoryList; } @Override @@ -515,7 +516,7 @@ private Optional createTableRuntime( ServerTableIdentifier identifier, TableRuntimeMeta runtimeMeta, List restoredStates) { - return tableRuntimeFactoryManager.installedPlugins().stream() + return tableRuntimeFactoryList.stream() .map(f -> f.accept(identifier, runtimeMeta.getTableConfig())) .filter(Optional::isPresent) .map(Optional::get) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/simple/SimpleActionCoordinator.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/simple/SimpleActionCoordinator.java index 63374fe058..e05f38aa1b 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/simple/SimpleActionCoordinator.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/simple/SimpleActionCoordinator.java @@ -23,51 +23,37 @@ import org.apache.amoro.TableRuntime; import org.apache.amoro.process.ActionCoordinator; import org.apache.amoro.process.ProcessFactory; +import org.apache.amoro.process.ProcessTriggerStrategy; import org.apache.amoro.process.TableProcess; import org.apache.amoro.process.TableProcessStore; +import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; -import java.time.Duration; -import java.util.Map; -import java.util.Set; +import java.util.Optional; public class SimpleActionCoordinator implements ActionCoordinator { - private static final String PARALLELISM = "parallelism"; - private static final int PARALLELISM_DEFAULT = 1; - private static final String CHECK_INTERVAL = "check-interval"; - private static final long CHECK_INTERVAL_DEFAULT = Duration.ofMinutes(5).toMillis(); - private final Action action; private final ProcessFactory factory; - private final Set supportedFormats; - - private final int parallelism; - private final long checkInterval; + private final TableFormat format; + private final ProcessTriggerStrategy strategy; - public SimpleActionCoordinator( - Action action, - ProcessFactory factory, - Set supportedFormats, - Map configuration) { + public SimpleActionCoordinator(TableFormat format, Action action, ProcessFactory factory) { this.action = action; this.factory = factory; - this.supportedFormats = supportedFormats; - this.parallelism = - Integer.parseInt( - configuration.getOrDefault(PARALLELISM, Integer.toString(PARALLELISM_DEFAULT))); - this.checkInterval = - Long.parseLong( - configuration.getOrDefault(CHECK_INTERVAL, Long.toString(CHECK_INTERVAL_DEFAULT))); + this.format = format; + this.strategy = factory.triggerStrategy(format, action); + Preconditions.checkArgument( + strategy != null, "ProcessTriggerStrategy cannot be null for %s: %s", format, action); } @Override public boolean formatSupported(TableFormat format) { - return supportedFormats.contains(format); + return this.format.equals(format); } @Override public int parallelism() { - return parallelism; + return strategy.getTriggerParallelism(); } @Override @@ -77,7 +63,7 @@ public Action action() { @Override public long getNextExecutingTime(TableRuntime tableRuntime) { - return checkInterval; + return strategy.getTriggerInterval().toMillis(); } @Override @@ -87,17 +73,12 @@ public boolean enabled(TableRuntime tableRuntime) { @Override public long getExecutorDelay() { - return checkInterval; - } - - @Override - public boolean isReady(TableRuntime tableRuntime) { - return factory.readyForAction(tableRuntime, action); + return strategy.getTriggerInterval().toMillis(); } @Override - public TableProcess createTableProcess(TableRuntime tableRuntime) { - return factory.create(tableRuntime, action); + public Optional trigger(TableRuntime tableRuntime) { + return factory.trigger(tableRuntime, action); } @Override diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/simple/SimpleTableRuntimeFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/simple/SimpleTableRuntimeFactory.java index 9b602ee665..93fdfd9ded 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/simple/SimpleTableRuntimeFactory.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/simple/SimpleTableRuntimeFactory.java @@ -24,10 +24,10 @@ import org.apache.amoro.TableRuntime; import org.apache.amoro.process.ActionCoordinator; import org.apache.amoro.process.ProcessFactory; -import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.apache.amoro.shade.guava32.com.google.common.collect.Sets; +import org.apache.amoro.shade.thrift.org.apache.commons.lang3.tuple.Pair; import org.apache.amoro.table.StateKey; import org.apache.amoro.table.TableRuntimeFactory; import org.apache.amoro.table.TableRuntimeStore; @@ -38,32 +38,47 @@ import java.util.Set; public class SimpleTableRuntimeFactory implements TableRuntimeFactory { - private static final String SUPPORT_FORMATS = "support-formats"; - private static final String SUPPORT_ACTIONS = "support-actions"; - private static final String SUPPORT_FORMAT_ACTIONS_PREFIX = "support-format-actions."; - private static final String PROCESS_FACTORY_IMPL = "process-factory-impl"; - private final Set supportFormats = Sets.newHashSet(); - private final Map> supportedActions = Maps.newHashMap(); - private final Map> actionConfigs = Maps.newHashMap(); - private ProcessFactory processFactory; + // private final Set supportFormats = Sets.newHashSet(); + // private final Map> supportedActions = Maps.newHashMap(); + // private final Map> actionConfigs = Maps.newHashMap(); + // private ProcessFactory processFactory; private final List supportedCoordinators = Lists.newArrayList(); + private final Map, ProcessFactory> actions = Maps.newHashMap(); + private final Set supportFormats = Sets.newHashSet(); + @Override - public void open(Map properties) { - Preconditions.checkArgument( - properties.containsKey(SUPPORT_FORMATS), SUPPORT_FORMATS + " is required"); - String supportFormatsStr = properties.get(SUPPORT_FORMATS); - for (String format : supportFormatsStr.split(",")) { - supportFormats.add(TableFormat.register(format.trim().toUpperCase())); - } - initializeProcessFactory(properties); - initializeActions(properties); - } + public void open(Map properties) {} @Override public void close() {} + public void initialize(List processFactories) { + processFactories.forEach( + f -> { + Map> supportedActions = f.supportedActions(); + for (TableFormat format : supportedActions.keySet()) { + Set supportedActionsForFormat = supportedActions.get(format); + for (Action action : supportedActionsForFormat) { + ProcessFactory exists = actions.get(Pair.of(format, action)); + if (exists != null) { + throw new IllegalArgumentException( + String.format( + "Plugin conflict, ProcessFactory: %s is supported for format: %s, action: %s, conflict with %s", + exists.name(), format, action, f.name())); + } + actions.put(Pair.of(format, action), f); + } + } + }); + actions.keySet().forEach(e -> supportFormats.add(e.getLeft())); + for (Pair key : actions.keySet()) { + supportedCoordinators.add( + new SimpleActionCoordinator(key.getLeft(), key.getRight(), actions.get(key))); + } + } + @Override public List supportedCoordinators() { return supportedCoordinators; @@ -75,83 +90,7 @@ public Optional accept( if (!supportFormats.contains(tableIdentifier.getFormat())) { return Optional.empty(); } - return Optional.of(new TableRuntimeCreatorImpl()); - } - - private void initializeActions(Map properties) { - if (properties.containsKey(SUPPORT_ACTIONS)) { - String supportActionsStr = properties.get(SUPPORT_ACTIONS); - for (String action : supportActionsStr.split(",")) { - for (TableFormat format : supportFormats) { - supportedActions - .computeIfAbsent(format, k -> Sets.newHashSet()) - .add(Action.register(action.trim().toUpperCase())); - } - } - } - properties.forEach( - (key, value) -> { - if (key.startsWith(SUPPORT_FORMAT_ACTIONS_PREFIX)) { - TableFormat format = - TableFormat.valueOf( - key.substring(SUPPORT_FORMAT_ACTIONS_PREFIX.length()).trim().toUpperCase()); - Preconditions.checkArgument( - format == null || !supportFormats.contains(format), - "Unsupported format: " + format); - for (String action : value.split(",")) { - supportedActions - .computeIfAbsent(format, k -> Sets.newHashSet()) - .add(Action.register(action.trim().toUpperCase())); - } - } - }); - Preconditions.checkArgument( - !supportedActions.isEmpty(), - SUPPORT_ACTIONS + " or " + SUPPORT_FORMAT_ACTIONS_PREFIX + " is required"); - supportedActions.forEach( - (f, actions) -> - actions.forEach(a -> actionConfigs.computeIfAbsent(a, k -> Maps.newHashMap()))); - - properties.forEach( - (key, value) -> { - if (key.startsWith("action.")) { - String actionConfigKey = key.substring("action.".length()).trim().toUpperCase(); - int pos = actionConfigKey.indexOf("."); - String actionName = actionConfigKey.substring(0, pos); - Action action = Action.valueOf(actionName); - if (action != null && actionConfigs.containsKey(action)) { - String configKey = actionConfigKey.substring(pos + 1).trim(); - actionConfigs.get(action).put(configKey, value); - } - } - }); - - for (Action action : actionConfigs.keySet()) { - Set formatSet = Sets.newHashSet(); - supportedActions.forEach( - (f, actions) -> { - if (actions.contains(action)) { - formatSet.add(f); - } - }); - supportedCoordinators.add( - new SimpleActionCoordinator( - action, processFactory, formatSet, actionConfigs.get(action))); - } - } - - private void initializeProcessFactory(Map properties) { - Preconditions.checkArgument( - properties.containsKey(PROCESS_FACTORY_IMPL), PROCESS_FACTORY_IMPL + " is required"); - try { - processFactory = - (ProcessFactory) - Class.forName(properties.get(PROCESS_FACTORY_IMPL)) - .getDeclaredConstructor() - .newInstance(); - } catch (Exception e) { - throw new RuntimeException("Failed to create process factory", e); - } + return Optional.of(new TableRuntimeCreatorImpl(tableIdentifier.getFormat())); } @Override @@ -160,9 +99,26 @@ public String name() { } private class TableRuntimeCreatorImpl implements TableRuntimeFactory.TableRuntimeCreator { + + private final TableFormat format; + + public TableRuntimeCreatorImpl(TableFormat format) { + this.format = format; + } + @Override public List> requiredStateKeys() { - return processFactory.requiredStates(); + Map> requiredStates = Maps.newHashMap(); + actions.keySet().stream() + .filter(e -> e.getLeft().equals(format)) + .forEach( + e -> { + ProcessFactory factory = actions.get(e); + factory + .requiredStates() + .forEach(stateKey -> requiredStates.put(stateKey.getKey(), stateKey)); + }); + return Lists.newArrayList(requiredStates.values()); } @Override diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java index 7cc2312900..fbe28ead4e 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java @@ -63,7 +63,7 @@ public static void initTableService() { AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT, Duration.ofMillis(30000L)); TABLE_SERVICE = new DefaultTableService( - new Configurations(), CATALOG_MANAGER, tableRuntimeFactoryManager); + new Configurations(), CATALOG_MANAGER, Lists.newArrayList(runtimeFactory)); OPTIMIZING_SERVICE = new DefaultOptimizingService( configurations, CATALOG_MANAGER, OPTIMIZER_MANAGER, TABLE_SERVICE); diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java b/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java index 7027f606ef..9719be9475 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java @@ -27,6 +27,7 @@ import org.apache.amoro.server.utils.SnowflakeIdGenerator; import java.util.HashMap; +import java.util.Optional; /** Mock implementation of {@link ActionCoordinator} used in tests. */ public class MockActionCoordinator implements ActionCoordinator { @@ -94,7 +95,7 @@ public long getExecutorDelay() { * @return mock process */ @Override - public TableProcess createTableProcess(TableRuntime tableRuntime) { + public Optional trigger(TableRuntime tableRuntime) { TableProcessMeta tableProcessMeta = TableProcessMeta.of( SNOWFLAKE_ID_GENERATOR.generateId(), @@ -106,7 +107,7 @@ public TableProcess createTableProcess(TableRuntime tableRuntime) { new DefaultTableProcessStore( tableProcessMeta.getProcessId(), tableRuntime, tableProcessMeta, action(), 3); MockTableProcess mockTableProcess = new MockTableProcess(tableRuntime, tableProcessStore); - return mockTableProcess; + return Optional.of(mockTableProcess); } /** diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/AMSTableTestBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/AMSTableTestBase.java index 5efab334b6..456a5da85e 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/AMSTableTestBase.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/AMSTableTestBase.java @@ -37,6 +37,7 @@ import org.apache.amoro.server.catalog.InternalCatalog; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.apache.amoro.table.MixedTable; +import org.apache.amoro.table.TableRuntimeFactory; import org.apache.amoro.utils.CatalogUtil; import org.apache.amoro.utils.ConvertStructUtil; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; @@ -69,6 +70,8 @@ public class AMSTableTestBase extends AMSServiceTestBase { private final boolean autoInitTable; private ServerTableIdentifier serverTableIdentifier; + protected final TableRuntimeFactory tableRuntimeFactory; + public AMSTableTestBase(CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) { this(catalogTestHelper, tableTestHelper, false); } @@ -78,6 +81,7 @@ public AMSTableTestBase( this.catalogTestHelper = catalogTestHelper; this.tableTestHelper = tableTestHelper; this.autoInitTable = autoInitTable; + this.tableRuntimeFactory = new DefaultTableRuntimeFactory(); } @Before diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java index 792faebcf7..15409e028d 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java @@ -41,7 +41,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.mockito.Mockito; import java.util.List; @@ -49,7 +48,6 @@ public class TestDefaultTableRuntimeHandler extends AMSTableTestBase { private DefaultTableService tableService; - private final TableRuntimeFactoryManager runtimeFactoryManager; @Parameterized.Parameters(name = "{0}, {1}") public static Object[] parameters() { @@ -66,16 +64,13 @@ public static Object[] parameters() { public TestDefaultTableRuntimeHandler( CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) { super(catalogTestHelper, tableTestHelper, false); - DefaultTableRuntimeFactory runtimeFactory = new DefaultTableRuntimeFactory(); - runtimeFactoryManager = Mockito.mock(TableRuntimeFactoryManager.class); - Mockito.when(runtimeFactoryManager.installedPlugins()) - .thenReturn(Lists.newArrayList(runtimeFactory)); } @Test public void testInitialize() throws Exception { tableService = - new DefaultTableService(new Configurations(), CATALOG_MANAGER, runtimeFactoryManager); + new DefaultTableService( + new Configurations(), CATALOG_MANAGER, Lists.newArrayList(tableRuntimeFactory)); TestHandler handler = new TestHandler(); tableService.addHandlerChain(handler); tableService.initialize(); @@ -95,7 +90,8 @@ public void testInitialize() throws Exception { // initialize with a history table tableService = - new DefaultTableService(new Configurations(), CATALOG_MANAGER, runtimeFactoryManager); + new DefaultTableService( + new Configurations(), CATALOG_MANAGER, Lists.newArrayList(tableRuntimeFactory)); handler = new TestHandler(); tableService.addHandlerChain(handler); tableService.initialize(); @@ -133,7 +129,8 @@ public void testInitialize() throws Exception { @Test public void testRefreshUpdatesOptimizerGroup() throws Exception { tableService = - new DefaultTableService(new Configurations(), CATALOG_MANAGER, runtimeFactoryManager); + new DefaultTableService( + new Configurations(), CATALOG_MANAGER, Lists.newArrayList(tableRuntimeFactory)); TestHandler handler = new TestHandler(); tableService.addHandlerChain(handler); tableService.initialize(); diff --git a/amoro-common/src/main/java/org/apache/amoro/process/ActionCoordinator.java b/amoro-common/src/main/java/org/apache/amoro/process/ActionCoordinator.java index 9c97f11742..18af6387ec 100644 --- a/amoro-common/src/main/java/org/apache/amoro/process/ActionCoordinator.java +++ b/amoro-common/src/main/java/org/apache/amoro/process/ActionCoordinator.java @@ -22,6 +22,8 @@ import org.apache.amoro.TableFormat; import org.apache.amoro.TableRuntime; +import java.util.Optional; + /** * Coordinator for a specific {@link org.apache.amoro.Action} to manage table processes. Provides * scheduling parameters and lifecycle hooks to create/recover/cancel/retry table processes. @@ -73,16 +75,13 @@ public interface ActionCoordinator { */ long getExecutorDelay(); - /** Check whether the given table runtime is ready to create a process. */ - boolean isReady(TableRuntime tableRuntime); - /** * Create a new {@link TableProcess} instance for the given table runtime. * * @param tableRuntime table runtime * @return a new table process */ - TableProcess createTableProcess(TableRuntime tableRuntime); + Optional trigger(TableRuntime tableRuntime); /** * Recover a {@link TableProcess} from persisted store. diff --git a/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java b/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java index 28d31c61f7..6e325d6934 100644 --- a/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java +++ b/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java @@ -19,33 +19,42 @@ package org.apache.amoro.process; import org.apache.amoro.Action; +import org.apache.amoro.ActivePlugin; +import org.apache.amoro.TableFormat; import org.apache.amoro.TableRuntime; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.table.StateKey; import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; /** * A factory to create a process. Normally, There will be default ProcessFactories for each action * and used by default scheduler. Meanwhile, user could extend external ProcessFactory to run jobs * on external resources like Yarn. */ -public interface ProcessFactory { +public interface ProcessFactory extends ActivePlugin { default List> requiredStates() { return Lists.newArrayList(); } - boolean readyForAction(TableRuntime tableRuntime, Action action); + /** Get supported actions for each table format. */ + Map> supportedActions(); + + /** How to trigger a process for the action. */ + ProcessTriggerStrategy triggerStrategy(TableFormat format, Action action); /** - * Create a process for the action. + * Try trigger a process for the action. * * @param tableRuntime table runtime * @param action action type * @return target process which has not been submitted yet. */ - TableProcess create(TableRuntime tableRuntime, Action action); + Optional trigger(TableRuntime tableRuntime, Action action); /** * Recover a process for the action from a state. @@ -54,5 +63,5 @@ default List> requiredStates() { * @param state state of the process * @return target process which has not been submitted yet. */ - TableProcess recover(TableRuntime tableRuntime, TableProcessStore state); + TableProcess recover(TableRuntime tableRuntime, TableProcessStore store); } diff --git a/amoro-common/src/main/java/org/apache/amoro/process/ProcessTriggerStrategy.java b/amoro-common/src/main/java/org/apache/amoro/process/ProcessTriggerStrategy.java new file mode 100644 index 0000000000..2f54a4ed20 --- /dev/null +++ b/amoro-common/src/main/java/org/apache/amoro/process/ProcessTriggerStrategy.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.process; + +import java.time.Duration; + +/** Process trigger strategy. */ +public final class ProcessTriggerStrategy { + + private final Duration triggerInterval; + + private final boolean triggerOnNewSnapshot; + + private final int triggerParallelism; + + public ProcessTriggerStrategy( + Duration triggerInterval, boolean triggerOnNewSnapshot, int triggerParallelism) { + this.triggerInterval = triggerInterval; + this.triggerOnNewSnapshot = triggerOnNewSnapshot; + this.triggerParallelism = triggerParallelism; + } + + public static ProcessTriggerStrategy triggerAtFixRate(Duration triggerInterval) { + return new ProcessTriggerStrategy(triggerInterval, false, 1); + } + + public Duration getTriggerInterval() { + return triggerInterval; + } + + public boolean isTriggerOnNewSnapshot() { + return triggerOnNewSnapshot; + } + + public int getTriggerParallelism() { + return triggerParallelism; + } +} diff --git a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java b/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java index 3022bb517b..90e26bd4b6 100644 --- a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java +++ b/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java @@ -22,6 +22,7 @@ import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableRuntime; import org.apache.amoro.process.ActionCoordinator; +import org.apache.amoro.process.ProcessFactory; import java.util.List; import java.util.Map; @@ -32,6 +33,8 @@ public interface TableRuntimeFactory extends ActivePlugin { List supportedCoordinators(); + void initialize(List factories); + Optional accept( ServerTableIdentifier tableIdentifier, Map tableProperties);