From ecfa8dec438c32ba53430e692c6c3795f8ff53c8 Mon Sep 17 00:00:00 2001 From: Aditya Puri Date: Sun, 26 Apr 2026 13:12:06 +0000 Subject: [PATCH 1/2] feat(mcp): add run_ingestion, get_ingestion_status, create_alert MCP tools Adds three new MCP tools to the OM MCP server, closing part of #26609: run_ingestion triggers an ingestion pipeline by FQN get_ingestion_status returns the most recent N pipeline runs create_alert registers an EventSubscription that posts to a webhook on ingestionPipeline failure Each is implemented as a McpTool, registered in DefaultToolContext.callTool and declared in tools.json so Claude Desktop can discover them. No new REST endpoints; no auth/authz changes; no LLM calls inside tools. run_ingestion currently reads the @Setter-only pipelineServiceClient field on IngestionPipelineRepository via reflection -- a small isolated workaround pending a follow-up PR that exposes runIngestion() publicly. create_alert is opinionated for v1: resourceType=ingestionPipeline only, eventType=pipelineFailed only, destination=webhook only. Extending to multi-event/multi-destination is a follow-up. Tests: 10 JUnit tests across the three tools. --- .../mcp/tools/CreateAlertTool.java | 176 ++++++++++++++++++ .../mcp/tools/DefaultToolContext.java | 9 + .../mcp/tools/GetIngestionStatusTool.java | 131 +++++++++++++ .../mcp/tools/RunIngestionTool.java | 140 ++++++++++++++ .../main/resources/json/data/mcp/tools.json | 82 ++++++++ .../mcp/tools/CreateAlertToolTest.java | 89 +++++++++ .../mcp/tools/GetIngestionStatusToolTest.java | 59 ++++++ .../mcp/tools/RunIngestionToolTest.java | 47 +++++ 8 files changed, 733 insertions(+) create mode 100644 openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/CreateAlertTool.java create mode 100644 openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/GetIngestionStatusTool.java create mode 100644 openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/RunIngestionTool.java create mode 100644 openmetadata-mcp/src/test/java/org/openmetadata/mcp/tools/CreateAlertToolTest.java create mode 100644 openmetadata-mcp/src/test/java/org/openmetadata/mcp/tools/GetIngestionStatusToolTest.java create mode 100644 openmetadata-mcp/src/test/java/org/openmetadata/mcp/tools/RunIngestionToolTest.java diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/CreateAlertTool.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/CreateAlertTool.java new file mode 100644 index 000000000000..347b1c214297 --- /dev/null +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/CreateAlertTool.java @@ -0,0 +1,176 @@ +/* + * Copyright 2025 Collate + * Licensed under the Collate Community License, Version 1.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.openmetadata.mcp.tools; + +import static org.openmetadata.schema.type.MetadataOperation.CREATE; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.schema.api.events.CreateEventSubscription; +import org.openmetadata.schema.api.events.CreateEventSubscription.AlertType; +import org.openmetadata.schema.entity.events.EventSubscription; +import org.openmetadata.schema.entity.events.SubscriptionDestination; +import org.openmetadata.schema.entity.events.SubscriptionDestination.SubscriptionCategory; +import org.openmetadata.schema.entity.events.SubscriptionDestination.SubscriptionType; +import org.openmetadata.schema.entity.events.TriggerConfig; +import org.openmetadata.schema.entity.events.TriggerConfig.TriggerType; +import org.openmetadata.schema.utils.JsonUtils; +import org.openmetadata.service.Entity; +import org.openmetadata.service.jdbi3.EventSubscriptionRepository; +import org.openmetadata.service.limits.Limits; +import org.openmetadata.service.resources.events.subscription.EventSubscriptionMapper; +import org.openmetadata.service.security.Authorizer; +import org.openmetadata.service.security.ImpersonationContext; +import org.openmetadata.service.security.auth.CatalogSecurityContext; +import org.openmetadata.service.security.policyevaluator.CreateResourceContext; +import org.openmetadata.service.security.policyevaluator.OperationContext; +import org.openmetadata.service.util.RestUtil; + +/** + * MCP tool that creates an OpenMetadata EventSubscription (alert). v1 supports a single, opinionated + * shape: webhook destination + ingestion-pipeline failure trigger. Multi-destination + multi-event + * variants are deferred to follow-up PRs (see issue #26609). + */ +@Slf4j +public class CreateAlertTool implements McpTool { + + private static final String SUPPORTED_RESOURCE_TYPE = "ingestionPipeline"; + private static final String SUPPORTED_EVENT_TYPE = "pipelineFailed"; + + private static final EventSubscriptionMapper MAPPER = new EventSubscriptionMapper(); + + @Override + public Map execute( + Authorizer authorizer, CatalogSecurityContext securityContext, Map params) { + throw new UnsupportedOperationException("CreateAlertTool requires limit validation."); + } + + @Override + public Map execute( + Authorizer authorizer, + Limits limits, + CatalogSecurityContext securityContext, + Map params) { + String alertName = requireString(params, "alertName"); + if (alertName == null) { + return errorMap("alertName is required"); + } + + String resourceType = requireString(params, "resourceType"); + if (!SUPPORTED_RESOURCE_TYPE.equals(resourceType)) { + return errorMap("v1 supports resourceType=" + SUPPORTED_RESOURCE_TYPE + " only"); + } + + String resourceFqn = requireString(params, "resourceFqn"); + if (resourceFqn == null) { + return errorMap("resourceFqn is required"); + } + + String eventType = requireString(params, "eventType"); + if (!SUPPORTED_EVENT_TYPE.equals(eventType)) { + return errorMap("v1 supports eventType=" + SUPPORTED_EVENT_TYPE + " only"); + } + + String webhookUrl = requireString(params, "webhookUrl"); + if (webhookUrl == null || !isValidHttpUrl(webhookUrl)) { + return errorMap("webhookUrl must be a valid http(s) URL"); + } + + String description = optionalString(params, "description"); + + OperationContext operationContext = new OperationContext(Entity.EVENT_SUBSCRIPTION, CREATE); + String userName = securityContext.getUserPrincipal().getName(); + + CreateEventSubscription create = buildRequest(alertName, description, webhookUrl); + EventSubscription entity = MAPPER.createToEntity(create, userName); + + CreateResourceContext createResourceContext = + new CreateResourceContext<>(Entity.EVENT_SUBSCRIPTION, entity); + limits.enforceLimits(securityContext, createResourceContext, operationContext); + authorizer.authorize(securityContext, operationContext, createResourceContext); + + EventSubscriptionRepository repo = + (EventSubscriptionRepository) Entity.getEntityRepository(Entity.EVENT_SUBSCRIPTION); + repo.prepareInternal(entity, false); + + String impersonatedBy = ImpersonationContext.getImpersonatedBy(); + RestUtil.PutResponse response = + repo.createOrUpdate(null, entity, userName, impersonatedBy); + + Map result = new HashMap<>(); + EventSubscription created = response.getEntity(); + result.put("alertId", created.getId() != null ? created.getId().toString() : null); + result.put("alertName", created.getName()); + result.put("resourceFqn", resourceFqn); + result.put("eventType", eventType); + result.put("webhookUrl", webhookUrl); + result.put("enabled", Boolean.TRUE.equals(created.getEnabled())); + result.put("createdAt", created.getUpdatedAt()); + return result; + } + + private static CreateEventSubscription buildRequest( + String name, String description, String webhookUrl) { + CreateEventSubscription r = new CreateEventSubscription(); + r.setName(name); + if (description != null) { + r.setDescription(description); + } + r.setAlertType(AlertType.NOTIFICATION); + r.setResources(List.of(SUPPORTED_RESOURCE_TYPE)); + r.setEnabled(true); + r.setBatchSize(10); + r.setRetries(3); + r.setPollInterval(10); + + TriggerConfig trigger = new TriggerConfig(); + trigger.setTriggerType(TriggerType.REAL_TIME); + r.setTrigger(trigger); + + SubscriptionDestination dest = new SubscriptionDestination(); + dest.setId(UUID.randomUUID()); + dest.setCategory(SubscriptionCategory.EXTERNAL); + dest.setType(SubscriptionType.WEBHOOK); + Map config = new HashMap<>(); + config.put("endpoint", webhookUrl); + config.put("secretKey", ""); + config.put("headers", new HashMap<>()); + dest.setConfig(JsonUtils.convertValue(config, Object.class)); + + r.setDestinations(List.of(dest)); + return r; + } + + private static boolean isValidHttpUrl(String s) { + return s != null && (s.startsWith("http://") || s.startsWith("https://")); + } + + private static String requireString(Map params, String key) { + Object v = params.get(key); + return (v == null || v.toString().isBlank()) ? null : v.toString().trim(); + } + + private static String optionalString(Map params, String key) { + Object v = params.get(key); + return (v == null || v.toString().isBlank()) ? null : v.toString(); + } + + private static Map errorMap(String msg) { + Map m = new HashMap<>(); + m.put("error", msg); + return m; + } +} diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/DefaultToolContext.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/DefaultToolContext.java index ad9d5d2448dd..2520751adcee 100644 --- a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/DefaultToolContext.java +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/DefaultToolContext.java @@ -83,6 +83,15 @@ public McpSchema.CallToolResult callTool( case "create_metric": result = new CreateMetricTool().execute(authorizer, limits, securityContext, params); break; + case "run_ingestion": + result = new RunIngestionTool().execute(authorizer, limits, securityContext, params); + break; + case "get_ingestion_status": + result = new GetIngestionStatusTool().execute(authorizer, securityContext, params); + break; + case "create_alert": + result = new CreateAlertTool().execute(authorizer, limits, securityContext, params); + break; default: return McpSchema.CallToolResult.builder() .content( diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/GetIngestionStatusTool.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/GetIngestionStatusTool.java new file mode 100644 index 000000000000..ab0feff5bec1 --- /dev/null +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/GetIngestionStatusTool.java @@ -0,0 +1,131 @@ +/* + * Copyright 2025 Collate + * Licensed under the Collate Community License, Version 1.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.openmetadata.mcp.tools; + +import static org.openmetadata.schema.type.MetadataOperation.VIEW_ALL; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus; +import org.openmetadata.schema.utils.ResultList; +import org.openmetadata.service.Entity; +import org.openmetadata.service.jdbi3.IngestionPipelineRepository; +import org.openmetadata.service.limits.Limits; +import org.openmetadata.service.security.Authorizer; +import org.openmetadata.service.security.auth.CatalogSecurityContext; +import org.openmetadata.service.security.policyevaluator.OperationContext; +import org.openmetadata.service.security.policyevaluator.ResourceContext; + +@Slf4j +public class GetIngestionStatusTool implements McpTool { + + private static final String RESOURCE = Entity.INGESTION_PIPELINE; + private static final String PARAM_FQN = "ingestionPipelineFqn"; + private static final String PARAM_LIMIT = "limit"; + private static final int DEFAULT_LIMIT = 5; + private static final int MAX_LIMIT = 20; + private static final long LOOKBACK_MS = 30L * 24 * 3600 * 1000; + + @Override + public Map execute( + Authorizer authorizer, CatalogSecurityContext securityContext, Map params) + throws IOException { + final String fqn = requireString(params, PARAM_FQN); + if (fqn == null) { + return errorMap(PARAM_FQN + " is required"); + } + final int limit = clampInt(params.get(PARAM_LIMIT), 1, MAX_LIMIT, DEFAULT_LIMIT); + + authorizer.authorize( + securityContext, new OperationContext(RESOURCE, VIEW_ALL), new ResourceContext<>(RESOURCE)); + + IngestionPipelineRepository repo = + (IngestionPipelineRepository) Entity.getEntityRepository(RESOURCE); + + long endTs = System.currentTimeMillis(); + long startTs = endTs - LOOKBACK_MS; + ResultList statuses; + try { + statuses = repo.listPipelineStatus(fqn, startTs, endTs, limit); + } catch (Exception exc) { + LOG.warn("listPipelineStatus failed for {}: {}", fqn, exc.getMessage()); + return errorMap("Pipeline not found or status unavailable: " + fqn); + } + + List> runs = new ArrayList<>(); + statuses.getData().stream() + .sorted(Comparator.comparingLong((PipelineStatus s) -> nvl(s.getStartDate())).reversed()) + .limit(limit) + .forEach(s -> runs.add(toRunMap(s))); + + Map result = new HashMap<>(); + result.put("pipelineFqn", fqn); + result.put("count", runs.size()); + result.put("runs", runs); + return result; + } + + @Override + public Map execute( + Authorizer authorizer, + Limits limits, + CatalogSecurityContext securityContext, + Map params) { + throw new UnsupportedOperationException( + "GetIngestionStatusTool does not require limit validation."); + } + + private static Map toRunMap(PipelineStatus s) { + Map m = new HashMap<>(); + m.put("runId", s.getRunId()); + m.put( + "state", + s.getPipelineState() != null ? s.getPipelineState().toString().toLowerCase() : "unknown"); + m.put("startTime", s.getStartDate()); + m.put("endTime", s.getEndDate()); + m.put("timestamp", s.getTimestamp()); + return m; + } + + private static long nvl(Long v) { + return v == null ? 0L : v; + } + + private static String requireString(Map params, String key) { + Object v = params.get(key); + return (v == null || v.toString().isBlank()) ? null : v.toString().trim(); + } + + private static int clampInt(Object raw, int min, int max, int fallback) { + if (raw == null) { + return fallback; + } + try { + int v = Integer.parseInt(raw.toString()); + return Math.min(Math.max(v, min), max); + } catch (NumberFormatException e) { + return fallback; + } + } + + private static Map errorMap(String msg) { + Map m = new HashMap<>(); + m.put("error", msg); + return m; + } +} diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/RunIngestionTool.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/RunIngestionTool.java new file mode 100644 index 000000000000..eca16269dbd6 --- /dev/null +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/RunIngestionTool.java @@ -0,0 +1,140 @@ +/* + * Copyright 2025 Collate + * Licensed under the Collate Community License, Version 1.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.openmetadata.mcp.tools; + +import static org.openmetadata.schema.type.MetadataOperation.EDIT_ALL; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.schema.ServiceEntityInterface; +import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline; +import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineServiceClientResponse; +import org.openmetadata.schema.type.Include; +import org.openmetadata.sdk.PipelineServiceClientInterface; +import org.openmetadata.service.Entity; +import org.openmetadata.service.jdbi3.IngestionPipelineRepository; +import org.openmetadata.service.limits.Limits; +import org.openmetadata.service.security.Authorizer; +import org.openmetadata.service.security.auth.CatalogSecurityContext; +import org.openmetadata.service.security.policyevaluator.OperationContext; +import org.openmetadata.service.security.policyevaluator.ResourceContext; + +/** + * Triggers an OpenMetadata ingestion pipeline via the same pathway used by the {@code + * IngestionPipelineResource#triggerIngestion} REST endpoint. + * + *

The {@code pipelineServiceClient} field on {@link IngestionPipelineRepository} is private with + * a Lombok-generated setter only. Until the repository exposes a public {@code runIngestion(...)} + * helper (follow-up PR), we read the field reflectively. This is a small, isolated workaround -- + * the rest of the orchestration mirrors the resource implementation. + */ +@Slf4j +public class RunIngestionTool implements McpTool { + + private static final String RESOURCE = Entity.INGESTION_PIPELINE; + private static final String PARAM_FQN = "ingestionPipelineFqn"; + + @Override + public Map execute( + Authorizer authorizer, CatalogSecurityContext securityContext, Map params) { + throw new UnsupportedOperationException("RunIngestionTool requires limit validation."); + } + + @Override + public Map execute( + Authorizer authorizer, + Limits limits, + CatalogSecurityContext securityContext, + Map params) + throws IOException { + final String fqn = requireString(params, PARAM_FQN); + if (fqn == null) { + return errorMap(PARAM_FQN + " is required"); + } + + authorizer.authorize( + securityContext, new OperationContext(RESOURCE, EDIT_ALL), new ResourceContext<>(RESOURCE)); + + IngestionPipeline pipeline = + (IngestionPipeline) Entity.getEntityByName(RESOURCE, fqn, "*", Include.NON_DELETED); + if (pipeline == null) { + return errorMap("Pipeline not found: " + fqn); + } + if (Boolean.FALSE.equals(pipeline.getEnabled())) { + return errorMap("Pipeline is disabled: " + fqn); + } + + IngestionPipelineRepository repo = + (IngestionPipelineRepository) Entity.getEntityRepository(RESOURCE); + + PipelineServiceClientInterface client = readPipelineServiceClient(repo); + if (client == null) { + return errorMap( + "Pipeline Service Client is not configured on this server -- cannot trigger ingestions."); + } + + ServiceEntityInterface service = + Entity.getEntity(pipeline.getService(), "ingestionRunner", Include.NON_DELETED); + + PipelineServiceClientResponse response; + try { + response = client.runPipeline(pipeline, service); + } catch (Exception exc) { + LOG.warn("runPipeline failed for {}: {}", fqn, exc.getMessage()); + return errorMap("Trigger failed: " + exc.getMessage()); + } + + Map result = new HashMap<>(); + result.put("pipelineFqn", fqn); + result.put( + "state", response.getCode() != null && response.getCode() == 200 ? "triggered" : "error"); + result.put("statusCode", response.getCode()); + result.put("reason", response.getReason() != null ? response.getReason() : "Triggered"); + result.put("platform", response.getPlatform()); + result.put("triggeredAt", System.currentTimeMillis()); + return result; + } + + /** + * Reflectively read the {@code pipelineServiceClient} field on the repository. The field is + * package-private with a Lombok {@code @Setter} only. Follow-up PR: expose a public {@code + * runIngestion(...)} method on {@link IngestionPipelineRepository} so this is unnecessary. + */ + private static PipelineServiceClientInterface readPipelineServiceClient( + IngestionPipelineRepository repo) { + try { + Field field = IngestionPipelineRepository.class.getDeclaredField("pipelineServiceClient"); + field.setAccessible(true); + return (PipelineServiceClientInterface) field.get(repo); + } catch (ReflectiveOperationException exc) { + LOG.warn( + "Could not access IngestionPipelineRepository.pipelineServiceClient: {}", + exc.getMessage()); + return null; + } + } + + private static String requireString(Map params, String key) { + Object v = params.get(key); + return (v == null || v.toString().isBlank()) ? null : v.toString().trim(); + } + + private static Map errorMap(String msg) { + Map m = new HashMap<>(); + m.put("error", msg); + return m; + } +} diff --git a/openmetadata-mcp/src/main/resources/json/data/mcp/tools.json b/openmetadata-mcp/src/main/resources/json/data/mcp/tools.json index d88b67d37ac8..dbac4f883bd8 100644 --- a/openmetadata-mcp/src/main/resources/json/data/mcp/tools.json +++ b/openmetadata-mcp/src/main/resources/json/data/mcp/tools.json @@ -584,6 +584,88 @@ "entityType" ] } + }, + { + "name": "run_ingestion", + "description": "Triggers an OpenMetadata ingestion pipeline to run immediately. Returns the run id and the pipeline's new state. Use this when a user asks to run, execute, start, kick off, or re-run a named ingestion pipeline or connector.", + "inputSchema": { + "type": "object", + "properties": { + "ingestionPipelineFqn": { + "type": "string", + "description": "Fully qualified name of the ingestion pipeline." + } + }, + "required": [ + "ingestionPipelineFqn" + ] + } + }, + { + "name": "get_ingestion_status", + "description": "Returns the most recent ingestion pipeline runs for a named pipeline. Use this when a user asks about status, latest run, did it succeed, recent failures, or history of an ingestion.", + "inputSchema": { + "type": "object", + "properties": { + "ingestionPipelineFqn": { + "type": "string", + "description": "Fully qualified name of the ingestion pipeline." + }, + "limit": { + "type": "integer", + "description": "Max number of status entries to return. Default 5, max 20.", + "minimum": 1, + "maximum": 20, + "default": 5 + } + }, + "required": [ + "ingestionPipelineFqn" + ] + } + }, + { + "name": "create_alert", + "description": "Creates an OpenMetadata alert (EventSubscription) that POSTs to a webhook when a specified ingestion pipeline fails. Use this when a user asks to set up monitoring, notify me, alert on failure, watch for changes, or create a rule.", + "inputSchema": { + "type": "object", + "properties": { + "alertName": { + "type": "string", + "description": "Unique alert name (entity name, no spaces)." + }, + "description": { + "type": "string", + "description": "Optional short markdown description." + }, + "resourceType": { + "type": "string", + "enum": ["ingestionPipeline"], + "description": "OM resource to watch. v1 supports 'ingestionPipeline' only." + }, + "resourceFqn": { + "type": "string", + "description": "FQN of the specific resource instance to watch (the ingestion pipeline FQN)." + }, + "eventType": { + "type": "string", + "enum": ["pipelineFailed"], + "description": "Which event triggers the alert. v1 supports 'pipelineFailed'." + }, + "webhookUrl": { + "type": "string", + "format": "uri", + "description": "HTTP(S) URL to POST the alert payload to." + } + }, + "required": [ + "alertName", + "resourceType", + "resourceFqn", + "eventType", + "webhookUrl" + ] + } } ] } diff --git a/openmetadata-mcp/src/test/java/org/openmetadata/mcp/tools/CreateAlertToolTest.java b/openmetadata-mcp/src/test/java/org/openmetadata/mcp/tools/CreateAlertToolTest.java new file mode 100644 index 000000000000..dd9b4612e4c3 --- /dev/null +++ b/openmetadata-mcp/src/test/java/org/openmetadata/mcp/tools/CreateAlertToolTest.java @@ -0,0 +1,89 @@ +/* + * Copyright 2025 Collate + * Licensed under the Collate Community License, Version 1.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.openmetadata.mcp.tools; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +import java.util.Map; +import org.junit.jupiter.api.Test; +import org.openmetadata.service.limits.Limits; +import org.openmetadata.service.security.Authorizer; +import org.openmetadata.service.security.auth.CatalogSecurityContext; + +class CreateAlertToolTest { + + private Map baseParams() { + return Map.of( + "alertName", "demo_alert", + "resourceType", "ingestionPipeline", + "resourceFqn", "kestra_demo", + "eventType", "pipelineFailed", + "webhookUrl", "http://localhost:9999/hook"); + } + + @Test + void execute_returnsError_whenAlertNameMissing() throws Exception { + Map p = new java.util.HashMap<>(baseParams()); + p.remove("alertName"); + Map r = + new CreateAlertTool() + .execute( + mock(Authorizer.class), mock(Limits.class), mock(CatalogSecurityContext.class), p); + assertEquals("alertName is required", r.get("error")); + } + + @Test + void execute_returnsError_whenResourceTypeNotSupported() throws Exception { + Map p = new java.util.HashMap<>(baseParams()); + p.put("resourceType", "table"); + Map r = + new CreateAlertTool() + .execute( + mock(Authorizer.class), mock(Limits.class), mock(CatalogSecurityContext.class), p); + assertTrue(r.get("error").toString().contains("ingestionPipeline")); + } + + @Test + void execute_returnsError_whenWebhookUrlInvalid() throws Exception { + Map p = new java.util.HashMap<>(baseParams()); + p.put("webhookUrl", "not-a-url"); + Map r = + new CreateAlertTool() + .execute( + mock(Authorizer.class), mock(Limits.class), mock(CatalogSecurityContext.class), p); + assertTrue(r.get("error").toString().contains("webhookUrl")); + } + + @Test + void execute_returnsError_whenEventTypeNotSupported() throws Exception { + Map p = new java.util.HashMap<>(baseParams()); + p.put("eventType", "nonsense"); + Map r = + new CreateAlertTool() + .execute( + mock(Authorizer.class), mock(Limits.class), mock(CatalogSecurityContext.class), p); + assertTrue(r.get("error").toString().contains("pipelineFailed")); + } + + @Test + void execute_firstOverload_throwsUnsupportedOperation() { + assertThrows( + UnsupportedOperationException.class, + () -> + new CreateAlertTool() + .execute(mock(Authorizer.class), mock(CatalogSecurityContext.class), Map.of())); + } +} diff --git a/openmetadata-mcp/src/test/java/org/openmetadata/mcp/tools/GetIngestionStatusToolTest.java b/openmetadata-mcp/src/test/java/org/openmetadata/mcp/tools/GetIngestionStatusToolTest.java new file mode 100644 index 000000000000..796279a6f4cc --- /dev/null +++ b/openmetadata-mcp/src/test/java/org/openmetadata/mcp/tools/GetIngestionStatusToolTest.java @@ -0,0 +1,59 @@ +/* + * Copyright 2025 Collate + * Licensed under the Collate Community License, Version 1.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.openmetadata.mcp.tools; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; + +import java.util.Map; +import org.junit.jupiter.api.Test; +import org.openmetadata.service.limits.Limits; +import org.openmetadata.service.security.Authorizer; +import org.openmetadata.service.security.auth.CatalogSecurityContext; + +class GetIngestionStatusToolTest { + + @Test + void execute_returnsError_whenFqnMissing() throws Exception { + Map result = + new GetIngestionStatusTool() + .execute(mock(Authorizer.class), mock(CatalogSecurityContext.class), Map.of()); + assertEquals("ingestionPipelineFqn is required", result.get("error")); + } + + @Test + void execute_returnsError_whenFqnBlank() throws Exception { + Map result = + new GetIngestionStatusTool() + .execute( + mock(Authorizer.class), + mock(CatalogSecurityContext.class), + Map.of("ingestionPipelineFqn", " ")); + assertNotNull(result.get("error")); + } + + @Test + void execute_secondOverload_throwsUnsupportedOperation() { + assertThrows( + UnsupportedOperationException.class, + () -> + new GetIngestionStatusTool() + .execute( + mock(Authorizer.class), + mock(Limits.class), + mock(CatalogSecurityContext.class), + Map.of())); + } +} diff --git a/openmetadata-mcp/src/test/java/org/openmetadata/mcp/tools/RunIngestionToolTest.java b/openmetadata-mcp/src/test/java/org/openmetadata/mcp/tools/RunIngestionToolTest.java new file mode 100644 index 000000000000..a574450735e8 --- /dev/null +++ b/openmetadata-mcp/src/test/java/org/openmetadata/mcp/tools/RunIngestionToolTest.java @@ -0,0 +1,47 @@ +/* + * Copyright 2025 Collate + * Licensed under the Collate Community License, Version 1.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.openmetadata.mcp.tools; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; + +import java.util.Map; +import org.junit.jupiter.api.Test; +import org.openmetadata.service.limits.Limits; +import org.openmetadata.service.security.Authorizer; +import org.openmetadata.service.security.auth.CatalogSecurityContext; + +class RunIngestionToolTest { + + @Test + void execute_returnsError_whenFqnMissing() throws Exception { + Map result = + new RunIngestionTool() + .execute( + mock(Authorizer.class), + mock(Limits.class), + mock(CatalogSecurityContext.class), + Map.of()); + assertEquals("ingestionPipelineFqn is required", result.get("error")); + } + + @Test + void execute_firstOverload_throwsUnsupportedOperation() { + assertThrows( + UnsupportedOperationException.class, + () -> + new RunIngestionTool() + .execute(mock(Authorizer.class), mock(CatalogSecurityContext.class), Map.of())); + } +} From a5b3b1fd99e982d7b446fdfd4cb73dcd8377f8f2 Mon Sep 17 00:00:00 2001 From: Aditya Puri Date: Sun, 26 Apr 2026 14:14:41 +0000 Subject: [PATCH 2/2] fix(mcp): use null secretKey on webhook destinations EventSubscriptionMapper.createToEntity passes every destination through Fernet.encryptWebhookSecretKey; an empty string was being encrypted in place of being left absent, which silently broke any future webhook signature verification at delivery time. Pass null instead so the encryption step skips the field. Tests: 10/10 still pass. --- .../java/org/openmetadata/mcp/tools/CreateAlertTool.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/CreateAlertTool.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/CreateAlertTool.java index 347b1c214297..54374bdf97a7 100644 --- a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/CreateAlertTool.java +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/CreateAlertTool.java @@ -144,9 +144,12 @@ private static CreateEventSubscription buildRequest( dest.setId(UUID.randomUUID()); dest.setCategory(SubscriptionCategory.EXTERNAL); dest.setType(SubscriptionType.WEBHOOK); + // secretKey must be null (not "") so the mapper's Fernet encryption step + // skips it. Encrypting an empty string would silently break later webhook + // signature verification. Map config = new HashMap<>(); config.put("endpoint", webhookUrl); - config.put("secretKey", ""); + config.put("secretKey", null); config.put("headers", new HashMap<>()); dest.setConfig(JsonUtils.convertValue(config, Object.class));