From b6a88c47a96e9bfb8c0b0712be3da540bbb22cc9 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Wed, 15 Apr 2026 15:43:56 +0800 Subject: [PATCH 01/10] Show pipe plugin loading errors in plugin listing Add plugin loading exception messages to pipe plugin metadata and expose them through SHOW PIPEPLUGINS and information_schema.pipe_plugins, so users can diagnose initialization and class loading failures directly from query results. Made-with: Cursor --- .../persistence/pipe/PipePluginInfo.java | 27 +++++++++++++++++ ...formationSchemaContentSupplierFactory.java | 6 ++++ .../config/metadata/ShowPipePluginsTask.java | 7 +++++ .../agent/plugin/meta/PipePluginMeta.java | 29 +++++++++++++++++-- .../schema/column/ColumnHeaderConstant.java | 3 +- .../schema/table/InformationSchema.java | 3 ++ 6 files changed, 72 insertions(+), 3 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java index c7c138718f683..4d178b3321b77 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java @@ -412,6 +412,15 @@ public void processLoadSnapshot(final File snapshotDir) throws IOException { Class.forName(pipePluginMeta.getClassName(), true, pipePluginClassLoader); pipePluginMetaKeeper.addPipePluginVisibility( pluginName, VisibilityUtils.calculateFromPluginClass(pluginClass)); + pipePluginMetaKeeper.addPipePluginMeta( + pluginName, + new PipePluginMeta( + pipePluginMeta.getPluginName(), + pipePluginMeta.getClassName(), + pipePluginMeta.isBuiltin(), + pipePluginMeta.getJarName(), + pipePluginMeta.getJarMD5(), + null)); classLoaderManager.addPluginAndClassLoader(pluginName, pipePluginClassLoader); } catch (final Throwable e) { try { @@ -421,6 +430,15 @@ public void processLoadSnapshot(final File snapshotDir) throws IOException { throw e; } } catch (final Throwable e) { + pipePluginMetaKeeper.addPipePluginMeta( + pluginName, + new PipePluginMeta( + pipePluginMeta.getPluginName(), + pipePluginMeta.getClassName(), + pipePluginMeta.isBuiltin(), + pipePluginMeta.getJarName(), + pipePluginMeta.getJarMD5(), + getRootCauseMessage(e))); LOGGER.warn( "Failed to load plugin class for plugin [{}] when loading snapshot [{}] ", pluginName, @@ -433,6 +451,15 @@ public void processLoadSnapshot(final File snapshotDir) throws IOException { } } + private String getRootCauseMessage(final Throwable throwable) { + Throwable current = throwable; + while (current.getCause() != null && current.getCause() != current) { + current = current.getCause(); + } + final String message = current.getMessage(); + return current.getClass().getSimpleName() + (message == null ? "" : (": " + message)); + } + /////////////////////////////// hashCode & equals /////////////////////////////// @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java index d7162b03be316..a5c95db9ede79 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java @@ -706,6 +706,12 @@ protected void constructLine() { } else { columnBuilders[3].appendNull(); } + if (Objects.nonNull(pipePluginMeta.getPluginLoadingExceptionMessage())) { + columnBuilders[4].writeBinary( + BytesUtils.valueOf(pipePluginMeta.getPluginLoadingExceptionMessage())); + } else { + columnBuilders[4].appendNull(); + } resultBuilder.declarePosition(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowPipePluginsTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowPipePluginsTask.java index f186702d595ce..cd6edab749421 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowPipePluginsTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowPipePluginsTask.java @@ -50,6 +50,7 @@ public class ShowPipePluginsTask implements IConfigTask { public static final Binary PIPE_PLUGIN_TYPE_EXTERNAL = BytesUtils.valueOf("External"); private static final Binary PIPE_JAR_NAME_EMPTY_FIELD = BytesUtils.valueOf(""); + private static final Binary PIPE_PLUGIN_EXCEPTION_MESSAGE_EMPTY_FIELD = BytesUtils.valueOf(""); private final ShowPipePluginsStatement showPipePluginsStatement; @@ -103,6 +104,12 @@ public static void buildTsBlock( pipePluginMeta.getJarName() == null ? PIPE_JAR_NAME_EMPTY_FIELD : BytesUtils.valueOf(pipePluginMeta.getJarName())); + builder + .getColumnBuilder(4) + .writeBinary( + pipePluginMeta.getPluginLoadingExceptionMessage() == null + ? PIPE_PLUGIN_EXCEPTION_MESSAGE_EMPTY_FIELD + : BytesUtils.valueOf(pipePluginMeta.getPluginLoadingExceptionMessage())); builder.declarePosition(); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java index 0fb2314a1cfca..1816bf855ffcf 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java @@ -38,9 +38,20 @@ public class PipePluginMeta { private final boolean isBuiltin; private final String jarName; private final String jarMD5; + private final String pluginLoadingExceptionMessage; public PipePluginMeta( String pluginName, String className, boolean isBuiltin, String jarName, String jarMD5) { + this(pluginName, className, isBuiltin, jarName, jarMD5, null); + } + + public PipePluginMeta( + String pluginName, + String className, + boolean isBuiltin, + String jarName, + String jarMD5, + String pluginLoadingExceptionMessage) { this.pluginName = Objects.requireNonNull(pluginName).toUpperCase(); this.className = Objects.requireNonNull(className); @@ -52,6 +63,7 @@ public PipePluginMeta( this.jarName = Objects.requireNonNull(jarName); this.jarMD5 = Objects.requireNonNull(jarMD5); } + this.pluginLoadingExceptionMessage = pluginLoadingExceptionMessage; } public PipePluginMeta(String pluginName, String className) { @@ -61,6 +73,7 @@ public PipePluginMeta(String pluginName, String className) { this.isBuiltin = true; this.jarName = null; this.jarMD5 = null; + this.pluginLoadingExceptionMessage = null; } public boolean isBuiltin() { @@ -83,6 +96,10 @@ public String getJarMD5() { return jarMD5; } + public String getPluginLoadingExceptionMessage() { + return pluginLoadingExceptionMessage; + } + public ByteBuffer serialize() throws IOException { PublicBAOS byteArrayOutputStream = new PublicBAOS(); DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); @@ -96,6 +113,7 @@ public void serialize(DataOutputStream outputStream) throws IOException { ReadWriteIOUtils.write(isBuiltin, outputStream); ReadWriteIOUtils.write(jarName, outputStream); ReadWriteIOUtils.write(jarMD5, outputStream); + ReadWriteIOUtils.write(pluginLoadingExceptionMessage, outputStream); } public static PipePluginMeta deserialize(ByteBuffer byteBuffer) { @@ -104,7 +122,10 @@ public static PipePluginMeta deserialize(ByteBuffer byteBuffer) { final boolean isBuiltin = ReadWriteIOUtils.readBool(byteBuffer); final String jarName = ReadWriteIOUtils.readString(byteBuffer); final String jarMD5 = ReadWriteIOUtils.readString(byteBuffer); - return new PipePluginMeta(pluginName, className, isBuiltin, jarName, jarMD5); + final String pluginLoadingExceptionMessage = + byteBuffer.hasRemaining() ? ReadWriteIOUtils.readString(byteBuffer) : null; + return new PipePluginMeta( + pluginName, className, isBuiltin, jarName, jarMD5, pluginLoadingExceptionMessage); } public static PipePluginMeta deserialize(InputStream inputStream) throws IOException { @@ -125,7 +146,8 @@ public boolean equals(Object obj) { && className.equals(that.className) && isBuiltin == that.isBuiltin && Objects.equals(jarName, that.jarName) - && Objects.equals(jarMD5, that.jarMD5); + && Objects.equals(jarMD5, that.jarMD5) + && Objects.equals(pluginLoadingExceptionMessage, that.pluginLoadingExceptionMessage); } @Override @@ -150,6 +172,9 @@ public String toString() { + ", jarMD5='" + jarMD5 + '\'' + + ", pluginLoadingExceptionMessage='" + + pluginLoadingExceptionMessage + + '\'' + '}'; } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java index f436165f9e8c3..186f7daa6846d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java @@ -579,7 +579,8 @@ private ColumnHeaderConstant() { new ColumnHeader(PLUGIN_NAME, TSDataType.TEXT), new ColumnHeader(PLUGIN_TYPE, TSDataType.TEXT), new ColumnHeader(CLASS_NAME, TSDataType.TEXT), - new ColumnHeader(PLUGIN_JAR, TSDataType.TEXT)); + new ColumnHeader(PLUGIN_JAR, TSDataType.TEXT), + new ColumnHeader(EXCEPTION_MESSAGE, TSDataType.TEXT)); public static final List showSchemaTemplateHeaders = ImmutableList.of(new ColumnHeader(TEMPLATE_NAME, TSDataType.TEXT)); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java index 98bf4a9a83023..75b71efc75d0c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java @@ -230,6 +230,9 @@ public class InformationSchema { new AttributeColumnSchema(ColumnHeaderConstant.CLASS_NAME_TABLE_MODEL, TSDataType.STRING)); pipePluginTable.addColumnSchema( new AttributeColumnSchema(ColumnHeaderConstant.PLUGIN_JAR_TABLE_MODEL, TSDataType.STRING)); + pipePluginTable.addColumnSchema( + new AttributeColumnSchema( + ColumnHeaderConstant.EXCEPTION_MESSAGE_TABLE_MODEL, TSDataType.STRING)); schemaTables.put(PIPE_PLUGINS, pipePluginTable); final TsTable topicTable = new TsTable(TOPICS); From 125b762f7f17803e27f6582e839ee5961a5d8e66 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Wed, 15 Apr 2026 18:25:43 +0800 Subject: [PATCH 02/10] spotless --- .../persistence/pipe/PipePluginInfo.java | 178 +++++++++++++----- .../pipe/plugin/DropPipePluginProcedure.java | 19 +- .../agent/plugin/PipeDataNodePluginAgent.java | 25 +++ .../pipe/agent/runtime/PipeAgentLauncher.java | 20 +- .../constructor/PipePluginConstructor.java | 8 + .../plugin/meta/PipePluginMetaKeeper.java | 11 +- 6 files changed, 200 insertions(+), 61 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java index 4d178b3321b77..3faeb4fdda57e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java @@ -29,6 +29,7 @@ import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; +import org.apache.iotdb.commons.pipe.datastructure.visibility.Visibility; import org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils; import org.apache.iotdb.commons.snapshot.SnapshotProcessor; import org.apache.iotdb.confignode.conf.ConfigNodeConfig; @@ -111,6 +112,15 @@ public boolean validateBeforeCreatingPipePlugin( final String pluginName, final boolean isSetIfNotExistsCondition) { // both build-in and user defined pipe plugin should be unique if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) { + final PipePluginMeta existedPipePluginMeta = + pipePluginMetaKeeper.getPipePluginMeta(pluginName); + final String loadingFailureMessage = existedPipePluginMeta.getPluginLoadingExceptionMessage(); + if (loadingFailureMessage != null) { + throw new PipeException( + String.format( + "Failed to create PipePlugin [%s], this PipePlugin exists but failed to load: %s", + pluginName, loadingFailureMessage)); + } if (isSetIfNotExistsCondition) { return true; } @@ -177,6 +187,7 @@ public void checkPipePluginExistence( LOGGER.info(exceptionMessage); throw new PipeException(exceptionMessage); } + checkPipePluginAvailabilityForPipeCreation(sourcePluginName, "extractor"); final PipeParameters processorParameters = new PipeParameters(processorAttributes); final String processorPluginName = @@ -190,6 +201,7 @@ public void checkPipePluginExistence( LOGGER.warn(exceptionMessage); throw new PipeException(exceptionMessage); } + checkPipePluginAvailabilityForPipeCreation(processorPluginName, "processor"); final PipeParameters sinkParameters = new PipeParameters(sinkAttributes); final String sinkPluginName = @@ -204,22 +216,52 @@ public void checkPipePluginExistence( LOGGER.warn(exceptionMessage); throw new PipeException(exceptionMessage); } + checkPipePluginAvailabilityForPipeCreation(sinkPluginName, "connector"); } /////////////////////////////// Pipe Plugin Management /////////////////////////////// public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan) { + boolean shouldRecordLoadingFailure = false; + final PipePluginMeta pipePluginMeta = createPipePluginPlan.getPipePluginMeta(); try { - final PipePluginMeta pipePluginMeta = createPipePluginPlan.getPipePluginMeta(); final String pluginName = pipePluginMeta.getPluginName(); final String className = pipePluginMeta.getClassName(); final String jarName = pipePluginMeta.getJarName(); if (createPipePluginPlan.getJarFile() != null) { + shouldRecordLoadingFailure = true; savePipePluginWithRollback(createPipePluginPlan); } else { final String existed = pipePluginMetaKeeper.getPluginNameByJarName(jarName); if (Objects.nonNull(existed)) { + final PipePluginMeta existedPipePluginMeta = + pipePluginMetaKeeper.getPipePluginMeta(existed); + final String existedLoadingFailureMessage = + existedPipePluginMeta.getPluginLoadingExceptionMessage(); + if (existedLoadingFailureMessage != null) { + pipePluginMetaKeeper.addPipePluginMeta( + pluginName, + new PipePluginMeta( + pipePluginMeta.getPluginName(), + pipePluginMeta.getClassName(), + pipePluginMeta.isBuiltin(), + pipePluginMeta.getJarName(), + pipePluginMeta.getJarMD5(), + existedLoadingFailureMessage)); + pipePluginMetaKeeper.addPipePluginVisibility(pluginName, Visibility.BOTH); + throw new PipeException( + String.format( + "Failed to create PipePlugin [%s], source PipePlugin [%s] failed to load: %s", + pluginName, existed, existedLoadingFailureMessage)); + } + if (!pipePluginExecutableManager.hasPluginFileUnderInstallDir(existed, jarName)) { + throw new PipeException( + String.format( + "Failed to create PipePlugin [%s], source PipePlugin [%s] jar [%s] does not exist in install dir.", + pluginName, existed, jarName)); + } + shouldRecordLoadingFailure = true; pipePluginExecutableManager.linkExistedPlugin(existed, pluginName, jarName); computeFromPluginClass(pluginName, className); } else { @@ -237,7 +279,10 @@ public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan pipePluginMetaKeeper.addJarNameAndMd5(jarName, pipePluginMeta.getJarMD5()); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); - } catch (final Exception e) { + } catch (final Throwable e) { + if (shouldRecordLoadingFailure) { + savePipePluginLoadingFailure(pipePluginMeta, e); + } final String errorMessage = String.format( "Failed to execute createPipePlugin(%s) on config nodes, because of %s", @@ -249,7 +294,7 @@ public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan } private void savePipePluginWithRollback(final CreatePipePluginPlan createPipePluginPlan) - throws Exception { + throws Throwable { final PipePluginMeta pipePluginMeta = createPipePluginPlan.getPipePluginMeta(); final String pluginName = pipePluginMeta.getPluginName(); final String className = pipePluginMeta.getClassName(); @@ -258,7 +303,7 @@ private void savePipePluginWithRollback(final CreatePipePluginPlan createPipePlu pipePluginExecutableManager.savePluginToInstallDir( ByteBuffer.wrap(createPipePluginPlan.getJarFile().getValues()), pluginName, jarName); computeFromPluginClass(pluginName, className); - } catch (final Exception e) { + } catch (final Throwable e) { // We need to rollback if the creation has failed pipePluginExecutableManager.removePluginFileUnderLibRoot(pluginName, jarName); throw e; @@ -266,7 +311,7 @@ private void savePipePluginWithRollback(final CreatePipePluginPlan createPipePlu } private void computeFromPluginClass(final String pluginName, final String className) - throws Exception { + throws Throwable { final String pluginDirPath = pipePluginExecutableManager.getPluginsDirPath(pluginName); final PipePluginClassLoader pipePluginClassLoader = classLoaderManager.createPipePluginClassLoader(pluginDirPath); @@ -275,7 +320,7 @@ private void computeFromPluginClass(final String pluginName, final String classN pipePluginMetaKeeper.addPipePluginVisibility( pluginName, VisibilityUtils.calculateFromPluginClass(pluginClass)); classLoaderManager.addPluginAndClassLoader(pluginName, pipePluginClassLoader); - } catch (final Exception e) { + } catch (final Throwable e) { try { pipePluginClassLoader.close(); } catch (final Exception ignored) { @@ -402,49 +447,7 @@ public void processLoadSnapshot(final File snapshotDir) throws IOException { if (pipePluginMeta.isBuiltin()) { continue; } - final String pluginName = pipePluginMeta.getPluginName(); - try { - final String pluginDirPath = pipePluginExecutableManager.getPluginsDirPath(pluginName); - final PipePluginClassLoader pipePluginClassLoader = - classLoaderManager.createPipePluginClassLoader(pluginDirPath); - try { - final Class pluginClass = - Class.forName(pipePluginMeta.getClassName(), true, pipePluginClassLoader); - pipePluginMetaKeeper.addPipePluginVisibility( - pluginName, VisibilityUtils.calculateFromPluginClass(pluginClass)); - pipePluginMetaKeeper.addPipePluginMeta( - pluginName, - new PipePluginMeta( - pipePluginMeta.getPluginName(), - pipePluginMeta.getClassName(), - pipePluginMeta.isBuiltin(), - pipePluginMeta.getJarName(), - pipePluginMeta.getJarMD5(), - null)); - classLoaderManager.addPluginAndClassLoader(pluginName, pipePluginClassLoader); - } catch (final Throwable e) { - try { - pipePluginClassLoader.close(); - } catch (final Exception ignored) { - } - throw e; - } - } catch (final Throwable e) { - pipePluginMetaKeeper.addPipePluginMeta( - pluginName, - new PipePluginMeta( - pipePluginMeta.getPluginName(), - pipePluginMeta.getClassName(), - pipePluginMeta.isBuiltin(), - pipePluginMeta.getJarName(), - pipePluginMeta.getJarMD5(), - getRootCauseMessage(e))); - LOGGER.warn( - "Failed to load plugin class for plugin [{}] when loading snapshot [{}] ", - pluginName, - snapshotFile.getAbsolutePath(), - e); - } + createPipePluginOnStartup(pipePluginMeta, snapshotFile); } } finally { releasePipePluginInfoLock(); @@ -460,6 +463,83 @@ private String getRootCauseMessage(final Throwable throwable) { return current.getClass().getSimpleName() + (message == null ? "" : (": " + message)); } + private void savePipePluginLoadingFailure( + final PipePluginMeta pipePluginMeta, final Throwable throwable) { + final String pluginName = pipePluginMeta.getPluginName(); + pipePluginMetaKeeper.addPipePluginMeta( + pluginName, + new PipePluginMeta( + pipePluginMeta.getPluginName(), + pipePluginMeta.getClassName(), + pipePluginMeta.isBuiltin(), + pipePluginMeta.getJarName(), + pipePluginMeta.getJarMD5(), + getRootCauseMessage(throwable))); + pipePluginMetaKeeper.addPipePluginVisibility(pluginName, Visibility.BOTH); + } + + private void checkPipePluginAvailabilityForPipeCreation( + final String pluginName, final String pluginType) { + final PipePluginMeta pipePluginMeta = pipePluginMetaKeeper.getPipePluginMeta(pluginName); + final String loadingFailureMessage = pipePluginMeta.getPluginLoadingExceptionMessage(); + if (loadingFailureMessage != null) { + final String exceptionMessage = + String.format( + "Failed to create or alter pipe, the pipe %s plugin %s failed to load: %s", + pluginType, pluginName, loadingFailureMessage); + LOGGER.warn(exceptionMessage); + throw new PipeException(exceptionMessage); + } + } + + private void createPipePluginOnStartup( + final PipePluginMeta pipePluginMeta, final File snapshotFile) { + final String pluginName = pipePluginMeta.getPluginName(); + try { + final String pluginDirPath = pipePluginExecutableManager.getPluginsDirPath(pluginName); + final PipePluginClassLoader pipePluginClassLoader = + classLoaderManager.createPipePluginClassLoader(pluginDirPath); + try { + final Class pluginClass = + Class.forName(pipePluginMeta.getClassName(), true, pipePluginClassLoader); + pipePluginMetaKeeper.addPipePluginVisibility( + pluginName, VisibilityUtils.calculateFromPluginClass(pluginClass)); + pipePluginMetaKeeper.addPipePluginMeta( + pluginName, + new PipePluginMeta( + pipePluginMeta.getPluginName(), + pipePluginMeta.getClassName(), + pipePluginMeta.isBuiltin(), + pipePluginMeta.getJarName(), + pipePluginMeta.getJarMD5(), + null)); + classLoaderManager.addPluginAndClassLoader(pluginName, pipePluginClassLoader); + } catch (final Throwable e) { + try { + pipePluginClassLoader.close(); + } catch (final Exception ignored) { + } + throw e; + } + } catch (final Throwable e) { + pipePluginMetaKeeper.addPipePluginMeta( + pluginName, + new PipePluginMeta( + pipePluginMeta.getPluginName(), + pipePluginMeta.getClassName(), + pipePluginMeta.isBuiltin(), + pipePluginMeta.getJarName(), + pipePluginMeta.getJarMD5(), + getRootCauseMessage(e))); + pipePluginMetaKeeper.addPipePluginVisibility(pluginName, Visibility.BOTH); + LOGGER.warn( + "Failed to load plugin class for plugin [{}] when loading snapshot [{}] ", + pluginName, + snapshotFile.getAbsolutePath(), + e); + } + } + /////////////////////////////// hashCode & equals /////////////////////////////// @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java index efbe1ee6ccdda..ab48a2478506c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java @@ -19,6 +19,7 @@ package org.apache.iotdb.confignode.procedure.impl.pipe.plugin; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan; import org.apache.iotdb.confignode.manager.pipe.coordinator.plugin.PipePluginCoordinator; import org.apache.iotdb.confignode.manager.pipe.coordinator.task.PipeTaskCoordinator; @@ -35,7 +36,6 @@ import org.apache.iotdb.confignode.procedure.store.ProcedureType; import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.pipe.api.exception.PipeException; -import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -45,6 +45,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; @@ -162,8 +163,8 @@ private Flow executeFromLock(ConfigNodeProcedureEnv env) { private Flow executeFromDropOnDataNodes(ConfigNodeProcedureEnv env) { LOGGER.info("DropPipePluginProcedure: executeFromDropOnDataNodes({})", pluginName); - if (RpcUtils.squashResponseStatusList(env.dropPipePluginOnDataNodes(pluginName, true)).getCode() - == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + final List dropStatusList = env.dropPipePluginOnDataNodes(pluginName, true); + if (dropStatusList.stream().allMatch(this::isDropPipePluginSuccessOrNotExists)) { setNextState(DropPipePluginState.DROP_ON_CONFIG_NODES); return Flow.HAS_MORE_STATE; } @@ -172,6 +173,18 @@ private Flow executeFromDropOnDataNodes(ConfigNodeProcedureEnv env) { String.format("Failed to drop pipe plugin %s on data nodes", pluginName)); } + private boolean isDropPipePluginSuccessOrNotExists(final TSStatus status) { + if (status == null) { + return false; + } + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return true; + } + final String message = status.getMessage(); + return message != null + && (message.contains("does not exist") || message.contains("not been created")); + } + private Flow executeFromDropOnConfigNodes(ConfigNodeProcedureEnv env) { LOGGER.info("DropPipePluginProcedure: executeFromDropOnConfigNodes({})", pluginName); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java index d57956109d46d..567893c2cbe09 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginClassLoader; import org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginClassLoaderManager; import org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginExecutableManager; +import org.apache.iotdb.commons.pipe.datastructure.visibility.Visibility; import org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils; import org.apache.iotdb.db.pipe.agent.plugin.dataregion.PipeDataRegionPluginAgent; import org.apache.iotdb.db.pipe.agent.plugin.schemaregion.PipeSchemaRegionPluginAgent; @@ -176,6 +177,30 @@ public void doRegister(final PipePluginMeta pipePluginMeta) throws PipeException } } + public void markPluginLoadFailure( + final PipePluginMeta pipePluginMeta, final Throwable throwable) { + final String pluginName = pipePluginMeta.getPluginName(); + pipePluginMetaKeeper.addPipePluginMeta( + pluginName, + new PipePluginMeta( + pipePluginMeta.getPluginName(), + pipePluginMeta.getClassName(), + pipePluginMeta.isBuiltin(), + pipePluginMeta.getJarName(), + pipePluginMeta.getJarMD5(), + getRootCauseMessage(throwable))); + pipePluginMetaKeeper.addPipePluginVisibility(pluginName, Visibility.BOTH); + } + + private String getRootCauseMessage(final Throwable throwable) { + Throwable current = throwable; + while (current.getCause() != null && current.getCause() != current) { + current = current.getCause(); + } + final String message = current.getMessage(); + return current.getClass().getSimpleName() + (message == null ? "" : (": " + message)); + } + public void deregister(final String pluginName, final boolean needToDeleteJar) throws PipeException { lock.lock(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java index 5a408a319e7f0..286c8a5eaebfe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java @@ -83,16 +83,20 @@ public static synchronized void launchPipePluginAgent( } // create instances of pipe plugins and do registration - try { - for (PipePluginMeta meta : resourcesInformationHolder.getPipePluginMetaList()) { - if (meta.isBuiltin()) { - continue; - } + for (PipePluginMeta meta : resourcesInformationHolder.getPipePluginMetaList()) { + if (meta.isBuiltin()) { + continue; + } + try { PipeDataNodeAgent.plugin().doRegister(meta); + } catch (Throwable e) { + PipeDataNodeAgent.plugin().markPluginLoadFailure(meta, e); + // Ignore a single broken plugin and continue startup. + LOGGER.warn( + "Failure when register pipe plugin {}. Skip this plugin and continue startup.", + meta.getPluginName(), + e); } - } catch (Throwable e) { - // Ignore the pipe plugin errors and continue to start - LOGGER.warn("Failure when register pipe plugins, will ignore.", e); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipePluginConstructor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipePluginConstructor.java index cbb25340f0099..6e87444c2d935 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipePluginConstructor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipePluginConstructor.java @@ -81,6 +81,14 @@ private PipePlugin reflect(String pluginName) { LOGGER.warn(errorMessage); throw new PipeException(errorMessage); } + if (information.getPluginLoadingExceptionMessage() != null) { + final String errorMessage = + String.format( + "Failed to reflect PipePlugin instance, because PipePlugin %s failed to load: %s", + pluginName.toUpperCase(), information.getPluginLoadingExceptionMessage()); + LOGGER.warn(errorMessage); + throw new PipeException(errorMessage); + } try { final Class pluginClass = diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMetaKeeper.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMetaKeeper.java index 6f4ee1caa2f95..43fb0c5909d32 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMetaKeeper.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMetaKeeper.java @@ -125,7 +125,16 @@ protected void processTakeSnapshot(OutputStream outputStream) throws IOException if (pipePluginMeta.isBuiltin()) { continue; } - ReadWriteIOUtils.write(pipePluginMeta.serialize(), outputStream); + ReadWriteIOUtils.write( + new PipePluginMeta( + pipePluginMeta.getPluginName(), + pipePluginMeta.getClassName(), + pipePluginMeta.isBuiltin(), + pipePluginMeta.getJarName(), + pipePluginMeta.getJarMD5(), + null) + .serialize(), + outputStream); } } From edd76d5bfaed804b205249fafdbdf95cb97c027a Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Wed, 15 Apr 2026 18:57:37 +0800 Subject: [PATCH 03/10] spotless --- .../commons/pipe/agent/plugin/meta/PipePluginMeta.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java index 1816bf855ffcf..aa91eb7df5a12 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java @@ -113,7 +113,6 @@ public void serialize(DataOutputStream outputStream) throws IOException { ReadWriteIOUtils.write(isBuiltin, outputStream); ReadWriteIOUtils.write(jarName, outputStream); ReadWriteIOUtils.write(jarMD5, outputStream); - ReadWriteIOUtils.write(pluginLoadingExceptionMessage, outputStream); } public static PipePluginMeta deserialize(ByteBuffer byteBuffer) { @@ -122,10 +121,8 @@ public static PipePluginMeta deserialize(ByteBuffer byteBuffer) { final boolean isBuiltin = ReadWriteIOUtils.readBool(byteBuffer); final String jarName = ReadWriteIOUtils.readString(byteBuffer); final String jarMD5 = ReadWriteIOUtils.readString(byteBuffer); - final String pluginLoadingExceptionMessage = - byteBuffer.hasRemaining() ? ReadWriteIOUtils.readString(byteBuffer) : null; return new PipePluginMeta( - pluginName, className, isBuiltin, jarName, jarMD5, pluginLoadingExceptionMessage); + pluginName, className, isBuiltin, jarName, jarMD5, null); } public static PipePluginMeta deserialize(InputStream inputStream) throws IOException { @@ -146,8 +143,7 @@ public boolean equals(Object obj) { && className.equals(that.className) && isBuiltin == that.isBuiltin && Objects.equals(jarName, that.jarName) - && Objects.equals(jarMD5, that.jarMD5) - && Objects.equals(pluginLoadingExceptionMessage, that.pluginLoadingExceptionMessage); + && Objects.equals(jarMD5, that.jarMD5); } @Override From 6015e477e2eee1b792c25896efab35d77d627b5c Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Wed, 15 Apr 2026 19:06:54 +0800 Subject: [PATCH 04/10] fix --- .../iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java index aa91eb7df5a12..2c0ab82d026ca 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java @@ -121,8 +121,7 @@ public static PipePluginMeta deserialize(ByteBuffer byteBuffer) { final boolean isBuiltin = ReadWriteIOUtils.readBool(byteBuffer); final String jarName = ReadWriteIOUtils.readString(byteBuffer); final String jarMD5 = ReadWriteIOUtils.readString(byteBuffer); - return new PipePluginMeta( - pluginName, className, isBuiltin, jarName, jarMD5, null); + return new PipePluginMeta(pluginName, className, isBuiltin, jarName, jarMD5, null); } public static PipePluginMeta deserialize(InputStream inputStream) throws IOException { From 178ec5376e41b6f44788733e710091a6a4ad2e6e Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Thu, 16 Apr 2026 10:01:46 +0800 Subject: [PATCH 05/10] fix --- .../iotdb/confignode/persistence/pipe/PipePluginInfo.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java index 3faeb4fdda57e..0eb10298ba1a1 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java @@ -187,7 +187,7 @@ public void checkPipePluginExistence( LOGGER.info(exceptionMessage); throw new PipeException(exceptionMessage); } - checkPipePluginAvailabilityForPipeCreation(sourcePluginName, "extractor"); + checkPipePluginAvailabilityForPipeCreation(sourcePluginName, "source"); final PipeParameters processorParameters = new PipeParameters(processorAttributes); final String processorPluginName = @@ -216,7 +216,7 @@ public void checkPipePluginExistence( LOGGER.warn(exceptionMessage); throw new PipeException(exceptionMessage); } - checkPipePluginAvailabilityForPipeCreation(sinkPluginName, "connector"); + checkPipePluginAvailabilityForPipeCreation(sinkPluginName, "sink"); } /////////////////////////////// Pipe Plugin Management /////////////////////////////// From 75d4b869767a6b209e41c9c2a15707096e95d727 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Thu, 16 Apr 2026 10:50:42 +0800 Subject: [PATCH 06/10] fix --- .../relational/it/schema/IoTDBDatabaseIT.java | 7 ++++--- .../persistence/pipe/PipePluginInfo.java | 18 ------------------ 2 files changed, 4 insertions(+), 21 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java index 398830efaec56..3ff36d0cad9b2 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java @@ -490,7 +490,8 @@ public void testInformationSchema() throws SQLException { "plugin_name,STRING,TAG,", "plugin_type,STRING,ATTRIBUTE,", "class_name,STRING,ATTRIBUTE,", - "plugin_jar,STRING,ATTRIBUTE,"))); + "plugin_jar,STRING,ATTRIBUTE,", + "exception_message,STRING,ATTRIBUTE,"))); TestUtils.assertResultSetEqual( statement.executeQuery("desc topics"), "ColumnName,DataType,Category,", @@ -708,9 +709,9 @@ public void testInformationSchema() throws SQLException { TestUtils.assertResultSetEqual( statement.executeQuery( "select * from pipe_plugins where plugin_name = 'IOTDB-THRIFT-SINK'"), - "plugin_name,plugin_type,class_name,plugin_jar,", + "plugin_name,plugin_type,class_name,plugin_jar,exception_message,", Collections.singleton( - "IOTDB-THRIFT-SINK,Builtin,org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSink,null,")); + "IOTDB-THRIFT-SINK,Builtin,org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSink,null,null,")); TestUtils.assertResultSetEqual( statement.executeQuery("select * from views"), diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java index 0eb10298ba1a1..9a05351d08337 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java @@ -280,9 +280,6 @@ public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } catch (final Throwable e) { - if (shouldRecordLoadingFailure) { - savePipePluginLoadingFailure(pipePluginMeta, e); - } final String errorMessage = String.format( "Failed to execute createPipePlugin(%s) on config nodes, because of %s", @@ -463,21 +460,6 @@ private String getRootCauseMessage(final Throwable throwable) { return current.getClass().getSimpleName() + (message == null ? "" : (": " + message)); } - private void savePipePluginLoadingFailure( - final PipePluginMeta pipePluginMeta, final Throwable throwable) { - final String pluginName = pipePluginMeta.getPluginName(); - pipePluginMetaKeeper.addPipePluginMeta( - pluginName, - new PipePluginMeta( - pipePluginMeta.getPluginName(), - pipePluginMeta.getClassName(), - pipePluginMeta.isBuiltin(), - pipePluginMeta.getJarName(), - pipePluginMeta.getJarMD5(), - getRootCauseMessage(throwable))); - pipePluginMetaKeeper.addPipePluginVisibility(pluginName, Visibility.BOTH); - } - private void checkPipePluginAvailabilityForPipeCreation( final String pluginName, final String pluginType) { final PipePluginMeta pipePluginMeta = pipePluginMetaKeeper.getPipePluginMeta(pluginName); From dd7cbc88a9c3018a357d06155b9933822ea370f1 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Thu, 16 Apr 2026 11:27:42 +0800 Subject: [PATCH 07/10] fix --- .../persistence/pipe/PipePluginInfo.java | 13 --------- .../agent/plugin/meta/PipePluginMeta.java | 27 +++++++++++++++++-- .../plugin/meta/PipePluginMetaKeeper.java | 11 +------- 3 files changed, 26 insertions(+), 25 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java index 9a05351d08337..30771e78e4696 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java @@ -222,7 +222,6 @@ public void checkPipePluginExistence( /////////////////////////////// Pipe Plugin Management /////////////////////////////// public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan) { - boolean shouldRecordLoadingFailure = false; final PipePluginMeta pipePluginMeta = createPipePluginPlan.getPipePluginMeta(); try { final String pluginName = pipePluginMeta.getPluginName(); @@ -230,7 +229,6 @@ public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan final String jarName = pipePluginMeta.getJarName(); if (createPipePluginPlan.getJarFile() != null) { - shouldRecordLoadingFailure = true; savePipePluginWithRollback(createPipePluginPlan); } else { final String existed = pipePluginMetaKeeper.getPluginNameByJarName(jarName); @@ -240,16 +238,6 @@ public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan final String existedLoadingFailureMessage = existedPipePluginMeta.getPluginLoadingExceptionMessage(); if (existedLoadingFailureMessage != null) { - pipePluginMetaKeeper.addPipePluginMeta( - pluginName, - new PipePluginMeta( - pipePluginMeta.getPluginName(), - pipePluginMeta.getClassName(), - pipePluginMeta.isBuiltin(), - pipePluginMeta.getJarName(), - pipePluginMeta.getJarMD5(), - existedLoadingFailureMessage)); - pipePluginMetaKeeper.addPipePluginVisibility(pluginName, Visibility.BOTH); throw new PipeException( String.format( "Failed to create PipePlugin [%s], source PipePlugin [%s] failed to load: %s", @@ -261,7 +249,6 @@ public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan "Failed to create PipePlugin [%s], source PipePlugin [%s] jar [%s] does not exist in install dir.", pluginName, existed, jarName)); } - shouldRecordLoadingFailure = true; pipePluginExecutableManager.linkExistedPlugin(existed, pluginName, jarName); computeFromPluginClass(pluginName, className); } else { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java index 2c0ab82d026ca..31fb17b309481 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java @@ -101,18 +101,38 @@ public String getPluginLoadingExceptionMessage() { } public ByteBuffer serialize() throws IOException { + return serialize(true); + } + + public ByteBuffer serializeForState() throws IOException { + return serialize(false); + } + + private ByteBuffer serialize(final boolean includeExceptionMessage) throws IOException { PublicBAOS byteArrayOutputStream = new PublicBAOS(); DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); - serialize(outputStream); + serialize(outputStream, includeExceptionMessage); return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); } public void serialize(DataOutputStream outputStream) throws IOException { + serialize(outputStream, true); + } + + public void serializeForState(DataOutputStream outputStream) throws IOException { + serialize(outputStream, false); + } + + private void serialize(DataOutputStream outputStream, boolean includeExceptionMessage) + throws IOException { ReadWriteIOUtils.write(pluginName, outputStream); ReadWriteIOUtils.write(className, outputStream); ReadWriteIOUtils.write(isBuiltin, outputStream); ReadWriteIOUtils.write(jarName, outputStream); ReadWriteIOUtils.write(jarMD5, outputStream); + if (includeExceptionMessage) { + ReadWriteIOUtils.write(pluginLoadingExceptionMessage, outputStream); + } } public static PipePluginMeta deserialize(ByteBuffer byteBuffer) { @@ -121,7 +141,10 @@ public static PipePluginMeta deserialize(ByteBuffer byteBuffer) { final boolean isBuiltin = ReadWriteIOUtils.readBool(byteBuffer); final String jarName = ReadWriteIOUtils.readString(byteBuffer); final String jarMD5 = ReadWriteIOUtils.readString(byteBuffer); - return new PipePluginMeta(pluginName, className, isBuiltin, jarName, jarMD5, null); + final String pluginLoadingExceptionMessage = + byteBuffer.hasRemaining() ? ReadWriteIOUtils.readString(byteBuffer) : null; + return new PipePluginMeta( + pluginName, className, isBuiltin, jarName, jarMD5, pluginLoadingExceptionMessage); } public static PipePluginMeta deserialize(InputStream inputStream) throws IOException { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMetaKeeper.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMetaKeeper.java index 43fb0c5909d32..90a42d2376993 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMetaKeeper.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMetaKeeper.java @@ -125,16 +125,7 @@ protected void processTakeSnapshot(OutputStream outputStream) throws IOException if (pipePluginMeta.isBuiltin()) { continue; } - ReadWriteIOUtils.write( - new PipePluginMeta( - pipePluginMeta.getPluginName(), - pipePluginMeta.getClassName(), - pipePluginMeta.isBuiltin(), - pipePluginMeta.getJarName(), - pipePluginMeta.getJarMD5(), - null) - .serialize(), - outputStream); + ReadWriteIOUtils.write(pipePluginMeta.serializeForState(), outputStream); } } From 497ed8cf8c28a0138675ff7f67e4360bcdce9874 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Thu, 16 Apr 2026 11:43:27 +0800 Subject: [PATCH 08/10] fix --- .../pipe/plugin/PipePluginTableResp.java | 2 +- ...formationSchemaContentSupplierFactory.java | 2 +- .../config/metadata/ShowPipePluginsTask.java | 3 +- .../agent/plugin/meta/PipePluginMeta.java | 46 +++++++++---------- 4 files changed, 27 insertions(+), 26 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/plugin/PipePluginTableResp.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/plugin/PipePluginTableResp.java index 534b824ef47df..93642f50312b9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/plugin/PipePluginTableResp.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/plugin/PipePluginTableResp.java @@ -51,7 +51,7 @@ public PipePluginTableResp( public TGetPipePluginTableResp convertToThriftResponse() throws IOException { final List pipePluginInformationByteBuffers = new ArrayList<>(); for (PipePluginMeta pipePluginMeta : allPipePluginMeta) { - pipePluginInformationByteBuffers.add(pipePluginMeta.serialize()); + pipePluginInformationByteBuffers.add(pipePluginMeta.serializeForShowPipePlugin()); } return new TGetPipePluginTableResp(status, pipePluginInformationByteBuffers); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java index a5c95db9ede79..5b51c00942bb7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java @@ -685,7 +685,7 @@ private PipePluginSupplier(final List dataTypes, final UserEntity en ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { iterator = client.getPipePluginTable().getAllPipePluginMeta().stream() - .map(PipePluginMeta::deserialize) + .map(PipePluginMeta::deserializeForShowPipePlugin) .filter( pipePluginMeta -> !BuiltinPipePlugin.SHOW_PIPE_PLUGINS_BLACKLIST.contains( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowPipePluginsTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowPipePluginsTask.java index cd6edab749421..bffc9fe43d9fc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowPipePluginsTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowPipePluginsTask.java @@ -75,7 +75,8 @@ public static void buildTsBlock( final List pipePluginMetaList = new ArrayList<>(); if (allPipePluginsInformation != null) { for (final ByteBuffer pipePluginInformationByteBuffer : allPipePluginsInformation) { - pipePluginMetaList.add(PipePluginMeta.deserialize(pipePluginInformationByteBuffer)); + pipePluginMetaList.add( + PipePluginMeta.deserializeForShowPipePlugin(pipePluginInformationByteBuffer)); } } pipePluginMetaList.sort(Comparator.comparing(PipePluginMeta::getPluginName)); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java index 31fb17b309481..19f6863be3ca3 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java @@ -101,38 +101,30 @@ public String getPluginLoadingExceptionMessage() { } public ByteBuffer serialize() throws IOException { - return serialize(true); - } - - public ByteBuffer serializeForState() throws IOException { - return serialize(false); + PublicBAOS byteArrayOutputStream = new PublicBAOS(); + DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); + serialize(outputStream); + return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); } - private ByteBuffer serialize(final boolean includeExceptionMessage) throws IOException { + public ByteBuffer serializeForShowPipePlugin() throws IOException { PublicBAOS byteArrayOutputStream = new PublicBAOS(); DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); - serialize(outputStream, includeExceptionMessage); + serializeForShowPipePlugin(outputStream); return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); } public void serialize(DataOutputStream outputStream) throws IOException { - serialize(outputStream, true); - } - - public void serializeForState(DataOutputStream outputStream) throws IOException { - serialize(outputStream, false); - } - - private void serialize(DataOutputStream outputStream, boolean includeExceptionMessage) - throws IOException { ReadWriteIOUtils.write(pluginName, outputStream); ReadWriteIOUtils.write(className, outputStream); ReadWriteIOUtils.write(isBuiltin, outputStream); ReadWriteIOUtils.write(jarName, outputStream); ReadWriteIOUtils.write(jarMD5, outputStream); - if (includeExceptionMessage) { - ReadWriteIOUtils.write(pluginLoadingExceptionMessage, outputStream); - } + } + + public void serializeForShowPipePlugin(DataOutputStream outputStream) throws IOException { + serialize(outputStream); + ReadWriteIOUtils.write(pluginLoadingExceptionMessage, outputStream); } public static PipePluginMeta deserialize(ByteBuffer byteBuffer) { @@ -141,10 +133,7 @@ public static PipePluginMeta deserialize(ByteBuffer byteBuffer) { final boolean isBuiltin = ReadWriteIOUtils.readBool(byteBuffer); final String jarName = ReadWriteIOUtils.readString(byteBuffer); final String jarMD5 = ReadWriteIOUtils.readString(byteBuffer); - final String pluginLoadingExceptionMessage = - byteBuffer.hasRemaining() ? ReadWriteIOUtils.readString(byteBuffer) : null; - return new PipePluginMeta( - pluginName, className, isBuiltin, jarName, jarMD5, pluginLoadingExceptionMessage); + return new PipePluginMeta(pluginName, className, isBuiltin, jarName, jarMD5, null); } public static PipePluginMeta deserialize(InputStream inputStream) throws IOException { @@ -152,6 +141,17 @@ public static PipePluginMeta deserialize(InputStream inputStream) throws IOExcep ByteBuffer.wrap(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(inputStream))); } + public static PipePluginMeta deserializeForShowPipePlugin(ByteBuffer byteBuffer) { + final String pluginName = ReadWriteIOUtils.readString(byteBuffer); + final String className = ReadWriteIOUtils.readString(byteBuffer); + final boolean isBuiltin = ReadWriteIOUtils.readBool(byteBuffer); + final String jarName = ReadWriteIOUtils.readString(byteBuffer); + final String jarMD5 = ReadWriteIOUtils.readString(byteBuffer); + final String pluginLoadingExceptionMessage = ReadWriteIOUtils.readString(byteBuffer); + return new PipePluginMeta( + pluginName, className, isBuiltin, jarName, jarMD5, pluginLoadingExceptionMessage); + } + @Override public boolean equals(Object obj) { if (this == obj) { From e84c7ffd03b436c34c63a4bfeeb22c42cfb48cbf Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Thu, 16 Apr 2026 11:50:08 +0800 Subject: [PATCH 09/10] fix --- .../commons/pipe/agent/plugin/meta/PipePluginMetaKeeper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMetaKeeper.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMetaKeeper.java index 90a42d2376993..6f4ee1caa2f95 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMetaKeeper.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMetaKeeper.java @@ -125,7 +125,7 @@ protected void processTakeSnapshot(OutputStream outputStream) throws IOException if (pipePluginMeta.isBuiltin()) { continue; } - ReadWriteIOUtils.write(pipePluginMeta.serializeForState(), outputStream); + ReadWriteIOUtils.write(pipePluginMeta.serialize(), outputStream); } } From f8ace87381b210461d055457c6aea9f827958b61 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Thu, 16 Apr 2026 12:16:14 +0800 Subject: [PATCH 10/10] fix --- .../consensus/response/pipe/PipePluginTableRespTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipePluginTableRespTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipePluginTableRespTest.java index 64551f7920347..146a6e14d0bf7 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipePluginTableRespTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipePluginTableRespTest.java @@ -49,7 +49,7 @@ public void testConvertToThriftResponse() throws IOException { final List pipePluginByteBuffers = new ArrayList<>(); for (PipePluginMeta pipePluginMeta : pipePluginMetaList) { - pipePluginByteBuffers.add(pipePluginMeta.serialize()); + pipePluginByteBuffers.add(pipePluginMeta.serializeForShowPipePlugin()); } TGetPipePluginTableResp getPipePluginTableResp = new TGetPipePluginTableResp(status, pipePluginByteBuffers);