consumer, NotificationEntityProcessor entityProcessor) {
+ super(consumerThreadName);
+
+ this.consumer = consumer;
+ this.entityProcessor = entityProcessor;
+
+ LOG.info("entityProcessor: {}", entityProcessor.getClass().getSimpleName());
+ }
+
@Override
public void run() {
LOG.info("==> HookConsumer run()");
diff --git a/webapp/src/main/java/org/apache/atlas/notification/QualifiedNameRouter.java b/webapp/src/main/java/org/apache/atlas/notification/QualifiedNameRouter.java
new file mode 100644
index 00000000000..508ae8a64be
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/notification/QualifiedNameRouter.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification;
+
+import org.apache.atlas.AtlasConfiguration;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Router class that determines which topic to use based on
+ * the first two parts of the qualifiedName of entities in the notification.
+ */
+public class QualifiedNameRouter {
+ private static final Logger LOG = LoggerFactory.getLogger(QualifiedNameRouter.class);
+ private static final String DEFAULT_TOPIC_PREFIX = AtlasConfiguration.ATLAS_METADATA_TOPIC_PREFIX.getString();
+
+ private final int maxTopicCount;
+ private final String topicPrefix;
+ private final ConcurrentHashMap routingCache;
+ private final AtomicInteger nextTopicIndex;
+
+ public QualifiedNameRouter(int maxTopicCount) {
+ this(maxTopicCount, DEFAULT_TOPIC_PREFIX);
+ }
+
+ public QualifiedNameRouter(int maxTopicCount, String topicPrefix) {
+ this.maxTopicCount = maxTopicCount > 0 ? maxTopicCount : 1;
+ this.topicPrefix = topicPrefix.endsWith("_") ? topicPrefix : topicPrefix + "_";
+ this.routingCache = new ConcurrentHashMap<>();
+ this.nextTopicIndex = new AtomicInteger(0);
+ }
+
+ public String getTargetTopic(String routingKey) {
+ if (StringUtils.isNotBlank(routingKey)) {
+ int topicIndex = getTopicIndexForRoutingKey(routingKey);
+ String targetTopic = topicPrefix + topicIndex;
+
+ LOG.debug("Routing message with key '{}' to topic '{}'", routingKey, targetTopic);
+ return targetTopic;
+ } else {
+ // Fallback to round-robin if no routing key found
+ int topicIndex = nextTopicIndex.getAndIncrement() % maxTopicCount;
+ String targetTopic = topicPrefix + topicIndex;
+
+ LOG.debug("No routing key found, using round-robin to topic '{}'", targetTopic);
+ return targetTopic;
+ }
+ }
+
+ /**
+ * Gets the topic index for a routing key using consistent hashing.
+ *
+ * @param routingKey The routing key (first two parts of qualifiedName)
+ * @return The topic index (0 to maxTopicCount-1)
+ */
+ private int getTopicIndexForRoutingKey(String routingKey) {
+ return routingCache.computeIfAbsent(routingKey, key -> {
+ // Use consistent hashing to ensure same routing key always goes to same topic
+ int hash = Math.abs(key.hashCode());
+ int topicIndex = hash % maxTopicCount;
+
+ LOG.debug("Mapped routing key '{}' to topic index {}", key, topicIndex);
+ return topicIndex;
+ });
+ }
+
+ /**
+ * Gets statistics about the routing cache.
+ *
+ * @return A string with routing statistics
+ */
+ public String getRoutingStats() {
+ return String.format("Routing cache size: %d, Max topics: %d",
+ routingCache.size(), maxTopicCount);
+ }
+
+ /**
+ * Clears the routing cache.
+ */
+ public void clearCache() {
+ routingCache.clear();
+ LOG.info("Routing cache cleared");
+ }
+}
diff --git a/webapp/src/main/java/org/apache/atlas/notification/SerialEntityProcessor.java b/webapp/src/main/java/org/apache/atlas/notification/SerialEntityProcessor.java
index fd39249d5cf..dc626e74eba 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/SerialEntityProcessor.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/SerialEntityProcessor.java
@@ -20,7 +20,6 @@
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasConfiguration;
-import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.kafka.AtlasKafkaMessage;
@@ -43,6 +42,7 @@
import org.apache.atlas.notification.preprocessor.GenericEntityPreprocessor;
import org.apache.atlas.notification.preprocessor.PreprocessorContext;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.impexp.AsyncImporter;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
@@ -73,8 +73,6 @@
import org.springframework.security.core.userdetails.User;
import org.springframework.security.core.userdetails.UserDetails;
-import javax.ws.rs.core.Response;
-
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
@@ -86,6 +84,7 @@
import java.util.Set;
import java.util.regex.Pattern;
+import static org.apache.atlas.AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND;
import static org.apache.atlas.model.instance.AtlasObjectId.KEY_GUID;
import static org.apache.atlas.model.instance.AtlasObjectId.KEY_TYPENAME;
import static org.apache.atlas.model.instance.AtlasObjectId.KEY_UNIQUE_ATTRIBUTES;
@@ -124,9 +123,11 @@ public class SerialEntityProcessor implements NotificationEntityProcessor {
private static final Logger LOG = LoggerFactory.getLogger(SerialEntityProcessor.class);
private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger(NotificationHookConsumer.class);
- private static final String EXCEPTION_CLASS_NAME_JANUSGRAPH_EXCEPTION = "JanusGraphException";
- private static final String EXCEPTION_CLASS_NAME_PERMANENTLOCKING_EXCEPTION = "PermanentLockingException";
- private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName();
+ private static final String EXCEPTION_CLASS_NAME_JANUSGRAPH_EXCEPTION = "JanusGraphException";
+ private static final String EXCEPTION_CLASS_NAME_PERMANENTLOCKING_EXCEPTION = "PermanentLockingException";
+ private static final String EXCEPTION_CLASS_NAME_TEMPORARY_BACKEND_EXCEPTION = "TemporaryBackendException";
+ private static final String EXCEPTION_CLASS_NAME_TEMPORARY_LOCKING_EXCEPTION = "TemporaryLockingException";
+ private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName();
private static final int SC_OK = 200;
private static final int SC_BAD_REQUEST = 400;
@@ -330,7 +331,76 @@ public TopicPartitionOffsetResult collectResults() {
@Override
public void shutdown() {
- recordFailedMessages(failedMessages);
+ }
+
+ /**
+ * Classifies exceptions as retryable or non-retryable for better error handling.
+ *
+ * Retryable exceptions (temporary issues, will retry with fresh transaction):
+ * - JanusGraphException: General graph database errors (e.g., transaction conflicts)
+ * - PermanentLockingException: Optimistic locking failures
+ * - TemporaryBackendException: Temporary storage backend issues
+ * - TemporaryLockingException: Temporary lock conflicts
+ *
+ * Non-retryable exceptions (fail fast, no retry):
+ * - AtlasSchemaViolationException: Unique constraint violations (code bug or duplicate message)
+ * - IllegalStateException: Invalid application state
+ * - NullPointerException: Programming errors
+ * - AtlasBaseException with INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND: Entity not found (acceptable)
+ *
+ * @param e the exception to classify
+ * @return true if the exception is retryable, false otherwise
+ */
+ private boolean isRetryableException(Throwable e) {
+ if (e == null) {
+ return false;
+ }
+
+ // Non-retryable: Schema violations indicate code bugs or duplicate messages
+ if (e instanceof org.apache.atlas.repository.graphdb.AtlasSchemaViolationException) {
+ LOG.warn("Non-retryable exception: SchemaViolationException indicates unique constraint violation");
+ return false;
+ }
+
+ // Non-retryable: Programming errors
+ if (e instanceof IllegalStateException || e instanceof NullPointerException) {
+ LOG.warn("Non-retryable exception: {} indicates programming error", e.getClass().getSimpleName());
+ return false;
+ }
+
+ // Non-retryable: Entity not found is acceptable in some scenarios
+ if (e instanceof AtlasBaseException) {
+ AtlasBaseException baseException = (AtlasBaseException) e;
+ if (baseException.getAtlasErrorCode().equals(INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND)) {
+ LOG.warn("Non-retryable exception: INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND");
+ return false;
+ }
+ }
+
+ // Non-retryable: InterruptedException should terminate processing
+ if (e instanceof InterruptedException) {
+ LOG.error("Non-retryable exception: InterruptedException - thread interrupted");
+ return false;
+ }
+
+ // Retryable: Transaction conflicts and locking issues
+ if (isTransactionRelatedError(e)) {
+ return true;
+ }
+
+ // Default: Retry all other exceptions (conservative approach)
+ return true;
+ }
+
+ private boolean isTransactionRelatedError(Throwable e) {
+ String exceptionClassName = e.getClass().getSimpleName();
+ if (exceptionClassName.equals(EXCEPTION_CLASS_NAME_JANUSGRAPH_EXCEPTION) ||
+ exceptionClassName.equals(EXCEPTION_CLASS_NAME_PERMANENTLOCKING_EXCEPTION) ||
+ exceptionClassName.equals(EXCEPTION_CLASS_NAME_TEMPORARY_BACKEND_EXCEPTION) ||
+ exceptionClassName.equals(EXCEPTION_CLASS_NAME_TEMPORARY_LOCKING_EXCEPTION)) {
+ return true;
+ }
+ return false;
}
public TopicPartitionOffsetResult handleMessage(Ticket ticket) {
@@ -385,6 +455,8 @@ public TopicPartitionOffsetResult handleMessage(Ticket ticket) {
return new TopicPartitionOffsetResult(kafkaMsg.getTopicPartition(), kafkaMsg.getOffset());
}
+ final String threadName = Thread.currentThread().getName();
+
// Used for intermediate conversions during create and update
String exceptionClassName = StringUtils.EMPTY;
for (int numRetries = 0; numRetries < maxRetries; numRetries++) {
@@ -402,11 +474,17 @@ public TopicPartitionOffsetResult handleMessage(Ticket ticket) {
requestContext.setInNotificationProcessing(true);
requestContext.setCreateShellEntityForNonExistingReference(createShellEntityForNonExistingReference);
+ LOG.debug("Parallel Processing-{}: Starting attempt={}/{} for message type={}, topic={}, offset={}, msgCreationTime={}",
+ threadName, numRetries + 1, maxRetries, message.getType().name(),
+ kafkaMsg.getTopic(), kafkaMsg.getOffset(), kafkaMsg.getMsgCreated());
+
switch (message.getType()) {
case ENTITY_CREATE: {
final HookNotificationV1.EntityCreateRequest createRequest = (HookNotificationV1.EntityCreateRequest) message;
final AtlasEntity.AtlasEntitiesWithExtInfo entities = instanceConverter.toAtlasEntities(createRequest.getEntities());
+ requestContext.setCreateEventMsgTime(kafkaMsg.getMsgCreated());
+
if (auditLog == null) {
auditLog = new AuditFilter.AuditLog(messageUser, THREADNAME_PREFIX,
AtlasClient.API_V1.CREATE_ENTITY.getMethod(),
@@ -441,6 +519,8 @@ public TopicPartitionOffsetResult handleMessage(Ticket ticket) {
case ENTITY_DELETE: {
final HookNotificationV1.EntityDeleteRequest deleteRequest = (HookNotificationV1.EntityDeleteRequest) message;
+ requestContext.setDeleteEventMsgTime(kafkaMsg.getMsgCreated());
+
if (auditLog == null) {
auditLog = new AuditFilter.AuditLog(messageUser, THREADNAME_PREFIX,
AtlasClientV2.API_V2.DELETE_ENTITY_BY_ATTRIBUTE.getMethod(),
@@ -478,6 +558,8 @@ public TopicPartitionOffsetResult handleMessage(Ticket ticket) {
final EntityCreateRequestV2 createRequestV2 = (EntityCreateRequestV2) message;
final AtlasEntitiesWithExtInfo entities = createRequestV2.getEntities();
+ requestContext.setCreateEventMsgTime(kafkaMsg.getMsgCreated());
+
if (auditLog == null) {
auditLog = new AuditFilter.AuditLog(messageUser, THREADNAME_PREFIX,
AtlasClientV2.API_V2.CREATE_ENTITY.getMethod(),
@@ -523,6 +605,8 @@ public TopicPartitionOffsetResult handleMessage(Ticket ticket) {
final EntityDeleteRequestV2 deleteRequest = (EntityDeleteRequestV2) message;
final List entities = deleteRequest.getEntities();
+ requestContext.setDeleteEventMsgTime(kafkaMsg.getMsgCreated());
+
try {
for (AtlasObjectId entity : entities) {
if (auditLog == null) {
@@ -596,61 +680,66 @@ public TopicPartitionOffsetResult handleMessage(Ticket ticket) {
RequestContext.get().resetEntityGuidUpdates();
exceptionClassName = e.getClass().getSimpleName();
- // don't retry in following conditions:
- // 1. number of retry attempts reached configured count
- // 2. notification processing failed due to invalid data (non-existing type, entity, ..)
- boolean maxRetriesReached = numRetries == (maxRetries - 1);
- AtlasErrorCode errorCode = (e instanceof AtlasBaseException) ? ((AtlasBaseException) e).getAtlasErrorCode() : null;
- boolean unrecoverableFailure = errorCode != null && (Response.Status.NOT_FOUND.equals(errorCode.getHttpCode()) || Response.Status.BAD_REQUEST.equals(errorCode.getHttpCode()));
+ // Check if exception is retryable
+ boolean isRetryable = isRetryableException(e);
+
+ if (!isRetryable) {
+ // Non-retryable exceptions: fail fast and move to next message
+ if (e instanceof InterruptedException) {
+ LOG.error("Interrupted!", e);
+ return null;
+ } else if (e instanceof org.apache.atlas.repository.graphdb.AtlasSchemaViolationException) {
+ LOG.warn("{} - {}: Non-retryable {}: Skipping message: {}",
+ kafkaMsg.getTopicPartition().toString(), kafkaMsg.getOffset(), exceptionClassName, e.getMessage());
+ return new TopicPartitionOffsetResult(kafkaMsg.getTopicPartition(), kafkaMsg.getOffset());
+ } else if (e instanceof AtlasBaseException) {
+ AtlasBaseException baseException = (AtlasBaseException) e;
+ LOG.warn("Error handling message: {}: {} - {} - {}",
+ ticket.getMessage().getMessage().getType(), baseException.getAtlasErrorCode(), baseException.getMessage(), ticket.getQualifiedNamesSet());
+ return new TopicPartitionOffsetResult(kafkaMsg.getTopicPartition(), kafkaMsg.getOffset());
+ } else {
+ LOG.error("Non-retryable exception: {}", exceptionClassName, e);
+ return new TopicPartitionOffsetResult(kafkaMsg.getTopicPartition(), kafkaMsg.getOffset());
+ }
+ }
- if (maxRetriesReached || unrecoverableFailure) {
- try {
- String strMessage = AbstractNotification.getMessageJson(message);
+ // Retryable exceptions: check if max retries exceeded
+ if (numRetries == (maxRetries - 1)) {
+ String strMessage = AbstractNotification.getMessageJson(message);
- if (unrecoverableFailure) {
- LOG.warn("Unrecoverable failure while processing message {}", strMessage, e);
- } else {
- LOG.warn("Max retries exceeded for message {}", strMessage, e);
- }
+ LOG.warn("Offset: {}: Max retries: {} exceeded for message {}", kafkaMsg.getOffset(), maxRetries, strMessage, e);
- stats.isFailedMsg = true;
+ stats.isFailedMsg = true;
- failedMessages.add(strMessage);
+ failedMessages.add(strMessage);
- if (failedMessages.size() >= failedMsgCacheSize) {
- recordFailedMessages(failedMessages);
- }
- } catch (Throwable t) {
- LOG.warn("error while recording failed message: type={}, topic={}, partition={}, offset={}",
- message.getType(), kafkaMsg.getTopic(), kafkaMsg.getPartition(), kafkaMsg.getOffset(), t);
+ if (failedMessages.size() >= failedMsgCacheSize) {
+ recordFailedMessages(kafkaMsg.getTopic(), failedMessages);
}
return new TopicPartitionOffsetResult(kafkaMsg.getTopicPartition(), kafkaMsg.getOffset());
- } else if (e instanceof InterruptedException) {
- LOG.error("Interrupted!", e);
- return null;
- } else if (e instanceof org.apache.atlas.repository.graphdb.AtlasSchemaViolationException) {
- LOG.warn("{}: Continuing: {}", exceptionClassName, e.getMessage());
- return new TopicPartitionOffsetResult(kafkaMsg.getTopicPartition(), kafkaMsg.getOffset());
- } else if (exceptionClassName.equals(EXCEPTION_CLASS_NAME_JANUSGRAPH_EXCEPTION)
- || exceptionClassName.equals(EXCEPTION_CLASS_NAME_PERMANENTLOCKING_EXCEPTION)) {
- LOG.warn("{}: Offset: {}: Pausing & retry: Try: {}: Pause: {} ms. {}",
- exceptionClassName, kafkaMsg.getOffset(), numRetries, adaptiveWaiter.getWaitDuration(), e.getMessage());
-
- adaptiveWaiter.pause((Exception) e);
- } else if (e instanceof java.lang.IllegalStateException || e instanceof NullPointerException) {
- return null;
- } else {
- LOG.warn("Error handling message", e);
+ }
- try {
- LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);
+ if (isTransactionRelatedError(e)) {
+ // Retryable exceptions: perform rollback, clear cache, and retry
+ LOG.warn("{}: Offset: {}: Retryable exception: Try: {}/{}: Pause: {} ms. {}",
+ exceptionClassName, kafkaMsg.getOffset(), numRetries + 1, maxRetries, adaptiveWaiter.getWaitDuration(), e.getMessage());
- Thread.sleep(consumerRetryInterval);
- } catch (InterruptedException ie) {
- LOG.error("Notification consumer thread sleep interrupted");
+ try {
+ LOG.info("Parallel Processing-{}: Rolling back failed transaction (attempt {}/{}) to enable fresh retry",
+ threadName, numRetries + 1, maxRetries);
+ AtlasGraphProvider.getGraphInstance().rollback();
+ } catch (Exception rollbackEx) {
+ LOG.warn("Parallel Processing-{}: Error during transaction rollback: {}",
+ threadName, rollbackEx.getMessage());
}
+ } else {
+ LOG.warn("Error handling message", e);
+ LOG.info("Sleeping for {} ms before retry", adaptiveWaiter.getWaitDuration());
}
+
+ // Adaptive wait before retry
+ adaptiveWaiter.pause((Exception) e);
} finally {
RequestContext.clear();
}
@@ -1160,10 +1249,10 @@ private Authentication getAuthenticationForUser(String userName) {
return ret;
}
- private void recordFailedMessages(List failedMessages) {
+ private void recordFailedMessages(String topic, List failedMessages) {
//logging failed messages
for (String message : failedMessages) {
- failedMessageLog.error("[DROPPED_NOTIFICATION] {}", message);
+ failedMessageLog.error("[{}-DROPPED_NOTIFICATION] {}", topic, message);
}
failedMessages.clear();
diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/NotificationPreProcessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/NotificationPreProcessor.java
new file mode 100644
index 00000000000..47509da8ac7
--- /dev/null
+++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/NotificationPreProcessor.java
@@ -0,0 +1,1814 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.AtlasConfiguration;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.kafka.AtlasKafkaMessage;
+import org.apache.atlas.kafka.NotificationProvider;
+import org.apache.atlas.listener.ChangedTypeDefs;
+import org.apache.atlas.listener.TypeDefChangeListener;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
+import org.apache.atlas.model.notification.HookNotification.EntityDeleteRequestV2;
+import org.apache.atlas.model.notification.HookNotification.EntityPartialUpdateRequestV2;
+import org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2;
+import org.apache.atlas.model.notification.MessageSource;
+import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
+import org.apache.atlas.notification.AbstractNotification;
+import org.apache.atlas.notification.NotificationEntityProcessor;
+import org.apache.atlas.notification.NotificationException;
+import org.apache.atlas.notification.NotificationInterface;
+import org.apache.atlas.notification.QualifiedNameRouter;
+import org.apache.atlas.notification.TopicPartitionOffsetResult;
+import org.apache.atlas.notification.pc.Ticket;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.util.AtlasMetricsUtil;
+import org.apache.atlas.util.AtlasMetricsUtil.NotificationProcessorStats;
+import org.apache.atlas.utils.AtlasPerfTracer;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityDeleteRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityPartialUpdateRequest;
+import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityUpdateRequest;
+import org.apache.commons.configuration2.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.atlas.notification.NotificationHookConsumer.CONSUMER_FAILEDCACHESIZE_PROPERTY;
+import static org.apache.atlas.notification.NotificationHookConsumer.CONSUMER_RETRIES_PROPERTY;
+import static org.apache.atlas.notification.preprocessor.EntityPreprocessor.getQualifiedName;
+
+public class NotificationPreProcessor implements NotificationEntityProcessor, TypeDefChangeListener {
+ private static final Logger LOG = LoggerFactory.getLogger("NOTIFICATION_PROCESSOR");
+ private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger(NotificationPreProcessor.class);
+
+ private final int metadataTopicCount;
+ private final int lineageTopicCount;
+ private final int maxRetries;
+ private final boolean lineageTopicEnabled;
+ private final Configuration configuration;
+ private final NotificationInterface notificationInterface;
+ private final QualifiedNameRouter metadataRouter;
+ private final QualifiedNameRouter lineageRouter;
+ private final AtlasMetricsUtil metricsUtil;
+ private final AtlasTypeRegistry typeRegistry;
+ private final List failedMessages;
+ private final int failedMsgCacheSize;
+ private final Map processTypeCache = new ConcurrentHashMap<>();
+ private Set allProcessTypes;
+ private Logger failedMessageLog;
+ private static final java.util.regex.Pattern GUID_PATTERN = java.util.regex.Pattern.compile("^[a-fA-F0-9-]{36}$");
+ public static final String ATTRIBUTE_GUID = "guid";
+ public static final String ATTRIBUTE_NAME = "name";
+ public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+ /**
+ * Maximum size of ThreadLocal rename cache per thread.
+ * Uses LRU eviction policy - when cache exceeds this size, oldest entries are automatically removed.
+ */
+ private static final int MAX_RENAME_CACHE_SIZE = 1000;
+
+ // ThreadLocal rename routing map - each thread maintains its own rename chain mappings
+ private static final ThreadLocal