From 582984828271dab503c951c25798f63dd0f672de Mon Sep 17 00:00:00 2001 From: mohangjie Date: Fri, 18 Apr 2025 17:34:19 +0800 Subject: [PATCH 01/19] update --- pkg/server/conn.go | 12 ++++++++++++ pkg/server/conn_stmt.go | 16 ++++++++++++++++ pkg/util/sqlkiller/sqlkiller.go | 20 ++++++++++++++++++++ pkg/util/topsql/reporter/pubsub.go | 3 +-- 4 files changed, 49 insertions(+), 2 deletions(-) diff --git a/pkg/server/conn.go b/pkg/server/conn.go index 88a4d948b72e7..a8b6dc12a3367 100644 --- a/pkg/server/conn.go +++ b/pkg/server/conn.go @@ -2076,9 +2076,21 @@ func (cc *clientConn) handleStmt( //nolint: errcheck rs.Finish() }) + fn := func() bool { + b := make([]byte, 1) + cc.bufReadConn.SetReadDeadline(time.Now().Add(30 * time.Microsecond)) + _, err = cc.bufReadConn.Read(b) + if terror.ErrorEqual(err, io.EOF) { + return false + } + cc.bufReadConn.SetReadDeadline(time.Time{}) + return true + } + cc.ctx.GetSessionVars().SQLKiller.IsConnectionAlive.Store(&fn) cc.ctx.GetSessionVars().SQLKiller.InWriteResultSet.Store(true) defer cc.ctx.GetSessionVars().SQLKiller.InWriteResultSet.Store(false) defer cc.ctx.GetSessionVars().SQLKiller.ClearFinishFunc() + defer cc.ctx.GetSessionVars().SQLKiller.IsConnectionAlive.Store(nil) if retryable, err := cc.writeResultSet(ctx, rs, false, status, 0); err != nil { return retryable, err } diff --git a/pkg/server/conn_stmt.go b/pkg/server/conn_stmt.go index f39853af49c85..61354a852b785 100644 --- a/pkg/server/conn_stmt.go +++ b/pkg/server/conn_stmt.go @@ -38,6 +38,7 @@ package server import ( "context" "encoding/binary" + "io" "runtime/trace" "strconv" "time" @@ -50,6 +51,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/charset" "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/parser/terror" plannercore "github.com/pingcap/tidb/pkg/planner/core" "github.com/pingcap/tidb/pkg/server/internal/dump" "github.com/pingcap/tidb/pkg/server/internal/parse" @@ -231,6 +233,20 @@ func (cc *clientConn) executePlanCacheStmt(ctx context.Context, stmt any, args [ ctx = context.WithValue(ctx, execdetails.StmtExecDetailKey, &execdetails.StmtExecDetails{}) ctx = context.WithValue(ctx, util.ExecDetailsKey, &util.ExecDetails{}) ctx = context.WithValue(ctx, util.RUDetailsCtxKey, util.NewRUDetails()) + + fn := func() bool { + b := make([]byte, 1) + cc.bufReadConn.SetReadDeadline(time.Now().Add(30*time.Microsecond)) + _, err := cc.bufReadConn.Read(b) + if terror.ErrorEqual(err, io.EOF) { + return false + } + cc.bufReadConn.SetReadDeadline(time.Time{}) + return true + } + cc.ctx.GetSessionVars().SQLKiller.IsConnectionAlive.Store(&fn) + defer cc.ctx.GetSessionVars().SQLKiller.IsConnectionAlive.Store(nil) + //nolint:forcetypeassert retryable, err := cc.executePreparedStmtAndWriteResult(ctx, stmt.(PreparedStatement), args, useCursor) if err != nil { diff --git a/pkg/util/sqlkiller/sqlkiller.go b/pkg/util/sqlkiller/sqlkiller.go index da81f99ee535d..ab821846e809a 100644 --- a/pkg/util/sqlkiller/sqlkiller.go +++ b/pkg/util/sqlkiller/sqlkiller.go @@ -18,6 +18,7 @@ import ( "math/rand" "sync" "sync/atomic" + "time" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors" @@ -39,6 +40,8 @@ const ( // so that errors in client can be correctly converted to tidb errors. ) +const checkConnectionAliveDur time.Duration = time.Second + // SQLKiller is used to kill a query. type SQLKiller struct { Signal killSignal @@ -51,6 +54,9 @@ type SQLKiller struct { // InWriteResultSet is used to indicate whether the query is currently calling clientConn.writeResultSet(). // If the query is in writeResultSet and Finish() can acquire rs.finishLock, we can assume the query is waiting for the client to receive data from the server over network I/O. InWriteResultSet atomic.Bool + + lastCheckTime atomic.Pointer[time.Time] + IsConnectionAlive atomic.Pointer[func() bool] } // SendKillSignal sends a kill signal to the query. @@ -122,6 +128,20 @@ func (killer *SQLKiller) HandleSignal() error { } } }) + + // Checks if the connection is alive. + // For performance reasons, the check interval should be at least `checkConnectionAliveDur`(1 second). + fn := killer.IsConnectionAlive.Load() + ts := killer.lastCheckTime.Load() + if fn != nil && (ts == nil || time.Since(*ts) > checkConnectionAliveDur) { + now := time.Now() + killer.lastCheckTime.Store(&now) + // Skip the first time. + if ts != nil && !(*fn)() { + atomic.CompareAndSwapUint32(&killer.Signal, 0, QueryInterrupted) + } + } + status := atomic.LoadUint32(&killer.Signal) err := killer.getKillError(status) if status == ServerMemoryExceeded { diff --git a/pkg/util/topsql/reporter/pubsub.go b/pkg/util/topsql/reporter/pubsub.go index a9ce1fe7e9b8c..49c5be18af372 100644 --- a/pkg/util/topsql/reporter/pubsub.go +++ b/pkg/util/topsql/reporter/pubsub.go @@ -109,12 +109,11 @@ func (ds *pubSubDataSink) run() error { for { select { case task := <-ds.sendTaskCh: - ctx, cancel := context.WithDeadline(ds.ctx, task.deadline) + ctx, _ := context.WithDeadline(ds.ctx, task.deadline) // nolint: lostcancel var err error start := time.Now() go util.WithRecovery(func() { - defer cancel() err = ds.doSend(ctx, task.data) if err != nil { From 165e4a054edd7c406397e12a7dcad8cbb4456ba2 Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Mon, 21 Apr 2025 16:30:40 +0800 Subject: [PATCH 02/19] for test --- pkg/server/conn_stmt.go | 2 +- pkg/util/sqlkiller/sqlkiller.go | 10 ++-------- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/pkg/server/conn_stmt.go b/pkg/server/conn_stmt.go index 61354a852b785..cb91d9b1b5c03 100644 --- a/pkg/server/conn_stmt.go +++ b/pkg/server/conn_stmt.go @@ -236,7 +236,7 @@ func (cc *clientConn) executePlanCacheStmt(ctx context.Context, stmt any, args [ fn := func() bool { b := make([]byte, 1) - cc.bufReadConn.SetReadDeadline(time.Now().Add(30*time.Microsecond)) + cc.bufReadConn.SetReadDeadline(time.Now().Add(30 * time.Microsecond)) _, err := cc.bufReadConn.Read(b) if terror.ErrorEqual(err, io.EOF) { return false diff --git a/pkg/util/sqlkiller/sqlkiller.go b/pkg/util/sqlkiller/sqlkiller.go index ab821846e809a..5b7a72455b01f 100644 --- a/pkg/util/sqlkiller/sqlkiller.go +++ b/pkg/util/sqlkiller/sqlkiller.go @@ -132,14 +132,8 @@ func (killer *SQLKiller) HandleSignal() error { // Checks if the connection is alive. // For performance reasons, the check interval should be at least `checkConnectionAliveDur`(1 second). fn := killer.IsConnectionAlive.Load() - ts := killer.lastCheckTime.Load() - if fn != nil && (ts == nil || time.Since(*ts) > checkConnectionAliveDur) { - now := time.Now() - killer.lastCheckTime.Store(&now) - // Skip the first time. - if ts != nil && !(*fn)() { - atomic.CompareAndSwapUint32(&killer.Signal, 0, QueryInterrupted) - } + if fn != nil && !(*fn)() { + atomic.CompareAndSwapUint32(&killer.Signal, 0, QueryInterrupted) } status := atomic.LoadUint32(&killer.Signal) From f1a6bdedcc536da01a716fbf305261b76c4ad1e2 Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Mon, 21 Apr 2025 16:43:44 +0800 Subject: [PATCH 03/19] update --- pkg/util/topsql/reporter/pubsub.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/util/topsql/reporter/pubsub.go b/pkg/util/topsql/reporter/pubsub.go index 49c5be18af372..a9ce1fe7e9b8c 100644 --- a/pkg/util/topsql/reporter/pubsub.go +++ b/pkg/util/topsql/reporter/pubsub.go @@ -109,11 +109,12 @@ func (ds *pubSubDataSink) run() error { for { select { case task := <-ds.sendTaskCh: - ctx, _ := context.WithDeadline(ds.ctx, task.deadline) // nolint: lostcancel + ctx, cancel := context.WithDeadline(ds.ctx, task.deadline) var err error start := time.Now() go util.WithRecovery(func() { + defer cancel() err = ds.doSend(ctx, task.data) if err != nil { From 476104bf2e3b0927a54d7c7d3f6626c72b1a8529 Mon Sep 17 00:00:00 2001 From: Jason Mo Date: Mon, 21 Apr 2025 17:50:09 +0800 Subject: [PATCH 04/19] fix --- pkg/server/conn.go | 16 ++++++++++++---- pkg/server/conn_stmt.go | 16 ++++++++++++---- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/pkg/server/conn.go b/pkg/server/conn.go index a8b6dc12a3367..5965b6e838f5f 100644 --- a/pkg/server/conn.go +++ b/pkg/server/conn.go @@ -2078,12 +2078,20 @@ func (cc *clientConn) handleStmt( }) fn := func() bool { b := make([]byte, 1) - cc.bufReadConn.SetReadDeadline(time.Now().Add(30 * time.Microsecond)) - _, err = cc.bufReadConn.Read(b) - if terror.ErrorEqual(err, io.EOF) { + cc.mu.Lock() + err1 := cc.bufReadConn.SetReadDeadline(time.Now().Add(30 * time.Microsecond)) + if err1 != nil { + return true + } + _, err1 = cc.bufReadConn.Read(b) + if terror.ErrorEqual(err1, io.EOF) { return false } - cc.bufReadConn.SetReadDeadline(time.Time{}) + err1 = cc.bufReadConn.SetReadDeadline(time.Time{}) + if err1 != nil { + return true + } + cc.mu.Unlock() return true } cc.ctx.GetSessionVars().SQLKiller.IsConnectionAlive.Store(&fn) diff --git a/pkg/server/conn_stmt.go b/pkg/server/conn_stmt.go index cb91d9b1b5c03..a40b4322071fa 100644 --- a/pkg/server/conn_stmt.go +++ b/pkg/server/conn_stmt.go @@ -236,12 +236,20 @@ func (cc *clientConn) executePlanCacheStmt(ctx context.Context, stmt any, args [ fn := func() bool { b := make([]byte, 1) - cc.bufReadConn.SetReadDeadline(time.Now().Add(30 * time.Microsecond)) - _, err := cc.bufReadConn.Read(b) - if terror.ErrorEqual(err, io.EOF) { + cc.mu.Lock() + err1 := cc.bufReadConn.SetReadDeadline(time.Now().Add(30 * time.Microsecond)) + if err1 != nil { + return true + } + _, err1 = cc.bufReadConn.Read(b) + if terror.ErrorEqual(err1, io.EOF) { return false } - cc.bufReadConn.SetReadDeadline(time.Time{}) + err1 = cc.bufReadConn.SetReadDeadline(time.Time{}) + if err1 != nil { + return true + } + cc.mu.Unlock() return true } cc.ctx.GetSessionVars().SQLKiller.IsConnectionAlive.Store(&fn) From 70c1922a7e8c73f0f8af2102f231b0609a3df054 Mon Sep 17 00:00:00 2001 From: mohangjie Date: Mon, 21 Apr 2025 20:38:23 +0800 Subject: [PATCH 05/19] update --- pkg/server/conn.go | 3 +-- pkg/server/conn_stmt.go | 3 +-- pkg/server/internal/util/buffered_read_conn.go | 5 +++++ 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/pkg/server/conn.go b/pkg/server/conn.go index 5965b6e838f5f..f7d3e2f240185 100644 --- a/pkg/server/conn.go +++ b/pkg/server/conn.go @@ -2077,13 +2077,12 @@ func (cc *clientConn) handleStmt( rs.Finish() }) fn := func() bool { - b := make([]byte, 1) cc.mu.Lock() err1 := cc.bufReadConn.SetReadDeadline(time.Now().Add(30 * time.Microsecond)) if err1 != nil { return true } - _, err1 = cc.bufReadConn.Read(b) + _, err1 = cc.bufReadConn.Peek(1) if terror.ErrorEqual(err1, io.EOF) { return false } diff --git a/pkg/server/conn_stmt.go b/pkg/server/conn_stmt.go index a40b4322071fa..282360b4d20fe 100644 --- a/pkg/server/conn_stmt.go +++ b/pkg/server/conn_stmt.go @@ -235,13 +235,12 @@ func (cc *clientConn) executePlanCacheStmt(ctx context.Context, stmt any, args [ ctx = context.WithValue(ctx, util.RUDetailsCtxKey, util.NewRUDetails()) fn := func() bool { - b := make([]byte, 1) cc.mu.Lock() err1 := cc.bufReadConn.SetReadDeadline(time.Now().Add(30 * time.Microsecond)) if err1 != nil { return true } - _, err1 = cc.bufReadConn.Read(b) + _, err1 = cc.bufReadConn.Peek(1) if terror.ErrorEqual(err1, io.EOF) { return false } diff --git a/pkg/server/internal/util/buffered_read_conn.go b/pkg/server/internal/util/buffered_read_conn.go index d6bc1f24c9bc4..ef523c6667b56 100644 --- a/pkg/server/internal/util/buffered_read_conn.go +++ b/pkg/server/internal/util/buffered_read_conn.go @@ -40,3 +40,8 @@ func NewBufferedReadConn(conn net.Conn) *BufferedReadConn { func (conn BufferedReadConn) Read(b []byte) (n int, err error) { return conn.rb.Read(b) } + +// Peek reads data from the connection. +func (conn BufferedReadConn) Peek(n int) ([]byte, error) { + return conn.rb.Peek(n) +} From e2cc7d5e9531752a596b7d2b990e8d5a4a487674 Mon Sep 17 00:00:00 2001 From: mohangjie Date: Tue, 22 Apr 2025 14:38:28 +0800 Subject: [PATCH 06/19] update --- pkg/server/conn.go | 26 +++++++++++++------------- pkg/server/conn_stmt.go | 26 +++++++++++++------------- 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/pkg/server/conn.go b/pkg/server/conn.go index f7d3e2f240185..73eccf5b441a1 100644 --- a/pkg/server/conn.go +++ b/pkg/server/conn.go @@ -2077,20 +2077,20 @@ func (cc *clientConn) handleStmt( rs.Finish() }) fn := func() bool { - cc.mu.Lock() - err1 := cc.bufReadConn.SetReadDeadline(time.Now().Add(30 * time.Microsecond)) - if err1 != nil { - return true - } - _, err1 = cc.bufReadConn.Peek(1) - if terror.ErrorEqual(err1, io.EOF) { - return false - } - err1 = cc.bufReadConn.SetReadDeadline(time.Time{}) - if err1 != nil { - return true + if cc.bufReadConn != nil { + cc.mu.Lock() + defer cc.mu.Unlock() + err1 := cc.bufReadConn.SetReadDeadline(time.Now().Add(30 * time.Microsecond)) + if err1 != nil { + return true + } + // nolint:errcheck + defer cc.bufReadConn.SetDeadline(time.Time{}) + _, err1 = cc.bufReadConn.Peek(1) + if terror.ErrorEqual(err1, io.EOF) { + return false + } } - cc.mu.Unlock() return true } cc.ctx.GetSessionVars().SQLKiller.IsConnectionAlive.Store(&fn) diff --git a/pkg/server/conn_stmt.go b/pkg/server/conn_stmt.go index 282360b4d20fe..066d3974f4d47 100644 --- a/pkg/server/conn_stmt.go +++ b/pkg/server/conn_stmt.go @@ -235,20 +235,20 @@ func (cc *clientConn) executePlanCacheStmt(ctx context.Context, stmt any, args [ ctx = context.WithValue(ctx, util.RUDetailsCtxKey, util.NewRUDetails()) fn := func() bool { - cc.mu.Lock() - err1 := cc.bufReadConn.SetReadDeadline(time.Now().Add(30 * time.Microsecond)) - if err1 != nil { - return true - } - _, err1 = cc.bufReadConn.Peek(1) - if terror.ErrorEqual(err1, io.EOF) { - return false - } - err1 = cc.bufReadConn.SetReadDeadline(time.Time{}) - if err1 != nil { - return true + if cc.bufReadConn != nil { + cc.mu.Lock() + defer cc.mu.Unlock() + err1 := cc.bufReadConn.SetReadDeadline(time.Now().Add(30 * time.Microsecond)) + if err1 != nil { + return true + } + // nolint:errcheck + defer cc.bufReadConn.SetReadDeadline(time.Time{}) + _, err1 = cc.bufReadConn.Peek(1) + if terror.ErrorEqual(err1, io.EOF) { + return false + } } - cc.mu.Unlock() return true } cc.ctx.GetSessionVars().SQLKiller.IsConnectionAlive.Store(&fn) From 7203695b8dcf32a6471cb28f5fdcea9cabe3923c Mon Sep 17 00:00:00 2001 From: mohangjie Date: Tue, 22 Apr 2025 15:05:32 +0800 Subject: [PATCH 07/19] update --- pkg/server/conn.go | 3 +-- pkg/server/conn_stmt.go | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/server/conn.go b/pkg/server/conn.go index 73eccf5b441a1..3b57060cc0104 100644 --- a/pkg/server/conn.go +++ b/pkg/server/conn.go @@ -2077,8 +2077,7 @@ func (cc *clientConn) handleStmt( rs.Finish() }) fn := func() bool { - if cc.bufReadConn != nil { - cc.mu.Lock() + if cc.bufReadConn != nil && cc.mu.TryLock() { defer cc.mu.Unlock() err1 := cc.bufReadConn.SetReadDeadline(time.Now().Add(30 * time.Microsecond)) if err1 != nil { diff --git a/pkg/server/conn_stmt.go b/pkg/server/conn_stmt.go index 066d3974f4d47..152015be1a22f 100644 --- a/pkg/server/conn_stmt.go +++ b/pkg/server/conn_stmt.go @@ -235,8 +235,7 @@ func (cc *clientConn) executePlanCacheStmt(ctx context.Context, stmt any, args [ ctx = context.WithValue(ctx, util.RUDetailsCtxKey, util.NewRUDetails()) fn := func() bool { - if cc.bufReadConn != nil { - cc.mu.Lock() + if cc.bufReadConn != nil && cc.mu.TryLock() { defer cc.mu.Unlock() err1 := cc.bufReadConn.SetReadDeadline(time.Now().Add(30 * time.Microsecond)) if err1 != nil { From 34687cf1586d3597191b97789739c3122fa5e905 Mon Sep 17 00:00:00 2001 From: mohangjie Date: Tue, 22 Apr 2025 15:41:52 +0800 Subject: [PATCH 08/19] update --- pkg/util/deeptest/statictesthelper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/util/deeptest/statictesthelper.go b/pkg/util/deeptest/statictesthelper.go index b9493444037ae..5b4252dbdb3e2 100644 --- a/pkg/util/deeptest/statictesthelper.go +++ b/pkg/util/deeptest/statictesthelper.go @@ -142,7 +142,7 @@ func (h *staticTestHelper) assertDeepClonedEqual(t require.TestingT, valA, valB for i := range valA.NumField() { h.assertDeepClonedEqual(t, valA.Field(i), valB.Field(i), path+"."+valA.Type().Field(i).Name) } - case reflect.Ptr: + case reflect.Ptr, reflect.UnsafePointer: if valA.IsNil() && valB.IsNil() { return } From 5e9adc9b8335727b8055b727e939be744f93e6ce Mon Sep 17 00:00:00 2001 From: mohangjie Date: Tue, 22 Apr 2025 20:13:06 +0800 Subject: [PATCH 09/19] use syscall --- pkg/server/conn.go | 11 +--- pkg/server/conn_stmt.go | 13 +---- pkg/server/internal/util/BUILD.bazel | 2 + .../internal/util/buffered_read_conn.go | 5 -- .../internal/util/buffered_read_conn_linux.go | 53 +++++++++++++++++++ .../util/buffered_read_conn_windows.go | 25 +++++++++ 6 files changed, 82 insertions(+), 27 deletions(-) create mode 100644 pkg/server/internal/util/buffered_read_conn_linux.go create mode 100644 pkg/server/internal/util/buffered_read_conn_windows.go diff --git a/pkg/server/conn.go b/pkg/server/conn.go index 3b57060cc0104..bdb24704de4db 100644 --- a/pkg/server/conn.go +++ b/pkg/server/conn.go @@ -2079,16 +2079,7 @@ func (cc *clientConn) handleStmt( fn := func() bool { if cc.bufReadConn != nil && cc.mu.TryLock() { defer cc.mu.Unlock() - err1 := cc.bufReadConn.SetReadDeadline(time.Now().Add(30 * time.Microsecond)) - if err1 != nil { - return true - } - // nolint:errcheck - defer cc.bufReadConn.SetDeadline(time.Time{}) - _, err1 = cc.bufReadConn.Peek(1) - if terror.ErrorEqual(err1, io.EOF) { - return false - } + return cc.bufReadConn.IsAlive() != 0 } return true } diff --git a/pkg/server/conn_stmt.go b/pkg/server/conn_stmt.go index 152015be1a22f..80b04eb098256 100644 --- a/pkg/server/conn_stmt.go +++ b/pkg/server/conn_stmt.go @@ -38,7 +38,6 @@ package server import ( "context" "encoding/binary" - "io" "runtime/trace" "strconv" "time" @@ -51,7 +50,6 @@ import ( "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/charset" "github.com/pingcap/tidb/pkg/parser/mysql" - "github.com/pingcap/tidb/pkg/parser/terror" plannercore "github.com/pingcap/tidb/pkg/planner/core" "github.com/pingcap/tidb/pkg/server/internal/dump" "github.com/pingcap/tidb/pkg/server/internal/parse" @@ -237,16 +235,7 @@ func (cc *clientConn) executePlanCacheStmt(ctx context.Context, stmt any, args [ fn := func() bool { if cc.bufReadConn != nil && cc.mu.TryLock() { defer cc.mu.Unlock() - err1 := cc.bufReadConn.SetReadDeadline(time.Now().Add(30 * time.Microsecond)) - if err1 != nil { - return true - } - // nolint:errcheck - defer cc.bufReadConn.SetReadDeadline(time.Time{}) - _, err1 = cc.bufReadConn.Peek(1) - if terror.ErrorEqual(err1, io.EOF) { - return false - } + return cc.bufReadConn.IsAlive() != 0 } return true } diff --git a/pkg/server/internal/util/BUILD.bazel b/pkg/server/internal/util/BUILD.bazel index eb027c3457139..58d5563387cfb 100644 --- a/pkg/server/internal/util/BUILD.bazel +++ b/pkg/server/internal/util/BUILD.bazel @@ -4,6 +4,8 @@ go_library( name = "util", srcs = [ "buffered_read_conn.go", + "buffered_read_conn_linux.go", + "buffered_read_conn_windows.go", "util.go", ], importpath = "github.com/pingcap/tidb/pkg/server/internal/util", diff --git a/pkg/server/internal/util/buffered_read_conn.go b/pkg/server/internal/util/buffered_read_conn.go index ef523c6667b56..d6bc1f24c9bc4 100644 --- a/pkg/server/internal/util/buffered_read_conn.go +++ b/pkg/server/internal/util/buffered_read_conn.go @@ -40,8 +40,3 @@ func NewBufferedReadConn(conn net.Conn) *BufferedReadConn { func (conn BufferedReadConn) Read(b []byte) (n int, err error) { return conn.rb.Read(b) } - -// Peek reads data from the connection. -func (conn BufferedReadConn) Peek(n int) ([]byte, error) { - return conn.rb.Peek(n) -} diff --git a/pkg/server/internal/util/buffered_read_conn_linux.go b/pkg/server/internal/util/buffered_read_conn_linux.go new file mode 100644 index 0000000000000..f1cd5e40d0574 --- /dev/null +++ b/pkg/server/internal/util/buffered_read_conn_linux.go @@ -0,0 +1,53 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build darwin || linux + +package util + +import ( + "crypto/tls" + "net" + "syscall" +) + +// IsAlive use syscall to detect the connection is alive or not. +// return value < 0, means unknow +// return value = 0, means not alive +// return value = 1, means still alive +func (conn BufferedReadConn) IsAlive() int { + var tcp *net.TCPConn + if tlsConn, ok := conn.Conn.(*tls.Conn); ok { + tcp, _ = tlsConn.NetConn().(*BufferedReadConn).Conn.(*net.TCPConn) + } else { + tcp, _ = conn.Conn.(*net.TCPConn) + } + + if tcp == nil { + return -1 + } + + f, err := tcp.File() + if err != nil { + return -1 + } + + var n int + b := []byte{0} + n, _, err = syscall.Recvfrom(int(f.Fd()), b, syscall.MSG_PEEK|syscall.MSG_DONTWAIT) + if (n == 0 && err == nil) || err == syscall.ECONNRESET { + return 0 + } + return 1 +} diff --git a/pkg/server/internal/util/buffered_read_conn_windows.go b/pkg/server/internal/util/buffered_read_conn_windows.go new file mode 100644 index 0000000000000..ed67329e7b41b --- /dev/null +++ b/pkg/server/internal/util/buffered_read_conn_windows.go @@ -0,0 +1,25 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build windows + +package util + +// IsAlive use syscall to detect the connection is alive or not. +// return value < 0, means unknow +// return value = 0, means not alive +// return value = 1, means still alive +func (conn BufferedReadConn) IsAlive() int { + return -1 +} From db7eb12f6f5cf91e8b15542c252e373c89dbd3fb Mon Sep 17 00:00:00 2001 From: mohangjie Date: Wed, 23 Apr 2025 15:29:30 +0800 Subject: [PATCH 10/19] add test cases --- pkg/server/conn.go | 3 +- pkg/server/conn_stmt.go | 3 +- pkg/server/tests/commontest/tidb_test.go | 49 ++++++++++++++++++++++++ pkg/util/sqlkiller/BUILD.bazel | 1 + pkg/util/sqlkiller/sqlkiller.go | 21 ++++++++-- 5 files changed, 69 insertions(+), 8 deletions(-) diff --git a/pkg/server/conn.go b/pkg/server/conn.go index bdb24704de4db..96abe06efe71c 100644 --- a/pkg/server/conn.go +++ b/pkg/server/conn.go @@ -2077,8 +2077,7 @@ func (cc *clientConn) handleStmt( rs.Finish() }) fn := func() bool { - if cc.bufReadConn != nil && cc.mu.TryLock() { - defer cc.mu.Unlock() + if cc.bufReadConn != nil { return cc.bufReadConn.IsAlive() != 0 } return true diff --git a/pkg/server/conn_stmt.go b/pkg/server/conn_stmt.go index 80b04eb098256..0a78e97f69d8d 100644 --- a/pkg/server/conn_stmt.go +++ b/pkg/server/conn_stmt.go @@ -233,8 +233,7 @@ func (cc *clientConn) executePlanCacheStmt(ctx context.Context, stmt any, args [ ctx = context.WithValue(ctx, util.RUDetailsCtxKey, util.NewRUDetails()) fn := func() bool { - if cc.bufReadConn != nil && cc.mu.TryLock() { - defer cc.mu.Unlock() + if cc.bufReadConn != nil { return cc.bufReadConn.IsAlive() != 0 } return true diff --git a/pkg/server/tests/commontest/tidb_test.go b/pkg/server/tests/commontest/tidb_test.go index bc699144c51fa..1cfa0163f8ba1 100644 --- a/pkg/server/tests/commontest/tidb_test.go +++ b/pkg/server/tests/commontest/tidb_test.go @@ -30,6 +30,7 @@ import ( "sync/atomic" "testing" "time" + "unsafe" "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" @@ -3396,3 +3397,51 @@ func TestBatchGetTypeForRowExpr(t *testing.T) { ts.CheckRows(t, rows, "a b\nc d") }) } + +func TestIssue57531(t *testing.T) { + if runtime.GOOS != "linux" && runtime.GOOS != "darwin" { + t.Skip() + } + ts := servertestkit.CreateTidbTestSuite(t) + + ts.RunTests(t, nil, func(dbt *testkit.DBTestKit) { + var conn *sql.Conn + var netConn net.Conn + conn, _ = dbt.GetDB().Conn(context.Background()) + conn.Raw(func(driverConn any) error { + v := reflect.ValueOf(driverConn) + if v.Kind() == reflect.Ptr { + v = v.Elem() + } + f := v.FieldByName("netConn") + if f.IsValid() && f.Type().Implements(reflect.TypeOf((*net.Conn)(nil)).Elem()) { + netConn = *(*net.Conn)(unsafe.Pointer(f.UnsafeAddr())) + } + return nil + }) + + go func() { + conn.QueryContext(context.Background(), "select sleep(300)") + }() + time.Sleep(200 * time.Millisecond) + len := 0 + rs := dbt.MustQuery("show processlist") + for rs.Next() { + len++ + } + require.Equal(t, len, 2) + + netConn.Close() + }) + + time.Sleep(10 * time.Millisecond) + + ts.RunTests(t, nil, func(dbt *testkit.DBTestKit) { + len := 0 + rs := dbt.MustQuery("show processlist") + for rs.Next() { + len++ + } + require.Equal(t, len, 1) + }) +} diff --git a/pkg/util/sqlkiller/BUILD.bazel b/pkg/util/sqlkiller/BUILD.bazel index 5a4eacf70afd6..5a3a76d5a558e 100644 --- a/pkg/util/sqlkiller/BUILD.bazel +++ b/pkg/util/sqlkiller/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/util/dbterror/exeerrors", + "//pkg/util/intest", "//pkg/util/logutil", "@com_github_pingcap_failpoint//:failpoint", "@org_uber_go_zap//:zap", diff --git a/pkg/util/sqlkiller/sqlkiller.go b/pkg/util/sqlkiller/sqlkiller.go index 5b7a72455b01f..36da73de52a4f 100644 --- a/pkg/util/sqlkiller/sqlkiller.go +++ b/pkg/util/sqlkiller/sqlkiller.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors" + "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/logutil" "go.uber.org/zap" ) @@ -40,8 +41,6 @@ const ( // so that errors in client can be correctly converted to tidb errors. ) -const checkConnectionAliveDur time.Duration = time.Second - // SQLKiller is used to kill a query. type SQLKiller struct { Signal killSignal @@ -132,8 +131,21 @@ func (killer *SQLKiller) HandleSignal() error { // Checks if the connection is alive. // For performance reasons, the check interval should be at least `checkConnectionAliveDur`(1 second). fn := killer.IsConnectionAlive.Load() - if fn != nil && !(*fn)() { - atomic.CompareAndSwapUint32(&killer.Signal, 0, QueryInterrupted) + lastCheckTime := killer.lastCheckTime.Load() + if fn != nil { + var checkConnectionAliveDur time.Duration = time.Second + now := time.Now() + if intest.InTest { + checkConnectionAliveDur = time.Millisecond + } + if lastCheckTime == nil { + killer.lastCheckTime.Store(&now) + } else if time.Since(*lastCheckTime) > checkConnectionAliveDur { + killer.lastCheckTime.Store(&now) + if !(*fn)() { + atomic.CompareAndSwapUint32(&killer.Signal, 0, QueryInterrupted) + } + } } status := atomic.LoadUint32(&killer.Signal) @@ -151,4 +163,5 @@ func (killer *SQLKiller) Reset() { logutil.BgLogger().Warn("kill finished", zap.Uint64("conn", killer.ConnID.Load())) } atomic.StoreUint32(&killer.Signal, 0) + killer.lastCheckTime.Store(nil) } From 0df96be2ff9430f86899e2e46094b86c1f227cbf Mon Sep 17 00:00:00 2001 From: mohangjie Date: Wed, 23 Apr 2025 15:53:20 +0800 Subject: [PATCH 11/19] improve test cases --- pkg/server/tests/commontest/tidb_test.go | 80 +++++++++++++----------- 1 file changed, 44 insertions(+), 36 deletions(-) diff --git a/pkg/server/tests/commontest/tidb_test.go b/pkg/server/tests/commontest/tidb_test.go index 1cfa0163f8ba1..a1f4e1f33e89f 100644 --- a/pkg/server/tests/commontest/tidb_test.go +++ b/pkg/server/tests/commontest/tidb_test.go @@ -3404,44 +3404,52 @@ func TestIssue57531(t *testing.T) { } ts := servertestkit.CreateTidbTestSuite(t) - ts.RunTests(t, nil, func(dbt *testkit.DBTestKit) { - var conn *sql.Conn - var netConn net.Conn - conn, _ = dbt.GetDB().Conn(context.Background()) - conn.Raw(func(driverConn any) error { - v := reflect.ValueOf(driverConn) - if v.Kind() == reflect.Ptr { - v = v.Elem() - } - f := v.FieldByName("netConn") - if f.IsValid() && f.Type().Implements(reflect.TypeOf((*net.Conn)(nil)).Elem()) { - netConn = *(*net.Conn)(unsafe.Pointer(f.UnsafeAddr())) - } - return nil - }) + for i := range 2 { + ts.RunTests(t, nil, func(dbt *testkit.DBTestKit) { + var conn *sql.Conn + var netConn net.Conn + conn, _ = dbt.GetDB().Conn(context.Background()) + conn.Raw(func(driverConn any) error { + v := reflect.ValueOf(driverConn) + if v.Kind() == reflect.Ptr { + v = v.Elem() + } + f := v.FieldByName("netConn") + if f.IsValid() && f.Type().Implements(reflect.TypeOf((*net.Conn)(nil)).Elem()) { + netConn = *(*net.Conn)(unsafe.Pointer(f.UnsafeAddr())) + } + return nil + }) - go func() { - conn.QueryContext(context.Background(), "select sleep(300)") - }() - time.Sleep(200 * time.Millisecond) - len := 0 - rs := dbt.MustQuery("show processlist") - for rs.Next() { - len++ - } - require.Equal(t, len, 2) + go func() { + if i == 0 { + conn.QueryContext(context.Background(), "select sleep(300)") + } else { + stmt, err := conn.PrepareContext(context.Background(), "select sleep(?)") + require.NoError(t, err) + stmt.Exec(300) + } + }() + time.Sleep(200 * time.Millisecond) + len := 0 + rs := dbt.MustQuery("show processlist") + for rs.Next() { + len++ + } + require.Equal(t, len, 2) - netConn.Close() - }) + netConn.Close() + }) - time.Sleep(10 * time.Millisecond) + time.Sleep(10 * time.Millisecond) - ts.RunTests(t, nil, func(dbt *testkit.DBTestKit) { - len := 0 - rs := dbt.MustQuery("show processlist") - for rs.Next() { - len++ - } - require.Equal(t, len, 1) - }) + ts.RunTests(t, nil, func(dbt *testkit.DBTestKit) { + len := 0 + rs := dbt.MustQuery("show processlist") + for rs.Next() { + len++ + } + require.Equal(t, len, 1) + }) + } } From 06542d079014980d9b3e795cb1ed97928616a4b8 Mon Sep 17 00:00:00 2001 From: mohangjie Date: Wed, 23 Apr 2025 16:05:18 +0800 Subject: [PATCH 12/19] add comments --- pkg/server/tests/commontest/tidb_test.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pkg/server/tests/commontest/tidb_test.go b/pkg/server/tests/commontest/tidb_test.go index a1f4e1f33e89f..cd50e9026ba3b 100644 --- a/pkg/server/tests/commontest/tidb_test.go +++ b/pkg/server/tests/commontest/tidb_test.go @@ -3409,6 +3409,8 @@ func TestIssue57531(t *testing.T) { var conn *sql.Conn var netConn net.Conn conn, _ = dbt.GetDB().Conn(context.Background()) + + // get the TCP connection conn.Raw(func(driverConn any) error { v := reflect.ValueOf(driverConn) if v.Kind() == reflect.Ptr { @@ -3421,6 +3423,7 @@ func TestIssue57531(t *testing.T) { return nil }) + // execute `select sleep(300)` go func() { if i == 0 { conn.QueryContext(context.Background(), "select sleep(300)") @@ -3431,6 +3434,8 @@ func TestIssue57531(t *testing.T) { } }() time.Sleep(200 * time.Millisecond) + + // have two sessions len := 0 rs := dbt.MustQuery("show processlist") for rs.Next() { @@ -3438,11 +3443,13 @@ func TestIssue57531(t *testing.T) { } require.Equal(t, len, 2) + // close tcp connection netConn.Close() }) time.Sleep(10 * time.Millisecond) + // the `select sleep(300)` is killed ts.RunTests(t, nil, func(dbt *testkit.DBTestKit) { len := 0 rs := dbt.MustQuery("show processlist") From 89b55ca893ab90501ef507f571f735c8fb6d6108 Mon Sep 17 00:00:00 2001 From: mohangjie Date: Wed, 23 Apr 2025 16:37:18 +0800 Subject: [PATCH 13/19] make lint --- pkg/server/tests/commontest/tidb_test.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/pkg/server/tests/commontest/tidb_test.go b/pkg/server/tests/commontest/tidb_test.go index cd50e9026ba3b..8ea8ff155a70d 100644 --- a/pkg/server/tests/commontest/tidb_test.go +++ b/pkg/server/tests/commontest/tidb_test.go @@ -3404,6 +3404,7 @@ func TestIssue57531(t *testing.T) { } ts := servertestkit.CreateTidbTestSuite(t) + var rsCnt int for i := range 2 { ts.RunTests(t, nil, func(dbt *testkit.DBTestKit) { var conn *sql.Conn @@ -3436,12 +3437,12 @@ func TestIssue57531(t *testing.T) { time.Sleep(200 * time.Millisecond) // have two sessions - len := 0 + rsCnt = 0 rs := dbt.MustQuery("show processlist") for rs.Next() { - len++ + rsCnt++ } - require.Equal(t, len, 2) + require.Equal(t, rsCnt, 2) // close tcp connection netConn.Close() @@ -3451,12 +3452,12 @@ func TestIssue57531(t *testing.T) { // the `select sleep(300)` is killed ts.RunTests(t, nil, func(dbt *testkit.DBTestKit) { - len := 0 + rsCnt = 0 rs := dbt.MustQuery("show processlist") for rs.Next() { - len++ + rsCnt++ } - require.Equal(t, len, 1) + require.Equal(t, rsCnt, 1) }) } } From 2213a6844840cf1915fb63646789baa684b02bbf Mon Sep 17 00:00:00 2001 From: mohangjie Date: Wed, 23 Apr 2025 19:49:57 +0800 Subject: [PATCH 14/19] change it --- pkg/server/internal/util/BUILD.bazel | 2 - .../internal/util/buffered_read_conn.go | 34 ++++++++++++ .../internal/util/buffered_read_conn_linux.go | 53 ------------------- .../util/buffered_read_conn_windows.go | 25 --------- 4 files changed, 34 insertions(+), 80 deletions(-) delete mode 100644 pkg/server/internal/util/buffered_read_conn_linux.go delete mode 100644 pkg/server/internal/util/buffered_read_conn_windows.go diff --git a/pkg/server/internal/util/BUILD.bazel b/pkg/server/internal/util/BUILD.bazel index 58d5563387cfb..eb027c3457139 100644 --- a/pkg/server/internal/util/BUILD.bazel +++ b/pkg/server/internal/util/BUILD.bazel @@ -4,8 +4,6 @@ go_library( name = "util", srcs = [ "buffered_read_conn.go", - "buffered_read_conn_linux.go", - "buffered_read_conn_windows.go", "util.go", ], importpath = "github.com/pingcap/tidb/pkg/server/internal/util", diff --git a/pkg/server/internal/util/buffered_read_conn.go b/pkg/server/internal/util/buffered_read_conn.go index d6bc1f24c9bc4..51dbcf4073046 100644 --- a/pkg/server/internal/util/buffered_read_conn.go +++ b/pkg/server/internal/util/buffered_read_conn.go @@ -16,7 +16,10 @@ package util import ( "bufio" + "io" "net" + "sync" + "time" ) // DefaultReaderSize is the default size of bufio.Reader. @@ -26,11 +29,13 @@ const DefaultReaderSize = 16 * 1024 type BufferedReadConn struct { net.Conn rb *bufio.Reader + mu *sync.Mutex } // NewBufferedReadConn creates a BufferedReadConn. func NewBufferedReadConn(conn net.Conn) *BufferedReadConn { return &BufferedReadConn{ + mu: &sync.Mutex{}, Conn: conn, rb: bufio.NewReaderSize(conn, DefaultReaderSize), } @@ -40,3 +45,32 @@ func NewBufferedReadConn(conn net.Conn) *BufferedReadConn { func (conn BufferedReadConn) Read(b []byte) (n int, err error) { return conn.rb.Read(b) } + +// Peek peeks from the connection. +func (conn BufferedReadConn) Peek(n int) ([]byte, error) { + return conn.rb.Peek(n) +} + +// IsAlive use syscall to detect the connection is alive or not. +// return value < 0, means unknow +// return value = 0, means not alive +// return value = 1, means still alive +func (conn BufferedReadConn) IsAlive() int { + if conn.mu.TryLock() { + defer conn.mu.Unlock() + err := conn.SetReadDeadline(time.Now().Add(30 * time.Microsecond)) + if err != nil { + return -1 + } + // nolint:errcheck + defer conn.SetReadDeadline(time.Time{}) + _, err = conn.Peek(1) + if err == io.EOF { + return 0 + } else if ne, ok := err.(net.Error); ok && ne.Timeout() { + return 1 + } + return -1 + } + return -1 +} diff --git a/pkg/server/internal/util/buffered_read_conn_linux.go b/pkg/server/internal/util/buffered_read_conn_linux.go deleted file mode 100644 index f1cd5e40d0574..0000000000000 --- a/pkg/server/internal/util/buffered_read_conn_linux.go +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright 2025 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build darwin || linux - -package util - -import ( - "crypto/tls" - "net" - "syscall" -) - -// IsAlive use syscall to detect the connection is alive or not. -// return value < 0, means unknow -// return value = 0, means not alive -// return value = 1, means still alive -func (conn BufferedReadConn) IsAlive() int { - var tcp *net.TCPConn - if tlsConn, ok := conn.Conn.(*tls.Conn); ok { - tcp, _ = tlsConn.NetConn().(*BufferedReadConn).Conn.(*net.TCPConn) - } else { - tcp, _ = conn.Conn.(*net.TCPConn) - } - - if tcp == nil { - return -1 - } - - f, err := tcp.File() - if err != nil { - return -1 - } - - var n int - b := []byte{0} - n, _, err = syscall.Recvfrom(int(f.Fd()), b, syscall.MSG_PEEK|syscall.MSG_DONTWAIT) - if (n == 0 && err == nil) || err == syscall.ECONNRESET { - return 0 - } - return 1 -} diff --git a/pkg/server/internal/util/buffered_read_conn_windows.go b/pkg/server/internal/util/buffered_read_conn_windows.go deleted file mode 100644 index ed67329e7b41b..0000000000000 --- a/pkg/server/internal/util/buffered_read_conn_windows.go +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright 2025 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build windows - -package util - -// IsAlive use syscall to detect the connection is alive or not. -// return value < 0, means unknow -// return value = 0, means not alive -// return value = 1, means still alive -func (conn BufferedReadConn) IsAlive() int { - return -1 -} From a8855523cc9c0806ebad612dd1dab67e87533edf Mon Sep 17 00:00:00 2001 From: mohangjie Date: Thu, 24 Apr 2025 10:01:03 +0800 Subject: [PATCH 15/19] update --- pkg/server/internal/util/buffered_read_conn.go | 6 +++--- pkg/server/tests/commontest/tidb_test.go | 3 --- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/pkg/server/internal/util/buffered_read_conn.go b/pkg/server/internal/util/buffered_read_conn.go index 51dbcf4073046..390dc6cd0bb2c 100644 --- a/pkg/server/internal/util/buffered_read_conn.go +++ b/pkg/server/internal/util/buffered_read_conn.go @@ -64,13 +64,13 @@ func (conn BufferedReadConn) IsAlive() int { } // nolint:errcheck defer conn.SetReadDeadline(time.Time{}) - _, err = conn.Peek(1) + b := make([]byte, 1) + b, err = conn.Peek(1) if err == io.EOF { return 0 - } else if ne, ok := err.(net.Error); ok && ne.Timeout() { + } else if ne, ok := err.(net.Error); ok && ne.Timeout() || len(b) != 0 { return 1 } - return -1 } return -1 } diff --git a/pkg/server/tests/commontest/tidb_test.go b/pkg/server/tests/commontest/tidb_test.go index 8ea8ff155a70d..17e0701dcfd82 100644 --- a/pkg/server/tests/commontest/tidb_test.go +++ b/pkg/server/tests/commontest/tidb_test.go @@ -3399,9 +3399,6 @@ func TestBatchGetTypeForRowExpr(t *testing.T) { } func TestIssue57531(t *testing.T) { - if runtime.GOOS != "linux" && runtime.GOOS != "darwin" { - t.Skip() - } ts := servertestkit.CreateTidbTestSuite(t) var rsCnt int From 750e087e4425e1cc84df068e8bb31fb635b3bd61 Mon Sep 17 00:00:00 2001 From: mohangjie Date: Thu, 24 Apr 2025 10:14:23 +0800 Subject: [PATCH 16/19] update --- pkg/server/internal/util/buffered_read_conn.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/server/internal/util/buffered_read_conn.go b/pkg/server/internal/util/buffered_read_conn.go index 390dc6cd0bb2c..3876054482124 100644 --- a/pkg/server/internal/util/buffered_read_conn.go +++ b/pkg/server/internal/util/buffered_read_conn.go @@ -51,7 +51,7 @@ func (conn BufferedReadConn) Peek(n int) ([]byte, error) { return conn.rb.Peek(n) } -// IsAlive use syscall to detect the connection is alive or not. +// IsAlive detects the connection is alive or not. // return value < 0, means unknow // return value = 0, means not alive // return value = 1, means still alive @@ -64,11 +64,10 @@ func (conn BufferedReadConn) IsAlive() int { } // nolint:errcheck defer conn.SetReadDeadline(time.Time{}) - b := make([]byte, 1) - b, err = conn.Peek(1) + _, err = conn.Peek(1) if err == io.EOF { return 0 - } else if ne, ok := err.(net.Error); ok && ne.Timeout() || len(b) != 0 { + } else if ne, ok := err.(net.Error); ok && ne.Timeout() { return 1 } } From 2e79985acf01d631a7172c258950fc87ba3e0fd6 Mon Sep 17 00:00:00 2001 From: mohangjie Date: Thu, 24 Apr 2025 10:41:12 +0800 Subject: [PATCH 17/19] add comments --- pkg/server/internal/util/buffered_read_conn.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/server/internal/util/buffered_read_conn.go b/pkg/server/internal/util/buffered_read_conn.go index 3876054482124..51df3965b388e 100644 --- a/pkg/server/internal/util/buffered_read_conn.go +++ b/pkg/server/internal/util/buffered_read_conn.go @@ -64,6 +64,10 @@ func (conn BufferedReadConn) IsAlive() int { } // nolint:errcheck defer conn.SetReadDeadline(time.Time{}) + // From the TCP level, the return of `Peek` results + // does not guarantee that the data link will survive. + // From the MySQL protocol, the client should not send + // new data to the server while it is processing the SQL. _, err = conn.Peek(1) if err == io.EOF { return 0 From 977d4ed0af983f3e6ab29bfa3b43540f45e5538f Mon Sep 17 00:00:00 2001 From: mohangjie Date: Thu, 24 Apr 2025 10:57:30 +0800 Subject: [PATCH 18/19] improve comments --- pkg/server/internal/util/buffered_read_conn.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pkg/server/internal/util/buffered_read_conn.go b/pkg/server/internal/util/buffered_read_conn.go index 51df3965b388e..c1d2840e56fcc 100644 --- a/pkg/server/internal/util/buffered_read_conn.go +++ b/pkg/server/internal/util/buffered_read_conn.go @@ -64,10 +64,13 @@ func (conn BufferedReadConn) IsAlive() int { } // nolint:errcheck defer conn.SetReadDeadline(time.Time{}) - // From the TCP level, the return of `Peek` results - // does not guarantee that the data link will survive. - // From the MySQL protocol, the client should not send - // new data to the server while it is processing the SQL. + // At the TCP level, a successful `Peek` operation doesn't guarantee + // the connection remains active. However, in the MySQL protocol, + // clients shouldn't send new data while the server is processing SQL. + // Therefore, we can safely assume `Peek` won't intercept any data + // during this period. Even if `Peek` does capture data, it only means + // the liveness check might be inaccurate - this won't impact the + // actual connection state or its operations. _, err = conn.Peek(1) if err == io.EOF { return 0 From 4f6c85b939718ea0eaace0943f850a3f2a85d398 Mon Sep 17 00:00:00 2001 From: mohangjie Date: Wed, 7 May 2025 10:42:44 +0800 Subject: [PATCH 19/19] follow comments --- pkg/server/internal/util/buffered_read_conn.go | 2 ++ pkg/util/sqlkiller/sqlkiller.go | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/server/internal/util/buffered_read_conn.go b/pkg/server/internal/util/buffered_read_conn.go index c1d2840e56fcc..e425bdb1f5b8c 100644 --- a/pkg/server/internal/util/buffered_read_conn.go +++ b/pkg/server/internal/util/buffered_read_conn.go @@ -29,6 +29,8 @@ const DefaultReaderSize = 16 * 1024 type BufferedReadConn struct { net.Conn rb *bufio.Reader + // `mu` is for `IsAlive()` function. + // We use this to ensure that `SetReadDeadline` is not called concurrently. mu *sync.Mutex } diff --git a/pkg/util/sqlkiller/sqlkiller.go b/pkg/util/sqlkiller/sqlkiller.go index 36da73de52a4f..c674a1c2f02f9 100644 --- a/pkg/util/sqlkiller/sqlkiller.go +++ b/pkg/util/sqlkiller/sqlkiller.go @@ -140,7 +140,7 @@ func (killer *SQLKiller) HandleSignal() error { } if lastCheckTime == nil { killer.lastCheckTime.Store(&now) - } else if time.Since(*lastCheckTime) > checkConnectionAliveDur { + } else if now.Sub(*lastCheckTime) > checkConnectionAliveDur { killer.lastCheckTime.Store(&now) if !(*fn)() { atomic.CompareAndSwapUint32(&killer.Signal, 0, QueryInterrupted)