From 72eef46056f526baf50175e111da8c3d78e876d9 Mon Sep 17 00:00:00 2001 From: "majin.nathan" Date: Fri, 30 Jan 2026 17:25:23 +0800 Subject: [PATCH 1/2] [Refactor] Introduce Iceberg TableRuntimePlugin --- .../amoro/server/AmoroServiceContainer.java | 46 ++-- .../server/table/DefaultTableManager.java | 4 +- .../table/DefaultTableRuntimeFactory.java | 4 +- .../server/table/DefaultTableService.java | 219 +++++++----------- .../server/table/IcebergTablePlugin.java | 127 ++++++++++ .../server/table/TableRuntimePlugin.java | 47 ++++ .../amoro/server/table/TableService.java | 11 +- .../amoro/server/AMSServiceTestBase.java | 11 +- .../table/TestDefaultTableRuntimeHandler.java | 6 +- .../amoro/table/TableRuntimeFactory.java | 4 +- 10 files changed, 298 insertions(+), 181 deletions(-) create mode 100644 amoro-ams/src/main/java/org/apache/amoro/server/table/IcebergTablePlugin.java create mode 100644 amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimePlugin.java diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index 4fd3e5e9af..096a554370 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -54,9 +54,10 @@ import org.apache.amoro.server.scheduler.inline.InlineTableExecutors; import org.apache.amoro.server.table.DefaultTableManager; import org.apache.amoro.server.table.DefaultTableService; -import org.apache.amoro.server.table.RuntimeHandlerChain; +import org.apache.amoro.server.table.IcebergTablePlugin; import org.apache.amoro.server.table.TableManager; import org.apache.amoro.server.table.TableRuntimeFactoryManager; +import org.apache.amoro.server.table.TableRuntimePlugin; import org.apache.amoro.server.table.TableService; import org.apache.amoro.server.terminal.TerminalManager; import org.apache.amoro.server.utils.ThriftServiceProxy; @@ -198,7 +199,7 @@ public void waitFollowerShip() throws Exception { haContainer.waitFollowerShip(); } - public void startRestServices() throws Exception { + public void startRestServices() { EventsManager.getInstance(); MetricManager.getInstance(); @@ -239,33 +240,32 @@ public void startOptimizingService() throws Exception { new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService); processService = new ProcessService(serviceConfig, tableService); - - LOG.info("Setting up AMS table executors..."); - InlineTableExecutors.getInstance().setup(tableService, serviceConfig); - addHandlerChain(optimizingService.getTableRuntimeHandler()); - addHandlerChain(processService.getTableHandlerChain()); - addHandlerChain(InlineTableExecutors.getInstance().getDataExpiringExecutor()); - addHandlerChain(InlineTableExecutors.getInstance().getSnapshotsExpiringExecutor()); - addHandlerChain(InlineTableExecutors.getInstance().getOrphanFilesCleaningExecutor()); - addHandlerChain(InlineTableExecutors.getInstance().getDanglingDeleteFilesCleaningExecutor()); - addHandlerChain(InlineTableExecutors.getInstance().getOptimizingCommitExecutor()); - addHandlerChain(InlineTableExecutors.getInstance().getOptimizingExpiringExecutor()); - addHandlerChain(InlineTableExecutors.getInstance().getBlockerExpiringExecutor()); - addHandlerChain(InlineTableExecutors.getInstance().getHiveCommitSyncExecutor()); - addHandlerChain(InlineTableExecutors.getInstance().getTableRefreshingExecutor()); - addHandlerChain(InlineTableExecutors.getInstance().getTagsAutoCreatingExecutor()); - tableService.initialize(); + tableService.initialize(initTablePlugins()); LOG.info("AMS table service have been initialized"); - tableManager.setTableService(tableService); initThriftService(); startThriftService(); } - private void addHandlerChain(RuntimeHandlerChain chain) { - if (chain != null) { - tableService.addHandlerChain(chain); - } + private List initTablePlugins() { + LOG.info("Setting up AMS table executors..."); + InlineTableExecutors.getInstance().setup(tableService, serviceConfig); + IcebergTablePlugin icebergTablePlugin = + IcebergTablePlugin.builder() + .addHandler(optimizingService.getTableRuntimeHandler()) + .addHandler(processService.getTableHandlerChain()) + .addHandler(InlineTableExecutors.getInstance().getDataExpiringExecutor()) + .addHandler(InlineTableExecutors.getInstance().getSnapshotsExpiringExecutor()) + .addHandler(InlineTableExecutors.getInstance().getOrphanFilesCleaningExecutor()) + .addHandler(InlineTableExecutors.getInstance().getDanglingDeleteFilesCleaningExecutor()) + .addHandler(InlineTableExecutors.getInstance().getOptimizingCommitExecutor()) + .addHandler(InlineTableExecutors.getInstance().getOptimizingExpiringExecutor()) + .addHandler(InlineTableExecutors.getInstance().getBlockerExpiringExecutor()) + .addHandler(InlineTableExecutors.getInstance().getHiveCommitSyncExecutor()) + .addHandler(InlineTableExecutors.getInstance().getTableRefreshingExecutor()) + .addHandler(InlineTableExecutors.getInstance().getTagsAutoCreatingExecutor()) + .build(); + return List.of(icebergTablePlugin); } public void disposeOptimizingService() { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java index 2e0f24ff3d..fab5da615e 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java @@ -114,7 +114,7 @@ public void dropTableMetadata(TableIdentifier tableIdentifier, boolean deleteDat } ServerTableIdentifier serverTableIdentifier = internalCatalog.dropTable(database, table); - tableService().ifPresent(s -> s.onTableDropped(internalCatalog, serverTableIdentifier)); + tableService().ifPresent(s -> s.removeTable(serverTableIdentifier)); } @Override @@ -128,7 +128,7 @@ public void createTable(String catalogName, TableMetadata tableMetadata) { } TableMetadata metadata = catalog.createTable(tableMetadata); - tableService().ifPresent(s -> s.onTableCreated(catalog, metadata.getTableIdentifier())); + tableService().ifPresent(s -> s.addTable(metadata)); } @Override diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java index 86e748cc54..7b57ccc8ee 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java @@ -42,7 +42,7 @@ public String name() { } @Override - public Optional accept( + public Optional accept( ServerTableIdentifier tableIdentifier, Map tableProperties) { if (tableIdentifier .getFormat() @@ -52,7 +52,7 @@ public Optional accept( return Optional.empty(); } - private static class TableRuntimeCreatorImpl implements TableRuntimeFactory.TableRuntimeCreator { + private static class TableRuntimeCreatorImpl implements TableRuntimeFactory.Creator { @Override public List> requiredStateKeys() { return DefaultTableRuntime.REQUIRED_STATES; diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java index 464cd5601d..2c876a580e 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java @@ -74,8 +74,8 @@ public class DefaultTableService extends PersistentBase implements TableService public static final Logger LOG = LoggerFactory.getLogger(DefaultTableService.class); private final long externalCatalogRefreshingInterval; - private final Map tableRuntimeMap = new ConcurrentHashMap<>(); - + protected final Map tableRuntimeMap = new ConcurrentHashMap<>(); + protected final TableRuntimeFactoryManager tableRuntimeFactoryManager; private final ScheduledExecutorService tableExplorerScheduler = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder() @@ -85,8 +85,7 @@ public class DefaultTableService extends PersistentBase implements TableService private final CompletableFuture initialized = new CompletableFuture<>(); private final Configurations serverConfiguration; private final CatalogManager catalogManager; - private final TableRuntimeFactoryManager tableRuntimeFactoryManager; - private RuntimeHandlerChain headHandler; + private List tableRuntimePlugins; private ExecutorService tableExplorerExecutors; public DefaultTableService( @@ -101,59 +100,16 @@ public DefaultTableService( } @Override - public void onTableCreated(InternalCatalog catalog, ServerTableIdentifier identifier) { - triggerTableAdded(catalog, identifier); - } - - @Override - public void onTableDropped(InternalCatalog catalog, ServerTableIdentifier identifier) { - Optional.ofNullable(tableRuntimeMap.get(identifier.getId())) - .ifPresent( - tableRuntime -> { - try { - if (headHandler != null) { - headHandler.fireTableRemoved(tableRuntime); - } - tableRuntime.dispose(); - tableRuntimeMap.remove( - identifier.getId()); // remove only after successful operation - } catch (Exception e) { - LOG.error( - "Error occurred while removing tableRuntime of table {}", - identifier.getId(), - e); - } - }); - } - - @Override - public void addHandlerChain(RuntimeHandlerChain handler) { + public void initialize(List tableRuntimePlugins) { + this.tableRuntimePlugins = + tableRuntimePlugins == null ? new ArrayList<>() : tableRuntimePlugins; checkNotStarted(); - if (headHandler == null) { - headHandler = handler; - } else { - headHandler.appendNext(handler); - } - } - - @Override - public void handleTableChanged(TableRuntime tableRuntime, OptimizingStatus originalStatus) { - if (headHandler != null) { - headHandler.fireStatusChanged(tableRuntime, originalStatus); - } + initTableRuntimes(); + initTableRuntimePlugins(); + initTableExplorer(); } - @Override - public void handleTableChanged(TableRuntime tableRuntime, TableConfiguration originalConfig) { - if (headHandler != null) { - headHandler.fireConfigChanged(tableRuntime, originalConfig); - } - } - - @Override - public void initialize() { - checkNotStarted(); - + private void initTableRuntimes() { List tableRuntimeMetaList = getAs(TableRuntimeMapper.class, TableRuntimeMapper::selectAllRuntimes); Map identifierMap = @@ -171,8 +127,6 @@ public void initialize() { return a; })); - List tableRuntimes = new ArrayList<>(tableRuntimeMetaList.size()); - for (TableRuntimeMeta tableRuntimeMeta : tableRuntimeMetaList) { ServerTableIdentifier identifier = identifierMap.get(tableRuntimeMeta.getTableId()); if (identifier == null) { @@ -182,23 +136,46 @@ public void initialize() { continue; } List states = statesMap.get(tableRuntimeMeta.getTableId()); - Optional tableRuntime = - createTableRuntime(identifier, tableRuntimeMeta, states); - if (!tableRuntime.isPresent()) { - LOG.warn("No available table runtime factory found for table {}", identifier); - continue; - } - tableRuntime.ifPresent( - t -> { - t.registerMetric(MetricManager.getInstance().getGlobalRegistry()); - tableRuntimeMap.put(t.getTableIdentifier().getId(), t); - tableRuntimes.add(t); - }); + createTableRuntime(identifier, tableRuntimeMeta, states) + .ifPresentOrElse( + tableRuntime -> + tableRuntimeMap.put(tableRuntime.getTableIdentifier().getId(), tableRuntime), + () -> LOG.warn("No available table runtime factory found for table {}", identifier)); } + } - if (headHandler != null) { - headHandler.initialize(tableRuntimes); - } + private void initTableRuntimePlugins() { + List tableRuntimes = new ArrayList<>(tableRuntimeMap.values()); + tableRuntimePlugins.forEach(plugin -> plugin.initialize(tableRuntimes)); + } + + private Optional createTableRuntime( + ServerTableIdentifier identifier, + TableRuntimeMeta runtimeMeta, + List restoredStates) { + return tableRuntimeFactoryManager.installedPlugins().stream() + .map(f -> f.accept(identifier, runtimeMeta.getTableConfig())) + .filter(Optional::isPresent) + .map(Optional::get) + .findFirst() + .map( + creator -> { + DefaultTableRuntimeStore store = + new DefaultTableRuntimeStore( + identifier, runtimeMeta, creator.requiredStateKeys(), restoredStates); + TableRuntime tableRuntime = + tableRuntimePlugins.stream() + .filter(plugin -> plugin.accept(identifier)) + .findFirst() + .map(plugin -> plugin.createTableRuntime(creator, store)) + .orElse(creator.create(store)); + tableRuntime.registerMetric(MetricManager.getInstance().getGlobalRegistry()); + store.setTableRuntime(tableRuntime); + return tableRuntime; + }); + } + + private void initTableExplorer() { if (tableExplorerExecutors == null) { int threadCount = serverConfiguration.getInteger( @@ -234,12 +211,6 @@ public void setRuntime(TableRuntime tableRuntime) { tableRuntimeMap.put(tableRuntime.getTableIdentifier().getId(), tableRuntime); } - @Override - public boolean contains(Long tableId) { - checkStarted(); - return tableRuntimeMap.containsKey(tableId); - } - @Override public AmoroTable loadTable(ServerTableIdentifier identifier) { return catalogManager.loadTable(identifier.getIdentifier()); @@ -251,12 +222,20 @@ public void dispose() { if (tableExplorerExecutors != null) { tableExplorerExecutors.shutdown(); } - if (headHandler != null) { - headHandler.dispose(); - } + tableRuntimePlugins.forEach(TableRuntimePlugin::dispose); tableRuntimeMap.values().forEach(TableRuntime::unregisterMetric); } + @Override + public void addTable(TableMetadata tableMetadata) { + triggerTableAdded(tableMetadata.getTableIdentifier()); + } + + @Override + public void removeTable(ServerTableIdentifier tableIdentifier) { + disposeTable(tableIdentifier); + } + @VisibleForTesting void exploreTableRuntimes() { if (!initialized.isDone()) { @@ -412,7 +391,7 @@ private void exploreInternalCatalog(InternalCatalog internalCatalog) { i, internalCatalog.name())) .peek(i -> addedCount.incrementAndGet()) - .forEach(i -> triggerTableAdded(internalCatalog, i)); + .forEach(this::triggerTableAdded); Set tableIds = identifiers.stream().map(ServerTableIdentifier::getId).collect(Collectors.toSet()); @@ -437,7 +416,7 @@ private void exploreInternalCatalog(InternalCatalog internalCatalog) { tablesToBeDisposed.size()); } - private void checkStarted() { + protected void checkStarted() { try { initialized.get(); } catch (Exception e) { @@ -445,7 +424,7 @@ private void checkStarted() { } } - private void checkNotStarted() { + protected void checkNotStarted() { if (initialized.isDone()) { throw new IllegalStateException("Table service has started."); } @@ -464,21 +443,23 @@ private void syncTable(ExternalCatalog externalCatalog, TableIdentity tableIdent ServerTableIdentifier tableIdentifier = externalCatalog.getServerTableIdentifier( tableIdentity.getDatabase(), tableIdentity.getTableName()); - tableRuntimeAdded.set(triggerTableAdded(externalCatalog, tableIdentifier)); + tableRuntimeAdded.set(triggerTableAdded(tableIdentifier)); }); } catch (Throwable t) { if (tableRuntimeAdded.get()) { - revertTableRuntimeAdded(externalCatalog, tableIdentity); + ServerTableIdentifier tableIdentifier = + externalCatalog.getServerTableIdentifier( + tableIdentity.getDatabase(), tableIdentity.getTableName()); + if (tableIdentifier != null) { + tableRuntimeMap.remove(tableIdentifier.getId()); + } } throw t; } } - private boolean triggerTableAdded( - ServerCatalog catalog, ServerTableIdentifier serverTableIdentifier) { - AmoroTable table = - catalog.loadTable( - serverTableIdentifier.getDatabase(), serverTableIdentifier.getTableName()); + protected boolean triggerTableAdded(ServerTableIdentifier serverTableIdentifier) { + AmoroTable table = loadTable(serverTableIdentifier); if (TableFormat.ICEBERG.equals(table.format())) { if (TablePropertyUtil.isMixedTableStore(table.properties())) { return false; @@ -497,49 +478,16 @@ private boolean triggerTableAdded( Optional tableRuntimeOpt = createTableRuntime(serverTableIdentifier, meta, Collections.emptyList()); - if (!tableRuntimeOpt.isPresent()) { + if (tableRuntimeOpt.isPresent()) { + TableRuntime tableRuntime = tableRuntimeOpt.get(); + tableRuntimeMap.put(serverTableIdentifier.getId(), tableRuntime); + tableRuntime.registerMetric(MetricManager.getInstance().getGlobalRegistry()); + tableRuntimePlugins.forEach(plugin -> plugin.onTableCreated(table, tableRuntime)); + return true; + } else { LOG.warn("No available table runtime factory found for table {}", serverTableIdentifier); return false; } - - TableRuntime tableRuntime = tableRuntimeOpt.get(); - tableRuntimeMap.put(serverTableIdentifier.getId(), tableRuntime); - tableRuntime.registerMetric(MetricManager.getInstance().getGlobalRegistry()); - if (headHandler != null) { - headHandler.fireTableAdded(table, tableRuntime); - } - return true; - } - - private Optional createTableRuntime( - ServerTableIdentifier identifier, - TableRuntimeMeta runtimeMeta, - List restoredStates) { - return tableRuntimeFactoryManager.installedPlugins().stream() - .map(f -> f.accept(identifier, runtimeMeta.getTableConfig())) - .filter(Optional::isPresent) - .map(Optional::get) - .findFirst() - .map( - creator -> { - DefaultTableRuntimeStore store = - new DefaultTableRuntimeStore( - identifier, runtimeMeta, creator.requiredStateKeys(), restoredStates); - store.setRuntimeHandler(this); - TableRuntime tableRuntime = creator.create(store); - store.setTableRuntime(tableRuntime); - return tableRuntime; - }); - } - - private void revertTableRuntimeAdded( - ExternalCatalog externalCatalog, TableIdentity tableIdentity) { - ServerTableIdentifier tableIdentifier = - externalCatalog.getServerTableIdentifier( - tableIdentity.getDatabase(), tableIdentity.getTableName()); - if (tableIdentifier != null) { - tableRuntimeMap.remove(tableIdentifier.getId()); - } } @VisibleForTesting @@ -552,9 +500,8 @@ public void disposeTable(ServerTableIdentifier tableIdentifier) { .ifPresent( tableRuntime -> { try { - if (headHandler != null) { - headHandler.fireTableRemoved(tableRuntime); - } + tableRuntimePlugins.forEach( + plugin -> plugin.onTableDropped(tableRuntime)); tableRuntime.dispose(); tableRuntimeMap.remove( tableIdentifier.getId()); // remove only after successful operation @@ -596,12 +543,6 @@ protected TableIdentity(ServerTableIdentifier serverTableIdentifier) { this.format = serverTableIdentifier.getFormat(); } - protected TableIdentity(String database, String tableName, TableFormat format) { - this.database = database; - this.tableName = tableName; - this.format = format; - } - public String getDatabase() { return database; } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/IcebergTablePlugin.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/IcebergTablePlugin.java new file mode 100644 index 0000000000..8a8597752c --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/IcebergTablePlugin.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.table; + +import org.apache.amoro.AmoroTable; +import org.apache.amoro.ServerTableIdentifier; +import org.apache.amoro.TableFormat; +import org.apache.amoro.TableRuntime; +import org.apache.amoro.config.TableConfiguration; +import org.apache.amoro.server.optimizing.OptimizingStatus; +import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; +import org.apache.amoro.table.TableRuntimeFactory; +import org.apache.amoro.table.TableRuntimeStore; + +import javax.annotation.Nullable; + +import java.util.List; + +public class IcebergTablePlugin implements TableRuntimePlugin, TableRuntimeHandler { + + private final RuntimeHandlerChain headHandler; + + private IcebergTablePlugin(RuntimeHandlerChain headHandler) { + Preconditions.checkNotNull(headHandler); + this.headHandler = headHandler; + } + + @Override + public boolean accept(ServerTableIdentifier tableIdentifier) { + return tableIdentifier.getFormat() == TableFormat.ICEBERG + || tableIdentifier.getFormat() == TableFormat.MIXED_HIVE + || tableIdentifier.getFormat() == TableFormat.MIXED_ICEBERG; + } + + @Override + public TableRuntime createTableRuntime( + TableRuntimeFactory.Creator creator, TableRuntimeStore store) { + if (!(store instanceof DefaultTableRuntimeStore)) { + throw new IllegalStateException("Only support DefaultTableRuntimeStore"); + } + DefaultTableRuntimeStore icebergRuntimeStore = (DefaultTableRuntimeStore) store; + icebergRuntimeStore.setRuntimeHandler(this); + return creator.create(store); + } + + @Override + public void initialize(List tableRuntimes) { + if (headHandler != null) { + headHandler.initialize(tableRuntimes); + } + } + + @Override + public void onTableCreated(@Nullable AmoroTable amoroTable, TableRuntime tableRuntime) { + if (headHandler != null) { + headHandler.fireTableAdded(amoroTable, tableRuntime); + } + } + + @Override + public void onTableDropped(TableRuntime tableRuntime) { + if (headHandler != null) { + headHandler.fireTableRemoved(tableRuntime); + } + } + + @Override + public void dispose() { + if (headHandler != null) { + headHandler.dispose(); + } + } + + public void handleTableChanged(TableRuntime tableRuntime, OptimizingStatus originalStatus) { + if (headHandler != null) { + headHandler.fireStatusChanged(tableRuntime, originalStatus); + } + } + + public void handleTableChanged(TableRuntime tableRuntime, TableConfiguration originalConfig) { + if (headHandler != null) { + headHandler.fireConfigChanged(tableRuntime, originalConfig); + } + } + + public static IcebergTablePluginBuilder builder() { + return new IcebergTablePluginBuilder(); + } + + public static class IcebergTablePluginBuilder { + private RuntimeHandlerChain headHandler; + + private IcebergTablePluginBuilder() {} + + public IcebergTablePluginBuilder addHandler(RuntimeHandlerChain handler) { + if (handler == null) { + return this; + } + if (headHandler == null) { + headHandler = handler; + } else { + headHandler.appendNext(handler); + } + return this; + } + + public IcebergTablePlugin build() { + return new IcebergTablePlugin(headHandler); + } + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimePlugin.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimePlugin.java new file mode 100644 index 0000000000..82144c8cc6 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimePlugin.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.table; + +import org.apache.amoro.AmoroTable; +import org.apache.amoro.ServerTableIdentifier; +import org.apache.amoro.TableRuntime; +import org.apache.amoro.table.TableRuntimeFactory; +import org.apache.amoro.table.TableRuntimeStore; + +import javax.annotation.Nullable; + +import java.util.List; + +public interface TableRuntimePlugin { + + boolean accept(ServerTableIdentifier tableIdentifier); + + void initialize(List tableRuntimes); + + void onTableCreated(@Nullable AmoroTable amoroTable, TableRuntime tableRuntime); + + void onTableDropped(TableRuntime tableRuntime); + + void dispose(); + + default TableRuntime createTableRuntime( + TableRuntimeFactory.Creator creator, TableRuntimeStore store) { + return creator.create(store); + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java index 152f951640..f89e9104db 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java @@ -21,19 +21,18 @@ import org.apache.amoro.AmoroTable; import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableRuntime; -import org.apache.amoro.server.catalog.InternalCatalog; -public interface TableService extends TableRuntimeHandler { +import java.util.List; - void addHandlerChain(RuntimeHandlerChain handler); +public interface TableService { - void initialize(); + void initialize(List tableRuntimePlugins); void dispose(); - void onTableCreated(InternalCatalog catalog, ServerTableIdentifier identifier); + void addTable(TableMetadata tableMetadata); - void onTableDropped(InternalCatalog catalog, ServerTableIdentifier identifier); + void removeTable(ServerTableIdentifier identifier); TableRuntime getRuntime(Long tableId); diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java index 9f3d25b3f7..714d88a2ef 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java @@ -26,6 +26,7 @@ import org.apache.amoro.server.table.DefaultTableRuntime; import org.apache.amoro.server.table.DefaultTableRuntimeFactory; import org.apache.amoro.server.table.DefaultTableService; +import org.apache.amoro.server.table.IcebergTablePlugin; import org.apache.amoro.server.table.TableRuntimeFactoryManager; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.junit.AfterClass; @@ -34,6 +35,7 @@ import org.mockito.Mockito; import java.time.Duration; +import java.util.List; public abstract class AMSServiceTestBase extends AMSManagerTestBase { private static DefaultTableService TABLE_SERVICE = null; @@ -61,9 +63,12 @@ public static void initTableService() { configurations, CATALOG_MANAGER, OPTIMIZER_MANAGER, TABLE_SERVICE); PROCESS_SERVICE = new ProcessService(configurations, TABLE_SERVICE); - TABLE_SERVICE.addHandlerChain(OPTIMIZING_SERVICE.getTableRuntimeHandler()); - TABLE_SERVICE.addHandlerChain(PROCESS_SERVICE.getTableHandlerChain()); - TABLE_SERVICE.initialize(); + TABLE_SERVICE.initialize( + List.of( + IcebergTablePlugin.builder() + .addHandler(OPTIMIZING_SERVICE.getTableRuntimeHandler()) + .addHandler(PROCESS_SERVICE.getTableHandlerChain()) + .build())); try { ResourceGroup group = defaultResourceGroup(); OPTIMIZER_MANAGER.createResourceGroup(group); diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java index 8b6b9c3158..bd1b727950 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java @@ -77,8 +77,7 @@ public void testInitialize() throws Exception { tableService = new DefaultTableService(new Configurations(), CATALOG_MANAGER, runtimeFactoryManager); TestHandler handler = new TestHandler(); - tableService.addHandlerChain(handler); - tableService.initialize(); + tableService.initialize(List.of(IcebergTablePlugin.builder().addHandler(handler).build())); if (!(catalogTestHelper().tableFormat().equals(TableFormat.MIXED_HIVE) && TEST_HMS.getHiveClient().getDatabase(TableTestHelper.TEST_DB_NAME) != null)) { createDatabase(); @@ -97,8 +96,7 @@ public void testInitialize() throws Exception { tableService = new DefaultTableService(new Configurations(), CATALOG_MANAGER, runtimeFactoryManager); handler = new TestHandler(); - tableService.addHandlerChain(handler); - tableService.initialize(); + tableService.initialize(List.of(IcebergTablePlugin.builder().addHandler(handler).build())); Assert.assertEquals(1, handler.getInitTables().size()); Assert.assertEquals( (Long) createTableId.getId().longValue(), diff --git a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java b/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java index 7654af7104..a15e28d3eb 100644 --- a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java +++ b/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java @@ -29,10 +29,10 @@ /** Table runtime factory. */ public interface TableRuntimeFactory extends ActivePlugin { - Optional accept( + Optional accept( ServerTableIdentifier tableIdentifier, Map tableProperties); - interface TableRuntimeCreator { + interface Creator { List> requiredStateKeys(); TableRuntime create(TableRuntimeStore store); From f3606a6c2556f58740071dea2091adabf6f0a383 Mon Sep 17 00:00:00 2001 From: "majin.nathan" Date: Sat, 7 Feb 2026 20:59:44 +0800 Subject: [PATCH 2/2] fix ci --- .../main/java/org/apache/amoro/server/AmoroServiceContainer.java | 1 + 1 file changed, 1 insertion(+) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index 096a554370..6f786ada0f 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -241,6 +241,7 @@ public void startOptimizingService() throws Exception { processService = new ProcessService(serviceConfig, tableService); tableService.initialize(initTablePlugins()); + tableManager.setTableService(tableService); LOG.info("AMS table service have been initialized"); initThriftService();