diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java index 19627f86f0..cf45e67b5e 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java @@ -671,10 +671,17 @@ public Pair, Integer> getOptimizingProcessesInfo( int pageNumber = (offset / limit) + 1; List processMetaList = Collections.emptyList(); try (Page ignored = PageHelper.startPage(pageNumber, limit, true)) { + org.apache.amoro.Action action = null; + if (type != null && !type.isEmpty()) { + action = + org.apache.amoro.server.persistence.converter.Action2StringConverter.getActionByName( + type); + } + final org.apache.amoro.Action finalAction = action; processMetaList = getAs( TableProcessMapper.class, - mapper -> mapper.listProcessMeta(identifier.getId(), type, status)); + mapper -> mapper.listProcessMeta(identifier.getId(), finalAction, status)); PageInfo pageInfo = new PageInfo<>(processMetaList); total = (int) pageInfo.getTotal(); LOG.info( diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java index ec207705c7..844d0e0f6e 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java @@ -490,7 +490,7 @@ public TableOptimizingProcess( OptimizingProcessState processState) { this.tableRuntime = tableRuntime; processId = tableRuntime.getProcessId(); - optimizingType = OptimizingType.valueOf(processMeta.getProcessType()); + optimizingType = getOptimizingTypeFromAction(processMeta.getAction()); targetSnapshotId = processState.getTargetSnapshotId(); targetChangeSnapshotId = processState.getTargetChangeSnapshotId(); planTime = processMeta.getCreateTime(); @@ -528,6 +528,53 @@ public OptimizingType getOptimizingType() { return optimizingType; } + /** + * Convert OptimizingType to corresponding Action. + * + * @param optimizingType optimizing type + * @return corresponding Action + */ + private org.apache.amoro.Action getOptimizingAction(OptimizingType optimizingType) { + switch (optimizingType) { + case MINOR: + return org.apache.amoro.IcebergActions.OPTIMIZING_MINOR; + case MAJOR: + return org.apache.amoro.IcebergActions.OPTIMIZING_MAJOR; + case FULL: + return org.apache.amoro.IcebergActions.OPTIMIZING_FULL; + default: + throw new IllegalArgumentException("Unknown optimizing type: " + optimizingType); + } + } + + /** + * Convert Action to corresponding OptimizingType. + * + * @param action action + * @return corresponding OptimizingType + */ + private OptimizingType getOptimizingTypeFromAction(org.apache.amoro.Action action) { + if (action == null) { + throw new IllegalArgumentException("Action cannot be null"); + } + String actionName = action.getName(); + if (org.apache.amoro.IcebergActions.OPTIMIZING_MINOR.getName().equals(actionName)) { + return OptimizingType.MINOR; + } else if (org.apache.amoro.IcebergActions.OPTIMIZING_MAJOR.getName().equals(actionName)) { + return OptimizingType.MAJOR; + } else if (org.apache.amoro.IcebergActions.OPTIMIZING_FULL.getName().equals(actionName)) { + return OptimizingType.FULL; + } else { + // Fallback to old behavior for backward compatibility + try { + return OptimizingType.valueOf(actionName.toUpperCase()); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException( + "Cannot convert action " + actionName + " to OptimizingType", e); + } + } + } + @Override public ProcessStatus getStatus() { return status; @@ -838,7 +885,7 @@ private void beginAndPersistProcess() { processId, "", status, - optimizingType.name().toUpperCase(), + getOptimizingAction(optimizingType), tableRuntime.getOptimizingStatus().name().toLowerCase(), "AMORO", 0, diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/Action2StringConverter.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/Action2StringConverter.java new file mode 100644 index 0000000000..5ac935825a --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/Action2StringConverter.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.persistence.converter; + +import org.apache.amoro.Action; +import org.apache.amoro.IcebergActions; +import org.apache.amoro.PaimonActions; +import org.apache.amoro.TableFormat; +import org.apache.ibatis.type.JdbcType; +import org.apache.ibatis.type.MappedJdbcTypes; +import org.apache.ibatis.type.MappedTypes; +import org.apache.ibatis.type.TypeHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.CallableStatement; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * MyBatis TypeHandler for converting Action to/from String in database. This converter maintains a + * registry of known actions and can dynamically create temporary actions for unknown names to + * support backward compatibility. + */ +@MappedTypes(Action.class) +@MappedJdbcTypes(JdbcType.VARCHAR) +public class Action2StringConverter implements TypeHandler { + + private static final Logger LOG = LoggerFactory.getLogger(Action2StringConverter.class); + + /** Registry of all registered actions, keyed by action name. */ + private static final Map ACTION_REGISTRY = new ConcurrentHashMap<>(); + + /** Default formats for dynamically created actions. */ + private static final TableFormat[] DEFAULT_FORMATS = + new TableFormat[] { + TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE, TableFormat.PAIMON + }; + + /** Static initialization block to register all built-in Iceberg and Paimon actions. */ + static { + // Register Iceberg actions + registerAction(IcebergActions.SYSTEM); + registerAction(IcebergActions.REWRITE); + registerAction(IcebergActions.DELETE_ORPHANS); + registerAction(IcebergActions.SYNC_HIVE); + registerAction(IcebergActions.EXPIRE_DATA); + registerAction(IcebergActions.OPTIMIZING_MINOR); + registerAction(IcebergActions.OPTIMIZING_MAJOR); + registerAction(IcebergActions.OPTIMIZING_FULL); + + // Register Paimon actions + registerAction(PaimonActions.COMPACT); + registerAction(PaimonActions.FULL_COMPACT); + registerAction(PaimonActions.CLEAN_METADATA); + registerAction(PaimonActions.DELETE_SNAPSHOTS); + } + + /** + * Register an action in the registry. + * + * @param action the action to register + */ + public static void registerAction(Action action) { + if (action != null && action.getName() != null) { + ACTION_REGISTRY.put(action.getName(), action); + } + } + + /** + * Register a custom action. This is a convenience method that delegates to {@link + * #registerAction(Action)}. + * + * @param action the custom action to register + */ + public static void registerCustomAction(Action action) { + registerAction(action); + } + + /** + * Get an action by its name from the registry. + * + * @param name the action name to look up + * @return the registered action, or null if not found and name is null/empty + */ + public static Action getActionByName(String name) { + if (name == null || name.isEmpty()) { + return null; + } + return ACTION_REGISTRY.get(name); + } + + /** + * Get all registered actions. + * + * @return array of all registered actions + */ + public static Action[] getRegisteredActions() { + return ACTION_REGISTRY.values().toArray(new Action[0]); + } + + @Override + public void setParameter(PreparedStatement ps, int i, Action parameter, JdbcType jdbcType) + throws SQLException { + if (parameter == null) { + ps.setString(i, ""); + } else { + ps.setString(i, parameter.getName()); + } + } + + @Override + public Action getResult(ResultSet rs, String columnName) throws SQLException { + String actionName = rs.getString(columnName); + return convertToAction(actionName); + } + + @Override + public Action getResult(ResultSet rs, int columnIndex) throws SQLException { + String actionName = rs.getString(columnIndex); + return convertToAction(actionName); + } + + @Override + public Action getResult(CallableStatement cs, int columnIndex) throws SQLException { + String actionName = cs.getString(columnIndex); + return convertToAction(actionName); + } + + /** + * Convert a string action name to an Action object. First attempts to find the action in the + * registry. If not found, creates a temporary action with the given name for backward + * compatibility. + * + * @param actionName the action name to convert + * @return the corresponding Action object, or null if actionName is null/empty + */ + private Action convertToAction(String actionName) { + if (actionName == null || actionName.isEmpty()) { + return null; + } + + Action action = ACTION_REGISTRY.get(actionName); + if (action != null) { + return action; + } + + LOG.warn( + "Unknown action name '{}', creating temporary action for backward compatibility", + actionName); + Action tempAction = new Action(DEFAULT_FORMATS, 0, actionName); + registerAction(tempAction); + return tempAction; + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/ProcessStateMapper.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/ProcessStateMapper.java index f7b7557819..e15f7f192e 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/ProcessStateMapper.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/ProcessStateMapper.java @@ -30,8 +30,21 @@ import java.util.Map; +/** + * Mapper for table_process_state table. + * + * @deprecated This mapper is deprecated as of AMORO-3951. Use {@link TableProcessMapper} instead. + * The table_process_state table has been merged into table_process. + */ +@Deprecated public interface ProcessStateMapper { + /** + * Create a new process state. + * + * @deprecated Use {@link TableProcessMapper#insertProcess} instead. + */ + @Deprecated @Insert( "INSERT INTO table_process_state " + "(process_id, action, table_id, retry_num, status, start_time, end_time, fail_reason, summary) " @@ -40,24 +53,48 @@ public interface ProcessStateMapper { @Options(useGeneratedKeys = true, keyProperty = "id") void createProcessState(TableProcessState state); + /** + * Update process state to running. + * + * @deprecated Use {@link TableProcessMapper#updateProcess} instead. + */ + @Deprecated @Update( "UPDATE table_process_state " + "SET status = #{status}, start_time = #{startTime} " + "WHERE process_id = #{id} and retry_num = #{retryNumber}") void updateProcessRunning(TableProcessState state); + /** + * Update process state to completed. + * + * @deprecated Use {@link TableProcessMapper#updateProcess} instead. + */ + @Deprecated @Update( "UPDATE table_process_state " + "SET status = #{status}, end_time = #{endTime} " + "WHERE process_id = #{id} and retry_num = #{retryNumber}") void updateProcessCompleted(TableProcessState state); + /** + * Update process state to failed. + * + * @deprecated Use {@link TableProcessMapper#updateProcess} instead. + */ + @Deprecated @Update( "UPDATE table_process_state " + "SET status = #{status}, end_time = #{endTime}, fail_reason = #{failedReason} " + "WHERE process_id = #{id} and retry_num = #{retryNumber}") void updateProcessFailed(TableProcessState state); + /** + * Query TableProcessState by process_id. + * + * @deprecated Use {@link TableProcessMapper#getProcessMeta} instead. + */ + @Deprecated @Select( "SELECT process_id, action, table_id, retry_num, status, start_time, end_time, fail_reason, summary " + "FROM table_process_state " @@ -77,7 +114,12 @@ public interface ProcessStateMapper { }) TableProcessState getProcessStateById(@Param("processId") long processId); - /** Query TableProcessState by table_id */ + /** + * Query TableProcessState by table_id. + * + * @deprecated Use {@link TableProcessMapper#listProcessMeta} instead. + */ + @Deprecated @Select( "SELECT process_id, action, table_id, retry_num, status, start_time, end_time, fail_reason, summary " + "FROM table_process_state " diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableProcessMapper.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableProcessMapper.java index ed61924522..bdf19d638d 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableProcessMapper.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableProcessMapper.java @@ -18,7 +18,9 @@ package org.apache.amoro.server.persistence.mapper; +import org.apache.amoro.Action; import org.apache.amoro.process.ProcessStatus; +import org.apache.amoro.server.persistence.converter.Action2StringConverter; import org.apache.amoro.server.persistence.converter.Long2TsConverter; import org.apache.amoro.server.persistence.converter.Map2StringConverter; import org.apache.amoro.server.persistence.extension.InListExtendedLanguageDriver; @@ -46,7 +48,8 @@ public interface TableProcessMapper { "INSERT INTO table_process " + "(process_id, table_id, external_process_identifier, status, process_type, process_stage, execution_engine, retry_number, " + "create_time, process_parameters, summary) " - + "VALUES (#{processId}, #{tableId}, #{externalProcessIdentifier}, #{status}, #{processType}, #{processStage}, " + + "VALUES (#{processId}, #{tableId}, #{externalProcessIdentifier}, #{status}, " + + "#{action, typeHandler=org.apache.amoro.server.persistence.converter.Action2StringConverter}, #{processStage}, " + "#{executionEngine}, #{retryNumber}, " + "#{createTime, typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}, " + "#{processParameters, typeHandler=org.apache.amoro.server.persistence.converter.Map2StringConverter}, " @@ -56,7 +59,7 @@ void insertProcess( @Param("processId") long processId, @Param("externalProcessIdentifier") String externalProcessIdentifier, @Param("status") ProcessStatus status, - @Param("processType") String processType, + @Param("action") Action action, @Param("processStage") String processStage, @Param("executionEngine") String executionEngine, @Param("retryNumber") int retryNumber, @@ -97,7 +100,10 @@ void updateProcess( @Result(column = "table_id", property = "tableId"), @Result(column = "external_process_identifier", property = "externalProcessIdentifier"), @Result(column = "status", property = "status"), - @Result(column = "process_type", property = "processType"), + @Result( + column = "process_type", + property = "action", + typeHandler = Action2StringConverter.class), @Result(column = "process_stage", property = "processStage"), @Result(column = "execution_engine", property = "executionEngine"), @Result(column = "retry_number", property = "retryNumber"), @@ -123,14 +129,14 @@ void updateProcess( + "SELECT process_id, table_id, external_process_identifier, status, process_type, process_stage, execution_engine, retry_number, " + "create_time, finish_time, fail_message, process_parameters, summary " + "FROM table_process WHERE table_id = #{tableId} " - + " AND process_type = #{processType}" + + " AND process_type = #{action.name}" + " AND status = #{status}" + " ORDER BY process_id desc" + "") @ResultMap("tableProcessMap") List listProcessMeta( @Param("tableId") long tableId, - @Param("processType") String processType, + @Param("action") Action action, @Param("status") ProcessStatus optimizingStatus); @Select( diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java index ee9f136dc6..4f5041de52 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java @@ -192,7 +192,9 @@ public void recoverProcesses(List tableRuntimes) { processMeta -> { TableRuntime tableRuntime = tableIdToRuntimes.get(processMeta.getTableId()); ActionCoordinatorScheduler scheduler = - actionCoordinators.get(processMeta.getProcessType()); + processMeta.getAction() != null + ? actionCoordinators.get(processMeta.getAction().getName()) + : null; if (tableRuntime != null && scheduler != null) { scheduler.recover( tableRuntime, @@ -201,7 +203,7 @@ public void recoverProcesses(List tableRuntimes) { tableRuntime, processMeta, scheduler.getAction(), - scheduler.PROCESS_MAX_RETRY_NUMBER)); + ActionCoordinatorScheduler.PROCESS_MAX_RETRY_NUMBER)); } }); } @@ -231,7 +233,8 @@ private void executeOrTraceProcess(TableProcess process) { actionCoordinators.get(process.store().getAction().getName()); if (scheduler != null && process.getStatus() == ProcessStatus.FAILED - && process.store().getRetryNumber() < scheduler.PROCESS_MAX_RETRY_NUMBER + && process.store().getRetryNumber() + < ActionCoordinatorScheduler.PROCESS_MAX_RETRY_NUMBER && process.getTableRuntime() != null) { process .store() @@ -343,7 +346,7 @@ public TableProcessMeta persistTableProcess(TableProcess process) { processMeta.getProcessId(), processMeta.getExternalProcessIdentifier(), processMeta.getStatus(), - processMeta.getProcessType(), + processMeta.getAction(), processMeta.getProcessStage(), processMeta.getExecutionEngine(), processMeta.getRetryNumber(), diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java index 311b60e035..6ea3976f08 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java @@ -18,6 +18,7 @@ package org.apache.amoro.server.process; +import org.apache.amoro.Action; import org.apache.amoro.process.ProcessStatus; import org.apache.amoro.process.TableProcessState; import org.apache.amoro.process.TableProcessStore; @@ -30,7 +31,7 @@ public class TableProcessMeta { private long tableId; private volatile String externalProcessIdentifier; private ProcessStatus status; - private String processType; + private Action action; private String processStage; private String executionEngine; private int retryNumber; @@ -64,12 +65,57 @@ public void setStatus(ProcessStatus status) { this.status = status; } + /** + * Get the action of this process. + * + * @return action + */ + public Action getAction() { + return action; + } + + /** + * Set the action of this process. + * + * @param action action + */ + public void setAction(Action action) { + this.action = action; + } + + /** + * Get process type (action name) for backward compatibility. + * + * @return process type name + * @deprecated Use {@link #getAction()} instead + */ + @Deprecated public String getProcessType() { - return processType; + return action != null ? action.getName() : null; } + /** + * Set process type (action name) for backward compatibility. + * + * @param processType process type name + * @deprecated Use {@link #setAction(Action)} instead + */ + @Deprecated public void setProcessType(String processType) { - this.processType = processType; + // This method is kept for backward compatibility but should not be used + // Action should be set directly via setAction() + if (processType != null && action == null) { + // Try to find action by name from registry + org.apache.amoro.server.persistence.converter.Action2StringConverter.registerCustomAction( + new Action( + new org.apache.amoro.TableFormat[] { + org.apache.amoro.TableFormat.ICEBERG, + org.apache.amoro.TableFormat.MIXED_ICEBERG, + org.apache.amoro.TableFormat.MIXED_HIVE + }, + 0, + processType)); + } } public String getProcessStage() { @@ -154,7 +200,7 @@ public TableProcessMeta copy() { meta.setFinishTime(this.finishTime); meta.setExternalProcessIdentifier(this.externalProcessIdentifier); - meta.setProcessType(this.processType); + meta.setAction(this.action); meta.setProcessStage(this.processStage); meta.setExecutionEngine(this.executionEngine); meta.setFailMessage(this.failMessage); @@ -177,7 +223,7 @@ public static TableProcessMeta fromTableProcessStore(TableProcessStore tableProc tableProcessMeta.setTableId(tableProcessStore.getTableId()); tableProcessMeta.setExternalProcessIdentifier(tableProcessStore.getExternalProcessIdentifier()); tableProcessMeta.setStatus(tableProcessStore.getStatus()); - tableProcessMeta.setProcessType(tableProcessStore.getProcessType()); + tableProcessMeta.setAction(tableProcessStore.getAction()); tableProcessMeta.setProcessStage(tableProcessStore.getProcessStage()); tableProcessMeta.setExecutionEngine(tableProcessStore.getExecutionEngine()); tableProcessMeta.setRetryNumber(tableProcessStore.getRetryNumber()); @@ -196,7 +242,7 @@ public static TableProcessMeta fromTableProcessState(TableProcessState tableProc tableProcessMeta.setTableId(tableProcessState.getTableIdentifier().getId()); tableProcessMeta.setExternalProcessIdentifier(tableProcessState.getExternalProcessIdentifier()); tableProcessMeta.setStatus(tableProcessState.getStatus()); - tableProcessMeta.setProcessType(tableProcessState.getAction().getName()); + tableProcessMeta.setAction(tableProcessState.getAction()); tableProcessMeta.setProcessStage(tableProcessState.getStage().getDesc()); tableProcessMeta.setExecutionEngine(tableProcessState.getExecutionEngine()); tableProcessMeta.setRetryNumber(tableProcessState.getRetryNumber()); @@ -208,10 +254,20 @@ public static TableProcessMeta fromTableProcessState(TableProcessState tableProc return tableProcessMeta; } + /** + * Create a TableProcessMeta with Action. + * + * @param processId process id + * @param tableId table id + * @param action action + * @param executionEngine execution engine + * @param processParameters process parameters + * @return TableProcessMeta instance + */ public static TableProcessMeta of( long processId, long tableId, - String actionName, + Action action, String executionEngine, Map processParameters) { TableProcessMeta tableProcessMeta = new TableProcessMeta(); @@ -219,7 +275,7 @@ public static TableProcessMeta of( tableProcessMeta.setTableId(tableId); tableProcessMeta.setExternalProcessIdentifier(""); tableProcessMeta.setStatus(ProcessStatus.UNKNOWN); - tableProcessMeta.setProcessType(actionName); + tableProcessMeta.setAction(action); tableProcessMeta.setProcessStage(ProcessStatus.UNKNOWN.name()); tableProcessMeta.setExecutionEngine(executionEngine); tableProcessMeta.setRetryNumber(0); @@ -230,4 +286,41 @@ public static TableProcessMeta of( tableProcessMeta.setSummary(new HashMap<>()); return tableProcessMeta; } + + /** + * Create a TableProcessMeta with action name (for backward compatibility). + * + * @param processId process id + * @param tableId table id + * @param actionName action name + * @param executionEngine execution engine + * @param processParameters process parameters + * @return TableProcessMeta instance + * @deprecated Use {@link #of(long, long, Action, String, Map)} instead + */ + @Deprecated + public static TableProcessMeta of( + long processId, + long tableId, + String actionName, + String executionEngine, + Map processParameters) { + // Try to find action from registry + Action action = + org.apache.amoro.server.persistence.converter.Action2StringConverter.getActionByName( + actionName); + if (action == null) { + // Create a temporary action if not found + action = + new Action( + new org.apache.amoro.TableFormat[] { + org.apache.amoro.TableFormat.ICEBERG, + org.apache.amoro.TableFormat.MIXED_ICEBERG, + org.apache.amoro.TableFormat.MIXED_HIVE + }, + 0, + actionName); + } + return of(processId, tableId, action, executionEngine, processParameters); + } } diff --git a/amoro-ams/src/main/resources/db/migration/V0.9.0__merge_table_process.sql b/amoro-ams/src/main/resources/db/migration/V0.9.0__merge_table_process.sql new file mode 100644 index 0000000000..34ef224cda --- /dev/null +++ b/amoro-ams/src/main/resources/db/migration/V0.9.0__merge_table_process.sql @@ -0,0 +1,64 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +-- ============================================================================ +-- Amoro Schema Upgrade: Merge table_process and table_process_state +-- JIRA: AMORO-3951 +-- +-- This script migrates data from table_process_state to table_process and +-- deprecates the table_process_state table. +-- ============================================================================ + +-- Step 1: Migrate data from table_process_state to table_process +-- This step copies all records from table_process_state that don't already +-- exist in table_process (based on process_id) +INSERT INTO table_process ( + process_id, + table_id, + external_process_identifier, + status, + process_type, + process_stage, + execution_engine, + retry_number, + create_time, + finish_time, + fail_message, + process_parameters, + summary +) +SELECT + tps.process_id, + tps.table_id, + '' as external_process_identifier, + tps.status, + tps.action as process_type, + tps.status as process_stage, + 'AMORO' as execution_engine, + COALESCE(tps.retry_num, 0) as retry_number, + tps.start_time as create_time, + tps.end_time as finish_time, + tps.fail_reason as fail_message, + '' as process_parameters, + tps.summary +FROM table_process_state tps +WHERE NOT EXISTS ( + SELECT 1 FROM table_process tp + WHERE tp.process_id = tps.process_id +); + +-- Step 2: Rename old table as backup +-- This preserves the old table for rollback purposes if needed +ALTER TABLE table_process_state RENAME TO table_process_state_backup; diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java index 265a5700da..335c88f57d 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java @@ -21,7 +21,9 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import org.apache.amoro.Action; import org.apache.amoro.AmoroTable; +import org.apache.amoro.IcebergActions; import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableFormat; import org.apache.amoro.formats.AmoroCatalogTestHelper; @@ -387,6 +389,7 @@ public void insertOptimizingProcess( MetricsSummary summary, Map fromSequence, Map toSequence) { + Action action = getOptimizingAction(type); doAs( TableProcessMapper.class, mapper -> @@ -395,7 +398,7 @@ public void insertOptimizingProcess( processId, "", status, - type.name(), + action, type.name(), "AMORO", 0, @@ -413,5 +416,24 @@ public void insertOptimizingProcess( fromSequence, toSequence)); } + + /** + * Convert OptimizingType to corresponding Action. + * + * @param optimizingType optimizing type + * @return corresponding Action + */ + private Action getOptimizingAction(OptimizingType optimizingType) { + switch (optimizingType) { + case MINOR: + return IcebergActions.OPTIMIZING_MINOR; + case MAJOR: + return IcebergActions.OPTIMIZING_MAJOR; + case FULL: + return IcebergActions.OPTIMIZING_FULL; + default: + throw new IllegalArgumentException("Unknown optimizing type: " + optimizingType); + } + } } } diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/persistence/converter/Action2StringConverterTest.java b/amoro-ams/src/test/java/org/apache/amoro/server/persistence/converter/Action2StringConverterTest.java new file mode 100644 index 0000000000..e8d6604275 --- /dev/null +++ b/amoro-ams/src/test/java/org/apache/amoro/server/persistence/converter/Action2StringConverterTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.persistence.converter; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.apache.amoro.Action; +import org.apache.amoro.IcebergActions; +import org.apache.amoro.PaimonActions; +import org.junit.jupiter.api.Test; + +/** Unit tests for {@link Action2StringConverter}. */ +public class Action2StringConverterTest { + + @Test + public void testGetRegisteredIcebergAction() { + Action action = Action2StringConverter.getActionByName("optimizing-minor"); + assertNotNull(action, "Should find registered optimizing-minor action"); + assertEquals("optimizing-minor", action.getName()); + assertSame(IcebergActions.OPTIMIZING_MINOR, action, "Should return the same instance"); + } + + @Test + public void testGetRegisteredIcebergActions() { + assertEquals(IcebergActions.SYSTEM, Action2StringConverter.getActionByName("system")); + assertEquals(IcebergActions.REWRITE, Action2StringConverter.getActionByName("rewrite")); + assertEquals( + IcebergActions.DELETE_ORPHANS, Action2StringConverter.getActionByName("delete-orphans")); + assertEquals(IcebergActions.SYNC_HIVE, Action2StringConverter.getActionByName("sync-hive")); + assertEquals(IcebergActions.EXPIRE_DATA, Action2StringConverter.getActionByName("expire-data")); + assertEquals( + IcebergActions.OPTIMIZING_MINOR, + Action2StringConverter.getActionByName("optimizing-minor")); + assertEquals( + IcebergActions.OPTIMIZING_MAJOR, + Action2StringConverter.getActionByName("optimizing-major")); + assertEquals( + IcebergActions.OPTIMIZING_FULL, Action2StringConverter.getActionByName("optimizing-full")); + } + + @Test + public void testGetRegisteredPaimonActions() { + assertEquals(PaimonActions.COMPACT, Action2StringConverter.getActionByName("compact")); + assertEquals( + PaimonActions.FULL_COMPACT, Action2StringConverter.getActionByName("full-compact")); + assertEquals( + PaimonActions.CLEAN_METADATA, Action2StringConverter.getActionByName("clean-meta")); + assertEquals( + PaimonActions.DELETE_SNAPSHOTS, Action2StringConverter.getActionByName("del-snapshots")); + } + + @Test + public void testGetUnknownActionCreatesTemporary() { + String unknownActionName = "custom-optimizing-action"; + Action action = Action2StringConverter.getActionByName(unknownActionName); + + assertNotNull(action, "Should create action for unknown name"); + assertEquals(unknownActionName, action.getName()); + } + + @Test + public void testGetUnknownActionReturnsSameInstance() { + String unknownActionName = "another-custom-action"; + Action action1 = Action2StringConverter.getActionByName(unknownActionName); + Action action2 = Action2StringConverter.getActionByName(unknownActionName); + + assertSame(action1, action2, "Should return the same instance for same unknown action name"); + } + + @Test + public void testGetNullAction() { + Action action = Action2StringConverter.getActionByName(null); + assertNull(action, "Should return null for null input"); + } + + @Test + public void testGetEmptyAction() { + Action action = Action2StringConverter.getActionByName(""); + assertNull(action, "Should return null for empty string"); + } + + @Test + public void testRegisterCustomAction() { + Action customAction = + new Action( + new org.apache.amoro.TableFormat[] {org.apache.amoro.TableFormat.PAIMON}, + 50, + "custom-action"); + + Action2StringConverter.registerCustomAction(customAction); + Action retrieved = Action2StringConverter.getActionByName("custom-action"); + + assertSame(customAction, retrieved, "Should retrieve the same custom action"); + } + + @Test + public void testGetRegisteredActions() { + Action[] actions = Action2StringConverter.getRegisteredActions(); + assertTrue(actions.length > 0, "Should have registered actions"); + + boolean hasOptimizingMinor = false; + for (Action action : actions) { + if ("optimizing-minor".equals(action.getName())) { + hasOptimizingMinor = true; + break; + } + } + assertTrue(hasOptimizingMinor, "Should include optimizing-minor action"); + } +} diff --git a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java index 76f470d98c..bafca9c6ae 100644 --- a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java +++ b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java @@ -28,4 +28,11 @@ public class IcebergActions { public static final Action DELETE_ORPHANS = new Action(DEFAULT_FORMATS, 2, "delete-orphans"); public static final Action SYNC_HIVE = new Action(DEFAULT_FORMATS, 3, "sync-hive"); public static final Action EXPIRE_DATA = new Action(DEFAULT_FORMATS, 1, "expire-data"); + + // Optimizing Actions + public static final Action OPTIMIZING_MINOR = + new Action(DEFAULT_FORMATS, 100, "optimizing-minor"); + public static final Action OPTIMIZING_MAJOR = + new Action(DEFAULT_FORMATS, 200, "optimizing-major"); + public static final Action OPTIMIZING_FULL = new Action(DEFAULT_FORMATS, 300, "optimizing-full"); } diff --git a/amoro-common/src/main/java/org/apache/amoro/PaimonActions.java b/amoro-common/src/main/java/org/apache/amoro/PaimonActions.java new file mode 100644 index 0000000000..d15d05c66b --- /dev/null +++ b/amoro-common/src/main/java/org/apache/amoro/PaimonActions.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro; + +/** Pre-defined actions for Paimon table format operations. */ +public class PaimonActions { + + private static final TableFormat[] PAIMON_FORMATS = new TableFormat[] {TableFormat.PAIMON}; + + /** Minor compaction action for Paimon tables. */ + public static final Action COMPACT = new Action(PAIMON_FORMATS, 100, "compact"); + + /** Full compaction action for Paimon tables. */ + public static final Action FULL_COMPACT = new Action(PAIMON_FORMATS, 200, "full-compact"); + + /** Clean metadata action for removing expired snapshots. */ + public static final Action CLEAN_METADATA = new Action(PAIMON_FORMATS, 10, "clean-meta"); + + /** Delete snapshots action for Paimon tables. */ + public static final Action DELETE_SNAPSHOTS = new Action(PAIMON_FORMATS, 5, "del-snapshots"); + + private PaimonActions() { + // Private constructor to prevent instantiation + } +}