-
Notifications
You must be signed in to change notification settings - Fork 2.1k
feat(mcp): add tools to run, monitor, and alert on ingestion pipelines #27743
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,179 @@ | ||
| /* | ||
| * Copyright 2025 Collate | ||
| * Licensed under the Collate Community License, Version 1.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.openmetadata.mcp.tools; | ||
|
|
||
| import static org.openmetadata.schema.type.MetadataOperation.CREATE; | ||
|
|
||
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.UUID; | ||
| import lombok.extern.slf4j.Slf4j; | ||
| import org.openmetadata.schema.api.events.CreateEventSubscription; | ||
| import org.openmetadata.schema.api.events.CreateEventSubscription.AlertType; | ||
| import org.openmetadata.schema.entity.events.EventSubscription; | ||
| import org.openmetadata.schema.entity.events.SubscriptionDestination; | ||
| import org.openmetadata.schema.entity.events.SubscriptionDestination.SubscriptionCategory; | ||
| import org.openmetadata.schema.entity.events.SubscriptionDestination.SubscriptionType; | ||
| import org.openmetadata.schema.entity.events.TriggerConfig; | ||
| import org.openmetadata.schema.entity.events.TriggerConfig.TriggerType; | ||
| import org.openmetadata.schema.utils.JsonUtils; | ||
| import org.openmetadata.service.Entity; | ||
| import org.openmetadata.service.jdbi3.EventSubscriptionRepository; | ||
| import org.openmetadata.service.limits.Limits; | ||
| import org.openmetadata.service.resources.events.subscription.EventSubscriptionMapper; | ||
| import org.openmetadata.service.security.Authorizer; | ||
| import org.openmetadata.service.security.ImpersonationContext; | ||
| import org.openmetadata.service.security.auth.CatalogSecurityContext; | ||
| import org.openmetadata.service.security.policyevaluator.CreateResourceContext; | ||
| import org.openmetadata.service.security.policyevaluator.OperationContext; | ||
| import org.openmetadata.service.util.RestUtil; | ||
|
|
||
| /** | ||
| * MCP tool that creates an OpenMetadata EventSubscription (alert). v1 supports a single, opinionated | ||
| * shape: webhook destination + ingestion-pipeline failure trigger. Multi-destination + multi-event | ||
| * variants are deferred to follow-up PRs (see issue #26609). | ||
| */ | ||
| @Slf4j | ||
| public class CreateAlertTool implements McpTool { | ||
|
|
||
| private static final String SUPPORTED_RESOURCE_TYPE = "ingestionPipeline"; | ||
| private static final String SUPPORTED_EVENT_TYPE = "pipelineFailed"; | ||
|
|
||
| private static final EventSubscriptionMapper MAPPER = new EventSubscriptionMapper(); | ||
|
|
||
| @Override | ||
| public Map<String, Object> execute( | ||
| Authorizer authorizer, CatalogSecurityContext securityContext, Map<String, Object> params) { | ||
| throw new UnsupportedOperationException("CreateAlertTool requires limit validation."); | ||
| } | ||
|
|
||
| @Override | ||
| public Map<String, Object> execute( | ||
| Authorizer authorizer, | ||
| Limits limits, | ||
| CatalogSecurityContext securityContext, | ||
| Map<String, Object> params) { | ||
| String alertName = requireString(params, "alertName"); | ||
| if (alertName == null) { | ||
| return errorMap("alertName is required"); | ||
| } | ||
|
|
||
| String resourceType = requireString(params, "resourceType"); | ||
| if (!SUPPORTED_RESOURCE_TYPE.equals(resourceType)) { | ||
| return errorMap("v1 supports resourceType=" + SUPPORTED_RESOURCE_TYPE + " only"); | ||
| } | ||
|
|
||
| String resourceFqn = requireString(params, "resourceFqn"); | ||
| if (resourceFqn == null) { | ||
| return errorMap("resourceFqn is required"); | ||
| } | ||
|
|
||
| String eventType = requireString(params, "eventType"); | ||
| if (!SUPPORTED_EVENT_TYPE.equals(eventType)) { | ||
| return errorMap("v1 supports eventType=" + SUPPORTED_EVENT_TYPE + " only"); | ||
| } | ||
|
Comment on lines
+83
to
+85
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
|
||
| String webhookUrl = requireString(params, "webhookUrl"); | ||
| if (webhookUrl == null || !isValidHttpUrl(webhookUrl)) { | ||
| return errorMap("webhookUrl must be a valid http(s) URL"); | ||
| } | ||
|
|
||
| String description = optionalString(params, "description"); | ||
|
|
||
| OperationContext operationContext = new OperationContext(Entity.EVENT_SUBSCRIPTION, CREATE); | ||
| String userName = securityContext.getUserPrincipal().getName(); | ||
|
|
||
| CreateEventSubscription create = buildRequest(alertName, description, webhookUrl); | ||
| EventSubscription entity = MAPPER.createToEntity(create, userName); | ||
|
|
||
| CreateResourceContext<EventSubscription> createResourceContext = | ||
| new CreateResourceContext<>(Entity.EVENT_SUBSCRIPTION, entity); | ||
| limits.enforceLimits(securityContext, createResourceContext, operationContext); | ||
| authorizer.authorize(securityContext, operationContext, createResourceContext); | ||
|
|
||
| EventSubscriptionRepository repo = | ||
| (EventSubscriptionRepository) Entity.getEntityRepository(Entity.EVENT_SUBSCRIPTION); | ||
| repo.prepareInternal(entity, false); | ||
|
|
||
| String impersonatedBy = ImpersonationContext.getImpersonatedBy(); | ||
| RestUtil.PutResponse<EventSubscription> response = | ||
| repo.createOrUpdate(null, entity, userName, impersonatedBy); | ||
|
|
||
| Map<String, Object> result = new HashMap<>(); | ||
| EventSubscription created = response.getEntity(); | ||
| result.put("alertId", created.getId() != null ? created.getId().toString() : null); | ||
| result.put("alertName", created.getName()); | ||
| result.put("resourceFqn", resourceFqn); | ||
| result.put("eventType", eventType); | ||
| result.put("webhookUrl", webhookUrl); | ||
| result.put("enabled", Boolean.TRUE.equals(created.getEnabled())); | ||
| result.put("createdAt", created.getUpdatedAt()); | ||
| return result; | ||
| } | ||
|
|
||
| private static CreateEventSubscription buildRequest( | ||
| String name, String description, String webhookUrl) { | ||
| CreateEventSubscription r = new CreateEventSubscription(); | ||
| r.setName(name); | ||
| if (description != null) { | ||
| r.setDescription(description); | ||
| } | ||
| r.setAlertType(AlertType.NOTIFICATION); | ||
| r.setResources(List.of(SUPPORTED_RESOURCE_TYPE)); | ||
| r.setEnabled(true); | ||
| r.setBatchSize(10); | ||
| r.setRetries(3); | ||
| r.setPollInterval(10); | ||
|
|
||
| TriggerConfig trigger = new TriggerConfig(); | ||
| trigger.setTriggerType(TriggerType.REAL_TIME); | ||
| r.setTrigger(trigger); | ||
|
|
||
| SubscriptionDestination dest = new SubscriptionDestination(); | ||
| dest.setId(UUID.randomUUID()); | ||
| dest.setCategory(SubscriptionCategory.EXTERNAL); | ||
| dest.setType(SubscriptionType.WEBHOOK); | ||
| // secretKey must be null (not "") so the mapper's Fernet encryption step | ||
| // skips it. Encrypting an empty string would silently break later webhook | ||
| // signature verification. | ||
| Map<String, Object> config = new HashMap<>(); | ||
| config.put("endpoint", webhookUrl); | ||
| config.put("secretKey", null); | ||
| config.put("headers", new HashMap<>()); | ||
| dest.setConfig(JsonUtils.convertValue(config, Object.class)); | ||
|
|
||
| r.setDestinations(List.of(dest)); | ||
| return r; | ||
| } | ||
|
|
||
| private static boolean isValidHttpUrl(String s) { | ||
| return s != null && (s.startsWith("http://") || s.startsWith("https://")); | ||
| } | ||
|
|
||
| private static String requireString(Map<String, Object> params, String key) { | ||
| Object v = params.get(key); | ||
| return (v == null || v.toString().isBlank()) ? null : v.toString().trim(); | ||
| } | ||
|
|
||
| private static String optionalString(Map<String, Object> params, String key) { | ||
| Object v = params.get(key); | ||
| return (v == null || v.toString().isBlank()) ? null : v.toString(); | ||
| } | ||
|
|
||
| private static Map<String, Object> errorMap(String msg) { | ||
| Map<String, Object> m = new HashMap<>(); | ||
| m.put("error", msg); | ||
| return m; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,131 @@ | ||
| /* | ||
| * Copyright 2025 Collate | ||
| * Licensed under the Collate Community License, Version 1.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.openmetadata.mcp.tools; | ||
|
|
||
| import static org.openmetadata.schema.type.MetadataOperation.VIEW_ALL; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.ArrayList; | ||
| import java.util.Comparator; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import lombok.extern.slf4j.Slf4j; | ||
| import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus; | ||
| import org.openmetadata.schema.utils.ResultList; | ||
| import org.openmetadata.service.Entity; | ||
| import org.openmetadata.service.jdbi3.IngestionPipelineRepository; | ||
| import org.openmetadata.service.limits.Limits; | ||
| import org.openmetadata.service.security.Authorizer; | ||
| import org.openmetadata.service.security.auth.CatalogSecurityContext; | ||
| import org.openmetadata.service.security.policyevaluator.OperationContext; | ||
| import org.openmetadata.service.security.policyevaluator.ResourceContext; | ||
|
|
||
| @Slf4j | ||
| public class GetIngestionStatusTool implements McpTool { | ||
|
|
||
| private static final String RESOURCE = Entity.INGESTION_PIPELINE; | ||
| private static final String PARAM_FQN = "ingestionPipelineFqn"; | ||
| private static final String PARAM_LIMIT = "limit"; | ||
| private static final int DEFAULT_LIMIT = 5; | ||
| private static final int MAX_LIMIT = 20; | ||
| private static final long LOOKBACK_MS = 30L * 24 * 3600 * 1000; | ||
|
|
||
| @Override | ||
| public Map<String, Object> execute( | ||
| Authorizer authorizer, CatalogSecurityContext securityContext, Map<String, Object> params) | ||
| throws IOException { | ||
| final String fqn = requireString(params, PARAM_FQN); | ||
| if (fqn == null) { | ||
| return errorMap(PARAM_FQN + " is required"); | ||
| } | ||
| final int limit = clampInt(params.get(PARAM_LIMIT), 1, MAX_LIMIT, DEFAULT_LIMIT); | ||
|
|
||
| authorizer.authorize( | ||
| securityContext, new OperationContext(RESOURCE, VIEW_ALL), new ResourceContext<>(RESOURCE)); | ||
|
|
||
| IngestionPipelineRepository repo = | ||
| (IngestionPipelineRepository) Entity.getEntityRepository(RESOURCE); | ||
|
|
||
| long endTs = System.currentTimeMillis(); | ||
| long startTs = endTs - LOOKBACK_MS; | ||
| ResultList<PipelineStatus> statuses; | ||
| try { | ||
| statuses = repo.listPipelineStatus(fqn, startTs, endTs, limit); | ||
| } catch (Exception exc) { | ||
| LOG.warn("listPipelineStatus failed for {}: {}", fqn, exc.getMessage()); | ||
| return errorMap("Pipeline not found or status unavailable: " + fqn); | ||
| } | ||
|
|
||
| List<Map<String, Object>> runs = new ArrayList<>(); | ||
| statuses.getData().stream() | ||
| .sorted(Comparator.comparingLong((PipelineStatus s) -> nvl(s.getStartDate())).reversed()) | ||
| .limit(limit) | ||
| .forEach(s -> runs.add(toRunMap(s))); | ||
|
|
||
| Map<String, Object> result = new HashMap<>(); | ||
| result.put("pipelineFqn", fqn); | ||
| result.put("count", runs.size()); | ||
| result.put("runs", runs); | ||
| return result; | ||
| } | ||
|
|
||
| @Override | ||
| public Map<String, Object> execute( | ||
| Authorizer authorizer, | ||
| Limits limits, | ||
| CatalogSecurityContext securityContext, | ||
| Map<String, Object> params) { | ||
| throw new UnsupportedOperationException( | ||
| "GetIngestionStatusTool does not require limit validation."); | ||
| } | ||
|
|
||
| private static Map<String, Object> toRunMap(PipelineStatus s) { | ||
| Map<String, Object> m = new HashMap<>(); | ||
| m.put("runId", s.getRunId()); | ||
| m.put( | ||
| "state", | ||
| s.getPipelineState() != null ? s.getPipelineState().toString().toLowerCase() : "unknown"); | ||
| m.put("startTime", s.getStartDate()); | ||
| m.put("endTime", s.getEndDate()); | ||
| m.put("timestamp", s.getTimestamp()); | ||
| return m; | ||
| } | ||
|
|
||
| private static long nvl(Long v) { | ||
| return v == null ? 0L : v; | ||
| } | ||
|
|
||
| private static String requireString(Map<String, Object> params, String key) { | ||
| Object v = params.get(key); | ||
| return (v == null || v.toString().isBlank()) ? null : v.toString().trim(); | ||
| } | ||
|
|
||
| private static int clampInt(Object raw, int min, int max, int fallback) { | ||
| if (raw == null) { | ||
| return fallback; | ||
| } | ||
| try { | ||
| int v = Integer.parseInt(raw.toString()); | ||
| return Math.min(Math.max(v, min), max); | ||
| } catch (NumberFormatException e) { | ||
| return fallback; | ||
| } | ||
| } | ||
|
|
||
| private static Map<String, Object> errorMap(String msg) { | ||
| Map<String, Object> m = new HashMap<>(); | ||
| m.put("error", msg); | ||
| return m; | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🚨 Bug:
resourceFqnis validated but never used to scope the alertThe
resourceFqnparameter is validated (line 77-79) and echoed back in the response (line 117), but it is never passed tobuildRequest()or otherwise used to configure theEventSubscriptionfilter. The resulting alert fires for allingestionPipelinefailures rather than just the specific pipeline the user requested. This silently violates the API contract described in the JSON schema ("FQN of the specific resource instance to watch") and will flood the webhook with unrelated failure notifications.The
buildRequestmethod needs to acceptresourceFqnand configure an appropriate filter rule (e.g., viaFilteringRulesor equivalent) on theCreateEventSubscriptionto restrict the alert to the named pipeline.Suggested fix:
Was this helpful? React with 👍 / 👎 | Reply
gitar fixto apply this suggestion