From 9b65fd20f58836e25d32b75613bb490644d01ad6 Mon Sep 17 00:00:00 2001 From: mazheng Date: Tue, 2 Sep 2025 12:20:57 +0800 Subject: [PATCH 01/25] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=B8=AD=E6=96=87?= =?UTF-8?q?=E7=89=88readme=E7=9A=84examples=E5=B7=A5=E7=A8=8B=E8=B7=B3?= =?UTF-8?q?=E8=BD=AC=E9=93=BE=E6=8E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README_CN.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README_CN.md b/README_CN.md index ccc98d87..4b346cf0 100644 --- a/README_CN.md +++ b/README_CN.md @@ -18,7 +18,7 @@ Getty 是一个使用 Golang 开发的异步网络 I/O 库。它适用于 TCP、 如果您使用 WebSocket,您无需担心心跳请求/响应,因为 Getty 在 session.go 的 (Session)handleLoop 方法内通过发送和接收 WebSocket ping/pong 帧来处理此任务。您只需在 codec.go 的 (Codec)OnCron 方法内使用 session.go 的 (Session)GetActive 方法检查 WebSocket 会话是否已超时。 -有关代码示例,请参阅 https://github.com/AlexStocks/getty-examples。 +有关代码示例,请参阅 https://github.com/AlexStocks/getty-examples ## 关于 Getty 中的网络传输 From 6bbc2b5817fe9b06af73298dc345aecd811fce79 Mon Sep 17 00:00:00 2001 From: mazheng Date: Tue, 2 Sep 2025 16:27:17 +0800 Subject: [PATCH 02/25] =?UTF-8?q?=E4=B8=BAsession=E5=A2=9E=E5=8A=A0closeca?= =?UTF-8?q?llback=E6=9C=BA=E5=88=B6=E7=94=A8=E4=BA=8E=E6=96=AD=E7=BA=BF?= =?UTF-8?q?=E6=97=B6=E5=81=9A=E4=B8=80=E4=BA=9B=E5=9B=9E=E6=94=B6=E6=93=8D?= =?UTF-8?q?=E4=BD=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- transport/callback.go | 99 +++++++++++++ transport/callback_test.go | 99 +++++++++++++ transport/session.go | 18 +++ transport/session_callback.go | 68 +++++++++ transport/session_callback_test.go | 220 +++++++++++++++++++++++++++++ 5 files changed, 504 insertions(+) create mode 100644 transport/callback.go create mode 100644 transport/callback_test.go create mode 100644 transport/session_callback.go create mode 100644 transport/session_callback_test.go diff --git a/transport/callback.go b/transport/callback.go new file mode 100644 index 00000000..ebb6116e --- /dev/null +++ b/transport/callback.go @@ -0,0 +1,99 @@ +package getty + +// callbackCommon 表示回调链表中的一个节点 +// 每个节点包含处理器标识、键值、回调函数和指向下一个节点的指针 +type callbackCommon struct { + handler interface{} // 处理器标识,用于标识回调的来源或类型 + key interface{} // 回调的唯一标识键,与 handler 组合使用 + call func() // 实际要执行的回调函数 + next *callbackCommon // 指向下一个节点的指针,形成链表结构 +} + +// callbacks 是一个单向链表结构,用于管理多个回调函数 +// 支持动态添加、移除和执行回调 +type callbacks struct { + first *callbackCommon // 指向链表第一个节点的指针 + last *callbackCommon // 指向链表最后一个节点的指针,用于快速添加新节点 +} + +// Add 向回调链表中添加一个新的回调函数 +// 参数说明: +// - handler: 处理器标识,可以是任意类型 +// - key: 回调的唯一标识键,与 handler 组合使用 +// - callback: 要执行的回调函数,如果为 nil 则忽略 +func (t *callbacks) Add(handler, key interface{}, callback func()) { + // 防止添加空回调函数 + if callback == nil { + return + } + + // 创建新的回调节点 + newItem := &callbackCommon{handler, key, callback, nil} + + if t.first == nil { + // 如果链表为空,新节点成为第一个节点 + t.first = newItem + } else { + // 否则将新节点添加到链表末尾 + t.last.next = newItem + } + // 更新最后一个节点的指针 + t.last = newItem +} + +// Remove 从回调链表中移除指定的回调函数 +// 参数说明: +// - handler: 要移除的回调的处理器标识 +// - key: 要移除的回调的唯一标识键 +// 注意: 如果找不到匹配的回调,此方法不会产生任何效果 +func (t *callbacks) Remove(handler, key interface{}) { + var prev *callbackCommon + + // 遍历链表查找要移除的节点 + for callback := t.first; callback != nil; prev, callback = callback, callback.next { + // 找到匹配的节点 + if callback.handler == handler && callback.key == key { + if t.first == callback { + // 如果是第一个节点,更新 first 指针 + t.first = callback.next + } else if prev != nil { + // 如果是中间节点,更新前一个节点的 next 指针 + prev.next = callback.next + } + + if t.last == callback { + // 如果是最后一个节点,更新 last 指针 + t.last = prev + } + + // 找到并移除后立即返回 + return + } + } +} + +// Invoke 执行链表中所有注册的回调函数 +// 按照添加的顺序依次执行每个回调 +// 注意: 如果某个回调函数为 nil,会被跳过 +func (t *callbacks) Invoke() { + // 从头节点开始遍历整个链表 + for callback := t.first; callback != nil; callback = callback.next { + // 确保回调函数不为 nil 再执行 + if callback.call != nil { + callback.call() + } + } +} + +// Count 返回链表中回调函数的数量 +// 返回值: 当前注册的回调函数总数 +func (t *callbacks) Count() int { + var count int + + // 遍历链表计数 + for callback := t.first; callback != nil; callback = callback.next { + count++ + } + + return count +} diff --git a/transport/callback_test.go b/transport/callback_test.go new file mode 100644 index 00000000..09c84d7d --- /dev/null +++ b/transport/callback_test.go @@ -0,0 +1,99 @@ +package getty + +import ( + "testing" +) + +func TestCallback(t *testing.T) { + var count, expected, remove, totalCount int + var cb = &callbacks{} + + totalCount = 10 + remove = 5 + + // 添加回调函数 + for i := 1; i < totalCount; i++ { + expected = expected + i + func(ii int) { + cb.Add(ii, ii, func() { count = count + ii }) + }(i) + } + + // 验证添加后的数量 + expectedCallbacks := totalCount - 1 + if cb.Count() != expectedCallbacks { + t.Errorf("期望回调数量为 %d,实际为 %d", expectedCallbacks, cb.Count()) + } + + // 测试添加 nil 回调 + cb.Add(remove, remove, nil) + if cb.Count() != expectedCallbacks { + t.Errorf("添加 nil 回调后期望数量为 %d,实际为 %d", expectedCallbacks, cb.Count()) + } + + // 移除指定的回调 + cb.Remove(remove, remove) + + // 尝试移除不存在的回调 + cb.Remove(remove+1, remove+2) + + // 执行所有回调 + cb.Invoke() + + // 验证执行结果 + expectedCount := expected - remove + if count != expectedCount { + t.Errorf("期望执行结果为 %d,实际为 %d", expectedCount, count) + } +} + +func TestCallbackAddRemove(t *testing.T) { + cb := &callbacks{} + + // 测试空列表 + if cb.Count() != 0 { + t.Errorf("空列表期望数量为 0,实际为 %d", cb.Count()) + } + + // 添加回调 + cb.Add("handler1", "key1", func() {}) + cb.Add("handler2", "key2", func() {}) + cb.Add("handler3", "key3", func() {}) + + if cb.Count() != 3 { + t.Errorf("期望回调数量为 3,实际为 %d", cb.Count()) + } + + // 移除中间的回调 + cb.Remove("handler2", "key2") + if cb.Count() != 2 { + t.Errorf("移除中间回调后期望数量为 2,实际为 %d", cb.Count()) + } + + // 移除第一个回调 + cb.Remove("handler1", "key1") + if cb.Count() != 1 { + t.Errorf("移除第一个回调后期望数量为 1,实际为 %d", cb.Count()) + } + + // 移除最后一个回调 + cb.Remove("handler3", "key3") + if cb.Count() != 0 { + t.Errorf("移除最后一个回调后期望数量为 0,实际为 %d", cb.Count()) + } +} + +func TestCallbackRemoveNonExistent(t *testing.T) { + cb := &callbacks{} + + // 添加一个回调 + cb.Add("handler1", "key1", func() {}) + + // 尝试移除不存在的回调 + cb.Remove("handler2", "key2") + + // 应该仍然有1个回调 + if cb.Count() != 1 { + t.Errorf("期望回调数量为 1,实际为 %d", cb.Count()) + } +} diff --git a/transport/session.go b/transport/session.go index f986fb3e..5303feba 100644 --- a/transport/session.go +++ b/transport/session.go @@ -101,6 +101,9 @@ type Session interface { WriteBytes([]byte) (int, error) WriteBytesArray(...[]byte) (int, error) Close() + + AddCloseCallback(handler, key any, callback CallBackFunc) + RemoveCloseCallback(handler, key any) } // getty base session @@ -135,6 +138,10 @@ type session struct { grNum uatomic.Int32 lock sync.RWMutex packetLock sync.RWMutex + + // callbacks + closeCallback callbacks + closeCallbackMutex sync.RWMutex } func newSession(endPoint EndPoint, conn Connection) *session { @@ -868,6 +875,17 @@ func (s *session) stop() { } } close(s.done) + + go func() { + defer func() { + if r := recover(); r != nil { + log.Errorf("invokeCloseCallbacks panic: %v", r) + } + }() + // 执行关闭回调函数 + s.invokeCloseCallbacks() + }() + clt, cltFound := s.GetAttribute(sessionClientKey).(*client) ignoreReconnect, flagFound := s.GetAttribute(ignoreReconnectKey).(bool) if cltFound && flagFound && !ignoreReconnect { diff --git a/transport/session_callback.go b/transport/session_callback.go new file mode 100644 index 00000000..791f3275 --- /dev/null +++ b/transport/session_callback.go @@ -0,0 +1,68 @@ +package getty + +// AddCloseCallback 添加 Session 关闭回调函数 +// +// 参数说明: +// - handler: 处理器标识,用于标识回调的来源或类型 +// - key: 回调的唯一标识键,与 handler 组合使用 +// - f: 要执行的回调函数,在 session 关闭时自动调用 +// +// 注意事项: +// - 如果 session 已经关闭,则忽略此次添加 +// - handler 和 key 的组合必须唯一,否则会覆盖之前的回调 +// - 回调函数会在 session 关闭时按照添加顺序执行 +func (s *session) AddCloseCallback(handler, key any, f CallBackFunc) { + if s.IsClosed() { + return + } + s.closeCallbackMutex.Lock() + s.closeCallback.Add(handler, key, f) + s.closeCallbackMutex.Unlock() +} + +// RemoveCloseCallback 移除指定的 Session 关闭回调函数 +// +// 参数说明: +// - handler: 要移除的回调的处理器标识 +// - key: 要移除的回调的唯一标识键 +// +// 返回值: 无 +// +// 注意事项: +// - 如果 session 已经关闭,则忽略此次移除操作 +// - 如果找不到匹配的回调,此操作不会产生任何效果 +// - 移除操作是线程安全的 +func (s *session) RemoveCloseCallback(handler, key any) { + if s.IsClosed() { + return + } + s.closeCallbackMutex.Lock() + s.closeCallback.Remove(handler, key) + s.closeCallbackMutex.Unlock() +} + +// invokeCloseCallbacks 执行所有注册的关闭回调函数 +// +// 功能说明: +// - 按照添加顺序依次执行所有注册的关闭回调 +// - 使用读锁保护回调列表,确保并发安全 +// - 此方法通常在 session 关闭时自动调用 +// +// 注意事项: +// - 此方法是内部方法,不建议外部直接调用 +// - 回调执行过程中如果发生 panic,会被捕获并记录日志 +// - 回调函数应该避免长时间阻塞,建议异步处理耗时操作 +func (s *session) invokeCloseCallbacks() { + s.closeCallbackMutex.RLock() + s.closeCallback.Invoke() + s.closeCallbackMutex.RUnlock() +} + +// CallBackFunc 定义 Session 关闭时的回调函数类型 +// +// 使用说明: +// - 回调函数不接受任何参数 +// - 回调函数不返回任何值 +// - 回调函数应该处理资源清理、状态更新等操作 +// - 建议在回调函数中避免访问已关闭的 session 状态 +type CallBackFunc func() diff --git a/transport/session_callback_test.go b/transport/session_callback_test.go new file mode 100644 index 00000000..605edec7 --- /dev/null +++ b/transport/session_callback_test.go @@ -0,0 +1,220 @@ +package getty + +import ( + "fmt" + "sync" + "testing" + "time" +) + +func TestSessionCallback(t *testing.T) { + // 创建一个简单的 session 对象用于测试 + s := &session{ + once: &sync.Once{}, + done: make(chan struct{}), + closeCallback: callbacks{}, + } + + // 测试添加关闭回调 + var callbackExecuted bool + var callbackMutex sync.Mutex + + callback := func() { + callbackMutex.Lock() + callbackExecuted = true + callbackMutex.Unlock() + } + + // 添加回调 + s.AddCloseCallback("testHandler", "testKey", callback) + if s.closeCallback.Count() != 1 { + t.Errorf("期望回调数量为 1,实际为 %d", s.closeCallback.Count()) + } + + // 测试移除回调 + s.RemoveCloseCallback("testHandler", "testKey") + if s.closeCallback.Count() != 0 { + t.Errorf("期望回调数量为 0,实际为 %d", s.closeCallback.Count()) + } + + // 重新添加回调 + s.AddCloseCallback("testHandler", "testKey", callback) + + // 测试关闭时回调执行 + go func() { + time.Sleep(10 * time.Millisecond) + s.stop() + }() + + // 等待回调执行 + time.Sleep(50 * time.Millisecond) + + callbackMutex.Lock() + if !callbackExecuted { + t.Error("回调函数未被执行") + } + callbackMutex.Unlock() +} + +func TestSessionCallbackMultiple(t *testing.T) { + // 创建一个简单的 session 对象用于测试 + s := &session{ + once: &sync.Once{}, + done: make(chan struct{}), + closeCallback: callbacks{}, + } + + var callbackCount int + var callbackMutex sync.Mutex + + // 添加多个回调 + totalCallbacks := 3 + for i := 0; i < totalCallbacks; i++ { + index := i // 捕获循环变量 + callback := func() { + callbackMutex.Lock() + callbackCount++ + callbackMutex.Unlock() + } + s.AddCloseCallback(fmt.Sprintf("handler%d", index), fmt.Sprintf("key%d", index), callback) + } + + if s.closeCallback.Count() != totalCallbacks { + t.Errorf("期望回调数量为 %d,实际为 %d", totalCallbacks, s.closeCallback.Count()) + } + + // 移除一个回调 + s.RemoveCloseCallback("handler0", "key0") + expectedAfterRemove := totalCallbacks - 1 + if s.closeCallback.Count() != expectedAfterRemove { + t.Errorf("期望回调数量为 %d,实际为 %d", expectedAfterRemove, s.closeCallback.Count()) + } + + // 测试关闭时剩余回调执行 + go func() { + time.Sleep(10 * time.Millisecond) + s.stop() + }() + + time.Sleep(50 * time.Millisecond) + + callbackMutex.Lock() + if callbackCount != expectedAfterRemove { + t.Errorf("期望执行的回调数量为 %d,实际为 %d", expectedAfterRemove, callbackCount) + } + callbackMutex.Unlock() +} + +func TestInvokeCloseCallbacks(t *testing.T) { + // 创建一个简单的 session 对象用于测试 + s := &session{ + once: &sync.Once{}, + done: make(chan struct{}), + closeCallback: callbacks{}, + } + + var callbackResults []string + var callbackMutex sync.Mutex + + // 添加多个不同类型的关闭回调 + callbacks := []struct { + handler string + key string + action string + }{ + {"cleanup", "resources", "清理资源"}, + {"cleanup", "connections", "关闭连接"}, + {"logging", "audit", "记录审计日志"}, + {"metrics", "stats", "更新统计信息"}, + } + + // 注册所有回调 + for _, cb := range callbacks { + cbCopy := cb // 捕获循环变量 + callback := func() { + callbackMutex.Lock() + callbackResults = append(callbackResults, cbCopy.action) + callbackMutex.Unlock() + } + s.AddCloseCallback(cbCopy.handler, cbCopy.key, callback) + } + + // 验证回调数量 + expectedCount := len(callbacks) + if s.closeCallback.Count() != expectedCount { + t.Errorf("期望回调数量为 %d,实际为 %d", expectedCount, s.closeCallback.Count()) + } + + // 手动调用关闭回调(模拟 invokeCloseCallbacks) + callbackMutex.Lock() + callbackResults = nil // 清空之前的结果 + callbackMutex.Unlock() + + // 执行所有关闭回调 + s.closeCallback.Invoke() + + // 等待回调执行完成 + time.Sleep(10 * time.Millisecond) + + // 验证所有回调都被执行 + callbackMutex.Lock() + if len(callbackResults) != expectedCount { + t.Errorf("期望执行 %d 个回调,实际执行了 %d 个", expectedCount, len(callbackResults)) + } + + // 验证回调执行顺序(应该按照添加顺序执行) + expectedActions := []string{"清理资源", "关闭连接", "记录审计日志", "更新统计信息"} + for i, result := range callbackResults { + if i < len(expectedActions) && result != expectedActions[i] { + t.Errorf("位置 %d: 期望执行 '%s',实际执行了 '%s'", i, expectedActions[i], result) + } + } + callbackMutex.Unlock() + + // 测试移除回调后再次执行 + s.RemoveCloseCallback("cleanup", "resources") + + callbackMutex.Lock() + callbackResults = nil + callbackMutex.Unlock() + + // 再次执行回调 + s.closeCallback.Invoke() + time.Sleep(10 * time.Millisecond) + + // 验证移除后的执行结果 + callbackMutex.Lock() + expectedAfterRemove := expectedCount - 1 + if len(callbackResults) != expectedAfterRemove { + t.Errorf("移除一个回调后期望执行 %d 个回调,实际执行了 %d 个", expectedAfterRemove, len(callbackResults)) + } + callbackMutex.Unlock() +} + +func TestInvokeCloseCallbacksEmpty(t *testing.T) { + // 测试空回调列表的情况 + s := &session{ + once: &sync.Once{}, + done: make(chan struct{}), + closeCallback: callbacks{}, + } + + // 验证空列表 + if s.closeCallback.Count() != 0 { + t.Errorf("空列表期望数量为 0,实际为 %d", s.closeCallback.Count()) + } + + // 执行空回调列表(不应该panic) + s.closeCallback.Invoke() + + // 添加一个回调然后移除,再次执行 + s.AddCloseCallback("test", "key", func() {}) + s.RemoveCloseCallback("test", "key") + + // 移除后执行空列表(不应该panic) + s.closeCallback.Invoke() + + if s.closeCallback.Count() != 0 { + t.Errorf("移除后期望数量为 0,实际为 %d", s.closeCallback.Count()) + } +} From af7c6e6d4b6b5403c55369cf306dd7b018656de9 Mon Sep 17 00:00:00 2001 From: mazheng Date: Tue, 2 Sep 2025 16:44:04 +0800 Subject: [PATCH 03/25] =?UTF-8?q?=E6=95=B4=E5=90=88test=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- transport/callback_test.go | 65 ++--- transport/session_callback_test.go | 408 +++++++++++++++-------------- 2 files changed, 234 insertions(+), 239 deletions(-) diff --git a/transport/callback_test.go b/transport/callback_test.go index 09c84d7d..8879bdf9 100644 --- a/transport/callback_test.go +++ b/transport/callback_test.go @@ -5,13 +5,18 @@ import ( ) func TestCallback(t *testing.T) { - var count, expected, remove, totalCount int - var cb = &callbacks{} + // 测试空列表 + cb := &callbacks{} + if cb.Count() != 0 { + t.Errorf("空列表期望数量为 0,实际为 %d", cb.Count()) + } + // 测试添加回调函数 + var count, expected, remove, totalCount int totalCount = 10 remove = 5 - // 添加回调函数 + // 添加多个回调函数 for i := 1; i < totalCount; i++ { expected = expected + i func(ii int) { @@ -45,55 +50,43 @@ func TestCallback(t *testing.T) { if count != expectedCount { t.Errorf("期望执行结果为 %d,实际为 %d", expectedCount, count) } -} - -func TestCallbackAddRemove(t *testing.T) { - cb := &callbacks{} - // 测试空列表 - if cb.Count() != 0 { - t.Errorf("空列表期望数量为 0,实际为 %d", cb.Count()) - } + // 测试字符串类型的 handler 和 key + cb2 := &callbacks{} // 添加回调 - cb.Add("handler1", "key1", func() {}) - cb.Add("handler2", "key2", func() {}) - cb.Add("handler3", "key3", func() {}) + cb2.Add("handler1", "key1", func() {}) + cb2.Add("handler2", "key2", func() {}) + cb2.Add("handler3", "key3", func() {}) - if cb.Count() != 3 { - t.Errorf("期望回调数量为 3,实际为 %d", cb.Count()) + if cb2.Count() != 3 { + t.Errorf("期望回调数量为 3,实际为 %d", cb2.Count()) } // 移除中间的回调 - cb.Remove("handler2", "key2") - if cb.Count() != 2 { - t.Errorf("移除中间回调后期望数量为 2,实际为 %d", cb.Count()) + cb2.Remove("handler2", "key2") + if cb2.Count() != 2 { + t.Errorf("移除中间回调后期望数量为 2,实际为 %d", cb2.Count()) } // 移除第一个回调 - cb.Remove("handler1", "key1") - if cb.Count() != 1 { - t.Errorf("移除第一个回调后期望数量为 1,实际为 %d", cb.Count()) + cb2.Remove("handler1", "key1") + if cb2.Count() != 1 { + t.Errorf("移除第一个回调后期望数量为 1,实际为 %d", cb2.Count()) } // 移除最后一个回调 - cb.Remove("handler3", "key3") - if cb.Count() != 0 { - t.Errorf("移除最后一个回调后期望数量为 0,实际为 %d", cb.Count()) + cb2.Remove("handler3", "key3") + if cb2.Count() != 0 { + t.Errorf("移除最后一个回调后期望数量为 0,实际为 %d", cb2.Count()) } -} - -func TestCallbackRemoveNonExistent(t *testing.T) { - cb := &callbacks{} - // 添加一个回调 - cb.Add("handler1", "key1", func() {}) - - // 尝试移除不存在的回调 - cb.Remove("handler2", "key2") + // 测试移除不存在的回调 + cb2.Add("handler1", "key1", func() {}) + cb2.Remove("handler2", "key2") // 尝试移除不存在的回调 // 应该仍然有1个回调 - if cb.Count() != 1 { - t.Errorf("期望回调数量为 1,实际为 %d", cb.Count()) + if cb2.Count() != 1 { + t.Errorf("期望回调数量为 1,实际为 %d", cb2.Count()) } } diff --git a/transport/session_callback_test.go b/transport/session_callback_test.go index 605edec7..86e01caf 100644 --- a/transport/session_callback_test.go +++ b/transport/session_callback_test.go @@ -8,213 +8,215 @@ import ( ) func TestSessionCallback(t *testing.T) { - // 创建一个简单的 session 对象用于测试 - s := &session{ - once: &sync.Once{}, - done: make(chan struct{}), - closeCallback: callbacks{}, - } - - // 测试添加关闭回调 - var callbackExecuted bool - var callbackMutex sync.Mutex - - callback := func() { - callbackMutex.Lock() - callbackExecuted = true - callbackMutex.Unlock() - } - - // 添加回调 - s.AddCloseCallback("testHandler", "testKey", callback) - if s.closeCallback.Count() != 1 { - t.Errorf("期望回调数量为 1,实际为 %d", s.closeCallback.Count()) - } - - // 测试移除回调 - s.RemoveCloseCallback("testHandler", "testKey") - if s.closeCallback.Count() != 0 { - t.Errorf("期望回调数量为 0,实际为 %d", s.closeCallback.Count()) - } - - // 重新添加回调 - s.AddCloseCallback("testHandler", "testKey", callback) - - // 测试关闭时回调执行 - go func() { - time.Sleep(10 * time.Millisecond) - s.stop() - }() - - // 等待回调执行 - time.Sleep(50 * time.Millisecond) - - callbackMutex.Lock() - if !callbackExecuted { - t.Error("回调函数未被执行") - } - callbackMutex.Unlock() -} - -func TestSessionCallbackMultiple(t *testing.T) { - // 创建一个简单的 session 对象用于测试 - s := &session{ - once: &sync.Once{}, - done: make(chan struct{}), - closeCallback: callbacks{}, - } - - var callbackCount int - var callbackMutex sync.Mutex - - // 添加多个回调 - totalCallbacks := 3 - for i := 0; i < totalCallbacks; i++ { - index := i // 捕获循环变量 + // 测试基本的添加、移除和执行回调功能 + t.Run("BasicCallback", func(t *testing.T) { + s := &session{ + once: &sync.Once{}, + done: make(chan struct{}), + closeCallback: callbacks{}, + } + + var callbackExecuted bool + var callbackMutex sync.Mutex + callback := func() { callbackMutex.Lock() - callbackCount++ + callbackExecuted = true callbackMutex.Unlock() } - s.AddCloseCallback(fmt.Sprintf("handler%d", index), fmt.Sprintf("key%d", index), callback) - } - - if s.closeCallback.Count() != totalCallbacks { - t.Errorf("期望回调数量为 %d,实际为 %d", totalCallbacks, s.closeCallback.Count()) - } - - // 移除一个回调 - s.RemoveCloseCallback("handler0", "key0") - expectedAfterRemove := totalCallbacks - 1 - if s.closeCallback.Count() != expectedAfterRemove { - t.Errorf("期望回调数量为 %d,实际为 %d", expectedAfterRemove, s.closeCallback.Count()) - } - - // 测试关闭时剩余回调执行 - go func() { + + // 添加回调 + s.AddCloseCallback("testHandler", "testKey", callback) + if s.closeCallback.Count() != 1 { + t.Errorf("期望回调数量为 1,实际为 %d", s.closeCallback.Count()) + } + + // 测试移除回调 + s.RemoveCloseCallback("testHandler", "testKey") + if s.closeCallback.Count() != 0 { + t.Errorf("期望回调数量为 0,实际为 %d", s.closeCallback.Count()) + } + + // 重新添加回调 + s.AddCloseCallback("testHandler", "testKey", callback) + + // 测试关闭时回调执行 + go func() { + time.Sleep(10 * time.Millisecond) + s.stop() + }() + + // 等待回调执行 + time.Sleep(50 * time.Millisecond) + + callbackMutex.Lock() + if !callbackExecuted { + t.Error("回调函数未被执行") + } + callbackMutex.Unlock() + }) + + // 测试多个回调的添加、移除和执行 + t.Run("MultipleCallbacks", func(t *testing.T) { + s := &session{ + once: &sync.Once{}, + done: make(chan struct{}), + closeCallback: callbacks{}, + } + + var callbackCount int + var callbackMutex sync.Mutex + + // 添加多个回调 + totalCallbacks := 3 + for i := 0; i < totalCallbacks; i++ { + index := i // 捕获循环变量 + callback := func() { + callbackMutex.Lock() + callbackCount++ + callbackMutex.Unlock() + } + s.AddCloseCallback(fmt.Sprintf("handler%d", index), fmt.Sprintf("key%d", index), callback) + } + + if s.closeCallback.Count() != totalCallbacks { + t.Errorf("期望回调数量为 %d,实际为 %d", totalCallbacks, s.closeCallback.Count()) + } + + // 移除一个回调 + s.RemoveCloseCallback("handler0", "key0") + expectedAfterRemove := totalCallbacks - 1 + if s.closeCallback.Count() != expectedAfterRemove { + t.Errorf("期望回调数量为 %d,实际为 %d", expectedAfterRemove, s.closeCallback.Count()) + } + + // 测试关闭时剩余回调执行 + go func() { + time.Sleep(10 * time.Millisecond) + s.stop() + }() + + time.Sleep(50 * time.Millisecond) + + callbackMutex.Lock() + if callbackCount != expectedAfterRemove { + t.Errorf("期望执行的回调数量为 %d,实际为 %d", expectedAfterRemove, callbackCount) + } + callbackMutex.Unlock() + }) + + // 测试 invokeCloseCallbacks 功能 + t.Run("InvokeCloseCallbacks", func(t *testing.T) { + s := &session{ + once: &sync.Once{}, + done: make(chan struct{}), + closeCallback: callbacks{}, + } + + var callbackResults []string + var callbackMutex sync.Mutex + + // 添加多个不同类型的关闭回调 + callbacks := []struct { + handler string + key string + action string + }{ + {"cleanup", "resources", "清理资源"}, + {"cleanup", "connections", "关闭连接"}, + {"logging", "audit", "记录审计日志"}, + {"metrics", "stats", "更新统计信息"}, + } + + // 注册所有回调 + for _, cb := range callbacks { + cbCopy := cb // 捕获循环变量 + callback := func() { + callbackMutex.Lock() + callbackResults = append(callbackResults, cbCopy.action) + callbackMutex.Unlock() + } + s.AddCloseCallback(cbCopy.handler, cbCopy.key, callback) + } + + // 验证回调数量 + expectedCount := len(callbacks) + if s.closeCallback.Count() != expectedCount { + t.Errorf("期望回调数量为 %d,实际为 %d", expectedCount, s.closeCallback.Count()) + } + + // 手动调用关闭回调(模拟 invokeCloseCallbacks) + callbackMutex.Lock() + callbackResults = nil // 清空之前的结果 + callbackMutex.Unlock() + + // 执行所有关闭回调 + s.closeCallback.Invoke() + + // 等待回调执行完成 time.Sleep(10 * time.Millisecond) - s.stop() - }() - - time.Sleep(50 * time.Millisecond) - - callbackMutex.Lock() - if callbackCount != expectedAfterRemove { - t.Errorf("期望执行的回调数量为 %d,实际为 %d", expectedAfterRemove, callbackCount) - } - callbackMutex.Unlock() -} - -func TestInvokeCloseCallbacks(t *testing.T) { - // 创建一个简单的 session 对象用于测试 - s := &session{ - once: &sync.Once{}, - done: make(chan struct{}), - closeCallback: callbacks{}, - } - - var callbackResults []string - var callbackMutex sync.Mutex - - // 添加多个不同类型的关闭回调 - callbacks := []struct { - handler string - key string - action string - }{ - {"cleanup", "resources", "清理资源"}, - {"cleanup", "connections", "关闭连接"}, - {"logging", "audit", "记录审计日志"}, - {"metrics", "stats", "更新统计信息"}, - } - - // 注册所有回调 - for _, cb := range callbacks { - cbCopy := cb // 捕获循环变量 - callback := func() { - callbackMutex.Lock() - callbackResults = append(callbackResults, cbCopy.action) - callbackMutex.Unlock() + + // 验证所有回调都被执行 + callbackMutex.Lock() + if len(callbackResults) != expectedCount { + t.Errorf("期望执行 %d 个回调,实际执行了 %d 个", expectedCount, len(callbackResults)) } - s.AddCloseCallback(cbCopy.handler, cbCopy.key, callback) - } - - // 验证回调数量 - expectedCount := len(callbacks) - if s.closeCallback.Count() != expectedCount { - t.Errorf("期望回调数量为 %d,实际为 %d", expectedCount, s.closeCallback.Count()) - } - - // 手动调用关闭回调(模拟 invokeCloseCallbacks) - callbackMutex.Lock() - callbackResults = nil // 清空之前的结果 - callbackMutex.Unlock() - - // 执行所有关闭回调 - s.closeCallback.Invoke() - - // 等待回调执行完成 - time.Sleep(10 * time.Millisecond) - - // 验证所有回调都被执行 - callbackMutex.Lock() - if len(callbackResults) != expectedCount { - t.Errorf("期望执行 %d 个回调,实际执行了 %d 个", expectedCount, len(callbackResults)) - } - - // 验证回调执行顺序(应该按照添加顺序执行) - expectedActions := []string{"清理资源", "关闭连接", "记录审计日志", "更新统计信息"} - for i, result := range callbackResults { - if i < len(expectedActions) && result != expectedActions[i] { - t.Errorf("位置 %d: 期望执行 '%s',实际执行了 '%s'", i, expectedActions[i], result) - } - } - callbackMutex.Unlock() - - // 测试移除回调后再次执行 - s.RemoveCloseCallback("cleanup", "resources") - - callbackMutex.Lock() - callbackResults = nil - callbackMutex.Unlock() - - // 再次执行回调 - s.closeCallback.Invoke() - time.Sleep(10 * time.Millisecond) - - // 验证移除后的执行结果 - callbackMutex.Lock() - expectedAfterRemove := expectedCount - 1 - if len(callbackResults) != expectedAfterRemove { - t.Errorf("移除一个回调后期望执行 %d 个回调,实际执行了 %d 个", expectedAfterRemove, len(callbackResults)) - } - callbackMutex.Unlock() -} - -func TestInvokeCloseCallbacksEmpty(t *testing.T) { - // 测试空回调列表的情况 - s := &session{ - once: &sync.Once{}, - done: make(chan struct{}), - closeCallback: callbacks{}, - } - - // 验证空列表 - if s.closeCallback.Count() != 0 { - t.Errorf("空列表期望数量为 0,实际为 %d", s.closeCallback.Count()) - } - - // 执行空回调列表(不应该panic) - s.closeCallback.Invoke() - - // 添加一个回调然后移除,再次执行 - s.AddCloseCallback("test", "key", func() {}) - s.RemoveCloseCallback("test", "key") - - // 移除后执行空列表(不应该panic) - s.closeCallback.Invoke() - - if s.closeCallback.Count() != 0 { - t.Errorf("移除后期望数量为 0,实际为 %d", s.closeCallback.Count()) - } + + // 验证回调执行顺序(应该按照添加顺序执行) + expectedActions := []string{"清理资源", "关闭连接", "记录审计日志", "更新统计信息"} + for i, result := range callbackResults { + if i < len(expectedActions) && result != expectedActions[i] { + t.Errorf("位置 %d: 期望执行 '%s',实际执行了 '%s'", i, expectedActions[i], result) + } + } + callbackMutex.Unlock() + + // 测试移除回调后再次执行 + s.RemoveCloseCallback("cleanup", "resources") + + callbackMutex.Lock() + callbackResults = nil + callbackMutex.Unlock() + + // 再次执行回调 + s.closeCallback.Invoke() + time.Sleep(10 * time.Millisecond) + + // 验证移除后的执行结果 + callbackMutex.Lock() + expectedAfterRemove := expectedCount - 1 + if len(callbackResults) != expectedAfterRemove { + t.Errorf("移除一个回调后期望执行 %d 个回调,实际执行了 %d 个", expectedAfterRemove, len(callbackResults)) + } + callbackMutex.Unlock() + }) + + // 测试边界情况 + t.Run("EdgeCases", func(t *testing.T) { + // 测试空回调列表的情况 + s := &session{ + once: &sync.Once{}, + done: make(chan struct{}), + closeCallback: callbacks{}, + } + + // 验证空列表 + if s.closeCallback.Count() != 0 { + t.Errorf("空列表期望数量为 0,实际为 %d", s.closeCallback.Count()) + } + + // 执行空回调列表(不应该panic) + s.closeCallback.Invoke() + + // 添加一个回调然后移除,再次执行 + s.AddCloseCallback("test", "key", func() {}) + s.RemoveCloseCallback("test", "key") + + // 移除后执行空列表(不应该panic) + s.closeCallback.Invoke() + + if s.closeCallback.Count() != 0 { + t.Errorf("移除后期望数量为 0,实际为 %d", s.closeCallback.Count()) + } + }) } From f7fe83d99ea71a3fda68d79e777e7b7c371f1e14 Mon Sep 17 00:00:00 2001 From: mazheng Date: Tue, 2 Sep 2025 16:52:37 +0800 Subject: [PATCH 04/25] change desc to english --- transport/callback_test.go | 52 ++++++++-------- transport/session.go | 1 - transport/session_callback.go | 66 ++++++++++---------- transport/session_callback_test.go | 96 +++++++++++++++--------------- 4 files changed, 107 insertions(+), 108 deletions(-) diff --git a/transport/callback_test.go b/transport/callback_test.go index 8879bdf9..98cf266e 100644 --- a/transport/callback_test.go +++ b/transport/callback_test.go @@ -5,18 +5,18 @@ import ( ) func TestCallback(t *testing.T) { - // 测试空列表 + // Test empty list cb := &callbacks{} if cb.Count() != 0 { - t.Errorf("空列表期望数量为 0,实际为 %d", cb.Count()) + t.Errorf("Expected count for empty list is 0, but got %d", cb.Count()) } - // 测试添加回调函数 + // Test adding callback functions var count, expected, remove, totalCount int totalCount = 10 remove = 5 - // 添加多个回调函数 + // Add multiple callback functions for i := 1; i < totalCount; i++ { expected = expected + i func(ii int) { @@ -24,69 +24,69 @@ func TestCallback(t *testing.T) { }(i) } - // 验证添加后的数量 + // Verify count after adding expectedCallbacks := totalCount - 1 if cb.Count() != expectedCallbacks { - t.Errorf("期望回调数量为 %d,实际为 %d", expectedCallbacks, cb.Count()) + t.Errorf("Expected callback count is %d, but got %d", expectedCallbacks, cb.Count()) } - // 测试添加 nil 回调 + // Test adding nil callback cb.Add(remove, remove, nil) if cb.Count() != expectedCallbacks { - t.Errorf("添加 nil 回调后期望数量为 %d,实际为 %d", expectedCallbacks, cb.Count()) + t.Errorf("Expected count after adding nil callback is %d, but got %d", expectedCallbacks, cb.Count()) } - // 移除指定的回调 + // Remove specified callback cb.Remove(remove, remove) - // 尝试移除不存在的回调 + // Try to remove non-existent callback cb.Remove(remove+1, remove+2) - // 执行所有回调 + // Execute all callbacks cb.Invoke() - // 验证执行结果 + // Verify execution result expectedCount := expected - remove if count != expectedCount { - t.Errorf("期望执行结果为 %d,实际为 %d", expectedCount, count) + t.Errorf("Expected execution result is %d, but got %d", expectedCount, count) } - // 测试字符串类型的 handler 和 key + // Test string type handler and key cb2 := &callbacks{} - // 添加回调 + // Add callbacks cb2.Add("handler1", "key1", func() {}) cb2.Add("handler2", "key2", func() {}) cb2.Add("handler3", "key3", func() {}) if cb2.Count() != 3 { - t.Errorf("期望回调数量为 3,实际为 %d", cb2.Count()) + t.Errorf("Expected callback count is 3, but got %d", cb2.Count()) } - // 移除中间的回调 + // Remove middle callback cb2.Remove("handler2", "key2") if cb2.Count() != 2 { - t.Errorf("移除中间回调后期望数量为 2,实际为 %d", cb2.Count()) + t.Errorf("Expected count after removing middle callback is 2, but got %d", cb2.Count()) } - // 移除第一个回调 + // Remove first callback cb2.Remove("handler1", "key1") if cb2.Count() != 1 { - t.Errorf("移除第一个回调后期望数量为 1,实际为 %d", cb2.Count()) + t.Errorf("Expected count after removing first callback is 1, but got %d", cb2.Count()) } - // 移除最后一个回调 + // Remove last callback cb2.Remove("handler3", "key3") if cb2.Count() != 0 { - t.Errorf("移除最后一个回调后期望数量为 0,实际为 %d", cb2.Count()) + t.Errorf("Expected count after removing last callback is 0, but got %d", cb2.Count()) } - // 测试移除不存在的回调 + // Test removing non-existent callback cb2.Add("handler1", "key1", func() {}) - cb2.Remove("handler2", "key2") // 尝试移除不存在的回调 + cb2.Remove("handler2", "key2") // Try to remove non-existent callback - // 应该仍然有1个回调 + // Should still have 1 callback if cb2.Count() != 1 { - t.Errorf("期望回调数量为 1,实际为 %d", cb2.Count()) + t.Errorf("Expected callback count is 1, but got %d", cb2.Count()) } } diff --git a/transport/session.go b/transport/session.go index 5303feba..021d590f 100644 --- a/transport/session.go +++ b/transport/session.go @@ -882,7 +882,6 @@ func (s *session) stop() { log.Errorf("invokeCloseCallbacks panic: %v", r) } }() - // 执行关闭回调函数 s.invokeCloseCallbacks() }() diff --git a/transport/session_callback.go b/transport/session_callback.go index 791f3275..d7fd233d 100644 --- a/transport/session_callback.go +++ b/transport/session_callback.go @@ -1,16 +1,16 @@ package getty -// AddCloseCallback 添加 Session 关闭回调函数 +// AddCloseCallback adds a close callback function to the Session // -// 参数说明: -// - handler: 处理器标识,用于标识回调的来源或类型 -// - key: 回调的唯一标识键,与 handler 组合使用 -// - f: 要执行的回调函数,在 session 关闭时自动调用 +// Parameters: +// - handler: handler identifier, used to identify the source or type of the callback +// - key: unique identifier key for the callback, used in combination with handler +// - f: callback function to be executed when the session is closed // -// 注意事项: -// - 如果 session 已经关闭,则忽略此次添加 -// - handler 和 key 的组合必须唯一,否则会覆盖之前的回调 -// - 回调函数会在 session 关闭时按照添加顺序执行 +// Notes: +// - If the session is already closed, this addition will be ignored +// - The combination of handler and key must be unique, otherwise it will override previous callbacks +// - Callback functions will be executed in the order they were added when the session closes func (s *session) AddCloseCallback(handler, key any, f CallBackFunc) { if s.IsClosed() { return @@ -20,18 +20,18 @@ func (s *session) AddCloseCallback(handler, key any, f CallBackFunc) { s.closeCallbackMutex.Unlock() } -// RemoveCloseCallback 移除指定的 Session 关闭回调函数 +// RemoveCloseCallback removes the specified Session close callback function // -// 参数说明: -// - handler: 要移除的回调的处理器标识 -// - key: 要移除的回调的唯一标识键 +// Parameters: +// - handler: handler identifier of the callback to be removed +// - key: unique identifier key of the callback to be removed // -// 返回值: 无 +// Return value: none // -// 注意事项: -// - 如果 session 已经关闭,则忽略此次移除操作 -// - 如果找不到匹配的回调,此操作不会产生任何效果 -// - 移除操作是线程安全的 +// Notes: +// - If the session is already closed, this removal operation will be ignored +// - If no matching callback is found, this operation will have no effect +// - The removal operation is thread-safe func (s *session) RemoveCloseCallback(handler, key any) { if s.IsClosed() { return @@ -41,28 +41,28 @@ func (s *session) RemoveCloseCallback(handler, key any) { s.closeCallbackMutex.Unlock() } -// invokeCloseCallbacks 执行所有注册的关闭回调函数 +// invokeCloseCallbacks executes all registered close callback functions // -// 功能说明: -// - 按照添加顺序依次执行所有注册的关闭回调 -// - 使用读锁保护回调列表,确保并发安全 -// - 此方法通常在 session 关闭时自动调用 +// Function description: +// - Executes all registered close callbacks in the order they were added +// - Uses read lock to protect the callback list, ensuring concurrency safety +// - This method is typically called automatically when the session closes // -// 注意事项: -// - 此方法是内部方法,不建议外部直接调用 -// - 回调执行过程中如果发生 panic,会被捕获并记录日志 -// - 回调函数应该避免长时间阻塞,建议异步处理耗时操作 +// Notes: +// - This is an internal method, not recommended for external direct calls +// - If panic occurs during callback execution, it will be caught and logged +// - Callback functions should avoid long blocking operations, async processing is recommended for time-consuming tasks func (s *session) invokeCloseCallbacks() { s.closeCallbackMutex.RLock() s.closeCallback.Invoke() s.closeCallbackMutex.RUnlock() } -// CallBackFunc 定义 Session 关闭时的回调函数类型 +// CallBackFunc defines the callback function type when Session closes // -// 使用说明: -// - 回调函数不接受任何参数 -// - 回调函数不返回任何值 -// - 回调函数应该处理资源清理、状态更新等操作 -// - 建议在回调函数中避免访问已关闭的 session 状态 +// Usage notes: +// - Callback function accepts no parameters +// - Callback function returns no values +// - Callback function should handle resource cleanup, state updates, etc. +// - It's recommended to avoid accessing closed session state in callback functions type CallBackFunc func() diff --git a/transport/session_callback_test.go b/transport/session_callback_test.go index 86e01caf..3b0bf5cd 100644 --- a/transport/session_callback_test.go +++ b/transport/session_callback_test.go @@ -8,7 +8,7 @@ import ( ) func TestSessionCallback(t *testing.T) { - // 测试基本的添加、移除和执行回调功能 + // Test basic add, remove and execute callback functionality t.Run("BasicCallback", func(t *testing.T) { s := &session{ once: &sync.Once{}, @@ -25,38 +25,38 @@ func TestSessionCallback(t *testing.T) { callbackMutex.Unlock() } - // 添加回调 + // Add callback s.AddCloseCallback("testHandler", "testKey", callback) if s.closeCallback.Count() != 1 { - t.Errorf("期望回调数量为 1,实际为 %d", s.closeCallback.Count()) + t.Errorf("Expected callback count is 1, but got %d", s.closeCallback.Count()) } - // 测试移除回调 + // Test removing callback s.RemoveCloseCallback("testHandler", "testKey") if s.closeCallback.Count() != 0 { - t.Errorf("期望回调数量为 0,实际为 %d", s.closeCallback.Count()) + t.Errorf("Expected callback count is 0, but got %d", s.closeCallback.Count()) } - // 重新添加回调 + // Re-add callback s.AddCloseCallback("testHandler", "testKey", callback) - // 测试关闭时回调执行 + // Test callback execution when closing go func() { time.Sleep(10 * time.Millisecond) s.stop() }() - // 等待回调执行 + // Wait for callback execution time.Sleep(50 * time.Millisecond) callbackMutex.Lock() if !callbackExecuted { - t.Error("回调函数未被执行") + t.Error("Callback function was not executed") } callbackMutex.Unlock() }) - // 测试多个回调的添加、移除和执行 + // Test adding, removing and executing multiple callbacks t.Run("MultipleCallbacks", func(t *testing.T) { s := &session{ once: &sync.Once{}, @@ -67,10 +67,10 @@ func TestSessionCallback(t *testing.T) { var callbackCount int var callbackMutex sync.Mutex - // 添加多个回调 + // Add multiple callbacks totalCallbacks := 3 for i := 0; i < totalCallbacks; i++ { - index := i // 捕获循环变量 + index := i // Capture loop variable callback := func() { callbackMutex.Lock() callbackCount++ @@ -80,17 +80,17 @@ func TestSessionCallback(t *testing.T) { } if s.closeCallback.Count() != totalCallbacks { - t.Errorf("期望回调数量为 %d,实际为 %d", totalCallbacks, s.closeCallback.Count()) + t.Errorf("Expected callback count is %d, but got %d", totalCallbacks, s.closeCallback.Count()) } - // 移除一个回调 + // Remove one callback s.RemoveCloseCallback("handler0", "key0") expectedAfterRemove := totalCallbacks - 1 if s.closeCallback.Count() != expectedAfterRemove { - t.Errorf("期望回调数量为 %d,实际为 %d", expectedAfterRemove, s.closeCallback.Count()) + t.Errorf("Expected callback count is %d, but got %d", expectedAfterRemove, s.closeCallback.Count()) } - // 测试关闭时剩余回调执行 + // Test execution of remaining callbacks when closing go func() { time.Sleep(10 * time.Millisecond) s.stop() @@ -100,12 +100,12 @@ func TestSessionCallback(t *testing.T) { callbackMutex.Lock() if callbackCount != expectedAfterRemove { - t.Errorf("期望执行的回调数量为 %d,实际为 %d", expectedAfterRemove, callbackCount) + t.Errorf("Expected executed callback count is %d, but got %d", expectedAfterRemove, callbackCount) } callbackMutex.Unlock() }) - // 测试 invokeCloseCallbacks 功能 + // Test invokeCloseCallbacks functionality t.Run("InvokeCloseCallbacks", func(t *testing.T) { s := &session{ once: &sync.Once{}, @@ -116,21 +116,21 @@ func TestSessionCallback(t *testing.T) { var callbackResults []string var callbackMutex sync.Mutex - // 添加多个不同类型的关闭回调 + // Add multiple different types of close callbacks callbacks := []struct { handler string key string action string }{ - {"cleanup", "resources", "清理资源"}, - {"cleanup", "connections", "关闭连接"}, - {"logging", "audit", "记录审计日志"}, - {"metrics", "stats", "更新统计信息"}, + {"cleanup", "resources", "Clean resources"}, + {"cleanup", "connections", "Close connections"}, + {"logging", "audit", "Log audit info"}, + {"metrics", "stats", "Update statistics"}, } - // 注册所有回调 + // Register all callbacks for _, cb := range callbacks { - cbCopy := cb // 捕获循环变量 + cbCopy := cb // Capture loop variable callback := func() { callbackMutex.Lock() callbackResults = append(callbackResults, cbCopy.action) @@ -139,84 +139,84 @@ func TestSessionCallback(t *testing.T) { s.AddCloseCallback(cbCopy.handler, cbCopy.key, callback) } - // 验证回调数量 + // Verify callback count expectedCount := len(callbacks) if s.closeCallback.Count() != expectedCount { - t.Errorf("期望回调数量为 %d,实际为 %d", expectedCount, s.closeCallback.Count()) + t.Errorf("Expected callback count is %d, but got %d", expectedCount, s.closeCallback.Count()) } - // 手动调用关闭回调(模拟 invokeCloseCallbacks) + // Manually invoke close callbacks (simulate invokeCloseCallbacks) callbackMutex.Lock() - callbackResults = nil // 清空之前的结果 + callbackResults = nil // Clear previous results callbackMutex.Unlock() - // 执行所有关闭回调 + // Execute all close callbacks s.closeCallback.Invoke() - // 等待回调执行完成 + // Wait for callback execution to complete time.Sleep(10 * time.Millisecond) - // 验证所有回调都被执行 + // Verify all callbacks were executed callbackMutex.Lock() if len(callbackResults) != expectedCount { - t.Errorf("期望执行 %d 个回调,实际执行了 %d 个", expectedCount, len(callbackResults)) + t.Errorf("Expected to execute %d callbacks, but executed %d", expectedCount, len(callbackResults)) } - // 验证回调执行顺序(应该按照添加顺序执行) - expectedActions := []string{"清理资源", "关闭连接", "记录审计日志", "更新统计信息"} + // Verify callback execution order (should execute in order of addition) + expectedActions := []string{"Clean resources", "Close connections", "Log audit info", "Update statistics"} for i, result := range callbackResults { if i < len(expectedActions) && result != expectedActions[i] { - t.Errorf("位置 %d: 期望执行 '%s',实际执行了 '%s'", i, expectedActions[i], result) + t.Errorf("Position %d: Expected to execute '%s', but executed '%s'", i, expectedActions[i], result) } } callbackMutex.Unlock() - // 测试移除回调后再次执行 + // Test execution after removing a callback s.RemoveCloseCallback("cleanup", "resources") callbackMutex.Lock() callbackResults = nil callbackMutex.Unlock() - // 再次执行回调 + // Execute callbacks again s.closeCallback.Invoke() time.Sleep(10 * time.Millisecond) - // 验证移除后的执行结果 + // Verify execution results after removal callbackMutex.Lock() expectedAfterRemove := expectedCount - 1 if len(callbackResults) != expectedAfterRemove { - t.Errorf("移除一个回调后期望执行 %d 个回调,实际执行了 %d 个", expectedAfterRemove, len(callbackResults)) + t.Errorf("Expected to execute %d callbacks after removal, but executed %d", expectedAfterRemove, len(callbackResults)) } callbackMutex.Unlock() }) - // 测试边界情况 + // Test edge cases t.Run("EdgeCases", func(t *testing.T) { - // 测试空回调列表的情况 + // Test empty callback list scenario s := &session{ once: &sync.Once{}, done: make(chan struct{}), closeCallback: callbacks{}, } - // 验证空列表 + // Verify empty list if s.closeCallback.Count() != 0 { - t.Errorf("空列表期望数量为 0,实际为 %d", s.closeCallback.Count()) + t.Errorf("Expected count for empty list is 0, but got %d", s.closeCallback.Count()) } - // 执行空回调列表(不应该panic) + // Execute empty callback list (should not panic) s.closeCallback.Invoke() - // 添加一个回调然后移除,再次执行 + // Add a callback then remove it, execute again s.AddCloseCallback("test", "key", func() {}) s.RemoveCloseCallback("test", "key") - // 移除后执行空列表(不应该panic) + // Execute empty list after removal (should not panic) s.closeCallback.Invoke() if s.closeCallback.Count() != 0 { - t.Errorf("移除后期望数量为 0,实际为 %d", s.closeCallback.Count()) + t.Errorf("Expected count after removal is 0, but got %d", s.closeCallback.Count()) } }) } From 558c6266a117619ceba159d5ec779d45aa0cf9d6 Mon Sep 17 00:00:00 2001 From: mazheng Date: Sat, 13 Sep 2025 13:44:26 +0800 Subject: [PATCH 05/25] =?UTF-8?q?=E7=BF=BB=E8=AF=91=E6=B3=A8=E9=87=8A?= =?UTF-8?q?=EF=BC=8C=E5=A2=9E=E5=8A=A0=E8=AE=B8=E5=8F=AF=E8=AF=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- transport/callback.go | 95 +++++++++++++++++++++++++------------------ 1 file changed, 56 insertions(+), 39 deletions(-) diff --git a/transport/callback.go b/transport/callback.go index ebb6116e..1d4dbeef 100644 --- a/transport/callback.go +++ b/transport/callback.go @@ -1,96 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package getty -// callbackCommon 表示回调链表中的一个节点 -// 每个节点包含处理器标识、键值、回调函数和指向下一个节点的指针 +// callbackCommon represents a node in the callback linked list +// Each node contains handler identifier, key, callback function and pointer to next node type callbackCommon struct { - handler interface{} // 处理器标识,用于标识回调的来源或类型 - key interface{} // 回调的唯一标识键,与 handler 组合使用 - call func() // 实际要执行的回调函数 - next *callbackCommon // 指向下一个节点的指针,形成链表结构 + handler interface{} // Handler identifier, used to identify the source or type of callback + key interface{} // Unique identifier key for callback, used in combination with handler + call func() // Actual callback function to be executed + next *callbackCommon // Pointer to next node, forming linked list structure } -// callbacks 是一个单向链表结构,用于管理多个回调函数 -// 支持动态添加、移除和执行回调 +// callbacks is a singly linked list structure for managing multiple callback functions +// Supports dynamic addition, removal and execution of callbacks type callbacks struct { - first *callbackCommon // 指向链表第一个节点的指针 - last *callbackCommon // 指向链表最后一个节点的指针,用于快速添加新节点 + first *callbackCommon // Pointer to the first node of the linked list + last *callbackCommon // Pointer to the last node of the linked list, used for quick addition of new nodes } -// Add 向回调链表中添加一个新的回调函数 -// 参数说明: -// - handler: 处理器标识,可以是任意类型 -// - key: 回调的唯一标识键,与 handler 组合使用 -// - callback: 要执行的回调函数,如果为 nil 则忽略 +// Add adds a new callback function to the callback linked list +// Parameters: +// - handler: Handler identifier, can be any type +// - key: Unique identifier key for callback, used in combination with handler +// - callback: Callback function to be executed, ignored if nil func (t *callbacks) Add(handler, key interface{}, callback func()) { - // 防止添加空回调函数 + // Prevent adding empty callback function if callback == nil { return } - // 创建新的回调节点 + // Create new callback node newItem := &callbackCommon{handler, key, callback, nil} if t.first == nil { - // 如果链表为空,新节点成为第一个节点 + // If linked list is empty, new node becomes the first node t.first = newItem } else { - // 否则将新节点添加到链表末尾 + // Otherwise add new node to the end of linked list t.last.next = newItem } - // 更新最后一个节点的指针 + // Update pointer to last node t.last = newItem } -// Remove 从回调链表中移除指定的回调函数 -// 参数说明: -// - handler: 要移除的回调的处理器标识 -// - key: 要移除的回调的唯一标识键 -// 注意: 如果找不到匹配的回调,此方法不会产生任何效果 +// Remove removes the specified callback function from the callback linked list +// Parameters: +// - handler: Handler identifier of the callback to be removed +// - key: Unique identifier key of the callback to be removed +// Note: If no matching callback is found, this method has no effect func (t *callbacks) Remove(handler, key interface{}) { var prev *callbackCommon - // 遍历链表查找要移除的节点 + // Traverse linked list to find the node to be removed for callback := t.first; callback != nil; prev, callback = callback, callback.next { - // 找到匹配的节点 + // Found matching node if callback.handler == handler && callback.key == key { if t.first == callback { - // 如果是第一个节点,更新 first 指针 + // If it's the first node, update first pointer t.first = callback.next } else if prev != nil { - // 如果是中间节点,更新前一个节点的 next 指针 + // If it's a middle node, update the next pointer of the previous node prev.next = callback.next } if t.last == callback { - // 如果是最后一个节点,更新 last 指针 + // If it's the last node, update last pointer t.last = prev } - // 找到并移除后立即返回 + // Return immediately after finding and removing return } } } -// Invoke 执行链表中所有注册的回调函数 -// 按照添加的顺序依次执行每个回调 -// 注意: 如果某个回调函数为 nil,会被跳过 +// Invoke executes all registered callback functions in the linked list +// Executes each callback in the order they were added +// Note: If a callback function is nil, it will be skipped func (t *callbacks) Invoke() { - // 从头节点开始遍历整个链表 + // Traverse the entire linked list starting from the head node for callback := t.first; callback != nil; callback = callback.next { - // 确保回调函数不为 nil 再执行 + // Ensure callback function is not nil before executing if callback.call != nil { callback.call() } } } -// Count 返回链表中回调函数的数量 -// 返回值: 当前注册的回调函数总数 +// Count returns the number of callback functions in the linked list +// Return value: Total number of currently registered callback functions func (t *callbacks) Count() int { var count int - // 遍历链表计数 + // Traverse linked list to count for callback := t.first; callback != nil; callback = callback.next { count++ } From a51bf83ef8449b2916062d1e426e17b260895085 Mon Sep 17 00:00:00 2001 From: mazheng Date: Sat, 13 Sep 2025 14:02:40 +0800 Subject: [PATCH 06/25] =?UTF-8?q?=E8=B0=83=E6=95=B4=E6=A0=BC=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- transport/callback.go | 27 ++++----- transport/callback_test.go | 36 ++++++------ transport/session_callback.go | 25 +++++++-- transport/session_callback_test.go | 88 +++++++++++++++--------------- 4 files changed, 97 insertions(+), 79 deletions(-) diff --git a/transport/callback.go b/transport/callback.go index 1d4dbeef..281354a7 100644 --- a/transport/callback.go +++ b/transport/callback.go @@ -20,17 +20,17 @@ package getty // callbackCommon represents a node in the callback linked list // Each node contains handler identifier, key, callback function and pointer to next node type callbackCommon struct { - handler interface{} // Handler identifier, used to identify the source or type of callback - key interface{} // Unique identifier key for callback, used in combination with handler - call func() // Actual callback function to be executed - next *callbackCommon // Pointer to next node, forming linked list structure + handler interface{} // Handler identifier, used to identify the source or type of callback + key interface{} // Unique identifier key for callback, used in combination with handler + call func() // Actual callback function to be executed + next *callbackCommon // Pointer to next node, forming linked list structure } // callbacks is a singly linked list structure for managing multiple callback functions // Supports dynamic addition, removal and execution of callbacks type callbacks struct { - first *callbackCommon // Pointer to the first node of the linked list - last *callbackCommon // Pointer to the last node of the linked list, used for quick addition of new nodes + first *callbackCommon // Pointer to the first node of the linked list + last *callbackCommon // Pointer to the last node of the linked list, used for quick addition of new nodes } // Add adds a new callback function to the callback linked list @@ -43,10 +43,10 @@ func (t *callbacks) Add(handler, key interface{}, callback func()) { if callback == nil { return } - + // Create new callback node newItem := &callbackCommon{handler, key, callback, nil} - + if t.first == nil { // If linked list is empty, new node becomes the first node t.first = newItem @@ -62,10 +62,11 @@ func (t *callbacks) Add(handler, key interface{}, callback func()) { // Parameters: // - handler: Handler identifier of the callback to be removed // - key: Unique identifier key of the callback to be removed +// // Note: If no matching callback is found, this method has no effect func (t *callbacks) Remove(handler, key interface{}) { var prev *callbackCommon - + // Traverse linked list to find the node to be removed for callback := t.first; callback != nil; prev, callback = callback, callback.next { // Found matching node @@ -77,12 +78,12 @@ func (t *callbacks) Remove(handler, key interface{}) { // If it's a middle node, update the next pointer of the previous node prev.next = callback.next } - + if t.last == callback { // If it's the last node, update last pointer t.last = prev } - + // Return immediately after finding and removing return } @@ -106,11 +107,11 @@ func (t *callbacks) Invoke() { // Return value: Total number of currently registered callback functions func (t *callbacks) Count() int { var count int - + // Traverse linked list to count for callback := t.first; callback != nil; callback = callback.next { count++ } - + return count } diff --git a/transport/callback_test.go b/transport/callback_test.go index 98cf266e..d62e87aa 100644 --- a/transport/callback_test.go +++ b/transport/callback_test.go @@ -10,81 +10,81 @@ func TestCallback(t *testing.T) { if cb.Count() != 0 { t.Errorf("Expected count for empty list is 0, but got %d", cb.Count()) } - + // Test adding callback functions var count, expected, remove, totalCount int totalCount = 10 remove = 5 - + // Add multiple callback functions for i := 1; i < totalCount; i++ { expected = expected + i - func(ii int) { - cb.Add(ii, ii, func() { count = count + ii }) + func(ii int) { + cb.Add(ii, ii, func() { count = count + ii }) }(i) } - + // Verify count after adding expectedCallbacks := totalCount - 1 if cb.Count() != expectedCallbacks { t.Errorf("Expected callback count is %d, but got %d", expectedCallbacks, cb.Count()) } - + // Test adding nil callback cb.Add(remove, remove, nil) if cb.Count() != expectedCallbacks { t.Errorf("Expected count after adding nil callback is %d, but got %d", expectedCallbacks, cb.Count()) } - + // Remove specified callback cb.Remove(remove, remove) - + // Try to remove non-existent callback cb.Remove(remove+1, remove+2) - + // Execute all callbacks cb.Invoke() - + // Verify execution result expectedCount := expected - remove if count != expectedCount { t.Errorf("Expected execution result is %d, but got %d", expectedCount, count) } - + // Test string type handler and key cb2 := &callbacks{} - + // Add callbacks cb2.Add("handler1", "key1", func() {}) cb2.Add("handler2", "key2", func() {}) cb2.Add("handler3", "key3", func() {}) - + if cb2.Count() != 3 { t.Errorf("Expected callback count is 3, but got %d", cb2.Count()) } - + // Remove middle callback cb2.Remove("handler2", "key2") if cb2.Count() != 2 { t.Errorf("Expected count after removing middle callback is 2, but got %d", cb2.Count()) } - + // Remove first callback cb2.Remove("handler1", "key1") if cb2.Count() != 1 { t.Errorf("Expected count after removing first callback is 1, but got %d", cb2.Count()) } - + // Remove last callback cb2.Remove("handler3", "key3") if cb2.Count() != 0 { t.Errorf("Expected count after removing last callback is 0, but got %d", cb2.Count()) } - + // Test removing non-existent callback cb2.Add("handler1", "key1", func() {}) cb2.Remove("handler2", "key2") // Try to remove non-existent callback - + // Should still have 1 callback if cb2.Count() != 1 { t.Errorf("Expected callback count is 1, but got %d", cb2.Count()) diff --git a/transport/session_callback.go b/transport/session_callback.go index d7fd233d..38e046e1 100644 --- a/transport/session_callback.go +++ b/transport/session_callback.go @@ -1,7 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package getty // AddCloseCallback adds a close callback function to the Session -// +// // Parameters: // - handler: handler identifier, used to identify the source or type of the callback // - key: unique identifier key for the callback, used in combination with handler @@ -21,7 +38,7 @@ func (s *session) AddCloseCallback(handler, key any, f CallBackFunc) { } // RemoveCloseCallback removes the specified Session close callback function -// +// // Parameters: // - handler: handler identifier of the callback to be removed // - key: unique identifier key of the callback to be removed @@ -42,7 +59,7 @@ func (s *session) RemoveCloseCallback(handler, key any) { } // invokeCloseCallbacks executes all registered close callback functions -// +// // Function description: // - Executes all registered close callbacks in the order they were added // - Uses read lock to protect the callback list, ensuring concurrency safety @@ -59,7 +76,7 @@ func (s *session) invokeCloseCallbacks() { } // CallBackFunc defines the callback function type when Session closes -// +// // Usage notes: // - Callback function accepts no parameters // - Callback function returns no values diff --git a/transport/session_callback_test.go b/transport/session_callback_test.go index 3b0bf5cd..8729e1b8 100644 --- a/transport/session_callback_test.go +++ b/transport/session_callback_test.go @@ -11,62 +11,62 @@ func TestSessionCallback(t *testing.T) { // Test basic add, remove and execute callback functionality t.Run("BasicCallback", func(t *testing.T) { s := &session{ - once: &sync.Once{}, - done: make(chan struct{}), + once: &sync.Once{}, + done: make(chan struct{}), closeCallback: callbacks{}, } - + var callbackExecuted bool var callbackMutex sync.Mutex - + callback := func() { callbackMutex.Lock() callbackExecuted = true callbackMutex.Unlock() } - + // Add callback s.AddCloseCallback("testHandler", "testKey", callback) if s.closeCallback.Count() != 1 { t.Errorf("Expected callback count is 1, but got %d", s.closeCallback.Count()) } - + // Test removing callback s.RemoveCloseCallback("testHandler", "testKey") if s.closeCallback.Count() != 0 { t.Errorf("Expected callback count is 0, but got %d", s.closeCallback.Count()) } - + // Re-add callback s.AddCloseCallback("testHandler", "testKey", callback) - + // Test callback execution when closing go func() { time.Sleep(10 * time.Millisecond) s.stop() }() - + // Wait for callback execution time.Sleep(50 * time.Millisecond) - + callbackMutex.Lock() if !callbackExecuted { t.Error("Callback function was not executed") } callbackMutex.Unlock() }) - + // Test adding, removing and executing multiple callbacks t.Run("MultipleCallbacks", func(t *testing.T) { s := &session{ - once: &sync.Once{}, - done: make(chan struct{}), + once: &sync.Once{}, + done: make(chan struct{}), closeCallback: callbacks{}, } - + var callbackCount int var callbackMutex sync.Mutex - + // Add multiple callbacks totalCallbacks := 3 for i := 0; i < totalCallbacks; i++ { @@ -78,44 +78,44 @@ func TestSessionCallback(t *testing.T) { } s.AddCloseCallback(fmt.Sprintf("handler%d", index), fmt.Sprintf("key%d", index), callback) } - + if s.closeCallback.Count() != totalCallbacks { t.Errorf("Expected callback count is %d, but got %d", totalCallbacks, s.closeCallback.Count()) } - + // Remove one callback s.RemoveCloseCallback("handler0", "key0") expectedAfterRemove := totalCallbacks - 1 if s.closeCallback.Count() != expectedAfterRemove { t.Errorf("Expected callback count is %d, but got %d", expectedAfterRemove, s.closeCallback.Count()) } - + // Test execution of remaining callbacks when closing go func() { time.Sleep(10 * time.Millisecond) s.stop() }() - + time.Sleep(50 * time.Millisecond) - + callbackMutex.Lock() if callbackCount != expectedAfterRemove { t.Errorf("Expected executed callback count is %d, but got %d", expectedAfterRemove, callbackCount) } callbackMutex.Unlock() }) - + // Test invokeCloseCallbacks functionality t.Run("InvokeCloseCallbacks", func(t *testing.T) { s := &session{ - once: &sync.Once{}, - done: make(chan struct{}), + once: &sync.Once{}, + done: make(chan struct{}), closeCallback: callbacks{}, } - + var callbackResults []string var callbackMutex sync.Mutex - + // Add multiple different types of close callbacks callbacks := []struct { handler string @@ -127,7 +127,7 @@ func TestSessionCallback(t *testing.T) { {"logging", "audit", "Log audit info"}, {"metrics", "stats", "Update statistics"}, } - + // Register all callbacks for _, cb := range callbacks { cbCopy := cb // Capture loop variable @@ -138,30 +138,30 @@ func TestSessionCallback(t *testing.T) { } s.AddCloseCallback(cbCopy.handler, cbCopy.key, callback) } - + // Verify callback count expectedCount := len(callbacks) if s.closeCallback.Count() != expectedCount { t.Errorf("Expected callback count is %d, but got %d", expectedCount, s.closeCallback.Count()) } - + // Manually invoke close callbacks (simulate invokeCloseCallbacks) callbackMutex.Lock() callbackResults = nil // Clear previous results callbackMutex.Unlock() - + // Execute all close callbacks s.closeCallback.Invoke() - + // Wait for callback execution to complete time.Sleep(10 * time.Millisecond) - + // Verify all callbacks were executed callbackMutex.Lock() if len(callbackResults) != expectedCount { t.Errorf("Expected to execute %d callbacks, but executed %d", expectedCount, len(callbackResults)) } - + // Verify callback execution order (should execute in order of addition) expectedActions := []string{"Clean resources", "Close connections", "Log audit info", "Update statistics"} for i, result := range callbackResults { @@ -170,18 +170,18 @@ func TestSessionCallback(t *testing.T) { } } callbackMutex.Unlock() - + // Test execution after removing a callback s.RemoveCloseCallback("cleanup", "resources") - + callbackMutex.Lock() callbackResults = nil callbackMutex.Unlock() - + // Execute callbacks again s.closeCallback.Invoke() time.Sleep(10 * time.Millisecond) - + // Verify execution results after removal callbackMutex.Lock() expectedAfterRemove := expectedCount - 1 @@ -190,31 +190,31 @@ func TestSessionCallback(t *testing.T) { } callbackMutex.Unlock() }) - + // Test edge cases t.Run("EdgeCases", func(t *testing.T) { // Test empty callback list scenario s := &session{ - once: &sync.Once{}, - done: make(chan struct{}), + once: &sync.Once{}, + done: make(chan struct{}), closeCallback: callbacks{}, } - + // Verify empty list if s.closeCallback.Count() != 0 { t.Errorf("Expected count for empty list is 0, but got %d", s.closeCallback.Count()) } - + // Execute empty callback list (should not panic) s.closeCallback.Invoke() - + // Add a callback then remove it, execute again s.AddCloseCallback("test", "key", func() {}) s.RemoveCloseCallback("test", "key") - + // Execute empty list after removal (should not panic) s.closeCallback.Invoke() - + if s.closeCallback.Count() != 0 { t.Errorf("Expected count after removal is 0, but got %d", s.closeCallback.Count()) } From ecef7ccb91531446c4ee74c246c18a583e6b2d3d Mon Sep 17 00:00:00 2001 From: mazheng Date: Sat, 13 Sep 2025 14:04:52 +0800 Subject: [PATCH 07/25] test file add License Header --- transport/callback_test.go | 17 +++++++++++++++++ transport/session_callback_test.go | 17 +++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/transport/callback_test.go b/transport/callback_test.go index d62e87aa..5b794be7 100644 --- a/transport/callback_test.go +++ b/transport/callback_test.go @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package getty import ( diff --git a/transport/session_callback_test.go b/transport/session_callback_test.go index 8729e1b8..efe1df44 100644 --- a/transport/session_callback_test.go +++ b/transport/session_callback_test.go @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package getty import ( From a1fa804e1d91ff5cf44e753930fba6624dc71ef4 Mon Sep 17 00:00:00 2001 From: mazheng Date: Sat, 13 Sep 2025 14:29:25 +0800 Subject: [PATCH 08/25] invokeCloseCallbacks panic add stack info --- transport/session.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/transport/session.go b/transport/session.go index ceeb316a..e6d37781 100644 --- a/transport/session.go +++ b/transport/session.go @@ -200,6 +200,9 @@ func (s *session) Reset() { period: period, wait: pendingDuration, attrs: gxcontext.NewValuesContext(context.Background()), + // callbacks: intentionally cleared on reset + // closeCallback: zero-value (empty registry) + // closeCallbackMutex: zero-value (unlocked) } } @@ -872,7 +875,10 @@ func (s *session) stop() { go func() { defer func() { if r := recover(); r != nil { - log.Errorf("invokeCloseCallbacks panic: %v", r) + const size = 64 << 10 + buf := make([]byte, size) + buf = buf[:runtime.Stack(buf, false)] + log.Errorf("invokeCloseCallbacks panic: %v\n%s", r, buf) } }() s.invokeCloseCallbacks() From c27bc00bb7bc805e97ed5d5d5e570a49396d0058 Mon Sep 17 00:00:00 2001 From: mazheng Date: Sat, 13 Sep 2025 14:35:19 +0800 Subject: [PATCH 09/25] =?UTF-8?q?=E9=87=87=E7=BA=B3ai=20comment?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- README_CN.md | 2 +- transport/callback.go | 29 ++++++++++++++++++++++++----- transport/callback_test.go | 27 ++++++++++++++++++++++++--- 4 files changed, 50 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 10a70e4a..f5b1f324 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ Additionally, you can manage heartbeat logic within the (Codec)OnCron method in If you're using WebSocket, you don't need to worry about heartbeat request/response, as Getty handles this task within session.go's (Session)handleLoop method by sending and receiving WebSocket ping/pong frames. Your responsibility is to check whether the WebSocket session has timed out or not within codec.go's (Codec)OnCron method using session.go's (Session)GetActive method. -For code examples, you can refer to https://github.com/AlexStocks/getty-examples. +For code examples, you can refer to [getty-examples](https://github.com/AlexStocks/getty-examples). ## About network transmission in getty diff --git a/README_CN.md b/README_CN.md index 4b346cf0..c1c5e3d3 100644 --- a/README_CN.md +++ b/README_CN.md @@ -18,7 +18,7 @@ Getty 是一个使用 Golang 开发的异步网络 I/O 库。它适用于 TCP、 如果您使用 WebSocket,您无需担心心跳请求/响应,因为 Getty 在 session.go 的 (Session)handleLoop 方法内通过发送和接收 WebSocket ping/pong 帧来处理此任务。您只需在 codec.go 的 (Codec)OnCron 方法内使用 session.go 的 (Session)GetActive 方法检查 WebSocket 会话是否已超时。 -有关代码示例,请参阅 https://github.com/AlexStocks/getty-examples +有关代码示例,请参阅 [AlexStocks/getty-examples](https://github.com/AlexStocks/getty-examples)。 ## 关于 Getty 中的网络传输 diff --git a/transport/callback.go b/transport/callback.go index 281354a7..c812c79d 100644 --- a/transport/callback.go +++ b/transport/callback.go @@ -20,8 +20,8 @@ package getty // callbackCommon represents a node in the callback linked list // Each node contains handler identifier, key, callback function and pointer to next node type callbackCommon struct { - handler interface{} // Handler identifier, used to identify the source or type of callback - key interface{} // Unique identifier key for callback, used in combination with handler + handler any // Handler identifier, used to identify the source or type of callback + key any // Unique identifier key for callback, used in combination with handler call func() // Actual callback function to be executed next *callbackCommon // Pointer to next node, forming linked list structure } @@ -38,12 +38,22 @@ type callbacks struct { // - handler: Handler identifier, can be any type // - key: Unique identifier key for callback, used in combination with handler // - callback: Callback function to be executed, ignored if nil -func (t *callbacks) Add(handler, key interface{}, callback func()) { +// Note: If a callback with the same handler and key already exists, it will be replaced +func (t *callbacks) Add(handler, key any, callback func()) { // Prevent adding empty callback function if callback == nil { return } + // Check if a callback with the same handler and key already exists + for cb := t.first; cb != nil; cb = cb.next { + if cb.handler == handler && cb.key == key { + // Replace existing callback + cb.call = callback + return + } + } + // Create new callback node newItem := &callbackCommon{handler, key, callback, nil} @@ -64,7 +74,7 @@ func (t *callbacks) Add(handler, key interface{}, callback func()) { // - key: Unique identifier key of the callback to be removed // // Note: If no matching callback is found, this method has no effect -func (t *callbacks) Remove(handler, key interface{}) { +func (t *callbacks) Remove(handler, key any) { var prev *callbackCommon // Traverse linked list to find the node to be removed @@ -93,12 +103,21 @@ func (t *callbacks) Remove(handler, key interface{}) { // Invoke executes all registered callback functions in the linked list // Executes each callback in the order they were added // Note: If a callback function is nil, it will be skipped +// If a callback panics, it will be caught and the execution will continue with the next callback func (t *callbacks) Invoke() { // Traverse the entire linked list starting from the head node for callback := t.first; callback != nil; callback = callback.next { // Ensure callback function is not nil before executing if callback.call != nil { - callback.call() + // Execute callback with panic recovery to ensure other callbacks continue + func() { + defer func() { + if r := recover(); r != nil { + // Panic caught, continue with next callback + } + }() + callback.call() + }() } } } diff --git a/transport/callback_test.go b/transport/callback_test.go index 5b794be7..d2aed828 100644 --- a/transport/callback_test.go +++ b/transport/callback_test.go @@ -28,6 +28,9 @@ func TestCallback(t *testing.T) { t.Errorf("Expected count for empty list is 0, but got %d", cb.Count()) } + // Ensure invoking on an empty registry is a no-op (no panic). + cb.Invoke() + // Test adding callback functions var count, expected, remove, totalCount int totalCount = 10 @@ -53,6 +56,12 @@ func TestCallback(t *testing.T) { t.Errorf("Expected count after adding nil callback is %d, but got %d", expectedCallbacks, cb.Count()) } + // Replace an existing callback with a non-nil one; count should remain unchanged. + cb.Add(remove, remove, func() { count += remove }) + if cb.Count() != expectedCallbacks { + t.Errorf("Expected count after replacing existing callback is %d, but got %d", expectedCallbacks, cb.Count()) + } + // Remove specified callback cb.Remove(remove, remove) @@ -63,9 +72,9 @@ func TestCallback(t *testing.T) { cb.Invoke() // Verify execution result - expectedCount := expected - remove - if count != expectedCount { - t.Errorf("Expected execution result is %d, but got %d", expectedCount, count) + expectedSum := expected - remove + if count != expectedSum { + t.Errorf("Expected execution result is %d, but got %d", expectedSum, count) } // Test string type handler and key @@ -107,3 +116,15 @@ func TestCallback(t *testing.T) { t.Errorf("Expected callback count is 1, but got %d", cb2.Count()) } } + +func TestCallbackInvokePanicSafe(t *testing.T) { + cb := &callbacks{} + var ran bool + cb.Add("h", "k1", func() { panic("boom") }) + cb.Add("h", "k2", func() { ran = true }) + // Expect: Invoke swallows panics and continues executing remaining callbacks. + cb.Invoke() + if !ran { + t.Errorf("Expected subsequent callbacks to run even if one panics") + } +} From f585a70b4ad650085f49ab711e5b885fd4fa1004 Mon Sep 17 00:00:00 2001 From: mazheng Date: Sat, 13 Sep 2025 14:44:22 +0800 Subject: [PATCH 10/25] remove ai commets recover design --- transport/callback.go | 12 ++---------- transport/callback_test.go | 19 ++++++++++++++----- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/transport/callback.go b/transport/callback.go index c812c79d..03301435 100644 --- a/transport/callback.go +++ b/transport/callback.go @@ -103,21 +103,13 @@ func (t *callbacks) Remove(handler, key any) { // Invoke executes all registered callback functions in the linked list // Executes each callback in the order they were added // Note: If a callback function is nil, it will be skipped -// If a callback panics, it will be caught and the execution will continue with the next callback +// If a callback panics, it will be handled by the outer caller's panic recovery func (t *callbacks) Invoke() { // Traverse the entire linked list starting from the head node for callback := t.first; callback != nil; callback = callback.next { // Ensure callback function is not nil before executing if callback.call != nil { - // Execute callback with panic recovery to ensure other callbacks continue - func() { - defer func() { - if r := recover(); r != nil { - // Panic caught, continue with next callback - } - }() - callback.call() - }() + callback.call() } } } diff --git a/transport/callback_test.go b/transport/callback_test.go index d2aed828..8991590f 100644 --- a/transport/callback_test.go +++ b/transport/callback_test.go @@ -117,14 +117,23 @@ func TestCallback(t *testing.T) { } } -func TestCallbackInvokePanicSafe(t *testing.T) { +func TestCallbackInvokePanicPropagation(t *testing.T) { cb := &callbacks{} var ran bool cb.Add("h", "k1", func() { panic("boom") }) cb.Add("h", "k2", func() { ran = true }) - // Expect: Invoke swallows panics and continues executing remaining callbacks. + + // Test that panic is propagated (not swallowed by Invoke) + defer func() { + if r := recover(); r != nil { + if r != "boom" { + t.Errorf("Expected panic 'boom', got %v", r) + } + } else { + t.Errorf("Expected panic to be propagated, but it was swallowed") + } + }() + + // This should panic and be caught by the defer above cb.Invoke() - if !ran { - t.Errorf("Expected subsequent callbacks to run even if one panics") - } } From 63913385f77556d2ac10e1abecb6bf714197a8e9 Mon Sep 17 00:00:00 2001 From: mazheng Date: Sat, 13 Sep 2025 14:48:23 +0800 Subject: [PATCH 11/25] =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E6=95=B4=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- transport/callback.go | 5 +++-- transport/callback_test.go | 6 ++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/transport/callback.go b/transport/callback.go index 03301435..c3ac7b2b 100644 --- a/transport/callback.go +++ b/transport/callback.go @@ -20,8 +20,8 @@ package getty // callbackCommon represents a node in the callback linked list // Each node contains handler identifier, key, callback function and pointer to next node type callbackCommon struct { - handler any // Handler identifier, used to identify the source or type of callback - key any // Unique identifier key for callback, used in combination with handler + handler any // Handler identifier, used to identify the source or type of callback + key any // Unique identifier key for callback, used in combination with handler call func() // Actual callback function to be executed next *callbackCommon // Pointer to next node, forming linked list structure } @@ -38,6 +38,7 @@ type callbacks struct { // - handler: Handler identifier, can be any type // - key: Unique identifier key for callback, used in combination with handler // - callback: Callback function to be executed, ignored if nil +// // Note: If a callback with the same handler and key already exists, it will be replaced func (t *callbacks) Add(handler, key any, callback func()) { // Prevent adding empty callback function diff --git a/transport/callback_test.go b/transport/callback_test.go index 8991590f..ca5b0ec7 100644 --- a/transport/callback_test.go +++ b/transport/callback_test.go @@ -119,10 +119,8 @@ func TestCallback(t *testing.T) { func TestCallbackInvokePanicPropagation(t *testing.T) { cb := &callbacks{} - var ran bool cb.Add("h", "k1", func() { panic("boom") }) - cb.Add("h", "k2", func() { ran = true }) - + // Test that panic is propagated (not swallowed by Invoke) defer func() { if r := recover(); r != nil { @@ -133,7 +131,7 @@ func TestCallbackInvokePanicPropagation(t *testing.T) { t.Errorf("Expected panic to be propagated, but it was swallowed") } }() - + // This should panic and be caught by the defer above cb.Invoke() } From 5c5b1565426f2246286376470058e84183d5d3f8 Mon Sep 17 00:00:00 2001 From: mazheng Date: Sat, 13 Sep 2025 16:19:32 +0800 Subject: [PATCH 12/25] funcname count change to len,change recover logic --- transport/callback.go | 24 ++++++++++---------- transport/callback_test.go | 36 +++++++++++++++--------------- transport/session.go | 13 ++++++----- transport/session_callback_test.go | 28 +++++++++++------------ 4 files changed, 52 insertions(+), 49 deletions(-) diff --git a/transport/callback.go b/transport/callback.go index c3ac7b2b..d7a62043 100644 --- a/transport/callback.go +++ b/transport/callback.go @@ -17,20 +17,20 @@ package getty -// callbackCommon represents a node in the callback linked list +// callbackNode represents a node in the callback linked list // Each node contains handler identifier, key, callback function and pointer to next node -type callbackCommon struct { - handler any // Handler identifier, used to identify the source or type of callback - key any // Unique identifier key for callback, used in combination with handler - call func() // Actual callback function to be executed - next *callbackCommon // Pointer to next node, forming linked list structure +type callbackNode struct { + handler any // Handler identifier, used to identify the source or type of callback + key any // Unique identifier key for callback, used in combination with handler + call func() // Actual callback function to be executed + next *callbackNode // Pointer to next node, forming linked list structure } // callbacks is a singly linked list structure for managing multiple callback functions // Supports dynamic addition, removal and execution of callbacks type callbacks struct { - first *callbackCommon // Pointer to the first node of the linked list - last *callbackCommon // Pointer to the last node of the linked list, used for quick addition of new nodes + first *callbackNode // Pointer to the first node of the linked list + last *callbackNode // Pointer to the last node of the linked list, used for quick addition of new nodes } // Add adds a new callback function to the callback linked list @@ -56,7 +56,7 @@ func (t *callbacks) Add(handler, key any, callback func()) { } // Create new callback node - newItem := &callbackCommon{handler, key, callback, nil} + newItem := &callbackNode{handler, key, callback, nil} if t.first == nil { // If linked list is empty, new node becomes the first node @@ -76,7 +76,7 @@ func (t *callbacks) Add(handler, key any, callback func()) { // // Note: If no matching callback is found, this method has no effect func (t *callbacks) Remove(handler, key any) { - var prev *callbackCommon + var prev *callbackNode // Traverse linked list to find the node to be removed for callback := t.first; callback != nil; prev, callback = callback, callback.next { @@ -115,9 +115,9 @@ func (t *callbacks) Invoke() { } } -// Count returns the number of callback functions in the linked list +// Len returns the number of callback functions in the linked list // Return value: Total number of currently registered callback functions -func (t *callbacks) Count() int { +func (t *callbacks) Len() int { var count int // Traverse linked list to count diff --git a/transport/callback_test.go b/transport/callback_test.go index ca5b0ec7..ac3b9fc9 100644 --- a/transport/callback_test.go +++ b/transport/callback_test.go @@ -24,8 +24,8 @@ import ( func TestCallback(t *testing.T) { // Test empty list cb := &callbacks{} - if cb.Count() != 0 { - t.Errorf("Expected count for empty list is 0, but got %d", cb.Count()) + if cb.Len() != 0 { + t.Errorf("Expected count for empty list is 0, but got %d", cb.Len()) } // Ensure invoking on an empty registry is a no-op (no panic). @@ -46,20 +46,20 @@ func TestCallback(t *testing.T) { // Verify count after adding expectedCallbacks := totalCount - 1 - if cb.Count() != expectedCallbacks { - t.Errorf("Expected callback count is %d, but got %d", expectedCallbacks, cb.Count()) + if cb.Len() != expectedCallbacks { + t.Errorf("Expected callback count is %d, but got %d", expectedCallbacks, cb.Len()) } // Test adding nil callback cb.Add(remove, remove, nil) - if cb.Count() != expectedCallbacks { - t.Errorf("Expected count after adding nil callback is %d, but got %d", expectedCallbacks, cb.Count()) + if cb.Len() != expectedCallbacks { + t.Errorf("Expected count after adding nil callback is %d, but got %d", expectedCallbacks, cb.Len()) } // Replace an existing callback with a non-nil one; count should remain unchanged. cb.Add(remove, remove, func() { count += remove }) - if cb.Count() != expectedCallbacks { - t.Errorf("Expected count after replacing existing callback is %d, but got %d", expectedCallbacks, cb.Count()) + if cb.Len() != expectedCallbacks { + t.Errorf("Expected count after replacing existing callback is %d, but got %d", expectedCallbacks, cb.Len()) } // Remove specified callback @@ -85,26 +85,26 @@ func TestCallback(t *testing.T) { cb2.Add("handler2", "key2", func() {}) cb2.Add("handler3", "key3", func() {}) - if cb2.Count() != 3 { - t.Errorf("Expected callback count is 3, but got %d", cb2.Count()) + if cb2.Len() != 3 { + t.Errorf("Expected callback count is 3, but got %d", cb2.Len()) } // Remove middle callback cb2.Remove("handler2", "key2") - if cb2.Count() != 2 { - t.Errorf("Expected count after removing middle callback is 2, but got %d", cb2.Count()) + if cb2.Len() != 2 { + t.Errorf("Expected count after removing middle callback is 2, but got %d", cb2.Len()) } // Remove first callback cb2.Remove("handler1", "key1") - if cb2.Count() != 1 { - t.Errorf("Expected count after removing first callback is 1, but got %d", cb2.Count()) + if cb2.Len() != 1 { + t.Errorf("Expected count after removing first callback is 1, but got %d", cb2.Len()) } // Remove last callback cb2.Remove("handler3", "key3") - if cb2.Count() != 0 { - t.Errorf("Expected count after removing last callback is 0, but got %d", cb2.Count()) + if cb2.Len() != 0 { + t.Errorf("Expected count after removing last callback is 0, but got %d", cb2.Len()) } // Test removing non-existent callback @@ -112,8 +112,8 @@ func TestCallback(t *testing.T) { cb2.Remove("handler2", "key2") // Try to remove non-existent callback // Should still have 1 callback - if cb2.Count() != 1 { - t.Errorf("Expected callback count is 1, but got %d", cb2.Count()) + if cb2.Len() != 1 { + t.Errorf("Expected callback count is 1, but got %d", cb2.Len()) } } diff --git a/transport/session.go b/transport/session.go index e6d37781..a1f54872 100644 --- a/transport/session.go +++ b/transport/session.go @@ -872,17 +872,20 @@ func (s *session) stop() { } close(s.done) - go func() { + go func(sessionToken string) { defer func() { if r := recover(); r != nil { const size = 64 << 10 - buf := make([]byte, size) - buf = buf[:runtime.Stack(buf, false)] - log.Errorf("invokeCloseCallbacks panic: %v\n%s", r, buf) + rBuf := make([]byte, size) + rBuf = rBuf[:runtime.Stack(rBuf, false)] + err := perrors.WithStack(fmt.Errorf("[session.invokeCloseCallbacks] panic session %s: err=%v\n%s", + sessionToken, r, rBuf)) + log.Error(err) } }() + s.invokeCloseCallbacks() - }() + }(s.sessionToken()) clt, cltFound := s.GetAttribute(sessionClientKey).(*client) ignoreReconnect, flagFound := s.GetAttribute(ignoreReconnectKey).(bool) diff --git a/transport/session_callback_test.go b/transport/session_callback_test.go index efe1df44..68228fb2 100644 --- a/transport/session_callback_test.go +++ b/transport/session_callback_test.go @@ -44,14 +44,14 @@ func TestSessionCallback(t *testing.T) { // Add callback s.AddCloseCallback("testHandler", "testKey", callback) - if s.closeCallback.Count() != 1 { - t.Errorf("Expected callback count is 1, but got %d", s.closeCallback.Count()) + if s.closeCallback.Len() != 1 { + t.Errorf("Expected callback count is 1, but got %d", s.closeCallback.Len()) } // Test removing callback s.RemoveCloseCallback("testHandler", "testKey") - if s.closeCallback.Count() != 0 { - t.Errorf("Expected callback count is 0, but got %d", s.closeCallback.Count()) + if s.closeCallback.Len() != 0 { + t.Errorf("Expected callback count is 0, but got %d", s.closeCallback.Len()) } // Re-add callback @@ -96,15 +96,15 @@ func TestSessionCallback(t *testing.T) { s.AddCloseCallback(fmt.Sprintf("handler%d", index), fmt.Sprintf("key%d", index), callback) } - if s.closeCallback.Count() != totalCallbacks { - t.Errorf("Expected callback count is %d, but got %d", totalCallbacks, s.closeCallback.Count()) + if s.closeCallback.Len() != totalCallbacks { + t.Errorf("Expected callback count is %d, but got %d", totalCallbacks, s.closeCallback.Len()) } // Remove one callback s.RemoveCloseCallback("handler0", "key0") expectedAfterRemove := totalCallbacks - 1 - if s.closeCallback.Count() != expectedAfterRemove { - t.Errorf("Expected callback count is %d, but got %d", expectedAfterRemove, s.closeCallback.Count()) + if s.closeCallback.Len() != expectedAfterRemove { + t.Errorf("Expected callback count is %d, but got %d", expectedAfterRemove, s.closeCallback.Len()) } // Test execution of remaining callbacks when closing @@ -158,8 +158,8 @@ func TestSessionCallback(t *testing.T) { // Verify callback count expectedCount := len(callbacks) - if s.closeCallback.Count() != expectedCount { - t.Errorf("Expected callback count is %d, but got %d", expectedCount, s.closeCallback.Count()) + if s.closeCallback.Len() != expectedCount { + t.Errorf("Expected callback count is %d, but got %d", expectedCount, s.closeCallback.Len()) } // Manually invoke close callbacks (simulate invokeCloseCallbacks) @@ -218,8 +218,8 @@ func TestSessionCallback(t *testing.T) { } // Verify empty list - if s.closeCallback.Count() != 0 { - t.Errorf("Expected count for empty list is 0, but got %d", s.closeCallback.Count()) + if s.closeCallback.Len() != 0 { + t.Errorf("Expected count for empty list is 0, but got %d", s.closeCallback.Len()) } // Execute empty callback list (should not panic) @@ -232,8 +232,8 @@ func TestSessionCallback(t *testing.T) { // Execute empty list after removal (should not panic) s.closeCallback.Invoke() - if s.closeCallback.Count() != 0 { - t.Errorf("Expected count after removal is 0, but got %d", s.closeCallback.Count()) + if s.closeCallback.Len() != 0 { + t.Errorf("Expected count after removal is 0, but got %d", s.closeCallback.Len()) } }) } From 63ab8bf12999bff3d09750567f4a015a823837dc Mon Sep 17 00:00:00 2001 From: mazheng Date: Sat, 13 Sep 2025 16:21:47 +0800 Subject: [PATCH 13/25] update readme --- README.md | 32 ++++++++++++++++++++++++++++++++ README_CN.md | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+) diff --git a/README.md b/README.md index f5b1f324..d75d5496 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,38 @@ If you're using WebSocket, you don't need to worry about heartbeat request/respo For code examples, you can refer to [getty-examples](https://github.com/AlexStocks/getty-examples). +## Callback System + +Getty provides a robust callback system that allows you to register and manage callback functions for session lifecycle events. This is particularly useful for cleanup operations, resource management, and custom event handling. + +### Key Features + +- **Thread-safe operations**: All callback operations are protected by mutex locks +- **Replace semantics**: Adding a callback with the same handler and key will replace the existing one +- **Panic safety**: Callback panics are properly handled and logged with stack traces +- **Ordered execution**: Callbacks are executed in the order they were added + +### Usage Example + +```go +// Add a close callback +session.AddCloseCallback("cleanup", "resources", func() { + // Cleanup resources when session closes + cleanupResources() +}) + +// Remove a specific callback +session.RemoveCloseCallback("cleanup", "resources") + +// Callbacks are automatically executed when the session closes +``` + +### Callback Management + +- **AddCloseCallback**: Register a callback to be executed when the session closes +- **RemoveCloseCallback**: Remove a previously registered callback +- **Thread Safety**: All operations are thread-safe and can be called concurrently + ## About network transmission in getty In network communication, the data transmission interface of getty does not guarantee that data will be sent successfully; it lacks an internal retry mechanism. Instead, getty delegates the outcome of data transmission to the underlying operating system mechanism. Under this mechanism, if data is successfully transmitted, it is considered a success; if transmission fails, it is regarded as a failure. These outcomes are then communicated back to the upper-layer caller. diff --git a/README_CN.md b/README_CN.md index c1c5e3d3..b17f0a7a 100644 --- a/README_CN.md +++ b/README_CN.md @@ -20,6 +20,38 @@ Getty 是一个使用 Golang 开发的异步网络 I/O 库。它适用于 TCP、 有关代码示例,请参阅 [AlexStocks/getty-examples](https://github.com/AlexStocks/getty-examples)。 +## 回调系统 + +Getty 提供了一个强大的回调系统,允许您为会话生命周期事件注册和管理回调函数。这对于清理操作、资源管理和自定义事件处理特别有用。 + +### 主要特性 + +- **线程安全操作**:所有回调操作都受到互斥锁保护 +- **替换语义**:使用相同的处理器和键添加回调将替换现有的回调 +- **Panic 安全性**:回调中的 panic 会被正确处理并记录堆栈跟踪 +- **有序执行**:回调按照添加的顺序执行 + +### 使用示例 + +```go +// 添加关闭回调 +session.AddCloseCallback("cleanup", "resources", func() { + // 当会话关闭时清理资源 + cleanupResources() +}) + +// 移除特定回调 +session.RemoveCloseCallback("cleanup", "resources") + +// 当会话关闭时,回调会自动执行 +``` + +### 回调管理 + +- **AddCloseCallback**:注册一个在会话关闭时执行的回调 +- **RemoveCloseCallback**:移除之前注册的回调 +- **线程安全**:所有操作都是线程安全的,可以并发调用 + ## 关于 Getty 中的网络传输 在网络通信中,Getty 的数据传输接口并不保证数据一定会成功发送,它缺乏内部的重试机制。相反,Getty 将数据传输的结果委托给底层操作系统机制处理。在这种机制下,如果数据成功传输,将被视为成功;如果传输失败,则被视为失败。这些结果随后会传递给上层调用者。 From d1359cee6e0ff466c8230930cef4044de35fdf2c Mon Sep 17 00:00:00 2001 From: mazheng Date: Sat, 13 Sep 2025 17:10:43 +0800 Subject: [PATCH 14/25] update readme --- README.md | 2 ++ README_CN.md | 2 ++ 2 files changed, 4 insertions(+) diff --git a/README.md b/README.md index d75d5496..e25dc7b9 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,8 @@ session.RemoveCloseCallback("cleanup", "resources") // Callbacks are automatically executed when the session closes ``` +**Note**: Callbacks should be fast/non-blocking; move heavy work to separate goroutines to avoid delaying shutdown. + ### Callback Management - **AddCloseCallback**: Register a callback to be executed when the session closes diff --git a/README_CN.md b/README_CN.md index b17f0a7a..c1051152 100644 --- a/README_CN.md +++ b/README_CN.md @@ -46,6 +46,8 @@ session.RemoveCloseCallback("cleanup", "resources") // 当会话关闭时,回调会自动执行 ``` +**注意**:关闭回调应尽量快速、避免阻塞或长耗时操作,否则可能延长会话关闭耗时;重活移交到独立 goroutine。 + ### 回调管理 - **AddCloseCallback**:注册一个在会话关闭时执行的回调 From 8a74d19de1757e9399a0cea1c414f09ea78b4568 Mon Sep 17 00:00:00 2001 From: mazheng Date: Sat, 13 Sep 2025 17:25:30 +0800 Subject: [PATCH 15/25] =?UTF-8?q?=E6=96=87=E6=A1=A3=E9=87=8C=E8=AF=B4?= =?UTF-8?q?=E6=98=8Ehandler=20key=E7=9A=84=E5=8F=82=E6=95=B0=E7=B1=BB?= =?UTF-8?q?=E5=9E=8B=E8=A6=81=E6=B1=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 30 ++++++++++++++++++++++++++++++ README_CN.md | 30 ++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/README.md b/README.md index e25dc7b9..5624bdc8 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,36 @@ session.RemoveCloseCallback("cleanup", "resources") - **RemoveCloseCallback**: Remove a previously registered callback - **Thread Safety**: All operations are thread-safe and can be called concurrently +### Type Requirements + +The `handler` and `key` parameters must be **comparable types** that support the `==` operator: + +**✅ Supported types:** +- **Basic types**: `string`, `int`, `int8`, `int16`, `int32`, `int64`, `uint`, `uint8`, `uint16`, `uint32`, `uint64`, `uintptr`, `float32`, `float64`, `bool`, `complex64`, `complex128` +- **Pointer types**: Pointers to any type (e.g., `*int`, `*string`, `*MyStruct`) +- **Interface types**: Interface types (compared by type and value) +- **Channel types**: Channel types (compared by channel identity) +- **Array types**: Arrays of comparable elements (e.g., `[3]int`, `[2]string`) +- **Struct types**: Structs where all fields are comparable types + +**❌ Not supported (will cause compile errors):** +- `map` types (e.g., `map[string]int`) +- `slice` types (e.g., `[]int`, `[]string`) +- `func` types (e.g., `func()`, `func(int) string`) +- Structs containing non-comparable fields (maps, slices, functions) + +**Examples:** +```go +// ✅ Valid usage +session.AddCloseCallback("user", "cleanup", callback) +session.AddCloseCallback(123, "cleanup", callback) +session.AddCloseCallback(true, false, callback) + +// ❌ Invalid usage (compile error) +session.AddCloseCallback(map[string]int{"a": 1}, "key", callback) +session.AddCloseCallback([]int{1, 2, 3}, "key", callback) +``` + ## About network transmission in getty In network communication, the data transmission interface of getty does not guarantee that data will be sent successfully; it lacks an internal retry mechanism. Instead, getty delegates the outcome of data transmission to the underlying operating system mechanism. Under this mechanism, if data is successfully transmitted, it is considered a success; if transmission fails, it is regarded as a failure. These outcomes are then communicated back to the upper-layer caller. diff --git a/README_CN.md b/README_CN.md index c1051152..df556a46 100644 --- a/README_CN.md +++ b/README_CN.md @@ -54,6 +54,36 @@ session.RemoveCloseCallback("cleanup", "resources") - **RemoveCloseCallback**:移除之前注册的回调 - **线程安全**:所有操作都是线程安全的,可以并发调用 +### 类型要求 + +`handler` 和 `key` 参数必须是**可比较的类型**,支持 `==` 操作符: + +**✅ 支持的类型:** +- **基本类型**:`string`、`int`、`int8`、`int16`、`int32`、`int64`、`uint`、`uint8`、`uint16`、`uint32`、`uint64`、`uintptr`、`float32`、`float64`、`bool`、`complex64`、`complex128` +- **指针类型**:指向任何类型的指针(如 `*int`、`*string`、`*MyStruct`) +- **接口类型**:接口类型(按类型和值比较) +- **通道类型**:通道类型(按通道标识比较) +- **数组类型**:可比较元素的数组(如 `[3]int`、`[2]string`) +- **结构体类型**:所有字段都是可比较类型的结构体 + +**❌ 不支持的类型(会导致编译错误):** +- `map` 类型(如 `map[string]int`) +- `slice` 类型(如 `[]int`、`[]string`) +- `func` 类型(如 `func()`、`func(int) string`) +- 包含不可比较字段的结构体(maps、slices、functions) + +**示例:** +```go +// ✅ 有效用法 +session.AddCloseCallback("user", "cleanup", callback) +session.AddCloseCallback(123, "cleanup", callback) +session.AddCloseCallback(true, false, callback) + +// ❌ 无效用法(编译错误) +session.AddCloseCallback(map[string]int{"a": 1}, "key", callback) +session.AddCloseCallback([]int{1, 2, 3}, "key", callback) +``` + ## 关于 Getty 中的网络传输 在网络通信中,Getty 的数据传输接口并不保证数据一定会成功发送,它缺乏内部的重试机制。相反,Getty 将数据传输的结果委托给底层操作系统机制处理。在这种机制下,如果数据成功传输,将被视为成功;如果传输失败,则被视为失败。这些结果随后会传递给上层调用者。 From 27e1af0e49e0a4c3691a077a35b238a686a27841 Mon Sep 17 00:00:00 2001 From: mazheng Date: Mon, 15 Sep 2025 10:48:59 +0800 Subject: [PATCH 16/25] =?UTF-8?q?ai=20comment=E5=86=85=E5=AE=B9=E4=BF=AE?= =?UTF-8?q?=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 6 +++--- README_CN.md | 6 +++--- transport/session.go | 3 --- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 5624bdc8..38a2ceda 100644 --- a/README.md +++ b/README.md @@ -61,12 +61,12 @@ The `handler` and `key` parameters must be **comparable types** that support the **✅ Supported types:** - **Basic types**: `string`, `int`, `int8`, `int16`, `int32`, `int64`, `uint`, `uint8`, `uint16`, `uint32`, `uint64`, `uintptr`, `float32`, `float64`, `bool`, `complex64`, `complex128` - **Pointer types**: Pointers to any type (e.g., `*int`, `*string`, `*MyStruct`) -- **Interface types**: Interface types (compared by type and value) +- **Interface types**: Interface types are comparable only when their dynamic values are comparable types; using "==" with non-comparable dynamic values will trigger a runtime panic - **Channel types**: Channel types (compared by channel identity) - **Array types**: Arrays of comparable elements (e.g., `[3]int`, `[2]string`) - **Struct types**: Structs where all fields are comparable types -**❌ Not supported (will cause compile errors):** +**❌ Not supported (using these types for comparison will cause runtime panic):** - `map` types (e.g., `map[string]int`) - `slice` types (e.g., `[]int`, `[]string`) - `func` types (e.g., `func()`, `func(int) string`) @@ -79,7 +79,7 @@ session.AddCloseCallback("user", "cleanup", callback) session.AddCloseCallback(123, "cleanup", callback) session.AddCloseCallback(true, false, callback) -// ❌ Invalid usage (compile error) +// ❌ Invalid usage (runtime will panic due to comparing non-comparable types) session.AddCloseCallback(map[string]int{"a": 1}, "key", callback) session.AddCloseCallback([]int{1, 2, 3}, "key", callback) ``` diff --git a/README_CN.md b/README_CN.md index df556a46..86f9e92e 100644 --- a/README_CN.md +++ b/README_CN.md @@ -61,12 +61,12 @@ session.RemoveCloseCallback("cleanup", "resources") **✅ 支持的类型:** - **基本类型**:`string`、`int`、`int8`、`int16`、`int32`、`int64`、`uint`、`uint8`、`uint16`、`uint32`、`uint64`、`uintptr`、`float32`、`float64`、`bool`、`complex64`、`complex128` - **指针类型**:指向任何类型的指针(如 `*int`、`*string`、`*MyStruct`) -- **接口类型**:接口类型(按类型和值比较) +- **接口类型**:仅当其动态值为可比较类型时可比较;若动态值不可比较,使用"=="将触发运行时 panic - **通道类型**:通道类型(按通道标识比较) - **数组类型**:可比较元素的数组(如 `[3]int`、`[2]string`) - **结构体类型**:所有字段都是可比较类型的结构体 -**❌ 不支持的类型(会导致编译错误):** +**❌ 不支持的类型(用于比较将导致运行时 panic):** - `map` 类型(如 `map[string]int`) - `slice` 类型(如 `[]int`、`[]string`) - `func` 类型(如 `func()`、`func(int) string`) @@ -79,7 +79,7 @@ session.AddCloseCallback("user", "cleanup", callback) session.AddCloseCallback(123, "cleanup", callback) session.AddCloseCallback(true, false, callback) -// ❌ 无效用法(编译错误) +// ❌ 无效用法(运行时将因比较不可比较类型而 panic) session.AddCloseCallback(map[string]int{"a": 1}, "key", callback) session.AddCloseCallback([]int{1, 2, 3}, "key", callback) ``` diff --git a/transport/session.go b/transport/session.go index a1f54872..61159d19 100644 --- a/transport/session.go +++ b/transport/session.go @@ -200,9 +200,6 @@ func (s *session) Reset() { period: period, wait: pendingDuration, attrs: gxcontext.NewValuesContext(context.Background()), - // callbacks: intentionally cleared on reset - // closeCallback: zero-value (empty registry) - // closeCallbackMutex: zero-value (unlocked) } } From 7fb5dbbd1c3949ed788568b19659ed16fd2c1b2b Mon Sep 17 00:00:00 2001 From: mazheng Date: Thu, 18 Sep 2025 10:55:51 +0800 Subject: [PATCH 17/25] =?UTF-8?q?unlock=E5=9C=A8defer=E4=B8=AD=E6=89=A7?= =?UTF-8?q?=E8=A1=8C,=E5=A2=9E=E5=8A=A0cbnum=E6=9D=A5=E8=AE=B0=E5=BD=95?= =?UTF-8?q?=E9=93=BE=E8=A1=A8=E5=85=83=E7=B4=A0=E6=95=B0=E9=87=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- transport/callback.go | 15 +++++++-------- transport/session_callback.go | 6 +++--- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/transport/callback.go b/transport/callback.go index d7a62043..dcd169e2 100644 --- a/transport/callback.go +++ b/transport/callback.go @@ -31,6 +31,7 @@ type callbackNode struct { type callbacks struct { first *callbackNode // Pointer to the first node of the linked list last *callbackNode // Pointer to the last node of the linked list, used for quick addition of new nodes + cbNum int // Number of callback functions in the linked list } // Add adds a new callback function to the callback linked list @@ -67,6 +68,8 @@ func (t *callbacks) Add(handler, key any, callback func()) { } // Update pointer to last node t.last = newItem + // Increment callback count + t.cbNum++ } // Remove removes the specified callback function from the callback linked list @@ -95,6 +98,9 @@ func (t *callbacks) Remove(handler, key any) { t.last = prev } + // Decrement callback count + t.cbNum-- + // Return immediately after finding and removing return } @@ -118,12 +124,5 @@ func (t *callbacks) Invoke() { // Len returns the number of callback functions in the linked list // Return value: Total number of currently registered callback functions func (t *callbacks) Len() int { - var count int - - // Traverse linked list to count - for callback := t.first; callback != nil; callback = callback.next { - count++ - } - - return count + return t.cbNum } diff --git a/transport/session_callback.go b/transport/session_callback.go index 38e046e1..1cd2e4fc 100644 --- a/transport/session_callback.go +++ b/transport/session_callback.go @@ -33,8 +33,8 @@ func (s *session) AddCloseCallback(handler, key any, f CallBackFunc) { return } s.closeCallbackMutex.Lock() + defer s.closeCallbackMutex.Unlock() s.closeCallback.Add(handler, key, f) - s.closeCallbackMutex.Unlock() } // RemoveCloseCallback removes the specified Session close callback function @@ -54,8 +54,8 @@ func (s *session) RemoveCloseCallback(handler, key any) { return } s.closeCallbackMutex.Lock() + defer s.closeCallbackMutex.Unlock() s.closeCallback.Remove(handler, key) - s.closeCallbackMutex.Unlock() } // invokeCloseCallbacks executes all registered close callback functions @@ -71,8 +71,8 @@ func (s *session) RemoveCloseCallback(handler, key any) { // - Callback functions should avoid long blocking operations, async processing is recommended for time-consuming tasks func (s *session) invokeCloseCallbacks() { s.closeCallbackMutex.RLock() + defer s.closeCallbackMutex.RUnlock() s.closeCallback.Invoke() - s.closeCallbackMutex.RUnlock() } // CallBackFunc defines the callback function type when Session closes From aedd29e1796fd5b1b6b121aad0d9e75ae40416ad Mon Sep 17 00:00:00 2001 From: mazheng Date: Thu, 18 Sep 2025 15:38:06 +0800 Subject: [PATCH 18/25] =?UTF-8?q?=E5=A2=9E=E5=8A=A0iscompare=E6=96=B9?= =?UTF-8?q?=E6=B3=95=E5=AF=B9=E4=B8=8D=E5=8F=AF=E6=AF=94=E8=BE=83=E7=B1=BB?= =?UTF-8?q?=E5=9E=8B=E8=BF=9B=E8=A1=8C=E5=88=A4=E6=96=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- transport/callback.go | 28 ++++++++++++++++++ transport/callback_test.go | 59 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+) diff --git a/transport/callback.go b/transport/callback.go index dcd169e2..c5276094 100644 --- a/transport/callback.go +++ b/transport/callback.go @@ -17,6 +17,13 @@ package getty +import ( + "fmt" + log "github.com/AlexStocks/getty/util" + perrors "github.com/pkg/errors" + "reflect" +) + // callbackNode represents a node in the callback linked list // Each node contains handler identifier, key, callback function and pointer to next node type callbackNode struct { @@ -34,6 +41,15 @@ type callbacks struct { cbNum int // Number of callback functions in the linked list } +// isComparable checks if a value is comparable using Go's == operator +// Returns true if the value can be safely compared, false otherwise +func isComparable(v any) bool { + if v == nil { + return true + } + return reflect.TypeOf(v).Comparable() +} + // Add adds a new callback function to the callback linked list // Parameters: // - handler: Handler identifier, can be any type @@ -47,6 +63,12 @@ func (t *callbacks) Add(handler, key any, callback func()) { return } + // Guard: avoid runtime panic on non-comparable types + if !isComparable(handler) || !isComparable(key) { + log.Error(perrors.New(fmt.Sprintf("callbacks.Add: non-comparable handler/key: %T, %T; ignored", handler, key))) + return + } + // Check if a callback with the same handler and key already exists for cb := t.first; cb != nil; cb = cb.next { if cb.handler == handler && cb.key == key { @@ -79,6 +101,12 @@ func (t *callbacks) Add(handler, key any, callback func()) { // // Note: If no matching callback is found, this method has no effect func (t *callbacks) Remove(handler, key any) { + // Guard: avoid runtime panic on non-comparable types + if !isComparable(handler) || !isComparable(key) { + log.Error(perrors.New(fmt.Sprintf("callbacks.Remove: non-comparable handler/key: %T, %T; ignored", handler, key))) + return + } + var prev *callbackNode // Traverse linked list to find the node to be removed diff --git a/transport/callback_test.go b/transport/callback_test.go index ac3b9fc9..5aaf2ccb 100644 --- a/transport/callback_test.go +++ b/transport/callback_test.go @@ -135,3 +135,62 @@ func TestCallbackInvokePanicPropagation(t *testing.T) { // This should panic and be caught by the defer above cb.Invoke() } + +func TestCallbackNonComparableTypes(t *testing.T) { + cb := &callbacks{} + + // Test with non-comparable types (slice, map, function) + nonComparableTypes := []struct { + name string + handler any + key any + expected bool // whether the callback should be added + }{ + {"slice_handler", []int{1, 2, 3}, "key", false}, + {"map_handler", map[string]int{"a": 1}, "key", false}, + {"func_handler", func() {}, "key", false}, + {"slice_key", "handler", []int{1, 2, 3}, false}, + {"map_key", "handler", map[string]int{"a": 1}, false}, + {"func_key", "handler", func() {}, false}, + {"both_non_comparable", []int{1}, map[string]int{"a": 1}, false}, + {"comparable_types", "handler", "key", true}, + {"nil_values", nil, nil, true}, + {"mixed_comparable", "handler", 123, true}, + } + + for _, tt := range nonComparableTypes { + t.Run(tt.name, func(t *testing.T) { + initialCount := cb.Len() + + // Try to add callback + cb.Add(tt.handler, tt.key, func() {}) + + // Check if callback was added + finalCount := cb.Len() + if tt.expected { + if finalCount != initialCount+1 { + t.Errorf("Expected callback to be added, but count remained %d", initialCount) + } + // Clean up for next test + cb.Remove(tt.handler, tt.key) + } else { + if finalCount != initialCount { + t.Errorf("Expected callback to be ignored, but count changed from %d to %d", initialCount, finalCount) + } + } + }) + } + + // Test Remove with non-comparable types + t.Run("RemoveNonComparable", func(t *testing.T) { + initialCount := cb.Len() + + // Try to remove with non-comparable types + cb.Remove([]int{1, 2, 3}, map[string]int{"a": 1}) + + // Count should remain unchanged + if cb.Len() != initialCount { + t.Errorf("Expected count to remain %d after removing non-comparable types, but got %d", initialCount, cb.Len()) + } + }) +} From 8aeb95235ac0f500e92d9abe64de5bc42a366067 Mon Sep 17 00:00:00 2001 From: mazheng Date: Thu, 18 Sep 2025 15:44:23 +0800 Subject: [PATCH 19/25] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=96=87=E6=A1=A3?= =?UTF-8?q?=E6=8F=8F=E8=BF=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 12 ++++++------ README_CN.md | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 38a2ceda..ba913abb 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ session.RemoveCloseCallback("cleanup", "resources") // Callbacks are automatically executed when the session closes ``` -**Note**: Callbacks should be fast/non-blocking; move heavy work to separate goroutines to avoid delaying shutdown. +**Note**: Callbacks are executed in a separate goroutine during session shutdown. ### Callback Management @@ -61,12 +61,12 @@ The `handler` and `key` parameters must be **comparable types** that support the **✅ Supported types:** - **Basic types**: `string`, `int`, `int8`, `int16`, `int32`, `int64`, `uint`, `uint8`, `uint16`, `uint32`, `uint64`, `uintptr`, `float32`, `float64`, `bool`, `complex64`, `complex128` - **Pointer types**: Pointers to any type (e.g., `*int`, `*string`, `*MyStruct`) -- **Interface types**: Interface types are comparable only when their dynamic values are comparable types; using "==" with non-comparable dynamic values will trigger a runtime panic +- **Interface types**: Interface types are comparable only when their dynamic values are comparable types; using "==" with non-comparable dynamic values will be safely ignored with error log - **Channel types**: Channel types (compared by channel identity) - **Array types**: Arrays of comparable elements (e.g., `[3]int`, `[2]string`) - **Struct types**: Structs where all fields are comparable types -**❌ Not supported (using these types for comparison will cause runtime panic):** +**⚠️ Non-comparable types (will be safely ignored with error log):** - `map` types (e.g., `map[string]int`) - `slice` types (e.g., `[]int`, `[]string`) - `func` types (e.g., `func()`, `func(int) string`) @@ -79,9 +79,9 @@ session.AddCloseCallback("user", "cleanup", callback) session.AddCloseCallback(123, "cleanup", callback) session.AddCloseCallback(true, false, callback) -// ❌ Invalid usage (runtime will panic due to comparing non-comparable types) -session.AddCloseCallback(map[string]int{"a": 1}, "key", callback) -session.AddCloseCallback([]int{1, 2, 3}, "key", callback) +// ⚠️ Non-comparable types (safely ignored with error log) +session.AddCloseCallback(map[string]int{"a": 1}, "key", callback) // Logged and ignored +session.AddCloseCallback([]int{1, 2, 3}, "key", callback) // Logged and ignored ``` ## About network transmission in getty diff --git a/README_CN.md b/README_CN.md index 86f9e92e..ba1994d2 100644 --- a/README_CN.md +++ b/README_CN.md @@ -46,7 +46,7 @@ session.RemoveCloseCallback("cleanup", "resources") // 当会话关闭时,回调会自动执行 ``` -**注意**:关闭回调应尽量快速、避免阻塞或长耗时操作,否则可能延长会话关闭耗时;重活移交到独立 goroutine。 +**注意**:关闭回调在会话关闭时会在独立 goroutine 中执行。 ### 回调管理 @@ -61,12 +61,12 @@ session.RemoveCloseCallback("cleanup", "resources") **✅ 支持的类型:** - **基本类型**:`string`、`int`、`int8`、`int16`、`int32`、`int64`、`uint`、`uint8`、`uint16`、`uint32`、`uint64`、`uintptr`、`float32`、`float64`、`bool`、`complex64`、`complex128` - **指针类型**:指向任何类型的指针(如 `*int`、`*string`、`*MyStruct`) -- **接口类型**:仅当其动态值为可比较类型时可比较;若动态值不可比较,使用"=="将触发运行时 panic +- **接口类型**:仅当其动态值为可比较类型时可比较;若动态值不可比较,使用"=="将被安全忽略并记录错误日志 - **通道类型**:通道类型(按通道标识比较) - **数组类型**:可比较元素的数组(如 `[3]int`、`[2]string`) - **结构体类型**:所有字段都是可比较类型的结构体 -**❌ 不支持的类型(用于比较将导致运行时 panic):** +**⚠️ 不可比较类型(将被安全忽略并记录错误日志):** - `map` 类型(如 `map[string]int`) - `slice` 类型(如 `[]int`、`[]string`) - `func` 类型(如 `func()`、`func(int) string`) @@ -79,9 +79,9 @@ session.AddCloseCallback("user", "cleanup", callback) session.AddCloseCallback(123, "cleanup", callback) session.AddCloseCallback(true, false, callback) -// ❌ 无效用法(运行时将因比较不可比较类型而 panic) -session.AddCloseCallback(map[string]int{"a": 1}, "key", callback) -session.AddCloseCallback([]int{1, 2, 3}, "key", callback) +// ⚠️ 不可比较类型(安全忽略并记录错误日志) +session.AddCloseCallback(map[string]int{"a": 1}, "key", callback) // 记录日志并忽略 +session.AddCloseCallback([]int{1, 2, 3}, "key", callback) // 记录日志并忽略 ``` ## 关于 Getty 中的网络传输 From 7585325b0dc398ff9066a39040a7d5f556662b49 Mon Sep 17 00:00:00 2001 From: mazheng Date: Thu, 18 Sep 2025 15:46:28 +0800 Subject: [PATCH 20/25] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E6=A0=BC=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- transport/callback_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/transport/callback_test.go b/transport/callback_test.go index 5aaf2ccb..e5ac3053 100644 --- a/transport/callback_test.go +++ b/transport/callback_test.go @@ -161,10 +161,10 @@ func TestCallbackNonComparableTypes(t *testing.T) { for _, tt := range nonComparableTypes { t.Run(tt.name, func(t *testing.T) { initialCount := cb.Len() - + // Try to add callback cb.Add(tt.handler, tt.key, func() {}) - + // Check if callback was added finalCount := cb.Len() if tt.expected { @@ -184,10 +184,10 @@ func TestCallbackNonComparableTypes(t *testing.T) { // Test Remove with non-comparable types t.Run("RemoveNonComparable", func(t *testing.T) { initialCount := cb.Len() - + // Try to remove with non-comparable types cb.Remove([]int{1, 2, 3}, map[string]int{"a": 1}) - + // Count should remain unchanged if cb.Len() != initialCount { t.Errorf("Expected count to remain %d after removing non-comparable types, but got %d", initialCount, cb.Len()) From a951232c0ce05665bddec52bf12551ebb8b5ae56 Mon Sep 17 00:00:00 2001 From: mazheng Date: Thu, 18 Sep 2025 15:59:27 +0800 Subject: [PATCH 21/25] gofmt --- transport/callback.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/transport/callback.go b/transport/callback.go index c5276094..02569d62 100644 --- a/transport/callback.go +++ b/transport/callback.go @@ -19,9 +19,10 @@ package getty import ( "fmt" + "reflect" + log "github.com/AlexStocks/getty/util" perrors "github.com/pkg/errors" - "reflect" ) // callbackNode represents a node in the callback linked list From 979da7b7e5e9c5e8c528f612db5cb80f0b126460 Mon Sep 17 00:00:00 2001 From: mazheng Date: Thu, 18 Sep 2025 16:08:07 +0800 Subject: [PATCH 22/25] =?UTF-8?q?=E8=B0=83=E6=95=B4=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 7 ++++--- README_CN.md | 7 ++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index ba913abb..0b0a6796 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ Getty provides a robust callback system that allows you to register and manage c ### Key Features - **Thread-safe operations**: All callback operations are protected by mutex locks -- **Replace semantics**: Adding a callback with the same handler and key will replace the existing one +- **Replace semantics**: Adding with the same (handler, key) replaces the existing callback in place (position preserved) - **Panic safety**: Callback panics are properly handled and logged with stack traces - **Ordered execution**: Callbacks are executed in the order they were added @@ -46,12 +46,12 @@ session.RemoveCloseCallback("cleanup", "resources") // Callbacks are automatically executed when the session closes ``` -**Note**: Callbacks are executed in a separate goroutine during session shutdown. +**Note**: During session shutdown, callbacks are executed sequentially in a dedicated goroutine to preserve the add-order. ### Callback Management - **AddCloseCallback**: Register a callback to be executed when the session closes -- **RemoveCloseCallback**: Remove a previously registered callback +- **RemoveCloseCallback**: Remove a previously registered callback (no-op if not found; safe to call multiple times) - **Thread Safety**: All operations are thread-safe and can be called concurrently ### Type Requirements @@ -60,6 +60,7 @@ The `handler` and `key` parameters must be **comparable types** that support the **✅ Supported types:** - **Basic types**: `string`, `int`, `int8`, `int16`, `int32`, `int64`, `uint`, `uint8`, `uint16`, `uint32`, `uint64`, `uintptr`, `float32`, `float64`, `bool`, `complex64`, `complex128` + - ⚠️ Avoid `float*`/`complex*` as keys due to NaN and precision semantics; prefer strings/ints - **Pointer types**: Pointers to any type (e.g., `*int`, `*string`, `*MyStruct`) - **Interface types**: Interface types are comparable only when their dynamic values are comparable types; using "==" with non-comparable dynamic values will be safely ignored with error log - **Channel types**: Channel types (compared by channel identity) diff --git a/README_CN.md b/README_CN.md index ba1994d2..680c5dba 100644 --- a/README_CN.md +++ b/README_CN.md @@ -27,7 +27,7 @@ Getty 提供了一个强大的回调系统,允许您为会话生命周期事 ### 主要特性 - **线程安全操作**:所有回调操作都受到互斥锁保护 -- **替换语义**:使用相同的处理器和键添加回调将替换现有的回调 +- **替换语义**:使用相同的 (handler, key) 添加会替换现有回调并保持位置不变 - **Panic 安全性**:回调中的 panic 会被正确处理并记录堆栈跟踪 - **有序执行**:回调按照添加的顺序执行 @@ -46,12 +46,12 @@ session.RemoveCloseCallback("cleanup", "resources") // 当会话关闭时,回调会自动执行 ``` -**注意**:关闭回调在会话关闭时会在独立 goroutine 中执行。 +**注意**:在会话关闭期间,回调在专用 goroutine 中顺序执行以保持添加顺序。 ### 回调管理 - **AddCloseCallback**:注册一个在会话关闭时执行的回调 -- **RemoveCloseCallback**:移除之前注册的回调 +- **RemoveCloseCallback**:移除之前注册的回调(未找到时无操作;可安全多次调用) - **线程安全**:所有操作都是线程安全的,可以并发调用 ### 类型要求 @@ -60,6 +60,7 @@ session.RemoveCloseCallback("cleanup", "resources") **✅ 支持的类型:** - **基本类型**:`string`、`int`、`int8`、`int16`、`int32`、`int64`、`uint`、`uint8`、`uint16`、`uint32`、`uint64`、`uintptr`、`float32`、`float64`、`bool`、`complex64`、`complex128` + - ⚠️ 避免使用 `float*`/`complex*` 作为键,因为 NaN 和精度语义问题;建议使用字符串/整数 - **指针类型**:指向任何类型的指针(如 `*int`、`*string`、`*MyStruct`) - **接口类型**:仅当其动态值为可比较类型时可比较;若动态值不可比较,使用"=="将被安全忽略并记录错误日志 - **通道类型**:通道类型(按通道标识比较) From 59ecd59533497c55d3896ddf408e9168f0f8932c Mon Sep 17 00:00:00 2001 From: mazheng Date: Thu, 18 Sep 2025 16:16:37 +0800 Subject: [PATCH 23/25] =?UTF-8?q?isclose=E6=94=BE=E5=9C=A8unlock=E5=90=8E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- transport/session_callback.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/transport/session_callback.go b/transport/session_callback.go index 1cd2e4fc..5fe91eba 100644 --- a/transport/session_callback.go +++ b/transport/session_callback.go @@ -29,11 +29,14 @@ package getty // - The combination of handler and key must be unique, otherwise it will override previous callbacks // - Callback functions will be executed in the order they were added when the session closes func (s *session) AddCloseCallback(handler, key any, f CallBackFunc) { - if s.IsClosed() { + if f == nil { return } s.closeCallbackMutex.Lock() defer s.closeCallbackMutex.Unlock() + if s.IsClosed() { + return + } s.closeCallback.Add(handler, key, f) } @@ -50,11 +53,11 @@ func (s *session) AddCloseCallback(handler, key any, f CallBackFunc) { // - If no matching callback is found, this operation will have no effect // - The removal operation is thread-safe func (s *session) RemoveCloseCallback(handler, key any) { + s.closeCallbackMutex.Lock() + defer s.closeCallbackMutex.Unlock() if s.IsClosed() { return } - s.closeCallbackMutex.Lock() - defer s.closeCallbackMutex.Unlock() s.closeCallback.Remove(handler, key) } From 8200644aaf8abaf704a4046f31efae406ae90025 Mon Sep 17 00:00:00 2001 From: mazheng Date: Thu, 18 Sep 2025 18:25:46 +0800 Subject: [PATCH 24/25] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dcallback.go=E6=A0=BC?= =?UTF-8?q?=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- transport/callback.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/transport/callback.go b/transport/callback.go index 02569d62..4ea94c0d 100644 --- a/transport/callback.go +++ b/transport/callback.go @@ -20,11 +20,16 @@ package getty import ( "fmt" "reflect" +) - log "github.com/AlexStocks/getty/util" +import ( perrors "github.com/pkg/errors" ) +import ( + log "github.com/AlexStocks/getty/util" +) + // callbackNode represents a node in the callback linked list // Each node contains handler identifier, key, callback function and pointer to next node type callbackNode struct { From c0ee5b7ef25b4d3d697c5b37845cba9020ae6948 Mon Sep 17 00:00:00 2001 From: mazheng Date: Fri, 19 Sep 2025 12:18:21 +0800 Subject: [PATCH 25/25] =?UTF-8?q?doc=E6=8F=8F=E8=BF=B0=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 5 +++-- README_CN.md | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 0b0a6796..9cf3136f 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ Getty provides a robust callback system that allows you to register and manage c - **Thread-safe operations**: All callback operations are protected by mutex locks - **Replace semantics**: Adding with the same (handler, key) replaces the existing callback in place (position preserved) -- **Panic safety**: Callback panics are properly handled and logged with stack traces +- **Panic safety**: During session close, callbacks run in a dedicated goroutine with defer/recover; panics are logged with stack traces and do not escape the close path - **Ordered execution**: Callbacks are executed in the order they were added ### Usage Example @@ -41,12 +41,13 @@ session.AddCloseCallback("cleanup", "resources", func() { }) // Remove a specific callback +// Safe to call even if the pair was never added (no-op) session.RemoveCloseCallback("cleanup", "resources") // Callbacks are automatically executed when the session closes ``` -**Note**: During session shutdown, callbacks are executed sequentially in a dedicated goroutine to preserve the add-order. +**Note**: During session shutdown, callbacks are executed sequentially in a dedicated goroutine to preserve add-order, with defer/recover to log panics without letting them escape the close path. ### Callback Management diff --git a/README_CN.md b/README_CN.md index 680c5dba..c88ab627 100644 --- a/README_CN.md +++ b/README_CN.md @@ -28,7 +28,7 @@ Getty 提供了一个强大的回调系统,允许您为会话生命周期事 - **线程安全操作**:所有回调操作都受到互斥锁保护 - **替换语义**:使用相同的 (handler, key) 添加会替换现有回调并保持位置不变 -- **Panic 安全性**:回调中的 panic 会被正确处理并记录堆栈跟踪 +- **Panic 安全性**:在会话关闭期间,回调在专用 goroutine 中运行,带有 defer/recover;panic 会被记录堆栈跟踪且不会逃逸出关闭路径 - **有序执行**:回调按照添加的顺序执行 ### 使用示例 @@ -41,12 +41,13 @@ session.AddCloseCallback("cleanup", "resources", func() { }) // 移除特定回调 +// 即使从未添加过该对也可以安全调用(无操作) session.RemoveCloseCallback("cleanup", "resources") // 当会话关闭时,回调会自动执行 ``` -**注意**:在会话关闭期间,回调在专用 goroutine 中顺序执行以保持添加顺序。 +**注意**:在会话关闭期间,回调在专用 goroutine 中顺序执行以保持添加顺序,带有 defer/recover 来记录 panic 而不让它们逃逸出关闭路径。 ### 回调管理