[feature] [client] Add two compnents to support imprecise priority messages#20
Open
poorbarcode wants to merge 3 commits into
Open
[feature] [client] Add two compnents to support imprecise priority messages#20poorbarcode wants to merge 3 commits into
poorbarcode wants to merge 3 commits into
Conversation
Comment on lines
+109
to
+126
| public void onAcknowledge(Consumer<T> consumer, MessageId messageId, Throwable exception) { | ||
| triggerPauseOrResume(); | ||
| } | ||
|
|
||
| @Override | ||
| public void onAcknowledgeCumulative(Consumer<T> consumer, MessageId messageId, Throwable exception) { | ||
| triggerPauseOrResume(); | ||
| } | ||
|
|
||
| @Override | ||
| public void onNegativeAcksSend(Consumer<T> consumer, Set<MessageId> messageIds) { | ||
| triggerPauseOrResume(); | ||
| } | ||
|
|
||
| @Override | ||
| public void onAckTimeoutSend(Consumer<T> consumer, Set<MessageId> messageIds) { | ||
| triggerPauseOrResume(); | ||
| } |
There was a problem hiding this comment.
For this solution, if users won't call ack or nack for many messages. The priority delivery will not be applied, right?
Member
Author
There was a problem hiding this comment.
For this solution, if users won't call ack or nack for many messages. The priority delivery will not be applied, right?
Yes. We can explain this scenario in other words: the messages received in the first round will not be prioritized.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
Requirement where some messages are classified in to high priority and some messages are classified as low priority. They experience very high traffic on low priority messages compare to high priority messages. But we want their consumers to process the high priority messages first than lower priority messages as high priority messages are time sensitive.
Modifications
Note:
Key_Sharedis not supported.We define multiple partitions of a topic as high-priority and low-priority partitions, which can be defined by
PriorityDefinition. And Each message is marked with a priority by propertyMSG_PROP_PRIORITYwhen it is sent.The new component
PriorityMessageRouterwill send the message to the specified partition according to the rules defined inPriorityDefinition. It will calculate the priority by the propertyMSG_PROP_PRIORITYof each message, and if there has no property in the message, it will use the default priority.The new component
PriorityConsumerInterceptorwill collect the consumers created by the client for each partition when receiving messages, calculate the priority of the message through the propertyMSG_PROP_PRIORITY, then marks the priority of each consumer through its first message, and pause the low priority consumer to make the high priority message to be processed faster.Here's how it works: If the high-priority partition has many messages, it suspends receiving messages from the low-priority partition. If some messages with low priority have already been received in the memory, they will be consumed as
high-level priority.
Note: If the priority of a partition is changed, the change takes effect only client restarts after the old messages are consumed.
Sample: see
PriorityMessageSample.Documentation
Need to update docs?
doc-requiredno-need-docdoc