From fb7c32fb5eb6f9f4e3aa3bda902ee03ce18a9d30 Mon Sep 17 00:00:00 2001 From: Keerthivasan Venkitajalam Date: Sun, 26 Apr 2026 22:06:00 +0530 Subject: [PATCH 1/4] feat(mcp): add workflow automation MCP tools for ingestion pipeline management Addresses the Workflow Automation category from issue #26609 (New MCP Tools). Adds three new MCP tools: - list_ingestion_pipelines: list pipelines by service/type with pagination - get_pipeline_status: fetch latest run status and recent run history - trigger_ingestion_pipeline: trigger an immediate pipeline run on demand Also adds McpApplicationContext to store OpenMetadataApplicationConfig at server init time, making it accessible from MCP tools without reflection. --- .../mcp/McpApplicationContext.java | 18 +++ .../java/org/openmetadata/mcp/McpServer.java | 1 + .../mcp/tools/DefaultToolContext.java | 9 ++ .../mcp/tools/GetPipelineStatusTool.java | 81 ++++++++++++++ .../mcp/tools/ListIngestionPipelinesTool.java | 105 ++++++++++++++++++ .../tools/TriggerIngestionPipelineTool.java | 79 +++++++++++++ .../main/resources/json/data/mcp/tools.json | 59 ++++++++++ 7 files changed, 352 insertions(+) create mode 100644 openmetadata-mcp/src/main/java/org/openmetadata/mcp/McpApplicationContext.java create mode 100644 openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/GetPipelineStatusTool.java create mode 100644 openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/ListIngestionPipelinesTool.java create mode 100644 openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/TriggerIngestionPipelineTool.java diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/McpApplicationContext.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/McpApplicationContext.java new file mode 100644 index 000000000000..58782ff0d8cf --- /dev/null +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/McpApplicationContext.java @@ -0,0 +1,18 @@ +package org.openmetadata.mcp; + +import org.openmetadata.service.OpenMetadataApplicationConfig; + +/** Holds application-level singletons needed by MCP tools at runtime. */ +public class McpApplicationContext { + private static volatile OpenMetadataApplicationConfig config; + + private McpApplicationContext() {} + + public static void setConfig(OpenMetadataApplicationConfig applicationConfig) { + config = applicationConfig; + } + + public static OpenMetadataApplicationConfig getConfig() { + return config; + } +} diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/McpServer.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/McpServer.java index 9a6711715485..ead942f3e383 100644 --- a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/McpServer.java +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/McpServer.java @@ -61,6 +61,7 @@ public void initializeMcpServer( SecurityConfigurationManager.getCurrentAuthzConfig()); this.authorizer = authorizer; this.limits = limits; + McpApplicationContext.setConfig(config); this.environment = environment; MutableServletContextHandler contextHandler = environment.getApplicationContext(); List tools = getTools(); diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/DefaultToolContext.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/DefaultToolContext.java index ad9d5d2448dd..de96036ef334 100644 --- a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/DefaultToolContext.java +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/DefaultToolContext.java @@ -83,6 +83,15 @@ public McpSchema.CallToolResult callTool( case "create_metric": result = new CreateMetricTool().execute(authorizer, limits, securityContext, params); break; + case "list_ingestion_pipelines": + result = new ListIngestionPipelinesTool().execute(authorizer, securityContext, params); + break; + case "get_pipeline_status": + result = new GetPipelineStatusTool().execute(authorizer, securityContext, params); + break; + case "trigger_ingestion_pipeline": + result = new TriggerIngestionPipelineTool().execute(authorizer, securityContext, params); + break; default: return McpSchema.CallToolResult.builder() .content( diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/GetPipelineStatusTool.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/GetPipelineStatusTool.java new file mode 100644 index 000000000000..7db49476c907 --- /dev/null +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/GetPipelineStatusTool.java @@ -0,0 +1,81 @@ +package org.openmetadata.mcp.tools; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline; +import org.openmetadata.schema.type.MetadataOperation; +import org.openmetadata.schema.utils.JsonUtils; +import org.openmetadata.service.Entity; +import org.openmetadata.service.jdbi3.IngestionPipelineRepository; +import org.openmetadata.service.limits.Limits; +import org.openmetadata.service.security.Authorizer; +import org.openmetadata.service.security.auth.CatalogSecurityContext; +import org.openmetadata.service.security.policyevaluator.OperationContext; +import org.openmetadata.service.security.policyevaluator.ResourceContext; + +@Slf4j +public class GetPipelineStatusTool implements McpTool { + + @Override + public Map execute( + Authorizer authorizer, CatalogSecurityContext securityContext, Map params) + throws IOException { + String fqn = (String) params.get("fqn"); + if (fqn == null || fqn.isBlank()) { + throw new IllegalArgumentException("Parameter 'fqn' is required"); + } + + int limit = 5; + if (params.containsKey("limit")) { + Object limitObj = params.get("limit"); + if (limitObj instanceof Number number) { + limit = number.intValue(); + } else if (limitObj instanceof String s) { + try { + limit = Integer.parseInt(s); + } catch (NumberFormatException ignored) { + limit = 5; + } + } + } + + authorizer.authorize( + securityContext, + new OperationContext(Entity.INGESTION_PIPELINE, MetadataOperation.VIEW_BASIC), + new ResourceContext<>(Entity.INGESTION_PIPELINE)); + + LOG.info("Getting pipeline status for FQN: {}, limit: {}", fqn, limit); + + IngestionPipelineRepository repository = + (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); + + IngestionPipeline pipeline = + (IngestionPipeline) Entity.getEntityByName(Entity.INGESTION_PIPELINE, fqn, "", null); + + var latestStatus = repository.getLatestPipelineStatus(pipeline); + var recentRuns = repository.listPipelineStatus(fqn, null, null, limit); + + Map response = new HashMap<>(); + response.put("fqn", fqn); + response.put("pipelineName", pipeline.getName()); + response.put("pipelineType", pipeline.getPipelineType()); + response.put("enabled", pipeline.getEnabled()); + response.put("deployed", pipeline.getDeployed()); + response.put("latestStatus", latestStatus != null ? JsonUtils.getMap(latestStatus) : null); + response.put("recentRuns", JsonUtils.getMap(recentRuns)); + return response; + } + + @Override + public Map execute( + Authorizer authorizer, + Limits limits, + CatalogSecurityContext securityContext, + Map params) + throws IOException { + throw new UnsupportedOperationException( + "GetPipelineStatusTool does not require limit validation."); + } +} diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/ListIngestionPipelinesTool.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/ListIngestionPipelinesTool.java new file mode 100644 index 000000000000..83c45e63fee0 --- /dev/null +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/ListIngestionPipelinesTool.java @@ -0,0 +1,105 @@ +package org.openmetadata.mcp.tools; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.schema.type.Include; +import org.openmetadata.schema.type.MetadataOperation; +import org.openmetadata.schema.utils.JsonUtils; +import org.openmetadata.service.Entity; +import org.openmetadata.service.jdbi3.IngestionPipelineRepository; +import org.openmetadata.service.jdbi3.ListFilter; +import org.openmetadata.service.limits.Limits; +import org.openmetadata.service.security.Authorizer; +import org.openmetadata.service.security.auth.CatalogSecurityContext; +import org.openmetadata.service.security.policyevaluator.OperationContext; +import org.openmetadata.service.security.policyevaluator.ResourceContext; + +@Slf4j +public class ListIngestionPipelinesTool implements McpTool { + + private static final List EXCLUDE_FIELDS = + List.of( + "version", + "updatedAt", + "updatedBy", + "changeDescription", + "sourceHash", + "openMetadataServerConnection", + "airflowConfig"); + + @Override + public Map execute( + Authorizer authorizer, CatalogSecurityContext securityContext, Map params) + throws IOException { + String service = params.containsKey("service") ? (String) params.get("service") : null; + String pipelineType = + params.containsKey("pipelineType") ? (String) params.get("pipelineType") : null; + String after = params.containsKey("after") ? (String) params.get("after") : null; + int limit = 10; + if (params.containsKey("limit")) { + Object limitObj = params.get("limit"); + if (limitObj instanceof Number number) { + limit = number.intValue(); + } else if (limitObj instanceof String s) { + try { + limit = Integer.parseInt(s); + } catch (NumberFormatException ignored) { + limit = 10; + } + } + } + + authorizer.authorize( + securityContext, + new OperationContext(Entity.INGESTION_PIPELINE, MetadataOperation.VIEW_BASIC), + new ResourceContext<>(Entity.INGESTION_PIPELINE)); + + LOG.info( + "Listing ingestion pipelines — service: {}, pipelineType: {}, limit: {}", + service, + pipelineType, + limit); + + IngestionPipelineRepository repository = + (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); + + ListFilter filter = new ListFilter(Include.NON_DELETED); + if (service != null && !service.isBlank()) { + filter.addQueryParam("service", service); + } + if (pipelineType != null && !pipelineType.isBlank()) { + filter.addQueryParam("pipelineType", pipelineType); + } + + var resultList = + repository.listAfter(null, repository.getFields("sourceConfig,pipelineType"), filter, limit, after); + + Map response = JsonUtils.getMap(resultList); + // Strip verbose nested fields from each pipeline entry + if (response.get("data") instanceof List pipelines) { + pipelines.forEach( + p -> { + if (p instanceof Map pipeline) { + @SuppressWarnings("unchecked") + Map m = (Map) pipeline; + EXCLUDE_FIELDS.forEach(m::remove); + } + }); + } + return response; + } + + @Override + public Map execute( + Authorizer authorizer, + Limits limits, + CatalogSecurityContext securityContext, + Map params) + throws IOException { + throw new UnsupportedOperationException( + "ListIngestionPipelinesTool does not require limit validation."); + } +} diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/TriggerIngestionPipelineTool.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/TriggerIngestionPipelineTool.java new file mode 100644 index 000000000000..fb079b70249c --- /dev/null +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/TriggerIngestionPipelineTool.java @@ -0,0 +1,79 @@ +package org.openmetadata.mcp.tools; + +import java.io.IOException; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.mcp.McpApplicationContext; +import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline; +import org.openmetadata.schema.type.EntityReference; +import org.openmetadata.schema.type.MetadataOperation; +import org.openmetadata.schema.utils.JsonUtils; +import org.openmetadata.sdk.PipelineServiceClientInterface; +import org.openmetadata.service.Entity; +import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory; +import org.openmetadata.service.limits.Limits; +import org.openmetadata.service.security.Authorizer; +import org.openmetadata.service.security.auth.CatalogSecurityContext; +import org.openmetadata.service.security.policyevaluator.OperationContext; +import org.openmetadata.service.security.policyevaluator.ResourceContext; +import org.openmetadata.service.util.OpenMetadataConnectionBuilder; + +@Slf4j +public class TriggerIngestionPipelineTool implements McpTool { + + @Override + public Map execute( + Authorizer authorizer, CatalogSecurityContext securityContext, Map params) + throws IOException { + String fqn = (String) params.get("fqn"); + if (fqn == null || fqn.isBlank()) { + throw new IllegalArgumentException("Parameter 'fqn' is required"); + } + + authorizer.authorize( + securityContext, + new OperationContext(Entity.INGESTION_PIPELINE, MetadataOperation.EDIT_ALL), + new ResourceContext<>(Entity.INGESTION_PIPELINE)); + + PipelineServiceClientInterface pipelineServiceClient = + PipelineServiceClientFactory.createPipelineServiceClient(null); + if (pipelineServiceClient == null) { + return Map.of( + "error", + "Pipeline service client is not configured. Ensure the ingestion infrastructure is set up.", + "fqn", + fqn); + } + + LOG.info("Triggering ingestion pipeline: {}", fqn); + + IngestionPipeline pipeline = + (IngestionPipeline) + Entity.getEntityByName(Entity.INGESTION_PIPELINE, fqn, "*", null); + + // Set the OpenMetadata server connection so the pipeline can call back home + if (McpApplicationContext.getConfig() != null) { + pipeline.setOpenMetadataServerConnection( + new OpenMetadataConnectionBuilder(McpApplicationContext.getConfig()).build()); + } + + EntityReference serviceRef = pipeline.getService(); + Object service = Entity.getEntity(serviceRef, "ingestionRunner", null); + + var response = pipelineServiceClient.runPipeline(pipeline, service); + + LOG.info("Trigger response for pipeline {}: {}", fqn, response); + return JsonUtils.getMap(response); + } + + @Override + public Map execute( + Authorizer authorizer, + Limits limits, + CatalogSecurityContext securityContext, + Map params) + throws IOException { + throw new UnsupportedOperationException( + "TriggerIngestionPipelineTool does not require limit validation."); + } +} diff --git a/openmetadata-mcp/src/main/resources/json/data/mcp/tools.json b/openmetadata-mcp/src/main/resources/json/data/mcp/tools.json index d88b67d37ac8..ed10b6799ffb 100644 --- a/openmetadata-mcp/src/main/resources/json/data/mcp/tools.json +++ b/openmetadata-mcp/src/main/resources/json/data/mcp/tools.json @@ -584,6 +584,65 @@ "entityType" ] } + }, + { + "name": "list_ingestion_pipelines", + "description": "List ingestion pipelines configured in OpenMetadata. Optionally filter by service name or pipeline type (metadata, usage, lineage, profiler, dataInsight, etc.). Returns pipeline names, FQNs, types, and enabled/deployed status. Use 'get_pipeline_status' to check the latest run status of a specific pipeline.", + "parameters": { + "type": "object", + "properties": { + "service": { + "type": "string", + "description": "Filter pipelines by service name (e.g., 'bigquery_prod', 'snowflake_dw'). Returns only pipelines belonging to this service." + }, + "pipelineType": { + "type": "string", + "description": "Filter by pipeline type. Values: metadata, usage, lineage, profiler, dataInsight, elasticSearchReindex, dbt, application. Leave empty to return all types." + }, + "limit": { + "type": "integer", + "description": "Maximum number of pipelines to return. Default: 10. Max recommended: 50.", + "default": 10 + }, + "after": { + "type": "string", + "description": "Pagination cursor. Pass the 'after' value from a previous response to retrieve the next page." + } + } + } + }, + { + "name": "get_pipeline_status", + "description": "Get execution status of a specific ingestion pipeline. Returns the latest run status (success, failed, running, queued) and recent run history. Use 'list_ingestion_pipelines' to find a pipeline's FQN first.", + "parameters": { + "type": "object", + "properties": { + "fqn": { + "type": "string", + "description": "Fully qualified name of the ingestion pipeline (e.g., 'bigquery_prod.metadata_ingestion'). Use 'list_ingestion_pipelines' or 'search_metadata' to find it." + }, + "limit": { + "type": "integer", + "description": "Number of recent pipeline runs to return. Default: 5.", + "default": 5 + } + }, + "required": ["fqn"] + } + }, + { + "name": "trigger_ingestion_pipeline", + "description": "Trigger an immediate run of an ingestion pipeline. Use this to kick off metadata, lineage, or profiling ingestion on demand without waiting for the scheduled run. Requires the pipeline to be deployed. Check the pipeline status with 'get_pipeline_status' after triggering to monitor progress.", + "parameters": { + "type": "object", + "properties": { + "fqn": { + "type": "string", + "description": "Fully qualified name of the ingestion pipeline to trigger (e.g., 'bigquery_prod.metadata_ingestion'). Use 'list_ingestion_pipelines' to find the FQN." + } + }, + "required": ["fqn"] + } } ] } From f2e4bf42302bb0eb6f0251a0aa5e5d37cdc5fe9a Mon Sep 17 00:00:00 2001 From: Keerthivasan Venkitajalam Date: Sun, 26 Apr 2026 22:18:14 +0530 Subject: [PATCH 2/4] =?UTF-8?q?fix(mcp):=20address=20review=20feedback=20?= =?UTF-8?q?=E2=80=94=20extract=20parseLimit,=20add=20deployed=20guard,=20a?= =?UTF-8?q?pply=20google-java-format?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Extract duplicate limit-parsing logic to CommonUtils.parseLimit() - Add deployed check in TriggerIngestionPipelineTool before attempting runPipeline - Reformat all new files with google-java-format (GOOGLE style) to pass spotless check --- .../java/org/openmetadata/mcp/McpServer.java | 10 +++---- .../openmetadata/mcp/tools/CommonUtils.java | 18 +++++++++++++ .../mcp/tools/DefaultToolContext.java | 4 +-- .../mcp/tools/GetPipelineStatusTool.java | 15 +---------- .../mcp/tools/ListIngestionPipelinesTool.java | 26 +++++-------------- .../tools/TriggerIngestionPipelineTool.java | 22 +++++++++++----- 6 files changed, 46 insertions(+), 49 deletions(-) diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/McpServer.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/McpServer.java index ead942f3e383..d02fd20edbb8 100644 --- a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/McpServer.java +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/McpServer.java @@ -287,9 +287,7 @@ private List getAllowedOriginsFromConfig() { return List.of(); } - /** - * Get base URL from system settings, with fallback to localhost for development. - */ + /** Get base URL from system settings, with fallback to localhost for development. */ private String getBaseUrlFromSettings() { try { org.openmetadata.service.jdbi3.SystemRepository systemRepository = @@ -314,9 +312,9 @@ private String getBaseUrlFromSettings() { LOG.warn("Could not get instance URL from SystemSettings, using fallback", e); } LOG.error( - "No base URL configured in MCP settings or system settings. " - + "Falling back to http://localhost:8585 — this is only suitable for local development. " - + "Configure a proper base URL for production deployments."); + "No base URL configured in MCP settings or system settings. Falling back to" + + " http://localhost:8585 — this is only suitable for local development. Configure a" + + " proper base URL for production deployments."); return "http://localhost:8585"; } } diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/CommonUtils.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/CommonUtils.java index a2ee6aed95d0..c37db8ff5620 100644 --- a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/CommonUtils.java +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/CommonUtils.java @@ -53,4 +53,22 @@ public static List getTeamsOrUsers(Object teamsOrUsersParam) { } return teamsOrUsers; } + + public static int parseLimit(Map params, String key, int defaultValue) { + if (!params.containsKey(key)) { + return defaultValue; + } + Object v = params.get(key); + if (v instanceof Number n) { + return n.intValue(); + } + if (v instanceof String s) { + try { + return Integer.parseInt(s); + } catch (NumberFormatException e) { + LOG.warn("Invalid value for '{}': '{}', using default {}", key, s, defaultValue); + } + } + return defaultValue; + } } diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/DefaultToolContext.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/DefaultToolContext.java index de96036ef334..44f80cd8d484 100644 --- a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/DefaultToolContext.java +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/DefaultToolContext.java @@ -17,8 +17,8 @@ public class DefaultToolContext { public DefaultToolContext() {} /** - * Loads tool definitions from a JSON file located at the specified path. - * The JSON file should contain an array of tool definitions under the "tools" key. + * Loads tool definitions from a JSON file located at the specified path. The JSON file should + * contain an array of tool definitions under the "tools" key. * * @return List of McpSchema.Tool objects loaded from the JSON file. */ diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/GetPipelineStatusTool.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/GetPipelineStatusTool.java index 7db49476c907..59b395f6b67e 100644 --- a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/GetPipelineStatusTool.java +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/GetPipelineStatusTool.java @@ -26,20 +26,7 @@ public Map execute( if (fqn == null || fqn.isBlank()) { throw new IllegalArgumentException("Parameter 'fqn' is required"); } - - int limit = 5; - if (params.containsKey("limit")) { - Object limitObj = params.get("limit"); - if (limitObj instanceof Number number) { - limit = number.intValue(); - } else if (limitObj instanceof String s) { - try { - limit = Integer.parseInt(s); - } catch (NumberFormatException ignored) { - limit = 5; - } - } - } + int limit = CommonUtils.parseLimit(params, "limit", 5); authorizer.authorize( securityContext, diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/ListIngestionPipelinesTool.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/ListIngestionPipelinesTool.java index 83c45e63fee0..7b4352551e7e 100644 --- a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/ListIngestionPipelinesTool.java +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/ListIngestionPipelinesTool.java @@ -1,7 +1,6 @@ package org.openmetadata.mcp.tools; import java.io.IOException; -import java.util.HashMap; import java.util.List; import java.util.Map; import lombok.extern.slf4j.Slf4j; @@ -34,23 +33,10 @@ public class ListIngestionPipelinesTool implements McpTool { public Map execute( Authorizer authorizer, CatalogSecurityContext securityContext, Map params) throws IOException { - String service = params.containsKey("service") ? (String) params.get("service") : null; - String pipelineType = - params.containsKey("pipelineType") ? (String) params.get("pipelineType") : null; - String after = params.containsKey("after") ? (String) params.get("after") : null; - int limit = 10; - if (params.containsKey("limit")) { - Object limitObj = params.get("limit"); - if (limitObj instanceof Number number) { - limit = number.intValue(); - } else if (limitObj instanceof String s) { - try { - limit = Integer.parseInt(s); - } catch (NumberFormatException ignored) { - limit = 10; - } - } - } + String service = (String) params.get("service"); + String pipelineType = (String) params.get("pipelineType"); + String after = (String) params.get("after"); + int limit = CommonUtils.parseLimit(params, "limit", 10); authorizer.authorize( securityContext, @@ -75,10 +61,10 @@ public Map execute( } var resultList = - repository.listAfter(null, repository.getFields("sourceConfig,pipelineType"), filter, limit, after); + repository.listAfter( + null, repository.getFields("sourceConfig,pipelineType"), filter, limit, after); Map response = JsonUtils.getMap(resultList); - // Strip verbose nested fields from each pipeline entry if (response.get("data") instanceof List pipelines) { pipelines.forEach( p -> { diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/TriggerIngestionPipelineTool.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/TriggerIngestionPipelineTool.java index fb079b70249c..3efba487a51e 100644 --- a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/TriggerIngestionPipelineTool.java +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/TriggerIngestionPipelineTool.java @@ -40,18 +40,27 @@ public Map execute( if (pipelineServiceClient == null) { return Map.of( "error", - "Pipeline service client is not configured. Ensure the ingestion infrastructure is set up.", + "Pipeline service client is not configured. Ensure the ingestion infrastructure is" + + " set up.", "fqn", fqn); } - LOG.info("Triggering ingestion pipeline: {}", fqn); - IngestionPipeline pipeline = - (IngestionPipeline) - Entity.getEntityByName(Entity.INGESTION_PIPELINE, fqn, "*", null); + (IngestionPipeline) Entity.getEntityByName(Entity.INGESTION_PIPELINE, fqn, "*", null); + + if (!Boolean.TRUE.equals(pipeline.getDeployed())) { + return Map.of( + "error", + "Pipeline '" + fqn + "' is not deployed. Deploy it first before triggering a run.", + "fqn", + fqn, + "deployed", + false); + } + + LOG.info("Triggering ingestion pipeline: {}", fqn); - // Set the OpenMetadata server connection so the pipeline can call back home if (McpApplicationContext.getConfig() != null) { pipeline.setOpenMetadataServerConnection( new OpenMetadataConnectionBuilder(McpApplicationContext.getConfig()).build()); @@ -61,7 +70,6 @@ public Map execute( Object service = Entity.getEntity(serviceRef, "ingestionRunner", null); var response = pipelineServiceClient.runPipeline(pipeline, service); - LOG.info("Trigger response for pipeline {}: {}", fqn, response); return JsonUtils.getMap(response); } From 6b33d439f9f3123179bb84c0eeeebc3598973803 Mon Sep 17 00:00:00 2001 From: Keerthivasan Venkitajalam Date: Mon, 27 Apr 2026 13:43:45 +0530 Subject: [PATCH 3/4] refactor(mcp): extract private helpers and add named tool constants - Extract private helper methods in all three tool classes so execute() stays well under 15 lines (list: 8, status: 8, trigger: 13) - Add TOOL_LIST_INGESTION_PIPELINES, TOOL_GET_PIPELINE_STATUS, TOOL_TRIGGER_INGESTION_PIPELINE constants in DefaultToolContext; replace raw string literals in switch cases with those constants - google-java-format applied (GOOGLE style) --- .../mcp/tools/DefaultToolContext.java | 10 ++- .../mcp/tools/GetPipelineStatusTool.java | 25 ++++--- .../mcp/tools/ListIngestionPipelinesTool.java | 50 +++++++------ .../tools/TriggerIngestionPipelineTool.java | 73 +++++++++++-------- 4 files changed, 95 insertions(+), 63 deletions(-) diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/DefaultToolContext.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/DefaultToolContext.java index 44f80cd8d484..0ee02aea6b72 100644 --- a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/DefaultToolContext.java +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/DefaultToolContext.java @@ -14,6 +14,10 @@ @Slf4j public class DefaultToolContext { + static final String TOOL_LIST_INGESTION_PIPELINES = "list_ingestion_pipelines"; + static final String TOOL_GET_PIPELINE_STATUS = "get_pipeline_status"; + static final String TOOL_TRIGGER_INGESTION_PIPELINE = "trigger_ingestion_pipeline"; + public DefaultToolContext() {} /** @@ -83,13 +87,13 @@ public McpSchema.CallToolResult callTool( case "create_metric": result = new CreateMetricTool().execute(authorizer, limits, securityContext, params); break; - case "list_ingestion_pipelines": + case TOOL_LIST_INGESTION_PIPELINES: result = new ListIngestionPipelinesTool().execute(authorizer, securityContext, params); break; - case "get_pipeline_status": + case TOOL_GET_PIPELINE_STATUS: result = new GetPipelineStatusTool().execute(authorizer, securityContext, params); break; - case "trigger_ingestion_pipeline": + case TOOL_TRIGGER_INGESTION_PIPELINE: result = new TriggerIngestionPipelineTool().execute(authorizer, securityContext, params); break; default: diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/GetPipelineStatusTool.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/GetPipelineStatusTool.java index 59b395f6b67e..999934b9d763 100644 --- a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/GetPipelineStatusTool.java +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/GetPipelineStatusTool.java @@ -22,28 +22,35 @@ public class GetPipelineStatusTool implements McpTool { public Map execute( Authorizer authorizer, CatalogSecurityContext securityContext, Map params) throws IOException { + String fqn = requireFqn(params); + int limit = CommonUtils.parseLimit(params, "limit", 5); + authorize(authorizer, securityContext); + LOG.info("Getting pipeline status for FQN: {}, limit: {}", fqn, limit); + return buildStatusResponse(fqn, limit); + } + + private static String requireFqn(Map params) { String fqn = (String) params.get("fqn"); if (fqn == null || fqn.isBlank()) { throw new IllegalArgumentException("Parameter 'fqn' is required"); } - int limit = CommonUtils.parseLimit(params, "limit", 5); + return fqn; + } + private static void authorize(Authorizer authorizer, CatalogSecurityContext securityContext) { authorizer.authorize( securityContext, new OperationContext(Entity.INGESTION_PIPELINE, MetadataOperation.VIEW_BASIC), new ResourceContext<>(Entity.INGESTION_PIPELINE)); + } - LOG.info("Getting pipeline status for FQN: {}, limit: {}", fqn, limit); - - IngestionPipelineRepository repository = + private static Map buildStatusResponse(String fqn, int limit) throws IOException { + IngestionPipelineRepository repo = (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); - IngestionPipeline pipeline = (IngestionPipeline) Entity.getEntityByName(Entity.INGESTION_PIPELINE, fqn, "", null); - - var latestStatus = repository.getLatestPipelineStatus(pipeline); - var recentRuns = repository.listPipelineStatus(fqn, null, null, limit); - + var latestStatus = repo.getLatestPipelineStatus(pipeline); + var recentRuns = repo.listPipelineStatus(fqn, null, null, limit); Map response = new HashMap<>(); response.put("fqn", fqn); response.put("pipelineName", pipeline.getName()); diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/ListIngestionPipelinesTool.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/ListIngestionPipelinesTool.java index 7b4352551e7e..0f7fbe95f302 100644 --- a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/ListIngestionPipelinesTool.java +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/ListIngestionPipelinesTool.java @@ -33,38 +33,47 @@ public class ListIngestionPipelinesTool implements McpTool { public Map execute( Authorizer authorizer, CatalogSecurityContext securityContext, Map params) throws IOException { - String service = (String) params.get("service"); - String pipelineType = (String) params.get("pipelineType"); - String after = (String) params.get("after"); + authorize(authorizer, securityContext); int limit = CommonUtils.parseLimit(params, "limit", 10); + LOG.info( + "Listing ingestion pipelines — service: {}, pipelineType: {}, limit: {}", + params.get("service"), + params.get("pipelineType"), + limit); + return fetchAndClean(params, limit); + } + private static void authorize(Authorizer authorizer, CatalogSecurityContext securityContext) { authorizer.authorize( securityContext, new OperationContext(Entity.INGESTION_PIPELINE, MetadataOperation.VIEW_BASIC), new ResourceContext<>(Entity.INGESTION_PIPELINE)); + } - LOG.info( - "Listing ingestion pipelines — service: {}, pipelineType: {}, limit: {}", - service, - pipelineType, - limit); - - IngestionPipelineRepository repository = + private static Map fetchAndClean(Map params, int limit) + throws IOException { + IngestionPipelineRepository repo = (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); + ListFilter filter = buildFilter(params); + String after = (String) params.get("after"); + var resultList = + repo.listAfter(null, repo.getFields("sourceConfig,pipelineType"), filter, limit, after); + Map response = JsonUtils.getMap(resultList); + stripVerboseFields(response); + return response; + } + private static ListFilter buildFilter(Map params) { ListFilter filter = new ListFilter(Include.NON_DELETED); - if (service != null && !service.isBlank()) { - filter.addQueryParam("service", service); - } - if (pipelineType != null && !pipelineType.isBlank()) { + String service = (String) params.get("service"); + String pipelineType = (String) params.get("pipelineType"); + if (service != null && !service.isBlank()) filter.addQueryParam("service", service); + if (pipelineType != null && !pipelineType.isBlank()) filter.addQueryParam("pipelineType", pipelineType); - } - - var resultList = - repository.listAfter( - null, repository.getFields("sourceConfig,pipelineType"), filter, limit, after); + return filter; + } - Map response = JsonUtils.getMap(resultList); + private static void stripVerboseFields(Map response) { if (response.get("data") instanceof List pipelines) { pipelines.forEach( p -> { @@ -75,7 +84,6 @@ public Map execute( } }); } - return response; } @Override diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/TriggerIngestionPipelineTool.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/TriggerIngestionPipelineTool.java index 3efba487a51e..c8cdda499d04 100644 --- a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/TriggerIngestionPipelineTool.java +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/TriggerIngestionPipelineTool.java @@ -5,7 +5,6 @@ import lombok.extern.slf4j.Slf4j; import org.openmetadata.mcp.McpApplicationContext; import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline; -import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.MetadataOperation; import org.openmetadata.schema.utils.JsonUtils; import org.openmetadata.sdk.PipelineServiceClientInterface; @@ -25,53 +24,67 @@ public class TriggerIngestionPipelineTool implements McpTool { public Map execute( Authorizer authorizer, CatalogSecurityContext securityContext, Map params) throws IOException { + String fqn = requireFqn(params); + authorize(authorizer, securityContext); + PipelineServiceClientInterface client = resolveClient(fqn); + if (client == null) return clientNotConfiguredError(fqn); + IngestionPipeline pipeline = fetchPipeline(fqn); + if (!Boolean.TRUE.equals(pipeline.getDeployed())) return notDeployedError(fqn); + LOG.info("Triggering ingestion pipeline: {}", fqn); + setupServerConnection(pipeline); + Object service = Entity.getEntity(pipeline.getService(), "ingestionRunner", null); + var response = client.runPipeline(pipeline, service); + LOG.info("Trigger response for pipeline {}: {}", fqn, response); + return JsonUtils.getMap(response); + } + + private static String requireFqn(Map params) { String fqn = (String) params.get("fqn"); if (fqn == null || fqn.isBlank()) { throw new IllegalArgumentException("Parameter 'fqn' is required"); } + return fqn; + } + private static void authorize(Authorizer authorizer, CatalogSecurityContext securityContext) { authorizer.authorize( securityContext, new OperationContext(Entity.INGESTION_PIPELINE, MetadataOperation.EDIT_ALL), new ResourceContext<>(Entity.INGESTION_PIPELINE)); + } - PipelineServiceClientInterface pipelineServiceClient = - PipelineServiceClientFactory.createPipelineServiceClient(null); - if (pipelineServiceClient == null) { - return Map.of( - "error", - "Pipeline service client is not configured. Ensure the ingestion infrastructure is" - + " set up.", - "fqn", - fqn); - } + private static PipelineServiceClientInterface resolveClient(String fqn) { + return PipelineServiceClientFactory.createPipelineServiceClient(null); + } - IngestionPipeline pipeline = - (IngestionPipeline) Entity.getEntityByName(Entity.INGESTION_PIPELINE, fqn, "*", null); + private static Map clientNotConfiguredError(String fqn) { + return Map.of( + "error", + "Pipeline service client is not configured." + + " Ensure the ingestion infrastructure is set up.", + "fqn", + fqn); + } - if (!Boolean.TRUE.equals(pipeline.getDeployed())) { - return Map.of( - "error", - "Pipeline '" + fqn + "' is not deployed. Deploy it first before triggering a run.", - "fqn", - fqn, - "deployed", - false); - } + private static Map notDeployedError(String fqn) { + return Map.of( + "error", + "Pipeline '" + fqn + "' is not deployed. Deploy it first before triggering a run.", + "fqn", + fqn, + "deployed", + false); + } - LOG.info("Triggering ingestion pipeline: {}", fqn); + private static IngestionPipeline fetchPipeline(String fqn) throws IOException { + return (IngestionPipeline) Entity.getEntityByName(Entity.INGESTION_PIPELINE, fqn, "*", null); + } + private static void setupServerConnection(IngestionPipeline pipeline) { if (McpApplicationContext.getConfig() != null) { pipeline.setOpenMetadataServerConnection( new OpenMetadataConnectionBuilder(McpApplicationContext.getConfig()).build()); } - - EntityReference serviceRef = pipeline.getService(); - Object service = Entity.getEntity(serviceRef, "ingestionRunner", null); - - var response = pipelineServiceClient.runPipeline(pipeline, service); - LOG.info("Trigger response for pipeline {}: {}", fqn, response); - return JsonUtils.getMap(response); } @Override From 8136e9c92c47388dd0f4b67ec014c24ab361db9a Mon Sep 17 00:00:00 2001 From: Keerthivasan Venkitajalam Date: Mon, 27 Apr 2026 13:59:45 +0530 Subject: [PATCH 4/4] fix(mcp): remove unused parameter from resolveClient helper --- .../openmetadata/mcp/tools/TriggerIngestionPipelineTool.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/TriggerIngestionPipelineTool.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/TriggerIngestionPipelineTool.java index c8cdda499d04..897297232781 100644 --- a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/TriggerIngestionPipelineTool.java +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/TriggerIngestionPipelineTool.java @@ -26,7 +26,7 @@ public Map execute( throws IOException { String fqn = requireFqn(params); authorize(authorizer, securityContext); - PipelineServiceClientInterface client = resolveClient(fqn); + PipelineServiceClientInterface client = resolveClient(); if (client == null) return clientNotConfiguredError(fqn); IngestionPipeline pipeline = fetchPipeline(fqn); if (!Boolean.TRUE.equals(pipeline.getDeployed())) return notDeployedError(fqn); @@ -53,7 +53,7 @@ private static void authorize(Authorizer authorizer, CatalogSecurityContext secu new ResourceContext<>(Entity.INGESTION_PIPELINE)); } - private static PipelineServiceClientInterface resolveClient(String fqn) { + private static PipelineServiceClientInterface resolveClient() { return PipelineServiceClientFactory.createPipelineServiceClient(null); }