diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index 4fd3e5e9af..2cc5dc3983 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -32,6 +32,8 @@ import org.apache.amoro.config.Configurations; import org.apache.amoro.config.shade.utils.ConfigShadeUtils; import org.apache.amoro.exception.AmoroRuntimeException; +import org.apache.amoro.process.ActionCoordinator; +import org.apache.amoro.process.ProcessFactory; import org.apache.amoro.server.catalog.CatalogManager; import org.apache.amoro.server.catalog.DefaultCatalogManager; import org.apache.amoro.server.dashboard.DashboardServer; @@ -47,6 +49,8 @@ import org.apache.amoro.server.persistence.HttpSessionHandlerFactory; import org.apache.amoro.server.persistence.SqlSessionFactoryProvider; import org.apache.amoro.server.process.ProcessService; +import org.apache.amoro.server.process.TableProcessFactoryManager; +import org.apache.amoro.server.process.executor.ExecuteEngineManager; import org.apache.amoro.server.resource.ContainerMetadata; import org.apache.amoro.server.resource.Containers; import org.apache.amoro.server.resource.DefaultOptimizerManager; @@ -58,9 +62,12 @@ import org.apache.amoro.server.table.TableManager; import org.apache.amoro.server.table.TableRuntimeFactoryManager; import org.apache.amoro.server.table.TableService; +import org.apache.amoro.server.table.simple.SimpleTableRuntimeFactory; import org.apache.amoro.server.terminal.TerminalManager; import org.apache.amoro.server.utils.ThriftServiceProxy; import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; +import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; +import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.amoro.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference; @@ -75,6 +82,7 @@ import org.apache.amoro.shade.thrift.org.apache.thrift.transport.TTransportException; import org.apache.amoro.shade.thrift.org.apache.thrift.transport.TTransportFactory; import org.apache.amoro.shade.thrift.org.apache.thrift.transport.layered.TFramedTransport; +import org.apache.amoro.table.TableRuntimeFactory; import org.apache.amoro.utils.IcebergThreadPools; import org.apache.amoro.utils.JacksonUtil; import org.apache.commons.lang3.StringUtils; @@ -96,6 +104,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.stream.Collectors; public class AmoroServiceContainer { @@ -231,14 +240,32 @@ public void transitionToFollower() { public void startOptimizingService() throws Exception { TableRuntimeFactoryManager tableRuntimeFactoryManager = new TableRuntimeFactoryManager(); tableRuntimeFactoryManager.initialize(); + List tableRuntimeFactories = tableRuntimeFactoryManager.installedPlugins(); + Preconditions.checkArgument( + tableRuntimeFactories.size() == 1, "Only one table runtime factory is supported"); + TableRuntimeFactory tableRuntimeFactory = new SimpleTableRuntimeFactory(); + + TableProcessFactoryManager tableProcessFactoryManager = new TableProcessFactoryManager(); + tableProcessFactoryManager.initialize(); + List processFactories = tableProcessFactoryManager.installedPlugins(); + tableRuntimeFactory.initialize(processFactories); + + List actionCoordinators = + tableRuntimeFactoryManager.installedPlugins().stream() + .flatMap(f -> f.supportedCoordinators().stream()) + .collect(Collectors.toList()); + + ExecuteEngineManager executeEngineManager = new ExecuteEngineManager(); tableService = - new DefaultTableService(serviceConfig, catalogManager, tableRuntimeFactoryManager); + new DefaultTableService( + serviceConfig, catalogManager, Lists.newArrayList(tableRuntimeFactory)); optimizingService = new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService); - processService = new ProcessService(serviceConfig, tableService); + processService = + new ProcessService(serviceConfig, tableService, actionCoordinators, executeEngineManager); LOG.info("Setting up AMS table executors..."); InlineTableExecutors.getInstance().setup(tableService, serviceConfig); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/ProcessStateMapper.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/ProcessStateMapper.java deleted file mode 100644 index f7b7557819..0000000000 --- a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/ProcessStateMapper.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.server.persistence.mapper; - -import org.apache.amoro.process.TableProcessState; -import org.apache.ibatis.annotations.Insert; -import org.apache.ibatis.annotations.Options; -import org.apache.ibatis.annotations.Param; -import org.apache.ibatis.annotations.Result; -import org.apache.ibatis.annotations.ResultMap; -import org.apache.ibatis.annotations.Results; -import org.apache.ibatis.annotations.Select; -import org.apache.ibatis.annotations.Update; - -import java.util.Map; - -public interface ProcessStateMapper { - - @Insert( - "INSERT INTO table_process_state " - + "(process_id, action, table_id, retry_num, status, start_time, end_time, fail_reason, summary) " - + "VALUES " - + "(#{id}, #{action}, #{tableId}, #{retryNumber}, #{status}, #{startTime}, #{endTime}, #{failedReason}, #{summary})") - @Options(useGeneratedKeys = true, keyProperty = "id") - void createProcessState(TableProcessState state); - - @Update( - "UPDATE table_process_state " - + "SET status = #{status}, start_time = #{startTime} " - + "WHERE process_id = #{id} and retry_num = #{retryNumber}") - void updateProcessRunning(TableProcessState state); - - @Update( - "UPDATE table_process_state " - + "SET status = #{status}, end_time = #{endTime} " - + "WHERE process_id = #{id} and retry_num = #{retryNumber}") - void updateProcessCompleted(TableProcessState state); - - @Update( - "UPDATE table_process_state " - + "SET status = #{status}, end_time = #{endTime}, fail_reason = #{failedReason} " - + "WHERE process_id = #{id} and retry_num = #{retryNumber}") - void updateProcessFailed(TableProcessState state); - - @Select( - "SELECT process_id, action, table_id, retry_num, status, start_time, end_time, fail_reason, summary " - + "FROM table_process_state " - + "WHERE process_id = #{processId}") - @Results( - id = "TableProcessStateResultMap", - value = { - @Result(property = "id", column = "process_id"), - @Result(property = "action", column = "action"), - @Result(property = "tableId", column = "table_id"), - @Result(property = "retryNumber", column = "retry_num"), - @Result(property = "status", column = "status"), - @Result(property = "startTime", column = "start_time"), - @Result(property = "endTime", column = "end_time"), - @Result(property = "failedReason", column = "fail_reason"), - @Result(property = "summary", column = "summary", javaType = Map.class) - }) - TableProcessState getProcessStateById(@Param("processId") long processId); - - /** Query TableProcessState by table_id */ - @Select( - "SELECT process_id, action, table_id, retry_num, status, start_time, end_time, fail_reason, summary " - + "FROM table_process_state " - + "WHERE table_id = #{tableId}") - @ResultMap("TableProcessStateResultMap") - TableProcessState getProcessStateByTableId(@Param("tableId") long tableId); -} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java index 83574c6636..daf5869413 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java @@ -20,6 +20,7 @@ import org.apache.amoro.TableFormat; import org.apache.amoro.TableRuntime; +import org.apache.amoro.process.ActionCoordinator; import org.apache.amoro.process.TableProcess; import org.apache.amoro.process.TableProcessStore; import org.apache.amoro.server.scheduler.PeriodicTableScheduler; @@ -27,6 +28,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Optional; + /** * Periodic scheduler that delegates scheduling decisions to an {@link ActionCoordinator}. It * creates, recovers and retries table processes via {@link ProcessService}. @@ -95,8 +98,8 @@ protected boolean enabled(TableRuntime tableRuntime) { */ @Override protected void execute(TableRuntime tableRuntime) { - TableProcess process = coordinator.createTableProcess(tableRuntime); - processService.register(tableRuntime, process); + Optional process = coordinator.trigger(tableRuntime); + process.ifPresent(p -> processService.register(tableRuntime, p)); } /** @@ -110,16 +113,6 @@ protected void recover(TableRuntime tableRuntime, TableProcessStore processStore processService.recover(tableRuntime, process); } - /** - * Retry a failed table process. - * - * @param process process to retry - */ - protected void retry(TableProcess process) { - process = coordinator.retryTableProcess(process); - processService.retry(process); - } - /** * Get executor delay from coordinator. * diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java index ee9f136dc6..78a5ac7c77 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java @@ -25,15 +25,16 @@ import org.apache.amoro.TableRuntime; import org.apache.amoro.config.Configurations; import org.apache.amoro.config.TableConfiguration; +import org.apache.amoro.process.ActionCoordinator; import org.apache.amoro.process.ProcessEvent; import org.apache.amoro.process.ProcessStatus; import org.apache.amoro.process.TableProcess; -import org.apache.amoro.server.manager.AbstractPluginManager; import org.apache.amoro.server.optimizing.OptimizingStatus; import org.apache.amoro.server.persistence.PersistentBase; import org.apache.amoro.server.persistence.mapper.TableProcessMapper; import org.apache.amoro.server.process.executor.EngineType; import org.apache.amoro.server.process.executor.ExecuteEngine; +import org.apache.amoro.server.process.executor.ExecuteEngineManager; import org.apache.amoro.server.process.executor.TableProcessExecutor; import org.apache.amoro.server.table.RuntimeHandlerChain; import org.apache.amoro.server.table.TableService; @@ -59,11 +60,11 @@ public class ProcessService extends PersistentBase { private static final Logger LOG = LoggerFactory.getLogger(ProcessService.class); private final TableService tableService; + private final List actionCoordinatorLists; private final Map actionCoordinators = new ConcurrentHashMap<>(); private final Map executeEngines = new ConcurrentHashMap<>(); - private final ActionCoordinatorManager actionCoordinatorManager; private final ExecuteEngineManager executeEngineManager; private final ProcessRuntimeHandler tableRuntimeHandler = new ProcessRuntimeHandler(); private final ThreadPoolExecutor processExecutionPool = @@ -72,17 +73,13 @@ public class ProcessService extends PersistentBase { private final Map> activeTableProcess = new ConcurrentHashMap<>(); - public ProcessService(Configurations serviceConfig, TableService tableService) { - this(serviceConfig, tableService, new ActionCoordinatorManager(), new ExecuteEngineManager()); - } - public ProcessService( Configurations serviceConfig, TableService tableService, - ActionCoordinatorManager actionCoordinatorManager, + List actionCoordinators, ExecuteEngineManager executeEngineManager) { this.tableService = tableService; - this.actionCoordinatorManager = actionCoordinatorManager; + this.actionCoordinatorLists = actionCoordinators; this.executeEngineManager = executeEngineManager; } @@ -126,15 +123,6 @@ public void recover(TableRuntime tableRuntime, TableProcess process) { executeOrTraceProcess(process); } - /** - * Retry a failed table process. - * - * @param process process to retry - */ - public void retry(TableProcess process) { - executeOrTraceProcess(process); - } - /** * Cancel a table process and release related resources. * @@ -147,7 +135,7 @@ public void cancel(TableProcess process) { /** Dispose the service, shutdown engines and clear active processes. */ public void dispose() { - actionCoordinatorManager.close(); + this.actionCoordinators.values().forEach(RuntimeHandlerChain::dispose); executeEngineManager.close(); processExecutionPool.shutdown(); activeTableProcess.clear(); @@ -155,16 +143,12 @@ public void dispose() { private void initialize(List tableRuntimes) { LOG.info("Initializing process service"); - actionCoordinatorManager.initialize(); - actionCoordinatorManager - .installedPlugins() - .forEach( - actionCoordinator -> { - actionCoordinators.put( - actionCoordinator.action().getName(), - new ActionCoordinatorScheduler( - actionCoordinator, tableService, ProcessService.this)); - }); + actionCoordinatorLists.forEach( + actionCoordinator -> { + actionCoordinators.put( + actionCoordinator.action().getName(), + new ActionCoordinatorScheduler(actionCoordinator, tableService, ProcessService.this)); + }); executeEngineManager.initialize(); executeEngineManager .installedPlugins() @@ -242,7 +226,7 @@ private void executeOrTraceProcess(TableProcess process) { "Regular Retry.", process.getProcessParameters(), process.getSummary()); - scheduler.retry(process); + executeOrTraceProcess(process); } else { untrackTableProcessInstance( process.getTableRuntime().getTableIdentifier(), process.getId()); @@ -551,18 +535,4 @@ protected void doDispose() { // TODO: dispose } } - - /** Manager for {@link ActionCoordinator} plugins. */ - public static class ActionCoordinatorManager extends AbstractPluginManager { - public ActionCoordinatorManager() { - super("action-coordinators"); - } - } - - /** Manager for {@link ExecuteEngine} plugins. */ - public static class ExecuteEngineManager extends AbstractPluginManager { - public ExecuteEngineManager() { - super("execute-engines"); - } - } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessFactoryManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessFactoryManager.java new file mode 100644 index 0000000000..c04bde9e3b --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessFactoryManager.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.process; + +import org.apache.amoro.process.ProcessFactory; +import org.apache.amoro.server.manager.AbstractPluginManager; + +public class TableProcessFactoryManager extends AbstractPluginManager { + public TableProcessFactoryManager() { + super("process-factories"); + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java index 311b60e035..6890c25c10 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java @@ -19,7 +19,6 @@ package org.apache.amoro.server.process; import org.apache.amoro.process.ProcessStatus; -import org.apache.amoro.process.TableProcessState; import org.apache.amoro.process.TableProcessStore; import java.util.HashMap; @@ -189,25 +188,6 @@ public static TableProcessMeta fromTableProcessStore(TableProcessStore tableProc return tableProcessMeta; } - @Deprecated - public static TableProcessMeta fromTableProcessState(TableProcessState tableProcessState) { - TableProcessMeta tableProcessMeta = new TableProcessMeta(); - tableProcessMeta.setProcessId(tableProcessState.getId()); - tableProcessMeta.setTableId(tableProcessState.getTableIdentifier().getId()); - tableProcessMeta.setExternalProcessIdentifier(tableProcessState.getExternalProcessIdentifier()); - tableProcessMeta.setStatus(tableProcessState.getStatus()); - tableProcessMeta.setProcessType(tableProcessState.getAction().getName()); - tableProcessMeta.setProcessStage(tableProcessState.getStage().getDesc()); - tableProcessMeta.setExecutionEngine(tableProcessState.getExecutionEngine()); - tableProcessMeta.setRetryNumber(tableProcessState.getRetryNumber()); - tableProcessMeta.setCreateTime(tableProcessState.getStartTime()); - tableProcessMeta.setFinishTime(tableProcessState.getEndTime()); - tableProcessMeta.setFailMessage(tableProcessState.getFailedReason()); - tableProcessMeta.setProcessParameters(tableProcessState.getProcessParameters()); - tableProcessMeta.setSummary(tableProcessState.getSummary()); - return tableProcessMeta; - } - public static TableProcessMeta of( long processId, long tableId, diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/executor/ExecuteEngineManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/executor/ExecuteEngineManager.java new file mode 100644 index 0000000000..77921b882a --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/executor/ExecuteEngineManager.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.process.executor; + +import org.apache.amoro.server.manager.AbstractPluginManager; + +/** Manager for {@link org.apache.amoro.server.process.executor.ExecuteEngine} plugins. */ +public class ExecuteEngineManager extends AbstractPluginManager { + public ExecuteEngineManager() { + super("execute-engines"); + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicExternalScheduler.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicExternalScheduler.java deleted file mode 100644 index 346f02772b..0000000000 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicExternalScheduler.java +++ /dev/null @@ -1,278 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.server.scheduler; - -import org.apache.amoro.Action; -import org.apache.amoro.SupportsProcessPlugins; -import org.apache.amoro.TableRuntime; -import org.apache.amoro.process.AmoroProcess; -import org.apache.amoro.process.ManagedProcess; -import org.apache.amoro.process.ProcessFactory; -import org.apache.amoro.process.ProcessStatus; -import org.apache.amoro.process.SimpleFuture; -import org.apache.amoro.process.TableProcess; -import org.apache.amoro.process.TableProcessState; -import org.apache.amoro.process.TableProcessStore; -import org.apache.amoro.resource.ExternalResourceContainer; -import org.apache.amoro.resource.Resource; -import org.apache.amoro.resource.ResourceManager; -import org.apache.amoro.server.persistence.PersistentBase; -import org.apache.amoro.server.persistence.mapper.TableProcessMapper; -import org.apache.amoro.server.process.DefaultTableProcessStore; -import org.apache.amoro.server.process.TableProcessMeta; -import org.apache.amoro.server.table.TableService; -import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; - -import java.util.List; -import java.util.Optional; - -public abstract class PeriodicExternalScheduler extends PeriodicTableScheduler { - - private final ExternalResourceContainer resourceContainer; - private final ResourceManager resourceManager; - private final ProcessFactory processFactory; - - public PeriodicExternalScheduler( - ResourceManager resourceManager, - ExternalResourceContainer resourceContainer, - Action action, - TableService tableService, - int poolSize) { - super(action, tableService, poolSize); - this.resourceContainer = resourceContainer; - this.resourceManager = resourceManager; - this.processFactory = generateProcessFactory(); - } - - @Override - protected void initHandler(List tableRuntimeList) { - tableRuntimeList.stream() - .filter(t -> t instanceof SupportsProcessPlugins) - .map(t -> (SupportsProcessPlugins) t) - .forEach(tableRuntime -> tableRuntime.install(getAction(), processFactory)); - super.initHandler(tableRuntimeList); - } - - @Override - protected boolean enabled(TableRuntime tableRuntime) { - return Optional.of(tableRuntime) - .filter(t -> t instanceof SupportsProcessPlugins) - .map(t -> (SupportsProcessPlugins) t) - .map(t -> t.enabled(getAction())) - .orElse(false); - } - - @Override - protected void execute(TableRuntime tableRuntime) { - Preconditions.checkArgument(tableRuntime instanceof SupportsProcessPlugins); - SupportsProcessPlugins runtimeSupportProcessPlugin = (SupportsProcessPlugins) tableRuntime; - // Trigger a table process and check conflicts by table runtime - // Update process state after process completed, the callback must be register first - AmoroProcess process = runtimeSupportProcessPlugin.trigger(getAction()); - process.getCompleteFuture().whenCompleted(() -> persistTableProcess(process)); - ManagedProcess managedProcess = new ManagedTableProcess(process); - - // Submit the table process to resource manager, this is a sync operation - // update process completed and delete related resources - managedProcess.submit(); - - // Trace the table process by async framework so that process can be called back when completed - trace(process); - } - - protected int getMaxRetryNumber() { - return 1; - } - - protected abstract void trace(AmoroProcess process); - - protected ProcessFactory generateProcessFactory() { - return new ExternalProcessFactory(); - } - - protected void persistTableProcess(AmoroProcess process) { - TableProcessStore store = process.store(); - if (store.getStatus() == ProcessStatus.SUBMITTED) { - new PersistencyHelper().createProcessState(store); - } else { - new PersistencyHelper().updateProcessStatus(store); - } - } - - private static class PersistencyHelper extends PersistentBase { - - void createProcessState(TableProcessStore store) { - TableProcessMeta meta = TableProcessMeta.fromTableProcessStore(store); - doAs( - TableProcessMapper.class, - mapper -> - mapper.updateProcess( - meta.getTableId(), - meta.getProcessId(), - meta.getExternalProcessIdentifier(), - meta.getStatus(), - meta.getProcessStage(), - meta.getRetryNumber(), - meta.getFinishTime(), - meta.getFailMessage(), - meta.getProcessParameters(), - meta.getSummary())); - } - - void updateProcessStatus(TableProcessStore store) { - TableProcessMeta meta = TableProcessMeta.fromTableProcessStore(store); - doAs( - TableProcessMapper.class, - mapper -> - mapper.updateProcess( - meta.getTableId(), - meta.getProcessId(), - meta.getExternalProcessIdentifier(), - meta.getStatus(), - meta.getProcessStage(), - meta.getRetryNumber(), - meta.getFinishTime(), - meta.getFailMessage(), - meta.getProcessParameters(), - meta.getSummary())); - } - } - - private class ExternalTableProcess extends TableProcess { - - ExternalTableProcess(TableRuntime tableRuntime) { - super( - tableRuntime, - new DefaultTableProcessStore( - tableRuntime, new TableProcessMeta(), PeriodicExternalScheduler.this.getAction())); - } - - ExternalTableProcess(TableRuntime tableRuntime, TableProcessState state) { - super( - tableRuntime, - new DefaultTableProcessStore( - tableRuntime, - TableProcessMeta.fromTableProcessState(state), - PeriodicExternalScheduler.this.getAction())); - } - - @Override - protected void closeInternal() {} - } - - private class ExternalProcessFactory implements ProcessFactory { - - @Override - public AmoroProcess create(TableRuntime tableRuntime, Action action) { - return new ExternalTableProcess(tableRuntime); - } - - @Override - public AmoroProcess recover(TableRuntime tableRuntime, TableProcessState state) { - return new ExternalTableProcess(tableRuntime, state); - } - } - - protected class ManagedTableProcess implements ManagedProcess { - - private final AmoroProcess process; - - ManagedTableProcess(AmoroProcess process) { - this.process = process; - } - - @Override - public void submit() { - Resource resource = resourceContainer.submit(this); - if (resource == null) { - throw new IllegalStateException("Submit table process can not return null resource"); - } - persistTableProcess(this); - resourceManager.createResource(resource); - getCompleteFuture() - .whenCompleted( - () -> { - resourceManager.deleteResource(resource.getResourceId()); - if (store().getStatus() == ProcessStatus.FAILED - && store().getRetryNumber() < getMaxRetryNumber()) { - retry(); - } - }); - store().begin().updateTableProcessStatus(ProcessStatus.SUBMITTED).commit(); - getSubmitFuture().complete(); - } - - @Override - public void complete() { - store() - .begin() - .updateTableProcessStatus(ProcessStatus.SUCCESS) - .updateFinishTime(System.currentTimeMillis()) - .commit(); - process.getCompleteFuture().complete(); - } - - @Override - public void complete(String failedReason) { - store() - .begin() - .updateTableProcessStatus(ProcessStatus.FAILED) - .updateTableProcessFailMessage(failedReason) - .updateFinishTime(System.currentTimeMillis()) - .commit(); - process.getCompleteFuture().complete(); - } - - @Override - public void retry() { - store() - .begin() - .updateTableProcessStatus(ProcessStatus.PENDING) - .updateRetryNumber(store().getRetryNumber() + 1) - .updateExternalProcessIdentifier("") - .commit(); - submit(); - } - - @Override - public void kill() { - store() - .begin() - .updateTableProcessStatus(ProcessStatus.KILLED) - .updateFinishTime(System.currentTimeMillis()) - .commit(); - process.getCompleteFuture().complete(); - } - - @Override - public SimpleFuture getSubmitFuture() { - return process.getSubmitFuture(); - } - - @Override - public SimpleFuture getCompleteFuture() { - return process.getCompleteFuture(); - } - - @Override - public TableProcessStore store() { - return process.store(); - } - } -} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java index e07cd4432d..68b4e1bd4c 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java @@ -18,17 +18,26 @@ package org.apache.amoro.server.table; +import org.apache.amoro.Action; import org.apache.amoro.ServerTableIdentifier; -import org.apache.amoro.SupportsProcessPlugins; import org.apache.amoro.TableRuntime; import org.apache.amoro.config.TableConfiguration; +import org.apache.amoro.process.TableProcessStore; import org.apache.amoro.server.persistence.PersistentBase; +import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; +import org.apache.amoro.shade.zookeeper3.org.apache.curator.shaded.com.google.common.collect.Maps; import org.apache.amoro.table.TableRuntimeStore; -public abstract class AbstractTableRuntime extends PersistentBase - implements TableRuntime, SupportsProcessPlugins { +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; + +public abstract class AbstractTableRuntime extends PersistentBase implements TableRuntime { private final TableRuntimeStore store; + private final Map processContainerMap = Maps.newConcurrentMap(); protected AbstractTableRuntime(TableRuntimeStore store) { this.store = store; @@ -48,6 +57,44 @@ public TableConfiguration getTableConfiguration() { return TableConfigurations.parseTableConfig(store().getTableConfig()); } + @Override + public List getProcessStates() { + return processContainerMap.values().stream() + .flatMap(container -> container.getProcessStates().stream()) + .collect(Collectors.toList()); + } + + @Override + public List getProcessStates(Action action) { + return processContainerMap.get(action).getProcessStates(); + } + + @Override + public void registerProcess(TableProcessStore processStore) { + processContainerMap + .computeIfAbsent(processStore.getAction(), k -> new TableProcessContainer()) + .processLock + .lock(); + try { + processContainerMap + .get(processStore.getAction()) + .processMap + .put(processStore.getProcessId(), processStore); + } finally { + processContainerMap.get(processStore.getAction()).processLock.unlock(); + } + } + + @Override + public void removeProcess(TableProcessStore processStore) { + processContainerMap.computeIfPresent( + processStore.getAction(), + (action, container) -> { + container.processMap.remove(processStore.getProcessId()); + return container; + }); + } + @Override public String getGroupName() { return store().getGroupName(); @@ -61,4 +108,13 @@ public int getStatusCode() { public void dispose() { store().dispose(); } + + private static class TableProcessContainer { + private final Lock processLock = new ReentrantLock(); + private final Map processMap = Maps.newConcurrentMap(); + + public List getProcessStates() { + return Lists.newArrayList(processMap.values()); + } + } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java index 6cf566eef8..17ec8b257d 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java @@ -18,9 +18,7 @@ package org.apache.amoro.server.table; -import org.apache.amoro.Action; import org.apache.amoro.AmoroTable; -import org.apache.amoro.SupportsProcessPlugins; import org.apache.amoro.TableRuntime; import org.apache.amoro.api.BlockableOperation; import org.apache.amoro.config.OptimizingConfig; @@ -30,9 +28,6 @@ import org.apache.amoro.optimizing.OptimizingType; import org.apache.amoro.optimizing.TableRuntimeOptimizingState; import org.apache.amoro.optimizing.plan.AbstractOptimizingEvaluator; -import org.apache.amoro.process.AmoroProcess; -import org.apache.amoro.process.ProcessFactory; -import org.apache.amoro.process.TableProcessStore; import org.apache.amoro.server.AmoroServiceConstants; import org.apache.amoro.server.optimizing.OptimizingProcess; import org.apache.amoro.server.optimizing.OptimizingStatus; @@ -47,7 +42,6 @@ import org.apache.amoro.server.utils.IcebergTableUtil; import org.apache.amoro.server.utils.SnowflakeIdGenerator; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; -import org.apache.amoro.shade.zookeeper3.org.apache.curator.shaded.com.google.common.collect.Maps; import org.apache.amoro.table.BaseTable; import org.apache.amoro.table.ChangeTable; import org.apache.amoro.table.MixedTable; @@ -63,13 +57,9 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; /** Default table runtime implementation. */ -public class DefaultTableRuntime extends AbstractTableRuntime - implements TableRuntime, SupportsProcessPlugins { +public class DefaultTableRuntime extends AbstractTableRuntime implements TableRuntime { private static final Logger LOG = LoggerFactory.getLogger(DefaultTableRuntime.class); @@ -95,7 +85,6 @@ public class DefaultTableRuntime extends AbstractTableRuntime Lists.newArrayList( OPTIMIZING_STATE_KEY, PENDING_INPUT_KEY, PROCESS_ID_KEY, CLEANUP_STATE_KEY); - private final Map processContainerMap = Maps.newConcurrentMap(); private final TableOptimizingMetrics optimizingMetrics; private final TableOrphanFilesCleaningMetrics orphanFilesCleaningMetrics; private final TableSummaryMetrics tableSummaryMetrics; @@ -128,39 +117,6 @@ public void registerMetric(MetricRegistry metricRegistry) { this.tableSummaryMetrics.register(metricRegistry); } - @Override - public AmoroProcess trigger(Action action) { - return Optional.ofNullable(processContainerMap.get(action)) - .map(container -> container.trigger(action)) - // Define a related exception - .orElseThrow(() -> new IllegalArgumentException("No ProcessFactory for action " + action)); - } - - @Override - public void install(Action action, ProcessFactory processFactory) { - if (processContainerMap.putIfAbsent(action, new TableProcessContainer(processFactory)) - != null) { - throw new IllegalStateException("ProcessFactory for action " + action + " already exists"); - } - } - - @Override - public boolean enabled(Action action) { - return processContainerMap.get(action) != null; - } - - @Override - public List getProcessStates() { - return processContainerMap.values().stream() - .flatMap(container -> container.getProcessStates().stream()) - .collect(Collectors.toList()); - } - - @Override - public List getProcessStates(Action action) { - return processContainerMap.get(action).getProcessStates(); - } - public TableOrphanFilesCleaningMetrics getOrphanFilesCleaningMetrics() { return orphanFilesCleaningMetrics; } @@ -582,30 +538,4 @@ private long doRefreshSnapshots(UnkeyedTable table) { return currentSnapshotId; } - - private class TableProcessContainer { - private final Lock processLock = new ReentrantLock(); - private final ProcessFactory processFactory; - private final Map processMap = Maps.newConcurrentMap(); - - TableProcessContainer(ProcessFactory processFactory) { - this.processFactory = processFactory; - } - - public AmoroProcess trigger(Action action) { - processLock.lock(); - try { - AmoroProcess process = processFactory.create(DefaultTableRuntime.this, action); - process.getCompleteFuture().whenCompleted(() -> processMap.remove(process.getId())); - processMap.put(process.getId(), process); - return process; - } finally { - processLock.unlock(); - } - } - - public List getProcessStates() { - return processMap.values().stream().map(AmoroProcess::store).collect(Collectors.toList()); - } - } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java index 86e748cc54..41c8041f22 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java @@ -21,6 +21,9 @@ import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableFormat; import org.apache.amoro.TableRuntime; +import org.apache.amoro.process.ActionCoordinator; +import org.apache.amoro.process.ProcessFactory; +import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.table.StateKey; import org.apache.amoro.table.TableRuntimeFactory; import org.apache.amoro.table.TableRuntimeStore; @@ -41,6 +44,14 @@ public String name() { return "default"; } + @Override + public List supportedCoordinators() { + return Lists.newArrayList(); + } + + @Override + public void initialize(List factories) {} + @Override public Optional accept( ServerTableIdentifier tableIdentifier, Map tableProperties) { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java index 464cd5601d..6deeb340d9 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java @@ -44,6 +44,7 @@ import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.shade.guava32.com.google.common.collect.Sets; import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.amoro.table.TableRuntimeFactory; import org.apache.amoro.table.TableSummary; import org.apache.amoro.utils.TablePropertyUtil; import org.slf4j.Logger; @@ -85,19 +86,19 @@ public class DefaultTableService extends PersistentBase implements TableService private final CompletableFuture initialized = new CompletableFuture<>(); private final Configurations serverConfiguration; private final CatalogManager catalogManager; - private final TableRuntimeFactoryManager tableRuntimeFactoryManager; + private final List tableRuntimeFactoryList; private RuntimeHandlerChain headHandler; private ExecutorService tableExplorerExecutors; public DefaultTableService( Configurations configuration, CatalogManager catalogManager, - TableRuntimeFactoryManager tableRuntimeFactoryManager) { + List tableRuntimeFactoryList) { this.catalogManager = catalogManager; this.externalCatalogRefreshingInterval = configuration.get(AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL).toMillis(); this.serverConfiguration = configuration; - this.tableRuntimeFactoryManager = tableRuntimeFactoryManager; + this.tableRuntimeFactoryList = tableRuntimeFactoryList; } @Override @@ -515,7 +516,7 @@ private Optional createTableRuntime( ServerTableIdentifier identifier, TableRuntimeMeta runtimeMeta, List restoredStates) { - return tableRuntimeFactoryManager.installedPlugins().stream() + return tableRuntimeFactoryList.stream() .map(f -> f.accept(identifier, runtimeMeta.getTableConfig())) .filter(Optional::isPresent) .map(Optional::get) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/simple/SimpleActionCoordinator.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/simple/SimpleActionCoordinator.java new file mode 100644 index 0000000000..e05f38aa1b --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/simple/SimpleActionCoordinator.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.table.simple; + +import org.apache.amoro.Action; +import org.apache.amoro.TableFormat; +import org.apache.amoro.TableRuntime; +import org.apache.amoro.process.ActionCoordinator; +import org.apache.amoro.process.ProcessFactory; +import org.apache.amoro.process.ProcessTriggerStrategy; +import org.apache.amoro.process.TableProcess; +import org.apache.amoro.process.TableProcessStore; +import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; + +import java.util.Optional; + +public class SimpleActionCoordinator implements ActionCoordinator { + + private final Action action; + private final ProcessFactory factory; + private final TableFormat format; + private final ProcessTriggerStrategy strategy; + + public SimpleActionCoordinator(TableFormat format, Action action, ProcessFactory factory) { + this.action = action; + this.factory = factory; + this.format = format; + this.strategy = factory.triggerStrategy(format, action); + Preconditions.checkArgument( + strategy != null, "ProcessTriggerStrategy cannot be null for %s: %s", format, action); + } + + @Override + public boolean formatSupported(TableFormat format) { + return this.format.equals(format); + } + + @Override + public int parallelism() { + return strategy.getTriggerParallelism(); + } + + @Override + public Action action() { + return action; + } + + @Override + public long getNextExecutingTime(TableRuntime tableRuntime) { + return strategy.getTriggerInterval().toMillis(); + } + + @Override + public boolean enabled(TableRuntime tableRuntime) { + return formatSupported(tableRuntime.getFormat()); + } + + @Override + public long getExecutorDelay() { + return strategy.getTriggerInterval().toMillis(); + } + + @Override + public Optional trigger(TableRuntime tableRuntime) { + return factory.trigger(tableRuntime, action); + } + + @Override + public TableProcess recoverTableProcess( + TableRuntime tableRuntime, TableProcessStore processStore) { + return factory.recover(tableRuntime, processStore); + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/simple/SimpleTableRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/simple/SimpleTableRuntime.java new file mode 100644 index 0000000000..602a4c009b --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/simple/SimpleTableRuntime.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.table.simple; + +import org.apache.amoro.metrics.MetricRegistry; +import org.apache.amoro.server.table.AbstractTableRuntime; +import org.apache.amoro.table.TableRuntimeStore; + +public class SimpleTableRuntime extends AbstractTableRuntime { + protected SimpleTableRuntime(TableRuntimeStore store) { + super(store); + } + + @Override + public void registerMetric(MetricRegistry metricRegistry) {} + + @Override + public void unregisterMetric() {} +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/simple/SimpleTableRuntimeFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/simple/SimpleTableRuntimeFactory.java new file mode 100644 index 0000000000..93fdfd9ded --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/simple/SimpleTableRuntimeFactory.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.table.simple; + +import org.apache.amoro.Action; +import org.apache.amoro.ServerTableIdentifier; +import org.apache.amoro.TableFormat; +import org.apache.amoro.TableRuntime; +import org.apache.amoro.process.ActionCoordinator; +import org.apache.amoro.process.ProcessFactory; +import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; +import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; +import org.apache.amoro.shade.guava32.com.google.common.collect.Sets; +import org.apache.amoro.shade.thrift.org.apache.commons.lang3.tuple.Pair; +import org.apache.amoro.table.StateKey; +import org.apache.amoro.table.TableRuntimeFactory; +import org.apache.amoro.table.TableRuntimeStore; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +public class SimpleTableRuntimeFactory implements TableRuntimeFactory { + + // private final Set supportFormats = Sets.newHashSet(); + // private final Map> supportedActions = Maps.newHashMap(); + // private final Map> actionConfigs = Maps.newHashMap(); + // private ProcessFactory processFactory; + private final List supportedCoordinators = Lists.newArrayList(); + + private final Map, ProcessFactory> actions = Maps.newHashMap(); + private final Set supportFormats = Sets.newHashSet(); + + @Override + public void open(Map properties) {} + + @Override + public void close() {} + + public void initialize(List processFactories) { + processFactories.forEach( + f -> { + Map> supportedActions = f.supportedActions(); + for (TableFormat format : supportedActions.keySet()) { + Set supportedActionsForFormat = supportedActions.get(format); + for (Action action : supportedActionsForFormat) { + ProcessFactory exists = actions.get(Pair.of(format, action)); + if (exists != null) { + throw new IllegalArgumentException( + String.format( + "Plugin conflict, ProcessFactory: %s is supported for format: %s, action: %s, conflict with %s", + exists.name(), format, action, f.name())); + } + actions.put(Pair.of(format, action), f); + } + } + }); + actions.keySet().forEach(e -> supportFormats.add(e.getLeft())); + for (Pair key : actions.keySet()) { + supportedCoordinators.add( + new SimpleActionCoordinator(key.getLeft(), key.getRight(), actions.get(key))); + } + } + + @Override + public List supportedCoordinators() { + return supportedCoordinators; + } + + @Override + public Optional accept( + ServerTableIdentifier tableIdentifier, Map tableProperties) { + if (!supportFormats.contains(tableIdentifier.getFormat())) { + return Optional.empty(); + } + return Optional.of(new TableRuntimeCreatorImpl(tableIdentifier.getFormat())); + } + + @Override + public String name() { + return "simple"; + } + + private class TableRuntimeCreatorImpl implements TableRuntimeFactory.TableRuntimeCreator { + + private final TableFormat format; + + public TableRuntimeCreatorImpl(TableFormat format) { + this.format = format; + } + + @Override + public List> requiredStateKeys() { + Map> requiredStates = Maps.newHashMap(); + actions.keySet().stream() + .filter(e -> e.getLeft().equals(format)) + .forEach( + e -> { + ProcessFactory factory = actions.get(e); + factory + .requiredStates() + .forEach(stateKey -> requiredStates.put(stateKey.getKey(), stateKey)); + }); + return Lists.newArrayList(requiredStates.values()); + } + + @Override + public TableRuntime create(TableRuntimeStore store) { + return new SimpleTableRuntime(store); + } + } +} diff --git a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql index f1f91e3a32..e142a2ce95 100644 --- a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql +++ b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql @@ -189,20 +189,6 @@ CREATE TABLE `task_runtime` KEY `table_index` (`table_id`, `process_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'Optimize task basic information'; -CREATE TABLE `table_process_state` -( - `process_id` bigint(20) NOT NULL COMMENT 'optimizing_procedure UUID', - `action` varchar(16) NOT NULL COMMENT 'process action', - `table_id` bigint(20) NOT NULL, - `retry_num` int(11) DEFAULT NULL COMMENT 'Retry times', - `status` varchar(10) NOT NULL COMMENT 'Direct to TableOptimizingStatus', - `start_time` timestamp DEFAULT CURRENT_TIMESTAMP COMMENT 'First plan time', - `end_time` timestamp NULL DEFAULT NULL COMMENT 'finish time or failed time', - `fail_reason` varchar(4096) DEFAULT NULL COMMENT 'Error message after task failed', - `summary` mediumtext COMMENT 'state summary, usually a map', - PRIMARY KEY (`process_id`), - KEY `table_index` (`table_id`, `start_time`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'History of optimizing after each commit'; CREATE TABLE `optimizing_task_quota` ( diff --git a/amoro-ams/src/main/resources/mysql/upgrade.sql b/amoro-ams/src/main/resources/mysql/upgrade.sql index b36064109f..dca7839efc 100644 --- a/amoro-ams/src/main/resources/mysql/upgrade.sql +++ b/amoro-ams/src/main/resources/mysql/upgrade.sql @@ -107,18 +107,6 @@ CREATE TABLE `table_runtime` ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'Table running information of each table' ROW_FORMAT=DYNAMIC; -CREATE TABLE `table_runtime_state` ( - `state_id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'Primary key', - `table_id` bigint unsigned NOT NULL COMMENT 'Table identifier id', - `state_key` varchar(256) NOT NULL COMMENT 'Table Runtime state key', - `state_value` mediumtext COMMENT 'Table Runtime state value, string type', - `state_version` bigint NOT NULL DEFAULT '0' COMMENT 'Table runtime state version, auto inc when update', - `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time', - `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time', - PRIMARY KEY (`state_id`), - UNIQUE KEY `uniq_table_state_key` (`table_id`,`state_key`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='State of Table Runtimes'; - INSERT INTO table_runtime (`table_id`, `group_name`, `status_code`, `status_code_update_time`, `table_config`, `table_summary`) SELECT `table_id`, `optimizer_group`, `optimizing_status_code`, `optimizing_status_start_time`, diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java index 9f3d25b3f7..fbe28ead4e 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java @@ -19,10 +19,12 @@ package org.apache.amoro.server; import org.apache.amoro.config.Configurations; +import org.apache.amoro.process.ActionCoordinator; import org.apache.amoro.resource.ResourceGroup; import org.apache.amoro.server.manager.EventsManager; import org.apache.amoro.server.manager.MetricManager; import org.apache.amoro.server.process.ProcessService; +import org.apache.amoro.server.process.executor.ExecuteEngineManager; import org.apache.amoro.server.table.DefaultTableRuntime; import org.apache.amoro.server.table.DefaultTableRuntimeFactory; import org.apache.amoro.server.table.DefaultTableService; @@ -34,6 +36,8 @@ import org.mockito.Mockito; import java.time.Duration; +import java.util.List; +import java.util.stream.Collectors; public abstract class AMSServiceTestBase extends AMSManagerTestBase { private static DefaultTableService TABLE_SERVICE = null; @@ -48,6 +52,10 @@ public static void initTableService() { tableRuntimeFactoryManager = Mockito.mock(TableRuntimeFactoryManager.class); Mockito.when(tableRuntimeFactoryManager.installedPlugins()) .thenReturn(Lists.newArrayList(runtimeFactory)); + List actionCoordinators = + tableRuntimeFactoryManager.installedPlugins().stream() + .flatMap(factory -> factory.supportedCoordinators().stream()) + .collect(Collectors.toList()); try { Configurations configurations = new Configurations(); configurations.set(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT, Duration.ofMillis(800L)); @@ -55,11 +63,13 @@ public static void initTableService() { AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT, Duration.ofMillis(30000L)); TABLE_SERVICE = new DefaultTableService( - new Configurations(), CATALOG_MANAGER, tableRuntimeFactoryManager); + new Configurations(), CATALOG_MANAGER, Lists.newArrayList(runtimeFactory)); OPTIMIZING_SERVICE = new DefaultOptimizingService( configurations, CATALOG_MANAGER, OPTIMIZER_MANAGER, TABLE_SERVICE); - PROCESS_SERVICE = new ProcessService(configurations, TABLE_SERVICE); + PROCESS_SERVICE = + new ProcessService( + configurations, TABLE_SERVICE, actionCoordinators, new ExecuteEngineManager()); TABLE_SERVICE.addHandlerChain(OPTIMIZING_SERVICE.getTableRuntimeHandler()); TABLE_SERVICE.addHandlerChain(PROCESS_SERVICE.getTableHandlerChain()); diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java b/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java index da3617a148..9719be9475 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java @@ -21,18 +21,21 @@ import org.apache.amoro.Action; import org.apache.amoro.TableFormat; import org.apache.amoro.TableRuntime; +import org.apache.amoro.process.ActionCoordinator; import org.apache.amoro.process.TableProcess; import org.apache.amoro.process.TableProcessStore; +import org.apache.amoro.server.utils.SnowflakeIdGenerator; import java.util.HashMap; -import java.util.Map; +import java.util.Optional; /** Mock implementation of {@link ActionCoordinator} used in tests. */ public class MockActionCoordinator implements ActionCoordinator { public static final int PROCESS_MAX_POOL_SIZE = 1000; private static final TableFormat[] DEFAULT_FORMATS = new TableFormat[] {TableFormat.PAIMON}; + SnowflakeIdGenerator SNOWFLAKE_ID_GENERATOR = new SnowflakeIdGenerator(); - public static final Action DEFAULT_ACTION = new Action(DEFAULT_FORMATS, 0, "default_action"); + public static final Action DEFAULT_ACTION = Action.register("default_action"); /** * Whether the format is supported. @@ -92,19 +95,19 @@ public long getExecutorDelay() { * @return mock process */ @Override - public TableProcess createTableProcess(TableRuntime tableRuntime) { + public Optional trigger(TableRuntime tableRuntime) { TableProcessMeta tableProcessMeta = TableProcessMeta.of( SNOWFLAKE_ID_GENERATOR.generateId(), tableRuntime.getTableIdentifier().getId(), action().getName(), - executionEngine(), + "default", new HashMap<>()); TableProcessStore tableProcessStore = new DefaultTableProcessStore( tableProcessMeta.getProcessId(), tableRuntime, tableProcessMeta, action(), 3); MockTableProcess mockTableProcess = new MockTableProcess(tableRuntime, tableProcessStore); - return mockTableProcess; + return Optional.of(mockTableProcess); } /** @@ -131,18 +134,4 @@ public TableProcess cancelTableProcess(TableRuntime tableRuntime, TableProcess p public TableProcess retryTableProcess(TableProcess process) { return process; } - - /** Open plugin. */ - @Override - public void open(Map properties) {} - - /** Close plugin. */ - @Override - public void close() {} - - /** Plugin name. */ - @Override - public String name() { - return "mock_action_coordinator"; - } } diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/AMSTableTestBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/AMSTableTestBase.java index 5efab334b6..456a5da85e 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/AMSTableTestBase.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/AMSTableTestBase.java @@ -37,6 +37,7 @@ import org.apache.amoro.server.catalog.InternalCatalog; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.apache.amoro.table.MixedTable; +import org.apache.amoro.table.TableRuntimeFactory; import org.apache.amoro.utils.CatalogUtil; import org.apache.amoro.utils.ConvertStructUtil; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; @@ -69,6 +70,8 @@ public class AMSTableTestBase extends AMSServiceTestBase { private final boolean autoInitTable; private ServerTableIdentifier serverTableIdentifier; + protected final TableRuntimeFactory tableRuntimeFactory; + public AMSTableTestBase(CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) { this(catalogTestHelper, tableTestHelper, false); } @@ -78,6 +81,7 @@ public AMSTableTestBase( this.catalogTestHelper = catalogTestHelper; this.tableTestHelper = tableTestHelper; this.autoInitTable = autoInitTable; + this.tableRuntimeFactory = new DefaultTableRuntimeFactory(); } @Before diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java index 792faebcf7..15409e028d 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java @@ -41,7 +41,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.mockito.Mockito; import java.util.List; @@ -49,7 +48,6 @@ public class TestDefaultTableRuntimeHandler extends AMSTableTestBase { private DefaultTableService tableService; - private final TableRuntimeFactoryManager runtimeFactoryManager; @Parameterized.Parameters(name = "{0}, {1}") public static Object[] parameters() { @@ -66,16 +64,13 @@ public static Object[] parameters() { public TestDefaultTableRuntimeHandler( CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) { super(catalogTestHelper, tableTestHelper, false); - DefaultTableRuntimeFactory runtimeFactory = new DefaultTableRuntimeFactory(); - runtimeFactoryManager = Mockito.mock(TableRuntimeFactoryManager.class); - Mockito.when(runtimeFactoryManager.installedPlugins()) - .thenReturn(Lists.newArrayList(runtimeFactory)); } @Test public void testInitialize() throws Exception { tableService = - new DefaultTableService(new Configurations(), CATALOG_MANAGER, runtimeFactoryManager); + new DefaultTableService( + new Configurations(), CATALOG_MANAGER, Lists.newArrayList(tableRuntimeFactory)); TestHandler handler = new TestHandler(); tableService.addHandlerChain(handler); tableService.initialize(); @@ -95,7 +90,8 @@ public void testInitialize() throws Exception { // initialize with a history table tableService = - new DefaultTableService(new Configurations(), CATALOG_MANAGER, runtimeFactoryManager); + new DefaultTableService( + new Configurations(), CATALOG_MANAGER, Lists.newArrayList(tableRuntimeFactory)); handler = new TestHandler(); tableService.addHandlerChain(handler); tableService.initialize(); @@ -133,7 +129,8 @@ public void testInitialize() throws Exception { @Test public void testRefreshUpdatesOptimizerGroup() throws Exception { tableService = - new DefaultTableService(new Configurations(), CATALOG_MANAGER, runtimeFactoryManager); + new DefaultTableService( + new Configurations(), CATALOG_MANAGER, Lists.newArrayList(tableRuntimeFactory)); TestHandler handler = new TestHandler(); tableService.addHandlerChain(handler); tableService.initialize(); diff --git a/amoro-common/src/main/java/org/apache/amoro/Action.java b/amoro-common/src/main/java/org/apache/amoro/Action.java index 42671a6e1d..1d5f81b3bf 100644 --- a/amoro-common/src/main/java/org/apache/amoro/Action.java +++ b/amoro-common/src/main/java/org/apache/amoro/Action.java @@ -20,38 +20,33 @@ import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; -import java.util.Arrays; +import java.util.Locale; +import java.util.Map; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; public final class Action { private static final int MAX_NAME_LENGTH = 16; - - /** supported table formats of this action */ - private final TableFormat[] formats; + private static final Map registeredActions = new ConcurrentHashMap<>(); private final String name; - /** - * the weight number of this action, the bigger the weight number, the higher positions of - * schedulers or front pages - */ - private final int weight; - public Action(TableFormat[] formats, int weight, String name) { - Preconditions.checkArgument( - name.length() <= MAX_NAME_LENGTH, - "Action name length should be less than " + MAX_NAME_LENGTH); - this.formats = formats; - this.name = name; - this.weight = weight; + public static Action register(String name) { + final String regularName = name.trim().toUpperCase(Locale.ROOT); + return registeredActions.computeIfAbsent(regularName, s -> new Action(regularName)); } - public int getWeight() { - return weight; + public static Action valueOf(String name) { + final String regularName = name.trim().toUpperCase(Locale.ROOT); + return registeredActions.get(regularName); } - public TableFormat[] supportedFormats() { - return formats; + private Action(String name) { + Preconditions.checkArgument( + name.length() <= MAX_NAME_LENGTH, + "Action name length should be less than " + MAX_NAME_LENGTH); + this.name = name; } public String getName() { @@ -67,13 +62,11 @@ public boolean equals(Object o) { return false; } Action action = (Action) o; - return Objects.equals(name, action.name) && Arrays.equals(formats, action.formats); + return Objects.equals(name, action.name); } @Override public int hashCode() { - int result = Objects.hash(name); - result = 31 * result + Arrays.hashCode(formats); - return result; + return Objects.hash(name); } } diff --git a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java index 76f470d98c..c75c5ac8d0 100644 --- a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java +++ b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java @@ -23,9 +23,9 @@ public class IcebergActions { private static final TableFormat[] DEFAULT_FORMATS = new TableFormat[] {TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE}; - public static final Action SYSTEM = new Action(DEFAULT_FORMATS, 0, "system"); - public static final Action REWRITE = new Action(DEFAULT_FORMATS, 10, "rewrite"); - public static final Action DELETE_ORPHANS = new Action(DEFAULT_FORMATS, 2, "delete-orphans"); - public static final Action SYNC_HIVE = new Action(DEFAULT_FORMATS, 3, "sync-hive"); - public static final Action EXPIRE_DATA = new Action(DEFAULT_FORMATS, 1, "expire-data"); + public static final Action SYSTEM = Action.register("system"); + public static final Action REWRITE = Action.register("rewrite"); + public static final Action DELETE_ORPHANS = Action.register("delete-orphans"); + public static final Action SYNC_HIVE = Action.register("sync-hive"); + public static final Action EXPIRE_DATA = Action.register("expire-data"); } diff --git a/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java b/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java index 31ea74f1fe..9fa9a29e47 100644 --- a/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java +++ b/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java @@ -48,6 +48,10 @@ public interface TableRuntime { */ List getProcessStates(Action action); + void registerProcess(TableProcessStore processStore); + + void removeProcess(TableProcessStore processStore); + /** Get the group name of the table runtime. */ String getGroupName(); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinator.java b/amoro-common/src/main/java/org/apache/amoro/process/ActionCoordinator.java similarity index 70% rename from amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinator.java rename to amoro-common/src/main/java/org/apache/amoro/process/ActionCoordinator.java index 5e55154cbd..18af6387ec 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinator.java +++ b/amoro-common/src/main/java/org/apache/amoro/process/ActionCoordinator.java @@ -16,25 +16,19 @@ * limitations under the License. */ -package org.apache.amoro.server.process; +package org.apache.amoro.process; import org.apache.amoro.Action; -import org.apache.amoro.ActivePlugin; import org.apache.amoro.TableFormat; import org.apache.amoro.TableRuntime; -import org.apache.amoro.process.TableProcess; -import org.apache.amoro.process.TableProcessStore; -import org.apache.amoro.server.utils.SnowflakeIdGenerator; + +import java.util.Optional; /** * Coordinator for a specific {@link org.apache.amoro.Action} to manage table processes. Provides * scheduling parameters and lifecycle hooks to create/recover/cancel/retry table processes. */ -public interface ActionCoordinator extends ActivePlugin { - - String PROPERTY_PARALLELISM = "parallelism"; - - SnowflakeIdGenerator SNOWFLAKE_ID_GENERATOR = new SnowflakeIdGenerator(); +public interface ActionCoordinator { /** * Check whether the given table format is supported by this coordinator. @@ -58,13 +52,6 @@ public interface ActionCoordinator extends ActivePlugin { */ Action action(); - /** - * Get the execution engine name used by this coordinator. - * - * @return execution engine name - */ - String executionEngine(); - /** * Calculate the next executing time for the given table runtime. * @@ -94,7 +81,7 @@ public interface ActionCoordinator extends ActivePlugin { * @param tableRuntime table runtime * @return a new table process */ - TableProcess createTableProcess(TableRuntime tableRuntime); + Optional trigger(TableRuntime tableRuntime); /** * Recover a {@link TableProcess} from persisted store. @@ -104,21 +91,4 @@ public interface ActionCoordinator extends ActivePlugin { * @return recovered table process */ TableProcess recoverTableProcess(TableRuntime tableRuntime, TableProcessStore processStore); - - /** - * Prepare a {@link TableProcess} for cancellation. - * - * @param tableRuntime table runtime - * @param process table process to cancel - * @return the process instance to be canceled - */ - TableProcess cancelTableProcess(TableRuntime tableRuntime, TableProcess process); - - /** - * Prepare a {@link TableProcess} for retrying. - * - * @param process table process to retry - * @return the process instance to be retried - */ - TableProcess retryTableProcess(TableProcess process); } diff --git a/amoro-common/src/main/java/org/apache/amoro/process/AmoroProcess.java b/amoro-common/src/main/java/org/apache/amoro/process/AmoroProcess.java index cf0ffd50bf..6ff5b6cfb6 100644 --- a/amoro-common/src/main/java/org/apache/amoro/process/AmoroProcess.java +++ b/amoro-common/src/main/java/org/apache/amoro/process/AmoroProcess.java @@ -47,7 +47,7 @@ public interface AmoroProcess { SimpleFuture getCompleteFuture(); /** - * Get {@link ProcessState} of the process + * Get {@link TableProcessStore} of the process * * @return the state of the process */ diff --git a/amoro-common/src/main/java/org/apache/amoro/process/OptimizingState.java b/amoro-common/src/main/java/org/apache/amoro/process/OptimizingState.java deleted file mode 100644 index c4a900dcac..0000000000 --- a/amoro-common/src/main/java/org/apache/amoro/process/OptimizingState.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.process; - -import org.apache.amoro.Action; -import org.apache.amoro.ServerTableIdentifier; -import org.apache.amoro.StateField; - -/** The state of the optimizing process. */ -public abstract class OptimizingState extends TableProcessState { - - @StateField private volatile long targetSnapshotId; - @StateField private volatile long watermark; - @StateField private volatile ProcessStage stage; - @StateField private volatile long currentStageStartTime; - - public OptimizingState(Action action, ServerTableIdentifier tableIdentifier) { - super(action, tableIdentifier); - } - - public OptimizingState(long id, Action action, ServerTableIdentifier tableIdentifier) { - super(id, action, tableIdentifier); - } - - protected void setStage(ProcessStage stage) { - this.stage = stage; - this.currentStageStartTime = System.currentTimeMillis(); - } - - protected void setStage(ProcessStage stage, long stageStartTime) { - this.stage = stage; - this.currentStageStartTime = stageStartTime; - } - - protected void setTargetSnapshotId(long targetSnapshotId) { - this.targetSnapshotId = targetSnapshotId; - } - - protected void setWatermark(long watermark) { - this.watermark = watermark; - } - - public long getWatermark() { - return watermark; - } - - @Override - public ProcessStage getStage() { - return stage; - } - - public long getTargetSnapshotId() { - return targetSnapshotId; - } - - public long getCurrentStageStartTime() { - return currentStageStartTime; - } - - @Override - public String getName() { - return stage.getDesc(); - } -} diff --git a/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java b/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java index a2df0a99f3..6e325d6934 100644 --- a/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java +++ b/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java @@ -19,23 +19,42 @@ package org.apache.amoro.process; import org.apache.amoro.Action; +import org.apache.amoro.ActivePlugin; +import org.apache.amoro.TableFormat; import org.apache.amoro.TableRuntime; +import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; +import org.apache.amoro.table.StateKey; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; /** * A factory to create a process. Normally, There will be default ProcessFactories for each action * and used by default scheduler. Meanwhile, user could extend external ProcessFactory to run jobs * on external resources like Yarn. */ -public interface ProcessFactory { +public interface ProcessFactory extends ActivePlugin { + + default List> requiredStates() { + return Lists.newArrayList(); + } + + /** Get supported actions for each table format. */ + Map> supportedActions(); + + /** How to trigger a process for the action. */ + ProcessTriggerStrategy triggerStrategy(TableFormat format, Action action); /** - * Create a process for the action. + * Try trigger a process for the action. * * @param tableRuntime table runtime * @param action action type * @return target process which has not been submitted yet. */ - AmoroProcess create(TableRuntime tableRuntime, Action action); + Optional trigger(TableRuntime tableRuntime, Action action); /** * Recover a process for the action from a state. @@ -44,5 +63,5 @@ public interface ProcessFactory { * @param state state of the process * @return target process which has not been submitted yet. */ - AmoroProcess recover(TableRuntime tableRuntime, TableProcessState state); + TableProcess recover(TableRuntime tableRuntime, TableProcessStore store); } diff --git a/amoro-common/src/main/java/org/apache/amoro/process/ProcessState.java b/amoro-common/src/main/java/org/apache/amoro/process/ProcessState.java deleted file mode 100644 index 91f76fcaef..0000000000 --- a/amoro-common/src/main/java/org/apache/amoro/process/ProcessState.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.process; - -import org.apache.amoro.Action; - -import java.util.Map; - -/** - * ProcessState contains information in any {@link AmoroProcess} which must be persistent and {@link - * ProcessFactory} will use to recover {@link AmoroProcess}. - */ -public interface ProcessState { - - /** @return unique identifier of the process. */ - long getId(); - - /** - * @return the name of the state. If multiple stages are involved, it should be the name of the - * current stage. - */ - String getName(); - - /** @return start time of the process. */ - long getStartTime(); - - /** @return the action of the process. */ - Action getAction(); - - /** @return the status of the process. */ - ProcessStatus getStatus(); - - /** - * Get the string encoded summary of the process, this could be a simple description or a POJO - * encoded by JSON - * - * @return the summary of the process - */ - Map getSummary(); - - /** @return the reason of process failure, null if the process has not failed yet. */ - String getFailedReason(); - - /** - * Total millisecond running time of all tasks in the process. - * - * @return actual quota runtime of the process. - */ - long getQuotaRuntime(); - - /** - * Quota value is calculated by the total millisecond running time of all tasks in the process - * divided by the total millisecond from the start time to the current time. It is used to - * evaluate the actual runtime concurrence of the process. - * - * @return the quota value of the process. - */ - default double getQuotaValue() { - return (double) getQuotaRuntime() / (System.currentTimeMillis() - getStartTime()); - } -} diff --git a/amoro-common/src/main/java/org/apache/amoro/process/ProcessTriggerStrategy.java b/amoro-common/src/main/java/org/apache/amoro/process/ProcessTriggerStrategy.java new file mode 100644 index 0000000000..2f54a4ed20 --- /dev/null +++ b/amoro-common/src/main/java/org/apache/amoro/process/ProcessTriggerStrategy.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.process; + +import java.time.Duration; + +/** Process trigger strategy. */ +public final class ProcessTriggerStrategy { + + private final Duration triggerInterval; + + private final boolean triggerOnNewSnapshot; + + private final int triggerParallelism; + + public ProcessTriggerStrategy( + Duration triggerInterval, boolean triggerOnNewSnapshot, int triggerParallelism) { + this.triggerInterval = triggerInterval; + this.triggerOnNewSnapshot = triggerOnNewSnapshot; + this.triggerParallelism = triggerParallelism; + } + + public static ProcessTriggerStrategy triggerAtFixRate(Duration triggerInterval) { + return new ProcessTriggerStrategy(triggerInterval, false, 1); + } + + public Duration getTriggerInterval() { + return triggerInterval; + } + + public boolean isTriggerOnNewSnapshot() { + return triggerOnNewSnapshot; + } + + public int getTriggerParallelism() { + return triggerParallelism; + } +} diff --git a/amoro-common/src/main/java/org/apache/amoro/process/TableProcessState.java b/amoro-common/src/main/java/org/apache/amoro/process/TableProcessState.java deleted file mode 100644 index c4e800ed1c..0000000000 --- a/amoro-common/src/main/java/org/apache/amoro/process/TableProcessState.java +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.process; - -import org.apache.amoro.Action; -import org.apache.amoro.ServerTableIdentifier; -import org.apache.amoro.StateField; - -import java.util.Map; - -/** A common state of a table process. */ -public class TableProcessState implements ProcessState { - - @StateField private volatile long id; - @StateField private volatile String externalProcessIdentifier; - private final Action action; - private final ServerTableIdentifier tableIdentifier; - private String executionEngine; - @StateField private int retryNumber = 0; - @StateField private long startTime = -1L; - @StateField private long endTime = -1L; - @StateField private ProcessStatus status = ProcessStatus.PENDING; - @StateField private volatile String failedReason; - private volatile Map processParameters; - private volatile Map summary; - - public TableProcessState(Action action, ServerTableIdentifier tableIdentifier) { - this.action = action; - this.tableIdentifier = tableIdentifier; - } - - public TableProcessState(long id, Action action, ServerTableIdentifier tableIdentifier) { - this.id = id; - this.action = action; - this.tableIdentifier = tableIdentifier; - } - - public TableProcessState( - long id, Action action, ServerTableIdentifier tableIdentifier, String executionEngine) { - this.id = id; - this.action = action; - this.tableIdentifier = tableIdentifier; - this.executionEngine = executionEngine; - } - - @Override - public long getId() { - return id; - } - - public String getExternalProcessIdentifier() { - return externalProcessIdentifier; - } - - public String getName() { - return action.getName(); - } - - public Action getAction() { - return action; - } - - public long getStartTime() { - return startTime; - } - - public long getEndTime() { - return endTime; - } - - public ProcessStatus getStatus() { - return status; - } - - public String getExecutionEngine() { - return executionEngine; - } - - @Override - public Map getSummary() { - return summary; - } - - @Override - public long getQuotaRuntime() { - return getDuration(); - } - - @Override - public double getQuotaValue() { - return 1; - } - - public long getDuration() { - return endTime > 0 ? endTime - startTime : System.currentTimeMillis() - startTime; - } - - public ServerTableIdentifier getTableIdentifier() { - return tableIdentifier; - } - - public void setExternalProcessIdentifier(String externalProcessIdentifier) { - this.externalProcessIdentifier = externalProcessIdentifier; - } - - public void setExecutionEngine(String executionEngine) { - this.executionEngine = executionEngine; - } - - protected void setSummary(Map summary) { - this.summary = summary; - } - - protected void setStartTime(long startTime) { - this.startTime = startTime; - } - - public void setStatus(ProcessStatus status) { - if (status == ProcessStatus.SUCCESS - || status == ProcessStatus.FAILED - || status == ProcessStatus.KILLED) { - endTime = System.currentTimeMillis(); - } else if (this.status != ProcessStatus.SUBMITTED && status == ProcessStatus.SUBMITTED) { - endTime = -1L; - failedReason = null; - summary = null; - } - this.status = status; - } - - public String getFailedReason() { - return failedReason; - } - - public ProcessStage getStage() { - return status.toStage(); - } - - protected void setId(long processId) { - this.id = processId; - } - - public Map getProcessParameters() { - return processParameters; - } - - public void setProcessParameters(Map processParameters) { - this.processParameters = processParameters; - } - - public void setSubmitted(String externalProcessIdentifier) { - this.status = ProcessStatus.SUBMITTED; - setExternalProcessIdentifier(externalProcessIdentifier); - this.startTime = System.currentTimeMillis(); - } - - public void setSubmitted() { - this.status = ProcessStatus.SUBMITTED; - this.startTime = System.currentTimeMillis(); - } - - public void setRunning() { - this.status = ProcessStatus.RUNNING; - } - - public void setCanceling() { - this.status = ProcessStatus.CANCELING; - } - - public void addRetryNumber() { - this.retryNumber += 1; - this.status = ProcessStatus.PENDING; - this.externalProcessIdentifier = ""; - this.failedReason = null; - } - - public void resetRetryNumber() { - this.retryNumber = 0; - } - - public void setCanceled() { - this.status = ProcessStatus.CANCELED; - } - - public void setCompleted() { - this.status = ProcessStatus.SUCCESS; - this.endTime = System.currentTimeMillis(); - } - - public void setKilled() { - this.status = ProcessStatus.KILLED; - this.endTime = System.currentTimeMillis(); - } - - public void setCompleted(String failedReason) { - this.status = ProcessStatus.FAILED; - this.failedReason = failedReason; - this.endTime = System.currentTimeMillis(); - } - - public int getRetryNumber() { - return retryNumber; - } - - public void setRetryNumber(int retryNumber) { - this.retryNumber = retryNumber; - } -} diff --git a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java b/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java index 7654af7104..90e26bd4b6 100644 --- a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java +++ b/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java @@ -21,6 +21,8 @@ import org.apache.amoro.ActivePlugin; import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableRuntime; +import org.apache.amoro.process.ActionCoordinator; +import org.apache.amoro.process.ProcessFactory; import java.util.List; import java.util.Map; @@ -29,6 +31,10 @@ /** Table runtime factory. */ public interface TableRuntimeFactory extends ActivePlugin { + List supportedCoordinators(); + + void initialize(List factories); + Optional accept( ServerTableIdentifier tableIdentifier, Map tableProperties); diff --git a/dist/src/main/amoro-bin/conf/plugins/table-runtime-factories.yaml b/dist/src/main/amoro-bin/conf/plugins/table-runtime-factories.yaml index 34cf7c94d3..a7f2e8a382 100644 --- a/dist/src/main/amoro-bin/conf/plugins/table-runtime-factories.yaml +++ b/dist/src/main/amoro-bin/conf/plugins/table-runtime-factories.yaml @@ -23,3 +23,15 @@ table-runtime-factories: enabled: true priority: 100 + - name: simple + enabled: true + priority: 1 + properties: + process-factory-impl: org.apache.amoro.server.table.simple.SimpleProcessFactory + support-formats: paimon, lance + support-actions: snapshot-expires, compact + support-format-actions.lance: optimizing-index + action.compact.check-interval: 5min + action.snapshot-expires.check-interval: 5min + action.training.check-interval: 10min +