From bd4633108216b20b3c8c6bfd0eba3212335fcded Mon Sep 17 00:00:00 2001 From: liuhy Date: Wed, 17 Jun 2026 15:27:02 +0800 Subject: [PATCH 1/2] fix(config_center/nacos): remove unused context.WithCancel and call CancelListenConfig on last listener removal The addListener() function created context.WithCancel but the cancel function was never called, causing a goroutine and context leak. Additionally, removeListener() never called CancelListenConfig to unsubscribe from the nacos server, leaving stale config change listeners active indefinitely. Changes: - Remove unused context.WithCancel calls in addListener() - Replace context.CancelFunc value type with struct{} in keyListeners map (the cancel function was never used) - In removeListener(), call CancelListenConfig when the last listener for a key is removed (matching Apollo and File implementations) - Clean up keyListeners entry when no listeners remain - Add nil client guard for test compatibility Co-Authored-By: Claude --- config_center/nacos/impl.go | 2 +- config_center/nacos/listener.go | 32 ++++++++++++++++++++++++-------- 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/config_center/nacos/impl.go b/config_center/nacos/impl.go index db672d7967..c6ef185652 100644 --- a/config_center/nacos/impl.go +++ b/config_center/nacos/impl.go @@ -57,7 +57,7 @@ type nacosDynamicConfiguration struct { cltLock sync.Mutex done chan struct{} client *nacosClient.NacosConfigClient - keyListeners sync.Map // sync.Map[listenKey]*sync.Map[config_center.ConfigurationListener]context.CancelFunc + keyListeners sync.Map // sync.Map[listenKey]*sync.Map[config_center.ConfigurationListener]struct{} parser parser.ConfigurationParser } diff --git a/config_center/nacos/listener.go b/config_center/nacos/listener.go index 542da3a17e..9838683815 100644 --- a/config_center/nacos/listener.go +++ b/config_center/nacos/listener.go @@ -18,7 +18,6 @@ package nacos import ( - "context" "sync" ) @@ -48,9 +47,8 @@ func callback(listenersMap *sync.Map, _, group, dataId, data string) { func (n *nacosDynamicConfiguration) addListener(key string, listener config_center.ConfigurationListener) { rawListenersMap, loaded := n.keyListeners.Load(key) if !loaded { - _, cancel := context.WithCancel(context.Background()) listenersMap := &sync.Map{} - listenersMap.Store(listener, cancel) + listenersMap.Store(listener, struct{}{}) // double load for invalid race rawListenersMap, loaded = n.keyListeners.LoadOrStore(key, listenersMap) @@ -70,17 +68,35 @@ func (n *nacosDynamicConfiguration) addListener(key string, listener config_cent return } } - _, cancel := context.WithCancel(context.Background()) listenersMap := rawListenersMap.(*sync.Map) - listenersMap.Store(listener, cancel) + listenersMap.Store(listener, struct{}{}) } func (n *nacosDynamicConfiguration) removeListener(key string, listener config_center.ConfigurationListener) { rawListenersMap, loaded := n.keyListeners.Load(key) if !loaded { logger.Errorf("[ConfigCenter][Nacos] key is not be listened, key=%s", key) - } else { - listenersMap := rawListenersMap.(*sync.Map) - listenersMap.Delete(listener) + return + } + listenersMap := rawListenersMap.(*sync.Map) + listenersMap.Delete(listener) + + // If no listeners remain for this key, cancel the nacos config subscription + isEmpty := true + listenersMap.Range(func(_, _ any) bool { + isEmpty = false + return false + }) + if isEmpty { + n.keyListeners.Delete(key) + if n.client != nil { + err := n.client.Client().CancelListenConfig(vo.ConfigParam{ + DataId: key, + Group: n.resolvedGroup(n.url.GetParam(constant.NacosGroupKey, constant2.DEFAULT_GROUP)), + }) + if err != nil { + logger.Errorf("[ConfigCenter][Nacos] cancel listen config fail, key=%s, err=%v", key, err) + } + } } } From 9cefb40bbe48178673031634205c67588ed86c65 Mon Sep 17 00:00:00 2001 From: liuhy Date: Wed, 17 Jun 2026 15:42:23 +0800 Subject: [PATCH 2/2] fix(config_center/nacos): use keyListenerSet with mutex for thread-safe listener management Address Copilot review feedback: 1. Race condition: Replace bare sync.Map with keyListenerSet struct that uses sync.Mutex to make add/remove/check-empty atomic, preventing concurrent addListener from racing with removeListener's emptiness check and CancelListenConfig call. 2. DataId/Group consistency: Store the resolved group string in keyListenerSet so that CancelListenConfig uses the exact same group that was passed to ListenConfig, avoiding mismatch from re-derivation. 3. Delete keyListeners entry before CancelListenConfig to prevent new addListener from finding a stale set after cancellation. 4. Use snapshot() for callback iteration to safely iterate while mutations may occur. Also add test for multi-listener removal behavior. --- config_center/nacos/impl.go | 2 +- config_center/nacos/listener.go | 91 ++++++++++++++++++++-------- config_center/nacos/listener_test.go | 42 ++++++++++--- 3 files changed, 102 insertions(+), 33 deletions(-) diff --git a/config_center/nacos/impl.go b/config_center/nacos/impl.go index c6ef185652..d428d0aef6 100644 --- a/config_center/nacos/impl.go +++ b/config_center/nacos/impl.go @@ -57,7 +57,7 @@ type nacosDynamicConfiguration struct { cltLock sync.Mutex done chan struct{} client *nacosClient.NacosConfigClient - keyListeners sync.Map // sync.Map[listenKey]*sync.Map[config_center.ConfigurationListener]struct{} + keyListeners sync.Map // sync.Map[listenKey]*keyListenerSet parser parser.ConfigurationParser } diff --git a/config_center/nacos/listener.go b/config_center/nacos/listener.go index 9838683815..f6d15aea70 100644 --- a/config_center/nacos/listener.go +++ b/config_center/nacos/listener.go @@ -36,28 +36,75 @@ import ( "dubbo.apache.org/dubbo-go/v3/remoting" ) -func callback(listenersMap *sync.Map, _, group, dataId, data string) { - listenersMap.Range(func(key, value any) bool { - key.(config_center.ConfigurationListener).Process(&config_center.ConfigChangeEvent{Key: dataId, Value: data, ConfigType: remoting.EventTypeUpdate}) +// keyListenerSet holds the listeners for a single config key. +// The mutex protects the listeners map so that add/remove/check-empty +// operations are atomic, preventing races between concurrent addListener +// and removeListener calls for the same key. +type keyListenerSet struct { + mu sync.Mutex + listeners map[config_center.ConfigurationListener]struct{} + group string // resolved group used to register with nacos, stored for consistent cancel +} + +func newKeyListenerSet(group string) *keyListenerSet { + return &keyListenerSet{ + listeners: make(map[config_center.ConfigurationListener]struct{}), + group: group, + } +} + +func (s *keyListenerSet) add(listener config_center.ConfigurationListener) { + s.mu.Lock() + s.listeners[listener] = struct{}{} + s.mu.Unlock() +} + +// remove removes a listener and reports whether the set is now empty. +// The caller must NOT rely on a non-empty result to skip CancelListenConfig, +// because a concurrent add could re-populate the set after this call returns. +func (s *keyListenerSet) remove(listener config_center.ConfigurationListener) bool { + s.mu.Lock() + delete(s.listeners, listener) + empty := len(s.listeners) == 0 + s.mu.Unlock() + return empty +} + +// snapshot returns a snapshot of the current listeners for safe iteration. +func (s *keyListenerSet) snapshot() []config_center.ConfigurationListener { + s.mu.Lock() + snapshot := make([]config_center.ConfigurationListener, 0, len(s.listeners)) + for l := range s.listeners { + snapshot = append(snapshot, l) + } + s.mu.Unlock() + return snapshot +} + +func callback(set *keyListenerSet, _, group, dataId, data string) { + for _, l := range set.snapshot() { + l.Process(&config_center.ConfigChangeEvent{Key: dataId, Value: data, ConfigType: remoting.EventTypeUpdate}) metrics.Publish(metricsConfigCenter.NewIncMetricEvent(dataId, group, remoting.EventTypeUpdate, metricsConfigCenter.Nacos)) - return true - }) + } } func (n *nacosDynamicConfiguration) addListener(key string, listener config_center.ConfigurationListener) { - rawListenersMap, loaded := n.keyListeners.Load(key) + group := n.resolvedGroup(n.url.GetParam(constant.NacosGroupKey, constant2.DEFAULT_GROUP)) + + rawSet, loaded := n.keyListeners.Load(key) if !loaded { - listenersMap := &sync.Map{} - listenersMap.Store(listener, struct{}{}) + set := newKeyListenerSet(group) + set.add(listener) // double load for invalid race - rawListenersMap, loaded = n.keyListeners.LoadOrStore(key, listenersMap) + var actual any + actual, loaded = n.keyListeners.LoadOrStore(key, set) if !loaded { err := n.client.Client().ListenConfig(vo.ConfigParam{ DataId: key, - Group: n.resolvedGroup(n.url.GetParam(constant.NacosGroupKey, constant2.DEFAULT_GROUP)), + Group: group, OnChange: func(namespace, group, dataId, data string) { - go callback(listenersMap, namespace, group, dataId, data) + go callback(set, namespace, group, dataId, data) }, }) if err != nil { @@ -67,32 +114,28 @@ func (n *nacosDynamicConfiguration) addListener(key string, listener config_cent } return } + rawSet = actual } - listenersMap := rawListenersMap.(*sync.Map) - listenersMap.Store(listener, struct{}{}) + rawSet.(*keyListenerSet).add(listener) } func (n *nacosDynamicConfiguration) removeListener(key string, listener config_center.ConfigurationListener) { - rawListenersMap, loaded := n.keyListeners.Load(key) + rawSet, loaded := n.keyListeners.Load(key) if !loaded { logger.Errorf("[ConfigCenter][Nacos] key is not be listened, key=%s", key) return } - listenersMap := rawListenersMap.(*sync.Map) - listenersMap.Delete(listener) - - // If no listeners remain for this key, cancel the nacos config subscription - isEmpty := true - listenersMap.Range(func(_, _ any) bool { - isEmpty = false - return false - }) + set := rawSet.(*keyListenerSet) + isEmpty := set.remove(listener) + if isEmpty { + // Delete from keyListeners first to prevent new addListener from + // finding a stale set after we cancel the nacos subscription. n.keyListeners.Delete(key) if n.client != nil { err := n.client.Client().CancelListenConfig(vo.ConfigParam{ DataId: key, - Group: n.resolvedGroup(n.url.GetParam(constant.NacosGroupKey, constant2.DEFAULT_GROUP)), + Group: set.group, }) if err != nil { logger.Errorf("[ConfigCenter][Nacos] cancel listen config fail, key=%s, err=%v", key, err) diff --git a/config_center/nacos/listener_test.go b/config_center/nacos/listener_test.go index 00ceb7f1d0..121fe5d328 100644 --- a/config_center/nacos/listener_test.go +++ b/config_center/nacos/listener_test.go @@ -18,7 +18,6 @@ package nacos import ( - "sync" "testing" ) @@ -37,10 +36,10 @@ func (r *recordingListener) Process(e *config_center.ConfigChangeEvent) { func TestCallback(t *testing.T) { l := &recordingListener{} - var m sync.Map - m.Store(l, struct{}{}) + set := newKeyListenerSet("test-group") + set.add(l) - callback(&m, "", "g", "data", "payload") + callback(set, "", "g", "data", "payload") if len(l.events) != 1 { t.Fatalf("expected 1 event, got %d", len(l.events)) @@ -54,13 +53,40 @@ func TestRemoveListener(t *testing.T) { n := &nacosDynamicConfiguration{} key := "k" l := &recordingListener{} - inner := &sync.Map{} - inner.Store(l, struct{}{}) - n.keyListeners.Store(key, inner) + set := newKeyListenerSet("test-group") + set.add(l) + n.keyListeners.Store(key, set) n.removeListener(key, l) - if _, ok := inner.Load(l); ok { + if _, ok := set.listeners[l]; ok { t.Fatalf("listener should be removed") } + // After removing the only listener, the key should be deleted from keyListeners + if _, loaded := n.keyListeners.Load(key); loaded { + t.Fatalf("key should be deleted from keyListeners after last listener is removed") + } +} + +func TestRemoveListenerMultipleListeners(t *testing.T) { + n := &nacosDynamicConfiguration{} + key := "k" + l1 := &recordingListener{} + l2 := &recordingListener{} + set := newKeyListenerSet("test-group") + set.add(l1) + set.add(l2) + n.keyListeners.Store(key, set) + + // Remove first listener — key should still exist + n.removeListener(key, l1) + if _, loaded := n.keyListeners.Load(key); !loaded { + t.Fatalf("key should still exist after removing one of multiple listeners") + } + + // Remove second listener — key should be deleted + n.removeListener(key, l2) + if _, loaded := n.keyListeners.Load(key); loaded { + t.Fatalf("key should be deleted from keyListeners after last listener is removed") + } }