Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config_center/nacos/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type nacosDynamicConfiguration struct {
cltLock sync.Mutex
done chan struct{}
client *nacosClient.NacosConfigClient
keyListeners sync.Map // sync.Map[listenKey]*sync.Map[config_center.ConfigurationListener]context.CancelFunc
keyListeners sync.Map // sync.Map[listenKey]*keyListenerSet
parser parser.ConfigurationParser
}

Expand Down
99 changes: 79 additions & 20 deletions config_center/nacos/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package nacos

import (
"context"
"sync"
)
Comment on lines 20 to 22

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review! These imports are already present in the file — the diff only showed the first import block, but the file has three import blocks (lines 20-22, 24-29, 31-37). The code compiles and all tests pass. No change needed.


Expand All @@ -37,29 +36,75 @@ import (
"dubbo.apache.org/dubbo-go/v3/remoting"
)

func callback(listenersMap *sync.Map, _, group, dataId, data string) {
listenersMap.Range(func(key, value any) bool {
key.(config_center.ConfigurationListener).Process(&config_center.ConfigChangeEvent{Key: dataId, Value: data, ConfigType: remoting.EventTypeUpdate})
// keyListenerSet holds the listeners for a single config key.
// The mutex protects the listeners map so that add/remove/check-empty
// operations are atomic, preventing races between concurrent addListener
// and removeListener calls for the same key.
type keyListenerSet struct {
mu sync.Mutex
listeners map[config_center.ConfigurationListener]struct{}
group string // resolved group used to register with nacos, stored for consistent cancel
}

func newKeyListenerSet(group string) *keyListenerSet {
return &keyListenerSet{
listeners: make(map[config_center.ConfigurationListener]struct{}),
group: group,
}
}

func (s *keyListenerSet) add(listener config_center.ConfigurationListener) {
s.mu.Lock()
s.listeners[listener] = struct{}{}
s.mu.Unlock()
}

// remove removes a listener and reports whether the set is now empty.
// The caller must NOT rely on a non-empty result to skip CancelListenConfig,
// because a concurrent add could re-populate the set after this call returns.
func (s *keyListenerSet) remove(listener config_center.ConfigurationListener) bool {
s.mu.Lock()
delete(s.listeners, listener)
empty := len(s.listeners) == 0
s.mu.Unlock()
return empty
}

// snapshot returns a snapshot of the current listeners for safe iteration.
func (s *keyListenerSet) snapshot() []config_center.ConfigurationListener {
s.mu.Lock()
snapshot := make([]config_center.ConfigurationListener, 0, len(s.listeners))
for l := range s.listeners {
snapshot = append(snapshot, l)
}
s.mu.Unlock()
return snapshot
}

func callback(set *keyListenerSet, _, group, dataId, data string) {
for _, l := range set.snapshot() {
l.Process(&config_center.ConfigChangeEvent{Key: dataId, Value: data, ConfigType: remoting.EventTypeUpdate})
metrics.Publish(metricsConfigCenter.NewIncMetricEvent(dataId, group, remoting.EventTypeUpdate, metricsConfigCenter.Nacos))
return true
})
}
}

func (n *nacosDynamicConfiguration) addListener(key string, listener config_center.ConfigurationListener) {
rawListenersMap, loaded := n.keyListeners.Load(key)
group := n.resolvedGroup(n.url.GetParam(constant.NacosGroupKey, constant2.DEFAULT_GROUP))

rawSet, loaded := n.keyListeners.Load(key)
if !loaded {
_, cancel := context.WithCancel(context.Background())
listenersMap := &sync.Map{}
listenersMap.Store(listener, cancel)
set := newKeyListenerSet(group)
set.add(listener)

// double load for invalid race
rawListenersMap, loaded = n.keyListeners.LoadOrStore(key, listenersMap)
var actual any
actual, loaded = n.keyListeners.LoadOrStore(key, set)
if !loaded {
err := n.client.Client().ListenConfig(vo.ConfigParam{
DataId: key,
Group: n.resolvedGroup(n.url.GetParam(constant.NacosGroupKey, constant2.DEFAULT_GROUP)),
Group: group,
OnChange: func(namespace, group, dataId, data string) {
go callback(listenersMap, namespace, group, dataId, data)
go callback(set, namespace, group, dataId, data)
},
})
if err != nil {
Expand All @@ -69,18 +114,32 @@ func (n *nacosDynamicConfiguration) addListener(key string, listener config_cent
}
return
}
rawSet = actual
}
_, cancel := context.WithCancel(context.Background())
listenersMap := rawListenersMap.(*sync.Map)
listenersMap.Store(listener, cancel)
rawSet.(*keyListenerSet).add(listener)
}

func (n *nacosDynamicConfiguration) removeListener(key string, listener config_center.ConfigurationListener) {
rawListenersMap, loaded := n.keyListeners.Load(key)
rawSet, loaded := n.keyListeners.Load(key)
if !loaded {
logger.Errorf("[ConfigCenter][Nacos] key is not be listened, key=%s", key)
} else {
listenersMap := rawListenersMap.(*sync.Map)
listenersMap.Delete(listener)
return
}
set := rawSet.(*keyListenerSet)
isEmpty := set.remove(listener)

if isEmpty {
// Delete from keyListeners first to prevent new addListener from
// finding a stale set after we cancel the nacos subscription.
n.keyListeners.Delete(key)
if n.client != nil {
err := n.client.Client().CancelListenConfig(vo.ConfigParam{
DataId: key,
Group: set.group,
})
Comment on lines +136 to +139
Comment on lines +136 to +139

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Valid concern. Fixed in the latest commit — the resolved group string is now stored in the keyListenerSet struct at registration time, and CancelListenConfig uses set.group directly instead of re-deriving it. This guarantees the same (DataId, Group) pair is used for both ListenConfig and CancelListenConfig.

if err != nil {
logger.Errorf("[ConfigCenter][Nacos] cancel listen config fail, key=%s, err=%v", key, err)
}
}
}
}
42 changes: 34 additions & 8 deletions config_center/nacos/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package nacos

import (
"sync"
"testing"
)

Expand All @@ -37,10 +36,10 @@ func (r *recordingListener) Process(e *config_center.ConfigChangeEvent) {

func TestCallback(t *testing.T) {
l := &recordingListener{}
var m sync.Map
m.Store(l, struct{}{})
set := newKeyListenerSet("test-group")
set.add(l)

callback(&m, "", "g", "data", "payload")
callback(set, "", "g", "data", "payload")

if len(l.events) != 1 {
t.Fatalf("expected 1 event, got %d", len(l.events))
Expand All @@ -54,13 +53,40 @@ func TestRemoveListener(t *testing.T) {
n := &nacosDynamicConfiguration{}
key := "k"
l := &recordingListener{}
inner := &sync.Map{}
inner.Store(l, struct{}{})
n.keyListeners.Store(key, inner)
set := newKeyListenerSet("test-group")
set.add(l)
n.keyListeners.Store(key, set)

n.removeListener(key, l)

if _, ok := inner.Load(l); ok {
if _, ok := set.listeners[l]; ok {
t.Fatalf("listener should be removed")
}
// After removing the only listener, the key should be deleted from keyListeners
if _, loaded := n.keyListeners.Load(key); loaded {
t.Fatalf("key should be deleted from keyListeners after last listener is removed")
}
}

func TestRemoveListenerMultipleListeners(t *testing.T) {
n := &nacosDynamicConfiguration{}
key := "k"
l1 := &recordingListener{}
l2 := &recordingListener{}
set := newKeyListenerSet("test-group")
set.add(l1)
set.add(l2)
n.keyListeners.Store(key, set)

// Remove first listener — key should still exist
n.removeListener(key, l1)
if _, loaded := n.keyListeners.Load(key); !loaded {
t.Fatalf("key should still exist after removing one of multiple listeners")
}

// Remove second listener — key should be deleted
n.removeListener(key, l2)
if _, loaded := n.keyListeners.Load(key); loaded {
t.Fatalf("key should be deleted from keyListeners after last listener is removed")
}
}
Loading