-
Notifications
You must be signed in to change notification settings - Fork 1k
fix(config_center/nacos): remove unused context.WithCancel and call CancelListenConfig on last listener removal #3441
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
bd46331
f5feb16
9cefb40
8787a89
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,7 +18,6 @@ | |
| package nacos | ||
|
|
||
| import ( | ||
| "context" | ||
| "sync" | ||
| ) | ||
|
|
||
|
|
@@ -37,29 +36,75 @@ import ( | |
| "dubbo.apache.org/dubbo-go/v3/remoting" | ||
| ) | ||
|
|
||
| func callback(listenersMap *sync.Map, _, group, dataId, data string) { | ||
| listenersMap.Range(func(key, value any) bool { | ||
| key.(config_center.ConfigurationListener).Process(&config_center.ConfigChangeEvent{Key: dataId, Value: data, ConfigType: remoting.EventTypeUpdate}) | ||
| // keyListenerSet holds the listeners for a single config key. | ||
| // The mutex protects the listeners map so that add/remove/check-empty | ||
| // operations are atomic, preventing races between concurrent addListener | ||
| // and removeListener calls for the same key. | ||
| type keyListenerSet struct { | ||
| mu sync.Mutex | ||
| listeners map[config_center.ConfigurationListener]struct{} | ||
| group string // resolved group used to register with nacos, stored for consistent cancel | ||
| } | ||
|
|
||
| func newKeyListenerSet(group string) *keyListenerSet { | ||
| return &keyListenerSet{ | ||
| listeners: make(map[config_center.ConfigurationListener]struct{}), | ||
| group: group, | ||
| } | ||
| } | ||
|
|
||
| func (s *keyListenerSet) add(listener config_center.ConfigurationListener) { | ||
| s.mu.Lock() | ||
| s.listeners[listener] = struct{}{} | ||
| s.mu.Unlock() | ||
| } | ||
|
|
||
| // remove removes a listener and reports whether the set is now empty. | ||
| // The caller must NOT rely on a non-empty result to skip CancelListenConfig, | ||
| // because a concurrent add could re-populate the set after this call returns. | ||
| func (s *keyListenerSet) remove(listener config_center.ConfigurationListener) bool { | ||
| s.mu.Lock() | ||
| delete(s.listeners, listener) | ||
| empty := len(s.listeners) == 0 | ||
| s.mu.Unlock() | ||
| return empty | ||
| } | ||
|
|
||
| // snapshot returns a snapshot of the current listeners for safe iteration. | ||
| func (s *keyListenerSet) snapshot() []config_center.ConfigurationListener { | ||
| s.mu.Lock() | ||
| snapshot := make([]config_center.ConfigurationListener, 0, len(s.listeners)) | ||
| for l := range s.listeners { | ||
| snapshot = append(snapshot, l) | ||
| } | ||
| s.mu.Unlock() | ||
| return snapshot | ||
| } | ||
|
|
||
| func callback(set *keyListenerSet, _, group, dataId, data string) { | ||
| for _, l := range set.snapshot() { | ||
| l.Process(&config_center.ConfigChangeEvent{Key: dataId, Value: data, ConfigType: remoting.EventTypeUpdate}) | ||
| metrics.Publish(metricsConfigCenter.NewIncMetricEvent(dataId, group, remoting.EventTypeUpdate, metricsConfigCenter.Nacos)) | ||
| return true | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| func (n *nacosDynamicConfiguration) addListener(key string, listener config_center.ConfigurationListener) { | ||
| rawListenersMap, loaded := n.keyListeners.Load(key) | ||
| group := n.resolvedGroup(n.url.GetParam(constant.NacosGroupKey, constant2.DEFAULT_GROUP)) | ||
|
|
||
| rawSet, loaded := n.keyListeners.Load(key) | ||
| if !loaded { | ||
| _, cancel := context.WithCancel(context.Background()) | ||
| listenersMap := &sync.Map{} | ||
| listenersMap.Store(listener, cancel) | ||
| set := newKeyListenerSet(group) | ||
| set.add(listener) | ||
|
|
||
| // double load for invalid race | ||
| rawListenersMap, loaded = n.keyListeners.LoadOrStore(key, listenersMap) | ||
| var actual any | ||
| actual, loaded = n.keyListeners.LoadOrStore(key, set) | ||
| if !loaded { | ||
| err := n.client.Client().ListenConfig(vo.ConfigParam{ | ||
| DataId: key, | ||
| Group: n.resolvedGroup(n.url.GetParam(constant.NacosGroupKey, constant2.DEFAULT_GROUP)), | ||
| Group: group, | ||
| OnChange: func(namespace, group, dataId, data string) { | ||
| go callback(listenersMap, namespace, group, dataId, data) | ||
| go callback(set, namespace, group, dataId, data) | ||
| }, | ||
| }) | ||
| if err != nil { | ||
|
|
@@ -69,18 +114,32 @@ func (n *nacosDynamicConfiguration) addListener(key string, listener config_cent | |
| } | ||
| return | ||
| } | ||
| rawSet = actual | ||
| } | ||
| _, cancel := context.WithCancel(context.Background()) | ||
| listenersMap := rawListenersMap.(*sync.Map) | ||
| listenersMap.Store(listener, cancel) | ||
| rawSet.(*keyListenerSet).add(listener) | ||
| } | ||
|
|
||
| func (n *nacosDynamicConfiguration) removeListener(key string, listener config_center.ConfigurationListener) { | ||
| rawListenersMap, loaded := n.keyListeners.Load(key) | ||
| rawSet, loaded := n.keyListeners.Load(key) | ||
| if !loaded { | ||
| logger.Errorf("[ConfigCenter][Nacos] key is not be listened, key=%s", key) | ||
| } else { | ||
| listenersMap := rawListenersMap.(*sync.Map) | ||
| listenersMap.Delete(listener) | ||
| return | ||
| } | ||
| set := rawSet.(*keyListenerSet) | ||
| isEmpty := set.remove(listener) | ||
|
|
||
| if isEmpty { | ||
| // Delete from keyListeners first to prevent new addListener from | ||
| // finding a stale set after we cancel the nacos subscription. | ||
| n.keyListeners.Delete(key) | ||
| if n.client != nil { | ||
| err := n.client.Client().CancelListenConfig(vo.ConfigParam{ | ||
| DataId: key, | ||
| Group: set.group, | ||
| }) | ||
|
Comment on lines
+136
to
+139
Comment on lines
+136
to
+139
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Valid concern. Fixed in the latest commit — the resolved |
||
| if err != nil { | ||
| logger.Errorf("[ConfigCenter][Nacos] cancel listen config fail, key=%s, err=%v", key, err) | ||
| } | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review! These imports are already present in the file — the diff only showed the first import block, but the file has three import blocks (lines 20-22, 24-29, 31-37). The code compiles and all tests pass. No change needed.