Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,10 @@ public final class Constants {
public static final String PROVENANCE_TYPE_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "provenanceType");
public static final String TIMESTAMP_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "timestamp");
public static final String ENTITY_DELETED_TIMESTAMP_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "entityDeletedTimestamp");
public static final String MODIFICATION_TIMESTAMP_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "modificationTimestamp");
public static final String IS_INCOMPLETE_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "isIncomplete");
public static final String MODIFICATION_TIMESTAMP_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "modificationTimestamp");
public static final String ENTITY_DELETE_EVENT_TIME_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "entityDeleteEventTime");
public static final String ENTITY_CREATE_EVENT_TIME_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "entityCreateEventTime");
public static final String IS_INCOMPLETE_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "isIncomplete");
/**
* search backing index name.
*/
Expand Down
18 changes: 18 additions & 0 deletions distro/src/conf/atlas-logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,19 @@
</rollingPolicy>
</appender>

<appender name="NOTIFICATION_PROCESSOR" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${atlas.log.dir}/notification_processor.log</file>
<append>true</append>
<encoder>
<pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
</encoder>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${atlas.log.dir}/notification_processor-%d.log</fileNamePattern>
<maxHistory>30</maxHistory>
<cleanHistoryOnStart>false</cleanHistoryOnStart>
</rollingPolicy>
</appender>

<!-- Uncomment the following for perf logs -->
<!--
<appender name="perf_appender" class="ch.qos.logback.core.rolling.RollingFileAppender">
Expand Down Expand Up @@ -171,6 +184,11 @@
<appender-ref ref="TASKS"/>
</logger>

<!-- Dedicated logger for Notification Processors -->
<logger name="NOTIFICATION_PROCESSOR" additivity="false" level="info">
<appender-ref ref="NOTIFICATION_PROCESSOR"/>
</logger>

<root level="warn">
<appender-ref ref="FILE"/>
</root>
Expand Down
6 changes: 5 additions & 1 deletion intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,12 @@ public enum AtlasConfiguration {
TASKS_USE_ENABLED("atlas.tasks.enabled", true),
SESSION_TIMEOUT_SECS("atlas.session.timeout.secs", -1),
UPDATE_COMPOSITE_INDEX_STATUS("atlas.update.composite.index.status", true),
NOTIFICATION_CONCURRENT_PROCESSING("atlas.notifications.concurrent", false),
METRICS_TIME_TO_LIVE_HOURS("atlas.metrics.ttl.hours", 336), // 14 days default
NOTIFICATION_CONCURRENT_PROCESSING("atlas.notifications.concurrent", false),
ATLAS_PARALLEL_PROCESSING_ENABLED("atlas.notification.parallel.processing.enabled", false),
ATLAS_METADATA_TOPIC_PREFIX("atlas.notification.metadata.topic.prefix", "ATLAS_METADATA_"),
ATLAS_LINEAGE_TOPIC_PREFIX("atlas.notification.lineage.topic.prefix", "ATLAS_LINEAGE_"),
ATLAS_PARALLEL_PROCESSING_INPUT_TOPICS("atlas.notification.parallel.processing.input.topics", "ATLAS_HOOK"),
SOLR_INDEX_TX_LOG_TTL_CONF("write.ahead.log.ttl.in.hours", 240), //10 days default

ATLAS_AUDIT_AGING_ENABLED("atlas.audit.aging.enabled", false),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@
public class AtlasMetrics implements Serializable {
private static final long serialVersionUID = 1L;

public static final String PREFIX_CONNECTION_STATUS = "ConnectionStatus:";
public static final String PREFIX_NOTIFICATION = "Notification:";
public static final String PREFIX_SERVER = "Server:";
public static final String PREFIX_CONNECTION_STATUS = "ConnectionStatus:";
public static final String PREFIX_NOTIFICATION = "Notification:";
public static final String PREFIX_SERVER = "Server:";
public static final String PREFIX_NOTIFICATION_PROCESSOR = "NotificationProcessor:";

public static final String STAT_NOTIFY_COUNT_CURR_DAY = PREFIX_NOTIFICATION + "currentDay";
public static final String STAT_NOTIFY_AVG_TIME_CURR_DAY = PREFIX_NOTIFICATION + "currentDayAvgTime";
Expand Down Expand Up @@ -82,6 +83,17 @@ public class AtlasMetrics implements Serializable {
public static final String STAT_SERVER_STATUS_INDEX_STORE = PREFIX_SERVER + "statusIndexStore";
public static final String STAT_SERVER_UP_TIME = PREFIX_SERVER + "upTime";

public static final String STAT_TOTAL_MESSAGES_CONSUMED_TOTAL = PREFIX_NOTIFICATION_PROCESSOR + "messagesConsumedTotal";
public static final String STAT_TOTAL_MESSAGES_PROCESSED_TOTAL = PREFIX_NOTIFICATION_PROCESSOR + "messagesProcessedTotal";
public static final String STAT_TOTAL_MESSAGES_FAILED_TOTAL = PREFIX_NOTIFICATION_PROCESSOR + "messagesFailedTotal";
public static final String STAT_AVG_PROCESSING_TIME_TOTAL = PREFIX_NOTIFICATION_PROCESSOR + "averageTimeTotal";
public static final String STAT_GLOBAL_ENTITY_TYPE_COUNTS_TOTAL = PREFIX_NOTIFICATION_PROCESSOR + "entityTypeCounts";
public static final String STAT_GLOBAL_FAILED_ENTITY_TYPE_COUNTS_TOTAL = PREFIX_NOTIFICATION_PROCESSOR + "failedEntityTypeCounts";
public static final String STAT_MESSAGES_PRODUCED_BY_TOPIC = PREFIX_NOTIFICATION_PROCESSOR + "messagesPublishedByTopic";
public static final String STAT_MESSAGES_FAILED_BY_TOPIC = PREFIX_NOTIFICATION_PROCESSOR + "messagesFailedByTopic";
public static final String STAT_INPUT_TOPIC_STATS = PREFIX_NOTIFICATION_PROCESSOR + "inputTopicStats";
public static final String STAT_OUTPUT_TOPIC_STATS = PREFIX_NOTIFICATION_PROCESSOR + "outputTopicStats";

private Map<String, Map<String, Object>> data;

public AtlasMetrics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ public class KafkaNotification extends AbstractNotification implements Service {

protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id";

private static final String[] ATLAS_HOOK_CONSUMER_TOPICS = AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_HOOK_TOPIC);
private static final String[] ATLAS_ENTITIES_CONSUMER_TOPICS = AtlasConfiguration.NOTIFICATION_ENTITIES_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_ENTITIES_TOPIC);
private static final String[] ATLAS_HOOK_CONSUMER_TOPICS = AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_HOOK_TOPIC);
private static final String[] ATLAS_ENTITIES_CONSUMER_TOPICS = AtlasConfiguration.NOTIFICATION_ENTITIES_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_ENTITIES_TOPIC);
private static final String[] ATLAS_PARALLEL_PROCESSING_INPUT_TOPICS = AtlasConfiguration.ATLAS_PARALLEL_PROCESSING_INPUT_TOPICS.getStringArray(ATLAS_HOOK_TOPIC);
private static final String DEFAULT_CONSUMER_CLOSED_ERROR_MESSAGE = "This consumer has already been closed.";

private static final Map<NotificationType, String> PRODUCER_TOPIC_MAP = new HashMap<>();
Expand Down Expand Up @@ -543,5 +544,6 @@ public String getMessage() {
CONSUMER_TOPICS_MAP.put(NotificationType.HOOK, trimAndPurge(ATLAS_HOOK_CONSUMER_TOPICS));
CONSUMER_TOPICS_MAP.put(NotificationType.HOOK_UNSORTED, trimAndPurge(ATLAS_HOOK_UNSORTED_CONSUMER_TOPICS));
CONSUMER_TOPICS_MAP.put(NotificationType.ENTITIES, trimAndPurge(ATLAS_ENTITIES_CONSUMER_TOPICS));
CONSUMER_TOPICS_MAP.put(NotificationType.HOOK_PREPROCESS, trimAndPurge(ATLAS_PARALLEL_PROCESSING_INPUT_TOPICS));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,11 @@ public static String getMessageJson(Object message) {
* @param source
* @return the message as a JSON string
*/
public static void createNotificationMessages(Object message, List<String> msgJsonList, MessageSource source) {
public static void createNotificationMessages(Object message, List<String> msgJsonList, MessageSource source, long msgCreationTime) {
AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message, getHostAddress(), getCurrentUser(), false, source);
if (msgCreationTime > 0) {
notificationMsg.setMsgCreationTime(msgCreationTime);
}
String msgJson = AtlasType.toV1Json(notificationMsg);

boolean msgLengthExceedsLimit = (msgJson.length() * MAX_BYTES_PER_CHAR) > MESSAGE_MAX_LENGTH_BYTES;
Expand Down Expand Up @@ -162,6 +165,10 @@ public static void createNotificationMessages(Object message, List<String> msgJs
}
}

public static void createNotificationMessages(Object message, List<String> msgJsonList, MessageSource source) {
createNotificationMessages(message, msgJsonList, source, 0);
}

@Override
public void init(String source, Object failedMessagesLogger) {
}
Expand Down Expand Up @@ -245,6 +252,17 @@ public <T> void send(String topic, List<T> messages, MessageSource source) throw
sendInternal(topic, strMessages);
}

@Override
public <T> void send(String topic, List<T> messages, MessageSource source, long msgCreationTime) throws NotificationException {
List<String> strMessages = new ArrayList<>(messages.size());

for (T message : messages) {
createNotificationMessages(message, strMessages, source, msgCreationTime);
}

sendInternal(topic, strMessages);
}

private static String getNextMessageId() {
String nextMsgIdPrefix = msgIdPrefix;
int nextMsgIdSuffix = msgIdSuffix.getAndIncrement();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,13 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
private final AtomicLong messageCountTotal = new AtomicLong(0);
private final AtomicLong messageCountSinceLastInterval = new AtomicLong(0);
private long splitMessagesLastPurgeTime = System.currentTimeMillis();
private long msgCreated;
private boolean spooled;
private String source;

// Thread-local storage for message metadata to support concurrent deserialization
// These values are set during deserialize() and read by getMsgCreated(), getSpooled(), etc.
private final ThreadLocal<Long> msgCreated = ThreadLocal.withInitial(() -> 0L);
private final ThreadLocal<Boolean> spooled = ThreadLocal.withInitial(() -> Boolean.FALSE);
private final ThreadLocal<String> source = ThreadLocal.withInitial(() -> null);
private final ThreadLocal<String> sourceVersion = ThreadLocal.withInitial(() -> null);

// ----- Constructors ----------------------------------------------------
/**
Expand Down Expand Up @@ -127,15 +131,19 @@ public TypeReference<AtlasNotificationMessage<T>> getNotificationMessageType() {

// ----- MessageDeserializer ---------------------------------------------
public long getMsgCreated() {
return this.msgCreated;
return this.msgCreated.get();
}

public boolean getSpooled() {
return this.spooled;
return this.spooled.get();
}

public String getSource() {
return this.source;
return this.source.get();
}

public String getSourceVersion() {
return this.sourceVersion.get();
}

@Override
Expand All @@ -144,19 +152,20 @@ public T deserialize(String messageJson) {

messageCountTotal.incrementAndGet();
messageCountSinceLastInterval.incrementAndGet();

this.msgCreated = 0;
this.spooled = false;
this.source = null;
this.msgCreated.set(0L);
this.spooled.set(false);
this.source.set(null);
this.sourceVersion.set(null);

AtlasNotificationBaseMessage msg = AtlasType.fromV1Json(messageJson, AtlasNotificationMessage.class);

if (msg == null || msg.getVersion() == null) { // older style messages not wrapped with AtlasNotificationMessage
ret = AtlasType.fromV1Json(messageJson, messageType);
} else {
this.msgCreated = ((AtlasNotificationMessage) msg).getMsgCreationTime();
this.spooled = ((AtlasNotificationMessage) msg).getSpooled();
this.source = msg.getSource() != null ? msg.getSource().getSource() : null;
} else {
this.msgCreated.set(((AtlasNotificationMessage) msg).getMsgCreationTime());
this.spooled.set(((AtlasNotificationMessage) msg).getSpooled());
this.source.set(msg.getSource() != null ? msg.getSource().getSource() : null);
this.sourceVersion.set(msg.getSource() != null ? msg.getSource().getVersion() : null);

String msgJson = messageJson;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ public interface NotificationInterface {
*/
default <T> void send(String topic, List<T> messages, MessageSource source) throws NotificationException {}

default <T> void send(String topic, List<T> messages, MessageSource source, long msgCreationTime) throws NotificationException {}

/**
* Associates the specified topic with the given notification type.
*
Expand Down Expand Up @@ -156,6 +158,9 @@ enum NotificationType {
// Notifications to entity change consumers.
ENTITIES(new EntityMessageDeserializer()),

// Notification to pre-process hook messages before sending to parallel topics
HOOK_PREPROCESS(new HookMessageDeserializer()),

// Notifications from Atlas async importer
ASYNC_IMPORT(new HookMessageDeserializer());

Expand Down
Loading
Loading