Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* Copyright 2025 Collate
* Licensed under the Collate Community License, Version 1.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.mcp.tools;

import static org.openmetadata.schema.type.MetadataOperation.CREATE;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.api.events.CreateEventSubscription;
import org.openmetadata.schema.api.events.CreateEventSubscription.AlertType;
import org.openmetadata.schema.entity.events.EventSubscription;
import org.openmetadata.schema.entity.events.SubscriptionDestination;
import org.openmetadata.schema.entity.events.SubscriptionDestination.SubscriptionCategory;
import org.openmetadata.schema.entity.events.SubscriptionDestination.SubscriptionType;
import org.openmetadata.schema.entity.events.TriggerConfig;
import org.openmetadata.schema.entity.events.TriggerConfig.TriggerType;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.Entity;
import org.openmetadata.service.jdbi3.EventSubscriptionRepository;
import org.openmetadata.service.limits.Limits;
import org.openmetadata.service.resources.events.subscription.EventSubscriptionMapper;
import org.openmetadata.service.security.Authorizer;
import org.openmetadata.service.security.ImpersonationContext;
import org.openmetadata.service.security.auth.CatalogSecurityContext;
import org.openmetadata.service.security.policyevaluator.CreateResourceContext;
import org.openmetadata.service.security.policyevaluator.OperationContext;
import org.openmetadata.service.util.RestUtil;

/**
* MCP tool that creates an OpenMetadata EventSubscription (alert). v1 supports a single, opinionated
* shape: webhook destination + ingestion-pipeline failure trigger. Multi-destination + multi-event
* variants are deferred to follow-up PRs (see issue #26609).
*/
@Slf4j
public class CreateAlertTool implements McpTool {

private static final String SUPPORTED_RESOURCE_TYPE = "ingestionPipeline";
private static final String SUPPORTED_EVENT_TYPE = "pipelineFailed";

private static final EventSubscriptionMapper MAPPER = new EventSubscriptionMapper();

@Override
public Map<String, Object> execute(
Authorizer authorizer, CatalogSecurityContext securityContext, Map<String, Object> params) {
throw new UnsupportedOperationException("CreateAlertTool requires limit validation.");
}

@Override
public Map<String, Object> execute(
Authorizer authorizer,
Limits limits,
CatalogSecurityContext securityContext,
Map<String, Object> params) {
String alertName = requireString(params, "alertName");
if (alertName == null) {
return errorMap("alertName is required");
}

String resourceType = requireString(params, "resourceType");
if (!SUPPORTED_RESOURCE_TYPE.equals(resourceType)) {
return errorMap("v1 supports resourceType=" + SUPPORTED_RESOURCE_TYPE + " only");
}

String resourceFqn = requireString(params, "resourceFqn");
if (resourceFqn == null) {
return errorMap("resourceFqn is required");
Comment on lines +77 to +79
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 Bug: resourceFqn is validated but never used to scope the alert

The resourceFqn parameter is validated (line 77-79) and echoed back in the response (line 117), but it is never passed to buildRequest() or otherwise used to configure the EventSubscription filter. The resulting alert fires for all ingestionPipeline failures rather than just the specific pipeline the user requested. This silently violates the API contract described in the JSON schema ("FQN of the specific resource instance to watch") and will flood the webhook with unrelated failure notifications.

The buildRequest method needs to accept resourceFqn and configure an appropriate filter rule (e.g., via FilteringRules or equivalent) on the CreateEventSubscription to restrict the alert to the named pipeline.

Suggested fix:

Pass `resourceFqn` into `buildRequest` and configure a
filtering rule that restricts the subscription to the
specific pipeline entity, e.g.:

  FilteringRules rules = new FilteringRules();
  rules.setResources(List.of(resourceFqn));
  r.setInput(rules);

(Exact API depends on EventSubscription schema.)

Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion

}

String eventType = requireString(params, "eventType");
if (!SUPPORTED_EVENT_TYPE.equals(eventType)) {
return errorMap("v1 supports eventType=" + SUPPORTED_EVENT_TYPE + " only");
}
Comment on lines +83 to +85
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Bug: CreateAlertTool does not set eventType filter on subscription

Similar to the resourceFqn issue, the eventType parameter ("pipelineFailed") is validated (line 83-85) and echoed in the response (line 118), but buildRequest() never configures a filter for the specific event type. The alert will trigger on all events for the ingestionPipeline resource (creation, update, deletion, etc.), not just pipelineFailed as the user requested. This makes the tool's described behavior ("alert when pipeline fails") incorrect.

Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion


String webhookUrl = requireString(params, "webhookUrl");
if (webhookUrl == null || !isValidHttpUrl(webhookUrl)) {
return errorMap("webhookUrl must be a valid http(s) URL");
}

String description = optionalString(params, "description");

OperationContext operationContext = new OperationContext(Entity.EVENT_SUBSCRIPTION, CREATE);
String userName = securityContext.getUserPrincipal().getName();

CreateEventSubscription create = buildRequest(alertName, description, webhookUrl);
EventSubscription entity = MAPPER.createToEntity(create, userName);

CreateResourceContext<EventSubscription> createResourceContext =
new CreateResourceContext<>(Entity.EVENT_SUBSCRIPTION, entity);
limits.enforceLimits(securityContext, createResourceContext, operationContext);
authorizer.authorize(securityContext, operationContext, createResourceContext);

EventSubscriptionRepository repo =
(EventSubscriptionRepository) Entity.getEntityRepository(Entity.EVENT_SUBSCRIPTION);
repo.prepareInternal(entity, false);

String impersonatedBy = ImpersonationContext.getImpersonatedBy();
RestUtil.PutResponse<EventSubscription> response =
repo.createOrUpdate(null, entity, userName, impersonatedBy);

Map<String, Object> result = new HashMap<>();
EventSubscription created = response.getEntity();
result.put("alertId", created.getId() != null ? created.getId().toString() : null);
result.put("alertName", created.getName());
result.put("resourceFqn", resourceFqn);
result.put("eventType", eventType);
result.put("webhookUrl", webhookUrl);
result.put("enabled", Boolean.TRUE.equals(created.getEnabled()));
result.put("createdAt", created.getUpdatedAt());
return result;
}

private static CreateEventSubscription buildRequest(
String name, String description, String webhookUrl) {
CreateEventSubscription r = new CreateEventSubscription();
r.setName(name);
if (description != null) {
r.setDescription(description);
}
r.setAlertType(AlertType.NOTIFICATION);
r.setResources(List.of(SUPPORTED_RESOURCE_TYPE));
r.setEnabled(true);
r.setBatchSize(10);
r.setRetries(3);
r.setPollInterval(10);

TriggerConfig trigger = new TriggerConfig();
trigger.setTriggerType(TriggerType.REAL_TIME);
r.setTrigger(trigger);

SubscriptionDestination dest = new SubscriptionDestination();
dest.setId(UUID.randomUUID());
dest.setCategory(SubscriptionCategory.EXTERNAL);
dest.setType(SubscriptionType.WEBHOOK);
// secretKey must be null (not "") so the mapper's Fernet encryption step
// skips it. Encrypting an empty string would silently break later webhook
// signature verification.
Map<String, Object> config = new HashMap<>();
config.put("endpoint", webhookUrl);
config.put("secretKey", null);
config.put("headers", new HashMap<>());
dest.setConfig(JsonUtils.convertValue(config, Object.class));

r.setDestinations(List.of(dest));
return r;
}

private static boolean isValidHttpUrl(String s) {
return s != null && (s.startsWith("http://") || s.startsWith("https://"));
}

private static String requireString(Map<String, Object> params, String key) {
Object v = params.get(key);
return (v == null || v.toString().isBlank()) ? null : v.toString().trim();
}

private static String optionalString(Map<String, Object> params, String key) {
Object v = params.get(key);
return (v == null || v.toString().isBlank()) ? null : v.toString();
}

private static Map<String, Object> errorMap(String msg) {
Map<String, Object> m = new HashMap<>();
m.put("error", msg);
return m;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ public McpSchema.CallToolResult callTool(
case "create_metric":
result = new CreateMetricTool().execute(authorizer, limits, securityContext, params);
break;
case "run_ingestion":
result = new RunIngestionTool().execute(authorizer, limits, securityContext, params);
break;
case "get_ingestion_status":
result = new GetIngestionStatusTool().execute(authorizer, securityContext, params);
break;
case "create_alert":
result = new CreateAlertTool().execute(authorizer, limits, securityContext, params);
break;
default:
return McpSchema.CallToolResult.builder()
.content(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright 2025 Collate
* Licensed under the Collate Community License, Version 1.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.mcp.tools;

import static org.openmetadata.schema.type.MetadataOperation.VIEW_ALL;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
import org.openmetadata.schema.utils.ResultList;
import org.openmetadata.service.Entity;
import org.openmetadata.service.jdbi3.IngestionPipelineRepository;
import org.openmetadata.service.limits.Limits;
import org.openmetadata.service.security.Authorizer;
import org.openmetadata.service.security.auth.CatalogSecurityContext;
import org.openmetadata.service.security.policyevaluator.OperationContext;
import org.openmetadata.service.security.policyevaluator.ResourceContext;

@Slf4j
public class GetIngestionStatusTool implements McpTool {

private static final String RESOURCE = Entity.INGESTION_PIPELINE;
private static final String PARAM_FQN = "ingestionPipelineFqn";
private static final String PARAM_LIMIT = "limit";
private static final int DEFAULT_LIMIT = 5;
private static final int MAX_LIMIT = 20;
private static final long LOOKBACK_MS = 30L * 24 * 3600 * 1000;

@Override
public Map<String, Object> execute(
Authorizer authorizer, CatalogSecurityContext securityContext, Map<String, Object> params)
throws IOException {
final String fqn = requireString(params, PARAM_FQN);
if (fqn == null) {
return errorMap(PARAM_FQN + " is required");
}
final int limit = clampInt(params.get(PARAM_LIMIT), 1, MAX_LIMIT, DEFAULT_LIMIT);

authorizer.authorize(
securityContext, new OperationContext(RESOURCE, VIEW_ALL), new ResourceContext<>(RESOURCE));

IngestionPipelineRepository repo =
(IngestionPipelineRepository) Entity.getEntityRepository(RESOURCE);

long endTs = System.currentTimeMillis();
long startTs = endTs - LOOKBACK_MS;
ResultList<PipelineStatus> statuses;
try {
statuses = repo.listPipelineStatus(fqn, startTs, endTs, limit);
} catch (Exception exc) {
LOG.warn("listPipelineStatus failed for {}: {}", fqn, exc.getMessage());
return errorMap("Pipeline not found or status unavailable: " + fqn);
}

List<Map<String, Object>> runs = new ArrayList<>();
statuses.getData().stream()
.sorted(Comparator.comparingLong((PipelineStatus s) -> nvl(s.getStartDate())).reversed())
.limit(limit)
.forEach(s -> runs.add(toRunMap(s)));

Map<String, Object> result = new HashMap<>();
result.put("pipelineFqn", fqn);
result.put("count", runs.size());
result.put("runs", runs);
return result;
}

@Override
public Map<String, Object> execute(
Authorizer authorizer,
Limits limits,
CatalogSecurityContext securityContext,
Map<String, Object> params) {
throw new UnsupportedOperationException(
"GetIngestionStatusTool does not require limit validation.");
}

private static Map<String, Object> toRunMap(PipelineStatus s) {
Map<String, Object> m = new HashMap<>();
m.put("runId", s.getRunId());
m.put(
"state",
s.getPipelineState() != null ? s.getPipelineState().toString().toLowerCase() : "unknown");
m.put("startTime", s.getStartDate());
m.put("endTime", s.getEndDate());
m.put("timestamp", s.getTimestamp());
return m;
}

private static long nvl(Long v) {
return v == null ? 0L : v;
}

private static String requireString(Map<String, Object> params, String key) {
Object v = params.get(key);
return (v == null || v.toString().isBlank()) ? null : v.toString().trim();
}

private static int clampInt(Object raw, int min, int max, int fallback) {
if (raw == null) {
return fallback;
}
try {
int v = Integer.parseInt(raw.toString());
return Math.min(Math.max(v, min), max);
} catch (NumberFormatException e) {
return fallback;
}
}

private static Map<String, Object> errorMap(String msg) {
Map<String, Object> m = new HashMap<>();
m.put("error", msg);
return m;
}
}
Loading
Loading