Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -7766,13 +7766,13 @@ def go_deps():
build_tags = ["nextgen", "intest"],
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sha256 = "daff9dec412c452d98f2adf2e2c4ac5132e93acd0f773612c69681cd6f76b4d4",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20260326064539-728676760deb",
sha256 = "06b88aebfa4f416d2fd77edb70d51f8c590f61f71be6cbfb328e525e9907d027",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20260330060945-282ada62b856",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260326064539-728676760deb.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260326064539-728676760deb.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260326064539-728676760deb.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260326064539-728676760deb.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260330060945-282ada62b856.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260330060945-282ada62b856.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260330060945-282ada62b856.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260330060945-282ada62b856.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ require (
github.com/stathat/consistent v1.0.0
github.com/stretchr/testify v1.11.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.8-0.20260326064539-728676760deb
github.com/tikv/client-go/v2 v2.0.8-0.20260330060945-282ada62b856
github.com/tikv/pd/client v0.0.0-20260323032024-d7b638033a14
github.com/timakin/bodyclose v0.0.0-20241222091800-1db5c5ca4d67
github.com/twmb/murmur3 v1.1.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -895,8 +895,8 @@ github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
github.com/tikv/client-go/v2 v2.0.8-0.20260326064539-728676760deb h1:ZVxuRRtpKp76G4zNePQ9vLV2UwEf8nJjr6Z2kAHTbmg=
github.com/tikv/client-go/v2 v2.0.8-0.20260326064539-728676760deb/go.mod h1:2itxkVYz1s+dKsfJZqKcFD2MGUoDxvyy/Yqjd7vkdjg=
github.com/tikv/client-go/v2 v2.0.8-0.20260330060945-282ada62b856 h1:CJ2alESXl+dW4hQwO1vY5ingIDr6dGRB+8WodZeh24g=
github.com/tikv/client-go/v2 v2.0.8-0.20260330060945-282ada62b856/go.mod h1:2itxkVYz1s+dKsfJZqKcFD2MGUoDxvyy/Yqjd7vkdjg=
github.com/tikv/pd/client v0.0.0-20260323032024-d7b638033a14 h1:TVSx20m6DZMSiI37Dduu9RZb8yUvT1sgW8kCLAe+T5U=
github.com/tikv/pd/client v0.0.0-20260323032024-d7b638033a14/go.mod h1:4kxXuAQAREpH+lVbydVwGNNDmcwdj0RG4Ofwky08W/k=
github.com/timakin/bodyclose v0.0.0-20241222091800-1db5c5ca4d67 h1:9LPGD+jzxMlnk5r6+hJnar67cgpDIz/iyD+rfl5r2Vk=
Expand Down
11 changes: 8 additions & 3 deletions pkg/distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,9 +643,14 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr
}
}

if ruDetailsRaw := ctx.Value(clientutil.RUDetailsCtxKey); ruDetailsRaw != nil && r.storeType == kv.TiFlash {
if err = execdetails.MergeTiFlashRUConsumption(r.selectResp.GetExecutionSummaries(), ruDetailsRaw.(*clientutil.RUDetails)); err != nil {
return err
if r.storeType == kv.TiFlash {
ruv2Metrics := execdetails.RUV2MetricsFromContext(ctx)
if ruv2Metrics == nil || !ruv2Metrics.Bypass() {
if ruDetailsRaw := ctx.Value(clientutil.RUDetailsCtxKey); ruDetailsRaw != nil {
if err = execdetails.MergeTiFlashRUConsumption(r.selectResp.GetExecutionSummaries(), ruDetailsRaw.(*clientutil.RUDetails)); err != nil {
return err
}
}
}
}
if copStats.TimeDetail.ProcessTime > 0 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1708,7 +1708,7 @@ func recordInsertRows2Metrics(sessVars *variable.SessionVars) {

func (a *ExecStmt) finalizeStatementRUV2Metrics() {
sessVars := a.Ctx.GetSessionVars()
if sessVars.RUV2Metrics == nil {
if sessVars.RUV2Metrics == nil || sessVars.RUV2Metrics.Bypass() {
return
}

Expand Down
36 changes: 36 additions & 0 deletions pkg/executor/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,42 @@ func TestFinishExecuteStmtReportsTiDBRUV2WithoutSyncingRUDetails(t *testing.T) {
"update `stmt_summary_retry`%",
).Check(testkit.Rows("0 0"))
})

t.Run("bypass ru skips final reporting", func(t *testing.T) {
reporter := &mockRUV2ConsumptionReporter{}
ctx := &mockRUV2ReportingContext{
Context: mock.NewContext(),
reporter: reporter,
}
sessVars := ctx.GetSessionVars()
sessVars.StartTime = time.Now()
sessVars.StmtCtx.StmtType = "Select"
sessVars.StmtCtx.OriginalSQL = "select 1"
sessVars.StmtCtx.ResetSQLDigest(sessVars.StmtCtx.OriginalSQL)
sessVars.StmtCtx.ResourceGroupName = "rg1"

goCtx := execdetails.ContextWithInitializedExecDetails(context.Background())
sessVars.RUV2Metrics = execdetails.RUV2MetricsFromContext(goCtx)
require.NotNil(t, sessVars.RUV2Metrics)
sessVars.RUV2Metrics.SetBypass(true)
sessVars.RUV2Metrics.AddResultChunkCells(100)

ruDetails := goCtx.Value(util.RUDetailsCtxKey).(*util.RUDetails)
ruDetails.AddTiKVRUV2(12345)
ruDetails.UpdateTiFlash(&rmpb.Consumption{RRU: 10, WRU: 20})

execStmt := &executor.ExecStmt{
Ctx: ctx,
GoCtx: goCtx,
StmtNode: &ast.SelectStmt{},
}
execStmt.FinishExecuteStmt(0, nil, false)

require.Empty(t, reporter.group)
require.Zero(t, reporter.tikvRUV2)
require.Zero(t, reporter.tidbRUV2)
require.Zero(t, reporter.tiflashRU)
})
}

func TestSlowLogMaxPerSec(t *testing.T) {
Expand Down
9 changes: 6 additions & 3 deletions pkg/executor/internal/mpp/local_mpp_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,9 +784,12 @@ func (c *localMppCoordinator) handleAllReports() error {
RecordOneCopTask(-1, kv.TiFlash, detail)] = 0
}
}
if ruDetailsRaw := c.ctx.Value(clientutil.RUDetailsCtxKey); ruDetailsRaw != nil {
if err := execdetails.MergeTiFlashRUConsumption(report.executionSummaries, ruDetailsRaw.(*clientutil.RUDetails)); err != nil {
return err
ruv2Metrics := execdetails.RUV2MetricsFromContext(c.ctx)
if ruv2Metrics == nil || !ruv2Metrics.Bypass() {
if ruDetailsRaw := c.ctx.Value(clientutil.RUDetailsCtxKey); ruDetailsRaw != nil {
if err := execdetails.MergeTiFlashRUConsumption(report.executionSummaries, ruDetailsRaw.(*clientutil.RUDetails)); err != nil {
return err
}
}
}
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/server/conn_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,22 @@ func TestCursorWithParams(t *testing.T) {
require.Equal(t, float64(13), reporter.tiflashRU)
})

t.Run("cursor ruv2 bypass skips tracker creation", func(t *testing.T) {
reporter := &mockCursorRUV2ConsumptionReporter{}
goCtx := execdetails.ContextWithInitializedExecDetails(context.Background())
ruv2Metrics := execdetails.RUV2MetricsFromContext(goCtx)
require.NotNil(t, ruv2Metrics)
ruv2Metrics.SetBypass(true)
ruDetails := goCtx.Value(clientutil.RUDetailsCtxKey).(*clientutil.RUDetails)
ruDetails.AddTiKVRUV2(11)
tracker := resultset.NewCursorRUV2Tracker(reporter, "rg1", ruv2Metrics, ruDetails, execdetails.RUV2Weights{})
require.Nil(t, tracker)
require.Empty(t, reporter.group)
require.Zero(t, reporter.tikvRUV2)
require.Zero(t, reporter.tidbRUV2)
require.Zero(t, reporter.tiflashRU)
})

t.Run("write chunks skips column access on first next error", func(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
srv := CreateMockServer(t, store)
Expand Down
3 changes: 3 additions & 0 deletions pkg/server/internal/resultset/resultset.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ func NewCursorRUV2Tracker(
if metrics == nil && ruDetails == nil {
return nil
}
if metrics != nil && metrics.Bypass() {
return nil
}
tracker := &CursorRUV2Tracker{
reporter: reporter,
resourceGroupName: resourceGroupName,
Expand Down
1 change: 1 addition & 0 deletions pkg/session/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ go_test(
"//pkg/util",
"//pkg/util/benchdaily",
"//pkg/util/chunk",
"//pkg/util/execdetails",
"//pkg/util/logutil",
"//pkg/util/sqlexec",
"@com_github_pingcap_errors//:errors",
Expand Down
92 changes: 68 additions & 24 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1127,15 +1127,19 @@ func (*session) isTxnRetryableError(err error) bool {
}

func isEndTxnStmt(stmt ast.StmtNode, vars *variable.SessionVars) (bool, error) {
switch n := stmt.(type) {
resolvedStmt, err := resolvePreparedStmt(stmt, vars)
if err != nil {
return false, err
}
if resolvedStmt == nil {
return false, nil
}
if resolvedStmt != stmt {
return isEndTxnStmt(resolvedStmt, vars)
}
switch resolvedStmt.(type) {
case *ast.RollbackStmt, *ast.CommitStmt:
return true, nil
case *ast.ExecuteStmt:
ps, err := plannercore.GetPreparedStmt(n, vars)
if err != nil {
return false, err
}
return isEndTxnStmt(ps.PreparedAst.Stmt, vars)
}
return false, nil
}
Expand Down Expand Up @@ -2228,6 +2232,7 @@ func (s *session) useCurrentSession(execOption sqlexec.ExecOption) (*session, fu
prevSQL := s.sessionVars.StmtCtx.OriginalSQL
prevStmtType := s.sessionVars.StmtCtx.StmtType
prevTables := s.sessionVars.StmtCtx.Tables
prevRUV2Metrics := s.sessionVars.RUV2Metrics
return s, func() {
s.sessionVars.AnalyzeVersion = prevStatsVer
s.sessionVars.EnableAnalyzeSnapshot = prevAnalyzeSnapshot
Expand All @@ -2240,6 +2245,7 @@ func (s *session) useCurrentSession(execOption sqlexec.ExecOption) (*session, fu
s.sessionVars.StmtCtx.OriginalSQL = prevSQL
s.sessionVars.StmtCtx.StmtType = prevStmtType
s.sessionVars.StmtCtx.Tables = prevTables
s.sessionVars.RUV2Metrics = prevRUV2Metrics
s.sessionVars.MemTracker.Detach()
}, nil
}
Expand Down Expand Up @@ -2441,6 +2447,10 @@ func (s *session) executeStmtImpl(ctx context.Context, stmtNode ast.StmtNode) (s
ctx = context.WithValue(ctx, execdetails.RUV2MetricsCtxKey, ruv2Metrics)
}
sessVars.RUV2Metrics = ruv2Metrics
bypass := shouldBypass(ctx, stmtNode, sessVars)
if ruv2Metrics != nil {
ruv2Metrics.SetBypass(bypass)
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
if pending := sessVars.RUV2PendingSessionParserTotal.Swap(0); pending > 0 && ruv2Metrics != nil {
ruv2Metrics.AddSessionParserTotal(pending)
}
Expand Down Expand Up @@ -2489,11 +2499,8 @@ func (s *session) executeStmtImpl(ctx context.Context, stmtNode ast.StmtNode) (s
})

var stmtLabel string
if execStmt, ok := stmtNode.(*ast.ExecuteStmt); ok {
prepareStmt, err := plannercore.GetPreparedStmt(execStmt, s.sessionVars)
if err == nil && prepareStmt.PreparedAst != nil {
stmtLabel = stmtctx.GetStmtLabel(ctx, prepareStmt.PreparedAst.Stmt)
}
if resolvedStmt, err := resolvePreparedStmt(stmtNode, s.sessionVars); err == nil && resolvedStmt != nil {
stmtLabel = stmtctx.GetStmtLabel(ctx, resolvedStmt)
}
if stmtLabel == "" {
stmtLabel = stmtctx.GetStmtLabel(ctx, stmtNode)
Expand Down Expand Up @@ -2728,6 +2735,52 @@ func (s *session) executeStmtImpl(ctx context.Context, stmtNode ast.StmtNode) (s
return recordSet, nil
}

var isNextGenForRUV2 = kerneltype.IsNextGen

func resolvePreparedStmt(stmt ast.StmtNode, vars *variable.SessionVars) (ast.StmtNode, error) {
if stmt == nil {
return nil, nil
}
execStmt, ok := stmt.(*ast.ExecuteStmt)
if !ok {
return stmt, nil
}
if vars == nil {
return nil, nil
}
prepareStmt, err := plannercore.GetPreparedStmt(execStmt, vars)
if err != nil {
return nil, err
}
if prepareStmt == nil || prepareStmt.PreparedAst == nil {
return nil, nil
}
return prepareStmt.PreparedAst.Stmt, nil
}

func shouldBypass(ctx context.Context, stmtNode ast.StmtNode, sessVars *variable.SessionVars) bool {
switch kv.GetInternalSourceType(ctx) {
case kv.InternalTxnOthers:
return true
case kv.InternalTxnStats:
return isNextGenForRUV2() && isAnalyzeStatementForRUV2(stmtNode, sessVars)
default:
return false
}
}

func isAnalyzeStatementForRUV2(stmtNode ast.StmtNode, sessVars *variable.SessionVars) bool {
if stmtNode == nil || sessVars == nil {
return false
}
resolvedStmt, err := resolvePreparedStmt(stmtNode, sessVars)
if err != nil || resolvedStmt == nil {
return false
}
_, ok := resolvedStmt.(*ast.AnalyzeTableStmt)
return ok
}

func (s *session) GetSQLExecutor() sqlexec.SQLExecutor {
return s
}
Expand Down Expand Up @@ -4765,22 +4818,13 @@ func (s *session) shouldUsePessimisticAutoCommit(stmtNode ast.StmtNode) bool {
}

// isDMLStatement checks if the given statement should use pessimistic-auto-commit.
// It handles EXECUTE unwrapping and properly handles EXPLAIN statements by checking their inner statement.
// It unwraps EXECUTE statements and properly handles EXPLAIN statements by checking their inner statement.
func (s *session) isDMLStatement(stmtNode ast.StmtNode) bool {
if stmtNode == nil {
actualStmt, err := resolvePreparedStmt(stmtNode, s.GetSessionVars())
if err != nil || actualStmt == nil {
return false
}

// Handle EXECUTE statements - unwrap to get the actual prepared statement
actualStmt := stmtNode
if execStmt, ok := stmtNode.(*ast.ExecuteStmt); ok {
prepareStmt, err := plannercore.GetPreparedStmt(execStmt, s.GetSessionVars())
if err != nil || prepareStmt == nil {
return false
}
actualStmt = prepareStmt.PreparedAst.Stmt
}

// For EXPLAIN statements, check the underlying statement
// This ensures EXPLAIN shows the correct plan that would be used if the statement were executed
if explainStmt, ok := actualStmt.(*ast.ExplainStmt); ok {
Expand Down
Loading
Loading