diff --git a/config_center/nacos/impl.go b/config_center/nacos/impl.go index db672d7967..d428d0aef6 100644 --- a/config_center/nacos/impl.go +++ b/config_center/nacos/impl.go @@ -57,7 +57,7 @@ type nacosDynamicConfiguration struct { cltLock sync.Mutex done chan struct{} client *nacosClient.NacosConfigClient - keyListeners sync.Map // sync.Map[listenKey]*sync.Map[config_center.ConfigurationListener]context.CancelFunc + keyListeners sync.Map // sync.Map[listenKey]*keyListenerSet parser parser.ConfigurationParser } diff --git a/config_center/nacos/listener.go b/config_center/nacos/listener.go index 542da3a17e..f6d15aea70 100644 --- a/config_center/nacos/listener.go +++ b/config_center/nacos/listener.go @@ -18,7 +18,6 @@ package nacos import ( - "context" "sync" ) @@ -37,29 +36,75 @@ import ( "dubbo.apache.org/dubbo-go/v3/remoting" ) -func callback(listenersMap *sync.Map, _, group, dataId, data string) { - listenersMap.Range(func(key, value any) bool { - key.(config_center.ConfigurationListener).Process(&config_center.ConfigChangeEvent{Key: dataId, Value: data, ConfigType: remoting.EventTypeUpdate}) +// keyListenerSet holds the listeners for a single config key. +// The mutex protects the listeners map so that add/remove/check-empty +// operations are atomic, preventing races between concurrent addListener +// and removeListener calls for the same key. +type keyListenerSet struct { + mu sync.Mutex + listeners map[config_center.ConfigurationListener]struct{} + group string // resolved group used to register with nacos, stored for consistent cancel +} + +func newKeyListenerSet(group string) *keyListenerSet { + return &keyListenerSet{ + listeners: make(map[config_center.ConfigurationListener]struct{}), + group: group, + } +} + +func (s *keyListenerSet) add(listener config_center.ConfigurationListener) { + s.mu.Lock() + s.listeners[listener] = struct{}{} + s.mu.Unlock() +} + +// remove removes a listener and reports whether the set is now empty. +// The caller must NOT rely on a non-empty result to skip CancelListenConfig, +// because a concurrent add could re-populate the set after this call returns. +func (s *keyListenerSet) remove(listener config_center.ConfigurationListener) bool { + s.mu.Lock() + delete(s.listeners, listener) + empty := len(s.listeners) == 0 + s.mu.Unlock() + return empty +} + +// snapshot returns a snapshot of the current listeners for safe iteration. +func (s *keyListenerSet) snapshot() []config_center.ConfigurationListener { + s.mu.Lock() + snapshot := make([]config_center.ConfigurationListener, 0, len(s.listeners)) + for l := range s.listeners { + snapshot = append(snapshot, l) + } + s.mu.Unlock() + return snapshot +} + +func callback(set *keyListenerSet, _, group, dataId, data string) { + for _, l := range set.snapshot() { + l.Process(&config_center.ConfigChangeEvent{Key: dataId, Value: data, ConfigType: remoting.EventTypeUpdate}) metrics.Publish(metricsConfigCenter.NewIncMetricEvent(dataId, group, remoting.EventTypeUpdate, metricsConfigCenter.Nacos)) - return true - }) + } } func (n *nacosDynamicConfiguration) addListener(key string, listener config_center.ConfigurationListener) { - rawListenersMap, loaded := n.keyListeners.Load(key) + group := n.resolvedGroup(n.url.GetParam(constant.NacosGroupKey, constant2.DEFAULT_GROUP)) + + rawSet, loaded := n.keyListeners.Load(key) if !loaded { - _, cancel := context.WithCancel(context.Background()) - listenersMap := &sync.Map{} - listenersMap.Store(listener, cancel) + set := newKeyListenerSet(group) + set.add(listener) // double load for invalid race - rawListenersMap, loaded = n.keyListeners.LoadOrStore(key, listenersMap) + var actual any + actual, loaded = n.keyListeners.LoadOrStore(key, set) if !loaded { err := n.client.Client().ListenConfig(vo.ConfigParam{ DataId: key, - Group: n.resolvedGroup(n.url.GetParam(constant.NacosGroupKey, constant2.DEFAULT_GROUP)), + Group: group, OnChange: func(namespace, group, dataId, data string) { - go callback(listenersMap, namespace, group, dataId, data) + go callback(set, namespace, group, dataId, data) }, }) if err != nil { @@ -69,18 +114,32 @@ func (n *nacosDynamicConfiguration) addListener(key string, listener config_cent } return } + rawSet = actual } - _, cancel := context.WithCancel(context.Background()) - listenersMap := rawListenersMap.(*sync.Map) - listenersMap.Store(listener, cancel) + rawSet.(*keyListenerSet).add(listener) } func (n *nacosDynamicConfiguration) removeListener(key string, listener config_center.ConfigurationListener) { - rawListenersMap, loaded := n.keyListeners.Load(key) + rawSet, loaded := n.keyListeners.Load(key) if !loaded { logger.Errorf("[ConfigCenter][Nacos] key is not be listened, key=%s", key) - } else { - listenersMap := rawListenersMap.(*sync.Map) - listenersMap.Delete(listener) + return + } + set := rawSet.(*keyListenerSet) + isEmpty := set.remove(listener) + + if isEmpty { + // Delete from keyListeners first to prevent new addListener from + // finding a stale set after we cancel the nacos subscription. + n.keyListeners.Delete(key) + if n.client != nil { + err := n.client.Client().CancelListenConfig(vo.ConfigParam{ + DataId: key, + Group: set.group, + }) + if err != nil { + logger.Errorf("[ConfigCenter][Nacos] cancel listen config fail, key=%s, err=%v", key, err) + } + } } } diff --git a/config_center/nacos/listener_test.go b/config_center/nacos/listener_test.go index 00ceb7f1d0..121fe5d328 100644 --- a/config_center/nacos/listener_test.go +++ b/config_center/nacos/listener_test.go @@ -18,7 +18,6 @@ package nacos import ( - "sync" "testing" ) @@ -37,10 +36,10 @@ func (r *recordingListener) Process(e *config_center.ConfigChangeEvent) { func TestCallback(t *testing.T) { l := &recordingListener{} - var m sync.Map - m.Store(l, struct{}{}) + set := newKeyListenerSet("test-group") + set.add(l) - callback(&m, "", "g", "data", "payload") + callback(set, "", "g", "data", "payload") if len(l.events) != 1 { t.Fatalf("expected 1 event, got %d", len(l.events)) @@ -54,13 +53,40 @@ func TestRemoveListener(t *testing.T) { n := &nacosDynamicConfiguration{} key := "k" l := &recordingListener{} - inner := &sync.Map{} - inner.Store(l, struct{}{}) - n.keyListeners.Store(key, inner) + set := newKeyListenerSet("test-group") + set.add(l) + n.keyListeners.Store(key, set) n.removeListener(key, l) - if _, ok := inner.Load(l); ok { + if _, ok := set.listeners[l]; ok { t.Fatalf("listener should be removed") } + // After removing the only listener, the key should be deleted from keyListeners + if _, loaded := n.keyListeners.Load(key); loaded { + t.Fatalf("key should be deleted from keyListeners after last listener is removed") + } +} + +func TestRemoveListenerMultipleListeners(t *testing.T) { + n := &nacosDynamicConfiguration{} + key := "k" + l1 := &recordingListener{} + l2 := &recordingListener{} + set := newKeyListenerSet("test-group") + set.add(l1) + set.add(l2) + n.keyListeners.Store(key, set) + + // Remove first listener — key should still exist + n.removeListener(key, l1) + if _, loaded := n.keyListeners.Load(key); !loaded { + t.Fatalf("key should still exist after removing one of multiple listeners") + } + + // Remove second listener — key should be deleted + n.removeListener(key, l2) + if _, loaded := n.keyListeners.Load(key); loaded { + t.Fatalf("key should be deleted from keyListeners after last listener is removed") + } }