Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.amoro.config.Configurations;
import org.apache.amoro.config.shade.utils.ConfigShadeUtils;
import org.apache.amoro.exception.AmoroRuntimeException;
import org.apache.amoro.process.ActionCoordinator;
import org.apache.amoro.process.ProcessFactory;
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.catalog.DefaultCatalogManager;
import org.apache.amoro.server.dashboard.DashboardServer;
Expand All @@ -47,6 +49,8 @@
import org.apache.amoro.server.persistence.HttpSessionHandlerFactory;
import org.apache.amoro.server.persistence.SqlSessionFactoryProvider;
import org.apache.amoro.server.process.ProcessService;
import org.apache.amoro.server.process.TableProcessFactoryManager;
import org.apache.amoro.server.process.executor.ExecuteEngineManager;
import org.apache.amoro.server.resource.ContainerMetadata;
import org.apache.amoro.server.resource.Containers;
import org.apache.amoro.server.resource.DefaultOptimizerManager;
Expand All @@ -58,9 +62,12 @@
import org.apache.amoro.server.table.TableManager;
import org.apache.amoro.server.table.TableRuntimeFactoryManager;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.table.simple.SimpleTableRuntimeFactory;
import org.apache.amoro.server.terminal.TerminalManager;
import org.apache.amoro.server.utils.ThriftServiceProxy;
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.amoro.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
Expand All @@ -75,6 +82,7 @@
import org.apache.amoro.shade.thrift.org.apache.thrift.transport.TTransportException;
import org.apache.amoro.shade.thrift.org.apache.thrift.transport.TTransportFactory;
import org.apache.amoro.shade.thrift.org.apache.thrift.transport.layered.TFramedTransport;
import org.apache.amoro.table.TableRuntimeFactory;
import org.apache.amoro.utils.IcebergThreadPools;
import org.apache.amoro.utils.JacksonUtil;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -96,6 +104,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;

public class AmoroServiceContainer {

Expand Down Expand Up @@ -231,14 +240,32 @@ public void transitionToFollower() {
public void startOptimizingService() throws Exception {
TableRuntimeFactoryManager tableRuntimeFactoryManager = new TableRuntimeFactoryManager();
tableRuntimeFactoryManager.initialize();
List<TableRuntimeFactory> tableRuntimeFactories = tableRuntimeFactoryManager.installedPlugins();
Preconditions.checkArgument(
tableRuntimeFactories.size() == 1, "Only one table runtime factory is supported");
TableRuntimeFactory tableRuntimeFactory = new SimpleTableRuntimeFactory();

TableProcessFactoryManager tableProcessFactoryManager = new TableProcessFactoryManager();
tableProcessFactoryManager.initialize();
List<ProcessFactory> processFactories = tableProcessFactoryManager.installedPlugins();
tableRuntimeFactory.initialize(processFactories);

List<ActionCoordinator> actionCoordinators =
tableRuntimeFactoryManager.installedPlugins().stream()
.flatMap(f -> f.supportedCoordinators().stream())
.collect(Collectors.toList());

ExecuteEngineManager executeEngineManager = new ExecuteEngineManager();

tableService =
new DefaultTableService(serviceConfig, catalogManager, tableRuntimeFactoryManager);
new DefaultTableService(
serviceConfig, catalogManager, Lists.newArrayList(tableRuntimeFactory));

optimizingService =
new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService);

processService = new ProcessService(serviceConfig, tableService);
processService =
new ProcessService(serviceConfig, tableService, actionCoordinators, executeEngineManager);

LOG.info("Setting up AMS table executors...");
InlineTableExecutors.getInstance().setup(tableService, serviceConfig);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@

import org.apache.amoro.TableFormat;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.process.ActionCoordinator;
import org.apache.amoro.process.TableProcess;
import org.apache.amoro.process.TableProcessStore;
import org.apache.amoro.server.scheduler.PeriodicTableScheduler;
import org.apache.amoro.server.table.TableService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Optional;

/**
* Periodic scheduler that delegates scheduling decisions to an {@link ActionCoordinator}. It
* creates, recovers and retries table processes via {@link ProcessService}.
Expand Down Expand Up @@ -95,8 +98,8 @@ protected boolean enabled(TableRuntime tableRuntime) {
*/
@Override
protected void execute(TableRuntime tableRuntime) {
TableProcess process = coordinator.createTableProcess(tableRuntime);
processService.register(tableRuntime, process);
Optional<TableProcess> process = coordinator.trigger(tableRuntime);
process.ifPresent(p -> processService.register(tableRuntime, p));
}

/**
Expand All @@ -110,16 +113,6 @@ protected void recover(TableRuntime tableRuntime, TableProcessStore processStore
processService.recover(tableRuntime, process);
}

/**
* Retry a failed table process.
*
* @param process process to retry
*/
protected void retry(TableProcess process) {
process = coordinator.retryTableProcess(process);
processService.retry(process);
}

/**
* Get executor delay from coordinator.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@
import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.process.ActionCoordinator;
import org.apache.amoro.process.ProcessEvent;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.process.TableProcess;
import org.apache.amoro.server.manager.AbstractPluginManager;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
import org.apache.amoro.server.process.executor.EngineType;
import org.apache.amoro.server.process.executor.ExecuteEngine;
import org.apache.amoro.server.process.executor.ExecuteEngineManager;
import org.apache.amoro.server.process.executor.TableProcessExecutor;
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.server.table.TableService;
Expand All @@ -59,11 +60,11 @@ public class ProcessService extends PersistentBase {
private static final Logger LOG = LoggerFactory.getLogger(ProcessService.class);
private final TableService tableService;

private final List<ActionCoordinator> actionCoordinatorLists;
private final Map<String, ActionCoordinatorScheduler> actionCoordinators =
new ConcurrentHashMap<>();
private final Map<EngineType, ExecuteEngine> executeEngines = new ConcurrentHashMap<>();

private final ActionCoordinatorManager actionCoordinatorManager;
private final ExecuteEngineManager executeEngineManager;
private final ProcessRuntimeHandler tableRuntimeHandler = new ProcessRuntimeHandler();
private final ThreadPoolExecutor processExecutionPool =
Expand All @@ -72,17 +73,13 @@ public class ProcessService extends PersistentBase {
private final Map<ServerTableIdentifier, Map<Long, TableProcess>> activeTableProcess =
new ConcurrentHashMap<>();

public ProcessService(Configurations serviceConfig, TableService tableService) {
this(serviceConfig, tableService, new ActionCoordinatorManager(), new ExecuteEngineManager());
}

public ProcessService(
Configurations serviceConfig,
TableService tableService,
ActionCoordinatorManager actionCoordinatorManager,
List<ActionCoordinator> actionCoordinators,
ExecuteEngineManager executeEngineManager) {
this.tableService = tableService;
this.actionCoordinatorManager = actionCoordinatorManager;
this.actionCoordinatorLists = actionCoordinators;
this.executeEngineManager = executeEngineManager;
}

Expand Down Expand Up @@ -126,15 +123,6 @@ public void recover(TableRuntime tableRuntime, TableProcess process) {
executeOrTraceProcess(process);
}

/**
* Retry a failed table process.
*
* @param process process to retry
*/
public void retry(TableProcess process) {
executeOrTraceProcess(process);
}

/**
* Cancel a table process and release related resources.
*
Expand All @@ -147,24 +135,20 @@ public void cancel(TableProcess process) {

/** Dispose the service, shutdown engines and clear active processes. */
public void dispose() {
actionCoordinatorManager.close();
this.actionCoordinators.values().forEach(RuntimeHandlerChain::dispose);
executeEngineManager.close();
processExecutionPool.shutdown();
activeTableProcess.clear();
}

private void initialize(List<TableRuntime> tableRuntimes) {
LOG.info("Initializing process service");
actionCoordinatorManager.initialize();
actionCoordinatorManager
.installedPlugins()
.forEach(
actionCoordinator -> {
actionCoordinators.put(
actionCoordinator.action().getName(),
new ActionCoordinatorScheduler(
actionCoordinator, tableService, ProcessService.this));
});
actionCoordinatorLists.forEach(
actionCoordinator -> {
actionCoordinators.put(
actionCoordinator.action().getName(),
new ActionCoordinatorScheduler(actionCoordinator, tableService, ProcessService.this));
});
executeEngineManager.initialize();
executeEngineManager
.installedPlugins()
Expand Down Expand Up @@ -242,7 +226,7 @@ private void executeOrTraceProcess(TableProcess process) {
"Regular Retry.",
process.getProcessParameters(),
process.getSummary());
scheduler.retry(process);
executeOrTraceProcess(process);
} else {
untrackTableProcessInstance(
process.getTableRuntime().getTableIdentifier(), process.getId());
Expand Down Expand Up @@ -551,18 +535,4 @@ protected void doDispose() {
// TODO: dispose
}
}

/** Manager for {@link ActionCoordinator} plugins. */
public static class ActionCoordinatorManager extends AbstractPluginManager<ActionCoordinator> {
public ActionCoordinatorManager() {
super("action-coordinators");
}
}

/** Manager for {@link ExecuteEngine} plugins. */
public static class ExecuteEngineManager extends AbstractPluginManager<ExecuteEngine> {
public ExecuteEngineManager() {
super("execute-engines");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.server.process;

import org.apache.amoro.process.ProcessFactory;
import org.apache.amoro.server.manager.AbstractPluginManager;

public class TableProcessFactoryManager extends AbstractPluginManager<ProcessFactory> {
public TableProcessFactoryManager() {
super("process-factories");
}
}
Loading
Loading