-
Notifications
You must be signed in to change notification settings - Fork 49
INTERNAL: Do not cancel operations when node is removed from ZK but still alive. #749
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weโll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -297,6 +297,9 @@ public void handleIO() throws IOException { | |
| } | ||
| } | ||
|
|
||
| // Deal with the memcached nodes that removed from ZK but has operation in queue. | ||
| handleDelayedClosingNodes(); | ||
|
|
||
| // Deal with the memcached server group that's been added by CacheManager. | ||
| handleCacheNodesChange(); | ||
|
|
||
|
|
@@ -323,12 +326,18 @@ private void handleNodesToRemove(final List<MemcachedNode> nodesToRemove) { | |
| } | ||
| /* ENABLE_MIGRATION end */ | ||
|
|
||
| if (node.isActive()) { | ||
oliviarla marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // if a memcached node is removed from ZK but can still serve operations, do NOT cancel it. | ||
| // operations that remain in operation queue will be processed until connection is lost. | ||
| // once all remaining operations are processed, client will close connection. | ||
| // if connection is lost before remaining operations are processed, | ||
| // all of them will be canceled after connection is lost. | ||
| continue; | ||
| } | ||
|
|
||
| // removing node is not related to failure mode. | ||
| // so, cancel operations regardless of failure mode. | ||
| String cause = "node removed."; | ||
| cancelOperations(node.destroyReadQueue(false), cause); | ||
| cancelOperations(node.destroyWriteQueue(false), cause); | ||
| cancelOperations(node.destroyInputQueue(), cause); | ||
oliviarla marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| cancelAllOperations(node, "node removed."); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -706,6 +715,39 @@ public void complete() { | |
| getLogger().debug("Added %s to writeQ of %s", op, node); | ||
| } | ||
|
|
||
| // Handle the memcached nodes that removed from ZK but has operation in queue. | ||
| void handleDelayedClosingNodes() { | ||
| Collection<MemcachedNode> closingNodes = locator.getDelayedClosingNodes(); | ||
| if (closingNodes.isEmpty()) { | ||
| return; | ||
| } | ||
|
|
||
| Collection<MemcachedNode> closedNodes = new HashSet<>(); | ||
| for (MemcachedNode node : closingNodes) { | ||
| boolean isConnected = node.isConnected(); | ||
| boolean hasOp = node.hasOp(); | ||
|
|
||
| if (isConnected && !hasOp) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hasOp์ฌ๋ closeํด์ผ ํ์ง ์๋์?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ์ฐ๊ฒฐ์ด ์ด์ ์๊ณ , Op ํ์ ์ฐ์ฐ์ด ๋จ์ ์์ผ๋ฉด ๊ณ์ ์ฒ๋ฆฌํ๋ ค๊ณ ํฉ๋๋ค. |
||
| try { | ||
| node.closeChannel(); | ||
| } catch (IOException e) { | ||
| getLogger().error("Failed to closeChannel the node : " + node); | ||
oliviarla marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| } else if (!isConnected && hasOp) { | ||
| cancelAllOperations(node, "connection lost after node removed."); | ||
| } else { | ||
| addedQueue.offer(node); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ์ฌ๊ธฐ๋ ์ด๋ค ๊ฒฝ์ฐ์ด๊ณ ์ addedQueue์ ๋ค์ ๋ฃ๋์ง ๊ถ๊ธํฉ๋๋ค.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ์ฐ๊ฒฐ์ด ์์ง ์ด์ ์๊ณ , Op ํ์ ์ฐ์ฐ์ด ๋จ์ ์์ต๋๋ค. |
||
| continue; | ||
| } | ||
|
|
||
| closedNodes.add(node); | ||
| } | ||
|
|
||
| if (!closedNodes.isEmpty()) { | ||
| locator.removeDelayedClosingNodes(closedNodes); | ||
| } | ||
| } | ||
|
|
||
| // Handle the memcached server group that's been added by CacheManager. | ||
| void handleCacheNodesChange() throws IOException { | ||
| /* ENABLE_MIGRATION if */ | ||
|
|
@@ -1279,6 +1321,12 @@ private void cancelOperations(Collection<Operation> ops, String cause) { | |
| } | ||
| } | ||
|
|
||
| private void cancelAllOperations(MemcachedNode node, String cause) { | ||
| cancelOperations(node.destroyReadQueue(false), cause); | ||
| cancelOperations(node.destroyWriteQueue(false), cause); | ||
| cancelOperations(node.destroyInputQueue(), cause); | ||
| } | ||
|
|
||
| private void redistributeOperations(Collection<Operation> ops, String cause) { | ||
| for (Operation op : ops) { | ||
| if (op instanceof KeyedOperation) { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.